use crate::Poolable;
#[derive(Debug)]
pub struct AsyncPoolable(pg_wired::AsyncConn);
impl AsyncPoolable {
pub fn conn(&self) -> &pg_wired::AsyncConn {
&self.0
}
}
impl std::ops::Deref for AsyncPoolable {
type Target = pg_wired::AsyncConn;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Poolable for AsyncPoolable {
type Error = pg_wired::PgWireError;
async fn connect(
addr: &str,
user: &str,
password: &str,
database: &str,
) -> Result<Self, Self::Error> {
let wire = pg_wired::WireConn::connect(addr, user, password, database).await?;
let async_conn = pg_wired::AsyncConn::new(wire);
Ok(AsyncPoolable(async_conn))
}
fn has_pending_data(&self) -> bool {
!self.0.is_alive() || self.0.is_broken()
}
async fn reset(&self) -> bool {
if !self.0.is_alive() {
return false;
}
if !self.0.take_state_mutated() {
return true;
}
let mut buf = bytes::BytesMut::new();
pg_wired::protocol::frontend::encode_message(
&pg_wired::protocol::types::FrontendMsg::Query(b"DISCARD ALL"),
&mut buf,
);
match self.0.submit(buf, pg_wired::ResponseCollector::Drain).await {
Ok(_) => {
self.0.clear_statement_cache();
true
}
Err(e) => {
tracing::warn!(error = %e, "connection reset failed, destroying");
false
}
}
}
}