Skip to main content

nedb_engine/
db.rs

1//! Main DAG database — coordinates ObjectStore, IdIndex, SortedIndexes, GraphStore.
2
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use anyhow::Result;
8use dashmap::DashMap;
9use serde_json::Value;
10use parking_lot::RwLock;
11
12use crate::store::{Dek, Node, ObjectStore};
13use crate::index::{IdIndex, OrderedValue, SortedIndexes};
14use crate::graph::GraphStore;
15use crate::migrate;
16
/// On-disk MANIFEST record: the cached `{seq, head}` plus tip pointers,
/// serialized as JSON to `<root>/MANIFEST`.
///
/// It is NOT rewritten on every write: writes only mark the database dirty, and
/// `Db::flush_manifest` (driven by the background ticker or `flush_all`) writes
/// it via tmp-file + fsync + rename.
/// On startup, if MANIFEST exists and no sorted indexes need rebuilding,
/// startup is O(1) — just read this one file instead of scanning all objects.
#[derive(serde::Serialize, serde::Deserialize)]
struct Manifest {
    /// Next seq to assign (NOT the last assigned one); restored verbatim into
    /// `Db::seq` on a warm boot.
    seq:  u64,
    /// Running Merkle head (hex) at flush time. A head shorter than 8 bytes is
    /// treated as corrupt at startup and forces a cold scan.
    head: String,
    /// Object hash of the highest-seq node at flush time. Lets `tip()` resolve the
    /// last write O(1) on a warm boot — before any scan repopulates the in-memory
    /// seq index. `#[serde(default)]` so pre-2.5.43 MANIFESTs (no field) still parse.
    #[serde(default)]
    tip_hash: String,
    /// Per-collection tip: `coll -> object hash of the highest-seq node in that
    /// collection`. Lets `tip_collection()` resolve O(1) on a warm boot, same
    /// contract as `tip_hash` for the global head. Only hashes are stored (no
    /// per-collection seqs). `#[serde(default)]` so pre-this-field MANIFESTs
    /// still parse (empty map — self-heals on next write or cold scan).
    #[serde(default)]
    coll_tips: std::collections::HashMap<String, String>,
}
37
/// Page-size ceiling `since()` applies when its caller asks for `limit == 0`.
///
/// Capping inside the engine primitive (rather than in the HTTP layer) means a
/// stale or long-offline consumer can never trigger an unbounded
/// materialization of the changefeed.
pub const DEFAULT_SINCE_LIMIT: usize = 10000;
42
/// One page of the changefeed returned by `since()`. The replication contract:
/// apply `nodes` in ascending seq order, advance your cursor to `to_seq`, and keep
/// paging while `has_more` is true; then attach to the live `subscribe` edge.
/// `head_seq` tells the consumer how far the log currently extends (how far behind
/// it is). Until the cold scan has finished (`ScanStatus::scan_complete`), a page
/// for an old cursor may be partial — see `ScanStatus`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SinceBatch {
    /// Writes in (`from_seq`, `to_seq`], ascending by seq.
    pub nodes:    Vec<Node>,
    /// The exclusive cursor this page started from (echoes the request).
    pub from_seq: u64,
    /// Seq of the last node in this page — the consumer's next cursor.
    pub to_seq:   u64,
    /// Current head seq of the log (latest committed write).
    pub head_seq: u64,
    /// True when more writes remain past `to_seq` (the page hit `limit`).
    pub has_more: bool,
}
61
/// Replication readiness snapshot.
///
/// `scan_complete` is the correctness gate: until the cold scan finishes
/// rebuilding the seq index, an old cursor passed to `since()` can return a
/// PARTIAL page and look (wrongly) like "caught up". A correctness-critical
/// consumer MUST wait for `scan_complete == true` before trusting historical
/// catch-up. `indexed_seq_min/max` report the currently resolvable seq range;
/// `tip_seq` is the log head.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ScanStatus {
    /// Cold-scan finished — historical seqs fully resolvable; catch-up is safe.
    pub scan_complete:   bool,
    /// Head seq of the log (latest committed write).
    pub tip_seq:         u64,
    /// Lowest seq currently in the seq index (0 if empty).
    pub indexed_seq_min: u64,
    /// Highest seq currently in the seq index.
    pub indexed_seq_max: u64,
    /// Number of seqs currently resolvable via the index. This can be less than
    /// `indexed_seq_max - indexed_seq_min + 1` when the range has gaps.
    pub indexed_count:   usize,
}
81
/// The DAG database: a content-addressed object store plus the indexes layered
/// over it (id index, optional sorted indexes, causal graph, in-memory seq
/// index). It also holds the cached Merkle head / tip state that is persisted
/// through MANIFEST.
pub struct Db {
    /// Object store holding every node version, addressed by content hash.
    pub objects:        ObjectStore,
    /// `(coll, id) -> object hash` of the live version of each document.
    pub id_index:       IdIndex,
    /// Optional per-`(coll, field)` ordered indexes consulted by `order_by_*`.
    pub sorted_indexes: SortedIndexes,
    /// Causal edges (`caused_by` / `caused_by_rev`) keyed by object hash.
    pub graph:          GraphStore,
    /// Database directory. `Db::in_memory()` uses the sentinel `:memory:`,
    /// which the MANIFEST flush paths treat as "never touch disk".
    pub root:           PathBuf,
    /// Dirty flag — set true when head changes, cleared after manifest flush.
    /// Decouples flush_manifest from the hot write path so concurrent writes
    /// don't serialise on 2× file I/O per PUT.
    manifest_dirty:     Arc<AtomicBool>,
    /// Next seq to assign. Writers reserve seqs with `fetch_add`, so every
    /// assigned seq is unique; a failed write leaves a gap.
    pub seq:            AtomicU64,
    /// Cached Merkle head — updated incrementally on every write (O(1)).
    head:               RwLock<String>,
    /// `(seq, object hash)` of the most recent write (highest seq). Mirrors `head`
    /// but holds the tip's content hash, so `tip()` can resolve the last node O(1)
    /// on a warm boot when the in-memory `seq_index` is still cold. The seq rides
    /// along so concurrent writers can settle the tip by HIGHEST SEQ rather than
    /// arrival order (a slow older put must never clobber a newer tip). Only the
    /// hash is persisted in MANIFEST — format unchanged.
    tip_hash:           RwLock<(u64, String)>,
    /// Per-collection tip: `coll -> (seq, object hash)` of the highest-seq node in
    /// that collection. Kept current on every write (`update_head`, seq-guarded),
    /// restored from MANIFEST on warm boot, rebuilt by the cold scan — so
    /// `tip_collection()` is O(1) and durable across restarts in every startup
    /// regime, by construction.
    coll_tip_hash:      Arc<DashMap<String, (u64, String)>>,
    /// True once startup is fully ready (MANIFEST loaded or cold scan complete).
    /// Warm starts set this true before returning from open().
    /// Cold starts set this true in the background thread when scan completes.
    /// Writes are held with 503 until this is true; reads always proceed.
    pub startup_ready:  Arc<AtomicBool>,
    /// Seq → hash lookup for v1 compatibility. Populated by put(), put_batch(),
    /// and the cold-scan background pass. Only covers nodes from the current
    /// process session + cold-scan; older seqs not in this map cannot be resolved.
    seq_index:          Arc<DashMap<u64, String>>,
}
118
119impl Db {
120    /// Create a pure in-memory database — no disk I/O, no migration, instant startup.
121    /// Perfect for tests, hot-cache layers, and ephemeral sessions.
122    /// All data is lost when the Db is dropped.
123    pub fn in_memory() -> Self {
124        Self {
125            objects:        ObjectStore::in_memory(),
126            id_index:       IdIndex::in_memory(),
127            sorted_indexes: SortedIndexes::new(),
128            graph:          GraphStore::in_memory(),
129            root:           std::path::PathBuf::from(":memory:"),
130            seq:            AtomicU64::new(0),
131            head:           RwLock::new(String::new()),
132            tip_hash:       RwLock::new((0, String::new())),
133            coll_tip_hash:  Arc::new(DashMap::new()),
134            startup_ready:  Arc::new(AtomicBool::new(true)),  // always ready
135            manifest_dirty: Arc::new(AtomicBool::new(false)),
136            seq_index:      Arc::new(DashMap::new()),
137        }
138    }
139
140    /// Open (or create) a database. Runs v1→v2 migration automatically if log.aof is present.
141    pub fn open(db_root: &Path, dek: Option<Dek>) -> Result<Self> {
142        std::fs::create_dir_all(db_root)?;
143
144        let objects        = ObjectStore::new(db_root, dek.clone())?;
145        let id_index       = IdIndex::new(db_root)?;
146        let sorted_indexes = SortedIndexes::new();
147        let graph          = GraphStore::new(db_root)?;
148
149        let mut db = Self {
150            objects,
151            id_index,
152            sorted_indexes,
153            graph,
154            root: db_root.to_path_buf(),
155            seq:  AtomicU64::new(0),
156            head: RwLock::new(String::new()),
157            tip_hash: RwLock::new((0, String::new())),
158            coll_tip_hash: Arc::new(DashMap::new()),
159            startup_ready:  Arc::new(AtomicBool::new(false)),
160            manifest_dirty: Arc::new(AtomicBool::new(false)),
161            seq_index:      Arc::new(DashMap::new()),
162        };
163
164        // Auto-migrate v1 → v2 if needed (pass DEK so encrypted AOFs convert correctly)
165        migrate::migrate_if_needed(
166            db_root,
167            &db.objects,
168            &db.id_index,
169            &db.sorted_indexes,
170            &db.graph,
171            dek.as_ref(),
172        )?;
173
174        // Fast startup: load seq+head from MANIFEST if no sorted indexes need rebuilding.
175        // Falls back to full object scan only when necessary (first open, or post-migration).
176        db.startup_rebuild()?;
177
178        Ok(db)
179    }
180
181    /// Smart startup:
182    /// - Warm (MANIFEST exists): O(1) load → startup_ready = true immediately.
183    /// - Cold (no MANIFEST): start server immediately, run scan in background thread.
184    ///   Writes return 503 until scan completes; reads always proceed.
185    fn startup_rebuild(&mut self) -> Result<()> {
186        let manifest_path = self.root.join("MANIFEST");
187        let needs_index_rebuild = !self.sorted_indexes.is_empty();
188
189        // Warm path: MANIFEST + no sorted indexes to rebuild → instant start
190        if manifest_path.exists() && !needs_index_rebuild {
191            if let Some(m) = fs::read_to_string(&manifest_path)
192                .ok()
193                .and_then(|s| serde_json::from_str::<Manifest>(&s).ok())
194            {
195                // Self-heal: MANIFEST with an empty or short head is corrupt/stale.
196                // Fall through to cold scan so the head is rebuilt correctly from objects.
197                if m.head.len() < 8 {
198                    eprintln!("  [nedbd] MANIFEST head invalid (len={}), self-healing via cold scan", m.head.len());
199                } else {
200                    // Pre-2.5.43 MANIFEST (no persisted tip): warm-boot ANYWAY.
201                    //
202                    // The old policy forced a full cold scan "once to upgrade" —
203                    // on multi-million-object embedded stores (itcd -dagv3:
204                    // 1.7M+ objects per database) that scan is hours of random
205                    // reads on seek-bound media, it races the host's own boot
206                    // I/O, and if the process exits before it completes the
207                    // NEXT boot pays it again — a permanent boot tax for
208                    // exactly the deployments that can least afford it. And it
209                    // buys nothing that can't heal lazily: seq + head in the
210                    // old MANIFEST are perfectly valid, and flush_manifest
211                    // writes tip_hash + coll_tips from live state, so the very
212                    // first write + flush after boot upgrades the MANIFEST
213                    // organically. Until then tip()/tip_collection() simply
214                    // return None on this boot — exactly their documented
215                    // behavior for an unresolvable tip — and every other read
216                    // and write path is unaffected.
217                    if m.tip_hash.is_empty() {
218                        eprintln!("  [nedbd] MANIFEST predates durable tip() — warm boot; tip()/tip_collection() heal on first flush (no forced scan)");
219                    }
220                    self.seq.store(m.seq, Ordering::SeqCst); // m.seq is already the next-to-assign counter
221                    *self.head.write() = m.head.clone();
222                    // The tip's seq is the last ASSIGNED seq (m.seq is next-to-assign).
223                    *self.tip_hash.write() = (m.seq.saturating_sub(1), m.tip_hash.clone());
224                    for (coll, hash) in &m.coll_tips {
225                        // Per-coll seqs aren't persisted (MANIFEST format unchanged);
226                        // seed 0 — every future write has seq >= m.seq > 0 and wins,
227                        // and nothing older than the persisted tip can ever arrive
228                        // because the seq counter resumes at m.seq.
229                        self.coll_tip_hash.insert(coll.clone(), (0, hash.clone()));
230                    }
231                    self.startup_ready.store(true, Ordering::SeqCst);
232                    println!("  [nedbd] warm start — seq={} head={}... tip={}...",
233                        m.seq, &m.head[..8],
234                        if m.tip_hash.is_empty() { "(pre-2.5.43, heals on flush)" }
235                        else { &m.tip_hash[..8.min(m.tip_hash.len())] });
236                    return Ok(());
237                }
238            } else {
239                eprintln!("  [nedbd] MANIFEST corrupt or missing, falling back to cold scan");
240            }
241        }
242
243        // Cold path: mark as not ready, return immediately.
244        // The actual background scan is started by Db::start_cold_scan(arc)
245        // which is called from Manager::open_all() AFTER Arc::new(db) — when
246        // the Db is heap-allocated and its field addresses are permanently stable.
247        // Capturing field addresses here would cause UB: Db moves on return.
248        println!("  [nedbd] cold start — background scan will start after heap allocation");
249        Ok(())
250    }
251
252    /// Call this from Manager::open_all() after Arc::new(db).
253    /// Spawns the cold scan background thread with stable heap addresses.
254    /// No-op if startup is already complete (warm start).
255    pub fn start_cold_scan(self_arc: Arc<Self>) {
256        if self_arc.startup_ready.load(Ordering::SeqCst) {
257            return; // warm start — already ready
258        }
259        // Fast path: if the database is empty (new or just created), skip the
260        // background thread entirely. No objects to scan = instant startup.
261        if self_arc.objects.all_hashes().next().is_none() {
262            self_arc.startup_ready.store(true, Ordering::SeqCst);
263            return;
264        }
265        println!("  [nedbd] cold start — background scan starting, server accepting reads now");
266        std::thread::spawn(move || {
267            let db = self_arc;
268            cold_scan_background_arc(db);
269        });
270    }
271
272    /// Write a document. Returns the new node with its content hash set.
273    pub fn put(
274        &self,
275        coll: &str,
276        id: &str,
277        data: Value,
278        caused_by: Vec<String>,
279        valid_from: Option<String>,
280        valid_to:   Option<String>,
281    ) -> Result<Node> {
282        let seq  = self.seq.fetch_add(1, Ordering::SeqCst);
283        let prev = self.id_index.get(coll, id);
284
285        // Remove old node from sorted indexes (it's being superseded).
286        // Skip the old-object disk read entirely when no sorted index exists —
287        // the read (open + BLAKE2b verify + optional AES-GCM decrypt + JSON
288        // parse) was pure waste in the common unindexed case, ~2x read
289        // amplification on every update (the itcd chainstate shape).
290        if !self.sorted_indexes.is_empty() {
291            if let Some(old_hash) = &prev {
292                if let Ok(old_node) = self.objects.read(old_hash) {
293                    if let Value::Object(ref obj) = old_node.data {
294                        for (field, value) in obj {
295                            self.sorted_indexes.remove(coll, field, value, old_hash);
296                        }
297                    }
298                }
299            }
300        }
301
302        let mut node = Node {
303            id:         id.to_string(),
304            coll:       coll.to_string(),
305            seq,
306            data:       data.clone(),
307            prev,
308            caused_by:  caused_by.clone(),
309            ts:         now(),
310            valid_from,
311            valid_to,
312            hash:       String::new(),
313        };
314
315        // Write to object store (atomic, content-addressed)
316        let hash = self.objects.write(&mut node)?;
317        self.seq_index.insert(seq, hash.clone());
318
319        // Update id index (atomic file)
320        self.id_index.set(coll, id, &hash)?;
321
322        // Update sorted indexes
323        if let Value::Object(ref obj) = data {
324            for (field, value) in obj {
325                if self.sorted_indexes.has(coll, field) {
326                    self.sorted_indexes.insert(coll, field, value, &hash);
327                }
328            }
329        }
330
331        // Write causal graph edges
332        for cause in &caused_by {
333            self.graph.add_edge(&hash, "caused_by", cause)?;
334            self.graph.add_edge(cause, "caused_by_rev", &hash)?;
335        }
336
337        // Update running Merkle head: O(1) chain, no full recompute.
338        // new_head = BLAKE2b(prev_head || seq_bytes || new_object_hash)
339        self.update_head(coll, seq, &hash);
340
341        Ok(node)
342    }
343
344    /// Batch put: write N documents in parallel, preserving monotonic seq ordering.
345    /// Pre-allocates N seq numbers atomically, then parallelises object writes and
346    /// id-index updates via Rayon. Each op is independent — safe to parallelise.
347    /// Returns nodes in input order with assigned seq numbers.
348    pub fn put_batch(
349        &self,
350        ops: Vec<(String, String, Value, Vec<String>, Option<String>, Option<String>)>,
351        // (coll, id, data, caused_by, valid_from, valid_to)
352    ) -> Result<Vec<Node>> {
353        use rayon::prelude::*;
354
355        if ops.is_empty() { return Ok(vec![]); }
356        let n = ops.len() as u64;
357
358        // Pre-allocate N consecutive seq numbers — preserves ordering under concurrency
359        let base_seq = self.seq.fetch_add(n, Ordering::SeqCst);
360        let ts = now();
361
362        // Build nodes with assigned seq numbers
363        let index_live = !self.sorted_indexes.is_empty();
364        let mut nodes: Vec<Node> = ops.into_iter().enumerate().map(|(i, (coll, id, data, caused_by, valid_from, valid_to))| {
365            let prev = self.id_index.get(&coll, &id);
366            // Parity with put(): drop the superseded version's values from any
367            // sorted indexes, so top-k never returns stale hashes after a batch
368            // update. Without this, batch updates left the old version's index
369            // entries in place — ORDER BY surfaced superseded rows alongside
370            // current ones. Only pay the old-object read when an index exists.
371            if index_live {
372                if let Some(old_hash) = &prev {
373                    if let Ok(old_node) = self.objects.read(old_hash) {
374                        if let Value::Object(ref obj) = old_node.data {
375                            for (field, value) in obj {
376                                self.sorted_indexes.remove(&coll, field, value, old_hash);
377                            }
378                        }
379                    }
380                }
381            }
382            Node {
383                id, coll, seq: base_seq + i as u64,
384                data, prev, caused_by,
385                ts, valid_from, valid_to,
386                hash: String::new(),
387            }
388        }).collect();
389
390        // Parallel object writes (content-addressed, idempotent, safe to parallelise)
391        let write_errors: Vec<anyhow::Error> = nodes.par_iter_mut()
392            .filter_map(|node| self.objects.write(node).err())
393            .collect();
394        if let Some(e) = write_errors.into_iter().next() { return Err(e); }
395
396        // Parallel id-index updates
397        let index_errors: Vec<anyhow::Error> = nodes.par_iter()
398            .filter_map(|node| self.id_index.set(&node.coll, &node.id, &node.hash).err())
399            .collect();
400        if let Some(e) = index_errors.into_iter().next() { return Err(e); }
401
402        // Sorted indexes + causal graph (sequential — small overhead, usually no indexes)
403        for node in &nodes {
404            self.seq_index.insert(node.seq, node.hash.clone());
405            if let Value::Object(ref obj) = node.data {
406                for (field, value) in obj {
407                    if self.sorted_indexes.has(&node.coll, field) {
408                        self.sorted_indexes.insert(&node.coll, field, value, &node.hash);
409                    }
410                }
411            }
412            for cause in &node.caused_by {
413                self.graph.add_edge(&node.hash, "caused_by", cause).ok();
414                self.graph.add_edge(cause, "caused_by_rev", &node.hash).ok();
415            }
416        }
417
418        // Single Merkle head update for the whole batch (chain all hashes)
419        for node in &nodes {
420            self.update_head(&node.coll, node.seq, &node.hash);
421        }
422
423        Ok(nodes)
424    }
425
426    /// Update the running Merkle head with a new write. O(1); no file I/O — the
427    /// background ticker flushes MANIFEST.
428    ///
429    /// Concurrency contract (this function is reached by parallel `put()`s —
430    /// the server runs puts on blocking threads):
431    /// - The head chain is extended under ONE write lock held across the whole
432    ///   read-modify-write. The old read-then-write shape let two concurrent
433    ///   writers both read the same prev head; one contribution was silently
434    ///   dropped from the chain — a corrupted tamper-evidence primitive. The
435    ///   chain is arrival-ordered under concurrency (a seq-ordered canonical
436    ///   head is tracked as follow-up work); what this lock guarantees is that
437    ///   EVERY write is committed into the chain exactly once.
438    /// - Tip pointers settle by HIGHEST SEQ, not arrival order: concurrent
439    ///   puts can reach here out of seq order, and "last call wins" could
440    ///   persist a stale tip into MANIFEST for the next warm boot.
441    fn update_head(&self, coll: &str, seq: u64, new_hash: &str) {
442        use blake2::{Blake2b512, Digest};
443        {
444            let mut head = self.head.write();
445            let mut h = Blake2b512::new();
446            h.update(head.as_bytes());
447            h.update(seq.to_le_bytes());
448            h.update(new_hash.as_bytes());
449            *head = hex::encode(&h.finalize()[..32]);
450        }
451        {
452            let mut tip = self.tip_hash.write();
453            if seq >= tip.0 {
454                *tip = (seq, new_hash.to_string());
455            }
456        }
457        self.coll_tip_hash
458            .entry(coll.to_string())
459            .and_modify(|t| {
460                if seq >= t.0 {
461                    *t = (seq, new_hash.to_string());
462                }
463            })
464            .or_insert_with(|| (seq, new_hash.to_string()));
465        // Mark dirty — background ticker will flush to MANIFEST (no I/O on write path)
466        self.manifest_dirty.store(true, Ordering::Release);
467    }
468
469    /// Flush both the id-index WAL and MANIFEST. Used on graceful shutdown.
470    pub fn flush_all(&self) {
471        self.id_index.flush_write_buf();
472        // v3: fsync the active segment (no-op for loose/in-memory stores).
473        // One durability point per batch instead of one fsync per object.
474        if let Err(e) = self.objects.sync() {
475            eprintln!("nedb: segment sync failed: {}", e);
476        }
477        self.flush_manifest();
478    }
479
480    /// Compact the v3 packed object store: keep the CURRENT version of every
481    /// document (from the id-index) and reclaim everything else. No-op unless
482    /// running with the v3 segment substrate (`--dag-v3` / NEDB_DAG_V3).
483    ///
484    /// This is a PRUNING operation: superseded/historical object versions are
485    /// dropped, so AS OF / TRACE over pruned versions is discarded — that is
486    /// what reclaims the space. Flushes first so all data is durable on disk
487    /// before the old segments are deleted.
488    pub fn compact(&self) -> Result<crate::segment::CompactStats> {
489        self.flush_all();
490        let mut live: std::collections::HashSet<String> = std::collections::HashSet::new();
491        for coll in self.id_index.collections() {
492            for id in self.id_index.list_ids(&coll) {
493                if let Some(h) = self.id_index.get(&coll, &id) {
494                    live.insert(h);
495                }
496            }
497        }
498        self.objects.compact(&live)
499    }
500
501    /// Flush MANIFEST to disk if dirty. No-op for in-memory databases.
502    pub fn flush_manifest_if_dirty(&self) {
503        if self.root == std::path::PathBuf::from(":memory:") { return; }
504        if self.manifest_dirty.compare_exchange(
505            true, false, Ordering::AcqRel, Ordering::Relaxed
506        ).is_ok() {
507            self.flush_manifest();
508        }
509    }
510
511    /// Atomically persist current seq+head to MANIFEST. No-op for in-memory databases.
512    pub fn flush_manifest(&self) {
513        if self.root == std::path::PathBuf::from(":memory:") { return; }
514        let seq  = self.seq.load(Ordering::SeqCst);
515        let head = self.head.read().clone();
516        let tip_hash = self.tip_hash.read().1.clone();
517        let coll_tips: std::collections::HashMap<String, String> = self.coll_tip_hash
518            .iter()
519            .map(|kv| (kv.key().clone(), kv.value().1.clone()))
520            .collect();
521        let m = Manifest { seq, head, tip_hash, coll_tips };
522        if let Ok(json) = serde_json::to_string(&m) {
523            let path = self.root.join("MANIFEST");
524            let tmp  = self.root.join("MANIFEST.tmp");
525            // fsync the tmp file BEFORE the rename: rename-without-fsync can
526            // leave a zero-length/partial MANIFEST at the final path after
527            // power loss (ext4 delayed allocation). The startup self-heal
528            // (invalid head -> cold scan) catches that, but a full rescan is
529            // exactly the cost MANIFEST exists to avoid. One fsync per flush,
530            // and flushes are already off the hot write path (ticker-driven).
531            let wrote = (|| -> std::io::Result<()> {
532                use std::io::Write;
533                let mut f = fs::File::create(&tmp)?;
534                f.write_all(json.as_bytes())?;
535                f.sync_all()
536            })();
537            if wrote.is_ok() && fs::rename(&tmp, &path).is_ok() {
538                // Make the rename itself durable (directory entry). Unix-only;
539                // on Windows directory handles don't support this and the
540                // rename is already journaled by NTFS.
541                #[cfg(unix)]
542                if let Ok(dir) = fs::File::open(&self.root) {
543                    let _ = dir.sync_all();
544                }
545            }
546        }
547    }
548
549    /// Start a background thread that flushes both the id-index WAL and MANIFEST
550    /// every `interval_ms` milliseconds.
551    /// Call this after Arc::new(db) — the Arc keeps Db alive for the thread's lifetime.
552    pub fn start_manifest_ticker(self_arc: Arc<Self>, interval_ms: u64) {
553        let db = self_arc;
554        std::thread::spawn(move || {
555            loop {
556                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
557                // Flush id-index WAL to disk (parallel Rayon writes)
558                db.id_index.flush_write_buf();
559                // Segment bytes must be durable BEFORE a MANIFEST that
560                // references them: otherwise power loss can leave MANIFEST
561                // pointing at a tip whose object bytes were still in the page
562                // cache — the torn tail is truncated on reopen and the warm
563                // boot resolves a tip that no longer exists, with the seq
564                // counter ahead of durable data. Order: sync segments, then
565                // MANIFEST. Gated on the dirty flag so an idle database pays
566                // no per-tick fsync. (flush_all already used this order; the
567                // ticker now matches it.)
568                if db.manifest_dirty.load(Ordering::Acquire) {
569                    if let Err(e) = db.objects.sync() {
570                        eprintln!("nedb: segment sync failed: {}", e);
571                    }
572                    db.flush_manifest_if_dirty();
573                }
574            }
575        });
576    }
577
578    /// Return the current Merkle head string. O(1) — read from cache.
579    pub fn head(&self) -> String {
580        self.head.read().clone()
581    }
582
583    /// Delete a document — writes a tombstone node and removes the id from the index.
584    /// The object history is preserved in the DAG; only the live id pointer is cleared.
585    pub fn delete(&self, coll: &str, id: &str) -> Result<bool> {
586        let prev = match self.id_index.get(coll, id) {
587            None => return Ok(false),   // already gone
588            Some(h) => h,
589        };
590        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
591        let mut tombstone = Node {
592            id:         format!("_del_{}", id),
593            coll:       coll.to_string(),
594            seq,
595            data:       serde_json::json!({"_deleted": id, "_prev": prev}),
596            prev:       Some(prev),
597            caused_by:  vec![],
598            ts:         now(),
599            valid_from: None,
600            valid_to:   None,
601            hash:       String::new(),
602        };
603        let hash = self.objects.write(&mut tombstone)?;
604        self.update_head(coll, seq, &hash);
605        // Remove the live id pointer — doc is now invisible to queries and list()
606        self.id_index.remove(coll, id)?;
607        Ok(true)
608    }
609
610    /// Get the current version of a document by id.
611    pub fn get(&self, coll: &str, id: &str) -> Option<Node> {
612        let hash = self.id_index.get(coll, id)?;
613        self.objects.read(&hash).ok()
614    }
615
616    /// Get a specific version of a document by object hash.
617    pub fn get_by_hash(&self, hash: &str) -> Option<Node> {
618        self.objects.read(hash).ok()
619    }
620
621    /// Get a document AS OF a specific sequence number.
622    /// Walks the version chain (prev links) backward until seq <= target.
623    pub fn get_as_of(&self, coll: &str, id: &str, target_seq: u64) -> Option<Node> {
624        let hash = self.id_index.get(coll, id)?;
625        let mut current = self.objects.read(&hash).ok()?;
626        loop {
627            if current.seq <= target_seq {
628                return Some(current);
629            }
630            let prev_hash = current.prev.as_deref()?;
631            current = self.objects.read(prev_hash).ok()?;
632        }
633    }
634
635    /// List all documents in a collection, returning current versions.
636    pub fn list(&self, coll: &str) -> Vec<Node> {
637        self.id_index
638            .list_ids(coll)
639            .into_iter()
640            .filter_map(|id| self.get(coll, &id))
641            .collect()
642    }
643
644    /// ORDER BY field ASC LIMIT n — uses sorted index if available, else falls back to full scan.
645    pub fn order_by_asc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
646        if self.sorted_indexes.has(coll, field) {
647            self.sorted_indexes
648                .top_k_asc(coll, field, limit)
649                .into_iter()
650                .filter_map(|h| self.objects.read(&h).ok())
651                .collect()
652        } else {
653            let mut docs = self.list(coll);
654            docs.sort_by(|a, b| {
655                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
656                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
657                av.cmp(&bv)
658            });
659            docs.truncate(limit);
660            docs
661        }
662    }
663
664    /// ORDER BY field DESC LIMIT n
665    pub fn order_by_desc(&self, coll: &str, field: &str, limit: usize) -> Vec<Node> {
666        if self.sorted_indexes.has(coll, field) {
667            self.sorted_indexes
668                .top_k_desc(coll, field, limit)
669                .into_iter()
670                .filter_map(|h| self.objects.read(&h).ok())
671                .collect()
672        } else {
673            let mut docs = self.list(coll);
674            docs.sort_by(|a, b| {
675                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
676                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
677                bv.cmp(&av)
678            });
679            docs.truncate(limit);
680            docs
681        }
682    }
683
684    /// TRACE caused_by — walk causal graph from a node.
685    pub fn trace(&self, hash: &str, reverse: bool, limit: usize) -> Vec<Node> {
686        self.graph
687            .trace(hash, "caused_by", reverse, limit)
688            .into_iter()
689            .filter_map(|h| self.objects.read(&h).ok())
690            .collect()
691    }
692
693    /// Verify tamper-evidence of all objects.
694    pub fn verify(&self) -> (usize, Vec<String>) {
695        self.objects.verify_all()
696    }
697
698    /// Create a sorted index for a (coll, field) pair.
699    pub fn create_sorted_index(&self, coll: &str, field: &str) {
700        self.sorted_indexes.ensure(coll, field);
701        // Backfill from existing objects
702        for id in self.id_index.list_ids(coll) {
703            if let Some(node) = self.get(coll, &id) {
704                if let Value::Object(ref obj) = node.data {
705                    if let Some(value) = obj.get(field) {
706                        self.sorted_indexes.insert(coll, field, value, &node.hash);
707                    }
708                }
709            }
710        }
711    }
712
713    /// Resolve a sequence number to its content hash (v1 compatibility).
714    /// Only covers nodes written in the current process session + cold-scan nodes.
715    pub fn get_hash_by_seq(&self, seq: u64) -> Option<String> {
716        self.seq_index.get(&seq).map(|r| r.clone())
717    }
718
719    /// The tip — the most recently written node (highest seq), or `None` if the
720    /// database is empty. O(1): `self.seq` is the next-to-assign counter, so the
721    /// latest write sits at `seq - 1`; we resolve it through the same
722    /// seq_index → object-store path a normal read uses, so the returned Node is
723    /// byte-identical to one fetched by id or hash (it carries its own seq, hash,
724    /// causal links, and valid-time). This is the cheap "give me the latest write"
725    /// primitive — the head of the log, not an aggregate.
726    pub fn tip(&self) -> Option<Node> {
727        let next = self.seq.load(Ordering::SeqCst);
728        if next == 0 {
729            return None; // nothing written yet
730        }
731        // Fast path: resolve the head seq through the in-memory seq index
732        // (populated by this session's writes or by the cold scan).
733        if let Some(hash) = self.get_hash_by_seq(next - 1) {
734            return self.get_by_hash(&hash);
735        }
736        // Warm-boot fallback: the seq index is still cold (warm start skips the
737        // scan), but the tip's object hash was persisted in MANIFEST and restored
738        // on open. O(1), no scan — this is what makes tip() survive a restart.
739        let th = self.tip_hash.read().1.clone();
740        if !th.is_empty() {
741            return self.get_by_hash(&th);
742        }
743        None
744    }
745
746    /// The collection-local tip — the most recent write into `coll` (highest seq in
747    /// that collection), or `None` if the collection has no writes. O(1): resolves
748    /// through `coll_tip_hash`, a dedicated per-collection map kept current on every
749    /// write (`update_head`), restored from MANIFEST on warm boot, and rebuilt by the
750    /// cold scan — durable across restarts by construction, same contract as `tip()`
751    /// for the global head. Conceptually a different index than the global `tip()`
752    /// (global head vs collection head), kept as a separate method so each is
753    /// explicit — parity with the Python reference's `tip(coll)`. Lets a consumer
754    /// resume one chain (e.g. blocks / tx / utxo) without pulling global tip and
755    /// filtering.
756    pub fn tip_collection(&self, coll: &str) -> Option<Node> {
757        let hash = self.coll_tip_hash.get(coll)?.1.clone();
758        self.get_by_hash(&hash)
759    }
760
761    /// Changefeed page: up to `limit` nodes written AFTER `after_seq` (EXCLUSIVE),
762    /// ascending by seq, wrapped in a `SinceBatch` cursor envelope. `after_seq` is
763    /// the cursor you last applied (a prior `tip()` seq or `to_seq`). `limit` bounds
764    /// the page — `0` means DEFAULT_SINCE_LIMIT, so the engine primitive can never
765    /// materialize an unbounded batch even when embedders call it directly (the
766    /// safety is here, not only in the HTTP layer). Drain by paging while
767    /// `has_more`, advancing your cursor to `to_seq`, then hand off to the live
768    /// `subscribe` edge. The append-only log IS the changefeed, so this is an
769    /// O(page) walk; unresolved seqs (outside seq_index coverage — see
770    /// `scan_status()`) are skipped rather than faked.
771    pub fn since(&self, after_seq: u64, limit: usize) -> SinceBatch {
772        let next = self.seq.load(Ordering::SeqCst);          // head + 1
773        let head_seq = next.saturating_sub(1);
774        let cap = if limit == 0 { DEFAULT_SINCE_LIMIT } else { limit };
775        let mut nodes: Vec<Node> = Vec::new();
776        let mut to_seq = after_seq;
777        let mut hit_limit = false;
778        let mut s = after_seq.saturating_add(1);
779        while s < next {
780            if nodes.len() >= cap { hit_limit = true; break; }
781            if let Some(hash) = self.get_hash_by_seq(s) {
782                if let Some(node) = self.get_by_hash(&hash) {
783                    to_seq = node.seq;
784                    nodes.push(node);
785                }
786            }
787            s += 1;
788        }
789        SinceBatch { nodes, from_seq: after_seq, to_seq, head_seq, has_more: hit_limit }
790    }
791
792    /// Replication readiness — see `ScanStatus`. `scan_complete` gates safe
793    /// historical catch-up: a consumer pulling an old cursor right after a cold
794    /// start must wait for it, or `since()` may hand back a partial page that looks
795    /// like "caught up". Computes the indexed range by scanning the in-memory seq
796    /// index (O(index)) — intended for periodic status polls, not the per-write
797    /// hot path.
798    pub fn scan_status(&self) -> ScanStatus {
799        let next = self.seq.load(Ordering::SeqCst);
800        let mut min = u64::MAX;
801        let mut max = 0u64;
802        let mut count = 0usize;
803        for kv in self.seq_index.iter() {
804            let s = *kv.key();
805            if s < min { min = s; }
806            if s > max { max = s; }
807            count += 1;
808        }
809        if count == 0 { min = 0; }
810        ScanStatus {
811            scan_complete:   self.startup_ready.load(Ordering::SeqCst),
812            tip_seq:         next.saturating_sub(1),
813            indexed_seq_min: min,
814            indexed_seq_max: max,
815            indexed_count:   count,
816        }
817    }
818
819    /// Add an explicit named relation edge between two documents.
820    /// Add an explicit named relation between two "coll:id" nodes.
821    /// Relations stored as __links__ documents — NQL-queryable, time-travelable,
822    /// consistent with the PyO3 binding which uses the same __links__ convention.
823    pub fn link(&self, frm: &str, rel: &str, to: &str) -> Result<()> {
824        let (frm_coll, frm_id) = frm.split_once(':')
825            .ok_or_else(|| anyhow::anyhow!("link frm must be 'coll:id', got: {}", frm))?;
826        let (to_coll, to_id) = to.split_once(':')
827            .ok_or_else(|| anyhow::anyhow!("link to must be 'coll:id', got: {}", to))?;
828        if self.id_index.get(frm_coll, frm_id).is_none() {
829            anyhow::bail!("link: frm not found: {}", frm);
830        }
831        if self.id_index.get(to_coll, to_id).is_none() {
832            anyhow::bail!("link: to not found: {}", to);
833        }
834        let link_id = format!("{}|{}|{}", frm, rel, to);
835        let doc = serde_json::json!({"_from": frm, "_rel": rel, "_to": to});
836        self.put("__links__", &link_id, doc, vec![], None, None)?;
837        Ok(())
838    }
839
840    /// Remove a named relation (deletes the __links__ document).
841    pub fn unlink(&self, frm: &str, rel: &str, to: &str) -> Result<bool> {
842        let link_id = format!("{}|{}|{}", frm, rel, to);
843        self.delete("__links__", &link_id)
844    }
845
846    /// Get neighbor nodes via a named relation.
847    /// Queries __links__ — consistent with the PyO3 binding.
848    pub fn neighbors(&self, frm: &str, rel: &str) -> Vec<Node> {
849        self.id_index
850            .list_ids("__links__")
851            .into_iter()
852            .filter_map(|id| self.get("__links__", &id))
853            .filter(|node| {
854                node.data.get("_from").and_then(|v| v.as_str()) == Some(frm)
855                    && node.data.get("_rel").and_then(|v| v.as_str()) == Some(rel)
856            })
857            .filter_map(|node| {
858                let to = node.data.get("_to")?.as_str()?;
859                let (to_coll, to_id) = to.split_once(':')?;
860                self.get(to_coll, to_id)
861            })
862            .collect()
863    }
864}
865
866impl Drop for Db {
867    /// Flush buffered state when the database is closed so a write-then-drop
868    /// sequence is durable without an explicit `flush_all()`.
869    ///
870    /// `IdIndex::set` only stages updates in the in-memory WAL `write_buf`;
871    /// disk persistence happens in `flush_write_buf()`, normally driven by the
872    /// manifest ticker. A short-lived `Db` (a library user's `{ let db =
873    /// Db::open(p)?; db.put(..)?; }` block, or a test) has no ticker, so without
874    /// this its writes would be silently lost on reopen. Flushing on drop
875    /// mirrors the flush-on-close contract of other embedded stores (sled,
876    /// RocksDB).
877    ///
878    /// In production this is a harmless safety net, not the primary durability
879    /// path: the manifest ticker thread holds an `Arc<Db>` for the process
880    /// lifetime, so `Drop` only fires once every owning handle is gone. No-op
881    /// for in-memory databases (`flush_all` short-circuits on `:memory:`).
882    fn drop(&mut self) {
883        self.flush_all();
884    }
885}
886
887/// Background cold-scan worker. Takes Arc<Db> — safe, Db is on the heap.
888fn cold_scan_background_arc(db: Arc<Db>) {
889    use rayon::prelude::*;
890    use blake2::{Blake2b512, Digest};
891
892    let objects        = &db.objects;
893    let head           = &db.head;
894    let seq_atomic     = &db.seq;
895    let sorted_indexes = &db.sorted_indexes;
896    let seq_index      = &db.seq_index;
897    let ready_flag     = Arc::clone(&db.startup_ready);
898
899    let hashes: Vec<String> = objects.all_hashes().collect();
900    let total = hashes.len();
901
902    if total == 0 {
903        ready_flag.store(true, Ordering::SeqCst);
904        return;
905    }
906
907    println!("  [nedbd] background scan — {} objects...", total);
908    let t0 = std::time::Instant::now();
909    let step = (total / 10).max(1000);
910
911    // Populate the seq index AS objects are read here, not in a second pass
912    // afterward: this loop is the slow, disk-I/O-bound phase (verifying and
913    // parsing every object), and it can run for minutes on a multi-million
914    // object store. `scan_status().indexed_count` reads `seq_index`'s size, so
915    // inserting here — not after `.collect()` — is what makes that a real, live
916    // progress signal through the phase that actually takes the time, instead
917    // of reporting a flat 0 until this whole pass finishes. Safe: DashMap
918    // supports concurrent inserts, and every parallel worker here inserts a
919    // disjoint key (each object has its own seq).
920    let nodes: Vec<Node> = hashes.par_iter()
921        .enumerate()
922        .filter_map(|(i, h)| {
923            if i > 0 && i % step == 0 {
924                let pct     = i * 100 / total;
925                let elapsed = t0.elapsed().as_secs_f32();
926                let rate    = i as f32 / elapsed;
927                let eta     = (total - i) as f32 / rate;
928                eprint!("\r  [nedbd]   {:>3}%  {:>8} / {:>8}  ({:>8.0}/s  eta {:.0}s)   ",
929                    pct, i, total, rate, eta);
930            }
931            let node = objects.read(h).ok()?;
932            seq_index.insert(node.seq, node.hash.clone());
933            Some(node)
934        })
935        .collect();
936
937    eprintln!("\r  [nedbd]   100%  {:>8} / {:>8}  ({:.1}s)                        ",
938        total, total, t0.elapsed().as_secs_f32());
939
940    let max_seq = nodes.iter().map(|n| n.seq).max().unwrap_or(0);
941    seq_atomic.store(max_seq + 1, Ordering::SeqCst);
942
943    // Per-collection tip: highest-seq node's hash, per coll. `nodes` is NOT
944    // seq-ordered here (it comes from an unordered object-hash scan), so this
945    // must track the max explicitly — unlike the live write path's "last call
946    // wins" (which relies on ascending call order that a scan doesn't have).
947    let mut coll_max: std::collections::HashMap<String, (u64, String)> = std::collections::HashMap::new();
948
949    for node in &nodes {
950        // seq_index was already populated above, during the read pass.
951        coll_max.entry(node.coll.clone())
952            .and_modify(|(s, h)| if node.seq > *s { *s = node.seq; *h = node.hash.clone(); })
953            .or_insert_with(|| (node.seq, node.hash.clone()));
954        if let Value::Object(ref obj) = node.data {
955            for (field, value) in obj {
956                if sorted_indexes.has(&node.coll, field) {
957                    sorted_indexes.insert(&node.coll, field, value, &node.hash);
958                }
959            }
960        }
961    }
962
963    for (coll, (seq, hash)) in coll_max {
964        db.coll_tip_hash.insert(coll, (seq, hash));
965    }
966
967    // Compute Merkle head from sorted hashes
968    let mut sorted_hashes = hashes;
969    sorted_hashes.sort();
970    let mut h = Blake2b512::new();
971    h.update(max_seq.to_le_bytes());
972    for hash_str in &sorted_hashes {
973        h.update(hash_str.as_bytes());
974    }
975    let new_head = hex::encode(&h.finalize()[..32]);
976    *head.write() = new_head;
977
978    // Tip = the highest-seq object we indexed. Persist its hash so tip() resolves
979    // O(1) on the next warm boot, before any scan repopulates the seq index.
980    let tip_hash = db.seq_index.iter()
981        .max_by_key(|kv| *kv.key())
982        .map(|kv| kv.value().clone())
983        .unwrap_or_default();
984    *db.tip_hash.write() = (max_seq, tip_hash);
985
986    // Write MANIFEST through the one canonical writer. The hand-rolled write
987    // this replaces stored `seq: max_seq` (the last USED seq) — but the warm
988    // boot loads `m.seq` as the NEXT-TO-ASSIGN counter, so a restart right
989    // after a quiet cold scan handed the next write the tip's seq: a duplicate
990    // seq in the log (seq_index overwrite, wrong since() page). flush_manifest
991    // reads the live counter (already max_seq + 1) — correct by construction.
992    db.flush_manifest();
993
994    // Signal server: writes can now proceed
995    ready_flag.store(true, Ordering::SeqCst);
996    println!("  [nedbd] background scan complete — seq={} objects={} MANIFEST written", max_seq, total);
997}
998
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reads earlier than the epoch.
fn now() -> f64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
1005
#[cfg(test)]
mod tests {
    // Core engine behaviour: point reads, sorted-index ORDER BY, causal
    // tracing, and time-travel reads.
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn put_and_get() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let payload = serde_json::json!({"height": 618000, "hash": "0000abc"});
        db.put("blocks", "618000", payload, vec![], None, None).unwrap();

        let fetched = db.get("blocks", "618000").unwrap();
        assert_eq!(fetched.id, "618000");
        assert_eq!(fetched.data["height"], 618000);
    }

    #[test]
    fn order_by_with_sorted_index() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");
        for height in [3u64, 1, 5, 2, 4] {
            let id = height.to_string();
            db.put("blocks", &id, serde_json::json!({"height": height}), vec![], None, None)
                .unwrap();
        }

        let lowest_three: Vec<u64> = db
            .order_by_asc("blocks", "height", 3)
            .iter()
            .filter_map(|n| n.data["height"].as_u64())
            .collect();
        assert_eq!(lowest_three, vec![1, 2, 3]);
    }

    #[test]
    fn causal_trace() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let create = db
            .put("ops", "a", serde_json::json!({"op": "create"}), vec![], None, None)
            .unwrap();
        let transfer = db
            .put("ops", "b", serde_json::json!({"op": "transfer"}), vec![create.hash.clone()], None, None)
            .unwrap();
        let burn = db
            .put("ops", "c", serde_json::json!({"op": "burn"}), vec![transfer.hash.clone()], None, None)
            .unwrap();

        // burn -> transfer -> create
        assert_eq!(db.trace(&burn.hash, false, 10).len(), 3);
    }

    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        let _second = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let historical = db.get_as_of("docs", "x", first.seq).unwrap();
        assert_eq!(historical.data["v"], 1);
        let latest = db.get("docs", "x").unwrap();
        assert_eq!(latest.data["v"], 2);
    }
}
1067
1068#[cfg(test)]
1069mod tests_v2 {
1070    use super::*;
1071    use tempfile::tempdir;
1072
1073    #[test]
1074    fn seq_index_populated_on_put() {
1075        let db = Db::in_memory();
1076        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
1077        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
1078        assert_eq!(db.get_hash_by_seq(a.seq), Some(a.hash.clone()));
1079        assert_eq!(db.get_hash_by_seq(b.seq), Some(b.hash.clone()));
1080        assert_eq!(db.get_hash_by_seq(9999), None);
1081    }
1082
1083    #[test]
1084    fn tip_and_since() {
1085        let db = Db::in_memory();
1086        // Empty db: no tip, empty changefeed.
1087        assert!(db.tip().is_none());
1088        assert!(db.since(0, 0).nodes.is_empty());
1089
1090        let a = db.put("item", "a", serde_json::json!({"x": 1}), vec![], None, None).unwrap();
1091        let b = db.put("item", "b", serde_json::json!({"x": 2}), vec![], None, None).unwrap();
1092
1093        // tip() = the most recent write (highest seq), returned as a full node.
1094        let t = db.tip().expect("tip after writes");
1095        assert_eq!(t.seq, b.seq);
1096        assert_eq!(t.id, "b");
1097        assert_eq!(t.hash, b.hash);
1098
1099        // since(after_seq, limit) — EXCLUSIVE cursor, bounded page + envelope.
1100        let after_a = db.since(a.seq, 0);
1101        assert_eq!(after_a.nodes.len(), 1);
1102        assert_eq!(after_a.nodes[0].id, "b");
1103        assert_eq!(after_a.from_seq, a.seq);
1104        assert_eq!(after_a.to_seq, b.seq);
1105        assert_eq!(after_a.head_seq, b.seq);
1106        assert!(!after_a.has_more);
1107
1108        // Nothing written after the tip.
1109        assert!(db.since(b.seq, 0).nodes.is_empty());
1110
1111        // `limit` bounds the page and sets has_more; resume from to_seq.
1112        let c = db.put("item", "c", serde_json::json!({"x": 3}), vec![], None, None).unwrap();
1113        let page = db.since(a.seq, 1);             // (a..] capped at 1 -> [b], more pending
1114        assert_eq!(page.nodes.len(), 1);
1115        assert_eq!(page.nodes[0].id, "b");
1116        assert_eq!(page.to_seq, b.seq);
1117        assert!(page.has_more);
1118        let page2 = db.since(page.to_seq, 1);      // resume from b -> [c], done
1119        assert_eq!(page2.nodes.len(), 1);
1120        assert_eq!(page2.nodes[0].id, "c");
1121        assert_eq!(page2.to_seq, c.seq);
1122        assert!(!page2.has_more);
1123    }
1124
1125    #[test]
1126    fn tip_collection_per_chain() {
1127        // The ITC sync-client case: separate chains in separate collections; a
1128        // consumer resumes ONE without pulling global tip and filtering.
1129        let db = Db::in_memory();
1130        assert!(db.tip_collection("blocks").is_none());
1131
1132        db.put("blocks", "b0", serde_json::json!({"h": 0}), vec![], None, None).unwrap();
1133        db.put("tx",     "t0", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1134        let b1 = db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1135        let t1 = db.put("tx",     "t1", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
1136
1137        // global tip = latest write overall (t1)
1138        assert_eq!(db.tip().unwrap().id, "t1");
1139        // collection-local tips = latest write in each collection
1140        let bt = db.tip_collection("blocks").expect("blocks tip");
1141        assert_eq!(bt.id, "b1");
1142        assert_eq!(bt.seq, b1.seq);
1143        assert_eq!(db.tip_collection("tx").unwrap().seq, t1.seq);
1144        assert!(db.tip_collection("absent").is_none());
1145    }
1146
1147    #[test]
1148    fn seq_index_survives_batch() {
1149        let db = Db::in_memory();
1150        let nodes = db.put_batch(vec![
1151            ("item".into(), "x".into(), serde_json::json!({"v": 1}), vec![], None, None),
1152            ("item".into(), "y".into(), serde_json::json!({"v": 2}), vec![], None, None),
1153        ]).unwrap();
1154        for node in &nodes {
1155            assert_eq!(db.get_hash_by_seq(node.seq), Some(node.hash.clone()));
1156        }
1157    }
1158
1159    /// Regression: put_batch must remove the superseded version's sorted-index
1160    /// entries, exactly like put() does. Old behavior left the old hashes in
1161    /// the BTree — ORDER BY returned superseded rows alongside current ones
1162    /// (they resolve fine through the content-addressed store, which made the
1163    /// stale rows look legitimate).
1164    #[test]
1165    fn put_batch_removes_superseded_sorted_index_entries() {
1166        let db = Db::in_memory();
1167        db.create_sorted_index("blocks", "height");
1168        db.put("blocks", "x", serde_json::json!({"height": 1}), vec![], None, None).unwrap();
1169        db.put_batch(vec![
1170            ("blocks".into(), "x".into(), serde_json::json!({"height": 99}), vec![], None, None),
1171        ]).unwrap();
1172
1173        let asc = db.order_by_asc("blocks", "height", 10);
1174        assert_eq!(asc.len(), 1, "stale index entry for the superseded version must be gone");
1175        assert_eq!(asc[0].data["height"], 99);
1176        assert_eq!(asc[0].id, "x");
1177    }
1178
1179    /// Updates without any sorted index must keep full version-chain semantics
1180    /// (guards the new skip-old-object-read fast path in put()).
1181    #[test]
1182    fn update_without_indexes_preserves_chain() {
1183        let db = Db::in_memory();
1184        let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1185        let v2 = db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
1186        assert_eq!(v2.prev.as_deref(), Some(v1.hash.as_str()), "prev chain must survive the fast path");
1187        assert_eq!(db.get("docs", "x").unwrap().data["v"], 2);
1188        assert_eq!(db.get_as_of("docs", "x", v1.seq).unwrap().data["v"], 1);
1189    }
1190
1191    #[test]
1192    fn link_and_neighbors() {
1193        let db = Db::in_memory();
1194        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
1195        db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
1196        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1197        db.put("trip",   "t2", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1198
1199        db.link("driver:d1", "handles", "trip:t1").unwrap();
1200        db.link("driver:d1", "handles", "trip:t2").unwrap();
1201        db.link("driver:d2", "handles", "trip:t1").unwrap();
1202
1203        let d1_trips = db.neighbors("driver:d1", "handles");
1204        assert_eq!(d1_trips.len(), 2);
1205        let ids: std::collections::HashSet<&str> = d1_trips.iter().map(|n| n.id.as_str()).collect();
1206        assert!(ids.contains("t1") && ids.contains("t2"));
1207
1208        let d2_trips = db.neighbors("driver:d2", "handles");
1209        assert_eq!(d2_trips.len(), 1);
1210        assert_eq!(d2_trips[0].id, "t1");
1211    }
1212
1213    #[test]
1214    fn link_stored_in_links_collection() {
1215        // Links are stored as __links__ documents, not as graph edges.
1216        // The __links__ collection is NQL-queryable and consistent with the PyO3 binding.
1217        let db = Db::in_memory();
1218        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
1219        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1220        db.link("driver:d1", "handles", "trip:t1").unwrap();
1221        // Verify the __links__ document was created
1222        let link_doc = db.get("__links__", "driver:d1|handles|trip:t1");
1223        assert!(link_doc.is_some(), "__links__ doc should exist");
1224        let doc = link_doc.unwrap();
1225        assert_eq!(doc.data["_from"], "driver:d1");
1226        assert_eq!(doc.data["_rel"],  "handles");
1227        assert_eq!(doc.data["_to"],   "trip:t1");
1228        // neighbors() resolves to the target node
1229        let nb = db.neighbors("driver:d1", "handles");
1230        assert_eq!(nb.len(), 1);
1231        assert_eq!(nb[0].id, "t1");
1232    }
1233
1234    #[test]
1235    fn link_missing_node_errors() {
1236        let db = Db::in_memory();
1237        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
1238        assert!(db.link("driver:d1", "handles", "trip:ghost").is_err());
1239    }
1240
1241    #[test]
1242    fn link_durable_survives_reopen() {
1243        let dir = tempdir().unwrap();
1244        {
1245            let db = Db::open(dir.path(), None).unwrap();
1246            db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
1247            db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
1248            db.link("driver:d1", "handles", "trip:t1").unwrap();
1249        }
1250        let db2 = Db::open(dir.path(), None).unwrap();
1251        db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
1252        let trips = db2.neighbors("driver:d1", "handles");
1253        assert_eq!(trips.len(), 1);
1254        assert_eq!(trips[0].id, "t1");
1255    }
1256
1257    #[test]
1258    fn tip_survives_warm_restart() {
1259        // v2.5.43: tip() returns the last written object AND survives a warm restart.
1260        // On reopen the seq_index is cold (warm start skips the scan), so tip() must
1261        // resolve the last write via the MANIFEST tip_hash fallback — no scan.
1262        let dir = tempdir().unwrap();
1263        {
1264            let db = Db::open(dir.path(), None).unwrap();
1265            db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1266            db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
1267            db.flush_all(); // persists MANIFEST incl. tip_hash
1268            assert_eq!(db.tip().expect("tip in-session").id, "b2");
1269        }
1270        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
1271        let db2 = Db::open(dir.path(), None).unwrap();
1272        assert!(db2.get_hash_by_seq(1).is_none(), "seq_index is cold on a warm boot");
1273        let tip = db2.tip().expect("tip() must survive a warm restart");
1274        assert_eq!(tip.id, "b2");
1275        assert_eq!(tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
1276    }
1277
1278    #[test]
1279    fn tip_collection_survives_warm_restart() {
1280        // Same contract as tip(), per collection: itc-node-rs resumes headers /
1281        // blocks / l2_receipts independently, so each must be its own durable
1282        // resume point — not just the global tip.
1283        let dir = tempdir().unwrap();
1284        {
1285            let db = Db::open(dir.path(), None).unwrap();
1286            db.put("blocks", "b1", serde_json::json!({"h": 1}), vec![], None, None).unwrap();
1287            db.put("tx",     "t1", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
1288            let b2 = db.put("blocks", "b2", serde_json::json!({"h": 2}), vec![], None, None).unwrap();
1289            db.flush_all(); // persists MANIFEST incl. coll_tips
1290            assert_eq!(db.tip_collection("blocks").unwrap().id, "b2");
1291            assert_eq!(db.tip_collection("blocks").unwrap().seq, b2.seq);
1292        }
1293        // Warm reopen: MANIFEST present -> no cold scan -> seq_index cold.
1294        let db2 = Db::open(dir.path(), None).unwrap();
1295        assert!(db2.get_hash_by_seq(0).is_none(), "seq_index is cold on a warm boot");
1296        let blocks_tip = db2.tip_collection("blocks").expect("tip_collection must survive a warm restart");
1297        assert_eq!(blocks_tip.id, "b2");
1298        assert_eq!(blocks_tip.data.get("h").and_then(|v| v.as_i64()), Some(2));
1299        let tx_tip = db2.tip_collection("tx").expect("tx tip must also survive");
1300        assert_eq!(tx_tip.id, "t1");
1301        assert!(db2.tip_collection("absent").is_none());
1302    }
1303
1304    #[test]
1305    fn cold_scan_indexes_every_object_and_reports_completion() {
1306        // Regression guard for the cold-scan refactor: seq_index is now populated
1307        // DURING the parallel read pass (for live scan_status().indexed_count
1308        // progress — see cold_scan_background_arc), not in a second pass
1309        // afterward. This asserts the end state is unchanged: every written
1310        // object is indexed, tip()/tip_collection() are correct, and
1311        // scan_complete eventually reports true.
1312        let dir = tempdir().unwrap();
1313        let n = 25u64;
1314        {
1315            let db = Db::open(dir.path(), None).unwrap();
1316            for i in 0..n {
1317                db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None).unwrap();
1318            }
1319            db.flush_all();
1320        }
1321        // Force a COLD start regardless of the MANIFEST nedb-v2 itself would
1322        // have written: delete it so startup_rebuild() takes the cold path and
1323        // start_cold_scan() actually spawns the background scan this test needs
1324        // to exercise.
1325        std::fs::remove_file(dir.path().join("MANIFEST")).unwrap();
1326
1327        let db = Db::open(dir.path(), None).unwrap();
1328        assert!(!db.scan_status().scan_complete, "should be cold immediately after open");
1329        let db = std::sync::Arc::new(db);
1330        Db::start_cold_scan(std::sync::Arc::clone(&db));
1331
1332        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
1333        while !db.scan_status().scan_complete {
1334            assert!(std::time::Instant::now() < deadline, "cold scan did not complete in time");
1335            std::thread::sleep(std::time::Duration::from_millis(5));
1336        }
1337
1338        let status = db.scan_status();
1339        assert_eq!(status.indexed_count, n as usize, "every written object must be indexed");
1340        assert!(status.scan_complete);
1341
1342        let tip = db.tip().expect("tip resolves after cold scan");
1343        assert_eq!(tip.data.get("i").and_then(|v| v.as_u64()), Some(n - 1));
1344        let coll_tip = db.tip_collection("things").expect("tip_collection resolves after cold scan");
1345        assert_eq!(coll_tip.id, tip.id);
1346    }
1347
1348    /// Concurrent writers must settle the tip at the HIGHEST SEQ, and that tip
1349    /// must survive a warm restart. Before the seq-guarded tip fix, update_head
1350    /// was "last call wins": a slower thread carrying an OLDER seq could
1351    /// overwrite tip_hash after a newer write, and MANIFEST then persisted the
1352    /// stale tip for the next warm boot (flaky by nature — this pins the
1353    /// contract deterministically for the fixed code).
1354    #[test]
1355    fn concurrent_puts_tip_resolves_to_highest_seq_after_warm_restart() {
1356        let dir = tempdir().unwrap();
1357        let total: u64 = 100;
1358        {
1359            let db = std::sync::Arc::new(Db::open(dir.path(), None).unwrap());
1360            let mut handles = vec![];
1361            for t in 0..4u64 {
1362                let db2 = std::sync::Arc::clone(&db);
1363                handles.push(std::thread::spawn(move || {
1364                    for i in 0..25u64 {
1365                        db2.put("c", &format!("{}-{}", t, i),
1366                                serde_json::json!({"t": t, "i": i}),
1367                                vec![], None, None).unwrap();
1368                    }
1369                }));
1370            }
1371            for h in handles { h.join().unwrap(); }
1372            // In-session: tip must be the highest assigned seq.
1373            let expected = db.seq.load(std::sync::atomic::Ordering::SeqCst) - 1;
1374            assert_eq!(expected, total - 1, "exactly {} writes expected", total);
1375            assert_eq!(db.tip().expect("in-session tip").seq, expected);
1376            db.flush_all(); // persist MANIFEST incl. tip_hash
1377        }
1378        // Warm reopen: seq_index cold; tip() resolves via MANIFEST tip_hash.
1379        let db2 = Db::open(dir.path(), None).unwrap();
1380        let tip = db2.tip().expect("tip must survive warm restart after concurrent writes");
1381        assert_eq!(tip.seq, total - 1, "warm-boot tip must be the highest-seq write");
1382        // Per-collection tip: same contract.
1383        let ct = db2.tip_collection("c").expect("coll tip survives");
1384        assert_eq!(ct.seq, total - 1);
1385    }
1386
1387    /// Pre-2.5.43 MANIFESTs (no tip_hash) must warm-boot, NOT force a cold
1388    /// scan. The old "cold scan once to upgrade" policy was hours of random
1389    /// reads on multi-million-object seek-bound stores (itcd -dagv3), re-paid
1390    /// on every boot if the process exited before the scan finished. seq+head
1391    /// in the old MANIFEST are valid; tip()/tip_collection() return None until
1392    /// the first write+flush organically rewrites MANIFEST with a tip.
1393    #[test]
1394    fn pre_durable_tip_manifest_warm_boots_and_heals_lazily() {
1395        let dir = tempdir().unwrap();
1396        {
1397            let db = Db::open(dir.path(), None).unwrap();
1398            for i in 0..5u64 {
1399                db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None).unwrap();
1400            }
1401            db.flush_all();
1402        }
1403        // Rewrite MANIFEST in the pre-2.5.43 shape: seq + head only.
1404        let manifest_path = dir.path().join("MANIFEST");
1405        let m: serde_json::Value =
1406            serde_json::from_str(&std::fs::read_to_string(&manifest_path).unwrap()).unwrap();
1407        let old_format = serde_json::json!({ "seq": m["seq"], "head": m["head"] });
1408        std::fs::write(&manifest_path, serde_json::to_string(&old_format).unwrap()).unwrap();
1409
1410        // Reopen: must be WARM (startup_ready immediately — no cold scan gate).
1411        let db2 = Db::open(dir.path(), None).unwrap();
1412        assert!(db2.startup_ready.load(std::sync::atomic::Ordering::SeqCst),
1413                "pre-2.5.43 MANIFEST must warm-boot, not fall to a cold scan");
1414        // tip() unresolvable this boot — documented None, not a panic or scan.
1415        assert!(db2.tip().is_none(), "tip() is None until the manifest heals");
1416        // seq continuity: a new write gets a FRESH seq (no reuse).
1417        let n = db2.put("things", "next", serde_json::json!({"fresh": true}), vec![], None, None).unwrap();
1418        assert_eq!(n.seq, m["seq"].as_u64().unwrap(), "next write takes the persisted next-to-assign seq");
1419        db2.flush_all(); // organic upgrade: MANIFEST now carries tip_hash
1420        drop(db2);
1421
1422        // Healed: next boot is warm AND tip() resolves.
1423        let db3 = Db::open(dir.path(), None).unwrap();
1424        assert!(db3.startup_ready.load(std::sync::atomic::Ordering::SeqCst));
1425        let tip = db3.tip().expect("tip() must resolve after the organic upgrade");
1426        assert_eq!(tip.id, "next");
1427    }
1428
1429    /// Regression for the cold-scan MANIFEST seq off-by-one. The scan's old
1430    /// hand-rolled MANIFEST stored `seq: max_seq` (the last USED seq), but the
1431    /// warm boot loads `m.seq` as the NEXT-TO-ASSIGN counter — so a restart
1432    /// right after a quiet cold scan handed the next write the tip's seq:
1433    /// a DUPLICATE seq in the log (seq_index overwrite, wrong since() page).
1434    /// The scan now writes MANIFEST via flush_manifest(), which reads the live
1435    /// counter (max_seq + 1).
1436    #[test]
1437    fn manifest_after_cold_scan_does_not_reuse_tip_seq() {
1438        let dir = tempdir().unwrap();
1439        let old_tip_seq;
1440        {
1441            let db = Db::open(dir.path(), None).unwrap();
1442            for i in 0..5u64 {
1443                db.put("things", &i.to_string(), serde_json::json!({"i": i}), vec![], None, None).unwrap();
1444            }
1445            db.flush_all();
1446            old_tip_seq = db.tip().unwrap().seq;
1447        }
1448        // Force a cold start: remove MANIFEST so the background scan runs and
1449        // writes a fresh MANIFEST itself.
1450        std::fs::remove_file(dir.path().join("MANIFEST")).unwrap();
1451        {
1452            let db = std::sync::Arc::new(Db::open(dir.path(), None).unwrap());
1453            Db::start_cold_scan(std::sync::Arc::clone(&db));
1454            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
1455            while !db.scan_status().scan_complete {
1456                assert!(std::time::Instant::now() < deadline, "cold scan did not complete");
1457                std::thread::sleep(std::time::Duration::from_millis(5));
1458            }
1459            // No further writes — the scan's own MANIFEST is what the next boot sees.
1460        }
1461        // Warm reopen from the scan-written MANIFEST: the next write must get a
1462        // FRESH seq, never the tip's.
1463        let db3 = Db::open(dir.path(), None).unwrap();
1464        let tip_before = db3.tip().expect("tip survives scan-written MANIFEST");
1465        assert_eq!(tip_before.seq, old_tip_seq, "tip identity preserved across the scan");
1466        let new_node = db3.put("things", "next", serde_json::json!({"fresh": true}),
1467                               vec![], None, None).unwrap();
1468        assert!(new_node.seq > old_tip_seq,
1469                "new write reused seq {} (tip was {}) — duplicate seq in the log",
1470                new_node.seq, old_tip_seq);
1471    }
1472}