degen_sql/db/postgres/postgres_db.rs
1use tokio_postgres::Client;
2use crate::db::postgres::models::model::PostgresModelError;
3use tokio::time::timeout;
4use tokio::time::Duration;
5use tokio::time::sleep;
6use log::info;
7use tokio;
8use tokio_postgres::{Error as PostgresError, NoTls};
9
10use std::error::Error;
11use tokio_postgres_migration::Migration;
12
13use dotenvy::dotenv;
14use std::env;
15
16use std::fs;
17use std::str;
18
19type MigrationDefinition = (String, String);
20
21#[derive(Clone)]
22struct Migrations {
23 up: Vec<MigrationDefinition>,
24 down: Vec<MigrationDefinition>,
25}
26
27pub trait MigrationAsStr {
28 fn to_str(&self) -> (&str, &str);
29}
30
31impl MigrationAsStr for MigrationDefinition {
32 fn to_str(&self) -> (&str, &str) {
33 return (self.0.as_str(), self.1.as_str());
34 }
35}
36
37pub struct Database {
38
39 pool: deadpool_postgres::Pool, // Or other pool implementation
40
41 // pub client: Option< tokio_postgres::Client > ,
42 pub migrations_dir_path: Option<String>,
43 pub connection_url: String ,
44
45 pub max_reconnect_attempts: u32,
46 pub timeout_duration: Duration,
47}
48
49
50
51
52
53/*
54#[derive(Debug)]
55pub enum DatabaseError {
56 ConnectionFailed ,
57 PoolCreationFailed(String),
58 QueryFailed(tokio_postgres::Error),
59
60 RowParseError(String) ,
61
62 PostgresError(tokio_postgres::Error),
63 PoolError(deadpool::managed::PoolError<tokio_postgres::Error>),
64}
65
66impl From<tokio_postgres::Error> for DatabaseError {
67 fn from(error: tokio_postgres::Error) -> Self {
68 DatabaseError::PostgresError(error)
69 }
70}
71
72impl From<deadpool::managed::PoolError<tokio_postgres::Error>> for DatabaseError {
73 fn from(error: deadpool::managed::PoolError<tokio_postgres::Error>) -> Self {
74 DatabaseError::PoolError(error)
75 }
76}
77
78// First implement Display for your error type
79impl std::fmt::Display for DatabaseError {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self {
82 DatabaseError::ConnectionFailed => write!(f, "Database connection failed"),
83 DatabaseError::PoolCreationFailed(msg) => write!(f, "Connection pool creation failed: {}", msg),
84 DatabaseError::QueryFailed(err) => write!(f, "Database query failed: {}", err),
85 DatabaseError::PoolError(err) => write!(f, "Pool error: {}", err),
86 DatabaseError::PostgresError( err ) => write!(f, "Postgres error: {}", err),
87 DatabaseError::RowParseError( msg ) => write!(f, "row parse error: {}", msg),
88 // Handle other variants
89 }
90 }
91}
92
93// Then implement the Error trait
94impl Error for DatabaseError {
95 fn source(&self) -> Option<&(dyn Error + 'static)> {
96 match self {
97 DatabaseError::QueryFailed(err) => Some(err),
98 // For PoolError, you'd need to check if it implements Error
99 // Otherwise return None for this variant
100 _ => None,
101 }
102 }
103}
104*/
105
106
107
108#[derive(Debug,Clone)]
109pub struct DatabaseCredentials {
110 pub db_name: String,
111 pub db_host: String,
112 pub db_user: String,
113 pub db_password: String,
114}
115
116impl Default for DatabaseCredentials {
117 fn default() -> Self {
118 Self {
119 db_name: "postgres".into(),
120 db_host: "localhost".into(),
121 db_user: "postgres".into(),
122 db_password: "postgres".into(),
123 }
124 }
125}
126
127impl DatabaseCredentials {
128 pub fn from_env() -> Self {
129 //dotenv().expect(".env file not found"); //need to run this beforehand !!
130
131 Self {
132 db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
133 db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
134 db_user: env::var("DB_USER").unwrap_or("postgres".into()),
135 db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
136 }
137 }
138
139 pub fn build_connection_url(&self) -> String {
140 return format!(
141 "postgres://{}:{}@{}/{}",
142 self.db_user, self.db_password, self.db_host, self.db_name
143 );
144 }
145}
146
147enum PostgresInputType {
148 Query,
149 QueryOne,
150 Execute,
151}
152
153struct PostgresInput<'a> {
154 input_type: PostgresInputType,
155 query: String,
156 params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
157}
158
159impl Database {
160 pub fn new(
161 conn_url: String,
162 migrations_dir_path: Option<String>,
163 ) -> Result<Database, PostgresModelError> {
164 // Parse the connection config from the URL
165 let config: tokio_postgres::Config = conn_url.parse()
166 .map_err(|_e| PostgresModelError::ConnectionFailed )?;
167
168 // Create a manager using the config
169 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
170
171 // Create the pool with builder pattern
172 let pool = deadpool_postgres::Pool::builder(manager)
173 .max_size(16)
174 .build()
175 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
176
177 Ok(Database {
178 pool,
179 migrations_dir_path,
180 connection_url: conn_url,
181 max_reconnect_attempts: 3,
182 timeout_duration: Duration::from_secs(5)
183 })
184 }
185}
186
187
188impl Database {
189
190
191
192 pub async fn connect(
193 // credentials: DatabaseCredentials,
194 & self
195 ) -> Result<Client, PostgresError> {
196 // Define the connection URL.
197 // let conn_url = credentials.build_connection_url();
198
199 info!("Connecting to db: {}", self.connection_url);
200
201 let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
202
203 // The connection object performs the actual communication with the database,
204 // so spawn it off to run on its own.
205 tokio::spawn(async move {
206 //this is a blocking call i think !!!
207 if let Err(e) = connection.await {
208 eprintln!("postgres connection error: {}", e);
209 }
210 });
211
212 // self.client = Some(client);
213
214 Ok( client )
215 }
216
217
218/*
219 pub async fn reconnect(&mut self ) -> Result<Option< Client >, PostgresError> {
220 let max_retries = 5;
221 let mut attempt = 0;
222 let conn_url = self.connection_url.clone() ;
223
224 while attempt < max_retries {
225 info!("Attempt {}: Reconnecting to database...", attempt + 1);
226
227 match tokio_postgres::connect(&conn_url, NoTls).await {
228 Ok((client, connection)) => {
229 // Spawn a task to keep the connection alive
230 tokio::spawn(async move {
231 if let Err(e) = connection.await {
232 eprintln!("postgres connection error: {}", e);
233 }
234 });
235
236 // self.client = Some(client); // Replace old client with the new one
237 info!("Reconnection successful.");
238 return Ok( Some(client) );
239 }
240 Err(e) => {
241
242 attempt += 1;
243
244 if attempt == max_retries {
245 return Err( e.into() )
246 }
247 eprintln!("Reconnection failed: {}. Retrying...", e);
248 sleep(Duration::from_secs(2_u64.pow(attempt))).await; // Exponential backoff
249 }
250 }
251 }
252
253 Ok(None) //should error ?
254 }*/
255
256
257 fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
258 let mut migrations = Migrations {
259 up: Vec::new(),
260 down: Vec::new(),
261 };
262
263 let migrations_dir =
264 migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
265 let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
266
267 for file in migration_dir_files {
268 let file = file.expect("Failed to read migration file");
269
270 let path = file.path();
271 let filename = path.file_stem().unwrap().to_str().unwrap();
272
273 let filename_without_extension: &str = filename.split('.').next().unwrap();
274
275 // Read file contents
276 let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
277
278 //let contents = str::from_utf8(file.contents()).unwrap();
279
280 info!("File name: {}", filename);
281
282 if filename.contains(".down") {
283 info!("File contents: {}", contents);
284 migrations
285 .down
286 .push((filename_without_extension.into(), contents.clone()));
287 }
288
289 if filename.contains(".up") {
290 info!("File contents: {}", contents);
291 migrations
292 .up
293 .push((filename_without_extension.into(), contents.clone()));
294 }
295 }
296
297
298 // Sort `up` migrations in ascending alphabetical order
299 migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
300
301 // Sort `down` migrations in descending alphabetical order
302 migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
303
304
305 return migrations;
306 }
307
308 pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
309 let client = &mut self.connect().await?;
310
311 let migrations_dir_path = self.migrations_dir_path.clone();
312 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
313
314 for up_migration in migrations.up.drain(..) {
315 println!("migrating {} {} ", up_migration.0, up_migration.1);
316 let migration = Migration::new("migrations".to_string());
317
318 // execute non existing migrations
319 migration.up(client, &[up_migration.to_str()]).await?;
320 }
321
322 // ...
323 Ok(())
324 }
325
326 //basically need to do the DOWN migrations and also delete some records from the migrations table
327 //need to read from the migrations table with MigrationsModel::find
328 pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
329 Ok(())
330 }
331
332 pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
333 let migrations_dir_path = self.migrations_dir_path.clone();
334
335 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
336
337 let client = &mut self.connect().await?;
338
339 for down_migration in migrations.down.drain(..)
340 {
341 println!("migrating {}", down_migration.0);
342 let migration = Migration::new("migrations".to_string());
343 // execute non existing migrations
344 migration.down(client, &[down_migration.to_str()]).await?;
345 }
346
347 Ok(())
348 }
349
350 pub async fn query(
351 &self,
352 query: &str,
353 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
354 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
355
356 /*let client = &self.connect().await?;
357
358 let rows = client.query(query, params).await?;
359 Ok(rows)*/
360
361 // Get a client from the pool
362 let client = self.pool.get().await?;
363
364 // Execute the query and let the client be dropped automatically afterward
365 let rows = client.query(query, params).await?;
366
367 Ok(rows)
368
369 }
370
371
372 pub async fn query_one(
373 & self,
374 query: &str,
375 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
376 ) -> Result<tokio_postgres::Row, PostgresModelError> {
377 /*
378 let client = &self.connect().await?;
379
380 let rows = client.query_one(query, params).await?;
381 Ok(rows)
382
383 */
384
385
386 let client = self.pool.get().await ?;
387
388 // Execute the query and let the client be dropped automatically afterward
389 let row = client.query_one(query, params).await?;
390
391 Ok(row)
392 }
393
394 //use for insert, update, etc
395 pub async fn execute(
396 & self,
397 query: &str,
398 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
399 ) -> Result<u64, PostgresModelError> {
400 /*
401 let client = &self.connect().await?;
402
403
404
405 let rows = client.execute(query, params).await?;
406 Ok(rows)*/
407
408
409 // Get a client from the pool
410 let client = self.pool.get().await?;
411
412 // Execute the query and let the client be dropped automatically afterward
413 let count = client.execute(query, params).await?;
414
415 Ok(count)
416
417
418 }
419
420
421
422 pub async fn check_connection(&self) -> Result<bool, PostgresModelError> {
423 // Get a client from the pool
424 let client = self.pool.get().await?;
425
426 // Execute a simple query to check the connection
427 match client.execute("SELECT 1", &[]).await {
428 Ok(_) => Ok(true),
429 Err(e) => Err(PostgresModelError::from(e))
430 }
431}
432
433 pub async fn recreate_pool(&mut self) -> Result<(), PostgresModelError> {
434 // Parse the connection config from the URL
435 let config: tokio_postgres::Config = self.connection_url.parse()
436 .map_err(|_e| PostgresModelError::ConnectionFailed)?;
437
438 // Create a manager using the config
439 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
440
441 // Create a new pool
442 let new_pool = deadpool_postgres::Pool::builder(manager)
443 .max_size(16)
444 .build()
445 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
446
447 // Replace the old pool
448 self.pool = new_pool;
449
450 Ok(())
451 }
452
453
454 pub async fn query_with_timeout(
455 &self,
456 query: &str,
457 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
458 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
459 match timeout(self.timeout_duration, self.query(query, params)).await {
460 Ok(result) => result,
461 Err(_) => Err(PostgresModelError::ConnectionFailed) // Timeout occurred
462 }
463 }
464
465 pub async fn query_with_retry(
466 &self,
467 query: &str,
468 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
469 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
470 let mut attempts = 0;
471
472 while attempts < self.max_reconnect_attempts {
473 match self.query(query, params).await {
474 Ok(result) => return Ok(result),
475 Err(e) => {
476 // If it's a connection error, retry
477 if let PostgresModelError::PoolError(_) = e {
478 attempts += 1;
479 if attempts >= self.max_reconnect_attempts {
480 return Err(e);
481 }
482 // Wait before retrying
483 sleep(Duration::from_secs(2_u64.pow(attempts))).await;
484 } else {
485 // For other errors, return immediately
486 return Err(e);
487 }
488 }
489 }
490 }
491
492 Err(PostgresModelError::ConnectionFailed)
493 }
494
495
496 /*async fn atomic_transaction(
497 & self,
498 steps: Vec<PostgresInput<'_>>,
499 ) -> Result<(), PostgresError> {
500 let client = &mut self.connect().await?;
501
502 // Start a transaction
503 let transaction = client.transaction().await?;
504
505 //for each step in steps
506 for step in steps {
507 //execute the step
508 let result = transaction.execute(&step.query, step.params).await;
509 //check if the result is ok
510 if result.is_err() {
511 //if not rollback
512 transaction.rollback().await?;
513 //return error
514 return Err(PostgresError::from(result.err().unwrap()));
515 }
516 }
517
518 //if all steps are ok commit
519 transaction.commit().await?;
520 //return ok
521 Ok(())
522 }*/
523}
524
525pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
526 row: &'a tokio_postgres::Row,
527 column: &str,
528) -> Option<T> {
529 match row.try_get::<&str, T>(column) {
530 Ok(value) => Some(value),
531 Err(_) => None,
532 }
533}