zero_postgres/sync/named_portal.rs
1//! Named portal for iterative row fetching.
2
3use std::marker::PhantomData;
4
5use crate::conversion::FromRow;
6use crate::error::Result;
7use crate::handler::{CollectHandler, ExtendedHandler, ForEachHandler};
8
9use super::Conn;
10
11/// Handle to a named portal for iterative row fetching.
12///
13/// Created by [`Transaction::exec_portal_named()`]. Use [`exec()`](Self::exec) to retrieve rows
14/// in batches. The lifetime parameter ties the portal to the transaction that created it,
15/// preventing the transaction from being committed/rolled back while the portal is alive.
16///
17/// # Example
18///
19/// ```ignore
20/// let tx = conn.begin()?;
21/// let mut portal = tx.exec_portal_named(&mut conn, &stmt, ())?;
22///
23/// while !portal.is_complete() {
24/// let rows: Vec<(i32,)> = portal.exec_collect(&mut conn, 100)?;
25/// process(rows);
26/// }
27///
28/// portal.close(&mut conn)?;
29/// tx.commit(&mut conn)?;
30/// ```
31pub struct NamedPortal<'tx> {
32 pub(crate) name: String,
33 complete: bool,
34 _marker: PhantomData<&'tx ()>,
35}
36
37impl<'tx> NamedPortal<'tx> {
38 /// Create a new named portal.
39 pub(crate) fn new(name: String) -> Self {
40 Self {
41 name,
42 complete: false,
43 _marker: PhantomData,
44 }
45 }
46
47 /// Get the portal name.
48 pub fn name(&self) -> &str {
49 &self.name
50 }
51
52 /// Check if portal execution is complete (no more rows available).
53 pub fn is_complete(&self) -> bool {
54 self.complete
55 }
56
57 /// Execute the portal with a handler.
58 ///
59 /// Fetches up to `max_rows` rows. Pass 0 to fetch all remaining rows.
60 /// Updates internal completion status.
61 pub fn exec<H: ExtendedHandler>(
62 &mut self,
63 conn: &mut Conn,
64 max_rows: u32,
65 handler: &mut H,
66 ) -> Result<()> {
67 let has_more = conn.lowlevel_execute(&self.name, max_rows, handler)?;
68 self.complete = !has_more;
69 Ok(())
70 }
71
72 /// Execute the portal and collect typed rows.
73 ///
74 /// Fetches up to `max_rows` rows. Pass 0 to fetch all remaining rows.
75 pub fn exec_collect<T: for<'a> FromRow<'a>>(
76 &mut self,
77 conn: &mut Conn,
78 max_rows: u32,
79 ) -> Result<Vec<T>> {
80 let mut handler = CollectHandler::<T>::new();
81 self.exec(conn, max_rows, &mut handler)?;
82 Ok(handler.into_rows())
83 }
84
85 /// Execute the portal and call a closure for each row.
86 ///
87 /// Fetches up to `max_rows` rows. Pass 0 to fetch all remaining rows.
88 pub fn exec_foreach<T: for<'a> FromRow<'a>, F: FnMut(T) -> Result<()>>(
89 &mut self,
90 conn: &mut Conn,
91 max_rows: u32,
92 f: F,
93 ) -> Result<()> {
94 let mut handler = ForEachHandler::<T, F>::new(f);
95 self.exec(conn, max_rows, &mut handler)
96 }
97
98 /// Close the portal and sync.
99 ///
100 /// This sends Close(Portal) followed by Sync to end the transaction.
101 pub fn close(self, conn: &mut Conn) -> Result<()> {
102 conn.lowlevel_close_portal(&self.name)?;
103 conn.lowlevel_sync()
104 }
105}