Skip to main content

nedb_engine/
db.rs

1//! Main DAG database — coordinates ObjectStore, IdIndex, SortedIndexes, GraphStore.
2
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17/// MANIFEST: cached {seq, head} written atomically after every write.
18/// On startup, if MANIFEST exists and no sorted indexes need rebuilding,
19/// startup is O(1) — just read this one file instead of scanning all objects.
20#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22    seq:  u64,
23    head: String,
24    /// Object hash of the highest-seq node at flush time. Lets `tip()` resolve the
25    /// last write O(1) on a warm boot — before any scan repopulates the in-memory
26    /// seq index. `#[serde(default)]` so pre-2.5.43 MANIFESTs (no field) still parse.
27    #[serde(default)]
28    tip_hash: String,
29    /// Per-collection tip: `coll -> object hash of the highest-seq node in that
30    /// collection`. Lets `tip_collection()` resolve O(1) on a warm boot, same
31    /// contract as `tip_hash` for the global head. `#[serde(default)]` so
32    /// pre-this-field MANIFESTs still parse (empty map — self-heals on next write
33    /// or cold scan).
34    #[serde(default)]
35    coll_tips: std::collections::HashMap<String, String>,
36}
37
/// Page size `since()` falls back to when the caller passes `limit == 0`.
///
/// The cap lives in the engine primitive rather than the HTTP layer, so a
/// stale or offline consumer can never trigger an unbounded materialization.
pub const DEFAULT_SINCE_LIMIT: usize = 10_000;
42
43/// One page of the changefeed returned by `since()`. The replication contract:
44/// apply `nodes` in ascending seq order, advance your cursor to `to_seq`, and keep
45/// paging while `has_more` is true; then attach to the live `subscribe` edge.
46/// `head_seq` tells the consumer how far the log currently extends (how far behind
47/// it is).
48#[derive(Debug, Clone, serde::Serialize)]
49pub struct SinceBatch {
50    /// Writes in (`from_seq`, `to_seq`], ascending by seq.
51    pub nodes:    Vec<Node>,
52    /// The exclusive cursor this page started from (echoes the request).
53    pub from_seq: u64,
54    /// Seq of the last node in this page — the consumer's next cursor.
55    pub to_seq:   u64,
56    /// Current head seq of the log (latest committed write).
57    pub head_seq: u64,
58    /// True when more writes remain past `to_seq` (the page hit `limit`).
59    pub has_more: bool,
60}
61
62/// Replication readiness snapshot. `scan_complete` is the correctness gate: until
63/// the cold-scan finishes rebuilding the seq index, an old cursor passed to
64/// `since()` can return a PARTIAL page and look (wrongly) like "caught up". A
65/// correctness-critical consumer MUST wait for `scan_complete == true` before
66/// trusting historical catch-up. `indexed_seq_min/max` report the currently
67/// resolvable seq range; `tip_seq` is the log head.
68#[derive(Debug, Clone, serde::Serialize)]
69pub struct ScanStatus {
70    /// Cold-scan finished — historical seqs fully resolvable; catch-up is safe.
71    pub scan_complete:   bool,
72    /// Head seq of the log (latest committed write).
73    pub tip_seq:         u64,
74    /// Lowest seq currently in the seq index (0 if empty).
75    pub indexed_seq_min: u64,
76    /// Highest seq currently in the seq index.
77    pub indexed_seq_max: u64,
78    /// Number of seqs currently resolvable via the index.
79    pub indexed_count:   usize,
80}
81
82pub struct Db {
83    pub objects:        ObjectStore,
84    pub id_index:       IdIndex,
85    pub sorted_indexes: SortedIndexes,
86    pub graph:          GraphStore,
87    pub root:           PathBuf,
88    /// Dirty flag — set true when head changes, cleared after manifest flush.
89    /// Decouples flush_manifest from the hot write path so concurrent writes
90    /// don't serialise on 2× file I/O per PUT.
91    manifest_dirty:     Arc<AtomicBool>,
92    pub seq:            AtomicU64,
93    /// Cached Merkle head — updated incrementally on every write (O(1)).
94    head:               RwLock<String>,
95    /// `(seq, object hash)` of the most recent write (highest seq). Mirrors `head`
96    /// but holds the tip's content hash, so `tip()` can resolve the last node O(1)
97    /// on a warm boot when the in-memory `seq_index` is still cold. The seq rides
98    /// along so concurrent writers can settle the tip by HIGHEST SEQ rather than
99    /// arrival order (a slow older put must never clobber a newer tip). Only the
100    /// hash is persisted in MANIFEST — format unchanged.
101    tip_hash:           RwLock<(u64, String)>,
102    /// Per-collection tip: `coll -> (seq, object hash)` of the highest-seq node in
103    /// that collection. Kept current on every write (`update_head`, seq-guarded),
104    /// restored from MANIFEST on warm boot, rebuilt by the cold scan — so
105    /// `tip_collection()` is O(1) and durable across restarts in every startup
106    /// regime, by construction.
107    coll_tip_hash:      Arc<DashMap<String, (u64, String)>>,
108    /// True once startup is fully ready (MANIFEST loaded or cold scan complete).
109    /// Warm starts set this true before returning from open().
110    /// Cold starts set this true in the background thread when scan completes.
111    /// Writes are held with 503 until this is true; reads always proceed.
112    pub startup_ready:  Arc<AtomicBool>,
113    /// Seq → hash lookup for v1 compatibility. Populated by put(), put_batch(),
114    /// and the cold-scan background pass. Only covers nodes from the current
115    /// process session + cold-scan; older seqs not in this map cannot be resolved.
116    seq_index:          Arc<DashMap<u64, String>>,
117}
118
119impl Db {
120    /// Create a pure in-memory database — no disk I/O, no migration, instant startup.
121    /// Perfect for tests, hot-cache layers, and ephemeral sessions.
122    /// All data is lost when the Db is dropped.
123    pub fn in_memory() -> Self {
124        Self {
125            objects:        ObjectStore::in_memory(),
126            id_index:       IdIndex::in_memory(),
127            sorted_indexes: SortedIndexes::new(),
128            graph:          GraphStore::in_memory(),
129            root:           std::path::PathBuf::from(":memory:"),
130            seq:            AtomicU64::new(0),
131            head:           RwLock::new(String::new()),
132            tip_hash:       RwLock::new((0, String::new())),
133            coll_tip_hash:  Arc::new(DashMap::new()),
134            startup_ready:  Arc::new(AtomicBool::new(true)),  // always ready
135            manifest_dirty: Arc::new(AtomicBool::new(false)),
136            seq_index:      Arc::new(DashMap::new()),
137        }
138    }
139
140    /// Open (or create) a database. Runs v1→v2 migration automatically if log.aof is present.
141    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
142        std::fs::create_dir_all(db_root)?;
143
144        let objects        = ObjectStore::new(db_root, dek.clone())?;
145        let id_index       = IdIndex::new(db_root)?;
146        let sorted_indexes = SortedIndexes::new();
147        let graph          = GraphStore::new(db_root)?;
148
149        let mut db = Self {
150            objects,
151            id_index,
152            sorted_indexes,
153            graph,
154            root: db_root.to_path_buf(),
155            seq:  AtomicU64::new(0),
156            head: RwLock::new(String::new()),
157            tip_hash: RwLock::new((0, String::new())),
158            coll_tip_hash: Arc::new(DashMap::new()),
159            startup_ready:  Arc::new(AtomicBool::new(false)),
160            manifest_dirty: Arc::new(AtomicBool::new(false)),
161            seq_index:      Arc::new(DashMap::new()),
162        };
163
164        // Auto-migrate v1 → v2 if needed (pass DEK so encrypted AOFs convert correctly)
165        migrate::migrate_if_needed(
166            db_root,
167            &db.objects,
168            &db.id_index,
169            &db.sorted_indexes,
170            &db.graph,
171            dek.as_ref(),
172        )?;
173
174        // Fast startup: load seq+head from MANIFEST if no sorted indexes need rebuilding.
175        // Falls back to full object scan only when necessary (first open, or post-migration).
176        db.startup_rebuild()?;
177
178        Ok(db)
179    }
180
181    /// Smart startup:
182    /// - Warm (MANIFEST exists): O(1) load → startup_ready = true immediately.
183    /// - Cold (no MANIFEST): start server immediately, run scan in background thread.
184    ///   Writes return 503 until scan completes; reads always proceed.
185    fn startup_rebuild(&mut self) -> Result<()> {
186        let manifest_path = self.root.join("MANIFEST");
187        let needs_index_rebuild = !self.sorted_indexes.is_empty();
188
189        // Warm path: MANIFEST + no sorted indexes to rebuild → instant start
190        if manifest_path.exists() && !needs_index_rebuild {
191            if let Some(m) = fs::read_to_string(&manifest_path)
192                .ok()
193                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
194            {
195                // Self-heal: MANIFEST with an empty or short head is corrupt/stale.
196                // Fall through to cold scan so the head is rebuilt correctly from objects.
197                if m.head.len() < 8 {
198                    eprintln!("  [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
199                } else if m.tip_hash.is_empty() {
200                    // Pre-2.5.43 MANIFEST (no persisted tip). Cold-scan once to rebuild
201                    // the seq index and rewrite MANIFEST with tip_hash — warm + tip()-
202                    // durable on every boot thereafter.
203                    eprintln!("  [nedbd] MANIFEST predates durable tip() — cold scan once to upgrade");
204                } else {
205                    self.seq.store(m.seq, Ordering::SeqCst); // m.seq is already the next-to-assign counter
206                    *self.head.write() = m.head.clone();
207                    // The tip's seq is the last ASSIGNED seq (m.seq is next-to-assign).
208                    *self.tip_hash.write() = (m.seq.saturating_sub(1), m.tip_hash.clone());
209                    for (coll, hash) in &m.coll_tips {
210                        // Per-coll seqs aren't persisted (MANIFEST format unchanged);
211                        // seed 0 — every future write has seq >= m.seq > 0 and wins,
212                        // and nothing older than the persisted tip can ever arrive
213                        // because the seq counter resumes at m.seq.
214                        self.coll_tip_hash.insert(coll.clone(), (0, hash.clone()));
215                    }
216                    self.startup_ready.store(true, Ordering::SeqCst);
217                    println!("  [nedbd] warm start — seq={} head={}... tip={}...",
218                        m.seq, &m.head[..8], &m.tip_hash[..8.min(m.tip_hash.len())]);
219                    return Ok(());
220                }
221            } else {
222                eprintln!("  [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
223            }
224        }
225
226        // Cold path: mark as not ready, return immediately.
227        // The actual background scan is started by Db::start_cold_scan(arc)
228        // which is called from Manager::open_all() AFTER Arc::new(db) — when
229        // the Db is heap-allocated and its field addresses are permanently stable.
230        // Capturing field addresses here would cause UB: Db moves on return.
231        println!("  [nedbd] cold start — background scan will start after heap allocation");
232        Ok(())
233    }
234
235    /// Call this from Manager::open_all() after Arc::new(db).
236    /// Spawns the cold scan background thread with stable heap addresses.
237    /// No-op if startup is already complete (warm start).
238    pub fn start_cold_scan(self_arc: Arc<Self>) {
239        if self_arc.startup_ready.load(Ordering::SeqCst) {
240            return; // warm start — already ready
241        }
242        // Fast path: if the database is empty (new or just created), skip the
243        // background thread entirely. No objects to scan = instant startup.
244        if self_arc.objects.all_hashes().next().is_none() {
245            self_arc.startup_ready.store(true, Ordering::SeqCst);
246            return;
247        }
248        println!("  [nedbd] cold start — background scan starting, server accepting reads now");
249        std::thread::spawn(move || {
250            let db = self_arc;
251            cold_scan_background_arc(db);
252        });
253    }
254
255    /// Write a document. Returns the new node with its content hash set.
256    pub fn put(
257        &self,
258        coll: &str,
259        id: &str,
260        data: Value,
261        caused_by: Vec<String>,
262        valid_from: Option<String>,
263        valid_to:   Option<String>,
264    ) -> Result<Node> {
265        let seq  = self.seq.fetch_add(1, Ordering::SeqCst);
266        let prev = self.id_index.get(coll, id);
267
268        // Remove old node from sorted indexes (it's being superseded).
269        // Skip the old-object disk read entirely when no sorted index exists —
270        // the read (open + BLAKE2b verify + optional AES-GCM decrypt + JSON
271        // parse) was pure waste in the common unindexed case, ~2x read
272        // amplification on every update (the itcd chainstate shape).
273        if !self.sorted_indexes.is_empty() {
274            if let Some(old_hash) = &prev {
275                if let Ok(old_node) = self.objects.read(old_hash) {
276                    if let Value::Object(ref obj) = old_node.data {
277                        for (field, value) in obj {
278                            self.sorted_indexes.remove(coll, field, value, old_hash);
279                        }
280                    }
281                }
282            }
283        }
284
285        let mut node = Node {
286            id:         id.to_string(),
287            coll:       coll.to_string(),
288            seq,
289            data:       data.clone(),
290            prev,
291            caused_by:  caused_by.clone(),
292            ts:         now(),
293            valid_from,
294            valid_to,
295            hash:       String::new(),
296        };
297
298        // Write to object store (atomic, content-addressed)
299        let hash = self.objects.write(&mut node)?;
300        self.seq_index.insert(seq, hash.clone());
301
302        // Update id index (atomic file)
303        self.id_index.set(coll, id, &hash)?;
304
305        // Update sorted indexes
306        if let Value::Object(ref obj) = data {
307            for (field, value) in obj {
308                if self.sorted_indexes.has(coll, field) {
309                    self.sorted_indexes.insert(coll, field, value, &hash);
310                }
311            }
312        }
313
314        // Write causal graph edges
315        for cause in &caused_by {
316            self.graph.add_edge(&hash, "caused_by", cause)?;
317            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
318        }
319
320        // Update running Merkle head: O(1) chain, no full recompute.
321        // new_head = BLAKE2b(prev_head || seq_bytes || new_object_hash)
322        self.update_head(coll, seq, &hash);
323
324        Ok(node)
325    }
326
327    /// Batch put: write N documents in parallel, preserving monotonic seq ordering.
328    /// Pre-allocates N seq numbers atomically, then parallelises object writes and
329    /// id-index updates via Rayon. Each op is independent — safe to parallelise.
330    /// Returns nodes in input order with assigned seq numbers.
331    pub fn put_batch(
332        &self,
333        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
334        // (coll, id, data, caused_by, valid_from, valid_to)
335    ) -> Result<Vec<Node>> {
336        use rayon::prelude::*;
337
338        if ops.is_empty() { return Ok(vec![]); }
339        let n = ops.len() as u64;
340
341        // Pre-allocate N consecutive seq numbers — preserves ordering under concurrency
342        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
343        let ts = now();
344
345        // Build nodes with assigned seq numbers
346        let index_live = !self.sorted_indexes.is_empty();
347        let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
348            let prev = self.id_index.get(&coll, &id);
349            // Parity with put(): drop the superseded version's values from any
350            // sorted indexes, so top-k never returns stale hashes after a batch
351            // update. Without this, batch updates left the old version's index
352            // entries in place — ORDER BY surfaced superseded rows alongside
353            // current ones. Only pay the old-object read when an index exists.
354            if index_live {
355                if let Some(old_hash) = &prev {
356                    if let Ok(old_node) = self.objects.read(old_hash) {
357                        if let Value::Object(ref obj) = old_node.data {
358                            for (field, value) in obj {
359                                self.sorted_indexes.remove(&coll, field, value, old_hash);
360                            }
361                        }
362                    }
363                }
364            }
365            Node {
366                id, coll, seq: base_seq + i as u64,
367                data, prev, caused_by,
368                ts, valid_from, valid_to,
369                hash: String::new(),
370            }
371        }).collect();
372
373        // Parallel object writes (content-addressed, idempotent, safe to parallelise)
374        let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
375            .filter_map(|node| self.objects.write(node).err())
376            .collect();
377        if let Some(e) = write_errors.into_iter().next() { return Err(e); }
378
379        // Parallel id-index updates
380        let index_errors: Vec<anyhow::Error> = nodes.par_iter()
381            .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
382            .collect();
383        if let Some(e) = index_errors.into_iter().next() { return Err(e); }
384
385        // Sorted indexes + causal graph (sequential — small overhead, usually no indexes)
386        for node in &nodes {
387            self.seq_index.insert(node.seq, node.hash.clone());
388            if let Value::Object(ref obj) = node.data {
389                for (field, value) in obj {
390                    if self.sorted_indexes.has(&node.coll, field) {
391                        self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
392                    }
393                }
394            }
395            for cause in &node.caused_by {
396                self.graph.add_edge(&node.hash, "caused_by", cause).ok();
397                self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
398            }
399        }
400
401        // Single Merkle head update for the whole batch (chain all hashes)
402        for node in &nodes {
403            self.update_head(&node.coll, node.seq, &node.hash);
404        }
405
406        Ok(nodes)
407    }
408
409    /// Update the running Merkle head with a new write. O(1); no file I/O — the
410    /// background ticker flushes MANIFEST.
411    ///
412    /// Concurrency contract (this function is reached by parallel `put()`s —
413    /// the server runs puts on blocking threads):
414    /// - The head chain is extended under ONE write lock held across the whole
415    ///   read-modify-write. The old read-then-write shape let two concurrent
416    ///   writers both read the same prev head; one contribution was silently
417    ///   dropped from the chain — a corrupted tamper-evidence primitive. The
418    ///   chain is arrival-ordered under concurrency (a seq-ordered canonical
419    ///   head is tracked as follow-up work); what this lock guarantees is that
420    ///   EVERY write is committed into the chain exactly once.
421    /// - Tip pointers settle by HIGHEST SEQ, not arrival order: concurrent
422    ///   puts can reach here out of seq order, and "last call wins" could
423    ///   persist a stale tip into MANIFEST for the next warm boot.
424    fn update_head(&self, coll: &str, seq: u64, new_hash: &str) {
425        use blake2::{Blake2b512, Digest};
426        {
427            let mut head = self.head.write();
428            let mut h = Blake2b512::new();
429            h.update(head.as_bytes());
430            h.update(seq.to_le_bytes());
431            h.update(new_hash.as_bytes());
432            *head = hex::encode(&h.finalize()[..32]);
433        }
434        {
435            let mut tip = self.tip_hash.write();
436            if seq >= tip.0 {
437                *tip = (seq, new_hash.to_string());
438            }
439        }
440        self.coll_tip_hash
441            .entry(coll.to_string())
442            .and_modify(|t| {
443                if seq >= t.0 {
444                    *t = (seq, new_hash.to_string());
445                }
446            })
447            .or_insert_with(|| (seq, new_hash.to_string()));
448        // Mark dirty — background ticker will flush to MANIFEST (no I/O on write path)
449        self.manifest_dirty.store(true, Ordering::Release);
450    }
451
452    /// Flush both the id-index WAL and MANIFEST. Used on graceful shutdown.
453    pub fn flush_all(&self) {
454        self.id_index.flush_write_buf();
455        // v3: fsync the active segment (no-op for loose/in-memory stores).
456        // One durability point per batch instead of one fsync per object.
457        if let Err(e) = self.objects.sync() {
458            eprintln!("nedb: segment sync failed: {}", e);
459        }
460        self.flush_manifest();
461    }
462
463    /// Compact the v3 packed object store: keep the CURRENT version of every
464    /// document (from the id-index) and reclaim everything else. No-op unless
465    /// running with the v3 segment substrate (`--dag-v3` / NEDB_DAG_V3).
466    ///
467    /// This is a PRUNING operation: superseded/historical object versions are
468    /// dropped, so AS OF / TRACE over pruned versions is discarded — that is
469    /// what reclaims the space. Flushes first so all data is durable on disk
470    /// before the old segments are deleted.
471    pub fn compact(&self) -> Result<crate::segment::CompactStats> {
472        self.flush_all();
473        let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
474        for coll in self.id_index.collections() {
475            for id in self.id_index.list_ids(&coll) {
476                if let Some(h) = self.id_index.get(&coll, &id) {
477                    live.insert(h);
478                }
479            }
480        }
481        self.objects.compact(&live)
482    }
483
484    /// Flush MANIFEST to disk if dirty. No-op for in-memory databases.
485    pub fn flush_manifest_if_dirty(&self) {
486        if self.root == std::path::PathBuf::from(":memory:") { return; }
487        if self.manifest_dirty.compare_exchange(
488            true, false, Ordering::AcqRel, Ordering::Relaxed
489        ).is_ok() {
490            self.flush_manifest();
491        }
492    }
493
494    /// Atomically persist current seq+head to MANIFEST. No-op for in-memory databases.
495    pub fn flush_manifest(&self) {
496        if self.root == std::path::PathBuf::from(":memory:") { return; }
497        let seq  = self.seq.load(Ordering::SeqCst);
498        let head = self.head.read().clone();
499        let tip_hash = self.tip_hash.read().1.clone();
500        let coll_tips: std::collections::HashMap<String, String> = self.coll_tip_hash
501            .iter()
502            .map(|kv| (kv.key().clone(), kv.value().1.clone()))
503            .collect();
504        let m = Manifest { seq, head, tip_hash, coll_tips };
505        if let Ok(json) = serde_json::to_string(&m) {
506            let path = self.root.join("MANIFEST");
507            let tmp  = self.root.join("MANIFEST.tmp");
508            // fsync the tmp file BEFORE the rename: rename-without-fsync can
509            // leave a zero-length/partial MANIFEST at the final path after
510            // power loss (ext4 delayed allocation). The startup self-heal
511            // (invalid head -> cold scan) catches that, but a full rescan is
512            // exactly the cost MANIFEST exists to avoid. One fsync per flush,
513            // and flushes are already off the hot write path (ticker-driven).
514            let wrote = (|| -> std::io::Result<()> {
515                use std::io::Write;
516                let mut f = fs::File::create(&tmp)?;
517                f.write_all(json.as_bytes())?;
518                f.sync_all()
519            })();
520            if wrote.is_ok() && fs::rename(&tmp, &path).is_ok() {
521                // Make the rename itself durable (directory entry). Unix-only;
522                // on Windows directory handles don't support this and the
523                // rename is already journaled by NTFS.
524                #[cfg(unix)]
525                if let Ok(dir) = fs::File::open(&self.root) {
526                    let _ = dir.sync_all();
527                }
528            }
529        }
530    }
531
532    /// Start a background thread that flushes both the id-index WAL and MANIFEST
533    /// every `interval_ms` milliseconds.
534    /// Call this after Arc::new(db) — the Arc keeps Db alive for the thread's lifetime.
535    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
536        let db = self_arc;
537        std::thread::spawn(move || {
538            loop {
539                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
540                // Flush id-index WAL to disk (parallel Rayon writes)
541                db.id_index.flush_write_buf();
542                // Segment bytes must be durable BEFORE a MANIFEST that
543                // references them: otherwise power loss can leave MANIFEST
544                // pointing at a tip whose object bytes were still in the page
545                // cache — the torn tail is truncated on reopen and the warm
546                // boot resolves a tip that no longer exists, with the seq
547                // counter ahead of durable data. Order: sync segments, then
548                // MANIFEST. Gated on the dirty flag so an idle database pays
549                // no per-tick fsync. (flush_all already used this order; the
550                // ticker now matches it.)
551                if db.manifest_dirty.load(Ordering::Acquire) {
552                    if let Err(e) = db.objects.sync() {
553                        eprintln!("nedb: segment sync failed: {}", e);
554                    }
555                    db.flush_manifest_if_dirty();
556                }
557            }
558        });
559    }
560
561    /// Return the current Merkle head string. O(1) — read from cache.
562    pub fn head(&self) -> String {
563        self.head.read().clone()
564    }
565
566    /// Delete a document — writes a tombstone node and removes the id from the index.
567    /// The object history is preserved in the DAG; only the live id pointer is cleared.
568    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
569        let prev = match self.id_index.get(coll, id) {
570            None => return Ok(false),   // already gone
571            Some(h) => h,
572        };
573        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
574        let mut tombstone = Node {
575            id:         format!("_del_{}", id),
576            coll:       coll.to_string(),
577            seq,
578            data:       serde_json::json!({"_deleted": id, "_prev": prev}),
579            prev:       Some(prev),
580            caused_by:  vec![],
581            ts:         now(),
582            valid_from: None,
583            valid_to:   None,
584            hash:       String::new(),
585        };
586        let hash = self.objects.write(&mut tombstone)?;
587        self.update_head(coll, seq, &hash);
588        // Remove the live id pointer — doc is now invisible to queries and list()
589        self.id_index.remove(coll, id)?;
590        Ok(true)
591    }
592
593    /// Get the current version of a document by id.
594    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
595        let hash = self.id_index.get(coll, id)?;
596        self.objects.read(&hash).ok()
597    }
598
599    /// Get a specific version of a document by object hash.
600    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
601        self.objects.read(hash).ok()
602    }
603
604    /// Get a document AS OF a specific sequence number.
605    /// Walks the version chain (prev links) backward until seq <= target.
606    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
607        let hash = self.id_index.get(coll, id)?;
608        let mut current = self.objects.read(&hash).ok()?;
609        loop {
610            if current.seq <= target_seq {
611                return Some(current);
612            }
613            let prev_hash = current.prev.as_deref()?;
614            current = self.objects.read(prev_hash).ok()?;
615        }
616    }
617
618    /// List all documents in a collection, returning current versions.
619    pub fn list(&self, coll: &str) -> Vec<Node> {
620        self.id_index
621            .list_ids(coll)
622            .into_iter()
623            .filter_map(|id| self.get(coll, &id))
624            .collect()
625    }
626
627    /// ORDER BY field ASC LIMIT n — uses sorted index if available, else falls back to full scan.
628    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
629        if self.sorted_indexes.has(coll, field) {
630            self.sorted_indexes
631                .top_k_asc(coll, field, limit)
632                .into_iter()
633                .filter_map(|h| self.objects.read(&h).ok())
634                .collect()
635        } else {
636            let mut docs = self.list(coll);
637            docs.sort_by(|a, b| {
638                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
639                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
640                av.cmp(&bv)
641            });
642            docs.truncate(limit);
643            docs
644        }
645    }
646
647    /// ORDER BY field DESC LIMIT n
648    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
649        if self.sorted_indexes.has(coll, field) {
650            self.sorted_indexes
651                .top_k_desc(coll, field, limit)
652                .into_iter()
653                .filter_map(|h| self.objects.read(&h).ok())
654                .collect()
655        } else {
656            let mut docs = self.list(coll);
657            docs.sort_by(|a, b| {
658                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
659                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
660                bv.cmp(&av)
661            });
662            docs.truncate(limit);
663            docs
664        }
665    }
666
667    /// TRACE caused_by — walk causal graph from a node.
668    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
669        self.graph
670            .trace(hash, "caused_by", reverse, limit)
671            .into_iter()
672            .filter_map(|h| self.objects.read(&h).ok())
673            .collect()
674    }
675
676    /// Verify tamper-evidence of all objects.
677    pub fn verify(&self) -> (usize, Vec<String>) {
678        self.objects.verify_all()
679    }
680
681    /// Create a sorted index for a (coll, field) pair.
682    pub fn create_sorted_index(&self, coll: &str, field: &str) {
683        self.sorted_indexes.ensure(coll, field);
684        // Backfill from existing objects
685        for id in self.id_index.list_ids(coll) {
686            if let Some(node) = self.get(coll, &id) {
687                if let Value::Object(ref obj) = node.data {
688                    if let Some(value) = obj.get(field) {
689                        self.sorted_indexes.insert(coll, field, value, &node.hash);
690                    }
691                }
692            }
693        }
694    }
695
696    /// Resolve a sequence number to its content hash (v1 compatibility).
697    /// Only covers nodes written in the current process session + cold-scan nodes.
698    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
699        self.seq_index.get(&seq).map(|r| r.clone())
700    }
701
702    /// The tip — the most recently written node (highest seq), or `None` if the
703    /// database is empty. O(1): `self.seq` is the next-to-assign counter, so the
704    /// latest write sits at `seq - 1`; we resolve it through the same
705    /// seq_index → object-store path a normal read uses, so the returned Node is
706    /// byte-identical to one fetched by id or hash (it carries its own seq, hash,
707    /// causal links, and valid-time). This is the cheap "give me the latest write"
708    /// primitive — the head of the log, not an aggregate.
709    pub fn tip(&self) -> Option<Node> {
710        let next = self.seq.load(Ordering::SeqCst);
711        if next == 0 {
712            return None; // nothing written yet
713        }
714        // Fast path: resolve the head seq through the in-memory seq index
715        // (populated by this session's writes or by the cold scan).
716        if let Some(hash) = self.get_hash_by_seq(next - 1) {
717            return self.get_by_hash(&hash);
718        }
719        // Warm-boot fallback: the seq index is still cold (warm start skips the
720        // scan), but the tip's object hash was persisted in MANIFEST and restored
721        // on open. O(1), no scan — this is what makes tip() survive a restart.
722        let th = self.tip_hash.read().1.clone();
723        if !th.is_empty() {
724            return self.get_by_hash(&th);
725        }
726        None
727    }
728
729    /// The collection-local tip — the most recent write into `coll` (highest seq in
730    /// that collection), or `None` if the collection has no writes. O(1): resolves
731    /// through `coll_tip_hash`, a dedicated per-collection map kept current on every
732    /// write (`update_head`), restored from MANIFEST on warm boot, and rebuilt by the
733    /// cold scan — durable across restarts by construction, same contract as `tip()`
734    /// for the global head. Conceptually a different index than the global `tip()`
735    /// (global head vs collection head), kept as a separate method so each is
736    /// explicit — parity with the Python reference's `tip(coll)`. Lets a consumer
737    /// resume one chain (e.g. blocks / tx / utxo) without pulling global tip and
738    /// filtering.
739    pub fn tip_collection(&self, coll: &str) -> Option<Node> {
740        let hash = self.coll_tip_hash.get(coll)?.1.clone();
741        self.get_by_hash(&hash)
742    }
743
744    /// Changefeed page: up to `limit` nodes written AFTER `after_seq` (EXCLUSIVE),
745    /// ascending by seq, wrapped in a `SinceBatch` cursor envelope. `after_seq` is
746    /// the cursor you last applied (a prior `tip()` seq or `to_seq`). `limit` bounds
747    /// the page — `0` means DEFAULT_SINCE_LIMIT, so the engine primitive can never
748    /// materialize an unbounded batch even when embedders call it directly (the
749    /// safety is here, not only in the HTTP layer). Drain by paging while
750    /// `has_more`, advancing your cursor to `to_seq`, then hand off to the live
751    /// `subscribe` edge. The append-only log IS the changefeed, so this is an
752    /// O(page) walk; unresolved seqs (outside seq_index coverage — see
753    /// `scan_status()`) are skipped rather than faked.
754    pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
755        let next = self.seq.load(Ordering::SeqCst);          // head + 1
756        let head_seq = next.saturating_sub(1);
757        let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
758        let mut nodes: Vec<Node> = Vec::new();
759        let mut to_seq = after_seq;
760        let mut hit_limit = false;
761        let mut s = after_seq.saturating_add(1);
762        while s < next {
763            if nodes.len() >= cap { hit_limit = true; break; }
764            if let Some(hash) = self.get_hash_by_seq(s) {
765                if let Some(node) = self.get_by_hash(&hash) {
766                    to_seq = node.seq;
767                    nodes.push(node);
768                }
769            }
770            s += 1;
771        }
772        SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
773    }
774
775    /// Replication readiness — see `ScanStatus`. `scan_complete` gates safe
776    /// historical catch-up: a consumer pulling an old cursor right after a cold
777    /// start must wait for it, or `since()` may hand back a partial page that looks
778    /// like "caught up". Computes the indexed range by scanning the in-memory seq
779    /// index (O(index)) — intended for periodic status polls, not the per-write
780    /// hot path.
781    pub fn scan_status(&self) -> ScanStatus {
782        let next = self.seq.load(Ordering::SeqCst);
783        let mut min = u64::MAX;
784        let mut max = 0u64;
785        let mut count = 0usize;
786        for kv in self.seq_index.iter() {
787            let s = *kv.key();
788            if s < min { min = s; }
789            if s > max { max = s; }
790            count += 1;
791        }
792        if count == 0 { min = 0; }
793        ScanStatus {
794            scan_complete:   self.startup_ready.load(Ordering::SeqCst),
795            tip_seq:         next.saturating_sub(1),
796            indexed_seq_min: min,
797            indexed_seq_max: max,
798            indexed_count:   count,
799        }
800    }
801
802    /// Add an explicit named relation edge between two documents.
803    /// Add an explicit named relation between two "coll:id" nodes.
804    /// Relations stored as __links__ documents — NQL-queryable, time-travelable,
805    /// consistent with the PyO3 binding which uses the same __links__ convention.
806    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
807        let (frm_coll, frm_id) = frm.split_once(':')
808            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
809        let (to_coll, to_id) = to.split_once(':')
810            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
811        if self.id_index.get(frm_coll, frm_id).is_none() {
812            anyhow::bail!("link: frm not found: {}", frm);
813        }
814        if self.id_index.get(to_coll, to_id).is_none() {
815            anyhow::bail!("link: to not found: {}", to);
816        }
817        let link_id = format!("{}|{}|{}", frm, rel, to);
818        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
819        self.put("__links__", &link_id, doc, vec![], None, None)?;
820        Ok(())
821    }
822
823    /// Remove a named relation (deletes the __links__ document).
824    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
825        let link_id = format!("{}|{}|{}", frm, rel, to);
826        self.delete("__links__", &link_id)
827    }
828
829    /// Get neighbor nodes via a named relation.
830    /// Queries __links__ — consistent with the PyO3 binding.
831    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
832        self.id_index
833            .list_ids("__links__")
834            .into_iter()
835            .filter_map(|id| self.get("__links__", &id))
836            .filter(|node| {
837                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
838                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
839            })
840            .filter_map(|node| {
841                let to = node.data.get("_to")?.as_str()?;
842                let (to_coll, to_id) = to.split_once(':')?;
843                self.get(to_coll, to_id)
844            })
845            .collect()
846    }
847}
848
849impl Drop for Db {
850    /// Flush buffered state when the database is closed so a write-then-drop
851    /// sequence is durable without an explicit `flush_all()`.
852    ///
853    /// `IdIndex::set` only stages updates in the in-memory WAL `write_buf`;
854    /// disk persistence happens in `flush_write_buf()`, normally driven by the
855    /// manifest ticker. A short-lived `Db` (a library user's `{ let db =
856    /// Db::open(p)?; db.put(..)?; }` block, or a test) has no ticker, so without
857    /// this its writes would be silently lost on reopen. Flushing on drop
858    /// mirrors the flush-on-close contract of other embedded stores (sled,
859    /// RocksDB).
860    ///
861    /// In production this is a harmless safety net, not the primary durability
862    /// path: the manifest ticker thread holds an `Arc<Db>` for the process
863    /// lifetime, so `Drop` only fires once every owning handle is gone. No-op
864    /// for in-memory databases (`flush_all` short-circuits on `:memory:`).
865    fn drop(&mut self) {
866        self.flush_all();
867    }
868}
869
870/// Background cold-scan worker. Takes Arc<Db> — safe, Db is on the heap.
871fn cold_scan_background_arc(db: Arc<Db>) {
872    use rayon::prelude::*;
873    use blake2::{Blake2b512, Digest};
874
875    let objects        = &db.objects;
876    let head           = &db.head;
877    let seq_atomic     = &db.seq;
878    let sorted_indexes = &db.sorted_indexes;
879    let seq_index      = &db.seq_index;
880    let ready_flag     = Arc::clone(&db.startup_ready);
881
882    let hashes: Vec<String> = objects.all_hashes().collect();
883    let total = hashes.len();
884
885    if total == 0 {
886        ready_flag.store(true, Ordering::SeqCst);
887        return;
888    }
889
890    println!("  [nedbd] background scan — {} objects...", total);
891    let t0 = std::time::Instant::now();
892    let step = (total / 10).max(1000);
893
894    // Populate the seq index AS objects are read here, not in a second pass
895    // afterward: this loop is the slow, disk-I/O-bound phase (verifying and
896    // parsing every object), and it can run for minutes on a multi-million
897    // object store. `scan_status().indexed_count` reads `seq_index`'s size, so
898    // inserting here — not after `.collect()` — is what makes that a real, live
899    // progress signal through the phase that actually takes the time, instead
900    // of reporting a flat 0 until this whole pass finishes. Safe: DashMap
901    // supports concurrent inserts, and every parallel worker here inserts a
902    // disjoint key (each object has its own seq).
903    let nodes: Vec<Node> = hashes.par_iter()
904        .enumerate()
905        .filter_map(|(i, h)| {
906            if i > 0 && i % step == 0 {
907                let pct     = i * 100 / total;
908                let elapsed = t0.elapsed().as_secs_f32();
909                let rate    = i as f32 / elapsed;
910                let eta     = (total - i) as f32 / rate;
911                eprint!("\r  [nedbd]   {:>3}%  {:>8} / {:>8}  ({:>8.0}/s  eta {:.0}s)   ",
912                    pct, i, total, rate, eta);
913            }
914            let node = objects.read(h).ok()?;
915            seq_index.insert(node.seq, node.hash.clone());
916            Some(node)
917        })
918        .collect();
919
920    eprintln!("\r  [nedbd]   100%  {:>8} / {:>8}  ({:.1}s)                        ",
921        total, total, t0.elapsed().as_secs_f32());
922
923    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
924    seq_atomic.store(max_seq + 1, Ordering::SeqCst);
925
926    // Per-collection tip: highest-seq node's hash, per coll. `nodes` is NOT
927    // seq-ordered here (it comes from an unordered object-hash scan), so this
928    // must track the max explicitly — unlike the live write path's "last call
929    // wins" (which relies on ascending call order that a scan doesn't have).
930    let mut coll_max: std::collections::HashMap<String, (u64, String)> = std::collections::HashMap::new();
931
932    for node in &nodes {
933        // seq_index was already populated above, during the read pass.
934        coll_max.entry(node.coll.clone())
935            .and_modify(|(s, h)| if node.seq > *s { *s = node.seq; *h = node.hash.clone(); })
936            .or_insert_with(|| (node.seq, node.hash.clone()));
937        if let Value::Object(ref obj) = node.data {
938            for (field, value) in obj {
939                if sorted_indexes.has(&node.coll, field) {
940                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
941                }
942            }
943        }
944    }
945
946    for (coll, (seq, hash)) in coll_max {
947        db.coll_tip_hash.insert(coll, (seq, hash));
948    }
949
950    // Compute Merkle head from sorted hashes
951    let mut sorted_hashes = hashes;
952    sorted_hashes.sort();
953    let mut h = Blake2b512::new();
954    h.update(max_seq.to_le_bytes());
955    for hash_str in &sorted_hashes {
956        h.update(hash_str.as_bytes());
957    }
958    let new_head = hex::encode(&h.finalize()[..32]);
959    *head.write() = new_head;
960
961    // Tip = the highest-seq object we indexed. Persist its hash so tip() resolves
962    // O(1) on the next warm boot, before any scan repopulates the seq index.
963    let tip_hash = db.seq_index.iter()
964        .max_by_key(|kv| *kv.key())
965        .map(|kv| kv.value().clone())
966        .unwrap_or_default();
967    *db.tip_hash.write() = (max_seq, tip_hash);
968
969    // Write MANIFEST through the one canonical writer. The hand-rolled write
970    // this replaces stored `seq: max_seq` (the last USED seq) — but the warm
971    // boot loads `m.seq` as the NEXT-TO-ASSIGN counter, so a restart right
972    // after a quiet cold scan handed the next write the tip's seq: a duplicate
973    // seq in the log (seq_index overwrite, wrong since() page). flush_manifest
974    // reads the live counter (already max_seq + 1) — correct by construction.
975    db.flush_manifest();
976
977    // Signal server: writes can now proceed
978    ready_flag.store(true, Ordering::SeqCst);
979    println!("  [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
980}
981
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
988
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A document written with `put` reads back through `get` with the same
    /// id and data.
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let block = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", block, vec![], None, None).unwrap();

        let stored = db.get("blocks", "618000").unwrap();
        assert_eq!(stored.id, "618000");
        assert_eq!(stored.data["height"], 618000);
    }

    /// With a sorted index in place, ORDER BY ASC LIMIT 3 returns the three
    /// smallest values regardless of insertion order.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");

        let insertion_order = [3u64, 1, 5, 2, 4];
        for height in insertion_order {
            let id = height.to_string();
            let doc = serde_json::json!({"height": height});
            db.put("blocks", &id, doc, vec![], None, None).unwrap();
        }

        let top3 = db.order_by_asc("blocks", "height", 3);
        let mut heights: Vec<u64> = Vec::new();
        for node in &top3 {
            if let Some(h) = node.data["height"].as_u64() {
                heights.push(h);
            }
        }
        assert_eq!(heights, vec![1, 2, 3]);
    }

    /// A three-step causal chain is fully walked from its last element.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let create = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        let chain = db.trace(&burn.hash, false, 10);
        assert_eq!(chain.len(), 3); // c → b → a
    }

    /// `get_as_of` returns the version visible at an older seq while `get`
    /// returns the newest one.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);

        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
1050
1051#[cfg(test)]
1052mod tests_v2 {
1053    use super::*;
1054    use tempfile::tempdir;
1055
1056    #[test]
1057    fn seq_index_populated_on_put() {
1058        let db = Db::in_memory();
1059        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
1060        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
1061        assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
1062        assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
1063        assert_eq!(db.get_hash_by_seq(9999), None);
1064    }
1065
1066    #[test]
1067    fn tip_and_since() {
1068        let db = Db::in_memory();
1069        // Empty db: no tip, empty changefeed.
1070        assert!(db.tip().is_none());
1071        assert!(db.since(0, 0).nodes.is_empty());
1072
1073        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
1074        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
1075
1076        // tip() = the most recent write (highest seq), returned as a full node.
1077        let t = db.tip().expect("tip after writes");
1078        assert_eq!(t.seq, b.seq);
1079        assert_eq!(t.id, "b");
1080        assert_eq!(t.hash, b.hash);
1081
1082        // since(after_seq, limit) — EXCLUSIVE cursor, bounded page + envelope.
1083        let after_a = db.since(a.seq, 0);
1084        assert_eq!(after_a.nodes.len(), 1);
1085        assert_eq!(after_a.nodes[0].id, "b");
1086        assert_eq!(after_a.from_seq, a.seq);
1087        assert_eq!(after_a.to_seq, b.seq);
1088        assert_eq!(after_a.head_seq, b.seq);
1089        assert!(!after_a.has_more);
1090
1091        // Nothing written after the tip.
1092        assert!(db.since(b.seq, 0).nodes.is_empty());
1093
1094        // `limit` bounds the page and sets has_more; resume from to_seq.
1095        let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();
1096        let page = db.since(a.seq, 1);             // (a..] capped at 1 -> [b], more pending
1097        assert_eq!(page.nodes.len(), 1);
1098        assert_eq!(page.nodes[0].id, "b");
1099        assert_eq!(page.to_seq, b.seq);
1100        assert!(page.has_more);
1101        let page2 = db.since(page.to_seq, 1);      // resume from b -> [c], done
1102        assert_eq!(page2.nodes.len(), 1);
1103        assert_eq!(page2.nodes[0].id, "c");
1104        assert_eq!(page2.to_seq, c.seq);
1105        assert!(!page2.has_more);
1106    }
1107
1108    #[test]
1109    fn tip_collection_per_chain() {
1110        // The ITC sync-client case: separate chains in separate collections; a
1111        // consumer resumes ONE without pulling global tip and filtering.
1112        let db = Db::in_memory();
1113        assert!(db.tip_collection("blocks").is_none());
1114
1115        db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
1116        db.put("tx",     "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1117        let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1118        let t1 = db.put("tx",     "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
1119
1120        // global tip = latest write overall (t1)
1121        assert_eq!(db.tip().unwrap().id, "t1");
1122        // collection-local tips = latest write in each collection
1123        let bt = db.tip_collection("blocks").expect("blocks tip");
1124        assert_eq!(bt.id, "b1");
1125        assert_eq!(bt.seq, b1.seq);
1126        assert_eq!(db.tip_collection("tx").unwrap().seq, t1.seq);
1127        assert!(db.tip_collection("absent").is_none());
1128    }
1129
1130    #[test]
1131    fn seq_index_survives_batch() {
1132        let db = Db::in_memory();
1133        let nodes = db.put_batch(vec![
1134            ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
1135            ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
1136        ]).unwrap();
1137        for node in &nodes {
1138            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
1139        }
1140    }
1141
1142    /// Regression: put_batch must remove the superseded version's sorted-index
1143    /// entries, exactly like put() does. Old behavior left the old hashes in
1144    /// the BTree — ORDER BY returned superseded rows alongside current ones
1145    /// (they resolve fine through the content-addressed store, which made the
1146    /// stale rows look legitimate).
1147    #[test]
1148    fn put_batch_removes_superseded_sorted_index_entries() {
1149        let db = Db::in_memory();
1150        db.create_sorted_index("blocks", "height");
1151        db.put("blocks", "x", serde_json::json!({"height": 1}), vec![], None, None).unwrap();
1152        db.put_batch(vec![
1153            ("blocks".into(), "x".into(), serde_json::json!({"height": 99}), vec![], None, None),
1154        ]).unwrap();
1155
1156        let asc = db.order_by_asc("blocks", "height", 10);
1157        assert_eq!(asc.len(), 1, "stale index entry for the superseded version must be gone");
1158        assert_eq!(asc[0].data["height"], 99);
1159        assert_eq!(asc[0].id, "x");
1160    }
1161
1162    /// Updates without any sorted index must keep full version-chain semantics
1163    /// (guards the new skip-old-object-read fast path in put()).
1164    #[test]
1165    fn update_without_indexes_preserves_chain() {
1166        let db = Db::in_memory();
1167        let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1168        let v2 = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
1169        assert_eq!(v2.prev.as_deref(), Some(v1.hash.as_str()), "prev chain must survive the fast path");
1170        assert_eq!(db.get("docs", "x").unwrap().data["v"], 2);
1171        assert_eq!(db.get_as_of("docs", "x", v1.seq).unwrap().data["v"], 1);
1172    }
1173
1174    #[test]
1175    fn link_and_neighbors() {
1176        let db = Db::in_memory();
1177        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
1178        db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
1179        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1180        db.put("trip",   "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1181
1182        db.link("driver:d1", "handles", "trip:t1").unwrap();
1183        db.link("driver:d1", "handles", "trip:t2").unwrap();
1184        db.link("driver:d2", "handles", "trip:t1").unwrap();
1185
1186        let d1_trips = db.neighbors("driver:d1", "handles");
1187        assert_eq!(d1_trips.len(), 2);
1188        let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
1189        assert!(ids.contains("t1") && ids.contains("t2"));
1190
1191        let d2_trips = db.neighbors("driver:d2", "handles");
1192        assert_eq!(d2_trips.len(), 1);
1193        assert_eq!(d2_trips[0].id, "t1");
1194    }
1195
1196    #[test]
1197    fn link_stored_in_links_collection() {
1198        // Links are stored as __links__ documents, not as graph edges.
1199        // The __links__ collection is NQL-queryable and consistent with the PyO3 binding.
1200        let db = Db::in_memory();
1201        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
1202        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1203        db.link("driver:d1", "handles", "trip:t1").unwrap();
1204        // Verify the __links__ document was created
1205        let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
1206        assert!(link_doc.is_some(), "__links__ doc should exist");
1207        let doc = link_doc.unwrap();
1208        assert_eq!(doc.data["_from"], "driver:d1");
1209        assert_eq!(doc.data["_rel"],  "handles");
1210        assert_eq!(doc.data["_to"],   "trip:t1");
1211        // neighbors() resolves to the target node
1212        let nb = db.neighbors("driver:d1", "handles");
1213        assert_eq!(nb.len(), 1);
1214        assert_eq!(nb[0].id, "t1");
1215    }
1216
1217    #[test]
1218    fn link_missing_node_errors() {
1219        let db = Db::in_memory();
1220        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
1221        assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
1222    }
1223
1224    #[test]
1225    fn link_durable_survives_reopen() {
1226        let dir = tempdir().unwrap();
1227        {
1228            let db = Db::open(dir.path(), None).unwrap();
1229            db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
1230            db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1231            db.link("driver:d1", "handles", "trip:t1").unwrap();
1232        }
1233        let db2 = Db::open(dir.path(), None).unwrap();
1234        db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
1235        let trips = db2.neighbors("driver:d1", "handles");
1236        assert_eq!(trips.len(), 1);
1237        assert_eq!(trips[0].id, "t1");
1238    }
1239
1240    #[test]
1241    fn tip_survives_warm_restart() {
1242        // v2.5.43: tip() returns the last written object AND survives a warm restart.
1243        // On reopen the seq_index is cold (warm start skips the scan), so tip() must
1244        // resolve the last write via the MANIFEST tip_hash fallback — no scan.
1245        let dir = tempdir().unwrap();
1246        {
1247            let db = Db::open(dir.path(), None).unwrap();
1248            db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1249            db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
1250            db.flush_all(); // persists MANIFEST incl. tip_hash
1251            assert_eq!(db.tip().expect("tip in-session").id, "b2");
1252        }
1253        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
1254        let db2 = Db::open(dir.path(), None).unwrap();
1255        assert!(db2.get_hash_by_seq(1).is_none(), "seq_index is cold on a warm boot");
1256        let tip = db2.tip().expect("tip() must survive a warm restart");
1257        assert_eq!(tip.id, "b2");
1258        assert_eq!(tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
1259    }
1260
1261    #[test]
1262    fn tip_collection_survives_warm_restart() {
1263        // Same contract as tip(), per collection: itc-node-rs resumes headers /
1264        // blocks / l2_receipts independently, so each must be its own durable
1265        // resume point — not just the global tip.
1266        let dir = tempdir().unwrap();
1267        {
1268            let db = Db::open(dir.path(), None).unwrap();
1269            db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1270            db.put("tx",     "t1", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1271            let b2 = db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
1272            db.flush_all(); // persists MANIFEST incl. coll_tips
1273            assert_eq!(db.tip_collection("blocks").unwrap().id, "b2");
1274            assert_eq!(db.tip_collection("blocks").unwrap().seq, b2.seq);
1275        }
1276        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
1277        let db2 = Db::open(dir.path(), None).unwrap();
1278        assert!(db2.get_hash_by_seq(0).is_none(), "seq_index is cold on a warm boot");
1279        let blocks_tip = db2.tip_collection("blocks").expect("tip_collection must survive a warm restart");
1280        assert_eq!(blocks_tip.id, "b2");
1281        assert_eq!(blocks_tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
1282        let tx_tip = db2.tip_collection("tx").expect("tx tip must also survive");
1283        assert_eq!(tx_tip.id, "t1");
1284        assert!(db2.tip_collection("absent").is_none());
1285    }
1286
1287    #[test]
1288    fn cold_scan_indexes_every_object_and_reports_completion() {
1289        // Regression guard for the cold-scan refactor: seq_index is now populated
1290        // DURING the parallel read pass (for live scan_status().indexed_count
1291        // progress — see cold_scan_background_arc), not in a second pass
1292        // afterward. This asserts the end state is unchanged: every written
1293        // object is indexed, tip()/tip_collection() are correct, and
1294        // scan_complete eventually reports true.
1295        let dir = tempdir().unwrap();
1296        let n = 25u64;
1297        {
1298            let db = Db::open(dir.path(), None).unwrap();
1299            for i in 0..n {
1300                db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None).unwrap();
1301            }
1302            db.flush_all();
1303        }
1304        // Force a COLD start regardless of the MANIFEST nedb-v2 itself would
1305        // have written: delete it so startup_rebuild() takes the cold path and
1306        // start_cold_scan() actually spawns the background scan this test needs
1307        // to exercise.
1308        std::fs::remove_file(dir.path().join("MANIFEST")).unwrap();
1309
1310        let db = Db::open(dir.path(), None).unwrap();
1311        assert!(!db.scan_status().scan_complete, "should be cold immediately after open");
1312        let db = std::sync::Arc::new(db);
1313        Db::start_cold_scan(std::sync::Arc::clone(&db));
1314
1315        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
1316        while !db.scan_status().scan_complete {
1317            assert!(std::time::Instant::now() < deadline, "cold scan did not complete in time");
1318            std::thread::sleep(std::time::Duration::from_millis(5));
1319        }
1320
1321        let status = db.scan_status();
1322        assert_eq!(status.indexed_count, n as usize, "every written object must be indexed");
1323        assert!(status.scan_complete);
1324
1325        let tip = db.tip().expect("tip resolves after cold scan");
1326        assert_eq!(tip.data.get("i").and_then(|v| v.as_u64()), Some(n - 1));
1327        let coll_tip = db.tip_collection("things").expect("tip_collection resolves after cold scan");
1328        assert_eq!(coll_tip.id, tip.id);
1329    }
1330
1331    /// Concurrent writers must settle the tip at the HIGHEST SEQ, and that tip
1332    /// must survive a warm restart. Before the seq-guarded tip fix, update_head
1333    /// was "last call wins": a slower thread carrying an OLDER seq could
1334    /// overwrite tip_hash after a newer write, and MANIFEST then persisted the
1335    /// stale tip for the next warm boot (flaky by nature — this pins the
1336    /// contract deterministically for the fixed code).
1337    #[test]
1338    fn concurrent_puts_tip_resolves_to_highest_seq_after_warm_restart() {
1339        let dir = tempdir().unwrap();
1340        let total: u64 = 100;
1341        {
1342            let db = std::sync::Arc::new(Db::open(dir.path(), None).unwrap());
1343            let mut handles = vec![];
1344            for t in 0..4u64 {
1345                let db2 = std::sync::Arc::clone(&db);
1346                handles.push(std::thread::spawn(move || {
1347                    for i in 0..25u64 {
1348                        db2.put("c", &format!("{}-{}", t, i),
1349                                serde_json::json!({"t": t, "i": i}),
1350                                vec![], None, None).unwrap();
1351                    }
1352                }));
1353            }
1354            for h in handles { h.join().unwrap(); }
1355            // In-session: tip must be the highest assigned seq.
1356            let expected = db.seq.load(std::sync::atomic::Ordering::SeqCst) - 1;
1357            assert_eq!(expected, total - 1, "exactly {} writes expected", total);
1358            assert_eq!(db.tip().expect("in-session tip").seq, expected);
1359            db.flush_all(); // persist MANIFEST incl. tip_hash
1360        }
1361        // Warm reopen: seq_index cold; tip() resolves via MANIFEST tip_hash.
1362        let db2 = Db::open(dir.path(), None).unwrap();
1363        let tip = db2.tip().expect("tip must survive warm restart after concurrent writes");
1364        assert_eq!(tip.seq, total - 1, "warm-boot tip must be the highest-seq write");
1365        // Per-collection tip: same contract.
1366        let ct = db2.tip_collection("c").expect("coll tip survives");
1367        assert_eq!(ct.seq, total - 1);
1368    }
1369
1370    /// Regression for the cold-scan MANIFEST seq off-by-one. The scan's old
1371    /// hand-rolled MANIFEST stored `seq: max_seq` (the last USED seq), but the
1372    /// warm boot loads `m.seq` as the NEXT-TO-ASSIGN counter — so a restart
1373    /// right after a quiet cold scan handed the next write the tip's seq:
1374    /// a DUPLICATE seq in the log (seq_index overwrite, wrong since() page).
1375    /// The scan now writes MANIFEST via flush_manifest(), which reads the live
1376    /// counter (max_seq + 1).
1377    #[test]
1378    fn manifest_after_cold_scan_does_not_reuse_tip_seq() {
1379        let dir = tempdir().unwrap();
1380        let old_tip_seq;
1381        {
1382            let db = Db::open(dir.path(), None).unwrap();
1383            for i in 0..5u64 {
1384                db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None).unwrap();
1385            }
1386            db.flush_all();
1387            old_tip_seq = db.tip().unwrap().seq;
1388        }
1389        // Force a cold start: remove MANIFEST so the background scan runs and
1390        // writes a fresh MANIFEST itself.
1391        std::fs::remove_file(dir.path().join("MANIFEST")).unwrap();
1392        {
1393            let db = std::sync::Arc::new(Db::open(dir.path(), None).unwrap());
1394            Db::start_cold_scan(std::sync::Arc::clone(&db));
1395            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
1396            while !db.scan_status().scan_complete {
1397                assert!(std::time::Instant::now() < deadline, "cold scan did not complete");
1398                std::thread::sleep(std::time::Duration::from_millis(5));
1399            }
1400            // No further writes — the scan's own MANIFEST is what the next boot sees.
1401        }
1402        // Warm reopen from the scan-written MANIFEST: the next write must get a
1403        // FRESH seq, never the tip's.
1404        let db3 = Db::open(dir.path(), None).unwrap();
1405        let tip_before = db3.tip().expect("tip survives scan-written MANIFEST");
1406        assert_eq!(tip_before.seq, old_tip_seq, "tip identity preserved across the scan");
1407        let new_node = db3.put("things", "next", serde_json::json!({"fresh": true}),
1408                               vec![], None, None).unwrap();
1409        assert!(new_node.seq > old_tip_seq,
1410                "new write reused seq {} (tip was {}) — duplicate seq in the log",
1411                new_node.seq, old_tip_seq);
1412    }
1413}