ckg_storage/store/lifecycle.rs
//! DB lifecycle: open, schema-version check, rebuild, path-swap, debris sweep.
//!
//! Contains `Storage::open_at`, `open_at_readonly`, `open_unverified`, and
//! `open_inner`, plus all the supporting free functions for the
//! build-then-swap rebuild pattern and `root_path` collision detection (I2).
6
7use std::collections::BTreeMap;
8use std::path::{Path, PathBuf};
9
10use ckg_core::{Error, RepoId, Result};
11use cozo::{DataValue, DbInstance, ScriptMutability};
12
13use crate::schema::PER_REPO_DDL;
14
15use super::meta::{read_meta_bool, stamp_meta_bool, stamp_needs_reindex};
16use super::{Storage, map_err};
17
18/// Cozo engine label for the `storage-new-rocksdb` feature.
19pub(super) const ENGINE: &str = "newrocksdb";
20
21/// Result of probing Meta for the recorded `root_path`.
22///
23/// **NI1 fix**: the original implementation used `.ok()?` which collapsed
24/// every error class — Meta missing (legit fresh DB), Cozo IO failure,
25/// transient lock — into `None`. Three explicit states let `open_inner`
26/// distinguish "no row → stamp it" from "could not read → refuse open".
27#[derive(Debug)]
28pub(super) enum RootPathProbe {
29 /// Meta has a `root_path` row with this value.
30 Recorded(String),
31 /// Query succeeded with zero rows — fresh DB or pre-I2 DB. Safe to stamp.
32 NoRow,
33 /// Query itself failed. Hard error: do NOT stamp.
34 ReadFailed(String),
35}
36
37fn read_root_path(db: &DbInstance) -> RootPathProbe {
38 match db.run_script(
39 "?[v] := *Meta{key: \"root_path\", value: v}",
40 BTreeMap::new(),
41 ScriptMutability::Immutable,
42 ) {
43 Ok(rows) => match rows.rows.first().and_then(|r| r.first()) {
44 Some(DataValue::Str(s)) => RootPathProbe::Recorded(s.to_string()),
45 // No row at all — fresh DB or pre-I2 DB. Safe to stamp.
46 None => RootPathProbe::NoRow,
47 // Row exists but value is not a string (e.g. Null or Int from a
48 // corrupted write). Do NOT treat this as NoRow — that would
49 // silently overwrite corrupted data and mask the problem.
50 Some(other) => RootPathProbe::ReadFailed(format!(
51 "non-string root_path in Meta: {:?}",
52 other
53 )),
54 },
55 Err(e) => RootPathProbe::ReadFailed(e.to_string()),
56 }
57}
58
59/// Write the `schema_version` row into `Meta` via parameter binding so a
60/// non-numeric / quote-bearing version string cannot escape the literal.
61fn stamp_schema_version(db: &DbInstance, schema_version: &str) -> Result<()> {
62 let mut params = BTreeMap::new();
63 params.insert("v".into(), DataValue::from(schema_version));
64 db.run_script(
65 "?[key, value] <- [[\"schema_version\", $v]] :put Meta {key => value}",
66 params,
67 ScriptMutability::Mutable,
68 )
69 .map_err(map_err)?;
70 Ok(())
71}
72
73/// Stamp the canonical `root_path` for this repo into `Meta` (I2).
74fn stamp_root_path(db: &DbInstance, root_path: &str) -> Result<()> {
75 let mut params = BTreeMap::new();
76 params.insert("v".into(), DataValue::from(root_path));
77 db.run_script(
78 "?[key, value] <- [[\"root_path\", $v]] :put Meta {key => value}",
79 params,
80 ScriptMutability::Mutable,
81 )
82 .map_err(map_err)?;
83 Ok(())
84}
85
/// CR-M-7: reports whether two path-shaped strings name the same absolute
/// path once both have gone through `Path::canonicalize`.
///
/// This lets a recorded path and a caller-supplied path that differ only by
/// symlink shape (macOS `/var ↔ /private/var`, Linux bind mounts) count as
/// equal. The check is conservative: when EITHER side cannot be
/// canonicalized (path gone, broken symlink, ...) the answer is `false`,
/// because equivalence cannot be proven and the caller's collision error
/// should fire (the pre-M-7 fail-closed behavior). Two genuinely distinct
/// repos canonicalize to different absolutes and also yield `false`.
fn paths_canonicalize_equal(a: &str, b: &str) -> bool {
    // M10: each canonicalize is a syscall, so resolve each side at most once
    // and skip the second lookup entirely when the first one fails.
    let resolve = |raw: &str| Path::new(raw).canonicalize().ok();
    match resolve(a) {
        None => false,
        Some(left) => resolve(b).is_some_and(|right| left == right),
    }
}
105
/// CR-review-#3: returns the canonical absolute string for `path`, or the
/// input unchanged when no faithful canonical string is available.
///
/// Used by the M-7 auto-migrate to stamp a STABLE shape regardless of which
/// alias the caller passed — without this, two callers passing different
/// symlink shapes would flap the recorded value back and forth on every
/// alternating open.
///
/// The input is returned as-is in two cases:
/// * `Path::canonicalize` fails (missing path, broken symlink, ...);
/// * the canonical path is not valid UTF-8. Rendering it lossily would put
///   U+FFFD replacement characters into the result, producing a string that
///   names no real path and can never be canonicalized again.
fn canonical_or_input(path: &str) -> String {
    Path::new(path)
        .canonicalize()
        .ok()
        // `into_string` succeeds only for valid UTF-8, so the result is
        // either the exact canonical path or the fallback below.
        .and_then(|resolved| resolved.into_os_string().into_string().ok())
        .unwrap_or_else(|| path.to_string())
}
119
120/// Run a script, swallowing "relation already exists" errors so DDL is
121/// idempotent across re-opens.
122pub(super) fn run_idempotent(db: &DbInstance, script: &str) -> Result<()> {
123 use crate::cozo_compat::CozoErrorKind;
124 match db.run_script(script, BTreeMap::new(), ScriptMutability::Mutable) {
125 Ok(_) => Ok(()),
126 Err(e) => {
127 let mapped = map_err(e);
128 // F16+: classify the error shape via the typed enum. When
129 // Cozo eventually exposes typed errors upstream, only the
130 // body of `CozoErrorKind::of` changes; this match keeps
131 // working unchanged. The legacy "conflicts with an existing
132 // one" wording is handled inside `Other` via an explicit
133 // substring check so it doesn't drift.
134 match CozoErrorKind::of(&mapped) {
135 CozoErrorKind::RelationAlreadyExists | CozoErrorKind::RelationConflict => Ok(()),
136 CozoErrorKind::Other
137 if mapped.to_string().contains("conflicts with an existing one") =>
138 {
139 Ok(())
140 }
141 _ => Err(mapped),
142 }
143 }
144 }
145}
146
147/// Schema-mismatch rebuild using a build-then-swap pattern:
148/// 1. Build a fresh DB at `<path>.new` (DDL + schema-stamp). On failure, old DB untouched.
149/// 2. Drop the new handle. Sweep stale `<path>.old`.
150/// 3. `rename(path → path.old)` + `rename(path.new → path)` — two atomic renames.
151/// 4. Reopen at `path`. Cleanup `path.old`.
152fn rebuild_at_path(path: &Path, schema_version: &str) -> Result<DbInstance> {
153 let parent = path.parent().ok_or_else(|| {
154 Error::Storage(format!("rebuild target has no parent: {}", path.display()))
155 })?;
156 let file = path.file_name().and_then(|s| s.to_str()).ok_or_else(|| {
157 Error::Storage(format!(
158 "rebuild target has no file name: {}",
159 path.display()
160 ))
161 })?;
162 let new_path = parent.join(format!("{file}.new"));
163 let old_path = parent.join(format!("{file}.old"));
164
165 // Sweep any debris from a prior crashed rebuild before claiming names.
166 sweep_rebuild_debris(path);
167
168 // 1. Build the fresh DB at <path>.new. Original is untouched on error.
169 std::fs::create_dir_all(&new_path)?;
170 let new_db = DbInstance::new(ENGINE, &new_path, "{}").map_err(map_err)?;
171 for stmt in PER_REPO_DDL {
172 run_idempotent(&new_db, stmt)?;
173 }
174 stamp_schema_version(&new_db, schema_version)?;
175 // Drop the handle so the rename can move the directory cleanly.
176 drop(new_db);
177
178 // 2. Swap directories (atomic on Linux, two-rename on others).
179 swap_directories(path, &new_path, &old_path)?;
180
181 // 3. Reopen at the canonical path.
182 let installed = DbInstance::new(ENGINE, path, "{}").map_err(map_err)?;
183
184 // 4. Cleanup the old DB — best-effort, log on failure.
185 if let Err(cleanup_err) = std::fs::remove_dir_all(&old_path) {
186 tracing::error!(
187 "rebuild succeeded but old-DB cleanup failed at {}: {cleanup_err}; \
188 will be swept on next rebuild",
189 old_path.display()
190 );
191 }
192
193 Ok(installed)
194}
195
196/// Linux: atomically swap `<path>` and `<new_path>` via
197/// `renameat2(RENAME_EXCHANGE)`, then rename the displaced original to
198/// `<old_path>` for cleanup. No missing-path window for concurrent readers.
199///
200/// Other platforms: fall back to two `std::fs::rename` calls. Brief window
201/// where `<path>` doesn't exist (~one syscall); concurrent readers get ENOENT.
202#[cfg(target_os = "linux")]
203fn swap_directories(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
204 use std::ffi::CString;
205 use std::os::unix::ffi::OsStrExt;
206
207 let c_path = CString::new(path.as_os_str().as_bytes())
208 .map_err(|e| Error::wrap("rebuild path has interior NUL", e))?;
209 let c_new = CString::new(new_path.as_os_str().as_bytes())
210 .map_err(|e| Error::wrap("rebuild path has interior NUL", e))?;
211 // SAFETY: passing C strings owned by `c_path` / `c_new`; AT_FDCWD matches
212 // std::fs semantics. Syscall returns 0 on success, -1 on error.
213 let rc = unsafe {
214 libc::renameat2(
215 libc::AT_FDCWD,
216 c_path.as_ptr(),
217 libc::AT_FDCWD,
218 c_new.as_ptr(),
219 libc::RENAME_EXCHANGE,
220 )
221 };
222 if rc != 0 {
223 // Fallback: kernel < 3.15 or filesystem doesn't support RENAME_EXCHANGE.
224 let errno = std::io::Error::last_os_error();
225 tracing::warn!(
226 "renameat2(RENAME_EXCHANGE) failed ({errno}); falling back to two-rename swap"
227 );
228 return swap_directories_two_rename(path, new_path, old_path);
229 }
230 // After EXCHANGE: `<path>` holds the new DB; `<new_path>` holds the old.
231 if let Err(rename_err) = std::fs::rename(new_path, old_path) {
232 tracing::error!(
233 "renameat2 swap succeeded but old-DB rename {} → {} failed: {rename_err}",
234 new_path.display(),
235 old_path.display()
236 );
237 }
238 Ok(())
239}
240
/// Non-Linux targets: there is no atomic exchange here, so the swap is
/// delegated straight to the portable two-rename implementation (brief
/// window in which `<path>` does not exist).
#[cfg(not(target_os = "linux"))]
fn swap_directories(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
    swap_directories_two_rename(path, new_path, old_path)
}
245
246/// Portable fallback: rename original out of the way, then move new in.
247/// Brief missing-path window between the two renames.
248fn swap_directories_two_rename(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
249 // M7: absolute paths only — relative paths with rename() are CWD-dependent.
250 debug_assert!(path.is_absolute(), "swap_directories: path must be absolute: {path:?}");
251 std::fs::rename(path, old_path)?;
252 if let Err(swap_err) = std::fs::rename(new_path, path) {
253 // Restore the original so the user isn't worse off.
254 if let Err(rb) = std::fs::rename(old_path, path) {
255 // CRITICAL: original is now orphaned at <old_path> and the
256 // canonical path is empty. Surface a distinct hard error so
257 // an operator can `mv <old_path> <path>` manually.
258 tracing::error!(
259 "rebuild swap failed AND rollback failed: could not restore {} → {}: {rb}",
260 old_path.display(),
261 path.display()
262 );
263 // STORAGE-C1: remove the orphaned .new directory so
264 // `sweep_rebuild_debris` on next open doesn't delete healthy
265 // DDL that an operator might want to inspect.
266 let _ = std::fs::remove_dir_all(new_path);
267 return Err(Error::Storage(format!(
268 "DB CORRUPTED: rebuild swap failed (swap_err: {swap_err}) and rollback also \
269 failed (rb: {rb}). Original DB is at {} (manual recovery: \
270 `mv {} {}`).",
271 old_path.display(),
272 old_path.display(),
273 path.display(),
274 )));
275 }
276 return Err(swap_err.into());
277 }
278 Ok(())
279}
280
281/// Sweep stale rebuild debris (`<path>.new` / `<path>.old`).
282///
283/// **Recovery preservation (R1):** if `<path>` itself doesn't exist or lacks
284/// a `CURRENT` file (i.e. is not a healthy RocksDB), `<path>.old` is the
285/// manual-recovery copy of a prior failed swap — leave it in place.
286/// We sweep `.new` unconditionally — that's always partial-build debris.
287///
288/// ## Concurrency (L2)
289///
290/// This function is called at `open_at()` entry, BEFORE acquiring the repo
291/// lock. Concurrent openers may race through sweep — that's benign because
292/// `remove_dir_all` of a non-existent path returns `NotFound` which we log
293/// and ignore. The worst case is a duplicated `tracing::warn` line.
294///
295/// ## Visibility (L9)
296///
297/// `pub(super)` — called from `open_at()` and `open_unverified()` in this
298/// module. Not exposed to downstream crates.
299pub(super) fn sweep_rebuild_debris(path: &Path) {
300 let parent = match path.parent() {
301 Some(p) => p,
302 None => return,
303 };
304 let file = match path.file_name().and_then(|s| s.to_str()) {
305 Some(f) => f,
306 None => return,
307 };
308 let new_path = parent.join(format!("{file}.new"));
309 if new_path.exists()
310 && let Err(e) = std::fs::remove_dir_all(&new_path)
311 {
312 tracing::warn!(
313 "could not sweep rebuild debris at {}: {e}",
314 new_path.display()
315 );
316 }
317 // A RocksDB directory is healthy when CURRENT exists (pointing at the
318 // active MANIFEST) AND at least one MANIFEST-* file is present. CURRENT
319 // alone can exist as an empty shell left by a partial init — requiring a
320 // MANIFEST-* file as well ensures we don't delete the .old recovery copy
321 // when the primary directory is only partially written.
322 // Conservative: when in doubt, do NOT delete .old.
323 let has_manifest_file = std::fs::read_dir(path)
324 .map(|mut dir| {
325 dir.any(|entry| {
326 entry
327 .ok()
328 .and_then(|e| e.file_name().into_string().ok())
329 .map(|name| name.starts_with("MANIFEST-"))
330 .unwrap_or(false)
331 })
332 })
333 .unwrap_or(false);
334 let path_has_healthy_db = path.join("CURRENT").is_file() && has_manifest_file;
335 if !path_has_healthy_db {
336 let old_path = parent.join(format!("{file}.old"));
337 if old_path.exists() {
338 tracing::warn!(
339 "preserving recovery dir at {} because {} is empty/missing — \
340 manual recovery: `mv {} {}`",
341 old_path.display(),
342 path.display(),
343 old_path.display(),
344 path.display()
345 );
346 }
347 return;
348 }
349 let old_path = parent.join(format!("{file}.old"));
350 if old_path.exists()
351 && let Err(e) = std::fs::remove_dir_all(&old_path)
352 {
353 tracing::warn!(
354 "could not sweep rebuild debris at {}: {e}",
355 old_path.display()
356 );
357 }
358}
359
360impl Storage {
361 /// **Escape hatch** — open without recording / verifying a canonical
362 /// root_path. Use ONLY when the caller has no canonical root available
363 /// (tests, ad-hoc CLI exploration of a known `repo_id`, benchmarks).
364 pub fn open_unverified(repo_id: RepoId, base: &Path) -> Result<Self> {
365 Self::open_inner(repo_id, base, None, /*allow_rebuild=*/ true)
366 }
367
368 /// Open or create with canonical root-path verification (I2).
369 ///
370 /// On first init, the canonical path is stamped into Meta. Subsequent opens
371 /// read it back; if the recorded path doesn't match we **refuse** rather
372 /// than silently merge two repos that happen to hash to the same 96-bit RepoId.
373 ///
374 /// On schema-version mismatch, performs a destructive auto-rebuild. This is
375 /// intentional for the indexer path, which holds the repo lock. Readers
376 /// (query/resolve/MCP) should use `open_at_readonly` instead so they never
377 /// trigger a destructive rebuild concurrently with an ongoing `ckg index`.
378 pub fn open_at(repo_id: RepoId, root_path: &Path, base: &Path) -> Result<Self> {
379 // CR-review-#7: this is the caller's path, lossy-decoded — NOT
380 // canonicalized (the variable used to be called `canonical`
381 // which was misleading). The actual canonicalize happens
382 // inside `paths_canonicalize_equal` / `canonical_or_input`.
383 let caller_root = root_path.to_string_lossy().into_owned();
384 Self::open_inner(repo_id, base, Some(caller_root), /*allow_rebuild=*/ true)
385 }
386
387 /// Read-only open: like `open_at` but **never** triggers a destructive
388 /// schema-mismatch rebuild. On version mismatch, returns a clear error
389 /// directing the user to re-index rather than wiping the DB.
390 ///
391 /// Use this from all reader paths (query, resolve, MCP) so a reader
392 /// running concurrently with `ckg index` cannot destroy the indexer's
393 /// in-progress write.
394 pub fn open_at_readonly(repo_id: RepoId, root_path: &Path, base: &Path) -> Result<Self> {
395 let caller_root = root_path.to_string_lossy().into_owned();
396 Self::open_inner(repo_id, base, Some(caller_root), /*allow_rebuild=*/ false)
397 }
398
399 pub(super) fn open_inner(
400 repo_id: RepoId,
401 base: &Path,
402 root_path: Option<String>,
403 allow_rebuild: bool,
404 ) -> Result<Self> {
405 // CR-storage-H1: defense-in-depth at the FS boundary. RepoId's
406 // private field + try_new validator ensures only 24-hex strings
407 // reach here from production code, but a debug-mode assert
408 // catches accidental construction via raw struct expressions
409 // in tests / future internal code paths. Production is fail-
410 // closed too — Cozo would reject the path, but the error
411 // surface is opaque.
412 debug_assert!(
413 RepoId::is_valid(repo_id.as_str()),
414 "RepoId fed into open_inner is not 24-hex: {:?}",
415 repo_id.as_str()
416 );
417 let db_path: PathBuf = base.join("workspace_folders").join(repo_id.as_str());
418 sweep_rebuild_debris(&db_path);
419 // I1/A4: 0700 atomically at create — see ckg_fs::create_dir_secure.
420 ckg_fs::create_dir_secure(&db_path)
421 .map_err(|e| ckg_core::Error::Storage(format!("create repo db dir: {e}")))?;
422 let db = DbInstance::new(ENGINE, &db_path, "{}").map_err(map_err)?;
423 for stmt in PER_REPO_DDL {
424 run_idempotent(&db, stmt)?;
425 }
426
427 // Schema-version check: on mismatch, auto-rebuild.
428 // L7: use the pre-computed string constant to avoid repeated `.to_string()`.
429 let current = crate::schema::SCHEMA_VERSION_STR;
430 // Fail closed on read error: an IO or lock failure here could mask a
431 // real schema mismatch. Treating it as "fresh DB" would stamp the
432 // current schema version over an unknown on-disk state — potentially
433 // opening a DB whose schema we cannot safely interpret. Mirror the
434 // RootPathProbe::ReadFailed policy: refuse the open so the operator
435 // can inspect and recover explicitly.
436 let read = db.run_script(
437 "?[v] := *Meta{key: \"schema_version\", value: v}",
438 BTreeMap::new(),
439 ScriptMutability::Immutable,
440 ).map_err(|e| {
441 Error::Storage(format!(
442 "could not read schema_version from {} ({e}); \
443 refusing to open under unknown schema state. \
444 If this is a new DB, delete it and let ckg recreate it.",
445 db_path.display()
446 ))
447 })?;
448
449 let mismatch = read
450 .rows.first()
451 .and_then(|row| row.first())
452 .and_then(|v| match v {
453 DataValue::Str(s) if s.as_str() != current => Some(s.to_string()),
454 _ => None,
455 });
456 if let Some(stored) = mismatch {
457 // H4: parse().unwrap_or(0) would silently treat a corrupted
458 // non-numeric schema_version as 0, then trigger rebuild and
459 // destroy the DB. Hard-error instead so the operator can inspect.
460 let stored_v: u32 = stored.parse().map_err(|_| {
461 Error::Storage(format!(
462 "schema version meta is corrupted at {}: \
463 stored value {stored:?} is not a valid u32. \
464 Manual recovery: inspect Meta{{key:\"schema_version\"}} \
465 and either stamp the correct value or delete the DB.",
466 db_path.display()
467 ))
468 })?;
469 // M9: refuse downgrade — stored schema is newer than this binary.
470 // Policy: if the binary is older than the on-disk schema, the
471 // operator must upgrade ckg. We deliberately do NOT offer an
472 // --allow-downgrade escape hatch — the on-disk format may have
473 // changed in ways an older binary cannot read safely, and a
474 // silent downgrade would corrupt the DB.
475 let current_v: u32 = current.parse().unwrap_or(0);
476 if stored_v > current_v {
477 return Err(map_err(format!(
478 "DB at {} has schema version {stored} which is newer than \
479 this binary's version {current}. Upgrade ckg to >= v{stored} \
480 or delete the DB and re-index.",
481 db_path.display()
482 )));
483 }
484 // Reader paths (query/resolve/MCP) must never trigger a destructive
485 // rebuild. They run concurrently with `ckg index` and hold no repo
486 // lock; wiping the DB mid-index would corrupt the indexer's write.
487 // Return a clear actionable error instead.
488 if !allow_rebuild {
489 return Err(Error::Storage(format!(
490 "DB at {} has schema version v{stored} but this binary expects v{current}. \
491 Run `ckg index --clean <repo>` to upgrade the schema. \
492 (Read-only paths never auto-rebuild to avoid corrupting a concurrent index.)",
493 db_path.display()
494 )));
495 }
496 // N1 (red-team #2): make it loud that "auto-rebuild" wipes
497 // the existing graph. `rebuild_at_path` builds an EMPTY DB
498 // and stamps `needs_reindex=true`; the user must re-run
499 // `ckg index` before any query returns useful data. The
500 // atomic-swap pattern below keeps the OLD DB intact until
501 // the new one is ready, so a crash mid-rebuild is safe,
502 // but the end state still loses every Symbol/edge row.
503 tracing::warn!(
504 "schema version mismatch (on-disk: v{stored}, binary: v{current}); \
505 DESTRUCTIVE rebuild of {} — existing symbols/edges will be wiped. \
506 Re-run `ckg index <repo>` afterwards. (The old DB stays intact \
507 until the new empty DB is atomically swapped in.)",
508 db_path.display()
509 );
510 drop(db);
511 let new_db = rebuild_at_path(&db_path, ¤t)?;
512 if let Some(rp) = &root_path {
513 stamp_root_path(&new_db, rp)?;
514 }
515 stamp_needs_reindex(&new_db, true)?;
516 return Ok(Self {
517 repo_id,
518 db_path,
519 db: new_db,
520 });
521 }
522
523 // Stamp current schema version (idempotent — `:put` overwrites).
524 stamp_schema_version(&db, ¤t)?;
525
526 // I2: root_path verification.
527 // CR-M-7: tolerate symlink-equivalent paths. macOS `/var ↔
528 // /private/var` and Linux bind-mount aliases produced spurious
529 // "RepoId collision" errors when the caller passed one shape
530 // and the DB recorded the other. The fix: if a byte-compare
531 // mismatch occurs but BOTH sides canonicalize to the same
532 // absolute path, accept and re-stamp the recorded value with
533 // the canonical form (one-shot transparent migration). True
534 // collisions (two distinct repos that happen to hash to the
535 // same RepoId) still differ post-canonicalize and continue to
536 // fail closed.
537 if let Some(caller_root) = root_path {
538 match read_root_path(&db) {
539 RootPathProbe::Recorded(stored) if stored != caller_root => {
540 if paths_canonicalize_equal(&stored, &caller_root) {
541 // CR-review-#3: stamp the canonical absolute shape
542 // (not the caller's), so any future caller passing
543 // an alias canonicalizes to the same recorded
544 // value and triggers no further migration writes.
545 // Prevents flap under concurrent multi-shape opens.
546 let canon = canonical_or_input(&caller_root);
547 // Guard against a symlink swap: even when two paths
548 // canonicalize to the same directory, that directory
549 // must belong to the same repo. A swapped symlink
550 // pointing at a different real repo would produce a
551 // different RepoId — reject to fail closed rather
552 // than accepting an impostor. Only check when
553 // canonicalization actually succeeds (i.e. canon
554 // differs from the raw input, meaning canonicalize
555 // resolved something).
556 let canon_path = std::path::Path::new(&canon);
557 if let Ok(derived) = ckg_core::RepoId::from_path(canon_path) {
558 if derived != repo_id {
559 return Err(Error::Storage(format!(
560 "symlink-swap detected at {}: paths {stored:?} and \
561 {caller_root:?} canonicalize to the same directory \
562 ({canon:?}) but that directory's RepoId ({}) does not \
563 match the DB's RepoId ({}). Refusing open.",
564 db_path.display(),
565 derived.as_str(),
566 repo_id.as_str()
567 )));
568 }
569 }
570 if stored != canon {
571 tracing::warn!(
572 "{}: recorded root_path {stored:?} differs from caller \
573 {caller_root:?} byte-for-byte but canonicalizes equal — \
574 re-stamping with canonical shape {canon:?} (CR-M-7 auto-migrate)",
575 db_path.display()
576 );
577 stamp_root_path(&db, &canon)?;
578 }
579 // Note: registry's `Repo.root_path` (stamped by
580 // put_repo at index time) is NOT updated here —
581 // it's the manifest-key authority and changing
582 // it could break ckg-resolve lookup. The Meta-vs-
583 // registry asymmetry is intentional; callers
584 // matching by registry shape will hit auto-
585 // migrate exactly once per (alias, repo) pair.
586 } else {
587 return Err(Error::Storage(format!(
588 "RepoId collision detected at {}: DB recorded root_path \
589 {stored:?} but caller passed {caller_root:?}. Refusing to \
590 silently merge two repos. Recover by removing the per-repo \
591 DB and re-indexing the correct repo.",
592 db_path.display()
593 )));
594 }
595 }
596 RootPathProbe::Recorded(_) => { /* matches: confirmed identity */ }
597 RootPathProbe::NoRow => {
598 stamp_root_path(&db, &caller_root)?;
599 }
600 RootPathProbe::ReadFailed(e) => {
601 return Err(Error::Storage(format!(
602 "could not verify recorded root_path at {} (Meta read failed: {e}); \
603 refusing to open under collision-detection contract. Re-run with \
604 `Storage::open_unverified` if you intend to bypass I2 protection.",
605 db_path.display()
606 )));
607 }
608 }
609 }
610
611 // CR-I-2: Crash-recovery probe. If `index_in_progress` was set but
612 // never cleared, promote `needs_reindex=true` and clear the flag.
613 if read_meta_bool(&db, "index_in_progress") {
614 tracing::warn!(
615 "{}: previous index run did not complete cleanly — \
616 stamping needs_reindex=true. The :put indexer upserts \
617 rows by id, so plain `ckg index` would NOT evict \
618 orphan rows from the partial run. Re-run \
619 `ckg index --clean` to repopulate from a clean state.",
620 db_path.display()
621 );
622 stamp_needs_reindex(&db, true)?;
623 stamp_meta_bool(&db, "index_in_progress", false)?;
624 }
625 Ok(Self {
626 repo_id,
627 db_path,
628 db,
629 })
630 }
631}
632
#[cfg(test)]
mod tests {
    use super::*;
    use ckg_core::RepoId;
    use tempfile::tempdir;

    /// H3: a DB stamped with SCHEMA_VERSION + 1 must not open.
    /// Policy: forced upgrade — no --allow-downgrade escape hatch.
    #[test]
    fn schema_downgrade_is_refused() {
        let base = tempdir().unwrap();
        let repo_id = RepoId::try_new("aabbccddeeff00112233aabb").unwrap();
        let future_version = (crate::schema::SCHEMA_VERSION + 1).to_string();

        {
            // First open creates the DB at the current schema version; it is
            // then re-stamped as if a newer binary had written it. The
            // handle is dropped at the end of this scope.
            let storage = Storage::open_unverified(repo_id.clone(), base.path()).unwrap();
            stamp_schema_version(storage.db(), &future_version).unwrap();
        }

        // Second open — must refuse (stored_v > current_v).
        let Err(err) = Storage::open_unverified(repo_id, base.path()) else {
            panic!("open must refuse when stored schema version > binary version");
        };
        let msg = err.to_string();
        assert!(
            msg.contains("newer than") || msg.contains("schema version"),
            "error must reference schema version mismatch; got: {msg}"
        );
        assert!(
            msg.contains(&future_version),
            "error must include stored version number; got: {msg}"
        );
    }

    /// STORAGE-M-2: on a fresh DB (nothing stamped yet) `read_root_path`
    /// must report `NoRow`, not `ReadFailed`. Confirms the no-row arm is
    /// reachable and distinct from an error.
    ///
    /// Note: Cozo enforces `Meta {key: String => value: String}` at the schema
    /// level, so a Null or Int cannot be stamped into Meta via the standard
    /// API. The non-string arm of `read_root_path` is a defensive catch for
    /// hypothetical future Cozo API changes that relax type enforcement; it
    /// cannot be exercised via a live DB in the current Cozo version.
    #[test]
    fn absent_root_path_returns_no_row() {
        let base = tempdir().unwrap();
        let repo_id = RepoId::try_new("cafebabe0011223344556677").unwrap();
        // open_unverified creates the schema but does not stamp root_path.
        let storage = Storage::open_unverified(repo_id, base.path()).unwrap();

        let probe = read_root_path(storage.db());
        assert!(
            matches!(probe, RootPathProbe::NoRow),
            "fresh DB must return NoRow before root_path is stamped"
        );
    }

    /// STORAGE-M-2b: after a stamp, `read_root_path` must report `Recorded`
    /// carrying exactly the stamped value.
    #[test]
    fn stamped_root_path_returns_recorded() {
        let base = tempdir().unwrap();
        let repo_id = RepoId::try_new("aabb00112233445566778899").unwrap();
        let storage = Storage::open_unverified(repo_id, base.path()).unwrap();

        stamp_root_path(storage.db(), "/my/repo").unwrap();
        match read_root_path(storage.db()) {
            RootPathProbe::Recorded(s) => assert_eq!(s, "/my/repo"),
            other => panic!("expected Recorded, got {other:?}"),
        }
    }

    /// H4: a non-numeric schema_version must hard-error, not silently rebuild.
    /// Previously `parse().unwrap_or(0)` would trigger rebuild and destroy data.
    #[test]
    fn corrupted_schema_version_is_refused() {
        let base = tempdir().unwrap();
        let repo_id = RepoId::try_new("deadbeefcafe001122334455").unwrap();

        {
            // Create the DB normally, then stamp a non-numeric value to
            // simulate on-disk corruption.
            let storage = Storage::open_unverified(repo_id.clone(), base.path()).unwrap();
            stamp_schema_version(storage.db(), "not_a_number").unwrap();
        }

        // Second open — must error, NOT silently rebuild.
        let Err(err) = Storage::open_unverified(repo_id, base.path()) else {
            panic!("open must refuse on corrupted (non-numeric) schema_version");
        };
        let msg = err.to_string();
        assert!(
            msg.contains("corrupted") || msg.contains("not a valid"),
            "error must mention schema version corruption; got: {msg}"
        );
        assert!(
            msg.contains("not_a_number"),
            "error must echo the bad stored value; got: {msg}"
        );
    }
}
735}