matrix_sdk_sqlite/connection.rs
1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An implementation of `deadpool` for `rusqlite`.
16//!
17//! Initially, we were using `deadpool-sqlite`, that is also using `rusqlite` as
18//! the SQLite interface. However, in the implementation of
19//! [`deadpool::managed::Manager`], when recycling an object (i.e. an SQLite
20//! connection), [a SQL query is run to detect whether the connection is still
21//! alive][connection-test]. It creates performance issues:
22//!
23//! 1. It runs a prepared SQL query, which has a non-negligle cost. Imagine each
24//! connection is used to run on average one query; when recycled, a second
25//! query was constantly run. Even if it's a simple query, it requires to
26//! prepare a statement, to run and to query it.
27//! 2. The SQL query was run in a blocking task. Indeed,
28//! `deadpool_runtime::spawn_blocking` is used (via
29//! `deadpool_sync::SyncWrapper::interact`), which includes [blocking the
30//! thread, acquiring a lock][spawn_blocking] etc. All this has more
31//! performance cost.
32//!
33//! Measures have shown it is a performance bottleneck for us, especially on
34//! Android. Why specifically on Android and not other systems? This is still
35//! unclear at the time of writing (2025-11-11), despites having spent several
36//! days digging and trying to find an answer to this question.
37//!
38//! We have tried to use another approach to test the aliveness of the
39//! connections without running queries. It has involved patching `rusqlite` to
40//! add more bindings to SQLite, and patching `deadpool` itself, but without any
41//! successful results.
42//!
43//! Finally, we have started questioning the reason of this test: why testing
44//! whether the connection was still alive? After all, there is no reason a
45//! connection should die in our case:
46//!
47//! - all connections are local,
48//! - all interactions are behind [WAL], which is local only,
49//! - even if for an unknown reason the connection died, using it next time
50//! would create an error… exactly what would happen when recycling the
51//! connection.
52//!
53//! Consequently, we have created a new implementation of `deadpool` for
54//! `rusqlite` that doesn't test the aliveness of the connections when recycled.
55//! We assume they are all alive.
56//!
57//! This implementation is, at the time of writing (2025-11-11):
58//!
59//! - 3.5 times faster on Android than `deadpool-sqlite`, removing the lock and
60//! thread contention entirely,
61//! - 2 times faster on iOS.
62//!
63//! [connection-test]: https://github.com/deadpool-rs/deadpool/blob/d6f7d58756f0cc7bdd1f3d54d820c1332d67e4d5/crates/deadpool-sqlite/src/lib.rs#L80-L100
64//! [spawn_blocking]: https://github.com/deadpool-rs/deadpool/blob/d6f7d58756f0cc7bdd1f3d54d820c1332d67e4d5/crates/deadpool-sync/src/lib.rs#L113-L131
65//! [WAL]: https://www.sqlite.org/wal.html
66
67use std::{convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
68
69pub use deadpool::managed::reexports::*;
70use deadpool::managed::{self, Metrics, PoolConfig, RecycleError};
71use deadpool_sync::SyncWrapper;
72use tokio::sync::Mutex;
73use tracing::{info, warn};
74
75/// The default runtime used by `matrix-sdk-sqlite` for `deadpool`.
76pub const RUNTIME: Runtime = Runtime::Tokio1;
77
78deadpool::managed_reexports!(
79 "matrix-sdk-sqlite",
80 Manager,
81 managed::Object<Manager>,
82 rusqlite::Error,
83 Infallible
84);
85
86/// Type representing a connection to SQLite from the [`Pool`].
87pub type Connection = Object;
88
89/// [`Manager`][managed::Manager] for creating and recycling SQLite
90/// [`Connection`]s.
91#[derive(Debug)]
92pub struct Manager {
93 pub(crate) database_path: PathBuf,
94}
95
96impl Manager {
97 /// Creates a new [`Manager`] for a database.
98 #[must_use]
99 pub fn new(database_path: PathBuf) -> Self {
100 Self { database_path }
101 }
102}
103
104impl managed::Manager for Manager {
105 type Type = SyncWrapper<rusqlite::Connection>;
106 type Error = rusqlite::Error;
107
108 async fn create(&self) -> Result<Self::Type, Self::Error> {
109 let path = self.database_path.clone();
110 SyncWrapper::new(RUNTIME, move || rusqlite::Connection::open(path)).await
111 }
112
113 async fn recycle(
114 &self,
115 conn: &mut Self::Type,
116 _: &Metrics,
117 ) -> managed::RecycleResult<Self::Error> {
118 if conn.is_mutex_poisoned() {
119 return Err(RecycleError::Message(
120 "Mutex is poisoned. Connection is considered unusable.".into(),
121 ));
122 }
123
124 Ok(())
125 }
126}
127
128/// Gracefully close the store-owned write connection.
129///
130/// Callers are expected to remove the enclosing [`SqliteConnections`] from the
131/// store before calling this helper so no new write acquisitions can happen
132/// through the store API.
133///
134/// 1. Waits for any in-flight write to complete by acquiring the lock.
135/// 2. Runs a WAL checkpoint (TRUNCATE) to flush pending data to the main
136/// database file and release WAL locks.
137/// 3. Drops the store-owned `Arc` on a blocking thread.
138///
139/// This is still best effort: if another cloned `Arc` or an
140/// `OwnedMutexGuard<Connection>` is still alive elsewhere, the underlying
141/// SQLite connection may remain alive until that handle is dropped.
142pub async fn close_connection(write_connection: Arc<Mutex<Connection>>) {
143 // Acquire the lock to wait for any in-flight write to complete.
144 let guard = write_connection.lock().await;
145
146 // Flush WAL and release locks while we still own the connection.
147 let _ = guard
148 .interact(|raw| {
149 raw.execute_batch("PRAGMA locking_mode = NORMAL; PRAGMA wal_checkpoint(TRUNCATE);")
150 .ok();
151 })
152 .await;
153
154 drop(guard);
155
156 // Drop the store-owned Arc on a blocking thread.
157 let _ = tokio::task::spawn_blocking(move || drop(write_connection)).await;
158}
159
160/// Live database connections held by each SQLite store.
161///
162/// Wrapped in `Option<SqliteConnections>` guarded by a `Mutex` inside each
163/// store; `Some` means the store is active, `None` means it is closed.
164pub(crate) struct SqliteConnections {
165 /// The pool of read connections.
166 pub pool: Pool,
167 /// The dedicated write connection.
168 ///
169 /// This lives behind `Arc<Mutex<_>>` so stores can clone the `Arc` and
170 /// obtain an `OwnedMutexGuard<Connection>` without holding the outer
171 /// `connections` mutex across await points.
172 pub write_connection: Arc<Mutex<Connection>>,
173}
174
175/// Close a store by taking its connections out.
176///
177/// After this returns, any new call to `read()` or `write()` through the
178/// store will fail with [`crate::error::Error::StoreClosed`] until
179/// [`reopen_connections`] is called.
180///
181/// Idempotent: if the store is already closed this is a no-op.
182pub(crate) async fn close_connections(connections: &Mutex<Option<SqliteConnections>>, label: &str) {
183 let mut guard = connections.lock().await;
184 let Some(conns) = guard.take() else {
185 // Already closed — idempotent.
186 return;
187 };
188
189 let SqliteConnections { pool, write_connection } = conns;
190
191 // Close the pool. Idle read connections are dropped immediately;
192 // in-flight reads complete and their connections are discarded (not
193 // recycled) on release. New pool.get() calls return PoolError::Closed.
194 pool.close();
195
196 let status = pool.status();
197 info!(
198 size = status.size,
199 max_size = status.max_size,
200 available = status.available,
201 "{label} pause: pool closed"
202 );
203
204 // Close the write connection: wait for any in-flight write to finish,
205 // run a WAL checkpoint, then drop on a blocking thread.
206 close_connection(write_connection).await;
207
208 let status = pool.status();
209 info!(
210 size = status.size,
211 max_size = status.max_size,
212 available = status.available,
213 "{label} pause: write connection released"
214 );
215
216 // Wait for any in-flight read connections to drain.
217 // The write connection has already been released above, so
218 // pool.status().size == 0 now correctly means every connection is gone
219 // and no SQLite file locks are held.
220 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
221 while pool.status().size > 0 {
222 if tokio::time::Instant::now() >= deadline {
223 let status = pool.status();
224 warn!(
225 size = status.size,
226 max_size = status.max_size,
227 available = status.available,
228 "Timed out waiting for SQLite pool connections to drain"
229 );
230 break;
231 }
232 tokio::time::sleep(Duration::from_millis(50)).await;
233 }
234}
235
236/// Resume a store by rebuilding its connections.
237///
238/// Idempotent: if the store is already active this is a no-op.
239pub(crate) async fn reopen_connections(
240 connections: &Mutex<Option<SqliteConnections>>,
241 db_path: PathBuf,
242 pool_config: PoolConfig,
243 runtime_config: crate::RuntimeConfig,
244) -> crate::error::Result<()> {
245 use crate::utils::SqliteAsyncConnExt as _;
246
247 let mut guard = connections.lock().await;
248 if guard.is_some() {
249 // Not closed — idempotent.
250 return Ok(());
251 }
252
253 // Rebuild the pool (connections are created lazily on first get()).
254 let pool =
255 Pool::builder(Manager::new(db_path)).config(pool_config).runtime(RUNTIME).build().map_err(
256 |e| crate::error::Error::InvalidData {
257 details: format!("Failed to rebuild connection pool: {e}"),
258 },
259 )?;
260
261 let write_conn = pool.get().await?;
262 // Re-apply runtime config (WAL mode, busy timeout, etc.)
263 write_conn.apply_runtime_config(runtime_config).await?;
264
265 *guard = Some(SqliteConnections { pool, write_connection: Arc::new(Mutex::new(write_conn)) });
266
267 Ok(())
268}