degen_sql/db/postgres/
postgres_db.rs

1use deadpool_postgres::Timeouts;
2use tokio_postgres::Client;
3use crate::db::postgres::models::model::PostgresModelError;
4use tokio::time::timeout;
5use tokio::time::Duration;
6use tokio::time::sleep;
7use log::info;
8use tokio;
9use tokio_postgres::{Error as PostgresError, NoTls};
10
11use std::error::Error;
12use tokio_postgres_migration::Migration;
13
14use dotenvy::dotenv;
15use std::env;
16
17use std::fs;
18use std::str;
19
20type MigrationDefinition = (String, String);
21
22#[derive(Clone)]
23struct Migrations {
24    up: Vec<MigrationDefinition>,
25    down: Vec<MigrationDefinition>,
26}
27
28pub trait MigrationAsStr {
29    fn to_str(&self) -> (&str, &str);
30}
31
32impl MigrationAsStr for MigrationDefinition {
33    fn to_str(&self) -> (&str, &str) {
34        return (self.0.as_str(), self.1.as_str());
35    }
36}
37
38pub struct Database {
39
40     pool: deadpool_postgres::Pool, // Or other pool implementation
41
42  //  pub client: Option<  tokio_postgres::Client > ,
43    pub migrations_dir_path: Option<String>,
44    pub connection_url:  String  , 
45   
46    pub max_reconnect_attempts: u32, 
47    pub timeout_duration: Duration, 
48}
49
50
51
52
53
54/*
55#[derive(Debug)]
56pub enum DatabaseError {
57     ConnectionFailed ,
58    PoolCreationFailed(String),
59    QueryFailed(tokio_postgres::Error),
60
61    RowParseError(String) , 
62
63    PostgresError(tokio_postgres::Error),
64    PoolError(deadpool::managed::PoolError<tokio_postgres::Error>),
65}
66
67impl From<tokio_postgres::Error> for DatabaseError {
68    fn from(error: tokio_postgres::Error) -> Self {
69        DatabaseError::PostgresError(error)
70    }
71}
72
73impl From<deadpool::managed::PoolError<tokio_postgres::Error>> for DatabaseError {
74    fn from(error: deadpool::managed::PoolError<tokio_postgres::Error>) -> Self {
75        DatabaseError::PoolError(error)
76    }
77}
78
79// First implement Display for your error type
80impl std::fmt::Display for DatabaseError {
81    fn fmt(&self, f: &mut  std::fmt::Formatter<'_>) ->  std::fmt::Result {
82        match self {
83            DatabaseError::ConnectionFailed  => write!(f, "Database connection failed"),
84            DatabaseError::PoolCreationFailed(msg) => write!(f, "Connection pool creation failed: {}", msg),
85            DatabaseError::QueryFailed(err) => write!(f, "Database query failed: {}", err),
86            DatabaseError::PoolError(err) => write!(f, "Pool error: {}", err),
87            DatabaseError::PostgresError( err ) =>  write!(f, "Postgres error: {}", err),
88            DatabaseError::RowParseError( msg ) => write!(f, "row parse error: {}", msg),
89            // Handle other variants
90        }
91    }
92}
93
94// Then implement the Error trait
95impl Error for DatabaseError {
96    fn source(&self) -> Option<&(dyn Error + 'static)> {
97        match self {
98            DatabaseError::QueryFailed(err) => Some(err),
99            // For PoolError, you'd need to check if it implements Error
100            // Otherwise return None for this variant
101            _ => None,
102        }
103    }
104}
105*/
106
107
108
109#[derive(Debug,Clone)]
110pub struct DatabaseCredentials {
111    pub db_name: String,
112    pub db_host: String,
113    pub db_user: String,
114    pub db_password: String,
115}
116
117impl Default for DatabaseCredentials {
118    fn default() -> Self {
119        Self {
120            db_name: "postgres".into(),
121            db_host: "localhost".into(),
122            db_user: "postgres".into(),
123            db_password: "postgres".into(),
124        }
125    }
126}
127
128impl DatabaseCredentials {
129    pub fn from_env() -> Self {
130        //dotenv().expect(".env file not found"); //need to run this beforehand !! 
131
132        Self {
133            db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
134            db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
135            db_user: env::var("DB_USER").unwrap_or("postgres".into()),
136            db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
137        }
138    }
139
140    pub fn build_connection_url(&self) -> String {
141        return format!(
142            "postgres://{}:{}@{}/{}",
143            self.db_user, self.db_password, self.db_host, self.db_name
144        );
145    }
146}
147
148enum PostgresInputType {
149    Query,
150    QueryOne,
151    Execute,
152}
153
154struct PostgresInput<'a> {
155    input_type: PostgresInputType,
156    query: String,
157    params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
158}
159
160impl Database {
161    pub fn new(
162        conn_url: String, 
163        max_pool_connections: usize, 
164        migrations_dir_path: Option<String>,
165    ) -> Result<Database, PostgresModelError> {
166        // Parse the connection config from the URL
167        let config: tokio_postgres::Config = conn_url.parse()
168            .map_err(|_e| PostgresModelError::ConnectionFailed )?;
169            
170        // Create a manager using the config
171        let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
172
173/*
174        let deadpool_timeouts = Timeouts {
175            create: Some(Duration::from_secs(5)),
176            recycle: Some(Duration::from_secs(5)),
177            wait: Some(Duration::from_secs(5))
178        };  */
179        
180        // Create the pool with builder pattern
181        let pool = deadpool_postgres::Pool::builder(manager)
182            .max_size( max_pool_connections )
183           // .timeouts( deadpool_timeouts ) 
184            .build()
185            .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
186 
187
188        
189        Ok(Database {
190            pool,
191            migrations_dir_path,
192            connection_url: conn_url,
193            max_reconnect_attempts: 3,
194            timeout_duration: Duration::from_secs(5)
195        })
196    }
197}
198
199
200impl Database {
201
202   
203
204    pub async fn connect(
205       // credentials: DatabaseCredentials,
206       &  self 
207    ) -> Result<Client, PostgresError> {
208        // Define the connection URL.
209       // let conn_url = credentials.build_connection_url();
210
211        info!("Connecting to db: {}", self.connection_url);
212
213        let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
214
215        // The connection object performs the actual communication with the database,
216        // so spawn it off to run on its own.
217        tokio::spawn(async move {
218            //this is a blocking call i think !!!
219            if let Err(e) = connection.await {
220                eprintln!("postgres connection error: {}", e);
221            }
222        });
223
224     //   self.client = Some(client);
225
226        Ok( client )
227    }
228
229
230/*
231    pub async fn reconnect(&mut self  ) -> Result<Option< Client >, PostgresError> {
232        let max_retries = 5;
233        let mut attempt = 0;
234         let conn_url = self.connection_url.clone() ;
235
236        while attempt < max_retries {
237            info!("Attempt {}: Reconnecting to database...", attempt + 1);
238
239            match tokio_postgres::connect(&conn_url, NoTls).await {
240                Ok((client, connection)) => {
241                    // Spawn a task to keep the connection alive
242                    tokio::spawn(async move {
243                        if let Err(e) = connection.await {
244                            eprintln!("postgres connection error: {}", e);
245                        }
246                    });
247
248                //    self.client = Some(client); // Replace old client with the new one
249                    info!("Reconnection successful.");
250                    return Ok( Some(client) );
251                }
252                Err(e) => {
253                  
254                    attempt += 1;
255
256                    if attempt == max_retries {
257                        return Err( e.into() )
258                    }
259                      eprintln!("Reconnection failed: {}. Retrying...", e);
260                    sleep(Duration::from_secs(2_u64.pow(attempt))).await; // Exponential backoff
261                }
262            }
263        }
264
265       Ok(None)  //should error ? 
266    }*/
267
268
269    fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
270        let mut migrations = Migrations {
271            up: Vec::new(),
272            down: Vec::new(),
273        };
274
275        let migrations_dir =
276            migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
277        let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
278
279        for file in migration_dir_files {
280            let file = file.expect("Failed to read migration file");
281
282            let path = file.path();
283            let filename = path.file_stem().unwrap().to_str().unwrap();
284
285            let filename_without_extension: &str = filename.split('.').next().unwrap();
286
287            // Read file contents
288            let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
289
290            //let contents = str::from_utf8(file.contents()).unwrap();
291
292            info!("File name: {}", filename);
293
294            if filename.contains(".down") {
295                info!("File contents: {}", contents);
296                migrations
297                    .down
298                    .push((filename_without_extension.into(), contents.clone()));
299            }
300
301            if filename.contains(".up") {
302                info!("File contents: {}", contents);
303                migrations
304                    .up
305                    .push((filename_without_extension.into(), contents.clone()));
306            }
307        }
308        
309            
310        // Sort `up` migrations in ascending alphabetical order
311        migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
312        
313        // Sort `down` migrations in descending alphabetical order
314        migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
315                
316
317        return migrations;
318    }
319
320    pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
321        let client = &mut self.connect().await?;
322
323        let migrations_dir_path = self.migrations_dir_path.clone();
324        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
325
326        for up_migration in migrations.up.drain(..) {
327            println!("migrating {} {} ", up_migration.0, up_migration.1);
328            let migration = Migration::new("migrations".to_string());
329
330            // execute non existing migrations
331            migration.up(client, &[up_migration.to_str()]).await?;
332        }
333
334        // ...
335        Ok(())
336    }
337
338    //basically need to do the DOWN migrations and also delete some records from the migrations table
339    //need to read from the migrations table with MigrationsModel::find
340    pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
341        Ok(())
342    }
343
344    pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
345        let migrations_dir_path = self.migrations_dir_path.clone();
346
347        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
348
349            let client = &mut self.connect().await?;
350
351        for down_migration in migrations.down.drain(..)
352         {
353            println!("migrating {}", down_migration.0);
354            let migration = Migration::new("migrations".to_string());
355            // execute non existing migrations
356            migration.down(client, &[down_migration.to_str()]).await?;
357        }
358
359        Ok(())
360    }
361
362     pub async fn query(
363        &self,
364        query: &str,
365        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
366    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
367
368        /*let client = &self.connect().await?;
369
370        let rows = client.query(query, params).await?;
371        Ok(rows)*/
372
373         // Get a client from the pool
374        let client = self.pool.get().await?;
375        
376        // Execute the query and let the client be dropped automatically afterward
377        let rows = client.query(query, params).await?;
378        
379        Ok(rows)
380
381    } 
382 
383
384    pub async fn query_one(
385        & self,
386        query: &str,
387        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
388    ) -> Result<tokio_postgres::Row, PostgresModelError> {
389            /*
390          let client = &self.connect().await?;
391
392        let rows =  client.query_one(query, params).await?;
393        Ok(rows)
394
395        */
396
397
398         let client = self.pool.get().await ?;
399        
400        // Execute the query and let the client be dropped automatically afterward
401        let row = client.query_one(query, params).await?;
402        
403        Ok(row)
404    }
405 
406    //use for insert, update, etc
407    pub async fn execute(
408        &  self,
409        query: &str,
410        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
411    ) -> Result<u64, PostgresModelError> {
412        /*
413          let client = &self.connect().await?;
414
415        
416
417        let rows = client.execute(query, params).await?;
418        Ok(rows)*/
419
420
421         // Get a client from the pool
422        let client = self.pool.get().await?;
423        
424        // Execute the query and let the client be dropped automatically afterward
425        let count = client.execute(query, params).await?;
426        
427        Ok(count)
428
429
430    }   
431
432
433
434    pub async fn check_connection(&self) -> Result<bool, PostgresModelError> {
435    // Get a client from the pool
436    let client = self.pool.get().await?;
437    
438    // Execute a simple query to check the connection
439    match client.execute("SELECT 1", &[]).await {
440        Ok(_) => Ok(true),
441        Err(e) => Err(PostgresModelError::from(e))
442    }
443}
444
445    pub async fn recreate_pool(&mut self) -> Result<(), PostgresModelError> {
446        // Parse the connection config from the URL
447        let config: tokio_postgres::Config = self.connection_url.parse()
448            .map_err(|_e| PostgresModelError::ConnectionFailed)?;
449            
450        // Create a manager using the config
451        let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
452        
453        // Create a new pool
454        let new_pool = deadpool_postgres::Pool::builder(manager)
455            .max_size(16)
456            .build()
457            .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
458        
459        // Replace the old pool
460        self.pool = new_pool;
461        
462        Ok(())
463    }
464
465         
466     pub async fn query_with_timeout(
467        &self,
468        query: &str,
469        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
470    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
471        match timeout(self.timeout_duration, self.query(query, params)).await {
472            Ok(result) => result,
473            Err(_) => Err(PostgresModelError::ConnectionFailed) // Timeout occurred
474        }
475    }
476
477    pub async fn query_with_retry(
478        &self,
479        query: &str,
480        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
481    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
482        let mut attempts = 0;
483        
484        while attempts < self.max_reconnect_attempts {
485            match self.query(query, params).await {
486                Ok(result) => return Ok(result),
487                Err(e) => {
488                    // If it's a connection error, retry
489                    if let PostgresModelError::PoolError(_) = e {
490                        attempts += 1;
491                        if attempts >= self.max_reconnect_attempts {
492                            return Err(e);
493                        }
494                        // Wait before retrying
495                        sleep(Duration::from_secs(2_u64.pow(attempts))).await;
496                    } else {
497                        // For other errors, return immediately
498                        return Err(e);
499                    }
500                }
501            }
502        }
503        
504        Err(PostgresModelError::ConnectionFailed)
505    }
506
507
508    /*async fn atomic_transaction(
509        &  self,
510        steps: Vec<PostgresInput<'_>>,
511    ) -> Result<(), PostgresError> {
512          let client = &mut self.connect().await?;
513
514        // Start a transaction
515        let transaction = client.transaction().await?;
516
517        //for each step in steps
518        for step in steps {
519            //execute the step
520            let result = transaction.execute(&step.query, step.params).await;
521            //check if the result is ok
522            if result.is_err() {
523                //if not rollback
524                transaction.rollback().await?;
525                //return error
526                return Err(PostgresError::from(result.err().unwrap()));
527            }
528        }
529
530        //if all steps are ok commit
531        transaction.commit().await?;
532        //return ok
533        Ok(())
534    }*/
535}
536
537pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
538    row: &'a tokio_postgres::Row,
539    column: &str,
540) -> Option<T> {
541    match row.try_get::<&str, T>(column) {
542        Ok(value) => Some(value),
543        Err(_) => None,
544    }
545}