1use crate::types::{
8 Flag, MailboxAttribute, MailboxName, NotifySetParams, SequenceSet, StoreOperation,
9};
10
11use std::any::Any;
16use std::marker::PhantomData;
17
18use crate::error::Error;
19use crate::types::response::{
20 AclEntry, EsearchResponse, ListRightsResponse, MetadataResult, NamespaceResponse,
21 QuotaResource, QuotaRootResponse, ThreadNode, UidRange,
22};
23use crate::types::validated::ParsedUidSet;
24use crate::types::{
25 Capability, Command, CopyResult, ExpungeResult, FetchResponse, MoveResult, StatusItem,
26 StatusResult, StoreResult,
27};
28
29use super::dispatch;
30use super::driver::ConsumerErased;
31use super::MailboxInfo;
32use super::SearchResult;
33
/// Failure of a pipeline as a whole.
///
/// Failures of individual commands are not reported through this type; they
/// travel as `Err` values inside the per-command result slots.
#[derive(Debug)]
pub enum PipelineError {
    /// The command channel to the driver was closed, or the driver dropped the
    /// reply channel without answering.
    Disconnected,
    /// The driver answered the pipeline request itself with an error.
    Driver(Error),
    /// A type-erased result did not have the expected concrete type, or the
    /// number of results did not match the number of expected slots.
    TypeMismatch {
        /// Zero-based position of the offending result. A result-count
        /// mismatch is reported with index `0`.
        index: usize,
    },
}
61
62impl std::fmt::Display for PipelineError {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 match self {
65 Self::Disconnected => write!(f, "pipeline: driver task disconnected"),
66 Self::Driver(e) => write!(f, "pipeline: driver error: {e}"),
67 Self::TypeMismatch { index } => {
68 write!(f, "pipeline: type mismatch at command index {index}")
69 }
70 }
71 }
72}
73
74impl std::error::Error for PipelineError {
75 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
76 match self {
77 Self::Driver(e) => Some(e),
78 _ => None,
79 }
80 }
81}
82
/// Converts the type-erased results of a pipeline into a typed value.
///
/// Implemented in this file for `()` and for nested tuples of the form
/// `(Last, (.., (First, ())))`, which is the shape the pipeline builder
/// accumulates as commands are queued.
pub trait UnfoldTuple {
    /// The typed value produced by [`UnfoldTuple::unfold`].
    type Output;

    /// Turns `results` (one entry per queued command, in queue order) into
    /// `Self::Output`.
    ///
    /// # Errors
    ///
    /// Implementations in this file return `PipelineError::TypeMismatch` when
    /// the number of results is wrong or a boxed value has an unexpected type.
    fn unfold(
        results: Vec<Result<Box<dyn Any + Send>, Error>>,
    ) -> Result<Self::Output, PipelineError>;
}
110
111fn downcast_result<T: Any + Send>(
113 result: Result<Box<dyn Any + Send>, Error>,
114 index: usize,
115) -> Result<Result<T, Error>, PipelineError> {
116 match result {
117 Ok(boxed) => match boxed.downcast::<T>() {
118 Ok(val) => Ok(Ok(*val)),
119 Err(_) => Err(PipelineError::TypeMismatch { index }),
120 },
121 Err(e) => Ok(Err(e)),
122 }
123}
124
125impl UnfoldTuple for () {
127 type Output = ();
128 fn unfold(results: Vec<Result<Box<dyn Any + Send>, Error>>) -> Result<(), PipelineError> {
129 if !results.is_empty() {
130 return Err(PipelineError::TypeMismatch { index: 0 });
131 }
132 Ok(())
133 }
134}
135
// Generates an `UnfoldTuple` impl for one nested-tuple shape.
//
// Arguments:
//   $nested - the nested tuple type, newest element outermost,
//             e.g. `(B, (A, ()))`
//   $count  - the number of elements in that tuple
//   [i: T]  - the element types in queue order, each with its result index
//
// The generated `Output` is a flat tuple of `Result<T, Error>` in queue order.
macro_rules! impl_unfold_tuple {
    ($nested:ty, $count:literal, [$($idx:literal : $T:ident),+ $(,)?]) => {
        impl<$($T: Any + Send),+> UnfoldTuple for $nested {
            type Output = ($(Result<$T, Error>,)+);

            fn unfold(
                results: Vec<Result<Box<dyn Any + Send>, Error>>,
            ) -> Result<Self::Output, PipelineError> {
                // A wrong result count is reported as a mismatch at index 0.
                if results.len() != $count {
                    return Err(PipelineError::TypeMismatch { index: 0 });
                }
                let mut iter = results.into_iter();
                // One `iter.next()` per tuple element; the elements are listed
                // in queue order, so each result is downcast to the type at
                // its own index.
                Ok(($(
                    downcast_result::<$T>(
                        // After the length check above this `ok_or` is a
                        // defensive fallback only.
                        iter.next().ok_or(PipelineError::TypeMismatch { index: $idx })?,
                        $idx,
                    )?,
                )+))
            }
        }
    };
}
162
// `UnfoldTuple` impls for pipelines of one to eight commands. In each nested
// type the most recently queued command's type is outermost, while the index
// list names the types in queue order (`A` is the first command).
impl_unfold_tuple!((A, ()), 1, [0: A]);
impl_unfold_tuple!((B, (A, ())), 2, [0: A, 1: B]);
impl_unfold_tuple!((C, (B, (A, ()))), 3, [0: A, 1: B, 2: C]);
impl_unfold_tuple!((D, (C, (B, (A, ())))), 4, [0: A, 1: B, 2: C, 3: D]);
impl_unfold_tuple!((E, (D, (C, (B, (A, ()))))), 5, [0: A, 1: B, 2: C, 3: D, 4: E]);
impl_unfold_tuple!((F, (E, (D, (C, (B, (A, ())))))), 6, [0: A, 1: B, 2: C, 3: D, 4: E, 5: F]);
impl_unfold_tuple!(
    (G, (F, (E, (D, (C, (B, (A, ()))))))),
    7,
    [0: A, 1: B, 2: C, 3: D, 4: E, 5: F, 6: G]
);
impl_unfold_tuple!(
    (H, (G, (F, (E, (D, (C, (B, (A, ())))))))),
    8,
    [0: A, 1: B, 2: C, 3: D, 4: E, 5: F, 6: G, 7: H]
);
187
188pub struct Pipeline<'conn, Accumulated> {
216 conn: &'conn super::ImapConnection,
217 pending: Vec<Box<dyn ConsumerErased>>,
218 commands: Vec<Command>,
219 _marker: PhantomData<Accumulated>,
220}
221
222impl<'conn> Pipeline<'conn, ()> {
223 pub(in crate::connection) fn new(conn: &'conn super::ImapConnection) -> Self {
225 Self {
226 conn,
227 pending: Vec::new(),
228 commands: Vec::new(),
229 _marker: PhantomData,
230 }
231 }
232}
233
// Generates one builder method on `Pipeline<'conn, T>`.
//
// Input:
//   - optional attributes (including doc comments) forwarded to the method
//   - `fn name(params...) -> Output;`  the method name, its parameters and
//     the result type added to the accumulated type list
//   - `command = <expr>;`   expression building the `Command`; it may use the
//     method parameters
//   - `consumer = <expr>;`  expression building the consumer that is boxed as
//     `dyn ConsumerErased`
macro_rules! pipeline_method {
    (
        $(#[$meta:meta])*
        fn $name:ident($($param:ident : $param_ty:ty),* $(,)?) -> $output:ty;
        command = $cmd:expr;
        consumer = $consumer:expr;
    ) => {
        $(#[$meta])*
        pub fn $name(mut self $(, $param: $param_ty)*) -> Pipeline<'conn, ($output, T)> {
            // Command and consumer are pushed together, so position `i` of
            // `commands` always corresponds to position `i` of `pending`.
            self.commands.push($cmd);
            self.pending.push(Box::new($consumer) as Box<dyn ConsumerErased>);
            // Rebuild the struct because the marker type changes from `T` to
            // `($output, T)`.
            Pipeline {
                conn: self.conn,
                pending: self.pending,
                commands: self.commands,
                _marker: PhantomData,
            }
        }
    };
}
263
264impl<'conn, T> Pipeline<'conn, T> {
    pipeline_method! {
        /// Queues `Command::Noop`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn noop() -> ();
        command = Command::Noop;
        consumer = dispatch::TaggedOkConsumer::default();
    }
275
    pipeline_method! {
        /// Queues `Command::Capability`.
        ///
        /// Paired with a default `dispatch::CapabilityConsumer`; adds a
        /// `Vec<Capability>` slot to the accumulated result types.
        fn capability() -> Vec<Capability>;
        command = Command::Capability;
        consumer = dispatch::CapabilityConsumer::default();
    }
282
    pipeline_method! {
        /// Queues `Command::List` with the given `reference` and `pattern`.
        ///
        /// Paired with `dispatch::ListConsumer::new()`; adds a
        /// `Result<Vec<MailboxInfo>, Error>` slot to the accumulated result
        /// types.
        fn list(reference: String, pattern: String) -> Result<Vec<MailboxInfo>, Error>;
        command = Command::List { reference, pattern };
        consumer = dispatch::ListConsumer::new();
    }
293
294 pub fn list_extended(
301 mut self,
302 selection_options: Vec<String>,
303 reference: String,
304 patterns: Vec<String>,
305 return_options: Vec<String>,
306 ) -> Pipeline<'conn, (Result<Vec<MailboxInfo>, Error>, T)> {
307 let filter_extended = !selection_options
310 .iter()
311 .any(|o| o.eq_ignore_ascii_case("SUBSCRIBED"));
312 let consumer =
313 dispatch::ListExtendedConsumer::new(filter_extended, selection_options.clone());
314 self.commands.push(Command::ListExtended {
315 selection_options,
316 reference,
317 patterns,
318 return_options,
319 });
320 self.pending
321 .push(Box::new(consumer) as Box<dyn ConsumerErased>);
322 Pipeline {
323 conn: self.conn,
324 pending: self.pending,
325 commands: self.commands,
326 _marker: PhantomData,
327 }
328 }
329
    pipeline_method! {
        /// Queues `Command::ListStatus` with the given `reference`, `pattern`
        /// and `status_items`.
        ///
        /// Paired with `dispatch::ListStatusConsumer::new()`; adds a
        /// `Result<Vec<(MailboxInfo, Vec<StatusItem>)>, Error>` slot to the
        /// accumulated result types.
        fn list_status(
            reference: String,
            pattern: String,
            status_items: String,
        ) -> Result<Vec<(MailboxInfo, Vec<StatusItem>)>, Error>;
        command = Command::ListStatus { reference, pattern, status_items };
        consumer = dispatch::ListStatusConsumer::new();
    }
340
    pipeline_method! {
        /// Queues `Command::Lsub` with the given `reference` and `pattern`.
        ///
        /// Paired with a default `dispatch::LsubConsumer`; adds a
        /// `Vec<MailboxInfo>` slot to the accumulated result types.
        fn lsub(reference: String, pattern: String) -> Vec<MailboxInfo>;
        command = Command::Lsub { reference, pattern };
        consumer = dispatch::LsubConsumer::default();
    }
347
    pipeline_method! {
        /// Queues `Command::Create` for `mailbox`.
        ///
        /// Paired with a default `dispatch::CreateConsumer`; adds an
        /// `Option<String>` slot to the accumulated result types.
        fn create(mailbox: MailboxName) -> Option<String>;
        command = Command::Create { mailbox };
        consumer = dispatch::CreateConsumer::default();
    }
354
    pipeline_method! {
        /// Queues `Command::CreateSpecialUse` for `mailbox` with the given
        /// `special_use` attributes.
        ///
        /// Uses the same consumer type as `create` (a default
        /// `dispatch::CreateConsumer`); adds an `Option<String>` slot to the
        /// accumulated result types.
        fn create_special_use(
            mailbox: MailboxName,
            special_use: Vec<MailboxAttribute>,
        ) -> Option<String>;
        command = Command::CreateSpecialUse { mailbox, special_use };
        consumer = dispatch::CreateConsumer::default();
    }
364
    pipeline_method! {
        /// Queues `Command::Delete` for `mailbox`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn delete(mailbox: MailboxName) -> ();
        command = Command::Delete { mailbox };
        consumer = dispatch::TaggedOkConsumer::default();
    }
371
    pipeline_method! {
        /// Queues `Command::Rename` from `mailbox` to `new_name`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn rename(mailbox: MailboxName, new_name: MailboxName) -> ();
        command = Command::Rename { mailbox, new_name };
        consumer = dispatch::TaggedOkConsumer::default();
    }
378
    pipeline_method! {
        /// Queues `Command::Subscribe` for `mailbox`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn subscribe(mailbox: MailboxName) -> ();
        command = Command::Subscribe { mailbox };
        consumer = dispatch::TaggedOkConsumer::default();
    }
385
    pipeline_method! {
        /// Queues `Command::Unsubscribe` for `mailbox`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn unsubscribe(mailbox: MailboxName) -> ();
        command = Command::Unsubscribe { mailbox };
        consumer = dispatch::TaggedOkConsumer::default();
    }
392
    pipeline_method! {
        /// Queues `Command::Status` for `mailbox` with the given `items`
        /// string.
        ///
        /// Paired with `dispatch::StatusConsumer::new()`; adds a
        /// `StatusResult` slot to the accumulated result types.
        fn status(mailbox: MailboxName, items: String) -> StatusResult;
        command = Command::Status { mailbox, items };
        consumer = dispatch::StatusConsumer::new();
    }
399
    pipeline_method! {
        /// Queues `Command::Namespace`.
        ///
        /// Paired with a default `dispatch::NamespaceConsumer`; adds a
        /// `NamespaceResponse` slot to the accumulated result types.
        fn namespace() -> NamespaceResponse;
        command = Command::Namespace;
        consumer = dispatch::NamespaceConsumer::default();
    }
406
    pipeline_method! {
        /// Queues `Command::Check`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn check() -> ();
        command = Command::Check;
        consumer = dispatch::TaggedOkConsumer::default();
    }
417
    pipeline_method! {
        /// Queues `Command::Expunge`.
        ///
        /// Paired with `dispatch::ExpungeConsumer::new()`; adds an
        /// `ExpungeResult` slot to the accumulated result types.
        fn expunge() -> ExpungeResult;
        command = Command::Expunge;
        consumer = dispatch::ExpungeConsumer::new();
    }
429
    pipeline_method! {
        /// Queues `Command::Search` with the given `criteria` string.
        ///
        /// Paired with `dispatch::SearchConsumer::new()`; adds a
        /// `Result<SearchResult, Error>` slot to the accumulated result types.
        fn search(criteria: String) -> Result<SearchResult, Error>;
        command = Command::Search { criteria };
        consumer = dispatch::SearchConsumer::new();
    }
436
    pipeline_method! {
        /// Queues `Command::SearchReturn` with the given `criteria` and
        /// `return_opts`.
        ///
        /// Paired with `dispatch::EsearchConsumer::new()`; adds a
        /// `Result<EsearchResponse, Error>` slot to the accumulated result
        /// types.
        fn search_return(
            criteria: String,
            return_opts: Vec<String>,
        ) -> Result<EsearchResponse, Error>;
        command = Command::SearchReturn { criteria, return_opts };
        consumer = dispatch::EsearchConsumer::new();
    }
446
    pipeline_method! {
        /// Queues `Command::SearchSave` with the given `criteria` string.
        ///
        /// Paired with `dispatch::SearchSaveConsumer::new()`; adds a
        /// `Result<(), Error>` slot to the accumulated result types.
        fn search_save(criteria: String) -> Result<(), Error>;
        command = Command::SearchSave { criteria };
        consumer = dispatch::SearchSaveConsumer::new();
    }
453
    pipeline_method! {
        /// Queues `Command::Fetch` for `sequence_set` with the given `items`
        /// string and optional `changed_since` value.
        ///
        /// Paired with `dispatch::FetchConsumer::new()`; adds a
        /// `Vec<FetchResponse>` slot to the accumulated result types.
        fn fetch(
            sequence_set: SequenceSet,
            items: String,
            changed_since: Option<u64>,
        ) -> Vec<FetchResponse>;
        command = Command::Fetch { sequence_set, items, changed_since };
        consumer = dispatch::FetchConsumer::new();
    }
464
    pipeline_method! {
        /// Queues `Command::Store` applying `operation` with `flags` to
        /// `sequence_set`, with an optional `unchanged_since` value.
        ///
        /// Paired with `dispatch::StoreConsumer::new()`; adds a `StoreResult`
        /// slot to the accumulated result types.
        fn store(
            sequence_set: SequenceSet,
            operation: StoreOperation,
            flags: Vec<Flag>,
            unchanged_since: Option<u64>,
        ) -> StoreResult;
        command = Command::Store { sequence_set, operation, flags, unchanged_since };
        consumer = dispatch::StoreConsumer::new();
    }
476
    pipeline_method! {
        /// Queues `Command::Copy` of `sequence_set` into `mailbox`.
        ///
        /// Paired with `dispatch::CopyConsumer::new()`; adds a `CopyResult`
        /// slot to the accumulated result types.
        fn copy(sequence_set: SequenceSet, mailbox: MailboxName) -> CopyResult;
        command = Command::Copy { sequence_set, mailbox };
        consumer = dispatch::CopyConsumer::new();
    }
483
    pipeline_method! {
        /// Queues `Command::Move` of `sequence_set` into `mailbox`.
        ///
        /// Paired with `dispatch::MoveConsumer::new()`; adds a `MoveResult`
        /// slot to the accumulated result types.
        fn move_messages(sequence_set: SequenceSet, mailbox: MailboxName) -> MoveResult;
        command = Command::Move { sequence_set, mailbox };
        consumer = dispatch::MoveConsumer::new();
    }
490
    pipeline_method! {
        /// Queues `Command::UidSearch` with the given `criteria` string.
        ///
        /// Uses the same consumer type as `search`
        /// (`dispatch::SearchConsumer::new()`); adds a
        /// `Result<SearchResult, Error>` slot to the accumulated result types.
        fn uid_search(criteria: String) -> Result<SearchResult, Error>;
        command = Command::UidSearch { criteria };
        consumer = dispatch::SearchConsumer::new();
    }
501
    pipeline_method! {
        /// Queues `Command::UidSearchReturn` with the given `criteria` and
        /// `return_opts`.
        ///
        /// Paired with `dispatch::EsearchConsumer::new()`; adds a
        /// `Result<EsearchResponse, Error>` slot to the accumulated result
        /// types.
        fn uid_search_return(
            criteria: String,
            return_opts: Vec<String>,
        ) -> Result<EsearchResponse, Error>;
        command = Command::UidSearchReturn { criteria, return_opts };
        consumer = dispatch::EsearchConsumer::new();
    }
511
    pipeline_method! {
        /// Queues `Command::UidSearchSave` with the given `criteria` string.
        ///
        /// Paired with `dispatch::SearchSaveConsumer::new()`; adds a
        /// `Result<(), Error>` slot to the accumulated result types.
        fn uid_search_save(criteria: String) -> Result<(), Error>;
        command = Command::UidSearchSave { criteria };
        consumer = dispatch::SearchSaveConsumer::new();
    }
518
519 #[allow(clippy::type_complexity)]
528 pub fn uid_fetch(
529 mut self,
530 sequence_set: SequenceSet,
531 items: String,
532 changed_since: Option<u64>,
533 vanished: bool,
534 ) -> Pipeline<'conn, ((Vec<FetchResponse>, Vec<UidRange>), T)> {
535 let parsed_set = ParsedUidSet::new(&sequence_set);
537 self.commands.push(Command::UidFetch {
538 sequence_set,
539 items,
540 changed_since,
541 vanished,
542 });
543 self.pending
544 .push(Box::new(dispatch::FetchVanishedConsumer::new(parsed_set))
545 as Box<dyn ConsumerErased>);
546 Pipeline {
547 conn: self.conn,
548 pending: self.pending,
549 commands: self.commands,
550 _marker: PhantomData,
551 }
552 }
553
    pipeline_method! {
        /// Queues `Command::UidStore` applying `operation` with `flags` to
        /// `sequence_set`, with an optional `unchanged_since` value.
        ///
        /// Paired with `dispatch::StoreConsumer::new()`; adds a `StoreResult`
        /// slot to the accumulated result types.
        fn uid_store(
            sequence_set: SequenceSet,
            operation: StoreOperation,
            flags: Vec<Flag>,
            unchanged_since: Option<u64>,
        ) -> StoreResult;
        command = Command::UidStore { sequence_set, operation, flags, unchanged_since };
        consumer = dispatch::StoreConsumer::new();
    }
565
    pipeline_method! {
        /// Queues `Command::UidCopy` of `sequence_set` into `mailbox`.
        ///
        /// Paired with `dispatch::CopyConsumer::new()`; adds a `CopyResult`
        /// slot to the accumulated result types.
        fn uid_copy(sequence_set: SequenceSet, mailbox: MailboxName) -> CopyResult;
        command = Command::UidCopy { sequence_set, mailbox };
        consumer = dispatch::CopyConsumer::new();
    }
572
    pipeline_method! {
        /// Queues `Command::UidMove` of `sequence_set` into `mailbox`.
        ///
        /// Paired with `dispatch::MoveConsumer::new()`; adds a `MoveResult`
        /// slot to the accumulated result types.
        fn uid_move_messages(sequence_set: SequenceSet, mailbox: MailboxName) -> MoveResult;
        command = Command::UidMove { sequence_set, mailbox };
        consumer = dispatch::MoveConsumer::new();
    }
579
    pipeline_method! {
        /// Queues `Command::UidExpunge` for `sequence_set`.
        ///
        /// Paired with `dispatch::ExpungeConsumer::new()`; adds an
        /// `ExpungeResult` slot to the accumulated result types.
        fn uid_expunge(sequence_set: SequenceSet) -> ExpungeResult;
        command = Command::UidExpunge { sequence_set };
        consumer = dispatch::ExpungeConsumer::new();
    }
586
    pipeline_method! {
        /// Queues `Command::Id` carrying `params`, a list of key / optional
        /// value pairs.
        ///
        /// Paired with a default `dispatch::IdConsumer`; adds a
        /// `Vec<(String, Option<String>)>` slot to the accumulated result
        /// types.
        fn id(params: Vec<(String, Option<String>)>) -> Vec<(String, Option<String>)>;
        command = Command::Id(params);
        consumer = dispatch::IdConsumer::default();
    }
597
598 pub fn get_metadata(
600 mut self,
601 mailbox: MailboxName,
602 entries: Vec<String>,
603 max_size: Option<u64>,
604 depth: Option<String>,
605 ) -> Pipeline<'conn, (MetadataResult, T)> {
606 let consumer = dispatch::MetadataConsumer::new(mailbox.as_str().to_owned());
607 self.commands.push(Command::GetMetadata {
608 mailbox,
609 entries,
610 max_size,
611 depth,
612 });
613 self.pending
614 .push(Box::new(consumer) as Box<dyn ConsumerErased>);
615 Pipeline {
616 conn: self.conn,
617 pending: self.pending,
618 commands: self.commands,
619 _marker: PhantomData,
620 }
621 }
622
    pipeline_method! {
        /// Queues `Command::SetMetadata` for `mailbox` with the given
        /// `entries` (name / optional byte value pairs).
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn set_metadata(
            mailbox: MailboxName,
            entries: Vec<(String, Option<Vec<u8>>)>,
        ) -> ();
        command = Command::SetMetadata { mailbox, entries };
        consumer = dispatch::TaggedOkConsumer::default();
    }
632
    pipeline_method! {
        /// Queues `Command::Thread` with the given `algorithm`, `charset` and
        /// `criteria` strings.
        ///
        /// Paired with a default `dispatch::ThreadConsumer`; adds a
        /// `Vec<ThreadNode>` slot to the accumulated result types.
        fn thread(algorithm: String, charset: String, criteria: String) -> Vec<ThreadNode>;
        command = Command::Thread { algorithm, charset, criteria };
        consumer = dispatch::ThreadConsumer::default();
    }
639
    pipeline_method! {
        /// Queues `Command::UidThread` with the given `algorithm`, `charset`
        /// and `criteria` strings.
        ///
        /// Paired with a default `dispatch::ThreadConsumer`; adds a
        /// `Vec<ThreadNode>` slot to the accumulated result types.
        fn uid_thread(algorithm: String, charset: String, criteria: String) -> Vec<ThreadNode>;
        command = Command::UidThread { algorithm, charset, criteria };
        consumer = dispatch::ThreadConsumer::default();
    }
646
    pipeline_method! {
        /// Queues `Command::Sort` with the given `sort_criteria`, `charset`
        /// and `criteria` strings.
        ///
        /// Paired with a default `dispatch::SortConsumer`; adds a
        /// `SearchResult` slot to the accumulated result types.
        fn sort(sort_criteria: String, charset: String, criteria: String) -> SearchResult;
        command = Command::Sort { sort_criteria, charset, criteria };
        consumer = dispatch::SortConsumer::default();
    }
653
    pipeline_method! {
        /// Queues `Command::UidSort` with the given `sort_criteria`, `charset`
        /// and `criteria` strings.
        ///
        /// Paired with a default `dispatch::SortConsumer`; adds a
        /// `SearchResult` slot to the accumulated result types.
        fn uid_sort(sort_criteria: String, charset: String, criteria: String) -> SearchResult;
        command = Command::UidSort { sort_criteria, charset, criteria };
        consumer = dispatch::SortConsumer::default();
    }
660
    pipeline_method! {
        /// Queues `Command::NotifySet` carrying `params`.
        ///
        /// Paired with a default `dispatch::NotifySetConsumer`; adds a
        /// `Result<bool, Error>` slot to the accumulated result types.
        fn notify_set(params: NotifySetParams) -> Result<bool, Error>;
        command = Command::NotifySet(params);
        consumer = dispatch::NotifySetConsumer::default();
    }
667
    pipeline_method! {
        /// Queues `Command::NotifyNone`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn notify_none() -> ();
        command = Command::NotifyNone;
        consumer = dispatch::TaggedOkConsumer::default();
    }
674
675 pub fn get_quota(mut self, root: String) -> Pipeline<'conn, (Vec<QuotaResource>, T)> {
677 let consumer = dispatch::QuotaConsumer::new(root.clone());
678 self.commands.push(Command::GetQuota { root });
679 self.pending
680 .push(Box::new(consumer) as Box<dyn ConsumerErased>);
681 Pipeline {
682 conn: self.conn,
683 pending: self.pending,
684 commands: self.commands,
685 _marker: PhantomData,
686 }
687 }
688
689 pub fn get_quota_root(
691 mut self,
692 mailbox: MailboxName,
693 ) -> Pipeline<'conn, (QuotaRootResponse, T)> {
694 let consumer = dispatch::QuotaRootConsumer::new(mailbox.as_str().to_owned());
695 self.commands.push(Command::GetQuotaRoot { mailbox });
696 self.pending
697 .push(Box::new(consumer) as Box<dyn ConsumerErased>);
698 Pipeline {
699 conn: self.conn,
700 pending: self.pending,
701 commands: self.commands,
702 _marker: PhantomData,
703 }
704 }
705
706 pub fn set_quota(
712 mut self,
713 root: String,
714 resources: Vec<(String, u64)>,
715 ) -> Pipeline<'conn, (Vec<QuotaResource>, T)> {
716 let consumer = dispatch::QuotaConsumer::new(root.clone());
717 self.commands.push(Command::SetQuota { root, resources });
718 self.pending
719 .push(Box::new(consumer) as Box<dyn ConsumerErased>);
720 Pipeline {
721 conn: self.conn,
722 pending: self.pending,
723 commands: self.commands,
724 _marker: PhantomData,
725 }
726 }
727
    pipeline_method! {
        /// Queues `Command::SetAcl` for `mailbox`, `identifier` and `rights`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn set_acl(mailbox: MailboxName, identifier: String, rights: String) -> ();
        command = Command::SetAcl { mailbox, identifier, rights };
        consumer = dispatch::TaggedOkConsumer::default();
    }
734
    pipeline_method! {
        /// Queues `Command::DeleteAcl` for `mailbox` and `identifier`.
        ///
        /// Paired with a default `dispatch::TaggedOkConsumer`; adds a `()`
        /// slot to the accumulated result types.
        fn delete_acl(mailbox: MailboxName, identifier: String) -> ();
        command = Command::DeleteAcl { mailbox, identifier };
        consumer = dispatch::TaggedOkConsumer::default();
    }
741
742 pub fn get_acl(mut self, mailbox: MailboxName) -> Pipeline<'conn, (Vec<AclEntry>, T)> {
744 let consumer = dispatch::AclConsumer::new(mailbox.as_str().to_owned());
745 self.commands.push(Command::GetAcl { mailbox });
746 self.pending
747 .push(Box::new(consumer) as Box<dyn ConsumerErased>);
748 Pipeline {
749 conn: self.conn,
750 pending: self.pending,
751 commands: self.commands,
752 _marker: PhantomData,
753 }
754 }
755
756 pub fn list_rights(
758 mut self,
759 mailbox: MailboxName,
760 identifier: String,
761 ) -> Pipeline<'conn, (ListRightsResponse, T)> {
762 let consumer =
763 dispatch::ListRightsConsumer::new(mailbox.as_str().to_owned(), identifier.clone());
764 self.commands.push(Command::ListRights {
765 mailbox,
766 identifier,
767 });
768 self.pending
769 .push(Box::new(consumer) as Box<dyn ConsumerErased>);
770 Pipeline {
771 conn: self.conn,
772 pending: self.pending,
773 commands: self.commands,
774 _marker: PhantomData,
775 }
776 }
777
778 pub fn my_rights(mut self, mailbox: MailboxName) -> Pipeline<'conn, (String, T)> {
780 let consumer = dispatch::MyRightsConsumer::new(mailbox.as_str().to_owned());
781 self.commands.push(Command::MyRights { mailbox });
782 self.pending
783 .push(Box::new(consumer) as Box<dyn ConsumerErased>);
784 Pipeline {
785 conn: self.conn,
786 pending: self.pending,
787 commands: self.commands,
788 _marker: PhantomData,
789 }
790 }
791}
792
793impl<Accumulated: UnfoldTuple> Pipeline<'_, Accumulated> {
798 pub async fn execute(self) -> Result<Accumulated::Output, PipelineError> {
814 let results = self.execute_raw().await?;
815 Accumulated::unfold(results)
816 }
817}
818
819impl<T> Pipeline<'_, T> {
820 pub async fn execute_dynamic(
829 self,
830 ) -> Result<Vec<Result<Box<dyn Any + Send>, Error>>, PipelineError> {
831 self.execute_raw().await
832 }
833
834 async fn execute_raw(self) -> Result<Vec<Result<Box<dyn Any + Send>, Error>>, PipelineError> {
836 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
837 let dcmd = super::driver::DriverCommand::Pipeline {
838 commands: self.commands,
839 consumers: self.pending,
840 result_tx,
841 };
842 if self.conn.cmd_tx.send(dcmd).await.is_err() {
843 return Err(PipelineError::Disconnected);
844 }
845 match result_rx.await {
846 Ok(Ok(results)) => Ok(results),
847 Ok(Err(e)) => Err(PipelineError::Driver(e)),
848 Err(_) => Err(PipelineError::Disconnected),
849 }
850 }
851}
852
impl super::ImapConnection {
    /// Starts an empty [`Pipeline`] that borrows this connection.
    ///
    /// Commands are added through the builder methods on the returned value;
    /// nothing is sent until one of its `execute*` methods is awaited.
    pub fn pipeline(&self) -> Pipeline<'_, ()> {
        Pipeline::new(self)
    }
}
872
// Unit tests are kept in the sibling file `tests.rs` and compiled only for
// test builds.
#[cfg(test)]
#[path = "tests.rs"]
mod tests;