use crate::types::{
Flag, MailboxAttribute, MailboxName, NotifySetParams, SequenceSet, StoreOperation,
};
use std::any::Any;
use std::marker::PhantomData;
use crate::error::Error;
use crate::types::response::{
AclEntry, EsearchResponse, ListRightsResponse, MetadataResult, NamespaceResponse,
QuotaResource, QuotaRootResponse, ThreadNode, UidRange,
};
use crate::types::validated::ParsedUidSet;
use crate::types::{
Capability, Command, CopyResult, ExpungeResult, FetchResponse, MoveResult, StatusItem,
StatusResult, StoreResult,
};
use super::dispatch;
use super::driver::ConsumerErased;
use super::MailboxInfo;
use super::SearchResult;
#[derive(Debug)]
pub enum PipelineError {
Disconnected,
Driver(Error),
TypeMismatch {
index: usize,
},
}
impl std::fmt::Display for PipelineError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Disconnected => write!(f, "pipeline: driver task disconnected"),
Self::Driver(e) => write!(f, "pipeline: driver error: {e}"),
Self::TypeMismatch { index } => {
write!(f, "pipeline: type mismatch at command index {index}")
}
}
}
}
impl std::error::Error for PipelineError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Driver(e) => Some(e),
_ => None,
}
}
}
pub trait UnfoldTuple {
type Output;
fn unfold(
results: Vec<Result<Box<dyn Any + Send>, Error>>,
) -> Result<Self::Output, PipelineError>;
}
fn downcast_result<T: Any + Send>(
result: Result<Box<dyn Any + Send>, Error>,
index: usize,
) -> Result<Result<T, Error>, PipelineError> {
match result {
Ok(boxed) => match boxed.downcast::<T>() {
Ok(val) => Ok(Ok(*val)),
Err(_) => Err(PipelineError::TypeMismatch { index }),
},
Err(e) => Ok(Err(e)),
}
}
impl UnfoldTuple for () {
type Output = ();
fn unfold(results: Vec<Result<Box<dyn Any + Send>, Error>>) -> Result<(), PipelineError> {
if !results.is_empty() {
return Err(PipelineError::TypeMismatch { index: 0 });
}
Ok(())
}
}
macro_rules! impl_unfold_tuple {
($nested:ty, $count:literal, [$($idx:literal : $T:ident),+ $(,)?]) => {
impl<$($T: Any + Send),+> UnfoldTuple for $nested {
type Output = ($(Result<$T, Error>,)+);
fn unfold(
results: Vec<Result<Box<dyn Any + Send>, Error>>,
) -> Result<Self::Output, PipelineError> {
if results.len() != $count {
return Err(PipelineError::TypeMismatch { index: 0 });
}
let mut iter = results.into_iter();
Ok(($(
downcast_result::<$T>(
iter.next().ok_or(PipelineError::TypeMismatch { index: $idx })?,
$idx,
)?,
)+))
}
}
};
}
impl_unfold_tuple!((A, ()), 1, [0: A]);
impl_unfold_tuple!((B, (A, ())), 2, [0: A, 1: B]);
impl_unfold_tuple!((C, (B, (A, ()))), 3, [0: A, 1: B, 2: C]);
impl_unfold_tuple!((D, (C, (B, (A, ())))), 4, [0: A, 1: B, 2: C, 3: D]);
impl_unfold_tuple!((E, (D, (C, (B, (A, ()))))), 5, [0: A, 1: B, 2: C, 3: D, 4: E]);
impl_unfold_tuple!((F, (E, (D, (C, (B, (A, ())))))), 6, [0: A, 1: B, 2: C, 3: D, 4: E, 5: F]);
impl_unfold_tuple!(
(G, (F, (E, (D, (C, (B, (A, ()))))))),
7,
[0: A, 1: B, 2: C, 3: D, 4: E, 5: F, 6: G]
);
impl_unfold_tuple!(
(H, (G, (F, (E, (D, (C, (B, (A, ())))))))),
8,
[0: A, 1: B, 2: C, 3: D, 4: E, 5: F, 6: G, 7: H]
);
pub struct Pipeline<'conn, Accumulated> {
conn: &'conn super::ImapConnection,
pending: Vec<Box<dyn ConsumerErased>>,
commands: Vec<Command>,
_marker: PhantomData<Accumulated>,
}
impl<'conn> Pipeline<'conn, ()> {
pub(in crate::connection) fn new(conn: &'conn super::ImapConnection) -> Self {
Self {
conn,
pending: Vec::new(),
commands: Vec::new(),
_marker: PhantomData,
}
}
}
macro_rules! pipeline_method {
(
$(#[$meta:meta])*
fn $name:ident($($param:ident : $param_ty:ty),* $(,)?) -> $output:ty;
command = $cmd:expr;
consumer = $consumer:expr;
) => {
$(#[$meta])*
pub fn $name(mut self $(, $param: $param_ty)*) -> Pipeline<'conn, ($output, T)> {
self.commands.push($cmd);
self.pending.push(Box::new($consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
};
}
impl<'conn, T> Pipeline<'conn, T> {
pipeline_method! {
fn noop() -> ();
command = Command::Noop;
consumer = dispatch::TaggedOkConsumer::default();
}
pipeline_method! {
fn capability() -> Vec<Capability>;
command = Command::Capability;
consumer = dispatch::CapabilityConsumer::default();
}
pipeline_method! {
fn list(reference: String, pattern: String) -> Result<Vec<MailboxInfo>, Error>;
command = Command::List { reference, pattern };
consumer = dispatch::ListConsumer::new();
}
pub fn list_extended(
mut self,
selection_options: Vec<String>,
reference: String,
patterns: Vec<String>,
return_options: Vec<String>,
) -> Pipeline<'conn, (Result<Vec<MailboxInfo>, Error>, T)> {
let filter_extended = !selection_options
.iter()
.any(|o| o.eq_ignore_ascii_case("SUBSCRIBED"));
let consumer =
dispatch::ListExtendedConsumer::new(filter_extended, selection_options.clone());
self.commands.push(Command::ListExtended {
selection_options,
reference,
patterns,
return_options,
});
self.pending
.push(Box::new(consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
pipeline_method! {
fn list_status(
reference: String,
pattern: String,
status_items: String,
) -> Result<Vec<(MailboxInfo, Vec<StatusItem>)>, Error>;
command = Command::ListStatus { reference, pattern, status_items };
consumer = dispatch::ListStatusConsumer::new();
}
pipeline_method! {
fn lsub(reference: String, pattern: String) -> Vec<MailboxInfo>;
command = Command::Lsub { reference, pattern };
consumer = dispatch::LsubConsumer::default();
}
pipeline_method! {
fn create(mailbox: MailboxName) -> Option<String>;
command = Command::Create { mailbox };
consumer = dispatch::CreateConsumer::default();
}
pipeline_method! {
fn create_special_use(
mailbox: MailboxName,
special_use: Vec<MailboxAttribute>,
) -> Option<String>;
command = Command::CreateSpecialUse { mailbox, special_use };
consumer = dispatch::CreateConsumer::default();
}
pipeline_method! {
fn delete(mailbox: MailboxName) -> ();
command = Command::Delete { mailbox };
consumer = dispatch::TaggedOkConsumer::default();
}
pipeline_method! {
fn rename(mailbox: MailboxName, new_name: MailboxName) -> ();
command = Command::Rename { mailbox, new_name };
consumer = dispatch::TaggedOkConsumer::default();
}
pipeline_method! {
fn subscribe(mailbox: MailboxName) -> ();
command = Command::Subscribe { mailbox };
consumer = dispatch::TaggedOkConsumer::default();
}
pipeline_method! {
fn unsubscribe(mailbox: MailboxName) -> ();
command = Command::Unsubscribe { mailbox };
consumer = dispatch::TaggedOkConsumer::default();
}
pipeline_method! {
fn status(mailbox: MailboxName, items: String) -> StatusResult;
command = Command::Status { mailbox, items };
consumer = dispatch::StatusConsumer::new();
}
pipeline_method! {
fn namespace() -> NamespaceResponse;
command = Command::Namespace;
consumer = dispatch::NamespaceConsumer::default();
}
pipeline_method! {
fn check() -> ();
command = Command::Check;
consumer = dispatch::TaggedOkConsumer::default();
}
pipeline_method! {
fn expunge() -> ExpungeResult;
command = Command::Expunge;
consumer = dispatch::ExpungeConsumer::new();
}
pipeline_method! {
fn search(criteria: String) -> Result<SearchResult, Error>;
command = Command::Search { criteria };
consumer = dispatch::SearchConsumer::new();
}
pipeline_method! {
fn search_return(
criteria: String,
return_opts: Vec<String>,
) -> Result<EsearchResponse, Error>;
command = Command::SearchReturn { criteria, return_opts };
consumer = dispatch::EsearchConsumer::new();
}
pipeline_method! {
fn search_save(criteria: String) -> Result<(), Error>;
command = Command::SearchSave { criteria };
consumer = dispatch::SearchSaveConsumer::new();
}
pipeline_method! {
fn fetch(
sequence_set: SequenceSet,
items: String,
changed_since: Option<u64>,
) -> Vec<FetchResponse>;
command = Command::Fetch { sequence_set, items, changed_since };
consumer = dispatch::FetchConsumer::new();
}
pipeline_method! {
fn store(
sequence_set: SequenceSet,
operation: StoreOperation,
flags: Vec<Flag>,
unchanged_since: Option<u64>,
) -> StoreResult;
command = Command::Store { sequence_set, operation, flags, unchanged_since };
consumer = dispatch::StoreConsumer::new();
}
pipeline_method! {
fn copy(sequence_set: SequenceSet, mailbox: MailboxName) -> CopyResult;
command = Command::Copy { sequence_set, mailbox };
consumer = dispatch::CopyConsumer::new();
}
pipeline_method! {
fn move_messages(sequence_set: SequenceSet, mailbox: MailboxName) -> MoveResult;
command = Command::Move { sequence_set, mailbox };
consumer = dispatch::MoveConsumer::new();
}
pipeline_method! {
fn uid_search(criteria: String) -> Result<SearchResult, Error>;
command = Command::UidSearch { criteria };
consumer = dispatch::SearchConsumer::new();
}
pipeline_method! {
fn uid_search_return(
criteria: String,
return_opts: Vec<String>,
) -> Result<EsearchResponse, Error>;
command = Command::UidSearchReturn { criteria, return_opts };
consumer = dispatch::EsearchConsumer::new();
}
pipeline_method! {
fn uid_search_save(criteria: String) -> Result<(), Error>;
command = Command::UidSearchSave { criteria };
consumer = dispatch::SearchSaveConsumer::new();
}
#[allow(clippy::type_complexity)]
pub fn uid_fetch(
mut self,
sequence_set: SequenceSet,
items: String,
changed_since: Option<u64>,
vanished: bool,
) -> Pipeline<'conn, ((Vec<FetchResponse>, Vec<UidRange>), T)> {
let parsed_set = ParsedUidSet::new(&sequence_set);
self.commands.push(Command::UidFetch {
sequence_set,
items,
changed_since,
vanished,
});
self.pending
.push(Box::new(dispatch::FetchVanishedConsumer::new(parsed_set))
as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
pipeline_method! {
fn uid_store(
sequence_set: SequenceSet,
operation: StoreOperation,
flags: Vec<Flag>,
unchanged_since: Option<u64>,
) -> StoreResult;
command = Command::UidStore { sequence_set, operation, flags, unchanged_since };
consumer = dispatch::StoreConsumer::new();
}
pipeline_method! {
fn uid_copy(sequence_set: SequenceSet, mailbox: MailboxName) -> CopyResult;
command = Command::UidCopy { sequence_set, mailbox };
consumer = dispatch::CopyConsumer::new();
}
pipeline_method! {
fn uid_move_messages(sequence_set: SequenceSet, mailbox: MailboxName) -> MoveResult;
command = Command::UidMove { sequence_set, mailbox };
consumer = dispatch::MoveConsumer::new();
}
pipeline_method! {
fn uid_expunge(sequence_set: SequenceSet) -> ExpungeResult;
command = Command::UidExpunge { sequence_set };
consumer = dispatch::ExpungeConsumer::new();
}
pipeline_method! {
fn id(params: Vec<(String, Option<String>)>) -> Vec<(String, Option<String>)>;
command = Command::Id(params);
consumer = dispatch::IdConsumer::default();
}
pub fn get_metadata(
mut self,
mailbox: MailboxName,
entries: Vec<String>,
max_size: Option<u64>,
depth: Option<String>,
) -> Pipeline<'conn, (MetadataResult, T)> {
let consumer = dispatch::MetadataConsumer::new(mailbox.as_str().to_owned());
self.commands.push(Command::GetMetadata {
mailbox,
entries,
max_size,
depth,
});
self.pending
.push(Box::new(consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
pipeline_method! {
fn set_metadata(
mailbox: MailboxName,
entries: Vec<(String, Option<Vec<u8>>)>,
) -> ();
command = Command::SetMetadata { mailbox, entries };
consumer = dispatch::TaggedOkConsumer::default();
}
pipeline_method! {
fn thread(algorithm: String, charset: String, criteria: String) -> Vec<ThreadNode>;
command = Command::Thread { algorithm, charset, criteria };
consumer = dispatch::ThreadConsumer::default();
}
pipeline_method! {
fn uid_thread(algorithm: String, charset: String, criteria: String) -> Vec<ThreadNode>;
command = Command::UidThread { algorithm, charset, criteria };
consumer = dispatch::ThreadConsumer::default();
}
pipeline_method! {
fn sort(sort_criteria: String, charset: String, criteria: String) -> SearchResult;
command = Command::Sort { sort_criteria, charset, criteria };
consumer = dispatch::SortConsumer::default();
}
pipeline_method! {
fn uid_sort(sort_criteria: String, charset: String, criteria: String) -> SearchResult;
command = Command::UidSort { sort_criteria, charset, criteria };
consumer = dispatch::SortConsumer::default();
}
pipeline_method! {
fn notify_set(params: NotifySetParams) -> Result<bool, Error>;
command = Command::NotifySet(params);
consumer = dispatch::NotifySetConsumer::default();
}
pipeline_method! {
fn notify_none() -> ();
command = Command::NotifyNone;
consumer = dispatch::TaggedOkConsumer::default();
}
pub fn get_quota(mut self, root: String) -> Pipeline<'conn, (Vec<QuotaResource>, T)> {
let consumer = dispatch::QuotaConsumer::new(root.clone());
self.commands.push(Command::GetQuota { root });
self.pending
.push(Box::new(consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
pub fn get_quota_root(
mut self,
mailbox: MailboxName,
) -> Pipeline<'conn, (QuotaRootResponse, T)> {
let consumer = dispatch::QuotaRootConsumer::new(mailbox.as_str().to_owned());
self.commands.push(Command::GetQuotaRoot { mailbox });
self.pending
.push(Box::new(consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
pub fn set_quota(
mut self,
root: String,
resources: Vec<(String, u64)>,
) -> Pipeline<'conn, (Vec<QuotaResource>, T)> {
let consumer = dispatch::QuotaConsumer::new(root.clone());
self.commands.push(Command::SetQuota { root, resources });
self.pending
.push(Box::new(consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
pipeline_method! {
fn set_acl(mailbox: MailboxName, identifier: String, rights: String) -> ();
command = Command::SetAcl { mailbox, identifier, rights };
consumer = dispatch::TaggedOkConsumer::default();
}
pipeline_method! {
fn delete_acl(mailbox: MailboxName, identifier: String) -> ();
command = Command::DeleteAcl { mailbox, identifier };
consumer = dispatch::TaggedOkConsumer::default();
}
pub fn get_acl(mut self, mailbox: MailboxName) -> Pipeline<'conn, (Vec<AclEntry>, T)> {
let consumer = dispatch::AclConsumer::new(mailbox.as_str().to_owned());
self.commands.push(Command::GetAcl { mailbox });
self.pending
.push(Box::new(consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
pub fn list_rights(
mut self,
mailbox: MailboxName,
identifier: String,
) -> Pipeline<'conn, (ListRightsResponse, T)> {
let consumer =
dispatch::ListRightsConsumer::new(mailbox.as_str().to_owned(), identifier.clone());
self.commands.push(Command::ListRights {
mailbox,
identifier,
});
self.pending
.push(Box::new(consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
pub fn my_rights(mut self, mailbox: MailboxName) -> Pipeline<'conn, (String, T)> {
let consumer = dispatch::MyRightsConsumer::new(mailbox.as_str().to_owned());
self.commands.push(Command::MyRights { mailbox });
self.pending
.push(Box::new(consumer) as Box<dyn ConsumerErased>);
Pipeline {
conn: self.conn,
pending: self.pending,
commands: self.commands,
_marker: PhantomData,
}
}
}
impl<Accumulated: UnfoldTuple> Pipeline<'_, Accumulated> {
pub async fn execute(self) -> Result<Accumulated::Output, PipelineError> {
let results = self.execute_raw().await?;
Accumulated::unfold(results)
}
}
impl<T> Pipeline<'_, T> {
pub async fn execute_dynamic(
self,
) -> Result<Vec<Result<Box<dyn Any + Send>, Error>>, PipelineError> {
self.execute_raw().await
}
async fn execute_raw(self) -> Result<Vec<Result<Box<dyn Any + Send>, Error>>, PipelineError> {
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let dcmd = super::driver::DriverCommand::Pipeline {
commands: self.commands,
consumers: self.pending,
result_tx,
};
if self.conn.cmd_tx.send(dcmd).await.is_err() {
return Err(PipelineError::Disconnected);
}
match result_rx.await {
Ok(Ok(results)) => Ok(results),
Ok(Err(e)) => Err(PipelineError::Driver(e)),
Err(_) => Err(PipelineError::Disconnected),
}
}
}
impl super::ImapConnection {
pub fn pipeline(&self) -> Pipeline<'_, ()> {
Pipeline::new(self)
}
}
#[cfg(test)]
#[path = "tests.rs"]
mod tests;