xitca_postgres/transaction/
portal.rsuse core::sync::atomic::Ordering;
use postgres_protocol::message::backend;
use crate::{
driver::codec::{
encode::{Encode, PortalCancel, PortalCreate, PortalQuery},
response::IntoResponse,
AsParams,
},
error::Error,
query::Query,
statement::Statement,
};
pub struct Portal<'a, C>
where
C: Query,
{
cli: &'a C,
name: String,
stmt: &'a Statement,
}
impl<C> Drop for Portal<'_, C>
where
C: Query,
{
fn drop(&mut self) {
let _ = self.cli._send_encode_query(PortalCancel { name: &self.name });
}
}
impl<C> Portal<'_, C>
where
C: Query,
{
pub(crate) async fn new<'p, I>(cli: &'p C, stmt: &'p Statement, params: I) -> Result<Portal<'p, C>, Error>
where
I: AsParams,
{
let name = format!("p{}", crate::NEXT_ID.fetch_add(1, Ordering::Relaxed));
let (_, mut res) = cli._send_encode_query(PortalCreate {
name: &name,
stmt: stmt.name(),
types: stmt.params(),
params,
})?;
match res.recv().await? {
backend::Message::BindComplete => {}
_ => return Err(Error::unexpected()),
}
Ok(Portal { cli, name, stmt })
}
pub fn query_portal(
&self,
max_rows: i32,
) -> Result<<<PortalQuery<'_> as Encode>::Output as IntoResponse>::Response, Error> {
self.cli._query(PortalQuery {
name: &self.name,
columns: self.stmt.columns(),
max_rows,
})
}
}