Skip to main content

unreql_deadpool/
lib.rs

1//! # Deadpool for UnReQL
2//!
3//! This crate implements a [`deadpool`](https://crates.io/crates/deadpool)
4//! manager for [`unreql`](https://crates.io/crates/unreql).
5//!
6//! ## Example
7//!
8//! ```rust
9//! use unreql::{r, cmd::connect};
10//! use unreql_deadpool::{IntoPoolWrapper, SessionManager};
11//! use deadpool::managed::Pool;
12//!
13//! # async fn example() -> unreql::Result<()> {
14//! let cfg = connect::Options::default();
15//! let manager = SessionManager::new(cfg);
16//! let pool = Pool::builder(manager).max_size(20).build().unwrap().wrapper();
17//! # #[derive(serde::Deserialize)] struct User;
18//! let user: User = r.table("users").get("id").exec(&pool).await?;
19//! # Ok(()) }
20//! ```
21
22use std::ops::Deref;
23
24use async_trait::async_trait;
25use deadpool::managed::{self, Pool, PoolError};
26
27use unreql::{
28    cmd::{connect, options::RunOptions, run},
29    r, Connection, Error, Session,
30};
31
32#[derive(Debug)]
33pub struct SessionManager {
34    options: connect::Options,
35}
36
37impl SessionManager {
38    pub fn new(options: connect::Options) -> Self {
39        Self { options }
40    }
41
42    /// Get a new session outside the pool.
43    /// Use the new session to create a connection for changes
44    pub async fn new_session(&self) -> Result<Session, Error> {
45        r.connect(self.options.clone()).await
46    }
47}
48
49impl managed::Manager for SessionManager {
50    type Type = Session;
51    type Error = Error;
52
53    async fn create(&self) -> Result<Self::Type, Self::Error> {
54        self.new_session().await
55    }
56
57    async fn recycle(
58        &self,
59        conn: &mut Self::Type,
60        _: &managed::Metrics,
61    ) -> managed::RecycleResult<Error> {
62        let _: i64 = r.expr(200).exec(conn).await?;
63        Ok(())
64    }
65}
66
67#[derive(Debug, Clone)]
68pub struct PoolWrapper(Pool<SessionManager>);
69
70impl Deref for PoolWrapper {
71    type Target = Pool<SessionManager>;
72
73    fn deref(&self) -> &Self::Target {
74        &self.0
75    }
76}
77
78#[async_trait]
79impl run::Arg for &PoolWrapper {
80    async fn into_run_opts(self, for_changes: bool) -> Result<(Connection, RunOptions), Error> {
81        if for_changes {
82            // for `changes` create a separate new connection to DB
83            let sess = self.manager().new_session().await?;
84            sess.into_run_opts(for_changes).await
85        } else {
86            // otherwise the available connection is used
87            let sess = match self.get().await {
88                Ok(v) => v,
89                Err(err) => {
90                    return match err {
91                        PoolError::Backend(err) => Err(err),
92                        _ => Err(Error::Driver(unreql::Driver::Other(err.to_string()))),
93                    }
94                }
95            };
96            sess.into_run_opts(for_changes).await
97        }
98    }
99}
100
101#[async_trait]
102impl run::Arg for PoolWrapper {
103    async fn into_run_opts(self, for_changes: bool) -> Result<(Connection, RunOptions), Error> {
104        if for_changes {
105            // for `changes` create a separate new connection to DB
106            let sess = self.manager().new_session().await?;
107            sess.into_run_opts(for_changes).await
108        } else {
109            // otherwise the available connection is used
110            let sess = match self.get().await {
111                Ok(v) => v,
112                Err(err) => {
113                    return match err {
114                        PoolError::Backend(err) => Err(err),
115                        _ => Err(Error::Driver(unreql::Driver::Other(err.to_string()))),
116                    }
117                }
118            };
119            sess.into_run_opts(for_changes).await
120        }
121    }
122}
123
124pub trait IntoPoolWrapper {
125    fn wrapper(self) -> PoolWrapper;
126}
127
128impl IntoPoolWrapper for Pool<SessionManager> {
129    fn wrapper(self) -> PoolWrapper {
130        self.into()
131    }
132}
133
134impl From<Pool<SessionManager>> for PoolWrapper {
135    fn from(pool: Pool<SessionManager>) -> Self {
136        Self(pool)
137    }
138}