Skip to main content

xitca_postgres/transaction/
portal.rs

1use core::sync::atomic::Ordering;
2
3use crate::{
4    client::ClientBorrow,
5    driver::codec::{
6        AsParams,
7        encode::{Encode, PortalCancel, PortalCreate, PortalQuery},
8        response::IntoResponse,
9    },
10    error::Error,
11    protocol::message::backend,
12    statement::Statement,
13};
14
15/// A portal.
16///
17/// Portals can only be used with the connection that created them, and only exist for the duration of the transaction
18/// in which they were created.
19pub struct Portal<'a, C>
20where
21    C: ClientBorrow,
22{
23    cli: &'a C,
24    name: String,
25    stmt: &'a Statement,
26}
27
28impl<C> Drop for Portal<'_, C>
29where
30    C: ClientBorrow,
31{
32    fn drop(&mut self) {
33        let _ = self.cli.borrow_cli_ref().query_raw(PortalCancel { name: &self.name });
34    }
35}
36
37impl<C> Portal<'_, C>
38where
39    C: ClientBorrow,
40{
41    pub(crate) async fn new<'p, I>(cli: &'p C, stmt: &'p Statement, params: I) -> Result<Portal<'p, C>, Error>
42    where
43        I: AsParams,
44    {
45        let name = format!("p{}", crate::NEXT_ID.fetch_add(1, Ordering::Relaxed));
46
47        let (_, mut res) = cli.borrow_cli_ref().query_raw(PortalCreate {
48            name: &name,
49            stmt: stmt.name(),
50            types: stmt.params(),
51            params,
52        })?;
53
54        match res.recv().await? {
55            backend::Message::BindComplete => {}
56            _ => return Err(Error::unexpected()),
57        }
58
59        Ok(Portal { cli, name, stmt })
60    }
61
62    pub fn query_portal(
63        &self,
64        max_rows: i32,
65    ) -> Result<<<PortalQuery<'_> as Encode>::Output as IntoResponse>::Response, Error> {
66        self.cli.borrow_cli_ref().query(PortalQuery {
67            name: &self.name,
68            columns: self.stmt.columns(),
69            max_rows,
70        })
71    }
72}