holochain_sqlite/db/
pool.rs1use crate::db::key::DbKey;
2use holochain_serialized_bytes::prelude::*;
3use once_cell::sync::Lazy;
4use rusqlite::*;
5use scheduled_thread_pool::ScheduledThreadPool;
6use schemars::JsonSchema;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::{path::Path, sync::Arc, time::Duration};
9
10static CONNECTION_TIMEOUT_MS: AtomicU64 = AtomicU64::new(3_000);
12
13const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(30);
14
15static R2D2_THREADPOOL: Lazy<Arc<ScheduledThreadPool>> = Lazy::new(|| {
16 let t = ScheduledThreadPool::new(1);
17 Arc::new(t)
18});
19
20pub type ConnectionPool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
21
22#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Default)]
26pub enum DbSyncLevel {
27 Full,
29 #[default]
31 Normal,
32 Off,
34}
35
36#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Default, JsonSchema)]
40pub enum DbSyncStrategy {
41 Fast,
45 #[default]
50 Resilient,
51}
52
53#[derive(Default, Debug, Clone)]
55pub struct PoolConfig {
56 pub synchronous_level: DbSyncLevel,
58
59 pub key: DbKey,
61}
62
63pub(super) fn new_connection_pool(path: Option<&Path>, config: PoolConfig) -> ConnectionPool {
64 use r2d2_sqlite::SqliteConnectionManager;
65 let manager = match path {
66 Some(path) => SqliteConnectionManager::file(path),
67 None => SqliteConnectionManager::memory(),
68 };
69 let customizer = Box::new(ConnCustomizer { config });
70
71 let max_cons = num_read_threads() * 2 + 1;
78
79 r2d2::Pool::builder()
80 .max_size(max_cons as u32)
82 .min_idle(Some(0))
84 .idle_timeout(Some(Duration::from_secs(30)))
86 .connection_timeout(Duration::from_millis(
87 CONNECTION_TIMEOUT_MS.load(Ordering::Acquire),
88 ))
89 .thread_pool(R2D2_THREADPOOL.clone())
90 .connection_customizer(customizer)
91 .build(manager)
92 .unwrap()
93}
94
95#[derive(Debug)]
96struct ConnCustomizer {
97 config: PoolConfig,
98}
99
100impl r2d2::CustomizeConnection<Connection, rusqlite::Error> for ConnCustomizer {
101 fn on_acquire(&self, conn: &mut Connection) -> Result<(), rusqlite::Error> {
102 initialize_connection(conn, &self.config)?;
103 Ok(())
104 }
105}
106
107pub(super) fn initialize_connection(conn: &mut Connection, config: &PoolConfig) -> Result<()> {
108 conn.busy_timeout(SQLITE_BUSY_TIMEOUT)?;
110
111 #[cfg(feature = "sqlite-encrypted")]
112 conn.execute_batch(&String::from_utf8_lossy(
113 &*config.key.unlocked.lock().unwrap().lock(),
114 ))?;
115
116 conn.pragma_update(None, "trusted_schema", false)?;
119
120 conn.pragma_update(None, "foreign_keys", "ON".to_string())?;
122
123 match config.synchronous_level {
124 DbSyncLevel::Full => conn.pragma_update(None, "synchronous", "2".to_string())?,
125 DbSyncLevel::Normal => conn.pragma_update(None, "synchronous", "1".to_string())?,
126 DbSyncLevel::Off => conn.pragma_update(None, "synchronous", "0".to_string())?,
127 }
128
129 vtab::array::load_module(conn)?;
130
131 Ok(())
132}
133
134pub fn num_read_threads() -> usize {
135 let num_cpus = num_cpus::get();
136 let num_threads = num_cpus.checked_div(2).unwrap_or(0);
137 std::cmp::max(num_threads, 4)
138}
139
140#[cfg(feature = "test_utils")]
141pub fn set_connection_timeout(timeout_ms: u64) {
142 CONNECTION_TIMEOUT_MS.store(timeout_ms, Ordering::Relaxed);
143}