degen_sql/db/postgres/postgres_db.rs
1use deadpool_postgres::Timeouts;
2use tokio_postgres::Client;
3use crate::db::postgres::models::model::PostgresModelError;
4use tokio::time::timeout;
5use tokio::time::Duration;
6use tokio::time::sleep;
7use log::info;
8use tokio;
9use tokio_postgres::{Error as PostgresError, NoTls};
10
11use std::error::Error;
12use tokio_postgres_migration::Migration;
13
14use dotenvy::dotenv;
15use std::env;
16
17use std::fs;
18use std::str;
19
20type MigrationDefinition = (String, String);
21
22#[derive(Clone)]
23struct Migrations {
24 up: Vec<MigrationDefinition>,
25 down: Vec<MigrationDefinition>,
26}
27
28pub trait MigrationAsStr {
29 fn to_str(&self) -> (&str, &str);
30}
31
32impl MigrationAsStr for MigrationDefinition {
33 fn to_str(&self) -> (&str, &str) {
34 return (self.0.as_str(), self.1.as_str());
35 }
36}
37
38pub struct Database {
39
40 pool: deadpool_postgres::Pool, // Or other pool implementation
41
42 // pub client: Option< tokio_postgres::Client > ,
43 pub migrations_dir_path: Option<String>,
44 pub connection_url: String ,
45
46 pub max_reconnect_attempts: u32,
47 pub timeout_duration: Duration,
48}
49
50
51
52
53
54/*
55#[derive(Debug)]
56pub enum DatabaseError {
57 ConnectionFailed ,
58 PoolCreationFailed(String),
59 QueryFailed(tokio_postgres::Error),
60
61 RowParseError(String) ,
62
63 PostgresError(tokio_postgres::Error),
64 PoolError(deadpool::managed::PoolError<tokio_postgres::Error>),
65}
66
67impl From<tokio_postgres::Error> for DatabaseError {
68 fn from(error: tokio_postgres::Error) -> Self {
69 DatabaseError::PostgresError(error)
70 }
71}
72
73impl From<deadpool::managed::PoolError<tokio_postgres::Error>> for DatabaseError {
74 fn from(error: deadpool::managed::PoolError<tokio_postgres::Error>) -> Self {
75 DatabaseError::PoolError(error)
76 }
77}
78
79// First implement Display for your error type
80impl std::fmt::Display for DatabaseError {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 match self {
83 DatabaseError::ConnectionFailed => write!(f, "Database connection failed"),
84 DatabaseError::PoolCreationFailed(msg) => write!(f, "Connection pool creation failed: {}", msg),
85 DatabaseError::QueryFailed(err) => write!(f, "Database query failed: {}", err),
86 DatabaseError::PoolError(err) => write!(f, "Pool error: {}", err),
87 DatabaseError::PostgresError( err ) => write!(f, "Postgres error: {}", err),
88 DatabaseError::RowParseError( msg ) => write!(f, "row parse error: {}", msg),
89 // Handle other variants
90 }
91 }
92}
93
94// Then implement the Error trait
95impl Error for DatabaseError {
96 fn source(&self) -> Option<&(dyn Error + 'static)> {
97 match self {
98 DatabaseError::QueryFailed(err) => Some(err),
99 // For PoolError, you'd need to check if it implements Error
100 // Otherwise return None for this variant
101 _ => None,
102 }
103 }
104}
105*/
106
107
108
109#[derive(Debug,Clone)]
110pub struct DatabaseCredentials {
111 pub db_name: String,
112 pub db_host: String,
113 pub db_user: String,
114 pub db_password: String,
115}
116
117impl Default for DatabaseCredentials {
118 fn default() -> Self {
119 Self {
120 db_name: "postgres".into(),
121 db_host: "localhost".into(),
122 db_user: "postgres".into(),
123 db_password: "postgres".into(),
124 }
125 }
126}
127
128impl DatabaseCredentials {
129 pub fn from_env() -> Self {
130 //dotenv().expect(".env file not found"); //need to run this beforehand !!
131
132 Self {
133 db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
134 db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
135 db_user: env::var("DB_USER").unwrap_or("postgres".into()),
136 db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
137 }
138 }
139
140 pub fn build_connection_url(&self) -> String {
141 return format!(
142 "postgres://{}:{}@{}/{}",
143 self.db_user, self.db_password, self.db_host, self.db_name
144 );
145 }
146}
147
148enum PostgresInputType {
149 Query,
150 QueryOne,
151 Execute,
152}
153
154struct PostgresInput<'a> {
155 input_type: PostgresInputType,
156 query: String,
157 params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
158}
159
160impl Database {
161 pub fn new(
162 conn_url: String,
163 max_pool_connections: usize,
164 migrations_dir_path: Option<String>,
165 ) -> Result<Database, PostgresModelError> {
166 // Parse the connection config from the URL
167 let config: tokio_postgres::Config = conn_url.parse()
168 .map_err(|_e| PostgresModelError::ConnectionFailed )?;
169
170 // Create a manager using the config
171 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
172
173/*
174 let deadpool_timeouts = Timeouts {
175 create: Some(Duration::from_secs(5)),
176 recycle: Some(Duration::from_secs(5)),
177 wait: Some(Duration::from_secs(5))
178 }; */
179
180 // Create the pool with builder pattern
181 let pool = deadpool_postgres::Pool::builder(manager)
182 .max_size( max_pool_connections )
183 // .timeouts( deadpool_timeouts )
184 .build()
185 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
186
187
188
189 Ok(Database {
190 pool,
191 migrations_dir_path,
192 connection_url: conn_url,
193 max_reconnect_attempts: 3,
194 timeout_duration: Duration::from_secs(5)
195 })
196 }
197}
198
199
200impl Database {
201
202
203
204 pub async fn connect(
205 // credentials: DatabaseCredentials,
206 & self
207 ) -> Result<Client, PostgresError> {
208 // Define the connection URL.
209 // let conn_url = credentials.build_connection_url();
210
211 info!("Connecting to db: {}", self.connection_url);
212
213 let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
214
215 // The connection object performs the actual communication with the database,
216 // so spawn it off to run on its own.
217 tokio::spawn(async move {
218 //this is a blocking call i think !!!
219 if let Err(e) = connection.await {
220 eprintln!("postgres connection error: {}", e);
221 }
222 });
223
224 // self.client = Some(client);
225
226 Ok( client )
227 }
228
229
230/*
231 pub async fn reconnect(&mut self ) -> Result<Option< Client >, PostgresError> {
232 let max_retries = 5;
233 let mut attempt = 0;
234 let conn_url = self.connection_url.clone() ;
235
236 while attempt < max_retries {
237 info!("Attempt {}: Reconnecting to database...", attempt + 1);
238
239 match tokio_postgres::connect(&conn_url, NoTls).await {
240 Ok((client, connection)) => {
241 // Spawn a task to keep the connection alive
242 tokio::spawn(async move {
243 if let Err(e) = connection.await {
244 eprintln!("postgres connection error: {}", e);
245 }
246 });
247
248 // self.client = Some(client); // Replace old client with the new one
249 info!("Reconnection successful.");
250 return Ok( Some(client) );
251 }
252 Err(e) => {
253
254 attempt += 1;
255
256 if attempt == max_retries {
257 return Err( e.into() )
258 }
259 eprintln!("Reconnection failed: {}. Retrying...", e);
260 sleep(Duration::from_secs(2_u64.pow(attempt))).await; // Exponential backoff
261 }
262 }
263 }
264
265 Ok(None) //should error ?
266 }*/
267
268
269 fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
270 let mut migrations = Migrations {
271 up: Vec::new(),
272 down: Vec::new(),
273 };
274
275 let migrations_dir =
276 migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
277 let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
278
279 for file in migration_dir_files {
280 let file = file.expect("Failed to read migration file");
281
282 let path = file.path();
283 let filename = path.file_stem().unwrap().to_str().unwrap();
284
285 let filename_without_extension: &str = filename.split('.').next().unwrap();
286
287 // Read file contents
288 let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
289
290 //let contents = str::from_utf8(file.contents()).unwrap();
291
292 info!("File name: {}", filename);
293
294 if filename.contains(".down") {
295 info!("File contents: {}", contents);
296 migrations
297 .down
298 .push((filename_without_extension.into(), contents.clone()));
299 }
300
301 if filename.contains(".up") {
302 info!("File contents: {}", contents);
303 migrations
304 .up
305 .push((filename_without_extension.into(), contents.clone()));
306 }
307 }
308
309
310 // Sort `up` migrations in ascending alphabetical order
311 migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
312
313 // Sort `down` migrations in descending alphabetical order
314 migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
315
316
317 return migrations;
318 }
319
320 pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
321 let client = &mut self.connect().await?;
322
323 let migrations_dir_path = self.migrations_dir_path.clone();
324 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
325
326 for up_migration in migrations.up.drain(..) {
327 println!("migrating {} {} ", up_migration.0, up_migration.1);
328 let migration = Migration::new("migrations".to_string());
329
330 // execute non existing migrations
331 migration.up(client, &[up_migration.to_str()]).await?;
332 }
333
334 // ...
335 Ok(())
336 }
337
338 //basically need to do the DOWN migrations and also delete some records from the migrations table
339 //need to read from the migrations table with MigrationsModel::find
340 pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
341 Ok(())
342 }
343
344 pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
345 let migrations_dir_path = self.migrations_dir_path.clone();
346
347 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
348
349 let client = &mut self.connect().await?;
350
351 for down_migration in migrations.down.drain(..)
352 {
353 println!("migrating {}", down_migration.0);
354 let migration = Migration::new("migrations".to_string());
355 // execute non existing migrations
356 migration.down(client, &[down_migration.to_str()]).await?;
357 }
358
359 Ok(())
360 }
361
362 pub async fn query(
363 &self,
364 query: &str,
365 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
366 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
367
368 /*let client = &self.connect().await?;
369
370 let rows = client.query(query, params).await?;
371 Ok(rows)*/
372
373 // Get a client from the pool
374 let client = self.pool.get().await?;
375
376 // Execute the query and let the client be dropped automatically afterward
377 let rows = client.query(query, params).await?;
378
379 Ok(rows)
380
381 }
382
383
384 pub async fn query_one(
385 & self,
386 query: &str,
387 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
388 ) -> Result<tokio_postgres::Row, PostgresModelError> {
389 /*
390 let client = &self.connect().await?;
391
392 let rows = client.query_one(query, params).await?;
393 Ok(rows)
394
395 */
396
397
398 let client = self.pool.get().await ?;
399
400 // Execute the query and let the client be dropped automatically afterward
401 let row = client.query_one(query, params).await?;
402
403 Ok(row)
404 }
405
406 //use for insert, update, etc
407 pub async fn execute(
408 & self,
409 query: &str,
410 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
411 ) -> Result<u64, PostgresModelError> {
412 /*
413 let client = &self.connect().await?;
414
415
416
417 let rows = client.execute(query, params).await?;
418 Ok(rows)*/
419
420
421 // Get a client from the pool
422 let client = self.pool.get().await?;
423
424 // Execute the query and let the client be dropped automatically afterward
425 let count = client.execute(query, params).await?;
426
427 Ok(count)
428
429
430 }
431
432
433
434 pub async fn check_connection(&self) -> Result<bool, PostgresModelError> {
435 // Get a client from the pool
436 let client = self.pool.get().await?;
437
438 // Execute a simple query to check the connection
439 match client.execute("SELECT 1", &[]).await {
440 Ok(_) => Ok(true),
441 Err(e) => Err(PostgresModelError::from(e))
442 }
443}
444
445 pub async fn recreate_pool(&mut self) -> Result<(), PostgresModelError> {
446 // Parse the connection config from the URL
447 let config: tokio_postgres::Config = self.connection_url.parse()
448 .map_err(|_e| PostgresModelError::ConnectionFailed)?;
449
450 // Create a manager using the config
451 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
452
453 // Create a new pool
454 let new_pool = deadpool_postgres::Pool::builder(manager)
455 .max_size(16)
456 .build()
457 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
458
459 // Replace the old pool
460 self.pool = new_pool;
461
462 Ok(())
463 }
464
465
466 pub async fn query_with_timeout(
467 &self,
468 query: &str,
469 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
470 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
471 match timeout(self.timeout_duration, self.query(query, params)).await {
472 Ok(result) => result,
473 Err(_) => Err(PostgresModelError::ConnectionFailed) // Timeout occurred
474 }
475 }
476
477 pub async fn query_with_retry(
478 &self,
479 query: &str,
480 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
481 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
482 let mut attempts = 0;
483
484 while attempts < self.max_reconnect_attempts {
485 match self.query(query, params).await {
486 Ok(result) => return Ok(result),
487 Err(e) => {
488 // If it's a connection error, retry
489 if let PostgresModelError::PoolError(_) = e {
490 attempts += 1;
491 if attempts >= self.max_reconnect_attempts {
492 return Err(e);
493 }
494 // Wait before retrying
495 sleep(Duration::from_secs(2_u64.pow(attempts))).await;
496 } else {
497 // For other errors, return immediately
498 return Err(e);
499 }
500 }
501 }
502 }
503
504 Err(PostgresModelError::ConnectionFailed)
505 }
506
507
508 /*async fn atomic_transaction(
509 & self,
510 steps: Vec<PostgresInput<'_>>,
511 ) -> Result<(), PostgresError> {
512 let client = &mut self.connect().await?;
513
514 // Start a transaction
515 let transaction = client.transaction().await?;
516
517 //for each step in steps
518 for step in steps {
519 //execute the step
520 let result = transaction.execute(&step.query, step.params).await;
521 //check if the result is ok
522 if result.is_err() {
523 //if not rollback
524 transaction.rollback().await?;
525 //return error
526 return Err(PostgresError::from(result.err().unwrap()));
527 }
528 }
529
530 //if all steps are ok commit
531 transaction.commit().await?;
532 //return ok
533 Ok(())
534 }*/
535}
536
537pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
538 row: &'a tokio_postgres::Row,
539 column: &str,
540) -> Option<T> {
541 match row.try_get::<&str, T>(column) {
542 Ok(value) => Some(value),
543 Err(_) => None,
544 }
545}