use std::future::IntoFuture;
use bb8::{ManageConnection, Pool};
use xitca_postgres::{
Client, Config, Execute, Postgres,
dev::{ClientBorrow, ClientBorrowMut},
error::{DriverDown, Error},
iter::AsyncLendingIterator,
transaction::{Transaction, TransactionBuilder},
};
pub struct PoolManager {
config: Config,
}
impl ManageConnection for PoolManager {
type Connection = PoolConnection;
type Error = Error;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
let (conn, driver) = Postgres::new(self.config.clone()).connect().await?;
tokio::spawn(driver.into_future());
Ok(PoolConnection { conn })
}
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
if conn.conn.closed() {
Err(DriverDown.into())
} else {
Ok(())
}
}
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
conn.conn.closed()
}
}
pub struct PoolConnection {
conn: Client,
}
impl ClientBorrowMut for PoolConnection {
fn borrow_cli_mut(&mut self) -> &mut Client {
&mut self.conn
}
}
impl ClientBorrow for PoolConnection {
fn borrow_cli_ref(&self) -> &Client {
&self.conn
}
}
impl PoolConnection {
pub async fn transaction(&mut self) -> Result<Transaction<'_, Self>, Error> {
TransactionBuilder::new().begin(self).await
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let config = Config::try_from("postgres://postgres:postgres@localhost:5432")?;
let manager = PoolManager { config };
let pool = Pool::builder().build(manager).await?;
let mut conn = pool.get().await?;
let transaction = conn.conn.transaction().await?;
let mut res = "SELECT 1".query(&transaction).await?;
let row = res.try_next().await?.ok_or("row not found")?;
assert_eq!(Some("1"), row.get(0));
transaction.rollback().await?;
let transaction = conn.transaction().await?;
let mut res = "SELECT 1".query(&transaction).await?;
let row = res.try_next().await?.ok_or("row not found")?;
assert_eq!(Some("1"), row.get(0));
transaction.commit().await?;
Ok(())
}