1use deadpool_postgres::Timeouts;
2use tokio_postgres::Client;
3use crate::db::postgres::models::model::PostgresModelError;
4use tokio::time::timeout;
5use tokio::time::Duration;
6use tokio::time::sleep;
7use log::info;
8use tokio;
9use tokio_postgres::{Error as PostgresError, NoTls};
10
11use std::error::Error;
12use tokio_postgres_migration::Migration;
13
14use dotenvy::dotenv;
15use std::env;
16
17use std::fs;
18use std::str;
19
/// A single migration script stored as a `(name, sql)` pair of owned strings.
type MigrationDefinition = (String, String);

/// Migration scripts grouped by direction.
#[derive(Clone)]
struct Migrations {
    /// Scripts that apply changes.
    up: Vec<MigrationDefinition>,
    /// Scripts that revert changes.
    down: Vec<MigrationDefinition>,
}
27
/// Borrowed view of a two-part value as a pair of string slices.
pub trait MigrationAsStr {
    /// Returns both parts of `self` as `&str` slices, in order.
    fn to_str(&self) -> (&str, &str);
}
31
32impl MigrationAsStr for MigrationDefinition {
33 fn to_str(&self) -> (&str, &str) {
34 return (self.0.as_str(), self.1.as_str());
35 }
36}
37
38pub struct Database {
39
40 pool: deadpool_postgres::Pool, pub migrations_dir_path: Option<String>,
44 pub connection_url: String ,
45
46 pub max_reconnect_attempts: u32,
47 pub timeout_duration: Duration,
48}
49
50
51
52
53
54
/// Individual pieces of a PostgreSQL connection string.
#[derive(Debug, Clone)]
pub struct DatabaseCredentials {
    /// Name of the database to connect to.
    pub db_name: String,
    /// Host part of the connection URL.
    pub db_host: String,
    /// User name to authenticate as.
    pub db_user: String,
    /// Password for `db_user`.
    pub db_password: String,
}
62
63impl Default for DatabaseCredentials {
64 fn default() -> Self {
65 Self {
66 db_name: "postgres".into(),
67 db_host: "localhost".into(),
68 db_user: "postgres".into(),
69 db_password: "postgres".into(),
70 }
71 }
72}
73
74impl DatabaseCredentials {
75 pub fn from_env() -> Self {
76 Self {
79 db_name: env::var("DB_NAME").unwrap_or("postgres".into()),
80 db_host: env::var("DB_HOST").unwrap_or("localhost".into()),
81 db_user: env::var("DB_USER").unwrap_or("postgres".into()),
82 db_password: env::var("DB_PASSWORD").unwrap_or("postgres".into()),
83 }
84 }
85
86 pub fn build_connection_url(&self) -> String {
87 return format!(
88 "postgres://{}:{}@{}/{}",
89 self.db_user, self.db_password, self.db_host, self.db_name
90 );
91 }
92}
93
/// Kind of database call a `PostgresInput` describes.
enum PostgresInputType {
    /// Variant named after the `query` call.
    Query,
    /// Variant named after the `query_one` call.
    QueryOne,
    /// Variant named after the `execute` call.
    Execute,
}
99
100struct PostgresInput<'a> {
101 input_type: PostgresInputType,
102 query: String,
103 params: &'a [&'a (dyn tokio_postgres::types::ToSql + Sync)],
104}
105
106impl Database {
107 pub fn new(
108 conn_url: String,
109 max_pool_connections: usize,
110 migrations_dir_path: Option<String>,
111 ) -> Result<Database, PostgresModelError> {
112 let config: tokio_postgres::Config = conn_url.parse()
114 .map_err(|_e| PostgresModelError::ConnectionFailed )?;
115
116 let manager = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
118
119let pool = deadpool_postgres::Pool::builder(manager)
128 .max_size( max_pool_connections )
129 .build()
131 .map_err(|e| PostgresModelError::PoolCreationFailed(e.to_string()))?;
132
133
134
135 Ok(Database {
136 pool,
137 migrations_dir_path,
138 connection_url: conn_url,
139 max_reconnect_attempts: 3,
140 timeout_duration: Duration::from_secs(5)
141 })
142 }
143}
144
145
146impl Database {
147
148
149
150 pub async fn connect(
151 & self
153 ) -> Result<Client, PostgresError> {
154 info!("Connecting to db: {}", self.connection_url);
158
159 let (client, connection) = tokio_postgres::connect(&self.connection_url, NoTls).await?;
160
161 tokio::spawn(async move {
164 if let Err(e) = connection.await {
166 eprintln!("postgres connection error: {}", e);
167 }
168 });
169
170 Ok( client )
173 }
174
175
176
177 fn read_migration_files(migrations_dir_path: Option<String>) -> Migrations {
178 let mut migrations = Migrations {
179 up: Vec::new(),
180 down: Vec::new(),
181 };
182
183 let migrations_dir =
184 migrations_dir_path.unwrap_or("./src/db/postgres/migrations".to_string());
185 let migration_dir_files = fs::read_dir(&migrations_dir).expect("Failed to read directory");
186
187 for file in migration_dir_files {
188 let file = file.expect("Failed to read migration file");
189
190 let path = file.path();
191 let filename = path.file_stem().unwrap().to_str().unwrap();
192
193 let filename_without_extension: &str = filename.split('.').next().unwrap();
194
195 let contents = fs::read_to_string(file.path()).expect("Failed to read file contents");
197
198 info!("File name: {}", filename);
201
202 if filename.contains(".down") {
203 info!("File contents: {}", contents);
204 migrations
205 .down
206 .push((filename_without_extension.into(), contents.clone()));
207 }
208
209 if filename.contains(".up") {
210 info!("File contents: {}", contents);
211 migrations
212 .up
213 .push((filename_without_extension.into(), contents.clone()));
214 }
215 }
216
217
218 migrations.up.sort_by(|a, b| a.0.cmp(&b.0));
220
221 migrations.down.sort_by(|a, b| b.0.cmp(&a.0));
223
224
225 return migrations;
226 }
227
228 pub async fn migrate(&mut self) -> Result<(), Box<dyn Error>> {
229 let client = &mut self.connect().await?;
230
231 let migrations_dir_path = self.migrations_dir_path.clone();
232 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
233
234 for up_migration in migrations.up.drain(..) {
235 println!("migrating {} {} ", up_migration.0, up_migration.1);
236 let migration = Migration::new("migrations".to_string());
237
238 migration.up(client, &[up_migration.to_str()]).await?;
240 }
241
242 Ok(())
244 }
245
246 pub async fn rollback(&mut self) -> Result<(), Box<dyn Error>> {
249 Ok(())
250 }
251
252 pub async fn rollback_full(&mut self) -> Result<(), Box<dyn Error>> {
253 let migrations_dir_path = self.migrations_dir_path.clone();
254
255 let mut migrations: Migrations = Self::read_migration_files(migrations_dir_path);
256
257 let client = &mut self.connect().await?;
258
259 for down_migration in migrations.down.drain(..)
260 {
261 println!("migrating {}", down_migration.0);
262 let migration = Migration::new("migrations".to_string());
263 migration.down(client, &[down_migration.to_str()]).await?;
265 }
266
267 Ok(())
268 }
269
270 pub async fn query(
271 &self,
272 query: &str,
273 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
274 ) -> Result<Vec<tokio_postgres::Row>, PostgresModelError> {
275
276 let client = self.pool.get().await?;
283
284 let rows = client.query(query, params).await?;
286
287 Ok(rows)
288
289 }
290
291
292 pub async fn query_one(
293 & self,
294 query: &str,
295 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
296 ) -> Result<tokio_postgres::Row, PostgresModelError> {
297 let client = self.pool.get().await ?;
307
308 let row = client.query_one(query, params).await?;
310
311 Ok(row)
312 }
313
314 pub async fn execute(
316 & self,
317 query: &str,
318 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
319 ) -> Result<u64, PostgresModelError> {
320 let client = self.pool.get().await?;
331
332 let count = client.execute(query, params).await?;
334
335 Ok(count)
336
337
338 }
339
340
341
342 pub async fn check_connection(&self) -> Result<bool, PostgresModelError> {
343 let client = self.pool.get().await?;
345
346 match client.execute("SELECT 1", &[]).await {
348 Ok(_) => Ok(true),
349 Err(e) => Err(PostgresModelError::from(e))
350 }
351}
352
353 }
418
419pub fn try_get_option<'a, T: tokio_postgres::types::FromSql<'a>>(
420 row: &'a tokio_postgres::Row,
421 column: &str,
422) -> Option<T> {
423 match row.try_get::<&str, T>(column) {
424 Ok(value) => Some(value),
425 Err(_) => None,
426 }
427}