use std::time::Duration;
use std::sync::Arc;
use apexbase::data::Value;
use apexbase::embedded::Row;
use deadpool_apexbase::{Config, InteractError, Pool, Runtime, Timeouts};
/// Builds a pool whose storage path is a freshly created temporary directory.
///
/// The [`tempfile::TempDir`] handle is returned next to the pool so the caller
/// can keep it alive while the pool is in use (`TempDir` removes its directory
/// when dropped).
///
/// # Panics
/// Panics if the temporary directory or the pool cannot be created.
fn create_pool() -> (tempfile::TempDir, Pool) {
    let dir = tempfile::tempdir().unwrap();
    let pool = Config::new(dir.path())
        .create_pool(Runtime::Tokio1)
        .unwrap();
    (dir, pool)
}
/// Builds a row with the four columns `name`, `age`, `score` and `city`.
///
/// Columns are inserted in exactly that order; string arguments are copied
/// into owned `Value::String`s.
fn row1(name: &str, age: i64, score: f64, city: &str) -> Row {
    let mut row = Row::new();
    let columns = [
        ("name", Value::String(name.to_owned())),
        ("age", Value::Int64(age)),
        ("score", Value::Float64(score)),
        ("city", Value::String(city.to_owned())),
    ];
    for (column, value) in columns {
        row.insert(column.to_owned(), value);
    }
    row
}
/// Inserts one row into a new table `t1` and checks the returned id and the
/// resulting row count.
#[tokio::test]
async fn test_basic_crud() {
    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let outcome = conn
        .interact(|engine| {
            let tbl = engine.create_table("t1").unwrap();
            let new_id = tbl.insert(row1("Alice", 30, 92.5, "NY")).unwrap();
            let total = tbl.count().unwrap();
            (new_id, total)
        })
        .await;
    let (id, count) = outcome.unwrap();

    assert_eq!(count, 1);
    assert_eq!(id, 1);
}
/// Inserts a row with a single `x` column, fetches it back by id and checks
/// that the stored value round-trips.
#[tokio::test]
async fn test_insert_and_retrieve() {
    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let stored_x = conn
        .interact(|engine| {
            let tbl = engine.create_table("t2").unwrap();

            let mut row = Row::new();
            row.insert(String::from("x"), Value::Int64(42));
            let id = tbl.insert(row).unwrap();

            // The first unwrap is the call result, the second the optional row.
            let fetched = tbl.retrieve(id).unwrap().unwrap();
            fetched.get("x").cloned()
        })
        .await
        .unwrap();

    assert_eq!(stored_x, Some(Value::Int64(42)));
}
/// Inserts a row, deletes it by id and checks both the delete result and that
/// the table is empty afterwards.
#[tokio::test]
async fn test_delete() {
    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let (was_deleted, remaining) = conn
        .interact(|engine| {
            let tbl = engine.create_table("t3").unwrap();

            let mut row = Row::new();
            row.insert(String::from("v"), Value::Int64(1));
            let id = tbl.insert(row).unwrap();

            let was_deleted = tbl.delete(id).unwrap();
            let remaining = tbl.count().unwrap();
            (was_deleted, remaining)
        })
        .await
        .unwrap();

    assert!(was_deleted);
    assert_eq!(remaining, 0);
}
/// Replaces an existing row and verifies the stored values through SQL.
///
/// Every column requested by the `SELECT` (`name`, `age`, `score`) is
/// asserted, so a replace that leaves any of them stale fails the test.
#[tokio::test]
async fn test_replace() {
    let (_tmp, pool) = create_pool();
    let db = pool.get().await.unwrap();

    let (replaced, rows) = db
        .interact(|apexdb| {
            let table = apexdb.create_table("rep_t").unwrap();
            let id = table.insert(row1("Alice", 30, 90.0, "NY")).unwrap();

            // Build the replacement with the shared helper so both rows have
            // the same four columns.
            let replaced = table
                .replace(id, row1("Alice-v2", 31, 99.0, "LA"))
                .unwrap();

            let rs = table
                .execute(&format!(
                    "SELECT name, age, score FROM rep_t WHERE _id = {}",
                    id
                ))
                .unwrap();
            let rows = rs.to_rows().unwrap();
            (replaced, rows)
        })
        .await
        .unwrap();

    assert!(replaced);
    assert_eq!(rows.len(), 1);

    let row = &rows[0];
    assert_eq!(
        row.get("name"),
        Some(&Value::String("Alice-v2".to_string()))
    );
    assert_eq!(row.get("age"), Some(&Value::Int64(31)));
    // `score` is selected by the query, so check it as well.
    assert_eq!(row.get("score"), Some(&Value::Float64(99.0)));
}
/// Inserts 100 single-column rows in one batch and checks that 100 ids are
/// returned and that the table reports 100 rows.
#[tokio::test]
async fn test_insert_batch() {
    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let (ids, count) = conn
        .interact(|engine| {
            let tbl = engine.create_table("batch_t").unwrap();

            let mut records: Vec<Row> = Vec::with_capacity(100);
            for i in 0..100i64 {
                let mut row = Row::new();
                row.insert(String::from("i"), Value::Int64(i));
                records.push(row);
            }

            let inserted_ids = tbl.insert_batch(&records).unwrap();
            let total = tbl.count().unwrap();
            (inserted_ids, total)
        })
        .await
        .unwrap();

    assert_eq!(ids.len(), 100);
    assert_eq!(count, 100);
}
/// Inserts five `(name, age)` rows and checks that a `WHERE age > 28` query
/// returns the three matching rows.
#[tokio::test]
async fn test_sql_query() {
    // Fixture rows; ages 30, 40 and 35 satisfy the filter below.
    const PEOPLE: [(&str, i64); 5] = [("A", 20), ("B", 30), ("C", 40), ("D", 25), ("E", 35)];

    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let num_rows = conn
        .interact(|engine| {
            let tbl = engine.create_table("filter_t").unwrap();
            for &(name, age) in &PEOPLE {
                let mut row = Row::new();
                row.insert(String::from("name"), Value::String(name.to_owned()));
                row.insert(String::from("age"), Value::Int64(age));
                tbl.insert(row).unwrap();
            }

            let result_set = tbl
                .execute("SELECT name FROM filter_t WHERE age > 28")
                .unwrap();
            result_set.num_rows()
        })
        .await
        .unwrap();

    assert_eq!(num_rows, 3);
}
/// Inserts five scores and checks that `ORDER BY score DESC LIMIT 3` yields
/// the three largest values in descending order.
#[tokio::test]
async fn test_sql_order_by_limit() {
    const SCORES: [i64; 5] = [50, 90, 30, 70, 10];

    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let scores = conn
        .interact(|engine| {
            let tbl = engine.create_table("ord_t").unwrap();
            for &score in &SCORES {
                let mut row = Row::new();
                row.insert(String::from("score"), Value::Int64(score));
                tbl.insert(row).unwrap();
            }

            let result_set = tbl
                .execute("SELECT score FROM ord_t ORDER BY score DESC LIMIT 3")
                .unwrap();

            // Keep only the `score` cell of every returned row.
            let mut top = Vec::new();
            for row in result_set.to_rows().unwrap() {
                top.push(row.get("score").cloned());
            }
            top
        })
        .await
        .unwrap();

    assert_eq!(scores.len(), 3);
    let expected = vec![
        Some(Value::Int64(90)),
        Some(Value::Int64(70)),
        Some(Value::Int64(50)),
    ];
    assert_eq!(scores, expected);
}
/// Inserts rows for three distinct cities and checks that a `GROUP BY` query
/// ordered by descending count returns three groups with `LA` first.
#[tokio::test]
async fn test_sql_group_by() {
    // "LA" appears three times, "NY" twice and "Tokyo" once.
    const CITIES: [&str; 6] = ["NY", "NY", "LA", "LA", "LA", "Tokyo"];

    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let rows = conn
        .interact(|engine| {
            let tbl = engine.create_table("grp_t").unwrap();
            for &city in &CITIES {
                let mut row = Row::new();
                row.insert(String::from("city"), Value::String(city.to_owned()));
                row.insert(String::from("v"), Value::Int64(1));
                tbl.insert(row).unwrap();
            }

            let sql = "SELECT city, COUNT(*) AS n FROM grp_t GROUP BY city ORDER BY n DESC";
            let result_set = tbl.execute(sql).unwrap();
            result_set.to_rows().unwrap()
        })
        .await
        .unwrap();

    assert_eq!(rows.len(), 3);
    assert_eq!(rows[0].get("city"), Some(&Value::String("LA".to_string())));
}
/// Inserts five rows, batch-deletes the first three ids and checks the
/// reported delete count and the remaining row count.
#[tokio::test]
async fn test_delete_batch() {
    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let (deleted, remaining) = conn
        .interact(|engine| {
            let tbl = engine.create_table("del_batch_t").unwrap();

            let mut ids: Vec<u64> = Vec::with_capacity(5);
            for i in 0..5i64 {
                let mut row = Row::new();
                row.insert(String::from("i"), Value::Int64(i));
                ids.push(tbl.insert(row).unwrap());
            }

            let removed = tbl.delete_batch(&ids[..3]).unwrap();
            let left = tbl.count().unwrap();
            (removed, left)
        })
        .await
        .unwrap();

    assert_eq!(deleted, 3);
    assert_eq!(remaining, 2);
}
/// Checks that retrieving an id that was never inserted yields `None`
/// rather than an error.
#[tokio::test]
async fn test_retrieve_nonexistent() {
    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let is_none = conn
        .interact(|engine| {
            let tbl = engine.create_table("norow_t").unwrap();

            let mut row = Row::new();
            row.insert(String::from("v"), Value::Int64(1));
            tbl.insert(row).unwrap();

            // Only one row was inserted, so this id was never handed out.
            let lookup = tbl.retrieve(99999).unwrap();
            lookup.is_none()
        })
        .await
        .unwrap();

    assert!(is_none);
}
/// Checks that `exists` reports `true` for a freshly inserted row and `false`
/// once that row has been deleted.
#[tokio::test]
async fn test_exists() {
    let (_tmp, pool) = create_pool();
    let db = pool.get().await.unwrap();

    // The row id is only needed inside the closure, so it is not returned.
    let (exists_after_insert, exists_after_delete) = db
        .interact(|apexdb| {
            let table = apexdb.create_table("exists_t").unwrap();

            let mut r = Row::new();
            r.insert("k".to_string(), Value::Int64(1));
            let id = table.insert(r).unwrap();

            let exists_after_insert = table.exists(id).unwrap();
            table.delete(id).unwrap();
            let exists_after_delete = table.exists(id).unwrap();
            (exists_after_insert, exists_after_delete)
        })
        .await
        .unwrap();

    assert!(exists_after_insert);
    assert!(!exists_after_delete);
}
/// Checks that deleting an id that was never inserted succeeds and reports
/// `false`.
#[tokio::test]
async fn test_delete_nonexistent() {
    let (_dir, pool) = create_pool();
    let conn = pool.get().await.unwrap();

    let was_deleted = conn
        .interact(|engine| {
            let tbl = engine.create_table("nodel_t").unwrap();

            let mut row = Row::new();
            row.insert(String::from("v"), Value::Int64(1));
            tbl.insert(row).unwrap();

            // Only one row was inserted, so this id was never handed out.
            tbl.delete(99999).unwrap()
        })
        .await
        .unwrap();

    assert!(!was_deleted);
}
/// Checks that data written through one checkout is visible through a later
/// checkout from the same pool.
///
/// The first connection is dropped at the end of its scope before the second
/// one is requested.
#[tokio::test]
async fn test_pool_reuse() {
    let (_dir, pool) = create_pool();

    // First checkout: create the table and insert one row.
    {
        let conn = pool.get().await.unwrap();
        let rows_after_insert = conn
            .interact(|engine| {
                let tbl = engine.create_table("reuse_t").unwrap();
                let mut row = Row::new();
                row.insert(String::from("v"), Value::Int64(42));
                tbl.insert(row).unwrap();
                tbl.count().unwrap()
            })
            .await
            .unwrap();
        assert_eq!(rows_after_insert, 1);
    }

    // Second checkout: open the existing table and count its rows.
    {
        let conn = pool.get().await.unwrap();
        let rows_seen = conn
            .interact(|engine| {
                let tbl = engine.table("reuse_t").unwrap();
                tbl.count().unwrap()
            })
            .await
            .unwrap();
        assert_eq!(rows_seen, 1);
    }
}
/// Checks that a panic inside `interact` is reported as
/// `InteractError::Panic` and that the pool still hands out a working
/// connection afterwards.
#[tokio::test]
async fn test_panic_recovery() {
    let (_dir, pool) = create_pool();

    // Panic inside the closure; the connection is dropped at the end of this scope.
    {
        let conn = pool.get().await.unwrap();
        let outcome = conn
            .interact::<_, ()>(|_engine| {
                panic!("Whopsies!");
            })
            .await;
        assert!(matches!(outcome, Err(InteractError::Panic(_))));
    }

    // A later checkout must still be able to create, insert and retrieve.
    let conn = pool.get().await.unwrap();
    let fetched = conn
        .interact(|engine| {
            let tbl = engine.create_table("panic_recovery_t").unwrap();
            let mut row = Row::new();
            row.insert(String::from("v"), Value::Int64(1));
            let id = tbl.insert(row).unwrap();
            tbl.retrieve(id).unwrap()
        })
        .await
        .unwrap();

    assert!(fetched.is_some());
}
/// Holds three pool checkouts at the same time and uses each one to create
/// its own table and insert a row.
///
/// The three `interact` calls are awaited one after another; all results are
/// checked only after every call has finished.
#[tokio::test]
async fn test_multiple_connections() {
    let (_dir, pool) = create_pool();

    let conn_a = pool.get().await.unwrap();
    let conn_b = pool.get().await.unwrap();
    let conn_c = pool.get().await.unwrap();

    let outcome_a = conn_a
        .interact(|engine| {
            let tbl = engine.create_table("multi_a").unwrap();
            let mut row = Row::new();
            row.insert(String::from("v"), Value::Int64(1));
            tbl.insert(row).unwrap()
        })
        .await;

    let outcome_b = conn_b
        .interact(|engine| {
            let tbl = engine.create_table("multi_b").unwrap();
            let mut row = Row::new();
            row.insert(String::from("v"), Value::Int64(2));
            tbl.insert(row).unwrap()
        })
        .await;

    let outcome_c = conn_c
        .interact(|engine| {
            let tbl = engine.create_table("multi_c").unwrap();
            let mut row = Row::new();
            row.insert(String::from("v"), Value::Int64(3));
            tbl.insert(row).unwrap()
        })
        .await;

    assert!(outcome_a.is_ok());
    assert!(outcome_b.is_ok());
    assert!(outcome_c.is_ok());
}
/// Requests one more connection than the pool's `max_size` while holding on
/// to every connection obtained, and checks that exactly `max_size` requests
/// succeed.
///
/// Each request uses a one-second timeout for create, wait and recycle;
/// failed requests are only logged to stderr.
#[tokio::test]
async fn test_pool_exhaustion() {
    let (_dir, pool) = create_pool();

    let one_second = Some(Duration::from_secs(1));
    let timeouts = Timeouts {
        create: one_second,
        wait: one_second,
        recycle: one_second,
    };

    let max_size = pool.status().max_size;
    let attempts = max_size + 1;

    // Connections are kept here so they are not returned to the pool.
    let mut held = Vec::new();
    for attempt in 0..attempts {
        match pool.timeout_get(&timeouts).await {
            Err(e) => {
                eprintln!("等待超时:{:?},程序继续运行", e);
            }
            Ok(conn) => {
                let table_name = format!("exhaust_test_{}", attempt);
                let row_count = conn
                    .interact(move |engine| {
                        let tbl = engine.create_table(&table_name).unwrap();
                        tbl.count().unwrap()
                    })
                    .await
                    .unwrap();
                assert_eq!(row_count, 0);
                held.push(conn);
            }
        }
    }

    assert_eq!(held.len(), max_size);
}
/// Seeds a table with ten rows, then spawns ten tasks that each check out a
/// connection and read back the row matching their index.
#[tokio::test]
async fn concurrent_connections_read() {
    let (_dir, pool) = create_pool();
    let pool = Arc::new(pool);

    // Seed the table through a single connection, released at the end of the scope.
    {
        let seeder = pool.get().await.unwrap();
        seeder
            .interact(|inner| {
                inner
                    .execute("CREATE TABLE _t_concurrent (id INTEGER, val TEXT)")
                    .expect("Failed to create table");
                for i in 0..10 {
                    let insert_sql =
                        format!("INSERT INTO _t_concurrent (id, val) VALUES ({i}, 'data_{i}')");
                    inner.execute(&insert_sql).expect("Failed to insert");
                }
            })
            .await
            .unwrap();
    }

    // One reader task per seeded id.
    let readers: Vec<_> = (0..10)
        .map(|i| {
            let pool = Arc::clone(&pool);
            tokio::spawn(async move {
                let conn = pool.get().await.expect("Failed to get connection");
                conn.interact(move |inner| {
                    let select_sql = format!("SELECT val FROM _t_concurrent WHERE id = {i}");
                    let result = inner.execute(&select_sql).expect("Failed to query");
                    let rows = result.to_rows().expect("Failed to get rows");
                    let row = rows.first().expect("No rows returned");
                    let val = row.get("val").expect("Empty row");
                    if let Value::String(s) = val {
                        assert_eq!(s.as_str(), format!("data_{i}"));
                    } else {
                        panic!("Expected text, got {:?}", val);
                    }
                })
                .await
                .expect("Interact failed");
            })
        })
        .collect();

    for reader in readers {
        reader.await.expect("Task failed");
    }
}
/// Spawns eight writer tasks that each insert ten rows with distinct ids
/// through their own checkout, then checks that the table holds 80 rows.
#[tokio::test]
async fn concurrent_connections_write() {
    let (_dir, pool) = create_pool();
    let pool = Arc::new(pool);

    // Create the target table through a connection released at the end of the scope.
    {
        let setup = pool.get().await.unwrap();
        setup
            .interact(|inner| {
                inner
                    .execute(
                        "CREATE TABLE IF NOT EXISTS _t_concurrent_write (id INTEGER PRIMARY KEY AUTOINCREMENT, thread INTEGER, seq INTEGER)",
                    )
                    .expect("Failed to create table");
            })
            .await
            .expect("Interact failed");
    }

    let mut writers = Vec::new();
    for i in 0..8 {
        let pool = Arc::clone(&pool);
        let task = tokio::spawn(async move {
            let conn = pool.get().await.unwrap();
            conn.interact(move |inner| {
                for j in 0..10 {
                    // Writer `i` owns ids i*10+1 ..= i*10+10, so ids never collide.
                    let id = i * 10 + j + 1;
                    let insert_sql = format!(
                        "INSERT INTO _t_concurrent_write (id, thread, seq) VALUES ({id}, {i}, {j})"
                    );
                    inner.execute(&insert_sql).expect("Failed to insert");
                }
            })
            .await
            .expect("Interact failed");
        });
        writers.push(task);
    }

    for writer in writers {
        writer.await.expect("Task failed");
    }

    // Count the rows through a fresh checkout once every writer has finished.
    {
        let conn = pool.get().await.unwrap();
        let count: i64 = conn
            .interact(|inner| {
                let result = inner
                    .execute("SELECT count(*) AS cnt FROM _t_concurrent_write")
                    .expect("Failed to query");
                let rows = result.to_rows().expect("Failed to get rows");
                let row = rows.first().expect("No rows returned");
                let val = row.get("cnt").expect("Empty row");
                if let Value::Int64(n) = val {
                    *n
                } else {
                    panic!("Expected count, got {:?}", val)
                }
            })
            .await
            .unwrap();
        assert_eq!(count, 80);
    }
}