degen_sql/db/postgres/postgres_db.rs
1use tokio_postgres::Client;
2use crate::db::postgres::models::model::PostgresModelError;
3use tokio::time::timeout;
4use tokio::time::Duration;
5use tokio::time::sleep;
6use log::info;
7use tokio;
8use tokio_postgres::{Error as PostgresError, NoTls};
9
10use std::error::Error;
11use tokio_postgres_migration::Migration;
12
13use dotenvy::dotenv;
14use std::env;
15
16use std::fs;
17use std::str;
18
/// A single migration script as a `(name, sql)` pair.
type MigrationDefinition = (String, String);

/// Migration scripts collected from disk, grouped by direction.
#[derive(Debug, Clone)]
struct Migrations {
    /// Scripts loaded from `.up` files.
    up: Vec<MigrationDefinition>,
    /// Scripts loaded from `.down` files.
    down: Vec<MigrationDefinition>,
}

/// Borrowed view of an owned pair of strings.
pub trait MigrationAsStr {
    /// Returns both elements of the pair as string slices, in order.
    fn to_str(&self) -> (&str, &str);
}

impl MigrationAsStr for MigrationDefinition {
    fn to_str(&self) -> (&str, &str) {
        (self.0.as_str(), self.1.as_str())
    }
}
36
37pub struct Database {
38
39 pool: deadpool_postgres::Pool, // Or other pool implementation
40
41 // pub client: Option< tokio_postgres::Client > ,
42 pub migrations_dir_path: Option<String>,
43 pub connection_url: String ,
44
45 pub max_reconnect_attempts: u32,
46 pub timeout_duration: Duration,
47}
48
49
50
51
52
53/*
54#[derive(Debug)]
55pub enum DatabaseError {
56 ConnectionFailed ,
57 PoolCreationFailed(String),
58 QueryFailed(tokio_postgres::Error),
59
60 RowParseError(String) ,
61
62 PostgresError(tokio_postgres::Error),
63 PoolError(deadpool::managed::PoolError<tokio_postgres::Error>),
64}
65
66impl From<tokio_postgres::Error> for DatabaseError {
67 fn from(error: tokio_postgres::Error) -> Self {
68 DatabaseError::PostgresError(error)
69 }
70}
71
72impl From<deadpool::managed::PoolError<tokio_postgres::Error>> for DatabaseError {
73 fn from(error: deadpool::managed::PoolError<tokio_postgres::Error>) -> Self {
74 DatabaseError::PoolError(error)
75 }
76}
77
78// First implement Display for your error type
79impl std::fmt::Display for DatabaseError {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self {
82 DatabaseError::ConnectionFailed => write!(f, "Database connection failed"),
83 DatabaseError::PoolCreationFailed(msg) => write!(f, "Connection pool creation failed: {}", msg),
84 DatabaseError::QueryFailed(err) => write!(f, "Database query failed: {}", err),
85 DatabaseError::PoolError(err) => write!(f, "Pool error: {}", err),
86 DatabaseError::PostgresError( err ) => write!(f, "Postgres error: {}", err),
87 DatabaseError::RowParseError( msg ) => write!(f, "row parse error: {}", msg),
88 // Handle other variants
89 }
90 }
91}
92
93// Then implement the Error trait
94impl Error for DatabaseError {
95 fn source(&self) -> Option<&(dyn Error + 'static)> {
96 match self {
97 DatabaseError::QueryFailed(err) => Some(err),
98 // For PoolError, you'd need to check if it implements Error
99 // Otherwise return None for this variant
100 _ => None,
101 }
102 }
103}
104*/
105
106
107
/// Parameters needed to build a Postgres connection URL.
#[derive(Debug, Clone)]
pub struct DatabaseCredentials {
    /// Database name (the path component of the URL).
    pub db_name: String,
    /// Host part of the URL, inserted verbatim.
    pub db_host: String,
    /// Role to log in as (raw, unescaped).
    pub db_user: String,
    /// Password for `db_user` (raw, unescaped).
    pub db_password: String,
}

impl Default for DatabaseCredentials {
    /// Uses `postgres` for name, user and password, and `localhost` as host.
    fn default() -> Self {
        Self {
            db_name: "postgres".into(),
            db_host: "localhost".into(),
            db_user: "postgres".into(),
            db_password: "postgres".into(),
        }
    }
}

impl DatabaseCredentials {
    /// Reads `DB_NAME`, `DB_HOST`, `DB_USER` and `DB_PASSWORD` from the
    /// process environment.
    ///
    /// Any variable that is unset (or not valid Unicode) falls back to the
    /// corresponding [`Default`] value. This function does not load a `.env`
    /// file; do that beforehand if one is needed.
    pub fn from_env() -> Self {
        // Single source of truth for the fallbacks instead of repeating the literals.
        let defaults = Self::default();

        Self {
            db_name: env::var("DB_NAME").unwrap_or(defaults.db_name),
            db_host: env::var("DB_HOST").unwrap_or(defaults.db_host),
            db_user: env::var("DB_USER").unwrap_or(defaults.db_user),
            db_password: env::var("DB_PASSWORD").unwrap_or(defaults.db_password),
        }
    }

    /// Builds a `postgres://user:password@host/name` connection URL.
    ///
    /// The user and password are percent-encoded so that characters such as
    /// `@`, `:`, `/`, `?`, `#` or `%` cannot be mistaken for URL delimiters.
    /// Store them raw (not pre-encoded) in this struct. Host and database
    /// name are inserted verbatim, so a `host:port` value keeps working.
    pub fn build_connection_url(&self) -> String {
        format!(
            "postgres://{}:{}@{}/{}",
            Self::percent_encode(&self.db_user),
            Self::percent_encode(&self.db_password),
            self.db_host,
            self.db_name
        )
    }

    /// Percent-encodes every byte outside the RFC 3986 "unreserved" set
    /// (`A-Z a-z 0-9 - . _ ~`).
    fn percent_encode(raw: &str) -> String {
        const HEX: &[u8; 16] = b"0123456789ABCDEF";

        let mut encoded = String::with_capacity(raw.len());
        for byte in raw.bytes() {
            match byte {
                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                    encoded.push(char::from(byte));
                }
                other => {
                    encoded.push('%');
                    encoded.push(char::from(HEX[usize::from(other >> 4)]));
                    encoded.push(char::from(HEX[usize::from(other & 0x0F)]));
                }
            }
        }
        encoded
    }
}
146
/// Kind of statement carried by a `PostgresInput`.
///
/// The variants are named after the `query`, `query_one` and `execute`
/// client calls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PostgresInputType {
    Query,
    QueryOne,
    Execute,
}
152
153struct PostgresInput<'a> {
154 input_type: PostgresInputType,
155 query: String,
156 params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
157}
158
159impl Database {
160 pub fn new(
161 conn_url: String,
162 max_pool_connections: usize,
163 migrations_dir_path: Option<String>,
164 ) -> Result<Database, PostgresModelError> {
165 // Parse the connection config from the URL
166 let config: tokio_postgres::Config = conn_url.parse()
167 .map_err(|_e| PostgresModelError::ConnectionFailed )?;
168
169 // Create a manager using the config
170 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
171
172 // Create the pool with builder pattern
173 let pool = deadpool_postgres::Pool::builder(manager)
174 .max_size( max_pool_connections )
175 .build()
176 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
177
178 Ok(Database {
179 pool,
180 migrations_dir_path,
181 connection_url: conn_url,
182 max_reconnect_attempts: 3,
183 timeout_duration: Duration::from_secs(5)
184 })
185 }
186}
187
188
189impl Database {
190
191
192
193 pub async fn connect(
194 // credentials: DatabaseCredentials,
195 & self
196 ) -> Result<Client, PostgresError> {
197 // Define the connection URL.
198 // let conn_url = credentials.build_connection_url();
199
200 info!("Connecting to db: {}", self.connection_url);
201
202 let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
203
204 // The connection object performs the actual communication with the database,
205 // so spawn it off to run on its own.
206 tokio::spawn(async move {
207 //this is a blocking call i think !!!
208 if let Err(e) = connection.await {
209 eprintln!("postgres connection error: {}", e);
210 }
211 });
212
213 // self.client = Some(client);
214
215 Ok( client )
216 }
217
218
219/*
220 pub async fn reconnect(&mut self ) -> Result<Option< Client >, PostgresError> {
221 let max_retries = 5;
222 let mut attempt = 0;
223 let conn_url = self.connection_url.clone() ;
224
225 while attempt < max_retries {
226 info!("Attempt {}: Reconnecting to database...", attempt + 1);
227
228 match tokio_postgres::connect(&conn_url, NoTls).await {
229 Ok((client, connection)) => {
230 // Spawn a task to keep the connection alive
231 tokio::spawn(async move {
232 if let Err(e) = connection.await {
233 eprintln!("postgres connection error: {}", e);
234 }
235 });
236
237 // self.client = Some(client); // Replace old client with the new one
238 info!("Reconnection successful.");
239 return Ok( Some(client) );
240 }
241 Err(e) => {
242
243 attempt += 1;
244
245 if attempt == max_retries {
246 return Err( e.into() )
247 }
248 eprintln!("Reconnection failed: {}. Retrying...", e);
249 sleep(Duration::from_secs(2_u64.pow(attempt))).await; // Exponential backoff
250 }
251 }
252 }
253
254 Ok(None) //should error ?
255 }*/
256
257
258 fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
259 let mut migrations = Migrations {
260 up: Vec::new(),
261 down: Vec::new(),
262 };
263
264 let migrations_dir =
265 migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
266 let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
267
268 for file in migration_dir_files {
269 let file = file.expect("Failed to read migration file");
270
271 let path = file.path();
272 let filename = path.file_stem().unwrap().to_str().unwrap();
273
274 let filename_without_extension: &str = filename.split('.').next().unwrap();
275
276 // Read file contents
277 let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
278
279 //let contents = str::from_utf8(file.contents()).unwrap();
280
281 info!("File name: {}", filename);
282
283 if filename.contains(".down") {
284 info!("File contents: {}", contents);
285 migrations
286 .down
287 .push((filename_without_extension.into(), contents.clone()));
288 }
289
290 if filename.contains(".up") {
291 info!("File contents: {}", contents);
292 migrations
293 .up
294 .push((filename_without_extension.into(), contents.clone()));
295 }
296 }
297
298
299 // Sort `up` migrations in ascending alphabetical order
300 migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
301
302 // Sort `down` migrations in descending alphabetical order
303 migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
304
305
306 return migrations;
307 }
308
309 pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
310 let client = &mut self.connect().await?;
311
312 let migrations_dir_path = self.migrations_dir_path.clone();
313 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
314
315 for up_migration in migrations.up.drain(..) {
316 println!("migrating {} {} ", up_migration.0, up_migration.1);
317 let migration = Migration::new("migrations".to_string());
318
319 // execute non existing migrations
320 migration.up(client, &[up_migration.to_str()]).await?;
321 }
322
323 // ...
324 Ok(())
325 }
326
327 //basically need to do the DOWN migrations and also delete some records from the migrations table
328 //need to read from the migrations table with MigrationsModel::find
329 pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
330 Ok(())
331 }
332
333 pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
334 let migrations_dir_path = self.migrations_dir_path.clone();
335
336 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
337
338 let client = &mut self.connect().await?;
339
340 for down_migration in migrations.down.drain(..)
341 {
342 println!("migrating {}", down_migration.0);
343 let migration = Migration::new("migrations".to_string());
344 // execute non existing migrations
345 migration.down(client, &[down_migration.to_str()]).await?;
346 }
347
348 Ok(())
349 }
350
351 pub async fn query(
352 &self,
353 query: &str,
354 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
355 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
356
357 /*let client = &self.connect().await?;
358
359 let rows = client.query(query, params).await?;
360 Ok(rows)*/
361
362 // Get a client from the pool
363 let client = self.pool.get().await?;
364
365 // Execute the query and let the client be dropped automatically afterward
366 let rows = client.query(query, params).await?;
367
368 Ok(rows)
369
370 }
371
372
373 pub async fn query_one(
374 & self,
375 query: &str,
376 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
377 ) -> Result<tokio_postgres::Row, PostgresModelError> {
378 /*
379 let client = &self.connect().await?;
380
381 let rows = client.query_one(query, params).await?;
382 Ok(rows)
383
384 */
385
386
387 let client = self.pool.get().await ?;
388
389 // Execute the query and let the client be dropped automatically afterward
390 let row = client.query_one(query, params).await?;
391
392 Ok(row)
393 }
394
395 //use for insert, update, etc
396 pub async fn execute(
397 & self,
398 query: &str,
399 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
400 ) -> Result<u64, PostgresModelError> {
401 /*
402 let client = &self.connect().await?;
403
404
405
406 let rows = client.execute(query, params).await?;
407 Ok(rows)*/
408
409
410 // Get a client from the pool
411 let client = self.pool.get().await?;
412
413 // Execute the query and let the client be dropped automatically afterward
414 let count = client.execute(query, params).await?;
415
416 Ok(count)
417
418
419 }
420
421
422
423 pub async fn check_connection(&self) -> Result<bool, PostgresModelError> {
424 // Get a client from the pool
425 let client = self.pool.get().await?;
426
427 // Execute a simple query to check the connection
428 match client.execute("SELECT 1", &[]).await {
429 Ok(_) => Ok(true),
430 Err(e) => Err(PostgresModelError::from(e))
431 }
432}
433
434 pub async fn recreate_pool(&mut self) -> Result<(), PostgresModelError> {
435 // Parse the connection config from the URL
436 let config: tokio_postgres::Config = self.connection_url.parse()
437 .map_err(|_e| PostgresModelError::ConnectionFailed)?;
438
439 // Create a manager using the config
440 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
441
442 // Create a new pool
443 let new_pool = deadpool_postgres::Pool::builder(manager)
444 .max_size(16)
445 .build()
446 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
447
448 // Replace the old pool
449 self.pool = new_pool;
450
451 Ok(())
452 }
453
454
455 pub async fn query_with_timeout(
456 &self,
457 query: &str,
458 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
459 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
460 match timeout(self.timeout_duration, self.query(query, params)).await {
461 Ok(result) => result,
462 Err(_) => Err(PostgresModelError::ConnectionFailed) // Timeout occurred
463 }
464 }
465
466 pub async fn query_with_retry(
467 &self,
468 query: &str,
469 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
470 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
471 let mut attempts = 0;
472
473 while attempts < self.max_reconnect_attempts {
474 match self.query(query, params).await {
475 Ok(result) => return Ok(result),
476 Err(e) => {
477 // If it's a connection error, retry
478 if let PostgresModelError::PoolError(_) = e {
479 attempts += 1;
480 if attempts >= self.max_reconnect_attempts {
481 return Err(e);
482 }
483 // Wait before retrying
484 sleep(Duration::from_secs(2_u64.pow(attempts))).await;
485 } else {
486 // For other errors, return immediately
487 return Err(e);
488 }
489 }
490 }
491 }
492
493 Err(PostgresModelError::ConnectionFailed)
494 }
495
496
497 /*async fn atomic_transaction(
498 & self,
499 steps: Vec<PostgresInput<'_>>,
500 ) -> Result<(), PostgresError> {
501 let client = &mut self.connect().await?;
502
503 // Start a transaction
504 let transaction = client.transaction().await?;
505
506 //for each step in steps
507 for step in steps {
508 //execute the step
509 let result = transaction.execute(&step.query, step.params).await;
510 //check if the result is ok
511 if result.is_err() {
512 //if not rollback
513 transaction.rollback().await?;
514 //return error
515 return Err(PostgresError::from(result.err().unwrap()));
516 }
517 }
518
519 //if all steps are ok commit
520 transaction.commit().await?;
521 //return ok
522 Ok(())
523 }*/
524}
525
526pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
527 row: &'a tokio_postgres::Row,
528 column: &str,
529) -> Option<T> {
530 match row.try_get::<&str, T>(column) {
531 Ok(value) => Some(value),
532 Err(_) => None,
533 }
534}