degen_sql/db/postgres/
postgres_db.rs

1use tokio_postgres::Client;
2use crate::db::postgres::models::model::PostgresModelError;
3use tokio::time::timeout;
4use tokio::time::Duration;
5use tokio::time::sleep;
6use log::info;
7use tokio;
8use tokio_postgres::{Error as PostgresError, NoTls};
9
10use std::error::Error;
11use tokio_postgres_migration::Migration;
12
13use dotenvy::dotenv;
14use std::env;
15
16use std::fs;
17use std::str;
18
19type MigrationDefinition = (String, String);
20
21#[derive(Clone)]
22struct Migrations {
23    up: Vec<MigrationDefinition>,
24    down: Vec<MigrationDefinition>,
25}
26
27pub trait MigrationAsStr {
28    fn to_str(&self) -> (&str, &str);
29}
30
31impl MigrationAsStr for MigrationDefinition {
32    fn to_str(&self) -> (&str, &str) {
33        return (self.0.as_str(), self.1.as_str());
34    }
35}
36
37pub struct Database {
38
39     pool: deadpool_postgres::Pool, // Or other pool implementation
40
41  //  pub client: Option<  tokio_postgres::Client > ,
42    pub migrations_dir_path: Option<String>,
43    pub connection_url:  String  , 
44   
45    pub max_reconnect_attempts: u32, 
46    pub timeout_duration: Duration, 
47}
48
49
50
51
52
53/*
54#[derive(Debug)]
55pub enum DatabaseError {
56     ConnectionFailed ,
57    PoolCreationFailed(String),
58    QueryFailed(tokio_postgres::Error),
59
60    RowParseError(String) , 
61
62    PostgresError(tokio_postgres::Error),
63    PoolError(deadpool::managed::PoolError<tokio_postgres::Error>),
64}
65
66impl From<tokio_postgres::Error> for DatabaseError {
67    fn from(error: tokio_postgres::Error) -> Self {
68        DatabaseError::PostgresError(error)
69    }
70}
71
72impl From<deadpool::managed::PoolError<tokio_postgres::Error>> for DatabaseError {
73    fn from(error: deadpool::managed::PoolError<tokio_postgres::Error>) -> Self {
74        DatabaseError::PoolError(error)
75    }
76}
77
78// First implement Display for your error type
79impl std::fmt::Display for DatabaseError {
80    fn fmt(&self, f: &mut  std::fmt::Formatter<'_>) ->  std::fmt::Result {
81        match self {
82            DatabaseError::ConnectionFailed  => write!(f, "Database connection failed"),
83            DatabaseError::PoolCreationFailed(msg) => write!(f, "Connection pool creation failed: {}", msg),
84            DatabaseError::QueryFailed(err) => write!(f, "Database query failed: {}", err),
85            DatabaseError::PoolError(err) => write!(f, "Pool error: {}", err),
86            DatabaseError::PostgresError( err ) =>  write!(f, "Postgres error: {}", err),
87            DatabaseError::RowParseError( msg ) => write!(f, "row parse error: {}", msg),
88            // Handle other variants
89        }
90    }
91}
92
93// Then implement the Error trait
94impl Error for DatabaseError {
95    fn source(&self) -> Option<&(dyn Error + 'static)> {
96        match self {
97            DatabaseError::QueryFailed(err) => Some(err),
98            // For PoolError, you'd need to check if it implements Error
99            // Otherwise return None for this variant
100            _ => None,
101        }
102    }
103}
104*/
105
106
107
108#[derive(Debug,Clone)]
109pub struct DatabaseCredentials {
110    pub db_name: String,
111    pub db_host: String,
112    pub db_user: String,
113    pub db_password: String,
114}
115
116impl Default for DatabaseCredentials {
117    fn default() -> Self {
118        Self {
119            db_name: "postgres".into(),
120            db_host: "localhost".into(),
121            db_user: "postgres".into(),
122            db_password: "postgres".into(),
123        }
124    }
125}
126
127impl DatabaseCredentials {
128    pub fn from_env() -> Self {
129        //dotenv().expect(".env file not found"); //need to run this beforehand !! 
130
131        Self {
132            db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
133            db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
134            db_user: env::var("DB_USER").unwrap_or("postgres".into()),
135            db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
136        }
137    }
138
139    pub fn build_connection_url(&self) -> String {
140        return format!(
141            "postgres://{}:{}@{}/{}",
142            self.db_user, self.db_password, self.db_host, self.db_name
143        );
144    }
145}
146
147enum PostgresInputType {
148    Query,
149    QueryOne,
150    Execute,
151}
152
153struct PostgresInput<'a> {
154    input_type: PostgresInputType,
155    query: String,
156    params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
157}
158
159impl Database {
160    pub fn new(
161        conn_url: String, 
162        max_pool_connections: usize, 
163        migrations_dir_path: Option<String>,
164    ) -> Result<Database, PostgresModelError> {
165        // Parse the connection config from the URL
166        let config: tokio_postgres::Config = conn_url.parse()
167            .map_err(|_e| PostgresModelError::ConnectionFailed )?;
168            
169        // Create a manager using the config
170        let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
171        
172        // Create the pool with builder pattern
173        let pool = deadpool_postgres::Pool::builder(manager)
174            .max_size( max_pool_connections )
175            .build()
176            .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
177        
178        Ok(Database {
179            pool,
180            migrations_dir_path,
181            connection_url: conn_url,
182            max_reconnect_attempts: 3,
183            timeout_duration: Duration::from_secs(5)
184        })
185    }
186}
187
188
189impl Database {
190
191   
192
193    pub async fn connect(
194       // credentials: DatabaseCredentials,
195       &  self 
196    ) -> Result<Client, PostgresError> {
197        // Define the connection URL.
198       // let conn_url = credentials.build_connection_url();
199
200        info!("Connecting to db: {}", self.connection_url);
201
202        let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
203
204        // The connection object performs the actual communication with the database,
205        // so spawn it off to run on its own.
206        tokio::spawn(async move {
207            //this is a blocking call i think !!!
208            if let Err(e) = connection.await {
209                eprintln!("postgres connection error: {}", e);
210            }
211        });
212
213     //   self.client = Some(client);
214
215        Ok( client )
216    }
217
218
219/*
220    pub async fn reconnect(&mut self  ) -> Result<Option< Client >, PostgresError> {
221        let max_retries = 5;
222        let mut attempt = 0;
223         let conn_url = self.connection_url.clone() ;
224
225        while attempt < max_retries {
226            info!("Attempt {}: Reconnecting to database...", attempt + 1);
227
228            match tokio_postgres::connect(&conn_url, NoTls).await {
229                Ok((client, connection)) => {
230                    // Spawn a task to keep the connection alive
231                    tokio::spawn(async move {
232                        if let Err(e) = connection.await {
233                            eprintln!("postgres connection error: {}", e);
234                        }
235                    });
236
237                //    self.client = Some(client); // Replace old client with the new one
238                    info!("Reconnection successful.");
239                    return Ok( Some(client) );
240                }
241                Err(e) => {
242                  
243                    attempt += 1;
244
245                    if attempt == max_retries {
246                        return Err( e.into() )
247                    }
248                      eprintln!("Reconnection failed: {}. Retrying...", e);
249                    sleep(Duration::from_secs(2_u64.pow(attempt))).await; // Exponential backoff
250                }
251            }
252        }
253
254       Ok(None)  //should error ? 
255    }*/
256
257
258    fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
259        let mut migrations = Migrations {
260            up: Vec::new(),
261            down: Vec::new(),
262        };
263
264        let migrations_dir =
265            migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
266        let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
267
268        for file in migration_dir_files {
269            let file = file.expect("Failed to read migration file");
270
271            let path = file.path();
272            let filename = path.file_stem().unwrap().to_str().unwrap();
273
274            let filename_without_extension: &str = filename.split('.').next().unwrap();
275
276            // Read file contents
277            let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
278
279            //let contents = str::from_utf8(file.contents()).unwrap();
280
281            info!("File name: {}", filename);
282
283            if filename.contains(".down") {
284                info!("File contents: {}", contents);
285                migrations
286                    .down
287                    .push((filename_without_extension.into(), contents.clone()));
288            }
289
290            if filename.contains(".up") {
291                info!("File contents: {}", contents);
292                migrations
293                    .up
294                    .push((filename_without_extension.into(), contents.clone()));
295            }
296        }
297        
298            
299        // Sort `up` migrations in ascending alphabetical order
300        migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
301        
302        // Sort `down` migrations in descending alphabetical order
303        migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
304                
305
306        return migrations;
307    }
308
309    pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
310        let client = &mut self.connect().await?;
311
312        let migrations_dir_path = self.migrations_dir_path.clone();
313        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
314
315        for up_migration in migrations.up.drain(..) {
316            println!("migrating {} {} ", up_migration.0, up_migration.1);
317            let migration = Migration::new("migrations".to_string());
318
319            // execute non existing migrations
320            migration.up(client, &[up_migration.to_str()]).await?;
321        }
322
323        // ...
324        Ok(())
325    }
326
327    //basically need to do the DOWN migrations and also delete some records from the migrations table
328    //need to read from the migrations table with MigrationsModel::find
329    pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
330        Ok(())
331    }
332
333    pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
334        let migrations_dir_path = self.migrations_dir_path.clone();
335
336        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
337
338            let client = &mut self.connect().await?;
339
340        for down_migration in migrations.down.drain(..)
341         {
342            println!("migrating {}", down_migration.0);
343            let migration = Migration::new("migrations".to_string());
344            // execute non existing migrations
345            migration.down(client, &[down_migration.to_str()]).await?;
346        }
347
348        Ok(())
349    }
350
351     pub async fn query(
352        &self,
353        query: &str,
354        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
355    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
356
357        /*let client = &self.connect().await?;
358
359        let rows = client.query(query, params).await?;
360        Ok(rows)*/
361
362         // Get a client from the pool
363        let client = self.pool.get().await?;
364        
365        // Execute the query and let the client be dropped automatically afterward
366        let rows = client.query(query, params).await?;
367        
368        Ok(rows)
369
370    } 
371 
372
373    pub async fn query_one(
374        & self,
375        query: &str,
376        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
377    ) -> Result<tokio_postgres::Row, PostgresModelError> {
378            /*
379          let client = &self.connect().await?;
380
381        let rows =  client.query_one(query, params).await?;
382        Ok(rows)
383
384        */
385
386
387         let client = self.pool.get().await ?;
388        
389        // Execute the query and let the client be dropped automatically afterward
390        let row = client.query_one(query, params).await?;
391        
392        Ok(row)
393    }
394 
395    //use for insert, update, etc
396    pub async fn execute(
397        &  self,
398        query: &str,
399        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
400    ) -> Result<u64, PostgresModelError> {
401        /*
402          let client = &self.connect().await?;
403
404        
405
406        let rows = client.execute(query, params).await?;
407        Ok(rows)*/
408
409
410         // Get a client from the pool
411        let client = self.pool.get().await?;
412        
413        // Execute the query and let the client be dropped automatically afterward
414        let count = client.execute(query, params).await?;
415        
416        Ok(count)
417
418
419    }   
420
421
422
423    pub async fn check_connection(&self) -> Result<bool, PostgresModelError> {
424    // Get a client from the pool
425    let client = self.pool.get().await?;
426    
427    // Execute a simple query to check the connection
428    match client.execute("SELECT 1", &[]).await {
429        Ok(_) => Ok(true),
430        Err(e) => Err(PostgresModelError::from(e))
431    }
432}
433
434    pub async fn recreate_pool(&mut self) -> Result<(), PostgresModelError> {
435        // Parse the connection config from the URL
436        let config: tokio_postgres::Config = self.connection_url.parse()
437            .map_err(|_e| PostgresModelError::ConnectionFailed)?;
438            
439        // Create a manager using the config
440        let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
441        
442        // Create a new pool
443        let new_pool = deadpool_postgres::Pool::builder(manager)
444            .max_size(16)
445            .build()
446            .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
447        
448        // Replace the old pool
449        self.pool = new_pool;
450        
451        Ok(())
452    }
453
454         
455     pub async fn query_with_timeout(
456        &self,
457        query: &str,
458        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
459    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
460        match timeout(self.timeout_duration, self.query(query, params)).await {
461            Ok(result) => result,
462            Err(_) => Err(PostgresModelError::ConnectionFailed) // Timeout occurred
463        }
464    }
465
466    pub async fn query_with_retry(
467        &self,
468        query: &str,
469        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
470    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
471        let mut attempts = 0;
472        
473        while attempts < self.max_reconnect_attempts {
474            match self.query(query, params).await {
475                Ok(result) => return Ok(result),
476                Err(e) => {
477                    // If it's a connection error, retry
478                    if let PostgresModelError::PoolError(_) = e {
479                        attempts += 1;
480                        if attempts >= self.max_reconnect_attempts {
481                            return Err(e);
482                        }
483                        // Wait before retrying
484                        sleep(Duration::from_secs(2_u64.pow(attempts))).await;
485                    } else {
486                        // For other errors, return immediately
487                        return Err(e);
488                    }
489                }
490            }
491        }
492        
493        Err(PostgresModelError::ConnectionFailed)
494    }
495
496
497    /*async fn atomic_transaction(
498        &  self,
499        steps: Vec<PostgresInput<'_>>,
500    ) -> Result<(), PostgresError> {
501          let client = &mut self.connect().await?;
502
503        // Start a transaction
504        let transaction = client.transaction().await?;
505
506        //for each step in steps
507        for step in steps {
508            //execute the step
509            let result = transaction.execute(&step.query, step.params).await;
510            //check if the result is ok
511            if result.is_err() {
512                //if not rollback
513                transaction.rollback().await?;
514                //return error
515                return Err(PostgresError::from(result.err().unwrap()));
516            }
517        }
518
519        //if all steps are ok commit
520        transaction.commit().await?;
521        //return ok
522        Ok(())
523    }*/
524}
525
526pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
527    row: &'a tokio_postgres::Row,
528    column: &str,
529) -> Option<T> {
530    match row.try_get::<&str, T>(column) {
531        Ok(value) => Some(value),
532        Err(_) => None,
533    }
534}