zero_postgres/sync/
conn.rs

1//! Synchronous PostgreSQL connection.
2
3use std::net::TcpStream;
4use std::os::unix::net::UnixStream;
5
6use crate::buffer_pool::PooledBufferSet;
7use crate::conversion::ToParams;
8use crate::error::{Error, Result};
9use crate::handler::{
10    AsyncMessageHandler, BinaryHandler, DropHandler, FirstRowHandler, TextHandler,
11};
12use crate::opts::Opts;
13use crate::protocol::backend::BackendKeyData;
14use crate::protocol::frontend::write_terminate;
15use crate::protocol::types::TransactionStatus;
16use crate::state::StateMachine;
17use crate::state::action::Action;
18use crate::state::connection::ConnectionStateMachine;
19use crate::state::extended::{BindStateMachine, ExtendedQueryStateMachine, PreparedStatement};
20use crate::state::simple_query::SimpleQueryStateMachine;
21use crate::statement::IntoStatement;
22
23use super::stream::Stream;
24use super::unnamed_portal::UnnamedPortal;
25
26/// Synchronous PostgreSQL connection.
27pub struct Conn {
28    pub(crate) stream: Stream,
29    pub(crate) buffer_set: PooledBufferSet,
30    backend_key: Option<BackendKeyData>,
31    server_params: Vec<(String, String)>,
32    pub(crate) transaction_status: TransactionStatus,
33    pub(crate) is_broken: bool,
34    name_counter: u64,
35    async_message_handler: Option<Box<dyn AsyncMessageHandler>>,
36}
37
38impl Conn {
39    /// Connect to a PostgreSQL server.
40    pub fn new<O: TryInto<Opts>>(opts: O) -> Result<Self>
41    where
42        Error: From<O::Error>,
43    {
44        let opts = opts.try_into()?;
45
46        let stream = if let Some(socket_path) = &opts.socket {
47            Stream::unix(UnixStream::connect(socket_path)?)
48        } else {
49            if opts.host.is_empty() {
50                return Err(Error::InvalidUsage("host is empty".into()));
51            }
52            let addr = format!("{}:{}", opts.host, opts.port);
53            let tcp = TcpStream::connect(&addr)?;
54            tcp.set_nodelay(true)?;
55            Stream::tcp(tcp)
56        };
57
58        Self::new_with_stream(stream, opts)
59    }
60
61    /// Connect using an existing stream.
62    #[allow(unused_mut)]
63    pub fn new_with_stream(mut stream: Stream, options: Opts) -> Result<Self> {
64        let mut buffer_set = options.buffer_pool.get_buffer_set();
65        let mut state_machine = ConnectionStateMachine::new(options.clone());
66
67        // Drive the connection state machine
68        loop {
69            match state_machine.step(&mut buffer_set)? {
70                Action::WriteAndReadByte => {
71                    stream.write_all(&buffer_set.write_buffer)?;
72                    stream.flush()?;
73                    let byte = stream.read_u8()?;
74                    state_machine.set_ssl_response(byte);
75                }
76                Action::ReadMessage => {
77                    stream.read_message(&mut buffer_set)?;
78                }
79                Action::Write => {
80                    stream.write_all(&buffer_set.write_buffer)?;
81                    stream.flush()?;
82                }
83                Action::WriteAndReadMessage => {
84                    stream.write_all(&buffer_set.write_buffer)?;
85                    stream.flush()?;
86                    stream.read_message(&mut buffer_set)?;
87                }
88                Action::TlsHandshake => {
89                    #[cfg(feature = "sync-tls")]
90                    {
91                        stream = stream.upgrade_to_tls(&options.host)?;
92                    }
93                    #[cfg(not(feature = "sync-tls"))]
94                    {
95                        return Err(Error::Unsupported(
96                            "TLS requested but sync-tls feature not enabled".into(),
97                        ));
98                    }
99                }
100                Action::HandleAsyncMessageAndReadMessage(_) => {
101                    // Ignore async messages during startup, read next message
102                    stream.read_message(&mut buffer_set)?;
103                }
104                Action::Finished => break,
105            }
106        }
107
108        let conn = Self {
109            stream,
110            buffer_set,
111            backend_key: state_machine.backend_key().cloned(),
112            server_params: state_machine.take_server_params(),
113            transaction_status: state_machine.transaction_status(),
114            is_broken: false,
115            name_counter: 0,
116            async_message_handler: None,
117        };
118
119        // Upgrade to Unix socket if connected via TCP to loopback
120        let conn = if options.prefer_unix_socket && conn.stream.is_tcp_loopback() {
121            conn.try_upgrade_to_unix_socket(&options)
122        } else {
123            conn
124        };
125
126        Ok(conn)
127    }
128
129    /// Try to upgrade to Unix socket connection.
130    /// Returns upgraded conn on success, original conn on failure.
131    fn try_upgrade_to_unix_socket(mut self, opts: &Opts) -> Self {
132        // Query unix_socket_directories from server
133        let mut handler = FirstRowHandler::<(String,)>::new();
134        if self
135            .query("SHOW unix_socket_directories", &mut handler)
136            .is_err()
137        {
138            return self;
139        }
140
141        let socket_dir = match handler.into_row() {
142            Some((dirs,)) => {
143                // May contain multiple directories, use the first one
144                match dirs.split(',').next() {
145                    Some(d) if !d.trim().is_empty() => d.trim().to_string(),
146                    _ => return self,
147                }
148            }
149            None => return self,
150        };
151
152        // Build socket path: {directory}/.s.PGSQL.{port}
153        let socket_path = format!("{}/.s.PGSQL.{}", socket_dir, opts.port);
154
155        // Connect via Unix socket
156        let unix_stream = match UnixStream::connect(&socket_path) {
157            Ok(s) => s,
158            Err(_) => return self,
159        };
160
161        // Create new connection over Unix socket
162        let mut opts_unix = opts.clone();
163        opts_unix.prefer_unix_socket = false;
164
165        match Self::new_with_stream(Stream::unix(unix_stream), opts_unix) {
166            Ok(new_conn) => new_conn,
167            Err(_) => self,
168        }
169    }
170
171    /// Get the backend key data for query cancellation.
172    pub fn backend_key(&self) -> Option<&BackendKeyData> {
173        self.backend_key.as_ref()
174    }
175
176    /// Get the connection ID (backend process ID).
177    ///
178    /// Returns 0 if the backend key data is not available.
179    pub fn connection_id(&self) -> u32 {
180        self.backend_key.as_ref().map_or(0, |k| k.process_id())
181    }
182
183    /// Get server parameters.
184    pub fn server_params(&self) -> &[(String, String)] {
185        &self.server_params
186    }
187
188    /// Get the current transaction status.
189    pub fn transaction_status(&self) -> TransactionStatus {
190        self.transaction_status
191    }
192
193    /// Check if currently in a transaction.
194    pub fn in_transaction(&self) -> bool {
195        self.transaction_status.in_transaction()
196    }
197
198    /// Check if the connection is broken.
199    pub fn is_broken(&self) -> bool {
200        self.is_broken
201    }
202
203    /// Generate the next unique portal name.
204    pub(crate) fn next_portal_name(&mut self) -> String {
205        self.name_counter += 1;
206        format!("_zero_p_{}", self.name_counter)
207    }
208
209    /// Create a named portal by binding a statement.
210    ///
211    /// Used internally by Transaction::exec_portal.
212    pub(crate) fn create_named_portal<S: IntoStatement, P: ToParams>(
213        &mut self,
214        portal_name: &str,
215        statement: &S,
216        params: &P,
217    ) -> Result<()> {
218        // Create bind state machine for named portal
219        let mut state_machine = if let Some(sql) = statement.as_sql() {
220            BindStateMachine::bind_sql(&mut self.buffer_set, portal_name, sql, params)?
221        } else {
222            let stmt = statement.as_prepared().unwrap();
223            BindStateMachine::bind_prepared(
224                &mut self.buffer_set,
225                portal_name,
226                &stmt.wire_name(),
227                &stmt.param_oids,
228                params,
229            )?
230        };
231
232        // Drive the state machine to completion (ParseComplete + BindComplete)
233        loop {
234            match state_machine.step(&mut self.buffer_set)? {
235                Action::ReadMessage => {
236                    self.stream.read_message(&mut self.buffer_set)?;
237                }
238                Action::Write => {
239                    self.stream.write_all(&self.buffer_set.write_buffer)?;
240                    self.stream.flush()?;
241                }
242                Action::WriteAndReadMessage => {
243                    self.stream.write_all(&self.buffer_set.write_buffer)?;
244                    self.stream.flush()?;
245                    self.stream.read_message(&mut self.buffer_set)?;
246                }
247                Action::Finished => break,
248                _ => return Err(Error::Protocol("Unexpected action in bind".into())),
249            }
250        }
251
252        Ok(())
253    }
254
255    /// Set the async message handler.
256    ///
257    /// The handler is called when the server sends asynchronous messages:
258    /// - `Notification` - from LISTEN/NOTIFY
259    /// - `Notice` - warnings and informational messages
260    /// - `ParameterChanged` - server parameter updates
261    pub fn set_async_message_handler<H: AsyncMessageHandler + 'static>(&mut self, handler: H) {
262        self.async_message_handler = Some(Box::new(handler));
263    }
264
265    /// Remove the async message handler.
266    pub fn clear_async_message_handler(&mut self) {
267        self.async_message_handler = None;
268    }
269
270    /// Run a pipeline of batched queries.
271    ///
272    /// This provides automatic cleanup of the pipeline on exit, ensuring
273    /// the connection is left in a valid state even if the closure fails.
274    ///
275    /// # Example
276    ///
277    /// ```ignore
278    /// let stmt = conn.prepare("INSERT INTO users (name) VALUES ($1) RETURNING id")?;
279    ///
280    /// let (id1, id2) = conn.run_pipeline(|p| {
281    ///     let t1 = p.exec(&stmt, ("alice",))?;
282    ///     let t2 = p.exec(&stmt, ("bob",))?;
283    ///     p.sync()?;
284    ///
285    ///     let id1: Option<(i32,)> = p.claim_one(t1)?;
286    ///     let id2: Option<(i32,)> = p.claim_one(t2)?;
287    ///     Ok((id1, id2))
288    /// })?;
289    /// ```
290    pub fn run_pipeline<T, F>(&mut self, f: F) -> Result<T>
291    where
292        F: FnOnce(&mut super::pipeline::Pipeline<'_>) -> Result<T>,
293    {
294        let mut pipeline = super::pipeline::Pipeline::new_inner(self);
295        let result = f(&mut pipeline);
296        pipeline.cleanup();
297        result
298    }
299
300    /// Ping the server with an empty query to check connection aliveness.
301    pub fn ping(&mut self) -> Result<()> {
302        self.query_drop("")?;
303        Ok(())
304    }
305
306    /// Drive a state machine to completion.
307    fn drive<S: StateMachine>(&mut self, state_machine: &mut S) -> Result<()> {
308        loop {
309            match state_machine.step(&mut self.buffer_set)? {
310                Action::WriteAndReadByte => {
311                    return Err(Error::Protocol(
312                        "Unexpected WriteAndReadByte in query state machine".into(),
313                    ));
314                }
315                Action::ReadMessage => {
316                    self.stream.read_message(&mut self.buffer_set)?;
317                }
318                Action::Write => {
319                    self.stream.write_all(&self.buffer_set.write_buffer)?;
320                    self.stream.flush()?;
321                }
322                Action::WriteAndReadMessage => {
323                    self.stream.write_all(&self.buffer_set.write_buffer)?;
324                    self.stream.flush()?;
325                    self.stream.read_message(&mut self.buffer_set)?;
326                }
327                Action::TlsHandshake => {
328                    return Err(Error::Protocol(
329                        "Unexpected TlsHandshake in query state machine".into(),
330                    ));
331                }
332                Action::HandleAsyncMessageAndReadMessage(ref async_msg) => {
333                    if let Some(ref mut h) = self.async_message_handler {
334                        h.handle(async_msg);
335                    }
336                    // Read next message after handling async message
337                    self.stream.read_message(&mut self.buffer_set)?;
338                }
339                Action::Finished => {
340                    self.transaction_status = state_machine.transaction_status();
341                    break;
342                }
343            }
344        }
345        Ok(())
346    }
347
348    /// Execute a simple query with a handler.
349    pub fn query<H: TextHandler>(&mut self, sql: &str, handler: &mut H) -> Result<()> {
350        let result = self.query_inner(sql, handler);
351        if let Err(e) = &result
352            && e.is_connection_broken()
353        {
354            self.is_broken = true;
355        }
356        result
357    }
358
359    fn query_inner<H: TextHandler>(&mut self, sql: &str, handler: &mut H) -> Result<()> {
360        let mut state_machine = SimpleQueryStateMachine::new(handler, sql);
361        self.drive(&mut state_machine)
362    }
363
364    /// Execute a simple query and discard results.
365    pub fn query_drop(&mut self, sql: &str) -> Result<Option<u64>> {
366        let mut handler = DropHandler::new();
367        self.query(sql, &mut handler)?;
368        Ok(handler.rows_affected())
369    }
370
371    /// Execute a simple query and collect typed rows.
372    ///
373    /// # Example
374    ///
375    /// ```ignore
376    /// let rows: Vec<(i32, String)> = conn.query_typed("SELECT id, name FROM users")?;
377    /// for (id, name) in rows {
378    ///     println!("{}: {}", id, name);
379    /// }
380    /// ```
381    pub fn query_collect<T: for<'a> crate::conversion::FromRow<'a>>(
382        &mut self,
383        sql: &str,
384    ) -> Result<Vec<T>> {
385        let mut handler = crate::handler::CollectHandler::<T>::new();
386        self.query(sql, &mut handler)?;
387        Ok(handler.into_rows())
388    }
389
390    /// Execute a simple query and return the first typed row.
391    pub fn query_first<T: for<'a> crate::conversion::FromRow<'a>>(
392        &mut self,
393        sql: &str,
394    ) -> Result<Option<T>> {
395        let mut handler = crate::handler::FirstRowHandler::<T>::new();
396        self.query(sql, &mut handler)?;
397        Ok(handler.into_row())
398    }
399
400    /// Close the connection gracefully.
401    pub fn close(mut self) -> Result<()> {
402        self.buffer_set.write_buffer.clear();
403        write_terminate(&mut self.buffer_set.write_buffer);
404        self.stream.write_all(&self.buffer_set.write_buffer)?;
405        self.stream.flush()?;
406        Ok(())
407    }
408
409    // === Extended Query Protocol ===
410
411    /// Prepare a statement using the extended query protocol.
412    pub fn prepare(&mut self, query: &str) -> Result<PreparedStatement> {
413        self.prepare_typed(query, &[])
414    }
415
416    /// Prepare multiple statements in a single round-trip.
417    ///
418    /// This is more efficient than calling `prepare()` multiple times when you
419    /// need to prepare several statements, as it batches the network communication.
420    ///
421    /// # Example
422    ///
423    /// ```ignore
424    /// let stmts = conn.prepare_batch(&[
425    ///     "SELECT id, name FROM users WHERE id = $1",
426    ///     "INSERT INTO users (name) VALUES ($1) RETURNING id",
427    ///     "UPDATE users SET name = $1 WHERE id = $2",
428    /// ])?;
429    ///
430    /// // Use stmts[0], stmts[1], stmts[2]...
431    /// ```
432    pub fn prepare_batch(&mut self, queries: &[&str]) -> Result<Vec<PreparedStatement>> {
433        if queries.is_empty() {
434            return Ok(Vec::new());
435        }
436
437        let start_idx = self.name_counter + 1;
438        self.name_counter += queries.len() as u64;
439
440        let result = self.prepare_batch_inner(queries, start_idx);
441        if let Err(e) = &result
442            && e.is_connection_broken()
443        {
444            self.is_broken = true;
445        }
446        result
447    }
448
449    fn prepare_batch_inner(
450        &mut self,
451        queries: &[&str],
452        start_idx: u64,
453    ) -> Result<Vec<PreparedStatement>> {
454        use crate::state::batch_prepare::BatchPrepareStateMachine;
455
456        let mut state_machine =
457            BatchPrepareStateMachine::new(&mut self.buffer_set, queries, start_idx);
458
459        loop {
460            match state_machine.step(&mut self.buffer_set)? {
461                Action::ReadMessage => {
462                    self.stream.read_message(&mut self.buffer_set)?;
463                }
464                Action::WriteAndReadMessage => {
465                    self.stream.write_all(&self.buffer_set.write_buffer)?;
466                    self.stream.flush()?;
467                    self.stream.read_message(&mut self.buffer_set)?;
468                }
469                Action::Finished => {
470                    self.transaction_status = state_machine.transaction_status();
471                    break;
472                }
473                _ => return Err(Error::Protocol("Unexpected action in batch prepare".into())),
474            }
475        }
476
477        Ok(state_machine.take_statements())
478    }
479
480    /// Prepare a statement with explicit parameter types.
481    pub fn prepare_typed(&mut self, query: &str, param_oids: &[u32]) -> Result<PreparedStatement> {
482        self.name_counter += 1;
483        let idx = self.name_counter;
484        let result = self.prepare_inner(idx, query, param_oids);
485        if let Err(e) = &result
486            && e.is_connection_broken()
487        {
488            self.is_broken = true;
489        }
490        result
491    }
492
493    fn prepare_inner(
494        &mut self,
495        idx: u64,
496        query: &str,
497        param_oids: &[u32],
498    ) -> Result<PreparedStatement> {
499        let mut handler = DropHandler::new();
500        let mut state_machine = ExtendedQueryStateMachine::prepare(
501            &mut handler,
502            &mut self.buffer_set,
503            idx,
504            query,
505            param_oids,
506        );
507        self.drive(&mut state_machine)?;
508        state_machine
509            .take_prepared_statement()
510            .ok_or_else(|| Error::Protocol("No prepared statement".into()))
511    }
512
513    /// Execute a statement with a handler.
514    ///
515    /// The statement can be either:
516    /// - A `&PreparedStatement` returned from `prepare()`
517    /// - A raw SQL `&str` for one-shot execution
518    ///
519    /// # Examples
520    ///
521    /// ```ignore
522    /// // Using prepared statement
523    /// let stmt = conn.prepare("SELECT $1::int")?;
524    /// conn.exec(&stmt, (42,), &mut handler)?;
525    ///
526    /// // Using raw SQL
527    /// conn.exec("SELECT $1::int", (42,), &mut handler)?;
528    /// ```
529    pub fn exec<S: IntoStatement, P: ToParams, H: BinaryHandler>(
530        &mut self,
531        statement: S,
532        params: P,
533        handler: &mut H,
534    ) -> Result<()> {
535        let result = self.exec_inner(&statement, &params, handler);
536        if let Err(e) = &result
537            && e.is_connection_broken()
538        {
539            self.is_broken = true;
540        }
541        result
542    }
543
544    fn exec_inner<S: IntoStatement, P: ToParams, H: BinaryHandler>(
545        &mut self,
546        statement: &S,
547        params: &P,
548        handler: &mut H,
549    ) -> Result<()> {
550        let mut state_machine = if statement.needs_parse() {
551            ExtendedQueryStateMachine::execute_sql(
552                handler,
553                &mut self.buffer_set,
554                statement.as_sql().unwrap(),
555                params,
556            )?
557        } else {
558            let stmt = statement.as_prepared().unwrap();
559            ExtendedQueryStateMachine::execute(
560                handler,
561                &mut self.buffer_set,
562                &stmt.wire_name(),
563                &stmt.param_oids,
564                params,
565            )?
566        };
567
568        self.drive(&mut state_machine)
569    }
570
571    /// Execute a statement and discard results.
572    ///
573    /// The statement can be either a `&PreparedStatement` or a raw SQL `&str`.
574    pub fn exec_drop<S: IntoStatement, P: ToParams>(
575        &mut self,
576        statement: S,
577        params: P,
578    ) -> Result<Option<u64>> {
579        let mut handler = DropHandler::new();
580        self.exec(statement, params, &mut handler)?;
581        Ok(handler.rows_affected())
582    }
583
584    /// Execute a statement and collect typed rows.
585    ///
586    /// The statement can be either a `&PreparedStatement` or a raw SQL `&str`.
587    ///
588    /// # Example
589    ///
590    /// ```ignore
591    /// let stmt = conn.prepare("SELECT id, name FROM users WHERE id = $1")?;
592    /// let rows: Vec<(i32, String)> = conn.exec_collect(&stmt, (42,))?;
593    ///
594    /// // Or with raw SQL:
595    /// let rows: Vec<(i32, String)> = conn.exec_collect("SELECT id, name FROM users", ())?;
596    /// ```
597    pub fn exec_collect<
598        T: for<'a> crate::conversion::FromRow<'a>,
599        S: IntoStatement,
600        P: ToParams,
601    >(
602        &mut self,
603        statement: S,
604        params: P,
605    ) -> Result<Vec<T>> {
606        let mut handler = crate::handler::CollectHandler::<T>::new();
607        self.exec(statement, params, &mut handler)?;
608        Ok(handler.into_rows())
609    }
610
611    /// Execute a statement and return the first typed row.
612    ///
613    /// The statement can be either a `&PreparedStatement` or a raw SQL `&str`.
614    ///
615    /// # Example
616    ///
617    /// ```ignore
618    /// let stmt = conn.prepare("SELECT id, name FROM users WHERE id = $1")?;
619    /// let row: Option<(i32, String)> = conn.exec_first(&stmt, (42,))?;
620    ///
621    /// // Or with raw SQL:
622    /// let row: Option<(i32, String)> = conn.exec_first("SELECT id, name FROM users LIMIT 1", ())?;
623    /// ```
624    pub fn exec_first<T: for<'a> crate::conversion::FromRow<'a>, S: IntoStatement, P: ToParams>(
625        &mut self,
626        statement: S,
627        params: P,
628    ) -> Result<Option<T>> {
629        let mut handler = crate::handler::FirstRowHandler::<T>::new();
630        self.exec(statement, params, &mut handler)?;
631        Ok(handler.into_row())
632    }
633
634    /// Execute a statement with multiple parameter sets in a batch.
635    ///
636    /// This is more efficient than calling `exec_drop` multiple times as it
637    /// batches the network communication. The statement is parsed once (if raw SQL)
638    /// and then bound/executed for each parameter set.
639    ///
640    /// Parameters are processed in chunks (default 1000) to avoid overwhelming
641    /// the server with too many pending operations.
642    ///
643    /// The statement can be either:
644    /// - A `&PreparedStatement` returned from `prepare()`
645    /// - A raw SQL `&str` for one-shot execution
646    ///
647    /// # Example
648    ///
649    /// ```ignore
650    /// // Using prepared statement
651    /// let stmt = conn.prepare("INSERT INTO users (name, age) VALUES ($1, $2)")?;
652    /// conn.exec_batch(&stmt, &[
653    ///     ("alice", 30),
654    ///     ("bob", 25),
655    ///     ("charlie", 35),
656    /// ])?;
657    ///
658    /// // Using raw SQL
659    /// conn.exec_batch("INSERT INTO users (name, age) VALUES ($1, $2)", &[
660    ///     ("alice", 30),
661    ///     ("bob", 25),
662    /// ])?;
663    /// ```
664    pub fn exec_batch<S: IntoStatement, P: ToParams>(
665        &mut self,
666        statement: S,
667        params_list: &[P],
668    ) -> Result<()> {
669        self.exec_batch_chunked(statement, params_list, 1000)
670    }
671
672    /// Execute a statement with multiple parameter sets in a batch with custom chunk size.
673    ///
674    /// Same as `exec_batch` but allows specifying the chunk size for batching.
675    pub fn exec_batch_chunked<S: IntoStatement, P: ToParams>(
676        &mut self,
677        statement: S,
678        params_list: &[P],
679        chunk_size: usize,
680    ) -> Result<()> {
681        let result = self.exec_batch_inner(&statement, params_list, chunk_size);
682        if let Err(e) = &result
683            && e.is_connection_broken()
684        {
685            self.is_broken = true;
686        }
687        result
688    }
689
690    fn exec_batch_inner<S: IntoStatement, P: ToParams>(
691        &mut self,
692        statement: &S,
693        params_list: &[P],
694        chunk_size: usize,
695    ) -> Result<()> {
696        use crate::protocol::frontend::{write_bind, write_execute, write_parse, write_sync};
697        use crate::state::extended::BatchStateMachine;
698
699        if params_list.is_empty() {
700            return Ok(());
701        }
702
703        let chunk_size = chunk_size.max(1);
704        let needs_parse = statement.needs_parse();
705        let sql = statement.as_sql();
706        let prepared = statement.as_prepared();
707
708        // Get param OIDs from first params or prepared statement
709        let param_oids: Vec<u32> = if let Some(stmt) = prepared {
710            stmt.param_oids.clone()
711        } else {
712            params_list[0].natural_oids()
713        };
714
715        // Statement name: empty for raw SQL, actual name for prepared
716        let stmt_name = prepared.map(|s| s.wire_name()).unwrap_or_default();
717
718        for chunk in params_list.chunks(chunk_size) {
719            self.buffer_set.write_buffer.clear();
720
721            // For raw SQL, send Parse each chunk (reuses unnamed statement)
722            let parse_in_chunk = needs_parse;
723            if parse_in_chunk {
724                write_parse(
725                    &mut self.buffer_set.write_buffer,
726                    "",
727                    sql.unwrap(),
728                    &param_oids,
729                );
730            }
731
732            // Write Bind + Execute for each param set
733            for params in chunk {
734                let effective_stmt_name = if needs_parse { "" } else { &stmt_name };
735                write_bind(
736                    &mut self.buffer_set.write_buffer,
737                    "",
738                    effective_stmt_name,
739                    params,
740                    &param_oids,
741                )?;
742                write_execute(&mut self.buffer_set.write_buffer, "", 0);
743            }
744
745            // Send Sync
746            write_sync(&mut self.buffer_set.write_buffer);
747
748            // Drive state machine
749            let mut state_machine = BatchStateMachine::new(parse_in_chunk);
750            self.drive_batch(&mut state_machine)?;
751            self.transaction_status = state_machine.transaction_status();
752        }
753
754        Ok(())
755    }
756
757    /// Drive a batch state machine to completion.
758    fn drive_batch(
759        &mut self,
760        state_machine: &mut crate::state::extended::BatchStateMachine,
761    ) -> Result<()> {
762        use crate::protocol::backend::{ReadyForQuery, msg_type};
763        use crate::state::action::Action;
764
765        loop {
766            let step_result = state_machine.step(&mut self.buffer_set);
767            match step_result {
768                Ok(Action::ReadMessage) => {
769                    self.stream.read_message(&mut self.buffer_set)?;
770                }
771                Ok(Action::WriteAndReadMessage) => {
772                    self.stream.write_all(&self.buffer_set.write_buffer)?;
773                    self.stream.flush()?;
774                    self.stream.read_message(&mut self.buffer_set)?;
775                }
776                Ok(Action::Finished) => {
777                    break;
778                }
779                Ok(_) => return Err(Error::Protocol("Unexpected action in batch".into())),
780                Err(e) => {
781                    // On error, drain to ReadyForQuery to leave connection in clean state
782                    loop {
783                        self.stream.read_message(&mut self.buffer_set)?;
784                        if self.buffer_set.type_byte == msg_type::READY_FOR_QUERY {
785                            let ready = ReadyForQuery::parse(&self.buffer_set.read_buffer)?;
786                            self.transaction_status =
787                                ready.transaction_status().unwrap_or_default();
788                            break;
789                        }
790                    }
791                    return Err(e);
792                }
793            }
794        }
795        Ok(())
796    }
797
798    /// Close a prepared statement.
799    pub fn close_statement(&mut self, stmt: &PreparedStatement) -> Result<()> {
800        let result = self.close_statement_inner(&stmt.wire_name());
801        if let Err(e) = &result
802            && e.is_connection_broken()
803        {
804            self.is_broken = true;
805        }
806        result
807    }
808
809    fn close_statement_inner(&mut self, name: &str) -> Result<()> {
810        let mut handler = DropHandler::new();
811        let mut state_machine =
812            ExtendedQueryStateMachine::close_statement(&mut handler, &mut self.buffer_set, name);
813        self.drive(&mut state_machine)
814    }
815
816    /// Execute a closure within a transaction.
817    ///
818    /// If the closure returns `Ok`, the transaction is committed.
819    /// If the closure returns `Err` or the transaction is not explicitly
820    /// committed or rolled back, the transaction is rolled back.
821    ///
822    /// # Errors
823    ///
824    /// Returns `Error::InvalidUsage` if called while already in a transaction.
825    pub fn tx<F, R>(&mut self, f: F) -> Result<R>
826    where
827        F: FnOnce(&mut Conn, super::transaction::Transaction) -> Result<R>,
828    {
829        if self.in_transaction() {
830            return Err(Error::InvalidUsage(
831                "nested transactions are not supported".into(),
832            ));
833        }
834
835        self.query_drop("BEGIN")?;
836
837        let tx = super::transaction::Transaction::new(self.connection_id());
838        let result = f(self, tx);
839
840        // If still in a transaction (not committed or rolled back), roll it back
841        if self.in_transaction() {
842            let rollback_result = self.query_drop("ROLLBACK");
843
844            // Return the first error (either from closure or rollback)
845            if let Err(e) = result {
846                return Err(e);
847            }
848            rollback_result?;
849        }
850
851        result
852    }
853}
854
855// === Low-level Extended Query Protocol ===
856
857impl Conn {
858    /// Low-level bind: send BIND message and receive BindComplete.
859    ///
860    /// This allows creating named portals. Unlike `exec()`, this does NOT
861    /// send EXECUTE or SYNC - the caller controls when to execute and sync.
862    ///
863    /// # Arguments
864    /// - `portal`: Portal name (empty string "" for unnamed portal)
865    /// - `statement_name`: Prepared statement name
866    /// - `params`: Parameter values
867    pub fn lowlevel_bind<P: ToParams>(
868        &mut self,
869        portal: &str,
870        statement_name: &str,
871        params: P,
872    ) -> Result<()> {
873        let result = self.lowlevel_bind_inner(portal, statement_name, &params);
874        if let Err(e) = &result
875            && e.is_connection_broken()
876        {
877            self.is_broken = true;
878        }
879        result
880    }
881
882    fn lowlevel_bind_inner<P: ToParams>(
883        &mut self,
884        portal: &str,
885        statement_name: &str,
886        params: &P,
887    ) -> Result<()> {
888        use crate::protocol::backend::{BindComplete, ErrorResponse, RawMessage, msg_type};
889        use crate::protocol::frontend::{write_bind, write_flush};
890
891        let param_oids = params.natural_oids();
892        self.buffer_set.write_buffer.clear();
893        write_bind(
894            &mut self.buffer_set.write_buffer,
895            portal,
896            statement_name,
897            params,
898            &param_oids,
899        )?;
900        write_flush(&mut self.buffer_set.write_buffer);
901
902        self.stream.write_all(&self.buffer_set.write_buffer)?;
903        self.stream.flush()?;
904
905        loop {
906            self.stream.read_message(&mut self.buffer_set)?;
907            let type_byte = self.buffer_set.type_byte;
908
909            if RawMessage::is_async_type(type_byte) {
910                continue;
911            }
912
913            match type_byte {
914                msg_type::BIND_COMPLETE => {
915                    BindComplete::parse(&self.buffer_set.read_buffer)?;
916                    return Ok(());
917                }
918                msg_type::ERROR_RESPONSE => {
919                    let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
920                    return Err(error.into_error());
921                }
922                _ => {
923                    return Err(Error::Protocol(format!(
924                        "Expected BindComplete or ErrorResponse, got '{}'",
925                        type_byte as char
926                    )));
927                }
928            }
929        }
930    }
931
932    /// Low-level execute: send EXECUTE message and receive results.
933    ///
934    /// Executes a previously bound portal. Does NOT send SYNC.
935    ///
936    /// # Arguments
937    /// - `portal`: Portal name (empty string "" for unnamed portal)
938    /// - `max_rows`: Maximum rows to return (0 = unlimited)
939    /// - `handler`: Handler to receive rows
940    ///
941    /// # Returns
942    /// - `Ok(true)` if more rows available (PortalSuspended received)
943    /// - `Ok(false)` if execution completed (CommandComplete received)
944    pub fn lowlevel_execute<H: BinaryHandler>(
945        &mut self,
946        portal: &str,
947        max_rows: u32,
948        handler: &mut H,
949    ) -> Result<bool> {
950        let result = self.lowlevel_execute_inner(portal, max_rows, handler);
951        if let Err(e) = &result
952            && e.is_connection_broken()
953        {
954            self.is_broken = true;
955        }
956        result
957    }
958
959    fn lowlevel_execute_inner<H: BinaryHandler>(
960        &mut self,
961        portal: &str,
962        max_rows: u32,
963        handler: &mut H,
964    ) -> Result<bool> {
965        use crate::protocol::backend::{
966            CommandComplete, DataRow, ErrorResponse, NoData, PortalSuspended, RawMessage,
967            RowDescription, msg_type,
968        };
969        use crate::protocol::frontend::{write_describe_portal, write_execute, write_flush};
970
971        self.buffer_set.write_buffer.clear();
972        write_describe_portal(&mut self.buffer_set.write_buffer, portal);
973        write_execute(&mut self.buffer_set.write_buffer, portal, max_rows);
974        write_flush(&mut self.buffer_set.write_buffer);
975
976        self.stream.write_all(&self.buffer_set.write_buffer)?;
977        self.stream.flush()?;
978
979        let mut column_buffer: Vec<u8> = Vec::new();
980
981        loop {
982            self.stream.read_message(&mut self.buffer_set)?;
983            let type_byte = self.buffer_set.type_byte;
984
985            if RawMessage::is_async_type(type_byte) {
986                continue;
987            }
988
989            match type_byte {
990                msg_type::ROW_DESCRIPTION => {
991                    column_buffer.clear();
992                    column_buffer.extend_from_slice(&self.buffer_set.read_buffer);
993                    let cols = RowDescription::parse(&column_buffer)?;
994                    handler.result_start(cols)?;
995                }
996                msg_type::NO_DATA => {
997                    NoData::parse(&self.buffer_set.read_buffer)?;
998                }
999                msg_type::DATA_ROW => {
1000                    let cols = RowDescription::parse(&column_buffer)?;
1001                    let row = DataRow::parse(&self.buffer_set.read_buffer)?;
1002                    handler.row(cols, row)?;
1003                }
1004                msg_type::COMMAND_COMPLETE => {
1005                    let complete = CommandComplete::parse(&self.buffer_set.read_buffer)?;
1006                    handler.result_end(complete)?;
1007                    return Ok(false); // No more rows
1008                }
1009                msg_type::PORTAL_SUSPENDED => {
1010                    PortalSuspended::parse(&self.buffer_set.read_buffer)?;
1011                    return Ok(true); // More rows available
1012                }
1013                msg_type::ERROR_RESPONSE => {
1014                    let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1015                    return Err(error.into_error());
1016                }
1017                _ => {
1018                    return Err(Error::Protocol(format!(
1019                        "Unexpected message in execute: '{}'",
1020                        type_byte as char
1021                    )));
1022                }
1023            }
1024        }
1025    }
1026
1027    /// Low-level sync: send SYNC and receive ReadyForQuery.
1028    ///
1029    /// This ends an extended query sequence and:
1030    /// - Commits implicit transaction if successful
1031    /// - Rolls back implicit transaction if failed
1032    /// - Updates transaction status
1033    pub fn lowlevel_sync(&mut self) -> Result<()> {
1034        let result = self.lowlevel_sync_inner();
1035        if let Err(e) = &result
1036            && e.is_connection_broken()
1037        {
1038            self.is_broken = true;
1039        }
1040        result
1041    }
1042
1043    fn lowlevel_sync_inner(&mut self) -> Result<()> {
1044        use crate::protocol::backend::{ErrorResponse, RawMessage, ReadyForQuery, msg_type};
1045        use crate::protocol::frontend::write_sync;
1046
1047        self.buffer_set.write_buffer.clear();
1048        write_sync(&mut self.buffer_set.write_buffer);
1049
1050        self.stream.write_all(&self.buffer_set.write_buffer)?;
1051        self.stream.flush()?;
1052
1053        let mut pending_error: Option<Error> = None;
1054
1055        loop {
1056            self.stream.read_message(&mut self.buffer_set)?;
1057            let type_byte = self.buffer_set.type_byte;
1058
1059            if RawMessage::is_async_type(type_byte) {
1060                continue;
1061            }
1062
1063            match type_byte {
1064                msg_type::READY_FOR_QUERY => {
1065                    let ready = ReadyForQuery::parse(&self.buffer_set.read_buffer)?;
1066                    self.transaction_status = ready.transaction_status().unwrap_or_default();
1067                    if let Some(e) = pending_error {
1068                        return Err(e);
1069                    }
1070                    return Ok(());
1071                }
1072                msg_type::ERROR_RESPONSE => {
1073                    let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1074                    pending_error = Some(error.into_error());
1075                }
1076                _ => {
1077                    // Ignore other messages before ReadyForQuery
1078                }
1079            }
1080        }
1081    }
1082
1083    /// Low-level flush: send FLUSH to force server to send pending responses.
1084    ///
1085    /// Unlike SYNC, FLUSH does not end the transaction or wait for ReadyForQuery.
1086    /// It just forces the server to send any pending responses (like ParseComplete,
1087    /// BindComplete, RowDescription, DataRow, etc.) without ending the extended
1088    /// query sequence.
1089    pub fn lowlevel_flush(&mut self) -> Result<()> {
1090        use crate::protocol::frontend::write_flush;
1091
1092        self.buffer_set.write_buffer.clear();
1093        write_flush(&mut self.buffer_set.write_buffer);
1094
1095        self.stream.write_all(&self.buffer_set.write_buffer)?;
1096        self.stream.flush()?;
1097        Ok(())
1098    }
1099
1100    /// Execute a statement with iterative row fetching.
1101    ///
1102    /// Creates an unnamed portal and passes it to the closure. The closure can
1103    /// call `portal.fetch(n, handler)` multiple times to retrieve rows in batches.
1104    /// Sync is called after the closure returns to end the implicit transaction.
1105    ///
1106    /// The statement can be either:
1107    /// - A `&PreparedStatement` returned from `prepare()`
1108    /// - A raw SQL `&str` for one-shot execution
1109    ///
1110    /// # Example
1111    /// ```ignore
1112    /// // Using prepared statement
1113    /// let stmt = conn.prepare("SELECT * FROM users")?;
1114    /// conn.exec_iter(&stmt, (), |portal| {
1115    ///     while portal.fetch(100, &mut handler)? {
1116    ///         // process handler.into_rows()...
1117    ///     }
1118    ///     Ok(())
1119    /// })?;
1120    ///
1121    /// // Using raw SQL
1122    /// conn.exec_iter("SELECT * FROM users", (), |portal| {
1123    ///     while portal.fetch(100, &mut handler)? {
1124    ///         // process handler.into_rows()...
1125    ///     }
1126    ///     Ok(())
1127    /// })?;
1128    /// ```
1129    pub fn exec_iter<S: IntoStatement, P, F, T>(
1130        &mut self,
1131        statement: S,
1132        params: P,
1133        f: F,
1134    ) -> Result<T>
1135    where
1136        P: ToParams,
1137        F: FnOnce(&mut UnnamedPortal<'_>) -> Result<T>,
1138    {
1139        let result = self.exec_iter_inner(&statement, &params, f);
1140        if let Err(e) = &result
1141            && e.is_connection_broken()
1142        {
1143            self.is_broken = true;
1144        }
1145        result
1146    }
1147
1148    fn exec_iter_inner<S: IntoStatement, P, F, T>(
1149        &mut self,
1150        statement: &S,
1151        params: &P,
1152        f: F,
1153    ) -> Result<T>
1154    where
1155        P: ToParams,
1156        F: FnOnce(&mut UnnamedPortal<'_>) -> Result<T>,
1157    {
1158        // Create bind state machine for unnamed portal
1159        let mut state_machine = if let Some(sql) = statement.as_sql() {
1160            BindStateMachine::bind_sql(&mut self.buffer_set, "", sql, params)?
1161        } else {
1162            let stmt = statement.as_prepared().unwrap();
1163            BindStateMachine::bind_prepared(
1164                &mut self.buffer_set,
1165                "",
1166                &stmt.wire_name(),
1167                &stmt.param_oids,
1168                params,
1169            )?
1170        };
1171
1172        // Drive the state machine to completion (ParseComplete + BindComplete)
1173        loop {
1174            match state_machine.step(&mut self.buffer_set)? {
1175                Action::ReadMessage => {
1176                    self.stream.read_message(&mut self.buffer_set)?;
1177                }
1178                Action::Write => {
1179                    self.stream.write_all(&self.buffer_set.write_buffer)?;
1180                    self.stream.flush()?;
1181                }
1182                Action::WriteAndReadMessage => {
1183                    self.stream.write_all(&self.buffer_set.write_buffer)?;
1184                    self.stream.flush()?;
1185                    self.stream.read_message(&mut self.buffer_set)?;
1186                }
1187                Action::Finished => break,
1188                _ => return Err(Error::Protocol("Unexpected action in bind".into())),
1189            }
1190        }
1191
1192        // Execute closure with portal handle
1193        let mut portal = UnnamedPortal { conn: self };
1194        let result = f(&mut portal);
1195
1196        // Always sync to end implicit transaction (even on error)
1197        let sync_result = portal.conn.lowlevel_sync();
1198
1199        // Return closure result, or sync error if closure succeeded but sync failed
1200        match (result, sync_result) {
1201            (Ok(v), Ok(())) => Ok(v),
1202            (Err(e), _) => Err(e),
1203            (Ok(_), Err(e)) => Err(e),
1204        }
1205    }
1206
1207    /// Low-level close portal: send Close(Portal) and receive CloseComplete.
1208    pub fn lowlevel_close_portal(&mut self, portal: &str) -> Result<()> {
1209        let result = self.lowlevel_close_portal_inner(portal);
1210        if let Err(e) = &result
1211            && e.is_connection_broken()
1212        {
1213            self.is_broken = true;
1214        }
1215        result
1216    }
1217
1218    fn lowlevel_close_portal_inner(&mut self, portal: &str) -> Result<()> {
1219        use crate::protocol::backend::{CloseComplete, ErrorResponse, RawMessage, msg_type};
1220        use crate::protocol::frontend::{write_close_portal, write_flush};
1221
1222        self.buffer_set.write_buffer.clear();
1223        write_close_portal(&mut self.buffer_set.write_buffer, portal);
1224        write_flush(&mut self.buffer_set.write_buffer);
1225
1226        self.stream.write_all(&self.buffer_set.write_buffer)?;
1227        self.stream.flush()?;
1228
1229        loop {
1230            self.stream.read_message(&mut self.buffer_set)?;
1231            let type_byte = self.buffer_set.type_byte;
1232
1233            if RawMessage::is_async_type(type_byte) {
1234                continue;
1235            }
1236
1237            match type_byte {
1238                msg_type::CLOSE_COMPLETE => {
1239                    CloseComplete::parse(&self.buffer_set.read_buffer)?;
1240                    return Ok(());
1241                }
1242                msg_type::ERROR_RESPONSE => {
1243                    let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1244                    return Err(error.into_error());
1245                }
1246                _ => {
1247                    return Err(Error::Protocol(format!(
1248                        "Expected CloseComplete or ErrorResponse, got '{}'",
1249                        type_byte as char
1250                    )));
1251                }
1252            }
1253        }
1254    }
1255}
1256
1257impl Drop for Conn {
1258    fn drop(&mut self) {
1259        // Try to send Terminate message, ignore errors
1260        self.buffer_set.write_buffer.clear();
1261        write_terminate(&mut self.buffer_set.write_buffer);
1262        let _ = self.stream.write_all(&self.buffer_set.write_buffer);
1263        let _ = self.stream.flush();
1264    }
1265}