Skip to main content

bones_core/db/
incremental.rs

1//! Incremental projection rebuild and invalidation.
2//!
3//! On startup, instead of replaying the entire event log, we read the
4//! projection cursor (byte offset + last event hash) from
5//! `projection_meta` and replay only events after that point.
6//!
7//! # Safety checks
8//!
9//! Before doing an incremental apply, several invariants are verified:
10//!
11//! 1. **Schema version** — the DB schema version must match
12//!    [`migrations::LATEST_SCHEMA_VERSION`].  A mismatch means the code has
13//!    been upgraded and a full rebuild is needed.
14//!
15//! 2. **Cursor hash found** — the `last_event_hash` stored in the cursor
16//!    must appear in the shard content at the expected byte offset.  If the
17//!    hash cannot be found the shard was modified (e.g. deleted/rotated)
18//!    and incremental replay is unsafe.
19//!
20//! 3. **Sealed shard manifest integrity** — for every sealed shard that has
21//!    a `.manifest` file, the recorded `byte_len` must match the actual
22//!    file size.  A mismatch indicates shard corruption or tampering.
23//!
24//! 4. **Projection tracking table** — the `projected_events` table must
25//!    exist. If it doesn't, we can't deduplicate and must rebuild.
26//!
27//! When any check fails, [`incremental_apply`] falls back to a full rebuild
28//! automatically, returning the reason in [`ApplyReport::full_rebuild_reason`].
29
30use std::io;
31use std::path::Path;
32use std::time::Instant;
33
34use anyhow::{Context, Result};
35use rusqlite::Connection;
36
37use crate::db::{migrations, project, query, rebuild};
38use crate::event::Event;
39use crate::shard::ShardManager;
40
41// ---------------------------------------------------------------------------
42// Types
43// ---------------------------------------------------------------------------
44
/// Identifies the last event that was successfully applied to the projection.
/// Stored in the `projection_meta` table in `SQLite`.
///
/// Newtype over the textual event hash (e.g. a `blake3:…` string) so it
/// cannot be confused with other `String` values at call sites.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventHash(pub String);
49
/// Report from an incremental apply operation.
///
/// Returned by [`incremental_apply`] on both the incremental path and the
/// full-rebuild fallback; check [`ApplyReport::full_rebuild_triggered`] to
/// tell which one ran.
#[derive(Debug, Clone)]
pub struct ApplyReport {
    /// Number of new events applied.
    pub events_applied: usize,
    /// Number of shards scanned.
    pub shards_scanned: usize,
    /// Whether a full rebuild was triggered instead of incremental.
    pub full_rebuild_triggered: bool,
    /// Reason for full rebuild, if triggered.
    pub full_rebuild_reason: Option<String>,
    /// Elapsed wall time.
    pub elapsed: std::time::Duration,
}
64
65// ---------------------------------------------------------------------------
66// Public API
67// ---------------------------------------------------------------------------
68
/// Apply only events newer than the high-water mark to the projection.
///
/// Steps:
/// 1. Open (or try to open) the projection database.
/// 2. Read the projection cursor (byte offset + last event hash).
/// 3. Run safety checks — schema version, cursor validity, manifest integrity.
/// 4. If any check fails, fall back to a full rebuild.
/// 5. Otherwise, read shard content from the cursor byte offset onward.
/// 6. Parse and project only the new events.
/// 7. Update the cursor.
///
/// # Arguments
///
/// * `events_dir` — Path to `.bones/events/` (the shard directory)
/// * `db_path`    — Path to `.bones/bones.db` (the `SQLite` projection file)
/// * `force_full` — If `true`, skip incremental and always do a full rebuild
///   (`bn admin rebuild --full`).
///
/// # Errors
///
/// Returns an error if reading shards, parsing events, or projection fails.
#[allow(clippy::too_many_lines)]
pub fn incremental_apply(
    events_dir: &Path,
    db_path: &Path,
    force_full: bool,
) -> Result<ApplyReport> {
    let start = Instant::now();

    if force_full {
        return do_full_rebuild(events_dir, db_path, start, "force_full flag set");
    }

    // Try to open existing DB.  If it doesn't exist or is corrupt we need a
    // full rebuild.
    // Use _raw to avoid recursion: ensure_projection → incremental_apply → try_open_projection → ensure_projection …
    let Some(conn) = query::try_open_projection_raw(db_path)? else {
        return do_full_rebuild(
            events_dir,
            db_path,
            start,
            "projection database missing or corrupt",
        );
    };

    // Read cursor
    let (byte_offset, last_hash) =
        query::get_projection_cursor(&conn).context("read projection cursor")?;

    // Fresh database — no events have been applied yet → full rebuild
    if byte_offset == 0 && last_hash.is_none() {
        // Drop the connection before the fallback so the rebuild can reopen
        // (or replace) the database file without a second live handle.
        drop(conn);
        return do_full_rebuild(events_dir, db_path, start, "fresh database (no cursor)");
    }

    // Run safety checks
    if let Err(reason) = check_incremental_safety(&conn, events_dir) {
        drop(conn);
        return do_full_rebuild(events_dir, db_path, start, &reason);
    }

    // 6. Read and replay new events in streaming batches
    let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
    let shard_mgr = ShardManager::new(bones_dir);
    let shards = shard_mgr
        .list_shards()
        .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
    let shards_scanned = shards.len();

    // Cursor offset is stored as i64; a negative (corrupt) value degrades to
    // 0 here.  When a cursor hash is present, the validation below rejects
    // offset 0 and forces a full rebuild.
    let offset = usize::try_from(byte_offset).unwrap_or(0);

    // Validate cursor hash: it must appear in the tail of already-processed
    // content (the 512 bytes just before the cursor offset).
    if let Some(ref hash) = last_hash {
        // Read errors during validation are treated as "not found" (→ rebuild)
        // rather than propagated.
        let tail_ok = validate_cursor_hash_at_offset(&shard_mgr, offset, hash).unwrap_or(false);
        if !tail_ok {
            drop(conn);
            return do_full_rebuild(
                events_dir,
                db_path,
                start,
                "cursor hash not found at expected byte offset",
            );
        }
    }

    let mut line_iter = shard_mgr
        .replay_lines_from_offset(offset)
        .map_err(|e| anyhow::anyhow!("open shard line iterator: {e}"))?
        .peekable();

    // If there's no new content, we're up to date
    if line_iter.peek().is_none() {
        return Ok(ApplyReport {
            events_applied: 0,
            shards_scanned,
            full_rebuild_triggered: false,
            full_rebuild_reason: None,
            elapsed: start.elapsed(),
        });
    }

    // Ensure tracking table exists (needed for dedup)
    project::ensure_tracking_table(&conn).context("ensure projected_events tracking table")?;

    // Streaming state: events are batched (1000 at a time) before projection,
    // and the running byte length / last hash feed the cursor update at the end.
    let mut version_checked = false;
    let mut shard_version = crate::event::parser::CURRENT_VERSION;
    let mut line_no = 0;
    let mut total_projected = 0;
    let mut total_duplicates = 0;
    let mut total_errors = 0;
    let mut current_last_hash = last_hash;
    let mut total_byte_len = offset;

    let mut current_batch: Vec<Event> = Vec::with_capacity(1000);
    let projector = project::Projector::new(&conn);

    for line_res in line_iter {
        let (abs_offset, line): (usize, String) =
            line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
        line_no += 1;
        // NOTE(review): assumes `abs_offset + line.len()` reaches the end of
        // the line as stored on disk; if the iterator strips the trailing
        // newline this lands short of it — confirm against
        // `replay_lines_from_offset`'s contract.
        total_byte_len = abs_offset + line.len();

        // Version check if we hit a header
        if !version_checked && line.trim_start().starts_with("# bones event log v") {
            version_checked = true;
            shard_version = crate::event::parser::detect_version(&line)
                .map_err(|msg| anyhow::anyhow!("version check failed: {msg}"))?;
            continue;
        }

        match crate::event::parser::parse_line(&line) {
            Ok(crate::event::parser::ParsedLine::Event(event)) => {
                // Migrate the event to the current format before projecting;
                // `shard_version` comes from the most recent shard header seen.
                let event = crate::event::migrate_event(*event, shard_version)
                    .map_err(|e| anyhow::anyhow!("migration failed: {e}"))?;

                current_last_hash = Some(event.event_hash.clone());
                current_batch.push(event);

                if current_batch.len() >= 1000 {
                    let stats = projector
                        .project_batch(&current_batch)
                        .context("project batch during incremental apply")?;
                    total_projected += stats.projected;
                    total_duplicates += stats.duplicates;
                    total_errors += stats.errors;
                    current_batch.clear();
                }
            }
            Ok(
                crate::event::parser::ParsedLine::Comment(_)
                | crate::event::parser::ParsedLine::Blank,
            ) => {}
            // Unknown event types are tolerated (forward compatibility);
            // any other parse error aborts the apply.
            Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
                tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
            }
            Err(e) => anyhow::bail!("parse error at line {line_no} (offset {abs_offset}): {e}"),
        }
    }

    // Final batch
    if !current_batch.is_empty() {
        let stats = projector
            .project_batch(&current_batch)
            .context("project final batch during incremental apply")?;
        total_projected += stats.projected;
        total_duplicates += stats.duplicates;
        total_errors += stats.errors;
    }

    // Update cursor to the end of current content
    let new_offset = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
    query::update_projection_cursor(&conn, new_offset, current_last_hash.as_deref())
        .context("update projection cursor after incremental apply")?;

    tracing::info!(
        events_applied = total_projected,
        duplicates = total_duplicates,
        errors = total_errors,
        shards_scanned,
        byte_offset_from = byte_offset,
        byte_offset_to = new_offset,
        elapsed_ms = start.elapsed().as_millis(),
        "incremental projection apply complete"
    );

    Ok(ApplyReport {
        events_applied: total_projected,
        shards_scanned,
        full_rebuild_triggered: false,
        full_rebuild_reason: None,
        elapsed: start.elapsed(),
    })
}
263
264/// Read the current high-water mark from the `SQLite` metadata table.
265/// Returns `None` if no events have been applied (fresh DB).
266///
267/// # Errors
268///
269/// Returns an error if the database query fails.
270pub fn read_hwm(db: &Connection) -> Result<Option<EventHash>> {
271    let (_offset, hash) = query::get_projection_cursor(db).context("read high-water mark")?;
272    Ok(hash.map(EventHash))
273}
274
275/// Write the high-water mark after successful apply.
276///
277/// # Errors
278///
279/// Returns an error if the database update fails.
280pub fn write_hwm(db: &Connection, hwm: &EventHash) -> Result<()> {
281    // Preserve the existing offset, just update the hash
282    let (offset, _) =
283        query::get_projection_cursor(db).context("read current cursor for hwm update")?;
284    query::update_projection_cursor(db, offset, Some(&hwm.0)).context("write high-water mark")?;
285    Ok(())
286}
287
288/// Check if incremental apply is safe or if full rebuild is needed.
289///
290/// Checks:
291/// 1. Schema version matches `LATEST_SCHEMA_VERSION`
292/// 2. `projected_events` tracking table exists
293/// 3. Sealed shard manifests are intact (file sizes match)
294///
295/// Returns `Ok(())` if incremental is safe, `Err(reason)` with a human-readable
296/// reason string if a full rebuild is needed.
297///
298/// # Errors
299///
300/// Returns an error string describing why incremental rebuild is unsafe
301/// (schema mismatch, missing tracking table, or shard corruption).
302pub fn check_incremental_safety(db: &Connection, events_dir: &Path) -> Result<(), String> {
303    // 1. Schema version check
304    let schema_version = migrations::current_schema_version(db)
305        .map_err(|e| format!("failed to read schema version: {e}"))?;
306    if schema_version != migrations::LATEST_SCHEMA_VERSION {
307        return Err(format!(
308            "schema version mismatch: db has v{schema_version}, code expects v{}",
309            migrations::LATEST_SCHEMA_VERSION
310        ));
311    }
312
313    // 2. projected_events table must exist
314    let table_exists: bool = db
315        .query_row(
316            "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='projected_events')",
317            [],
318            |row| row.get(0),
319        )
320        .map_err(|e| format!("failed to check projected_events table: {e}"))?;
321    if !table_exists {
322        return Err("projected_events tracking table missing".into());
323    }
324
325    // 3. Sealed shard manifest integrity
326    let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
327    let shard_mgr = ShardManager::new(bones_dir);
328    let shards = shard_mgr
329        .list_shards()
330        .map_err(|e| format!("failed to list shards: {e}"))?;
331
332    // All shards except the last (active) one should be sealed
333    if shards.len() > 1 {
334        for &(year, month) in &shards[..shards.len() - 1] {
335            if let Ok(Some(manifest)) = shard_mgr.read_manifest(year, month) {
336                let shard_path = shard_mgr.shard_path(year, month);
337                match std::fs::metadata(&shard_path) {
338                    Ok(meta) => {
339                        if meta.len() != manifest.byte_len {
340                            return Err(format!(
341                                "sealed shard {}-{:02} size mismatch: \
342                                 manifest says {} bytes, file is {} bytes",
343                                year,
344                                month,
345                                manifest.byte_len,
346                                meta.len()
347                            ));
348                        }
349                    }
350                    Err(e) => {
351                        return Err(format!("cannot stat sealed shard {year}-{month:02}: {e}"));
352                    }
353                }
354            }
355            // No manifest file is OK — sealed shards without manifests are
356            // just not verified (they may predate manifest generation).
357        }
358    }
359
360    Ok(())
361}
362
363// ---------------------------------------------------------------------------
364// Internal helpers
365// ---------------------------------------------------------------------------
366
367/// Validate that the cursor hash appears in the 512 bytes immediately before
368/// `offset` in the shard sequence.
369///
370/// We only read the small window `[offset-512, offset)` rather than the
371/// entire shard content, keeping validation O(1) in total shard size.
372fn validate_cursor_hash_at_offset(
373    shard_mgr: &ShardManager,
374    offset: usize,
375    hash: &str,
376) -> Result<bool> {
377    if offset == 0 {
378        return Ok(false);
379    }
380    let search_start = offset.saturating_sub(512);
381    let window = shard_mgr
382        .read_content_range(search_start, offset)
383        .map_err(|e| anyhow::anyhow!("read cursor hash window: {e}"))?;
384    Ok(window.contains(hash))
385}
386
/// Validate that the cursor hash appears in the content around the expected
/// byte offset.  Used only in unit tests where the full content is already
/// available.
#[cfg(test)]
fn validate_cursor_hash(content: &str, offset: usize, hash: &str) -> bool {
    // Offset 0 means nothing was processed; an offset past the end means the
    // cursor no longer corresponds to this content.  Both are invalid.
    if offset == 0 || offset > content.len() {
        return false;
    }

    // Advance a position to the next UTF-8 char boundary at or after it so the
    // slice below never splits a multi-byte character.
    let snap = |pos: usize| -> usize {
        let mut p = pos.min(content.len());
        while p < content.len() && !content.is_char_boundary(p) {
            p += 1;
        }
        p
    };

    // Search only the (up to) 512 bytes just before the cursor offset.
    let window_end = snap(offset);
    let window_start = snap(offset.saturating_sub(512)).min(window_end);
    content[window_start..window_end].contains(hash)
}
410
411/// Perform a full rebuild and wrap the result in an `ApplyReport`.
412fn do_full_rebuild(
413    events_dir: &Path,
414    db_path: &Path,
415    start: Instant,
416    reason: &str,
417) -> Result<ApplyReport> {
418    tracing::info!(reason, "falling back to full projection rebuild");
419
420    let report = rebuild::rebuild(events_dir, db_path)
421        .context("full rebuild during incremental apply fallback")?;
422
423    Ok(ApplyReport {
424        events_applied: report.event_count,
425        shards_scanned: report.shard_count,
426        full_rebuild_triggered: true,
427        full_rebuild_reason: Some(reason.to_string()),
428        elapsed: start.elapsed(),
429    })
430}
431
432// ---------------------------------------------------------------------------
433// Tests
434// ---------------------------------------------------------------------------
435
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::open_projection;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    /// Create a temp `.bones`-style layout with an initialized active shard.
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let dir = TempDir::new().expect("create tempdir");
        let shard_mgr = ShardManager::new(dir.path());
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init shard");
        (dir, shard_mgr)
    }

    /// Build a hashed `Create` event for item `id` with the given title and
    /// wall-clock timestamp.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(CreateData {
                title: title.into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["test".into()],
                parent: None,
                causation: None,
                description: Some(format!("Description for {title}")),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Serialize `event` and append it to the currently active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let line = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append event");
    }

    // -----------------------------------------------------------------------
    // Tests
    // -----------------------------------------------------------------------

    #[test]
    fn incremental_apply_on_empty_db_does_full_rebuild() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("missing"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    #[test]
    fn incremental_apply_force_full() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        // First, do a normal rebuild to set up the DB
        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Now force a full rebuild
        let report = incremental_apply(&events_dir, &db_path, true).unwrap();
        assert!(report.full_rebuild_triggered);
        assert_eq!(
            report.full_rebuild_reason.as_deref(),
            Some("force_full flag set")
        );
        assert_eq!(report.events_applied, 1);
    }

    #[test]
    fn incremental_apply_picks_up_new_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // Write initial events and do a full rebuild
        let create1 = make_create_event("bn-001", "Item 1", 1000);
        let create2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Add a new event
        let create3 = make_create_event("bn-003", "Item 3", 1002);
        append_event(&shard_mgr, &create3);

        // Incremental apply should only pick up the new event
        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 1);

        // Verify all 3 items are in the DB
        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 3);
    }

    #[test]
    fn incremental_apply_noop_when_up_to_date() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // No new events — incremental should be a no-op
        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 0);
    }

    #[test]
    fn incremental_apply_multiple_rounds() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // Round 1: initial rebuild
        let e1 = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &e1);
        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Round 2: incremental
        let e2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &e2);
        let r2 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r2.full_rebuild_triggered);
        assert_eq!(r2.events_applied, 1);

        // Round 3: another incremental
        let e3 = make_create_event("bn-003", "Item 3", 1002);
        let e4 = make_create_event("bn-004", "Item 4", 1003);
        append_event(&shard_mgr, &e3);
        append_event(&shard_mgr, &e4);
        let r3 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r3.full_rebuild_triggered);
        assert_eq!(r3.events_applied, 2);

        // Final check: all 4 items
        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 4);
    }

    #[test]
    fn incremental_apply_matches_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        // First 5 events go into the shard before the incremental DB's
        // baseline rebuild.
        for i in 0..5 {
            let e = make_create_event(
                &format!("bn-{i:03x}"),
                &format!("Item {i}"),
                1000 + i64::from(i),
            );
            append_event(&shard_mgr, &e);
        }

        // Path B baseline: the incremental DB starts from the first 5 events.
        let db_inc = dir.path().join("inc.db");
        rebuild::rebuild(&events_dir, &db_inc).unwrap();

        // Append the remaining 5 events.
        for i in 5..10 {
            let e = make_create_event(
                &format!("bn-{i:03x}"),
                &format!("Item {i}"),
                1000 + i64::from(i),
            );
            append_event(&shard_mgr, &e);
        }

        // Path A: full rebuild over all 10 events.
        let db_full = dir.path().join("full.db");
        rebuild::rebuild(&events_dir, &db_full).unwrap();

        // Path B: incrementally apply the 5 new events on top of the baseline.
        let report = incremental_apply(&events_dir, &db_inc, false).unwrap();
        assert!(
            !report.full_rebuild_triggered,
            "unexpected full rebuild: {:?}",
            report.full_rebuild_reason
        );
        assert_eq!(report.events_applied, 5);

        // Compare item counts
        let conn_full = open_projection(&db_full).unwrap();
        let conn_inc = open_projection(&db_inc).unwrap();

        let count_full: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        let count_inc: i64 = conn_inc
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count_full, count_inc);
        assert_eq!(count_full, 10);

        // Compare titles
        let titles_full: Vec<String> = {
            let mut stmt = conn_full
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        let titles_inc: Vec<String> = {
            let mut stmt = conn_inc
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        assert_eq!(titles_full, titles_inc);
    }

    #[test]
    fn schema_version_mismatch_triggers_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Tamper with the schema version
        {
            let conn = open_projection(&db_path).unwrap();
            conn.pragma_update(None, "user_version", 999_i64).unwrap();
        }

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("schema version"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    #[test]
    fn read_hwm_returns_none_for_fresh_db() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hwm = read_hwm(&conn).unwrap();
        assert!(hwm.is_none());
    }

    #[test]
    fn write_and_read_hwm_roundtrip() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hash = EventHash("blake3:abc123".into());
        write_hwm(&conn, &hash).unwrap();

        let retrieved = read_hwm(&conn).unwrap();
        assert_eq!(retrieved.unwrap(), hash);
    }

    #[test]
    fn check_incremental_safety_passes_valid_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        project::ensure_tracking_table(&conn).unwrap();
        let result = check_incremental_safety(&conn, &events_dir);
        assert!(result.is_ok(), "safety check failed: {result:?}");
    }

    #[test]
    fn check_incremental_safety_fails_schema_mismatch() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        conn.pragma_update(None, "user_version", 999_i64).unwrap();

        // events_dir doesn't matter for schema check
        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("schema version"));
    }

    #[test]
    fn check_incremental_safety_fails_missing_tracking_table() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        // Don't create the tracking table

        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("projected_events"));
    }

    #[test]
    fn validate_cursor_hash_finds_hash_near_offset() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(validate_cursor_hash(content, offset, "blake3:abc123"));
    }

    #[test]
    fn validate_cursor_hash_fails_wrong_hash() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(!validate_cursor_hash(content, offset, "blake3:zzz999"));
    }

    #[test]
    fn validate_cursor_hash_fails_zero_offset() {
        let content = "line1\tblake3:abc123\n";
        assert!(!validate_cursor_hash(content, 0, "blake3:abc123"));
    }
}
793}