degen_sql/db/postgres/
postgres_db.rs

1use tokio_postgres::Client;
2use crate::db::postgres::models::model::PostgresModelError;
3use tokio::time::timeout;
4use tokio::time::Duration;
5use tokio::time::sleep;
6use log::info;
7use tokio;
8use tokio_postgres::{Error as PostgresError, NoTls};
9
10use std::error::Error;
11use tokio_postgres_migration::Migration;
12
13use dotenvy::dotenv;
14use std::env;
15
16use std::fs;
17use std::str;
18
/// One migration script as a `(name, sql)` pair: element `0` is the migration
/// name, element `1` is the SQL text.
type MigrationDefinition = (String, String);

/// The migration scripts of a project, split by direction.
#[derive(Clone)]
struct Migrations {
    /// Scripts that apply schema changes.
    up: Vec<MigrationDefinition>,
    /// Scripts that revert schema changes.
    down: Vec<MigrationDefinition>,
}
26
/// Borrowed view of a two-part migration value.
///
/// Implementors hand out both of their parts as string slices so the value can
/// be passed to APIs that take `(&str, &str)` pairs.
pub trait MigrationAsStr {
    /// Returns the two parts as borrowed string slices, in order.
    fn to_str(&self) -> (&str, &str);
}
30
31impl MigrationAsStr for MigrationDefinition {
32    fn to_str(&self) -> (&str, &str) {
33        return (self.0.as_str(), self.1.as_str());
34    }
35}
36
/// Settings needed to reach a Postgres database and locate its migrations.
///
/// No client is stored here (an earlier `client` field is disabled).
pub struct Database {
    /// Directory holding the migration files, if one was configured.
    pub migrations_dir_path: Option<String>,
    /// Connection string used when opening a connection.
    pub connection_url: String,

    /// Retry budget.
    /// NOTE(review): in this file it is only referenced from commented-out
    /// retry helpers — confirm whether it is still used elsewhere.
    pub max_reconnect_attempts: u32,
    /// Time limit.
    /// NOTE(review): in this file it is only referenced from commented-out
    /// retry helpers — confirm whether it is still used elsewhere.
    pub timeout_duration: Duration,
}
45
/// The four pieces of information needed to address a Postgres database.
///
/// Note that the derived `Debug` output includes `db_password` verbatim.
#[derive(Debug, Clone)]
pub struct DatabaseCredentials {
    /// Name of the database to connect to.
    pub db_name: String,
    /// Host of the Postgres server.
    pub db_host: String,
    /// User to authenticate as.
    pub db_user: String,
    /// Password for `db_user`.
    pub db_password: String,
}
53
54impl Default for DatabaseCredentials {
55    fn default() -> Self {
56        Self {
57            db_name: "postgres".into(),
58            db_host: "localhost".into(),
59            db_user: "postgres".into(),
60            db_password: "postgres".into(),
61        }
62    }
63}
64
65impl DatabaseCredentials {
66    pub fn from_env() -> Self {
67        //dotenv().expect(".env file not found"); //need to run this beforehand !! 
68
69        Self {
70            db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
71            db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
72            db_user: env::var("DB_USER").unwrap_or("postgres".into()),
73            db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
74        }
75    }
76
77    pub fn build_connection_url(&self) -> String {
78        return format!(
79            "postgres://{}:{}@{}/{}",
80            self.db_user, self.db_password, self.db_host, self.db_name
81        );
82    }
83}
84
/// Kind of statement carried by a `PostgresInput`.
///
/// NOTE(review): the variant names presumably mirror `Database::query`,
/// `Database::query_one` and `Database::execute` — confirm with the author.
enum PostgresInputType {
    /// A statement expected to return any number of rows.
    Query,
    /// A statement expected to return exactly one row.
    QueryOne,
    /// A statement run for its effect rather than its rows.
    Execute,
}
90
91struct PostgresInput<'a> {
92    input_type: PostgresInputType,
93    query: String,
94    params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
95}
96
97impl Database {
98
99    pub fn new(
100
101         conn_url: String, 
102         migrations_dir_path: Option<String>,
103     ) -> Result<Database, PostgresError> {
104
105
106
107        Ok(Database {
108            //client: None,  
109            migrations_dir_path,
110            connection_url:  conn_url.clone() ,
111            max_reconnect_attempts: 3 ,
112            timeout_duration: Duration::from_secs( 5 )
113        })
114
115    }
116
117    pub async fn connect(
118       // credentials: DatabaseCredentials,
119       &  self 
120    ) -> Result<Client, PostgresError> {
121        // Define the connection URL.
122       // let conn_url = credentials.build_connection_url();
123
124        info!("Connecting to db: {}", self.connection_url);
125
126        let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
127
128        // The connection object performs the actual communication with the database,
129        // so spawn it off to run on its own.
130        tokio::spawn(async move {
131            //this is a blocking call i think !!!
132            if let Err(e) = connection.await {
133                eprintln!("postgres connection error: {}", e);
134            }
135        });
136
137     //   self.client = Some(client);
138
139        Ok( client )
140    }
141
142
143
144    pub async fn reconnect(&mut self  ) -> Result<Option< Client >, PostgresError> {
145        let max_retries = 5;
146        let mut attempt = 0;
147         let conn_url = self.connection_url.clone() ;
148
149        while attempt < max_retries {
150            info!("Attempt {}: Reconnecting to database...", attempt + 1);
151
152            match tokio_postgres::connect(&conn_url, NoTls).await {
153                Ok((client, connection)) => {
154                    // Spawn a task to keep the connection alive
155                    tokio::spawn(async move {
156                        if let Err(e) = connection.await {
157                            eprintln!("postgres connection error: {}", e);
158                        }
159                    });
160
161                //    self.client = Some(client); // Replace old client with the new one
162                    info!("Reconnection successful.");
163                    return Ok( Some(client) );
164                }
165                Err(e) => {
166                  
167                    attempt += 1;
168
169                    if attempt == max_retries {
170                        return Err( e.into() )
171                    }
172                      eprintln!("Reconnection failed: {}. Retrying...", e);
173                    sleep(Duration::from_secs(2_u64.pow(attempt))).await; // Exponential backoff
174                }
175            }
176        }
177
178       Ok(None)  //should error ? 
179    }
180
181
182    fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
183        let mut migrations = Migrations {
184            up: Vec::new(),
185            down: Vec::new(),
186        };
187
188        let migrations_dir =
189            migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
190        let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
191
192        for file in migration_dir_files {
193            let file = file.expect("Failed to read migration file");
194
195            let path = file.path();
196            let filename = path.file_stem().unwrap().to_str().unwrap();
197
198            let filename_without_extension: &str = filename.split('.').next().unwrap();
199
200            // Read file contents
201            let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
202
203            //let contents = str::from_utf8(file.contents()).unwrap();
204
205            info!("File name: {}", filename);
206
207            if filename.contains(".down") {
208                info!("File contents: {}", contents);
209                migrations
210                    .down
211                    .push((filename_without_extension.into(), contents.clone()));
212            }
213
214            if filename.contains(".up") {
215                info!("File contents: {}", contents);
216                migrations
217                    .up
218                    .push((filename_without_extension.into(), contents.clone()));
219            }
220        }
221        
222            
223        // Sort `up` migrations in ascending alphabetical order
224        migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
225        
226        // Sort `down` migrations in descending alphabetical order
227        migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
228                
229
230        return migrations;
231    }
232
233    pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
234        let client = &mut self.connect().await?;
235
236        let migrations_dir_path = self.migrations_dir_path.clone();
237        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
238
239        for up_migration in migrations.up.drain(..) {
240            println!("migrating {} {} ", up_migration.0, up_migration.1);
241            let migration = Migration::new("migrations".to_string());
242
243            // execute non existing migrations
244            migration.up(client, &[up_migration.to_str()]).await?;
245        }
246
247        // ...
248        Ok(())
249    }
250
251    //basically need to do the DOWN migrations and also delete some records from the migrations table
252    //need to read from the migrations table with MigrationsModel::find
253    pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
254        Ok(())
255    }
256
257    pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
258        let migrations_dir_path = self.migrations_dir_path.clone();
259
260        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
261
262            let client = &mut self.connect().await?;
263
264        for down_migration in migrations.down.drain(..)
265         {
266            println!("migrating {}", down_migration.0);
267            let migration = Migration::new("migrations".to_string());
268            // execute non existing migrations
269            migration.down(client, &[down_migration.to_str()]).await?;
270        }
271
272        Ok(())
273    }
274
275     pub async fn query(
276        &self,
277        query: &str,
278        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
279    ) -> Result<Vec<tokio_postgres::Row>, PostgresError> {
280
281        let client = &self.connect().await?;
282
283        let rows = client.query(query, params).await?;
284        Ok(rows)
285    } 
286
287   /*  pub async fn query_and_connect(
288        &mut self,
289        query: &str,
290        params: &[&(dyn tokio_postgres::types::ToSql + Sync)] 
291       
292    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
293
294        let client = &mut self.connect().await?;
295
296        let max_tries = self.max_reconnect_attempts; 
297        let timeout_duration = self.timeout_duration;
298
299        let mut attempts = 0;
300
301        loop {
302
303            attempts += 1; 
304
305            let insert_result = timeout(
306                timeout_duration,
307                 client.query(query, params),
308            ).await;
309
310            match insert_result {
311                Ok(Ok(rows)) => return Ok(rows),
312                Ok(Err(e)) => {
313                    eprintln!("Database error: {:?}", e);
314                    return Err(e .into());
315                },
316                Err( _ ) => {
317                    eprintln!("Database timeout occurred.");
318                    let _reconnect_result = self.reconnect().await;
319
320                    if attempts == max_tries {
321
322                        return Err(PostgresModelError::Timeout ) ;
323                    }
324                    // After reconnection, the loop will continue to retry the query
325                }
326            }
327        }
328    }*/
329
330
331
332    pub async fn query_one(
333        & self,
334        query: &str,
335        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
336    ) -> Result<tokio_postgres::Row, PostgresError> {
337
338          let client = &self.connect().await?;
339
340        let rows =  client.query_one(query, params).await?;
341        Ok(rows)
342    }
343
344 
345 /*   pub async fn query_one_with_reconnect(
346        &mut self,
347        query: &str,
348        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
349       
350    ) -> Result<tokio_postgres::Row, PostgresModelError> {
351
352        let timeout_duration = self.timeout_duration;
353
354        let max_tries = self.max_reconnect_attempts; 
355        let mut attempts = 0;
356
357        loop {
358
359            attempts += 1; 
360
361            let insert_result = timeout(
362                timeout_duration,
363                self.client.query_one(query, params),
364            ).await;
365
366            match insert_result {
367                Ok(Ok(row)) => return Ok(row),
368                Ok(Err(e)) => {
369
370
371                    eprintln!("Database error: {:?}", e);
372                    return Err(e .into());
373                },
374                Err( _ ) => {
375                    eprintln!("Database timeout occurred.");
376                    let _reconnect_result = self.reconnect().await;
377
378                    if attempts == max_tries {
379
380                        return Err(PostgresModelError::Timeout ) ;
381                    }
382                    // After reconnection, the loop will continue to retry the query
383                }
384            }
385        }
386    }
387*/
388
389    //use for insert, update, etc
390    pub async fn execute(
391        &  self,
392        query: &str,
393        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
394    ) -> Result<u64, PostgresError> {
395
396          let client = &self.connect().await?;
397
398        
399
400        let rows = client.execute(query, params).await?;
401        Ok(rows)
402    }
403    
404/*
405    pub async fn execute_with_reconnect(
406        &mut self,
407        query: &str,
408        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
409        //timeout_duration: Duration,
410    ) -> Result<u64, PostgresModelError> {
411
412         let timeout_duration = self.timeout_duration;
413         
414        let max_tries = self.max_reconnect_attempts; 
415        let mut attempts = 0;
416
417        loop {
418
419            attempts += 1; 
420
421            let insert_result = timeout(
422                timeout_duration,
423                self.client.execute(query, params),
424            ).await;
425
426            match insert_result {
427                Ok(Ok(row)) => return Ok(row),
428                Ok(Err(e)) => {
429                    eprintln!("Database error: {:?}", e);
430                    return Err(e .into());
431                },
432                Err( _ ) => {
433                    eprintln!("Database timeout occurred.");
434                    let _reconnect_result = self.reconnect().await;
435
436                    if attempts == max_tries {
437
438                        return Err(PostgresModelError::Timeout ) ;
439                    }
440                    // After reconnection, the loop will continue to retry the query
441                }
442            }
443        }
444    }*/
445
446    async fn atomic_transaction(
447        &  self,
448        steps: Vec<PostgresInput<'_>>,
449    ) -> Result<(), PostgresError> {
450          let client = &mut self.connect().await?;
451
452        // Start a transaction
453        let transaction = client.transaction().await?;
454
455        //for each step in steps
456        for step in steps {
457            //execute the step
458            let result = transaction.execute(&step.query, step.params).await;
459            //check if the result is ok
460            if result.is_err() {
461                //if not rollback
462                transaction.rollback().await?;
463                //return error
464                return Err(PostgresError::from(result.err().unwrap()));
465            }
466        }
467
468        //if all steps are ok commit
469        transaction.commit().await?;
470        //return ok
471        Ok(())
472    }
473}
474
475pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
476    row: &'a tokio_postgres::Row,
477    column: &str,
478) -> Option<T> {
479    match row.try_get::<&str, T>(column) {
480        Ok(value) => Some(value),
481        Err(_) => None,
482    }
483}