use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use ckg_core::{Error, Result};
use cozo::{DataValue, DbInstance, ScriptMutability};
use self::meta::{read_meta_bool, stamp_meta_bool, stamp_needs_reindex};
mod embed;
mod insert;
mod lifecycle;
mod meta;
mod registry;
mod resolve;
pub use embed::STORAGE_EMBED_DIM;
pub use registry::RegistryStorage;
pub(super) fn map_err(e: impl std::fmt::Display) -> Error {
Error::Storage(e.to_string())
}
/// Handle to a single repository's cozo database.
pub struct Storage {
    /// Identifier of the repository this database was opened for.
    pub(super) repo_id: ckg_core::RepoId,
    /// Filesystem path of the database, returned by `Storage::db_path`.
    pub(super) db_path: PathBuf,
    /// Open cozo database instance that every query in this type runs against.
    pub(super) db: DbInstance,
}
impl Storage {
    /// Returns the identifier of the repository this store was opened for.
    pub fn repo_id(&self) -> &ckg_core::RepoId {
        &self.repo_id
    }

    /// Returns the filesystem path of the underlying database.
    pub fn db_path(&self) -> &Path {
        &self.db_path
    }

    /// Reads the repository root path recorded under the `root_path` key of
    /// the `Meta` relation.
    ///
    /// Returns `Ok(None)` when no such entry exists, or when the stored value
    /// is not a string.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if the query fails.
    pub fn recorded_root_path(&self) -> Result<Option<String>> {
        // Pure lookup: reuse the shared immutable runner instead of
        // re-implementing `run_script` + `map_err` inline.
        let result = self.run_immutable("?[v] := *Meta{key: \"root_path\", value: v}")?;
        let root = match result.rows.first().and_then(|row| row.first()) {
            Some(DataValue::Str(s)) => Some(s.to_string()),
            _ => None,
        };
        Ok(root)
    }

    /// Exposes the raw cozo database handle.
    pub fn db(&self) -> &DbInstance {
        &self.db
    }

    /// Runs `script` without parameters in cozo's mutable mode.
    ///
    /// NOTE(review): "unchecked" presumably means the caller vouches for the
    /// script's contents — confirm the intended contract.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if cozo rejects or fails the script.
    pub fn run_mutable_unchecked(&self, script: &str) -> Result<cozo::NamedRows> {
        self.db
            .run_script(script, BTreeMap::new(), ScriptMutability::Mutable)
            .map_err(map_err)
    }

    /// Runs `script` without parameters in cozo's immutable mode.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if cozo rejects or fails the script.
    pub fn run_immutable(&self, script: &str) -> Result<cozo::NamedRows> {
        self.db
            .run_script(script, BTreeMap::new(), ScriptMutability::Immutable)
            .map_err(map_err)
    }

    /// Runs `script` with bound `params` in cozo's mutable mode.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if cozo rejects or fails the script.
    pub fn run_with(
        &self,
        script: &str,
        params: BTreeMap<String, DataValue>,
    ) -> Result<cozo::NamedRows> {
        self.db
            .run_script(script, params, ScriptMutability::Mutable)
            .map_err(map_err)
    }

    /// Runs `script` with bound `params` in cozo's immutable mode.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if cozo rejects or fails the script.
    pub fn run_with_immutable(
        &self,
        script: &str,
        params: BTreeMap<String, DataValue>,
    ) -> Result<cozo::NamedRows> {
        self.db
            .run_script(script, params, ScriptMutability::Immutable)
            .map_err(map_err)
    }

    /// Reports whether the `needs_reindex` meta flag is set.
    ///
    /// How a missing or unreadable flag is reported is decided by
    /// `read_meta_bool` (presumably as `false` — NOTE(review): confirm).
    pub fn needs_reindex(&self) -> bool {
        read_meta_bool(&self.db, "needs_reindex")
    }

    /// Sets the `index_in_progress` sentinel before an indexing run.
    ///
    /// If the run never reaches [`Storage::mark_indexed`], a later open treats
    /// the leftover flag as a crashed run: it sets `needs_reindex` and clears
    /// this flag.
    ///
    /// # Errors
    ///
    /// Propagates any error from writing the meta entry.
    pub fn mark_index_in_progress(&self) -> Result<()> {
        stamp_meta_bool(&self.db, "index_in_progress", true)
    }

    /// Reports whether the `index_in_progress` sentinel is currently set.
    pub fn is_index_in_progress(&self) -> bool {
        read_meta_bool(&self.db, "index_in_progress")
    }

    /// Records a successful indexing run by clearing both `needs_reindex` and
    /// `index_in_progress`.
    ///
    /// # Errors
    ///
    /// Propagates the first error from writing either meta entry.
    pub fn mark_indexed(&self) -> Result<()> {
        // Order matters: clear `needs_reindex` first. If we die between the
        // two writes, `index_in_progress` is still set, and the next open
        // re-promotes it to `needs_reindex` — the conservative outcome.
        stamp_needs_reindex(&self.db, false)?;
        stamp_meta_bool(&self.db, "index_in_progress", false)?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use ckg_core::{EdgeKind, Kind, RepoId};
    use cozo::DataValue;
    use tempfile::tempdir;

    use crate::store::lifecycle::sweep_rebuild_debris;

    /// Builds a public `Function` symbol in `x.rs`. Its `name` is the last
    /// `::`-separated segment of `qname`.
    fn make_sym(id: &str, qname: &str) -> ckg_core::Symbol {
        ckg_core::Symbol {
            id: id.into(),
            qname: qname.into(),
            name: qname.split("::").last().unwrap_or(qname).into(),
            kind: Kind::Function,
            file: "x.rs".into(),
            line: 1,
            col: 0,
            is_public: true,
            doc: String::new(),
            hash: "h".into(),
        }
    }

    /// Opening the same store twice in sequence must succeed.
    #[test]
    fn open_and_idempotent() {
        let dir = tempdir().unwrap();
        let id = RepoId::try_new("aaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
        {
            let _s1 = Storage::open_unverified(id.clone(), dir.path()).unwrap();
        }
        let _s2 = Storage::open_unverified(id, dir.path()).unwrap();
    }

    #[test]
    fn put_symbols_and_edges() {
        let dir = tempdir().unwrap();
        let id = RepoId::try_new("bbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
        let st = Storage::open_unverified(id, dir.path()).unwrap();
        let syms = vec![make_sym("a", "Foo::bar"), make_sym("b", "Baz::qux")];
        st.put_symbols(&syms).unwrap();
        let edges = vec![ckg_core::Edge {
            kind: EdgeKind::Calls,
            src: "a".into(),
            dst: "b".into(),
            confidence: 1.0,
        }];
        st.put_edges(&edges).unwrap();
        // Read-back queries only read, so they go through the immutable runner.
        let rows = st.run_immutable("?[id] := *Symbol{id}").unwrap();
        assert_eq!(rows.rows.len(), 2);
        let edges = st
            .run_immutable("?[s, d] := *Calls{src: s, dst: d}")
            .unwrap();
        assert_eq!(edges.rows.len(), 1);
    }

    #[test]
    fn crashed_run_promotes_needs_reindex() {
        let dir = tempdir().unwrap();
        let id = RepoId::try_new("cccccccccccccccccccccccc").unwrap();
        {
            let st = Storage::open_unverified(id.clone(), dir.path()).unwrap();
            st.mark_index_in_progress().unwrap();
            assert!(st.is_index_in_progress());
        }
        let st = Storage::open_unverified(id, dir.path()).unwrap();
        assert!(
            st.needs_reindex(),
            "open after crashed run must set needs_reindex=true"
        );
        assert!(
            !st.is_index_in_progress(),
            "open must clear the in-progress flag (one-shot recovery)"
        );
    }

    #[test]
    fn mark_indexed_clears_both_sentinels() {
        let dir = tempdir().unwrap();
        let id = RepoId::try_new("dddddddddddddddddddddddd").unwrap();
        let st = Storage::open_unverified(id, dir.path()).unwrap();
        st.mark_index_in_progress().unwrap();
        st.mark_indexed().unwrap();
        assert!(!st.needs_reindex());
        assert!(!st.is_index_in_progress());
    }

    // Symlink creation uses a Unix-only API. Gating the whole test replaces
    // an early `return` that left unreachable code (a warning, fatal under
    // `-D warnings`) on other targets.
    #[cfg(unix)]
    #[test]
    fn open_at_canonicalize_equal_path_auto_migrates() {
        let outer = tempdir().unwrap();
        let real = outer.path().join("real_repo");
        std::fs::create_dir_all(&real).unwrap();
        let alias = outer.path().join("alias_repo");
        std::os::unix::fs::symlink(&real, &alias).unwrap();
        let base = tempdir().unwrap();
        let id = RepoId::try_new("eeeeeeeeeeeeeeeeeeeeeeee").unwrap();
        {
            let _ = Storage::open_at(id.clone(), &alias, base.path()).unwrap();
        }
        let _ = Storage::open_at(id, &real, base.path())
            .expect("symlink-equivalent path must auto-migrate, not collide");
    }

    #[test]
    fn open_at_canonicalize_fails_falls_through_to_collision() {
        let outer = tempdir().unwrap();
        let real = outer.path().join("real_repo");
        std::fs::create_dir_all(&real).unwrap();
        let base = tempdir().unwrap();
        let id = RepoId::try_new("ffffffffffffffffffffffff").unwrap();
        {
            let _ = Storage::open_at(id.clone(), &real, base.path()).unwrap();
        }
        std::fs::remove_dir_all(&real).unwrap();
        let other = outer.path().join("other_repo");
        std::fs::create_dir_all(&other).unwrap();
        match Storage::open_at(id, &other, base.path()) {
            Ok(_) => panic!("must fail closed when stored path can't canonicalize"),
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("RepoId collision"),
                    "expected collision error, got: {msg}"
                );
            }
        }
    }

    #[test]
    fn open_at_distinct_repos_still_collide() {
        let outer = tempdir().unwrap();
        let repo_a = outer.path().join("repo_a");
        let repo_b = outer.path().join("repo_b");
        std::fs::create_dir_all(&repo_a).unwrap();
        std::fs::create_dir_all(&repo_b).unwrap();
        let base = tempdir().unwrap();
        let id = RepoId::try_new("0123456789abcdef01234567").unwrap();
        {
            let _ = Storage::open_at(id.clone(), &repo_a, base.path()).unwrap();
        }
        match Storage::open_at(id, &repo_b, base.path()) {
            Ok(_) => panic!("two distinct repos must surface collision error"),
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("RepoId collision"),
                    "expected collision error, got: {msg}"
                );
            }
        }
    }

    #[test]
    fn registry_open() {
        let dir = tempdir().unwrap();
        let _r = RegistryStorage::open(dir.path()).unwrap();
    }

    #[test]
    fn open_at_refuses_root_path_mismatch() {
        let dir = tempdir().unwrap();
        let id = RepoId::try_new("cccccccccccccccccccccccc").unwrap();
        let root_a = std::path::Path::new("/tmp/ckg-test-repo-A");
        let root_b = std::path::Path::new("/tmp/ckg-test-repo-B");
        {
            let _s = Storage::open_at(id.clone(), root_a, dir.path()).unwrap();
        }
        {
            let _s = Storage::open_at(id.clone(), root_a, dir.path()).unwrap();
        }
        let res = Storage::open_at(id, root_b, dir.path());
        match res {
            Ok(_) => panic!("expected collision error, got Ok"),
            Err(e) => assert!(
                e.to_string().contains("RepoId collision detected"),
                "expected collision error; got {e}"
            ),
        }
    }

    #[test]
    fn open_without_root_skips_verification() {
        let dir = tempdir().unwrap();
        let id = RepoId::try_new("dddddddddddddddddddddddd").unwrap();
        {
            let _s = Storage::open_at(
                id.clone(),
                std::path::Path::new("/tmp/some/root"),
                dir.path(),
            )
            .unwrap();
        }
        let _s = Storage::open_unverified(id, dir.path()).unwrap();
    }

    #[test]
    fn sweep_preserves_old_when_path_lacks_current_file() {
        let dir = tempdir().unwrap();
        let workspace = dir
            .path()
            .join("workspace_folders")
            .join("recovery_test_id");
        std::fs::create_dir_all(&workspace).unwrap();
        std::fs::write(workspace.join("LOG"), b"stale rocksdb log").unwrap();
        let old_path = workspace.with_extension("old");
        std::fs::create_dir_all(&old_path).unwrap();
        std::fs::write(old_path.join("CURRENT"), b"MANIFEST-000001\n").unwrap();
        std::fs::write(old_path.join("MANIFEST-000001"), b"").unwrap();
        sweep_rebuild_debris(&workspace);
        assert!(
            old_path.exists(),
            "sweep destroyed the recovery dir at {} despite <path> being unhealthy",
            old_path.display()
        );
        assert!(
            old_path.join("CURRENT").is_file(),
            "recovery dir contents lost"
        );
    }

    #[test]
    fn sweep_removes_old_when_path_has_current() {
        let dir = tempdir().unwrap();
        let workspace = dir.path().join("workspace_folders").join("healthy_path");
        std::fs::create_dir_all(&workspace).unwrap();
        std::fs::write(workspace.join("CURRENT"), b"MANIFEST-000001\n").unwrap();
        let old_path = workspace.with_extension("old");
        std::fs::create_dir_all(&old_path).unwrap();
        std::fs::write(old_path.join("LOG"), b"stale").unwrap();
        sweep_rebuild_debris(&workspace);
        assert!(
            !old_path.exists(),
            "healthy <path> should have triggered .old sweep"
        );
    }

    #[test]
    fn open_at_stamp_then_verify_round_trip() {
        let dir = tempdir().unwrap();
        let id = RepoId::try_new("01101011001011001011001a").unwrap();
        let root = std::path::Path::new("/tmp/ckg-ni1-test");
        {
            let _ = Storage::open_at(id.clone(), root, dir.path()).unwrap();
        }
        {
            let _ = Storage::open_at(id.clone(), root, dir.path()).unwrap();
        }
        let mismatch_root = std::path::Path::new("/tmp/ckg-ni1-other");
        let res = Storage::open_at(id, mismatch_root, dir.path());
        match res {
            Ok(_) => panic!("should have refused mismatched root"),
            Err(e) => {
                let m = e.to_string();
                assert!(
                    m.contains("RepoId collision detected"),
                    "expected collision error, got: {m}"
                );
            }
        }
    }

    #[test]
    fn resolve_cross_file_calls_rewrites_unique_bare_name() {
        let dir = tempdir().unwrap();
        let id = RepoId::try_new("eeeeeeeeeeeeeeeeeeeeeeee").unwrap();
        let st = Storage::open_unverified(id, dir.path()).unwrap();
        st.put_symbols(&[
            ckg_core::Symbol {
                id: "src_id".into(),
                qname: "Caller::go".into(),
                name: "go".into(),
                kind: Kind::Function,
                file: "a.rs".into(),
                line: 1,
                col: 0,
                is_public: true,
                doc: String::new(),
                hash: "h1".into(),
            },
            ckg_core::Symbol {
                id: "tgt_id".into(),
                qname: "Target::run".into(),
                name: "uniq_target".into(),
                kind: Kind::Function,
                file: "b.rs".into(),
                line: 1,
                col: 0,
                is_public: true,
                doc: String::new(),
                hash: "h2".into(),
            },
        ])
        .unwrap();
        st.put_edges(&[ckg_core::Edge {
            kind: EdgeKind::Calls,
            src: "src_id".into(),
            dst: "uniq_target".into(),
            confidence: 0.5,
        }])
        .unwrap();
        let rewritten = st.resolve_cross_file_calls().unwrap();
        assert_eq!(rewritten, 1, "expected exactly one rewrite");
        let rows = st
            .run_immutable("?[s, d] := *Calls{src: s, dst: d}")
            .unwrap();
        // Check every row, not just the first. Results come back sorted, and
        // "tgt_id" sorts before "uniq_target", so `first()` alone would hide
        // a leftover unresolved edge.
        let dsts: Vec<String> = rows
            .rows
            .iter()
            .filter_map(|r| match r.get(1) {
                Some(DataValue::Str(s)) => Some(s.to_string()),
                _ => None,
            })
            .collect();
        assert!(
            dsts.iter().any(|d| d == "tgt_id"),
            "dst should rewrite to target id; got {dsts:?}"
        );
        assert!(
            !dsts.iter().any(|d| d == "uniq_target"),
            "bare-name edge must not survive the rewrite; got {dsts:?}"
        );
    }
}