Skip to main content

nedb_engine/
nql.rs

1//! NQL (NEDB Query Language) parser and executor for v2 DAG storage.
2//!
3//! Grammar:
4//!   FROM coll
5//!     [AS OF seq]
6//!     [VALID AS OF "date"]
7//!     [WHERE field op value [AND field op value]*]
8//!     [SEARCH "text"]
9//!     [ORDER BY field [DESC]]
10//!     [LIMIT n]
11//!     [GROUP BY field COUNT|SUM|AVG|MIN|MAX]
12//!     [TRACE caused_by [REVERSE]]
13
14use std::collections::HashMap;
15use anyhow::{bail, Result};
16use serde_json::{json, Value};
17
18use crate::db::Db;
19use crate::index::OrderedValue;
20use crate::store::Node;
21
22// ── Token types ──────────────────────────────────────────────────────────────
23
24#[derive(Debug, Clone, PartialEq)]
25enum Tok {
26    Kw(String),     // uppercase keyword: FROM, WHERE, ORDER, BY, AS, OF, VALID, LIMIT, GROUP, TRACE, REVERSE, AND, DESC, COUNT, SUM, AVG, MIN, MAX, SEARCH
27    Ident(String),  // field name or collection name (lowercase/mixed)
28    Str(String),    // "quoted string"
29    Num(f64),       // numeric literal
30    Op(String),     // = != > < >= <=
31    Eof,
32}
33
34struct Lexer<'a> {
35    src:  &'a str,
36    pos:  usize,
37}
38
39impl<'a> Lexer<'a> {
40    fn new(src: &'a str) -> Self { Self { src, pos: 0 } }
41
42    fn peek_char(&self) -> Option<char> { self.src[self.pos..].chars().next() }
43
44    fn skip_ws(&mut self) {
45        while let Some(c) = self.peek_char() {
46            if c.is_whitespace() { self.pos += c.len_utf8(); } else { break; }
47        }
48    }
49
50    fn next_tok(&mut self) -> Tok {
51        self.skip_ws();
52        if self.pos >= self.src.len() { return Tok::Eof; }
53
54        let c = self.peek_char().unwrap();
55
56        // Quoted string
57        if c == '"' {
58            self.pos += 1;
59            let start = self.pos;
60            while self.pos < self.src.len() && self.peek_char() != Some('"') {
61                self.pos += self.peek_char().unwrap().len_utf8();
62            }
63            let s = self.src[start..self.pos].to_string();
64            if self.peek_char() == Some('"') { self.pos += 1; }
65            return Tok::Str(s);
66        }
67
68        // Two-char operators
69        if self.pos + 1 < self.src.len() {
70            let two = &self.src[self.pos..self.pos+2];
71            if matches!(two, "!=" | ">=" | "<=") {
72                self.pos += 2;
73                return Tok::Op(two.to_string());
74            }
75        }
76
77        // One-char operators
78        if matches!(c, '=' | '>' | '<') {
79            self.pos += 1;
80            return Tok::Op(c.to_string());
81        }
82
83        // Number
84        if c.is_ascii_digit() || (c == '-' && self.src[self.pos+1..].starts_with(|d: char| d.is_ascii_digit())) {
85            let start = self.pos;
86            if c == '-' { self.pos += 1; }
87            while let Some(d) = self.peek_char() {
88                if d.is_ascii_digit() || d == '.' { self.pos += 1; } else { break; }
89            }
90            let n: f64 = self.src[start..self.pos].parse().unwrap_or(0.0);
91            return Tok::Num(n);
92        }
93
94        // Keyword or identifier
95        if c.is_alphabetic() || c == '_' {
96            let start = self.pos;
97            while let Some(ch) = self.peek_char() {
98                if ch.is_alphanumeric() || ch == '_' || ch == '.' || ch == ':' {
99                    self.pos += ch.len_utf8();
100                } else { break; }
101            }
102            let word = &self.src[start..self.pos];
103            let upper = word.to_uppercase();
104            let keywords = ["FROM","AS","OF","VALID","WHERE","AND","ORDER","BY",
105                            "DESC","LIMIT","GROUP","COUNT","SUM","AVG","MIN","MAX",
106                            "TRACE","TRAVERSE","REVERSE","SEARCH","NOT","NULL","TRUE","FALSE"];
107            if keywords.contains(&upper.as_str()) {
108                return Tok::Kw(upper);
109            }
110            return Tok::Ident(word.to_string());
111        }
112
113        // Skip unknown char
114        self.pos += c.len_utf8();
115        self.next_tok()
116    }
117
118    fn tokenize(&mut self) -> Vec<Tok> {
119        let mut toks = vec![];
120        loop {
121            let t = self.next_tok();
122            if t == Tok::Eof { break; }
123            toks.push(t);
124        }
125        toks
126    }
127}
128
129// ── AST ──────────────────────────────────────────────────────────────────────
130
131#[derive(Debug, Clone)]
132pub struct WhereClause {
133    pub field: String,
134    pub op:    String,
135    pub value: Value,
136}
137
138#[derive(Debug, Clone)]
139pub enum GroupAgg { Count, Sum, Avg, Min, Max }
140
141#[derive(Debug, Clone)]
142pub struct Query {
143    pub coll:       String,
144    pub as_of:      Option<u64>,
145    pub valid_as_of: Option<String>,
146    pub wheres:     Vec<WhereClause>,
147    pub search:     Option<String>,
148    pub order_by:   Option<String>,
149    pub order_desc: bool,
150    pub limit:      Option<usize>,
151    pub group_by:   Option<(String, GroupAgg)>,
152    pub trace:      Option<String>,     // edge type (usually "caused_by")
153    pub trace_rev:  bool,
154    pub traverse:   Option<String>,     // named relation for TRAVERSE rel
155}
156
157// ── Parser ────────────────────────────────────────────────────────────────────
158
159struct Parser { toks: Vec<Tok>, pos: usize }
160
161impl Parser {
162    fn new(toks: Vec<Tok>) -> Self { Self { toks, pos: 0 } }
163
164    fn peek(&self) -> &Tok { self.toks.get(self.pos).unwrap_or(&Tok::Eof) }
165    fn advance(&mut self) -> Tok { let t = self.peek().clone(); self.pos += 1; t }
166
167    fn expect_kw(&mut self, kw: &str) -> Result<()> {
168        match self.advance() {
169            Tok::Kw(k) if k == kw => Ok(()),
170            other => bail!("expected keyword {}, got {:?}", kw, other),
171        }
172    }
173
174    fn parse_value(&mut self) -> Value {
175        match self.advance() {
176            Tok::Str(s)  => Value::String(s),
177            Tok::Num(n)  => json!(n),
178            Tok::Kw(k) if k == "NULL"  => Value::Null,
179            Tok::Kw(k) if k == "TRUE"  => Value::Bool(true),
180            Tok::Kw(k) if k == "FALSE" => Value::Bool(false),
181            Tok::Ident(s) => Value::String(s),
182            _ => Value::Null,
183        }
184    }
185
186    fn parse(&mut self) -> Result<Query> {
187        self.expect_kw("FROM")?;
188        let coll = match self.advance() {
189            Tok::Ident(s) | Tok::Kw(s) => s,
190            other => bail!("expected collection name, got {:?}", other),
191        };
192
193        let mut q = Query {
194            coll, as_of: None, valid_as_of: None,
195            wheres: vec![], search: None,
196            order_by: None, order_desc: false,
197            limit: None, group_by: None,
198            trace: None, trace_rev: false,
199            traverse: None,
200        };
201
202        loop {
203            match self.peek() {
204                Tok::Eof => break,
205
206                Tok::Kw(k) if k == "AS" => {
207                    self.advance();
208                    self.expect_kw("OF")?;
209                    match self.advance() {
210                        Tok::Num(n) => q.as_of = Some(n as u64),
211                        other => bail!("AS OF expects sequence number, got {:?}", other),
212                    }
213                }
214
215                Tok::Kw(k) if k == "VALID" => {
216                    self.advance();
217                    self.expect_kw("AS")?;
218                    self.expect_kw("OF")?;
219                    match self.advance() {
220                        Tok::Str(s) => q.valid_as_of = Some(s),
221                        other => bail!("VALID AS OF expects date string, got {:?}", other),
222                    }
223                }
224
225                Tok::Kw(k) if k == "WHERE" => {
226                    self.advance();
227                    loop {
228                        let field = match self.advance() {
229                            Tok::Ident(s) | Tok::Kw(s) => s,
230                            other => bail!("WHERE: expected field name, got {:?}", other),
231                        };
232                        let op = match self.advance() {
233                            Tok::Op(s) => s,
234                            other => bail!("WHERE: expected operator, got {:?}", other),
235                        };
236                        let value = self.parse_value();
237                        q.wheres.push(WhereClause { field, op, value });
238                        if let Tok::Kw(k) = self.peek() {
239                            if k == "AND" { self.advance(); } else { break; }
240                        } else { break; }
241                    }
242                }
243
244                Tok::Kw(k) if k == "SEARCH" => {
245                    self.advance();
246                    match self.advance() {
247                        Tok::Str(s) => q.search = Some(s),
248                        other => bail!("SEARCH expects quoted string, got {:?}", other),
249                    }
250                }
251
252                Tok::Kw(k) if k == "ORDER" => {
253                    self.advance();
254                    self.expect_kw("BY")?;
255                    let field = match self.advance() {
256                        Tok::Ident(s) | Tok::Kw(s) => s,
257                        other => bail!("ORDER BY: expected field, got {:?}", other),
258                    };
259                    q.order_by = Some(field);
260                    if let Tok::Kw(k) = self.peek() {
261                        if k == "DESC" { self.advance(); q.order_desc = true; }
262                    }
263                }
264
265                Tok::Kw(k) if k == "LIMIT" => {
266                    self.advance();
267                    match self.advance() {
268                        Tok::Num(n) => q.limit = Some(n as usize),
269                        other => bail!("LIMIT expects number, got {:?}", other),
270                    }
271                }
272
273                Tok::Kw(k) if k == "GROUP" => {
274                    self.advance();
275                    self.expect_kw("BY")?;
276                    let field = match self.advance() {
277                        Tok::Ident(s) | Tok::Kw(s) => s,
278                        other => bail!("GROUP BY: expected field, got {:?}", other),
279                    };
280                    let agg = match self.advance() {
281                        Tok::Kw(a) if a == "COUNT" => GroupAgg::Count,
282                        Tok::Kw(a) if a == "SUM"   => GroupAgg::Sum,
283                        Tok::Kw(a) if a == "AVG"   => GroupAgg::Avg,
284                        Tok::Kw(a) if a == "MIN"   => GroupAgg::Min,
285                        Tok::Kw(a) if a == "MAX"   => GroupAgg::Max,
286                        other => bail!("GROUP BY: expected aggregation, got {:?}", other),
287                    };
288                    q.group_by = Some((field, agg));
289                }
290
291                Tok::Kw(k) if k == "TRACE" => {
292                    self.advance();
293                    let edge = match self.advance() {
294                        Tok::Ident(s) | Tok::Kw(s) => s,
295                        other => bail!("TRACE: expected edge type, got {:?}", other),
296                    };
297                    q.trace = Some(edge);
298                    if let Tok::Kw(k) = self.peek() {
299                        if k == "REVERSE" { self.advance(); q.trace_rev = true; }
300                    }
301                }
302
303                Tok::Kw(k) if k == "TRAVERSE" => {
304                    self.advance();
305                    let rel = match self.advance() {
306                        Tok::Ident(s) | Tok::Kw(s) => s,
307                        other => bail!("TRAVERSE: expected relation name, got {:?}", other),
308                    };
309                    q.traverse = Some(rel);
310                }
311
312                _ => { self.advance(); } // skip unrecognised
313            }
314        }
315
316        Ok(q)
317    }
318}
319
320// ── Executor ──────────────────────────────────────────────────────────────────
321
322fn matches_where(node: &Node, w: &WhereClause) -> bool {
323    let field_val = if w.field == "_id" {
324        Value::String(node.id.clone())
325    } else if w.field == "_coll" {
326        Value::String(node.coll.clone())
327    } else if w.field == "_hash" {
328        Value::String(node.hash.clone())
329    } else {
330        node.data.get(&w.field).cloned().unwrap_or(Value::Null)
331    };
332
333    let a = OrderedValue::from(&field_val);
334    let b = OrderedValue::from(&w.value);
335
336    match w.op.as_str() {
337        "="  => a == b,
338        "!=" => a != b,
339        ">"  => a >  b,
340        "<"  => a <  b,
341        ">=" => a >= b,
342        "<=" => a <= b,
343        _    => false,
344    }
345}
346
347fn matches_valid_as_of(node: &Node, date: &str) -> bool {
348    // A node is valid at `date` if:
349    //   valid_from is None OR valid_from <= date
350    //   AND (valid_to is None OR valid_to > date)
351    let from_ok = node.valid_from.as_deref().map(|f| f <= date).unwrap_or(true);
352    let to_ok   = node.valid_to.as_deref().map(|t| t > date).unwrap_or(true);
353    from_ok && to_ok
354}
355
356fn node_contains_text(node: &Node, text: &str) -> bool {
357    let s = node.data.to_string().to_lowercase();
358    s.contains(&text.to_lowercase())
359}
360
361fn node_to_json(node: &Node) -> Value {
362    let mut obj = if let Value::Object(m) = &node.data {
363        m.clone()
364    } else {
365        serde_json::Map::new()
366    };
367    obj.insert("_id".to_string(),   Value::String(node.id.clone()));
368    obj.insert("_hash".to_string(), Value::String(node.hash.clone()));
369    obj.insert("_seq".to_string(),  json!(node.seq));
370    obj.insert("_coll".to_string(), Value::String(node.coll.clone()));
371    if let Some(ref vf) = node.valid_from {
372        obj.insert("_valid_from".to_string(), Value::String(vf.clone()));
373    }
374    if let Some(ref vt) = node.valid_to {
375        obj.insert("_valid_to".to_string(), Value::String(vt.clone()));
376    }
377    if !node.caused_by.is_empty() {
378        obj.insert("_caused_by".to_string(), Value::Array(
379            node.caused_by.iter().map(|h| Value::String(h.clone())).collect()
380        ));
381    }
382    Value::Object(obj)
383}
384
385/// Execute a NQL query against the DAG database.
386pub fn execute(db: &Db, nql: &str) -> Result<Vec<Value>> {
387    let mut lexer = Lexer::new(nql);
388    let toks = lexer.tokenize();
389    let mut parser = Parser::new(toks);
390    let q = parser.parse()?;
391
392    // ── Candidate generation ──────────────────────────────────────────────────
393
394    // Fast path: single equality filter on _id with no AS OF.
395    // Skip the O(n) collection scan — go straight to the id index (O(1) file read).
396    // This turns `FROM coll WHERE _id = "x" LIMIT 1` from a full-table-scan into
397    // a single file read, giving orders-of-magnitude speedup for point lookups.
398    let id_eq_fast_path: Option<String> = if q.as_of.is_none() && q.trace.is_none() {
399        q.wheres.iter().find_map(|w| {
400            if w.field == "_id" && w.op == "=" {
401                if let Value::String(ref id) = w.value { Some(id.clone()) } else { None }
402            } else { None }
403        })
404    } else { None };
405
406    let candidates: Vec<Node> = if let Some(ref target_id) = id_eq_fast_path {
407        // O(1) direct id-index lookup — skip full collection scan entirely
408        db.get(&q.coll, target_id).into_iter().collect()
409    } else if let Some(seq_target) = q.as_of {
410        // AS OF: return each doc's version at or before target seq
411        db.id_index.list_ids(&q.coll).into_iter()
412            .filter_map(|id| db.get_as_of(&q.coll, &id, seq_target))
413            .collect()
414    } else if let Some(ref order_field) = q.order_by {
415        // ORDER BY with optional sorted index — get candidates in order
416        let limit = q.limit.unwrap_or(9_999_999);
417        if q.order_desc {
418            db.order_by_desc(&q.coll, order_field, limit)
419        } else {
420            db.order_by_asc(&q.coll, order_field, limit)
421        }
422    } else if let (Some(n), true) = (q.limit, q.wheres.is_empty()
423            && q.search.is_none() && q.trace.is_none()
424            && q.traverse.is_none() && q.group_by.is_none()
425            && q.valid_as_of.is_none()) {
426        // LIMIT-only fast path: no filters, no ordering, no trace.
427        // Take only the first N IDs from the id-index and fetch those docs.
428        // This makes `FROM coll LIMIT 1` O(N) not O(total) — critical for
429        // the Studio "Preparing…" phase which samples every collection.
430        db.id_index
431            .list_ids(&q.coll)
432            .into_iter()
433            .take(n)
434            .filter_map(|id| db.get(&q.coll, &id))
435            .collect()
436    } else {
437        // Default: all docs in collection
438        db.list(&q.coll)
439    };
440
441    // ── WHERE filter ──────────────────────────────────────────────────────────
442
443    let mut rows: Vec<Node> = candidates.into_iter()
444        .filter(|n| q.wheres.iter().all(|w| matches_where(n, w)))
445        .filter(|n| q.valid_as_of.as_deref()
446                       .map(|d| matches_valid_as_of(n, d))
447                       .unwrap_or(true))
448        .filter(|n| q.search.as_deref()
449                       .map(|t| node_contains_text(n, t))
450                       .unwrap_or(true))
451        .collect();
452
453    // ── TRACE ─────────────────────────────────────────────────────────────────
454
455    if let Some(ref _edge_type) = q.trace {
456        let limit = q.limit.unwrap_or(1000);
457        let mut traced: Vec<Node> = vec![];
458        for root in &rows {
459            let chain = db.trace(&root.hash, q.trace_rev, limit);
460            traced.extend(chain);
461        }
462        rows = traced;
463    }
464
465    // ── TRAVERSE rel — one-hop named-relation lookup ──────────────────────────
466
467    if let Some(ref rel) = q.traverse {
468        let mut traversed: Vec<Node> = vec![];
469        for root in &rows {
470            let frm = format!("{}:{}", root.coll, root.id);
471            let neighbors = db.neighbors(&frm, rel);
472            traversed.extend(neighbors);
473        }
474        rows = traversed;
475    }
476
477    // ── ORDER BY (post-filter sort if no sorted index was used) ───────────────
478
479    if let Some(ref field) = q.order_by {
480        if q.as_of.is_some() || !q.wheres.is_empty() || q.search.is_some() {
481            // Re-sort after filtering
482            rows.sort_by(|a, b| {
483                let av = a.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
484                let bv = b.data.get(field).map(OrderedValue::from).unwrap_or(OrderedValue::Null);
485                if q.order_desc { bv.cmp(&av) } else { av.cmp(&bv) }
486            });
487        }
488    }
489
490    // ── LIMIT ─────────────────────────────────────────────────────────────────
491
492    if let Some(n) = q.limit {
493        rows.truncate(n);
494    }
495
496    // ── GROUP BY ─────────────────────────────────────────────────────────────
497
498    if let Some((ref group_field, ref agg)) = q.group_by {
499        let mut groups: HashMap<String, Vec<f64>> = HashMap::new();
500        for node in &rows {
501            let key = node.data.get(group_field)
502                .map(|v| v.to_string().trim_matches('"').to_string())
503                .unwrap_or_else(|| "null".to_string());
504            let val = node.data.get(group_field)
505                .and_then(|v| v.as_f64())
506                .unwrap_or(1.0);
507            groups.entry(key).or_default().push(val);
508        }
509        let result: Vec<Value> = groups.into_iter().map(|(k, vals)| {
510            let agg_val = match agg {
511                GroupAgg::Count => vals.len() as f64,
512                GroupAgg::Sum   => vals.iter().sum(),
513                GroupAgg::Avg   => vals.iter().sum::<f64>() / vals.len() as f64,
514                GroupAgg::Min   => vals.iter().cloned().fold(f64::INFINITY, f64::min),
515                GroupAgg::Max   => vals.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
516            };
517            json!({group_field: k, "value": agg_val, "count": vals.len()})
518        }).collect();
519        return Ok(result);
520    }
521
522    // ── Serialize ─────────────────────────────────────────────────────────────
523
524    Ok(rows.into_iter().map(|n| node_to_json(&n)).collect())
525}
526
527/// Parse and execute NQL, returning (rows, count).
528pub fn query(db: &Db, nql: &str) -> Result<(Vec<Value>, usize)> {
529    let rows = execute(db, nql)?;
530    let count = rows.len();
531    Ok((rows, count))
532}
533
534#[cfg(test)]
535mod tests {
536    use super::*;
537    use tempfile::tempdir;
538    use crate::db::Db;
539
540    // Returns (TempDir, Db) — the TempDir guard MUST be kept alive by the caller
541    // (`let (_tmp, db) = setup();`). If it dropped here, its Drop would delete the
542    // database directory out from under the live Db, and every objects.read()
543    // (loose object files live on disk) would fail → queries return 0 rows.
544    fn setup() -> (tempfile::TempDir, Db) {
545        let dir = tempdir().unwrap();
546        let db = Db::open(dir.path(), None).unwrap();
547        db.create_sorted_index("blocks", "height");
548        for h in 1u64..=5 {
549            db.put("blocks", &h.to_string(),
550                serde_json::json!({"height": h, "hash": format!("000{}", h), "n_tx": h * 2}),
551                vec![], None, None).unwrap();
552        }
553        (dir, db)
554    }
555
556    #[test]
557    fn from_all() {
558        let (_tmp, db) = setup();
559        let (rows, count) = query(&db, "FROM blocks").unwrap();
560        assert_eq!(count, 5);
561        let _ = rows;
562    }
563
564    #[test]
565    fn where_eq() {
566        let (_tmp, db) = setup();
567        let (rows, count) = query(&db, r#"FROM blocks WHERE _id = "3""#).unwrap();
568        assert_eq!(count, 1);
569        assert_eq!(rows[0]["_id"], "3");
570    }
571
572    #[test]
573    fn order_by_limit() {
574        let (_tmp, db) = setup();
575        let (rows, count) = query(&db, "FROM blocks ORDER BY height ASC LIMIT 3").unwrap();
576        assert_eq!(count, 3);
577        assert_eq!(rows[0]["height"], 1);
578        assert_eq!(rows[2]["height"], 3);
579    }
580
581    #[test]
582    fn order_by_desc() {
583        let (_tmp, db) = setup();
584        let (rows, _) = query(&db, "FROM blocks ORDER BY height DESC LIMIT 2").unwrap();
585        assert_eq!(rows[0]["height"], 5);
586    }
587
588    #[test]
589    fn where_gt() {
590        let (_tmp, db) = setup();
591        let (rows, _) = query(&db, "FROM blocks WHERE height > 3").unwrap();
592        assert_eq!(rows.len(), 2);
593    }
594
595    #[test]
596    fn group_by_count() {
597        let (_tmp, db) = setup();
598        let (rows, _) = query(&db, "FROM blocks GROUP BY n_tx COUNT").unwrap();
599        assert_eq!(rows.len(), 5); // all unique n_tx values
600    }
601
602    #[test]
603    fn search() {
604        let (_tmp, db) = setup();
605        let (rows, _) = query(&db, r#"FROM blocks SEARCH "0003""#).unwrap();
606        assert_eq!(rows.len(), 1);
607    }
608
609    #[test]
610    fn as_of() {
611        let dir = tempdir().unwrap();
612        let db = Db::open(dir.path(), None).unwrap();
613        let v1 = db.put("docs", "x", serde_json::json!({"v": 1}), vec![], None, None).unwrap();
614        db.put("docs", "x", serde_json::json!({"v": 2}), vec![], None, None).unwrap();
615        let (rows, _) = query(&db, &format!("FROM docs AS OF {}", v1.seq)).unwrap();
616        assert_eq!(rows[0]["v"], 1);
617    }
618
619    #[test]
620    fn valid_as_of() {
621        let dir = tempdir().unwrap();
622        let db = Db::open(dir.path(), None).unwrap();
623        db.put("events", "e1", serde_json::json!({"type": "a"}), vec![],
624               Some("2025-01-01".to_string()), Some("2025-06-01".to_string())).unwrap();
625        db.put("events", "e2", serde_json::json!({"type": "b"}), vec![],
626               Some("2026-01-01".to_string()), None).unwrap();
627        let (rows, _) = query(&db, r#"FROM events VALID AS OF "2025-03-01""#).unwrap();
628        assert_eq!(rows.len(), 1);
629        assert_eq!(rows[0]["type"], "a");
630    }
631}
632
633#[cfg(test)]
634mod tests_traverse {
635    use super::*;
636    use tempfile::tempdir;
637    use crate::db::Db;
638
639    #[test]
640    fn traverse_one_hop() {
641        let db = Db::in_memory();
642        db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
643        db.put("driver", "d2", serde_json::json!({"name": "Carol"}), vec![], None, None).unwrap();
644        db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
645        db.put("trip",   "t2", serde_json::json!({"status": "ok"}),  vec![], None, None).unwrap();
646
647        db.link("driver:d1", "handles", "trip:t1").unwrap();
648        db.link("driver:d1", "handles", "trip:t2").unwrap();
649
650        let (rows, count) = query(&db, r#"FROM driver WHERE _id = "d1" TRAVERSE handles"#).unwrap();
651        assert_eq!(count, 2);
652        let ids: std::collections::HashSet<&str> = rows.iter()
653            .filter_map(|r| r["_id"].as_str())
654            .collect();
655        assert!(ids.contains("t1") && ids.contains("t2"));
656    }
657
658    #[test]
659    fn traverse_returns_empty_when_no_links() {
660        let db = Db::in_memory();
661        db.put("driver", "d1", serde_json::json!({"name": "Bob"}), vec![], None, None).unwrap();
662        let (rows, count) = query(&db, r#"FROM driver WHERE _id = "d1" TRAVERSE handles"#).unwrap();
663        assert_eq!(count, 0);
664        assert!(rows.is_empty());
665    }
666
667    #[test]
668    fn traverse_multi_source() {
669        // When WHERE matches multiple rows, TRAVERSE unions all their neighbors
670        let db = Db::in_memory();
671        db.put("driver", "d1", serde_json::json!({"status": "active"}), vec![], None, None).unwrap();
672        db.put("driver", "d2", serde_json::json!({"status": "active"}), vec![], None, None).unwrap();
673        db.put("trip",   "t1", serde_json::json!({"n": 1}), vec![], None, None).unwrap();
674        db.put("trip",   "t2", serde_json::json!({"n": 2}), vec![], None, None).unwrap();
675        db.put("trip",   "t3", serde_json::json!({"n": 3}), vec![], None, None).unwrap();
676
677        db.link("driver:d1", "handles", "trip:t1").unwrap();
678        db.link("driver:d1", "handles", "trip:t2").unwrap();
679        db.link("driver:d2", "handles", "trip:t3").unwrap();
680
681        let (rows, count) = query(&db, r#"FROM driver WHERE status = "active" TRAVERSE handles"#).unwrap();
682        assert_eq!(count, 3);
683    }
684
685    #[test]
686    fn traverse_nql_keyword_case_insensitive() {
687        // Parser normalises to uppercase — "traverse" and "TRAVERSE" both work
688        let db = Db::in_memory();
689        db.put("driver", "d1", serde_json::json!({}), vec![], None, None).unwrap();
690        db.put("trip",   "t1", serde_json::json!({}), vec![], None, None).unwrap();
691        db.link("driver:d1", "handles", "trip:t1").unwrap();
692        // uppercase
693        let (r1, c1) = query(&db, r#"FROM driver WHERE _id = "d1" TRAVERSE handles"#).unwrap();
694        assert_eq!(c1, 1);
695        // lowercase (lexer uppercases keywords)
696        let (r2, c2) = query(&db, r#"FROM driver WHERE _id = "d1" traverse handles"#).unwrap();
697        assert_eq!(c2, 1);
698        assert_eq!(r1[0]["_id"], r2[0]["_id"]);
699    }
700
701    #[test]
702    fn traverse_durable() {
703        let dir = tempdir().unwrap();
704        {
705            let db = Db::open(dir.path(), None).unwrap();
706            db.put("driver", "d1", serde_json::json!({"name": "Bob"}),   vec![], None, None).unwrap();
707            db.put("trip",   "t1", serde_json::json!({"status": "req"}), vec![], None, None).unwrap();
708            db.link("driver:d1", "handles", "trip:t1").unwrap();
709        }
710        let db2 = Db::open(dir.path(), None).unwrap();
711        db2.startup_ready.store(true, std::sync::atomic::Ordering::SeqCst);
712        let (rows, count) = query(&db2, r#"FROM driver WHERE _id = "d1" TRAVERSE handles"#).unwrap();
713        assert_eq!(count, 1);
714        assert_eq!(rows[0]["_id"], "t1");
715    }
716}