Skip to main content

bones_core/db/
rebuild.rs

1//! Full projection rebuild from the event log.
2//!
3//! `bn admin rebuild` drops and recreates the entire `SQLite` DB from the canonical
4//! event log, proving the projection is disposable and reproducible.
5
6use std::path::Path;
7use std::time::Instant;
8
9use anyhow::{Context, Result};
10
11use crate::db::{open_projection, project};
12use crate::event::Event;
13use crate::shard::ShardManager;
14use std::io;
15
/// Number of events projected per batch when `BONES_REBUILD_BATCH_SIZE`
/// does not supply a usable value.
const DEFAULT_REBUILD_BATCH_SIZE: usize = 4_000;

/// Resolve how many events are handed to the projector per batch.
///
/// Reads the `BONES_REBUILD_BATCH_SIZE` environment variable. If it is unset,
/// not valid Unicode, not parsable as `usize`, or zero, the result is
/// [`DEFAULT_REBUILD_BATCH_SIZE`].
fn rebuild_batch_size() -> usize {
    match std::env::var("BONES_REBUILD_BATCH_SIZE") {
        Ok(raw) => match raw.parse::<usize>() {
            // A zero-sized batch could never be flushed, so it is rejected too.
            Ok(size) if size > 0 => size,
            _ => DEFAULT_REBUILD_BATCH_SIZE,
        },
        Err(_) => DEFAULT_REBUILD_BATCH_SIZE,
    }
}
25
26fn configure_rebuild_pragmas(conn: &rusqlite::Connection) -> Result<()> {
27    conn.pragma_update(None, "temp_store", "MEMORY")
28        .context("PRAGMA temp_store = MEMORY")?;
29    conn.pragma_update(None, "cache_size", -131_072_i64)
30        .context("PRAGMA cache_size")?;
31    conn.pragma_update(None, "mmap_size", 268_435_456_i64)
32        .context("PRAGMA mmap_size")?;
33
34    // Bulk-load tuning: this DB was just created and is about to be
35    // populated from the canonical event log. If anything goes wrong the
36    // whole file is discarded and the rebuild is re-run, so crash durability
37    // during the rebuild window has no value. Turning fsync + journaling off
38    // saves the bulk-load from paying for either.
39    //
40    // `configure_pragmas` in db/mod.rs restores the safe defaults
41    // (journal_mode=WAL, synchronous=NORMAL) the next time this DB is
42    // opened for normal use.
43    conn.pragma_update(None, "synchronous", "OFF")
44        .context("PRAGMA synchronous = OFF")?;
45    let _: String = conn
46        .query_row("PRAGMA journal_mode = OFF", [], |row| row.get(0))
47        .context("PRAGMA journal_mode = OFF")?;
48    let _: String = conn
49        .query_row("PRAGMA locking_mode = EXCLUSIVE", [], |row| row.get(0))
50        .context("PRAGMA locking_mode = EXCLUSIVE")?;
51    Ok(())
52}
53
54/// Names of the FTS5 maintenance triggers on `items`. Kept in one place so
55/// the rebuild path can drop them before bulk-loading and recreate them
56/// after, matching the definitions in `db/schema.rs`.
57const FTS_TRIGGERS: &[&str] = &["items_ai", "items_au", "items_ad"];
58
59/// Drop the three FTS5 maintenance triggers so bulk inserts during rebuild
60/// don't pay per-row FTS5 maintenance cost. The triggers are recreated by
61/// [`restore_fts_triggers`] after the rebuild's FTS index has been
62/// repopulated in bulk via [`crate::db::fts::rebuild_fts_index`].
63fn drop_fts_triggers(conn: &rusqlite::Connection) -> Result<()> {
64    for name in FTS_TRIGGERS {
65        conn.execute_batch(&format!("DROP TRIGGER IF EXISTS {name}"))
66            .with_context(|| format!("drop trigger {name}"))?;
67    }
68    Ok(())
69}
70
71/// Recreate the FTS5 maintenance triggers dropped by [`drop_fts_triggers`].
72/// Kept byte-for-byte in sync with `db/schema.rs`.
73fn restore_fts_triggers(conn: &rusqlite::Connection) -> Result<()> {
74    conn.execute_batch(
75        "CREATE TRIGGER IF NOT EXISTS items_ai
76         AFTER INSERT ON items
77         BEGIN
78             INSERT INTO items_fts(rowid, title, description, labels, item_id)
79             VALUES (
80                 new.rowid,
81                 new.title,
82                 COALESCE(new.description, ''),
83                 COALESCE(new.search_labels, ''),
84                 new.item_id
85             );
86         END;
87
88         CREATE TRIGGER IF NOT EXISTS items_au
89         AFTER UPDATE ON items
90         BEGIN
91             DELETE FROM items_fts WHERE rowid = old.rowid;
92             INSERT INTO items_fts(rowid, title, description, labels, item_id)
93             VALUES (
94                 new.rowid,
95                 new.title,
96                 COALESCE(new.description, ''),
97                 COALESCE(new.search_labels, ''),
98                 new.item_id
99             );
100         END;
101
102         CREATE TRIGGER IF NOT EXISTS items_ad
103         AFTER DELETE ON items
104         BEGIN
105             DELETE FROM items_fts WHERE rowid = old.rowid;
106         END;",
107    )
108    .context("recreate FTS5 maintenance triggers after rebuild")?;
109    Ok(())
110}
111
112// ---------------------------------------------------------------------------
113// RebuildReport
114// ---------------------------------------------------------------------------
115
/// Report returned after a full projection rebuild.
///
/// Produced by [`rebuild`]; every field describes the run that just finished.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RebuildReport {
    /// Number of events the projector reported as projected, summed over all
    /// batches. Events the projector counted as duplicates are not included.
    pub event_count: usize,
    /// Row count of the `items` table in the rebuilt projection.
    pub item_count: usize,
    /// Wall-clock time measured from the start of the rebuild call until the
    /// final item count has been read back.
    pub elapsed: std::time::Duration,
    /// Number of shards listed by the shard manager for this rebuild.
    pub shard_count: usize,
    /// Whether the FTS5 index was rebuilt. A successful [`rebuild`] always
    /// sets this to `true`.
    pub fts5_rebuilt: bool,
}
130
131// ---------------------------------------------------------------------------
132// rebuild
133// ---------------------------------------------------------------------------
134
135/// Validate sealed shard manifests, bailing on corruption.
136fn check_sealed_shard_integrity(shard_mgr: &ShardManager) -> Result<()> {
137    let issues = shard_mgr
138        .validate_sealed_shards()
139        .map_err(|e| anyhow::anyhow!("sealed shard validation: {e}"))?;
140    if !issues.is_empty() {
141        for issue in &issues {
142            tracing::error!(
143                shard = %issue.shard_name,
144                problem = %issue.problem,
145                "sealed shard integrity check failed"
146            );
147        }
148        anyhow::bail!(
149            "sealed shard corrupted: {} (run `bn doctor` to diagnose)",
150            issues[0].problem
151        );
152    }
153    Ok(())
154}
155
156/// Drop the existing DB and rebuild it from the canonical event log.
157///
158/// 1. Deletes the existing database file (if any)
159/// 2. Creates a fresh schema via `open_projection`
160/// 3. Replays all events from `events_dir` shards through the projector
161/// 4. FTS5 index is maintained via triggers during projection
162/// 5. Updates the projection cursor with the final offset
163///
164/// # Arguments
165///
166/// * `events_dir` — Path to `.bones/events/` (the shard directory)
167/// * `db_path` — Path to `.bones/bones.db` (the `SQLite` projection file)
168///
169/// # Errors
170///
171/// Returns an error if shard reading, event parsing, or projection fails.
172pub fn rebuild(events_dir: &Path, db_path: &Path) -> Result<RebuildReport> {
173    let start = Instant::now();
174
175    // 1. Delete existing database file
176    if db_path.exists() {
177        std::fs::remove_file(db_path)
178            .with_context(|| format!("remove existing projection db {}", db_path.display()))?;
179        // Also remove WAL and SHM files
180        let wal_path = db_path.with_extension("db-wal");
181        let shm_path = db_path.with_extension("db-shm");
182        let _ = std::fs::remove_file(wal_path);
183        let _ = std::fs::remove_file(shm_path);
184    }
185
186    // 2. Create fresh schema
187    let conn = open_projection(db_path).context("create fresh projection database")?;
188    configure_rebuild_pragmas(&conn).context("configure rebuild sqlite pragmas")?;
189    project::ensure_tracking_table(&conn).context("create tracking table")?;
190
191    // 2b. Disable FTS5 maintenance triggers for the bulk-load. Every
192    // row projection would otherwise incur an insert into items_fts; we
193    // repopulate the whole FTS5 index once at the end in a single
194    // query, which is ~5x cheaper on the bench corpus.
195    drop_fts_triggers(&conn).context("drop FTS5 triggers for bulk rebuild")?;
196
197    // 3. Read and replay all events in streaming batches
198    let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
199    let shard_mgr = ShardManager::new(bones_dir);
200
201    let shards = shard_mgr
202        .list_shards()
203        .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
204    let shard_count = shards.len();
205
206    check_sealed_shard_integrity(&shard_mgr)?;
207
208    // We need a custom loop because EventParser expects Iterator<Item = String>
209    // and returns Result<Event, ...>.
210    let mut version_checked = false;
211    let mut shard_version = crate::event::parser::CURRENT_VERSION;
212    let mut line_no = 0;
213    let mut total_projected = 0;
214    let mut total_duplicates = 0;
215    let mut last_event_hash = None;
216    let mut total_byte_len = 0;
217
218    let batch_size = rebuild_batch_size();
219    let mut current_batch: Vec<Event> = Vec::with_capacity(batch_size);
220    let projector = project::Projector::new(&conn);
221
222    let shard_line_iter = shard_mgr.replay_lines()?;
223
224    for line_res in shard_line_iter {
225        let (offset, line): (usize, String) =
226            line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
227        line_no += 1;
228        total_byte_len = offset + line.len();
229
230        if !version_checked && line.trim_start().starts_with("# bones event log v") {
231            version_checked = true;
232            shard_version = crate::event::parser::detect_version(&line)
233                .map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
234            continue;
235        }
236
237        match crate::event::parser::parse_line(&line) {
238            Ok(crate::event::parser::ParsedLine::Event(event)) => {
239                let event = crate::event::migrate_event(*event, shard_version)
240                    .map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
241
242                last_event_hash = Some(event.event_hash.clone());
243                current_batch.push(event);
244
245                if current_batch.len() >= batch_size {
246                    let stats = projector
247                        .project_batch(&current_batch)
248                        .context("project batch during rebuild")?;
249                    total_projected += stats.projected;
250                    total_duplicates += stats.duplicates;
251                    current_batch.clear();
252                }
253            }
254            Ok(
255                crate::event::parser::ParsedLine::Comment(_)
256                | crate::event::parser::ParsedLine::Blank,
257            ) => {}
258            Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
259                tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
260            }
261            Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
262        }
263    }
264
265    // Final batch
266    if !current_batch.is_empty() {
267        let stats = projector
268            .project_batch(&current_batch)
269            .context("project final batch during rebuild")?;
270        total_projected += stats.projected;
271        total_duplicates += stats.duplicates;
272    }
273
274    // 4b. Rebuild the FTS5 index in bulk now that all items are in place,
275    // then recreate the maintenance triggers so subsequent incremental
276    // projections keep the FTS5 index in sync row-by-row.
277    crate::db::fts::rebuild_fts_index(&conn).context("rebuild FTS5 index after bulk load")?;
278    restore_fts_triggers(&conn).context("restore FTS5 triggers after bulk rebuild")?;
279
280    // 5. Update projection cursor
281    let byte_offset_i64 = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
282    crate::db::query::update_projection_cursor(&conn, byte_offset_i64, last_event_hash.as_deref())
283        .context("update projection cursor after rebuild")?;
284
285    // Count unique items
286    let item_count: i64 = conn
287        .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
288        .context("count items after rebuild")?;
289
290    let elapsed = start.elapsed();
291
292    tracing::info!(
293        event_count = total_projected,
294        duplicates = total_duplicates,
295        batch_size,
296        item_count,
297        shard_count,
298        elapsed_ms = elapsed.as_millis(),
299        "projection rebuild complete"
300    );
301
302    Ok(RebuildReport {
303        event_count: total_projected,
304        item_count: usize::try_from(item_count).unwrap_or(0),
305        elapsed,
306        shard_count,
307        fts5_rebuilt: true, // FTS5 triggers fire during projection
308    })
309}
310
311// ---------------------------------------------------------------------------
312// Tests
313// ---------------------------------------------------------------------------
314
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use crate::shard::ShardManager;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Create a temporary directory with a `ShardManager` rooted at it, after
    /// calling `ensure_dirs` and `init` on the manager. The `TempDir` is
    /// returned so the directory lives as long as the test holds it.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let dir = TempDir::new().expect("create tempdir");
        let shard_mgr = ShardManager::new(dir.path());
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init shard");
        (dir, shard_mgr)
    }

    /// Build a `Create` event for item `id` with the given title and
    /// timestamp. All other fields are fixed test values; the description is
    /// derived from the title.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(CreateData {
                title: title.into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["test".into()],
                parent: None,
                causation: None,
                description: Some(format!("Description for {title}")),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        // Compute hash: `event_hash` starts empty and is filled in by the writer.
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Build a `Move` event that transitions item `id` to `state`, with
    /// `parent_hash` as its single parent.
    fn make_move_event(
        id: &str,
        state: crate::model::item::State,
        ts: i64,
        parent_hash: &str,
    ) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![parent_hash.into()],
            event_type: EventType::Move,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Move(MoveData {
                state,
                reason: None,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Serialize `event` to a log line and append it to the currently active
    /// shard. Panics if there is no active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let line = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append event");
    }

    /// Rebuilding from a log with no events yields an empty but valid DB.
    #[test]
    fn rebuild_empty_event_log() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 0);
        assert_eq!(report.item_count, 0);
        assert_eq!(report.shard_count, 1); // init creates one shard
        assert!(report.fts5_rebuilt);

        // Verify DB exists and is valid
        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    /// Two creates plus one move produce two items, and the moved item ends
    /// up in the `doing` state.
    #[test]
    fn rebuild_with_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // Write events
        let create1 = make_create_event("bn-001", "First item", 1000);
        let create2 = make_create_event("bn-002", "Second item", 1001);
        let mv = make_move_event(
            "bn-001",
            crate::model::item::State::Doing,
            2000,
            &create1.event_hash,
        );

        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);
        append_event(&shard_mgr, &mv);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 3);
        assert_eq!(report.item_count, 2);

        // Verify items
        let conn = open_projection(&db_path).unwrap();
        let item: String = conn
            .query_row(
                "SELECT state FROM items WHERE item_id = 'bn-001'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(item, "doing");
    }

    /// A second rebuild over the same DB path replaces the earlier projection
    /// and reflects events appended in between.
    #[test]
    fn rebuild_replaces_existing_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // First rebuild with 1 event
        let create1 = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create1);

        let report1 = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report1.event_count, 1);
        assert_eq!(report1.item_count, 1);

        // Add another event and rebuild again
        let create2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &create2);

        let report2 = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report2.event_count, 2);
        assert_eq!(report2.item_count, 2);
    }

    /// Rebuilding the same log into two different DB files yields the same
    /// counts and the same item titles in the same order.
    #[test]
    fn rebuild_is_deterministic() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bn-001", "Deterministic test", 1000);
        let create2 = make_create_event("bn-002", "Another item", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        // Rebuild twice to different DB paths
        let db_path_a = dir.path().join("bones_a.db");
        let db_path_b = dir.path().join("bones_b.db");

        let report_a = rebuild(&events_dir, &db_path_a).unwrap();
        let report_b = rebuild(&events_dir, &db_path_b).unwrap();

        assert_eq!(report_a.event_count, report_b.event_count);
        assert_eq!(report_a.item_count, report_b.item_count);

        // Verify same items in both
        let conn_a = open_projection(&db_path_a).unwrap();
        let conn_b = open_projection(&db_path_b).unwrap();

        let titles_a: Vec<String> = {
            let mut stmt = conn_a
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };

        let titles_b: Vec<String> = {
            let mut stmt = conn_b
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };

        assert_eq!(titles_a, titles_b);
    }

    /// After a rebuild, a full-text query on a word from the item title finds
    /// exactly one row in `items_fts`.
    #[test]
    fn rebuild_populates_fts() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Authentication timeout fix", 1000);
        append_event(&shard_mgr, &create);

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let hits: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM items_fts WHERE items_fts MATCH 'authentication'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(hits, 1);
    }

    /// A rebuild that replayed at least one event leaves a non-zero cursor
    /// offset and a recorded last event hash.
    #[test]
    fn rebuild_updates_projection_cursor() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item", 1000);
        append_event(&shard_mgr, &create);

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let (offset, hash) = crate::db::query::get_projection_cursor(&conn).unwrap();
        assert!(offset > 0, "cursor offset should be non-zero after rebuild");
        assert!(hash.is_some(), "cursor hash should be set after rebuild");
    }

    /// The same event appended twice is counted once in `event_count` and
    /// produces a single item.
    #[test]
    fn rebuild_handles_duplicate_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // Append same event twice (simulates git merge duplication)
        let create = make_create_event("bn-001", "Item", 1000);
        append_event(&shard_mgr, &create);
        append_event(&shard_mgr, &create);

        let report = rebuild(&events_dir, &db_path).unwrap();
        // Only 1 unique event projected, 1 duplicate skipped
        assert_eq!(report.event_count, 1);
        assert_eq!(report.item_count, 1);
    }

    /// Item IDs carrying the `bd-` prefix are projected like any other ID.
    #[test]
    fn rebuild_with_bd_prefix_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // Write events with bd- prefix (migrated bead IDs)
        let create1 = make_create_event("bd-9mx", "Parent item", 1000);
        let create2 = make_create_event("bd-4kz", "Child item", 1001);

        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(
            report.event_count, 2,
            "should project 2 events with bd- prefix"
        );
        assert_eq!(report.item_count, 2, "should have 2 items with bd- prefix");
    }

    /// Smoke test: rebuilding 100 items completes within one second.
    ///
    /// NOTE(review): this asserts on wall-clock time, so it may be flaky on a
    /// heavily loaded or very slow machine.
    #[test]
    fn rebuild_performance_reasonable() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // Create 100 items — should be well under 1s
        for i in 0..100_u32 {
            let create = make_create_event(
                &format!("bn-{i:04x}"),
                &format!("Item {i}"),
                i64::from(i) * 1000,
            );
            append_event(&shard_mgr, &create);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 100);
        assert_eq!(report.item_count, 100);
        assert!(
            report.elapsed.as_millis() < 1000,
            "rebuild of 100 items took {}ms, expected <1000ms",
            report.elapsed.as_millis()
        );
    }
}