1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4 nonstandard_style,
5 rust_2018_idioms,
6 rustdoc::broken_intra_doc_links,
7 rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11 deprecated_in_future,
12 missing_copy_implementations,
13 missing_debug_implementations,
14 missing_docs,
15 unreachable_pub,
16 unused_import_braces,
17 unused_labels,
18 unused_lifetimes,
19 unused_qualifications,
20 unused_results
21)]
22
23pub mod reexports;
24
25use std::{
26 any::Any,
27 fmt,
28 ops::{Deref, DerefMut},
29 sync::{Arc, Mutex, MutexGuard, PoisonError, TryLockError},
30};
31
32use deadpool_runtime::{Runtime, SpawnBlockingError, spawn_blocking, spawn_blocking_background};
33
/// Errors that [`SyncWrapper::interact()`] can return.
#[derive(Debug)]
pub enum InteractError {
    /// The blocking task panicked. The boxed value is the panic payload as
    /// it was reported by `SpawnBlockingError::Panic`.
    Panic(Box<dyn Any + Send + 'static>),

    /// Either the blocking task was reported as cancelled
    /// (`SpawnBlockingError::Cancelled`), or the wrapped object was no
    /// longer present when the callback was about to run.
    Cancelled,
}

impl fmt::Display for InteractError {
    /// Writes the bare variant name. The panic payload is opaque
    /// (`dyn Any`) and is therefore not rendered.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Panic(_) => "Panic",
            Self::Cancelled => "Cancelled",
        };
        f.write_str(label)
    }
}

impl std::error::Error for InteractError {}
56
57impl From<SpawnBlockingError> for InteractError {
58 fn from(value: SpawnBlockingError) -> Self {
59 match value {
60 SpawnBlockingError::Panic(p) => Self::Panic(p),
61 SpawnBlockingError::Cancelled => Self::Cancelled,
62 }
63 }
64}
65
66#[must_use]
72pub struct SyncWrapper<T>
73where
74 T: Send + 'static,
75{
76 obj: Arc<Mutex<Option<T>>>,
77 runtime: Runtime,
78}
79
80impl<T> fmt::Debug for SyncWrapper<T>
82where
83 T: fmt::Debug + Send + 'static,
84{
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("SyncWrapper")
87 .field("obj", &self.obj)
88 .field("runtime", &self.runtime)
89 .finish()
90 }
91}
92
93impl<T> SyncWrapper<T>
94where
95 T: Send + 'static,
96{
97 pub async fn new<F, E>(runtime: Runtime, f: F) -> Result<Self, E>
99 where
100 F: FnOnce() -> Result<T, E> + Send + 'static,
101 E: Send + 'static,
102 {
103 let result = match spawn_blocking(runtime, f).await {
104 Err(SpawnBlockingError::Panic(e)) => panic!("{e:?}"),
109 Err(SpawnBlockingError::Cancelled) => panic!("Task has been cancelled"),
110 Ok(obj) => obj,
111 };
112 result.map(|obj| Self {
113 obj: Arc::new(Mutex::new(Some(obj))),
114 runtime,
115 })
116 }
117
118 pub async fn interact<F, R>(&self, f: F) -> Result<R, InteractError>
124 where
125 F: FnOnce(&mut T) -> R + Send + 'static,
126 R: Send + 'static,
127 {
128 let arc = self.obj.clone();
129 #[cfg(feature = "tracing")]
130 let span = tracing::Span::current();
131 spawn_blocking(self.runtime, move || {
132 let mut guard = arc.lock().unwrap();
133 let conn: &mut T = guard.as_mut().ok_or(InteractError::Cancelled)?;
134 #[cfg(feature = "tracing")]
135 let _span = span.enter();
136 Ok(f(conn))
137 })
138 .await
139 .map_err(InteractError::from)?
140 }
141
142 pub fn is_mutex_poisoned(&self) -> bool {
146 self.obj.is_poisoned()
147 }
148
149 pub fn lock(&self) -> Result<SyncGuard<'_, T>, PoisonError<MutexGuard<'_, Option<T>>>> {
152 self.obj.lock().map(SyncGuard)
153 }
154
155 pub fn try_lock(&self) -> Result<SyncGuard<'_, T>, TryLockError<MutexGuard<'_, Option<T>>>> {
158 self.obj.try_lock().map(SyncGuard)
159 }
160}
161
162impl<T> Drop for SyncWrapper<T>
163where
164 T: Send + 'static,
165{
166 fn drop(&mut self) {
167 let arc = self.obj.clone();
168 spawn_blocking_background(self.runtime, move || match arc.lock() {
171 Ok(mut guard) => drop(guard.take()),
172 Err(e) => drop(e.into_inner().take()),
173 })
174 .unwrap();
175 }
176}
177
/// Guard returned by [`SyncWrapper::lock()`] and [`SyncWrapper::try_lock()`].
///
/// It wraps the `MutexGuard` over the `Option<T>` slot and exposes the `T`
/// directly through [`Deref`], [`DerefMut`], [`AsRef`] and [`AsMut`].
///
/// # Panics
///
/// Every accessor panics if the slot holds `None`.
#[derive(Debug)]
pub struct SyncGuard<'a, T: Send>(MutexGuard<'a, Option<T>>);

impl<T: Send> Deref for SyncGuard<'_, T> {
    type Target = T;

    /// Borrows the wrapped object; panics if the slot is empty.
    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl<T: Send> DerefMut for SyncGuard<'_, T> {
    /// Mutably borrows the wrapped object; panics if the slot is empty.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.0.as_mut().unwrap()
    }
}

impl<T: Send> AsRef<T> for SyncGuard<'_, T> {
    /// Same access as [`Deref::deref`], including the panic on an empty slot.
    fn as_ref(&self) -> &T {
        &**self
    }
}

impl<T: Send> AsMut<T> for SyncGuard<'_, T> {
    /// Same access as [`DerefMut::deref_mut`], including the panic on an
    /// empty slot.
    fn as_mut(&mut self) -> &mut T {
        &mut **self
    }
}