zero_postgres/sync/pipeline/
mod.rs

1//! Pipeline mode for batching multiple queries.
2//!
3//! Pipeline mode allows sending multiple queries to the server without waiting
4//! for responses, reducing round-trip latency.
5//!
6//! # Example
7//!
8//! ```ignore
9//! // Prepare statements outside the pipeline
10//! let stmts = conn.prepare_batch(&[
11//!     "SELECT id, name FROM users WHERE active = $1",
12//!     "INSERT INTO users (name) VALUES ($1) RETURNING id",
13//! ])?;
14//!
15//! let (active, inactive, count) = conn.pipeline(|p| {
16//!     // Queue executions
17//!     let t1 = p.exec(&stmts[0], (true,))?;
18//!     let t2 = p.exec(&stmts[0], (false,))?;
19//!     let t3 = p.exec("SELECT COUNT(*) FROM users", ())?;
20//!
21//!     p.sync()?;
22//!
23//!     // Claim results in order with different methods
24//!     let active: Vec<(i32, String)> = p.claim_collect(t1)?;
25//!     let inactive: Option<(i32, String)> = p.claim_one(t2)?;
26//!     let count: Vec<(i64,)> = p.claim_collect(t3)?;
27//!
28//!     Ok((active, inactive, count))
29//! })?;
30//! ```
31
32use std::collections::VecDeque;
33
34use crate::pipeline::Expectation;
35use crate::pipeline::Ticket;
36
37use crate::conversion::{FromRow, ToParams};
38use crate::error::{Error, Result};
39use crate::handler::ExtendedHandler;
40use crate::protocol::backend::{
41    BindComplete, CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, NoData,
42    ParseComplete, RawMessage, ReadyForQuery, RowDescription, msg_type,
43};
44use crate::protocol::frontend::{
45    write_bind, write_describe_portal, write_execute, write_flush, write_parse, write_sync,
46};
47use crate::state::extended::PreparedStatement;
48use crate::statement::IntoStatement;
49
50use super::conn::Conn;
51
52/// Pipeline mode for batching multiple queries.
53///
54/// Created by [`Conn::pipeline`].
55pub struct Pipeline<'a> {
56    conn: &'a mut Conn,
57    /// Monotonically increasing counter for queued operations
58    queue_seq: usize,
59    /// Next sequence number to claim
60    claim_seq: usize,
61    /// Whether the pipeline is in aborted state (error occurred)
62    aborted: bool,
63    /// Expected responses queue (exec operations and Sync points)
64    expectations: VecDeque<Expectation>,
65}
66
67impl<'a> Pipeline<'a> {
68    /// Create a new pipeline.
69    ///
70    /// Prefer using [`Conn::pipeline`] which handles cleanup automatically.
71    /// This constructor is available for advanced use cases.
72    #[cfg(feature = "lowlevel")]
73    pub fn new(conn: &'a mut Conn) -> Self {
74        Self::new_inner(conn)
75    }
76
77    /// Create a new pipeline (internal).
78    pub(crate) fn new_inner(conn: &'a mut Conn) -> Self {
79        conn.buffer_set.write_buffer.clear();
80        Self {
81            conn,
82            queue_seq: 0,
83            claim_seq: 0,
84            aborted: false,
85            expectations: VecDeque::new(),
86        }
87    }
88
89    /// Cleanup the pipeline, draining any unclaimed tickets.
90    ///
91    /// This is called automatically by [`Conn::pipeline`].
92    /// Also available with the `lowlevel` feature for manual cleanup.
93    #[cfg(feature = "lowlevel")]
94    pub fn cleanup(&mut self) {
95        self.cleanup_inner();
96    }
97
98    #[cfg(not(feature = "lowlevel"))]
99    pub(crate) fn cleanup(&mut self) {
100        self.cleanup_inner();
101    }
102
103    fn cleanup_inner(&mut self) {
104        // Nothing to clean up if no operations were queued and no expectations pending
105        if self.queue_seq == 0 && self.expectations.is_empty() {
106            return;
107        }
108
109        // Send sync if we have pending operations that weren't synced
110        if !self.conn.buffer_set.write_buffer.is_empty() {
111            let _ = self.sync();
112        } else if !self.expectations.iter().any(|e| *e == Expectation::Sync) {
113            // Buffer was flushed but no sync sent - send one now
114            let _ = self.sync();
115        }
116
117        // Drain remaining expectations
118        if self.aborted {
119            // In aborted state, server skipped remaining commands - only consume ReadyForQuery(s)
120            while let Some(expectation) = self.expectations.pop_front() {
121                if expectation == Expectation::Sync {
122                    let _ = self.consume_ready_for_query();
123                }
124            }
125        } else {
126            // Normal drain: process all expectations
127            while let Some(expectation) = self.expectations.pop_front() {
128                let _ = self.drain_expectation(expectation);
129            }
130        }
131
132        // Reset state
133        self.queue_seq = 0;
134        self.claim_seq = 0;
135        self.aborted = false;
136    }
137
138    /// Drain a single expectation.
139    fn drain_expectation(&mut self, expectation: Expectation) {
140        let mut handler = crate::handler::DropHandler::new();
141        let _ = match expectation {
142            Expectation::ParseBindExecute => self.claim_parse_bind_exec_inner(&mut handler),
143            Expectation::BindExecute => self.claim_bind_exec_inner(&mut handler, None),
144            Expectation::Sync => self.consume_ready_for_query(),
145        };
146    }
147
148    // ========================================================================
149    // Queue Operations
150    // ========================================================================
151
152    /// Queue a statement execution.
153    ///
154    /// The statement can be either:
155    /// - A `&PreparedStatement` returned from `conn.prepare()` or `conn.prepare_batch()`
156    /// - A raw SQL `&str` for one-shot execution
157    ///
158    /// This method only buffers the command locally - no network I/O occurs until
159    /// `sync()` or `flush()` is called.
160    ///
161    /// # Example
162    ///
163    /// ```ignore
164    /// let stmt = conn.prepare("SELECT id, name FROM users WHERE id = $1")?;
165    ///
166    /// let (r1, r2) = conn.pipeline(|p| {
167    ///     let t1 = p.exec(&stmt, (1,))?;
168    ///     let t2 = p.exec("SELECT COUNT(*) FROM users", ())?;
169    ///     p.sync()?;
170    ///
171    ///     let r1: Vec<(i32, String)> = p.claim_collect(t1)?;
172    ///     let r2: Option<(i64,)> = p.claim_one(t2)?;
173    ///     Ok((r1, r2))
174    /// })?;
175    /// ```
176    pub fn exec<'s, P: ToParams>(
177        &mut self,
178        statement: &'s (impl IntoStatement + ?Sized),
179        params: P,
180    ) -> Result<Ticket<'s>> {
181        let seq = self.queue_seq;
182        self.queue_seq += 1;
183
184        if statement.needs_parse() {
185            self.exec_sql_inner(statement.as_sql().unwrap(), &params)?;
186            Ok(Ticket { seq, stmt: None })
187        } else {
188            let stmt = statement.as_prepared().unwrap();
189            self.exec_prepared_inner(&stmt.wire_name(), &stmt.param_oids, &params)?;
190            Ok(Ticket {
191                seq,
192                stmt: Some(stmt),
193            })
194        }
195    }
196
197    fn exec_sql_inner<P: ToParams>(&mut self, sql: &str, params: &P) -> Result<()> {
198        let param_oids = params.natural_oids();
199        let buf = &mut self.conn.buffer_set.write_buffer;
200        write_parse(buf, "", sql, &param_oids);
201        write_bind(buf, "", "", params, &param_oids)?;
202        write_describe_portal(buf, "");
203        write_execute(buf, "", 0);
204        self.expectations.push_back(Expectation::ParseBindExecute);
205        Ok(())
206    }
207
208    fn exec_prepared_inner<P: ToParams>(
209        &mut self,
210        stmt_name: &str,
211        param_oids: &[u32],
212        params: &P,
213    ) -> Result<()> {
214        let buf = &mut self.conn.buffer_set.write_buffer;
215        write_bind(buf, "", stmt_name, params, param_oids)?;
216        // Skip write_describe_portal - use cached RowDescription from PreparedStatement
217        write_execute(buf, "", 0);
218        self.expectations.push_back(Expectation::BindExecute);
219        Ok(())
220    }
221
222    /// Send a FLUSH message to trigger server response.
223    ///
224    /// This forces the server to send all pending responses without establishing
225    /// a transaction boundary. Called automatically by claim methods when needed.
226    pub fn flush(&mut self) -> Result<()> {
227        if !self.conn.buffer_set.write_buffer.is_empty() {
228            write_flush(&mut self.conn.buffer_set.write_buffer);
229            self.conn
230                .stream
231                .write_all(&self.conn.buffer_set.write_buffer)?;
232            self.conn.stream.flush()?;
233            self.conn.buffer_set.write_buffer.clear();
234        }
235        Ok(())
236    }
237
238    /// Send a SYNC message to establish a transaction boundary.
239    ///
240    /// After calling sync, you must claim all queued operations in order.
241    /// The ReadyForQuery message will be consumed automatically after claiming.
242    pub fn sync(&mut self) -> Result<()> {
243        let result = self.sync_inner();
244        if let Err(e) = &result
245            && e.is_connection_broken()
246        {
247            self.conn.is_broken = true;
248        }
249        result
250    }
251
252    fn sync_inner(&mut self) -> Result<()> {
253        write_sync(&mut self.conn.buffer_set.write_buffer);
254        self.expectations.push_back(Expectation::Sync);
255        self.conn
256            .stream
257            .write_all(&self.conn.buffer_set.write_buffer)?;
258        self.conn.stream.flush()?;
259        self.conn.buffer_set.write_buffer.clear();
260        Ok(())
261    }
262
263    /// Consume a single ReadyForQuery message.
264    fn consume_ready_for_query(&mut self) -> Result<()> {
265        loop {
266            self.conn.stream.read_message(&mut self.conn.buffer_set)?;
267            let type_byte = self.conn.buffer_set.type_byte;
268
269            if RawMessage::is_async_type(type_byte) {
270                continue;
271            }
272
273            if type_byte == msg_type::ERROR_RESPONSE {
274                let error = ErrorResponse::parse(&self.conn.buffer_set.read_buffer)?;
275                return Err(error.into_error());
276            }
277
278            if type_byte == msg_type::READY_FOR_QUERY {
279                let ready = ReadyForQuery::parse(&self.conn.buffer_set.read_buffer)?;
280                self.conn.transaction_status = ready.transaction_status().unwrap_or_default();
281                return Ok(());
282            }
283        }
284    }
285
286    /// Consume all pending Sync expectations from the front of the queue.
287    fn consume_pending_syncs(&mut self) -> Result<()> {
288        while self.expectations.front() == Some(&Expectation::Sync) {
289            self.expectations.pop_front();
290            self.consume_ready_for_query()?;
291            // Reset aborted state - after ReadyForQuery, pipeline can continue
292            self.aborted = false;
293        }
294        Ok(())
295    }
296
297    // ========================================================================
298    // Claim Operations
299    // ========================================================================
300
301    /// Claim with a custom handler.
302    ///
303    /// Results must be claimed in the same order they were queued.
304    #[cfg(feature = "lowlevel")]
305    pub fn claim<H: ExtendedHandler>(&mut self, ticket: Ticket<'_>, handler: &mut H) -> Result<()> {
306        self.claim_with_handler(ticket, handler)
307    }
308
309    fn claim_with_handler<H: ExtendedHandler>(
310        &mut self,
311        ticket: Ticket<'_>,
312        handler: &mut H,
313    ) -> Result<()> {
314        self.check_sequence(ticket.seq)?;
315
316        // Auto-sync if buffer has unsent data
317        if !self.conn.buffer_set.write_buffer.is_empty() {
318            self.sync()?;
319        }
320
321        if self.aborted {
322            self.claim_seq += 1;
323            // Pop but don't process the exec expectation (server skipped it)
324            self.expectations.pop_front();
325            self.consume_pending_syncs()?;
326            return Err(Error::Protocol(
327                "pipeline aborted due to earlier error".into(),
328            ));
329        }
330
331        let expectation = self.expectations.pop_front();
332
333        let result = match expectation {
334            Some(Expectation::ParseBindExecute) => self.claim_parse_bind_exec_inner(handler),
335            Some(Expectation::BindExecute) => self.claim_bind_exec_inner(handler, ticket.stmt),
336            Some(Expectation::Sync) => Err(Error::Protocol("unexpected Sync expectation".into())),
337            None => Err(Error::Protocol("no expectation in queue".into())),
338        };
339
340        if let Err(e) = &result {
341            if e.is_connection_broken() {
342                self.conn.is_broken = true;
343            }
344            self.aborted = true;
345        }
346        self.claim_seq += 1;
347        self.consume_pending_syncs()?;
348        result
349    }
350
351    /// Claim and collect all rows.
352    ///
353    /// Results must be claimed in the same order they were queued.
354    pub fn claim_collect<T: for<'b> FromRow<'b>>(&mut self, ticket: Ticket<'_>) -> Result<Vec<T>> {
355        let mut handler = crate::handler::CollectHandler::<T>::new();
356        self.claim_with_handler(ticket, &mut handler)?;
357        Ok(handler.into_rows())
358    }
359
360    /// Claim and return just the first row.
361    ///
362    /// Results must be claimed in the same order they were queued.
363    pub fn claim_one<T: for<'b> FromRow<'b>>(&mut self, ticket: Ticket<'_>) -> Result<Option<T>> {
364        let mut handler = crate::handler::FirstRowHandler::<T>::new();
365        self.claim_with_handler(ticket, &mut handler)?;
366        Ok(handler.into_row())
367    }
368
369    /// Claim and discard all rows.
370    ///
371    /// Results must be claimed in the same order they were queued.
372    pub fn claim_drop(&mut self, ticket: Ticket<'_>) -> Result<()> {
373        let mut handler = crate::handler::DropHandler::new();
374        self.claim_with_handler(ticket, &mut handler)
375    }
376
377    /// Check that the ticket sequence matches the expected claim sequence.
378    fn check_sequence(&self, seq: usize) -> Result<()> {
379        if seq != self.claim_seq {
380            return Err(Error::InvalidUsage(format!(
381                "claim out of order: expected seq {}, got {}",
382                self.claim_seq, seq
383            )));
384        }
385        Ok(())
386    }
387
388    /// Claim Parse + Bind + Execute (for raw SQL exec() calls).
389    fn claim_parse_bind_exec_inner<H: ExtendedHandler>(&mut self, handler: &mut H) -> Result<()> {
390        // Expect: ParseComplete
391        self.read_next_message()?;
392        if self.conn.buffer_set.type_byte != msg_type::PARSE_COMPLETE {
393            return self.unexpected_message("ParseComplete");
394        }
395        ParseComplete::parse(&self.conn.buffer_set.read_buffer)?;
396
397        // Expect: BindComplete
398        self.read_next_message()?;
399        if self.conn.buffer_set.type_byte != msg_type::BIND_COMPLETE {
400            return self.unexpected_message("BindComplete");
401        }
402        BindComplete::parse(&self.conn.buffer_set.read_buffer)?;
403
404        // Now read rows
405        self.claim_rows_inner(handler)
406    }
407
408    /// Claim Bind + Execute (for prepared statement exec() calls).
409    fn claim_bind_exec_inner<H: ExtendedHandler>(
410        &mut self,
411        handler: &mut H,
412        stmt: Option<&PreparedStatement>,
413    ) -> Result<()> {
414        // Expect: BindComplete
415        self.read_next_message()?;
416        if self.conn.buffer_set.type_byte != msg_type::BIND_COMPLETE {
417            return self.unexpected_message("BindComplete");
418        }
419        BindComplete::parse(&self.conn.buffer_set.read_buffer)?;
420
421        // Use cached RowDescription from PreparedStatement (no copy)
422        let row_desc = stmt.and_then(|s| s.row_desc_payload());
423
424        // Now read rows (no RowDescription/NoData expected from server)
425        self.claim_rows_cached_inner(handler, row_desc)
426    }
427
428    /// Common row reading logic (reads RowDescription from server).
429    fn claim_rows_inner<H: ExtendedHandler>(&mut self, handler: &mut H) -> Result<()> {
430        // Expect RowDescription or NoData
431        self.read_next_message()?;
432        let has_rows = match self.conn.buffer_set.type_byte {
433            msg_type::ROW_DESCRIPTION => {
434                self.conn.buffer_set.save_column_buffer();
435                true
436            }
437            msg_type::NO_DATA => {
438                NoData::parse(&self.conn.buffer_set.read_buffer)?;
439                // No rows will follow, but we still need terminal message
440                false
441            }
442            _ => {
443                return Err(Error::Protocol(format!(
444                    "expected RowDescription or NoData, got '{}'",
445                    self.conn.buffer_set.type_byte as char
446                )));
447            }
448        };
449
450        // Read data rows until terminal message
451        loop {
452            self.read_next_message()?;
453            let type_byte = self.conn.buffer_set.type_byte;
454
455            match type_byte {
456                msg_type::DATA_ROW => {
457                    if !has_rows {
458                        return Err(Error::Protocol(
459                            "received DataRow but no RowDescription".into(),
460                        ));
461                    }
462                    let cols = RowDescription::parse(&self.conn.buffer_set.column_buffer)?;
463                    let row = DataRow::parse(&self.conn.buffer_set.read_buffer)?;
464                    handler.row(cols, row)?;
465                }
466                msg_type::COMMAND_COMPLETE => {
467                    let cmd = CommandComplete::parse(&self.conn.buffer_set.read_buffer)?;
468                    handler.result_end(cmd)?;
469                    return Ok(());
470                }
471                msg_type::EMPTY_QUERY_RESPONSE => {
472                    EmptyQueryResponse::parse(&self.conn.buffer_set.read_buffer)?;
473                    return Ok(());
474                }
475                _ => {
476                    return Err(Error::Protocol(format!(
477                        "unexpected message type in pipeline claim: '{}'",
478                        type_byte as char
479                    )));
480                }
481            }
482        }
483    }
484
485    /// Row reading logic with cached RowDescription (no server message expected).
486    fn claim_rows_cached_inner<H: ExtendedHandler>(
487        &mut self,
488        handler: &mut H,
489        row_desc: Option<&[u8]>,
490    ) -> Result<()> {
491        // Read data rows until terminal message
492        loop {
493            self.read_next_message()?;
494            let type_byte = self.conn.buffer_set.type_byte;
495
496            match type_byte {
497                msg_type::DATA_ROW => {
498                    let row_desc = row_desc.ok_or_else(|| {
499                        Error::Protocol("received DataRow but no RowDescription cached".into())
500                    })?;
501                    let cols = RowDescription::parse(row_desc)?;
502                    let row = DataRow::parse(&self.conn.buffer_set.read_buffer)?;
503                    handler.row(cols, row)?;
504                }
505                msg_type::COMMAND_COMPLETE => {
506                    let cmd = CommandComplete::parse(&self.conn.buffer_set.read_buffer)?;
507                    handler.result_end(cmd)?;
508                    return Ok(());
509                }
510                msg_type::EMPTY_QUERY_RESPONSE => {
511                    EmptyQueryResponse::parse(&self.conn.buffer_set.read_buffer)?;
512                    return Ok(());
513                }
514                _ => {
515                    return Err(Error::Protocol(format!(
516                        "unexpected message type in pipeline claim: '{}'",
517                        type_byte as char
518                    )));
519                }
520            }
521        }
522    }
523
524    /// Read the next message, skipping async messages and handling errors.
525    fn read_next_message(&mut self) -> Result<()> {
526        loop {
527            self.conn.stream.read_message(&mut self.conn.buffer_set)?;
528            let type_byte = self.conn.buffer_set.type_byte;
529
530            // Handle async messages
531            if RawMessage::is_async_type(type_byte) {
532                continue;
533            }
534
535            // Handle error
536            if type_byte == msg_type::ERROR_RESPONSE {
537                let error = ErrorResponse::parse(&self.conn.buffer_set.read_buffer)?;
538                return Err(error.into_error());
539            }
540
541            return Ok(());
542        }
543    }
544
545    /// Create an error for unexpected message type.
546    fn unexpected_message<T>(&self, expected: &str) -> Result<T> {
547        Err(Error::Protocol(format!(
548            "expected {}, got '{}'",
549            expected, self.conn.buffer_set.type_byte as char
550        )))
551    }
552
553    /// Returns the number of operations that have been queued but not yet claimed.
554    pub fn pending_count(&self) -> usize {
555        self.queue_seq - self.claim_seq
556    }
557
558    /// Returns true if the pipeline is in aborted state due to an error.
559    pub fn is_aborted(&self) -> bool {
560        self.aborted
561    }
562}