Skip to main content

bones_core/db/
mod.rs

1//! `SQLite` projection database utilities.
2//!
3//! Runtime defaults are intentionally conservative:
4//! - `journal_mode = WAL` to allow concurrent readers while writers append
5//! - `busy_timeout = 5s` to reduce transient lock failures under contention
6//! - `foreign_keys = ON` to protect relational integrity in projection tables
7
8pub mod fts;
9pub mod incremental;
10pub mod migrations;
11pub mod project;
12pub mod query;
13pub mod rebuild;
14pub mod schema;
15
16use anyhow::{Context, Result};
17use rusqlite::Connection;
18use std::{path::Path, path::PathBuf, time::Duration};
19use tracing::debug;
20
21/// Busy timeout used for projection DB connections.
22pub const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
23
24const PROJECTION_DIRTY_MARKER: &str = "cache/projection.dirty";
25
26/// Open (or create) the projection `SQLite` database, apply runtime pragmas,
27/// and migrate schema to the latest version.
28///
29/// # Errors
30///
31/// Returns an error if opening/configuring/migrating the database fails.
32pub fn open_projection(path: &Path) -> Result<Connection> {
33    if let Some(parent) = path.parent() {
34        std::fs::create_dir_all(parent)
35            .with_context(|| format!("create projection db directory {}", parent.display()))?;
36    }
37
38    if let Err(err) = bones_sqlite_vec::register_auto_extension() {
39        debug!(%err, "sqlite-vec auto-extension unavailable");
40    }
41
42    let mut conn = Connection::open(path)
43        .with_context(|| format!("open projection database {}", path.display()))?;
44
45    configure_connection(&conn).context("configure sqlite pragmas")?;
46    migrations::migrate(&mut conn).context("apply projection migrations")?;
47
48    Ok(conn)
49}
50
51/// Ensure the projection database exists and is up-to-date.
52///
53/// If the database is missing, corrupt, or behind the event log, an
54/// incremental apply is triggered automatically. Returns `None` only if
55/// the events directory itself does not exist (no bones project).
56///
57/// This is the recommended entry point for read commands — it eliminates
58/// the need for users to run `bn admin rebuild` manually.
59///
60/// # Arguments
61///
62/// * `bones_dir` — Path to the `.bones/` directory.
63///
64/// # Errors
65///
66/// Returns an error if the rebuild or database open fails.
67pub fn ensure_projection(bones_dir: &Path) -> Result<Option<Connection>> {
68    let events_dir = bones_dir.join("events");
69    if !events_dir.is_dir() {
70        return Ok(None);
71    }
72
73    let db_path = bones_dir.join("bones.db");
74    let dirty_marker = projection_dirty_marker_path(bones_dir);
75    let marker_exists = dirty_marker.exists();
76
77    let needs_rebuild = projection_needs_rebuild(bones_dir, &events_dir, &db_path, marker_exists)?;
78
79    if needs_rebuild {
80        debug!("projection stale or missing, running incremental rebuild");
81        incremental::incremental_apply(&events_dir, &db_path, false)
82            .context("auto-rebuild projection")?;
83        if dirty_marker.exists() {
84            let _ = std::fs::remove_file(&dirty_marker);
85        }
86    }
87
88    // Re-open after potential rebuild (raw to avoid recursion).
89    query::try_open_projection_raw(&db_path)
90}
91
92fn projection_needs_rebuild(
93    bones_dir: &Path,
94    events_dir: &Path,
95    db_path: &Path,
96    marker_exists: bool,
97) -> Result<bool> {
98    if marker_exists {
99        return Ok(true);
100    }
101
102    let Some(conn) = query::try_open_projection_raw(db_path)? else {
103        return Ok(true);
104    };
105
106    let (offset, hash) = query::get_projection_cursor(&conn).unwrap_or((0, None));
107    if offset == 0 && hash.is_none() {
108        return Ok(true);
109    }
110
111    let (total_bytes, last_hash) =
112        incremental::event_log_cursor(events_dir).context("read event log cursor")?;
113    let cursor = usize::try_from(offset).unwrap_or(usize::MAX);
114    let stale = total_bytes != cursor || hash != last_hash;
115    if stale {
116        debug!(
117            cursor,
118            total_bytes,
119            cursor_hash = ?hash,
120            last_hash = ?last_hash,
121            bones_dir = %bones_dir.display(),
122            "projection cursor drift detected"
123        );
124    }
125
126    Ok(stale)
127}
128
129fn configure_connection(conn: &Connection) -> anyhow::Result<()> {
130    conn.pragma_update(None, "foreign_keys", "ON")
131        .context("PRAGMA foreign_keys = ON")?;
132    conn.pragma_update(None, "synchronous", "NORMAL")
133        .context("PRAGMA synchronous = NORMAL")?;
134    let _journal_mode: String = conn
135        .query_row("PRAGMA journal_mode = WAL", [], |row| row.get(0))
136        .context("PRAGMA journal_mode = WAL")?;
137    conn.busy_timeout(DEFAULT_BUSY_TIMEOUT)
138        .context("busy_timeout")?;
139    Ok(())
140}
141
142/// Compute the marker path that signals projection drift.
143#[must_use]
144pub fn projection_dirty_marker_path(bones_dir: &Path) -> PathBuf {
145    bones_dir.join(PROJECTION_DIRTY_MARKER)
146}
147
148/// Mark projection state as dirty so read paths trigger deterministic recovery.
149///
150/// # Errors
151///
152/// Returns an error if the marker directory cannot be created or marker file
153/// cannot be written.
154pub fn mark_projection_dirty(bones_dir: &Path, reason: &str) -> Result<()> {
155    let marker = projection_dirty_marker_path(bones_dir);
156    if let Some(parent) = marker.parent() {
157        std::fs::create_dir_all(parent)
158            .with_context(|| format!("create projection marker dir {}", parent.display()))?;
159    }
160
161    let ts = std::time::SystemTime::now()
162        .duration_since(std::time::UNIX_EPOCH)
163        .unwrap_or_default()
164        .as_micros();
165    std::fs::write(&marker, format!("{ts} {reason}\n"))
166        .with_context(|| format!("write projection marker {}", marker.display()))?;
167    Ok(())
168}
169
170/// Mark projection dirty by resolving the active database path from a connection.
171///
172/// # Errors
173///
174/// Returns an error if database metadata cannot be read or if writing the
175/// marker file fails after locating a `.bones` database path.
176pub fn mark_projection_dirty_from_connection(conn: &Connection, reason: &str) -> Result<()> {
177    let mut stmt = conn
178        .prepare("PRAGMA database_list")
179        .context("prepare PRAGMA database_list")?;
180    let mut rows = stmt.query([]).context("query PRAGMA database_list")?;
181
182    while let Some(row) = rows.next().context("iterate PRAGMA database_list")? {
183        let name: String = row.get(1).context("read database_list name")?;
184        if name != "main" {
185            continue;
186        }
187        let path: String = row.get(2).context("read database_list path")?;
188        if path.is_empty() {
189            return Ok(());
190        }
191        if let Some(bones_dir) = std::path::Path::new(&path).parent()
192            && bones_dir.ends_with(".bones")
193        {
194            return mark_projection_dirty(bones_dir, reason);
195        }
196    }
197
198    Ok(())
199}
200
201#[cfg(test)]
202mod tests {
203    use super::{DEFAULT_BUSY_TIMEOUT, open_projection};
204    use crate::db::migrations;
205    use crate::db::{ensure_projection, mark_projection_dirty, projection_dirty_marker_path};
206    use crate::event::Event;
207    use crate::event::data::{CreateData, EventData};
208    use crate::event::types::EventType;
209    use crate::event::writer;
210    use crate::model::item::{Kind, Urgency};
211    use crate::model::item_id::ItemId;
212    use crate::shard::ShardManager;
213    use std::collections::BTreeMap;
214    use tempfile::TempDir;
215
216    fn temp_db_path() -> (TempDir, std::path::PathBuf) {
217        let dir = tempfile::tempdir().expect("create temp dir");
218        let path = dir.path().join("bones-projection.sqlite3");
219        (dir, path)
220    }
221
222    fn make_create(item_id: &str, title: &str, ts: i64) -> Event {
223        Event {
224            wall_ts_us: ts,
225            agent: "test-agent".to_string(),
226            itc: "itc:AQ".to_string(),
227            parents: vec![],
228            event_type: EventType::Create,
229            item_id: ItemId::new_unchecked(item_id),
230            data: EventData::Create(CreateData {
231                title: title.to_string(),
232                kind: Kind::Task,
233                size: None,
234                urgency: Urgency::Default,
235                labels: vec![],
236                parent: None,
237                causation: None,
238                description: None,
239                extra: BTreeMap::new(),
240            }),
241            event_hash: String::new(),
242        }
243    }
244
245    #[test]
246    fn open_projection_sets_wal_busy_timeout_and_fk() {
247        let (_dir, path) = temp_db_path();
248        let conn = open_projection(&path).expect("open projection db");
249
250        let journal_mode: String = conn
251            .pragma_query_value(None, "journal_mode", |row| row.get(0))
252            .expect("query journal_mode");
253        assert_eq!(journal_mode.to_ascii_lowercase(), "wal");
254
255        let busy_timeout_ms: u64 = conn
256            .pragma_query_value(None, "busy_timeout", |row| row.get(0))
257            .expect("query busy_timeout");
258        assert_eq!(
259            u128::from(busy_timeout_ms),
260            DEFAULT_BUSY_TIMEOUT.as_millis()
261        );
262
263        let foreign_keys: i64 = conn
264            .pragma_query_value(None, "foreign_keys", |row| row.get(0))
265            .expect("query foreign_keys");
266        assert_eq!(foreign_keys, 1);
267    }
268
269    #[test]
270    fn open_projection_runs_migrations() {
271        let (_dir, path) = temp_db_path();
272        let conn = open_projection(&path).expect("open projection db");
273
274        let version = migrations::current_schema_version(&conn).expect("schema version query");
275        assert_eq!(version, migrations::LATEST_SCHEMA_VERSION);
276
277        let projection_version: i64 = conn
278            .query_row(
279                "SELECT schema_version FROM projection_meta WHERE id = 1",
280                [],
281                |row| row.get(0),
282            )
283            .expect("projection_meta schema version");
284        assert_eq!(
285            projection_version,
286            i64::from(migrations::LATEST_SCHEMA_VERSION)
287        );
288    }
289
290    #[test]
291    fn mark_projection_dirty_creates_marker_file() {
292        let dir = tempfile::tempdir().expect("create temp dir");
293        let bones_dir = dir.path().join(".bones");
294        std::fs::create_dir_all(bones_dir.join("events")).expect("events dir");
295
296        mark_projection_dirty(&bones_dir, "test reason").expect("mark projection dirty");
297
298        let marker = projection_dirty_marker_path(&bones_dir);
299        assert!(marker.exists(), "dirty marker should be created");
300    }
301
302    #[test]
303    fn ensure_projection_rebuild_clears_dirty_marker() {
304        let dir = tempfile::tempdir().expect("create temp dir");
305        let bones_dir = dir.path().join(".bones");
306        std::fs::create_dir_all(bones_dir.join("events")).expect("events dir");
307        std::fs::create_dir_all(bones_dir.join("cache")).expect("cache dir");
308
309        let shard_mgr = ShardManager::new(&bones_dir);
310        shard_mgr.init().expect("init shard");
311        let (year, month) = shard_mgr
312            .active_shard()
313            .expect("active shard")
314            .expect("some shard");
315
316        let mut create = Event {
317            wall_ts_us: 1_700_000_000_000_000,
318            agent: "test-agent".to_string(),
319            itc: "itc:AQ".to_string(),
320            parents: vec![],
321            event_type: EventType::Create,
322            item_id: ItemId::new_unchecked("bn-marker"),
323            data: EventData::Create(CreateData {
324                title: "marker test".to_string(),
325                kind: Kind::Task,
326                size: None,
327                urgency: Urgency::Default,
328                labels: vec![],
329                parent: None,
330                causation: None,
331                description: None,
332                extra: BTreeMap::new(),
333            }),
334            event_hash: String::new(),
335        };
336        let line = writer::write_event(&mut create).expect("serialize create event");
337        shard_mgr
338            .append_raw(year, month, &line)
339            .expect("append create event");
340
341        mark_projection_dirty(&bones_dir, "simulate projection failure").expect("mark dirty");
342        let marker = projection_dirty_marker_path(&bones_dir);
343        assert!(marker.exists(), "precondition: marker exists");
344
345        let conn = ensure_projection(&bones_dir)
346            .expect("ensure projection")
347            .expect("projection connection");
348        let item_count: i64 = conn
349            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
350            .expect("count items");
351        assert_eq!(item_count, 1);
352        assert!(
353            !marker.exists(),
354            "dirty marker should be cleared after successful recovery"
355        );
356    }
357
358    #[test]
359    fn ensure_projection_rebuilds_when_log_hash_changes_without_size_change() {
360        let dir = tempfile::tempdir().expect("create temp dir");
361        let bones_dir = dir.path().join(".bones");
362        std::fs::create_dir_all(bones_dir.join("events")).expect("events dir");
363
364        let shard_mgr = ShardManager::new(&bones_dir);
365        shard_mgr.init().expect("init shard");
366        let (year, month) = shard_mgr
367            .active_shard()
368            .expect("active shard")
369            .expect("some shard");
370
371        let mut first = make_create("bn-alpha", "first title", 1_700_000_000_000_000);
372        let first_line = writer::write_event(&mut first).expect("serialize first create");
373        shard_mgr
374            .append_raw(year, month, &first_line)
375            .expect("append first event");
376
377        let conn = ensure_projection(&bones_dir)
378            .expect("ensure projection")
379            .expect("projection connection");
380        let first_count: i64 = conn
381            .query_row(
382                "SELECT COUNT(*) FROM items WHERE item_id = 'bn-alpha'",
383                [],
384                |row| row.get(0),
385            )
386            .expect("count first item");
387        assert_eq!(first_count, 1);
388        drop(conn);
389
390        let mut second = make_create("bn-bravo", "other title", 1_700_000_000_000_000);
391        let second_line = writer::write_event(&mut second).expect("serialize second create");
392        assert_ne!(first.event_hash, second.event_hash);
393        assert_eq!(
394            first_line.len(),
395            second_line.len(),
396            "test setup needs a same-length event-log rewrite"
397        );
398
399        let shard_path = shard_mgr.shard_path(year, month);
400        let original_content = std::fs::read_to_string(&shard_path).expect("read shard");
401        let event_start = original_content
402            .rfind(&first_line)
403            .expect("original event line present");
404        let replacement = format!("{}{}", &original_content[..event_start], second_line);
405        assert_eq!(original_content.len(), replacement.len());
406        std::fs::write(&shard_path, replacement).expect("rewrite shard with same byte length");
407
408        let conn = ensure_projection(&bones_dir)
409            .expect("ensure projection after rewrite")
410            .expect("projection connection");
411        let counts: (i64, i64) = conn
412            .query_row(
413                "SELECT
414                    SUM(CASE WHEN item_id = 'bn-alpha' THEN 1 ELSE 0 END),
415                    SUM(CASE WHEN item_id = 'bn-bravo' THEN 1 ELSE 0 END)
416                 FROM items",
417                [],
418                |row| Ok((row.get(0)?, row.get(1)?)),
419            )
420            .expect("count rewritten items");
421        assert_eq!(counts, (0, 1));
422    }
423}