1use std::sync::{Arc, OnceLock};
2use std::time::Duration;
3
4use diesel::prelude::*;
5use diesel::r2d2::{self, CustomizeConnection, State};
6use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
7use scheduled_thread_pool::ScheduledThreadPool;
8use url::Url;
9
10#[macro_use]
11mod macros;
12mod config;
13pub use crate::config::DbConfig;
14pub use palpo_core as core;
15
16pub mod full_text_search;
17
18pub mod pool;
19pub use pool::{DieselPool, PgPooledConnection, PoolError};
20
21pub mod room;
22pub mod schema;
23pub mod user;
24pub mod sending;
25pub mod misc;
26pub mod media;
27
28mod error;
29pub use error::DataError;
30
31use crate::core::{Seqnum, UnixMillis};
32
33pub type DataResult<T> = Result<T, DataError>;
34
35pub static DIESEL_POOL: OnceLock<DieselPool> = OnceLock::new();
36pub static REPLICA_POOL: OnceLock<Option<DieselPool>> = OnceLock::new();
37
38pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
39
40pub fn init(config: &DbConfig) {
41 let builder = r2d2::Pool::builder()
42 .max_size(config.pool_size)
43 .min_idle(config.min_idle)
44 .connection_timeout(Duration::from_millis(config.connection_timeout))
45 .connection_customizer(Box::new(config::ConnectionConfig {
46 statement_timeout: config.statement_timeout,
47 }))
48 .thread_pool(Arc::new(ScheduledThreadPool::new(config.helper_threads)));
49
50 let pool = DieselPool::new(&config.url, &config, builder).expect("diesel pool should be created");
51 DIESEL_POOL.set(pool).expect("diesel pool should be set");
52 migrate();
53}
54pub fn migrate() {
55 let conn = &mut connect().expect("db connect should worked");
56 conn.run_pending_migrations(MIGRATIONS)
57 .expect("migrate db should worked");
58}
59
60pub fn connect() -> Result<PgPooledConnection, PoolError> {
61 match DIESEL_POOL.get().expect("diesel pool should set").get() {
62 Ok(conn) => Ok(conn),
63 Err(e) => {
64 println!("db connect error {e}");
65 Err(e)
66 }
67 }
68}
69pub fn state() -> State {
70 DIESEL_POOL.get().expect("diesel pool should set").state()
71}
72
73pub fn connection_url(config: &DbConfig, url: &str) -> String {
74 let mut url = Url::parse(url).expect("Invalid database URL");
75
76 if config.enforce_tls {
77 maybe_append_url_param(&mut url, "sslmode", "require");
78 }
79
80 maybe_append_url_param(&mut url, "tcp_user_timeout", &config.tcp_timeout.to_string());
83
84 url.into()
85}
86
87fn maybe_append_url_param(url: &mut Url, key: &str, value: &str) {
88 if !url.query_pairs().any(|(k, _)| k == key) {
89 url.query_pairs_mut().append_pair(key, value);
90 }
91}
92
93pub fn next_sn() -> DataResult<Seqnum> {
94 diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT nextval('occur_sn_seq')")
95 .get_result::<Seqnum>(&mut connect()?)
96 .map_err(Into::into)
97}
98pub fn curr_sn() -> DataResult<Seqnum> {
99 diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT last_value from occur_sn_seq")
100 .get_result::<Seqnum>(&mut connect()?)
101 .map_err(Into::into)
102}
103