1use tokio_postgres::Client;
2use crate::db::postgres::models::model::PostgresModelError;
3use tokio::time::timeout;
4use tokio::time::Duration;
5use tokio::time::sleep;
6use log::info;
7use tokio;
8use tokio_postgres::{Error as PostgresError, NoTls};
9
10use std::error::Error;
11use tokio_postgres_migration::Migration;
12
13use std::env;
15
16use std::fs;
17use std::str;
18
19type MigrationDefinition = (String, String);
20
21#[derive(Clone)]
22struct Migrations {
23 up: Vec<MigrationDefinition>,
24 down: Vec<MigrationDefinition>,
25}
26
27pub trait MigrationAsStr {
28 fn to_str(&self) -> (&str, &str);
29}
30
31impl MigrationAsStr for MigrationDefinition {
32 fn to_str(&self) -> (&str, &str) {
33 return (self.0.as_str(), self.1.as_str());
34 }
35}
36
37pub struct Database {
38
39 pool: deadpool_postgres::Pool, pub migrations_dir_path: Option<String>,
43 pub connection_url: String ,
44
45 pub max_reconnect_attempts: u32,
46 pub timeout_duration: Duration,
47}
48
49
50
51
52
53#[derive(Debug,Clone)]
109pub struct DatabaseCredentials {
110 pub db_name: String,
111 pub db_host: String,
112 pub db_user: String,
113 pub db_password: String,
114}
115
116impl Default for DatabaseCredentials {
117 fn default() -> Self {
118 Self {
119 db_name: "postgres".into(),
120 db_host: "localhost".into(),
121 db_user: "postgres".into(),
122 db_password: "postgres".into(),
123 }
124 }
125}
126
127impl DatabaseCredentials {
128 pub fn from_env() -> Self {
129 Self {
132 db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
133 db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
134 db_user: env::var("DB_USER").unwrap_or("postgres".into()),
135 db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
136 }
137 }
138
139 pub fn build_connection_url(&self) -> String {
140 return format!(
141 "postgres://{}:{}@{}/{}",
142 self.db_user, self.db_password, self.db_host, self.db_name
143 );
144 }
145}
146
147enum PostgresInputType {
148 Query,
149 QueryOne,
150 Execute,
151}
152
153struct PostgresInput<'a> {
154 input_type: PostgresInputType,
155 query: String,
156 params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync + Send)],
157}
158
159impl Database {
160 pub fn new(
161 conn_url: String,
162 migrations_dir_path: Option<String>,
163 ) -> Result<Database, PostgresModelError> {
164 let config: tokio_postgres::Config = conn_url.parse()
166 .map_err(|_e| PostgresModelError::ConnectionFailed )?;
167
168 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
170
171 let pool = deadpool_postgres::Pool::builder(manager)
173 .max_size(16)
174 .build()
175 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
176
177 Ok(Database {
178 pool,
179 migrations_dir_path,
180 connection_url: conn_url,
181 max_reconnect_attempts: 3,
182 timeout_duration: Duration::from_secs(5)
183 })
184 }
185}
186
187
188impl Database {
189
190
191
192 pub async fn connect(
193 & self
195 ) -> Result<Client, PostgresError> {
196 info!("Connecting to db: {}", self.connection_url);
200
201 let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
202
203 tokio::spawn(async move {
206 if let Err(e) = connection.await {
208 eprintln!("postgres connection error: {}", e);
209 }
210 });
211
212 Ok( client )
215 }
216
217
218fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
258 let mut migrations = Migrations {
259 up: Vec::new(),
260 down: Vec::new(),
261 };
262
263 let migrations_dir =
264 migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
265 let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
266
267 for file in migration_dir_files {
268 let file = file.expect("Failed to read migration file");
269
270 let path = file.path();
271 let filename = path.file_stem().unwrap().to_str().unwrap();
272
273 let filename_without_extension: &str = filename.split('.').next().unwrap();
274
275 let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
277
278 info!("File name: {}", filename);
281
282 if filename.contains(".down") {
283 info!("File contents: {}", contents);
284 migrations
285 .down
286 .push((filename_without_extension.into(), contents.clone()));
287 }
288
289 if filename.contains(".up") {
290 info!("File contents: {}", contents);
291 migrations
292 .up
293 .push((filename_without_extension.into(), contents.clone()));
294 }
295 }
296
297
298 migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
300
301 migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
303
304
305 return migrations;
306 }
307
308 pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
309 let client = &mut self.connect().await?;
310
311 let migrations_dir_path = self.migrations_dir_path.clone();
312 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
313
314 for up_migration in migrations.up.drain(..) {
315 println!("migrating {} {} ", up_migration.0, up_migration.1);
316 let migration = Migration::new("migrations".to_string());
317
318 migration.up(client, &[up_migration.to_str()]).await?;
320 }
321
322 Ok(())
324 }
325
326 pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
329 Ok(())
330 }
331
332 pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
333 let migrations_dir_path = self.migrations_dir_path.clone();
334
335 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
336
337 let client = &mut self.connect().await?;
338
339 for down_migration in migrations.down.drain(..)
340 {
341 println!("migrating {}", down_migration.0);
342 let migration = Migration::new("migrations".to_string());
343 migration.down(client, &[down_migration.to_str()]).await?;
345 }
346
347 Ok(())
348 }
349
350 pub async fn query(
351 &self,
352 query: &str,
353 params: &[&(dyn tokio_postgres::types::ToSql + Sync + Send )],
354 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
355
356 let client = self.pool.get().await?;
363
364 let cast_params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
366 .iter()
367 .map(|&p| p as &(dyn tokio_postgres::types::ToSql + Sync))
368 .collect();
369
370 let rows = client.query(query, &cast_params).await?;
372
373 Ok(rows)
374
375 }
376
377
378 pub async fn query_one(
379 & self,
380 query: &str,
381 params: &[&(dyn tokio_postgres::types::ToSql + Sync + Send )],
382 ) -> Result<tokio_postgres::Row, PostgresModelError> {
383 let client = self.pool.get().await ?;
393
394 let cast_params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
396 .iter()
397 .map(|&p| p as &(dyn tokio_postgres::types::ToSql + Sync))
398 .collect();
399
400 let row = client.query_one(query, &cast_params).await?;
402
403 Ok(row)
404 }
405
406 pub async fn execute(
408 & self,
409 query: &str,
410 params: &[&(dyn tokio_postgres::types::ToSql + Sync + Send )],
411 ) -> Result<u64, PostgresModelError> {
412 let client = self.pool.get().await?;
423
424 let cast_params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
426 .iter()
427 .map(|&p| p as &(dyn tokio_postgres::types::ToSql + Sync))
428 .collect();
429
430 let count = client.execute(query, &cast_params).await?;
432
433 Ok(count)
434
435
436 }
437
438
439
440 pub async fn check_connection(&self) -> Result<bool, PostgresModelError> {
441 let client = self.pool.get().await?;
443
444 match client.execute("SELECT 1", &[]).await {
446 Ok(_) => Ok(true),
447 Err(e) => Err(PostgresModelError::from(e))
448 }
449}
450
451 pub async fn recreate_pool(&mut self) -> Result<(), PostgresModelError> {
452 let config: tokio_postgres::Config = self.connection_url.parse()
454 .map_err(|_e| PostgresModelError::ConnectionFailed)?;
455
456 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
458
459 let new_pool = deadpool_postgres::Pool::builder(manager)
461 .max_size(16)
462 .build()
463 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
464
465 self.pool = new_pool;
467
468 Ok(())
469 }
470
471
472 pub async fn query_with_timeout(
473 &self,
474 query: &str,
475 params: &[&(dyn tokio_postgres::types::ToSql + Sync + Send)],
476 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
477 match timeout(self.timeout_duration, self.query(query, params)).await {
478 Ok(result) => result,
479 Err(_) => Err(PostgresModelError::ConnectionFailed) }
481 }
482
483 pub async fn query_with_retry(
484 &self,
485 query: &str,
486 params: &[&(dyn tokio_postgres::types::ToSql + Sync + Send)],
487 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
488 let mut attempts = 0;
489
490 while attempts < self.max_reconnect_attempts {
491 match self.query(query, params).await {
492 Ok(result) => return Ok(result),
493 Err(e) => {
494 if let PostgresModelError::PoolError(_) = e {
496 attempts += 1;
497 if attempts >= self.max_reconnect_attempts {
498 return Err(e);
499 }
500 sleep(Duration::from_secs(2_u64.pow(attempts))).await;
502 } else {
503 return Err(e);
505 }
506 }
507 }
508 }
509
510 Err(PostgresModelError::ConnectionFailed)
511 }
512
513
514 }
542
543pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
544 row: &'a tokio_postgres::Row,
545 column: &str,
546) -> Option<T> {
547 match row.try_get::<&str, T>(column) {
548 Ok(value) => Some(value),
549 Err(_) => None,
550 }
551}