Skip to main content

email/imap/
mod.rs

1pub mod config;
2mod error;
3
4use std::{
5    collections::HashMap, env, fmt, io::ErrorKind::ConnectionReset, num::NonZeroU32, sync::Arc,
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use futures::{stream::FuturesUnordered, StreamExt};
11use imap_client::{
12    client::tokio::{Client, ClientError},
13    imap_next::imap_types::{
14        auth::AuthMechanism,
15        core::{IString, NString, Vec1},
16        extensions::{
17            sort::SortCriterion,
18            thread::{Thread, ThreadingAlgorithm},
19        },
20        fetch::MessageDataItem,
21        flag::{Flag, StoreType},
22        search::SearchKey,
23        sequence::SequenceSet,
24    },
25    stream::Error as StreamError,
26    tasks::{tasks::select::SelectDataUnvalidated, SchedulerError},
27};
28use once_cell::sync::Lazy;
29use tokio::{
30    select,
31    sync::{oneshot, Mutex, MutexGuard},
32    time::sleep,
33};
34use tracing::{debug, instrument, trace, warn};
35
36use self::config::{ImapAuthConfig, ImapConfig};
37#[doc(inline)]
38pub use self::error::{Error, Result};
39#[cfg(feature = "oauth2")]
40use crate::account::config::oauth2::OAuth2Method;
41#[cfg(feature = "thread")]
42use crate::envelope::thread::{imap::ThreadImapEnvelopes, ThreadEnvelopes};
43#[cfg(feature = "watch")]
44use crate::envelope::watch::{imap::WatchImapEnvelopes, WatchEnvelopes};
45use crate::{
46    account::config::AccountConfig,
47    backend::{
48        context::{BackendContext, BackendContextBuilder},
49        feature::{BackendFeature, CheckUp},
50    },
51    envelope::{
52        get::{imap::GetImapEnvelope, GetEnvelope},
53        imap::FETCH_ENVELOPES,
54        list::{imap::ListImapEnvelopes, ListEnvelopes},
55        Envelope, Envelopes,
56    },
57    flag::{
58        add::{imap::AddImapFlags, AddFlags},
59        remove::{imap::RemoveImapFlags, RemoveFlags},
60        set::{imap::SetImapFlags, SetFlags},
61    },
62    folder::{
63        add::{imap::AddImapFolder, AddFolder},
64        delete::{imap::DeleteImapFolder, DeleteFolder},
65        expunge::{imap::ExpungeImapFolder, ExpungeFolder},
66        list::{imap::ListImapFolders, ListFolders},
67        purge::{imap::PurgeImapFolder, PurgeFolder},
68        Folders,
69    },
70    message::{
71        add::{imap::AddImapMessage, AddMessage},
72        copy::{imap::CopyImapMessages, CopyMessages},
73        delete::{imap::DeleteImapMessages, DeleteMessages},
74        get::{imap::GetImapMessages, GetMessages},
75        imap::{FETCH_MESSAGES, PEEK_MESSAGES},
76        peek::{imap::PeekImapMessages, PeekMessages},
77        r#move::{imap::MoveImapMessages, MoveMessages},
78        remove::{imap::RemoveImapMessages, RemoveMessages},
79        Messages,
80    },
81    retry::{self, Retry, RetryState},
82    tls::{Encryption, Tls, TlsProvider},
83    AnyResult,
84};
85
86static ID_PARAMS: Lazy<Vec<(IString<'static>, NString<'static>)>> = Lazy::new(|| {
87    vec![
88        (
89            "name".try_into().unwrap(),
90            NString(
91                env::var("CARGO_PKG_NAME")
92                    .ok()
93                    .map(|e| e.try_into().unwrap()),
94            ),
95        ),
96        (
97            "vendor".try_into().unwrap(),
98            NString(
99                env::var("CARGO_PKG_NAME")
100                    .ok()
101                    .map(|e| e.try_into().unwrap()),
102            ),
103        ),
104        (
105            "version".try_into().unwrap(),
106            NString(
107                env::var("CARGO_PKG_VERSION")
108                    .ok()
109                    .map(|e| e.try_into().unwrap()),
110            ),
111        ),
112        (
113            "support-url".try_into().unwrap(),
114            NString(Some(
115                "https://github.com/orgs/pimalaya/discussions/new?category=q-a"
116                    .try_into()
117                    .unwrap(),
118            )),
119        ),
120    ]
121});
122
123enum ImapRetryState<T> {
124    Retry,
125    TimedOut,
126    Ok(std::result::Result<T, ClientError>),
127}
128
129/// The IMAP backend context.
130///
131/// This context is unsync, which means it cannot be shared between
132/// threads. For the sync version, see [`ImapContextSync`].
133pub struct ImapClient {
134    pub id: u8,
135
136    /// The account configuration.
137    pub account_config: Arc<AccountConfig>,
138
139    /// The IMAP configuration.
140    pub imap_config: Arc<ImapConfig>,
141
142    /// The next gen IMAP client builder.
143    pub client_builder: ImapClientBuilder,
144
145    /// The next gen IMAP client.
146    inner: Client,
147
148    /// The selected mailbox.
149    mailbox: Option<String>,
150
151    retry: Retry,
152}
153
154impl ImapClient {
155    async fn retry<T>(
156        &mut self,
157        res: retry::Result<std::result::Result<T, ClientError>>,
158    ) -> Result<ImapRetryState<T>> {
159        match self.retry.next(res) {
160            RetryState::Retry => {
161                debug!(attempt = self.retry.attempts, "request timed out");
162                Ok(ImapRetryState::Retry)
163            }
164            RetryState::TimedOut => {
165                return Ok(ImapRetryState::TimedOut);
166            }
167            RetryState::Ok(Err(ClientError::Stream(err))) => {
168                match err {
169                    StreamError::State(SchedulerError::UnexpectedByeResponse(bye)) => {
170                        debug!(reason = bye.text.to_string(), "stream closed");
171                    }
172                    StreamError::Io(err) if err.kind() == ConnectionReset => {
173                        debug!("connection reset");
174                    }
175                    StreamError::Closed => {
176                        debug!("stream closed");
177                    }
178                    StreamError::Io(err) if err.kind() == ConnectionReset => {
179                        debug!("connection reset");
180                    }
181                    err => {
182                        let err = ClientError::Stream(err);
183                        return Ok(ImapRetryState::Ok(Err(err)));
184                    }
185                };
186
187                debug!("re-connecting…");
188
189                self.inner = self.client_builder.build().await?;
190
191                if let Some(mbox) = &self.mailbox {
192                    self.inner
193                        .select(mbox.clone())
194                        .await
195                        .map_err(Error::SelectMailboxError)?;
196                }
197
198                self.retry.attempts = 0;
199                Ok(ImapRetryState::Retry)
200            }
201            RetryState::Ok(res) => {
202                return Ok(ImapRetryState::Ok(res));
203            }
204        }
205    }
206
207    pub fn ext_sort_supported(&self) -> bool {
208        self.inner.state.ext_sort_supported()
209    }
210
211    #[instrument(skip_all, fields(client = self.id))]
212    pub async fn noop(&mut self) -> Result<()> {
213        self.retry.reset();
214
215        loop {
216            let res = self.retry.timeout(self.inner.noop()).await;
217
218            match self.retry(res).await? {
219                ImapRetryState::Retry => continue,
220                ImapRetryState::TimedOut => break Err(Error::NoOpTimedOutError),
221                ImapRetryState::Ok(res) => break res.map_err(Error::NoOpError),
222            }
223        }
224    }
225
226    #[instrument(skip_all, fields(client = self.id))]
227    pub async fn select_mailbox(&mut self, mbox: impl ToString) -> Result<SelectDataUnvalidated> {
228        self.retry.reset();
229
230        let data = loop {
231            let res = self
232                .retry
233                .timeout(self.inner.select(mbox.to_string()))
234                .await;
235
236            match self.retry(res).await? {
237                ImapRetryState::Retry => continue,
238                ImapRetryState::TimedOut => break Err(Error::SelectMailboxTimedOutError),
239                ImapRetryState::Ok(res) => break res.map_err(Error::SelectMailboxError),
240            }
241        }?;
242
243        self.mailbox = Some(mbox.to_string());
244
245        Ok(data)
246    }
247
248    #[instrument(skip_all, fields(client = self.id))]
249    pub async fn examine_mailbox(&mut self, mbox: impl ToString) -> Result<SelectDataUnvalidated> {
250        self.retry.reset();
251
252        loop {
253            let res = self
254                .retry
255                .timeout(self.inner.examine(mbox.to_string()))
256                .await;
257
258            match self.retry(res).await? {
259                ImapRetryState::Retry => continue,
260                ImapRetryState::TimedOut => break Err(Error::ExamineMailboxTimedOutError),
261                ImapRetryState::Ok(res) => break res.map_err(Error::ExamineMailboxError),
262            }
263        }
264    }
265
266    #[instrument(skip_all, fields(client = self.id))]
267    pub async fn create_mailbox(&mut self, mbox: impl ToString) -> Result<()> {
268        self.retry.reset();
269
270        loop {
271            let res = self
272                .retry
273                .timeout(self.inner.create(mbox.to_string()))
274                .await;
275
276            match self.retry(res).await? {
277                ImapRetryState::Retry => continue,
278                ImapRetryState::TimedOut => break Err(Error::CreateMailboxTimedOutError),
279                ImapRetryState::Ok(res) => break res.map_err(Error::CreateMailboxError),
280            }
281        }
282    }
283
284    #[instrument(skip_all, fields(client = self.id))]
285    pub async fn list_all_mailboxes(&mut self, config: &AccountConfig) -> Result<Folders> {
286        self.retry.reset();
287
288        let mboxes = loop {
289            let res = self.retry.timeout(self.inner.list("", "*")).await;
290
291            match self.retry(res).await? {
292                ImapRetryState::Retry => continue,
293                ImapRetryState::TimedOut => break Err(Error::ListMailboxesTimedOutError),
294                ImapRetryState::Ok(res) => break res.map_err(Error::ListMailboxesError),
295            }
296        }?;
297
298        let folders = Folders::from_imap_mailboxes(config, mboxes);
299
300        Ok(folders)
301    }
302
303    #[instrument(skip_all, fields(client = self.id))]
304    pub async fn expunge_mailbox(&mut self, mbox: impl ToString) -> Result<usize> {
305        self.select_mailbox(mbox).await?;
306
307        self.retry.reset();
308
309        let expunged = loop {
310            let res = self.retry.timeout(self.inner.expunge()).await;
311
312            match self.retry(res).await? {
313                ImapRetryState::Retry => continue,
314                ImapRetryState::TimedOut => break Err(Error::ExpungeMailboxTimedOutError),
315                ImapRetryState::Ok(res) => break res.map_err(Error::ExpungeMailboxError),
316            }
317        }?;
318
319        Ok(expunged.len())
320    }
321
322    #[instrument(skip_all, fields(client = self.id))]
323    pub async fn purge_mailbox(&mut self, mbox: impl ToString) -> Result<usize> {
324        self.select_mailbox(mbox).await?;
325
326        self.add_deleted_flag_silently("1:*".try_into().unwrap())
327            .await?;
328
329        let expunged = loop {
330            let res = self.retry.timeout(self.inner.expunge()).await;
331
332            match self.retry(res).await? {
333                ImapRetryState::Retry => continue,
334                ImapRetryState::TimedOut => break Err(Error::ExpungeMailboxTimedOutError),
335                ImapRetryState::Ok(res) => break res.map_err(Error::ExpungeMailboxError),
336            }
337        }?;
338
339        Ok(expunged.len())
340    }
341
342    #[instrument(skip_all, fields(client = self.id))]
343    pub async fn delete_mailbox(&mut self, mbox: impl ToString) -> Result<()> {
344        self.retry.reset();
345
346        loop {
347            let res = self
348                .retry
349                .timeout(self.inner.delete(mbox.to_string()))
350                .await;
351
352            match self.retry(res).await? {
353                ImapRetryState::Retry => continue,
354                ImapRetryState::TimedOut => break Err(Error::DeleteMailboxTimedOutError),
355                ImapRetryState::Ok(res) => break res.map_err(Error::DeleteMailboxError),
356            }
357        }
358    }
359
360    #[instrument(skip_all, fields(client = self.id))]
361    pub async fn fetch_envelopes(&mut self, uids: SequenceSet) -> Result<Envelopes> {
362        self.retry.reset();
363
364        let fetches = loop {
365            let res = self
366                .retry
367                .timeout(self.inner.uid_fetch(uids.clone(), FETCH_ENVELOPES.clone()))
368                .await;
369
370            match self.retry(res).await? {
371                ImapRetryState::Retry => continue,
372                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
373                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
374            }
375        }?;
376
377        Ok(Envelopes::from_imap_data_items(fetches))
378    }
379
380    #[instrument(skip_all, fields(client = self.id))]
381    pub async fn fetch_envelopes_map(
382        &mut self,
383        uids: SequenceSet,
384    ) -> Result<HashMap<String, Envelope>> {
385        let fetches = loop {
386            let res = self
387                .retry
388                .timeout(self.inner.uid_fetch(uids.clone(), FETCH_ENVELOPES.clone()))
389                .await;
390
391            match self.retry(res).await? {
392                ImapRetryState::Retry => continue,
393                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
394                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
395            }
396        }?;
397
398        let map = fetches
399            .into_values()
400            .map(|items| {
401                let envelope = Envelope::from_imap_data_items(items.as_ref());
402                (envelope.id.clone(), envelope)
403            })
404            .collect();
405
406        Ok(map)
407    }
408
409    #[instrument(skip_all, fields(client = self.id))]
410    pub async fn fetch_first_envelope(&mut self, uid: u32) -> Result<Envelope> {
411        let items = loop {
412            let task = self
413                .inner
414                .uid_fetch_first(uid.try_into().unwrap(), FETCH_ENVELOPES.clone());
415
416            let res = self.retry.timeout(task).await;
417
418            match self.retry(res).await? {
419                ImapRetryState::Retry => continue,
420                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
421                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
422            }
423        }?;
424
425        Ok(Envelope::from_imap_data_items(items.as_ref()))
426    }
427
428    #[instrument(skip_all, fields(client = self.id))]
429    pub async fn fetch_envelopes_by_sequence(&mut self, seq: SequenceSet) -> Result<Envelopes> {
430        let fetches = loop {
431            let res = self
432                .retry
433                .timeout(self.inner.fetch(seq.clone(), FETCH_ENVELOPES.clone()))
434                .await;
435
436            match self.retry(res).await? {
437                ImapRetryState::Retry => continue,
438                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
439                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
440            }
441        }?;
442
443        Ok(Envelopes::from_imap_data_items(fetches))
444    }
445
446    #[instrument(skip_all, fields(client = self.id))]
447    pub async fn fetch_all_envelopes(&mut self) -> Result<Envelopes> {
448        self.fetch_envelopes_by_sequence("1:*".try_into().unwrap())
449            .await
450    }
451
452    #[instrument(skip_all, fields(client = self.id))]
453    pub async fn sort_uids(
454        &mut self,
455        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
456        search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
457    ) -> Result<Vec<NonZeroU32>> {
458        loop {
459            let task = self
460                .inner
461                .uid_sort(sort_criteria.clone(), search_criteria.clone());
462
463            let res = self.retry.timeout(task).await;
464
465            match self.retry(res).await? {
466                ImapRetryState::Retry => continue,
467                ImapRetryState::TimedOut => break Err(Error::SortUidsTimedOutError),
468                ImapRetryState::Ok(res) => break res.map_err(Error::SortUidsError),
469            }
470        }
471    }
472
473    #[instrument(skip_all, fields(client = self.id))]
474    pub async fn search_uids(
475        &mut self,
476        search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
477    ) -> Result<Vec<NonZeroU32>> {
478        loop {
479            let res = self
480                .retry
481                .timeout(self.inner.uid_search(search_criteria.clone()))
482                .await;
483
484            match self.retry(res).await? {
485                ImapRetryState::Retry => continue,
486                ImapRetryState::TimedOut => break Err(Error::SearchUidsTimedOutError),
487                ImapRetryState::Ok(res) => break res.map_err(Error::SearchUidsError),
488            }
489        }
490    }
491
492    #[instrument(skip_all, fields(client = self.id))]
493    pub async fn sort_envelopes(
494        &mut self,
495        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
496        search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
497    ) -> Result<Envelopes> {
498        let fetches = loop {
499            let task = self.inner.uid_sort_or_fallback(
500                sort_criteria.clone(),
501                search_criteria.clone(),
502                FETCH_ENVELOPES.clone(),
503            );
504
505            let res = self.retry.timeout(task).await;
506
507            match self.retry(res).await? {
508                ImapRetryState::Retry => continue,
509                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
510                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
511            }
512        }?;
513
514        Ok(Envelopes::from(fetches))
515    }
516
517    #[instrument(skip_all, fields(client = self.id))]
518    pub async fn thread_envelopes(
519        &mut self,
520        search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
521    ) -> Result<Vec<Thread>> {
522        loop {
523            let task = self
524                .inner
525                .uid_thread(ThreadingAlgorithm::References, search_criteria.clone());
526
527            let res = self.retry.timeout(task).await;
528
529            match self.retry(res).await? {
530                ImapRetryState::Retry => continue,
531                ImapRetryState::TimedOut => break Err(Error::ThreadMessagesTimedOutError),
532                ImapRetryState::Ok(res) => break res.map_err(Error::ThreadMessagesError),
533            }
534        }
535    }
536
537    #[instrument(skip_all, fields(client = self.id))]
538    pub async fn idle(
539        &mut self,
540        wait_for_shutdown_request: &mut oneshot::Receiver<()>,
541    ) -> Result<()> {
542        let tag = self.inner.enqueue_idle();
543
544        select! {
545            output = self.inner.idle(tag.clone()) => {
546                output.map_err(Error::StartIdleError)?;
547                Ok(())
548            },
549            _ = wait_for_shutdown_request => {
550                debug!("shutdown requested, sending done command…");
551                self.inner.idle_done(tag.clone()).await.map_err(Error::StopIdleError)?;
552                Err(Error::IdleInterruptedError)
553            }
554        }
555    }
556
557    #[instrument(skip_all, fields(client = self.id))]
558    pub async fn add_flags(
559        &mut self,
560        uids: SequenceSet,
561        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
562    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
563        loop {
564            let task = self
565                .inner
566                .uid_store(uids.clone(), StoreType::Add, flags.clone());
567
568            let res = self.retry.timeout(task).await;
569
570            match self.retry(res).await? {
571                ImapRetryState::Retry => continue,
572                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
573                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
574            }
575        }
576    }
577
578    #[instrument(skip_all, fields(client = self.id))]
579    pub async fn add_deleted_flag(
580        &mut self,
581        uids: SequenceSet,
582    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
583        loop {
584            let task = self
585                .inner
586                .uid_store(uids.clone(), StoreType::Add, Some(Flag::Deleted));
587
588            let res = self.retry.timeout(task).await;
589
590            match self.retry(res).await? {
591                ImapRetryState::Retry => continue,
592                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
593                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
594            }
595        }
596    }
597
598    #[instrument(skip_all, fields(client = self.id))]
599    pub async fn add_deleted_flag_silently(&mut self, uids: SequenceSet) -> Result<()> {
600        loop {
601            let task =
602                self.inner
603                    .uid_silent_store(uids.clone(), StoreType::Add, Some(Flag::Deleted));
604
605            let res = self.retry.timeout(task).await;
606
607            match self.retry(res).await? {
608                ImapRetryState::Retry => continue,
609                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
610                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
611            }
612        }
613    }
614
615    #[instrument(skip_all, fields(client = self.id))]
616    pub async fn add_flags_silently(
617        &mut self,
618        uids: SequenceSet,
619        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
620    ) -> Result<()> {
621        loop {
622            let task = self
623                .inner
624                .uid_silent_store(uids.clone(), StoreType::Add, flags.clone());
625
626            let res = self.retry.timeout(task).await;
627
628            match self.retry(res).await? {
629                ImapRetryState::Retry => continue,
630                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
631                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
632            }
633        }
634    }
635
636    #[instrument(skip_all, fields(client = self.id))]
637    pub async fn set_flags(
638        &mut self,
639        uids: SequenceSet,
640        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
641    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
642        loop {
643            let task = self
644                .inner
645                .uid_store(uids.clone(), StoreType::Replace, flags.clone());
646
647            let res = self.retry.timeout(task).await;
648
649            match self.retry(res).await? {
650                ImapRetryState::Retry => continue,
651                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
652                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
653            }
654        }
655    }
656
657    #[instrument(skip_all, fields(client = self.id))]
658    pub async fn set_flags_silently(
659        &mut self,
660        uids: SequenceSet,
661        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
662    ) -> Result<()> {
663        loop {
664            let task = self
665                .inner
666                .uid_silent_store(uids.clone(), StoreType::Replace, flags.clone());
667
668            let res = self.retry.timeout(task).await;
669
670            match self.retry(res).await? {
671                ImapRetryState::Retry => continue,
672                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
673                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
674            }
675        }
676    }
677
678    #[instrument(skip_all, fields(client = self.id))]
679    pub async fn remove_flags(
680        &mut self,
681        uids: SequenceSet,
682        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
683    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
684        loop {
685            let task = self
686                .inner
687                .uid_store(uids.clone(), StoreType::Remove, flags.clone());
688
689            let res = self.retry.timeout(task).await;
690
691            match self.retry(res).await? {
692                ImapRetryState::Retry => continue,
693                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
694                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
695            }
696        }
697    }
698
699    #[instrument(skip_all, fields(client = self.id))]
700    pub async fn remove_flags_silently(
701        &mut self,
702        uids: SequenceSet,
703        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
704    ) -> Result<()> {
705        loop {
706            let task = self
707                .inner
708                .uid_silent_store(uids.clone(), StoreType::Remove, flags.clone());
709
710            let res = self.retry.timeout(task).await;
711
712            match self.retry(res).await? {
713                ImapRetryState::Retry => continue,
714                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
715                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
716            }
717        }
718    }
719
720    #[instrument(skip_all, fields(client = self.id))]
721    pub async fn add_message(
722        &mut self,
723        mbox: impl ToString,
724        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
725        msg: impl AsRef<[u8]> + Clone,
726    ) -> Result<NonZeroU32> {
727        let id = loop {
728            let task =
729                self.inner
730                    .appenduid_or_fallback(mbox.to_string(), flags.clone(), msg.clone());
731
732            let res = self.retry.timeout(task).await;
733
734            match self.retry(res).await? {
735                ImapRetryState::Retry => continue,
736                ImapRetryState::TimedOut => break Err(Error::AddMessageTimedOutError),
737                ImapRetryState::Ok(res) => break res.map_err(Error::AddMessageError),
738            }
739        }?;
740
741        id.ok_or(Error::FindAppendedMessageUidError)
742    }
743
744    #[instrument(skip_all, fields(client = self.id))]
745    pub async fn fetch_messages(&mut self, uids: SequenceSet) -> Result<Messages> {
746        let mut fetches = loop {
747            let res = self
748                .retry
749                .timeout(self.inner.uid_fetch(uids.clone(), FETCH_MESSAGES.clone()))
750                .await;
751
752            match self.retry(res).await? {
753                ImapRetryState::Retry => continue,
754                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
755                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
756            }
757        }?;
758
759        let fetches: Vec<_> = uids
760            .iter(NonZeroU32::MAX)
761            .filter_map(|ref uid| fetches.remove(uid))
762            .collect();
763
764        Ok(Messages::from(fetches))
765    }
766
767    #[instrument(skip_all, fields(client = self.id))]
768    pub async fn peek_messages(&mut self, uids: SequenceSet) -> Result<Messages> {
769        let mut fetches = loop {
770            let res = self
771                .retry
772                .timeout(self.inner.uid_fetch(uids.clone(), PEEK_MESSAGES.clone()))
773                .await;
774
775            match self.retry(res).await? {
776                ImapRetryState::Retry => continue,
777                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
778                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
779            }
780        }?;
781
782        let fetches: Vec<_> = uids
783            .iter(NonZeroU32::MAX)
784            .filter_map(|ref uid| fetches.remove(uid))
785            .collect();
786
787        Ok(Messages::from(fetches))
788    }
789
790    #[instrument(skip_all, fields(client = self.id))]
791    pub async fn copy_messages(&mut self, uids: SequenceSet, mbox: impl ToString) -> Result<()> {
792        loop {
793            let res = self
794                .retry
795                .timeout(self.inner.uid_copy(uids.clone(), mbox.to_string()))
796                .await;
797
798            match self.retry(res).await? {
799                ImapRetryState::Retry => continue,
800                ImapRetryState::TimedOut => break Err(Error::CopyMessagesTimedOutError),
801                ImapRetryState::Ok(res) => break res.map_err(Error::CopyMessagesError),
802            }
803        }
804    }
805
806    #[instrument(skip_all, fields(client = self.id))]
807    pub async fn move_messages(&mut self, uids: SequenceSet, mbox: impl ToString) -> Result<()> {
808        loop {
809            let res = self
810                .retry
811                .timeout(self.inner.uid_move(uids.clone(), mbox.to_string()))
812                .await;
813
814            match self.retry(res).await? {
815                ImapRetryState::Retry => continue,
816                ImapRetryState::TimedOut => break Err(Error::MoveMessagesTimedOutError),
817                ImapRetryState::Ok(res) => break res.map_err(Error::MoveMessagesError),
818            }
819        }
820    }
821}
822
823impl fmt::Debug for ImapClient {
824    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
825        f.debug_struct("ImapContext")
826            .field("id", &self.id)
827            .field("mailbox", &self.mailbox)
828            .field("imap_config", &self.imap_config)
829            .finish_non_exhaustive()
830    }
831}
832
833/// The sync version of the IMAP backend context.
834///
835/// This is just an IMAP session wrapped into a mutex, so the same
836/// IMAP session can be shared and updated across multiple threads.
837#[derive(Debug, Clone)]
838pub struct ImapContext {
839    /// The account configuration.
840    pub account_config: Arc<AccountConfig>,
841
842    /// The IMAP configuration.
843    pub imap_config: Arc<ImapConfig>,
844
845    clients: Vec<Arc<Mutex<ImapClient>>>,
846}
847
848impl ImapContext {
849    pub async fn client(&self) -> MutexGuard<'_, ImapClient> {
850        loop {
851            let lock = self
852                .clients
853                .iter()
854                .find_map(|client| client.try_lock().ok());
855
856            if let Some(ctx) = lock {
857                let total = self.clients.len();
858                let id = ctx.id;
859                debug!("client {id}/{total} is free, locking it");
860                break ctx;
861            } else {
862                trace!("no free client, sleeping for 1s");
863                sleep(Duration::from_secs(1)).await;
864            }
865        }
866    }
867}
868
869impl BackendContext for ImapContext {}
870
871/// The IMAP backend context builder.
872#[derive(Clone, Debug, Default, Eq, PartialEq)]
873pub struct ImapContextBuilder {
874    /// The account configuration.
875    pub account_config: Arc<AccountConfig>,
876
877    /// The IMAP configuration.
878    pub imap_config: Arc<ImapConfig>,
879
880    /// The prebuilt IMAP credentials.
881    prebuilt_credentials: Option<String>,
882
883    pool_size: u8,
884}
885
886impl ImapContextBuilder {
887    pub fn new(account_config: Arc<AccountConfig>, imap_config: Arc<ImapConfig>) -> Self {
888        let pool_size = imap_config.clients_pool_size();
889
890        Self {
891            account_config,
892            imap_config,
893            prebuilt_credentials: None,
894            pool_size,
895        }
896    }
897
898    pub async fn prebuild_credentials(&mut self) -> Result<()> {
899        self.prebuilt_credentials = Some(self.imap_config.build_credentials().await?);
900        Ok(())
901    }
902
903    pub async fn with_prebuilt_credentials(mut self) -> Result<Self> {
904        self.prebuild_credentials().await?;
905        Ok(self)
906    }
907
908    pub fn with_pool_size(mut self, pool_size: u8) -> Self {
909        self.pool_size = pool_size;
910        self
911    }
912}
913
914#[cfg(feature = "sync")]
915impl crate::sync::hash::SyncHash for ImapContextBuilder {
916    fn sync_hash(&self, state: &mut std::hash::DefaultHasher) {
917        self.imap_config.sync_hash(state);
918    }
919}
920
921#[async_trait]
922impl BackendContextBuilder for ImapContextBuilder {
923    type Context = ImapContext;
924
925    fn check_up(&self) -> Option<BackendFeature<Self::Context, dyn CheckUp>> {
926        Some(Arc::new(CheckUpImap::some_new_boxed))
927    }
928
929    fn add_folder(&self) -> Option<BackendFeature<Self::Context, dyn AddFolder>> {
930        Some(Arc::new(AddImapFolder::some_new_boxed))
931    }
932
933    fn list_folders(&self) -> Option<BackendFeature<Self::Context, dyn ListFolders>> {
934        Some(Arc::new(ListImapFolders::some_new_boxed))
935    }
936
937    fn expunge_folder(&self) -> Option<BackendFeature<Self::Context, dyn ExpungeFolder>> {
938        Some(Arc::new(ExpungeImapFolder::some_new_boxed))
939    }
940
941    fn purge_folder(&self) -> Option<BackendFeature<Self::Context, dyn PurgeFolder>> {
942        Some(Arc::new(PurgeImapFolder::some_new_boxed))
943    }
944
945    fn delete_folder(&self) -> Option<BackendFeature<Self::Context, dyn DeleteFolder>> {
946        Some(Arc::new(DeleteImapFolder::some_new_boxed))
947    }
948
949    fn get_envelope(&self) -> Option<BackendFeature<Self::Context, dyn GetEnvelope>> {
950        Some(Arc::new(GetImapEnvelope::some_new_boxed))
951    }
952
953    fn list_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn ListEnvelopes>> {
954        Some(Arc::new(ListImapEnvelopes::some_new_boxed))
955    }
956
957    #[cfg(feature = "thread")]
958    fn thread_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn ThreadEnvelopes>> {
959        Some(Arc::new(ThreadImapEnvelopes::some_new_boxed))
960    }
961
962    #[cfg(feature = "watch")]
963    fn watch_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn WatchEnvelopes>> {
964        Some(Arc::new(WatchImapEnvelopes::some_new_boxed))
965    }
966
967    fn add_flags(&self) -> Option<BackendFeature<Self::Context, dyn AddFlags>> {
968        Some(Arc::new(AddImapFlags::some_new_boxed))
969    }
970
971    fn set_flags(&self) -> Option<BackendFeature<Self::Context, dyn SetFlags>> {
972        Some(Arc::new(SetImapFlags::some_new_boxed))
973    }
974
975    fn remove_flags(&self) -> Option<BackendFeature<Self::Context, dyn RemoveFlags>> {
976        Some(Arc::new(RemoveImapFlags::some_new_boxed))
977    }
978
979    fn add_message(&self) -> Option<BackendFeature<Self::Context, dyn AddMessage>> {
980        Some(Arc::new(AddImapMessage::some_new_boxed))
981    }
982
983    fn peek_messages(&self) -> Option<BackendFeature<Self::Context, dyn PeekMessages>> {
984        Some(Arc::new(PeekImapMessages::some_new_boxed))
985    }
986
987    fn get_messages(&self) -> Option<BackendFeature<Self::Context, dyn GetMessages>> {
988        Some(Arc::new(GetImapMessages::some_new_boxed))
989    }
990
991    fn copy_messages(&self) -> Option<BackendFeature<Self::Context, dyn CopyMessages>> {
992        Some(Arc::new(CopyImapMessages::some_new_boxed))
993    }
994
995    fn move_messages(&self) -> Option<BackendFeature<Self::Context, dyn MoveMessages>> {
996        Some(Arc::new(MoveImapMessages::some_new_boxed))
997    }
998
999    fn delete_messages(&self) -> Option<BackendFeature<Self::Context, dyn DeleteMessages>> {
1000        Some(Arc::new(DeleteImapMessages::some_new_boxed))
1001    }
1002
1003    fn remove_messages(&self) -> Option<BackendFeature<Self::Context, dyn RemoveMessages>> {
1004        Some(Arc::new(RemoveImapMessages::some_new_boxed))
1005    }
1006
1007    async fn build(self) -> AnyResult<Self::Context> {
1008        let client_builder =
1009            ImapClientBuilder::new(self.imap_config.clone(), self.prebuilt_credentials);
1010
1011        debug!("building {} IMAP clients", self.pool_size);
1012
1013        let clients = FuturesUnordered::from_iter((0..self.pool_size).map(move |i| {
1014            let mut client_builder = client_builder.clone();
1015            tokio::spawn(async move {
1016                let client = client_builder.build().await?;
1017                Ok((i + 1, client_builder, client))
1018            })
1019        }))
1020        .map(|res| match res {
1021            Err(err) => Err(Error::JoinClientError(err)),
1022            Ok(Err(err)) => Err(Error::BuildClientError(Box::new(err))),
1023            Ok(Ok((id, client_builder, inner))) => Ok(Arc::new(Mutex::new(ImapClient {
1024                id,
1025                account_config: self.account_config.clone(),
1026                imap_config: self.imap_config.clone(),
1027                client_builder,
1028                inner,
1029                mailbox: Default::default(),
1030                retry: Default::default(),
1031            }))),
1032        })
1033        .collect::<Vec<_>>()
1034        .await
1035        .into_iter()
1036        .collect::<Result<_>>()?;
1037
1038        Ok(ImapContext {
1039            account_config: self.account_config,
1040            imap_config: self.imap_config,
1041            clients,
1042        })
1043    }
1044}
1045
1046#[derive(Clone, Debug)]
1047pub struct CheckUpImap {
1048    ctx: ImapContext,
1049}
1050
1051impl CheckUpImap {
1052    pub fn new(ctx: &ImapContext) -> Self {
1053        Self { ctx: ctx.clone() }
1054    }
1055
1056    pub fn new_boxed(ctx: &ImapContext) -> Box<dyn CheckUp> {
1057        Box::new(Self::new(ctx))
1058    }
1059
1060    pub fn some_new_boxed(ctx: &ImapContext) -> Option<Box<dyn CheckUp>> {
1061        Some(Self::new_boxed(ctx))
1062    }
1063}
1064
1065#[async_trait]
1066impl CheckUp for CheckUpImap {
1067    #[instrument(skip_all)]
1068    async fn check_up(&self) -> AnyResult<()> {
1069        debug!("executing check up backend feature");
1070        Ok(self.ctx.client().await.noop().await?)
1071    }
1072}
1073
1074#[derive(Clone, Debug)]
1075pub struct ImapClientBuilder {
1076    pub config: Arc<ImapConfig>,
1077    pub credentials: Option<String>,
1078}
1079
1080impl ImapClientBuilder {
1081    pub fn new(config: Arc<ImapConfig>, credentials: Option<String>) -> Self {
1082        Self {
1083            config,
1084            credentials,
1085        }
1086    }
1087
1088    /// Creates a new session from an IMAP configuration and optional
1089    /// pre-built credentials.
1090    ///
1091    /// Pre-built credentials are useful to prevent building them
1092    /// every time a new session is created. The main use case is for
1093    /// the synchronization, where multiple sessions can be created in
1094    /// a row.
1095    #[instrument(name = "client::build", skip(self))]
1096    pub async fn build(&mut self) -> Result<Client> {
1097        let mut client = match &self.config.encryption {
1098            Some(Encryption::None) => Client::insecure(&self.config.host, self.config.port)
1099                .await
1100                .map_err(|err| {
1101                    let host = self.config.host.clone();
1102                    let port = self.config.port.clone();
1103                    Error::BuildInsecureClientError(err, host, port)
1104                })?,
1105            Some(Encryption::Tls(Tls {
1106                provider: Some(TlsProvider::None),
1107                ..
1108            }))
1109            | Some(Encryption::StartTls(Tls {
1110                provider: Some(TlsProvider::None),
1111                ..
1112            })) => {
1113                return Err(Error::BuildTlsClientMissingProvider);
1114            }
1115            #[cfg(feature = "rustls")]
1116            Some(Encryption::Tls(Tls {
1117                provider: Some(TlsProvider::Rustls(_)) | None,
1118                cert,
1119            })) => Client::rustls(&self.config.host, self.config.port, false, cert.clone())
1120                .await
1121                .map_err(|err| {
1122                    let host = self.config.host.clone();
1123                    let port = self.config.port.clone();
1124                    Error::BuildStartTlsClientError(err, host, port)
1125                })?,
1126            #[cfg(feature = "rustls")]
1127            None => Client::rustls(&self.config.host, self.config.port, false, None)
1128                .await
1129                .map_err(|err| {
1130                    let host = self.config.host.clone();
1131                    let port = self.config.port.clone();
1132                    Error::BuildStartTlsClientError(err, host, port)
1133                })?,
1134            // #[cfg(feature = "native-tls")]
1135            // Some(Encryption::Tls(Tls {
1136            //     provider: Some(TlsProvider::NativeTls(_)),
1137            // })) => Client::native_tls(&self.config.host, self.config.port, false)
1138            //     .await
1139            //     .map_err(|err| {
1140            //         let host = self.config.host.clone();
1141            //         let port = self.config.port.clone();
1142            //         Error::BuildStartTlsClientError(err, host, port)
1143            //     })?,
1144            #[cfg(feature = "rustls")]
1145            Some(Encryption::StartTls(Tls {
1146                provider: Some(TlsProvider::Rustls(_)) | None,
1147                cert,
1148            })) => Client::rustls(&self.config.host, self.config.port, true, cert.clone())
1149                .await
1150                .map_err(|err| {
1151                    let host = self.config.host.clone();
1152                    let port = self.config.port.clone();
1153                    Error::BuildStartTlsClientError(err, host, port)
1154                })?,
1155            // #[cfg(feature = "native-tls")]
1156            // Some(Encryption::StartTls(Tls {
1157            //     provider: Some(TlsProvider::NativeTls(_)),
1158            // })) => Client::native_tls(&self.config.host, self.config.port, true)
1159            //     .await
1160            //     .map_err(|err| {
1161            //         let host = self.config.host.clone();
1162            //         let port = self.config.port.clone();
1163            //         Error::BuildStartTlsClientError(err, host, port)
1164            //     })?,
1165        };
1166
1167        client
1168            .state
1169            .set_some_idle_timeout(self.config.find_watch_timeout().map(Duration::from_secs));
1170
1171        match &self.config.auth {
1172            ImapAuthConfig::Password(passwd) => {
1173                debug!("using password authentication");
1174
1175                let passwd = match self.credentials.as_ref() {
1176                    Some(passwd) => passwd.to_string(),
1177                    None => passwd
1178                        .get()
1179                        .await
1180                        .map_err(Error::GetPasswdImapError)?
1181                        .lines()
1182                        .next()
1183                        .ok_or(Error::GetPasswdEmptyImapError)?
1184                        .to_owned(),
1185                };
1186
1187                let mechanisms: Vec<_> =
1188                    client.state.supported_auth_mechanisms().cloned().collect();
1189                let mut authenticated = false;
1190
1191                debug!(?mechanisms, "supported auth mechanisms");
1192
1193                for mechanism in mechanisms {
1194                    debug!(?mechanism, "trying auth mechanism…");
1195
1196                    let auth = match mechanism {
1197                        AuthMechanism::Plain => {
1198                            client
1199                                .authenticate_plain(self.config.login.as_str(), passwd.as_str())
1200                                .await
1201                        }
1202                        // TODO
1203                        // AuthMechanism::Login => {
1204                        //     client
1205                        //         .authenticate_login(self.config.login.as_str(), passwd.as_str())
1206                        //         .await
1207                        // }
1208                        _ => {
1209                            continue;
1210                        }
1211                    };
1212
1213                    if let Err(ref err) = auth {
1214                        warn!(?mechanism, ?err, "authentication failed");
1215                    }
1216
1217                    if auth.is_ok() {
1218                        debug!(?mechanism, "authentication succeeded!");
1219                        authenticated = true;
1220                        break;
1221                    }
1222                }
1223
1224                if !authenticated {
1225                    if !client.state.login_supported() {
1226                        return Err(Error::LoginNotSupportedError);
1227                    }
1228
1229                    debug!("trying login…");
1230
1231                    client
1232                        .login(self.config.login.as_str(), passwd.as_str())
1233                        .await
1234                        .map_err(Error::LoginError)?;
1235
1236                    debug!("login succeeded!");
1237                }
1238            }
1239            #[cfg(feature = "oauth2")]
1240            ImapAuthConfig::OAuth2(oauth2) => {
1241                debug!("using OAuth 2.0 authentication");
1242
1243                match oauth2.method {
1244                    OAuth2Method::XOAuth2 => {
1245                        if !client.state.supports_auth_mechanism(AuthMechanism::XOAuth2) {
1246                            let auth = client.state.supported_auth_mechanisms().cloned().collect();
1247                            return Err(Error::AuthenticateXOAuth2NotSupportedError(auth));
1248                        }
1249
1250                        debug!("using XOAUTH2 auth mechanism");
1251
1252                        let access_token = match self.credentials.as_ref() {
1253                            Some(access_token) => access_token.to_string(),
1254                            None => oauth2
1255                                .access_token()
1256                                .await
1257                                .map_err(Error::RefreshAccessTokenError)?,
1258                        };
1259
1260                        let auth = client
1261                            .authenticate_xoauth2(self.config.login.as_str(), access_token.as_str())
1262                            .await;
1263
1264                        if auth.is_err() {
1265                            warn!("authentication failed, refreshing access token and retrying…");
1266
1267                            let access_token = oauth2
1268                                .refresh_access_token()
1269                                .await
1270                                .map_err(Error::RefreshAccessTokenError)?;
1271
1272                            client
1273                                .authenticate_xoauth2(
1274                                    self.config.login.as_str(),
1275                                    access_token.as_str(),
1276                                )
1277                                .await
1278                                .map_err(Error::AuthenticateXOauth2Error)?;
1279
1280                            self.credentials = Some(access_token);
1281                        }
1282                    }
1283                    OAuth2Method::OAuthBearer => {
1284                        if !client
1285                            .state
1286                            .supports_auth_mechanism("OAUTHBEARER".try_into().unwrap())
1287                        {
1288                            let auth = client.state.supported_auth_mechanisms().cloned().collect();
1289                            return Err(Error::AuthenticateOAuthBearerNotSupportedError(auth));
1290                        }
1291
1292                        debug!("using OAUTHBEARER auth mechanism");
1293
1294                        let access_token = match self.credentials.as_ref() {
1295                            Some(access_token) => access_token.to_string(),
1296                            None => oauth2
1297                                .access_token()
1298                                .await
1299                                .map_err(Error::RefreshAccessTokenError)?,
1300                        };
1301
1302                        let auth = client
1303                            .authenticate_oauthbearer(
1304                                self.config.login.as_str(),
1305                                self.config.host.as_str(),
1306                                self.config.port,
1307                                access_token.as_str(),
1308                            )
1309                            .await;
1310
1311                        if auth.is_err() {
1312                            warn!("authentication failed, refreshing access token and retrying");
1313
1314                            let access_token = oauth2
1315                                .refresh_access_token()
1316                                .await
1317                                .map_err(Error::RefreshAccessTokenError)?;
1318
1319                            client
1320                                .authenticate_oauthbearer(
1321                                    self.config.login.as_str(),
1322                                    self.config.host.as_str(),
1323                                    self.config.port,
1324                                    access_token.as_str(),
1325                                )
1326                                .await
1327                                .map_err(Error::AuthenticateOAuthBearerError)?;
1328
1329                            self.credentials = Some(access_token);
1330                        }
1331                    }
1332                }
1333            }
1334        };
1335
1336        if self.config.send_id_after_auth() {
1337            let params = ID_PARAMS.clone();
1338            debug!(?params, "client identity");
1339
1340            let params = client
1341                .id(Some(ID_PARAMS.clone()))
1342                .await
1343                .map_err(Error::ExchangeIdsError)?;
1344
1345            debug!(?params, "server identity");
1346        }
1347
1348        // TODO: make it customizable
1349        //
1350        // debug!("enabling UTF8 capability…");
1351        //
1352        // client
1353        //     .enable(Some(CapabilityEnable::Utf8(Utf8Kind::Accept)))
1354        //     .await
1355        //     .map_err(Error::EnableCapabilityError)?;
1356
1357        Ok(client)
1358    }
1359}