use std::{convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
pub use deadpool::managed::reexports::*;
use deadpool::managed::{self, Metrics, PoolConfig, RecycleError};
use deadpool_sync::SyncWrapper;
use tokio::sync::Mutex;
use tracing::{info, warn};
/// Async runtime flavour used by this module.
///
/// It is handed to `SyncWrapper::new` when a connection is opened and to the
/// pool builder when a pool is rebuilt.
pub const RUNTIME: Runtime = Runtime::Tokio1;
// Invokes deadpool's `managed_reexports!` with the crate name, this module's
// `Manager`, the pooled object type, the backend error type and `Infallible`.
// NOTE(review): presumably this expansion is what defines the `Pool` and
// `Object` aliases used further down in this file — confirm against the
// deadpool documentation.
deadpool::managed_reexports!(
"matrix-sdk-sqlite",
Manager,
managed::Object<Manager>,
rusqlite::Error,
Infallible
);
/// Name under which the rest of this module refers to an `Object` checked out
/// of the pool.
pub type Connection = Object;
/// Pool manager bound to a single SQLite database file.
///
/// The manager only remembers where the database lives; its
/// `managed::Manager` implementation opens connections at that path.
#[derive(Debug)]
pub struct Manager {
    /// Filesystem location of the SQLite database.
    pub(crate) database_path: PathBuf,
}

impl Manager {
    /// Builds a manager for the database stored at `database_path`.
    ///
    /// Nothing is opened or validated here; the path is simply stored.
    #[must_use]
    pub fn new(database_path: PathBuf) -> Self {
        Manager { database_path }
    }
}
/// Pool integration: how connections are created and vetted for reuse.
impl managed::Manager for Manager {
    type Type = SyncWrapper<rusqlite::Connection>;
    type Error = rusqlite::Error;

    /// Opens a new connection to the configured database path.
    ///
    /// The path is cloned because the opening closure must own it; the
    /// closure is handed to `SyncWrapper::new` together with [`RUNTIME`]
    /// instead of being executed inline.
    ///
    /// # Errors
    ///
    /// Returns whatever error `SyncWrapper::new` reports for the open call.
    async fn create(&self) -> Result<Self::Type, Self::Error> {
        let database_path = self.database_path.clone();
        let open = move || rusqlite::Connection::open(database_path);
        SyncWrapper::new(RUNTIME, open).await
    }

    /// Decides whether a returned connection may be reused.
    ///
    /// The only check performed is whether the wrapper's mutex is poisoned.
    ///
    /// # Errors
    ///
    /// Returns `RecycleError::Message` when the mutex is poisoned.
    async fn recycle(
        &self,
        conn: &mut Self::Type,
        _: &Metrics,
    ) -> managed::RecycleResult<Self::Error> {
        if !conn.is_mutex_poisoned() {
            return Ok(());
        }
        Err(RecycleError::Message("Mutex is poisoned. Connection is considered unusable.".into()))
    }
}
/// Best-effort shutdown of the dedicated write connection.
///
/// While holding the connection's async lock, a batch of two pragmas is run on
/// the raw connection: the locking mode is set to `NORMAL` and a
/// `TRUNCATE` WAL checkpoint is requested. Failures of the batch itself and of
/// the `interact` call are both deliberately discarded.
///
/// After the lock is released, this function's handle to the connection is
/// dropped inside `spawn_blocking`; the join result is ignored as well.
/// NOTE(review): presumably the drop is moved to a blocking task because
/// releasing the last handle may close the SQLite connection — confirm.
pub async fn close_connection(write_connection: Arc<Mutex<Connection>>) {
    {
        // The lock guard lives only for this scope, so it is released before
        // the handle is dropped below.
        let conn = write_connection.lock().await;
        let checkpoint = conn.interact(|raw| {
            let _ = raw
                .execute_batch("PRAGMA locking_mode = NORMAL; PRAGMA wal_checkpoint(TRUNCATE);");
        });
        let _ = checkpoint.await;
    }

    let _ = tokio::task::spawn_blocking(move || drop(write_connection)).await;
}
/// The live connection state of a store: a pool plus one dedicated write
/// connection guarded by an async mutex.
pub(crate) struct SqliteConnections {
    /// Connection pool.
    pub pool: Pool,
    /// Shared handle to the write connection; access is serialized through
    /// the mutex.
    pub write_connection: Arc<Mutex<Connection>>,
}
/// Tears down the connections stored in `connections`, if any.
///
/// The stored value is taken out (leaving `None` behind), the pool is closed,
/// the write connection is shut down via [`close_connection`], and then the
/// pool's reported size is polled every 50 ms until it reaches zero or five
/// seconds have elapsed. A warning is logged if the wait times out.
///
/// The lock on `connections` is held for the whole duration of the call.
/// `label` only appears as a prefix in the two informational log lines.
pub(crate) async fn close_connections(
    connections: &Mutex<Option<SqliteConnections>>,
    label: &str,
) {
    let mut slot = connections.lock().await;
    let SqliteConnections { pool, write_connection } = match slot.take() {
        Some(open) => open,
        // Nothing is open, so there is nothing to close.
        None => return,
    };

    pool.close();
    let after_close = pool.status();
    info!(
        size = after_close.size,
        max_size = after_close.max_size,
        available = after_close.available,
        "{label} pause: pool closed"
    );

    close_connection(write_connection).await;
    let after_release = pool.status();
    info!(
        size = after_release.size,
        max_size = after_release.max_size,
        available = after_release.available,
        "{label} pause: write connection released"
    );

    // Wait, with an upper bound, for the pool to report that no connections
    // remain.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    loop {
        if pool.status().size == 0 {
            break;
        }
        if tokio::time::Instant::now() >= deadline {
            let leftover = pool.status();
            warn!(
                size = leftover.size,
                max_size = leftover.max_size,
                available = leftover.available,
                "Timed out waiting for SQLite pool connections to drain"
            );
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
pub(crate) async fn reopen_connections(
connections: &Mutex<Option<SqliteConnections>>,
db_path: PathBuf,
pool_config: PoolConfig,
runtime_config: crate::RuntimeConfig,
) -> crate::error::Result<()> {
use crate::utils::SqliteAsyncConnExt as _;
let mut guard = connections.lock().await;
if guard.is_some() {
return Ok(());
}
let pool =
Pool::builder(Manager::new(db_path)).config(pool_config).runtime(RUNTIME).build().map_err(
|e| crate::error::Error::InvalidData {
details: format!("Failed to rebuild connection pool: {e}"),
},
)?;
let write_conn = pool.get().await?;
write_conn.apply_runtime_config(runtime_config).await?;
*guard = Some(SqliteConnections { pool, write_connection: Arc::new(Mutex::new(write_conn)) });
Ok(())
}