mysql/conn/
pool.rs

1// Copyright (c) 2020 rust-mysql-simple contributors
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{
10    collections::VecDeque,
11    fmt,
12    ops::Deref,
13    sync::{
14        atomic::{AtomicUsize, Ordering},
15        Arc, Condvar, Mutex,
16    },
17    time::{Duration, Instant},
18};
19
20use crate::{
21    conn::query_result::{Binary, Text},
22    prelude::*,
23    Conn, DriverError, Error, LocalInfileHandler, Opts, Params, QueryResult, Result, Statement,
24    Transaction, TxOpts,
25};
26
27#[derive(Debug)]
28struct InnerPool {
29    opts: Opts,
30    pool: VecDeque<Conn>,
31}
32
33impl InnerPool {
34    fn new(min: usize, max: usize, opts: Opts) -> Result<InnerPool> {
35        if min > max || max == 0 {
36            return Err(Error::DriverError(DriverError::InvalidPoolConstraints));
37        }
38        let mut pool = InnerPool {
39            opts,
40            pool: VecDeque::with_capacity(max),
41        };
42        for _ in 0..min {
43            pool.new_conn()?;
44        }
45        Ok(pool)
46    }
47    fn new_conn(&mut self) -> Result<()> {
48        match Conn::new(self.opts.clone()) {
49            Ok(conn) => {
50                self.pool.push_back(conn);
51                Ok(())
52            }
53            Err(err) => Err(err),
54        }
55    }
56}
57
58struct ArcedPool {
59    inner: (Mutex<InnerPool>, Condvar),
60    min: usize,
61    max: usize,
62    count: AtomicUsize,
63}
64
65/// `Pool` serves to provide you with a [`PooledConn`](struct.PooledConn.html)'s.
66/// However you can prepare statements directly on `Pool` without
67/// invoking [`Pool::get_conn`](struct.Pool.html#method.get_conn).
68///
69/// `Pool` will hold at least `min` connections and will create as many as `max`
70/// connections with possible overhead of one connection per alive thread.
71///
72/// Example of multithreaded `Pool` usage:
73///
74/// ```rust
75/// # mysql::doctest_wrapper!(__result, {
76/// # use mysql::*;
77/// # use mysql::prelude::*;
78/// # let mut conn = Conn::new(get_opts())?;
79/// let opts = get_opts();
80/// let pool = Pool::new(opts).unwrap();
81/// let mut threads = Vec::new();
82///
83/// for _ in 0..100 {
84///     let pool = pool.clone();
85///     threads.push(std::thread::spawn(move || {
86///         let mut conn = pool.get_conn().unwrap();
87///         let result: u8 = conn.query_first("SELECT 1").unwrap().unwrap();
88///         assert_eq!(result, 1_u8);
89///     }));
90/// }
91///
92/// for t in threads.into_iter() {
93///     assert!(t.join().is_ok());
94/// }
95/// # });
96/// ```
97///
98/// For more info on how to work with mysql connection please look at
99/// [`PooledConn`](struct.PooledConn.html) documentation.
100#[derive(Clone)]
101pub struct Pool {
102    arced_pool: Arc<ArcedPool>,
103    check_health: bool,
104    use_cache: bool,
105}
106
107impl Pool {
108    /// Will return connection taken from a pool.
109    ///
110    /// Will verify and fix it via `Conn::ping` and `Conn::reset` if `call_ping` is `true`.
111    /// Will try to get concrete connection if `id` is `Some(_)`.
112    /// Will wait til timeout if `timeout_ms` is `Some(_)`
113    fn _get_conn<T: AsRef<[u8]>>(
114        &self,
115        stmt: Option<T>,
116        timeout_ms: Option<u32>,
117        call_ping: bool,
118    ) -> Result<PooledConn> {
119        let times = if let Some(timeout_ms) = timeout_ms {
120            Some((Instant::now(), Duration::from_millis(timeout_ms.into())))
121        } else {
122            None
123        };
124
125        let &(ref inner_pool, ref condvar) = &self.arced_pool.inner;
126
127        let conn = if self.use_cache {
128            if let Some(query) = stmt {
129                let mut id = None;
130                let mut pool = inner_pool.lock()?;
131                for (i, conn) in pool.pool.iter().rev().enumerate() {
132                    if conn.has_stmt(query.as_ref()) {
133                        id = Some(i);
134                        break;
135                    }
136                }
137                id.and_then(|id| pool.pool.swap_remove_back(id))
138            } else {
139                None
140            }
141        } else {
142            None
143        };
144
145        let mut conn = if let Some(conn) = conn {
146            conn
147        } else {
148            let mut pool = inner_pool.lock()?;
149            loop {
150                if let Some(conn) = pool.pool.pop_front() {
151                    drop(pool);
152                    break conn;
153                } else if self.arced_pool.count.load(Ordering::Relaxed) < self.arced_pool.max {
154                    pool.new_conn()?;
155                    self.arced_pool.count.fetch_add(1, Ordering::SeqCst);
156                } else {
157                    pool = if let Some((start, timeout)) = times {
158                        if start.elapsed() > timeout {
159                            return Err(DriverError::Timeout.into());
160                        }
161                        condvar.wait_timeout(pool, timeout)?.0
162                    } else {
163                        condvar.wait(pool)?
164                    }
165                }
166            }
167        };
168
169        if call_ping && self.check_health && !conn.ping() {
170            if let Err(err) = conn.reset() {
171                self.arced_pool.count.fetch_sub(1, Ordering::SeqCst);
172                return Err(err);
173            }
174        }
175
176        Ok(PooledConn {
177            pool: self.clone(),
178            conn: Some(conn),
179        })
180    }
181
182    /// Creates new pool with `min = 10` and `max = 100`.
183    pub fn new<T, E>(opts: T) -> Result<Pool>
184    where
185        Opts: TryFrom<T, Error = E>,
186        crate::Error: From<E>,
187    {
188        Pool::new_manual(10, 100, opts)
189    }
190
191    /// Same as `new` but you can set `min` and `max`.
192    pub fn new_manual<T, E>(min: usize, max: usize, opts: T) -> Result<Pool>
193    where
194        Opts: TryFrom<T, Error = E>,
195        crate::Error: From<E>,
196    {
197        let pool = InnerPool::new(min, max, opts.try_into()?)?;
198        Ok(Pool {
199            arced_pool: Arc::new(ArcedPool {
200                inner: (Mutex::new(pool), Condvar::new()),
201                min,
202                max,
203                count: AtomicUsize::new(min),
204            }),
205            use_cache: true,
206            check_health: true,
207        })
208    }
209
210    /// A way to turn off searching for cached statement (on by default).
211    #[doc(hidden)]
212    pub fn use_cache(&mut self, use_cache: bool) {
213        self.use_cache = use_cache;
214    }
215
216    /// A way to turn off connection health check on each call to `get_conn` (on by default).
217    pub fn check_health(&mut self, check_health: bool) {
218        self.check_health = check_health;
219    }
220
221    /// Gives you a [`PooledConn`](struct.PooledConn.html).
222    ///
223    /// `Pool` will check that connection is alive via
224    /// [`Conn::ping`](struct.Conn.html#method.ping) and will
225    /// call [`Conn::reset`](struct.Conn.html#method.reset) if
226    /// necessary.
227    pub fn get_conn(&self) -> Result<PooledConn> {
228        self._get_conn(None::<String>, None, true)
229    }
230
231    /// Will try to get connection for a duration of `timeout_ms` milliseconds.
232    ///
233    /// # Failure
234    /// This function will return `Error::DriverError(DriverError::Timeout)` if timeout was
235    /// reached while waiting for new connection to become available.
236    pub fn try_get_conn(&self, timeout_ms: u32) -> Result<PooledConn> {
237        self._get_conn(None::<String>, Some(timeout_ms), true)
238    }
239
240    /// Shortcut for `pool.get_conn()?.start_transaction(..)`.
241    pub fn start_transaction(&self, tx_opts: TxOpts) -> Result<Transaction<'static>> {
242        let conn = self._get_conn(None::<String>, None, false)?;
243        let result = conn.pooled_start_transaction(tx_opts);
244        match result {
245            Ok(trans) => Ok(trans),
246            Err(ref e) if e.is_connectivity_error() => {
247                let conn = self._get_conn(None::<String>, None, true)?;
248                conn.pooled_start_transaction(tx_opts)
249            }
250            Err(e) => Err(e),
251        }
252    }
253}
254
255impl fmt::Debug for Pool {
256    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257        write!(
258            f,
259            "Pool {{ min: {}, max: {}, count: {} }}",
260            self.arced_pool.min,
261            self.arced_pool.max,
262            self.arced_pool.count.load(Ordering::Relaxed)
263        )
264    }
265}
266
267/// Pooled mysql connection which will return to the pool on `drop`.
268///
269/// You should prefer using `prep` along `exec` instead of `query` from the Queryable trait where
270/// possible, except cases when statement has no params and when it has no return values or return
271/// values which evaluates to `Value::Bytes`.
272///
273/// `query` is a part of mysql text protocol, so under the hood you will always receive
274/// `Value::Bytes` as a result and `from_value` will need to parse it if you want, for example, `i64`
275///
276/// ```rust
277/// # mysql::doctest_wrapper!(__result, {
278/// # use mysql::*;
279/// # use mysql::prelude::*;
280/// # let mut conn = Conn::new(get_opts())?;
281/// let pool = Pool::new(get_opts()).unwrap();
282/// let mut conn = pool.get_conn().unwrap();
283///
284/// conn.query_first("SELECT 42").map(|result: Option<Value>| {
285///     let result = result.unwrap();
286///     assert_eq!(result, Value::Bytes(b"42".to_vec()));
287///     assert_eq!(from_value::<i64>(result), 42i64);
288/// }).unwrap();
289/// conn.exec_iter("SELECT 42", ()).map(|mut result| {
290///     let cell = result.next().unwrap().unwrap().take(0).unwrap();
291///     assert_eq!(cell, Value::Int(42i64));
292///     assert_eq!(from_value::<i64>(cell), 42i64);
293/// }).unwrap();
294/// # });
295/// ```
296///
297/// For more info on how to work with query results please look at
298/// [`QueryResult`](../struct.QueryResult.html) documentation.
299#[derive(Debug)]
300pub struct PooledConn {
301    pool: Pool,
302    conn: Option<Conn>,
303}
304
305impl Deref for PooledConn {
306    type Target = Conn;
307
308    fn deref(&self) -> &Self::Target {
309        self.conn.as_ref().expect("deref after drop")
310    }
311}
312
313impl Drop for PooledConn {
314    fn drop(&mut self) {
315        if self.pool.arced_pool.count.load(Ordering::Relaxed) > self.pool.arced_pool.max
316            || self.conn.is_none()
317        {
318            self.pool.arced_pool.count.fetch_sub(1, Ordering::SeqCst);
319        } else {
320            self.conn.as_mut().unwrap().set_local_infile_handler(None);
321            let mut pool = (self.pool.arced_pool.inner).0.lock().unwrap();
322            pool.pool.push_back(self.conn.take().unwrap());
323            drop(pool);
324            (self.pool.arced_pool.inner).1.notify_one();
325        }
326    }
327}
328
329impl PooledConn {
330    /// Redirects to
331    /// [`Conn#start_transaction`](struct.Conn.html#method.start_transaction)
332    pub fn start_transaction(&mut self, tx_opts: TxOpts) -> Result<Transaction> {
333        self.conn.as_mut().unwrap().start_transaction(tx_opts)
334    }
335
336    /// Turns this connection into a binlog stream (see [`Conn::get_binlog_stream`]).
337    pub fn get_binlog_stream(
338        mut self,
339        request: crate::BinlogRequest<'_>,
340    ) -> Result<crate::BinlogStream> {
341        self.conn.take().unwrap().get_binlog_stream(request)
342    }
343
344    /// Gives mutable reference to the wrapped
345    /// [`Conn`](struct.Conn.html).
346    pub fn as_mut(&mut self) -> &mut Conn {
347        self.conn.as_mut().unwrap()
348    }
349
350    /// Gives reference to the wrapped
351    /// [`Conn`](struct.Conn.html).
352    pub fn as_ref(&self) -> &Conn {
353        self.conn.as_ref().unwrap()
354    }
355
356    /// Unwraps wrapped [`Conn`](struct.Conn.html).
357    pub fn unwrap(mut self) -> Conn {
358        self.conn.take().unwrap()
359    }
360
361    fn pooled_start_transaction(mut self, tx_opts: TxOpts) -> Result<Transaction<'static>> {
362        self.as_mut()._start_transaction(tx_opts)?;
363        Ok(Transaction::new(self.into()))
364    }
365
366    /// A way to override default local infile handler for this pooled connection. Destructor will
367    /// restore original handler before returning connection to a pool.
368    /// See [`Conn::set_local_infile_handler`](struct.Conn.html#method.set_local_infile_handler).
369    pub fn set_local_infile_handler(&mut self, handler: Option<LocalInfileHandler>) {
370        self.conn
371            .as_mut()
372            .unwrap()
373            .set_local_infile_handler(handler);
374    }
375}
376
377impl Queryable for PooledConn {
378    fn query_iter<T: AsRef<str>>(&mut self, query: T) -> Result<QueryResult<'_, '_, '_, Text>> {
379        self.conn.as_mut().unwrap().query_iter(query)
380    }
381
382    fn prep<T: AsRef<str>>(&mut self, query: T) -> Result<Statement> {
383        self.conn.as_mut().unwrap().prep(query)
384    }
385
386    fn close(&mut self, stmt: Statement) -> Result<()> {
387        self.conn.as_mut().unwrap().close(stmt)
388    }
389
390    fn exec_iter<S, P>(&mut self, stmt: S, params: P) -> Result<QueryResult<'_, '_, '_, Binary>>
391    where
392        S: AsStatement,
393        P: Into<Params>,
394    {
395        self.conn.as_mut().unwrap().exec_iter(stmt, params)
396    }
397}
398
#[cfg(not(target_os = "wasi"))]
#[cfg(test)]
#[allow(non_snake_case)]
mod test {
    /// Integration tests for `Pool` and `PooledConn`; the options come from
    /// `test_misc::get_opts`.
    mod pool {
        use std::{thread, time::Duration};

        use crate::{
            from_value, prelude::*, test_misc::get_opts, DriverError, Error, OptsBuilder, Pool,
            TxOpts,
        };

        /// Two pools with different options (here: a different default database) can coexist.
        #[test]
        fn multiple_pools_should_work() {
            let pool = Pool::new(get_opts()).unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("DROP DATABASE IF EXISTS A", ())
                .unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("CREATE DATABASE A", ())
                .unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("DROP TABLE IF EXISTS A.a", ())
                .unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("CREATE TABLE IF NOT EXISTS A.a (id INT)", ())
                .unwrap();
            pool.get_conn()
                .unwrap()
                .exec_drop("INSERT INTO A.a VALUES (1)", ())
                .unwrap();
            let opts = OptsBuilder::from_opts(get_opts()).db_name(Some("A"));
            let pool2 = Pool::new(opts).unwrap();
            let count: u8 = pool2
                .get_conn()
                .unwrap()
                .exec_first("SELECT COUNT(*) FROM a", ())
                .unwrap()
                .unwrap();
            assert_eq!(1, count);
            pool.get_conn()
                .unwrap()
                .exec_drop("DROP DATABASE A", ())
                .unwrap();
        }

        /// Helper used to check that a `Pool` field stays usable while the owning struct is
        /// mutated.
        struct A {
            pool: Pool,
            x: u32,
        }

        impl A {
            fn add(&mut self) {
                self.x += 1;
            }
        }

        /// A connection killed on the server side is repaired before `prep` is called on it.
        #[test]
        fn should_fix_connectivity_errors_on_prepare() {
            let pool = Pool::new_manual(2, 2, get_opts()).unwrap();
            let mut conn = pool.get_conn().unwrap();

            let id: u32 = pool
                .get_conn()
                .unwrap()
                .exec_first("SELECT CONNECTION_ID();", ())
                .unwrap()
                .unwrap();

            conn.query_drop(&*format!("KILL {}", id)).unwrap();
            thread::sleep(Duration::from_millis(250));
            pool.get_conn()
                .unwrap()
                .prep("SHOW FULL PROCESSLIST")
                .unwrap();
        }

        /// A connection killed on the server side is repaired before `exec_drop` is called
        /// on it.
        #[test]
        fn should_fix_connectivity_errors_on_prep_exec() {
            let pool = Pool::new_manual(2, 2, get_opts()).unwrap();
            let mut conn = pool.get_conn().unwrap();

            let id: u32 = pool
                .get_conn()
                .unwrap()
                .exec_first("SELECT CONNECTION_ID();", ())
                .unwrap()
                .unwrap();

            conn.query_drop(&*format!("KILL {}", id)).unwrap();
            thread::sleep(Duration::from_millis(250));
            pool.get_conn()
                .unwrap()
                .exec_drop("SHOW FULL PROCESSLIST", ())
                .unwrap();
        }

        /// `Pool::start_transaction` recovers from a connection killed on the server side.
        #[test]
        fn should_fix_connectivity_errors_on_start_transaction() {
            let pool = Pool::new_manual(2, 2, get_opts()).unwrap();
            let mut conn = pool.get_conn().unwrap();

            let id: u32 = pool
                .get_conn()
                .unwrap()
                .exec_first("SELECT CONNECTION_ID();", ())
                .unwrap()
                .unwrap();

            conn.query_drop(&*format!("KILL {}", id)).unwrap();
            thread::sleep(Duration::from_millis(250));
            pool.start_transaction(TxOpts::default()).unwrap();
        }

        /// Text-protocol queries work on pooled connections obtained from several threads.
        #[test]
        fn should_execute_queryes_on_PooledConn() {
            let pool = Pool::new(get_opts()).unwrap();
            let mut threads = Vec::new();
            for _ in 0usize..10 {
                let pool = pool.clone();
                threads.push(thread::spawn(move || {
                    let conn = pool.get_conn();
                    assert!(conn.is_ok());
                    let mut conn = conn.unwrap();
                    conn.query_drop("SELECT 1").unwrap();
                }));
            }
            for t in threads.into_iter() {
                assert!(t.join().is_ok());
            }
        }

        /// `try_get_conn` reports `DriverError::Timeout` while the only connection is taken and
        /// succeeds again once that connection has been returned.
        #[test]
        fn should_timeout_if_no_connections_available() {
            let pool = Pool::new_manual(0, 1, get_opts()).unwrap();
            let conn1 = pool.try_get_conn(357).unwrap();
            let conn2 = pool.try_get_conn(357);
            assert!(matches!(
                conn2,
                Err(Error::DriverError(DriverError::Timeout))
            ));
            drop(conn1);
            assert!(pool.try_get_conn(357).is_ok());
        }

        /// Prepared statements work on pooled connections obtained from several threads.
        #[test]
        fn should_execute_statements_on_PooledConn() {
            let pool = Pool::new(get_opts()).unwrap();
            let mut threads = Vec::new();
            for _ in 0usize..10 {
                let pool = pool.clone();
                threads.push(thread::spawn(move || {
                    let mut conn = pool.get_conn().unwrap();
                    let stmt = conn.prep("SELECT 1").unwrap();
                    conn.exec_drop(&stmt, ()).unwrap();
                }));
            }
            for t in threads.into_iter() {
                assert!(t.join().is_ok());
            }

            let pool = Pool::new(get_opts()).unwrap();
            let mut threads = Vec::new();
            for _ in 0usize..10 {
                let pool = pool.clone();
                threads.push(thread::spawn(move || {
                    let mut conn = pool.get_conn().unwrap();
                    conn.exec_drop("SELECT ?", (1,)).unwrap();
                }));
            }
            for t in threads.into_iter() {
                assert!(t.join().is_ok());
            }
        }

        /// Commit persists, while explicit rollback and dropping the transaction do not.
        #[test]
        #[allow(unused_variables)]
        fn should_start_transaction_on_Pool() {
            let pool = Pool::new_manual(1, 10, get_opts()).unwrap();
            pool.get_conn()
                .unwrap()
                .query_drop("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
                .unwrap();
            pool.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    t.commit()
                })
                .unwrap();
            assert_eq!(
                pool.get_conn()
                    .unwrap()
                    .query_first::<u8, _>("SELECT COUNT(a) FROM mysql.tbl")
                    .unwrap()
                    .unwrap(),
                2_u8
            );
            pool.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    t.rollback()
                })
                .unwrap();
            assert_eq!(
                pool.get_conn()
                    .unwrap()
                    .query_first::<u8, _>("SELECT COUNT(a) FROM mysql.tbl")
                    .unwrap()
                    .unwrap(),
                2_u8
            );
            pool.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    Ok(())
                })
                .unwrap();
            assert_eq!(
                pool.get_conn()
                    .unwrap()
                    .query_first::<u8, _>("SELECT COUNT(a) FROM mysql.tbl")
                    .unwrap()
                    .unwrap(),
                2_u8
            );
            let mut a = A { pool, x: 0 };
            let transaction = a.pool.start_transaction(TxOpts::default()).unwrap();
            a.add();
        }

        /// A single-connection pool keeps handing out the same server connection.
        #[test]
        fn should_reuse_connections() -> crate::Result<()> {
            let pool = Pool::new_manual(1, 1, get_opts())?;
            let mut conn = pool.get_conn()?;

            let server_version = conn.server_version();
            let connection_id = conn.connection_id();

            for _ in 0..16 {
                drop(conn);
                conn = pool.get_conn()?;
                println!("CONN connection_id={}", conn.connection_id());
                assert!(conn.connection_id() == connection_id || server_version < (5, 7, 2));
            }

            Ok(())
        }

        /// Same commit / rollback / drop checks as above, on one explicitly held connection.
        #[test]
        fn should_start_transaction_on_PooledConn() {
            let pool = Pool::new(get_opts()).unwrap();
            let mut conn = pool.get_conn().unwrap();
            conn.query_drop("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
                .unwrap();
            conn.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    t.commit()
                })
                .unwrap();
            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
                let mut x = x.unwrap();
                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
            }
            conn.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    t.rollback()
                })
                .unwrap();
            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
                let mut x = x.unwrap();
                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
            }
            conn.start_transaction(TxOpts::default())
                .and_then(|mut t| {
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
                    Ok(())
                })
                .unwrap();
            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
                let mut x = x.unwrap();
                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
            }
        }

        /// Benchmarks; compiled only with the `nightly` feature.
        #[cfg(feature = "nightly")]
        mod bench {
            use test;

            use std::thread;

            use crate::{prelude::*, test_misc::get_opts, Pool};

            #[bench]
            fn many_prepexecs(bencher: &mut test::Bencher) {
                let pool = Pool::new(get_opts()).unwrap();
                bencher.iter(|| {
                    "SELECT 1".with(()).run(&pool).unwrap();
                });
            }

            #[bench]
            fn many_prepares_threaded(bencher: &mut test::Bencher) {
                let pool = Pool::new(get_opts()).unwrap();
                bencher.iter(|| {
                    let mut threads = Vec::new();
                    for _ in 0..4 {
                        let pool = pool.clone();
                        threads.push(thread::spawn(move || {
                            for _ in 0..250 {
                                test::black_box(
                                    "SELECT 1, 'hello world', 123.321, ?, ?, ?"
                                        .with(("hello", "world", 65536))
                                        .run(&pool)
                                        .unwrap(),
                                );
                            }
                        }));
                    }
                    for t in threads {
                        t.join().unwrap();
                    }
                });
            }

            #[bench]
            fn many_prepares_threaded_no_cache(bencher: &mut test::Bencher) {
                let mut pool = Pool::new(get_opts()).unwrap();
                pool.use_cache(false);
                bencher.iter(|| {
                    let mut threads = Vec::new();
                    for _ in 0..4 {
                        let pool = pool.clone();
                        threads.push(thread::spawn(move || {
                            for _ in 0..250 {
                                test::black_box(
                                    "SELECT 1, 'hello world', 123.321, ?, ?, ?"
                                        .with(("hello", "world", 65536))
                                        .run(&pool)
                                        .unwrap(),
                                );
                            }
                        }));
                    }
                    for t in threads {
                        t.join().unwrap();
                    }
                });
            }
        }
    }
}