Skip to main content

nedb_engine/
index.rs

//! Index store for NEDB v2.
//!
//! Two index types:
//!
//! 1. **ID index** (`indexes/{coll}/id/{shard}/{doc_id}` → object hash)
//!    Atomic file-per-document, spread over 2-hex-char shard subdirectories.
//!    Writes are buffered in memory and flushed atomically (write .tmp → rename).
//!    Reading checks the buffer first, then is a single `fs::read_to_string`.
//!    Parallel reads are lock-free.
//!
//! 2. **Sorted index** (`indexes/{coll}/{field}.sorted` → in-memory BTreeMap)
//!    Rebuilt from object store on startup. Persisted as a compact binary
//!    file for fast cold start. Used for ORDER BY field ASC/DESC LIMIT n.
12
13use std::collections::BTreeMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use anyhow::Result;
18use dashmap::DashMap;
19use serde_json::Value;
20
/// Ordered JSON value for BTree indexes (null < bool < number < string < array < object).
///
/// Equality is defined through [`Ord::cmp`], so `==`, `Eq` and `Ord` always
/// agree: `Number(NaN) == Number(NaN)` and `Number(-0.0) != Number(0.0)`,
/// exactly as `f64::total_cmp` orders them. A derived `PartialEq` would use
/// IEEE float equality instead and contradict the ordering, which the
/// `Eq`/`Ord` contract forbids.
#[derive(Debug, Clone)]
pub enum OrderedValue {
    Null,
    Bool(bool),
    /// Compared with `f64::total_cmp`, which gives NaN a fixed position.
    Number(f64),
    Str(String),
    /// Compared lexicographically, element by element.
    Array(Vec<OrderedValue>),
    /// Carries no payload: every object compares equal to every other object.
    Object,
}

impl PartialEq for OrderedValue {
    /// Two values are equal exactly when `cmp` reports `Equal`.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for OrderedValue {}

impl PartialOrd for OrderedValue {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedValue {
    /// Total order: first by JSON type (null < bool < number < string < array
    /// < object), then by payload within the same type.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use OrderedValue::*;
        use std::cmp::Ordering::*;
        // Arms are tried top to bottom: each type is first compared with
        // itself, then declared smaller than everything not yet ruled out.
        match (self, other) {
            (Null, Null)       => Equal,
            (Null, _)          => Less,
            (_, Null)          => Greater,
            (Bool(a), Bool(b)) => a.cmp(b),
            (Bool(_), _)       => Less,
            (_, Bool(_))       => Greater,
            (Number(a), Number(b)) => a.total_cmp(b),
            (Number(_), _)     => Less,
            (_, Number(_))     => Greater,
            (Str(a), Str(b))   => a.cmp(b),
            (Str(_), _)        => Less,
            (_, Str(_))        => Greater,
            (Array(a), Array(b)) => a.cmp(b),
            (Array(_), _)      => Less,
            (_, Array(_))      => Greater,
            (Object, Object)   => Equal,
        }
    }
}
64
65impl From<&Value> for OrderedValue {
66    fn from(v: &Value) -> Self {
67        match v {
68            Value::Null        => OrderedValue::Null,
69            Value::Bool(b)     => OrderedValue::Bool(*b),
70            Value::Number(n)   => OrderedValue::Number(n.as_f64().unwrap_or(f64::NAN)),
71            Value::String(s)   => OrderedValue::Str(s.clone()),
72            Value::Array(a)    => OrderedValue::Array(a.iter().map(|x| x.into()).collect()),
73            Value::Object(_)   => OrderedValue::Object,
74        }
75    }
76}
77
/// Derive the shard directory name for a document id.
///
/// The id is hashed with 32-bit FNV-1a (fast, deterministic, not
/// cryptographic) and the lowest byte of the digest is rendered as two
/// lowercase hex digits, giving 256 possible shard names.
fn id_shard(id: &str) -> String {
    const FNV_OFFSET_BASIS: u32 = 0x811C_9DC5;
    const FNV_PRIME: u32 = 0x0100_0193;

    let digest = id.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });
    format!("{:02x}", digest & 0xff)
}
90
/// Encode a document id into a filesystem-safe leaf filename.
///
/// The id-index stores one file per document, and the id is the filename. Raw
/// ids work on case-sensitive POSIX filesystems, but ids containing bytes that
/// are illegal in Windows filenames (`: | / \ < > " ? *`, control chars) — most
/// notably link ids like `driver:d1|handles|trip:t1` — cannot be written there.
///
/// Every byte that isn't unreserved (`A-Z a-z 0-9 - _ .`) is percent-escaped
/// as `%XX` with uppercase hex. `%` itself is escaped so decoding is
/// unambiguous. Ids made only of unreserved bytes are returned UNCHANGED, so
/// their on-disk paths stay byte-for-byte identical.
///
/// The ids `"."` and `".."` are the one exception: although all-unreserved,
/// as filenames they denote the containing directory and its parent, so the
/// index file could never be written. They are escaped in full (`%2E`,
/// `%2E%2E`), which `decode_id` reverses.
fn encode_id(id: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    fn is_unreserved(b: u8) -> bool {
        b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.')
    }

    let is_dot_name = id == "." || id == "..";
    if !is_dot_name && id.bytes().all(is_unreserved) {
        return id.to_string();
    }

    let mut out = String::with_capacity(id.len() + 8);
    for b in id.bytes() {
        if is_unreserved(b) && !is_dot_name {
            out.push(char::from(b));
        } else {
            // Table lookup instead of `format!` avoids one allocation per byte.
            out.push('%');
            out.push(char::from(HEX[usize::from(b >> 4)]));
            out.push(char::from(HEX[usize::from(b & 0x0f)]));
        }
    }
    out
}
120
/// Turn an on-disk leaf filename back into the document id it was encoded from.
///
/// Every `%XX` sequence with two hex digits (either case) becomes the byte it
/// names. A `%` that is not followed by two hex digits is copied through
/// literally, and a name without any `%` is returned as-is. If the decoded
/// bytes are not valid UTF-8, invalid sequences are replaced with U+FFFD.
fn decode_id(name: &str) -> String {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(digit: u8) -> Option<u8> {
        char::from(digit)
            .to_digit(16)
            .and_then(|value| u8::try_from(value).ok())
    }

    if !name.contains('%') {
        return name.to_owned();
    }

    let raw = name.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(raw.len());
    let mut pos = 0;
    while let Some(&byte) = raw.get(pos) {
        // A complete escape needs three bytes starting here: '%', hi, lo.
        let escape = match raw.get(pos..pos + 3) {
            Some([b'%', hi, lo]) => nibble(*hi).zip(nibble(*lo)),
            _ => None,
        };
        match escape {
            Some((hi, lo)) => {
                decoded.push(hi * 16 + lo);
                pos += 3;
            }
            None => {
                decoded.push(byte);
                pos += 1;
            }
        }
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
152
153/// Per-document ID index — atomic file-per-doc, sharded across 256 subdirs.
154///
155/// Write path: updates go to `write_buf` (DashMap, zero I/O, lock-free).
156/// Background ticker calls `flush_write_buf()` every 1s — Rayon-parallel disk writes.
157/// Read path: `write_buf` checked first (latest value), then disk.
158/// This eliminates per-PUT `fs::rename` from the hot path, fixing concurrent write contention.
159pub struct IdIndex {
160    root:      PathBuf,
161    /// In-memory store: (coll, id) → hash. None = disk-backed (normal mode).
162    mem:       Option<Arc<dashmap::DashMap<(String, String), String>>>,
163    /// WAL write buffer — disk-backed mode buffers here, flushed to disk periodically.
164    write_buf: Arc<dashmap::DashMap<(String, String), Option<String>>>,  // None = tombstone
165}
166
167impl IdIndex {
168    pub fn new(db_root: &Path) -> Result<Self> {
169        let root = db_root.join("indexes");
170        fs::create_dir_all(&root)?;
171        Ok(Self { root, mem: None, write_buf: Arc::new(dashmap::DashMap::new()) })
172    }
173
174    /// Create a pure in-memory id index — no disk I/O.
175    pub fn in_memory() -> Self {
176        Self {
177            root:      PathBuf::from(":memory:"),
178            mem:       Some(Arc::new(dashmap::DashMap::new())),
179            write_buf: Arc::new(dashmap::DashMap::new()),
180        }
181    }
182
183    /// Flush the WAL write buffer to disk in parallel. Called by the background ticker.
184    /// No-op for in-memory databases. Safe to call concurrently with writes.
185    pub fn flush_write_buf(&self) {
186        if self.mem.is_some() || self.write_buf.is_empty() { return; }
187        use rayon::prelude::*;
188        // Drain all pending entries and write them in parallel
189        let entries: Vec<((String, String), Option<String>)> = self.write_buf
190            .iter()
191            .map(|e| (e.key().clone(), e.value().clone()))
192            .collect();
193        entries.par_iter().for_each(|((coll, id), hash_opt)| {
194            match hash_opt {
195                Some(hash) => {
196                    // Write/update: tmp → rename
197                    let path = self.path(coll, id);
198                    if let Some(parent) = path.parent() {
199                        let _ = fs::create_dir_all(parent);
200                    }
201                    let tmp = path.with_extension("tmp");
202                    if fs::write(&tmp, hash).is_ok() {
203                        let _ = fs::rename(&tmp, &path);
204                    }
205                }
206                None => {
207                    // Tombstone: remove the file (encoded leaf + legacy raw if distinct)
208                    let path = self.path(coll, id);
209                    let _ = fs::remove_file(&path);
210                    let raw = self.raw_path(coll, id);
211                    if raw != path { let _ = fs::remove_file(&raw); }
212                }
213            }
214        });
215        // Clear flushed entries
216        for ((coll, id), _) in &entries {
217            self.write_buf.remove(&(coll.clone(), id.clone()));
218        }
219    }
220
221    fn path(&self, coll: &str, id: &str) -> PathBuf {
222        // Shard across 256 subdirectories using first 2 hex chars of a simple
223        // hash of the id. Prevents flat-directory slowdown (ext4 htree degrades
224        // past ~50k files per directory) for large collections like kv.
225        // Format: indexes/{coll}/id/{shard}/{encode_id(id)}
226        // Shard on the RAW id (stable across versions); only the leaf filename
227        // is encoded so it is legal on every filesystem (incl. Windows).
228        let shard = id_shard(id);
229        self.root.join(coll).join("id").join(&shard).join(encode_id(id))
230    }
231
232    /// Legacy path: the raw id as the leaf filename (pre-`encode_id`). Used only
233    /// as a read/cleanup fallback so id-index entries written by older versions
234    /// on POSIX filesystems stay readable after upgrade. On Windows a raw path
235    /// with illegal chars simply fails to open (→ treated as absent).
236    fn raw_path(&self, coll: &str, id: &str) -> PathBuf {
237        let shard = id_shard(id);
238        self.root.join(coll).join("id").join(&shard).join(id)
239    }
240
241    /// Get the current object hash for a document.
242    /// Checks WAL write buffer first (most recent), then disk.
243    pub fn get(&self, coll: &str, id: &str) -> Option<String> {
244        if let Some(ref mem) = self.mem {
245            return mem.get(&(coll.to_string(), id.to_string())).map(|v| v.clone());
246        }
247        // Check WAL buffer first — may have an unflushed write or tombstone
248        let key = (coll.to_string(), id.to_string());
249        if let Some(entry) = self.write_buf.get(&key) {
250            return entry.value().clone();  // None = tombstoned
251        }
252        // Fall through to disk: encoded filename first, then the legacy raw
253        // filename (pre-upgrade data). For safe ids the two paths are identical,
254        // so this is a single read on the hot path.
255        let p = self.path(coll, id);
256        let content = match fs::read_to_string(&p) {
257            Ok(c) => c,
258            Err(_) => {
259                let raw = self.raw_path(coll, id);
260                if raw == p { return None; }
261                fs::read_to_string(&raw).ok()?
262            }
263        };
264        let h = content.trim().to_string();
265        if h.is_empty() { None } else { Some(h) }
266    }
267
268    /// Set the current object hash for a document.
269    /// Disk mode: writes to WAL buffer only (zero I/O on hot path).
270    /// Background ticker flushes WAL to disk every 1s via Rayon.
271    pub fn set(&self, coll: &str, id: &str, hash: &str) -> Result<()> {
272        if let Some(ref mem) = self.mem {
273            mem.insert((coll.to_string(), id.to_string()), hash.to_string());
274            return Ok(());
275        }
276        // WAL: buffer the update, no disk I/O here
277        self.write_buf.insert(
278            (coll.to_string(), id.to_string()),
279            Some(hash.to_string()),
280        );
281        Ok(())
282    }
283
284    /// List all doc IDs in a collection (memory map or disk + WAL merge).
285    pub fn list_ids(&self, coll: &str) -> Vec<String> {
286        if let Some(ref mem) = self.mem {
287            return mem.iter()
288                .filter(|e| e.key().0 == coll)
289                .map(|e| e.key().1.clone())
290                .collect();
291        }
292        // Read from disk then overlay WAL (adds buffered writes, removes tombstones)
293        let id_root = self.root.join(coll).join("id");
294        // Each entry in id_root is a 2-char hex shard dir
295        fs::read_dir(&id_root)
296            .into_iter()
297            .flatten()
298            .filter_map(|e| e.ok())
299            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
300            .flat_map(|shard_dir| {
301                fs::read_dir(shard_dir.path())
302                    .into_iter()
303                    .flatten()
304                    .filter_map(|e| e.ok())
305                    .filter_map(|e| {
306                        let name = e.file_name().to_string_lossy().to_string();
307                        if name.ends_with(".tmp") { return None; }
308                        // Decode the on-disk filename back to the document id
309                        // (encoded for new files; identity for legacy/safe ids).
310                        Some(decode_id(&name))
311                    })
312                    .collect::<Vec<_>>()
313            })
314            .collect::<std::collections::HashSet<_>>()
315            .into_iter()
316            // Overlay WAL: add buffered writes, remove tombstones
317            .chain(
318                self.write_buf.iter()
319                    .filter(|e| e.key().0 == coll && e.value().is_some())
320                    .map(|e| e.key().1.clone())
321            )
322            .collect::<std::collections::HashSet<_>>()
323            .into_iter()
324            .filter(|id| {
325                // Exclude WAL tombstones
326                self.write_buf.get(&(coll.to_string(), id.clone()))
327                    .map(|v| v.is_some())
328                    .unwrap_or(true)
329            })
330            .collect()
331    }
332
333    /// Remove the id index entry for a document (tombstone / delete).
334    /// Disk mode: writes a tombstone to the WAL buffer; flushed to disk on next ticker.
335    pub fn remove(&self, coll: &str, id: &str) -> Result<()> {
336        if let Some(ref mem) = self.mem {
337            mem.remove(&(coll.to_string(), id.to_string()));
338            return Ok(());
339        }
340        // WAL tombstone: None value means "delete this file on flush"
341        self.write_buf.insert((coll.to_string(), id.to_string()), None);
342        Ok(())
343    }
344
345    /// List all known collections.
346    pub fn collections(&self) -> Vec<String> {
347        if let Some(ref mem) = self.mem {
348            let mut colls: Vec<String> = mem.iter()
349                .map(|e| e.key().0.clone())
350                .collect::<std::collections::HashSet<_>>()
351                .into_iter().collect();
352            colls.sort();
353            return colls;
354        }
355        fs::read_dir(&self.root)
356            .into_iter()
357            .flatten()
358            .filter_map(|e| e.ok())
359            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
360            .map(|e| e.file_name().to_string_lossy().to_string())
361            .collect()
362    }
363}
364
365/// In-memory sorted index per (collection, field).
366/// Rebuilt from object store on startup. O(log n) ORDER BY queries.
367pub struct SortedIndexes {
368    /// (coll, field) → BTreeMap<value, Vec<hash>>
369    inner: DashMap<(String, String), BTreeMap<OrderedValue, Vec<String>>>,
370}
371
372impl SortedIndexes {
373    pub fn new() -> Self {
374        Self { inner: DashMap::new() }
375    }
376
377    /// Register a field as sorted-indexed for a collection.
378    /// Must be called before any puts for that field to be indexed.
379    pub fn ensure(&self, coll: &str, field: &str) {
380        self.inner
381            .entry((coll.to_string(), field.to_string()))
382            .or_default();
383    }
384
385    /// Insert (or update) a value → hash mapping.
386    pub fn insert(&self, coll: &str, field: &str, value: &Value, hash: &str) {
387        let key = (coll.to_string(), field.to_string());
388        if let Some(mut idx) = self.inner.get_mut(&key) {
389            let ov = OrderedValue::from(value);
390            idx.entry(ov)
391               .or_default()
392               .push(hash.to_string());
393        }
394    }
395
396    /// Remove a hash from the index (on overwrite/delete of a doc version).
397    pub fn remove(&self, coll: &str, field: &str, value: &Value, hash: &str) {
398        let key = (coll.to_string(), field.to_string());
399        if let Some(mut idx) = self.inner.get_mut(&key) {
400            let ov = OrderedValue::from(value);
401            if let Some(hashes) = idx.get_mut(&ov) {
402                hashes.retain(|h| h != hash);
403                if hashes.is_empty() { idx.remove(&ov); }
404            }
405        }
406    }
407
408    /// Return the top-k hashes ordered by field ASC.
409    pub fn top_k_asc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
410        let key = (coll.to_string(), field.to_string());
411        self.inner.get(&key).map(|idx| {
412            idx.values().flat_map(|v| v.iter().cloned()).take(k).collect()
413        }).unwrap_or_default()
414    }
415
416    /// Return the top-k hashes ordered by field DESC.
417    pub fn top_k_desc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
418        let key = (coll.to_string(), field.to_string());
419        self.inner.get(&key).map(|idx| {
420            idx.values().rev().flat_map(|v| v.iter().cloned()).take(k).collect()
421        }).unwrap_or_default()
422    }
423
424    /// Check if a sorted index exists for a (coll, field) pair.
425    pub fn has(&self, coll: &str, field: &str) -> bool {
426        self.inner.contains_key(&(coll.to_string(), field.to_string()))
427    }
428
429    /// True if no sorted indexes have been registered yet.
430    pub fn is_empty(&self) -> bool {
431        self.inner.is_empty()
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A buffered write is visible to `get` before any flush.
    #[test]
    fn id_index_roundtrip() {
        let dir = tempdir().unwrap();
        let idx = IdIndex::new(dir.path()).unwrap();
        idx.set("blocks", "618000", "abcdef1234").unwrap();
        assert_eq!(idx.get("blocks", "618000"), Some("abcdef1234".to_string()));
    }

    /// `remove` hides an entry immediately (tombstone) and the entry stays
    /// gone once the tombstone has been flushed to disk.
    #[test]
    fn id_index_remove_hides_buffered_and_flushed_entries() {
        let dir = tempdir().unwrap();
        let idx = IdIndex::new(dir.path()).unwrap();
        idx.set("blocks", "618000", "abcdef1234").unwrap();
        idx.flush_write_buf();
        assert_eq!(idx.get("blocks", "618000"), Some("abcdef1234".to_string()));

        idx.remove("blocks", "618000").unwrap();
        assert_eq!(idx.get("blocks", "618000"), None);
        assert!(idx.list_ids("blocks").is_empty());

        idx.flush_write_buf();
        assert_eq!(idx.get("blocks", "618000"), None);
        assert!(idx.list_ids("blocks").is_empty());
    }

    /// The in-memory mode supports the same operations without a directory.
    #[test]
    fn in_memory_index_roundtrip() {
        let idx = IdIndex::in_memory();
        idx.set("blocks", "1", "h1").unwrap();
        assert_eq!(idx.get("blocks", "1"), Some("h1".to_string()));
        assert_eq!(idx.list_ids("blocks"), vec!["1".to_string()]);
        assert_eq!(idx.collections(), vec!["blocks".to_string()]);
        idx.remove("blocks", "1").unwrap();
        assert_eq!(idx.get("blocks", "1"), None);
        assert!(idx.list_ids("blocks").is_empty());
    }

    #[test]
    fn encode_decode_id_bijective() {
        // Safe ids pass through unchanged (chainstate paths stay identical).
        for safe in ["618000", "utxo-000000042", "abc_DEF.123", "deadBEEF"] {
            assert_eq!(encode_id(safe), safe, "safe id must be identity");
            assert_eq!(decode_id(&encode_id(safe)), safe);
        }
        // FS-unsafe ids (link ids, paths) round-trip and contain no illegal
        // Windows filename chars once encoded.
        for weird in ["driver:d1|handles|trip:t1", "a/b\\c", "x<y>z?\"*", "100%done"] {
            let enc = encode_id(weird);
            assert!(
                !enc.chars().any(|c| matches!(c,
                    ':' | '|' | '/' | '\\' | '<' | '>' | '?' | '"' | '*')),
                "encoded leaf must be filesystem-safe: {}", enc);
            assert_eq!(decode_id(&enc), weird, "encode/decode must round-trip");
        }
    }

    #[test]
    fn id_index_fs_unsafe_id_survives_disk_roundtrip() {
        // Regression: link ids contain ':' and '|', illegal in Windows filenames.
        // They must persist to the on-disk id-index and read back after reopen.
        let dir = tempdir().unwrap();
        let weird = "driver:d1|handles|trip:t1";
        {
            let idx = IdIndex::new(dir.path()).unwrap();
            idx.set("__links__", weird, "deadbeefcafe").unwrap();
            idx.flush_write_buf(); // persist WAL → disk (encoded leaf filename)
        }
        // Cold reopen: nothing in the WAL, must come from disk.
        let idx2 = IdIndex::new(dir.path()).unwrap();
        assert_eq!(idx2.get("__links__", weird), Some("deadbeefcafe".to_string()),
                   "FS-unsafe id must be readable from disk after reopen");
        assert_eq!(idx2.list_ids("__links__"), vec![weird.to_string()],
                   "list_ids must decode the on-disk filename back to the id");
    }

    #[test]
    fn ordered_value_ordering() {
        use OrderedValue::*;
        assert!(Null < Bool(false));
        assert!(Bool(false) < Bool(true));
        assert!(Bool(true) < Number(0.0));
        assert!(Number(1.0) < Number(2.0));
        assert!(Number(2.0) < Str("a".to_string()));
        assert!(Str("a".to_string()) < Str("b".to_string()));
    }

    #[test]
    fn sorted_index_top_k() {
        let idx = SortedIndexes::new();
        idx.ensure("blocks", "height");
        idx.insert("blocks", "height", &serde_json::json!(3), "hash3");
        idx.insert("blocks", "height", &serde_json::json!(1), "hash1");
        idx.insert("blocks", "height", &serde_json::json!(2), "hash2");
        let asc = idx.top_k_asc("blocks", "height", 2);
        assert_eq!(asc, vec!["hash1", "hash2"]);
        let desc = idx.top_k_desc("blocks", "height", 2);
        assert_eq!(desc, vec!["hash3", "hash2"]);
    }

    /// `remove` drops exactly the given hash, and inserts into a field that
    /// was never registered with `ensure` are ignored.
    #[test]
    fn sorted_index_remove_and_unregistered_field() {
        let idx = SortedIndexes::new();
        assert!(idx.is_empty());
        idx.ensure("blocks", "height");
        idx.insert("blocks", "height", &serde_json::json!(1), "hash1");
        idx.insert("blocks", "height", &serde_json::json!(2), "hash2");
        idx.remove("blocks", "height", &serde_json::json!(1), "hash1");
        assert_eq!(idx.top_k_asc("blocks", "height", 10), vec!["hash2"]);

        idx.insert("blocks", "size", &serde_json::json!(1), "x");
        assert!(!idx.has("blocks", "size"));
        assert!(idx.top_k_asc("blocks", "size", 10).is_empty());
    }
}