Skip to main content

nedb_engine/
index.rs

1//! Index store for NEDB v2.
2//!
3//! Two index types:
4//!
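//! 1. **ID index** (`indexes/{coll}/id/{shard}/{encoded doc_id}` → object hash)
//!    Atomic file-per-document, sharded across 256 subdirectories. Reads check
//!    the in-memory write buffer first, then fall back to a single
//!    `fs::read_to_string`. Writes are buffered and flushed to disk atomically
//!    (write .tmp → rename). Parallel reads are lock-free.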
8//!
9//! 2. **Sorted index** (`indexes/{coll}/{field}.sorted` → in-memory BTreeMap)
10//!    Rebuilt from object store on startup. Persisted as a compact binary
11//!    file for fast cold start. Used for ORDER BY field ASC/DESC LIMIT n.
12
13use std::collections::BTreeMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use anyhow::Result;
18use dashmap::DashMap;
19use serde_json::Value;
20
/// Totally ordered projection of a JSON value, used as a BTree index key.
///
/// Values sort by type first (null < bool < number < string < array < object)
/// and by payload second. Equality is defined through [`Ord`], so `a == b`
/// holds exactly when `a.cmp(&b)` is `Equal`. In particular `NaN` equals
/// itself, while `-0.0` and `0.0` are distinct keys — the same verdicts the
/// ordering gives, which keeps the `Eq`/`Ord` contract intact.
#[derive(Debug, Clone)]
pub enum OrderedValue {
    Null,
    Bool(bool),
    /// Compared with `f64::total_cmp`, so every float (NaN included) has a slot.
    Number(f64),
    Str(String),
    /// Compared element by element (lexicographic).
    Array(Vec<OrderedValue>),
    /// Objects carry no payload; every object compares equal to every other.
    Object,
}

impl OrderedValue {
    /// Position of the variant in the cross-type ordering.
    fn type_rank(&self) -> u8 {
        match self {
            Self::Null => 0,
            Self::Bool(_) => 1,
            Self::Number(_) => 2,
            Self::Str(_) => 3,
            Self::Array(_) => 4,
            Self::Object => 5,
        }
    }
}

impl PartialEq for OrderedValue {
    /// Equality must agree with `cmp`; a derived `PartialEq` would compare
    /// floats with `==` and disagree with `total_cmp` on NaN and signed zero.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for OrderedValue {}

impl PartialOrd for OrderedValue {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedValue {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match (self, other) {
            // Same payload-carrying variant: order by payload.
            (Self::Bool(a), Self::Bool(b)) => a.cmp(b),
            (Self::Number(a), Self::Number(b)) => a.total_cmp(b),
            (Self::Str(a), Self::Str(b)) => a.cmp(b),
            (Self::Array(a), Self::Array(b)) => a.cmp(b),
            // Different variants, or two payload-free ones (Null/Null,
            // Object/Object): the type rank alone decides.
            _ => self.type_rank().cmp(&other.type_rank()),
        }
    }
}
64
65impl From<&Value> for OrderedValue {
66    fn from(v: &Value) -> Self {
67        match v {
68            Value::Null        => OrderedValue::Null,
69            Value::Bool(b)     => OrderedValue::Bool(*b),
70            Value::Number(n)   => OrderedValue::Number(n.as_f64().unwrap_or(f64::NAN)),
71            Value::String(s)   => OrderedValue::Str(s.clone()),
72            Value::Array(a)    => OrderedValue::Array(a.iter().map(|x| x.into()).collect()),
73            Value::Object(_)   => OrderedValue::Object,
74        }
75    }
76}
77
/// Derive the 2-character shard directory name for a document id.
///
/// The id is hashed with 32-bit FNV-1a (fast, deterministic, not
/// cryptographic) and the low byte of the digest is rendered as two lowercase
/// hex digits, spreading files over 256 subdirectories so that no single
/// directory grows huge for large collections.
fn id_shard(id: &str) -> String {
    const FNV_OFFSET_BASIS: u32 = 0x811C_9DC5;
    const FNV_PRIME: u32 = 0x0100_0193;

    let digest = id.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });
    // Only the low byte selects the shard.
    format!("{:02x}", digest & 0xFF)
}
90
/// Encode a document id into a filesystem-safe leaf filename.
///
/// The id index stores one file per document and uses the id as the filename.
/// Raw ids are fine on case-sensitive POSIX filesystems, but ids containing
/// bytes that are illegal in Windows filenames (`: | / \ < > " ? *`, control
/// chars) — e.g. link ids like `driver:d1|handles|trip:t1` — cannot be
/// written there, so the entry would be lost.
///
/// Every byte that is not unreserved (`A-Z a-z 0-9 - _ .`) is percent-escaped
/// as `%XX` (uppercase hex). `%` itself is escaped, so decoding is
/// unambiguous. All-unreserved ids are returned UNCHANGED, keeping existing
/// on-disk paths byte-for-byte identical.
///
/// The one exception to "unreserved passes through": the ids `.` and `..`
/// name directories on every filesystem and can never be a leaf file, so
/// their dots are escaped too (`%2E`, `%2E%2E`).
fn encode_id(id: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let is_dir_alias = id == "." || id == "..";
    let is_unreserved = |b: u8| -> bool {
        b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_') || (b == b'.' && !is_dir_alias)
    };

    // Fast path: nothing to escape, hand the id back untouched.
    if id.bytes().all(&is_unreserved) {
        return id.to_string();
    }

    let mut out = String::with_capacity(id.len() + 8);
    for b in id.bytes() {
        if is_unreserved(b) {
            out.push(char::from(b));
        } else {
            // Table lookup instead of `format!` avoids an allocation per byte.
            out.push('%');
            out.push(char::from(HEX_UPPER[usize::from(b >> 4)]));
            out.push(char::from(HEX_UPPER[usize::from(b & 0x0F)]));
        }
    }
    out
}
120
/// Turn an on-disk leaf filename back into the document id (inverse of
/// `encode_id`).
///
/// A name without any `%` — a safe id, or a legacy raw id written before
/// encoding existed — is returned as is. Otherwise each `%XX` with two hex
/// digits (either case) becomes the byte it names; a `%` that is not followed
/// by two hex digits is kept literally. The resulting bytes are converted with
/// lossy UTF-8 decoding.
fn decode_id(name: &str) -> String {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(digit: u8) -> Option<u8> {
        char::from(digit).to_digit(16).map(|v| v as u8)
    }

    if !name.contains('%') {
        return name.to_string();
    }

    let mut decoded: Vec<u8> = Vec::with_capacity(name.len());
    let mut rest = name.as_bytes();
    while let Some((&head, tail)) = rest.split_first() {
        if head == b'%' {
            if let [hi, lo, after @ ..] = tail {
                if let (Some(high), Some(low)) = (nibble(*hi), nibble(*lo)) {
                    decoded.push(high * 16 + low);
                    rest = after;
                    continue;
                }
            }
        }
        // Ordinary byte, or a `%` that does not start a valid escape.
        decoded.push(head);
        rest = tail;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
152
153/// Per-document ID index — atomic file-per-doc, sharded across 256 subdirs.
154///
155/// Write path: updates go to `write_buf` (DashMap, zero I/O, lock-free).
156/// Background ticker calls `flush_write_buf()` every 1s — Rayon-parallel disk writes.
157/// Read path: `write_buf` checked first (latest value), then disk.
158/// This eliminates per-PUT `fs::rename` from the hot path, fixing concurrent write contention.
159pub struct IdIndex {
160    root:      PathBuf,
161    /// In-memory store: (coll, id) → hash. None = disk-backed (normal mode).
162    mem:       Option<Arc<dashmap::DashMap<(String, String), String>>>,
163    /// WAL write buffer — disk-backed mode buffers here, flushed to disk periodically.
164    write_buf: Arc<dashmap::DashMap<(String, String), Option<String>>>,  // None = tombstone
165}
166
167impl IdIndex {
168    pub fn new(db_root: &Path) -> Result<Self> {
169        let root = db_root.join("indexes");
170        fs::create_dir_all(&root)?;
171        Ok(Self { root, mem: None, write_buf: Arc::new(dashmap::DashMap::new()) })
172    }
173
174    /// Create a pure in-memory id index — no disk I/O.
175    pub fn in_memory() -> Self {
176        Self {
177            root:      PathBuf::from(":memory:"),
178            mem:       Some(Arc::new(dashmap::DashMap::new())),
179            write_buf: Arc::new(dashmap::DashMap::new()),
180        }
181    }
182
183    /// Flush the WAL write buffer to disk in parallel. Called by the background ticker.
184    /// No-op for in-memory databases. Safe to call concurrently with writes.
185    pub fn flush_write_buf(&self) {
186        if self.mem.is_some() || self.write_buf.is_empty() { return; }
187        use rayon::prelude::*;
188        // Drain all pending entries and write them in parallel
189        let entries: Vec<((String, String), Option<String>)> = self.write_buf
190            .iter()
191            .map(|e| (e.key().clone(), e.value().clone()))
192            .collect();
193        entries.par_iter().for_each(|((coll, id), hash_opt)| {
194            match hash_opt {
195                Some(hash) => {
196                    // Write/update: tmp → rename
197                    let path = self.path(coll, id);
198                    if let Some(parent) = path.parent() {
199                        let _ = fs::create_dir_all(parent);
200                    }
201                    let tmp = path.with_extension("tmp");
202                    if fs::write(&tmp, hash).is_ok() {
203                        let _ = fs::rename(&tmp, &path);
204                    }
205                }
206                None => {
207                    // Tombstone: remove the file (encoded leaf + legacy raw if distinct)
208                    let path = self.path(coll, id);
209                    let _ = fs::remove_file(&path);
210                    let raw = self.raw_path(coll, id);
211                    if raw != path { let _ = fs::remove_file(&raw); }
212                }
213            }
214        });
215        // Clear flushed entries — but ONLY when the buffered value is still the
216        // exact value we flushed. An unconditional remove() here would delete a
217        // NEWER value written between the snapshot above and this point: that
218        // write would never reach disk (the file holds the stale hash we just
219        // wrote) and get() would serve the old version once the buffer check
220        // misses — a silent lost update. remove_if closes the race; a newer
221        // value simply stays buffered and flushes on the next tick.
222        for (key, flushed_val) in &entries {
223            self.write_buf.remove_if(key, |_, current| current == flushed_val);
224        }
225    }
226
227    fn path(&self, coll: &str, id: &str) -> PathBuf {
228        // Shard across 256 subdirectories using first 2 hex chars of a simple
229        // hash of the id. Prevents flat-directory slowdown (ext4 htree degrades
230        // past ~50k files per directory) for large collections like kv.
231        // Format: indexes/{coll}/id/{shard}/{encode_id(id)}
232        // Shard on the RAW id (stable across versions); only the leaf filename
233        // is encoded so it is legal on every filesystem (incl. Windows).
234        let shard = id_shard(id);
235        self.root.join(coll).join("id").join(&shard).join(encode_id(id))
236    }
237
238    /// Legacy path: the raw id as the leaf filename (pre-`encode_id`). Used only
239    /// as a read/cleanup fallback so id-index entries written by older versions
240    /// on POSIX filesystems stay readable after upgrade. On Windows a raw path
241    /// with illegal chars simply fails to open (→ treated as absent).
242    fn raw_path(&self, coll: &str, id: &str) -> PathBuf {
243        let shard = id_shard(id);
244        self.root.join(coll).join("id").join(&shard).join(id)
245    }
246
247    /// Get the current object hash for a document.
248    /// Checks WAL write buffer first (most recent), then disk.
249    pub fn get(&self, coll: &str, id: &str) -> Option<String> {
250        if let Some(ref mem) = self.mem {
251            return mem.get(&(coll.to_string(), id.to_string())).map(|v| v.clone());
252        }
253        // Check WAL buffer first — may have an unflushed write or tombstone
254        let key = (coll.to_string(), id.to_string());
255        if let Some(entry) = self.write_buf.get(&key) {
256            return entry.value().clone();  // None = tombstoned
257        }
258        // Fall through to disk: encoded filename first, then the legacy raw
259        // filename (pre-upgrade data). For safe ids the two paths are identical,
260        // so this is a single read on the hot path.
261        let p = self.path(coll, id);
262        let content = match fs::read_to_string(&p) {
263            Ok(c) => c,
264            Err(_) => {
265                let raw = self.raw_path(coll, id);
266                if raw == p { return None; }
267                fs::read_to_string(&raw).ok()?
268            }
269        };
270        let h = content.trim().to_string();
271        if h.is_empty() { None } else { Some(h) }
272    }
273
274    /// Set the current object hash for a document.
275    /// Disk mode: writes to WAL buffer only (zero I/O on hot path).
276    /// Background ticker flushes WAL to disk every 1s via Rayon.
277    pub fn set(&self, coll: &str, id: &str, hash: &str) -> Result<()> {
278        if let Some(ref mem) = self.mem {
279            mem.insert((coll.to_string(), id.to_string()), hash.to_string());
280            return Ok(());
281        }
282        // WAL: buffer the update, no disk I/O here
283        self.write_buf.insert(
284            (coll.to_string(), id.to_string()),
285            Some(hash.to_string()),
286        );
287        Ok(())
288    }
289
290    /// List all doc IDs in a collection (memory map or disk + WAL merge).
291    pub fn list_ids(&self, coll: &str) -> Vec<String> {
292        if let Some(ref mem) = self.mem {
293            return mem.iter()
294                .filter(|e| e.key().0 == coll)
295                .map(|e| e.key().1.clone())
296                .collect();
297        }
298        // Read from disk then overlay WAL (adds buffered writes, removes tombstones)
299        let id_root = self.root.join(coll).join("id");
300        // Each entry in id_root is a 2-char hex shard dir
301        fs::read_dir(&id_root)
302            .into_iter()
303            .flatten()
304            .filter_map(|e| e.ok())
305            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
306            .flat_map(|shard_dir| {
307                fs::read_dir(shard_dir.path())
308                    .into_iter()
309                    .flatten()
310                    .filter_map(|e| e.ok())
311                    .filter_map(|e| {
312                        let name = e.file_name().to_string_lossy().to_string();
313                        if name.ends_with(".tmp") { return None; }
314                        // Decode the on-disk filename back to the document id
315                        // (encoded for new files; identity for legacy/safe ids).
316                        Some(decode_id(&name))
317                    })
318                    .collect::<Vec<_>>()
319            })
320            .collect::<std::collections::HashSet<_>>()
321            .into_iter()
322            // Overlay WAL: add buffered writes, remove tombstones
323            .chain(
324                self.write_buf.iter()
325                    .filter(|e| e.key().0 == coll && e.value().is_some())
326                    .map(|e| e.key().1.clone())
327            )
328            .collect::<std::collections::HashSet<_>>()
329            .into_iter()
330            .filter(|id| {
331                // Exclude WAL tombstones
332                self.write_buf.get(&(coll.to_string(), id.clone()))
333                    .map(|v| v.is_some())
334                    .unwrap_or(true)
335            })
336            .collect()
337    }
338
339    /// Remove the id index entry for a document (tombstone / delete).
340    /// Disk mode: writes a tombstone to the WAL buffer; flushed to disk on next ticker.
341    pub fn remove(&self, coll: &str, id: &str) -> Result<()> {
342        if let Some(ref mem) = self.mem {
343            mem.remove(&(coll.to_string(), id.to_string()));
344            return Ok(());
345        }
346        // WAL tombstone: None value means "delete this file on flush"
347        self.write_buf.insert((coll.to_string(), id.to_string()), None);
348        Ok(())
349    }
350
351    /// List all known collections.
352    pub fn collections(&self) -> Vec<String> {
353        if let Some(ref mem) = self.mem {
354            let mut colls: Vec<String> = mem.iter()
355                .map(|e| e.key().0.clone())
356                .collect::<std::collections::HashSet<_>>()
357                .into_iter().collect();
358            colls.sort();
359            return colls;
360        }
361        fs::read_dir(&self.root)
362            .into_iter()
363            .flatten()
364            .filter_map(|e| e.ok())
365            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
366            .map(|e| e.file_name().to_string_lossy().to_string())
367            .collect()
368    }
369}
370
371/// In-memory sorted index per (collection, field).
372/// Rebuilt from object store on startup. O(log n) ORDER BY queries.
373pub struct SortedIndexes {
374    /// (coll, field) → BTreeMap<value, Vec<hash>>
375    inner: DashMap<(String, String), BTreeMap<OrderedValue, Vec<String>>>,
376}
377
378impl SortedIndexes {
379    pub fn new() -> Self {
380        Self { inner: DashMap::new() }
381    }
382
383    /// Register a field as sorted-indexed for a collection.
384    /// Must be called before any puts for that field to be indexed.
385    pub fn ensure(&self, coll: &str, field: &str) {
386        self.inner
387            .entry((coll.to_string(), field.to_string()))
388            .or_default();
389    }
390
391    /// Insert (or update) a value → hash mapping.
392    pub fn insert(&self, coll: &str, field: &str, value: &Value, hash: &str) {
393        let key = (coll.to_string(), field.to_string());
394        if let Some(mut idx) = self.inner.get_mut(&key) {
395            let ov = OrderedValue::from(value);
396            idx.entry(ov)
397               .or_default()
398               .push(hash.to_string());
399        }
400    }
401
402    /// Remove a hash from the index (on overwrite/delete of a doc version).
403    pub fn remove(&self, coll: &str, field: &str, value: &Value, hash: &str) {
404        let key = (coll.to_string(), field.to_string());
405        if let Some(mut idx) = self.inner.get_mut(&key) {
406            let ov = OrderedValue::from(value);
407            if let Some(hashes) = idx.get_mut(&ov) {
408                hashes.retain(|h| h != hash);
409                if hashes.is_empty() { idx.remove(&ov); }
410            }
411        }
412    }
413
414    /// Return the top-k hashes ordered by field ASC.
415    pub fn top_k_asc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
416        let key = (coll.to_string(), field.to_string());
417        self.inner.get(&key).map(|idx| {
418            idx.values().flat_map(|v| v.iter().cloned()).take(k).collect()
419        }).unwrap_or_default()
420    }
421
422    /// Return the top-k hashes ordered by field DESC.
423    pub fn top_k_desc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
424        let key = (coll.to_string(), field.to_string());
425        self.inner.get(&key).map(|idx| {
426            idx.values().rev().flat_map(|v| v.iter().cloned()).take(k).collect()
427        }).unwrap_or_default()
428    }
429
430    /// Check if a sorted index exists for a (coll, field) pair.
431    pub fn has(&self, coll: &str, field: &str) -> bool {
432        self.inner.contains_key(&(coll.to_string(), field.to_string()))
433    }
434
435    /// True if no sorted indexes have been registered yet.
436    pub fn is_empty(&self) -> bool {
437        self.inner.is_empty()
438    }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A value written with `set` is immediately readable with `get`.
    #[test]
    fn id_index_roundtrip() {
        let workdir = tempdir().unwrap();
        let index = IdIndex::new(workdir.path()).unwrap();
        index.set("blocks", "618000", "abcdef1234").unwrap();
        let found = index.get("blocks", "618000");
        assert_eq!(found, Some("abcdef1234".to_string()));
    }

    /// `encode_id` is the identity on safe ids, and `decode_id` inverts it on
    /// filesystem-unsafe ones.
    #[test]
    fn encode_decode_id_bijective() {
        // Characters that are illegal in Windows filenames.
        const FORBIDDEN: [char; 9] = [':', '|', '/', '\\', '<', '>', '?', '"', '*'];

        // Safe ids pass through unchanged, so their on-disk paths stay identical.
        for safe in ["618000", "utxo-000000042", "abc_DEF.123", "deadBEEF"] {
            let encoded = encode_id(safe);
            assert_eq!(encoded, safe, "safe id must be identity");
            assert_eq!(decode_id(&encoded), safe);
        }

        // Unsafe ids (link ids, paths) round-trip, and their encoded form
        // contains none of the forbidden characters.
        for weird in ["driver:d1|handles|trip:t1", "a/b\\c", "x<y>z?\"*", "100%done"] {
            let enc = encode_id(weird);
            let is_clean = enc.chars().all(|c| !FORBIDDEN.contains(&c));
            assert!(is_clean, "encoded leaf must be filesystem-safe: {}", enc);
            assert_eq!(decode_id(&enc), weird, "encode/decode must round-trip");
        }
    }

    /// Regression: link ids contain ':' and '|', which are illegal in Windows
    /// filenames. They must persist to the on-disk id index and read back
    /// after a reopen.
    #[test]
    fn id_index_fs_unsafe_id_survives_disk_roundtrip() {
        let workdir = tempdir().unwrap();
        let weird = "driver:d1|handles|trip:t1";

        // First handle: write, flush to disk (encoded leaf filename), drop.
        let writer = IdIndex::new(workdir.path()).unwrap();
        writer.set("__links__", weird, "deadbeefcafe").unwrap();
        writer.flush_write_buf();
        drop(writer);

        // Cold reopen: the buffer is empty, so the answers must come from disk.
        let idx2 = IdIndex::new(workdir.path()).unwrap();
        assert_eq!(idx2.get("__links__", weird), Some("deadbeefcafe".to_string()),
                   "FS-unsafe id must be readable from disk after reopen");
        assert_eq!(idx2.list_ids("__links__"), vec![weird.to_string()],
                   "list_ids must decode the on-disk filename back to the id");
    }

    /// Spot checks of the cross-type and within-type ordering.
    #[test]
    fn ordered_value_ordering() {
        use OrderedValue::*;
        let ascending_pairs = [
            (Null, Bool(false)),
            (Bool(false), Bool(true)),
            (Bool(true), Number(0.0)),
            (Number(1.0), Number(2.0)),
            (Number(2.0), Str("a".to_string())),
            (Str("a".to_string()), Str("b".to_string())),
        ];
        for (lower, higher) in ascending_pairs {
            assert!(lower < higher);
        }
    }

    /// top-k reads come back in value order regardless of insertion order.
    #[test]
    fn sorted_index_top_k() {
        let idx = SortedIndexes::new();
        idx.ensure("blocks", "height");
        for (height, hash) in [(3, "hash3"), (1, "hash1"), (2, "hash2")] {
            idx.insert("blocks", "height", &serde_json::json!(height), hash);
        }
        assert_eq!(idx.top_k_asc("blocks", "height", 2), vec!["hash1", "hash2"]);
        assert_eq!(idx.top_k_desc("blocks", "height", 2), vec!["hash3", "hash2"]);
    }

    /// Regression stress test for the flush_write_buf lost-update race.
    ///
    /// Old behavior: flush snapshotted the buffer, wrote the files in parallel,
    /// then UNCONDITIONALLY removed every snapshotted key. A set() landing
    /// between the snapshot and the remove was deleted from the buffer without
    /// ever being flushed — disk kept the stale hash and the newer value was
    /// lost for good.
    ///
    /// Shape: each key is written exactly twice (v1, then v2) while a flusher
    /// thread spins. With remove_if a superseded snapshot entry leaves the
    /// newer value buffered for the next flush, so every key must read v2 at
    /// the end. (Probabilistic by nature, but with 2000 keys and parallel file
    /// writes the race window is wide.)
    #[test]
    fn flush_never_drops_a_concurrent_newer_write() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        const N: usize = 2000;

        let dir = tempdir().unwrap();
        let idx = Arc::new(IdIndex::new(dir.path()).unwrap());
        let stop = Arc::new(AtomicBool::new(false));

        let flusher = std::thread::spawn({
            let idx = Arc::clone(&idx);
            let stop = Arc::clone(&stop);
            move || {
                while !stop.load(Ordering::Relaxed) {
                    idx.flush_write_buf();
                }
            }
        });

        // v1 for every key, then v2 for every key — the flusher races both passes.
        for version in ["v1", "v2"] {
            for i in 0..N {
                idx.set("c", &format!("k{}", i), version).unwrap();
            }
        }

        stop.store(true, Ordering::Relaxed);
        flusher.join().unwrap();
        // Drain whatever is still buffered (remove_if leaves superseded entries in).
        idx.flush_write_buf();
        idx.flush_write_buf();

        // Every key must be v2 — through this handle AND after a cold reopen.
        for k in (0..N).map(|i| format!("k{}", i)) {
            assert_eq!(idx.get("c", &k), Some("v2".to_string()),
                       "key {} lost its newer write (buffer path)", k);
        }
        let cold = IdIndex::new(dir.path()).unwrap();
        for k in (0..N).map(|i| format!("k{}", i)) {
            assert_eq!(cold.get("c", &k), Some("v2".to_string()),
                       "key {} lost its newer write (disk path)", k);
        }
    }
}