1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
//! Content-addressed object store — the foundation of NEDB v2.
//!
//! Every document version is stored as an immutable, BLAKE2b-hashed object
//! (encrypted when a DEK is configured) at `objects/{hash[0:2]}/{hash[2:]}`.
//! Once written, objects never change.
//!
//! Incorruptible by design:
//! - Writes are atomic (write to .tmp → rename)
//! - Every read verifies the BLAKE2b hash of the content
//! - A partial write leaves a .tmp file that is ignored on startup
//! - There is no single mutable file that can be partially overwritten
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use blake2::{Blake2b512, Digest};
/// A single versioned document node in the DAG.
///
/// `ObjectStore::write` serializes this struct with `serde_json` and hashes the
/// resulting (optionally encrypted) bytes, so the serialized form of these
/// fields — including their declaration order — feeds every content hash.
/// Reordering or renaming fields changes the hash of otherwise identical nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
/// User-supplied document ID (e.g. "618000", "abc-token-id")
pub id: String,
/// Collection name (e.g. "blocks", "itsl_ops")
pub coll: String,
/// Monotonic global sequence number assigned at write time
pub seq: u64,
/// The document payload (arbitrary JSON)
pub data: serde_json::Value,
/// BLAKE2b hash of the previous version of this document (version chain);
/// `None` when there is no previous version.
pub prev: Option<String>,
/// BLAKE2b hashes of nodes that causally led to this write
pub caused_by: Vec<String>,
/// Unix timestamp (seconds since epoch)
pub ts: f64,
/// Bi-temporal valid-from (ISO 8601)
pub valid_from: Option<String>,
/// Bi-temporal valid-to (ISO 8601); None = still valid
pub valid_to: Option<String>,
/// Content hash of this node's stored bytes: the ciphertext when the store
/// has a DEK, the plain JSON otherwise. Empty until the node is written;
/// `ObjectStore::write` sets it and `ObjectStore::read` restores it.
/// Omitted from the serialized form while empty (and defaulted to empty when
/// absent on deserialize) so the hash never has to cover itself.
#[serde(skip_serializing_if = "String::is_empty", default)]
pub hash: String,
}
/// 256-bit key material for AES-256-GCM.
/// v1 called this the DEK; the layout has not changed since.
#[derive(Clone)]
pub struct Dek(pub [u8; 32]);

impl Dek {
    /// Derive a key as `SHA-256(tmk || salt)`.
    ///
    /// `tmk` is fed to the hasher first, then `salt`; the 32-byte digest is
    /// used directly as the key.
    ///
    /// NOTE(review): this is a single plain hash, not a dedicated KDF —
    /// confirm that is the intended derivation.
    pub fn from_tmk(tmk: &[u8; 32], salt: &[u8]) -> Self {
        use sha2::{Sha256, Digest as _};

        let digest = {
            let mut hasher = Sha256::new();
            hasher.update(tmk);
            hasher.update(salt);
            hasher.finalize()
        };

        // SHA-256 yields exactly 32 bytes, so the whole digest fills the key.
        let mut key = [0u8; 32];
        key.copy_from_slice(&digest[..]);
        Self(key)
    }
}
/// Hex-encoded content digest used as an object's ID.
///
/// Hashes `data` with BLAKE2b-512 and keeps only the first 32 bytes of the
/// digest, so the result is always 64 hex characters.
fn blake2b(data: &[u8]) -> String {
    let digest = Blake2b512::digest(data);
    hex::encode(&digest[..32])
}
/// Whether the NEDB v3 packed segment substrate has been opted into.
///
/// The `--dag-v3` flag sets the `NEDB_DAG_V3` environment variable. After
/// trimming surrounding whitespace, the value `1` — or `true`, `on`, `yes` in
/// any ASCII letter case — turns v3 on. An unset, unreadable, or any other
/// value leaves it off, which keeps the store byte-for-byte v2.
fn dag_v3_enabled() -> bool {
    match std::env::var("NEDB_DAG_V3") {
        Ok(raw) => {
            let flag = raw.trim();
            flag == "1"
                || ["true", "on", "yes"]
                    .iter()
                    .any(|word| flag.eq_ignore_ascii_case(word))
        }
        Err(_) => false,
    }
}
/// Encrypt `data` with AES-256-GCM under `dek`.
///
/// Every call draws a fresh 12-byte nonce from the OS RNG. The returned
/// buffer is framed as `nonce (12 bytes) || ciphertext`, where the ciphertext
/// is whatever `Aead::encrypt` produced — `decrypt` expects exactly this
/// layout.
///
/// # Errors
/// Fails if the cipher cannot be built from the key or the AEAD call fails.
fn encrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
    use aes_gcm::{Aes256Gcm, KeyInit, aead::{Aead, OsRng, rand_core::RngCore}};

    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;

    let mut nonce_bytes = [0u8; 12];
    OsRng.fill_bytes(&mut nonce_bytes);
    let nonce = aes_gcm::Nonce::from(nonce_bytes);

    let sealed = cipher
        .encrypt(&nonce, data)
        .map_err(|e| anyhow::anyhow!("encrypt failed: {:?}", e))?;

    // Prefix the nonce so the reader can recover it without side channels.
    let mut framed = Vec::with_capacity(nonce_bytes.len() + sealed.len());
    framed.extend_from_slice(&nonce_bytes);
    framed.extend_from_slice(&sealed);
    Ok(framed)
}
/// Reverse of `encrypt`: split `data` into its 12-byte nonce prefix and the
/// ciphertext, then open the ciphertext with AES-256-GCM under `dek`.
///
/// # Errors
/// Fails when `data` is shorter than the nonce, when the cipher cannot be
/// built from the key, or when the AEAD call rejects the input.
fn decrypt(data: &[u8], dek: &Dek) -> Result<Vec<u8>> {
    use aes_gcm::{Aes256Gcm, KeyInit, aead::Aead};

    const NONCE_LEN: usize = 12;
    if data.len() < NONCE_LEN {
        bail!("ciphertext too short");
    }
    let (nonce_bytes, sealed) = data.split_at(NONCE_LEN);

    let cipher = Aes256Gcm::new_from_slice(&dek.0)?;
    match cipher.decrypt(aes_gcm::Nonce::from_slice(nonce_bytes), sealed) {
        Ok(plain) => Ok(plain),
        Err(e) => Err(anyhow::anyhow!("decrypt failed: {:?}", e)),
    }
}
/// Content-addressed, tamper-evident object store with optional encryption.
///
/// Exactly one backend serves writes, chosen in this order: the in-memory map
/// (`mem`), the packed segment store (`seg`), or loose files under `root`.
pub struct ObjectStore {
/// The `objects/` directory under the database root. Set to the placeholder
/// path ":memory:" by `in_memory()`, where it is never touched.
root: PathBuf,
/// When `Some`, node content is encrypted before it is hashed and stored and
/// decrypted on read; when `None`, the serialized JSON is stored as-is.
dek: Option<Dek>,
/// In-memory store: hash → raw bytes. None = disk-backed (normal mode).
mem: Option<Arc<dashmap::DashMap<String, Vec<u8>>>>,
/// NEDB v3 packed substrate. Some = segment mode (NEDB_DAG_V3 / --dag-v3);
/// new writes go to segments, reads fall back to loose v2 objects.
seg: Option<crate::segment::SegmentStore>,
}
impl ObjectStore {
pub fn new(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
let root = db_root.join("objects");
fs::create_dir_all(&root)
.context("create objects/ dir")?;
// v3 opt-in: bring up the packed segment substrate (and rebuild its
// index by scanning existing segments). Off by default → loose objects.
let seg = if dag_v3_enabled() {
Some(crate::segment::SegmentStore::open(&root)?)
} else {
None
};
Ok(Self { root, dek, mem: None, seg })
}
/// Build a store that lives entirely in memory: no directory is created and
/// no file is written. It has no DEK and no segment substrate; `root` is only
/// the placeholder path ":memory:".
pub fn in_memory() -> Self {
    let objects = Arc::new(dashmap::DashMap::new());
    Self {
        root: PathBuf::from(":memory:"),
        dek: None,
        mem: Some(objects),
        seg: None,
    }
}
/// Persist `node` and return its content hash (its permanent ID in the DAG).
///
/// The hash covers the node's content only, never its own `hash` field: that
/// field is emptied before serialization (it is skipped while empty), so a
/// fresh node and a previously written node serialize to the same bytes.
/// Without a DEK this makes repeated writes of the same node return the same
/// hash. With a DEK, `encrypt` prepends a fresh random nonce, so the stored
/// bytes — and therefore the hash — differ from call to call.
///
/// Backend selection: in-memory map if present, else the v3 segment store,
/// else a loose file at `objects/{hash[0:2]}/{hash[2:]}` written to a `.tmp`
/// sibling and renamed into place. An object that already exists is left
/// untouched. On success `node.hash` holds the returned hash.
///
/// NOTE(review): the loose path does not fsync before the rename — confirm
/// that matches the durability expectations for v2.
///
/// # Errors
/// Serialization, encryption, segment and filesystem failures are returned.
pub fn write(&self, node: &mut Node) -> Result<String> {
    // Drop any hash from an earlier write so it cannot leak into the content.
    node.hash.clear();

    let plain = serde_json::to_vec(node)?;
    let content = if let Some(dek) = self.dek.as_ref() {
        encrypt(&plain, dek)?
    } else {
        plain
    };
    let hash = blake2b(&content);

    match (&self.mem, &self.seg) {
        (Some(mem), _) => {
            // First writer wins; later identical keys keep the stored bytes.
            mem.entry(hash.clone()).or_insert(content);
        }
        (None, Some(seg)) => {
            // v3: append into a packed segment (fsync is batched via sync()).
            seg.put(&hash, &content)?;
        }
        (None, None) => {
            // v2: loose object, made visible atomically by the rename.
            let (prefix, rest) = hash.split_at(2);
            let dir = self.root.join(prefix);
            fs::create_dir_all(&dir)?;
            let path = dir.join(rest);
            if !path.exists() {
                let tmp = path.with_extension("tmp");
                fs::write(&tmp, &content)?;
                fs::rename(&tmp, &path)?;
            }
        }
    }

    node.hash = hash.clone();
    Ok(hash)
}
/// Read a node by its content hash.
///
/// Lookup order:
/// 1. In-memory map, when this is an in-memory store (content is trusted).
/// 2. v3 segment store, when enabled; a miss falls through to step 3 so data
///    written as loose v2 objects stays readable.
/// 3. Loose object file `objects/{hash[0:2]}/{hash[2:]}`, whose bytes are
///    re-hashed and compared against `hash` before decoding.
///
/// # Errors
/// - `hash` is shorter than 3 bytes.
/// - The object is absent from memory (in-memory mode), or the segment store
///   reports an error.
/// - `hash` is not made up solely of ASCII hex digits when the loose-object
///   path is reached.
/// - The loose file cannot be read, or its BLAKE2b digest differs from `hash`
///   (bit rot or tampering).
/// - Decryption or deserialization of the content fails.
pub fn read(&self, hash: &str) -> Result<Node> {
    if hash.len() < 3 {
        anyhow::bail!("invalid object hash (too short): {:?}", hash);
    }

    // In-memory mode (tests).
    if let Some(ref mem) = self.mem {
        let content = mem.get(hash)
            .map(|v| v.clone())
            .ok_or_else(|| anyhow::anyhow!("object not found in memory: {}", hash))?;
        return self.decode(content, hash);
    }

    // v3 segment mode: try segments first (self-verifying), then fall back
    // to the loose-object path so existing v2 data stays readable.
    if let Some(ref seg) = self.seg {
        if let Some(content) = seg.get(hash)? {
            return self.decode(content, hash);
        }
        // miss → fall through to loose objects (dual-read migration)
    }

    // The hash is about to become two path components. IDs produced by
    // `write` are hex digests; anything else is rejected here because
    //   * byte-slicing a non-ASCII string at offset 2 can land inside a
    //     multi-byte character and panic, and
    //   * a separator or `..` in the tail would let `Path::join` resolve
    //     outside the objects directory.
    if !hash.bytes().all(|b| b.is_ascii_hexdigit()) {
        anyhow::bail!("invalid object hash (not hex): {:?}", hash);
    }

    // v2 loose object. `split_at(2)` is safe: the string is ASCII and at
    // least 3 bytes long.
    let (prefix, rest) = hash.split_at(2);
    let path = self.root.join(prefix).join(rest);
    let c = fs::read(&path).with_context(|| format!("read object {}", hash))?;

    // Hash verification — any bit rot or tampering is caught here
    let actual = blake2b(&c);
    if actual != hash {
        bail!("object {} tampered: expected {} got {}", hash, hash, actual);
    }
    self.decode(c, hash)
}
/// Turn stored content bytes back into a `Node`.
///
/// The bytes are decrypted first when the store has a DEK, then parsed as
/// JSON. If the parsed node carries no hash, `hash` is filled in.
///
/// This function performs no integrity check of its own. Callers verify
/// beforehand: the loose-object path re-hashes the file, the segment path
/// relies on `SegmentStore::get`, and in-memory content is trusted.
fn decode(&self, content: Vec<u8>, hash: &str) -> Result<Node> {
    let plain = if let Some(dek) = &self.dek {
        decrypt(&content, dek)?
    } else {
        content
    };

    let mut node = serde_json::from_slice::<Node>(&plain).context("deserialize node")?;
    if node.hash.is_empty() {
        node.hash = hash.to_owned();
    }
    Ok(node)
}
/// List all object hashes (for startup index rebuild / verify).
pub fn all_hashes(&self) -> Box<dyn Iterator<Item = String> + '_> {
// In-memory: collect from DashMap
if let Some(ref mem) = self.mem {
let hashes: Vec<String> = mem.iter().map(|e| e.key().clone()).collect();
return Box::new(hashes.into_iter());
}
// v3: union of packed-segment hashes and any loose v2 objects (deduped),
// skipping the segments/ subdir during the loose walk.
if let Some(ref seg) = self.seg {
let mut seen: std::collections::HashSet<String> =
seg.all_hashes().into_iter().collect();
if let Ok(rd) = fs::read_dir(&self.root) {
for prefix_dir in rd.flatten() {
if !prefix_dir.file_type().map(|t| t.is_dir()).unwrap_or(false) { continue; }
let prefix = prefix_dir.file_name().to_string_lossy().to_string();
if prefix.len() != 2 { continue; } // skip "segments" and non-prefix dirs
if let Ok(rd2) = fs::read_dir(prefix_dir.path()) {
for e in rd2.flatten() {
let name = e.file_name().to_string_lossy().to_string();
if name.ends_with(".tmp") { continue; }
seen.insert(format!("{}{}", prefix, name));
}
}
}
}
return Box::new(seen.into_iter());
}
// v2 (default): lazy walk of the objects/ directory tree (unchanged).
let root = self.root.clone();
Box::new(fs::read_dir(&root)
.into_iter()
.flatten()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
.flat_map(move |prefix_dir| {
let prefix = prefix_dir.file_name().to_string_lossy().to_string();
fs::read_dir(prefix_dir.path())
.into_iter()
.flatten()
.filter_map(|e| e.ok())
.filter_map(move |e| {
let name = e.file_name().to_string_lossy().to_string();
if name.ends_with(".tmp") { return None; }
Some(format!("{}{}", prefix, name))
})
}))
}
/// Flush durable state for the active segment (v3): one fsync per batch,
/// wired into Db::flush_all(). No-op for loose-object and in-memory modes.
pub fn sync(&self) -> Result<()> {
if let Some(ref seg) = self.seg {
seg.sync()?;
}
Ok(())
}
/// Compact the v3 packed segment store so that only objects whose hash is in
/// `live` are kept; everything else is reclaimed.
///
/// Loose-object and in-memory stores have no segments: for them this does
/// nothing and returns default (zeroed) stats.
pub fn compact(&self, live: &std::collections::HashSet<String>) -> Result<crate::segment::CompactStats> {
    if let Some(seg) = self.seg.as_ref() {
        return seg.compact(live);
    }
    Ok(crate::segment::CompactStats::default())
}
/// Read back every object the store lists and report which ones fail.
///
/// Returns `(ok_count, failed_hashes)`. A hash lands in the second element
/// whenever `read` returns an error for it, whatever the cause (missing file,
/// hash mismatch, decryption or deserialization failure). Failed hashes keep
/// the order in which `all_hashes` produced them. Reads run in parallel.
pub fn verify_all(&self) -> (usize, Vec<String>) {
    use rayon::prelude::*;

    let hashes: Vec<String> = self.all_hashes().collect();
    let readable: Vec<bool> = hashes
        .par_iter()
        .map(|h| self.read(h).is_ok())
        .collect();

    let mut ok = 0usize;
    let mut bad = Vec::new();
    for (hash, is_ok) in hashes.into_iter().zip(readable) {
        if is_ok {
            ok += 1;
        } else {
            bad.push(hash);
        }
    }
    (ok, bad)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build an unwritten node (empty `hash`) with a fixed timestamp and a
    /// small payload derived from `seq`.
    fn make_node(id: &str, coll: &str, seq: u64) -> Node {
        Node {
            id: id.to_owned(),
            coll: coll.to_owned(),
            seq,
            data: serde_json::json!({"height": seq, "hash": "0000abc"}),
            prev: None,
            caused_by: Vec::new(),
            ts: 1718400000.0,
            valid_from: None,
            valid_to: None,
            hash: String::new(),
        }
    }

    /// A written node can be read back by the hash `write` returned.
    #[test]
    fn write_read_roundtrip() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);

        let hash = store.write(&mut node).unwrap();
        assert_eq!(hash.len(), 64);

        let Node { id, coll, .. } = store.read(&hash).unwrap();
        assert_eq!(id, "1");
        assert_eq!(coll, "blocks");
    }

    /// Writing the same node twice (no DEK) yields the same hash.
    #[test]
    fn write_is_idempotent() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);

        let first = store.write(&mut node).unwrap();
        let second = store.write(&mut node).unwrap();
        assert_eq!(first, second);
    }

    /// Flipping a byte of the loose object file makes `read` fail.
    #[test]
    fn tamper_detected() {
        let dir = tempdir().unwrap();
        let store = ObjectStore::new(dir.path(), None).unwrap();
        let mut node = make_node("1", "blocks", 1);
        let hash = store.write(&mut node).unwrap();

        // Corrupt the object file
        let (prefix, rest) = hash.split_at(2);
        let path = dir.path().join("objects").join(prefix).join(rest);
        let mut bytes = fs::read(&path).unwrap();
        bytes[10] ^= 0xff;
        fs::write(&path, bytes).unwrap();

        assert!(store.read(&hash).is_err());
    }
}