Skip to main content

deadpool_sync/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4    nonstandard_style,
5    rust_2018_idioms,
6    rustdoc::broken_intra_doc_links,
7    rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11    deprecated_in_future,
12    missing_copy_implementations,
13    missing_debug_implementations,
14    missing_docs,
15    unreachable_pub,
16    unused_import_braces,
17    unused_labels,
18    unused_lifetimes,
19    unused_qualifications,
20    unused_results
21)]
22
23pub mod reexports;
24
25use std::{
26    any::Any,
27    fmt,
28    ops::{Deref, DerefMut},
29    sync::{Arc, Mutex, MutexGuard, PoisonError, TryLockError},
30};
31
32use deadpool_runtime::{Runtime, SpawnBlockingError, spawn_blocking, spawn_blocking_background};
33
/// Error type produced when a [`SyncWrapper::interact()`] call fails.
#[derive(Debug)]
pub enum InteractError {
    /// The provided callback panicked. The boxed value is the payload that
    /// was reported for that panic.
    Panic(Box<dyn Any + Send>),

    /// The callback was cancelled.
    ///
    /// This variant needs to exist for technical reasons; callers of
    /// `SyncWrapper::interact` are not expected to ever receive it as a
    /// return value.
    Cancelled,
}
45
46impl fmt::Display for InteractError {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        match self {
49            Self::Panic(_) => write!(f, "Panic"),
50            Self::Cancelled => write!(f, "Cancelled"),
51        }
52    }
53}
54
55impl std::error::Error for InteractError {}
56
57impl From<SpawnBlockingError> for InteractError {
58    fn from(value: SpawnBlockingError) -> Self {
59        match value {
60            SpawnBlockingError::Panic(p) => Self::Panic(p),
61            SpawnBlockingError::Cancelled => Self::Cancelled,
62        }
63    }
64}
65
66/// Wrapper for objects which only provides blocking functions that need to be
67/// called on a separate thread.
68///
69/// Access to the wrapped object is provided via the [`SyncWrapper::interact()`]
70/// method.
71#[must_use]
72pub struct SyncWrapper<T>
73where
74    T: Send + 'static,
75{
76    obj: Arc<Mutex<Option<T>>>,
77    runtime: Runtime,
78}
79
80// Implemented manually to avoid unnecessary trait bound on `E` type parameter.
81impl<T> fmt::Debug for SyncWrapper<T>
82where
83    T: fmt::Debug + Send + 'static,
84{
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("SyncWrapper")
87            .field("obj", &self.obj)
88            .field("runtime", &self.runtime)
89            .finish()
90    }
91}
92
93impl<T> SyncWrapper<T>
94where
95    T: Send + 'static,
96{
97    /// Creates a new wrapped object.
98    pub async fn new<F, E>(runtime: Runtime, f: F) -> Result<Self, E>
99    where
100        F: FnOnce() -> Result<T, E> + Send + 'static,
101        E: Send + 'static,
102    {
103        let result = match spawn_blocking(runtime, f).await {
104            // FIXME: Panicking when the creation panics is not nice.
105            // In order to handle this properly the Manager::create
106            // methods needs to support a custom error enum which
107            // supports a Panic variant.
108            Err(SpawnBlockingError::Panic(e)) => panic!("{e:?}"),
109            Err(SpawnBlockingError::Cancelled) => panic!("Task has been cancelled"),
110            Ok(obj) => obj,
111        };
112        result.map(|obj| Self {
113            obj: Arc::new(Mutex::new(Some(obj))),
114            runtime,
115        })
116    }
117
118    /// Interacts with the underlying object.
119    ///
120    /// Expects a closure that takes the object as its parameter.
121    /// The closure is executed in a separate thread so that the async runtime
122    /// is not blocked.
123    pub async fn interact<F, R>(&self, f: F) -> Result<R, InteractError>
124    where
125        F: FnOnce(&mut T) -> R + Send + 'static,
126        R: Send + 'static,
127    {
128        let arc = self.obj.clone();
129        #[cfg(feature = "tracing")]
130        let span = tracing::Span::current();
131        spawn_blocking(self.runtime, move || {
132            let mut guard = arc.lock().unwrap();
133            let conn: &mut T = guard.as_mut().ok_or(InteractError::Cancelled)?;
134            #[cfg(feature = "tracing")]
135            let _span = span.enter();
136            Ok(f(conn))
137        })
138        .await
139        .map_err(InteractError::from)?
140    }
141
142    /// Indicates whether the underlying [`Mutex`] has been poisoned.
143    ///
144    /// This happens when a panic occurs while interacting with the object.
145    pub fn is_mutex_poisoned(&self) -> bool {
146        self.obj.is_poisoned()
147    }
148
149    /// Lock the underlying mutex and return a guard for the inner
150    /// object.
151    pub fn lock(&self) -> Result<SyncGuard<'_, T>, PoisonError<MutexGuard<'_, Option<T>>>> {
152        self.obj.lock().map(SyncGuard)
153    }
154
155    /// Try to lock the underlying mutex and return a guard for the
156    /// inner object.
157    pub fn try_lock(&self) -> Result<SyncGuard<'_, T>, TryLockError<MutexGuard<'_, Option<T>>>> {
158        self.obj.try_lock().map(SyncGuard)
159    }
160}
161
162impl<T> Drop for SyncWrapper<T>
163where
164    T: Send + 'static,
165{
166    fn drop(&mut self) {
167        let arc = self.obj.clone();
168        // Drop the `rusqlite::Connection` inside a `spawn_blocking`
169        // as the `drop` function of it can block.
170        spawn_blocking_background(self.runtime, move || match arc.lock() {
171            Ok(mut guard) => drop(guard.take()),
172            Err(e) => drop(e.into_inner().take()),
173        })
174        .unwrap();
175    }
176}
177
/// Guard returned by `SyncWrapper::lock` and `SyncWrapper::try_lock`.
///
/// It is essentially a thin wrapper around a `MutexGuard` that hides some
/// implementation details.
///
/// **Important:** Any blocking operation using this object
/// should be executed on a separate thread (e.g. via `spawn_blocking`).
#[derive(Debug)]
pub struct SyncGuard<'a, T>(MutexGuard<'a, Option<T>>)
where
    T: Send;
186
187impl<T: Send> Deref for SyncGuard<'_, T> {
188    type Target = T;
189    fn deref(&self) -> &Self::Target {
190        self.0.as_ref().unwrap()
191    }
192}
193
194impl<T: Send> DerefMut for SyncGuard<'_, T> {
195    fn deref_mut(&mut self) -> &mut Self::Target {
196        self.0.as_mut().unwrap()
197    }
198}
199
200impl<T: Send> AsRef<T> for SyncGuard<'_, T> {
201    fn as_ref(&self) -> &T {
202        self.0.as_ref().unwrap()
203    }
204}
205
206impl<T: Send> AsMut<T> for SyncGuard<'_, T> {
207    fn as_mut(&mut self) -> &mut T {
208        self.0.as_mut().unwrap()
209    }
210}