1pub mod fts;
9pub mod incremental;
10pub mod migrations;
11pub mod project;
12pub mod query;
13pub mod rebuild;
14pub mod schema;
15
16use anyhow::{Context, Result};
17use rusqlite::Connection;
18use std::{path::Path, time::Duration};
19use tracing::debug;
20
/// Busy timeout applied to every projection connection by `configure_connection`.
///
/// Expressed in milliseconds; the value is five seconds.
pub const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_millis(5_000);
23
24pub fn open_projection(path: &Path) -> Result<Connection> {
31 if let Some(parent) = path.parent() {
32 std::fs::create_dir_all(parent)
33 .with_context(|| format!("create projection db directory {}", parent.display()))?;
34 }
35
36 if let Err(err) = bones_sqlite_vec::register_auto_extension() {
37 debug!(%err, "sqlite-vec auto-extension unavailable");
38 }
39
40 let mut conn = Connection::open(path)
41 .with_context(|| format!("open projection database {}", path.display()))?;
42
43 configure_connection(&conn).context("configure sqlite pragmas")?;
44 migrations::migrate(&mut conn).context("apply projection migrations")?;
45
46 Ok(conn)
47}
48
49pub fn ensure_projection(bones_dir: &Path) -> Result<Option<Connection>> {
66 let events_dir = bones_dir.join("events");
67 if !events_dir.is_dir() {
68 return Ok(None);
69 }
70
71 let db_path = bones_dir.join("bones.db");
72
73 let needs_rebuild = query::try_open_projection_raw(&db_path)?.is_none_or(|conn| {
75 let (offset, hash) = query::get_projection_cursor(&conn).unwrap_or((0, None));
79 if offset == 0 && hash.is_none() {
80 true
81 } else {
82 let mgr = crate::shard::ShardManager::new(bones_dir);
84 let total_bytes = mgr.total_content_len().unwrap_or(0);
85 let cursor = usize::try_from(offset).unwrap_or(0);
86 total_bytes != cursor
87 }
88 });
89
90 if needs_rebuild {
91 debug!("projection stale or missing, running incremental rebuild");
92 incremental::incremental_apply(&events_dir, &db_path, false)
93 .context("auto-rebuild projection")?;
94 }
95
96 query::try_open_projection_raw(&db_path)
98}
99
100fn configure_connection(conn: &Connection) -> anyhow::Result<()> {
101 conn.pragma_update(None, "foreign_keys", "ON")
102 .context("PRAGMA foreign_keys = ON")?;
103 conn.pragma_update(None, "synchronous", "NORMAL")
104 .context("PRAGMA synchronous = NORMAL")?;
105 let _journal_mode: String = conn
106 .query_row("PRAGMA journal_mode = WAL", [], |row| row.get(0))
107 .context("PRAGMA journal_mode = WAL")?;
108 conn.busy_timeout(DEFAULT_BUSY_TIMEOUT)
109 .context("busy_timeout")?;
110 Ok(())
111}
112
#[cfg(test)]
mod tests {
    use super::{DEFAULT_BUSY_TIMEOUT, open_projection};
    use crate::db::migrations;
    use tempfile::TempDir;

    /// Creates a fresh temporary directory and a database path inside it.
    ///
    /// The `TempDir` guard is returned so the caller keeps the directory
    /// alive for the duration of the test.
    fn temp_db_path() -> (TempDir, std::path::PathBuf) {
        let guard = tempfile::tempdir().expect("create temp dir");
        let db_file = guard.path().join("bones-projection.sqlite3");
        (guard, db_file)
    }

    /// A freshly opened projection reports WAL journaling, the default busy
    /// timeout, and enabled foreign keys.
    #[test]
    fn open_projection_sets_wal_busy_timeout_and_fk() {
        let (_guard, db_file) = temp_db_path();
        let db = open_projection(&db_file).expect("open projection db");

        let mode: String = db
            .pragma_query_value(None, "journal_mode", |row| row.get(0))
            .expect("query journal_mode");
        assert_eq!(mode.to_ascii_lowercase(), "wal");

        let timeout_ms: u64 = db
            .pragma_query_value(None, "busy_timeout", |row| row.get(0))
            .expect("query busy_timeout");
        assert_eq!(u128::from(timeout_ms), DEFAULT_BUSY_TIMEOUT.as_millis());

        let fk_enabled: i64 = db
            .pragma_query_value(None, "foreign_keys", |row| row.get(0))
            .expect("query foreign_keys");
        assert_eq!(fk_enabled, 1);
    }

    /// Opening a projection leaves the schema at the latest version, both as
    /// reported by the migrations module and as stored in `projection_meta`.
    #[test]
    fn open_projection_runs_migrations() {
        let (_guard, db_file) = temp_db_path();
        let db = open_projection(&db_file).expect("open projection db");

        let reported = migrations::current_schema_version(&db).expect("schema version query");
        assert_eq!(reported, migrations::LATEST_SCHEMA_VERSION);

        let stored: i64 = db
            .query_row(
                "SELECT schema_version FROM projection_meta WHERE id = 1",
                [],
                |row| row.get(0),
            )
            .expect("projection_meta schema version");
        assert_eq!(stored, i64::from(migrations::LATEST_SCHEMA_VERSION));
    }
}