1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 io,
5 num::NonZeroU32,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use imap_next::{
11 client::{Error as NextError, Event, Options as ClientOptions},
12 imap_types::{
13 command::{Command, CommandBody},
14 core::{AString, IString, Literal, LiteralMode, NString, QuotedChar, Tag, Vec1},
15 error::ValidationError,
16 extensions::{
17 binary::{Literal8, LiteralOrLiteral8},
18 enable::CapabilityEnable,
19 sort::{SortCriterion, SortKey},
20 thread::{Thread, ThreadingAlgorithm},
21 },
22 fetch::{MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName},
23 flag::{Flag, FlagNameAttribute, StoreType},
24 mailbox::{ListMailbox, Mailbox},
25 response::{Code, Status, Tagged},
26 search::SearchKey,
27 secret::Secret,
28 sequence::SequenceSet,
29 IntoStatic,
30 },
31};
32#[cfg(any(feature = "tokio-rustls", feature = "tokio-native-tls"))]
33use rip_starttls::imap::tokio::RipStarttls;
34use thiserror::Error;
35use tokio::{
36 io::{AsyncRead, AsyncWrite, ReadBuf},
37 net::TcpStream,
38 time::timeout,
39};
40use tracing::{debug, trace, warn};
41
42use crate::{
43 stream::{self, Stream},
44 tasks::{
45 tasks::{
46 append::{AppendTask, PostAppendCheckTask, PostAppendNoOpTask},
47 appenduid::AppendUidTask,
48 authenticate::AuthenticateTask,
49 capability::CapabilityTask,
50 check::CheckTask,
51 copy::CopyTask,
52 create::CreateTask,
53 delete::DeleteTask,
54 enable::EnableTask,
55 expunge::ExpungeTask,
56 fetch::{FetchFirstTask, FetchTask},
57 id::IdTask,
58 list::ListTask,
59 login::LoginTask,
60 noop::NoOpTask,
61 r#move::MoveTask,
62 search::SearchTask,
63 select::{SelectDataUnvalidated, SelectTask},
64 sort::SortTask,
65 store::StoreTask,
66 thread::ThreadTask,
67 TaskError,
68 },
69 SchedulerError, SchedulerEvent, Task,
70 },
71};
72
/// Chunk size used when a list of message ids is split into several
/// FETCH requests (see the `ids.chunks(MAX_SEQUENCE_SIZE as usize)`
/// calls in `_sort_or_fallback`).
static MAX_SEQUENCE_SIZE: u8 = u8::MAX;

/// Errors returned by the Tokio-based IMAP [`Client`].
#[derive(Debug, Error)]
pub enum ClientError {
    /// Returned by the TLS upgrade helpers when the wrapped stream is
    /// not a plain TCP stream.
    #[error("cannot upgrade client to TLS: client is already in TLS state")]
    ClientAlreadyTlsError,
    /// The STARTTLS prefix exchange failed. Because of `#[from]`, any
    /// bare `io::Error` propagated with `?` also lands here.
    #[error("cannot do STARTTLS prefix")]
    DoStarttlsPrefixError(#[from] io::Error),
    /// The underlying stream failed while driving the scheduler.
    #[error("stream error")]
    Stream(#[from] stream::Error<SchedulerError>),
    /// An argument could not be converted into a valid IMAP type.
    #[error("validation error")]
    Validation(#[from] ValidationError),
    /// `TcpStream::connect` failed.
    #[error("cannot connect to TCP stream")]
    ConnectToTcpStreamError(#[source] io::Error),
    /// The rustls connector failed to establish the TLS session.
    #[error("cannot connect to TLS stream")]
    ConnectToTlsStreamError(#[source] io::Error),

    /// The native-tls connector failed to establish the TLS session.
    #[cfg(feature = "tokio-native-tls")]
    #[error("cannot connect to native TLS stream")]
    ConnectToNativeTlsStreamError(#[source] tokio_native_tls::native_tls::Error),
    /// The native-tls connector could not be built.
    #[cfg(feature = "tokio-native-tls")]
    #[error("cannot create native TLS connector")]
    CreateNativeTlsConnectorError(#[source] tokio_native_tls::native_tls::Error),

    /// Reading the first event from the stream failed while waiting
    /// for the server greeting.
    #[error("cannot receive greeting from server")]
    ReceiveGreeting(#[source] stream::Error<SchedulerError>),
    /// A task ran to completion but reported a [`TaskError`].
    #[error("cannot resolve IMAP task")]
    ResolveTask(#[from] TaskError),
}
102
/// Transport used by [`Client`]: either a plain TCP stream or the
/// same TCP stream wrapped by one of the optional TLS backends.
pub enum MaybeTlsStream {
    /// Unencrypted TCP connection.
    Plain(TcpStream),
    /// TCP connection secured by rustls.
    #[cfg(feature = "tokio-rustls")]
    Rustls(tokio_rustls::client::TlsStream<TcpStream>),
    /// TCP connection secured by native-tls.
    #[cfg(feature = "tokio-native-tls")]
    NativeTls(tokio_native_tls::TlsStream<TcpStream>),
}
110
111impl AsyncRead for MaybeTlsStream {
112 fn poll_read(
113 self: Pin<&mut Self>,
114 cx: &mut Context<'_>,
115 buf: &mut ReadBuf<'_>,
116 ) -> Poll<io::Result<()>> {
117 match self.get_mut() {
118 Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
119 #[cfg(feature = "tokio-rustls")]
120 Self::Rustls(s) => Pin::new(s).poll_read(cx, buf),
121 #[cfg(feature = "tokio-native-tls")]
122 Self::NativeTls(s) => Pin::new(s).poll_read(cx, buf),
123 }
124 }
125}
126
127impl AsyncWrite for MaybeTlsStream {
128 fn poll_write(
129 self: Pin<&mut Self>,
130 cx: &mut Context<'_>,
131 buf: &[u8],
132 ) -> Poll<io::Result<usize>> {
133 match self.get_mut() {
134 Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
135 #[cfg(feature = "tokio-rustls")]
136 Self::Rustls(s) => Pin::new(s).poll_write(cx, buf),
137 #[cfg(feature = "tokio-native-tls")]
138 Self::NativeTls(s) => Pin::new(s).poll_write(cx, buf),
139 }
140 }
141
142 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
143 match self.get_mut() {
144 Self::Plain(s) => Pin::new(s).poll_flush(cx),
145 #[cfg(feature = "tokio-rustls")]
146 Self::Rustls(s) => Pin::new(s).poll_flush(cx),
147 #[cfg(feature = "tokio-native-tls")]
148 Self::NativeTls(s) => Pin::new(s).poll_flush(cx),
149 }
150 }
151
152 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
153 match self.get_mut() {
154 Self::Plain(s) => Pin::new(s).poll_shutdown(cx),
155 #[cfg(feature = "tokio-rustls")]
156 Self::Rustls(s) => Pin::new(s).poll_shutdown(cx),
157 #[cfg(feature = "tokio-native-tls")]
158 Self::NativeTls(s) => Pin::new(s).poll_shutdown(cx),
159 }
160 }
161}
162
/// IMAP client bound to a (possibly TLS-wrapped) TCP stream.
pub struct Client {
    /// Host name given at connection time; reused as the TLS server
    /// name by the upgrade helpers.
    host: String,
    /// Protocol state: task resolver, cached capabilities, idle
    /// timeout.
    pub state: super::Client,
    /// Stream wrapper used to drive the state over the socket.
    pub stream: Stream<MaybeTlsStream>,
}
168
169impl Client {
174 pub async fn insecure(host: impl ToString, port: u16) -> Result<Self, ClientError> {
180 let mut client = Self::tcp(host, port, false).await?;
181
182 if !client.receive_greeting().await? {
183 client.refresh_capabilities().await?;
184 }
185
186 Ok(client)
187 }
188
189 #[cfg(feature = "tokio-rustls")]
195 pub async fn rustls(
196 host: impl ToString,
197 port: u16,
198 starttls: bool,
199 ) -> Result<Self, ClientError> {
200 let tcp = Self::tcp(host, port, starttls).await?;
201 Self::upgrade_rustls(tcp, starttls).await
202 }
203
204 #[cfg(feature = "tokio-native-tls")]
210 pub async fn native_tls(
211 host: impl ToString,
212 port: u16,
213 starttls: bool,
214 ) -> Result<Self, ClientError> {
215 let tcp = Self::tcp(host, port, starttls).await?;
216 Self::upgrade_native_tls(tcp, starttls).await
217 }
218
219 async fn tcp(
224 host: impl ToString,
225 port: u16,
226 discard_greeting: bool,
227 ) -> Result<Self, ClientError> {
228 let host = host.to_string();
229
230 let tcp_stream = TcpStream::connect((host.as_str(), port))
231 .await
232 .map_err(ClientError::ConnectToTcpStreamError)?;
233
234 let stream = Stream::new(MaybeTlsStream::Plain(tcp_stream));
235
236 let mut opts = ClientOptions::default();
237 opts.crlf_relaxed = true;
238 opts.discard_greeting = discard_greeting;
239
240 let state = super::Client::new(opts);
241
242 Ok(Self {
243 host,
244 stream,
245 state,
246 })
247 }
248
249 #[cfg(feature = "tokio-rustls")]
259 async fn upgrade_rustls(mut self, starttls: bool) -> Result<Self, ClientError> {
260 use std::sync::Arc;
261
262 use rustls_platform_verifier::ConfigVerifierExt;
263 use tokio_rustls::{
264 rustls::{pki_types::ServerName, ClientConfig},
265 TlsConnector,
266 };
267
268 let MaybeTlsStream::Plain(mut tcp_stream) = self.stream.into_inner() else {
269 return Err(ClientError::ClientAlreadyTlsError);
270 };
271
272 if starttls {
273 tcp_stream = RipStarttls::default()
274 .do_starttls_prefix(tcp_stream)
275 .await
276 .map_err(ClientError::DoStarttlsPrefixError)?;
277 }
278
279 let mut config = ClientConfig::with_platform_verifier();
280
281 config.alpn_protocols = vec![b"imap".to_vec()];
283
284 let connector = TlsConnector::from(Arc::new(config));
285 let dnsname = ServerName::try_from(self.host.clone()).unwrap();
286
287 let tls_stream = connector
288 .connect(dnsname, tcp_stream)
289 .await
290 .map_err(ClientError::ConnectToTlsStreamError)?;
291
292 self.stream = Stream::new(MaybeTlsStream::Rustls(tls_stream));
293
294 if starttls || !self.receive_greeting().await? {
295 self.refresh_capabilities().await?;
296 }
297
298 Ok(self)
299 }
300
301 #[cfg(feature = "tokio-native-tls")]
311 async fn upgrade_native_tls(mut self, starttls: bool) -> Result<Self, ClientError> {
312 use tokio_native_tls::{native_tls, TlsConnector};
313
314 let MaybeTlsStream::Plain(mut tcp_stream) = self.stream.into_inner() else {
315 return Err(ClientError::ClientAlreadyTlsError);
316 };
317
318 if starttls {
319 tcp_stream = RipStarttls::default()
320 .do_starttls_prefix(tcp_stream)
321 .await
322 .map_err(ClientError::DoStarttlsPrefixError)?;
323 }
324
325 let connector =
326 native_tls::TlsConnector::new().map_err(ClientError::CreateNativeTlsConnectorError)?;
327 let connector = TlsConnector::from(connector);
328
329 let tls_stream = connector
330 .connect(&self.host, tcp_stream)
331 .await
332 .map_err(ClientError::ConnectToNativeTlsStreamError)?;
333
334 self.stream = Stream::new(MaybeTlsStream::NativeTls(tls_stream));
335
336 if starttls || !self.receive_greeting().await? {
337 self.refresh_capabilities().await?;
338 }
339
340 Ok(self)
341 }
342
343 async fn receive_greeting(&mut self) -> Result<bool, ClientError> {
350 let evt = self
351 .stream
352 .next(&mut self.state.resolver)
353 .await
354 .map_err(ClientError::ReceiveGreeting)?;
355
356 if let SchedulerEvent::GreetingReceived(greeting) = evt {
357 if let Some(Code::Capability(capabilities)) = greeting.code {
358 self.state.capabilities = capabilities;
359 return Ok(true);
360 }
361 }
362
363 Ok(false)
364 }
365}
366
367impl Client {
373 pub async fn resolve<T: Task>(&mut self, task: T) -> Result<T::Output, ClientError> {
375 Ok(self.stream.next(self.state.resolver.resolve(task)).await?)
376 }
377
378 pub async fn enable(
380 &mut self,
381 capabilities: impl IntoIterator<Item = CapabilityEnable<'_>>,
382 ) -> Result<Option<Vec<CapabilityEnable<'_>>>, ClientError> {
383 if !self.state.ext_enable_supported() {
384 warn!("IMAP ENABLE extension not supported, skipping");
385 return Ok(None);
386 }
387
388 let capabilities: Vec<_> = capabilities
389 .into_iter()
390 .map(IntoStatic::into_static)
391 .collect();
392
393 if capabilities.is_empty() {
394 return Ok(None);
395 }
396
397 let capabilities = Vec1::try_from(capabilities).unwrap();
398
399 Ok(self.resolve(EnableTask::new(capabilities)).await??)
400 }
401
402 pub async fn create(
404 &mut self,
405 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
406 ) -> Result<(), ClientError> {
407 let mbox = mailbox.try_into()?.into_static();
408 Ok(self.resolve(CreateTask::new(mbox)).await??)
409 }
410
411 pub async fn list(
413 &mut self,
414 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
415 mailbox_wildcard: impl TryInto<ListMailbox<'_>, Error = ValidationError>,
416 ) -> Result<
417 Vec<(
418 Mailbox<'static>,
419 Option<QuotedChar>,
420 Vec<FlagNameAttribute<'static>>,
421 )>,
422 ClientError,
423 > {
424 let mbox = mailbox.try_into()?.into_static();
425 let mbox_wcard = mailbox_wildcard.try_into()?.into_static();
426 Ok(self.resolve(ListTask::new(mbox, mbox_wcard)).await??)
427 }
428
429 pub async fn select(
431 &mut self,
432 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
433 ) -> Result<SelectDataUnvalidated, ClientError> {
434 let mbox = mailbox.try_into()?.into_static();
435 Ok(self.resolve(SelectTask::new(mbox)).await??)
436 }
437
438 pub async fn examine(
440 &mut self,
441 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
442 ) -> Result<SelectDataUnvalidated, ClientError> {
443 let mbox = mailbox.try_into()?.into_static();
444 Ok(self.resolve(SelectTask::read_only(mbox)).await??)
445 }
446
447 pub async fn expunge(&mut self) -> Result<Vec<NonZeroU32>, ClientError> {
452 Ok(self.resolve(ExpungeTask::new()).await??)
453 }
454
455 pub async fn delete(
457 &mut self,
458 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
459 ) -> Result<(), ClientError> {
460 let mbox = mailbox.try_into()?.into_static();
461 Ok(self.resolve(DeleteTask::new(mbox)).await??)
462 }
463
464 async fn _search(
466 &mut self,
467 criteria: impl IntoIterator<Item = SearchKey<'_>>,
468 uid: bool,
469 ) -> Result<Vec<NonZeroU32>, ClientError> {
470 let criteria: Vec<_> = criteria.into_iter().map(IntoStatic::into_static).collect();
471
472 let criteria = if criteria.is_empty() {
473 Vec1::from(SearchKey::All)
474 } else {
475 Vec1::try_from(criteria).unwrap()
476 };
477
478 Ok(self
479 .resolve(SearchTask::new(criteria).with_uid(uid))
480 .await??)
481 }
482
483 pub async fn search(
488 &mut self,
489 criteria: impl IntoIterator<Item = SearchKey<'_>>,
490 ) -> Result<Vec<NonZeroU32>, ClientError> {
491 self._search(criteria, false).await
492 }
493
494 pub async fn uid_search(
499 &mut self,
500 criteria: impl IntoIterator<Item = SearchKey<'_>>,
501 ) -> Result<Vec<NonZeroU32>, ClientError> {
502 self._search(criteria, true).await
503 }
504
505 async fn _sort(
508 &mut self,
509 sort_criteria: impl IntoIterator<Item = SortCriterion>,
510 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
511 uid: bool,
512 ) -> Result<Vec<NonZeroU32>, ClientError> {
513 let sort: Vec<_> = sort_criteria.into_iter().collect();
514 let sort = if sort.is_empty() {
515 Vec1::from(SortCriterion {
516 reverse: true,
517 key: SortKey::Date,
518 })
519 } else {
520 Vec1::try_from(sort).unwrap()
521 };
522
523 let search: Vec<_> = search_criteria
524 .into_iter()
525 .map(IntoStatic::into_static)
526 .collect();
527 let search = if search.is_empty() {
528 Vec1::from(SearchKey::All)
529 } else {
530 Vec1::try_from(search).unwrap()
531 };
532
533 Ok(self
534 .resolve(SortTask::new(sort, search).with_uid(uid))
535 .await??)
536 }
537
538 pub async fn sort(
544 &mut self,
545 sort_criteria: impl IntoIterator<Item = SortCriterion>,
546 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
547 ) -> Result<Vec<NonZeroU32>, ClientError> {
548 self._sort(sort_criteria, search_criteria, false).await
549 }
550
551 pub async fn uid_sort(
557 &mut self,
558 sort_criteria: impl IntoIterator<Item = SortCriterion>,
559 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
560 ) -> Result<Vec<NonZeroU32>, ClientError> {
561 self._sort(sort_criteria, search_criteria, true).await
562 }
563
564 async fn _thread(
565 &mut self,
566 algorithm: ThreadingAlgorithm<'_>,
567 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
568 uid: bool,
569 ) -> Result<Vec<Thread>, ClientError> {
570 let alg = algorithm.into_static();
571
572 let search: Vec<_> = search_criteria
573 .into_iter()
574 .map(IntoStatic::into_static)
575 .collect();
576 let search = if search.is_empty() {
577 Vec1::from(SearchKey::All)
578 } else {
579 Vec1::try_from(search).unwrap()
580 };
581
582 Ok(self
583 .resolve(ThreadTask::new(alg, search).with_uid(uid))
584 .await??)
585 }
586
587 pub async fn thread(
588 &mut self,
589 algorithm: ThreadingAlgorithm<'_>,
590 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
591 ) -> Result<Vec<Thread>, ClientError> {
592 self._thread(algorithm, search_criteria, false).await
593 }
594
595 pub async fn uid_thread(
596 &mut self,
597 algorithm: ThreadingAlgorithm<'_>,
598 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
599 ) -> Result<Vec<Thread>, ClientError> {
600 self._thread(algorithm, search_criteria, true).await
601 }
602
603 async fn _store(
604 &mut self,
605 sequence_set: SequenceSet,
606 kind: StoreType,
607 flags: impl IntoIterator<Item = Flag<'_>>,
608 uid: bool,
609 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
610 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
611
612 Ok(self
613 .resolve(StoreTask::new(sequence_set, kind, flags).with_uid(uid))
614 .await??)
615 }
616
617 pub async fn store(
618 &mut self,
619 sequence_set: SequenceSet,
620 kind: StoreType,
621 flags: impl IntoIterator<Item = Flag<'_>>,
622 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
623 self._store(sequence_set, kind, flags, false).await
624 }
625
626 pub async fn uid_store(
627 &mut self,
628 sequence_set: SequenceSet,
629 kind: StoreType,
630 flags: impl IntoIterator<Item = Flag<'_>>,
631 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
632 self._store(sequence_set, kind, flags, true).await
633 }
634
635 async fn _silent_store(
636 &mut self,
637 sequence_set: SequenceSet,
638 kind: StoreType,
639 flags: impl IntoIterator<Item = Flag<'_>>,
640 uid: bool,
641 ) -> Result<(), ClientError> {
642 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
643
644 let task = StoreTask::new(sequence_set, kind, flags)
645 .with_uid(uid)
646 .silent();
647
648 Ok(self.resolve(task).await??)
649 }
650
651 pub async fn silent_store(
652 &mut self,
653 sequence_set: SequenceSet,
654 kind: StoreType,
655 flags: impl IntoIterator<Item = Flag<'_>>,
656 ) -> Result<(), ClientError> {
657 self._silent_store(sequence_set, kind, flags, false).await
658 }
659
660 pub async fn uid_silent_store(
661 &mut self,
662 sequence_set: SequenceSet,
663 kind: StoreType,
664 flags: impl IntoIterator<Item = Flag<'_>>,
665 ) -> Result<(), ClientError> {
666 self._silent_store(sequence_set, kind, flags, true).await
667 }
668
669 pub async fn post_append_noop(&mut self) -> Result<Option<u32>, ClientError> {
670 Ok(self.resolve(PostAppendNoOpTask::new()).await??)
671 }
672
673 pub async fn post_append_check(&mut self) -> Result<Option<u32>, ClientError> {
674 Ok(self.resolve(PostAppendCheckTask::new()).await??)
675 }
676
677 async fn _fetch_first(
678 &mut self,
679 id: NonZeroU32,
680 items: MacroOrMessageDataItemNames<'_>,
681 uid: bool,
682 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
683 let items = items.into_static();
684
685 Ok(self
686 .resolve(FetchFirstTask::new(id, items).with_uid(uid))
687 .await??)
688 }
689
690 pub async fn fetch_first(
691 &mut self,
692 id: NonZeroU32,
693 items: MacroOrMessageDataItemNames<'_>,
694 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
695 self._fetch_first(id, items, false).await
696 }
697
698 pub async fn uid_fetch_first(
699 &mut self,
700 id: NonZeroU32,
701 items: MacroOrMessageDataItemNames<'_>,
702 ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
703 self._fetch_first(id, items, true).await
704 }
705
706 async fn _copy(
707 &mut self,
708 sequence_set: SequenceSet,
709 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
710 uid: bool,
711 ) -> Result<(), ClientError> {
712 let mbox = mailbox.try_into()?.into_static();
713
714 Ok(self
715 .resolve(CopyTask::new(sequence_set, mbox).with_uid(uid))
716 .await??)
717 }
718
719 pub async fn copy(
720 &mut self,
721 sequence_set: SequenceSet,
722 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
723 ) -> Result<(), ClientError> {
724 self._copy(sequence_set, mailbox, false).await
725 }
726
727 pub async fn uid_copy(
728 &mut self,
729 sequence_set: SequenceSet,
730 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
731 ) -> Result<(), ClientError> {
732 self._copy(sequence_set, mailbox, true).await
733 }
734
735 async fn _move(
736 &mut self,
737 sequence_set: SequenceSet,
738 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
739 uid: bool,
740 ) -> Result<(), ClientError> {
741 let mbox = mailbox.try_into()?.into_static();
742
743 Ok(self
744 .resolve(MoveTask::new(sequence_set, mbox).with_uid(uid))
745 .await??)
746 }
747
748 pub async fn r#move(
749 &mut self,
750 sequence_set: SequenceSet,
751 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
752 ) -> Result<(), ClientError> {
753 self._move(sequence_set, mailbox, false).await
754 }
755
756 pub async fn uid_move(
757 &mut self,
758 sequence_set: SequenceSet,
759 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
760 ) -> Result<(), ClientError> {
761 self._move(sequence_set, mailbox, true).await
762 }
763
764 pub async fn check(&mut self) -> Result<(), ClientError> {
766 Ok(self.resolve(CheckTask::new()).await??)
767 }
768
769 pub async fn noop(&mut self) -> Result<(), ClientError> {
771 Ok(self.resolve(NoOpTask::new()).await??)
772 }
773}
774
775impl Client {
782 pub async fn refresh_capabilities(&mut self) -> Result<(), ClientError> {
784 self.state.capabilities = self.resolve(CapabilityTask::new()).await??;
785
786 Ok(())
787 }
788
789 pub async fn login(
791 &mut self,
792 username: impl TryInto<AString<'_>, Error = ValidationError>,
793 password: impl TryInto<AString<'_>, Error = ValidationError>,
794 ) -> Result<(), ClientError> {
795 let username = username.try_into()?.into_static();
796 let password = password.try_into()?.into_static();
797 let login = self.resolve(LoginTask::new(username, Secret::new(password)));
798
799 match login.await?? {
800 Some(capabilities) => {
801 self.state.capabilities = capabilities;
802 }
803 None => {
804 self.refresh_capabilities().await?;
805 }
806 };
807
808 Ok(())
809 }
810
811 async fn authenticate(&mut self, task: AuthenticateTask) -> Result<(), ClientError> {
816 match self.resolve(task).await?? {
817 Some(capabilities) => {
818 self.state.capabilities = capabilities;
819 }
820 None => {
821 self.refresh_capabilities().await?;
822 }
823 };
824
825 Ok(())
826 }
827
828 pub async fn authenticate_plain(
830 &mut self,
831 login: impl AsRef<str>,
832 password: impl AsRef<str>,
833 ) -> Result<(), ClientError> {
834 self.authenticate(AuthenticateTask::plain(
835 login.as_ref(),
836 password.as_ref(),
837 self.state.ext_sasl_ir_supported(),
838 ))
839 .await
840 }
841
842 pub async fn authenticate_xoauth2(
844 &mut self,
845 login: impl AsRef<str>,
846 token: impl AsRef<str>,
847 ) -> Result<(), ClientError> {
848 self.authenticate(AuthenticateTask::xoauth2(
849 login.as_ref(),
850 token.as_ref(),
851 self.state.ext_sasl_ir_supported(),
852 ))
853 .await
854 }
855
856 pub async fn authenticate_oauthbearer(
858 &mut self,
859 user: impl AsRef<str>,
860 host: impl AsRef<str>,
861 port: u16,
862 token: impl AsRef<str>,
863 ) -> Result<(), ClientError> {
864 self.authenticate(AuthenticateTask::oauthbearer(
865 user.as_ref(),
866 host.as_ref(),
867 port,
868 token.as_ref(),
869 self.state.ext_sasl_ir_supported(),
870 ))
871 .await
872 }
873
874 pub async fn id(
879 &mut self,
880 params: Option<Vec<(IString<'static>, NString<'static>)>>,
881 ) -> Result<Option<Vec<(IString<'static>, NString<'static>)>>, ClientError> {
882 Ok(if self.state.ext_id_supported() {
883 self.resolve(IdTask::new(params)).await??
884 } else {
885 warn!("IMAP ID extension not supported, skipping");
886 None
887 })
888 }
889
890 pub async fn append(
891 &mut self,
892 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
893 flags: impl IntoIterator<Item = Flag<'_>>,
894 message: impl AsRef<[u8]>,
895 ) -> Result<Option<u32>, ClientError> {
896 let mbox = mailbox.try_into()?.into_static();
897
898 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
899
900 let msg = to_static_literal(message, self.state.ext_binary_supported())?;
901
902 Ok(self
903 .resolve(AppendTask::new(mbox, msg).with_flags(flags))
904 .await??)
905 }
906
907 pub async fn appenduid(
908 &mut self,
909 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
910 flags: impl IntoIterator<Item = Flag<'_>>,
911 message: impl AsRef<[u8]>,
912 ) -> Result<Option<(NonZeroU32, NonZeroU32)>, ClientError> {
913 let mbox = mailbox.try_into()?.into_static();
914
915 let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
916
917 let msg = to_static_literal(message, self.state.ext_binary_supported())?;
918
919 Ok(self
920 .resolve(AppendUidTask::new(mbox, msg).with_flags(flags))
921 .await??)
922 }
923}
924
925impl Client {
932 async fn _fetch(
933 &mut self,
934 sequence_set: SequenceSet,
935 items: MacroOrMessageDataItemNames<'_>,
936 uid: bool,
937 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
938 let mut items = match items {
939 MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
940 MacroOrMessageDataItemNames::MessageDataItemNames(items) => items.into_static(),
941 };
942
943 if uid {
944 items.push(MessageDataItemName::Uid);
945 }
946
947 let seq_map = self
948 .resolve(FetchTask::new(sequence_set, items.into()).with_uid(uid))
949 .await??;
950
951 if uid {
952 let mut uid_map = HashMap::new();
953
954 for (seq, items) in seq_map {
955 let uid = items.as_ref().iter().find_map(|item| {
956 if let MessageDataItem::Uid(uid) = item {
957 Some(*uid)
958 } else {
959 None
960 }
961 });
962
963 match uid {
964 Some(uid) => {
965 uid_map.insert(uid, items);
966 }
967 None => {
968 warn!(?seq, "cannot get message uid, skipping it");
969 }
970 }
971 }
972
973 Ok(uid_map)
974 } else {
975 Ok(seq_map)
976 }
977 }
978
979 pub async fn fetch(
980 &mut self,
981 sequence_set: SequenceSet,
982 items: MacroOrMessageDataItemNames<'_>,
983 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
984 self._fetch(sequence_set, items, false).await
985 }
986
987 pub async fn uid_fetch(
988 &mut self,
989 sequence_set: SequenceSet,
990 items: MacroOrMessageDataItemNames<'_>,
991 ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
992 self._fetch(sequence_set, items, true).await
993 }
994
995 async fn _sort_or_fallback(
996 &mut self,
997 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
998 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
999 fetch_items: MacroOrMessageDataItemNames<'_>,
1000 uid: bool,
1001 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1002 let mut fetch_items = match fetch_items {
1003 MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
1004 MacroOrMessageDataItemNames::MessageDataItemNames(items) => items,
1005 };
1006
1007 if uid && !fetch_items.contains(&MessageDataItemName::Uid) {
1008 fetch_items.push(MessageDataItemName::Uid);
1009 }
1010
1011 let mut fetches = HashMap::new();
1012
1013 if self.state.ext_sort_supported() {
1014 let fetch_items = MacroOrMessageDataItemNames::MessageDataItemNames(fetch_items);
1015 let ids = self._sort(sort_criteria, search_criteria, uid).await?;
1016 let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
1017 let ids_chunks_len = ids_chunks.len();
1018
1019 for (n, ids) in ids_chunks.enumerate() {
1020 debug!(?ids, "fetching sort envelopes {}/{ids_chunks_len}", n + 1);
1021 let ids = SequenceSet::try_from(ids.to_vec())?;
1022 let items = fetch_items.clone();
1023 fetches.extend(self._fetch(ids, items, uid).await?);
1024 }
1025
1026 let items = ids.into_iter().flat_map(|id| fetches.remove(&id)).collect();
1027
1028 Ok(items)
1029 } else {
1030 warn!("IMAP SORT extension not supported, using fallback");
1031
1032 let ids = self._search(search_criteria, uid).await?;
1033 let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
1034 let ids_chunks_len = ids_chunks.len();
1035
1036 sort_criteria
1037 .clone()
1038 .into_iter()
1039 .filter_map(|criterion| match criterion.key {
1040 SortKey::Arrival => Some(MessageDataItemName::InternalDate),
1041 SortKey::Cc => Some(MessageDataItemName::Envelope),
1042 SortKey::Date => Some(MessageDataItemName::Envelope),
1043 SortKey::From => Some(MessageDataItemName::Envelope),
1044 SortKey::Size => Some(MessageDataItemName::Rfc822Size),
1045 SortKey::Subject => Some(MessageDataItemName::Envelope),
1046 SortKey::To => Some(MessageDataItemName::Envelope),
1047 SortKey::DisplayFrom => None,
1048 SortKey::DisplayTo => None,
1049 })
1050 .for_each(|item| {
1051 if !fetch_items.contains(&item) {
1052 fetch_items.push(item)
1053 }
1054 });
1055
1056 for (n, ids) in ids_chunks.enumerate() {
1057 debug!(?ids, "fetching search envelopes {}/{ids_chunks_len}", n + 1);
1058 let ids = SequenceSet::try_from(ids.to_vec())?;
1059 let items = fetch_items.clone();
1060 fetches.extend(self._fetch(ids, items.into(), uid).await?);
1061 }
1062
1063 let mut fetches: Vec<_> = fetches.into_values().collect();
1064
1065 fetches.sort_by(|a, b| {
1066 for criterion in sort_criteria.clone().into_iter() {
1067 let mut cmp = cmp_fetch_items(&criterion.key, a, b);
1068
1069 if criterion.reverse {
1070 cmp = cmp.reverse();
1071 }
1072
1073 if cmp.is_ne() {
1074 return cmp;
1075 }
1076 }
1077
1078 cmp_fetch_items(&SortKey::Date, a, b)
1079 });
1080
1081 Ok(fetches)
1082 }
1083 }
1084
1085 pub async fn sort_or_fallback(
1086 &mut self,
1087 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1088 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1089 fetch_items: MacroOrMessageDataItemNames<'_>,
1090 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1091 self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, false)
1092 .await
1093 }
1094
1095 pub async fn uid_sort_or_fallback(
1096 &mut self,
1097 sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1098 search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1099 fetch_items: MacroOrMessageDataItemNames<'_>,
1100 ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1101 self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, true)
1102 .await
1103 }
1104
1105 pub async fn appenduid_or_fallback(
1106 &mut self,
1107 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError> + Clone,
1108 flags: impl IntoIterator<Item = Flag<'_>>,
1109 message: impl AsRef<[u8]>,
1110 ) -> Result<Option<NonZeroU32>, ClientError> {
1111 if self.state.ext_uidplus_supported() {
1112 Ok(self
1113 .appenduid(mailbox, flags, message)
1114 .await?
1115 .map(|(uid, _)| uid))
1116 } else {
1117 warn!("IMAP UIDPLUS extension not supported, using fallback");
1118
1119 self.select(mailbox.clone()).await?;
1128
1129 let seq = match self.append(mailbox, flags, message).await? {
1130 Some(seq) => seq,
1131 None => match self.post_append_noop().await? {
1132 Some(seq) => seq,
1133 None => self
1134 .post_append_check()
1135 .await?
1136 .ok_or(ClientError::ResolveTask(TaskError::MissingData(
1137 "APPENDUID: seq".into(),
1138 )))?,
1139 },
1140 };
1141
1142 let uid = self
1143 .search(Vec1::from(SearchKey::SequenceSet(seq.try_into().unwrap())))
1144 .await?
1145 .into_iter()
1146 .next();
1147
1148 Ok(uid)
1149 }
1150 }
1151
1152 async fn _move_or_fallback(
1153 &mut self,
1154 sequence_set: SequenceSet,
1155 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1156 uid: bool,
1157 ) -> Result<(), ClientError> {
1158 if self.state.ext_move_supported() {
1159 self._move(sequence_set, mailbox, uid).await
1160 } else {
1161 warn!("IMAP MOVE extension not supported, using fallback");
1162 self._copy(sequence_set.clone(), mailbox, uid).await?;
1163 self._silent_store(sequence_set, StoreType::Add, Some(Flag::Deleted), uid)
1164 .await?;
1165 self.expunge().await?;
1166 Ok(())
1167 }
1168 }
1169
1170 pub async fn move_or_fallback(
1171 &mut self,
1172 sequence_set: SequenceSet,
1173 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1174 ) -> Result<(), ClientError> {
1175 self._move_or_fallback(sequence_set, mailbox, false).await
1176 }
1177
1178 pub async fn uid_move_or_fallback(
1179 &mut self,
1180 sequence_set: SequenceSet,
1181 mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1182 ) -> Result<(), ClientError> {
1183 self._move_or_fallback(sequence_set, mailbox, true).await
1184 }
1185
1186 pub fn enqueue_idle(&mut self) -> Tag<'static> {
1187 let tag = self.state.resolver.scheduler.tag_generator.generate();
1188
1189 self.state
1190 .resolver
1191 .scheduler
1192 .client_next
1193 .enqueue_command(Command {
1194 tag: tag.clone(),
1195 body: CommandBody::Idle,
1196 });
1197
1198 tag.into_static()
1199 }
1200
    /// Drives an IDLE command previously enqueued under `tag` until
    /// its tagged response arrives.
    ///
    /// The loop asks for the idle to be ended when either data is
    /// received or `idle_timeout` elapses without any event; it then
    /// keeps polling until the tagged status for `tag` is seen.
    /// Returns early with `Ok(())` if the server rejects the command,
    /// and with `Err` on any stream error.
    #[tracing::instrument(name = "idle", skip_all)]
    pub async fn idle(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
        debug!("starting the main loop");

        loop {
            let progress = self
                .stream
                .next(&mut self.state.resolver.scheduler.client_next);
            // `.ok()` folds the timeout into `None`; `Some` carries
            // the stream result.
            match timeout(self.state.idle_timeout, progress).await.ok() {
                None => {
                    debug!("timed out, sending done command…");
                    self.state.resolver.scheduler.client_next.set_idle_done();
                }
                Some(Err(err)) => {
                    break Err(err);
                }
                Some(Ok(Event::IdleCommandSent { .. })) => {
                    debug!("command sent");
                }
                Some(Ok(Event::IdleAccepted { .. })) => {
                    debug!("command accepted, entering idle mode");
                }
                Some(Ok(Event::IdleRejected { status, .. })) => {
                    warn!("command rejected, aborting: {status:?}");
                    break Ok(());
                }
                Some(Ok(Event::IdleDoneSent { .. })) => {
                    debug!("done command sent");
                }
                Some(Ok(Event::DataReceived { data })) => {
                    debug!("received data, sending done command…");
                    trace!("{data:#?}");
                    self.state.resolver.scheduler.client_next.set_idle_done();
                }
                // Only the tagged status matching our own tag ends the
                // loop; other statuses fall through to the last arm.
                Some(Ok(Event::StatusReceived {
                    status:
                        Status::Tagged(Tagged {
                            tag: ref got_tag, ..
                        }),
                })) if *got_tag == tag => {
                    debug!("received tagged response, exiting");
                    break Ok(());
                }
                Some(event) => {
                    debug!("received unknown event, ignoring: {event:?}");
                }
            }
        }
    }
1250
1251 #[tracing::instrument(name = "idle/done", skip_all)]
1252 pub async fn idle_done(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1253 self.state.resolver.scheduler.client_next.set_idle_done();
1254
1255 loop {
1256 let progress = self
1257 .stream
1258 .next(&mut self.state.resolver.scheduler.client_next)
1259 .await?;
1260
1261 match progress {
1262 Event::IdleDoneSent { .. } => {
1263 debug!("done command sent");
1264 }
1265 Event::StatusReceived {
1266 status:
1267 Status::Tagged(Tagged {
1268 tag: ref got_tag, ..
1269 }),
1270 } if *got_tag == tag => {
1271 debug!("received tagged response, exiting");
1272 break Ok(());
1273 }
1274 event => {
1275 debug!("received unknown event, ignoring: {event:?}");
1276 }
1277 }
1278 }
1279 }
1280}
1281
1282pub(crate) fn cmp_fetch_items(
1283 criterion: &SortKey,
1284 a: &Vec1<MessageDataItem>,
1285 b: &Vec1<MessageDataItem>,
1286) -> Ordering {
1287 use MessageDataItem::*;
1288
1289 match &criterion {
1290 SortKey::Arrival => {
1291 let a = a.as_ref().iter().find_map(|a| {
1292 if let InternalDate(dt) = a {
1293 Some(dt.as_ref())
1294 } else {
1295 None
1296 }
1297 });
1298
1299 let b = b.as_ref().iter().find_map(|b| {
1300 if let InternalDate(dt) = b {
1301 Some(dt.as_ref())
1302 } else {
1303 None
1304 }
1305 });
1306
1307 a.cmp(&b)
1308 }
1309 SortKey::Date => {
1310 let a = a.as_ref().iter().find_map(|a| {
1311 if let Envelope(envelope) = a {
1312 envelope.date.0.as_ref().map(AsRef::as_ref)
1313 } else {
1314 None
1315 }
1316 });
1317
1318 let b = b.as_ref().iter().find_map(|b| {
1319 if let Envelope(envelope) = b {
1320 envelope.date.0.as_ref().map(AsRef::as_ref)
1321 } else {
1322 None
1323 }
1324 });
1325
1326 a.cmp(&b)
1327 }
1328 SortKey::Size => {
1329 let a = a.as_ref().iter().find_map(|a| {
1330 if let Rfc822Size(size) = a {
1331 Some(size)
1332 } else {
1333 None
1334 }
1335 });
1336
1337 let b = b.as_ref().iter().find_map(|b| {
1338 if let Rfc822Size(size) = b {
1339 Some(size)
1340 } else {
1341 None
1342 }
1343 });
1344
1345 a.cmp(&b)
1346 }
1347 SortKey::Subject => {
1348 let a = a.as_ref().iter().find_map(|a| {
1349 if let Envelope(envelope) = a {
1350 envelope.subject.0.as_ref().map(AsRef::as_ref)
1351 } else {
1352 None
1353 }
1354 });
1355
1356 let b = b.as_ref().iter().find_map(|b| {
1357 if let Envelope(envelope) = b {
1358 envelope.subject.0.as_ref().map(AsRef::as_ref)
1359 } else {
1360 None
1361 }
1362 });
1363
1364 a.cmp(&b)
1365 }
1366 SortKey::Cc | SortKey::From | SortKey::To | SortKey::DisplayFrom | SortKey::DisplayTo => {
1368 Ordering::Equal
1369 }
1370 }
1371}
1372
1373pub(crate) fn to_static_literal(
1374 message: impl AsRef<[u8]>,
1375 ext_binary_supported: bool,
1376) -> Result<LiteralOrLiteral8<'static>, ValidationError> {
1377 let message = if ext_binary_supported {
1378 LiteralOrLiteral8::Literal8(Literal8 {
1379 data: message.as_ref().into(),
1380 mode: LiteralMode::Sync,
1381 })
1382 } else {
1383 warn!("IMAP BINARY extension not supported, using fallback");
1384 Literal::validate(message.as_ref())?;
1385 LiteralOrLiteral8::Literal(Literal::unvalidated(message.as_ref()))
1386 };
1387
1388 Ok(message.into_static())
1389}