use crate::db::key::DbKey;
use holochain_serialized_bytes::prelude::*;
use once_cell::sync::Lazy;
use rusqlite::*;
use scheduled_thread_pool::ScheduledThreadPool;
use schemars::JsonSchema;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{path::Path, sync::Arc, time::Duration};
static CONNECTION_TIMEOUT_MS: AtomicU64 = AtomicU64::new(3_000);
const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(30);
static R2D2_THREADPOOL: Lazy<Arc<ScheduledThreadPool>> = Lazy::new(|| {
let t = ScheduledThreadPool::new(1);
Arc::new(t)
});
pub type ConnectionPool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Default)]
pub enum DbSyncLevel {
Full,
#[default]
Normal,
Off,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Default, JsonSchema)]
#[serde(deny_unknown_fields)]
pub enum DbSyncStrategy {
Fast,
#[default]
Resilient,
}
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub synchronous_level: DbSyncLevel,
pub key: DbKey,
pub max_readers: u16,
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
synchronous_level: Default::default(),
key: Default::default(),
max_readers: 8,
}
}
}
pub(super) fn new_connection_pool(path: Option<&Path>, config: PoolConfig) -> ConnectionPool {
use r2d2_sqlite::SqliteConnectionManager;
let manager = match path {
Some(path) => SqliteConnectionManager::file(path),
None => SqliteConnectionManager::memory(),
};
let max_size = config.max_readers as u32 + 1;
let customizer = Box::new(ConnCustomizer { config });
r2d2::Pool::builder()
.max_size(max_size)
.min_idle(Some(0))
.idle_timeout(Some(Duration::from_secs(30)))
.connection_timeout(Duration::from_millis(
CONNECTION_TIMEOUT_MS.load(Ordering::Acquire),
))
.thread_pool(R2D2_THREADPOOL.clone())
.connection_customizer(customizer)
.build(manager)
.unwrap()
}
#[derive(Debug)]
struct ConnCustomizer {
config: PoolConfig,
}
impl r2d2::CustomizeConnection<Connection, rusqlite::Error> for ConnCustomizer {
fn on_acquire(&self, conn: &mut Connection) -> Result<(), rusqlite::Error> {
initialize_connection(conn, &self.config)?;
Ok(())
}
}
pub(super) fn initialize_connection(conn: &mut Connection, config: &PoolConfig) -> Result<()> {
conn.busy_timeout(SQLITE_BUSY_TIMEOUT)?;
#[cfg(feature = "sqlite-encrypted")]
conn.execute_batch(&String::from_utf8_lossy(
&*config.key.unlocked.lock().unwrap().lock(),
))?;
conn.pragma_update(None, "trusted_schema", false)?;
conn.pragma_update(None, "foreign_keys", "ON".to_string())?;
match config.synchronous_level {
DbSyncLevel::Full => conn.pragma_update(None, "synchronous", "2".to_string())?,
DbSyncLevel::Normal => conn.pragma_update(None, "synchronous", "1".to_string())?,
DbSyncLevel::Off => conn.pragma_update(None, "synchronous", "0".to_string())?,
}
vtab::array::load_module(conn)?;
Ok(())
}
#[cfg(feature = "test_utils")]
pub fn set_connection_timeout(timeout_ms: u64) {
CONNECTION_TIMEOUT_MS.store(timeout_ms, Ordering::Relaxed);
}