Skip to main content

bones_core/db/
rebuild.rs

1//! Full projection rebuild from the event log.
2//!
3//! `bn admin rebuild` drops and recreates the entire `SQLite` DB from the canonical
4//! event log, proving the projection is disposable and reproducible.
5
6use std::path::Path;
7use std::time::Instant;
8
9use anyhow::{Context, Result};
10
11use crate::db::{open_projection, project};
12use crate::event::Event;
13use crate::shard::ShardManager;
14use std::io;
15
16// ---------------------------------------------------------------------------
17// RebuildReport
18// ---------------------------------------------------------------------------
19
20/// Report returned after a full projection rebuild.
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub struct RebuildReport {
23    /// Total events replayed from all shards.
24    pub event_count: usize,
25    /// Total unique items in the rebuilt projection.
26    pub item_count: usize,
27    /// Wall-clock elapsed time for the rebuild.
28    pub elapsed: std::time::Duration,
29    /// Number of shard files processed.
30    pub shard_count: usize,
31    /// Whether FTS5 index was rebuilt.
32    pub fts5_rebuilt: bool,
33}
34
35// ---------------------------------------------------------------------------
36// rebuild
37// ---------------------------------------------------------------------------
38
39/// Drop the existing DB and rebuild it from the canonical event log.
40///
41/// 1. Deletes the existing database file (if any)
42/// 2. Creates a fresh schema via `open_projection`
43/// 3. Replays all events from `events_dir` shards through the projector
44/// 4. FTS5 index is maintained via triggers during projection
45/// 5. Updates the projection cursor with the final offset
46///
47/// # Arguments
48///
49/// * `events_dir` — Path to `.bones/events/` (the shard directory)
50/// * `db_path` — Path to `.bones/bones.db` (the `SQLite` projection file)
51///
52/// # Errors
53///
54/// Returns an error if shard reading, event parsing, or projection fails.
55pub fn rebuild(events_dir: &Path, db_path: &Path) -> Result<RebuildReport> {
56    let start = Instant::now();
57
58    // 1. Delete existing database file
59    if db_path.exists() {
60        std::fs::remove_file(db_path)
61            .with_context(|| format!("remove existing projection db {}", db_path.display()))?;
62        // Also remove WAL and SHM files
63        let wal_path = db_path.with_extension("db-wal");
64        let shm_path = db_path.with_extension("db-shm");
65        let _ = std::fs::remove_file(wal_path);
66        let _ = std::fs::remove_file(shm_path);
67    }
68
69    // 2. Create fresh schema
70    let conn = open_projection(db_path).context("create fresh projection database")?;
71    project::ensure_tracking_table(&conn).context("create tracking table")?;
72
73    // 3. Read and replay all events in streaming batches
74    let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
75    let shard_mgr = ShardManager::new(bones_dir);
76
77    let shards = shard_mgr
78        .list_shards()
79        .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
80    let shard_count = shards.len();
81
82    // We need a custom loop because EventParser expects Iterator<Item = String>
83    // and returns Result<Event, ...>.
84    let mut version_checked = false;
85    let mut shard_version = crate::event::parser::CURRENT_VERSION;
86    let mut line_no = 0;
87    let mut total_projected = 0;
88    let mut total_duplicates = 0;
89    let mut last_event_hash = None;
90    let mut total_byte_len = 0;
91
92    let mut current_batch: Vec<Event> = Vec::with_capacity(1000);
93    let projector = project::Projector::new(&conn);
94
95    let shard_line_iter = shard_mgr.replay_lines()?;
96
97    for line_res in shard_line_iter {
98        let (offset, line): (usize, String) =
99            line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
100        line_no += 1;
101        total_byte_len = offset + line.len();
102
103        if !version_checked && line.trim_start().starts_with("# bones event log v") {
104            version_checked = true;
105            shard_version = crate::event::parser::detect_version(&line)
106                .map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
107            continue;
108        }
109
110        match crate::event::parser::parse_line(&line) {
111            Ok(crate::event::parser::ParsedLine::Event(event)) => {
112                let event = crate::event::migrate_event(*event, shard_version)
113                    .map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
114
115                last_event_hash = Some(event.event_hash.clone());
116                current_batch.push(event);
117
118                if current_batch.len() >= 1000 {
119                    let stats = projector
120                        .project_batch(&current_batch)
121                        .context("project batch during rebuild")?;
122                    total_projected += stats.projected;
123                    total_duplicates += stats.duplicates;
124                    current_batch.clear();
125                }
126            }
127            Ok(
128                crate::event::parser::ParsedLine::Comment(_)
129                | crate::event::parser::ParsedLine::Blank,
130            ) => {}
131            Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
132                tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
133            }
134            Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
135        }
136    }
137
138    // Final batch
139    if !current_batch.is_empty() {
140        let stats = projector
141            .project_batch(&current_batch)
142            .context("project final batch during rebuild")?;
143        total_projected += stats.projected;
144        total_duplicates += stats.duplicates;
145    }
146
147    // 5. Update projection cursor
148    let byte_offset_i64 = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
149    crate::db::query::update_projection_cursor(&conn, byte_offset_i64, last_event_hash.as_deref())
150        .context("update projection cursor after rebuild")?;
151
152    // Count unique items
153    let item_count: i64 = conn
154        .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
155        .context("count items after rebuild")?;
156
157    let elapsed = start.elapsed();
158
159    tracing::info!(
160        event_count = total_projected,
161        duplicates = total_duplicates,
162        item_count,
163        shard_count,
164        elapsed_ms = elapsed.as_millis(),
165        "projection rebuild complete"
166    );
167
168    Ok(RebuildReport {
169        event_count: total_projected,
170        item_count: usize::try_from(item_count).unwrap_or(0),
171        elapsed,
172        shard_count,
173        fts5_rebuilt: true, // FTS5 triggers fire during projection
174    })
175}
176
177// ---------------------------------------------------------------------------
178// Tests
179// ---------------------------------------------------------------------------
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184    use crate::event::Event;
185    use crate::event::data::*;
186    use crate::event::types::EventType;
187    use crate::event::writer;
188    use crate::model::item::{Kind, Size, Urgency};
189    use crate::model::item_id::ItemId;
190    use crate::shard::ShardManager;
191    use std::collections::BTreeMap;
192    use tempfile::TempDir;
193
194    fn setup_bones_dir() -> (TempDir, ShardManager) {
195        let dir = TempDir::new().expect("create tempdir");
196        let shard_mgr = ShardManager::new(dir.path());
197        shard_mgr.ensure_dirs().expect("ensure dirs");
198        shard_mgr.init().expect("init shard");
199        (dir, shard_mgr)
200    }
201
202    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
203        let mut event = Event {
204            wall_ts_us: ts,
205            agent: "test-agent".into(),
206            itc: "itc:AQ".into(),
207            parents: vec![],
208            event_type: EventType::Create,
209            item_id: ItemId::new_unchecked(id),
210            data: EventData::Create(CreateData {
211                title: title.into(),
212                kind: Kind::Task,
213                size: Some(Size::M),
214                urgency: Urgency::Default,
215                labels: vec!["test".into()],
216                parent: None,
217                causation: None,
218                description: Some(format!("Description for {title}")),
219                extra: BTreeMap::new(),
220            }),
221            event_hash: String::new(),
222        };
223        // Compute hash
224        writer::write_event(&mut event).expect("compute hash");
225        event
226    }
227
228    fn make_move_event(
229        id: &str,
230        state: crate::model::item::State,
231        ts: i64,
232        parent_hash: &str,
233    ) -> Event {
234        let mut event = Event {
235            wall_ts_us: ts,
236            agent: "test-agent".into(),
237            itc: "itc:AQ".into(),
238            parents: vec![parent_hash.into()],
239            event_type: EventType::Move,
240            item_id: ItemId::new_unchecked(id),
241            data: EventData::Move(MoveData {
242                state,
243                reason: None,
244                extra: BTreeMap::new(),
245            }),
246            event_hash: String::new(),
247        };
248        writer::write_event(&mut event).expect("compute hash");
249        event
250    }
251
252    fn append_event(shard_mgr: &ShardManager, event: &Event) {
253        let line = writer::write_line(event).expect("serialize event");
254        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
255        shard_mgr
256            .append_raw(year, month, &line)
257            .expect("append event");
258    }
259
260    #[test]
261    fn rebuild_empty_event_log() {
262        let (dir, _shard_mgr) = setup_bones_dir();
263        let db_path = dir.path().join("bones.db");
264        let events_dir = dir.path().join("events");
265
266        let report = rebuild(&events_dir, &db_path).unwrap();
267        assert_eq!(report.event_count, 0);
268        assert_eq!(report.item_count, 0);
269        assert_eq!(report.shard_count, 1); // init creates one shard
270        assert!(report.fts5_rebuilt);
271
272        // Verify DB exists and is valid
273        let conn = open_projection(&db_path).unwrap();
274        let count: i64 = conn
275            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
276            .unwrap();
277        assert_eq!(count, 0);
278    }
279
280    #[test]
281    fn rebuild_with_events() {
282        let (dir, shard_mgr) = setup_bones_dir();
283        let db_path = dir.path().join("bones.db");
284        let events_dir = dir.path().join("events");
285
286        // Write events
287        let create1 = make_create_event("bn-001", "First item", 1000);
288        let create2 = make_create_event("bn-002", "Second item", 1001);
289        let mv = make_move_event(
290            "bn-001",
291            crate::model::item::State::Doing,
292            2000,
293            &create1.event_hash,
294        );
295
296        append_event(&shard_mgr, &create1);
297        append_event(&shard_mgr, &create2);
298        append_event(&shard_mgr, &mv);
299
300        let report = rebuild(&events_dir, &db_path).unwrap();
301        assert_eq!(report.event_count, 3);
302        assert_eq!(report.item_count, 2);
303
304        // Verify items
305        let conn = open_projection(&db_path).unwrap();
306        let item: String = conn
307            .query_row(
308                "SELECT state FROM items WHERE item_id = 'bn-001'",
309                [],
310                |row| row.get(0),
311            )
312            .unwrap();
313        assert_eq!(item, "doing");
314    }
315
316    #[test]
317    fn rebuild_replaces_existing_db() {
318        let (dir, shard_mgr) = setup_bones_dir();
319        let db_path = dir.path().join("bones.db");
320        let events_dir = dir.path().join("events");
321
322        // First rebuild with 1 event
323        let create1 = make_create_event("bn-001", "Item 1", 1000);
324        append_event(&shard_mgr, &create1);
325
326        let report1 = rebuild(&events_dir, &db_path).unwrap();
327        assert_eq!(report1.event_count, 1);
328        assert_eq!(report1.item_count, 1);
329
330        // Add another event and rebuild again
331        let create2 = make_create_event("bn-002", "Item 2", 1001);
332        append_event(&shard_mgr, &create2);
333
334        let report2 = rebuild(&events_dir, &db_path).unwrap();
335        assert_eq!(report2.event_count, 2);
336        assert_eq!(report2.item_count, 2);
337    }
338
339    #[test]
340    fn rebuild_is_deterministic() {
341        let (dir, shard_mgr) = setup_bones_dir();
342        let events_dir = dir.path().join("events");
343
344        let create1 = make_create_event("bn-001", "Deterministic test", 1000);
345        let create2 = make_create_event("bn-002", "Another item", 1001);
346        append_event(&shard_mgr, &create1);
347        append_event(&shard_mgr, &create2);
348
349        // Rebuild twice to different DB paths
350        let db_path_a = dir.path().join("bones_a.db");
351        let db_path_b = dir.path().join("bones_b.db");
352
353        let report_a = rebuild(&events_dir, &db_path_a).unwrap();
354        let report_b = rebuild(&events_dir, &db_path_b).unwrap();
355
356        assert_eq!(report_a.event_count, report_b.event_count);
357        assert_eq!(report_a.item_count, report_b.item_count);
358
359        // Verify same items in both
360        let conn_a = open_projection(&db_path_a).unwrap();
361        let conn_b = open_projection(&db_path_b).unwrap();
362
363        let titles_a: Vec<String> = {
364            let mut stmt = conn_a
365                .prepare("SELECT title FROM items ORDER BY item_id")
366                .unwrap();
367            stmt.query_map([], |row| row.get(0))
368                .unwrap()
369                .map(|r| r.unwrap())
370                .collect()
371        };
372
373        let titles_b: Vec<String> = {
374            let mut stmt = conn_b
375                .prepare("SELECT title FROM items ORDER BY item_id")
376                .unwrap();
377            stmt.query_map([], |row| row.get(0))
378                .unwrap()
379                .map(|r| r.unwrap())
380                .collect()
381        };
382
383        assert_eq!(titles_a, titles_b);
384    }
385
386    #[test]
387    fn rebuild_populates_fts() {
388        let (dir, shard_mgr) = setup_bones_dir();
389        let db_path = dir.path().join("bones.db");
390        let events_dir = dir.path().join("events");
391
392        let create = make_create_event("bn-001", "Authentication timeout fix", 1000);
393        append_event(&shard_mgr, &create);
394
395        rebuild(&events_dir, &db_path).unwrap();
396
397        let conn = open_projection(&db_path).unwrap();
398        let hits: i64 = conn
399            .query_row(
400                "SELECT COUNT(*) FROM items_fts WHERE items_fts MATCH 'authentication'",
401                [],
402                |row| row.get(0),
403            )
404            .unwrap();
405        assert_eq!(hits, 1);
406    }
407
408    #[test]
409    fn rebuild_updates_projection_cursor() {
410        let (dir, shard_mgr) = setup_bones_dir();
411        let db_path = dir.path().join("bones.db");
412        let events_dir = dir.path().join("events");
413
414        let create = make_create_event("bn-001", "Item", 1000);
415        append_event(&shard_mgr, &create);
416
417        rebuild(&events_dir, &db_path).unwrap();
418
419        let conn = open_projection(&db_path).unwrap();
420        let (offset, hash) = crate::db::query::get_projection_cursor(&conn).unwrap();
421        assert!(offset > 0, "cursor offset should be non-zero after rebuild");
422        assert!(hash.is_some(), "cursor hash should be set after rebuild");
423    }
424
425    #[test]
426    fn rebuild_handles_duplicate_events() {
427        let (dir, shard_mgr) = setup_bones_dir();
428        let db_path = dir.path().join("bones.db");
429        let events_dir = dir.path().join("events");
430
431        // Append same event twice (simulates git merge duplication)
432        let create = make_create_event("bn-001", "Item", 1000);
433        append_event(&shard_mgr, &create);
434        append_event(&shard_mgr, &create);
435
436        let report = rebuild(&events_dir, &db_path).unwrap();
437        // Only 1 unique event projected, 1 duplicate skipped
438        assert_eq!(report.event_count, 1);
439        assert_eq!(report.item_count, 1);
440    }
441
442    #[test]
443    fn rebuild_with_bd_prefix_events() {
444        let (dir, shard_mgr) = setup_bones_dir();
445        let db_path = dir.path().join("bones.db");
446        let events_dir = dir.path().join("events");
447
448        // Write events with bd- prefix (migrated bead IDs)
449        let create1 = make_create_event("bd-9mx", "Parent item", 1000);
450        let create2 = make_create_event("bd-4kz", "Child item", 1001);
451
452        append_event(&shard_mgr, &create1);
453        append_event(&shard_mgr, &create2);
454
455        let report = rebuild(&events_dir, &db_path).unwrap();
456        assert_eq!(
457            report.event_count, 2,
458            "should project 2 events with bd- prefix"
459        );
460        assert_eq!(report.item_count, 2, "should have 2 items with bd- prefix");
461    }
462
463    #[test]
464    fn rebuild_performance_reasonable() {
465        let (dir, shard_mgr) = setup_bones_dir();
466        let db_path = dir.path().join("bones.db");
467        let events_dir = dir.path().join("events");
468
469        // Create 100 items — should be well under 1s
470        for i in 0..100_u32 {
471            let create = make_create_event(
472                &format!("bn-{i:04x}"),
473                &format!("Item {i}"),
474                i64::from(i) * 1000,
475            );
476            append_event(&shard_mgr, &create);
477        }
478
479        let report = rebuild(&events_dir, &db_path).unwrap();
480        assert_eq!(report.event_count, 100);
481        assert_eq!(report.item_count, 100);
482        assert!(
483            report.elapsed.as_millis() < 1000,
484            "rebuild of 100 items took {}ms, expected <1000ms",
485            report.elapsed.as_millis()
486        );
487    }
488}