deadpool_sync/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4    nonstandard_style,
5    rust_2018_idioms,
6    rustdoc::broken_intra_doc_links,
7    rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11    deprecated_in_future,
12    missing_copy_implementations,
13    missing_debug_implementations,
14    missing_docs,
15    unreachable_pub,
16    unused_import_braces,
17    unused_labels,
18    unused_lifetimes,
19    unused_qualifications,
20    unused_results
21)]
22
23pub mod reexports;
24
25use std::{
26    any::Any,
27    fmt,
28    ops::{Deref, DerefMut},
29    sync::{Arc, Mutex, MutexGuard, PoisonError, TryLockError},
30};
31
32use deadpool_runtime::{Runtime, SpawnBlockingError};
33
34/// Possible errors returned when [`SyncWrapper::interact()`] fails.
35#[derive(Debug)]
36pub enum InteractError {
37    /// Provided callback has panicked.
38    Panic(Box<dyn Any + Send + 'static>),
39
40    /// Callback was aborted. This variant needs to exist for technical
41    /// reasons but you should never actually be able to get this as a
42    /// return value when calling `SyncWrapper::interact`.
43    Aborted,
44}
45
46impl fmt::Display for InteractError {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        match self {
49            Self::Panic(_) => write!(f, "Panic"),
50            Self::Aborted => write!(f, "Aborted"),
51        }
52    }
53}
54
55impl std::error::Error for InteractError {}
56
57/// Wrapper for objects which only provides blocking functions that need to be
58/// called on a separate thread.
59///
60/// Access to the wrapped object is provided via the [`SyncWrapper::interact()`]
61/// method.
62#[must_use]
63pub struct SyncWrapper<T>
64where
65    T: Send + 'static,
66{
67    obj: Arc<Mutex<Option<T>>>,
68    runtime: Runtime,
69}
70
71// Implemented manually to avoid unnecessary trait bound on `E` type parameter.
72impl<T> fmt::Debug for SyncWrapper<T>
73where
74    T: fmt::Debug + Send + 'static,
75{
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        f.debug_struct("SyncWrapper")
78            .field("obj", &self.obj)
79            .field("runtime", &self.runtime)
80            .finish()
81    }
82}
83
84impl<T> SyncWrapper<T>
85where
86    T: Send + 'static,
87{
88    /// Creates a new wrapped object.
89    pub async fn new<F, E>(runtime: Runtime, f: F) -> Result<Self, E>
90    where
91        F: FnOnce() -> Result<T, E> + Send + 'static,
92        E: Send + 'static,
93    {
94        let result = match runtime.spawn_blocking(f).await {
95            // FIXME: Panicking when the creation panics is not nice.
96            // In order to handle this properly the Manager::create
97            // methods needs to support a custom error enum which
98            // supports a Panic variant.
99            Err(SpawnBlockingError::Panic(e)) => panic!("{:?}", e),
100            Ok(obj) => obj,
101        };
102        result.map(|obj| Self {
103            obj: Arc::new(Mutex::new(Some(obj))),
104            runtime,
105        })
106    }
107
108    /// Interacts with the underlying object.
109    ///
110    /// Expects a closure that takes the object as its parameter.
111    /// The closure is executed in a separate thread so that the async runtime
112    /// is not blocked.
113    pub async fn interact<F, R>(&self, f: F) -> Result<R, InteractError>
114    where
115        F: FnOnce(&mut T) -> R + Send + 'static,
116        R: Send + 'static,
117    {
118        let arc = self.obj.clone();
119        #[cfg(feature = "tracing")]
120        let span = tracing::Span::current();
121        self.runtime
122            .spawn_blocking(move || {
123                let mut guard = arc.lock().unwrap();
124                let conn: &mut T = guard.as_mut().ok_or(InteractError::Aborted)?;
125                #[cfg(feature = "tracing")]
126                let _span = span.enter();
127                Ok(f(conn))
128            })
129            .await
130            .map_err(|SpawnBlockingError::Panic(p)| InteractError::Panic(p))?
131    }
132
133    /// Indicates whether the underlying [`Mutex`] has been poisoned.
134    ///
135    /// This happens when a panic occurs while interacting with the object.
136    pub fn is_mutex_poisoned(&self) -> bool {
137        self.obj.is_poisoned()
138    }
139
140    /// Lock the underlying mutex and return a guard for the inner
141    /// object.
142    pub fn lock(&self) -> Result<SyncGuard<'_, T>, PoisonError<MutexGuard<'_, Option<T>>>> {
143        self.obj.lock().map(SyncGuard)
144    }
145
146    /// Try to lock the underlying mutex and return a guard for the
147    /// inner object.
148    pub fn try_lock(&self) -> Result<SyncGuard<'_, T>, TryLockError<MutexGuard<'_, Option<T>>>> {
149        self.obj.try_lock().map(SyncGuard)
150    }
151}
152
153impl<T> Drop for SyncWrapper<T>
154where
155    T: Send + 'static,
156{
157    fn drop(&mut self) {
158        let arc = self.obj.clone();
159        // Drop the `rusqlite::Connection` inside a `spawn_blocking`
160        // as the `drop` function of it can block.
161        self.runtime
162            .spawn_blocking_background(move || match arc.lock() {
163                Ok(mut guard) => drop(guard.take()),
164                Err(e) => drop(e.into_inner().take()),
165            })
166            .unwrap();
167    }
168}
169
170/// This guard is returned when calling `SyncWrapper::lock` or
171/// `SyncWrapper::try_lock`. This is basicly just a wrapper around
172/// a `MutexGuard` but hides some implementation details.
173///
174/// **Important:** Any blocking operation using this object
175/// should be executed on a separate thread (e.g. via `spawn_blocking`).
176#[derive(Debug)]
177pub struct SyncGuard<'a, T: Send>(MutexGuard<'a, Option<T>>);
178
179impl<'a, T: Send> Deref for SyncGuard<'a, T> {
180    type Target = T;
181    fn deref(&self) -> &Self::Target {
182        self.0.as_ref().unwrap()
183    }
184}
185
186impl<'a, T: Send> DerefMut for SyncGuard<'a, T> {
187    fn deref_mut(&mut self) -> &mut Self::Target {
188        self.0.as_mut().unwrap()
189    }
190}
191
192impl<'a, T: Send> AsRef<T> for SyncGuard<'a, T> {
193    fn as_ref(&self) -> &T {
194        self.0.as_ref().unwrap()
195    }
196}
197
198impl<'a, T: Send> AsMut<T> for SyncGuard<'a, T> {
199    fn as_mut(&mut self) -> &mut T {
200        self.0.as_mut().unwrap()
201    }
202}