use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use crate::atomic::{cleanup_orphan_temps, write_atomic};
use crate::blob_store::BlobStore;
use crate::clock::now_unix;
use crate::commit::CommitStore;
use crate::encryption::{EncryptionConfig, EncryptionRuntime};
use crate::hash::Hash;
use crate::index::SearchIndexStore;
use crate::lock::acquire_lock;
use crate::manifest::ManifestStore;
use crate::object_store::ObjectStore;
use crate::refs::RefsStore;
use crate::state::StateStore;
use crate::wal::{Wal, WalRecoveryReport};
const DB_CONFIG_SCHEMA_VERSION: u32 = 3;
const DEFAULT_CAS_RETRIES: usize = 16;
#[derive(Debug, Clone)]
pub struct Database {
pub root: PathBuf,
pub blob_store: BlobStore,
pub object_store: ObjectStore,
pub manifest_store: ManifestStore,
pub state_store: StateStore,
pub commit_store: CommitStore,
pub index_store: SearchIndexStore,
pub refs: RefsStore,
pub wal: Wal,
pub config: Config,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Config {
pub schema_version: u32,
pub hashing: String,
pub created_at: u64,
pub verify_on_read: bool,
#[serde(default)]
pub compression: Option<String>,
#[serde(default)]
pub encryption: Option<EncryptionConfig>,
}
impl Database {
pub fn init(path: impl AsRef<Path>) -> Result<()> {
let root = path.as_ref();
fs::create_dir_all(root).with_context(|| format!("failed creating {}", root.display()))?;
fs::create_dir_all(root.join("blobs"))?;
fs::create_dir_all(root.join("objects"))?;
fs::create_dir_all(root.join("refs").join("heads"))?;
fs::create_dir_all(root.join("refs").join("states"))?;
fs::create_dir_all(root.join("index"))?;
fs::create_dir_all(root.join("wal"))?;
fs::create_dir_all(root.join("meta"))?;
let cfg_path = root.join("meta").join("config.json");
if !cfg_path.exists() {
let cfg = Config {
schema_version: DB_CONFIG_SCHEMA_VERSION,
hashing: "blake3".into(),
created_at: now_unix()?,
verify_on_read: false,
compression: None,
encryption: None,
};
let bytes = serde_json::to_vec_pretty(&cfg)?;
write_atomic(&cfg_path, &bytes)?;
}
let db = Self::open(root)?;
let _ = db.state_store.empty_root()?;
Ok(())
}
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let root = path.as_ref().to_path_buf();
let cfg_path = root.join("meta").join("config.json");
if !cfg_path.exists() {
return Err(anyhow::anyhow!(
"database not initialized at {}",
root.display()
));
}
let _recovery_lock = acquire_lock(
root.join("meta").join("recovery.lock"),
Duration::from_secs(10),
)?;
let mut config = load_config(&cfg_path)?;
let mut config_dirty = false;
if let Some(enc) = config.encryption.as_mut()
&& enc.enabled
&& crate::encryption::ensure_master_salt(enc)?
{
config_dirty = true;
}
if config_dirty {
let bytes = serde_json::to_vec_pretty(&config)?;
write_atomic(&cfg_path, &bytes)?;
}
let encryption = build_encryption_runtime(&config)?;
let compress = config.compression.as_deref() == Some("zstd");
let blob_store = BlobStore::with_runtime_options(
root.join("blobs"),
config.verify_on_read,
compress,
encryption.clone(),
);
let object_store = ObjectStore::with_runtime_options(
root.join("objects"),
config.verify_on_read,
compress,
encryption.clone(),
);
let wal = Wal::new(root.join("wal"));
let refs = RefsStore::new(root.join("refs"), wal.clone());
let state_store = StateStore::new(object_store.clone(), blob_store.clone(), wal.clone());
let manifest_store = ManifestStore::new(object_store.clone());
let commit_store = CommitStore::new(object_store.clone());
let index_store =
SearchIndexStore::with_encryption(root.join("index"), encryption.clone());
blob_store.ensure_dir()?;
object_store.ensure_dir()?;
wal.ensure_dir()?;
refs.ensure_dirs()?;
index_store.ensure_dir()?;
let _report: WalRecoveryReport = wal.recover_refs(refs.root())?;
let _ = cleanup_orphan_temps(&root.join("blobs"), true)?;
let _ = cleanup_orphan_temps(&root.join("objects"), true)?;
let _ = cleanup_orphan_temps(&root.join("refs").join("heads"), true)?;
let _ = cleanup_orphan_temps(&root.join("refs").join("states"), true)?;
let _ = cleanup_orphan_temps(&root.join("meta"), false)?;
Ok(Self {
root,
blob_store,
object_store,
manifest_store,
state_store,
commit_store,
index_store,
refs,
wal,
config,
})
}
pub fn resolve_state_root(&self, head: &str) -> Result<Hash> {
if let Some(staged) = self.refs.state_get(head)? {
return Ok(staged);
}
if let Some(commit_hash) = self.refs.head_get(head)? {
let commit = self.commit_store.get_commit(commit_hash)?;
return Ok(commit.state_root);
}
self.state_store.empty_root()
}
pub fn state_set_at_head(&self, head: &str, key: &[u8], value: &[u8]) -> Result<Hash> {
self.apply_state_update_with_cas(head, |base_root| {
self.state_store.set(base_root, key, value)
})
}
pub fn state_del_at_head(&self, head: &str, key: &[u8]) -> Result<Hash> {
self.apply_state_update_with_cas(head, |base_root| self.state_store.del(base_root, key))
}
pub fn state_compact_at_head(&self, head: &str) -> Result<Hash> {
self.apply_state_update_with_cas(head, |base_root| self.state_store.compact(base_root))
}
pub fn state_set_many_at_head(&self, head: &str, pairs: &[(&[u8], &[u8])]) -> Result<Hash> {
self.apply_state_update_with_cas(head, |base_root| {
self.state_store.set_many(base_root, pairs)
})
}
pub fn state_del_many_at_head(&self, head: &str, keys: &[&[u8]]) -> Result<Hash> {
self.apply_state_update_with_cas(head, |base_root| {
self.state_store.del_many(base_root, keys)
})
}
pub fn create_commit_at_head(
&self,
head: &str,
author: &str,
message: &str,
manifests: Vec<Hash>,
) -> Result<Hash> {
for _ in 0..DEFAULT_CAS_RETRIES {
let parent = self.refs.head_get(head)?;
let parents = parent.into_iter().collect::<Vec<_>>();
let staged_before = self.refs.state_get(head)?;
let state_root = match staged_before {
Some(s) => s,
None => match parent {
Some(commit_hash) => self.commit_store.get_commit(commit_hash)?.state_root,
None => self.state_store.empty_root()?,
},
};
let candidate = self.commit_store.create_commit(
parents,
state_root,
manifests.clone(),
author.to_string(),
message.to_string(),
)?;
if !self
.refs
.state_compare_and_set(head, staged_before, state_root)?
{
let parent_now = self.refs.head_get(head)?;
if parent_now != parent {
continue;
}
return Err(anyhow::anyhow!(
"concurrent state advance on head '{}': staged state changed during commit; \
refusing to overwrite. Retry the commit against the updated state.",
head
));
}
if self.refs.head_compare_and_set(head, parent, candidate)? {
return Ok(candidate);
}
}
Err(anyhow::anyhow!(
"concurrent update contention while creating commit for '{}'",
head
))
}
pub fn ensure_index_ready(&self, commit: Hash) -> Result<()> {
if self.index_store.read_index(commit).is_ok() {
return Ok(());
}
let _ = self.index_store.build_for_head(
commit,
&self.commit_store,
&self.manifest_store,
&self.blob_store,
)?;
Ok(())
}
fn resolve_base_root_for_state_update(
&self,
head: &str,
expected_state: Option<Hash>,
) -> Result<Hash> {
if let Some(root) = expected_state {
return Ok(root);
}
if let Some(commit_hash) = self.refs.head_get(head)? {
let commit = self.commit_store.get_commit(commit_hash)?;
return Ok(commit.state_root);
}
self.state_store.empty_root()
}
fn apply_state_update_with_cas<F>(&self, head: &str, mut op: F) -> Result<Hash>
where
F: FnMut(Hash) -> Result<Hash>,
{
for _ in 0..DEFAULT_CAS_RETRIES {
let expected_state = self.refs.state_get(head)?;
let base_root = self.resolve_base_root_for_state_update(head, expected_state)?;
let new_root = op(base_root)?;
if self
.refs
.state_compare_and_set(head, expected_state, new_root)?
{
return Ok(new_root);
}
}
Err(anyhow::anyhow!(
"concurrent state update contention on head '{}', retry command",
head
))
}
pub fn rotate_encryption_key(&self, new_password: &str) -> Result<usize> {
let enc_config = self
.config
.encryption
.as_ref()
.filter(|e| e.enabled)
.ok_or_else(|| anyhow::anyhow!("encryption is not enabled; nothing to rotate"))?;
if new_password.is_empty() {
return Err(anyhow::anyhow!("new password cannot be empty"));
}
let old_runtime = self
.encryption_runtime()
.ok_or_else(|| {
anyhow::anyhow!(
"rotate_encryption_key needs the current runtime; \
set NELEUS_DB_ENCRYPTION_PASSWORD"
)
})?;
let new_runtime = Arc::new(EncryptionRuntime::from_config(
enc_config.clone(),
new_password.to_string(),
)?);
let _rotation_lock = acquire_lock(
self.root.join("meta").join("rotation.lock"),
Duration::from_secs(30),
)?;
let mut count = 0usize;
for dir in [self.root.join("blobs"), self.root.join("objects")] {
count += reencrypt_cas_dir(&dir, &old_runtime, &new_runtime)?;
}
let cfg_path = self.root.join("meta").join("config.json");
let bytes = serde_json::to_vec_pretty(&self.config)?;
crate::atomic::write_atomic(&cfg_path, &bytes)?;
Ok(count)
}
fn encryption_runtime(&self) -> Option<Arc<EncryptionRuntime>> {
let enc = self.config.encryption.as_ref().filter(|e| e.enabled)?;
let password = std::env::var("NELEUS_DB_ENCRYPTION_PASSWORD").ok()?;
EncryptionRuntime::from_config(enc.clone(), password)
.ok()
.map(Arc::new)
}
}
pub fn init(path: impl AsRef<Path>) -> Result<()> {
Database::init(path)
}
pub fn open(path: impl AsRef<Path>) -> Result<Database> {
Database::open(path)
}
fn load_config(cfg_path: &Path) -> Result<Config> {
let raw = fs::read(cfg_path)
.with_context(|| format!("failed to read config {}", cfg_path.display()))?;
let cfg: Config = serde_json::from_slice(&raw)
.with_context(|| format!("failed to parse config {}", cfg_path.display()))?;
if cfg.schema_version != DB_CONFIG_SCHEMA_VERSION {
return Err(anyhow::anyhow!(
"unsupported config schema_version {} (expected {})",
cfg.schema_version,
DB_CONFIG_SCHEMA_VERSION
));
}
Ok(cfg)
}
fn build_encryption_runtime(config: &Config) -> Result<Option<Arc<EncryptionRuntime>>> {
let Some(enc) = &config.encryption else {
return Ok(None);
};
if !enc.enabled {
return Ok(None);
}
let password = std::env::var("NELEUS_DB_ENCRYPTION_PASSWORD").with_context(
|| "encryption is enabled in config but NELEUS_DB_ENCRYPTION_PASSWORD is not set",
)?;
let runtime = EncryptionRuntime::from_config(enc.clone(), password)?;
Ok(Some(Arc::new(runtime)))
}
fn reencrypt_cas_dir(
dir: &Path,
old_runtime: &Arc<EncryptionRuntime>,
new_runtime: &Arc<EncryptionRuntime>,
) -> Result<usize> {
let mut count = 0usize;
let entries = match fs::read_dir(dir) {
Ok(e) => e,
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
Err(e) => return Err(e.into()),
};
for entry in entries {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
count += reencrypt_cas_dir(&path, old_runtime, new_runtime)?;
continue;
}
let is_content = path
.file_name()
.and_then(|n| n.to_str())
.map(|n| n.len() == 64 && n.chars().all(|c| c.is_ascii_hexdigit()))
.unwrap_or(false);
if !is_content {
continue;
}
let raw = fs::read(&path)?;
let plaintext = match old_runtime.decrypt(&raw) {
Ok(p) => p,
Err(_) => {
if new_runtime.decrypt(&raw).is_ok() {
continue;
}
return Err(anyhow::anyhow!(
"rotation aborted: {} decrypts with neither old nor new key (likely corrupted)",
path.display()
));
}
};
let new_ciphertext = new_runtime.encrypt(&plaintext)?;
crate::atomic::write_atomic(&path, &new_ciphertext)?;
count += 1;
}
Ok(count)
}
#[cfg(test)]
mod tests {
use std::fs;
use tempfile::TempDir;
use super::*;
use crate::hash::hash_blob;
use crate::wal::{WalEntry, WalOp, WalPayload};
#[test]
fn init_creates_expected_layout() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
assert!(db_root.join("blobs").exists());
assert!(db_root.join("objects").exists());
assert!(db_root.join("refs").join("heads").exists());
assert!(db_root.join("index").exists());
assert!(db_root.join("wal").exists());
assert!(db_root.join("meta").join("config.json").exists());
}
#[test]
fn open_after_init_works() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let db = Database::open(&db_root).unwrap();
assert_eq!(db.root, db_root);
}
#[test]
fn open_fails_without_init() {
let tmp = TempDir::new().unwrap();
assert!(Database::open(tmp.path()).is_err());
}
#[test]
fn init_is_idempotent() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
Database::init(&db_root).unwrap();
assert!(db_root.join("meta").join("config.json").exists());
}
#[test]
fn config_is_valid_json() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let raw = fs::read(db_root.join("meta").join("config.json")).unwrap();
let v: serde_json::Value = serde_json::from_slice(&raw).unwrap();
assert_eq!(v["hashing"], "blake3");
assert_eq!(v["schema_version"], DB_CONFIG_SCHEMA_VERSION);
}
#[test]
fn interrupted_temp_write_does_not_corrupt_refs() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let db = Database::open(&db_root).unwrap();
let stable = hash_blob(b"stable-commit");
db.refs.head_set("main", stable).unwrap();
let tmp_ref = db_root
.join("refs")
.join("heads")
.join(".main.tmp-crash-simulated");
fs::write(&tmp_ref, format!("{}\n", hash_blob(b"partial-commit"))).unwrap();
let reopened = Database::open(&db_root).unwrap();
let head = reopened.refs.head_get("main").unwrap();
assert_eq!(head, Some(stable));
}
#[test]
fn wal_recovery_replays_pending_ref_update() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let wal = Wal::new(db_root.join("wal"));
let hash = hash_blob(b"recovered-commit");
let entry = WalEntry {
schema_version: 1,
op: WalOp::RefHeadSet,
payload: WalPayload::RefUpdate {
name: "main".into(),
hash,
},
};
let _p = wal.begin_entry(&entry).unwrap();
let db = Database::open(&db_root).unwrap();
assert_eq!(db.refs.head_get("main").unwrap(), Some(hash));
assert!(db.wal.pending().unwrap().is_empty());
}
#[test]
fn wal_recovery_rolls_back_bad_entries() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
fs::write(db_root.join("wal").join("bad.wal"), b"not-cbor").unwrap();
let db = Database::open(&db_root).unwrap();
assert!(db.wal.pending().unwrap().is_empty());
}
#[test]
fn high_level_state_set_and_get_work() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let db = Database::open(&db_root).unwrap();
let root = db.state_set_at_head("main", b"k", b"v").unwrap();
let read_root = db.resolve_state_root("main").unwrap();
assert_eq!(root, read_root);
assert_eq!(db.state_store.get(root, b"k").unwrap(), Some(b"v".to_vec()));
}
#[test]
fn open_cleans_orphan_temp_under_nested_ref_namespace() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let nested_dir = db_root.join("refs").join("heads").join("feature");
fs::create_dir_all(&nested_dir).unwrap();
let orphan = nested_dir.join(format!(".foo.tmp-{}-1-0", i32::MAX as u32));
fs::write(&orphan, b"partial").unwrap();
assert!(orphan.exists());
let _db = Database::open(&db_root).unwrap();
assert!(!orphan.exists(), "nested orphan temp survived open");
}
#[test]
fn high_level_commit_updates_head() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let db = Database::open(&db_root).unwrap();
let _ = db.state_set_at_head("main", b"k", b"v").unwrap();
let commit = db
.create_commit_at_head("main", "agent", "m1", vec![])
.unwrap();
assert_eq!(db.refs.head_get("main").unwrap(), Some(commit));
}
#[test]
fn commit_preserves_latest_staged_state_sequential() {
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let db = Database::open(&db_root).unwrap();
let s1 = db.state_set_at_head("main", b"k", b"v1").unwrap();
let s2 = db.state_set_at_head("main", b"k", b"v2").unwrap();
assert_ne!(s1, s2);
let commit = db
.create_commit_at_head("main", "agent", "m", vec![])
.unwrap();
assert_eq!(db.refs.state_get("main").unwrap(), Some(s2));
let c = db.commit_store.get_commit(commit).unwrap();
assert_eq!(c.state_root, s2);
assert_eq!(
db.state_store.get(c.state_root, b"k").unwrap(),
Some(b"v2".to_vec())
);
}
#[test]
fn concurrent_staged_writes_never_silently_rolled_back() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
Database::init(&db_root).unwrap();
let db = Arc::new(Database::open(&db_root).unwrap());
let _ = db.state_set_at_head("main", b"k", b"0").unwrap();
let _ = db
.create_commit_at_head("main", "init", "init", vec![])
.unwrap();
let stop = Arc::new(AtomicBool::new(false));
let writer_db = Arc::clone(&db);
let writer_stop = Arc::clone(&stop);
let writer = std::thread::spawn(move || {
for i in 1..=150u32 {
if writer_stop.load(Ordering::Relaxed) {
break;
}
let v = i.to_string();
let _ = writer_db.state_set_at_head("main", b"k", v.as_bytes());
}
});
let watcher_db = Arc::clone(&db);
let watcher_stop = Arc::clone(&stop);
let watcher = std::thread::spawn(move || {
let mut last_seen: Option<u32> = None;
let mut violations: Vec<(u32, u32)> = Vec::new();
while !watcher_stop.load(Ordering::Relaxed) {
let n = match watcher_db.refs.state_get("main") {
Ok(Some(root)) => match watcher_db.state_store.get(root, b"k") {
Ok(Some(val)) => std::str::from_utf8(&val)
.ok()
.and_then(|s| s.parse::<u32>().ok()),
_ => None,
},
_ => None,
};
if let Some(n) = n {
if let Some(prev) = last_seen
&& n < prev
{
violations.push((prev, n));
}
last_seen = Some(n);
}
}
violations
});
for _ in 0..150 {
let _ = db.create_commit_at_head("main", "agent", "m", vec![]);
}
stop.store(true, Ordering::Relaxed);
writer.join().unwrap();
let violations = watcher.join().unwrap();
assert!(
violations.is_empty(),
"state ref moved backwards (rollback bug): {:?}",
violations
);
}
fn encryption_test_lock() -> std::sync::MutexGuard<'static, ()> {
static M: std::sync::Mutex<()> = std::sync::Mutex::new(());
M.lock().unwrap_or_else(|e| e.into_inner())
}
fn init_encrypted_db(path: &Path, password: &str) -> Database {
use crate::encryption::EncryptionConfig;
fs::create_dir_all(path.join("blobs")).unwrap();
fs::create_dir_all(path.join("objects")).unwrap();
fs::create_dir_all(path.join("refs").join("heads")).unwrap();
fs::create_dir_all(path.join("refs").join("states")).unwrap();
fs::create_dir_all(path.join("index")).unwrap();
fs::create_dir_all(path.join("wal")).unwrap();
fs::create_dir_all(path.join("meta")).unwrap();
let cfg = Config {
schema_version: DB_CONFIG_SCHEMA_VERSION,
hashing: "blake3".into(),
created_at: 0,
verify_on_read: false,
compression: None,
encryption: Some(EncryptionConfig {
enabled: true,
algorithm: "aes-256-gcm".into(),
..EncryptionConfig::default()
}),
};
write_atomic(
&path.join("meta").join("config.json"),
&serde_json::to_vec_pretty(&cfg).unwrap(),
)
.unwrap();
unsafe { std::env::set_var("NELEUS_DB_ENCRYPTION_PASSWORD", password) };
let db = Database::open(path).unwrap();
let _ = db.state_store.empty_root().unwrap();
db
}
#[test]
fn search_index_is_encrypted_on_disk_when_encryption_enabled() {
use crate::manifest::{ChunkingSpec, DocManifest};
let _guard = encryption_test_lock();
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
let db = init_encrypted_db(&db_root, "index-test-password");
let needle = b"SECRET-NEEDLE-XYZZY-PLAINTEXT-CHUNK";
let chunk_hash = db.blob_store.put(needle).unwrap();
let original_hash = db.blob_store.put(needle).unwrap();
let doc = DocManifest {
schema_version: 1,
source: "test".into(),
created_at: 0,
chunking: ChunkingSpec {
method: "fixed".into(),
chunk_size: needle.len(),
overlap: 0,
},
chunks: vec![chunk_hash],
original: original_hash,
};
let manifest_hash = db.manifest_store.put_manifest(&doc).unwrap();
let _ = db.state_set_at_head("main", b"seed", b"v").unwrap();
let commit = db
.create_commit_at_head("main", "agent", "m", vec![manifest_hash])
.unwrap();
db.ensure_index_ready(commit).unwrap();
let path = db_root
.join("index")
.join(commit.to_string())
.join("search_index.cbor");
let raw = fs::read(&path).unwrap();
assert!(
!raw.windows(needle.len()).any(|w| w == needle),
"plaintext chunk text leaked into on-disk search index"
);
let parsed = db.index_store.read_index(commit).unwrap();
assert!(parsed.chunks.iter().any(|c| c.chunk_hash == chunk_hash));
}
#[test]
fn rotate_encryption_key_preserves_round_trip() {
let _guard = encryption_test_lock();
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
let db = init_encrypted_db(&db_root, "old-password");
let h = db.blob_store.put(b"secret-payload").unwrap();
assert_eq!(db.blob_store.get(h).unwrap(), b"secret-payload");
let rotated = db.rotate_encryption_key("new-strong-password").unwrap();
assert!(rotated > 0, "expected at least one file rotated");
drop(db);
unsafe {
std::env::set_var("NELEUS_DB_ENCRYPTION_PASSWORD", "new-strong-password")
};
let db = Database::open(&db_root).unwrap();
assert_eq!(db.blob_store.get(h).unwrap(), b"secret-payload");
}
#[test]
fn rotate_encryption_key_aborts_on_corruption() {
use crate::cas::CasStore;
let _guard = encryption_test_lock();
let tmp = TempDir::new().unwrap();
let db_root = tmp.path().join("neleus_db");
let db = init_encrypted_db(&db_root, "old-password");
let h = db.blob_store.put(b"victim").unwrap();
let blob_path = CasStore::new(db_root.join("blobs")).path_for(h);
fs::write(&blob_path, b"this is not a valid envelope").unwrap();
let err = db.rotate_encryption_key("new-password").unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("rotation aborted"),
"expected rotation-abort error, got: {msg}"
);
}
}