zero_postgres/sync/pipeline/
mod.rs

1//! Pipeline mode for batching multiple queries.
2//!
3//! Pipeline mode allows sending multiple queries to the server without waiting
4//! for responses, reducing round-trip latency.
5//!
6//! # Example
7//!
8//! ```ignore
9//! // Prepare statements outside the pipeline
10//! let stmts = conn.prepare_batch(&[
11//!     "SELECT id, name FROM users WHERE active = $1",
12//!     "INSERT INTO users (name) VALUES ($1) RETURNING id",
13//! ])?;
14//!
15//! let (active, inactive, count) = conn.run_pipeline(|p| {
16//!     // Queue executions
17//!     let t1 = p.exec(&stmts[0], (true,))?;
18//!     let t2 = p.exec(&stmts[0], (false,))?;
19//!     let t3 = p.exec("SELECT COUNT(*) FROM users", ())?;
20//!
21//!     p.sync()?;
22//!
23//!     // Claim results in order with different methods
24//!     let active: Vec<(i32, String)> = p.claim_collect(t1)?;
25//!     let inactive: Option<(i32, String)> = p.claim_one(t2)?;
26//!     let count: Vec<(i64,)> = p.claim_collect(t3)?;
27//!
28//!     Ok((active, inactive, count))
29//! })?;
30//! ```
31
32use std::collections::VecDeque;
33
34use crate::pipeline::Expectation;
35use crate::pipeline::Ticket;
36
37use crate::conversion::{FromRow, ToParams};
38use crate::error::{Error, Result};
39use crate::handler::BinaryHandler;
40use crate::protocol::backend::{
41    BindComplete, CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, NoData,
42    ParseComplete, RawMessage, ReadyForQuery, RowDescription, msg_type,
43};
44use crate::protocol::frontend::{
45    write_bind, write_describe_portal, write_execute, write_flush, write_parse, write_sync,
46};
47use crate::state::extended::PreparedStatement;
48use crate::statement::IntoStatement;
49
50use super::conn::Conn;
51
52/// Pipeline mode for batching multiple queries.
53///
54/// Created by [`Conn::run_pipeline`].
55pub struct Pipeline<'a> {
56    conn: &'a mut Conn,
57    /// Monotonically increasing counter for queued operations
58    queue_seq: usize,
59    /// Next sequence number to claim
60    claim_seq: usize,
61    /// Whether the pipeline is in aborted state (error occurred)
62    aborted: bool,
63    /// Buffer for column descriptions during row processing
64    column_buffer: Vec<u8>,
65    /// Expected responses queue (exec operations and Sync points)
66    expectations: VecDeque<Expectation>,
67}
68
69impl<'a> Pipeline<'a> {
70    /// Create a new pipeline.
71    ///
72    /// Prefer using [`Conn::run_pipeline`] which handles cleanup automatically.
73    /// This constructor is available for advanced use cases.
74    #[cfg(feature = "lowlevel")]
75    pub fn new(conn: &'a mut Conn) -> Self {
76        Self::new_inner(conn)
77    }
78
79    /// Create a new pipeline (internal).
80    pub(crate) fn new_inner(conn: &'a mut Conn) -> Self {
81        conn.buffer_set.write_buffer.clear();
82        Self {
83            conn,
84            queue_seq: 0,
85            claim_seq: 0,
86            aborted: false,
87            column_buffer: Vec::new(),
88            expectations: VecDeque::new(),
89        }
90    }
91
92    /// Cleanup the pipeline, draining any unclaimed tickets.
93    ///
94    /// This is called automatically by [`Conn::run_pipeline`].
95    /// Also available with the `lowlevel` feature for manual cleanup.
96    #[cfg(feature = "lowlevel")]
97    pub fn cleanup(&mut self) {
98        self.cleanup_inner();
99    }
100
101    #[cfg(not(feature = "lowlevel"))]
102    pub(crate) fn cleanup(&mut self) {
103        self.cleanup_inner();
104    }
105
106    fn cleanup_inner(&mut self) {
107        // Nothing to clean up if no operations were queued and no expectations pending
108        if self.queue_seq == 0 && self.expectations.is_empty() {
109            return;
110        }
111
112        // Send sync if we have pending operations that weren't synced
113        if !self.conn.buffer_set.write_buffer.is_empty() {
114            let _ = self.sync();
115        } else if !self.expectations.iter().any(|e| *e == Expectation::Sync) {
116            // Buffer was flushed but no sync sent - send one now
117            let _ = self.sync();
118        }
119
120        // Drain remaining expectations
121        if self.aborted {
122            // In aborted state, server skipped remaining commands - only consume ReadyForQuery(s)
123            while let Some(expectation) = self.expectations.pop_front() {
124                if expectation == Expectation::Sync {
125                    let _ = self.consume_ready_for_query();
126                }
127            }
128        } else {
129            // Normal drain: process all expectations
130            while let Some(expectation) = self.expectations.pop_front() {
131                let _ = self.drain_expectation(expectation);
132            }
133        }
134
135        // Reset state
136        self.queue_seq = 0;
137        self.claim_seq = 0;
138        self.aborted = false;
139    }
140
141    /// Drain a single expectation.
142    fn drain_expectation(&mut self, expectation: Expectation) {
143        let mut handler = crate::handler::DropHandler::new();
144        let _ = match expectation {
145            Expectation::ParseBindExecute => self.claim_parse_bind_exec_inner(&mut handler),
146            Expectation::BindExecute => self.claim_bind_exec_inner(&mut handler, None),
147            Expectation::Sync => self.consume_ready_for_query(),
148        };
149    }
150
151    // ========================================================================
152    // Queue Operations
153    // ========================================================================
154
155    /// Queue a statement execution.
156    ///
157    /// The statement can be either:
158    /// - A `&PreparedStatement` returned from `conn.prepare()` or `conn.prepare_batch()`
159    /// - A raw SQL `&str` for one-shot execution
160    ///
161    /// This method only buffers the command locally - no network I/O occurs until
162    /// `sync()` or `flush()` is called.
163    ///
164    /// # Example
165    ///
166    /// ```ignore
167    /// let stmt = conn.prepare("SELECT id, name FROM users WHERE id = $1")?;
168    ///
169    /// let (r1, r2) = conn.run_pipeline(|p| {
170    ///     let t1 = p.exec(&stmt, (1,))?;
171    ///     let t2 = p.exec("SELECT COUNT(*) FROM users", ())?;
172    ///     p.sync()?;
173    ///
174    ///     let r1: Vec<(i32, String)> = p.claim_collect(t1)?;
175    ///     let r2: Option<(i64,)> = p.claim_one(t2)?;
176    ///     Ok((r1, r2))
177    /// })?;
178    /// ```
179    pub fn exec<'s, P: ToParams>(
180        &mut self,
181        statement: &'s (impl IntoStatement + ?Sized),
182        params: P,
183    ) -> Result<Ticket<'s>> {
184        let seq = self.queue_seq;
185        self.queue_seq += 1;
186
187        if statement.needs_parse() {
188            self.exec_sql_inner(statement.as_sql().unwrap(), &params)?;
189            Ok(Ticket { seq, stmt: None })
190        } else {
191            let stmt = statement.as_prepared().unwrap();
192            self.exec_prepared_inner(&stmt.wire_name(), &stmt.param_oids, &params)?;
193            Ok(Ticket {
194                seq,
195                stmt: Some(stmt),
196            })
197        }
198    }
199
200    fn exec_sql_inner<P: ToParams>(&mut self, sql: &str, params: &P) -> Result<()> {
201        let param_oids = params.natural_oids();
202        let buf = &mut self.conn.buffer_set.write_buffer;
203        write_parse(buf, "", sql, &param_oids);
204        write_bind(buf, "", "", params, &param_oids)?;
205        write_describe_portal(buf, "");
206        write_execute(buf, "", 0);
207        self.expectations.push_back(Expectation::ParseBindExecute);
208        Ok(())
209    }
210
211    fn exec_prepared_inner<P: ToParams>(
212        &mut self,
213        stmt_name: &str,
214        param_oids: &[u32],
215        params: &P,
216    ) -> Result<()> {
217        let buf = &mut self.conn.buffer_set.write_buffer;
218        write_bind(buf, "", stmt_name, params, param_oids)?;
219        // Skip write_describe_portal - use cached RowDescription from PreparedStatement
220        write_execute(buf, "", 0);
221        self.expectations.push_back(Expectation::BindExecute);
222        Ok(())
223    }
224
225    /// Send a FLUSH message to trigger server response.
226    ///
227    /// This forces the server to send all pending responses without establishing
228    /// a transaction boundary. Called automatically by claim methods when needed.
229    pub fn flush(&mut self) -> Result<()> {
230        if !self.conn.buffer_set.write_buffer.is_empty() {
231            write_flush(&mut self.conn.buffer_set.write_buffer);
232            self.conn
233                .stream
234                .write_all(&self.conn.buffer_set.write_buffer)?;
235            self.conn.stream.flush()?;
236            self.conn.buffer_set.write_buffer.clear();
237        }
238        Ok(())
239    }
240
241    /// Send a SYNC message to establish a transaction boundary.
242    ///
243    /// After calling sync, you must claim all queued operations in order.
244    /// The ReadyForQuery message will be consumed automatically after claiming.
245    pub fn sync(&mut self) -> Result<()> {
246        let result = self.sync_inner();
247        if let Err(e) = &result
248            && e.is_connection_broken()
249        {
250            self.conn.is_broken = true;
251        }
252        result
253    }
254
255    fn sync_inner(&mut self) -> Result<()> {
256        write_sync(&mut self.conn.buffer_set.write_buffer);
257        self.expectations.push_back(Expectation::Sync);
258        self.conn
259            .stream
260            .write_all(&self.conn.buffer_set.write_buffer)?;
261        self.conn.stream.flush()?;
262        self.conn.buffer_set.write_buffer.clear();
263        Ok(())
264    }
265
266    /// Consume a single ReadyForQuery message.
267    fn consume_ready_for_query(&mut self) -> Result<()> {
268        loop {
269            self.conn.stream.read_message(&mut self.conn.buffer_set)?;
270            let type_byte = self.conn.buffer_set.type_byte;
271
272            if RawMessage::is_async_type(type_byte) {
273                continue;
274            }
275
276            if type_byte == msg_type::ERROR_RESPONSE {
277                let error = ErrorResponse::parse(&self.conn.buffer_set.read_buffer)?;
278                return Err(error.into_error());
279            }
280
281            if type_byte == msg_type::READY_FOR_QUERY {
282                let ready = ReadyForQuery::parse(&self.conn.buffer_set.read_buffer)?;
283                self.conn.transaction_status = ready.transaction_status().unwrap_or_default();
284                return Ok(());
285            }
286        }
287    }
288
289    /// Consume all pending Sync expectations from the front of the queue.
290    fn consume_pending_syncs(&mut self) -> Result<()> {
291        while self.expectations.front() == Some(&Expectation::Sync) {
292            self.expectations.pop_front();
293            self.consume_ready_for_query()?;
294            // Reset aborted state - after ReadyForQuery, pipeline can continue
295            self.aborted = false;
296        }
297        Ok(())
298    }
299
300    // ========================================================================
301    // Claim Operations
302    // ========================================================================
303
304    /// Claim with a custom handler.
305    ///
306    /// Results must be claimed in the same order they were queued.
307    #[cfg(feature = "lowlevel")]
308    pub fn claim<H: BinaryHandler>(&mut self, ticket: Ticket<'_>, handler: &mut H) -> Result<()> {
309        self.claim_with_handler(ticket, handler)
310    }
311
312    fn claim_with_handler<H: BinaryHandler>(
313        &mut self,
314        ticket: Ticket<'_>,
315        handler: &mut H,
316    ) -> Result<()> {
317        self.check_sequence(ticket.seq)?;
318
319        // Auto-sync if buffer has unsent data
320        if !self.conn.buffer_set.write_buffer.is_empty() {
321            self.sync()?;
322        }
323
324        if self.aborted {
325            self.claim_seq += 1;
326            // Pop but don't process the exec expectation (server skipped it)
327            self.expectations.pop_front();
328            self.consume_pending_syncs()?;
329            return Err(Error::Protocol(
330                "pipeline aborted due to earlier error".into(),
331            ));
332        }
333
334        let expectation = self.expectations.pop_front();
335
336        let result = match expectation {
337            Some(Expectation::ParseBindExecute) => self.claim_parse_bind_exec_inner(handler),
338            Some(Expectation::BindExecute) => self.claim_bind_exec_inner(handler, ticket.stmt),
339            Some(Expectation::Sync) => Err(Error::Protocol("unexpected Sync expectation".into())),
340            None => Err(Error::Protocol("no expectation in queue".into())),
341        };
342
343        if let Err(e) = &result {
344            if e.is_connection_broken() {
345                self.conn.is_broken = true;
346            }
347            self.aborted = true;
348        }
349        self.claim_seq += 1;
350        self.consume_pending_syncs()?;
351        result
352    }
353
354    /// Claim and collect all rows.
355    ///
356    /// Results must be claimed in the same order they were queued.
357    pub fn claim_collect<T: for<'b> FromRow<'b>>(&mut self, ticket: Ticket<'_>) -> Result<Vec<T>> {
358        let mut handler = crate::handler::CollectHandler::<T>::new();
359        self.claim_with_handler(ticket, &mut handler)?;
360        Ok(handler.into_rows())
361    }
362
363    /// Claim and return just the first row.
364    ///
365    /// Results must be claimed in the same order they were queued.
366    pub fn claim_one<T: for<'b> FromRow<'b>>(&mut self, ticket: Ticket<'_>) -> Result<Option<T>> {
367        let mut handler = crate::handler::FirstRowHandler::<T>::new();
368        self.claim_with_handler(ticket, &mut handler)?;
369        Ok(handler.into_row())
370    }
371
372    /// Claim and discard all rows.
373    ///
374    /// Results must be claimed in the same order they were queued.
375    pub fn claim_drop(&mut self, ticket: Ticket<'_>) -> Result<()> {
376        let mut handler = crate::handler::DropHandler::new();
377        self.claim_with_handler(ticket, &mut handler)
378    }
379
380    /// Check that the ticket sequence matches the expected claim sequence.
381    fn check_sequence(&self, seq: usize) -> Result<()> {
382        if seq != self.claim_seq {
383            return Err(Error::InvalidUsage(format!(
384                "claim out of order: expected seq {}, got {}",
385                self.claim_seq, seq
386            )));
387        }
388        Ok(())
389    }
390
391    /// Claim Parse + Bind + Execute (for raw SQL exec() calls).
392    fn claim_parse_bind_exec_inner<H: BinaryHandler>(&mut self, handler: &mut H) -> Result<()> {
393        // Expect: ParseComplete
394        self.read_next_message()?;
395        if self.conn.buffer_set.type_byte != msg_type::PARSE_COMPLETE {
396            return self.unexpected_message("ParseComplete");
397        }
398        ParseComplete::parse(&self.conn.buffer_set.read_buffer)?;
399
400        // Expect: BindComplete
401        self.read_next_message()?;
402        if self.conn.buffer_set.type_byte != msg_type::BIND_COMPLETE {
403            return self.unexpected_message("BindComplete");
404        }
405        BindComplete::parse(&self.conn.buffer_set.read_buffer)?;
406
407        // Now read rows
408        self.claim_rows_inner(handler)
409    }
410
411    /// Claim Bind + Execute (for prepared statement exec() calls).
412    fn claim_bind_exec_inner<H: BinaryHandler>(
413        &mut self,
414        handler: &mut H,
415        stmt: Option<&PreparedStatement>,
416    ) -> Result<()> {
417        // Expect: BindComplete
418        self.read_next_message()?;
419        if self.conn.buffer_set.type_byte != msg_type::BIND_COMPLETE {
420            return self.unexpected_message("BindComplete");
421        }
422        BindComplete::parse(&self.conn.buffer_set.read_buffer)?;
423
424        // Use cached RowDescription from PreparedStatement (no copy)
425        let row_desc = stmt.and_then(|s| s.row_desc_payload());
426
427        // Now read rows (no RowDescription/NoData expected from server)
428        self.claim_rows_cached_inner(handler, row_desc)
429    }
430
431    /// Common row reading logic (reads RowDescription from server).
432    fn claim_rows_inner<H: BinaryHandler>(&mut self, handler: &mut H) -> Result<()> {
433        // Expect RowDescription or NoData
434        self.read_next_message()?;
435        let has_rows = match self.conn.buffer_set.type_byte {
436            msg_type::ROW_DESCRIPTION => {
437                self.column_buffer.clear();
438                self.column_buffer
439                    .extend_from_slice(&self.conn.buffer_set.read_buffer);
440                true
441            }
442            msg_type::NO_DATA => {
443                NoData::parse(&self.conn.buffer_set.read_buffer)?;
444                // No rows will follow, but we still need terminal message
445                false
446            }
447            _ => {
448                return Err(Error::Protocol(format!(
449                    "expected RowDescription or NoData, got '{}'",
450                    self.conn.buffer_set.type_byte as char
451                )));
452            }
453        };
454
455        // Read data rows until terminal message
456        loop {
457            self.read_next_message()?;
458            let type_byte = self.conn.buffer_set.type_byte;
459
460            match type_byte {
461                msg_type::DATA_ROW => {
462                    if !has_rows {
463                        return Err(Error::Protocol(
464                            "received DataRow but no RowDescription".into(),
465                        ));
466                    }
467                    let cols = RowDescription::parse(&self.column_buffer)?;
468                    let row = DataRow::parse(&self.conn.buffer_set.read_buffer)?;
469                    handler.row(cols, row)?;
470                }
471                msg_type::COMMAND_COMPLETE => {
472                    let cmd = CommandComplete::parse(&self.conn.buffer_set.read_buffer)?;
473                    handler.result_end(cmd)?;
474                    return Ok(());
475                }
476                msg_type::EMPTY_QUERY_RESPONSE => {
477                    EmptyQueryResponse::parse(&self.conn.buffer_set.read_buffer)?;
478                    return Ok(());
479                }
480                _ => {
481                    return Err(Error::Protocol(format!(
482                        "unexpected message type in pipeline claim: '{}'",
483                        type_byte as char
484                    )));
485                }
486            }
487        }
488    }
489
490    /// Row reading logic with cached RowDescription (no server message expected).
491    fn claim_rows_cached_inner<H: BinaryHandler>(
492        &mut self,
493        handler: &mut H,
494        row_desc: Option<&[u8]>,
495    ) -> Result<()> {
496        // Read data rows until terminal message
497        loop {
498            self.read_next_message()?;
499            let type_byte = self.conn.buffer_set.type_byte;
500
501            match type_byte {
502                msg_type::DATA_ROW => {
503                    let row_desc = row_desc.ok_or_else(|| {
504                        Error::Protocol("received DataRow but no RowDescription cached".into())
505                    })?;
506                    let cols = RowDescription::parse(row_desc)?;
507                    let row = DataRow::parse(&self.conn.buffer_set.read_buffer)?;
508                    handler.row(cols, row)?;
509                }
510                msg_type::COMMAND_COMPLETE => {
511                    let cmd = CommandComplete::parse(&self.conn.buffer_set.read_buffer)?;
512                    handler.result_end(cmd)?;
513                    return Ok(());
514                }
515                msg_type::EMPTY_QUERY_RESPONSE => {
516                    EmptyQueryResponse::parse(&self.conn.buffer_set.read_buffer)?;
517                    return Ok(());
518                }
519                _ => {
520                    return Err(Error::Protocol(format!(
521                        "unexpected message type in pipeline claim: '{}'",
522                        type_byte as char
523                    )));
524                }
525            }
526        }
527    }
528
529    /// Read the next message, skipping async messages and handling errors.
530    fn read_next_message(&mut self) -> Result<()> {
531        loop {
532            self.conn.stream.read_message(&mut self.conn.buffer_set)?;
533            let type_byte = self.conn.buffer_set.type_byte;
534
535            // Handle async messages
536            if RawMessage::is_async_type(type_byte) {
537                continue;
538            }
539
540            // Handle error
541            if type_byte == msg_type::ERROR_RESPONSE {
542                let error = ErrorResponse::parse(&self.conn.buffer_set.read_buffer)?;
543                return Err(error.into_error());
544            }
545
546            return Ok(());
547        }
548    }
549
550    /// Create an error for unexpected message type.
551    fn unexpected_message<T>(&self, expected: &str) -> Result<T> {
552        Err(Error::Protocol(format!(
553            "expected {}, got '{}'",
554            expected, self.conn.buffer_set.type_byte as char
555        )))
556    }
557
558    /// Returns the number of operations that have been queued but not yet claimed.
559    pub fn pending_count(&self) -> usize {
560        self.queue_seq - self.claim_seq
561    }
562
563    /// Returns true if the pipeline is in aborted state due to an error.
564    pub fn is_aborted(&self) -> bool {
565        self.aborted
566    }
567}