1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 io,
5 num::NonZeroU32,
6 path::PathBuf,
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10};
11
12use imap_next::{
13 client::{Error as NextError, Event, Options as ClientOptions},
14 imap_types::{
15 command::{Command, CommandBody},
16 core::{AString, IString, Literal, LiteralMode, NString, QuotedChar, Tag, Vec1},
17 error::ValidationError,
18 extensions::{
19 binary::{Literal8, LiteralOrLiteral8},
20 enable::CapabilityEnable,
21 sort::{SortCriterion, SortKey},
22 thread::{Thread, ThreadingAlgorithm},
23 },
24 fetch::{MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName},
25 flag::{Flag, FlagNameAttribute, StoreType},
26 mailbox::{ListMailbox, Mailbox},
27 response::{Code, Status, Tagged},
28 search::SearchKey,
29 secret::Secret,
30 sequence::SequenceSet,
31 IntoStatic,
32 },
33};
34use rip_starttls::imap::tokio::RipStarttls;
35use rustls_platform_verifier::ConfigVerifierExt;
36use thiserror::Error;
37use tokio::{
38 fs,
39 io::{AsyncRead, AsyncWrite, ReadBuf},
40 net::TcpStream,
41 time::timeout,
42};
43use tokio_rustls::{
44 rustls::{
45 pki_types::{pem::PemObject, CertificateDer, ServerName},
46 ClientConfig,
47 },
48 TlsConnector,
49};
50use tracing::{debug, trace};
51
52use crate::{
53 client::verifier::FingerprintVerifier,
54 stream::{self, Stream},
55 tasks::{
56 tasks::{
57 append::{AppendTask, PostAppendCheckTask, PostAppendNoOpTask},
58 appenduid::AppendUidTask,
59 authenticate::AuthenticateTask,
60 capability::CapabilityTask,
61 check::CheckTask,
62 copy::CopyTask,
63 create::CreateTask,
64 delete::DeleteTask,
65 enable::EnableTask,
66 expunge::ExpungeTask,
67 fetch::{FetchFirstTask, FetchTask},
68 id::IdTask,
69 list::ListTask,
70 login::LoginTask,
71 noop::NoOpTask,
72 r#move::MoveTask,
73 search::SearchTask,
74 select::{SelectDataUnvalidated, SelectTask},
75 sort::SortTask,
76 store::StoreTask,
77 thread::ThreadTask,
78 TaskError,
79 },
80 SchedulerError, SchedulerEvent, Task,
81 },
82};
83
84static MAX_SEQUENCE_SIZE: u8 = u8::MAX; #[derive(Debug, Error)]
87pub enum ClientError {
88 #[error("cannot upgrade client to TLS: client is already in TLS state")]
89 ClientAlreadyTlsError,
90 #[error("cannot do STARTTLS prefix")]
91 DoStarttlsPrefixError(#[from] io::Error),
92 #[error("stream error")]
93 Stream(#[from] stream::Error<SchedulerError>),
94 #[error("validation error")]
95 Validation(#[from] ValidationError),
96 #[error("cannot connect to TCP stream")]
97 ConnectToTcpStreamError(#[source] io::Error),
98 #[error("cannot connect to TLS stream")]
99 ConnectToTlsStreamError(#[source] io::Error),
100 #[error("cannot create TLS client config")]
101 TlsConfig(#[source] tokio_rustls::rustls::Error),
102
103 #[error("cannot receive greeting from server")]
104 ReceiveGreeting(#[source] stream::Error<SchedulerError>),
105 #[error("cannot resolve IMAP task")]
106 ResolveTask(#[from] TaskError),
107 #[error("{0} should contain at least one valid certificate")]
108 EmptyCert(PathBuf),
109 #[error("{1} does not contain a valid certificate")]
110 InvalidCert(tokio_rustls::rustls::pki_types::pem::Error, PathBuf),
111}
112
113pub enum MaybeTlsStream {
114 Plain(TcpStream),
115 Rustls(tokio_rustls::client::TlsStream<TcpStream>),
116}
117
118impl AsyncRead for MaybeTlsStream {
119 fn poll_read(
120 self: Pin<&mut Self>,
121 cx: &mut Context<'_>,
122 buf: &mut ReadBuf<'_>,
123 ) -> Poll<io::Result<()>> {
124 match self.get_mut() {
125 Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
126 Self::Rustls(s) => Pin::new(s).poll_read(cx, buf),
127 }
128 }
129}
130
131impl AsyncWrite for MaybeTlsStream {
132 fn poll_write(
133 self: Pin<&mut Self>,
134 cx: &mut Context<'_>,
135 buf: &[u8],
136 ) -> Poll<io::Result<usize>> {
137 match self.get_mut() {
138 Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
139 Self::Rustls(s) => Pin::new(s).poll_write(cx, buf),
140 }
141 }
142
143 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
144 match self.get_mut() {
145 Self::Plain(s) => Pin::new(s).poll_flush(cx),
146 Self::Rustls(s) => Pin::new(s).poll_flush(cx),
147 }
148 }
149
150 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
151 match self.get_mut() {
152 Self::Plain(s) => Pin::new(s).poll_shutdown(cx),
153 Self::Rustls(s) => Pin::new(s).poll_shutdown(cx),
154 }
155 }
156}
157
158pub struct Client {
159 host: String,
160 pub state: super::Client,
161 pub stream: Stream<MaybeTlsStream>,
162}
163
164impl Client {
169 pub async fn insecure(host: impl ToString, port: u16) -> Result<Self, ClientError> {
175 let mut client = Self::tcp(host, port, false).await?;
176
177 if !client.receive_greeting().await? {
178 client.refresh_capabilities().await?;
179 }
180
181 Ok(client)
182 }
183
184 pub async fn rustls(
190 host: impl ToString,
191 port: u16,
192 starttls: bool,
193 cert: Option<PathBuf>,
194 ) -> Result<Self, ClientError> {
195 let tcp = Self::tcp(host, port, starttls).await?;
196 Self::upgrade_rustls(tcp, starttls, cert).await
197 }
198
199 async fn tcp(
204 host: impl ToString,
205 port: u16,
206 discard_greeting: bool,
207 ) -> Result<Self, ClientError> {
208 let host = host.to_string();
209
210 let tcp_stream = TcpStream::connect((host.as_str(), port))
211 .await
212 .map_err(ClientError::ConnectToTcpStreamError)?;
213
214 let stream = Stream::new(MaybeTlsStream::Plain(tcp_stream));
215
216 let mut opts = ClientOptions::default();
217 opts.crlf_relaxed = true;
218 opts.discard_greeting = discard_greeting;
219
220 let state = super::Client::new(opts);
221
222 Ok(Self {
223 host,
224 stream,
225 state,
226 })
227 }
228
229 async fn upgrade_rustls(
239 mut self,
240 starttls: bool,
241 cert: Option<PathBuf>,
242 ) -> Result<Self, ClientError> {
243 let MaybeTlsStream::Plain(mut tcp_stream) = self.stream.into_inner() else {
244 return Err(ClientError::ClientAlreadyTlsError);
245 };
246
247 if starttls {
248 tcp_stream = RipStarttls::default()
249 .do_starttls_prefix(tcp_stream)
250 .await
251 .map_err(ClientError::DoStarttlsPrefixError)?;
252 }
253
254 let mut config = if let Some(pem_path) = cert {
255 let pem = fs::read(&pem_path).await?;
256
257 let Some(cert) = CertificateDer::pem_slice_iter(&pem).next() else {
258 return Err(ClientError::EmptyCert(pem_path));
259 };
260
261 let cert = match cert {
262 Ok(cert) => cert,
263 Err(err) => return Err(ClientError::InvalidCert(err, pem_path)),
264 };
265
266 let verifier = FingerprintVerifier::new(&cert);
267
268 ClientConfig::builder()
269 .dangerous()
270 .with_custom_certificate_verifier(Arc::new(verifier))
271 .with_no_client_auth()
272 } else {
273 ClientConfig::with_platform_verifier().map_err(ClientError::TlsConfig)?
274 };
275
276 config.alpn_protocols = vec![b"imap".to_vec()];
278
279 let connector = TlsConnector::from(Arc::new(config));
280 let dnsname = ServerName::try_from(self.host.clone()).unwrap();
281
282 let tls_stream = connector
283 .connect(dnsname, tcp_stream)
284 .await
285 .map_err(ClientError::ConnectToTlsStreamError)?;
286
287 self.stream = Stream::new(MaybeTlsStream::Rustls(tls_stream));
288
289 if starttls || !self.receive_greeting().await? {
290 self.refresh_capabilities().await?;
291 }
292
293 Ok(self)
294 }
295
296 async fn receive_greeting(&mut self) -> Result<bool, ClientError> {
303 let evt = self
304 .stream
305 .next(&mut self.state.resolver)
306 .await
307 .map_err(ClientError::ReceiveGreeting)?;
308
309 if let SchedulerEvent::GreetingReceived(greeting) = evt {
310 if let Some(Code::Capability(capabilities)) = greeting.code {
311 self.state.capabilities = capabilities;
312 return Ok(true);
313 }
314 }
315
316 Ok(false)
317 }
318}
319
320impl Client {
326 pub async fn resolve<T: Task>(&mut self, task: T) -> Result<T::Output, ClientError> {
328 Ok(self.stream.next(self.state.resolver.resolve(task)).await?)
329 }
330
331 pub async fn enable(
333 &mut self,
334 capabilities: impl IntoIterator<Item = CapabilityEnable<'_>>,
335 ) -> Result<Option<Vec<CapabilityEnable<'_>>>, ClientError> {
336 if !self.state.ext_enable_supported() {
337 debug!("IMAP ENABLE extension not supported, skipping");
338 return Ok(None);
339 }
340
341 let capabilities: Vec<_> = capabilities
342 .into_iter()
343 .map(IntoStatic::into_static)
344 .collect();
345
346 if capabilities.is_empty() {
347 return Ok(None);
348 }
349
350 let capabilities = Vec1::try_from(capabilities).unwrap();
351
352 Ok(self.resolve(EnableTask::new(capabilities)).await??)
353 }
354
355 pub async fn create(
357 &mut self,
358 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
359 ) -> Result<(), ClientError> {
360 let mbox = mailbox.try_into()?.into_static();
361 Ok(self.resolve(CreateTask::new(mbox)).await??)
362 }
363
364 pub async fn list(
366 &mut self,
367 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
368 mailbox_wildcard: impl TryInto<ListMailbox<'_>, Error = ValidationError>,
369 ) -> Result<
370 Vec<(
371 Mailbox<'static>,
372 Option<QuotedChar>,
373 Vec<FlagNameAttribute<'static>>,
374 )>,
375 ClientError,
376 > {
377 let mbox = mailbox.try_into()?.into_static();
378 let mbox_wcard = mailbox_wildcard.try_into()?.into_static();
379 Ok(self.resolve(ListTask::new(mbox, mbox_wcard)).await??)
380 }
381
382 pub async fn select(
384 &mut self,
385 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
386 ) -> Result<SelectDataUnvalidated, ClientError> {
387 let mbox = mailbox.try_into()?.into_static();
388 Ok(self.resolve(SelectTask::new(mbox)).await??)
389 }
390
391 pub async fn examine(
393 &mut self,
394 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
395 ) -> Result<SelectDataUnvalidated, ClientError> {
396 let mbox = mailbox.try_into()?.into_static();
397 Ok(self.resolve(SelectTask::read_only(mbox)).await??)
398 }
399
400 pub async fn expunge(&mut self) -> Result<Vec<NonZeroU32>, ClientError> {
405 Ok(self.resolve(ExpungeTask::new()).await??)
406 }
407
408 pub async fn delete(
410 &mut self,
411 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
412 ) -> Result<(), ClientError> {
413 let mbox = mailbox.try_into()?.into_static();
414 Ok(self.resolve(DeleteTask::new(mbox)).await??)
415 }
416
417 async fn _search(
419 &mut self,
420 criteria: impl IntoIterator<Item = SearchKey<'_>>,
421 uid: bool,
422 ) -> Result<Vec<NonZeroU32>, ClientError> {
423 let criteria: Vec<_> = criteria.into_iter().map(IntoStatic::into_static).collect();
424
425 let criteria = if criteria.is_empty() {
426 Vec1::from(SearchKey::All)
427 } else {
428 Vec1::try_from(criteria).unwrap()
429 };
430
431 Ok(self
432 .resolve(SearchTask::new(criteria).with_uid(uid))
433 .await??)
434 }
435
436 pub async fn search(
441 &mut self,
442 criteria: impl IntoIterator<Item = SearchKey<'_>>,
443 ) -> Result<Vec<NonZeroU32>, ClientError> {
444 self._search(criteria, false).await
445 }
446
447 pub async fn uid_search(
452 &mut self,
453 criteria: impl IntoIterator<Item = SearchKey<'_>>,
454 ) -> Result<Vec<NonZeroU32>, ClientError> {
455 self._search(criteria, true).await
456 }
457
458 async fn _sort(
461 &mut self,
462 sort_criteria: impl IntoIterator<Item = SortCriterion>,
463 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
464 uid: bool,
465 ) -> Result<Vec<NonZeroU32>, ClientError> {
466 let sort: Vec<_> = sort_criteria.into_iter().collect();
467 let sort = if sort.is_empty() {
468 Vec1::from(SortCriterion {
469 reverse: true,
470 key: SortKey::Date,
471 })
472 } else {
473 Vec1::try_from(sort).unwrap()
474 };
475
476 let search: Vec<_> = search_criteria
477 .into_iter()
478 .map(IntoStatic::into_static)
479 .collect();
480 let search = if search.is_empty() {
481 Vec1::from(SearchKey::All)
482 } else {
483 Vec1::try_from(search).unwrap()
484 };
485
486 Ok(self
487 .resolve(SortTask::new(sort, search).with_uid(uid))
488 .await??)
489 }
490
491 pub async fn sort(
497 &mut self,
498 sort_criteria: impl IntoIterator<Item = SortCriterion>,
499 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
500 ) -> Result<Vec<NonZeroU32>, ClientError> {
501 self._sort(sort_criteria, search_criteria, false).await
502 }
503
504 pub async fn uid_sort(
510 &mut self,
511 sort_criteria: impl IntoIterator<Item = SortCriterion>,
512 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
513 ) -> Result<Vec<NonZeroU32>, ClientError> {
514 self._sort(sort_criteria, search_criteria, true).await
515 }
516
517 async fn _thread(
518 &mut self,
519 algorithm: ThreadingAlgorithm<'_>,
520 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
521 uid: bool,
522 ) -> Result<Vec<Thread>, ClientError> {
523 let alg = algorithm.into_static();
524
525 let search: Vec<_> = search_criteria
526 .into_iter()
527 .map(IntoStatic::into_static)
528 .collect();
529 let search = if search.is_empty() {
530 Vec1::from(SearchKey::All)
531 } else {
532 Vec1::try_from(search).unwrap()
533 };
534
535 Ok(self
536 .resolve(ThreadTask::new(alg, search).with_uid(uid))
537 .await??)
538 }
539
540 pub async fn thread(
541 &mut self,
542 algorithm: ThreadingAlgorithm<'_>,
543 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
544 ) -> Result<Vec<Thread>, ClientError> {
545 self._thread(algorithm, search_criteria, false).await
546 }
547
548 pub async fn uid_thread(
549 &mut self,
550 algorithm: ThreadingAlgorithm<'_>,
551 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
552 ) -> Result<Vec<Thread>, ClientError> {
553 self._thread(algorithm, search_criteria, true).await
554 }
555
556 async fn _store(
557 &mut self,
558 sequence_set: SequenceSet,
559 kind: StoreType,
560 flags: impl IntoIterator<Item = Flag<'_>>,
561 uid: bool,
562 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
563 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
564
565 Ok(self
566 .resolve(StoreTask::new(sequence_set, kind, flags).with_uid(uid))
567 .await??)
568 }
569
570 pub async fn store(
571 &mut self,
572 sequence_set: SequenceSet,
573 kind: StoreType,
574 flags: impl IntoIterator<Item = Flag<'_>>,
575 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
576 self._store(sequence_set, kind, flags, false).await
577 }
578
579 pub async fn uid_store(
580 &mut self,
581 sequence_set: SequenceSet,
582 kind: StoreType,
583 flags: impl IntoIterator<Item = Flag<'_>>,
584 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
585 self._store(sequence_set, kind, flags, true).await
586 }
587
588 async fn _silent_store(
589 &mut self,
590 sequence_set: SequenceSet,
591 kind: StoreType,
592 flags: impl IntoIterator<Item = Flag<'_>>,
593 uid: bool,
594 ) -> Result<(), ClientError> {
595 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
596
597 let task = StoreTask::new(sequence_set, kind, flags)
598 .with_uid(uid)
599 .silent();
600
601 Ok(self.resolve(task).await??)
602 }
603
604 pub async fn silent_store(
605 &mut self,
606 sequence_set: SequenceSet,
607 kind: StoreType,
608 flags: impl IntoIterator<Item = Flag<'_>>,
609 ) -> Result<(), ClientError> {
610 self._silent_store(sequence_set, kind, flags, false).await
611 }
612
613 pub async fn uid_silent_store(
614 &mut self,
615 sequence_set: SequenceSet,
616 kind: StoreType,
617 flags: impl IntoIterator<Item = Flag<'_>>,
618 ) -> Result<(), ClientError> {
619 self._silent_store(sequence_set, kind, flags, true).await
620 }
621
622 pub async fn post_append_noop(&mut self) -> Result<Option<u32>, ClientError> {
623 Ok(self.resolve(PostAppendNoOpTask::new()).await??)
624 }
625
626 pub async fn post_append_check(&mut self) -> Result<Option<u32>, ClientError> {
627 Ok(self.resolve(PostAppendCheckTask::new()).await??)
628 }
629
630 async fn _fetch_first(
631 &mut self,
632 id: NonZeroU32,
633 items: MacroOrMessageDataItemNames<'_>,
634 uid: bool,
635 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
636 let items = items.into_static();
637
638 Ok(self
639 .resolve(FetchFirstTask::new(id, items).with_uid(uid))
640 .await??)
641 }
642
643 pub async fn fetch_first(
644 &mut self,
645 id: NonZeroU32,
646 items: MacroOrMessageDataItemNames<'_>,
647 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
648 self._fetch_first(id, items, false).await
649 }
650
651 pub async fn uid_fetch_first(
652 &mut self,
653 id: NonZeroU32,
654 items: MacroOrMessageDataItemNames<'_>,
655 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
656 self._fetch_first(id, items, true).await
657 }
658
659 async fn _copy(
660 &mut self,
661 sequence_set: SequenceSet,
662 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
663 uid: bool,
664 ) -> Result<(), ClientError> {
665 let mbox = mailbox.try_into()?.into_static();
666
667 Ok(self
668 .resolve(CopyTask::new(sequence_set, mbox).with_uid(uid))
669 .await??)
670 }
671
672 pub async fn copy(
673 &mut self,
674 sequence_set: SequenceSet,
675 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
676 ) -> Result<(), ClientError> {
677 self._copy(sequence_set, mailbox, false).await
678 }
679
680 pub async fn uid_copy(
681 &mut self,
682 sequence_set: SequenceSet,
683 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
684 ) -> Result<(), ClientError> {
685 self._copy(sequence_set, mailbox, true).await
686 }
687
688 async fn _move(
689 &mut self,
690 sequence_set: SequenceSet,
691 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
692 uid: bool,
693 ) -> Result<(), ClientError> {
694 let mbox = mailbox.try_into()?.into_static();
695
696 Ok(self
697 .resolve(MoveTask::new(sequence_set, mbox).with_uid(uid))
698 .await??)
699 }
700
701 pub async fn r#move(
702 &mut self,
703 sequence_set: SequenceSet,
704 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
705 ) -> Result<(), ClientError> {
706 self._move(sequence_set, mailbox, false).await
707 }
708
709 pub async fn uid_move(
710 &mut self,
711 sequence_set: SequenceSet,
712 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
713 ) -> Result<(), ClientError> {
714 self._move(sequence_set, mailbox, true).await
715 }
716
717 pub async fn check(&mut self) -> Result<(), ClientError> {
719 Ok(self.resolve(CheckTask::new()).await??)
720 }
721
722 pub async fn noop(&mut self) -> Result<(), ClientError> {
724 Ok(self.resolve(NoOpTask::new()).await??)
725 }
726}
727
728impl Client {
735 pub async fn refresh_capabilities(&mut self) -> Result<(), ClientError> {
737 self.state.capabilities = self.resolve(CapabilityTask::new()).await??;
738
739 Ok(())
740 }
741
742 pub async fn login(
744 &mut self,
745 username: impl TryInto<AString<'_>, Error = ValidationError>,
746 password: impl TryInto<AString<'_>, Error = ValidationError>,
747 ) -> Result<(), ClientError> {
748 let username = username.try_into()?.into_static();
749 let password = password.try_into()?.into_static();
750 let login = self.resolve(LoginTask::new(username, Secret::new(password)));
751
752 match login.await?? {
753 Some(capabilities) => {
754 self.state.capabilities = capabilities;
755 }
756 None => {
757 self.refresh_capabilities().await?;
758 }
759 };
760
761 Ok(())
762 }
763
764 async fn authenticate(&mut self, task: AuthenticateTask) -> Result<(), ClientError> {
769 match self.resolve(task).await?? {
770 Some(capabilities) => {
771 self.state.capabilities = capabilities;
772 }
773 None => {
774 self.refresh_capabilities().await?;
775 }
776 };
777
778 Ok(())
779 }
780
781 pub async fn authenticate_plain(
783 &mut self,
784 login: impl AsRef<str>,
785 password: impl AsRef<str>,
786 ) -> Result<(), ClientError> {
787 self.authenticate(AuthenticateTask::plain(
788 login.as_ref(),
789 password.as_ref(),
790 self.state.ext_sasl_ir_supported(),
791 ))
792 .await
793 }
794
795 pub async fn authenticate_xoauth2(
797 &mut self,
798 login: impl AsRef<str>,
799 token: impl AsRef<str>,
800 ) -> Result<(), ClientError> {
801 self.authenticate(AuthenticateTask::xoauth2(
802 login.as_ref(),
803 token.as_ref(),
804 self.state.ext_sasl_ir_supported(),
805 ))
806 .await
807 }
808
809 pub async fn authenticate_oauthbearer(
811 &mut self,
812 user: impl AsRef<str>,
813 host: impl AsRef<str>,
814 port: u16,
815 token: impl AsRef<str>,
816 ) -> Result<(), ClientError> {
817 self.authenticate(AuthenticateTask::oauthbearer(
818 user.as_ref(),
819 host.as_ref(),
820 port,
821 token.as_ref(),
822 self.state.ext_sasl_ir_supported(),
823 ))
824 .await
825 }
826
827 pub async fn id(
832 &mut self,
833 params: Option<Vec<(IString<'static>, NString<'static>)>>,
834 ) -> Result<Option<Vec<(IString<'static>, NString<'static>)>>, ClientError> {
835 Ok(if self.state.ext_id_supported() {
836 self.resolve(IdTask::new(params)).await??
837 } else {
838 debug!("IMAP ID extension not supported, skipping");
839 None
840 })
841 }
842
843 pub async fn append(
844 &mut self,
845 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
846 flags: impl IntoIterator<Item = Flag<'_>>,
847 message: impl AsRef<[u8]>,
848 ) -> Result<Option<u32>, ClientError> {
849 let mbox = mailbox.try_into()?.into_static();
850
851 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
852
853 let msg = to_static_literal(message, self.state.ext_binary_supported())?;
854
855 Ok(self
856 .resolve(AppendTask::new(mbox, msg).with_flags(flags))
857 .await??)
858 }
859
860 pub async fn appenduid(
861 &mut self,
862 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
863 flags: impl IntoIterator<Item = Flag<'_>>,
864 message: impl AsRef<[u8]>,
865 ) -> Result<Option<(NonZeroU32, NonZeroU32)>, ClientError> {
866 let mbox = mailbox.try_into()?.into_static();
867
868 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
869
870 let msg = to_static_literal(message, self.state.ext_binary_supported())?;
871
872 Ok(self
873 .resolve(AppendUidTask::new(mbox, msg).with_flags(flags))
874 .await??)
875 }
876}
877
878impl Client {
885 async fn _fetch(
886 &mut self,
887 sequence_set: SequenceSet,
888 items: MacroOrMessageDataItemNames<'_>,
889 uid: bool,
890 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
891 let mut items = match items {
892 MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
893 MacroOrMessageDataItemNames::MessageDataItemNames(items) => items.into_static(),
894 };
895
896 if uid {
897 items.push(MessageDataItemName::Uid);
898 }
899
900 let seq_map = self
901 .resolve(FetchTask::new(sequence_set, items.into()).with_uid(uid))
902 .await??;
903
904 if uid {
905 let mut uid_map = HashMap::new();
906
907 for (seq, items) in seq_map {
908 let uid = items.as_ref().iter().find_map(|item| {
909 if let MessageDataItem::Uid(uid) = item {
910 Some(*uid)
911 } else {
912 None
913 }
914 });
915
916 match uid {
917 Some(uid) => {
918 uid_map.insert(uid, items);
919 }
920 None => {
921 debug!(?seq, "cannot get message uid, skipping it");
922 }
923 }
924 }
925
926 Ok(uid_map)
927 } else {
928 Ok(seq_map)
929 }
930 }
931
932 pub async fn fetch(
933 &mut self,
934 sequence_set: SequenceSet,
935 items: MacroOrMessageDataItemNames<'_>,
936 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
937 self._fetch(sequence_set, items, false).await
938 }
939
940 pub async fn uid_fetch(
941 &mut self,
942 sequence_set: SequenceSet,
943 items: MacroOrMessageDataItemNames<'_>,
944 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
945 self._fetch(sequence_set, items, true).await
946 }
947
948 async fn _sort_or_fallback(
949 &mut self,
950 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
951 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
952 fetch_items: MacroOrMessageDataItemNames<'_>,
953 uid: bool,
954 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
955 let mut fetch_items = match fetch_items {
956 MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
957 MacroOrMessageDataItemNames::MessageDataItemNames(items) => items,
958 };
959
960 if uid && !fetch_items.contains(&MessageDataItemName::Uid) {
961 fetch_items.push(MessageDataItemName::Uid);
962 }
963
964 let mut fetches = HashMap::new();
965
966 if self.state.ext_sort_supported() {
967 let fetch_items = MacroOrMessageDataItemNames::MessageDataItemNames(fetch_items);
968 let ids = self._sort(sort_criteria, search_criteria, uid).await?;
969 let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
970 let ids_chunks_len = ids_chunks.len();
971
972 for (n, ids) in ids_chunks.enumerate() {
973 debug!(?ids, "fetching sort envelopes {}/{ids_chunks_len}", n + 1);
974 let ids = SequenceSet::try_from(ids.to_vec())?;
975 let items = fetch_items.clone();
976 fetches.extend(self._fetch(ids, items, uid).await?);
977 }
978
979 let items = ids.into_iter().flat_map(|id| fetches.remove(&id)).collect();
980
981 Ok(items)
982 } else {
983 debug!("IMAP SORT extension not supported, using fallback");
984
985 let ids = self._search(search_criteria, uid).await?;
986 let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
987 let ids_chunks_len = ids_chunks.len();
988
989 sort_criteria
990 .clone()
991 .into_iter()
992 .filter_map(|criterion| match criterion.key {
993 SortKey::Arrival => Some(MessageDataItemName::InternalDate),
994 SortKey::Cc => Some(MessageDataItemName::Envelope),
995 SortKey::Date => Some(MessageDataItemName::Envelope),
996 SortKey::From => Some(MessageDataItemName::Envelope),
997 SortKey::Size => Some(MessageDataItemName::Rfc822Size),
998 SortKey::Subject => Some(MessageDataItemName::Envelope),
999 SortKey::To => Some(MessageDataItemName::Envelope),
1000 SortKey::DisplayFrom => None,
1001 SortKey::DisplayTo => None,
1002 })
1003 .for_each(|item| {
1004 if !fetch_items.contains(&item) {
1005 fetch_items.push(item)
1006 }
1007 });
1008
1009 for (n, ids) in ids_chunks.enumerate() {
1010 debug!(?ids, "fetching search envelopes {}/{ids_chunks_len}", n + 1);
1011 let ids = SequenceSet::try_from(ids.to_vec())?;
1012 let items = fetch_items.clone();
1013 fetches.extend(self._fetch(ids, items.into(), uid).await?);
1014 }
1015
1016 let mut fetches: Vec<_> = fetches.into_values().collect();
1017
1018 fetches.sort_by(|a, b| {
1019 for criterion in sort_criteria.clone().into_iter() {
1020 let mut cmp = cmp_fetch_items(&criterion.key, a, b);
1021
1022 if criterion.reverse {
1023 cmp = cmp.reverse();
1024 }
1025
1026 if cmp.is_ne() {
1027 return cmp;
1028 }
1029 }
1030
1031 cmp_fetch_items(&SortKey::Date, a, b)
1032 });
1033
1034 Ok(fetches)
1035 }
1036 }
1037
1038 pub async fn sort_or_fallback(
1039 &mut self,
1040 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1041 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1042 fetch_items: MacroOrMessageDataItemNames<'_>,
1043 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1044 self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, false)
1045 .await
1046 }
1047
1048 pub async fn uid_sort_or_fallback(
1049 &mut self,
1050 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1051 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1052 fetch_items: MacroOrMessageDataItemNames<'_>,
1053 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1054 self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, true)
1055 .await
1056 }
1057
1058 pub async fn appenduid_or_fallback(
1059 &mut self,
1060 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError> + Clone,
1061 flags: impl IntoIterator<Item = Flag<'_>>,
1062 message: impl AsRef<[u8]>,
1063 ) -> Result<Option<NonZeroU32>, ClientError> {
1064 if self.state.ext_uidplus_supported() {
1065 Ok(self
1066 .appenduid(mailbox, flags, message)
1067 .await?
1068 .map(|(uid, _)| uid))
1069 } else {
1070 debug!("IMAP UIDPLUS extension not supported, using fallback");
1071
1072 self.select(mailbox.clone()).await?;
1081
1082 let seq = match self.append(mailbox, flags, message).await? {
1083 Some(seq) => seq,
1084 None => match self.post_append_noop().await? {
1085 Some(seq) => seq,
1086 None => self
1087 .post_append_check()
1088 .await?
1089 .ok_or(ClientError::ResolveTask(TaskError::MissingData(
1090 "APPENDUID: seq".into(),
1091 )))?,
1092 },
1093 };
1094
1095 let uid = self
1096 .search(Vec1::from(SearchKey::SequenceSet(seq.try_into().unwrap())))
1097 .await?
1098 .into_iter()
1099 .next();
1100
1101 Ok(uid)
1102 }
1103 }
1104
1105 async fn _move_or_fallback(
1106 &mut self,
1107 sequence_set: SequenceSet,
1108 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1109 uid: bool,
1110 ) -> Result<(), ClientError> {
1111 if self.state.ext_move_supported() {
1112 self._move(sequence_set, mailbox, uid).await
1113 } else {
1114 debug!("IMAP MOVE extension not supported, using fallback");
1115 self._copy(sequence_set.clone(), mailbox, uid).await?;
1116 self._silent_store(sequence_set, StoreType::Add, Some(Flag::Deleted), uid)
1117 .await?;
1118 self.expunge().await?;
1119 Ok(())
1120 }
1121 }
1122
1123 pub async fn move_or_fallback(
1124 &mut self,
1125 sequence_set: SequenceSet,
1126 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1127 ) -> Result<(), ClientError> {
1128 self._move_or_fallback(sequence_set, mailbox, false).await
1129 }
1130
1131 pub async fn uid_move_or_fallback(
1132 &mut self,
1133 sequence_set: SequenceSet,
1134 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1135 ) -> Result<(), ClientError> {
1136 self._move_or_fallback(sequence_set, mailbox, true).await
1137 }
1138
1139 pub fn enqueue_idle(&mut self) -> Tag<'static> {
1140 let tag = self.state.resolver.scheduler.tag_generator.generate();
1141
1142 self.state
1143 .resolver
1144 .scheduler
1145 .client_next
1146 .enqueue_command(Command {
1147 tag: tag.clone(),
1148 body: CommandBody::Idle,
1149 });
1150
1151 tag.into_static()
1152 }
1153
1154 #[tracing::instrument(name = "idle", skip_all)]
1155 pub async fn idle(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1156 debug!("starting the main loop");
1157
1158 loop {
1159 let progress = self
1160 .stream
1161 .next(&mut self.state.resolver.scheduler.client_next);
1162 match timeout(self.state.idle_timeout, progress).await.ok() {
1163 None => {
1164 debug!("timed out, sending done command…");
1165 self.state.resolver.scheduler.client_next.set_idle_done();
1166 }
1167 Some(Err(err)) => {
1168 break Err(err);
1169 }
1170 Some(Ok(Event::IdleCommandSent { .. })) => {
1171 debug!("command sent");
1172 }
1173 Some(Ok(Event::IdleAccepted { .. })) => {
1174 debug!("command accepted, entering idle mode");
1175 }
1176 Some(Ok(Event::IdleRejected { status, .. })) => {
1177 debug!("command rejected, aborting: {status:?}");
1178 break Ok(());
1179 }
1180 Some(Ok(Event::IdleDoneSent { .. })) => {
1181 debug!("done command sent");
1182 }
1183 Some(Ok(Event::DataReceived { data })) => {
1184 debug!("received data, sending done command…");
1185 trace!("{data:#?}");
1186 self.state.resolver.scheduler.client_next.set_idle_done();
1187 }
1188 Some(Ok(Event::StatusReceived {
1189 status:
1190 Status::Tagged(Tagged {
1191 tag: ref got_tag, ..
1192 }),
1193 })) if *got_tag == tag => {
1194 debug!("received tagged response, exiting");
1195 break Ok(());
1196 }
1197 Some(event) => {
1198 debug!("received unknown event, ignoring: {event:?}");
1199 }
1200 }
1201 }
1202 }
1203
1204 #[tracing::instrument(name = "idle/done", skip_all)]
1205 pub async fn idle_done(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1206 self.state.resolver.scheduler.client_next.set_idle_done();
1207
1208 loop {
1209 let progress = self
1210 .stream
1211 .next(&mut self.state.resolver.scheduler.client_next)
1212 .await?;
1213
1214 match progress {
1215 Event::IdleDoneSent { .. } => {
1216 debug!("done command sent");
1217 }
1218 Event::StatusReceived {
1219 status:
1220 Status::Tagged(Tagged {
1221 tag: ref got_tag, ..
1222 }),
1223 } if *got_tag == tag => {
1224 debug!("received tagged response, exiting");
1225 break Ok(());
1226 }
1227 event => {
1228 debug!("received unknown event, ignoring: {event:?}");
1229 }
1230 }
1231 }
1232 }
1233}
1234
1235pub(crate) fn cmp_fetch_items(
1236 criterion: &SortKey,
1237 a: &Vec1<MessageDataItem>,
1238 b: &Vec1<MessageDataItem>,
1239) -> Ordering {
1240 use MessageDataItem::*;
1241
1242 match &criterion {
1243 SortKey::Arrival => {
1244 let a = a.as_ref().iter().find_map(|a| {
1245 if let InternalDate(dt) = a {
1246 Some(dt.as_ref())
1247 } else {
1248 None
1249 }
1250 });
1251
1252 let b = b.as_ref().iter().find_map(|b| {
1253 if let InternalDate(dt) = b {
1254 Some(dt.as_ref())
1255 } else {
1256 None
1257 }
1258 });
1259
1260 a.cmp(&b)
1261 }
1262 SortKey::Date => {
1263 let a = a.as_ref().iter().find_map(|a| {
1264 if let Envelope(envelope) = a {
1265 envelope.date.0.as_ref().map(AsRef::as_ref)
1266 } else {
1267 None
1268 }
1269 });
1270
1271 let b = b.as_ref().iter().find_map(|b| {
1272 if let Envelope(envelope) = b {
1273 envelope.date.0.as_ref().map(AsRef::as_ref)
1274 } else {
1275 None
1276 }
1277 });
1278
1279 a.cmp(&b)
1280 }
1281 SortKey::Size => {
1282 let a = a.as_ref().iter().find_map(|a| {
1283 if let Rfc822Size(size) = a {
1284 Some(size)
1285 } else {
1286 None
1287 }
1288 });
1289
1290 let b = b.as_ref().iter().find_map(|b| {
1291 if let Rfc822Size(size) = b {
1292 Some(size)
1293 } else {
1294 None
1295 }
1296 });
1297
1298 a.cmp(&b)
1299 }
1300 SortKey::Subject => {
1301 let a = a.as_ref().iter().find_map(|a| {
1302 if let Envelope(envelope) = a {
1303 envelope.subject.0.as_ref().map(AsRef::as_ref)
1304 } else {
1305 None
1306 }
1307 });
1308
1309 let b = b.as_ref().iter().find_map(|b| {
1310 if let Envelope(envelope) = b {
1311 envelope.subject.0.as_ref().map(AsRef::as_ref)
1312 } else {
1313 None
1314 }
1315 });
1316
1317 a.cmp(&b)
1318 }
1319 SortKey::Cc | SortKey::From | SortKey::To | SortKey::DisplayFrom | SortKey::DisplayTo => {
1321 Ordering::Equal
1322 }
1323 }
1324}
1325
1326pub(crate) fn to_static_literal(
1327 message: impl AsRef<[u8]>,
1328 ext_binary_supported: bool,
1329) -> Result<LiteralOrLiteral8<'static>, ValidationError> {
1330 let message = if ext_binary_supported {
1331 LiteralOrLiteral8::Literal8(Literal8 {
1332 data: message.as_ref().into(),
1333 mode: LiteralMode::Sync,
1334 })
1335 } else {
1336 debug!("IMAP BINARY extension not supported, using fallback");
1337 Literal::validate(message.as_ref())?;
1338 LiteralOrLiteral8::Literal(Literal::unvalidated(message.as_ref()))
1339 };
1340
1341 Ok(message.into_static())
1342}