1use tokio_postgres::Client;
2use crate::db::postgres::models::model::PostgresModelError;
3use tokio::time::timeout;
4use tokio::time::Duration;
5use tokio::time::sleep;
6use log::info;
7use tokio;
8use tokio_postgres::{Error as PostgresError, NoTls};
9
10use std::error::Error;
11use tokio_postgres_migration::Migration;
12
13use dotenvy::dotenv;
14use std::env;
15
16use std::fs;
17use std::str;
18
/// A single migration script stored as an owned `(name, contents)` pair.
type MigrationDefinition = (String, String);
20
21#[derive(Clone)]
22struct Migrations {
23 up: Vec<MigrationDefinition>,
24 down: Vec<MigrationDefinition>,
25}
26
/// Borrowed view of a migration as a pair of string slices.
pub trait MigrationAsStr {
    /// Returns the two components of the migration as `&str` slices borrowed
    /// from `self`.
    fn to_str(&self) -> (&str, &str);
}
30
31impl MigrationAsStr for MigrationDefinition {
32 fn to_str(&self) -> (&str, &str) {
33 return (self.0.as_str(), self.1.as_str());
34 }
35}
36
/// Connection settings for a PostgreSQL database plus the location of its
/// migration scripts.
pub struct Database {
    /// Directory holding the migration files; `None` selects the built-in
    /// default directory used when reading migrations.
    pub migrations_dir_path: Option<String>,

    /// Connection string handed to `tokio_postgres` when connecting.
    pub connection_url: String,

    /// Intended cap on reconnection attempts (`Database::new` sets it to 3).
    pub max_reconnect_attempts: u32,

    /// Timeout setting (`Database::new` sets it to 5 seconds).
    /// NOTE(review): which operations honour this value is not visible in
    /// this file — confirm before relying on it.
    pub timeout_duration: Duration,
}
45
/// The individual pieces needed to build a PostgreSQL connection URL.
///
/// `Debug` is implemented by hand so that formatting a value with `{:?}`
/// (for example in a log line or a panic message) never prints the password.
#[derive(Clone)]
pub struct DatabaseCredentials {
    /// Name of the database to connect to.
    pub db_name: String,

    /// Host part of the connection URL.
    pub db_host: String,

    /// User name to authenticate as.
    pub db_user: String,

    /// Password for `db_user`. Redacted in `Debug` output.
    pub db_password: String,
}

impl std::fmt::Debug for DatabaseCredentials {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.debug_struct("DatabaseCredentials")
            .field("db_name", &self.db_name)
            .field("db_host", &self.db_host)
            .field("db_user", &self.db_user)
            // Deliberately not the real value: secrets must not reach logs.
            .field("db_password", &"<redacted>")
            .finish()
    }
}
53
54impl Default for DatabaseCredentials {
55 fn default() -> Self {
56 Self {
57 db_name: "postgres".into(),
58 db_host: "localhost".into(),
59 db_user: "postgres".into(),
60 db_password: "postgres".into(),
61 }
62 }
63}
64
65impl DatabaseCredentials {
66 pub fn from_env() -> Self {
67 Self {
70 db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
71 db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
72 db_user: env::var("DB_USER").unwrap_or("postgres".into()),
73 db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
74 }
75 }
76
77 pub fn build_connection_url(&self) -> String {
78 return format!(
79 "postgres://{}:{}@{}/{}",
80 self.db_user, self.db_password, self.db_host, self.db_name
81 );
82 }
83}
84
/// The kind of client call a [`PostgresInput`] asks for.
enum PostgresInputType {
    /// A statement whose rows are of interest.
    Query,

    /// A statement expected to yield a single row.
    QueryOne,

    /// A statement run for its effect only.
    Execute,
}
90
91struct PostgresInput<'a> {
92 input_type: PostgresInputType,
93 query: String,
94 params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
95}
96
97impl Database {
98
99 pub fn new(
100
101 conn_url: String,
102 migrations_dir_path: Option<String>,
103 ) -> Result<Database, PostgresError> {
104
105
106
107 Ok(Database {
108 migrations_dir_path,
110 connection_url: conn_url.clone() ,
111 max_reconnect_attempts: 3 ,
112 timeout_duration: Duration::from_secs( 5 )
113 })
114
115 }
116
117 pub async fn connect(
118 & self
120 ) -> Result<Client, PostgresError> {
121 info!("Connecting to db: {}", self.connection_url);
125
126 let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
127
128 tokio::spawn(async move {
131 if let Err(e) = connection.await {
133 eprintln!("postgres connection error: {}", e);
134 }
135 });
136
137 Ok( client )
140 }
141
142
143
144 pub async fn reconnect(&mut self ) -> Result<Option< Client >, PostgresError> {
145 let max_retries = 5;
146 let mut attempt = 0;
147 let conn_url = self.connection_url.clone() ;
148
149 while attempt < max_retries {
150 info!("Attempt {}: Reconnecting to database...", attempt + 1);
151
152 match tokio_postgres::connect(&conn_url, NoTls).await {
153 Ok((client, connection)) => {
154 tokio::spawn(async move {
156 if let Err(e) = connection.await {
157 eprintln!("postgres connection error: {}", e);
158 }
159 });
160
161 info!("Reconnection successful.");
163 return Ok( Some(client) );
164 }
165 Err(e) => {
166
167 attempt += 1;
168
169 if attempt == max_retries {
170 return Err( e.into() )
171 }
172 eprintln!("Reconnection failed: {}. Retrying...", e);
173 sleep(Duration::from_secs(2_u64.pow(attempt))).await; }
175 }
176 }
177
178 Ok(None) }
180
181
182 fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
183 let mut migrations = Migrations {
184 up: Vec::new(),
185 down: Vec::new(),
186 };
187
188 let migrations_dir =
189 migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
190 let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
191
192 for file in migration_dir_files {
193 let file = file.expect("Failed to read migration file");
194
195 let path = file.path();
196 let filename = path.file_stem().unwrap().to_str().unwrap();
197
198 let filename_without_extension: &str = filename.split('.').next().unwrap();
199
200 let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
202
203 info!("File name: {}", filename);
206
207 if filename.contains(".down") {
208 info!("File contents: {}", contents);
209 migrations
210 .down
211 .push((filename_without_extension.into(), contents.clone()));
212 }
213
214 if filename.contains(".up") {
215 info!("File contents: {}", contents);
216 migrations
217 .up
218 .push((filename_without_extension.into(), contents.clone()));
219 }
220 }
221
222
223 migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
225
226 migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
228
229
230 return migrations;
231 }
232
233 pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
234 let client = &mut self.connect().await?;
235
236 let migrations_dir_path = self.migrations_dir_path.clone();
237 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
238
239 for up_migration in migrations.up.drain(..) {
240 println!("migrating {} {} ", up_migration.0, up_migration.1);
241 let migration = Migration::new("migrations".to_string());
242
243 migration.up(client, &[up_migration.to_str()]).await?;
245 }
246
247 Ok(())
249 }
250
251 pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
254 Ok(())
255 }
256
257 pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
258 let migrations_dir_path = self.migrations_dir_path.clone();
259
260 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
261
262 let client = &mut self.connect().await?;
263
264 for down_migration in migrations.down.drain(..)
265 {
266 println!("migrating {}", down_migration.0);
267 let migration = Migration::new("migrations".to_string());
268 migration.down(client, &[down_migration.to_str()]).await?;
270 }
271
272 Ok(())
273 }
274
275 pub async fn query(
276 &self,
277 query: &str,
278 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
279 ) -> Result<Vec<tokio_postgres::Row>, PostgresError> {
280
281 let client = &self.connect().await?;
282
283 let rows = client.query(query, params).await?;
284 Ok(rows)
285 }
286
287 pub async fn query_one(
333 & self,
334 query: &str,
335 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
336 ) -> Result<tokio_postgres::Row, PostgresError> {
337
338 let client = &self.connect().await?;
339
340 let rows = client.query_one(query, params).await?;
341 Ok(rows)
342 }
343
344
345 pub async fn execute(
391 & self,
392 query: &str,
393 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
394 ) -> Result<u64, PostgresError> {
395
396 let client = &self.connect().await?;
397
398
399
400 let rows = client.execute(query, params).await?;
401 Ok(rows)
402 }
403
404async fn atomic_transaction(
447 & self,
448 steps: Vec<PostgresInput<'_>>,
449 ) -> Result<(), PostgresError> {
450 let client = &mut self.connect().await?;
451
452 let transaction = client.transaction().await?;
454
455 for step in steps {
457 let result = transaction.execute(&step.query, step.params).await;
459 if result.is_err() {
461 transaction.rollback().await?;
463 return Err(PostgresError::from(result.err().unwrap()));
465 }
466 }
467
468 transaction.commit().await?;
470 Ok(())
472 }
473}
474
475pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
476 row: &'a tokio_postgres::Row,
477 column: &str,
478) -> Option<T> {
479 match row.try_get::<&str, T>(column) {
480 Ok(value) => Some(value),
481 Err(_) => None,
482 }
483}