degen_sql/db/postgres/
postgres_db.rs

1use tokio_postgres::Client;
2use crate::db::postgres::models::model::PostgresModelError;
3use tokio::time::timeout;
4use tokio::time::Duration;
5use tokio::time::sleep;
6use log::info;
7use tokio;
8use tokio_postgres::{Error as PostgresError, NoTls};
9
10use std::error::Error;
11use tokio_postgres_migration::Migration;
12
13use dotenvy::dotenv;
14use std::env;
15
16use std::fs;
17use std::str;
18
/// A single migration script as an owned `(name, sql)` pair: element `0` is
/// the migration's name and element `1` is the SQL text to run.
type MigrationDefinition = (String, String);
20
21#[derive(Clone)]
22struct Migrations {
23    up: Vec<MigrationDefinition>,
24    down: Vec<MigrationDefinition>,
25}
26
/// Borrowed view of a migration definition.
pub trait MigrationAsStr {
    /// Returns the definition as a pair of string slices borrowed from `self`.
    fn to_str(&self) -> (&str, &str);
}
30
31impl MigrationAsStr for MigrationDefinition {
32    fn to_str(&self) -> (&str, &str) {
33        return (self.0.as_str(), self.1.as_str());
34    }
35}
36
37pub struct Database {
38
39     pool: deadpool_postgres::Pool, // Or other pool implementation
40
41  //  pub client: Option<  tokio_postgres::Client > ,
42    pub migrations_dir_path: Option<String>,
43    pub connection_url:  String  , 
44   
45    pub max_reconnect_attempts: u32, 
46    pub timeout_duration: Duration, 
47}
48
49
50
51
52
53
54#[derive(Debug)]
55pub enum DatabaseError {
56     ConnectionFailed ,
57    PoolCreationFailed(String),
58    QueryFailed(tokio_postgres::Error),
59
60    RowParseError(String) , 
61
62    PostgresError(tokio_postgres::Error),
63    PoolError(deadpool::managed::PoolError<tokio_postgres::Error>),
64}
65
66impl From<tokio_postgres::Error> for DatabaseError {
67    fn from(error: tokio_postgres::Error) -> Self {
68        DatabaseError::PostgresError(error)
69    }
70}
71
72impl From<deadpool::managed::PoolError<tokio_postgres::Error>> for DatabaseError {
73    fn from(error: deadpool::managed::PoolError<tokio_postgres::Error>) -> Self {
74        DatabaseError::PoolError(error)
75    }
76}
77
78// First implement Display for your error type
79impl std::fmt::Display for DatabaseError {
80    fn fmt(&self, f: &mut  std::fmt::Formatter<'_>) ->  std::fmt::Result {
81        match self {
82            DatabaseError::ConnectionFailed  => write!(f, "Database connection failed"),
83            DatabaseError::PoolCreationFailed(msg) => write!(f, "Connection pool creation failed: {}", msg),
84            DatabaseError::QueryFailed(err) => write!(f, "Database query failed: {}", err),
85            DatabaseError::PoolError(err) => write!(f, "Pool error: {}", err),
86            DatabaseError::PostgresError( err ) =>  write!(f, "Postgres error: {}", err),
87            DatabaseError::RowParseError( msg ) => write!(f, "row parse error: {}", msg),
88            // Handle other variants
89        }
90    }
91}
92
93// Then implement the Error trait
94impl Error for DatabaseError {
95    fn source(&self) -> Option<&(dyn Error + 'static)> {
96        match self {
97            DatabaseError::QueryFailed(err) => Some(err),
98            // For PoolError, you'd need to check if it implements Error
99            // Otherwise return None for this variant
100            _ => None,
101        }
102    }
103}
104
105
106
107
/// The pieces needed to address a Postgres database.
#[derive(Debug, Clone)]
pub struct DatabaseCredentials {
    /// Name of the database to connect to.
    pub db_name: String,
    /// Host part of the connection URL; inserted verbatim.
    pub db_host: String,
    /// User to authenticate as.
    pub db_user: String,
    /// Password for `db_user`.
    pub db_password: String,
}

impl Default for DatabaseCredentials {
    /// Credentials for a local database: every field is `postgres`, except
    /// the host, which is `localhost`.
    fn default() -> Self {
        Self {
            db_name: "postgres".into(),
            db_host: "localhost".into(),
            db_user: "postgres".into(),
            db_password: "postgres".into(),
        }
    }
}

impl DatabaseCredentials {
    /// Reads `DB_NAME`, `DB_HOST`, `DB_USER` and `DB_PASSWORD` from the
    /// process environment, using the [`Default`] value for any variable that
    /// is unset or not valid Unicode.
    ///
    /// This does not load a `.env` file; do that beforehand if one is used.
    pub fn from_env() -> Self {
        // Single source of truth for the fallbacks.
        let defaults = Self::default();

        Self {
            db_name: env::var("DB_NAME").unwrap_or(defaults.db_name),
            db_host: env::var("DB_HOST").unwrap_or(defaults.db_host),
            db_user: env::var("DB_USER").unwrap_or(defaults.db_user),
            db_password: env::var("DB_PASSWORD").unwrap_or(defaults.db_password),
        }
    }

    /// Builds a `postgres://user:password@host/db_name` connection URL.
    ///
    /// The user and password are percent-encoded so that characters such as
    /// `@`, `:`, `/` or spaces cannot break the URL structure. `db_host` and
    /// `db_name` are inserted verbatim.
    pub fn build_connection_url(&self) -> String {
        format!(
            "postgres://{}:{}@{}/{}",
            Self::encode_userinfo(&self.db_user),
            Self::encode_userinfo(&self.db_password),
            self.db_host,
            self.db_name
        )
    }

    /// Percent-encodes `raw` for use in the user-info part of a URL.
    ///
    /// ASCII letters, digits and `-._~` are kept. An existing `%XX` escape is
    /// also kept as-is, so values that were already encoded are not encoded a
    /// second time. Every other byte becomes `%XX` with upper-case hex digits.
    fn encode_userinfo(raw: &str) -> String {
        const HEX: &[u8; 16] = b"0123456789ABCDEF";

        let bytes = raw.as_bytes();
        let mut encoded = String::with_capacity(bytes.len());

        for (index, &byte) in bytes.iter().enumerate() {
            let keep = match byte {
                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => true,
                // `%` followed by two hex digits is already an escape sequence.
                b'%' => {
                    bytes.get(index + 1).map_or(false, u8::is_ascii_hexdigit)
                        && bytes.get(index + 2).map_or(false, u8::is_ascii_hexdigit)
                }
                _ => false,
            };

            if keep {
                encoded.push(char::from(byte));
            } else {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }

        encoded
    }
}
146
/// Which kind of client call a [`PostgresInput`] step stands for.
enum PostgresInputType {
    /// A statement run as a multi-row query.
    Query,
    /// A statement run as a single-row query.
    QueryOne,
    /// A statement run for its effect (insert, update, ...).
    Execute,
}
152
153struct PostgresInput<'a> {
154    input_type: PostgresInputType,
155    query: String,
156    params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
157}
158
159impl Database {
160    pub fn new(
161        conn_url: String, 
162        migrations_dir_path: Option<String>,
163    ) -> Result<Database, DatabaseError> {
164        // Parse the connection config from the URL
165        let config: tokio_postgres::Config = conn_url.parse()
166            .map_err(|_e| DatabaseError::ConnectionFailed )?;
167            
168        // Create a manager using the config
169        let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
170        
171        // Create the pool with builder pattern
172        let pool = deadpool_postgres::Pool::builder(manager)
173            .max_size(16)
174            .build()
175            .map_err(|e| DatabaseError::PoolCreationFailed(e.to_string()))?;
176        
177        Ok(Database {
178            pool,
179            migrations_dir_path,
180            connection_url: conn_url,
181            max_reconnect_attempts: 3,
182            timeout_duration: Duration::from_secs(5)
183        })
184    }
185}
186
187
188impl Database {
189
190   
191
192    pub async fn connect(
193       // credentials: DatabaseCredentials,
194       &  self 
195    ) -> Result<Client, PostgresError> {
196        // Define the connection URL.
197       // let conn_url = credentials.build_connection_url();
198
199        info!("Connecting to db: {}", self.connection_url);
200
201        let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
202
203        // The connection object performs the actual communication with the database,
204        // so spawn it off to run on its own.
205        tokio::spawn(async move {
206            //this is a blocking call i think !!!
207            if let Err(e) = connection.await {
208                eprintln!("postgres connection error: {}", e);
209            }
210        });
211
212     //   self.client = Some(client);
213
214        Ok( client )
215    }
216
217
218/*
219    pub async fn reconnect(&mut self  ) -> Result<Option< Client >, PostgresError> {
220        let max_retries = 5;
221        let mut attempt = 0;
222         let conn_url = self.connection_url.clone() ;
223
224        while attempt < max_retries {
225            info!("Attempt {}: Reconnecting to database...", attempt + 1);
226
227            match tokio_postgres::connect(&conn_url, NoTls).await {
228                Ok((client, connection)) => {
229                    // Spawn a task to keep the connection alive
230                    tokio::spawn(async move {
231                        if let Err(e) = connection.await {
232                            eprintln!("postgres connection error: {}", e);
233                        }
234                    });
235
236                //    self.client = Some(client); // Replace old client with the new one
237                    info!("Reconnection successful.");
238                    return Ok( Some(client) );
239                }
240                Err(e) => {
241                  
242                    attempt += 1;
243
244                    if attempt == max_retries {
245                        return Err( e.into() )
246                    }
247                      eprintln!("Reconnection failed: {}. Retrying...", e);
248                    sleep(Duration::from_secs(2_u64.pow(attempt))).await; // Exponential backoff
249                }
250            }
251        }
252
253       Ok(None)  //should error ? 
254    }*/
255
256
257    fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
258        let mut migrations = Migrations {
259            up: Vec::new(),
260            down: Vec::new(),
261        };
262
263        let migrations_dir =
264            migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
265        let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
266
267        for file in migration_dir_files {
268            let file = file.expect("Failed to read migration file");
269
270            let path = file.path();
271            let filename = path.file_stem().unwrap().to_str().unwrap();
272
273            let filename_without_extension: &str = filename.split('.').next().unwrap();
274
275            // Read file contents
276            let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
277
278            //let contents = str::from_utf8(file.contents()).unwrap();
279
280            info!("File name: {}", filename);
281
282            if filename.contains(".down") {
283                info!("File contents: {}", contents);
284                migrations
285                    .down
286                    .push((filename_without_extension.into(), contents.clone()));
287            }
288
289            if filename.contains(".up") {
290                info!("File contents: {}", contents);
291                migrations
292                    .up
293                    .push((filename_without_extension.into(), contents.clone()));
294            }
295        }
296        
297            
298        // Sort `up` migrations in ascending alphabetical order
299        migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
300        
301        // Sort `down` migrations in descending alphabetical order
302        migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
303                
304
305        return migrations;
306    }
307
308    pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
309        let client = &mut self.connect().await?;
310
311        let migrations_dir_path = self.migrations_dir_path.clone();
312        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
313
314        for up_migration in migrations.up.drain(..) {
315            println!("migrating {} {} ", up_migration.0, up_migration.1);
316            let migration = Migration::new("migrations".to_string());
317
318            // execute non existing migrations
319            migration.up(client, &[up_migration.to_str()]).await?;
320        }
321
322        // ...
323        Ok(())
324    }
325
326    //basically need to do the DOWN migrations and also delete some records from the migrations table
327    //need to read from the migrations table with MigrationsModel::find
328    pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
329        Ok(())
330    }
331
332    pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
333        let migrations_dir_path = self.migrations_dir_path.clone();
334
335        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
336
337            let client = &mut self.connect().await?;
338
339        for down_migration in migrations.down.drain(..)
340         {
341            println!("migrating {}", down_migration.0);
342            let migration = Migration::new("migrations".to_string());
343            // execute non existing migrations
344            migration.down(client, &[down_migration.to_str()]).await?;
345        }
346
347        Ok(())
348    }
349
350     pub async fn query(
351        &self,
352        query: &str,
353        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
354    ) -> Result<Vec<tokio_postgres::Row>, DatabaseError> {
355
356        /*let client = &self.connect().await?;
357
358        let rows = client.query(query, params).await?;
359        Ok(rows)*/
360
361         // Get a client from the pool
362        let client = self.pool.get().await?;
363        
364        // Execute the query and let the client be dropped automatically afterward
365        let rows = client.query(query, params).await?;
366        
367        Ok(rows)
368
369    } 
370 
371
372    pub async fn query_one(
373        & self,
374        query: &str,
375        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
376    ) -> Result<tokio_postgres::Row, DatabaseError> {
377            /*
378          let client = &self.connect().await?;
379
380        let rows =  client.query_one(query, params).await?;
381        Ok(rows)
382
383        */
384
385
386         let client = self.pool.get().await ?;
387        
388        // Execute the query and let the client be dropped automatically afterward
389        let row = client.query_one(query, params).await?;
390        
391        Ok(row)
392    }
393 
394    //use for insert, update, etc
395    pub async fn execute(
396        &  self,
397        query: &str,
398        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
399    ) -> Result<u64, DatabaseError> {
400        /*
401          let client = &self.connect().await?;
402
403        
404
405        let rows = client.execute(query, params).await?;
406        Ok(rows)*/
407
408
409         // Get a client from the pool
410        let client = self.pool.get().await?;
411        
412        // Execute the query and let the client be dropped automatically afterward
413        let count = client.execute(query, params).await?;
414        
415        Ok(count)
416
417
418    }   
419
420
421
422    pub async fn check_connection(&self) -> Result<bool, DatabaseError> {
423    // Get a client from the pool
424    let client = self.pool.get().await?;
425    
426    // Execute a simple query to check the connection
427    match client.execute("SELECT 1", &[]).await {
428        Ok(_) => Ok(true),
429        Err(e) => Err(DatabaseError::from(e))
430    }
431}
432
433    pub async fn recreate_pool(&mut self) -> Result<(), DatabaseError> {
434        // Parse the connection config from the URL
435        let config: tokio_postgres::Config = self.connection_url.parse()
436            .map_err(|_e| DatabaseError::ConnectionFailed)?;
437            
438        // Create a manager using the config
439        let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
440        
441        // Create a new pool
442        let new_pool = deadpool_postgres::Pool::builder(manager)
443            .max_size(16)
444            .build()
445            .map_err(|e| DatabaseError::PoolCreationFailed(e.to_string()))?;
446        
447        // Replace the old pool
448        self.pool = new_pool;
449        
450        Ok(())
451    }
452
453         
454     pub async fn query_with_timeout(
455        &self,
456        query: &str,
457        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
458    ) -> Result<Vec<tokio_postgres::Row>, DatabaseError> {
459        match timeout(self.timeout_duration, self.query(query, params)).await {
460            Ok(result) => result,
461            Err(_) => Err(DatabaseError::ConnectionFailed) // Timeout occurred
462        }
463    }
464
465    pub async fn query_with_retry(
466        &self,
467        query: &str,
468        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
469    ) -> Result<Vec<tokio_postgres::Row>, DatabaseError> {
470        let mut attempts = 0;
471        
472        while attempts < self.max_reconnect_attempts {
473            match self.query(query, params).await {
474                Ok(result) => return Ok(result),
475                Err(e) => {
476                    // If it's a connection error, retry
477                    if let DatabaseError::PoolError(_) = e {
478                        attempts += 1;
479                        if attempts >= self.max_reconnect_attempts {
480                            return Err(e);
481                        }
482                        // Wait before retrying
483                        sleep(Duration::from_secs(2_u64.pow(attempts))).await;
484                    } else {
485                        // For other errors, return immediately
486                        return Err(e);
487                    }
488                }
489            }
490        }
491        
492        Err(DatabaseError::ConnectionFailed)
493    }
494
495
496    /*async fn atomic_transaction(
497        &  self,
498        steps: Vec<PostgresInput<'_>>,
499    ) -> Result<(), PostgresError> {
500          let client = &mut self.connect().await?;
501
502        // Start a transaction
503        let transaction = client.transaction().await?;
504
505        //for each step in steps
506        for step in steps {
507            //execute the step
508            let result = transaction.execute(&step.query, step.params).await;
509            //check if the result is ok
510            if result.is_err() {
511                //if not rollback
512                transaction.rollback().await?;
513                //return error
514                return Err(PostgresError::from(result.err().unwrap()));
515            }
516        }
517
518        //if all steps are ok commit
519        transaction.commit().await?;
520        //return ok
521        Ok(())
522    }*/
523}
524
525pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
526    row: &'a tokio_postgres::Row,
527    column: &str,
528) -> Option<T> {
529    match row.try_get::<&str, T>(column) {
530        Ok(value) => Some(value),
531        Err(_) => None,
532    }
533}