xitca_postgres/transaction/
portal.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use core::sync::atomic::Ordering;

use postgres_protocol::message::backend;

use crate::{
    driver::codec::{
        encode::{Encode, PortalCancel, PortalCreate, PortalQuery},
        response::IntoResponse,
        AsParams,
    },
    error::Error,
    query::Query,
    statement::Statement,
};

/// A portal.
///
/// Portals can only be used with the connection that created them, and only exist for the duration of the transaction
/// in which they were created.
pub struct Portal<'a, C>
where
    C: Query,
{
    cli: &'a C,
    name: String,
    stmt: &'a Statement,
}

impl<C> Drop for Portal<'_, C>
where
    C: Query,
{
    fn drop(&mut self) {
        let _ = self.cli._send_encode_query(PortalCancel { name: &self.name });
    }
}

impl<C> Portal<'_, C>
where
    C: Query,
{
    pub(crate) async fn new<'p, I>(cli: &'p C, stmt: &'p Statement, params: I) -> Result<Portal<'p, C>, Error>
    where
        I: AsParams,
    {
        let name = format!("p{}", crate::NEXT_ID.fetch_add(1, Ordering::Relaxed));

        let (_, mut res) = cli._send_encode_query(PortalCreate {
            name: &name,
            stmt: stmt.name(),
            types: stmt.params(),
            params,
        })?;

        match res.recv().await? {
            backend::Message::BindComplete => {}
            _ => return Err(Error::unexpected()),
        }

        Ok(Portal { cli, name, stmt })
    }

    pub fn query_portal(
        &self,
        max_rows: i32,
    ) -> Result<<<PortalQuery<'_> as Encode>::Output as IntoResponse>::Response, Error> {
        self.cli._query(PortalQuery {
            name: &self.name,
            columns: self.stmt.columns(),
            max_rows,
        })
    }
}