Skip to main content

nedb_engine/
db.rs

1//! Main DAG database — coordinates ObjectStore, IdIndex, SortedIndexes, GraphStore.
2
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17/// MANIFEST: cached {seq, head} written atomically after every write.
18/// On startup, if MANIFEST exists and no sorted indexes need rebuilding,
19/// startup is O(1) — just read this one file instead of scanning all objects.
20#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22    seq:  u64,
23    head: String,
24}
25
26pub struct Db {
27    pub objects:        ObjectStore,
28    pub id_index:       IdIndex,
29    pub sorted_indexes: SortedIndexes,
30    pub graph:          GraphStore,
31    pub root:           PathBuf,
32    /// Dirty flag — set true when head changes, cleared after manifest flush.
33    /// Decouples flush_manifest from the hot write path so concurrent writes
34    /// don't serialise on 2× file I/O per PUT.
35    manifest_dirty:     Arc<AtomicBool>,
36    pub seq:            AtomicU64,
37    /// Cached Merkle head — updated incrementally on every write (O(1)).
38    head:               RwLock<String>,
39    /// True once startup is fully ready (MANIFEST loaded or cold scan complete).
40    /// Warm starts set this true before returning from open().
41    /// Cold starts set this true in the background thread when scan completes.
42    /// Writes are held with 503 until this is true; reads always proceed.
43    pub startup_ready:  Arc<AtomicBool>,
44    /// Seq → hash lookup for v1 compatibility. Populated by put(), put_batch(),
45    /// and the cold-scan background pass. Only covers nodes from the current
46    /// process session + cold-scan; older seqs not in this map cannot be resolved.
47    seq_index:          Arc<DashMap<u64, String>>,
48}
49
50impl Db {
51    /// Create a pure in-memory database — no disk I/O, no migration, instant startup.
52    /// Perfect for tests, hot-cache layers, and ephemeral sessions.
53    /// All data is lost when the Db is dropped.
54    pub fn in_memory() -> Self {
55        Self {
56            objects:        ObjectStore::in_memory(),
57            id_index:       IdIndex::in_memory(),
58            sorted_indexes: SortedIndexes::new(),
59            graph:          GraphStore::in_memory(),
60            root:           std::path::PathBuf::from(":memory:"),
61            seq:            AtomicU64::new(0),
62            head:           RwLock::new(String::new()),
63            startup_ready:  Arc::new(AtomicBool::new(true)),  // always ready
64            manifest_dirty: Arc::new(AtomicBool::new(false)),
65            seq_index:      Arc::new(DashMap::new()),
66        }
67    }
68
69    /// Open (or create) a database. Runs v1→v2 migration automatically if log.aof is present.
70    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
71        std::fs::create_dir_all(db_root)?;
72
73        let objects        = ObjectStore::new(db_root, dek.clone())?;
74        let id_index       = IdIndex::new(db_root)?;
75        let sorted_indexes = SortedIndexes::new();
76        let graph          = GraphStore::new(db_root)?;
77
78        let mut db = Self {
79            objects,
80            id_index,
81            sorted_indexes,
82            graph,
83            root: db_root.to_path_buf(),
84            seq:  AtomicU64::new(0),
85            head: RwLock::new(String::new()),
86            startup_ready:  Arc::new(AtomicBool::new(false)),
87            manifest_dirty: Arc::new(AtomicBool::new(false)),
88            seq_index:      Arc::new(DashMap::new()),
89        };
90
91        // Auto-migrate v1 → v2 if needed (pass DEK so encrypted AOFs convert correctly)
92        migrate::migrate_if_needed(
93            db_root,
94            &db.objects,
95            &db.id_index,
96            &db.sorted_indexes,
97            &db.graph,
98            dek.as_ref(),
99        )?;
100
101        // Fast startup: load seq+head from MANIFEST if no sorted indexes need rebuilding.
102        // Falls back to full object scan only when necessary (first open, or post-migration).
103        db.startup_rebuild()?;
104
105        Ok(db)
106    }
107
108    /// Smart startup:
109    /// - Warm (MANIFEST exists): O(1) load → startup_ready = true immediately.
110    /// - Cold (no MANIFEST): start server immediately, run scan in background thread.
111    ///   Writes return 503 until scan completes; reads always proceed.
112    fn startup_rebuild(&mut self) -> Result<()> {
113        let manifest_path = self.root.join("MANIFEST");
114        let needs_index_rebuild = !self.sorted_indexes.is_empty();
115
116        // Warm path: MANIFEST + no sorted indexes to rebuild → instant start
117        if manifest_path.exists() && !needs_index_rebuild {
118            if let Some(m) = fs::read_to_string(&manifest_path)
119                .ok()
120                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
121            {
122                // Self-heal: MANIFEST with an empty or short head is corrupt/stale.
123                // Fall through to cold scan so the head is rebuilt correctly from objects.
124                if m.head.len() < 8 {
125                    eprintln!("  [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
126                } else {
127                    self.seq.store(m.seq, Ordering::SeqCst); // m.seq is already the next-to-assign counter
128                    *self.head.write() = m.head.clone();
129                    self.startup_ready.store(true, Ordering::SeqCst);
130                    println!("  [nedbd] warm start — seq={} head={}...", m.seq, &m.head[..8]);
131                    return Ok(());
132                }
133            } else {
134                eprintln!("  [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
135            }
136        }
137
138        // Cold path: mark as not ready, return immediately.
139        // The actual background scan is started by Db::start_cold_scan(arc)
140        // which is called from Manager::open_all() AFTER Arc::new(db) — when
141        // the Db is heap-allocated and its field addresses are permanently stable.
142        // Capturing field addresses here would cause UB: Db moves on return.
143        println!("  [nedbd] cold start — background scan will start after heap allocation");
144        Ok(())
145    }
146
147    /// Call this from Manager::open_all() after Arc::new(db).
148    /// Spawns the cold scan background thread with stable heap addresses.
149    /// No-op if startup is already complete (warm start).
150    pub fn start_cold_scan(self_arc: Arc<Self>) {
151        if self_arc.startup_ready.load(Ordering::SeqCst) {
152            return; // warm start — already ready
153        }
154        // Fast path: if the database is empty (new or just created), skip the
155        // background thread entirely. No objects to scan = instant startup.
156        if self_arc.objects.all_hashes().next().is_none() {
157            self_arc.startup_ready.store(true, Ordering::SeqCst);
158            return;
159        }
160        println!("  [nedbd] cold start — background scan starting, server accepting reads now");
161        std::thread::spawn(move || {
162            let db = self_arc;
163            cold_scan_background_arc(db);
164        });
165    }
166
167    /// Write a document. Returns the new node with its content hash set.
168    pub fn put(
169        &self,
170        coll: &str,
171        id: &str,
172        data: Value,
173        caused_by: Vec<String>,
174        valid_from: Option<String>,
175        valid_to:   Option<String>,
176    ) -> Result<Node> {
177        let seq  = self.seq.fetch_add(1, Ordering::SeqCst);
178        let prev = self.id_index.get(coll, id);
179
180        // Remove old node from sorted indexes (it's being superseded)
181        if let Some(old_hash) = &prev {
182            if let Ok(old_node) = self.objects.read(old_hash) {
183                if let Value::Object(ref obj) = old_node.data {
184                    for (field, value) in obj {
185                        self.sorted_indexes.remove(coll, field, value, old_hash);
186                    }
187                }
188            }
189        }
190
191        let mut node = Node {
192            id:         id.to_string(),
193            coll:       coll.to_string(),
194            seq,
195            data:       data.clone(),
196            prev,
197            caused_by:  caused_by.clone(),
198            ts:         now(),
199            valid_from,
200            valid_to,
201            hash:       String::new(),
202        };
203
204        // Write to object store (atomic, content-addressed)
205        let hash = self.objects.write(&mut node)?;
206        self.seq_index.insert(seq, hash.clone());
207
208        // Update id index (atomic file)
209        self.id_index.set(coll, id, &hash)?;
210
211        // Update sorted indexes
212        if let Value::Object(ref obj) = data {
213            for (field, value) in obj {
214                if self.sorted_indexes.has(coll, field) {
215                    self.sorted_indexes.insert(coll, field, value, &hash);
216                }
217            }
218        }
219
220        // Write causal graph edges
221        for cause in &caused_by {
222            self.graph.add_edge(&hash, "caused_by", cause)?;
223            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
224        }
225
226        // Update running Merkle head: O(1) chain, no full recompute.
227        // new_head = BLAKE2b(prev_head || seq_bytes || new_object_hash)
228        self.update_head(seq, &hash);
229
230        Ok(node)
231    }
232
233    /// Batch put: write N documents in parallel, preserving monotonic seq ordering.
234    /// Pre-allocates N seq numbers atomically, then parallelises object writes and
235    /// id-index updates via Rayon. Each op is independent — safe to parallelise.
236    /// Returns nodes in input order with assigned seq numbers.
237    pub fn put_batch(
238        &self,
239        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
240        // (coll, id, data, caused_by, valid_from, valid_to)
241    ) -> Result<Vec<Node>> {
242        use rayon::prelude::*;
243
244        if ops.is_empty() { return Ok(vec![]); }
245        let n = ops.len() as u64;
246
247        // Pre-allocate N consecutive seq numbers — preserves ordering under concurrency
248        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
249        let ts = now();
250
251        // Build nodes with assigned seq numbers
252        let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
253            let prev = self.id_index.get(&coll, &id);
254            Node {
255                id, coll, seq: base_seq + i as u64,
256                data, prev, caused_by,
257                ts, valid_from, valid_to,
258                hash: String::new(),
259            }
260        }).collect();
261
262        // Parallel object writes (content-addressed, idempotent, safe to parallelise)
263        let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
264            .filter_map(|node| self.objects.write(node).err())
265            .collect();
266        if let Some(e) = write_errors.into_iter().next() { return Err(e); }
267
268        // Parallel id-index updates
269        let index_errors: Vec<anyhow::Error> = nodes.par_iter()
270            .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
271            .collect();
272        if let Some(e) = index_errors.into_iter().next() { return Err(e); }
273
274        // Sorted indexes + causal graph (sequential — small overhead, usually no indexes)
275        for node in &nodes {
276            self.seq_index.insert(node.seq, node.hash.clone());
277            if let Value::Object(ref obj) = node.data {
278                for (field, value) in obj {
279                    if self.sorted_indexes.has(&node.coll, field) {
280                        self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
281                    }
282                }
283            }
284            for cause in &node.caused_by {
285                self.graph.add_edge(&node.hash, "caused_by", cause).ok();
286                self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
287            }
288        }
289
290        // Single Merkle head update for the whole batch (chain all hashes)
291        for node in &nodes {
292            self.update_head(node.seq, &node.hash);
293        }
294
295        Ok(nodes)
296    }
297
298    /// Update the running Merkle head with a new write. O(1), lock-free on the flush path.
299    /// Sets dirty flag — the background ticker calls flush_manifest periodically.
300    /// This removes 2× file I/O ops from the hot write path, unblocking concurrent writes.
301    fn update_head(&self, seq: u64, new_hash: &str) {
302        use blake2::{Blake2b512, Digest};
303        let prev = self.head.read().clone();
304        let mut h = Blake2b512::new();
305        h.update(prev.as_bytes());
306        h.update(seq.to_le_bytes());
307        h.update(new_hash.as_bytes());
308        *self.head.write() = hex::encode(&h.finalize()[..32]);
309        // Mark dirty — background ticker will flush to MANIFEST (no I/O on write path)
310        self.manifest_dirty.store(true, Ordering::Release);
311    }
312
313    /// Flush both the id-index WAL and MANIFEST. Used on graceful shutdown.
314    pub fn flush_all(&self) {
315        self.id_index.flush_write_buf();
316        // v3: fsync the active segment (no-op for loose/in-memory stores).
317        // One durability point per batch instead of one fsync per object.
318        if let Err(e) = self.objects.sync() {
319            eprintln!("nedb: segment sync failed: {}", e);
320        }
321        self.flush_manifest();
322    }
323
324    /// Compact the v3 packed object store: keep the CURRENT version of every
325    /// document (from the id-index) and reclaim everything else. No-op unless
326    /// running with the v3 segment substrate (`--dag-v3` / NEDB_DAG_V3).
327    ///
328    /// This is a PRUNING operation: superseded/historical object versions are
329    /// dropped, so AS OF / TRACE over pruned versions is discarded — that is
330    /// what reclaims the space. Flushes first so all data is durable on disk
331    /// before the old segments are deleted.
332    pub fn compact(&self) -> Result<crate::segment::CompactStats> {
333        self.flush_all();
334        let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
335        for coll in self.id_index.collections() {
336            for id in self.id_index.list_ids(&coll) {
337                if let Some(h) = self.id_index.get(&coll, &id) {
338                    live.insert(h);
339                }
340            }
341        }
342        self.objects.compact(&live)
343    }
344
345    /// Flush MANIFEST to disk if dirty. No-op for in-memory databases.
346    pub fn flush_manifest_if_dirty(&self) {
347        if self.root == std::path::PathBuf::from(":memory:") { return; }
348        if self.manifest_dirty.compare_exchange(
349            true, false, Ordering::AcqRel, Ordering::Relaxed
350        ).is_ok() {
351            self.flush_manifest();
352        }
353    }
354
355    /// Atomically persist current seq+head to MANIFEST. No-op for in-memory databases.
356    pub fn flush_manifest(&self) {
357        if self.root == std::path::PathBuf::from(":memory:") { return; }
358        let seq  = self.seq.load(Ordering::SeqCst);
359        let head = self.head.read().clone();
360        let m = Manifest { seq, head };
361        if let Ok(json) = serde_json::to_string(&m) {
362            let path = self.root.join("MANIFEST");
363            let tmp  = self.root.join("MANIFEST.tmp");
364            let _ = fs::write(&tmp, &json);
365            let _ = fs::rename(&tmp, &path);
366        }
367    }
368
369    /// Start a background thread that flushes both the id-index WAL and MANIFEST
370    /// every `interval_ms` milliseconds.
371    /// Call this after Arc::new(db) — the Arc keeps Db alive for the thread's lifetime.
372    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
373        let db = self_arc;
374        std::thread::spawn(move || {
375            loop {
376                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
377                // Flush id-index WAL to disk (parallel Rayon writes)
378                db.id_index.flush_write_buf();
379                // Then flush MANIFEST
380                db.flush_manifest_if_dirty();
381            }
382        });
383    }
384
385    /// Return the current Merkle head string. O(1) — read from cache.
386    pub fn head(&self) -> String {
387        self.head.read().clone()
388    }
389
390    /// Delete a document — writes a tombstone node and removes the id from the index.
391    /// The object history is preserved in the DAG; only the live id pointer is cleared.
392    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
393        let prev = match self.id_index.get(coll, id) {
394            None => return Ok(false),   // already gone
395            Some(h) => h,
396        };
397        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
398        let mut tombstone = Node {
399            id:         format!("_del_{}", id),
400            coll:       coll.to_string(),
401            seq,
402            data:       serde_json::json!({"_deleted": id, "_prev": prev}),
403            prev:       Some(prev),
404            caused_by:  vec![],
405            ts:         now(),
406            valid_from: None,
407            valid_to:   None,
408            hash:       String::new(),
409        };
410        let hash = self.objects.write(&mut tombstone)?;
411        self.update_head(seq, &hash);
412        // Remove the live id pointer — doc is now invisible to queries and list()
413        self.id_index.remove(coll, id)?;
414        Ok(true)
415    }
416
417    /// Get the current version of a document by id.
418    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
419        let hash = self.id_index.get(coll, id)?;
420        self.objects.read(&hash).ok()
421    }
422
423    /// Get a specific version of a document by object hash.
424    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
425        self.objects.read(hash).ok()
426    }
427
428    /// Get a document AS OF a specific sequence number.
429    /// Walks the version chain (prev links) backward until seq <= target.
430    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
431        let hash = self.id_index.get(coll, id)?;
432        let mut current = self.objects.read(&hash).ok()?;
433        loop {
434            if current.seq <= target_seq {
435                return Some(current);
436            }
437            let prev_hash = current.prev.as_deref()?;
438            current = self.objects.read(prev_hash).ok()?;
439        }
440    }
441
442    /// List all documents in a collection, returning current versions.
443    pub fn list(&self, coll: &str) -> Vec<Node> {
444        self.id_index
445            .list_ids(coll)
446            .into_iter()
447            .filter_map(|id| self.get(coll, &id))
448            .collect()
449    }
450
451    /// ORDER BY field ASC LIMIT n — uses sorted index if available, else falls back to full scan.
452    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
453        if self.sorted_indexes.has(coll, field) {
454            self.sorted_indexes
455                .top_k_asc(coll, field, limit)
456                .into_iter()
457                .filter_map(|h| self.objects.read(&h).ok())
458                .collect()
459        } else {
460            let mut docs = self.list(coll);
461            docs.sort_by(|a, b| {
462                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
463                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
464                av.cmp(&bv)
465            });
466            docs.truncate(limit);
467            docs
468        }
469    }
470
471    /// ORDER BY field DESC LIMIT n
472    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
473        if self.sorted_indexes.has(coll, field) {
474            self.sorted_indexes
475                .top_k_desc(coll, field, limit)
476                .into_iter()
477                .filter_map(|h| self.objects.read(&h).ok())
478                .collect()
479        } else {
480            let mut docs = self.list(coll);
481            docs.sort_by(|a, b| {
482                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
483                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
484                bv.cmp(&av)
485            });
486            docs.truncate(limit);
487            docs
488        }
489    }
490
491    /// TRACE caused_by — walk causal graph from a node.
492    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
493        self.graph
494            .trace(hash, "caused_by", reverse, limit)
495            .into_iter()
496            .filter_map(|h| self.objects.read(&h).ok())
497            .collect()
498    }
499
500    /// Verify tamper-evidence of all objects.
501    pub fn verify(&self) -> (usize, Vec<String>) {
502        self.objects.verify_all()
503    }
504
505    /// Create a sorted index for a (coll, field) pair.
506    pub fn create_sorted_index(&self, coll: &str, field: &str) {
507        self.sorted_indexes.ensure(coll, field);
508        // Backfill from existing objects
509        for id in self.id_index.list_ids(coll) {
510            if let Some(node) = self.get(coll, &id) {
511                if let Value::Object(ref obj) = node.data {
512                    if let Some(value) = obj.get(field) {
513                        self.sorted_indexes.insert(coll, field, value, &node.hash);
514                    }
515                }
516            }
517        }
518    }
519
520    /// Resolve a sequence number to its content hash (v1 compatibility).
521    /// Only covers nodes written in the current process session + cold-scan nodes.
522    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
523        self.seq_index.get(&seq).map(|r| r.clone())
524    }
525
526    /// Add an explicit named relation edge between two documents.
527    /// Add an explicit named relation between two "coll:id" nodes.
528    /// Relations stored as __links__ documents — NQL-queryable, time-travelable,
529    /// consistent with the PyO3 binding which uses the same __links__ convention.
530    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
531        let (frm_coll, frm_id) = frm.split_once(':')
532            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
533        let (to_coll, to_id) = to.split_once(':')
534            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
535        if self.id_index.get(frm_coll, frm_id).is_none() {
536            anyhow::bail!("link: frm not found: {}", frm);
537        }
538        if self.id_index.get(to_coll, to_id).is_none() {
539            anyhow::bail!("link: to not found: {}", to);
540        }
541        let link_id = format!("{}|{}|{}", frm, rel, to);
542        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
543        self.put("__links__", &link_id, doc, vec![], None, None)?;
544        Ok(())
545    }
546
547    /// Remove a named relation (deletes the __links__ document).
548    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
549        let link_id = format!("{}|{}|{}", frm, rel, to);
550        self.delete("__links__", &link_id)
551    }
552
553    /// Get neighbor nodes via a named relation.
554    /// Queries __links__ — consistent with the PyO3 binding.
555    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
556        self.id_index
557            .list_ids("__links__")
558            .into_iter()
559            .filter_map(|id| self.get("__links__", &id))
560            .filter(|node| {
561                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
562                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
563            })
564            .filter_map(|node| {
565                let to = node.data.get("_to")?.as_str()?;
566                let (to_coll, to_id) = to.split_once(':')?;
567                self.get(to_coll, to_id)
568            })
569            .collect()
570    }
571}
572
573/// Background cold-scan worker. Takes Arc<Db> — safe, Db is on the heap.
574fn cold_scan_background_arc(db: Arc<Db>) {
575    use rayon::prelude::*;
576    use blake2::{Blake2b512, Digest};
577
578    let objects        = &db.objects;
579    let head           = &db.head;
580    let seq_atomic     = &db.seq;
581    let sorted_indexes = &db.sorted_indexes;
582    let root           = db.root.clone();
583    let ready_flag     = Arc::clone(&db.startup_ready);
584
585    let hashes: Vec<String> = objects.all_hashes().collect();
586    let total = hashes.len();
587
588    if total == 0 {
589        ready_flag.store(true, Ordering::SeqCst);
590        return;
591    }
592
593    println!("  [nedbd] background scan — {} objects...", total);
594    let t0 = std::time::Instant::now();
595    let step = (total / 10).max(1000);
596
597    let nodes: Vec<Node> = hashes.par_iter()
598        .enumerate()
599        .filter_map(|(i, h)| {
600            if i > 0 && i % step == 0 {
601                let pct     = i * 100 / total;
602                let elapsed = t0.elapsed().as_secs_f32();
603                let rate    = i as f32 / elapsed;
604                let eta     = (total - i) as f32 / rate;
605                eprint!("\r  [nedbd]   {:>3}%  {:>8} / {:>8}  ({:>8.0}/s  eta {:.0}s)   ",
606                    pct, i, total, rate, eta);
607            }
608            objects.read(h).ok()
609        })
610        .collect();
611
612    eprintln!("\r  [nedbd]   100%  {:>8} / {:>8}  ({:.1}s)                        ",
613        total, total, t0.elapsed().as_secs_f32());
614
615    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
616    seq_atomic.store(max_seq + 1, Ordering::SeqCst);
617
618    for node in &nodes {
619        db.seq_index.insert(node.seq, node.hash.clone());
620        if let Value::Object(ref obj) = node.data {
621            for (field, value) in obj {
622                if sorted_indexes.has(&node.coll, field) {
623                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
624                }
625            }
626        }
627    }
628
629    // Compute Merkle head from sorted hashes
630    let mut sorted_hashes = hashes;
631    sorted_hashes.sort();
632    let mut h = Blake2b512::new();
633    h.update(max_seq.to_le_bytes());
634    for hash_str in &sorted_hashes {
635        h.update(hash_str.as_bytes());
636    }
637    let new_head = hex::encode(&h.finalize()[..32]);
638    *head.write() = new_head.clone();
639
640    // Write MANIFEST atomically
641    let m     = Manifest { seq: max_seq, head: new_head };
642    let json  = serde_json::to_string(&m).unwrap_or_default();
643    let path  = root.join("MANIFEST");
644    let tmp   = root.join("MANIFEST.tmp");
645    let _ = fs::write(&tmp, &json);
646    let _ = fs::rename(&tmp, &path);
647
648    // Signal server: writes can now proceed
649    ready_flag.store(true, Ordering::SeqCst);
650    println!("  [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
651}
652
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
659
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A stored document can be read back by id with its data intact.
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let doc = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", doc, vec![], None, None).unwrap();

        let fetched = db.get("blocks", "618000").expect("document should exist");
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    /// ORDER BY ASC through a sorted index returns the lowest values first.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");

        let inserted: [u64; 5] = [3, 1, 5, 2, 4];
        for height in inserted.iter().copied() {
            let id = height.to_string();
            db.put("blocks", &id, serde_json::json!({"height": height}), vec![], None, None)
                .unwrap();
        }

        let lowest_three: Vec<u64> = db
            .order_by_asc("blocks", "height", 3)
            .iter()
            .filter_map(|node| node.data["height"].as_u64())
            .collect();
        assert_eq!(lowest_three, vec![1, 2, 3]);
    }

    /// Tracing back from the last op in a causal chain visits the whole chain.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let create = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        // c → b → a
        assert_eq!(db.trace(&burn.hash, false, 10).len(), 3);
    }

    /// AS OF an earlier seq returns the old version; get() returns the newest.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);

        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
721
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    /// Every put records its seq → hash mapping; unknown seqs resolve to None.
    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let written: Vec<Node> = [("a", 1), ("b", 2)]
            .iter()
            .map(|&(id, x)| {
                db.put("item", id, serde_json::json!({"x": x}), vec![], None, None).unwrap()
            })
            .collect();

        for node in &written {
            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    /// Batch writes populate the seq index just like single puts.
    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let ops = vec![
            ("item".to_string(), "x".to_string(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".to_string(), "y".to_string(), serde_json::json!({"v": 2}), vec![], None, None),
        ];
        let nodes = db.put_batch(ops).unwrap();

        for node in nodes.iter() {
            let resolved = db.get_hash_by_seq(node.seq);
            assert_eq!(resolved, Some(node.hash.clone()));
        }
    }

    /// neighbors() returns exactly the targets linked from a given node.
    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        let fixtures = vec![
            ("driver", "d1", serde_json::json!({"name": "Bob"})),
            ("driver", "d2", serde_json::json!({"name": "Carol"})),
            ("trip",   "t1", serde_json::json!({"status": "req"})),
            ("trip",   "t2", serde_json::json!({"status": "req"})),
        ];
        for (coll, id, body) in fixtures {
            db.put(coll, id, body, vec![], None, None).unwrap();
        }

        let edges = [
            ("driver:d1", "trip:t1"),
            ("driver:d1", "trip:t2"),
            ("driver:d2", "trip:t1"),
        ];
        for &(from, to) in edges.iter() {
            db.link(from, "handles", to).unwrap();
        }

        let d1_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(d1_trips.len(), 2);
        let d1_ids: std::collections::HashSet<&str> =
            d1_trips.iter().map(|n| n.id.as_str()).collect();
        assert!(d1_ids.contains("t1") && d1_ids.contains("t2"));

        let d2_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(d2_trips.len(), 1);
        assert_eq!(d2_trips[0].id, "t1");
    }

    /// Links are stored as `__links__` documents, not as graph edges. The
    /// `__links__` collection is NQL-queryable and consistent with the PyO3 binding.
    #[test]
    fn link_stored_in_links_collection() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();

        // The link document exists under its composite id.
        let doc = db
            .get("__links__", "driver:d1|handles|trip:t1")
            .expect("__links__ doc should exist");
        assert_eq!(doc.data["_from"], "driver:d1");
        assert_eq!(doc.data["_rel"],  "handles");
        assert_eq!(doc.data["_to"],   "trip:t1");

        // neighbors() resolves the link to its target node.
        let resolved = db.neighbors("driver:d1", "handles");
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].id, "t1");
    }

    /// Linking to a node that does not exist is rejected.
    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
        let outcome = db.link("driver:d1", "handles", "trip:ghost");
        assert!(outcome.is_err());
    }

    /// Links written by one handle are visible after reopening the directory.
    #[test]
    fn link_durable_survives_reopen() {
        let tmp = tempdir().unwrap();
        {
            let first = Db::open(tmp.path(), None).unwrap();
            first.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
            first.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            first.link("driver:d1", "handles", "trip:t1").unwrap();
        } // first handle is dropped here, before reopening

        let reopened = Db::open(tmp.path(), None).unwrap();
        // The reopened handle may still be waiting on a cold scan; mark it ready directly.
        reopened.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);

        let trips = reopened.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }
}