zero_postgres/sync/
conn.rs

1//! Synchronous PostgreSQL connection.
2
3use std::net::TcpStream;
4#[cfg(unix)]
5use std::os::unix::net::UnixStream;
6
7use crate::buffer_pool::PooledBufferSet;
8use crate::conversion::ToParams;
9use crate::error::{Error, Result};
10use crate::handler::{
11    AsyncMessageHandler, BinaryHandler, DropHandler, FirstRowHandler, TextHandler,
12};
13use crate::opts::Opts;
14use crate::protocol::backend::BackendKeyData;
15use crate::protocol::frontend::write_terminate;
16use crate::protocol::types::TransactionStatus;
17use crate::state::StateMachine;
18use crate::state::action::Action;
19use crate::state::connection::ConnectionStateMachine;
20use crate::state::extended::{BindStateMachine, ExtendedQueryStateMachine, PreparedStatement};
21use crate::state::simple_query::SimpleQueryStateMachine;
22use crate::statement::IntoStatement;
23
24use super::stream::Stream;
25use super::unnamed_portal::UnnamedPortal;
26
27/// Synchronous PostgreSQL connection.
28pub struct Conn {
29    pub(crate) stream: Stream,
30    pub(crate) buffer_set: PooledBufferSet,
31    backend_key: Option<BackendKeyData>,
32    server_params: Vec<(String, String)>,
33    pub(crate) transaction_status: TransactionStatus,
34    pub(crate) is_broken: bool,
35    name_counter: u64,
36    async_message_handler: Option<Box<dyn AsyncMessageHandler>>,
37}
38
39impl Conn {
40    /// Connect to a PostgreSQL server.
41    pub fn new<O: TryInto<Opts>>(opts: O) -> Result<Self>
42    where
43        Error: From<O::Error>,
44    {
45        let opts = opts.try_into()?;
46
47        let stream = if let Some(socket_path) = &opts.socket {
48            #[cfg(unix)]
49            {
50                Stream::unix(UnixStream::connect(socket_path)?)
51            }
52            #[cfg(not(unix))]
53            {
54                let _ = socket_path;
55                return Err(Error::Unsupported(
56                    "Unix sockets are not supported on this platform".into(),
57                ));
58            }
59        } else {
60            if opts.host.is_empty() {
61                return Err(Error::InvalidUsage("host is empty".into()));
62            }
63            let addr = format!("{}:{}", opts.host, opts.port);
64            let tcp = TcpStream::connect(&addr)?;
65            tcp.set_nodelay(true)?;
66            Stream::tcp(tcp)
67        };
68
69        Self::new_with_stream(stream, opts)
70    }
71
72    /// Connect using an existing stream.
73    #[allow(unused_mut)]
74    pub fn new_with_stream(mut stream: Stream, options: Opts) -> Result<Self> {
75        let mut buffer_set = options.buffer_pool.get_buffer_set();
76        let mut state_machine = ConnectionStateMachine::new(options.clone());
77
78        // Drive the connection state machine
79        loop {
80            match state_machine.step(&mut buffer_set)? {
81                Action::WriteAndReadByte => {
82                    stream.write_all(&buffer_set.write_buffer)?;
83                    stream.flush()?;
84                    let byte = stream.read_u8()?;
85                    state_machine.set_ssl_response(byte);
86                }
87                Action::ReadMessage => {
88                    stream.read_message(&mut buffer_set)?;
89                }
90                Action::Write => {
91                    stream.write_all(&buffer_set.write_buffer)?;
92                    stream.flush()?;
93                }
94                Action::WriteAndReadMessage => {
95                    stream.write_all(&buffer_set.write_buffer)?;
96                    stream.flush()?;
97                    stream.read_message(&mut buffer_set)?;
98                }
99                Action::TlsHandshake => {
100                    #[cfg(feature = "sync-tls")]
101                    {
102                        stream = stream.upgrade_to_tls(&options.host)?;
103                    }
104                    #[cfg(not(feature = "sync-tls"))]
105                    {
106                        return Err(Error::Unsupported(
107                            "TLS requested but sync-tls feature not enabled".into(),
108                        ));
109                    }
110                }
111                Action::HandleAsyncMessageAndReadMessage(_) => {
112                    // Ignore async messages during startup, read next message
113                    stream.read_message(&mut buffer_set)?;
114                }
115                Action::Finished => break,
116            }
117        }
118
119        let conn = Self {
120            stream,
121            buffer_set,
122            backend_key: state_machine.backend_key().cloned(),
123            server_params: state_machine.take_server_params(),
124            transaction_status: state_machine.transaction_status(),
125            is_broken: false,
126            name_counter: 0,
127            async_message_handler: None,
128        };
129
130        // Upgrade to Unix socket if connected via TCP to loopback
131        #[cfg(unix)]
132        let conn = if options.upgrade_to_unix_socket && conn.stream.is_tcp_loopback() {
133            conn.try_upgrade_to_unix_socket(&options)
134        } else {
135            conn
136        };
137
138        Ok(conn)
139    }
140
141    /// Try to upgrade to Unix socket connection.
142    /// Returns upgraded conn on success, original conn on failure.
143    #[cfg(unix)]
144    fn try_upgrade_to_unix_socket(mut self, opts: &Opts) -> Self {
145        // Query unix_socket_directories from server
146        let mut handler = FirstRowHandler::<(String,)>::new();
147        if self
148            .query("SHOW unix_socket_directories", &mut handler)
149            .is_err()
150        {
151            return self;
152        }
153
154        let socket_dir = match handler.into_row() {
155            Some((dirs,)) => {
156                // May contain multiple directories, use the first one
157                match dirs.split(',').next() {
158                    Some(d) if !d.trim().is_empty() => d.trim().to_string(),
159                    _ => return self,
160                }
161            }
162            None => return self,
163        };
164
165        // Build socket path: {directory}/.s.PGSQL.{port}
166        let socket_path = format!("{}/.s.PGSQL.{}", socket_dir, opts.port);
167
168        // Connect via Unix socket
169        let unix_stream = match UnixStream::connect(&socket_path) {
170            Ok(s) => s,
171            Err(_) => return self,
172        };
173
174        // Create new connection over Unix socket
175        let mut opts_unix = opts.clone();
176        opts_unix.upgrade_to_unix_socket = false;
177
178        match Self::new_with_stream(Stream::unix(unix_stream), opts_unix) {
179            Ok(new_conn) => new_conn,
180            Err(_) => self,
181        }
182    }
183
184    /// Get the backend key data for query cancellation.
185    pub fn backend_key(&self) -> Option<&BackendKeyData> {
186        self.backend_key.as_ref()
187    }
188
189    /// Get the connection ID (backend process ID).
190    ///
191    /// Returns 0 if the backend key data is not available.
192    pub fn connection_id(&self) -> u32 {
193        self.backend_key.as_ref().map_or(0, |k| k.process_id())
194    }
195
196    /// Get server parameters.
197    pub fn server_params(&self) -> &[(String, String)] {
198        &self.server_params
199    }
200
201    /// Get the current transaction status.
202    pub fn transaction_status(&self) -> TransactionStatus {
203        self.transaction_status
204    }
205
206    /// Check if currently in a transaction.
207    pub fn in_transaction(&self) -> bool {
208        self.transaction_status.in_transaction()
209    }
210
211    /// Check if the connection is broken.
212    pub fn is_broken(&self) -> bool {
213        self.is_broken
214    }
215
216    /// Generate the next unique portal name.
217    pub(crate) fn next_portal_name(&mut self) -> String {
218        self.name_counter += 1;
219        format!("_zero_p_{}", self.name_counter)
220    }
221
222    /// Create a named portal by binding a statement.
223    ///
224    /// Used internally by Transaction::exec_portal.
225    pub(crate) fn create_named_portal<S: IntoStatement, P: ToParams>(
226        &mut self,
227        portal_name: &str,
228        statement: &S,
229        params: &P,
230    ) -> Result<()> {
231        // Create bind state machine for named portal
232        let mut state_machine = if let Some(sql) = statement.as_sql() {
233            BindStateMachine::bind_sql(&mut self.buffer_set, portal_name, sql, params)?
234        } else {
235            let stmt = statement.as_prepared().unwrap();
236            BindStateMachine::bind_prepared(
237                &mut self.buffer_set,
238                portal_name,
239                &stmt.wire_name(),
240                &stmt.param_oids,
241                params,
242            )?
243        };
244
245        // Drive the state machine to completion (ParseComplete + BindComplete)
246        loop {
247            match state_machine.step(&mut self.buffer_set)? {
248                Action::ReadMessage => {
249                    self.stream.read_message(&mut self.buffer_set)?;
250                }
251                Action::Write => {
252                    self.stream.write_all(&self.buffer_set.write_buffer)?;
253                    self.stream.flush()?;
254                }
255                Action::WriteAndReadMessage => {
256                    self.stream.write_all(&self.buffer_set.write_buffer)?;
257                    self.stream.flush()?;
258                    self.stream.read_message(&mut self.buffer_set)?;
259                }
260                Action::Finished => break,
261                _ => return Err(Error::Protocol("Unexpected action in bind".into())),
262            }
263        }
264
265        Ok(())
266    }
267
268    /// Set the async message handler.
269    ///
270    /// The handler is called when the server sends asynchronous messages:
271    /// - `Notification` - from LISTEN/NOTIFY
272    /// - `Notice` - warnings and informational messages
273    /// - `ParameterChanged` - server parameter updates
274    pub fn set_async_message_handler<H: AsyncMessageHandler + 'static>(&mut self, handler: H) {
275        self.async_message_handler = Some(Box::new(handler));
276    }
277
278    /// Remove the async message handler.
279    pub fn clear_async_message_handler(&mut self) {
280        self.async_message_handler = None;
281    }
282
283    /// Run a pipeline of batched queries.
284    ///
285    /// This provides automatic cleanup of the pipeline on exit, ensuring
286    /// the connection is left in a valid state even if the closure fails.
287    ///
288    /// # Example
289    ///
290    /// ```ignore
291    /// let stmt = conn.prepare("INSERT INTO users (name) VALUES ($1) RETURNING id")?;
292    ///
293    /// let (id1, id2) = conn.run_pipeline(|p| {
294    ///     let t1 = p.exec(&stmt, ("alice",))?;
295    ///     let t2 = p.exec(&stmt, ("bob",))?;
296    ///     p.sync()?;
297    ///
298    ///     let id1: Option<(i32,)> = p.claim_one(t1)?;
299    ///     let id2: Option<(i32,)> = p.claim_one(t2)?;
300    ///     Ok((id1, id2))
301    /// })?;
302    /// ```
303    pub fn run_pipeline<T, F>(&mut self, f: F) -> Result<T>
304    where
305        F: FnOnce(&mut super::pipeline::Pipeline<'_>) -> Result<T>,
306    {
307        let mut pipeline = super::pipeline::Pipeline::new_inner(self);
308        let result = f(&mut pipeline);
309        pipeline.cleanup();
310        result
311    }
312
313    /// Ping the server with an empty query to check connection aliveness.
314    pub fn ping(&mut self) -> Result<()> {
315        self.query_drop("")?;
316        Ok(())
317    }
318
319    /// Drive a state machine to completion.
320    fn drive<S: StateMachine>(&mut self, state_machine: &mut S) -> Result<()> {
321        loop {
322            match state_machine.step(&mut self.buffer_set)? {
323                Action::WriteAndReadByte => {
324                    return Err(Error::Protocol(
325                        "Unexpected WriteAndReadByte in query state machine".into(),
326                    ));
327                }
328                Action::ReadMessage => {
329                    self.stream.read_message(&mut self.buffer_set)?;
330                }
331                Action::Write => {
332                    self.stream.write_all(&self.buffer_set.write_buffer)?;
333                    self.stream.flush()?;
334                }
335                Action::WriteAndReadMessage => {
336                    self.stream.write_all(&self.buffer_set.write_buffer)?;
337                    self.stream.flush()?;
338                    self.stream.read_message(&mut self.buffer_set)?;
339                }
340                Action::TlsHandshake => {
341                    return Err(Error::Protocol(
342                        "Unexpected TlsHandshake in query state machine".into(),
343                    ));
344                }
345                Action::HandleAsyncMessageAndReadMessage(ref async_msg) => {
346                    if let Some(ref mut h) = self.async_message_handler {
347                        h.handle(async_msg);
348                    }
349                    // Read next message after handling async message
350                    self.stream.read_message(&mut self.buffer_set)?;
351                }
352                Action::Finished => {
353                    self.transaction_status = state_machine.transaction_status();
354                    break;
355                }
356            }
357        }
358        Ok(())
359    }
360
361    /// Execute a simple query with a handler.
362    pub fn query<H: TextHandler>(&mut self, sql: &str, handler: &mut H) -> Result<()> {
363        let result = self.query_inner(sql, handler);
364        if let Err(e) = &result
365            && e.is_connection_broken()
366        {
367            self.is_broken = true;
368        }
369        result
370    }
371
372    fn query_inner<H: TextHandler>(&mut self, sql: &str, handler: &mut H) -> Result<()> {
373        let mut state_machine = SimpleQueryStateMachine::new(handler, sql);
374        self.drive(&mut state_machine)
375    }
376
377    /// Execute a simple query and discard results.
378    pub fn query_drop(&mut self, sql: &str) -> Result<Option<u64>> {
379        let mut handler = DropHandler::new();
380        self.query(sql, &mut handler)?;
381        Ok(handler.rows_affected())
382    }
383
384    /// Execute a simple query and collect typed rows.
385    ///
386    /// # Example
387    ///
388    /// ```ignore
389    /// let rows: Vec<(i32, String)> = conn.query_typed("SELECT id, name FROM users")?;
390    /// for (id, name) in rows {
391    ///     println!("{}: {}", id, name);
392    /// }
393    /// ```
394    pub fn query_collect<T: for<'a> crate::conversion::FromRow<'a>>(
395        &mut self,
396        sql: &str,
397    ) -> Result<Vec<T>> {
398        let mut handler = crate::handler::CollectHandler::<T>::new();
399        self.query(sql, &mut handler)?;
400        Ok(handler.into_rows())
401    }
402
403    /// Execute a simple query and return the first typed row.
404    pub fn query_first<T: for<'a> crate::conversion::FromRow<'a>>(
405        &mut self,
406        sql: &str,
407    ) -> Result<Option<T>> {
408        let mut handler = crate::handler::FirstRowHandler::<T>::new();
409        self.query(sql, &mut handler)?;
410        Ok(handler.into_row())
411    }
412
413    /// Close the connection gracefully.
414    pub fn close(mut self) -> Result<()> {
415        self.buffer_set.write_buffer.clear();
416        write_terminate(&mut self.buffer_set.write_buffer);
417        self.stream.write_all(&self.buffer_set.write_buffer)?;
418        self.stream.flush()?;
419        Ok(())
420    }
421
422    // === Extended Query Protocol ===
423
424    /// Prepare a statement using the extended query protocol.
425    pub fn prepare(&mut self, query: &str) -> Result<PreparedStatement> {
426        self.prepare_typed(query, &[])
427    }
428
429    /// Prepare multiple statements in a single round-trip.
430    ///
431    /// This is more efficient than calling `prepare()` multiple times when you
432    /// need to prepare several statements, as it batches the network communication.
433    ///
434    /// # Example
435    ///
436    /// ```ignore
437    /// let stmts = conn.prepare_batch(&[
438    ///     "SELECT id, name FROM users WHERE id = $1",
439    ///     "INSERT INTO users (name) VALUES ($1) RETURNING id",
440    ///     "UPDATE users SET name = $1 WHERE id = $2",
441    /// ])?;
442    ///
443    /// // Use stmts[0], stmts[1], stmts[2]...
444    /// ```
445    pub fn prepare_batch(&mut self, queries: &[&str]) -> Result<Vec<PreparedStatement>> {
446        if queries.is_empty() {
447            return Ok(Vec::new());
448        }
449
450        let start_idx = self.name_counter + 1;
451        self.name_counter += queries.len() as u64;
452
453        let result = self.prepare_batch_inner(queries, start_idx);
454        if let Err(e) = &result
455            && e.is_connection_broken()
456        {
457            self.is_broken = true;
458        }
459        result
460    }
461
462    fn prepare_batch_inner(
463        &mut self,
464        queries: &[&str],
465        start_idx: u64,
466    ) -> Result<Vec<PreparedStatement>> {
467        use crate::state::batch_prepare::BatchPrepareStateMachine;
468
469        let mut state_machine =
470            BatchPrepareStateMachine::new(&mut self.buffer_set, queries, start_idx);
471
472        loop {
473            match state_machine.step(&mut self.buffer_set)? {
474                Action::ReadMessage => {
475                    self.stream.read_message(&mut self.buffer_set)?;
476                }
477                Action::WriteAndReadMessage => {
478                    self.stream.write_all(&self.buffer_set.write_buffer)?;
479                    self.stream.flush()?;
480                    self.stream.read_message(&mut self.buffer_set)?;
481                }
482                Action::Finished => {
483                    self.transaction_status = state_machine.transaction_status();
484                    break;
485                }
486                _ => return Err(Error::Protocol("Unexpected action in batch prepare".into())),
487            }
488        }
489
490        Ok(state_machine.take_statements())
491    }
492
493    /// Prepare a statement with explicit parameter types.
494    pub fn prepare_typed(&mut self, query: &str, param_oids: &[u32]) -> Result<PreparedStatement> {
495        self.name_counter += 1;
496        let idx = self.name_counter;
497        let result = self.prepare_inner(idx, query, param_oids);
498        if let Err(e) = &result
499            && e.is_connection_broken()
500        {
501            self.is_broken = true;
502        }
503        result
504    }
505
506    fn prepare_inner(
507        &mut self,
508        idx: u64,
509        query: &str,
510        param_oids: &[u32],
511    ) -> Result<PreparedStatement> {
512        let mut handler = DropHandler::new();
513        let mut state_machine = ExtendedQueryStateMachine::prepare(
514            &mut handler,
515            &mut self.buffer_set,
516            idx,
517            query,
518            param_oids,
519        );
520        self.drive(&mut state_machine)?;
521        state_machine
522            .take_prepared_statement()
523            .ok_or_else(|| Error::Protocol("No prepared statement".into()))
524    }
525
526    /// Execute a statement with a handler.
527    ///
528    /// The statement can be either:
529    /// - A `&PreparedStatement` returned from `prepare()`
530    /// - A raw SQL `&str` for one-shot execution
531    ///
532    /// # Examples
533    ///
534    /// ```ignore
535    /// // Using prepared statement
536    /// let stmt = conn.prepare("SELECT $1::int")?;
537    /// conn.exec(&stmt, (42,), &mut handler)?;
538    ///
539    /// // Using raw SQL
540    /// conn.exec("SELECT $1::int", (42,), &mut handler)?;
541    /// ```
542    pub fn exec<S: IntoStatement, P: ToParams, H: BinaryHandler>(
543        &mut self,
544        statement: S,
545        params: P,
546        handler: &mut H,
547    ) -> Result<()> {
548        let result = self.exec_inner(&statement, &params, handler);
549        if let Err(e) = &result
550            && e.is_connection_broken()
551        {
552            self.is_broken = true;
553        }
554        result
555    }
556
557    fn exec_inner<S: IntoStatement, P: ToParams, H: BinaryHandler>(
558        &mut self,
559        statement: &S,
560        params: &P,
561        handler: &mut H,
562    ) -> Result<()> {
563        let mut state_machine = if statement.needs_parse() {
564            ExtendedQueryStateMachine::execute_sql(
565                handler,
566                &mut self.buffer_set,
567                statement.as_sql().unwrap(),
568                params,
569            )?
570        } else {
571            let stmt = statement.as_prepared().unwrap();
572            ExtendedQueryStateMachine::execute(
573                handler,
574                &mut self.buffer_set,
575                &stmt.wire_name(),
576                &stmt.param_oids,
577                params,
578            )?
579        };
580
581        self.drive(&mut state_machine)
582    }
583
584    /// Execute a statement and discard results.
585    ///
586    /// The statement can be either a `&PreparedStatement` or a raw SQL `&str`.
587    pub fn exec_drop<S: IntoStatement, P: ToParams>(
588        &mut self,
589        statement: S,
590        params: P,
591    ) -> Result<Option<u64>> {
592        let mut handler = DropHandler::new();
593        self.exec(statement, params, &mut handler)?;
594        Ok(handler.rows_affected())
595    }
596
597    /// Execute a statement and collect typed rows.
598    ///
599    /// The statement can be either a `&PreparedStatement` or a raw SQL `&str`.
600    ///
601    /// # Example
602    ///
603    /// ```ignore
604    /// let stmt = conn.prepare("SELECT id, name FROM users WHERE id = $1")?;
605    /// let rows: Vec<(i32, String)> = conn.exec_collect(&stmt, (42,))?;
606    ///
607    /// // Or with raw SQL:
608    /// let rows: Vec<(i32, String)> = conn.exec_collect("SELECT id, name FROM users", ())?;
609    /// ```
610    pub fn exec_collect<
611        T: for<'a> crate::conversion::FromRow<'a>,
612        S: IntoStatement,
613        P: ToParams,
614    >(
615        &mut self,
616        statement: S,
617        params: P,
618    ) -> Result<Vec<T>> {
619        let mut handler = crate::handler::CollectHandler::<T>::new();
620        self.exec(statement, params, &mut handler)?;
621        Ok(handler.into_rows())
622    }
623
624    /// Execute a statement and return the first typed row.
625    ///
626    /// The statement can be either a `&PreparedStatement` or a raw SQL `&str`.
627    ///
628    /// # Example
629    ///
630    /// ```ignore
631    /// let stmt = conn.prepare("SELECT id, name FROM users WHERE id = $1")?;
632    /// let row: Option<(i32, String)> = conn.exec_first(&stmt, (42,))?;
633    ///
634    /// // Or with raw SQL:
635    /// let row: Option<(i32, String)> = conn.exec_first("SELECT id, name FROM users LIMIT 1", ())?;
636    /// ```
637    pub fn exec_first<T: for<'a> crate::conversion::FromRow<'a>, S: IntoStatement, P: ToParams>(
638        &mut self,
639        statement: S,
640        params: P,
641    ) -> Result<Option<T>> {
642        let mut handler = crate::handler::FirstRowHandler::<T>::new();
643        self.exec(statement, params, &mut handler)?;
644        Ok(handler.into_row())
645    }
646
647    /// Execute a statement with multiple parameter sets in a batch.
648    ///
649    /// This is more efficient than calling `exec_drop` multiple times as it
650    /// batches the network communication. The statement is parsed once (if raw SQL)
651    /// and then bound/executed for each parameter set.
652    ///
653    /// Parameters are processed in chunks (default 1000) to avoid overwhelming
654    /// the server with too many pending operations.
655    ///
656    /// The statement can be either:
657    /// - A `&PreparedStatement` returned from `prepare()`
658    /// - A raw SQL `&str` for one-shot execution
659    ///
660    /// # Example
661    ///
662    /// ```ignore
663    /// // Using prepared statement
664    /// let stmt = conn.prepare("INSERT INTO users (name, age) VALUES ($1, $2)")?;
665    /// conn.exec_batch(&stmt, &[
666    ///     ("alice", 30),
667    ///     ("bob", 25),
668    ///     ("charlie", 35),
669    /// ])?;
670    ///
671    /// // Using raw SQL
672    /// conn.exec_batch("INSERT INTO users (name, age) VALUES ($1, $2)", &[
673    ///     ("alice", 30),
674    ///     ("bob", 25),
675    /// ])?;
676    /// ```
677    pub fn exec_batch<S: IntoStatement, P: ToParams>(
678        &mut self,
679        statement: S,
680        params_list: &[P],
681    ) -> Result<()> {
682        self.exec_batch_chunked(statement, params_list, 1000)
683    }
684
685    /// Execute a statement with multiple parameter sets in a batch with custom chunk size.
686    ///
687    /// Same as `exec_batch` but allows specifying the chunk size for batching.
688    pub fn exec_batch_chunked<S: IntoStatement, P: ToParams>(
689        &mut self,
690        statement: S,
691        params_list: &[P],
692        chunk_size: usize,
693    ) -> Result<()> {
694        let result = self.exec_batch_inner(&statement, params_list, chunk_size);
695        if let Err(e) = &result
696            && e.is_connection_broken()
697        {
698            self.is_broken = true;
699        }
700        result
701    }
702
703    fn exec_batch_inner<S: IntoStatement, P: ToParams>(
704        &mut self,
705        statement: &S,
706        params_list: &[P],
707        chunk_size: usize,
708    ) -> Result<()> {
709        use crate::protocol::frontend::{write_bind, write_execute, write_parse, write_sync};
710        use crate::state::extended::BatchStateMachine;
711
712        if params_list.is_empty() {
713            return Ok(());
714        }
715
716        let chunk_size = chunk_size.max(1);
717        let needs_parse = statement.needs_parse();
718        let sql = statement.as_sql();
719        let prepared = statement.as_prepared();
720
721        // Get param OIDs from first params or prepared statement
722        let param_oids: Vec<u32> = if let Some(stmt) = prepared {
723            stmt.param_oids.clone()
724        } else {
725            params_list[0].natural_oids()
726        };
727
728        // Statement name: empty for raw SQL, actual name for prepared
729        let stmt_name = prepared.map(|s| s.wire_name()).unwrap_or_default();
730
731        for chunk in params_list.chunks(chunk_size) {
732            self.buffer_set.write_buffer.clear();
733
734            // For raw SQL, send Parse each chunk (reuses unnamed statement)
735            let parse_in_chunk = needs_parse;
736            if parse_in_chunk {
737                write_parse(
738                    &mut self.buffer_set.write_buffer,
739                    "",
740                    sql.unwrap(),
741                    &param_oids,
742                );
743            }
744
745            // Write Bind + Execute for each param set
746            for params in chunk {
747                let effective_stmt_name = if needs_parse { "" } else { &stmt_name };
748                write_bind(
749                    &mut self.buffer_set.write_buffer,
750                    "",
751                    effective_stmt_name,
752                    params,
753                    &param_oids,
754                )?;
755                write_execute(&mut self.buffer_set.write_buffer, "", 0);
756            }
757
758            // Send Sync
759            write_sync(&mut self.buffer_set.write_buffer);
760
761            // Drive state machine
762            let mut state_machine = BatchStateMachine::new(parse_in_chunk);
763            self.drive_batch(&mut state_machine)?;
764            self.transaction_status = state_machine.transaction_status();
765        }
766
767        Ok(())
768    }
769
770    /// Drive a batch state machine to completion.
771    fn drive_batch(
772        &mut self,
773        state_machine: &mut crate::state::extended::BatchStateMachine,
774    ) -> Result<()> {
775        use crate::protocol::backend::{ReadyForQuery, msg_type};
776        use crate::state::action::Action;
777
778        loop {
779            let step_result = state_machine.step(&mut self.buffer_set);
780            match step_result {
781                Ok(Action::ReadMessage) => {
782                    self.stream.read_message(&mut self.buffer_set)?;
783                }
784                Ok(Action::WriteAndReadMessage) => {
785                    self.stream.write_all(&self.buffer_set.write_buffer)?;
786                    self.stream.flush()?;
787                    self.stream.read_message(&mut self.buffer_set)?;
788                }
789                Ok(Action::Finished) => {
790                    break;
791                }
792                Ok(_) => return Err(Error::Protocol("Unexpected action in batch".into())),
793                Err(e) => {
794                    // On error, drain to ReadyForQuery to leave connection in clean state
795                    loop {
796                        self.stream.read_message(&mut self.buffer_set)?;
797                        if self.buffer_set.type_byte == msg_type::READY_FOR_QUERY {
798                            let ready = ReadyForQuery::parse(&self.buffer_set.read_buffer)?;
799                            self.transaction_status =
800                                ready.transaction_status().unwrap_or_default();
801                            break;
802                        }
803                    }
804                    return Err(e);
805                }
806            }
807        }
808        Ok(())
809    }
810
811    /// Close a prepared statement.
812    pub fn close_statement(&mut self, stmt: &PreparedStatement) -> Result<()> {
813        let result = self.close_statement_inner(&stmt.wire_name());
814        if let Err(e) = &result
815            && e.is_connection_broken()
816        {
817            self.is_broken = true;
818        }
819        result
820    }
821
822    fn close_statement_inner(&mut self, name: &str) -> Result<()> {
823        let mut handler = DropHandler::new();
824        let mut state_machine =
825            ExtendedQueryStateMachine::close_statement(&mut handler, &mut self.buffer_set, name);
826        self.drive(&mut state_machine)
827    }
828
829    /// Execute a closure within a transaction.
830    ///
831    /// If the closure returns `Ok`, the transaction is committed.
832    /// If the closure returns `Err` or the transaction is not explicitly
833    /// committed or rolled back, the transaction is rolled back.
834    ///
835    /// # Errors
836    ///
837    /// Returns `Error::InvalidUsage` if called while already in a transaction.
838    pub fn tx<F, R>(&mut self, f: F) -> Result<R>
839    where
840        F: FnOnce(&mut Conn, super::transaction::Transaction) -> Result<R>,
841    {
842        if self.in_transaction() {
843            return Err(Error::InvalidUsage(
844                "nested transactions are not supported".into(),
845            ));
846        }
847
848        self.query_drop("BEGIN")?;
849
850        let tx = super::transaction::Transaction::new(self.connection_id());
851        let result = f(self, tx);
852
853        // If still in a transaction (not committed or rolled back), roll it back
854        if self.in_transaction() {
855            let rollback_result = self.query_drop("ROLLBACK");
856
857            // Return the first error (either from closure or rollback)
858            if let Err(e) = result {
859                return Err(e);
860            }
861            rollback_result?;
862        }
863
864        result
865    }
866}
867
868// === Low-level Extended Query Protocol ===
869
870impl Conn {
871    /// Low-level bind: send BIND message and receive BindComplete.
872    ///
873    /// This allows creating named portals. Unlike `exec()`, this does NOT
874    /// send EXECUTE or SYNC - the caller controls when to execute and sync.
875    ///
876    /// # Arguments
877    /// - `portal`: Portal name (empty string "" for unnamed portal)
878    /// - `statement_name`: Prepared statement name
879    /// - `params`: Parameter values
880    pub fn lowlevel_bind<P: ToParams>(
881        &mut self,
882        portal: &str,
883        statement_name: &str,
884        params: P,
885    ) -> Result<()> {
886        let result = self.lowlevel_bind_inner(portal, statement_name, &params);
887        if let Err(e) = &result
888            && e.is_connection_broken()
889        {
890            self.is_broken = true;
891        }
892        result
893    }
894
895    fn lowlevel_bind_inner<P: ToParams>(
896        &mut self,
897        portal: &str,
898        statement_name: &str,
899        params: &P,
900    ) -> Result<()> {
901        use crate::protocol::backend::{BindComplete, ErrorResponse, RawMessage, msg_type};
902        use crate::protocol::frontend::{write_bind, write_flush};
903
904        let param_oids = params.natural_oids();
905        self.buffer_set.write_buffer.clear();
906        write_bind(
907            &mut self.buffer_set.write_buffer,
908            portal,
909            statement_name,
910            params,
911            &param_oids,
912        )?;
913        write_flush(&mut self.buffer_set.write_buffer);
914
915        self.stream.write_all(&self.buffer_set.write_buffer)?;
916        self.stream.flush()?;
917
918        loop {
919            self.stream.read_message(&mut self.buffer_set)?;
920            let type_byte = self.buffer_set.type_byte;
921
922            if RawMessage::is_async_type(type_byte) {
923                continue;
924            }
925
926            match type_byte {
927                msg_type::BIND_COMPLETE => {
928                    BindComplete::parse(&self.buffer_set.read_buffer)?;
929                    return Ok(());
930                }
931                msg_type::ERROR_RESPONSE => {
932                    let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
933                    return Err(error.into_error());
934                }
935                _ => {
936                    return Err(Error::Protocol(format!(
937                        "Expected BindComplete or ErrorResponse, got '{}'",
938                        type_byte as char
939                    )));
940                }
941            }
942        }
943    }
944
945    /// Low-level execute: send EXECUTE message and receive results.
946    ///
947    /// Executes a previously bound portal. Does NOT send SYNC.
948    ///
949    /// # Arguments
950    /// - `portal`: Portal name (empty string "" for unnamed portal)
951    /// - `max_rows`: Maximum rows to return (0 = unlimited)
952    /// - `handler`: Handler to receive rows
953    ///
954    /// # Returns
955    /// - `Ok(true)` if more rows available (PortalSuspended received)
956    /// - `Ok(false)` if execution completed (CommandComplete received)
957    pub fn lowlevel_execute<H: BinaryHandler>(
958        &mut self,
959        portal: &str,
960        max_rows: u32,
961        handler: &mut H,
962    ) -> Result<bool> {
963        let result = self.lowlevel_execute_inner(portal, max_rows, handler);
964        if let Err(e) = &result
965            && e.is_connection_broken()
966        {
967            self.is_broken = true;
968        }
969        result
970    }
971
972    fn lowlevel_execute_inner<H: BinaryHandler>(
973        &mut self,
974        portal: &str,
975        max_rows: u32,
976        handler: &mut H,
977    ) -> Result<bool> {
978        use crate::protocol::backend::{
979            CommandComplete, DataRow, ErrorResponse, NoData, PortalSuspended, RawMessage,
980            RowDescription, msg_type,
981        };
982        use crate::protocol::frontend::{write_describe_portal, write_execute, write_flush};
983
984        self.buffer_set.write_buffer.clear();
985        write_describe_portal(&mut self.buffer_set.write_buffer, portal);
986        write_execute(&mut self.buffer_set.write_buffer, portal, max_rows);
987        write_flush(&mut self.buffer_set.write_buffer);
988
989        self.stream.write_all(&self.buffer_set.write_buffer)?;
990        self.stream.flush()?;
991
992        let mut column_buffer: Vec<u8> = Vec::new();
993
994        loop {
995            self.stream.read_message(&mut self.buffer_set)?;
996            let type_byte = self.buffer_set.type_byte;
997
998            if RawMessage::is_async_type(type_byte) {
999                continue;
1000            }
1001
1002            match type_byte {
1003                msg_type::ROW_DESCRIPTION => {
1004                    column_buffer.clear();
1005                    column_buffer.extend_from_slice(&self.buffer_set.read_buffer);
1006                    let cols = RowDescription::parse(&column_buffer)?;
1007                    handler.result_start(cols)?;
1008                }
1009                msg_type::NO_DATA => {
1010                    NoData::parse(&self.buffer_set.read_buffer)?;
1011                }
1012                msg_type::DATA_ROW => {
1013                    let cols = RowDescription::parse(&column_buffer)?;
1014                    let row = DataRow::parse(&self.buffer_set.read_buffer)?;
1015                    handler.row(cols, row)?;
1016                }
1017                msg_type::COMMAND_COMPLETE => {
1018                    let complete = CommandComplete::parse(&self.buffer_set.read_buffer)?;
1019                    handler.result_end(complete)?;
1020                    return Ok(false); // No more rows
1021                }
1022                msg_type::PORTAL_SUSPENDED => {
1023                    PortalSuspended::parse(&self.buffer_set.read_buffer)?;
1024                    return Ok(true); // More rows available
1025                }
1026                msg_type::ERROR_RESPONSE => {
1027                    let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1028                    return Err(error.into_error());
1029                }
1030                _ => {
1031                    return Err(Error::Protocol(format!(
1032                        "Unexpected message in execute: '{}'",
1033                        type_byte as char
1034                    )));
1035                }
1036            }
1037        }
1038    }
1039
1040    /// Low-level sync: send SYNC and receive ReadyForQuery.
1041    ///
1042    /// This ends an extended query sequence and:
1043    /// - Commits implicit transaction if successful
1044    /// - Rolls back implicit transaction if failed
1045    /// - Updates transaction status
1046    pub fn lowlevel_sync(&mut self) -> Result<()> {
1047        let result = self.lowlevel_sync_inner();
1048        if let Err(e) = &result
1049            && e.is_connection_broken()
1050        {
1051            self.is_broken = true;
1052        }
1053        result
1054    }
1055
1056    fn lowlevel_sync_inner(&mut self) -> Result<()> {
1057        use crate::protocol::backend::{ErrorResponse, RawMessage, ReadyForQuery, msg_type};
1058        use crate::protocol::frontend::write_sync;
1059
1060        self.buffer_set.write_buffer.clear();
1061        write_sync(&mut self.buffer_set.write_buffer);
1062
1063        self.stream.write_all(&self.buffer_set.write_buffer)?;
1064        self.stream.flush()?;
1065
1066        let mut pending_error: Option<Error> = None;
1067
1068        loop {
1069            self.stream.read_message(&mut self.buffer_set)?;
1070            let type_byte = self.buffer_set.type_byte;
1071
1072            if RawMessage::is_async_type(type_byte) {
1073                continue;
1074            }
1075
1076            match type_byte {
1077                msg_type::READY_FOR_QUERY => {
1078                    let ready = ReadyForQuery::parse(&self.buffer_set.read_buffer)?;
1079                    self.transaction_status = ready.transaction_status().unwrap_or_default();
1080                    if let Some(e) = pending_error {
1081                        return Err(e);
1082                    }
1083                    return Ok(());
1084                }
1085                msg_type::ERROR_RESPONSE => {
1086                    let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1087                    pending_error = Some(error.into_error());
1088                }
1089                _ => {
1090                    // Ignore other messages before ReadyForQuery
1091                }
1092            }
1093        }
1094    }
1095
1096    /// Low-level flush: send FLUSH to force server to send pending responses.
1097    ///
1098    /// Unlike SYNC, FLUSH does not end the transaction or wait for ReadyForQuery.
1099    /// It just forces the server to send any pending responses (like ParseComplete,
1100    /// BindComplete, RowDescription, DataRow, etc.) without ending the extended
1101    /// query sequence.
1102    pub fn lowlevel_flush(&mut self) -> Result<()> {
1103        use crate::protocol::frontend::write_flush;
1104
1105        self.buffer_set.write_buffer.clear();
1106        write_flush(&mut self.buffer_set.write_buffer);
1107
1108        self.stream.write_all(&self.buffer_set.write_buffer)?;
1109        self.stream.flush()?;
1110        Ok(())
1111    }
1112
1113    /// Execute a statement with iterative row fetching.
1114    ///
1115    /// Creates an unnamed portal and passes it to the closure. The closure can
1116    /// call `portal.fetch(n, handler)` multiple times to retrieve rows in batches.
1117    /// Sync is called after the closure returns to end the implicit transaction.
1118    ///
1119    /// The statement can be either:
1120    /// - A `&PreparedStatement` returned from `prepare()`
1121    /// - A raw SQL `&str` for one-shot execution
1122    ///
1123    /// # Example
1124    /// ```ignore
1125    /// // Using prepared statement
1126    /// let stmt = conn.prepare("SELECT * FROM users")?;
1127    /// conn.exec_iter(&stmt, (), |portal| {
1128    ///     while portal.fetch(100, &mut handler)? {
1129    ///         // process handler.into_rows()...
1130    ///     }
1131    ///     Ok(())
1132    /// })?;
1133    ///
1134    /// // Using raw SQL
1135    /// conn.exec_iter("SELECT * FROM users", (), |portal| {
1136    ///     while portal.fetch(100, &mut handler)? {
1137    ///         // process handler.into_rows()...
1138    ///     }
1139    ///     Ok(())
1140    /// })?;
1141    /// ```
1142    pub fn exec_iter<S: IntoStatement, P, F, T>(
1143        &mut self,
1144        statement: S,
1145        params: P,
1146        f: F,
1147    ) -> Result<T>
1148    where
1149        P: ToParams,
1150        F: FnOnce(&mut UnnamedPortal<'_>) -> Result<T>,
1151    {
1152        let result = self.exec_iter_inner(&statement, &params, f);
1153        if let Err(e) = &result
1154            && e.is_connection_broken()
1155        {
1156            self.is_broken = true;
1157        }
1158        result
1159    }
1160
1161    fn exec_iter_inner<S: IntoStatement, P, F, T>(
1162        &mut self,
1163        statement: &S,
1164        params: &P,
1165        f: F,
1166    ) -> Result<T>
1167    where
1168        P: ToParams,
1169        F: FnOnce(&mut UnnamedPortal<'_>) -> Result<T>,
1170    {
1171        // Create bind state machine for unnamed portal
1172        let mut state_machine = if let Some(sql) = statement.as_sql() {
1173            BindStateMachine::bind_sql(&mut self.buffer_set, "", sql, params)?
1174        } else {
1175            let stmt = statement.as_prepared().unwrap();
1176            BindStateMachine::bind_prepared(
1177                &mut self.buffer_set,
1178                "",
1179                &stmt.wire_name(),
1180                &stmt.param_oids,
1181                params,
1182            )?
1183        };
1184
1185        // Drive the state machine to completion (ParseComplete + BindComplete)
1186        loop {
1187            match state_machine.step(&mut self.buffer_set)? {
1188                Action::ReadMessage => {
1189                    self.stream.read_message(&mut self.buffer_set)?;
1190                }
1191                Action::Write => {
1192                    self.stream.write_all(&self.buffer_set.write_buffer)?;
1193                    self.stream.flush()?;
1194                }
1195                Action::WriteAndReadMessage => {
1196                    self.stream.write_all(&self.buffer_set.write_buffer)?;
1197                    self.stream.flush()?;
1198                    self.stream.read_message(&mut self.buffer_set)?;
1199                }
1200                Action::Finished => break,
1201                _ => return Err(Error::Protocol("Unexpected action in bind".into())),
1202            }
1203        }
1204
1205        // Execute closure with portal handle
1206        let mut portal = UnnamedPortal { conn: self };
1207        let result = f(&mut portal);
1208
1209        // Always sync to end implicit transaction (even on error)
1210        let sync_result = portal.conn.lowlevel_sync();
1211
1212        // Return closure result, or sync error if closure succeeded but sync failed
1213        match (result, sync_result) {
1214            (Ok(v), Ok(())) => Ok(v),
1215            (Err(e), _) => Err(e),
1216            (Ok(_), Err(e)) => Err(e),
1217        }
1218    }
1219
1220    /// Low-level close portal: send Close(Portal) and receive CloseComplete.
1221    pub fn lowlevel_close_portal(&mut self, portal: &str) -> Result<()> {
1222        let result = self.lowlevel_close_portal_inner(portal);
1223        if let Err(e) = &result
1224            && e.is_connection_broken()
1225        {
1226            self.is_broken = true;
1227        }
1228        result
1229    }
1230
1231    fn lowlevel_close_portal_inner(&mut self, portal: &str) -> Result<()> {
1232        use crate::protocol::backend::{CloseComplete, ErrorResponse, RawMessage, msg_type};
1233        use crate::protocol::frontend::{write_close_portal, write_flush};
1234
1235        self.buffer_set.write_buffer.clear();
1236        write_close_portal(&mut self.buffer_set.write_buffer, portal);
1237        write_flush(&mut self.buffer_set.write_buffer);
1238
1239        self.stream.write_all(&self.buffer_set.write_buffer)?;
1240        self.stream.flush()?;
1241
1242        loop {
1243            self.stream.read_message(&mut self.buffer_set)?;
1244            let type_byte = self.buffer_set.type_byte;
1245
1246            if RawMessage::is_async_type(type_byte) {
1247                continue;
1248            }
1249
1250            match type_byte {
1251                msg_type::CLOSE_COMPLETE => {
1252                    CloseComplete::parse(&self.buffer_set.read_buffer)?;
1253                    return Ok(());
1254                }
1255                msg_type::ERROR_RESPONSE => {
1256                    let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1257                    return Err(error.into_error());
1258                }
1259                _ => {
1260                    return Err(Error::Protocol(format!(
1261                        "Expected CloseComplete or ErrorResponse, got '{}'",
1262                        type_byte as char
1263                    )));
1264                }
1265            }
1266        }
1267    }
1268}
1269
1270impl Drop for Conn {
1271    fn drop(&mut self) {
1272        // Try to send Terminate message, ignore errors
1273        self.buffer_set.write_buffer.clear();
1274        write_terminate(&mut self.buffer_set.write_buffer);
1275        let _ = self.stream.write_all(&self.buffer_set.write_buffer);
1276        let _ = self.stream.flush();
1277    }
1278}