use std::path::Path;
use std::time::Instant;
use anyhow::{Context, Result};
use crate::db::{open_projection, project};
use crate::event::Event;
use crate::shard::ShardManager;
use std::io;
const DEFAULT_REBUILD_BATCH_SIZE: usize = 4_000;
fn rebuild_batch_size() -> usize {
std::env::var("BONES_REBUILD_BATCH_SIZE")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|v| *v > 0)
.unwrap_or(DEFAULT_REBUILD_BATCH_SIZE)
}
fn configure_rebuild_pragmas(conn: &rusqlite::Connection) -> Result<()> {
conn.pragma_update(None, "temp_store", "MEMORY")
.context("PRAGMA temp_store = MEMORY")?;
conn.pragma_update(None, "cache_size", -131_072_i64)
.context("PRAGMA cache_size")?;
conn.pragma_update(None, "mmap_size", 268_435_456_i64)
.context("PRAGMA mmap_size")?;
conn.pragma_update(None, "synchronous", "OFF")
.context("PRAGMA synchronous = OFF")?;
let _: String = conn
.query_row("PRAGMA journal_mode = OFF", [], |row| row.get(0))
.context("PRAGMA journal_mode = OFF")?;
let _: String = conn
.query_row("PRAGMA locking_mode = EXCLUSIVE", [], |row| row.get(0))
.context("PRAGMA locking_mode = EXCLUSIVE")?;
Ok(())
}
const FTS_TRIGGERS: &[&str] = &["items_ai", "items_au", "items_ad"];
fn drop_fts_triggers(conn: &rusqlite::Connection) -> Result<()> {
for name in FTS_TRIGGERS {
conn.execute_batch(&format!("DROP TRIGGER IF EXISTS {name}"))
.with_context(|| format!("drop trigger {name}"))?;
}
Ok(())
}
fn restore_fts_triggers(conn: &rusqlite::Connection) -> Result<()> {
conn.execute_batch(
"CREATE TRIGGER IF NOT EXISTS items_ai
AFTER INSERT ON items
BEGIN
INSERT INTO items_fts(rowid, title, description, labels, item_id)
VALUES (
new.rowid,
new.title,
COALESCE(new.description, ''),
COALESCE(new.search_labels, ''),
new.item_id
);
END;
CREATE TRIGGER IF NOT EXISTS items_au
AFTER UPDATE ON items
BEGIN
DELETE FROM items_fts WHERE rowid = old.rowid;
INSERT INTO items_fts(rowid, title, description, labels, item_id)
VALUES (
new.rowid,
new.title,
COALESCE(new.description, ''),
COALESCE(new.search_labels, ''),
new.item_id
);
END;
CREATE TRIGGER IF NOT EXISTS items_ad
AFTER DELETE ON items
BEGIN
DELETE FROM items_fts WHERE rowid = old.rowid;
END;",
)
.context("recreate FTS5 maintenance triggers after rebuild")?;
Ok(())
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RebuildReport {
pub event_count: usize,
pub item_count: usize,
pub elapsed: std::time::Duration,
pub shard_count: usize,
pub fts5_rebuilt: bool,
}
fn check_sealed_shard_integrity(shard_mgr: &ShardManager) -> Result<()> {
let issues = shard_mgr
.validate_sealed_shards()
.map_err(|e| anyhow::anyhow!("sealed shard validation: {e}"))?;
if !issues.is_empty() {
for issue in &issues {
tracing::error!(
shard = %issue.shard_name,
problem = %issue.problem,
"sealed shard integrity check failed"
);
}
anyhow::bail!(
"sealed shard corrupted: {} (run `bn doctor` to diagnose)",
issues[0].problem
);
}
Ok(())
}
pub fn rebuild(events_dir: &Path, db_path: &Path) -> Result<RebuildReport> {
let start = Instant::now();
if db_path.exists() {
std::fs::remove_file(db_path)
.with_context(|| format!("remove existing projection db {}", db_path.display()))?;
let wal_path = db_path.with_extension("db-wal");
let shm_path = db_path.with_extension("db-shm");
let _ = std::fs::remove_file(wal_path);
let _ = std::fs::remove_file(shm_path);
}
let conn = open_projection(db_path).context("create fresh projection database")?;
configure_rebuild_pragmas(&conn).context("configure rebuild sqlite pragmas")?;
project::ensure_tracking_table(&conn).context("create tracking table")?;
drop_fts_triggers(&conn).context("drop FTS5 triggers for bulk rebuild")?;
let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
let shard_mgr = ShardManager::new(bones_dir);
let shards = shard_mgr
.list_shards()
.map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
let shard_count = shards.len();
check_sealed_shard_integrity(&shard_mgr)?;
let mut version_checked = false;
let mut shard_version = crate::event::parser::CURRENT_VERSION;
let mut line_no = 0;
let mut total_projected = 0;
let mut total_duplicates = 0;
let mut last_event_hash = None;
let mut total_byte_len = 0;
let batch_size = rebuild_batch_size();
let mut current_batch: Vec<Event> = Vec::with_capacity(batch_size);
let projector = project::Projector::new(&conn);
let shard_line_iter = shard_mgr.replay_lines()?;
for line_res in shard_line_iter {
let (offset, line): (usize, String) =
line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
line_no += 1;
total_byte_len = offset + line.len();
if !version_checked && line.trim_start().starts_with("# bones event log v") {
version_checked = true;
shard_version = crate::event::parser::detect_version(&line)
.map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
continue;
}
match crate::event::parser::parse_line(&line) {
Ok(crate::event::parser::ParsedLine::Event(event)) => {
let event = crate::event::migrate_event(*event, shard_version)
.map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
last_event_hash = Some(event.event_hash.clone());
current_batch.push(event);
if current_batch.len() >= batch_size {
let stats = projector
.project_batch(¤t_batch)
.context("project batch during rebuild")?;
total_projected += stats.projected;
total_duplicates += stats.duplicates;
current_batch.clear();
}
}
Ok(
crate::event::parser::ParsedLine::Comment(_)
| crate::event::parser::ParsedLine::Blank,
) => {}
Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
}
Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
}
}
if !current_batch.is_empty() {
let stats = projector
.project_batch(¤t_batch)
.context("project final batch during rebuild")?;
total_projected += stats.projected;
total_duplicates += stats.duplicates;
}
crate::db::fts::rebuild_fts_index(&conn).context("rebuild FTS5 index after bulk load")?;
restore_fts_triggers(&conn).context("restore FTS5 triggers after bulk rebuild")?;
let byte_offset_i64 = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
crate::db::query::update_projection_cursor(&conn, byte_offset_i64, last_event_hash.as_deref())
.context("update projection cursor after rebuild")?;
let item_count: i64 = conn
.query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
.context("count items after rebuild")?;
let elapsed = start.elapsed();
tracing::info!(
event_count = total_projected,
duplicates = total_duplicates,
batch_size,
item_count,
shard_count,
elapsed_ms = elapsed.as_millis(),
"projection rebuild complete"
);
Ok(RebuildReport {
event_count: total_projected,
item_count: usize::try_from(item_count).unwrap_or(0),
elapsed,
shard_count,
fts5_rebuilt: true, })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::Event;
use crate::event::data::*;
use crate::event::types::EventType;
use crate::event::writer;
use crate::model::item::{Kind, Size, Urgency};
use crate::model::item_id::ItemId;
use crate::shard::ShardManager;
use std::collections::BTreeMap;
use tempfile::TempDir;
fn setup_bones_dir() -> (TempDir, ShardManager) {
let dir = TempDir::new().expect("create tempdir");
let shard_mgr = ShardManager::new(dir.path());
shard_mgr.ensure_dirs().expect("ensure dirs");
shard_mgr.init().expect("init shard");
(dir, shard_mgr)
}
fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: "test-agent".into(),
itc: "itc:AQ".into(),
parents: vec![],
event_type: EventType::Create,
item_id: ItemId::new_unchecked(id),
data: EventData::Create(CreateData {
title: title.into(),
kind: Kind::Task,
size: Some(Size::M),
urgency: Urgency::Default,
labels: vec!["test".into()],
parent: None,
causation: None,
description: Some(format!("Description for {title}")),
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
writer::write_event(&mut event).expect("compute hash");
event
}
fn make_move_event(
id: &str,
state: crate::model::item::State,
ts: i64,
parent_hash: &str,
) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: "test-agent".into(),
itc: "itc:AQ".into(),
parents: vec![parent_hash.into()],
event_type: EventType::Move,
item_id: ItemId::new_unchecked(id),
data: EventData::Move(MoveData {
state,
reason: None,
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
writer::write_event(&mut event).expect("compute hash");
event
}
fn append_event(shard_mgr: &ShardManager, event: &Event) {
let line = writer::write_line(event).expect("serialize event");
let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
shard_mgr
.append_raw(year, month, &line)
.expect("append event");
}
#[test]
fn rebuild_empty_event_log() {
let (dir, _shard_mgr) = setup_bones_dir();
let db_path = dir.path().join("bones.db");
let events_dir = dir.path().join("events");
let report = rebuild(&events_dir, &db_path).unwrap();
assert_eq!(report.event_count, 0);
assert_eq!(report.item_count, 0);
assert_eq!(report.shard_count, 1); assert!(report.fts5_rebuilt);
let conn = open_projection(&db_path).unwrap();
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
.unwrap();
assert_eq!(count, 0);
}
#[test]
fn rebuild_with_events() {
let (dir, shard_mgr) = setup_bones_dir();
let db_path = dir.path().join("bones.db");
let events_dir = dir.path().join("events");
let create1 = make_create_event("bn-001", "First item", 1000);
let create2 = make_create_event("bn-002", "Second item", 1001);
let mv = make_move_event(
"bn-001",
crate::model::item::State::Doing,
2000,
&create1.event_hash,
);
append_event(&shard_mgr, &create1);
append_event(&shard_mgr, &create2);
append_event(&shard_mgr, &mv);
let report = rebuild(&events_dir, &db_path).unwrap();
assert_eq!(report.event_count, 3);
assert_eq!(report.item_count, 2);
let conn = open_projection(&db_path).unwrap();
let item: String = conn
.query_row(
"SELECT state FROM items WHERE item_id = 'bn-001'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(item, "doing");
}
#[test]
fn rebuild_replaces_existing_db() {
let (dir, shard_mgr) = setup_bones_dir();
let db_path = dir.path().join("bones.db");
let events_dir = dir.path().join("events");
let create1 = make_create_event("bn-001", "Item 1", 1000);
append_event(&shard_mgr, &create1);
let report1 = rebuild(&events_dir, &db_path).unwrap();
assert_eq!(report1.event_count, 1);
assert_eq!(report1.item_count, 1);
let create2 = make_create_event("bn-002", "Item 2", 1001);
append_event(&shard_mgr, &create2);
let report2 = rebuild(&events_dir, &db_path).unwrap();
assert_eq!(report2.event_count, 2);
assert_eq!(report2.item_count, 2);
}
#[test]
fn rebuild_is_deterministic() {
let (dir, shard_mgr) = setup_bones_dir();
let events_dir = dir.path().join("events");
let create1 = make_create_event("bn-001", "Deterministic test", 1000);
let create2 = make_create_event("bn-002", "Another item", 1001);
append_event(&shard_mgr, &create1);
append_event(&shard_mgr, &create2);
let db_path_a = dir.path().join("bones_a.db");
let db_path_b = dir.path().join("bones_b.db");
let report_a = rebuild(&events_dir, &db_path_a).unwrap();
let report_b = rebuild(&events_dir, &db_path_b).unwrap();
assert_eq!(report_a.event_count, report_b.event_count);
assert_eq!(report_a.item_count, report_b.item_count);
let conn_a = open_projection(&db_path_a).unwrap();
let conn_b = open_projection(&db_path_b).unwrap();
let titles_a: Vec<String> = {
let mut stmt = conn_a
.prepare("SELECT title FROM items ORDER BY item_id")
.unwrap();
stmt.query_map([], |row| row.get(0))
.unwrap()
.map(|r| r.unwrap())
.collect()
};
let titles_b: Vec<String> = {
let mut stmt = conn_b
.prepare("SELECT title FROM items ORDER BY item_id")
.unwrap();
stmt.query_map([], |row| row.get(0))
.unwrap()
.map(|r| r.unwrap())
.collect()
};
assert_eq!(titles_a, titles_b);
}
#[test]
fn rebuild_populates_fts() {
let (dir, shard_mgr) = setup_bones_dir();
let db_path = dir.path().join("bones.db");
let events_dir = dir.path().join("events");
let create = make_create_event("bn-001", "Authentication timeout fix", 1000);
append_event(&shard_mgr, &create);
rebuild(&events_dir, &db_path).unwrap();
let conn = open_projection(&db_path).unwrap();
let hits: i64 = conn
.query_row(
"SELECT COUNT(*) FROM items_fts WHERE items_fts MATCH 'authentication'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(hits, 1);
}
#[test]
fn rebuild_updates_projection_cursor() {
let (dir, shard_mgr) = setup_bones_dir();
let db_path = dir.path().join("bones.db");
let events_dir = dir.path().join("events");
let create = make_create_event("bn-001", "Item", 1000);
append_event(&shard_mgr, &create);
rebuild(&events_dir, &db_path).unwrap();
let conn = open_projection(&db_path).unwrap();
let (offset, hash) = crate::db::query::get_projection_cursor(&conn).unwrap();
assert!(offset > 0, "cursor offset should be non-zero after rebuild");
assert!(hash.is_some(), "cursor hash should be set after rebuild");
}
#[test]
fn rebuild_handles_duplicate_events() {
let (dir, shard_mgr) = setup_bones_dir();
let db_path = dir.path().join("bones.db");
let events_dir = dir.path().join("events");
let create = make_create_event("bn-001", "Item", 1000);
append_event(&shard_mgr, &create);
append_event(&shard_mgr, &create);
let report = rebuild(&events_dir, &db_path).unwrap();
assert_eq!(report.event_count, 1);
assert_eq!(report.item_count, 1);
}
#[test]
fn rebuild_with_bd_prefix_events() {
let (dir, shard_mgr) = setup_bones_dir();
let db_path = dir.path().join("bones.db");
let events_dir = dir.path().join("events");
let create1 = make_create_event("bd-9mx", "Parent item", 1000);
let create2 = make_create_event("bd-4kz", "Child item", 1001);
append_event(&shard_mgr, &create1);
append_event(&shard_mgr, &create2);
let report = rebuild(&events_dir, &db_path).unwrap();
assert_eq!(
report.event_count, 2,
"should project 2 events with bd- prefix"
);
assert_eq!(report.item_count, 2, "should have 2 items with bd- prefix");
}
#[test]
fn rebuild_performance_reasonable() {
let (dir, shard_mgr) = setup_bones_dir();
let db_path = dir.path().join("bones.db");
let events_dir = dir.path().join("events");
for i in 0..100_u32 {
let create = make_create_event(
&format!("bn-{i:04x}"),
&format!("Item {i}"),
i64::from(i) * 1000,
);
append_event(&shard_mgr, &create);
}
let report = rebuild(&events_dir, &db_path).unwrap();
assert_eq!(report.event_count, 100);
assert_eq!(report.item_count, 100);
assert!(
report.elapsed.as_millis() < 1000,
"rebuild of 100 items took {}ms, expected <1000ms",
report.elapsed.as_millis()
);
}
}