1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use core::sync::atomic::Ordering;

use postgres_protocol::message::backend;

use crate::{
    driver::codec::{AsParams, IntoStream, PortalCancel, PortalCreate, PortalQuery},
    error::Error,
    query::Query,
    statement::Statement,
};

/// A portal.
///
/// Portals can only be used with the connection that created them, and only exist for the duration of the transaction
/// in which they were created.
pub struct Portal<'a, C>
where
    C: Query,
{
    cli: &'a C,
    name: String,
    stmt: &'a Statement,
}

impl<C> Drop for Portal<'_, C>
where
    C: Query,
{
    fn drop(&mut self) {
        let _ = self
            .cli
            ._send_encode_query::<_, crate::ZeroParam>(PortalCancel { name: &self.name }, []);
    }
}

impl<C> Portal<'_, C>
where
    C: Query,
{
    pub(crate) async fn new<'p, I>(cli: &'p C, stmt: &'p Statement, params: I) -> Result<Portal<'p, C>, Error>
    where
        I: AsParams,
    {
        let name = format!("p{}", crate::NEXT_ID.fetch_add(1, Ordering::Relaxed));

        let mut res = cli._send_encode_query(
            PortalCreate {
                name: &name,
                stmt: stmt.name(),
                types: stmt.params(),
            },
            params,
        )?;

        match res.recv().await? {
            backend::Message::BindComplete => {}
            _ => return Err(Error::unexpected()),
        }

        Ok(Portal { cli, name, stmt })
    }

    pub fn query_portal(&self, max_rows: i32) -> Result<<PortalQuery<'_> as IntoStream>::RowStream<'_>, Error> {
        self.cli._query_raw::<_, crate::ZeroParam>(
            PortalQuery {
                name: &self.name,
                columns: self.stmt.columns(),
                max_rows,
            },
            [],
        )
    }
}