Skip to main content

nedb_engine/
db.rs

1//! Main DAG database — coordinates ObjectStore, IdIndex, SortedIndexes, GraphStore.
2
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
17/// MANIFEST: cached {seq, head} written atomically after every write.
18/// On startup, if MANIFEST exists and no sorted indexes need rebuilding,
19/// startup is O(1) — just read this one file instead of scanning all objects.
20#[derive(serde::Serialize, serde::Deserialize)]
21struct Manifest {
22    seq:  u64,
23    head: String,
24    /// Object hash of the highest-seq node at flush time. Lets `tip()` resolve the
25    /// last write O(1) on a warm boot — before any scan repopulates the in-memory
26    /// seq index. `#[serde(default)]` so pre-2.5.43 MANIFESTs (no field) still parse.
27    #[serde(default)]
28    tip_hash: String,
29    /// Per-collection tip: `coll -> object hash of the highest-seq node in that
30    /// collection`. Lets `tip_collection()` resolve O(1) on a warm boot, same
31    /// contract as `tip_hash` for the global head. `#[serde(default)]` so
32    /// pre-this-field MANIFESTs still parse (empty map — self-heals on next write
33    /// or cold scan).
34    #[serde(default)]
35    coll_tips: std::collections::HashMap<String, String>,
36}
37
/// Page size `since()` falls back to when called with `limit == 0`.
///
/// The bound lives in the engine primitive itself (not only in the HTTP
/// layer) so that no caller — including a long-offline consumer or a direct
/// embedder — can trigger an unbounded materialization of the log.
pub const DEFAULT_SINCE_LIMIT: usize = 10_000;
42
43/// One page of the changefeed returned by `since()`. The replication contract:
44/// apply `nodes` in ascending seq order, advance your cursor to `to_seq`, and keep
45/// paging while `has_more` is true; then attach to the live `subscribe` edge.
46/// `head_seq` tells the consumer how far the log currently extends (how far behind
47/// it is).
48#[derive(Debug, Clone, serde::Serialize)]
49pub struct SinceBatch {
50    /// Writes in (`from_seq`, `to_seq`], ascending by seq.
51    pub nodes:    Vec<Node>,
52    /// The exclusive cursor this page started from (echoes the request).
53    pub from_seq: u64,
54    /// Seq of the last node in this page — the consumer's next cursor.
55    pub to_seq:   u64,
56    /// Current head seq of the log (latest committed write).
57    pub head_seq: u64,
58    /// True when more writes remain past `to_seq` (the page hit `limit`).
59    pub has_more: bool,
60}
61
62/// Replication readiness snapshot. `scan_complete` is the correctness gate: until
63/// the cold-scan finishes rebuilding the seq index, an old cursor passed to
64/// `since()` can return a PARTIAL page and look (wrongly) like "caught up". A
65/// correctness-critical consumer MUST wait for `scan_complete == true` before
66/// trusting historical catch-up. `indexed_seq_min/max` report the currently
67/// resolvable seq range; `tip_seq` is the log head.
68#[derive(Debug, Clone, serde::Serialize)]
69pub struct ScanStatus {
70    /// Cold-scan finished — historical seqs fully resolvable; catch-up is safe.
71    pub scan_complete:   bool,
72    /// Head seq of the log (latest committed write).
73    pub tip_seq:         u64,
74    /// Lowest seq currently in the seq index (0 if empty).
75    pub indexed_seq_min: u64,
76    /// Highest seq currently in the seq index.
77    pub indexed_seq_max: u64,
78    /// Number of seqs currently resolvable via the index.
79    pub indexed_count:   usize,
80}
81
82pub struct Db {
83    pub objects:        ObjectStore,
84    pub id_index:       IdIndex,
85    pub sorted_indexes: SortedIndexes,
86    pub graph:          GraphStore,
87    pub root:           PathBuf,
88    /// Dirty flag — set true when head changes, cleared after manifest flush.
89    /// Decouples flush_manifest from the hot write path so concurrent writes
90    /// don't serialise on 2× file I/O per PUT.
91    manifest_dirty:     Arc<AtomicBool>,
92    pub seq:            AtomicU64,
93    /// Cached Merkle head — updated incrementally on every write (O(1)).
94    head:               RwLock<String>,
95    /// Object hash of the most recent write (highest seq). Mirrors `head` but holds
96    /// the tip's content hash, so `tip()` can resolve the last node O(1) on a warm
97    /// boot when the in-memory `seq_index` is still cold. Persisted in MANIFEST.
98    tip_hash:           RwLock<String>,
99    /// Per-collection tip: `coll -> object hash of the highest-seq node in that
100    /// collection`. Kept current on every write (`update_head`), restored from
101    /// MANIFEST on warm boot, rebuilt by the cold scan — so `tip_collection()` is
102    /// O(1) and durable across restarts in every startup regime, by construction.
103    coll_tip_hash:      Arc<DashMap<String, String>>,
104    /// True once startup is fully ready (MANIFEST loaded or cold scan complete).
105    /// Warm starts set this true before returning from open().
106    /// Cold starts set this true in the background thread when scan completes.
107    /// Writes are held with 503 until this is true; reads always proceed.
108    pub startup_ready:  Arc<AtomicBool>,
109    /// Seq → hash lookup for v1 compatibility. Populated by put(), put_batch(),
110    /// and the cold-scan background pass. Only covers nodes from the current
111    /// process session + cold-scan; older seqs not in this map cannot be resolved.
112    seq_index:          Arc<DashMap<u64, String>>,
113}
114
115impl Db {
116    /// Create a pure in-memory database — no disk I/O, no migration, instant startup.
117    /// Perfect for tests, hot-cache layers, and ephemeral sessions.
118    /// All data is lost when the Db is dropped.
119    pub fn in_memory() -> Self {
120        Self {
121            objects:        ObjectStore::in_memory(),
122            id_index:       IdIndex::in_memory(),
123            sorted_indexes: SortedIndexes::new(),
124            graph:          GraphStore::in_memory(),
125            root:           std::path::PathBuf::from(":memory:"),
126            seq:            AtomicU64::new(0),
127            head:           RwLock::new(String::new()),
128            tip_hash:       RwLock::new(String::new()),
129            coll_tip_hash:  Arc::new(DashMap::new()),
130            startup_ready:  Arc::new(AtomicBool::new(true)),  // always ready
131            manifest_dirty: Arc::new(AtomicBool::new(false)),
132            seq_index:      Arc::new(DashMap::new()),
133        }
134    }
135
136    /// Open (or create) a database. Runs v1→v2 migration automatically if log.aof is present.
137    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
138        std::fs::create_dir_all(db_root)?;
139
140        let objects        = ObjectStore::new(db_root, dek.clone())?;
141        let id_index       = IdIndex::new(db_root)?;
142        let sorted_indexes = SortedIndexes::new();
143        let graph          = GraphStore::new(db_root)?;
144
145        let mut db = Self {
146            objects,
147            id_index,
148            sorted_indexes,
149            graph,
150            root: db_root.to_path_buf(),
151            seq:  AtomicU64::new(0),
152            head: RwLock::new(String::new()),
153            tip_hash: RwLock::new(String::new()),
154            coll_tip_hash: Arc::new(DashMap::new()),
155            startup_ready:  Arc::new(AtomicBool::new(false)),
156            manifest_dirty: Arc::new(AtomicBool::new(false)),
157            seq_index:      Arc::new(DashMap::new()),
158        };
159
160        // Auto-migrate v1 → v2 if needed (pass DEK so encrypted AOFs convert correctly)
161        migrate::migrate_if_needed(
162            db_root,
163            &db.objects,
164            &db.id_index,
165            &db.sorted_indexes,
166            &db.graph,
167            dek.as_ref(),
168        )?;
169
170        // Fast startup: load seq+head from MANIFEST if no sorted indexes need rebuilding.
171        // Falls back to full object scan only when necessary (first open, or post-migration).
172        db.startup_rebuild()?;
173
174        Ok(db)
175    }
176
177    /// Smart startup:
178    /// - Warm (MANIFEST exists): O(1) load → startup_ready = true immediately.
179    /// - Cold (no MANIFEST): start server immediately, run scan in background thread.
180    ///   Writes return 503 until scan completes; reads always proceed.
181    fn startup_rebuild(&mut self) -> Result<()> {
182        let manifest_path = self.root.join("MANIFEST");
183        let needs_index_rebuild = !self.sorted_indexes.is_empty();
184
185        // Warm path: MANIFEST + no sorted indexes to rebuild → instant start
186        if manifest_path.exists() && !needs_index_rebuild {
187            if let Some(m) = fs::read_to_string(&manifest_path)
188                .ok()
189                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
190            {
191                // Self-heal: MANIFEST with an empty or short head is corrupt/stale.
192                // Fall through to cold scan so the head is rebuilt correctly from objects.
193                if m.head.len() < 8 {
194                    eprintln!("  [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
195                } else if m.tip_hash.is_empty() {
196                    // Pre-2.5.43 MANIFEST (no persisted tip). Cold-scan once to rebuild
197                    // the seq index and rewrite MANIFEST with tip_hash — warm + tip()-
198                    // durable on every boot thereafter.
199                    eprintln!("  [nedbd] MANIFEST predates durable tip() — cold scan once to upgrade");
200                } else {
201                    self.seq.store(m.seq, Ordering::SeqCst); // m.seq is already the next-to-assign counter
202                    *self.head.write() = m.head.clone();
203                    *self.tip_hash.write() = m.tip_hash.clone();
204                    for (coll, hash) in &m.coll_tips {
205                        self.coll_tip_hash.insert(coll.clone(), hash.clone());
206                    }
207                    self.startup_ready.store(true, Ordering::SeqCst);
208                    println!("  [nedbd] warm start — seq={} head={}... tip={}...",
209                        m.seq, &m.head[..8], &m.tip_hash[..8.min(m.tip_hash.len())]);
210                    return Ok(());
211                }
212            } else {
213                eprintln!("  [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
214            }
215        }
216
217        // Cold path: mark as not ready, return immediately.
218        // The actual background scan is started by Db::start_cold_scan(arc)
219        // which is called from Manager::open_all() AFTER Arc::new(db) — when
220        // the Db is heap-allocated and its field addresses are permanently stable.
221        // Capturing field addresses here would cause UB: Db moves on return.
222        println!("  [nedbd] cold start — background scan will start after heap allocation");
223        Ok(())
224    }
225
226    /// Call this from Manager::open_all() after Arc::new(db).
227    /// Spawns the cold scan background thread with stable heap addresses.
228    /// No-op if startup is already complete (warm start).
229    pub fn start_cold_scan(self_arc: Arc<Self>) {
230        if self_arc.startup_ready.load(Ordering::SeqCst) {
231            return; // warm start — already ready
232        }
233        // Fast path: if the database is empty (new or just created), skip the
234        // background thread entirely. No objects to scan = instant startup.
235        if self_arc.objects.all_hashes().next().is_none() {
236            self_arc.startup_ready.store(true, Ordering::SeqCst);
237            return;
238        }
239        println!("  [nedbd] cold start — background scan starting, server accepting reads now");
240        std::thread::spawn(move || {
241            let db = self_arc;
242            cold_scan_background_arc(db);
243        });
244    }
245
246    /// Write a document. Returns the new node with its content hash set.
247    pub fn put(
248        &self,
249        coll: &str,
250        id: &str,
251        data: Value,
252        caused_by: Vec<String>,
253        valid_from: Option<String>,
254        valid_to:   Option<String>,
255    ) -> Result<Node> {
256        let seq  = self.seq.fetch_add(1, Ordering::SeqCst);
257        let prev = self.id_index.get(coll, id);
258
259        // Remove old node from sorted indexes (it's being superseded)
260        if let Some(old_hash) = &prev {
261            if let Ok(old_node) = self.objects.read(old_hash) {
262                if let Value::Object(ref obj) = old_node.data {
263                    for (field, value) in obj {
264                        self.sorted_indexes.remove(coll, field, value, old_hash);
265                    }
266                }
267            }
268        }
269
270        let mut node = Node {
271            id:         id.to_string(),
272            coll:       coll.to_string(),
273            seq,
274            data:       data.clone(),
275            prev,
276            caused_by:  caused_by.clone(),
277            ts:         now(),
278            valid_from,
279            valid_to,
280            hash:       String::new(),
281        };
282
283        // Write to object store (atomic, content-addressed)
284        let hash = self.objects.write(&mut node)?;
285        self.seq_index.insert(seq, hash.clone());
286
287        // Update id index (atomic file)
288        self.id_index.set(coll, id, &hash)?;
289
290        // Update sorted indexes
291        if let Value::Object(ref obj) = data {
292            for (field, value) in obj {
293                if self.sorted_indexes.has(coll, field) {
294                    self.sorted_indexes.insert(coll, field, value, &hash);
295                }
296            }
297        }
298
299        // Write causal graph edges
300        for cause in &caused_by {
301            self.graph.add_edge(&hash, "caused_by", cause)?;
302            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
303        }
304
305        // Update running Merkle head: O(1) chain, no full recompute.
306        // new_head = BLAKE2b(prev_head || seq_bytes || new_object_hash)
307        self.update_head(coll, seq, &hash);
308
309        Ok(node)
310    }
311
312    /// Batch put: write N documents in parallel, preserving monotonic seq ordering.
313    /// Pre-allocates N seq numbers atomically, then parallelises object writes and
314    /// id-index updates via Rayon. Each op is independent — safe to parallelise.
315    /// Returns nodes in input order with assigned seq numbers.
316    pub fn put_batch(
317        &self,
318        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
319        // (coll, id, data, caused_by, valid_from, valid_to)
320    ) -> Result<Vec<Node>> {
321        use rayon::prelude::*;
322
323        if ops.is_empty() { return Ok(vec![]); }
324        let n = ops.len() as u64;
325
326        // Pre-allocate N consecutive seq numbers — preserves ordering under concurrency
327        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
328        let ts = now();
329
330        // Build nodes with assigned seq numbers
331        let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
332            let prev = self.id_index.get(&coll, &id);
333            Node {
334                id, coll, seq: base_seq + i as u64,
335                data, prev, caused_by,
336                ts, valid_from, valid_to,
337                hash: String::new(),
338            }
339        }).collect();
340
341        // Parallel object writes (content-addressed, idempotent, safe to parallelise)
342        let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
343            .filter_map(|node| self.objects.write(node).err())
344            .collect();
345        if let Some(e) = write_errors.into_iter().next() { return Err(e); }
346
347        // Parallel id-index updates
348        let index_errors: Vec<anyhow::Error> = nodes.par_iter()
349            .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
350            .collect();
351        if let Some(e) = index_errors.into_iter().next() { return Err(e); }
352
353        // Sorted indexes + causal graph (sequential — small overhead, usually no indexes)
354        for node in &nodes {
355            self.seq_index.insert(node.seq, node.hash.clone());
356            if let Value::Object(ref obj) = node.data {
357                for (field, value) in obj {
358                    if self.sorted_indexes.has(&node.coll, field) {
359                        self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
360                    }
361                }
362            }
363            for cause in &node.caused_by {
364                self.graph.add_edge(&node.hash, "caused_by", cause).ok();
365                self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
366            }
367        }
368
369        // Single Merkle head update for the whole batch (chain all hashes)
370        for node in &nodes {
371            self.update_head(&node.coll, node.seq, &node.hash);
372        }
373
374        Ok(nodes)
375    }
376
377    /// Update the running Merkle head with a new write. O(1), lock-free on the flush path.
378    /// Sets dirty flag — the background ticker calls flush_manifest periodically.
379    /// This removes 2× file I/O ops from the hot write path, unblocking concurrent writes.
380    fn update_head(&self, coll: &str, seq: u64, new_hash: &str) {
381        use blake2::{Blake2b512, Digest};
382        let prev = self.head.read().clone();
383        let mut h = Blake2b512::new();
384        h.update(prev.as_bytes());
385        h.update(seq.to_le_bytes());
386        h.update(new_hash.as_bytes());
387        *self.head.write() = hex::encode(&h.finalize()[..32]);
388        // Track the tip's object hash. update_head arrives in ascending seq order
389        // (put in seq order; put_batch loops the batch in order), so the last call
390        // wins → the highest-seq node's hash. Persisted in MANIFEST for warm-boot tip().
391        *self.tip_hash.write() = new_hash.to_string();
392        // Same contract, per-collection: last call for this coll wins → that
393        // collection's highest-seq hash. Persisted in MANIFEST for warm-boot
394        // tip_collection().
395        self.coll_tip_hash.insert(coll.to_string(), new_hash.to_string());
396        // Mark dirty — background ticker will flush to MANIFEST (no I/O on write path)
397        self.manifest_dirty.store(true, Ordering::Release);
398    }
399
400    /// Flush both the id-index WAL and MANIFEST. Used on graceful shutdown.
401    pub fn flush_all(&self) {
402        self.id_index.flush_write_buf();
403        // v3: fsync the active segment (no-op for loose/in-memory stores).
404        // One durability point per batch instead of one fsync per object.
405        if let Err(e) = self.objects.sync() {
406            eprintln!("nedb: segment sync failed: {}", e);
407        }
408        self.flush_manifest();
409    }
410
411    /// Compact the v3 packed object store: keep the CURRENT version of every
412    /// document (from the id-index) and reclaim everything else. No-op unless
413    /// running with the v3 segment substrate (`--dag-v3` / NEDB_DAG_V3).
414    ///
415    /// This is a PRUNING operation: superseded/historical object versions are
416    /// dropped, so AS OF / TRACE over pruned versions is discarded — that is
417    /// what reclaims the space. Flushes first so all data is durable on disk
418    /// before the old segments are deleted.
419    pub fn compact(&self) -> Result<crate::segment::CompactStats> {
420        self.flush_all();
421        let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
422        for coll in self.id_index.collections() {
423            for id in self.id_index.list_ids(&coll) {
424                if let Some(h) = self.id_index.get(&coll, &id) {
425                    live.insert(h);
426                }
427            }
428        }
429        self.objects.compact(&live)
430    }
431
432    /// Flush MANIFEST to disk if dirty. No-op for in-memory databases.
433    pub fn flush_manifest_if_dirty(&self) {
434        if self.root == std::path::PathBuf::from(":memory:") { return; }
435        if self.manifest_dirty.compare_exchange(
436            true, false, Ordering::AcqRel, Ordering::Relaxed
437        ).is_ok() {
438            self.flush_manifest();
439        }
440    }
441
442    /// Atomically persist current seq+head to MANIFEST. No-op for in-memory databases.
443    pub fn flush_manifest(&self) {
444        if self.root == std::path::PathBuf::from(":memory:") { return; }
445        let seq  = self.seq.load(Ordering::SeqCst);
446        let head = self.head.read().clone();
447        let tip_hash = self.tip_hash.read().clone();
448        let coll_tips: std::collections::HashMap<String, String> = self.coll_tip_hash
449            .iter()
450            .map(|kv| (kv.key().clone(), kv.value().clone()))
451            .collect();
452        let m = Manifest { seq, head, tip_hash, coll_tips };
453        if let Ok(json) = serde_json::to_string(&m) {
454            let path = self.root.join("MANIFEST");
455            let tmp  = self.root.join("MANIFEST.tmp");
456            let _ = fs::write(&tmp, &json);
457            let _ = fs::rename(&tmp, &path);
458        }
459    }
460
461    /// Start a background thread that flushes both the id-index WAL and MANIFEST
462    /// every `interval_ms` milliseconds.
463    /// Call this after Arc::new(db) — the Arc keeps Db alive for the thread's lifetime.
464    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
465        let db = self_arc;
466        std::thread::spawn(move || {
467            loop {
468                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
469                // Flush id-index WAL to disk (parallel Rayon writes)
470                db.id_index.flush_write_buf();
471                // Then flush MANIFEST
472                db.flush_manifest_if_dirty();
473            }
474        });
475    }
476
477    /// Return the current Merkle head string. O(1) — read from cache.
478    pub fn head(&self) -> String {
479        self.head.read().clone()
480    }
481
482    /// Delete a document — writes a tombstone node and removes the id from the index.
483    /// The object history is preserved in the DAG; only the live id pointer is cleared.
484    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
485        let prev = match self.id_index.get(coll, id) {
486            None => return Ok(false),   // already gone
487            Some(h) => h,
488        };
489        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
490        let mut tombstone = Node {
491            id:         format!("_del_{}", id),
492            coll:       coll.to_string(),
493            seq,
494            data:       serde_json::json!({"_deleted": id, "_prev": prev}),
495            prev:       Some(prev),
496            caused_by:  vec![],
497            ts:         now(),
498            valid_from: None,
499            valid_to:   None,
500            hash:       String::new(),
501        };
502        let hash = self.objects.write(&mut tombstone)?;
503        self.update_head(coll, seq, &hash);
504        // Remove the live id pointer — doc is now invisible to queries and list()
505        self.id_index.remove(coll, id)?;
506        Ok(true)
507    }
508
509    /// Get the current version of a document by id.
510    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
511        let hash = self.id_index.get(coll, id)?;
512        self.objects.read(&hash).ok()
513    }
514
515    /// Get a specific version of a document by object hash.
516    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
517        self.objects.read(hash).ok()
518    }
519
520    /// Get a document AS OF a specific sequence number.
521    /// Walks the version chain (prev links) backward until seq <= target.
522    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
523        let hash = self.id_index.get(coll, id)?;
524        let mut current = self.objects.read(&hash).ok()?;
525        loop {
526            if current.seq <= target_seq {
527                return Some(current);
528            }
529            let prev_hash = current.prev.as_deref()?;
530            current = self.objects.read(prev_hash).ok()?;
531        }
532    }
533
534    /// List all documents in a collection, returning current versions.
535    pub fn list(&self, coll: &str) -> Vec<Node> {
536        self.id_index
537            .list_ids(coll)
538            .into_iter()
539            .filter_map(|id| self.get(coll, &id))
540            .collect()
541    }
542
543    /// ORDER BY field ASC LIMIT n — uses sorted index if available, else falls back to full scan.
544    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
545        if self.sorted_indexes.has(coll, field) {
546            self.sorted_indexes
547                .top_k_asc(coll, field, limit)
548                .into_iter()
549                .filter_map(|h| self.objects.read(&h).ok())
550                .collect()
551        } else {
552            let mut docs = self.list(coll);
553            docs.sort_by(|a, b| {
554                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
555                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
556                av.cmp(&bv)
557            });
558            docs.truncate(limit);
559            docs
560        }
561    }
562
563    /// ORDER BY field DESC LIMIT n
564    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
565        if self.sorted_indexes.has(coll, field) {
566            self.sorted_indexes
567                .top_k_desc(coll, field, limit)
568                .into_iter()
569                .filter_map(|h| self.objects.read(&h).ok())
570                .collect()
571        } else {
572            let mut docs = self.list(coll);
573            docs.sort_by(|a, b| {
574                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
575                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
576                bv.cmp(&av)
577            });
578            docs.truncate(limit);
579            docs
580        }
581    }
582
583    /// TRACE caused_by — walk causal graph from a node.
584    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
585        self.graph
586            .trace(hash, "caused_by", reverse, limit)
587            .into_iter()
588            .filter_map(|h| self.objects.read(&h).ok())
589            .collect()
590    }
591
592    /// Verify tamper-evidence of all objects.
593    pub fn verify(&self) -> (usize, Vec<String>) {
594        self.objects.verify_all()
595    }
596
597    /// Create a sorted index for a (coll, field) pair.
598    pub fn create_sorted_index(&self, coll: &str, field: &str) {
599        self.sorted_indexes.ensure(coll, field);
600        // Backfill from existing objects
601        for id in self.id_index.list_ids(coll) {
602            if let Some(node) = self.get(coll, &id) {
603                if let Value::Object(ref obj) = node.data {
604                    if let Some(value) = obj.get(field) {
605                        self.sorted_indexes.insert(coll, field, value, &node.hash);
606                    }
607                }
608            }
609        }
610    }
611
612    /// Resolve a sequence number to its content hash (v1 compatibility).
613    /// Only covers nodes written in the current process session + cold-scan nodes.
614    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
615        self.seq_index.get(&seq).map(|r| r.clone())
616    }
617
618    /// The tip — the most recently written node (highest seq), or `None` if the
619    /// database is empty. O(1): `self.seq` is the next-to-assign counter, so the
620    /// latest write sits at `seq - 1`; we resolve it through the same
621    /// seq_index → object-store path a normal read uses, so the returned Node is
622    /// byte-identical to one fetched by id or hash (it carries its own seq, hash,
623    /// causal links, and valid-time). This is the cheap "give me the latest write"
624    /// primitive — the head of the log, not an aggregate.
625    pub fn tip(&self) -> Option<Node> {
626        let next = self.seq.load(Ordering::SeqCst);
627        if next == 0 {
628            return None; // nothing written yet
629        }
630        // Fast path: resolve the head seq through the in-memory seq index
631        // (populated by this session's writes or by the cold scan).
632        if let Some(hash) = self.get_hash_by_seq(next - 1) {
633            return self.get_by_hash(&hash);
634        }
635        // Warm-boot fallback: the seq index is still cold (warm start skips the
636        // scan), but the tip's object hash was persisted in MANIFEST and restored
637        // on open. O(1), no scan — this is what makes tip() survive a restart.
638        let th = self.tip_hash.read().clone();
639        if !th.is_empty() {
640            return self.get_by_hash(&th);
641        }
642        None
643    }
644
645    /// The collection-local tip — the most recent write into `coll` (highest seq in
646    /// that collection), or `None` if the collection has no writes. O(1): resolves
647    /// through `coll_tip_hash`, a dedicated per-collection map kept current on every
648    /// write (`update_head`), restored from MANIFEST on warm boot, and rebuilt by the
649    /// cold scan — durable across restarts by construction, same contract as `tip()`
650    /// for the global head. Conceptually a different index than the global `tip()`
651    /// (global head vs collection head), kept as a separate method so each is
652    /// explicit — parity with the Python reference's `tip(coll)`. Lets a consumer
653    /// resume one chain (e.g. blocks / tx / utxo) without pulling global tip and
654    /// filtering.
655    pub fn tip_collection(&self, coll: &str) -> Option<Node> {
656        let hash = self.coll_tip_hash.get(coll)?.clone();
657        self.get_by_hash(&hash)
658    }
659
660    /// Changefeed page: up to `limit` nodes written AFTER `after_seq` (EXCLUSIVE),
661    /// ascending by seq, wrapped in a `SinceBatch` cursor envelope. `after_seq` is
662    /// the cursor you last applied (a prior `tip()` seq or `to_seq`). `limit` bounds
663    /// the page — `0` means DEFAULT_SINCE_LIMIT, so the engine primitive can never
664    /// materialize an unbounded batch even when embedders call it directly (the
665    /// safety is here, not only in the HTTP layer). Drain by paging while
666    /// `has_more`, advancing your cursor to `to_seq`, then hand off to the live
667    /// `subscribe` edge. The append-only log IS the changefeed, so this is an
668    /// O(page) walk; unresolved seqs (outside seq_index coverage — see
669    /// `scan_status()`) are skipped rather than faked.
670    pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
671        let next = self.seq.load(Ordering::SeqCst);          // head + 1
672        let head_seq = next.saturating_sub(1);
673        let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
674        let mut nodes: Vec<Node> = Vec::new();
675        let mut to_seq = after_seq;
676        let mut hit_limit = false;
677        let mut s = after_seq.saturating_add(1);
678        while s < next {
679            if nodes.len() >= cap { hit_limit = true; break; }
680            if let Some(hash) = self.get_hash_by_seq(s) {
681                if let Some(node) = self.get_by_hash(&hash) {
682                    to_seq = node.seq;
683                    nodes.push(node);
684                }
685            }
686            s += 1;
687        }
688        SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
689    }
690
691    /// Replication readiness — see `ScanStatus`. `scan_complete` gates safe
692    /// historical catch-up: a consumer pulling an old cursor right after a cold
693    /// start must wait for it, or `since()` may hand back a partial page that looks
694    /// like "caught up". Computes the indexed range by scanning the in-memory seq
695    /// index (O(index)) — intended for periodic status polls, not the per-write
696    /// hot path.
697    pub fn scan_status(&self) -> ScanStatus {
698        let next = self.seq.load(Ordering::SeqCst);
699        let mut min = u64::MAX;
700        let mut max = 0u64;
701        let mut count = 0usize;
702        for kv in self.seq_index.iter() {
703            let s = *kv.key();
704            if s < min { min = s; }
705            if s > max { max = s; }
706            count += 1;
707        }
708        if count == 0 { min = 0; }
709        ScanStatus {
710            scan_complete:   self.startup_ready.load(Ordering::SeqCst),
711            tip_seq:         next.saturating_sub(1),
712            indexed_seq_min: min,
713            indexed_seq_max: max,
714            indexed_count:   count,
715        }
716    }
717
718    /// Add an explicit named relation edge between two documents.
719    /// Add an explicit named relation between two "coll:id" nodes.
720    /// Relations stored as __links__ documents — NQL-queryable, time-travelable,
721    /// consistent with the PyO3 binding which uses the same __links__ convention.
722    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
723        let (frm_coll, frm_id) = frm.split_once(':')
724            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
725        let (to_coll, to_id) = to.split_once(':')
726            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
727        if self.id_index.get(frm_coll, frm_id).is_none() {
728            anyhow::bail!("link: frm not found: {}", frm);
729        }
730        if self.id_index.get(to_coll, to_id).is_none() {
731            anyhow::bail!("link: to not found: {}", to);
732        }
733        let link_id = format!("{}|{}|{}", frm, rel, to);
734        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
735        self.put("__links__", &link_id, doc, vec![], None, None)?;
736        Ok(())
737    }
738
739    /// Remove a named relation (deletes the __links__ document).
740    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
741        let link_id = format!("{}|{}|{}", frm, rel, to);
742        self.delete("__links__", &link_id)
743    }
744
745    /// Get neighbor nodes via a named relation.
746    /// Queries __links__ — consistent with the PyO3 binding.
747    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
748        self.id_index
749            .list_ids("__links__")
750            .into_iter()
751            .filter_map(|id| self.get("__links__", &id))
752            .filter(|node| {
753                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
754                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
755            })
756            .filter_map(|node| {
757                let to = node.data.get("_to")?.as_str()?;
758                let (to_coll, to_id) = to.split_once(':')?;
759                self.get(to_coll, to_id)
760            })
761            .collect()
762    }
763}
764
765impl Drop for Db {
766    /// Flush buffered state when the database is closed so a write-then-drop
767    /// sequence is durable without an explicit `flush_all()`.
768    ///
769    /// `IdIndex::set` only stages updates in the in-memory WAL `write_buf`;
770    /// disk persistence happens in `flush_write_buf()`, normally driven by the
771    /// manifest ticker. A short-lived `Db` (a library user's `{ let db =
772    /// Db::open(p)?; db.put(..)?; }` block, or a test) has no ticker, so without
773    /// this its writes would be silently lost on reopen. Flushing on drop
774    /// mirrors the flush-on-close contract of other embedded stores (sled,
775    /// RocksDB).
776    ///
777    /// In production this is a harmless safety net, not the primary durability
778    /// path: the manifest ticker thread holds an `Arc<Db>` for the process
779    /// lifetime, so `Drop` only fires once every owning handle is gone. No-op
780    /// for in-memory databases (`flush_all` short-circuits on `:memory:`).
781    fn drop(&mut self) {
782        self.flush_all();
783    }
784}
785
786/// Background cold-scan worker. Takes Arc<Db> — safe, Db is on the heap.
787fn cold_scan_background_arc(db: Arc<Db>) {
788    use rayon::prelude::*;
789    use blake2::{Blake2b512, Digest};
790
791    let objects        = &db.objects;
792    let head           = &db.head;
793    let seq_atomic     = &db.seq;
794    let sorted_indexes = &db.sorted_indexes;
795    let seq_index      = &db.seq_index;
796    let root           = db.root.clone();
797    let ready_flag     = Arc::clone(&db.startup_ready);
798
799    let hashes: Vec<String> = objects.all_hashes().collect();
800    let total = hashes.len();
801
802    if total == 0 {
803        ready_flag.store(true, Ordering::SeqCst);
804        return;
805    }
806
807    println!("  [nedbd] background scan — {} objects...", total);
808    let t0 = std::time::Instant::now();
809    let step = (total / 10).max(1000);
810
811    // Populate the seq index AS objects are read here, not in a second pass
812    // afterward: this loop is the slow, disk-I/O-bound phase (verifying and
813    // parsing every object), and it can run for minutes on a multi-million
814    // object store. `scan_status().indexed_count` reads `seq_index`'s size, so
815    // inserting here — not after `.collect()` — is what makes that a real, live
816    // progress signal through the phase that actually takes the time, instead
817    // of reporting a flat 0 until this whole pass finishes. Safe: DashMap
818    // supports concurrent inserts, and every parallel worker here inserts a
819    // disjoint key (each object has its own seq).
820    let nodes: Vec<Node> = hashes.par_iter()
821        .enumerate()
822        .filter_map(|(i, h)| {
823            if i > 0 && i % step == 0 {
824                let pct     = i * 100 / total;
825                let elapsed = t0.elapsed().as_secs_f32();
826                let rate    = i as f32 / elapsed;
827                let eta     = (total - i) as f32 / rate;
828                eprint!("\r  [nedbd]   {:>3}%  {:>8} / {:>8}  ({:>8.0}/s  eta {:.0}s)   ",
829                    pct, i, total, rate, eta);
830            }
831            let node = objects.read(h).ok()?;
832            seq_index.insert(node.seq, node.hash.clone());
833            Some(node)
834        })
835        .collect();
836
837    eprintln!("\r  [nedbd]   100%  {:>8} / {:>8}  ({:.1}s)                        ",
838        total, total, t0.elapsed().as_secs_f32());
839
840    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
841    seq_atomic.store(max_seq + 1, Ordering::SeqCst);
842
843    // Per-collection tip: highest-seq node's hash, per coll. `nodes` is NOT
844    // seq-ordered here (it comes from an unordered object-hash scan), so this
845    // must track the max explicitly — unlike the live write path's "last call
846    // wins" (which relies on ascending call order that a scan doesn't have).
847    let mut coll_max: std::collections::HashMap<String, (u64, String)> = std::collections::HashMap::new();
848
849    for node in &nodes {
850        // seq_index was already populated above, during the read pass.
851        coll_max.entry(node.coll.clone())
852            .and_modify(|(s, h)| if node.seq > *s { *s = node.seq; *h = node.hash.clone(); })
853            .or_insert_with(|| (node.seq, node.hash.clone()));
854        if let Value::Object(ref obj) = node.data {
855            for (field, value) in obj {
856                if sorted_indexes.has(&node.coll, field) {
857                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
858                }
859            }
860        }
861    }
862
863    let coll_tips: std::collections::HashMap<String, String> = coll_max.into_iter()
864        .map(|(coll, (_seq, hash))| {
865            db.coll_tip_hash.insert(coll.clone(), hash.clone());
866            (coll, hash)
867        })
868        .collect();
869
870    // Compute Merkle head from sorted hashes
871    let mut sorted_hashes = hashes;
872    sorted_hashes.sort();
873    let mut h = Blake2b512::new();
874    h.update(max_seq.to_le_bytes());
875    for hash_str in &sorted_hashes {
876        h.update(hash_str.as_bytes());
877    }
878    let new_head = hex::encode(&h.finalize()[..32]);
879    *head.write() = new_head.clone();
880
881    // Tip = the highest-seq object we indexed. Persist its hash so tip() resolves
882    // O(1) on the next warm boot, before any scan repopulates the seq index.
883    let tip_hash = db.seq_index.iter()
884        .max_by_key(|kv| *kv.key())
885        .map(|kv| kv.value().clone())
886        .unwrap_or_default();
887    *db.tip_hash.write() = tip_hash.clone();
888
889    // Write MANIFEST atomically
890    let m     = Manifest { seq: max_seq, head: new_head, tip_hash, coll_tips };
891    let json  = serde_json::to_string(&m).unwrap_or_default();
892    let path  = root.join("MANIFEST");
893    let tmp   = root.join("MANIFEST.tmp");
894    let _ = fs::write(&tmp, &json);
895    let _ = fs::rename(&tmp, &path);
896
897    // Signal server: writes can now proceed
898    ready_flag.store(true, Ordering::SeqCst);
899    println!("  [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
900}
901
/// Current wall-clock time as fractional seconds since the Unix epoch.
/// Yields `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
908
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A document stored with `put` is readable back with `get`.
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let block = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", block, vec![], None, None).unwrap();

        let fetched = db.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    /// With a sorted index on `height`, `order_by_asc` returns the lowest
    /// values first regardless of insertion order, honouring the limit.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");

        // Deliberately inserted out of order.
        for height in [3u64, 1, 5, 2, 4] {
            let id = height.to_string();
            let doc = serde_json::json!({"height": height});
            db.put("blocks", &id, doc, vec![], None, None).unwrap();
        }

        let lowest_three = db.order_by_asc("blocks", "height", 3);
        let mut heights: Vec<u64> = Vec::new();
        for node in lowest_three.iter() {
            if let Some(height) = node.data["height"].as_u64() {
                heights.push(height);
            }
        }
        assert_eq!(heights, vec![1, 2, 3]);
    }

    /// `trace` follows parent links back from a node through its ancestors.
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let create = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        // burn -> transfer -> create
        let lineage = db.trace(&burn.hash, false, 10);
        assert_eq!(lineage.len(), 3);
    }

    /// `get_as_of` returns the version that was current at a given seq, while
    /// `get` returns the latest version.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();

        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);

        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
970
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    /// Every `put` registers its seq -> hash mapping; an unknown seq resolves
    /// to `None`.
    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let first = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let second = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        for written in [&first, &second] {
            assert_eq!(db.get_hash_by_seq(written.seq), Some(written.hash.clone()));
        }
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    /// `tip()` and `since()` on an in-memory database: empty state, exclusive
    /// cursor, envelope fields, and paging with `limit`.
    #[test]
    fn tip_and_since() {
        let db = Db::in_memory();

        // A fresh database has neither a tip nor any changefeed entries.
        assert!(db.tip().is_none());
        assert!(db.since(0, 0).nodes.is_empty());

        let first = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let second = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        // tip() is the full node of the highest-seq (most recent) write.
        let head = db.tip().expect("tip after writes");
        assert_eq!(head.seq, second.seq);
        assert_eq!(head.id, "b");
        assert_eq!(head.hash, second.hash);

        // The cursor is EXCLUSIVE: asking "since first" yields only `second`,
        // together with the cursor envelope.
        let batch = db.since(first.seq, 0);
        assert_eq!(batch.nodes.len(), 1);
        assert_eq!(batch.nodes[0].id, "b");
        assert_eq!(batch.from_seq, first.seq);
        assert_eq!(batch.to_seq, second.seq);
        assert_eq!(batch.head_seq, second.seq);
        assert!(!batch.has_more);

        // Asking from the tip itself yields nothing.
        assert!(db.since(second.seq, 0).nodes.is_empty());

        // A limit of 1 cuts the page short and flags `has_more`; resuming from
        // `to_seq` delivers the remainder.
        let third = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();

        let page_one = db.since(first.seq, 1);
        assert_eq!(page_one.nodes.len(), 1);
        assert_eq!(page_one.nodes[0].id, "b");
        assert_eq!(page_one.to_seq, second.seq);
        assert!(page_one.has_more);

        let page_two = db.since(page_one.to_seq, 1);
        assert_eq!(page_two.nodes.len(), 1);
        assert_eq!(page_two.nodes[0].id, "c");
        assert_eq!(page_two.to_seq, third.seq);
        assert!(!page_two.has_more);
    }

    /// The ITC sync-client case: separate chains live in separate collections,
    /// and a consumer resumes ONE of them without pulling the global tip and
    /// filtering.
    #[test]
    fn tip_collection_per_chain() {
        let db = Db::in_memory();
        assert!(db.tip_collection("blocks").is_none());

        // Interleave writes across the two collections.
        db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
        db.put("tx", "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let last_block = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
        let last_tx = db.put("tx", "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        // Global tip: the latest write overall.
        assert_eq!(db.tip().unwrap().id, "t1");

        // Collection tips: the latest write within each collection.
        let blocks_tip = db.tip_collection("blocks").expect("blocks tip");
        assert_eq!(blocks_tip.id, "b1");
        assert_eq!(blocks_tip.seq, last_block.seq);
        assert_eq!(db.tip_collection("tx").unwrap().seq, last_tx.seq);
        assert!(db.tip_collection("absent").is_none());
    }

    /// Nodes written through `put_batch` are registered in the seq index too.
    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let batch = vec![
            ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
        ];
        let written = db.put_batch(batch).unwrap();
        for node in &written {
            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
    }

    /// `neighbors` returns exactly the targets linked from a given source via
    /// a given relation.
    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        for (coll, id, doc) in [
            ("driver", "d1", serde_json::json!({"name": "Bob"})),
            ("driver", "d2", serde_json::json!({"name": "Carol"})),
            ("trip", "t1", serde_json::json!({"status": "req"})),
            ("trip", "t2", serde_json::json!({"status": "req"})),
        ] {
            db.put(coll, id, doc, vec![], None, None).unwrap();
        }

        for (driver, trip) in [
            ("driver:d1", "trip:t1"),
            ("driver:d1", "trip:t2"),
            ("driver:d2", "trip:t1"),
        ] {
            db.link(driver, "handles", trip).unwrap();
        }

        let bob_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(bob_trips.len(), 2);
        let bob_trip_ids: std::collections::HashSet<&str> =
            bob_trips.iter().map(|n| n.id.as_str()).collect();
        assert!(bob_trip_ids.contains("t1") && bob_trip_ids.contains("t2"));

        let carol_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(carol_trips.len(), 1);
        assert_eq!(carol_trips[0].id, "t1");
    }

    /// Links are stored as `__links__` documents, not as graph edges. The
    /// `__links__` collection is NQL-queryable and consistent with the PyO3
    /// binding.
    #[test]
    fn link_stored_in_links_collection() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
        db.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();

        // The link document exists under its composite id and carries the edge.
        let stored = db.get("__links__", "driver:d1|handles|trip:t1");
        assert!(stored.is_some(), "__links__ doc should exist");
        let edge = stored.unwrap();
        assert_eq!(edge.data["_from"], "driver:d1");
        assert_eq!(edge.data["_rel"], "handles");
        assert_eq!(edge.data["_to"], "trip:t1");

        // neighbors() resolves the edge to the target node.
        let targets = db.neighbors("driver:d1", "handles");
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0].id, "t1");
    }

    /// Linking to an endpoint that does not exist is an error.
    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();

        let outcome = db.link("driver:d1", "handles", "trip:ghost");
        assert!(outcome.is_err());
    }

    /// A link written to an on-disk database is still resolvable after the
    /// database is dropped and reopened.
    #[test]
    fn link_durable_survives_reopen() {
        let tmp = tempdir().unwrap();
        {
            let writer = Db::open(tmp.path(), None).unwrap();
            writer.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
            writer.put("trip", "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            writer.link("driver:d1", "handles", "trip:t1").unwrap();
        }

        let reader = Db::open(tmp.path(), None).unwrap();
        reader.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);

        let trips = reader.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }

    /// v2.5.43: tip() returns the last written object AND survives a warm
    /// restart. On reopen the seq_index is cold (a warm start skips the scan),
    /// so tip() must resolve the last write via the MANIFEST tip_hash fallback
    /// — no scan.
    #[test]
    fn tip_survives_warm_restart() {
        let tmp = tempdir().unwrap();
        {
            let writer = Db::open(tmp.path(), None).unwrap();
            writer.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            writer.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
            // Persists MANIFEST, including tip_hash.
            writer.flush_all();
            assert_eq!(writer.tip().expect("tip in-session").id, "b2");
        }

        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
        let reopened = Db::open(tmp.path(), None).unwrap();
        assert!(reopened.get_hash_by_seq(1).is_none(), "seq_index is cold on a warm boot");

        let head = reopened.tip().expect("tip() must survive a warm restart");
        assert_eq!(head.id, "b2");
        assert_eq!(head.data.get("h").and_then(|v| v.as_i64()), Some(2));
    }

    /// Same contract as tip(), per collection: itc-node-rs resumes headers /
    /// blocks / l2_receipts independently, so each must be its own durable
    /// resume point — not just the global tip.
    #[test]
    fn tip_collection_survives_warm_restart() {
        let tmp = tempdir().unwrap();
        {
            let writer = Db::open(tmp.path(), None).unwrap();
            writer.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            writer.put("tx", "t1", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
            let newest_block = writer
                .put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None)
                .unwrap();
            // Persists MANIFEST, including coll_tips.
            writer.flush_all();
            assert_eq!(writer.tip_collection("blocks").unwrap().id, "b2");
            assert_eq!(writer.tip_collection("blocks").unwrap().seq, newest_block.seq);
        }

        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
        let reopened = Db::open(tmp.path(), None).unwrap();
        assert!(reopened.get_hash_by_seq(0).is_none(), "seq_index is cold on a warm boot");

        let blocks_tip = reopened
            .tip_collection("blocks")
            .expect("tip_collection must survive a warm restart");
        assert_eq!(blocks_tip.id, "b2");
        assert_eq!(blocks_tip.data.get("h").and_then(|v| v.as_i64()), Some(2));

        let tx_tip = reopened.tip_collection("tx").expect("tx tip must also survive");
        assert_eq!(tx_tip.id, "t1");
        assert!(reopened.tip_collection("absent").is_none());
    }

    /// Regression guard for the cold-scan refactor: seq_index is now populated
    /// DURING the parallel read pass (for live scan_status().indexed_count
    /// progress — see cold_scan_background_arc), not in a second pass
    /// afterward. This asserts the end state is unchanged: every written
    /// object is indexed, tip()/tip_collection() are correct, and
    /// scan_complete eventually reports true.
    #[test]
    fn cold_scan_indexes_every_object_and_reports_completion() {
        use std::time::{Duration, Instant};

        let tmp = tempdir().unwrap();
        let total = 25u64;
        {
            let writer = Db::open(tmp.path(), None).unwrap();
            for i in 0..total {
                let id = i.to_string();
                writer.put("things", &id, serde_json::json!({"i": i}), vec![], None, None).unwrap();
            }
            writer.flush_all();
        }

        // Force a COLD start regardless of the MANIFEST nedb-v2 itself would
        // have written: delete it so startup_rebuild() takes the cold path and
        // start_cold_scan() actually spawns the background scan this test
        // needs to exercise.
        std::fs::remove_file(tmp.path().join("MANIFEST")).unwrap();

        let reopened = Db::open(tmp.path(), None).unwrap();
        assert!(!reopened.scan_status().scan_complete, "should be cold immediately after open");
        let shared = std::sync::Arc::new(reopened);
        Db::start_cold_scan(std::sync::Arc::clone(&shared));

        // Poll until the background scan reports completion, with a hard deadline.
        let deadline = Instant::now() + Duration::from_secs(10);
        loop {
            if shared.scan_status().scan_complete {
                break;
            }
            assert!(Instant::now() < deadline, "cold scan did not complete in time");
            std::thread::sleep(Duration::from_millis(5));
        }

        let report = shared.scan_status();
        assert_eq!(report.indexed_count, total as usize, "every written object must be indexed");
        assert!(report.scan_complete);

        let head = shared.tip().expect("tip resolves after cold scan");
        assert_eq!(head.data.get("i").and_then(|v| v.as_u64()), Some(total - 1));
        let things_tip = shared
            .tip_collection("things")
            .expect("tip_collection resolves after cold scan");
        assert_eq!(things_tip.id, head.id);
    }
}