1use std::fs;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use anyhow::{bail, Context, Result};
16use serde::{Deserialize, Serialize};
17use blake2::{Blake2b512, Digest};
18
/// A record stored in the content-addressed [`ObjectStore`].
///
/// The store persists the JSON serialization of this struct (optionally
/// encrypted), so the field order and serde attributes below determine the
/// stored bytes and therefore the object hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// NOTE(review): presumably the record id within `coll` — confirm.
    pub id: String,
    /// Collection name (the tests use "blocks").
    pub coll: String,
    /// NOTE(review): presumably a per-record sequence/version number — confirm.
    pub seq: u64,
    /// Arbitrary JSON payload.
    pub data: serde_json::Value,
    /// NOTE(review): presumably the hash of the previous version of this record — confirm.
    pub prev: Option<String>,
    /// NOTE(review): presumably hashes of nodes that caused this one — confirm.
    pub caused_by: Vec<String>,
    /// Timestamp; looks like Unix seconds (tests use 1718400000.0) — TODO confirm.
    pub ts: f64,
    /// NOTE(review): validity-interval start; format not visible here — confirm.
    pub valid_from: Option<String>,
    /// NOTE(review): validity-interval end; format not visible here — confirm.
    pub valid_to: Option<String>,
    /// Content address of this node.
    ///
    /// Omitted from the JSON when empty and defaults to empty when absent.
    /// `ObjectStore::write` sets it, and `ObjectStore::read` fills it in from
    /// the object's address when the stored JSON lacks it.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    pub hash: String,
}
44
45#[derive(Clone)]
48pub struct Dek(pub [u8; 32]);
49
50impl Dek {
51 pub fn from_tmk(tmk: &[u8; 32], salt: &[u8]) -> Self {
52 use sha2::{Sha256, Digest as _};
53 let mut h = Sha256::new();
54 h.update(tmk);
55 h.update(salt);
56 let result = h.finalize();
57 let mut key = [0u8; 32];
58 key.copy_from_slice(&result[..32]);
59 Dek(key)
60 }
61}
62
63fn blake2b(data: &[u8]) -> String {
64 let mut h = Blake2b512::new();
65 h.update(data);
66 hex::encode(&h.finalize()[..32]) }
68
/// Reports whether the `NEDB_DAG_V3` environment variable is set to a truthy value.
///
/// After trimming surrounding whitespace, the values `1`, `true`, `on` and
/// `yes` (ASCII case-insensitive) count as enabled. Anything else counts as
/// disabled, as does an unset or non-UTF-8 variable.
fn dag_v3_enabled() -> bool {
    match std::env::var("NEDB_DAG_V3") {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            matches!(flag.as_str(), "1" | "true" | "on" | "yes")
        }
        Err(_) => false,
    }
}
81
82fn encrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
83 use aes_gcm::{Aes256Gcm, KeyInit, aead::{Aead, OsRng, rand_core::RngCore}};
84 let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
85 let mut nonce_bytes = [0u8; 12];
86 OsRng.fill_bytes(&mut nonce_bytes);
87 let nonce = aes_gcm::Nonce::from(nonce_bytes);
88 let ciphertext = cipher.encrypt(&nonce, data)
89 .map_err(|e| anyhow::anyhow!("encrypt failed: {:?}", e))?;
90 let mut out = nonce_bytes.to_vec();
92 out.extend_from_slice(&ciphertext);
93 Ok(out)
94}
95
96fn decrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
97 use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
98 if data.len() < 12 { bail!("ciphertext too short"); }
99 let (nonce_bytes, ciphertext) = data.split_at(12);
100 let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
101 let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
102 cipher.decrypt(nonce, ciphertext)
103 .map_err(|e| anyhow::anyhow!("decrypt failed: {:?}", e))
104}
105
/// Content-addressed store for serialized [`Node`]s.
///
/// The backend is chosen at construction:
/// - an in-memory map (`mem`),
/// - a segment store (`seg`), or
/// - loose files at `root/<hash[..2]>/<hash[2..]>`.
pub struct ObjectStore {
    /// `<db_root>/objects` for on-disk stores (the segment store is also opened
    /// here); the placeholder `":memory:"` for in-memory stores.
    root: PathBuf,
    /// When set, objects are AES-256-GCM encrypted before being hashed and stored.
    dek: Option<Dek>,
    /// In-memory backend keyed by object hash; `Some` only for `ObjectStore::in_memory`.
    mem: Option<Arc<dashmap::DashMap<String, Vec<u8>>>>,
    /// Segment backend; `Some` when `NEDB_DAG_V3` was truthy at construction.
    seg: Option<crate::segment::SegmentStore>,
}
116
117impl ObjectStore {
118 pub fn new(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
119 let root = db_root.join("objects");
120 fs::create_dir_all(&root)
121 .context("create objects/ dir")?;
122 let seg = if dag_v3_enabled() {
125 Some(crate::segment::SegmentStore::open(&root)?)
126 } else {
127 None
128 };
129 Ok(Self { root, dek, mem: None, seg })
130 }
131
132 pub fn in_memory() -> Self {
134 Self {
135 root: PathBuf::from(":memory:"),
136 dek: None,
137 mem: Some(Arc::new(dashmap::DashMap::new())),
138 seg: None,
139 }
140 }
141
142 pub fn write(&self, node: &mut Node) -> Result<String> {
144 let raw = serde_json::to_vec(node)?;
145 let content = match &self.dek {
146 Some(dek) => encrypt(&raw, dek)?,
147 None => raw,
148 };
149 let hash = blake2b(&content);
150
151 if let Some(ref mem) = self.mem {
152 mem.entry(hash.clone()).or_insert_with(|| content);
154 } else if let Some(ref seg) = self.seg {
155 seg.put(&hash, &content)?;
157 } else {
158 let dir = self.root.join(&hash[..2]);
160 fs::create_dir_all(&dir)?;
161 let path = dir.join(&hash[2..]);
162 if !path.exists() {
163 let tmp = path.with_extension("tmp");
164 fs::write(&tmp, &content)?;
165 fs::rename(&tmp, &path)?;
166 }
167 }
168 node.hash = hash.clone();
169 Ok(hash)
170 }
171
172 pub fn read(&self, hash: &str) -> Result<Node> {
174 if hash.len() < 3 {
175 anyhow::bail!("invalid object hash (too short): {:?}", hash);
176 }
177
178 if let Some(ref mem) = self.mem {
180 let content = mem.get(hash)
181 .map(|v| v.clone())
182 .ok_or_else(|| anyhow::anyhow!("object not found in memory: {}", hash))?;
183 return self.decode(content, hash);
184 }
185
186 if let Some(ref seg) = self.seg {
189 if let Some(content) = seg.get(hash)? {
190 return self.decode(content, hash);
191 }
192 }
194
195 let path = self.root.join(&hash[..2]).join(&hash[2..]);
197 let c = fs::read(&path).with_context(|| format!("read object {}", hash))?;
198 let actual = blake2b(&c);
200 if actual != hash {
201 bail!("object {} tampered: expected {} got {}", hash, hash, actual);
202 }
203 self.decode(c, hash)
204 }
205
206 fn decode(&self, content: Vec<u8>, hash: &str) -> Result<Node> {
210 let raw = match &self.dek {
211 Some(dek) => decrypt(&content, dek)?,
212 None => content,
213 };
214 let mut node: Node = serde_json::from_slice(&raw)
215 .context("deserialize node")?;
216 if node.hash.is_empty() {
217 node.hash = hash.to_string();
218 }
219 Ok(node)
220 }
221
222 pub fn all_hashes(&self) -> Box<dyn Iterator<Item = String> + '_> {
224 if let Some(ref mem) = self.mem {
226 let hashes: Vec<String> = mem.iter().map(|e| e.key().clone()).collect();
227 return Box::new(hashes.into_iter());
228 }
229
230 if let Some(ref seg) = self.seg {
233 let mut seen: std::collections::HashSet<String> =
234 seg.all_hashes().into_iter().collect();
235 if let Ok(rd) = fs::read_dir(&self.root) {
236 for prefix_dir in rd.flatten() {
237 if !prefix_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) { continue; }
238 let prefix = prefix_dir.file_name().to_string_lossy().to_string();
239 if prefix.len() != 2 { continue; } if let Ok(rd2) = fs::read_dir(prefix_dir.path()) {
241 for e in rd2.flatten() {
242 let name = e.file_name().to_string_lossy().to_string();
243 if name.ends_with(".tmp") { continue; }
244 seen.insert(format!("{}{}", prefix, name));
245 }
246 }
247 }
248 }
249 return Box::new(seen.into_iter());
250 }
251
252 let root = self.root.clone();
254 Box::new(fs::read_dir(&root)
255 .into_iter()
256 .flatten()
257 .filter_map(|e| e.ok())
258 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
259 .flat_map(move |prefix_dir| {
260 let prefix = prefix_dir.file_name().to_string_lossy().to_string();
261 fs::read_dir(prefix_dir.path())
262 .into_iter()
263 .flatten()
264 .filter_map(|e| e.ok())
265 .filter_map(move |e| {
266 let name = e.file_name().to_string_lossy().to_string();
267 if name.ends_with(".tmp") { return None; }
268 Some(format!("{}{}", prefix, name))
269 })
270 }))
271 }
272
273 pub fn sync(&self) -> Result<()> {
276 if let Some(ref seg) = self.seg {
277 seg.sync()?;
278 }
279 Ok(())
280 }
281
282 pub fn compact(&self, live: &std::collections::HashSet<String>) -> Result<crate::segment::CompactStats> {
286 match self.seg {
287 Some(ref seg) => seg.compact(live),
288 None => Ok(crate::segment::CompactStats::default()),
289 }
290 }
291
292 pub fn verify_all(&self) -> (usize, Vec<String>) {
294 use rayon::prelude::*;
295 let hashes: Vec<String> = self.all_hashes().collect();
296 let results: Vec<(bool, String)> = hashes.par_iter().map(|h| {
297 (self.read(h).is_ok(), h.clone())
298 }).collect();
299 let ok = results.iter().filter(|(ok, _)| *ok).count();
300 let bad: Vec<String> = results.into_iter()
301 .filter(|(ok, _)| !*ok)
302 .map(|(_, h)| h)
303 .collect();
304 (ok, bad)
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds an unhashed test node whose JSON payload is derived from `seq`.
    fn make_node(id: &str, coll: &str, seq: u64) -> Node {
        let payload = serde_json::json!({"height": seq, "hash": "0000abc"});
        Node {
            id: id.to_string(),
            coll: coll.to_string(),
            seq,
            data: payload,
            prev: None,
            caused_by: Vec::new(),
            ts: 1718400000.0,
            valid_from: None,
            valid_to: None,
            hash: String::new(),
        }
    }

    /// Opens an unencrypted on-disk store inside `dir`.
    ///
    /// The caller keeps `dir` alive for as long as the store is used.
    fn open_plain(dir: &tempfile::TempDir) -> ObjectStore {
        ObjectStore::new(dir.path(), None).unwrap()
    }

    #[test]
    fn write_read_roundtrip() {
        let dir = tempdir().unwrap();
        let store = open_plain(&dir);
        let mut node = make_node("1", "blocks", 1);

        let hash = store.write(&mut node).unwrap();
        assert_eq!(hash.len(), 64);

        let back = store.read(&hash).unwrap();
        assert_eq!(back.id, "1");
        assert_eq!(back.coll, "blocks");
    }

    #[test]
    fn write_is_idempotent() {
        let dir = tempdir().unwrap();
        let store = open_plain(&dir);
        let mut node = make_node("1", "blocks", 1);

        let first = store.write(&mut node).unwrap();
        let second = store.write(&mut node).unwrap();
        assert_eq!(first, second);
    }

    #[test]
    fn tamper_detected() {
        let dir = tempdir().unwrap();
        let store = open_plain(&dir);
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();

        // Flip one byte of the loose object file behind the store's back.
        let object_path = dir.path().join("objects").join(&hash[..2]).join(&hash[2..]);
        let mut bytes = fs::read(&object_path).unwrap();
        bytes[10] ^= 0xff;
        fs::write(&object_path, bytes).unwrap();

        assert!(store.read(&hash).is_err());
    }
}