palpo_data/
lib.rs

1use std::sync::{Arc, OnceLock};
2use std::time::Duration;
3
4use diesel::prelude::*;
5use diesel::r2d2::{self, CustomizeConnection, State};
6use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
7use scheduled_thread_pool::ScheduledThreadPool;
8use url::Url;
9
10#[macro_use]
11mod macros;
12mod config;
13pub use crate::config::DbConfig;
14pub use palpo_core as core;
15
16pub mod full_text_search;
17
18pub mod pool;
19pub use pool::{DieselPool, PgPooledConnection, PoolError};
20
21pub mod room;
22pub mod schema;
23pub mod user;
24pub mod sending;
25pub mod misc;
26pub mod media;
27
28mod error;
29pub use error::DataError;
30
31use crate::core::{Seqnum, UnixMillis};
32
/// Result alias used throughout this crate, with [`DataError`] as the error type.
pub type DataResult<T> = Result<T, DataError>;

/// Process-wide primary connection pool.
///
/// Populated once by [`init`]; read by [`connect`] and [`state`].
pub static DIESEL_POOL: OnceLock<DieselPool> = OnceLock::new();
/// Slot for an optional replica connection pool.
// NOTE(review): nothing in this part of the file sets or reads this slot —
// confirm where (and whether) it is initialised.
pub static REPLICA_POOL: OnceLock<Option<DieselPool>> = OnceLock::new();

/// Database migrations embedded by `embed_migrations!`; applied by [`migrate`].
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
39
40pub fn init(config: &DbConfig) {
41    let builder = r2d2::Pool::builder()
42        .max_size(config.pool_size)
43        .min_idle(config.min_idle)
44        .connection_timeout(Duration::from_millis(config.connection_timeout))
45        .connection_customizer(Box::new(config::ConnectionConfig {
46            statement_timeout: config.statement_timeout,
47        }))
48        .thread_pool(Arc::new(ScheduledThreadPool::new(config.helper_threads)));
49
50    let pool = DieselPool::new(&config.url, &config, builder).expect("diesel pool should be created");
51    DIESEL_POOL.set(pool).expect("diesel pool should be set");
52    migrate();
53}
54pub fn migrate() {
55    let conn = &mut connect().expect("db connect should worked");
56    conn.run_pending_migrations(MIGRATIONS)
57        .expect("migrate db should worked");
58}
59
60pub fn connect() -> Result<PgPooledConnection, PoolError> {
61    match DIESEL_POOL.get().expect("diesel pool should set").get() {
62        Ok(conn) => Ok(conn),
63        Err(e) => {
64            println!("db connect error {e}");
65            Err(e)
66        }
67    }
68}
69pub fn state() -> State {
70    DIESEL_POOL.get().expect("diesel pool should set").state()
71}
72
73pub fn connection_url(config: &DbConfig, url: &str) -> String {
74    let mut url = Url::parse(url).expect("Invalid database URL");
75
76    if config.enforce_tls {
77        maybe_append_url_param(&mut url, "sslmode", "require");
78    }
79
80    // Configure the time it takes for diesel to return an error when there is full packet loss
81    // between the application and the database.
82    maybe_append_url_param(&mut url, "tcp_user_timeout", &config.tcp_timeout.to_string());
83
84    url.into()
85}
86
87fn maybe_append_url_param(url: &mut Url, key: &str, value: &str) {
88    if !url.query_pairs().any(|(k, _)| k == key) {
89        url.query_pairs_mut().append_pair(key, value);
90    }
91}
92
93pub fn next_sn() -> DataResult<Seqnum> {
94    diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT nextval('occur_sn_seq')")
95        .get_result::<Seqnum>(&mut connect()?)
96        .map_err(Into::into)
97}
98pub fn curr_sn() -> DataResult<Seqnum> {
99    diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT last_value from occur_sn_seq")
100        .get_result::<Seqnum>(&mut connect()?)
101        .map_err(Into::into)
102}
103