Skip to main content

nedb_engine/
store.rs

1//! Content-addressed object store — the foundation of NEDB v2.
2//!
3//! Every document version is stored as an immutable, encrypted, BLAKE2b-hashed
4//! object at `objects/{hash[0:2]}/{hash[2:]}`. Once written, objects never change.
5//!
//! Incorruptible by design:
7//! - Writes are atomic (write to .tmp → rename)
8//! - Every read verifies the BLAKE2b hash of the content
9//! - A partial write leaves a .tmp file that is ignored on startup
10//! - There is no single mutable file that can be partially overwritten
11
12use std::fs;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use anyhow::{bail, Context, Result};
16use serde::{Deserialize, Serialize};
17use blake2::{Blake2b512, Digest};
18
/// A single versioned document node in the DAG.
///
/// A node is serialized to JSON (and optionally encrypted) by
/// `ObjectStore::write`; the hash of those stored bytes becomes the node's
/// address. Field order and the serde attributes below therefore influence
/// the stored bytes — change them with care.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// User-supplied document ID (e.g. "618000", "abc-token-id")
    pub id:         String,
    /// Collection name (e.g. "blocks", "itsl_ops")
    pub coll:       String,
    /// Monotonic global sequence number assigned at write time
    pub seq:        u64,
    /// The document payload (arbitrary JSON)
    pub data:       serde_json::Value,
    /// BLAKE2b hash of the previous version of this document (version chain);
    /// `None` when there is no previous version.
    pub prev:       Option<String>,
    /// BLAKE2b hashes of nodes that causally led to this write
    pub caused_by:  Vec<String>,
    /// Unix timestamp (seconds since epoch)
    pub ts:         f64,
    /// Bi-temporal valid-from (ISO 8601)
    pub valid_from: Option<String>,
    /// Bi-temporal valid-to   (ISO 8601); None = still valid
    pub valid_to:   Option<String>,
    /// BLAKE2b hash of this node's encrypted content (set after writing).
    /// Omitted from the serialized form while empty, and defaults to the
    /// empty string when the field is absent on deserialization.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    pub hash:       String,
}
44
/// Encryption key material (AES-256-GCM).
/// In v1 this was called DEK; the structure is the same.
///
/// Newtype over the raw 32-byte key that `encrypt` / `decrypt` hand to
/// `Aes256Gcm::new_from_slice`.
#[derive(Clone)]
pub struct Dek(pub [u8; 32]);
49
50impl Dek {
51    pub fn from_tmk(tmk: &[u8; 32], salt: &[u8]) -> Self {
52        use sha2::{Sha256, Digest as _};
53        let mut h = Sha256::new();
54        h.update(tmk);
55        h.update(salt);
56        let result = h.finalize();
57        let mut key = [0u8; 32];
58        key.copy_from_slice(&result[..32]);
59        Dek(key)
60    }
61}
62
63fn blake2b(data: &[u8]) -> String {
64    let mut h = Blake2b512::new();
65    h.update(data);
66    hex::encode(&h.finalize()[..32])   // use first 32 bytes → 64 hex chars
67}
68
/// NEDB v3 opt-in switch, read from the `NEDB_DAG_V3` environment variable
/// (set by the `--dag-v3` flag). When enabled the object substrate is the
/// packed segment store; when off (the default) behavior is byte-for-byte v2.
///
/// The value is trimmed, then treated as "on" if it is exactly `1` or matches
/// `true`, `on`, or `yes` ignoring ASCII case. An unset or unreadable variable
/// counts as "off".
fn dag_v3_enabled() -> bool {
    const TRUTHY_WORDS: [&str; 3] = ["true", "on", "yes"];

    match std::env::var("NEDB_DAG_V3") {
        Ok(raw) => {
            let flag = raw.trim();
            flag == "1" || TRUTHY_WORDS.iter().any(|word| flag.eq_ignore_ascii_case(word))
        }
        Err(_) => false,
    }
}
81
82fn encrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
83    use aes_gcm::{Aes256Gcm, KeyInit, aead::{Aead, OsRng, rand_core::RngCore}};
84    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
85    let mut nonce_bytes = [0u8; 12];
86    OsRng.fill_bytes(&mut nonce_bytes);
87    let nonce = aes_gcm::Nonce::from(nonce_bytes);
88    let ciphertext = cipher.encrypt(&nonce, data)
89        .map_err(|e| anyhow::anyhow!("encrypt failed: {:?}", e))?;
90    // Format: 12-byte nonce || ciphertext
91    let mut out = nonce_bytes.to_vec();
92    out.extend_from_slice(&ciphertext);
93    Ok(out)
94}
95
96fn decrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
97    use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
98    if data.len() < 12 { bail!("ciphertext too short"); }
99    let (nonce_bytes, ciphertext) = data.split_at(12);
100    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
101    let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
102    cipher.decrypt(nonce, ciphertext)
103        .map_err(|e| anyhow::anyhow!("decrypt failed: {:?}", e))
104}
105
/// Content-addressed, encrypted, tamper-evident object store.
///
/// Backed by one of three substrates, checked in this order by `write`/`read`:
/// the in-memory map (`mem`), the packed segment store (`seg`), or loose
/// object files under `root`.
pub struct ObjectStore {
    /// Loose-object directory (`<db_root>/objects`); the placeholder path
    /// `:memory:` for stores created with `in_memory()`.
    root: PathBuf,
    /// Encryption key. `None` = node JSON is stored unencrypted.
    dek:  Option<Dek>,
    /// In-memory store: hash → raw bytes. None = disk-backed (normal mode).
    mem:  Option<Arc<dashmap::DashMap<String, Vec<u8>>>>,
    /// NEDB v3 packed substrate. Some = segment mode (NEDB_DAG_V3 / --dag-v3);
    /// new writes go to segments, reads fall back to loose v2 objects.
    seg:  Option<crate::segment::SegmentStore>,
}
116
117impl ObjectStore {
118    pub fn new(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
119        let root = db_root.join("objects");
120        fs::create_dir_all(&root)
121            .context("create objects/ dir")?;
122        // v3 opt-in: bring up the packed segment substrate (and rebuild its
123        // index by scanning existing segments). Off by default → loose objects.
124        let seg = if dag_v3_enabled() {
125            Some(crate::segment::SegmentStore::open(&root)?)
126        } else {
127            None
128        };
129        Ok(Self { root, dek, mem: None, seg })
130    }
131
132    /// Create a pure in-memory object store — no disk, no files.
133    pub fn in_memory() -> Self {
134        Self {
135            root: PathBuf::from(":memory:"),
136            dek:  None,
137            mem:  Some(Arc::new(dashmap::DashMap::new())),
138            seg:  None,
139        }
140    }
141
142    /// Write a node. Returns the content hash (the node's permanent ID in the DAG).
143    pub fn write(&self, node: &mut Node) -> Result<String> {
144        let raw = serde_json::to_vec(node)?;
145        let content = match &self.dek {
146            Some(dek) => encrypt(&raw, dek)?,
147            None      => raw,
148        };
149        let hash = blake2b(&content);
150
151        if let Some(ref mem) = self.mem {
152            // In-memory: store in DashMap — idempotent
153            mem.entry(hash.clone()).or_insert_with(|| content);
154        } else if let Some(ref seg) = self.seg {
155            // v3: append into a packed segment (one fsync per batch via sync()).
156            seg.put(&hash, &content)?;
157        } else {
158            // v2: loose object file, written atomically via tmp → rename
159            let dir  = self.root.join(&hash[..2]);
160            fs::create_dir_all(&dir)?;
161            let path = dir.join(&hash[2..]);
162            if !path.exists() {
163                let tmp = path.with_extension("tmp");
164                fs::write(&tmp, &content)?;
165                fs::rename(&tmp, &path)?;
166            }
167        }
168        node.hash = hash.clone();
169        Ok(hash)
170    }
171
172    /// Read and verify a node by its hash. Returns error on hash mismatch (tamper).
173    pub fn read(&self, hash: &str) -> Result<Node> {
174        if hash.len() < 3 {
175            anyhow::bail!("invalid object hash (too short): {:?}", hash);
176        }
177
178        // In-memory mode (tests).
179        if let Some(ref mem) = self.mem {
180            let content = mem.get(hash)
181                .map(|v| v.clone())
182                .ok_or_else(|| anyhow::anyhow!("object not found in memory: {}", hash))?;
183            return self.decode(content, hash);
184        }
185
186        // v3 segment mode: try segments first (self-verifying), then fall back
187        // to the loose-object path so existing v2 data stays readable.
188        if let Some(ref seg) = self.seg {
189            if let Some(content) = seg.get(hash)? {
190                return self.decode(content, hash);
191            }
192            // miss → fall through to loose objects (dual-read migration)
193        }
194
195        // v2 loose object.
196        let path = self.root.join(&hash[..2]).join(&hash[2..]);
197        let c = fs::read(&path).with_context(|| format!("read object {}", hash))?;
198        // Hash verification — any bit rot or tampering is caught here
199        let actual = blake2b(&c);
200        if actual != hash {
201            bail!("object {} tampered: expected {} got {}", hash, hash, actual);
202        }
203        self.decode(c, hash)
204    }
205
206    /// Decrypt (if a DEK is set) and deserialize raw content bytes into a Node.
207    /// Hash verification is the caller's responsibility (done before this for the
208    /// loose path; inside SegmentStore::get for the segment path; trusted for mem).
209    fn decode(&self, content: Vec<u8>, hash: &str) -> Result<Node> {
210        let raw = match &self.dek {
211            Some(dek) => decrypt(&content, dek)?,
212            None      => content,
213        };
214        let mut node: Node = serde_json::from_slice(&raw)
215            .context("deserialize node")?;
216        if node.hash.is_empty() {
217            node.hash = hash.to_string();
218        }
219        Ok(node)
220    }
221
222    /// List all object hashes (for startup index rebuild / verify).
223    pub fn all_hashes(&self) -> Box<dyn Iterator<Item = String> + '_> {
224        // In-memory: collect from DashMap
225        if let Some(ref mem) = self.mem {
226            let hashes: Vec<String> = mem.iter().map(|e| e.key().clone()).collect();
227            return Box::new(hashes.into_iter());
228        }
229
230        // v3: union of packed-segment hashes and any loose v2 objects (deduped),
231        // skipping the segments/ subdir during the loose walk.
232        if let Some(ref seg) = self.seg {
233            let mut seen: std::collections::HashSet<String> =
234                seg.all_hashes().into_iter().collect();
235            if let Ok(rd) = fs::read_dir(&self.root) {
236                for prefix_dir in rd.flatten() {
237                    if !prefix_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) { continue; }
238                    let prefix = prefix_dir.file_name().to_string_lossy().to_string();
239                    if prefix.len() != 2 { continue; } // skip "segments" and non-prefix dirs
240                    if let Ok(rd2) = fs::read_dir(prefix_dir.path()) {
241                        for e in rd2.flatten() {
242                            let name = e.file_name().to_string_lossy().to_string();
243                            if name.ends_with(".tmp") { continue; }
244                            seen.insert(format!("{}{}", prefix, name));
245                        }
246                    }
247                }
248            }
249            return Box::new(seen.into_iter());
250        }
251
252        // v2 (default): lazy walk of the objects/ directory tree (unchanged).
253        let root = self.root.clone();
254        Box::new(fs::read_dir(&root)
255            .into_iter()
256            .flatten()
257            .filter_map(|e| e.ok())
258            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
259            .flat_map(move |prefix_dir| {
260                let prefix = prefix_dir.file_name().to_string_lossy().to_string();
261                fs::read_dir(prefix_dir.path())
262                    .into_iter()
263                    .flatten()
264                    .filter_map(|e| e.ok())
265                    .filter_map(move |e| {
266                        let name = e.file_name().to_string_lossy().to_string();
267                        if name.ends_with(".tmp") { return None; }
268                        Some(format!("{}{}", prefix, name))
269                    })
270            }))
271    }
272
273    /// Flush durable state for the active segment (v3): one fsync per batch,
274    /// wired into Db::flush_all(). No-op for loose-object and in-memory modes.
275    pub fn sync(&self) -> Result<()> {
276        if let Some(ref seg) = self.seg {
277            seg.sync()?;
278        }
279        Ok(())
280    }
281
282    /// Compact the packed segment store (v3), keeping only objects whose hash is
283    /// in `live` and reclaiming the rest. No-op (zeroed stats) for loose-object
284    /// and in-memory modes.
285    pub fn compact(&self, live: &std::collections::HashSet<String>) -> Result<crate::segment::CompactStats> {
286        match self.seg {
287            Some(ref seg) => seg.compact(live),
288            None => Ok(crate::segment::CompactStats::default()),
289        }
290    }
291
292    /// Verify all objects. Returns (ok_count, tampered_hashes).
293    pub fn verify_all(&self) -> (usize, Vec<String>) {
294        use rayon::prelude::*;
295        let hashes: Vec<String> = self.all_hashes().collect();
296        let results: Vec<(bool, String)> = hashes.par_iter().map(|h| {
297            (self.read(h).is_ok(), h.clone())
298        }).collect();
299        let ok = results.iter().filter(|(ok, _)| *ok).count();
300        let bad: Vec<String> = results.into_iter()
301            .filter(|(ok, _)| !*ok)
302            .map(|(_, h)| h)
303            .collect();
304        (ok, bad)
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a node with a fixed payload and timestamp and an empty hash.
    fn make_node(id: &str, coll: &str, seq: u64) -> Node {
        Node {
            id: id.to_owned(),
            coll: coll.to_owned(),
            seq,
            data: serde_json::json!({"height": seq, "hash": "0000abc"}),
            prev: None,
            caused_by: Vec::new(),
            ts: 1718400000.0,
            valid_from: None,
            valid_to: None,
            hash: String::new(),
        }
    }

    /// A written node can be read back by the hash `write` returned.
    #[test]
    fn write_read_roundtrip() {
        let tmp = tempdir().unwrap();
        let store = ObjectStore::new(tmp.path(), None).unwrap();

        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        assert_eq!(hash.len(), 64);

        let loaded = store.read(&hash).unwrap();
        assert_eq!(loaded.id, "1");
        assert_eq!(loaded.coll, "blocks");
    }

    /// Writing the same node twice must return the same hash. The second
    /// call sees `node.hash` already populated by the first.
    #[test]
    fn write_is_idempotent() {
        let tmp = tempdir().unwrap();
        let store = ObjectStore::new(tmp.path(), None).unwrap();

        let mut node = make_node("1", "blocks", 1);
        let first = store.write(&mut node).unwrap();
        let second = store.write(&mut node).unwrap();
        assert_eq!(first, second);
    }

    /// Flipping a byte in the stored object file makes `read` fail.
    #[test]
    fn tamper_detected() {
        let tmp = tempdir().unwrap();
        let store = ObjectStore::new(tmp.path(), None).unwrap();

        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();

        // Corrupt the object file
        let (prefix, rest) = hash.split_at(2);
        let object_path = tmp.path().join("objects").join(prefix).join(rest);
        let mut bytes = fs::read(&object_path).unwrap();
        bytes[10] ^= 0xff;
        fs::write(&object_path, bytes).unwrap();

        assert!(store.read(&hash).is_err());
    }
}
357}