Skip to main content

matrix_sdk_sqlite/
connection.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An implementation of `deadpool` for `rusqlite`.
16//!
17//! Initially, we were using `deadpool-sqlite`, that is also using `rusqlite` as
18//! the SQLite interface. However, in the implementation of
19//! [`deadpool::managed::Manager`], when recycling an object (i.e. an SQLite
20//! connection), [a SQL query is run to detect whether the connection is still
21//! alive][connection-test]. It creates performance issues:
22//!
//! 1. It runs a prepared SQL query, which has a non-negligible cost. Imagine
//!    each connection is used to run one query on average; when recycled, a
//!    second query is systematically run. Even if it's a simple query, it
//!    requires preparing a statement, then running and querying it.
27//! 2. The SQL query was run in a blocking task. Indeed,
28//!    `deadpool_runtime::spawn_blocking` is used (via
29//!    `deadpool_sync::SyncWrapper::interact`), which includes [blocking the
30//!    thread, acquiring a lock][spawn_blocking] etc. All this has more
31//!    performance cost.
32//!
//! Measurements have shown it is a performance bottleneck for us, especially on
//! Android. Why specifically on Android and not other systems? This is still
//! unclear at the time of writing (2025-11-11), despite having spent several
//! days digging and trying to find an answer to this question.
37//!
38//! We have tried to use another approach to test the aliveness of the
39//! connections without running queries. It has involved patching `rusqlite` to
40//! add more bindings to SQLite, and patching `deadpool` itself, but without any
41//! successful results.
42//!
//! Finally, we started questioning the reason for this test: why test
//! whether the connection is still alive? After all, there is no reason a
//! connection should die in our case:
46//!
47//! - all connections are local,
48//! - all interactions are behind [WAL], which is local only,
49//! - even if for an unknown reason the connection died, using it next time
50//!   would create an error… exactly what would happen when recycling the
51//!   connection.
52//!
53//! Consequently, we have created a new implementation of `deadpool` for
54//! `rusqlite` that doesn't test the aliveness of the connections when recycled.
55//! We assume they are all alive.
56//!
57//! This implementation is, at the time of writing (2025-11-11):
58//!
59//! - 3.5 times faster on Android than `deadpool-sqlite`, removing the lock and
60//!   thread contention entirely,
61//! - 2 times faster on iOS.
62//!
63//! [connection-test]: https://github.com/deadpool-rs/deadpool/blob/d6f7d58756f0cc7bdd1f3d54d820c1332d67e4d5/crates/deadpool-sqlite/src/lib.rs#L80-L100
64//! [spawn_blocking]: https://github.com/deadpool-rs/deadpool/blob/d6f7d58756f0cc7bdd1f3d54d820c1332d67e4d5/crates/deadpool-sync/src/lib.rs#L113-L131
65//! [WAL]: https://www.sqlite.org/wal.html
66
67use std::{convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
68
69pub use deadpool::managed::reexports::*;
70use deadpool::managed::{self, Metrics, PoolConfig, RecycleError};
71use deadpool_sync::SyncWrapper;
72use tokio::sync::Mutex;
73use tracing::{info, warn};
74
/// The default runtime used by `matrix-sdk-sqlite` for `deadpool`.
///
/// In this file it is handed to `SyncWrapper::new` when a connection is
/// created and to the pool builder when a pool is rebuilt.
pub const RUNTIME: Runtime = Runtime::Tokio1;

// Instantiates deadpool's re-export macro for this crate's `Manager`. The
// unqualified `Pool` and `Object` names used further down presumably come from
// this invocation (see the `deadpool::managed_reexports!` documentation).
deadpool::managed_reexports!(
    "matrix-sdk-sqlite",
    Manager,
    managed::Object<Manager>,
    rusqlite::Error,
    Infallible
);

/// Type representing a connection to SQLite from the [`Pool`].
///
/// This is an alias of `Object`; no new type is introduced.
pub type Connection = Object;
88
/// A `deadpool` [`Manager`][managed::Manager] that creates and recycles
/// SQLite [`Connection`]s for a single database file.
#[derive(Debug)]
pub struct Manager {
    /// Path of the SQLite database that new connections are opened on.
    pub(crate) database_path: PathBuf,
}

impl Manager {
    /// Builds a [`Manager`] whose connections target `database_path`.
    ///
    /// The path is only stored; no connection is opened by this call.
    #[must_use]
    pub fn new(database_path: PathBuf) -> Self {
        Manager { database_path }
    }
}
103
104impl managed::Manager for Manager {
105    type Type = SyncWrapper<rusqlite::Connection>;
106    type Error = rusqlite::Error;
107
108    async fn create(&self) -> Result<Self::Type, Self::Error> {
109        let path = self.database_path.clone();
110        SyncWrapper::new(RUNTIME, move || rusqlite::Connection::open(path)).await
111    }
112
113    async fn recycle(
114        &self,
115        conn: &mut Self::Type,
116        _: &Metrics,
117    ) -> managed::RecycleResult<Self::Error> {
118        if conn.is_mutex_poisoned() {
119            return Err(RecycleError::Message(
120                "Mutex is poisoned. Connection is considered unusable.".into(),
121            ));
122        }
123
124        Ok(())
125    }
126}
127
128/// Gracefully close the store-owned write connection.
129///
130/// Callers are expected to remove the enclosing [`SqliteConnections`] from the
131/// store before calling this helper so no new write acquisitions can happen
132/// through the store API.
133///
134/// 1. Waits for any in-flight write to complete by acquiring the lock.
135/// 2. Runs a WAL checkpoint (TRUNCATE) to flush pending data to the main
136///    database file and release WAL locks.
137/// 3. Drops the store-owned `Arc` on a blocking thread.
138///
139/// This is still best effort: if another cloned `Arc` or an
140/// `OwnedMutexGuard<Connection>` is still alive elsewhere, the underlying
141/// SQLite connection may remain alive until that handle is dropped.
142pub async fn close_connection(write_connection: Arc<Mutex<Connection>>) {
143    // Acquire the lock to wait for any in-flight write to complete.
144    let guard = write_connection.lock().await;
145
146    // Flush WAL and release locks while we still own the connection.
147    let _ = guard
148        .interact(|raw| {
149            raw.execute_batch("PRAGMA locking_mode = NORMAL; PRAGMA wal_checkpoint(TRUNCATE);")
150                .ok();
151        })
152        .await;
153
154    drop(guard);
155
156    // Drop the store-owned Arc on a blocking thread.
157    let _ = tokio::task::spawn_blocking(move || drop(write_connection)).await;
158}
159
/// Live database connections held by each SQLite store.
///
/// Wrapped in `Option<SqliteConnections>` guarded by a `Mutex` inside each
/// store; `Some` means the store is active, `None` means it is closed.
///
/// `close_connections` takes the value out of that `Option` and
/// `reopen_connections` puts a freshly built one back.
pub(crate) struct SqliteConnections {
    /// The pool of read connections.
    ///
    /// `reopen_connections` also obtains the write connection from this pool.
    pub pool: Pool,
    /// The dedicated write connection.
    ///
    /// This lives behind `Arc<Mutex<_>>` so stores can clone the `Arc` and
    /// obtain an `OwnedMutexGuard<Connection>` without holding the outer
    /// `connections` mutex across await points.
    pub write_connection: Arc<Mutex<Connection>>,
}
174
175/// Close a store by taking its connections out.
176///
177/// After this returns, any new call to `read()` or `write()` through the
178/// store will fail with [`crate::error::Error::StoreClosed`] until
179/// [`reopen_connections`] is called.
180///
181/// Idempotent: if the store is already closed this is a no-op.
182pub(crate) async fn close_connections(connections: &Mutex<Option<SqliteConnections>>, label: &str) {
183    let mut guard = connections.lock().await;
184    let Some(conns) = guard.take() else {
185        // Already closed — idempotent.
186        return;
187    };
188
189    let SqliteConnections { pool, write_connection } = conns;
190
191    // Close the pool. Idle read connections are dropped immediately;
192    // in-flight reads complete and their connections are discarded (not
193    // recycled) on release. New pool.get() calls return PoolError::Closed.
194    pool.close();
195
196    let status = pool.status();
197    info!(
198        size = status.size,
199        max_size = status.max_size,
200        available = status.available,
201        "{label} pause: pool closed"
202    );
203
204    // Close the write connection: wait for any in-flight write to finish,
205    // run a WAL checkpoint, then drop on a blocking thread.
206    close_connection(write_connection).await;
207
208    let status = pool.status();
209    info!(
210        size = status.size,
211        max_size = status.max_size,
212        available = status.available,
213        "{label} pause: write connection released"
214    );
215
216    // Wait for any in-flight read connections to drain.
217    // The write connection has already been released above, so
218    // pool.status().size == 0 now correctly means every connection is gone
219    // and no SQLite file locks are held.
220    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
221    while pool.status().size > 0 {
222        if tokio::time::Instant::now() >= deadline {
223            let status = pool.status();
224            warn!(
225                size = status.size,
226                max_size = status.max_size,
227                available = status.available,
228                "Timed out waiting for SQLite pool connections to drain"
229            );
230            break;
231        }
232        tokio::time::sleep(Duration::from_millis(50)).await;
233    }
234}
235
236/// Resume a store by rebuilding its connections.
237///
238/// Idempotent: if the store is already active this is a no-op.
239pub(crate) async fn reopen_connections(
240    connections: &Mutex<Option<SqliteConnections>>,
241    db_path: PathBuf,
242    pool_config: PoolConfig,
243    runtime_config: crate::RuntimeConfig,
244) -> crate::error::Result<()> {
245    use crate::utils::SqliteAsyncConnExt as _;
246
247    let mut guard = connections.lock().await;
248    if guard.is_some() {
249        // Not closed — idempotent.
250        return Ok(());
251    }
252
253    // Rebuild the pool (connections are created lazily on first get()).
254    let pool =
255        Pool::builder(Manager::new(db_path)).config(pool_config).runtime(RUNTIME).build().map_err(
256            |e| crate::error::Error::InvalidData {
257                details: format!("Failed to rebuild connection pool: {e}"),
258            },
259        )?;
260
261    let write_conn = pool.get().await?;
262    // Re-apply runtime config (WAL mode, busy timeout, etc.)
263    write_conn.apply_runtime_config(runtime_config).await?;
264
265    *guard = Some(SqliteConnections { pool, write_connection: Arc::new(Mutex::new(write_conn)) });
266
267    Ok(())
268}