rbdc_pool_fast/
lib.rs

1#![allow(mismatched_lifetime_syntaxes)]
2
3use dark_std::sync::AtomicDuration;
4use fast_pool::plugin::{CheckMode, DurationManager};
5use futures_core::future::BoxFuture;
6use log::error;
7use rbdc::db::{Connection, ExecResult, Row};
8use rbdc::pool::ConnectionGuard;
9use rbdc::pool::ConnectionManager;
10use rbdc::pool::Pool;
11use rbdc::Error;
12use rbs::value::map::ValueMap;
13use rbs::Value;
14use std::sync::Arc;
15use std::time::Duration;
16use rbdc::db::{ConnectOptions, Driver};
17
18#[derive(Debug,Clone)]
19pub struct FastPool {
20    pub manager: Arc<ConnectionManager>,
21    pub inner: fast_pool::Pool<DurationManager<ConnManagerProxy>>,
22    pub timeout: Arc<AtomicDuration>,
23}
24
25impl FastPool{
26    pub fn new_url<D: Driver + 'static>(driver: D, url: &str) -> Result<Self, Error>
27    where
28        Self: Sized,
29    {
30        Self::new(ConnectionManager::new(driver,url)?)
31    }
32
33    pub fn new_option<D: Driver + 'static, Options: ConnectOptions>(driver: D, options: Options) -> Result<Self, Error>
34    where
35        Self: Sized,
36    {
37        Self::new(ConnectionManager::new_options(driver,options))
38    }
39}
40
41#[derive(Debug)]
42pub struct ConnManagerProxy {
43     inner: ConnectionManager,
44}
45
46impl ConnManagerProxy{
47    pub fn new(inner: ConnectionManager) -> Self {
48        Self {
49            inner,
50        }
51    }
52}
53
54#[derive(Debug)]
55pub struct ConnProxy {
56    conn: Option<fast_pool::ConnectionGuard<DurationManager<ConnManagerProxy>>>,
57}
58
59impl From<ConnectionManager> for ConnManagerProxy {
60    fn from(value: ConnectionManager) -> Self {
61        ConnManagerProxy {
62            inner: value
63        }
64    }
65}
66
67#[async_trait::async_trait]
68impl Pool for FastPool {
69    fn new(manager: ConnectionManager) -> Result<Self, Error>
70    where
71        Self: Sized,
72    {
73        Ok(Self {
74            manager: manager.clone().into(),
75            inner: fast_pool::Pool::new(DurationManager::new(ConnManagerProxy::new(manager),CheckMode::NoLimit)),
76            timeout: Arc::new(AtomicDuration::new(None)),
77        })
78    }
79
80    async fn get(&self) -> Result<Box<dyn Connection>, Error> {
81        let v = self
82            .inner
83            .get_timeout(self.timeout.get())
84            .await
85            .map_err(|e| Error::from(e.to_string()))?;
86        let proxy = ConnProxy {
87            conn: Some(v),
88        };
89        Ok(Box::new(proxy))
90    }
91
92    async fn get_timeout(&self, mut d: Duration) -> Result<Box<dyn Connection>, Error> {
93        if d.is_zero() {
94            let state = self.inner.state();
95            if state.in_use < state.max_open {
96                d = Duration::from_secs(10);
97            } else {
98                return Err(Error::from("Time out in the connection pool"));
99            }
100        }
101        let v = self
102            .inner
103            .get_timeout(Some(d))
104            .await
105            .map_err(|e| Error::from(e.to_string()))?;
106        let proxy = ConnProxy {
107            conn: Some(v),
108        };
109        Ok(Box::new(proxy))
110    }
111
112    async fn set_timeout(&self, timeout: Option<Duration>) {
113        self.timeout.store(timeout);
114    }
115
116    async fn set_conn_max_lifetime(&self, max_lifetime: Option<Duration>) {
117        let manager = self.inner.downcast_manager::<DurationManager<ConnManagerProxy>>();
118        if let Some(manager) = manager {
119            if let Some(max_lifetime) = max_lifetime {
120                manager.mode.set_mode(CheckMode::MaxLifetime(max_lifetime));
121            }else{
122                manager.mode.set_mode(CheckMode::NoLimit);
123            }
124        }else{
125           error!("FastPool method set_conn_max_lifetime need use DurationManager to init");
126        }
127    }
128
129    async fn set_max_idle_conns(&self, n: u64) {
130        self.inner.set_max_idle_conns(n);
131    }
132
133    async fn set_max_open_conns(&self, n: u64) {
134        self.inner.set_max_open(n);
135    }
136
137    fn driver_type(&self) -> &str {
138        self.manager.driver_type()
139    }
140
141    async fn state(&self) -> Value {
142        let mut m = ValueMap::with_capacity(10);
143        let state = self.inner.state();
144        m.insert("max_open".to_string().into(), state.max_open.into());
145        m.insert("connections".to_string().into(), state.connections.into());
146        m.insert("in_use".to_string().into(), state.in_use.into());
147        m.insert("idle".to_string().into(), state.idle.into());
148        m.insert("waits".to_string().into(), state.waits.into());
149        m.insert("connecting".to_string().into(), state.connecting.into());
150        m.insert("checking".to_string().into(), state.checking.into());
151        Value::Map(m)
152    }
153}
154
155impl fast_pool::Manager for ConnManagerProxy {
156    type Connection = ConnectionGuard;
157    type Error = Error;
158
159    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
160        self.inner.connect().await
161    }
162
163    async fn check(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
164        let r = self.inner.check(conn).await;
165        match r {
166            Ok(_) => Ok(()),
167            Err(e) => {
168                _ = conn.close().await;
169                Err(e)
170            }
171        }
172    }
173}
174
175impl Connection for ConnProxy {
176    fn get_rows(
177        &mut self,
178        sql: &str,
179        params: Vec<Value>,
180    ) -> BoxFuture<'_, Result<Vec<Box<dyn Row>>, Error>> {
181        if self.conn.is_none() {
182            return Box::pin(async { Err(Error::from("conn is drop")) });
183        }
184        self.conn.as_mut().unwrap().get_rows(sql, params)
185    }
186
187    fn get_values(
188        &mut self,
189        sql: &str,
190        params: Vec<Value>,
191    ) -> BoxFuture<'_, Result<Value, Error>> {
192        if self.conn.is_none() {
193            return Box::pin(async { Err(Error::from("conn is drop")) });
194        }
195        self.conn.as_mut().unwrap().get_values(sql, params)
196    }
197
198    fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
199        if self.conn.is_none() {
200            return Box::pin(async { Err(Error::from("conn is drop")) });
201        }
202        self.conn.as_mut().unwrap().exec(sql, params)
203    }
204
205    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
206        if self.conn.is_none() {
207            return Box::pin(async { Err(Error::from("conn is drop")) });
208        }
209        self.conn.as_mut().unwrap().ping()
210    }
211
212    fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
213        if self.conn.is_none() {
214            return Box::pin(async { Err(Error::from("conn is drop")) });
215        }
216        self.conn.as_mut().unwrap().close()
217    }
218
219    fn begin(&mut self) -> BoxFuture<'_, Result<(), Error>> {
220        if self.conn.is_none() {
221            return Box::pin(async { Err(Error::from("conn is drop")) });
222        }
223        self.conn.as_mut().unwrap().begin()
224    }
225    fn commit(&mut self) -> BoxFuture<'_, Result<(), Error>> {
226        if self.conn.is_none() {
227            return Box::pin(async { Err(Error::from("conn is drop")) });
228        }
229        self.conn.as_mut().unwrap().commit()
230    }
231    fn rollback(&mut self) -> BoxFuture<'_, Result<(), Error>> {
232        if self.conn.is_none() {
233            return Box::pin(async { Err(Error::from("conn is drop")) });
234        }
235        self.conn.as_mut().unwrap().rollback()
236    }
237}
238
239#[cfg(test)]
240mod test {
241    use crate::FastPool;
242    use futures_core::future::BoxFuture;
243    use rbdc::db::{ConnectOptions, Connection, Driver, ExecResult, Row};
244    use rbdc::pool::ConnectionManager;
245    use rbdc::pool::Pool;
246    use rbs::{Error, Value};
247
248    #[derive(Debug)]
249    pub struct Opt {}
250    impl ConnectOptions for Opt {
251        fn connect(&self) -> BoxFuture<'_, Result<Box<dyn Connection>, Error>> {
252            Box::pin(async { Ok(Box::new(Conn {}) as Box<dyn Connection>) })
253        }
254
255        fn set_uri(&mut self, _uri: &str) -> Result<(), Error> {
256            Ok(())
257        }
258    }
259
260    #[derive(Debug)]
261    pub struct Conn {}
262
263    impl Connection for Conn {
264        fn get_rows(
265            &mut self,
266            _sql: &str,
267            _params: Vec<Value>,
268        ) -> BoxFuture<'_, Result<Vec<Box<dyn Row>>, Error>> {
269            Box::pin(async { Ok(vec![]) })
270        }
271
272        fn exec(
273            &mut self,
274            _sql: &str,
275            _params: Vec<Value>,
276        ) -> BoxFuture<'_, Result<ExecResult, Error>> {
277            Box::pin(async { Ok(ExecResult::default()) })
278        }
279
280        fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
281            Box::pin(async { Ok(()) })
282        }
283
284        fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
285            Box::pin(async { Ok(()) })
286        }
287    }
288
289    #[derive(Debug)]
290    pub struct D {}
291    impl Driver for D {
292        fn name(&self) -> &str {
293            "d"
294        }
295
296        fn connect(&self, _url: &str) -> BoxFuture<'_, Result<Box<dyn Connection>, Error>> {
297            Box::pin(async { Ok(Box::new(Conn {}) as Box<dyn Connection>) })
298        }
299
300        fn connect_opt<'a>(
301            &'a self,
302            _opt: &'a dyn ConnectOptions,
303        ) -> BoxFuture<'a, Result<Box<dyn Connection>, Error>> {
304            Box::pin(async { Ok(Box::new(Conn {}) as Box<dyn Connection>) })
305        }
306
307        fn default_option(&self) -> Box<dyn ConnectOptions> {
308            Box::new(Opt {})
309        }
310    }
311
312    #[test]
313    fn test() {
314        let pool = Box::new(FastPool::new(ConnectionManager::new(D {}, "").unwrap()));
315        println!("ok={}", pool.is_ok());
316    }
317}