holochain_sqlite/db/
pool.rs

1use crate::db::key::DbKey;
2use holochain_serialized_bytes::prelude::*;
3use once_cell::sync::Lazy;
4use rusqlite::*;
5use scheduled_thread_pool::ScheduledThreadPool;
6use schemars::JsonSchema;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::{path::Path, sync::Arc, time::Duration};
9
10// Should never be getting a connection from the pool when one isn't available so this can be set low
11static CONNECTION_TIMEOUT_MS: AtomicU64 = AtomicU64::new(3_000);
12
13const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(30);
14
15static R2D2_THREADPOOL: Lazy<Arc<ScheduledThreadPool>> = Lazy::new(|| {
16    let t = ScheduledThreadPool::new(1);
17    Arc::new(t)
18});
19
20pub type ConnectionPool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
21
22/// The sqlite synchronous level.
23/// Corresponds to the `PRAGMA synchronous` pragma.
24/// See [sqlite documentation](https://www.sqlite.org/pragma.html#pragma_synchronous).
25#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Default)]
26pub enum DbSyncLevel {
27    /// Use xSync for all writes. Not needed for WAL mode.
28    Full,
29    /// Sync at critical moments. Default.
30    #[default]
31    Normal,
32    /// Syncing is left to the operating system and power loss could result in corrupted database.
33    Off,
34}
35
36/// The strategy for database file system synchronization.
37/// Some databases like the cache can be safely rebuilt if
38/// corruption occurs due to using the faster [`DbSyncLevel::Off`].
39#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Default, JsonSchema)]
40pub enum DbSyncStrategy {
41    /// Allows databases that can be wiped and rebuilt to
42    /// use the faster [`DbSyncLevel::Off`].
43    /// This is the default.
44    Fast,
45    /// Makes all databases use at least [`DbSyncLevel::Normal`].
46    /// This is probably not needed unless you have an SSD and
47    /// would prefer to lower the chances of databases needing to
48    /// be rebuilt.
49    #[default]
50    Resilient,
51}
52
53/// Configuration for holochain_sqlite ConnectionPool.
54#[derive(Default, Debug, Clone)]
55pub struct PoolConfig {
56    /// The sqlite synchronous level.
57    pub synchronous_level: DbSyncLevel,
58
59    /// The key with which to encrypt this database.
60    pub key: DbKey,
61}
62
63pub(super) fn new_connection_pool(path: Option<&Path>, config: PoolConfig) -> ConnectionPool {
64    use r2d2_sqlite::SqliteConnectionManager;
65    let manager = match path {
66        Some(path) => SqliteConnectionManager::file(path),
67        None => SqliteConnectionManager::memory(),
68    };
69    let customizer = Box::new(ConnCustomizer { config });
70
71    /*
72     * We want
73     * - num_read_threads connections for standard read limit
74     * - num_read_threads for use in long running read transactions, to allow the normal pool to continue to be used
75     * - 1 connection for writing
76     */
77    let max_cons = num_read_threads() * 2 + 1;
78
79    r2d2::Pool::builder()
80        // Only up to 20 connections at a time
81        .max_size(max_cons as u32)
82        // Never maintain idle connections
83        .min_idle(Some(0))
84        // Close connections after 30-60 seconds of idle time
85        .idle_timeout(Some(Duration::from_secs(30)))
86        .connection_timeout(Duration::from_millis(
87            CONNECTION_TIMEOUT_MS.load(Ordering::Acquire),
88        ))
89        .thread_pool(R2D2_THREADPOOL.clone())
90        .connection_customizer(customizer)
91        .build(manager)
92        .unwrap()
93}
94
95#[derive(Debug)]
96struct ConnCustomizer {
97    config: PoolConfig,
98}
99
100impl r2d2::CustomizeConnection<Connection, rusqlite::Error> for ConnCustomizer {
101    fn on_acquire(&self, conn: &mut Connection) -> Result<(), rusqlite::Error> {
102        initialize_connection(conn, &self.config)?;
103        Ok(())
104    }
105}
106
107pub(super) fn initialize_connection(conn: &mut Connection, config: &PoolConfig) -> Result<()> {
108    // Tell SQLite to wait this long during write contention.
109    conn.busy_timeout(SQLITE_BUSY_TIMEOUT)?;
110
111    #[cfg(feature = "sqlite-encrypted")]
112    conn.execute_batch(&String::from_utf8_lossy(
113        &*config.key.unlocked.lock().unwrap().lock(),
114    ))?;
115
116    // this is recommended to always be off:
117    // https://sqlite.org/pragma.html#pragma_trusted_schema
118    conn.pragma_update(None, "trusted_schema", false)?;
119
120    // enable foreign key support
121    conn.pragma_update(None, "foreign_keys", "ON".to_string())?;
122
123    match config.synchronous_level {
124        DbSyncLevel::Full => conn.pragma_update(None, "synchronous", "2".to_string())?,
125        DbSyncLevel::Normal => conn.pragma_update(None, "synchronous", "1".to_string())?,
126        DbSyncLevel::Off => conn.pragma_update(None, "synchronous", "0".to_string())?,
127    }
128
129    vtab::array::load_module(conn)?;
130
131    Ok(())
132}
133
134pub fn num_read_threads() -> usize {
135    let num_cpus = num_cpus::get();
136    let num_threads = num_cpus.checked_div(2).unwrap_or(0);
137    std::cmp::max(num_threads, 4)
138}
139
/// Overrides the connection timeout, in milliseconds, used by pools built
/// after this call. Only available with the `test_utils` feature.
#[cfg(feature = "test_utils")]
pub fn set_connection_timeout(timeout_ms: u64) {
    // `Release` pairs with the `Acquire` load performed when a pool is built.
    CONNECTION_TIMEOUT_MS.store(timeout_ms, Ordering::Release);
}
143}