use anyhow::{Context, Result};
use deadpool_sqlite::{Config, Hook, InteractError, Pool as DeadPool, Runtime};
use rusqlite_migration::{M, Migrations};
use std::path::Path;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
static DB_INTEGRITY_FAILED: AtomicBool = AtomicBool::new(false);
static GLOBAL_POOL: OnceLock<Pool> = OnceLock::new();
pub fn global_pool() -> Option<&'static Pool> {
GLOBAL_POOL.get()
}
pub fn db_integrity_failed() -> bool {
DB_INTEGRITY_FAILED.swap(false, std::sync::atomic::Ordering::Relaxed)
}
pub type Pool = DeadPool;
pub fn interact_err(e: InteractError) -> anyhow::Error {
anyhow::anyhow!("Database interact error: {}", e)
}
pub struct Database {
pub(crate) pool: Pool,
}
fn apply_pragmas(conn: &rusqlite::Connection) -> std::result::Result<(), rusqlite::Error> {
conn.execute_batch(
"PRAGMA journal_mode = WAL;
PRAGMA busy_timeout = 30000;
PRAGMA synchronous = NORMAL;
PRAGMA cache_size = -65536;",
)
}
fn apply_pragmas_in_memory(
conn: &rusqlite::Connection,
) -> std::result::Result<(), rusqlite::Error> {
conn.execute_batch(
"PRAGMA journal_mode = MEMORY;
PRAGMA busy_timeout = 30000;
PRAGMA synchronous = OFF;
PRAGMA foreign_keys = ON;",
)
}
impl Database {
pub async fn connect<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
if let Some(parent) = path.parent()
&& !parent.exists()
{
tracing::debug!("Creating database directory: {:?}", parent);
std::fs::create_dir_all(parent)
.with_context(|| format!("Failed to create database directory: {:?}", parent))?;
}
let path_str = path.to_string_lossy().into_owned();
let pool = Config::new(&path_str)
.builder(Runtime::Tokio1)
.context("Failed to build pool config")?
.max_size(16)
.post_create(Hook::async_fn(|conn, _| {
Box::pin(async move {
conn.interact(|conn| apply_pragmas(conn))
.await
.map_err(|e| deadpool_sqlite::HookError::Message(e.to_string().into()))?
.map_err(|e| deadpool_sqlite::HookError::Message(e.to_string().into()))?;
Ok(())
})
}))
.build()
.context("Failed to create connection pool")?;
tracing::info!(
"Connected to database: {} (WAL, pool=16, busy_timeout=30s)",
path_str
);
let _ = GLOBAL_POOL.set(pool.clone());
Ok(Self { pool })
}
pub async fn connect_in_memory() -> Result<Self> {
let id = uuid::Uuid::new_v4().simple().to_string();
let uri = format!("file:mem_{}?mode=memory&cache=shared", id);
let pool = Config::new(uri)
.builder(Runtime::Tokio1)
.context("Failed to build pool config")?
.max_size(1)
.post_create(Hook::async_fn(|conn, _| {
Box::pin(async move {
conn.interact(|conn| apply_pragmas_in_memory(conn))
.await
.map_err(|e| deadpool_sqlite::HookError::Message(e.to_string().into()))?
.map_err(|e| deadpool_sqlite::HookError::Message(e.to_string().into()))?;
Ok(())
})
}))
.build()
.context("Failed to create in-memory pool")?;
tracing::debug!("Connected to in-memory database");
Ok(Self { pool })
}
pub fn pool(&self) -> &Pool {
&self.pool
}
pub fn is_connected(&self) -> bool {
self.pool.status().size > 0 || self.pool.status().max_size > 0
}
pub const MIGRATION_COUNT: usize = 28;
pub async fn run_migrations(&self) -> Result<()> {
let migrations = Migrations::new(vec![
M::up(include_str!(
"../migrations/20251028000001_initial_schema.sql"
)),
M::up(include_str!(
"../migrations/20251028000002_modernize_schema.sql"
)),
M::up(include_str!("../migrations/20251111000001_add_plans.sql")),
M::up(include_str!(
"../migrations/20251113000001_add_plan_enhancements.sql"
)),
M::up(include_str!(
"../migrations/20260224000001_add_a2a_tasks.sql"
)),
M::up(include_str!(
"../migrations/20260226000001_add_session_provider.sql"
)),
M::up(include_str!(
"../migrations/20260305000001_add_channel_messages.sql"
)),
M::up(include_str!(
"../migrations/20260305000002_add_cron_jobs.sql"
)),
M::up(include_str!(
"../migrations/20260306000001_add_usage_ledger.sql"
)),
M::up(include_str!(
"../migrations/20260307000001_add_session_working_dir.sql"
)),
M::up(include_str!(
"../migrations/20260308000001_add_pending_requests.sql"
)),
M::up(include_str!(
"../migrations/20260330000001_pending_requests_channel_chat_id.sql"
)),
M::up(include_str!(
"../migrations/20260402000001_add_cron_job_runs.sql"
)),
M::up(include_str!(
"../migrations/20260412000001_add_feedback_ledger.sql"
)),
M::up(include_str!(
"../migrations/20260415000001_add_tool_executions.sql"
)),
M::up(include_str!(
"../migrations/20260415000002_add_session_category.sql"
)),
M::up(include_str!(
"../migrations/20260415000003_fix_tool_executions_schema.sql"
)),
M::up(include_str!(
"../migrations/20260416000001_add_message_input_tokens.sql"
)),
M::up(include_str!(
"../migrations/20260421000001_add_message_thinking.sql"
)),
M::up(include_str!(
"../migrations/20260426000001_add_recent_paths.sql"
)),
M::up(include_str!(
"../migrations/20260507000001_add_cron_deliver_api_key.sql"
)),
M::up(include_str!(
"../migrations/20260517000001_cron_jobs_text_recast.sql"
)),
M::up(include_str!(
"../migrations/20260522000001_add_auto_title_attempted.sql"
)),
M::up(include_str!(
"../migrations/20260529000001_add_channel_thread_id.sql"
)),
M::up(include_str!(
"../migrations/20260606000001_add_message_cache_tokens.sql"
)),
M::up(include_str!(
"../migrations/20260608000001_add_cron_job_profile.sql"
)),
M::up(include_str!(
"../migrations/20260614000001_add_projects_and_file_size.sql"
)),
M::up(include_str!(
"../migrations/20260626000001_add_goal_state.sql"
)),
]);
self.pool
.get()
.await
.context("Failed to get connection for migrations")?
.interact(move |conn| {
let user_version: i64 =
conn.pragma_query_value(None, "user_version", |r| r.get(0))?;
let has_sqlx: bool = conn
.prepare(
"SELECT COUNT(*) FROM sqlite_master \
WHERE type='table' AND name='_sqlx_migrations'",
)?
.query_row([], |r| r.get::<_, i64>(0))
.map(|c| c > 0)?;
if has_sqlx && user_version == 0 {
tracing::info!(
"Detected sqlx-managed database — stamping migration version to {}",
Self::MIGRATION_COUNT
);
conn.pragma_update(None, "user_version", Self::MIGRATION_COUNT as i64)?;
}
migrations.to_latest(conn)
})
.await
.map_err(interact_err)?
.context("Failed to run database migrations")?;
tracing::info!("Database migrations completed");
let integrity_ok = self
.pool
.get()
.await
.context("Failed to get connection for integrity check")?
.interact(|conn| -> rusqlite::Result<bool> {
let result: String =
conn.pragma_query_value(None, "integrity_check", |r| r.get(0))?;
Ok(result == "ok")
})
.await
.map_err(interact_err)?
.context("Failed to run integrity check")?;
if !integrity_ok {
tracing::error!(
"Database integrity check FAILED — data may be corrupted. \
Consider backing up and recreating the database."
);
DB_INTEGRITY_FAILED.store(true, std::sync::atomic::Ordering::Relaxed);
} else {
tracing::debug!("Database integrity check passed");
}
Ok(())
}
pub fn close(&self) {
self.pool.close();
tracing::info!("Database connection closed");
}
pub async fn get_user_version(&self) -> Result<i64> {
let version = self
.pool
.get()
.await
.context("Failed to get connection for user_version")?
.interact(|conn| conn.pragma_query_value(None, "user_version", |r| r.get(0)))
.await
.map_err(interact_err)?
.context("Failed to read user_version")?;
Ok(version)
}
}
pub trait PoolExt {
fn is_connected(&self) -> bool;
}
impl PoolExt for Pool {
fn is_connected(&self) -> bool {
self.status().size > 0 || self.status().max_size > 0
}
}