1use lazy_static::lazy_static;
2use postgres::types::ToSql;
3use postgres::{Client, Error as PostgresError, NoTls, Row};
4use r2d2::Pool;
5use r2d2_postgres::PostgresConnectionManager;
6use std::env;
7use std::sync::{Arc, Mutex, Once};
8use std::time::Duration;
9
10fn create_postgres_error(_message: &str) -> PostgresError {
12 let result = Client::connect("invalid-connection-string", NoTls);
15 match result {
16 Ok(_) => unreachable!(), Err(e) => {
18 e
22 }
23 }
24}
25
lazy_static! {
    // Process-wide slot for the shared single-connection wrapper; starts out as `None`.
    static ref POSTGRES_CLIENT: Mutex<Option<Arc<PostgresClientWrapper>>> = Mutex::new(None);
    // Process-wide slot for the shared connection pool; starts out as `None`.
    static ref POSTGRES_POOL: Mutex<Option<Arc<Pool<PostgresConnectionManager<NoTls>>>>> =
        Mutex::new(None);
    // NOTE(review): nothing in the visible part of this file refers to `INIT`.
    static ref INIT: Once = Once::new();
}
33
/// Connection and pool settings, filled in through the chained setters on this type.
///
/// NOTE(review): `Debug` is derived, so formatting a value with `{:?}` includes `password`.
#[derive(Debug)]
pub struct PostgresConfigBuilder {
    /// Written as `host=` in the connection string.
    pub host: String,
    /// Written as `port=` in the connection string.
    pub port: u16,
    /// Written as `user=` in the connection string.
    pub user: String,
    /// Written as `password=` when present; omitted when `None`.
    pub password: Option<String>,
    /// Written as `dbname=` in the connection string.
    pub database: String,
    /// Written as `application_name=` when present.
    pub application_name: Option<String>,
    /// Written as `connect_timeout=` when present (the setter names its argument `seconds`).
    pub connect_timeout: Option<u64>,
    /// Written as `sslmode=` when present.
    pub ssl_mode: Option<String>,
    /// Passed to the pool builder's `max_size` when present.
    pub pool_max_size: Option<u32>,
    /// Passed to the pool builder's `min_idle` when present.
    pub pool_min_idle: Option<u32>,
    /// Passed to the pool builder's `idle_timeout` when present.
    pub pool_idle_timeout: Option<Duration>,
    /// Passed to the pool builder's `connection_timeout` when present.
    pub pool_connection_timeout: Option<Duration>,
    /// Passed to the pool builder's `max_lifetime` when present.
    pub pool_max_lifetime: Option<Duration>,
    /// Stored by the `use_pool` setter; not read by `build` or `build_pool`.
    pub use_pool: bool,
}
56
57impl Default for PostgresConfigBuilder {
58 fn default() -> Self {
59 Self {
60 host: "localhost".to_string(),
61 port: 5432,
62 user: "postgres".to_string(),
63 password: None,
64 database: "postgres".to_string(),
65 application_name: None,
66 connect_timeout: None,
67 ssl_mode: None,
68 pool_max_size: Some(10),
70 pool_min_idle: Some(1),
71 pool_idle_timeout: Some(Duration::from_secs(300)),
72 pool_connection_timeout: Some(Duration::from_secs(30)),
73 pool_max_lifetime: Some(Duration::from_secs(1800)),
74 use_pool: false,
75 }
76 }
77}
78
79impl PostgresConfigBuilder {
80 pub fn new() -> Self {
82 Self::default()
83 }
84
85 pub fn host(mut self, host: &str) -> Self {
87 self.host = host.to_string();
88 self
89 }
90
91 pub fn port(mut self, port: u16) -> Self {
93 self.port = port;
94 self
95 }
96
97 pub fn user(mut self, user: &str) -> Self {
99 self.user = user.to_string();
100 self
101 }
102
103 pub fn password(mut self, password: &str) -> Self {
105 self.password = Some(password.to_string());
106 self
107 }
108
109 pub fn database(mut self, database: &str) -> Self {
111 self.database = database.to_string();
112 self
113 }
114
115 pub fn application_name(mut self, application_name: &str) -> Self {
117 self.application_name = Some(application_name.to_string());
118 self
119 }
120
121 pub fn connect_timeout(mut self, seconds: u64) -> Self {
123 self.connect_timeout = Some(seconds);
124 self
125 }
126
127 pub fn ssl_mode(mut self, ssl_mode: &str) -> Self {
129 self.ssl_mode = Some(ssl_mode.to_string());
130 self
131 }
132
133 pub fn use_pool(mut self, use_pool: bool) -> Self {
135 self.use_pool = use_pool;
136 self
137 }
138
139 pub fn pool_max_size(mut self, size: u32) -> Self {
141 self.pool_max_size = Some(size);
142 self
143 }
144
145 pub fn pool_min_idle(mut self, size: u32) -> Self {
147 self.pool_min_idle = Some(size);
148 self
149 }
150
151 pub fn pool_idle_timeout(mut self, timeout: Duration) -> Self {
153 self.pool_idle_timeout = Some(timeout);
154 self
155 }
156
157 pub fn pool_connection_timeout(mut self, timeout: Duration) -> Self {
159 self.pool_connection_timeout = Some(timeout);
160 self
161 }
162
163 pub fn pool_max_lifetime(mut self, lifetime: Duration) -> Self {
165 self.pool_max_lifetime = Some(lifetime);
166 self
167 }
168
169 pub fn build_connection_string(&self) -> String {
171 let mut conn_string = format!(
172 "host={} port={} user={} dbname={}",
173 self.host, self.port, self.user, self.database
174 );
175
176 if let Some(password) = &self.password {
177 conn_string.push_str(&format!(" password={}", password));
178 }
179
180 if let Some(app_name) = &self.application_name {
181 conn_string.push_str(&format!(" application_name={}", app_name));
182 }
183
184 if let Some(timeout) = self.connect_timeout {
185 conn_string.push_str(&format!(" connect_timeout={}", timeout));
186 }
187
188 if let Some(ssl_mode) = &self.ssl_mode {
189 conn_string.push_str(&format!(" sslmode={}", ssl_mode));
190 }
191
192 conn_string
193 }
194
195 pub fn build(&self) -> Result<Client, PostgresError> {
197 let conn_string = self.build_connection_string();
198 Client::connect(&conn_string, NoTls)
199 }
200
201 pub fn build_pool(&self) -> Result<Pool<PostgresConnectionManager<NoTls>>, r2d2::Error> {
203 let conn_string = self.build_connection_string();
204 let manager = PostgresConnectionManager::new(conn_string.parse().unwrap(), NoTls);
205
206 let mut pool_builder = r2d2::Pool::builder();
207
208 if let Some(max_size) = self.pool_max_size {
209 pool_builder = pool_builder.max_size(max_size);
210 }
211
212 if let Some(min_idle) = self.pool_min_idle {
213 pool_builder = pool_builder.min_idle(Some(min_idle));
214 }
215
216 if let Some(idle_timeout) = self.pool_idle_timeout {
217 pool_builder = pool_builder.idle_timeout(Some(idle_timeout));
218 }
219
220 if let Some(connection_timeout) = self.pool_connection_timeout {
221 pool_builder = pool_builder.connection_timeout(connection_timeout);
222 }
223
224 if let Some(max_lifetime) = self.pool_max_lifetime {
225 pool_builder = pool_builder.max_lifetime(Some(max_lifetime));
226 }
227
228 pool_builder.build(manager)
229 }
230}
231
/// A lazily opened connection together with the string used to open it.
pub struct PostgresClientWrapper {
    // Handed to `Client::connect` when a connection has to be opened.
    connection_string: String,
    // `None` until the first connection is made; the mutex is locked around every use.
    client: Mutex<Option<Client>>,
}
237
238pub fn transaction<F, T>(operations: F) -> Result<T, PostgresError>
258where
259 F: FnOnce(&mut Client) -> Result<T, PostgresError>,
260{
261 let client = get_postgres_client()?;
262 let client_mutex = client.get_client()?;
263 let mut client_guard = client_mutex.lock().unwrap();
264
265 if let Some(client) = client_guard.as_mut() {
266 client.execute("BEGIN", &[])?;
268
269 match operations(client) {
271 Ok(result) => {
272 client.execute("COMMIT", &[])?;
274 Ok(result)
275 }
276 Err(e) => {
277 let _ = client.execute("ROLLBACK", &[]);
279 Err(e)
280 }
281 }
282 } else {
283 Err(create_postgres_error("Failed to get PostgreSQL client"))
284 }
285}
286
287pub fn transaction_with_pool<F, T>(operations: F) -> Result<T, PostgresError>
307where
308 F: FnOnce(&mut Client) -> Result<T, PostgresError>,
309{
310 let pool = get_postgres_pool()?;
311 let mut client = pool.get().map_err(|e| {
312 create_postgres_error(&format!("Failed to get connection from pool: {}", e))
313 })?;
314
315 client.execute("BEGIN", &[])?;
317
318 match operations(&mut client) {
320 Ok(result) => {
321 client.execute("COMMIT", &[])?;
323 Ok(result)
324 }
325 Err(e) => {
326 let _ = client.execute("ROLLBACK", &[]);
328 Err(e)
329 }
330 }
331}
332
333impl PostgresClientWrapper {
334 fn new(connection_string: String) -> Self {
336 PostgresClientWrapper {
337 connection_string,
338 client: Mutex::new(None),
339 }
340 }
341
342 fn get_client(&self) -> Result<&Mutex<Option<Client>>, PostgresError> {
344 let mut client_guard = self.client.lock().unwrap();
345
346 if client_guard.is_none() {
348 *client_guard = Some(Client::connect(&self.connection_string, NoTls)?);
349 }
350
351 Ok(&self.client)
352 }
353
354 pub fn execute(
356 &self,
357 query: &str,
358 params: &[&(dyn postgres::types::ToSql + Sync)],
359 ) -> Result<u64, PostgresError> {
360 let client_mutex = self.get_client()?;
361 let mut client_guard = client_mutex.lock().unwrap();
362
363 if let Some(client) = client_guard.as_mut() {
364 client.execute(query, params)
365 } else {
366 Err(create_postgres_error("Failed to get PostgreSQL client"))
367 }
368 }
369
370 pub fn query(
372 &self,
373 query: &str,
374 params: &[&(dyn postgres::types::ToSql + Sync)],
375 ) -> Result<Vec<Row>, PostgresError> {
376 let client_mutex = self.get_client()?;
377 let mut client_guard = client_mutex.lock().unwrap();
378
379 if let Some(client) = client_guard.as_mut() {
380 client.query(query, params)
381 } else {
382 Err(create_postgres_error("Failed to get PostgreSQL client"))
383 }
384 }
385
386 pub fn query_one(
388 &self,
389 query: &str,
390 params: &[&(dyn postgres::types::ToSql + Sync)],
391 ) -> Result<Row, PostgresError> {
392 let client_mutex = self.get_client()?;
393 let mut client_guard = client_mutex.lock().unwrap();
394
395 if let Some(client) = client_guard.as_mut() {
396 client.query_one(query, params)
397 } else {
398 Err(create_postgres_error("Failed to get PostgreSQL client"))
399 }
400 }
401
402 pub fn query_opt(
404 &self,
405 query: &str,
406 params: &[&(dyn postgres::types::ToSql + Sync)],
407 ) -> Result<Option<Row>, PostgresError> {
408 let client_mutex = self.get_client()?;
409 let mut client_guard = client_mutex.lock().unwrap();
410
411 if let Some(client) = client_guard.as_mut() {
412 client.query_opt(query, params)
413 } else {
414 Err(create_postgres_error("Failed to get PostgreSQL client"))
415 }
416 }
417
418 pub fn ping(&self) -> Result<bool, PostgresError> {
420 let result = self.query("SELECT 1", &[]);
421 match result {
422 Ok(_) => Ok(true),
423 Err(e) => Err(e),
424 }
425 }
426}
427
428pub fn get_postgres_client() -> Result<Arc<PostgresClientWrapper>, PostgresError> {
430 {
432 let guard = POSTGRES_CLIENT.lock().unwrap();
433 if let Some(ref client) = &*guard {
434 return Ok(Arc::clone(client));
435 }
436 }
437
438 let client = create_postgres_client()?;
440
441 {
443 let mut guard = POSTGRES_CLIENT.lock().unwrap();
444 *guard = Some(Arc::clone(&client));
445 }
446
447 Ok(client)
448}
449
450fn create_postgres_client() -> Result<Arc<PostgresClientWrapper>, PostgresError> {
452 let host = env::var("POSTGRES_HOST").unwrap_or_else(|_| String::from("localhost"));
454 let port = env::var("POSTGRES_PORT")
455 .ok()
456 .and_then(|p| p.parse::<u16>().ok())
457 .unwrap_or(5432);
458 let user = env::var("POSTGRES_USER").unwrap_or_else(|_| String::from("postgres"));
459 let password = env::var("POSTGRES_PASSWORD").ok();
460 let database = env::var("POSTGRES_DB").unwrap_or_else(|_| String::from("postgres"));
461
462 let mut builder = PostgresConfigBuilder::new()
464 .host(&host)
465 .port(port)
466 .user(&user)
467 .database(&database);
468
469 if let Some(pass) = password {
470 builder = builder.password(&pass);
471 }
472
473 let connection_string = builder.build_connection_string();
474
475 let wrapper = Arc::new(PostgresClientWrapper::new(connection_string));
477
478 match wrapper.ping() {
480 Ok(_) => Ok(wrapper),
481 Err(e) => Err(e),
482 }
483}
484
485pub fn reset() -> Result<(), PostgresError> {
487 {
489 let mut client_guard = POSTGRES_CLIENT.lock().unwrap();
490 *client_guard = None;
491 }
492
493 get_postgres_client()?;
495 Ok(())
496}
497
498pub fn execute(
500 query: &str,
501 params: &[&(dyn postgres::types::ToSql + Sync)],
502) -> Result<u64, PostgresError> {
503 let client = get_postgres_client()?;
504 client.execute(query, params)
505}
506
507pub fn query(
509 query: &str,
510 params: &[&(dyn postgres::types::ToSql + Sync)],
511) -> Result<Vec<Row>, PostgresError> {
512 let client = get_postgres_client()?;
513 client.query(query, params)
514}
515
516pub fn query_one(
518 query: &str,
519 params: &[&(dyn postgres::types::ToSql + Sync)],
520) -> Result<Row, PostgresError> {
521 let client = get_postgres_client()?;
522 client.query_one(query, params)
523}
524
525pub fn query_opt(
527 query: &str,
528 params: &[&(dyn postgres::types::ToSql + Sync)],
529) -> Result<Option<Row>, PostgresError> {
530 let client = get_postgres_client()?;
531 client.query_opt(query, params)
532}
533
534pub fn with_config(config: PostgresConfigBuilder) -> Result<Client, PostgresError> {
536 config.build()
537}
538
539pub fn with_pool_config(
541 config: PostgresConfigBuilder,
542) -> Result<Pool<PostgresConnectionManager<NoTls>>, r2d2::Error> {
543 config.build_pool()
544}
545
546pub fn get_postgres_pool() -> Result<Arc<Pool<PostgresConnectionManager<NoTls>>>, PostgresError> {
548 {
550 let guard = POSTGRES_POOL.lock().unwrap();
551 if let Some(ref pool) = &*guard {
552 return Ok(Arc::clone(pool));
553 }
554 }
555
556 let pool = create_postgres_pool()?;
558
559 {
561 let mut guard = POSTGRES_POOL.lock().unwrap();
562 *guard = Some(Arc::clone(&pool));
563 }
564
565 Ok(pool)
566}
567
568fn create_postgres_pool() -> Result<Arc<Pool<PostgresConnectionManager<NoTls>>>, PostgresError> {
570 let host = env::var("POSTGRES_HOST").unwrap_or_else(|_| String::from("localhost"));
572 let port = env::var("POSTGRES_PORT")
573 .ok()
574 .and_then(|p| p.parse::<u16>().ok())
575 .unwrap_or(5432);
576 let user = env::var("POSTGRES_USER").unwrap_or_else(|_| String::from("postgres"));
577 let password = env::var("POSTGRES_PASSWORD").ok();
578 let database = env::var("POSTGRES_DB").unwrap_or_else(|_| String::from("postgres"));
579
580 let mut builder = PostgresConfigBuilder::new()
582 .host(&host)
583 .port(port)
584 .user(&user)
585 .database(&database)
586 .use_pool(true);
587
588 if let Some(pass) = password {
589 builder = builder.password(&pass);
590 }
591
592 match builder.build_pool() {
594 Ok(pool) => {
595 match pool.get() {
597 Ok(_) => Ok(Arc::new(pool)),
598 Err(e) => Err(create_postgres_error(&format!(
599 "Failed to connect to PostgreSQL: {}",
600 e
601 ))),
602 }
603 }
604 Err(e) => Err(create_postgres_error(&format!(
605 "Failed to create PostgreSQL connection pool: {}",
606 e
607 ))),
608 }
609}
610
611pub fn reset_pool() -> Result<(), PostgresError> {
613 {
615 let mut pool_guard = POSTGRES_POOL.lock().unwrap();
616 *pool_guard = None;
617 }
618
619 get_postgres_pool()?;
621 Ok(())
622}
623
624pub fn execute_with_pool(
626 query: &str,
627 params: &[&(dyn postgres::types::ToSql + Sync)],
628) -> Result<u64, PostgresError> {
629 let pool = get_postgres_pool()?;
630 let mut client = pool.get().map_err(|e| {
631 create_postgres_error(&format!("Failed to get connection from pool: {}", e))
632 })?;
633 client.execute(query, params)
634}
635
636pub fn query_with_pool(
638 query: &str,
639 params: &[&(dyn postgres::types::ToSql + Sync)],
640) -> Result<Vec<Row>, PostgresError> {
641 let pool = get_postgres_pool()?;
642 let mut client = pool.get().map_err(|e| {
643 create_postgres_error(&format!("Failed to get connection from pool: {}", e))
644 })?;
645 client.query(query, params)
646}
647
648pub fn query_one_with_pool(
650 query: &str,
651 params: &[&(dyn postgres::types::ToSql + Sync)],
652) -> Result<Row, PostgresError> {
653 let pool = get_postgres_pool()?;
654 let mut client = pool.get().map_err(|e| {
655 create_postgres_error(&format!("Failed to get connection from pool: {}", e))
656 })?;
657 client.query_one(query, params)
658}
659
660pub fn query_opt_with_pool(
662 query: &str,
663 params: &[&(dyn postgres::types::ToSql + Sync)],
664) -> Result<Option<Row>, PostgresError> {
665 let pool = get_postgres_pool()?;
666 let mut client = pool.get().map_err(|e| {
667 create_postgres_error(&format!("Failed to get connection from pool: {}", e))
668 })?;
669 client.query_opt(query, params)
670}
671
/// An owned, ordered list of query parameter values.
#[derive(Default)]
pub struct QueryParams {
    // Values in the order they were added; each is boxed so mixed types can share one list.
    params: Vec<Box<dyn ToSql + Sync>>,
}
680
681impl QueryParams {
682 pub fn new() -> Self {
684 Self { params: Vec::new() }
685 }
686
687 pub fn add<T: 'static + ToSql + Sync>(&mut self, value: T) -> &mut Self {
689 self.params.push(Box::new(value));
690 self
691 }
692
693 pub fn add_str(&mut self, value: &str) -> &mut Self {
695 self.add(value.to_string())
696 }
697
698 pub fn add_int(&mut self, value: i32) -> &mut Self {
700 self.add(value)
701 }
702
703 pub fn add_float(&mut self, value: f64) -> &mut Self {
705 self.add(value)
706 }
707
708 pub fn add_bool(&mut self, value: bool) -> &mut Self {
710 self.add(value)
711 }
712
713 pub fn add_opt<T: 'static + ToSql + Sync>(&mut self, value: Option<T>) -> &mut Self {
715 if let Some(v) = value {
716 self.add(v);
717 } else {
718 self.params.push(Box::new(None::<String>));
720 }
721 self
722 }
723
724 pub fn as_slice(&self) -> Vec<&(dyn ToSql + Sync)> {
726 self.params
727 .iter()
728 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
729 .collect()
730 }
731}
732
733pub fn execute_with_params(query_str: &str, params: &QueryParams) -> Result<u64, PostgresError> {
735 let client = get_postgres_client()?;
736 client.execute(query_str, ¶ms.as_slice())
737}
738
739pub fn query_with_params(query_str: &str, params: &QueryParams) -> Result<Vec<Row>, PostgresError> {
741 let client = get_postgres_client()?;
742 client.query(query_str, ¶ms.as_slice())
743}
744
745pub fn query_one_with_params(query_str: &str, params: &QueryParams) -> Result<Row, PostgresError> {
747 let client = get_postgres_client()?;
748 client.query_one(query_str, ¶ms.as_slice())
749}
750
751pub fn query_opt_with_params(
753 query_str: &str,
754 params: &QueryParams,
755) -> Result<Option<Row>, PostgresError> {
756 let client = get_postgres_client()?;
757 client.query_opt(query_str, ¶ms.as_slice())
758}
759
760pub fn execute_with_pool_params(
762 query_str: &str,
763 params: &QueryParams,
764) -> Result<u64, PostgresError> {
765 execute_with_pool(query_str, ¶ms.as_slice())
766}
767
768pub fn query_with_pool_params(
770 query_str: &str,
771 params: &QueryParams,
772) -> Result<Vec<Row>, PostgresError> {
773 query_with_pool(query_str, ¶ms.as_slice())
774}
775
776pub fn query_one_with_pool_params(
778 query_str: &str,
779 params: &QueryParams,
780) -> Result<Row, PostgresError> {
781 query_one_with_pool(query_str, ¶ms.as_slice())
782}
783
784pub fn query_opt_with_pool_params(
786 query_str: &str,
787 params: &QueryParams,
788) -> Result<Option<Row>, PostgresError> {
789 query_opt_with_pool(query_str, ¶ms.as_slice())
790}
791
792pub fn notify(channel: &str, payload: &str) -> Result<(), PostgresError> {
803 let client = get_postgres_client()?;
804 client.execute(&format!("NOTIFY {}, '{}'", channel, payload), &[])?;
805 Ok(())
806}
807
808pub fn notify_with_pool(channel: &str, payload: &str) -> Result<(), PostgresError> {
819 let pool = get_postgres_pool()?;
820 let mut client = pool.get().map_err(|e| {
821 create_postgres_error(&format!("Failed to get connection from pool: {}", e))
822 })?;
823 client.execute(&format!("NOTIFY {}, '{}'", channel, payload), &[])?;
824 Ok(())
825}