Skip to main content

nedb_engine/
db.rs

1//! Main DAG database — coordinates ObjectStore, IdIndex, SortedIndexes, GraphStore.
2
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17/// MANIFEST: cached {seq, head} written atomically after every write.
18/// On startup, if MANIFEST exists and no sorted indexes need rebuilding,
19/// startup is O(1) — just read this one file instead of scanning all objects.
20#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22    seq:  u64,
23    head: String,
24    /// Object hash of the highest-seq node at flush time. Lets `tip()` resolve the
25    /// last write O(1) on a warm boot — before any scan repopulates the in-memory
26    /// seq index. `#[serde(default)]` so pre-2.5.43 MANIFESTs (no field) still parse.
27    #[serde(default)]
28    tip_hash: String,
29}
30
/// Page size `since()` falls back to when called with `limit == 0`.
///
/// The bound lives in the engine itself, not only in the HTTP layer, so no
/// caller — not even a direct embedder or a badly lagging consumer — can make
/// `since()` materialize an unbounded result.
pub const DEFAULT_SINCE_LIMIT: usize = 10 * 1_000;
35
36/// One page of the changefeed returned by `since()`. The replication contract:
37/// apply `nodes` in ascending seq order, advance your cursor to `to_seq`, and keep
38/// paging while `has_more` is true; then attach to the live `subscribe` edge.
39/// `head_seq` tells the consumer how far the log currently extends (how far behind
40/// it is).
41#[derive(Debug, Clone, serde::Serialize)]
42pub struct SinceBatch {
43    /// Writes in (`from_seq`, `to_seq`], ascending by seq.
44    pub nodes:    Vec<Node>,
45    /// The exclusive cursor this page started from (echoes the request).
46    pub from_seq: u64,
47    /// Seq of the last node in this page — the consumer's next cursor.
48    pub to_seq:   u64,
49    /// Current head seq of the log (latest committed write).
50    pub head_seq: u64,
51    /// True when more writes remain past `to_seq` (the page hit `limit`).
52    pub has_more: bool,
53}
54
55/// Replication readiness snapshot. `scan_complete` is the correctness gate: until
56/// the cold-scan finishes rebuilding the seq index, an old cursor passed to
57/// `since()` can return a PARTIAL page and look (wrongly) like "caught up". A
58/// correctness-critical consumer MUST wait for `scan_complete == true` before
59/// trusting historical catch-up. `indexed_seq_min/max` report the currently
60/// resolvable seq range; `tip_seq` is the log head.
61#[derive(Debug, Clone, serde::Serialize)]
62pub struct ScanStatus {
63    /// Cold-scan finished — historical seqs fully resolvable; catch-up is safe.
64    pub scan_complete:   bool,
65    /// Head seq of the log (latest committed write).
66    pub tip_seq:         u64,
67    /// Lowest seq currently in the seq index (0 if empty).
68    pub indexed_seq_min: u64,
69    /// Highest seq currently in the seq index.
70    pub indexed_seq_max: u64,
71    /// Number of seqs currently resolvable via the index.
72    pub indexed_count:   usize,
73}
74
75pub struct Db {
76    pub objects:        ObjectStore,
77    pub id_index:       IdIndex,
78    pub sorted_indexes: SortedIndexes,
79    pub graph:          GraphStore,
80    pub root:           PathBuf,
81    /// Dirty flag — set true when head changes, cleared after manifest flush.
82    /// Decouples flush_manifest from the hot write path so concurrent writes
83    /// don't serialise on 2× file I/O per PUT.
84    manifest_dirty:     Arc<AtomicBool>,
85    pub seq:            AtomicU64,
86    /// Cached Merkle head — updated incrementally on every write (O(1)).
87    head:               RwLock<String>,
88    /// Object hash of the most recent write (highest seq). Mirrors `head` but holds
89    /// the tip's content hash, so `tip()` can resolve the last node O(1) on a warm
90    /// boot when the in-memory `seq_index` is still cold. Persisted in MANIFEST.
91    tip_hash:           RwLock<String>,
92    /// True once startup is fully ready (MANIFEST loaded or cold scan complete).
93    /// Warm starts set this true before returning from open().
94    /// Cold starts set this true in the background thread when scan completes.
95    /// Writes are held with 503 until this is true; reads always proceed.
96    pub startup_ready:  Arc<AtomicBool>,
97    /// Seq → hash lookup for v1 compatibility. Populated by put(), put_batch(),
98    /// and the cold-scan background pass. Only covers nodes from the current
99    /// process session + cold-scan; older seqs not in this map cannot be resolved.
100    seq_index:          Arc<DashMap<u64, String>>,
101}
102
103impl Db {
104    /// Create a pure in-memory database — no disk I/O, no migration, instant startup.
105    /// Perfect for tests, hot-cache layers, and ephemeral sessions.
106    /// All data is lost when the Db is dropped.
107    pub fn in_memory() -> Self {
108        Self {
109            objects:        ObjectStore::in_memory(),
110            id_index:       IdIndex::in_memory(),
111            sorted_indexes: SortedIndexes::new(),
112            graph:          GraphStore::in_memory(),
113            root:           std::path::PathBuf::from(":memory:"),
114            seq:            AtomicU64::new(0),
115            head:           RwLock::new(String::new()),
116            tip_hash:       RwLock::new(String::new()),
117            startup_ready:  Arc::new(AtomicBool::new(true)),  // always ready
118            manifest_dirty: Arc::new(AtomicBool::new(false)),
119            seq_index:      Arc::new(DashMap::new()),
120        }
121    }
122
123    /// Open (or create) a database. Runs v1→v2 migration automatically if log.aof is present.
124    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
125        std::fs::create_dir_all(db_root)?;
126
127        let objects        = ObjectStore::new(db_root, dek.clone())?;
128        let id_index       = IdIndex::new(db_root)?;
129        let sorted_indexes = SortedIndexes::new();
130        let graph          = GraphStore::new(db_root)?;
131
132        let mut db = Self {
133            objects,
134            id_index,
135            sorted_indexes,
136            graph,
137            root: db_root.to_path_buf(),
138            seq:  AtomicU64::new(0),
139            head: RwLock::new(String::new()),
140            tip_hash: RwLock::new(String::new()),
141            startup_ready:  Arc::new(AtomicBool::new(false)),
142            manifest_dirty: Arc::new(AtomicBool::new(false)),
143            seq_index:      Arc::new(DashMap::new()),
144        };
145
146        // Auto-migrate v1 → v2 if needed (pass DEK so encrypted AOFs convert correctly)
147        migrate::migrate_if_needed(
148            db_root,
149            &db.objects,
150            &db.id_index,
151            &db.sorted_indexes,
152            &db.graph,
153            dek.as_ref(),
154        )?;
155
156        // Fast startup: load seq+head from MANIFEST if no sorted indexes need rebuilding.
157        // Falls back to full object scan only when necessary (first open, or post-migration).
158        db.startup_rebuild()?;
159
160        Ok(db)
161    }
162
163    /// Smart startup:
164    /// - Warm (MANIFEST exists): O(1) load → startup_ready = true immediately.
165    /// - Cold (no MANIFEST): start server immediately, run scan in background thread.
166    ///   Writes return 503 until scan completes; reads always proceed.
167    fn startup_rebuild(&mut self) -> Result<()> {
168        let manifest_path = self.root.join("MANIFEST");
169        let needs_index_rebuild = !self.sorted_indexes.is_empty();
170
171        // Warm path: MANIFEST + no sorted indexes to rebuild → instant start
172        if manifest_path.exists() && !needs_index_rebuild {
173            if let Some(m) = fs::read_to_string(&manifest_path)
174                .ok()
175                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
176            {
177                // Self-heal: MANIFEST with an empty or short head is corrupt/stale.
178                // Fall through to cold scan so the head is rebuilt correctly from objects.
179                if m.head.len() < 8 {
180                    eprintln!("  [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
181                } else if m.tip_hash.is_empty() {
182                    // Pre-2.5.43 MANIFEST (no persisted tip). Cold-scan once to rebuild
183                    // the seq index and rewrite MANIFEST with tip_hash — warm + tip()-
184                    // durable on every boot thereafter.
185                    eprintln!("  [nedbd] MANIFEST predates durable tip() — cold scan once to upgrade");
186                } else {
187                    self.seq.store(m.seq, Ordering::SeqCst); // m.seq is already the next-to-assign counter
188                    *self.head.write() = m.head.clone();
189                    *self.tip_hash.write() = m.tip_hash.clone();
190                    self.startup_ready.store(true, Ordering::SeqCst);
191                    println!("  [nedbd] warm start — seq={} head={}... tip={}...",
192                        m.seq, &m.head[..8], &m.tip_hash[..8.min(m.tip_hash.len())]);
193                    return Ok(());
194                }
195            } else {
196                eprintln!("  [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
197            }
198        }
199
200        // Cold path: mark as not ready, return immediately.
201        // The actual background scan is started by Db::start_cold_scan(arc)
202        // which is called from Manager::open_all() AFTER Arc::new(db) — when
203        // the Db is heap-allocated and its field addresses are permanently stable.
204        // Capturing field addresses here would cause UB: Db moves on return.
205        println!("  [nedbd] cold start — background scan will start after heap allocation");
206        Ok(())
207    }
208
209    /// Call this from Manager::open_all() after Arc::new(db).
210    /// Spawns the cold scan background thread with stable heap addresses.
211    /// No-op if startup is already complete (warm start).
212    pub fn start_cold_scan(self_arc: Arc<Self>) {
213        if self_arc.startup_ready.load(Ordering::SeqCst) {
214            return; // warm start — already ready
215        }
216        // Fast path: if the database is empty (new or just created), skip the
217        // background thread entirely. No objects to scan = instant startup.
218        if self_arc.objects.all_hashes().next().is_none() {
219            self_arc.startup_ready.store(true, Ordering::SeqCst);
220            return;
221        }
222        println!("  [nedbd] cold start — background scan starting, server accepting reads now");
223        std::thread::spawn(move || {
224            let db = self_arc;
225            cold_scan_background_arc(db);
226        });
227    }
228
229    /// Write a document. Returns the new node with its content hash set.
230    pub fn put(
231        &self,
232        coll: &str,
233        id: &str,
234        data: Value,
235        caused_by: Vec<String>,
236        valid_from: Option<String>,
237        valid_to:   Option<String>,
238    ) -> Result<Node> {
239        let seq  = self.seq.fetch_add(1, Ordering::SeqCst);
240        let prev = self.id_index.get(coll, id);
241
242        // Remove old node from sorted indexes (it's being superseded)
243        if let Some(old_hash) = &prev {
244            if let Ok(old_node) = self.objects.read(old_hash) {
245                if let Value::Object(ref obj) = old_node.data {
246                    for (field, value) in obj {
247                        self.sorted_indexes.remove(coll, field, value, old_hash);
248                    }
249                }
250            }
251        }
252
253        let mut node = Node {
254            id:         id.to_string(),
255            coll:       coll.to_string(),
256            seq,
257            data:       data.clone(),
258            prev,
259            caused_by:  caused_by.clone(),
260            ts:         now(),
261            valid_from,
262            valid_to,
263            hash:       String::new(),
264        };
265
266        // Write to object store (atomic, content-addressed)
267        let hash = self.objects.write(&mut node)?;
268        self.seq_index.insert(seq, hash.clone());
269
270        // Update id index (atomic file)
271        self.id_index.set(coll, id, &hash)?;
272
273        // Update sorted indexes
274        if let Value::Object(ref obj) = data {
275            for (field, value) in obj {
276                if self.sorted_indexes.has(coll, field) {
277                    self.sorted_indexes.insert(coll, field, value, &hash);
278                }
279            }
280        }
281
282        // Write causal graph edges
283        for cause in &caused_by {
284            self.graph.add_edge(&hash, "caused_by", cause)?;
285            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
286        }
287
288        // Update running Merkle head: O(1) chain, no full recompute.
289        // new_head = BLAKE2b(prev_head || seq_bytes || new_object_hash)
290        self.update_head(seq, &hash);
291
292        Ok(node)
293    }
294
295    /// Batch put: write N documents in parallel, preserving monotonic seq ordering.
296    /// Pre-allocates N seq numbers atomically, then parallelises object writes and
297    /// id-index updates via Rayon. Each op is independent — safe to parallelise.
298    /// Returns nodes in input order with assigned seq numbers.
299    pub fn put_batch(
300        &self,
301        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
302        // (coll, id, data, caused_by, valid_from, valid_to)
303    ) -> Result<Vec<Node>> {
304        use rayon::prelude::*;
305
306        if ops.is_empty() { return Ok(vec![]); }
307        let n = ops.len() as u64;
308
309        // Pre-allocate N consecutive seq numbers — preserves ordering under concurrency
310        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
311        let ts = now();
312
313        // Build nodes with assigned seq numbers
314        let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
315            let prev = self.id_index.get(&coll, &id);
316            Node {
317                id, coll, seq: base_seq + i as u64,
318                data, prev, caused_by,
319                ts, valid_from, valid_to,
320                hash: String::new(),
321            }
322        }).collect();
323
324        // Parallel object writes (content-addressed, idempotent, safe to parallelise)
325        let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
326            .filter_map(|node| self.objects.write(node).err())
327            .collect();
328        if let Some(e) = write_errors.into_iter().next() { return Err(e); }
329
330        // Parallel id-index updates
331        let index_errors: Vec<anyhow::Error> = nodes.par_iter()
332            .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
333            .collect();
334        if let Some(e) = index_errors.into_iter().next() { return Err(e); }
335
336        // Sorted indexes + causal graph (sequential — small overhead, usually no indexes)
337        for node in &nodes {
338            self.seq_index.insert(node.seq, node.hash.clone());
339            if let Value::Object(ref obj) = node.data {
340                for (field, value) in obj {
341                    if self.sorted_indexes.has(&node.coll, field) {
342                        self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
343                    }
344                }
345            }
346            for cause in &node.caused_by {
347                self.graph.add_edge(&node.hash, "caused_by", cause).ok();
348                self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
349            }
350        }
351
352        // Single Merkle head update for the whole batch (chain all hashes)
353        for node in &nodes {
354            self.update_head(node.seq, &node.hash);
355        }
356
357        Ok(nodes)
358    }
359
360    /// Update the running Merkle head with a new write. O(1), lock-free on the flush path.
361    /// Sets dirty flag — the background ticker calls flush_manifest periodically.
362    /// This removes 2× file I/O ops from the hot write path, unblocking concurrent writes.
363    fn update_head(&self, seq: u64, new_hash: &str) {
364        use blake2::{Blake2b512, Digest};
365        let prev = self.head.read().clone();
366        let mut h = Blake2b512::new();
367        h.update(prev.as_bytes());
368        h.update(seq.to_le_bytes());
369        h.update(new_hash.as_bytes());
370        *self.head.write() = hex::encode(&h.finalize()[..32]);
371        // Track the tip's object hash. update_head arrives in ascending seq order
372        // (put in seq order; put_batch loops the batch in order), so the last call
373        // wins → the highest-seq node's hash. Persisted in MANIFEST for warm-boot tip().
374        *self.tip_hash.write() = new_hash.to_string();
375        // Mark dirty — background ticker will flush to MANIFEST (no I/O on write path)
376        self.manifest_dirty.store(true, Ordering::Release);
377    }
378
379    /// Flush both the id-index WAL and MANIFEST. Used on graceful shutdown.
380    pub fn flush_all(&self) {
381        self.id_index.flush_write_buf();
382        // v3: fsync the active segment (no-op for loose/in-memory stores).
383        // One durability point per batch instead of one fsync per object.
384        if let Err(e) = self.objects.sync() {
385            eprintln!("nedb: segment sync failed: {}", e);
386        }
387        self.flush_manifest();
388    }
389
390    /// Compact the v3 packed object store: keep the CURRENT version of every
391    /// document (from the id-index) and reclaim everything else. No-op unless
392    /// running with the v3 segment substrate (`--dag-v3` / NEDB_DAG_V3).
393    ///
394    /// This is a PRUNING operation: superseded/historical object versions are
395    /// dropped, so AS OF / TRACE over pruned versions is discarded — that is
396    /// what reclaims the space. Flushes first so all data is durable on disk
397    /// before the old segments are deleted.
398    pub fn compact(&self) -> Result<crate::segment::CompactStats> {
399        self.flush_all();
400        let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
401        for coll in self.id_index.collections() {
402            for id in self.id_index.list_ids(&coll) {
403                if let Some(h) = self.id_index.get(&coll, &id) {
404                    live.insert(h);
405                }
406            }
407        }
408        self.objects.compact(&live)
409    }
410
411    /// Flush MANIFEST to disk if dirty. No-op for in-memory databases.
412    pub fn flush_manifest_if_dirty(&self) {
413        if self.root == std::path::PathBuf::from(":memory:") { return; }
414        if self.manifest_dirty.compare_exchange(
415            true, false, Ordering::AcqRel, Ordering::Relaxed
416        ).is_ok() {
417            self.flush_manifest();
418        }
419    }
420
421    /// Atomically persist current seq+head to MANIFEST. No-op for in-memory databases.
422    pub fn flush_manifest(&self) {
423        if self.root == std::path::PathBuf::from(":memory:") { return; }
424        let seq  = self.seq.load(Ordering::SeqCst);
425        let head = self.head.read().clone();
426        let tip_hash = self.tip_hash.read().clone();
427        let m = Manifest { seq, head, tip_hash };
428        if let Ok(json) = serde_json::to_string(&m) {
429            let path = self.root.join("MANIFEST");
430            let tmp  = self.root.join("MANIFEST.tmp");
431            let _ = fs::write(&tmp, &json);
432            let _ = fs::rename(&tmp, &path);
433        }
434    }
435
436    /// Start a background thread that flushes both the id-index WAL and MANIFEST
437    /// every `interval_ms` milliseconds.
438    /// Call this after Arc::new(db) — the Arc keeps Db alive for the thread's lifetime.
439    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
440        let db = self_arc;
441        std::thread::spawn(move || {
442            loop {
443                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
444                // Flush id-index WAL to disk (parallel Rayon writes)
445                db.id_index.flush_write_buf();
446                // Then flush MANIFEST
447                db.flush_manifest_if_dirty();
448            }
449        });
450    }
451
452    /// Return the current Merkle head string. O(1) — read from cache.
453    pub fn head(&self) -> String {
454        self.head.read().clone()
455    }
456
457    /// Delete a document — writes a tombstone node and removes the id from the index.
458    /// The object history is preserved in the DAG; only the live id pointer is cleared.
459    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
460        let prev = match self.id_index.get(coll, id) {
461            None => return Ok(false),   // already gone
462            Some(h) => h,
463        };
464        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
465        let mut tombstone = Node {
466            id:         format!("_del_{}", id),
467            coll:       coll.to_string(),
468            seq,
469            data:       serde_json::json!({"_deleted": id, "_prev": prev}),
470            prev:       Some(prev),
471            caused_by:  vec![],
472            ts:         now(),
473            valid_from: None,
474            valid_to:   None,
475            hash:       String::new(),
476        };
477        let hash = self.objects.write(&mut tombstone)?;
478        self.update_head(seq, &hash);
479        // Remove the live id pointer — doc is now invisible to queries and list()
480        self.id_index.remove(coll, id)?;
481        Ok(true)
482    }
483
484    /// Get the current version of a document by id.
485    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
486        let hash = self.id_index.get(coll, id)?;
487        self.objects.read(&hash).ok()
488    }
489
490    /// Get a specific version of a document by object hash.
491    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
492        self.objects.read(hash).ok()
493    }
494
495    /// Get a document AS OF a specific sequence number.
496    /// Walks the version chain (prev links) backward until seq <= target.
497    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
498        let hash = self.id_index.get(coll, id)?;
499        let mut current = self.objects.read(&hash).ok()?;
500        loop {
501            if current.seq <= target_seq {
502                return Some(current);
503            }
504            let prev_hash = current.prev.as_deref()?;
505            current = self.objects.read(prev_hash).ok()?;
506        }
507    }
508
509    /// List all documents in a collection, returning current versions.
510    pub fn list(&self, coll: &str) -> Vec<Node> {
511        self.id_index
512            .list_ids(coll)
513            .into_iter()
514            .filter_map(|id| self.get(coll, &id))
515            .collect()
516    }
517
518    /// ORDER BY field ASC LIMIT n — uses sorted index if available, else falls back to full scan.
519    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
520        if self.sorted_indexes.has(coll, field) {
521            self.sorted_indexes
522                .top_k_asc(coll, field, limit)
523                .into_iter()
524                .filter_map(|h| self.objects.read(&h).ok())
525                .collect()
526        } else {
527            let mut docs = self.list(coll);
528            docs.sort_by(|a, b| {
529                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
530                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
531                av.cmp(&bv)
532            });
533            docs.truncate(limit);
534            docs
535        }
536    }
537
538    /// ORDER BY field DESC LIMIT n
539    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
540        if self.sorted_indexes.has(coll, field) {
541            self.sorted_indexes
542                .top_k_desc(coll, field, limit)
543                .into_iter()
544                .filter_map(|h| self.objects.read(&h).ok())
545                .collect()
546        } else {
547            let mut docs = self.list(coll);
548            docs.sort_by(|a, b| {
549                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
550                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
551                bv.cmp(&av)
552            });
553            docs.truncate(limit);
554            docs
555        }
556    }
557
558    /// TRACE caused_by — walk causal graph from a node.
559    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
560        self.graph
561            .trace(hash, "caused_by", reverse, limit)
562            .into_iter()
563            .filter_map(|h| self.objects.read(&h).ok())
564            .collect()
565    }
566
567    /// Verify tamper-evidence of all objects.
568    pub fn verify(&self) -> (usize, Vec<String>) {
569        self.objects.verify_all()
570    }
571
572    /// Create a sorted index for a (coll, field) pair.
573    pub fn create_sorted_index(&self, coll: &str, field: &str) {
574        self.sorted_indexes.ensure(coll, field);
575        // Backfill from existing objects
576        for id in self.id_index.list_ids(coll) {
577            if let Some(node) = self.get(coll, &id) {
578                if let Value::Object(ref obj) = node.data {
579                    if let Some(value) = obj.get(field) {
580                        self.sorted_indexes.insert(coll, field, value, &node.hash);
581                    }
582                }
583            }
584        }
585    }
586
587    /// Resolve a sequence number to its content hash (v1 compatibility).
588    /// Only covers nodes written in the current process session + cold-scan nodes.
589    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
590        self.seq_index.get(&seq).map(|r| r.clone())
591    }
592
593    /// The tip — the most recently written node (highest seq), or `None` if the
594    /// database is empty. O(1): `self.seq` is the next-to-assign counter, so the
595    /// latest write sits at `seq - 1`; we resolve it through the same
596    /// seq_index → object-store path a normal read uses, so the returned Node is
597    /// byte-identical to one fetched by id or hash (it carries its own seq, hash,
598    /// causal links, and valid-time). This is the cheap "give me the latest write"
599    /// primitive — the head of the log, not an aggregate.
600    pub fn tip(&self) -> Option<Node> {
601        let next = self.seq.load(Ordering::SeqCst);
602        if next == 0 {
603            return None; // nothing written yet
604        }
605        // Fast path: resolve the head seq through the in-memory seq index
606        // (populated by this session's writes or by the cold scan).
607        if let Some(hash) = self.get_hash_by_seq(next - 1) {
608            return self.get_by_hash(&hash);
609        }
610        // Warm-boot fallback: the seq index is still cold (warm start skips the
611        // scan), but the tip's object hash was persisted in MANIFEST and restored
612        // on open. O(1), no scan — this is what makes tip() survive a restart.
613        let th = self.tip_hash.read().clone();
614        if !th.is_empty() {
615            return self.get_by_hash(&th);
616        }
617        None
618    }
619
620    /// The collection-local tip — the most recent write into `coll` (highest seq in
621    /// that collection), or `None` if the collection has no writes. Scans the seq
622    /// index backward from the head until a node in `coll` is found, resolving
623    /// through the same seq_index → object-store path as a normal read; bounded by
624    /// how recently `coll` was written. Conceptually a different index than the
625    /// global `tip()` (global head vs collection head), kept as a separate method
626    /// so each is explicit — parity with the Python reference's `tip(coll)`. Lets a
627    /// consumer resume one chain (e.g. blocks / tx / utxo) without pulling global
628    /// tip and filtering.
629    pub fn tip_collection(&self, coll: &str) -> Option<Node> {
630        let mut s = self.seq.load(Ordering::SeqCst); // exclusive upper bound (head + 1)
631        while s > 0 {
632            s -= 1;
633            if let Some(hash) = self.get_hash_by_seq(s) {
634                if let Some(node) = self.get_by_hash(&hash) {
635                    if node.coll.as_str() == coll {
636                        return Some(node);
637                    }
638                }
639            }
640        }
641        None
642    }
643
644    /// Changefeed page: up to `limit` nodes written AFTER `after_seq` (EXCLUSIVE),
645    /// ascending by seq, wrapped in a `SinceBatch` cursor envelope. `after_seq` is
646    /// the cursor you last applied (a prior `tip()` seq or `to_seq`). `limit` bounds
647    /// the page — `0` means DEFAULT_SINCE_LIMIT, so the engine primitive can never
648    /// materialize an unbounded batch even when embedders call it directly (the
649    /// safety is here, not only in the HTTP layer). Drain by paging while
650    /// `has_more`, advancing your cursor to `to_seq`, then hand off to the live
651    /// `subscribe` edge. The append-only log IS the changefeed, so this is an
652    /// O(page) walk; unresolved seqs (outside seq_index coverage — see
653    /// `scan_status()`) are skipped rather than faked.
654    pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
655        let next = self.seq.load(Ordering::SeqCst);          // head + 1
656        let head_seq = next.saturating_sub(1);
657        let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
658        let mut nodes: Vec<Node> = Vec::new();
659        let mut to_seq = after_seq;
660        let mut hit_limit = false;
661        let mut s = after_seq.saturating_add(1);
662        while s < next {
663            if nodes.len() >= cap { hit_limit = true; break; }
664            if let Some(hash) = self.get_hash_by_seq(s) {
665                if let Some(node) = self.get_by_hash(&hash) {
666                    to_seq = node.seq;
667                    nodes.push(node);
668                }
669            }
670            s += 1;
671        }
672        SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
673    }
674
675    /// Replication readiness — see `ScanStatus`. `scan_complete` gates safe
676    /// historical catch-up: a consumer pulling an old cursor right after a cold
677    /// start must wait for it, or `since()` may hand back a partial page that looks
678    /// like "caught up". Computes the indexed range by scanning the in-memory seq
679    /// index (O(index)) — intended for periodic status polls, not the per-write
680    /// hot path.
681    pub fn scan_status(&self) -> ScanStatus {
682        let next = self.seq.load(Ordering::SeqCst);
683        let mut min = u64::MAX;
684        let mut max = 0u64;
685        let mut count = 0usize;
686        for kv in self.seq_index.iter() {
687            let s = *kv.key();
688            if s < min { min = s; }
689            if s > max { max = s; }
690            count += 1;
691        }
692        if count == 0 { min = 0; }
693        ScanStatus {
694            scan_complete:   self.startup_ready.load(Ordering::SeqCst),
695            tip_seq:         next.saturating_sub(1),
696            indexed_seq_min: min,
697            indexed_seq_max: max,
698            indexed_count:   count,
699        }
700    }
701
702    /// Add an explicit named relation edge between two documents.
703    /// Add an explicit named relation between two "coll:id" nodes.
704    /// Relations stored as __links__ documents — NQL-queryable, time-travelable,
705    /// consistent with the PyO3 binding which uses the same __links__ convention.
706    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
707        let (frm_coll, frm_id) = frm.split_once(':')
708            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
709        let (to_coll, to_id) = to.split_once(':')
710            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
711        if self.id_index.get(frm_coll, frm_id).is_none() {
712            anyhow::bail!("link: frm not found: {}", frm);
713        }
714        if self.id_index.get(to_coll, to_id).is_none() {
715            anyhow::bail!("link: to not found: {}", to);
716        }
717        let link_id = format!("{}|{}|{}", frm, rel, to);
718        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
719        self.put("__links__", &link_id, doc, vec![], None, None)?;
720        Ok(())
721    }
722
723    /// Remove a named relation (deletes the __links__ document).
724    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
725        let link_id = format!("{}|{}|{}", frm, rel, to);
726        self.delete("__links__", &link_id)
727    }
728
729    /// Get neighbor nodes via a named relation.
730    /// Queries __links__ — consistent with the PyO3 binding.
731    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
732        self.id_index
733            .list_ids("__links__")
734            .into_iter()
735            .filter_map(|id| self.get("__links__", &id))
736            .filter(|node| {
737                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
738                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
739            })
740            .filter_map(|node| {
741                let to = node.data.get("_to")?.as_str()?;
742                let (to_coll, to_id) = to.split_once(':')?;
743                self.get(to_coll, to_id)
744            })
745            .collect()
746    }
747}
748
749impl Drop for Db {
750    /// Flush buffered state when the database is closed so a write-then-drop
751    /// sequence is durable without an explicit `flush_all()`.
752    ///
753    /// `IdIndex::set` only stages updates in the in-memory WAL `write_buf`;
754    /// disk persistence happens in `flush_write_buf()`, normally driven by the
755    /// manifest ticker. A short-lived `Db` (a library user's `{ let db =
756    /// Db::open(p)?; db.put(..)?; }` block, or a test) has no ticker, so without
757    /// this its writes would be silently lost on reopen. Flushing on drop
758    /// mirrors the flush-on-close contract of other embedded stores (sled,
759    /// RocksDB).
760    ///
761    /// In production this is a harmless safety net, not the primary durability
762    /// path: the manifest ticker thread holds an `Arc<Db>` for the process
763    /// lifetime, so `Drop` only fires once every owning handle is gone. No-op
764    /// for in-memory databases (`flush_all` short-circuits on `:memory:`).
765    fn drop(&mut self) {
766        self.flush_all();
767    }
768}
769
770/// Background cold-scan worker. Takes Arc<Db> — safe, Db is on the heap.
771fn cold_scan_background_arc(db: Arc<Db>) {
772    use rayon::prelude::*;
773    use blake2::{Blake2b512, Digest};
774
775    let objects        = &db.objects;
776    let head           = &db.head;
777    let seq_atomic     = &db.seq;
778    let sorted_indexes = &db.sorted_indexes;
779    let root           = db.root.clone();
780    let ready_flag     = Arc::clone(&db.startup_ready);
781
782    let hashes: Vec<String> = objects.all_hashes().collect();
783    let total = hashes.len();
784
785    if total == 0 {
786        ready_flag.store(true, Ordering::SeqCst);
787        return;
788    }
789
790    println!("  [nedbd] background scan — {} objects...", total);
791    let t0 = std::time::Instant::now();
792    let step = (total / 10).max(1000);
793
794    let nodes: Vec<Node> = hashes.par_iter()
795        .enumerate()
796        .filter_map(|(i, h)| {
797            if i > 0 && i % step == 0 {
798                let pct     = i * 100 / total;
799                let elapsed = t0.elapsed().as_secs_f32();
800                let rate    = i as f32 / elapsed;
801                let eta     = (total - i) as f32 / rate;
802                eprint!("\r  [nedbd]   {:>3}%  {:>8} / {:>8}  ({:>8.0}/s  eta {:.0}s)   ",
803                    pct, i, total, rate, eta);
804            }
805            objects.read(h).ok()
806        })
807        .collect();
808
809    eprintln!("\r  [nedbd]   100%  {:>8} / {:>8}  ({:.1}s)                        ",
810        total, total, t0.elapsed().as_secs_f32());
811
812    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
813    seq_atomic.store(max_seq + 1, Ordering::SeqCst);
814
815    for node in &nodes {
816        db.seq_index.insert(node.seq, node.hash.clone());
817        if let Value::Object(ref obj) = node.data {
818            for (field, value) in obj {
819                if sorted_indexes.has(&node.coll, field) {
820                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
821                }
822            }
823        }
824    }
825
826    // Compute Merkle head from sorted hashes
827    let mut sorted_hashes = hashes;
828    sorted_hashes.sort();
829    let mut h = Blake2b512::new();
830    h.update(max_seq.to_le_bytes());
831    for hash_str in &sorted_hashes {
832        h.update(hash_str.as_bytes());
833    }
834    let new_head = hex::encode(&h.finalize()[..32]);
835    *head.write() = new_head.clone();
836
837    // Tip = the highest-seq object we indexed. Persist its hash so tip() resolves
838    // O(1) on the next warm boot, before any scan repopulates the seq index.
839    let tip_hash = db.seq_index.iter()
840        .max_by_key(|kv| *kv.key())
841        .map(|kv| kv.value().clone())
842        .unwrap_or_default();
843    *db.tip_hash.write() = tip_hash.clone();
844
845    // Write MANIFEST atomically
846    let m     = Manifest { seq: max_seq, head: new_head, tip_hash };
847    let json  = serde_json::to_string(&m).unwrap_or_default();
848    let path  = root.join("MANIFEST");
849    let tmp   = root.join("MANIFEST.tmp");
850    let _ = fs::write(&tmp, &json);
851    let _ = fs::rename(&tmp, &path);
852
853    // Signal server: writes can now proceed
854    ready_flag.store(true, Ordering::SeqCst);
855    println!("  [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
856}
857
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, this returns `0.0`
/// rather than an error or a panic.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
864
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Round-trip: a document written with put() is readable by (coll, id).
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let doc = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", doc, vec![], None, None).unwrap();

        let fetched = db.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    // order_by_asc over a sorted index yields the smallest values first,
    // independent of insertion order, truncated to the requested count.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");
        for height in [3u64, 1, 5, 2, 4] {
            let id = height.to_string();
            db.put("blocks", &id, serde_json::json!({"height": height}), vec![], None, None)
                .unwrap();
        }

        let lowest: Vec<u64> = db
            .order_by_asc("blocks", "height", 3)
            .iter()
            .filter_map(|n| n.data["height"].as_u64())
            .collect();
        assert_eq!(lowest, vec![1, 2, 3]);
    }

    // trace() follows parent hashes back through the chain: c -> b -> a.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let create = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        let chain = db.trace(&burn.hash, false, 10);
        assert_eq!(chain.len(), 3);
    }

    // get_as_of() returns the version current at a given seq, while get()
    // returns the latest version.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);
        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
926
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
        assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
        assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    #[test]
    fn tip_and_since() {
        let db = Db::in_memory();
        // Empty db: no tip, empty changefeed.
        assert!(db.tip().is_none());
        assert!(db.since(0, 0).nodes.is_empty());

        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        // tip() = the most recent write (highest seq), returned as a full node.
        let t = db.tip().expect("tip after writes");
        assert_eq!(t.seq, b.seq);
        assert_eq!(t.id, "b");
        assert_eq!(t.hash, b.hash);

        // since(after_seq, limit) — EXCLUSIVE cursor, bounded page + envelope.
        let after_a = db.since(a.seq, 0);
        assert_eq!(after_a.nodes.len(), 1);
        assert_eq!(after_a.nodes[0].id, "b");
        assert_eq!(after_a.from_seq, a.seq);
        assert_eq!(after_a.to_seq, b.seq);
        assert_eq!(after_a.head_seq, b.seq);
        assert!(!after_a.has_more);

        // Nothing written after the tip.
        assert!(db.since(b.seq, 0).nodes.is_empty());

        // `limit` bounds the page and sets has_more; resume from to_seq.
        let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();
        let page = db.since(a.seq, 1);             // (a..] capped at 1 -> [b], more pending
        assert_eq!(page.nodes.len(), 1);
        assert_eq!(page.nodes[0].id, "b");
        assert_eq!(page.to_seq, b.seq);
        assert!(page.has_more);
        let page2 = db.since(page.to_seq, 1);      // resume from b -> [c], done
        assert_eq!(page2.nodes.len(), 1);
        assert_eq!(page2.nodes[0].id, "c");
        assert_eq!(page2.to_seq, c.seq);
        assert!(!page2.has_more);
    }

    #[test]
    fn since_page_ending_at_head_has_no_more() {
        // Paging-contract edge case: when the cap is reached exactly on the newest
        // write, nothing is pending, so `has_more` must be false. A follow-up call
        // from `to_seq` is then an empty page that echoes the cursor.
        let db = Db::in_memory();
        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
        let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();

        let page = db.since(a.seq, 2);
        let ids: Vec<&str> = page.nodes.iter().map(|n| n.id.as_str()).collect();
        assert_eq!(ids, vec!["b", "c"]);
        assert_eq!(page.nodes[0].seq, b.seq);
        assert_eq!(page.from_seq, a.seq);
        assert_eq!(page.to_seq, c.seq);
        assert_eq!(page.head_seq, c.seq);
        assert!(!page.has_more);

        let done = db.since(page.to_seq, 2);
        assert!(done.nodes.is_empty());
        assert_eq!(done.from_seq, c.seq);
        assert_eq!(done.to_seq, c.seq);
        assert!(!done.has_more);
    }

    #[test]
    fn scan_status_tracks_indexed_seqs() {
        // Each put is indexed in seq_index, so scan_status must count it and
        // report the newest seq as both the index max and the tip.
        let db = Db::in_memory();
        let before = db.scan_status();
        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
        let after = db.scan_status();
        assert_eq!(after.indexed_count, before.indexed_count + 2);
        assert_eq!(after.indexed_seq_max, b.seq);
        assert!(after.indexed_seq_min <= a.seq);
        assert_eq!(after.tip_seq, b.seq);
    }

    #[test]
    fn tip_collection_per_chain() {
        // The ITC sync-client case: separate chains in separate collections; a
        // consumer resumes ONE without pulling global tip and filtering.
        let db = Db::in_memory();
        assert!(db.tip_collection("blocks").is_none());

        db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
        db.put("tx",     "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
        let t1 = db.put("tx",     "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        // global tip = latest write overall (t1)
        assert_eq!(db.tip().unwrap().id, "t1");
        // collection-local tips = latest write in each collection
        let bt = db.tip_collection("blocks").expect("blocks tip");
        assert_eq!(bt.id, "b1");
        assert_eq!(bt.seq, b1.seq);
        assert_eq!(db.tip_collection("tx").unwrap().seq, t1.seq);
        assert!(db.tip_collection("absent").is_none());
    }

    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let nodes = db.put_batch(vec![
            ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
        ]).unwrap();
        for node in &nodes {
            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
    }

    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
        db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.put("trip",   "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();

        db.link("driver:d1", "handles", "trip:t1").unwrap();
        db.link("driver:d1", "handles", "trip:t2").unwrap();
        db.link("driver:d2", "handles", "trip:t1").unwrap();

        let d1_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(d1_trips.len(), 2);
        let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
        assert!(ids.contains("t1") && ids.contains("t2"));

        let d2_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(d2_trips.len(), 1);
        assert_eq!(d2_trips[0].id, "t1");
    }

    #[test]
    fn link_stored_in_links_collection() {
        // Links are stored as __links__ documents, not as graph edges.
        // The __links__ collection is NQL-queryable and consistent with the PyO3 binding.
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();
        // Verify the __links__ document was created
        let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
        assert!(link_doc.is_some(), "__links__ doc should exist");
        let doc = link_doc.unwrap();
        assert_eq!(doc.data["_from"], "driver:d1");
        assert_eq!(doc.data["_rel"],  "handles");
        assert_eq!(doc.data["_to"],   "trip:t1");
        // neighbors() resolves to the target node
        let nb = db.neighbors("driver:d1", "handles");
        assert_eq!(nb.len(), 1);
        assert_eq!(nb[0].id, "t1");
    }

    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
        assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
    }

    #[test]
    fn link_rejects_malformed_endpoints() {
        // Endpoints must be "coll:id". A missing ':' on either side is an error,
        // and no relation becomes visible through neighbors().
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
        db.put("trip",   "t1", serde_json::json!({}), vec![], None, None).unwrap();
        assert!(db.link("driver-d1", "handles", "trip:t1").is_err());
        assert!(db.link("driver:d1", "handles", "trip-t1").is_err());
        assert!(db.neighbors("driver:d1", "handles").is_empty());
    }

    #[test]
    fn link_durable_survives_reopen() {
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
            db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            db.link("driver:d1", "handles", "trip:t1").unwrap();
        }
        let db2 = Db::open(dir.path(), None).unwrap();
        db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
        let trips = db2.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }

    #[test]
    fn tip_survives_warm_restart() {
        // v2.5.43: tip() returns the last written object AND survives a warm restart.
        // On reopen the seq_index is cold (warm start skips the scan), so tip() must
        // resolve the last write via the MANIFEST tip_hash fallback — no scan.
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
            db.flush_all(); // persists MANIFEST incl. tip_hash
            assert_eq!(db.tip().expect("tip in-session").id, "b2");
        }
        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
        let db2 = Db::open(dir.path(), None).unwrap();
        assert!(db2.get_hash_by_seq(1).is_none(), "seq_index is cold on a warm boot");
        let tip = db2.tip().expect("tip() must survive a warm restart");
        assert_eq!(tip.id, "b2");
        assert_eq!(tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
    }
}