use std::io;
use std::path::Path;
use std::time::Instant;
use anyhow::{Context, Result};
use rusqlite::Connection;
use crate::db::{migrations, project, query, rebuild};
use crate::event::Event;
use crate::shard::ShardManager;
/// Newtype wrapper around the string form of an event hash.
///
/// [`read_hwm`] and [`write_hwm`] use it for the hash half of the projection
/// cursor (the "high-water mark").
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventHash(pub String);
/// Summary returned by [`incremental_apply`].
#[derive(Debug, Clone)]
pub struct ApplyReport {
/// Events projected by the incremental replay, or the event count reported
/// by the rebuild when a full rebuild ran instead.
pub events_applied: usize,
/// Number of shards listed for the incremental run, or the shard count
/// reported by the rebuild when a full rebuild ran instead.
pub shards_scanned: usize,
/// `true` when the call fell back to a full projection rebuild.
pub full_rebuild_triggered: bool,
/// Why the full rebuild was taken; `None` for an incremental run.
pub full_rebuild_reason: Option<String>,
/// Time elapsed since `incremental_apply` started.
pub elapsed: std::time::Duration,
}
pub fn event_log_cursor(events_dir: &Path) -> Result<(usize, Option<String>)> {
let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
let shard_mgr = ShardManager::new(bones_dir);
let mut version_checked = false;
let mut shard_version = crate::event::parser::CURRENT_VERSION;
let mut line_no = 0;
let mut total_byte_len = 0;
let mut last_event_hash = None;
let shard_line_iter = shard_mgr.replay_lines()?;
for line_res in shard_line_iter {
let (offset, line): (usize, String) =
line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
line_no += 1;
total_byte_len = offset + line.len();
if !version_checked && line.trim_start().starts_with("# bones event log v") {
version_checked = true;
shard_version = crate::event::parser::detect_version(&line)
.map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
continue;
}
match crate::event::parser::parse_line(&line) {
Ok(crate::event::parser::ParsedLine::Event(event)) => {
let event = crate::event::migrate_event(*event, shard_version)
.map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
last_event_hash = Some(event.event_hash);
}
Ok(
crate::event::parser::ParsedLine::Comment(_)
| crate::event::parser::ParsedLine::Blank,
) => {}
Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
}
Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
}
}
Ok((total_byte_len, last_event_hash))
}
/// Brings the projection database at `db_path` up to date with the event log.
///
/// The stored projection cursor (byte offset + last event hash) is used to
/// replay only the lines appended since the previous run. The function falls
/// back to a full rebuild when:
///
/// * `force_full` is set,
/// * the projection database cannot be opened,
/// * the cursor is empty (offset `0` and no hash),
/// * [`check_incremental_safety`] reports a problem,
/// * the stored offset does not fit in `usize` (e.g. it is negative), or
/// * the stored hash is not found just before the stored offset.
///
/// # Returns
///
/// An [`ApplyReport`]; `full_rebuild_triggered` tells which path was taken.
///
/// # Errors
///
/// Fails when the database or the shards cannot be read, when a line cannot be
/// parsed or migrated (unknown event types are logged and skipped), when a
/// batch cannot be projected, or when the cursor cannot be persisted.
// The body is one linear pipeline (open, validate, replay, persist cursor)
// sharing several counters, so the length lint is silenced rather than split.
#[allow(clippy::too_many_lines)]
pub fn incremental_apply(
    events_dir: &Path,
    db_path: &Path,
    force_full: bool,
) -> Result<ApplyReport> {
    /// Number of events handed to the projector per `project_batch` call.
    const BATCH_SIZE: usize = 1000;

    let start = Instant::now();
    if force_full {
        return do_full_rebuild(events_dir, db_path, start, "force_full flag set");
    }
    let Some(conn) = query::try_open_projection_raw(db_path)? else {
        return do_full_rebuild(
            events_dir,
            db_path,
            start,
            "projection database missing or corrupt",
        );
    };
    let (byte_offset, last_hash) =
        query::get_projection_cursor(&conn).context("read projection cursor")?;
    if byte_offset == 0 && last_hash.is_none() {
        drop(conn);
        return do_full_rebuild(events_dir, db_path, start, "fresh database (no cursor)");
    }
    if let Err(reason) = check_incremental_safety(&conn, events_dir) {
        drop(conn);
        return do_full_rebuild(events_dir, db_path, start, &reason);
    }

    let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
    let shard_mgr = ShardManager::new(bones_dir);
    let shards = shard_mgr
        .list_shards()
        .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
    let shards_scanned = shards.len();

    // A cursor that cannot be represented as `usize` is corrupt. Treating it
    // as offset 0 would replay the whole log on top of the existing
    // projection, so rebuild from scratch instead.
    let Ok(offset) = usize::try_from(byte_offset) else {
        drop(conn);
        return do_full_rebuild(
            events_dir,
            db_path,
            start,
            "projection cursor offset out of range",
        );
    };

    if let Some(ref hash) = last_hash {
        // A failure to read the validation window is treated like a mismatch:
        // rebuilding is always safe.
        let tail_ok = validate_cursor_hash_at_offset(&shard_mgr, offset, hash).unwrap_or(false);
        if !tail_ok {
            drop(conn);
            return do_full_rebuild(
                events_dir,
                db_path,
                start,
                "cursor hash not found at expected byte offset",
            );
        }
    }

    let mut line_iter = shard_mgr
        .replay_lines_from_offset(offset)
        .map_err(|e| anyhow::anyhow!("open shard line iterator: {e}"))?
        .peekable();
    // Nothing was appended since the last run: report a no-op without
    // touching the database.
    if line_iter.peek().is_none() {
        return Ok(ApplyReport {
            events_applied: 0,
            shards_scanned,
            full_rebuild_triggered: false,
            full_rebuild_reason: None,
            elapsed: start.elapsed(),
        });
    }

    project::ensure_tracking_table(&conn).context("ensure projected_events tracking table")?;

    let mut version_checked = false;
    let mut shard_version = crate::event::parser::CURRENT_VERSION;
    // Counts lines read in this run, i.e. relative to the resume offset.
    let mut line_no = 0;
    let mut total_projected = 0;
    let mut total_duplicates = 0;
    let mut total_errors = 0;
    let mut current_last_hash = last_hash;
    let mut total_byte_len = offset;
    let mut current_batch: Vec<Event> = Vec::with_capacity(BATCH_SIZE);
    let projector = project::Projector::new(&conn);

    for line_res in line_iter {
        let (abs_offset, line): (usize, String) =
            line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
        line_no += 1;
        total_byte_len = abs_offset + line.len();

        if !version_checked && line.trim_start().starts_with("# bones event log v") {
            version_checked = true;
            shard_version = crate::event::parser::detect_version(&line)
                .map_err(|msg| anyhow::anyhow!("version check failed: {msg}"))?;
            continue;
        }

        match crate::event::parser::parse_line(&line) {
            Ok(crate::event::parser::ParsedLine::Event(event)) => {
                let event = crate::event::migrate_event(*event, shard_version)
                    .map_err(|e| anyhow::anyhow!("migration failed: {e}"))?;
                current_last_hash = Some(event.event_hash.clone());
                current_batch.push(event);
                // Project in bounded batches so memory stays flat on long logs.
                if current_batch.len() >= BATCH_SIZE {
                    let stats = projector
                        .project_batch(&current_batch)
                        .context("project batch during incremental apply")?;
                    total_projected += stats.projected;
                    total_duplicates += stats.duplicates;
                    total_errors += stats.errors;
                    current_batch.clear();
                }
            }
            Ok(
                crate::event::parser::ParsedLine::Comment(_)
                | crate::event::parser::ParsedLine::Blank,
            ) => {}
            Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
                tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
            }
            Err(e) => anyhow::bail!("parse error at line {line_no} (offset {abs_offset}): {e}"),
        }
    }

    // Flush whatever is left over from the last partial batch.
    if !current_batch.is_empty() {
        let stats = projector
            .project_batch(&current_batch)
            .context("project final batch during incremental apply")?;
        total_projected += stats.projected;
        total_duplicates += stats.duplicates;
        total_errors += stats.errors;
    }

    let new_offset = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
    query::update_projection_cursor(&conn, new_offset, current_last_hash.as_deref())
        .context("update projection cursor after incremental apply")?;

    tracing::info!(
        events_applied = total_projected,
        duplicates = total_duplicates,
        errors = total_errors,
        shards_scanned,
        byte_offset_from = byte_offset,
        byte_offset_to = new_offset,
        elapsed_ms = start.elapsed().as_millis(),
        "incremental projection apply complete"
    );

    Ok(ApplyReport {
        events_applied: total_projected,
        shards_scanned,
        full_rebuild_triggered: false,
        full_rebuild_reason: None,
        elapsed: start.elapsed(),
    })
}
/// Reads the high-water mark: the event hash stored in the projection cursor.
///
/// The byte-offset half of the cursor is ignored. Returns `Ok(None)` when the
/// cursor holds no hash.
///
/// # Errors
///
/// Fails when the projection cursor cannot be read from `db`.
pub fn read_hwm(db: &Connection) -> Result<Option<EventHash>> {
    let cursor = query::get_projection_cursor(db).context("read high-water mark")?;
    let stored_hash = cursor.1;
    Ok(stored_hash.map(EventHash))
}
pub fn write_hwm(db: &Connection, hwm: &EventHash) -> Result<()> {
let (offset, _) =
query::get_projection_cursor(db).context("read current cursor for hwm update")?;
query::update_projection_cursor(db, offset, Some(&hwm.0)).context("write high-water mark")?;
Ok(())
}
/// Decides whether an incremental apply may run against `db`.
///
/// Checks, in order:
///
/// 1. the database schema version equals `migrations::LATEST_SCHEMA_VERSION`;
/// 2. the `projected_events` table exists;
/// 3. every shard except the last listed one that has a readable manifest
///    still has the byte length recorded in that manifest. Shards whose
///    manifest is absent or unreadable are skipped.
///
/// # Errors
///
/// Returns a human-readable reason string for the first failing check.
pub fn check_incremental_safety(db: &Connection, events_dir: &Path) -> Result<(), String> {
    let schema_version = migrations::current_schema_version(db)
        .map_err(|e| format!("failed to read schema version: {e}"))?;
    if schema_version != migrations::LATEST_SCHEMA_VERSION {
        return Err(format!(
            "schema version mismatch: db has v{schema_version}, code expects v{}",
            migrations::LATEST_SCHEMA_VERSION
        ));
    }

    let has_tracking_table: bool = db
        .query_row(
            "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='projected_events')",
            [],
            |row| row.get(0),
        )
        .map_err(|e| format!("failed to check projected_events table: {e}"))?;
    if !has_tracking_table {
        return Err("projected_events tracking table missing".into());
    }

    let root = events_dir.parent().unwrap_or_else(|| Path::new("."));
    let manager = ShardManager::new(root);
    let shards = manager
        .list_shards()
        .map_err(|e| format!("failed to list shards: {e}"))?;

    // Everything before the last listed shard is treated as sealed; with zero
    // or one shard there is nothing to verify.
    if let Some((_last, sealed)) = shards.split_last() {
        for &(year, month) in sealed {
            let Ok(Some(manifest)) = manager.read_manifest(year, month) else {
                continue;
            };
            let sealed_path = manager.shard_path(year, month);
            let meta = std::fs::metadata(&sealed_path)
                .map_err(|e| format!("cannot stat sealed shard {year}-{month:02}: {e}"))?;
            if meta.len() != manifest.byte_len {
                return Err(format!(
                    "sealed shard {}-{:02} size mismatch: \
                     manifest says {} bytes, file is {} bytes",
                    year,
                    month,
                    manifest.byte_len,
                    meta.len()
                ));
            }
        }
    }

    Ok(())
}
/// Checks that `hash` occurs in the log content just before `offset`.
///
/// The content range from `offset - 512` (saturating at zero) to `offset` is
/// read through the shard manager and searched for `hash`. An `offset` of `0`
/// always yields `Ok(false)` without reading anything.
///
/// # Errors
///
/// Fails when the content range cannot be read.
fn validate_cursor_hash_at_offset(
    shard_mgr: &ShardManager,
    offset: usize,
    hash: &str,
) -> Result<bool> {
    match offset {
        0 => Ok(false),
        window_end => {
            let window_start = window_end.saturating_sub(512);
            shard_mgr
                .read_content_range(window_start, window_end)
                .map(|window| window.contains(hash))
                .map_err(|e| anyhow::anyhow!("read cursor hash window: {e}"))
        }
    }
}
/// In-memory variant of the cursor check: does `hash` occur in the (up to)
/// 512 bytes of `content` that precede `offset`?
///
/// Returns `false` when `offset` is `0` or lies past the end of `content`.
/// Both window edges are moved forward to the next UTF-8 character boundary so
/// that slicing cannot panic.
#[cfg(test)]
fn validate_cursor_hash(content: &str, offset: usize, hash: &str) -> bool {
    let total = content.len();
    if offset == 0 || offset > total {
        return false;
    }
    // `is_char_boundary(total)` is always true, so the search always succeeds;
    // the fallback only satisfies the type checker.
    let snap = |pos: usize| {
        (pos..=total)
            .find(|&i| content.is_char_boundary(i))
            .unwrap_or(total)
    };
    let window_end = snap(offset);
    let window_start = snap(offset.saturating_sub(512));
    content[window_start..window_end].contains(hash)
}
/// Runs a full projection rebuild and wraps its outcome in an [`ApplyReport`].
///
/// `reason` is logged and recorded in the report; `start` is the instant the
/// caller began working, so `elapsed` covers the whole call.
///
/// # Errors
///
/// Fails when the rebuild itself fails.
fn do_full_rebuild(
    events_dir: &Path,
    db_path: &Path,
    start: Instant,
    reason: &str,
) -> Result<ApplyReport> {
    tracing::info!(reason, "falling back to full projection rebuild");
    rebuild::rebuild(events_dir, db_path)
        .context("full rebuild during incremental apply fallback")
        .map(|outcome| ApplyReport {
            events_applied: outcome.event_count,
            shards_scanned: outcome.shard_count,
            full_rebuild_triggered: true,
            full_rebuild_reason: Some(String::from(reason)),
            elapsed: start.elapsed(),
        })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::open_projection;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Creates a temporary bones directory with an initialised shard manager.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let dir = TempDir::new().expect("create tempdir");
        let shard_mgr = ShardManager::new(dir.path());
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init shard");
        (dir, shard_mgr)
    }

    /// Builds a `Create` event for `id` and lets the writer fill in its hash.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(CreateData {
                title: title.into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["test".into()],
                parent: None,
                causation: None,
                description: Some(format!("Description for {title}")),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Serialises `event` and appends it to the active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let line = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append event");
    }

    #[test]
    fn incremental_apply_on_empty_db_does_full_rebuild() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");
        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("missing"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    #[test]
    fn incremental_apply_force_full() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");
        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);
        rebuild::rebuild(&events_dir, &db_path).unwrap();
        let report = incremental_apply(&events_dir, &db_path, true).unwrap();
        assert!(report.full_rebuild_triggered);
        assert_eq!(
            report.full_rebuild_reason.as_deref(),
            Some("force_full flag set")
        );
        assert_eq!(report.events_applied, 1);
    }

    #[test]
    fn incremental_apply_picks_up_new_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");
        let create1 = make_create_event("bn-001", "Item 1", 1000);
        let create2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);
        rebuild::rebuild(&events_dir, &db_path).unwrap();
        let create3 = make_create_event("bn-003", "Item 3", 1002);
        append_event(&shard_mgr, &create3);
        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 1);
        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 3);
    }

    #[test]
    fn incremental_apply_noop_when_up_to_date() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");
        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);
        rebuild::rebuild(&events_dir, &db_path).unwrap();
        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 0);
    }

    #[test]
    fn incremental_apply_multiple_rounds() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");
        let e1 = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &e1);
        rebuild::rebuild(&events_dir, &db_path).unwrap();
        let e2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &e2);
        let r2 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r2.full_rebuild_triggered);
        assert_eq!(r2.events_applied, 1);
        let e3 = make_create_event("bn-003", "Item 3", 1002);
        let e4 = make_create_event("bn-004", "Item 4", 1003);
        append_event(&shard_mgr, &e3);
        append_event(&shard_mgr, &e4);
        let r3 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r3.full_rebuild_triggered);
        assert_eq!(r3.events_applied, 2);
        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 4);
    }

    /// Seeds one database from the first half of the log, catches it up with
    /// `incremental_apply`, and compares it with a database rebuilt from the
    /// complete log.
    #[test]
    fn incremental_apply_matches_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");
        let db_full = dir.path().join("full.db");
        let db_inc = dir.path().join("inc.db");

        let append_range = |range: std::ops::Range<i32>| {
            for i in range {
                let e = make_create_event(
                    &format!("bn-{i:03x}"),
                    &format!("Item {i}"),
                    1000 + i64::from(i),
                );
                append_event(&shard_mgr, &e);
            }
        };

        // First half goes in through a rebuild, which also sets the cursor.
        append_range(0..5);
        rebuild::rebuild(&events_dir, &db_inc).unwrap();

        // Second half must be picked up by the incremental path.
        append_range(5..10);
        let report = incremental_apply(&events_dir, &db_inc, false).unwrap();
        assert!(
            !report.full_rebuild_triggered,
            "reason: {:?}",
            report.full_rebuild_reason
        );
        assert_eq!(report.events_applied, 5);

        // Reference projection built from the complete log.
        rebuild::rebuild(&events_dir, &db_full).unwrap();

        let conn_full = open_projection(&db_full).unwrap();
        let conn_inc = open_projection(&db_inc).unwrap();
        let count_full: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        let count_inc: i64 = conn_inc
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count_full, count_inc);
        assert_eq!(count_full, 10);
        let titles_full: Vec<String> = {
            let mut stmt = conn_full
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        let titles_inc: Vec<String> = {
            let mut stmt = conn_inc
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        assert_eq!(titles_full, titles_inc);
    }

    #[test]
    fn schema_version_mismatch_triggers_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");
        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);
        rebuild::rebuild(&events_dir, &db_path).unwrap();
        {
            // Simulate a database written by a different schema version.
            let conn = open_projection(&db_path).unwrap();
            conn.pragma_update(None, "user_version", 999_i64).unwrap();
        }
        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("schema version"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    #[test]
    fn read_hwm_returns_none_for_fresh_db() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        let hwm = read_hwm(&conn).unwrap();
        assert!(hwm.is_none());
    }

    #[test]
    fn write_and_read_hwm_roundtrip() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        let hash = EventHash("blake3:abc123".into());
        write_hwm(&conn, &hash).unwrap();
        let retrieved = read_hwm(&conn).unwrap();
        assert_eq!(retrieved.unwrap(), hash);
    }

    #[test]
    fn check_incremental_safety_passes_valid_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");
        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);
        rebuild::rebuild(&events_dir, &db_path).unwrap();
        let conn = open_projection(&db_path).unwrap();
        project::ensure_tracking_table(&conn).unwrap();
        let result = check_incremental_safety(&conn, &events_dir);
        assert!(result.is_ok(), "safety check failed: {result:?}");
    }

    #[test]
    fn check_incremental_safety_fails_schema_mismatch() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        conn.pragma_update(None, "user_version", 999_i64).unwrap();
        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("schema version"));
    }

    #[test]
    fn check_incremental_safety_fails_missing_tracking_table() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("projected_events"));
    }

    #[test]
    fn validate_cursor_hash_finds_hash_near_offset() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(validate_cursor_hash(content, offset, "blake3:abc123"));
    }

    #[test]
    fn validate_cursor_hash_fails_wrong_hash() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(!validate_cursor_hash(content, offset, "blake3:zzz999"));
    }

    #[test]
    fn validate_cursor_hash_fails_zero_offset() {
        let content = "line1\tblake3:abc123\n";
        assert!(!validate_cursor_hash(content, 0, "blake3:abc123"));
    }
}