use async_trait::async_trait;
use std::sync::Arc;
use crate::{
account::config::{AccountConfig, HasAccountConfig},
envelope::{
get::GetEnvelope, list::ListEnvelopes, watch::WatchEnvelopes, Envelope, Envelopes, Id,
SingleId,
},
flag::{add::AddFlags, remove::RemoveFlags, set::SetFlags, Flags},
folder::{
add::AddFolder, delete::DeleteFolder, expunge::ExpungeFolder, list::ListFolders,
purge::PurgeFolder, Folders,
},
message::{
add::AddMessage, copy::CopyMessages, delete::DeleteMessages, get::GetMessages,
peek::PeekMessages, r#move::MoveMessages, send::SendMessage, Messages,
},
thread_pool::{ThreadPool, ThreadPoolBuilder, ThreadPoolContext, ThreadPoolContextBuilder},
Result,
};
use super::{
context::{BackendContext, BackendContextBuilder},
feature::BackendFeature,
AsyncTryIntoBackendFeatures, BackendBuilder, Error,
};
pub struct BackendPool<C: BackendContext> {
pub account_config: Arc<AccountConfig>,
pub pool: ThreadPool<C>,
pub add_folder: Option<BackendFeature<C, dyn AddFolder>>,
pub list_folders: Option<BackendFeature<C, dyn ListFolders>>,
pub expunge_folder: Option<BackendFeature<C, dyn ExpungeFolder>>,
pub purge_folder: Option<BackendFeature<C, dyn PurgeFolder>>,
pub delete_folder: Option<BackendFeature<C, dyn DeleteFolder>>,
pub get_envelope: Option<BackendFeature<C, dyn GetEnvelope>>,
pub list_envelopes: Option<BackendFeature<C, dyn ListEnvelopes>>,
pub watch_envelopes: Option<BackendFeature<C, dyn WatchEnvelopes>>,
pub add_flags: Option<BackendFeature<C, dyn AddFlags>>,
pub set_flags: Option<BackendFeature<C, dyn SetFlags>>,
pub remove_flags: Option<BackendFeature<C, dyn RemoveFlags>>,
pub add_message: Option<BackendFeature<C, dyn AddMessage>>,
pub send_message: Option<BackendFeature<C, dyn SendMessage>>,
pub peek_messages: Option<BackendFeature<C, dyn PeekMessages>>,
pub get_messages: Option<BackendFeature<C, dyn GetMessages>>,
pub copy_messages: Option<BackendFeature<C, dyn CopyMessages>>,
pub move_messages: Option<BackendFeature<C, dyn MoveMessages>>,
pub delete_messages: Option<BackendFeature<C, dyn DeleteMessages>>,
}
impl<C: BackendContext> HasAccountConfig for BackendPool<C> {
fn account_config(&self) -> &AccountConfig {
&self.account_config
}
}
#[async_trait]
impl<C: BackendContext + 'static> AddFolder for BackendPool<C> {
async fn add_folder(&self, folder: &str) -> Result<()> {
let folder = folder.to_owned();
let feature = self
.add_folder
.clone()
.ok_or(Error::AddFolderNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::AddFolderNotAvailableError)?
.add_folder(&folder)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> ListFolders for BackendPool<C> {
async fn list_folders(&self) -> Result<Folders> {
let feature = self
.list_folders
.clone()
.ok_or(Error::ListFoldersNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::ListFoldersNotAvailableError)?
.list_folders()
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> ExpungeFolder for BackendPool<C> {
async fn expunge_folder(&self, folder: &str) -> Result<()> {
let folder = folder.to_owned();
let feature = self
.expunge_folder
.clone()
.ok_or(Error::ExpungeFolderNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::ExpungeFolderNotAvailableError)?
.expunge_folder(&folder)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> PurgeFolder for BackendPool<C> {
async fn purge_folder(&self, folder: &str) -> Result<()> {
let folder = folder.to_owned();
let feature = self
.purge_folder
.clone()
.ok_or(Error::PurgeFolderNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::PurgeFolderNotAvailableError)?
.purge_folder(&folder)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> DeleteFolder for BackendPool<C> {
async fn delete_folder(&self, folder: &str) -> Result<()> {
let folder = folder.to_owned();
let feature = self
.delete_folder
.clone()
.ok_or(Error::DeleteFolderNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::DeleteFolderNotAvailableError)?
.delete_folder(&folder)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> GetEnvelope for BackendPool<C> {
async fn get_envelope(&self, folder: &str, id: &Id) -> Result<Envelope> {
let folder = folder.to_owned();
let id = id.clone();
let feature = self
.get_envelope
.clone()
.ok_or(Error::GetEnvelopeNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::GetEnvelopeNotAvailableError)?
.get_envelope(&folder, &id)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> ListEnvelopes for BackendPool<C> {
async fn list_envelopes(
&self,
folder: &str,
page_size: usize,
page: usize,
) -> Result<Envelopes> {
let folder = folder.to_owned();
let feature = self
.list_envelopes
.clone()
.ok_or(Error::ListEnvelopesNotAvailableError)?;
self.pool
.exec(move |ctx| async move {
feature(&ctx)
.ok_or(Error::ListEnvelopesNotAvailableError)?
.list_envelopes(&folder, page_size, page)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> WatchEnvelopes for BackendPool<C> {
async fn watch_envelopes(&self, folder: &str) -> Result<()> {
let folder = folder.to_owned();
let feature = self
.watch_envelopes
.clone()
.ok_or(Error::WatchEnvelopesNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::WatchEnvelopesNotAvailableError)?
.watch_envelopes(&folder)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> AddFlags for BackendPool<C> {
async fn add_flags(&self, folder: &str, id: &Id, flags: &Flags) -> Result<()> {
let folder = folder.to_owned();
let id = id.clone();
let flags = flags.clone();
let feature = self
.add_flags
.clone()
.ok_or(Error::AddFlagsNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::AddFlagsNotAvailableError)?
.add_flags(&folder, &id, &flags)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> SetFlags for BackendPool<C> {
async fn set_flags(&self, folder: &str, id: &Id, flags: &Flags) -> Result<()> {
let folder = folder.to_owned();
let id = id.clone();
let flags = flags.clone();
let feature = self
.set_flags
.clone()
.ok_or(Error::SetFlagsNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::SetFlagsNotAvailableError)?
.set_flags(&folder, &id, &flags)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> RemoveFlags for BackendPool<C> {
async fn remove_flags(&self, folder: &str, id: &Id, flags: &Flags) -> Result<()> {
let folder = folder.to_owned();
let id = id.clone();
let flags = flags.clone();
let feature = self
.remove_flags
.clone()
.ok_or(Error::RemoveFlagsNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::RemoveFlagsNotAvailableError)?
.remove_flags(&folder, &id, &flags)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> AddMessage for BackendPool<C> {
async fn add_message_with_flags(
&self,
folder: &str,
msg: &[u8],
flags: &Flags,
) -> Result<SingleId> {
let folder = folder.to_owned();
let msg = msg.to_owned();
let flags = flags.clone();
let feature = self
.add_message
.clone()
.ok_or(Error::AddMessageNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::AddMessageWithFlagsNotAvailableError)?
.add_message_with_flags(&folder, &msg, &flags)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> SendMessage for BackendPool<C> {
async fn send_message(&self, msg: &[u8]) -> Result<()> {
let msg = msg.to_owned();
let feature = self
.send_message
.clone()
.ok_or(Error::SendMessageNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::SendMessageNotAvailableError)?
.send_message(&msg)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> PeekMessages for BackendPool<C> {
async fn peek_messages(&self, folder: &str, id: &Id) -> Result<Messages> {
let folder = folder.to_owned();
let id = id.clone();
let feature = self
.peek_messages
.clone()
.ok_or(Error::PeekMessagesNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::PeekMessagesNotAvailableError)?
.peek_messages(&folder, &id)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> GetMessages for BackendPool<C> {
async fn get_messages(&self, folder: &str, id: &Id) -> Result<Messages> {
let folder = folder.to_owned();
let id = id.clone();
let feature = self
.get_messages
.clone()
.ok_or(Error::GetMessagesNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::GetMessagesNotAvailableError)?
.get_messages(&folder, &id)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> CopyMessages for BackendPool<C> {
async fn copy_messages(&self, from_folder: &str, to_folder: &str, id: &Id) -> Result<()> {
let from_folder = from_folder.to_owned();
let to_folder = to_folder.to_owned();
let id = id.clone();
let feature = self
.copy_messages
.clone()
.ok_or(Error::CopyMessagesNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::CopyMessagesNotAvailableError)?
.copy_messages(&from_folder, &to_folder, &id)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> MoveMessages for BackendPool<C> {
async fn move_messages(&self, from_folder: &str, to_folder: &str, id: &Id) -> Result<()> {
let from_folder = from_folder.to_owned();
let to_folder = to_folder.to_owned();
let id = id.clone();
let feature = self
.move_messages
.clone()
.ok_or(Error::MoveMessagesNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::MoveMessagesNotAvailableError)?
.move_messages(&from_folder, &to_folder, &id)
.await
})
.await
}
}
#[async_trait]
impl<C: BackendContext + 'static> DeleteMessages for BackendPool<C> {
async fn delete_messages(&self, folder: &str, id: &Id) -> Result<()> {
let folder = folder.to_owned();
let id = id.clone();
let feature = self
.delete_messages
.clone()
.ok_or(Error::DeleteMessagesNotAvailableError)?;
self.pool
.exec(|ctx| async move {
feature(&ctx)
.ok_or(Error::DeleteMessagesNotAvailableError)?
.delete_messages(&folder, &id)
.await
})
.await
}
}
#[async_trait]
impl<CB> AsyncTryIntoBackendFeatures<BackendPool<CB::Context>> for BackendBuilder<CB>
where
CB: BackendContextBuilder + 'static,
{
async fn try_into_backend(self) -> Result<BackendPool<CB::Context>> {
let add_folder = self.get_add_folder();
let list_folders = self.get_list_folders();
let expunge_folder = self.get_expunge_folder();
let purge_folder = self.get_purge_folder();
let delete_folder = self.get_delete_folder();
let get_envelope = self.get_get_envelope();
let list_envelopes = self.get_list_envelopes();
let watch_envelopes = self.get_watch_envelopes();
let add_flags = self.get_add_flags();
let set_flags = self.get_set_flags();
let remove_flags = self.get_remove_flags();
let add_message = self.get_add_message();
let send_message = self.get_send_message();
let peek_messages = self.get_peek_messages();
let get_messages = self.get_get_messages();
let copy_messages = self.get_copy_messages();
let move_messages = self.get_move_messages();
let delete_messages = self.get_delete_messages();
Ok(BackendPool {
account_config: self.account_config.clone(),
pool: ThreadPoolBuilder::new(self.ctx_builder).build().await?,
add_folder,
list_folders,
expunge_folder,
purge_folder,
delete_folder,
get_envelope,
list_envelopes,
watch_envelopes,
add_flags,
set_flags,
remove_flags,
add_message,
send_message,
peek_messages,
get_messages,
copy_messages,
move_messages,
delete_messages,
})
}
}
impl<T> ThreadPoolContext for T where T: BackendContext {}
#[async_trait]
impl<T> ThreadPoolContextBuilder for T
where
T: BackendContextBuilder,
{
type Context = T::Context;
async fn build(self) -> Result<Self::Context> {
BackendContextBuilder::build(self).await
}
}