use std::sync::{Arc, Mutex};
use super::IsolationLevel;
use super::Transaction;
use super::super::error::{MyError, DriverError};
use super::super::value::ToRow;
use super::{MyConn, MyOpts, Stmt, QueryResult};
use super::super::error::{MyResult};
/// Mutable pool state that `MyPool` keeps behind its mutex.
#[derive(Debug)]
struct MyInnerPool {
/// Connection options; cloned for every connection opened by `new_conn`.
opts: MyOpts,
/// Idle connections that are currently available for check-out.
pool: Vec<MyConn>,
/// Threshold used on release: a connection is handed back to the idle
/// list only while `count` does not exceed this value.
min: usize,
/// Threshold used on check-out: a new connection is opened on demand
/// only while `count` is below this value.
max: usize,
/// Bookkeeping counter of open connections; it is compared against `max`
/// before a connection is opened on demand and against `min` when a
/// pooled connection is released.
count: usize
}
impl MyInnerPool {
    /// Builds the pool state and eagerly opens `min` connections.
    ///
    /// # Errors
    ///
    /// Returns `DriverError::InvalidPoolConstraints` when `min > max` or
    /// `max == 0`, and propagates the first error raised while opening one
    /// of the initial connections.
    fn new(min: usize, max: usize, opts: MyOpts) -> MyResult<MyInnerPool> {
        if min > max || max == 0 {
            return Err(MyError::MyDriverError(DriverError::InvalidPoolConstraints));
        }
        let mut pool = MyInnerPool {
            opts: opts,
            pool: Vec::with_capacity(max),
            max: max,
            min: min,
            count: 0,
        };
        for _ in 0..min {
            try!(pool.new_conn());
            // The initial connections have to be counted too: `count` is
            // what gets compared against `max` on check-out and against
            // `min` on release. Leaving it at zero lets the pool grow past
            // `max` and makes the release path decrement below zero.
            pool.count += 1;
        }
        Ok(pool)
    }

    /// Opens one connection with the stored options and appends it to the
    /// idle list.
    ///
    /// `count` is intentionally not touched here; the caller is responsible
    /// for accounting for the new connection.
    fn new_conn(&mut self) -> MyResult<()> {
        let conn = try!(MyConn::new(self.opts.clone()));
        self.pool.push(conn);
        Ok(())
    }
}
/// Handle to a shared connection pool.
///
/// Cloning the handle clones the `Arc`, so every clone refers to the same
/// underlying pool state.
#[derive(Clone, Debug)]
pub struct MyPool {
/// Pool state shared between all clones of this handle.
pool: Arc<Mutex<MyInnerPool>>
}
impl MyPool {
pub fn new(opts: MyOpts) -> MyResult<MyPool> {
MyPool::new_manual(10, 100, opts)
}
pub fn new_manual(min: usize, max: usize, opts: MyOpts) -> MyResult<MyPool> {
let pool = try!(MyInnerPool::new(min, max, opts));
Ok(MyPool{ pool: Arc::new(Mutex::new(pool)) })
}
pub fn get_conn(&self) -> MyResult<MyPooledConn> {
let mut pool = match self.pool.lock() {
Ok(mutex) => mutex,
_ => return Err(MyError::MyDriverError(
DriverError::PoisonedPoolMutex)),
};
while pool.pool.is_empty() {
if pool.count < pool.max {
match pool.new_conn() {
Ok(()) => {
pool.count += 1;
break;
},
Err(err) => return Err(err)
}
}
}
let mut conn = pool.pool.pop().unwrap();
if !conn.ping() {
try!(conn.reset());
}
Ok(MyPooledConn {pool: self.clone(), conn: Some(conn)})
}
fn get_conn_by_stmt<T: AsRef<str>>(&self, query: T) -> MyResult<MyPooledConn> {
let conn = {
let mut pool = match self.pool.lock() {
Ok(mutex) => mutex,
_ => return Err(MyError::MyDriverError(
DriverError::PoisonedPoolMutex)),
};
let mut id = None;
for (i, conn) in pool.pool.iter().enumerate() {
if conn.has_stmt(query.as_ref()) {
id = Some(i);
break;
}
}
if let Some(id) = id {
let mut conn = pool.pool.remove(id);
if !conn.ping() {
try!(conn.reset());
}
Some(MyPooledConn {pool: self.clone(), conn: Some(conn)})
} else {
None
}
};
match conn {
Some(pooled_conn) => Ok(pooled_conn),
None => self.get_conn(),
}
}
pub fn prepare<'a, T: AsRef<str> + 'a>(&'a self, query: T) -> MyResult<Stmt<'a>> {
let conn = try!(self.get_conn_by_stmt(query.as_ref()));
conn.pooled_prepare(query)
}
pub fn prep_exec<'a, A: AsRef<str>, T: ToRow>(&'a self, query: A, params: T) -> MyResult<QueryResult<'a>> {
let conn = try!(self.get_conn_by_stmt(query.as_ref()));
conn.pooled_prep_exec(query, params)
}
pub fn start_transaction(&self,
consistent_snapshot: bool,
isolation_level: Option<IsolationLevel>,
readonly: Option<bool>) -> MyResult<Transaction> {
(try!(self.get_conn())).pooled_start_transaction(consistent_snapshot, isolation_level, readonly)
}
}
/// A connection checked out of a `MyPool`.
///
/// On drop the connection is either pushed back to the pool's idle list or
/// released (see the `Drop` impl).
#[derive(Debug)]
pub struct MyPooledConn {
/// Handle of the pool this connection was taken from.
pool: MyPool,
/// The wrapped connection; `None` once it has been taken out by `unwrap`
/// or by `drop`.
conn: Option<MyConn>
}
impl Drop for MyPooledConn {
    /// Gives the connection back to the pool or releases its slot.
    ///
    /// The connection is pushed to the idle list unless `count > min` or it
    /// was already taken out of the wrapper. In those cases only the
    /// counter is decremented and the connection, if any, is dropped along
    /// with `self`.
    fn drop(&mut self) {
        // A destructor must not panic: if the pool mutex is poisoned the
        // pool is unusable anyway, so just let the connection drop.
        let mut pool = match self.pool.pool.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };
        if pool.count > pool.min || self.conn.is_none() {
            // Guard against underflow; a bare `-= 1` on zero would panic in
            // debug builds and wrap around in release builds.
            if pool.count > 0 {
                pool.count -= 1;
            }
        } else if let Some(conn) = self.conn.take() {
            pool.pool.push(conn);
        }
    }
}
impl MyPooledConn {
    /// Runs `query` on the wrapped connection.
    pub fn query<'a, T: AsRef<str> + 'a>(&'a mut self, query: T) -> MyResult<QueryResult<'a>> {
        self.as_mut().query(query)
    }

    /// Prepares `query` on the wrapped connection.
    pub fn prepare<'a, T: AsRef<str> + 'a>(&'a mut self, query: T) -> MyResult<Stmt<'a>> {
        self.as_mut().prepare(query)
    }

    /// Prepares `query` on the wrapped connection and executes it with `params`.
    pub fn prep_exec<'a, A: AsRef<str> + 'a, T: ToRow>(&'a mut self, query: A, params: T) -> MyResult<QueryResult<'a>> {
        self.as_mut().prep_exec(query, params)
    }

    /// Starts a transaction on the wrapped connection, forwarding all
    /// arguments unchanged.
    pub fn start_transaction<'a>(&'a mut self,
                                 consistent_snapshot: bool,
                                 isolation_level: Option<IsolationLevel>,
                                 readonly: Option<bool>) -> MyResult<Transaction<'a>> {
        let conn = self.as_mut();
        conn.start_transaction(consistent_snapshot, isolation_level, readonly)
    }

    /// Mutable access to the wrapped connection.
    ///
    /// Panics if the connection has already been taken out of the wrapper.
    pub fn as_mut<'a>(&'a mut self) -> &'a mut MyConn {
        self.conn.as_mut().unwrap()
    }

    /// Shared access to the wrapped connection.
    ///
    /// Panics if the connection has already been taken out of the wrapper.
    pub fn as_ref<'a>(&'a self) -> &'a MyConn {
        self.conn.as_ref().unwrap()
    }

    /// Consumes the wrapper and returns the bare connection.
    ///
    /// The connection is taken out before the wrapper is dropped, so it is
    /// not pushed back to the pool.
    pub fn unwrap(mut self) -> MyConn {
        self.conn.take().unwrap()
    }

    /// Prepares `query` and moves this pooled connection into the
    /// resulting statement.
    fn pooled_prepare<'a, T: AsRef<str>>(mut self, query: T) -> MyResult<Stmt<'a>> {
        let inner = try!(self.as_mut()._prepare(query.as_ref()));
        Ok(Stmt::new_pooled(inner, self))
    }

    /// Prepares `query`, moves this pooled connection into the statement
    /// and executes it with `params`.
    fn pooled_prep_exec<'a, A: AsRef<str>, T: ToRow>(mut self, query: A, params: T) -> MyResult<QueryResult<'a>> {
        let inner = try!(self.as_mut()._prepare(query.as_ref()));
        let stmt = Stmt::new_pooled(inner, self);
        stmt.prep_exec(params)
    }

    /// Starts a transaction and moves this pooled connection into it.
    fn pooled_start_transaction<'a>(mut self,
                                    consistent_snapshot: bool,
                                    isolation_level: Option<IsolationLevel>,
                                    readonly: Option<bool>) -> MyResult<Transaction<'a>> {
        // Only success matters here; the returned value itself is not used.
        let _ = try!(self.as_mut()._start_transaction(consistent_snapshot,
                                                      isolation_level,
                                                      readonly));
        Ok(Transaction::new_pooled(self))
    }
}
#[cfg(test)]
#[allow(non_snake_case)]
mod test {
use conn::MyOpts;
use std::default::Default;
// Connection settings that `get_opts` uses for the test server.
// User name passed as `MyOpts::user`.
pub static USER: &'static str = "root";
// Fallback password, used when `MYSQL_SERVER_PASS` is not set.
pub static PASS: &'static str = "password";
// Address passed as `MyOpts::tcp_addr`.
pub static ADDR: &'static str = "127.0.0.1";
// Fallback port, used when `MYSQL_SERVER_PORT` is not set or does not parse.
pub static PORT: u16 = 3307;
/// Builds the connection options for the test server (variant compiled
/// when the `openssl` feature is enabled; it also sets `ssl_opts`).
///
/// The password comes from `MYSQL_SERVER_PASS` and the port from
/// `MYSQL_SERVER_PORT`, falling back to `PASS` and `PORT` respectively.
#[cfg(feature = "openssl")]
pub fn get_opts() -> MyOpts {
    let pwd: String = match ::std::env::var("MYSQL_SERVER_PASS") {
        Ok(value) => value,
        Err(_) => PASS.to_string(),
    };
    // An unset or unparsable port variable both fall back to `PORT`.
    let port: u16 = match ::std::env::var("MYSQL_SERVER_PORT") {
        Ok(raw) => raw.parse().unwrap_or(PORT),
        Err(_) => PORT,
    };
    MyOpts {
        user: Some(USER.to_string()),
        pass: Some(pwd),
        tcp_addr: Some(ADDR.to_string()),
        tcp_port: port,
        ssl_opts: Some((::std::convert::From::from("tests/ca-cert.pem"), None)),
        ..Default::default()
    }
}
/// Builds the connection options for the test server (variant compiled
/// when the `openssl` feature is disabled; no `ssl_opts` are set).
///
/// The password comes from `MYSQL_SERVER_PASS` and the port from
/// `MYSQL_SERVER_PORT`, falling back to `PASS` and `PORT` respectively.
// The predicate must be the exact negation of the one on the SSL variant
// (`feature = "openssl"`); otherwise some feature combinations produce two
// definitions of `get_opts` and others produce none.
#[cfg(not(feature = "openssl"))]
pub fn get_opts() -> MyOpts {
    let pwd: String = ::std::env::var("MYSQL_SERVER_PASS").unwrap_or(PASS.to_string());
    // An unset or unparsable port variable both fall back to `PORT`.
    let port: u16 = ::std::env::var("MYSQL_SERVER_PORT").ok()
                        .map(|my_port| my_port.parse().ok().unwrap_or(PORT))
                        .unwrap_or(PORT);
    MyOpts {
        user: Some(USER.to_string()),
        pass: Some(pwd),
        tcp_addr: Some(ADDR.to_string()),
        tcp_port: port,
        ..Default::default()
    }
}
mod pool {
use super::get_opts;
use std::thread;
use super::super::MyPool;
use super::super::super::super::value::from_value;
#[test]
fn should_execute_queryes_on_MyPooledConn() {
    // Ten threads each check out a connection and run a trivial query.
    let pool = MyPool::new(get_opts()).unwrap();
    let workers: Vec<_> = (0usize..10).map(|_| {
        let pool = pool.clone();
        thread::spawn(move || {
            let conn = pool.get_conn();
            assert!(conn.is_ok());
            let mut conn = conn.unwrap();
            assert!(conn.query("SELECT 1").is_ok());
        })
    }).collect();
    // A panicking worker surfaces as a failed join.
    for worker in workers {
        assert!(worker.join().is_ok());
    }
}
#[test]
fn should_execute_statements_on_MyPooledConn() {
    // Phase 1: prepare and execute a statement on a checked-out connection.
    let pool = MyPool::new(get_opts()).unwrap();
    let workers: Vec<_> = (0usize..10).map(|_| {
        let pool = pool.clone();
        thread::spawn(move || {
            let mut conn = pool.get_conn().unwrap();
            let mut stmt = conn.prepare("SELECT 1").unwrap();
            assert!(stmt.execute(()).is_ok());
        })
    }).collect();
    for worker in workers {
        assert!(worker.join().is_ok());
    }

    // Phase 2: same idea through `prep_exec`, on a fresh pool.
    let pool = MyPool::new(get_opts()).unwrap();
    let workers: Vec<_> = (0usize..10).map(|_| {
        let pool = pool.clone();
        thread::spawn(move || {
            let mut conn = pool.get_conn().unwrap();
            conn.prep_exec("SELECT ?", (1,)).unwrap();
        })
    }).collect();
    for worker in workers {
        assert!(worker.join().is_ok());
    }
}
#[test]
fn should_execute_statements_on_MyPool() {
    // Phase 1: prepare directly on the pool, then execute.
    let pool = MyPool::new(get_opts()).unwrap();
    let workers: Vec<_> = (0usize..10).map(|_| {
        let pool = pool.clone();
        thread::spawn(move || {
            let mut stmt = pool.prepare("SELECT 1").unwrap();
            assert!(stmt.execute(()).is_ok());
        })
    }).collect();
    for worker in workers {
        assert!(worker.join().is_ok());
    }

    // Phase 2: `prep_exec` directly on a fresh pool.
    let pool = MyPool::new(get_opts()).unwrap();
    let workers: Vec<_> = (0usize..10).map(|_| {
        let pool = pool.clone();
        thread::spawn(move || {
            pool.prep_exec("SELECT ?", (1,)).unwrap();
        })
    }).collect();
    for worker in workers {
        assert!(worker.join().is_ok());
    }
}
#[test]
fn should_start_transaction_on_MyPool() {
    let pool = MyPool::new(get_opts()).unwrap();

    // Checks that the table holds exactly two rows. As in the setup step
    // below, nothing is asserted when the statement cannot be prepared.
    let expect_two_rows = || {
        if let Ok(mut stmt) = pool.prepare("SELECT COUNT(a) FROM x.tbl") {
            for row in stmt.execute(()).unwrap() {
                let mut row = row.unwrap();
                assert_eq!(from_value::<u8>(row.pop().unwrap()), 2u8);
            }
        }
    };

    if let Ok(mut stmt) = pool.prepare("CREATE TEMPORARY TABLE x.tbl(a INT)") {
        assert!(stmt.execute(()).is_ok());
    }

    // Committed transaction: both rows must be visible afterwards.
    let committed = pool.start_transaction(false, None, None).and_then(|mut t| {
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(1)").is_ok());
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(2)").is_ok());
        t.commit()
    });
    assert!(committed.is_ok());
    expect_two_rows();

    // Explicit rollback: the row count must stay at two.
    let rolled_back = pool.start_transaction(false, None, None).and_then(|mut t| {
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(1)").is_ok());
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(2)").is_ok());
        t.rollback()
    });
    assert!(rolled_back.is_ok());
    expect_two_rows();

    // Transaction dropped without commit: the row count must stay at two.
    let dropped = pool.start_transaction(false, None, None).and_then(|mut t| {
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(1)").is_ok());
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(2)").is_ok());
        Ok(())
    });
    assert!(dropped.is_ok());
    expect_two_rows();
}
#[test]
fn should_start_transaction_on_MyPooledConn() {
    let pool = MyPool::new(get_opts()).unwrap();
    let mut conn = pool.get_conn().unwrap();
    assert!(conn.query("CREATE TEMPORARY TABLE x.tbl(a INT)").is_ok());

    // Committed transaction: both rows must be visible afterwards.
    let committed = conn.start_transaction(false, None, None).and_then(|mut t| {
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(1)").is_ok());
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(2)").is_ok());
        t.commit()
    });
    assert!(committed.is_ok());
    for row in conn.query("SELECT COUNT(a) FROM x.tbl").unwrap() {
        let mut row = row.unwrap();
        assert_eq!(from_value::<u8>(row.pop().unwrap()), 2u8);
    }

    // Explicit rollback: the row count must stay at two.
    let rolled_back = conn.start_transaction(false, None, None).and_then(|mut t| {
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(1)").is_ok());
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(2)").is_ok());
        t.rollback()
    });
    assert!(rolled_back.is_ok());
    for row in conn.query("SELECT COUNT(a) FROM x.tbl").unwrap() {
        let mut row = row.unwrap();
        assert_eq!(from_value::<u8>(row.pop().unwrap()), 2u8);
    }

    // Transaction dropped without commit: the row count must stay at two.
    let dropped = conn.start_transaction(false, None, None).and_then(|mut t| {
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(1)").is_ok());
        assert!(t.query("INSERT INTO x.tbl(a) VALUES(2)").is_ok());
        Ok(())
    });
    assert!(dropped.is_ok());
    for row in conn.query("SELECT COUNT(a) FROM x.tbl").unwrap() {
        let mut row = row.unwrap();
        assert_eq!(from_value::<u8>(row.pop().unwrap()), 2u8);
    }
}
}
}