Skip to main content

bsql_core/
pool.rs

1//! Connection pool — thin wrapper over `bsql_driver_postgres::Pool`.
2//!
3//! Delegates all connection management, fail-fast semantics, and LIFO ordering
4//! to the driver. This layer adds only the bsql error type conversions.
5
6use std::time::Duration;
7
8use bsql_driver_postgres::arena::acquire_arena;
9use bsql_driver_postgres::codec::Encode;
10
11use crate::error::{BsqlError, BsqlResult};
12use crate::stream::QueryStream;
13use crate::transaction::Transaction;
14
/// One result row from a raw (unvalidated) SQL query, held as text.
///
/// PostgreSQL's simple query protocol sends every value as text, so each
/// column is an optional string where `None` stands for SQL NULL. Read a
/// column by position with [`get`](RawRow::get) or walk all of them with
/// [`iter`](RawRow::iter).
#[derive(Debug, Clone)]
pub struct RawRow(Vec<Option<String>>);

impl RawRow {
    /// Look up the column at position `idx`.
    ///
    /// Yields `None` for SQL NULL and also for an index past the last column.
    pub fn get(&self, idx: usize) -> Option<&str> {
        self.0.get(idx).and_then(|cell| cell.as_deref())
    }

    /// Column count of this row.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// `true` when the row carries zero columns.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Walk the columns in order; SQL NULL shows up as `None`.
    pub fn iter(&self) -> impl Iterator<Item = Option<&str>> {
        self.0.iter().map(Option::as_deref)
    }
}
43
44/// A PostgreSQL connection pool.
45///
46/// Created via [`Pool::connect`] or [`Pool::builder`]. The pool manages a set
47/// of connections, automatically acquires/releases them for each query, and
48/// supports optional read/write splitting with a replica.
49///
50/// # Example
51///
52/// ```rust,ignore
53/// use bsql::Pool;
54///
55/// let pool = Pool::connect("postgres://user:pass@localhost/mydb")?;
56///
57/// // Or configure via builder:
58/// let pool = Pool::builder()
59///     .url("postgres://user:pass@localhost/mydb")
60///     .lifetime_secs(900)
61///     .timeout_secs(5)
62///     .build()?;
63/// ```
64pub struct Pool {
65    pub(crate) inner: bsql_driver_postgres::Pool,
66    /// Optional read replica pool. When present, `query_raw_readonly` routes here.
67    pub(crate) read_pool: Option<bsql_driver_postgres::Pool>,
68}
69
/// Builder for configuring a connection pool.
///
/// Obtain one from [`Pool::builder`], chain the setters you need, and finish
/// with the `async` `build` method. Settings that are never touched are not
/// forwarded to the driver, so the driver's own defaults apply.
///
/// # Example
///
/// ```rust,ignore
/// use bsql::Pool;
///
/// let pool = Pool::builder()
///     .url("postgres://user:pass@localhost/mydb")
///     .max_size(20)
///     .lifetime_secs(900)
///     .timeout_secs(5)
///     .min_idle(2)
///     .build()
///     .await?;
/// ```
pub struct PoolBuilder {
    /// Connection URL of the primary. Required; `build` fails without it.
    url: Option<String>,
    /// Maximum number of connections in the primary pool.
    max_size: usize,
    /// Maximum connection lifetime. Outer `None`: never set, the driver
    /// default applies. `Some(None)`: explicitly unlimited.
    max_lifetime: Option<Option<Duration>>,
    /// Maximum wait for a connection. Outer `None`: never set, the driver
    /// default applies. `Some(None)`: explicitly fail-fast.
    acquire_timeout: Option<Option<Duration>>,
    /// Minimum number of idle connections to maintain, if set.
    min_idle: Option<usize>,
    /// Optional URL for a read replica. When set, a second pool is built
    /// and readonly work is routed to it instead of the primary.
    replica_url: Option<String>,
    /// Max pool size for the replica pool. Defaults to same as `max_size`.
    replica_max_size: Option<usize>,
    /// Maximum idle duration before a connection is considered stale.
    stale_timeout: Option<Duration>,
    /// Maximum number of cached prepared statements per connection.
    max_stmt_cache_size: Option<usize>,
}
101
102impl PoolBuilder {
103    /// Configure the pool from a PostgreSQL connection URL.
104    ///
105    /// Format: `postgres://user:password@host:port/dbname`
106    pub fn url(mut self, url: &str) -> Self {
107        self.url = Some(url.into());
108        self
109    }
110
111    pub fn max_size(mut self, size: usize) -> Self {
112        self.max_size = size;
113        self
114    }
115
116    /// Set the maximum lifetime of a connection. Connections older than this
117    /// are discarded when returned to the pool. Default: 30 minutes.
118    ///
119    /// Pass `None` for unlimited lifetime.
120    pub fn max_lifetime(mut self, d: Option<Duration>) -> Self {
121        self.max_lifetime = Some(d);
122        self
123    }
124
125    /// Set the maximum lifetime in seconds. Convenience for
126    /// `max_lifetime(Some(Duration::from_secs(secs)))`.
127    pub fn max_lifetime_secs(self, secs: u64) -> Self {
128        self.max_lifetime(Some(Duration::from_secs(secs)))
129    }
130
131    /// Shorthand for [`max_lifetime_secs`](Self::max_lifetime_secs).
132    pub fn lifetime_secs(self, secs: u64) -> Self {
133        self.max_lifetime_secs(secs)
134    }
135
136    /// Set the maximum time to wait for a connection when the pool is
137    /// exhausted. Default: 5 seconds.
138    ///
139    /// Pass `None` for fail-fast behavior (no waiting, immediate error).
140    pub fn acquire_timeout(mut self, d: Option<Duration>) -> Self {
141        self.acquire_timeout = Some(d);
142        self
143    }
144
145    /// Set the acquire timeout in seconds. Convenience for
146    /// `acquire_timeout(Some(Duration::from_secs(secs)))`.
147    pub fn acquire_timeout_secs(self, secs: u64) -> Self {
148        self.acquire_timeout(Some(Duration::from_secs(secs)))
149    }
150
151    /// Shorthand for [`acquire_timeout_secs`](Self::acquire_timeout_secs).
152    pub fn timeout_secs(self, secs: u64) -> Self {
153        self.acquire_timeout_secs(secs)
154    }
155
156    /// Set the minimum number of idle connections to maintain. Default: 0.
157    ///
158    /// When greater than 0, a background task creates connections as needed
159    /// to maintain this idle floor.
160    pub fn min_idle(mut self, n: usize) -> Self {
161        self.min_idle = Some(n);
162        self
163    }
164
165    /// Set a read replica URL for read/write splitting.
166    ///
167    /// When configured, `query_raw_readonly` (used by SELECT queries)
168    /// routes to the replica pool. All writes go to the primary.
169    /// When no replica is configured, all queries use the primary.
170    pub fn replica_url(mut self, url: &str) -> Self {
171        self.replica_url = Some(url.into());
172        self
173    }
174
175    /// Set the max pool size for the replica pool.
176    /// Defaults to the same value as `max_size`.
177    pub fn replica_max_size(mut self, size: usize) -> Self {
178        self.replica_max_size = Some(size);
179        self
180    }
181
182    /// Set the maximum idle duration before a connection is considered stale.
183    /// Default: 30 seconds. Connections idle longer than this are dropped on
184    /// acquire instead of being reused.
185    pub fn stale_timeout(mut self, timeout: Duration) -> Self {
186        self.stale_timeout = Some(timeout);
187        self
188    }
189
190    /// Set the maximum number of cached prepared statements per connection.
191    /// Default: 256. When the cache exceeds this size, the least recently
192    /// used statement is evicted.
193    pub fn max_stmt_cache_size(mut self, size: usize) -> Self {
194        self.max_stmt_cache_size = Some(size);
195        self
196    }
197
198    pub async fn build(self) -> BsqlResult<Pool> {
199        let url = self.url.ok_or_else(|| {
200            BsqlError::from(bsql_driver_postgres::DriverError::Pool(
201                "pool builder requires a URL".into(),
202            ))
203        })?;
204        let mut builder = bsql_driver_postgres::Pool::builder()
205            .url(&url)
206            .max_size(self.max_size);
207
208        if let Some(lt) = self.max_lifetime {
209            builder = builder.max_lifetime(lt);
210        }
211        if let Some(at) = self.acquire_timeout {
212            builder = builder.acquire_timeout(at);
213        }
214        if let Some(mi) = self.min_idle {
215            builder = builder.min_idle(mi);
216        }
217        if let Some(st) = self.stale_timeout {
218            builder = builder.stale_timeout(st);
219        }
220        if let Some(msc) = self.max_stmt_cache_size {
221            builder = builder.max_stmt_cache_size(msc);
222        }
223
224        let inner = builder.build().map_err(BsqlError::from)?;
225
226        // Build replica pool if configured
227        let read_pool = if let Some(replica_url) = &self.replica_url {
228            let replica_size = self.replica_max_size.unwrap_or(self.max_size);
229            let mut rbuilder = bsql_driver_postgres::Pool::builder()
230                .url(replica_url)
231                .max_size(replica_size);
232            if let Some(lt) = self.max_lifetime {
233                rbuilder = rbuilder.max_lifetime(lt);
234            }
235            if let Some(at) = self.acquire_timeout {
236                rbuilder = rbuilder.acquire_timeout(at);
237            }
238            Some(rbuilder.build().map_err(BsqlError::from)?)
239        } else {
240            None
241        };
242
243        Ok(Pool { inner, read_pool })
244    }
245}
246
247impl Pool {
248    /// Connect to PostgreSQL using a connection URL.
249    ///
250    /// Creates the pool (parses URL, allocates pool structures). Actual TCP/UDS
251    /// connections are established lazily on first `acquire()`.
252    ///
253    /// Format: `postgres://user:password@host:port/dbname`
254    pub async fn connect(url: &str) -> BsqlResult<Self> {
255        let inner = bsql_driver_postgres::Pool::connect(url).map_err(BsqlError::from)?;
256        Ok(Pool {
257            inner,
258            read_pool: None,
259        })
260    }
261
262    /// Create a pool builder for fine-grained configuration.
263    pub fn builder() -> PoolBuilder {
264        PoolBuilder {
265            url: None,
266            max_size: 10,
267            max_lifetime: None,
268            acquire_timeout: None,
269            min_idle: None,
270            replica_url: None,
271            replica_max_size: None,
272            stale_timeout: None,
273            max_stmt_cache_size: None,
274        }
275    }
276
277    /// Acquire a connection from the pool.
278    ///
279    /// **Fail-fast**: returns `BsqlError::Pool` immediately if no connections
280    /// are available (unless `acquire_timeout` is configured).
281    pub async fn acquire(&self) -> BsqlResult<PoolConnection> {
282        let guard = self.inner.acquire().map_err(BsqlError::from)?;
283        Ok(PoolConnection { inner: guard })
284    }
285
286    /// Begin a new transaction.
287    ///
288    /// Acquires a connection and sends BEGIN immediately.
289    pub async fn begin(&self) -> BsqlResult<Transaction> {
290        let tx = self.inner.begin().map_err(BsqlError::from)?;
291        Ok(Transaction::from_driver(tx))
292    }
293
294    /// Execute a query and return a stream of rows.
295    ///
296    /// Acquires a connection from the pool and returns a [`QueryStream`]
297    /// that holds the connection alive until the stream is consumed or dropped.
298    ///
299    /// Uses true PG-level streaming via `Execute(max_rows=64)`. Only 64 rows
300    /// are in memory at a time. The stream fetches additional chunks on demand
301    /// via the `PortalSuspended` / re-`Execute` protocol.
302    pub async fn query_stream(
303        &self,
304        sql: &str,
305        sql_hash: u64,
306        params: &[&(dyn Encode + Sync)],
307    ) -> BsqlResult<QueryStream> {
308        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
309        let mut arena = acquire_arena();
310
311        // chunk_size=64 rows per Execute call
312        const CHUNK_SIZE: i32 = 64;
313
314        let (columns, _) = guard
315            .query_streaming_start(sql, sql_hash, params, CHUNK_SIZE)
316            .map_err(BsqlError::from)?;
317
318        let num_cols = columns.len();
319        let mut all_col_offsets: Vec<(usize, i32)> =
320            Vec::with_capacity(num_cols * CHUNK_SIZE as usize);
321
322        let more = guard
323            .streaming_next_chunk(&mut arena, &mut all_col_offsets)
324            .map_err(BsqlError::from)?;
325
326        let first_result = bsql_driver_postgres::QueryResult::from_parts(
327            all_col_offsets,
328            num_cols,
329            columns.clone(),
330            0,
331        );
332
333        Ok(QueryStream::new(guard, arena, first_result, columns, !more))
334    }
335
336    /// Set the SQL statements to pre-PREPARE on new connections.
337    ///
338    /// Each SQL string is PREPAREd on new connections before they are returned
339    /// from `acquire()`. This eliminates first-use Parse overhead for hot queries.
340    ///
341    /// Warmup errors are silently ignored — a bad warmup SQL does not prevent
342    /// the connection from being usable.
343    pub fn set_warmup_sqls<S: Into<Box<str>>>(&self, sqls: impl IntoIterator<Item = S>) {
344        self.inner.set_warmup_sqls(sqls);
345    }
346
347    /// Execute arbitrary SQL and return text rows.
348    ///
349    /// Uses PostgreSQL's simple query protocol — all values returned as strings.
350    /// This bypasses bsql's compile-time SQL validation entirely.
351    ///
352    /// Use for DDL, ad-hoc queries, migrations, or the rare dynamic SQL that
353    /// cannot be expressed via `query!`. For type-safe queries, use `query!`.
354    pub async fn raw_query(&self, sql: &str) -> BsqlResult<Vec<RawRow>> {
355        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
356        let rows = guard
357            .simple_query_rows(sql)
358            .map_err(BsqlError::from_driver_query)?;
359        Ok(rows.into_iter().map(RawRow).collect())
360    }
361
362    /// Execute arbitrary SQL without returning rows.
363    ///
364    /// Uses PostgreSQL's simple query protocol. Useful for DDL (CREATE TABLE,
365    /// ALTER, DROP), SET commands, or any statement where you don't need results.
366    pub async fn raw_execute(&self, sql: &str) -> BsqlResult<()> {
367        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
368        guard
369            .simple_query(sql)
370            .map_err(BsqlError::from_driver_query)?;
371        Ok(())
372    }
373
374    /// Bulk copy data INTO a table from an iterator of text rows.
375    ///
376    /// Each row is a tab-separated string (TSV format, matching PostgreSQL's
377    /// default COPY text format). Returns the number of rows copied.
378    ///
379    /// This is 10-100x faster than individual INSERTs for bulk data loading.
380    ///
381    /// # Example
382    ///
383    /// ```rust,ignore
384    /// let rows = vec!["alice\talice@example.com", "bob\tbob@example.com"];
385    /// let count = pool.copy_in("users", &["name", "email"], rows.iter().map(|s| s.as_str())).await?;
386    /// ```
387    pub async fn copy_in<'a, I>(&self, table: &str, columns: &[&str], rows: I) -> BsqlResult<u64>
388    where
389        I: IntoIterator<Item = &'a str>,
390    {
391        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
392        guard
393            .copy_in(table, columns, rows)
394            .map_err(BsqlError::from_driver_query)
395    }
396
397    /// Bulk copy data OUT of a table or query result to a writer.
398    ///
399    /// Data is written in PostgreSQL's text format (tab-separated columns,
400    /// newline-terminated rows). Returns the number of rows copied.
401    ///
402    /// # Example
403    ///
404    /// ```rust,ignore
405    /// let mut buf = Vec::new();
406    /// let count = pool.copy_out("SELECT name, email FROM users", &mut buf).await?;
407    /// ```
408    pub async fn copy_out<W: std::io::Write>(
409        &self,
410        query: &str,
411        writer: &mut W,
412    ) -> BsqlResult<u64> {
413        let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
414        guard
415            .copy_out(query, writer)
416            .map_err(BsqlError::from_driver_query)
417    }
418
419    /// Pool status metrics: idle, active, open, and max_size.
420    ///
421    /// Returns detailed pool utilization metrics from the driver.
422    pub fn status(&self) -> PoolStatus {
423        let driver_status = self.inner.status();
424        PoolStatus {
425            idle: driver_status.idle,
426            active: driver_status.active,
427            open: driver_status.open,
428            max_size: driver_status.max_size,
429        }
430    }
431
432    /// Gracefully close the pool (and replica pool if configured).
433    ///
434    /// No new connections can be acquired after this call. All idle connections
435    /// are closed immediately. Active connections are closed when returned to
436    /// the pool.
437    pub fn close(&self) {
438        self.inner.close();
439        if let Some(ref rp) = self.read_pool {
440            rp.close();
441        }
442    }
443
444    /// Whether the pool has been closed.
445    pub fn is_closed(&self) -> bool {
446        self.inner.is_closed()
447    }
448
449    /// Whether a read replica pool is configured.
450    pub fn has_replica(&self) -> bool {
451        self.read_pool.is_some()
452    }
453
454    /// Whether this pool uses sync connections via Unix domain sockets.
455    ///
456    /// When `true`, the pool automatically uses `SyncConnection` (blocking I/O)
457    /// internally, eliminating async overhead for sub-microsecond UDS I/O.
458    /// The user API is identical — this is purely a performance optimization.
459    pub fn is_uds(&self) -> bool {
460        self.inner.is_uds()
461    }
462
463    /// Process each row directly from the wire buffer via a closure.
464    ///
465    /// Acquires a connection, calls `Connection::for_each`, and releases.
466    /// Zero arena allocation — the closure reads columns directly from
467    /// the DataRow message bytes.
468    ///
469    /// When `readonly` is true and a replica pool is configured, routes
470    /// to the replica pool; otherwise uses the primary.
471    pub async fn for_each_raw<F>(
472        &self,
473        sql: &str,
474        sql_hash: u64,
475        params: &[&(dyn Encode + Sync)],
476        readonly: bool,
477        mut f: F,
478    ) -> BsqlResult<()>
479    where
480        F: FnMut(bsql_driver_postgres::PgDataRow<'_>) -> BsqlResult<()>,
481    {
482        let pool = if readonly {
483            self.read_pool.as_ref().unwrap_or(&self.inner)
484        } else {
485            &self.inner
486        };
487        let mut guard = pool.acquire().map_err(BsqlError::from)?;
488        // Bridge BsqlError from the user closure into DriverError for the
489        // driver-level for_each. Any closure error is stashed in `user_err`
490        // and re-surfaced after the driver returns.
491        let mut user_err: Option<BsqlError> = None;
492        let driver_result = guard.for_each(sql, sql_hash, params, |row| match f(row) {
493            Ok(()) => Ok(()),
494            Err(e) => {
495                user_err = Some(e);
496                Err(bsql_driver_postgres::DriverError::Protocol(
497                    "for_each closure error".into(),
498                ))
499            }
500        });
501        // If the user closure produced an error, return it directly.
502        if let Some(e) = user_err {
503            return Err(e);
504        }
505        driver_result.map_err(BsqlError::from_driver_query)
506    }
507
508    /// Process each DataRow as raw bytes via inline sequential decode.
509    ///
510    /// Like `for_each_raw` but passes the raw `&[u8]` DataRow payload directly
511    /// to the closure — no `PgDataRow` construction, no SmallVec pre-scan.
512    /// The generated macro code decodes columns inline by advancing a position
513    /// cursor through the bytes.
514    #[doc(hidden)]
515    pub async fn __for_each_raw_bytes<F>(
516        &self,
517        sql: &str,
518        sql_hash: u64,
519        params: &[&(dyn Encode + Sync)],
520        readonly: bool,
521        mut f: F,
522    ) -> BsqlResult<()>
523    where
524        F: FnMut(&[u8]) -> BsqlResult<()>,
525    {
526        let pool = if readonly {
527            self.read_pool.as_ref().unwrap_or(&self.inner)
528        } else {
529            &self.inner
530        };
531        let mut guard = pool.acquire().map_err(BsqlError::from)?;
532        let mut user_err: Option<BsqlError> = None;
533        let driver_result = guard.for_each_raw(sql, sql_hash, params, |data| match f(data) {
534            Ok(()) => Ok(()),
535            Err(e) => {
536                user_err = Some(e);
537                Err(bsql_driver_postgres::DriverError::Protocol(
538                    "for_each closure error".into(),
539                ))
540            }
541        });
542        if let Some(e) = user_err {
543            return Err(e);
544        }
545        driver_result.map_err(BsqlError::from_driver_query)
546    }
547}
548
549impl Clone for Pool {
550    fn clone(&self) -> Self {
551        Pool {
552            inner: self.inner.clone(),
553            read_pool: self.read_pool.clone(),
554        }
555    }
556}
557
558impl std::fmt::Debug for Pool {
559    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
560        f.debug_struct("Pool")
561            .field("status", &self.status())
562            .finish()
563    }
564}
565
566/// A connection borrowed from the pool.
567///
568/// Provides exclusive (`&mut`) access to the underlying `PoolGuard` — no
569/// `Mutex` needed. Generated code converts `&mut PoolConnection` into a
570/// [`QueryTarget`](crate::executor::QueryTarget) for dispatch.
571///
572/// Returned to the pool when dropped.
573pub struct PoolConnection {
574    pub(crate) inner: bsql_driver_postgres::PoolGuard,
575}
576
577impl std::fmt::Debug for PoolConnection {
578    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
579        f.debug_struct("PoolConnection").finish()
580    }
581}
582
/// Point-in-time view of how the pool is being used.
#[derive(Debug, Clone, Copy)]
pub struct PoolStatus {
    /// Connections sitting idle in the pool.
    pub idle: usize,
    /// Connections currently checked out.
    pub active: usize,
    /// All open connections (idle + active).
    pub open: usize,
    /// Upper bound on the pool size.
    pub max_size: usize,
}

/// Renders the snapshot as `idle=…, active=…, open=…, max=…`.
impl std::fmt::Display for PoolStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let PoolStatus {
            idle,
            active,
            open,
            max_size,
        } = *self;
        write!(
            f,
            "idle={}, active={}, open={}, max={}",
            idle, active, open, max_size
        )
    }
}
605
606#[cfg(test)]
607mod tests {
608    use super::*;
609
610    #[test]
611    fn builder_defaults() {
612        let b = Pool::builder();
613        assert_eq!(b.max_size, 10);
614        assert!(b.max_lifetime.is_none());
615        assert!(b.acquire_timeout.is_none());
616        assert!(b.min_idle.is_none());
617    }
618
619    #[test]
620    fn builder_max_lifetime() {
621        let b = Pool::builder().max_lifetime(Some(Duration::from_secs(60)));
622        assert_eq!(b.max_lifetime, Some(Some(Duration::from_secs(60))));
623    }
624
625    #[test]
626    fn builder_max_lifetime_none_disables() {
627        let b = Pool::builder().max_lifetime(None);
628        assert_eq!(b.max_lifetime, Some(None));
629    }
630
631    #[test]
632    fn builder_acquire_timeout() {
633        let b = Pool::builder().acquire_timeout(Some(Duration::from_secs(3)));
634        assert_eq!(b.acquire_timeout, Some(Some(Duration::from_secs(3))));
635    }
636
637    #[test]
638    fn builder_acquire_timeout_none_disables() {
639        let b = Pool::builder().acquire_timeout(None);
640        assert_eq!(b.acquire_timeout, Some(None));
641    }
642
643    #[test]
644    fn builder_min_idle() {
645        let b = Pool::builder().min_idle(5);
646        assert_eq!(b.min_idle, Some(5));
647    }
648
649    // --- Convenience methods ---
650
651    #[test]
652    fn builder_max_lifetime_secs() {
653        let b = Pool::builder().max_lifetime_secs(1800);
654        assert_eq!(b.max_lifetime, Some(Some(Duration::from_secs(1800))));
655    }
656
657    #[test]
658    fn builder_acquire_timeout_secs() {
659        let b = Pool::builder().acquire_timeout_secs(5);
660        assert_eq!(b.acquire_timeout, Some(Some(Duration::from_secs(5))));
661    }
662
663    // --- Shorthand aliases ---
664
665    #[test]
666    fn builder_lifetime_secs_shorthand() {
667        let b = Pool::builder().lifetime_secs(900);
668        assert_eq!(b.max_lifetime, Some(Some(Duration::from_secs(900))));
669    }
670
671    #[test]
672    fn builder_timeout_secs_shorthand() {
673        let b = Pool::builder().timeout_secs(3);
674        assert_eq!(b.acquire_timeout, Some(Some(Duration::from_secs(3))));
675    }
676
677    // --- Task 2: Read/write splitting ---
678
679    #[test]
680    fn builder_defaults_no_replica() {
681        let b = Pool::builder();
682        assert!(b.replica_url.is_none());
683        assert!(b.replica_max_size.is_none());
684    }
685
686    #[test]
687    fn builder_replica_url() {
688        let b = Pool::builder().replica_url("postgres://replica:5432/db");
689        assert_eq!(b.replica_url.as_deref(), Some("postgres://replica:5432/db"));
690    }
691
692    #[test]
693    fn builder_replica_max_size() {
694        let b = Pool::builder().replica_max_size(20);
695        assert_eq!(b.replica_max_size, Some(20));
696    }
697
698    #[tokio::test]
699    async fn pool_connect_has_no_replica() {
700        let pool = Pool::connect("postgres://user:pass@localhost/db")
701            .await
702            .unwrap();
703        assert!(!pool.has_replica());
704    }
705
706    // --- Auto-UDS sync connection tests ---
707
708    #[tokio::test]
709    async fn pool_is_uds_false_for_tcp() {
710        let pool = Pool::connect("postgres://user:pass@localhost/db")
711            .await
712            .unwrap();
713        assert!(!pool.is_uds());
714    }
715
716    #[cfg(unix)]
717    #[tokio::test]
718    async fn pool_is_uds_true_for_unix_socket() {
719        let pool = Pool::connect("postgres://user@localhost/db?host=/tmp")
720            .await
721            .unwrap();
722        assert!(pool.is_uds());
723    }
724
725    #[tokio::test]
726    async fn pool_is_uds_false_for_ip() {
727        let pool = Pool::connect("postgres://user:pass@127.0.0.1/db")
728            .await
729            .unwrap();
730        assert!(!pool.is_uds());
731    }
732
733    // --- PoolStatus Display ---
734
735    #[test]
736    fn pool_status_display() {
737        let status = PoolStatus {
738            idle: 3,
739            active: 2,
740            open: 5,
741            max_size: 10,
742        };
743        assert_eq!(status.to_string(), "idle=3, active=2, open=5, max=10");
744    }
745
746    #[test]
747    fn pool_status_display_zeros() {
748        let status = PoolStatus {
749            idle: 0,
750            active: 0,
751            open: 0,
752            max_size: 0,
753        };
754        assert_eq!(status.to_string(), "idle=0, active=0, open=0, max=0");
755    }
756
757    // --- PoolConnection Debug ---
758
759    #[test]
760    fn pool_connection_debug() {
761        // PoolConnection wraps a PoolGuard, Debug should not panic
762        let dbg_str = "PoolConnection";
763        assert!(!dbg_str.is_empty());
764        // We can't construct a PoolConnection without a real pool guard,
765        // but we verify the impl exists at compile time through the trait bound.
766        fn _assert_debug<T: std::fmt::Debug>() {}
767        _assert_debug::<PoolConnection>();
768    }
769
770    // --- Pool Debug ---
771
772    #[tokio::test]
773    async fn pool_debug() {
774        let pool = Pool::connect("postgres://user:pass@localhost/db")
775            .await
776            .unwrap();
777        let dbg = format!("{pool:?}");
778        assert!(dbg.contains("Pool"), "Debug should show Pool: {dbg}");
779    }
780
781    // --- Pool Clone ---
782
783    #[tokio::test]
784    async fn pool_clone_is_cheap() {
785        let pool = Pool::connect("postgres://user:pass@localhost/db")
786            .await
787            .unwrap();
788        let pool2 = pool.clone();
789        assert_eq!(pool.status().max_size, pool2.status().max_size);
790        assert!(!pool.has_replica());
791        assert!(!pool2.has_replica());
792    }
793
794    // --- Send + Sync assertions ---
795
796    fn _assert_send<T: Send>() {}
797    fn _assert_sync<T: Sync>() {}
798
799    #[test]
800    fn pool_is_send_and_sync() {
801        _assert_send::<Pool>();
802        _assert_sync::<Pool>();
803    }
804
805    #[test]
806    fn pool_connection_is_send() {
807        _assert_send::<PoolConnection>();
808    }
809
810    #[test]
811    fn pool_status_is_send_and_sync() {
812        _assert_send::<PoolStatus>();
813        _assert_sync::<PoolStatus>();
814    }
815
816    // --- Builder without URL ---
817
818    #[tokio::test]
819    async fn builder_build_without_url_errors() {
820        let result = Pool::builder().build().await;
821        assert!(result.is_err());
822        let err = result.unwrap_err().to_string();
823        assert!(err.contains("URL"), "error should mention URL: {err}");
824    }
825
826    // --- PoolBuilder chaining ---
827
828    #[test]
829    fn builder_chaining() {
830        let b = Pool::builder()
831            .url("postgres://u@localhost/db")
832            .max_size(20)
833            .lifetime_secs(600)
834            .timeout_secs(3)
835            .min_idle(2)
836            .replica_url("postgres://u@replica/db")
837            .replica_max_size(10);
838        assert_eq!(b.max_size, 20);
839        assert_eq!(b.min_idle, Some(2));
840        assert_eq!(b.replica_max_size, Some(10));
841    }
842
843    // --- RawRow ---
844
845    #[test]
846    fn raw_row_get() {
847        let row = RawRow(vec![Some("hello".into()), None, Some("42".into())]);
848        assert_eq!(row.get(0), Some("hello"));
849        assert_eq!(row.get(1), None);
850        assert_eq!(row.get(2), Some("42"));
851        assert_eq!(row.get(99), None);
852        assert_eq!(row.len(), 3);
853    }
854
855    #[test]
856    fn raw_row_is_empty() {
857        let empty = RawRow(vec![]);
858        assert!(empty.is_empty());
859        assert_eq!(empty.len(), 0);
860
861        let non_empty = RawRow(vec![Some("x".into())]);
862        assert!(!non_empty.is_empty());
863    }
864
865    #[test]
866    fn raw_row_iter() {
867        let row = RawRow(vec![Some("a".into()), None, Some("b".into())]);
868        let vals: Vec<_> = row.iter().collect();
869        assert_eq!(vals, vec![Some("a"), None, Some("b")]);
870    }
871
872    #[test]
873    fn raw_row_clone() {
874        let row = RawRow(vec![Some("hello".into()), None]);
875        let cloned = row.clone();
876        assert_eq!(cloned.get(0), Some("hello"));
877        assert_eq!(cloned.get(1), None);
878        assert_eq!(cloned.len(), 2);
879    }
880
881    #[test]
882    fn raw_row_debug() {
883        let row = RawRow(vec![Some("x".into())]);
884        let dbg = format!("{row:?}");
885        assert!(dbg.contains("RawRow"), "Debug should show RawRow: {dbg}");
886    }
887
888    // --- RawRow additional edge cases ---
889
890    #[test]
891    fn raw_row_all_null_values() {
892        let row = RawRow(vec![None, None, None]);
893        assert_eq!(row.len(), 3);
894        assert!(!row.is_empty());
895        assert_eq!(row.get(0), None);
896        assert_eq!(row.get(1), None);
897        assert_eq!(row.get(2), None);
898        // iter should produce all None
899        let vals: Vec<_> = row.iter().collect();
900        assert_eq!(vals, vec![None, None, None]);
901    }
902
903    #[test]
904    fn raw_row_empty_string_values() {
905        let row = RawRow(vec![Some(String::new()), Some("".into())]);
906        assert_eq!(row.len(), 2);
907        // Empty string is Some(""), not None
908        assert_eq!(row.get(0), Some(""));
909        assert_eq!(row.get(1), Some(""));
910    }
911
912    #[test]
913    fn raw_row_get_out_of_bounds() {
914        let row = RawRow(vec![Some("only".into())]);
915        assert_eq!(row.get(0), Some("only"));
916        assert_eq!(row.get(1), None);
917        assert_eq!(row.get(100), None);
918        assert_eq!(row.get(usize::MAX), None);
919    }
920
921    #[test]
922    fn raw_row_iter_empty() {
923        let row = RawRow(vec![]);
924        let vals: Vec<_> = row.iter().collect();
925        assert!(vals.is_empty());
926    }
927
928    #[test]
929    fn raw_row_iter_mixed() {
930        let row = RawRow(vec![
931            Some("hello".into()),
932            None,
933            Some("world".into()),
934            None,
935            Some("".into()),
936        ]);
937        let vals: Vec<_> = row.iter().collect();
938        assert_eq!(
939            vals,
940            vec![Some("hello"), None, Some("world"), None, Some("")]
941        );
942    }
943
944    #[test]
945    fn raw_row_single_null() {
946        let row = RawRow(vec![None]);
947        assert_eq!(row.len(), 1);
948        assert!(!row.is_empty());
949        assert_eq!(row.get(0), None);
950    }
951
952    // --- PoolBuilder stale_timeout ---
953
954    #[test]
955    fn builder_stale_timeout() {
956        let b = Pool::builder().stale_timeout(Duration::from_secs(15));
957        assert_eq!(b.stale_timeout, Some(Duration::from_secs(15)));
958    }
959
960    #[test]
961    fn builder_stale_timeout_default_is_none() {
962        let b = Pool::builder();
963        assert!(b.stale_timeout.is_none());
964    }
965
966    // --- PoolBuilder max_stmt_cache_size ---
967
968    #[test]
969    fn builder_max_stmt_cache_size() {
970        let b = Pool::builder().max_stmt_cache_size(512);
971        assert_eq!(b.max_stmt_cache_size, Some(512));
972    }
973
974    #[test]
975    fn builder_max_stmt_cache_size_default_is_none() {
976        let b = Pool::builder();
977        assert!(b.max_stmt_cache_size.is_none());
978    }
979
980    // --- Pool close / is_closed ---
981
982    #[tokio::test]
983    async fn pool_close_and_is_closed() {
984        let pool = Pool::connect("postgres://user:pass@localhost/db")
985            .await
986            .unwrap();
987        assert!(!pool.is_closed());
988        pool.close();
989        assert!(pool.is_closed());
990    }
991
992    // --- Pool status on fresh pool ---
993
994    #[tokio::test]
995    async fn pool_status_on_fresh_pool() {
996        let pool = Pool::connect("postgres://user:pass@localhost/db")
997            .await
998            .unwrap();
999        let status = pool.status();
1000        assert_eq!(status.idle, 0, "fresh pool should have 0 idle");
1001        assert_eq!(status.active, 0, "fresh pool should have 0 active");
1002        assert_eq!(status.open, 0, "fresh pool should have 0 open");
1003        assert_eq!(status.max_size, 10, "default max_size should be 10");
1004    }
1005
1006    // --- PoolStatus Clone and Copy ---
1007
1008    #[test]
1009    fn pool_status_clone_and_copy() {
1010        let status = PoolStatus {
1011            idle: 1,
1012            active: 2,
1013            open: 3,
1014            max_size: 10,
1015        };
1016        let cloned = status;
1017        assert_eq!(cloned.idle, 1);
1018        assert_eq!(cloned.active, 2);
1019        assert_eq!(cloned.open, 3);
1020        assert_eq!(cloned.max_size, 10);
1021    }
1022
1023    // --- PoolStatus Debug ---
1024
1025    #[test]
1026    fn pool_status_debug() {
1027        let status = PoolStatus {
1028            idle: 1,
1029            active: 2,
1030            open: 3,
1031            max_size: 10,
1032        };
1033        let dbg = format!("{status:?}");
1034        assert!(
1035            dbg.contains("PoolStatus"),
1036            "Debug should show PoolStatus: {dbg}"
1037        );
1038    }
1039
1040    // --- Builder max_size ---
1041
1042    #[test]
1043    fn builder_max_size() {
1044        let b = Pool::builder().max_size(50);
1045        assert_eq!(b.max_size, 50);
1046    }
1047
1048    // --- Builder url ---
1049
1050    #[test]
1051    fn builder_url_stored() {
1052        let b = Pool::builder().url("postgres://localhost/test");
1053        assert_eq!(b.url.as_deref(), Some("postgres://localhost/test"));
1054    }
1055
1056    // --- RawRow unicode content ---
1057
1058    #[test]
1059    fn raw_row_unicode_content() {
1060        let row = RawRow(vec![
1061            Some("\u{1F600}".into()),                                // emoji
1062            Some("\u{0645}\u{0631}\u{062D}\u{0628}\u{0627}".into()), // Arabic
1063            Some("\u{00E9}\u{00E8}\u{00EA}".into()),                 // French accents
1064        ]);
1065        assert_eq!(row.get(0), Some("\u{1F600}"));
1066        assert_eq!(row.len(), 3);
1067    }
1068
1069    // --- RawRow large column count ---
1070
1071    #[test]
1072    fn raw_row_many_columns() {
1073        let cols: Vec<Option<String>> = (0..100).map(|i| Some(format!("val_{i}"))).collect();
1074        let row = RawRow(cols);
1075        assert_eq!(row.len(), 100);
1076        assert_eq!(row.get(0), Some("val_0"));
1077        assert_eq!(row.get(99), Some("val_99"));
1078        assert_eq!(row.get(100), None);
1079    }
1080
1081    // --- Pool has_replica false by default ---
1082
1083    #[tokio::test]
1084    async fn pool_has_replica_false_default() {
1085        let pool = Pool::connect("postgres://user:pass@localhost/db")
1086            .await
1087            .unwrap();
1088        assert!(!pool.has_replica());
1089    }
1090
1091    // --- Builder complete chaining returns correct state ---
1092
1093    #[test]
1094    fn builder_full_chain() {
1095        let b = Pool::builder()
1096            .url("postgres://u@localhost/db")
1097            .max_size(32)
1098            .lifetime_secs(600)
1099            .timeout_secs(3)
1100            .min_idle(4)
1101            .stale_timeout(Duration::from_secs(30))
1102            .max_stmt_cache_size(128)
1103            .replica_url("postgres://u@replica/db")
1104            .replica_max_size(16);
1105        assert_eq!(b.max_size, 32);
1106        assert_eq!(b.min_idle, Some(4));
1107        assert_eq!(b.stale_timeout, Some(Duration::from_secs(30)));
1108        assert_eq!(b.max_stmt_cache_size, Some(128));
1109        assert_eq!(b.replica_max_size, Some(16));
1110    }
1111}