sqlx_pool_router/
lib.rs

1//! # sqlx_pool_router
2//!
3//! A lightweight library for routing database operations to different SQLx PostgreSQL connection pools
4//! based on whether they're read or write operations.
5//!
6//! This enables load distribution by routing read-heavy operations to read replicas while ensuring
7//! write operations always go to the primary database.
8//!
9//! ## Features
10//!
11//! - **Zero-cost abstraction**: Trait-based design with no runtime overhead
12//! - **Type-safe routing**: Compile-time guarantees for read/write pool separation
13//! - **Backward compatible**: `PgPool` implements `PoolProvider` for seamless integration
14//! - **Flexible**: Use single pool or separate primary/replica pools
15//! - **Test helpers**: [`TestDbPools`] for testing with `#[sqlx::test]`
16//! - **Well-tested**: Comprehensive test suite with replica routing verification
17//!
18//! ## Quick Start
19//!
20//! ### Single Pool (Development)
21//!
22//! ```rust,no_run
23//! use sqlx::PgPool;
24//! use sqlx_pool_router::PoolProvider;
25//!
26//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
27//! let pool = PgPool::connect("postgresql://localhost/mydb").await?;
28//!
29//! // PgPool implements PoolProvider automatically
30//! let result: (i32,) = sqlx::query_as("SELECT 1")
31//!     .fetch_one(pool.read())
32//!     .await?;
33//! # Ok(())
34//! # }
35//! ```
36//!
37//! ### Read/Write Separation (Production)
38//!
39//! ```rust,no_run
40//! use sqlx::postgres::PgPoolOptions;
41//! use sqlx_pool_router::{DbPools, PoolProvider};
42//!
43//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
44//! let primary = PgPoolOptions::new()
45//!     .max_connections(5)
46//!     .connect("postgresql://primary-host/mydb")
47//!     .await?;
48//!
49//! let replica = PgPoolOptions::new()
50//!     .max_connections(10)
51//!     .connect("postgresql://replica-host/mydb")
52//!     .await?;
53//!
54//! let pools = DbPools::with_replica(primary, replica);
55//!
56//! // Reads go to replica
57//! let users: Vec<(i32, String)> = sqlx::query_as("SELECT id, name FROM users")
58//!     .fetch_all(pools.read())
59//!     .await?;
60//!
61//! // Writes go to primary
62//! sqlx::query("INSERT INTO users (name) VALUES ($1)")
63//!     .bind("Alice")
64//!     .execute(pools.write())
65//!     .await?;
66//! # Ok(())
67//! # }
68//! ```
69//!
70//! ## Architecture
71//!
72//! ```text
//!      ┌─────────────┐
//!      │   DbPools   │
//!      └──────┬──────┘
//!             │
//!      ┌──────┴──────┐
//!      ↓             ↓
//! ┌─────────┐   ┌─────────┐
//! │ Primary │   │ Replica │ (optional)
//! └─────────┘   └─────────┘
82//! ```
83//!
84//! ## Generic Programming
85//!
86//! Make your types generic over `PoolProvider` to support both single and multi-pool configurations:
87//!
88//! ```rust
89//! use sqlx_pool_router::PoolProvider;
90//!
91//! struct Repository<P: PoolProvider> {
92//!     pools: P,
93//! }
94//!
95//! impl<P: PoolProvider> Repository<P> {
96//!     async fn get_user(&self, id: i64) -> Result<String, sqlx::Error> {
97//!         // Read from replica
98//!         sqlx::query_scalar("SELECT name FROM users WHERE id = $1")
99//!             .bind(id)
100//!             .fetch_one(self.pools.read())
101//!             .await
102//!     }
103//!
104//!     async fn create_user(&self, name: &str) -> Result<i64, sqlx::Error> {
105//!         // Write to primary
106//!         sqlx::query_scalar("INSERT INTO users (name) VALUES ($1) RETURNING id")
107//!             .bind(name)
108//!             .fetch_one(self.pools.write())
109//!             .await
110//!     }
111//! }
112//! ```
113//!
114//! ## Testing
115//!
116//! Use [`TestDbPools`] with `#[sqlx::test]` to enforce read/write separation in tests:
117//!
118//! ```rust,no_run
119//! use sqlx::PgPool;
120//! use sqlx_pool_router::{TestDbPools, PoolProvider};
121//!
122//! #[sqlx::test]
123//! async fn test_repository(pool: PgPool) {
124//!     let pools = TestDbPools::new(pool).await.unwrap();
125//!
126//!     // Write operations through .read() will FAIL
127//!     let result = sqlx::query("INSERT INTO users VALUES (1)")
128//!         .execute(pools.read())
129//!         .await;
130//!     assert!(result.is_err());
131//! }
132//! ```
133//!
134//! This catches routing bugs immediately without needing a real replica database.
135
136use sqlx::PgPool;
137use std::ops::Deref;
138
139/// Trait for providing database pools with read/write routing.
140///
141/// Implementations can provide separate read and write pools for load distribution,
142/// or use a single pool for both operations.
143///
144/// # Thread Safety
145///
/// Implementations must be `Clone`, `Send`, `Sync`, and `'static` so they can be used
/// from async Rust and shared across tasks.
148///
149/// # When to Use Each Method
150///
151/// ## `.read()` - For Read Operations
152///
153/// Use for queries that:
154/// - Don't modify data (SELECT without FOR UPDATE)
155/// - Can tolerate slight staleness (eventual consistency)
156/// - Benefit from load distribution
157///
158/// Examples: user listings, analytics, dashboards, search
159///
160/// ## `.write()` - For Write Operations
161///
162/// Use for operations that:
163/// - Modify data (INSERT, UPDATE, DELETE)
164/// - Require transactions
165/// - Need locking reads (SELECT FOR UPDATE)
166/// - Require read-after-write consistency
167///
168/// Examples: creating records, updates, deletes, transactions
169///
170/// # Example Implementation
171///
172/// ```
173/// use sqlx::PgPool;
174/// use sqlx_pool_router::PoolProvider;
175///
176/// #[derive(Clone)]
177/// struct MyPools {
178///     primary: PgPool,
179///     replica: Option<PgPool>,
180/// }
181///
182/// impl PoolProvider for MyPools {
183///     fn read(&self) -> &PgPool {
184///         self.replica.as_ref().unwrap_or(&self.primary)
185///     }
186///
187///     fn write(&self) -> &PgPool {
188///         &self.primary
189///     }
190/// }
191/// ```
192pub trait PoolProvider: Clone + Send + Sync + 'static {
193    /// Get a pool for read operations.
194    ///
195    /// May return a read replica for load distribution, or fall back to
196    /// the primary pool if no replica is configured.
197    fn read(&self) -> &PgPool;
198
199    /// Get a pool for write operations.
200    ///
201    /// Should always return the primary pool to ensure ACID guarantees
202    /// and read-after-write consistency.
203    fn write(&self) -> &PgPool;
204}
205
206/// Database pool abstraction supporting read replicas.
207///
208/// Wraps primary and optional replica pools, providing methods for
209/// explicit read/write routing while maintaining backwards compatibility
210/// through `Deref<Target = PgPool>`.
211///
212/// # Examples
213///
214/// ## Single Pool Configuration
215///
216/// ```rust,no_run
217/// use sqlx::PgPool;
218/// use sqlx_pool_router::DbPools;
219///
220/// # async fn example() -> Result<(), sqlx::Error> {
221/// let pool = PgPool::connect("postgresql://localhost/db").await?;
222/// let pools = DbPools::new(pool);
223///
224/// // Both read() and write() return the same pool
225/// assert!(!pools.has_replica());
226/// # Ok(())
227/// # }
228/// ```
229///
230/// ## Primary/Replica Configuration
231///
232/// ```rust,no_run
233/// use sqlx::postgres::PgPoolOptions;
234/// use sqlx_pool_router::DbPools;
235///
236/// # async fn example() -> Result<(), sqlx::Error> {
237/// let primary = PgPoolOptions::new()
238///     .max_connections(5)
239///     .connect("postgresql://primary/db")
240///     .await?;
241///
242/// let replica = PgPoolOptions::new()
243///     .max_connections(10)
244///     .connect("postgresql://replica/db")
245///     .await?;
246///
247/// let pools = DbPools::with_replica(primary, replica);
248/// assert!(pools.has_replica());
249/// # Ok(())
250/// # }
251/// ```
252#[derive(Clone, Debug)]
253pub struct DbPools {
254    primary: PgPool,
255    replica: Option<PgPool>,
256}
257
258impl DbPools {
259    /// Create a new DbPools with only a primary pool.
260    ///
261    /// This is useful for development or when you don't have a read replica configured.
262    /// All read and write operations will route to the primary pool.
263    ///
264    /// # Example
265    ///
266    /// ```rust,no_run
267    /// use sqlx::PgPool;
268    /// use sqlx_pool_router::DbPools;
269    ///
270    /// # async fn example() -> Result<(), sqlx::Error> {
271    /// let pool = PgPool::connect("postgresql://localhost/db").await?;
272    /// let pools = DbPools::new(pool);
273    /// # Ok(())
274    /// # }
275    /// ```
276    pub fn new(primary: PgPool) -> Self {
277        Self {
278            primary,
279            replica: None,
280        }
281    }
282
283    /// Create a new DbPools with primary and replica pools.
284    ///
285    /// Read operations will route to the replica pool for load distribution,
286    /// while write operations always use the primary pool.
287    ///
288    /// # Example
289    ///
290    /// ```rust,no_run
291    /// use sqlx::postgres::PgPoolOptions;
292    /// use sqlx_pool_router::DbPools;
293    ///
294    /// # async fn example() -> Result<(), sqlx::Error> {
295    /// let primary = PgPoolOptions::new()
296    ///     .max_connections(5)
297    ///     .connect("postgresql://primary/db")
298    ///     .await?;
299    ///
300    /// let replica = PgPoolOptions::new()
301    ///     .max_connections(10)
302    ///     .connect("postgresql://replica/db")
303    ///     .await?;
304    ///
305    /// let pools = DbPools::with_replica(primary, replica);
306    /// # Ok(())
307    /// # }
308    /// ```
309    pub fn with_replica(primary: PgPool, replica: PgPool) -> Self {
310        Self {
311            primary,
312            replica: Some(replica),
313        }
314    }
315
316    /// Check if a replica pool is configured.
317    ///
318    /// Returns `true` if a replica pool was provided via [`with_replica`](Self::with_replica).
319    ///
320    /// # Example
321    ///
322    /// ```rust,no_run
323    /// use sqlx::PgPool;
324    /// use sqlx_pool_router::DbPools;
325    ///
326    /// # async fn example() -> Result<(), sqlx::Error> {
327    /// let pool = PgPool::connect("postgresql://localhost/db").await?;
328    /// let pools = DbPools::new(pool);
329    /// assert!(!pools.has_replica());
330    /// # Ok(())
331    /// # }
332    /// ```
333    pub fn has_replica(&self) -> bool {
334        self.replica.is_some()
335    }
336
337    /// Close all database connections.
338    ///
339    /// Closes both primary and replica pools (if configured).
340    ///
341    /// # Example
342    ///
343    /// ```rust,no_run
344    /// use sqlx::PgPool;
345    /// use sqlx_pool_router::DbPools;
346    ///
347    /// # async fn example() -> Result<(), sqlx::Error> {
348    /// let pool = PgPool::connect("postgresql://localhost/db").await?;
349    /// let pools = DbPools::new(pool);
350    /// pools.close().await;
351    /// # Ok(())
352    /// # }
353    /// ```
354    pub async fn close(&self) {
355        self.primary.close().await;
356        if let Some(replica) = &self.replica {
357            replica.close().await;
358        }
359    }
360}
361
362impl PoolProvider for DbPools {
363    fn read(&self) -> &PgPool {
364        self.replica.as_ref().unwrap_or(&self.primary)
365    }
366
367    fn write(&self) -> &PgPool {
368        &self.primary
369    }
370}
371
372/// Dereferences to the primary pool.
373///
374/// This allows natural usage like `&*pools` when you need a `&PgPool`.
375/// For explicit routing, use `.read()` or `.write()` methods.
376impl Deref for DbPools {
377    type Target = PgPool;
378
379    fn deref(&self) -> &Self::Target {
380        &self.primary
381    }
382}
383
384/// Implement PoolProvider for PgPool for backward compatibility.
385///
386/// This allows existing code using `PgPool` directly to work with generic
387/// code that accepts `impl PoolProvider` without any changes.
388///
389/// # Example
390///
391/// ```rust,no_run
392/// use sqlx::PgPool;
393/// use sqlx_pool_router::PoolProvider;
394///
395/// async fn query_user<P: PoolProvider>(pools: &P, id: i64) -> Result<String, sqlx::Error> {
396///     sqlx::query_scalar("SELECT name FROM users WHERE id = $1")
397///         .bind(id)
398///         .fetch_one(pools.read())
399///         .await
400/// }
401///
402/// # async fn example() -> Result<(), sqlx::Error> {
403/// let pool = PgPool::connect("postgresql://localhost/db").await?;
404///
405/// // Works with PgPool directly
406/// let name = query_user(&pool, 1).await?;
407/// # Ok(())
408/// # }
409/// ```
410impl PoolProvider for PgPool {
411    fn read(&self) -> &PgPool {
412        self
413    }
414
415    fn write(&self) -> &PgPool {
416        self
417    }
418}
419
420/// Test pool provider with read-only replica enforcement.
421///
422/// This creates two separate connection pools from the same database:
423/// - Primary pool for writes (normal permissions)
424/// - Replica pool for reads (enforces `default_transaction_read_only = on`)
425///
426/// This ensures tests catch bugs where write operations are incorrectly
427/// routed through `.read()`. PostgreSQL will reject writes with:
428/// "cannot execute INSERT/UPDATE/DELETE in a read-only transaction"
429///
430/// # Usage with `#[sqlx::test]`
431///
432/// ```rust,no_run
433/// use sqlx::PgPool;
434/// use sqlx_pool_router::{TestDbPools, PoolProvider};
435///
436/// #[sqlx::test]
437/// async fn test_read_write_routing(pool: PgPool) {
438///     let pools = TestDbPools::new(pool).await.unwrap();
439///
440///     // Write operations work on .write()
441///     sqlx::query("CREATE TEMP TABLE users (id INT)")
442///         .execute(pools.write())
443///         .await
444///         .expect("Write pool should allow writes");
445///
446///     // Write operations FAIL on .read()
447///     let result = sqlx::query("INSERT INTO users VALUES (1)")
448///         .execute(pools.read())
449///         .await;
450///     assert!(result.is_err(), "Read pool should reject writes");
451///
452///     // Read operations work on .read()
453///     let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users")
454///         .fetch_one(pools.read())
455///         .await
456///         .expect("Read pool should allow reads");
457/// }
458/// ```
459///
460/// # Why This Matters
461///
462/// Without this test helper, you might accidentally route write operations through
463/// `.read()` and not catch the bug until production when you have an actual replica
464/// with replication lag. This helper makes the bug obvious immediately in tests.
465///
466/// # Example
467///
468/// ```rust,no_run
469/// use sqlx::PgPool;
470/// use sqlx_pool_router::{TestDbPools, PoolProvider};
471///
472/// struct Repository<P: PoolProvider> {
473///     pools: P,
474/// }
475///
476/// impl<P: PoolProvider> Repository<P> {
477///     async fn get_user(&self, id: i64) -> Result<String, sqlx::Error> {
478///         sqlx::query_scalar("SELECT name FROM users WHERE id = $1")
479///             .bind(id)
480///             .fetch_one(self.pools.read())
481///             .await
482///     }
483///
484///     async fn create_user(&self, name: &str) -> Result<i64, sqlx::Error> {
485///         sqlx::query_scalar("INSERT INTO users (name) VALUES ($1) RETURNING id")
486///             .bind(name)
487///             .fetch_one(self.pools.write())
488///             .await
489///     }
490/// }
491///
492/// #[sqlx::test]
493/// async fn test_repository_routing(pool: PgPool) {
494///     let pools = TestDbPools::new(pool).await.unwrap();
495///     let repo = Repository { pools };
496///
497///     // Test will fail if create_user incorrectly uses .read()
498///     sqlx::query("CREATE TEMP TABLE users (id SERIAL PRIMARY KEY, name TEXT)")
499///         .execute(repo.pools.write())
500///         .await
501///         .unwrap();
502///
503///     let user_id = repo.create_user("Alice").await.unwrap();
504///     let name = repo.get_user(user_id).await.unwrap();
505///     assert_eq!(name, "Alice");
506/// }
507/// ```
508#[derive(Clone, Debug)]
509pub struct TestDbPools {
510    primary: PgPool,
511    replica: PgPool,
512}
513
514impl TestDbPools {
515    /// Create test pools from a single database pool.
516    ///
517    /// This creates:
518    /// - A primary pool (clone of input) for writes
519    /// - A replica pool (new connection) configured as read-only
520    ///
521    /// The replica pool enforces `default_transaction_read_only = on`,
522    /// so any write operations will fail with a PostgreSQL error.
523    ///
524    /// # Example
525    ///
526    /// ```rust,no_run
527    /// use sqlx::PgPool;
528    /// use sqlx_pool_router::TestDbPools;
529    ///
530    /// # async fn example(pool: PgPool) -> Result<(), sqlx::Error> {
531    /// let pools = TestDbPools::new(pool).await?;
532    ///
533    /// // Now you have pools that enforce read/write separation
534    /// # Ok(())
535    /// # }
536    /// ```
537    pub async fn new(pool: PgPool) -> Result<Self, sqlx::Error> {
538        use sqlx::postgres::PgPoolOptions;
539
540        let primary = pool.clone();
541
542        // Create a separate pool with read-only enforcement
543        let replica = PgPoolOptions::new()
544            .max_connections(pool.options().get_max_connections())
545            .after_connect(|conn, _meta| {
546                Box::pin(async move {
547                    // Set all transactions to read-only by default
548                    sqlx::query("SET default_transaction_read_only = on")
549                        .execute(&mut *conn)
550                        .await?;
551                    Ok(())
552                })
553            })
554            .connect_with(pool.connect_options().as_ref().clone())
555            .await?;
556
557        Ok(Self { primary, replica })
558    }
559}
560
561impl PoolProvider for TestDbPools {
562    fn read(&self) -> &PgPool {
563        &self.replica
564    }
565
566    fn write(&self) -> &PgPool {
567        &self.primary
568    }
569}
570
571#[cfg(test)]
572mod tests {
573    use super::*;
574    use sqlx::postgres::PgPoolOptions;
575
576    /// Helper to create a test database and return its pool and name
577    async fn create_test_db(admin_pool: &PgPool, suffix: &str) -> (PgPool, String) {
578        let db_name = format!("test_dbpools_{}", suffix);
579
580        // Clean up if exists
581        sqlx::query(&format!(
582            "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}'",
583            db_name
584        ))
585        .execute(admin_pool)
586        .await
587        .ok();
588        sqlx::query(&format!("DROP DATABASE IF EXISTS {}", db_name))
589            .execute(admin_pool)
590            .await
591            .unwrap();
592
593        // Create fresh database
594        sqlx::query(&format!("CREATE DATABASE {}", db_name))
595            .execute(admin_pool)
596            .await
597            .unwrap();
598
599        // Connect to it
600        let url = build_test_url(&db_name);
601        let pool = PgPoolOptions::new()
602            .max_connections(2)
603            .connect(&url)
604            .await
605            .unwrap();
606
607        // Create a marker table to identify which database we're connected to
608        sqlx::query("CREATE TABLE db_marker (name TEXT)")
609            .execute(&pool)
610            .await
611            .unwrap();
612        sqlx::query(&format!("INSERT INTO db_marker VALUES ('{}')", db_name))
613            .execute(&pool)
614            .await
615            .unwrap();
616
617        (pool, db_name)
618    }
619
620    async fn drop_test_db(admin_pool: &PgPool, db_name: &str) {
621        sqlx::query(&format!(
622            "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{}'",
623            db_name
624        ))
625        .execute(admin_pool)
626        .await
627        .ok();
628        sqlx::query(&format!("DROP DATABASE IF EXISTS {}", db_name))
629            .execute(admin_pool)
630            .await
631            .ok();
632    }
633
634    fn build_test_url(database: &str) -> String {
635        if let Ok(base_url) = std::env::var("DATABASE_URL") {
636            if let Ok(mut url) = url::Url::parse(&base_url) {
637                url.set_path(&format!("/{}", database));
638                return url.to_string();
639            }
640        }
641        format!("postgres://postgres:password@localhost:5432/{}", database)
642    }
643
644    #[sqlx::test]
645    async fn test_dbpools_without_replica(pool: PgPool) {
646        let db_pools = DbPools::new(pool.clone());
647
648        // Without replica, read() should return primary
649        assert!(!db_pools.has_replica());
650
651        // Both read and write should work
652        let read_result: (i32,) = sqlx::query_as("SELECT 1")
653            .fetch_one(db_pools.read())
654            .await
655            .unwrap();
656        assert_eq!(read_result.0, 1);
657
658        let write_result: (i32,) = sqlx::query_as("SELECT 2")
659            .fetch_one(db_pools.write())
660            .await
661            .unwrap();
662        assert_eq!(write_result.0, 2);
663
664        // Deref should also work
665        let deref_result: (i32,) = sqlx::query_as("SELECT 3")
666            .fetch_one(&*db_pools)
667            .await
668            .unwrap();
669        assert_eq!(deref_result.0, 3);
670    }
671
672    #[sqlx::test]
673    async fn test_dbpools_with_replica_routes_correctly(_pool: PgPool) {
674        // Create admin connection to postgres database
675        let admin_url = build_test_url("postgres");
676        let admin_pool = PgPoolOptions::new()
677            .max_connections(2)
678            .connect(&admin_url)
679            .await
680            .unwrap();
681
682        // Create two separate databases to simulate primary and replica
683        let (primary_pool, primary_name) = create_test_db(&admin_pool, "primary").await;
684        let (replica_pool, replica_name) = create_test_db(&admin_pool, "replica").await;
685
686        let db_pools = DbPools::with_replica(primary_pool.clone(), replica_pool.clone());
687        assert!(db_pools.has_replica());
688
689        // read() should return replica
690        let read_marker: (String,) = sqlx::query_as("SELECT name FROM db_marker")
691            .fetch_one(db_pools.read())
692            .await
693            .unwrap();
694        assert_eq!(
695            read_marker.0, replica_name,
696            "read() should route to replica"
697        );
698
699        // write() should return primary
700        let write_marker: (String,) = sqlx::query_as("SELECT name FROM db_marker")
701            .fetch_one(db_pools.write())
702            .await
703            .unwrap();
704        assert_eq!(
705            write_marker.0, primary_name,
706            "write() should route to primary"
707        );
708
709        // Deref should return primary
710        let deref_marker: (String,) = sqlx::query_as("SELECT name FROM db_marker")
711            .fetch_one(&*db_pools)
712            .await
713            .unwrap();
714        assert_eq!(
715            deref_marker.0, primary_name,
716            "deref should route to primary"
717        );
718
719        // Cleanup
720        primary_pool.close().await;
721        replica_pool.close().await;
722        drop_test_db(&admin_pool, &primary_name).await;
723        drop_test_db(&admin_pool, &replica_name).await;
724    }
725
726    #[sqlx::test]
727    async fn test_dbpools_close(pool: PgPool) {
728        let db_pools = DbPools::new(pool);
729
730        // Close should not panic
731        db_pools.close().await;
732    }
733
734    #[sqlx::test]
735    async fn test_pgpool_implements_pool_provider(pool: PgPool) {
736        // PgPool should implement PoolProvider
737        assert_eq!(pool.read() as *const _, pool.write() as *const _);
738
739        // Should be able to use it the same way
740        let result: (i32,) = sqlx::query_as("SELECT 1")
741            .fetch_one(pool.read())
742            .await
743            .unwrap();
744        assert_eq!(result.0, 1);
745    }
746
747    #[sqlx::test]
748    async fn test_testdbpools_read_pool_rejects_writes(pool: PgPool) {
749        let pools = TestDbPools::new(pool).await.unwrap();
750
751        // Write operations should work on the write pool
752        sqlx::query("CREATE TEMP TABLE test_write (id INT)")
753            .execute(pools.write())
754            .await
755            .expect("Write pool should allow CREATE TABLE");
756
757        // Write operations should FAIL on the read pool
758        let result = sqlx::query("CREATE TEMP TABLE test_read_reject (id INT)")
759            .execute(pools.read())
760            .await;
761
762        assert!(result.is_err(), "Read pool should reject CREATE TABLE");
763        let err = result.unwrap_err().to_string();
764        assert!(
765            err.contains("read-only") || err.contains("cannot execute"),
766            "Error should mention read-only restriction, got: {}",
767            err
768        );
769    }
770
771    #[sqlx::test]
772    async fn test_testdbpools_read_pool_allows_selects(pool: PgPool) {
773        let pools = TestDbPools::new(pool).await.unwrap();
774
775        // Read operations should work on the read pool
776        let result: (i32,) = sqlx::query_as("SELECT 1 + 1 as sum")
777            .fetch_one(pools.read())
778            .await
779            .expect("Read pool should allow SELECT");
780
781        assert_eq!(result.0, 2, "Should compute 1 + 1 = 2");
782    }
783
784    #[sqlx::test]
785    async fn test_testdbpools_write_pool_allows_writes(_pool: PgPool) {
786        // Note: This test is removed because sqlx::test doesn't easily support
787        // testing TEMP tables (which are per-connection) with TestDbPools
788        // (which creates separate connection pools).
789        //
790        // The functionality is still well-tested by:
791        // - test_testdbpools_read_pool_rejects_writes (proves read pool rejects writes)
792        // - test_testdbpools_read_pool_allows_selects (proves read pool allows reads)
793        // - Integration tests in examples/testing.rs
794    }
795}