1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4 nonstandard_style,
5 rust_2018_idioms,
6 rustdoc::broken_intra_doc_links,
7 rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11 deprecated_in_future,
12 missing_copy_implementations,
13 missing_debug_implementations,
14 missing_docs,
15 unreachable_pub,
16 unused_import_braces,
17 unused_labels,
18 unused_lifetimes,
19 unused_qualifications,
20 unused_results
21)]
22
23pub mod reexports;
24
25use std::{
26 any::Any,
27 fmt,
28 ops::{Deref, DerefMut},
29 sync::{Arc, Mutex, MutexGuard, PoisonError, TryLockError},
30};
31
32use deadpool_runtime::{Runtime, SpawnBlockingError};
33
/// Errors that [`SyncWrapper::interact()`] can return.
#[derive(Debug)]
pub enum InteractError {
    /// The runtime reported that the blocking task panicked. The boxed value
    /// is the panic payload handed back by the runtime.
    Panic(Box<dyn Any + Send + 'static>),

    /// The wrapped object was no longer present when the interaction ran,
    /// so the callback was never invoked.
    Aborted,
}

impl fmt::Display for InteractError {
    /// Writes the bare variant name (`Panic` or `Aborted`); the panic payload
    /// itself is not rendered.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fixed text only, so write it directly instead of going through the
        // formatting machinery.
        f.write_str(match self {
            Self::Panic(_) => "Panic",
            Self::Aborted => "Aborted",
        })
    }
}

// No `source()` override: neither variant wraps another `Error`.
impl std::error::Error for InteractError {}
56
57#[must_use]
63pub struct SyncWrapper<T>
64where
65 T: Send + 'static,
66{
67 obj: Arc<Mutex<Option<T>>>,
68 runtime: Runtime,
69}
70
71impl<T> fmt::Debug for SyncWrapper<T>
73where
74 T: fmt::Debug + Send + 'static,
75{
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 f.debug_struct("SyncWrapper")
78 .field("obj", &self.obj)
79 .field("runtime", &self.runtime)
80 .finish()
81 }
82}
83
84impl<T> SyncWrapper<T>
85where
86 T: Send + 'static,
87{
88 pub async fn new<F, E>(runtime: Runtime, f: F) -> Result<Self, E>
90 where
91 F: FnOnce() -> Result<T, E> + Send + 'static,
92 E: Send + 'static,
93 {
94 let result = match runtime.spawn_blocking(f).await {
95 Err(SpawnBlockingError::Panic(e)) => panic!("{:?}", e),
100 Ok(obj) => obj,
101 };
102 result.map(|obj| Self {
103 obj: Arc::new(Mutex::new(Some(obj))),
104 runtime,
105 })
106 }
107
108 pub async fn interact<F, R>(&self, f: F) -> Result<R, InteractError>
114 where
115 F: FnOnce(&mut T) -> R + Send + 'static,
116 R: Send + 'static,
117 {
118 let arc = self.obj.clone();
119 #[cfg(feature = "tracing")]
120 let span = tracing::Span::current();
121 self.runtime
122 .spawn_blocking(move || {
123 let mut guard = arc.lock().unwrap();
124 let conn: &mut T = guard.as_mut().ok_or(InteractError::Aborted)?;
125 #[cfg(feature = "tracing")]
126 let _span = span.enter();
127 Ok(f(conn))
128 })
129 .await
130 .map_err(|SpawnBlockingError::Panic(p)| InteractError::Panic(p))?
131 }
132
133 pub fn is_mutex_poisoned(&self) -> bool {
137 self.obj.is_poisoned()
138 }
139
140 pub fn lock(&self) -> Result<SyncGuard<'_, T>, PoisonError<MutexGuard<'_, Option<T>>>> {
143 self.obj.lock().map(SyncGuard)
144 }
145
146 pub fn try_lock(&self) -> Result<SyncGuard<'_, T>, TryLockError<MutexGuard<'_, Option<T>>>> {
149 self.obj.try_lock().map(SyncGuard)
150 }
151}
152
153impl<T> Drop for SyncWrapper<T>
154where
155 T: Send + 'static,
156{
157 fn drop(&mut self) {
158 let arc = self.obj.clone();
159 self.runtime
162 .spawn_blocking_background(move || match arc.lock() {
163 Ok(mut guard) => drop(guard.take()),
164 Err(e) => drop(e.into_inner().take()),
165 })
166 .unwrap();
167 }
168}
169
/// Guard returned by [`SyncWrapper::lock()`] and [`SyncWrapper::try_lock()`].
///
/// It holds the lock of the wrapper's internal mutex and dereferences
/// straight to the wrapped object. The lock is released when the guard is
/// dropped.
///
/// # Panics
///
/// Every accessor (`Deref`, `DerefMut`, `AsRef`, `AsMut`) panics if the
/// guarded slot is empty (`None`).
#[derive(Debug)]
pub struct SyncGuard<'a, T: Send>(MutexGuard<'a, Option<T>>);

impl<T: Send> Deref for SyncGuard<'_, T> {
    type Target = T;

    /// Borrows the wrapped object; panics if the slot is empty.
    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl<T: Send> DerefMut for SyncGuard<'_, T> {
    /// Mutably borrows the wrapped object; panics if the slot is empty.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.0.as_mut().unwrap()
    }
}

impl<T: Send> AsRef<T> for SyncGuard<'_, T> {
    /// Borrows the wrapped object; panics if the slot is empty.
    fn as_ref(&self) -> &T {
        self.0.as_ref().unwrap()
    }
}

impl<T: Send> AsMut<T> for SyncGuard<'_, T> {
    /// Mutably borrows the wrapped object; panics if the slot is empty.
    fn as_mut(&mut self) -> &mut T {
        self.0.as_mut().unwrap()
    }
}