mod guards;
use crate::error::OrUserError;
use crate::container::Container;
use crate::manager::FileManager;
pub use self::guards::{
AccessGuard,
AccessGuardMut,
OwnedAccessGuard,
OwnedAccessGuardMut
};
use tokio::sync::RwLock;
use std::path::Path;
use std::sync::Arc;
/// A [`ContainerSharedAsync`] whose manager is
/// [`StandardManager<Format>`](crate::manager::standard::StandardManager).
pub type StandardContainerSharedAsync<T, Format> = ContainerSharedAsync<T, crate::manager::standard::StandardManager<Format>>;
/// Alias of [`StandardManagerOptions`](crate::manager::standard::StandardManagerOptions),
/// named to pair with [`StandardContainerSharedAsync`].
pub type StandardContainerSharedAsyncOptions = crate::manager::standard::StandardManagerOptions;
/// A [`ContainerSharedAsync`] whose manager is `AtomicManager<Format, Support>`.
///
/// Only available when the `atomic` feature is enabled.
#[cfg_attr(docsrs, doc(cfg(feature = "atomic")))]
#[cfg(feature = "atomic")]
pub type AtomicContainerSharedAsync<T, Format, Support> = ContainerSharedAsync<T, crate::manager::atomic::AtomicManager<Format, Support>>;
/// Alias of `AtomicManagerOptions<Support>`, named to pair with
/// [`AtomicContainerSharedAsync`].
///
/// Only available when the `atomic` feature is enabled.
#[cfg_attr(docsrs, doc(cfg(feature = "atomic")))]
#[cfg(feature = "atomic")]
pub type AtomicContainerSharedAsyncOptions<Support> = crate::manager::atomic::AtomicManagerOptions<Support>;
// Runs `$expr` inside a `move` closure handed to `tokio::task::spawn_blocking`
// and awaits the join handle, evaluating to whatever `$expr` produced.
//
// Because the expansion contains `.await`, the macro can only be used inside an
// async context. Every local mentioned in `$expr` is moved into the closure.
//
// Panics with "blocking task failed" if awaiting the join handle yields an error.
macro_rules! spawn_blocking {
    ($expr:expr) => {
        tokio::task::spawn_blocking(move || $expr)
            .await
            .expect("blocking task failed")
    };
}
/// A cheaply clonable, shared handle to a [`Container`] guarded by an
/// asynchronous read/write lock.
///
/// The container lives behind `Arc<tokio::sync::RwLock<..>>`; cloning the handle
/// clones the `Arc`, so every clone refers to the same container.
#[repr(transparent)]
#[derive(Debug)]
pub struct ContainerSharedAsync<T, Manager> {
// Reference-counted pointer to the lock that protects the container.
ptr: Arc<RwLock<Container<T, Manager>>>
}
impl<T, Manager> ContainerSharedAsync<T, Manager> {
pub fn new(value: T, manager: Manager) -> Self {
ContainerSharedAsync::from(Container::new(value, manager))
}
pub fn try_unwrap(self) -> Result<Container<T, Manager>, Self> {
match Arc::try_unwrap(self.ptr) {
Ok(inner) => Ok(RwLock::into_inner(inner)),
Err(ptr) => Err(ContainerSharedAsync { ptr })
}
}
pub fn get_mut(&mut self) -> Option<&mut Container<T, Manager>> {
Arc::get_mut(&mut self.ptr).map(RwLock::get_mut)
}
#[inline]
pub async fn access(&self) -> AccessGuard<'_, T, Manager> {
AccessGuard::new(self.ptr.read().await)
}
#[inline]
pub async fn access_mut(&self) -> AccessGuardMut<'_, T, Manager> {
AccessGuardMut::new(self.ptr.write().await)
}
#[inline]
pub async fn access_owned(&self) -> OwnedAccessGuard<T, Manager> {
OwnedAccessGuard::new(self.ptr.clone().read_owned().await)
}
#[inline]
pub async fn access_owned_mut(&self) -> OwnedAccessGuardMut<T, Manager> {
OwnedAccessGuardMut::new(self.ptr.clone().write_owned().await)
}
#[inline]
pub fn try_access(&self) -> Option<AccessGuard<'_, T, Manager>> {
self.ptr.try_read().map(AccessGuard::new).ok()
}
#[inline]
pub fn try_access_mut(&self) -> Option<AccessGuardMut<'_, T, Manager>> {
self.ptr.try_write().map(AccessGuardMut::new).ok()
}
#[inline]
pub fn try_access_owned(&self) -> Option<OwnedAccessGuard<T, Manager>> {
self.ptr.clone().try_read_owned().map(OwnedAccessGuard::new).ok()
}
#[inline]
pub fn try_access_owned_mut(&self) -> Option<OwnedAccessGuardMut<T, Manager>> {
self.ptr.clone().try_write_owned().map(OwnedAccessGuardMut::new).ok()
}
pub async fn operate<F, R>(&self, operation: F) -> R
where F: AsyncFnOnce(&T) -> R {
operation(&*self.access().await).await
}
pub async fn operate_mut<F, R>(&self, operation: F) -> R
where F: AsyncFnOnce(&mut T) -> R {
operation(&mut *self.access_mut().await).await
}
}
/// File-backed operations.
///
/// Each method that calls into the manager does so through the
/// `spawn_blocking!` macro, which moves the work into
/// `tokio::task::spawn_blocking`; the `Send`/`'static` bounds below exist so the
/// captured values can be moved into that closure. Those methods panic with
/// "blocking task failed" if the blocking task cannot be joined.
impl<T, Manager> ContainerSharedAsync<T, Manager>
where
    T: Send + Sync + 'static,
    Manager: FileManager<T> + Send + Sync + 'static,
    Manager::Format: Send,
    Manager::Options: Send,
    Manager::Error: Send
{
    /// Runs `Container::open` on a blocking thread and wraps the resulting
    /// container in a shared handle.
    ///
    /// # Errors
    /// Returns whatever error `Container::open` produced.
    pub async fn open<P: AsRef<Path>>(
        path: P, format: Manager::Format, options: Manager::Options
    ) -> Result<Self, Manager::Error> {
        // `P` is only `AsRef<Path>`, so an owned copy is what gets moved into the closure.
        let owned_path = path.as_ref().to_owned();
        let opened = spawn_blocking!(Container::<T, Manager>::open(owned_path, format, options));
        opened.map(Self::from)
    }

    /// Runs `Container::create_overwrite` with `value` on a blocking thread and
    /// wraps the resulting container in a shared handle.
    ///
    /// # Errors
    /// Returns whatever error `Container::create_overwrite` produced.
    pub async fn create_overwrite<P: AsRef<Path>>(
        path: P, format: Manager::Format, options: Manager::Options, value: T
    ) -> Result<Self, Manager::Error> {
        let owned_path = path.as_ref().to_owned();
        let created = spawn_blocking!(
            Container::<T, Manager>::create_overwrite(owned_path, format, options, value)
        );
        created.map(Self::from)
    }

    /// Runs `Container::create_or` with `value` on a blocking thread and wraps
    /// the resulting container in a shared handle.
    ///
    /// # Errors
    /// Returns whatever error `Container::create_or` produced.
    pub async fn create_or<P: AsRef<Path>>(
        path: P, format: Manager::Format, options: Manager::Options, value: T
    ) -> Result<Self, Manager::Error> {
        let owned_path = path.as_ref().to_owned();
        let created = spawn_blocking!(
            Container::<T, Manager>::create_or(owned_path, format, options, value)
        );
        created.map(Self::from)
    }

    /// Runs `Container::create_or_else` with `closure` on a blocking thread and
    /// wraps the resulting container in a shared handle.
    ///
    /// `closure` is moved into the blocking task, hence its `Send + 'static`
    /// bound.
    ///
    /// # Errors
    /// Returns whatever error `Container::create_or_else` produced.
    pub async fn create_or_else<P: AsRef<Path>, C>(
        path: P, format: Manager::Format, options: Manager::Options, closure: C
    ) -> Result<Self, Manager::Error>
    where C: FnOnce() -> T + Send + 'static {
        let owned_path = path.as_ref().to_owned();
        let created = spawn_blocking!(
            Container::<T, Manager>::create_or_else(owned_path, format, options, closure)
        );
        created.map(Self::from)
    }

    /// Runs `Container::create_or_default` on a blocking thread and wraps the
    /// resulting container in a shared handle.
    ///
    /// # Errors
    /// Returns whatever error `Container::create_or_default` produced.
    pub async fn create_or_default<P: AsRef<Path>>(
        path: P, format: Manager::Format, options: Manager::Options
    ) -> Result<Self, Manager::Error>
    where T: Default {
        let owned_path = path.as_ref().to_owned();
        let created = spawn_blocking!(
            Container::<T, Manager>::create_or_default(owned_path, format, options)
        );
        created.map(Self::from)
    }

    /// Acquires owned read access, then runs the synchronous `operation` on a
    /// blocking thread and returns its output.
    ///
    /// The read guard is moved into the blocking task together with `operation`.
    #[deprecated = "use `ContainerSharedAsync::operate` instead"]
    pub async fn operate_nonblocking<F, R>(&self, operation: F) -> R
    where F: FnOnce(&T) -> R + Send + 'static, R: Send + 'static {
        let read_guard = self.access_owned().await;
        spawn_blocking!({
            let value: &T = &read_guard;
            operation(value)
        })
    }

    /// Acquires owned write access, then runs the synchronous `operation` on a
    /// blocking thread and returns its output.
    ///
    /// The write guard is moved into the blocking task together with `operation`.
    #[deprecated = "use `ContainerSharedAsync::operate_mut` instead"]
    pub async fn operate_mut_nonblocking<F, R>(&self, operation: F) -> R
    where F: FnOnce(&mut T) -> R + Send + 'static, R: Send + 'static {
        let mut write_guard = self.access_owned_mut().await;
        spawn_blocking!({
            let value: &mut T = &mut write_guard;
            operation(value)
        })
    }

    /// Refreshes the container under the write lock, then runs `operation` under
    /// a downgraded (read) guard.
    ///
    /// `operation` receives a shared reference to the current value and the `T`
    /// returned by `Container::refresh` (presumably the value held before the
    /// refresh; confirm against `Container`).
    ///
    /// # Errors
    /// Returns the error from `Container::refresh`; `operation` is not run in
    /// that case.
    #[doc(alias = "operate_load", alias = "operate_reload")]
    pub async fn operate_refresh<F, R>(&self, operation: F) -> Result<R, Manager::Error>
    where F: AsyncFnOnce(&T, T) -> R {
        let mut write_guard = self.access_owned_mut().await;
        // The guard travels into the blocking task and is handed back on success
        // so it can be downgraded afterwards.
        let refreshed = spawn_blocking!({
            let outcome = write_guard.container_mut().refresh();
            outcome.map(|returned_value| (returned_value, write_guard))
        });
        let (returned_value, write_guard) = refreshed?;
        let read_guard = OwnedAccessGuardMut::downgrade(write_guard);
        let output = operation(&read_guard, returned_value).await;
        Ok(output)
    }

    /// Runs the fallible async `operation` under the write lock and, if it
    /// succeeds, commits the container before releasing the lock.
    ///
    /// # Errors
    /// * `OrUserError::User` when `operation` itself fails; nothing is committed.
    /// * The manager's error (converted by `?`) when the commit fails.
    #[doc(alias = "operate_mut_store", alias = "operate_mut_save")]
    pub async fn operate_mut_commit<F, R, U>(&self, operation: F) -> Result<R, OrUserError<Manager::Error, U>>
    where F: AsyncFnOnce(&mut T) -> Result<R, U> {
        let mut write_guard = self.access_owned_mut().await;
        let output = match operation(&mut write_guard).await {
            Ok(output) => output,
            Err(user_error) => return Err(OrUserError::User(user_error))
        };
        Self::commit_with_guard_owned(write_guard).await?;
        Ok(output)
    }

    /// Acquires the write lock and runs `Container::refresh` on a blocking
    /// thread, returning its result.
    ///
    /// # Errors
    /// Returns whatever error `Container::refresh` produced.
    #[doc(alias = "load", alias = "reload")]
    pub async fn refresh(&self) -> Result<T, Manager::Error> {
        let mut write_guard = self.access_owned_mut().await;
        spawn_blocking!(write_guard.container_mut().refresh())
    }

    /// Acquires the write lock and commits the container.
    ///
    /// # Errors
    /// Returns whatever error `Container::commit` produced.
    #[doc(alias = "store", alias = "save")]
    pub async fn commit(&self) -> Result<(), Manager::Error> {
        Self::commit_with_guard_owned(self.access_owned_mut().await).await
    }

    /// Runs `Container::commit` on a blocking thread using an already-held owned
    /// write guard.
    ///
    /// The guard is consumed: it is moved into the blocking task.
    ///
    /// # Errors
    /// Returns whatever error `Container::commit` produced.
    pub async fn commit_with_guard_owned(mut guard: OwnedAccessGuardMut<T, Manager>) -> Result<(), Manager::Error> {
        spawn_blocking!({
            let container = guard.container_mut();
            container.commit()
        })
    }

    /// Acquires the write lock and runs `Container::overwrite` with `value` on a
    /// blocking thread.
    ///
    /// # Errors
    /// Returns whatever error `Container::overwrite` produced.
    pub async fn overwrite(&self, value: T) -> Result<(), Manager::Error> {
        let mut write_guard = self.access_owned_mut().await;
        spawn_blocking!(write_guard.container_mut().overwrite(value))
    }
}
impl<T, Manager> Clone for ContainerSharedAsync<T, Manager> {
#[inline]
fn clone(&self) -> Self {
ContainerSharedAsync { ptr: Arc::clone(&self.ptr) }
}
}
/// Wraps an existing [`Container`] in a fresh lock and reference-counted pointer.
impl<T, Manager> From<Container<T, Manager>> for ContainerSharedAsync<T, Manager> {
    #[inline]
    fn from(container: Container<T, Manager>) -> Self {
        let lock = RwLock::new(container);
        Self { ptr: Arc::new(lock) }
    }
}