use crate::db::key::DbKey;
use crate::functions::add_custom_functions;
use holochain_serialized_bytes::prelude::*;
use once_cell::sync::Lazy;
use rusqlite::*;
use scheduled_thread_pool::ScheduledThreadPool;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{path::Path, sync::Arc, time::Duration};
/// Connection timeout, in milliseconds, handed to the r2d2 pool builder in
/// `new_connection_pool`. Starts at 3000 ms and can be overwritten at runtime
/// through `set_connection_timeout` (only compiled with the `test_utils` feature).
static CONNECTION_TIMEOUT_MS: AtomicU64 = AtomicU64::new(3_000);
/// Busy timeout (30 seconds) set on every connection by `initialize_connection`.
const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(30);
/// Lazily created scheduled thread pool with a single worker thread.
///
/// A clone of this `Arc` is passed to the r2d2 builder in
/// `new_connection_pool`, so every pool built there uses the same thread pool.
static R2D2_THREADPOOL: Lazy<Arc<ScheduledThreadPool>> =
    Lazy::new(|| Arc::new(ScheduledThreadPool::new(1)));
/// An r2d2 pool whose connections are managed by `r2d2_sqlite::SqliteConnectionManager`.
pub type ConnectionPool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
/// Selects the value written to SQLite's `synchronous` pragma when a
/// connection is initialized (see `initialize_connection`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Default)]
pub enum DbSyncLevel {
/// Written as `synchronous = 2`.
Full,
/// Written as `synchronous = 1`. This is the default level.
#[default]
Normal,
/// Written as `synchronous = 0`.
Off,
}
/// Coarse choice between speed and resilience for database syncing.
///
/// NOTE(review): nothing in this part of the file reads this type; presumably
/// callers elsewhere translate it into a `DbSyncLevel` — confirm against usage.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Default)]
pub enum DbSyncStrategy {
/// Presumably favours write speed over durability — confirm against callers.
Fast,
/// The default strategy; presumably favours durability — confirm against callers.
#[default]
Resilient,
}
/// Settings applied to every connection created for a pool.
#[derive(Default, Debug, Clone)]
pub struct PoolConfig {
/// Level written to the `synchronous` pragma by `initialize_connection`.
pub synchronous_level: DbSyncLevel,
/// Database key. `initialize_connection` only reads it when the
/// `sqlite-encrypted` feature is enabled, where its unlocked bytes are
/// executed as a SQL batch on the new connection.
pub key: DbKey,
}
/// Builds an r2d2 pool of SQLite connections.
///
/// * `path` - database file to open; `None` selects an in-memory database.
/// * `config` - settings applied to each connection through `ConnCustomizer`.
///
/// The pool holds at most `num_read_threads() * 2 + 1` connections, keeps no
/// minimum number of idle connections, and uses a 30 second idle timeout.
/// The connection timeout is read from `CONNECTION_TIMEOUT_MS` once, while the
/// pool is being built, so later changes to that value only affect pools
/// created afterwards.
///
/// # Panics
///
/// Panics if r2d2 reports an error while building the pool.
pub(super) fn new_connection_pool(path: Option<&Path>, config: PoolConfig) -> ConnectionPool {
    use r2d2_sqlite::SqliteConnectionManager;

    let manager = match path {
        Some(path) => SqliteConnectionManager::file(path),
        None => SqliteConnectionManager::memory(),
    };
    let customizer = Box::new(ConnCustomizer { config });

    let max_cons = num_read_threads() * 2 + 1;
    // Saturate instead of silently truncating with `as` should the count ever
    // exceed `u32::MAX`. The fully qualified path avoids depending on whether
    // `TryFrom` is in the prelude for this crate's edition.
    let max_size = <u32 as std::convert::TryFrom<usize>>::try_from(max_cons).unwrap_or(u32::MAX);

    r2d2::Pool::builder()
        .max_size(max_size)
        .min_idle(Some(0))
        .idle_timeout(Some(Duration::from_secs(30)))
        .connection_timeout(Duration::from_millis(
            CONNECTION_TIMEOUT_MS.load(Ordering::Acquire),
        ))
        .thread_pool(R2D2_THREADPOOL.clone())
        .connection_customizer(customizer)
        .build(manager)
        // With `min_idle` set to 0 the builder has no connections to wait for,
        // so a failure here is not expected and is treated as a bug.
        .expect("building an r2d2 pool with min_idle = 0 should not fail")
}
/// r2d2 connection customizer that carries the `PoolConfig` to apply to
/// connections of a pool.
#[derive(Debug)]
struct ConnCustomizer {
// Passed by reference to `initialize_connection` from `on_acquire`.
config: PoolConfig,
}
/// Hooks connection setup into r2d2's `CustomizeConnection` callback.
impl r2d2::CustomizeConnection<Connection, rusqlite::Error> for ConnCustomizer {
    /// Configures `conn` with this customizer's `PoolConfig` by delegating to
    /// `initialize_connection`, returning its error unchanged on failure.
    fn on_acquire(&self, conn: &mut Connection) -> Result<(), rusqlite::Error> {
        initialize_connection(conn, &self.config)
    }
}
/// Applies the per-connection settings to `conn`.
///
/// Steps, in order:
/// 1. set the busy timeout to `SQLITE_BUSY_TIMEOUT`;
/// 2. with the `sqlite-encrypted` feature only: execute the unlocked bytes of
///    `config.key` (lossily decoded as UTF-8) as a SQL batch — presumably the
///    statement that keys the encrypted database;
/// 3. set the `trusted_schema` pragma to `false`;
/// 4. set the `foreign_keys` pragma to `ON`;
/// 5. set the `synchronous` pragma from `config.synchronous_level`;
/// 6. register the crate's custom SQL functions.
///
/// # Errors
///
/// Returns the error of the first step that fails; later steps are skipped.
pub(super) fn initialize_connection(conn: &mut Connection, config: &PoolConfig) -> Result<()> {
    conn.busy_timeout(SQLITE_BUSY_TIMEOUT)?;

    #[cfg(feature = "sqlite-encrypted")]
    conn.execute_batch(&String::from_utf8_lossy(&config.key.unlocked.read_lock()))?;

    conn.pragma_update(None, "trusted_schema", false)?;
    conn.pragma_update(None, "foreign_keys", "ON")?;

    // The level is sent as text, exactly as each variant was written before.
    let synchronous = match config.synchronous_level {
        DbSyncLevel::Full => "2",
        DbSyncLevel::Normal => "1",
        DbSyncLevel::Off => "0",
    };
    conn.pragma_update(None, "synchronous", synchronous)?;

    add_custom_functions(conn)?;
    Ok(())
}
/// Returns the read-thread count used to size the connection pool: half the
/// CPU count reported by `num_cpus::get()`, but never fewer than 4.
pub fn num_read_threads() -> usize {
    const MIN_READ_THREADS: usize = 4;

    let half_cpus = num_cpus::get() / 2;
    half_cpus.max(MIN_READ_THREADS)
}
/// Overrides the connection timeout, in milliseconds, used for pools built
/// after this call.
///
/// `new_connection_pool` reads the value once while building a pool, so pools
/// that already exist keep the timeout they were created with.
#[cfg(feature = "test_utils")]
pub fn set_connection_timeout(timeout_ms: u64) {
    // `Release` pairs with the `Acquire` load in `new_connection_pool`; the
    // previous `Relaxed` store did not match that load's ordering.
    CONNECTION_TIMEOUT_MS.store(timeout_ms, Ordering::Release);
}