1#![allow(mismatched_lifetime_syntaxes)]
2
3use dark_std::sync::AtomicDuration;
4use fast_pool::plugin::{CheckMode, DurationManager};
5use futures_core::future::BoxFuture;
6use log::error;
7use rbdc::db::{Connection, ExecResult, Row};
8use rbdc::pool::ConnectionGuard;
9use rbdc::pool::ConnectionManager;
10use rbdc::pool::Pool;
11use rbdc::Error;
12use rbs::value::map::ValueMap;
13use rbs::Value;
14use std::sync::Arc;
15use std::time::Duration;
16use rbdc::db::{ConnectOptions, Driver};
17
18#[derive(Debug,Clone)]
19pub struct FastPool {
20 pub manager: Arc<ConnectionManager>,
21 pub inner: fast_pool::Pool<DurationManager<ConnManagerProxy>>,
22 pub timeout: Arc<AtomicDuration>,
23}
24
25impl FastPool{
26 pub fn new_url<D: Driver + 'static>(driver: D, url: &str) -> Result<Self, Error>
27 where
28 Self: Sized,
29 {
30 Self::new(ConnectionManager::new(driver,url)?)
31 }
32
33 pub fn new_option<D: Driver + 'static, Options: ConnectOptions>(driver: D, options: Options) -> Result<Self, Error>
34 where
35 Self: Sized,
36 {
37 Self::new(ConnectionManager::new_options(driver,options))
38 }
39}
40
41#[derive(Debug)]
42pub struct ConnManagerProxy {
43 inner: ConnectionManager,
44}
45
46impl ConnManagerProxy{
47 pub fn new(inner: ConnectionManager) -> Self {
48 Self {
49 inner,
50 }
51 }
52}
53
54#[derive(Debug)]
55pub struct ConnProxy {
56 conn: Option<fast_pool::ConnectionGuard<DurationManager<ConnManagerProxy>>>,
57}
58
59impl From<ConnectionManager> for ConnManagerProxy {
60 fn from(value: ConnectionManager) -> Self {
61 ConnManagerProxy {
62 inner: value
63 }
64 }
65}
66
67#[async_trait::async_trait]
68impl Pool for FastPool {
69 fn new(manager: ConnectionManager) -> Result<Self, Error>
70 where
71 Self: Sized,
72 {
73 Ok(Self {
74 manager: manager.clone().into(),
75 inner: fast_pool::Pool::new(DurationManager::new(ConnManagerProxy::new(manager),CheckMode::NoLimit)),
76 timeout: Arc::new(AtomicDuration::new(None)),
77 })
78 }
79
80 async fn get(&self) -> Result<Box<dyn Connection>, Error> {
81 let v = self
82 .inner
83 .get_timeout(self.timeout.get())
84 .await
85 .map_err(|e| Error::from(e.to_string()))?;
86 let proxy = ConnProxy {
87 conn: Some(v),
88 };
89 Ok(Box::new(proxy))
90 }
91
92 async fn get_timeout(&self, mut d: Duration) -> Result<Box<dyn Connection>, Error> {
93 if d.is_zero() {
94 let state = self.inner.state();
95 if state.in_use < state.max_open {
96 d = Duration::from_secs(10);
97 } else {
98 return Err(Error::from("Time out in the connection pool"));
99 }
100 }
101 let v = self
102 .inner
103 .get_timeout(Some(d))
104 .await
105 .map_err(|e| Error::from(e.to_string()))?;
106 let proxy = ConnProxy {
107 conn: Some(v),
108 };
109 Ok(Box::new(proxy))
110 }
111
112 async fn set_timeout(&self, timeout: Option<Duration>) {
113 self.timeout.store(timeout);
114 }
115
116 async fn set_conn_max_lifetime(&self, max_lifetime: Option<Duration>) {
117 let manager = self.inner.downcast_manager::<DurationManager<ConnManagerProxy>>();
118 if let Some(manager) = manager {
119 if let Some(max_lifetime) = max_lifetime {
120 manager.mode.set_mode(CheckMode::MaxLifetime(max_lifetime));
121 }else{
122 manager.mode.set_mode(CheckMode::NoLimit);
123 }
124 }else{
125 error!("FastPool method set_conn_max_lifetime need use DurationManager to init");
126 }
127 }
128
129 async fn set_max_idle_conns(&self, n: u64) {
130 self.inner.set_max_idle_conns(n);
131 }
132
133 async fn set_max_open_conns(&self, n: u64) {
134 self.inner.set_max_open(n);
135 }
136
137 fn driver_type(&self) -> &str {
138 self.manager.driver_type()
139 }
140
141 async fn state(&self) -> Value {
142 let mut m = ValueMap::with_capacity(10);
143 let state = self.inner.state();
144 m.insert("max_open".to_string().into(), state.max_open.into());
145 m.insert("connections".to_string().into(), state.connections.into());
146 m.insert("in_use".to_string().into(), state.in_use.into());
147 m.insert("idle".to_string().into(), state.idle.into());
148 m.insert("waits".to_string().into(), state.waits.into());
149 m.insert("connecting".to_string().into(), state.connecting.into());
150 m.insert("checking".to_string().into(), state.checking.into());
151 Value::Map(m)
152 }
153}
154
155impl fast_pool::Manager for ConnManagerProxy {
156 type Connection = ConnectionGuard;
157 type Error = Error;
158
159 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
160 self.inner.connect().await
161 }
162
163 async fn check(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
164 let r = self.inner.check(conn).await;
165 match r {
166 Ok(_) => Ok(()),
167 Err(e) => {
168 _ = conn.close().await;
169 Err(e)
170 }
171 }
172 }
173}
174
175impl Connection for ConnProxy {
176 fn get_rows(
177 &mut self,
178 sql: &str,
179 params: Vec<Value>,
180 ) -> BoxFuture<'_, Result<Vec<Box<dyn Row>>, Error>> {
181 if self.conn.is_none() {
182 return Box::pin(async { Err(Error::from("conn is drop")) });
183 }
184 self.conn.as_mut().unwrap().get_rows(sql, params)
185 }
186
187 fn get_values(
188 &mut self,
189 sql: &str,
190 params: Vec<Value>,
191 ) -> BoxFuture<'_, Result<Value, Error>> {
192 if self.conn.is_none() {
193 return Box::pin(async { Err(Error::from("conn is drop")) });
194 }
195 self.conn.as_mut().unwrap().get_values(sql, params)
196 }
197
198 fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
199 if self.conn.is_none() {
200 return Box::pin(async { Err(Error::from("conn is drop")) });
201 }
202 self.conn.as_mut().unwrap().exec(sql, params)
203 }
204
205 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
206 if self.conn.is_none() {
207 return Box::pin(async { Err(Error::from("conn is drop")) });
208 }
209 self.conn.as_mut().unwrap().ping()
210 }
211
212 fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
213 if self.conn.is_none() {
214 return Box::pin(async { Err(Error::from("conn is drop")) });
215 }
216 self.conn.as_mut().unwrap().close()
217 }
218
219 fn begin(&mut self) -> BoxFuture<'_, Result<(), Error>> {
220 if self.conn.is_none() {
221 return Box::pin(async { Err(Error::from("conn is drop")) });
222 }
223 self.conn.as_mut().unwrap().begin()
224 }
225 fn commit(&mut self) -> BoxFuture<'_, Result<(), Error>> {
226 if self.conn.is_none() {
227 return Box::pin(async { Err(Error::from("conn is drop")) });
228 }
229 self.conn.as_mut().unwrap().commit()
230 }
231 fn rollback(&mut self) -> BoxFuture<'_, Result<(), Error>> {
232 if self.conn.is_none() {
233 return Box::pin(async { Err(Error::from("conn is drop")) });
234 }
235 self.conn.as_mut().unwrap().rollback()
236 }
237}
238
#[cfg(test)]
mod test {
    use crate::FastPool;
    use futures_core::future::BoxFuture;
    use rbdc::db::{ConnectOptions, Connection, Driver, ExecResult, Row};
    use rbdc::pool::ConnectionManager;
    use rbdc::pool::Pool;
    use rbs::{Error, Value};

    /// Stub connect options whose `connect` always yields a `Conn`.
    #[derive(Debug)]
    pub struct Opt {}
    impl ConnectOptions for Opt {
        fn connect(&self) -> BoxFuture<'_, Result<Box<dyn Connection>, Error>> {
            Box::pin(async { Ok(Box::new(Conn {}) as Box<dyn Connection>) })
        }

        fn set_uri(&mut self, _uri: &str) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Stub connection: every operation succeeds with an empty or default result.
    #[derive(Debug)]
    pub struct Conn {}

    impl Connection for Conn {
        fn get_rows(
            &mut self,
            _sql: &str,
            _params: Vec<Value>,
        ) -> BoxFuture<'_, Result<Vec<Box<dyn Row>>, Error>> {
            Box::pin(async { Ok(vec![]) })
        }

        fn exec(
            &mut self,
            _sql: &str,
            _params: Vec<Value>,
        ) -> BoxFuture<'_, Result<ExecResult, Error>> {
            Box::pin(async { Ok(ExecResult::default()) })
        }

        fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
            Box::pin(async { Ok(()) })
        }

        fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
            Box::pin(async { Ok(()) })
        }
    }

    /// Stub driver named "d" that hands out `Conn` and `Opt` values.
    #[derive(Debug)]
    pub struct D {}
    impl Driver for D {
        fn name(&self) -> &str {
            "d"
        }

        fn connect(&self, _url: &str) -> BoxFuture<'_, Result<Box<dyn Connection>, Error>> {
            Box::pin(async { Ok(Box::new(Conn {}) as Box<dyn Connection>) })
        }

        fn connect_opt<'a>(
            &'a self,
            _opt: &'a dyn ConnectOptions,
        ) -> BoxFuture<'a, Result<Box<dyn Connection>, Error>> {
            Box::pin(async { Ok(Box::new(Conn {}) as Box<dyn Connection>) })
        }

        fn default_option(&self) -> Box<dyn ConnectOptions> {
            Box::new(Opt {})
        }
    }

    /// Constructing a pool from the stub driver must succeed.
    #[test]
    fn test() {
        let pool = FastPool::new(ConnectionManager::new(D {}, "").unwrap());
        // Assert rather than print, so a construction failure fails the test.
        assert!(pool.is_ok(), "FastPool::new should succeed with the stub driver");
    }
}