Skip to main content

aurora_db/parser/
executor.rs

1//! AQL Executor - Connects parsed AQL to Aurora operations
2//!
3//! Provides the bridge between AQL AST and Aurora's database operations.
4//! All operations (queries, mutations, subscriptions) go through execute().
5
6use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17// chrono used for downsample timestamp bucketing
18#[allow(unused_imports)]
19use chrono::TimeZone as _;
20
/// String-keyed map of JSON values used as an execution context.
pub type ExecutionContext = HashMap<String, JsonValue>;
22
/// A pre-compiled query plan that can be executed repeatedly with different variables.
///
/// One plan describes a single root field of a query; `QueryPlan::from_query`
/// produces one plan per root field.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Name of the root field, used as the collection name to read from.
    pub collection: String,
    /// Filter extracted from the root field's arguments, if any.
    pub filter: Option<ast::Filter>,
    /// `filter` run through `compile_filter`; `None` exactly when `filter` is `None`.
    pub compiled_filter: Option<CompiledFilter>,
    /// Flattened sub-selection of the root field (fragments already expanded).
    pub projection: Vec<ast::Field>,
    /// Maximum number of documents to return: the offset-pagination limit,
    /// falling back to the cursor-pagination `first` value.
    pub limit: Option<usize>,
    /// Number of leading documents to drop before applying `limit`.
    pub offset: usize,
    /// Cursor (`_sid` of a document) after which results start.
    pub after: Option<String>,
    /// Sort specification extracted from the root field's arguments.
    pub orderings: Vec<ast::Ordering>,
    /// True when the projection selects `edges` or `pageInfo`.
    pub is_connection: bool,
    /// True when `lookup_dependencies` is non-empty.
    pub has_lookups: bool,
    /// Copy of the document's fragment definitions, keyed by fragment name.
    pub fragments: HashMap<String, FragmentDef>,
    /// Variable definitions declared on the originating query.
    pub variable_definitions: Vec<ast::VariableDefinition>,
    /// Directives attached to the root field (checked for permissions at execution).
    pub directives: Vec<ast::Directive>,
    /// Fields required for lookups (localField values)
    pub lookup_dependencies: Vec<String>,
}
42
43impl QueryPlan {
44    pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
45        validate_required_variables(&self.variable_definitions, provided_variables)
46    }
47
48    pub fn from_query(
49        db: &Aurora,
50        query: &ast::Query,
51        fragments: &HashMap<String, FragmentDef>,
52        variables: &HashMap<String, ast::Value>,
53    ) -> Result<Vec<Self>> {
54        let root_fields = collect_fields(&query.selection_set, fragments, variables, None)?;
55
56        let mut plans = Vec::new();
57        for field in root_fields {
58            let collection = field.name.clone();
59            let filter = extract_filter_from_args(&field.arguments)?;
60            let compiled_filter = if let Some(ref f) = filter {
61                Some(compile_filter(f)?)
62            } else {
63                None
64            };
65
66            let sub_fields = collect_fields(
67                &field.selection_set,
68                fragments,
69                variables,
70                Some(&field.name),
71            )?;
72
73            let (limit, offset) = extract_pagination(&field.arguments);
74            let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
75            let orderings = extract_order_by(&field.arguments);
76            let is_connection = sub_fields
77                .iter()
78                .any(|f| f.name == "edges" || f.name == "pageInfo");
79
80            let lookup_dependencies =
81                identify_lookup_dependencies(db, &collection, &sub_fields, variables);
82            let has_lookups = !lookup_dependencies.is_empty();
83
84            plans.push(QueryPlan {
85                collection,
86                filter,
87                compiled_filter,
88                projection: sub_fields,
89                limit: limit.or(first),
90                offset,
91                after,
92                orderings,
93                is_connection,
94                has_lookups,
95                fragments: fragments.clone(),
96                variable_definitions: query.variable_definitions.clone(),
97                directives: field.directives.clone(),
98                lookup_dependencies,
99            });
100        }
101        Ok(plans)
102    }
103}
104
105/// Execute an AQL query string against the database
106pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
107    use std::collections::hash_map::DefaultHasher;
108    use std::hash::{Hash, Hasher};
109
110    // Compute query hash for cache lookup (ignoring whitespace variations)
111    let query_key = {
112        let mut hasher = DefaultHasher::new();
113        aql.trim().hash(&mut hasher);
114        hasher.finish()
115    };
116
117    // Prepare variables for this execution
118    let vars: HashMap<String, ast::Value> = options.variables.clone();
119
120    // HIGH-SPEED PATH: Check for pre-compiled Query Plan
121    // Search queries are never cached — execute_plan doesn't handle `search:`.
122    if let Some(plan) = db.plan_cache.get(&query_key) {
123        plan.validate(&vars)?;
124        return execute_plan(db, &plan, &vars, &options).await;
125    }
126
127    // SLOW PATH: Parse and Compile
128    let mut doc = super::parse(aql)?;
129    println!("Parsed document with {} operations", doc.operations.len());
130
131    // Extract first query operation for planning
132    if let Some(Operation::Query(query)) = doc
133        .operations
134        .iter()
135        .find(|op| matches!(op, Operation::Query(_)))
136    {
137        let fragments: HashMap<String, FragmentDef> = doc
138            .operations
139            .iter()
140            .filter_map(|op| {
141                if let Operation::FragmentDefinition(f) = op {
142                    Some((f.name.clone(), f.clone()))
143                } else {
144                    None
145                }
146            })
147            .collect();
148
149        // Skip the fast plan path for search queries — execute_plan doesn't
150        // handle the `search:` argument; route through execute_document instead.
151        // Use collect_fields() so inline fragments and fragment spreads are
152        // traversed the same way QueryPlan::from_query() traverses them.
153        let root_fields = collect_fields(&query.selection_set, &fragments, &vars, None)?;
154        let has_search = root_fields
155            .iter()
156            .any(|f| f.arguments.iter().any(|a| a.name == "search"));
157
158        if !has_search {
159            let plans = QueryPlan::from_query(db, query, &fragments, &vars)?;
160            if plans.len() == 1 {
161                let plan = Arc::new(plans[0].clone());
162                plan.validate(&vars)?;
163                db.plan_cache.insert(query_key, Arc::clone(&plan));
164
165                return execute_plan(db, &plan, &vars, &options).await;
166            }
167        }
168    }
169
170    // FALLBACK: Normal execution for complex documents (but WITH our prepared vars)
171    // Merge declared variable defaults for any var not explicitly provided.
172    let mut vars = vars;
173    for op in &doc.operations {
174        if let Operation::Query(q) = op {
175            for var_def in &q.variable_definitions {
176                if !vars.contains_key(&var_def.name) {
177                    if let Some(default) = &var_def.default_value {
178                        vars.insert(var_def.name.clone(), default.clone());
179                    }
180                }
181            }
182        }
183    }
184
185    // We must resolve variables in the AST before executing the document path
186    super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
187        let code = match e.code {
188            super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
189            super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
190            _ => ErrorCode::QueryError,
191        };
192        AqlError::new(code, e.to_string())
193    })?;
194
195    execute_document(db, &doc, &options).await
196}
197
198pub async fn execute_plan(
199    db: &Aurora,
200    plan: &QueryPlan,
201    variables: &HashMap<String, ast::Value>,
202    options: &ExecutionOptions,
203) -> Result<ExecutionResult> {
204    // RBAC: Check collection level permissions from query directives
205    check_permissions(&plan.directives, options, false)?;
206
207    let collection_name = &plan.collection;
208
209    // Determine effective limit for the scan
210    let effective_limit = plan.limit;
211
212    // 1. Resolve Indexed Equality (Fast path)
213    let mut indexed_docs = None;
214    if let Some(ref f) = plan.filter {
215        // We need a version of find_indexed that uses our runtime variables
216        if let Some((field, val)) =
217            find_indexed_equality_filter_runtime(f, db, collection_name, variables)
218        {
219            // Skip the indexed fast path if the index is still being rebuilt —
220            // fall through to the full scan below instead of returning zero results.
221            let index_ready = field == "_sid" || !db.is_index_building(collection_name, &field);
222            if index_ready {
223                let mut db_val = aql_value_to_db_value(&val, variables)?;
224
225                // Coerce based on schema if available to ensure index hits
226                if let Ok(col_def) = db.get_collection_definition(collection_name) {
227                    if let Some(f_def) = col_def.fields.get(&field) {
228                        db_val = db_val.coerce_to(&f_def.field_type);
229                    }
230                }
231
232                let ids = if field == "_sid" {
233                    match &db_val {
234                        Value::String(s) => vec![s.clone()],
235                        Value::Uuid(u) => vec![u.to_string()],
236                        _ => vec![],
237                    }
238                } else {
239                    db.get_ids_from_index(collection_name, &field, &db_val)
240                };
241
242                let mut docs = Vec::with_capacity(ids.len());
243                for id in ids {
244                    if let Some(doc) = db.get_document(collection_name, &id)? {
245                        // Check if doc matches the REST of the plan's compiled filter
246                        if let Some(ref cf) = plan.compiled_filter {
247                            if matches_filter(&doc, cf, variables) {
248                                docs.push(doc);
249                            }
250                        } else {
251                            docs.push(doc);
252                        }
253                    }
254                }
255
256                indexed_docs = Some(docs);
257            } // end if index_ready — if not ready, indexed_docs stays None → full scan
258        }
259    }
260
261    let mut docs = if let Some(d) = indexed_docs {
262        d
263    } else {
264        // Fallback to scan
265        let vars_arc = Arc::new(variables.clone());
266        let cf_clone = plan.compiled_filter.clone();
267        let filter_fn = move |doc: &Document| {
268            cf_clone
269                .as_ref()
270                .map(|f| matches_filter(doc, f, &vars_arc))
271                .unwrap_or(true)
272        };
273
274        // Scan-limit is only valid when there is no cursor and no ordering —
275        // orderings require a full scan to find the correct top-N.
276        let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
277            None
278        } else {
279            plan.limit.map(|l| {
280                let base = if plan.is_connection { l + 1 } else { l };
281                base + plan.offset
282            })
283        };
284        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
285    };
286
287    // 2. Sort (must happen before cursor drain and offset)
288    if !plan.orderings.is_empty() {
289        apply_ordering(&mut docs, &plan.orderings);
290    }
291
292    // 2a. Cursor pagination: skip past the `after` cursor
293    if let Some(ref cursor) = plan.after {
294        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
295            docs.drain(0..=pos);
296        }
297    }
298
299    // 2b. Apply offset
300    if plan.offset > 0 {
301        if plan.offset < docs.len() {
302            docs.drain(0..plan.offset);
303        } else {
304            docs.clear();
305        }
306    }
307
308    // 3. Handle Connection vs Flat List
309    if plan.is_connection {
310        return Ok(ExecutionResult::Query(execute_connection(
311            docs,
312            &plan.projection,
313            effective_limit,
314            &plan.fragments,
315            variables,
316            options,
317        )?));
318    }
319
320    // 3a. Apply limit for flat list (after ordering so we get correct top-N)
321    if let Some(l) = effective_limit {
322        docs.truncate(l);
323    }
324
325    // 4. Project (Flat List)
326    if options.apply_projections && !plan.projection.is_empty() {
327        // Check if we have an aggregate or groupBy field
328        let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
329        let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");
330
331        if let Some(f) = group_by_field {
332            // Handle GroupBy
333            let field_name = f
334                .arguments
335                .iter()
336                .find(|a| a.name == "field")
337                .and_then(|a| match &a.value {
338                    ast::Value::String(s) => Some(s),
339                    _ => None,
340                });
341
342            if let Some(group_field) = field_name {
343                let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
344                for d in docs {
345                    let key = d
346                        .data
347                        .get(group_field)
348                        .map(|v| match v {
349                            Value::String(s) => s.clone(),
350                            _ => v.to_string(),
351                        })
352                        .unwrap_or_else(|| "null".to_string());
353                    groups.entry(key).or_default().push(d);
354                }
355
356                let mut group_docs = Vec::with_capacity(groups.len());
357                for (key, group_items) in groups {
358                    let mut data = HashMap::new();
359                    for selection in &f.selection_set {
360                        if let Selection::Field(sub_f) = selection {
361                            let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
362                            match sub_f.name.as_str() {
363                                "key" => {
364                                    data.insert(alias.clone(), Value::String(key.clone()));
365                                }
366                                "count" => {
367                                    data.insert(
368                                        alias.clone(),
369                                        Value::Int(group_items.len() as i64),
370                                    );
371                                }
372                                "nodes" => {
373                                    let sub_fields = collect_fields(
374                                        &sub_f.selection_set,
375                                        &plan.fragments,
376                                        variables,
377                                        None,
378                                    )
379                                    .unwrap_or_default();
380
381                                    let mut projected_nodes = Vec::with_capacity(group_items.len());
382                                    for d in &group_items {
383                                        let node_doc =
384                                            apply_projection(d.clone(), &sub_fields, options)?;
385                                        projected_nodes.push(Value::Object(node_doc.data));
386                                    }
387                                    data.insert(alias.clone(), Value::Array(projected_nodes));
388                                }
389                                "aggregate" => {
390                                    let agg_res =
391                                        compute_aggregates(&group_items, &sub_f.selection_set);
392                                    data.insert(alias.clone(), Value::Object(agg_res));
393                                }
394                                _ => {}
395                            }
396                        }
397                    }
398                    group_docs.push(Document {
399                        _sid: format!("group:{}", key),
400                        data,
401                    });
402                }
403                docs = group_docs;
404            }
405        } else if let Some(agg) = agg_field {
406            // If aggregate is the ONLY field, we return a single summary document
407            // If there are other fields, we currently follow the behavior of adding the agg to each doc
408            // (Test expectations suggest a single doc if only agg is asked)
409            let agg_result = compute_aggregates(&docs, &agg.selection_set);
410            let alias = agg.alias.as_ref().unwrap_or(&agg.name);
411
412            if plan.projection.len() == 1 {
413                let mut data = HashMap::new();
414                data.insert(alias.clone(), Value::Object(agg_result));
415                docs = vec![Document {
416                    _sid: "aggregate".to_string(),
417                    data,
418                }];
419            } else {
420                // Add aggregate to each document
421                let mut projected = Vec::with_capacity(docs.len());
422                for mut d in docs {
423                    d.data
424                        .insert(alias.clone(), Value::Object(agg_result.clone()));
425                    projected.push(apply_projection(d, &plan.projection, options)?);
426                }
427                docs = projected;
428            }
429        } else if !plan.has_lookups {
430            let mut projected = Vec::with_capacity(docs.len());
431            for d in docs {
432                projected.push(apply_projection(d, &plan.projection, options)?);
433            }
434            docs = projected;
435        } else {
436            // Slow path for lookups
437            let mut projected = Vec::with_capacity(docs.len());
438            for d in docs {
439                let (proj_doc, _) = apply_projection_with_lookups(
440                    db,
441                    d,
442                    collection_name,
443                    &plan.projection,
444                    &plan.fragments,
445                    variables,
446                    options,
447                    &plan.lookup_dependencies,
448                )
449                .await?;
450                projected.push(proj_doc);
451            }
452            docs = projected;
453        }
454    }
455
456    Ok(ExecutionResult::Query(QueryResult {
457        collection: collection_name.clone(),
458        documents: docs,
459        total_count: None,
460        deferred_fields: vec![],
461        explain: None,
462    }))
463}
464
465fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
466    let mut results = HashMap::new();
467
468    for selection in selections {
469        if let Selection::Field(f) = selection {
470            let alias = f.alias.as_ref().unwrap_or(&f.name);
471            let value = match f.name.as_str() {
472                "count" => Value::Int(docs.len() as i64),
473                "sum" => {
474                    let field =
475                        f.arguments
476                            .iter()
477                            .find(|a| a.name == "field")
478                            .and_then(|a| match &a.value {
479                                ast::Value::String(s) => Some(s),
480                                _ => None,
481                            });
482
483                    if let Some(field_name) = field {
484                        let sum: f64 = docs
485                            .iter()
486                            .filter_map(|d| d.data.get(field_name))
487                            .filter_map(|v| match v {
488                                Value::Int(i) => Some(*i as f64),
489                                Value::Float(f) => Some(*f),
490                                _ => None,
491                            })
492                            .sum();
493                        Value::Float(sum)
494                    } else {
495                        Value::Null
496                    }
497                }
498                "avg" => {
499                    let field =
500                        f.arguments
501                            .iter()
502                            .find(|a| a.name == "field")
503                            .and_then(|a| match &a.value {
504                                ast::Value::String(s) => Some(s),
505                                _ => None,
506                            });
507
508                    if let Some(field_name) = field
509                        && !docs.is_empty()
510                    {
511                        let values: Vec<f64> = docs
512                            .iter()
513                            .filter_map(|d| d.data.get(field_name))
514                            .filter_map(|v| match v {
515                                Value::Int(i) => Some(*i as f64),
516                                Value::Float(f) => Some(*f),
517                                _ => None,
518                            })
519                            .collect();
520
521                        if values.is_empty() {
522                            Value::Null
523                        } else {
524                            let sum: f64 = values.iter().sum();
525                            Value::Float(sum / values.len() as f64)
526                        }
527                    } else {
528                        Value::Null
529                    }
530                }
531                "min" => {
532                    let field =
533                        f.arguments
534                            .iter()
535                            .find(|a| a.name == "field")
536                            .and_then(|a| match &a.value {
537                                ast::Value::String(s) => Some(s),
538                                _ => None,
539                            });
540
541                    if let Some(field_name) = field
542                        && !docs.is_empty()
543                    {
544                        let min = docs
545                            .iter()
546                            .filter_map(|d| d.data.get(field_name))
547                            .filter_map(|v| match v {
548                                Value::Int(i) => Some(*i as f64),
549                                Value::Float(f) => Some(*f),
550                                _ => None,
551                            })
552                            .fold(f64::INFINITY, f64::min);
553
554                        if min == f64::INFINITY {
555                            Value::Null
556                        } else {
557                            Value::Float(min)
558                        }
559                    } else {
560                        Value::Null
561                    }
562                }
563                "max" => {
564                    let field =
565                        f.arguments
566                            .iter()
567                            .find(|a| a.name == "field")
568                            .and_then(|a| match &a.value {
569                                ast::Value::String(s) => Some(s),
570                                _ => None,
571                            });
572
573                    if let Some(field_name) = field
574                        && !docs.is_empty()
575                    {
576                        let max = docs
577                            .iter()
578                            .filter_map(|d| d.data.get(field_name))
579                            .filter_map(|v| match v {
580                                Value::Int(i) => Some(*i as f64),
581                                Value::Float(f) => Some(*f),
582                                _ => None,
583                            })
584                            .fold(f64::NEG_INFINITY, f64::max);
585
586                        if max == f64::NEG_INFINITY {
587                            Value::Null
588                        } else {
589                            Value::Float(max)
590                        }
591                    } else {
592                        Value::Null
593                    }
594                }
595                _ => Value::Null,
596            };
597            results.insert(alias.clone(), value);
598        }
599    }
600
601    results
602}
603
604fn find_indexed_equality_filter_runtime(
605    filter: &ast::Filter,
606    db: &Aurora,
607    collection: &str,
608    variables: &HashMap<String, ast::Value>,
609) -> Option<(String, ast::Value)> {
610    match filter {
611        ast::Filter::Eq(field, val) => {
612            if field == "_sid" || db.has_index(collection, field) {
613                let resolved = resolve_if_variable(val, variables);
614                return Some((field.clone(), resolved.clone()));
615            }
616        }
617        ast::Filter::And(filters) => {
618            for f in filters {
619                if let Some(res) =
620                    find_indexed_equality_filter_runtime(f, db, collection, variables)
621                {
622                    return Some(res);
623                }
624            }
625        }
626        _ => {}
627    }
628    None
629}
630
631/// Helper to flatten a selection set (processing fragments) into a list of fields
632fn collect_fields(
633    selection_set: &[Selection],
634    fragments: &HashMap<String, FragmentDef>,
635    variable_values: &HashMap<String, ast::Value>,
636    parent_type: Option<&str>,
637) -> Result<Vec<Field>> {
638    let mut fields = Vec::new();
639
640    for selection in selection_set {
641        match selection {
642            Selection::Field(field) => {
643                if should_include(&field.directives, variable_values)? {
644                    fields.push(field.clone());
645                }
646            }
647            Selection::FragmentSpread(name) => {
648                if let Some(fragment) = fragments.get(name) {
649                    let type_match = if let Some(parent) = parent_type {
650                        parent == fragment.type_condition
651                    } else {
652                        true
653                    };
654
655                    if type_match {
656                        let fragment_fields = collect_fields(
657                            &fragment.selection_set,
658                            fragments,
659                            variable_values,
660                            parent_type,
661                        )?;
662                        fields.extend(fragment_fields);
663                    }
664                }
665            }
666            Selection::InlineFragment(inline) => {
667                let type_match = if let Some(parent) = parent_type {
668                    parent == inline.type_condition
669                } else {
670                    true
671                };
672
673                if type_match {
674                    let inline_fields = collect_fields(
675                        &inline.selection_set,
676                        fragments,
677                        variable_values,
678                        parent_type,
679                    )?;
680                    fields.extend(inline_fields);
681                }
682            }
683            Selection::ComputedField(cf) => {
684                let (expr_val, _complex_expr) = match &cf.expression {
685                    ast::ComputedExpression::TemplateString(s) => (s.clone(), None),
686                    _ => (
687                        "complex".to_string(),
688                        Some(ast::Expression::Literal(ast::Value::Null)),
689                    ), // Handled below
690                };
691
692                let mut field = Field {
693                    alias: Some(cf.alias.clone()),
694                    name: "__compute__".to_string(),
695                    arguments: vec![ast::Argument {
696                        name: "expr".to_string(),
697                        value: ast::Value::String(expr_val),
698                    }],
699                    directives: Vec::new(),
700                    selection_set: Vec::new(),
701                    computed_expression: None,
702                };
703
704                if let ast::ComputedExpression::TemplateString(s) = &cf.expression {
705                    field.arguments[0].value = ast::Value::String(s.clone());
706                }
707                match &cf.expression {
708                    ast::ComputedExpression::TemplateString(_) => {}
709                    ast::ComputedExpression::StandardExpression(e) => {
710                        field.computed_expression = Some(e.clone());
711                    }
712                    ast::ComputedExpression::FunctionCall { name, args } => {
713                        field.computed_expression = Some(ast::Expression::FunctionCall {
714                            name: name.clone(),
715                            args: args.clone(),
716                        });
717                    }
718                    ast::ComputedExpression::PipeExpression { .. } => {
719                        // TODO: Implement Pipe to Expression conversion if needed
720                    }
721                    ast::ComputedExpression::SqlExpression(s) => {
722                        field.computed_expression = Some(ast::Expression::Literal(
723                            ast::Value::String(format!("SQL: {}", s)),
724                        ));
725                    }
726                    ast::ComputedExpression::AggregateFunction { .. } => {
727                        // Handled by aggregate executor
728                    }
729                }
730
731                fields.push(field);
732            }
733        }
734    }
735    Ok(fields)
736}
737
738/// Check if a field/fragment should be included based on @skip/@include
739fn should_include(
740    directives: &[ast::Directive],
741    variables: &HashMap<String, ast::Value>,
742) -> Result<bool> {
743    for dir in directives {
744        if dir.name == "skip" {
745            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
746                let should_skip = resolve_boolean_arg(&arg.value, variables)?;
747                if should_skip {
748                    return Ok(false);
749                }
750            }
751        } else if dir.name == "include" {
752            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
753                let should_include = resolve_boolean_arg(&arg.value, variables)?;
754                if !should_include {
755                    return Ok(false);
756                }
757            }
758        }
759    }
760    Ok(true)
761}
762
763/// Check if the current user role has permission to access a field or collection.
764/// Supports @allow(read: "ROLE", write: "ROLE"), @auth, and @require(role: "ROLE").
765fn check_permissions(
766    directives: &[ast::Directive],
767    options: &ExecutionOptions,
768    is_write: bool,
769) -> Result<()> {
770    for directive in directives {
771        match directive.name.as_str() {
772            "auth" => {
773                if options.user_role.is_none() {
774                    return Err(AqlError::new(
775                        ErrorCode::Unauthorized,
776                        "Authentication required for this operation".to_string(),
777                    ));
778                }
779            }
780            "require" | "allow" => {
781                let required_role = directive
782                    .arguments
783                    .iter()
784                    .find(|a| {
785                        if is_write {
786                            a.name == "write" || a.name == "role"
787                        } else {
788                            a.name == "read" || a.name == "role"
789                        }
790                    })
791                    .and_then(|a| {
792                        if let ast::Value::String(s) = &a.value {
793                            Some(s.as_str())
794                        } else {
795                            None
796                        }
797                    });
798
799                if let Some(role) = required_role {
800                    if options.user_role.as_deref() != Some(role) {
801                        return Err(AqlError::new(
802                            ErrorCode::Forbidden,
803                            format!("Role '{}' required for this operation", role),
804                        ));
805                    }
806                }
807            }
808            _ => {}
809        }
810    }
811    Ok(())
812}
813
814fn resolve_boolean_arg(
815    value: &ast::Value,
816    variables: &HashMap<String, ast::Value>,
817) -> Result<bool> {
818    match value {
819        ast::Value::Boolean(b) => Ok(*b),
820        ast::Value::Variable(name) => {
821            if let Some(val) = variables.get(name) {
822                match val {
823                    ast::Value::Boolean(b) => Ok(*b),
824                    _ => Err(AqlError::new(
825                        ErrorCode::TypeError,
826                        format!("Variable '{}' is not a boolean, got {:?}", name, val),
827                    )),
828                }
829            } else {
830                Err(AqlError::new(
831                    ErrorCode::UndefinedVariable,
832                    format!("Variable '{}' is not defined", name),
833                ))
834            }
835        }
836        _ => Err(AqlError::new(
837            ErrorCode::TypeError,
838            format!("Expected boolean value, got {:?}", value),
839        )),
840    }
841}
842
843/// Validate that all required variables are provided
844fn validate_required_variables(
845    variable_definitions: &[ast::VariableDefinition],
846    provided_variables: &HashMap<String, ast::Value>,
847) -> Result<()> {
848    for var_def in variable_definitions {
849        if var_def.var_type.is_required {
850            if !provided_variables.contains_key(&var_def.name) {
851                if var_def.default_value.is_none() {
852                    return Err(AqlError::new(
853                        ErrorCode::UndefinedVariable,
854                        format!(
855                            "Required variable '{}' (type: {}{}) is not provided",
856                            var_def.name,
857                            var_def.var_type.name,
858                            if var_def.var_type.is_required {
859                                "!"
860                            } else {
861                                ""
862                            }
863                        ),
864                    ));
865                }
866            }
867        }
868    }
869    Ok(())
870}
871
872/// The result of executing an AQL operation.
873///
874/// This enum encapsulates the different types of results that can be returned
875/// by the Aurora engine, such as query results, mutation effects, or schema changes.
876#[derive(Debug)]
877pub enum ExecutionResult {
878    /// Result of a `query` operation.
879    Query(QueryResult),
880    /// Result of a `mutation` operation (insert, update, delete, upsert).
881    Mutation(MutationResult),
882    /// Result of a `subscription` operation.
883    Subscription(SubscriptionResult),
884    /// A batch of multiple results.
885    Batch(Vec<ExecutionResult>),
886    /// Result of a schema definition operation.
887    Schema(SchemaResult),
888    /// Result of a database migration.
889    Migration(MigrationResult),
890}
891
892impl ExecutionResult {
893    /// Extracts the `QueryResult` if this is a `Query`, returning `None` otherwise.
894    pub fn as_query(&self) -> Option<&QueryResult> {
895        if let Self::Query(q) = self {
896            Some(q)
897        } else {
898            None
899        }
900    }
901
902    /// Consumes the result and returns the `QueryResult` if this is a `Query`.
903    pub fn into_query(self) -> Option<QueryResult> {
904        if let Self::Query(q) = self {
905            Some(q)
906        } else {
907            None
908        }
909    }
910
911    /// Extracts the `MutationResult` if this is a `Mutation`, returning `None` otherwise.
912    pub fn as_mutation(&self) -> Option<&MutationResult> {
913        if let Self::Mutation(m) = self {
914            Some(m)
915        } else {
916            None
917        }
918    }
919
920    /// Consumes the result and returns the `MutationResult` if this is a `Mutation`.
921    pub fn into_mutation(self) -> Option<MutationResult> {
922        if let Self::Mutation(m) = self {
923            Some(m)
924        } else {
925            None
926        }
927    }
928
929    /// Extracts the `SubscriptionResult` if this is a `Subscription`, returning `None` otherwise.
930    pub fn as_subscription(&self) -> Option<&SubscriptionResult> {
931        if let Self::Subscription(s) = self {
932            Some(s)
933        } else {
934            None
935        }
936    }
937
938    /// Consumes the result and returns the `SubscriptionResult` if this is a `Subscription`.
939    pub fn into_subscription(self) -> Option<SubscriptionResult> {
940        if let Self::Subscription(s) = self {
941            Some(s)
942        } else {
943            None
944        }
945    }
946
947    /// Maps the result documents into a list of user-defined structs.
948    ///
949    /// This works for `Query`, `Mutation`, and `Batch` results. The target type `T`
950    /// must implement `serde::Deserialize`.
951    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
952        match self {
953            Self::Query(q) => q.bind(),
954            Self::Mutation(m) => m.bind(),
955            Self::Batch(results) => {
956                let mut all = Vec::new();
957                for r in results {
958                    all.extend(r.bind()?);
959                }
960                Ok(all)
961            }
962            _ => Err(AqlError::new(
963                ErrorCode::QueryError,
964                "Cannot bind results from schema or migration operations".to_string(),
965            )),
966        }
967    }
968
969    /// Maps the first result document into a user-defined struct.
970    ///
971    /// This is an optimized version of `bind` that only processes the first document.
972    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
973        match self {
974            Self::Query(q) => q.bind_first(),
975            Self::Mutation(m) => m.bind_first(),
976            Self::Batch(mut results) => {
977                if results.is_empty() {
978                    return Err(AqlError::new(
979                        ErrorCode::NotFound,
980                        "No documents found to bind".to_string(),
981                    ));
982                }
983                results.remove(0).bind_first()
984            }
985            _ => Err(AqlError::new(
986                ErrorCode::QueryError,
987                "Cannot bind results from schema or migration operations".to_string(),
988            )),
989        }
990    }
991}
992
/// Payload of `ExecutionResult::Schema`, describing a schema definition operation.
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Label of the schema operation that ran.
    // NOTE(review): the set of possible labels is decided by the schema
    // executor, which is outside this part of the file.
    pub operation: String,
    /// Name of the collection the operation targeted.
    pub collection: String,
    /// Free-form status text reported for the operation.
    pub status: String,
}
999
/// Payload of `ExecutionResult::Migration`, describing a database migration run.
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Version identifier associated with the migration.
    pub version: String,
    /// Number of migration steps that were applied.
    pub steps_applied: usize,
    /// Free-form status text reported for the migration.
    pub status: String,
}
1006
1007#[derive(Debug, Clone, Serialize)]
1008pub struct ExecutionPlan {
1009    pub operations: Vec<String>,
1010    pub estimated_cost: f64,
1011}
1012
1013/// The result of a `query` operation.
1014#[derive(Debug, Clone)]
1015pub struct QueryResult {
1016    /// The name of the collection that was queried.
1017    pub collection: String,
1018    /// The documents that matched the query.
1019    pub documents: Vec<Document>,
1020    /// The total number of documents matching the query (if calculated).
1021    pub total_count: Option<usize>,
1022    /// Fields marked `@defer` — omitted from documents, listed here.
1023    pub deferred_fields: Vec<String>,
1024    /// Explain metadata when `@explain` is present on the query.
1025    pub explain: Option<ExplainResult>,
1026}
1027
1028impl QueryResult {
1029    /// Maps the result documents into a list of user-defined structs.
1030    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
1031        self.documents
1032            .into_iter()
1033            .map(|d| d.bind())
1034            .collect::<Result<Vec<T>>>()
1035    }
1036
1037    /// Maps the first result document into a user-defined struct.
1038    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
1039        self.documents
1040            .into_iter()
1041            .next()
1042            .ok_or_else(|| {
1043                AqlError::new(
1044                    ErrorCode::NotFound,
1045                    "No documents found to bind".to_string(),
1046                )
1047            })?
1048            .bind()
1049    }
1050}
1051
/// Metadata attached to a query result when `@explain` is present on the query.
#[derive(Debug, Clone, Default)]
pub struct ExplainResult {
    /// Collection the query ran against.
    pub collection: String,
    /// Count of documents reported as scanned for the query.
    pub docs_scanned: usize,
    /// Whether the query is reported as having used a secondary index.
    pub index_used: bool,
    /// Wall-clock execution time in milliseconds.
    pub elapsed_ms: u128,
}
1064
1065/// The result of a `mutation` operation.
1066#[derive(Debug, Clone)]
1067pub struct MutationResult {
1068    /// The type of operation performed (insert, update, delete, upsert).
1069    pub operation: String,
1070    /// The name of the collection affected by the mutation.
1071    pub collection: String,
1072    /// The number of documents affected by the operation.
1073    pub affected_count: usize,
1074    /// The documents returned by the mutation (if any).
1075    pub returned_documents: Vec<Document>,
1076}
1077
1078impl MutationResult {
1079    /// Maps the documents returned by the mutation to a list of user-defined structs.
1080    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
1081        self.returned_documents
1082            .into_iter()
1083            .map(|d| d.bind())
1084            .collect::<Result<Vec<T>>>()
1085    }
1086
1087    /// Maps the first document returned by the mutation to a user-defined struct.
1088    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
1089        self.returned_documents
1090            .into_iter()
1091            .next()
1092            .ok_or_else(|| {
1093                AqlError::new(
1094                    ErrorCode::NotFound,
1095                    "No documents found to bind".to_string(),
1096                )
1097            })?
1098            .bind()
1099    }
1100}
1101
1102/// The result of a `subscription` operation.
1103#[derive(Debug)]
1104pub struct SubscriptionResult {
1105    /// Unique identifier for the subscription.
1106    pub subscription_id: String,
1107    /// The name of the collection being subscribed to.
1108    pub collection: String,
1109    /// The stream of change events.
1110    pub stream: Option<crate::pubsub::ChangeListener>,
1111}
1112
1113/// Execution options
1114#[derive(Debug, Clone)]
1115pub struct ExecutionOptions {
1116    pub skip_validation: bool,
1117    pub apply_projections: bool,
1118    pub debug_audit: bool,
1119    pub variables: HashMap<String, ast::Value>,
1120    /// Current user role for RBAC checks (@allow, @auth, @require)
1121    pub user_role: Option<String>,
1122}
1123
1124impl Default for ExecutionOptions {
1125    fn default() -> Self {
1126        Self {
1127            skip_validation: false,
1128            apply_projections: true,
1129            debug_audit: false,
1130            variables: HashMap::new(),
1131            user_role: None,
1132        }
1133    }
1134}
1135
1136impl ExecutionOptions {
1137    pub fn new() -> Self {
1138        Self::default()
1139    }
1140
1141    pub fn with_variables(mut self, vars: HashMap<String, ast::Value>) -> Self {
1142        self.variables = vars;
1143        self
1144    }
1145
1146    pub fn with_role(mut self, role: impl Into<String>) -> Self {
1147        self.user_role = Some(role.into());
1148        self
1149    }
1150
1151    pub fn skip_validation(mut self) -> Self {
1152        self.skip_validation = true;
1153        self
1154    }
1155}
1156
1157pub(crate) fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
1158    match v {
1159        serde_json::Value::Null => ast::Value::Null,
1160        serde_json::Value::Bool(b) => ast::Value::Boolean(b),
1161        serde_json::Value::Number(n) => {
1162            if let Some(i) = n.as_i64() {
1163                ast::Value::Int(i)
1164            } else if let Some(f) = n.as_f64() {
1165                ast::Value::Float(f)
1166            } else {
1167                ast::Value::Null
1168            }
1169        }
1170        serde_json::Value::String(s) => ast::Value::String(s),
1171        serde_json::Value::Array(arr) => {
1172            ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
1173        }
1174        serde_json::Value::Object(map) => ast::Value::Object(
1175            map.into_iter()
1176                .map(|(k, v)| (k, json_to_aql_value(v)))
1177                .collect(),
1178        ),
1179    }
1180}
1181
1182/// Execute a parsed AQL document
1183pub async fn execute_document(
1184    db: &Aurora,
1185    doc: &ast::Document,
1186    options: &ExecutionOptions,
1187) -> Result<ExecutionResult> {
1188    if doc.operations.is_empty() {
1189        return Err(AqlError::new(
1190            ErrorCode::QueryError,
1191            "No operations in document".to_string(),
1192        ));
1193    }
1194
1195    println!("execute_document: first op: {:?}", doc.operations[0]);
1196
1197    let vars: HashMap<String, ast::Value> = options.variables.clone();
1198
1199    let fragments: HashMap<String, FragmentDef> = doc
1200        .operations
1201        .iter()
1202        .filter_map(|op| {
1203            if let Operation::FragmentDefinition(frag) = op {
1204                Some((frag.name.clone(), frag.clone()))
1205            } else {
1206                None
1207            }
1208        })
1209        .collect();
1210
1211    let executable_ops: Vec<&Operation> = doc
1212        .operations
1213        .iter()
1214        .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
1215        .collect();
1216
1217    if executable_ops.is_empty() {
1218        return Err(AqlError::new(
1219            ErrorCode::QueryError,
1220            "No executable operations in document".to_string(),
1221        ));
1222    }
1223
1224    if executable_ops.len() == 1 {
1225        execute_operation(db, executable_ops[0], &vars, options, &fragments).await
1226    } else {
1227        let mut results = Vec::new();
1228        for op in executable_ops {
1229            results.push(execute_operation(db, op, &vars, options, &fragments).await?);
1230        }
1231        Ok(ExecutionResult::Batch(results))
1232    }
1233}
1234
1235async fn execute_operation(
1236    db: &Aurora,
1237    op: &Operation,
1238    vars: &HashMap<String, ast::Value>,
1239    options: &ExecutionOptions,
1240    fragments: &HashMap<String, FragmentDef>,
1241) -> Result<ExecutionResult> {
1242    match op {
1243        Operation::Query(query) => execute_query(db, query, vars, options, fragments).await,
1244        Operation::Mutation(mutation) => {
1245            execute_mutation(db, mutation, vars, options, fragments).await
1246        }
1247        Operation::Subscription(sub) => execute_subscription(db, sub, vars, options).await,
1248        Operation::Schema(schema) => execute_schema(db, schema, options).await,
1249        Operation::Migration(migration) => execute_migration(db, migration, options).await,
1250        Operation::Introspection(intro) => execute_introspection(db, intro).await,
1251        Operation::Handler(handler) => execute_handler_registration(db, handler, options).await,
1252        _ => Ok(ExecutionResult::Query(QueryResult {
1253            collection: String::new(),
1254            documents: vec![],
1255            total_count: None,
1256            deferred_fields: vec![],
1257            explain: None,
1258        })),
1259    }
1260}
1261
1262async fn execute_query(
1263    db: &Aurora,
1264    query: &ast::Query,
1265    vars: &HashMap<String, ast::Value>,
1266    options: &ExecutionOptions,
1267    fragments: &HashMap<String, FragmentDef>,
1268) -> Result<ExecutionResult> {
1269    validate_required_variables(&query.variable_definitions, vars)?;
1270    let has_explain = query.directives.iter().any(|d| d.name == "explain");
1271    let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;
1272    let mut results = Vec::new();
1273    for field in &root_fields {
1274        let sub_fields = collect_fields(&field.selection_set, fragments, vars, Some(&field.name))?;
1275        let start = std::time::Instant::now();
1276        let mut result =
1277            execute_collection_query(db, field, &sub_fields, vars, options, fragments).await?;
1278        if has_explain {
1279            let elapsed_ms = start.elapsed().as_millis();
1280            let index_used =
1281                field.arguments.iter().any(|a| a.name == "where") && !result.documents.is_empty();
1282            result.explain = Some(ExplainResult {
1283                collection: result.collection.clone(),
1284                docs_scanned: result.documents.len(),
1285                index_used,
1286                elapsed_ms,
1287            });
1288        }
1289        results.push(result);
1290    }
1291    if results.len() == 1 {
1292        Ok(ExecutionResult::Query(results.remove(0)))
1293    } else {
1294        Ok(ExecutionResult::Batch(
1295            results.into_iter().map(ExecutionResult::Query).collect(),
1296        ))
1297    }
1298}
1299
1300async fn execute_collection_query(
1301    db: &Aurora,
1302    field: &ast::Field,
1303    sub_fields: &[ast::Field],
1304    variables: &HashMap<String, ast::Value>,
1305    options: &ExecutionOptions,
1306    fragments: &HashMap<String, FragmentDef>,
1307) -> Result<QueryResult> {
1308    let collection_name = &field.name;
1309
1310    // RBAC: Check field level permissions
1311    check_permissions(&field.directives, options, false)?;
1312
1313    // ── Search args ────────────────────────────────────────────────────────
1314    // `search: { query: "...", fields: [...], fuzzy: true }` overrides normal scan.
1315    if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
1316        return execute_search_query(
1317            db,
1318            collection_name,
1319            search_arg,
1320            sub_fields,
1321            field,
1322            variables,
1323            options,
1324        )
1325        .await;
1326    }
1327
1328    let filter = extract_filter_from_args(&field.arguments)?;
1329    let (limit, offset) = extract_pagination(&field.arguments);
1330    let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
1331    let compiled_filter = if let Some(ref f) = filter {
1332        Some(compile_filter(f)?)
1333    } else {
1334        None
1335    };
1336    let vars_arc = Arc::new(variables.clone());
1337    let filter_fn = move |doc: &Document| {
1338        compiled_filter
1339            .as_ref()
1340            .map(|f| matches_filter(doc, f, &vars_arc))
1341            .unwrap_or(true)
1342    };
1343
1344    let indexed_docs = if let Some(ref f) = filter {
1345        match find_indexed_equality_filter(f, db, collection_name) {
1346            Some((field_name, val))
1347                if field_name == "_sid" || !db.is_index_building(collection_name, &field_name) =>
1348            {
1349                let db_val = aql_value_to_db_value(&val, variables)?;
1350                let ids = if field_name == "_sid" {
1351                    match &db_val {
1352                        Value::String(s) => vec![s.clone()],
1353                        Value::Uuid(u) => vec![u.to_string()],
1354                        _ => vec![],
1355                    }
1356                } else {
1357                    db.get_ids_from_index(collection_name, &field_name, &db_val)
1358                };
1359
1360                let mut docs = Vec::with_capacity(ids.len());
1361                for id in ids {
1362                    if let Some(doc) = db.get_document(collection_name, &id)? {
1363                        if filter_fn(&doc) {
1364                            docs.push(doc);
1365                        }
1366                    }
1367                }
1368                Some(docs)
1369            }
1370            // Index is building or no indexed filter found — full scan.
1371            _ => None,
1372        }
1373    } else {
1374        None
1375    };
1376
1377    let is_connection = sub_fields
1378        .iter()
1379        .any(|f| f.name == "edges" || f.name == "pageInfo");
1380
1381    let orderings = extract_order_by(&field.arguments);
1382
1383    let mut docs = if let Some(docs) = indexed_docs {
1384        docs
1385    } else {
1386        //only valid when there is no cursor (after) and no
1387        // ordering.  With ordering we must see ALL docs before we can pick the top-N;
1388        // with a cursor we don't know how far into the result set the cursor lies.
1389        let scan_limit = if after.is_some() || !orderings.is_empty() {
1390            None
1391        } else {
1392            limit.or(first).map(|l| {
1393                let base = if is_connection { l + 1 } else { l };
1394                base + offset
1395            })
1396        };
1397        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
1398    };
1399
1400    // Validation args
1401    // `validate: { fieldName: { format: "email" } }` filters out non-conforming docs
1402    if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
1403        docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
1404    }
1405
1406    // Sort before any pagination so offset/cursor operate on ordered results.
1407    if !orderings.is_empty() {
1408        apply_ordering(&mut docs, &orderings);
1409    }
1410
1411    // Cursor pagination: skip past the `after` cursor before slicing
1412    if let Some(ref cursor) = after {
1413        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
1414            docs.drain(0..=pos);
1415        }
1416    }
1417
1418    if is_connection {
1419        return Ok(execute_connection(
1420            docs,
1421            sub_fields,
1422            limit.or(first),
1423            fragments,
1424            variables,
1425            options,
1426        )?);
1427    }
1428
1429    // Offset pagination: skip the first `offset` documents
1430    if offset > 0 {
1431        if offset < docs.len() {
1432            docs.drain(0..offset);
1433        } else {
1434            docs.clear();
1435        }
1436    }
1437
1438    // Apply limit after offset
1439    if let Some(l) = limit.or(first) {
1440        docs.truncate(l);
1441    }
1442
1443    // Projections & Lookups
1444    let lookup_dependencies =
1445        identify_lookup_dependencies(db, collection_name, sub_fields, variables);
1446    let has_lookups = !lookup_dependencies.is_empty();
1447
1448    let mut deferred_fields = Vec::new();
1449
1450    if options.apply_projections && !sub_fields.is_empty() {
1451        if has_lookups {
1452            let mut projected = Vec::with_capacity(docs.len());
1453            for d in docs {
1454                let (proj_doc, deferred) = apply_projection_with_lookups(
1455                    db,
1456                    d,
1457                    collection_name,
1458                    sub_fields,
1459                    fragments,
1460                    variables,
1461                    options,
1462                    &lookup_dependencies,
1463                )
1464                .await?;
1465                projected.push(proj_doc);
1466                if deferred_fields.is_empty() {
1467                    deferred_fields = deferred;
1468                }
1469            }
1470            docs = projected;
1471        } else {
1472            let mut all_deferred = Vec::new();
1473            let mut projected = Vec::with_capacity(docs.len());
1474            for d in docs {
1475                let (proj, deferred) =
1476                    apply_projection_and_defer(d, sub_fields, variables, options)?;
1477                if all_deferred.is_empty() && !deferred.is_empty() {
1478                    all_deferred = deferred;
1479                }
1480                projected.push(proj);
1481            }
1482            docs = projected;
1483            deferred_fields = all_deferred;
1484        }
1485    }
1486
1487    // ── Window functions ───────────────────────────────────────────────────
1488    for sf in sub_fields {
1489        if sf.name == "windowFunc" {
1490            let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
1491            let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
1492            let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
1493            let wsize = arg_i64(&sf.arguments, "windowSize").unwrap_or(3) as usize;
1494            apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
1495        }
1496    }
1497
1498    // ── Downsample ─────────────────────────────────────────────────────────
1499    if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
1500        let interval =
1501            arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
1502        let aggregation =
1503            arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
1504        let ds_sub: Vec<String> =
1505            collect_fields(&ds_field.selection_set, fragments, variables, None)
1506                .unwrap_or_default()
1507                .iter()
1508                .map(|f| f.name.clone())
1509                .collect();
1510        docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
1511    }
1512
1513    Ok(QueryResult {
1514        collection: collection_name.clone(),
1515        documents: docs,
1516        total_count: None,
1517        deferred_fields,
1518        explain: None,
1519    })
1520}
1521
1522/// Execute a search query using SearchBuilder
1523async fn execute_search_query(
1524    db: &Aurora,
1525    collection: &str,
1526    search_arg: &ast::Argument,
1527    sub_fields: &[ast::Field],
1528    field: &ast::Field,
1529    variables: &HashMap<String, ast::Value>,
1530    options: &ExecutionOptions,
1531) -> Result<QueryResult> {
1532    // Deeply resolve variable references inside the search argument (e.g. query: $term)
1533    let resolved_search_val = resolve_ast_deep(&search_arg.value, variables);
1534    let (query_str, search_fields, fuzzy) = extract_search_params(&resolved_search_val);
1535    let (limit, _) = extract_pagination(&field.arguments);
1536
1537    let mut builder = db.search(collection).query(&query_str);
1538    if fuzzy {
1539        builder = builder.fuzzy(1);
1540    }
1541    if let Some(l) = limit {
1542        builder = builder.limit(l);
1543    }
1544
1545    let mut docs = builder
1546        .collect_with_fields(if search_fields.is_empty() {
1547            None
1548        } else {
1549            Some(&search_fields)
1550        })
1551        .await?;
1552
1553    if options.apply_projections && !sub_fields.is_empty() {
1554        let mut projected = Vec::with_capacity(docs.len());
1555        for d in docs {
1556            let (proj, _) = apply_projection_and_defer(d, sub_fields, variables, options)?;
1557            projected.push(proj);
1558        }
1559        docs = projected;
1560    }
1561
1562    Ok(QueryResult {
1563        collection: collection.to_string(),
1564        documents: docs,
1565        total_count: None,
1566        deferred_fields: vec![],
1567        explain: None,
1568    })
1569}
1570
1571fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
1572    let mut query = String::new();
1573    let mut fields = Vec::new();
1574    let mut fuzzy = false;
1575    if let ast::Value::Object(m) = v {
1576        if let Some(ast::Value::String(q)) = m.get("query") {
1577            query = q.clone();
1578        }
1579        if let Some(ast::Value::Array(arr)) = m.get("fields") {
1580            for item in arr {
1581                if let ast::Value::String(s) = item {
1582                    fields.push(s.clone());
1583                }
1584            }
1585        }
1586        if let Some(ast::Value::Boolean(b)) = m.get("fuzzy") {
1587            fuzzy = *b;
1588        }
1589    }
1590    (query, fields, fuzzy)
1591}
1592
1593fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
1594    if let ast::Value::Object(rules) = &validate_arg.value {
1595        for (field_name, constraints_val) in rules {
1596            if let ast::Value::Object(constraints) = constraints_val {
1597                if let Some(field_val) = doc.data.get(field_name) {
1598                    for (constraint_name, constraint_val) in constraints {
1599                        if !check_inline_constraint(field_val, constraint_name, constraint_val) {
1600                            return false;
1601                        }
1602                    }
1603                }
1604            }
1605        }
1606    }
1607    true
1608}
1609
1610fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
1611    match constraint {
1612        "format" => {
1613            if let (Value::String(s), ast::Value::String(fmt)) = (value, constraint_val) {
1614                return match fmt.as_str() {
1615                    "email" => {
1616                        s.contains('@')
1617                            && s.split('@')
1618                                .nth(1)
1619                                .map(|d| d.contains('.'))
1620                                .unwrap_or(false)
1621                    }
1622                    "url" => s.starts_with("http://") || s.starts_with("https://"),
1623                    "uuid" => uuid::Uuid::parse_str(s).is_ok(),
1624                    _ => true,
1625                };
1626            }
1627            true
1628        }
1629        "min" => {
1630            let n = match value {
1631                Value::Int(i) => *i as f64,
1632                Value::Float(f) => *f,
1633                _ => return true,
1634            };
1635            let min = match constraint_val {
1636                ast::Value::Float(f) => *f,
1637                ast::Value::Int(i) => *i as f64,
1638                _ => return true,
1639            };
1640            n >= min
1641        }
1642        "max" => {
1643            let n = match value {
1644                Value::Int(i) => *i as f64,
1645                Value::Float(f) => *f,
1646                _ => return true,
1647            };
1648            let max = match constraint_val {
1649                ast::Value::Float(f) => *f,
1650                ast::Value::Int(i) => *i as f64,
1651                _ => return true,
1652            };
1653            n <= max
1654        }
1655        "minLength" => {
1656            if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1657                return s.len() >= *n as usize;
1658            }
1659            true
1660        }
1661        "maxLength" => {
1662            if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1663                return s.len() <= *n as usize;
1664            }
1665            true
1666        }
1667        "pattern" => {
1668            if let (Value::String(s), ast::Value::String(pat)) = (value, constraint_val) {
1669                if let Ok(re) = regex::Regex::new(pat) {
1670                    return re.is_match(s);
1671                }
1672            }
1673            true
1674        }
1675        _ => true,
1676    }
1677}
1678
1679fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
1680    args.iter().find(|a| a.name == name).and_then(|a| {
1681        if let ast::Value::String(s) = &a.value {
1682            Some(s.clone())
1683        } else {
1684            None
1685        }
1686    })
1687}
1688
1689fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
1690    args.iter().find(|a| a.name == name).and_then(|a| {
1691        if let ast::Value::Int(i) = &a.value {
1692            Some(*i)
1693        } else {
1694            None
1695        }
1696    })
1697}
1698
1699fn identify_lookup_dependencies(
1700    db: &Aurora,
1701    collection: &str,
1702    fields: &[ast::Field],
1703    variables: &HashMap<String, ast::Value>,
1704) -> Vec<String> {
1705    let mut deps = Vec::new();
1706    for f in fields {
1707        let mut found = false;
1708        if let (Some(lf), Some(_c), Some(_ff)) = (
1709            f.arguments.iter().find(|a| a.name == "localField"),
1710            f.arguments.iter().find(|a| a.name == "collection"),
1711            f.arguments.iter().find(|a| a.name == "foreignField"),
1712        ) {
1713            if let ast::Value::String(s) = resolve_if_variable(&lf.value, variables) {
1714                deps.push(s.clone());
1715            }
1716            found = true;
1717        }
1718        if !found {
1719            if let Ok(col_def) = db.get_collection_definition(collection) {
1720                if let Some(f_def) = col_def.fields.get(&f.name) {
1721                    if let Some(_rel) = &f_def.relation {
1722                        deps.push(f.name.clone());
1723                    }
1724                }
1725            }
1726        }
1727    }
1728    deps
1729}
1730
1731/// Apply projection and collect @defer'd field names separately
1732fn apply_projection_and_defer(
1733    mut doc: Document,
1734    fields: &[ast::Field],
1735    vars: &HashMap<String, ast::Value>,
1736    options: &ExecutionOptions,
1737) -> Result<(Document, Vec<String>)> {
1738    if fields.is_empty() {
1739        return Ok((doc, vec![]));
1740    }
1741    let mut proj = HashMap::new();
1742    let mut deferred = Vec::new();
1743
1744    for f in fields {
1745        // RBAC: Check field level permissions
1746        check_permissions(&f.directives, options, false)?;
1747
1748        // @defer directive: omit from response, record field name
1749        if f.directives.iter().any(|d| d.name == "defer") {
1750            deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
1751            continue;
1752        }
1753        // Computed field: name starts with __compute__ sentinel
1754        if f.name == "__compute__" {
1755            let alias = f.alias.as_deref().unwrap_or("computed");
1756            if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
1757                // If it's a raw string, treat as template for backward compatibility
1758                if let ast::Value::String(template) = &expr_arg.value {
1759                    let result = eval_template(template, &doc.data);
1760                    proj.insert(alias.to_string(), Value::String(result));
1761                } else if let Some(expr) = f.computed_expression.as_ref() {
1762                    // Use new expression evaluator
1763                    let result = eval_expression(expr, &doc.data, vars);
1764                    proj.insert(alias.to_string(), result);
1765                }
1766            }
1767            continue;
1768        }
1769        if f.name == "id" {
1770            // User requested 'id'. Pull from data strictly.
1771            if let Some(v) = doc.data.get("id") {
1772                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1773            } else {
1774                // Not in data, return null
1775                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Null);
1776            }
1777        } else if f.name == "_sid" {
1778            proj.insert(
1779                f.alias.as_ref().unwrap_or(&f.name).clone(),
1780                Value::String(doc._sid.clone()),
1781            );
1782        } else if let Some(v) = doc.data.get(&f.name) {
1783            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1784        }
1785    }
1786    doc.data = proj;
1787    Ok((doc, deferred))
1788}
1789/// Evaluates a complex AQL expression against document data
1790fn eval_expression(
1791    expr: &ast::Expression,
1792    data: &HashMap<String, Value>,
1793    variables: &HashMap<String, ast::Value>,
1794) -> Value {
1795    match expr {
1796        ast::Expression::Literal(v) => match aql_value_to_db_value(v, variables) {
1797            Ok(dv) => dv,
1798            Err(_) => Value::Null,
1799        },
1800        ast::Expression::Variable(name) => {
1801            if let Some(v) = variables.get(name) {
1802                match aql_value_to_db_value(v, variables) {
1803                    Ok(dv) => dv,
1804                    Err(_) => Value::Null,
1805                }
1806            } else {
1807                Value::Null
1808            }
1809        }
1810        ast::Expression::FieldAccess(parts) => {
1811            let mut current = if parts[0] == "_sid" {
1812                Some(Value::String(
1813                    data.get("_sid")
1814                        .cloned()
1815                        .and_then(|v| v.as_str().map(|s| s.to_string()))
1816                        .unwrap_or_default(),
1817                ))
1818            } else {
1819                data.get(&parts[0]).cloned()
1820            };
1821
1822            for part in parts.iter().skip(1) {
1823                if let Some(Value::Object(map)) = current {
1824                    current = map.get(part).cloned();
1825                } else {
1826                    current = None;
1827                    break;
1828                }
1829            }
1830            current.unwrap_or(Value::Null)
1831        }
1832        ast::Expression::Binary { op, left, right } => {
1833            let lv = eval_expression(left, data, variables);
1834            let rv = eval_expression(right, data, variables);
1835            eval_binary_op(*op, lv, rv)
1836        }
1837        ast::Expression::Unary { op, expr } => {
1838            let v = eval_expression(expr, data, variables);
1839            eval_unary_op(*op, v)
1840        }
1841        ast::Expression::Ternary {
1842            condition,
1843            then_expr,
1844            else_expr,
1845        } => {
1846            let cv = eval_expression(condition, data, variables);
1847            let is_true = match cv {
1848                Value::Bool(b) => b,
1849                Value::Null => false,
1850                Value::Int(i) => i != 0,
1851                Value::Float(f) => f != 0.0,
1852                Value::String(s) => !s.is_empty(),
1853                _ => true,
1854            };
1855            if is_true {
1856                eval_expression(then_expr, data, variables)
1857            } else {
1858                eval_expression(else_expr, data, variables)
1859            }
1860        }
1861        ast::Expression::FunctionCall { name, args } => {
1862            let evaluated_args: Vec<Value> = args
1863                .iter()
1864                .map(|a| eval_expression(a, data, variables))
1865                .collect();
1866            eval_function_call(name, evaluated_args)
1867        }
1868    }
1869}
1870
1871fn eval_binary_op(op: ast::BinaryOp, left: Value, right: Value) -> Value {
1872    match op {
1873        ast::BinaryOp::Add => match (left, right) {
1874            (Value::Int(a), Value::Int(b)) => Value::Int(a + b),
1875            (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 + b),
1876            (Value::Float(a), Value::Int(b)) => Value::Float(a + b as f64),
1877            (Value::Float(a), Value::Float(b)) => Value::Float(a + b),
1878            (Value::String(mut a), Value::String(b)) => {
1879                a.push_str(&b);
1880                Value::String(a)
1881            }
1882            _ => Value::Null,
1883        },
1884        ast::BinaryOp::Sub => match (left, right) {
1885            (Value::Int(a), Value::Int(b)) => Value::Int(a - b),
1886            (Value::Float(a), Value::Float(b)) => Value::Float(a - b),
1887            _ => Value::Null,
1888        },
1889        ast::BinaryOp::Mul => match (left, right) {
1890            (Value::Int(a), Value::Int(b)) => Value::Int(a * b),
1891            (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 * b),
1892            (Value::Float(a), Value::Int(b)) => Value::Float(a * b as f64),
1893            (Value::Float(a), Value::Float(b)) => Value::Float(a * b),
1894            _ => Value::Null,
1895        },
1896        ast::BinaryOp::Div => match (left, right) {
1897            (Value::Int(a), Value::Int(b)) if b != 0 => Value::Int(a / b),
1898            (Value::Float(a), Value::Float(b)) if b != 0.0 => Value::Float(a / b),
1899            _ => Value::Null,
1900        },
1901        ast::BinaryOp::Eq => Value::Bool(left == right),
1902        ast::BinaryOp::Ne => Value::Bool(left != right),
1903        ast::BinaryOp::Gt => Value::Bool(left > right),
1904        ast::BinaryOp::Gte => Value::Bool(left >= right),
1905        ast::BinaryOp::Lt => Value::Bool(left < right),
1906        ast::BinaryOp::Lte => Value::Bool(left <= right),
1907        ast::BinaryOp::And => {
1908            let lb = matches!(left, Value::Bool(true));
1909            let rb = matches!(right, Value::Bool(true));
1910            Value::Bool(lb && rb)
1911        }
1912        ast::BinaryOp::Or => {
1913            let lb = matches!(left, Value::Bool(true));
1914            let rb = matches!(right, Value::Bool(true));
1915            Value::Bool(lb || rb)
1916        }
1917        _ => Value::Null,
1918    }
1919}
1920
1921fn eval_unary_op(op: ast::UnaryOp, val: Value) -> Value {
1922    match op {
1923        ast::UnaryOp::Not => match val {
1924            Value::Bool(b) => Value::Bool(!b),
1925            Value::Null => Value::Bool(true),
1926            _ => Value::Bool(false),
1927        },
1928        ast::UnaryOp::Neg => match val {
1929            Value::Int(i) => Value::Int(-i),
1930            Value::Float(f) => Value::Float(-f),
1931            _ => Value::Null,
1932        },
1933    }
1934}
1935
1936fn eval_function_call(name: &str, args: Vec<Value>) -> Value {
1937    match name {
1938        "upper" | "uppercase" => {
1939            if let Some(Value::String(s)) = args.get(0) {
1940                Value::String(s.to_uppercase())
1941            } else {
1942                Value::Null
1943            }
1944        }
1945        "lower" | "lowercase" => {
1946            if let Some(Value::String(s)) = args.get(0) {
1947                Value::String(s.to_lowercase())
1948            } else {
1949                Value::Null
1950            }
1951        }
1952        "len" | "length" => match args.get(0) {
1953            Some(Value::String(s)) => Value::Int(s.len() as i64),
1954            Some(Value::Array(a)) => Value::Int(a.len() as i64),
1955            Some(Value::Object(o)) => Value::Int(o.len() as i64),
1956            _ => Value::Null,
1957        },
1958        _ => Value::Null,
1959    }
1960}
1961
1962/// Simple `$fieldName` or `${fieldName}` template evaluator
1963fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
1964    let mut result = template.to_string();
1965    for (k, v) in data {
1966        let p1 = format!("${{{}}}", k);
1967        let p2 = format!("${}", k);
1968        let v_str = match v {
1969            Value::String(s) => s.clone(),
1970            _ => v.to_string(),
1971        };
1972        if result.contains(&p1) {
1973            result = result.replace(&p1, &v_str);
1974        }
1975        if result.contains(&p2) {
1976            result = result.replace(&p2, &v_str);
1977        }
1978    }
1979    result
1980}
1981
1982/// Apply window function (rollingAvg, rollingSum, rollingMin, rollingMax) on the docs Vec
1983fn apply_window_function(
1984    docs: &mut Vec<Document>,
1985    alias: &str,
1986    field: &str,
1987    function: &str,
1988    window: usize,
1989) {
1990    if docs.is_empty() || window == 0 {
1991        return;
1992    }
1993    let values: Vec<Option<f64>> = docs
1994        .iter()
1995        .map(|d| match d.data.get(field) {
1996            Some(Value::Int(i)) => Some(*i as f64),
1997            Some(Value::Float(f)) => Some(*f),
1998            _ => None,
1999        })
2000        .collect();
2001
2002    for (i, doc) in docs.iter_mut().enumerate() {
2003        let start = if i + 1 >= window { i + 1 - window } else { 0 };
2004        let window_vals: Vec<f64> = values[start..=i].iter().filter_map(|v| *v).collect();
2005        if window_vals.is_empty() {
2006            continue;
2007        }
2008        let result = match function {
2009            "rollingAvg" | "avg" => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
2010            "rollingSum" | "sum" => window_vals.iter().sum::<f64>(),
2011            "rollingMin" | "min" => window_vals.iter().cloned().fold(f64::INFINITY, f64::min),
2012            "rollingMax" | "max" => window_vals
2013                .iter()
2014                .cloned()
2015                .fold(f64::NEG_INFINITY, f64::max),
2016            _ => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
2017        };
2018        doc.data.insert(alias.to_string(), Value::Float(result));
2019    }
2020}
2021
2022/// Downsample time-series: bucket docs by interval, aggregate value fields
2023fn apply_downsample(
2024    docs: Vec<Document>,
2025    interval: &str,
2026    aggregation: &str,
2027    value_fields: &[String],
2028) -> Vec<Document> {
2029    let interval_secs: i64 = parse_interval(interval);
2030    if interval_secs <= 0 {
2031        return docs;
2032    }
2033
2034    // Group by bucket
2035    let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
2036        std::collections::BTreeMap::new();
2037    let mut leftover = Vec::new();
2038
2039    for doc in docs {
2040        // Try common timestamp field names
2041        let ts = ["timestamp", "ts", "created_at", "time"]
2042            .iter()
2043            .find_map(|&k| doc.data.get(k))
2044            .and_then(|v| match v {
2045                Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
2046                    .ok()
2047                    .map(|dt| dt.timestamp()),
2048                Value::Int(i) => Some(*i),
2049                _ => None,
2050            });
2051
2052        if let Some(t) = ts {
2053            let bucket = (t / interval_secs) * interval_secs;
2054            buckets.entry(bucket).or_default().push(doc);
2055        } else {
2056            leftover.push(doc);
2057        }
2058    }
2059
2060    let mut result = Vec::new();
2061    for (bucket_ts, group) in buckets {
2062        let mut data = HashMap::new();
2063        data.insert(
2064            "timestamp".to_string(),
2065            Value::String(
2066                chrono::DateTime::from_timestamp(bucket_ts, 0)
2067                    .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
2068                    .unwrap_or_default(),
2069            ),
2070        );
2071        data.insert("count".to_string(), Value::Int(group.len() as i64));
2072
2073        for field in value_fields {
2074            if field == "timestamp" || field == "count" {
2075                continue;
2076            }
2077            let nums: Vec<f64> = group
2078                .iter()
2079                .filter_map(|d| match d.data.get(field) {
2080                    Some(Value::Int(i)) => Some(*i as f64),
2081                    Some(Value::Float(f)) => Some(*f),
2082                    _ => None,
2083                })
2084                .collect();
2085            if nums.is_empty() {
2086                continue;
2087            }
2088            let agg = match aggregation {
2089                "sum" => nums.iter().sum::<f64>(),
2090                "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
2091                "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
2092                "count" => nums.len() as f64,
2093                _ => nums.iter().sum::<f64>() / nums.len() as f64, // avg default
2094            };
2095            data.insert(field.clone(), Value::Float(agg));
2096        }
2097
2098        result.push(Document {
2099            _sid: bucket_ts.to_string(),
2100            data,
2101        });
2102    }
2103
2104    result.extend(leftover);
2105    result
2106}
2107
/// Parses an interval such as `"30s"`, `"5m"`, `"2h"` or `"1d"` into seconds.
///
/// Surrounding whitespace is ignored. With a unit suffix (`s`, `m`, `h`, `d`)
/// the result is `0` when the numeric part is not a valid integer or the
/// result does not fit in an `i64`. Without a suffix the text is read as a
/// plain number of seconds, falling back to `3600` (one hour) when it does
/// not parse.
fn parse_interval(s: &str) -> i64 {
    // Suffixes are tried in this order; the first one that matches decides.
    const UNITS: [(char, i64); 4] = [('s', 1), ('m', 60), ('h', 3600), ('d', 86400)];

    let s = s.trim();
    for &(suffix, seconds_per_unit) in UNITS.iter() {
        if let Some(count) = s.strip_suffix(suffix) {
            return count
                .parse::<i64>()
                .ok()
                // Overflow is reported as "invalid" rather than panicking or
                // wrapping around.
                .and_then(|n| n.checked_mul(seconds_per_unit))
                .unwrap_or(0);
        }
    }
    s.parse().unwrap_or(3600)
}
2122
2123/// Register an event handler. Spawns a background task that fires a mutation
2124/// each time the trigger event is received on the relevant collection.
2125async fn execute_handler_registration(
2126    db: &Aurora,
2127    handler: &ast::HandlerDef,
2128    _options: &ExecutionOptions,
2129) -> Result<ExecutionResult> {
2130    use crate::pubsub::events::ChangeType;
2131
2132    let collection = match &handler.trigger {
2133        ast::HandlerTrigger::Insert { collection }
2134        | ast::HandlerTrigger::Update { collection }
2135        | ast::HandlerTrigger::Delete { collection } => {
2136            collection.as_deref().unwrap_or("*").to_string()
2137        }
2138        _ => "*".to_string(),
2139    };
2140
2141    let trigger_type = match &handler.trigger {
2142        ast::HandlerTrigger::Insert { .. } => Some(ChangeType::Insert),
2143        ast::HandlerTrigger::Update { .. } => Some(ChangeType::Update),
2144        ast::HandlerTrigger::Delete { .. } => Some(ChangeType::Delete),
2145        _ => None,
2146    };
2147
2148    let mut listener = if collection == "*" {
2149        db.pubsub.listen_all()
2150    } else {
2151        db.pubsub.listen(collection.clone())
2152    };
2153
2154    let db_clone = db.clone();
2155    let action = handler.action.clone();
2156    let handler_name = handler.name.clone();
2157
2158    tokio::spawn(async move {
2159        loop {
2160            match listener.recv().await {
2161                Ok(event) => {
2162                    // Check event type matches trigger
2163                    let matches = trigger_type
2164                        .as_ref()
2165                        .map(|t| &event.change_type == t)
2166                        .unwrap_or(true);
2167                    if !matches {
2168                        continue;
2169                    }
2170
2171                    // Build context from the triggering event.
2172                    // Always set $_id from event._sid — for deletes, event.document is None
2173                    // but the document ID is still in event._sid.
2174                    let mut vars = HashMap::new();
2175                    vars.insert("_id".to_string(), ast::Value::String(event._sid.clone()));
2176                    if let Some(doc) = &event.document {
2177                        // Overlay field variables from the document data (insert/update)
2178                        for (k, v) in &doc.data {
2179                            vars.insert(format!("_{}", k), db_value_to_ast_value(v));
2180                        }
2181                    }
2182
2183                    let _ = execute_mutation_op(
2184                        &db_clone,
2185                        &action,
2186                        &vars,
2187                        &HashMap::new(),
2188                        &ExecutionOptions::default(),
2189                        &HashMap::new(),
2190                    )
2191                    .await;
2192                }
2193                Err(_) => {
2194                    eprintln!("[handler:{}] channel closed, stopping", handler_name);
2195                    break;
2196                }
2197            }
2198        }
2199    });
2200
2201    eprintln!(
2202        "[handler] '{}' registered on '{}'",
2203        handler.name, collection
2204    );
2205
2206    let mut data = HashMap::new();
2207    data.insert("name".to_string(), Value::String(handler.name.clone()));
2208    data.insert("collection".to_string(), Value::String(collection));
2209    data.insert(
2210        "status".to_string(),
2211        Value::String("registered".to_string()),
2212    );
2213
2214    Ok(ExecutionResult::Query(QueryResult {
2215        collection: "__handler".to_string(),
2216        documents: vec![Document {
2217            _sid: handler.name.clone(),
2218            data,
2219        }],
2220        total_count: Some(1),
2221        deferred_fields: vec![],
2222        explain: None,
2223    }))
2224}
2225
2226fn db_value_to_ast_value(v: &Value) -> ast::Value {
2227    match v {
2228        Value::Null => ast::Value::Null,
2229        Value::Bool(b) => ast::Value::Boolean(*b),
2230        Value::Int(i) => ast::Value::Int(*i),
2231        Value::Float(f) => ast::Value::Float(*f),
2232        Value::String(s) => ast::Value::String(s.clone()),
2233        Value::Uuid(u) => ast::Value::String(u.to_string()),
2234        Value::DateTime(dt) => ast::Value::String(dt.to_rfc3339()),
2235        Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_ast_value).collect()),
2236        Value::Object(m) => ast::Value::Object(
2237            m.iter()
2238                .map(|(k, v)| (k.clone(), db_value_to_ast_value(v)))
2239                .collect(),
2240        ),
2241    }
2242}
2243
2244async fn execute_mutation(
2245    db: &Aurora,
2246    mutation: &ast::Mutation,
2247    vars: &HashMap<String, ast::Value>,
2248    options: &ExecutionOptions,
2249    fragments: &HashMap<String, FragmentDef>,
2250) -> Result<ExecutionResult> {
2251    use crate::transaction::ACTIVE_TRANSACTION_ID;
2252
2253    validate_required_variables(&mutation.variable_definitions, vars)?;
2254
2255    // RBAC: Check mutation level permissions
2256    check_permissions(&mutation.directives, options, true)?;
2257
2258    // If there's already an active transaction in scope, run ops directly inside
2259    // it — the caller owns commit/rollback. Creating a nested transaction would
2260    // shadow the outer ACTIVE_TRANSACTION_ID and prevent the outer rollback from
2261    // undoing these writes.
2262    let already_in_tx = ACTIVE_TRANSACTION_ID
2263        .try_with(|id| *id)
2264        .ok()
2265        .and_then(|id| db.transaction_manager.active_transactions.get(&id))
2266        .is_some();
2267
2268    if already_in_tx {
2269        let mut results = Vec::new();
2270        let mut context = HashMap::new();
2271        for mut_op in &mutation.operations {
2272            let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
2273            if let Some(alias) = &mut_op.alias {
2274                if let Some(doc) = res.returned_documents.first() {
2275                    let mut m = serde_json::Map::new();
2276                    for (k, v) in &doc.data {
2277                        m.insert(k.clone(), aurora_value_to_json_value(v));
2278                    }
2279                    m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
2280                    context.insert(alias.clone(), JsonValue::Object(m));
2281                }
2282            }
2283            results.push(res);
2284        }
2285        return if results.len() == 1 {
2286            Ok(ExecutionResult::Mutation(results.remove(0)))
2287        } else {
2288            Ok(ExecutionResult::Batch(
2289                results.into_iter().map(ExecutionResult::Mutation).collect(),
2290            ))
2291        };
2292    }
2293
2294    // No active transaction — wrap the mutation block in one so a failure in any
2295    // operation rolls back all previous operations in the same block.
2296    let tx_id = db.begin_transaction().await;
2297
2298    let exec_result = ACTIVE_TRANSACTION_ID
2299        .scope(tx_id, async {
2300            let mut results = Vec::new();
2301            let mut context = HashMap::new();
2302            for mut_op in &mutation.operations {
2303                let res =
2304                    execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
2305                if let Some(alias) = &mut_op.alias {
2306                    if let Some(doc) = res.returned_documents.first() {
2307                        let mut m = serde_json::Map::new();
2308                        for (k, v) in &doc.data {
2309                            m.insert(k.clone(), aurora_value_to_json_value(v));
2310                        }
2311                        m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
2312                        context.insert(alias.clone(), JsonValue::Object(m));
2313                    }
2314                }
2315                results.push(res);
2316            }
2317            Ok::<_, crate::error::AqlError>(results)
2318        })
2319        .await;
2320
2321    match exec_result {
2322        Ok(mut results) => {
2323            db.commit_transaction(tx_id).await?;
2324            if results.len() == 1 {
2325                Ok(ExecutionResult::Mutation(results.remove(0)))
2326            } else {
2327                Ok(ExecutionResult::Batch(
2328                    results.into_iter().map(ExecutionResult::Mutation).collect(),
2329                ))
2330            }
2331        }
2332        Err(e) => {
2333            let _ = db.rollback_transaction(tx_id).await;
2334            Err(e)
2335        }
2336    }
2337}
2338
2339fn execute_mutation_op<'a>(
2340    db: &'a Aurora,
2341    mut_op: &'a ast::MutationOperation,
2342    variables: &'a HashMap<String, ast::Value>,
2343    context: &'a ExecutionContext,
2344    options: &'a ExecutionOptions,
2345    fragments: &'a HashMap<String, FragmentDef>,
2346) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
2347    use futures::future::FutureExt;
2348    async move {
2349        // RBAC: Check mutation level permissions
2350        check_permissions(&mut_op.directives, options, true)?;
2351
2352        match &mut_op.operation {
2353            MutationOp::Insert { collection, data } => {
2354                let resolved = resolve_value(data, variables, context);
2355                let doc = db
2356                    .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
2357                    .await?;
2358                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
2359                    let fields = collect_fields(
2360                        &mut_op.selection_set,
2361                        fragments,
2362                        variables,
2363                        Some(collection),
2364                    )
2365                    .unwrap_or_default();
2366                    vec![apply_projection(doc, &fields, options)?]
2367                } else {
2368                    vec![doc]
2369                };
2370
2371                Ok(MutationResult {
2372                    operation: "insert".to_string(),
2373                    collection: collection.clone(),
2374                    affected_count: 1,
2375                    returned_documents: returned,
2376                })
2377            }
2378            MutationOp::Update {
2379                collection,
2380                filter,
2381                data,
2382            } => {
2383                let cf = if let Some(f) = filter {
2384                    Some(compile_filter(f)?)
2385                } else {
2386                    None
2387                };
2388                let update_data =
2389                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
2390
2391                // Scan and update matching documents
2392                let mut affected = 0;
2393                let mut returned = Vec::new();
2394
2395                // For now, we use a simple scan-and-update.
2396                // In production, we'd use indices if the filter allows.
2397                let vars_arc = Arc::new(variables.clone());
2398                let cf_arc = cf.map(Arc::new);
2399
2400                let matches = db.scan_and_filter(
2401                    collection,
2402                    |doc| {
2403                        if let Some(ref filter) = cf_arc {
2404                            matches_filter(doc, filter, &vars_arc)
2405                        } else {
2406                            true
2407                        }
2408                    },
2409                    None,
2410                )?;
2411
2412                let fields = if !mut_op.selection_set.is_empty() {
2413                    Some(
2414                        collect_fields(
2415                            &mut_op.selection_set,
2416                            fragments,
2417                            variables,
2418                            Some(collection),
2419                        )
2420                        .unwrap_or_default(),
2421                    )
2422                } else {
2423                    None
2424                };
2425
2426                for doc in matches {
2427                    let mut new_data = doc.data.clone();
2428                    for (k, v) in &update_data {
2429                        let applied = apply_field_modifier(new_data.get(k), v);
2430                        new_data.insert(k.clone(), applied);
2431                    }
2432
2433                    let updated_doc = db
2434                        .aql_update_document(collection, &doc._sid, new_data)
2435                        .await?;
2436
2437                    affected += 1;
2438                    if let Some(ref f) = fields {
2439                        returned.push(apply_projection(updated_doc, f, options)?);
2440                    }
2441                }
2442
2443                Ok(MutationResult {
2444                    operation: "update".to_string(),
2445                    collection: collection.clone(),
2446                    affected_count: affected,
2447                    returned_documents: returned,
2448                })
2449            }
2450            MutationOp::Delete { collection, filter } => {
2451                let cf = if let Some(f) = filter {
2452                    Some(compile_filter(f)?)
2453                } else {
2454                    None
2455                };
2456
2457                let mut affected = 0;
2458                let vars_arc = Arc::new(variables.clone());
2459                let cf_arc = cf.map(Arc::new);
2460
2461                let matches = db.scan_and_filter(
2462                    collection,
2463                    |doc| {
2464                        if let Some(ref filter) = cf_arc {
2465                            matches_filter(doc, filter, &vars_arc)
2466                        } else {
2467                            true
2468                        }
2469                    },
2470                    None,
2471                )?;
2472
2473                for doc in matches {
2474                    db.aql_delete_document(collection, &doc._sid).await?;
2475                    affected += 1;
2476                }
2477
2478                Ok(MutationResult {
2479                    operation: "delete".to_string(),
2480                    collection: collection.clone(),
2481                    affected_count: affected,
2482                    returned_documents: vec![],
2483                })
2484            }
2485            MutationOp::InsertMany { collection, data } => {
2486                let resolved = resolve_value(data, variables, context);
2487                let items = match resolved {
2488                    ast::Value::Array(arr) => arr,
2489                    _ => {
2490                        return Err(AqlError::new(
2491                            ErrorCode::QueryError,
2492                            "insertMany data must be an array".to_string(),
2493                        ));
2494                    }
2495                };
2496
2497                let mut affected = 0;
2498                let mut returned = Vec::new();
2499                for item in items {
2500                    let doc = db
2501                        .aql_insert(collection, aql_value_to_hashmap(&item, variables)?)
2502                        .await?;
2503                    affected += 1;
2504                    if !mut_op.selection_set.is_empty() && options.apply_projections {
2505                        let fields = collect_fields(
2506                            &mut_op.selection_set,
2507                            fragments,
2508                            variables,
2509                            Some(collection),
2510                        )
2511                        .unwrap_or_default();
2512                        returned.push(apply_projection(doc, &fields, options)?);
2513                    } else {
2514                        returned.push(doc);
2515                    }
2516                }
2517                Ok(MutationResult {
2518                    operation: "insertMany".to_string(),
2519                    collection: collection.clone(),
2520                    affected_count: affected,
2521                    returned_documents: returned,
2522                })
2523            }
2524            MutationOp::Upsert {
2525                collection,
2526                filter,
2527                data,
2528            } => {
2529                let update_data =
2530                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
2531                let cf = if let Some(f) = filter {
2532                    Some(compile_filter(f)?)
2533                } else {
2534                    None
2535                };
2536                let vars_arc = Arc::new(variables.clone());
2537                let cf_arc = cf.map(Arc::new);
2538                let matches = db.scan_and_filter(
2539                    collection,
2540                    |doc| {
2541                        if let Some(ref filter) = cf_arc {
2542                            matches_filter(doc, filter, &vars_arc)
2543                        } else {
2544                            true
2545                        }
2546                    },
2547                    Some(1),
2548                )?;
2549                let doc = if let Some(existing) = matches.into_iter().next() {
2550                    db.aql_update_document(collection, &existing._sid, update_data)
2551                        .await?
2552                } else {
2553                    db.aql_insert(collection, update_data).await?
2554                };
2555
2556                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
2557                    let fields = collect_fields(
2558                        &mut_op.selection_set,
2559                        fragments,
2560                        variables,
2561                        Some(collection),
2562                    )
2563                    .unwrap_or_default();
2564                    vec![apply_projection(doc, &fields, options)?]
2565                } else {
2566                    vec![doc]
2567                };
2568
2569                Ok(MutationResult {
2570                    operation: "upsert".to_string(),
2571                    collection: collection.clone(),
2572                    affected_count: 1,
2573                    returned_documents: returned,
2574                })
2575            }
2576            MutationOp::Transaction { operations } => {
2577                let mut all_returned = Vec::new();
2578                let mut total_affected = 0;
2579                for op in operations {
2580                    let res =
2581                        execute_mutation_op(db, op, variables, context, options, fragments).await?;
2582                    total_affected += res.affected_count;
2583                    all_returned.extend(res.returned_documents);
2584                }
2585                Ok(MutationResult {
2586                    operation: "transaction".to_string(),
2587                    collection: String::new(),
2588                    affected_count: total_affected,
2589                    returned_documents: all_returned,
2590                })
2591            }
2592            MutationOp::EnqueueJobs {
2593                job_type,
2594                payloads,
2595                priority,
2596                max_retries,
2597            } => {
2598                let workers = db.workers.as_ref().ok_or_else(|| {
2599                    AqlError::new(
2600                        ErrorCode::QueryError,
2601                        "Worker system not initialised".to_string(),
2602                    )
2603                })?;
2604                let job_priority = match priority {
2605                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
2606                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2607                    ast::JobPriority::High => crate::workers::JobPriority::High,
2608                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2609                };
2610                let mut returned = Vec::new();
2611                for payload in payloads {
2612                    let resolved = resolve_value(payload, variables, context);
2613                    let json_payload: std::collections::HashMap<String, serde_json::Value> =
2614                        if let ast::Value::Object(map) = &resolved {
2615                            map.iter()
2616                                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2617                                .collect()
2618                        } else {
2619                            std::collections::HashMap::new()
2620                        };
2621                    let mut job =
2622                        crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2623                    for (k, v) in json_payload {
2624                        job = job.add_field(k, v);
2625                    }
2626                    if let Some(retries) = max_retries {
2627                        job = job.with_max_retries(*retries);
2628                    }
2629                    let job_id = workers.enqueue(job).await?;
2630                    let mut doc = crate::types::Document::new();
2631                    doc._sid = job_id.clone();
2632                    doc.data.insert("job_id".to_string(), Value::String(job_id));
2633                    doc.data
2634                        .insert("job_type".to_string(), Value::String(job_type.clone()));
2635                    doc.data
2636                        .insert("status".to_string(), Value::String("pending".to_string()));
2637                    returned.push(doc);
2638                }
2639                let count = returned.len();
2640                Ok(MutationResult {
2641                    operation: "enqueueJobs".to_string(),
2642                    collection: "__jobs".to_string(),
2643                    affected_count: count,
2644                    returned_documents: returned,
2645                })
2646            }
2647            MutationOp::Import { collection, data } => {
2648                let mut affected = 0;
2649                let mut returned = Vec::new();
2650                let fields = if !mut_op.selection_set.is_empty() {
2651                    Some(
2652                        collect_fields(
2653                            &mut_op.selection_set,
2654                            fragments,
2655                            variables,
2656                            Some(collection),
2657                        )
2658                        .unwrap_or_default(),
2659                    )
2660                } else {
2661                    None
2662                };
2663                for item in data {
2664                    let resolved = resolve_value(item, variables, context);
2665                    let map = aql_value_to_hashmap(&resolved, variables)?;
2666                    let doc = db.aql_insert(collection, map).await?;
2667                    affected += 1;
2668                    if let Some(ref f) = fields {
2669                        returned.push(apply_projection(doc, f, options)?);
2670                    }
2671                }
2672                Ok(MutationResult {
2673                    operation: "import".to_string(),
2674                    collection: collection.clone(),
2675                    affected_count: affected,
2676                    returned_documents: returned,
2677                })
2678            }
2679            MutationOp::Export {
2680                collection,
2681                format: _,
2682            } => {
2683                let docs = db.scan_and_filter(collection, |_| true, None)?;
2684                let fields = if !mut_op.selection_set.is_empty() {
2685                    Some(
2686                        collect_fields(
2687                            &mut_op.selection_set,
2688                            fragments,
2689                            variables,
2690                            Some(collection),
2691                        )
2692                        .unwrap_or_default(),
2693                    )
2694                } else {
2695                    None
2696                };
2697                let mut returned = Vec::with_capacity(docs.len());
2698                if let Some(ref f) = fields {
2699                    for d in docs {
2700                        returned.push(apply_projection(d, f, options)?);
2701                    }
2702                } else {
2703                    returned = docs;
2704                }
2705                let count = returned.len();
2706                Ok(MutationResult {
2707                    operation: "export".to_string(),
2708                    collection: collection.clone(),
2709                    affected_count: count,
2710                    returned_documents: returned,
2711                })
2712            }
2713            MutationOp::EnqueueJob {
2714                job_type,
2715                payload,
2716                priority,
2717                scheduled_at,
2718                max_retries,
2719            } => {
2720                let workers = db.workers.as_ref().ok_or_else(|| {
2721                    AqlError::new(
2722                        ErrorCode::QueryError,
2723                        "Worker system not initialised".to_string(),
2724                    )
2725                })?;
2726
2727                // Convert AQL priority to Job priority
2728                let job_priority = match priority {
2729                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
2730                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2731                    ast::JobPriority::High => crate::workers::JobPriority::High,
2732                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2733                };
2734
2735                // Convert AQL Value payload to serde_json payload
2736                let resolved = resolve_value(payload, variables, context);
2737                let json_payload: std::collections::HashMap<String, serde_json::Value> =
2738                    if let ast::Value::Object(map) = &resolved {
2739                        map.iter()
2740                            .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2741                            .collect()
2742                    } else {
2743                        std::collections::HashMap::new()
2744                    };
2745
2746                let mut job =
2747                    crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2748
2749                for (k, v) in json_payload {
2750                    job = job.add_field(k, v);
2751                }
2752
2753                if let Some(retries) = max_retries {
2754                    job = job.with_max_retries(*retries);
2755                }
2756
2757                if let Some(scheduled) = scheduled_at {
2758                    if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
2759                        job = job.scheduled_at(dt);
2760                    }
2761                }
2762
2763                let job_id = workers.enqueue(job).await?;
2764
2765                let mut doc = crate::types::Document::new();
2766                doc._sid = job_id.clone();
2767                doc.data
2768                    .insert("job_id".to_string(), crate::types::Value::String(job_id));
2769                doc.data.insert(
2770                    "job_type".to_string(),
2771                    crate::types::Value::String(job_type.clone()),
2772                );
2773                doc.data.insert(
2774                    "status".to_string(),
2775                    crate::types::Value::String("pending".to_string()),
2776                );
2777
2778                Ok(MutationResult {
2779                    operation: "enqueueJob".to_string(),
2780                    collection: "__jobs".to_string(),
2781                    affected_count: 1,
2782                    returned_documents: vec![doc],
2783                })
2784            }
2785        }
2786    }
2787    .boxed()
2788}
2789
2790async fn execute_subscription(
2791    db: &Aurora,
2792    sub: &ast::Subscription,
2793    vars: &HashMap<String, ast::Value>,
2794    _options: &ExecutionOptions,
2795) -> Result<ExecutionResult> {
2796    let vars: HashMap<String, ast::Value> = vars.clone();
2797
2798    let selection = sub.selection_set.first().ok_or_else(|| {
2799        AqlError::new(
2800            ErrorCode::QueryError,
2801            "Subscription must have a selection".to_string(),
2802        )
2803    })?;
2804
2805    if let Selection::Field(f) = selection {
2806        let collection = f.name.clone();
2807        let filter = extract_filter_from_args(&f.arguments)?;
2808
2809        let mut listener = db.pubsub.listen(&collection);
2810
2811        if let Some(f) = filter {
2812            let event_filter = ast_filter_to_event_filter(&f, &vars)?;
2813            listener = listener.filter(event_filter);
2814        }
2815
2816        Ok(ExecutionResult::Subscription(SubscriptionResult {
2817            subscription_id: uuid::Uuid::new_v4().to_string(),
2818            collection,
2819            stream: Some(listener),
2820        }))
2821    } else {
2822        Err(AqlError::new(
2823            ErrorCode::QueryError,
2824            "Invalid subscription selection".to_string(),
2825        ))
2826    }
2827}
2828
2829fn ast_filter_to_event_filter(
2830    filter: &AqlFilter,
2831    vars: &HashMap<String, ast::Value>,
2832) -> Result<crate::pubsub::EventFilter> {
2833    use crate::pubsub::EventFilter;
2834
2835    match filter {
2836        AqlFilter::Eq(f, v) => Ok(EventFilter::FieldEquals(
2837            f.clone(),
2838            aql_value_to_db_value(v, vars)?,
2839        )),
2840        AqlFilter::Ne(f, v) => Ok(EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?)),
2841        AqlFilter::Gt(f, v) => Ok(EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?)),
2842        AqlFilter::Gte(f, v) => Ok(EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?)),
2843        AqlFilter::Lt(f, v) => Ok(EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?)),
2844        AqlFilter::Lte(f, v) => Ok(EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?)),
2845        AqlFilter::In(f, v) => Ok(EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?)),
2846        AqlFilter::NotIn(f, v) => Ok(EventFilter::NotIn(
2847            f.clone(),
2848            aql_value_to_db_value(v, vars)?,
2849        )),
2850        AqlFilter::Contains(f, v) => Ok(EventFilter::Contains(
2851            f.clone(),
2852            aql_value_to_db_value(v, vars)?,
2853        )),
2854        // ContainsAny/ContainsAll don't have EventFilter equivalents; fall back to Contains
2855        AqlFilter::ContainsAny(f, v) | AqlFilter::ContainsAll(f, v) => Ok(EventFilter::Contains(
2856            f.clone(),
2857            aql_value_to_db_value(v, vars)?,
2858        )),
2859        AqlFilter::StartsWith(f, v) => Ok(EventFilter::StartsWith(
2860            f.clone(),
2861            aql_value_to_db_value(v, vars)?,
2862        )),
2863        AqlFilter::EndsWith(f, v) => Ok(EventFilter::EndsWith(
2864            f.clone(),
2865            aql_value_to_db_value(v, vars)?,
2866        )),
2867        AqlFilter::Matches(f, v) => {
2868            let pattern = match aql_value_to_db_value(v, vars)? {
2869                crate::types::Value::String(s) => s,
2870                other => other.to_string(),
2871            };
2872            let re = regex::Regex::new(&pattern).map_err(|e| {
2873                crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
2874            })?;
2875            Ok(EventFilter::Matches(f.clone(), re))
2876        }
2877        AqlFilter::IsNull(f) => Ok(EventFilter::IsNull(f.clone())),
2878        AqlFilter::IsNotNull(f) => Ok(EventFilter::IsNotNull(f.clone())),
2879        AqlFilter::And(filters) => {
2880            let mut mapped = Vec::new();
2881            for f in filters {
2882                mapped.push(ast_filter_to_event_filter(f, vars)?);
2883            }
2884            Ok(EventFilter::And(mapped))
2885        }
2886        AqlFilter::Or(filters) => {
2887            let mut mapped = Vec::new();
2888            for f in filters {
2889                mapped.push(ast_filter_to_event_filter(f, vars)?);
2890            }
2891            Ok(EventFilter::Or(mapped))
2892        }
2893        AqlFilter::Not(f) => Ok(EventFilter::Not(Box::new(ast_filter_to_event_filter(
2894            f, vars,
2895        )?))),
2896    }
2897}
2898
2899async fn execute_introspection(
2900    db: &Aurora,
2901    intro: &ast::IntrospectionQuery,
2902) -> Result<ExecutionResult> {
2903    let names = db.list_collection_names();
2904    let want_fields = intro.fields.is_empty()
2905        || intro
2906            .fields
2907            .iter()
2908            .any(|f| f == "collections" || f == "fields");
2909
2910    let documents: Vec<Document> = names
2911        .iter()
2912        .filter_map(|name| {
2913            // Skip internal system collections from introspection unless asked
2914            if name.starts_with('_') {
2915                return None;
2916            }
2917            let col = db.get_collection_definition(name).ok()?;
2918            let mut data = HashMap::new();
2919            data.insert("name".to_string(), Value::String(name.clone()));
2920
2921            if want_fields {
2922                let field_list: Vec<Value> = col
2923                    .fields
2924                    .iter()
2925                    .map(|(fname, fdef)| {
2926                        let mut fdata = HashMap::new();
2927                        fdata.insert("name".to_string(), Value::String(fname.clone()));
2928                        fdata.insert(
2929                            "type".to_string(),
2930                            Value::String(fdef.field_type.to_string()),
2931                        );
2932                        fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
2933                        fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
2934                        fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
2935                        if !fdef.validations.is_empty() {
2936                            let vcons: Vec<Value> = fdef
2937                                .validations
2938                                .iter()
2939                                .map(|c| Value::String(format!("{:?}", c)))
2940                                .collect();
2941                            fdata.insert("validations".to_string(), Value::Array(vcons));
2942                        }
2943                        Value::Object(fdata)
2944                    })
2945                    .collect();
2946                data.insert("fields".to_string(), Value::Array(field_list));
2947            }
2948
2949            Some(Document {
2950                _sid: name.clone(),
2951                data,
2952            })
2953        })
2954        .collect();
2955
2956    let count = documents.len();
2957    Ok(ExecutionResult::Query(QueryResult {
2958        collection: "__schema".to_string(),
2959        documents,
2960        total_count: Some(count),
2961        deferred_fields: vec![],
2962        explain: None,
2963    }))
2964}
2965
2966async fn execute_schema(
2967    db: &Aurora,
2968    schema: &ast::Schema,
2969    _options: &ExecutionOptions,
2970) -> Result<ExecutionResult> {
2971    let mut last_collection = String::new();
2972
2973    for op in &schema.operations {
2974        match op {
2975            ast::SchemaOp::DefineCollection {
2976                name,
2977                fields,
2978                if_not_exists,
2979                ..
2980            } => {
2981                last_collection = name.clone();
2982
2983                // If if_not_exists is true, check if it already exists
2984                if *if_not_exists {
2985                    if db.get_collection_definition(name).is_ok() {
2986                        continue;
2987                    }
2988                }
2989
2990                let mut field_defs = Vec::new();
2991                for field in fields {
2992                    field_defs.push((field.name.as_str(), build_field_def(field)));
2993                }
2994                db.new_collection(name, field_defs).await?;
2995            }
2996            ast::SchemaOp::AlterCollection { name, actions } => {
2997                last_collection = name.clone();
2998                for action in actions {
2999                    match action {
3000                        ast::AlterAction::AddField { field, default } => {
3001                            let def = build_field_def(field);
3002                            db.add_field_to_schema(name, field.name.clone(), def)
3003                                .await?;
3004                            if let Some(default_val) = default {
3005                                let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
3006                                let docs = db.get_all_collection(name).await?;
3007                                for doc in docs {
3008                                    if !doc.data.contains_key(&field.name) {
3009                                        db.update_document(
3010                                            name,
3011                                            &doc._sid,
3012                                            vec![(&field.name, db_val.clone())],
3013                                        )
3014                                        .await?;
3015                                    }
3016                                }
3017                            }
3018                        }
3019                        ast::AlterAction::DropField(field_name) => {
3020                            db.drop_field_from_schema(name, field_name.clone()).await?;
3021                        }
3022                        ast::AlterAction::RenameField { from, to } => {
3023                            db.rename_field_in_schema(name, from.clone(), to.clone())
3024                                .await?;
3025                            let docs = db.get_all_collection(name).await?;
3026                            for mut doc in docs {
3027                                if let Some(val) = doc.data.remove(from.as_str()) {
3028                                    doc.data.insert(to.clone(), val);
3029                                    let key = format!("{}:{}", name, doc._sid);
3030                                    db.put(key, serde_json::to_vec(&doc)?, None).await?;
3031                                }
3032                            }
3033                        }
3034                        ast::AlterAction::ModifyField(field) => {
3035                            db.modify_field_in_schema(
3036                                name,
3037                                field.name.clone(),
3038                                build_field_def(field),
3039                            )
3040                            .await?;
3041                        }
3042                    }
3043                }
3044            }
3045            ast::SchemaOp::DropCollection { name, .. } => {
3046                db.drop_collection_schema(name).await?;
3047                last_collection = name.clone();
3048            }
3049        }
3050    }
3051
3052    Ok(ExecutionResult::Schema(SchemaResult {
3053        operation: "schema".to_string(),
3054        collection: last_collection,
3055        status: "done".to_string(),
3056    }))
3057}
3058
3059fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
3060    let scalar = match anno.name.to_lowercase().as_str() {
3061        "string" => ScalarType::String,
3062        "int" | "integer" => ScalarType::Int,
3063        "float" | "double" => ScalarType::Float,
3064        "bool" | "boolean" => ScalarType::Bool,
3065        "uuid" => ScalarType::Uuid,
3066        "object" => ScalarType::Object,
3067        "array" => ScalarType::Array,
3068        _ => ScalarType::Any,
3069    };
3070
3071    if anno.is_array {
3072        FieldType::Array(scalar)
3073    } else {
3074        match scalar {
3075            ScalarType::Object => FieldType::Object,
3076            ScalarType::Any => FieldType::Any,
3077            _ => FieldType::Scalar(scalar),
3078        }
3079    }
3080}
3081
3082/// Parse @validate directive arguments into FieldValidationConstraints
3083fn parse_validate_directive(
3084    directive: &ast::Directive,
3085) -> Vec<crate::types::FieldValidationConstraint> {
3086    use crate::types::FieldValidationConstraint as FVC;
3087    let mut constraints = Vec::new();
3088    for arg in &directive.arguments {
3089        match arg.name.as_str() {
3090            "format" => {
3091                if let ast::Value::String(s) = &arg.value {
3092                    constraints.push(FVC::Format(s.clone()));
3093                }
3094            }
3095            "min" => {
3096                let n = match &arg.value {
3097                    ast::Value::Float(f) => Some(*f),
3098                    ast::Value::Int(i) => Some(*i as f64),
3099                    _ => None,
3100                };
3101                if let Some(n) = n {
3102                    constraints.push(FVC::Min(n));
3103                }
3104            }
3105            "max" => {
3106                let n = match &arg.value {
3107                    ast::Value::Float(f) => Some(*f),
3108                    ast::Value::Int(i) => Some(*i as f64),
3109                    _ => None,
3110                };
3111                if let Some(n) = n {
3112                    constraints.push(FVC::Max(n));
3113                }
3114            }
3115            "minLength" => {
3116                if let ast::Value::Int(i) = &arg.value {
3117                    constraints.push(FVC::MinLength(*i));
3118                }
3119            }
3120            "maxLength" => {
3121                if let ast::Value::Int(i) = &arg.value {
3122                    constraints.push(FVC::MaxLength(*i));
3123                }
3124            }
3125            "pattern" => {
3126                if let ast::Value::String(s) = &arg.value {
3127                    constraints.push(FVC::Pattern(s.clone()));
3128                }
3129            }
3130            _ => {}
3131        }
3132    }
3133    constraints
3134}
3135
3136/// Build a FieldDefinition from a field's type + directives
3137fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
3138    let field_type = map_ast_type(&field.field_type);
3139    let mut indexed = false;
3140    let mut unique = false;
3141    let mut relation = None;
3142    let mut validations = Vec::new();
3143    for directive in &field.directives {
3144        match directive.name.as_str() {
3145            "indexed" | "index" => indexed = true,
3146            "unique" => {
3147                unique = true;
3148                indexed = true;
3149            }
3150            "primary" => {
3151                indexed = true;
3152                unique = true;
3153            }
3154            "relation" => {
3155                let to = directive
3156                    .arguments
3157                    .iter()
3158                    .find(|a| a.name == "to" || a.name == "collection")
3159                    .and_then(|a| {
3160                        if let ast::Value::String(s) = &a.value {
3161                            Some(s.clone())
3162                        } else {
3163                            None
3164                        }
3165                    })
3166                    .unwrap_or_default();
3167                let key = directive
3168                    .arguments
3169                    .iter()
3170                    .find(|a| a.name == "key" || a.name == "field")
3171                    .and_then(|a| {
3172                        if let ast::Value::String(s) = &a.value {
3173                            Some(s.clone())
3174                        } else {
3175                            None
3176                        }
3177                    })
3178                    .unwrap_or_else(|| "id".to_string());
3179                if !to.is_empty() {
3180                    relation = Some(crate::types::Relation { to, key });
3181                }
3182            }
3183            "validate" => validations.extend(parse_validate_directive(directive)),
3184            _ => {}
3185        }
3186    }
3187    FieldDefinition {
3188        field_type,
3189        unique,
3190        indexed,
3191        nullable: !field.field_type.is_required,
3192        validations,
3193        relation,
3194    }
3195}
3196
3197async fn execute_migration(
3198    db: &Aurora,
3199    migration: &ast::Migration,
3200    _options: &ExecutionOptions,
3201) -> Result<ExecutionResult> {
3202    let mut steps_applied = 0;
3203    let mut last_version = String::new();
3204
3205    for step in &migration.steps {
3206        last_version = step.version.clone();
3207
3208        if db.is_migration_applied(&step.version).await? {
3209            eprintln!(
3210                "[migration] version '{}' already applied — skipping",
3211                step.version
3212            );
3213            continue;
3214        }
3215
3216        eprintln!("[migration] applying version '{}'", step.version);
3217
3218        for action in &step.actions {
3219            match action {
3220                ast::MigrationAction::Schema(schema_op) => {
3221                    execute_single_schema_op(db, schema_op).await?;
3222                }
3223                ast::MigrationAction::DataMigration(dm) => {
3224                    execute_data_migration(db, dm).await?;
3225                }
3226            }
3227        }
3228
3229        db.mark_migration_applied(&step.version).await?;
3230        steps_applied += 1;
3231        eprintln!("[migration] version '{}' applied", step.version);
3232    }
3233
3234    let status = if steps_applied > 0 {
3235        "applied".to_string()
3236    } else {
3237        "skipped".to_string()
3238    };
3239
3240    Ok(ExecutionResult::Migration(MigrationResult {
3241        version: last_version,
3242        steps_applied,
3243        status,
3244    }))
3245}
3246
3247/// Execute a single SchemaOp — shared by execute_schema and execute_migration.
3248async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
3249    match op {
3250        ast::SchemaOp::DefineCollection {
3251            name,
3252            fields,
3253            if_not_exists,
3254            ..
3255        } => {
3256            if *if_not_exists && db.get_collection_definition(name).is_ok() {
3257                return Ok(());
3258            }
3259            let field_defs: Vec<(&str, FieldDefinition)> = fields
3260                .iter()
3261                .map(|f| (f.name.as_str(), build_field_def(f)))
3262                .collect();
3263            db.new_collection(name, field_defs).await?;
3264        }
3265        ast::SchemaOp::AlterCollection { name, actions } => {
3266            for action in actions {
3267                match action {
3268                    ast::AlterAction::AddField { field, default } => {
3269                        db.add_field_to_schema(name, field.name.clone(), build_field_def(field))
3270                            .await?;
3271                        if let Some(default_val) = default {
3272                            let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
3273                            let docs = db.get_all_collection(name).await?;
3274                            for doc in docs {
3275                                if !doc.data.contains_key(&field.name) {
3276                                    db.update_document(
3277                                        name,
3278                                        &doc._sid,
3279                                        vec![(&field.name, db_val.clone())],
3280                                    )
3281                                    .await?;
3282                                }
3283                            }
3284                        }
3285                    }
3286                    ast::AlterAction::DropField(field_name) => {
3287                        db.drop_field_from_schema(name, field_name.clone()).await?;
3288                    }
3289                    ast::AlterAction::RenameField { from, to } => {
3290                        db.rename_field_in_schema(name, from.clone(), to.clone())
3291                            .await?;
3292                        // Rename the field key in every existing document (one read-modify-write per doc)
3293                        let docs = db.get_all_collection(name).await?;
3294                        for mut doc in docs {
3295                            if let Some(val) = doc.data.remove(from.as_str()) {
3296                                doc.data.insert(to.clone(), val);
3297                                let key = format!("{}:{}", name, doc._sid);
3298                                db.put(key, serde_json::to_vec(&doc)?, None).await?;
3299                            }
3300                        }
3301                    }
3302                    ast::AlterAction::ModifyField(field) => {
3303                        db.modify_field_in_schema(name, field.name.clone(), build_field_def(field))
3304                            .await?;
3305                    }
3306                }
3307            }
3308        }
3309        ast::SchemaOp::DropCollection { name, if_exists } => {
3310            if *if_exists && db.get_collection_definition(name).is_err() {
3311                return Ok(());
3312            }
3313            db.drop_collection_schema(name).await?;
3314        }
3315    }
3316    Ok(())
3317}
3318
3319/// Apply a `migrate data in <collection> { set field = expr where { ... } }` block.
3320async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
3321    let docs = db.get_all_collection(&dm.collection).await?;
3322
3323    for doc in docs {
3324        for transform in &dm.transforms {
3325            let matches = match &transform.filter {
3326                Some(filter) => {
3327                    let compiled = compile_filter(filter)?;
3328                    matches_filter(&doc, &compiled, &HashMap::new())
3329                }
3330                None => true,
3331            };
3332
3333            if matches {
3334                let new_value = eval_migration_expr(&transform.expression, &doc);
3335                let mut updates = HashMap::new();
3336                updates.insert(transform.field.clone(), new_value);
3337                db.aql_update_document(&dm.collection, &doc._sid, updates)
3338                    .await?;
3339            }
3340        }
3341    }
3342    Ok(())
3343}
3344
3345/// Evaluate a migration expression string to a `Value`.
3346///
3347/// Supported forms (evaluated in order):
3348/// - String literal:  `"hello"` → `Value::String("hello")`
3349/// - Boolean:         `true` / `false`
3350/// - Null:            `null`
3351/// - Integer:         `42`
3352/// - Float:           `3.14`
3353/// - Field reference: `field_name` (looks up the field in the current document)
3354fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
3355    let expr = expr.trim();
3356
3357    if expr.starts_with('"') && expr.ends_with('"') && expr.len() >= 2 {
3358        return Value::String(expr[1..expr.len() - 1].to_string());
3359    }
3360    if expr == "true" {
3361        return Value::Bool(true);
3362    }
3363    if expr == "false" {
3364        return Value::Bool(false);
3365    }
3366    if expr == "null" {
3367        return Value::Null;
3368    }
3369    if let Ok(n) = expr.parse::<i64>() {
3370        return Value::Int(n);
3371    }
3372    if let Ok(f) = expr.parse::<f64>() {
3373        return Value::Float(f);
3374    }
3375    if let Some(v) = doc.data.get(expr) {
3376        return v.clone();
3377    }
3378
3379    Value::Null
3380}
3381
3382fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
3383    for a in args {
3384        if a.name == "where" || a.name == "filter" {
3385            return Ok(Some(value_to_filter(&a.value)?));
3386        }
3387    }
3388    Ok(None)
3389}
3390
3391fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
3392    let mut orderings = Vec::new();
3393    for a in args {
3394        if a.name == "orderBy" {
3395            match &a.value {
3396                ast::Value::String(f) => orderings.push(ast::Ordering {
3397                    field: f.clone(),
3398                    direction: ast::SortDirection::Asc,
3399                }),
3400                ast::Value::Object(obj) => {
3401                    // Formal syntax: { field: "fieldname", direction: ASC|DESC }
3402                    if let (Some(ast::Value::String(field_name)), Some(dir_val)) =
3403                        (obj.get("field"), obj.get("direction"))
3404                    {
3405                        let direction = match dir_val {
3406                            ast::Value::Enum(s) | ast::Value::String(s) => {
3407                                if s.to_uppercase() == "DESC" {
3408                                    ast::SortDirection::Desc
3409                                } else {
3410                                    ast::SortDirection::Asc
3411                                }
3412                            }
3413                            _ => ast::SortDirection::Asc,
3414                        };
3415                        orderings.push(ast::Ordering {
3416                            field: field_name.clone(),
3417                            direction,
3418                        });
3419                    } else {
3420                        // Shorthand: { fieldName: "ASC", fieldName2: "DESC" }
3421                        for (field, dir_val) in obj {
3422                            let direction = match dir_val {
3423                                ast::Value::Enum(s) | ast::Value::String(s) => {
3424                                    if s.to_uppercase() == "DESC" {
3425                                        ast::SortDirection::Desc
3426                                    } else {
3427                                        ast::SortDirection::Asc
3428                                    }
3429                                }
3430                                _ => ast::SortDirection::Asc,
3431                            };
3432                            orderings.push(ast::Ordering {
3433                                field: field.clone(),
3434                                direction,
3435                            });
3436                        }
3437                    }
3438                }
3439                _ => {}
3440            }
3441        }
3442    }
3443    orderings
3444}
3445
3446fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
3447    docs.sort_by(|a, b| {
3448        for o in orderings {
3449            let cmp = compare_values(a.data.get(&o.field), b.data.get(&o.field));
3450            if cmp != std::cmp::Ordering::Equal {
3451                return match o.direction {
3452                    ast::SortDirection::Asc => cmp,
3453                    ast::SortDirection::Desc => cmp.reverse(),
3454                };
3455            }
3456        }
3457        std::cmp::Ordering::Equal
3458    });
3459}
3460
3461fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
3462    match (a, b) {
3463        (None, None) => std::cmp::Ordering::Equal,
3464        (None, Some(_)) => std::cmp::Ordering::Less,
3465        (Some(_), None) => std::cmp::Ordering::Greater,
3466        (Some(av), Some(bv)) => av.partial_cmp(bv).unwrap_or(std::cmp::Ordering::Equal),
3467    }
3468}
3469
3470pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
3471    let (mut limit, mut offset) = (None, 0);
3472    for a in args {
3473        match a.name.as_str() {
3474            "limit" | "first" => {
3475                if let ast::Value::Int(n) = a.value {
3476                    limit = Some(n as usize);
3477                }
3478            }
3479            "offset" | "skip" => {
3480                if let ast::Value::Int(n) = a.value {
3481                    offset = n as usize;
3482                }
3483            }
3484            _ => {}
3485        }
3486    }
3487    (limit, offset)
3488}
3489
3490fn extract_cursor_pagination(
3491    args: &[ast::Argument],
3492) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
3493    let (mut first, mut after, mut last, mut before) = (None, None, None, None);
3494    for a in args {
3495        match a.name.as_str() {
3496            "first" => {
3497                if let ast::Value::Int(n) = a.value {
3498                    first = Some(n as usize);
3499                }
3500            }
3501            "after" => {
3502                if let ast::Value::String(s) = &a.value {
3503                    after = Some(s.clone());
3504                }
3505            }
3506            "last" => {
3507                if let ast::Value::Int(n) = a.value {
3508                    last = Some(n as usize);
3509                }
3510            }
3511            "before" => {
3512                if let ast::Value::String(s) = &a.value {
3513                    before = Some(s.clone());
3514                }
3515            }
3516            _ => {}
3517        }
3518    }
3519    (first, after, last, before)
3520}
3521
3522fn execute_connection(
3523    mut docs: Vec<Document>,
3524    sub_fields: &[ast::Field],
3525    limit: Option<usize>,
3526    fragments: &HashMap<String, FragmentDef>,
3527    variables: &HashMap<String, ast::Value>,
3528    options: &ExecutionOptions,
3529) -> Result<QueryResult> {
3530    let has_next_page = if let Some(l) = limit {
3531        docs.len() > l
3532    } else {
3533        false
3534    };
3535
3536    if has_next_page {
3537        docs.truncate(limit.unwrap());
3538    }
3539
3540    let mut edges = Vec::with_capacity(docs.len());
3541    let mut end_cursor = String::new();
3542
3543    // Find the 'node' selection fields once (potentially expanding fragments)
3544    let node_fields = if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
3545        let edges_sub_fields =
3546            collect_fields(&edges_field.selection_set, fragments, variables, None)
3547                .unwrap_or_default();
3548        if let Some(node_field) = edges_sub_fields.into_iter().find(|f| f.name == "node") {
3549            collect_fields(&node_field.selection_set, fragments, variables, None)
3550                .unwrap_or_default()
3551        } else {
3552            Vec::new()
3553        }
3554    } else {
3555        Vec::new()
3556    };
3557
3558    for doc in docs {
3559        let cursor = doc._sid.clone();
3560        end_cursor = cursor.clone();
3561
3562        let mut edge_data = HashMap::new();
3563        edge_data.insert("cursor".to_string(), Value::String(cursor));
3564
3565        let node_doc = if node_fields.is_empty() {
3566            doc
3567        } else {
3568            apply_projection(doc, &node_fields, options)?
3569        };
3570
3571        edge_data.insert("node".to_string(), Value::Object(node_doc.data));
3572        edges.push(Value::Object(edge_data));
3573    }
3574
3575    let mut page_info = HashMap::new();
3576    page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
3577    page_info.insert("endCursor".to_string(), Value::String(end_cursor));
3578
3579    let mut conn_data = HashMap::new();
3580    conn_data.insert("edges".to_string(), Value::Array(edges));
3581    conn_data.insert("pageInfo".to_string(), Value::Object(page_info));
3582
3583    Ok(QueryResult {
3584        collection: String::new(),
3585        documents: vec![Document {
3586            _sid: "connection".to_string(),
3587            data: conn_data,
3588        }],
3589        total_count: None,
3590        deferred_fields: vec![],
3591        explain: None,
3592    })
3593}
3594
3595pub fn matches_filter(
3596    doc: &Document,
3597    filter: &CompiledFilter,
3598    vars: &HashMap<String, ast::Value>,
3599) -> bool {
3600    match filter {
3601        CompiledFilter::Eq(f, v) => {
3602            if let Some(dv) = doc.data.get(f) {
3603                values_equal(dv, v, vars)
3604            } else if f == "_sid" {
3605                values_equal(&Value::String(doc._sid.clone()), v, vars)
3606            } else {
3607                false
3608            }
3609        }
3610        CompiledFilter::Ne(f, v) => {
3611            if let Some(dv) = doc.data.get(f) {
3612                !values_equal(dv, v, vars)
3613            } else if f == "_sid" {
3614                !values_equal(&Value::String(doc._sid.clone()), v, vars)
3615            } else {
3616                true
3617            }
3618        }
3619        CompiledFilter::Gt(f, v) => {
3620            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3621                if f == "_sid" {
3622                    Some(Value::String(doc._sid.clone()))
3623                } else {
3624                    None
3625                }
3626            });
3627            dv_opt.map_or(false, |dv| {
3628                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3629                    return dv > bv;
3630                }
3631                false
3632            })
3633        }
3634        CompiledFilter::Gte(f, v) => {
3635            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3636                if f == "_sid" {
3637                    Some(Value::String(doc._sid.clone()))
3638                } else {
3639                    None
3640                }
3641            });
3642            dv_opt.map_or(false, |dv| {
3643                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3644                    return dv >= bv;
3645                }
3646                false
3647            })
3648        }
3649        CompiledFilter::Lt(f, v) => {
3650            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3651                if f == "_sid" {
3652                    Some(Value::String(doc._sid.clone()))
3653                } else {
3654                    None
3655                }
3656            });
3657            dv_opt.map_or(false, |dv| {
3658                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3659                    return dv < bv;
3660                }
3661                false
3662            })
3663        }
3664        CompiledFilter::Lte(f, v) => {
3665            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3666                if f == "_sid" {
3667                    Some(Value::String(doc._sid.clone()))
3668                } else {
3669                    None
3670                }
3671            });
3672            dv_opt.map_or(false, |dv| {
3673                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3674                    return dv <= bv;
3675                }
3676                false
3677            })
3678        }
3679        CompiledFilter::In(f, v) => {
3680            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3681                if f == "_sid" {
3682                    Some(Value::String(doc._sid.clone()))
3683                } else {
3684                    None
3685                }
3686            });
3687            dv_opt.map_or(false, |dv| {
3688                if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3689                    return arr.contains(&dv);
3690                }
3691                false
3692            })
3693        }
3694        CompiledFilter::NotIn(f, v) => {
3695            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3696                if f == "_sid" {
3697                    Some(Value::String(doc._sid.clone()))
3698                } else {
3699                    None
3700                }
3701            });
3702            dv_opt.map_or(true, |dv| {
3703                if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3704                    return !arr.contains(&dv);
3705                }
3706                true
3707            })
3708        }
3709        CompiledFilter::Contains(f, v) => {
3710            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3711                if f == "_sid" {
3712                    Some(Value::String(doc._sid.clone()))
3713                } else {
3714                    None
3715                }
3716            });
3717            if let Some(dv) = dv_opt {
3718                match (dv, resolve_if_variable(v, vars)) {
3719                    (Value::String(s), ast::Value::String(sub)) => s.contains(&*sub),
3720                    (Value::Array(arr), _) => {
3721                        if let Ok(bv) = aql_value_to_db_value(v, vars) {
3722                            return arr.contains(&bv);
3723                        }
3724                        false
3725                    }
3726                    _ => false,
3727                }
3728            } else {
3729                false
3730            }
3731        }
3732        // Field array must contain at least one of the provided values
3733        CompiledFilter::ContainsAny(f, v) => {
3734            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3735                if f == "_sid" {
3736                    Some(Value::String(doc._sid.clone()))
3737                } else {
3738                    None
3739                }
3740            });
3741            if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3742                (dv_opt, aql_value_to_db_value(v, vars))
3743            {
3744                check_arr.iter().any(|item| field_arr.contains(item))
3745            } else {
3746                false
3747            }
3748        }
3749        // Field array must contain all of the provided values
3750        CompiledFilter::ContainsAll(f, v) => {
3751            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3752                if f == "_sid" {
3753                    Some(Value::String(doc._sid.clone()))
3754                } else {
3755                    None
3756                }
3757            });
3758            if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3759                (dv_opt, aql_value_to_db_value(v, vars))
3760            {
3761                check_arr.iter().all(|item| field_arr.contains(item))
3762            } else {
3763                false
3764            }
3765        }
3766        CompiledFilter::StartsWith(f, v) => {
3767            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3768                if f == "_sid" {
3769                    Some(Value::String(doc._sid.clone()))
3770                } else {
3771                    None
3772                }
3773            });
3774            if let (Some(Value::String(s)), ast::Value::String(pre)) =
3775                (dv_opt, resolve_if_variable(v, vars))
3776            {
3777                s.starts_with(&*pre)
3778            } else {
3779                false
3780            }
3781        }
3782        CompiledFilter::EndsWith(f, v) => {
3783            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3784                if f == "_sid" {
3785                    Some(Value::String(doc._sid.clone()))
3786                } else {
3787                    None
3788                }
3789            });
3790            if let (Some(Value::String(s)), ast::Value::String(suf)) =
3791                (dv_opt, resolve_if_variable(v, vars))
3792            {
3793                s.ends_with(&*suf)
3794            } else {
3795                false
3796            }
3797        }
3798        CompiledFilter::Matches(f, re) => {
3799            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3800                if f == "_sid" {
3801                    Some(Value::String(doc._sid.clone()))
3802                } else {
3803                    None
3804                }
3805            });
3806            if let Some(Value::String(s)) = dv_opt {
3807                re.is_match(&s)
3808            } else {
3809                false
3810            }
3811        }
3812        CompiledFilter::IsNull(f) => {
3813            if f == "_sid" {
3814                false // _sid is never null
3815            } else {
3816                doc.data.get(f).map_or(true, |v| matches!(v, Value::Null))
3817            }
3818        }
3819        CompiledFilter::IsNotNull(f) => {
3820            if f == "_sid" {
3821                true // _sid is always non-null
3822            } else {
3823                doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
3824            }
3825        }
3826        CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
3827        CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
3828        CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
3829    }
3830}
3831
3832/// Apply an update modifier object to an existing field value.
3833/// Modifiers: `{ increment: N }`, `{ decrement: N }`, `{ push: V }`,
3834/// `{ pull: V }`, `{ addToSet: V }`.
3835/// Any non-modifier object is returned as-is (plain field replace).
3836fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
3837    if let Value::Object(modifier) = new_val {
3838        if let Some(delta) = modifier.get("increment") {
3839            match (existing, delta) {
3840                (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
3841                (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
3842                (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
3843                _ => {}
3844            }
3845        }
3846        if let Some(delta) = modifier.get("decrement") {
3847            match (existing, delta) {
3848                (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
3849                (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
3850                (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
3851                _ => {}
3852            }
3853        }
3854        if let Some(item) = modifier.get("push") {
3855            if let Some(Value::Array(mut arr)) = existing.cloned() {
3856                arr.push(item.clone());
3857                return Value::Array(arr);
3858            }
3859            return Value::Array(vec![item.clone()]);
3860        }
3861        if let Some(item) = modifier.get("pull") {
3862            if let Some(Value::Array(arr)) = existing {
3863                let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
3864                return Value::Array(filtered);
3865            }
3866            return Value::Array(vec![]);
3867        }
3868        if let Some(item) = modifier.get("addToSet") {
3869            if let Some(Value::Array(mut arr)) = existing.cloned() {
3870                if !arr.contains(item) {
3871                    arr.push(item.clone());
3872                }
3873                return Value::Array(arr);
3874            }
3875            return Value::Array(vec![item.clone()]);
3876        }
3877    }
3878    new_val.clone()
3879}
3880
3881fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
3882    if let Ok(bv) = aql_value_to_db_value(av, vars) {
3883        return dv == &bv;
3884    }
3885    false
3886}
3887
3888fn resolve_if_variable<'a>(
3889    v: &'a ast::Value,
3890    vars: &'a HashMap<String, ast::Value>,
3891) -> &'a ast::Value {
3892    if let ast::Value::Variable(n) = v {
3893        vars.get(n).unwrap_or(v)
3894    } else {
3895        v
3896    }
3897}
3898
3899pub fn apply_projection(
3900    doc: Document,
3901    fields: &[ast::Field],
3902    options: &ExecutionOptions,
3903) -> Result<Document> {
3904    let (projected, _) = apply_projection_and_defer(doc, fields, &options.variables, options)?;
3905    Ok(projected)
3906}
3907
3908/// Apply projections, resolving lookup fields from foreign collections.
3909/// Returns (projected_doc, deferred_field_names).
3910async fn apply_projection_with_lookups(
3911    db: &Aurora,
3912    mut doc: Document,
3913    collection_name: &str,
3914    fields: &[ast::Field],
3915    fragments: &HashMap<String, FragmentDef>,
3916    vars: &HashMap<String, ast::Value>,
3917    options: &ExecutionOptions,
3918    pinned_fields: &[String],
3919) -> Result<(Document, Vec<String>)> {
3920    if fields.is_empty() {
3921        return Ok((doc, vec![]));
3922    }
3923    let mut proj = HashMap::new();
3924    let mut deferred = Vec::new();
3925
3926    // 1. Process all fields (including pinning dependencies)
3927    for f in fields {
3928        // RBAC Check
3929        check_permissions(&f.directives, options, false)?;
3930
3931        // @defer
3932        if f.directives.iter().any(|d| d.name == "defer") {
3933            deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
3934            continue;
3935        }
3936
3937        let mut lookup_info: Option<(String, String, String)> = None;
3938        let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
3939        let local_arg = f.arguments.iter().find(|a| a.name == "localField");
3940        let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
3941
3942        if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
3943            let c_val = resolve_if_variable(&c.value, vars);
3944            let lf_val = resolve_if_variable(&lf.value, vars);
3945            let ff_val = resolve_if_variable(&ff.value, vars);
3946
3947            if let (
3948                ast::Value::String(foreign_coll),
3949                ast::Value::String(local_field),
3950                ast::Value::String(foreign_field),
3951            ) = (c_val, lf_val, ff_val)
3952            {
3953                lookup_info = Some((
3954                    foreign_coll.clone(),
3955                    local_field.clone(),
3956                    foreign_field.clone(),
3957                ));
3958            }
3959        } else if let Ok(col_def) = db.get_collection_definition(collection_name) {
3960            if let Some(f_def) = col_def.fields.get(&f.name) {
3961                if let Some(rel) = &f_def.relation {
3962                    lookup_info = Some((rel.to.clone(), f.name.clone(), rel.key.clone()));
3963                }
3964            }
3965        }
3966
3967        if let Some((foreign_coll, local_field, foreign_field)) = lookup_info {
3968            let local_val = doc.data.get(local_field.as_str()).cloned().or_else(|| {
3969                if local_field == "_sid" {
3970                    Some(Value::String(doc._sid.clone()))
3971                } else {
3972                    None
3973                }
3974            });
3975
3976            if let Some(match_val) = local_val {
3977                let extra_filter = f
3978                    .arguments
3979                    .iter()
3980                    .find(|a| a.name == "where")
3981                    .and_then(|a| {
3982                        let resolved = resolve_ast_deep(&a.value, vars);
3983                        value_to_filter(&resolved).ok()
3984                    });
3985
3986                let vars_arc = Arc::new(vars.clone());
3987                let foreign_docs = db.scan_and_filter(
3988                    &foreign_coll,
3989                    |fdoc| {
3990                        let field_match = fdoc
3991                            .data
3992                            .get(foreign_field.as_str())
3993                            .map(|v| values_equal_db(v, &match_val))
3994                            .unwrap_or(
3995                                foreign_field == "_sid" && fdoc._sid == match_val.to_string(),
3996                            );
3997                        if !field_match {
3998                            return false;
3999                        }
4000                        if let Some(ref ef) = extra_filter {
4001                            let compiled = compile_filter(ef)
4002                                .unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
4003                            matches_filter(fdoc, &compiled, &vars_arc)
4004                        } else {
4005                            true
4006                        }
4007                    },
4008                    None,
4009                )?;
4010
4011                let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
4012                    foreign_docs
4013                        .into_iter()
4014                        .map(|fd| Value::Object(fd.data))
4015                        .collect()
4016                } else {
4017                    let sub_fields: Vec<ast::Field> =
4018                        collect_fields(&f.selection_set, fragments, vars, Some(&foreign_coll))?;
4019                    let mut sub_results = Vec::with_capacity(foreign_docs.len());
4020                    for fd in foreign_docs {
4021                        let (proj_fd, _) =
4022                            apply_projection_and_defer(fd, &sub_fields, vars, options)?;
4023                        sub_results.push(Value::Object(proj_fd.data));
4024                    }
4025                    sub_results
4026                };
4027
4028                proj.insert(
4029                    f.alias.as_ref().unwrap_or(&f.name).clone(),
4030                    Value::Array(sub_projected),
4031                );
4032            }
4033            continue;
4034        }
4035
4036        // Computed field
4037        if f.name == "__compute__" {
4038            let alias = f.alias.as_deref().unwrap_or("computed");
4039            if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
4040                if let ast::Value::String(template) = &expr_arg.value {
4041                    let result = eval_template(template, &doc.data);
4042                    proj.insert(alias.to_string(), Value::String(result));
4043                } else if let Some(expr) = f.computed_expression.as_ref() {
4044                    let result = eval_expression(expr, &doc.data, vars);
4045                    proj.insert(alias.to_string(), result);
4046                }
4047            }
4048            continue;
4049        }
4050
4051        // Normal field
4052        if f.name == "id" {
4053            let v = doc
4054                .data
4055                .get("id")
4056                .cloned()
4057                .unwrap_or(Value::String(doc._sid.clone()));
4058            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v);
4059        } else if f.name == "_sid" {
4060            proj.insert(
4061                f.alias.as_ref().unwrap_or(&f.name).clone(),
4062                Value::String(doc._sid.clone()),
4063            );
4064        } else if let Some(v) = doc.data.get(&f.name) {
4065            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
4066        }
4067    }
4068
4069    // 2. Add pinned fields if missing (needed for outer lookups)
4070    for pinned in pinned_fields {
4071        if !proj.contains_key(pinned) {
4072            if let Some(v) = doc.data.get(pinned) {
4073                proj.insert(pinned.clone(), v.clone());
4074            }
4075        }
4076    }
4077
4078    doc.data = proj;
4079    Ok((doc, deferred))
4080}
4081
4082fn values_equal_db(a: &Value, b: &Value) -> bool {
4083    a == b
4084}
4085
4086pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
4087    let resolved = resolve_if_variable(v, vars);
4088    match resolved {
4089        ast::Value::Int(i) => Ok(Value::Int(*i)),
4090        ast::Value::Float(f) => Ok(Value::Float(*f)),
4091        ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
4092        ast::Value::String(s) => Ok(Value::String(s.clone())),
4093        ast::Value::Enum(s) => Ok(Value::String(s.clone())),
4094        ast::Value::Null => Ok(Value::Null),
4095        ast::Value::Variable(name) => Err(AqlError::new(
4096            ErrorCode::UndefinedVariable,
4097            format!("Variable '{}' not found", name),
4098        )),
4099        ast::Value::Array(arr) => {
4100            let mut vals = Vec::with_capacity(arr.len());
4101            for v in arr {
4102                vals.push(aql_value_to_db_value(v, vars)?);
4103            }
4104            Ok(Value::Array(vals))
4105        }
4106        ast::Value::Object(obj) => {
4107            let mut map = HashMap::with_capacity(obj.len());
4108            for (k, v) in obj {
4109                map.insert(k.clone(), aql_value_to_db_value(v, vars)?);
4110            }
4111            Ok(Value::Object(map))
4112        }
4113    }
4114}
4115
4116/// Convert AQL AST value to serde_json::Value for job payloads
4117fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
4118    match v {
4119        ast::Value::Null => serde_json::Value::Null,
4120        ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
4121        ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
4122        ast::Value::Float(f) => serde_json::Number::from_f64(*f)
4123            .map(serde_json::Value::Number)
4124            .unwrap_or(serde_json::Value::Null),
4125        ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
4126        ast::Value::Array(arr) => {
4127            serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
4128        }
4129        ast::Value::Object(obj) => serde_json::Value::Object(
4130            obj.iter()
4131                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
4132                .collect(),
4133        ),
4134        ast::Value::Variable(_) => serde_json::Value::Null,
4135    }
4136}
4137
4138fn aql_value_to_hashmap(
4139    v: &ast::Value,
4140    vars: &HashMap<String, ast::Value>,
4141) -> Result<HashMap<String, Value>> {
4142    if let ast::Value::Object(m) = resolve_if_variable(v, vars) {
4143        let mut res = HashMap::new();
4144        for (k, v) in m {
4145            res.insert(k.clone(), aql_value_to_db_value(v, vars)?);
4146        }
4147        Ok(res)
4148    } else {
4149        Err(AqlError::new(
4150            ErrorCode::QueryError,
4151            "Data must be object".to_string(),
4152        ))
4153    }
4154}
4155
4156fn aurora_value_to_json_value(v: &Value) -> JsonValue {
4157    match v {
4158        Value::Null => JsonValue::Null,
4159        Value::String(s) => JsonValue::String(s.clone()),
4160        Value::Int(i) => JsonValue::Number((*i).into()),
4161        Value::Float(f) => serde_json::Number::from_f64(*f)
4162            .map(JsonValue::Number)
4163            .unwrap_or(JsonValue::Null),
4164        Value::Bool(b) => JsonValue::Bool(*b),
4165        Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
4166        Value::Object(m) => {
4167            let mut jm = serde_json::Map::new();
4168            for (k, v) in m {
4169                jm.insert(k.clone(), aurora_value_to_json_value(v));
4170            }
4171            JsonValue::Object(jm)
4172        }
4173        Value::Uuid(u) => JsonValue::String(u.to_string()),
4174        Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
4175    }
4176}
4177
4178/// Find an equality filter on an indexed field to allow O(1) lookup short-circuit
4179fn find_indexed_equality_filter(
4180    filter: &AqlFilter,
4181    db: &Aurora,
4182    collection: &str,
4183) -> Option<(String, ast::Value)> {
4184    match filter {
4185        AqlFilter::Eq(field, val) => {
4186            if field == "_sid" || db.has_index(collection, field) {
4187                Some((field.clone(), val.clone()))
4188            } else {
4189                None
4190            }
4191        }
4192        AqlFilter::And(filters) => {
4193            for f in filters {
4194                if let Some(res) = find_indexed_equality_filter(f, db, collection) {
4195                    return Some(res);
4196                }
4197            }
4198            None
4199        }
4200        _ => None,
4201    }
4202}
4203
4204pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
4205    if let ast::Value::Object(m) = v {
4206        let mut fs = Vec::new();
4207        for (k, val) in m {
4208            match k.as_str() {
4209                "or" => {
4210                    if let ast::Value::Array(arr) = val {
4211                        let mut sub = Vec::new();
4212                        for item in arr {
4213                            sub.push(value_to_filter(item)?);
4214                        }
4215                        return Ok(AqlFilter::Or(sub));
4216                    }
4217                }
4218                "and" => {
4219                    if let ast::Value::Array(arr) = val {
4220                        let mut sub = Vec::new();
4221                        for item in arr {
4222                            sub.push(value_to_filter(item)?);
4223                        }
4224                        return Ok(AqlFilter::And(sub));
4225                    }
4226                }
4227                "not" => {
4228                    return Ok(AqlFilter::Not(Box::new(value_to_filter(val)?)));
4229                }
4230                field => {
4231                    if let ast::Value::Object(ops) = val {
4232                        for (op, ov) in ops {
4233                            match op.as_str() {
4234                                "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
4235                                "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
4236                                "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
4237                                "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
4238                                "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
4239                                "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
4240                                "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
4241                                "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
4242                                "contains" => {
4243                                    fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
4244                                }
4245                                "containsAny" => {
4246                                    fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
4247                                }
4248                                "containsAll" => {
4249                                    fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
4250                                }
4251                                "startsWith" => {
4252                                    fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
4253                                }
4254                                "endsWith" => {
4255                                    fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
4256                                }
4257                                "matches" => {
4258                                    fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
4259                                }
4260                                _ => {}
4261                            }
4262                        }
4263                    }
4264                }
4265            }
4266        }
4267        if fs.is_empty() {
4268            Ok(AqlFilter::And(vec![]))
4269        } else if fs.len() == 1 {
4270            Ok(fs.remove(0))
4271        } else {
4272            Ok(AqlFilter::And(fs))
4273        }
4274    } else {
4275        Err(AqlError::new(
4276            ErrorCode::QueryError,
4277            "Filter must be object".to_string(),
4278        ))
4279    }
4280}
4281
4282fn resolve_value(
4283    v: &ast::Value,
4284    vars: &HashMap<String, ast::Value>,
4285    _ctx: &ExecutionContext,
4286) -> ast::Value {
4287    match v {
4288        ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
4289        _ => v.clone(),
4290    }
4291}
4292
4293/// Recursively resolve all variable references inside an AST value.
4294/// Unlike `resolve_value` (which only handles a top-level `$var`), this walks
4295/// into objects and arrays so nested references like
4296/// `{ query: $term }` or `{ status: { eq: $activeStatus } }` are fully resolved.
4297fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
4298    match v {
4299        ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
4300        ast::Value::Object(m) => ast::Value::Object(
4301            m.iter()
4302                .map(|(k, val)| (k.clone(), resolve_ast_deep(val, vars)))
4303                .collect(),
4304        ),
4305        ast::Value::Array(arr) => {
4306            ast::Value::Array(arr.iter().map(|val| resolve_ast_deep(val, vars)).collect())
4307        }
4308        _ => v.clone(),
4309    }
4310}
4311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
    use tempfile::TempDir;

    /// End-to-end smoke test: create a collection, insert one document through
    /// an AQL mutation, then read it back through an AQL query.
    #[tokio::test]
    async fn test_executor_integration() {
        // Fresh on-disk database inside a temp dir.
        let tmp = TempDir::new().unwrap();
        let config = AuroraConfig {
            db_path: tmp.path().join("test.db"),
            enable_write_buffering: false,
            durability_mode: DurabilityMode::Strict,
            ..Default::default()
        };
        let db = Aurora::with_config(config).await.unwrap();

        // `users` collection with a single indexed, nullable string field.
        let name_field = crate::types::FieldDefinition {
            field_type: FieldType::SCALAR_STRING,
            unique: false,
            indexed: true,
            nullable: true,
            ..Default::default()
        };
        db.new_collection("users", vec![("name", name_field)])
            .await
            .unwrap();

        let _ = execute(
            &db,
            r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#,
            ExecutionOptions::new(),
        )
        .await
        .unwrap();

        let outcome = execute(&db, "query { users { name } }", ExecutionOptions::new())
            .await
            .unwrap();
        match outcome {
            ExecutionResult::Query(q) => assert_eq!(q.documents.len(), 1),
            _ => panic!("Expected query"),
        }
    }
}