Skip to main content

omnigraph_server/
queries.rs

1//! Stored-query registry.
2//!
3//! A server-side registry of named, parameter-typed `.gq` queries that
4//! operators declare in `omnigraph.yaml` (per-graph, or top-level in
5//! single mode) and the server loads at startup. Each entry is parsed
//! and its identity asserted here ([`QueryRegistry::from_specs`]); type-checking
//! against the live schema happens separately ([`check`]) so the loader stays
8//! callable without an open engine (the CLI's offline `queries check`).
9//!
10//! Identity is the query **name**: the manifest key must equal the
11//! `query <name>` symbol declared in the referenced `.gq` file. The two
12//! are asserted equal at load — one name, two places that must agree.
13//! Renaming either is a breaking change to callers, by design.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use omnigraph_compiler::catalog::Catalog;
19use omnigraph_compiler::query::ast::QueryDecl;
20use omnigraph_compiler::query::parser::parse_query;
21use omnigraph_compiler::query::typecheck::typecheck_query_decl;
22use omnigraph_compiler::types::{PropType, ScalarType};
23
24/// One loaded stored query. `source` is the full `.gq` file text — the
25/// invocation handler hands it to `run_query` / `run_mutate` verbatim,
26/// which reuse the same parse/IR/exec path as the inline routes (no
27/// parallel implementation).
28#[derive(Debug, Clone)]
29pub struct StoredQuery {
30    /// Identity: manifest key == `query <name>` symbol.
31    pub name: String,
32    /// Full `.gq` source text the query was selected from.
33    pub source: Arc<str>,
34    /// Parsed declaration (params, mutations, description, …).
35    pub decl: QueryDecl,
36    /// Whether this query is listed in the MCP tool catalog (`GET /queries`).
37    /// Default `true` (the manifest entry is the opt-in); `expose: false`
38    /// keeps it HTTP/service-callable but hidden from the agent tool list.
39    /// Catalog membership only — not an authorization gate.
40    pub expose: bool,
41    /// Optional MCP tool-name override; defaults to `name`.
42    pub tool_name: Option<String>,
43}
44
45impl StoredQuery {
46    /// `true` if the selected declaration contains insert/update/delete
47    /// statements — drives read-vs-mutate routing at invocation time.
48    pub fn is_mutation(&self) -> bool {
49        !self.decl.mutations.is_empty()
50    }
51
52    /// The MCP tool name this query is catalogued under: the explicit
53    /// `tool_name` override, else the query `name`. The catalog key —
54    /// enforced unique across exposed queries at load. Server-side
55    /// consumers (the uniqueness check, the future catalog projection) read
56    /// this; the CLI `queries list` resolves the same rule on its own DTO.
57    pub fn effective_tool_name(&self) -> &str {
58        self.tool_name.as_deref().unwrap_or(&self.name)
59    }
60}
61
62/// A loaded, identity-checked stored-query registry for one graph.
63#[derive(Debug, Clone, Default)]
64pub struct QueryRegistry {
65    by_name: BTreeMap<String, StoredQuery>,
66}
67
/// One registry entry held in memory before loading: the query's name plus
/// its already-read `.gq` source. This is the input consumed by
/// [`QueryRegistry::from_specs`]; the server's cluster boot and the CLI's
/// `queries` tooling (from a cluster serving snapshot) both build these.
#[derive(Debug, Clone)]
pub struct RegistrySpec {
    /// Manifest key; must match a `query <name>` symbol in `source`.
    pub name: String,
    /// Full text of the `.gq` file.
    pub source: String,
    /// Whether the query should appear in the MCP tool catalog.
    pub expose: bool,
    /// Optional MCP tool-name override.
    pub tool_name: Option<String>,
}
78
/// One failure encountered while loading the registry.
///
/// Loading collects these instead of stopping at the first, so a bad
/// `omnigraph.yaml` reports every broken entry in a single pass — the same
/// posture taken for bad policy YAML.
#[derive(Debug, Clone)]
pub struct LoadError {
    /// Name of the offending query when the failure belongs to one entry;
    /// `None` for registry-wide failures.
    pub query: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}
88
89impl std::fmt::Display for LoadError {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match &self.query {
92            Some(name) => write!(f, "stored query '{name}': {}", self.message),
93            None => write!(f, "stored query registry: {}", self.message),
94        }
95    }
96}
97
98impl QueryRegistry {
99    /// Build a registry from in-memory specs: parse each source, select
100    /// the declaration whose symbol equals the manifest key, and assert
101    /// they agree. Collects every failure. No schema type-checking here
102    /// — that is [`check`].
103    pub fn from_specs(specs: Vec<RegistrySpec>) -> Result<Self, Vec<LoadError>> {
104        let mut by_name = BTreeMap::new();
105        let mut errors = Vec::new();
106
107        for spec in specs {
108            match parse_query(&spec.source) {
109                Ok(file) => {
110                    match file.queries.into_iter().find(|q| q.name == spec.name) {
111                        Some(decl) => {
112                            by_name.insert(
113                                spec.name.clone(),
114                                StoredQuery {
115                                    name: spec.name,
116                                    source: Arc::from(spec.source),
117                                    decl,
118                                    expose: spec.expose,
119                                    tool_name: spec.tool_name,
120                                },
121                            );
122                        }
123                        None => errors.push(LoadError {
124                            query: Some(spec.name.clone()),
125                            message: format!(
126                                "no `query {}` declaration found in its `.gq` file \
127                                 (the registry key must match the query symbol)",
128                                spec.name
129                            ),
130                        }),
131                    }
132                }
133                Err(err) => errors.push(LoadError {
134                    query: Some(spec.name),
135                    message: format!("parse error: {err}"),
136                }),
137            }
138        }
139
140        // Exposed queries are catalogued under their effective tool name;
141        // two claiming one name is an MCP-namespace collision. Refuse it at
142        // load (collected, not fail-fast), naming the loser and the winner.
143        // Iterating the `BTreeMap` makes the winner deterministic (the
144        // lexicographically-first query name; config is a map, so YAML
145        // declaration order isn't preserved anyway) and the error order
146        // stable. Scoped to a block so these borrows of `by_name` end
147        // before it is moved into `Self`.
148        {
149            let mut claimed: BTreeMap<&str, &str> = BTreeMap::new();
150            for query in by_name.values().filter(|q| q.expose) {
151                let tool = query.effective_tool_name();
152                if let Some(winner) = claimed.insert(tool, &query.name) {
153                    errors.push(LoadError {
154                        query: Some(query.name.clone()),
155                        message: format!(
156                            "MCP tool name '{tool}' already claimed by exposed query '{winner}'"
157                        ),
158                    });
159                }
160            }
161        }
162
163        if errors.is_empty() {
164            Ok(Self { by_name })
165        } else {
166            Err(errors)
167        }
168    }
169
170    pub fn lookup(&self, name: &str) -> Option<&StoredQuery> {
171        self.by_name.get(name)
172    }
173
174    pub fn iter(&self) -> impl Iterator<Item = &StoredQuery> {
175        self.by_name.values()
176    }
177
178    pub fn is_empty(&self) -> bool {
179        self.by_name.is_empty()
180    }
181
182    pub fn len(&self) -> usize {
183        self.by_name.len()
184    }
185}
186
/// A stored query that no longer type-checks against the live schema, for
/// example because a migration renamed or removed a node/edge type or
/// property it references.
///
/// Breakages **block server boot**, the same posture as bad policy YAML. This
/// surfaces schema drift at the deploy boundary instead of silently at
/// invocation time.
#[derive(Debug, Clone)]
pub struct Breakage {
    /// Name of the query that failed.
    pub query: String,
    /// The type-checker's error text.
    pub message: String,
}
197
/// An advisory raised during validation. It is logged at boot and never
/// blocks startup. Today the only source is an MCP-exposed query declaring a
/// parameter that an agent cannot realistically supply.
#[derive(Debug, Clone)]
pub struct Warning {
    /// Name of the query the advisory concerns.
    pub query: String,
    /// Operator-facing advisory text.
    pub message: String,
}
206
207/// Outcome of validating a registry against a schema. Breakages are
208/// fatal (boot refuses); warnings are advisory.
209#[derive(Debug, Clone, Default)]
210pub struct CheckReport {
211    pub breakages: Vec<Breakage>,
212    pub warnings: Vec<Warning>,
213}
214
215impl CheckReport {
216    pub fn has_breakages(&self) -> bool {
217        !self.breakages.is_empty()
218    }
219
220    pub fn is_clean(&self) -> bool {
221        self.breakages.is_empty() && self.warnings.is_empty()
222    }
223}
224
225/// Validate a loaded registry against the live schema.
226///
227/// Pure over `(registry, catalog)` — takes an already-parsed registry and
228/// a catalog, so it is callable both at server boot (with the engine's
229/// `catalog()`) and offline from the CLI (`omnigraph queries check`),
230/// without coupling to server config or an open engine connection.
231///
232/// Every query is type-checked via the same `typecheck_query_decl` the
233/// engine runs for inline queries — no parallel implementation. Failures
234/// are **collected, not fail-fast**, so an operator sees every broken
235/// query in one pass.
236///
237/// Advisory lint (warn, never block): an `mcp.expose: true` query that
238/// declares a `Vector(N)` parameter. An LLM cannot supply a raw embedding
239/// vector; such a query should take a `String` parameter and let the
240/// engine embed it server-side at query time. Service-to-service callers
241/// may legitimately pass vectors, so this warns rather than rejects.
242pub fn check(registry: &QueryRegistry, catalog: &Catalog) -> CheckReport {
243    let mut report = CheckReport::default();
244    for query in registry.iter() {
245        if let Err(err) = typecheck_query_decl(catalog, &query.decl) {
246            report.breakages.push(Breakage {
247                query: query.name.clone(),
248                message: err.to_string(),
249            });
250        }
251        if query.expose {
252            for param in &query.decl.params {
253                // Resolve to the structured type via the compiler's own
254                // resolver rather than string-matching `Vector(` — one
255                // canonical definition of "is a vector", so this lint can't
256                // drift from how the parser/type system spells the type.
257                let is_vector = PropType::from_param_type_name(&param.type_name, param.nullable)
258                    .is_some_and(|pt| matches!(pt.scalar, ScalarType::Vector(_)));
259                if is_vector {
260                    report.warnings.push(Warning {
261                        query: query.name.clone(),
262                        message: format!(
263                            "MCP-exposed query declares a `{}` parameter `${}` that agents \
264                             cannot supply; use a `String` parameter for server-side embedding",
265                            param.type_name, param.name
266                        ),
267                    });
268                }
269            }
270        }
271    }
272    report
273}
274
275/// Format every breakage in a registry check report into a multi-line
276/// operator-facing message, naming each offending query.
277pub fn format_check_breakages(label: &str, report: &CheckReport) -> String {
278    let joined = report
279        .breakages
280        .iter()
281        .map(|b| format!("query '{}': {}", b.query, b.message))
282        .collect::<Vec<_>>()
283        .join("\n  ");
284    format!(
285        "graph '{label}': {} stored quer{} failed the schema check:\n  {joined}",
286        report.breakages.len(),
287        if report.breakages.len() == 1 {
288            "y"
289        } else {
290            "ies"
291        }
292    )
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use omnigraph_compiler::catalog::build_catalog;
    use omnigraph_compiler::schema::parser::parse_schema;

    /// A spec with no tool-name override.
    fn spec(name: &str, source: &str, expose: bool) -> RegistrySpec {
        RegistrySpec {
            name: name.to_string(),
            source: source.to_string(),
            expose,
            tool_name: None,
        }
    }

    /// A spec with an explicit MCP tool-name override.
    fn spec_tool(name: &str, source: &str, expose: bool, tool_name: &str) -> RegistrySpec {
        RegistrySpec {
            name: name.to_string(),
            source: source.to_string(),
            expose,
            tool_name: Some(tool_name.to_string()),
        }
    }

    /// The query names carried by load errors, in reported order.
    fn error_names(errors: &[LoadError]) -> Vec<Option<&str>> {
        errors.iter().map(|e| e.query.as_deref()).collect()
    }

    #[test]
    fn key_equal_symbol_loads() {
        let reg = QueryRegistry::from_specs(vec![spec(
            "find_user",
            "query find_user($id: String) { match { $u: User } return { $u.name } }",
            true,
        )])
        .unwrap();
        let q = reg.lookup("find_user").unwrap();
        assert_eq!(q.name, "find_user");
        assert!(q.expose);
        assert_eq!(q.decl.params.len(), 1);
        assert!(!q.is_mutation());
        // No override → the effective tool name is the query name.
        assert_eq!(q.effective_tool_name(), "find_user");

        // An explicit override is what the catalog keys on.
        let with_tool = QueryRegistry::from_specs(vec![spec_tool(
            "find_user",
            "query find_user($id: String) { match { $u: User } return { $u.name } }",
            true,
            "lookup_user",
        )])
        .unwrap();
        assert_eq!(
            with_tool.lookup("find_user").unwrap().effective_tool_name(),
            "lookup_user"
        );
    }

    #[test]
    fn key_mismatch_is_an_identity_error() {
        let errors = QueryRegistry::from_specs(vec![spec(
            "find_user",
            // symbol is `lookup`, key is `find_user` — must be rejected.
            "query lookup($id: String) { match { $u: User } return { $u.name } }",
            false,
        )])
        .unwrap_err();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].query.as_deref(), Some("find_user"));
        assert!(errors[0].message.contains("must match the query symbol"));
    }

    #[test]
    fn multi_query_file_selects_the_matching_symbol() {
        let source = "query a($x: I64) { match { $u: User } return { $u.name } }\n\
                      query b($y: String) { match { $u: User } return { $u.name } }";
        let reg = QueryRegistry::from_specs(vec![spec("b", source, false)]).unwrap();
        let q = reg.lookup("b").unwrap();
        assert_eq!(q.name, "b");
        assert_eq!(q.decl.params[0].name, "y");
        assert!(reg.lookup("a").is_none(), "only the selected symbol is registered");
    }

    #[test]
    fn duplicate_exposed_tool_name_is_a_load_error() {
        // Two MCP-exposed queries claiming one tool name is an ambiguity in
        // the catalog key space — refused at load, naming both queries and
        // the contested tool.
        let errors = QueryRegistry::from_specs(vec![
            spec_tool("a", "query a() { match { $u: User } return { $u.name } }", true, "dup"),
            spec_tool("b", "query b() { match { $u: User } return { $u.name } }", true, "dup"),
        ])
        .unwrap_err();
        assert_eq!(errors.len(), 1);
        assert_eq!(
            errors[0].query.as_deref(),
            Some("b"),
            "the lexicographically-later query is the one refused"
        );
        let msg = errors[0].to_string();
        assert!(msg.contains("'dup'"), "names the contested tool: {msg}");
        assert!(msg.contains("'a'"), "names the winning query: {msg}");
        assert!(msg.contains("'b'"), "names the losing query: {msg}");
    }

    #[test]
    fn duplicate_tool_name_among_unexposed_is_allowed() {
        // Unexposed queries have no MCP tool, so a shared effective tool
        // name is inert — must not error (pins the exposed-only scope).
        let reg = QueryRegistry::from_specs(vec![
            spec_tool("a", "query a() { match { $u: User } return { $u.name } }", false, "dup"),
            spec_tool("b", "query b() { match { $u: User } return { $u.name } }", false, "dup"),
        ])
        .unwrap();
        assert_eq!(reg.len(), 2);
    }

    #[test]
    fn parse_error_surfaces_per_entry() {
        let errors =
            QueryRegistry::from_specs(vec![spec("broken", "query broken( {{ not valid", false)])
                .unwrap_err();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].query.as_deref(), Some("broken"));
        assert!(errors[0].message.contains("parse error"));
    }

    #[test]
    fn errors_collect_rather_than_fail_fast() {
        let errors = QueryRegistry::from_specs(vec![
            spec("good", "query good() { match { $u: User } return { $u.name } }", false),
            spec("mismatch", "query other() { match { $u: User } return { $u.name } }", false),
            spec("broken", "query broken(", false),
        ])
        .unwrap_err();
        // `good` loads cleanly. Only the mismatch and the parse error are
        // reported, both in one pass (not fail-fast) and in spec order.
        assert_eq!(error_names(&errors), [Some("mismatch"), Some("broken")]);
    }

    #[test]
    fn mutation_body_classifies_as_mutation() {
        let reg = QueryRegistry::from_specs(vec![spec(
            "add_user",
            "query add_user($name: String) { insert User { name: $name } }",
            false,
        )])
        .unwrap();
        assert!(reg.lookup("add_user").unwrap().is_mutation());
    }

    #[test]
    fn load_error_display_names_the_scope() {
        let entry = LoadError {
            query: Some("q".to_string()),
            message: "boom".to_string(),
        };
        assert_eq!(entry.to_string(), "stored query 'q': boom");

        let registry = LoadError {
            query: None,
            message: "boom".to_string(),
        };
        assert_eq!(registry.to_string(), "stored query registry: boom");
    }

    // --- check(registry, catalog) ---

    fn test_catalog() -> Catalog {
        let schema = parse_schema(
            r#"
node User {
name: String
age: I32?
embedding: Vector(4)
}
"#,
        )
        .unwrap();
        build_catalog(&schema).unwrap()
    }

    #[test]
    fn check_passes_for_valid_query() {
        let reg = QueryRegistry::from_specs(vec![spec(
            "find_user",
            "query find_user($name: String) { match { $u: User { name: $name } } return { $u.age } }",
            false,
        )])
        .unwrap();
        let report = check(&reg, &test_catalog());
        assert!(report.is_clean(), "unexpected: {:?}", report);
    }

    #[test]
    fn check_reports_unknown_type_as_breakage() {
        let reg = QueryRegistry::from_specs(vec![spec(
            "ghost",
            // `Widget` is not in the schema.
            "query ghost() { match { $w: Widget } return { $w.name } }",
            false,
        )])
        .unwrap();
        let report = check(&reg, &test_catalog());
        assert!(report.has_breakages());
        assert_eq!(report.breakages.len(), 1, "exactly one breakage: {:?}", report);
        assert_eq!(report.breakages[0].query, "ghost");
    }

    #[test]
    fn check_reports_unknown_property_as_breakage() {
        let reg = QueryRegistry::from_specs(vec![spec(
            "bad_prop",
            // `User` exists but has no `nickname`.
            "query bad_prop() { match { $u: User } return { $u.nickname } }",
            false,
        )])
        .unwrap();
        let report = check(&reg, &test_catalog());
        assert!(report.has_breakages());
        assert_eq!(report.breakages.len(), 1, "exactly one breakage: {:?}", report);
        assert_eq!(report.breakages[0].query, "bad_prop");
    }

    #[test]
    fn check_collects_every_breakage_not_fail_fast() {
        let reg = QueryRegistry::from_specs(vec![
            spec("a", "query a() { match { $w: Widget } return { $w.x } }", false),
            spec("b", "query b() { match { $g: Gadget } return { $g.y } }", false),
            spec(
                "ok",
                "query ok() { match { $u: User } return { $u.name } }",
                false,
            ),
        ])
        .unwrap();
        let report = check(&reg, &test_catalog());
        let names: Vec<&str> = report.breakages.iter().map(|b| b.query.as_str()).collect();
        assert_eq!(
            names,
            ["a", "b"],
            "both bad queries reported, in registry order: {:?}",
            report
        );
    }

    #[test]
    fn vector_param_on_exposed_query_warns() {
        let reg = QueryRegistry::from_specs(vec![spec(
            "vec_search",
            "query vec_search($q: Vector(4)) { match { $u: User } return { $u.name } \
             order { nearest($u.embedding, $q) } limit 3 }",
            true, // mcp.expose
        )])
        .unwrap();
        let report = check(&reg, &test_catalog());
        assert!(!report.has_breakages(), "valid query: {:?}", report);
        assert_eq!(report.warnings.len(), 1);
        assert_eq!(report.warnings[0].query, "vec_search");
        assert!(
            report.warnings[0].message.contains("`$q`"),
            "names the offending parameter: {:?}",
            report.warnings[0]
        );
    }

    #[test]
    fn vector_param_on_unexposed_query_is_silent() {
        let reg = QueryRegistry::from_specs(vec![spec(
            "vec_search",
            "query vec_search($q: Vector(4)) { match { $u: User } return { $u.name } \
             order { nearest($u.embedding, $q) } limit 3 }",
            false, // not exposed — vector param is fine for service-to-service callers
        )])
        .unwrap();
        let report = check(&reg, &test_catalog());
        assert!(report.is_clean(), "unexpected: {:?}", report);
    }

    #[test]
    fn non_vector_param_on_exposed_query_does_not_warn() {
        // The recommended `String` alternative on an exposed query does not
        // resolve to a Vector, so the embedding advisory stays silent. This
        // guards the structured type check against a false positive and pins
        // that only `Vector(_)` triggers the warning.
        let reg = QueryRegistry::from_specs(vec![spec(
            "search",
            "query search($name: String) { match { $u: User { name: $name } } return { $u.name } }",
            true,
        )])
        .unwrap();
        let report = check(&reg, &test_catalog());
        assert!(report.is_clean(), "no breakage or warning expected: {:?}", report);
    }

    // --- format_check_breakages ---

    #[test]
    fn format_check_breakages_pluralizes_and_lists_each_query() {
        let one = CheckReport {
            breakages: vec![Breakage {
                query: "a".to_string(),
                message: "x".to_string(),
            }],
            warnings: Vec::new(),
        };
        assert_eq!(
            format_check_breakages("g", &one),
            "graph 'g': 1 stored query failed the schema check:\n  query 'a': x"
        );

        let two = CheckReport {
            breakages: vec![
                Breakage {
                    query: "a".to_string(),
                    message: "x".to_string(),
                },
                Breakage {
                    query: "b".to_string(),
                    message: "y".to_string(),
                },
            ],
            warnings: Vec::new(),
        };
        assert_eq!(
            format_check_breakages("g", &two),
            "graph 'g': 2 stored queries failed the schema check:\n  query 'a': x\n  query 'b': y"
        );
    }

    // --- catalog projection (api::query_catalog_entry) ---

    #[test]
    fn catalog_entry_projects_every_param_kind() {
        use crate::api::{self, ParamKind};
        let reg = QueryRegistry::from_specs(vec![spec_tool(
            "all_types",
            "query all_types($s: String, $i: I32, $big: I64, $u: U64, $f: F64, $b: Bool, \
             $d: Date, $dt: DateTime, $blob: Blob, $opt: String?, $list: [I32], $vec: Vector(4)) \
             { match { $x: User } return { $x.name } }",
            true,
            "all",
        )])
        .unwrap();
        let entry = api::query_catalog_entry(reg.lookup("all_types").unwrap());
        assert_eq!(entry.name, "all_types");
        assert_eq!(entry.tool_name, "all");
        assert!(!entry.mutation);

        let by: std::collections::HashMap<_, _> =
            entry.params.iter().map(|p| (p.name.as_str(), p)).collect();
        assert_eq!(by["s"].kind, ParamKind::String);
        assert_eq!(by["i"].kind, ParamKind::Int);
        assert_eq!(by["big"].kind, ParamKind::BigInt, "I64 → bigint (string on the wire)");
        assert_eq!(by["u"].kind, ParamKind::BigInt, "U64 → bigint");
        assert_eq!(by["f"].kind, ParamKind::Float);
        assert_eq!(by["b"].kind, ParamKind::Bool);
        assert_eq!(by["d"].kind, ParamKind::Date);
        assert_eq!(by["dt"].kind, ParamKind::DateTime);
        assert_eq!(by["blob"].kind, ParamKind::Blob);
        assert!(!by["s"].nullable);
        assert!(by["opt"].nullable, "String? → nullable");
        assert_eq!(by["list"].kind, ParamKind::List);
        assert_eq!(by["list"].item_kind, Some(ParamKind::Int), "[I32] → list of int");
        assert_eq!(by["vec"].kind, ParamKind::Vector);
        assert_eq!(by["vec"].vector_dim, Some(4));
    }

    #[test]
    fn catalog_entry_flags_mutation_and_empty_params() {
        use crate::api;
        let reg = QueryRegistry::from_specs(vec![spec(
            "add_user",
            "query add_user($name: String) { insert User { name: $name } }",
            true,
        )])
        .unwrap();
        let entry = api::query_catalog_entry(reg.lookup("add_user").unwrap());
        assert!(entry.mutation, "insert body → mutation flag");

        let reg2 = QueryRegistry::from_specs(vec![spec(
            "no_params",
            "query no_params() { match { $u: User } return { $u.name } }",
            true,
        )])
        .unwrap();
        let entry2 = api::query_catalog_entry(reg2.lookup("no_params").unwrap());
        assert!(entry2.params.is_empty(), "no declared params → empty list");
    }
}