xitca_postgres/transaction/
portal.rs1use core::sync::atomic::Ordering;
2
3use postgres_protocol::message::backend;
4
5use crate::{
6 driver::codec::{
7 encode::{Encode, PortalCancel, PortalCreate, PortalQuery},
8 response::IntoResponse,
9 AsParams,
10 },
11 error::Error,
12 query::Query,
13 statement::Statement,
14};
15
16pub struct Portal<'a, C>
21where
22 C: Query,
23{
24 cli: &'a C,
25 name: String,
26 stmt: &'a Statement,
27}
28
29impl<C> Drop for Portal<'_, C>
30where
31 C: Query,
32{
33 fn drop(&mut self) {
34 let _ = self.cli._send_encode_query(PortalCancel { name: &self.name });
35 }
36}
37
38impl<C> Portal<'_, C>
39where
40 C: Query,
41{
42 pub(crate) async fn new<'p, I>(cli: &'p C, stmt: &'p Statement, params: I) -> Result<Portal<'p, C>, Error>
43 where
44 I: AsParams,
45 {
46 let name = format!("p{}", crate::NEXT_ID.fetch_add(1, Ordering::Relaxed));
47
48 let (_, mut res) = cli._send_encode_query(PortalCreate {
49 name: &name,
50 stmt: stmt.name(),
51 types: stmt.params(),
52 params,
53 })?;
54
55 match res.recv().await? {
56 backend::Message::BindComplete => {}
57 _ => return Err(Error::unexpected()),
58 }
59
60 Ok(Portal { cli, name, stmt })
61 }
62
63 pub fn query_portal(
64 &self,
65 max_rows: i32,
66 ) -> Result<<<PortalQuery<'_> as Encode>::Output as IntoResponse>::Response, Error> {
67 self.cli._query(PortalQuery {
68 name: &self.name,
69 columns: self.stmt.columns(),
70 max_rows,
71 })
72 }
73}