degen_sql/db/postgres/
postgres_db.rs

1use crate::db::postgres::models::model::PostgresModelError;
2use tokio::time::timeout;
3use tokio::time::Duration;
4use tokio::time::sleep;
5use log::info;
6use tokio;
7use tokio_postgres::{Error as PostgresError, NoTls};
8
9use std::error::Error;
10use tokio_postgres_migration::Migration;
11
12use dotenvy::dotenv;
13use std::env;
14
15use std::fs;
16use std::str;
17
/// A single migration script as `(name, sql)`.
///
/// `name` is the file name up to its first `.`; `sql` is the full text of the file.
type MigrationDefinition = (String, String);

/// Migration scripts discovered on disk, split by direction.
#[derive(Debug, Clone, Default)]
struct Migrations {
    /// Scripts that apply schema changes.
    up: Vec<MigrationDefinition>,
    /// Scripts that revert schema changes.
    down: Vec<MigrationDefinition>,
}
25
26pub trait MigrationAsStr {
27    fn to_str(&self) -> (&str, &str);
28}
29
30impl MigrationAsStr for MigrationDefinition {
31    fn to_str(&self) -> (&str, &str) {
32        return (self.0.as_str(), self.1.as_str());
33    }
34}
35
36pub struct Database {
37    pub client: tokio_postgres::Client,
38    pub migrations_dir_path: Option<String>,
39    pub reconnection_url:  String  , 
40    pub max_reconnect_attempts: u32, 
41    pub timeout_duration: Duration, 
42}
43
/// Connection parameters for a Postgres database.
///
/// Note that the derived `Debug` output includes `db_password` verbatim, so
/// avoid printing this value with `{:?}` in logs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseCredentials {
    /// Name of the database to connect to.
    pub db_name: String,
    /// Host portion of the connection URL.
    pub db_host: String,
    /// User name to authenticate as.
    pub db_user: String,
    /// Password for `db_user`.
    pub db_password: String,
}
51
52impl Default for DatabaseCredentials {
53    fn default() -> Self {
54        Self {
55            db_name: "postgres".into(),
56            db_host: "localhost".into(),
57            db_user: "postgres".into(),
58            db_password: "postgres".into(),
59        }
60    }
61}
62
63impl DatabaseCredentials {
64    pub fn from_env() -> Self {
65        //dotenv().expect(".env file not found"); //need to run this beforehand !! 
66
67        Self {
68            db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
69            db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
70            db_user: env::var("DB_USER").unwrap_or("postgres".into()),
71            db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
72        }
73    }
74
75    pub fn build_connection_url(&self) -> String {
76        return format!(
77            "postgres://{}:{}@{}/{}",
78            self.db_user, self.db_password, self.db_host, self.db_name
79        );
80    }
81}
82
/// Kind of client call a [`PostgresInput`] step is meant to use.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PostgresInputType {
    /// A query returning any number of rows.
    Query,
    /// A query expected to return a single row.
    QueryOne,
    /// A statement run for its affected-row count.
    Execute,
}
88
/// One statement, with its bound parameters, to run as a step of
/// `Database::atomic_transaction`.
struct PostgresInput<'a> {
    // Declared kind of call. NOTE(review): `atomic_transaction` in this file runs
    // every step through `execute` and never reads this field.
    input_type: PostgresInputType,
    // SQL text of the statement.
    query: String,
    // Positional parameters bound to the statement, borrowed for `'a`.
    params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
}
94
95impl Database {
96    pub async fn connect(
97       // credentials: DatabaseCredentials,
98       conn_url: String, 
99        migrations_dir_path: Option<String>,
100    ) -> Result<Database, PostgresError> {
101        // Define the connection URL.
102       // let conn_url = credentials.build_connection_url();
103
104        info!("Connecting to db: {}", conn_url);
105
106        let (client, connection) = tokio_postgres::connect(&conn_url, NoTls).await?;
107
108        // The connection object performs the actual communication with the database,
109        // so spawn it off to run on its own.
110        tokio::spawn(async move {
111            //this is a blocking call i think !!!
112            if let Err(e) = connection.await {
113                eprintln!("postgres connection error: {}", e);
114            }
115        });
116
117        Ok(Database {
118            client,
119            migrations_dir_path,
120            reconnection_url:  conn_url.clone() ,
121            max_reconnect_attempts: 3 ,
122            timeout_duration: Duration::from_secs( 5 )
123        })
124    }
125
126
127
128    pub async fn reconnect(&mut self  ) -> Result<(), PostgresError> {
129        let max_retries = 5;
130        let mut attempt = 0;
131         let conn_url = self.reconnection_url.clone() ;
132
133        while attempt < max_retries {
134            info!("Attempt {}: Reconnecting to database...", attempt + 1);
135
136            match tokio_postgres::connect(&conn_url, NoTls).await {
137                Ok((client, connection)) => {
138                    // Spawn a task to keep the connection alive
139                    tokio::spawn(async move {
140                        if let Err(e) = connection.await {
141                            eprintln!("postgres connection error: {}", e);
142                        }
143                    });
144
145                    self.client = client; // Replace old client with the new one
146                    info!("Reconnection successful.");
147                    return Ok(());
148                }
149                Err(e) => {
150                  
151                    attempt += 1;
152
153                    if attempt == max_retries {
154                        return Err( e.into() )
155                    }
156                      eprintln!("Reconnection failed: {}. Retrying...", e);
157                    sleep(Duration::from_secs(2_u64.pow(attempt))).await; // Exponential backoff
158                }
159            }
160        }
161
162       Ok(())  //should error ? 
163    }
164
165
166    fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
167        let mut migrations = Migrations {
168            up: Vec::new(),
169            down: Vec::new(),
170        };
171
172        let migrations_dir =
173            migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
174        let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
175
176        for file in migration_dir_files {
177            let file = file.expect("Failed to read migration file");
178
179            let path = file.path();
180            let filename = path.file_stem().unwrap().to_str().unwrap();
181
182            let filename_without_extension: &str = filename.split('.').next().unwrap();
183
184            // Read file contents
185            let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
186
187            //let contents = str::from_utf8(file.contents()).unwrap();
188
189            info!("File name: {}", filename);
190
191            if filename.contains(".down") {
192                info!("File contents: {}", contents);
193                migrations
194                    .down
195                    .push((filename_without_extension.into(), contents.clone()));
196            }
197
198            if filename.contains(".up") {
199                info!("File contents: {}", contents);
200                migrations
201                    .up
202                    .push((filename_without_extension.into(), contents.clone()));
203            }
204        }
205        
206            
207        // Sort `up` migrations in ascending alphabetical order
208        migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
209        
210        // Sort `down` migrations in descending alphabetical order
211        migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
212                
213
214        return migrations;
215    }
216
217    pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
218        let client = &mut self.client;
219
220        let migrations_dir_path = self.migrations_dir_path.clone();
221        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
222
223        for up_migration in migrations.up.drain(..) {
224            println!("migrating {} {} ", up_migration.0, up_migration.1);
225            let migration = Migration::new("migrations".to_string());
226
227            // execute non existing migrations
228            migration.up(client, &[up_migration.to_str()]).await?;
229        }
230
231        // ...
232        Ok(())
233    }
234
    /// Placeholder for rolling back migrations; currently does nothing and always
    /// returns `Ok(())`.
    pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
        // TODO: run the DOWN migrations and delete the matching records from the
        // migrations table; the applied migrations need to be read from that table
        // with MigrationsModel::find.
        Ok(())
    }
240
241    pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
242        let migrations_dir_path = self.migrations_dir_path.clone();
243
244        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
245
246        let client = &mut self.client;
247
248        for down_migration in migrations.down.drain(..)
249         {
250            println!("migrating {}", down_migration.0);
251            let migration = Migration::new("migrations".to_string());
252            // execute non existing migrations
253            migration.down(client, &[down_migration.to_str()]).await?;
254        }
255
256        Ok(())
257    }
258
259    pub async fn query(
260        &self,
261        query: &str,
262        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
263    ) -> Result<Vec<tokio_postgres::Row>, PostgresError> {
264        let rows = self.client.query(query, params).await?;
265        Ok(rows)
266    }
267
268      pub async fn query_with_reconnect(
269        &mut self,
270        query: &str,
271        params: &[&(dyn tokio_postgres::types::ToSql + Sync)] 
272       
273    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
274
275        let max_tries = self.max_reconnect_attempts; 
276        let timeout_duration = self.timeout_duration;
277
278        let mut attempts = 0;
279
280        loop {
281
282            attempts += 1; 
283
284            let insert_result = timeout(
285                timeout_duration,
286                self.client.query(query, params),
287            ).await;
288
289            match insert_result {
290                Ok(Ok(rows)) => return Ok(rows),
291                Ok(Err(e)) => {
292                    eprintln!("Database error: {:?}", e);
293                    return Err(e .into());
294                },
295                Err( _ ) => {
296                    eprintln!("Database timeout occurred.");
297                    let _reconnect_result = self.reconnect().await;
298
299                    if attempts == max_tries {
300
301                        return Err(PostgresModelError::Timeout ) ;
302                    }
303                    // After reconnection, the loop will continue to retry the query
304                }
305            }
306        }
307    }
308
309
310
311    pub async fn query_one(
312        &self,
313        query: &str,
314        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
315    ) -> Result<tokio_postgres::Row, PostgresError> {
316        let rows = self.client.query_one(query, params).await?;
317        Ok(rows)
318    }
319
320 
321    pub async fn query_one_with_reconnect(
322        &mut self,
323        query: &str,
324        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
325       
326    ) -> Result<tokio_postgres::Row, PostgresModelError> {
327
328        let timeout_duration = self.timeout_duration;
329
330        let max_tries = self.max_reconnect_attempts; 
331        let mut attempts = 0;
332
333        loop {
334
335            attempts += 1; 
336
337            let insert_result = timeout(
338                timeout_duration,
339                self.client.query_one(query, params),
340            ).await;
341
342            match insert_result {
343                Ok(Ok(row)) => return Ok(row),
344                Ok(Err(e)) => {
345                    eprintln!("Database error: {:?}", e);
346                    return Err(e .into());
347                },
348                Err( _ ) => {
349                    eprintln!("Database timeout occurred.");
350                    let _reconnect_result = self.reconnect().await;
351
352                    if attempts == max_tries {
353
354                        return Err(PostgresModelError::Timeout ) ;
355                    }
356                    // After reconnection, the loop will continue to retry the query
357                }
358            }
359        }
360    }
361
362
363    //use for insert, update, etc
364    pub async fn execute(
365        &self,
366        query: &str,
367        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
368    ) -> Result<u64, PostgresError> {
369        let rows = self.client.execute(query, params).await?;
370        Ok(rows)
371    }
372    
373
374    pub async fn execute_with_reconnect(
375        &mut self,
376        query: &str,
377        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
378        //timeout_duration: Duration,
379    ) -> Result<u64, PostgresModelError> {
380
381         let timeout_duration = self.timeout_duration;
382         
383        let max_tries = self.max_reconnect_attempts; 
384        let mut attempts = 0;
385
386        loop {
387
388            attempts += 1; 
389
390            let insert_result = timeout(
391                timeout_duration,
392                self.client.execute(query, params),
393            ).await;
394
395            match insert_result {
396                Ok(Ok(row)) => return Ok(row),
397                Ok(Err(e)) => {
398                    eprintln!("Database error: {:?}", e);
399                    return Err(e .into());
400                },
401                Err( _ ) => {
402                    eprintln!("Database timeout occurred.");
403                    let _reconnect_result = self.reconnect().await;
404
405                    if attempts == max_tries {
406
407                        return Err(PostgresModelError::Timeout ) ;
408                    }
409                    // After reconnection, the loop will continue to retry the query
410                }
411            }
412        }
413    }
414
415    async fn atomic_transaction(
416        &mut self,
417        steps: Vec<PostgresInput<'_>>,
418    ) -> Result<(), PostgresError> {
419        // Start a transaction
420        let transaction = self.client.transaction().await?;
421
422        //for each step in steps
423        for step in steps {
424            //execute the step
425            let result = transaction.execute(&step.query, step.params).await;
426            //check if the result is ok
427            if result.is_err() {
428                //if not rollback
429                transaction.rollback().await?;
430                //return error
431                return Err(PostgresError::from(result.err().unwrap()));
432            }
433        }
434
435        //if all steps are ok commit
436        transaction.commit().await?;
437        //return ok
438        Ok(())
439    }
440}
441
442pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
443    row: &'a tokio_postgres::Row,
444    column: &str,
445) -> Option<T> {
446    match row.try_get::<&str, T>(column) {
447        Ok(value) => Some(value),
448        Err(_) => None,
449    }
450}