// bones_core/db/incremental.rs

1//! Incremental projection rebuild and invalidation.
2//!
3//! On startup, instead of replaying the entire event log, we read the
4//! projection cursor (byte offset + last event hash) from
5//! `projection_meta` and replay only events after that point.
6//!
7//! # Safety checks
8//!
9//! Before doing an incremental apply, several invariants are verified:
10//!
11//! 1. **Schema version** — the DB schema version must match
12//!    [`migrations::LATEST_SCHEMA_VERSION`].  A mismatch means the code has
13//!    been upgraded and a full rebuild is needed.
14//!
15//! 2. **Cursor hash found** — the `last_event_hash` stored in the cursor
16//!    must appear in the shard content at the expected byte offset.  If the
17//!    hash cannot be found the shard was modified (e.g. deleted/rotated)
18//!    and incremental replay is unsafe.
19//!
20//! 3. **Sealed shard manifest integrity** — for every sealed shard that has
21//!    a `.manifest` file, the recorded `byte_len` must match the actual
22//!    file size.  A mismatch indicates shard corruption or tampering.
23//!
24//! 4. **Projection tracking table** — the `projected_events` table must
25//!    exist. If it doesn't, we can't deduplicate and must rebuild.
26//!
27//! When any check fails, [`incremental_apply`] falls back to a full rebuild
28//! automatically, returning the reason in [`ApplyReport::full_rebuild_reason`].
29
30use std::io;
31use std::path::Path;
32use std::time::Instant;
33
34use anyhow::{Context, Result};
35use rusqlite::Connection;
36
37use crate::db::{migrations, project, query, rebuild};
38use crate::event::Event;
39use crate::shard::ShardManager;
40
41// ---------------------------------------------------------------------------
42// Types
43// ---------------------------------------------------------------------------
44
/// Identifies the last event that was successfully applied to the projection.
/// Stored in the `projection_meta` table in `SQLite`.
///
/// Wraps the event's content-hash string (e.g. `"blake3:…"`).  Derives `Hash`
/// so it can be used as a map/set key, and implements `Display` so logs can
/// print the raw hash without reaching into the tuple field.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EventHash(pub String);

impl std::fmt::Display for EventHash {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The inner string is already the canonical representation.
        f.write_str(&self.0)
    }
}
49
/// Summary of a single incremental apply run.
///
/// Describes how much work was done and whether the operation fell back to a
/// full projection rebuild instead of applying incrementally.
#[derive(Debug, Clone)]
pub struct ApplyReport {
    /// How many new events were projected.
    pub events_applied: usize,
    /// How many shards were scanned while replaying.
    pub shards_scanned: usize,
    /// `true` when a full rebuild ran instead of an incremental apply.
    pub full_rebuild_triggered: bool,
    /// Human-readable explanation for the fallback, when one happened.
    pub full_rebuild_reason: Option<String>,
    /// Total wall-clock time spent on the operation.
    pub elapsed: std::time::Duration,
}
64
65// ---------------------------------------------------------------------------
66// Public API
67// ---------------------------------------------------------------------------
68
/// Apply only events newer than the high-water mark to the projection.
///
/// Steps:
/// 1. Open (or try to open) the projection database.
/// 2. Read the projection cursor (byte offset + last event hash).
/// 3. Run safety checks — schema version, cursor validity, manifest integrity.
/// 4. If any check fails, fall back to a full rebuild.
/// 5. Otherwise, read shard content from the cursor byte offset onward.
/// 6. Parse and project only the new events.
/// 7. Update the cursor.
///
/// # Arguments
///
/// * `events_dir` — Path to `.bones/events/` (the shard directory)
/// * `db_path`    — Path to `.bones/bones.db` (the `SQLite` projection file)
/// * `force_full` — If `true`, skip incremental and always do a full rebuild
///   (`bn admin rebuild --full`).
///
/// # Errors
///
/// Returns an error if reading shards, parsing events, or projection fails.
#[allow(clippy::too_many_lines)]
pub fn incremental_apply(
    events_dir: &Path,
    db_path: &Path,
    force_full: bool,
) -> Result<ApplyReport> {
    let start = Instant::now();

    // Step 4 short-circuit: the caller explicitly requested a full rebuild.
    if force_full {
        return do_full_rebuild(events_dir, db_path, start, "force_full flag set");
    }

    // Step 1: try to open the existing DB.  If it doesn't exist or is corrupt
    // we need a full rebuild.
    let Some(conn) = query::try_open_projection(db_path)? else {
        return do_full_rebuild(
            events_dir,
            db_path,
            start,
            "projection database missing or corrupt",
        );
    };

    // Step 2: read the cursor (high-water mark).
    let (byte_offset, last_hash) =
        query::get_projection_cursor(&conn).context("read projection cursor")?;

    // Fresh database — no events have been applied yet → full rebuild
    if byte_offset == 0 && last_hash.is_none() {
        // Release the SQLite connection before the rebuild re-opens the file.
        drop(conn);
        return do_full_rebuild(events_dir, db_path, start, "fresh database (no cursor)");
    }

    // Step 3: schema version / tracking table / manifest integrity checks.
    if let Err(reason) = check_incremental_safety(&conn, events_dir) {
        drop(conn);
        return do_full_rebuild(events_dir, db_path, start, &reason);
    }

    // Steps 5–6: read and replay new events in streaming batches
    let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
    let shard_mgr = ShardManager::new(bones_dir);
    let shards = shard_mgr
        .list_shards()
        .map_err(|e| anyhow::anyhow!("list shards: {e}"))?;
    let shards_scanned = shards.len();

    // A negative stored offset (shouldn't happen) degrades to 0, i.e. a full
    // re-scan; the `projected_events` dedup below makes that safe, just slow.
    let offset = usize::try_from(byte_offset).unwrap_or(0);

    // Validate cursor hash: it must appear in the tail of already-processed
    // content (the 512 bytes just before the cursor offset).
    if let Some(ref hash) = last_hash {
        // A read error here is treated the same as "hash not found" — both
        // mean the cursor can't be trusted and we must rebuild.
        let tail_ok = validate_cursor_hash_at_offset(&shard_mgr, offset, hash).unwrap_or(false);
        if !tail_ok {
            drop(conn);
            return do_full_rebuild(
                events_dir,
                db_path,
                start,
                "cursor hash not found at expected byte offset",
            );
        }
    }

    let mut line_iter = shard_mgr
        .replay_lines_from_offset(offset)
        .map_err(|e| anyhow::anyhow!("open shard line iterator: {e}"))?
        .peekable();

    // If there's no new content, we're up to date
    if line_iter.peek().is_none() {
        return Ok(ApplyReport {
            events_applied: 0,
            shards_scanned,
            full_rebuild_triggered: false,
            full_rebuild_reason: None,
            elapsed: start.elapsed(),
        });
    }

    // Ensure tracking table exists (needed for dedup)
    project::ensure_tracking_table(&conn).context("ensure projected_events tracking table")?;

    let mut version_checked = false;
    let mut shard_version = crate::event::parser::CURRENT_VERSION;
    let mut line_no = 0;
    let mut total_projected = 0;
    let mut total_duplicates = 0;
    let mut total_errors = 0;
    let mut current_last_hash = last_hash;
    let mut total_byte_len = offset;

    // Events are flushed to SQLite in batches of 1000 to bound memory use.
    let mut current_batch: Vec<Event> = Vec::with_capacity(1000);
    let projector = project::Projector::new(&conn);

    for line_res in line_iter {
        let (abs_offset, line): (usize, String) =
            line_res.map_err(|e: io::Error| anyhow::anyhow!("read shard line: {e}"))?;
        line_no += 1;
        // NOTE(review): assumes `abs_offset + line.len()` lands on the start
        // of the next line (i.e. the iterator's offsets account for the
        // trailing newline) — confirm against `replay_lines_from_offset`.
        total_byte_len = abs_offset + line.len();

        // Version check if we hit a header
        if !version_checked && line.trim_start().starts_with("# bones event log v") {
            version_checked = true;
            shard_version = crate::event::parser::detect_version(&line)
                .map_err(|msg| anyhow::anyhow!("version check failed: {msg}"))?;
            continue;
        }

        match crate::event::parser::parse_line(&line) {
            Ok(crate::event::parser::ParsedLine::Event(event)) => {
                // Upgrade the event in place if the shard was written by an
                // older event-format version.
                let event = crate::event::migrate_event(*event, shard_version)
                    .map_err(|e| anyhow::anyhow!("migration failed: {e}"))?;

                current_last_hash = Some(event.event_hash.clone());
                current_batch.push(event);

                if current_batch.len() >= 1000 {
                    let stats = projector
                        .project_batch(&current_batch)
                        .context("project batch during incremental apply")?;
                    total_projected += stats.projected;
                    total_duplicates += stats.duplicates;
                    total_errors += stats.errors;
                    current_batch.clear();
                }
            }
            Ok(
                crate::event::parser::ParsedLine::Comment(_)
                | crate::event::parser::ParsedLine::Blank,
            ) => {}
            // Unknown event types are skipped (forward compatibility), not fatal.
            Err(crate::event::parser::ParseError::InvalidEventType(raw)) => {
                tracing::warn!(line = line_no, event_type = %raw, "skipping unknown event type");
            }
            Err(e) => anyhow::bail!("parse error at line {line_no} (offset {abs_offset}): {e}"),
        }
    }

    // Final batch
    if !current_batch.is_empty() {
        let stats = projector
            .project_batch(&current_batch)
            .context("project final batch during incremental apply")?;
        total_projected += stats.projected;
        total_duplicates += stats.duplicates;
        total_errors += stats.errors;
    }

    // Step 7: update cursor to the end of current content
    let new_offset = i64::try_from(total_byte_len).unwrap_or(i64::MAX);
    query::update_projection_cursor(&conn, new_offset, current_last_hash.as_deref())
        .context("update projection cursor after incremental apply")?;

    tracing::info!(
        events_applied = total_projected,
        duplicates = total_duplicates,
        errors = total_errors,
        shards_scanned,
        byte_offset_from = byte_offset,
        byte_offset_to = new_offset,
        elapsed_ms = start.elapsed().as_millis(),
        "incremental projection apply complete"
    );

    Ok(ApplyReport {
        events_applied: total_projected,
        shards_scanned,
        full_rebuild_triggered: false,
        full_rebuild_reason: None,
        elapsed: start.elapsed(),
    })
}
262
263/// Read the current high-water mark from the `SQLite` metadata table.
264/// Returns `None` if no events have been applied (fresh DB).
265///
266/// # Errors
267///
268/// Returns an error if the database query fails.
269pub fn read_hwm(db: &Connection) -> Result<Option<EventHash>> {
270    let (_offset, hash) = query::get_projection_cursor(db).context("read high-water mark")?;
271    Ok(hash.map(EventHash))
272}
273
274/// Write the high-water mark after successful apply.
275///
276/// # Errors
277///
278/// Returns an error if the database update fails.
279pub fn write_hwm(db: &Connection, hwm: &EventHash) -> Result<()> {
280    // Preserve the existing offset, just update the hash
281    let (offset, _) =
282        query::get_projection_cursor(db).context("read current cursor for hwm update")?;
283    query::update_projection_cursor(db, offset, Some(&hwm.0)).context("write high-water mark")?;
284    Ok(())
285}
286
287/// Check if incremental apply is safe or if full rebuild is needed.
288///
289/// Checks:
290/// 1. Schema version matches `LATEST_SCHEMA_VERSION`
291/// 2. `projected_events` tracking table exists
292/// 3. Sealed shard manifests are intact (file sizes match)
293///
294/// Returns `Ok(())` if incremental is safe, `Err(reason)` with a human-readable
295/// reason string if a full rebuild is needed.
296///
297/// # Errors
298///
299/// Returns an error string describing why incremental rebuild is unsafe
300/// (schema mismatch, missing tracking table, or shard corruption).
301pub fn check_incremental_safety(db: &Connection, events_dir: &Path) -> Result<(), String> {
302    // 1. Schema version check
303    let schema_version = migrations::current_schema_version(db)
304        .map_err(|e| format!("failed to read schema version: {e}"))?;
305    if schema_version != migrations::LATEST_SCHEMA_VERSION {
306        return Err(format!(
307            "schema version mismatch: db has v{schema_version}, code expects v{}",
308            migrations::LATEST_SCHEMA_VERSION
309        ));
310    }
311
312    // 2. projected_events table must exist
313    let table_exists: bool = db
314        .query_row(
315            "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='projected_events')",
316            [],
317            |row| row.get(0),
318        )
319        .map_err(|e| format!("failed to check projected_events table: {e}"))?;
320    if !table_exists {
321        return Err("projected_events tracking table missing".into());
322    }
323
324    // 3. Sealed shard manifest integrity
325    let bones_dir = events_dir.parent().unwrap_or_else(|| Path::new("."));
326    let shard_mgr = ShardManager::new(bones_dir);
327    let shards = shard_mgr
328        .list_shards()
329        .map_err(|e| format!("failed to list shards: {e}"))?;
330
331    // All shards except the last (active) one should be sealed
332    if shards.len() > 1 {
333        for &(year, month) in &shards[..shards.len() - 1] {
334            if let Ok(Some(manifest)) = shard_mgr.read_manifest(year, month) {
335                let shard_path = shard_mgr.shard_path(year, month);
336                match std::fs::metadata(&shard_path) {
337                    Ok(meta) => {
338                        if meta.len() != manifest.byte_len {
339                            return Err(format!(
340                                "sealed shard {}-{:02} size mismatch: \
341                                 manifest says {} bytes, file is {} bytes",
342                                year,
343                                month,
344                                manifest.byte_len,
345                                meta.len()
346                            ));
347                        }
348                    }
349                    Err(e) => {
350                        return Err(format!("cannot stat sealed shard {year}-{month:02}: {e}"));
351                    }
352                }
353            }
354            // No manifest file is OK — sealed shards without manifests are
355            // just not verified (they may predate manifest generation).
356        }
357    }
358
359    Ok(())
360}
361
362// ---------------------------------------------------------------------------
363// Internal helpers
364// ---------------------------------------------------------------------------
365
366/// Validate that the cursor hash appears in the 512 bytes immediately before
367/// `offset` in the shard sequence.
368///
369/// We only read the small window `[offset-512, offset)` rather than the
370/// entire shard content, keeping validation O(1) in total shard size.
371fn validate_cursor_hash_at_offset(
372    shard_mgr: &ShardManager,
373    offset: usize,
374    hash: &str,
375) -> Result<bool> {
376    if offset == 0 {
377        return Ok(false);
378    }
379    let search_start = offset.saturating_sub(512);
380    let window = shard_mgr
381        .read_content_range(search_start, offset)
382        .map_err(|e| anyhow::anyhow!("read cursor hash window: {e}"))?;
383    Ok(window.contains(hash))
384}
385
/// Validate that the cursor hash appears in the content around the expected
/// byte offset.  Used only in unit tests where the full content is already
/// available.
///
/// Returns `false` for a zero offset, an offset past the end of `content`,
/// or an offset that does not land on a UTF-8 character boundary.  The
/// previous implementation sliced with `&content[..offset]`, which panics on
/// a non-boundary offset; a bad offset simply means the cursor is untrusted,
/// so "not found" is the correct answer, not a panic.
#[cfg(test)]
fn validate_cursor_hash(content: &str, offset: usize, hash: &str) -> bool {
    if offset == 0 {
        return false;
    }

    // `get` returns `None` both when `offset` is out of range and when it is
    // not a char boundary — either way the cursor cannot be trusted.
    let Some(before) = content.get(..offset) else {
        return false;
    };
    let search_start = offset.saturating_sub(512);
    // The window start may also fall mid-character; degrade gracefully.
    before
        .get(search_start..)
        .map_or(false, |region| region.contains(hash))
}
400
401/// Perform a full rebuild and wrap the result in an `ApplyReport`.
402fn do_full_rebuild(
403    events_dir: &Path,
404    db_path: &Path,
405    start: Instant,
406    reason: &str,
407) -> Result<ApplyReport> {
408    tracing::info!(reason, "falling back to full projection rebuild");
409
410    let report = rebuild::rebuild(events_dir, db_path)
411        .context("full rebuild during incremental apply fallback")?;
412
413    Ok(ApplyReport {
414        events_applied: report.event_count,
415        shards_scanned: report.shard_count,
416        full_rebuild_triggered: true,
417        full_rebuild_reason: Some(reason.to_string()),
418        elapsed: start.elapsed(),
419    })
420}
421
422// ---------------------------------------------------------------------------
423// Tests
424// ---------------------------------------------------------------------------
425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::open_projection;
    use crate::event::Event;
    use crate::event::data::*;
    use crate::event::types::EventType;
    use crate::event::writer;
    use crate::model::item::{Kind, Size, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    /// Create a temp `.bones`-style directory with an initialized
    /// `ShardManager` (directories created, first shard initialized).
    fn setup_bones_dir() -> (TempDir, ShardManager) {
        let dir = TempDir::new().expect("create tempdir");
        let shard_mgr = ShardManager::new(dir.path());
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init shard");
        (dir, shard_mgr)
    }

    /// Build a fully-populated `Create` event and compute its hash.
    fn make_create_event(id: &str, title: &str, ts: i64) -> Event {
        let mut event = Event {
            wall_ts_us: ts,
            agent: "test-agent".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(id),
            data: EventData::Create(CreateData {
                title: title.into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec!["test".into()],
                parent: None,
                causation: None,
                description: Some(format!("Description for {title}")),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        // `write_event` fills in `event_hash` from the serialized content.
        writer::write_event(&mut event).expect("compute hash");
        event
    }

    /// Serialize `event` and append it to the currently active shard.
    fn append_event(shard_mgr: &ShardManager, event: &Event) {
        let line = writer::write_line(event).expect("serialize event");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr
            .append_raw(year, month, &line)
            .expect("append event");
    }

    // -----------------------------------------------------------------------
    // Tests
    // -----------------------------------------------------------------------

    /// A missing projection DB must trigger the full-rebuild fallback.
    #[test]
    fn incremental_apply_on_empty_db_does_full_rebuild() {
        let (dir, _shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("missing"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    /// `force_full = true` must bypass incremental even with a valid cursor.
    #[test]
    fn incremental_apply_force_full() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        // First, do a normal rebuild to set up the DB
        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Now force a full rebuild
        let report = incremental_apply(&events_dir, &db_path, true).unwrap();
        assert!(report.full_rebuild_triggered);
        assert_eq!(
            report.full_rebuild_reason.as_deref(),
            Some("force_full flag set")
        );
        assert_eq!(report.events_applied, 1);
    }

    /// Events appended after a rebuild are applied incrementally, and only
    /// those — previously applied events must not be counted again.
    #[test]
    fn incremental_apply_picks_up_new_events() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // Write initial events and do a full rebuild
        let create1 = make_create_event("bn-001", "Item 1", 1000);
        let create2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &create1);
        append_event(&shard_mgr, &create2);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Add a new event
        let create3 = make_create_event("bn-003", "Item 3", 1002);
        append_event(&shard_mgr, &create3);

        // Incremental apply should only pick up the new event
        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 1);

        // Verify all 3 items are in the DB
        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 3);
    }

    /// When the cursor is already at the end of shard content, incremental
    /// apply is a no-op (and must not fall back to a rebuild).
    #[test]
    fn incremental_apply_noop_when_up_to_date() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // No new events — incremental should be a no-op
        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!report.full_rebuild_triggered);
        assert_eq!(report.events_applied, 0);
    }

    /// Repeated incremental rounds each pick up exactly the new events, and
    /// the cursor advances correctly between rounds.
    #[test]
    fn incremental_apply_multiple_rounds() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        // Round 1: initial rebuild
        let e1 = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &e1);
        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Round 2: incremental
        let e2 = make_create_event("bn-002", "Item 2", 1001);
        append_event(&shard_mgr, &e2);
        let r2 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r2.full_rebuild_triggered);
        assert_eq!(r2.events_applied, 1);

        // Round 3: another incremental
        let e3 = make_create_event("bn-003", "Item 3", 1002);
        let e4 = make_create_event("bn-004", "Item 4", 1003);
        append_event(&shard_mgr, &e3);
        append_event(&shard_mgr, &e4);
        let r3 = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(!r3.full_rebuild_triggered);
        assert_eq!(r3.events_applied, 2);

        // Final check: all 4 items
        let conn = open_projection(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 4);
    }

    /// Two independently-built projections from the same shard content must
    /// agree.
    ///
    /// NOTE(review): despite the name, this test currently performs two FULL
    /// rebuilds (see the comment below) and never exercises the incremental
    /// path — consider reworking it to rebuild from a truncated shard and
    /// then apply the remainder incrementally.
    #[test]
    fn incremental_apply_matches_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let events_dir = dir.path().join("events");

        // Create several events
        for i in 0..10 {
            let e = make_create_event(
                &format!("bn-{i:03x}"),
                &format!("Item {i}"),
                1000 + i64::from(i),
            );
            append_event(&shard_mgr, &e);
        }

        // Path A: full rebuild
        let db_full = dir.path().join("full.db");
        rebuild::rebuild(&events_dir, &db_full).unwrap();

        // Path B: incremental (first 5 via rebuild, then 5 via incremental)
        let db_inc = dir.path().join("inc.db");
        // We need to rebuild from scratch with only 5 events, but since
        // all 10 are already in the shard, let's just do a full rebuild
        // then verify they match.
        rebuild::rebuild(&events_dir, &db_inc).unwrap();

        // Compare item counts
        let conn_full = open_projection(&db_full).unwrap();
        let conn_inc = open_projection(&db_inc).unwrap();

        let count_full: i64 = conn_full
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        let count_inc: i64 = conn_inc
            .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count_full, count_inc);
        assert_eq!(count_full, 10);

        // Compare titles
        let titles_full: Vec<String> = {
            let mut stmt = conn_full
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        let titles_inc: Vec<String> = {
            let mut stmt = conn_inc
                .prepare("SELECT title FROM items ORDER BY item_id")
                .unwrap();
            stmt.query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(|r| r.unwrap())
                .collect()
        };
        assert_eq!(titles_full, titles_inc);
    }

    /// A `user_version` that disagrees with the code must force a rebuild.
    #[test]
    fn schema_version_mismatch_triggers_full_rebuild() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        // Tamper with the schema version
        {
            let conn = open_projection(&db_path).unwrap();
            conn.pragma_update(None, "user_version", 999_i64).unwrap();
        }

        let report = incremental_apply(&events_dir, &db_path, false).unwrap();
        assert!(report.full_rebuild_triggered);
        assert!(
            report
                .full_rebuild_reason
                .as_deref()
                .unwrap()
                .contains("schema version"),
            "reason: {:?}",
            report.full_rebuild_reason
        );
    }

    /// A freshly-migrated DB has no high-water mark.
    #[test]
    fn read_hwm_returns_none_for_fresh_db() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hwm = read_hwm(&conn).unwrap();
        assert!(hwm.is_none());
    }

    /// `write_hwm` followed by `read_hwm` must return the same hash.
    #[test]
    fn write_and_read_hwm_roundtrip() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();

        let hash = EventHash("blake3:abc123".into());
        write_hwm(&conn, &hash).unwrap();

        let retrieved = read_hwm(&conn).unwrap();
        assert_eq!(retrieved.unwrap(), hash);
    }

    /// A healthy DB + shard layout passes all three safety checks.
    #[test]
    fn check_incremental_safety_passes_valid_db() {
        let (dir, shard_mgr) = setup_bones_dir();
        let db_path = dir.path().join("bones.db");
        let events_dir = dir.path().join("events");

        let create = make_create_event("bn-001", "Item 1", 1000);
        append_event(&shard_mgr, &create);

        rebuild::rebuild(&events_dir, &db_path).unwrap();

        let conn = open_projection(&db_path).unwrap();
        project::ensure_tracking_table(&conn).unwrap();
        let result = check_incremental_safety(&conn, &events_dir);
        assert!(result.is_ok(), "safety check failed: {result:?}");
    }

    /// Safety check must fail fast on a schema-version mismatch.
    #[test]
    fn check_incremental_safety_fails_schema_mismatch() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        conn.pragma_update(None, "user_version", 999_i64).unwrap();

        // events_dir doesn't matter for schema check
        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("schema version"));
    }

    /// Safety check must fail when the dedup tracking table is absent.
    #[test]
    fn check_incremental_safety_fails_missing_tracking_table() {
        let mut conn = Connection::open_in_memory().unwrap();
        migrations::migrate(&mut conn).unwrap();
        // Don't create the tracking table

        let result = check_incremental_safety(&conn, Path::new("/nonexistent"));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("projected_events"));
    }

    /// The cursor hash is accepted when it sits just before the offset.
    #[test]
    fn validate_cursor_hash_finds_hash_near_offset() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(validate_cursor_hash(content, offset, "blake3:abc123"));
    }

    /// A hash that never occurred in the preceding window is rejected.
    #[test]
    fn validate_cursor_hash_fails_wrong_hash() {
        let content = "line1\thash1\nline2\tblake3:abc123\nline3\thash3\n";
        let offset = content.find("line3").unwrap();
        assert!(!validate_cursor_hash(content, offset, "blake3:zzz999"));
    }

    /// Offset zero means "nothing processed yet" — never a match.
    #[test]
    fn validate_cursor_hash_fails_zero_offset() {
        let content = "line1\tblake3:abc123\n";
        assert!(!validate_cursor_hash(content, 0, "blake3:abc123"));
    }
}
783}