1use crate::db::postgres::models::model::PostgresModelError;
2use tokio::time::timeout;
3use tokio::time::Duration;
4use tokio::time::sleep;
5use log::info;
6use tokio;
7use tokio_postgres::{Error as PostgresError, NoTls};
8
9use std::error::Error;
10use tokio_postgres_migration::Migration;
11
12use dotenvy::dotenv;
13use std::env;
14
15use std::fs;
16use std::str;
17
18type MigrationDefinition = (String, String);
19
20#[derive(Clone)]
21struct Migrations {
22 up: Vec<MigrationDefinition>,
23 down: Vec<MigrationDefinition>,
24}
25
26pub trait MigrationAsStr {
27 fn to_str(&self) -> (&str, &str);
28}
29
30impl MigrationAsStr for MigrationDefinition {
31 fn to_str(&self) -> (&str, &str) {
32 return (self.0.as_str(), self.1.as_str());
33 }
34}
35
36pub struct Database {
37 pub client: tokio_postgres::Client,
38 pub migrations_dir_path: Option<String>,
39 pub reconnection_url: String ,
40 pub max_reconnect_attempts: u32,
41 pub timeout_duration: Duration,
42}
43
44#[derive(Debug,Clone)]
45pub struct DatabaseCredentials {
46 pub db_name: String,
47 pub db_host: String,
48 pub db_user: String,
49 pub db_password: String,
50}
51
52impl Default for DatabaseCredentials {
53 fn default() -> Self {
54 Self {
55 db_name: "postgres".into(),
56 db_host: "localhost".into(),
57 db_user: "postgres".into(),
58 db_password: "postgres".into(),
59 }
60 }
61}
62
63impl DatabaseCredentials {
64 pub fn from_env() -> Self {
65 Self {
68 db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
69 db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
70 db_user: env::var("DB_USER").unwrap_or("postgres".into()),
71 db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
72 }
73 }
74
75 pub fn build_connection_url(&self) -> String {
76 return format!(
77 "postgres://{}:{}@{}/{}",
78 self.db_user, self.db_password, self.db_host, self.db_name
79 );
80 }
81}
82
83enum PostgresInputType {
84 Query,
85 QueryOne,
86 Execute,
87}
88
89struct PostgresInput<'a> {
90 input_type: PostgresInputType,
91 query: String,
92 params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
93}
94
95impl Database {
96 pub async fn connect(
97 conn_url: String,
99 migrations_dir_path: Option<String>,
100 ) -> Result<Database, PostgresError> {
101 info!("Connecting to db: {}", conn_url);
105
106 let (client, connection) = tokio_postgres::connect(&conn_url, NoTls).await?;
107
108 tokio::spawn(async move {
111 if let Err(e) = connection.await {
113 eprintln!("postgres connection error: {}", e);
114 }
115 });
116
117 Ok(Database {
118 client,
119 migrations_dir_path,
120 reconnection_url: conn_url.clone() ,
121 max_reconnect_attempts: 3 ,
122 timeout_duration: Duration::from_secs( 5 )
123 })
124 }
125
126
127
128 pub async fn reconnect(&mut self ) -> Result<(), PostgresError> {
129 let max_retries = 5;
130 let mut attempt = 0;
131 let conn_url = self.reconnection_url.clone() ;
132
133 while attempt < max_retries {
134 info!("Attempt {}: Reconnecting to database...", attempt + 1);
135
136 match tokio_postgres::connect(&conn_url, NoTls).await {
137 Ok((client, connection)) => {
138 tokio::spawn(async move {
140 if let Err(e) = connection.await {
141 eprintln!("postgres connection error: {}", e);
142 }
143 });
144
145 self.client = client; info!("Reconnection successful.");
147 return Ok(());
148 }
149 Err(e) => {
150
151 attempt += 1;
152
153 if attempt == max_retries {
154 return Err( e.into() )
155 }
156 eprintln!("Reconnection failed: {}. Retrying...", e);
157 sleep(Duration::from_secs(2_u64.pow(attempt))).await; }
159 }
160 }
161
162 Ok(()) }
164
165
166 fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
167 let mut migrations = Migrations {
168 up: Vec::new(),
169 down: Vec::new(),
170 };
171
172 let migrations_dir =
173 migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
174 let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
175
176 for file in migration_dir_files {
177 let file = file.expect("Failed to read migration file");
178
179 let path = file.path();
180 let filename = path.file_stem().unwrap().to_str().unwrap();
181
182 let filename_without_extension: &str = filename.split('.').next().unwrap();
183
184 let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
186
187 info!("File name: {}", filename);
190
191 if filename.contains(".down") {
192 info!("File contents: {}", contents);
193 migrations
194 .down
195 .push((filename_without_extension.into(), contents.clone()));
196 }
197
198 if filename.contains(".up") {
199 info!("File contents: {}", contents);
200 migrations
201 .up
202 .push((filename_without_extension.into(), contents.clone()));
203 }
204 }
205
206
207 migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
209
210 migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
212
213
214 return migrations;
215 }
216
217 pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
218 let client = &mut self.client;
219
220 let migrations_dir_path = self.migrations_dir_path.clone();
221 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
222
223 for up_migration in migrations.up.drain(..) {
224 println!("migrating {} {} ", up_migration.0, up_migration.1);
225 let migration = Migration::new("migrations".to_string());
226
227 migration.up(client, &[up_migration.to_str()]).await?;
229 }
230
231 Ok(())
233 }
234
235 pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
238 Ok(())
239 }
240
241 pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
242 let migrations_dir_path = self.migrations_dir_path.clone();
243
244 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
245
246 let client = &mut self.client;
247
248 for down_migration in migrations.down.drain(..)
249 {
250 println!("migrating {}", down_migration.0);
251 let migration = Migration::new("migrations".to_string());
252 migration.down(client, &[down_migration.to_str()]).await?;
254 }
255
256 Ok(())
257 }
258
259 pub async fn query(
260 &self,
261 query: &str,
262 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
263 ) -> Result<Vec<tokio_postgres::Row>, PostgresError> {
264 let rows = self.client.query(query, params).await?;
265 Ok(rows)
266 }
267
268 pub async fn query_with_reconnect(
269 &mut self,
270 query: &str,
271 params: &[&(dyn tokio_postgres::types::ToSql + Sync)]
272
273 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
274
275 let max_tries = self.max_reconnect_attempts;
276 let timeout_duration = self.timeout_duration;
277
278 let mut attempts = 0;
279
280 loop {
281
282 attempts += 1;
283
284 let insert_result = timeout(
285 timeout_duration,
286 self.client.query(query, params),
287 ).await;
288
289 match insert_result {
290 Ok(Ok(rows)) => return Ok(rows),
291 Ok(Err(e)) => {
292 eprintln!("Database error: {:?}", e);
293 return Err(e .into());
294 },
295 Err( _ ) => {
296 eprintln!("Database timeout occurred.");
297 let _reconnect_result = self.reconnect().await;
298
299 if attempts == max_tries {
300
301 return Err(PostgresModelError::Timeout ) ;
302 }
303 }
305 }
306 }
307 }
308
309
310
311 pub async fn query_one(
312 &self,
313 query: &str,
314 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
315 ) -> Result<tokio_postgres::Row, PostgresError> {
316 let rows = self.client.query_one(query, params).await?;
317 Ok(rows)
318 }
319
320
321 pub async fn query_one_with_reconnect(
322 &mut self,
323 query: &str,
324 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
325
326 ) -> Result<tokio_postgres::Row, PostgresModelError> {
327
328 let timeout_duration = self.timeout_duration;
329
330 let max_tries = self.max_reconnect_attempts;
331 let mut attempts = 0;
332
333 loop {
334
335 attempts += 1;
336
337 let insert_result = timeout(
338 timeout_duration,
339 self.client.query_one(query, params),
340 ).await;
341
342 match insert_result {
343 Ok(Ok(row)) => return Ok(row),
344 Ok(Err(e)) => {
345 eprintln!("Database error: {:?}", e);
346 return Err(e .into());
347 },
348 Err( _ ) => {
349 eprintln!("Database timeout occurred.");
350 let _reconnect_result = self.reconnect().await;
351
352 if attempts == max_tries {
353
354 return Err(PostgresModelError::Timeout ) ;
355 }
356 }
358 }
359 }
360 }
361
362
363 pub async fn execute(
365 &self,
366 query: &str,
367 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
368 ) -> Result<u64, PostgresError> {
369 let rows = self.client.execute(query, params).await?;
370 Ok(rows)
371 }
372
373
374 pub async fn execute_with_reconnect(
375 &mut self,
376 query: &str,
377 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
378 ) -> Result<u64, PostgresModelError> {
380
381 let timeout_duration = self.timeout_duration;
382
383 let max_tries = self.max_reconnect_attempts;
384 let mut attempts = 0;
385
386 loop {
387
388 attempts += 1;
389
390 let insert_result = timeout(
391 timeout_duration,
392 self.client.execute(query, params),
393 ).await;
394
395 match insert_result {
396 Ok(Ok(row)) => return Ok(row),
397 Ok(Err(e)) => {
398 eprintln!("Database error: {:?}", e);
399 return Err(e .into());
400 },
401 Err( _ ) => {
402 eprintln!("Database timeout occurred.");
403 let _reconnect_result = self.reconnect().await;
404
405 if attempts == max_tries {
406
407 return Err(PostgresModelError::Timeout ) ;
408 }
409 }
411 }
412 }
413 }
414
415 async fn atomic_transaction(
416 &mut self,
417 steps: Vec<PostgresInput<'_>>,
418 ) -> Result<(), PostgresError> {
419 let transaction = self.client.transaction().await?;
421
422 for step in steps {
424 let result = transaction.execute(&step.query, step.params).await;
426 if result.is_err() {
428 transaction.rollback().await?;
430 return Err(PostgresError::from(result.err().unwrap()));
432 }
433 }
434
435 transaction.commit().await?;
437 Ok(())
439 }
440}
441
442pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
443 row: &'a tokio_postgres::Row,
444 column: &str,
445) -> Option<T> {
446 match row.try_get::<&str, T>(column) {
447 Ok(value) => Some(value),
448 Err(_) => None,
449 }
450}