Skip to main content

aurora_db/parser/
executor.rs

1//! AQL Executor - Connects parsed AQL to Aurora operations
2//!
3//! Provides the bridge between AQL AST and Aurora's database operations.
4//! All operations (queries, mutations, subscriptions) go through execute().
5
6use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17// chrono used for downsample timestamp bucketing
18#[allow(unused_imports)]
19use chrono::TimeZone as _;
20
21pub type ExecutionContext = HashMap<String, JsonValue>;
22
23/// A pre-compiled query plan that can be executed repeatedly with different variables
24#[derive(Debug, Clone)]
25pub struct QueryPlan {
26    pub collection: String,
27    pub filter: Option<ast::Filter>,
28    pub compiled_filter: Option<CompiledFilter>,
29    pub projection: Vec<ast::Field>,
30    pub limit: Option<usize>,
31    pub offset: usize,
32    pub after: Option<String>,
33    pub orderings: Vec<ast::Ordering>,
34    pub is_connection: bool,
35    pub has_lookups: bool,
36    pub fragments: HashMap<String, FragmentDef>,
37    pub variable_definitions: Vec<ast::VariableDefinition>,
38}
39
40impl QueryPlan {
41    pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
42        validate_required_variables(&self.variable_definitions, provided_variables)
43    }
44
45    pub fn from_query(
46        query: &ast::Query,
47        fragments: &HashMap<String, FragmentDef>,
48        variables: &HashMap<String, ast::Value>,
49    ) -> Result<Vec<Self>> {
50        let root_fields = collect_fields(&query.selection_set, fragments, variables, None)?;
51
52        let mut plans = Vec::new();
53        for field in root_fields {
54            let collection = field.name.clone();
55            let filter = extract_filter_from_args(&field.arguments)?;
56            let compiled_filter = if let Some(ref f) = filter {
57                Some(compile_filter(f)?)
58            } else {
59                None
60            };
61
62            let sub_fields = collect_fields(
63                &field.selection_set,
64                fragments,
65                variables,
66                Some(&field.name),
67            )?;
68
69            let (limit, offset) = extract_pagination(&field.arguments);
70            let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
71            let orderings = extract_order_by(&field.arguments);
72            let is_connection = sub_fields
73                .iter()
74                .any(|f| f.name == "edges" || f.name == "pageInfo");
75
76            let has_lookups = sub_fields.iter().any(|f| {
77                f.arguments.iter().any(|arg| {
78                    arg.name == "collection"
79                        || arg.name == "localField"
80                        || arg.name == "foreignField"
81                })
82            });
83
84            plans.push(QueryPlan {
85                collection,
86                filter,
87                compiled_filter,
88                projection: sub_fields,
89                limit: limit.or(first),
90                offset,
91                after,
92                orderings,
93                is_connection,
94                has_lookups,
95                fragments: fragments.clone(),
96                variable_definitions: query.variable_definitions.clone(),
97            });
98        }
99        Ok(plans)
100    }
101}
102
103/// Execute an AQL query string against the database
104pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
105    use std::collections::hash_map::DefaultHasher;
106    use std::hash::{Hash, Hasher};
107
108    // Compute query hash for cache lookup (ignoring whitespace variations)
109    let query_key = {
110        let mut hasher = DefaultHasher::new();
111        aql.trim().hash(&mut hasher);
112        hasher.finish()
113    };
114
115    // Prepare variables for this execution
116    let vars: HashMap<String, ast::Value> = options
117        .variables
118        .iter()
119        .map(|(k, v)| (k.clone(), json_to_aql_value(v.clone())))
120        .collect();
121
122    // HIGH-SPEED PATH: Check for pre-compiled Query Plan
123    // Search queries are never cached — execute_plan doesn't handle `search:`.
124    if let Some(plan) = db.plan_cache.get(&query_key) {
125        plan.validate(&vars)?;
126        return execute_plan(db, &plan, &vars, &options).await;
127    }
128
129    // SLOW PATH: Parse and Compile
130    let mut doc = super::parse(aql)?;
131
132    // Extract first query operation for planning
133    if let Some(Operation::Query(query)) = doc
134        .operations
135        .iter()
136        .find(|op| matches!(op, Operation::Query(_)))
137    {
138        let fragments: HashMap<String, FragmentDef> = doc
139            .operations
140            .iter()
141            .filter_map(|op| {
142                if let Operation::FragmentDefinition(f) = op {
143                    Some((f.name.clone(), f.clone()))
144                } else {
145                    None
146                }
147            })
148            .collect();
149
150        // Skip the fast plan path for search queries — execute_plan doesn't
151        // handle the `search:` argument; route through execute_document instead.
152        // Use collect_fields() so inline fragments and fragment spreads are
153        // traversed the same way QueryPlan::from_query() traverses them.
154        let root_fields = collect_fields(&query.selection_set, &fragments, &vars, None)?;
155        let has_search = root_fields.iter().any(|f| f.arguments.iter().any(|a| a.name == "search"));
156
157        if !has_search {
158            let plans = QueryPlan::from_query(query, &fragments, &vars)?;
159            if plans.len() == 1 {
160                let plan = Arc::new(plans[0].clone());
161                plan.validate(&vars)?;
162                db.plan_cache.insert(query_key, Arc::clone(&plan));
163
164                return execute_plan(db, &plan, &vars, &options).await;
165            }
166        }
167    }
168
169    // FALLBACK: Normal execution for complex documents (but WITH our prepared vars)
170    // Merge declared variable defaults for any var not explicitly provided.
171    let mut vars = vars;
172    for op in &doc.operations {
173        if let Operation::Query(q) = op {
174            for var_def in &q.variable_definitions {
175                if !vars.contains_key(&var_def.name) {
176                    if let Some(default) = &var_def.default_value {
177                        vars.insert(var_def.name.clone(), default.clone());
178                    }
179                }
180            }
181        }
182    }
183
184    // We must resolve variables in the AST before executing the document path
185    super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
186        let code = match e.code {
187            super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
188            super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
189            _ => ErrorCode::QueryError,
190        };
191        AqlError::new(code, e.to_string())
192    })?;
193
194    execute_document(db, &doc, &options).await
195}
196
197async fn execute_plan(
198    db: &Aurora,
199    plan: &QueryPlan,
200    variables: &HashMap<String, ast::Value>,
201    options: &ExecutionOptions,
202) -> Result<ExecutionResult> {
203    let collection_name = &plan.collection;
204
205    // Determine effective limit for the scan
206    let effective_limit = plan.limit;
207
208    // 1. Resolve Indexed Equality (Fast path)
209    let mut indexed_docs = None;
210    if let Some(ref f) = plan.filter {
211        // We need a version of find_indexed that uses our runtime variables
212        if let Some((field, val)) =
213            find_indexed_equality_filter_runtime(f, db, collection_name, variables)
214        {
215            // Skip the indexed fast path if the index is still being rebuilt —
216            // fall through to the full scan below instead of returning zero results.
217            let index_ready = field == "_sid" || !db.is_index_building(collection_name, &field);
218            if index_ready {
219            let db_val = aql_value_to_db_value(&val, variables)?;
220            let ids = if field == "_sid" {
221                match &db_val {
222                    Value::String(s) => vec![s.clone()],
223                    Value::Uuid(u) => vec![u.to_string()],
224                    _ => vec![],
225                }
226            } else {
227                db.get_ids_from_index(collection_name, &field, &db_val)
228            };
229
230            let mut docs = Vec::with_capacity(ids.len());
231            for id in ids {
232                if let Some(doc) = db.get_document(collection_name, &id)? {
233                    // Check if doc matches the REST of the plan's compiled filter
234                    if let Some(ref cf) = plan.compiled_filter {
235                        if matches_filter(&doc, cf, variables) {
236                            docs.push(doc);
237                        }
238                    } else {
239                        docs.push(doc);
240                    }
241                }
242            }
243
244            indexed_docs = Some(docs);
245            } // end if index_ready — if not ready, indexed_docs stays None → full scan
246        }
247    }
248
249    let mut docs = if let Some(d) = indexed_docs {
250        d
251    } else {
252        // Fallback to scan
253        let vars_arc = Arc::new(variables.clone());
254        let cf_clone = plan.compiled_filter.clone();
255        let filter_fn = move |doc: &Document| {
256            cf_clone
257                .as_ref()
258                .map(|f| matches_filter(doc, f, &vars_arc))
259                .unwrap_or(true)
260        };
261
262        // Scan-limit is only valid when there is no cursor and no ordering —
263        // orderings require a full scan to find the correct top-N.
264        let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
265            None
266        } else {
267            plan.limit.map(|l| {
268                let base = if plan.is_connection { l + 1 } else { l };
269                base + plan.offset
270            })
271        };
272        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
273    };
274
275    // 2. Sort (must happen before cursor drain and offset)
276    if !plan.orderings.is_empty() {
277        apply_ordering(&mut docs, &plan.orderings);
278    }
279
280    // 2a. Cursor pagination: skip past the `after` cursor
281    if let Some(ref cursor) = plan.after {
282        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
283            docs.drain(0..=pos);
284        }
285    }
286
287    // 2b. Apply offset
288    if plan.offset > 0 {
289        if plan.offset < docs.len() {
290            docs.drain(0..plan.offset);
291        } else {
292            docs.clear();
293        }
294    }
295
296    // 3. Handle Connection vs Flat List
297    if plan.is_connection {
298        return Ok(ExecutionResult::Query(execute_connection(
299            docs,
300            &plan.projection,
301            effective_limit,
302            &plan.fragments,
303            variables,
304            options,
305        )));
306    }
307
308    // 3a. Apply limit for flat list (after ordering so we get correct top-N)
309    if let Some(l) = effective_limit {
310        docs.truncate(l);
311    }
312
313    // 4. Project (Flat List)
314    if options.apply_projections && !plan.projection.is_empty() {
315        // Check if we have an aggregate or groupBy field
316        let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
317        let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");
318
319        if let Some(f) = group_by_field {
320            // Handle GroupBy
321            let field_name = f
322                .arguments
323                .iter()
324                .find(|a| a.name == "field")
325                .and_then(|a| match &a.value {
326                    ast::Value::String(s) => Some(s),
327                    _ => None,
328                });
329
330            if let Some(group_field) = field_name {
331                let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
332                for d in docs {
333                    let key = d
334                        .data
335                        .get(group_field)
336                        .map(|v| match v {
337                            Value::String(s) => s.clone(),
338                            _ => v.to_string(),
339                        })
340                        .unwrap_or_else(|| "null".to_string());
341                    groups.entry(key).or_default().push(d);
342                }
343
344                let mut group_docs = Vec::with_capacity(groups.len());
345                for (key, group_items) in groups {
346                    let mut data = HashMap::new();
347                    for selection in &f.selection_set {
348                        if let Selection::Field(sub_f) = selection {
349                            let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
350                            match sub_f.name.as_str() {
351                                "key" => {
352                                    data.insert(alias.clone(), Value::String(key.clone()));
353                                }
354                                "count" => {
355                                    data.insert(
356                                        alias.clone(),
357                                        Value::Int(group_items.len() as i64),
358                                    );
359                                }
360                                "nodes" => {
361                                    let sub_fields = collect_fields(
362                                        &sub_f.selection_set,
363                                        &plan.fragments,
364                                        variables,
365                                        None,
366                                    )
367                                    .unwrap_or_default();
368
369                                    let projected_nodes = group_items
370                                        .iter()
371                                        .map(|d| {
372                                            let node_data =
373                                                apply_projection(d.clone(), &sub_fields, options).data;
374                                            Value::Object(node_data)
375                                        })
376                                        .collect();
377                                    data.insert(alias.clone(), Value::Array(projected_nodes));
378                                }
379                                "aggregate" => {
380                                    let agg_res =
381                                        compute_aggregates(&group_items, &sub_f.selection_set);
382                                    data.insert(alias.clone(), Value::Object(agg_res));
383                                }
384                                _ => {}
385                            }
386                        }
387                    }
388                    group_docs.push(Document {
389                        _sid: format!("group:{}", key),
390                        data,
391                    });
392                }
393                docs = group_docs;
394            }
395        } else if let Some(agg) = agg_field {
396            // If aggregate is the ONLY field, we return a single summary document
397            // If there are other fields, we currently follow the behavior of adding the agg to each doc
398            // (Test expectations suggest a single doc if only agg is asked)
399            let agg_result = compute_aggregates(&docs, &agg.selection_set);
400            let alias = agg.alias.as_ref().unwrap_or(&agg.name);
401
402            if plan.projection.len() == 1 {
403                let mut data = HashMap::new();
404                data.insert(alias.clone(), Value::Object(agg_result));
405                docs = vec![Document {
406                    _sid: "aggregate".to_string(),
407                    data,
408                }];
409            } else {
410                // Add aggregate to each document
411                docs = docs
412                    .into_iter()
413                    .map(|mut d| {
414                        d.data
415                            .insert(alias.clone(), Value::Object(agg_result.clone()));
416                        apply_projection(d, &plan.projection, options)
417                    })
418                    .collect();
419            }
420        } else if !plan.has_lookups {
421            docs = docs
422                .into_iter()
423                .map(|d| apply_projection(d, &plan.projection, options))
424                .collect();
425        } else {
426            // Slow path for lookups
427            let mut projected = Vec::with_capacity(docs.len());
428            for d in docs {
429                let (proj_doc, _) =
430                    apply_projection_with_lookups(db, d, &plan.projection, variables, options).await?;
431                projected.push(proj_doc);
432            }
433            docs = projected;
434        }
435    }
436
437    Ok(ExecutionResult::Query(QueryResult {
438        collection: collection_name.clone(),
439        documents: docs,
440        total_count: None,
441        deferred_fields: vec![],
442        explain: None,
443    }))
444}
445
446fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
447    let mut results = HashMap::new();
448
449    for selection in selections {
450        if let Selection::Field(f) = selection {
451            let alias = f.alias.as_ref().unwrap_or(&f.name);
452            let value = match f.name.as_str() {
453                "count" => Value::Int(docs.len() as i64),
454                "sum" => {
455                    let field =
456                        f.arguments
457                            .iter()
458                            .find(|a| a.name == "field")
459                            .and_then(|a| match &a.value {
460                                ast::Value::String(s) => Some(s),
461                                _ => None,
462                            });
463
464                    if let Some(field_name) = field {
465                        let sum: f64 = docs
466                            .iter()
467                            .filter_map(|d| d.data.get(field_name))
468                            .filter_map(|v| match v {
469                                Value::Int(i) => Some(*i as f64),
470                                Value::Float(f) => Some(*f),
471                                _ => None,
472                            })
473                            .sum();
474                        Value::Float(sum)
475                    } else {
476                        Value::Null
477                    }
478                }
479                "avg" => {
480                    let field =
481                        f.arguments
482                            .iter()
483                            .find(|a| a.name == "field")
484                            .and_then(|a| match &a.value {
485                                ast::Value::String(s) => Some(s),
486                                _ => None,
487                            });
488
489                    if let Some(field_name) = field
490                        && !docs.is_empty()
491                    {
492                        let values: Vec<f64> = docs
493                            .iter()
494                            .filter_map(|d| d.data.get(field_name))
495                            .filter_map(|v| match v {
496                                Value::Int(i) => Some(*i as f64),
497                                Value::Float(f) => Some(*f),
498                                _ => None,
499                            })
500                            .collect();
501
502                        if values.is_empty() {
503                            Value::Null
504                        } else {
505                            let sum: f64 = values.iter().sum();
506                            Value::Float(sum / values.len() as f64)
507                        }
508                    } else {
509                        Value::Null
510                    }
511                }
512                "min" => {
513                    let field =
514                        f.arguments
515                            .iter()
516                            .find(|a| a.name == "field")
517                            .and_then(|a| match &a.value {
518                                ast::Value::String(s) => Some(s),
519                                _ => None,
520                            });
521
522                    if let Some(field_name) = field
523                        && !docs.is_empty()
524                    {
525                        let min = docs
526                            .iter()
527                            .filter_map(|d| d.data.get(field_name))
528                            .filter_map(|v| match v {
529                                Value::Int(i) => Some(*i as f64),
530                                Value::Float(f) => Some(*f),
531                                _ => None,
532                            })
533                            .fold(f64::INFINITY, f64::min);
534
535                        if min == f64::INFINITY {
536                            Value::Null
537                        } else {
538                            Value::Float(min)
539                        }
540                    } else {
541                        Value::Null
542                    }
543                }
544                "max" => {
545                    let field =
546                        f.arguments
547                            .iter()
548                            .find(|a| a.name == "field")
549                            .and_then(|a| match &a.value {
550                                ast::Value::String(s) => Some(s),
551                                _ => None,
552                            });
553
554                    if let Some(field_name) = field
555                        && !docs.is_empty()
556                    {
557                        let max = docs
558                            .iter()
559                            .filter_map(|d| d.data.get(field_name))
560                            .filter_map(|v| match v {
561                                Value::Int(i) => Some(*i as f64),
562                                Value::Float(f) => Some(*f),
563                                _ => None,
564                            })
565                            .fold(f64::NEG_INFINITY, f64::max);
566
567                        if max == f64::NEG_INFINITY {
568                            Value::Null
569                        } else {
570                            Value::Float(max)
571                        }
572                    } else {
573                        Value::Null
574                    }
575                }
576                _ => Value::Null,
577            };
578            results.insert(alias.clone(), value);
579        }
580    }
581
582    results
583}
584
585fn find_indexed_equality_filter_runtime(
586    filter: &ast::Filter,
587    db: &Aurora,
588    collection: &str,
589    variables: &HashMap<String, ast::Value>,
590) -> Option<(String, ast::Value)> {
591    match filter {
592        ast::Filter::Eq(field, val) => {
593            if field == "_sid" || db.has_index(collection, field) {
594                let resolved = resolve_if_variable(val, variables);
595                return Some((field.clone(), resolved.clone()));
596            }
597        }
598        ast::Filter::And(filters) => {
599            for f in filters {
600                if let Some(res) =
601                    find_indexed_equality_filter_runtime(f, db, collection, variables)
602                {
603                    return Some(res);
604                }
605            }
606        }
607        _ => {}
608    }
609    None
610}
611
612/// Helper to flatten a selection set (processing fragments) into a list of fields
613fn collect_fields(
614    selection_set: &[Selection],
615    fragments: &HashMap<String, FragmentDef>,
616    variable_values: &HashMap<String, ast::Value>,
617    parent_type: Option<&str>,
618) -> Result<Vec<Field>> {
619    let mut fields = Vec::new();
620
621    for selection in selection_set {
622        match selection {
623            Selection::Field(field) => {
624                if should_include(&field.directives, variable_values)? {
625                    fields.push(field.clone());
626                }
627            }
628            Selection::FragmentSpread(name) => {
629                if let Some(fragment) = fragments.get(name) {
630                    let type_match = if let Some(parent) = parent_type {
631                        parent == fragment.type_condition
632                    } else {
633                        true
634                    };
635
636                    if type_match {
637                        let fragment_fields = collect_fields(
638                            &fragment.selection_set,
639                            fragments,
640                            variable_values,
641                            parent_type,
642                        )?;
643                        fields.extend(fragment_fields);
644                    }
645                }
646            }
647            Selection::InlineFragment(inline) => {
648                let type_match = if let Some(parent) = parent_type {
649                    parent == inline.type_condition
650                } else {
651                    true
652                };
653
654                if type_match {
655                    let inline_fields = collect_fields(
656                        &inline.selection_set,
657                        fragments,
658                        variable_values,
659                        parent_type,
660                    )?;
661                    fields.extend(inline_fields);
662                }
663            }
664        }
665    }
666
667    Ok(fields)
668}
669
670/// Check if a field/fragment should be included based on @skip/@include
671fn should_include(
672    directives: &[ast::Directive],
673    variables: &HashMap<String, ast::Value>,
674) -> Result<bool> {
675    for dir in directives {
676        if dir.name == "skip" {
677            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
678                let should_skip = resolve_boolean_arg(&arg.value, variables)?;
679                if should_skip {
680                    return Ok(false);
681                }
682            }
683        } else if dir.name == "include" {
684            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
685                let should_include = resolve_boolean_arg(&arg.value, variables)?;
686                if !should_include {
687                    return Ok(false);
688                }
689            }
690        }
691    }
692    Ok(true)
693}
694
695fn resolve_boolean_arg(
696    value: &ast::Value,
697    variables: &HashMap<String, ast::Value>,
698) -> Result<bool> {
699    match value {
700        ast::Value::Boolean(b) => Ok(*b),
701        ast::Value::Variable(name) => {
702            if let Some(val) = variables.get(name) {
703                match val {
704                    ast::Value::Boolean(b) => Ok(*b),
705                    _ => Err(AqlError::new(
706                        ErrorCode::TypeError,
707                        format!("Variable '{}' is not a boolean, got {:?}", name, val),
708                    )),
709                }
710            } else {
711                Err(AqlError::new(
712                    ErrorCode::UndefinedVariable,
713                    format!("Variable '{}' is not defined", name),
714                ))
715            }
716        }
717        _ => Err(AqlError::new(
718            ErrorCode::TypeError,
719            format!("Expected boolean value, got {:?}", value),
720        )),
721    }
722}
723
724/// Validate that all required variables are provided
725fn validate_required_variables(
726    variable_definitions: &[ast::VariableDefinition],
727    provided_variables: &HashMap<String, ast::Value>,
728) -> Result<()> {
729    for var_def in variable_definitions {
730        if var_def.var_type.is_required {
731            if !provided_variables.contains_key(&var_def.name) {
732                if var_def.default_value.is_none() {
733                    return Err(AqlError::new(
734                        ErrorCode::UndefinedVariable,
735                        format!(
736                            "Required variable '{}' (type: {}{}) is not provided",
737                            var_def.name,
738                            var_def.var_type.name,
739                            if var_def.var_type.is_required {
740                                "!"
741                            } else {
742                                ""
743                            }
744                        ),
745                    ));
746                }
747            }
748        }
749    }
750    Ok(())
751}
752
753/// Result of executing an AQL operation
754#[derive(Debug)]
755pub enum ExecutionResult {
756    Query(QueryResult),
757    Mutation(MutationResult),
758    Subscription(SubscriptionResult),
759    Batch(Vec<ExecutionResult>),
760    Schema(SchemaResult),
761    Migration(MigrationResult),
762}
763
764#[derive(Debug, Clone)]
765pub struct SchemaResult {
766    pub operation: String,
767    pub collection: String,
768    pub status: String,
769}
770
771#[derive(Debug, Clone)]
772pub struct MigrationResult {
773    pub version: String,
774    pub steps_applied: usize,
775    pub status: String,
776}
777
778#[derive(Debug, Clone, Serialize)]
779pub struct ExecutionPlan {
780    pub operations: Vec<String>,
781    pub estimated_cost: f64,
782}
783
784/// Query execution result
785#[derive(Debug, Clone)]
786pub struct QueryResult {
787    pub collection: String,
788    pub documents: Vec<Document>,
789    pub total_count: Option<usize>,
790    /// Fields marked @defer — omitted from documents, listed here
791    pub deferred_fields: Vec<String>,
792    /// Explain metadata when @explain is present on the query
793    pub explain: Option<ExplainResult>,
794}
795
796/// Execution plan metadata returned by @explain
797#[derive(Debug, Clone, Default)]
798pub struct ExplainResult {
799    pub collection: String,
800    pub docs_scanned: usize,
801    pub index_used: bool,
802    pub elapsed_ms: u128,
803}
804
805/// Mutation execution result
806#[derive(Debug, Clone)]
807pub struct MutationResult {
808    pub operation: String,
809    pub collection: String,
810    pub affected_count: usize,
811    pub returned_documents: Vec<Document>,
812}
813
814/// Subscription result
815#[derive(Debug)]
816pub struct SubscriptionResult {
817    pub subscription_id: String,
818    pub collection: String,
819    pub stream: Option<crate::pubsub::ChangeListener>,
820}
821
822/// Execution options
823#[derive(Debug, Clone)]
824pub struct ExecutionOptions {
825    pub skip_validation: bool,
826    pub apply_projections: bool,
827    pub debug_audit: bool,
828    pub variables: HashMap<String, JsonValue>,
829}
830
831impl Default for ExecutionOptions {
832    fn default() -> Self {
833        Self {
834            skip_validation: false,
835            apply_projections: true,
836            debug_audit: false,
837            variables: HashMap::new(),
838        }
839    }
840}
841
842impl ExecutionOptions {
843    pub fn new() -> Self {
844        Self::default()
845    }
846
847    pub fn with_variables(mut self, vars: HashMap<String, JsonValue>) -> Self {
848        self.variables = vars;
849        self
850    }
851
852    pub fn skip_validation(mut self) -> Self {
853        self.skip_validation = true;
854        self
855    }
856}
857
858fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
859    match v {
860        serde_json::Value::Null => ast::Value::Null,
861        serde_json::Value::Bool(b) => ast::Value::Boolean(b),
862        serde_json::Value::Number(n) => {
863            if let Some(i) = n.as_i64() {
864                ast::Value::Int(i)
865            } else if let Some(f) = n.as_f64() {
866                ast::Value::Float(f)
867            } else {
868                ast::Value::Null
869            }
870        }
871        serde_json::Value::String(s) => ast::Value::String(s),
872        serde_json::Value::Array(arr) => {
873            ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
874        }
875        serde_json::Value::Object(map) => ast::Value::Object(
876            map.into_iter()
877                .map(|(k, v)| (k, json_to_aql_value(v)))
878                .collect(),
879        ),
880    }
881}
882
883/// Execute a parsed AQL document
884pub async fn execute_document(
885    db: &Aurora,
886    doc: &ast::Document,
887    options: &ExecutionOptions,
888) -> Result<ExecutionResult> {
889    if doc.operations.is_empty() {
890        return Err(AqlError::new(
891            ErrorCode::QueryError,
892            "No operations in document".to_string(),
893        ));
894    }
895
896    let vars: HashMap<String, ast::Value> = options
897        .variables
898        .iter()
899        .map(|(k, v)| (k.clone(), json_to_aql_value(v.clone())))
900        .collect();
901
902    let fragments: HashMap<String, FragmentDef> = doc
903        .operations
904        .iter()
905        .filter_map(|op| {
906            if let Operation::FragmentDefinition(frag) = op {
907                Some((frag.name.clone(), frag.clone()))
908            } else {
909                None
910            }
911        })
912        .collect();
913
914    let executable_ops: Vec<&Operation> = doc
915        .operations
916        .iter()
917        .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
918        .collect();
919
920    if executable_ops.is_empty() {
921        return Err(AqlError::new(
922            ErrorCode::QueryError,
923            "No executable operations in document".to_string(),
924        ));
925    }
926
927    if executable_ops.len() == 1 {
928        execute_operation(db, executable_ops[0], &vars, options, &fragments).await
929    } else {
930        let mut results = Vec::new();
931        for op in executable_ops {
932            results.push(execute_operation(db, op, &vars, options, &fragments).await?);
933        }
934        Ok(ExecutionResult::Batch(results))
935    }
936}
937
938async fn execute_operation(
939    db: &Aurora,
940    op: &Operation,
941    vars: &HashMap<String, ast::Value>,
942    options: &ExecutionOptions,
943    fragments: &HashMap<String, FragmentDef>,
944) -> Result<ExecutionResult> {
945    match op {
946        Operation::Query(query) => execute_query(db, query, vars, options, fragments).await,
947        Operation::Mutation(mutation) => {
948            execute_mutation(db, mutation, vars, options, fragments).await
949        }
950        Operation::Subscription(sub) => execute_subscription(db, sub, vars, options).await,
951        Operation::Schema(schema) => execute_schema(db, schema, options).await,
952        Operation::Migration(migration) => execute_migration(db, migration, options).await,
953        Operation::Introspection(intro) => execute_introspection(db, intro).await,
954        Operation::Handler(handler) => execute_handler_registration(db, handler, options).await,
955        _ => Ok(ExecutionResult::Query(QueryResult {
956            collection: String::new(),
957            documents: vec![],
958            total_count: None,
959            deferred_fields: vec![],
960            explain: None,
961        })),
962    }
963}
964
965async fn execute_query(
966    db: &Aurora,
967    query: &ast::Query,
968    vars: &HashMap<String, ast::Value>,
969    options: &ExecutionOptions,
970    fragments: &HashMap<String, FragmentDef>,
971) -> Result<ExecutionResult> {
972    validate_required_variables(&query.variable_definitions, vars)?;
973    let has_explain = query.directives.iter().any(|d| d.name == "explain");
974    let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;
975    let mut results = Vec::new();
976    for field in &root_fields {
977        let sub_fields = collect_fields(&field.selection_set, fragments, vars, Some(&field.name))?;
978        let start = std::time::Instant::now();
979        let mut result =
980            execute_collection_query(db, field, &sub_fields, vars, options, fragments).await?;
981        if has_explain {
982            let elapsed_ms = start.elapsed().as_millis();
983            let index_used =
984                field.arguments.iter().any(|a| a.name == "where") && !result.documents.is_empty();
985            result.explain = Some(ExplainResult {
986                collection: result.collection.clone(),
987                docs_scanned: result.documents.len(),
988                index_used,
989                elapsed_ms,
990            });
991        }
992        results.push(result);
993    }
994    if results.len() == 1 {
995        Ok(ExecutionResult::Query(results.remove(0)))
996    } else {
997        Ok(ExecutionResult::Batch(
998            results.into_iter().map(ExecutionResult::Query).collect(),
999        ))
1000    }
1001}
1002
1003async fn execute_collection_query(
1004    db: &Aurora,
1005    field: &ast::Field,
1006    sub_fields: &[ast::Field],
1007    variables: &HashMap<String, ast::Value>,
1008    options: &ExecutionOptions,
1009    fragments: &HashMap<String, FragmentDef>,
1010) -> Result<QueryResult> {
1011    let collection_name = &field.name;
1012
1013    // ── Search args ────────────────────────────────────────────────────────
1014    // `search: { query: "...", fields: [...], fuzzy: true }` overrides normal scan.
1015    if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
1016        return execute_search_query(
1017            db,
1018            collection_name,
1019            search_arg,
1020            sub_fields,
1021            field,
1022            variables,
1023            options,
1024        )
1025        .await;
1026    }
1027
1028    let filter = extract_filter_from_args(&field.arguments)?;
1029    let (limit, offset) = extract_pagination(&field.arguments);
1030    let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
1031    let compiled_filter = if let Some(ref f) = filter {
1032        Some(compile_filter(f)?)
1033    } else {
1034        None
1035    };
1036    let vars_arc = Arc::new(variables.clone());
1037    let filter_fn = move |doc: &Document| {
1038        compiled_filter
1039            .as_ref()
1040            .map(|f| matches_filter(doc, f, &vars_arc))
1041            .unwrap_or(true)
1042    };
1043
1044    let indexed_docs = if let Some(ref f) = filter {
1045        match find_indexed_equality_filter(f, db, collection_name) {
1046            Some((field_name, val))
1047                if field_name == "_sid"
1048                    || !db.is_index_building(collection_name, &field_name) =>
1049            {
1050                let db_val = aql_value_to_db_value(&val, variables)?;
1051                let ids = if field_name == "_sid" {
1052                    match &db_val {
1053                        Value::String(s) => vec![s.clone()],
1054                        Value::Uuid(u) => vec![u.to_string()],
1055                        _ => vec![],
1056                    }
1057                } else {
1058                    db.get_ids_from_index(collection_name, &field_name, &db_val)
1059                };
1060
1061                let mut docs = Vec::with_capacity(ids.len());
1062                for id in ids {
1063                    if let Some(doc) = db.get_document(collection_name, &id)? {
1064                        if filter_fn(&doc) {
1065                            docs.push(doc);
1066                        }
1067                    }
1068                }
1069                Some(docs)
1070            }
1071            // Index is building or no indexed filter found — full scan.
1072            _ => None,
1073        }
1074    } else {
1075        None
1076    };
1077
1078    let is_connection = sub_fields
1079        .iter()
1080        .any(|f| f.name == "edges" || f.name == "pageInfo");
1081
1082    let orderings = extract_order_by(&field.arguments);
1083
1084    let mut docs = if let Some(docs) = indexed_docs {
1085        docs
1086    } else {
1087        //only valid when there is no cursor (after) and no
1088        // ordering.  With ordering we must see ALL docs before we can pick the top-N;
1089        // with a cursor we don't know how far into the result set the cursor lies.
1090        let scan_limit = if after.is_some() || !orderings.is_empty() {
1091            None
1092        } else {
1093            limit.or(first).map(|l| {
1094                let base = if is_connection { l + 1 } else { l };
1095                base + offset
1096            })
1097        };
1098        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
1099    };
1100
1101    // Validation args
1102    // `validate: { fieldName: { format: "email" } }` filters out non-conforming docs
1103    if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
1104        docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
1105    }
1106
1107    // Sort before any pagination so offset/cursor operate on ordered results.
1108    if !orderings.is_empty() {
1109        apply_ordering(&mut docs, &orderings);
1110    }
1111
1112    // Cursor pagination: skip past the `after` cursor before slicing
1113    if let Some(ref cursor) = after {
1114        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
1115            docs.drain(0..=pos);
1116        }
1117    }
1118
1119    if is_connection {
1120        return Ok(execute_connection(
1121            docs,
1122            sub_fields,
1123            limit.or(first),
1124            fragments,
1125            variables,
1126            options,
1127        ));
1128    }
1129
1130    // Offset pagination: skip the first `offset` documents
1131    if offset > 0 {
1132        if offset < docs.len() {
1133            docs.drain(0..offset);
1134        } else {
1135            docs.clear();
1136        }
1137    }
1138
1139    // Apply limit after offset
1140    if let Some(l) = limit.or(first) {
1141        docs.truncate(l);
1142    }
1143
1144    // Projections & Lookups
1145    let has_lookups = sub_fields.iter().any(|f| {
1146        f.arguments
1147            .iter()
1148            .any(|a| a.name == "collection" || a.name == "localField")
1149    });
1150
1151    let mut deferred_fields = Vec::new();
1152
1153    if options.apply_projections && !sub_fields.is_empty() {
1154        if has_lookups {
1155            let mut projected = Vec::with_capacity(docs.len());
1156            for d in docs {
1157                let (proj_doc, deferred) =
1158                    apply_projection_with_lookups(db, d, sub_fields, variables, options).await?;
1159                projected.push(proj_doc);
1160                if deferred_fields.is_empty() {
1161                    deferred_fields = deferred;
1162                }
1163            }
1164            docs = projected;
1165        } else {
1166            let mut all_deferred = Vec::new();
1167            docs = docs
1168                .into_iter()
1169                .map(|d| {
1170                    let (proj, deferred) = apply_projection_and_defer(d, sub_fields, options);
1171                    if all_deferred.is_empty() && !deferred.is_empty() {
1172                        all_deferred = deferred;
1173                    }
1174                    proj
1175                })
1176                .collect();
1177            deferred_fields = all_deferred;
1178        }
1179    }
1180
1181    // ── Window functions ───────────────────────────────────────────────────
1182    for sf in sub_fields {
1183        if sf.name == "windowFunc" {
1184            let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
1185            let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
1186            let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
1187            let wsize = arg_i64(&sf.arguments, "windowSize").unwrap_or(3) as usize;
1188            apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
1189        }
1190    }
1191
1192    // ── Downsample ─────────────────────────────────────────────────────────
1193    if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
1194        let interval =
1195            arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
1196        let aggregation =
1197            arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
1198        let ds_sub: Vec<String> =
1199            collect_fields(&ds_field.selection_set, fragments, variables, None)
1200                .unwrap_or_default()
1201                .iter()
1202                .map(|f| f.name.clone())
1203                .collect();
1204        docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
1205    }
1206
1207    Ok(QueryResult {
1208        collection: collection_name.clone(),
1209        documents: docs,
1210        total_count: None,
1211        deferred_fields,
1212        explain: None,
1213    })
1214}
1215
1216/// Execute a search query using SearchBuilder
1217async fn execute_search_query(
1218    db: &Aurora,
1219    collection: &str,
1220    search_arg: &ast::Argument,
1221    sub_fields: &[ast::Field],
1222    field: &ast::Field,
1223    variables: &HashMap<String, ast::Value>,
1224    options: &ExecutionOptions,
1225) -> Result<QueryResult> {
1226    // Deeply resolve variable references inside the search argument (e.g. query: $term)
1227    let resolved_search_val = resolve_ast_deep(&search_arg.value, variables);
1228    let (query_str, search_fields, fuzzy) = extract_search_params(&resolved_search_val);
1229    let (limit, _) = extract_pagination(&field.arguments);
1230
1231    let mut builder = db.search(collection).query(&query_str);
1232    if fuzzy {
1233        builder = builder.fuzzy(1);
1234    }
1235    if let Some(l) = limit {
1236        builder = builder.limit(l);
1237    }
1238
1239    let mut docs = builder
1240        .collect_with_fields(if search_fields.is_empty() {
1241            None
1242        } else {
1243            Some(&search_fields)
1244        })
1245        .await?;
1246
1247    if options.apply_projections && !sub_fields.is_empty() {
1248        docs = docs
1249            .into_iter()
1250            .map(|d| {
1251                let (proj, _) = apply_projection_and_defer(d, sub_fields, options);
1252                proj
1253            })
1254            .collect();
1255    }
1256
1257    Ok(QueryResult {
1258        collection: collection.to_string(),
1259        documents: docs,
1260        total_count: None,
1261        deferred_fields: vec![],
1262        explain: None,
1263    })
1264}
1265
1266fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
1267    let mut query = String::new();
1268    let mut fields = Vec::new();
1269    let mut fuzzy = false;
1270    if let ast::Value::Object(m) = v {
1271        if let Some(ast::Value::String(q)) = m.get("query") {
1272            query = q.clone();
1273        }
1274        if let Some(ast::Value::Array(arr)) = m.get("fields") {
1275            for item in arr {
1276                if let ast::Value::String(s) = item {
1277                    fields.push(s.clone());
1278                }
1279            }
1280        }
1281        if let Some(ast::Value::Boolean(b)) = m.get("fuzzy") {
1282            fuzzy = *b;
1283        }
1284    }
1285    (query, fields, fuzzy)
1286}
1287
1288fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
1289    if let ast::Value::Object(rules) = &validate_arg.value {
1290        for (field_name, constraints_val) in rules {
1291            if let ast::Value::Object(constraints) = constraints_val {
1292                if let Some(field_val) = doc.data.get(field_name) {
1293                    for (constraint_name, constraint_val) in constraints {
1294                        if !check_inline_constraint(field_val, constraint_name, constraint_val) {
1295                            return false;
1296                        }
1297                    }
1298                }
1299            }
1300        }
1301    }
1302    true
1303}
1304
1305fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
1306    match constraint {
1307        "format" => {
1308            if let (Value::String(s), ast::Value::String(fmt)) = (value, constraint_val) {
1309                return match fmt.as_str() {
1310                    "email" => {
1311                        s.contains('@')
1312                            && s.split('@')
1313                                .nth(1)
1314                                .map(|d| d.contains('.'))
1315                                .unwrap_or(false)
1316                    }
1317                    "url" => s.starts_with("http://") || s.starts_with("https://"),
1318                    "uuid" => uuid::Uuid::parse_str(s).is_ok(),
1319                    _ => true,
1320                };
1321            }
1322            true
1323        }
1324        "min" => {
1325            let n = match value {
1326                Value::Int(i) => *i as f64,
1327                Value::Float(f) => *f,
1328                _ => return true,
1329            };
1330            let min = match constraint_val {
1331                ast::Value::Float(f) => *f,
1332                ast::Value::Int(i) => *i as f64,
1333                _ => return true,
1334            };
1335            n >= min
1336        }
1337        "max" => {
1338            let n = match value {
1339                Value::Int(i) => *i as f64,
1340                Value::Float(f) => *f,
1341                _ => return true,
1342            };
1343            let max = match constraint_val {
1344                ast::Value::Float(f) => *f,
1345                ast::Value::Int(i) => *i as f64,
1346                _ => return true,
1347            };
1348            n <= max
1349        }
1350        "minLength" => {
1351            if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1352                return s.len() >= *n as usize;
1353            }
1354            true
1355        }
1356        "maxLength" => {
1357            if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1358                return s.len() <= *n as usize;
1359            }
1360            true
1361        }
1362        "pattern" => {
1363            if let (Value::String(s), ast::Value::String(pat)) = (value, constraint_val) {
1364                if let Ok(re) = regex::Regex::new(pat) {
1365                    return re.is_match(s);
1366                }
1367            }
1368            true
1369        }
1370        _ => true,
1371    }
1372}
1373
1374fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
1375    args.iter().find(|a| a.name == name).and_then(|a| {
1376        if let ast::Value::String(s) = &a.value {
1377            Some(s.clone())
1378        } else {
1379            None
1380        }
1381    })
1382}
1383
1384fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
1385    args.iter().find(|a| a.name == name).and_then(|a| {
1386        if let ast::Value::Int(i) = &a.value {
1387            Some(*i)
1388        } else {
1389            None
1390        }
1391    })
1392}
1393
1394/// Apply projection and collect @defer'd field names separately
1395fn apply_projection_and_defer(
1396    mut doc: Document,
1397    fields: &[ast::Field],
1398    _options: &ExecutionOptions,
1399) -> (Document, Vec<String>) {
1400    if fields.is_empty() {
1401        return (doc, vec![]);
1402    }
1403    let mut proj = HashMap::new();
1404    let mut deferred = Vec::new();
1405
1406    for f in fields {
1407        // @defer directive: omit from response, record field name
1408        if f.directives.iter().any(|d| d.name == "defer") {
1409            deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
1410            continue;
1411        }
1412        // Computed field: name starts with __compute__ sentinel
1413        if f.name == "__compute__" {
1414            let alias = f.alias.as_deref().unwrap_or("computed");
1415            if let Some(expr) = f.arguments.iter().find(|a| a.name == "expr") {
1416                if let ast::Value::String(template) = &expr.value {
1417                    let result = eval_template(template, &doc.data);
1418                    proj.insert(alias.to_string(), Value::String(result));
1419                }
1420            }
1421            continue;
1422        }
1423        if f.name == "id" {
1424            // User requested 'id'. Pull from data strictly.
1425            if let Some(v) = doc.data.get("id") {
1426                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1427            } else {
1428                // Not in data, return null
1429                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Null);
1430            }
1431        } else if f.name == "_sid" {
1432            proj.insert(
1433                f.alias.as_ref().unwrap_or(&f.name).clone(),
1434                Value::String(doc._sid.clone()),
1435            );
1436        } else if let Some(v) = doc.data.get(&f.name) {
1437            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1438        }
1439    }
1440    doc.data = proj;
1441    (doc, deferred)
1442}
1443
1444/// Simple `$fieldName` template evaluator
1445fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
1446    let mut result = template.to_string();
1447    for (k, v) in data {
1448        let placeholder = format!("${}", k);
1449        if result.contains(&placeholder) {
1450            result = result.replace(&placeholder, &v.to_string());
1451        }
1452    }
1453    result
1454}
1455
1456/// Apply window function (rollingAvg, rollingSum, rollingMin, rollingMax) on the docs Vec
1457fn apply_window_function(
1458    docs: &mut Vec<Document>,
1459    alias: &str,
1460    field: &str,
1461    function: &str,
1462    window: usize,
1463) {
1464    if docs.is_empty() || window == 0 {
1465        return;
1466    }
1467    let values: Vec<Option<f64>> = docs
1468        .iter()
1469        .map(|d| match d.data.get(field) {
1470            Some(Value::Int(i)) => Some(*i as f64),
1471            Some(Value::Float(f)) => Some(*f),
1472            _ => None,
1473        })
1474        .collect();
1475
1476    for (i, doc) in docs.iter_mut().enumerate() {
1477        let start = if i + 1 >= window { i + 1 - window } else { 0 };
1478        let window_vals: Vec<f64> = values[start..=i].iter().filter_map(|v| *v).collect();
1479        if window_vals.is_empty() {
1480            continue;
1481        }
1482        let result = match function {
1483            "rollingAvg" | "avg" => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
1484            "rollingSum" | "sum" => window_vals.iter().sum::<f64>(),
1485            "rollingMin" | "min" => window_vals.iter().cloned().fold(f64::INFINITY, f64::min),
1486            "rollingMax" | "max" => window_vals
1487                .iter()
1488                .cloned()
1489                .fold(f64::NEG_INFINITY, f64::max),
1490            _ => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
1491        };
1492        doc.data.insert(alias.to_string(), Value::Float(result));
1493    }
1494}
1495
1496/// Downsample time-series: bucket docs by interval, aggregate value fields
1497fn apply_downsample(
1498    docs: Vec<Document>,
1499    interval: &str,
1500    aggregation: &str,
1501    value_fields: &[String],
1502) -> Vec<Document> {
1503    let interval_secs: i64 = parse_interval(interval);
1504    if interval_secs <= 0 {
1505        return docs;
1506    }
1507
1508    // Group by bucket
1509    let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
1510        std::collections::BTreeMap::new();
1511    let mut leftover = Vec::new();
1512
1513    for doc in docs {
1514        // Try common timestamp field names
1515        let ts = ["timestamp", "ts", "created_at", "time"]
1516            .iter()
1517            .find_map(|&k| doc.data.get(k))
1518            .and_then(|v| match v {
1519                Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
1520                    .ok()
1521                    .map(|dt| dt.timestamp()),
1522                Value::Int(i) => Some(*i),
1523                _ => None,
1524            });
1525
1526        if let Some(t) = ts {
1527            let bucket = (t / interval_secs) * interval_secs;
1528            buckets.entry(bucket).or_default().push(doc);
1529        } else {
1530            leftover.push(doc);
1531        }
1532    }
1533
1534    let mut result = Vec::new();
1535    for (bucket_ts, group) in buckets {
1536        let mut data = HashMap::new();
1537        data.insert(
1538            "timestamp".to_string(),
1539            Value::String(
1540                chrono::DateTime::from_timestamp(bucket_ts, 0)
1541                    .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
1542                    .unwrap_or_default(),
1543            ),
1544        );
1545        data.insert("count".to_string(), Value::Int(group.len() as i64));
1546
1547        for field in value_fields {
1548            if field == "timestamp" || field == "count" {
1549                continue;
1550            }
1551            let nums: Vec<f64> = group
1552                .iter()
1553                .filter_map(|d| match d.data.get(field) {
1554                    Some(Value::Int(i)) => Some(*i as f64),
1555                    Some(Value::Float(f)) => Some(*f),
1556                    _ => None,
1557                })
1558                .collect();
1559            if nums.is_empty() {
1560                continue;
1561            }
1562            let agg = match aggregation {
1563                "sum" => nums.iter().sum::<f64>(),
1564                "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
1565                "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
1566                "count" => nums.len() as f64,
1567                _ => nums.iter().sum::<f64>() / nums.len() as f64, // avg default
1568            };
1569            data.insert(field.clone(), Value::Float(agg));
1570        }
1571
1572        result.push(Document {
1573            _sid: bucket_ts.to_string(),
1574            data,
1575        });
1576    }
1577
1578    result.extend(leftover);
1579    result
1580}
1581
1582fn parse_interval(s: &str) -> i64 {
1583    let s = s.trim();
1584    if s.ends_with('s') {
1585        s[..s.len() - 1].parse().unwrap_or(0)
1586    } else if s.ends_with('m') {
1587        s[..s.len() - 1].parse::<i64>().unwrap_or(0) * 60
1588    } else if s.ends_with('h') {
1589        s[..s.len() - 1].parse::<i64>().unwrap_or(0) * 3600
1590    } else if s.ends_with('d') {
1591        s[..s.len() - 1].parse::<i64>().unwrap_or(0) * 86400
1592    } else {
1593        s.parse().unwrap_or(3600)
1594    }
1595}
1596
1597/// Register an event handler. Spawns a background task that fires a mutation
1598/// each time the trigger event is received on the relevant collection.
1599async fn execute_handler_registration(
1600    db: &Aurora,
1601    handler: &ast::HandlerDef,
1602    _options: &ExecutionOptions,
1603) -> Result<ExecutionResult> {
1604    use crate::pubsub::events::ChangeType;
1605
1606    let collection = match &handler.trigger {
1607        ast::HandlerTrigger::Insert { collection }
1608        | ast::HandlerTrigger::Update { collection }
1609        | ast::HandlerTrigger::Delete { collection } => {
1610            collection.as_deref().unwrap_or("*").to_string()
1611        }
1612        _ => "*".to_string(),
1613    };
1614
1615    let trigger_type = match &handler.trigger {
1616        ast::HandlerTrigger::Insert { .. } => Some(ChangeType::Insert),
1617        ast::HandlerTrigger::Update { .. } => Some(ChangeType::Update),
1618        ast::HandlerTrigger::Delete { .. } => Some(ChangeType::Delete),
1619        _ => None,
1620    };
1621
1622    let mut listener = if collection == "*" {
1623        db.pubsub.listen_all()
1624    } else {
1625        db.pubsub.listen(collection.clone())
1626    };
1627
1628    let db_clone = db.clone();
1629    let action = handler.action.clone();
1630    let handler_name = handler.name.clone();
1631
1632    tokio::spawn(async move {
1633        loop {
1634            match listener.recv().await {
1635                Ok(event) => {
1636                    // Check event type matches trigger
1637                    let matches = trigger_type
1638                        .as_ref()
1639                        .map(|t| &event.change_type == t)
1640                        .unwrap_or(true);
1641                    if !matches {
1642                        continue;
1643                    }
1644
1645                    // Build context from the triggering event.
1646                    // Always set $_id from event._sid — for deletes, event.document is None
1647                    // but the document ID is still in event._sid.
1648                    let mut vars = HashMap::new();
1649                    vars.insert("_id".to_string(), ast::Value::String(event._sid.clone()));
1650                    if let Some(doc) = &event.document {
1651                        // Overlay field variables from the document data (insert/update)
1652                        for (k, v) in &doc.data {
1653                            vars.insert(format!("_{}", k), db_value_to_ast_value(v));
1654                        }
1655                    }
1656
1657                    let _ = execute_mutation_op(
1658                        &db_clone,
1659                        &action,
1660                        &vars,
1661                        &HashMap::new(),
1662                        &ExecutionOptions::default(),
1663                        &HashMap::new(),
1664                    )
1665                    .await;
1666                }
1667                Err(_) => {
1668                    eprintln!("[handler:{}] channel closed, stopping", handler_name);
1669                    break;
1670                }
1671            }
1672        }
1673    });
1674
1675    eprintln!(
1676        "[handler] '{}' registered on '{}'",
1677        handler.name, collection
1678    );
1679
1680    let mut data = HashMap::new();
1681    data.insert("name".to_string(), Value::String(handler.name.clone()));
1682    data.insert("collection".to_string(), Value::String(collection));
1683    data.insert(
1684        "status".to_string(),
1685        Value::String("registered".to_string()),
1686    );
1687
1688    Ok(ExecutionResult::Query(QueryResult {
1689        collection: "__handler".to_string(),
1690        documents: vec![Document {
1691            _sid: handler.name.clone(),
1692            data,
1693        }],
1694        total_count: Some(1),
1695        deferred_fields: vec![],
1696        explain: None,
1697    }))
1698}
1699
1700fn db_value_to_ast_value(v: &Value) -> ast::Value {
1701    match v {
1702        Value::Null => ast::Value::Null,
1703        Value::Bool(b) => ast::Value::Boolean(*b),
1704        Value::Int(i) => ast::Value::Int(*i),
1705        Value::Float(f) => ast::Value::Float(*f),
1706        Value::String(s) => ast::Value::String(s.clone()),
1707        Value::Uuid(u) => ast::Value::String(u.to_string()),
1708        Value::DateTime(dt) => ast::Value::String(dt.to_rfc3339()),
1709        Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_ast_value).collect()),
1710        Value::Object(m) => ast::Value::Object(
1711            m.iter()
1712                .map(|(k, v)| (k.clone(), db_value_to_ast_value(v)))
1713                .collect(),
1714        ),
1715    }
1716}
1717
1718async fn execute_mutation(
1719    db: &Aurora,
1720    mutation: &ast::Mutation,
1721    vars: &HashMap<String, ast::Value>,
1722    options: &ExecutionOptions,
1723    fragments: &HashMap<String, FragmentDef>,
1724) -> Result<ExecutionResult> {
1725    use crate::transaction::ACTIVE_TRANSACTION_ID;
1726
1727    validate_required_variables(&mutation.variable_definitions, vars)?;
1728
1729    // If there's already an active transaction in scope, run ops directly inside
1730    // it — the caller owns commit/rollback. Creating a nested transaction would
1731    // shadow the outer ACTIVE_TRANSACTION_ID and prevent the outer rollback from
1732    // undoing these writes.
1733    let already_in_tx = ACTIVE_TRANSACTION_ID
1734        .try_with(|id| *id)
1735        .ok()
1736        .and_then(|id| db.transaction_manager.active_transactions.get(&id))
1737        .is_some();
1738
1739    if already_in_tx {
1740        let mut results = Vec::new();
1741        let mut context = HashMap::new();
1742        for mut_op in &mutation.operations {
1743            let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
1744            if let Some(alias) = &mut_op.alias {
1745                if let Some(doc) = res.returned_documents.first() {
1746                    let mut m = serde_json::Map::new();
1747                    for (k, v) in &doc.data {
1748                        m.insert(k.clone(), aurora_value_to_json_value(v));
1749                    }
1750                    m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
1751                    context.insert(alias.clone(), JsonValue::Object(m));
1752                }
1753            }
1754            results.push(res);
1755        }
1756        return if results.len() == 1 {
1757            Ok(ExecutionResult::Mutation(results.remove(0)))
1758        } else {
1759            Ok(ExecutionResult::Batch(
1760                results.into_iter().map(ExecutionResult::Mutation).collect(),
1761            ))
1762        };
1763    }
1764
1765    // No active transaction — wrap the mutation block in one so a failure in any
1766    // operation rolls back all previous operations in the same block.
1767    let tx_id = db.begin_transaction().await;
1768
1769    let exec_result = ACTIVE_TRANSACTION_ID
1770        .scope(tx_id, async {
1771            let mut results = Vec::new();
1772            let mut context = HashMap::new();
1773            for mut_op in &mutation.operations {
1774                let res =
1775                    execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
1776                if let Some(alias) = &mut_op.alias {
1777                    if let Some(doc) = res.returned_documents.first() {
1778                        let mut m = serde_json::Map::new();
1779                        for (k, v) in &doc.data {
1780                            m.insert(k.clone(), aurora_value_to_json_value(v));
1781                        }
1782                        m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
1783                        context.insert(alias.clone(), JsonValue::Object(m));
1784                    }
1785                }
1786                results.push(res);
1787            }
1788            Ok::<_, crate::error::AqlError>(results)
1789        })
1790        .await;
1791
1792    match exec_result {
1793        Ok(mut results) => {
1794            db.commit_transaction(tx_id).await?;
1795            if results.len() == 1 {
1796                Ok(ExecutionResult::Mutation(results.remove(0)))
1797            } else {
1798                Ok(ExecutionResult::Batch(
1799                    results.into_iter().map(ExecutionResult::Mutation).collect(),
1800                ))
1801            }
1802        }
1803        Err(e) => {
1804            let _ = db.rollback_transaction(tx_id).await;
1805            Err(e)
1806        }
1807    }
1808}
1809
1810fn execute_mutation_op<'a>(
1811    db: &'a Aurora,
1812    mut_op: &'a ast::MutationOperation,
1813    variables: &'a HashMap<String, ast::Value>,
1814    context: &'a ExecutionContext,
1815    options: &'a ExecutionOptions,
1816    fragments: &'a HashMap<String, FragmentDef>,
1817) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
1818    use futures::future::FutureExt;
1819    async move {
1820        match &mut_op.operation {
1821            MutationOp::Insert { collection, data } => {
1822                let resolved = resolve_value(data, variables, context);
1823                let doc = db
1824                    .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
1825                    .await?;
1826                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1827                    let fields = collect_fields(
1828                        &mut_op.selection_set,
1829                        fragments,
1830                        variables,
1831                        Some(collection),
1832                    )
1833                    .unwrap_or_default();
1834                    vec![apply_projection(doc, &fields, options)]
1835                } else {
1836                    vec![doc]
1837                };
1838                Ok(MutationResult {
1839                    operation: "insert".to_string(),
1840                    collection: collection.clone(),
1841                    affected_count: 1,
1842                    returned_documents: returned,
1843                })
1844            }
1845            MutationOp::Update {
1846                collection,
1847                filter,
1848                data,
1849            } => {
1850                let cf = if let Some(f) = filter {
1851                    Some(compile_filter(f)?)
1852                } else {
1853                    None
1854                };
1855                let update_data =
1856                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
1857
1858                // Scan and update matching documents
1859                let mut affected = 0;
1860                let mut returned = Vec::new();
1861
1862                // For now, we use a simple scan-and-update.
1863                // In production, we'd use indices if the filter allows.
1864                let vars_arc = Arc::new(variables.clone());
1865                let cf_arc = cf.map(Arc::new);
1866
1867                let matches = db.scan_and_filter(
1868                    collection,
1869                    |doc| {
1870                        if let Some(ref filter) = cf_arc {
1871                            matches_filter(doc, filter, &vars_arc)
1872                        } else {
1873                            true
1874                        }
1875                    },
1876                    None,
1877                )?;
1878
1879                let fields = if !mut_op.selection_set.is_empty() {
1880                    Some(
1881                        collect_fields(
1882                            &mut_op.selection_set,
1883                            fragments,
1884                            variables,
1885                            Some(collection),
1886                        )
1887                        .unwrap_or_default(),
1888                    )
1889                } else {
1890                    None
1891                };
1892
1893                for doc in matches {
1894                    let mut new_data = doc.data.clone();
1895                    for (k, v) in &update_data {
1896                        let applied = apply_field_modifier(new_data.get(k), v);
1897                        new_data.insert(k.clone(), applied);
1898                    }
1899
1900                    let updated_doc = db
1901                        .aql_update_document(collection, &doc._sid, new_data)
1902                        .await?;
1903
1904                    affected += 1;
1905                    if let Some(ref f) = fields {
1906                        returned.push(apply_projection(updated_doc, f, options));
1907                    }
1908                }
1909
1910                Ok(MutationResult {
1911                    operation: "update".to_string(),
1912                    collection: collection.clone(),
1913                    affected_count: affected,
1914                    returned_documents: returned,
1915                })
1916            }
1917            MutationOp::Delete { collection, filter } => {
1918                let cf = if let Some(f) = filter {
1919                    Some(compile_filter(f)?)
1920                } else {
1921                    None
1922                };
1923
1924                let mut affected = 0;
1925                let vars_arc = Arc::new(variables.clone());
1926                let cf_arc = cf.map(Arc::new);
1927
1928                let matches = db.scan_and_filter(
1929                    collection,
1930                    |doc| {
1931                        if let Some(ref filter) = cf_arc {
1932                            matches_filter(doc, filter, &vars_arc)
1933                        } else {
1934                            true
1935                        }
1936                    },
1937                    None,
1938                )?;
1939
1940                for doc in matches {
1941                    db.aql_delete_document(collection, &doc._sid).await?;
1942                    affected += 1;
1943                }
1944
1945                Ok(MutationResult {
1946                    operation: "delete".to_string(),
1947                    collection: collection.clone(),
1948                    affected_count: affected,
1949                    returned_documents: vec![],
1950                })
1951            }
1952            MutationOp::InsertMany { collection, data } => {
1953                let mut affected = 0;
1954                let mut returned = Vec::new();
1955                for item in data {
1956                    let resolved = resolve_value(item, variables, context);
1957                    let doc = db
1958                        .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
1959                        .await?;
1960                    affected += 1;
1961                    if !mut_op.selection_set.is_empty() && options.apply_projections {
1962                        let fields = collect_fields(
1963                            &mut_op.selection_set,
1964                            fragments,
1965                            variables,
1966                            Some(collection),
1967                        )
1968                        .unwrap_or_default();
1969                        returned.push(apply_projection(doc, &fields, options));
1970                    } else {
1971                        returned.push(doc);
1972                    }
1973                }
1974                Ok(MutationResult {
1975                    operation: "insertMany".to_string(),
1976                    collection: collection.clone(),
1977                    affected_count: affected,
1978                    returned_documents: returned,
1979                })
1980            }
1981            MutationOp::Upsert {
1982                collection,
1983                filter,
1984                data,
1985            } => {
1986                let update_data =
1987                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
1988                let cf = if let Some(f) = filter {
1989                    Some(compile_filter(f)?)
1990                } else {
1991                    None
1992                };
1993                let vars_arc = Arc::new(variables.clone());
1994                let cf_arc = cf.map(Arc::new);
1995                let matches = db.scan_and_filter(
1996                    collection,
1997                    |doc| {
1998                        if let Some(ref filter) = cf_arc {
1999                            matches_filter(doc, filter, &vars_arc)
2000                        } else {
2001                            true
2002                        }
2003                    },
2004                    Some(1),
2005                )?;
2006                let doc = if let Some(existing) = matches.into_iter().next() {
2007                    db.aql_update_document(collection, &existing._sid, update_data)
2008                        .await?
2009                } else {
2010                    db.aql_insert(collection, update_data).await?
2011                };
2012                Ok(MutationResult {
2013                    operation: "upsert".to_string(),
2014                    collection: collection.clone(),
2015                    affected_count: 1,
2016                    returned_documents: vec![doc],
2017                })
2018            }
2019            MutationOp::Transaction { operations } => {
2020                let mut all_returned = Vec::new();
2021                let mut total_affected = 0;
2022                for op in operations {
2023                    let res =
2024                        execute_mutation_op(db, op, variables, context, options, fragments).await?;
2025                    total_affected += res.affected_count;
2026                    all_returned.extend(res.returned_documents);
2027                }
2028                Ok(MutationResult {
2029                    operation: "transaction".to_string(),
2030                    collection: String::new(),
2031                    affected_count: total_affected,
2032                    returned_documents: all_returned,
2033                })
2034            }
2035            MutationOp::EnqueueJobs {
2036                job_type,
2037                payloads,
2038                priority,
2039                max_retries,
2040            } => {
2041                let workers = db.workers.as_ref().ok_or_else(|| {
2042                    AqlError::new(
2043                        ErrorCode::QueryError,
2044                        "Worker system not initialised".to_string(),
2045                    )
2046                })?;
2047                let job_priority = match priority {
2048                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
2049                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2050                    ast::JobPriority::High => crate::workers::JobPriority::High,
2051                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2052                };
2053                let mut returned = Vec::new();
2054                for payload in payloads {
2055                    let resolved = resolve_value(payload, variables, context);
2056                    let json_payload: std::collections::HashMap<String, serde_json::Value> =
2057                        if let ast::Value::Object(map) = &resolved {
2058                            map.iter()
2059                                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2060                                .collect()
2061                        } else {
2062                            std::collections::HashMap::new()
2063                        };
2064                    let mut job =
2065                        crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2066                    for (k, v) in json_payload {
2067                        job = job.add_field(k, v);
2068                    }
2069                    if let Some(retries) = max_retries {
2070                        job = job.with_max_retries(*retries);
2071                    }
2072                    let job_id = workers.enqueue(job).await?;
2073                    let mut doc = crate::types::Document::new();
2074                    doc._sid = job_id.clone();
2075                    doc.data.insert("job_id".to_string(), Value::String(job_id));
2076                    doc.data
2077                        .insert("job_type".to_string(), Value::String(job_type.clone()));
2078                    doc.data
2079                        .insert("status".to_string(), Value::String("pending".to_string()));
2080                    returned.push(doc);
2081                }
2082                let count = returned.len();
2083                Ok(MutationResult {
2084                    operation: "enqueueJobs".to_string(),
2085                    collection: "__jobs".to_string(),
2086                    affected_count: count,
2087                    returned_documents: returned,
2088                })
2089            }
2090            MutationOp::Import { collection, data } => {
2091                let mut affected = 0;
2092                let mut returned = Vec::new();
2093                let fields = if !mut_op.selection_set.is_empty() {
2094                    Some(
2095                        collect_fields(
2096                            &mut_op.selection_set,
2097                            fragments,
2098                            variables,
2099                            Some(collection),
2100                        )
2101                        .unwrap_or_default(),
2102                    )
2103                } else {
2104                    None
2105                };
2106                for item in data {
2107                    let resolved = resolve_value(item, variables, context);
2108                    let map = aql_value_to_hashmap(&resolved, variables)?;
2109                    let doc = db.aql_insert(collection, map).await?;
2110                    affected += 1;
2111                    if let Some(ref f) = fields {
2112                        returned.push(apply_projection(doc, f, options));
2113                    }
2114                }
2115                Ok(MutationResult {
2116                    operation: "import".to_string(),
2117                    collection: collection.clone(),
2118                    affected_count: affected,
2119                    returned_documents: returned,
2120                })
2121            }
2122            MutationOp::Export {
2123                collection,
2124                format: _,
2125            } => {
2126                let docs = db.scan_and_filter(collection, |_| true, None)?;
2127                let fields = if !mut_op.selection_set.is_empty() {
2128                    Some(
2129                        collect_fields(
2130                            &mut_op.selection_set,
2131                            fragments,
2132                            variables,
2133                            Some(collection),
2134                        )
2135                        .unwrap_or_default(),
2136                    )
2137                } else {
2138                    None
2139                };
2140                let returned: Vec<Document> = if let Some(ref f) = fields {
2141                    docs.into_iter().map(|d| apply_projection(d, f, options)).collect()
2142                } else {
2143                    docs
2144                };
2145                let count = returned.len();
2146                Ok(MutationResult {
2147                    operation: "export".to_string(),
2148                    collection: collection.clone(),
2149                    affected_count: count,
2150                    returned_documents: returned,
2151                })
2152            }
2153            MutationOp::EnqueueJob {
2154                job_type,
2155                payload,
2156                priority,
2157                scheduled_at,
2158                max_retries,
2159            } => {
2160                let workers = db.workers.as_ref().ok_or_else(|| {
2161                    AqlError::new(
2162                        ErrorCode::QueryError,
2163                        "Worker system not initialised".to_string(),
2164                    )
2165                })?;
2166
2167                // Convert AQL priority to Job priority
2168                let job_priority = match priority {
2169                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
2170                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2171                    ast::JobPriority::High => crate::workers::JobPriority::High,
2172                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2173                };
2174
2175                // Convert AQL Value payload to serde_json payload
2176                let resolved = resolve_value(payload, variables, context);
2177                let json_payload: std::collections::HashMap<String, serde_json::Value> =
2178                    if let ast::Value::Object(map) = &resolved {
2179                        map.iter()
2180                            .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2181                            .collect()
2182                    } else {
2183                        std::collections::HashMap::new()
2184                    };
2185
2186                let mut job =
2187                    crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2188
2189                for (k, v) in json_payload {
2190                    job = job.add_field(k, v);
2191                }
2192
2193                if let Some(retries) = max_retries {
2194                    job = job.with_max_retries(*retries);
2195                }
2196
2197                if let Some(scheduled) = scheduled_at {
2198                    if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
2199                        job = job.scheduled_at(dt);
2200                    }
2201                }
2202
2203                let job_id = workers.enqueue(job).await?;
2204
2205                let mut doc = crate::types::Document::new();
2206                doc._sid = job_id.clone();
2207                doc.data
2208                    .insert("job_id".to_string(), crate::types::Value::String(job_id));
2209                doc.data.insert(
2210                    "job_type".to_string(),
2211                    crate::types::Value::String(job_type.clone()),
2212                );
2213                doc.data.insert(
2214                    "status".to_string(),
2215                    crate::types::Value::String("pending".to_string()),
2216                );
2217
2218                Ok(MutationResult {
2219                    operation: "enqueueJob".to_string(),
2220                    collection: "__jobs".to_string(),
2221                    affected_count: 1,
2222                    returned_documents: vec![doc],
2223                })
2224            }
2225        }
2226    }
2227    .boxed()
2228}
2229
2230async fn execute_subscription(
2231    db: &Aurora,
2232    sub: &ast::Subscription,
2233    vars: &HashMap<String, ast::Value>,
2234    _options: &ExecutionOptions,
2235) -> Result<ExecutionResult> {
2236    let vars: HashMap<String, ast::Value> = vars.clone();
2237
2238    let selection = sub.selection_set.first().ok_or_else(|| {
2239        AqlError::new(
2240            ErrorCode::QueryError,
2241            "Subscription must have a selection".to_string(),
2242        )
2243    })?;
2244
2245    if let Selection::Field(f) = selection {
2246        let collection = f.name.clone();
2247        let filter = extract_filter_from_args(&f.arguments)?;
2248
2249        let mut listener = db.pubsub.listen(&collection);
2250
2251        if let Some(f) = filter {
2252            let event_filter = ast_filter_to_event_filter(&f, &vars)?;
2253            listener = listener.filter(event_filter);
2254        }
2255
2256        Ok(ExecutionResult::Subscription(SubscriptionResult {
2257            subscription_id: uuid::Uuid::new_v4().to_string(),
2258            collection,
2259            stream: Some(listener),
2260        }))
2261    } else {
2262        Err(AqlError::new(
2263            ErrorCode::QueryError,
2264            "Invalid subscription selection".to_string(),
2265        ))
2266    }
2267}
2268
2269fn ast_filter_to_event_filter(
2270    filter: &AqlFilter,
2271    vars: &HashMap<String, ast::Value>,
2272) -> Result<crate::pubsub::EventFilter> {
2273    use crate::pubsub::EventFilter;
2274
2275    match filter {
2276        AqlFilter::Eq(f, v) => Ok(EventFilter::FieldEquals(
2277            f.clone(),
2278            aql_value_to_db_value(v, vars)?,
2279        )),
2280        AqlFilter::Ne(f, v) => Ok(EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?)),
2281        AqlFilter::Gt(f, v) => Ok(EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?)),
2282        AqlFilter::Gte(f, v) => Ok(EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?)),
2283        AqlFilter::Lt(f, v) => Ok(EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?)),
2284        AqlFilter::Lte(f, v) => Ok(EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?)),
2285        AqlFilter::In(f, v) => Ok(EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?)),
2286        AqlFilter::NotIn(f, v) => Ok(EventFilter::NotIn(
2287            f.clone(),
2288            aql_value_to_db_value(v, vars)?,
2289        )),
2290        AqlFilter::Contains(f, v) => Ok(EventFilter::Contains(
2291            f.clone(),
2292            aql_value_to_db_value(v, vars)?,
2293        )),
2294        // ContainsAny/ContainsAll don't have EventFilter equivalents; fall back to Contains
2295        AqlFilter::ContainsAny(f, v) | AqlFilter::ContainsAll(f, v) => Ok(EventFilter::Contains(
2296            f.clone(),
2297            aql_value_to_db_value(v, vars)?,
2298        )),
2299        AqlFilter::StartsWith(f, v) => Ok(EventFilter::StartsWith(
2300            f.clone(),
2301            aql_value_to_db_value(v, vars)?,
2302        )),
2303        AqlFilter::EndsWith(f, v) => Ok(EventFilter::EndsWith(
2304            f.clone(),
2305            aql_value_to_db_value(v, vars)?,
2306        )),
2307        AqlFilter::Matches(f, v) => {
2308            let pattern = match aql_value_to_db_value(v, vars)? {
2309                crate::types::Value::String(s) => s,
2310                other => other.to_string(),
2311            };
2312            let re = regex::Regex::new(&pattern).map_err(|e| {
2313                crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
2314            })?;
2315            Ok(EventFilter::Matches(f.clone(), re))
2316        }
2317        AqlFilter::IsNull(f) => Ok(EventFilter::IsNull(f.clone())),
2318        AqlFilter::IsNotNull(f) => Ok(EventFilter::IsNotNull(f.clone())),
2319        AqlFilter::And(filters) => {
2320            let mut mapped = Vec::new();
2321            for f in filters {
2322                mapped.push(ast_filter_to_event_filter(f, vars)?);
2323            }
2324            Ok(EventFilter::And(mapped))
2325        }
2326        AqlFilter::Or(filters) => {
2327            let mut mapped = Vec::new();
2328            for f in filters {
2329                mapped.push(ast_filter_to_event_filter(f, vars)?);
2330            }
2331            Ok(EventFilter::Or(mapped))
2332        }
2333        AqlFilter::Not(f) => Ok(EventFilter::Not(Box::new(ast_filter_to_event_filter(
2334            f, vars,
2335        )?))),
2336    }
2337}
2338
2339async fn execute_introspection(
2340    db: &Aurora,
2341    intro: &ast::IntrospectionQuery,
2342) -> Result<ExecutionResult> {
2343    let names = db.list_collection_names();
2344    let want_fields = intro.fields.is_empty()
2345        || intro
2346            .fields
2347            .iter()
2348            .any(|f| f == "collections" || f == "fields");
2349
2350    let documents: Vec<Document> = names
2351        .iter()
2352        .filter_map(|name| {
2353            // Skip internal system collections from introspection unless asked
2354            if name.starts_with('_') {
2355                return None;
2356            }
2357            let col = db.get_collection_definition(name).ok()?;
2358            let mut data = HashMap::new();
2359            data.insert("name".to_string(), Value::String(name.clone()));
2360
2361            if want_fields {
2362                let field_list: Vec<Value> = col
2363                    .fields
2364                    .iter()
2365                    .map(|(fname, fdef)| {
2366                        let mut fdata = HashMap::new();
2367                        fdata.insert("name".to_string(), Value::String(fname.clone()));
2368                        fdata.insert(
2369                            "type".to_string(),
2370                            Value::String(fdef.field_type.to_string()),
2371                        );
2372                        fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
2373                        fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
2374                        fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
2375                        if !fdef.validations.is_empty() {
2376                            let vcons: Vec<Value> = fdef
2377                                .validations
2378                                .iter()
2379                                .map(|c| Value::String(format!("{:?}", c)))
2380                                .collect();
2381                            fdata.insert("validations".to_string(), Value::Array(vcons));
2382                        }
2383                        Value::Object(fdata)
2384                    })
2385                    .collect();
2386                data.insert("fields".to_string(), Value::Array(field_list));
2387            }
2388
2389            Some(Document {
2390                _sid: name.clone(),
2391                data,
2392            })
2393        })
2394        .collect();
2395
2396    let count = documents.len();
2397    Ok(ExecutionResult::Query(QueryResult {
2398        collection: "__schema".to_string(),
2399        documents,
2400        total_count: Some(count),
2401        deferred_fields: vec![],
2402        explain: None,
2403    }))
2404}
2405
2406async fn execute_schema(
2407    db: &Aurora,
2408    schema: &ast::Schema,
2409    _options: &ExecutionOptions,
2410) -> Result<ExecutionResult> {
2411    let mut last_collection = String::new();
2412
2413    for op in &schema.operations {
2414        match op {
2415            ast::SchemaOp::DefineCollection {
2416                name,
2417                fields,
2418                if_not_exists,
2419                ..
2420            } => {
2421                last_collection = name.clone();
2422
2423                // If if_not_exists is true, check if it already exists
2424                if *if_not_exists {
2425                    if db.get_collection_definition(name).is_ok() {
2426                        continue;
2427                    }
2428                }
2429
2430                let mut field_defs = Vec::new();
2431                for field in fields {
2432                    field_defs.push((field.name.as_str(), build_field_def(field)));
2433                }
2434                db.new_collection(name, field_defs).await?;
2435            }
2436            ast::SchemaOp::AlterCollection { name, actions } => {
2437                last_collection = name.clone();
2438                for action in actions {
2439                    match action {
2440                        ast::AlterAction::AddField { field, default } => {
2441                            let def = build_field_def(field);
2442                            db.add_field_to_schema(name, field.name.clone(), def)
2443                                .await?;
2444                            if let Some(default_val) = default {
2445                                let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
2446                                let docs = db.get_all_collection(name).await?;
2447                                for doc in docs {
2448                                    if !doc.data.contains_key(&field.name) {
2449                                        db.update_document(
2450                                            name,
2451                                            &doc._sid,
2452                                            vec![(&field.name, db_val.clone())],
2453                                        )
2454                                        .await?;
2455                                    }
2456                                }
2457                            }
2458                        }
2459                        ast::AlterAction::DropField(field_name) => {
2460                            db.drop_field_from_schema(name, field_name.clone()).await?;
2461                        }
2462                        ast::AlterAction::RenameField { from, to } => {
2463                            db.rename_field_in_schema(name, from.clone(), to.clone())
2464                                .await?;
2465                            let docs = db.get_all_collection(name).await?;
2466                            for mut doc in docs {
2467                                if let Some(val) = doc.data.remove(from.as_str()) {
2468                                    doc.data.insert(to.clone(), val);
2469                                    let key = format!("{}:{}", name, doc._sid);
2470                                    db.put(key, serde_json::to_vec(&doc)?, None).await?;
2471                                }
2472                            }
2473                        }
2474                        ast::AlterAction::ModifyField(field) => {
2475                            db.modify_field_in_schema(
2476                                name,
2477                                field.name.clone(),
2478                                build_field_def(field),
2479                            )
2480                            .await?;
2481                        }
2482                    }
2483                }
2484            }
2485            ast::SchemaOp::DropCollection { name, .. } => {
2486                db.drop_collection_schema(name).await?;
2487                last_collection = name.clone();
2488            }
2489        }
2490    }
2491
2492    Ok(ExecutionResult::Schema(SchemaResult {
2493        operation: "schema".to_string(),
2494        collection: last_collection,
2495        status: "done".to_string(),
2496    }))
2497}
2498
2499fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
2500    let scalar = match anno.name.to_lowercase().as_str() {
2501        "string" => ScalarType::String,
2502        "int" | "integer" => ScalarType::Int,
2503        "float" | "double" => ScalarType::Float,
2504        "bool" | "boolean" => ScalarType::Bool,
2505        "uuid" => ScalarType::Uuid,
2506        "object" => ScalarType::Object,
2507        "array" => ScalarType::Array,
2508        _ => ScalarType::Any,
2509    };
2510
2511    if anno.is_array {
2512        FieldType::Array(scalar)
2513    } else {
2514        match scalar {
2515            ScalarType::Object => FieldType::Object,
2516            ScalarType::Any => FieldType::Any,
2517            _ => FieldType::Scalar(scalar),
2518        }
2519    }
2520}
2521
2522/// Parse @validate directive arguments into FieldValidationConstraints
2523fn parse_validate_directive(
2524    directive: &ast::Directive,
2525) -> Vec<crate::types::FieldValidationConstraint> {
2526    use crate::types::FieldValidationConstraint as FVC;
2527    let mut constraints = Vec::new();
2528    for arg in &directive.arguments {
2529        match arg.name.as_str() {
2530            "format" => {
2531                if let ast::Value::String(s) = &arg.value {
2532                    constraints.push(FVC::Format(s.clone()));
2533                }
2534            }
2535            "min" => {
2536                let n = match &arg.value {
2537                    ast::Value::Float(f) => Some(*f),
2538                    ast::Value::Int(i) => Some(*i as f64),
2539                    _ => None,
2540                };
2541                if let Some(n) = n {
2542                    constraints.push(FVC::Min(n));
2543                }
2544            }
2545            "max" => {
2546                let n = match &arg.value {
2547                    ast::Value::Float(f) => Some(*f),
2548                    ast::Value::Int(i) => Some(*i as f64),
2549                    _ => None,
2550                };
2551                if let Some(n) = n {
2552                    constraints.push(FVC::Max(n));
2553                }
2554            }
2555            "minLength" => {
2556                if let ast::Value::Int(i) = &arg.value {
2557                    constraints.push(FVC::MinLength(*i));
2558                }
2559            }
2560            "maxLength" => {
2561                if let ast::Value::Int(i) = &arg.value {
2562                    constraints.push(FVC::MaxLength(*i));
2563                }
2564            }
2565            "pattern" => {
2566                if let ast::Value::String(s) = &arg.value {
2567                    constraints.push(FVC::Pattern(s.clone()));
2568                }
2569            }
2570            _ => {}
2571        }
2572    }
2573    constraints
2574}
2575
2576/// Build a FieldDefinition from a field's type + directives
2577fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
2578    let field_type = map_ast_type(&field.field_type);
2579    let mut indexed = false;
2580    let mut unique = false;
2581    let mut validations = Vec::new();
2582    for directive in &field.directives {
2583        match directive.name.as_str() {
2584            "indexed" | "index" => indexed = true,
2585            "unique" => {
2586                unique = true;
2587                indexed = true;
2588            }
2589            "primary" => {
2590                indexed = true;
2591                unique = true;
2592            }
2593            "validate" => validations.extend(parse_validate_directive(directive)),
2594            _ => {}
2595        }
2596    }
2597    FieldDefinition {
2598        field_type,
2599        unique,
2600        indexed,
2601        nullable: !field.field_type.is_required,
2602        validations,
2603    }
2604}
2605
2606async fn execute_migration(
2607    db: &Aurora,
2608    migration: &ast::Migration,
2609    _options: &ExecutionOptions,
2610) -> Result<ExecutionResult> {
2611    let mut steps_applied = 0;
2612    let mut last_version = String::new();
2613
2614    for step in &migration.steps {
2615        last_version = step.version.clone();
2616
2617        if db.is_migration_applied(&step.version).await? {
2618            eprintln!(
2619                "[migration] version '{}' already applied — skipping",
2620                step.version
2621            );
2622            continue;
2623        }
2624
2625        eprintln!("[migration] applying version '{}'", step.version);
2626
2627        for action in &step.actions {
2628            match action {
2629                ast::MigrationAction::Schema(schema_op) => {
2630                    execute_single_schema_op(db, schema_op).await?;
2631                }
2632                ast::MigrationAction::DataMigration(dm) => {
2633                    execute_data_migration(db, dm).await?;
2634                }
2635            }
2636        }
2637
2638        db.mark_migration_applied(&step.version).await?;
2639        steps_applied += 1;
2640        eprintln!("[migration] version '{}' applied", step.version);
2641    }
2642
2643    let status = if steps_applied > 0 {
2644        "applied".to_string()
2645    } else {
2646        "skipped".to_string()
2647    };
2648
2649    Ok(ExecutionResult::Migration(MigrationResult {
2650        version: last_version,
2651        steps_applied,
2652        status,
2653    }))
2654}
2655
2656/// Execute a single SchemaOp — shared by execute_schema and execute_migration.
2657async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
2658    match op {
2659        ast::SchemaOp::DefineCollection {
2660            name,
2661            fields,
2662            if_not_exists,
2663            ..
2664        } => {
2665            if *if_not_exists && db.get_collection_definition(name).is_ok() {
2666                return Ok(());
2667            }
2668            let field_defs: Vec<(&str, FieldDefinition)> = fields
2669                .iter()
2670                .map(|f| (f.name.as_str(), build_field_def(f)))
2671                .collect();
2672            db.new_collection(name, field_defs).await?;
2673        }
2674        ast::SchemaOp::AlterCollection { name, actions } => {
2675            for action in actions {
2676                match action {
2677                    ast::AlterAction::AddField { field, default } => {
2678                        db.add_field_to_schema(name, field.name.clone(), build_field_def(field))
2679                            .await?;
2680                        if let Some(default_val) = default {
2681                            let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
2682                            let docs = db.get_all_collection(name).await?;
2683                            for doc in docs {
2684                                if !doc.data.contains_key(&field.name) {
2685                                    db.update_document(
2686                                        name,
2687                                        &doc._sid,
2688                                        vec![(&field.name, db_val.clone())],
2689                                    )
2690                                    .await?;
2691                                }
2692                            }
2693                        }
2694                    }
2695                    ast::AlterAction::DropField(field_name) => {
2696                        db.drop_field_from_schema(name, field_name.clone()).await?;
2697                    }
2698                    ast::AlterAction::RenameField { from, to } => {
2699                        db.rename_field_in_schema(name, from.clone(), to.clone())
2700                            .await?;
2701                        // Rename the field key in every existing document (one read-modify-write per doc)
2702                        let docs = db.get_all_collection(name).await?;
2703                        for mut doc in docs {
2704                            if let Some(val) = doc.data.remove(from.as_str()) {
2705                                doc.data.insert(to.clone(), val);
2706                                let key = format!("{}:{}", name, doc._sid);
2707                                db.put(key, serde_json::to_vec(&doc)?, None).await?;
2708                            }
2709                        }
2710                    }
2711                    ast::AlterAction::ModifyField(field) => {
2712                        db.modify_field_in_schema(name, field.name.clone(), build_field_def(field))
2713                            .await?;
2714                    }
2715                }
2716            }
2717        }
2718        ast::SchemaOp::DropCollection { name, if_exists } => {
2719            if *if_exists && db.get_collection_definition(name).is_err() {
2720                return Ok(());
2721            }
2722            db.drop_collection_schema(name).await?;
2723        }
2724    }
2725    Ok(())
2726}
2727
2728/// Apply a `migrate data in <collection> { set field = expr where { ... } }` block.
2729async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
2730    let docs = db.get_all_collection(&dm.collection).await?;
2731
2732    for doc in docs {
2733        for transform in &dm.transforms {
2734            let matches = match &transform.filter {
2735                Some(filter) => {
2736                    let compiled = compile_filter(filter)?;
2737                    matches_filter(&doc, &compiled, &HashMap::new())
2738                }
2739                None => true,
2740            };
2741
2742            if matches {
2743                let new_value = eval_migration_expr(&transform.expression, &doc);
2744                let mut updates = HashMap::new();
2745                updates.insert(transform.field.clone(), new_value);
2746                db.aql_update_document(&dm.collection, &doc._sid, updates)
2747                    .await?;
2748            }
2749        }
2750    }
2751    Ok(())
2752}
2753
2754/// Evaluate a migration expression string to a `Value`.
2755///
2756/// Supported forms (evaluated in order):
2757/// - String literal:  `"hello"` → `Value::String("hello")`
2758/// - Boolean:         `true` / `false`
2759/// - Null:            `null`
2760/// - Integer:         `42`
2761/// - Float:           `3.14`
2762/// - Field reference: `field_name` (looks up the field in the current document)
2763fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
2764    let expr = expr.trim();
2765
2766    if expr.starts_with('"') && expr.ends_with('"') && expr.len() >= 2 {
2767        return Value::String(expr[1..expr.len() - 1].to_string());
2768    }
2769    if expr == "true" {
2770        return Value::Bool(true);
2771    }
2772    if expr == "false" {
2773        return Value::Bool(false);
2774    }
2775    if expr == "null" {
2776        return Value::Null;
2777    }
2778    if let Ok(n) = expr.parse::<i64>() {
2779        return Value::Int(n);
2780    }
2781    if let Ok(f) = expr.parse::<f64>() {
2782        return Value::Float(f);
2783    }
2784    if let Some(v) = doc.data.get(expr) {
2785        return v.clone();
2786    }
2787
2788    Value::Null
2789}
2790
2791fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
2792    for a in args {
2793        if a.name == "where" || a.name == "filter" {
2794            return Ok(Some(value_to_filter(&a.value)?));
2795        }
2796    }
2797    Ok(None)
2798}
2799
2800fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
2801    let mut orderings = Vec::new();
2802    for a in args {
2803        if a.name == "orderBy" {
2804            match &a.value {
2805                ast::Value::String(f) => orderings.push(ast::Ordering {
2806                    field: f.clone(),
2807                    direction: ast::SortDirection::Asc,
2808                }),
2809                ast::Value::Object(obj) => {
2810                    // Formal syntax: { field: "fieldname", direction: ASC|DESC }
2811                    if let (Some(ast::Value::String(field_name)), Some(dir_val)) =
2812                        (obj.get("field"), obj.get("direction"))
2813                    {
2814                        let direction = match dir_val {
2815                            ast::Value::Enum(s) | ast::Value::String(s) => {
2816                                if s.to_uppercase() == "DESC" {
2817                                    ast::SortDirection::Desc
2818                                } else {
2819                                    ast::SortDirection::Asc
2820                                }
2821                            }
2822                            _ => ast::SortDirection::Asc,
2823                        };
2824                        orderings.push(ast::Ordering {
2825                            field: field_name.clone(),
2826                            direction,
2827                        });
2828                    } else {
2829                        // Shorthand: { fieldName: "ASC", fieldName2: "DESC" }
2830                        for (field, dir_val) in obj {
2831                            let direction = match dir_val {
2832                                ast::Value::Enum(s) | ast::Value::String(s) => {
2833                                    if s.to_uppercase() == "DESC" {
2834                                        ast::SortDirection::Desc
2835                                    } else {
2836                                        ast::SortDirection::Asc
2837                                    }
2838                                }
2839                                _ => ast::SortDirection::Asc,
2840                            };
2841                            orderings.push(ast::Ordering {
2842                                field: field.clone(),
2843                                direction,
2844                            });
2845                        }
2846                    }
2847                }
2848                _ => {}
2849            }
2850        }
2851    }
2852    orderings
2853}
2854
2855fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
2856    docs.sort_by(|a, b| {
2857        for o in orderings {
2858            let cmp = compare_values(a.data.get(&o.field), b.data.get(&o.field));
2859            if cmp != std::cmp::Ordering::Equal {
2860                return match o.direction {
2861                    ast::SortDirection::Asc => cmp,
2862                    ast::SortDirection::Desc => cmp.reverse(),
2863                };
2864            }
2865        }
2866        std::cmp::Ordering::Equal
2867    });
2868}
2869
2870fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
2871    match (a, b) {
2872        (None, None) => std::cmp::Ordering::Equal,
2873        (None, Some(_)) => std::cmp::Ordering::Less,
2874        (Some(_), None) => std::cmp::Ordering::Greater,
2875        (Some(av), Some(bv)) => av.partial_cmp(bv).unwrap_or(std::cmp::Ordering::Equal),
2876    }
2877}
2878
2879pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
2880    let (mut limit, mut offset) = (None, 0);
2881    for a in args {
2882        match a.name.as_str() {
2883            "limit" | "first" => {
2884                if let ast::Value::Int(n) = a.value {
2885                    limit = Some(n as usize);
2886                }
2887            }
2888            "offset" | "skip" => {
2889                if let ast::Value::Int(n) = a.value {
2890                    offset = n as usize;
2891                }
2892            }
2893            _ => {}
2894        }
2895    }
2896    (limit, offset)
2897}
2898
2899fn extract_cursor_pagination(
2900    args: &[ast::Argument],
2901) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
2902    let (mut first, mut after, mut last, mut before) = (None, None, None, None);
2903    for a in args {
2904        match a.name.as_str() {
2905            "first" => {
2906                if let ast::Value::Int(n) = a.value {
2907                    first = Some(n as usize);
2908                }
2909            }
2910            "after" => {
2911                if let ast::Value::String(s) = &a.value {
2912                    after = Some(s.clone());
2913                }
2914            }
2915            "last" => {
2916                if let ast::Value::Int(n) = a.value {
2917                    last = Some(n as usize);
2918                }
2919            }
2920            "before" => {
2921                if let ast::Value::String(s) = &a.value {
2922                    before = Some(s.clone());
2923                }
2924            }
2925            _ => {}
2926        }
2927    }
2928    (first, after, last, before)
2929}
2930
2931fn execute_connection(
2932    mut docs: Vec<Document>,
2933    sub_fields: &[ast::Field],
2934    limit: Option<usize>,
2935    fragments: &HashMap<String, FragmentDef>,
2936    variables: &HashMap<String, ast::Value>,
2937    options: &ExecutionOptions,
2938) -> QueryResult {
2939    let has_next_page = if let Some(l) = limit {
2940        docs.len() > l
2941    } else {
2942        false
2943    };
2944
2945    if has_next_page {
2946        docs.truncate(limit.unwrap());
2947    }
2948
2949    let mut edges = Vec::with_capacity(docs.len());
2950    let mut end_cursor = String::new();
2951
2952    // Find the 'node' selection fields once (potentially expanding fragments)
2953    let node_fields = if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
2954        let edges_sub_fields =
2955            collect_fields(&edges_field.selection_set, fragments, variables, None)
2956                .unwrap_or_default();
2957        if let Some(node_field) = edges_sub_fields.into_iter().find(|f| f.name == "node") {
2958            collect_fields(&node_field.selection_set, fragments, variables, None)
2959                .unwrap_or_default()
2960        } else {
2961            Vec::new()
2962        }
2963    } else {
2964        Vec::new()
2965    };
2966
2967    for doc in &docs {
2968        let cursor = doc._sid.clone();
2969        end_cursor = cursor.clone();
2970
2971        let mut edge_data = HashMap::new();
2972        edge_data.insert("cursor".to_string(), Value::String(cursor));
2973
2974        let node_doc = if node_fields.is_empty() {
2975            doc.clone()
2976        } else {
2977            apply_projection(doc.clone(), &node_fields, options)
2978        };
2979
2980        edge_data.insert("node".to_string(), Value::Object(node_doc.data));
2981        edges.push(Value::Object(edge_data));
2982    }
2983
2984    let mut page_info = HashMap::new();
2985    page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
2986    page_info.insert("endCursor".to_string(), Value::String(end_cursor));
2987
2988    let mut conn_data = HashMap::new();
2989    conn_data.insert("edges".to_string(), Value::Array(edges));
2990    conn_data.insert("pageInfo".to_string(), Value::Object(page_info));
2991
2992    QueryResult {
2993        collection: String::new(),
2994        documents: vec![Document {
2995            _sid: "connection".to_string(),
2996            data: conn_data,
2997        }],
2998        total_count: None,
2999        deferred_fields: vec![],
3000        explain: None,
3001    }
3002}
3003
3004pub fn matches_filter(
3005    doc: &Document,
3006    filter: &CompiledFilter,
3007    vars: &HashMap<String, ast::Value>,
3008) -> bool {
3009    match filter {
3010        CompiledFilter::Eq(f, v) => {
3011            if let Some(dv) = doc.data.get(f) {
3012                values_equal(dv, v, vars)
3013            } else if f == "_sid" {
3014                values_equal(&Value::String(doc._sid.clone()), v, vars)
3015            } else {
3016                false
3017            }
3018        }
3019        CompiledFilter::Ne(f, v) => {
3020            if let Some(dv) = doc.data.get(f) {
3021                !values_equal(dv, v, vars)
3022            } else if f == "_sid" {
3023                !values_equal(&Value::String(doc._sid.clone()), v, vars)
3024            } else {
3025                true
3026            }
3027        }
3028        CompiledFilter::Gt(f, v) => {
3029            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3030                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3031            });
3032            dv_opt.map_or(false, |dv| {
3033                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3034                    return dv > bv;
3035                }
3036                false
3037            })
3038        }
3039        CompiledFilter::Gte(f, v) => {
3040            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3041                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3042            });
3043            dv_opt.map_or(false, |dv| {
3044                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3045                    return dv >= bv;
3046                }
3047                false
3048            })
3049        }
3050        CompiledFilter::Lt(f, v) => {
3051            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3052                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3053            });
3054            dv_opt.map_or(false, |dv| {
3055                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3056                    return dv < bv;
3057                }
3058                false
3059            })
3060        }
3061        CompiledFilter::Lte(f, v) => {
3062            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3063                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3064            });
3065            dv_opt.map_or(false, |dv| {
3066                if let Ok(bv) = aql_value_to_db_value(v, vars) {
3067                    return dv <= bv;
3068                }
3069                false
3070            })
3071        }
3072        CompiledFilter::In(f, v) => {
3073            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3074                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3075            });
3076            dv_opt.map_or(false, |dv| {
3077                if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3078                    return arr.contains(&dv);
3079                }
3080                false
3081            })
3082        }
3083        CompiledFilter::NotIn(f, v) => {
3084            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3085                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3086            });
3087            dv_opt.map_or(true, |dv| {
3088                if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3089                    return !arr.contains(&dv);
3090                }
3091                true
3092            })
3093        }
3094        CompiledFilter::Contains(f, v) => {
3095            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3096                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3097            });
3098            if let Some(dv) = dv_opt {
3099                match (dv, resolve_if_variable(v, vars)) {
3100                    (Value::String(s), ast::Value::String(sub)) => s.contains(&*sub),
3101                    (Value::Array(arr), _) => {
3102                        if let Ok(bv) = aql_value_to_db_value(v, vars) {
3103                            return arr.contains(&bv);
3104                        }
3105                        false
3106                    }
3107                    _ => false,
3108                }
3109            } else {
3110                false
3111            }
3112        }
3113        // Field array must contain at least one of the provided values
3114        CompiledFilter::ContainsAny(f, v) => {
3115            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3116                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3117            });
3118            if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3119                (dv_opt, aql_value_to_db_value(v, vars))
3120            {
3121                check_arr.iter().any(|item| field_arr.contains(item))
3122            } else {
3123                false
3124            }
3125        }
3126        // Field array must contain all of the provided values
3127        CompiledFilter::ContainsAll(f, v) => {
3128            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3129                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3130            });
3131            if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3132                (dv_opt, aql_value_to_db_value(v, vars))
3133            {
3134                check_arr.iter().all(|item| field_arr.contains(item))
3135            } else {
3136                false
3137            }
3138        }
3139        CompiledFilter::StartsWith(f, v) => {
3140            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3141                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3142            });
3143            if let (Some(Value::String(s)), ast::Value::String(pre)) =
3144                (dv_opt, resolve_if_variable(v, vars))
3145            {
3146                s.starts_with(&*pre)
3147            } else {
3148                false
3149            }
3150        }
3151        CompiledFilter::EndsWith(f, v) => {
3152            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3153                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3154            });
3155            if let (Some(Value::String(s)), ast::Value::String(suf)) =
3156                (dv_opt, resolve_if_variable(v, vars))
3157            {
3158                s.ends_with(&*suf)
3159            } else {
3160                false
3161            }
3162        }
3163        CompiledFilter::Matches(f, re) => {
3164            let dv_opt = doc.data.get(f).cloned().or_else(|| {
3165                if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3166            });
3167            if let Some(Value::String(s)) = dv_opt {
3168                re.is_match(&s)
3169            } else {
3170                false
3171            }
3172        }
3173        CompiledFilter::IsNull(f) => {
3174            if f == "_sid" {
3175                false // _sid is never null
3176            } else {
3177                doc.data.get(f).map_or(true, |v| matches!(v, Value::Null))
3178            }
3179        }
3180        CompiledFilter::IsNotNull(f) => {
3181            if f == "_sid" {
3182                true // _sid is always non-null
3183            } else {
3184                doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
3185            }
3186        }
3187        CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
3188        CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
3189        CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
3190    }
3191}
3192
3193/// Apply an update modifier object to an existing field value.
3194/// Modifiers: `{ increment: N }`, `{ decrement: N }`, `{ push: V }`,
3195/// `{ pull: V }`, `{ addToSet: V }`.
3196/// Any non-modifier object is returned as-is (plain field replace).
3197fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
3198    if let Value::Object(modifier) = new_val {
3199        if let Some(delta) = modifier.get("increment") {
3200            match (existing, delta) {
3201                (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
3202                (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
3203                (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
3204                _ => {}
3205            }
3206        }
3207        if let Some(delta) = modifier.get("decrement") {
3208            match (existing, delta) {
3209                (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
3210                (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
3211                (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
3212                _ => {}
3213            }
3214        }
3215        if let Some(item) = modifier.get("push") {
3216            if let Some(Value::Array(mut arr)) = existing.cloned() {
3217                arr.push(item.clone());
3218                return Value::Array(arr);
3219            }
3220            return Value::Array(vec![item.clone()]);
3221        }
3222        if let Some(item) = modifier.get("pull") {
3223            if let Some(Value::Array(arr)) = existing {
3224                let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
3225                return Value::Array(filtered);
3226            }
3227            return Value::Array(vec![]);
3228        }
3229        if let Some(item) = modifier.get("addToSet") {
3230            if let Some(Value::Array(mut arr)) = existing.cloned() {
3231                if !arr.contains(item) {
3232                    arr.push(item.clone());
3233                }
3234                return Value::Array(arr);
3235            }
3236            return Value::Array(vec![item.clone()]);
3237        }
3238    }
3239    new_val.clone()
3240}
3241
3242fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
3243    if let Ok(bv) = aql_value_to_db_value(av, vars) {
3244        return dv == &bv;
3245    }
3246    false
3247}
3248
3249fn resolve_if_variable<'a>(
3250    v: &'a ast::Value,
3251    vars: &'a HashMap<String, ast::Value>,
3252) -> &'a ast::Value {
3253    if let ast::Value::Variable(n) = v {
3254        vars.get(n).unwrap_or(v)
3255    } else {
3256        v
3257    }
3258}
3259
3260pub fn apply_projection(
3261    doc: Document,
3262    fields: &[ast::Field],
3263    options: &ExecutionOptions,
3264) -> Document {
3265    let (projected, _) = apply_projection_and_defer(doc, fields, options);
3266    projected
3267}
3268
3269/// Apply projections, resolving lookup fields from foreign collections.
3270/// Returns (projected_doc, deferred_field_names).
3271async fn apply_projection_with_lookups(
3272    db: &Aurora,
3273    mut doc: Document,
3274    fields: &[ast::Field],
3275    vars: &HashMap<String, ast::Value>,
3276    options: &ExecutionOptions,
3277) -> Result<(Document, Vec<String>)> {
3278    if fields.is_empty() {
3279        return Ok((doc, vec![]));
3280    }
3281    let mut proj = HashMap::new();
3282    let mut deferred = Vec::new();
3283
3284    for f in fields {
3285        // @defer
3286        if f.directives.iter().any(|d| d.name == "defer") {
3287            deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
3288            continue;
3289        }
3290
3291        // Lookup field: has collection + localField + foreignField arguments
3292        let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
3293        let local_arg = f.arguments.iter().find(|a| a.name == "localField");
3294        let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
3295
3296        if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
3297            // Resolve variable references in lookup arguments
3298            let c_val = resolve_if_variable(&c.value, vars);
3299            let lf_val = resolve_if_variable(&lf.value, vars);
3300            let ff_val = resolve_if_variable(&ff.value, vars);
3301            let foreign_coll = if let ast::Value::String(s) = c_val {
3302                s.as_str()
3303            } else {
3304                continue;
3305            };
3306            let local_field = if let ast::Value::String(s) = lf_val {
3307                s.as_str()
3308            } else {
3309                continue;
3310            };
3311            let foreign_field = if let ast::Value::String(s) = ff_val {
3312                s.as_str()
3313            } else {
3314                continue;
3315            };
3316
3317            // Get the local field value to match against the foreign collection
3318            let local_val = doc.data.get(local_field).cloned().or_else(|| {
3319                if local_field == "_sid" {
3320                    Some(Value::String(doc._sid.clone()))
3321                } else {
3322                    None
3323                }
3324            });
3325
3326            if let Some(match_val) = local_val {
3327                // Optional filter — deeply resolve variables inside it before compiling
3328                let extra_filter = f
3329                    .arguments
3330                    .iter()
3331                    .find(|a| a.name == "where")
3332                    .and_then(|a| {
3333                        let resolved = resolve_ast_deep(&a.value, vars);
3334                        value_to_filter(&resolved).ok()
3335                    });
3336
3337                let vars_arc = Arc::new(vars.clone());
3338                let foreign_docs = db.scan_and_filter(
3339                    foreign_coll,
3340                    |fdoc| {
3341                        let field_match = fdoc
3342                            .data
3343                            .get(foreign_field)
3344                            .map(|v| values_equal_db(v, &match_val))
3345                            .unwrap_or(foreign_field == "_sid" && fdoc._sid == match_val.to_string());
3346                        if !field_match {
3347                            return false;
3348                        }
3349                        if let Some(ref ef) = extra_filter {
3350                            let compiled = compile_filter(ef)
3351                                .unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
3352                            matches_filter(fdoc, &compiled, &vars_arc)
3353                        } else {
3354                            true
3355                        }
3356                    },
3357                    None,
3358                )?;
3359
3360                // Apply sub-projection to foreign docs
3361                let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
3362                    foreign_docs
3363                        .into_iter()
3364                        .map(|fd| Value::Object(fd.data))
3365                        .collect()
3366                } else {
3367                    let sub_fields: Vec<ast::Field> = f
3368                        .selection_set
3369                        .iter()
3370                        .filter_map(|sel| {
3371                            if let Selection::Field(sf) = sel {
3372                                Some(sf.clone())
3373                            } else {
3374                                None
3375                            }
3376                        })
3377                        .collect();
3378                    foreign_docs
3379                        .into_iter()
3380                        .map(|fd| {
3381                            let (proj_fd, _) = apply_projection_and_defer(fd, &sub_fields, options);
3382                            Value::Object(proj_fd.data)
3383                        })
3384                        .collect()
3385                };
3386
3387                let alias = f.alias.as_ref().unwrap_or(&f.name).clone();
3388                proj.insert(alias, Value::Array(sub_projected));
3389            }
3390            continue;
3391        }
3392
3393        // Computed field
3394        if f.name == "__compute__" {
3395            let alias = f.alias.as_deref().unwrap_or("computed");
3396            if let Some(expr) = f.arguments.iter().find(|a| a.name == "expr") {
3397                let resolved_expr = resolve_if_variable(&expr.value, vars);
3398                if let ast::Value::String(template) = resolved_expr {
3399                    let result = eval_template(template, &doc.data);
3400                    proj.insert(alias.to_string(), Value::String(result));
3401                }
3402            }
3403            continue;
3404        }
3405
3406        // Normal field
3407        if f.name == "id" {
3408            if let Some(v) = doc.data.get("id") {
3409                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
3410            } else {
3411                proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Null);
3412            }
3413        } else if f.name == "_sid" {
3414            proj.insert(
3415                f.alias.as_ref().unwrap_or(&f.name).clone(),
3416                Value::String(doc._sid.clone()),
3417            );
3418        } else if let Some(v) = doc.data.get(&f.name) {
3419            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
3420        }
3421    }
3422    doc.data = proj;
3423    Ok((doc, deferred))
3424}
3425
3426fn values_equal_db(a: &Value, b: &Value) -> bool {
3427    a == b
3428}
3429
3430pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
3431    let resolved = resolve_if_variable(v, vars);
3432    match resolved {
3433        ast::Value::Int(i) => Ok(Value::Int(*i)),
3434        ast::Value::Float(f) => Ok(Value::Float(*f)),
3435        ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
3436        ast::Value::String(s) => Ok(Value::String(s.clone())),
3437        ast::Value::Enum(s) => Ok(Value::String(s.clone())),
3438        ast::Value::Null => Ok(Value::Null),
3439        ast::Value::Variable(name) => Err(AqlError::new(
3440            ErrorCode::UndefinedVariable,
3441            format!("Variable '{}' not found", name),
3442        )),
3443        ast::Value::Array(arr) => {
3444            let mut vals = Vec::with_capacity(arr.len());
3445            for v in arr {
3446                vals.push(aql_value_to_db_value(v, vars)?);
3447            }
3448            Ok(Value::Array(vals))
3449        }
3450        ast::Value::Object(obj) => {
3451            let mut map = HashMap::with_capacity(obj.len());
3452            for (k, v) in obj {
3453                map.insert(k.clone(), aql_value_to_db_value(v, vars)?);
3454            }
3455            Ok(Value::Object(map))
3456        }
3457    }
3458}
3459
3460/// Convert AQL AST value to serde_json::Value for job payloads
3461fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
3462    match v {
3463        ast::Value::Null => serde_json::Value::Null,
3464        ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
3465        ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
3466        ast::Value::Float(f) => serde_json::Number::from_f64(*f)
3467            .map(serde_json::Value::Number)
3468            .unwrap_or(serde_json::Value::Null),
3469        ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
3470        ast::Value::Array(arr) => {
3471            serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
3472        }
3473        ast::Value::Object(obj) => serde_json::Value::Object(
3474            obj.iter()
3475                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
3476                .collect(),
3477        ),
3478        ast::Value::Variable(_) => serde_json::Value::Null,
3479    }
3480}
3481
3482fn aql_value_to_hashmap(
3483    v: &ast::Value,
3484    vars: &HashMap<String, ast::Value>,
3485) -> Result<HashMap<String, Value>> {
3486    if let ast::Value::Object(m) = resolve_if_variable(v, vars) {
3487        let mut res = HashMap::new();
3488        for (k, v) in m {
3489            res.insert(k.clone(), aql_value_to_db_value(v, vars)?);
3490        }
3491        Ok(res)
3492    } else {
3493        Err(AqlError::new(
3494            ErrorCode::QueryError,
3495            "Data must be object".to_string(),
3496        ))
3497    }
3498}
3499
3500fn aurora_value_to_json_value(v: &Value) -> JsonValue {
3501    match v {
3502        Value::Null => JsonValue::Null,
3503        Value::String(s) => JsonValue::String(s.clone()),
3504        Value::Int(i) => JsonValue::Number((*i).into()),
3505        Value::Float(f) => serde_json::Number::from_f64(*f)
3506            .map(JsonValue::Number)
3507            .unwrap_or(JsonValue::Null),
3508        Value::Bool(b) => JsonValue::Bool(*b),
3509        Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
3510        Value::Object(m) => {
3511            let mut jm = serde_json::Map::new();
3512            for (k, v) in m {
3513                jm.insert(k.clone(), aurora_value_to_json_value(v));
3514            }
3515            JsonValue::Object(jm)
3516        }
3517        Value::Uuid(u) => JsonValue::String(u.to_string()),
3518        Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
3519    }
3520}
3521
3522/// Find an equality filter on an indexed field to allow O(1) lookup short-circuit
3523fn find_indexed_equality_filter(
3524    filter: &AqlFilter,
3525    db: &Aurora,
3526    collection: &str,
3527) -> Option<(String, ast::Value)> {
3528    match filter {
3529        AqlFilter::Eq(field, val) => {
3530            if field == "_sid" || db.has_index(collection, field) {
3531                Some((field.clone(), val.clone()))
3532            } else {
3533                None
3534            }
3535        }
3536        AqlFilter::And(filters) => {
3537            for f in filters {
3538                if let Some(res) = find_indexed_equality_filter(f, db, collection) {
3539                    return Some(res);
3540                }
3541            }
3542            None
3543        }
3544        _ => None,
3545    }
3546}
3547
3548pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
3549    if let ast::Value::Object(m) = v {
3550        let mut fs = Vec::new();
3551        for (k, val) in m {
3552            match k.as_str() {
3553                "or" => {
3554                    if let ast::Value::Array(arr) = val {
3555                        let mut sub = Vec::new();
3556                        for item in arr {
3557                            sub.push(value_to_filter(item)?);
3558                        }
3559                        return Ok(AqlFilter::Or(sub));
3560                    }
3561                }
3562                "and" => {
3563                    if let ast::Value::Array(arr) = val {
3564                        let mut sub = Vec::new();
3565                        for item in arr {
3566                            sub.push(value_to_filter(item)?);
3567                        }
3568                        return Ok(AqlFilter::And(sub));
3569                    }
3570                }
3571                "not" => {
3572                    return Ok(AqlFilter::Not(Box::new(value_to_filter(val)?)));
3573                }
3574                field => {
3575                    if let ast::Value::Object(ops) = val {
3576                        for (op, ov) in ops {
3577                            match op.as_str() {
3578                                "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
3579                                "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
3580                                "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
3581                                "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
3582                                "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
3583                                "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
3584                                "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
3585                                "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
3586                                "contains" => {
3587                                    fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
3588                                }
3589                                "containsAny" => {
3590                                    fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
3591                                }
3592                                "containsAll" => {
3593                                    fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
3594                                }
3595                                "startsWith" => {
3596                                    fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
3597                                }
3598                                "endsWith" => {
3599                                    fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
3600                                }
3601                                "matches" => {
3602                                    fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
3603                                }
3604                                _ => {}
3605                            }
3606                        }
3607                    }
3608                }
3609            }
3610        }
3611        if fs.is_empty() {
3612            Ok(AqlFilter::And(vec![]))
3613        } else if fs.len() == 1 {
3614            Ok(fs.remove(0))
3615        } else {
3616            Ok(AqlFilter::And(fs))
3617        }
3618    } else {
3619        Err(AqlError::new(
3620            ErrorCode::QueryError,
3621            "Filter must be object".to_string(),
3622        ))
3623    }
3624}
3625
3626fn resolve_value(
3627    v: &ast::Value,
3628    vars: &HashMap<String, ast::Value>,
3629    _ctx: &ExecutionContext,
3630) -> ast::Value {
3631    match v {
3632        ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
3633        _ => v.clone(),
3634    }
3635}
3636
3637/// Recursively resolve all variable references inside an AST value.
3638/// Unlike `resolve_value` (which only handles a top-level `$var`), this walks
3639/// into objects and arrays so nested references like
3640/// `{ query: $term }` or `{ status: { eq: $activeStatus } }` are fully resolved.
3641fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
3642    match v {
3643        ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
3644        ast::Value::Object(m) => ast::Value::Object(
3645            m.iter()
3646                .map(|(k, val)| (k.clone(), resolve_ast_deep(val, vars)))
3647                .collect(),
3648        ),
3649        ast::Value::Array(arr) => {
3650            ast::Value::Array(arr.iter().map(|val| resolve_ast_deep(val, vars)).collect())
3651        }
3652        _ => v.clone(),
3653    }
3654}
3655
3656#[cfg(test)]
3657mod tests {
3658    use super::*;
3659    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
3660    use tempfile::TempDir;
3661
3662    #[tokio::test]
3663    async fn test_executor_integration() {
3664        let td = TempDir::new().unwrap();
3665        let db = Aurora::with_config(AuroraConfig {
3666            db_path: td.path().join("test.db"),
3667            enable_write_buffering: false,
3668            durability_mode: DurabilityMode::Strict,
3669            ..Default::default()
3670        })
3671        .await
3672        .unwrap();
3673        db.new_collection(
3674            "users",
3675            vec![(
3676                "name",
3677                crate::types::FieldDefinition {
3678                    field_type: FieldType::SCALAR_STRING,
3679                    unique: false,
3680                    indexed: true,
3681                    nullable: true,
3682                    ..Default::default()
3683                },
3684            )],
3685        )
3686        .await
3687        .unwrap();
3688        let _ = execute(
3689            &db,
3690            r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#,
3691            ExecutionOptions::new(),
3692        )
3693        .await
3694        .unwrap();
3695        let res = execute(&db, "query { users { name } }", ExecutionOptions::new())
3696            .await
3697            .unwrap();
3698        if let ExecutionResult::Query(q) = res {
3699            assert_eq!(q.documents.len(), 1);
3700        } else {
3701            panic!("Expected query");
3702        }
3703    }
3704}