1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 io,
5 num::NonZeroU32,
6 path::PathBuf,
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10};
11
12use imap_next::{
13 client::{Error as NextError, Event, Options as ClientOptions},
14 imap_types::{
15 command::{Command, CommandBody, FetchModifier},
16 core::{AString, IString, Literal, LiteralMode, NString, QuotedChar, Tag, Vec1},
17 error::ValidationError,
18 extensions::{
19 binary::{Literal8, LiteralOrLiteral8},
20 enable::CapabilityEnable,
21 sort::{SortCriterion, SortKey},
22 thread::{Thread, ThreadingAlgorithm},
23 },
24 fetch::{MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName},
25 flag::{Flag, FlagNameAttribute, StoreType},
26 mailbox::{ListMailbox, Mailbox},
27 response::{Code, Status, Tagged},
28 search::SearchKey,
29 secret::Secret,
30 sequence::SequenceSet,
31 IntoStatic,
32 },
33};
34use rip_starttls::imap::tokio::RipStarttls;
35use rustls_platform_verifier::ConfigVerifierExt;
36use thiserror::Error;
37use tokio::{
38 fs,
39 io::{AsyncRead, AsyncWrite, ReadBuf},
40 net::TcpStream,
41 time::timeout,
42};
43use tokio_rustls::{
44 rustls::{
45 pki_types::{pem::PemObject, CertificateDer, ServerName},
46 ClientConfig,
47 },
48 TlsConnector,
49};
50use tracing::{debug, trace};
51
52use crate::{
53 client::verifier::FingerprintVerifier,
54 stream::{self, Stream},
55 tasks::{
56 tasks::{
57 append::{AppendTask, PostAppendCheckTask, PostAppendNoOpTask},
58 appenduid::AppendUidTask,
59 authenticate::AuthenticateTask,
60 capability::CapabilityTask,
61 check::CheckTask,
62 copy::CopyTask,
63 create::CreateTask,
64 delete::DeleteTask,
65 enable::EnableTask,
66 expunge::ExpungeTask,
67 fetch::{FetchFirstTask, FetchTask},
68 id::IdTask,
69 list::ListTask,
70 login::LoginTask,
71 noop::NoOpTask,
72 r#move::MoveTask,
73 search::SearchTask,
74 select::{SelectDataUnvalidated, SelectTask},
75 sort::SortTask,
76 store::StoreTask,
77 thread::ThreadTask,
78 TaskError,
79 },
80 SchedulerError, SchedulerEvent, Task,
81 },
82};
83
84static MAX_SEQUENCE_SIZE: u8 = u8::MAX; #[derive(Debug, Error)]
87pub enum ClientError {
88 #[error("cannot upgrade client to TLS: client is already in TLS state")]
89 ClientAlreadyTlsError,
90 #[error("cannot do STARTTLS prefix")]
91 DoStarttlsPrefixError(#[from] io::Error),
92 #[error("stream error")]
93 Stream(#[from] stream::Error<SchedulerError>),
94 #[error("validation error")]
95 Validation(#[from] ValidationError),
96 #[error("cannot connect to TCP stream")]
97 ConnectToTcpStreamError(#[source] io::Error),
98 #[error("cannot connect to TLS stream")]
99 ConnectToTlsStreamError(#[source] io::Error),
100 #[error("cannot create TLS client config")]
101 TlsConfig(#[source] tokio_rustls::rustls::Error),
102
103 #[error("cannot receive greeting from server")]
104 ReceiveGreeting(#[source] stream::Error<SchedulerError>),
105 #[error("cannot resolve IMAP task")]
106 ResolveTask(#[from] TaskError),
107 #[error("{0} should contain at least one valid certificate")]
108 EmptyCert(PathBuf),
109 #[error("{1} does not contain a valid certificate")]
110 InvalidCert(tokio_rustls::rustls::pki_types::pem::Error, PathBuf),
111}
112
113pub enum MaybeTlsStream {
114 Plain(TcpStream),
115 Rustls(tokio_rustls::client::TlsStream<TcpStream>),
116}
117
118impl AsyncRead for MaybeTlsStream {
119 fn poll_read(
120 self: Pin<&mut Self>,
121 cx: &mut Context<'_>,
122 buf: &mut ReadBuf<'_>,
123 ) -> Poll<io::Result<()>> {
124 match self.get_mut() {
125 Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
126 Self::Rustls(s) => Pin::new(s).poll_read(cx, buf),
127 }
128 }
129}
130
131impl AsyncWrite for MaybeTlsStream {
132 fn poll_write(
133 self: Pin<&mut Self>,
134 cx: &mut Context<'_>,
135 buf: &[u8],
136 ) -> Poll<io::Result<usize>> {
137 match self.get_mut() {
138 Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
139 Self::Rustls(s) => Pin::new(s).poll_write(cx, buf),
140 }
141 }
142
143 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
144 match self.get_mut() {
145 Self::Plain(s) => Pin::new(s).poll_flush(cx),
146 Self::Rustls(s) => Pin::new(s).poll_flush(cx),
147 }
148 }
149
150 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
151 match self.get_mut() {
152 Self::Plain(s) => Pin::new(s).poll_shutdown(cx),
153 Self::Rustls(s) => Pin::new(s).poll_shutdown(cx),
154 }
155 }
156}
157
158pub struct Client {
159 host: String,
160 pub state: super::Client,
161 pub stream: Stream<MaybeTlsStream>,
162}
163
164impl Client {
169 pub async fn insecure(host: impl ToString, port: u16) -> Result<Self, ClientError> {
175 let mut client = Self::tcp(host, port, false).await?;
176
177 if !client.receive_greeting().await? {
178 client.refresh_capabilities().await?;
179 }
180
181 Ok(client)
182 }
183
184 pub async fn rustls(
190 host: impl ToString,
191 port: u16,
192 starttls: bool,
193 cert: Option<PathBuf>,
194 ) -> Result<Self, ClientError> {
195 let tcp = Self::tcp(host, port, starttls).await?;
196 Self::upgrade_rustls(tcp, starttls, cert).await
197 }
198
199 async fn tcp(
204 host: impl ToString,
205 port: u16,
206 discard_greeting: bool,
207 ) -> Result<Self, ClientError> {
208 let host = host.to_string();
209
210 let tcp_stream = TcpStream::connect((host.as_str(), port))
211 .await
212 .map_err(ClientError::ConnectToTcpStreamError)?;
213
214 let stream = Stream::new(MaybeTlsStream::Plain(tcp_stream));
215
216 let mut opts = ClientOptions::default();
217 opts.crlf_relaxed = true;
218 opts.discard_greeting = discard_greeting;
219
220 let state = super::Client::new(opts);
221
222 Ok(Self {
223 host,
224 stream,
225 state,
226 })
227 }
228
229 async fn upgrade_rustls(
239 mut self,
240 starttls: bool,
241 cert: Option<PathBuf>,
242 ) -> Result<Self, ClientError> {
243 let MaybeTlsStream::Plain(mut tcp_stream) = self.stream.into_inner() else {
244 return Err(ClientError::ClientAlreadyTlsError);
245 };
246
247 if starttls {
248 tcp_stream = RipStarttls::default()
249 .do_starttls_prefix(tcp_stream)
250 .await
251 .map_err(ClientError::DoStarttlsPrefixError)?;
252 }
253
254 let mut config = if let Some(pem_path) = cert {
255 let pem = fs::read(&pem_path).await?;
256
257 let Some(cert) = CertificateDer::pem_slice_iter(&pem).next() else {
258 return Err(ClientError::EmptyCert(pem_path));
259 };
260
261 let cert = match cert {
262 Ok(cert) => cert,
263 Err(err) => return Err(ClientError::InvalidCert(err, pem_path)),
264 };
265
266 let verifier = FingerprintVerifier::new(&cert);
267
268 ClientConfig::builder()
269 .dangerous()
270 .with_custom_certificate_verifier(Arc::new(verifier))
271 .with_no_client_auth()
272 } else {
273 ClientConfig::with_platform_verifier().map_err(ClientError::TlsConfig)?
274 };
275
276 config.alpn_protocols = vec![b"imap".to_vec()];
278
279 let connector = TlsConnector::from(Arc::new(config));
280 let dnsname = ServerName::try_from(self.host.clone()).unwrap();
281
282 let tls_stream = connector
283 .connect(dnsname, tcp_stream)
284 .await
285 .map_err(ClientError::ConnectToTlsStreamError)?;
286
287 self.stream = Stream::new(MaybeTlsStream::Rustls(tls_stream));
288
289 if starttls || !self.receive_greeting().await? {
290 self.refresh_capabilities().await?;
291 }
292
293 Ok(self)
294 }
295
296 async fn receive_greeting(&mut self) -> Result<bool, ClientError> {
303 let evt = self
304 .stream
305 .next(&mut self.state.resolver)
306 .await
307 .map_err(ClientError::ReceiveGreeting)?;
308
309 if let SchedulerEvent::GreetingReceived(greeting) = evt {
310 if let Some(Code::Capability(capabilities)) = greeting.code {
311 self.state.capabilities = capabilities;
312 return Ok(true);
313 }
314 }
315
316 Ok(false)
317 }
318}
319
320impl Client {
326 pub async fn resolve<T: Task>(&mut self, task: T) -> Result<T::Output, ClientError> {
328 Ok(self.stream.next(self.state.resolver.resolve(task)).await?)
329 }
330
331 pub async fn enable(
333 &mut self,
334 capabilities: impl IntoIterator<Item = CapabilityEnable<'_>>,
335 ) -> Result<Option<Vec<CapabilityEnable<'_>>>, ClientError> {
336 if !self.state.ext_enable_supported() {
337 debug!("IMAP ENABLE extension not supported, skipping");
338 return Ok(None);
339 }
340
341 let capabilities: Vec<_> = capabilities
342 .into_iter()
343 .map(IntoStatic::into_static)
344 .collect();
345
346 if capabilities.is_empty() {
347 return Ok(None);
348 }
349
350 let capabilities = Vec1::try_from(capabilities).unwrap();
351
352 Ok(self.resolve(EnableTask::new(capabilities)).await??)
353 }
354
355 pub async fn enable_condstore_if_supported(&mut self) -> Result<bool, ClientError> {
357 if !self.state.ext_condstore_supported() || !self.state.ext_enable_supported() {
358 return Ok(false);
359 }
360
361 let enable = self.enable(vec![CapabilityEnable::CondStore]);
362
363 let enabled = (enable.await?)
364 .map(|capabilities| capabilities.contains(&CapabilityEnable::CondStore))
365 .unwrap_or(false);
366 self.state.condstore_enabled = enabled;
367 Ok(enabled)
368 }
369
370 pub async fn create(
372 &mut self,
373 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
374 ) -> Result<(), ClientError> {
375 let mbox = mailbox.try_into()?.into_static();
376 Ok(self.resolve(CreateTask::new(mbox)).await??)
377 }
378
379 pub async fn list(
381 &mut self,
382 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
383 mailbox_wildcard: impl TryInto<ListMailbox<'_>, Error = ValidationError>,
384 ) -> Result<
385 Vec<(
386 Mailbox<'static>,
387 Option<QuotedChar>,
388 Vec<FlagNameAttribute<'static>>,
389 )>,
390 ClientError,
391 > {
392 let mbox = mailbox.try_into()?.into_static();
393 let mbox_wcard = mailbox_wildcard.try_into()?.into_static();
394 Ok(self.resolve(ListTask::new(mbox, mbox_wcard)).await??)
395 }
396
397 pub async fn select(
399 &mut self,
400 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
401 ) -> Result<SelectDataUnvalidated, ClientError> {
402 let mbox = mailbox.try_into()?.into_static();
403 let task = SelectTask::new(mbox).with_condstore(self.state.condstore_enabled());
404 Ok(self.resolve(task).await??)
405 }
406
407 pub async fn examine(
409 &mut self,
410 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
411 ) -> Result<SelectDataUnvalidated, ClientError> {
412 let mbox = mailbox.try_into()?.into_static();
413 let task = SelectTask::read_only(mbox).with_condstore(self.state.condstore_enabled());
414 Ok(self.resolve(task).await??)
415 }
416
417 pub async fn expunge(&mut self) -> Result<Vec<NonZeroU32>, ClientError> {
422 Ok(self.resolve(ExpungeTask::new()).await??)
423 }
424
425 pub async fn delete(
427 &mut self,
428 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
429 ) -> Result<(), ClientError> {
430 let mbox = mailbox.try_into()?.into_static();
431 Ok(self.resolve(DeleteTask::new(mbox)).await??)
432 }
433
434 async fn _search(
436 &mut self,
437 criteria: impl IntoIterator<Item = SearchKey<'_>>,
438 uid: bool,
439 ) -> Result<Vec<NonZeroU32>, ClientError> {
440 let criteria: Vec<_> = criteria.into_iter().map(IntoStatic::into_static).collect();
441
442 let criteria = if criteria.is_empty() {
443 Vec1::from(SearchKey::All)
444 } else {
445 Vec1::try_from(criteria).unwrap()
446 };
447
448 Ok(self
449 .resolve(SearchTask::new(criteria).with_uid(uid))
450 .await??)
451 }
452
453 pub async fn search(
458 &mut self,
459 criteria: impl IntoIterator<Item = SearchKey<'_>>,
460 ) -> Result<Vec<NonZeroU32>, ClientError> {
461 self._search(criteria, false).await
462 }
463
464 pub async fn uid_search(
469 &mut self,
470 criteria: impl IntoIterator<Item = SearchKey<'_>>,
471 ) -> Result<Vec<NonZeroU32>, ClientError> {
472 self._search(criteria, true).await
473 }
474
475 async fn _sort(
478 &mut self,
479 sort_criteria: impl IntoIterator<Item = SortCriterion>,
480 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
481 uid: bool,
482 ) -> Result<Vec<NonZeroU32>, ClientError> {
483 let sort: Vec<_> = sort_criteria.into_iter().collect();
484 let sort = if sort.is_empty() {
485 Vec1::from(SortCriterion {
486 reverse: true,
487 key: SortKey::Date,
488 })
489 } else {
490 Vec1::try_from(sort).unwrap()
491 };
492
493 let search: Vec<_> = search_criteria
494 .into_iter()
495 .map(IntoStatic::into_static)
496 .collect();
497 let search = if search.is_empty() {
498 Vec1::from(SearchKey::All)
499 } else {
500 Vec1::try_from(search).unwrap()
501 };
502
503 Ok(self
504 .resolve(SortTask::new(sort, search).with_uid(uid))
505 .await??)
506 }
507
508 pub async fn sort(
514 &mut self,
515 sort_criteria: impl IntoIterator<Item = SortCriterion>,
516 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
517 ) -> Result<Vec<NonZeroU32>, ClientError> {
518 self._sort(sort_criteria, search_criteria, false).await
519 }
520
521 pub async fn uid_sort(
527 &mut self,
528 sort_criteria: impl IntoIterator<Item = SortCriterion>,
529 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
530 ) -> Result<Vec<NonZeroU32>, ClientError> {
531 self._sort(sort_criteria, search_criteria, true).await
532 }
533
534 async fn _thread(
535 &mut self,
536 algorithm: ThreadingAlgorithm<'_>,
537 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
538 uid: bool,
539 ) -> Result<Vec<Thread>, ClientError> {
540 let alg = algorithm.into_static();
541
542 let search: Vec<_> = search_criteria
543 .into_iter()
544 .map(IntoStatic::into_static)
545 .collect();
546 let search = if search.is_empty() {
547 Vec1::from(SearchKey::All)
548 } else {
549 Vec1::try_from(search).unwrap()
550 };
551
552 Ok(self
553 .resolve(ThreadTask::new(alg, search).with_uid(uid))
554 .await??)
555 }
556
557 pub async fn thread(
558 &mut self,
559 algorithm: ThreadingAlgorithm<'_>,
560 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
561 ) -> Result<Vec<Thread>, ClientError> {
562 self._thread(algorithm, search_criteria, false).await
563 }
564
565 pub async fn uid_thread(
566 &mut self,
567 algorithm: ThreadingAlgorithm<'_>,
568 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
569 ) -> Result<Vec<Thread>, ClientError> {
570 self._thread(algorithm, search_criteria, true).await
571 }
572
573 async fn _store(
574 &mut self,
575 sequence_set: SequenceSet,
576 kind: StoreType,
577 flags: impl IntoIterator<Item = Flag<'_>>,
578 uid: bool,
579 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
580 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
581
582 Ok(self
583 .resolve(StoreTask::new(sequence_set, kind, flags).with_uid(uid))
584 .await??)
585 }
586
587 pub async fn store(
588 &mut self,
589 sequence_set: SequenceSet,
590 kind: StoreType,
591 flags: impl IntoIterator<Item = Flag<'_>>,
592 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
593 self._store(sequence_set, kind, flags, false).await
594 }
595
596 pub async fn uid_store(
597 &mut self,
598 sequence_set: SequenceSet,
599 kind: StoreType,
600 flags: impl IntoIterator<Item = Flag<'_>>,
601 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
602 self._store(sequence_set, kind, flags, true).await
603 }
604
605 async fn _silent_store(
606 &mut self,
607 sequence_set: SequenceSet,
608 kind: StoreType,
609 flags: impl IntoIterator<Item = Flag<'_>>,
610 uid: bool,
611 ) -> Result<(), ClientError> {
612 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
613
614 let task = StoreTask::new(sequence_set, kind, flags)
615 .with_uid(uid)
616 .silent();
617
618 Ok(self.resolve(task).await??)
619 }
620
621 pub async fn silent_store(
622 &mut self,
623 sequence_set: SequenceSet,
624 kind: StoreType,
625 flags: impl IntoIterator<Item = Flag<'_>>,
626 ) -> Result<(), ClientError> {
627 self._silent_store(sequence_set, kind, flags, false).await
628 }
629
630 pub async fn uid_silent_store(
631 &mut self,
632 sequence_set: SequenceSet,
633 kind: StoreType,
634 flags: impl IntoIterator<Item = Flag<'_>>,
635 ) -> Result<(), ClientError> {
636 self._silent_store(sequence_set, kind, flags, true).await
637 }
638
639 pub async fn post_append_noop(&mut self) -> Result<Option<u32>, ClientError> {
640 Ok(self.resolve(PostAppendNoOpTask::new()).await??)
641 }
642
643 pub async fn post_append_check(&mut self) -> Result<Option<u32>, ClientError> {
644 Ok(self.resolve(PostAppendCheckTask::new()).await??)
645 }
646
647 async fn _fetch_first(
648 &mut self,
649 id: NonZeroU32,
650 items: MacroOrMessageDataItemNames<'_>,
651 uid: bool,
652 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
653 let items = items.into_static();
654
655 Ok(self
656 .resolve(FetchFirstTask::new(id, items).with_uid(uid))
657 .await??)
658 }
659
660 pub async fn fetch_first(
661 &mut self,
662 id: NonZeroU32,
663 items: MacroOrMessageDataItemNames<'_>,
664 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
665 self._fetch_first(id, items, false).await
666 }
667
668 pub async fn uid_fetch_first(
669 &mut self,
670 id: NonZeroU32,
671 items: MacroOrMessageDataItemNames<'_>,
672 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
673 self._fetch_first(id, items, true).await
674 }
675
676 async fn _copy(
677 &mut self,
678 sequence_set: SequenceSet,
679 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
680 uid: bool,
681 ) -> Result<(), ClientError> {
682 let mbox = mailbox.try_into()?.into_static();
683
684 Ok(self
685 .resolve(CopyTask::new(sequence_set, mbox).with_uid(uid))
686 .await??)
687 }
688
689 pub async fn copy(
690 &mut self,
691 sequence_set: SequenceSet,
692 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
693 ) -> Result<(), ClientError> {
694 self._copy(sequence_set, mailbox, false).await
695 }
696
697 pub async fn uid_copy(
698 &mut self,
699 sequence_set: SequenceSet,
700 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
701 ) -> Result<(), ClientError> {
702 self._copy(sequence_set, mailbox, true).await
703 }
704
705 async fn _move(
706 &mut self,
707 sequence_set: SequenceSet,
708 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
709 uid: bool,
710 ) -> Result<(), ClientError> {
711 let mbox = mailbox.try_into()?.into_static();
712
713 Ok(self
714 .resolve(MoveTask::new(sequence_set, mbox).with_uid(uid))
715 .await??)
716 }
717
718 pub async fn r#move(
719 &mut self,
720 sequence_set: SequenceSet,
721 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
722 ) -> Result<(), ClientError> {
723 self._move(sequence_set, mailbox, false).await
724 }
725
726 pub async fn uid_move(
727 &mut self,
728 sequence_set: SequenceSet,
729 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
730 ) -> Result<(), ClientError> {
731 self._move(sequence_set, mailbox, true).await
732 }
733
734 pub async fn check(&mut self) -> Result<(), ClientError> {
736 Ok(self.resolve(CheckTask::new()).await??)
737 }
738
739 pub async fn noop(&mut self) -> Result<(), ClientError> {
741 Ok(self.resolve(NoOpTask::new()).await??)
742 }
743}
744
745impl Client {
752 pub async fn refresh_capabilities(&mut self) -> Result<(), ClientError> {
754 self.state.capabilities = self.resolve(CapabilityTask::new()).await??;
755
756 Ok(())
757 }
758
759 pub async fn login(
761 &mut self,
762 username: impl TryInto<AString<'_>, Error = ValidationError>,
763 password: impl TryInto<AString<'_>, Error = ValidationError>,
764 ) -> Result<(), ClientError> {
765 let username = username.try_into()?.into_static();
766 let password = password.try_into()?.into_static();
767 let login = self.resolve(LoginTask::new(username, Secret::new(password)));
768
769 match login.await?? {
770 Some(capabilities) => {
771 self.state.capabilities = capabilities;
772 }
773 None => {
774 self.refresh_capabilities().await?;
775 }
776 };
777
778 Ok(())
779 }
780
781 async fn authenticate(&mut self, task: AuthenticateTask) -> Result<(), ClientError> {
786 match self.resolve(task).await?? {
787 Some(capabilities) => {
788 self.state.capabilities = capabilities;
789 }
790 None => {
791 self.refresh_capabilities().await?;
792 }
793 };
794
795 Ok(())
796 }
797
798 pub async fn authenticate_plain(
800 &mut self,
801 login: impl AsRef<str>,
802 password: impl AsRef<str>,
803 ) -> Result<(), ClientError> {
804 self.authenticate(AuthenticateTask::plain(
805 login.as_ref(),
806 password.as_ref(),
807 self.state.ext_sasl_ir_supported(),
808 ))
809 .await
810 }
811
812 pub async fn authenticate_xoauth2(
814 &mut self,
815 login: impl AsRef<str>,
816 token: impl AsRef<str>,
817 ) -> Result<(), ClientError> {
818 self.authenticate(AuthenticateTask::xoauth2(
819 login.as_ref(),
820 token.as_ref(),
821 self.state.ext_sasl_ir_supported(),
822 ))
823 .await
824 }
825
826 pub async fn authenticate_oauthbearer(
828 &mut self,
829 user: impl AsRef<str>,
830 host: impl AsRef<str>,
831 port: u16,
832 token: impl AsRef<str>,
833 ) -> Result<(), ClientError> {
834 self.authenticate(AuthenticateTask::oauthbearer(
835 user.as_ref(),
836 host.as_ref(),
837 port,
838 token.as_ref(),
839 self.state.ext_sasl_ir_supported(),
840 ))
841 .await
842 }
843
844 pub async fn id(
849 &mut self,
850 params: Option<Vec<(IString<'static>, NString<'static>)>>,
851 ) -> Result<Option<Vec<(IString<'static>, NString<'static>)>>, ClientError> {
852 Ok(if self.state.ext_id_supported() {
853 self.resolve(IdTask::new(params)).await??
854 } else {
855 debug!("IMAP ID extension not supported, skipping");
856 None
857 })
858 }
859
860 pub async fn append(
861 &mut self,
862 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
863 flags: impl IntoIterator<Item = Flag<'_>>,
864 message: impl AsRef<[u8]>,
865 ) -> Result<Option<u32>, ClientError> {
866 let mbox = mailbox.try_into()?.into_static();
867
868 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
869
870 let msg = to_static_literal(message, self.state.ext_binary_supported())?;
871
872 Ok(self
873 .resolve(AppendTask::new(mbox, msg).with_flags(flags))
874 .await??)
875 }
876
877 pub async fn appenduid(
878 &mut self,
879 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
880 flags: impl IntoIterator<Item = Flag<'_>>,
881 message: impl AsRef<[u8]>,
882 ) -> Result<Option<(NonZeroU32, NonZeroU32)>, ClientError> {
883 let mbox = mailbox.try_into()?.into_static();
884
885 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
886
887 let msg = to_static_literal(message, self.state.ext_binary_supported())?;
888
889 Ok(self
890 .resolve(AppendUidTask::new(mbox, msg).with_flags(flags))
891 .await??)
892 }
893}
894
895impl Client {
902 async fn _fetch(
903 &mut self,
904 sequence_set: SequenceSet,
905 items: MacroOrMessageDataItemNames<'_>,
906 uid: bool,
907 modifiers: Option<Vec<FetchModifier>>,
908 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
909 let mut items = match items {
910 MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
911 MacroOrMessageDataItemNames::MessageDataItemNames(items) => items.into_static(),
912 };
913
914 if uid {
915 items.push(MessageDataItemName::Uid);
916 }
917
918 let mut task = FetchTask::new(sequence_set, items.into()).with_uid(uid);
919 modifiers.map(|m| task.set_modifiers(m));
920
921 let seq_map = self.resolve(task).await??;
922
923 if uid {
924 let mut uid_map = HashMap::new();
925
926 for (seq, items) in seq_map {
927 let uid = items.as_ref().iter().find_map(|item| {
928 if let MessageDataItem::Uid(uid) = item {
929 Some(*uid)
930 } else {
931 None
932 }
933 });
934
935 match uid {
936 Some(uid) => {
937 uid_map.insert(uid, items);
938 }
939 None => {
940 debug!(?seq, "cannot get message uid, skipping it");
941 }
942 }
943 }
944
945 Ok(uid_map)
946 } else {
947 Ok(seq_map)
948 }
949 }
950
951 pub async fn fetch(
952 &mut self,
953 sequence_set: SequenceSet,
954 items: MacroOrMessageDataItemNames<'_>,
955 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
956 self._fetch(sequence_set, items, false, None).await
957 }
958
959 pub async fn fetch_with_modifiers(
960 &mut self,
961 sequence_set: SequenceSet,
962 items: MacroOrMessageDataItemNames<'_>,
963 modifiers: Vec<FetchModifier>,
964 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
965 self._fetch(sequence_set, items, false, Some(modifiers))
966 .await
967 }
968
969 pub async fn uid_fetch(
970 &mut self,
971 sequence_set: SequenceSet,
972 items: MacroOrMessageDataItemNames<'_>,
973 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
974 self._fetch(sequence_set, items, true, None).await
975 }
976
977 pub async fn uid_fetch_with_modifiers(
978 &mut self,
979 sequence_set: SequenceSet,
980 items: MacroOrMessageDataItemNames<'_>,
981 modifiers: Vec<FetchModifier>,
982 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
983 self._fetch(sequence_set, items, true, Some(modifiers))
984 .await
985 }
986
987 async fn _sort_or_fallback(
988 &mut self,
989 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
990 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
991 fetch_items: MacroOrMessageDataItemNames<'_>,
992 uid: bool,
993 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
994 let mut fetch_items = match fetch_items {
995 MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
996 MacroOrMessageDataItemNames::MessageDataItemNames(items) => items,
997 };
998
999 if uid && !fetch_items.contains(&MessageDataItemName::Uid) {
1000 fetch_items.push(MessageDataItemName::Uid);
1001 }
1002
1003 let mut fetches = HashMap::new();
1004
1005 if self.state.ext_sort_supported() {
1006 let fetch_items = MacroOrMessageDataItemNames::MessageDataItemNames(fetch_items);
1007 let ids = self._sort(sort_criteria, search_criteria, uid).await?;
1008 let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
1009 let ids_chunks_len = ids_chunks.len();
1010
1011 for (n, ids) in ids_chunks.enumerate() {
1012 debug!(?ids, "fetching sort envelopes {}/{ids_chunks_len}", n + 1);
1013 let ids = SequenceSet::try_from(ids.to_vec())?;
1014 let items = fetch_items.clone();
1015 fetches.extend(self._fetch(ids, items, uid, None).await?);
1016 }
1017
1018 let items = ids.into_iter().flat_map(|id| fetches.remove(&id)).collect();
1019
1020 Ok(items)
1021 } else {
1022 debug!("IMAP SORT extension not supported, using fallback");
1023
1024 let ids = self._search(search_criteria, uid).await?;
1025 let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
1026 let ids_chunks_len = ids_chunks.len();
1027
1028 sort_criteria
1029 .clone()
1030 .into_iter()
1031 .filter_map(|criterion| match criterion.key {
1032 SortKey::Arrival => Some(MessageDataItemName::InternalDate),
1033 SortKey::Cc => Some(MessageDataItemName::Envelope),
1034 SortKey::Date => Some(MessageDataItemName::Envelope),
1035 SortKey::From => Some(MessageDataItemName::Envelope),
1036 SortKey::Size => Some(MessageDataItemName::Rfc822Size),
1037 SortKey::Subject => Some(MessageDataItemName::Envelope),
1038 SortKey::To => Some(MessageDataItemName::Envelope),
1039 SortKey::DisplayFrom => None,
1040 SortKey::DisplayTo => None,
1041 })
1042 .for_each(|item| {
1043 if !fetch_items.contains(&item) {
1044 fetch_items.push(item)
1045 }
1046 });
1047
1048 for (n, ids) in ids_chunks.enumerate() {
1049 debug!(?ids, "fetching search envelopes {}/{ids_chunks_len}", n + 1);
1050 let ids = SequenceSet::try_from(ids.to_vec())?;
1051 let items = fetch_items.clone();
1052 fetches.extend(self._fetch(ids, items.into(), uid, None).await?);
1053 }
1054
1055 let mut fetches: Vec<_> = fetches.into_values().collect();
1056
1057 fetches.sort_by(|a, b| {
1058 for criterion in sort_criteria.clone().into_iter() {
1059 let mut cmp = cmp_fetch_items(&criterion.key, a, b);
1060
1061 if criterion.reverse {
1062 cmp = cmp.reverse();
1063 }
1064
1065 if cmp.is_ne() {
1066 return cmp;
1067 }
1068 }
1069
1070 cmp_fetch_items(&SortKey::Date, a, b)
1071 });
1072
1073 Ok(fetches)
1074 }
1075 }
1076
1077 pub async fn sort_or_fallback(
1078 &mut self,
1079 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1080 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1081 fetch_items: MacroOrMessageDataItemNames<'_>,
1082 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1083 self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, false)
1084 .await
1085 }
1086
1087 pub async fn uid_sort_or_fallback(
1088 &mut self,
1089 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1090 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1091 fetch_items: MacroOrMessageDataItemNames<'_>,
1092 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1093 self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, true)
1094 .await
1095 }
1096
1097 pub async fn appenduid_or_fallback(
1098 &mut self,
1099 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError> + Clone,
1100 flags: impl IntoIterator<Item = Flag<'_>>,
1101 message: impl AsRef<[u8]>,
1102 ) -> Result<Option<NonZeroU32>, ClientError> {
1103 if self.state.ext_uidplus_supported() {
1104 Ok(self
1105 .appenduid(mailbox, flags, message)
1106 .await?
1107 .map(|(uid, _)| uid))
1108 } else {
1109 debug!("IMAP UIDPLUS extension not supported, using fallback");
1110
1111 self.select(mailbox.clone()).await?;
1120
1121 let seq = match self.append(mailbox, flags, message).await? {
1122 Some(seq) => seq,
1123 None => match self.post_append_noop().await? {
1124 Some(seq) => seq,
1125 None => self
1126 .post_append_check()
1127 .await?
1128 .ok_or(ClientError::ResolveTask(TaskError::MissingData(
1129 "APPENDUID: seq".into(),
1130 )))?,
1131 },
1132 };
1133
1134 let uid = self
1135 .search(Vec1::from(SearchKey::SequenceSet(seq.try_into().unwrap())))
1136 .await?
1137 .into_iter()
1138 .next();
1139
1140 Ok(uid)
1141 }
1142 }
1143
1144 async fn _move_or_fallback(
1145 &mut self,
1146 sequence_set: SequenceSet,
1147 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1148 uid: bool,
1149 ) -> Result<(), ClientError> {
1150 if self.state.ext_move_supported() {
1151 self._move(sequence_set, mailbox, uid).await
1152 } else {
1153 debug!("IMAP MOVE extension not supported, using fallback");
1154 self._copy(sequence_set.clone(), mailbox, uid).await?;
1155 self._silent_store(sequence_set, StoreType::Add, Some(Flag::Deleted), uid)
1156 .await?;
1157 self.expunge().await?;
1158 Ok(())
1159 }
1160 }
1161
1162 pub async fn move_or_fallback(
1163 &mut self,
1164 sequence_set: SequenceSet,
1165 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1166 ) -> Result<(), ClientError> {
1167 self._move_or_fallback(sequence_set, mailbox, false).await
1168 }
1169
1170 pub async fn uid_move_or_fallback(
1171 &mut self,
1172 sequence_set: SequenceSet,
1173 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1174 ) -> Result<(), ClientError> {
1175 self._move_or_fallback(sequence_set, mailbox, true).await
1176 }
1177
1178 pub fn enqueue_idle(&mut self) -> Tag<'static> {
1179 let tag = self.state.resolver.scheduler.tag_generator.generate();
1180
1181 self.state
1182 .resolver
1183 .scheduler
1184 .client_next
1185 .enqueue_command(Command {
1186 tag: tag.clone(),
1187 body: CommandBody::Idle,
1188 });
1189
1190 tag.into_static()
1191 }
1192
1193 #[tracing::instrument(name = "idle", skip_all)]
1194 pub async fn idle(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1195 debug!("starting the main loop");
1196
1197 loop {
1198 let progress = self
1199 .stream
1200 .next(&mut self.state.resolver.scheduler.client_next);
1201 match timeout(self.state.idle_timeout, progress).await.ok() {
1202 None => {
1203 debug!("timed out, sending done command…");
1204 self.state.resolver.scheduler.client_next.set_idle_done();
1205 }
1206 Some(Err(err)) => {
1207 break Err(err);
1208 }
1209 Some(Ok(Event::IdleCommandSent { .. })) => {
1210 debug!("command sent");
1211 }
1212 Some(Ok(Event::IdleAccepted { .. })) => {
1213 debug!("command accepted, entering idle mode");
1214 }
1215 Some(Ok(Event::IdleRejected { status, .. })) => {
1216 debug!("command rejected, aborting: {status:?}");
1217 break Ok(());
1218 }
1219 Some(Ok(Event::IdleDoneSent { .. })) => {
1220 debug!("done command sent");
1221 }
1222 Some(Ok(Event::DataReceived { data })) => {
1223 debug!("received data, sending done command…");
1224 trace!("{data:#?}");
1225 self.state.resolver.scheduler.client_next.set_idle_done();
1226 }
1227 Some(Ok(Event::StatusReceived {
1228 status:
1229 Status::Tagged(Tagged {
1230 tag: ref got_tag, ..
1231 }),
1232 })) if *got_tag == tag => {
1233 debug!("received tagged response, exiting");
1234 break Ok(());
1235 }
1236 Some(event) => {
1237 debug!("received unknown event, ignoring: {event:?}");
1238 }
1239 }
1240 }
1241 }
1242
1243 #[tracing::instrument(name = "idle/done", skip_all)]
1244 pub async fn idle_done(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1245 self.state.resolver.scheduler.client_next.set_idle_done();
1246
1247 loop {
1248 let progress = self
1249 .stream
1250 .next(&mut self.state.resolver.scheduler.client_next)
1251 .await?;
1252
1253 match progress {
1254 Event::IdleDoneSent { .. } => {
1255 debug!("done command sent");
1256 }
1257 Event::StatusReceived {
1258 status:
1259 Status::Tagged(Tagged {
1260 tag: ref got_tag, ..
1261 }),
1262 } if *got_tag == tag => {
1263 debug!("received tagged response, exiting");
1264 break Ok(());
1265 }
1266 event => {
1267 debug!("received unknown event, ignoring: {event:?}");
1268 }
1269 }
1270 }
1271 }
1272}
1273
1274pub(crate) fn cmp_fetch_items(
1275 criterion: &SortKey,
1276 a: &Vec1<MessageDataItem>,
1277 b: &Vec1<MessageDataItem>,
1278) -> Ordering {
1279 use MessageDataItem::*;
1280
1281 match &criterion {
1282 SortKey::Arrival => {
1283 let a = a.as_ref().iter().find_map(|a| {
1284 if let InternalDate(dt) = a {
1285 Some(dt.as_ref())
1286 } else {
1287 None
1288 }
1289 });
1290
1291 let b = b.as_ref().iter().find_map(|b| {
1292 if let InternalDate(dt) = b {
1293 Some(dt.as_ref())
1294 } else {
1295 None
1296 }
1297 });
1298
1299 a.cmp(&b)
1300 }
1301 SortKey::Date => {
1302 let a = a.as_ref().iter().find_map(|a| {
1303 if let Envelope(envelope) = a {
1304 envelope.date.0.as_ref().map(AsRef::as_ref)
1305 } else {
1306 None
1307 }
1308 });
1309
1310 let b = b.as_ref().iter().find_map(|b| {
1311 if let Envelope(envelope) = b {
1312 envelope.date.0.as_ref().map(AsRef::as_ref)
1313 } else {
1314 None
1315 }
1316 });
1317
1318 a.cmp(&b)
1319 }
1320 SortKey::Size => {
1321 let a = a.as_ref().iter().find_map(|a| {
1322 if let Rfc822Size(size) = a {
1323 Some(size)
1324 } else {
1325 None
1326 }
1327 });
1328
1329 let b = b.as_ref().iter().find_map(|b| {
1330 if let Rfc822Size(size) = b {
1331 Some(size)
1332 } else {
1333 None
1334 }
1335 });
1336
1337 a.cmp(&b)
1338 }
1339 SortKey::Subject => {
1340 let a = a.as_ref().iter().find_map(|a| {
1341 if let Envelope(envelope) = a {
1342 envelope.subject.0.as_ref().map(AsRef::as_ref)
1343 } else {
1344 None
1345 }
1346 });
1347
1348 let b = b.as_ref().iter().find_map(|b| {
1349 if let Envelope(envelope) = b {
1350 envelope.subject.0.as_ref().map(AsRef::as_ref)
1351 } else {
1352 None
1353 }
1354 });
1355
1356 a.cmp(&b)
1357 }
1358 SortKey::Cc | SortKey::From | SortKey::To | SortKey::DisplayFrom | SortKey::DisplayTo => {
1360 Ordering::Equal
1361 }
1362 }
1363}
1364
1365pub(crate) fn to_static_literal(
1366 message: impl AsRef<[u8]>,
1367 ext_binary_supported: bool,
1368) -> Result<LiteralOrLiteral8<'static>, ValidationError> {
1369 let message = if ext_binary_supported {
1370 LiteralOrLiteral8::Literal8(Literal8 {
1371 data: message.as_ref().into(),
1372 mode: LiteralMode::Sync,
1373 })
1374 } else {
1375 debug!("IMAP BINARY extension not supported, using fallback");
1376 Literal::validate(message.as_ref())?;
1377 LiteralOrLiteral8::Literal(Literal::unvalidated(message.as_ref()))
1378 };
1379
1380 Ok(message.into_static())
1381}