use std::sync::Arc;
use std::thread;
use zero_mysql::Opts;
use zero_mysql::sync::Pool;
include!("common/check.rs");
include!("common/check_eq.rs");
const TEST_URL: &str = "mysql://test:1234@localhost:3306/test";
#[test]
fn pool_basic() -> Result<(), Box<dyn std::error::Error>> {
let opts = Opts::try_from(TEST_URL)?;
let pool = Arc::new(Pool::new(opts));
let mut conn = pool.get()?;
conn.query_drop("SELECT 1")?;
Ok(())
}
#[test]
fn pool_connection_reuse() -> Result<(), Box<dyn std::error::Error>> {
let mut opts = Opts::try_from(TEST_URL)?;
opts.pool_max_idle_conn = 1;
opts.pool_reset_conn = false;
let pool = Arc::new(Pool::new(opts));
let conn1 = pool.get()?;
let conn_id1 = conn1.connection_id();
drop(conn1);
let conn2 = pool.get()?;
let conn_id2 = conn2.connection_id();
check_eq!(conn_id1, conn_id2, "connection should be reused from pool");
Ok(())
}
#[test]
fn pool_max_idle_conn() -> Result<(), Box<dyn std::error::Error>> {
let mut opts = Opts::try_from(TEST_URL)?;
opts.pool_max_idle_conn = 2;
opts.pool_reset_conn = false;
let pool = Arc::new(Pool::new(opts));
let conn1 = pool.get()?;
let conn2 = pool.get()?;
let conn3 = pool.get()?;
let id1 = conn1.connection_id();
let id2 = conn2.connection_id();
let id3 = conn3.connection_id();
drop(conn1);
drop(conn2);
drop(conn3);
let conn_a = pool.get()?;
let conn_b = pool.get()?;
let id_a = conn_a.connection_id();
let id_b = conn_b.connection_id();
check!(
(id_a == id1 || id_a == id2) && (id_b == id1 || id_b == id2),
"connections should be reused from pool (got {id_a} and {id_b}, expected {id1} and {id2}), conn3 was dropped (id3={id3})"
);
Ok(())
}
#[test]
fn pool_max_concurrency() -> Result<(), Box<dyn std::error::Error>> {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
let mut opts = Opts::try_from(TEST_URL)?;
opts.pool_max_concurrency = Some(2);
opts.pool_reset_conn = false;
let pool = Arc::new(Pool::new(opts));
let active_count = Arc::new(AtomicUsize::new(0));
let max_active = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..4 {
let pool = Arc::clone(&pool);
let active_count = Arc::clone(&active_count);
let max_active = Arc::clone(&max_active);
handles.push(thread::spawn(
move || -> Result<(), zero_mysql::error::Error> {
let _conn = pool.get()?;
let current = active_count.fetch_add(1, Ordering::SeqCst) + 1;
loop {
let old_max = max_active.load(Ordering::SeqCst);
if current <= old_max {
break;
}
if max_active
.compare_exchange(old_max, current, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
break;
}
}
thread::sleep(Duration::from_millis(50));
active_count.fetch_sub(1, Ordering::SeqCst);
Ok(())
},
));
}
for handle in handles {
handle.join().expect("thread panicked")?;
}
let observed_max = max_active.load(Ordering::SeqCst);
check!(
observed_max <= 2,
"max concurrent connections should be limited to 2, but observed {observed_max}"
);
Ok(())
}
#[test]
fn pool_reset_conn() -> Result<(), Box<dyn std::error::Error>> {
let mut opts = Opts::try_from(TEST_URL)?;
opts.pool_max_idle_conn = 1;
opts.pool_reset_conn = true;
let pool = Arc::new(Pool::new(opts));
{
let mut conn = pool.get()?;
conn.query_drop("SET @test_var = 42")?;
}
{
let mut conn = pool.get()?;
struct VarHandler {
value: Option<String>,
}
impl zero_mysql::protocol::r#trait::TextResultSetHandler for VarHandler {
fn no_result_set(
&mut self,
_: zero_mysql::protocol::response::OkPayloadBytes,
) -> zero_mysql::error::Result<()> {
Ok(())
}
fn resultset_start(
&mut self,
_: &[zero_mysql::protocol::command::ColumnDefinition<'_>],
) -> zero_mysql::error::Result<()> {
Ok(())
}
fn resultset_end(
&mut self,
_: zero_mysql::protocol::response::OkPayloadBytes,
) -> zero_mysql::error::Result<()> {
Ok(())
}
fn row(
&mut self,
_: &[zero_mysql::protocol::command::ColumnDefinition<'_>],
row: zero_mysql::protocol::TextRowPayload<'_>,
) -> zero_mysql::error::Result<()> {
if row.0.first() == Some(&0xFB) {
self.value = None;
} else {
let (value, _) = zero_mysql::protocol::primitive::read_string_lenenc(row.0)?;
self.value = Some(String::from_utf8_lossy(value).into_owned());
}
Ok(())
}
}
let mut handler = VarHandler {
value: Some("not_null".to_string()),
};
conn.query("SELECT @test_var", &mut handler)?;
check!(
handler.value.is_none(),
"session variable should be NULL after connection reset, got {:?}",
handler.value
);
}
Ok(())
}
#[test]
fn pool_multi_threaded() -> Result<(), Box<dyn std::error::Error>> {
let mut opts = Opts::try_from(TEST_URL)?;
opts.pool_max_idle_conn = 5;
opts.pool_reset_conn = false;
let pool = Arc::new(Pool::new(opts));
let mut handles = vec![];
for i in 0..10 {
let pool = Arc::clone(&pool);
handles.push(thread::spawn(
move || -> Result<(), zero_mysql::error::Error> {
let mut conn = pool.get()?;
conn.query_drop(&format!("SELECT {i}"))?;
Ok(())
},
));
}
for handle in handles {
handle.join().expect("thread panicked")?;
}
Ok(())
}