#![allow(clippy::pedantic, missing_docs, clippy::missing_docs_in_private_items)]
use serde::{de::DeserializeOwned, Serialize};
use serde_cbor::value::{from_value, to_value, Value};
use snafu::ResultExt;
use tracing::{error, info, info_span};
use crate::{
error::{BackendError, CryptoBoxError, Fetch, Store},
CryptoBox,
};
enum MethodInvokation {
ContainsKey {
key: Value,
namespace: String,
sender: async_oneshot::Sender<Result<bool, CryptoBoxError>>,
},
CreateNamespace {
name: String,
sender: async_oneshot::Sender<Result<(), CryptoBoxError>>,
},
Get {
key: Value,
namespace: String,
sender: async_oneshot::Sender<Result<Option<Value>, CryptoBoxError>>,
},
GetRoot {
key: Value,
sender: async_oneshot::Sender<Result<Option<Value>, CryptoBoxError>>,
},
Insert {
key: Value,
value: Value,
namespace: String,
sender: async_oneshot::Sender<Result<(), CryptoBoxError>>,
},
InsertRoot {
key: Value,
value: Value,
sender: async_oneshot::Sender<Result<(), CryptoBoxError>>,
},
NamespaceExists {
namespace: String,
sender: async_oneshot::Sender<bool>,
},
Namespaces {
sender: async_oneshot::Sender<Vec<String>>,
},
Flush {
sender: async_oneshot::Sender<Result<(), CryptoBoxError>>,
},
IntoInner {
sender: async_oneshot::Sender<CryptoBox>,
},
}
pub struct AsyncCryptoBox {
channel: flume::Sender<MethodInvokation>,
}
impl AsyncCryptoBox {
pub fn new(crypto_box: CryptoBox) -> Self {
let (tx, rx) = flume::bounded(1);
std::thread::spawn(move || {
let span = info_span!("AsyncCryptoBox backing task");
span.in_scope(|| {
info!("Starting AsyncCryptoBox backing task");
let mut crypto_box = crypto_box;
for method in rx.iter() {
let method: MethodInvokation = method;
match method {
MethodInvokation::ContainsKey {
key,
namespace,
mut sender,
} => {
let ret = crypto_box.contains_key::<Value, Value>(&key, &namespace);
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::CreateNamespace { name, mut sender } => {
let ret = crypto_box.create_namespace(name);
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::Get {
key,
namespace,
mut sender,
} => {
let ret = crypto_box.get::<Value, Value>(&key, &namespace);
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::GetRoot { key, mut sender } => {
let ret = crypto_box.get_root::<Value, Value>(&key);
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::Insert {
key,
value,
namespace,
mut sender,
} => {
let ret = crypto_box.insert(&key, &value, &namespace);
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::InsertRoot {
key,
value,
mut sender,
} => {
let ret = crypto_box.insert_root(&key, &value);
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::NamespaceExists {
namespace,
mut sender,
} => {
let ret = crypto_box.namespace_exists(&namespace);
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::Namespaces { mut sender } => {
let ret = crypto_box.namespaces();
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::Flush { mut sender } => {
let ret = crypto_box.flush();
if let Err(e) = sender.send(ret) {
error!(?e, "Failed to message caller");
}
}
MethodInvokation::IntoInner { mut sender } => {
if let Err(e) = sender.send(crypto_box) {
error!(?e, "Failed to message caller");
}
info!("Shutting down AsyncCryptoBox backing task");
break;
}
}
}
});
});
AsyncCryptoBox { channel: tx }
}
pub async fn namespace_exists(&self, name: &str) -> Result<bool, CryptoBoxError> {
let namespace = name.to_string();
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::NamespaceExists { namespace, sender })
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
rx.await.map_err(|_| CryptoBoxError::AsyncError)
}
pub async fn namespaces(&self) -> Result<Vec<String>, CryptoBoxError> {
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::Namespaces { sender })
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
rx.await.map_err(|_| CryptoBoxError::AsyncError)
}
pub async fn create_namespace(&mut self, name: String) -> Result<(), CryptoBoxError> {
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::CreateNamespace { name, sender })
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
rx.await.map_err(|_| CryptoBoxError::AsyncError)?
}
pub async fn get<K, V>(&mut self, key: &K, namespace: &str) -> Result<Option<V>, CryptoBoxError>
where
K: Serialize,
V: DeserializeOwned,
{
let key = to_value(key)
.map_err(|_| BackendError::ItemSerialization)
.context(Fetch)?;
let namespace = namespace.to_string();
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::Get {
key,
namespace,
sender,
})
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
let ret = rx.await.map_err(|_| CryptoBoxError::AsyncError)??;
if let Some(value) = ret {
from_value(value)
.map_err(|_| BackendError::ItemDeserialization)
.context(Fetch)
} else {
Ok(None)
}
}
pub async fn get_root<K, V>(&mut self, key: &K) -> Result<Option<V>, CryptoBoxError>
where
K: Serialize,
V: DeserializeOwned,
{
let key = to_value(key)
.map_err(|_| BackendError::ItemSerialization)
.context(Fetch)?;
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::GetRoot { key, sender })
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
let ret = rx.await.map_err(|_| CryptoBoxError::AsyncError)??;
if let Some(value) = ret {
from_value(value)
.map_err(|_| BackendError::ItemDeserialization)
.context(Fetch)
} else {
Ok(None)
}
}
pub async fn insert<K, V>(
&mut self,
key: &K,
value: &V,
namespace: &str,
) -> Result<(), CryptoBoxError>
where
K: Serialize,
V: Serialize,
{
let key = to_value(key)
.map_err(|_| BackendError::ItemSerialization)
.context(Store)?;
let value = to_value(value)
.map_err(|_| BackendError::ItemSerialization)
.context(Store)?;
let namespace = namespace.to_string();
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::Insert {
key,
namespace,
sender,
value,
})
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
rx.await.map_err(|_| CryptoBoxError::AsyncError)?
}
pub async fn insert_root<K, V>(&mut self, key: &K, value: &V) -> Result<(), CryptoBoxError>
where
K: Serialize,
V: Serialize,
{
let key = to_value(key)
.map_err(|_| BackendError::ItemSerialization)
.context(Store)?;
let value = to_value(value)
.map_err(|_| BackendError::ItemSerialization)
.context(Store)?;
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::InsertRoot { key, sender, value })
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
rx.await.map_err(|_| CryptoBoxError::AsyncError)?
}
pub async fn contains_key<K, V>(
&mut self,
key: &K,
namespace: &str,
) -> Result<bool, CryptoBoxError>
where
K: Serialize,
V: DeserializeOwned,
{
let key = to_value(key)
.map_err(|_| BackendError::ItemSerialization)
.context(Fetch)?;
let namespace = namespace.to_string();
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::ContainsKey {
key,
namespace,
sender,
})
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
rx.await.map_err(|_| CryptoBoxError::AsyncError)?
}
pub async fn flush(&mut self) -> Result<(), CryptoBoxError> {
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::Flush { sender })
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
rx.await.map_err(|_| CryptoBoxError::AsyncError)?
}
pub async fn into_inner(&mut self) -> Result<CryptoBox, CryptoBoxError> {
let (sender, rx) = async_oneshot::oneshot();
self.channel
.send_async(MethodInvokation::IntoInner { sender })
.await
.map_err(|_| CryptoBoxError::AsyncError)?;
rx.await.map_err(|_| CryptoBoxError::AsyncError)
}
}