1use std::{
10 collections::VecDeque,
11 fmt,
12 ops::Deref,
13 sync::{
14 atomic::{AtomicUsize, Ordering},
15 Arc, Condvar, Mutex,
16 },
17 time::{Duration, Instant},
18};
19
20use crate::{
21 conn::query_result::{Binary, Text},
22 prelude::*,
23 Conn, DriverError, Error, LocalInfileHandler, Opts, Params, QueryResult, Result, Statement,
24 Transaction, TxOpts,
25};
26
27#[derive(Debug)]
28struct InnerPool {
29 opts: Opts,
30 pool: VecDeque<Conn>,
31}
32
33impl InnerPool {
34 fn new(min: usize, max: usize, opts: Opts) -> Result<InnerPool> {
35 if min > max || max == 0 {
36 return Err(Error::DriverError(DriverError::InvalidPoolConstraints));
37 }
38 let mut pool = InnerPool {
39 opts,
40 pool: VecDeque::with_capacity(max),
41 };
42 for _ in 0..min {
43 pool.new_conn()?;
44 }
45 Ok(pool)
46 }
47 fn new_conn(&mut self) -> Result<()> {
48 match Conn::new(self.opts.clone()) {
49 Ok(conn) => {
50 self.pool.push_back(conn);
51 Ok(())
52 }
53 Err(err) => Err(err),
54 }
55 }
56}
57
58struct ArcedPool {
59 inner: (Mutex<InnerPool>, Condvar),
60 min: usize,
61 max: usize,
62 count: AtomicUsize,
63}
64
65#[derive(Clone)]
101pub struct Pool {
102 arced_pool: Arc<ArcedPool>,
103 check_health: bool,
104 use_cache: bool,
105}
106
107impl Pool {
108 fn _get_conn<T: AsRef<[u8]>>(
114 &self,
115 stmt: Option<T>,
116 timeout_ms: Option<u32>,
117 call_ping: bool,
118 ) -> Result<PooledConn> {
119 let times = if let Some(timeout_ms) = timeout_ms {
120 Some((Instant::now(), Duration::from_millis(timeout_ms.into())))
121 } else {
122 None
123 };
124
125 let &(ref inner_pool, ref condvar) = &self.arced_pool.inner;
126
127 let conn = if self.use_cache {
128 if let Some(query) = stmt {
129 let mut id = None;
130 let mut pool = inner_pool.lock()?;
131 for (i, conn) in pool.pool.iter().rev().enumerate() {
132 if conn.has_stmt(query.as_ref()) {
133 id = Some(i);
134 break;
135 }
136 }
137 id.and_then(|id| pool.pool.swap_remove_back(id))
138 } else {
139 None
140 }
141 } else {
142 None
143 };
144
145 let mut conn = if let Some(conn) = conn {
146 conn
147 } else {
148 let mut pool = inner_pool.lock()?;
149 loop {
150 if let Some(conn) = pool.pool.pop_front() {
151 drop(pool);
152 break conn;
153 } else if self.arced_pool.count.load(Ordering::Relaxed) < self.arced_pool.max {
154 pool.new_conn()?;
155 self.arced_pool.count.fetch_add(1, Ordering::SeqCst);
156 } else {
157 pool = if let Some((start, timeout)) = times {
158 if start.elapsed() > timeout {
159 return Err(DriverError::Timeout.into());
160 }
161 condvar.wait_timeout(pool, timeout)?.0
162 } else {
163 condvar.wait(pool)?
164 }
165 }
166 }
167 };
168
169 if call_ping && self.check_health && !conn.ping() {
170 if let Err(err) = conn.reset() {
171 self.arced_pool.count.fetch_sub(1, Ordering::SeqCst);
172 return Err(err);
173 }
174 }
175
176 Ok(PooledConn {
177 pool: self.clone(),
178 conn: Some(conn),
179 })
180 }
181
182 pub fn new<T, E>(opts: T) -> Result<Pool>
184 where
185 Opts: TryFrom<T, Error = E>,
186 crate::Error: From<E>,
187 {
188 Pool::new_manual(10, 100, opts)
189 }
190
191 pub fn new_manual<T, E>(min: usize, max: usize, opts: T) -> Result<Pool>
193 where
194 Opts: TryFrom<T, Error = E>,
195 crate::Error: From<E>,
196 {
197 let pool = InnerPool::new(min, max, opts.try_into()?)?;
198 Ok(Pool {
199 arced_pool: Arc::new(ArcedPool {
200 inner: (Mutex::new(pool), Condvar::new()),
201 min,
202 max,
203 count: AtomicUsize::new(min),
204 }),
205 use_cache: true,
206 check_health: true,
207 })
208 }
209
210 #[doc(hidden)]
212 pub fn use_cache(&mut self, use_cache: bool) {
213 self.use_cache = use_cache;
214 }
215
216 pub fn check_health(&mut self, check_health: bool) {
218 self.check_health = check_health;
219 }
220
221 pub fn get_conn(&self) -> Result<PooledConn> {
228 self._get_conn(None::<String>, None, true)
229 }
230
231 pub fn try_get_conn(&self, timeout_ms: u32) -> Result<PooledConn> {
237 self._get_conn(None::<String>, Some(timeout_ms), true)
238 }
239
240 pub fn start_transaction(&self, tx_opts: TxOpts) -> Result<Transaction<'static>> {
242 let conn = self._get_conn(None::<String>, None, false)?;
243 let result = conn.pooled_start_transaction(tx_opts);
244 match result {
245 Ok(trans) => Ok(trans),
246 Err(ref e) if e.is_connectivity_error() => {
247 let conn = self._get_conn(None::<String>, None, true)?;
248 conn.pooled_start_transaction(tx_opts)
249 }
250 Err(e) => Err(e),
251 }
252 }
253}
254
255impl fmt::Debug for Pool {
256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257 write!(
258 f,
259 "Pool {{ min: {}, max: {}, count: {} }}",
260 self.arced_pool.min,
261 self.arced_pool.max,
262 self.arced_pool.count.load(Ordering::Relaxed)
263 )
264 }
265}
266
267#[derive(Debug)]
300pub struct PooledConn {
301 pool: Pool,
302 conn: Option<Conn>,
303}
304
305impl Deref for PooledConn {
306 type Target = Conn;
307
308 fn deref(&self) -> &Self::Target {
309 self.conn.as_ref().expect("deref after drop")
310 }
311}
312
313impl Drop for PooledConn {
314 fn drop(&mut self) {
315 if self.pool.arced_pool.count.load(Ordering::Relaxed) > self.pool.arced_pool.max
316 || self.conn.is_none()
317 {
318 self.pool.arced_pool.count.fetch_sub(1, Ordering::SeqCst);
319 } else {
320 self.conn.as_mut().unwrap().set_local_infile_handler(None);
321 let mut pool = (self.pool.arced_pool.inner).0.lock().unwrap();
322 pool.pool.push_back(self.conn.take().unwrap());
323 drop(pool);
324 (self.pool.arced_pool.inner).1.notify_one();
325 }
326 }
327}
328
329impl PooledConn {
330 pub fn start_transaction(&mut self, tx_opts: TxOpts) -> Result<Transaction> {
333 self.conn.as_mut().unwrap().start_transaction(tx_opts)
334 }
335
336 pub fn get_binlog_stream(
338 mut self,
339 request: crate::BinlogRequest<'_>,
340 ) -> Result<crate::BinlogStream> {
341 self.conn.take().unwrap().get_binlog_stream(request)
342 }
343
344 pub fn as_mut(&mut self) -> &mut Conn {
347 self.conn.as_mut().unwrap()
348 }
349
350 pub fn as_ref(&self) -> &Conn {
353 self.conn.as_ref().unwrap()
354 }
355
356 pub fn unwrap(mut self) -> Conn {
358 self.conn.take().unwrap()
359 }
360
361 fn pooled_start_transaction(mut self, tx_opts: TxOpts) -> Result<Transaction<'static>> {
362 self.as_mut()._start_transaction(tx_opts)?;
363 Ok(Transaction::new(self.into()))
364 }
365
366 pub fn set_local_infile_handler(&mut self, handler: Option<LocalInfileHandler>) {
370 self.conn
371 .as_mut()
372 .unwrap()
373 .set_local_infile_handler(handler);
374 }
375}
376
377impl Queryable for PooledConn {
378 fn query_iter<T: AsRef<str>>(&mut self, query: T) -> Result<QueryResult<'_, '_, '_, Text>> {
379 self.conn.as_mut().unwrap().query_iter(query)
380 }
381
382 fn prep<T: AsRef<str>>(&mut self, query: T) -> Result<Statement> {
383 self.conn.as_mut().unwrap().prep(query)
384 }
385
386 fn close(&mut self, stmt: Statement) -> Result<()> {
387 self.conn.as_mut().unwrap().close(stmt)
388 }
389
390 fn exec_iter<S, P>(&mut self, stmt: S, params: P) -> Result<QueryResult<'_, '_, '_, Binary>>
391 where
392 S: AsStatement,
393 P: Into<Params>,
394 {
395 self.conn.as_mut().unwrap().exec_iter(stmt, params)
396 }
397}
398
#[cfg(not(target_os = "wasi"))]
#[cfg(test)]
// Test names embed the type names `Pool` and `PooledConn`.
#[allow(non_snake_case)]
mod test {
    mod pool {
        use std::{thread, time::Duration};

        use crate::{
            from_value, prelude::*, test_misc::get_opts, DriverError, Error, OptsBuilder, Pool,
            TxOpts,
        };

        /// Builds a two-connection pool, kills one of its connections on the
        /// server side through the other one, and then runs `check` against
        /// the pool.
        ///
        /// The connection that issued the `KILL` stays checked out while
        /// `check` runs, so the only connection `check` can obtain from the
        /// pool is the killed one.
        fn with_killed_pooled_connection<F>(check: F)
        where
            F: FnOnce(&Pool),
        {
            let pool = Pool::new_manual(2, 2, get_opts()).unwrap();
            let mut conn = pool.get_conn().unwrap();

            let id: u32 = pool
                .get_conn()
                .unwrap()
                .exec_first("SELECT CONNECTION_ID();", ())
                .unwrap()
                .unwrap();

            conn.query_drop(format!("KILL {}", id)).unwrap();
            // Give the server time to actually terminate the connection.
            thread::sleep(Duration::from_millis(250));

            check(&pool);
        }

        #[test]
        fn multiple_pools_should_work() {
            let pool = Pool::new(get_opts()).unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("DROP DATABASE IF EXISTS A", ())
                .unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("CREATE DATABASE A", ())
                .unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("DROP TABLE IF EXISTS A.a", ())
                .unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("CREATE TABLE IF NOT EXISTS A.a (id INT)", ())
                .unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("INSERT INTO A.a VALUES (1)", ())
                .unwrap();
            let opts = OptsBuilder::from_opts(get_opts()).db_name(Some("A"));
            let pool2 = Pool::new(opts).unwrap();
            let count: u8 = pool2
                .get_conn()
                .unwrap()
                .exec_first("SELECT COUNT(*) FROM a", ())
                .unwrap()
                .unwrap();
            assert_eq!(1, count);
            pool.get_conn()
                .unwrap()
                .exec_drop("DROP DATABASE A", ())
                .unwrap();
        }

        /// Owner of a pool plus unrelated mutable state; used to check that a
        /// transaction started on the pool does not keep the owner borrowed.
        struct A {
            pool: Pool,
            x: u32,
        }

        impl A {
            fn add(&mut self) {
                self.x += 1;
            }
        }

        #[test]
        fn should_fix_connectivity_errors_on_prepare() {
            with_killed_pooled_connection(|pool| {
                pool.get_conn()
                    .unwrap()
                    .prep("SHOW FULL PROCESSLIST")
                    .unwrap();
            });
        }

        #[test]
        fn should_fix_connectivity_errors_on_prep_exec() {
            with_killed_pooled_connection(|pool| {
                pool.get_conn()
                    .unwrap()
                    .exec_drop("SHOW FULL PROCESSLIST", ())
                    .unwrap();
            });
        }

        #[test]
        fn should_fix_connectivity_errors_on_start_transaction() {
            with_killed_pooled_connection(|pool| {
                pool.start_transaction(TxOpts::default()).unwrap();
            });
        }

        #[test]
        fn should_execute_queryes_on_PooledConn() {
            let pool = Pool::new(get_opts()).unwrap();
            let mut threads = Vec::new();
            for _ in 0usize..10 {
                let pool = pool.clone();
                threads.push(thread::spawn(move || {
                    let mut conn = pool.get_conn().unwrap();
                    conn.query_drop("SELECT 1").unwrap();
                }));
            }
            for t in threads {
                assert!(t.join().is_ok());
            }
        }

        #[test]
        fn should_timeout_if_no_connections_available() {
            let pool = Pool::new_manual(0, 1, get_opts()).unwrap();
            let conn1 = pool.try_get_conn(357).unwrap();
            // The only slot is taken by `conn1`, so this request must time out.
            match pool.try_get_conn(357) {
                Err(Error::DriverError(DriverError::Timeout)) => {}
                Err(other) => panic!("expected a timeout error, got: {:?}", other),
                Ok(_) => panic!("expected a timeout error, but a connection was returned"),
            }
            drop(conn1);
            assert!(pool.try_get_conn(357).is_ok());
        }

        #[test]
        fn should_execute_statements_on_PooledConn() {
            let pool = Pool::new(get_opts()).unwrap();
            let mut threads = Vec::new();
            for _ in 0usize..10 {
                let pool = pool.clone();
                threads.push(thread::spawn(move || {
                    let mut conn = pool.get_conn().unwrap();
                    let stmt = conn.prep("SELECT 1").unwrap();
                    conn.exec_drop(&stmt, ()).unwrap();
                }));
            }
            for t in threads {
                assert!(t.join().is_ok());
            }

            let pool = Pool::new(get_opts()).unwrap();
            let mut threads = Vec::new();
            for _ in 0usize..10 {
                let pool = pool.clone();
                threads.push(thread::spawn(move || {
                    let mut conn = pool.get_conn().unwrap();
                    conn.exec_drop("SELECT ?", (1,)).unwrap();
                }));
            }
            for t in threads {
                assert!(t.join().is_ok());
            }
        }

        #[test]
        fn should_start_transaction_on_Pool() {
            /// Number of rows currently visible in `mysql.tbl`.
            fn row_count(pool: &Pool) -> u8 {
                pool.get_conn()
                    .unwrap()
                    .query_first::<u8, _>("SELECT COUNT(a) FROM mysql.tbl")
                    .unwrap()
                    .unwrap()
            }

            let pool = Pool::new_manual(1, 10, get_opts()).unwrap();
            pool.get_conn()
                .unwrap()
                .query_drop("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
                .unwrap();

            // Committed rows stay.
            pool.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    t.commit()
                })
                .unwrap();
            assert_eq!(row_count(&pool), 2_u8);

            // Explicitly rolled back rows disappear.
            pool.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    t.rollback()
                })
                .unwrap();
            assert_eq!(row_count(&pool), 2_u8);

            // Rows of a transaction that is dropped without commit disappear too.
            pool.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    Ok(())
                })
                .unwrap();
            assert_eq!(row_count(&pool), 2_u8);

            // The transaction must not keep `a` borrowed: `a.add()` needs
            // `&mut a` while `_transaction` is still alive.
            let mut a = A { pool, x: 0 };
            let _transaction = a.pool.start_transaction(TxOpts::default()).unwrap();
            a.add();
        }

        #[test]
        fn should_reuse_connections() -> crate::Result<()> {
            let pool = Pool::new_manual(1, 1, get_opts())?;
            let mut conn = pool.get_conn()?;

            let server_version = conn.server_version();
            let connection_id = conn.connection_id();

            for _ in 0..16 {
                drop(conn);
                conn = pool.get_conn()?;
                println!("CONN connection_id={}", conn.connection_id());
                assert!(conn.connection_id() == connection_id || server_version < (5, 7, 2));
            }

            Ok(())
        }

        #[test]
        fn should_start_transaction_on_PooledConn() {
            let pool = Pool::new(get_opts()).unwrap();
            let mut conn = pool.get_conn().unwrap();
            conn.query_drop("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
                .unwrap();
            conn.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    t.commit()
                })
                .unwrap();
            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
                let mut x = x.unwrap();
                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
            }
            conn.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    t.rollback()
                })
                .unwrap();
            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
                let mut x = x.unwrap();
                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
            }
            conn.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    Ok(())
                })
                .unwrap();
            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
                let mut x = x.unwrap();
                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
            }
        }

        #[cfg(feature = "nightly")]
        mod bench {
            use test;

            use std::thread;

            use crate::{prelude::*, test_misc::get_opts, Pool};

            #[bench]
            fn many_prepexecs(bencher: &mut test::Bencher) {
                let pool = Pool::new(get_opts()).unwrap();
                bencher.iter(|| {
                    "SELECT 1".with(()).run(&pool).unwrap();
                });
            }

            #[bench]
            fn many_prepares_threaded(bencher: &mut test::Bencher) {
                let pool = Pool::new(get_opts()).unwrap();
                bencher.iter(|| {
                    let mut threads = Vec::new();
                    for _ in 0..4 {
                        let pool = pool.clone();
                        threads.push(thread::spawn(move || {
                            for _ in 0..250 {
                                test::black_box(
                                    "SELECT 1, 'hello world', 123.321, ?, ?, ?"
                                        .with(("hello", "world", 65536))
                                        .run(&pool)
                                        .unwrap(),
                                );
                            }
                        }));
                    }
                    for t in threads {
                        t.join().unwrap();
                    }
                });
            }

            #[bench]
            fn many_prepares_threaded_no_cache(bencher: &mut test::Bencher) {
                let mut pool = Pool::new(get_opts()).unwrap();
                pool.use_cache(false);
                bencher.iter(|| {
                    let mut threads = Vec::new();
                    for _ in 0..4 {
                        let pool = pool.clone();
                        threads.push(thread::spawn(move || {
                            for _ in 0..250 {
                                test::black_box(
                                    "SELECT 1, 'hello world', 123.321, ?, ?, ?"
                                        .with(("hello", "world", 65536))
                                        .run(&pool)
                                        .unwrap(),
                                );
                            }
                        }));
                    }
                    for t in threads {
                        t.join().unwrap();
                    }
                });
            }
        }
    }
}