// bones_core/db/mod.rs

//! `SQLite` projection database utilities.
//!
//! Runtime defaults are intentionally conservative:
//! - `journal_mode = WAL` to allow concurrent readers while a writer appends
//! - `synchronous = NORMAL` to avoid a sync on every commit (in WAL mode this
//!   may lose the most recent commits on power loss but does not corrupt the
//!   database)
//! - `busy_timeout = 5s` to reduce transient lock failures under contention
//! - `foreign_keys = ON` to protect relational integrity in projection tables
7
8pub mod fts;
9pub mod incremental;
10pub mod migrations;
11pub mod project;
12pub mod query;
13pub mod rebuild;
14pub mod schema;
15
16use anyhow::{Context, Result};
17use rusqlite::Connection;
18use std::{path::Path, time::Duration};
19use tracing::debug;
20
/// How long a projection connection waits on a locked database before
/// giving up with a busy error.
///
/// Applied to every connection returned by [`open_projection`].
pub const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
23
24/// Open (or create) the projection `SQLite` database, apply runtime pragmas,
25/// and migrate schema to the latest version.
26///
27/// # Errors
28///
29/// Returns an error if opening/configuring/migrating the database fails.
30pub fn open_projection(path: &Path) -> Result<Connection> {
31    if let Some(parent) = path.parent() {
32        std::fs::create_dir_all(parent)
33            .with_context(|| format!("create projection db directory {}", parent.display()))?;
34    }
35
36    if let Err(err) = bones_sqlite_vec::register_auto_extension() {
37        debug!(%err, "sqlite-vec auto-extension unavailable");
38    }
39
40    let mut conn = Connection::open(path)
41        .with_context(|| format!("open projection database {}", path.display()))?;
42
43    configure_connection(&conn).context("configure sqlite pragmas")?;
44    migrations::migrate(&mut conn).context("apply projection migrations")?;
45
46    Ok(conn)
47}
48
49/// Ensure the projection database exists and is up-to-date.
50///
51/// If the database is missing, corrupt, or behind the event log, an
52/// incremental apply is triggered automatically. Returns `None` only if
53/// the events directory itself does not exist (no bones project).
54///
55/// This is the recommended entry point for read commands — it eliminates
56/// the need for users to run `bn admin rebuild` manually.
57///
58/// # Arguments
59///
60/// * `bones_dir` — Path to the `.bones/` directory.
61///
62/// # Errors
63///
64/// Returns an error if the rebuild or database open fails.
65pub fn ensure_projection(bones_dir: &Path) -> Result<Option<Connection>> {
66    let events_dir = bones_dir.join("events");
67    if !events_dir.is_dir() {
68        return Ok(None);
69    }
70
71    let db_path = bones_dir.join("bones.db");
72
73    // Try opening existing projection (raw to avoid recursion).
74    let needs_rebuild = query::try_open_projection_raw(&db_path)?.is_none_or(|conn| {
75        // Check if projection is current by comparing cursor against
76        // shard content. If cursor is at 0 with no hash, the DB was
77        // freshly created and needs a full rebuild.
78        let (offset, hash) = query::get_projection_cursor(&conn).unwrap_or((0, None));
79        if offset == 0 && hash.is_none() {
80            true
81        } else {
82            // Check if cursor and shard content are out of sync (new events beyond cursor, or cursor overshoots after sync).
83            let mgr = crate::shard::ShardManager::new(bones_dir);
84            let total_bytes = mgr.total_content_len().unwrap_or(0);
85            let cursor = usize::try_from(offset).unwrap_or(0);
86            total_bytes != cursor
87        }
88    });
89
90    if needs_rebuild {
91        debug!("projection stale or missing, running incremental rebuild");
92        incremental::incremental_apply(&events_dir, &db_path, false)
93            .context("auto-rebuild projection")?;
94    }
95
96    // Re-open after potential rebuild (raw to avoid recursion).
97    query::try_open_projection_raw(&db_path)
98}
99
100fn configure_connection(conn: &Connection) -> anyhow::Result<()> {
101    conn.pragma_update(None, "foreign_keys", "ON")
102        .context("PRAGMA foreign_keys = ON")?;
103    conn.pragma_update(None, "synchronous", "NORMAL")
104        .context("PRAGMA synchronous = NORMAL")?;
105    let _journal_mode: String = conn
106        .query_row("PRAGMA journal_mode = WAL", [], |row| row.get(0))
107        .context("PRAGMA journal_mode = WAL")?;
108    conn.busy_timeout(DEFAULT_BUSY_TIMEOUT)
109        .context("busy_timeout")?;
110    Ok(())
111}
112
#[cfg(test)]
mod tests {
    use super::{DEFAULT_BUSY_TIMEOUT, open_projection};
    use crate::db::migrations;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Creates a scratch directory and a database path inside it.
    ///
    /// The returned `TempDir` guard has to stay alive for as long as the
    /// path is in use.
    fn temp_db_path() -> (TempDir, PathBuf) {
        let scratch = tempfile::tempdir().expect("create temp dir");
        let db_file = scratch.path().join("bones-projection.sqlite3");
        (scratch, db_file)
    }

    /// A freshly opened projection reports WAL journaling, the default busy
    /// timeout, and enabled foreign-key enforcement.
    #[test]
    fn open_projection_sets_wal_busy_timeout_and_fk() {
        let (_guard, db_file) = temp_db_path();
        let db = open_projection(&db_file).expect("open projection db");

        let mode: String = db
            .pragma_query_value(None, "journal_mode", |row| row.get(0))
            .expect("query journal_mode");
        assert!(
            mode.eq_ignore_ascii_case("wal"),
            "unexpected journal mode: {mode}"
        );

        let timeout_ms: u64 = db
            .pragma_query_value(None, "busy_timeout", |row| row.get(0))
            .expect("query busy_timeout");
        let expected_ms = DEFAULT_BUSY_TIMEOUT.as_millis();
        assert_eq!(u128::from(timeout_ms), expected_ms);

        let fk_enabled: i64 = db
            .pragma_query_value(None, "foreign_keys", |row| row.get(0))
            .expect("query foreign_keys");
        assert_eq!(fk_enabled, 1);
    }

    /// Opening a projection leaves the schema at the latest version, both
    /// according to the migrations module and the `projection_meta` row.
    #[test]
    fn open_projection_runs_migrations() {
        let (_guard, db_file) = temp_db_path();
        let db = open_projection(&db_file).expect("open projection db");

        let latest = migrations::LATEST_SCHEMA_VERSION;

        let reported = migrations::current_schema_version(&db).expect("schema version query");
        assert_eq!(reported, latest);

        let stored: i64 = db
            .query_row(
                "SELECT schema_version FROM projection_meta WHERE id = 1",
                [],
                |row| row.get(0),
            )
            .expect("projection_meta schema version");
        assert_eq!(stored, i64::from(latest));
    }
}