Skip to main content

nedb_engine/
index.rs

1//! Index store for NEDB v2.
2//!
3//! Two index types:
4//!
5//! 1. **ID index** (`indexes/{coll}/id/{doc_id}` → object hash)
6//!    Atomic file-per-document. Reading is a single `fs::read_to_string`.
7//!    Writing is atomic (write .tmp → rename). Parallel reads are lock-free.
8//!
9//! 2. **Sorted index** (`indexes/{coll}/{field}.sorted` → in-memory BTreeMap)
10//!    Rebuilt from object store on startup. Persisted as a compact binary
11//!    file for fast cold start. Used for ORDER BY field ASC/DESC LIMIT n.
12
13use std::collections::BTreeMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use anyhow::Result;
18use dashmap::DashMap;
19use serde_json::Value;
20
/// Ordered JSON value for BTree indexes (null < bool < number < string < array < object).
///
/// Values of different kinds are ordered by kind; values of the same kind are
/// ordered by payload. Numbers use [`f64::total_cmp`], so `NaN` has a
/// well-defined position and `-0.0` sorts before `0.0`.
///
/// Equality is defined through [`Ord::cmp`] rather than derived, so that
/// `a == b` holds exactly when `a.cmp(&b) == Equal`. A derived `PartialEq`
/// would use IEEE `==` on the `f64` payload (`NaN != NaN`, `0.0 == -0.0`) and
/// disagree with the total order, violating the `Eq`/`Ord` contract that
/// ordered collections rely on.
#[derive(Debug, Clone)]
pub enum OrderedValue {
    /// JSON `null`; sorts before everything else.
    Null,
    /// JSON boolean; `false < true`.
    Bool(bool),
    /// JSON number, compared with `f64::total_cmp` (NaN-safe).
    Number(f64),
    /// JSON string, compared byte-wise like `String`.
    Str(String),
    /// JSON array, compared lexicographically element by element.
    Array(Vec<OrderedValue>),
    /// JSON object. The contents are not retained: every object compares
    /// equal to every other object.
    Object,
}

impl PartialEq for OrderedValue {
    /// Two values are equal exactly when the total order says so.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for OrderedValue {}

impl PartialOrd for OrderedValue {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedValue {
    /// Total order: null < bool < number < string < array < object.
    ///
    /// The arms are evaluated top to bottom: each kind first handles the
    /// same-kind comparison, then "this kind vs. anything later" (`Less`) and
    /// "anything later vs. this kind" (`Greater`).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering::*;
        use OrderedValue::*;
        match (self, other) {
            (Null, Null) => Equal,
            (Null, _) => Less,
            (_, Null) => Greater,
            (Bool(a), Bool(b)) => a.cmp(b),
            (Bool(_), _) => Less,
            (_, Bool(_)) => Greater,
            (Number(a), Number(b)) => a.total_cmp(b),
            (Number(_), _) => Less,
            (_, Number(_)) => Greater,
            (Str(a), Str(b)) => a.cmp(b),
            (Str(_), _) => Less,
            (_, Str(_)) => Greater,
            (Array(a), Array(b)) => a.cmp(b),
            (Array(_), _) => Less,
            (_, Array(_)) => Greater,
            (Object, Object) => Equal,
        }
    }
}
64
65impl From<&Value> for OrderedValue {
66    fn from(v: &Value) -> Self {
67        match v {
68            Value::Null        => OrderedValue::Null,
69            Value::Bool(b)     => OrderedValue::Bool(*b),
70            Value::Number(n)   => OrderedValue::Number(n.as_f64().unwrap_or(f64::NAN)),
71            Value::String(s)   => OrderedValue::Str(s.clone()),
72            Value::Array(a)    => OrderedValue::Array(a.iter().map(|x| x.into()).collect()),
73            Value::Object(_)   => OrderedValue::Object,
74        }
75    }
76}
77
/// Derive the shard directory name for a document id.
///
/// The id's bytes are hashed with 32-bit FNV-1a (fast, deterministic, not
/// cryptographic) and the low byte of the digest is rendered as two lowercase
/// hex digits, giving 256 possible shard names (`"00"`..=`"ff"`).
fn id_shard(id: &str) -> String {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;

    let digest = id.bytes().fold(FNV_OFFSET_BASIS, |state, byte| {
        (state ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });
    let low_byte = digest & 0xff;
    format!("{:02x}", low_byte)
}
90
91/// Per-document ID index — atomic file-per-doc, sharded across 256 subdirs.
92///
93/// Write path: updates go to `write_buf` (DashMap, zero I/O, lock-free).
94/// Background ticker calls `flush_write_buf()` every 1s — Rayon-parallel disk writes.
95/// Read path: `write_buf` checked first (latest value), then disk.
96/// This eliminates per-PUT `fs::rename` from the hot path, fixing concurrent write contention.
97pub struct IdIndex {
98    root:      PathBuf,
99    /// In-memory store: (coll, id) → hash. None = disk-backed (normal mode).
100    mem:       Option<Arc<dashmap::DashMap<(String, String), String>>>,
101    /// WAL write buffer — disk-backed mode buffers here, flushed to disk periodically.
102    write_buf: Arc<dashmap::DashMap<(String, String), Option<String>>>,  // None = tombstone
103}
104
105impl IdIndex {
106    pub fn new(db_root: &Path) -> Result<Self> {
107        let root = db_root.join("indexes");
108        fs::create_dir_all(&root)?;
109        Ok(Self { root, mem: None, write_buf: Arc::new(dashmap::DashMap::new()) })
110    }
111
112    /// Create a pure in-memory id index — no disk I/O.
113    pub fn in_memory() -> Self {
114        Self {
115            root:      PathBuf::from(":memory:"),
116            mem:       Some(Arc::new(dashmap::DashMap::new())),
117            write_buf: Arc::new(dashmap::DashMap::new()),
118        }
119    }
120
121    /// Flush the WAL write buffer to disk in parallel. Called by the background ticker.
122    /// No-op for in-memory databases. Safe to call concurrently with writes.
123    pub fn flush_write_buf(&self) {
124        if self.mem.is_some() || self.write_buf.is_empty() { return; }
125        use rayon::prelude::*;
126        // Drain all pending entries and write them in parallel
127        let entries: Vec<((String, String), Option<String>)> = self.write_buf
128            .iter()
129            .map(|e| (e.key().clone(), e.value().clone()))
130            .collect();
131        entries.par_iter().for_each(|((coll, id), hash_opt)| {
132            match hash_opt {
133                Some(hash) => {
134                    // Write/update: tmp → rename
135                    let path = self.path(coll, id);
136                    if let Some(parent) = path.parent() {
137                        let _ = fs::create_dir_all(parent);
138                    }
139                    let tmp = path.with_extension("tmp");
140                    if fs::write(&tmp, hash).is_ok() {
141                        let _ = fs::rename(&tmp, &path);
142                    }
143                }
144                None => {
145                    // Tombstone: remove the file
146                    let path = self.path(coll, id);
147                    let _ = fs::remove_file(&path);
148                }
149            }
150        });
151        // Clear flushed entries
152        for ((coll, id), _) in &entries {
153            self.write_buf.remove(&(coll.clone(), id.clone()));
154        }
155    }
156
157    fn path(&self, coll: &str, id: &str) -> PathBuf {
158        // Shard across 256 subdirectories using first 2 hex chars of a simple
159        // hash of the id. Prevents flat-directory slowdown (ext4 htree degrades
160        // past ~50k files per directory) for large collections like kv.
161        // Format: indexes/{coll}/id/{shard}/{id}
162        let shard = id_shard(id);
163        self.root.join(coll).join("id").join(&shard).join(id)
164    }
165
166    /// Get the current object hash for a document.
167    /// Checks WAL write buffer first (most recent), then disk.
168    pub fn get(&self, coll: &str, id: &str) -> Option<String> {
169        if let Some(ref mem) = self.mem {
170            return mem.get(&(coll.to_string(), id.to_string())).map(|v| v.clone());
171        }
172        // Check WAL buffer first — may have an unflushed write or tombstone
173        let key = (coll.to_string(), id.to_string());
174        if let Some(entry) = self.write_buf.get(&key) {
175            return entry.value().clone();  // None = tombstoned
176        }
177        // Fall through to disk
178        let content = fs::read_to_string(self.path(coll, id)).ok()?;
179        let h = content.trim().to_string();
180        if h.is_empty() { None } else { Some(h) }
181    }
182
183    /// Set the current object hash for a document.
184    /// Disk mode: writes to WAL buffer only (zero I/O on hot path).
185    /// Background ticker flushes WAL to disk every 1s via Rayon.
186    pub fn set(&self, coll: &str, id: &str, hash: &str) -> Result<()> {
187        if let Some(ref mem) = self.mem {
188            mem.insert((coll.to_string(), id.to_string()), hash.to_string());
189            return Ok(());
190        }
191        // WAL: buffer the update, no disk I/O here
192        self.write_buf.insert(
193            (coll.to_string(), id.to_string()),
194            Some(hash.to_string()),
195        );
196        Ok(())
197    }
198
199    /// List all doc IDs in a collection (memory map or disk + WAL merge).
200    pub fn list_ids(&self, coll: &str) -> Vec<String> {
201        if let Some(ref mem) = self.mem {
202            return mem.iter()
203                .filter(|e| e.key().0 == coll)
204                .map(|e| e.key().1.clone())
205                .collect();
206        }
207        // Read from disk then overlay WAL (adds buffered writes, removes tombstones)
208        let id_root = self.root.join(coll).join("id");
209        // Each entry in id_root is a 2-char hex shard dir
210        fs::read_dir(&id_root)
211            .into_iter()
212            .flatten()
213            .filter_map(|e| e.ok())
214            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
215            .flat_map(|shard_dir| {
216                fs::read_dir(shard_dir.path())
217                    .into_iter()
218                    .flatten()
219                    .filter_map(|e| e.ok())
220                    .filter_map(|e| {
221                        let name = e.file_name().to_string_lossy().to_string();
222                        if name.ends_with(".tmp") { return None; }
223                        Some(name)
224                    })
225                    .collect::<Vec<_>>()
226            })
227            .collect::<std::collections::HashSet<_>>()
228            .into_iter()
229            // Overlay WAL: add buffered writes, remove tombstones
230            .chain(
231                self.write_buf.iter()
232                    .filter(|e| e.key().0 == coll && e.value().is_some())
233                    .map(|e| e.key().1.clone())
234            )
235            .collect::<std::collections::HashSet<_>>()
236            .into_iter()
237            .filter(|id| {
238                // Exclude WAL tombstones
239                self.write_buf.get(&(coll.to_string(), id.clone()))
240                    .map(|v| v.is_some())
241                    .unwrap_or(true)
242            })
243            .collect()
244    }
245
246    /// Remove the id index entry for a document (tombstone / delete).
247    /// Disk mode: writes a tombstone to the WAL buffer; flushed to disk on next ticker.
248    pub fn remove(&self, coll: &str, id: &str) -> Result<()> {
249        if let Some(ref mem) = self.mem {
250            mem.remove(&(coll.to_string(), id.to_string()));
251            return Ok(());
252        }
253        // WAL tombstone: None value means "delete this file on flush"
254        self.write_buf.insert((coll.to_string(), id.to_string()), None);
255        Ok(())
256    }
257
258    /// List all known collections.
259    pub fn collections(&self) -> Vec<String> {
260        if let Some(ref mem) = self.mem {
261            let mut colls: Vec<String> = mem.iter()
262                .map(|e| e.key().0.clone())
263                .collect::<std::collections::HashSet<_>>()
264                .into_iter().collect();
265            colls.sort();
266            return colls;
267        }
268        fs::read_dir(&self.root)
269            .into_iter()
270            .flatten()
271            .filter_map(|e| e.ok())
272            .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
273            .map(|e| e.file_name().to_string_lossy().to_string())
274            .collect()
275    }
276}
277
278/// In-memory sorted index per (collection, field).
279/// Rebuilt from object store on startup. O(log n) ORDER BY queries.
280pub struct SortedIndexes {
281    /// (coll, field) → BTreeMap<value, Vec<hash>>
282    inner: DashMap<(String, String), BTreeMap<OrderedValue, Vec<String>>>,
283}
284
285impl SortedIndexes {
286    pub fn new() -> Self {
287        Self { inner: DashMap::new() }
288    }
289
290    /// Register a field as sorted-indexed for a collection.
291    /// Must be called before any puts for that field to be indexed.
292    pub fn ensure(&self, coll: &str, field: &str) {
293        self.inner
294            .entry((coll.to_string(), field.to_string()))
295            .or_default();
296    }
297
298    /// Insert (or update) a value → hash mapping.
299    pub fn insert(&self, coll: &str, field: &str, value: &Value, hash: &str) {
300        let key = (coll.to_string(), field.to_string());
301        if let Some(mut idx) = self.inner.get_mut(&key) {
302            let ov = OrderedValue::from(value);
303            idx.entry(ov)
304               .or_default()
305               .push(hash.to_string());
306        }
307    }
308
309    /// Remove a hash from the index (on overwrite/delete of a doc version).
310    pub fn remove(&self, coll: &str, field: &str, value: &Value, hash: &str) {
311        let key = (coll.to_string(), field.to_string());
312        if let Some(mut idx) = self.inner.get_mut(&key) {
313            let ov = OrderedValue::from(value);
314            if let Some(hashes) = idx.get_mut(&ov) {
315                hashes.retain(|h| h != hash);
316                if hashes.is_empty() { idx.remove(&ov); }
317            }
318        }
319    }
320
321    /// Return the top-k hashes ordered by field ASC.
322    pub fn top_k_asc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
323        let key = (coll.to_string(), field.to_string());
324        self.inner.get(&key).map(|idx| {
325            idx.values().flat_map(|v| v.iter().cloned()).take(k).collect()
326        }).unwrap_or_default()
327    }
328
329    /// Return the top-k hashes ordered by field DESC.
330    pub fn top_k_desc(&self, coll: &str, field: &str, k: usize) -> Vec<String> {
331        let key = (coll.to_string(), field.to_string());
332        self.inner.get(&key).map(|idx| {
333            idx.values().rev().flat_map(|v| v.iter().cloned()).take(k).collect()
334        }).unwrap_or_default()
335    }
336
337    /// Check if a sorted index exists for a (coll, field) pair.
338    pub fn has(&self, coll: &str, field: &str) -> bool {
339        self.inner.contains_key(&(coll.to_string(), field.to_string()))
340    }
341
342    /// True if no sorted indexes have been registered yet.
343    pub fn is_empty(&self) -> bool {
344        self.inner.is_empty()
345    }
346}
347
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A hash stored through `set` is returned by `get` on a disk-backed index.
    #[test]
    fn id_index_roundtrip() {
        let scratch = tempdir().unwrap();
        let index = IdIndex::new(scratch.path()).unwrap();

        index.set("blocks", "618000", "abcdef1234").unwrap();

        let stored = index.get("blocks", "618000");
        assert_eq!(stored, Some("abcdef1234".to_string()));
    }

    /// Kinds sort null < bool < number < string, and payloads sort within a kind.
    #[test]
    fn ordered_value_ordering() {
        use OrderedValue::*;

        let ascending_pairs = vec![
            (Null, Bool(false)),
            (Bool(false), Bool(true)),
            (Bool(true), Number(0.0)),
            (Number(1.0), Number(2.0)),
            (Number(2.0), Str("a".to_string())),
            (Str("a".to_string()), Str("b".to_string())),
        ];
        for (smaller, larger) in &ascending_pairs {
            assert!(smaller < larger);
        }
    }

    /// `top_k_asc` / `top_k_desc` return hashes in field order, truncated to k.
    #[test]
    fn sorted_index_top_k() {
        let indexes = SortedIndexes::new();
        indexes.ensure("blocks", "height");
        for &(height, hash) in &[(3, "hash3"), (1, "hash1"), (2, "hash2")] {
            indexes.insert("blocks", "height", &serde_json::json!(height), hash);
        }

        let lowest_two = indexes.top_k_asc("blocks", "height", 2);
        assert_eq!(lowest_two, vec!["hash1", "hash2"]);

        let highest_two = indexes.top_k_desc("blocks", "height", 2);
        assert_eq!(highest_two, vec!["hash3", "hash2"]);
    }
}