1use std::ops::{Deref, DerefMut};
2
3use crate::config::DbConfig;
4
5use sqlx::postgres::PgPoolOptions;
6use sqlx::{Connection, PgConnection, Pool, Postgres};
7
8pub struct Reader(Pool<Postgres>);
9
10impl Reader {
11 pub async fn new(db_config: DbConfig) -> Result<Self, sqlx::Error> {
12 let conn_string = db_config.connection_url();
13 let mut conn = PgConnection::connect(&conn_string).await?;
14 let sql = "show transaction_read_only";
15 let transaction_read_only: String = sqlx::query_scalar(sql).fetch_one(&mut conn).await?;
16 if transaction_read_only != "on" {
17 return Err(sqlx::Error::Configuration(
18 "The host is not a reader instance; please check your configurations".into(),
19 ));
20 }
21
22 let pool = PgPoolOptions::new()
23 .min_connections(db_config.min_pool_size)
24 .max_connections(db_config.max_pool_size)
25 .test_before_acquire(true)
26 .connect(&db_config.connection_url())
27 .await?;
28 Ok(Self(pool))
29 }
30
31 pub fn pool(&self) -> &Pool<Postgres> {
32 &self.0
33 }
34}
35impl Deref for Reader {
36 type Target = Pool<Postgres>;
37
38 fn deref(&self) -> &Self::Target {
39 &self.0
40 }
41}
42
43impl DerefMut for Reader {
44 fn deref_mut(&mut self) -> &mut Self::Target {
45 &mut self.0
46 }
47}
48
#[cfg(test)]
mod reader_tests {
    //! Tests for `Reader`. Every test here opens real database connections
    //! through `Reader::new`, using settings loaded from `.writer.env` /
    //! `.reader.env` or a hand-built configuration pointing at
    //! `localhost:5431`.
    //!
    //! NOTE(review): several tests load *different* env files into the same
    //! test process. Presumably `dotenv` does not override variables that are
    //! already set, which would make the outcome depend on test order —
    //! confirm, and consider running these tests in isolation.

    use crate::config::{DbConfig, DbConfigBuilder};
    use crate::reader::Reader;
    use std::error::Error;

    /// SQLSTATE the test expects when a write is attempted on the reader.
    const READ_ONLY_SQL_TRANSACTION: &str = "25006";

    /// `Reader::new` must refuse the instance configured in `.writer.env`.
    #[tokio::test]
    async fn should_return_error_because_instance_is_not_a_reader() -> Result<(), Box<dyn Error>> {
        dotenv::from_filename(".writer.env").ok();
        let db_config = DbConfig::from_env();

        let err = Reader::new(db_config)
            .await
            .err()
            .expect("a non-reader instance must be rejected")
            .to_string();

        assert_eq!(err, "error with configuration: The host is not a reader instance; please check your configurations");
        Ok(())
    }

    /// A reader built from `.reader.env` can run a trivial query.
    #[tokio::test]
    async fn should_return_data_when_instance_is_a_reader() -> Result<(), Box<dyn Error>> {
        dotenv::from_filename(".reader.env").ok();
        let db_config = DbConfig::from_env();
        let reader = Reader::new(db_config).await?;

        let row: (i32,) = sqlx::query_as("SELECT 1").fetch_one(reader.pool()).await?;
        assert_eq!(row.0, 1);
        Ok(())
    }

    /// Same as above, but with the configuration assembled through the builder
    /// instead of the environment.
    #[tokio::test]
    async fn should_return_data_when_instance_is_a_reader_from_direct_config(
    ) -> Result<(), Box<dyn Error>> {
        let db_config = DbConfigBuilder::new()
            .host("localhost")
            .user("reader_user")
            .name("reader")
            .pass("password")
            .app_name("some-app-to-run")
            .port(5431)
            .min_pool_size(1)
            .max_pool_size(3)
            .idle_in_transaction_session(3000)
            .build()
            .expect("problem to create db config");
        let reader = Reader::new(db_config).await?;

        let row: (i32,) = sqlx::query_as("SELECT 1").fetch_one(reader.pool()).await?;
        assert_eq!(row.0, 1);
        Ok(())
    }

    /// A write attempted through the reader pool must fail with SQLSTATE
    /// `25006`; success or any other error fails the test.
    #[tokio::test]
    async fn should_return_error_because_user_has_not_access() -> Result<(), Box<dyn Error>> {
        dotenv::from_filename(".reader.env").ok();
        let db_config = DbConfig::from_env();
        let reader = Reader::new(db_config).await?;

        let result = sqlx::query("CREATE TEMP TABLE temp_test (id INT)")
            .execute(reader.pool())
            .await;

        let is_read_only = matches!(
            &result,
            Err(sqlx::Error::Database(db_err))
                if db_err.code().as_deref() == Some(READ_ONLY_SQL_TRANSACTION)
        );
        // Include the actual outcome so a failure is diagnosable from the log.
        assert!(
            is_read_only,
            "expected SQLSTATE {}, got {:?}",
            READ_ONLY_SQL_TRANSACTION, result
        );
        Ok(())
    }
}