1use std::io;
31use std::path::Path;
32use std::time::Instant;
33
34use anyhow::{Context, Result};
35use rusqlite::Connection;
36
37use crate::db::{migrations, project, query, rebuild};
38use crate::event::Event;
39use crate::shard::ShardManager;
40
/// Newtype over an event's content-hash string, used as the projection's
/// high-water mark (see `read_hwm` / `write_hwm`).
/// Format presumably `blake3:<hex>` — tests use `blake3:abc123`; confirm
/// against the event writer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventHash(pub String);
49
/// Summary of one `incremental_apply` run (incremental or full-rebuild path).
#[derive(Debug, Clone)]
pub struct ApplyReport {
    /// Number of events projected during this run.
    pub events_applied: usize,
    /// Number of shards listed while scanning the event log.
    pub shards_scanned: usize,
    /// True when the run fell back to a full projection rebuild.
    pub full_rebuild_triggered: bool,
    /// Human-readable reason for the fallback, when one occurred.
    pub full_rebuild_reason: Option<String>,
    /// Wall-clock duration of the whole run.
    pub elapsed: std::time::Duration,
}
64
65#[allow(clippy::too_many_lines)]
91pub fn incremental_apply(
92 events_dir: &Path,
93 db_path: &Path,
94 force_full: bool,
95) -> Result<ApplyReport> {
96 let start = Instant::now();
97
98 if force_full {
99 return do_full_rebuild(events_dir, db_path, start, "force_full flag set");
100 }
101
102 let Some(conn) = query::try_open_projection_raw(db_path)? else {
106 return do_full_rebuild(
107 events_dir,
108 db_path,
109 start,
110 "projection database missing or corrupt",
111 );
112 };
113
114 let (byte_offset, last_hash) =
116 query::get_projection_cursor(&conn).context("read projection cursor")?;
117
118 if byte_offset == 0 && last_hash.is_none() {
120 drop(conn);
121 return do_full_rebuild(events_dir, db_path, start, "fresh database (no cursor)");
122 }
123
124 if let Err(reason) = check_incremental_safety(&conn, events_dir) {
126 drop(conn);
127 return do_full_rebuild(events_dir, db_path, start, &reason);
128 }
129
130 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
132 let shard_mgr = ShardManager::new(bones_dir);
133 let shards = shard_mgr
134 .list_shards()
135 .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
136 let shards_scanned = shards.len();
137
138 let offset = usize::try_from(byte_offset).unwrap_or(0);
139
140 if let Some(ref hash) = last_hash {
143 let tail_ok = validate_cursor_hash_at_offset(&shard_mgr, offset, hash).unwrap_or(false);
144 if !tail_ok {
145 drop(conn);
146 return do_full_rebuild(
147 events_dir,
148 db_path,
149 start,
150 "cursor hash not found at expected byte offset",
151 );
152 }
153 }
154
155 let mut line_iter = shard_mgr
156 .replay_lines_from_offset(offset)
157 .map_err(|e| anyhow::anyhow!("open shard line iterator: {e}"))?
158 .peekable();
159
160 if line_iter.peek().is_none() {
162 return Ok(ApplyReport {
163 events_applied: 0,
164 shards_scanned,
165 full_rebuild_triggered: false,
166 full_rebuild_reason: None,
167 elapsed: start.elapsed(),
168 });
169 }
170
171 project::ensure_tracking_table(&conn).context("ensure projected_events tracking table")?;
173
174 let mut version_checked = false;
175 let mut shard_version = crate::event::parser::CURRENT_VERSION;
176 let mut line_no = 0;
177 let mut total_projected = 0;
178 let mut total_duplicates = 0;
179 let mut total_errors = 0;
180 let mut current_last_hash = last_hash;
181 let mut total_byte_len = offset;
182
183 let mut current_batch: Vec<Event> = Vec::with_capacity(1000);
184 let projector = project::Projector::new(&conn);
185
186 for line_res in line_iter {
187 let (abs_offset, line): (usize, String) =
188 line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
189 line_no += 1;
190 total_byte_len = abs_offset + line.len();
191
192 if !version_checked && line.trim_start().starts_with("# bones event log v") {
194 version_checked = true;
195 shard_version = crate::event::parser::detect_version(&line)
196 .map_err(|msg| anyhow::anyhow!("version check failed: {msg}"))?;
197 continue;
198 }
199
200 match crate::event::parser::parse_line(&line) {
201 Ok(crate::event::parser::ParsedLine::Event(event)) => {
202 let event = crate::event::migrate_event(*event, shard_version)
203 .map_err(|e| anyhow::anyhow!("migration failed: {e}"))?;
204
205 current_last_hash = Some(event.event_hash.clone());
206 current_batch.push(event);
207
208 if current_batch.len() >= 1000 {
209 let stats = projector
210 .project_batch(¤t_batch)
211 .context("project batch during incremental apply")?;
212 total_projected += stats.projected;
213 total_duplicates += stats.duplicates;
214 total_errors += stats.errors;
215 current_batch.clear();
216 }
217 }
218 Ok(
219 crate::event::parser::ParsedLine::Comment(_)
220 | crate::event::parser::ParsedLine::Blank,
221 ) => {}
222 Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
223 tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
224 }
225 Err(e) => anyhow::bail!("parse error at line {line_no} (offset {abs_offset}): {e}"),
226 }
227 }
228
229 if !current_batch.is_empty() {
231 let stats = projector
232 .project_batch(¤t_batch)
233 .context("project final batch during incremental apply")?;
234 total_projected += stats.projected;
235 total_duplicates += stats.duplicates;
236 total_errors += stats.errors;
237 }
238
239 let new_offset = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
241 query::update_projection_cursor(&conn, new_offset, current_last_hash.as_deref())
242 .context("update projection cursor after incremental apply")?;
243
244 tracing::info!(
245 events_applied = total_projected,
246 duplicates = total_duplicates,
247 errors = total_errors,
248 shards_scanned,
249 byte_offset_from = byte_offset,
250 byte_offset_to = new_offset,
251 elapsed_ms = start.elapsed().as_millis(),
252 "incremental projection apply complete"
253 );
254
255 Ok(ApplyReport {
256 events_applied: total_projected,
257 shards_scanned,
258 full_rebuild_triggered: false,
259 full_rebuild_reason: None,
260 elapsed: start.elapsed(),
261 })
262}
263
264pub fn read_hwm(db: &Connection) -> Result<Option<EventHash>> {
271 let (_offset, hash) = query::get_projection_cursor(db).context("read high-water mark")?;
272 Ok(hash.map(EventHash))
273}
274
275pub fn write_hwm(db: &Connection, hwm: &EventHash) -> Result<()> {
281 let (offset, _) =
283 query::get_projection_cursor(db).context("read current cursor for hwm update")?;
284 query::update_projection_cursor(db, offset, Some(&hwm.0)).context("write high-water mark")?;
285 Ok(())
286}
287
288pub fn check_incremental_safety(db: &Connection, events_dir: &Path) -> Result<(), String> {
303 let schema_version = migrations::current_schema_version(db)
305 .map_err(|e| format!("failed to read schema version: {e}"))?;
306 if schema_version != migrations::LATEST_SCHEMA_VERSION {
307 return Err(format!(
308 "schema version mismatch: db has v{schema_version}, code expects v{}",
309 migrations::LATEST_SCHEMA_VERSION
310 ));
311 }
312
313 let table_exists: bool = db
315 .query_row(
316 "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='projected_events')",
317 [],
318 |row| row.get(0),
319 )
320 .map_err(|e| format!("failed to check projected_events table: {e}"))?;
321 if !table_exists {
322 return Err("projected_events tracking table missing".into());
323 }
324
325 let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
327 let shard_mgr = ShardManager::new(bones_dir);
328 let shards = shard_mgr
329 .list_shards()
330 .map_err(|e| format!("failed to list shards: {e}"))?;
331
332 if shards.len() > 1 {
334 for &(year, month) in &shards[..shards.len() - 1] {
335 if let Ok(Some(manifest)) = shard_mgr.read_manifest(year, month) {
336 let shard_path = shard_mgr.shard_path(year, month);
337 match std::fs::metadata(&shard_path) {
338 Ok(meta) => {
339 if meta.len() != manifest.byte_len {
340 return Err(format!(
341 "sealed shard {}-{:02} size mismatch: \
342 manifest says {} bytes, file is {} bytes",
343 year,
344 month,
345 manifest.byte_len,
346 meta.len()
347 ));
348 }
349 }
350 Err(e) => {
351 return Err(format!("cannot stat sealed shard {year}-{month:02}: {e}"));
352 }
353 }
354 }
355 }
358 }
359
360 Ok(())
361}
362
363fn validate_cursor_hash_at_offset(
373 shard_mgr: &ShardManager,
374 offset: usize,
375 hash: &str,
376) -> Result<bool> {
377 if offset == 0 {
378 return Ok(false);
379 }
380 let search_start = offset.saturating_sub(512);
381 let window = shard_mgr
382 .read_content_range(search_start, offset)
383 .map_err(|e| anyhow::anyhow!("read cursor hash window: {e}"))?;
384 Ok(window.contains(hash))
385}
386
/// Test-only, in-memory analogue of [`validate_cursor_hash_at_offset`]:
/// true iff `hash` occurs in the (up to) 512-byte window of `content` ending
/// at `offset`, with both window edges snapped forward to UTF-8 char
/// boundaries so slicing never panics.
#[cfg(test)]
fn validate_cursor_hash(content: &str, offset: usize, hash: &str) -> bool {
    if offset == 0 || offset > content.len() {
        return false;
    }

    // Advance a byte position to the nearest char boundary at or after it
    // (the string's end is always a valid boundary).
    let snap = |p: usize| -> usize {
        (p.min(content.len())..=content.len())
            .find(|&i| content.is_char_boundary(i))
            .unwrap_or(content.len())
    };

    let end = snap(offset);
    let start = snap(offset.saturating_sub(512)).min(end);
    content[start..end].contains(hash)
}
410
411fn do_full_rebuild(
413 events_dir: &Path,
414 db_path: &Path,
415 start: Instant,
416 reason: &str,
417) -> Result<ApplyReport> {
418 tracing::info!(reason, "falling back to full projection rebuild");
419
420 let report = rebuild::rebuild(events_dir, db_path)
421 .context("full rebuild during incremental apply fallback")?;
422
423 Ok(ApplyReport {
424 events_applied: report.event_count,
425 shards_scanned: report.shard_count,
426 full_rebuild_triggered: true,
427 full_rebuild_reason: Some(reason.to_string()),
428 elapsed: start.elapsed(),
429 })
430}
431
#[cfg(test)]
mod tests {
    //! Integration-style tests for incremental apply, high-water mark
    //! persistence, safety checks, and the cursor-hash validator. Each test
    //! works in a throwaway temp directory with a real shard layout.
    use super::*;
    use crate::db::open_projection;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    // Create a temp bones directory with an initialized shard layout.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let dir = TempDir::new().expect("create tempdir");
        let shard_mgr = ShardManager::new(dir.path());
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init shard");
        (dir, shard_mgr)
    }

    // Build a Create event for `id`/`title` at timestamp `ts`;
    // `writer::write_event` fills in `event_hash` before returning.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(CreateData {
                title: title.into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["test".into()],
                parent: None,
                causation: None,
                description: Some(format!("Description for {title}")),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    // Serialize `event` and append the line to the currently active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let line = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append event");
    }

    // No database on disk yet -> must fall back to a full rebuild.
    #[test]
    fn incremental_apply_on_empty_db_does_full_rebuild() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("missing"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    // force_full=true must rebuild even when the db is present and current.
    #[test]
    fn incremental_apply_force_full() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let report = incremental_apply(&events_dir, &db_path, true).unwrap();
        assert!(report.full_rebuild_triggered);
        assert_eq!(
            report.full_rebuild_reason.as_deref(),
            Some("force_full flag set")
        );
        assert_eq!(report.events_applied, 1);
    }

    // Events appended after a rebuild are picked up incrementally.
    #[test]
    fn incremental_apply_picks_up_new_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create1 = make_create_event("bn-001", "Item 1", 1000);
        let create2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // One new event past the cursor.
        let create3 = make_create_event("bn-003", "Item 3", 1002);
        append_event(&shard_mgr, &create3);

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 1);

        // All three items must now be in the projection.
        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 3);
    }

    // No new events past the cursor -> no-op, no rebuild.
    #[test]
    fn incremental_apply_noop_when_up_to_date() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 0);
    }

    // The cursor must advance correctly across successive incremental runs.
    #[test]
    fn incremental_apply_multiple_rounds() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let e1 = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &e1);
        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Round 1: one new event.
        let e2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &e2);
        let r2 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r2.full_rebuild_triggered);
        assert_eq!(r2.events_applied, 1);

        // Round 2: two new events.
        let e3 = make_create_event("bn-003", "Item 3", 1002);
        let e4 = make_create_event("bn-004", "Item 4", 1003);
        append_event(&shard_mgr, &e3);
        append_event(&shard_mgr, &e4);
        let r3 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r3.full_rebuild_triggered);
        assert_eq!(r3.events_applied, 2);

        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 4);
    }

    // NOTE(review): despite the name, both databases below are produced by
    // rebuild::rebuild — no incremental_apply call occurs, so this currently
    // compares two full rebuilds with each other. Confirm whether an
    // incremental step for `db_inc` was dropped from this test.
    #[test]
    fn incremental_apply_matches_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        for i in 0..10 {
            let e = make_create_event(
                &format!("bn-{i:03x}"),
                &format!("Item {i}"),
                1000 + i64::from(i),
            );
            append_event(&shard_mgr, &e);
        }

        let db_full = dir.path().join("full.db");
        rebuild::rebuild(&events_dir, &db_full).unwrap();

        let db_inc = dir.path().join("inc.db");
        rebuild::rebuild(&events_dir, &db_inc).unwrap();

        let conn_full = open_projection(&db_full).unwrap();
        let conn_inc = open_projection(&db_inc).unwrap();

        let count_full: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        let count_inc: i64 = conn_inc
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count_full, count_inc);
        assert_eq!(count_full, 10);

        // Compare full row contents, not just counts.
        let titles_full: Vec<String> = {
            let mut stmt = conn_full
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        let titles_inc: Vec<String> = {
            let mut stmt = conn_inc
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        assert_eq!(titles_full, titles_inc);
    }

    // A tampered user_version must be treated as unsafe -> full rebuild.
    #[test]
    fn schema_version_mismatch_triggers_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Corrupt the schema version; scope ensures the connection closes.
        {
            let conn = open_projection(&db_path).unwrap();
            conn.pragma_update(None, "user_version", 999_i64).unwrap();
        }

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("schema version"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    #[test]
    fn read_hwm_returns_none_for_fresh_db() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hwm = read_hwm(&conn).unwrap();
        assert!(hwm.is_none());
    }

    #[test]
    fn write_and_read_hwm_roundtrip() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hash = EventHash("blake3:abc123".into());
        write_hwm(&conn, &hash).unwrap();

        let retrieved = read_hwm(&conn).unwrap();
        assert_eq!(retrieved.unwrap(), hash);
    }

    #[test]
    fn check_incremental_safety_passes_valid_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        project::ensure_tracking_table(&conn).unwrap();
        let result = check_incremental_safety(&conn, &events_dir);
        assert!(result.is_ok(), "safety check failed: {result:?}");
    }

    #[test]
    fn check_incremental_safety_fails_schema_mismatch() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        conn.pragma_update(None, "user_version", 999_i64).unwrap();

        // Schema check fails before the events_dir is ever touched, so a
        // nonexistent path is fine here.
        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("schema version"));
    }

    // migrate() alone does not create the tracking table, so the check fails.
    #[test]
    fn check_incremental_safety_fails_missing_tracking_table() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("projected_events"));
    }

    #[test]
    fn validate_cursor_hash_finds_hash_near_offset() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(validate_cursor_hash(content, offset, "blake3:abc123"));
    }

    #[test]
    fn validate_cursor_hash_fails_wrong_hash() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(!validate_cursor_hash(content, offset, "blake3:zzz999"));
    }

    #[test]
    fn validate_cursor_hash_fails_zero_offset() {
        let content = "line1\tblake3:abc123\n";
        assert!(!validate_cursor_hash(content, 0, "blake3:abc123"));
    }
}