Skip to main content

nedb_engine/
store.rs

//! Content-addressed object store — the foundation of NEDB v2.
//!
//! Every document version is stored as an immutable, BLAKE2b-addressed object at
//! `objects/{hash[0:2]}/{hash[2:]}` (encrypted with AES-256-GCM when a DEK is
//! configured). Once written, objects never change.
//!
//! Corruption-resistant by design:
//! - Writes are atomic (write to a `.tmp` file → rename). Files are not fsynced,
//!   so durability across power loss depends on the filesystem.
//! - Every disk read verifies the BLAKE2b hash of the stored bytes
//!   (the in-memory mode skips this check).
//! - A partial write leaves a `.tmp` file, which object enumeration ignores.
//! - There is no single mutable file that can be partially overwritten.
11
12use std::fs;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use anyhow::{bail, Context, Result};
16use serde::{Deserialize, Serialize};
17use blake2::{Blake2b512, Digest};
18
/// A single versioned document node in the DAG.
///
/// NOTE: field declaration order is load-bearing. `ObjectStore::write` hashes the
/// `serde_json` serialization of this struct, and serde emits fields in
/// declaration order. Reordering or renaming fields therefore changes the
/// content hash of every node written afterwards.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// User-supplied document ID (e.g. "618000", "abc-token-id")
    pub id:         String,
    /// Collection name (e.g. "blocks", "itsl_ops")
    pub coll:       String,
    /// Monotonic global sequence number assigned at write time
    pub seq:        u64,
    /// The document payload (arbitrary JSON)
    pub data:       serde_json::Value,
    /// BLAKE2b hash of the previous version of this document (version chain)
    pub prev:       Option<String>,
    /// BLAKE2b hashes of nodes that causally led to this write
    pub caused_by:  Vec<String>,
    /// Unix timestamp (seconds since epoch)
    pub ts:         f64,
    /// Bi-temporal valid-from (ISO 8601)
    pub valid_from: Option<String>,
    /// Bi-temporal valid-to (ISO 8601); `None` = still valid
    pub valid_to:   Option<String>,
    /// Content address of this node's stored object: the BLAKE2b hash of the
    /// stored bytes (ciphertext when a DEK is configured). Set after writing.
    ///
    /// Omitted from the serialized form while empty, and defaulted to empty when
    /// absent on deserialization. A non-empty value is serialized like any
    /// other field.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    pub hash:       String,
}
44
45/// Encryption key material (AES-256-GCM).
46/// In v1 this was called DEK; the structure is the same.
47#[derive(Clone)]
48pub struct Dek(pub [u8; 32]);
49
50impl Dek {
51    pub fn from_tmk(tmk: &[u8; 32], salt: &[u8]) -> Self {
52        use sha2::{Sha256, Digest as _};
53        let mut h = Sha256::new();
54        h.update(tmk);
55        h.update(salt);
56        let result = h.finalize();
57        let mut key = [0u8; 32];
58        key.copy_from_slice(&result[..32]);
59        Dek(key)
60    }
61}
62
63fn blake2b(data: &[u8]) -> String {
64    let mut h = Blake2b512::new();
65    h.update(data);
66    hex::encode(&h.finalize()[..32])   // use first 32 bytes → 64 hex chars
67}
68
69fn encrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
70    use aes_gcm::{Aes256Gcm, KeyInit, aead::{Aead, OsRng, rand_core::RngCore}};
71    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
72    let mut nonce_bytes = [0u8; 12];
73    OsRng.fill_bytes(&mut nonce_bytes);
74    let nonce = aes_gcm::Nonce::from(nonce_bytes);
75    let ciphertext = cipher.encrypt(&nonce, data)
76        .map_err(|e| anyhow::anyhow!("encrypt failed: {:?}", e))?;
77    // Format: 12-byte nonce || ciphertext
78    let mut out = nonce_bytes.to_vec();
79    out.extend_from_slice(&ciphertext);
80    Ok(out)
81}
82
83fn decrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
84    use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};
85    if data.len() < 12 { bail!("ciphertext too short"); }
86    let (nonce_bytes, ciphertext) = data.split_at(12);
87    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
88    let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
89    cipher.decrypt(nonce, ciphertext)
90        .map_err(|e| anyhow::anyhow!("decrypt failed: {:?}", e))
91}
92
93/// Content-addressed, encrypted, tamper-evident object store.
94pub struct ObjectStore {
95    root: PathBuf,
96    dek:  Option<Dek>,
97    /// In-memory store: hash → raw bytes. None = disk-backed (normal mode).
98    mem:  Option<Arc<dashmap::DashMap<String, Vec<u8>>>>,
99}
100
101impl ObjectStore {
102    pub fn new(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
103        let root = db_root.join("objects");
104        fs::create_dir_all(&root)
105            .context("create objects/ dir")?;
106        Ok(Self { root, dek, mem: None })
107    }
108
109    /// Create a pure in-memory object store — no disk, no files.
110    pub fn in_memory() -> Self {
111        Self {
112            root: PathBuf::from(":memory:"),
113            dek:  None,
114            mem:  Some(Arc::new(dashmap::DashMap::new())),
115        }
116    }
117
118    /// Write a node. Returns the content hash (the node's permanent ID in the DAG).
119    pub fn write(&self, node: &mut Node) -> Result<String> {
120        let raw = serde_json::to_vec(node)?;
121        let content = match &self.dek {
122            Some(dek) => encrypt(&raw, dek)?,
123            None      => raw,
124        };
125        let hash = blake2b(&content);
126
127        if let Some(ref mem) = self.mem {
128            // In-memory: store in DashMap — idempotent
129            mem.entry(hash.clone()).or_insert_with(|| content);
130        } else {
131            // Disk: write atomically via tmp → rename
132            let dir  = self.root.join(&hash[..2]);
133            fs::create_dir_all(&dir)?;
134            let path = dir.join(&hash[2..]);
135            if !path.exists() {
136                let tmp = path.with_extension("tmp");
137                fs::write(&tmp, &content)?;
138                fs::rename(&tmp, &path)?;
139            }
140        }
141        node.hash = hash.clone();
142        Ok(hash)
143    }
144
145    /// Read and verify a node by its hash. Returns error on hash mismatch (tamper).
146    pub fn read(&self, hash: &str) -> Result<Node> {
147        if hash.len() < 3 {
148            anyhow::bail!("invalid object hash (too short): {:?}", hash);
149        }
150
151        let content: Vec<u8> = if let Some(ref mem) = self.mem {
152            mem.get(hash)
153                .map(|v| v.clone())
154                .ok_or_else(|| anyhow::anyhow!("object not found in memory: {}", hash))?
155        } else {
156            let path = self.root.join(&hash[..2]).join(&hash[2..]);
157            let c = fs::read(&path).with_context(|| format!("read object {}", hash))?;
158            // Hash verification — any bit rot or tampering is caught here
159            let actual = blake2b(&c);
160            if actual != hash {
161                bail!("object {} tampered: expected {} got {}", hash, hash, actual);
162            }
163            c
164        };
165
166        let raw = match &self.dek {
167            Some(dek) => decrypt(&content, dek)?,
168            None      => content,
169        };
170        let mut node: Node = serde_json::from_slice(&raw)
171            .context("deserialize node")?;
172        if node.hash.is_empty() {
173            node.hash = hash.to_string();
174        }
175        Ok(node)
176    }
177
178    /// List all object hashes (for startup index rebuild / verify).
179    pub fn all_hashes(&self) -> Box<dyn Iterator<Item = String> + '_> {
180        // In-memory: collect from DashMap
181        if let Some(ref mem) = self.mem {
182            let hashes: Vec<String> = mem.iter().map(|e| e.key().clone()).collect();
183            return Box::new(hashes.into_iter());
184        }
185        // Disk: walk objects/ directory tree
186        let root = self.root.clone();
187        Box::new(fs::read_dir(&root)
188            .into_iter()
189            .flatten()
190            .filter_map(|e| e.ok())
191            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
192            .flat_map(move |prefix_dir| {
193                let prefix = prefix_dir.file_name().to_string_lossy().to_string();
194                fs::read_dir(prefix_dir.path())
195                    .into_iter()
196                    .flatten()
197                    .filter_map(|e| e.ok())
198                    .filter_map(move |e| {
199                        let name = e.file_name().to_string_lossy().to_string();
200                        if name.ends_with(".tmp") { return None; }
201                        Some(format!("{}{}", prefix, name))
202                    })
203            }))
204    }
205
206    /// Verify all objects. Returns (ok_count, tampered_hashes).
207    pub fn verify_all(&self) -> (usize, Vec<String>) {
208        use rayon::prelude::*;
209        let hashes: Vec<String> = self.all_hashes().collect();
210        let results: Vec<(bool, String)> = hashes.par_iter().map(|h| {
211            (self.read(h).is_ok(), h.clone())
212        }).collect();
213        let ok = results.iter().filter(|(ok, _)| *ok).count();
214        let bad: Vec<String> = results.into_iter()
215            .filter(|(ok, _)| !*ok)
216            .map(|(_, h)| h)
217            .collect();
218        (ok, bad)
219    }
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a minimal node whose payload depends on `seq`.
    fn make_node(id: &str, coll: &str, seq: u64) -> Node {
        Node {
            id: id.to_string(), coll: coll.to_string(), seq,
            data: serde_json::json!({"height": seq, "hash": "0000abc"}),
            prev: None, caused_by: vec![], ts: 1718400000.0,
            valid_from: None, valid_to: None, hash: String::new(),
        }
    }

    #[test]
    fn write_read_roundtrip() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        assert_eq!(hash.len(), 64);
        assert_eq!(node.hash, hash);
        let read_back = store.read(&hash).unwrap();
        assert_eq!(read_back.id, "1");
        assert_eq!(read_back.coll, "blocks");
        assert_eq!(read_back.hash, hash);
    }

    #[test]
    fn write_is_idempotent() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let h1 = store.write(&mut node).unwrap();
        let h2 = store.write(&mut node).unwrap();
        assert_eq!(h1, h2);
    }

    #[test]
    fn tamper_detected() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        // Corrupt the object file
        let path = dir.path().join("objects").join(&hash[..2]).join(&hash[2..]);
        let mut content = fs::read(&path).unwrap();
        content[10] ^= 0xff;
        fs::write(&path, content).unwrap();
        assert!(store.read(&hash).is_err());
    }

    #[test]
    fn in_memory_roundtrip_and_listing() {
        let store = ObjectStore::in_memory();
        let mut node = make_node("7", "blocks", 7);
        let hash = store.write(&mut node).unwrap();
        assert_eq!(store.read(&hash).unwrap().seq, 7);
        assert_eq!(store.all_hashes().collect::<Vec<_>>(), vec![hash.clone()]);
        assert_eq!(store.verify_all(), (1, Vec::<String>::new()));
    }

    #[test]
    fn encrypted_roundtrip_and_wrong_key() {
        let dir = tempdir().unwrap();
        let dek = Dek::from_tmk(&[7u8; 32], b"store-test");
        let store = ObjectStore::new(dir.path(), Some(dek)).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();

        // The plaintext collection name must not appear in the stored bytes.
        let path = dir.path().join("objects").join(&hash[..2]).join(&hash[2..]);
        let on_disk = fs::read(&path).unwrap();
        assert!(!on_disk.windows(b"blocks".len()).any(|w| w == b"blocks"));
        assert_eq!(store.read(&hash).unwrap().id, "1");

        // A store opened with a different key cannot decrypt the object.
        let other_key = Dek::from_tmk(&[8u8; 32], b"store-test");
        let other = ObjectStore::new(dir.path(), Some(other_key)).unwrap();
        assert!(other.read(&hash).is_err());
    }

    #[test]
    fn tmp_files_are_not_listed() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();
        let stray = dir.path().join("objects").join(&hash[..2]).join("leftover.tmp");
        fs::write(&stray, b"partial").unwrap();
        assert_eq!(store.all_hashes().collect::<Vec<_>>(), vec![hash]);
    }

    #[test]
    fn malformed_hashes_are_rejected() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        assert!(store.read("").is_err());
        assert!(store.read("ab").is_err());
        assert!(store.read(&"g".repeat(64)).is_err());
    }
}