1use tokio_postgres::Client;
2use crate::db::postgres::models::model::PostgresModelError;
3use tokio::time::timeout;
4use tokio::time::Duration;
5use tokio::time::sleep;
6use log::info;
7use tokio;
8use tokio_postgres::{Error as PostgresError, NoTls};
9
10use std::error::Error;
11use tokio_postgres_migration::Migration;
12
13use dotenvy::dotenv;
14use std::env;
15
16use std::fs;
17use std::str;
18
/// A single migration script as a `(name, sql)` pair: `.0` is the migration
/// name and `.1` is the script text, as populated by
/// `Database::read_migration_files`.
type MigrationDefinition = (String, String);
20
21#[derive(Clone)]
22struct Migrations {
23 up: Vec<MigrationDefinition>,
24 down: Vec<MigrationDefinition>,
25}
26
/// Borrowed view of a migration as a pair of string slices.
pub trait MigrationAsStr {
    /// Returns the two components of the migration as `&str` slices that
    /// borrow from `self`.
    fn to_str(&self) -> (&str, &str);
}
30
31impl MigrationAsStr for MigrationDefinition {
32 fn to_str(&self) -> (&str, &str) {
33 return (self.0.as_str(), self.1.as_str());
34 }
35}
36
37pub struct Database {
38
39 pool: deadpool_postgres::Pool, pub migrations_dir_path: Option<String>,
43 pub connection_url: String ,
44
45 pub max_reconnect_attempts: u32,
46 pub timeout_duration: Duration,
47}
48
49
50
51
52
53
54#[derive(Debug)]
55pub enum DatabaseError {
56 ConnectionFailed ,
57 PoolCreationFailed(String),
58 QueryFailed(tokio_postgres::Error),
59
60 RowParseError(String) ,
61
62 PostgresError(tokio_postgres::Error),
63 PoolError(deadpool::managed::PoolError<tokio_postgres::Error>),
64}
65
66impl From<tokio_postgres::Error> for DatabaseError {
67 fn from(error: tokio_postgres::Error) -> Self {
68 DatabaseError::PostgresError(error)
69 }
70}
71
72impl From<deadpool::managed::PoolError<tokio_postgres::Error>> for DatabaseError {
73 fn from(error: deadpool::managed::PoolError<tokio_postgres::Error>) -> Self {
74 DatabaseError::PoolError(error)
75 }
76}
77
78impl std::fmt::Display for DatabaseError {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self {
82 DatabaseError::ConnectionFailed => write!(f, "Database connection failed"),
83 DatabaseError::PoolCreationFailed(msg) => write!(f, "Connection pool creation failed: {}", msg),
84 DatabaseError::QueryFailed(err) => write!(f, "Database query failed: {}", err),
85 DatabaseError::PoolError(err) => write!(f, "Pool error: {}", err),
86 DatabaseError::PostgresError( err ) => write!(f, "Postgres error: {}", err),
87 DatabaseError::RowParseError( msg ) => write!(f, "row parse error: {}", msg),
88 }
90 }
91}
92
93impl Error for DatabaseError {
95 fn source(&self) -> Option<&(dyn Error + 'static)> {
96 match self {
97 DatabaseError::QueryFailed(err) => Some(err),
98 _ => None,
101 }
102 }
103}
104
105
106
107
/// Connection settings for a Postgres database.
///
/// NOTE: the derived `Debug` output includes `db_password`; avoid logging
/// values of this type with `{:?}`.
#[derive(Debug, Clone)]
pub struct DatabaseCredentials {
    /// Database name (the path component of the connection URL).
    pub db_name: String,
    /// Host component of the connection URL. Inserted verbatim, so a
    /// `host:port` value is passed through unchanged.
    pub db_host: String,
    /// User name, in plain (not percent-encoded) form.
    pub db_user: String,
    /// Password, in plain (not percent-encoded) form.
    pub db_password: String,
}

impl Default for DatabaseCredentials {
    /// Credentials with every field set to its built-in default
    /// (`postgres` / `localhost`).
    fn default() -> Self {
        Self {
            db_name: "postgres".into(),
            db_host: "localhost".into(),
            db_user: "postgres".into(),
            db_password: "postgres".into(),
        }
    }
}

impl DatabaseCredentials {
    /// Reads `DB_NAME`, `DB_HOST`, `DB_USER` and `DB_PASSWORD` from the
    /// environment.
    ///
    /// A variable that is unset (or not valid Unicode) falls back to the
    /// corresponding [`Default`] value, so the fallbacks are defined in one
    /// place only.
    pub fn from_env() -> Self {
        let defaults = Self::default();
        Self {
            db_name: env::var("DB_NAME").unwrap_or(defaults.db_name),
            db_host: env::var("DB_HOST").unwrap_or(defaults.db_host),
            db_user: env::var("DB_USER").unwrap_or(defaults.db_user),
            db_password: env::var("DB_PASSWORD").unwrap_or(defaults.db_password),
        }
    }

    /// Builds a `postgres://user:password@host/dbname` connection URL.
    ///
    /// User, password and database name are percent-encoded so that reserved
    /// characters (`@`, `:`, `/`, `%`, spaces, non-ASCII, ...) cannot be
    /// mistaken for URL delimiters. Values made only of letters, digits and
    /// `-._~` are emitted unchanged. The host is inserted verbatim so that
    /// `host:port` keeps working.
    ///
    /// The fields are therefore expected in plain form; a value that is
    /// already percent-encoded would be encoded a second time.
    pub fn build_connection_url(&self) -> String {
        format!(
            "postgres://{}:{}@{}/{}",
            Self::encode_component(&self.db_user),
            Self::encode_component(&self.db_password),
            self.db_host,
            Self::encode_component(&self.db_name)
        )
    }

    /// Percent-encodes every byte of `raw` that is not an RFC 3986
    /// "unreserved" character.
    fn encode_component(raw: &str) -> String {
        let mut encoded = String::with_capacity(raw.len());
        for byte in raw.bytes() {
            match byte {
                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                    encoded.push(char::from(byte))
                }
                _ => encoded.push_str(&format!("%{:02X}", byte)),
            }
        }
        encoded
    }
}
146
/// Discriminates the kind of a [`PostgresInput`].
///
/// NOTE(review): not referenced by any code in the visible part of this
/// file; the variant names suggest they mirror `Database::query`,
/// `query_one` and `execute` — confirm before relying on that.
enum PostgresInputType {
    Query,
    QueryOne,
    Execute,
}
152
153struct PostgresInput<'a> {
154 input_type: PostgresInputType,
155 query: String,
156 params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
157}
158
159impl Database {
160 pub fn new(
161 conn_url: String,
162 migrations_dir_path: Option<String>,
163 ) -> Result<Database, DatabaseError> {
164 let config: tokio_postgres::Config = conn_url.parse()
166 .map_err(|_e| DatabaseError::ConnectionFailed )?;
167
168 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
170
171 let pool = deadpool_postgres::Pool::builder(manager)
173 .max_size(16)
174 .build()
175 .map_err(|e| DatabaseError::PoolCreationFailed(e.to_string()))?;
176
177 Ok(Database {
178 pool,
179 migrations_dir_path,
180 connection_url: conn_url,
181 max_reconnect_attempts: 3,
182 timeout_duration: Duration::from_secs(5)
183 })
184 }
185}
186
187
188impl Database {
189
190
191
192 pub async fn connect(
193 & self
195 ) -> Result<Client, PostgresError> {
196 info!("Connecting to db: {}", self.connection_url);
200
201 let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
202
203 tokio::spawn(async move {
206 if let Err(e) = connection.await {
208 eprintln!("postgres connection error: {}", e);
209 }
210 });
211
212 Ok( client )
215 }
216
217
218fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
258 let mut migrations = Migrations {
259 up: Vec::new(),
260 down: Vec::new(),
261 };
262
263 let migrations_dir =
264 migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
265 let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
266
267 for file in migration_dir_files {
268 let file = file.expect("Failed to read migration file");
269
270 let path = file.path();
271 let filename = path.file_stem().unwrap().to_str().unwrap();
272
273 let filename_without_extension: &str = filename.split('.').next().unwrap();
274
275 let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
277
278 info!("File name: {}", filename);
281
282 if filename.contains(".down") {
283 info!("File contents: {}", contents);
284 migrations
285 .down
286 .push((filename_without_extension.into(), contents.clone()));
287 }
288
289 if filename.contains(".up") {
290 info!("File contents: {}", contents);
291 migrations
292 .up
293 .push((filename_without_extension.into(), contents.clone()));
294 }
295 }
296
297
298 migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
300
301 migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
303
304
305 return migrations;
306 }
307
308 pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
309 let client = &mut self.connect().await?;
310
311 let migrations_dir_path = self.migrations_dir_path.clone();
312 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
313
314 for up_migration in migrations.up.drain(..) {
315 println!("migrating {} {} ", up_migration.0, up_migration.1);
316 let migration = Migration::new("migrations".to_string());
317
318 migration.up(client, &[up_migration.to_str()]).await?;
320 }
321
322 Ok(())
324 }
325
326 pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
329 Ok(())
330 }
331
332 pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
333 let migrations_dir_path = self.migrations_dir_path.clone();
334
335 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
336
337 let client = &mut self.connect().await?;
338
339 for down_migration in migrations.down.drain(..)
340 {
341 println!("migrating {}", down_migration.0);
342 let migration = Migration::new("migrations".to_string());
343 migration.down(client, &[down_migration.to_str()]).await?;
345 }
346
347 Ok(())
348 }
349
350 pub async fn query(
351 &self,
352 query: &str,
353 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
354 ) -> Result<Vec<tokio_postgres::Row>, DatabaseError> {
355
356 let client = self.pool.get().await?;
363
364 let rows = client.query(query, params).await?;
366
367 Ok(rows)
368
369 }
370
371
372 pub async fn query_one(
373 & self,
374 query: &str,
375 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
376 ) -> Result<tokio_postgres::Row, DatabaseError> {
377 let client = self.pool.get().await ?;
387
388 let row = client.query_one(query, params).await?;
390
391 Ok(row)
392 }
393
394 pub async fn execute(
396 & self,
397 query: &str,
398 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
399 ) -> Result<u64, DatabaseError> {
400 let client = self.pool.get().await?;
411
412 let count = client.execute(query, params).await?;
414
415 Ok(count)
416
417
418 }
419
420
421
422 pub async fn check_connection(&self) -> Result<bool, DatabaseError> {
423 let client = self.pool.get().await?;
425
426 match client.execute("SELECT 1", &[]).await {
428 Ok(_) => Ok(true),
429 Err(e) => Err(DatabaseError::from(e))
430 }
431}
432
433 pub async fn recreate_pool(&mut self) -> Result<(), DatabaseError> {
434 let config: tokio_postgres::Config = self.connection_url.parse()
436 .map_err(|_e| DatabaseError::ConnectionFailed)?;
437
438 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
440
441 let new_pool = deadpool_postgres::Pool::builder(manager)
443 .max_size(16)
444 .build()
445 .map_err(|e| DatabaseError::PoolCreationFailed(e.to_string()))?;
446
447 self.pool = new_pool;
449
450 Ok(())
451 }
452
453
454 pub async fn query_with_timeout(
455 &self,
456 query: &str,
457 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
458 ) -> Result<Vec<tokio_postgres::Row>, DatabaseError> {
459 match timeout(self.timeout_duration, self.query(query, params)).await {
460 Ok(result) => result,
461 Err(_) => Err(DatabaseError::ConnectionFailed) }
463 }
464
465 pub async fn query_with_retry(
466 &self,
467 query: &str,
468 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
469 ) -> Result<Vec<tokio_postgres::Row>, DatabaseError> {
470 let mut attempts = 0;
471
472 while attempts < self.max_reconnect_attempts {
473 match self.query(query, params).await {
474 Ok(result) => return Ok(result),
475 Err(e) => {
476 if let DatabaseError::PoolError(_) = e {
478 attempts += 1;
479 if attempts >= self.max_reconnect_attempts {
480 return Err(e);
481 }
482 sleep(Duration::from_secs(2_u64.pow(attempts))).await;
484 } else {
485 return Err(e);
487 }
488 }
489 }
490 }
491
492 Err(DatabaseError::ConnectionFailed)
493 }
494
495
496 }
524
525pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
526 row: &'a tokio_postgres::Row,
527 column: &str,
528) -> Option<T> {
529 match row.try_get::<&str, T>(column) {
530 Ok(value) => Some(value),
531 Err(_) => None,
532 }
533}