use mysql_common::named_params::parse_named_params;
use mysql_common::row::convert::from_row;
use std::borrow::Borrow;
use std::collections::VecDeque;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration as StdDuration;
use crate::prelude::{FromRow, GenericConnection};
use crate::time::{Duration, SteadyTime};
use crate::{
Conn, DriverError, Error, IsolationLevel, LocalInfileHandler, Opts, Params, QueryResult,
Result as MyResult, Row, Stmt, Transaction,
};
/// Mutex-protected state of a pool: the connection options and the queue of idle connections.
#[derive(Debug)]
struct InnerPool {
/// Options cloned by `new_conn` every time a new connection is opened.
opts: Opts,
/// Idle connections; `new_conn` appends freshly opened connections to the back.
pool: VecDeque<Conn>,
}
impl InnerPool {
    /// Builds the pool state and eagerly opens `min` connections.
    ///
    /// Fails with `DriverError::InvalidPoolConstraints` when `min > max` or `max == 0`, and
    /// stops at the first connection that cannot be opened, returning its error.
    fn new(min: usize, max: usize, opts: Opts) -> MyResult<InnerPool> {
        let constraints_ok = max != 0 && min <= max;
        if !constraints_ok {
            return Err(Error::DriverError(DriverError::InvalidPoolConstraints));
        }
        let mut inner = InnerPool {
            opts,
            pool: VecDeque::with_capacity(max),
        };
        (0..min).try_for_each(|_| inner.new_conn())?;
        Ok(inner)
    }

    /// Opens one connection with a clone of the stored options and queues it at the back.
    fn new_conn(&mut self) -> MyResult<()> {
        let conn = Conn::new(self.opts.clone())?;
        self.pool.push_back(conn);
        Ok(())
    }
}
/// Cloneable handle to a pool of connections.
///
/// Clones share the idle-connection queue and the counters through `Arc`s; the two boolean
/// flags are plain fields and are copied into each clone.
#[derive(Clone)]
pub struct Pool {
/// Idle connections behind a mutex, paired with the condvar that `_get_conn` waits on when
/// nothing is available.
inner: Arc<(Mutex<InnerPool>, Condvar)>,
/// Number of connections opened when the pool was created.
min: Arc<AtomicUsize>,
/// Upper bound that `count` is compared against before a new connection is opened.
max: Arc<AtomicUsize>,
/// Connection counter: starts at `min`, incremented when `_get_conn` opens a connection and
/// decremented when a `PooledConn` is dropped without returning its connection.
count: Arc<AtomicUsize>,
/// When set, `_get_conn` pings a connection before handing it out (if the caller asked for
/// a ping) and resets it when the ping fails.
check_health: bool,
/// When set, `_get_conn` first looks for an idle connection that already has the requested
/// statement.
use_cache: bool,
}
impl Pool {
/// Checks a connection out of the pool, opening a new one or waiting when none is idle.
///
/// * `stmt` - when statement caching is enabled (`use_cache`), an idle connection for which
///   `has_stmt` reports this query is preferred over the front of the queue.
/// * `timeout_ms` - how long to wait for a connection to be returned before giving up;
///   `None` waits indefinitely.
/// * `call_ping` - when `true` and `check_health` is enabled, the connection is pinged and
///   reset if the ping fails.
///
/// # Errors
///
/// Returns `DriverError::Timeout` once the timeout has elapsed, and propagates errors from
/// locking the pool, opening a connection, or resetting an unhealthy connection.
fn _get_conn<T: AsRef<str>>(
    &self,
    stmt: Option<T>,
    timeout_ms: Option<u32>,
    call_ping: bool,
) -> MyResult<PooledConn> {
    let times = timeout_ms.map(|timeout_ms| {
        (
            SteadyTime::now(),
            Duration::milliseconds(timeout_ms.into()),
            StdDuration::from_millis(timeout_ms.into()),
        )
    });
    let &(ref inner_pool, ref condvar) = &*self.inner;

    // Prefer an idle connection that already has the statement, searching newest-first.
    let cached = match stmt {
        Some(ref query) if self.use_cache => {
            let mut pool = inner_pool.lock()?;
            // `rposition` scans from the back but yields an index counted from the front,
            // which is the index `swap_remove_back` expects. Enumerating a reversed iterator
            // would yield a distance from the back and remove the wrong connection.
            let found = pool
                .pool
                .iter()
                .rposition(|conn| conn.has_stmt(query.as_ref()));
            found.and_then(|idx| pool.pool.swap_remove_back(idx))
        }
        _ => None,
    };

    let mut conn = match cached {
        Some(conn) => conn,
        None => {
            let mut pool = inner_pool.lock()?;
            loop {
                if let Some(conn) = pool.pool.pop_front() {
                    break conn;
                }
                if self.count.load(Ordering::Relaxed) < self.max.load(Ordering::Relaxed) {
                    // Below the limit: open a connection and pick it up on the next pass.
                    pool.new_conn()?;
                    self.count.fetch_add(1, Ordering::SeqCst);
                    continue;
                }
                // At the limit: wait until a `PooledConn` is dropped and notifies us.
                pool = match times {
                    Some((start, timeout, std_timeout)) => {
                        if SteadyTime::now() - start > timeout {
                            return Err(DriverError::Timeout.into());
                        }
                        condvar.wait_timeout(pool, std_timeout)?.0
                    }
                    None => condvar.wait(pool)?,
                };
            }
        }
    };

    if call_ping && self.check_health && !conn.ping() {
        if let Err(err) = conn.reset() {
            // The broken connection is dropped without ever becoming a `PooledConn`, so its
            // slot has to be released here; otherwise the pool would permanently lose
            // capacity. The lock result is held (even if poisoned) so that a waiter cannot
            // miss the update between checking `count` and going to sleep.
            {
                let _guard = inner_pool.lock();
                self.count.fetch_sub(1, Ordering::SeqCst);
            }
            condvar.notify_one();
            return Err(err.into());
        }
    }

    Ok(PooledConn {
        pool: self.clone(),
        conn: Some(conn),
    })
}
pub fn new<T: Into<Opts>>(opts: T) -> MyResult<Pool> {
Pool::new_manual(10, 100, opts)
}
/// Creates a pool that opens `min` connections immediately and never tracks more than `max`.
///
/// Statement caching and health checks start out enabled.
///
/// # Errors
///
/// Propagates whatever `InnerPool::new` returns: invalid constraints (`min > max` or
/// `max == 0`) or a failure to open one of the initial connections.
pub fn new_manual<T: Into<Opts>>(min: usize, max: usize, opts: T) -> MyResult<Pool> {
    let counter = |value: usize| Arc::new(AtomicUsize::new(value));
    InnerPool::new(min, max, opts.into()).map(|inner| Pool {
        inner: Arc::new((Mutex::new(inner), Condvar::new())),
        min: counter(min),
        max: counter(max),
        // Every pre-opened connection already occupies a slot.
        count: counter(min),
        check_health: true,
        use_cache: true,
    })
}
/// Turns the statement-aware connection lookup in `_get_conn` on or off.
///
/// The flag is a plain field of this handle, so clones made before the call keep their
/// previous value.
pub fn use_cache(&mut self, use_cache: bool) {
self.use_cache = use_cache;
}
/// Turns the ping-and-reset health check performed by `_get_conn` on or off.
///
/// The flag is a plain field of this handle, so clones made before the call keep their
/// previous value.
pub fn check_health(&mut self, check_health: bool) {
self.check_health = check_health;
}
/// Checks out a connection, waiting without a time limit until one is available.
///
/// No statement preference is given and the health check is requested.
pub fn get_conn(&self) -> MyResult<PooledConn> {
    let no_stmt: Option<String> = None;
    self._get_conn(no_stmt, None, true)
}
/// Checks out a connection, giving up with `DriverError::Timeout` when none becomes
/// available within `timeout_ms` milliseconds.
pub fn try_get_conn(&self, timeout_ms: u32) -> MyResult<PooledConn> {
    let no_stmt: Option<String> = None;
    self._get_conn(no_stmt, Some(timeout_ms), true)
}
/// Checks out a connection without a timeout, preferring one that already has `query`.
///
/// `call_ping` is forwarded to `_get_conn` and decides whether the health check may run.
fn get_conn_by_stmt<T: AsRef<str>>(&self, query: T, call_ping: bool) -> MyResult<PooledConn> {
    let preferred_stmt = Some(query);
    let no_timeout = None;
    self._get_conn(preferred_stmt, no_timeout, call_ping)
}
/// Prepares `query` on a pooled connection.
///
/// The connection is moved into the returned statement, which is why the statement can be
/// `'static`.
pub fn prepare<T: AsRef<str>>(&self, query: T) -> MyResult<Stmt<'static>> {
    let pooled = self.get_conn_by_stmt(query.as_ref(), true)?;
    pooled.pooled_prepare(query)
}
pub fn prep_exec<A, T>(&self, query: A, params: T) -> MyResult<QueryResult<'static>>
where
A: AsRef<str>,
T: Into<Params>,
{
let conn = self.get_conn_by_stmt(query.as_ref(), false)?;
let params = params.into();
match conn.pooled_prep_exec(query.as_ref(), params.clone()) {
Ok(stmt) => Ok(stmt),
Err(e) => {
if e.is_connectivity_error() {
let conn = self._get_conn(None::<String>, None, true)?;
conn.pooled_prep_exec(query, params)
} else {
Err(e)
}
}
}
}
/// Runs `Pool::prep_exec` and returns only the first row of the result, if there is one.
///
/// Returns `Ok(None)` for an empty result set and propagates an error attached to the first
/// row.
pub fn first_exec<Q, P>(&self, query: Q, params: P) -> MyResult<Option<Row>>
where
    Q: AsRef<str>,
    P: Into<Params>,
{
    let mut result = self.prep_exec(query, params)?;
    let first_row = result.next();
    match first_row {
        Some(row) => row.map(Some),
        None => Ok(None),
    }
}
pub fn start_transaction(
&self,
consistent_snapshot: bool,
isolation_level: Option<IsolationLevel>,
readonly: Option<bool>,
) -> MyResult<Transaction<'static>> {
let conn = self._get_conn(None::<String>, None, false)?;
let result = conn.pooled_start_transaction(consistent_snapshot, isolation_level, readonly);
match result {
Ok(trans) => Ok(trans),
Err(ref e) if e.is_connectivity_error() => {
let conn = self._get_conn(None::<String>, None, true)?;
conn.pooled_start_transaction(consistent_snapshot, isolation_level, readonly)
}
Err(e) => Err(e),
}
}
}
impl fmt::Debug for Pool {
    /// Prints a snapshot of the three counters; the connections themselves are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let min = self.min.load(Ordering::Relaxed);
        let max = self.max.load(Ordering::Relaxed);
        let count = self.count.load(Ordering::Relaxed);
        write!(f, "Pool {{ min: {}, max: {}, count: {} }}", min, max, count)
    }
}
/// A connection checked out of a `Pool`.
///
/// Dropping it hands the connection back to the pool (see the `Drop` impl).
#[derive(Debug)]
pub struct PooledConn {
/// Handle to the pool this connection was checked out from.
pool: Pool,
/// The checked-out connection; `None` once it has been taken out by `unwrap` or by `drop`.
conn: Option<Conn>,
}
impl Drop for PooledConn {
fn drop(&mut self) {
if self.pool.count.load(Ordering::Relaxed) > self.pool.max.load(Ordering::Relaxed)
|| self.conn.is_none()
{
self.pool.count.fetch_sub(1, Ordering::SeqCst);
} else {
self.conn.as_mut().unwrap().set_local_infile_handler(None);
let mut pool = (self.pool.inner).0.lock().unwrap();
pool.pool.push_back(self.conn.take().unwrap());
drop(pool);
(self.pool.inner).1.notify_one();
}
}
}
impl PooledConn {
/// Runs a text query on the underlying connection; delegates to `Conn::query`.
pub fn query<T: AsRef<str>>(&mut self, query: T) -> MyResult<QueryResult<'_>> {
    self.as_mut().query(query)
}
/// Runs `query` and converts the first row of its result, if any, with `from_row`.
///
/// Returns `Ok(None)` for an empty result set and propagates an error attached to the first
/// row.
pub fn first<T: AsRef<str>, U: FromRow>(&mut self, query: T) -> MyResult<Option<U>> {
    let mut result = self.query(query)?;
    let first_row = result.next();
    match first_row {
        Some(row) => row.map(|row| Some(from_row(row))),
        None => Ok(None),
    }
}
/// Prepares a statement on the underlying connection; delegates to `Conn::prepare`.
pub fn prepare<T: AsRef<str>>(&mut self, query: T) -> MyResult<Stmt<'_>> {
    self.as_mut().prepare(query)
}
/// Prepares and executes `query` with `params` on the underlying connection; delegates to
/// `Conn::prep_exec`.
pub fn prep_exec<A, T>(&mut self, query: A, params: T) -> MyResult<QueryResult<'_>>
where
    A: AsRef<str>,
    T: Into<Params>,
{
    self.as_mut().prep_exec(query, params)
}
/// Runs `prep_exec` and converts the first row of its result, if any, with `from_row`.
///
/// Returns `Ok(None)` for an empty result set and propagates an error attached to the first
/// row.
pub fn first_exec<Q, P, T>(&mut self, query: Q, params: P) -> MyResult<Option<T>>
where
    Q: AsRef<str>,
    P: Into<Params>,
    T: FromRow,
{
    let mut result = self.prep_exec(query, params)?;
    let first_row = result.next();
    match first_row {
        Some(row) => row.map(|row| Some(from_row(row))),
        None => Ok(None),
    }
}
/// Starts a transaction on the underlying connection; delegates to `Conn::start_transaction`.
///
/// The returned transaction borrows this `PooledConn`.
pub fn start_transaction(
    &mut self,
    consistent_snapshot: bool,
    isolation_level: Option<IsolationLevel>,
    readonly: Option<bool>,
) -> MyResult<Transaction> {
    let conn = self.as_mut();
    conn.start_transaction(consistent_snapshot, isolation_level, readonly)
}
/// Mutably borrows the underlying connection.
///
/// # Panics
///
/// Panics if the connection has already been taken out of this wrapper.
pub fn as_mut(&mut self) -> &mut Conn {
self.conn.as_mut().unwrap()
}
/// Borrows the underlying connection.
///
/// # Panics
///
/// Panics if the connection has already been taken out of this wrapper.
pub fn as_ref(&self) -> &Conn {
self.conn.as_ref().unwrap()
}
/// Takes the connection out of the wrapper, detaching it from the pool.
///
/// The wrapper is dropped with `conn` set to `None`, so its `Drop` impl decrements the
/// pool's connection counter instead of returning a connection.
///
/// # Panics
///
/// Panics if the connection has already been taken out of this wrapper.
pub fn unwrap(mut self) -> Conn {
self.conn.take().unwrap()
}
/// Prepares `query` and moves this pooled connection into the resulting statement.
///
/// Named parameters are extracted with `parse_named_params` before the rewritten query is
/// prepared.
fn pooled_prepare<'a, T: AsRef<str>>(mut self, query: T) -> MyResult<Stmt<'a>> {
    let (named_params, real_query) = parse_named_params(query.as_ref())?;
    let inner_stmt = self.as_mut()._prepare(real_query.borrow(), named_params)?;
    Ok(Stmt::new_pooled(inner_stmt, self))
}
/// Prepares `query`, moves this pooled connection into the statement and executes it with
/// `params`.
fn pooled_prep_exec<'a, A, T>(mut self, query: A, params: T) -> MyResult<QueryResult<'a>>
where
    A: AsRef<str>,
    T: Into<Params>,
{
    let (named_params, real_query) = parse_named_params(query.as_ref())?;
    let inner_stmt = self.as_mut()._prepare(real_query.borrow(), named_params)?;
    Stmt::new_pooled(inner_stmt, self).prep_exec(params)
}
/// Starts a transaction on the underlying connection and moves this pooled connection into
/// the resulting `Transaction`.
fn pooled_start_transaction<'a>(
    mut self,
    consistent_snapshot: bool,
    isolation_level: Option<IsolationLevel>,
    readonly: Option<bool>,
) -> MyResult<Transaction<'a>> {
    let conn = self.as_mut();
    conn._start_transaction(consistent_snapshot, isolation_level, readonly)?;
    Ok(Transaction::new_pooled(self))
}
/// Installs (or, with `None`, clears) the local-infile handler on the underlying connection.
///
/// The handler is cleared again when the connection is returned to the pool on drop.
pub fn set_local_infile_handler(&mut self, handler: Option<LocalInfileHandler>) {
    self.as_mut().set_local_infile_handler(handler);
}
}
impl GenericConnection for PooledConn {
fn query<T: AsRef<str>>(&mut self, query: T) -> MyResult<QueryResult<'_>> {
self.query(query)
}
fn first<T: AsRef<str>, U: FromRow>(&mut self, query: T) -> MyResult<Option<U>> {
self.first(query)
}
fn prepare<T: AsRef<str>>(&mut self, query: T) -> MyResult<Stmt<'_>> {
self.prepare(query)
}
fn prep_exec<A, T>(&mut self, query: A, params: T) -> MyResult<QueryResult<'_>>
where
A: AsRef<str>,
T: Into<Params>,
{
self.prep_exec(query, params)
}
fn first_exec<Q, P, T>(&mut self, query: Q, params: P) -> MyResult<Option<T>>
where
Q: AsRef<str>,
P: Into<Params>,
T: FromRow,
{
self.first_exec(query, params)
}
}
#[cfg(test)]
#[allow(non_snake_case)]
mod test {
mod pool {
use std::{thread, time::Duration};
use crate::test_misc::get_opts;
use crate::{from_row, from_value, params, DriverError, Error, OptsBuilder, Pool};
// Two pools with different options coexist: the first pool creates database `A` with a
// one-row table, the second pool is built with `A` as its database name and must see that
// row through an unqualified table name.
#[test]
fn multiple_pools_should_work() {
let pool = Pool::new(get_opts()).unwrap();
pool.prep_exec("DROP DATABASE IF EXISTS A", ()).unwrap();
pool.prep_exec("CREATE DATABASE A", ()).unwrap();
pool.prep_exec("DROP TABLE IF EXISTS A.a", ()).unwrap();
pool.prep_exec("CREATE TABLE IF NOT EXISTS A.a (id INT)", ())
.unwrap();
pool.prep_exec("INSERT INTO A.a VALUES (1)", ()).unwrap();
// Second pool: same options, but with the database name set to `A`.
let mut builder = OptsBuilder::from_opts(get_opts());
builder.db_name(Some("A"));
let pool2 = Pool::new(builder).unwrap();
let row = pool2
.first_exec("SELECT COUNT(*) FROM a", ())
.unwrap()
.unwrap();
assert_eq!((1u8,), from_row(row));
// Clean up the database created above.
pool.prep_exec("DROP DATABASE A", ()).unwrap();
}
// Helper used by `should_start_transaction_on_Pool`: a struct that owns a pool next to an
// unrelated field.
struct A {
pool: Pool,
x: u32,
}
impl A {
// Mutates `x` through `&mut self`; the test calls this while a transaction started from
// `self.pool` is still alive.
fn add(&mut self) {
self.x += 1;
}
}
// `Pool::prepare` must succeed even though a pooled connection was killed on the server.
#[test]
fn should_fix_connectivity_errors_on_prepare() {
// Pool of exactly two connections; `conn` keeps one of them checked out.
let pool = Pool::new_manual(2, 2, get_opts()).unwrap();
let mut conn = pool.get_conn().unwrap();
// Ask the pool (i.e. a connection other than `conn`) for its server-side connection id.
let row = pool
.first_exec("SELECT CONNECTION_ID();", ())
.unwrap()
.unwrap();
let (id,): (u32,) = from_row(row);
// Kill that connection from the one we hold, then give the server time to act on it.
conn.prep_exec("KILL CONNECTION ?", (id,)).unwrap();
thread::sleep(Duration::from_millis(250));
pool.prepare("SHOW FULL PROCESSLIST").unwrap();
}
// `Pool::prep_exec` must succeed even though a pooled connection was killed on the server.
#[test]
fn should_fix_connectivity_errors_on_prep_exec() {
// Pool of exactly two connections; `conn` keeps one of them checked out.
let pool = Pool::new_manual(2, 2, get_opts()).unwrap();
let mut conn = pool.get_conn().unwrap();
// Ask the pool (i.e. a connection other than `conn`) for its server-side connection id.
let row = pool
.first_exec("SELECT CONNECTION_ID();", ())
.unwrap()
.unwrap();
let (id,): (u32,) = from_row(row);
// Kill that connection from the one we hold, then give the server time to act on it.
conn.prep_exec("KILL CONNECTION ?", (id,)).unwrap();
thread::sleep(Duration::from_millis(250));
pool.prep_exec("SHOW FULL PROCESSLIST", ()).unwrap();
}
// `Pool::start_transaction` must succeed even though a pooled connection was killed on the
// server.
#[test]
fn should_fix_connectivity_errors_on_start_transaction() {
// Pool of exactly two connections; `conn` keeps one of them checked out.
let pool = Pool::new_manual(2, 2, get_opts()).unwrap();
let mut conn = pool.get_conn().unwrap();
// Ask the pool (i.e. a connection other than `conn`) for its server-side connection id.
let row = pool
.first_exec("SELECT CONNECTION_ID();", ())
.unwrap()
.unwrap();
let (id,): (u32,) = from_row(row);
// Kill that connection from the one we hold, then give the server time to act on it.
conn.prep_exec("KILL CONNECTION ?", (id,)).unwrap();
thread::sleep(Duration::from_millis(250));
pool.start_transaction(false, None, None).unwrap();
}
// Ten threads each check out a connection from a shared pool and run a text query on it.
#[test]
fn should_execute_queryes_on_PooledConn() {
    let pool = Pool::new(get_opts()).unwrap();
    // All workers are spawned before the first one is joined.
    let workers: Vec<_> = (0usize..10)
        .map(|_| {
            let pool = pool.clone();
            thread::spawn(move || {
                let conn = pool.get_conn();
                assert!(conn.is_ok());
                let mut conn = conn.unwrap();
                assert!(conn.query("SELECT 1").is_ok());
            })
        })
        .collect();
    for worker in workers {
        assert!(worker.join().is_ok());
    }
}
// With a single-slot pool, a second checkout must time out while the first connection is
// held, and succeed again once it has been returned.
#[test]
fn should_timeout_if_no_connections_available() {
    let pool = Pool::new_manual(0, 1, get_opts()).unwrap();
    let conn1 = pool.try_get_conn(357).unwrap();
    match pool.try_get_conn(357) {
        Err(Error::DriverError(DriverError::Timeout)) => {}
        // Report what actually came back instead of a bare `assert!(false)`.
        other => panic!("expected DriverError::Timeout, got {:?}", other),
    }
    drop(conn1);
    assert!(pool.try_get_conn(357).is_ok());
}
// Ten threads prepare and execute a statement on their own pooled connection; then, on a
// second pool, ten threads run a parameterised `prep_exec`.
#[test]
fn should_execute_statements_on_PooledConn() {
    let prepare_pool = Pool::new(get_opts()).unwrap();
    let prepare_workers: Vec<_> = (0usize..10)
        .map(|_| {
            let pool = prepare_pool.clone();
            thread::spawn(move || {
                let mut conn = pool.get_conn().unwrap();
                let mut stmt = conn.prepare("SELECT 1").unwrap();
                assert!(stmt.execute(()).is_ok());
            })
        })
        .collect();
    for worker in prepare_workers {
        assert!(worker.join().is_ok());
    }

    let exec_pool = Pool::new(get_opts()).unwrap();
    let exec_workers: Vec<_> = (0usize..10)
        .map(|_| {
            let pool = exec_pool.clone();
            thread::spawn(move || {
                let mut conn = pool.get_conn().unwrap();
                conn.prep_exec("SELECT ?", (1,)).unwrap();
            })
        })
        .collect();
    for worker in exec_workers {
        assert!(worker.join().is_ok());
    }
}
// Exercises `Pool::prepare` and `Pool::prep_exec` from ten threads each, then checks that
// two results obtained from the same pool can be open at the same time.
#[test]
fn should_execute_statements_on_Pool() {
    let prepare_pool = Pool::new(get_opts()).unwrap();
    let prepare_workers: Vec<_> = (0usize..10)
        .map(|_| {
            let pool = prepare_pool.clone();
            thread::spawn(move || {
                let mut stmt = pool.prepare("SELECT 1").unwrap();
                assert!(stmt.execute(()).is_ok());
            })
        })
        .collect();
    for worker in prepare_workers {
        assert!(worker.join().is_ok());
    }

    let exec_pool = Pool::new(get_opts()).unwrap();
    let exec_workers: Vec<_> = (0usize..10)
        .map(|_| {
            let pool = exec_pool.clone();
            thread::spawn(move || {
                pool.prep_exec("SELECT ?", (1,)).unwrap();
            })
        })
        .collect();
    for worker in exec_workers {
        assert!(worker.join().is_ok());
    }

    // Both results are alive simultaneously, so each holds its own connection.
    let mut res1 = exec_pool.prep_exec("SELECT 1", ()).unwrap();
    let mut res2 = exec_pool.prep_exec("SELECT 2", ()).unwrap();
    let (x1,) = from_row::<(u8,)>(res1.next().unwrap().unwrap());
    let (x2,) = from_row::<(u8,)>(res2.next().unwrap().unwrap());
    assert_eq!(1, x1);
    assert_eq!(2, x2);
}
// Commit, rollback and implicit drop of transactions started through `Pool`: only the
// committed inserts may be visible, so the row count is expected to be 2 after every step.
// NOTE(review): the table is TEMPORARY while the pool may open up to 10 connections; the
// checks are wrapped in `.ok().map(..)`, so they are skipped when a `prepare` fails.
#[test]
#[allow(unused_variables)]
fn should_start_transaction_on_Pool() {
let pool = Pool::new_manual(1, 10, get_opts()).unwrap();
pool.prepare("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
.ok()
.map(|mut stmt| {
assert!(stmt.execute(()).is_ok());
});
// 1) Committed transaction: two rows are inserted and kept.
pool.start_transaction(false, None, None)
.and_then(|mut t| {
t.query("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
t.query("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
t.commit()
})
.unwrap();
pool.prepare("SELECT COUNT(a) FROM mysql.tbl")
.ok()
.map(|mut stmt| {
for x in stmt.execute(()).unwrap() {
let mut x = x.unwrap();
assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
}
});
// 2) Explicit rollback: the count must still be 2.
pool.start_transaction(false, None, None)
.and_then(|mut t| {
t.query("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
t.query("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
t.rollback()
})
.unwrap();
pool.prepare("SELECT COUNT(a) FROM mysql.tbl")
.ok()
.map(|mut stmt| {
for x in stmt.execute(()).unwrap() {
let mut x = x.unwrap();
assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
}
});
// 3) Transaction dropped without commit or rollback: the count must still be 2.
pool.start_transaction(false, None, None)
.and_then(|mut t| {
t.query("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
t.query("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
Ok(())
})
.unwrap();
pool.prepare("SELECT COUNT(a) FROM mysql.tbl")
.ok()
.map(|mut stmt| {
for x in stmt.execute(()).unwrap() {
let mut x = x.unwrap();
assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
}
});
// Compile-time check: a transaction obtained from `a.pool` does not keep `a` borrowed, so
// `a.add()` (which needs `&mut a`) is still allowed while `transaction` is alive.
let mut a = A { pool, x: 0 };
let transaction = a.pool.start_transaction(false, None, None).unwrap();
a.add();
}
// Commit, rollback and implicit drop of transactions started on a single `PooledConn`: only
// the committed inserts may be visible, so the row count is expected to be 2 after every
// step.
#[test]
fn should_start_transaction_on_PooledConn() {
let pool = Pool::new(get_opts()).unwrap();
let mut conn = pool.get_conn().unwrap();
assert!(conn
.query("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
.is_ok());
// 1) Committed transaction: two rows are inserted and kept.
assert!(conn
.start_transaction(false, None, None)
.and_then(|mut t| {
assert!(t.query("INSERT INTO mysql.tbl(a) VALUES(1)").is_ok());
assert!(t.query("INSERT INTO mysql.tbl(a) VALUES(2)").is_ok());
t.commit()
})
.is_ok());
for x in conn.query("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
let mut x = x.unwrap();
assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
}
// 2) Explicit rollback: the count must still be 2.
assert!(conn
.start_transaction(false, None, None)
.and_then(|mut t| {
assert!(t.query("INSERT INTO mysql.tbl(a) VALUES(1)").is_ok());
assert!(t.query("INSERT INTO mysql.tbl(a) VALUES(2)").is_ok());
t.rollback()
})
.is_ok());
for x in conn.query("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
let mut x = x.unwrap();
assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
}
// 3) Transaction dropped without commit or rollback: the count must still be 2.
assert!(conn
.start_transaction(false, None, None)
.and_then(|mut t| {
assert!(t.query("INSERT INTO mysql.tbl(a) VALUES(1)").is_ok());
assert!(t.query("INSERT INTO mysql.tbl(a) VALUES(2)").is_ok());
Ok(())
})
.is_ok());
for x in conn.query("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
let mut x = x.unwrap();
assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
}
}
// Named parameters (`:a`, `:b`, `:abc`) must work through both `Pool::prepare` and
// `Pool::prep_exec`, including a parameter that is used twice in the query.
#[test]
fn should_work_with_named_params() {
    let pool = Pool::new(get_opts()).unwrap();
    {
        // Scoped so the statement (and the connection it holds) is released before the
        // second query runs.
        let mut stmt = pool.prepare("SELECT :a, :b, :a + :b, :abc").unwrap();
        let mut result = stmt
            .execute(params! {
                "a" => 1,
                "b" => 2,
                "abc" => 4,
            })
            .unwrap();
        let row = result.next().unwrap().unwrap();
        assert_eq!((1, 2, 3, 4), from_row(row));
    }
    let named = params! {"a" => 1, "b" => 2, "abc" => 4};
    let mut result = pool
        .prep_exec("SELECT :a, :b, :a+:b, :abc", named)
        .unwrap();
    let row = result.next().unwrap().unwrap();
    assert_eq!((1, 2, 3, 4), from_row(row));
}
// Benchmarks for the pool; compiled only when the `nightly` feature is enabled, since they
// use the `test` crate's `Bencher`.
#[cfg(feature = "nightly")]
mod bench {
use test;
use std::thread;
use crate::test_misc::get_opts;
use crate::Pool;
// Repeated `Pool::prepare` of the same query from a single thread.
#[bench]
fn many_prepares(bencher: &mut test::Bencher) {
let pool = Pool::new(get_opts()).unwrap();
bencher.iter(|| {
pool.prepare("SELECT 1").unwrap();
});
}
// Repeated `Pool::prep_exec` of the same query from a single thread.
#[bench]
fn many_prepexecs(bencher: &mut test::Bencher) {
let pool = Pool::new(get_opts()).unwrap();
bencher.iter(|| {
pool.prep_exec("SELECT 1", ()).unwrap();
});
}
// Four threads, 250 `prep_exec` calls each per iteration, with the pool's default settings.
#[bench]
fn many_prepares_threaded(bencher: &mut test::Bencher) {
let pool = Pool::new(get_opts()).unwrap();
bencher.iter(|| {
let mut threads = Vec::new();
for _ in 0..4 {
let pool = pool.clone();
threads.push(thread::spawn(move || {
for _ in 0..250 {
test::black_box(
pool.prep_exec(
"SELECT 1, 'hello world', 123.321, ?, ?, ?",
("hello", "world", 65536),
)
.unwrap(),
);
}
}));
}
for t in threads {
t.join().unwrap();
}
});
}
// Same workload as `many_prepares_threaded`, but with `use_cache(false)` set on the pool
// before it is cloned into the worker threads.
#[bench]
fn many_prepares_threaded_no_cache(bencher: &mut test::Bencher) {
let mut pool = Pool::new(get_opts()).unwrap();
pool.use_cache(false);
bencher.iter(|| {
let mut threads = Vec::new();
for _ in 0..4 {
let pool = pool.clone();
threads.push(thread::spawn(move || {
for _ in 0..250 {
test::black_box(
pool.prep_exec(
"SELECT 1, 'hello world', 123.321, ?, ?, ?",
("hello", "world", 65536),
)
.unwrap(),
);
}
}));
}
for t in threads {
t.join().unwrap();
}
});
}
}
}
}