pub mod reexports {
pub use super::super::reexports::*;
pub use super::{InteractError, SyncGuard};
}
use std::{
any::Any,
fmt,
marker::PhantomData,
ops::{Deref, DerefMut},
sync::{Arc, Mutex, MutexGuard, PoisonError, TryLockError},
};
use crate::{Runtime, SpawnBlockingError};
/// Error type returned by [`SyncWrapper::interact`].
///
/// `E` is the error type produced by the callback handed to `interact`.
#[derive(Debug)]
pub enum InteractError<E> {
    /// A panic payload reported by the runtime's blocking task
    /// (built from `SpawnBlockingError::Panic`).
    Panic(Box<dyn Any + Send>),

    /// The interaction was aborted before it could produce a result.
    Aborted,

    /// The callback itself returned an error of type `E`.
    Backend(E),
}
/// Human-readable rendering of an [`InteractError`].
///
/// `Panic` and `Aborted` print their bare variant name; `Backend` prints
/// `Backend error: ` followed by the inner error's own `Display` output.
impl<E> fmt::Display for InteractError<E>
where
    E: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Backend(inner) => write!(f, "Backend error: {}", inner),
            Self::Panic(_) => f.write_str("Panic"),
            Self::Aborted => f.write_str("Aborted"),
        }
    }
}
impl<E: std::error::Error + 'static> std::error::Error for InteractError<E> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Panic(_) | Self::Aborted => None,
Self::Backend(e) => Some(e),
}
}
}
/// Holds an object of type `T` behind an `Arc<Mutex<Option<T>>>` together with
/// the [`Runtime`] used to run closures against it.
///
/// `E` is the error type of the closures passed to [`SyncWrapper::new`] and
/// [`SyncWrapper::interact`].
#[must_use]
pub struct SyncWrapper<T: Send + 'static, E: Send + 'static> {
    /// The wrapped object; the `Drop` impl empties the slot with `Option::take`.
    obj: Arc<Mutex<Option<T>>>,
    /// Runtime whose blocking-task facilities are used for every operation.
    runtime: Runtime,
    /// Marker tying `E` to the type; no value of `E` is stored.
    _error: PhantomData<fn() -> E>,
}
/// Debug output listing the `obj`, `runtime` and `_error` fields, in that order.
impl<T, E> fmt::Debug for SyncWrapper<T, E>
where
    E: Send + 'static,
    T: fmt::Debug + Send + 'static,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Borrow every field up front; nothing is moved out of `self`.
        let Self {
            obj,
            runtime,
            _error,
        } = self;

        let mut out = f.debug_struct("SyncWrapper");
        out.field("obj", obj);
        out.field("runtime", runtime);
        out.field("_error", _error);
        out.finish()
    }
}
impl<T, E> SyncWrapper<T, E>
where
    T: Send + 'static,
    E: Send + 'static,
{
    /// Creates a wrapper around the object produced by `f`.
    ///
    /// `f` is executed through `Runtime::spawn_blocking` rather than directly
    /// in the calling task.
    ///
    /// # Errors
    ///
    /// Returns the error `E` produced by `f`.
    ///
    /// # Panics
    ///
    /// Panics if the runtime reports `SpawnBlockingError::Panic`; the panic
    /// message is the `Debug` rendering of the reported payload.
    pub async fn new<F>(runtime: Runtime, f: F) -> Result<Self, E>
    where
        F: FnOnce() -> Result<T, E> + Send + 'static,
    {
        let result = match runtime.spawn_blocking(f).await {
            Err(SpawnBlockingError::Panic(e)) => panic!("{:?}", e),
            Ok(obj) => obj,
        };
        result.map(|obj| Self {
            obj: Arc::new(Mutex::new(Some(obj))),
            runtime,
            _error: PhantomData,
        })
    }

    /// Runs `f` with mutable access to the wrapped object through
    /// `Runtime::spawn_blocking` and returns its result.
    ///
    /// # Errors
    ///
    /// - [`InteractError::Backend`] if `f` returns an error.
    /// - [`InteractError::Panic`] if the runtime reports
    ///   `SpawnBlockingError::Panic`.
    /// - [`InteractError::Aborted`] if the wrapped object is no longer present
    ///   in its slot when the closure gets to run.
    pub async fn interact<F, R>(&self, f: F) -> Result<R, InteractError<E>>
    where
        F: FnOnce(&mut T) -> Result<R, E> + Send + 'static,
        R: Send + 'static,
    {
        let arc = self.obj.clone();
        let outcome = self
            .runtime
            .spawn_blocking(move || {
                // A poisoned mutex still panics here, on the blocking task.
                let mut guard = arc.lock().unwrap();
                match guard.as_mut() {
                    Some(obj) => f(obj).map_err(InteractError::Backend),
                    // The slot has already been emptied: report it as an
                    // aborted interaction instead of panicking on `unwrap`.
                    None => Err(InteractError::Aborted),
                }
            })
            .await;
        match outcome {
            Ok(result) => result,
            Err(SpawnBlockingError::Panic(payload)) => Err(InteractError::Panic(payload)),
        }
    }

    /// Returns `true` if the mutex guarding the wrapped object is poisoned.
    pub fn is_mutex_poisoned(&self) -> bool {
        self.obj.is_poisoned()
    }

    /// Locks the underlying mutex on the current thread and returns a
    /// [`SyncGuard`] giving access to the wrapped object.
    ///
    /// # Errors
    ///
    /// Returns the `PoisonError` from `Mutex::lock` unchanged; it still carries
    /// the raw `MutexGuard`.
    pub fn lock(&self) -> Result<SyncGuard<'_, T>, PoisonError<MutexGuard<'_, Option<T>>>> {
        self.obj.lock().map(SyncGuard)
    }

    /// Attempts to lock the underlying mutex via `Mutex::try_lock` and returns
    /// a [`SyncGuard`] on success.
    ///
    /// # Errors
    ///
    /// Returns the `TryLockError` from `Mutex::try_lock` unchanged.
    pub fn try_lock(&self) -> Result<SyncGuard<'_, T>, TryLockError<MutexGuard<'_, Option<T>>>> {
        self.obj.try_lock().map(SyncGuard)
    }
}
/// Hands destruction of the wrapped object over to the runtime.
impl<T, E> Drop for SyncWrapper<T, E>
where
    T: Send + 'static,
    E: Send + 'static,
{
    /// Schedules a closure via `Runtime::spawn_blocking_background` that takes
    /// the wrapped object out of its slot and drops it.
    ///
    /// # Panics
    ///
    /// Panics if `spawn_blocking_background` returns an error.
    fn drop(&mut self) {
        let shared = self.obj.clone();
        let dispose = move || {
            // A poisoned mutex is recovered so the object is dropped regardless.
            let mut slot = shared.lock().unwrap_or_else(PoisonError::into_inner);
            // The object is dropped while the lock is still held.
            drop(slot.take());
        };
        self.runtime.spawn_blocking_background(dispose).unwrap();
    }
}
/// Guard handed out by `SyncWrapper::lock` and `SyncWrapper::try_lock`.
///
/// Wraps the `MutexGuard` over the `Option<T>` slot; the accessor traits
/// implemented for it unwrap that `Option` to reach the `T` inside.
#[derive(Debug)]
pub struct SyncGuard<'a, T>(MutexGuard<'a, Option<T>>)
where
    T: Send;
/// Shared access to the guarded object.
///
/// # Panics
///
/// Dereferencing panics if the guarded slot holds `None`.
impl<T> Deref for SyncGuard<'_, T>
where
    T: Send,
{
    type Target = T;

    fn deref(&self) -> &T {
        let slot: &Option<T> = &self.0;
        slot.as_ref().unwrap()
    }
}
/// Exclusive access to the guarded object.
///
/// # Panics
///
/// Dereferencing panics if the guarded slot holds `None`.
impl<T> DerefMut for SyncGuard<'_, T>
where
    T: Send,
{
    fn deref_mut(&mut self) -> &mut T {
        let slot: &mut Option<T> = &mut self.0;
        slot.as_mut().unwrap()
    }
}
/// Borrows the guarded object as `&T`.
///
/// # Panics
///
/// Panics if the guarded slot holds `None`.
impl<T> AsRef<T> for SyncGuard<'_, T>
where
    T: Send,
{
    fn as_ref(&self) -> &T {
        let slot: &Option<T> = &self.0;
        slot.as_ref().unwrap()
    }
}
/// Borrows the guarded object as `&mut T`.
///
/// # Panics
///
/// Panics if the guarded slot holds `None`.
impl<T> AsMut<T> for SyncGuard<'_, T>
where
    T: Send,
{
    fn as_mut(&mut self) -> &mut T {
        let slot: &mut Option<T> = &mut self.0;
        slot.as_mut().unwrap()
    }
}