Skip to main content

zero_postgres/
handler.rs

1//! Handlers define what to do with received column definition and row packets.
2
3use crate::conversion::FromRow;
4use crate::error::Result;
5use crate::protocol::backend::query::{CommandComplete, DataRow, RowDescription};
6use crate::state::action::AsyncMessage;
7
8/// Handler for simple query protocol.
9///
10/// Callback patterns by statement type:
11/// - SELECT with rows: `result_start` → `row*` → `result_end`
12/// - SELECT with 0 rows: `result_start` → `result_end`
13/// - INSERT/UPDATE/DELETE: `result_end` only (with affected row count)
14///
15/// For multi-statement queries like `"SELECT 1; UPDATE foo SET x=1"`:
16/// ```text
17/// result_start → row* → result_end   // SELECT 1
18/// result_end                          // UPDATE
19/// ```
20pub trait SimpleHandler {
21    /// Called when a result set begins.
22    fn result_start(&mut self, cols: RowDescription<'_>) -> Result<()> {
23        let _ = cols;
24        Ok(())
25    }
26
27    /// Called for each data row.
28    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()>;
29
30    /// Called when a result set ends.
31    fn result_end(&mut self, complete: CommandComplete<'_>) -> Result<()> {
32        let _ = complete;
33        Ok(())
34    }
35}
36
37/// Handler for extended query protocol.
38///
39/// Callback patterns by statement type:
40/// - SELECT with rows: `result_start` → `row*` → `result_end`
41/// - SELECT with 0 rows: `result_start` → `result_end`
42/// - INSERT/UPDATE/DELETE: `result_end` only (with affected row count)
43pub trait ExtendedHandler {
44    /// Called when a result set begins.
45    fn result_start(&mut self, cols: RowDescription<'_>) -> Result<()> {
46        let _ = cols;
47        Ok(())
48    }
49
50    /// Called for each data row.
51    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()>;
52
53    /// Called when a result set ends.
54    fn result_end(&mut self, complete: CommandComplete<'_>) -> Result<()> {
55        let _ = complete;
56        Ok(())
57    }
58}
59
/// Handler that throws away every row it is given.
///
/// The only state it keeps is an optional affected-row count, readable through
/// [`DropHandler::rows_affected`].
#[derive(Debug, Default)]
pub struct DropHandler {
    rows_affected: Option<u64>,
}

impl DropHandler {
    /// Build a handler with no affected-row count recorded yet.
    pub fn new() -> Self {
        Self {
            rows_affected: None,
        }
    }

    /// Affected-row count currently stored, or `None` if nothing has been recorded.
    pub fn rows_affected(&self) -> Option<u64> {
        let Self { rows_affected } = self;
        *rows_affected
    }
}
77
78impl SimpleHandler for DropHandler {
79    fn row(&mut self, _cols: RowDescription<'_>, _row: DataRow<'_>) -> Result<()> {
80        Ok(())
81    }
82
83    fn result_end(&mut self, complete: CommandComplete<'_>) -> Result<()> {
84        self.rows_affected = complete.rows_affected();
85        Ok(())
86    }
87}
88
89impl ExtendedHandler for DropHandler {
90    fn row(&mut self, _cols: RowDescription<'_>, _row: DataRow<'_>) -> Result<()> {
91        Ok(())
92    }
93
94    fn result_end(&mut self, complete: CommandComplete<'_>) -> Result<()> {
95        self.rows_affected = complete.rows_affected();
96        Ok(())
97    }
98}
99
/// Handler that collects typed rows.
///
/// Every row handed to the handler is decoded into `T` and appended to an
/// internal `Vec<T>`, which can be borrowed with [`CollectHandler::rows`] or
/// taken with [`CollectHandler::into_rows`].
///
/// # Example
///
/// ```ignore
/// let mut handler: CollectHandler<(i32, String)> = CollectHandler::new();
/// conn.query("SELECT id, name FROM users", &mut handler)?;
/// for (id, name) in handler.into_rows() {
///     println!("{}: {}", id, name);
/// }
/// ```
#[derive(Debug)]
pub struct CollectHandler<T> {
    rows: Vec<T>,
}

// Written by hand on purpose: `#[derive(Default)]` would require `T: Default`,
// although an empty `Vec<T>` can be built for any `T`.
impl<T> Default for CollectHandler<T> {
    /// Create a handler with no collected rows.
    fn default() -> Self {
        Self { rows: Vec::new() }
    }
}

impl<T> CollectHandler<T> {
    /// Create a new collect handler with no rows.
    pub fn new() -> Self {
        Self::default()
    }

    /// Borrow the rows collected so far.
    pub fn rows(&self) -> &[T] {
        &self.rows
    }

    /// Consume the handler and return the collected rows.
    pub fn into_rows(self) -> Vec<T> {
        self.rows
    }

    /// Get the number of collected rows.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Check if no rows were collected.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }
}
142
143impl<T: for<'a> FromRow<'a>> SimpleHandler for CollectHandler<T> {
144    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
145        let typed_row = T::from_row_text(cols.fields(), row)?;
146        self.rows.push(typed_row);
147        Ok(())
148    }
149}
150
151impl<T: for<'a> FromRow<'a>> ExtendedHandler for CollectHandler<T> {
152    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
153        let typed_row = T::from_row_binary(cols.fields(), row)?;
154        self.rows.push(typed_row);
155        Ok(())
156    }
157}
158
/// Handler that collects only the first row.
///
/// The stored row can be borrowed with [`FirstRowHandler::get`] or taken with
/// [`FirstRowHandler::into_row`]; both yield `None` when no row was stored.
#[derive(Debug)]
pub struct FirstRowHandler<T> {
    row: Option<T>,
}

// Written by hand on purpose: `#[derive(Default)]` would require `T: Default`,
// although `None` is a valid starting state for any `T`.
impl<T> Default for FirstRowHandler<T> {
    /// Create a handler that holds no row.
    fn default() -> Self {
        Self { row: None }
    }
}

impl<T> FirstRowHandler<T> {
    /// Create a new first row handler that holds no row.
    pub fn new() -> Self {
        Self::default()
    }

    /// Borrow the first row if present.
    pub fn get(&self) -> Option<&T> {
        self.row.as_ref()
    }

    /// Consume the handler and return the first row, if any.
    pub fn into_row(self) -> Option<T> {
        self.row
    }
}
181
182impl<T: for<'a> FromRow<'a>> SimpleHandler for FirstRowHandler<T> {
183    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
184        if self.row.is_none() {
185            let typed_row = T::from_row_text(cols.fields(), row)?;
186            self.row = Some(typed_row);
187        }
188        Ok(())
189    }
190}
191
192impl<T: for<'a> FromRow<'a>> ExtendedHandler for FirstRowHandler<T> {
193    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
194        if self.row.is_none() {
195            let typed_row = T::from_row_binary(cols.fields(), row)?;
196            self.row = Some(typed_row);
197        }
198        Ok(())
199    }
200}
201
202/// Handler that calls a closure for each row.
203pub struct ForEachHandler<T, F> {
204    f: F,
205    _marker: std::marker::PhantomData<T>,
206}
207
208impl<T, F> ForEachHandler<T, F>
209where
210    F: FnMut(T) -> Result<()>,
211{
212    /// Create a new foreach handler.
213    pub fn new(f: F) -> Self {
214        Self {
215            f,
216            _marker: std::marker::PhantomData,
217        }
218    }
219}
220
221impl<T: for<'a> FromRow<'a>, F: FnMut(T) -> Result<()>> SimpleHandler for ForEachHandler<T, F> {
222    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
223        let typed_row = T::from_row_text(cols.fields(), row)?;
224        (self.f)(typed_row)
225    }
226}
227
228impl<T: for<'a> FromRow<'a>, F: FnMut(T) -> Result<()>> ExtendedHandler for ForEachHandler<T, F> {
229    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
230        let typed_row = T::from_row_binary(cols.fields(), row)?;
231        (self.f)(typed_row)
232    }
233}
234
/// Handler that invokes a closure for each row, decoding through zero-copy `RefFromRow`.
///
/// In contrast to `ForEachHandler`, rows are decoded with `RefFromRow` as
/// zero-copy references into the buffer, and the closure is handed a
/// reference to the decoded struct.
///
/// # Requirements
///
/// - The row type must implement `RefFromRow`
/// - All struct fields must use `LengthPrefixed<T>` with big-endian types
/// - All columns must be `NOT NULL`
pub struct ForEachRefHandler<Row, F> {
    f: F,
    _marker: std::marker::PhantomData<Row>,
}

impl<Row, F> ForEachRefHandler<Row, F> {
    /// Wrap the closure `f` in a new handler.
    ///
    /// No bounds are checked here; they apply where the handler is used as an
    /// `ExtendedHandler`.
    pub fn new(f: F) -> Self {
        let _marker = std::marker::PhantomData::<Row>;
        Self { f, _marker }
    }
}
259
260impl<Row, F> ExtendedHandler for ForEachRefHandler<Row, F>
261where
262    Row: for<'a> crate::conversion::ref_row::RefFromRow<'a>,
263    F: for<'a> FnMut(&'a Row) -> Result<()>,
264{
265    fn row(&mut self, cols: RowDescription<'_>, row: DataRow<'_>) -> Result<()> {
266        let parsed = Row::ref_from_row_binary(cols.fields(), row)?;
267        (self.f)(parsed)
268    }
269}
270
271/// Handler for asynchronous messages from the server.
272///
273/// These messages can arrive at any time during query execution:
274/// - `Notification` - from LISTEN/NOTIFY
275/// - `Notice` - warnings and informational messages
276/// - `ParameterChanged` - server parameter updates
277///
278/// # Example
279///
280/// ```ignore
281/// use zero_postgres::{sync::Conn, AsyncMessage};
282///
283/// let mut conn = Conn::new(opts)?;
284///
285/// conn.set_async_message_handler(|msg: &AsyncMessage| {
286///     match msg {
287///         AsyncMessage::Notification { channel, payload, .. } => {
288///             println!("Notification on {}: {}", channel, payload);
289///         }
290///         AsyncMessage::Notice(err) => {
291///             println!("Notice: {:?}", err);
292///         }
293///         AsyncMessage::ParameterChanged { name, value } => {
294///             println!("Parameter {} changed to {}", name, value);
295///         }
296///     }
297/// });
298///
299/// // Subscribe to a channel
300/// conn.query_drop("LISTEN my_channel")?;
301///
302/// // Notifications will be delivered to the handler during any query
303/// conn.query_drop("")?; // empty query to poll for notifications
304/// ```
305pub trait AsyncMessageHandler: Send {
306    /// Handle an asynchronous message from the server.
307    fn handle(&mut self, msg: &AsyncMessage);
308}
309
310impl<F: FnMut(&AsyncMessage) + Send> AsyncMessageHandler for F {
311    fn handle(&mut self, msg: &AsyncMessage) {
312        self(msg)
313    }
314}