xitca_postgres/transaction/
portal.rs

1use core::sync::atomic::Ordering;
2
3use postgres_protocol::message::backend;
4
5use crate::{
6    driver::codec::{
7        encode::{Encode, PortalCancel, PortalCreate, PortalQuery},
8        response::IntoResponse,
9        AsParams,
10    },
11    error::Error,
12    query::Query,
13    statement::Statement,
14};
15
16/// A portal.
17///
18/// Portals can only be used with the connection that created them, and only exist for the duration of the transaction
19/// in which they were created.
20pub struct Portal<'a, C>
21where
22    C: Query,
23{
24    cli: &'a C,
25    name: String,
26    stmt: &'a Statement,
27}
28
29impl<C> Drop for Portal<'_, C>
30where
31    C: Query,
32{
33    fn drop(&mut self) {
34        let _ = self.cli._send_encode_query(PortalCancel { name: &self.name });
35    }
36}
37
38impl<C> Portal<'_, C>
39where
40    C: Query,
41{
42    pub(crate) async fn new<'p, I>(cli: &'p C, stmt: &'p Statement, params: I) -> Result<Portal<'p, C>, Error>
43    where
44        I: AsParams,
45    {
46        let name = format!("p{}", crate::NEXT_ID.fetch_add(1, Ordering::Relaxed));
47
48        let (_, mut res) = cli._send_encode_query(PortalCreate {
49            name: &name,
50            stmt: stmt.name(),
51            types: stmt.params(),
52            params,
53        })?;
54
55        match res.recv().await? {
56            backend::Message::BindComplete => {}
57            _ => return Err(Error::unexpected()),
58        }
59
60        Ok(Portal { cli, name, stmt })
61    }
62
63    pub fn query_portal(
64        &self,
65        max_rows: i32,
66    ) -> Result<<<PortalQuery<'_> as Encode>::Output as IntoResponse>::Response, Error> {
67        self.cli._query(PortalQuery {
68            name: &self.name,
69            columns: self.stmt.columns(),
70            max_rows,
71        })
72    }
73}