sqlx_aurora/
reader.rs

1use std::ops::{Deref, DerefMut};
2
3use crate::config::DbConfig;
4
5use sqlx::postgres::PgPoolOptions;
6use sqlx::{Connection, PgConnection, Pool, Postgres};
7
8pub struct Reader(Pool<Postgres>);
9
10impl Reader {
11    pub async fn new(db_config: DbConfig) -> Result<Self, sqlx::Error> {
12        let conn_string = db_config.connection_url();
13        let mut conn = PgConnection::connect(&conn_string).await?;
14        let sql = "show transaction_read_only";
15        let transaction_read_only: String = sqlx::query_scalar(sql).fetch_one(&mut conn).await?;
16        if transaction_read_only != "on" {
17            return Err(sqlx::Error::Configuration(
18                "The host is not a reader instance; please check your configurations".into(),
19            ));
20        }
21
22        let pool = PgPoolOptions::new()
23            .min_connections(db_config.min_pool_size)
24            .max_connections(db_config.max_pool_size)
25            .test_before_acquire(true)
26            .connect(&db_config.connection_url())
27            .await?;
28        Ok(Self(pool))
29    }
30
31    pub fn pool(&self) -> &Pool<Postgres> {
32        &self.0
33    }
34}
35impl Deref for Reader {
36    type Target = Pool<Postgres>;
37
38    fn deref(&self) -> &Self::Target {
39        &self.0
40    }
41}
42
43impl DerefMut for Reader {
44    fn deref_mut(&mut self) -> &mut Self::Target {
45        &mut self.0
46    }
47}
48
49#[cfg(test)]
50mod reader_tests {
51
52    use crate::config::{DbConfig, DbConfigBuilder};
53    use crate::reader::Reader;
54    use std::error::Error;
55
56    #[tokio::test]
57    async fn should_return_error_because_instance_is_not_a_reader() -> Result<(), Box<dyn Error>> {
58        dotenv::from_filename(".writer.env").ok();
59        let db_config = DbConfig::from_env();
60        let reader = Reader::new(db_config.clone()).await;
61
62        let err = reader.map_err(|e| e.to_string()).err().unwrap();
63        assert_eq!(err, "error with configuration: The host is not a reader instance; please check your configurations");
64        Ok(())
65    }
66
67    #[tokio::test]
68    async fn should_return_data_when_instance_is_a_reader() -> Result<(), Box<dyn Error>> {
69        dotenv::from_filename(".reader.env").ok();
70        let db_config = DbConfig::from_env();
71        let reader = Reader::new(db_config.clone()).await?;
72        let row: (i32,) = sqlx::query_as("SELECT 1").fetch_one(reader.pool()).await?;
73        assert_eq!(row.0, 1);
74        Ok(())
75    }
76
77    #[tokio::test]
78    async fn should_return_data_when_instance_is_a_reader_from_direct_config(
79    ) -> Result<(), Box<dyn Error>> {
80        let db_config = DbConfigBuilder::new()
81            .host("localhost")
82            .user("reader_user")
83            .name("reader")
84            .pass("password")
85            .app_name("some-app-to-run")
86            .port(5431)
87            .min_pool_size(1)
88            .max_pool_size(3)
89            .idle_in_transaction_session(3000)
90            .build()
91            .expect("problem to create db config");
92        let reader = Reader::new(db_config.clone()).await?;
93        let row: (i32,) = sqlx::query_as("SELECT 1").fetch_one(reader.pool()).await?;
94        assert_eq!(row.0, 1);
95        Ok(())
96    }
97
98    #[tokio::test]
99    async fn should_return_error_because_user_has_not_access() -> Result<(), Box<dyn Error>> {
100        dotenv::from_filename(".reader.env").ok();
101        let db_config = DbConfig::from_env();
102        let reader = Reader::new(db_config.clone()).await?;
103        let result = sqlx::query("CREATE TEMP TABLE temp_test (id INT)")
104            .execute(reader.pool())
105            .await;
106
107        let is_read_only = match result {
108            Ok(_) => false,
109            Err(sqlx::Error::Database(db_err)) => {
110                let cant_write_in_read_only_transaction = "25006";
111                db_err.code().as_deref() == Some(cant_write_in_read_only_transaction)
112            }
113            Err(_) => false,
114        };
115        assert!(is_read_only);
116        Ok(())
117    }
118}