email/imap/
mod.rs

1pub mod config;
2mod error;
3
4use std::{
5    collections::HashMap, env, fmt, io::ErrorKind::ConnectionReset, num::NonZeroU32, sync::Arc,
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use futures::{stream::FuturesUnordered, StreamExt};
11use imap_client::{
12    client::tokio::{Client, ClientError},
13    imap_next::imap_types::{
14        auth::AuthMechanism,
15        core::{IString, NString, Vec1},
16        extensions::{
17            sort::SortCriterion,
18            thread::{Thread, ThreadingAlgorithm},
19        },
20        fetch::MessageDataItem,
21        flag::{Flag, StoreType},
22        search::SearchKey,
23        sequence::SequenceSet,
24    },
25    stream::Error as StreamError,
26    tasks::{tasks::select::SelectDataUnvalidated, SchedulerError},
27};
28use once_cell::sync::Lazy;
29use tokio::{
30    select,
31    sync::{oneshot, Mutex, MutexGuard},
32    time::sleep,
33};
34use tracing::{debug, instrument, trace, warn};
35
36use self::config::{ImapAuthConfig, ImapConfig};
37#[doc(inline)]
38pub use self::error::{Error, Result};
39#[cfg(feature = "oauth2")]
40use crate::account::config::oauth2::OAuth2Method;
41#[cfg(feature = "thread")]
42use crate::envelope::thread::{imap::ThreadImapEnvelopes, ThreadEnvelopes};
43#[cfg(feature = "watch")]
44use crate::envelope::watch::{imap::WatchImapEnvelopes, WatchEnvelopes};
45use crate::{
46    account::config::AccountConfig,
47    backend::{
48        context::{BackendContext, BackendContextBuilder},
49        feature::{BackendFeature, CheckUp},
50    },
51    envelope::{
52        get::{imap::GetImapEnvelope, GetEnvelope},
53        imap::FETCH_ENVELOPES,
54        list::{imap::ListImapEnvelopes, ListEnvelopes},
55        Envelope, Envelopes,
56    },
57    flag::{
58        add::{imap::AddImapFlags, AddFlags},
59        remove::{imap::RemoveImapFlags, RemoveFlags},
60        set::{imap::SetImapFlags, SetFlags},
61    },
62    folder::{
63        add::{imap::AddImapFolder, AddFolder},
64        delete::{imap::DeleteImapFolder, DeleteFolder},
65        expunge::{imap::ExpungeImapFolder, ExpungeFolder},
66        list::{imap::ListImapFolders, ListFolders},
67        purge::{imap::PurgeImapFolder, PurgeFolder},
68        Folders,
69    },
70    message::{
71        add::{imap::AddImapMessage, AddMessage},
72        copy::{imap::CopyImapMessages, CopyMessages},
73        delete::{imap::DeleteImapMessages, DeleteMessages},
74        get::{imap::GetImapMessages, GetMessages},
75        imap::{FETCH_MESSAGES, PEEK_MESSAGES},
76        peek::{imap::PeekImapMessages, PeekMessages},
77        r#move::{imap::MoveImapMessages, MoveMessages},
78        remove::{imap::RemoveImapMessages, RemoveMessages},
79        Messages,
80    },
81    retry::{self, Retry, RetryState},
82    tls::{Encryption, Tls, TlsProvider},
83    AnyResult,
84};
85
86static ID_PARAMS: Lazy<Vec<(IString<'static>, NString<'static>)>> = Lazy::new(|| {
87    vec![
88        (
89            "name".try_into().unwrap(),
90            NString(
91                env::var("CARGO_PKG_NAME")
92                    .ok()
93                    .map(|e| e.try_into().unwrap()),
94            ),
95        ),
96        (
97            "vendor".try_into().unwrap(),
98            NString(
99                env::var("CARGO_PKG_NAME")
100                    .ok()
101                    .map(|e| e.try_into().unwrap()),
102            ),
103        ),
104        (
105            "version".try_into().unwrap(),
106            NString(
107                env::var("CARGO_PKG_VERSION")
108                    .ok()
109                    .map(|e| e.try_into().unwrap()),
110            ),
111        ),
112        (
113            "support-url".try_into().unwrap(),
114            NString(Some(
115                "https://github.com/orgs/pimalaya/discussions/new?category=q-a"
116                    .try_into()
117                    .unwrap(),
118            )),
119        ),
120    ]
121});
122
123enum ImapRetryState<T> {
124    Retry,
125    TimedOut,
126    Ok(std::result::Result<T, ClientError>),
127}
128
129/// The IMAP backend context.
130///
131/// This context is unsync, which means it cannot be shared between
132/// threads. For the sync version, see [`ImapContextSync`].
133pub struct ImapClient {
134    pub id: u8,
135
136    /// The account configuration.
137    pub account_config: Arc<AccountConfig>,
138
139    /// The IMAP configuration.
140    pub imap_config: Arc<ImapConfig>,
141
142    /// The next gen IMAP client builder.
143    pub client_builder: ImapClientBuilder,
144
145    /// The next gen IMAP client.
146    inner: Client,
147
148    /// The selected mailbox.
149    mailbox: Option<String>,
150
151    retry: Retry,
152}
153
154impl ImapClient {
155    async fn retry<T>(
156        &mut self,
157        res: retry::Result<std::result::Result<T, ClientError>>,
158    ) -> Result<ImapRetryState<T>> {
159        match self.retry.next(res) {
160            RetryState::Retry => {
161                debug!(attempt = self.retry.attempts, "request timed out");
162                Ok(ImapRetryState::Retry)
163            }
164            RetryState::TimedOut => {
165                return Ok(ImapRetryState::TimedOut);
166            }
167            RetryState::Ok(Err(ClientError::Stream(err))) => {
168                match err {
169                    StreamError::State(SchedulerError::UnexpectedByeResponse(bye)) => {
170                        debug!(reason = bye.text.to_string(), "stream closed");
171                    }
172                    StreamError::Io(err) if err.kind() == ConnectionReset => {
173                        debug!("connection reset");
174                    }
175                    StreamError::Closed => {
176                        debug!("stream closed");
177                    }
178                    StreamError::Io(err) if err.kind() == ConnectionReset => {
179                        debug!("connection reset");
180                    }
181                    err => {
182                        let err = ClientError::Stream(err);
183                        return Ok(ImapRetryState::Ok(Err(err)));
184                    }
185                };
186
187                debug!("re-connecting…");
188
189                self.inner = self.client_builder.build().await?;
190
191                if let Some(mbox) = &self.mailbox {
192                    self.inner
193                        .select(mbox.clone())
194                        .await
195                        .map_err(Error::SelectMailboxError)?;
196                }
197
198                self.retry.attempts = 0;
199                Ok(ImapRetryState::Retry)
200            }
201            RetryState::Ok(res) => {
202                return Ok(ImapRetryState::Ok(res));
203            }
204        }
205    }
206
207    pub fn ext_sort_supported(&self) -> bool {
208        self.inner.state.ext_sort_supported()
209    }
210
211    #[instrument(skip_all, fields(client = self.id))]
212    pub async fn noop(&mut self) -> Result<()> {
213        self.retry.reset();
214
215        loop {
216            let res = self.retry.timeout(self.inner.noop()).await;
217
218            match self.retry(res).await? {
219                ImapRetryState::Retry => continue,
220                ImapRetryState::TimedOut => break Err(Error::NoOpTimedOutError),
221                ImapRetryState::Ok(res) => break res.map_err(Error::NoOpError),
222            }
223        }
224    }
225
226    #[instrument(skip_all, fields(client = self.id))]
227    pub async fn select_mailbox(&mut self, mbox: impl ToString) -> Result<SelectDataUnvalidated> {
228        self.retry.reset();
229
230        let data = loop {
231            let res = self
232                .retry
233                .timeout(self.inner.select(mbox.to_string()))
234                .await;
235
236            match self.retry(res).await? {
237                ImapRetryState::Retry => continue,
238                ImapRetryState::TimedOut => break Err(Error::SelectMailboxTimedOutError),
239                ImapRetryState::Ok(res) => break res.map_err(Error::SelectMailboxError),
240            }
241        }?;
242
243        self.mailbox = Some(mbox.to_string());
244
245        Ok(data)
246    }
247
248    #[instrument(skip_all, fields(client = self.id))]
249    pub async fn examine_mailbox(&mut self, mbox: impl ToString) -> Result<SelectDataUnvalidated> {
250        self.retry.reset();
251
252        loop {
253            let res = self
254                .retry
255                .timeout(self.inner.examine(mbox.to_string()))
256                .await;
257
258            match self.retry(res).await? {
259                ImapRetryState::Retry => continue,
260                ImapRetryState::TimedOut => break Err(Error::ExamineMailboxTimedOutError),
261                ImapRetryState::Ok(res) => break res.map_err(Error::ExamineMailboxError),
262            }
263        }
264    }
265
266    #[instrument(skip_all, fields(client = self.id))]
267    pub async fn create_mailbox(&mut self, mbox: impl ToString) -> Result<()> {
268        self.retry.reset();
269
270        loop {
271            let res = self
272                .retry
273                .timeout(self.inner.create(mbox.to_string()))
274                .await;
275
276            match self.retry(res).await? {
277                ImapRetryState::Retry => continue,
278                ImapRetryState::TimedOut => break Err(Error::CreateMailboxTimedOutError),
279                ImapRetryState::Ok(res) => break res.map_err(Error::CreateMailboxError),
280            }
281        }
282    }
283
284    #[instrument(skip_all, fields(client = self.id))]
285    pub async fn list_all_mailboxes(&mut self, config: &AccountConfig) -> Result<Folders> {
286        self.retry.reset();
287
288        let mboxes = loop {
289            let res = self.retry.timeout(self.inner.list("", "*")).await;
290
291            match self.retry(res).await? {
292                ImapRetryState::Retry => continue,
293                ImapRetryState::TimedOut => break Err(Error::ListMailboxesTimedOutError),
294                ImapRetryState::Ok(res) => break res.map_err(Error::ListMailboxesError),
295            }
296        }?;
297
298        let folders = Folders::from_imap_mailboxes(config, mboxes);
299
300        Ok(folders)
301    }
302
303    #[instrument(skip_all, fields(client = self.id))]
304    pub async fn expunge_mailbox(&mut self, mbox: impl ToString) -> Result<usize> {
305        self.select_mailbox(mbox).await?;
306
307        self.retry.reset();
308
309        let expunged = loop {
310            let res = self.retry.timeout(self.inner.expunge()).await;
311
312            match self.retry(res).await? {
313                ImapRetryState::Retry => continue,
314                ImapRetryState::TimedOut => break Err(Error::ExpungeMailboxTimedOutError),
315                ImapRetryState::Ok(res) => break res.map_err(Error::ExpungeMailboxError),
316            }
317        }?;
318
319        Ok(expunged.len())
320    }
321
322    #[instrument(skip_all, fields(client = self.id))]
323    pub async fn purge_mailbox(&mut self, mbox: impl ToString) -> Result<usize> {
324        self.select_mailbox(mbox).await?;
325
326        self.add_deleted_flag_silently("1:*".try_into().unwrap())
327            .await?;
328
329        let expunged = loop {
330            let res = self.retry.timeout(self.inner.expunge()).await;
331
332            match self.retry(res).await? {
333                ImapRetryState::Retry => continue,
334                ImapRetryState::TimedOut => break Err(Error::ExpungeMailboxTimedOutError),
335                ImapRetryState::Ok(res) => break res.map_err(Error::ExpungeMailboxError),
336            }
337        }?;
338
339        Ok(expunged.len())
340    }
341
342    #[instrument(skip_all, fields(client = self.id))]
343    pub async fn delete_mailbox(&mut self, mbox: impl ToString) -> Result<()> {
344        self.retry.reset();
345
346        loop {
347            let res = self
348                .retry
349                .timeout(self.inner.delete(mbox.to_string()))
350                .await;
351
352            match self.retry(res).await? {
353                ImapRetryState::Retry => continue,
354                ImapRetryState::TimedOut => break Err(Error::DeleteMailboxTimedOutError),
355                ImapRetryState::Ok(res) => break res.map_err(Error::DeleteMailboxError),
356            }
357        }
358    }
359
360    #[instrument(skip_all, fields(client = self.id))]
361    pub async fn fetch_envelopes(&mut self, uids: SequenceSet) -> Result<Envelopes> {
362        self.retry.reset();
363
364        let fetches = loop {
365            let res = self
366                .retry
367                .timeout(self.inner.uid_fetch(uids.clone(), FETCH_ENVELOPES.clone()))
368                .await;
369
370            match self.retry(res).await? {
371                ImapRetryState::Retry => continue,
372                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
373                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
374            }
375        }?;
376
377        Ok(Envelopes::from_imap_data_items(fetches))
378    }
379
380    #[instrument(skip_all, fields(client = self.id))]
381    pub async fn fetch_envelopes_map(
382        &mut self,
383        uids: SequenceSet,
384    ) -> Result<HashMap<String, Envelope>> {
385        let fetches = loop {
386            let res = self
387                .retry
388                .timeout(self.inner.uid_fetch(uids.clone(), FETCH_ENVELOPES.clone()))
389                .await;
390
391            match self.retry(res).await? {
392                ImapRetryState::Retry => continue,
393                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
394                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
395            }
396        }?;
397
398        let map = fetches
399            .into_values()
400            .map(|items| {
401                let envelope = Envelope::from_imap_data_items(items.as_ref());
402                (envelope.id.clone(), envelope)
403            })
404            .collect();
405
406        Ok(map)
407    }
408
409    #[instrument(skip_all, fields(client = self.id))]
410    pub async fn fetch_first_envelope(&mut self, uid: u32) -> Result<Envelope> {
411        let items = loop {
412            let task = self
413                .inner
414                .uid_fetch_first(uid.try_into().unwrap(), FETCH_ENVELOPES.clone());
415
416            let res = self.retry.timeout(task).await;
417
418            match self.retry(res).await? {
419                ImapRetryState::Retry => continue,
420                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
421                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
422            }
423        }?;
424
425        Ok(Envelope::from_imap_data_items(items.as_ref()))
426    }
427
428    #[instrument(skip_all, fields(client = self.id))]
429    pub async fn fetch_envelopes_by_sequence(&mut self, seq: SequenceSet) -> Result<Envelopes> {
430        let fetches = loop {
431            let res = self
432                .retry
433                .timeout(self.inner.fetch(seq.clone(), FETCH_ENVELOPES.clone()))
434                .await;
435
436            match self.retry(res).await? {
437                ImapRetryState::Retry => continue,
438                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
439                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
440            }
441        }?;
442
443        Ok(Envelopes::from_imap_data_items(fetches))
444    }
445
446    #[instrument(skip_all, fields(client = self.id))]
447    pub async fn fetch_all_envelopes(&mut self) -> Result<Envelopes> {
448        self.fetch_envelopes_by_sequence("1:*".try_into().unwrap())
449            .await
450    }
451
452    #[instrument(skip_all, fields(client = self.id))]
453    pub async fn sort_uids(
454        &mut self,
455        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
456        search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
457    ) -> Result<Vec<NonZeroU32>> {
458        loop {
459            let task = self
460                .inner
461                .uid_sort(sort_criteria.clone(), search_criteria.clone());
462
463            let res = self.retry.timeout(task).await;
464
465            match self.retry(res).await? {
466                ImapRetryState::Retry => continue,
467                ImapRetryState::TimedOut => break Err(Error::SortUidsTimedOutError),
468                ImapRetryState::Ok(res) => break res.map_err(Error::SortUidsError),
469            }
470        }
471    }
472
473    #[instrument(skip_all, fields(client = self.id))]
474    pub async fn search_uids(
475        &mut self,
476        search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
477    ) -> Result<Vec<NonZeroU32>> {
478        loop {
479            let res = self
480                .retry
481                .timeout(self.inner.uid_search(search_criteria.clone()))
482                .await;
483
484            match self.retry(res).await? {
485                ImapRetryState::Retry => continue,
486                ImapRetryState::TimedOut => break Err(Error::SearchUidsTimedOutError),
487                ImapRetryState::Ok(res) => break res.map_err(Error::SearchUidsError),
488            }
489        }
490    }
491
492    #[instrument(skip_all, fields(client = self.id))]
493    pub async fn sort_envelopes(
494        &mut self,
495        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
496        search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
497    ) -> Result<Envelopes> {
498        let fetches = loop {
499            let task = self.inner.uid_sort_or_fallback(
500                sort_criteria.clone(),
501                search_criteria.clone(),
502                FETCH_ENVELOPES.clone(),
503            );
504
505            let res = self.retry.timeout(task).await;
506
507            match self.retry(res).await? {
508                ImapRetryState::Retry => continue,
509                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
510                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
511            }
512        }?;
513
514        Ok(Envelopes::from(fetches))
515    }
516
517    #[instrument(skip_all, fields(client = self.id))]
518    pub async fn thread_envelopes(
519        &mut self,
520        search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
521    ) -> Result<Vec<Thread>> {
522        loop {
523            let task = self
524                .inner
525                .uid_thread(ThreadingAlgorithm::References, search_criteria.clone());
526
527            let res = self.retry.timeout(task).await;
528
529            match self.retry(res).await? {
530                ImapRetryState::Retry => continue,
531                ImapRetryState::TimedOut => break Err(Error::ThreadMessagesTimedOutError),
532                ImapRetryState::Ok(res) => break res.map_err(Error::ThreadMessagesError),
533            }
534        }
535    }
536
537    #[instrument(skip_all, fields(client = self.id))]
538    pub async fn idle(
539        &mut self,
540        wait_for_shutdown_request: &mut oneshot::Receiver<()>,
541    ) -> Result<()> {
542        let tag = self.inner.enqueue_idle();
543
544        select! {
545            output = self.inner.idle(tag.clone()) => {
546                output.map_err(Error::StartIdleError)?;
547                Ok(())
548            },
549            _ = wait_for_shutdown_request => {
550                debug!("shutdown requested, sending done command…");
551                self.inner.idle_done(tag.clone()).await.map_err(Error::StopIdleError)?;
552                Err(Error::IdleInterruptedError)
553            }
554        }
555    }
556
557    #[instrument(skip_all, fields(client = self.id))]
558    pub async fn add_flags(
559        &mut self,
560        uids: SequenceSet,
561        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
562    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
563        loop {
564            let task = self
565                .inner
566                .uid_store(uids.clone(), StoreType::Add, flags.clone());
567
568            let res = self.retry.timeout(task).await;
569
570            match self.retry(res).await? {
571                ImapRetryState::Retry => continue,
572                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
573                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
574            }
575        }
576    }
577
578    #[instrument(skip_all, fields(client = self.id))]
579    pub async fn add_deleted_flag(
580        &mut self,
581        uids: SequenceSet,
582    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
583        loop {
584            let task = self
585                .inner
586                .uid_store(uids.clone(), StoreType::Add, Some(Flag::Deleted));
587
588            let res = self.retry.timeout(task).await;
589
590            match self.retry(res).await? {
591                ImapRetryState::Retry => continue,
592                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
593                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
594            }
595        }
596    }
597
598    #[instrument(skip_all, fields(client = self.id))]
599    pub async fn add_deleted_flag_silently(&mut self, uids: SequenceSet) -> Result<()> {
600        loop {
601            let task =
602                self.inner
603                    .uid_silent_store(uids.clone(), StoreType::Add, Some(Flag::Deleted));
604
605            let res = self.retry.timeout(task).await;
606
607            match self.retry(res).await? {
608                ImapRetryState::Retry => continue,
609                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
610                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
611            }
612        }
613    }
614
615    #[instrument(skip_all, fields(client = self.id))]
616    pub async fn add_flags_silently(
617        &mut self,
618        uids: SequenceSet,
619        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
620    ) -> Result<()> {
621        loop {
622            let task = self
623                .inner
624                .uid_silent_store(uids.clone(), StoreType::Add, flags.clone());
625
626            let res = self.retry.timeout(task).await;
627
628            match self.retry(res).await? {
629                ImapRetryState::Retry => continue,
630                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
631                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
632            }
633        }
634    }
635
636    #[instrument(skip_all, fields(client = self.id))]
637    pub async fn set_flags(
638        &mut self,
639        uids: SequenceSet,
640        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
641    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
642        loop {
643            let task = self
644                .inner
645                .uid_store(uids.clone(), StoreType::Replace, flags.clone());
646
647            let res = self.retry.timeout(task).await;
648
649            match self.retry(res).await? {
650                ImapRetryState::Retry => continue,
651                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
652                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
653            }
654        }
655    }
656
657    #[instrument(skip_all, fields(client = self.id))]
658    pub async fn set_flags_silently(
659        &mut self,
660        uids: SequenceSet,
661        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
662    ) -> Result<()> {
663        loop {
664            let task = self
665                .inner
666                .uid_silent_store(uids.clone(), StoreType::Replace, flags.clone());
667
668            let res = self.retry.timeout(task).await;
669
670            match self.retry(res).await? {
671                ImapRetryState::Retry => continue,
672                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
673                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
674            }
675        }
676    }
677
678    #[instrument(skip_all, fields(client = self.id))]
679    pub async fn remove_flags(
680        &mut self,
681        uids: SequenceSet,
682        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
683    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
684        loop {
685            let task = self
686                .inner
687                .uid_store(uids.clone(), StoreType::Remove, flags.clone());
688
689            let res = self.retry.timeout(task).await;
690
691            match self.retry(res).await? {
692                ImapRetryState::Retry => continue,
693                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
694                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
695            }
696        }
697    }
698
699    #[instrument(skip_all, fields(client = self.id))]
700    pub async fn remove_flags_silently(
701        &mut self,
702        uids: SequenceSet,
703        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
704    ) -> Result<()> {
705        loop {
706            let task = self
707                .inner
708                .uid_silent_store(uids.clone(), StoreType::Remove, flags.clone());
709
710            let res = self.retry.timeout(task).await;
711
712            match self.retry(res).await? {
713                ImapRetryState::Retry => continue,
714                ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
715                ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
716            }
717        }
718    }
719
720    #[instrument(skip_all, fields(client = self.id))]
721    pub async fn add_message(
722        &mut self,
723        mbox: impl ToString,
724        flags: impl IntoIterator<Item = Flag<'static>> + Clone,
725        msg: impl AsRef<[u8]> + Clone,
726    ) -> Result<NonZeroU32> {
727        let id = loop {
728            let task =
729                self.inner
730                    .appenduid_or_fallback(mbox.to_string(), flags.clone(), msg.clone());
731
732            let res = self.retry.timeout(task).await;
733
734            match self.retry(res).await? {
735                ImapRetryState::Retry => continue,
736                ImapRetryState::TimedOut => break Err(Error::AddMessageTimedOutError),
737                ImapRetryState::Ok(res) => break res.map_err(Error::AddMessageError),
738            }
739        }?;
740
741        id.ok_or(Error::FindAppendedMessageUidError)
742    }
743
744    #[instrument(skip_all, fields(client = self.id))]
745    pub async fn fetch_messages(&mut self, uids: SequenceSet) -> Result<Messages> {
746        let mut fetches = loop {
747            let res = self
748                .retry
749                .timeout(self.inner.uid_fetch(uids.clone(), FETCH_MESSAGES.clone()))
750                .await;
751
752            match self.retry(res).await? {
753                ImapRetryState::Retry => continue,
754                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
755                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
756            }
757        }?;
758
759        let fetches: Vec<_> = uids
760            .iter(NonZeroU32::MAX)
761            .filter_map(|ref uid| fetches.remove(uid))
762            .collect();
763
764        Ok(Messages::from(fetches))
765    }
766
767    #[instrument(skip_all, fields(client = self.id))]
768    pub async fn peek_messages(&mut self, uids: SequenceSet) -> Result<Messages> {
769        let mut fetches = loop {
770            let res = self
771                .retry
772                .timeout(self.inner.uid_fetch(uids.clone(), PEEK_MESSAGES.clone()))
773                .await;
774
775            match self.retry(res).await? {
776                ImapRetryState::Retry => continue,
777                ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
778                ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
779            }
780        }?;
781
782        let fetches: Vec<_> = uids
783            .iter(NonZeroU32::MAX)
784            .filter_map(|ref uid| fetches.remove(uid))
785            .collect();
786
787        Ok(Messages::from(fetches))
788    }
789
790    #[instrument(skip_all, fields(client = self.id))]
791    pub async fn copy_messages(&mut self, uids: SequenceSet, mbox: impl ToString) -> Result<()> {
792        loop {
793            let res = self
794                .retry
795                .timeout(self.inner.uid_copy(uids.clone(), mbox.to_string()))
796                .await;
797
798            match self.retry(res).await? {
799                ImapRetryState::Retry => continue,
800                ImapRetryState::TimedOut => break Err(Error::CopyMessagesTimedOutError),
801                ImapRetryState::Ok(res) => break res.map_err(Error::CopyMessagesError),
802            }
803        }
804    }
805
806    #[instrument(skip_all, fields(client = self.id))]
807    pub async fn move_messages(&mut self, uids: SequenceSet, mbox: impl ToString) -> Result<()> {
808        loop {
809            let res = self
810                .retry
811                .timeout(self.inner.uid_move(uids.clone(), mbox.to_string()))
812                .await;
813
814            match self.retry(res).await? {
815                ImapRetryState::Retry => continue,
816                ImapRetryState::TimedOut => break Err(Error::MoveMessagesTimedOutError),
817                ImapRetryState::Ok(res) => break res.map_err(Error::MoveMessagesError),
818            }
819        }
820    }
821}
822
823impl fmt::Debug for ImapClient {
824    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
825        f.debug_struct("ImapContext")
826            .field("id", &self.id)
827            .field("mailbox", &self.mailbox)
828            .field("imap_config", &self.imap_config)
829            .finish_non_exhaustive()
830    }
831}
832
833/// The sync version of the IMAP backend context.
834///
835/// This is just an IMAP session wrapped into a mutex, so the same
836/// IMAP session can be shared and updated across multiple threads.
837#[derive(Debug, Clone)]
838pub struct ImapContext {
839    /// The account configuration.
840    pub account_config: Arc<AccountConfig>,
841
842    /// The IMAP configuration.
843    pub imap_config: Arc<ImapConfig>,
844
845    clients: Vec<Arc<Mutex<ImapClient>>>,
846}
847
848impl ImapContext {
849    pub async fn client(&self) -> MutexGuard<'_, ImapClient> {
850        loop {
851            let lock = self
852                .clients
853                .iter()
854                .find_map(|client| client.try_lock().ok());
855
856            if let Some(ctx) = lock {
857                let total = self.clients.len();
858                let id = ctx.id;
859                debug!("client {id}/{total} is free, locking it");
860                break ctx;
861            } else {
862                trace!("no free client, sleeping for 1s");
863                sleep(Duration::from_secs(1)).await;
864            }
865        }
866    }
867}
868
869impl BackendContext for ImapContext {}
870
871/// The IMAP backend context builder.
872#[derive(Clone, Debug, Default, Eq, PartialEq)]
873pub struct ImapContextBuilder {
874    /// The account configuration.
875    pub account_config: Arc<AccountConfig>,
876
877    /// The IMAP configuration.
878    pub imap_config: Arc<ImapConfig>,
879
880    /// The prebuilt IMAP credentials.
881    prebuilt_credentials: Option<String>,
882
883    pool_size: u8,
884}
885
886impl ImapContextBuilder {
887    pub fn new(account_config: Arc<AccountConfig>, imap_config: Arc<ImapConfig>) -> Self {
888        let pool_size = imap_config.clients_pool_size();
889
890        Self {
891            account_config,
892            imap_config,
893            prebuilt_credentials: None,
894            pool_size,
895        }
896    }
897
898    pub async fn prebuild_credentials(&mut self) -> Result<()> {
899        self.prebuilt_credentials = Some(self.imap_config.build_credentials().await?);
900        Ok(())
901    }
902
903    pub async fn with_prebuilt_credentials(mut self) -> Result<Self> {
904        self.prebuild_credentials().await?;
905        Ok(self)
906    }
907
908    pub fn with_pool_size(mut self, pool_size: u8) -> Self {
909        self.pool_size = pool_size;
910        self
911    }
912}
913
914#[cfg(feature = "sync")]
915impl crate::sync::hash::SyncHash for ImapContextBuilder {
916    fn sync_hash(&self, state: &mut std::hash::DefaultHasher) {
917        self.imap_config.sync_hash(state);
918    }
919}
920
921#[async_trait]
922impl BackendContextBuilder for ImapContextBuilder {
923    type Context = ImapContext;
924
925    fn check_up(&self) -> Option<BackendFeature<Self::Context, dyn CheckUp>> {
926        Some(Arc::new(CheckUpImap::some_new_boxed))
927    }
928
929    fn add_folder(&self) -> Option<BackendFeature<Self::Context, dyn AddFolder>> {
930        Some(Arc::new(AddImapFolder::some_new_boxed))
931    }
932
933    fn list_folders(&self) -> Option<BackendFeature<Self::Context, dyn ListFolders>> {
934        Some(Arc::new(ListImapFolders::some_new_boxed))
935    }
936
937    fn expunge_folder(&self) -> Option<BackendFeature<Self::Context, dyn ExpungeFolder>> {
938        Some(Arc::new(ExpungeImapFolder::some_new_boxed))
939    }
940
941    fn purge_folder(&self) -> Option<BackendFeature<Self::Context, dyn PurgeFolder>> {
942        Some(Arc::new(PurgeImapFolder::some_new_boxed))
943    }
944
945    fn delete_folder(&self) -> Option<BackendFeature<Self::Context, dyn DeleteFolder>> {
946        Some(Arc::new(DeleteImapFolder::some_new_boxed))
947    }
948
949    fn get_envelope(&self) -> Option<BackendFeature<Self::Context, dyn GetEnvelope>> {
950        Some(Arc::new(GetImapEnvelope::some_new_boxed))
951    }
952
953    fn list_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn ListEnvelopes>> {
954        Some(Arc::new(ListImapEnvelopes::some_new_boxed))
955    }
956
957    #[cfg(feature = "thread")]
958    fn thread_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn ThreadEnvelopes>> {
959        Some(Arc::new(ThreadImapEnvelopes::some_new_boxed))
960    }
961
962    #[cfg(feature = "watch")]
963    fn watch_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn WatchEnvelopes>> {
964        Some(Arc::new(WatchImapEnvelopes::some_new_boxed))
965    }
966
967    fn add_flags(&self) -> Option<BackendFeature<Self::Context, dyn AddFlags>> {
968        Some(Arc::new(AddImapFlags::some_new_boxed))
969    }
970
971    fn set_flags(&self) -> Option<BackendFeature<Self::Context, dyn SetFlags>> {
972        Some(Arc::new(SetImapFlags::some_new_boxed))
973    }
974
975    fn remove_flags(&self) -> Option<BackendFeature<Self::Context, dyn RemoveFlags>> {
976        Some(Arc::new(RemoveImapFlags::some_new_boxed))
977    }
978
979    fn add_message(&self) -> Option<BackendFeature<Self::Context, dyn AddMessage>> {
980        Some(Arc::new(AddImapMessage::some_new_boxed))
981    }
982
983    fn peek_messages(&self) -> Option<BackendFeature<Self::Context, dyn PeekMessages>> {
984        Some(Arc::new(PeekImapMessages::some_new_boxed))
985    }
986
987    fn get_messages(&self) -> Option<BackendFeature<Self::Context, dyn GetMessages>> {
988        Some(Arc::new(GetImapMessages::some_new_boxed))
989    }
990
991    fn copy_messages(&self) -> Option<BackendFeature<Self::Context, dyn CopyMessages>> {
992        Some(Arc::new(CopyImapMessages::some_new_boxed))
993    }
994
995    fn move_messages(&self) -> Option<BackendFeature<Self::Context, dyn MoveMessages>> {
996        Some(Arc::new(MoveImapMessages::some_new_boxed))
997    }
998
999    fn delete_messages(&self) -> Option<BackendFeature<Self::Context, dyn DeleteMessages>> {
1000        Some(Arc::new(DeleteImapMessages::some_new_boxed))
1001    }
1002
1003    fn remove_messages(&self) -> Option<BackendFeature<Self::Context, dyn RemoveMessages>> {
1004        Some(Arc::new(RemoveImapMessages::some_new_boxed))
1005    }
1006
1007    async fn build(self) -> AnyResult<Self::Context> {
1008        let client_builder =
1009            ImapClientBuilder::new(self.imap_config.clone(), self.prebuilt_credentials);
1010
1011        debug!("building {} IMAP clients", self.pool_size);
1012
1013        let clients = FuturesUnordered::from_iter((0..self.pool_size).map(move |i| {
1014            let mut client_builder = client_builder.clone();
1015            tokio::spawn(async move {
1016                let client = client_builder.build().await?;
1017                Ok((i + 1, client_builder, client))
1018            })
1019        }))
1020        .map(|res| match res {
1021            Err(err) => Err(Error::JoinClientError(err)),
1022            Ok(Err(err)) => Err(Error::BuildClientError(Box::new(err))),
1023            Ok(Ok((id, client_builder, inner))) => Ok(Arc::new(Mutex::new(ImapClient {
1024                id,
1025                account_config: self.account_config.clone(),
1026                imap_config: self.imap_config.clone(),
1027                client_builder,
1028                inner,
1029                mailbox: Default::default(),
1030                retry: Default::default(),
1031            }))),
1032        })
1033        .collect::<Vec<_>>()
1034        .await
1035        .into_iter()
1036        .collect::<Result<_>>()?;
1037
1038        Ok(ImapContext {
1039            account_config: self.account_config,
1040            imap_config: self.imap_config,
1041            clients,
1042        })
1043    }
1044}
1045
1046#[derive(Clone, Debug)]
1047pub struct CheckUpImap {
1048    ctx: ImapContext,
1049}
1050
1051impl CheckUpImap {
1052    pub fn new(ctx: &ImapContext) -> Self {
1053        Self { ctx: ctx.clone() }
1054    }
1055
1056    pub fn new_boxed(ctx: &ImapContext) -> Box<dyn CheckUp> {
1057        Box::new(Self::new(ctx))
1058    }
1059
1060    pub fn some_new_boxed(ctx: &ImapContext) -> Option<Box<dyn CheckUp>> {
1061        Some(Self::new_boxed(ctx))
1062    }
1063}
1064
1065#[async_trait]
1066impl CheckUp for CheckUpImap {
1067    #[instrument(skip_all)]
1068    async fn check_up(&self) -> AnyResult<()> {
1069        debug!("executing check up backend feature");
1070        Ok(self.ctx.client().await.noop().await?)
1071    }
1072}
1073
1074#[derive(Clone, Debug)]
1075pub struct ImapClientBuilder {
1076    pub config: Arc<ImapConfig>,
1077    pub credentials: Option<String>,
1078}
1079
1080impl ImapClientBuilder {
1081    pub fn new(config: Arc<ImapConfig>, credentials: Option<String>) -> Self {
1082        Self {
1083            config,
1084            credentials,
1085        }
1086    }
1087
1088    /// Creates a new session from an IMAP configuration and optional
1089    /// pre-built credentials.
1090    ///
1091    /// Pre-built credentials are useful to prevent building them
1092    /// every time a new session is created. The main use case is for
1093    /// the synchronization, where multiple sessions can be created in
1094    /// a row.
1095    #[instrument(name = "client::build", skip(self))]
1096    pub async fn build(&mut self) -> Result<Client> {
1097        let mut client = match &self.config.encryption {
1098            Some(Encryption::None) => Client::insecure(&self.config.host, self.config.port)
1099                .await
1100                .map_err(|err| {
1101                    let host = self.config.host.clone();
1102                    let port = self.config.port.clone();
1103                    Error::BuildInsecureClientError(err, host, port)
1104                })?,
1105            Some(Encryption::Tls(Tls {
1106                provider: Some(TlsProvider::None),
1107            }))
1108            | Some(Encryption::StartTls(Tls {
1109                provider: Some(TlsProvider::None),
1110            })) => {
1111                return Err(Error::BuildTlsClientMissingProvider);
1112            }
1113            #[cfg(feature = "rustls")]
1114            Some(Encryption::Tls(Tls {
1115                provider: Some(TlsProvider::Rustls(_)) | None,
1116            }))
1117            | None => Client::rustls(&self.config.host, self.config.port, false)
1118                .await
1119                .map_err(|err| {
1120                    let host = self.config.host.clone();
1121                    let port = self.config.port.clone();
1122                    Error::BuildStartTlsClientError(err, host, port)
1123                })?,
1124            #[cfg(feature = "native-tls")]
1125            Some(Encryption::Tls(Tls {
1126                provider: Some(TlsProvider::NativeTls(_)),
1127            })) => Client::native_tls(&self.config.host, self.config.port, false)
1128                .await
1129                .map_err(|err| {
1130                    let host = self.config.host.clone();
1131                    let port = self.config.port.clone();
1132                    Error::BuildStartTlsClientError(err, host, port)
1133                })?,
1134            #[cfg(feature = "rustls")]
1135            Some(Encryption::StartTls(Tls {
1136                provider: Some(TlsProvider::Rustls(_)) | None,
1137            })) => Client::rustls(&self.config.host, self.config.port, true)
1138                .await
1139                .map_err(|err| {
1140                    let host = self.config.host.clone();
1141                    let port = self.config.port.clone();
1142                    Error::BuildStartTlsClientError(err, host, port)
1143                })?,
1144            #[cfg(feature = "native-tls")]
1145            Some(Encryption::StartTls(Tls {
1146                provider: Some(TlsProvider::NativeTls(_)),
1147            })) => Client::native_tls(&self.config.host, self.config.port, true)
1148                .await
1149                .map_err(|err| {
1150                    let host = self.config.host.clone();
1151                    let port = self.config.port.clone();
1152                    Error::BuildStartTlsClientError(err, host, port)
1153                })?,
1154        };
1155
1156        client
1157            .state
1158            .set_some_idle_timeout(self.config.find_watch_timeout().map(Duration::from_secs));
1159
1160        match &self.config.auth {
1161            ImapAuthConfig::Password(passwd) => {
1162                debug!("using password authentication");
1163
1164                let passwd = match self.credentials.as_ref() {
1165                    Some(passwd) => passwd.to_string(),
1166                    None => passwd
1167                        .get()
1168                        .await
1169                        .map_err(Error::GetPasswdImapError)?
1170                        .lines()
1171                        .next()
1172                        .ok_or(Error::GetPasswdEmptyImapError)?
1173                        .to_owned(),
1174                };
1175
1176                let mechanisms: Vec<_> =
1177                    client.state.supported_auth_mechanisms().cloned().collect();
1178                let mut authenticated = false;
1179
1180                debug!(?mechanisms, "supported auth mechanisms");
1181
1182                for mechanism in mechanisms {
1183                    debug!(?mechanism, "trying auth mechanism…");
1184
1185                    let auth = match mechanism {
1186                        AuthMechanism::Plain => {
1187                            client
1188                                .authenticate_plain(self.config.login.as_str(), passwd.as_str())
1189                                .await
1190                        }
1191                        // TODO
1192                        // AuthMechanism::Login => {
1193                        //     client
1194                        //         .authenticate_login(self.config.login.as_str(), passwd.as_str())
1195                        //         .await
1196                        // }
1197                        _ => {
1198                            continue;
1199                        }
1200                    };
1201
1202                    if let Err(ref err) = auth {
1203                        warn!(?mechanism, ?err, "authentication failed");
1204                    }
1205
1206                    if auth.is_ok() {
1207                        debug!(?mechanism, "authentication succeeded!");
1208                        authenticated = true;
1209                        break;
1210                    }
1211                }
1212
1213                if !authenticated {
1214                    if !client.state.login_supported() {
1215                        return Err(Error::LoginNotSupportedError);
1216                    }
1217
1218                    debug!("trying login…");
1219
1220                    client
1221                        .login(self.config.login.as_str(), passwd.as_str())
1222                        .await
1223                        .map_err(Error::LoginError)?;
1224
1225                    debug!("login succeeded!");
1226                }
1227            }
1228            #[cfg(feature = "oauth2")]
1229            ImapAuthConfig::OAuth2(oauth2) => {
1230                debug!("using OAuth 2.0 authentication");
1231
1232                match oauth2.method {
1233                    OAuth2Method::XOAuth2 => {
1234                        if !client.state.supports_auth_mechanism(AuthMechanism::XOAuth2) {
1235                            let auth = client.state.supported_auth_mechanisms().cloned().collect();
1236                            return Err(Error::AuthenticateXOAuth2NotSupportedError(auth));
1237                        }
1238
1239                        debug!("using XOAUTH2 auth mechanism");
1240
1241                        let access_token = match self.credentials.as_ref() {
1242                            Some(access_token) => access_token.to_string(),
1243                            None => oauth2
1244                                .access_token()
1245                                .await
1246                                .map_err(Error::RefreshAccessTokenError)?,
1247                        };
1248
1249                        let auth = client
1250                            .authenticate_xoauth2(self.config.login.as_str(), access_token.as_str())
1251                            .await;
1252
1253                        if auth.is_err() {
1254                            warn!("authentication failed, refreshing access token and retrying…");
1255
1256                            let access_token = oauth2
1257                                .refresh_access_token()
1258                                .await
1259                                .map_err(Error::RefreshAccessTokenError)?;
1260
1261                            client
1262                                .authenticate_xoauth2(
1263                                    self.config.login.as_str(),
1264                                    access_token.as_str(),
1265                                )
1266                                .await
1267                                .map_err(Error::AuthenticateXOauth2Error)?;
1268
1269                            self.credentials = Some(access_token);
1270                        }
1271                    }
1272                    OAuth2Method::OAuthBearer => {
1273                        if !client
1274                            .state
1275                            .supports_auth_mechanism("OAUTHBEARER".try_into().unwrap())
1276                        {
1277                            let auth = client.state.supported_auth_mechanisms().cloned().collect();
1278                            return Err(Error::AuthenticateOAuthBearerNotSupportedError(auth));
1279                        }
1280
1281                        debug!("using OAUTHBEARER auth mechanism");
1282
1283                        let access_token = match self.credentials.as_ref() {
1284                            Some(access_token) => access_token.to_string(),
1285                            None => oauth2
1286                                .access_token()
1287                                .await
1288                                .map_err(Error::RefreshAccessTokenError)?,
1289                        };
1290
1291                        let auth = client
1292                            .authenticate_oauthbearer(
1293                                self.config.login.as_str(),
1294                                self.config.host.as_str(),
1295                                self.config.port,
1296                                access_token.as_str(),
1297                            )
1298                            .await;
1299
1300                        if auth.is_err() {
1301                            warn!("authentication failed, refreshing access token and retrying");
1302
1303                            let access_token = oauth2
1304                                .refresh_access_token()
1305                                .await
1306                                .map_err(Error::RefreshAccessTokenError)?;
1307
1308                            client
1309                                .authenticate_oauthbearer(
1310                                    self.config.login.as_str(),
1311                                    self.config.host.as_str(),
1312                                    self.config.port,
1313                                    access_token.as_str(),
1314                                )
1315                                .await
1316                                .map_err(Error::AuthenticateOAuthBearerError)?;
1317
1318                            self.credentials = Some(access_token);
1319                        }
1320                    }
1321                }
1322            }
1323        };
1324
1325        if self.config.send_id_after_auth() {
1326            let params = ID_PARAMS.clone();
1327            debug!(?params, "client identity");
1328
1329            let params = client
1330                .id(Some(ID_PARAMS.clone()))
1331                .await
1332                .map_err(Error::ExchangeIdsError)?;
1333
1334            debug!(?params, "server identity");
1335        }
1336
1337        // TODO: make it customizable
1338        //
1339        // debug!("enabling UTF8 capability…");
1340        //
1341        // client
1342        //     .enable(Some(CapabilityEnable::Utf8(Utf8Kind::Accept)))
1343        //     .await
1344        //     .map_err(Error::EnableCapabilityError)?;
1345
1346        Ok(client)
1347    }
1348}