Skip to main content

nedb_engine/
store.rs

//! Content-addressed object store — the foundation of NEDB v2.
//!
//! Every document version is stored as an immutable, BLAKE2b-hashed object
//! (encrypted when a DEK is configured) at `objects/{hash[0:2]}/{hash[2:]}`.
//! Once written, objects never change.
//!
//! Incorruptible by design:
//! - Writes are atomic (write to .tmp → rename)
//! - Every read verifies the BLAKE2b hash of the content
//! - A partial write leaves a .tmp file that is ignored on startup
//! - There is no single mutable file that can be partially overwritten
11
12use std::fs;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use anyhow::{bail, Context, Result};
16use serde::{Deserialize, Serialize};
17use blake2::{Blake2b512, Digest};
18
19/// A single versioned document node in the DAG.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct Node {
22    /// User-supplied document ID (e.g. "618000", "abc-token-id")
23    pub id:         String,
24    /// Collection name (e.g. "blocks", "itsl_ops")
25    pub coll:       String,
26    /// Monotonic global sequence number assigned at write time
27    pub seq:        u64,
28    /// The document payload (arbitrary JSON)
29    pub data:       serde_json::Value,
30    /// BLAKE2b hash of the previous version of this document (version chain)
31    pub prev:       Option<String>,
32    /// BLAKE2b hashes of nodes that causally led to this write
33    pub caused_by:  Vec<String>,
34    /// Unix timestamp (seconds since epoch)
35    pub ts:         f64,
36    /// Bi-temporal valid-from (ISO 8601)
37    pub valid_from: Option<String>,
38    /// Bi-temporal valid-to   (ISO 8601); None = still valid
39    pub valid_to:   Option<String>,
40    /// BLAKE2b hash of this node's encrypted content (set after writing)
41    #[serde(skip_serializing_if = "String::is_empty", default)]
42    pub hash:       String,
43}
44
45/// Encryption key material (AES-256-GCM).
46/// In v1 this was called DEK; the structure is the same.
47#[derive(Clone)]
48pub struct Dek(pub [u8; 32]);
49
50impl Dek {
51    pub fn from_tmk(tmk: &[u8; 32], salt: &[u8]) -> Self {
52        use sha2::{Sha256, Digest as _};
53        let mut h = Sha256::new();
54        h.update(tmk);
55        h.update(salt);
56        let result = h.finalize();
57        let mut key = [0u8; 32];
58        key.copy_from_slice(&result[..32]);
59        Dek(key)
60    }
61}
62
63fn blake2b(data: &[u8]) -> String {
64    let mut h = Blake2b512::new();
65    h.update(data);
66    hex::encode(&h.finalize()[..32])   // use first 32 bytes → 64 hex chars
67}
68
/// Report whether the NEDB v3 packed segment store has been opted into.
///
/// The `--dag-v3` flag sets the `NEDB_DAG_V3` environment variable. The value
/// is trimmed and counts as "on" when it is `1`, `true`, `on` or `yes`
/// (ASCII case-insensitive). An unset, unreadable or any other value means
/// off, which keeps the store byte-for-byte v2.
fn dag_v3_enabled() -> bool {
    match std::env::var("NEDB_DAG_V3") {
        Ok(raw) => {
            let flag = raw.trim();
            ["1", "true", "on", "yes"]
                .iter()
                .any(|accepted| flag.eq_ignore_ascii_case(accepted))
        }
        Err(_) => false,
    }
}
81
82fn encrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
83    use aes_gcm::{Aes256Gcm, KeyInit, aead::{Aead, OsRng, rand_core::RngCore}};
84    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
85    let mut nonce_bytes = [0u8; 12];
86    OsRng.fill_bytes(&mut nonce_bytes);
87    let nonce = aes_gcm::Nonce::from(nonce_bytes);
88    let ciphertext = cipher.encrypt(&nonce, data)
89        .map_err(|e| anyhow::anyhow!("encrypt failed: {:?}", e))?;
90    // Format: 12-byte nonce || ciphertext
91    let mut out = nonce_bytes.to_vec();
92    out.extend_from_slice(&ciphertext);
93    Ok(out)
94}
95
96fn decrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
97    use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
98    if data.len() < 12 { bail!("ciphertext too short"); }
99    let (nonce_bytes, ciphertext) = data.split_at(12);
100    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
101    let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
102    cipher.decrypt(nonce, ciphertext)
103        .map_err(|e| anyhow::anyhow!("decrypt failed: {:?}", e))
104}
105
106/// Content-addressed, encrypted, tamper-evident object store.
107pub struct ObjectStore {
108    root: PathBuf,
109    dek:  Option<Dek>,
110    /// In-memory store: hash → raw bytes. None = disk-backed (normal mode).
111    mem:  Option<Arc<dashmap::DashMap<String, Vec<u8>>>>,
112    /// NEDB v3 packed substrate. Some = segment mode (NEDB_DAG_V3 / --dag-v3);
113    /// new writes go to segments, reads fall back to loose v2 objects.
114    seg:  Option<crate::segment::SegmentStore>,
115}
116
117impl ObjectStore {
118    pub fn new(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
119        let root = db_root.join("objects");
120        fs::create_dir_all(&root)
121            .context("create objects/ dir")?;
122        // v3 opt-in: bring up the packed segment substrate (and rebuild its
123        // index by scanning existing segments). Off by default → loose objects.
124        let seg = if dag_v3_enabled() {
125            Some(crate::segment::SegmentStore::open(&root)?)
126        } else {
127            None
128        };
129        Ok(Self { root, dek, mem: None, seg })
130    }
131
132    /// Create a pure in-memory object store — no disk, no files.
133    pub fn in_memory() -> Self {
134        Self {
135            root: PathBuf::from(":memory:"),
136            dek:  None,
137            mem:  Some(Arc::new(dashmap::DashMap::new())),
138            seg:  None,
139        }
140    }
141
142    /// Write a node. Returns the content hash (the node's permanent ID in the DAG).
143    pub fn write(&self, node: &mut Node) -> Result<String> {
144        // The content hash is taken over the node's CONTENT, never over its own
145        // hash field (which is circular). `hash` is `skip_serializing_if =
146        // "String::is_empty"`, so a *fresh* node already excludes it — but a
147        // node being re-written (node.hash already set from a prior write) would
148        // serialize the populated hash and produce a different content hash,
149        // breaking idempotency. Clear it first so first-write and re-write
150        // serialize byte-for-byte identical content.
151        node.hash = String::new();
152        let raw = serde_json::to_vec(node)?;
153        let content = match &self.dek {
154            Some(dek) => encrypt(&raw, dek)?,
155            None      => raw,
156        };
157        let hash = blake2b(&content);
158
159        if let Some(ref mem) = self.mem {
160            // In-memory: store in DashMap — idempotent
161            mem.entry(hash.clone()).or_insert_with(|| content);
162        } else if let Some(ref seg) = self.seg {
163            // v3: append into a packed segment (one fsync per batch via sync()).
164            seg.put(&hash, &content)?;
165        } else {
166            // v2: loose object file, written atomically via tmp → rename
167            let dir  = self.root.join(&hash[..2]);
168            fs::create_dir_all(&dir)?;
169            let path = dir.join(&hash[2..]);
170            if !path.exists() {
171                let tmp = path.with_extension("tmp");
172                fs::write(&tmp, &content)?;
173                fs::rename(&tmp, &path)?;
174            }
175        }
176        node.hash = hash.clone();
177        Ok(hash)
178    }
179
180    /// Read and verify a node by its hash. Returns error on hash mismatch (tamper).
181    pub fn read(&self, hash: &str) -> Result<Node> {
182        if hash.len() < 3 {
183            anyhow::bail!("invalid object hash (too short): {:?}", hash);
184        }
185
186        // In-memory mode (tests).
187        if let Some(ref mem) = self.mem {
188            let content = mem.get(hash)
189                .map(|v| v.clone())
190                .ok_or_else(|| anyhow::anyhow!("object not found in memory: {}", hash))?;
191            return self.decode(content, hash);
192        }
193
194        // v3 segment mode: try segments first (self-verifying), then fall back
195        // to the loose-object path so existing v2 data stays readable.
196        if let Some(ref seg) = self.seg {
197            if let Some(content) = seg.get(hash)? {
198                return self.decode(content, hash);
199            }
200            // miss → fall through to loose objects (dual-read migration)
201        }
202
203        // v2 loose object.
204        let path = self.root.join(&hash[..2]).join(&hash[2..]);
205        let c = fs::read(&path).with_context(|| format!("read object {}", hash))?;
206        // Hash verification — any bit rot or tampering is caught here
207        let actual = blake2b(&c);
208        if actual != hash {
209            bail!("object {} tampered: expected {} got {}", hash, hash, actual);
210        }
211        self.decode(c, hash)
212    }
213
214    /// Decrypt (if a DEK is set) and deserialize raw content bytes into a Node.
215    /// Hash verification is the caller's responsibility (done before this for the
216    /// loose path; inside SegmentStore::get for the segment path; trusted for mem).
217    fn decode(&self, content: Vec<u8>, hash: &str) -> Result<Node> {
218        let raw = match &self.dek {
219            Some(dek) => decrypt(&content, dek)?,
220            None      => content,
221        };
222        let mut node: Node = serde_json::from_slice(&raw)
223            .context("deserialize node")?;
224        if node.hash.is_empty() {
225            node.hash = hash.to_string();
226        }
227        Ok(node)
228    }
229
230    /// List all object hashes (for startup index rebuild / verify).
231    pub fn all_hashes(&self) -> Box<dyn Iterator<Item = String> + '_> {
232        // In-memory: collect from DashMap
233        if let Some(ref mem) = self.mem {
234            let hashes: Vec<String> = mem.iter().map(|e| e.key().clone()).collect();
235            return Box::new(hashes.into_iter());
236        }
237
238        // v3: union of packed-segment hashes and any loose v2 objects (deduped),
239        // skipping the segments/ subdir during the loose walk.
240        if let Some(ref seg) = self.seg {
241            let mut seen: std::collections::HashSet<String> =
242                seg.all_hashes().into_iter().collect();
243            if let Ok(rd) = fs::read_dir(&self.root) {
244                for prefix_dir in rd.flatten() {
245                    if !prefix_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) { continue; }
246                    let prefix = prefix_dir.file_name().to_string_lossy().to_string();
247                    if prefix.len() != 2 { continue; } // skip "segments" and non-prefix dirs
248                    if let Ok(rd2) = fs::read_dir(prefix_dir.path()) {
249                        for e in rd2.flatten() {
250                            let name = e.file_name().to_string_lossy().to_string();
251                            if name.ends_with(".tmp") { continue; }
252                            seen.insert(format!("{}{}", prefix, name));
253                        }
254                    }
255                }
256            }
257            return Box::new(seen.into_iter());
258        }
259
260        // v2 (default): lazy walk of the objects/ directory tree (unchanged).
261        let root = self.root.clone();
262        Box::new(fs::read_dir(&root)
263            .into_iter()
264            .flatten()
265            .filter_map(|e| e.ok())
266            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
267            .flat_map(move |prefix_dir| {
268                let prefix = prefix_dir.file_name().to_string_lossy().to_string();
269                fs::read_dir(prefix_dir.path())
270                    .into_iter()
271                    .flatten()
272                    .filter_map(|e| e.ok())
273                    .filter_map(move |e| {
274                        let name = e.file_name().to_string_lossy().to_string();
275                        if name.ends_with(".tmp") { return None; }
276                        Some(format!("{}{}", prefix, name))
277                    })
278            }))
279    }
280
281    /// Flush durable state for the active segment (v3): one fsync per batch,
282    /// wired into Db::flush_all(). No-op for loose-object and in-memory modes.
283    pub fn sync(&self) -> Result<()> {
284        if let Some(ref seg) = self.seg {
285            seg.sync()?;
286        }
287        Ok(())
288    }
289
290    /// Compact the packed segment store (v3), keeping only objects whose hash is
291    /// in `live` and reclaiming the rest. No-op (zeroed stats) for loose-object
292    /// and in-memory modes.
293    pub fn compact(&self, live: &std::collections::HashSet<String>) -> Result<crate::segment::CompactStats> {
294        match self.seg {
295            Some(ref seg) => seg.compact(live),
296            None => Ok(crate::segment::CompactStats::default()),
297        }
298    }
299
300    /// Verify all objects. Returns (ok_count, tampered_hashes).
301    pub fn verify_all(&self) -> (usize, Vec<String>) {
302        use rayon::prelude::*;
303        let hashes: Vec<String> = self.all_hashes().collect();
304        let results: Vec<(bool, String)> = hashes.par_iter().map(|h| {
305            (self.read(h).is_ok(), h.clone())
306        }).collect();
307        let ok = results.iter().filter(|(ok, _)| *ok).count();
308        let bad: Vec<String> = results.into_iter()
309            .filter(|(ok, _)| !*ok)
310            .map(|(_, h)| h)
311            .collect();
312        (ok, bad)
313    }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a minimal node fixture with a fixed timestamp and a small JSON
    /// payload derived from `seq`.
    fn make_node(id: &str, coll: &str, seq: u64) -> Node {
        Node {
            id: id.to_owned(),
            coll: coll.to_owned(),
            seq,
            data: serde_json::json!({"height": seq, "hash": "0000abc"}),
            prev: None,
            caused_by: Vec::new(),
            ts: 1718400000.0,
            valid_from: None,
            valid_to: None,
            hash: String::new(),
        }
    }

    /// A written node can be read back by the hash `write` returned.
    #[test]
    fn write_read_roundtrip() {
        let tmp = tempdir().unwrap();
        let store = ObjectStore::new(tmp.path(), None).unwrap();

        let mut original = make_node("1", "blocks", 1);
        let hash = store.write(&mut original).unwrap();
        assert_eq!(hash.len(), 64);

        let loaded = store.read(&hash).unwrap();
        assert_eq!(loaded.id, "1");
        assert_eq!(loaded.coll, "blocks");
    }

    /// Without encryption, writing the same node twice yields the same hash.
    #[test]
    fn write_is_idempotent() {
        let tmp = tempdir().unwrap();
        let store = ObjectStore::new(tmp.path(), None).unwrap();

        let mut node = make_node("1", "blocks", 1);
        let first = store.write(&mut node).unwrap();
        let second = store.write(&mut node).unwrap();
        assert_eq!(first, second);
    }

    /// Flipping a byte of a loose object file makes `read` fail.
    #[test]
    fn tamper_detected() {
        let tmp = tempdir().unwrap();
        let store = ObjectStore::new(tmp.path(), None).unwrap();

        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();

        // Corrupt the object file in place.
        let object_path = tmp.path().join("objects").join(&hash[..2]).join(&hash[2..]);
        let mut bytes = fs::read(&object_path).unwrap();
        bytes[10] = !bytes[10];
        fs::write(&object_path, bytes).unwrap();

        assert!(store.read(&hash).is_err());
    }
}
365}