Skip to main content

deadpool_sync/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4    nonstandard_style,
5    rust_2018_idioms,
6    rustdoc::broken_intra_doc_links,
7    rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11    deprecated_in_future,
12    missing_copy_implementations,
13    missing_debug_implementations,
14    missing_docs,
15    unreachable_pub,
16    unused_import_braces,
17    unused_labels,
18    unused_lifetimes,
19    unused_qualifications,
20    unused_results
21)]
22
23pub mod reexports;
24
25use std::{
26    any::Any,
27    fmt,
28    ops::{Deref, DerefMut},
29    sync::{Arc, Mutex, MutexGuard, PoisonError, TryLockError},
30};
31
32use deadpool_runtime::{Runtime, SpawnBlockingError};
33
/// Errors that [`SyncWrapper::interact()`] can report.
#[derive(Debug)]
pub enum InteractError {
    /// The callback handed to `interact` panicked. The boxed value is the
    /// panic payload.
    Panic(Box<dyn Any + Send>),

    /// The callback did not run to completion because it was aborted.
    ///
    /// This variant has to exist for technical reasons; a call to
    /// `SyncWrapper::interact` is not expected to ever return it.
    Aborted,
}
45
46impl fmt::Display for InteractError {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        match self {
49            Self::Panic(_) => write!(f, "Panic"),
50            Self::Aborted => write!(f, "Aborted"),
51        }
52    }
53}
54
55impl std::error::Error for InteractError {}
56
57impl From<SpawnBlockingError> for InteractError {
58    fn from(value: SpawnBlockingError) -> Self {
59        match value {
60            SpawnBlockingError::Panic(p) => Self::Panic(p),
61            SpawnBlockingError::Cancelled => Self::Aborted,
62        }
63    }
64}
65
66/// Wrapper for objects which only provides blocking functions that need to be
67/// called on a separate thread.
68///
69/// Access to the wrapped object is provided via the [`SyncWrapper::interact()`]
70/// method.
71#[must_use]
72pub struct SyncWrapper<T>
73where
74    T: Send + 'static,
75{
76    obj: Arc<Mutex<Option<T>>>,
77    runtime: Runtime,
78}
79
80// Implemented manually to avoid unnecessary trait bound on `E` type parameter.
81impl<T> fmt::Debug for SyncWrapper<T>
82where
83    T: fmt::Debug + Send + 'static,
84{
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("SyncWrapper")
87            .field("obj", &self.obj)
88            .field("runtime", &self.runtime)
89            .finish()
90    }
91}
92
93impl<T> SyncWrapper<T>
94where
95    T: Send + 'static,
96{
97    /// Creates a new wrapped object.
98    pub async fn new<F, E>(runtime: Runtime, f: F) -> Result<Self, E>
99    where
100        F: FnOnce() -> Result<T, E> + Send + 'static,
101        E: Send + 'static,
102    {
103        let result = match runtime.spawn_blocking(f).await {
104            // FIXME: Panicking when the creation panics is not nice.
105            // In order to handle this properly the Manager::create
106            // methods needs to support a custom error enum which
107            // supports a Panic variant.
108            Err(SpawnBlockingError::Panic(e)) => panic!("{e:?}"),
109            Err(SpawnBlockingError::Cancelled) => panic!("Task has been cancelled"),
110            Ok(obj) => obj,
111        };
112        result.map(|obj| Self {
113            obj: Arc::new(Mutex::new(Some(obj))),
114            runtime,
115        })
116    }
117
118    /// Interacts with the underlying object.
119    ///
120    /// Expects a closure that takes the object as its parameter.
121    /// The closure is executed in a separate thread so that the async runtime
122    /// is not blocked.
123    pub async fn interact<F, R>(&self, f: F) -> Result<R, InteractError>
124    where
125        F: FnOnce(&mut T) -> R + Send + 'static,
126        R: Send + 'static,
127    {
128        let arc = self.obj.clone();
129        #[cfg(feature = "tracing")]
130        let span = tracing::Span::current();
131        self.runtime
132            .spawn_blocking(move || {
133                let mut guard = arc.lock().unwrap();
134                let conn: &mut T = guard.as_mut().ok_or(InteractError::Aborted)?;
135                #[cfg(feature = "tracing")]
136                let _span = span.enter();
137                Ok(f(conn))
138            })
139            .await
140            .map_err(InteractError::from)?
141    }
142
143    /// Indicates whether the underlying [`Mutex`] has been poisoned.
144    ///
145    /// This happens when a panic occurs while interacting with the object.
146    pub fn is_mutex_poisoned(&self) -> bool {
147        self.obj.is_poisoned()
148    }
149
150    /// Lock the underlying mutex and return a guard for the inner
151    /// object.
152    pub fn lock(&self) -> Result<SyncGuard<'_, T>, PoisonError<MutexGuard<'_, Option<T>>>> {
153        self.obj.lock().map(SyncGuard)
154    }
155
156    /// Try to lock the underlying mutex and return a guard for the
157    /// inner object.
158    pub fn try_lock(&self) -> Result<SyncGuard<'_, T>, TryLockError<MutexGuard<'_, Option<T>>>> {
159        self.obj.try_lock().map(SyncGuard)
160    }
161}
162
163impl<T> Drop for SyncWrapper<T>
164where
165    T: Send + 'static,
166{
167    fn drop(&mut self) {
168        let arc = self.obj.clone();
169        // Drop the `rusqlite::Connection` inside a `spawn_blocking`
170        // as the `drop` function of it can block.
171        self.runtime
172            .spawn_blocking_background(move || match arc.lock() {
173                Ok(mut guard) => drop(guard.take()),
174                Err(e) => drop(e.into_inner().take()),
175            })
176            .unwrap();
177    }
178}
179
/// Guard handed out by [`SyncWrapper::lock`] and [`SyncWrapper::try_lock`].
///
/// It is a thin wrapper around a [`MutexGuard`] that hides the fact that the
/// object is stored inside an `Option`.
///
/// **Important:** Any blocking operation using this object
/// should be executed on a separate thread (e.g. via `spawn_blocking`).
#[derive(Debug)]
pub struct SyncGuard<'a, T>(MutexGuard<'a, Option<T>>)
where
    T: Send;
188
189impl<T: Send> Deref for SyncGuard<'_, T> {
190    type Target = T;
191    fn deref(&self) -> &Self::Target {
192        self.0.as_ref().unwrap()
193    }
194}
195
196impl<T: Send> DerefMut for SyncGuard<'_, T> {
197    fn deref_mut(&mut self) -> &mut Self::Target {
198        self.0.as_mut().unwrap()
199    }
200}
201
202impl<T: Send> AsRef<T> for SyncGuard<'_, T> {
203    fn as_ref(&self) -> &T {
204        self.0.as_ref().unwrap()
205    }
206}
207
208impl<T: Send> AsMut<T> for SyncGuard<'_, T> {
209    fn as_mut(&mut self) -> &mut T {
210        self.0.as_mut().unwrap()
211    }
212}