use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use ckg_core::{Error, RepoId, Result};
use cozo::{DataValue, DbInstance, ScriptMutability};
use crate::schema::PER_REPO_DDL;
use super::meta::{read_meta_bool, stamp_meta_bool, stamp_needs_reindex};
use super::{Storage, map_err};
/// Engine name passed as the first argument to every `DbInstance::new` call in
/// this file (fresh opens, rebuild staging, and the post-swap reopen).
// NOTE(review): presumably selects cozo's newer RocksDB backend — confirm against cozo docs.
pub(super) const ENGINE: &str = "newrocksdb";
/// Result of querying the `Meta` relation for the `root_path` key, as produced
/// by `read_root_path`.
#[derive(Debug)]
pub(super) enum RootPathProbe {
    /// The query returned a string value; the payload is that stored path.
    Recorded(String),
    /// The query succeeded but yielded no first row / first column.
    NoRow,
    /// The query itself failed, or the stored value was not a string; the
    /// payload is a human-readable description of what went wrong.
    ReadFailed(String),
}
/// Looks up the `root_path` entry in the `Meta` relation using a read-only query.
///
/// Returns `Recorded` when the first cell of the first row is a string, `NoRow`
/// when there is no such cell, and `ReadFailed` when the query errors or the
/// cell holds a non-string value.
fn read_root_path(db: &DbInstance) -> RootPathProbe {
    let queried = db.run_script(
        "?[v] := *Meta{key: \"root_path\", value: v}",
        BTreeMap::new(),
        ScriptMutability::Immutable,
    );

    let result_set = match queried {
        Ok(result_set) => result_set,
        Err(e) => return RootPathProbe::ReadFailed(e.to_string()),
    };

    // Only the first cell of the first row is inspected.
    let Some(cell) = result_set.rows.first().and_then(|row| row.first()) else {
        return RootPathProbe::NoRow;
    };

    match cell {
        DataValue::Str(recorded) => RootPathProbe::Recorded(recorded.to_string()),
        unexpected => RootPathProbe::ReadFailed(format!(
            "non-string root_path in Meta: {:?}",
            unexpected
        )),
    }
}
/// Writes `schema_version` into the `Meta` relation under the key
/// `"schema_version"`, using a `:put` so an existing row is overwritten.
///
/// # Errors
/// Returns the mapped storage error if the script fails to run.
fn stamp_schema_version(db: &DbInstance, schema_version: &str) -> Result<()> {
    // The value is bound as `$v` rather than spliced into the script text.
    let bindings = BTreeMap::from([("v".into(), DataValue::from(schema_version))]);
    db.run_script(
        "?[key, value] <- [[\"schema_version\", $v]] :put Meta {key => value}",
        bindings,
        ScriptMutability::Mutable,
    )
    .map(drop)
    .map_err(map_err)
}
fn stamp_root_path(db: &DbInstance, root_path: &str) -> Result<()> {
let mut params = BTreeMap::new();
params.insert("v".into(), DataValue::from(root_path));
db.run_script(
"?[key, value] <- [[\"root_path\", $v]] :put Meta {key => value}",
params,
ScriptMutability::Mutable,
)
.map_err(map_err)?;
Ok(())
}
/// Returns `true` only when both `a` and `b` can be canonicalized and resolve
/// to the same path.
///
/// If either path fails to canonicalize (for example because it does not
/// exist), the answer is `false` — even when the two inputs are textually
/// identical.
fn paths_canonicalize_equal(a: &str, b: &str) -> bool {
    std::path::Path::new(a)
        .canonicalize()
        // `b` is only resolved once `a` has resolved successfully.
        .and_then(|resolved_a| {
            std::path::Path::new(b)
                .canonicalize()
                .map(|resolved_b| resolved_a == resolved_b)
        })
        .unwrap_or(false)
}
/// Returns the canonical form of `path` rendered via `Path::display`, or the
/// input string unchanged when canonicalization fails.
fn canonical_or_input(path: &str) -> String {
    match std::path::Path::new(path).canonicalize() {
        Ok(resolved) => resolved.display().to_string(),
        // Unresolvable (e.g. missing) paths are passed through as given.
        Err(_) => path.to_string(),
    }
}
/// Runs a mutating `script` and treats "already exists"-style failures as
/// success, so the same DDL can be replayed against an existing database.
///
/// A failure is considered benign when its rendered message contains one of
/// the known conflict markers; any other failure is returned as a mapped
/// storage error.
pub(super) fn run_idempotent(db: &DbInstance, script: &str) -> Result<()> {
    // Substrings of the error text that indicate the object was already there.
    const BENIGN_MARKERS: [&str; 3] = [
        "conflicts with an existing one",
        "already exists",
        "EvalRelationConflict",
    ];

    let Err(e) = db.run_script(script, BTreeMap::new(), ScriptMutability::Mutable) else {
        return Ok(());
    };

    let rendered = e.to_string();
    let is_benign = BENIGN_MARKERS
        .iter()
        .any(|marker| rendered.contains(*marker));
    if is_benign { Ok(()) } else { Err(map_err(e)) }
}
fn rebuild_at_path(path: &Path, schema_version: &str) -> Result<DbInstance> {
let parent = path.parent().ok_or_else(|| {
Error::Storage(format!("rebuild target has no parent: {}", path.display()))
})?;
let file = path.file_name().and_then(|s| s.to_str()).ok_or_else(|| {
Error::Storage(format!(
"rebuild target has no file name: {}",
path.display()
))
})?;
let new_path = parent.join(format!("{file}.new"));
let old_path = parent.join(format!("{file}.old"));
sweep_rebuild_debris(path);
std::fs::create_dir_all(&new_path)?;
let new_db = DbInstance::new(ENGINE, &new_path, "{}").map_err(map_err)?;
for stmt in PER_REPO_DDL {
run_idempotent(&new_db, stmt)?;
}
stamp_schema_version(&new_db, schema_version)?;
drop(new_db);
swap_directories(path, &new_path, &old_path)?;
let installed = DbInstance::new(ENGINE, path, "{}").map_err(map_err)?;
if let Err(cleanup_err) = std::fs::remove_dir_all(&old_path) {
tracing::error!(
"rebuild succeeded but old-DB cleanup failed at {}: {cleanup_err}; \
will be swept on next rebuild",
old_path.display()
);
}
Ok(installed)
}
/// Linux implementation: swaps the staged directory at `new_path` into `path`
/// with a single `renameat2(RENAME_EXCHANGE)` call, then moves the displaced
/// directory (now sitting at `new_path`) to `old_path`.
///
/// If the `renameat2` call fails for any reason, this falls back to
/// `swap_directories_two_rename`.
///
/// # Errors
/// Fails if either path contains an interior NUL byte, or if the fallback
/// swap fails. A failure of the follow-up `new_path` → `old_path` rename is
/// only logged; the function still returns `Ok(())` in that case.
#[cfg(target_os = "linux")]
fn swap_directories(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
    use std::ffi::CString;
    use std::os::unix::ffi::OsStrExt;
    // `renameat2` takes NUL-terminated C strings, so convert the raw path bytes.
    let c_path = CString::new(path.as_os_str().as_bytes())
        .map_err(|e| Error::wrap("rebuild path has interior NUL", e))?;
    let c_new = CString::new(new_path.as_os_str().as_bytes())
        .map_err(|e| Error::wrap("rebuild path has interior NUL", e))?;
    // SAFETY: both pointers come from `c_path` / `c_new`, which are live
    // `CString`s for the whole call, so they point at valid NUL-terminated
    // buffers. `AT_FDCWD` means no directory file descriptors are passed.
    let rc = unsafe {
        libc::renameat2(
            libc::AT_FDCWD,
            c_path.as_ptr(),
            libc::AT_FDCWD,
            c_new.as_ptr(),
            libc::RENAME_EXCHANGE,
        )
    };
    if rc != 0 {
        // Read errno immediately, before any other call can overwrite it.
        let errno = std::io::Error::last_os_error();
        tracing::warn!(
            "renameat2(RENAME_EXCHANGE) failed ({errno}); falling back to two-rename swap"
        );
        return swap_directories_two_rename(path, new_path, old_path);
    }
    // After the exchange the previous DB lives at `new_path`; park it at
    // `old_path`. Failure here is logged but not treated as a swap failure.
    if let Err(rename_err) = std::fs::rename(new_path, old_path) {
        tracing::error!(
            "renameat2 swap succeeded but old-DB rename {} → {} failed: {rename_err}",
            new_path.display(),
            old_path.display()
        );
    }
    Ok(())
}
/// Non-Linux implementation: delegates directly to
/// `swap_directories_two_rename` and returns its result unchanged.
#[cfg(not(target_os = "linux"))]
fn swap_directories(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
    swap_directories_two_rename(path, new_path, old_path)
}
fn swap_directories_two_rename(path: &Path, new_path: &Path, old_path: &Path) -> Result<()> {
debug_assert!(path.is_absolute(), "swap_directories: path must be absolute: {path:?}");
std::fs::rename(path, old_path)?;
if let Err(swap_err) = std::fs::rename(new_path, path) {
if let Err(rb) = std::fs::rename(old_path, path) {
tracing::error!(
"rebuild swap failed AND rollback failed: could not restore {} → {}: {rb}",
old_path.display(),
path.display()
);
let _ = std::fs::remove_dir_all(new_path);
return Err(Error::Storage(format!(
"DB CORRUPTED: rebuild swap failed (swap_err: {swap_err}) and rollback also \
failed (rb: {rb}). Original DB is at {} (manual recovery: \
`mv {} {}`).",
old_path.display(),
old_path.display(),
path.display(),
)));
}
return Err(swap_err.into());
}
Ok(())
}
pub(super) fn sweep_rebuild_debris(path: &Path) {
let parent = match path.parent() {
Some(p) => p,
None => return,
};
let file = match path.file_name().and_then(|s| s.to_str()) {
Some(f) => f,
None => return,
};
let new_path = parent.join(format!("{file}.new"));
if new_path.exists()
&& let Err(e) = std::fs::remove_dir_all(&new_path)
{
tracing::warn!(
"could not sweep rebuild debris at {}: {e}",
new_path.display()
);
}
let path_has_healthy_db = path.join("CURRENT").is_file();
if !path_has_healthy_db {
let old_path = parent.join(format!("{file}.old"));
if old_path.exists() {
tracing::warn!(
"preserving recovery dir at {} because {} is empty/missing — \
manual recovery: `mv {} {}`",
old_path.display(),
path.display(),
old_path.display(),
path.display()
);
}
return;
}
let old_path = parent.join(format!("{file}.old"));
if old_path.exists()
&& let Err(e) = std::fs::remove_dir_all(&old_path)
{
tracing::warn!(
"could not sweep rebuild debris at {}: {e}",
old_path.display()
);
}
}
impl Storage {
    /// Opens (creating if necessary) the per-repo database for `repo_id` under
    /// `base` without checking which repository root it was created for.
    ///
    /// # Errors
    /// See [`Storage::open_inner`]; the root-path verification step is skipped.
    pub fn open_unverified(repo_id: RepoId, base: &Path) -> Result<Self> {
        Self::open_inner(repo_id, base, None)
    }

    /// Opens (creating if necessary) the per-repo database for `repo_id` under
    /// `base`, verifying that it belongs to the repository at `root_path`.
    ///
    /// `root_path` is converted with `to_string_lossy` before being compared
    /// against, or recorded in, the database.
    ///
    /// # Errors
    /// See [`Storage::open_inner`].
    pub fn open_at(repo_id: RepoId, root_path: &Path, base: &Path) -> Result<Self> {
        let caller_root = root_path.to_string_lossy().into_owned();
        Self::open_inner(repo_id, base, Some(caller_root))
    }

    /// Shared open path behind `open_unverified` and `open_at`.
    ///
    /// Steps, in order:
    /// 1. Sweep rebuild debris next to `base/workspace_folders/<repo_id>`,
    ///    create the directory, open the database and replay `PER_REPO_DDL`.
    /// 2. Compare the stored `schema_version` with the binary's. A stored
    ///    version that is older triggers an automatic rebuild (the new DB is
    ///    stamped `needs_reindex`, and with `root_path` when one was given)
    ///    and returns immediately.
    /// 3. Otherwise stamp the current schema version and, when `root_path` is
    ///    `Some`, verify it against the recorded one: record it if absent,
    ///    re-stamp the canonical form if the two differ textually but
    ///    canonicalize equal, and refuse to open if they differ.
    /// 4. If `index_in_progress` is set, stamp `needs_reindex` and clear the flag.
    ///
    /// # Errors
    /// Fails when the directory or database cannot be created/opened, when the
    /// stored schema version is not a valid `u32` or is newer than this
    /// binary's, when the rebuild fails, when the recorded root path conflicts
    /// with (or cannot be read for comparison against) `root_path`, or when
    /// any `Meta` stamp fails.
    pub(super) fn open_inner(
        repo_id: RepoId,
        base: &Path,
        root_path: Option<String>,
    ) -> Result<Self> {
        debug_assert!(
            RepoId::is_valid(repo_id.as_str()),
            "RepoId fed into open_inner is not 24-hex: {:?}",
            repo_id.as_str()
        );
        let db_path: PathBuf = base.join("workspace_folders").join(repo_id.as_str());
        sweep_rebuild_debris(&db_path);
        std::fs::create_dir_all(&db_path)?;
        let db = DbInstance::new(ENGINE, &db_path, "{}").map_err(map_err)?;
        for stmt in PER_REPO_DDL {
            run_idempotent(&db, stmt)?;
        }

        let current = crate::schema::SCHEMA_VERSION_STR;
        // A failed read is downgraded to "no stored version" rather than an error.
        let read = match db.run_script(
            "?[v] := *Meta{key: \"schema_version\", value: v}",
            BTreeMap::new(),
            ScriptMutability::Immutable,
        ) {
            Ok(r) => Some(r),
            Err(e) => {
                tracing::warn!(
                    "could not read schema_version from {} ({e}); treating as fresh DB",
                    db_path.display()
                );
                None
            }
        };
        // `Some(stored)` only when a string version exists and differs from ours.
        let mismatch = read
            .as_ref()
            .and_then(|r| r.rows.first())
            .and_then(|row| row.first())
            .and_then(|v| match v {
                DataValue::Str(s) if s.as_str() != current => Some(s.to_string()),
                _ => None,
            });

        if let Some(stored) = mismatch {
            let stored_v: u32 = stored.parse().map_err(|_| {
                Error::Storage(format!(
                    "schema version meta is corrupted at {}: \
                     stored value {stored:?} is not a valid u32. \
                     Manual recovery: inspect Meta{{key:\"schema_version\"}} \
                     and either stamp the correct value or delete the DB.",
                    db_path.display()
                ))
            })?;
            let current_v: u32 = current.parse().unwrap_or(0);
            // Never rebuild over a DB written by a newer binary.
            if stored_v > current_v {
                return Err(map_err(format!(
                    "DB at {} has schema version {stored} which is newer than \
                     this binary's version {current}. Upgrade ckg to >= v{stored} \
                     or delete the DB and re-index.",
                    db_path.display()
                )));
            }
            tracing::warn!(
                "schema version mismatch (on-disk: {stored}, binary: {current}); \
                 auto-rebuilding {}",
                db_path.display()
            );
            // Release the handle before the directory is swapped out from under it.
            drop(db);
            let new_db = rebuild_at_path(&db_path, &current)?;
            if let Some(rp) = &root_path {
                stamp_root_path(&new_db, rp)?;
            }
            // The rebuilt DB is empty, so flag it for re-indexing.
            stamp_needs_reindex(&new_db, true)?;
            return Ok(Self {
                repo_id,
                db_path,
                db: new_db,
            });
        }

        stamp_schema_version(&db, &current)?;

        if let Some(caller_root) = root_path {
            match read_root_path(&db) {
                RootPathProbe::Recorded(stored) if stored != caller_root => {
                    if paths_canonicalize_equal(&stored, &caller_root) {
                        // Same directory, different spelling: normalise the record.
                        let canon = canonical_or_input(&caller_root);
                        if stored != canon {
                            tracing::warn!(
                                "{}: recorded root_path {stored:?} differs from caller \
                                 {caller_root:?} byte-for-byte but canonicalizes equal — \
                                 re-stamping with canonical shape {canon:?} (CR-M-7 auto-migrate)",
                                db_path.display()
                            );
                            stamp_root_path(&db, &canon)?;
                        }
                    } else {
                        return Err(Error::Storage(format!(
                            "RepoId collision detected at {}: DB recorded root_path \
                             {stored:?} but caller passed {caller_root:?}. Refusing to \
                             silently merge two repos. Recover by removing the per-repo \
                             DB and re-indexing the correct repo.",
                            db_path.display()
                        )));
                    }
                }
                // Recorded path matches the caller's byte-for-byte: nothing to do.
                RootPathProbe::Recorded(_) => {}
                RootPathProbe::NoRow => {
                    stamp_root_path(&db, &caller_root)?;
                }
                RootPathProbe::ReadFailed(e) => {
                    return Err(Error::Storage(format!(
                        "could not verify recorded root_path at {} (Meta read failed: {e}); \
                         refusing to open under collision-detection contract. Re-run with \
                         `Storage::open_unverified` if you intend to bypass I2 protection.",
                        db_path.display()
                    )));
                }
            }
        }

        if read_meta_bool(&db, "index_in_progress") {
            tracing::warn!(
                "{}: previous index run did not complete cleanly — \
                 stamping needs_reindex=true. The :put indexer upserts \
                 rows by id, so plain `ckg index` would NOT evict \
                 orphan rows from the partial run. Re-run \
                 `ckg index --clean` to repopulate from a clean state.",
                db_path.display()
            );
            stamp_needs_reindex(&db, true)?;
            stamp_meta_bool(&db, "index_in_progress", false)?;
        }

        Ok(Self {
            repo_id,
            db_path,
            db,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use ckg_core::RepoId;
    use tempfile::tempdir;

    /// Reopening a DB stamped with a schema version above the binary's must
    /// fail, and the error must name the stored version.
    #[test]
    fn schema_downgrade_is_refused() {
        let base = tempdir().unwrap();
        let repo_id = RepoId::try_new("aabbccddeeff00112233aabb").unwrap();
        let future_version = (crate::schema::SCHEMA_VERSION + 1).to_string();
        {
            // Create the DB, overwrite its version, then close it again.
            let storage = Storage::open_unverified(repo_id.clone(), base.path()).unwrap();
            stamp_schema_version(storage.db(), &future_version).unwrap();
        }

        let Err(err) = Storage::open_unverified(repo_id, base.path()) else {
            panic!("open must refuse when stored schema version > binary version");
        };
        let msg = err.to_string();
        let mentions_mismatch = ["newer than", "schema version"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(
            mentions_mismatch,
            "error must reference schema version mismatch; got: {msg}"
        );
        assert!(
            msg.contains(&future_version),
            "error must include stored version number; got: {msg}"
        );
    }

    /// A freshly created DB opened without a root path has no `root_path` row.
    #[test]
    fn absent_root_path_returns_no_row() {
        let base = tempdir().unwrap();
        let repo_id = RepoId::try_new("cafebabe0011223344556677").unwrap();
        let storage = Storage::open_unverified(repo_id, base.path()).unwrap();

        let probe = read_root_path(storage.db());
        assert!(
            matches!(probe, RootPathProbe::NoRow),
            "fresh DB must return NoRow before root_path is stamped"
        );
    }

    /// After stamping, the probe reports the exact recorded string.
    #[test]
    fn stamped_root_path_returns_recorded() {
        let base = tempdir().unwrap();
        let repo_id = RepoId::try_new("aabb00112233445566778899").unwrap();
        let storage = Storage::open_unverified(repo_id, base.path()).unwrap();
        stamp_root_path(storage.db(), "/my/repo").unwrap();

        match read_root_path(storage.db()) {
            RootPathProbe::Recorded(s) => assert_eq!(s, "/my/repo"),
            other => panic!("expected Recorded, got {other:?}"),
        }
    }

    /// A non-numeric stored schema version must fail the open and be echoed
    /// back in the error message.
    #[test]
    fn corrupted_schema_version_is_refused() {
        let base = tempdir().unwrap();
        let repo_id = RepoId::try_new("deadbeefcafe001122334455").unwrap();
        {
            // Create the DB, corrupt its version, then close it again.
            let storage = Storage::open_unverified(repo_id.clone(), base.path()).unwrap();
            stamp_schema_version(storage.db(), "not_a_number").unwrap();
        }

        let Err(err) = Storage::open_unverified(repo_id, base.path()) else {
            panic!("open must refuse on corrupted (non-numeric) schema_version");
        };
        let msg = err.to_string();
        let mentions_corruption = ["corrupted", "not a valid"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(
            mentions_corruption,
            "error must mention schema version corruption; got: {msg}"
        );
        assert!(
            msg.contains("not_a_number"),
            "error must echo the bad stored value; got: {msg}"
        );
    }
}