zero_postgres/
handler.rs

//! Typed result handlers for query results, plus the handler trait for asynchronous server messages.
2
3use crate::conversion::FromRow;
4use crate::error::Result;
5use crate::protocol::backend::query::{CommandComplete, DataRow, RowDescription};
6use crate::state::action::AsyncMessage;
7
8/// Handler for simple query results (text format).
9///
10/// Callback patterns by statement type:
11/// - SELECT with rows: `result_start` → `row*` → `result_end`
12/// - SELECT with 0 rows: `result_start` → `result_end`
13/// - INSERT/UPDATE/DELETE: `result_end` only (with affected row count)
14///
15/// For multi-statement queries like `"SELECT 1; UPDATE foo SET x=1"`:
16/// ```text
17/// result_start → row* → result_end   // SELECT 1
18/// result_end                          // UPDATE
19/// ```
20pub trait TextHandler {
21    /// Called when a result set begins.
22    fn result_start(&mut self, cols: RowDescription<'_>) -> Result<()> {
23        let _ = cols;
24        Ok(())
25    }
26
27    /// Called for each data row.
28    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()>;
29
30    /// Called when a result set ends.
31    fn result_end(&mut self, complete: CommandComplete<'_>) -> Result<()> {
32        let _ = complete;
33        Ok(())
34    }
35}
36
37/// Handler for extended query results (binary format).
38///
39/// Callback patterns by statement type:
40/// - SELECT with rows: `result_start` → `row*` → `result_end`
41/// - SELECT with 0 rows: `result_start` → `result_end`
42/// - INSERT/UPDATE/DELETE: `result_end` only (with affected row count)
43pub trait BinaryHandler {
44    /// Called when a result set begins.
45    fn result_start(&mut self, cols: RowDescription<'_>) -> Result<()> {
46        let _ = cols;
47        Ok(())
48    }
49
50    /// Called for each data row.
51    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()>;
52
53    /// Called when a result set ends.
54    fn result_end(&mut self, complete: CommandComplete<'_>) -> Result<()> {
55        let _ = complete;
56        Ok(())
57    }
58}
59
/// A handler that throws away every row it is given.
///
/// The only state it keeps is an optional affected-row count, readable through
/// [`DropHandler::rows_affected`].
#[derive(Debug, Default)]
pub struct DropHandler {
    /// Most recently recorded affected-row count; `None` when nothing has
    /// been recorded.
    rows_affected: Option<u64>,
}

impl DropHandler {
    /// Builds a handler that has not recorded any row count yet.
    pub fn new() -> Self {
        Self {
            rows_affected: None,
        }
    }

    /// Returns the recorded affected-row count, or `None` when there is none.
    pub fn rows_affected(&self) -> Option<u64> {
        let Self { rows_affected } = self;
        *rows_affected
    }
}
77
78impl TextHandler for DropHandler {
79    fn row(&mut self, _cols: RowDescription<'_>, _row: DataRow<'_>) -> Result<()> {
80        Ok(())
81    }
82
83    fn result_end(&mut self, complete: CommandComplete<'_>) -> Result<()> {
84        self.rows_affected = complete.rows_affected();
85        Ok(())
86    }
87}
88
89impl BinaryHandler for DropHandler {
90    fn row(&mut self, _cols: RowDescription<'_>, _row: DataRow<'_>) -> Result<()> {
91        Ok(())
92    }
93
94    fn result_end(&mut self, complete: CommandComplete<'_>) -> Result<()> {
95        self.rows_affected = complete.rows_affected();
96        Ok(())
97    }
98}
99
/// Handler that decodes every row into `T` and stores the results in order.
///
/// # Example
///
/// ```ignore
/// let mut handler: CollectHandler<(i32, String)> = CollectHandler::new();
/// conn.query("SELECT id, name FROM users", &mut handler)?;
/// for (id, name) in handler.into_rows() {
///     println!("{}: {}", id, name);
/// }
/// ```
#[derive(Debug)]
pub struct CollectHandler<T> {
    /// Rows decoded so far, in arrival order.
    rows: Vec<T>,
}

// Implemented by hand: `#[derive(Default)]` would require `T: Default`, yet an
// empty `Vec<T>` can be built for any `T`.
impl<T> Default for CollectHandler<T> {
    /// Equivalent to [`CollectHandler::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T> CollectHandler<T> {
    /// Create a new collect handler holding no rows.
    pub fn new() -> Self {
        Self { rows: Vec::new() }
    }

    /// Borrow the rows collected so far.
    pub fn rows(&self) -> &[T] {
        &self.rows
    }

    /// Consume the handler and return the collected rows.
    pub fn into_rows(self) -> Vec<T> {
        self.rows
    }

    /// Get the number of collected rows.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Check if no rows were collected.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }
}
142
143impl<T: for<'a> FromRow<'a>> TextHandler for CollectHandler<T> {
144    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
145        let typed_row = T::from_row_text(cols.fields(), row)?;
146        self.rows.push(typed_row);
147        Ok(())
148    }
149}
150
151impl<T: for<'a> FromRow<'a>> BinaryHandler for CollectHandler<T> {
152    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
153        let typed_row = T::from_row_binary(cols.fields(), row)?;
154        self.rows.push(typed_row);
155        Ok(())
156    }
157}
158
/// Handler that keeps only the first row it receives, decoded into `T`.
#[derive(Debug)]
pub struct FirstRowHandler<T> {
    /// The first decoded row, or `None` while no row has been stored.
    row: Option<T>,
}

// Implemented by hand: `#[derive(Default)]` would require `T: Default`, yet
// `None` is a valid `Option<T>` for any `T`.
impl<T> Default for FirstRowHandler<T> {
    /// Equivalent to [`FirstRowHandler::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T> FirstRowHandler<T> {
    /// Create a new first row handler with no row stored.
    pub fn new() -> Self {
        Self { row: None }
    }

    /// Borrow the first row if one has been stored.
    pub fn get(&self) -> Option<&T> {
        self.row.as_ref()
    }

    /// Consume the handler and return the first row, if any.
    pub fn into_row(self) -> Option<T> {
        self.row
    }
}
181
182impl<T: for<'a> FromRow<'a>> TextHandler for FirstRowHandler<T> {
183    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
184        if self.row.is_none() {
185            let typed_row = T::from_row_text(cols.fields(), row)?;
186            self.row = Some(typed_row);
187        }
188        Ok(())
189    }
190}
191
192impl<T: for<'a> FromRow<'a>> BinaryHandler for FirstRowHandler<T> {
193    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
194        if self.row.is_none() {
195            let typed_row = T::from_row_binary(cols.fields(), row)?;
196            self.row = Some(typed_row);
197        }
198        Ok(())
199    }
200}
201
/// Handler that decodes each row into `T` and passes it to a closure.
pub struct ForEachHandler<T, F> {
    /// Callback invoked with every decoded row.
    f: F,
    // `fn(T)` rather than `T`: the handler only ever hands a `T` to the
    // callback and never stores one, so its auto traits (`Send`/`Sync`) should
    // depend on `F` alone and not on the row type.
    _marker: std::marker::PhantomData<fn(T)>,
}

impl<T, F> ForEachHandler<T, F>
where
    F: FnMut(T),
{
    /// Create a new foreach handler that calls `f` for each decoded row.
    pub fn new(f: F) -> Self {
        Self {
            f,
            _marker: std::marker::PhantomData,
        }
    }
}
220
221impl<T: for<'a> FromRow<'a>, F: FnMut(T)> TextHandler for ForEachHandler<T, F> {
222    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
223        let typed_row = T::from_row_text(cols.fields(), row)?;
224        (self.f)(typed_row);
225        Ok(())
226    }
227}
228
229impl<T: for<'a> FromRow<'a>, F: FnMut(T)> BinaryHandler for ForEachHandler<T, F> {
230    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
231        let typed_row = T::from_row_binary(cols.fields(), row)?;
232        (self.f)(typed_row);
233        Ok(())
234    }
235}
236
237/// Handler for asynchronous messages from the server.
238///
239/// These messages can arrive at any time during query execution:
240/// - `Notification` - from LISTEN/NOTIFY
241/// - `Notice` - warnings and informational messages
242/// - `ParameterChanged` - server parameter updates
243///
244/// # Example
245///
246/// ```ignore
247/// use zero_postgres::{sync::Conn, AsyncMessage};
248///
249/// let mut conn = Conn::new(opts)?;
250///
251/// conn.set_async_message_handler(|msg: &AsyncMessage| {
252///     match msg {
253///         AsyncMessage::Notification { channel, payload, .. } => {
254///             println!("Notification on {}: {}", channel, payload);
255///         }
256///         AsyncMessage::Notice(err) => {
257///             println!("Notice: {:?}", err);
258///         }
259///         AsyncMessage::ParameterChanged { name, value } => {
260///             println!("Parameter {} changed to {}", name, value);
261///         }
262///     }
263/// });
264///
265/// // Subscribe to a channel
266/// conn.query_drop("LISTEN my_channel")?;
267///
268/// // Notifications will be delivered to the handler during any query
269/// conn.query_drop("")?; // empty query to poll for notifications
270/// ```
271pub trait AsyncMessageHandler: Send {
272    /// Handle an asynchronous message from the server.
273    fn handle(&mut self, msg: &AsyncMessage);
274}
275
276impl<F: FnMut(&AsyncMessage) + Send> AsyncMessageHandler for F {
277    fn handle(&mut self, msg: &AsyncMessage) {
278        self(msg)
279    }
280}