Skip to main content

nedb_engine/
db.rs

1//! Main DAG database — coordinates ObjectStore, IdIndex, SortedIndexes, GraphStore.
2
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
/// MANIFEST: cached `{seq, head}` snapshot of the database counters.
///
/// It is persisted by writing `MANIFEST.tmp` and renaming it over `MANIFEST`.
/// It is NOT rewritten on every write. Writes only mark the database dirty; the
/// file is rewritten by `flush_manifest` / `flush_manifest_if_dirty` (driven by the
/// background ticker and `flush_all`) and once at the end of a cold scan.
///
/// On startup, if MANIFEST exists and no sorted indexes need rebuilding, `seq` and
/// `head` are restored from this single file instead of scanning every object.
///
/// The field names and their declaration order define the on-disk JSON layout.
#[derive(serde::Serialize, serde::Deserialize)]
struct Manifest {
    /// Sequence counter. Warm start loads it as the next number to assign, and
    /// `flush_manifest` writes the live next-to-assign counter.
    seq:  u64,
    /// Hex-encoded running Merkle head at the time of the flush.
    head: String,
}
25
/// Main DAG database handle.
///
/// Ties together the content-addressed object store, the `(coll, id) → hash`
/// index, optional per-field sorted indexes and the causal graph store. It also
/// keeps in-memory state for sequence numbering, the running Merkle head and
/// startup readiness.
pub struct Db {
    /// Content-addressed node storage; nodes are written and read by hash.
    pub objects:        ObjectStore,
    /// Maps `(collection, id)` to the hash of the live version of that document.
    pub id_index:       IdIndex,
    /// Per-`(collection, field)` sorted indexes used by `order_by_asc` / `order_by_desc`.
    pub sorted_indexes: SortedIndexes,
    /// Edge store holding the `caused_by` / `caused_by_rev` edges written by `put`.
    pub graph:          GraphStore,
    /// Database directory; the sentinel path `:memory:` marks an in-memory instance.
    pub root:           PathBuf,
    /// Dirty flag — set true when head changes, cleared after manifest flush.
    /// Decouples flush_manifest from the hot write path so concurrent writes
    /// don't serialise on 2× file I/O per PUT.
    manifest_dirty:     Arc<AtomicBool>,
    /// Next sequence number to assign; writers reserve numbers with `fetch_add`.
    pub seq:            AtomicU64,
    /// Cached Merkle head — updated incrementally on every write (O(1)).
    head:               RwLock<String>,
    /// True once startup is fully ready (MANIFEST loaded or cold scan complete).
    /// Warm starts set this true before returning from open().
    /// Cold starts set this true in the background thread when scan completes.
    /// Writes are held with 503 until this is true; reads always proceed.
    pub startup_ready:  Arc<AtomicBool>,
    /// Seq → hash lookup for v1 compatibility. Populated by put(), put_batch(),
    /// and the cold-scan background pass. Only covers nodes from the current
    /// process session + cold-scan; older seqs not in this map cannot be resolved.
    seq_index:          Arc<DashMap<u64, String>>,
}
49
50impl Db {
51    /// Create a pure in-memory database — no disk I/O, no migration, instant startup.
52    /// Perfect for tests, hot-cache layers, and ephemeral sessions.
53    /// All data is lost when the Db is dropped.
54    pub fn in_memory() -> Self {
55        Self {
56            objects:        ObjectStore::in_memory(),
57            id_index:       IdIndex::in_memory(),
58            sorted_indexes: SortedIndexes::new(),
59            graph:          GraphStore::in_memory(),
60            root:           std::path::PathBuf::from(":memory:"),
61            seq:            AtomicU64::new(0),
62            head:           RwLock::new(String::new()),
63            startup_ready:  Arc::new(AtomicBool::new(true)),  // always ready
64            manifest_dirty: Arc::new(AtomicBool::new(false)),
65            seq_index:      Arc::new(DashMap::new()),
66        }
67    }
68
69    /// Open (or create) a database. Runs v1→v2 migration automatically if log.aof is present.
70    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
71        std::fs::create_dir_all(db_root)?;
72
73        let objects        = ObjectStore::new(db_root, dek.clone())?;
74        let id_index       = IdIndex::new(db_root)?;
75        let sorted_indexes = SortedIndexes::new();
76        let graph          = GraphStore::new(db_root)?;
77
78        let mut db = Self {
79            objects,
80            id_index,
81            sorted_indexes,
82            graph,
83            root: db_root.to_path_buf(),
84            seq:  AtomicU64::new(0),
85            head: RwLock::new(String::new()),
86            startup_ready:  Arc::new(AtomicBool::new(false)),
87            manifest_dirty: Arc::new(AtomicBool::new(false)),
88            seq_index:      Arc::new(DashMap::new()),
89        };
90
91        // Auto-migrate v1 → v2 if needed (pass DEK so encrypted AOFs convert correctly)
92        migrate::migrate_if_needed(
93            db_root,
94            &db.objects,
95            &db.id_index,
96            &db.sorted_indexes,
97            &db.graph,
98            dek.as_ref(),
99        )?;
100
101        // Fast startup: load seq+head from MANIFEST if no sorted indexes need rebuilding.
102        // Falls back to full object scan only when necessary (first open, or post-migration).
103        db.startup_rebuild()?;
104
105        Ok(db)
106    }
107
108    /// Smart startup:
109    /// - Warm (MANIFEST exists): O(1) load → startup_ready = true immediately.
110    /// - Cold (no MANIFEST): start server immediately, run scan in background thread.
111    ///   Writes return 503 until scan completes; reads always proceed.
112    fn startup_rebuild(&mut self) -> Result<()> {
113        let manifest_path = self.root.join("MANIFEST");
114        let needs_index_rebuild = !self.sorted_indexes.is_empty();
115
116        // Warm path: MANIFEST + no sorted indexes to rebuild → instant start
117        if manifest_path.exists() && !needs_index_rebuild {
118            if let Some(m) = fs::read_to_string(&manifest_path)
119                .ok()
120                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
121            {
122                // Self-heal: MANIFEST with an empty or short head is corrupt/stale.
123                // Fall through to cold scan so the head is rebuilt correctly from objects.
124                if m.head.len() < 8 {
125                    eprintln!("  [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
126                } else {
127                    self.seq.store(m.seq, Ordering::SeqCst); // m.seq is already the next-to-assign counter
128                    *self.head.write() = m.head.clone();
129                    self.startup_ready.store(true, Ordering::SeqCst);
130                    println!("  [nedbd] warm start — seq={} head={}...", m.seq, &m.head[..8]);
131                    return Ok(());
132                }
133            } else {
134                eprintln!("  [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
135            }
136        }
137
138        // Cold path: mark as not ready, return immediately.
139        // The actual background scan is started by Db::start_cold_scan(arc)
140        // which is called from Manager::open_all() AFTER Arc::new(db) — when
141        // the Db is heap-allocated and its field addresses are permanently stable.
142        // Capturing field addresses here would cause UB: Db moves on return.
143        println!("  [nedbd] cold start — background scan will start after heap allocation");
144        Ok(())
145    }
146
147    /// Call this from Manager::open_all() after Arc::new(db).
148    /// Spawns the cold scan background thread with stable heap addresses.
149    /// No-op if startup is already complete (warm start).
150    pub fn start_cold_scan(self_arc: Arc<Self>) {
151        if self_arc.startup_ready.load(Ordering::SeqCst) {
152            return; // warm start — already ready
153        }
154        // Fast path: if the database is empty (new or just created), skip the
155        // background thread entirely. No objects to scan = instant startup.
156        if self_arc.objects.all_hashes().next().is_none() {
157            self_arc.startup_ready.store(true, Ordering::SeqCst);
158            return;
159        }
160        println!("  [nedbd] cold start — background scan starting, server accepting reads now");
161        std::thread::spawn(move || {
162            let db = self_arc;
163            cold_scan_background_arc(db);
164        });
165    }
166
167    /// Write a document. Returns the new node with its content hash set.
168    pub fn put(
169        &self,
170        coll: &str,
171        id: &str,
172        data: Value,
173        caused_by: Vec<String>,
174        valid_from: Option<String>,
175        valid_to:   Option<String>,
176    ) -> Result<Node> {
177        let seq  = self.seq.fetch_add(1, Ordering::SeqCst);
178        let prev = self.id_index.get(coll, id);
179
180        // Remove old node from sorted indexes (it's being superseded)
181        if let Some(old_hash) = &prev {
182            if let Ok(old_node) = self.objects.read(old_hash) {
183                if let Value::Object(ref obj) = old_node.data {
184                    for (field, value) in obj {
185                        self.sorted_indexes.remove(coll, field, value, old_hash);
186                    }
187                }
188            }
189        }
190
191        let mut node = Node {
192            id:         id.to_string(),
193            coll:       coll.to_string(),
194            seq,
195            data:       data.clone(),
196            prev,
197            caused_by:  caused_by.clone(),
198            ts:         now(),
199            valid_from,
200            valid_to,
201            hash:       String::new(),
202        };
203
204        // Write to object store (atomic, content-addressed)
205        let hash = self.objects.write(&mut node)?;
206        self.seq_index.insert(seq, hash.clone());
207
208        // Update id index (atomic file)
209        self.id_index.set(coll, id, &hash)?;
210
211        // Update sorted indexes
212        if let Value::Object(ref obj) = data {
213            for (field, value) in obj {
214                if self.sorted_indexes.has(coll, field) {
215                    self.sorted_indexes.insert(coll, field, value, &hash);
216                }
217            }
218        }
219
220        // Write causal graph edges
221        for cause in &caused_by {
222            self.graph.add_edge(&hash, "caused_by", cause)?;
223            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
224        }
225
226        // Update running Merkle head: O(1) chain, no full recompute.
227        // new_head = BLAKE2b(prev_head || seq_bytes || new_object_hash)
228        self.update_head(seq, &hash);
229
230        Ok(node)
231    }
232
233    /// Batch put: write N documents in parallel, preserving monotonic seq ordering.
234    /// Pre-allocates N seq numbers atomically, then parallelises object writes and
235    /// id-index updates via Rayon. Each op is independent — safe to parallelise.
236    /// Returns nodes in input order with assigned seq numbers.
237    pub fn put_batch(
238        &self,
239        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
240        // (coll, id, data, caused_by, valid_from, valid_to)
241    ) -> Result<Vec<Node>> {
242        use rayon::prelude::*;
243
244        if ops.is_empty() { return Ok(vec![]); }
245        let n = ops.len() as u64;
246
247        // Pre-allocate N consecutive seq numbers — preserves ordering under concurrency
248        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
249        let ts = now();
250
251        // Build nodes with assigned seq numbers
252        let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
253            let prev = self.id_index.get(&coll, &id);
254            Node {
255                id, coll, seq: base_seq + i as u64,
256                data, prev, caused_by,
257                ts, valid_from, valid_to,
258                hash: String::new(),
259            }
260        }).collect();
261
262        // Parallel object writes (content-addressed, idempotent, safe to parallelise)
263        let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
264            .filter_map(|node| self.objects.write(node).err())
265            .collect();
266        if let Some(e) = write_errors.into_iter().next() { return Err(e); }
267
268        // Parallel id-index updates
269        let index_errors: Vec<anyhow::Error> = nodes.par_iter()
270            .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
271            .collect();
272        if let Some(e) = index_errors.into_iter().next() { return Err(e); }
273
274        // Sorted indexes + causal graph (sequential — small overhead, usually no indexes)
275        for node in &nodes {
276            self.seq_index.insert(node.seq, node.hash.clone());
277            if let Value::Object(ref obj) = node.data {
278                for (field, value) in obj {
279                    if self.sorted_indexes.has(&node.coll, field) {
280                        self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
281                    }
282                }
283            }
284            for cause in &node.caused_by {
285                self.graph.add_edge(&node.hash, "caused_by", cause).ok();
286                self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
287            }
288        }
289
290        // Single Merkle head update for the whole batch (chain all hashes)
291        for node in &nodes {
292            self.update_head(node.seq, &node.hash);
293        }
294
295        Ok(nodes)
296    }
297
298    /// Update the running Merkle head with a new write. O(1), lock-free on the flush path.
299    /// Sets dirty flag — the background ticker calls flush_manifest periodically.
300    /// This removes 2× file I/O ops from the hot write path, unblocking concurrent writes.
301    fn update_head(&self, seq: u64, new_hash: &str) {
302        use blake2::{Blake2b512, Digest};
303        let prev = self.head.read().clone();
304        let mut h = Blake2b512::new();
305        h.update(prev.as_bytes());
306        h.update(seq.to_le_bytes());
307        h.update(new_hash.as_bytes());
308        *self.head.write() = hex::encode(&h.finalize()[..32]);
309        // Mark dirty — background ticker will flush to MANIFEST (no I/O on write path)
310        self.manifest_dirty.store(true, Ordering::Release);
311    }
312
313    /// Flush both the id-index WAL and MANIFEST. Used on graceful shutdown.
314    pub fn flush_all(&self) {
315        self.id_index.flush_write_buf();
316        self.flush_manifest();
317    }
318
319    /// Flush MANIFEST to disk if dirty. No-op for in-memory databases.
320    pub fn flush_manifest_if_dirty(&self) {
321        if self.root == std::path::PathBuf::from(":memory:") { return; }
322        if self.manifest_dirty.compare_exchange(
323            true, false, Ordering::AcqRel, Ordering::Relaxed
324        ).is_ok() {
325            self.flush_manifest();
326        }
327    }
328
329    /// Atomically persist current seq+head to MANIFEST. No-op for in-memory databases.
330    pub fn flush_manifest(&self) {
331        if self.root == std::path::PathBuf::from(":memory:") { return; }
332        let seq  = self.seq.load(Ordering::SeqCst);
333        let head = self.head.read().clone();
334        let m = Manifest { seq, head };
335        if let Ok(json) = serde_json::to_string(&m) {
336            let path = self.root.join("MANIFEST");
337            let tmp  = self.root.join("MANIFEST.tmp");
338            let _ = fs::write(&tmp, &json);
339            let _ = fs::rename(&tmp, &path);
340        }
341    }
342
343    /// Start a background thread that flushes both the id-index WAL and MANIFEST
344    /// every `interval_ms` milliseconds.
345    /// Call this after Arc::new(db) — the Arc keeps Db alive for the thread's lifetime.
346    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
347        let db = self_arc;
348        std::thread::spawn(move || {
349            loop {
350                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
351                // Flush id-index WAL to disk (parallel Rayon writes)
352                db.id_index.flush_write_buf();
353                // Then flush MANIFEST
354                db.flush_manifest_if_dirty();
355            }
356        });
357    }
358
359    /// Return the current Merkle head string. O(1) — read from cache.
360    pub fn head(&self) -> String {
361        self.head.read().clone()
362    }
363
364    /// Delete a document — writes a tombstone node and removes the id from the index.
365    /// The object history is preserved in the DAG; only the live id pointer is cleared.
366    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
367        let prev = match self.id_index.get(coll, id) {
368            None => return Ok(false),   // already gone
369            Some(h) => h,
370        };
371        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
372        let mut tombstone = Node {
373            id:         format!("_del_{}", id),
374            coll:       coll.to_string(),
375            seq,
376            data:       serde_json::json!({"_deleted": id, "_prev": prev}),
377            prev:       Some(prev),
378            caused_by:  vec![],
379            ts:         now(),
380            valid_from: None,
381            valid_to:   None,
382            hash:       String::new(),
383        };
384        let hash = self.objects.write(&mut tombstone)?;
385        self.update_head(seq, &hash);
386        // Remove the live id pointer — doc is now invisible to queries and list()
387        self.id_index.remove(coll, id)?;
388        Ok(true)
389    }
390
391    /// Get the current version of a document by id.
392    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
393        let hash = self.id_index.get(coll, id)?;
394        self.objects.read(&hash).ok()
395    }
396
397    /// Get a specific version of a document by object hash.
398    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
399        self.objects.read(hash).ok()
400    }
401
402    /// Get a document AS OF a specific sequence number.
403    /// Walks the version chain (prev links) backward until seq <= target.
404    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
405        let hash = self.id_index.get(coll, id)?;
406        let mut current = self.objects.read(&hash).ok()?;
407        loop {
408            if current.seq <= target_seq {
409                return Some(current);
410            }
411            let prev_hash = current.prev.as_deref()?;
412            current = self.objects.read(prev_hash).ok()?;
413        }
414    }
415
416    /// List all documents in a collection, returning current versions.
417    pub fn list(&self, coll: &str) -> Vec<Node> {
418        self.id_index
419            .list_ids(coll)
420            .into_iter()
421            .filter_map(|id| self.get(coll, &id))
422            .collect()
423    }
424
425    /// ORDER BY field ASC LIMIT n — uses sorted index if available, else falls back to full scan.
426    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
427        if self.sorted_indexes.has(coll, field) {
428            self.sorted_indexes
429                .top_k_asc(coll, field, limit)
430                .into_iter()
431                .filter_map(|h| self.objects.read(&h).ok())
432                .collect()
433        } else {
434            let mut docs = self.list(coll);
435            docs.sort_by(|a, b| {
436                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
437                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
438                av.cmp(&bv)
439            });
440            docs.truncate(limit);
441            docs
442        }
443    }
444
445    /// ORDER BY field DESC LIMIT n
446    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
447        if self.sorted_indexes.has(coll, field) {
448            self.sorted_indexes
449                .top_k_desc(coll, field, limit)
450                .into_iter()
451                .filter_map(|h| self.objects.read(&h).ok())
452                .collect()
453        } else {
454            let mut docs = self.list(coll);
455            docs.sort_by(|a, b| {
456                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
457                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
458                bv.cmp(&av)
459            });
460            docs.truncate(limit);
461            docs
462        }
463    }
464
465    /// TRACE caused_by — walk causal graph from a node.
466    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
467        self.graph
468            .trace(hash, "caused_by", reverse, limit)
469            .into_iter()
470            .filter_map(|h| self.objects.read(&h).ok())
471            .collect()
472    }
473
474    /// Verify tamper-evidence of all objects.
475    pub fn verify(&self) -> (usize, Vec<String>) {
476        self.objects.verify_all()
477    }
478
479    /// Create a sorted index for a (coll, field) pair.
480    pub fn create_sorted_index(&self, coll: &str, field: &str) {
481        self.sorted_indexes.ensure(coll, field);
482        // Backfill from existing objects
483        for id in self.id_index.list_ids(coll) {
484            if let Some(node) = self.get(coll, &id) {
485                if let Value::Object(ref obj) = node.data {
486                    if let Some(value) = obj.get(field) {
487                        self.sorted_indexes.insert(coll, field, value, &node.hash);
488                    }
489                }
490            }
491        }
492    }
493
494    /// Resolve a sequence number to its content hash (v1 compatibility).
495    /// Only covers nodes written in the current process session + cold-scan nodes.
496    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
497        self.seq_index.get(&seq).map(|r| r.clone())
498    }
499
500    /// Add an explicit named relation edge between two documents.
501    /// Add an explicit named relation between two "coll:id" nodes.
502    /// Relations stored as __links__ documents — NQL-queryable, time-travelable,
503    /// consistent with the PyO3 binding which uses the same __links__ convention.
504    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
505        let (frm_coll, frm_id) = frm.split_once(':')
506            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
507        let (to_coll, to_id) = to.split_once(':')
508            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
509        if self.id_index.get(frm_coll, frm_id).is_none() {
510            anyhow::bail!("link: frm not found: {}", frm);
511        }
512        if self.id_index.get(to_coll, to_id).is_none() {
513            anyhow::bail!("link: to not found: {}", to);
514        }
515        let link_id = format!("{}|{}|{}", frm, rel, to);
516        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
517        self.put("__links__", &link_id, doc, vec![], None, None)?;
518        Ok(())
519    }
520
521    /// Remove a named relation (deletes the __links__ document).
522    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
523        let link_id = format!("{}|{}|{}", frm, rel, to);
524        self.delete("__links__", &link_id)
525    }
526
527    /// Get neighbor nodes via a named relation.
528    /// Queries __links__ — consistent with the PyO3 binding.
529    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
530        self.id_index
531            .list_ids("__links__")
532            .into_iter()
533            .filter_map(|id| self.get("__links__", &id))
534            .filter(|node| {
535                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
536                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
537            })
538            .filter_map(|node| {
539                let to = node.data.get("_to")?.as_str()?;
540                let (to_coll, to_id) = to.split_once(':')?;
541                self.get(to_coll, to_id)
542            })
543            .collect()
544    }
545}
546
547/// Background cold-scan worker. Takes Arc<Db> — safe, Db is on the heap.
548fn cold_scan_background_arc(db: Arc<Db>) {
549    use rayon::prelude::*;
550    use blake2::{Blake2b512, Digest};
551
552    let objects        = &db.objects;
553    let head           = &db.head;
554    let seq_atomic     = &db.seq;
555    let sorted_indexes = &db.sorted_indexes;
556    let root           = db.root.clone();
557    let ready_flag     = Arc::clone(&db.startup_ready);
558
559    let hashes: Vec<String> = objects.all_hashes().collect();
560    let total = hashes.len();
561
562    if total == 0 {
563        ready_flag.store(true, Ordering::SeqCst);
564        return;
565    }
566
567    println!("  [nedbd] background scan — {} objects...", total);
568    let t0 = std::time::Instant::now();
569    let step = (total / 10).max(1000);
570
571    let nodes: Vec<Node> = hashes.par_iter()
572        .enumerate()
573        .filter_map(|(i, h)| {
574            if i > 0 && i % step == 0 {
575                let pct     = i * 100 / total;
576                let elapsed = t0.elapsed().as_secs_f32();
577                let rate    = i as f32 / elapsed;
578                let eta     = (total - i) as f32 / rate;
579                eprint!("\r  [nedbd]   {:>3}%  {:>8} / {:>8}  ({:>8.0}/s  eta {:.0}s)   ",
580                    pct, i, total, rate, eta);
581            }
582            objects.read(h).ok()
583        })
584        .collect();
585
586    eprintln!("\r  [nedbd]   100%  {:>8} / {:>8}  ({:.1}s)                        ",
587        total, total, t0.elapsed().as_secs_f32());
588
589    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
590    seq_atomic.store(max_seq + 1, Ordering::SeqCst);
591
592    for node in &nodes {
593        db.seq_index.insert(node.seq, node.hash.clone());
594        if let Value::Object(ref obj) = node.data {
595            for (field, value) in obj {
596                if sorted_indexes.has(&node.coll, field) {
597                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
598                }
599            }
600        }
601    }
602
603    // Compute Merkle head from sorted hashes
604    let mut sorted_hashes = hashes;
605    sorted_hashes.sort();
606    let mut h = Blake2b512::new();
607    h.update(max_seq.to_le_bytes());
608    for hash_str in &sorted_hashes {
609        h.update(hash_str.as_bytes());
610    }
611    let new_head = hex::encode(&h.finalize()[..32]);
612    *head.write() = new_head.clone();
613
614    // Write MANIFEST atomically
615    let m     = Manifest { seq: max_seq, head: new_head };
616    let json  = serde_json::to_string(&m).unwrap_or_default();
617    let path  = root.join("MANIFEST");
618    let tmp   = root.join("MANIFEST.tmp");
619    let _ = fs::write(&tmp, &json);
620    let _ = fs::rename(&tmp, &path);
621
622    // Signal server: writes can now proceed
623    ready_flag.store(true, Ordering::SeqCst);
624    println!("  [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
625}
626
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// A clock set before 1970 yields `0.0` rather than an error.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
633
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A document written with `put` comes back from `get` with its id and data intact.
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let block = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", block, vec![], None, None).unwrap();

        let stored = db.get("blocks", "618000").expect("block 618000 should be readable");
        assert_eq!(stored.id, "618000");
        assert_eq!(stored.data["height"], 618000);
    }

    /// With a sorted index on `height`, ORDER BY ASC yields the lowest heights first.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");

        let shuffled: [u64; 5] = [3, 1, 5, 2, 4];
        for height in shuffled {
            let id = height.to_string();
            db.put("blocks", &id, serde_json::json!({"height": height}), vec![], None, None)
                .unwrap();
        }

        let lowest_three: Vec<u64> = db
            .order_by_asc("blocks", "height", 3)
            .iter()
            .filter_map(|node| node.data["height"].as_u64())
            .collect();
        assert_eq!(lowest_three, vec![1, 2, 3]);
    }

    /// Tracing from the end of a causal chain visits every node in the chain.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let create = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        // burn → transfer → create
        let chain = db.trace(&burn.hash, false, 10);
        assert_eq!(chain.len(), 3);
    }

    /// `get_as_of` returns the historical version while `get` returns the latest one.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);

        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
695
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    /// `put` records seq → hash so `get_hash_by_seq` resolves each write; unknown seqs miss.
    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let first = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let second = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        for written in [&first, &second] {
            assert_eq!(db.get_hash_by_seq(written.seq), Some(written.hash.clone()));
        }
        assert!(db.get_hash_by_seq(9999).is_none());
    }

    /// `put_batch` records seq → hash for every node it writes.
    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let ops = vec![
            ("item".to_string(), "x".to_string(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".to_string(), "y".to_string(), serde_json::json!({"v": 2}), vec![], None, None),
        ];
        let written = db.put_batch(ops).unwrap();
        for node in written.iter() {
            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
    }

    /// `neighbors` returns exactly the targets linked from a node under a relation.
    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        let seed = [
            ("driver", "d1", serde_json::json!({"name": "Bob"})),
            ("driver", "d2", serde_json::json!({"name": "Carol"})),
            ("trip",   "t1", serde_json::json!({"status": "req"})),
            ("trip",   "t2", serde_json::json!({"status": "req"})),
        ];
        for (coll, id, data) in seed {
            db.put(coll, id, data, vec![], None, None).unwrap();
        }
        let edges = [("driver:d1", "trip:t1"), ("driver:d1", "trip:t2"), ("driver:d2", "trip:t1")];
        for (frm, to) in edges {
            db.link(frm, "handles", to).unwrap();
        }

        let d1_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(d1_trips.len(), 2);
        let d1_ids: std::collections::HashSet<&str> =
            d1_trips.iter().map(|trip| trip.id.as_str()).collect();
        assert!(d1_ids.contains("t1"));
        assert!(d1_ids.contains("t2"));

        let d2_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(d2_trips.len(), 1);
        assert_eq!(d2_trips[0].id, "t1");
    }

    /// Links are stored as `__links__` documents (not graph edges) and resolve via `neighbors`.
    #[test]
    fn link_stored_in_links_collection() {
        // The __links__ collection is NQL-queryable and consistent with the PyO3 binding.
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();

        let doc = db
            .get("__links__", "driver:d1|handles|trip:t1")
            .expect("__links__ doc should exist");
        assert_eq!(doc.data["_from"], "driver:d1");
        assert_eq!(doc.data["_rel"], "handles");
        assert_eq!(doc.data["_to"], "trip:t1");

        let targets = db.neighbors("driver:d1", "handles");
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0].id, "t1");
    }

    /// Linking to a document that does not exist is rejected.
    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
        let outcome = db.link("driver:d1", "handles", "trip:ghost");
        assert!(outcome.is_err());
    }

    /// Links written in one session are visible after reopening the same directory.
    #[test]
    fn link_durable_survives_reopen() {
        let tmp = tempdir().unwrap();
        {
            let first_session = Db::open(tmp.path(), None).unwrap();
            first_session.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
            first_session.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            first_session.link("driver:d1", "handles", "trip:t1").unwrap();
        }

        let reopened = Db::open(tmp.path(), None).unwrap();
        reopened.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
        let trips = reopened.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }
}