Skip to main content

aurora_db/parser/
executor.rs

1//! AQL Executor - Connects parsed AQL to Aurora operations
2//!
3//! Provides the bridge between AQL AST and Aurora's database operations.
4//! All operations (queries, mutations, subscriptions) go through execute().
5
6use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17// chrono used for downsample timestamp bucketing
18#[allow(unused_imports)]
19use chrono::TimeZone as _;
20
/// String-keyed map of JSON values.
// NOTE(review): not referenced in the visible part of this file — presumably
// carries per-execution context data; confirm against callers.
pub type ExecutionContext = HashMap<String, JsonValue>;
22
/// A pre-compiled query plan that can be executed repeatedly with different variables.
///
/// One plan is built per root field of a query (see `QueryPlan::from_query`) and is
/// what `execute_plan` consumes.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Collection to read from; taken from the root field's name.
    pub collection: String,
    /// Filter extracted from the root field's arguments, if any.
    pub filter: Option<ast::Filter>,
    /// `filter` after being passed through `compile_filter`; `None` when there is no filter.
    pub compiled_filter: Option<CompiledFilter>,
    /// Flattened sub-selection of the root field (fragments already expanded).
    pub projection: Vec<ast::Field>,
    /// Maximum number of documents; the offset-style limit, falling back to the cursor-style `first`.
    pub limit: Option<usize>,
    /// Number of leading documents to drop after sorting and cursor handling.
    pub offset: usize,
    /// Cursor from the pagination arguments; compared against a document's `_sid` at execution time.
    pub after: Option<String>,
    /// Sort orderings extracted from the root field's arguments.
    pub orderings: Vec<ast::Ordering>,
    /// True when the projection selects `edges` or `pageInfo`.
    pub is_connection: bool,
    /// True when `lookup_dependencies` is non-empty.
    pub has_lookups: bool,
    /// Copy of the document's fragment definitions, keyed by fragment name.
    pub fragments: HashMap<String, FragmentDef>,
    /// Variable definitions declared on the query; checked by `validate`.
    pub variable_definitions: Vec<ast::VariableDefinition>,
    /// Directives attached to the root field; used for the permission check in `execute_plan`.
    pub directives: Vec<ast::Directive>,
    /// Fields required for lookups (localField values)
    pub lookup_dependencies: Vec<String>,
}
42
43impl QueryPlan {
44    pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
45        validate_required_variables(&self.variable_definitions, provided_variables)
46    }
47
48    pub fn from_query(
49        db: &Aurora,
50        query: &ast::Query,
51        fragments: &HashMap<String, FragmentDef>,
52        variables: &HashMap<String, ast::Value>,
53    ) -> Result<Vec<Self>> {
54        let root_fields = collect_fields(&query.selection_set, fragments, variables, None)?;
55
56        let mut plans = Vec::new();
57        for field in root_fields {
58            let collection = field.name.clone();
59            let filter = extract_filter_from_args(&field.arguments)?;
60            let compiled_filter = if let Some(ref f) = filter {
61                Some(compile_filter(f)?)
62            } else {
63                None
64            };
65
66            let sub_fields = collect_fields(
67                &field.selection_set,
68                fragments,
69                variables,
70                Some(&field.name),
71            )?;
72
73            let (limit, offset) = extract_pagination(&field.arguments);
74            let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
75            let orderings = extract_order_by(&field.arguments);
76            let is_connection = sub_fields
77                .iter()
78                .any(|f| f.name == "edges" || f.name == "pageInfo");
79
80            let lookup_dependencies =
81                identify_lookup_dependencies(db, &collection, &sub_fields, variables);
82            let has_lookups = !lookup_dependencies.is_empty();
83
84            plans.push(QueryPlan {
85                collection,
86                filter,
87                compiled_filter,
88                projection: sub_fields,
89                limit: limit.or(first),
90                offset,
91                after,
92                orderings,
93                is_connection,
94                has_lookups,
95                fragments: fragments.clone(),
96                variable_definitions: query.variable_definitions.clone(),
97                directives: field.directives.clone(),
98                lookup_dependencies,
99            });
100        }
101        Ok(plans)
102    }
103}
104
105/// Execute an AQL query string against the database
106pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
107    use std::collections::hash_map::DefaultHasher;
108    use std::hash::{Hash, Hasher};
109
110    // Compute query hash for cache lookup (ignoring whitespace variations)
111    let query_key = {
112        let mut hasher = DefaultHasher::new();
113        aql.trim().hash(&mut hasher);
114        hasher.finish()
115    };
116
117    // Prepare variables for this execution
118    let vars: HashMap<String, ast::Value> = options.variables.clone();
119
120    // HIGH-SPEED PATH: Check for pre-compiled Query Plan
121    // Search queries are never cached — execute_plan doesn't handle `search:`.
122    if let Some(plan) = db.plan_cache.get(&query_key) {
123        plan.validate(&vars)?;
124        return execute_plan(db, &plan, &vars, &options).await;
125    }
126
127    // SLOW PATH: Parse and Compile
128    let mut doc = super::parse(aql)?;
129    println!("Parsed document with {} operations", doc.operations.len());
130
131    // Extract first query operation for planning
132    if let Some(Operation::Query(query)) = doc
133        .operations
134        .iter()
135        .find(|op| matches!(op, Operation::Query(_)))
136    {
137        let fragments: HashMap<String, FragmentDef> = doc
138            .operations
139            .iter()
140            .filter_map(|op| {
141                if let Operation::FragmentDefinition(f) = op {
142                    Some((f.name.clone(), f.clone()))
143                } else {
144                    None
145                }
146            })
147            .collect();
148
149        // Skip the fast plan path for search queries — execute_plan doesn't
150        // handle the `search:` argument; route through execute_document instead.
151        // Use collect_fields() so inline fragments and fragment spreads are
152        // traversed the same way QueryPlan::from_query() traverses them.
153        let root_fields = collect_fields(&query.selection_set, &fragments, &vars, None)?;
154        let has_search = root_fields
155            .iter()
156            .any(|f| f.arguments.iter().any(|a| a.name == "search"));
157
158        if !has_search {
159            let plans = QueryPlan::from_query(db, query, &fragments, &vars)?;
160            if plans.len() == 1 {
161                let plan = Arc::new(plans[0].clone());
162                plan.validate(&vars)?;
163                db.plan_cache.insert(query_key, Arc::clone(&plan));
164
165                return execute_plan(db, &plan, &vars, &options).await;
166            }
167        }
168    }
169
170    // FALLBACK: Normal execution for complex documents (but WITH our prepared vars)
171    // Merge declared variable defaults for any var not explicitly provided.
172    let mut vars = vars;
173    for op in &doc.operations {
174        if let Operation::Query(q) = op {
175            for var_def in &q.variable_definitions {
176                if !vars.contains_key(&var_def.name) {
177                    if let Some(default) = &var_def.default_value {
178                        vars.insert(var_def.name.clone(), default.clone());
179                    }
180                }
181            }
182        }
183    }
184
185    // We must resolve variables in the AST before executing the document path
186    super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
187        let code = match e.code {
188            super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
189            super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
190            _ => ErrorCode::QueryError,
191        };
192        AqlError::new(code, e.to_string())
193    })?;
194
195    execute_document(db, &doc, &options).await
196}
197
198pub async fn execute_plan(
199    db: &Aurora,
200    plan: &QueryPlan,
201    variables: &HashMap<String, ast::Value>,
202    options: &ExecutionOptions,
203) -> Result<ExecutionResult> {
204    // RBAC: Check collection level permissions from query directives
205    check_permissions(&plan.directives, options, false)?;
206
207    let collection_name = &plan.collection;
208
209    // Determine effective limit for the scan
210    let effective_limit = plan.limit;
211
212    // 1. Resolve Indexed Equality (Fast path)
213    let mut indexed_docs = None;
214    if let Some(ref f) = plan.filter {
215        // We need a version of find_indexed that uses our runtime variables
216        if let Some((field, val)) =
217            find_indexed_equality_filter_runtime(f, db, collection_name, variables)
218        {
219            // Skip the indexed fast path if the index is still being rebuilt —
220            // fall through to the full scan below instead of returning zero results.
221            let index_ready = field == "_sid" || !db.is_index_building(collection_name, &field);
222            if index_ready {
223                let mut db_val = aql_value_to_db_value(&val, variables)?;
224
225                // Coerce based on schema if available to ensure index hits
226                if let Ok(col_def) = db.get_collection_definition(collection_name) {
227                    if let Some(f_def) = col_def.fields.get(&field) {
228                        db_val = db_val.coerce_to(&f_def.field_type);
229                    }
230                }
231
232                let ids = if field == "_sid" {
233                    match &db_val {
234                        Value::String(s) => vec![s.clone()],
235                        Value::Uuid(u) => vec![u.to_string()],
236                        _ => vec![],
237                    }
238                } else {
239                    db.get_ids_from_index(collection_name, &field, &db_val)
240                };
241
242                let mut docs = Vec::with_capacity(ids.len());
243                for id in ids {
244                    if let Some(doc) = db.get_document(collection_name, &id)? {
245                        // Check if doc matches the REST of the plan's compiled filter
246                        if let Some(ref cf) = plan.compiled_filter {
247                            if matches_filter(&doc, cf, variables) {
248                                docs.push(doc);
249                            }
250                        } else {
251                            docs.push(doc);
252                        }
253                    }
254                }
255
256                indexed_docs = Some(docs);
257            } // end if index_ready — if not ready, indexed_docs stays None → full scan
258        }
259    }
260
261    let mut docs = if let Some(d) = indexed_docs {
262        d
263    } else {
264        // Fallback to scan
265        let vars_arc = Arc::new(variables.clone());
266        let cf_clone = plan.compiled_filter.clone();
267        let filter_fn = move |doc: &Document| {
268            cf_clone
269                .as_ref()
270                .map(|f| matches_filter(doc, f, &vars_arc))
271                .unwrap_or(true)
272        };
273
274        // Scan-limit is only valid when there is no cursor and no ordering —
275        // orderings require a full scan to find the correct top-N.
276        let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
277            None
278        } else {
279            plan.limit.map(|l| {
280                let base = if plan.is_connection { l + 1 } else { l };
281                base + plan.offset
282            })
283        };
284        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
285    };
286
287    // 2. Sort (must happen before cursor drain and offset)
288    if !plan.orderings.is_empty() {
289        apply_ordering(&mut docs, &plan.orderings);
290    }
291
292    // 2a. Cursor pagination: skip past the `after` cursor
293    if let Some(ref cursor) = plan.after {
294        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
295            docs.drain(0..=pos);
296        }
297    }
298
299    // 2b. Apply offset
300    if plan.offset > 0 {
301        if plan.offset < docs.len() {
302            docs.drain(0..plan.offset);
303        } else {
304            docs.clear();
305        }
306    }
307
308    // 3. Handle Connection vs Flat List
309    if plan.is_connection {
310        return Ok(ExecutionResult::Query(execute_connection(
311            docs,
312            &plan.projection,
313            effective_limit,
314            &plan.fragments,
315            variables,
316            options,
317        )?));
318    }
319
320    // 3a. Apply limit for flat list (after ordering so we get correct top-N)
321    if let Some(l) = effective_limit {
322        docs.truncate(l);
323    }
324
325    // 4. Project (Flat List)
326    if options.apply_projections && !plan.projection.is_empty() {
327        // Check if we have an aggregate or groupBy field
328        let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
329        let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");
330
331        if let Some(f) = group_by_field {
332            // Handle GroupBy
333            let field_name = f
334                .arguments
335                .iter()
336                .find(|a| a.name == "field")
337                .and_then(|a| match &a.value {
338                    ast::Value::String(s) => Some(s),
339                    _ => None,
340                });
341
342            if let Some(group_field) = field_name {
343                let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
344                for d in docs {
345                    let key = d
346                        .data
347                        .get(group_field)
348                        .map(|v| match v {
349                            Value::String(s) => s.clone(),
350                            _ => v.to_string(),
351                        })
352                        .unwrap_or_else(|| "null".to_string());
353                    groups.entry(key).or_default().push(d);
354                }
355
356                let mut group_docs = Vec::with_capacity(groups.len());
357                for (key, group_items) in groups {
358                    let mut data = HashMap::new();
359                    for selection in &f.selection_set {
360                        if let Selection::Field(sub_f) = selection {
361                            let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
362                            match sub_f.name.as_str() {
363                                "key" => {
364                                    data.insert(alias.clone(), Value::String(key.clone()));
365                                }
366                                "count" => {
367                                    data.insert(
368                                        alias.clone(),
369                                        Value::Int(group_items.len() as i64),
370                                    );
371                                }
372                                "nodes" => {
373                                    let sub_fields = collect_fields(
374                                        &sub_f.selection_set,
375                                        &plan.fragments,
376                                        variables,
377                                        None,
378                                    )
379                                    .unwrap_or_default();
380
381                                    let mut projected_nodes = Vec::with_capacity(group_items.len());
382                                    for d in &group_items {
383                                        let node_doc =
384                                            apply_projection(d.clone(), &sub_fields, options)?;
385                                        projected_nodes.push(Value::Object(node_doc.data));
386                                    }
387                                    data.insert(alias.clone(), Value::Array(projected_nodes));
388                                }
389                                "aggregate" => {
390                                    let agg_res =
391                                        compute_aggregates(&group_items, &sub_f.selection_set);
392                                    data.insert(alias.clone(), Value::Object(agg_res));
393                                }
394                                _ => {}
395                            }
396                        }
397                    }
398                    group_docs.push(Document {
399                        _sid: format!("group:{}", key),
400                        data,
401                    });
402                }
403                docs = group_docs;
404            }
405        } else if let Some(agg) = agg_field {
406            // If aggregate is the ONLY field, we return a single summary document
407            // If there are other fields, we currently follow the behavior of adding the agg to each doc
408            // (Test expectations suggest a single doc if only agg is asked)
409            let agg_result = compute_aggregates(&docs, &agg.selection_set);
410            let alias = agg.alias.as_ref().unwrap_or(&agg.name);
411
412            if plan.projection.len() == 1 {
413                let mut data = HashMap::new();
414                data.insert(alias.clone(), Value::Object(agg_result));
415                docs = vec![Document {
416                    _sid: "aggregate".to_string(),
417                    data,
418                }];
419            } else {
420                // Add aggregate to each document
421                let mut projected = Vec::with_capacity(docs.len());
422                for mut d in docs {
423                    d.data
424                        .insert(alias.clone(), Value::Object(agg_result.clone()));
425                    projected.push(apply_projection(d, &plan.projection, options)?);
426                }
427                docs = projected;
428            }
429        } else if !plan.has_lookups {
430            let mut projected = Vec::with_capacity(docs.len());
431            for d in docs {
432                projected.push(apply_projection(d, &plan.projection, options)?);
433            }
434            docs = projected;
435        } else {
436            // Slow path for lookups
437            let mut projected = Vec::with_capacity(docs.len());
438            for d in docs {
439                let (proj_doc, _) = apply_projection_with_lookups(
440                    db,
441                    d,
442                    collection_name,
443                    &plan.projection,
444                    &plan.fragments,
445                    variables,
446                    options,
447                    &plan.lookup_dependencies,
448                )
449                .await?;
450                projected.push(proj_doc);
451            }
452            docs = projected;
453        }
454    }
455
456    Ok(ExecutionResult::Query(QueryResult {
457        collection: collection_name.clone(),
458        documents: docs,
459        total_count: None,
460        deferred_fields: vec![],
461        explain: None,
462    }))
463}
464
465fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
466    let mut results = HashMap::new();
467
468    for selection in selections {
469        if let Selection::Field(f) = selection {
470            let alias = f.alias.as_ref().unwrap_or(&f.name);
471            let value = match f.name.as_str() {
472                "count" => Value::Int(docs.len() as i64),
473                "sum" => {
474                    let field =
475                        f.arguments
476                            .iter()
477                            .find(|a| a.name == "field")
478                            .and_then(|a| match &a.value {
479                                ast::Value::String(s) => Some(s),
480                                _ => None,
481                            });
482
483                    if let Some(field_name) = field {
484                        let sum: f64 = docs
485                            .iter()
486                            .filter_map(|d| d.data.get(field_name))
487                            .filter_map(|v| match v {
488                                Value::Int(i) => Some(*i as f64),
489                                Value::Float(f) => Some(*f),
490                                _ => None,
491                            })
492                            .sum();
493                        Value::Float(sum)
494                    } else {
495                        Value::Null
496                    }
497                }
498                "avg" => {
499                    let field =
500                        f.arguments
501                            .iter()
502                            .find(|a| a.name == "field")
503                            .and_then(|a| match &a.value {
504                                ast::Value::String(s) => Some(s),
505                                _ => None,
506                            });
507
508                    if let Some(field_name) = field
509                        && !docs.is_empty()
510                    {
511                        let values: Vec<f64> = docs
512                            .iter()
513                            .filter_map(|d| d.data.get(field_name))
514                            .filter_map(|v| match v {
515                                Value::Int(i) => Some(*i as f64),
516                                Value::Float(f) => Some(*f),
517                                _ => None,
518                            })
519                            .collect();
520
521                        if values.is_empty() {
522                            Value::Null
523                        } else {
524                            let sum: f64 = values.iter().sum();
525                            Value::Float(sum / values.len() as f64)
526                        }
527                    } else {
528                        Value::Null
529                    }
530                }
531                "min" => {
532                    let field =
533                        f.arguments
534                            .iter()
535                            .find(|a| a.name == "field")
536                            .and_then(|a| match &a.value {
537                                ast::Value::String(s) => Some(s),
538                                _ => None,
539                            });
540
541                    if let Some(field_name) = field
542                        && !docs.is_empty()
543                    {
544                        let min = docs
545                            .iter()
546                            .filter_map(|d| d.data.get(field_name))
547                            .filter_map(|v| match v {
548                                Value::Int(i) => Some(*i as f64),
549                                Value::Float(f) => Some(*f),
550                                _ => None,
551                            })
552                            .fold(f64::INFINITY, f64::min);
553
554                        if min == f64::INFINITY {
555                            Value::Null
556                        } else {
557                            Value::Float(min)
558                        }
559                    } else {
560                        Value::Null
561                    }
562                }
563                "max" => {
564                    let field =
565                        f.arguments
566                            .iter()
567                            .find(|a| a.name == "field")
568                            .and_then(|a| match &a.value {
569                                ast::Value::String(s) => Some(s),
570                                _ => None,
571                            });
572
573                    if let Some(field_name) = field
574                        && !docs.is_empty()
575                    {
576                        let max = docs
577                            .iter()
578                            .filter_map(|d| d.data.get(field_name))
579                            .filter_map(|v| match v {
580                                Value::Int(i) => Some(*i as f64),
581                                Value::Float(f) => Some(*f),
582                                _ => None,
583                            })
584                            .fold(f64::NEG_INFINITY, f64::max);
585
586                        if max == f64::NEG_INFINITY {
587                            Value::Null
588                        } else {
589                            Value::Float(max)
590                        }
591                    } else {
592                        Value::Null
593                    }
594                }
595                _ => Value::Null,
596            };
597            results.insert(alias.clone(), value);
598        }
599    }
600
601    results
602}
603
604fn find_indexed_equality_filter_runtime(
605    filter: &ast::Filter,
606    db: &Aurora,
607    collection: &str,
608    variables: &HashMap<String, ast::Value>,
609) -> Option<(String, ast::Value)> {
610    match filter {
611        ast::Filter::Eq(field, val) => {
612            if field == "_sid" || db.has_index(collection, field) {
613                let resolved = resolve_if_variable(val, variables);
614                return Some((field.clone(), resolved.clone()));
615            }
616        }
617        ast::Filter::And(filters) => {
618            for f in filters {
619                if let Some(res) =
620                    find_indexed_equality_filter_runtime(f, db, collection, variables)
621                {
622                    return Some(res);
623                }
624            }
625        }
626        _ => {}
627    }
628    None
629}
630
631/// Helper to flatten a selection set (processing fragments) into a list of fields
632fn collect_fields(
633    selection_set: &[Selection],
634    fragments: &HashMap<String, FragmentDef>,
635    variable_values: &HashMap<String, ast::Value>,
636    parent_type: Option<&str>,
637) -> Result<Vec<Field>> {
638    let mut fields = Vec::new();
639
640    for selection in selection_set {
641        match selection {
642            Selection::Field(field) => {
643                if should_include(&field.directives, variable_values)? {
644                    fields.push(field.clone());
645                }
646            }
647            Selection::FragmentSpread(name) => {
648                if let Some(fragment) = fragments.get(name) {
649                    let type_match = if let Some(parent) = parent_type {
650                        parent == fragment.type_condition
651                    } else {
652                        true
653                    };
654
655                    if type_match {
656                        let fragment_fields = collect_fields(
657                            &fragment.selection_set,
658                            fragments,
659                            variable_values,
660                            parent_type,
661                        )?;
662                        fields.extend(fragment_fields);
663                    }
664                }
665            }
666            Selection::InlineFragment(inline) => {
667                let type_match = if let Some(parent) = parent_type {
668                    parent == inline.type_condition
669                } else {
670                    true
671                };
672
673                if type_match {
674                    let inline_fields = collect_fields(
675                        &inline.selection_set,
676                        fragments,
677                        variable_values,
678                        parent_type,
679                    )?;
680                    fields.extend(inline_fields);
681                }
682            }
683            Selection::ComputedField(cf) => {
684                let (expr_val, _complex_expr) = match &cf.expression {
685                    ast::ComputedExpression::TemplateString(s) => (s.clone(), None),
686                    _ => ("complex".to_string(), Some(ast::Expression::Literal(ast::Value::Null))), // Handled below
687                };
688
689                let mut field = Field {
690                    alias: Some(cf.alias.clone()),
691                    name: "__compute__".to_string(),
692                    arguments: vec![ast::Argument {
693                        name: "expr".to_string(),
694                        value: ast::Value::String(expr_val),
695                    }],
696                    directives: Vec::new(),
697                    selection_set: Vec::new(),
698                    computed_expression: None,
699                };
700
701                if let ast::ComputedExpression::TemplateString(s) = &cf.expression {
702                    field.arguments[0].value = ast::Value::String(s.clone());
703                }
704                match &cf.expression {
705                    ast::ComputedExpression::TemplateString(_) => {}
706                    ast::ComputedExpression::StandardExpression(e) => {
707                        field.computed_expression = Some(e.clone());
708                    }
709                    ast::ComputedExpression::FunctionCall { name, args } => {
710                        field.computed_expression = Some(ast::Expression::FunctionCall {
711                            name: name.clone(),
712                            args: args.clone(),
713                        });
714                    }
715                    ast::ComputedExpression::PipeExpression { .. } => {
716                        // TODO: Implement Pipe to Expression conversion if needed
717                    }
718                    ast::ComputedExpression::SqlExpression(s) => {
719                        field.computed_expression = Some(ast::Expression::Literal(ast::Value::String(format!("SQL: {}", s))));
720                    }
721                    ast::ComputedExpression::AggregateFunction { .. } => {
722                        // Handled by aggregate executor
723                    }
724                }
725
726                fields.push(field);
727            }
728        }
729    }
730    Ok(fields)
731}
732
733/// Check if a field/fragment should be included based on @skip/@include
734fn should_include(
735    directives: &[ast::Directive],
736    variables: &HashMap<String, ast::Value>,
737) -> Result<bool> {
738    for dir in directives {
739        if dir.name == "skip" {
740            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
741                let should_skip = resolve_boolean_arg(&arg.value, variables)?;
742                if should_skip {
743                    return Ok(false);
744                }
745            }
746        } else if dir.name == "include" {
747            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
748                let should_include = resolve_boolean_arg(&arg.value, variables)?;
749                if !should_include {
750                    return Ok(false);
751                }
752            }
753        }
754    }
755    Ok(true)
756}
757
758/// Check if the current user role has permission to access a field or collection.
759/// Supports @allow(read: "ROLE", write: "ROLE"), @auth, and @require(role: "ROLE").
760fn check_permissions(
761    directives: &[ast::Directive],
762    options: &ExecutionOptions,
763    is_write: bool,
764) -> Result<()> {
765    for directive in directives {
766        match directive.name.as_str() {
767            "auth" => {
768                if options.user_role.is_none() {
769                    return Err(AqlError::new(
770                        ErrorCode::Unauthorized,
771                        "Authentication required for this operation".to_string(),
772                    ));
773                }
774            }
775            "require" | "allow" => {
776                let required_role = directive
777                    .arguments
778                    .iter()
779                    .find(|a| {
780                        if is_write {
781                            a.name == "write" || a.name == "role"
782                        } else {
783                            a.name == "read" || a.name == "role"
784                        }
785                    })
786                    .and_then(|a| {
787                        if let ast::Value::String(s) = &a.value {
788                            Some(s.as_str())
789                        } else {
790                            None
791                        }
792                    });
793
794                if let Some(role) = required_role {
795                    if options.user_role.as_deref() != Some(role) {
796                        return Err(AqlError::new(
797                            ErrorCode::Forbidden,
798                            format!("Role '{}' required for this operation", role),
799                        ));
800                    }
801                }
802            }
803            _ => {}
804        }
805    }
806    Ok(())
807}
808
809fn resolve_boolean_arg(
810    value: &ast::Value,
811    variables: &HashMap<String, ast::Value>,
812) -> Result<bool> {
813    match value {
814        ast::Value::Boolean(b) => Ok(*b),
815        ast::Value::Variable(name) => {
816            if let Some(val) = variables.get(name) {
817                match val {
818                    ast::Value::Boolean(b) => Ok(*b),
819                    _ => Err(AqlError::new(
820                        ErrorCode::TypeError,
821                        format!("Variable '{}' is not a boolean, got {:?}", name, val),
822                    )),
823                }
824            } else {
825                Err(AqlError::new(
826                    ErrorCode::UndefinedVariable,
827                    format!("Variable '{}' is not defined", name),
828                ))
829            }
830        }
831        _ => Err(AqlError::new(
832            ErrorCode::TypeError,
833            format!("Expected boolean value, got {:?}", value),
834        )),
835    }
836}
837
838/// Validate that all required variables are provided
839fn validate_required_variables(
840    variable_definitions: &[ast::VariableDefinition],
841    provided_variables: &HashMap<String, ast::Value>,
842) -> Result<()> {
843    for var_def in variable_definitions {
844        if var_def.var_type.is_required {
845            if !provided_variables.contains_key(&var_def.name) {
846                if var_def.default_value.is_none() {
847                    return Err(AqlError::new(
848                        ErrorCode::UndefinedVariable,
849                        format!(
850                            "Required variable '{}' (type: {}{}) is not provided",
851                            var_def.name,
852                            var_def.var_type.name,
853                            if var_def.var_type.is_required {
854                                "!"
855                            } else {
856                                ""
857                            }
858                        ),
859                    ));
860                }
861            }
862        }
863    }
864    Ok(())
865}
866
867/// The result of executing an AQL operation.
868///
869/// This enum encapsulates the different types of results that can be returned
870/// by the Aurora engine, such as query results, mutation effects, or schema changes.
871#[derive(Debug)]
872pub enum ExecutionResult {
873    /// Result of a `query` operation.
874    Query(QueryResult),
875    /// Result of a `mutation` operation (insert, update, delete, upsert).
876    Mutation(MutationResult),
877    /// Result of a `subscription` operation.
878    Subscription(SubscriptionResult),
879    /// A batch of multiple results.
880    Batch(Vec<ExecutionResult>),
881    /// Result of a schema definition operation.
882    Schema(SchemaResult),
883    /// Result of a database migration.
884    Migration(MigrationResult),
885}
886
887impl ExecutionResult {
888    /// Extracts the `QueryResult` if this is a `Query`, returning `None` otherwise.
889    pub fn as_query(&self) -> Option<&QueryResult> {
890        if let Self::Query(q) = self {
891            Some(q)
892        } else {
893            None
894        }
895    }
896
897    /// Consumes the result and returns the `QueryResult` if this is a `Query`.
898    pub fn into_query(self) -> Option<QueryResult> {
899        if let Self::Query(q) = self {
900            Some(q)
901        } else {
902            None
903        }
904    }
905
906    /// Extracts the `MutationResult` if this is a `Mutation`, returning `None` otherwise.
907    pub fn as_mutation(&self) -> Option<&MutationResult> {
908        if let Self::Mutation(m) = self {
909            Some(m)
910        } else {
911            None
912        }
913    }
914
915    /// Consumes the result and returns the `MutationResult` if this is a `Mutation`.
916    pub fn into_mutation(self) -> Option<MutationResult> {
917        if let Self::Mutation(m) = self {
918            Some(m)
919        } else {
920            None
921        }
922    }
923
924    /// Extracts the `SubscriptionResult` if this is a `Subscription`, returning `None` otherwise.
925    pub fn as_subscription(&self) -> Option<&SubscriptionResult> {
926        if let Self::Subscription(s) = self {
927            Some(s)
928        } else {
929            None
930        }
931    }
932
933    /// Consumes the result and returns the `SubscriptionResult` if this is a `Subscription`.
934    pub fn into_subscription(self) -> Option<SubscriptionResult> {
935        if let Self::Subscription(s) = self {
936            Some(s)
937        } else {
938            None
939        }
940    }
941
942    /// Maps the result documents into a list of user-defined structs.
943    ///
944    /// This works for `Query`, `Mutation`, and `Batch` results. The target type `T`
945    /// must implement `serde::Deserialize`.
946    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
947        match self {
948            Self::Query(q) => q.bind(),
949            Self::Mutation(m) => m.bind(),
950            Self::Batch(results) => {
951                let mut all = Vec::new();
952                for r in results {
953                    all.extend(r.bind()?);
954                }
955                Ok(all)
956            }
957            _ => Err(AqlError::new(
958                ErrorCode::QueryError,
959                "Cannot bind results from schema or migration operations".to_string(),
960            )),
961        }
962    }
963
964    /// Maps the first result document into a user-defined struct.
965    ///
966    /// This is an optimized version of `bind` that only processes the first document.
967    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
968        match self {
969            Self::Query(q) => q.bind_first(),
970            Self::Mutation(m) => m.bind_first(),
971            Self::Batch(mut results) => {
972                if results.is_empty() {
973                    return Err(AqlError::new(
974                        ErrorCode::NotFound,
975                        "No documents found to bind".to_string(),
976                    ));
977                }
978                results.remove(0).bind_first()
979            }
980            _ => Err(AqlError::new(
981                ErrorCode::QueryError,
982                "Cannot bind results from schema or migration operations".to_string(),
983            )),
984        }
985    }
986}
987
/// Outcome of a schema definition operation (the payload of `ExecutionResult::Schema`).
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Label of the schema operation that ran. The concrete values are chosen by the
    /// schema executor, which is not visible in this part of the file.
    pub operation: String,
    /// Collection the operation refers to (presumably its name — confirm against the
    /// code that builds this struct).
    pub collection: String,
    /// Status text reported for the operation; the set of possible values is defined
    /// by the producer.
    pub status: String,
}
994
/// Outcome of a database migration (the payload of `ExecutionResult::Migration`).
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Version string associated with the migration (presumably the version migrated
    /// to — confirm against the migration executor).
    pub version: String,
    /// Number of migration steps that were applied.
    pub steps_applied: usize,
    /// Status text reported for the migration; the set of possible values is defined
    /// by the producer.
    pub status: String,
}
1001
/// A serializable description of how an operation would be executed.
///
/// NOTE(review): nothing in this part of the file constructs or consumes this type;
/// the field descriptions below are inferred from names and types only.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionPlan {
    /// Textual descriptions of the planned operations (presumably in execution order).
    pub operations: Vec<String>,
    /// Estimated cost of the plan; the unit is not defined here.
    pub estimated_cost: f64,
}
1007
1008/// The result of a `query` operation.
1009#[derive(Debug, Clone)]
1010pub struct QueryResult {
1011    /// The name of the collection that was queried.
1012    pub collection: String,
1013    /// The documents that matched the query.
1014    pub documents: Vec<Document>,
1015    /// The total number of documents matching the query (if calculated).
1016    pub total_count: Option<usize>,
1017    /// Fields marked `@defer` — omitted from documents, listed here.
1018    pub deferred_fields: Vec<String>,
1019    /// Explain metadata when `@explain` is present on the query.
1020    pub explain: Option<ExplainResult>,
1021}
1022
1023impl QueryResult {
1024    /// Maps the result documents into a list of user-defined structs.
1025    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
1026        self.documents
1027            .into_iter()
1028            .map(|d| d.bind())
1029            .collect::<Result<Vec<T>>>()
1030    }
1031
1032    /// Maps the first result document into a user-defined struct.
1033    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
1034        self.documents
1035            .into_iter()
1036            .next()
1037            .ok_or_else(|| {
1038                AqlError::new(
1039                    ErrorCode::NotFound,
1040                    "No documents found to bind".to_string(),
1041                )
1042            })?
1043            .bind()
1044    }
1045}
1046
/// Execution plan metadata returned by `@explain`.
///
/// `execute_query` attaches one of these to each root-field result when the query
/// carries an `@explain` directive.
#[derive(Debug, Clone, Default)]
pub struct ExplainResult {
    /// The name of the collection that was scanned.
    pub collection: String,
    /// The number of documents scanned during execution.
    ///
    /// NOTE(review): `execute_query` fills this with the number of documents in the
    /// final result, not with a count taken during the scan.
    pub docs_scanned: usize,
    /// Whether a secondary index was used for the query.
    ///
    /// NOTE(review): `execute_query` derives this heuristically — a `where` argument is
    /// present and the result is non-empty — rather than from the actual access path.
    pub index_used: bool,
    /// The time taken to execute the query in milliseconds.
    pub elapsed_ms: u128,
}
1059
1060/// The result of a `mutation` operation.
1061#[derive(Debug, Clone)]
1062pub struct MutationResult {
1063    /// The type of operation performed (insert, update, delete, upsert).
1064    pub operation: String,
1065    /// The name of the collection affected by the mutation.
1066    pub collection: String,
1067    /// The number of documents affected by the operation.
1068    pub affected_count: usize,
1069    /// The documents returned by the mutation (if any).
1070    pub returned_documents: Vec<Document>,
1071}
1072
1073impl MutationResult {
1074    /// Maps the documents returned by the mutation to a list of user-defined structs.
1075    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
1076        self.returned_documents
1077            .into_iter()
1078            .map(|d| d.bind())
1079            .collect::<Result<Vec<T>>>()
1080    }
1081
1082    /// Maps the first document returned by the mutation to a user-defined struct.
1083    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
1084        self.returned_documents
1085            .into_iter()
1086            .next()
1087            .ok_or_else(|| {
1088                AqlError::new(
1089                    ErrorCode::NotFound,
1090                    "No documents found to bind".to_string(),
1091                )
1092            })?
1093            .bind()
1094    }
1095}
1096
/// The result of a `subscription` operation.
///
/// Unlike the other result structs in this file, this type derives only `Debug`
/// (not `Clone`).
#[derive(Debug)]
pub struct SubscriptionResult {
    /// Unique identifier for the subscription.
    pub subscription_id: String,
    /// The name of the collection being subscribed to.
    pub collection: String,
    /// The stream of change events.
    ///
    /// NOTE(review): the conditions under which this is `None` are decided by the
    /// subscription executor, which is not visible here.
    pub stream: Option<crate::pubsub::ChangeListener>,
}
1107
1108/// Execution options
1109#[derive(Debug, Clone)]
1110pub struct ExecutionOptions {
1111    pub skip_validation: bool,
1112    pub apply_projections: bool,
1113    pub debug_audit: bool,
1114    pub variables: HashMap<String, ast::Value>,
1115    /// Current user role for RBAC checks (@allow, @auth, @require)
1116    pub user_role: Option<String>,
1117}
1118
1119impl Default for ExecutionOptions {
1120    fn default() -> Self {
1121        Self {
1122            skip_validation: false,
1123            apply_projections: true,
1124            debug_audit: false,
1125            variables: HashMap::new(),
1126            user_role: None,
1127        }
1128    }
1129}
1130
1131impl ExecutionOptions {
1132    pub fn new() -> Self {
1133        Self::default()
1134    }
1135
1136    pub fn with_variables(mut self, vars: HashMap<String, ast::Value>) -> Self {
1137        self.variables = vars;
1138        self
1139    }
1140
1141    pub fn with_role(mut self, role: impl Into<String>) -> Self {
1142        self.user_role = Some(role.into());
1143        self
1144    }
1145
1146    pub fn skip_validation(mut self) -> Self {
1147        self.skip_validation = true;
1148        self
1149    }
1150}
1151
1152pub(crate) fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
1153    match v {
1154        serde_json::Value::Null => ast::Value::Null,
1155        serde_json::Value::Bool(b) => ast::Value::Boolean(b),
1156        serde_json::Value::Number(n) => {
1157            if let Some(i) = n.as_i64() {
1158                ast::Value::Int(i)
1159            } else if let Some(f) = n.as_f64() {
1160                ast::Value::Float(f)
1161            } else {
1162                ast::Value::Null
1163            }
1164        }
1165        serde_json::Value::String(s) => ast::Value::String(s),
1166        serde_json::Value::Array(arr) => {
1167            ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
1168        }
1169        serde_json::Value::Object(map) => ast::Value::Object(
1170            map.into_iter()
1171                .map(|(k, v)| (k, json_to_aql_value(v)))
1172                .collect(),
1173        ),
1174    }
1175}
1176
1177/// Execute a parsed AQL document
1178pub async fn execute_document(
1179    db: &Aurora,
1180    doc: &ast::Document,
1181    options: &ExecutionOptions,
1182) -> Result<ExecutionResult> {
1183    if doc.operations.is_empty() {
1184        return Err(AqlError::new(
1185            ErrorCode::QueryError,
1186            "No operations in document".to_string(),
1187        ));
1188    }
1189
1190    println!("execute_document: first op: {:?}", doc.operations[0]);
1191
1192    let vars: HashMap<String, ast::Value> = options.variables.clone();
1193
1194    let fragments: HashMap<String, FragmentDef> = doc
1195        .operations
1196        .iter()
1197        .filter_map(|op| {
1198            if let Operation::FragmentDefinition(frag) = op {
1199                Some((frag.name.clone(), frag.clone()))
1200            } else {
1201                None
1202            }
1203        })
1204        .collect();
1205
1206    let executable_ops: Vec<&Operation> = doc
1207        .operations
1208        .iter()
1209        .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
1210        .collect();
1211
1212    if executable_ops.is_empty() {
1213        return Err(AqlError::new(
1214            ErrorCode::QueryError,
1215            "No executable operations in document".to_string(),
1216        ));
1217    }
1218
1219    if executable_ops.len() == 1 {
1220        execute_operation(db, executable_ops[0], &vars, options, &fragments).await
1221    } else {
1222        let mut results = Vec::new();
1223        for op in executable_ops {
1224            results.push(execute_operation(db, op, &vars, options, &fragments).await?);
1225        }
1226        Ok(ExecutionResult::Batch(results))
1227    }
1228}
1229
1230async fn execute_operation(
1231    db: &Aurora,
1232    op: &Operation,
1233    vars: &HashMap<String, ast::Value>,
1234    options: &ExecutionOptions,
1235    fragments: &HashMap<String, FragmentDef>,
1236) -> Result<ExecutionResult> {
1237    match op {
1238        Operation::Query(query) => execute_query(db, query, vars, options, fragments).await,
1239        Operation::Mutation(mutation) => {
1240            execute_mutation(db, mutation, vars, options, fragments).await
1241        }
1242        Operation::Subscription(sub) => execute_subscription(db, sub, vars, options).await,
1243        Operation::Schema(schema) => execute_schema(db, schema, options).await,
1244        Operation::Migration(migration) => execute_migration(db, migration, options).await,
1245        Operation::Introspection(intro) => execute_introspection(db, intro).await,
1246        Operation::Handler(handler) => execute_handler_registration(db, handler, options).await,
1247        _ => Ok(ExecutionResult::Query(QueryResult {
1248            collection: String::new(),
1249            documents: vec![],
1250            total_count: None,
1251            deferred_fields: vec![],
1252            explain: None,
1253        })),
1254    }
1255}
1256
1257async fn execute_query(
1258    db: &Aurora,
1259    query: &ast::Query,
1260    vars: &HashMap<String, ast::Value>,
1261    options: &ExecutionOptions,
1262    fragments: &HashMap<String, FragmentDef>,
1263) -> Result<ExecutionResult> {
1264    validate_required_variables(&query.variable_definitions, vars)?;
1265    let has_explain = query.directives.iter().any(|d| d.name == "explain");
1266    let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;
1267    let mut results = Vec::new();
1268    for field in &root_fields {
1269        let sub_fields = collect_fields(&field.selection_set, fragments, vars, Some(&field.name))?;
1270        let start = std::time::Instant::now();
1271        let mut result =
1272            execute_collection_query(db, field, &sub_fields, vars, options, fragments).await?;
1273        if has_explain {
1274            let elapsed_ms = start.elapsed().as_millis();
1275            let index_used =
1276                field.arguments.iter().any(|a| a.name == "where") && !result.documents.is_empty();
1277            result.explain = Some(ExplainResult {
1278                collection: result.collection.clone(),
1279                docs_scanned: result.documents.len(),
1280                index_used,
1281                elapsed_ms,
1282            });
1283        }
1284        results.push(result);
1285    }
1286    if results.len() == 1 {
1287        Ok(ExecutionResult::Query(results.remove(0)))
1288    } else {
1289        Ok(ExecutionResult::Batch(
1290            results.into_iter().map(ExecutionResult::Query).collect(),
1291        ))
1292    }
1293}
1294
1295async fn execute_collection_query(
1296    db: &Aurora,
1297    field: &ast::Field,
1298    sub_fields: &[ast::Field],
1299    variables: &HashMap<String, ast::Value>,
1300    options: &ExecutionOptions,
1301    fragments: &HashMap<String, FragmentDef>,
1302) -> Result<QueryResult> {
1303    let collection_name = &field.name;
1304
1305    // RBAC: Check field level permissions
1306    check_permissions(&field.directives, options, false)?;
1307
1308    // ── Search args ────────────────────────────────────────────────────────
1309    // `search: { query: "...", fields: [...], fuzzy: true }` overrides normal scan.
1310    if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
1311        return execute_search_query(
1312            db,
1313            collection_name,
1314            search_arg,
1315            sub_fields,
1316            field,
1317            variables,
1318            options,
1319        )
1320        .await;
1321    }
1322
1323    let filter = extract_filter_from_args(&field.arguments)?;
1324    let (limit, offset) = extract_pagination(&field.arguments);
1325    let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
1326    let compiled_filter = if let Some(ref f) = filter {
1327        Some(compile_filter(f)?)
1328    } else {
1329        None
1330    };
1331    let vars_arc = Arc::new(variables.clone());
1332    let filter_fn = move |doc: &Document| {
1333        compiled_filter
1334            .as_ref()
1335            .map(|f| matches_filter(doc, f, &vars_arc))
1336            .unwrap_or(true)
1337    };
1338
1339    let indexed_docs = if let Some(ref f) = filter {
1340        match find_indexed_equality_filter(f, db, collection_name) {
1341            Some((field_name, val))
1342                if field_name == "_sid" || !db.is_index_building(collection_name, &field_name) =>
1343            {
1344                let db_val = aql_value_to_db_value(&val, variables)?;
1345                let ids = if field_name == "_sid" {
1346                    match &db_val {
1347                        Value::String(s) => vec![s.clone()],
1348                        Value::Uuid(u) => vec![u.to_string()],
1349                        _ => vec![],
1350                    }
1351                } else {
1352                    db.get_ids_from_index(collection_name, &field_name, &db_val)
1353                };
1354
1355                let mut docs = Vec::with_capacity(ids.len());
1356                for id in ids {
1357                    if let Some(doc) = db.get_document(collection_name, &id)? {
1358                        if filter_fn(&doc) {
1359                            docs.push(doc);
1360                        }
1361                    }
1362                }
1363                Some(docs)
1364            }
1365            // Index is building or no indexed filter found — full scan.
1366            _ => None,
1367        }
1368    } else {
1369        None
1370    };
1371
1372    let is_connection = sub_fields
1373        .iter()
1374        .any(|f| f.name == "edges" || f.name == "pageInfo");
1375
1376    let orderings = extract_order_by(&field.arguments);
1377
1378    let mut docs = if let Some(docs) = indexed_docs {
1379        docs
1380    } else {
1381        //only valid when there is no cursor (after) and no
1382        // ordering.  With ordering we must see ALL docs before we can pick the top-N;
1383        // with a cursor we don't know how far into the result set the cursor lies.
1384        let scan_limit = if after.is_some() || !orderings.is_empty() {
1385            None
1386        } else {
1387            limit.or(first).map(|l| {
1388                let base = if is_connection { l + 1 } else { l };
1389                base + offset
1390            })
1391        };
1392        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
1393    };
1394
1395    // Validation args
1396    // `validate: { fieldName: { format: "email" } }` filters out non-conforming docs
1397    if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
1398        docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
1399    }
1400
1401    // Sort before any pagination so offset/cursor operate on ordered results.
1402    if !orderings.is_empty() {
1403        apply_ordering(&mut docs, &orderings);
1404    }
1405
1406    // Cursor pagination: skip past the `after` cursor before slicing
1407    if let Some(ref cursor) = after {
1408        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
1409            docs.drain(0..=pos);
1410        }
1411    }
1412
1413    if is_connection {
1414        return Ok(execute_connection(
1415            docs,
1416            sub_fields,
1417            limit.or(first),
1418            fragments,
1419            variables,
1420            options,
1421        )?);
1422    }
1423
1424    // Offset pagination: skip the first `offset` documents
1425    if offset > 0 {
1426        if offset < docs.len() {
1427            docs.drain(0..offset);
1428        } else {
1429            docs.clear();
1430        }
1431    }
1432
1433    // Apply limit after offset
1434    if let Some(l) = limit.or(first) {
1435        docs.truncate(l);
1436    }
1437
1438    // Projections & Lookups
1439    let lookup_dependencies =
1440        identify_lookup_dependencies(db, collection_name, sub_fields, variables);
1441    let has_lookups = !lookup_dependencies.is_empty();
1442
1443    let mut deferred_fields = Vec::new();
1444
1445    if options.apply_projections && !sub_fields.is_empty() {
1446        if has_lookups {
1447            let mut projected = Vec::with_capacity(docs.len());
1448            for d in docs {
1449                let (proj_doc, deferred) = apply_projection_with_lookups(
1450                    db,
1451                    d,
1452                    collection_name,
1453                    sub_fields,
1454                    fragments,
1455                    variables,
1456                    options,
1457                    &lookup_dependencies,
1458                )
1459                .await?;
1460                projected.push(proj_doc);
1461                if deferred_fields.is_empty() {
1462                    deferred_fields = deferred;
1463                }
1464            }
1465            docs = projected;
1466        } else {
1467            let mut all_deferred = Vec::new();
1468            let mut projected = Vec::with_capacity(docs.len());
1469            for d in docs {
1470                let (proj, deferred) =
1471                    apply_projection_and_defer(d, sub_fields, variables, options)?;
1472                if all_deferred.is_empty() && !deferred.is_empty() {
1473                    all_deferred = deferred;
1474                }
1475                projected.push(proj);
1476            }
1477            docs = projected;
1478            deferred_fields = all_deferred;
1479        }
1480    }
1481
1482    // ── Window functions ───────────────────────────────────────────────────
1483    for sf in sub_fields {
1484        if sf.name == "windowFunc" {
1485            let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
1486            let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
1487            let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
1488            let wsize = arg_i64(&sf.arguments, "windowSize").unwrap_or(3) as usize;
1489            apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
1490        }
1491    }
1492
1493    // ── Downsample ─────────────────────────────────────────────────────────
1494    if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
1495        let interval =
1496            arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
1497        let aggregation =
1498            arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
1499        let ds_sub: Vec<String> =
1500            collect_fields(&ds_field.selection_set, fragments, variables, None)
1501                .unwrap_or_default()
1502                .iter()
1503                .map(|f| f.name.clone())
1504                .collect();
1505        docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
1506    }
1507
1508    Ok(QueryResult {
1509        collection: collection_name.clone(),
1510        documents: docs,
1511        total_count: None,
1512        deferred_fields,
1513        explain: None,
1514    })
1515}
1516
1517/// Execute a search query using SearchBuilder
1518async fn execute_search_query(
1519    db: &Aurora,
1520    collection: &str,
1521    search_arg: &ast::Argument,
1522    sub_fields: &[ast::Field],
1523    field: &ast::Field,
1524    variables: &HashMap<String, ast::Value>,
1525    options: &ExecutionOptions,
1526) -> Result<QueryResult> {
1527    // Deeply resolve variable references inside the search argument (e.g. query: $term)
1528    let resolved_search_val = resolve_ast_deep(&search_arg.value, variables);
1529    let (query_str, search_fields, fuzzy) = extract_search_params(&resolved_search_val);
1530    let (limit, _) = extract_pagination(&field.arguments);
1531
1532    let mut builder = db.search(collection).query(&query_str);
1533    if fuzzy {
1534        builder = builder.fuzzy(1);
1535    }
1536    if let Some(l) = limit {
1537        builder = builder.limit(l);
1538    }
1539
1540    let mut docs = builder
1541        .collect_with_fields(if search_fields.is_empty() {
1542            None
1543        } else {
1544            Some(&search_fields)
1545        })
1546        .await?;
1547
1548    if options.apply_projections && !sub_fields.is_empty() {
1549        let mut projected = Vec::with_capacity(docs.len());
1550        for d in docs {
1551            let (proj, _) = apply_projection_and_defer(d, sub_fields, variables, options)?;
1552            projected.push(proj);
1553        }
1554        docs = projected;
1555    }
1556
1557    Ok(QueryResult {
1558        collection: collection.to_string(),
1559        documents: docs,
1560        total_count: None,
1561        deferred_fields: vec![],
1562        explain: None,
1563    })
1564}
1565
1566fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
1567    let mut query = String::new();
1568    let mut fields = Vec::new();
1569    let mut fuzzy = false;
1570    if let ast::Value::Object(m) = v {
1571        if let Some(ast::Value::String(q)) = m.get("query") {
1572            query = q.clone();
1573        }
1574        if let Some(ast::Value::Array(arr)) = m.get("fields") {
1575            for item in arr {
1576                if let ast::Value::String(s) = item {
1577                    fields.push(s.clone());
1578                }
1579            }
1580        }
1581        if let Some(ast::Value::Boolean(b)) = m.get("fuzzy") {
1582            fuzzy = *b;
1583        }
1584    }
1585    (query, fields, fuzzy)
1586}
1587
1588fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
1589    if let ast::Value::Object(rules) = &validate_arg.value {
1590        for (field_name, constraints_val) in rules {
1591            if let ast::Value::Object(constraints) = constraints_val {
1592                if let Some(field_val) = doc.data.get(field_name) {
1593                    for (constraint_name, constraint_val) in constraints {
1594                        if !check_inline_constraint(field_val, constraint_name, constraint_val) {
1595                            return false;
1596                        }
1597                    }
1598                }
1599            }
1600        }
1601    }
1602    true
1603}
1604
1605fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
1606    match constraint {
1607        "format" => {
1608            if let (Value::String(s), ast::Value::String(fmt)) = (value, constraint_val) {
1609                return match fmt.as_str() {
1610                    "email" => {
1611                        s.contains('@')
1612                            && s.split('@')
1613                                .nth(1)
1614                                .map(|d| d.contains('.'))
1615                                .unwrap_or(false)
1616                    }
1617                    "url" => s.starts_with("http://") || s.starts_with("https://"),
1618                    "uuid" => uuid::Uuid::parse_str(s).is_ok(),
1619                    _ => true,
1620                };
1621            }
1622            true
1623        }
1624        "min" => {
1625            let n = match value {
1626                Value::Int(i) => *i as f64,
1627                Value::Float(f) => *f,
1628                _ => return true,
1629            };
1630            let min = match constraint_val {
1631                ast::Value::Float(f) => *f,
1632                ast::Value::Int(i) => *i as f64,
1633                _ => return true,
1634            };
1635            n >= min
1636        }
1637        "max" => {
1638            let n = match value {
1639                Value::Int(i) => *i as f64,
1640                Value::Float(f) => *f,
1641                _ => return true,
1642            };
1643            let max = match constraint_val {
1644                ast::Value::Float(f) => *f,
1645                ast::Value::Int(i) => *i as f64,
1646                _ => return true,
1647            };
1648            n <= max
1649        }
1650        "minLength" => {
1651            if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1652                return s.len() >= *n as usize;
1653            }
1654            true
1655        }
1656        "maxLength" => {
1657            if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1658                return s.len() <= *n as usize;
1659            }
1660            true
1661        }
1662        "pattern" => {
1663            if let (Value::String(s), ast::Value::String(pat)) = (value, constraint_val) {
1664                if let Ok(re) = regex::Regex::new(pat) {
1665                    return re.is_match(s);
1666                }
1667            }
1668            true
1669        }
1670        _ => true,
1671    }
1672}
1673
1674fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
1675    args.iter().find(|a| a.name == name).and_then(|a| {
1676        if let ast::Value::String(s) = &a.value {
1677            Some(s.clone())
1678        } else {
1679            None
1680        }
1681    })
1682}
1683
1684fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
1685    args.iter().find(|a| a.name == name).and_then(|a| {
1686        if let ast::Value::Int(i) = &a.value {
1687            Some(*i)
1688        } else {
1689            None
1690        }
1691    })
1692}
1693
1694fn identify_lookup_dependencies(
1695    db: &Aurora,
1696    collection: &str,
1697    fields: &[ast::Field],
1698    variables: &HashMap<String, ast::Value>,
1699) -> Vec<String> {
1700    let mut deps = Vec::new();
1701    for f in fields {
1702        let mut found = false;
1703        if let (Some(lf), Some(_c), Some(_ff)) = (
1704            f.arguments.iter().find(|a| a.name == "localField"),
1705            f.arguments.iter().find(|a| a.name == "collection"),
1706            f.arguments.iter().find(|a| a.name == "foreignField"),
1707        ) {
1708            if let ast::Value::String(s) = resolve_if_variable(&lf.value, variables) {
1709                deps.push(s.clone());
1710            }
1711            found = true;
1712        }
1713        if !found {
1714            if let Ok(col_def) = db.get_collection_definition(collection) {
1715                if let Some(f_def) = col_def.fields.get(&f.name) {
1716                    if let Some(_rel) = &f_def.relation {
1717                        deps.push(f.name.clone());
1718                    }
1719                }
1720            }
1721        }
1722    }
1723    deps
1724}
1725
1726/// Apply projection and collect @defer'd field names separately
1727fn apply_projection_and_defer(
1728    mut doc: Document,
1729    fields: &[ast::Field],
1730    vars: &HashMap<String, ast::Value>,
1731    options: &ExecutionOptions,
1732) -> Result<(Document, Vec<String>)> {
1733    if fields.is_empty() {
1734        return Ok((doc, vec![]));
1735    }
1736    let mut proj = HashMap::new();
1737    let mut deferred = Vec::new();
1738
1739    for f in fields {
1740        // RBAC: Check field level permissions
1741        check_permissions(&f.directives, options, false)?;
1742
1743        // @defer directive: omit from response, record field name
1744        if f.directives.iter().any(|d| d.name == "defer") {
1745            deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
1746            continue;
1747        }
1748        // Computed field: name starts with __compute__ sentinel
1749        if f.name == "__compute__" {
1750            let alias = f.alias.as_deref().unwrap_or("computed");
1751            if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
1752                // If it's a raw string, treat as template for backward compatibility
1753                if let ast::Value::String(template) = &expr_arg.value {
1754                    let result = eval_template(template, &doc.data);
1755                    proj.insert(alias.to_string(), Value::String(result));
1756                } else if let Some(expr) = f.computed_expression.as_ref() {
1757                    // Use new expression evaluator
1758                    let result = eval_expression(expr, &doc.data, vars);
1759                    proj.insert(alias.to_string(), result);
1760                }
1761            }
1762            continue;
1763        }
1764        if f.name == "id" {
1765            // User requested 'id'. Pull from data strictly.
1766            if let Some(v) = doc.data.get("id") {
1767                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1768            } else {
1769                // Not in data, return null
1770                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Null);
1771            }
1772        } else if f.name == "_sid" {
1773            proj.insert(
1774                f.alias.as_ref().unwrap_or(&f.name).clone(),
1775                Value::String(doc._sid.clone()),
1776            );
1777        } else if let Some(v) = doc.data.get(&f.name) {
1778            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1779        }
1780    }
1781    doc.data = proj;
1782    Ok((doc, deferred))
1783}
1784/// Evaluates a complex AQL expression against document data
1785fn eval_expression(
1786    expr: &ast::Expression,
1787    data: &HashMap<String, Value>,
1788    variables: &HashMap<String, ast::Value>,
1789) -> Value {
1790    match expr {
1791        ast::Expression::Literal(v) => match aql_value_to_db_value(v, variables) {
1792            Ok(dv) => dv,
1793            Err(_) => Value::Null,
1794        },
1795        ast::Expression::Variable(name) => {
1796            if let Some(v) = variables.get(name) {
1797                match aql_value_to_db_value(v, variables) {
1798                    Ok(dv) => dv,
1799                    Err(_) => Value::Null,
1800                }
1801            } else {
1802                Value::Null
1803            }
1804        }
1805        ast::Expression::FieldAccess(parts) => {
1806            let mut current = if parts[0] == "_sid" {
1807                Some(Value::String(data.get("_sid").cloned().and_then(|v| v.as_str().map(|s| s.to_string())).unwrap_or_default()))
1808            } else {
1809                data.get(&parts[0]).cloned()
1810            };
1811
1812            for part in parts.iter().skip(1) {
1813                if let Some(Value::Object(map)) = current {
1814                    current = map.get(part).cloned();
1815                } else {
1816                    current = None;
1817                    break;
1818                }
1819            }
1820            current.unwrap_or(Value::Null)
1821        }
1822        ast::Expression::Binary { op, left, right } => {
1823            let lv = eval_expression(left, data, variables);
1824            let rv = eval_expression(right, data, variables);
1825            eval_binary_op(*op, lv, rv)
1826        }
1827        ast::Expression::Unary { op, expr } => {
1828            let v = eval_expression(expr, data, variables);
1829            eval_unary_op(*op, v)
1830        }
1831        ast::Expression::Ternary {
1832            condition,
1833            then_expr,
1834            else_expr,
1835        } => {
1836            let cv = eval_expression(condition, data, variables);
1837            let is_true = match cv {
1838                Value::Bool(b) => b,
1839                Value::Null => false,
1840                Value::Int(i) => i != 0,
1841                Value::Float(f) => f != 0.0,
1842                Value::String(s) => !s.is_empty(),
1843                _ => true,
1844            };
1845            if is_true {
1846                eval_expression(then_expr, data, variables)
1847            } else {
1848                eval_expression(else_expr, data, variables)
1849            }
1850        }
1851        ast::Expression::FunctionCall { name, args } => {
1852            let evaluated_args: Vec<Value> = args
1853                .iter()
1854                .map(|a| eval_expression(a, data, variables))
1855                .collect();
1856            eval_function_call(name, evaluated_args)
1857        }
1858    }
1859}
1860
1861fn eval_binary_op(op: ast::BinaryOp, left: Value, right: Value) -> Value {
1862    match op {
1863        ast::BinaryOp::Add => match (left, right) {
1864            (Value::Int(a), Value::Int(b)) => Value::Int(a + b),
1865            (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 + b),
1866            (Value::Float(a), Value::Int(b)) => Value::Float(a + b as f64),
1867            (Value::Float(a), Value::Float(b)) => Value::Float(a + b),
1868            (Value::String(mut a), Value::String(b)) => {
1869                a.push_str(&b);
1870                Value::String(a)
1871            }
1872            _ => Value::Null,
1873        },
1874        ast::BinaryOp::Sub => match (left, right) {
1875            (Value::Int(a), Value::Int(b)) => Value::Int(a - b),
1876            (Value::Float(a), Value::Float(b)) => Value::Float(a - b),
1877            _ => Value::Null,
1878        },
1879        ast::BinaryOp::Mul => match (left, right) {
1880            (Value::Int(a), Value::Int(b)) => Value::Int(a * b),
1881            (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 * b),
1882            (Value::Float(a), Value::Int(b)) => Value::Float(a * b as f64),
1883            (Value::Float(a), Value::Float(b)) => Value::Float(a * b),
1884            _ => Value::Null,
1885        },
1886        ast::BinaryOp::Div => match (left, right) {
1887            (Value::Int(a), Value::Int(b)) if b != 0 => Value::Int(a / b),
1888            (Value::Float(a), Value::Float(b)) if b != 0.0 => Value::Float(a / b),
1889            _ => Value::Null,
1890        },
1891        ast::BinaryOp::Eq => Value::Bool(left == right),
1892        ast::BinaryOp::Ne => Value::Bool(left != right),
1893        ast::BinaryOp::Gt => Value::Bool(left > right),
1894        ast::BinaryOp::Gte => Value::Bool(left >= right),
1895        ast::BinaryOp::Lt => Value::Bool(left < right),
1896        ast::BinaryOp::Lte => Value::Bool(left <= right),
1897        ast::BinaryOp::And => {
1898            let lb = matches!(left, Value::Bool(true));
1899            let rb = matches!(right, Value::Bool(true));
1900            Value::Bool(lb && rb)
1901        }
1902        ast::BinaryOp::Or => {
1903            let lb = matches!(left, Value::Bool(true));
1904            let rb = matches!(right, Value::Bool(true));
1905            Value::Bool(lb || rb)
1906        }
1907        _ => Value::Null,
1908    }
1909}
1910
1911fn eval_unary_op(op: ast::UnaryOp, val: Value) -> Value {
1912    match op {
1913        ast::UnaryOp::Not => match val {
1914            Value::Bool(b) => Value::Bool(!b),
1915            Value::Null => Value::Bool(true),
1916            _ => Value::Bool(false),
1917        },
1918        ast::UnaryOp::Neg => match val {
1919            Value::Int(i) => Value::Int(-i),
1920            Value::Float(f) => Value::Float(-f),
1921            _ => Value::Null,
1922        },
1923    }
1924}
1925
1926fn eval_function_call(name: &str, args: Vec<Value>) -> Value {
1927    match name {
1928        "upper" | "uppercase" => {
1929            if let Some(Value::String(s)) = args.get(0) {
1930                Value::String(s.to_uppercase())
1931            } else {
1932                Value::Null
1933            }
1934        }
1935        "lower" | "lowercase" => {
1936            if let Some(Value::String(s)) = args.get(0) {
1937                Value::String(s.to_lowercase())
1938            } else {
1939                Value::Null
1940            }
1941        }
1942        "len" | "length" => match args.get(0) {
1943            Some(Value::String(s)) => Value::Int(s.len() as i64),
1944            Some(Value::Array(a)) => Value::Int(a.len() as i64),
1945            Some(Value::Object(o)) => Value::Int(o.len() as i64),
1946            _ => Value::Null,
1947        },
1948        _ => Value::Null,
1949    }
1950}
1951
1952/// Simple `$fieldName` or `${fieldName}` template evaluator
1953fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
1954    let mut result = template.to_string();
1955    for (k, v) in data {
1956        let p1 = format!("${{{}}}", k);
1957        let p2 = format!("${}", k);
1958        let v_str = match v {
1959            Value::String(s) => s.clone(),
1960            _ => v.to_string(),
1961        };
1962        if result.contains(&p1) {
1963            result = result.replace(&p1, &v_str);
1964        }
1965        if result.contains(&p2) {
1966            result = result.replace(&p2, &v_str);
1967        }
1968    }
1969    result
1970}
1971
1972/// Apply window function (rollingAvg, rollingSum, rollingMin, rollingMax) on the docs Vec
1973fn apply_window_function(
1974    docs: &mut Vec<Document>,
1975    alias: &str,
1976    field: &str,
1977    function: &str,
1978    window: usize,
1979) {
1980    if docs.is_empty() || window == 0 {
1981        return;
1982    }
1983    let values: Vec<Option<f64>> = docs
1984        .iter()
1985        .map(|d| match d.data.get(field) {
1986            Some(Value::Int(i)) => Some(*i as f64),
1987            Some(Value::Float(f)) => Some(*f),
1988            _ => None,
1989        })
1990        .collect();
1991
1992    for (i, doc) in docs.iter_mut().enumerate() {
1993        let start = if i + 1 >= window { i + 1 - window } else { 0 };
1994        let window_vals: Vec<f64> = values[start..=i].iter().filter_map(|v| *v).collect();
1995        if window_vals.is_empty() {
1996            continue;
1997        }
1998        let result = match function {
1999            "rollingAvg" | "avg" => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
2000            "rollingSum" | "sum" => window_vals.iter().sum::<f64>(),
2001            "rollingMin" | "min" => window_vals.iter().cloned().fold(f64::INFINITY, f64::min),
2002            "rollingMax" | "max" => window_vals
2003                .iter()
2004                .cloned()
2005                .fold(f64::NEG_INFINITY, f64::max),
2006            _ => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
2007        };
2008        doc.data.insert(alias.to_string(), Value::Float(result));
2009    }
2010}
2011
2012/// Downsample time-series: bucket docs by interval, aggregate value fields
2013fn apply_downsample(
2014    docs: Vec<Document>,
2015    interval: &str,
2016    aggregation: &str,
2017    value_fields: &[String],
2018) -> Vec<Document> {
2019    let interval_secs: i64 = parse_interval(interval);
2020    if interval_secs <= 0 {
2021        return docs;
2022    }
2023
2024    // Group by bucket
2025    let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
2026        std::collections::BTreeMap::new();
2027    let mut leftover = Vec::new();
2028
2029    for doc in docs {
2030        // Try common timestamp field names
2031        let ts = ["timestamp", "ts", "created_at", "time"]
2032            .iter()
2033            .find_map(|&k| doc.data.get(k))
2034            .and_then(|v| match v {
2035                Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
2036                    .ok()
2037                    .map(|dt| dt.timestamp()),
2038                Value::Int(i) => Some(*i),
2039                _ => None,
2040            });
2041
2042        if let Some(t) = ts {
2043            let bucket = (t / interval_secs) * interval_secs;
2044            buckets.entry(bucket).or_default().push(doc);
2045        } else {
2046            leftover.push(doc);
2047        }
2048    }
2049
2050    let mut result = Vec::new();
2051    for (bucket_ts, group) in buckets {
2052        let mut data = HashMap::new();
2053        data.insert(
2054            "timestamp".to_string(),
2055            Value::String(
2056                chrono::DateTime::from_timestamp(bucket_ts, 0)
2057                    .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
2058                    .unwrap_or_default(),
2059            ),
2060        );
2061        data.insert("count".to_string(), Value::Int(group.len() as i64));
2062
2063        for field in value_fields {
2064            if field == "timestamp" || field == "count" {
2065                continue;
2066            }
2067            let nums: Vec<f64> = group
2068                .iter()
2069                .filter_map(|d| match d.data.get(field) {
2070                    Some(Value::Int(i)) => Some(*i as f64),
2071                    Some(Value::Float(f)) => Some(*f),
2072                    _ => None,
2073                })
2074                .collect();
2075            if nums.is_empty() {
2076                continue;
2077            }
2078            let agg = match aggregation {
2079                "sum" => nums.iter().sum::<f64>(),
2080                "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
2081                "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
2082                "count" => nums.len() as f64,
2083                _ => nums.iter().sum::<f64>() / nums.len() as f64, // avg default
2084            };
2085            data.insert(field.clone(), Value::Float(agg));
2086        }
2087
2088        result.push(Document {
2089            _sid: bucket_ts.to_string(),
2090            data,
2091        });
2092    }
2093
2094    result.extend(leftover);
2095    result
2096}
2097
/// Parses a duration such as `"30s"`, `"5m"`, `"2h"` or `"1d"` into whole seconds.
///
/// Surrounding whitespace is ignored. Behaviour by input shape:
/// * `<int><unit>` with unit `s`, `m`, `h` or `d` — the value scaled to seconds;
/// * a unit suffix whose number part does not parse, or whose scaled value does not fit in
///   an `i64` — `0`, which callers treat as "no valid interval";
/// * no unit suffix — the integer itself (seconds), or `3600` when it does not parse.
fn parse_interval(s: &str) -> i64 {
    const UNITS: [(char, i64); 4] = [('s', 1), ('m', 60), ('h', 3_600), ('d', 86_400)];

    let s = s.trim();
    for &(suffix, seconds_per_unit) in UNITS.iter() {
        if let Some(number) = s.strip_suffix(suffix) {
            return number
                .parse::<i64>()
                .ok()
                // An overflowing product is invalid input, not a reason to panic or wrap.
                .and_then(|n| n.checked_mul(seconds_per_unit))
                .unwrap_or(0);
        }
    }
    s.parse().unwrap_or(3600)
}
2112
2113/// Register an event handler. Spawns a background task that fires a mutation
2114/// each time the trigger event is received on the relevant collection.
2115async fn execute_handler_registration(
2116    db: &Aurora,
2117    handler: &ast::HandlerDef,
2118    _options: &ExecutionOptions,
2119) -> Result<ExecutionResult> {
2120    use crate::pubsub::events::ChangeType;
2121
2122    let collection = match &handler.trigger {
2123        ast::HandlerTrigger::Insert { collection }
2124        | ast::HandlerTrigger::Update { collection }
2125        | ast::HandlerTrigger::Delete { collection } => {
2126            collection.as_deref().unwrap_or("*").to_string()
2127        }
2128        _ => "*".to_string(),
2129    };
2130
2131    let trigger_type = match &handler.trigger {
2132        ast::HandlerTrigger::Insert { .. } => Some(ChangeType::Insert),
2133        ast::HandlerTrigger::Update { .. } => Some(ChangeType::Update),
2134        ast::HandlerTrigger::Delete { .. } => Some(ChangeType::Delete),
2135        _ => None,
2136    };
2137
2138    let mut listener = if collection == "*" {
2139        db.pubsub.listen_all()
2140    } else {
2141        db.pubsub.listen(collection.clone())
2142    };
2143
2144    let db_clone = db.clone();
2145    let action = handler.action.clone();
2146    let handler_name = handler.name.clone();
2147
2148    tokio::spawn(async move {
2149        loop {
2150            match listener.recv().await {
2151                Ok(event) => {
2152                    // Check event type matches trigger
2153                    let matches = trigger_type
2154                        .as_ref()
2155                        .map(|t| &event.change_type == t)
2156                        .unwrap_or(true);
2157                    if !matches {
2158                        continue;
2159                    }
2160
2161                    // Build context from the triggering event.
2162                    // Always set $_id from event._sid — for deletes, event.document is None
2163                    // but the document ID is still in event._sid.
2164                    let mut vars = HashMap::new();
2165                    vars.insert("_id".to_string(), ast::Value::String(event._sid.clone()));
2166                    if let Some(doc) = &event.document {
2167                        // Overlay field variables from the document data (insert/update)
2168                        for (k, v) in &doc.data {
2169                            vars.insert(format!("_{}", k), db_value_to_ast_value(v));
2170                        }
2171                    }
2172
2173                    let _ = execute_mutation_op(
2174                        &db_clone,
2175                        &action,
2176                        &vars,
2177                        &HashMap::new(),
2178                        &ExecutionOptions::default(),
2179                        &HashMap::new(),
2180                    )
2181                    .await;
2182                }
2183                Err(_) => {
2184                    eprintln!("[handler:{}] channel closed, stopping", handler_name);
2185                    break;
2186                }
2187            }
2188        }
2189    });
2190
2191    eprintln!(
2192        "[handler] '{}' registered on '{}'",
2193        handler.name, collection
2194    );
2195
2196    let mut data = HashMap::new();
2197    data.insert("name".to_string(), Value::String(handler.name.clone()));
2198    data.insert("collection".to_string(), Value::String(collection));
2199    data.insert(
2200        "status".to_string(),
2201        Value::String("registered".to_string()),
2202    );
2203
2204    Ok(ExecutionResult::Query(QueryResult {
2205        collection: "__handler".to_string(),
2206        documents: vec![Document {
2207            _sid: handler.name.clone(),
2208            data,
2209        }],
2210        total_count: Some(1),
2211        deferred_fields: vec![],
2212        explain: None,
2213    }))
2214}
2215
2216fn db_value_to_ast_value(v: &Value) -> ast::Value {
2217    match v {
2218        Value::Null => ast::Value::Null,
2219        Value::Bool(b) => ast::Value::Boolean(*b),
2220        Value::Int(i) => ast::Value::Int(*i),
2221        Value::Float(f) => ast::Value::Float(*f),
2222        Value::String(s) => ast::Value::String(s.clone()),
2223        Value::Uuid(u) => ast::Value::String(u.to_string()),
2224        Value::DateTime(dt) => ast::Value::String(dt.to_rfc3339()),
2225        Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_ast_value).collect()),
2226        Value::Object(m) => ast::Value::Object(
2227            m.iter()
2228                .map(|(k, v)| (k.clone(), db_value_to_ast_value(v)))
2229                .collect(),
2230        ),
2231    }
2232}
2233
2234async fn execute_mutation(
2235    db: &Aurora,
2236    mutation: &ast::Mutation,
2237    vars: &HashMap<String, ast::Value>,
2238    options: &ExecutionOptions,
2239    fragments: &HashMap<String, FragmentDef>,
2240) -> Result<ExecutionResult> {
2241    use crate::transaction::ACTIVE_TRANSACTION_ID;
2242
2243    validate_required_variables(&mutation.variable_definitions, vars)?;
2244
2245    // RBAC: Check mutation level permissions
2246    check_permissions(&mutation.directives, options, true)?;
2247
2248    // If there's already an active transaction in scope, run ops directly inside
2249    // it — the caller owns commit/rollback. Creating a nested transaction would
2250    // shadow the outer ACTIVE_TRANSACTION_ID and prevent the outer rollback from
2251    // undoing these writes.
2252    let already_in_tx = ACTIVE_TRANSACTION_ID
2253        .try_with(|id| *id)
2254        .ok()
2255        .and_then(|id| db.transaction_manager.active_transactions.get(&id))
2256        .is_some();
2257
2258    if already_in_tx {
2259        let mut results = Vec::new();
2260        let mut context = HashMap::new();
2261        for mut_op in &mutation.operations {
2262            let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
2263            if let Some(alias) = &mut_op.alias {
2264                if let Some(doc) = res.returned_documents.first() {
2265                    let mut m = serde_json::Map::new();
2266                    for (k, v) in &doc.data {
2267                        m.insert(k.clone(), aurora_value_to_json_value(v));
2268                    }
2269                    m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
2270                    context.insert(alias.clone(), JsonValue::Object(m));
2271                }
2272            }
2273            results.push(res);
2274        }
2275        return if results.len() == 1 {
2276            Ok(ExecutionResult::Mutation(results.remove(0)))
2277        } else {
2278            Ok(ExecutionResult::Batch(
2279                results.into_iter().map(ExecutionResult::Mutation).collect(),
2280            ))
2281        };
2282    }
2283
2284    // No active transaction — wrap the mutation block in one so a failure in any
2285    // operation rolls back all previous operations in the same block.
2286    let tx_id = db.begin_transaction().await;
2287
2288    let exec_result = ACTIVE_TRANSACTION_ID
2289        .scope(tx_id, async {
2290            let mut results = Vec::new();
2291            let mut context = HashMap::new();
2292            for mut_op in &mutation.operations {
2293                let res =
2294                    execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
2295                if let Some(alias) = &mut_op.alias {
2296                    if let Some(doc) = res.returned_documents.first() {
2297                        let mut m = serde_json::Map::new();
2298                        for (k, v) in &doc.data {
2299                            m.insert(k.clone(), aurora_value_to_json_value(v));
2300                        }
2301                        m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
2302                        context.insert(alias.clone(), JsonValue::Object(m));
2303                    }
2304                }
2305                results.push(res);
2306            }
2307            Ok::<_, crate::error::AqlError>(results)
2308        })
2309        .await;
2310
2311    match exec_result {
2312        Ok(mut results) => {
2313            db.commit_transaction(tx_id).await?;
2314            if results.len() == 1 {
2315                Ok(ExecutionResult::Mutation(results.remove(0)))
2316            } else {
2317                Ok(ExecutionResult::Batch(
2318                    results.into_iter().map(ExecutionResult::Mutation).collect(),
2319                ))
2320            }
2321        }
2322        Err(e) => {
2323            let _ = db.rollback_transaction(tx_id).await;
2324            Err(e)
2325        }
2326    }
2327}
2328
2329fn execute_mutation_op<'a>(
2330    db: &'a Aurora,
2331    mut_op: &'a ast::MutationOperation,
2332    variables: &'a HashMap<String, ast::Value>,
2333    context: &'a ExecutionContext,
2334    options: &'a ExecutionOptions,
2335    fragments: &'a HashMap<String, FragmentDef>,
2336) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
2337    use futures::future::FutureExt;
2338    async move {
2339        // RBAC: Check mutation level permissions
2340        check_permissions(&mut_op.directives, options, true)?;
2341
2342        match &mut_op.operation {
2343            MutationOp::Insert { collection, data } => {
2344                let resolved = resolve_value(data, variables, context);
2345                let doc = db
2346                    .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
2347                    .await?;
2348                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
2349                    let fields = collect_fields(
2350                        &mut_op.selection_set,
2351                        fragments,
2352                        variables,
2353                        Some(collection),
2354                    )
2355                    .unwrap_or_default();
2356                    vec![apply_projection(doc, &fields, options)?]
2357                } else {
2358                    vec![doc]
2359                };
2360
2361                Ok(MutationResult {
2362                    operation: "insert".to_string(),
2363                    collection: collection.clone(),
2364                    affected_count: 1,
2365                    returned_documents: returned,
2366                })
2367            }
2368            MutationOp::Update {
2369                collection,
2370                filter,
2371                data,
2372            } => {
2373                let cf = if let Some(f) = filter {
2374                    Some(compile_filter(f)?)
2375                } else {
2376                    None
2377                };
2378                let update_data =
2379                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
2380
2381                // Scan and update matching documents
2382                let mut affected = 0;
2383                let mut returned = Vec::new();
2384
2385                // For now, we use a simple scan-and-update.
2386                // In production, we'd use indices if the filter allows.
2387                let vars_arc = Arc::new(variables.clone());
2388                let cf_arc = cf.map(Arc::new);
2389
2390                let matches = db.scan_and_filter(
2391                    collection,
2392                    |doc| {
2393                        if let Some(ref filter) = cf_arc {
2394                            matches_filter(doc, filter, &vars_arc)
2395                        } else {
2396                            true
2397                        }
2398                    },
2399                    None,
2400                )?;
2401
2402                let fields = if !mut_op.selection_set.is_empty() {
2403                    Some(
2404                        collect_fields(
2405                            &mut_op.selection_set,
2406                            fragments,
2407                            variables,
2408                            Some(collection),
2409                        )
2410                        .unwrap_or_default(),
2411                    )
2412                } else {
2413                    None
2414                };
2415
2416                for doc in matches {
2417                    let mut new_data = doc.data.clone();
2418                    for (k, v) in &update_data {
2419                        let applied = apply_field_modifier(new_data.get(k), v);
2420                        new_data.insert(k.clone(), applied);
2421                    }
2422
2423                    let updated_doc = db
2424                        .aql_update_document(collection, &doc._sid, new_data)
2425                        .await?;
2426
2427                    affected += 1;
2428                    if let Some(ref f) = fields {
2429                        returned.push(apply_projection(updated_doc, f, options)?);
2430                    }
2431                }
2432
2433                Ok(MutationResult {
2434                    operation: "update".to_string(),
2435                    collection: collection.clone(),
2436                    affected_count: affected,
2437                    returned_documents: returned,
2438                })
2439            }
2440            MutationOp::Delete { collection, filter } => {
2441                let cf = if let Some(f) = filter {
2442                    Some(compile_filter(f)?)
2443                } else {
2444                    None
2445                };
2446
2447                let mut affected = 0;
2448                let vars_arc = Arc::new(variables.clone());
2449                let cf_arc = cf.map(Arc::new);
2450
2451                let matches = db.scan_and_filter(
2452                    collection,
2453                    |doc| {
2454                        if let Some(ref filter) = cf_arc {
2455                            matches_filter(doc, filter, &vars_arc)
2456                        } else {
2457                            true
2458                        }
2459                    },
2460                    None,
2461                )?;
2462
2463                for doc in matches {
2464                    db.aql_delete_document(collection, &doc._sid).await?;
2465                    affected += 1;
2466                }
2467
2468                Ok(MutationResult {
2469                    operation: "delete".to_string(),
2470                    collection: collection.clone(),
2471                    affected_count: affected,
2472                    returned_documents: vec![],
2473                })
2474            }
2475            MutationOp::InsertMany { collection, data } => {
2476                let resolved = resolve_value(data, variables, context);
2477                let items = match resolved {
2478                    ast::Value::Array(arr) => arr,
2479                    _ => {
2480                        return Err(AqlError::new(
2481                            ErrorCode::QueryError,
2482                            "insertMany data must be an array".to_string(),
2483                        ));
2484                    }
2485                };
2486
2487                let mut affected = 0;
2488                let mut returned = Vec::new();
2489                for item in items {
2490                    let doc = db
2491                        .aql_insert(collection, aql_value_to_hashmap(&item, variables)?)
2492                        .await?;
2493                    affected += 1;
2494                    if !mut_op.selection_set.is_empty() && options.apply_projections {
2495                        let fields = collect_fields(
2496                            &mut_op.selection_set,
2497                            fragments,
2498                            variables,
2499                            Some(collection),
2500                        )
2501                        .unwrap_or_default();
2502                        returned.push(apply_projection(doc, &fields, options)?);
2503                    } else {
2504                        returned.push(doc);
2505                    }
2506                }
2507                Ok(MutationResult {
2508                    operation: "insertMany".to_string(),
2509                    collection: collection.clone(),
2510                    affected_count: affected,
2511                    returned_documents: returned,
2512                })
2513            }
2514            MutationOp::Upsert {
2515                collection,
2516                filter,
2517                data,
2518            } => {
2519                let update_data =
2520                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
2521                let cf = if let Some(f) = filter {
2522                    Some(compile_filter(f)?)
2523                } else {
2524                    None
2525                };
2526                let vars_arc = Arc::new(variables.clone());
2527                let cf_arc = cf.map(Arc::new);
2528                let matches = db.scan_and_filter(
2529                    collection,
2530                    |doc| {
2531                        if let Some(ref filter) = cf_arc {
2532                            matches_filter(doc, filter, &vars_arc)
2533                        } else {
2534                            true
2535                        }
2536                    },
2537                    Some(1),
2538                )?;
2539                let doc = if let Some(existing) = matches.into_iter().next() {
2540                    db.aql_update_document(collection, &existing._sid, update_data)
2541                        .await?
2542                } else {
2543                    db.aql_insert(collection, update_data).await?
2544                };
2545
2546                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
2547                    let fields = collect_fields(
2548                        &mut_op.selection_set,
2549                        fragments,
2550                        variables,
2551                        Some(collection),
2552                    )
2553                    .unwrap_or_default();
2554                    vec![apply_projection(doc, &fields, options)?]
2555                } else {
2556                    vec![doc]
2557                };
2558
2559                Ok(MutationResult {
2560                    operation: "upsert".to_string(),
2561                    collection: collection.clone(),
2562                    affected_count: 1,
2563                    returned_documents: returned,
2564                })
2565            }
2566            MutationOp::Transaction { operations } => {
2567                let mut all_returned = Vec::new();
2568                let mut total_affected = 0;
2569                for op in operations {
2570                    let res =
2571                        execute_mutation_op(db, op, variables, context, options, fragments).await?;
2572                    total_affected += res.affected_count;
2573                    all_returned.extend(res.returned_documents);
2574                }
2575                Ok(MutationResult {
2576                    operation: "transaction".to_string(),
2577                    collection: String::new(),
2578                    affected_count: total_affected,
2579                    returned_documents: all_returned,
2580                })
2581            }
2582            MutationOp::EnqueueJobs {
2583                job_type,
2584                payloads,
2585                priority,
2586                max_retries,
2587            } => {
2588                let workers = db.workers.as_ref().ok_or_else(|| {
2589                    AqlError::new(
2590                        ErrorCode::QueryError,
2591                        "Worker system not initialised".to_string(),
2592                    )
2593                })?;
2594                let job_priority = match priority {
2595                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
2596                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2597                    ast::JobPriority::High => crate::workers::JobPriority::High,
2598                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2599                };
2600                let mut returned = Vec::new();
2601                for payload in payloads {
2602                    let resolved = resolve_value(payload, variables, context);
2603                    let json_payload: std::collections::HashMap<String, serde_json::Value> =
2604                        if let ast::Value::Object(map) = &resolved {
2605                            map.iter()
2606                                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2607                                .collect()
2608                        } else {
2609                            std::collections::HashMap::new()
2610                        };
2611                    let mut job =
2612                        crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2613                    for (k, v) in json_payload {
2614                        job = job.add_field(k, v);
2615                    }
2616                    if let Some(retries) = max_retries {
2617                        job = job.with_max_retries(*retries);
2618                    }
2619                    let job_id = workers.enqueue(job).await?;
2620                    let mut doc = crate::types::Document::new();
2621                    doc._sid = job_id.clone();
2622                    doc.data.insert("job_id".to_string(), Value::String(job_id));
2623                    doc.data
2624                        .insert("job_type".to_string(), Value::String(job_type.clone()));
2625                    doc.data
2626                        .insert("status".to_string(), Value::String("pending".to_string()));
2627                    returned.push(doc);
2628                }
2629                let count = returned.len();
2630                Ok(MutationResult {
2631                    operation: "enqueueJobs".to_string(),
2632                    collection: "__jobs".to_string(),
2633                    affected_count: count,
2634                    returned_documents: returned,
2635                })
2636            }
2637            MutationOp::Import { collection, data } => {
2638                let mut affected = 0;
2639                let mut returned = Vec::new();
2640                let fields = if !mut_op.selection_set.is_empty() {
2641                    Some(
2642                        collect_fields(
2643                            &mut_op.selection_set,
2644                            fragments,
2645                            variables,
2646                            Some(collection),
2647                        )
2648                        .unwrap_or_default(),
2649                    )
2650                } else {
2651                    None
2652                };
2653                for item in data {
2654                    let resolved = resolve_value(item, variables, context);
2655                    let map = aql_value_to_hashmap(&resolved, variables)?;
2656                    let doc = db.aql_insert(collection, map).await?;
2657                    affected += 1;
2658                    if let Some(ref f) = fields {
2659                        returned.push(apply_projection(doc, f, options)?);
2660                    }
2661                }
2662                Ok(MutationResult {
2663                    operation: "import".to_string(),
2664                    collection: collection.clone(),
2665                    affected_count: affected,
2666                    returned_documents: returned,
2667                })
2668            }
2669            MutationOp::Export {
2670                collection,
2671                format: _,
2672            } => {
2673                let docs = db.scan_and_filter(collection, |_| true, None)?;
2674                let fields = if !mut_op.selection_set.is_empty() {
2675                    Some(
2676                        collect_fields(
2677                            &mut_op.selection_set,
2678                            fragments,
2679                            variables,
2680                            Some(collection),
2681                        )
2682                        .unwrap_or_default(),
2683                    )
2684                } else {
2685                    None
2686                };
2687                let mut returned = Vec::with_capacity(docs.len());
2688                if let Some(ref f) = fields {
2689                    for d in docs {
2690                        returned.push(apply_projection(d, f, options)?);
2691                    }
2692                } else {
2693                    returned = docs;
2694                }
2695                let count = returned.len();
2696                Ok(MutationResult {
2697                    operation: "export".to_string(),
2698                    collection: collection.clone(),
2699                    affected_count: count,
2700                    returned_documents: returned,
2701                })
2702            }
2703            MutationOp::EnqueueJob {
2704                job_type,
2705                payload,
2706                priority,
2707                scheduled_at,
2708                max_retries,
2709            } => {
2710                let workers = db.workers.as_ref().ok_or_else(|| {
2711                    AqlError::new(
2712                        ErrorCode::QueryError,
2713                        "Worker system not initialised".to_string(),
2714                    )
2715                })?;
2716
2717                // Convert AQL priority to Job priority
2718                let job_priority = match priority {
2719                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
2720                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2721                    ast::JobPriority::High => crate::workers::JobPriority::High,
2722                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2723                };
2724
2725                // Convert AQL Value payload to serde_json payload
2726                let resolved = resolve_value(payload, variables, context);
2727                let json_payload: std::collections::HashMap<String, serde_json::Value> =
2728                    if let ast::Value::Object(map) = &resolved {
2729                        map.iter()
2730                            .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2731                            .collect()
2732                    } else {
2733                        std::collections::HashMap::new()
2734                    };
2735
2736                let mut job =
2737                    crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2738
2739                for (k, v) in json_payload {
2740                    job = job.add_field(k, v);
2741                }
2742
2743                if let Some(retries) = max_retries {
2744                    job = job.with_max_retries(*retries);
2745                }
2746
2747                if let Some(scheduled) = scheduled_at {
2748                    if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
2749                        job = job.scheduled_at(dt);
2750                    }
2751                }
2752
2753                let job_id = workers.enqueue(job).await?;
2754
2755                let mut doc = crate::types::Document::new();
2756                doc._sid = job_id.clone();
2757                doc.data
2758                    .insert("job_id".to_string(), crate::types::Value::String(job_id));
2759                doc.data.insert(
2760                    "job_type".to_string(),
2761                    crate::types::Value::String(job_type.clone()),
2762                );
2763                doc.data.insert(
2764                    "status".to_string(),
2765                    crate::types::Value::String("pending".to_string()),
2766                );
2767
2768                Ok(MutationResult {
2769                    operation: "enqueueJob".to_string(),
2770                    collection: "__jobs".to_string(),
2771                    affected_count: 1,
2772                    returned_documents: vec![doc],
2773                })
2774            }
2775        }
2776    }
2777    .boxed()
2778}
2779
2780async fn execute_subscription(
2781    db: &Aurora,
2782    sub: &ast::Subscription,
2783    vars: &HashMap<String, ast::Value>,
2784    _options: &ExecutionOptions,
2785) -> Result<ExecutionResult> {
2786    let vars: HashMap<String, ast::Value> = vars.clone();
2787
2788    let selection = sub.selection_set.first().ok_or_else(|| {
2789        AqlError::new(
2790            ErrorCode::QueryError,
2791            "Subscription must have a selection".to_string(),
2792        )
2793    })?;
2794
2795    if let Selection::Field(f) = selection {
2796        let collection = f.name.clone();
2797        let filter = extract_filter_from_args(&f.arguments)?;
2798
2799        let mut listener = db.pubsub.listen(&collection);
2800
2801        if let Some(f) = filter {
2802            let event_filter = ast_filter_to_event_filter(&f, &vars)?;
2803            listener = listener.filter(event_filter);
2804        }
2805
2806        Ok(ExecutionResult::Subscription(SubscriptionResult {
2807            subscription_id: uuid::Uuid::new_v4().to_string(),
2808            collection,
2809            stream: Some(listener),
2810        }))
2811    } else {
2812        Err(AqlError::new(
2813            ErrorCode::QueryError,
2814            "Invalid subscription selection".to_string(),
2815        ))
2816    }
2817}
2818
2819fn ast_filter_to_event_filter(
2820    filter: &AqlFilter,
2821    vars: &HashMap<String, ast::Value>,
2822) -> Result<crate::pubsub::EventFilter> {
2823    use crate::pubsub::EventFilter;
2824
2825    match filter {
2826        AqlFilter::Eq(f, v) => Ok(EventFilter::FieldEquals(
2827            f.clone(),
2828            aql_value_to_db_value(v, vars)?,
2829        )),
2830        AqlFilter::Ne(f, v) => Ok(EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?)),
2831        AqlFilter::Gt(f, v) => Ok(EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?)),
2832        AqlFilter::Gte(f, v) => Ok(EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?)),
2833        AqlFilter::Lt(f, v) => Ok(EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?)),
2834        AqlFilter::Lte(f, v) => Ok(EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?)),
2835        AqlFilter::In(f, v) => Ok(EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?)),
2836        AqlFilter::NotIn(f, v) => Ok(EventFilter::NotIn(
2837            f.clone(),
2838            aql_value_to_db_value(v, vars)?,
2839        )),
2840        AqlFilter::Contains(f, v) => Ok(EventFilter::Contains(
2841            f.clone(),
2842            aql_value_to_db_value(v, vars)?,
2843        )),
2844        // ContainsAny/ContainsAll don't have EventFilter equivalents; fall back to Contains
2845        AqlFilter::ContainsAny(f, v) | AqlFilter::ContainsAll(f, v) => Ok(EventFilter::Contains(
2846            f.clone(),
2847            aql_value_to_db_value(v, vars)?,
2848        )),
2849        AqlFilter::StartsWith(f, v) => Ok(EventFilter::StartsWith(
2850            f.clone(),
2851            aql_value_to_db_value(v, vars)?,
2852        )),
2853        AqlFilter::EndsWith(f, v) => Ok(EventFilter::EndsWith(
2854            f.clone(),
2855            aql_value_to_db_value(v, vars)?,
2856        )),
2857        AqlFilter::Matches(f, v) => {
2858            let pattern = match aql_value_to_db_value(v, vars)? {
2859                crate::types::Value::String(s) => s,
2860                other => other.to_string(),
2861            };
2862            let re = regex::Regex::new(&pattern).map_err(|e| {
2863                crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
2864            })?;
2865            Ok(EventFilter::Matches(f.clone(), re))
2866        }
2867        AqlFilter::IsNull(f) => Ok(EventFilter::IsNull(f.clone())),
2868        AqlFilter::IsNotNull(f) => Ok(EventFilter::IsNotNull(f.clone())),
2869        AqlFilter::And(filters) => {
2870            let mut mapped = Vec::new();
2871            for f in filters {
2872                mapped.push(ast_filter_to_event_filter(f, vars)?);
2873            }
2874            Ok(EventFilter::And(mapped))
2875        }
2876        AqlFilter::Or(filters) => {
2877            let mut mapped = Vec::new();
2878            for f in filters {
2879                mapped.push(ast_filter_to_event_filter(f, vars)?);
2880            }
2881            Ok(EventFilter::Or(mapped))
2882        }
2883        AqlFilter::Not(f) => Ok(EventFilter::Not(Box::new(ast_filter_to_event_filter(
2884            f, vars,
2885        )?))),
2886    }
2887}
2888
2889async fn execute_introspection(
2890    db: &Aurora,
2891    intro: &ast::IntrospectionQuery,
2892) -> Result<ExecutionResult> {
2893    let names = db.list_collection_names();
2894    let want_fields = intro.fields.is_empty()
2895        || intro
2896            .fields
2897            .iter()
2898            .any(|f| f == "collections" || f == "fields");
2899
2900    let documents: Vec<Document> = names
2901        .iter()
2902        .filter_map(|name| {
2903            // Skip internal system collections from introspection unless asked
2904            if name.starts_with('_') {
2905                return None;
2906            }
2907            let col = db.get_collection_definition(name).ok()?;
2908            let mut data = HashMap::new();
2909            data.insert("name".to_string(), Value::String(name.clone()));
2910
2911            if want_fields {
2912                let field_list: Vec<Value> = col
2913                    .fields
2914                    .iter()
2915                    .map(|(fname, fdef)| {
2916                        let mut fdata = HashMap::new();
2917                        fdata.insert("name".to_string(), Value::String(fname.clone()));
2918                        fdata.insert(
2919                            "type".to_string(),
2920                            Value::String(fdef.field_type.to_string()),
2921                        );
2922                        fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
2923                        fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
2924                        fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
2925                        if !fdef.validations.is_empty() {
2926                            let vcons: Vec<Value> = fdef
2927                                .validations
2928                                .iter()
2929                                .map(|c| Value::String(format!("{:?}", c)))
2930                                .collect();
2931                            fdata.insert("validations".to_string(), Value::Array(vcons));
2932                        }
2933                        Value::Object(fdata)
2934                    })
2935                    .collect();
2936                data.insert("fields".to_string(), Value::Array(field_list));
2937            }
2938
2939            Some(Document {
2940                _sid: name.clone(),
2941                data,
2942            })
2943        })
2944        .collect();
2945
2946    let count = documents.len();
2947    Ok(ExecutionResult::Query(QueryResult {
2948        collection: "__schema".to_string(),
2949        documents,
2950        total_count: Some(count),
2951        deferred_fields: vec![],
2952        explain: None,
2953    }))
2954}
2955
2956async fn execute_schema(
2957    db: &Aurora,
2958    schema: &ast::Schema,
2959    _options: &ExecutionOptions,
2960) -> Result<ExecutionResult> {
2961    let mut last_collection = String::new();
2962
2963    for op in &schema.operations {
2964        match op {
2965            ast::SchemaOp::DefineCollection {
2966                name,
2967                fields,
2968                if_not_exists,
2969                ..
2970            } => {
2971                last_collection = name.clone();
2972
2973                // If if_not_exists is true, check if it already exists
2974                if *if_not_exists {
2975                    if db.get_collection_definition(name).is_ok() {
2976                        continue;
2977                    }
2978                }
2979
2980                let mut field_defs = Vec::new();
2981                for field in fields {
2982                    field_defs.push((field.name.as_str(), build_field_def(field)));
2983                }
2984                db.new_collection(name, field_defs).await?;
2985            }
2986            ast::SchemaOp::AlterCollection { name, actions } => {
2987                last_collection = name.clone();
2988                for action in actions {
2989                    match action {
2990                        ast::AlterAction::AddField { field, default } => {
2991                            let def = build_field_def(field);
2992                            db.add_field_to_schema(name, field.name.clone(), def)
2993                                .await?;
2994                            if let Some(default_val) = default {
2995                                let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
2996                                let docs = db.get_all_collection(name).await?;
2997                                for doc in docs {
2998                                    if !doc.data.contains_key(&field.name) {
2999                                        db.update_document(
3000                                            name,
3001                                            &doc._sid,
3002                                            vec![(&field.name, db_val.clone())],
3003                                        )
3004                                        .await?;
3005                                    }
3006                                }
3007                            }
3008                        }
3009                        ast::AlterAction::DropField(field_name) => {
3010                            db.drop_field_from_schema(name, field_name.clone()).await?;
3011                        }
3012                        ast::AlterAction::RenameField { from, to } => {
3013                            db.rename_field_in_schema(name, from.clone(), to.clone())
3014                                .await?;
3015                            let docs = db.get_all_collection(name).await?;
3016                            for mut doc in docs {
3017                                if let Some(val) = doc.data.remove(from.as_str()) {
3018                                    doc.data.insert(to.clone(), val);
3019                                    let key = format!("{}:{}", name, doc._sid);
3020                                    db.put(key, serde_json::to_vec(&doc)?, None).await?;
3021                                }
3022                            }
3023                        }
3024                        ast::AlterAction::ModifyField(field) => {
3025                            db.modify_field_in_schema(
3026                                name,
3027                                field.name.clone(),
3028                                build_field_def(field),
3029                            )
3030                            .await?;
3031                        }
3032                    }
3033                }
3034            }
3035            ast::SchemaOp::DropCollection { name, .. } => {
3036                db.drop_collection_schema(name).await?;
3037                last_collection = name.clone();
3038            }
3039        }
3040    }
3041
3042    Ok(ExecutionResult::Schema(SchemaResult {
3043        operation: "schema".to_string(),
3044        collection: last_collection,
3045        status: "done".to_string(),
3046    }))
3047}
3048
3049fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
3050    let scalar = match anno.name.to_lowercase().as_str() {
3051        "string" => ScalarType::String,
3052        "int" | "integer" => ScalarType::Int,
3053        "float" | "double" => ScalarType::Float,
3054        "bool" | "boolean" => ScalarType::Bool,
3055        "uuid" => ScalarType::Uuid,
3056        "object" => ScalarType::Object,
3057        "array" => ScalarType::Array,
3058        _ => ScalarType::Any,
3059    };
3060
3061    if anno.is_array {
3062        FieldType::Array(scalar)
3063    } else {
3064        match scalar {
3065            ScalarType::Object => FieldType::Object,
3066            ScalarType::Any => FieldType::Any,
3067            _ => FieldType::Scalar(scalar),
3068        }
3069    }
3070}
3071
3072/// Parse @validate directive arguments into FieldValidationConstraints
3073fn parse_validate_directive(
3074    directive: &ast::Directive,
3075) -> Vec<crate::types::FieldValidationConstraint> {
3076    use crate::types::FieldValidationConstraint as FVC;
3077    let mut constraints = Vec::new();
3078    for arg in &directive.arguments {
3079        match arg.name.as_str() {
3080            "format" => {
3081                if let ast::Value::String(s) = &arg.value {
3082                    constraints.push(FVC::Format(s.clone()));
3083                }
3084            }
3085            "min" => {
3086                let n = match &arg.value {
3087                    ast::Value::Float(f) => Some(*f),
3088                    ast::Value::Int(i) => Some(*i as f64),
3089                    _ => None,
3090                };
3091                if let Some(n) = n {
3092                    constraints.push(FVC::Min(n));
3093                }
3094            }
3095            "max" => {
3096                let n = match &arg.value {
3097                    ast::Value::Float(f) => Some(*f),
3098                    ast::Value::Int(i) => Some(*i as f64),
3099                    _ => None,
3100                };
3101                if let Some(n) = n {
3102                    constraints.push(FVC::Max(n));
3103                }
3104            }
3105            "minLength" => {
3106                if let ast::Value::Int(i) = &arg.value {
3107                    constraints.push(FVC::MinLength(*i));
3108                }
3109            }
3110            "maxLength" => {
3111                if let ast::Value::Int(i) = &arg.value {
3112                    constraints.push(FVC::MaxLength(*i));
3113                }
3114            }
3115            "pattern" => {
3116                if let ast::Value::String(s) = &arg.value {
3117                    constraints.push(FVC::Pattern(s.clone()));
3118                }
3119            }
3120            _ => {}
3121        }
3122    }
3123    constraints
3124}
3125
3126/// Build a FieldDefinition from a field's type + directives
3127fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
3128    let field_type = map_ast_type(&field.field_type);
3129    let mut indexed = false;
3130    let mut unique = false;
3131    let mut relation = None;
3132    let mut validations = Vec::new();
3133    for directive in &field.directives {
3134        match directive.name.as_str() {
3135            "indexed" | "index" => indexed = true,
3136            "unique" => {
3137                unique = true;
3138                indexed = true;
3139            }
3140            "primary" => {
3141                indexed = true;
3142                unique = true;
3143            }
3144            "relation" => {
3145                let to = directive
3146                    .arguments
3147                    .iter()
3148                    .find(|a| a.name == "to" || a.name == "collection")
3149                    .and_then(|a| {
3150                        if let ast::Value::String(s) = &a.value {
3151                            Some(s.clone())
3152                        } else {
3153                            None
3154                        }
3155                    })
3156                    .unwrap_or_default();
3157                let key = directive
3158                    .arguments
3159                    .iter()
3160                    .find(|a| a.name == "key" || a.name == "field")
3161                    .and_then(|a| {
3162                        if let ast::Value::String(s) = &a.value {
3163                            Some(s.clone())
3164                        } else {
3165                            None
3166                        }
3167                    })
3168                    .unwrap_or_else(|| "id".to_string());
3169                if !to.is_empty() {
3170                    relation = Some(crate::types::Relation { to, key });
3171                }
3172            }
3173            "validate" => validations.extend(parse_validate_directive(directive)),
3174            _ => {}
3175        }
3176    }
3177    FieldDefinition {
3178        field_type,
3179        unique,
3180        indexed,
3181        nullable: !field.field_type.is_required,
3182        validations,
3183        relation,
3184    }
3185}
3186
3187async fn execute_migration(
3188    db: &Aurora,
3189    migration: &ast::Migration,
3190    _options: &ExecutionOptions,
3191) -> Result<ExecutionResult> {
3192    let mut steps_applied = 0;
3193    let mut last_version = String::new();
3194
3195    for step in &migration.steps {
3196        last_version = step.version.clone();
3197
3198        if db.is_migration_applied(&step.version).await? {
3199            eprintln!(
3200                "[migration] version '{}' already applied — skipping",
3201                step.version
3202            );
3203            continue;
3204        }
3205
3206        eprintln!("[migration] applying version '{}'", step.version);
3207
3208        for action in &step.actions {
3209            match action {
3210                ast::MigrationAction::Schema(schema_op) => {
3211                    execute_single_schema_op(db, schema_op).await?;
3212                }
3213                ast::MigrationAction::DataMigration(dm) => {
3214                    execute_data_migration(db, dm).await?;
3215                }
3216            }
3217        }
3218
3219        db.mark_migration_applied(&step.version).await?;
3220        steps_applied += 1;
3221        eprintln!("[migration] version '{}' applied", step.version);
3222    }
3223
3224    let status = if steps_applied > 0 {
3225        "applied".to_string()
3226    } else {
3227        "skipped".to_string()
3228    };
3229
3230    Ok(ExecutionResult::Migration(MigrationResult {
3231        version: last_version,
3232        steps_applied,
3233        status,
3234    }))
3235}
3236
3237/// Execute a single SchemaOp — shared by execute_schema and execute_migration.
3238async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
3239    match op {
3240        ast::SchemaOp::DefineCollection {
3241            name,
3242            fields,
3243            if_not_exists,
3244            ..
3245        } => {
3246            if *if_not_exists && db.get_collection_definition(name).is_ok() {
3247                return Ok(());
3248            }
3249            let field_defs: Vec<(&str, FieldDefinition)> = fields
3250                .iter()
3251                .map(|f| (f.name.as_str(), build_field_def(f)))
3252                .collect();
3253            db.new_collection(name, field_defs).await?;
3254        }
3255        ast::SchemaOp::AlterCollection { name, actions } => {
3256            for action in actions {
3257                match action {
3258                    ast::AlterAction::AddField { field, default } => {
3259                        db.add_field_to_schema(name, field.name.clone(), build_field_def(field))
3260                            .await?;
3261                        if let Some(default_val) = default {
3262                            let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
3263                            let docs = db.get_all_collection(name).await?;
3264                            for doc in docs {
3265                                if !doc.data.contains_key(&field.name) {
3266                                    db.update_document(
3267                                        name,
3268                                        &doc._sid,
3269                                        vec![(&field.name, db_val.clone())],
3270                                    )
3271                                    .await?;
3272                                }
3273                            }
3274                        }
3275                    }
3276                    ast::AlterAction::DropField(field_name) => {
3277                        db.drop_field_from_schema(name, field_name.clone()).await?;
3278                    }
3279                    ast::AlterAction::RenameField { from, to } => {
3280                        db.rename_field_in_schema(name, from.clone(), to.clone())
3281                            .await?;
3282                        // Rename the field key in every existing document (one read-modify-write per doc)
3283                        let docs = db.get_all_collection(name).await?;
3284                        for mut doc in docs {
3285                            if let Some(val) = doc.data.remove(from.as_str()) {
3286                                doc.data.insert(to.clone(), val);
3287                                let key = format!("{}:{}", name, doc._sid);
3288                                db.put(key, serde_json::to_vec(&doc)?, None).await?;
3289                            }
3290                        }
3291                    }
3292                    ast::AlterAction::ModifyField(field) => {
3293                        db.modify_field_in_schema(name, field.name.clone(), build_field_def(field))
3294                            .await?;
3295                    }
3296                }
3297            }
3298        }
3299        ast::SchemaOp::DropCollection { name, if_exists } => {
3300            if *if_exists && db.get_collection_definition(name).is_err() {
3301                return Ok(());
3302            }
3303            db.drop_collection_schema(name).await?;
3304        }
3305    }
3306    Ok(())
3307}
3308
3309/// Apply a `migrate data in <collection> { set field = expr where { ... } }` block.
3310async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
3311    let docs = db.get_all_collection(&dm.collection).await?;
3312
3313    for doc in docs {
3314        for transform in &dm.transforms {
3315            let matches = match &transform.filter {
3316                Some(filter) => {
3317                    let compiled = compile_filter(filter)?;
3318                    matches_filter(&doc, &compiled, &HashMap::new())
3319                }
3320                None => true,
3321            };
3322
3323            if matches {
3324                let new_value = eval_migration_expr(&transform.expression, &doc);
3325                let mut updates = HashMap::new();
3326                updates.insert(transform.field.clone(), new_value);
3327                db.aql_update_document(&dm.collection, &doc._sid, updates)
3328                    .await?;
3329            }
3330        }
3331    }
3332    Ok(())
3333}
3334
3335/// Evaluate a migration expression string to a `Value`.
3336///
3337/// Supported forms (evaluated in order):
3338/// - String literal:  `"hello"` → `Value::String("hello")`
3339/// - Boolean:         `true` / `false`
3340/// - Null:            `null`
3341/// - Integer:         `42`
3342/// - Float:           `3.14`
3343/// - Field reference: `field_name` (looks up the field in the current document)
3344fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
3345    let expr = expr.trim();
3346
3347    if expr.starts_with('"') && expr.ends_with('"') && expr.len() >= 2 {
3348        return Value::String(expr[1..expr.len() - 1].to_string());
3349    }
3350    if expr == "true" {
3351        return Value::Bool(true);
3352    }
3353    if expr == "false" {
3354        return Value::Bool(false);
3355    }
3356    if expr == "null" {
3357        return Value::Null;
3358    }
3359    if let Ok(n) = expr.parse::<i64>() {
3360        return Value::Int(n);
3361    }
3362    if let Ok(f) = expr.parse::<f64>() {
3363        return Value::Float(f);
3364    }
3365    if let Some(v) = doc.data.get(expr) {
3366        return v.clone();
3367    }
3368
3369    Value::Null
3370}
3371
3372fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
3373    for a in args {
3374        if a.name == "where" || a.name == "filter" {
3375            return Ok(Some(value_to_filter(&a.value)?));
3376        }
3377    }
3378    Ok(None)
3379}
3380
3381fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
3382    let mut orderings = Vec::new();
3383    for a in args {
3384        if a.name == "orderBy" {
3385            match &a.value {
3386                ast::Value::String(f) => orderings.push(ast::Ordering {
3387                    field: f.clone(),
3388                    direction: ast::SortDirection::Asc,
3389                }),
3390                ast::Value::Object(obj) => {
3391                    // Formal syntax: { field: "fieldname", direction: ASC|DESC }
3392                    if let (Some(ast::Value::String(field_name)), Some(dir_val)) =
3393                        (obj.get("field"), obj.get("direction"))
3394                    {
3395                        let direction = match dir_val {
3396                            ast::Value::Enum(s) | ast::Value::String(s) => {
3397                                if s.to_uppercase() == "DESC" {
3398                                    ast::SortDirection::Desc
3399                                } else {
3400                                    ast::SortDirection::Asc
3401                                }
3402                            }
3403                            _ => ast::SortDirection::Asc,
3404                        };
3405                        orderings.push(ast::Ordering {
3406                            field: field_name.clone(),
3407                            direction,
3408                        });
3409                    } else {
3410                        // Shorthand: { fieldName: "ASC", fieldName2: "DESC" }
3411                        for (field, dir_val) in obj {
3412                            let direction = match dir_val {
3413                                ast::Value::Enum(s) | ast::Value::String(s) => {
3414                                    if s.to_uppercase() == "DESC" {
3415                                        ast::SortDirection::Desc
3416                                    } else {
3417                                        ast::SortDirection::Asc
3418                                    }
3419                                }
3420                                _ => ast::SortDirection::Asc,
3421                            };
3422                            orderings.push(ast::Ordering {
3423                                field: field.clone(),
3424                                direction,
3425                            });
3426                        }
3427                    }
3428                }
3429                _ => {}
3430            }
3431        }
3432    }
3433    orderings
3434}
3435
3436fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
3437    docs.sort_by(|a, b| {
3438        for o in orderings {
3439            let cmp = compare_values(a.data.get(&o.field), b.data.get(&o.field));
3440            if cmp != std::cmp::Ordering::Equal {
3441                return match o.direction {
3442                    ast::SortDirection::Asc => cmp,
3443                    ast::SortDirection::Desc => cmp.reverse(),
3444                };
3445            }
3446        }
3447        std::cmp::Ordering::Equal
3448    });
3449}
3450
3451fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
3452    match (a, b) {
3453        (None, None) => std::cmp::Ordering::Equal,
3454        (None, Some(_)) => std::cmp::Ordering::Less,
3455        (Some(_), None) => std::cmp::Ordering::Greater,
3456        (Some(av), Some(bv)) => av.partial_cmp(bv).unwrap_or(std::cmp::Ordering::Equal),
3457    }
3458}
3459
3460pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
3461    let (mut limit, mut offset) = (None, 0);
3462    for a in args {
3463        match a.name.as_str() {
3464            "limit" | "first" => {
3465                if let ast::Value::Int(n) = a.value {
3466                    limit = Some(n as usize);
3467                }
3468            }
3469            "offset" | "skip" => {
3470                if let ast::Value::Int(n) = a.value {
3471                    offset = n as usize;
3472                }
3473            }
3474            _ => {}
3475        }
3476    }
3477    (limit, offset)
3478}
3479
3480fn extract_cursor_pagination(
3481    args: &[ast::Argument],
3482) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
3483    let (mut first, mut after, mut last, mut before) = (None, None, None, None);
3484    for a in args {
3485        match a.name.as_str() {
3486            "first" => {
3487                if let ast::Value::Int(n) = a.value {
3488                    first = Some(n as usize);
3489                }
3490            }
3491            "after" => {
3492                if let ast::Value::String(s) = &a.value {
3493                    after = Some(s.clone());
3494                }
3495            }
3496            "last" => {
3497                if let ast::Value::Int(n) = a.value {
3498                    last = Some(n as usize);
3499                }
3500            }
3501            "before" => {
3502                if let ast::Value::String(s) = &a.value {
3503                    before = Some(s.clone());
3504                }
3505            }
3506            _ => {}
3507        }
3508    }
3509    (first, after, last, before)
3510}
3511
3512fn execute_connection(
3513    mut docs: Vec<Document>,
3514    sub_fields: &[ast::Field],
3515    limit: Option<usize>,
3516    fragments: &HashMap<String, FragmentDef>,
3517    variables: &HashMap<String, ast::Value>,
3518    options: &ExecutionOptions,
3519) -> Result<QueryResult> {
3520    let has_next_page = if let Some(l) = limit {
3521        docs.len() > l
3522    } else {
3523        false
3524    };
3525
3526    if has_next_page {
3527        docs.truncate(limit.unwrap());
3528    }
3529
3530    let mut edges = Vec::with_capacity(docs.len());
3531    let mut end_cursor = String::new();
3532
3533    // Find the 'node' selection fields once (potentially expanding fragments)
3534    let node_fields = if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
3535        let edges_sub_fields =
3536            collect_fields(&edges_field.selection_set, fragments, variables, None)
3537                .unwrap_or_default();
3538        if let Some(node_field) = edges_sub_fields.into_iter().find(|f| f.name == "node") {
3539            collect_fields(&node_field.selection_set, fragments, variables, None)
3540                .unwrap_or_default()
3541        } else {
3542            Vec::new()
3543        }
3544    } else {
3545        Vec::new()
3546    };
3547
3548    for doc in docs {
3549        let cursor = doc._sid.clone();
3550        end_cursor = cursor.clone();
3551
3552        let mut edge_data = HashMap::new();
3553        edge_data.insert("cursor".to_string(), Value::String(cursor));
3554
3555        let node_doc = if node_fields.is_empty() {
3556            doc
3557        } else {
3558            apply_projection(doc, &node_fields, options)?
3559        };
3560
3561        edge_data.insert("node".to_string(), Value::Object(node_doc.data));
3562        edges.push(Value::Object(edge_data));
3563    }
3564
3565    let mut page_info = HashMap::new();
3566    page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
3567    page_info.insert("endCursor".to_string(), Value::String(end_cursor));
3568
3569    let mut conn_data = HashMap::new();
3570    conn_data.insert("edges".to_string(), Value::Array(edges));
3571    conn_data.insert("pageInfo".to_string(), Value::Object(page_info));
3572
3573    Ok(QueryResult {
3574        collection: String::new(),
3575        documents: vec![Document {
3576            _sid: "connection".to_string(),
3577            data: conn_data,
3578        }],
3579        total_count: None,
3580        deferred_fields: vec![],
3581        explain: None,
3582    })
3583}
3584
3585pub fn matches_filter(
3586    doc: &Document,
3587    filter: &CompiledFilter,
3588    vars: &HashMap<String, ast::Value>,
3589) -> bool {
3590    match filter {
3591        CompiledFilter::Eq(f, v) => {
3592            if let Some(dv) = doc.data.get(f) {
3593                values_equal(dv, v, vars)
3594            } else if f == "_sid" {
3595                values_equal(&Value::String(doc._sid.clone()), v, vars)
3596            } else {
3597                false
3598            }
3599        }
3600        CompiledFilter::Ne(f, v) => {
3601            if let Some(dv) = doc.data.get(f) {
3602                !values_equal(dv, v, vars)
3603            } else if f == "_sid" {
3604                !values_equal(&Value::String(doc._sid.clone()), v, vars)
3605            } else {
3606                true
3607            }
3608        }
3609        CompiledFilter::Gt(f, v) => {
3610            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3611                if f == "_sid" {
3612                    Some(Value::String(doc._sid.clone()))
3613                } else {
3614                    None
3615                }
3616            });
3617            dv_opt.map_or(false, |dv| {
3618                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3619                    return dv > bv;
3620                }
3621                false
3622            })
3623        }
3624        CompiledFilter::Gte(f, v) => {
3625            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3626                if f == "_sid" {
3627                    Some(Value::String(doc._sid.clone()))
3628                } else {
3629                    None
3630                }
3631            });
3632            dv_opt.map_or(false, |dv| {
3633                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3634                    return dv >= bv;
3635                }
3636                false
3637            })
3638        }
3639        CompiledFilter::Lt(f, v) => {
3640            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3641                if f == "_sid" {
3642                    Some(Value::String(doc._sid.clone()))
3643                } else {
3644                    None
3645                }
3646            });
3647            dv_opt.map_or(false, |dv| {
3648                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3649                    return dv < bv;
3650                }
3651                false
3652            })
3653        }
3654        CompiledFilter::Lte(f, v) => {
3655            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3656                if f == "_sid" {
3657                    Some(Value::String(doc._sid.clone()))
3658                } else {
3659                    None
3660                }
3661            });
3662            dv_opt.map_or(false, |dv| {
3663                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3664                    return dv <= bv;
3665                }
3666                false
3667            })
3668        }
3669        CompiledFilter::In(f, v) => {
3670            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3671                if f == "_sid" {
3672                    Some(Value::String(doc._sid.clone()))
3673                } else {
3674                    None
3675                }
3676            });
3677            dv_opt.map_or(false, |dv| {
3678                if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3679                    return arr.contains(&dv);
3680                }
3681                false
3682            })
3683        }
3684        CompiledFilter::NotIn(f, v) => {
3685            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3686                if f == "_sid" {
3687                    Some(Value::String(doc._sid.clone()))
3688                } else {
3689                    None
3690                }
3691            });
3692            dv_opt.map_or(true, |dv| {
3693                if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3694                    return !arr.contains(&dv);
3695                }
3696                true
3697            })
3698        }
3699        CompiledFilter::Contains(f, v) => {
3700            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3701                if f == "_sid" {
3702                    Some(Value::String(doc._sid.clone()))
3703                } else {
3704                    None
3705                }
3706            });
3707            if let Some(dv) = dv_opt {
3708                match (dv, resolve_if_variable(v, vars)) {
3709                    (Value::String(s), ast::Value::String(sub)) => s.contains(&*sub),
3710                    (Value::Array(arr), _) => {
3711                        if let Ok(bv) = aql_value_to_db_value(v, vars) {
3712                            return arr.contains(&bv);
3713                        }
3714                        false
3715                    }
3716                    _ => false,
3717                }
3718            } else {
3719                false
3720            }
3721        }
3722        // Field array must contain at least one of the provided values
3723        CompiledFilter::ContainsAny(f, v) => {
3724            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3725                if f == "_sid" {
3726                    Some(Value::String(doc._sid.clone()))
3727                } else {
3728                    None
3729                }
3730            });
3731            if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3732                (dv_opt, aql_value_to_db_value(v, vars))
3733            {
3734                check_arr.iter().any(|item| field_arr.contains(item))
3735            } else {
3736                false
3737            }
3738        }
3739        // Field array must contain all of the provided values
3740        CompiledFilter::ContainsAll(f, v) => {
3741            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3742                if f == "_sid" {
3743                    Some(Value::String(doc._sid.clone()))
3744                } else {
3745                    None
3746                }
3747            });
3748            if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3749                (dv_opt, aql_value_to_db_value(v, vars))
3750            {
3751                check_arr.iter().all(|item| field_arr.contains(item))
3752            } else {
3753                false
3754            }
3755        }
3756        CompiledFilter::StartsWith(f, v) => {
3757            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3758                if f == "_sid" {
3759                    Some(Value::String(doc._sid.clone()))
3760                } else {
3761                    None
3762                }
3763            });
3764            if let (Some(Value::String(s)), ast::Value::String(pre)) =
3765                (dv_opt, resolve_if_variable(v, vars))
3766            {
3767                s.starts_with(&*pre)
3768            } else {
3769                false
3770            }
3771        }
3772        CompiledFilter::EndsWith(f, v) => {
3773            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3774                if f == "_sid" {
3775                    Some(Value::String(doc._sid.clone()))
3776                } else {
3777                    None
3778                }
3779            });
3780            if let (Some(Value::String(s)), ast::Value::String(suf)) =
3781                (dv_opt, resolve_if_variable(v, vars))
3782            {
3783                s.ends_with(&*suf)
3784            } else {
3785                false
3786            }
3787        }
3788        CompiledFilter::Matches(f, re) => {
3789            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3790                if f == "_sid" {
3791                    Some(Value::String(doc._sid.clone()))
3792                } else {
3793                    None
3794                }
3795            });
3796            if let Some(Value::String(s)) = dv_opt {
3797                re.is_match(&s)
3798            } else {
3799                false
3800            }
3801        }
3802        CompiledFilter::IsNull(f) => {
3803            if f == "_sid" {
3804                false // _sid is never null
3805            } else {
3806                doc.data.get(f).map_or(true, |v| matches!(v, Value::Null))
3807            }
3808        }
3809        CompiledFilter::IsNotNull(f) => {
3810            if f == "_sid" {
3811                true // _sid is always non-null
3812            } else {
3813                doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
3814            }
3815        }
3816        CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
3817        CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
3818        CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
3819    }
3820}
3821
3822/// Apply an update modifier object to an existing field value.
3823/// Modifiers: `{ increment: N }`, `{ decrement: N }`, `{ push: V }`,
3824/// `{ pull: V }`, `{ addToSet: V }`.
3825/// Any non-modifier object is returned as-is (plain field replace).
3826fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
3827    if let Value::Object(modifier) = new_val {
3828        if let Some(delta) = modifier.get("increment") {
3829            match (existing, delta) {
3830                (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
3831                (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
3832                (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
3833                _ => {}
3834            }
3835        }
3836        if let Some(delta) = modifier.get("decrement") {
3837            match (existing, delta) {
3838                (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
3839                (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
3840                (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
3841                _ => {}
3842            }
3843        }
3844        if let Some(item) = modifier.get("push") {
3845            if let Some(Value::Array(mut arr)) = existing.cloned() {
3846                arr.push(item.clone());
3847                return Value::Array(arr);
3848            }
3849            return Value::Array(vec![item.clone()]);
3850        }
3851        if let Some(item) = modifier.get("pull") {
3852            if let Some(Value::Array(arr)) = existing {
3853                let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
3854                return Value::Array(filtered);
3855            }
3856            return Value::Array(vec![]);
3857        }
3858        if let Some(item) = modifier.get("addToSet") {
3859            if let Some(Value::Array(mut arr)) = existing.cloned() {
3860                if !arr.contains(item) {
3861                    arr.push(item.clone());
3862                }
3863                return Value::Array(arr);
3864            }
3865            return Value::Array(vec![item.clone()]);
3866        }
3867    }
3868    new_val.clone()
3869}
3870
3871fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
3872    if let Ok(bv) = aql_value_to_db_value(av, vars) {
3873        return dv == &bv;
3874    }
3875    false
3876}
3877
3878fn resolve_if_variable<'a>(
3879    v: &'a ast::Value,
3880    vars: &'a HashMap<String, ast::Value>,
3881) -> &'a ast::Value {
3882    if let ast::Value::Variable(n) = v {
3883        vars.get(n).unwrap_or(v)
3884    } else {
3885        v
3886    }
3887}
3888
3889pub fn apply_projection(
3890    doc: Document,
3891    fields: &[ast::Field],
3892    options: &ExecutionOptions,
3893) -> Result<Document> {
3894    let (projected, _) = apply_projection_and_defer(doc, fields, &options.variables, options)?;
3895    Ok(projected)
3896}
3897
3898/// Apply projections, resolving lookup fields from foreign collections.
3899/// Returns (projected_doc, deferred_field_names).
3900async fn apply_projection_with_lookups(
3901    db: &Aurora,
3902    mut doc: Document,
3903    collection_name: &str,
3904    fields: &[ast::Field],
3905    fragments: &HashMap<String, FragmentDef>,
3906    vars: &HashMap<String, ast::Value>,
3907    options: &ExecutionOptions,
3908    pinned_fields: &[String],
3909) -> Result<(Document, Vec<String>)> {
3910    if fields.is_empty() {
3911        return Ok((doc, vec![]));
3912    }
3913    let mut proj = HashMap::new();
3914    let mut deferred = Vec::new();
3915
3916    // 1. Process all fields (including pinning dependencies)
3917    for f in fields {
3918        // RBAC Check
3919        check_permissions(&f.directives, options, false)?;
3920
3921        // @defer
3922        if f.directives.iter().any(|d| d.name == "defer") {
3923            deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
3924            continue;
3925        }
3926
3927        let mut lookup_info: Option<(String, String, String)> = None;
3928        let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
3929        let local_arg = f.arguments.iter().find(|a| a.name == "localField");
3930        let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
3931
3932        if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
3933            let c_val = resolve_if_variable(&c.value, vars);
3934            let lf_val = resolve_if_variable(&lf.value, vars);
3935            let ff_val = resolve_if_variable(&ff.value, vars);
3936
3937            if let (
3938                ast::Value::String(foreign_coll),
3939                ast::Value::String(local_field),
3940                ast::Value::String(foreign_field),
3941            ) = (c_val, lf_val, ff_val)
3942            {
3943                lookup_info = Some((
3944                    foreign_coll.clone(),
3945                    local_field.clone(),
3946                    foreign_field.clone(),
3947                ));
3948            }
3949        } else if let Ok(col_def) = db.get_collection_definition(collection_name) {
3950            if let Some(f_def) = col_def.fields.get(&f.name) {
3951                if let Some(rel) = &f_def.relation {
3952                    lookup_info = Some((rel.to.clone(), f.name.clone(), rel.key.clone()));
3953                }
3954            }
3955        }
3956
3957        if let Some((foreign_coll, local_field, foreign_field)) = lookup_info {
3958            let local_val = doc.data.get(local_field.as_str()).cloned().or_else(|| {
3959                if local_field == "_sid" {
3960                    Some(Value::String(doc._sid.clone()))
3961                } else {
3962                    None
3963                }
3964            });
3965
3966            if let Some(match_val) = local_val {
3967                let extra_filter = f
3968                    .arguments
3969                    .iter()
3970                    .find(|a| a.name == "where")
3971                    .and_then(|a| {
3972                        let resolved = resolve_ast_deep(&a.value, vars);
3973                        value_to_filter(&resolved).ok()
3974                    });
3975
3976                let vars_arc = Arc::new(vars.clone());
3977                let foreign_docs = db.scan_and_filter(
3978                    &foreign_coll,
3979                    |fdoc| {
3980                        let field_match = fdoc
3981                            .data
3982                            .get(foreign_field.as_str())
3983                            .map(|v| values_equal_db(v, &match_val))
3984                            .unwrap_or(
3985                                foreign_field == "_sid" && fdoc._sid == match_val.to_string(),
3986                            );
3987                        if !field_match {
3988                            return false;
3989                        }
3990                        if let Some(ref ef) = extra_filter {
3991                            let compiled = compile_filter(ef)
3992                                .unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
3993                            matches_filter(fdoc, &compiled, &vars_arc)
3994                        } else {
3995                            true
3996                        }
3997                    },
3998                    None,
3999                )?;
4000
4001                let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
4002                    foreign_docs
4003                        .into_iter()
4004                        .map(|fd| Value::Object(fd.data))
4005                        .collect()
4006                } else {
4007                    let sub_fields: Vec<ast::Field> =
4008                        collect_fields(&f.selection_set, fragments, vars, Some(&foreign_coll))?;
4009                    let mut sub_results = Vec::with_capacity(foreign_docs.len());
4010                    for fd in foreign_docs {
4011                        let (proj_fd, _) =
4012                            apply_projection_and_defer(fd, &sub_fields, vars, options)?;
4013                        sub_results.push(Value::Object(proj_fd.data));
4014                    }
4015                    sub_results
4016                };
4017
4018                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Array(sub_projected));
4019            }
4020            continue;
4021        }
4022
4023        // Computed field
4024        if f.name == "__compute__" {
4025            let alias = f.alias.as_deref().unwrap_or("computed");
4026            if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
4027                if let ast::Value::String(template) = &expr_arg.value {
4028                    let result = eval_template(template, &doc.data);
4029                    proj.insert(alias.to_string(), Value::String(result));
4030                } else if let Some(expr) = f.computed_expression.as_ref() {
4031                    let result = eval_expression(expr, &doc.data, vars);
4032                    proj.insert(alias.to_string(), result);
4033                }
4034            }
4035            continue;
4036        }
4037
4038        // Normal field
4039        if f.name == "id" {
4040            let v = doc.data.get("id").cloned().unwrap_or(Value::String(doc._sid.clone()));
4041            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v);
4042        } else if f.name == "_sid" {
4043            proj.insert(
4044                f.alias.as_ref().unwrap_or(&f.name).clone(),
4045                Value::String(doc._sid.clone()),
4046            );
4047        } else if let Some(v) = doc.data.get(&f.name) {
4048            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
4049        }
4050    }
4051
4052    // 2. Add pinned fields if missing (needed for outer lookups)
4053    for pinned in pinned_fields {
4054        if !proj.contains_key(pinned) {
4055            if let Some(v) = doc.data.get(pinned) {
4056                proj.insert(pinned.clone(), v.clone());
4057            }
4058        }
4059    }
4060
4061    doc.data = proj;
4062    Ok((doc, deferred))
4063}
4064
4065fn values_equal_db(a: &Value, b: &Value) -> bool {
4066    a == b
4067}
4068
4069pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
4070    let resolved = resolve_if_variable(v, vars);
4071    match resolved {
4072        ast::Value::Int(i) => Ok(Value::Int(*i)),
4073        ast::Value::Float(f) => Ok(Value::Float(*f)),
4074        ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
4075        ast::Value::String(s) => Ok(Value::String(s.clone())),
4076        ast::Value::Enum(s) => Ok(Value::String(s.clone())),
4077        ast::Value::Null => Ok(Value::Null),
4078        ast::Value::Variable(name) => Err(AqlError::new(
4079            ErrorCode::UndefinedVariable,
4080            format!("Variable '{}' not found", name),
4081        )),
4082        ast::Value::Array(arr) => {
4083            let mut vals = Vec::with_capacity(arr.len());
4084            for v in arr {
4085                vals.push(aql_value_to_db_value(v, vars)?);
4086            }
4087            Ok(Value::Array(vals))
4088        }
4089        ast::Value::Object(obj) => {
4090            let mut map = HashMap::with_capacity(obj.len());
4091            for (k, v) in obj {
4092                map.insert(k.clone(), aql_value_to_db_value(v, vars)?);
4093            }
4094            Ok(Value::Object(map))
4095        }
4096    }
4097}
4098
4099/// Convert AQL AST value to serde_json::Value for job payloads
4100fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
4101    match v {
4102        ast::Value::Null => serde_json::Value::Null,
4103        ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
4104        ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
4105        ast::Value::Float(f) => serde_json::Number::from_f64(*f)
4106            .map(serde_json::Value::Number)
4107            .unwrap_or(serde_json::Value::Null),
4108        ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
4109        ast::Value::Array(arr) => {
4110            serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
4111        }
4112        ast::Value::Object(obj) => serde_json::Value::Object(
4113            obj.iter()
4114                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
4115                .collect(),
4116        ),
4117        ast::Value::Variable(_) => serde_json::Value::Null,
4118    }
4119}
4120
4121fn aql_value_to_hashmap(
4122    v: &ast::Value,
4123    vars: &HashMap<String, ast::Value>,
4124) -> Result<HashMap<String, Value>> {
4125    if let ast::Value::Object(m) = resolve_if_variable(v, vars) {
4126        let mut res = HashMap::new();
4127        for (k, v) in m {
4128            res.insert(k.clone(), aql_value_to_db_value(v, vars)?);
4129        }
4130        Ok(res)
4131    } else {
4132        Err(AqlError::new(
4133            ErrorCode::QueryError,
4134            "Data must be object".to_string(),
4135        ))
4136    }
4137}
4138
4139fn aurora_value_to_json_value(v: &Value) -> JsonValue {
4140    match v {
4141        Value::Null => JsonValue::Null,
4142        Value::String(s) => JsonValue::String(s.clone()),
4143        Value::Int(i) => JsonValue::Number((*i).into()),
4144        Value::Float(f) => serde_json::Number::from_f64(*f)
4145            .map(JsonValue::Number)
4146            .unwrap_or(JsonValue::Null),
4147        Value::Bool(b) => JsonValue::Bool(*b),
4148        Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
4149        Value::Object(m) => {
4150            let mut jm = serde_json::Map::new();
4151            for (k, v) in m {
4152                jm.insert(k.clone(), aurora_value_to_json_value(v));
4153            }
4154            JsonValue::Object(jm)
4155        }
4156        Value::Uuid(u) => JsonValue::String(u.to_string()),
4157        Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
4158    }
4159}
4160
4161/// Find an equality filter on an indexed field to allow O(1) lookup short-circuit
4162fn find_indexed_equality_filter(
4163    filter: &AqlFilter,
4164    db: &Aurora,
4165    collection: &str,
4166) -> Option<(String, ast::Value)> {
4167    match filter {
4168        AqlFilter::Eq(field, val) => {
4169            if field == "_sid" || db.has_index(collection, field) {
4170                Some((field.clone(), val.clone()))
4171            } else {
4172                None
4173            }
4174        }
4175        AqlFilter::And(filters) => {
4176            for f in filters {
4177                if let Some(res) = find_indexed_equality_filter(f, db, collection) {
4178                    return Some(res);
4179                }
4180            }
4181            None
4182        }
4183        _ => None,
4184    }
4185}
4186
4187pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
4188    if let ast::Value::Object(m) = v {
4189        let mut fs = Vec::new();
4190        for (k, val) in m {
4191            match k.as_str() {
4192                "or" => {
4193                    if let ast::Value::Array(arr) = val {
4194                        let mut sub = Vec::new();
4195                        for item in arr {
4196                            sub.push(value_to_filter(item)?);
4197                        }
4198                        return Ok(AqlFilter::Or(sub));
4199                    }
4200                }
4201                "and" => {
4202                    if let ast::Value::Array(arr) = val {
4203                        let mut sub = Vec::new();
4204                        for item in arr {
4205                            sub.push(value_to_filter(item)?);
4206                        }
4207                        return Ok(AqlFilter::And(sub));
4208                    }
4209                }
4210                "not" => {
4211                    return Ok(AqlFilter::Not(Box::new(value_to_filter(val)?)));
4212                }
4213                field => {
4214                    if let ast::Value::Object(ops) = val {
4215                        for (op, ov) in ops {
4216                            match op.as_str() {
4217                                "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
4218                                "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
4219                                "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
4220                                "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
4221                                "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
4222                                "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
4223                                "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
4224                                "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
4225                                "contains" => {
4226                                    fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
4227                                }
4228                                "containsAny" => {
4229                                    fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
4230                                }
4231                                "containsAll" => {
4232                                    fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
4233                                }
4234                                "startsWith" => {
4235                                    fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
4236                                }
4237                                "endsWith" => {
4238                                    fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
4239                                }
4240                                "matches" => {
4241                                    fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
4242                                }
4243                                _ => {}
4244                            }
4245                        }
4246                    }
4247                }
4248            }
4249        }
4250        if fs.is_empty() {
4251            Ok(AqlFilter::And(vec![]))
4252        } else if fs.len() == 1 {
4253            Ok(fs.remove(0))
4254        } else {
4255            Ok(AqlFilter::And(fs))
4256        }
4257    } else {
4258        Err(AqlError::new(
4259            ErrorCode::QueryError,
4260            "Filter must be object".to_string(),
4261        ))
4262    }
4263}
4264
4265fn resolve_value(
4266    v: &ast::Value,
4267    vars: &HashMap<String, ast::Value>,
4268    _ctx: &ExecutionContext,
4269) -> ast::Value {
4270    match v {
4271        ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
4272        _ => v.clone(),
4273    }
4274}
4275
4276/// Recursively resolve all variable references inside an AST value.
4277/// Unlike `resolve_value` (which only handles a top-level `$var`), this walks
4278/// into objects and arrays so nested references like
4279/// `{ query: $term }` or `{ status: { eq: $activeStatus } }` are fully resolved.
4280fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
4281    match v {
4282        ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
4283        ast::Value::Object(m) => ast::Value::Object(
4284            m.iter()
4285                .map(|(k, val)| (k.clone(), resolve_ast_deep(val, vars)))
4286                .collect(),
4287        ),
4288        ast::Value::Array(arr) => {
4289            ast::Value::Array(arr.iter().map(|val| resolve_ast_deep(val, vars)).collect())
4290        }
4291        _ => v.clone(),
4292    }
4293}
4294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
    use tempfile::TempDir;

    /// End-to-end smoke test: create a collection, insert one document through
    /// a mutation, then read it back through a query.
    #[tokio::test]
    async fn test_executor_integration() {
        // The temp dir is declared first so it outlives the database handle.
        let tmp = TempDir::new().unwrap();
        let config = AuroraConfig {
            db_path: tmp.path().join("test.db"),
            enable_write_buffering: false,
            durability_mode: DurabilityMode::Strict,
            ..Default::default()
        };
        let db = Aurora::with_config(config).await.unwrap();

        let name_field = crate::types::FieldDefinition {
            field_type: FieldType::SCALAR_STRING,
            unique: false,
            indexed: true,
            nullable: true,
            ..Default::default()
        };
        db.new_collection("users", vec![("name", name_field)])
            .await
            .unwrap();

        let insert =
            r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#;
        let _ = execute(&db, insert, ExecutionOptions::new())
            .await
            .unwrap();

        let outcome = execute(&db, "query { users { name } }", ExecutionOptions::new())
            .await
            .unwrap();
        match outcome {
            ExecutionResult::Query(q) => assert_eq!(q.documents.len(), 1),
            _ => panic!("Expected query"),
        }
    }
}
4339        } else {
4340            panic!("Expected query");
4341        }
4342    }
4343}