zero_postgres/tokio/named_portal.rs
1//! Named portal for async iterative row fetching.
2
3use std::marker::PhantomData;
4
5use crate::conversion::FromRow;
6use crate::error::Result;
7use crate::handler::{BinaryHandler, CollectHandler};
8
9use super::Conn;
10
/// Handle to a named portal for async iterative row fetching.
///
/// Created by [`Transaction::exec_portal()`]. Use [`execute()`](Self::execute) to retrieve rows
/// in batches. The lifetime parameter ties the portal to the transaction that created it,
/// preventing the transaction from being committed/rolled back while the portal is alive.
///
/// The handle implements [`Debug`](std::fmt::Debug) so it can be logged or embedded in
/// other `Debug` types.
///
/// # Example
///
/// ```ignore
/// conn.tx(|conn, tx| async move {
///     let mut portal = tx.exec_portal(conn, &stmt, ()).await?;
///
///     while !portal.is_complete() {
///         let rows: Vec<(i32,)> = portal.execute_collect(conn, 100).await?;
///         process(rows);
///     }
///
///     portal.close(conn).await?;
///     tx.commit(conn).await
/// }).await?;
/// ```
#[derive(Debug)]
pub struct NamedPortal<'tx> {
    /// Name identifying this portal in the connection's low-level calls.
    pub(crate) name: String,
    /// `true` once the most recent execution reported that no rows remain.
    complete: bool,
    /// Zero-sized marker that carries the `'tx` lifetime; stores no data.
    _marker: PhantomData<&'tx ()>,
}
37
38impl<'tx> NamedPortal<'tx> {
39 /// Create a new named portal.
40 pub(crate) fn new(name: String) -> Self {
41 Self {
42 name,
43 complete: false,
44 _marker: PhantomData,
45 }
46 }
47
48 /// Get the portal name.
49 pub fn name(&self) -> &str {
50 &self.name
51 }
52
53 /// Check if portal execution is complete (no more rows available).
54 pub fn is_complete(&self) -> bool {
55 self.complete
56 }
57
58 /// Execute the portal with a handler.
59 ///
60 /// Fetches up to `max_rows` rows. Pass 0 to fetch all remaining rows.
61 /// Updates internal completion status.
62 pub async fn execute<H: BinaryHandler>(
63 &mut self,
64 conn: &mut Conn,
65 max_rows: u32,
66 handler: &mut H,
67 ) -> Result<()> {
68 let has_more = conn.lowlevel_execute(&self.name, max_rows, handler).await?;
69 self.complete = !has_more;
70 Ok(())
71 }
72
73 /// Execute the portal and collect typed rows.
74 ///
75 /// Fetches up to `max_rows` rows. Pass 0 to fetch all remaining rows.
76 pub async fn execute_collect<T: for<'a> FromRow<'a>>(
77 &mut self,
78 conn: &mut Conn,
79 max_rows: u32,
80 ) -> Result<Vec<T>> {
81 let mut handler = CollectHandler::<T>::new();
82 self.execute(conn, max_rows, &mut handler).await?;
83 Ok(handler.into_rows())
84 }
85
86 /// Close the portal and sync.
87 ///
88 /// This sends Close(Portal) followed by Sync to end the transaction.
89 pub async fn close(self, conn: &mut Conn) -> Result<()> {
90 conn.lowlevel_close_portal(&self.name).await?;
91 conn.lowlevel_sync().await
92 }
93}