1pub mod fts;
9pub mod incremental;
10pub mod migrations;
11pub mod project;
12pub mod query;
13pub mod rebuild;
14pub mod schema;
15
16use anyhow::{Context, Result};
17use rusqlite::Connection;
18use std::{path::Path, path::PathBuf, time::Duration};
19use tracing::debug;
20
/// Default SQLite busy timeout (5 seconds).
///
/// `configure_connection` installs this value on every connection returned by
/// [`open_projection`].
pub const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
23
/// Path of the projection dirty-marker file, relative to the bones directory.
///
/// Joined onto the bones directory by [`projection_dirty_marker_path`].
const PROJECTION_DIRTY_MARKER: &str = "cache/projection.dirty";
25
26pub fn open_projection(path: &Path) -> Result<Connection> {
33 if let Some(parent) = path.parent() {
34 std::fs::create_dir_all(parent)
35 .with_context(|| format!("create projection db directory {}", parent.display()))?;
36 }
37
38 if let Err(err) = bones_sqlite_vec::register_auto_extension() {
39 debug!(%err, "sqlite-vec auto-extension unavailable");
40 }
41
42 let mut conn = Connection::open(path)
43 .with_context(|| format!("open projection database {}", path.display()))?;
44
45 configure_connection(&conn).context("configure sqlite pragmas")?;
46 migrations::migrate(&mut conn).context("apply projection migrations")?;
47
48 Ok(conn)
49}
50
51pub fn ensure_projection(bones_dir: &Path) -> Result<Option<Connection>> {
68 let events_dir = bones_dir.join("events");
69 if !events_dir.is_dir() {
70 return Ok(None);
71 }
72
73 let db_path = bones_dir.join("bones.db");
74 let dirty_marker = projection_dirty_marker_path(bones_dir);
75 let marker_exists = dirty_marker.exists();
76
77 let needs_rebuild = projection_needs_rebuild(bones_dir, &events_dir, &db_path, marker_exists)?;
78
79 if needs_rebuild {
80 debug!("projection stale or missing, running incremental rebuild");
81 incremental::incremental_apply(&events_dir, &db_path, false)
82 .context("auto-rebuild projection")?;
83 if dirty_marker.exists() {
84 let _ = std::fs::remove_file(&dirty_marker);
85 }
86 }
87
88 query::try_open_projection_raw(&db_path)
90}
91
92fn projection_needs_rebuild(
93 bones_dir: &Path,
94 events_dir: &Path,
95 db_path: &Path,
96 marker_exists: bool,
97) -> Result<bool> {
98 if marker_exists {
99 return Ok(true);
100 }
101
102 let Some(conn) = query::try_open_projection_raw(db_path)? else {
103 return Ok(true);
104 };
105
106 let (offset, hash) = query::get_projection_cursor(&conn).unwrap_or((0, None));
107 if offset == 0 && hash.is_none() {
108 return Ok(true);
109 }
110
111 let (total_bytes, last_hash) =
112 incremental::event_log_cursor(events_dir).context("read event log cursor")?;
113 let cursor = usize::try_from(offset).unwrap_or(usize::MAX);
114 let stale = total_bytes != cursor || hash != last_hash;
115 if stale {
116 debug!(
117 cursor,
118 total_bytes,
119 cursor_hash = ?hash,
120 last_hash = ?last_hash,
121 bones_dir = %bones_dir.display(),
122 "projection cursor drift detected"
123 );
124 }
125
126 Ok(stale)
127}
128
129fn configure_connection(conn: &Connection) -> anyhow::Result<()> {
130 conn.pragma_update(None, "foreign_keys", "ON")
131 .context("PRAGMA foreign_keys = ON")?;
132 conn.pragma_update(None, "synchronous", "NORMAL")
133 .context("PRAGMA synchronous = NORMAL")?;
134 let _journal_mode: String = conn
135 .query_row("PRAGMA journal_mode = WAL", [], |row| row.get(0))
136 .context("PRAGMA journal_mode = WAL")?;
137 conn.busy_timeout(DEFAULT_BUSY_TIMEOUT)
138 .context("busy_timeout")?;
139 Ok(())
140}
141
142#[must_use]
144pub fn projection_dirty_marker_path(bones_dir: &Path) -> PathBuf {
145 bones_dir.join(PROJECTION_DIRTY_MARKER)
146}
147
148pub fn mark_projection_dirty(bones_dir: &Path, reason: &str) -> Result<()> {
155 let marker = projection_dirty_marker_path(bones_dir);
156 if let Some(parent) = marker.parent() {
157 std::fs::create_dir_all(parent)
158 .with_context(|| format!("create projection marker dir {}", parent.display()))?;
159 }
160
161 let ts = std::time::SystemTime::now()
162 .duration_since(std::time::UNIX_EPOCH)
163 .unwrap_or_default()
164 .as_micros();
165 std::fs::write(&marker, format!("{ts} {reason}\n"))
166 .with_context(|| format!("write projection marker {}", marker.display()))?;
167 Ok(())
168}
169
170pub fn mark_projection_dirty_from_connection(conn: &Connection, reason: &str) -> Result<()> {
177 let mut stmt = conn
178 .prepare("PRAGMA database_list")
179 .context("prepare PRAGMA database_list")?;
180 let mut rows = stmt.query([]).context("query PRAGMA database_list")?;
181
182 while let Some(row) = rows.next().context("iterate PRAGMA database_list")? {
183 let name: String = row.get(1).context("read database_list name")?;
184 if name != "main" {
185 continue;
186 }
187 let path: String = row.get(2).context("read database_list path")?;
188 if path.is_empty() {
189 return Ok(());
190 }
191 if let Some(bones_dir) = std::path::Path::new(&path).parent()
192 && bones_dir.ends_with(".bones")
193 {
194 return mark_projection_dirty(bones_dir, reason);
195 }
196 }
197
198 Ok(())
199}
200
#[cfg(test)]
mod tests {
    use super::{DEFAULT_BUSY_TIMEOUT, open_projection};
    use crate::db::migrations;
    use crate::db::{ensure_projection, mark_projection_dirty, projection_dirty_marker_path};
    use crate::event::Event;
    use crate::event::data::{CreateData, EventData};
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Urgency};
    use crate::model::item_id::ItemId;
    use crate::shard::ShardManager;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Creates a temp directory and a database path inside it.
    ///
    /// The `TempDir` guard is returned so the caller keeps the directory
    /// alive for the duration of the test.
    fn temp_db_path() -> (TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("bones-projection.sqlite3");
        (dir, path)
    }

    /// Builds a minimal `Create` event for `item_id` with the given title
    /// and wall-clock timestamp. `event_hash` is left empty.
    fn make_create(item_id: &str, title: &str, ts: i64) -> Event {
        Event {
            wall_ts_us: ts,
            agent: "test-agent".to_string(),
            itc: "itc:AQ".to_string(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(item_id),
            data: EventData::Create(CreateData {
                title: title.to_string(),
                kind: Kind::Task,
                size: None,
                urgency: Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        }
    }

    /// A freshly opened projection has WAL journaling, the default busy
    /// timeout, and foreign keys enabled.
    #[test]
    fn open_projection_sets_wal_busy_timeout_and_fk() {
        let (_dir, path) = temp_db_path();
        let conn = open_projection(&path).expect("open projection db");

        let journal_mode: String = conn
            .pragma_query_value(None, "journal_mode", |row| row.get(0))
            .expect("query journal_mode");
        assert_eq!(journal_mode.to_ascii_lowercase(), "wal");

        let busy_timeout_ms: u64 = conn
            .pragma_query_value(None, "busy_timeout", |row| row.get(0))
            .expect("query busy_timeout");
        assert_eq!(
            u128::from(busy_timeout_ms),
            DEFAULT_BUSY_TIMEOUT.as_millis()
        );

        let foreign_keys: i64 = conn
            .pragma_query_value(None, "foreign_keys", |row| row.get(0))
            .expect("query foreign_keys");
        assert_eq!(foreign_keys, 1);
    }

    /// Opening a projection brings the schema to the latest version and
    /// records that version in `projection_meta`.
    #[test]
    fn open_projection_runs_migrations() {
        let (_dir, path) = temp_db_path();
        let conn = open_projection(&path).expect("open projection db");

        let version = migrations::current_schema_version(&conn).expect("schema version query");
        assert_eq!(version, migrations::LATEST_SCHEMA_VERSION);

        let projection_version: i64 = conn
            .query_row(
                "SELECT schema_version FROM projection_meta WHERE id = 1",
                [],
                |row| row.get(0),
            )
            .expect("projection_meta schema version");
        assert_eq!(
            projection_version,
            i64::from(migrations::LATEST_SCHEMA_VERSION)
        );
    }

    /// Marking the projection dirty creates the marker file, including its
    /// parent directory.
    #[test]
    fn mark_projection_dirty_creates_marker_file() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let bones_dir = dir.path().join(".bones");
        std::fs::create_dir_all(bones_dir.join("events")).expect("events dir");

        mark_projection_dirty(&bones_dir, "test reason").expect("mark projection dirty");

        let marker = projection_dirty_marker_path(&bones_dir);
        assert!(marker.exists(), "dirty marker should be created");
    }

    /// With a dirty marker present, `ensure_projection` rebuilds from the
    /// event log and removes the marker afterwards.
    #[test]
    fn ensure_projection_rebuild_clears_dirty_marker() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let bones_dir = dir.path().join(".bones");
        std::fs::create_dir_all(bones_dir.join("events")).expect("events dir");
        std::fs::create_dir_all(bones_dir.join("cache")).expect("cache dir");

        let shard_mgr = ShardManager::new(&bones_dir);
        shard_mgr.init().expect("init shard");
        let (year, month) = shard_mgr
            .active_shard()
            .expect("active shard")
            .expect("some shard");

        // Same event the test used to spell out field by field.
        let mut create = make_create("bn-marker", "marker test", 1_700_000_000_000_000);
        let line = writer::write_event(&mut create).expect("serialize create event");
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append create event");

        mark_projection_dirty(&bones_dir, "simulate projection failure").expect("mark dirty");
        let marker = projection_dirty_marker_path(&bones_dir);
        assert!(marker.exists(), "precondition: marker exists");

        let conn = ensure_projection(&bones_dir)
            .expect("ensure projection")
            .expect("projection connection");
        let item_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .expect("count items");
        assert_eq!(item_count, 1);
        assert!(
            !marker.exists(),
            "dirty marker should be cleared after successful recovery"
        );
    }

    /// Rewriting the event log with different content of the *same* byte
    /// length must still trigger a rebuild: the size matches the stored
    /// cursor, so only the hash comparison can detect the change.
    #[test]
    fn ensure_projection_rebuilds_when_log_hash_changes_without_size_change() {
        let dir = tempfile::tempdir().expect("create temp dir");
        let bones_dir = dir.path().join(".bones");
        std::fs::create_dir_all(bones_dir.join("events")).expect("events dir");

        let shard_mgr = ShardManager::new(&bones_dir);
        shard_mgr.init().expect("init shard");
        let (year, month) = shard_mgr
            .active_shard()
            .expect("active shard")
            .expect("some shard");

        let mut first = make_create("bn-alpha", "first title", 1_700_000_000_000_000);
        let first_line = writer::write_event(&mut first).expect("serialize first create");
        shard_mgr
            .append_raw(year, month, &first_line)
            .expect("append first event");

        let conn = ensure_projection(&bones_dir)
            .expect("ensure projection")
            .expect("projection connection");
        let first_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM items WHERE item_id = 'bn-alpha'",
                [],
                |row| row.get(0),
            )
            .expect("count first item");
        assert_eq!(first_count, 1);
        drop(conn);

        // The replacement event is chosen so its serialized line has exactly
        // the same length as the first one but a different hash.
        let mut second = make_create("bn-bravo", "other title", 1_700_000_000_000_000);
        let second_line = writer::write_event(&mut second).expect("serialize second create");
        assert_ne!(first.event_hash, second.event_hash);
        assert_eq!(
            first_line.len(),
            second_line.len(),
            "test setup needs a same-length event-log rewrite"
        );

        // Swap the last occurrence of the first line for the second line,
        // keeping everything before it in the shard untouched.
        let shard_path = shard_mgr.shard_path(year, month);
        let original_content = std::fs::read_to_string(&shard_path).expect("read shard");
        let event_start = original_content
            .rfind(&first_line)
            .expect("original event line present");
        let replacement = format!("{}{}", &original_content[..event_start], second_line);
        assert_eq!(original_content.len(), replacement.len());
        std::fs::write(&shard_path, replacement).expect("rewrite shard with same byte length");

        let conn = ensure_projection(&bones_dir)
            .expect("ensure projection after rewrite")
            .expect("projection connection");
        let counts: (i64, i64) = conn
            .query_row(
                "SELECT
                    SUM(CASE WHEN item_id = 'bn-alpha' THEN 1 ELSE 0 END),
                    SUM(CASE WHEN item_id = 'bn-bravo' THEN 1 ELSE 0 END)
                 FROM items",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("count rewritten items");
        assert_eq!(counts, (0, 1));
    }
}