Skip to main content

bones_core/db/
rebuild.rs

1//! Full projection rebuild from the event log.
2//!
3//! `bn admin rebuild` drops and recreates the entire `SQLite` DB from the canonical
4//! event log, proving the projection is disposable and reproducible.
5
6use std::path::Path;
7use std::time::Instant;
8
9use anyhow::{Context, Result};
10
11use crate::db::{open_projection, project};
12use crate::event::Event;
13use crate::shard::ShardManager;
14use std::io;
15
/// Number of events projected per batch when no valid override is supplied.
const DEFAULT_REBUILD_BATCH_SIZE: usize = 4_000;

/// Resolve the rebuild batch size.
///
/// Reads `BONES_REBUILD_BATCH_SIZE` from the environment. The value is used
/// only when it parses as a `usize` greater than zero; an unset, unparsable,
/// or zero value falls back to [`DEFAULT_REBUILD_BATCH_SIZE`].
fn rebuild_batch_size() -> usize {
    match std::env::var("BONES_REBUILD_BATCH_SIZE") {
        Ok(raw) => match raw.parse::<usize>() {
            // A zero batch size would never accumulate anything, so reject it.
            Ok(requested) if requested > 0 => requested,
            _ => DEFAULT_REBUILD_BATCH_SIZE,
        },
        Err(_) => DEFAULT_REBUILD_BATCH_SIZE,
    }
}
25
26fn configure_rebuild_pragmas(conn: &rusqlite::Connection) -> Result<()> {
27    conn.pragma_update(None, "temp_store", "MEMORY")
28        .context("PRAGMA temp_store = MEMORY")?;
29    conn.pragma_update(None, "cache_size", -131_072_i64)
30        .context("PRAGMA cache_size")?;
31    conn.pragma_update(None, "mmap_size", 268_435_456_i64)
32        .context("PRAGMA mmap_size")?;
33    Ok(())
34}
35
36// ---------------------------------------------------------------------------
37// RebuildReport
38// ---------------------------------------------------------------------------
39
/// Report returned after a full projection rebuild.
///
/// Produced by [`rebuild`]; every field describes that single rebuild run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RebuildReport {
    /// Number of events the projector reported as projected, summed over all
    /// batches. Events the projector counted as duplicates are tallied
    /// separately by `rebuild` and are not included here.
    pub event_count: usize,
    /// Row count of the `items` table in the rebuilt projection.
    pub item_count: usize,
    /// Wall-clock elapsed time for the rebuild.
    pub elapsed: std::time::Duration,
    /// Number of shards listed by the shard manager for this rebuild.
    pub shard_count: usize,
    /// Whether FTS5 index was rebuilt. `rebuild` always sets this to `true`,
    /// relying on FTS5 triggers firing during projection.
    pub fts5_rebuilt: bool,
}
54
55// ---------------------------------------------------------------------------
56// rebuild
57// ---------------------------------------------------------------------------
58
59/// Validate sealed shard manifests, bailing on corruption.
60fn check_sealed_shard_integrity(shard_mgr: &ShardManager) -> Result<()> {
61    let issues = shard_mgr
62        .validate_sealed_shards()
63        .map_err(|e| anyhow::anyhow!("sealed shard validation: {e}"))?;
64    if !issues.is_empty() {
65        for issue in &issues {
66            tracing::error!(
67                shard = %issue.shard_name,
68                problem = %issue.problem,
69                "sealed shard integrity check failed"
70            );
71        }
72        anyhow::bail!(
73            "sealed shard corrupted: {} (run `bn doctor` to diagnose)",
74            issues[0].problem
75        );
76    }
77    Ok(())
78}
79
80/// Drop the existing DB and rebuild it from the canonical event log.
81///
82/// 1. Deletes the existing database file (if any)
83/// 2. Creates a fresh schema via `open_projection`
84/// 3. Replays all events from `events_dir` shards through the projector
85/// 4. FTS5 index is maintained via triggers during projection
86/// 5. Updates the projection cursor with the final offset
87///
88/// # Arguments
89///
90/// * `events_dir` — Path to `.bones/events/` (the shard directory)
91/// * `db_path` — Path to `.bones/bones.db` (the `SQLite` projection file)
92///
93/// # Errors
94///
95/// Returns an error if shard reading, event parsing, or projection fails.
96pub fn rebuild(events_dir: &Path, db_path: &Path) -> Result<RebuildReport> {
97    let start = Instant::now();
98
99    // 1. Delete existing database file
100    if db_path.exists() {
101        std::fs::remove_file(db_path)
102            .with_context(|| format!("remove existing projection db {}", db_path.display()))?;
103        // Also remove WAL and SHM files
104        let wal_path = db_path.with_extension("db-wal");
105        let shm_path = db_path.with_extension("db-shm");
106        let _ = std::fs::remove_file(wal_path);
107        let _ = std::fs::remove_file(shm_path);
108    }
109
110    // 2. Create fresh schema
111    let conn = open_projection(db_path).context("create fresh projection database")?;
112    configure_rebuild_pragmas(&conn).context("configure rebuild sqlite pragmas")?;
113    project::ensure_tracking_table(&conn).context("create tracking table")?;
114
115    // 3. Read and replay all events in streaming batches
116    let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
117    let shard_mgr = ShardManager::new(bones_dir);
118
119    let shards = shard_mgr
120        .list_shards()
121        .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
122    let shard_count = shards.len();
123
124    check_sealed_shard_integrity(&shard_mgr)?;
125
126    // We need a custom loop because EventParser expects Iterator<Item = String>
127    // and returns Result<Event, ...>.
128    let mut version_checked = false;
129    let mut shard_version = crate::event::parser::CURRENT_VERSION;
130    let mut line_no = 0;
131    let mut total_projected = 0;
132    let mut total_duplicates = 0;
133    let mut last_event_hash = None;
134    let mut total_byte_len = 0;
135
136    let batch_size = rebuild_batch_size();
137    let mut current_batch: Vec<Event> = Vec::with_capacity(batch_size);
138    let projector = project::Projector::new(&conn);
139
140    let shard_line_iter = shard_mgr.replay_lines()?;
141
142    for line_res in shard_line_iter {
143        let (offset, line): (usize, String) =
144            line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
145        line_no += 1;
146        total_byte_len = offset + line.len();
147
148        if !version_checked && line.trim_start().starts_with("# bones event log v") {
149            version_checked = true;
150            shard_version = crate::event::parser::detect_version(&line)
151                .map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
152            continue;
153        }
154
155        match crate::event::parser::parse_line(&line) {
156            Ok(crate::event::parser::ParsedLine::Event(event)) => {
157                let event = crate::event::migrate_event(*event, shard_version)
158                    .map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
159
160                last_event_hash = Some(event.event_hash.clone());
161                current_batch.push(event);
162
163                if current_batch.len() >= batch_size {
164                    let stats = projector
165                        .project_batch(&current_batch)
166                        .context("project batch during rebuild")?;
167                    total_projected += stats.projected;
168                    total_duplicates += stats.duplicates;
169                    current_batch.clear();
170                }
171            }
172            Ok(
173                crate::event::parser::ParsedLine::Comment(_)
174                | crate::event::parser::ParsedLine::Blank,
175            ) => {}
176            Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
177                tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
178            }
179            Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
180        }
181    }
182
183    // Final batch
184    if !current_batch.is_empty() {
185        let stats = projector
186            .project_batch(&current_batch)
187            .context("project final batch during rebuild")?;
188        total_projected += stats.projected;
189        total_duplicates += stats.duplicates;
190    }
191
192    // 5. Update projection cursor
193    let byte_offset_i64 = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
194    crate::db::query::update_projection_cursor(&conn, byte_offset_i64, last_event_hash.as_deref())
195        .context("update projection cursor after rebuild")?;
196
197    // Count unique items
198    let item_count: i64 = conn
199        .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
200        .context("count items after rebuild")?;
201
202    let elapsed = start.elapsed();
203
204    tracing::info!(
205        event_count = total_projected,
206        duplicates = total_duplicates,
207        batch_size,
208        item_count,
209        shard_count,
210        elapsed_ms = elapsed.as_millis(),
211        "projection rebuild complete"
212    );
213
214    Ok(RebuildReport {
215        event_count: total_projected,
216        item_count: usize::try_from(item_count).unwrap_or(0),
217        elapsed,
218        shard_count,
219        fts5_rebuilt: true, // FTS5 triggers fire during projection
220    })
221}
222
223// ---------------------------------------------------------------------------
224// Tests
225// ---------------------------------------------------------------------------
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use crate::shard::ShardManager;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Create a temp directory with the shard layout in place and a shard initialised.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let tmp = TempDir::new().expect("create tempdir");
        let mgr = ShardManager::new(tmp.path());
        mgr.ensure_dirs().expect("ensure dirs");
        mgr.init().expect("init shard");
        (tmp, mgr)
    }

    /// `(db_path, events_dir)` for the default projection inside `dir`.
    fn projection_paths(dir: &TempDir) -> (std::path::PathBuf, std::path::PathBuf) {
        (dir.path().join("bones.db"), dir.path().join("events"))
    }

    /// Row count of the `items` table.
    fn count_items(conn: &rusqlite::Connection) -> i64 {
        conn.query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap()
    }

    /// All item titles, ordered by item id.
    fn titles_by_item_id(conn: &rusqlite::Connection) -> Vec<String> {
        let mut stmt = conn
            .prepare("SELECT title FROM items ORDER BY item_id")
            .unwrap();
        let titles: Vec<String> = stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        titles
    }

    /// Build a hashed `Create` event for a task titled `title`.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let payload = CreateData {
            title: title.into(),
            kind: Kind::Task,
            size: Some(Size::M),
            urgency: Urgency::Default,
            labels: vec!["test".into()],
            parent: None,
            causation: None,
            description: Some(format!("Description for {title}")),
            extra: BTreeMap::new(),
        };
        let mut created = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(payload),
            event_hash: String::new(),
        };
        // Fills in `event_hash`.
        writer::write_event(&mut created).expect("compute hash");
        created
    }

    /// Build a hashed `Move` event to `state`, with `parent_hash` as its only parent.
    fn make_move_event(
        id: &str,
        state: crate::model::item::State,
        ts: i64,
        parent_hash: &str,
    ) -> Event {
        let payload = MoveData {
            state,
            reason: None,
            extra: BTreeMap::new(),
        };
        let mut moved = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![parent_hash.into()],
            event_type: EventType::Move,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Move(payload),
            event_hash: String::new(),
        };
        writer::write_event(&mut moved).expect("compute hash");
        moved
    }

    /// Serialise `event` and append it to the active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let serialized = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &serialized)
            .expect("append event");
    }

    #[test]
    fn rebuild_empty_event_log() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = projection_paths(&dir);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 0);
        assert_eq!(report.item_count, 0);
        assert_eq!(report.shard_count, 1); // init creates one shard
        assert!(report.fts5_rebuilt);

        // The rebuilt DB must open cleanly and contain no items.
        let conn = open_projection(&db_path).unwrap();
        let count = count_items(&conn);
        assert_eq!(count, 0);
    }

    #[test]
    fn rebuild_with_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = projection_paths(&dir);

        // Two creates, then a move of the first item.
        let first = make_create_event("bn-001", "First item", 1000);
        let second = make_create_event("bn-002", "Second item", 1001);
        let to_doing = make_move_event(
            "bn-001",
            crate::model::item::State::Doing,
            2000,
            &first.event_hash,
        );

        for event in [&first, &second, &to_doing] {
            append_event(&shard_mgr, event);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 3);
        assert_eq!(report.item_count, 2);

        // The move must be reflected in the projected state.
        let conn = open_projection(&db_path).unwrap();
        let state: String = conn
            .query_row(
                "SELECT state FROM items WHERE item_id = 'bn-001'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(state, "doing");
    }

    #[test]
    fn rebuild_replaces_existing_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = projection_paths(&dir);

        // Round one: a single event.
        append_event(&shard_mgr, &make_create_event("bn-001", "Item 1", 1000));

        let first_run = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(first_run.event_count, 1);
        assert_eq!(first_run.item_count, 1);

        // Round two: one more event, rebuilt over the existing DB.
        append_event(&shard_mgr, &make_create_event("bn-002", "Item 2", 1001));

        let second_run = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(second_run.event_count, 2);
        assert_eq!(second_run.item_count, 2);
    }

    #[test]
    fn rebuild_is_deterministic() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        let events = [
            make_create_event("bn-001", "Deterministic test", 1000),
            make_create_event("bn-002", "Another item", 1001),
        ];
        for event in &events {
            append_event(&shard_mgr, event);
        }

        // Same log, two separate projection files.
        let db_path_a = dir.path().join("bones_a.db");
        let db_path_b = dir.path().join("bones_b.db");

        let report_a = rebuild(&events_dir, &db_path_a).unwrap();
        let report_b = rebuild(&events_dir, &db_path_b).unwrap();

        assert_eq!(report_a.event_count, report_b.event_count);
        assert_eq!(report_a.item_count, report_b.item_count);

        // Both projections must contain the same titles.
        let conn_a = open_projection(&db_path_a).unwrap();
        let conn_b = open_projection(&db_path_b).unwrap();

        let titles_a = titles_by_item_id(&conn_a);
        let titles_b = titles_by_item_id(&conn_b);

        assert_eq!(titles_a, titles_b);
    }

    #[test]
    fn rebuild_populates_fts() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = projection_paths(&dir);

        append_event(
            &shard_mgr,
            &make_create_event("bn-001", "Authentication timeout fix", 1000),
        );

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let hits: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM items_fts WHERE items_fts MATCH 'authentication'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(hits, 1);
    }

    #[test]
    fn rebuild_updates_projection_cursor() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = projection_paths(&dir);

        append_event(&shard_mgr, &make_create_event("bn-001", "Item", 1000));

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let (offset, hash) = crate::db::query::get_projection_cursor(&conn).unwrap();
        assert!(offset > 0, "cursor offset should be non-zero after rebuild");
        assert!(hash.is_some(), "cursor hash should be set after rebuild");
    }

    #[test]
    fn rebuild_handles_duplicate_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = projection_paths(&dir);

        // The same event written twice (simulates git merge duplication).
        let create = make_create_event("bn-001", "Item", 1000);
        for _ in 0..2 {
            append_event(&shard_mgr, &create);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        // Only 1 unique event projected, 1 duplicate skipped
        assert_eq!(report.event_count, 1);
        assert_eq!(report.item_count, 1);
    }

    #[test]
    fn rebuild_with_bd_prefix_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = projection_paths(&dir);

        // Events whose ids carry the bd- prefix (migrated bead IDs).
        let events = [
            make_create_event("bd-9mx", "Parent item", 1000),
            make_create_event("bd-4kz", "Child item", 1001),
        ];
        for event in &events {
            append_event(&shard_mgr, event);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(
            report.event_count, 2,
            "should project 2 events with bd- prefix"
        );
        assert_eq!(report.item_count, 2, "should have 2 items with bd- prefix");
    }

    #[test]
    fn rebuild_performance_reasonable() {
        let (dir, shard_mgr) = setup_bones_dir();
        let (db_path, events_dir) = projection_paths(&dir);

        // Create 100 items — should be well under 1s
        (0..100_u32).for_each(|i| {
            let create = make_create_event(
                &format!("bn-{i:04x}"),
                &format!("Item {i}"),
                i64::from(i) * 1000,
            );
            append_event(&shard_mgr, &create);
        });

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 100);
        assert_eq!(report.item_count, 100);

        let elapsed_ms = report.elapsed.as_millis();
        assert!(
            elapsed_ms < 1000,
            "rebuild of 100 items took {}ms, expected <1000ms",
            elapsed_ms
        );
    }
}