async_bb8_diesel/
connection_manager.rs1use crate::{Connection, ConnectionError};
4use async_trait::async_trait;
5use diesel::r2d2::{self, ManageConnection, R2D2Connection};
6use std::sync::{Arc, Mutex};
7
8#[derive(Clone)]
38pub struct ConnectionManager<T> {
39 inner: Arc<Mutex<r2d2::ConnectionManager<T>>>,
40}
41
42impl<T: Send + 'static> ConnectionManager<T> {
43 pub fn new<S: Into<String>>(database_url: S) -> Self {
44 Self {
45 inner: Arc::new(Mutex::new(r2d2::ConnectionManager::new(database_url))),
46 }
47 }
48
49 async fn run_blocking<R, F>(&self, f: F) -> R
50 where
51 R: Send + 'static,
52 F: Send + 'static + FnOnce(&r2d2::ConnectionManager<T>) -> R,
53 {
54 let cloned = self.inner.clone();
55 tokio::task::spawn_blocking(move || f(&*cloned.lock().unwrap()))
56 .await
57 .unwrap()
59 }
60}
61
62#[async_trait]
63impl<T> bb8::ManageConnection for ConnectionManager<T>
64where
65 T: R2D2Connection + Send + 'static,
66{
67 type Connection = Connection<T>;
68 type Error = ConnectionError;
69
70 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
71 self.run_blocking(|m| m.connect())
72 .await
73 .map(Connection::new)
74 .map_err(ConnectionError::Connection)
75 }
76
77 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
78 let c = Connection(conn.0.clone());
79 self.run_blocking(move |m| {
80 m.is_valid(&mut *c.inner())?;
81 Ok(())
82 })
83 .await
84 }
85
86 fn has_broken(&self, _: &mut Self::Connection) -> bool {
87 false
91 }
92}