1use std::ops::Deref;
23
24use async_trait::async_trait;
25use deadpool::managed::{self, Pool, PoolError};
26
27use unreql::{
28 cmd::{connect, options::RunOptions, run},
29 r, Connection, Error, Session,
30};
31
32#[derive(Debug)]
33pub struct SessionManager {
34 options: connect::Options,
35}
36
37impl SessionManager {
38 pub fn new(options: connect::Options) -> Self {
39 Self { options }
40 }
41
42 pub async fn new_session(&self) -> Result<Session, Error> {
45 r.connect(self.options.clone()).await
46 }
47}
48
49impl managed::Manager for SessionManager {
50 type Type = Session;
51 type Error = Error;
52
53 async fn create(&self) -> Result<Self::Type, Self::Error> {
54 self.new_session().await
55 }
56
57 async fn recycle(
58 &self,
59 conn: &mut Self::Type,
60 _: &managed::Metrics,
61 ) -> managed::RecycleResult<Error> {
62 let _: i64 = r.expr(200).exec(conn).await?;
63 Ok(())
64 }
65}
66
67#[derive(Debug, Clone)]
68pub struct PoolWrapper(Pool<SessionManager>);
69
70impl Deref for PoolWrapper {
71 type Target = Pool<SessionManager>;
72
73 fn deref(&self) -> &Self::Target {
74 &self.0
75 }
76}
77
78#[async_trait]
79impl run::Arg for &PoolWrapper {
80 async fn into_run_opts(self, for_changes: bool) -> Result<(Connection, RunOptions), Error> {
81 if for_changes {
82 let sess = self.manager().new_session().await?;
84 sess.into_run_opts(for_changes).await
85 } else {
86 let sess = match self.get().await {
88 Ok(v) => v,
89 Err(err) => {
90 return match err {
91 PoolError::Backend(err) => Err(err),
92 _ => Err(Error::Driver(unreql::Driver::Other(err.to_string()))),
93 }
94 }
95 };
96 sess.into_run_opts(for_changes).await
97 }
98 }
99}
100
101#[async_trait]
102impl run::Arg for PoolWrapper {
103 async fn into_run_opts(self, for_changes: bool) -> Result<(Connection, RunOptions), Error> {
104 if for_changes {
105 let sess = self.manager().new_session().await?;
107 sess.into_run_opts(for_changes).await
108 } else {
109 let sess = match self.get().await {
111 Ok(v) => v,
112 Err(err) => {
113 return match err {
114 PoolError::Backend(err) => Err(err),
115 _ => Err(Error::Driver(unreql::Driver::Other(err.to_string()))),
116 }
117 }
118 };
119 sess.into_run_opts(for_changes).await
120 }
121 }
122}
123
124pub trait IntoPoolWrapper {
125 fn wrapper(self) -> PoolWrapper;
126}
127
128impl IntoPoolWrapper for Pool<SessionManager> {
129 fn wrapper(self) -> PoolWrapper {
130 self.into()
131 }
132}
133
134impl From<Pool<SessionManager>> for PoolWrapper {
135 fn from(pool: Pool<SessionManager>) -> Self {
136 Self(pool)
137 }
138}