use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use blake2::{Blake2b512, Digest};
/// A single record as persisted by `ObjectStore`.
///
/// Objects are addressed by the hash of this struct's JSON serialization (or of
/// its ciphertext when a DEK is configured). serde emits fields in declaration
/// order under their Rust names, so reordering, renaming, or adding fields
/// changes the serialized bytes — and therefore the hash — of newly written objects.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
/// Record identifier.
pub id: String,
/// NOTE(review): presumably the name of the collection this record belongs to — confirm.
pub coll: String,
/// Sequence number; whether it is per-record or global is not visible here.
pub seq: u64,
/// Arbitrary JSON payload.
pub data: serde_json::Value,
/// NOTE(review): presumably the object hash of this record's previous version — confirm.
pub prev: Option<String>,
/// NOTE(review): presumably object hashes of the records that caused this one — confirm.
pub caused_by: Vec<String>,
/// Timestamp; NOTE(review): looks like Unix seconds as `f64` — confirm.
pub ts: f64,
/// Optional start of a validity interval; string format not visible here.
pub valid_from: Option<String>,
/// Optional end of a validity interval; string format not visible here.
pub valid_to: Option<String>,
/// Content hash of the stored object. `ObjectStore::write` clears it before
/// serializing so that, being empty, it is skipped and never part of its own
/// preimage; it is filled in after a write and when decoding on read.
#[serde(skip_serializing_if = "String::is_empty", default)]
pub hash: String,
}
/// A 256-bit data-encryption key for AES-256-GCM object encryption.
///
/// Note: no `Debug` derive, which keeps the raw key bytes out of debug output.
#[derive(Clone)]
pub struct Dek(pub [u8; 32]);
impl Dek {
    /// Derives a DEK as `SHA-256(tmk || salt)`.
    ///
    /// Changing this construction changes the resulting key, so objects
    /// encrypted under a key derived this way would no longer decrypt.
    pub fn from_tmk(tmk: &[u8; 32], salt: &[u8]) -> Self {
        use sha2::{Sha256, Digest as _};
        let mut hasher = Sha256::new();
        // Feed the master key first, then the salt — order is part of the derivation.
        for chunk in [&tmk[..], salt].iter() {
            hasher.update(*chunk);
        }
        let digest = hasher.finalize();
        let mut bytes = [0u8; 32];
        bytes.copy_from_slice(&digest[..32]);
        Self(bytes)
    }
}
/// Hex-encoded content address: the first 32 bytes of a BLAKE2b-512 digest.
///
/// This is a truncated BLAKE2b-512, not BLAKE2b-256. The two differ because the
/// output length is a BLAKE2 parameter. Existing object names depend on this exact form.
fn blake2b(data: &[u8]) -> String {
    let digest = Blake2b512::digest(data);
    let truncated = &digest[..32];
    hex::encode(truncated)
}
/// Reports whether the `NEDB_DAG_V3` environment variable opts into the
/// segment-store backend used by `ObjectStore::new`.
///
/// Accepts `1`, `true`, `on`, or `yes`, ignoring ASCII case and surrounding
/// whitespace. Returns `false` when the variable is unset, not valid Unicode,
/// or holds any other value.
fn dag_v3_enabled() -> bool {
    match std::env::var("NEDB_DAG_V3") {
        Ok(raw) => {
            let flag = raw.trim();
            ["1", "true", "on", "yes"]
                .iter()
                .any(|accepted| flag.eq_ignore_ascii_case(accepted))
        }
        Err(_) => false,
    }
}
/// Encrypts `data` with AES-256-GCM under `dek`.
///
/// Output layout is `nonce (12 bytes) || ciphertext`, where the ciphertext is
/// exactly what `Aead::encrypt` returns. A fresh nonce is drawn from the OS RNG
/// on every call, so encrypting the same plaintext twice yields different bytes.
fn encrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
    use aes_gcm::{Aes256Gcm, KeyInit, aead::{Aead, OsRng, rand_core::RngCore}};
    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;

    let mut iv = [0u8; 12];
    OsRng.fill_bytes(&mut iv);

    let sealed = cipher
        .encrypt(&aes_gcm::Nonce::from(iv), data)
        .map_err(|e| anyhow::anyhow!("encrypt failed: {:?}", e))?;

    let mut framed = Vec::with_capacity(iv.len() + sealed.len());
    framed.extend_from_slice(&iv);
    framed.extend_from_slice(&sealed);
    Ok(framed)
}
/// Reverses `encrypt`: splits off the 12-byte nonce prefix and
/// authenticates/decrypts the remainder with `dek`.
///
/// # Errors
/// Fails if `data` is shorter than a nonce, or if AES-GCM authentication
/// fails (wrong key or modified bytes).
fn decrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
    use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
    const NONCE_LEN: usize = 12;

    if data.len() < NONCE_LEN {
        bail!("ciphertext too short");
    }
    let (iv, sealed) = data.split_at(NONCE_LEN);
    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
    // `iv` is exactly NONCE_LEN bytes thanks to the length check above.
    cipher
        .decrypt(aes_gcm::Nonce::from_slice(iv), sealed)
        .map_err(|e| anyhow::anyhow!("decrypt failed: {:?}", e))
}
/// Content-addressed store for serialized `Node`s, keyed by the hex hash of the
/// stored bytes.
///
/// Writes go to exactly one backend, chosen in this order: the in-memory map
/// (`mem`), the segment store (`seg`), then loose files under `root`. Reads on a
/// segment-backed store fall back to loose files for objects not found in segments.
pub struct ObjectStore {
/// `<db_root>/objects` on disk, or the placeholder `":memory:"` for in-memory stores.
root: PathBuf,
/// Data-encryption key; when set, objects are AES-256-GCM encrypted before hashing.
dek: Option<Dek>,
/// Backing map for in-memory stores (`ObjectStore::in_memory`); `None` on disk.
mem: Option<Arc<dashmap::DashMap<String, Vec<u8>>>>,
/// Segment-file backend, present only when `NEDB_DAG_V3` was enabled at construction.
seg: Option<crate::segment::SegmentStore>,
}
impl ObjectStore {
    /// Opens (creating if needed) the object store under `<db_root>/objects`.
    ///
    /// When the `NEDB_DAG_V3` environment flag is enabled, new objects go to a
    /// `SegmentStore` opened in that directory. Otherwise each object is a loose
    /// file at `objects/<hash[..2]>/<hash[2..]>`.
    ///
    /// # Errors
    /// Fails if the directory cannot be created or the segment store cannot be opened.
    pub fn new(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
        let root = db_root.join("objects");
        fs::create_dir_all(&root).context("create objects/ dir")?;
        let seg = if dag_v3_enabled() {
            Some(crate::segment::SegmentStore::open(&root)?)
        } else {
            None
        };
        Ok(Self { root, dek, mem: None, seg })
    }

    /// Creates an unencrypted store backed purely by an in-process map.
    pub fn in_memory() -> Self {
        Self {
            root: PathBuf::from(":memory:"),
            dek: None,
            mem: Some(Arc::new(dashmap::DashMap::new())),
            seg: None,
        }
    }

    /// Serializes `node`, encrypts it if a DEK is set, and stores it under the
    /// hex hash of the stored bytes. On success `node.hash` is set to that hash,
    /// which is also returned.
    ///
    /// Without a DEK, rewriting an identical node yields the same hash and is a
    /// no-op. With a DEK, every write uses a fresh random nonce, so the same node
    /// gets a new hash each time.
    ///
    /// # Errors
    /// Serialization, encryption, or backend I/O failures. On error `node.hash`
    /// is restored to its previous value.
    pub fn write(&self, node: &mut Node) -> Result<String> {
        // The hash must not be part of its own preimage. Clearing it makes
        // `skip_serializing_if` drop the field.
        let previous = std::mem::take(&mut node.hash);
        match self.persist(node) {
            Ok(hash) => {
                node.hash = hash.clone();
                Ok(hash)
            }
            Err(e) => {
                node.hash = previous;
                Err(e)
            }
        }
    }

    /// Serializes, optionally encrypts, and stores `node` (whose `hash` is empty);
    /// returns the content hash.
    fn persist(&self, node: &Node) -> Result<String> {
        let raw = serde_json::to_vec(node)?;
        let content = match &self.dek {
            Some(dek) => encrypt(&raw, dek)?,
            None => raw,
        };
        let hash = blake2b(&content);
        if let Some(ref mem) = self.mem {
            mem.entry(hash.clone()).or_insert_with(|| content);
        } else if let Some(ref seg) = self.seg {
            seg.put(&hash, &content)?;
        } else {
            self.write_loose(&hash, &content)?;
        }
        Ok(hash)
    }

    /// Publishes `content` as the loose file for `hash`, unless that file already exists.
    ///
    /// Bytes go to a uniquely named `*.tmp` sibling, which is then renamed into
    /// place. Readers therefore never see a half-written object, and concurrent
    /// writers of the same object never share (and clobber) one temp file.
    fn write_loose(&self, hash: &str, content: &[u8]) -> Result<()> {
        use std::sync::atomic::{AtomicU64, Ordering};
        // Only uniqueness of the suffix matters, so Relaxed ordering suffices.
        static TMP_SEQ: AtomicU64 = AtomicU64::new(0);

        let dir = self.root.join(&hash[..2]);
        let path = dir.join(&hash[2..]);
        if path.exists() {
            // Content-addressed: an existing file is assumed to hold these bytes.
            return Ok(());
        }
        fs::create_dir_all(&dir)?;
        // The name ends in ".tmp" so `all_hashes` skips leftovers.
        let tmp = dir.join(format!(
            "{}.{}.{}.tmp",
            &hash[2..],
            std::process::id(),
            TMP_SEQ.fetch_add(1, Ordering::Relaxed)
        ));
        fs::write(&tmp, content).with_context(|| format!("write object {}", hash))?;
        match fs::rename(&tmp, &path) {
            Ok(()) => Ok(()),
            // Another writer may have published the same object first. Its
            // bytes are identical, so that counts as success. Temp-file removal
            // is best effort.
            Err(_) if path.exists() => {
                let _ = fs::remove_file(&tmp);
                Ok(())
            }
            Err(e) => {
                let _ = fs::remove_file(&tmp);
                Err(e).with_context(|| format!("publish object {}", hash))
            }
        }
    }

    /// Loads and decodes the object stored under `hash`.
    ///
    /// Lookup order: the in-memory map (in-memory stores only), then the
    /// segment store (if enabled), then loose files. Bytes read from segments
    /// or loose files are re-hashed and rejected if they do not match `hash`.
    ///
    /// # Errors
    /// Invalid hash (too short or not hex), missing object, hash mismatch,
    /// decryption failure, or JSON deserialization failure.
    pub fn read(&self, hash: &str) -> Result<Node> {
        if hash.len() < 3 {
            anyhow::bail!("invalid object hash (too short): {:?}", hash);
        }
        // Reject non-hex input before it is byte-sliced (which panics on a
        // non-ASCII boundary) or joined into a path. A `/` or `..` would
        // otherwise escape `objects/`.
        if !hash.bytes().all(|b| b.is_ascii_hexdigit()) {
            anyhow::bail!("invalid object hash (not hex): {:?}", hash);
        }
        if let Some(ref mem) = self.mem {
            let content = mem
                .get(hash)
                .map(|entry| entry.value().clone())
                .ok_or_else(|| anyhow::anyhow!("object not found in memory: {}", hash))?;
            return self.decode(content, hash);
        }
        if let Some(ref seg) = self.seg {
            if let Some(content) = seg.get(hash)? {
                Self::check_integrity(&content, hash)?;
                return self.decode(content, hash);
            }
        }
        // Loose-file fallback (also serves objects written before segments were enabled).
        let path = self.root.join(&hash[..2]).join(&hash[2..]);
        let content = fs::read(&path).with_context(|| format!("read object {}", hash))?;
        Self::check_integrity(&content, hash)?;
        self.decode(content, hash)
    }

    /// Fails if `content` does not hash to `hash` (bit rot or tampering).
    fn check_integrity(content: &[u8], hash: &str) -> Result<()> {
        let actual = blake2b(content);
        if actual != hash {
            bail!("object {} tampered: expected {} got {}", hash, hash, actual);
        }
        Ok(())
    }

    /// Decrypts `content` if a DEK is configured, then deserializes it.
    /// Fills in `node.hash` with `hash` when the stored JSON carried none.
    fn decode(&self, content: Vec<u8>, hash: &str) -> Result<Node> {
        let raw = match &self.dek {
            Some(dek) => decrypt(&content, dek)?,
            None => content,
        };
        let mut node: Node = serde_json::from_slice(&raw).context("deserialize node")?;
        if node.hash.is_empty() {
            node.hash = hash.to_string();
        }
        Ok(node)
    }

    /// Enumerates every stored object hash, in no particular order.
    ///
    /// For segment-backed stores this is the union of segment contents and any
    /// loose files, with duplicates removed. Unreadable directories are skipped.
    pub fn all_hashes(&self) -> Box<dyn Iterator<Item = String> + '_> {
        if let Some(ref mem) = self.mem {
            let hashes: Vec<String> = mem.iter().map(|e| e.key().clone()).collect();
            return Box::new(hashes.into_iter());
        }
        if let Some(ref seg) = self.seg {
            let mut seen: std::collections::HashSet<String> =
                seg.all_hashes().into_iter().collect();
            seen.extend(Self::loose_hashes(&self.root));
            return Box::new(seen.into_iter());
        }
        Box::new(Self::loose_hashes(&self.root))
    }

    /// Lazily lists loose objects under `root`. These are the files inside
    /// two-character prefix directories, skipping in-flight `*.tmp` files.
    fn loose_hashes(root: &Path) -> impl Iterator<Item = String> {
        fs::read_dir(root)
            .into_iter()
            .flatten()
            .filter_map(|entry| entry.ok())
            .filter(|entry| entry.file_type().map(|t| t.is_dir()).unwrap_or(false))
            .filter_map(|dir| {
                let prefix = dir.file_name().to_string_lossy().into_owned();
                (prefix.len() == 2).then(|| (prefix, dir.path()))
            })
            .flat_map(|(prefix, dir_path)| {
                fs::read_dir(dir_path)
                    .into_iter()
                    .flatten()
                    .filter_map(|entry| entry.ok())
                    .filter_map(move |entry| {
                        let name = entry.file_name().to_string_lossy().into_owned();
                        if name.ends_with(".tmp") {
                            None
                        } else {
                            Some(format!("{}{}", prefix, name))
                        }
                    })
            })
    }

    /// Flushes the segment store, if any. Loose files and the in-memory map
    /// have nothing to flush here.
    pub fn sync(&self) -> Result<()> {
        if let Some(ref seg) = self.seg {
            seg.sync()?;
        }
        Ok(())
    }

    /// Delegates compaction to the segment store, keeping the hashes in `live`.
    /// Without a segment store this is a no-op returning default stats.
    pub fn compact(
        &self,
        live: &std::collections::HashSet<String>,
    ) -> Result<crate::segment::CompactStats> {
        match self.seg {
            Some(ref seg) => seg.compact(live),
            None => Ok(crate::segment::CompactStats::default()),
        }
    }

    /// Reads every object from `all_hashes` in parallel.
    ///
    /// Returns the number of objects that read back cleanly, plus the hashes
    /// that `read` rejected (missing, hash mismatch, undecryptable, or undeserializable).
    pub fn verify_all(&self) -> (usize, Vec<String>) {
        use rayon::prelude::*;
        let hashes: Vec<String> = self.all_hashes().collect();
        let results: Vec<(bool, String)> = hashes
            .par_iter()
            .map(|h| (self.read(h).is_ok(), h.clone()))
            .collect();
        let ok = results.iter().filter(|(ok, _)| *ok).count();
        let bad: Vec<String> = results
            .into_iter()
            .filter(|(ok, _)| !*ok)
            .map(|(_, h)| h)
            .collect();
        (ok, bad)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a small deterministic node; `seq` also appears in the JSON payload.
    fn make_node(id: &str, coll: &str, seq: u64) -> Node {
        Node {
            id: id.into(),
            coll: coll.into(),
            seq,
            data: serde_json::json!({"height": seq, "hash": "0000abc"}),
            prev: None,
            caused_by: Vec::new(),
            ts: 1718400000.0,
            valid_from: None,
            valid_to: None,
            hash: String::new(),
        }
    }

    /// Opens an unencrypted on-disk store in a fresh temp dir. The returned
    /// `TempDir` guard must stay alive while the store is used.
    fn fresh_store() -> (tempfile::TempDir, ObjectStore) {
        let tmp = tempdir().unwrap();
        let store = ObjectStore::new(tmp.path(), None).unwrap();
        (tmp, store)
    }

    #[test]
    fn write_read_roundtrip() {
        let (_tmp, store) = fresh_store();
        let mut original = make_node("1", "blocks", 1);
        let addr = store.write(&mut original).unwrap();
        // 32 digest bytes, hex-encoded.
        assert_eq!(addr.len(), 64);
        let loaded = store.read(&addr).unwrap();
        assert_eq!(loaded.id, "1");
        assert_eq!(loaded.coll, "blocks");
    }

    #[test]
    fn write_is_idempotent() {
        let (_tmp, store) = fresh_store();
        let mut node = make_node("1", "blocks", 1);
        let first = store.write(&mut node).unwrap();
        let second = store.write(&mut node).unwrap();
        assert_eq!(first, second);
    }

    /// Flips one byte of a loose object file and expects `read` to reject it.
    ///
    /// Assumes the loose-file backend, i.e. `NEDB_DAG_V3` is not enabled in the
    /// test environment. Otherwise the object lives in a segment and the file
    /// path below does not exist.
    #[test]
    fn tamper_detected() {
        let (tmp, store) = fresh_store();
        let mut node = make_node("1", "blocks", 1);
        let addr = store.write(&mut node).unwrap();
        let object_path = tmp.path().join("objects").join(&addr[..2]).join(&addr[2..]);
        let mut bytes = fs::read(&object_path).unwrap();
        bytes[10] ^= 0xff;
        fs::write(&object_path, bytes).unwrap();
        assert!(store.read(&addr).is_err());
    }
}