1use std::fs;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use anyhow::{bail, Context, Result};
16use serde::{Deserialize, Serialize};
17use blake2::{Blake2b512, Digest};
18
/// A single record version as persisted by [`ObjectStore`].
///
/// `ObjectStore::write` serializes this struct to JSON and may encrypt the result.
/// It then addresses the stored bytes by their BLAKE2b hash. Serde emits fields
/// in the order declared here. Reordering, renaming or adding serialized fields
/// therefore changes the hash of every newly written node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// Record identifier; its uniqueness scope is defined by callers (presumably per `coll` — confirm).
    pub id: String,
    /// Collection name (the tests use `"blocks"`).
    pub coll: String,
    /// Sequence number. NOTE(review): presumably increases per record — confirm with callers.
    pub seq: u64,
    /// Arbitrary JSON payload.
    pub data: serde_json::Value,
    /// Presumably the object hash of the previous version, if any — TODO confirm.
    pub prev: Option<String>,
    /// Presumably the object hashes of nodes that caused this one — TODO confirm.
    pub caused_by: Vec<String>,
    /// Timestamp. The tests use Unix-epoch seconds (`1718400000.0`); confirm the units with callers.
    pub ts: f64,
    /// Start of the validity interval; its format is not checked here.
    pub valid_from: Option<String>,
    /// End of the validity interval; its format is not checked here.
    pub valid_to: Option<String>,
    /// Content hash of this node.
    ///
    /// An empty value is omitted when serializing, so the hash is never part of
    /// the hashed bytes. A missing value deserializes as empty. `ObjectStore`
    /// fills it in after a write or read.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    pub hash: String,
}
44
/// A 32-byte data-encryption key, used as the AES-256-GCM key by `encrypt` and `decrypt`.
///
/// It intentionally (presumably) does not derive `Debug`, which keeps key bytes
/// out of debug output. NOTE(review): no `Drop` impl is visible here, so the key
/// bytes are not zeroed on drop.
#[derive(Clone)]
pub struct Dek(pub [u8; 32]);
49
50impl Dek {
51 pub fn from_tmk(tmk: &[u8; 32], salt: &[u8]) -> Self {
52 use sha2::{Sha256, Digest as _};
53 let mut h = Sha256::new();
54 h.update(tmk);
55 h.update(salt);
56 let result = h.finalize();
57 let mut key = [0u8; 32];
58 key.copy_from_slice(&result[..32]);
59 Dek(key)
60 }
61}
62
63fn blake2b(data: &[u8]) -> String {
64 let mut h = Blake2b512::new();
65 h.update(data);
66 hex::encode(&h.finalize()[..32]) }
68
/// Reports whether the segment-based object store ("DAG v3") is switched on.
///
/// It reads the `NEDB_DAG_V3` environment variable and trims surrounding
/// whitespace. The values `1`, `true`, `on` and `yes` enable it, the last three
/// in any ASCII case. Anything else disables it, as do an unset variable or a
/// non-UTF-8 value.
fn dag_v3_enabled() -> bool {
    match std::env::var("NEDB_DAG_V3") {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            matches!(flag.as_str(), "1" | "true" | "on" | "yes")
        }
        Err(_) => false,
    }
}
81
82fn encrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
83 use aes_gcm::{Aes256Gcm, KeyInit, aead::{Aead, OsRng, rand_core::RngCore}};
84 let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
85 let mut nonce_bytes = [0u8; 12];
86 OsRng.fill_bytes(&mut nonce_bytes);
87 let nonce = aes_gcm::Nonce::from(nonce_bytes);
88 let ciphertext = cipher.encrypt(&nonce, data)
89 .map_err(|e| anyhow::anyhow!("encrypt failed: {:?}", e))?;
90 let mut out = nonce_bytes.to_vec();
92 out.extend_from_slice(&ciphertext);
93 Ok(out)
94}
95
96fn decrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
97 use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
98 if data.len() < 12 { bail!("ciphertext too short"); }
99 let (nonce_bytes, ciphertext) = data.split_at(12);
100 let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
101 let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
102 cipher.decrypt(nonce, ciphertext)
103 .map_err(|e| anyhow::anyhow!("decrypt failed: {:?}", e))
104}
105
/// A content-addressed store for [`Node`] records.
///
/// Writes go to the first backend that is configured, in this order: the
/// in-memory map (`mem`), the segment store (`seg`), then loose files under
/// `root`. Loose files live at `<root>/<first 2 hash chars>/<remaining chars>`.
/// Reads on a segment-backed store fall back to loose files when the segment
/// store lacks the object.
pub struct ObjectStore {
    /// Directory of loose object files (`<db_root>/objects`), or `":memory:"` for in-memory stores.
    root: PathBuf,
    /// Optional data-encryption key. When set, objects are encrypted before they are hashed and stored.
    dek: Option<Dek>,
    /// Backing map for in-memory stores, keyed by object hash.
    mem: Option<Arc<dashmap::DashMap<String, Vec<u8>>>>,
    /// Segment store, opened only when `NEDB_DAG_V3` enables it.
    seg: Option<crate::segment::SegmentStore>,
}
116
117impl ObjectStore {
118 pub fn new(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
119 let root = db_root.join("objects");
120 fs::create_dir_all(&root)
121 .context("create objects/ dir")?;
122 let seg = if dag_v3_enabled() {
125 Some(crate::segment::SegmentStore::open(&root)?)
126 } else {
127 None
128 };
129 Ok(Self { root, dek, mem: None, seg })
130 }
131
132 pub fn in_memory() -> Self {
134 Self {
135 root: PathBuf::from(":memory:"),
136 dek: None,
137 mem: Some(Arc::new(dashmap::DashMap::new())),
138 seg: None,
139 }
140 }
141
142 pub fn write(&self, node: &mut Node) -> Result<String> {
144 node.hash = String::new();
152 let raw = serde_json::to_vec(node)?;
153 let content = match &self.dek {
154 Some(dek) => encrypt(&raw, dek)?,
155 None => raw,
156 };
157 let hash = blake2b(&content);
158
159 if let Some(ref mem) = self.mem {
160 mem.entry(hash.clone()).or_insert_with(|| content);
162 } else if let Some(ref seg) = self.seg {
163 seg.put(&hash, &content)?;
165 } else {
166 let dir = self.root.join(&hash[..2]);
168 fs::create_dir_all(&dir)?;
169 let path = dir.join(&hash[2..]);
170 if !path.exists() {
171 let tmp = path.with_extension("tmp");
172 fs::write(&tmp, &content)?;
173 fs::rename(&tmp, &path)?;
174 }
175 }
176 node.hash = hash.clone();
177 Ok(hash)
178 }
179
180 pub fn read(&self, hash: &str) -> Result<Node> {
182 if hash.len() < 3 {
183 anyhow::bail!("invalid object hash (too short): {:?}", hash);
184 }
185
186 if let Some(ref mem) = self.mem {
188 let content = mem.get(hash)
189 .map(|v| v.clone())
190 .ok_or_else(|| anyhow::anyhow!("object not found in memory: {}", hash))?;
191 return self.decode(content, hash);
192 }
193
194 if let Some(ref seg) = self.seg {
197 if let Some(content) = seg.get(hash)? {
198 return self.decode(content, hash);
199 }
200 }
202
203 let path = self.root.join(&hash[..2]).join(&hash[2..]);
205 let c = fs::read(&path).with_context(|| format!("read object {}", hash))?;
206 let actual = blake2b(&c);
208 if actual != hash {
209 bail!("object {} tampered: expected {} got {}", hash, hash, actual);
210 }
211 self.decode(c, hash)
212 }
213
214 fn decode(&self, content: Vec<u8>, hash: &str) -> Result<Node> {
218 let raw = match &self.dek {
219 Some(dek) => decrypt(&content, dek)?,
220 None => content,
221 };
222 let mut node: Node = serde_json::from_slice(&raw)
223 .context("deserialize node")?;
224 if node.hash.is_empty() {
225 node.hash = hash.to_string();
226 }
227 Ok(node)
228 }
229
230 pub fn all_hashes(&self) -> Box<dyn Iterator<Item = String> + '_> {
232 if let Some(ref mem) = self.mem {
234 let hashes: Vec<String> = mem.iter().map(|e| e.key().clone()).collect();
235 return Box::new(hashes.into_iter());
236 }
237
238 if let Some(ref seg) = self.seg {
241 let mut seen: std::collections::HashSet<String> =
242 seg.all_hashes().into_iter().collect();
243 if let Ok(rd) = fs::read_dir(&self.root) {
244 for prefix_dir in rd.flatten() {
245 if !prefix_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) { continue; }
246 let prefix = prefix_dir.file_name().to_string_lossy().to_string();
247 if prefix.len() != 2 { continue; } if let Ok(rd2) = fs::read_dir(prefix_dir.path()) {
249 for e in rd2.flatten() {
250 let name = e.file_name().to_string_lossy().to_string();
251 if name.ends_with(".tmp") { continue; }
252 seen.insert(format!("{}{}", prefix, name));
253 }
254 }
255 }
256 }
257 return Box::new(seen.into_iter());
258 }
259
260 let root = self.root.clone();
262 Box::new(fs::read_dir(&root)
263 .into_iter()
264 .flatten()
265 .filter_map(|e| e.ok())
266 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
267 .flat_map(move |prefix_dir| {
268 let prefix = prefix_dir.file_name().to_string_lossy().to_string();
269 fs::read_dir(prefix_dir.path())
270 .into_iter()
271 .flatten()
272 .filter_map(|e| e.ok())
273 .filter_map(move |e| {
274 let name = e.file_name().to_string_lossy().to_string();
275 if name.ends_with(".tmp") { return None; }
276 Some(format!("{}{}", prefix, name))
277 })
278 }))
279 }
280
281 pub fn sync(&self) -> Result<()> {
284 if let Some(ref seg) = self.seg {
285 seg.sync()?;
286 }
287 Ok(())
288 }
289
290 pub fn compact(&self, live: &std::collections::HashSet<String>) -> Result<crate::segment::CompactStats> {
294 match self.seg {
295 Some(ref seg) => seg.compact(live),
296 None => Ok(crate::segment::CompactStats::default()),
297 }
298 }
299
300 pub fn verify_all(&self) -> (usize, Vec<String>) {
302 use rayon::prelude::*;
303 let hashes: Vec<String> = self.all_hashes().collect();
304 let results: Vec<(bool, String)> = hashes.par_iter().map(|h| {
305 (self.read(h).is_ok(), h.clone())
306 }).collect();
307 let ok = results.iter().filter(|(ok, _)| *ok).count();
308 let bad: Vec<String> = results.into_iter()
309 .filter(|(ok, _)| !*ok)
310 .map(|(_, h)| h)
311 .collect();
312 (ok, bad)
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    fn make_node(id: &str, coll: &str, seq: u64) -> Node {
        Node {
            id: id.to_string(),
            coll: coll.to_string(),
            seq,
            data: serde_json::json!({"height": seq, "hash": "0000abc"}),
            prev: None,
            caused_by: vec![],
            ts: 1718400000.0,
            valid_from: None,
            valid_to: None,
            hash: String::new(),
        }
    }

    /// Path of the loose object file for `hash` in a store rooted at `db_root`.
    fn object_path(db_root: &Path, hash: &str) -> PathBuf {
        db_root.join("objects").join(&hash[..2]).join(&hash[2..])
    }

    /// Flips one byte of a stored file so its content no longer matches its hash.
    fn corrupt(path: &Path) {
        let mut content = fs::read(path).unwrap();
        content[10] ^= 0xff;
        fs::write(path, content).unwrap();
    }

    #[test]
    fn write_read_roundtrip() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        assert_eq!(hash.len(), 64);
        assert_eq!(node.hash, hash);
        let read_back = store.read(&hash).unwrap();
        assert_eq!(read_back.id, "1");
        assert_eq!(read_back.coll, "blocks");
        assert_eq!(read_back.hash, hash);
    }

    #[test]
    fn write_is_idempotent() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let h1 = store.write(&mut node).unwrap();
        let h2 = store.write(&mut node).unwrap();
        assert_eq!(h1, h2);
    }

    #[test]
    fn tamper_detected() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        corrupt(&object_path(dir.path(), &hash));
        assert!(store.read(&hash).is_err());
    }

    #[test]
    fn in_memory_roundtrip_and_listing() {
        let store = ObjectStore::in_memory();
        let mut node = make_node("7", "blocks", 7);
        let hash = store.write(&mut node).unwrap();
        assert_eq!(store.read(&hash).unwrap().seq, 7);
        let listed: Vec<String> = store.all_hashes().collect();
        assert_eq!(listed, vec![hash]);
        // A well-formed but unknown hash is an error, not a panic.
        assert!(store.read(&"0".repeat(64)).is_err());
    }

    #[test]
    fn encrypted_roundtrip_and_wrong_key() {
        let dir = tempdir().unwrap();
        let store =
            ObjectStore::new(dir.path(), Some(Dek::from_tmk(&[7u8; 32], b"salt"))).unwrap();
        let mut node = make_node("2", "blocks", 2);
        let hash = store.write(&mut node).unwrap();
        let back = store.read(&hash).unwrap();
        assert_eq!(back.id, "2");
        assert_eq!(back.data, node.data);
        drop(store);

        // The ciphertext still hashes correctly, but GCM authentication must fail.
        let other =
            ObjectStore::new(dir.path(), Some(Dek::from_tmk(&[8u8; 32], b"salt"))).unwrap();
        assert!(other.read(&hash).is_err());
    }

    #[test]
    fn all_hashes_skips_tmp_files() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        let prefix_dir = dir.path().join("objects").join(&hash[..2]);
        fs::write(prefix_dir.join("leftover.tmp"), b"junk").unwrap();
        let listed: Vec<String> = store.all_hashes().collect();
        assert_eq!(listed, vec![hash]);
    }

    #[test]
    fn verify_all_reports_tampered_objects() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let good = store.write(&mut make_node("1", "blocks", 1)).unwrap();
        let bad = store.write(&mut make_node("2", "blocks", 2)).unwrap();
        corrupt(&object_path(dir.path(), &bad));
        let (ok, failed) = store.verify_all();
        assert_eq!(ok, 1);
        assert_eq!(failed, vec![bad]);
        assert!(store.read(&good).is_ok());
    }
}