simple_pg 0.5.8

Provides extensions and utilities for working with Postgres.
Documentation
use std::{pin::Pin, task::Poll};

use super::*;
use bytes::Bytes;
use postgres::{binary_copy::BinaryCopyInWriter, types::Type};
use simple_pg_client::CopyInSink;

/// Runs `raw` with `params` on `tx` and hands every resulting row to `map`.
///
/// The statement is obtained through `prepare_cached` and executed with
/// `query_raw_statement`; the returned row stream is then polled manually via
/// `futures_core::Stream::poll_next` inside a `poll_fn` future.
///
/// # Errors
///
/// Returns the first error produced while preparing the statement, starting
/// the query, reading a row from the stream, or by `map` itself. Once an error
/// is returned no further rows are passed to `map`.
async fn query_for_each_inner(
    tx: &Conn,
    raw: &str,
    params: &[&(dyn ToSql + Sync)],
    map: &mut (dyn Send + Sync + FnMut(Row) -> Result<(), Error>),
) -> Result<(), Error> {
    use futures_core::Stream;

    let statement = tx.prepare_cached(raw).await?;
    let rows = tx.query_raw_statement(statement, params).await?;
    // `poll_next` takes `Pin<&mut Self>`, so the stream is pinned on the stack.
    let mut rows = std::pin::pin!(rows);

    std::future::poll_fn(|cx| {
        // Keep draining rows that are already available; only yield to the
        // executor when the stream reports `Pending`.
        loop {
            let row = match rows.as_mut().poll_next(cx) {
                // The explicit type pins down the closure's return type.
                Poll::Pending => return Poll::Pending::<Result<(), Error>>,
                Poll::Ready(None) => return Poll::Ready(Ok(())),
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                Poll::Ready(Some(Ok(row))) => row,
            };
            if let Err(err) = map(row) {
                return Poll::Ready(Err(err));
            }
        }
    })
    .await
}

/// Execution helpers for a [`Sql`] value: its `raw` statement text together
/// with its `bindings`.
///
/// Except for `batch_execute` and the `copy_in*` methods, every method first
/// obtains a statement through `Conn::prepare_cached`.
impl<'a> Sql<'a> {
    /// Wraps a borrow of this statement in [`SqlFragment::Lucky`].
    pub fn fragment(&self) -> SqlFragment<'_> {
        SqlFragment::Lucky(self)
    }

    /// Prepares the statement and runs it with the bound parameters,
    /// returning the `u64` reported by `execute_stmt`
    /// (presumably the number of affected rows — confirm against `Conn`).
    pub async fn execute(self, tx: &Conn) -> Result<u64, postgres::error::Error> {
        let prepared = tx.prepare_cached(self.raw).await?;
        tx.execute_stmt(prepared, self.bindings).await
    }

    /// Passes the raw SQL text straight to `Conn::batch_execute`.
    ///
    /// The bound parameters are not used by this method.
    pub async fn batch_execute(self, tx: &Conn) -> Result<(), postgres::error::Error> {
        tx.batch_execute(self.raw).await
    }

    /// Prepares the statement and collects all resulting rows.
    pub async fn query(self, tx: &Conn) -> Result<Vec<Row>, postgres::error::Error> {
        let prepared = tx.prepare_cached(self.raw).await?;
        tx.query(&prepared, self.bindings).await
    }

    /// Runs the query and converts each row with `map`, collecting the
    /// converted values in the order the rows arrive.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error from the query or from `map`.
    pub async fn query_map<T: Sync + Send>(
        self,
        tx: &Conn,
        mut map: impl Send + Sync + FnMut(Row) -> Result<T, postgres::error::Error>,
    ) -> Result<Vec<T>, postgres::error::Error> {
        let mut mapped = Vec::new();
        let mut push_mapped = |row| {
            let value = map(row)?;
            mapped.push(value);
            Ok(())
        };
        query_for_each_inner(tx, self.raw, self.bindings, &mut push_mapped).await?;
        Ok(mapped)
    }

    /// Prepares the statement and returns the row produced by
    /// `query_one_stmt`.
    pub async fn query_one(self, tx: &Conn) -> Result<Row, postgres::error::Error> {
        let prepared = tx.prepare_cached(self.raw).await?;
        tx.query_one_stmt(prepared, self.bindings).await
    }

    /// Like [`Sql::query_one`], but extracts column `0` of the row as `T`
    /// using `get_unwrap`.
    pub async fn query_into<T: FromSqlOwned>(self, tx: &Conn) -> Result<T, postgres::error::Error> {
        let prepared = tx.prepare_cached(self.raw).await?;
        let row = tx.query_one_stmt(prepared, self.bindings).await?;
        Ok(row.get_unwrap(0))
    }

    /// Prepares the statement and returns the optional row produced by
    /// `Conn::query_opt`.
    pub async fn query_opt(self, tx: &Conn) -> Result<Option<Row>, postgres::error::Error> {
        let prepared = tx.prepare_cached(self.raw).await?;
        tx.query_opt(&prepared, self.bindings).await
    }

    /// Starts a `COPY ... FROM STDIN`-style operation via `Conn::copy_in`,
    /// returning the sink that accepts the data as [`Bytes`].
    pub async fn copy_in(self, tx: &Conn) -> Result<CopyInSink<Bytes>, postgres::error::Error> {
        tx.copy_in(self.raw).await
    }

    /// Starts a copy-in operation and wraps the sink in a pinned, boxed
    /// [`BinaryCopyInWriter`] configured with the given column `types`.
    pub async fn copy_in_binary(
        self,
        tx: &Conn,
        types: &[Type],
    ) -> Result<Pin<Box<BinaryCopyInWriter>>, postgres::error::Error> {
        let sink = tx.copy_in(self.raw).await?;
        let writer = BinaryCopyInWriter::new(sink, types);
        Ok(Box::pin(writer))
    }
}

impl<'a> GeneratedSql<'a> {
    pub async fn execute(self, tx: &Conn) -> Result<u64, postgres::error::Error> {
        let stmt = tx.prepare_cached(&self.raw).await?;
        tx.execute_stmt(stmt, &self.bindings).await
    }
    pub async fn query_map<T: Sync + Send>(
        self,
        tx: &Conn,
        mut map: impl Send + Sync + FnMut(Row) -> Result<T, postgres::error::Error>,
    ) -> Result<Vec<T>, postgres::error::Error> {
        let mut collect = Vec::new();
        query_for_each_inner(tx, &self.raw, &self.bindings, &mut |row| {
            collect.push(map(row)?);
            Ok(())
        })
        .await?;
        Ok(collect)
    }
    pub async fn batch_execute(self, tx: &Conn) -> Result<(), postgres::error::Error> {
        tx.batch_execute(&self.raw).await
    }
    pub async fn query(self, tx: &Conn) -> Result<Vec<Row>, postgres::error::Error> {
        let stmt = tx.prepare_cached(&self.raw).await?;
        tx.query(&stmt, &self.bindings).await
    }
    pub async fn query_one(self, tx: &Conn) -> Result<Row, postgres::error::Error> {
        let stmt = tx.prepare_cached(&self.raw).await?;
        tx.query_one_stmt(stmt, &self.bindings).await
    }
    pub async fn query_into<T: FromSqlOwned>(
        self,
        tx: &mut Conn,
    ) -> Result<T, postgres::error::Error> {
        let stmt = tx.prepare_cached(&self.raw).await?;
        Ok(tx.query_one(&stmt, &self.bindings).await?.get_unwrap(0))
    }
    pub async fn query_opt(self, tx: &mut Conn) -> Result<Option<Row>, postgres::error::Error> {
        let stmt = tx.prepare_cached(&self.raw).await?;
        tx.query_opt(&stmt, &self.bindings).await
    }
}

// Non-caching variants of the impls above (currently commented out).
// impl<'a> Sql<'a> {
//     pub fn fragment(&self) -> SqlFragment<'_> {
//         SqlFragment::Lucky(self)
//     }
//     pub async fn execute(self, tx: &mut Client) -> Result<u64, postgres::error::Error> {
//         tx.execute(self.raw, self.bindings).await
//     }
//     pub async fn batch_execute(self, tx: &mut Client) -> Result<(), postgres::error::Error> {
//         tx.batch_execute(self.raw).await
//     }
//     pub async fn query(self, tx: &mut Client)-> Result<Vec<Row>, postgres::error::Error> {
//         tx.query(self.raw, self.bindings).await
//     }
//     pub async fn query_one(self, tx: &mut Client)-> Result<Row, postgres::error::Error> {
//         tx.query_one(self.raw, self.bindings).await
//     }
//     pub async fn query_into<T: FromSqlOwned>(self, tx: &mut Client)-> Result<T, postgres::error::Error> {
//         Ok(tx.query_one(self.raw, self.bindings).await?.get(0))
//     }
//     pub async fn query_opt(self, tx: &mut Client)-> Result<Option<Row>, postgres::error::Error> {
//         tx.query_opt(self.raw, self.bindings).await
//     }
// }

// impl<'a> GeneratedSql<'a> {
//     pub async fn execute(self, tx: &mut Client) -> Result<u64, postgres::error::Error> {
//         tx.execute(&self.raw, &self.bindings).await
//     }
//     pub async fn batch_execute(self, tx: &mut Client) -> Result<(), postgres::error::Error> {
//         tx.batch_execute(&self.raw).await
//     }
//     pub async fn query(self, tx: &mut Client)-> Result<Vec<Row>, postgres::error::Error> {
//         tx.query(&self.raw, &self.bindings).await
//     }
//     pub async fn query_one(self, tx: &mut Client)-> Result<Row, postgres::error::Error> {
//         tx.query_one(&self.raw, &self.bindings).await
//     }
//     pub async fn query_into<T: FromSqlOwned>(self, tx: &mut Client)-> Result<T, postgres::error::Error> {
//         Ok(tx.query_one(&self.raw, &self.bindings).await?.get(0))
//     }
//     pub async fn query_opt(self, tx: &mut Client)-> Result<Option<Row>, postgres::error::Error> {
//         tx.query_opt(&self.raw, &self.bindings).await
//     }
// }