use std::{pin::Pin, task::Poll};
use super::*;
use bytes::Bytes;
use postgres::{binary_copy::BinaryCopyInWriter, types::Type};
use simple_pg_client::CopyInSink;
async fn query_for_each_inner(
tx: &Conn,
raw: &str,
params: &[&(dyn ToSql + Sync)],
map: &mut (dyn Send + Sync + FnMut(Row) -> Result<(), Error>),
) -> Result<(), Error> {
use futures_core::Stream;
let stmt = tx.prepare_cached(raw).await?;
let mut stream = std::pin::pin!(tx.query_raw_statement(stmt, params).await?);
std::future::poll_fn(|cx| loop {
match stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
Poll::Ready(Some(Ok(row))) => {
if let Err(err) = map(row) {
return Poll::Ready(Err(err));
}
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => return Poll::Pending::<Result<(), Error>>,
}
})
.await
}
impl<'a> Sql<'a> {
pub fn fragment(&self) -> SqlFragment<'_> {
SqlFragment::Lucky(self)
}
pub async fn execute(self, tx: &Conn) -> Result<u64, postgres::error::Error> {
let stmt = tx.prepare_cached(self.raw).await?;
tx.execute_stmt(stmt, self.bindings).await
}
pub async fn batch_execute(self, tx: &Conn) -> Result<(), postgres::error::Error> {
tx.batch_execute(self.raw).await
}
pub async fn query(self, tx: &Conn) -> Result<Vec<Row>, postgres::error::Error> {
let stmt = tx.prepare_cached(self.raw).await?;
tx.query(&stmt, self.bindings).await
}
pub async fn query_map<T: Sync + Send>(
self,
tx: &Conn,
mut map: impl Send + Sync + FnMut(Row) -> Result<T, postgres::error::Error>,
) -> Result<Vec<T>, postgres::error::Error> {
let mut collect = Vec::new();
query_for_each_inner(tx, &self.raw, &self.bindings, &mut |row| {
collect.push(map(row)?);
Ok(())
})
.await?;
Ok(collect)
}
pub async fn query_one(self, tx: &Conn) -> Result<Row, postgres::error::Error> {
let stmt = tx.prepare_cached(self.raw).await?;
tx.query_one_stmt(stmt, self.bindings).await
}
pub async fn query_into<T: FromSqlOwned>(self, tx: &Conn) -> Result<T, postgres::error::Error> {
let stmt = tx.prepare_cached(self.raw).await?;
Ok(tx.query_one_stmt(stmt, self.bindings).await?.get_unwrap(0))
}
pub async fn query_opt(self, tx: &Conn) -> Result<Option<Row>, postgres::error::Error> {
let stmt = tx.prepare_cached(self.raw).await?;
tx.query_opt(&stmt, self.bindings).await
}
pub async fn copy_in(self, tx: &Conn) -> Result<CopyInSink<Bytes>, postgres::error::Error> {
tx.copy_in(self.raw).await
}
pub async fn copy_in_binary(
self,
tx: &Conn,
types: &[Type],
) -> Result<Pin<Box<BinaryCopyInWriter>>, postgres::error::Error> {
Ok(Box::pin(BinaryCopyInWriter::new(
tx.copy_in(self.raw).await?,
types,
)))
}
}
impl<'a> GeneratedSql<'a> {
pub async fn execute(self, tx: &Conn) -> Result<u64, postgres::error::Error> {
let stmt = tx.prepare_cached(&self.raw).await?;
tx.execute_stmt(stmt, &self.bindings).await
}
pub async fn query_map<T: Sync + Send>(
self,
tx: &Conn,
mut map: impl Send + Sync + FnMut(Row) -> Result<T, postgres::error::Error>,
) -> Result<Vec<T>, postgres::error::Error> {
let mut collect = Vec::new();
query_for_each_inner(tx, &self.raw, &self.bindings, &mut |row| {
collect.push(map(row)?);
Ok(())
})
.await?;
Ok(collect)
}
pub async fn batch_execute(self, tx: &Conn) -> Result<(), postgres::error::Error> {
tx.batch_execute(&self.raw).await
}
pub async fn query(self, tx: &Conn) -> Result<Vec<Row>, postgres::error::Error> {
let stmt = tx.prepare_cached(&self.raw).await?;
tx.query(&stmt, &self.bindings).await
}
pub async fn query_one(self, tx: &Conn) -> Result<Row, postgres::error::Error> {
let stmt = tx.prepare_cached(&self.raw).await?;
tx.query_one_stmt(stmt, &self.bindings).await
}
pub async fn query_into<T: FromSqlOwned>(
self,
tx: &mut Conn,
) -> Result<T, postgres::error::Error> {
let stmt = tx.prepare_cached(&self.raw).await?;
Ok(tx.query_one(&stmt, &self.bindings).await?.get_unwrap(0))
}
pub async fn query_opt(self, tx: &mut Conn) -> Result<Option<Row>, postgres::error::Error> {
let stmt = tx.prepare_cached(&self.raw).await?;
tx.query_opt(&stmt, &self.bindings).await
}
}