zero_postgres/tokio/
named_portal.rs

1//! Named portal for async iterative row fetching.
2
3use std::marker::PhantomData;
4
5use crate::conversion::FromRow;
6use crate::error::Result;
7use crate::handler::{BinaryHandler, CollectHandler};
8
9use super::Conn;
10
/// Handle to a named portal for async iterative row fetching.
///
/// Created by [`Transaction::exec_portal()`]. Use [`execute()`](Self::execute) or
/// [`execute_collect()`](Self::execute_collect) to retrieve rows in batches, and check
/// [`is_complete()`](Self::is_complete) to know when to stop. The lifetime parameter ties the
/// portal to the transaction that created it, preventing the transaction from being
/// committed/rolled back while the portal is alive.
///
/// # Example
///
/// ```ignore
/// conn.tx(|conn, tx| async move {
///     let mut portal = tx.exec_portal(conn, &stmt, ()).await?;
///
///     while !portal.is_complete() {
///         let rows: Vec<(i32,)> = portal.execute_collect(conn, 100).await?;
///         process(rows);
///     }
///
///     portal.close(conn).await?;
///     tx.commit(conn).await
/// }).await?;
/// ```
#[derive(Debug)]
pub struct NamedPortal<'tx> {
    /// Portal name; handed to the connection on every execute and on close.
    pub(crate) name: String,
    /// `true` once the most recent execute reported that no more rows remain.
    /// Starts out `false` for a freshly created portal.
    complete: bool,
    /// Zero-sized marker that carries the `'tx` lifetime; stores no data.
    _marker: PhantomData<&'tx ()>,
}
37
38impl<'tx> NamedPortal<'tx> {
39    /// Create a new named portal.
40    pub(crate) fn new(name: String) -> Self {
41        Self {
42            name,
43            complete: false,
44            _marker: PhantomData,
45        }
46    }
47
48    /// Get the portal name.
49    pub fn name(&self) -> &str {
50        &self.name
51    }
52
53    /// Check if portal execution is complete (no more rows available).
54    pub fn is_complete(&self) -> bool {
55        self.complete
56    }
57
58    /// Execute the portal with a handler.
59    ///
60    /// Fetches up to `max_rows` rows. Pass 0 to fetch all remaining rows.
61    /// Updates internal completion status.
62    pub async fn execute<H: BinaryHandler>(
63        &mut self,
64        conn: &mut Conn,
65        max_rows: u32,
66        handler: &mut H,
67    ) -> Result<()> {
68        let has_more = conn.lowlevel_execute(&self.name, max_rows, handler).await?;
69        self.complete = !has_more;
70        Ok(())
71    }
72
73    /// Execute the portal and collect typed rows.
74    ///
75    /// Fetches up to `max_rows` rows. Pass 0 to fetch all remaining rows.
76    pub async fn execute_collect<T: for<'a> FromRow<'a>>(
77        &mut self,
78        conn: &mut Conn,
79        max_rows: u32,
80    ) -> Result<Vec<T>> {
81        let mut handler = CollectHandler::<T>::new();
82        self.execute(conn, max_rows, &mut handler).await?;
83        Ok(handler.into_rows())
84    }
85
86    /// Close the portal and sync.
87    ///
88    /// This sends Close(Portal) followed by Sync to end the transaction.
89    pub async fn close(self, conn: &mut Conn) -> Result<()> {
90        conn.lowlevel_close_portal(&self.name).await?;
91        conn.lowlevel_sync().await
92    }
93}