Skip to main content

nedb_engine/
db.rs

1//! Main DAG database — coordinates ObjectStore, IdIndex, SortedIndexes, GraphStore.
2
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
/// MANIFEST: cached `{seq, head, tip_hash, coll_tips}` snapshot of the write head.
///
/// Persisted by `Db::flush_manifest` (written to `MANIFEST.tmp`, then renamed over
/// `MANIFEST`). Writes do NOT flush it themselves — `update_head` only marks it
/// dirty and the background ticker / `flush_all` persist it later, so the file can
/// lag the in-memory state.
/// On startup, if MANIFEST exists and no sorted indexes need rebuilding,
/// startup is O(1) — just read this one file instead of scanning all objects.
#[derive(serde::Serialize, serde::Deserialize)]
struct Manifest {
    /// Next seq to assign (the counter value, not the last assigned seq).
    seq:  u64,
    /// Running Merkle head (hex string) at flush time.
    head: String,
    /// Object hash of the highest-seq node at flush time. Lets `tip()` resolve the
    /// last write O(1) on a warm boot — before any scan repopulates the in-memory
    /// seq index. `#[serde(default)]` so pre-2.5.43 MANIFESTs (no field) still parse.
    #[serde(default)]
    tip_hash: String,
    /// Per-collection tip: `coll -> object hash of the highest-seq node in that
    /// collection`. Lets `tip_collection()` resolve O(1) on a warm boot, same
    /// contract as `tip_hash` for the global head. `#[serde(default)]` so
    /// pre-this-field MANIFESTs still parse (empty map — self-heals on next write
    /// or cold scan).
    #[serde(default)]
    coll_tips: std::collections::HashMap<String, String>,
}
37
/// Page size used by `since()` when the caller passes `limit == 0`. Bounds the
/// engine primitive itself so a stale/offline consumer can never force an
/// unbounded materialization — the safety lives in the core, not the HTTP layer.
pub const DEFAULT_SINCE_LIMIT: usize = 10_000;
42
/// One page of the changefeed returned by `since()`. The replication contract:
/// apply `nodes` in ascending seq order, advance your cursor to `to_seq`, and keep
/// paging while `has_more` is true; then attach to the live `subscribe` edge.
/// `head_seq` tells the consumer how far the log currently extends (how far behind
/// it is).
#[derive(Debug, Clone, serde::Serialize)]
pub struct SinceBatch {
    /// Writes in (`from_seq`, `to_seq`], ascending by seq. Seqs that cannot be
    /// resolved through the in-memory seq index are absent, not faked.
    pub nodes:    Vec<Node>,
    /// The exclusive cursor this page started from (echoes the request).
    pub from_seq: u64,
    /// Seq of the last node in this page — the consumer's next cursor. Equal to
    /// `from_seq` when the page is empty.
    pub to_seq:   u64,
    /// Latest allocated seq of the log (`seq counter - 1`, saturating at 0). Seqs
    /// are allocated before the write completes, so this write may still be in
    /// flight.
    pub head_seq: u64,
    /// True when the page stopped because it reached `limit` with seqs still left
    /// to examine (those remaining seqs may all turn out to be unresolvable).
    pub has_more: bool,
}
61
/// Replication readiness snapshot. `scan_complete` is the correctness gate: until
/// the cold-scan finishes rebuilding the seq index, an old cursor passed to
/// `since()` can return a PARTIAL page and look (wrongly) like "caught up". A
/// correctness-critical consumer MUST wait for `scan_complete == true` before
/// trusting historical catch-up. `indexed_seq_min/max` report the currently
/// resolvable seq range; `tip_seq` is the log head.
///
/// NOTE(review): `scan_complete` mirrors `Db::startup_ready`, which a WARM start
/// sets to true without populating the seq index. After a warm boot it can be
/// true while `indexed_count` is 0. Check the indexed range as well before
/// trusting an old cursor.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ScanStatus {
    /// Startup finished (MANIFEST loaded or cold scan done) — see NOTE above.
    pub scan_complete:   bool,
    /// Latest allocated seq of the log (`seq counter - 1`, saturating at 0).
    pub tip_seq:         u64,
    /// Lowest seq currently in the seq index (0 if empty).
    pub indexed_seq_min: u64,
    /// Highest seq currently in the seq index (0 if empty).
    pub indexed_seq_max: u64,
    /// Number of seqs currently resolvable via the index.
    pub indexed_count:   usize,
}
81
/// The DAG database: coordinates the content-addressed object store with the id,
/// sorted, graph and seq indexes, and caches the write head (seq counter, Merkle
/// head, global and per-collection tips) that is mirrored to MANIFEST.
pub struct Db {
    /// Node storage; `write` returns the node's content hash.
    pub objects:        ObjectStore,
    /// `(coll, id) -> hash` of the current version of each live document.
    pub id_index:       IdIndex,
    /// Per-`(coll, field)` ordered indexes used by ORDER BY when present.
    pub sorted_indexes: SortedIndexes,
    /// Named edges between nodes (e.g. `caused_by` / `caused_by_rev`).
    pub graph:          GraphStore,
    /// Database directory. The sentinel `":memory:"` marks an in-memory Db, which
    /// the MANIFEST flush paths skip.
    pub root:           PathBuf,
    /// Dirty flag — set true when head changes, cleared after manifest flush.
    /// Decouples flush_manifest from the hot write path so concurrent writes
    /// don't serialise on 2× file I/O per PUT.
    manifest_dirty:     Arc<AtomicBool>,
    /// Next seq to assign. Writers reserve seqs with `fetch_add`, so the first
    /// write gets seq 0 and the latest allocated seq is `seq - 1`.
    pub seq:            AtomicU64,
    /// Cached Merkle head (hex) — updated incrementally on every write (O(1)).
    head:               RwLock<String>,
    /// Object hash of the most recent write (highest seq). Mirrors `head` but holds
    /// the tip's content hash, so `tip()` can resolve the last node O(1) on a warm
    /// boot when the in-memory `seq_index` is still cold. Persisted in MANIFEST.
    tip_hash:           RwLock<String>,
    /// Per-collection tip: `coll -> object hash of the highest-seq node in that
    /// collection`. Kept current on every write (`update_head`), restored from
    /// MANIFEST on warm boot, rebuilt by the cold scan — so `tip_collection()` is
    /// O(1) in every startup regime.
    coll_tip_hash:      Arc<DashMap<String, String>>,
    /// True once startup is fully ready (MANIFEST loaded or cold scan complete).
    /// Warm starts set this true before returning from open().
    /// Cold starts set this true in the background thread when scan completes.
    /// Per the design notes, the server layer holds writes with 503 until this is
    /// true while reads proceed — that gating is not enforced in this file.
    pub startup_ready:  Arc<AtomicBool>,
    /// Seq → hash lookup for v1 compatibility and the changefeed. Filled by the
    /// write paths and the cold-scan background pass. Covers only nodes from the
    /// current process session plus cold-scanned nodes; other seqs cannot be resolved.
    seq_index:          Arc<DashMap<u64, String>>,
}
114
115impl Db {
116    /// Create a pure in-memory database — no disk I/O, no migration, instant startup.
117    /// Perfect for tests, hot-cache layers, and ephemeral sessions.
118    /// All data is lost when the Db is dropped.
119    pub fn in_memory() -> Self {
120        Self {
121            objects:        ObjectStore::in_memory(),
122            id_index:       IdIndex::in_memory(),
123            sorted_indexes: SortedIndexes::new(),
124            graph:          GraphStore::in_memory(),
125            root:           std::path::PathBuf::from(":memory:"),
126            seq:            AtomicU64::new(0),
127            head:           RwLock::new(String::new()),
128            tip_hash:       RwLock::new(String::new()),
129            coll_tip_hash:  Arc::new(DashMap::new()),
130            startup_ready:  Arc::new(AtomicBool::new(true)),  // always ready
131            manifest_dirty: Arc::new(AtomicBool::new(false)),
132            seq_index:      Arc::new(DashMap::new()),
133        }
134    }
135
136    /// Open (or create) a database. Runs v1→v2 migration automatically if log.aof is present.
137    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
138        std::fs::create_dir_all(db_root)?;
139
140        let objects        = ObjectStore::new(db_root, dek.clone())?;
141        let id_index       = IdIndex::new(db_root)?;
142        let sorted_indexes = SortedIndexes::new();
143        let graph          = GraphStore::new(db_root)?;
144
145        let mut db = Self {
146            objects,
147            id_index,
148            sorted_indexes,
149            graph,
150            root: db_root.to_path_buf(),
151            seq:  AtomicU64::new(0),
152            head: RwLock::new(String::new()),
153            tip_hash: RwLock::new(String::new()),
154            coll_tip_hash: Arc::new(DashMap::new()),
155            startup_ready:  Arc::new(AtomicBool::new(false)),
156            manifest_dirty: Arc::new(AtomicBool::new(false)),
157            seq_index:      Arc::new(DashMap::new()),
158        };
159
160        // Auto-migrate v1 → v2 if needed (pass DEK so encrypted AOFs convert correctly)
161        migrate::migrate_if_needed(
162            db_root,
163            &db.objects,
164            &db.id_index,
165            &db.sorted_indexes,
166            &db.graph,
167            dek.as_ref(),
168        )?;
169
170        // Fast startup: load seq+head from MANIFEST if no sorted indexes need rebuilding.
171        // Falls back to full object scan only when necessary (first open, or post-migration).
172        db.startup_rebuild()?;
173
174        Ok(db)
175    }
176
177    /// Smart startup:
178    /// - Warm (MANIFEST exists): O(1) load → startup_ready = true immediately.
179    /// - Cold (no MANIFEST): start server immediately, run scan in background thread.
180    ///   Writes return 503 until scan completes; reads always proceed.
181    fn startup_rebuild(&mut self) -> Result<()> {
182        let manifest_path = self.root.join("MANIFEST");
183        let needs_index_rebuild = !self.sorted_indexes.is_empty();
184
185        // Warm path: MANIFEST + no sorted indexes to rebuild → instant start
186        if manifest_path.exists() && !needs_index_rebuild {
187            if let Some(m) = fs::read_to_string(&manifest_path)
188                .ok()
189                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
190            {
191                // Self-heal: MANIFEST with an empty or short head is corrupt/stale.
192                // Fall through to cold scan so the head is rebuilt correctly from objects.
193                if m.head.len() < 8 {
194                    eprintln!("  [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
195                } else if m.tip_hash.is_empty() {
196                    // Pre-2.5.43 MANIFEST (no persisted tip). Cold-scan once to rebuild
197                    // the seq index and rewrite MANIFEST with tip_hash — warm + tip()-
198                    // durable on every boot thereafter.
199                    eprintln!("  [nedbd] MANIFEST predates durable tip() — cold scan once to upgrade");
200                } else {
201                    self.seq.store(m.seq, Ordering::SeqCst); // m.seq is already the next-to-assign counter
202                    *self.head.write() = m.head.clone();
203                    *self.tip_hash.write() = m.tip_hash.clone();
204                    for (coll, hash) in &m.coll_tips {
205                        self.coll_tip_hash.insert(coll.clone(), hash.clone());
206                    }
207                    self.startup_ready.store(true, Ordering::SeqCst);
208                    println!("  [nedbd] warm start — seq={} head={}... tip={}...",
209                        m.seq, &m.head[..8], &m.tip_hash[..8.min(m.tip_hash.len())]);
210                    return Ok(());
211                }
212            } else {
213                eprintln!("  [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
214            }
215        }
216
217        // Cold path: mark as not ready, return immediately.
218        // The actual background scan is started by Db::start_cold_scan(arc)
219        // which is called from Manager::open_all() AFTER Arc::new(db) — when
220        // the Db is heap-allocated and its field addresses are permanently stable.
221        // Capturing field addresses here would cause UB: Db moves on return.
222        println!("  [nedbd] cold start — background scan will start after heap allocation");
223        Ok(())
224    }
225
226    /// Call this from Manager::open_all() after Arc::new(db).
227    /// Spawns the cold scan background thread with stable heap addresses.
228    /// No-op if startup is already complete (warm start).
229    pub fn start_cold_scan(self_arc: Arc<Self>) {
230        if self_arc.startup_ready.load(Ordering::SeqCst) {
231            return; // warm start — already ready
232        }
233        // Fast path: if the database is empty (new or just created), skip the
234        // background thread entirely. No objects to scan = instant startup.
235        if self_arc.objects.all_hashes().next().is_none() {
236            self_arc.startup_ready.store(true, Ordering::SeqCst);
237            return;
238        }
239        println!("  [nedbd] cold start — background scan starting, server accepting reads now");
240        std::thread::spawn(move || {
241            let db = self_arc;
242            cold_scan_background_arc(db);
243        });
244    }
245
246    /// Write a document. Returns the new node with its content hash set.
247    pub fn put(
248        &self,
249        coll: &str,
250        id: &str,
251        data: Value,
252        caused_by: Vec<String>,
253        valid_from: Option<String>,
254        valid_to:   Option<String>,
255    ) -> Result<Node> {
256        let seq  = self.seq.fetch_add(1, Ordering::SeqCst);
257        let prev = self.id_index.get(coll, id);
258
259        // Remove old node from sorted indexes (it's being superseded)
260        if let Some(old_hash) = &prev {
261            if let Ok(old_node) = self.objects.read(old_hash) {
262                if let Value::Object(ref obj) = old_node.data {
263                    for (field, value) in obj {
264                        self.sorted_indexes.remove(coll, field, value, old_hash);
265                    }
266                }
267            }
268        }
269
270        let mut node = Node {
271            id:         id.to_string(),
272            coll:       coll.to_string(),
273            seq,
274            data:       data.clone(),
275            prev,
276            caused_by:  caused_by.clone(),
277            ts:         now(),
278            valid_from,
279            valid_to,
280            hash:       String::new(),
281        };
282
283        // Write to object store (atomic, content-addressed)
284        let hash = self.objects.write(&mut node)?;
285        self.seq_index.insert(seq, hash.clone());
286
287        // Update id index (atomic file)
288        self.id_index.set(coll, id, &hash)?;
289
290        // Update sorted indexes
291        if let Value::Object(ref obj) = data {
292            for (field, value) in obj {
293                if self.sorted_indexes.has(coll, field) {
294                    self.sorted_indexes.insert(coll, field, value, &hash);
295                }
296            }
297        }
298
299        // Write causal graph edges
300        for cause in &caused_by {
301            self.graph.add_edge(&hash, "caused_by", cause)?;
302            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
303        }
304
305        // Update running Merkle head: O(1) chain, no full recompute.
306        // new_head = BLAKE2b(prev_head || seq_bytes || new_object_hash)
307        self.update_head(coll, seq, &hash);
308
309        Ok(node)
310    }
311
312    /// Batch put: write N documents in parallel, preserving monotonic seq ordering.
313    /// Pre-allocates N seq numbers atomically, then parallelises object writes and
314    /// id-index updates via Rayon. Each op is independent — safe to parallelise.
315    /// Returns nodes in input order with assigned seq numbers.
316    pub fn put_batch(
317        &self,
318        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
319        // (coll, id, data, caused_by, valid_from, valid_to)
320    ) -> Result<Vec<Node>> {
321        use rayon::prelude::*;
322
323        if ops.is_empty() { return Ok(vec![]); }
324        let n = ops.len() as u64;
325
326        // Pre-allocate N consecutive seq numbers — preserves ordering under concurrency
327        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
328        let ts = now();
329
330        // Build nodes with assigned seq numbers
331        let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
332            let prev = self.id_index.get(&coll, &id);
333            Node {
334                id, coll, seq: base_seq + i as u64,
335                data, prev, caused_by,
336                ts, valid_from, valid_to,
337                hash: String::new(),
338            }
339        }).collect();
340
341        // Parallel object writes (content-addressed, idempotent, safe to parallelise)
342        let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
343            .filter_map(|node| self.objects.write(node).err())
344            .collect();
345        if let Some(e) = write_errors.into_iter().next() { return Err(e); }
346
347        // Parallel id-index updates
348        let index_errors: Vec<anyhow::Error> = nodes.par_iter()
349            .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
350            .collect();
351        if let Some(e) = index_errors.into_iter().next() { return Err(e); }
352
353        // Sorted indexes + causal graph (sequential — small overhead, usually no indexes)
354        for node in &nodes {
355            self.seq_index.insert(node.seq, node.hash.clone());
356            if let Value::Object(ref obj) = node.data {
357                for (field, value) in obj {
358                    if self.sorted_indexes.has(&node.coll, field) {
359                        self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
360                    }
361                }
362            }
363            for cause in &node.caused_by {
364                self.graph.add_edge(&node.hash, "caused_by", cause).ok();
365                self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
366            }
367        }
368
369        // Single Merkle head update for the whole batch (chain all hashes)
370        for node in &nodes {
371            self.update_head(&node.coll, node.seq, &node.hash);
372        }
373
374        Ok(nodes)
375    }
376
377    /// Update the running Merkle head with a new write. O(1), lock-free on the flush path.
378    /// Sets dirty flag — the background ticker calls flush_manifest periodically.
379    /// This removes 2× file I/O ops from the hot write path, unblocking concurrent writes.
380    fn update_head(&self, coll: &str, seq: u64, new_hash: &str) {
381        use blake2::{Blake2b512, Digest};
382        let prev = self.head.read().clone();
383        let mut h = Blake2b512::new();
384        h.update(prev.as_bytes());
385        h.update(seq.to_le_bytes());
386        h.update(new_hash.as_bytes());
387        *self.head.write() = hex::encode(&h.finalize()[..32]);
388        // Track the tip's object hash. update_head arrives in ascending seq order
389        // (put in seq order; put_batch loops the batch in order), so the last call
390        // wins → the highest-seq node's hash. Persisted in MANIFEST for warm-boot tip().
391        *self.tip_hash.write() = new_hash.to_string();
392        // Same contract, per-collection: last call for this coll wins → that
393        // collection's highest-seq hash. Persisted in MANIFEST for warm-boot
394        // tip_collection().
395        self.coll_tip_hash.insert(coll.to_string(), new_hash.to_string());
396        // Mark dirty — background ticker will flush to MANIFEST (no I/O on write path)
397        self.manifest_dirty.store(true, Ordering::Release);
398    }
399
400    /// Flush both the id-index WAL and MANIFEST. Used on graceful shutdown.
401    pub fn flush_all(&self) {
402        self.id_index.flush_write_buf();
403        // v3: fsync the active segment (no-op for loose/in-memory stores).
404        // One durability point per batch instead of one fsync per object.
405        if let Err(e) = self.objects.sync() {
406            eprintln!("nedb: segment sync failed: {}", e);
407        }
408        self.flush_manifest();
409    }
410
411    /// Compact the v3 packed object store: keep the CURRENT version of every
412    /// document (from the id-index) and reclaim everything else. No-op unless
413    /// running with the v3 segment substrate (`--dag-v3` / NEDB_DAG_V3).
414    ///
415    /// This is a PRUNING operation: superseded/historical object versions are
416    /// dropped, so AS OF / TRACE over pruned versions is discarded — that is
417    /// what reclaims the space. Flushes first so all data is durable on disk
418    /// before the old segments are deleted.
419    pub fn compact(&self) -> Result<crate::segment::CompactStats> {
420        self.flush_all();
421        let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
422        for coll in self.id_index.collections() {
423            for id in self.id_index.list_ids(&coll) {
424                if let Some(h) = self.id_index.get(&coll, &id) {
425                    live.insert(h);
426                }
427            }
428        }
429        self.objects.compact(&live)
430    }
431
432    /// Flush MANIFEST to disk if dirty. No-op for in-memory databases.
433    pub fn flush_manifest_if_dirty(&self) {
434        if self.root == std::path::PathBuf::from(":memory:") { return; }
435        if self.manifest_dirty.compare_exchange(
436            true, false, Ordering::AcqRel, Ordering::Relaxed
437        ).is_ok() {
438            self.flush_manifest();
439        }
440    }
441
442    /// Atomically persist current seq+head to MANIFEST. No-op for in-memory databases.
443    pub fn flush_manifest(&self) {
444        if self.root == std::path::PathBuf::from(":memory:") { return; }
445        let seq  = self.seq.load(Ordering::SeqCst);
446        let head = self.head.read().clone();
447        let tip_hash = self.tip_hash.read().clone();
448        let coll_tips: std::collections::HashMap<String, String> = self.coll_tip_hash
449            .iter()
450            .map(|kv| (kv.key().clone(), kv.value().clone()))
451            .collect();
452        let m = Manifest { seq, head, tip_hash, coll_tips };
453        if let Ok(json) = serde_json::to_string(&m) {
454            let path = self.root.join("MANIFEST");
455            let tmp  = self.root.join("MANIFEST.tmp");
456            let _ = fs::write(&tmp, &json);
457            let _ = fs::rename(&tmp, &path);
458        }
459    }
460
461    /// Start a background thread that flushes both the id-index WAL and MANIFEST
462    /// every `interval_ms` milliseconds.
463    /// Call this after Arc::new(db) — the Arc keeps Db alive for the thread's lifetime.
464    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
465        let db = self_arc;
466        std::thread::spawn(move || {
467            loop {
468                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
469                // Flush id-index WAL to disk (parallel Rayon writes)
470                db.id_index.flush_write_buf();
471                // Then flush MANIFEST
472                db.flush_manifest_if_dirty();
473            }
474        });
475    }
476
477    /// Return the current Merkle head string. O(1) — read from cache.
478    pub fn head(&self) -> String {
479        self.head.read().clone()
480    }
481
482    /// Delete a document — writes a tombstone node and removes the id from the index.
483    /// The object history is preserved in the DAG; only the live id pointer is cleared.
484    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
485        let prev = match self.id_index.get(coll, id) {
486            None => return Ok(false),   // already gone
487            Some(h) => h,
488        };
489        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
490        let mut tombstone = Node {
491            id:         format!("_del_{}", id),
492            coll:       coll.to_string(),
493            seq,
494            data:       serde_json::json!({"_deleted": id, "_prev": prev}),
495            prev:       Some(prev),
496            caused_by:  vec![],
497            ts:         now(),
498            valid_from: None,
499            valid_to:   None,
500            hash:       String::new(),
501        };
502        let hash = self.objects.write(&mut tombstone)?;
503        self.update_head(coll, seq, &hash);
504        // Remove the live id pointer — doc is now invisible to queries and list()
505        self.id_index.remove(coll, id)?;
506        Ok(true)
507    }
508
509    /// Get the current version of a document by id.
510    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
511        let hash = self.id_index.get(coll, id)?;
512        self.objects.read(&hash).ok()
513    }
514
515    /// Get a specific version of a document by object hash.
516    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
517        self.objects.read(hash).ok()
518    }
519
520    /// Get a document AS OF a specific sequence number.
521    /// Walks the version chain (prev links) backward until seq <= target.
522    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
523        let hash = self.id_index.get(coll, id)?;
524        let mut current = self.objects.read(&hash).ok()?;
525        loop {
526            if current.seq <= target_seq {
527                return Some(current);
528            }
529            let prev_hash = current.prev.as_deref()?;
530            current = self.objects.read(prev_hash).ok()?;
531        }
532    }
533
534    /// List all documents in a collection, returning current versions.
535    pub fn list(&self, coll: &str) -> Vec<Node> {
536        self.id_index
537            .list_ids(coll)
538            .into_iter()
539            .filter_map(|id| self.get(coll, &id))
540            .collect()
541    }
542
543    /// ORDER BY field ASC LIMIT n — uses sorted index if available, else falls back to full scan.
544    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
545        if self.sorted_indexes.has(coll, field) {
546            self.sorted_indexes
547                .top_k_asc(coll, field, limit)
548                .into_iter()
549                .filter_map(|h| self.objects.read(&h).ok())
550                .collect()
551        } else {
552            let mut docs = self.list(coll);
553            docs.sort_by(|a, b| {
554                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
555                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
556                av.cmp(&bv)
557            });
558            docs.truncate(limit);
559            docs
560        }
561    }
562
563    /// ORDER BY field DESC LIMIT n
564    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
565        if self.sorted_indexes.has(coll, field) {
566            self.sorted_indexes
567                .top_k_desc(coll, field, limit)
568                .into_iter()
569                .filter_map(|h| self.objects.read(&h).ok())
570                .collect()
571        } else {
572            let mut docs = self.list(coll);
573            docs.sort_by(|a, b| {
574                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
575                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
576                bv.cmp(&av)
577            });
578            docs.truncate(limit);
579            docs
580        }
581    }
582
583    /// TRACE caused_by — walk causal graph from a node.
584    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
585        self.graph
586            .trace(hash, "caused_by", reverse, limit)
587            .into_iter()
588            .filter_map(|h| self.objects.read(&h).ok())
589            .collect()
590    }
591
592    /// Verify tamper-evidence of all objects.
593    pub fn verify(&self) -> (usize, Vec<String>) {
594        self.objects.verify_all()
595    }
596
597    /// Create a sorted index for a (coll, field) pair.
598    pub fn create_sorted_index(&self, coll: &str, field: &str) {
599        self.sorted_indexes.ensure(coll, field);
600        // Backfill from existing objects
601        for id in self.id_index.list_ids(coll) {
602            if let Some(node) = self.get(coll, &id) {
603                if let Value::Object(ref obj) = node.data {
604                    if let Some(value) = obj.get(field) {
605                        self.sorted_indexes.insert(coll, field, value, &node.hash);
606                    }
607                }
608            }
609        }
610    }
611
612    /// Resolve a sequence number to its content hash (v1 compatibility).
613    /// Only covers nodes written in the current process session + cold-scan nodes.
614    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
615        self.seq_index.get(&seq).map(|r| r.clone())
616    }
617
618    /// The tip — the most recently written node (highest seq), or `None` if the
619    /// database is empty. O(1): `self.seq` is the next-to-assign counter, so the
620    /// latest write sits at `seq - 1`; we resolve it through the same
621    /// seq_index → object-store path a normal read uses, so the returned Node is
622    /// byte-identical to one fetched by id or hash (it carries its own seq, hash,
623    /// causal links, and valid-time). This is the cheap "give me the latest write"
624    /// primitive — the head of the log, not an aggregate.
625    pub fn tip(&self) -> Option<Node> {
626        let next = self.seq.load(Ordering::SeqCst);
627        if next == 0 {
628            return None; // nothing written yet
629        }
630        // Fast path: resolve the head seq through the in-memory seq index
631        // (populated by this session's writes or by the cold scan).
632        if let Some(hash) = self.get_hash_by_seq(next - 1) {
633            return self.get_by_hash(&hash);
634        }
635        // Warm-boot fallback: the seq index is still cold (warm start skips the
636        // scan), but the tip's object hash was persisted in MANIFEST and restored
637        // on open. O(1), no scan — this is what makes tip() survive a restart.
638        let th = self.tip_hash.read().clone();
639        if !th.is_empty() {
640            return self.get_by_hash(&th);
641        }
642        None
643    }
644
645    /// The collection-local tip — the most recent write into `coll` (highest seq in
646    /// that collection), or `None` if the collection has no writes. O(1): resolves
647    /// through `coll_tip_hash`, a dedicated per-collection map kept current on every
648    /// write (`update_head`), restored from MANIFEST on warm boot, and rebuilt by the
649    /// cold scan — durable across restarts by construction, same contract as `tip()`
650    /// for the global head. Conceptually a different index than the global `tip()`
651    /// (global head vs collection head), kept as a separate method so each is
652    /// explicit — parity with the Python reference's `tip(coll)`. Lets a consumer
653    /// resume one chain (e.g. blocks / tx / utxo) without pulling global tip and
654    /// filtering.
655    pub fn tip_collection(&self, coll: &str) -> Option<Node> {
656        let hash = self.coll_tip_hash.get(coll)?.clone();
657        self.get_by_hash(&hash)
658    }
659
660    /// Changefeed page: up to `limit` nodes written AFTER `after_seq` (EXCLUSIVE),
661    /// ascending by seq, wrapped in a `SinceBatch` cursor envelope. `after_seq` is
662    /// the cursor you last applied (a prior `tip()` seq or `to_seq`). `limit` bounds
663    /// the page — `0` means DEFAULT_SINCE_LIMIT, so the engine primitive can never
664    /// materialize an unbounded batch even when embedders call it directly (the
665    /// safety is here, not only in the HTTP layer). Drain by paging while
666    /// `has_more`, advancing your cursor to `to_seq`, then hand off to the live
667    /// `subscribe` edge. The append-only log IS the changefeed, so this is an
668    /// O(page) walk; unresolved seqs (outside seq_index coverage — see
669    /// `scan_status()`) are skipped rather than faked.
670    pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
671        let next = self.seq.load(Ordering::SeqCst);          // head + 1
672        let head_seq = next.saturating_sub(1);
673        let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
674        let mut nodes: Vec<Node> = Vec::new();
675        let mut to_seq = after_seq;
676        let mut hit_limit = false;
677        let mut s = after_seq.saturating_add(1);
678        while s < next {
679            if nodes.len() >= cap { hit_limit = true; break; }
680            if let Some(hash) = self.get_hash_by_seq(s) {
681                if let Some(node) = self.get_by_hash(&hash) {
682                    to_seq = node.seq;
683                    nodes.push(node);
684                }
685            }
686            s += 1;
687        }
688        SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
689    }
690
691    /// Replication readiness — see `ScanStatus`. `scan_complete` gates safe
692    /// historical catch-up: a consumer pulling an old cursor right after a cold
693    /// start must wait for it, or `since()` may hand back a partial page that looks
694    /// like "caught up". Computes the indexed range by scanning the in-memory seq
695    /// index (O(index)) — intended for periodic status polls, not the per-write
696    /// hot path.
697    pub fn scan_status(&self) -> ScanStatus {
698        let next = self.seq.load(Ordering::SeqCst);
699        let mut min = u64::MAX;
700        let mut max = 0u64;
701        let mut count = 0usize;
702        for kv in self.seq_index.iter() {
703            let s = *kv.key();
704            if s < min { min = s; }
705            if s > max { max = s; }
706            count += 1;
707        }
708        if count == 0 { min = 0; }
709        ScanStatus {
710            scan_complete:   self.startup_ready.load(Ordering::SeqCst),
711            tip_seq:         next.saturating_sub(1),
712            indexed_seq_min: min,
713            indexed_seq_max: max,
714            indexed_count:   count,
715        }
716    }
717
718    /// Add an explicit named relation edge between two documents.
719    /// Add an explicit named relation between two "coll:id" nodes.
720    /// Relations stored as __links__ documents — NQL-queryable, time-travelable,
721    /// consistent with the PyO3 binding which uses the same __links__ convention.
722    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
723        let (frm_coll, frm_id) = frm.split_once(':')
724            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
725        let (to_coll, to_id) = to.split_once(':')
726            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
727        if self.id_index.get(frm_coll, frm_id).is_none() {
728            anyhow::bail!("link: frm not found: {}", frm);
729        }
730        if self.id_index.get(to_coll, to_id).is_none() {
731            anyhow::bail!("link: to not found: {}", to);
732        }
733        let link_id = format!("{}|{}|{}", frm, rel, to);
734        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
735        self.put("__links__", &link_id, doc, vec![], None, None)?;
736        Ok(())
737    }
738
739    /// Remove a named relation (deletes the __links__ document).
740    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
741        let link_id = format!("{}|{}|{}", frm, rel, to);
742        self.delete("__links__", &link_id)
743    }
744
745    /// Get neighbor nodes via a named relation.
746    /// Queries __links__ — consistent with the PyO3 binding.
747    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
748        self.id_index
749            .list_ids("__links__")
750            .into_iter()
751            .filter_map(|id| self.get("__links__", &id))
752            .filter(|node| {
753                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
754                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
755            })
756            .filter_map(|node| {
757                let to = node.data.get("_to")?.as_str()?;
758                let (to_coll, to_id) = to.split_once(':')?;
759                self.get(to_coll, to_id)
760            })
761            .collect()
762    }
763}
764
765impl Drop for Db {
766    /// Flush buffered state when the database is closed so a write-then-drop
767    /// sequence is durable without an explicit `flush_all()`.
768    ///
769    /// `IdIndex::set` only stages updates in the in-memory WAL `write_buf`;
770    /// disk persistence happens in `flush_write_buf()`, normally driven by the
771    /// manifest ticker. A short-lived `Db` (a library user's `{ let db =
772    /// Db::open(p)?; db.put(..)?; }` block, or a test) has no ticker, so without
773    /// this its writes would be silently lost on reopen. Flushing on drop
774    /// mirrors the flush-on-close contract of other embedded stores (sled,
775    /// RocksDB).
776    ///
777    /// In production this is a harmless safety net, not the primary durability
778    /// path: the manifest ticker thread holds an `Arc<Db>` for the process
779    /// lifetime, so `Drop` only fires once every owning handle is gone. No-op
780    /// for in-memory databases (`flush_all` short-circuits on `:memory:`).
781    fn drop(&mut self) {
782        self.flush_all();
783    }
784}
785
786/// Background cold-scan worker. Takes Arc<Db> — safe, Db is on the heap.
787fn cold_scan_background_arc(db: Arc<Db>) {
788    use rayon::prelude::*;
789    use blake2::{Blake2b512, Digest};
790
791    let objects        = &db.objects;
792    let head           = &db.head;
793    let seq_atomic     = &db.seq;
794    let sorted_indexes = &db.sorted_indexes;
795    let root           = db.root.clone();
796    let ready_flag     = Arc::clone(&db.startup_ready);
797
798    let hashes: Vec<String> = objects.all_hashes().collect();
799    let total = hashes.len();
800
801    if total == 0 {
802        ready_flag.store(true, Ordering::SeqCst);
803        return;
804    }
805
806    println!("  [nedbd] background scan — {} objects...", total);
807    let t0 = std::time::Instant::now();
808    let step = (total / 10).max(1000);
809
810    let nodes: Vec<Node> = hashes.par_iter()
811        .enumerate()
812        .filter_map(|(i, h)| {
813            if i > 0 && i % step == 0 {
814                let pct     = i * 100 / total;
815                let elapsed = t0.elapsed().as_secs_f32();
816                let rate    = i as f32 / elapsed;
817                let eta     = (total - i) as f32 / rate;
818                eprint!("\r  [nedbd]   {:>3}%  {:>8} / {:>8}  ({:>8.0}/s  eta {:.0}s)   ",
819                    pct, i, total, rate, eta);
820            }
821            objects.read(h).ok()
822        })
823        .collect();
824
825    eprintln!("\r  [nedbd]   100%  {:>8} / {:>8}  ({:.1}s)                        ",
826        total, total, t0.elapsed().as_secs_f32());
827
828    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
829    seq_atomic.store(max_seq + 1, Ordering::SeqCst);
830
831    // Per-collection tip: highest-seq node's hash, per coll. `nodes` is NOT
832    // seq-ordered here (it comes from an unordered object-hash scan), so this
833    // must track the max explicitly — unlike the live write path's "last call
834    // wins" (which relies on ascending call order that a scan doesn't have).
835    let mut coll_max: std::collections::HashMap<String, (u64, String)> = std::collections::HashMap::new();
836
837    for node in &nodes {
838        db.seq_index.insert(node.seq, node.hash.clone());
839        coll_max.entry(node.coll.clone())
840            .and_modify(|(s, h)| if node.seq > *s { *s = node.seq; *h = node.hash.clone(); })
841            .or_insert_with(|| (node.seq, node.hash.clone()));
842        if let Value::Object(ref obj) = node.data {
843            for (field, value) in obj {
844                if sorted_indexes.has(&node.coll, field) {
845                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
846                }
847            }
848        }
849    }
850
851    let coll_tips: std::collections::HashMap<String, String> = coll_max.into_iter()
852        .map(|(coll, (_seq, hash))| {
853            db.coll_tip_hash.insert(coll.clone(), hash.clone());
854            (coll, hash)
855        })
856        .collect();
857
858    // Compute Merkle head from sorted hashes
859    let mut sorted_hashes = hashes;
860    sorted_hashes.sort();
861    let mut h = Blake2b512::new();
862    h.update(max_seq.to_le_bytes());
863    for hash_str in &sorted_hashes {
864        h.update(hash_str.as_bytes());
865    }
866    let new_head = hex::encode(&h.finalize()[..32]);
867    *head.write() = new_head.clone();
868
869    // Tip = the highest-seq object we indexed. Persist its hash so tip() resolves
870    // O(1) on the next warm boot, before any scan repopulates the seq index.
871    let tip_hash = db.seq_index.iter()
872        .max_by_key(|kv| *kv.key())
873        .map(|kv| kv.value().clone())
874        .unwrap_or_default();
875    *db.tip_hash.write() = tip_hash.clone();
876
877    // Write MANIFEST atomically
878    let m     = Manifest { seq: max_seq, head: new_head, tip_hash, coll_tips };
879    let json  = serde_json::to_string(&m).unwrap_or_default();
880    let path  = root.join("MANIFEST");
881    let tmp   = root.join("MANIFEST.tmp");
882    let _ = fs::write(&tmp, &json);
883    let _ = fs::rename(&tmp, &path);
884
885    // Signal server: writes can now proceed
886    ready_flag.store(true, Ordering::SeqCst);
887    println!("  [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
888}
889
/// Current wall-clock time as fractional seconds since the Unix epoch.
/// Falls back to `0.0` if the system clock reads earlier than the epoch.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
896
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::tempdir;

    /// Round-trip: a document written with `put` reads back through `get`.
    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let block = json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", block, vec![], None, None).unwrap();

        let stored = db.get("blocks", "618000").unwrap();
        assert_eq!(stored.id, "618000");
        assert_eq!(stored.data["height"], 618000);
    }

    /// With a sorted index, `order_by_asc` returns the smallest `limit` values
    /// in ascending order, whatever the insertion order was.
    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");
        for height in [3u64, 1, 5, 2, 4] {
            let id = height.to_string();
            db.put("blocks", &id, json!({"height": height}), vec![], None, None).unwrap();
        }

        let lowest_three: Vec<u64> = db
            .order_by_asc("blocks", "height", 3)
            .iter()
            .filter_map(|node| node.data["height"].as_u64())
            .collect();
        assert_eq!(lowest_three, vec![1, 2, 3]);
    }

    /// Tracing from the newest op of a parent-linked chain reaches all three
    /// nodes (c → b → a).
    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let create = db.put("ops", "a", json!({"op": "create"}), vec![], None, None).unwrap();
        let transfer = db
            .put("ops", "b", json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        let chain = db.trace(&burn.hash, false, 10);
        assert_eq!(chain.len(), 3);
    }

    /// `get_as_of` sees the version current at a past seq; `get` sees the latest.
    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let first = db.put("docs", "x", json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);
        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
958
#[cfg(test)]
mod tests_v2 {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn seq_index_populated_on_put() {
        let db = Db::in_memory();
        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
        assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
        assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
        assert_eq!(db.get_hash_by_seq(9999), None);
    }

    #[test]
    fn tip_and_since() {
        let db = Db::in_memory();
        // Empty db: no tip, empty changefeed.
        assert!(db.tip().is_none());
        assert!(db.since(0, 0).nodes.is_empty());

        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();

        // tip() = the most recent write (highest seq), returned as a full node.
        let t = db.tip().expect("tip after writes");
        assert_eq!(t.seq, b.seq);
        assert_eq!(t.id, "b");
        assert_eq!(t.hash, b.hash);

        // since(after_seq, limit) — EXCLUSIVE cursor, bounded page + envelope.
        let after_a = db.since(a.seq, 0);
        assert_eq!(after_a.nodes.len(), 1);
        assert_eq!(after_a.nodes[0].id, "b");
        assert_eq!(after_a.from_seq, a.seq);
        assert_eq!(after_a.to_seq, b.seq);
        assert_eq!(after_a.head_seq, b.seq);
        assert!(!after_a.has_more);

        // Nothing written after the tip.
        assert!(db.since(b.seq, 0).nodes.is_empty());

        // `limit` bounds the page and sets has_more; resume from to_seq.
        let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();
        let page = db.since(a.seq, 1);             // (a..] capped at 1 -> [b], more pending
        assert_eq!(page.nodes.len(), 1);
        assert_eq!(page.nodes[0].id, "b");
        assert_eq!(page.to_seq, b.seq);
        assert!(page.has_more);
        let page2 = db.since(page.to_seq, 1);      // resume from b -> [c], done
        assert_eq!(page2.nodes.len(), 1);
        assert_eq!(page2.nodes[0].id, "c");
        assert_eq!(page2.to_seq, c.seq);
        assert!(!page2.has_more);
    }

    #[test]
    fn tip_collection_per_chain() {
        // The ITC sync-client case: separate chains in separate collections; a
        // consumer resumes ONE without pulling global tip and filtering.
        let db = Db::in_memory();
        assert!(db.tip_collection("blocks").is_none());

        db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
        db.put("tx",     "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
        let t1 = db.put("tx",     "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        // global tip = latest write overall (t1)
        assert_eq!(db.tip().unwrap().id, "t1");
        // collection-local tips = latest write in each collection
        let bt = db.tip_collection("blocks").expect("blocks tip");
        assert_eq!(bt.id, "b1");
        assert_eq!(bt.seq, b1.seq);
        assert_eq!(db.tip_collection("tx").unwrap().seq, t1.seq);
        assert!(db.tip_collection("absent").is_none());
    }

    #[test]
    fn seq_index_survives_batch() {
        let db = Db::in_memory();
        let nodes = db.put_batch(vec![
            ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
            ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
        ]).unwrap();
        for node in &nodes {
            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
        }
    }

    #[test]
    fn link_and_neighbors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
        db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.put("trip",   "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();

        db.link("driver:d1", "handles", "trip:t1").unwrap();
        db.link("driver:d1", "handles", "trip:t2").unwrap();
        db.link("driver:d2", "handles", "trip:t1").unwrap();

        let d1_trips = db.neighbors("driver:d1", "handles");
        assert_eq!(d1_trips.len(), 2);
        let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
        assert!(ids.contains("t1") && ids.contains("t2"));

        let d2_trips = db.neighbors("driver:d2", "handles");
        assert_eq!(d2_trips.len(), 1);
        assert_eq!(d2_trips[0].id, "t1");
    }

    #[test]
    fn link_stored_in_links_collection() {
        // Links are stored as __links__ documents, not as graph edges.
        // The __links__ collection is NQL-queryable and consistent with the PyO3 binding.
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
        db.link("driver:d1", "handles", "trip:t1").unwrap();
        // Verify the __links__ document was created
        let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
        assert!(link_doc.is_some(), "__links__ doc should exist");
        let doc = link_doc.unwrap();
        assert_eq!(doc.data["_from"], "driver:d1");
        assert_eq!(doc.data["_rel"],  "handles");
        assert_eq!(doc.data["_to"],   "trip:t1");
        // neighbors() resolves to the target node
        let nb = db.neighbors("driver:d1", "handles");
        assert_eq!(nb.len(), 1);
        assert_eq!(nb[0].id, "t1");
    }

    #[test]
    fn link_missing_node_errors() {
        let db = Db::in_memory();
        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
        assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
    }

    #[test]
    fn link_durable_survives_reopen() {
        let dir = tempdir().unwrap();
        {
            let db = Db::open(dir.path(), None).unwrap();
            db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
            db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
            db.link("driver:d1", "handles", "trip:t1").unwrap();
        }
        let db2 = Db::open(dir.path(), None).unwrap();
        db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
        let trips = db2.neighbors("driver:d1", "handles");
        assert_eq!(trips.len(), 1);
        assert_eq!(trips[0].id, "t1");
    }

    #[test]
    fn tip_survives_warm_restart() {
        // v2.5.43: tip() returns the last written object AND survives a warm restart.
        // On reopen the seq_index is cold (warm start skips the scan), so tip() must
        // resolve the last write via the MANIFEST tip_hash fallback — no scan.
        let dir = tempdir().unwrap();
        // Capture a seq that was really written, instead of hard-coding one, so
        // the cold-index probe below cannot pass vacuously.
        let b1_seq = {
            let db = Db::open(dir.path(), None).unwrap();
            let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
            db.flush_all(); // persists MANIFEST incl. tip_hash
            assert_eq!(db.tip().expect("tip in-session").id, "b2");
            b1.seq
        };
        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
        let db2 = Db::open(dir.path(), None).unwrap();
        assert!(db2.get_hash_by_seq(b1_seq).is_none(), "seq_index is cold on a warm boot");
        let tip = db2.tip().expect("tip() must survive a warm restart");
        assert_eq!(tip.id, "b2");
        assert_eq!(tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
    }

    #[test]
    fn tip_collection_survives_warm_restart() {
        // Same contract as tip(), per collection: itc-node-rs resumes headers /
        // blocks / l2_receipts independently, so each must be its own durable
        // resume point — not just the global tip.
        let dir = tempdir().unwrap();
        // Probe with b1's seq. It was actually written, and it is not any
        // collection's tip (b2 supersedes it in "blocks"). A hard-coded seq such
        // as 0 may never be assigned, which would make the cold check vacuous.
        let b1_seq = {
            let db = Db::open(dir.path(), None).unwrap();
            let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
            db.put("tx",     "t1", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
            let b2 = db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
            db.flush_all(); // persists MANIFEST incl. coll_tips
            assert_eq!(db.tip_collection("blocks").unwrap().id, "b2");
            assert_eq!(db.tip_collection("blocks").unwrap().seq, b2.seq);
            b1.seq
        };
        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
        let db2 = Db::open(dir.path(), None).unwrap();
        assert!(db2.get_hash_by_seq(b1_seq).is_none(), "seq_index is cold on a warm boot");
        let blocks_tip = db2.tip_collection("blocks").expect("tip_collection must survive a warm restart");
        assert_eq!(blocks_tip.id, "b2");
        assert_eq!(blocks_tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
        let tx_tip = db2.tip_collection("tx").expect("tx tip must also survive");
        assert_eq!(tx_tip.id, "t1");
        assert!(db2.tip_collection("absent").is_none());
    }
}