Skip to main content

nedb_engine/
db.rs

1//! Main DAG database — coordinates ObjectStore, IdIndex, SortedIndexes, GraphStore.
2
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17/// MANIFEST: cached {seq, head} written atomically after every write.
18/// On startup, if MANIFEST exists and no sorted indexes need rebuilding,
19/// startup is O(1) — just read this one file instead of scanning all objects.
20#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22    seq:  u64,
23    head: String,
24}
25
/// Page size `since()` falls back to when it is called with `limit == 0`.
///
/// The cap lives in the engine itself rather than only in the HTTP layer, so no
/// caller — however far behind — can request an unbounded batch.
pub const DEFAULT_SINCE_LIMIT: usize = 10_000;
30
31/// One page of the changefeed returned by `since()`. The replication contract:
32/// apply `nodes` in ascending seq order, advance your cursor to `to_seq`, and keep
33/// paging while `has_more` is true; then attach to the live `subscribe` edge.
34/// `head_seq` tells the consumer how far the log currently extends (how far behind
35/// it is).
36#[derive(Debug, Clone, serde::Serialize)]
37pub struct SinceBatch {
38    /// Writes in (`from_seq`, `to_seq`], ascending by seq.
39    pub nodes:    Vec<Node>,
40    /// The exclusive cursor this page started from (echoes the request).
41    pub from_seq: u64,
42    /// Seq of the last node in this page — the consumer's next cursor.
43    pub to_seq:   u64,
44    /// Current head seq of the log (latest committed write).
45    pub head_seq: u64,
46    /// True when more writes remain past `to_seq` (the page hit `limit`).
47    pub has_more: bool,
48}
49
50/// Replication readiness snapshot. `scan_complete` is the correctness gate: until
51/// the cold-scan finishes rebuilding the seq index, an old cursor passed to
52/// `since()` can return a PARTIAL page and look (wrongly) like "caught up". A
53/// correctness-critical consumer MUST wait for `scan_complete == true` before
54/// trusting historical catch-up. `indexed_seq_min/max` report the currently
55/// resolvable seq range; `tip_seq` is the log head.
56#[derive(Debug, Clone, serde::Serialize)]
57pub struct ScanStatus {
58    /// Cold-scan finished — historical seqs fully resolvable; catch-up is safe.
59    pub scan_complete:   bool,
60    /// Head seq of the log (latest committed write).
61    pub tip_seq:         u64,
62    /// Lowest seq currently in the seq index (0 if empty).
63    pub indexed_seq_min: u64,
64    /// Highest seq currently in the seq index.
65    pub indexed_seq_max: u64,
66    /// Number of seqs currently resolvable via the index.
67    pub indexed_count:   usize,
68}
69
70pub struct Db {
71    pub objects:        ObjectStore,
72    pub id_index:       IdIndex,
73    pub sorted_indexes: SortedIndexes,
74    pub graph:          GraphStore,
75    pub root:           PathBuf,
76    /// Dirty flag — set true when head changes, cleared after manifest flush.
77    /// Decouples flush_manifest from the hot write path so concurrent writes
78    /// don't serialise on 2× file I/O per PUT.
79    manifest_dirty:     Arc<AtomicBool>,
80    pub seq:            AtomicU64,
81    /// Cached Merkle head — updated incrementally on every write (O(1)).
82    head:               RwLock<String>,
83    /// True once startup is fully ready (MANIFEST loaded or cold scan complete).
84    /// Warm starts set this true before returning from open().
85    /// Cold starts set this true in the background thread when scan completes.
86    /// Writes are held with 503 until this is true; reads always proceed.
87    pub startup_ready:  Arc<AtomicBool>,
88    /// Seq → hash lookup for v1 compatibility. Populated by put(), put_batch(),
89    /// and the cold-scan background pass. Only covers nodes from the current
90    /// process session + cold-scan; older seqs not in this map cannot be resolved.
91    seq_index:          Arc<DashMap<u64, String>>,
92}
93
94impl Db {
95    /// Create a pure in-memory database — no disk I/O, no migration, instant startup.
96    /// Perfect for tests, hot-cache layers, and ephemeral sessions.
97    /// All data is lost when the Db is dropped.
98    pub fn in_memory() -> Self {
99        Self {
100            objects:        ObjectStore::in_memory(),
101            id_index:       IdIndex::in_memory(),
102            sorted_indexes: SortedIndexes::new(),
103            graph:          GraphStore::in_memory(),
104            root:           std::path::PathBuf::from(":memory:"),
105            seq:            AtomicU64::new(0),
106            head:           RwLock::new(String::new()),
107            startup_ready:  Arc::new(AtomicBool::new(true)),  // always ready
108            manifest_dirty: Arc::new(AtomicBool::new(false)),
109            seq_index:      Arc::new(DashMap::new()),
110        }
111    }
112
113    /// Open (or create) a database. Runs v1→v2 migration automatically if log.aof is present.
114    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
115        std::fs::create_dir_all(db_root)?;
116
117        let objects        = ObjectStore::new(db_root, dek.clone())?;
118        let id_index       = IdIndex::new(db_root)?;
119        let sorted_indexes = SortedIndexes::new();
120        let graph          = GraphStore::new(db_root)?;
121
122        let mut db = Self {
123            objects,
124            id_index,
125            sorted_indexes,
126            graph,
127            root: db_root.to_path_buf(),
128            seq:  AtomicU64::new(0),
129            head: RwLock::new(String::new()),
130            startup_ready:  Arc::new(AtomicBool::new(false)),
131            manifest_dirty: Arc::new(AtomicBool::new(false)),
132            seq_index:      Arc::new(DashMap::new()),
133        };
134
135        // Auto-migrate v1 → v2 if needed (pass DEK so encrypted AOFs convert correctly)
136        migrate::migrate_if_needed(
137            db_root,
138            &db.objects,
139            &db.id_index,
140            &db.sorted_indexes,
141            &db.graph,
142            dek.as_ref(),
143        )?;
144
145        // Fast startup: load seq+head from MANIFEST if no sorted indexes need rebuilding.
146        // Falls back to full object scan only when necessary (first open, or post-migration).
147        db.startup_rebuild()?;
148
149        Ok(db)
150    }
151
152    /// Smart startup:
153    /// - Warm (MANIFEST exists): O(1) load → startup_ready = true immediately.
154    /// - Cold (no MANIFEST): start server immediately, run scan in background thread.
155    ///   Writes return 503 until scan completes; reads always proceed.
156    fn startup_rebuild(&mut self) -> Result<()> {
157        let manifest_path = self.root.join("MANIFEST");
158        let needs_index_rebuild = !self.sorted_indexes.is_empty();
159
160        // Warm path: MANIFEST + no sorted indexes to rebuild → instant start
161        if manifest_path.exists() && !needs_index_rebuild {
162            if let Some(m) = fs::read_to_string(&manifest_path)
163                .ok()
164                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
165            {
166                // Self-heal: MANIFEST with an empty or short head is corrupt/stale.
167                // Fall through to cold scan so the head is rebuilt correctly from objects.
168                if m.head.len() < 8 {
169                    eprintln!("  [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
170                } else {
171                    self.seq.store(m.seq, Ordering::SeqCst); // m.seq is already the next-to-assign counter
172                    *self.head.write() = m.head.clone();
173                    self.startup_ready.store(true, Ordering::SeqCst);
174                    println!("  [nedbd] warm start — seq={} head={}...", m.seq, &m.head[..8]);
175                    return Ok(());
176                }
177            } else {
178                eprintln!("  [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
179            }
180        }
181
182        // Cold path: mark as not ready, return immediately.
183        // The actual background scan is started by Db::start_cold_scan(arc)
184        // which is called from Manager::open_all() AFTER Arc::new(db) — when
185        // the Db is heap-allocated and its field addresses are permanently stable.
186        // Capturing field addresses here would cause UB: Db moves on return.
187        println!("  [nedbd] cold start — background scan will start after heap allocation");
188        Ok(())
189    }
190
191    /// Call this from Manager::open_all() after Arc::new(db).
192    /// Spawns the cold scan background thread with stable heap addresses.
193    /// No-op if startup is already complete (warm start).
194    pub fn start_cold_scan(self_arc: Arc<Self>) {
195        if self_arc.startup_ready.load(Ordering::SeqCst) {
196            return; // warm start — already ready
197        }
198        // Fast path: if the database is empty (new or just created), skip the
199        // background thread entirely. No objects to scan = instant startup.
200        if self_arc.objects.all_hashes().next().is_none() {
201            self_arc.startup_ready.store(true, Ordering::SeqCst);
202            return;
203        }
204        println!("  [nedbd] cold start — background scan starting, server accepting reads now");
205        std::thread::spawn(move || {
206            let db = self_arc;
207            cold_scan_background_arc(db);
208        });
209    }
210
211    /// Write a document. Returns the new node with its content hash set.
212    pub fn put(
213        &self,
214        coll: &str,
215        id: &str,
216        data: Value,
217        caused_by: Vec<String>,
218        valid_from: Option<String>,
219        valid_to:   Option<String>,
220    ) -> Result<Node> {
221        let seq  = self.seq.fetch_add(1, Ordering::SeqCst);
222        let prev = self.id_index.get(coll, id);
223
224        // Remove old node from sorted indexes (it's being superseded)
225        if let Some(old_hash) = &prev {
226            if let Ok(old_node) = self.objects.read(old_hash) {
227                if let Value::Object(ref obj) = old_node.data {
228                    for (field, value) in obj {
229                        self.sorted_indexes.remove(coll, field, value, old_hash);
230                    }
231                }
232            }
233        }
234
235        let mut node = Node {
236            id:         id.to_string(),
237            coll:       coll.to_string(),
238            seq,
239            data:       data.clone(),
240            prev,
241            caused_by:  caused_by.clone(),
242            ts:         now(),
243            valid_from,
244            valid_to,
245            hash:       String::new(),
246        };
247
248        // Write to object store (atomic, content-addressed)
249        let hash = self.objects.write(&mut node)?;
250        self.seq_index.insert(seq, hash.clone());
251
252        // Update id index (atomic file)
253        self.id_index.set(coll, id, &hash)?;
254
255        // Update sorted indexes
256        if let Value::Object(ref obj) = data {
257            for (field, value) in obj {
258                if self.sorted_indexes.has(coll, field) {
259                    self.sorted_indexes.insert(coll, field, value, &hash);
260                }
261            }
262        }
263
264        // Write causal graph edges
265        for cause in &caused_by {
266            self.graph.add_edge(&hash, "caused_by", cause)?;
267            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
268        }
269
270        // Update running Merkle head: O(1) chain, no full recompute.
271        // new_head = BLAKE2b(prev_head || seq_bytes || new_object_hash)
272        self.update_head(seq, &hash);
273
274        Ok(node)
275    }
276
277    /// Batch put: write N documents in parallel, preserving monotonic seq ordering.
278    /// Pre-allocates N seq numbers atomically, then parallelises object writes and
279    /// id-index updates via Rayon. Each op is independent — safe to parallelise.
280    /// Returns nodes in input order with assigned seq numbers.
281    pub fn put_batch(
282        &self,
283        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
284        // (coll, id, data, caused_by, valid_from, valid_to)
285    ) -> Result<Vec<Node>> {
286        use rayon::prelude::*;
287
288        if ops.is_empty() { return Ok(vec![]); }
289        let n = ops.len() as u64;
290
291        // Pre-allocate N consecutive seq numbers — preserves ordering under concurrency
292        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
293        let ts = now();
294
295        // Build nodes with assigned seq numbers
296        let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
297            let prev = self.id_index.get(&coll, &id);
298            Node {
299                id, coll, seq: base_seq + i as u64,
300                data, prev, caused_by,
301                ts, valid_from, valid_to,
302                hash: String::new(),
303            }
304        }).collect();
305
306        // Parallel object writes (content-addressed, idempotent, safe to parallelise)
307        let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
308            .filter_map(|node| self.objects.write(node).err())
309            .collect();
310        if let Some(e) = write_errors.into_iter().next() { return Err(e); }
311
312        // Parallel id-index updates
313        let index_errors: Vec<anyhow::Error> = nodes.par_iter()
314            .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
315            .collect();
316        if let Some(e) = index_errors.into_iter().next() { return Err(e); }
317
318        // Sorted indexes + causal graph (sequential — small overhead, usually no indexes)
319        for node in &nodes {
320            self.seq_index.insert(node.seq, node.hash.clone());
321            if let Value::Object(ref obj) = node.data {
322                for (field, value) in obj {
323                    if self.sorted_indexes.has(&node.coll, field) {
324                        self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
325                    }
326                }
327            }
328            for cause in &node.caused_by {
329                self.graph.add_edge(&node.hash, "caused_by", cause).ok();
330                self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
331            }
332        }
333
334        // Single Merkle head update for the whole batch (chain all hashes)
335        for node in &nodes {
336            self.update_head(node.seq, &node.hash);
337        }
338
339        Ok(nodes)
340    }
341
342    /// Update the running Merkle head with a new write. O(1), lock-free on the flush path.
343    /// Sets dirty flag — the background ticker calls flush_manifest periodically.
344    /// This removes 2× file I/O ops from the hot write path, unblocking concurrent writes.
345    fn update_head(&self, seq: u64, new_hash: &str) {
346        use blake2::{Blake2b512, Digest};
347        let prev = self.head.read().clone();
348        let mut h = Blake2b512::new();
349        h.update(prev.as_bytes());
350        h.update(seq.to_le_bytes());
351        h.update(new_hash.as_bytes());
352        *self.head.write() = hex::encode(&h.finalize()[..32]);
353        // Mark dirty — background ticker will flush to MANIFEST (no I/O on write path)
354        self.manifest_dirty.store(true, Ordering::Release);
355    }
356
357    /// Flush both the id-index WAL and MANIFEST. Used on graceful shutdown.
358    pub fn flush_all(&self) {
359        self.id_index.flush_write_buf();
360        // v3: fsync the active segment (no-op for loose/in-memory stores).
361        // One durability point per batch instead of one fsync per object.
362        if let Err(e) = self.objects.sync() {
363            eprintln!("nedb: segment sync failed: {}", e);
364        }
365        self.flush_manifest();
366    }
367
368    /// Compact the v3 packed object store: keep the CURRENT version of every
369    /// document (from the id-index) and reclaim everything else. No-op unless
370    /// running with the v3 segment substrate (`--dag-v3` / NEDB_DAG_V3).
371    ///
372    /// This is a PRUNING operation: superseded/historical object versions are
373    /// dropped, so AS OF / TRACE over pruned versions is discarded — that is
374    /// what reclaims the space. Flushes first so all data is durable on disk
375    /// before the old segments are deleted.
376    pub fn compact(&self) -> Result<crate::segment::CompactStats> {
377        self.flush_all();
378        let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
379        for coll in self.id_index.collections() {
380            for id in self.id_index.list_ids(&coll) {
381                if let Some(h) = self.id_index.get(&coll, &id) {
382                    live.insert(h);
383                }
384            }
385        }
386        self.objects.compact(&live)
387    }
388
389    /// Flush MANIFEST to disk if dirty. No-op for in-memory databases.
390    pub fn flush_manifest_if_dirty(&self) {
391        if self.root == std::path::PathBuf::from(":memory:") { return; }
392        if self.manifest_dirty.compare_exchange(
393            true, false, Ordering::AcqRel, Ordering::Relaxed
394        ).is_ok() {
395            self.flush_manifest();
396        }
397    }
398
399    /// Atomically persist current seq+head to MANIFEST. No-op for in-memory databases.
400    pub fn flush_manifest(&self) {
401        if self.root == std::path::PathBuf::from(":memory:") { return; }
402        let seq  = self.seq.load(Ordering::SeqCst);
403        let head = self.head.read().clone();
404        let m = Manifest { seq, head };
405        if let Ok(json) = serde_json::to_string(&m) {
406            let path = self.root.join("MANIFEST");
407            let tmp  = self.root.join("MANIFEST.tmp");
408            let _ = fs::write(&tmp, &json);
409            let _ = fs::rename(&tmp, &path);
410        }
411    }
412
413    /// Start a background thread that flushes both the id-index WAL and MANIFEST
414    /// every `interval_ms` milliseconds.
415    /// Call this after Arc::new(db) — the Arc keeps Db alive for the thread's lifetime.
416    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
417        let db = self_arc;
418        std::thread::spawn(move || {
419            loop {
420                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
421                // Flush id-index WAL to disk (parallel Rayon writes)
422                db.id_index.flush_write_buf();
423                // Then flush MANIFEST
424                db.flush_manifest_if_dirty();
425            }
426        });
427    }
428
429    /// Return the current Merkle head string. O(1) — read from cache.
430    pub fn head(&self) -> String {
431        self.head.read().clone()
432    }
433
434    /// Delete a document — writes a tombstone node and removes the id from the index.
435    /// The object history is preserved in the DAG; only the live id pointer is cleared.
436    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
437        let prev = match self.id_index.get(coll, id) {
438            None => return Ok(false),   // already gone
439            Some(h) => h,
440        };
441        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
442        let mut tombstone = Node {
443            id:         format!("_del_{}", id),
444            coll:       coll.to_string(),
445            seq,
446            data:       serde_json::json!({"_deleted": id, "_prev": prev}),
447            prev:       Some(prev),
448            caused_by:  vec![],
449            ts:         now(),
450            valid_from: None,
451            valid_to:   None,
452            hash:       String::new(),
453        };
454        let hash = self.objects.write(&mut tombstone)?;
455        self.update_head(seq, &hash);
456        // Remove the live id pointer — doc is now invisible to queries and list()
457        self.id_index.remove(coll, id)?;
458        Ok(true)
459    }
460
461    /// Get the current version of a document by id.
462    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
463        let hash = self.id_index.get(coll, id)?;
464        self.objects.read(&hash).ok()
465    }
466
467    /// Get a specific version of a document by object hash.
468    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
469        self.objects.read(hash).ok()
470    }
471
472    /// Get a document AS OF a specific sequence number.
473    /// Walks the version chain (prev links) backward until seq <= target.
474    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
475        let hash = self.id_index.get(coll, id)?;
476        let mut current = self.objects.read(&hash).ok()?;
477        loop {
478            if current.seq <= target_seq {
479                return Some(current);
480            }
481            let prev_hash = current.prev.as_deref()?;
482            current = self.objects.read(prev_hash).ok()?;
483        }
484    }
485
486    /// List all documents in a collection, returning current versions.
487    pub fn list(&self, coll: &str) -> Vec<Node> {
488        self.id_index
489            .list_ids(coll)
490            .into_iter()
491            .filter_map(|id| self.get(coll, &id))
492            .collect()
493    }
494
495    /// ORDER BY field ASC LIMIT n — uses sorted index if available, else falls back to full scan.
496    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
497        if self.sorted_indexes.has(coll, field) {
498            self.sorted_indexes
499                .top_k_asc(coll, field, limit)
500                .into_iter()
501                .filter_map(|h| self.objects.read(&h).ok())
502                .collect()
503        } else {
504            let mut docs = self.list(coll);
505            docs.sort_by(|a, b| {
506                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
507                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
508                av.cmp(&bv)
509            });
510            docs.truncate(limit);
511            docs
512        }
513    }
514
515    /// ORDER BY field DESC LIMIT n
516    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
517        if self.sorted_indexes.has(coll, field) {
518            self.sorted_indexes
519                .top_k_desc(coll, field, limit)
520                .into_iter()
521                .filter_map(|h| self.objects.read(&h).ok())
522                .collect()
523        } else {
524            let mut docs = self.list(coll);
525            docs.sort_by(|a, b| {
526                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
527                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
528                bv.cmp(&av)
529            });
530            docs.truncate(limit);
531            docs
532        }
533    }
534
535    /// TRACE caused_by — walk causal graph from a node.
536    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
537        self.graph
538            .trace(hash, "caused_by", reverse, limit)
539            .into_iter()
540            .filter_map(|h| self.objects.read(&h).ok())
541            .collect()
542    }
543
544    /// Verify tamper-evidence of all objects.
545    pub fn verify(&self) -> (usize, Vec<String>) {
546        self.objects.verify_all()
547    }
548
549    /// Create a sorted index for a (coll, field) pair.
550    pub fn create_sorted_index(&self, coll: &str, field: &str) {
551        self.sorted_indexes.ensure(coll, field);
552        // Backfill from existing objects
553        for id in self.id_index.list_ids(coll) {
554            if let Some(node) = self.get(coll, &id) {
555                if let Value::Object(ref obj) = node.data {
556                    if let Some(value) = obj.get(field) {
557                        self.sorted_indexes.insert(coll, field, value, &node.hash);
558                    }
559                }
560            }
561        }
562    }
563
564    /// Resolve a sequence number to its content hash (v1 compatibility).
565    /// Only covers nodes written in the current process session + cold-scan nodes.
566    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
567        self.seq_index.get(&seq).map(|r| r.clone())
568    }
569
570    /// The tip — the most recently written node (highest seq), or `None` if the
571    /// database is empty. O(1): `self.seq` is the next-to-assign counter, so the
572    /// latest write sits at `seq - 1`; we resolve it through the same
573    /// seq_index → object-store path a normal read uses, so the returned Node is
574    /// byte-identical to one fetched by id or hash (it carries its own seq, hash,
575    /// causal links, and valid-time). This is the cheap "give me the latest write"
576    /// primitive — the head of the log, not an aggregate.
577    pub fn tip(&self) -> Option<Node> {
578        let next = self.seq.load(Ordering::SeqCst);
579        if next == 0 {
580            return None; // nothing written yet
581        }
582        let hash = self.get_hash_by_seq(next - 1)?;
583        self.get_by_hash(&hash)
584    }
585
586    /// The collection-local tip — the most recent write into `coll` (highest seq in
587    /// that collection), or `None` if the collection has no writes. Scans the seq
588    /// index backward from the head until a node in `coll` is found, resolving
589    /// through the same seq_index → object-store path as a normal read; bounded by
590    /// how recently `coll` was written. Conceptually a different index than the
591    /// global `tip()` (global head vs collection head), kept as a separate method
592    /// so each is explicit — parity with the Python reference's `tip(coll)`. Lets a
593    /// consumer resume one chain (e.g. blocks / tx / utxo) without pulling global
594    /// tip and filtering.
595    pub fn tip_collection(&self, coll: &str) -> Option<Node> {
596        let mut s = self.seq.load(Ordering::SeqCst); // exclusive upper bound (head + 1)
597        while s > 0 {
598            s -= 1;
599            if let Some(hash) = self.get_hash_by_seq(s) {
600                if let Some(node) = self.get_by_hash(&hash) {
601                    if node.coll.as_str() == coll {
602                        return Some(node);
603                    }
604                }
605            }
606        }
607        None
608    }
609
610    /// Changefeed page: up to `limit` nodes written AFTER `after_seq` (EXCLUSIVE),
611    /// ascending by seq, wrapped in a `SinceBatch` cursor envelope. `after_seq` is
612    /// the cursor you last applied (a prior `tip()` seq or `to_seq`). `limit` bounds
613    /// the page — `0` means DEFAULT_SINCE_LIMIT, so the engine primitive can never
614    /// materialize an unbounded batch even when embedders call it directly (the
615    /// safety is here, not only in the HTTP layer). Drain by paging while
616    /// `has_more`, advancing your cursor to `to_seq`, then hand off to the live
617    /// `subscribe` edge. The append-only log IS the changefeed, so this is an
618    /// O(page) walk; unresolved seqs (outside seq_index coverage — see
619    /// `scan_status()`) are skipped rather than faked.
620    pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
621        let next = self.seq.load(Ordering::SeqCst);          // head + 1
622        let head_seq = next.saturating_sub(1);
623        let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
624        let mut nodes: Vec<Node> = Vec::new();
625        let mut to_seq = after_seq;
626        let mut hit_limit = false;
627        let mut s = after_seq.saturating_add(1);
628        while s < next {
629            if nodes.len() >= cap { hit_limit = true; break; }
630            if let Some(hash) = self.get_hash_by_seq(s) {
631                if let Some(node) = self.get_by_hash(&hash) {
632                    to_seq = node.seq;
633                    nodes.push(node);
634                }
635            }
636            s += 1;
637        }
638        SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
639    }
640
641    /// Replication readiness — see `ScanStatus`. `scan_complete` gates safe
642    /// historical catch-up: a consumer pulling an old cursor right after a cold
643    /// start must wait for it, or `since()` may hand back a partial page that looks
644    /// like "caught up". Computes the indexed range by scanning the in-memory seq
645    /// index (O(index)) — intended for periodic status polls, not the per-write
646    /// hot path.
647    pub fn scan_status(&self) -> ScanStatus {
648        let next = self.seq.load(Ordering::SeqCst);
649        let mut min = u64::MAX;
650        let mut max = 0u64;
651        let mut count = 0usize;
652        for kv in self.seq_index.iter() {
653            let s = *kv.key();
654            if s < min { min = s; }
655            if s > max { max = s; }
656            count += 1;
657        }
658        if count == 0 { min = 0; }
659        ScanStatus {
660            scan_complete:   self.startup_ready.load(Ordering::SeqCst),
661            tip_seq:         next.saturating_sub(1),
662            indexed_seq_min: min,
663            indexed_seq_max: max,
664            indexed_count:   count,
665        }
666    }
667
668    /// Add an explicit named relation edge between two documents.
669    /// Add an explicit named relation between two "coll:id" nodes.
670    /// Relations stored as __links__ documents — NQL-queryable, time-travelable,
671    /// consistent with the PyO3 binding which uses the same __links__ convention.
672    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
673        let (frm_coll, frm_id) = frm.split_once(':')
674            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
675        let (to_coll, to_id) = to.split_once(':')
676            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
677        if self.id_index.get(frm_coll, frm_id).is_none() {
678            anyhow::bail!("link: frm not found: {}", frm);
679        }
680        if self.id_index.get(to_coll, to_id).is_none() {
681            anyhow::bail!("link: to not found: {}", to);
682        }
683        let link_id = format!("{}|{}|{}", frm, rel, to);
684        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
685        self.put("__links__", &link_id, doc, vec![], None, None)?;
686        Ok(())
687    }
688
689    /// Remove a named relation (deletes the __links__ document).
690    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
691        let link_id = format!("{}|{}|{}", frm, rel, to);
692        self.delete("__links__", &link_id)
693    }
694
695    /// Get neighbor nodes via a named relation.
696    /// Queries __links__ — consistent with the PyO3 binding.
697    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
698        self.id_index
699            .list_ids("__links__")
700            .into_iter()
701            .filter_map(|id| self.get("__links__", &id))
702            .filter(|node| {
703                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
704                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
705            })
706            .filter_map(|node| {
707                let to = node.data.get("_to")?.as_str()?;
708                let (to_coll, to_id) = to.split_once(':')?;
709                self.get(to_coll, to_id)
710            })
711            .collect()
712    }
713}
714
715impl Drop for Db {
716    /// Flush buffered state when the database is closed so a write-then-drop
717    /// sequence is durable without an explicit `flush_all()`.
718    ///
719    /// `IdIndex::set` only stages updates in the in-memory WAL `write_buf`;
720    /// disk persistence happens in `flush_write_buf()`, normally driven by the
721    /// manifest ticker. A short-lived `Db` (a library user's `{ let db =
722    /// Db::open(p)?; db.put(..)?; }` block, or a test) has no ticker, so without
723    /// this its writes would be silently lost on reopen. Flushing on drop
724    /// mirrors the flush-on-close contract of other embedded stores (sled,
725    /// RocksDB).
726    ///
727    /// In production this is a harmless safety net, not the primary durability
728    /// path: the manifest ticker thread holds an `Arc<Db>` for the process
729    /// lifetime, so `Drop` only fires once every owning handle is gone. No-op
730    /// for in-memory databases (`flush_all` short-circuits on `:memory:`).
731    fn drop(&mut self) {
732        self.flush_all();
733    }
734}
735
736/// Background cold-scan worker. Takes Arc<Db> — safe, Db is on the heap.
737fn cold_scan_background_arc(db: Arc<Db>) {
738    use rayon::prelude::*;
739    use blake2::{Blake2b512, Digest};
740
741    let objects        = &db.objects;
742    let head           = &db.head;
743    let seq_atomic     = &db.seq;
744    let sorted_indexes = &db.sorted_indexes;
745    let root           = db.root.clone();
746    let ready_flag     = Arc::clone(&db.startup_ready);
747
748    let hashes: Vec<String> = objects.all_hashes().collect();
749    let total = hashes.len();
750
751    if total == 0 {
752        ready_flag.store(true, Ordering::SeqCst);
753        return;
754    }
755
756    println!("  [nedbd] background scan — {} objects...", total);
757    let t0 = std::time::Instant::now();
758    let step = (total / 10).max(1000);
759
760    let nodes: Vec<Node> = hashes.par_iter()
761        .enumerate()
762        .filter_map(|(i, h)| {
763            if i > 0 && i % step == 0 {
764                let pct     = i * 100 / total;
765                let elapsed = t0.elapsed().as_secs_f32();
766                let rate    = i as f32 / elapsed;
767                let eta     = (total - i) as f32 / rate;
768                eprint!("\r  [nedbd]   {:>3}%  {:>8} / {:>8}  ({:>8.0}/s  eta {:.0}s)   ",
769                    pct, i, total, rate, eta);
770            }
771            objects.read(h).ok()
772        })
773        .collect();
774
775    eprintln!("\r  [nedbd]   100%  {:>8} / {:>8}  ({:.1}s)                        ",
776        total, total, t0.elapsed().as_secs_f32());
777
778    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
779    seq_atomic.store(max_seq + 1, Ordering::SeqCst);
780
781    for node in &nodes {
782        db.seq_index.insert(node.seq, node.hash.clone());
783        if let Value::Object(ref obj) = node.data {
784            for (field, value) in obj {
785                if sorted_indexes.has(&node.coll, field) {
786                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
787                }
788            }
789        }
790    }
791
792    // Compute Merkle head from sorted hashes
793    let mut sorted_hashes = hashes;
794    sorted_hashes.sort();
795    let mut h = Blake2b512::new();
796    h.update(max_seq.to_le_bytes());
797    for hash_str in &sorted_hashes {
798        h.update(hash_str.as_bytes());
799    }
800    let new_head = hex::encode(&h.finalize()[..32]);
801    *head.write() = new_head.clone();
802
803    // Write MANIFEST atomically
804    let m     = Manifest { seq: max_seq, head: new_head };
805    let json  = serde_json::to_string(&m).unwrap_or_default();
806    let path  = root.join("MANIFEST");
807    let tmp   = root.join("MANIFEST.tmp");
808    let _ = fs::write(&tmp, &json);
809    let _ = fs::rename(&tmp, &path);
810
811    // Signal server: writes can now proceed
812    ready_flag.store(true, Ordering::SeqCst);
813    println!("  [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
814}
815
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
822
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A document stored with `put` can be read back with `get`.
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let payload = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", payload, vec![], None, None).unwrap();

        let fetched = db.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    /// `order_by_asc` returns the lowest values first and honours the limit.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");

        // Insert out of order so the result can only be sorted by the index.
        for height in [3u64, 1, 5, 2, 4] {
            let id = height.to_string();
            let payload = serde_json::json!({"height": height});
            db.put("blocks", &id, payload, vec![], None, None).unwrap();
        }

        let lowest_three = db.order_by_asc("blocks", "height", 3);
        let mut heights: Vec<u64> = Vec::new();
        for node in lowest_three.iter() {
            if let Some(height) = node.data["height"].as_u64() {
                heights.push(height);
            }
        }
        assert_eq!(heights, vec![1, 2, 3]);
    }

    /// `trace` follows parent hashes back through the whole causal chain.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let create = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        // burn → transfer → create
        let chain = db.trace(&burn.hash, false, 10);
        assert_eq!(chain.len(), 3);
    }

    /// `get_as_of` sees the version at a past seq; `get` sees the latest.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);

        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
884
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    /// Every `put` registers its seq → hash mapping; unknown seqs map to None.
    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let first = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let second = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        for written in [&first, &second] {
            assert_eq!(db.get_hash_by_seq(written.seq), Some(written.hash.clone()));
        }
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    /// `tip()` is the latest write; `since()` pages the changefeed with an
    /// exclusive cursor and a bounded page size.
    #[test]
    fn tip_and_since() {
        let db = Db::in_memory();

        // A fresh database has no tip and an empty changefeed.
        assert!(db.tip().is_none());
        assert!(db.since(0, 0).nodes.is_empty());

        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        // The tip is the highest-seq write, returned as a full node.
        let tip = db.tip().expect("tip after writes");
        assert_eq!(tip.seq, b.seq);
        assert_eq!(tip.id, "b");
        assert_eq!(tip.hash, b.hash);

        // The cursor is EXCLUSIVE: asking for "after a" yields only b, plus
        // the envelope fields describing the page.
        let batch = db.since(a.seq, 0);
        assert_eq!(batch.nodes.len(), 1);
        assert_eq!(batch.nodes[0].id, "b");
        assert_eq!(batch.from_seq, a.seq);
        assert_eq!(batch.to_seq, b.seq);
        assert_eq!(batch.head_seq, b.seq);
        assert!(!batch.has_more);

        // Asking from the tip yields nothing.
        assert!(db.since(b.seq, 0).nodes.is_empty());

        // A `limit` caps the page and flags has_more; resuming from `to_seq`
        // picks up the remainder.
        let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();

        let first_page = db.since(a.seq, 1); // after a, capped at 1 -> [b], more pending
        assert_eq!(first_page.nodes.len(), 1);
        assert_eq!(first_page.nodes[0].id, "b");
        assert_eq!(first_page.to_seq, b.seq);
        assert!(first_page.has_more);

        let second_page = db.since(first_page.to_seq, 1); // after b -> [c], done
        assert_eq!(second_page.nodes.len(), 1);
        assert_eq!(second_page.nodes[0].id, "c");
        assert_eq!(second_page.to_seq, c.seq);
        assert!(!second_page.has_more);
    }

    /// The ITC sync-client case: each chain lives in its own collection and a
    /// consumer resumes ONE chain without fetching the global tip and
    /// filtering.
    #[test]
    fn tip_collection_per_chain() {
        let db = Db::in_memory();
        assert!(db.tip_collection("blocks").is_none());

        db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
        db.put("tx",     "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let last_block = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
        let last_tx    = db.put("tx",     "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        // The global tip is the most recent write overall (t1).
        assert_eq!(db.tip().unwrap().id, "t1");

        // Per-collection tips are the most recent write within each collection.
        let blocks_tip = db.tip_collection("blocks").expect("blocks tip");
        assert_eq!(blocks_tip.id, "b1");
        assert_eq!(blocks_tip.seq, last_block.seq);
        assert_eq!(db.tip_collection("tx").unwrap().seq, last_tx.seq);
        assert!(db.tip_collection("absent").is_none());
    }

    /// Nodes written through `put_batch` are registered in the seq index too.
    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let written = db.put_batch(vec![
            ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
        ]).unwrap();

        for entry in &written {
            assert_eq!(db.get_hash_by_seq(entry.seq), Some(entry.hash.clone()));
        }
    }

    /// `neighbors` returns exactly the targets linked from the given source
    /// under the given relation.
    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
        db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.put("trip",   "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();

        db.link("driver:d1", "handles", "trip:t1").unwrap();
        db.link("driver:d1", "handles", "trip:t2").unwrap();
        db.link("driver:d2", "handles", "trip:t1").unwrap();

        // d1 handles both trips; result order is not asserted.
        let d1_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(d1_trips.len(), 2);
        let mut d1_ids: Vec<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
        d1_ids.sort();
        assert_eq!(d1_ids, vec!["t1", "t2"]);

        // d2 handles only t1.
        let d2_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(d2_trips.len(), 1);
        assert_eq!(d2_trips[0].id, "t1");
    }

    /// Links are stored as `__links__` documents, not as graph edges. The
    /// `__links__` collection is NQL-queryable and consistent with the PyO3
    /// binding.
    #[test]
    fn link_stored_in_links_collection() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();

        // The link document exists under the composite key and carries the
        // three relation fields.
        let stored = db
            .get("__links__", "driver:d1|handles|trip:t1")
            .expect("__links__ doc should exist");
        assert_eq!(stored.data["_from"], "driver:d1");
        assert_eq!(stored.data["_rel"],  "handles");
        assert_eq!(stored.data["_to"],   "trip:t1");

        // neighbors() resolves the link to the target node.
        let reached = db.neighbors("driver:d1", "handles");
        assert_eq!(reached.len(), 1);
        assert_eq!(reached[0].id, "t1");
    }

    /// Linking to a node that does not exist is rejected.
    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();

        let outcome = db.link("driver:d1", "handles", "trip:ghost");
        assert!(outcome.is_err());
    }

    /// A link written before the handle is dropped is visible after reopening
    /// the same directory.
    #[test]
    fn link_durable_survives_reopen() {
        let tmp = tempdir().unwrap();

        // First session: write two nodes and a link, then drop the handle.
        {
            let writer = Db::open(tmp.path(), None).unwrap();
            writer.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
            writer.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            writer.link("driver:d1", "handles", "trip:t1").unwrap();
        }

        // Second session: reopen and mark startup as ready before querying.
        let reader = Db::open(tmp.path(), None).unwrap();
        reader.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);

        let reached = reader.neighbors("driver:d1", "handles");
        assert_eq!(reached.len(), 1);
        assert_eq!(reached[0].id, "t1");
    }
}
1041}