1pub mod config;
2mod error;
3
4use std::{
5 collections::HashMap, env, fmt, io::ErrorKind::ConnectionReset, num::NonZeroU32, sync::Arc,
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use futures::{stream::FuturesUnordered, StreamExt};
11use imap_client::{
12 client::tokio::{Client, ClientError},
13 imap_next::imap_types::{
14 auth::AuthMechanism,
15 core::{IString, NString, Vec1},
16 extensions::{
17 sort::SortCriterion,
18 thread::{Thread, ThreadingAlgorithm},
19 },
20 fetch::MessageDataItem,
21 flag::{Flag, StoreType},
22 search::SearchKey,
23 sequence::SequenceSet,
24 },
25 stream::Error as StreamError,
26 tasks::{tasks::select::SelectDataUnvalidated, SchedulerError},
27};
28use once_cell::sync::Lazy;
29use tokio::{
30 select,
31 sync::{oneshot, Mutex, MutexGuard},
32 time::sleep,
33};
34use tracing::{debug, instrument, trace, warn};
35
36use self::config::{ImapAuthConfig, ImapConfig};
37#[doc(inline)]
38pub use self::error::{Error, Result};
39#[cfg(feature = "oauth2")]
40use crate::account::config::oauth2::OAuth2Method;
41#[cfg(feature = "thread")]
42use crate::envelope::thread::{imap::ThreadImapEnvelopes, ThreadEnvelopes};
43#[cfg(feature = "watch")]
44use crate::envelope::watch::{imap::WatchImapEnvelopes, WatchEnvelopes};
45use crate::{
46 account::config::AccountConfig,
47 backend::{
48 context::{BackendContext, BackendContextBuilder},
49 feature::{BackendFeature, CheckUp},
50 },
51 envelope::{
52 get::{imap::GetImapEnvelope, GetEnvelope},
53 imap::FETCH_ENVELOPES,
54 list::{imap::ListImapEnvelopes, ListEnvelopes},
55 Envelope, Envelopes,
56 },
57 flag::{
58 add::{imap::AddImapFlags, AddFlags},
59 remove::{imap::RemoveImapFlags, RemoveFlags},
60 set::{imap::SetImapFlags, SetFlags},
61 },
62 folder::{
63 add::{imap::AddImapFolder, AddFolder},
64 delete::{imap::DeleteImapFolder, DeleteFolder},
65 expunge::{imap::ExpungeImapFolder, ExpungeFolder},
66 list::{imap::ListImapFolders, ListFolders},
67 purge::{imap::PurgeImapFolder, PurgeFolder},
68 Folders,
69 },
70 message::{
71 add::{imap::AddImapMessage, AddMessage},
72 copy::{imap::CopyImapMessages, CopyMessages},
73 delete::{imap::DeleteImapMessages, DeleteMessages},
74 get::{imap::GetImapMessages, GetMessages},
75 imap::{FETCH_MESSAGES, PEEK_MESSAGES},
76 peek::{imap::PeekImapMessages, PeekMessages},
77 r#move::{imap::MoveImapMessages, MoveMessages},
78 remove::{imap::RemoveImapMessages, RemoveMessages},
79 Messages,
80 },
81 retry::{self, Retry, RetryState},
82 tls::{Encryption, Tls, TlsProvider},
83 AnyResult,
84};
85
86static ID_PARAMS: Lazy<Vec<(IString<'static>, NString<'static>)>> = Lazy::new(|| {
87 vec![
88 (
89 "name".try_into().unwrap(),
90 NString(
91 env::var("CARGO_PKG_NAME")
92 .ok()
93 .map(|e| e.try_into().unwrap()),
94 ),
95 ),
96 (
97 "vendor".try_into().unwrap(),
98 NString(
99 env::var("CARGO_PKG_NAME")
100 .ok()
101 .map(|e| e.try_into().unwrap()),
102 ),
103 ),
104 (
105 "version".try_into().unwrap(),
106 NString(
107 env::var("CARGO_PKG_VERSION")
108 .ok()
109 .map(|e| e.try_into().unwrap()),
110 ),
111 ),
112 (
113 "support-url".try_into().unwrap(),
114 NString(Some(
115 "https://github.com/orgs/pimalaya/discussions/new?category=q-a"
116 .try_into()
117 .unwrap(),
118 )),
119 ),
120 ]
121});
122
123enum ImapRetryState<T> {
124 Retry,
125 TimedOut,
126 Ok(std::result::Result<T, ClientError>),
127}
128
129pub struct ImapClient {
134 pub id: u8,
135
136 pub account_config: Arc<AccountConfig>,
138
139 pub imap_config: Arc<ImapConfig>,
141
142 pub client_builder: ImapClientBuilder,
144
145 inner: Client,
147
148 mailbox: Option<String>,
150
151 retry: Retry,
152}
153
154impl ImapClient {
155 async fn retry<T>(
156 &mut self,
157 res: retry::Result<std::result::Result<T, ClientError>>,
158 ) -> Result<ImapRetryState<T>> {
159 match self.retry.next(res) {
160 RetryState::Retry => {
161 debug!(attempt = self.retry.attempts, "request timed out");
162 Ok(ImapRetryState::Retry)
163 }
164 RetryState::TimedOut => {
165 return Ok(ImapRetryState::TimedOut);
166 }
167 RetryState::Ok(Err(ClientError::Stream(err))) => {
168 match err {
169 StreamError::State(SchedulerError::UnexpectedByeResponse(bye)) => {
170 debug!(reason = bye.text.to_string(), "stream closed");
171 }
172 StreamError::Io(err) if err.kind() == ConnectionReset => {
173 debug!("connection reset");
174 }
175 StreamError::Closed => {
176 debug!("stream closed");
177 }
178 StreamError::Io(err) if err.kind() == ConnectionReset => {
179 debug!("connection reset");
180 }
181 err => {
182 let err = ClientError::Stream(err);
183 return Ok(ImapRetryState::Ok(Err(err)));
184 }
185 };
186
187 debug!("re-connecting…");
188
189 self.inner = self.client_builder.build().await?;
190
191 if let Some(mbox) = &self.mailbox {
192 self.inner
193 .select(mbox.clone())
194 .await
195 .map_err(Error::SelectMailboxError)?;
196 }
197
198 self.retry.attempts = 0;
199 Ok(ImapRetryState::Retry)
200 }
201 RetryState::Ok(res) => {
202 return Ok(ImapRetryState::Ok(res));
203 }
204 }
205 }
206
207 pub fn ext_sort_supported(&self) -> bool {
208 self.inner.state.ext_sort_supported()
209 }
210
211 #[instrument(skip_all, fields(client = self.id))]
212 pub async fn noop(&mut self) -> Result<()> {
213 self.retry.reset();
214
215 loop {
216 let res = self.retry.timeout(self.inner.noop()).await;
217
218 match self.retry(res).await? {
219 ImapRetryState::Retry => continue,
220 ImapRetryState::TimedOut => break Err(Error::NoOpTimedOutError),
221 ImapRetryState::Ok(res) => break res.map_err(Error::NoOpError),
222 }
223 }
224 }
225
226 #[instrument(skip_all, fields(client = self.id))]
227 pub async fn select_mailbox(&mut self, mbox: impl ToString) -> Result<SelectDataUnvalidated> {
228 self.retry.reset();
229
230 let data = loop {
231 let res = self
232 .retry
233 .timeout(self.inner.select(mbox.to_string()))
234 .await;
235
236 match self.retry(res).await? {
237 ImapRetryState::Retry => continue,
238 ImapRetryState::TimedOut => break Err(Error::SelectMailboxTimedOutError),
239 ImapRetryState::Ok(res) => break res.map_err(Error::SelectMailboxError),
240 }
241 }?;
242
243 self.mailbox = Some(mbox.to_string());
244
245 Ok(data)
246 }
247
248 #[instrument(skip_all, fields(client = self.id))]
249 pub async fn examine_mailbox(&mut self, mbox: impl ToString) -> Result<SelectDataUnvalidated> {
250 self.retry.reset();
251
252 loop {
253 let res = self
254 .retry
255 .timeout(self.inner.examine(mbox.to_string()))
256 .await;
257
258 match self.retry(res).await? {
259 ImapRetryState::Retry => continue,
260 ImapRetryState::TimedOut => break Err(Error::ExamineMailboxTimedOutError),
261 ImapRetryState::Ok(res) => break res.map_err(Error::ExamineMailboxError),
262 }
263 }
264 }
265
266 #[instrument(skip_all, fields(client = self.id))]
267 pub async fn create_mailbox(&mut self, mbox: impl ToString) -> Result<()> {
268 self.retry.reset();
269
270 loop {
271 let res = self
272 .retry
273 .timeout(self.inner.create(mbox.to_string()))
274 .await;
275
276 match self.retry(res).await? {
277 ImapRetryState::Retry => continue,
278 ImapRetryState::TimedOut => break Err(Error::CreateMailboxTimedOutError),
279 ImapRetryState::Ok(res) => break res.map_err(Error::CreateMailboxError),
280 }
281 }
282 }
283
284 #[instrument(skip_all, fields(client = self.id))]
285 pub async fn list_all_mailboxes(&mut self, config: &AccountConfig) -> Result<Folders> {
286 self.retry.reset();
287
288 let mboxes = loop {
289 let res = self.retry.timeout(self.inner.list("", "*")).await;
290
291 match self.retry(res).await? {
292 ImapRetryState::Retry => continue,
293 ImapRetryState::TimedOut => break Err(Error::ListMailboxesTimedOutError),
294 ImapRetryState::Ok(res) => break res.map_err(Error::ListMailboxesError),
295 }
296 }?;
297
298 let folders = Folders::from_imap_mailboxes(config, mboxes);
299
300 Ok(folders)
301 }
302
303 #[instrument(skip_all, fields(client = self.id))]
304 pub async fn expunge_mailbox(&mut self, mbox: impl ToString) -> Result<usize> {
305 self.select_mailbox(mbox).await?;
306
307 self.retry.reset();
308
309 let expunged = loop {
310 let res = self.retry.timeout(self.inner.expunge()).await;
311
312 match self.retry(res).await? {
313 ImapRetryState::Retry => continue,
314 ImapRetryState::TimedOut => break Err(Error::ExpungeMailboxTimedOutError),
315 ImapRetryState::Ok(res) => break res.map_err(Error::ExpungeMailboxError),
316 }
317 }?;
318
319 Ok(expunged.len())
320 }
321
322 #[instrument(skip_all, fields(client = self.id))]
323 pub async fn purge_mailbox(&mut self, mbox: impl ToString) -> Result<usize> {
324 self.select_mailbox(mbox).await?;
325
326 self.add_deleted_flag_silently("1:*".try_into().unwrap())
327 .await?;
328
329 let expunged = loop {
330 let res = self.retry.timeout(self.inner.expunge()).await;
331
332 match self.retry(res).await? {
333 ImapRetryState::Retry => continue,
334 ImapRetryState::TimedOut => break Err(Error::ExpungeMailboxTimedOutError),
335 ImapRetryState::Ok(res) => break res.map_err(Error::ExpungeMailboxError),
336 }
337 }?;
338
339 Ok(expunged.len())
340 }
341
342 #[instrument(skip_all, fields(client = self.id))]
343 pub async fn delete_mailbox(&mut self, mbox: impl ToString) -> Result<()> {
344 self.retry.reset();
345
346 loop {
347 let res = self
348 .retry
349 .timeout(self.inner.delete(mbox.to_string()))
350 .await;
351
352 match self.retry(res).await? {
353 ImapRetryState::Retry => continue,
354 ImapRetryState::TimedOut => break Err(Error::DeleteMailboxTimedOutError),
355 ImapRetryState::Ok(res) => break res.map_err(Error::DeleteMailboxError),
356 }
357 }
358 }
359
360 #[instrument(skip_all, fields(client = self.id))]
361 pub async fn fetch_envelopes(&mut self, uids: SequenceSet) -> Result<Envelopes> {
362 self.retry.reset();
363
364 let fetches = loop {
365 let res = self
366 .retry
367 .timeout(self.inner.uid_fetch(uids.clone(), FETCH_ENVELOPES.clone()))
368 .await;
369
370 match self.retry(res).await? {
371 ImapRetryState::Retry => continue,
372 ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
373 ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
374 }
375 }?;
376
377 Ok(Envelopes::from_imap_data_items(fetches))
378 }
379
380 #[instrument(skip_all, fields(client = self.id))]
381 pub async fn fetch_envelopes_map(
382 &mut self,
383 uids: SequenceSet,
384 ) -> Result<HashMap<String, Envelope>> {
385 let fetches = loop {
386 let res = self
387 .retry
388 .timeout(self.inner.uid_fetch(uids.clone(), FETCH_ENVELOPES.clone()))
389 .await;
390
391 match self.retry(res).await? {
392 ImapRetryState::Retry => continue,
393 ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
394 ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
395 }
396 }?;
397
398 let map = fetches
399 .into_values()
400 .map(|items| {
401 let envelope = Envelope::from_imap_data_items(items.as_ref());
402 (envelope.id.clone(), envelope)
403 })
404 .collect();
405
406 Ok(map)
407 }
408
409 #[instrument(skip_all, fields(client = self.id))]
410 pub async fn fetch_first_envelope(&mut self, uid: u32) -> Result<Envelope> {
411 let items = loop {
412 let task = self
413 .inner
414 .uid_fetch_first(uid.try_into().unwrap(), FETCH_ENVELOPES.clone());
415
416 let res = self.retry.timeout(task).await;
417
418 match self.retry(res).await? {
419 ImapRetryState::Retry => continue,
420 ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
421 ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
422 }
423 }?;
424
425 Ok(Envelope::from_imap_data_items(items.as_ref()))
426 }
427
428 #[instrument(skip_all, fields(client = self.id))]
429 pub async fn fetch_envelopes_by_sequence(&mut self, seq: SequenceSet) -> Result<Envelopes> {
430 let fetches = loop {
431 let res = self
432 .retry
433 .timeout(self.inner.fetch(seq.clone(), FETCH_ENVELOPES.clone()))
434 .await;
435
436 match self.retry(res).await? {
437 ImapRetryState::Retry => continue,
438 ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
439 ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
440 }
441 }?;
442
443 Ok(Envelopes::from_imap_data_items(fetches))
444 }
445
446 #[instrument(skip_all, fields(client = self.id))]
447 pub async fn fetch_all_envelopes(&mut self) -> Result<Envelopes> {
448 self.fetch_envelopes_by_sequence("1:*".try_into().unwrap())
449 .await
450 }
451
452 #[instrument(skip_all, fields(client = self.id))]
453 pub async fn sort_uids(
454 &mut self,
455 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
456 search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
457 ) -> Result<Vec<NonZeroU32>> {
458 loop {
459 let task = self
460 .inner
461 .uid_sort(sort_criteria.clone(), search_criteria.clone());
462
463 let res = self.retry.timeout(task).await;
464
465 match self.retry(res).await? {
466 ImapRetryState::Retry => continue,
467 ImapRetryState::TimedOut => break Err(Error::SortUidsTimedOutError),
468 ImapRetryState::Ok(res) => break res.map_err(Error::SortUidsError),
469 }
470 }
471 }
472
473 #[instrument(skip_all, fields(client = self.id))]
474 pub async fn search_uids(
475 &mut self,
476 search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
477 ) -> Result<Vec<NonZeroU32>> {
478 loop {
479 let res = self
480 .retry
481 .timeout(self.inner.uid_search(search_criteria.clone()))
482 .await;
483
484 match self.retry(res).await? {
485 ImapRetryState::Retry => continue,
486 ImapRetryState::TimedOut => break Err(Error::SearchUidsTimedOutError),
487 ImapRetryState::Ok(res) => break res.map_err(Error::SearchUidsError),
488 }
489 }
490 }
491
492 #[instrument(skip_all, fields(client = self.id))]
493 pub async fn sort_envelopes(
494 &mut self,
495 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
496 search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
497 ) -> Result<Envelopes> {
498 let fetches = loop {
499 let task = self.inner.uid_sort_or_fallback(
500 sort_criteria.clone(),
501 search_criteria.clone(),
502 FETCH_ENVELOPES.clone(),
503 );
504
505 let res = self.retry.timeout(task).await;
506
507 match self.retry(res).await? {
508 ImapRetryState::Retry => continue,
509 ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
510 ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
511 }
512 }?;
513
514 Ok(Envelopes::from(fetches))
515 }
516
517 #[instrument(skip_all, fields(client = self.id))]
518 pub async fn thread_envelopes(
519 &mut self,
520 search_criteria: impl IntoIterator<Item = SearchKey<'static>> + Clone,
521 ) -> Result<Vec<Thread>> {
522 loop {
523 let task = self
524 .inner
525 .uid_thread(ThreadingAlgorithm::References, search_criteria.clone());
526
527 let res = self.retry.timeout(task).await;
528
529 match self.retry(res).await? {
530 ImapRetryState::Retry => continue,
531 ImapRetryState::TimedOut => break Err(Error::ThreadMessagesTimedOutError),
532 ImapRetryState::Ok(res) => break res.map_err(Error::ThreadMessagesError),
533 }
534 }
535 }
536
537 #[instrument(skip_all, fields(client = self.id))]
538 pub async fn idle(
539 &mut self,
540 wait_for_shutdown_request: &mut oneshot::Receiver<()>,
541 ) -> Result<()> {
542 let tag = self.inner.enqueue_idle();
543
544 select! {
545 output = self.inner.idle(tag.clone()) => {
546 output.map_err(Error::StartIdleError)?;
547 Ok(())
548 },
549 _ = wait_for_shutdown_request => {
550 debug!("shutdown requested, sending done command…");
551 self.inner.idle_done(tag.clone()).await.map_err(Error::StopIdleError)?;
552 Err(Error::IdleInterruptedError)
553 }
554 }
555 }
556
557 #[instrument(skip_all, fields(client = self.id))]
558 pub async fn add_flags(
559 &mut self,
560 uids: SequenceSet,
561 flags: impl IntoIterator<Item = Flag<'static>> + Clone,
562 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
563 loop {
564 let task = self
565 .inner
566 .uid_store(uids.clone(), StoreType::Add, flags.clone());
567
568 let res = self.retry.timeout(task).await;
569
570 match self.retry(res).await? {
571 ImapRetryState::Retry => continue,
572 ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
573 ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
574 }
575 }
576 }
577
578 #[instrument(skip_all, fields(client = self.id))]
579 pub async fn add_deleted_flag(
580 &mut self,
581 uids: SequenceSet,
582 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
583 loop {
584 let task = self
585 .inner
586 .uid_store(uids.clone(), StoreType::Add, Some(Flag::Deleted));
587
588 let res = self.retry.timeout(task).await;
589
590 match self.retry(res).await? {
591 ImapRetryState::Retry => continue,
592 ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
593 ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
594 }
595 }
596 }
597
598 #[instrument(skip_all, fields(client = self.id))]
599 pub async fn add_deleted_flag_silently(&mut self, uids: SequenceSet) -> Result<()> {
600 loop {
601 let task =
602 self.inner
603 .uid_silent_store(uids.clone(), StoreType::Add, Some(Flag::Deleted));
604
605 let res = self.retry.timeout(task).await;
606
607 match self.retry(res).await? {
608 ImapRetryState::Retry => continue,
609 ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
610 ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
611 }
612 }
613 }
614
615 #[instrument(skip_all, fields(client = self.id))]
616 pub async fn add_flags_silently(
617 &mut self,
618 uids: SequenceSet,
619 flags: impl IntoIterator<Item = Flag<'static>> + Clone,
620 ) -> Result<()> {
621 loop {
622 let task = self
623 .inner
624 .uid_silent_store(uids.clone(), StoreType::Add, flags.clone());
625
626 let res = self.retry.timeout(task).await;
627
628 match self.retry(res).await? {
629 ImapRetryState::Retry => continue,
630 ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
631 ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
632 }
633 }
634 }
635
636 #[instrument(skip_all, fields(client = self.id))]
637 pub async fn set_flags(
638 &mut self,
639 uids: SequenceSet,
640 flags: impl IntoIterator<Item = Flag<'static>> + Clone,
641 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
642 loop {
643 let task = self
644 .inner
645 .uid_store(uids.clone(), StoreType::Replace, flags.clone());
646
647 let res = self.retry.timeout(task).await;
648
649 match self.retry(res).await? {
650 ImapRetryState::Retry => continue,
651 ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
652 ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
653 }
654 }
655 }
656
657 #[instrument(skip_all, fields(client = self.id))]
658 pub async fn set_flags_silently(
659 &mut self,
660 uids: SequenceSet,
661 flags: impl IntoIterator<Item = Flag<'static>> + Clone,
662 ) -> Result<()> {
663 loop {
664 let task = self
665 .inner
666 .uid_silent_store(uids.clone(), StoreType::Replace, flags.clone());
667
668 let res = self.retry.timeout(task).await;
669
670 match self.retry(res).await? {
671 ImapRetryState::Retry => continue,
672 ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
673 ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
674 }
675 }
676 }
677
678 #[instrument(skip_all, fields(client = self.id))]
679 pub async fn remove_flags(
680 &mut self,
681 uids: SequenceSet,
682 flags: impl IntoIterator<Item = Flag<'static>> + Clone,
683 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>> {
684 loop {
685 let task = self
686 .inner
687 .uid_store(uids.clone(), StoreType::Remove, flags.clone());
688
689 let res = self.retry.timeout(task).await;
690
691 match self.retry(res).await? {
692 ImapRetryState::Retry => continue,
693 ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
694 ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
695 }
696 }
697 }
698
699 #[instrument(skip_all, fields(client = self.id))]
700 pub async fn remove_flags_silently(
701 &mut self,
702 uids: SequenceSet,
703 flags: impl IntoIterator<Item = Flag<'static>> + Clone,
704 ) -> Result<()> {
705 loop {
706 let task = self
707 .inner
708 .uid_silent_store(uids.clone(), StoreType::Remove, flags.clone());
709
710 let res = self.retry.timeout(task).await;
711
712 match self.retry(res).await? {
713 ImapRetryState::Retry => continue,
714 ImapRetryState::TimedOut => break Err(Error::StoreFlagsTimedOutError),
715 ImapRetryState::Ok(res) => break res.map_err(Error::StoreFlagsError),
716 }
717 }
718 }
719
720 #[instrument(skip_all, fields(client = self.id))]
721 pub async fn add_message(
722 &mut self,
723 mbox: impl ToString,
724 flags: impl IntoIterator<Item = Flag<'static>> + Clone,
725 msg: impl AsRef<[u8]> + Clone,
726 ) -> Result<NonZeroU32> {
727 let id = loop {
728 let task =
729 self.inner
730 .appenduid_or_fallback(mbox.to_string(), flags.clone(), msg.clone());
731
732 let res = self.retry.timeout(task).await;
733
734 match self.retry(res).await? {
735 ImapRetryState::Retry => continue,
736 ImapRetryState::TimedOut => break Err(Error::AddMessageTimedOutError),
737 ImapRetryState::Ok(res) => break res.map_err(Error::AddMessageError),
738 }
739 }?;
740
741 id.ok_or(Error::FindAppendedMessageUidError)
742 }
743
744 #[instrument(skip_all, fields(client = self.id))]
745 pub async fn fetch_messages(&mut self, uids: SequenceSet) -> Result<Messages> {
746 let mut fetches = loop {
747 let res = self
748 .retry
749 .timeout(self.inner.uid_fetch(uids.clone(), FETCH_MESSAGES.clone()))
750 .await;
751
752 match self.retry(res).await? {
753 ImapRetryState::Retry => continue,
754 ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
755 ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
756 }
757 }?;
758
759 let fetches: Vec<_> = uids
760 .iter(NonZeroU32::MAX)
761 .filter_map(|ref uid| fetches.remove(uid))
762 .collect();
763
764 Ok(Messages::from(fetches))
765 }
766
767 #[instrument(skip_all, fields(client = self.id))]
768 pub async fn peek_messages(&mut self, uids: SequenceSet) -> Result<Messages> {
769 let mut fetches = loop {
770 let res = self
771 .retry
772 .timeout(self.inner.uid_fetch(uids.clone(), PEEK_MESSAGES.clone()))
773 .await;
774
775 match self.retry(res).await? {
776 ImapRetryState::Retry => continue,
777 ImapRetryState::TimedOut => break Err(Error::FetchMessagesTimedOutError),
778 ImapRetryState::Ok(res) => break res.map_err(Error::FetchMessagesError),
779 }
780 }?;
781
782 let fetches: Vec<_> = uids
783 .iter(NonZeroU32::MAX)
784 .filter_map(|ref uid| fetches.remove(uid))
785 .collect();
786
787 Ok(Messages::from(fetches))
788 }
789
790 #[instrument(skip_all, fields(client = self.id))]
791 pub async fn copy_messages(&mut self, uids: SequenceSet, mbox: impl ToString) -> Result<()> {
792 loop {
793 let res = self
794 .retry
795 .timeout(self.inner.uid_copy(uids.clone(), mbox.to_string()))
796 .await;
797
798 match self.retry(res).await? {
799 ImapRetryState::Retry => continue,
800 ImapRetryState::TimedOut => break Err(Error::CopyMessagesTimedOutError),
801 ImapRetryState::Ok(res) => break res.map_err(Error::CopyMessagesError),
802 }
803 }
804 }
805
806 #[instrument(skip_all, fields(client = self.id))]
807 pub async fn move_messages(&mut self, uids: SequenceSet, mbox: impl ToString) -> Result<()> {
808 loop {
809 let res = self
810 .retry
811 .timeout(self.inner.uid_move(uids.clone(), mbox.to_string()))
812 .await;
813
814 match self.retry(res).await? {
815 ImapRetryState::Retry => continue,
816 ImapRetryState::TimedOut => break Err(Error::MoveMessagesTimedOutError),
817 ImapRetryState::Ok(res) => break res.map_err(Error::MoveMessagesError),
818 }
819 }
820 }
821}
822
823impl fmt::Debug for ImapClient {
824 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
825 f.debug_struct("ImapContext")
826 .field("id", &self.id)
827 .field("mailbox", &self.mailbox)
828 .field("imap_config", &self.imap_config)
829 .finish_non_exhaustive()
830 }
831}
832
833#[derive(Debug, Clone)]
838pub struct ImapContext {
839 pub account_config: Arc<AccountConfig>,
841
842 pub imap_config: Arc<ImapConfig>,
844
845 clients: Vec<Arc<Mutex<ImapClient>>>,
846}
847
848impl ImapContext {
849 pub async fn client(&self) -> MutexGuard<'_, ImapClient> {
850 loop {
851 let lock = self
852 .clients
853 .iter()
854 .find_map(|client| client.try_lock().ok());
855
856 if let Some(ctx) = lock {
857 let total = self.clients.len();
858 let id = ctx.id;
859 debug!("client {id}/{total} is free, locking it");
860 break ctx;
861 } else {
862 trace!("no free client, sleeping for 1s");
863 sleep(Duration::from_secs(1)).await;
864 }
865 }
866 }
867}
868
869impl BackendContext for ImapContext {}
870
871#[derive(Clone, Debug, Default, Eq, PartialEq)]
873pub struct ImapContextBuilder {
874 pub account_config: Arc<AccountConfig>,
876
877 pub imap_config: Arc<ImapConfig>,
879
880 prebuilt_credentials: Option<String>,
882
883 pool_size: u8,
884}
885
886impl ImapContextBuilder {
887 pub fn new(account_config: Arc<AccountConfig>, imap_config: Arc<ImapConfig>) -> Self {
888 let pool_size = imap_config.clients_pool_size();
889
890 Self {
891 account_config,
892 imap_config,
893 prebuilt_credentials: None,
894 pool_size,
895 }
896 }
897
898 pub async fn prebuild_credentials(&mut self) -> Result<()> {
899 self.prebuilt_credentials = Some(self.imap_config.build_credentials().await?);
900 Ok(())
901 }
902
903 pub async fn with_prebuilt_credentials(mut self) -> Result<Self> {
904 self.prebuild_credentials().await?;
905 Ok(self)
906 }
907
908 pub fn with_pool_size(mut self, pool_size: u8) -> Self {
909 self.pool_size = pool_size;
910 self
911 }
912}
913
914#[cfg(feature = "sync")]
915impl crate::sync::hash::SyncHash for ImapContextBuilder {
916 fn sync_hash(&self, state: &mut std::hash::DefaultHasher) {
917 self.imap_config.sync_hash(state);
918 }
919}
920
921#[async_trait]
922impl BackendContextBuilder for ImapContextBuilder {
923 type Context = ImapContext;
924
925 fn check_up(&self) -> Option<BackendFeature<Self::Context, dyn CheckUp>> {
926 Some(Arc::new(CheckUpImap::some_new_boxed))
927 }
928
929 fn add_folder(&self) -> Option<BackendFeature<Self::Context, dyn AddFolder>> {
930 Some(Arc::new(AddImapFolder::some_new_boxed))
931 }
932
933 fn list_folders(&self) -> Option<BackendFeature<Self::Context, dyn ListFolders>> {
934 Some(Arc::new(ListImapFolders::some_new_boxed))
935 }
936
937 fn expunge_folder(&self) -> Option<BackendFeature<Self::Context, dyn ExpungeFolder>> {
938 Some(Arc::new(ExpungeImapFolder::some_new_boxed))
939 }
940
941 fn purge_folder(&self) -> Option<BackendFeature<Self::Context, dyn PurgeFolder>> {
942 Some(Arc::new(PurgeImapFolder::some_new_boxed))
943 }
944
945 fn delete_folder(&self) -> Option<BackendFeature<Self::Context, dyn DeleteFolder>> {
946 Some(Arc::new(DeleteImapFolder::some_new_boxed))
947 }
948
949 fn get_envelope(&self) -> Option<BackendFeature<Self::Context, dyn GetEnvelope>> {
950 Some(Arc::new(GetImapEnvelope::some_new_boxed))
951 }
952
953 fn list_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn ListEnvelopes>> {
954 Some(Arc::new(ListImapEnvelopes::some_new_boxed))
955 }
956
957 #[cfg(feature = "thread")]
958 fn thread_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn ThreadEnvelopes>> {
959 Some(Arc::new(ThreadImapEnvelopes::some_new_boxed))
960 }
961
962 #[cfg(feature = "watch")]
963 fn watch_envelopes(&self) -> Option<BackendFeature<Self::Context, dyn WatchEnvelopes>> {
964 Some(Arc::new(WatchImapEnvelopes::some_new_boxed))
965 }
966
967 fn add_flags(&self) -> Option<BackendFeature<Self::Context, dyn AddFlags>> {
968 Some(Arc::new(AddImapFlags::some_new_boxed))
969 }
970
971 fn set_flags(&self) -> Option<BackendFeature<Self::Context, dyn SetFlags>> {
972 Some(Arc::new(SetImapFlags::some_new_boxed))
973 }
974
975 fn remove_flags(&self) -> Option<BackendFeature<Self::Context, dyn RemoveFlags>> {
976 Some(Arc::new(RemoveImapFlags::some_new_boxed))
977 }
978
979 fn add_message(&self) -> Option<BackendFeature<Self::Context, dyn AddMessage>> {
980 Some(Arc::new(AddImapMessage::some_new_boxed))
981 }
982
983 fn peek_messages(&self) -> Option<BackendFeature<Self::Context, dyn PeekMessages>> {
984 Some(Arc::new(PeekImapMessages::some_new_boxed))
985 }
986
987 fn get_messages(&self) -> Option<BackendFeature<Self::Context, dyn GetMessages>> {
988 Some(Arc::new(GetImapMessages::some_new_boxed))
989 }
990
991 fn copy_messages(&self) -> Option<BackendFeature<Self::Context, dyn CopyMessages>> {
992 Some(Arc::new(CopyImapMessages::some_new_boxed))
993 }
994
995 fn move_messages(&self) -> Option<BackendFeature<Self::Context, dyn MoveMessages>> {
996 Some(Arc::new(MoveImapMessages::some_new_boxed))
997 }
998
999 fn delete_messages(&self) -> Option<BackendFeature<Self::Context, dyn DeleteMessages>> {
1000 Some(Arc::new(DeleteImapMessages::some_new_boxed))
1001 }
1002
1003 fn remove_messages(&self) -> Option<BackendFeature<Self::Context, dyn RemoveMessages>> {
1004 Some(Arc::new(RemoveImapMessages::some_new_boxed))
1005 }
1006
1007 async fn build(self) -> AnyResult<Self::Context> {
1008 let client_builder =
1009 ImapClientBuilder::new(self.imap_config.clone(), self.prebuilt_credentials);
1010
1011 debug!("building {} IMAP clients", self.pool_size);
1012
1013 let clients = FuturesUnordered::from_iter((0..self.pool_size).map(move |i| {
1014 let mut client_builder = client_builder.clone();
1015 tokio::spawn(async move {
1016 let client = client_builder.build().await?;
1017 Ok((i + 1, client_builder, client))
1018 })
1019 }))
1020 .map(|res| match res {
1021 Err(err) => Err(Error::JoinClientError(err)),
1022 Ok(Err(err)) => Err(Error::BuildClientError(Box::new(err))),
1023 Ok(Ok((id, client_builder, inner))) => Ok(Arc::new(Mutex::new(ImapClient {
1024 id,
1025 account_config: self.account_config.clone(),
1026 imap_config: self.imap_config.clone(),
1027 client_builder,
1028 inner,
1029 mailbox: Default::default(),
1030 retry: Default::default(),
1031 }))),
1032 })
1033 .collect::<Vec<_>>()
1034 .await
1035 .into_iter()
1036 .collect::<Result<_>>()?;
1037
1038 Ok(ImapContext {
1039 account_config: self.account_config,
1040 imap_config: self.imap_config,
1041 clients,
1042 })
1043 }
1044}
1045
1046#[derive(Clone, Debug)]
1047pub struct CheckUpImap {
1048 ctx: ImapContext,
1049}
1050
1051impl CheckUpImap {
1052 pub fn new(ctx: &ImapContext) -> Self {
1053 Self { ctx: ctx.clone() }
1054 }
1055
1056 pub fn new_boxed(ctx: &ImapContext) -> Box<dyn CheckUp> {
1057 Box::new(Self::new(ctx))
1058 }
1059
1060 pub fn some_new_boxed(ctx: &ImapContext) -> Option<Box<dyn CheckUp>> {
1061 Some(Self::new_boxed(ctx))
1062 }
1063}
1064
1065#[async_trait]
1066impl CheckUp for CheckUpImap {
1067 #[instrument(skip_all)]
1068 async fn check_up(&self) -> AnyResult<()> {
1069 debug!("executing check up backend feature");
1070 Ok(self.ctx.client().await.noop().await?)
1071 }
1072}
1073
1074#[derive(Clone, Debug)]
1075pub struct ImapClientBuilder {
1076 pub config: Arc<ImapConfig>,
1077 pub credentials: Option<String>,
1078}
1079
1080impl ImapClientBuilder {
1081 pub fn new(config: Arc<ImapConfig>, credentials: Option<String>) -> Self {
1082 Self {
1083 config,
1084 credentials,
1085 }
1086 }
1087
1088 #[instrument(name = "client::build", skip(self))]
1096 pub async fn build(&mut self) -> Result<Client> {
1097 let mut client = match &self.config.encryption {
1098 Some(Encryption::None) => Client::insecure(&self.config.host, self.config.port)
1099 .await
1100 .map_err(|err| {
1101 let host = self.config.host.clone();
1102 let port = self.config.port.clone();
1103 Error::BuildInsecureClientError(err, host, port)
1104 })?,
1105 Some(Encryption::Tls(Tls {
1106 provider: Some(TlsProvider::None),
1107 }))
1108 | Some(Encryption::StartTls(Tls {
1109 provider: Some(TlsProvider::None),
1110 })) => {
1111 return Err(Error::BuildTlsClientMissingProvider);
1112 }
1113 #[cfg(feature = "rustls")]
1114 Some(Encryption::Tls(Tls {
1115 provider: Some(TlsProvider::Rustls(_)) | None,
1116 }))
1117 | None => Client::rustls(&self.config.host, self.config.port, false)
1118 .await
1119 .map_err(|err| {
1120 let host = self.config.host.clone();
1121 let port = self.config.port.clone();
1122 Error::BuildStartTlsClientError(err, host, port)
1123 })?,
1124 #[cfg(feature = "native-tls")]
1125 Some(Encryption::Tls(Tls {
1126 provider: Some(TlsProvider::NativeTls(_)),
1127 })) => Client::native_tls(&self.config.host, self.config.port, false)
1128 .await
1129 .map_err(|err| {
1130 let host = self.config.host.clone();
1131 let port = self.config.port.clone();
1132 Error::BuildStartTlsClientError(err, host, port)
1133 })?,
1134 #[cfg(feature = "rustls")]
1135 Some(Encryption::StartTls(Tls {
1136 provider: Some(TlsProvider::Rustls(_)) | None,
1137 })) => Client::rustls(&self.config.host, self.config.port, true)
1138 .await
1139 .map_err(|err| {
1140 let host = self.config.host.clone();
1141 let port = self.config.port.clone();
1142 Error::BuildStartTlsClientError(err, host, port)
1143 })?,
1144 #[cfg(feature = "native-tls")]
1145 Some(Encryption::StartTls(Tls {
1146 provider: Some(TlsProvider::NativeTls(_)),
1147 })) => Client::native_tls(&self.config.host, self.config.port, true)
1148 .await
1149 .map_err(|err| {
1150 let host = self.config.host.clone();
1151 let port = self.config.port.clone();
1152 Error::BuildStartTlsClientError(err, host, port)
1153 })?,
1154 };
1155
1156 client
1157 .state
1158 .set_some_idle_timeout(self.config.find_watch_timeout().map(Duration::from_secs));
1159
1160 match &self.config.auth {
1161 ImapAuthConfig::Password(passwd) => {
1162 debug!("using password authentication");
1163
1164 let passwd = match self.credentials.as_ref() {
1165 Some(passwd) => passwd.to_string(),
1166 None => passwd
1167 .get()
1168 .await
1169 .map_err(Error::GetPasswdImapError)?
1170 .lines()
1171 .next()
1172 .ok_or(Error::GetPasswdEmptyImapError)?
1173 .to_owned(),
1174 };
1175
1176 let mechanisms: Vec<_> =
1177 client.state.supported_auth_mechanisms().cloned().collect();
1178 let mut authenticated = false;
1179
1180 debug!(?mechanisms, "supported auth mechanisms");
1181
1182 for mechanism in mechanisms {
1183 debug!(?mechanism, "trying auth mechanism…");
1184
1185 let auth = match mechanism {
1186 AuthMechanism::Plain => {
1187 client
1188 .authenticate_plain(self.config.login.as_str(), passwd.as_str())
1189 .await
1190 }
1191 _ => {
1198 continue;
1199 }
1200 };
1201
1202 if let Err(ref err) = auth {
1203 warn!(?mechanism, ?err, "authentication failed");
1204 }
1205
1206 if auth.is_ok() {
1207 debug!(?mechanism, "authentication succeeded!");
1208 authenticated = true;
1209 break;
1210 }
1211 }
1212
1213 if !authenticated {
1214 if !client.state.login_supported() {
1215 return Err(Error::LoginNotSupportedError);
1216 }
1217
1218 debug!("trying login…");
1219
1220 client
1221 .login(self.config.login.as_str(), passwd.as_str())
1222 .await
1223 .map_err(Error::LoginError)?;
1224
1225 debug!("login succeeded!");
1226 }
1227 }
1228 #[cfg(feature = "oauth2")]
1229 ImapAuthConfig::OAuth2(oauth2) => {
1230 debug!("using OAuth 2.0 authentication");
1231
1232 match oauth2.method {
1233 OAuth2Method::XOAuth2 => {
1234 if !client.state.supports_auth_mechanism(AuthMechanism::XOAuth2) {
1235 let auth = client.state.supported_auth_mechanisms().cloned().collect();
1236 return Err(Error::AuthenticateXOAuth2NotSupportedError(auth));
1237 }
1238
1239 debug!("using XOAUTH2 auth mechanism");
1240
1241 let access_token = match self.credentials.as_ref() {
1242 Some(access_token) => access_token.to_string(),
1243 None => oauth2
1244 .access_token()
1245 .await
1246 .map_err(Error::RefreshAccessTokenError)?,
1247 };
1248
1249 let auth = client
1250 .authenticate_xoauth2(self.config.login.as_str(), access_token.as_str())
1251 .await;
1252
1253 if auth.is_err() {
1254 warn!("authentication failed, refreshing access token and retrying…");
1255
1256 let access_token = oauth2
1257 .refresh_access_token()
1258 .await
1259 .map_err(Error::RefreshAccessTokenError)?;
1260
1261 client
1262 .authenticate_xoauth2(
1263 self.config.login.as_str(),
1264 access_token.as_str(),
1265 )
1266 .await
1267 .map_err(Error::AuthenticateXOauth2Error)?;
1268
1269 self.credentials = Some(access_token);
1270 }
1271 }
1272 OAuth2Method::OAuthBearer => {
1273 if !client
1274 .state
1275 .supports_auth_mechanism("OAUTHBEARER".try_into().unwrap())
1276 {
1277 let auth = client.state.supported_auth_mechanisms().cloned().collect();
1278 return Err(Error::AuthenticateOAuthBearerNotSupportedError(auth));
1279 }
1280
1281 debug!("using OAUTHBEARER auth mechanism");
1282
1283 let access_token = match self.credentials.as_ref() {
1284 Some(access_token) => access_token.to_string(),
1285 None => oauth2
1286 .access_token()
1287 .await
1288 .map_err(Error::RefreshAccessTokenError)?,
1289 };
1290
1291 let auth = client
1292 .authenticate_oauthbearer(
1293 self.config.login.as_str(),
1294 self.config.host.as_str(),
1295 self.config.port,
1296 access_token.as_str(),
1297 )
1298 .await;
1299
1300 if auth.is_err() {
1301 warn!("authentication failed, refreshing access token and retrying");
1302
1303 let access_token = oauth2
1304 .refresh_access_token()
1305 .await
1306 .map_err(Error::RefreshAccessTokenError)?;
1307
1308 client
1309 .authenticate_oauthbearer(
1310 self.config.login.as_str(),
1311 self.config.host.as_str(),
1312 self.config.port,
1313 access_token.as_str(),
1314 )
1315 .await
1316 .map_err(Error::AuthenticateOAuthBearerError)?;
1317
1318 self.credentials = Some(access_token);
1319 }
1320 }
1321 }
1322 }
1323 };
1324
1325 if self.config.send_id_after_auth() {
1326 let params = ID_PARAMS.clone();
1327 debug!(?params, "client identity");
1328
1329 let params = client
1330 .id(Some(ID_PARAMS.clone()))
1331 .await
1332 .map_err(Error::ExchangeIdsError)?;
1333
1334 debug!(?params, "server identity");
1335 }
1336
1337 Ok(client)
1347 }
1348}