Skip to main content

nedb_engine/
nql.rs

//! NQL (NEDB Query Language) parser and executor for v2 DAG storage.
//!
//! Grammar (the clauses after `FROM coll` may appear in any order):
//!   FROM coll
//!     [AS OF seq]
//!     [VALID AS OF "date"]
//!     [WHERE field op value [AND field op value]*]
//!     [SEARCH "text"]
//!     [ORDER BY field [DESC]]
//!     [LIMIT n]
//!     [GROUP BY field COUNT|SUM|AVG|MIN|MAX]
//!     [TRACE caused_by [REVERSE]]
//!     [TRAVERSE rel]
13
14use std::collections::HashMap;
15use anyhow::{bail, Result};
16use serde_json::{json, Value};
17
18use crate::db::Db;
19use crate::index::OrderedValue;
20use crate::store::Node;
21
22// ── Token types ──────────────────────────────────────────────────────────────
23
/// A lexical token of an NQL query string.
#[derive(Debug, Clone, PartialEq)]
enum Tok {
    /// Reserved word, stored upper-cased: FROM, WHERE, ORDER, BY, AS, OF, VALID,
    /// LIMIT, GROUP, TRACE, TRAVERSE, REVERSE, AND, DESC, COUNT, SUM, AVG, MIN,
    /// MAX, SEARCH, NOT, NULL, TRUE, FALSE.
    Kw(String),
    /// Field name or collection name, stored exactly as written.
    Ident(String),
    /// Contents of a `"quoted string"`; no escape sequences are recognised.
    Str(String),
    /// Numeric literal.
    Num(f64),
    /// Comparison operator: `=` `!=` `>` `<` `>=` `<=`.
    Op(String),
    /// End of input.
    Eof,
}

/// Hand-written scanner over an NQL query string.
///
/// `pos` is a byte offset into `src` and is always kept on a UTF-8 character
/// boundary, so slicing `src[pos..]` never panics.
struct Lexer<'a> {
    src: &'a str,
    pos: usize,
}

impl<'a> Lexer<'a> {
    /// Creates a lexer positioned at the start of `src`.
    fn new(src: &'a str) -> Self {
        Self { src, pos: 0 }
    }

    /// The not-yet-consumed tail of the input.
    fn rest(&self) -> &'a str {
        let src: &'a str = self.src;
        &src[self.pos..]
    }

    /// Next character without consuming it, or `None` at end of input.
    fn peek_char(&self) -> Option<char> {
        self.rest().chars().next()
    }

    /// Advances past any leading Unicode whitespace.
    fn skip_ws(&mut self) {
        let rest = self.rest();
        self.pos += rest.len() - rest.trim_start().len();
    }

    /// Scans and returns the next token, or `Tok::Eof` once the input is exhausted.
    ///
    /// Characters that cannot start any token are skipped silently.
    fn next_tok(&mut self) -> Tok {
        // Loop (instead of recursing) over unrecognised characters so that a long
        // run of junk in the query cannot exhaust the stack.
        loop {
            self.skip_ws();
            let c = match self.peek_char() {
                Some(c) => c,
                None => return Tok::Eof,
            };
            let rest = self.rest();

            if c == '"' {
                return self.lex_string();
            }

            // Two-character operators are tried before their one-character
            // prefixes. `starts_with` compares whole characters, so it is safe
            // even when the following character is multi-byte; slicing two raw
            // bytes here would panic on input such as `aé`.
            let two_char = ["!=", ">=", "<="]
                .iter()
                .copied()
                .find(|op| rest.starts_with(*op));
            if let Some(op) = two_char {
                self.pos += op.len();
                return Tok::Op(op.to_string());
            }

            if matches!(c, '=' | '>' | '<') {
                self.pos += c.len_utf8();
                return Tok::Op(c.to_string());
            }

            // A '-' only starts a number when a digit follows immediately.
            let starts_number = c.is_ascii_digit()
                || (c == '-' && rest[1..].starts_with(|d: char| d.is_ascii_digit()));
            if starts_number {
                return self.lex_number();
            }

            if c.is_alphabetic() || c == '_' {
                return self.lex_word();
            }

            // Unknown character: drop it and try again.
            self.pos += c.len_utf8();
        }
    }

    /// Scans a double-quoted string; the cursor must be on the opening quote.
    ///
    /// An unterminated string runs to the end of the input.
    fn lex_string(&mut self) -> Tok {
        let body = &self.rest()[1..];
        match body.find('"') {
            Some(end) => {
                // opening quote + contents + closing quote
                self.pos += end + 2;
                Tok::Str(body[..end].to_string())
            }
            None => {
                self.pos = self.src.len();
                Tok::Str(body.to_string())
            }
        }
    }

    /// Scans an optionally negative run of digits and dots.
    ///
    /// Text that is not a valid float (for example `1.2.3`) yields `Num(0.0)`.
    fn lex_number(&mut self) -> Tok {
        let rest = self.rest();
        let sign_len = if rest.starts_with('-') { 1 } else { 0 };
        let digits = &rest[sign_len..];
        let digits_len = digits
            .find(|d: char| !(d.is_ascii_digit() || d == '.'))
            .unwrap_or(digits.len());
        let len = sign_len + digits_len;
        self.pos += len;
        Tok::Num(rest[..len].parse().unwrap_or(0.0))
    }

    /// Scans a keyword or identifier (letters, digits, `_`, `.` and `:`).
    ///
    /// Keywords are matched case-insensitively and returned upper-cased.
    /// NOTE(review): because `Kw` keeps only the upper-cased text, a field or
    /// collection whose name is a keyword (e.g. `count`) loses its original
    /// case when the parser accepts it as a name.
    fn lex_word(&mut self) -> Tok {
        const KEYWORDS: [&str; 24] = [
            "FROM", "AS", "OF", "VALID", "WHERE", "AND", "ORDER", "BY",
            "DESC", "LIMIT", "GROUP", "COUNT", "SUM", "AVG", "MIN", "MAX",
            "TRACE", "TRAVERSE", "REVERSE", "SEARCH", "NOT", "NULL", "TRUE", "FALSE",
        ];

        let rest = self.rest();
        let len = rest
            .find(|ch: char| !(ch.is_alphanumeric() || ch == '_' || ch == '.' || ch == ':'))
            .unwrap_or(rest.len());
        let word = &rest[..len];
        self.pos += len;

        let upper = word.to_uppercase();
        if KEYWORDS.contains(&upper.as_str()) {
            Tok::Kw(upper)
        } else {
            Tok::Ident(word.to_string())
        }
    }

    /// Consumes the remaining input and returns every token before `Eof`.
    fn tokenize(&mut self) -> Vec<Tok> {
        let mut toks = Vec::new();
        loop {
            match self.next_tok() {
                Tok::Eof => return toks,
                tok => toks.push(tok),
            }
        }
    }
}
128
129// ── AST ──────────────────────────────────────────────────────────────────────
130
131#[derive(Debug, Clone)]
132pub struct WhereClause {
133    pub field: String,
134    pub op:    String,
135    pub value: Value,
136}
137
/// Aggregation applied to each group of a `GROUP BY field <AGG>` clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupAgg {
    /// Number of rows in the group.
    Count,
    /// Sum of the group's values.
    Sum,
    /// Arithmetic mean of the group's values.
    Avg,
    /// Smallest value in the group.
    Min,
    /// Largest value in the group.
    Max,
}
140
141#[derive(Debug, Clone)]
142pub struct Query {
143    pub coll:       String,
144    pub as_of:      Option<u64>,
145    pub valid_as_of: Option<String>,
146    pub wheres:     Vec<WhereClause>,
147    pub search:     Option<String>,
148    pub order_by:   Option<String>,
149    pub order_desc: bool,
150    pub limit:      Option<usize>,
151    pub group_by:   Option<(String, GroupAgg)>,
152    pub trace:      Option<String>,     // edge type (usually "caused_by")
153    pub trace_rev:  bool,
154    pub traverse:   Option<String>,     // named relation for TRAVERSE rel
155}
156
157// ── Parser ────────────────────────────────────────────────────────────────────
158
159struct Parser { toks: Vec<Tok>, pos: usize }
160
161impl Parser {
162    fn new(toks: Vec<Tok>) -> Self { Self { toks, pos: 0 } }
163
164    fn peek(&self) -> &Tok { self.toks.get(self.pos).unwrap_or(&Tok::Eof) }
165    fn advance(&mut self) -> Tok { let t = self.peek().clone(); self.pos += 1; t }
166
167    fn expect_kw(&mut self, kw: &str) -> Result<()> {
168        match self.advance() {
169            Tok::Kw(k) if k == kw => Ok(()),
170            other => bail!("expected keyword {}, got {:?}", kw, other),
171        }
172    }
173
174    fn parse_value(&mut self) -> Value {
175        match self.advance() {
176            Tok::Str(s)  => Value::String(s),
177            Tok::Num(n)  => json!(n),
178            Tok::Kw(k) if k == "NULL"  => Value::Null,
179            Tok::Kw(k) if k == "TRUE"  => Value::Bool(true),
180            Tok::Kw(k) if k == "FALSE" => Value::Bool(false),
181            Tok::Ident(s) => Value::String(s),
182            _ => Value::Null,
183        }
184    }
185
186    fn parse(&mut self) -> Result<Query> {
187        self.expect_kw("FROM")?;
188        let coll = match self.advance() {
189            Tok::Ident(s) | Tok::Kw(s) => s,
190            other => bail!("expected collection name, got {:?}", other),
191        };
192
193        let mut q = Query {
194            coll, as_of: None, valid_as_of: None,
195            wheres: vec![], search: None,
196            order_by: None, order_desc: false,
197            limit: None, group_by: None,
198            trace: None, trace_rev: false,
199            traverse: None,
200        };
201
202        loop {
203            match self.peek() {
204                Tok::Eof => break,
205
206                Tok::Kw(k) if k == "AS" => {
207                    self.advance();
208                    self.expect_kw("OF")?;
209                    match self.advance() {
210                        Tok::Num(n) => q.as_of = Some(n as u64),
211                        other => bail!("AS OF expects sequence number, got {:?}", other),
212                    }
213                }
214
215                Tok::Kw(k) if k == "VALID" => {
216                    self.advance();
217                    self.expect_kw("AS")?;
218                    self.expect_kw("OF")?;
219                    match self.advance() {
220                        Tok::Str(s) => q.valid_as_of = Some(s),
221                        other => bail!("VALID AS OF expects date string, got {:?}", other),
222                    }
223                }
224
225                Tok::Kw(k) if k == "WHERE" => {
226                    self.advance();
227                    loop {
228                        let field = match self.advance() {
229                            Tok::Ident(s) | Tok::Kw(s) => s,
230                            other => bail!("WHERE: expected field name, got {:?}", other),
231                        };
232                        let op = match self.advance() {
233                            Tok::Op(s) => s,
234                            other => bail!("WHERE: expected operator, got {:?}", other),
235                        };
236                        let value = self.parse_value();
237                        q.wheres.push(WhereClause { field, op, value });
238                        if let Tok::Kw(k) = self.peek() {
239                            if k == "AND" { self.advance(); } else { break; }
240                        } else { break; }
241                    }
242                }
243
244                Tok::Kw(k) if k == "SEARCH" => {
245                    self.advance();
246                    match self.advance() {
247                        Tok::Str(s) => q.search = Some(s),
248                        other => bail!("SEARCH expects quoted string, got {:?}", other),
249                    }
250                }
251
252                Tok::Kw(k) if k == "ORDER" => {
253                    self.advance();
254                    self.expect_kw("BY")?;
255                    let field = match self.advance() {
256                        Tok::Ident(s) | Tok::Kw(s) => s,
257                        other => bail!("ORDER BY: expected field, got {:?}", other),
258                    };
259                    q.order_by = Some(field);
260                    if let Tok::Kw(k) = self.peek() {
261                        if k == "DESC" { self.advance(); q.order_desc = true; }
262                    }
263                }
264
265                Tok::Kw(k) if k == "LIMIT" => {
266                    self.advance();
267                    match self.advance() {
268                        Tok::Num(n) => q.limit = Some(n as usize),
269                        other => bail!("LIMIT expects number, got {:?}", other),
270                    }
271                }
272
273                Tok::Kw(k) if k == "GROUP" => {
274                    self.advance();
275                    self.expect_kw("BY")?;
276                    let field = match self.advance() {
277                        Tok::Ident(s) | Tok::Kw(s) => s,
278                        other => bail!("GROUP BY: expected field, got {:?}", other),
279                    };
280                    let agg = match self.advance() {
281                        Tok::Kw(a) if a == "COUNT" => GroupAgg::Count,
282                        Tok::Kw(a) if a == "SUM"   => GroupAgg::Sum,
283                        Tok::Kw(a) if a == "AVG"   => GroupAgg::Avg,
284                        Tok::Kw(a) if a == "MIN"   => GroupAgg::Min,
285                        Tok::Kw(a) if a == "MAX"   => GroupAgg::Max,
286                        other => bail!("GROUP BY: expected aggregation, got {:?}", other),
287                    };
288                    q.group_by = Some((field, agg));
289                }
290
291                Tok::Kw(k) if k == "TRACE" => {
292                    self.advance();
293                    let edge = match self.advance() {
294                        Tok::Ident(s) | Tok::Kw(s) => s,
295                        other => bail!("TRACE: expected edge type, got {:?}", other),
296                    };
297                    q.trace = Some(edge);
298                    if let Tok::Kw(k) = self.peek() {
299                        if k == "REVERSE" { self.advance(); q.trace_rev = true; }
300                    }
301                }
302
303                Tok::Kw(k) if k == "TRAVERSE" => {
304                    self.advance();
305                    let rel = match self.advance() {
306                        Tok::Ident(s) | Tok::Kw(s) => s,
307                        other => bail!("TRAVERSE: expected relation name, got {:?}", other),
308                    };
309                    q.traverse = Some(rel);
310                }
311
312                _ => { self.advance(); } // skip unrecognised
313            }
314        }
315
316        Ok(q)
317    }
318}
319
320// ── Executor ──────────────────────────────────────────────────────────────────
321
322fn matches_where(node: &Node, w: &WhereClause) -> bool {
323    let field_val = if w.field == "_id" {
324        Value::String(node.id.clone())
325    } else if w.field == "_coll" {
326        Value::String(node.coll.clone())
327    } else if w.field == "_hash" {
328        Value::String(node.hash.clone())
329    } else {
330        node.data.get(&w.field).cloned().unwrap_or(Value::Null)
331    };
332
333    let a = OrderedValue::from(&field_val);
334    let b = OrderedValue::from(&w.value);
335
336    match w.op.as_str() {
337        "="  => a == b,
338        "!=" => a != b,
339        ">"  => a >  b,
340        "<"  => a <  b,
341        ">=" => a >= b,
342        "<=" => a <= b,
343        _    => false,
344    }
345}
346
347fn matches_valid_as_of(node: &Node, date: &str) -> bool {
348    // A node is valid at `date` if:
349    //   valid_from is None OR valid_from <= date
350    //   AND (valid_to is None OR valid_to > date)
351    let from_ok = node.valid_from.as_deref().map(|f| f <= date).unwrap_or(true);
352    let to_ok   = node.valid_to.as_deref().map(|t| t > date).unwrap_or(true);
353    from_ok && to_ok
354}
355
356fn node_contains_text(node: &Node, text: &str) -> bool {
357    let s = node.data.to_string().to_lowercase();
358    s.contains(&text.to_lowercase())
359}
360
361fn node_to_json(node: &Node) -> Value {
362    let mut obj = if let Value::Object(m) = &node.data {
363        m.clone()
364    } else {
365        serde_json::Map::new()
366    };
367    obj.insert("_id".to_string(),   Value::String(node.id.clone()));
368    obj.insert("_hash".to_string(), Value::String(node.hash.clone()));
369    obj.insert("_seq".to_string(),  json!(node.seq));
370    obj.insert("_coll".to_string(), Value::String(node.coll.clone()));
371    if let Some(ref vf) = node.valid_from {
372        obj.insert("_valid_from".to_string(), Value::String(vf.clone()));
373    }
374    if let Some(ref vt) = node.valid_to {
375        obj.insert("_valid_to".to_string(), Value::String(vt.clone()));
376    }
377    if !node.caused_by.is_empty() {
378        obj.insert("_caused_by".to_string(), Value::Array(
379            node.caused_by.iter().map(|h| Value::String(h.clone())).collect()
380        ));
381    }
382    Value::Object(obj)
383}
384
385/// Execute a NQL query against the DAG database.
386pub fn execute(db: &Db, nql: &str) -> Result<Vec<Value>> {
387    let mut lexer = Lexer::new(nql);
388    let toks = lexer.tokenize();
389    let mut parser = Parser::new(toks);
390    let q = parser.parse()?;
391
392    // ── Candidate generation ──────────────────────────────────────────────────
393
394    // Fast path: single equality filter on _id with no AS OF.
395    // Skip the O(n) collection scan — go straight to the id index (O(1) file read).
396    // This turns `FROM coll WHERE _id = "x" LIMIT 1` from a full-table-scan into
397    // a single file read, giving orders-of-magnitude speedup for point lookups.
398    let id_eq_fast_path: Option<String> = if q.as_of.is_none() && q.trace.is_none() {
399        q.wheres.iter().find_map(|w| {
400            if w.field == "_id" && w.op == "=" {
401                if let Value::String(ref id) = w.value { Some(id.clone()) } else { None }
402            } else { None }
403        })
404    } else { None };
405
406    let candidates: Vec<Node> = if let Some(ref target_id) = id_eq_fast_path {
407        // O(1) direct id-index lookup — skip full collection scan entirely
408        db.get(&q.coll, target_id).into_iter().collect()
409    } else if let Some(seq_target) = q.as_of {
410        // AS OF: return each doc's version at or before target seq
411        db.id_index.list_ids(&q.coll).into_iter()
412            .filter_map(|id| db.get_as_of(&q.coll, &id, seq_target))
413            .collect()
414    } else if let Some(ref order_field) = q.order_by {
415        // ORDER BY with optional sorted index — get candidates in order.
416        //
417        // Push LIMIT down into the index scan ONLY when nothing filters rows
418        // after candidate generation. WHERE / SEARCH / VALID AS OF all run on
419        // the candidate set below, so truncating to the top-k FIRST returns
420        // incomplete results: `WHERE n_tx > 100 ORDER BY height LIMIT 10`
421        // would fetch the 10 lowest blocks by height and then filter — losing
422        // matches past the top-k window. The Python reference filters → sorts
423        // → limits (engine.py execute()); this keeps the engines in agreement.
424        let has_post_filters = !q.wheres.is_empty()
425            || q.search.is_some()
426            || q.valid_as_of.is_some();
427        let limit = if has_post_filters {
428            9_999_999
429        } else {
430            q.limit.unwrap_or(9_999_999)
431        };
432        if q.order_desc {
433            db.order_by_desc(&q.coll, order_field, limit)
434        } else {
435            db.order_by_asc(&q.coll, order_field, limit)
436        }
437    } else if let (Some(n), true) = (q.limit, q.wheres.is_empty()
438            && q.search.is_none() && q.trace.is_none()
439            && q.traverse.is_none() && q.group_by.is_none()
440            && q.valid_as_of.is_none()) {
441        // LIMIT-only fast path: no filters, no ordering, no trace.
442        // Take only the first N IDs from the id-index and fetch those docs.
443        // This makes `FROM coll LIMIT 1` O(N) not O(total) — critical for
444        // the Studio "Preparing…" phase which samples every collection.
445        db.id_index
446            .list_ids(&q.coll)
447            .into_iter()
448            .take(n)
449            .filter_map(|id| db.get(&q.coll, &id))
450            .collect()
451    } else {
452        // Default: all docs in collection
453        db.list(&q.coll)
454    };
455
456    // ── WHERE filter ──────────────────────────────────────────────────────────
457
458    let mut rows: Vec<Node> = candidates.into_iter()
459        .filter(|n| q.wheres.iter().all(|w| matches_where(n, w)))
460        .filter(|n| q.valid_as_of.as_deref()
461                       .map(|d| matches_valid_as_of(n, d))
462                       .unwrap_or(true))
463        .filter(|n| q.search.as_deref()
464                       .map(|t| node_contains_text(n, t))
465                       .unwrap_or(true))
466        .collect();
467
468    // ── TRACE ─────────────────────────────────────────────────────────────────
469
470    if let Some(ref _edge_type) = q.trace {
471        let limit = q.limit.unwrap_or(1000);
472        let mut traced: Vec<Node> = vec![];
473        for root in &rows {
474            let chain = db.trace(&root.hash, q.trace_rev, limit);
475            traced.extend(chain);
476        }
477        rows = traced;
478    }
479
480    // ── TRAVERSE rel — one-hop named-relation lookup ──────────────────────────
481
482    if let Some(ref rel) = q.traverse {
483        let mut traversed: Vec<Node> = vec![];
484        for root in &rows {
485            let frm = format!("{}:{}", root.coll, root.id);
486            let neighbors = db.neighbors(&frm, rel);
487            traversed.extend(neighbors);
488        }
489        rows = traversed;
490    }
491
492    // ── ORDER BY (post-filter sort if no sorted index was used) ───────────────
493
494    if let Some(ref field) = q.order_by {
495        if q.as_of.is_some() || !q.wheres.is_empty() || q.search.is_some() {
496            // Re-sort after filtering
497            rows.sort_by(|a, b| {
498                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
499                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
500                if q.order_desc { bv.cmp(&av) } else { av.cmp(&bv) }
501            });
502        }
503    }
504
505    // ── LIMIT ─────────────────────────────────────────────────────────────────
506
507    if let Some(n) = q.limit {
508        rows.truncate(n);
509    }
510
511    // ── GROUP BY ─────────────────────────────────────────────────────────────
512
513    if let Some((ref group_field, ref agg)) = q.group_by {
514        let mut groups: HashMap<String, Vec<f64>> = HashMap::new();
515        for node in &rows {
516            let key = node.data.get(group_field)
517                .map(|v| v.to_string().trim_matches('"').to_string())
518                .unwrap_or_else(|| "null".to_string());
519            let val = node.data.get(group_field)
520                .and_then(|v| v.as_f64())
521                .unwrap_or(1.0);
522            groups.entry(key).or_default().push(val);
523        }
524        let result: Vec<Value> = groups.into_iter().map(|(k, vals)| {
525            let agg_val = match agg {
526                GroupAgg::Count => vals.len() as f64,
527                GroupAgg::Sum   => vals.iter().sum(),
528                GroupAgg::Avg   => vals.iter().sum::<f64>() / vals.len() as f64,
529                GroupAgg::Min   => vals.iter().cloned().fold(f64::INFINITY, f64::min),
530                GroupAgg::Max   => vals.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
531            };
532            json!({group_field: k, "value": agg_val, "count": vals.len()})
533        }).collect();
534        return Ok(result);
535    }
536
537    // ── Serialize ─────────────────────────────────────────────────────────────
538
539    Ok(rows.into_iter().map(|n| node_to_json(&n)).collect())
540}
541
542/// Parse and execute NQL, returning (rows, count).
543pub fn query(db: &Db, nql: &str) -> Result<(Vec<Value>, usize)> {
544    let rows = execute(db, nql)?;
545    let count = rows.len();
546    Ok((rows, count))
547}
548
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Db;
    use tempfile::tempdir;

    /// Opens a fresh on-disk database holding five `blocks` documents with
    /// heights 1..=5, `n_tx = height * 2`, and a sorted index on `height`.
    ///
    /// The returned `TempDir` guard MUST be kept alive by the caller
    /// (`let (_tmp, db) = setup();`). Dropping it deletes the database
    /// directory out from under the live `Db`; object reads then fail and
    /// queries return 0 rows.
    fn setup() -> (tempfile::TempDir, Db) {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        db.create_sorted_index("blocks", "height");
        for height in 1u64..=5 {
            let doc = serde_json::json!({
                "height": height,
                "hash": format!("000{}", height),
                "n_tx": height * 2
            });
            db.put("blocks", &height.to_string(), doc, vec![], None, None).unwrap();
        }
        (tmp, db)
    }

    #[test]
    fn from_all() {
        let (_tmp, db) = setup();
        let (_rows, total) = query(&db, "FROM blocks").unwrap();
        assert_eq!(total, 5);
    }

    #[test]
    fn where_eq() {
        let (_tmp, db) = setup();
        let (found, total) = query(&db, r#"FROM blocks WHERE _id = "3""#).unwrap();
        assert_eq!(total, 1);
        assert_eq!(found[0]["_id"], "3");
    }

    #[test]
    fn order_by_limit() {
        let (_tmp, db) = setup();
        let (sorted, total) = query(&db, "FROM blocks ORDER BY height ASC LIMIT 3").unwrap();
        assert_eq!(total, 3);
        assert_eq!(sorted[0]["height"], 1);
        assert_eq!(sorted[2]["height"], 3);
    }

    #[test]
    fn order_by_desc() {
        let (_tmp, db) = setup();
        let (sorted, _) = query(&db, "FROM blocks ORDER BY height DESC LIMIT 2").unwrap();
        assert_eq!(sorted[0]["height"], 5);
    }

    #[test]
    fn where_gt() {
        let (_tmp, db) = setup();
        let (found, _) = query(&db, "FROM blocks WHERE height > 3").unwrap();
        assert_eq!(found.len(), 2);
    }

    /// Regression: WHERE + ORDER BY + LIMIT must filter before truncating.
    /// With heights 1..=5 and n_tx = h*2, `n_tx >= 8` matches ONLY heights 4
    /// and 5. The old code pushed LIMIT into the sorted-index scan first: it
    /// fetched heights [1, 2], filtered them, and returned ZERO rows even
    /// though two matches exist. The Python reference returns [4, 5].
    #[test]
    fn where_order_limit_does_not_truncate_before_filter() {
        let (_tmp, db) = setup();

        let (asc_rows, asc_count) =
            query(&db, "FROM blocks WHERE n_tx >= 8 ORDER BY height LIMIT 2").unwrap();
        assert_eq!(asc_count, 2, "both matching rows must survive the limit");
        let heights: Vec<u64> = asc_rows
            .iter()
            .filter_map(|row| row["height"].as_u64())
            .collect();
        assert_eq!(heights, vec![4, 5]);

        // Same shape with DESC: the highest match comes first.
        let (desc_rows, _) =
            query(&db, "FROM blocks WHERE n_tx >= 8 ORDER BY height DESC LIMIT 1").unwrap();
        assert_eq!(desc_rows.len(), 1);
        assert_eq!(desc_rows[0]["height"], 5);
    }

    #[test]
    fn group_by_count() {
        let (_tmp, db) = setup();
        let (groups, _) = query(&db, "FROM blocks GROUP BY n_tx COUNT").unwrap();
        // every n_tx value is unique, so each row forms its own group
        assert_eq!(groups.len(), 5);
    }

    #[test]
    fn search() {
        let (_tmp, db) = setup();
        let (hits, _) = query(&db, r#"FROM blocks SEARCH "0003""#).unwrap();
        assert_eq!(hits.len(), 1);
    }

    #[test]
    fn as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        let first = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
        db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();

        let nql = format!("FROM docs AS OF {}", first.seq);
        let (snapshot, _) = query(&db, &nql).unwrap();
        assert_eq!(snapshot[0]["v"], 1);
    }

    #[test]
    fn valid_as_of() {
        let tmp = tempdir().unwrap();
        let db = Db::open(tmp.path(), None).unwrap();
        // e1 is valid for the first half of 2025; e2 only starts in 2026.
        db.put(
            "events", "e1", serde_json::json!({"type": "a"}), vec![],
            Some("2025-01-01".to_string()), Some("2025-06-01".to_string()),
        ).unwrap();
        db.put(
            "events", "e2", serde_json::json!({"type": "b"}), vec![],
            Some("2026-01-01".to_string()), None,
        ).unwrap();

        let (valid, _) = query(&db, r#"FROM events VALID AS OF "2025-03-01""#).unwrap();
        assert_eq!(valid.len(), 1);
        assert_eq!(valid[0]["type"], "a");
    }
}
670
#[cfg(test)]
mod tests_traverse {
    use super::*;
    use crate::db::Db;
    use tempfile::tempdir;

    /// Query shared by most tests: follow `handles` from driver `d1`.
    const D1_HANDLES: &str = r#"FROM driver WHERE _id = "d1" TRAVERSE handles"#;

    /// Stores `doc` under `coll`/`id` with no causes and no validity window.
    fn put(db: &Db, coll: &str, id: &str, doc: serde_json::Value) {
        db.put(coll, id, doc, vec![], None, None).unwrap();
    }

    /// Links `driver:<driver>` to `trip:<trip>` through the `handles` relation.
    fn handles(db: &Db, driver: &str, trip: &str) {
        db.link(&format!("driver:{}", driver), "handles", &format!("trip:{}", trip)).unwrap();
    }

    #[test]
    fn traverse_one_hop() {
        let db = Db::in_memory();
        put(&db, "driver", "d1", serde_json::json!({"name": "Bob"}));
        put(&db, "driver", "d2", serde_json::json!({"name": "Carol"}));
        put(&db, "trip", "t1", serde_json::json!({"status": "req"}));
        put(&db, "trip", "t2", serde_json::json!({"status": "ok"}));

        handles(&db, "d1", "t1");
        handles(&db, "d1", "t2");

        let (trips, total) = query(&db, D1_HANDLES).unwrap();
        assert_eq!(total, 2);
        let seen: std::collections::HashSet<&str> = trips
            .iter()
            .filter_map(|row| row["_id"].as_str())
            .collect();
        assert!(seen.contains("t1") && seen.contains("t2"));
    }

    #[test]
    fn traverse_returns_empty_when_no_links() {
        let db = Db::in_memory();
        put(&db, "driver", "d1", serde_json::json!({"name": "Bob"}));

        let (trips, total) = query(&db, D1_HANDLES).unwrap();
        assert_eq!(total, 0);
        assert!(trips.is_empty());
    }

    #[test]
    fn traverse_multi_source() {
        // When WHERE matches several rows, TRAVERSE unions all their neighbors.
        let db = Db::in_memory();
        for driver in ["d1", "d2"].iter() {
            put(&db, "driver", driver, serde_json::json!({"status": "active"}));
        }
        put(&db, "trip", "t1", serde_json::json!({"n": 1}));
        put(&db, "trip", "t2", serde_json::json!({"n": 2}));
        put(&db, "trip", "t3", serde_json::json!({"n": 3}));

        handles(&db, "d1", "t1");
        handles(&db, "d1", "t2");
        handles(&db, "d2", "t3");

        let (_trips, total) =
            query(&db, r#"FROM driver WHERE status = "active" TRAVERSE handles"#).unwrap();
        assert_eq!(total, 3);
    }

    #[test]
    fn traverse_nql_keyword_case_insensitive() {
        // The lexer upper-cases keywords, so "traverse" and "TRAVERSE" both work.
        let db = Db::in_memory();
        put(&db, "driver", "d1", serde_json::json!({}));
        put(&db, "trip", "t1", serde_json::json!({}));
        handles(&db, "d1", "t1");

        let (upper_rows, upper_count) = query(&db, D1_HANDLES).unwrap();
        assert_eq!(upper_count, 1);

        let (lower_rows, lower_count) =
            query(&db, r#"FROM driver WHERE _id = "d1" traverse handles"#).unwrap();
        assert_eq!(lower_count, 1);

        assert_eq!(upper_rows[0]["_id"], lower_rows[0]["_id"]);
    }

    #[test]
    fn traverse_durable() {
        let tmp = tempdir().unwrap();
        {
            // First handle: write the documents and the link, then drop it.
            let writer = Db::open(tmp.path(), None).unwrap();
            put(&writer, "driver", "d1", serde_json::json!({"name": "Bob"}));
            put(&writer, "trip", "t1", serde_json::json!({"status": "req"}));
            handles(&writer, "d1", "t1");
        }

        // Second handle on the same directory must still see the link.
        let reopened = Db::open(tmp.path(), None).unwrap();
        reopened.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
        let (trips, total) = query(&reopened, D1_HANDLES).unwrap();
        assert_eq!(total, 1);
        assert_eq!(trips[0]["_id"], "t1");
    }
}
754}