1use std::path::Path;
7use std::time::Instant;
8
9use anyhow::{Context, Result};
10
11use crate::db::{open_projection, project};
12use crate::event::Event;
13use crate::shard::ShardManager;
14use std::io;
15
/// Events projected per batch when `BONES_REBUILD_BATCH_SIZE` does not supply a usable value.
const DEFAULT_REBUILD_BATCH_SIZE: usize = 4_000;

/// Resolve how many events `rebuild` buffers before handing them to the projector.
///
/// The value comes from the `BONES_REBUILD_BATCH_SIZE` environment variable.
/// An unset variable, a value that does not parse as `usize`, or `0` all fall
/// back to [`DEFAULT_REBUILD_BATCH_SIZE`].
fn rebuild_batch_size() -> usize {
    match std::env::var("BONES_REBUILD_BATCH_SIZE") {
        Ok(raw) => match raw.parse::<usize>() {
            // Zero would never trigger a flush, so it is treated as "not configured".
            Ok(size) if size > 0 => size,
            _ => DEFAULT_REBUILD_BATCH_SIZE,
        },
        Err(_) => DEFAULT_REBUILD_BATCH_SIZE,
    }
}
25
26fn configure_rebuild_pragmas(conn: &rusqlite::Connection) -> Result<()> {
27 conn.pragma_update(None, "temp_store", "MEMORY")
28 .context("PRAGMA temp_store = MEMORY")?;
29 conn.pragma_update(None, "cache_size", -131_072_i64)
30 .context("PRAGMA cache_size")?;
31 conn.pragma_update(None, "mmap_size", 268_435_456_i64)
32 .context("PRAGMA mmap_size")?;
33
34 conn.pragma_update(None, "synchronous", "OFF")
44 .context("PRAGMA synchronous = OFF")?;
45 let _: String = conn
46 .query_row("PRAGMA journal_mode = OFF", [], |row| row.get(0))
47 .context("PRAGMA journal_mode = OFF")?;
48 let _: String = conn
49 .query_row("PRAGMA locking_mode = EXCLUSIVE", [], |row| row.get(0))
50 .context("PRAGMA locking_mode = EXCLUSIVE")?;
51 Ok(())
52}
53
54const FTS_TRIGGERS: &[&str] = &["items_ai", "items_au", "items_ad"];
58
59fn drop_fts_triggers(conn: &rusqlite::Connection) -> Result<()> {
64 for name in FTS_TRIGGERS {
65 conn.execute_batch(&format!("DROP TRIGGER IF EXISTS {name}"))
66 .with_context(|| format!("drop trigger {name}"))?;
67 }
68 Ok(())
69}
70
/// Recreate the triggers that keep `items_fts` in sync with `items`.
///
/// This is the counterpart of `drop_fts_triggers`. It is called once the bulk load and
/// the FTS index rebuild are finished, so later writes keep the index current.
/// The trigger names (`items_ai`, `items_au`, `items_ad`) match `FTS_TRIGGERS`.
/// `CREATE TRIGGER IF NOT EXISTS` leaves an already-present trigger untouched.
///
/// NOTE(review): these definitions presumably duplicate the ones created by the
/// projection schema (`open_projection`); confirm the two copies stay in sync.
///
/// # Errors
/// Returns an error if SQLite rejects any of the `CREATE TRIGGER` statements.
fn restore_fts_triggers(conn: &rusqlite::Connection) -> Result<()> {
    // items_ai: index each new row.
    // items_au: replace the FTS row on update (delete old rowid, insert new values).
    // items_ad: remove the FTS row when the item row is deleted.
    // NULL description/search_labels are indexed as empty strings via COALESCE.
    conn.execute_batch(
        "CREATE TRIGGER IF NOT EXISTS items_ai
            AFTER INSERT ON items
            BEGIN
                INSERT INTO items_fts(rowid, title, description, labels, item_id)
                VALUES (
                    new.rowid,
                    new.title,
                    COALESCE(new.description, ''),
                    COALESCE(new.search_labels, ''),
                    new.item_id
                );
            END;

        CREATE TRIGGER IF NOT EXISTS items_au
            AFTER UPDATE ON items
            BEGIN
                DELETE FROM items_fts WHERE rowid = old.rowid;
                INSERT INTO items_fts(rowid, title, description, labels, item_id)
                VALUES (
                    new.rowid,
                    new.title,
                    COALESCE(new.description, ''),
                    COALESCE(new.search_labels, ''),
                    new.item_id
                );
            END;

        CREATE TRIGGER IF NOT EXISTS items_ad
            AFTER DELETE ON items
            BEGIN
                DELETE FROM items_fts WHERE rowid = old.rowid;
            END;",
    )
    .context("recreate FTS5 maintenance triggers after rebuild")?;
    Ok(())
}
111
/// Summary of a full projection rebuild, as returned by `rebuild`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RebuildReport {
    /// Number of events the projector reported as newly projected.
    /// Events it flagged as duplicates are not included.
    pub event_count: usize,
    /// Row count of the `items` table after the rebuild finished.
    pub item_count: usize,
    /// Wall-clock time of the whole rebuild, measured from entry into `rebuild`.
    pub elapsed: std::time::Duration,
    /// Number of shards returned by `ShardManager::list_shards`.
    pub shard_count: usize,
    /// Whether the FTS5 index was rebuilt; `rebuild` always reports `true` on success.
    pub fts5_rebuilt: bool,
}
130
131fn check_sealed_shard_integrity(shard_mgr: &ShardManager) -> Result<()> {
137 let issues = shard_mgr
138 .validate_sealed_shards()
139 .map_err(|e| anyhow::anyhow!("sealed shard validation: {e}"))?;
140 if !issues.is_empty() {
141 for issue in &issues {
142 tracing::error!(
143 shard = %issue.shard_name,
144 problem = %issue.problem,
145 "sealed shard integrity check failed"
146 );
147 }
148 anyhow::bail!(
149 "sealed shard corrupted: {} (run `bn doctor` to diagnose)",
150 issues[0].problem
151 );
152 }
153 Ok(())
154}
155
156pub fn rebuild(events_dir: &Path, db_path: &Path) -> Result<RebuildReport> {
173 let start = Instant::now();
174
175 if db_path.exists() {
177 std::fs::remove_file(db_path)
178 .with_context(|| format!("remove existing projection db {}", db_path.display()))?;
179 let wal_path = db_path.with_extension("db-wal");
181 let shm_path = db_path.with_extension("db-shm");
182 let _ = std::fs::remove_file(wal_path);
183 let _ = std::fs::remove_file(shm_path);
184 }
185
186 let conn = open_projection(db_path).context("create fresh projection database")?;
188 configure_rebuild_pragmas(&conn).context("configure rebuild sqlite pragmas")?;
189 project::ensure_tracking_table(&conn).context("create tracking table")?;
190
191 drop_fts_triggers(&conn).context("drop FTS5 triggers for bulk rebuild")?;
196
197 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
199 let shard_mgr = ShardManager::new(bones_dir);
200
201 let shards = shard_mgr
202 .list_shards()
203 .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
204 let shard_count = shards.len();
205
206 check_sealed_shard_integrity(&shard_mgr)?;
207
208 let mut version_checked = false;
211 let mut shard_version = crate::event::parser::CURRENT_VERSION;
212 let mut line_no = 0;
213 let mut total_projected = 0;
214 let mut total_duplicates = 0;
215 let mut last_event_hash = None;
216 let mut total_byte_len = 0;
217
218 let batch_size = rebuild_batch_size();
219 let mut current_batch: Vec<Event> = Vec::with_capacity(batch_size);
220 let projector = project::Projector::new(&conn);
221
222 let shard_line_iter = shard_mgr.replay_lines()?;
223
224 for line_res in shard_line_iter {
225 let (offset, line): (usize, String) =
226 line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
227 line_no += 1;
228 total_byte_len = offset + line.len();
229
230 if !version_checked && line.trim_start().starts_with("# bones event log v") {
231 version_checked = true;
232 shard_version = crate::event::parser::detect_version(&line)
233 .map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
234 continue;
235 }
236
237 match crate::event::parser::parse_line(&line) {
238 Ok(crate::event::parser::ParsedLine::Event(event)) => {
239 let event = crate::event::migrate_event(*event, shard_version)
240 .map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
241
242 last_event_hash = Some(event.event_hash.clone());
243 current_batch.push(event);
244
245 if current_batch.len() >= batch_size {
246 let stats = projector
247 .project_batch(¤t_batch)
248 .context("project batch during rebuild")?;
249 total_projected += stats.projected;
250 total_duplicates += stats.duplicates;
251 current_batch.clear();
252 }
253 }
254 Ok(
255 crate::event::parser::ParsedLine::Comment(_)
256 | crate::event::parser::ParsedLine::Blank,
257 ) => {}
258 Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
259 tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
260 }
261 Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
262 }
263 }
264
265 if !current_batch.is_empty() {
267 let stats = projector
268 .project_batch(¤t_batch)
269 .context("project final batch during rebuild")?;
270 total_projected += stats.projected;
271 total_duplicates += stats.duplicates;
272 }
273
274 crate::db::fts::rebuild_fts_index(&conn).context("rebuild FTS5 index after bulk load")?;
278 restore_fts_triggers(&conn).context("restore FTS5 triggers after bulk rebuild")?;
279
280 let byte_offset_i64 = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
282 crate::db::query::update_projection_cursor(&conn, byte_offset_i64, last_event_hash.as_deref())
283 .context("update projection cursor after rebuild")?;
284
285 let item_count: i64 = conn
287 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
288 .context("count items after rebuild")?;
289
290 let elapsed = start.elapsed();
291
292 tracing::info!(
293 event_count = total_projected,
294 duplicates = total_duplicates,
295 batch_size,
296 item_count,
297 shard_count,
298 elapsed_ms = elapsed.as_millis(),
299 "projection rebuild complete"
300 );
301
302 Ok(RebuildReport {
303 event_count: total_projected,
304 item_count: usize::try_from(item_count).unwrap_or(0),
305 elapsed,
306 shard_count,
307 fts5_rebuilt: true, })
309}
310
// Tests for `rebuild`. Each test builds a throwaway bones directory in a TempDir,
// appends hand-built events to the active shard, rebuilds the projection, and
// inspects the resulting report and SQLite contents.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use crate::shard::ShardManager;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Create a temp bones directory with shard directories and an initialized shard.
    /// The TempDir is returned so it outlives the test body.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let dir = TempDir::new().expect("create tempdir");
        let shard_mgr = ShardManager::new(dir.path());
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init shard");
        (dir, shard_mgr)
    }

    /// Build a `Create` event for `id`. `writer::write_event` fills in `event_hash`.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(CreateData {
                title: title.into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["test".into()],
                parent: None,
                causation: None,
                description: Some(format!("Description for {title}")),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Build a `Move` event for `id`. The event names `parent_hash` as its single parent.
    fn make_move_event(
        id: &str,
        state: crate::model::item::State,
        ts: i64,
        parent_hash: &str,
    ) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![parent_hash.into()],
            event_type: EventType::Move,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Move(MoveData {
                state,
                reason: None,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Serialize `event` and append the line to the currently active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let line = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append event");
    }

    // A freshly initialized log yields an empty projection.
    // The shard created by `init` is still counted.
    #[test]
    fn rebuild_empty_event_log() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 0);
        assert_eq!(report.item_count, 0);
        assert_eq!(report.shard_count, 1);
        assert!(report.fts5_rebuilt);

        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    // Creates followed by a move: the moved item's state reflects the later event.
    #[test]
    fn rebuild_with_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bn-001", "First item", 1000);
        let create2 = make_create_event("bn-002", "Second item", 1001);
        let mv = make_move_event(
            "bn-001",
            crate::model::item::State::Doing,
            2000,
            &create1.event_hash,
        );

        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);
        append_event(&shard_mgr, &mv);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 3);
        assert_eq!(report.item_count, 2);

        let conn = open_projection(&db_path).unwrap();
        let item: String = conn
            .query_row(
                "SELECT state FROM items WHERE item_id = 'bn-001'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(item, "doing");
    }

    // A second rebuild over an existing database starts from scratch and sees the
    // newly appended event.
    #[test]
    fn rebuild_replaces_existing_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create1);

        let report1 = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report1.event_count, 1);
        assert_eq!(report1.item_count, 1);

        let create2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &create2);

        let report2 = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report2.event_count, 2);
        assert_eq!(report2.item_count, 2);
    }

    // Two rebuilds of the same log into different files produce identical counts and titles.
    #[test]
    fn rebuild_is_deterministic() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bn-001", "Deterministic test", 1000);
        let create2 = make_create_event("bn-002", "Another item", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        let db_path_a = dir.path().join("bones_a.db");
        let db_path_b = dir.path().join("bones_b.db");

        let report_a = rebuild(&events_dir, &db_path_a).unwrap();
        let report_b = rebuild(&events_dir, &db_path_b).unwrap();

        assert_eq!(report_a.event_count, report_b.event_count);
        assert_eq!(report_a.item_count, report_b.item_count);

        let conn_a = open_projection(&db_path_a).unwrap();
        let conn_b = open_projection(&db_path_b).unwrap();

        let titles_a: Vec<String> = {
            let mut stmt = conn_a
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };

        let titles_b: Vec<String> = {
            let mut stmt = conn_b
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };

        assert_eq!(titles_a, titles_b);
    }

    // The FTS index is searchable after rebuild even though triggers were dropped during load.
    #[test]
    fn rebuild_populates_fts() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Authentication timeout fix", 1000);
        append_event(&shard_mgr, &create);

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let hits: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM items_fts WHERE items_fts MATCH 'authentication'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(hits, 1);
    }

    // Rebuild records a non-zero byte offset and the last event hash as the projection cursor.
    #[test]
    fn rebuild_updates_projection_cursor() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item", 1000);
        append_event(&shard_mgr, &create);

        rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        let (offset, hash) = crate::db::query::get_projection_cursor(&conn).unwrap();
        assert!(offset > 0, "cursor offset should be non-zero after rebuild");
        assert!(hash.is_some(), "cursor hash should be set after rebuild");
    }

    // The same event appended twice is projected once; the duplicate is not counted.
    #[test]
    fn rebuild_handles_duplicate_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item", 1000);
        append_event(&shard_mgr, &create);
        append_event(&shard_mgr, &create);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 1);
        assert_eq!(report.item_count, 1);
    }

    // Item ids with a `bd-` prefix (not just `bn-`) are projected like any other.
    #[test]
    fn rebuild_with_bd_prefix_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bd-9mx", "Parent item", 1000);
        let create2 = make_create_event("bd-4kz", "Child item", 1001);

        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(
            report.event_count, 2,
            "should project 2 events with bd- prefix"
        );
        assert_eq!(report.item_count, 2, "should have 2 items with bd- prefix");
    }

    // Smoke check on rebuild speed for 100 items.
    // NOTE(review): this is a wall-clock bound and may be sensitive to slow or heavily
    // loaded CI hosts.
    #[test]
    fn rebuild_performance_reasonable() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        for i in 0..100_u32 {
            let create = make_create_event(
                &format!("bn-{i:04x}"),
                &format!("Item {i}"),
                i64::from(i) * 1000,
            );
            append_event(&shard_mgr, &create);
        }

        let report = rebuild(&events_dir, &db_path).unwrap();
        assert_eq!(report.event_count, 100);
        assert_eq!(report.item_count, 100);
        assert!(
            report.elapsed.as_millis() < 1000,
            "rebuild of 100 items took {}ms, expected <1000ms",
            report.elapsed.as_millis()
        );
    }
}