degen_sql/db/postgres/
postgres_db.rs

1use deadpool_postgres::Timeouts;
2use tokio_postgres::Client;
3use crate::db::postgres::models::model::PostgresModelError;
4use tokio::time::timeout;
5use tokio::time::Duration;
6use tokio::time::sleep;
7use log::info;
8use tokio;
9use tokio_postgres::{Error as PostgresError, NoTls};
10
11use std::error::Error;
12use tokio_postgres_migration::Migration;
13
14use dotenvy::dotenv;
15use std::env;
16
17use std::fs;
18use std::str;
19
/// A single migration script as an owned `(name, sql)` pair.
type MigrationDefinition = (String, String);

/// The migration scripts found on disk, split by direction.
#[derive(Clone)]
struct Migrations {
    /// Scripts applied when migrating forward.
    up: Vec<MigrationDefinition>,
    /// Scripts applied when rolling back.
    down: Vec<MigrationDefinition>,
}

/// Borrowed view of a migration as a pair of string slices.
pub trait MigrationAsStr {
    /// Returns both parts of the migration as `&str`, without copying.
    fn to_str(&self) -> (&str, &str);
}

impl MigrationAsStr for MigrationDefinition {
    /// Borrows the two owned strings of the tuple, in order.
    fn to_str(&self) -> (&str, &str) {
        (self.0.as_str(), self.1.as_str())
    }
}
37
/// Handle to a PostgreSQL database: a `deadpool_postgres` connection pool plus
/// the settings used for dedicated connections and migrations.
pub struct Database {

     /// Connection pool used by `query`, `query_one`, `execute` and `check_connection`.
     pool: deadpool_postgres::Pool, // Or other pool implementation

    /// Directory containing the migration files; when `None`,
    /// `./src/db/postgres/migrations` is used.
    pub migrations_dir_path: Option<String>,
    /// Connection URL; `connect` uses it to open a dedicated (non-pooled) connection.
    pub connection_url:  String  , 
   
    /// Set to 3 by `new`; not read by any active code in this file.
    pub max_reconnect_attempts: u32, 
    /// Set to 5 seconds by `new`; not read by any active code in this file.
    pub timeout_duration: Duration, 
}
49
50
51
52 
53
54
/// Connection parameters for a PostgreSQL database.
///
/// Note that the derived `Debug` output includes `db_password`, so avoid
/// logging values of this type.
#[derive(Debug, Clone)]
pub struct DatabaseCredentials {
    pub db_name: String,
    pub db_host: String,
    pub db_user: String,
    pub db_password: String,
}

impl Default for DatabaseCredentials {
    /// Credentials for a stock local install: database, user and password are
    /// all `postgres`, and the host is `localhost`.
    fn default() -> Self {
        Self {
            db_name: "postgres".into(),
            db_host: "localhost".into(),
            db_user: "postgres".into(),
            db_password: "postgres".into(),
        }
    }
}

impl DatabaseCredentials {
    /// Builds credentials from `DB_NAME`, `DB_HOST`, `DB_USER` and `DB_PASSWORD`.
    ///
    /// Any variable that is unset (or not valid Unicode) falls back to the
    /// corresponding [`Default`] value. This function does not load a `.env`
    /// file; do that beforehand if one is needed.
    pub fn from_env() -> Self {
        // Single source of truth for the fallbacks: reuse `Default` rather than
        // repeating the literals here.
        let fallback = Self::default();

        Self {
            db_name: env::var("DB_NAME").unwrap_or(fallback.db_name),
            db_host: env::var("DB_HOST").unwrap_or(fallback.db_host),
            db_user: env::var("DB_USER").unwrap_or(fallback.db_user),
            db_password: env::var("DB_PASSWORD").unwrap_or(fallback.db_password),
        }
    }

    /// Formats the credentials as `postgres://user:password@host/dbname`.
    ///
    /// Values are interpolated verbatim (no percent-encoding), so a user name
    /// or password containing URL-reserved characters such as `@` or `/`
    /// yields an ambiguous URL.
    pub fn build_connection_url(&self) -> String {
        format!(
            "postgres://{}:{}@{}/{}",
            self.db_user, self.db_password, self.db_host, self.db_name
        )
    }
}
93
/// Kind of statement call. The variant names mirror the `query`, `query_one`
/// and `execute` methods on `Database`.
// NOTE(review): not referenced by any active code in the visible part of this file.
enum PostgresInputType {
    Query,
    QueryOne,
    Execute,
}
99
/// A statement bundled with its kind and its borrowed bind parameters.
// NOTE(review): not referenced by any active code in the visible part of this file.
struct PostgresInput<'a> {
    /// Which kind of call this input is meant for.
    input_type: PostgresInputType,
    /// SQL text of the statement.
    query: String,
    /// Bind parameters, borrowed for `'a`.
    params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
}
105
106impl Database {
107    pub fn new(
108        conn_url: String, 
109        max_pool_connections: usize, 
110        migrations_dir_path: Option<String>,
111    ) -> Result<Database, PostgresModelError> {
112        // Parse the connection config from the URL
113        let config: tokio_postgres::Config = conn_url.parse()
114            .map_err(|_e| PostgresModelError::ConnectionFailed )?;
115            
116        // Create a manager using the config
117        let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
118
119/*
120        let deadpool_timeouts = Timeouts {
121            create: Some(Duration::from_secs(5)),
122            recycle: Some(Duration::from_secs(5)),
123            wait: Some(Duration::from_secs(5))
124        };  */
125        
126        // Create the pool with builder pattern
127        let pool = deadpool_postgres::Pool::builder(manager)
128            .max_size( max_pool_connections )
129           // .timeouts( deadpool_timeouts ) 
130            .build()
131            .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
132 
133
134        
135        Ok(Database {
136            pool,
137            migrations_dir_path,
138            connection_url: conn_url,
139            max_reconnect_attempts: 3,
140            timeout_duration: Duration::from_secs(5)
141        })
142    }
143}
144
145
146impl Database {
147
148   
149
150    pub async fn connect(
151       // credentials: DatabaseCredentials,
152       &  self 
153    ) -> Result<Client, PostgresError> {
154        // Define the connection URL.
155       // let conn_url = credentials.build_connection_url();
156
157        info!("Connecting to db: {}", self.connection_url);
158
159        let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
160
161        // The connection object performs the actual communication with the database,
162        // so spawn it off to run on its own.
163        tokio::spawn(async move {
164            //this is a blocking call i think !!!
165            if let Err(e) = connection.await {
166                eprintln!("postgres connection error: {}", e);
167            }
168        });
169
170     //   self.client = Some(client);
171
172        Ok( client )
173    }
174
175 
176
177    fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
178        let mut migrations = Migrations {
179            up: Vec::new(),
180            down: Vec::new(),
181        };
182
183        let migrations_dir =
184            migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
185        let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
186
187        for file in migration_dir_files {
188            let file = file.expect("Failed to read migration file");
189
190            let path = file.path();
191            let filename = path.file_stem().unwrap().to_str().unwrap();
192
193            let filename_without_extension: &str = filename.split('.').next().unwrap();
194
195            // Read file contents
196            let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
197
198            //let contents = str::from_utf8(file.contents()).unwrap();
199
200            info!("File name: {}", filename);
201
202            if filename.contains(".down") {
203                info!("File contents: {}", contents);
204                migrations
205                    .down
206                    .push((filename_without_extension.into(), contents.clone()));
207            }
208
209            if filename.contains(".up") {
210                info!("File contents: {}", contents);
211                migrations
212                    .up
213                    .push((filename_without_extension.into(), contents.clone()));
214            }
215        }
216        
217            
218        // Sort `up` migrations in ascending alphabetical order
219        migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
220        
221        // Sort `down` migrations in descending alphabetical order
222        migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
223                
224
225        return migrations;
226    }
227
228    pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
229        let client = &mut self.connect().await?;
230
231        let migrations_dir_path = self.migrations_dir_path.clone();
232        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
233
234        for up_migration in migrations.up.drain(..) {
235            println!("migrating {} {} ", up_migration.0, up_migration.1);
236            let migration = Migration::new("migrations".to_string());
237
238            // execute non existing migrations
239            migration.up(client, &[up_migration.to_str()]).await?;
240        }
241
242        // ...
243        Ok(())
244    }
245
    /// Placeholder rollback: currently performs no work and always returns `Ok(())`.
    // TODO: run the DOWN migrations and delete the matching records from the
    // migrations table; this needs to read that table with MigrationsModel::find.
    pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
        Ok(())
    }
251
252    pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
253        let migrations_dir_path = self.migrations_dir_path.clone();
254
255        let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
256
257            let client = &mut self.connect().await?;
258
259        for down_migration in migrations.down.drain(..)
260         {
261            println!("migrating {}", down_migration.0);
262            let migration = Migration::new("migrations".to_string());
263            // execute non existing migrations
264            migration.down(client, &[down_migration.to_str()]).await?;
265        }
266
267        Ok(())
268    }
269
270     pub async fn query(
271        &self,
272        query: &str,
273        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
274    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
275
276        /*let client = &self.connect().await?;
277
278        let rows = client.query(query, params).await?;
279        Ok(rows)*/
280
281         // Get a client from the pool
282        let client = self.pool.get().await?;
283        
284        // Execute the query and let the client be dropped automatically afterward
285        let rows = client.query(query, params).await?;
286        
287        Ok(rows)
288
289    } 
290 
291
292    pub async fn query_one(
293        & self,
294        query: &str,
295        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
296    ) -> Result<tokio_postgres::Row, PostgresModelError> {
297            /*
298          let client = &self.connect().await?;
299
300        let rows =  client.query_one(query, params).await?;
301        Ok(rows)
302
303        */
304
305
306         let client = self.pool.get().await ?;
307        
308        // Execute the query and let the client be dropped automatically afterward
309        let row = client.query_one(query, params).await?;
310        
311        Ok(row)
312    }
313 
314    //use for insert, update, etc
315    pub async fn execute(
316        &  self,
317        query: &str,
318        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
319    ) -> Result<u64, PostgresModelError> {
320        /*
321          let client = &self.connect().await?;
322
323        
324
325        let rows = client.execute(query, params).await?;
326        Ok(rows)*/
327
328
329         // Get a client from the pool
330        let client = self.pool.get().await?;
331        
332        // Execute the query and let the client be dropped automatically afterward
333        let count = client.execute(query, params).await?;
334        
335        Ok(count)
336
337
338    }   
339
340
341
342    pub async fn check_connection(&self) -> Result<bool, PostgresModelError> {
343    // Get a client from the pool
344    let client = self.pool.get().await?;
345    
346    // Execute a simple query to check the connection
347    match client.execute("SELECT 1", &[]).await {
348        Ok(_) => Ok(true),
349        Err(e) => Err(PostgresModelError::from(e))
350    }
351}
352    
353    /*
354    pub async fn recreate_pool(&mut self) -> Result<(), PostgresModelError> {
355        // Parse the connection config from the URL
356        let config: tokio_postgres::Config = self.connection_url.parse()
357            .map_err(|_e| PostgresModelError::ConnectionFailed)?;
358            
359        // Create a manager using the config
360        let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
361        
362        // Create a new pool
363        let new_pool = deadpool_postgres::Pool::builder(manager)
364            .max_size(16)
365            .build()
366            .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
367        
368        // Replace the old pool
369        self.pool = new_pool;
370        
371        Ok(())
372    }
373
374         
375     pub async fn query_with_timeout(
376        &self,
377        query: &str,
378        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
379    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
380        match timeout(self.timeout_duration, self.query(query, params)).await {
381            Ok(result) => result,
382            Err(_) => Err(PostgresModelError::ConnectionFailed) // Timeout occurred
383        }
384    }
385
386    pub async fn query_with_retry(
387        &self,
388        query: &str,
389        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
390    ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
391        let mut attempts = 0;
392        
393        while attempts < self.max_reconnect_attempts {
394            match self.query(query, params).await {
395                Ok(result) => return Ok(result),
396                Err(e) => {
397                    // If it's a connection error, retry
398                    if let PostgresModelError::PoolError(_) = e {
399                        attempts += 1;
400                        if attempts >= self.max_reconnect_attempts {
401                            return Err(e);
402                        }
403                        // Wait before retrying
404                        sleep(Duration::from_secs(2_u64.pow(attempts))).await;
405                    } else {
406                        // For other errors, return immediately
407                        return Err(e);
408                    }
409                }
410            }
411        }
412        
413        Err(PostgresModelError::ConnectionFailed)
414    }*/
415
416 
417}
418
419pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
420    row: &'a tokio_postgres::Row,
421    column: &str,
422) -> Option<T> {
423    match row.try_get::<&str, T>(column) {
424        Ok(value) => Some(value),
425        Err(_) => None,
426    }
427}