Skip to main content

ckg_storage/store/
lifecycle.rs

1//! DB lifecycle: open, schema-version check, rebuild, path-swap, debris sweep.
2//!
3//! Contains `Storage::open_at`, `open_unverified`, `open_inner`, plus all
4//! the supporting free functions for the build-then-swap rebuild pattern and
5//! `root_path` collision-detection (I2).
6
7use std::collections::BTreeMap;
8use std::path::{Path, PathBuf};
9
10use ckg_core::{Error, RepoId, Result};
11use cozo::{DataValue, DbInstance, ScriptMutability};
12
13use crate::schema::PER_REPO_DDL;
14
15use super::meta::{read_meta_bool, stamp_meta_bool, stamp_needs_reindex};
16use super::{Storage, map_err};
17
/// Cozo engine label for the `storage-new-rocksdb` feature.
///
/// Passed as the engine argument to every `DbInstance::new` call in this
/// module (normal open, rebuild staging, and post-swap reopen), so all three
/// always address the same storage backend.
pub(super) const ENGINE: &str = "newrocksdb";
20
/// Result of probing Meta for the recorded `root_path`.
///
/// **NI1 fix**: the original implementation used `.ok()?` which collapsed
/// every error class — Meta missing (legit fresh DB), Cozo IO failure,
/// transient lock — into `None`. Three explicit states let `open_inner`
/// distinguish "no row → stamp it" from "could not read → refuse open".
///
/// Produced by `read_root_path`; consumed by the I2 root-path verification
/// in `Storage::open_inner`.
#[derive(Debug)]
pub(super) enum RootPathProbe {
    /// Meta has a `root_path` row with this value.
    Recorded(String),
    /// Query succeeded with zero rows — fresh DB or pre-I2 DB. Safe to stamp.
    NoRow,
    /// Query itself failed, or the row held a non-string value. Hard error:
    /// do NOT stamp. The payload is the human-readable failure description.
    ReadFailed(String),
}
36
37fn read_root_path(db: &DbInstance) -> RootPathProbe {
38    match db.run_script(
39        "?[v] := *Meta{key: \"root_path\", value: v}",
40        BTreeMap::new(),
41        ScriptMutability::Immutable,
42    ) {
43        Ok(rows) => match rows.rows.first().and_then(|r| r.first()) {
44            Some(DataValue::Str(s)) => RootPathProbe::Recorded(s.to_string()),
45            // No row at all — fresh DB or pre-I2 DB. Safe to stamp.
46            None => RootPathProbe::NoRow,
47            // Row exists but value is not a string (e.g. Null or Int from a
48            // corrupted write). Do NOT treat this as NoRow — that would
49            // silently overwrite corrupted data and mask the problem.
50            Some(other) => RootPathProbe::ReadFailed(format!(
51                "non-string root_path in Meta: {:?}",
52                other
53            )),
54        },
55        Err(e) => RootPathProbe::ReadFailed(e.to_string()),
56    }
57}
58
59/// Write the `schema_version` row into `Meta` via parameter binding so a
60/// non-numeric / quote-bearing version string cannot escape the literal.
61fn stamp_schema_version(db: &DbInstance, schema_version: &str) -> Result<()> {
62    let mut params = BTreeMap::new();
63    params.insert("v".into(), DataValue::from(schema_version));
64    db.run_script(
65        "?[key, value] <- [[\"schema_version\", $v]] :put Meta {key => value}",
66        params,
67        ScriptMutability::Mutable,
68    )
69    .map_err(map_err)?;
70    Ok(())
71}
72
73/// Stamp the canonical `root_path` for this repo into `Meta` (I2).
74fn stamp_root_path(db: &DbInstance, root_path: &str) -> Result<()> {
75    let mut params = BTreeMap::new();
76    params.insert("v".into(), DataValue::from(root_path));
77    db.run_script(
78        "?[key, value] <- [[\"root_path\", $v]] :put Meta {key => value}",
79        params,
80        ScriptMutability::Mutable,
81    )
82    .map_err(map_err)?;
83    Ok(())
84}
85
/// CR-M-7: reports whether two path-shaped strings name the same location
/// once both have been run through `Path::canonicalize`.
///
/// This is what lets a recorded path and a caller path that differ only by a
/// symlink alias (macOS `/var ↔ /private/var`, Linux bind mounts) be treated
/// as the same repo root.
///
/// Fail-closed: if either side cannot be canonicalized (missing path, broken
/// symlink, …) the answer is `false`, because equivalence cannot be proven and
/// the caller's collision error should fire. Two genuinely different repos
/// canonicalize to different absolutes and also yield `false`.
fn paths_canonicalize_equal(a: &str, b: &str) -> bool {
    // M10: each side is resolved at most once — canonicalize is a syscall —
    // and `b` is not touched at all when `a` already failed.
    let resolve = |raw: &str| Path::new(raw).canonicalize().ok();
    resolve(a)
        .and_then(|left| resolve(b).map(|right| left == right))
        .unwrap_or(false)
}
105
/// CR-review-#3: yields the canonical absolute form of `path` as a string,
/// or `path` unchanged when canonicalization fails.
///
/// The M-7 auto-migrate uses this to record one STABLE spelling no matter
/// which alias the caller supplied; otherwise two callers using different
/// symlink shapes would keep rewriting the recorded value on alternating
/// opens.
fn canonical_or_input(path: &str) -> String {
    match Path::new(path).canonicalize() {
        Ok(resolved) => resolved.display().to_string(),
        // Could not resolve: keep exactly what the caller gave us.
        Err(_) => path.to_string(),
    }
}
119
120/// Run a script, swallowing "relation already exists" errors so DDL is
121/// idempotent across re-opens.
122pub(super) fn run_idempotent(db: &DbInstance, script: &str) -> Result<()> {
123    use crate::cozo_compat::CozoErrorKind;
124    match db.run_script(script, BTreeMap::new(), ScriptMutability::Mutable) {
125        Ok(_) => Ok(()),
126        Err(e) => {
127            let mapped = map_err(e);
128            // F16+: classify the error shape via the typed enum. When
129            // Cozo eventually exposes typed errors upstream, only the
130            // body of `CozoErrorKind::of` changes; this match keeps
131            // working unchanged. The legacy "conflicts with an existing
132            // one" wording is handled inside `Other` via an explicit
133            // substring check so it doesn't drift.
134            match CozoErrorKind::of(&mapped) {
135                CozoErrorKind::RelationAlreadyExists | CozoErrorKind::RelationConflict => Ok(()),
136                CozoErrorKind::Other
137                    if mapped.to_string().contains("conflicts with an existing one") =>
138                {
139                    Ok(())
140                }
141                _ => Err(mapped),
142            }
143        }
144    }
145}
146
147/// Schema-mismatch rebuild using a build-then-swap pattern:
148///   1. Build a fresh DB at `<path>.new` (DDL + schema-stamp). On failure, old DB untouched.
149///   2. Drop the new handle. Sweep stale `<path>.old`.
150///   3. `rename(path → path.old)` + `rename(path.new → path)` — two atomic renames.
151///   4. Reopen at `path`. Cleanup `path.old`.
152fn rebuild_at_path(path: &Path, schema_version: &str) -> Result<DbInstance> {
153    let parent = path.parent().ok_or_else(|| {
154        Error::Storage(format!("rebuild target has no parent: {}", path.display()))
155    })?;
156    let file = path.file_name().and_then(|s| s.to_str()).ok_or_else(|| {
157        Error::Storage(format!(
158            "rebuild target has no file name: {}",
159            path.display()
160        ))
161    })?;
162    let new_path = parent.join(format!("{file}.new"));
163    let old_path = parent.join(format!("{file}.old"));
164
165    // Sweep any debris from a prior crashed rebuild before claiming names.
166    sweep_rebuild_debris(path);
167
168    // 1. Build the fresh DB at <path>.new. Original is untouched on error.
169    std::fs::create_dir_all(&new_path)?;
170    let new_db = DbInstance::new(ENGINE, &new_path, "{}").map_err(map_err)?;
171    for stmt in PER_REPO_DDL {
172        run_idempotent(&new_db, stmt)?;
173    }
174    stamp_schema_version(&new_db, schema_version)?;
175    // Drop the handle so the rename can move the directory cleanly.
176    drop(new_db);
177
178    // 2. Swap directories (atomic on Linux, two-rename on others).
179    swap_directories(path, &new_path, &old_path)?;
180
181    // 3. Reopen at the canonical path.
182    let installed = DbInstance::new(ENGINE, path, "{}").map_err(map_err)?;
183
184    // 4. Cleanup the old DB — best-effort, log on failure.
185    if let Err(cleanup_err) = std::fs::remove_dir_all(&old_path) {
186        tracing::error!(
187            "rebuild succeeded but old-DB cleanup failed at {}: {cleanup_err}; \
188             will be swept on next rebuild",
189            old_path.display()
190        );
191    }
192
193    Ok(installed)
194}
195
196/// Linux: atomically swap `<path>` and `<new_path>` via
197/// `renameat2(RENAME_EXCHANGE)`, then rename the displaced original to
198/// `<old_path>` for cleanup. No missing-path window for concurrent readers.
199///
200/// Other platforms: fall back to two `std::fs::rename` calls. Brief window
201/// where `<path>` doesn't exist (~one syscall); concurrent readers get ENOENT.
202#[cfg(target_os = "linux")]
203fn swap_directories(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
204    use std::ffi::CString;
205    use std::os::unix::ffi::OsStrExt;
206
207    let c_path = CString::new(path.as_os_str().as_bytes())
208        .map_err(|e| Error::wrap("rebuild path has interior NUL", e))?;
209    let c_new = CString::new(new_path.as_os_str().as_bytes())
210        .map_err(|e| Error::wrap("rebuild path has interior NUL", e))?;
211    // SAFETY: passing C strings owned by `c_path` / `c_new`; AT_FDCWD matches
212    // std::fs semantics. Syscall returns 0 on success, -1 on error.
213    let rc = unsafe {
214        libc::renameat2(
215            libc::AT_FDCWD,
216            c_path.as_ptr(),
217            libc::AT_FDCWD,
218            c_new.as_ptr(),
219            libc::RENAME_EXCHANGE,
220        )
221    };
222    if rc != 0 {
223        // Fallback: kernel < 3.15 or filesystem doesn't support RENAME_EXCHANGE.
224        let errno = std::io::Error::last_os_error();
225        tracing::warn!(
226            "renameat2(RENAME_EXCHANGE) failed ({errno}); falling back to two-rename swap"
227        );
228        return swap_directories_two_rename(path, new_path, old_path);
229    }
230    // After EXCHANGE: `<path>` holds the new DB; `<new_path>` holds the old.
231    if let Err(rename_err) = std::fs::rename(new_path, old_path) {
232        tracing::error!(
233            "renameat2 swap succeeded but old-DB rename {} → {} failed: {rename_err}",
234            new_path.display(),
235            old_path.display()
236        );
237    }
238    Ok(())
239}
240
/// Non-Linux builds: there is no exchange-rename path here, so the swap is
/// delegated directly to the portable `swap_directories_two_rename`.
#[cfg(not(target_os = "linux"))]
fn swap_directories(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
    swap_directories_two_rename(path, new_path, old_path)
}
245
246/// Portable fallback: rename original out of the way, then move new in.
247/// Brief missing-path window between the two renames.
248fn swap_directories_two_rename(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
249    // M7: absolute paths only — relative paths with rename() are CWD-dependent.
250    debug_assert!(path.is_absolute(), "swap_directories: path must be absolute: {path:?}");
251    std::fs::rename(path, old_path)?;
252    if let Err(swap_err) = std::fs::rename(new_path, path) {
253        // Restore the original so the user isn't worse off.
254        if let Err(rb) = std::fs::rename(old_path, path) {
255            // CRITICAL: original is now orphaned at <old_path> and the
256            // canonical path is empty. Surface a distinct hard error so
257            // an operator can `mv <old_path> <path>` manually.
258            tracing::error!(
259                "rebuild swap failed AND rollback failed: could not restore {} → {}: {rb}",
260                old_path.display(),
261                path.display()
262            );
263            // STORAGE-C1: remove the orphaned .new directory so
264            // `sweep_rebuild_debris` on next open doesn't delete healthy
265            // DDL that an operator might want to inspect.
266            let _ = std::fs::remove_dir_all(new_path);
267            return Err(Error::Storage(format!(
268                "DB CORRUPTED: rebuild swap failed (swap_err: {swap_err}) and rollback also \
269                 failed (rb: {rb}). Original DB is at {} (manual recovery: \
270                 `mv {} {}`).",
271                old_path.display(),
272                old_path.display(),
273                path.display(),
274            )));
275        }
276        return Err(swap_err.into());
277    }
278    Ok(())
279}
280
281/// Sweep stale rebuild debris (`<path>.new` / `<path>.old`).
282///
283/// **Recovery preservation (R1):** if `<path>` itself doesn't exist or lacks
284/// a `CURRENT` file (i.e. is not a healthy RocksDB), `<path>.old` is the
285/// manual-recovery copy of a prior failed swap — leave it in place.
286/// We sweep `.new` unconditionally — that's always partial-build debris.
287///
288/// ## Concurrency (L2)
289///
290/// This function is called at `open_at()` entry, BEFORE acquiring the repo
291/// lock. Concurrent openers may race through sweep — that's benign because
292/// `remove_dir_all` of a non-existent path returns `NotFound` which we log
293/// and ignore. The worst case is a duplicated `tracing::warn` line.
294///
295/// ## Visibility (L9)
296///
297/// `pub(super)` — called from `open_at()` and `open_unverified()` in this
298/// module. Not exposed to downstream crates.
299pub(super) fn sweep_rebuild_debris(path: &Path) {
300    let parent = match path.parent() {
301        Some(p) => p,
302        None => return,
303    };
304    let file = match path.file_name().and_then(|s| s.to_str()) {
305        Some(f) => f,
306        None => return,
307    };
308    let new_path = parent.join(format!("{file}.new"));
309    if new_path.exists()
310        && let Err(e) = std::fs::remove_dir_all(&new_path)
311    {
312        tracing::warn!(
313            "could not sweep rebuild debris at {}: {e}",
314            new_path.display()
315        );
316    }
317    // A RocksDB directory is healthy when CURRENT exists (pointing at the
318    // active MANIFEST) AND at least one MANIFEST-* file is present. CURRENT
319    // alone can exist as an empty shell left by a partial init — requiring a
320    // MANIFEST-* file as well ensures we don't delete the .old recovery copy
321    // when the primary directory is only partially written.
322    // Conservative: when in doubt, do NOT delete .old.
323    let has_manifest_file = std::fs::read_dir(path)
324        .map(|mut dir| {
325            dir.any(|entry| {
326                entry
327                    .ok()
328                    .and_then(|e| e.file_name().into_string().ok())
329                    .map(|name| name.starts_with("MANIFEST-"))
330                    .unwrap_or(false)
331            })
332        })
333        .unwrap_or(false);
334    let path_has_healthy_db = path.join("CURRENT").is_file() && has_manifest_file;
335    if !path_has_healthy_db {
336        let old_path = parent.join(format!("{file}.old"));
337        if old_path.exists() {
338            tracing::warn!(
339                "preserving recovery dir at {} because {} is empty/missing — \
340                 manual recovery: `mv {} {}`",
341                old_path.display(),
342                path.display(),
343                old_path.display(),
344                path.display()
345            );
346        }
347        return;
348    }
349    let old_path = parent.join(format!("{file}.old"));
350    if old_path.exists()
351        && let Err(e) = std::fs::remove_dir_all(&old_path)
352    {
353        tracing::warn!(
354            "could not sweep rebuild debris at {}: {e}",
355            old_path.display()
356        );
357    }
358}
359
360impl Storage {
361    /// **Escape hatch** — open without recording / verifying a canonical
362    /// root_path. Use ONLY when the caller has no canonical root available
363    /// (tests, ad-hoc CLI exploration of a known `repo_id`, benchmarks).
364    pub fn open_unverified(repo_id: RepoId, base: &Path) -> Result<Self> {
365        Self::open_inner(repo_id, base, None, /*allow_rebuild=*/ true)
366    }
367
368    /// Open or create with canonical root-path verification (I2).
369    ///
370    /// On first init, the canonical path is stamped into Meta. Subsequent opens
371    /// read it back; if the recorded path doesn't match we **refuse** rather
372    /// than silently merge two repos that happen to hash to the same 96-bit RepoId.
373    ///
374    /// On schema-version mismatch, performs a destructive auto-rebuild. This is
375    /// intentional for the indexer path, which holds the repo lock. Readers
376    /// (query/resolve/MCP) should use `open_at_readonly` instead so they never
377    /// trigger a destructive rebuild concurrently with an ongoing `ckg index`.
378    pub fn open_at(repo_id: RepoId, root_path: &Path, base: &Path) -> Result<Self> {
379        // CR-review-#7: this is the caller's path, lossy-decoded — NOT
380        // canonicalized (the variable used to be called `canonical`
381        // which was misleading). The actual canonicalize happens
382        // inside `paths_canonicalize_equal` / `canonical_or_input`.
383        let caller_root = root_path.to_string_lossy().into_owned();
384        Self::open_inner(repo_id, base, Some(caller_root), /*allow_rebuild=*/ true)
385    }
386
387    /// Read-only open: like `open_at` but **never** triggers a destructive
388    /// schema-mismatch rebuild. On version mismatch, returns a clear error
389    /// directing the user to re-index rather than wiping the DB.
390    ///
391    /// Use this from all reader paths (query, resolve, MCP) so a reader
392    /// running concurrently with `ckg index` cannot destroy the indexer's
393    /// in-progress write.
394    pub fn open_at_readonly(repo_id: RepoId, root_path: &Path, base: &Path) -> Result<Self> {
395        let caller_root = root_path.to_string_lossy().into_owned();
396        Self::open_inner(repo_id, base, Some(caller_root), /*allow_rebuild=*/ false)
397    }
398
399    pub(super) fn open_inner(
400        repo_id: RepoId,
401        base: &Path,
402        root_path: Option<String>,
403        allow_rebuild: bool,
404    ) -> Result<Self> {
405        // CR-storage-H1: defense-in-depth at the FS boundary. RepoId's
406        // private field + try_new validator ensures only 24-hex strings
407        // reach here from production code, but a debug-mode assert
408        // catches accidental construction via raw struct expressions
409        // in tests / future internal code paths. Production is fail-
410        // closed too — Cozo would reject the path, but the error
411        // surface is opaque.
412        debug_assert!(
413            RepoId::is_valid(repo_id.as_str()),
414            "RepoId fed into open_inner is not 24-hex: {:?}",
415            repo_id.as_str()
416        );
417        let db_path: PathBuf = base.join("workspace_folders").join(repo_id.as_str());
418        sweep_rebuild_debris(&db_path);
419        // I1/A4: 0700 atomically at create — see ckg_fs::create_dir_secure.
420        ckg_fs::create_dir_secure(&db_path)
421            .map_err(|e| ckg_core::Error::Storage(format!("create repo db dir: {e}")))?;
422        let db = DbInstance::new(ENGINE, &db_path, "{}").map_err(map_err)?;
423        for stmt in PER_REPO_DDL {
424            run_idempotent(&db, stmt)?;
425        }
426
427        // Schema-version check: on mismatch, auto-rebuild.
428        // L7: use the pre-computed string constant to avoid repeated `.to_string()`.
429        let current = crate::schema::SCHEMA_VERSION_STR;
430        // Fail closed on read error: an IO or lock failure here could mask a
431        // real schema mismatch. Treating it as "fresh DB" would stamp the
432        // current schema version over an unknown on-disk state — potentially
433        // opening a DB whose schema we cannot safely interpret. Mirror the
434        // RootPathProbe::ReadFailed policy: refuse the open so the operator
435        // can inspect and recover explicitly.
436        let read = db.run_script(
437            "?[v] := *Meta{key: \"schema_version\", value: v}",
438            BTreeMap::new(),
439            ScriptMutability::Immutable,
440        ).map_err(|e| {
441            Error::Storage(format!(
442                "could not read schema_version from {} ({e}); \
443                 refusing to open under unknown schema state. \
444                 If this is a new DB, delete it and let ckg recreate it.",
445                db_path.display()
446            ))
447        })?;
448
449        let mismatch = read
450            .rows.first()
451            .and_then(|row| row.first())
452            .and_then(|v| match v {
453                DataValue::Str(s) if s.as_str() != current => Some(s.to_string()),
454                _ => None,
455            });
456        if let Some(stored) = mismatch {
457            // H4: parse().unwrap_or(0) would silently treat a corrupted
458            // non-numeric schema_version as 0, then trigger rebuild and
459            // destroy the DB. Hard-error instead so the operator can inspect.
460            let stored_v: u32 = stored.parse().map_err(|_| {
461                Error::Storage(format!(
462                    "schema version meta is corrupted at {}: \
463                     stored value {stored:?} is not a valid u32. \
464                     Manual recovery: inspect Meta{{key:\"schema_version\"}} \
465                     and either stamp the correct value or delete the DB.",
466                    db_path.display()
467                ))
468            })?;
469            // M9: refuse downgrade — stored schema is newer than this binary.
470            // Policy: if the binary is older than the on-disk schema, the
471            // operator must upgrade ckg. We deliberately do NOT offer an
472            // --allow-downgrade escape hatch — the on-disk format may have
473            // changed in ways an older binary cannot read safely, and a
474            // silent downgrade would corrupt the DB.
475            let current_v: u32 = current.parse().unwrap_or(0);
476            if stored_v > current_v {
477                return Err(map_err(format!(
478                    "DB at {} has schema version {stored} which is newer than \
479                     this binary's version {current}. Upgrade ckg to >= v{stored} \
480                     or delete the DB and re-index.",
481                    db_path.display()
482                )));
483            }
484            // Reader paths (query/resolve/MCP) must never trigger a destructive
485            // rebuild. They run concurrently with `ckg index` and hold no repo
486            // lock; wiping the DB mid-index would corrupt the indexer's write.
487            // Return a clear actionable error instead.
488            if !allow_rebuild {
489                return Err(Error::Storage(format!(
490                    "DB at {} has schema version v{stored} but this binary expects v{current}. \
491                     Run `ckg index --clean <repo>` to upgrade the schema. \
492                     (Read-only paths never auto-rebuild to avoid corrupting a concurrent index.)",
493                    db_path.display()
494                )));
495            }
496            // N1 (red-team #2): make it loud that "auto-rebuild" wipes
497            // the existing graph. `rebuild_at_path` builds an EMPTY DB
498            // and stamps `needs_reindex=true`; the user must re-run
499            // `ckg index` before any query returns useful data. The
500            // atomic-swap pattern below keeps the OLD DB intact until
501            // the new one is ready, so a crash mid-rebuild is safe,
502            // but the end state still loses every Symbol/edge row.
503            tracing::warn!(
504                "schema version mismatch (on-disk: v{stored}, binary: v{current}); \
505                 DESTRUCTIVE rebuild of {} — existing symbols/edges will be wiped. \
506                 Re-run `ckg index <repo>` afterwards. (The old DB stays intact \
507                 until the new empty DB is atomically swapped in.)",
508                db_path.display()
509            );
510            drop(db);
511            let new_db = rebuild_at_path(&db_path, &current)?;
512            if let Some(rp) = &root_path {
513                stamp_root_path(&new_db, rp)?;
514            }
515            stamp_needs_reindex(&new_db, true)?;
516            return Ok(Self {
517                repo_id,
518                db_path,
519                db: new_db,
520            });
521        }
522
523        // Stamp current schema version (idempotent — `:put` overwrites).
524        stamp_schema_version(&db, &current)?;
525
526        // I2: root_path verification.
527        // CR-M-7: tolerate symlink-equivalent paths. macOS `/var ↔
528        // /private/var` and Linux bind-mount aliases produced spurious
529        // "RepoId collision" errors when the caller passed one shape
530        // and the DB recorded the other. The fix: if a byte-compare
531        // mismatch occurs but BOTH sides canonicalize to the same
532        // absolute path, accept and re-stamp the recorded value with
533        // the canonical form (one-shot transparent migration). True
534        // collisions (two distinct repos that happen to hash to the
535        // same RepoId) still differ post-canonicalize and continue to
536        // fail closed.
537        if let Some(caller_root) = root_path {
538            match read_root_path(&db) {
539                RootPathProbe::Recorded(stored) if stored != caller_root => {
540                    if paths_canonicalize_equal(&stored, &caller_root) {
541                        // CR-review-#3: stamp the canonical absolute shape
542                        // (not the caller's), so any future caller passing
543                        // an alias canonicalizes to the same recorded
544                        // value and triggers no further migration writes.
545                        // Prevents flap under concurrent multi-shape opens.
546                        let canon = canonical_or_input(&caller_root);
547                        // Guard against a symlink swap: even when two paths
548                        // canonicalize to the same directory, that directory
549                        // must belong to the same repo. A swapped symlink
550                        // pointing at a different real repo would produce a
551                        // different RepoId — reject to fail closed rather
552                        // than accepting an impostor. Only check when
553                        // canonicalization actually succeeds (i.e. canon
554                        // differs from the raw input, meaning canonicalize
555                        // resolved something).
556                        let canon_path = std::path::Path::new(&canon);
557                        if let Ok(derived) = ckg_core::RepoId::from_path(canon_path) {
558                            if derived != repo_id {
559                                return Err(Error::Storage(format!(
560                                    "symlink-swap detected at {}: paths {stored:?} and \
561                                     {caller_root:?} canonicalize to the same directory \
562                                     ({canon:?}) but that directory's RepoId ({}) does not \
563                                     match the DB's RepoId ({}). Refusing open.",
564                                    db_path.display(),
565                                    derived.as_str(),
566                                    repo_id.as_str()
567                                )));
568                            }
569                        }
570                        if stored != canon {
571                            tracing::warn!(
572                                "{}: recorded root_path {stored:?} differs from caller \
573                                 {caller_root:?} byte-for-byte but canonicalizes equal — \
574                                 re-stamping with canonical shape {canon:?} (CR-M-7 auto-migrate)",
575                                db_path.display()
576                            );
577                            stamp_root_path(&db, &canon)?;
578                        }
579                        // Note: registry's `Repo.root_path` (stamped by
580                        // put_repo at index time) is NOT updated here —
581                        // it's the manifest-key authority and changing
582                        // it could break ckg-resolve lookup. The Meta-vs-
583                        // registry asymmetry is intentional; callers
584                        // matching by registry shape will hit auto-
585                        // migrate exactly once per (alias, repo) pair.
586                    } else {
587                        return Err(Error::Storage(format!(
588                            "RepoId collision detected at {}: DB recorded root_path \
589                             {stored:?} but caller passed {caller_root:?}. Refusing to \
590                             silently merge two repos. Recover by removing the per-repo \
591                             DB and re-indexing the correct repo.",
592                            db_path.display()
593                        )));
594                    }
595                }
596                RootPathProbe::Recorded(_) => { /* matches: confirmed identity */ }
597                RootPathProbe::NoRow => {
598                    stamp_root_path(&db, &caller_root)?;
599                }
600                RootPathProbe::ReadFailed(e) => {
601                    return Err(Error::Storage(format!(
602                        "could not verify recorded root_path at {} (Meta read failed: {e}); \
603                         refusing to open under collision-detection contract. Re-run with \
604                         `Storage::open_unverified` if you intend to bypass I2 protection.",
605                        db_path.display()
606                    )));
607                }
608            }
609        }
610
611        // CR-I-2: Crash-recovery probe. If `index_in_progress` was set but
612        // never cleared, promote `needs_reindex=true` and clear the flag.
613        if read_meta_bool(&db, "index_in_progress") {
614            tracing::warn!(
615                "{}: previous index run did not complete cleanly — \
616                 stamping needs_reindex=true. The :put indexer upserts \
617                 rows by id, so plain `ckg index` would NOT evict \
618                 orphan rows from the partial run. Re-run \
619                 `ckg index --clean` to repopulate from a clean state.",
620                db_path.display()
621            );
622            stamp_needs_reindex(&db, true)?;
623            stamp_meta_bool(&db, "index_in_progress", false)?;
624        }
625        Ok(Self {
626            repo_id,
627            db_path,
628            db,
629        })
630    }
631}
632
#[cfg(test)]
mod tests {
    use super::*;
    use ckg_core::RepoId;
    use tempfile::tempdir;

    /// H3: a DB stamped with SCHEMA_VERSION + 1 must not open.
    /// Policy: forced upgrade — there is no --allow-downgrade escape hatch.
    #[test]
    fn schema_downgrade_is_refused() {
        let scratch = tempdir().unwrap();
        let id = RepoId::try_new("aabbccddeeff00112233aabb").unwrap();
        let future_version = (crate::schema::SCHEMA_VERSION + 1).to_string();

        // Create the DB at the current version, then overwrite the stamp as
        // if a newer binary had written it. The handle is released at the
        // end of the scope.
        {
            let seeded = Storage::open_unverified(id.clone(), scratch.path()).unwrap();
            stamp_schema_version(seeded.db(), &future_version).unwrap();
        }

        // Re-open: stored_v > current_v must be rejected.
        let Err(refusal) = Storage::open_unverified(id, scratch.path()) else {
            panic!("open must refuse when stored schema version > binary version");
        };
        let msg = refusal.to_string();
        assert!(
            msg.contains("newer than") || msg.contains("schema version"),
            "error must reference schema version mismatch; got: {msg}"
        );
        assert!(
            msg.contains(&future_version),
            "error must include stored version number; got: {msg}"
        );
    }

    /// STORAGE-M-2: on a fresh DB with nothing stamped, `read_root_path`
    /// must report `NoRow` rather than `ReadFailed`, proving the
    /// `None => NoRow` arm is reachable and distinct from an error.
    ///
    /// Note: Cozo enforces `Meta {key: String => value: String}` at the schema
    /// level, so a Null or Int cannot be written into Meta through the
    /// standard API. The `ReadFailed` arm for non-string values in
    /// `read_root_path` is a defensive catch for hypothetical future Cozo
    /// changes that relax type enforcement; it cannot be exercised against a
    /// live DB in the current Cozo version.
    #[test]
    fn absent_root_path_returns_no_row() {
        let scratch = tempdir().unwrap();
        let id = RepoId::try_new("cafebabe0011223344556677").unwrap();
        // open_unverified creates the schema but does not stamp root_path.
        let storage = Storage::open_unverified(id, scratch.path()).unwrap();

        let outcome = read_root_path(storage.db());
        assert!(
            matches!(outcome, RootPathProbe::NoRow),
            "fresh DB must return NoRow before root_path is stamped"
        );
    }

    /// STORAGE-M-2b: after stamping, `read_root_path` must report `Recorded`
    /// with exactly the stamped value — covering the string arm.
    #[test]
    fn stamped_root_path_returns_recorded() {
        let scratch = tempdir().unwrap();
        let id = RepoId::try_new("aabb00112233445566778899").unwrap();
        let storage = Storage::open_unverified(id, scratch.path()).unwrap();

        stamp_root_path(storage.db(), "/my/repo").unwrap();

        let probe = read_root_path(storage.db());
        let RootPathProbe::Recorded(value) = &probe else {
            panic!("expected Recorded, got {probe:?}");
        };
        assert_eq!(value.as_str(), "/my/repo");
    }

    /// H4: a non-numeric schema_version is a hard error, never a silent
    /// rebuild. Previously `parse().unwrap_or(0)` would rebuild and destroy data.
    #[test]
    fn corrupted_schema_version_is_refused() {
        let scratch = tempdir().unwrap();
        let id = RepoId::try_new("deadbeefcafe001122334455").unwrap();

        // Create the DB normally, then stamp garbage to mimic on-disk
        // corruption; the handle is released at the end of the scope.
        {
            let seeded = Storage::open_unverified(id.clone(), scratch.path()).unwrap();
            stamp_schema_version(seeded.db(), "not_a_number").unwrap();
        }

        // Re-open: must fail instead of rebuilding.
        let Err(refusal) = Storage::open_unverified(id, scratch.path()) else {
            panic!("open must refuse on corrupted (non-numeric) schema_version");
        };
        let msg = refusal.to_string();
        assert!(
            msg.contains("corrupted") || msg.contains("not a valid"),
            "error must mention schema version corruption; got: {msg}"
        );
        assert!(
            msg.contains("not_a_number"),
            "error must echo the bad stored value; got: {msg}"
        );
    }
}