Skip to main content

bsql_core/
pool.rs

1//! Connection pool — thin wrapper over `bsql_driver_postgres::Pool`.
2//!
3//! Delegates all connection management, fail-fast semantics, and LIFO ordering
4//! to the driver. This layer adds only the bsql error type conversions.
5
6use std::time::Duration;
7
8use bsql_driver_postgres::arena::acquire_arena;
9use bsql_driver_postgres::codec::Encode;
10
11use crate::error::{BsqlError, BsqlResult};
12use crate::stream::QueryStream;
13use crate::transaction::Transaction;
14
/// One result row from a raw (unvalidated) SQL query, held entirely as text.
///
/// Each column is stored as an optional string: `None` stands for SQL NULL,
/// `Some` carries the column's text form. Use [`get`](RawRow::get) to read a
/// column by its zero-based index.
#[derive(Debug, Clone)]
pub struct RawRow(Vec<Option<String>>);

impl RawRow {
    /// Look up the column at `idx`.
    ///
    /// Returns `None` for SQL NULL, and also when `idx` is past the last
    /// column.
    pub fn get(&self, idx: usize) -> Option<&str> {
        match self.0.get(idx) {
            Some(Some(text)) => Some(text.as_str()),
            _ => None,
        }
    }

    /// How many columns this row holds.
    pub fn len(&self) -> usize {
        let Self(cells) = self;
        cells.len()
    }

    /// `true` when the row holds no columns at all.
    pub fn is_empty(&self) -> bool {
        let Self(cells) = self;
        cells.is_empty()
    }

    /// Walk the columns in order, yielding `None` for each SQL NULL.
    pub fn iter(&self) -> impl Iterator<Item = Option<&str>> {
        let Self(cells) = self;
        cells.iter().map(Option::as_deref)
    }
}
43
44/// Convert a binary-encoded column value to its text representation.
45///
46/// Uses the column type OID to dispatch to the correct decoder. For types
47/// not explicitly handled, falls back to hex-encoded raw bytes.
48fn binary_col_to_text(row: &bsql_driver_postgres::Row<'_>, idx: usize, type_oid: u32) -> String {
49    match type_oid {
50        16 => row // bool
51            .get_bool(idx)
52            .map(|v| if v { "t" } else { "f" })
53            .unwrap_or("")
54            .to_owned(),
55        21 => row.get_i16(idx).map(|v| v.to_string()).unwrap_or_default(), // int2
56        23 => row.get_i32(idx).map(|v| v.to_string()).unwrap_or_default(), // int4
57        20 => row.get_i64(idx).map(|v| v.to_string()).unwrap_or_default(), // int8
58        26 => row
59            .get_i32(idx)
60            .map(|v| (v as u32).to_string())
61            .unwrap_or_default(), // oid
62        700 => row.get_f32(idx).map(|v| v.to_string()).unwrap_or_default(), // float4
63        701 => row.get_f64(idx).map(|v| v.to_string()).unwrap_or_default(), // float8
64        25 | 1042 | 1043 | 114 | 142 | 3802 => {
65            // text, char, varchar, json, xml, jsonb — all UTF-8 text
66            row.get_str(idx).unwrap_or("").to_owned()
67        }
68        17 => {
69            // bytea — hex-encode with \x prefix (PG default hex output)
70            match row.get_bytes(idx) {
71                Some(bytes) => {
72                    let mut s = String::with_capacity(2 + bytes.len() * 2);
73                    s.push_str("\\x");
74                    for b in bytes {
75                        use std::fmt::Write;
76                        let _ = write!(s, "{b:02x}");
77                    }
78                    s
79                }
80                None => String::new(),
81            }
82        }
83        _ => {
84            // Fallback: try UTF-8 text decode first (handles many PG types
85            // like inet, uuid, timestamps that are text-representable),
86            // then fall back to hex-encoded raw bytes.
87            if let Some(s) = row.get_str(idx) {
88                s.to_owned()
89            } else if let Some(bytes) = row.get_bytes(idx) {
90                let mut s = String::with_capacity(2 + bytes.len() * 2);
91                s.push_str("\\x");
92                for b in bytes {
93                    use std::fmt::Write;
94                    let _ = write!(s, "{b:02x}");
95                }
96                s
97            } else {
98                String::new()
99            }
100        }
101    }
102}
103
/// A PostgreSQL connection pool.
///
/// Created via [`PgPool::connect`] or [`PgPool::builder`]. The pool manages a set
/// of connections, automatically acquires/releases them for each query, and
/// supports optional read/write splitting with a replica.
///
/// # Example
///
/// Both [`PgPool::connect`] and [`PoolBuilder::build`] are `async`, so their
/// results must be awaited before `?` can be applied:
///
/// ```rust,ignore
/// use bsql::Pool;
///
/// let pool = Pool::connect("postgres://user:pass@localhost/mydb").await?;
///
/// // Or configure via builder:
/// let pool = Pool::builder()
///     .url("postgres://user:pass@localhost/mydb")
///     .lifetime_secs(900)
///     .timeout_secs(5)
///     .build()
///     .await?;
/// ```
pub struct PgPool {
    /// Primary driver pool. Used for every operation that is not routed to
    /// the replica.
    pub(crate) inner: bsql_driver_postgres::Pool,
    /// Optional read replica pool. When present, `query_raw_readonly` routes here.
    pub(crate) read_pool: Option<bsql_driver_postgres::Pool>,
}
129
/// Backward-compatible type alias for [`PgPool`]; both names refer to the
/// same type. Use [`PgPool`] for new code.
pub type Pool = PgPool;
132
/// Builder for configuring a connection pool.
///
/// Obtained from [`PgPool::builder`]. Every setter consumes and returns the
/// builder so calls can be chained; [`build`](PoolBuilder::build) is `async`
/// and must be awaited.
///
/// # Example
///
/// ```rust,ignore
/// use bsql::Pool;
///
/// let pool = Pool::builder()
///     .url("postgres://user:pass@localhost/mydb")
///     .max_size(20)
///     .lifetime_secs(900)
///     .timeout_secs(5)
///     .min_idle(2)
///     .build()
///     .await?;
/// ```
pub struct PoolBuilder {
    /// Connection URL for the primary pool. `build` fails when this is unset.
    url: Option<String>,
    /// Maximum number of connections in the primary pool.
    max_size: usize,
    /// Connection lifetime. The outer `Option` records whether the caller set
    /// it at all (`None` = leave the driver default); the inner `Option` is
    /// the value handed to the driver, where `None` means unlimited.
    max_lifetime: Option<Option<Duration>>,
    /// Acquire timeout. Outer `None` = leave the driver default; inner `None`
    /// = fail fast without waiting.
    acquire_timeout: Option<Option<Duration>>,
    /// Minimum number of idle connections to maintain, if configured.
    min_idle: Option<usize>,
    /// Optional URL for a read replica. When set, `query_raw_readonly`
    /// routes to this pool instead of the primary.
    replica_url: Option<String>,
    /// Max pool size for the replica pool. Defaults to same as `max_size`.
    replica_max_size: Option<usize>,
    /// Maximum idle duration before a connection is considered stale.
    stale_timeout: Option<Duration>,
    /// Maximum number of cached prepared statements per connection.
    max_stmt_cache_size: Option<usize>,
}
164
165impl PoolBuilder {
166    /// Configure the pool from a PostgreSQL connection URL.
167    ///
168    /// Format: `postgres://user:password@host:port/dbname`
169    pub fn url(mut self, url: &str) -> Self {
170        self.url = Some(url.into());
171        self
172    }
173
174    pub fn max_size(mut self, size: usize) -> Self {
175        self.max_size = size;
176        self
177    }
178
179    /// Set the maximum lifetime of a connection. Connections older than this
180    /// are discarded when returned to the pool. Default: 30 minutes.
181    ///
182    /// Pass `None` for unlimited lifetime.
183    pub fn max_lifetime(mut self, d: Option<Duration>) -> Self {
184        self.max_lifetime = Some(d);
185        self
186    }
187
188    /// Set the maximum lifetime in seconds. Convenience for
189    /// `max_lifetime(Some(Duration::from_secs(secs)))`.
190    pub fn max_lifetime_secs(self, secs: u64) -> Self {
191        self.max_lifetime(Some(Duration::from_secs(secs)))
192    }
193
194    /// Shorthand for [`max_lifetime_secs`](Self::max_lifetime_secs).
195    pub fn lifetime_secs(self, secs: u64) -> Self {
196        self.max_lifetime_secs(secs)
197    }
198
199    /// Set the maximum time to wait for a connection when the pool is
200    /// exhausted. Default: 5 seconds.
201    ///
202    /// Pass `None` for fail-fast behavior (no waiting, immediate error).
203    pub fn acquire_timeout(mut self, d: Option<Duration>) -> Self {
204        self.acquire_timeout = Some(d);
205        self
206    }
207
208    /// Set the acquire timeout in seconds. Convenience for
209    /// `acquire_timeout(Some(Duration::from_secs(secs)))`.
210    pub fn acquire_timeout_secs(self, secs: u64) -> Self {
211        self.acquire_timeout(Some(Duration::from_secs(secs)))
212    }
213
214    /// Shorthand for [`acquire_timeout_secs`](Self::acquire_timeout_secs).
215    pub fn timeout_secs(self, secs: u64) -> Self {
216        self.acquire_timeout_secs(secs)
217    }
218
219    /// Set the minimum number of idle connections to maintain. Default: 0.
220    ///
221    /// When greater than 0, a background task creates connections as needed
222    /// to maintain this idle floor.
223    pub fn min_idle(mut self, n: usize) -> Self {
224        self.min_idle = Some(n);
225        self
226    }
227
228    /// Set a read replica URL for read/write splitting.
229    ///
230    /// When configured, `query_raw_readonly` (used by SELECT queries)
231    /// routes to the replica pool. All writes go to the primary.
232    /// When no replica is configured, all queries use the primary.
233    pub fn replica_url(mut self, url: &str) -> Self {
234        self.replica_url = Some(url.into());
235        self
236    }
237
238    /// Set the max pool size for the replica pool.
239    /// Defaults to the same value as `max_size`.
240    pub fn replica_max_size(mut self, size: usize) -> Self {
241        self.replica_max_size = Some(size);
242        self
243    }
244
245    /// Set the maximum idle duration before a connection is considered stale.
246    /// Default: 30 seconds. Connections idle longer than this are dropped on
247    /// acquire instead of being reused.
248    pub fn stale_timeout(mut self, timeout: Duration) -> Self {
249        self.stale_timeout = Some(timeout);
250        self
251    }
252
253    /// Set the maximum number of cached prepared statements per connection.
254    /// Default: 256. When the cache exceeds this size, the least recently
255    /// used statement is evicted.
256    pub fn max_stmt_cache_size(mut self, size: usize) -> Self {
257        self.max_stmt_cache_size = Some(size);
258        self
259    }
260
261    pub async fn build(self) -> BsqlResult<PgPool> {
262        let url = self.url.ok_or_else(|| {
263            BsqlError::from(bsql_driver_postgres::DriverError::Pool(
264                "pool builder requires a URL".into(),
265            ))
266        })?;
267        let mut builder = bsql_driver_postgres::Pool::builder()
268            .url(&url)
269            .max_size(self.max_size);
270
271        if let Some(lt) = self.max_lifetime {
272            builder = builder.max_lifetime(lt);
273        }
274        if let Some(at) = self.acquire_timeout {
275            builder = builder.acquire_timeout(at);
276        }
277        if let Some(mi) = self.min_idle {
278            builder = builder.min_idle(mi);
279        }
280        if let Some(st) = self.stale_timeout {
281            builder = builder.stale_timeout(st);
282        }
283        if let Some(msc) = self.max_stmt_cache_size {
284            builder = builder.max_stmt_cache_size(msc);
285        }
286
287        let inner = builder.build().map_err(BsqlError::from)?;
288
289        // Build replica pool if configured
290        let read_pool = if let Some(replica_url) = &self.replica_url {
291            let replica_size = self.replica_max_size.unwrap_or(self.max_size);
292            let mut rbuilder = bsql_driver_postgres::Pool::builder()
293                .url(replica_url)
294                .max_size(replica_size);
295            if let Some(lt) = self.max_lifetime {
296                rbuilder = rbuilder.max_lifetime(lt);
297            }
298            if let Some(at) = self.acquire_timeout {
299                rbuilder = rbuilder.acquire_timeout(at);
300            }
301            Some(rbuilder.build().map_err(BsqlError::from)?)
302        } else {
303            None
304        };
305
306        Ok(PgPool { inner, read_pool })
307    }
308}
309
310impl PgPool {
311    /// Connect to PostgreSQL using a connection URL.
312    ///
313    /// Creates the pool (parses URL, allocates pool structures). Actual TCP/UDS
314    /// connections are established lazily on first `acquire()`.
315    ///
316    /// Format: `postgres://user:password@host:port/dbname`
317    pub async fn connect(url: &str) -> BsqlResult<Self> {
318        let inner = bsql_driver_postgres::Pool::connect(url).map_err(BsqlError::from)?;
319        Ok(PgPool {
320            inner,
321            read_pool: None,
322        })
323    }
324
325    /// Create a pool builder for fine-grained configuration.
326    pub fn builder() -> PoolBuilder {
327        PoolBuilder {
328            url: None,
329            max_size: 10,
330            max_lifetime: None,
331            acquire_timeout: None,
332            min_idle: None,
333            replica_url: None,
334            replica_max_size: None,
335            stale_timeout: None,
336            max_stmt_cache_size: None,
337        }
338    }
339
340    /// Acquire a connection from the pool.
341    ///
342    /// **Fail-fast**: returns `BsqlError::Pool` immediately if no connections
343    /// are available (unless `acquire_timeout` is configured).
344    pub async fn acquire(&self) -> BsqlResult<PoolConnection> {
345        let guard = self.inner.acquire().map_err(BsqlError::from)?;
346        Ok(PoolConnection { inner: guard })
347    }
348
349    /// Begin a new transaction.
350    ///
351    /// Acquires a connection and sends BEGIN immediately.
352    pub async fn begin(&self) -> BsqlResult<Transaction> {
353        let tx = self.inner.begin().map_err(BsqlError::from)?;
354        Ok(Transaction::from_driver(tx))
355    }
356
357    /// Execute a query and return a stream of rows.
358    ///
359    /// Acquires a connection from the pool and returns a [`QueryStream`]
360    /// that holds the connection alive until the stream is consumed or dropped.
361    ///
362    /// Uses true PG-level streaming via `Execute(max_rows=64)`. Only 64 rows
363    /// are in memory at a time. The stream fetches additional chunks on demand
364    /// via the `PortalSuspended` / re-`Execute` protocol.
365    pub async fn query_stream(
366        &self,
367        sql: &str,
368        sql_hash: u64,
369        params: &[&(dyn Encode + Sync)],
370    ) -> BsqlResult<QueryStream> {
371        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
372        let mut arena = acquire_arena();
373
374        // chunk_size=64 rows per Execute call
375        const CHUNK_SIZE: i32 = 64;
376
377        let (columns, _) = guard
378            .query_streaming_start(sql, sql_hash, params, CHUNK_SIZE)
379            .map_err(BsqlError::from)?;
380
381        let num_cols = columns.len();
382        let mut all_col_offsets: Vec<(usize, i32)> =
383            Vec::with_capacity(num_cols * CHUNK_SIZE as usize);
384
385        let more = guard
386            .streaming_next_chunk(&mut arena, &mut all_col_offsets)
387            .map_err(BsqlError::from)?;
388
389        let first_result = bsql_driver_postgres::QueryResult::from_parts(
390            all_col_offsets,
391            num_cols,
392            columns.clone(),
393            0,
394        );
395
396        Ok(QueryStream::new(guard, arena, first_result, columns, !more))
397    }
398
399    /// Set the SQL statements to pre-PREPARE on new connections.
400    ///
401    /// Each SQL string is PREPAREd on new connections before they are returned
402    /// from `acquire()`. This eliminates first-use Parse overhead for hot queries.
403    ///
404    /// Warmup errors are silently ignored — a bad warmup SQL does not prevent
405    /// the connection from being usable.
406    pub fn set_warmup_sqls<S: Into<Box<str>>>(&self, sqls: impl IntoIterator<Item = S>) {
407        self.inner.set_warmup_sqls(sqls);
408    }
409
410    /// Execute arbitrary SQL and return text rows.
411    ///
412    /// Uses PostgreSQL's simple query protocol — all values returned as strings.
413    /// This bypasses bsql's compile-time SQL validation entirely.
414    ///
415    /// Use for DDL, ad-hoc queries, migrations, or the rare dynamic SQL that
416    /// cannot be expressed via `query!`. For type-safe queries, use `query!`.
417    pub async fn raw_query(&self, sql: &str) -> BsqlResult<Vec<RawRow>> {
418        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
419        let rows = guard
420            .simple_query_rows(sql)
421            .map_err(BsqlError::from_driver_query)?;
422        Ok(rows.into_iter().map(RawRow).collect())
423    }
424
425    /// Execute arbitrary SQL without returning rows.
426    ///
427    /// Uses PostgreSQL's simple query protocol. Useful for DDL (CREATE TABLE,
428    /// ALTER, DROP), SET commands, or any statement where you don't need results.
429    pub async fn raw_execute(&self, sql: &str) -> BsqlResult<()> {
430        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
431        guard
432            .simple_query(sql)
433            .map_err(BsqlError::from_driver_query)?;
434        Ok(())
435    }
436
437    /// Execute parameterized SQL and return text rows.
438    ///
439    /// Uses PostgreSQL's extended query protocol (Parse+Bind+Execute) with
440    /// positional `$1, $2, ...` parameter placeholders. Results are converted
441    /// from binary format to text strings.
442    ///
443    /// This is the parameterized equivalent of [`raw_query`](Self::raw_query).
444    /// Use when you need parameter binding but don't want compile-time
445    /// validation via `query!`.
446    ///
447    /// # Example
448    ///
449    /// ```rust,ignore
450    /// let rows = pool.raw_query_params(
451    ///     "SELECT $1::int4 + $2::int4 AS sum",
452    ///     &[&1i32, &2i32],
453    /// ).await?;
454    /// assert_eq!(rows[0].get(0), Some("3"));
455    /// ```
456    pub async fn raw_query_params(
457        &self,
458        sql: &str,
459        params: &[&(dyn Encode + Sync)],
460    ) -> BsqlResult<Vec<RawRow>> {
461        let sql_hash = crate::rapid_hash_str(sql);
462        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
463        let result = guard
464            .query(sql, sql_hash, params)
465            .map_err(BsqlError::from_driver_query)?;
466        let arena = bsql_driver_postgres::Arena::empty();
467        let columns = result.columns();
468        let num_rows = result.len();
469        let mut rows = Vec::with_capacity(num_rows);
470        for i in 0..num_rows {
471            let row = result.row(i, &arena);
472            let num_cols = row.column_count();
473            let mut values = Vec::with_capacity(num_cols);
474            for (c, col) in columns.iter().enumerate().take(num_cols) {
475                if row.is_null(c) {
476                    values.push(None);
477                } else {
478                    let text = binary_col_to_text(&row, c, col.type_oid);
479                    values.push(Some(text));
480                }
481            }
482            rows.push(RawRow(values));
483        }
484        Ok(rows)
485    }
486
487    /// Bulk copy data INTO a table from an iterator of text rows.
488    ///
489    /// Each row is a tab-separated string (TSV format, matching PostgreSQL's
490    /// default COPY text format). Returns the number of rows copied.
491    ///
492    /// This is 10-100x faster than individual INSERTs for bulk data loading.
493    ///
494    /// # Example
495    ///
496    /// ```rust,ignore
497    /// let rows = vec!["alice\talice@example.com", "bob\tbob@example.com"];
498    /// let count = pool.copy_in("users", &["name", "email"], rows.iter().map(|s| s.as_str())).await?;
499    /// ```
500    pub async fn copy_in<'a, I>(&self, table: &str, columns: &[&str], rows: I) -> BsqlResult<u64>
501    where
502        I: IntoIterator<Item = &'a str>,
503    {
504        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
505        guard
506            .copy_in(table, columns, rows)
507            .map_err(BsqlError::from_driver_query)
508    }
509
510    /// Bulk copy data OUT of a table or query result to a writer.
511    ///
512    /// Data is written in PostgreSQL's text format (tab-separated columns,
513    /// newline-terminated rows). Returns the number of rows copied.
514    ///
515    /// # Example
516    ///
517    /// ```rust,ignore
518    /// let mut buf = Vec::new();
519    /// let count = pool.copy_out("SELECT name, email FROM users", &mut buf).await?;
520    /// ```
521    pub async fn copy_out<W: std::io::Write>(
522        &self,
523        query: &str,
524        writer: &mut W,
525    ) -> BsqlResult<u64> {
526        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
527        guard
528            .copy_out(query, writer)
529            .map_err(BsqlError::from_driver_query)
530    }
531
532    /// Pool status metrics: idle, active, open, and max_size.
533    ///
534    /// Returns detailed pool utilization metrics from the driver.
535    pub fn status(&self) -> PoolStatus {
536        let driver_status = self.inner.status();
537        PoolStatus {
538            idle: driver_status.idle,
539            active: driver_status.active,
540            open: driver_status.open,
541            max_size: driver_status.max_size,
542        }
543    }
544
545    /// Gracefully close the pool (and replica pool if configured).
546    ///
547    /// No new connections can be acquired after this call. All idle connections
548    /// are closed immediately. Active connections are closed when returned to
549    /// the pool.
550    pub fn close(&self) {
551        self.inner.close();
552        if let Some(ref rp) = self.read_pool {
553            rp.close();
554        }
555    }
556
557    /// Whether the pool has been closed.
558    pub fn is_closed(&self) -> bool {
559        self.inner.is_closed()
560    }
561
562    /// Whether a read replica pool is configured.
563    pub fn has_replica(&self) -> bool {
564        self.read_pool.is_some()
565    }
566
567    /// Whether this pool uses sync connections via Unix domain sockets.
568    ///
569    /// When `true`, the pool automatically uses `SyncConnection` (blocking I/O)
570    /// internally, eliminating async overhead for sub-microsecond UDS I/O.
571    /// The user API is identical — this is purely a performance optimization.
572    pub fn is_uds(&self) -> bool {
573        self.inner.is_uds()
574    }
575
576    /// Process each row directly from the wire buffer via a closure.
577    ///
578    /// Acquires a connection, calls `Connection::for_each`, and releases.
579    /// Zero arena allocation — the closure reads columns directly from
580    /// the DataRow message bytes.
581    ///
582    /// When `readonly` is true and a replica pool is configured, routes
583    /// to the replica pool; otherwise uses the primary.
584    pub async fn for_each_raw<F>(
585        &self,
586        sql: &str,
587        sql_hash: u64,
588        params: &[&(dyn Encode + Sync)],
589        readonly: bool,
590        mut f: F,
591    ) -> BsqlResult<()>
592    where
593        F: FnMut(bsql_driver_postgres::PgDataRow<'_>) -> BsqlResult<()>,
594    {
595        let pool = if readonly {
596            self.read_pool.as_ref().unwrap_or(&self.inner)
597        } else {
598            &self.inner
599        };
600        let mut guard = pool.acquire().map_err(BsqlError::from)?;
601        // Bridge BsqlError from the user closure into DriverError for the
602        // driver-level for_each. Any closure error is stashed in `user_err`
603        // and re-surfaced after the driver returns.
604        let mut user_err: Option<BsqlError> = None;
605        let driver_result = guard.for_each(sql, sql_hash, params, |row| match f(row) {
606            Ok(()) => Ok(()),
607            Err(e) => {
608                user_err = Some(e);
609                Err(bsql_driver_postgres::DriverError::Protocol(
610                    "for_each closure error".into(),
611                ))
612            }
613        });
614        // If the user closure produced an error, return it directly.
615        if let Some(e) = user_err {
616            return Err(e);
617        }
618        driver_result.map_err(BsqlError::from_driver_query)
619    }
620
621    /// Process each DataRow as raw bytes via inline sequential decode.
622    ///
623    /// Like `for_each_raw` but passes the raw `&[u8]` DataRow payload directly
624    /// to the closure — no `PgDataRow` construction, no SmallVec pre-scan.
625    /// The generated macro code decodes columns inline by advancing a position
626    /// cursor through the bytes.
627    #[doc(hidden)]
628    pub async fn __for_each_raw_bytes<F>(
629        &self,
630        sql: &str,
631        sql_hash: u64,
632        params: &[&(dyn Encode + Sync)],
633        readonly: bool,
634        mut f: F,
635    ) -> BsqlResult<()>
636    where
637        F: FnMut(&[u8]) -> BsqlResult<()>,
638    {
639        let pool = if readonly {
640            self.read_pool.as_ref().unwrap_or(&self.inner)
641        } else {
642            &self.inner
643        };
644        let mut guard = pool.acquire().map_err(BsqlError::from)?;
645        let mut user_err: Option<BsqlError> = None;
646        let driver_result = guard.for_each_raw(sql, sql_hash, params, |data| match f(data) {
647            Ok(()) => Ok(()),
648            Err(e) => {
649                user_err = Some(e);
650                Err(bsql_driver_postgres::DriverError::Protocol(
651                    "for_each closure error".into(),
652                ))
653            }
654        });
655        if let Some(e) = user_err {
656            return Err(e);
657        }
658        driver_result.map_err(BsqlError::from_driver_query)
659    }
660}
661
662impl Clone for PgPool {
663    fn clone(&self) -> Self {
664        PgPool {
665            inner: self.inner.clone(),
666            read_pool: self.read_pool.clone(),
667        }
668    }
669}
670
671impl std::fmt::Debug for PgPool {
672    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
673        f.debug_struct("PgPool")
674            .field("status", &self.status())
675            .finish()
676    }
677}
678
/// A connection borrowed from the pool.
///
/// Provides exclusive (`&mut`) access to the underlying `PoolGuard` — no
/// `Mutex` needed. Generated code converts `&mut PoolConnection` into a
/// [`QueryTarget`](crate::executor::QueryTarget) for dispatch.
///
/// Returned to the pool when dropped.
pub struct PoolConnection {
    /// Driver-level guard wrapped by this connection.
    pub(crate) inner: bsql_driver_postgres::PoolGuard,
}
689
690impl std::fmt::Debug for PoolConnection {
691    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
692        f.debug_struct("PoolConnection").finish()
693    }
694}
695
/// Snapshot of pool utilization.
///
/// A plain value type: it is `Copy` and can be compared with `==`, which
/// makes it convenient to assert on in tests or to detect changes between
/// two samples.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PoolStatus {
    /// Number of idle connections in the pool.
    pub idle: usize,
    /// Number of connections currently in use.
    pub active: usize,
    /// Total open connections (idle + active).
    pub open: usize,
    /// Maximum pool size.
    pub max_size: usize,
}

impl std::fmt::Display for PoolStatus {
    /// Format as `idle=N, active=N, open=N, max=N`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructuring keeps the format string readable and makes the
        // compiler flag this impl if a field is ever added.
        let Self {
            idle,
            active,
            open,
            max_size,
        } = *self;
        write!(f, "idle={idle}, active={active}, open={open}, max={max_size}")
    }
}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722
723    #[test]
724    fn builder_defaults() {
725        let b = Pool::builder();
726        assert_eq!(b.max_size, 10);
727        assert!(b.max_lifetime.is_none());
728        assert!(b.acquire_timeout.is_none());
729        assert!(b.min_idle.is_none());
730    }
731
732    #[test]
733    fn builder_max_lifetime() {
734        let b = Pool::builder().max_lifetime(Some(Duration::from_secs(60)));
735        assert_eq!(b.max_lifetime, Some(Some(Duration::from_secs(60))));
736    }
737
738    #[test]
739    fn builder_max_lifetime_none_disables() {
740        let b = Pool::builder().max_lifetime(None);
741        assert_eq!(b.max_lifetime, Some(None));
742    }
743
744    #[test]
745    fn builder_acquire_timeout() {
746        let b = Pool::builder().acquire_timeout(Some(Duration::from_secs(3)));
747        assert_eq!(b.acquire_timeout, Some(Some(Duration::from_secs(3))));
748    }
749
750    #[test]
751    fn builder_acquire_timeout_none_disables() {
752        let b = Pool::builder().acquire_timeout(None);
753        assert_eq!(b.acquire_timeout, Some(None));
754    }
755
756    #[test]
757    fn builder_min_idle() {
758        let b = Pool::builder().min_idle(5);
759        assert_eq!(b.min_idle, Some(5));
760    }
761
762    // --- Convenience methods ---
763
764    #[test]
765    fn builder_max_lifetime_secs() {
766        let b = Pool::builder().max_lifetime_secs(1800);
767        assert_eq!(b.max_lifetime, Some(Some(Duration::from_secs(1800))));
768    }
769
770    #[test]
771    fn builder_acquire_timeout_secs() {
772        let b = Pool::builder().acquire_timeout_secs(5);
773        assert_eq!(b.acquire_timeout, Some(Some(Duration::from_secs(5))));
774    }
775
776    // --- Shorthand aliases ---
777
778    #[test]
779    fn builder_lifetime_secs_shorthand() {
780        let b = Pool::builder().lifetime_secs(900);
781        assert_eq!(b.max_lifetime, Some(Some(Duration::from_secs(900))));
782    }
783
784    #[test]
785    fn builder_timeout_secs_shorthand() {
786        let b = Pool::builder().timeout_secs(3);
787        assert_eq!(b.acquire_timeout, Some(Some(Duration::from_secs(3))));
788    }
789
790    // --- Task 2: Read/write splitting ---
791
792    #[test]
793    fn builder_defaults_no_replica() {
794        let b = Pool::builder();
795        assert!(b.replica_url.is_none());
796        assert!(b.replica_max_size.is_none());
797    }
798
799    #[test]
800    fn builder_replica_url() {
801        let b = Pool::builder().replica_url("postgres://replica:5432/db");
802        assert_eq!(b.replica_url.as_deref(), Some("postgres://replica:5432/db"));
803    }
804
805    #[test]
806    fn builder_replica_max_size() {
807        let b = Pool::builder().replica_max_size(20);
808        assert_eq!(b.replica_max_size, Some(20));
809    }
810
811    #[tokio::test]
812    async fn pool_connect_has_no_replica() {
813        let pool = Pool::connect("postgres://user:pass@localhost/db")
814            .await
815            .unwrap();
816        assert!(!pool.has_replica());
817    }
818
819    // --- Auto-UDS sync connection tests ---
820
821    #[tokio::test]
822    async fn pool_is_uds_false_for_tcp() {
823        let pool = Pool::connect("postgres://user:pass@localhost/db")
824            .await
825            .unwrap();
826        assert!(!pool.is_uds());
827    }
828
829    #[cfg(unix)]
830    #[tokio::test]
831    async fn pool_is_uds_true_for_unix_socket() {
832        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp")
833            .await
834            .unwrap();
835        assert!(pool.is_uds());
836    }
837
838    #[tokio::test]
839    async fn pool_is_uds_false_for_ip() {
840        let pool = Pool::connect("postgres://user:pass@127.0.0.1/db")
841            .await
842            .unwrap();
843        assert!(!pool.is_uds());
844    }
845
846    // --- PoolStatus Display ---
847
848    #[test]
849    fn pool_status_display() {
850        let status = PoolStatus {
851            idle: 3,
852            active: 2,
853            open: 5,
854            max_size: 10,
855        };
856        assert_eq!(status.to_string(), "idle=3, active=2, open=5, max=10");
857    }
858
859    #[test]
860    fn pool_status_display_zeros() {
861        let status = PoolStatus {
862            idle: 0,
863            active: 0,
864            open: 0,
865            max_size: 0,
866        };
867        assert_eq!(status.to_string(), "idle=0, active=0, open=0, max=0");
868    }
869
870    // --- PoolConnection Debug ---
871
872    #[test]
873    fn pool_connection_debug() {
874        // PoolConnection wraps a PoolGuard, Debug should not panic
875        let dbg_str = "PoolConnection";
876        assert!(!dbg_str.is_empty());
877        // We can't construct a PoolConnection without a real pool guard,
878        // but we verify the impl exists at compile time through the trait bound.
879        fn _assert_debug<T: std::fmt::Debug>() {}
880        _assert_debug::<PoolConnection>();
881    }
882
883    // --- Pool Debug ---
884
885    #[tokio::test]
886    async fn pool_debug() {
887        let pool = Pool::connect("postgres://user:pass@localhost/db")
888            .await
889            .unwrap();
890        let dbg = format!("{pool:?}");
891        assert!(dbg.contains("Pool"), "Debug should show Pool: {dbg}");
892    }
893
894    // --- Pool Clone ---
895
896    #[tokio::test]
897    async fn pool_clone_is_cheap() {
898        let pool = Pool::connect("postgres://user:pass@localhost/db")
899            .await
900            .unwrap();
901        let pool2 = pool.clone();
902        assert_eq!(pool.status().max_size, pool2.status().max_size);
903        assert!(!pool.has_replica());
904        assert!(!pool2.has_replica());
905    }
906
907    // --- Send + Sync assertions ---
908
909    fn _assert_send<T: Send>() {}
910    fn _assert_sync<T: Sync>() {}
911
912    #[test]
913    fn pool_is_send_and_sync() {
914        _assert_send::<Pool>();
915        _assert_sync::<Pool>();
916    }
917
918    #[test]
919    fn pool_connection_is_send() {
920        _assert_send::<PoolConnection>();
921    }
922
923    #[test]
924    fn pool_status_is_send_and_sync() {
925        _assert_send::<PoolStatus>();
926        _assert_sync::<PoolStatus>();
927    }
928
929    // --- Builder without URL ---
930
931    #[tokio::test]
932    async fn builder_build_without_url_errors() {
933        let result = Pool::builder().build().await;
934        assert!(result.is_err());
935        let err = result.unwrap_err().to_string();
936        assert!(err.contains("URL"), "error should mention URL: {err}");
937    }
938
939    // --- PoolBuilder chaining ---
940
941    #[test]
942    fn builder_chaining() {
943        let b = Pool::builder()
944            .url("postgres://u@localhost/db")
945            .max_size(20)
946            .lifetime_secs(600)
947            .timeout_secs(3)
948            .min_idle(2)
949            .replica_url("postgres://u@replica/db")
950            .replica_max_size(10);
951        assert_eq!(b.max_size, 20);
952        assert_eq!(b.min_idle, Some(2));
953        assert_eq!(b.replica_max_size, Some(10));
954    }
955
956    // --- RawRow ---
957
958    #[test]
959    fn raw_row_get() {
960        let row = RawRow(vec![Some("hello".into()), None, Some("42".into())]);
961        assert_eq!(row.get(0), Some("hello"));
962        assert_eq!(row.get(1), None);
963        assert_eq!(row.get(2), Some("42"));
964        assert_eq!(row.get(99), None);
965        assert_eq!(row.len(), 3);
966    }
967
968    #[test]
969    fn raw_row_is_empty() {
970        let empty = RawRow(vec![]);
971        assert!(empty.is_empty());
972        assert_eq!(empty.len(), 0);
973
974        let non_empty = RawRow(vec![Some("x".into())]);
975        assert!(!non_empty.is_empty());
976    }
977
978    #[test]
979    fn raw_row_iter() {
980        let row = RawRow(vec![Some("a".into()), None, Some("b".into())]);
981        let vals: Vec<_> = row.iter().collect();
982        assert_eq!(vals, vec![Some("a"), None, Some("b")]);
983    }
984
985    #[test]
986    fn raw_row_clone() {
987        let row = RawRow(vec![Some("hello".into()), None]);
988        let cloned = row.clone();
989        assert_eq!(cloned.get(0), Some("hello"));
990        assert_eq!(cloned.get(1), None);
991        assert_eq!(cloned.len(), 2);
992    }
993
994    #[test]
995    fn raw_row_debug() {
996        let row = RawRow(vec![Some("x".into())]);
997        let dbg = format!("{row:?}");
998        assert!(dbg.contains("RawRow"), "Debug should show RawRow: {dbg}");
999    }
1000
1001    // --- RawRow additional edge cases ---
1002
1003    #[test]
1004    fn raw_row_all_null_values() {
1005        let row = RawRow(vec![None, None, None]);
1006        assert_eq!(row.len(), 3);
1007        assert!(!row.is_empty());
1008        assert_eq!(row.get(0), None);
1009        assert_eq!(row.get(1), None);
1010        assert_eq!(row.get(2), None);
1011        // iter should produce all None
1012        let vals: Vec<_> = row.iter().collect();
1013        assert_eq!(vals, vec![None, None, None]);
1014    }
1015
1016    #[test]
1017    fn raw_row_empty_string_values() {
1018        let row = RawRow(vec![Some(String::new()), Some("".into())]);
1019        assert_eq!(row.len(), 2);
1020        // Empty string is Some(""), not None
1021        assert_eq!(row.get(0), Some(""));
1022        assert_eq!(row.get(1), Some(""));
1023    }
1024
1025    #[test]
1026    fn raw_row_get_out_of_bounds() {
1027        let row = RawRow(vec![Some("only".into())]);
1028        assert_eq!(row.get(0), Some("only"));
1029        assert_eq!(row.get(1), None);
1030        assert_eq!(row.get(100), None);
1031        assert_eq!(row.get(usize::MAX), None);
1032    }
1033
1034    #[test]
1035    fn raw_row_iter_empty() {
1036        let row = RawRow(vec![]);
1037        let vals: Vec<_> = row.iter().collect();
1038        assert!(vals.is_empty());
1039    }
1040
1041    #[test]
1042    fn raw_row_iter_mixed() {
1043        let row = RawRow(vec![
1044            Some("hello".into()),
1045            None,
1046            Some("world".into()),
1047            None,
1048            Some("".into()),
1049        ]);
1050        let vals: Vec<_> = row.iter().collect();
1051        assert_eq!(
1052            vals,
1053            vec![Some("hello"), None, Some("world"), None, Some("")]
1054        );
1055    }
1056
1057    #[test]
1058    fn raw_row_single_null() {
1059        let row = RawRow(vec![None]);
1060        assert_eq!(row.len(), 1);
1061        assert!(!row.is_empty());
1062        assert_eq!(row.get(0), None);
1063    }
1064
1065    // --- PoolBuilder stale_timeout ---
1066
1067    #[test]
1068    fn builder_stale_timeout() {
1069        let b = Pool::builder().stale_timeout(Duration::from_secs(15));
1070        assert_eq!(b.stale_timeout, Some(Duration::from_secs(15)));
1071    }
1072
1073    #[test]
1074    fn builder_stale_timeout_default_is_none() {
1075        let b = Pool::builder();
1076        assert!(b.stale_timeout.is_none());
1077    }
1078
1079    // --- PoolBuilder max_stmt_cache_size ---
1080
1081    #[test]
1082    fn builder_max_stmt_cache_size() {
1083        let b = Pool::builder().max_stmt_cache_size(512);
1084        assert_eq!(b.max_stmt_cache_size, Some(512));
1085    }
1086
1087    #[test]
1088    fn builder_max_stmt_cache_size_default_is_none() {
1089        let b = Pool::builder();
1090        assert!(b.max_stmt_cache_size.is_none());
1091    }
1092
1093    // --- Pool close / is_closed ---
1094
1095    #[tokio::test]
1096    async fn pool_close_and_is_closed() {
1097        let pool = Pool::connect("postgres://user:pass@localhost/db")
1098            .await
1099            .unwrap();
1100        assert!(!pool.is_closed());
1101        pool.close();
1102        assert!(pool.is_closed());
1103    }
1104
1105    // --- Pool status on fresh pool ---
1106
1107    #[tokio::test]
1108    async fn pool_status_on_fresh_pool() {
1109        let pool = Pool::connect("postgres://user:pass@localhost/db")
1110            .await
1111            .unwrap();
1112        let status = pool.status();
1113        assert_eq!(status.idle, 0, "fresh pool should have 0 idle");
1114        assert_eq!(status.active, 0, "fresh pool should have 0 active");
1115        assert_eq!(status.open, 0, "fresh pool should have 0 open");
1116        assert_eq!(status.max_size, 10, "default max_size should be 10");
1117    }
1118
1119    // --- PoolStatus Clone and Copy ---
1120
1121    #[test]
1122    fn pool_status_clone_and_copy() {
1123        let status = PoolStatus {
1124            idle: 1,
1125            active: 2,
1126            open: 3,
1127            max_size: 10,
1128        };
1129        let cloned = status;
1130        assert_eq!(cloned.idle, 1);
1131        assert_eq!(cloned.active, 2);
1132        assert_eq!(cloned.open, 3);
1133        assert_eq!(cloned.max_size, 10);
1134    }
1135
1136    // --- PoolStatus Debug ---
1137
1138    #[test]
1139    fn pool_status_debug() {
1140        let status = PoolStatus {
1141            idle: 1,
1142            active: 2,
1143            open: 3,
1144            max_size: 10,
1145        };
1146        let dbg = format!("{status:?}");
1147        assert!(
1148            dbg.contains("PoolStatus"),
1149            "Debug should show PoolStatus: {dbg}"
1150        );
1151    }
1152
1153    // --- Builder max_size ---
1154
1155    #[test]
1156    fn builder_max_size() {
1157        let b = Pool::builder().max_size(50);
1158        assert_eq!(b.max_size, 50);
1159    }
1160
1161    // --- Builder url ---
1162
1163    #[test]
1164    fn builder_url_stored() {
1165        let b = Pool::builder().url("postgres://localhost/test");
1166        assert_eq!(b.url.as_deref(), Some("postgres://localhost/test"));
1167    }
1168
1169    // --- RawRow unicode content ---
1170
1171    #[test]
1172    fn raw_row_unicode_content() {
1173        let row = RawRow(vec![
1174            Some("\u{1F600}".into()),                                // emoji
1175            Some("\u{0645}\u{0631}\u{062D}\u{0628}\u{0627}".into()), // Arabic
1176            Some("\u{00E9}\u{00E8}\u{00EA}".into()),                 // French accents
1177        ]);
1178        assert_eq!(row.get(0), Some("\u{1F600}"));
1179        assert_eq!(row.len(), 3);
1180    }
1181
1182    // --- RawRow large column count ---
1183
1184    #[test]
1185    fn raw_row_many_columns() {
1186        let cols: Vec<Option<String>> = (0..100).map(|i| Some(format!("val_{i}"))).collect();
1187        let row = RawRow(cols);
1188        assert_eq!(row.len(), 100);
1189        assert_eq!(row.get(0), Some("val_0"));
1190        assert_eq!(row.get(99), Some("val_99"));
1191        assert_eq!(row.get(100), None);
1192    }
1193
1194    // --- Pool has_replica false by default ---
1195
1196    #[tokio::test]
1197    async fn pool_has_replica_false_default() {
1198        let pool = Pool::connect("postgres://user:pass@localhost/db")
1199            .await
1200            .unwrap();
1201        assert!(!pool.has_replica());
1202    }
1203
1204    // --- Builder complete chaining returns correct state ---
1205
1206    #[test]
1207    fn builder_full_chain() {
1208        let b = Pool::builder()
1209            .url("postgres://u@localhost/db")
1210            .max_size(32)
1211            .lifetime_secs(600)
1212            .timeout_secs(3)
1213            .min_idle(4)
1214            .stale_timeout(Duration::from_secs(30))
1215            .max_stmt_cache_size(128)
1216            .replica_url("postgres://u@replica/db")
1217            .replica_max_size(16);
1218        assert_eq!(b.max_size, 32);
1219        assert_eq!(b.min_idle, Some(4));
1220        assert_eq!(b.stale_timeout, Some(Duration::from_secs(30)));
1221        assert_eq!(b.max_stmt_cache_size, Some(128));
1222        assert_eq!(b.replica_max_size, Some(16));
1223    }
1224
1225    // --- RawRow construction ---
1226
1227    #[test]
1228    fn raw_row_with_nulls() {
1229        let row = RawRow(vec![Some("hello".into()), None, Some("world".into())]);
1230        assert_eq!(row.len(), 3);
1231        assert_eq!(row.get(0), Some("hello"));
1232        assert_eq!(row.get(1), None);
1233        assert_eq!(row.get(2), Some("world"));
1234    }
1235
1236    #[test]
1237    fn raw_row_all_nulls() {
1238        let row = RawRow(vec![None, None]);
1239        assert_eq!(row.len(), 2);
1240        assert_eq!(row.get(0), None);
1241        assert_eq!(row.get(1), None);
1242    }
1243
1244    // --- PgPool rename with Pool type alias ---
1245
1246    #[test]
1247    fn pgpool_and_pool_alias_are_same_type() {
1248        // Pool is a type alias for PgPool — both refer to the same struct.
1249        fn accepts_pgpool(_: &PgPool) {}
1250        fn accepts_pool(_: &Pool) {}
1251        fn returns_pgpool() -> fn(&PgPool) {
1252            accepts_pool // Pool alias accepted where PgPool expected
1253        }
1254        fn returns_pool() -> fn(&Pool) {
1255            accepts_pgpool // PgPool accepted where Pool alias expected
1256        }
1257        let _ = returns_pgpool();
1258        let _ = returns_pool();
1259    }
1260
1261    #[test]
1262    fn pgpool_builder_accessible_via_alias() {
1263        // Confirm Pool::builder() works through the type alias
1264        let b = Pool::builder();
1265        assert_eq!(b.max_size, 10);
1266        // Also confirm PgPool::builder() works
1267        let b2 = PgPool::builder();
1268        assert_eq!(b2.max_size, 10);
1269    }
1270}