1use std::io;
31use std::path::Path;
32use std::time::Instant;
33
34use anyhow::{Context, Result};
35use rusqlite::Connection;
36
37use crate::db::{migrations, project, query, rebuild};
38use crate::event::Event;
39use crate::shard::ShardManager;
40
/// Newtype wrapper around an event's string hash (e.g. `"blake3:…"`).
///
/// Used as the projection high-water mark: the hash of the last event that
/// was applied to the projection database (see `read_hwm` / `write_hwm`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventHash(pub String);
49
/// Summary of a single `incremental_apply` run.
#[derive(Debug, Clone)]
pub struct ApplyReport {
    /// Number of events projected into the database during this run.
    pub events_applied: usize,
    /// Number of shards discovered in the event log.
    pub shards_scanned: usize,
    /// Whether this run fell back to a full projection rebuild.
    pub full_rebuild_triggered: bool,
    /// Human-readable reason for the fallback, when one was triggered.
    pub full_rebuild_reason: Option<String>,
    /// Wall-clock duration of the whole run.
    pub elapsed: std::time::Duration,
}
64
65#[allow(clippy::too_many_lines)]
91pub fn incremental_apply(
92 events_dir: &Path,
93 db_path: &Path,
94 force_full: bool,
95) -> Result<ApplyReport> {
96 let start = Instant::now();
97
98 if force_full {
99 return do_full_rebuild(events_dir, db_path, start, "force_full flag set");
100 }
101
102 let Some(conn) = query::try_open_projection(db_path)? else {
105 return do_full_rebuild(
106 events_dir,
107 db_path,
108 start,
109 "projection database missing or corrupt",
110 );
111 };
112
113 let (byte_offset, last_hash) =
115 query::get_projection_cursor(&conn).context("read projection cursor")?;
116
117 if byte_offset == 0 && last_hash.is_none() {
119 drop(conn);
120 return do_full_rebuild(events_dir, db_path, start, "fresh database (no cursor)");
121 }
122
123 if let Err(reason) = check_incremental_safety(&conn, events_dir) {
125 drop(conn);
126 return do_full_rebuild(events_dir, db_path, start, &reason);
127 }
128
129 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
131 let shard_mgr = ShardManager::new(bones_dir);
132 let shards = shard_mgr
133 .list_shards()
134 .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
135 let shards_scanned = shards.len();
136
137 let offset = usize::try_from(byte_offset).unwrap_or(0);
138
139 if let Some(ref hash) = last_hash {
142 let tail_ok = validate_cursor_hash_at_offset(&shard_mgr, offset, hash).unwrap_or(false);
143 if !tail_ok {
144 drop(conn);
145 return do_full_rebuild(
146 events_dir,
147 db_path,
148 start,
149 "cursor hash not found at expected byte offset",
150 );
151 }
152 }
153
154 let mut line_iter = shard_mgr
155 .replay_lines_from_offset(offset)
156 .map_err(|e| anyhow::anyhow!("open shard line iterator: {e}"))?
157 .peekable();
158
159 if line_iter.peek().is_none() {
161 return Ok(ApplyReport {
162 events_applied: 0,
163 shards_scanned,
164 full_rebuild_triggered: false,
165 full_rebuild_reason: None,
166 elapsed: start.elapsed(),
167 });
168 }
169
170 project::ensure_tracking_table(&conn).context("ensure projected_events tracking table")?;
172
173 let mut version_checked = false;
174 let mut shard_version = crate::event::parser::CURRENT_VERSION;
175 let mut line_no = 0;
176 let mut total_projected = 0;
177 let mut total_duplicates = 0;
178 let mut total_errors = 0;
179 let mut current_last_hash = last_hash;
180 let mut total_byte_len = offset;
181
182 let mut current_batch: Vec<Event> = Vec::with_capacity(1000);
183 let projector = project::Projector::new(&conn);
184
185 for line_res in line_iter {
186 let (abs_offset, line): (usize, String) =
187 line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
188 line_no += 1;
189 total_byte_len = abs_offset + line.len();
190
191 if !version_checked && line.trim_start().starts_with("# bones event log v") {
193 version_checked = true;
194 shard_version = crate::event::parser::detect_version(&line)
195 .map_err(|msg| anyhow::anyhow!("version check failed: {msg}"))?;
196 continue;
197 }
198
199 match crate::event::parser::parse_line(&line) {
200 Ok(crate::event::parser::ParsedLine::Event(event)) => {
201 let event = crate::event::migrate_event(*event, shard_version)
202 .map_err(|e| anyhow::anyhow!("migration failed: {e}"))?;
203
204 current_last_hash = Some(event.event_hash.clone());
205 current_batch.push(event);
206
207 if current_batch.len() >= 1000 {
208 let stats = projector
209 .project_batch(¤t_batch)
210 .context("project batch during incremental apply")?;
211 total_projected += stats.projected;
212 total_duplicates += stats.duplicates;
213 total_errors += stats.errors;
214 current_batch.clear();
215 }
216 }
217 Ok(
218 crate::event::parser::ParsedLine::Comment(_)
219 | crate::event::parser::ParsedLine::Blank,
220 ) => {}
221 Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
222 tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
223 }
224 Err(e) => anyhow::bail!("parse error at line {line_no} (offset {abs_offset}): {e}"),
225 }
226 }
227
228 if !current_batch.is_empty() {
230 let stats = projector
231 .project_batch(¤t_batch)
232 .context("project final batch during incremental apply")?;
233 total_projected += stats.projected;
234 total_duplicates += stats.duplicates;
235 total_errors += stats.errors;
236 }
237
238 let new_offset = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
240 query::update_projection_cursor(&conn, new_offset, current_last_hash.as_deref())
241 .context("update projection cursor after incremental apply")?;
242
243 tracing::info!(
244 events_applied = total_projected,
245 duplicates = total_duplicates,
246 errors = total_errors,
247 shards_scanned,
248 byte_offset_from = byte_offset,
249 byte_offset_to = new_offset,
250 elapsed_ms = start.elapsed().as_millis(),
251 "incremental projection apply complete"
252 );
253
254 Ok(ApplyReport {
255 events_applied: total_projected,
256 shards_scanned,
257 full_rebuild_triggered: false,
258 full_rebuild_reason: None,
259 elapsed: start.elapsed(),
260 })
261}
262
263pub fn read_hwm(db: &Connection) -> Result<Option<EventHash>> {
270 let (_offset, hash) = query::get_projection_cursor(db).context("read high-water mark")?;
271 Ok(hash.map(EventHash))
272}
273
274pub fn write_hwm(db: &Connection, hwm: &EventHash) -> Result<()> {
280 let (offset, _) =
282 query::get_projection_cursor(db).context("read current cursor for hwm update")?;
283 query::update_projection_cursor(db, offset, Some(&hwm.0)).context("write high-water mark")?;
284 Ok(())
285}
286
287pub fn check_incremental_safety(db: &Connection, events_dir: &Path) -> Result<(), String> {
302 let schema_version = migrations::current_schema_version(db)
304 .map_err(|e| format!("failed to read schema version: {e}"))?;
305 if schema_version != migrations::LATEST_SCHEMA_VERSION {
306 return Err(format!(
307 "schema version mismatch: db has v{schema_version}, code expects v{}",
308 migrations::LATEST_SCHEMA_VERSION
309 ));
310 }
311
312 let table_exists: bool = db
314 .query_row(
315 "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='projected_events')",
316 [],
317 |row| row.get(0),
318 )
319 .map_err(|e| format!("failed to check projected_events table: {e}"))?;
320 if !table_exists {
321 return Err("projected_events tracking table missing".into());
322 }
323
324 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
326 let shard_mgr = ShardManager::new(bones_dir);
327 let shards = shard_mgr
328 .list_shards()
329 .map_err(|e| format!("failed to list shards: {e}"))?;
330
331 if shards.len() > 1 {
333 for &(year, month) in &shards[..shards.len() - 1] {
334 if let Ok(Some(manifest)) = shard_mgr.read_manifest(year, month) {
335 let shard_path = shard_mgr.shard_path(year, month);
336 match std::fs::metadata(&shard_path) {
337 Ok(meta) => {
338 if meta.len() != manifest.byte_len {
339 return Err(format!(
340 "sealed shard {}-{:02} size mismatch: \
341 manifest says {} bytes, file is {} bytes",
342 year,
343 month,
344 manifest.byte_len,
345 meta.len()
346 ));
347 }
348 }
349 Err(e) => {
350 return Err(format!("cannot stat sealed shard {year}-{month:02}: {e}"));
351 }
352 }
353 }
354 }
357 }
358
359 Ok(())
360}
361
362fn validate_cursor_hash_at_offset(
372 shard_mgr: &ShardManager,
373 offset: usize,
374 hash: &str,
375) -> Result<bool> {
376 if offset == 0 {
377 return Ok(false);
378 }
379 let search_start = offset.saturating_sub(512);
380 let window = shard_mgr
381 .read_content_range(search_start, offset)
382 .map_err(|e| anyhow::anyhow!("read cursor hash window: {e}"))?;
383 Ok(window.contains(hash))
384}
385
/// In-memory analogue of `validate_cursor_hash_at_offset` used by the unit
/// tests: look for `hash` in the (up to) 512 bytes preceding `offset`.
#[cfg(test)]
fn validate_cursor_hash(content: &str, offset: usize, hash: &str) -> bool {
    if offset == 0 || offset > content.len() {
        return false;
    }
    let start = offset.saturating_sub(512);
    content[start..offset].contains(hash)
}
400
401fn do_full_rebuild(
403 events_dir: &Path,
404 db_path: &Path,
405 start: Instant,
406 reason: &str,
407) -> Result<ApplyReport> {
408 tracing::info!(reason, "falling back to full projection rebuild");
409
410 let report = rebuild::rebuild(events_dir, db_path)
411 .context("full rebuild during incremental apply fallback")?;
412
413 Ok(ApplyReport {
414 events_applied: report.event_count,
415 shards_scanned: report.shard_count,
416 full_rebuild_triggered: true,
417 full_rebuild_reason: Some(reason.to_string()),
418 elapsed: start.elapsed(),
419 })
420}
421
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::open_projection;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Create a temp bones directory with its directories and an
    /// initialized active shard.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let dir = TempDir::new().expect("create tempdir");
        let shard_mgr = ShardManager::new(dir.path());
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init shard");
        (dir, shard_mgr)
    }

    /// Build a fully-hashed `Create` event for item `id` at wall time `ts`.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(CreateData {
                title: title.into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["test".into()],
                parent: None,
                causation: None,
                description: Some(format!("Description for {title}")),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Serialize `event` and append it to the active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let line = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append event");
    }

    #[test]
    fn incremental_apply_on_empty_db_does_full_rebuild() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("missing"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    #[test]
    fn incremental_apply_force_full() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let report = incremental_apply(&events_dir, &db_path, true).unwrap();
        assert!(report.full_rebuild_triggered);
        assert_eq!(
            report.full_rebuild_reason.as_deref(),
            Some("force_full flag set")
        );
        assert_eq!(report.events_applied, 1);
    }

    #[test]
    fn incremental_apply_picks_up_new_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bn-001", "Item 1", 1000);
        let create2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let create3 = make_create_event("bn-003", "Item 3", 1002);
        append_event(&shard_mgr, &create3);

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 1);

        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 3);
    }

    #[test]
    fn incremental_apply_noop_when_up_to_date() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 0);
    }

    #[test]
    fn incremental_apply_multiple_rounds() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let e1 = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &e1);
        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let e2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &e2);
        let r2 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r2.full_rebuild_triggered);
        assert_eq!(r2.events_applied, 1);

        let e3 = make_create_event("bn-003", "Item 3", 1002);
        let e4 = make_create_event("bn-004", "Item 4", 1003);
        append_event(&shard_mgr, &e3);
        append_event(&shard_mgr, &e4);
        let r3 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r3.full_rebuild_triggered);
        assert_eq!(r3.events_applied, 2);

        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 4);
    }

    /// Applying events incrementally must converge to the same projection
    /// state as a from-scratch full rebuild over the complete log.
    ///
    /// BUGFIX: the previous version of this test built BOTH databases via
    /// `rebuild::rebuild` and never invoked `incremental_apply`, so it did
    /// not test what its name claims. Now the "inc" database is seeded with
    /// a partial rebuild and catches up incrementally.
    #[test]
    fn incremental_apply_matches_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        // First half of the log, then seed the incremental db with a rebuild.
        for i in 0..5 {
            let e = make_create_event(
                &format!("bn-{i:03x}"),
                &format!("Item {i}"),
                1000 + i64::from(i),
            );
            append_event(&shard_mgr, &e);
        }
        let db_inc = dir.path().join("inc.db");
        rebuild::rebuild(&events_dir, &db_inc).unwrap();

        // Second half arrives later and is applied incrementally.
        for i in 5..10 {
            let e = make_create_event(
                &format!("bn-{i:03x}"),
                &format!("Item {i}"),
                1000 + i64::from(i),
            );
            append_event(&shard_mgr, &e);
        }
        let report = incremental_apply(&events_dir, &db_inc, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 5);

        // Reference database: a full rebuild over the complete log.
        let db_full = dir.path().join("full.db");
        rebuild::rebuild(&events_dir, &db_full).unwrap();

        let conn_full = open_projection(&db_full).unwrap();
        let conn_inc = open_projection(&db_inc).unwrap();

        let count_full: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        let count_inc: i64 = conn_inc
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count_full, count_inc);
        assert_eq!(count_full, 10);

        let titles_full: Vec<String> = {
            let mut stmt = conn_full
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        let titles_inc: Vec<String> = {
            let mut stmt = conn_inc
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        assert_eq!(titles_full, titles_inc);
    }

    #[test]
    fn schema_version_mismatch_triggers_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Corrupt the schema version to force the safety check to fail.
        {
            let conn = open_projection(&db_path).unwrap();
            conn.pragma_update(None, "user_version", 999_i64).unwrap();
        }

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("schema version"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    #[test]
    fn read_hwm_returns_none_for_fresh_db() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hwm = read_hwm(&conn).unwrap();
        assert!(hwm.is_none());
    }

    #[test]
    fn write_and_read_hwm_roundtrip() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hash = EventHash("blake3:abc123".into());
        write_hwm(&conn, &hash).unwrap();

        let retrieved = read_hwm(&conn).unwrap();
        assert_eq!(retrieved.unwrap(), hash);
    }

    #[test]
    fn check_incremental_safety_passes_valid_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        project::ensure_tracking_table(&conn).unwrap();
        let result = check_incremental_safety(&conn, &events_dir);
        assert!(result.is_ok(), "safety check failed: {result:?}");
    }

    #[test]
    fn check_incremental_safety_fails_schema_mismatch() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        conn.pragma_update(None, "user_version", 999_i64).unwrap();

        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("schema version"));
    }

    #[test]
    fn check_incremental_safety_fails_missing_tracking_table() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("projected_events"));
    }

    #[test]
    fn validate_cursor_hash_finds_hash_near_offset() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(validate_cursor_hash(content, offset, "blake3:abc123"));
    }

    #[test]
    fn validate_cursor_hash_fails_wrong_hash() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(!validate_cursor_hash(content, offset, "blake3:zzz999"));
    }

    #[test]
    fn validate_cursor_hash_fails_zero_offset() {
        let content = "line1\tblake3:abc123\n";
        assert!(!validate_cursor_hash(content, 0, "blake3:abc123"));
    }
}
783}