1use std::io;
31use std::path::Path;
32use std::time::Instant;
33
34use anyhow::{Context, Result};
35use rusqlite::Connection;
36
37use crate::db::{migrations, project, query, rebuild};
38use crate::event::Event;
39use crate::shard::ShardManager;
40
/// Newtype around the string form of an event hash.
///
/// [`read_hwm`] and [`write_hwm`] use it to pass the projection's
/// high-water mark (the hash stored in the projection cursor) as a
/// distinct type rather than a bare `String`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventHash(pub String);
49
/// Summary of what a call to [`incremental_apply`] did.
#[derive(Debug, Clone)]
pub struct ApplyReport {
    /// Number of events projected by this run. On the incremental path this
    /// is the sum of the per-batch `projected` counts; on the full-rebuild
    /// path it is the rebuild report's event count.
    pub events_applied: usize,
    /// Number of shards involved: the length of the shard listing on the
    /// incremental path, or the rebuild report's shard count otherwise.
    pub shards_scanned: usize,
    /// `true` when the run fell back to (or was forced into) a full rebuild.
    pub full_rebuild_triggered: bool,
    /// Human-readable reason for the full rebuild; `None` when the
    /// incremental path was taken.
    pub full_rebuild_reason: Option<String>,
    /// Time elapsed since the start of the `incremental_apply` call.
    pub elapsed: std::time::Duration,
}
64
65pub fn event_log_cursor(events_dir: &Path) -> Result<(usize, Option<String>)> {
78 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
79 let shard_mgr = ShardManager::new(bones_dir);
80
81 let mut version_checked = false;
82 let mut shard_version = crate::event::parser::CURRENT_VERSION;
83 let mut line_no = 0;
84 let mut total_byte_len = 0;
85 let mut last_event_hash = None;
86
87 let shard_line_iter = shard_mgr.replay_lines()?;
88 for line_res in shard_line_iter {
89 let (offset, line): (usize, String) =
90 line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
91 line_no += 1;
92 total_byte_len = offset + line.len();
93
94 if !version_checked && line.trim_start().starts_with("# bones event log v") {
95 version_checked = true;
96 shard_version = crate::event::parser::detect_version(&line)
97 .map_err(|msg| anyhow::anyhow!("version check failed at line {line_no}: {msg}"))?;
98 continue;
99 }
100
101 match crate::event::parser::parse_line(&line) {
102 Ok(crate::event::parser::ParsedLine::Event(event)) => {
103 let event = crate::event::migrate_event(*event, shard_version)
104 .map_err(|e| anyhow::anyhow!("migration failed at line {line_no}: {e}"))?;
105 last_event_hash = Some(event.event_hash);
106 }
107 Ok(
108 crate::event::parser::ParsedLine::Comment(_)
109 | crate::event::parser::ParsedLine::Blank,
110 ) => {}
111 Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
112 tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
113 }
114 Err(e) => anyhow::bail!("parse error at line {line_no}: {e}"),
115 }
116 }
117
118 Ok((total_byte_len, last_event_hash))
119}
120
121#[allow(clippy::too_many_lines)]
143pub fn incremental_apply(
144 events_dir: &Path,
145 db_path: &Path,
146 force_full: bool,
147) -> Result<ApplyReport> {
148 let start = Instant::now();
149
150 if force_full {
151 return do_full_rebuild(events_dir, db_path, start, "force_full flag set");
152 }
153
154 let Some(conn) = query::try_open_projection_raw(db_path)? else {
158 return do_full_rebuild(
159 events_dir,
160 db_path,
161 start,
162 "projection database missing or corrupt",
163 );
164 };
165
166 let (byte_offset, last_hash) =
168 query::get_projection_cursor(&conn).context("read projection cursor")?;
169
170 if byte_offset == 0 && last_hash.is_none() {
172 drop(conn);
173 return do_full_rebuild(events_dir, db_path, start, "fresh database (no cursor)");
174 }
175
176 if let Err(reason) = check_incremental_safety(&conn, events_dir) {
178 drop(conn);
179 return do_full_rebuild(events_dir, db_path, start, &reason);
180 }
181
182 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
184 let shard_mgr = ShardManager::new(bones_dir);
185 let shards = shard_mgr
186 .list_shards()
187 .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
188 let shards_scanned = shards.len();
189
190 let offset = usize::try_from(byte_offset).unwrap_or(0);
191
192 if let Some(ref hash) = last_hash {
195 let tail_ok = validate_cursor_hash_at_offset(&shard_mgr, offset, hash).unwrap_or(false);
196 if !tail_ok {
197 drop(conn);
198 return do_full_rebuild(
199 events_dir,
200 db_path,
201 start,
202 "cursor hash not found at expected byte offset",
203 );
204 }
205 }
206
207 let mut line_iter = shard_mgr
208 .replay_lines_from_offset(offset)
209 .map_err(|e| anyhow::anyhow!("open shard line iterator: {e}"))?
210 .peekable();
211
212 if line_iter.peek().is_none() {
214 return Ok(ApplyReport {
215 events_applied: 0,
216 shards_scanned,
217 full_rebuild_triggered: false,
218 full_rebuild_reason: None,
219 elapsed: start.elapsed(),
220 });
221 }
222
223 project::ensure_tracking_table(&conn).context("ensure projected_events tracking table")?;
225
226 let mut version_checked = false;
227 let mut shard_version = crate::event::parser::CURRENT_VERSION;
228 let mut line_no = 0;
229 let mut total_projected = 0;
230 let mut total_duplicates = 0;
231 let mut total_errors = 0;
232 let mut current_last_hash = last_hash;
233 let mut total_byte_len = offset;
234
235 let mut current_batch: Vec<Event> = Vec::with_capacity(1000);
236 let projector = project::Projector::new(&conn);
237
238 for line_res in line_iter {
239 let (abs_offset, line): (usize, String) =
240 line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
241 line_no += 1;
242 total_byte_len = abs_offset + line.len();
243
244 if !version_checked && line.trim_start().starts_with("# bones event log v") {
246 version_checked = true;
247 shard_version = crate::event::parser::detect_version(&line)
248 .map_err(|msg| anyhow::anyhow!("version check failed: {msg}"))?;
249 continue;
250 }
251
252 match crate::event::parser::parse_line(&line) {
253 Ok(crate::event::parser::ParsedLine::Event(event)) => {
254 let event = crate::event::migrate_event(*event, shard_version)
255 .map_err(|e| anyhow::anyhow!("migration failed: {e}"))?;
256
257 current_last_hash = Some(event.event_hash.clone());
258 current_batch.push(event);
259
260 if current_batch.len() >= 1000 {
261 let stats = projector
262 .project_batch(¤t_batch)
263 .context("project batch during incremental apply")?;
264 total_projected += stats.projected;
265 total_duplicates += stats.duplicates;
266 total_errors += stats.errors;
267 current_batch.clear();
268 }
269 }
270 Ok(
271 crate::event::parser::ParsedLine::Comment(_)
272 | crate::event::parser::ParsedLine::Blank,
273 ) => {}
274 Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
275 tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
276 }
277 Err(e) => anyhow::bail!("parse error at line {line_no} (offset {abs_offset}): {e}"),
278 }
279 }
280
281 if !current_batch.is_empty() {
283 let stats = projector
284 .project_batch(¤t_batch)
285 .context("project final batch during incremental apply")?;
286 total_projected += stats.projected;
287 total_duplicates += stats.duplicates;
288 total_errors += stats.errors;
289 }
290
291 let new_offset = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
293 query::update_projection_cursor(&conn, new_offset, current_last_hash.as_deref())
294 .context("update projection cursor after incremental apply")?;
295
296 tracing::info!(
297 events_applied = total_projected,
298 duplicates = total_duplicates,
299 errors = total_errors,
300 shards_scanned,
301 byte_offset_from = byte_offset,
302 byte_offset_to = new_offset,
303 elapsed_ms = start.elapsed().as_millis(),
304 "incremental projection apply complete"
305 );
306
307 Ok(ApplyReport {
308 events_applied: total_projected,
309 shards_scanned,
310 full_rebuild_triggered: false,
311 full_rebuild_reason: None,
312 elapsed: start.elapsed(),
313 })
314}
315
316pub fn read_hwm(db: &Connection) -> Result<Option<EventHash>> {
323 let (_offset, hash) = query::get_projection_cursor(db).context("read high-water mark")?;
324 Ok(hash.map(EventHash))
325}
326
327pub fn write_hwm(db: &Connection, hwm: &EventHash) -> Result<()> {
333 let (offset, _) =
335 query::get_projection_cursor(db).context("read current cursor for hwm update")?;
336 query::update_projection_cursor(db, offset, Some(&hwm.0)).context("write high-water mark")?;
337 Ok(())
338}
339
340pub fn check_incremental_safety(db: &Connection, events_dir: &Path) -> Result<(), String> {
355 let schema_version = migrations::current_schema_version(db)
357 .map_err(|e| format!("failed to read schema version: {e}"))?;
358 if schema_version != migrations::LATEST_SCHEMA_VERSION {
359 return Err(format!(
360 "schema version mismatch: db has v{schema_version}, code expects v{}",
361 migrations::LATEST_SCHEMA_VERSION
362 ));
363 }
364
365 let table_exists: bool = db
367 .query_row(
368 "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='projected_events')",
369 [],
370 |row| row.get(0),
371 )
372 .map_err(|e| format!("failed to check projected_events table: {e}"))?;
373 if !table_exists {
374 return Err("projected_events tracking table missing".into());
375 }
376
377 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
379 let shard_mgr = ShardManager::new(bones_dir);
380 let shards = shard_mgr
381 .list_shards()
382 .map_err(|e| format!("failed to list shards: {e}"))?;
383
384 if shards.len() > 1 {
386 for &(year, month) in &shards[..shards.len() - 1] {
387 if let Ok(Some(manifest)) = shard_mgr.read_manifest(year, month) {
388 let shard_path = shard_mgr.shard_path(year, month);
389 match std::fs::metadata(&shard_path) {
390 Ok(meta) => {
391 if meta.len() != manifest.byte_len {
392 return Err(format!(
393 "sealed shard {}-{:02} size mismatch: \
394 manifest says {} bytes, file is {} bytes",
395 year,
396 month,
397 manifest.byte_len,
398 meta.len()
399 ));
400 }
401 }
402 Err(e) => {
403 return Err(format!("cannot stat sealed shard {year}-{month:02}: {e}"));
404 }
405 }
406 }
407 }
410 }
411
412 Ok(())
413}
414
415fn validate_cursor_hash_at_offset(
425 shard_mgr: &ShardManager,
426 offset: usize,
427 hash: &str,
428) -> Result<bool> {
429 if offset == 0 {
430 return Ok(false);
431 }
432 let search_start = offset.saturating_sub(512);
433 let window = shard_mgr
434 .read_content_range(search_start, offset)
435 .map_err(|e| anyhow::anyhow!("read cursor hash window: {e}"))?;
436 Ok(window.contains(hash))
437}
438
/// Test-only, in-memory variant of the cursor hash check.
///
/// Returns `true` when `hash` occurs in the (up to) 512 bytes of `content`
/// that precede `offset`. Both ends of that window are rounded up to the next
/// UTF-8 character boundary so slicing never splits a character. An `offset`
/// of 0, or one past the end of `content`, yields `false`.
#[cfg(test)]
fn validate_cursor_hash(content: &str, offset: usize, hash: &str) -> bool {
    // Rounds `pos` up to the nearest character boundary, clamped to the end
    // of `text` (the end of a string is always a boundary).
    let ceil_boundary = |text: &str, pos: usize| -> usize {
        let len = text.len();
        (pos.min(len)..=len)
            .find(|&candidate| text.is_char_boundary(candidate))
            .unwrap_or(len)
    };

    if offset == 0 || offset > content.len() {
        return false;
    }

    let window_end = ceil_boundary(content, offset);
    let window_start = ceil_boundary(content, offset.saturating_sub(512)).min(window_end);
    content[window_start..window_end].contains(hash)
}
462
463fn do_full_rebuild(
465 events_dir: &Path,
466 db_path: &Path,
467 start: Instant,
468 reason: &str,
469) -> Result<ApplyReport> {
470 tracing::info!(reason, "falling back to full projection rebuild");
471
472 let report = rebuild::rebuild(events_dir, db_path)
473 .context("full rebuild during incremental apply fallback")?;
474
475 Ok(ApplyReport {
476 events_applied: report.event_count,
477 shards_scanned: report.shard_count,
478 full_rebuild_triggered: true,
479 full_rebuild_reason: Some(reason.to_string()),
480 elapsed: start.elapsed(),
481 })
482}
483
/// Tests for incremental apply, the high-water-mark helpers, the safety
/// checks, and the in-memory cursor hash validation.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::open_projection;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Creates a temporary directory with an initialised shard layout.
    ///
    /// The `TempDir` is returned so it stays alive for the whole test.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let dir = TempDir::new().expect("create tempdir");
        let shard_mgr = ShardManager::new(dir.path());
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init shard");
        (dir, shard_mgr)
    }

    /// Builds a `Create` event for `id` with the given title and timestamp,
    /// with its hash filled in by `writer::write_event`.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(CreateData {
                title: title.into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["test".into()],
                parent: None,
                causation: None,
                description: Some(format!("Description for {title}")),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Serialises `event` and appends it to the currently active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let line = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append event");
    }

    /// With no database file present, the apply must fall back to a rebuild.
    #[test]
    fn incremental_apply_on_empty_db_does_full_rebuild() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("missing"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    /// `force_full` rebuilds even when the database is already up to date.
    #[test]
    fn incremental_apply_force_full() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let report = incremental_apply(&events_dir, &db_path, true).unwrap();
        assert!(report.full_rebuild_triggered);
        assert_eq!(
            report.full_rebuild_reason.as_deref(),
            Some("force_full flag set")
        );
        assert_eq!(report.events_applied, 1);
    }

    /// Events appended after a rebuild are applied incrementally.
    #[test]
    fn incremental_apply_picks_up_new_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bn-001", "Item 1", 1000);
        let create2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let create3 = make_create_event("bn-003", "Item 3", 1002);
        append_event(&shard_mgr, &create3);

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 1);

        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 3);
    }

    /// With nothing new in the log, the apply is a no-op.
    #[test]
    fn incremental_apply_noop_when_up_to_date() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 0);
    }

    /// Several consecutive incremental applies each pick up only their delta.
    #[test]
    fn incremental_apply_multiple_rounds() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let e1 = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &e1);
        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let e2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &e2);
        let r2 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r2.full_rebuild_triggered);
        assert_eq!(r2.events_applied, 1);

        let e3 = make_create_event("bn-003", "Item 3", 1002);
        let e4 = make_create_event("bn-004", "Item 4", 1003);
        append_event(&shard_mgr, &e3);
        append_event(&shard_mgr, &e4);
        let r3 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r3.full_rebuild_triggered);
        assert_eq!(r3.events_applied, 2);

        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 4);
    }

    /// A database built by rebuild-then-incremental must hold the same items
    /// as one built by a single full rebuild over the same log.
    #[test]
    fn incremental_apply_matches_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");
        let db_full = dir.path().join("full.db");
        let db_inc = dir.path().join("inc.db");

        let append_items = |range: std::ops::Range<i32>| {
            for i in range {
                let e = make_create_event(
                    &format!("bn-{i:03x}"),
                    &format!("Item {i}"),
                    1000 + i64::from(i),
                );
                append_event(&shard_mgr, &e);
            }
        };

        // Incremental path: rebuild over the first half of the log, then let
        // incremental_apply pick up the second half.
        append_items(0..5);
        rebuild::rebuild(&events_dir, &db_inc).unwrap();

        append_items(5..10);
        let report = incremental_apply(&events_dir, &db_inc, false).unwrap();
        assert!(
            !report.full_rebuild_triggered,
            "reason: {:?}",
            report.full_rebuild_reason
        );
        assert_eq!(report.events_applied, 5);

        // Reference path: one full rebuild over the complete log.
        rebuild::rebuild(&events_dir, &db_full).unwrap();

        let conn_full = open_projection(&db_full).unwrap();
        let conn_inc = open_projection(&db_inc).unwrap();

        let count_full: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        let count_inc: i64 = conn_inc
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count_full, count_inc);
        assert_eq!(count_full, 10);

        let titles_full: Vec<String> = {
            let mut stmt = conn_full
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        let titles_inc: Vec<String> = {
            let mut stmt = conn_inc
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        assert_eq!(titles_full, titles_inc);
    }

    /// A database whose `user_version` differs from the expected schema
    /// version must be rebuilt rather than updated in place.
    #[test]
    fn schema_version_mismatch_triggers_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        {
            // Scoped so the connection is closed before the apply runs.
            let conn = open_projection(&db_path).unwrap();
            conn.pragma_update(None, "user_version", 999_i64).unwrap();
        }

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("schema version"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    /// A freshly migrated database has no high-water mark.
    #[test]
    fn read_hwm_returns_none_for_fresh_db() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hwm = read_hwm(&conn).unwrap();
        assert!(hwm.is_none());
    }

    /// A written high-water mark is read back unchanged.
    #[test]
    fn write_and_read_hwm_roundtrip() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hash = EventHash("blake3:abc123".into());
        write_hwm(&conn, &hash).unwrap();

        let retrieved = read_hwm(&conn).unwrap();
        assert_eq!(retrieved.unwrap(), hash);
    }

    /// A rebuilt database with the tracking table present passes all checks.
    #[test]
    fn check_incremental_safety_passes_valid_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        project::ensure_tracking_table(&conn).unwrap();
        let result = check_incremental_safety(&conn, &events_dir);
        assert!(result.is_ok(), "safety check failed: {result:?}");
    }

    /// The schema check runs first, so the events directory is never touched.
    #[test]
    fn check_incremental_safety_fails_schema_mismatch() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        conn.pragma_update(None, "user_version", 999_i64).unwrap();

        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("schema version"));
    }

    /// Relies on migrations alone not creating the `projected_events` table
    /// (presumably it is only created by `project::ensure_tracking_table`).
    #[test]
    fn check_incremental_safety_fails_missing_tracking_table() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("projected_events"));
    }

    /// A hash on the line just before the offset is found.
    #[test]
    fn validate_cursor_hash_finds_hash_near_offset() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(validate_cursor_hash(content, offset, "blake3:abc123"));
    }

    /// A hash that does not occur in the content is rejected.
    #[test]
    fn validate_cursor_hash_fails_wrong_hash() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(!validate_cursor_hash(content, offset, "blake3:zzz999"));
    }

    /// Offset 0 never validates, even if the hash occurs later in the content.
    #[test]
    fn validate_cursor_hash_fails_zero_offset() {
        let content = "line1\tblake3:abc123\n";
        assert!(!validate_cursor_hash(content, 0, "blake3:abc123"));
    }
}