use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use pg_pool::wire::WirePoolable;
use pg_pool::{ConnPool, ConnPoolConfig, LifecycleHooks};
use pg_wired::PgPipeline;
/// Looks up the environment variable `var`, falling back to `default`.
///
/// When `std::env::var` succeeds, its value is returned as an owned string.
/// On any error from that call, the borrowed `default` is handed back
/// unchanged, so the fallback path does not allocate.
fn env_or<'a>(var: &str, default: &'a str) -> std::borrow::Cow<'a, str> {
    use std::borrow::Cow;

    std::env::var(var).map_or(Cow::Borrowed(default), Cow::Owned)
}
/// Walks a connection pool through its lifecycle against a live server.
///
/// Steps, in order:
/// 1. Build a `ConnPoolConfig` from `RESOLUTE_TEST_*` environment variables
///    (with local defaults) and install lifecycle hooks that count create,
///    checkout and checkin events.
/// 2. Create the pool, call `warm_up(2)` and print the pool status.
/// 3. Check out one connection, wrap it in a `PgPipeline` and run a single
///    `SELECT 42` query, printing the returned text.
/// 4. Spawn six tasks that each check out a connection, hold it for 10 ms and
///    release it.
/// 5. Print the pool metrics and the hook counters, then drain the pool.
///
/// # Errors
///
/// Returns an error if pool creation, the initial checkout, the query, or the
/// UTF-8 decoding of the result fails, if the query yields no row or the first
/// cell of the first row is absent, or if any spawned task fails to join.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connection settings come from the environment, with local defaults.
    let addr = env_or("RESOLUTE_TEST_ADDR", "127.0.0.1:54322");
    let user = env_or("RESOLUTE_TEST_USER", "postgres");
    let pass = env_or("RESOLUTE_TEST_PASSWORD", "postgres");
    let db = env_or("RESOLUTE_TEST_DB", "postgrest_test");

    // Start from the defaults and override only the fields this example needs.
    let mut config = ConnPoolConfig::default();
    config.addr = addr.into_owned();
    config.user = user.into_owned();
    config.password = pass.into_owned();
    config.database = db.into_owned();
    config.min_idle = 1;
    config.max_size = 4;
    config.checkout_timeout = Duration::from_secs(5);

    // Shared counters: each hook owns one clone, `main` keeps the originals so
    // it can read the totals at the end.
    let creates = Arc::new(AtomicU64::new(0));
    let checkouts = Arc::new(AtomicU64::new(0));
    let checkins = Arc::new(AtomicU64::new(0));

    let hooks = {
        let c = Arc::clone(&creates);
        let co = Arc::clone(&checkouts);
        let ci = Arc::clone(&checkins);

        let mut hooks = LifecycleHooks::<WirePoolable>::default();
        // The counters are plain tallies that guard no other data, so relaxed
        // ordering is used for the increments.
        hooks.on_create = Some(Box::new(move |_conn| {
            c.fetch_add(1, Ordering::Relaxed);
        }));
        hooks.on_checkout = Some(Box::new(move |_conn| {
            co.fetch_add(1, Ordering::Relaxed);
        }));
        hooks.on_checkin = Some(Box::new(move |_conn| {
            ci.fetch_add(1, Ordering::Relaxed);
        }));
        hooks
    };

    let pool = ConnPool::<WirePoolable>::new(config, hooks).await?;
    pool.warm_up(2).await;
    println!("after warm_up: {}", pool.status());

    // Single pipelined query on a checked-out connection.
    {
        let guard = pool.get().await?;
        // NOTE(review): `take()` presumably moves the underlying connection out
        // of the guard, so it may not be returned to the pool afterwards;
        // confirm against the pool's guard documentation.
        let wire = guard.take().0;
        let mut pipe = PgPipeline::new(wire);
        let rows = pipe.query("SELECT 42::int4 AS n", &[], &[]).await?;

        // Propagate an unexpected result shape as an error instead of
        // panicking on an out-of-bounds index or an absent cell.
        let row = rows.first().ok_or("query returned no rows")?;
        let cell = row
            .cell(0)
            .ok_or("first cell of the first row has no value")?;
        let val = std::str::from_utf8(cell)?;
        println!("pipelined query returned: {val}");
    }

    // Six workers against `max_size = 4`.
    // NOTE(review): presumably some workers wait for a free connection until
    // an earlier one is released; confirm against the pool's checkout docs.
    let handles: Vec<_> = (0..6)
        .map(|i| {
            let pool = Arc::clone(&pool);
            tokio::spawn(async move {
                // A failed checkout panics inside the task; the panic is
                // reported as a join error when the handle is awaited below.
                let guard = pool.get().await.expect("checkout failed");
                tokio::time::sleep(Duration::from_millis(10)).await;
                println!("worker {i}: held connection then released");
                drop(guard);
            })
        })
        .collect();

    // Wait for every worker, stopping at the first join error.
    for h in handles {
        h.await?;
    }

    let m = pool.metrics();
    println!(
        "metrics: total = {}, idle = {}, in_use = {}, checkouts = {}, created = {}",
        m.total, m.idle, m.in_use, m.total_checkouts, m.total_created,
    );
    println!(
        "hook counters: creates = {}, checkouts = {}, checkins = {}",
        creates.load(Ordering::Relaxed),
        checkouts.load(Ordering::Relaxed),
        checkins.load(Ordering::Relaxed),
    );

    pool.drain().await;
    println!("drained: {}", pool.status());
    Ok(())
}