// aurora_db/parser/executor.rs

1//! AQL Executor - Connects parsed AQL to Aurora operations
2//!
3//! Provides the bridge between AQL AST and Aurora's database operations.
4//! All operations (queries, mutations, subscriptions) go through execute().
5
6use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17// chrono used for downsample timestamp bucketing
18#[allow(unused_imports)]
19use chrono::TimeZone as _;
20
/// Per-execution variable bindings keyed by variable name (raw JSON values).
pub type ExecutionContext = HashMap<String, JsonValue>;
22
/// A pre-compiled query plan that can be executed repeatedly with different variables
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Target collection — the root field name of the query.
    pub collection: String,
    /// Raw filter AST, kept so index-equality resolution can run per execution.
    pub filter: Option<ast::Filter>,
    /// Pre-compiled form of `filter` used for per-document matching.
    pub compiled_filter: Option<CompiledFilter>,
    /// Flattened sub-fields selected under the root field (fragments expanded).
    pub projection: Vec<ast::Field>,
    /// Row limit (`limit:` argument, or `first:` as a fallback).
    pub limit: Option<usize>,
    /// Number of leading rows to skip after sorting.
    pub offset: usize,
    /// Cursor (document id) to resume after, for cursor pagination.
    pub after: Option<String>,
    /// Sort specification applied before pagination.
    pub orderings: Vec<ast::Ordering>,
    /// True when the selection uses connection-style `edges`/`pageInfo`.
    pub is_connection: bool,
    /// True when any sub-field carries lookup (join-like) arguments.
    pub has_lookups: bool,
    /// Fragment definitions captured from the document, needed for projection.
    pub fragments: HashMap<String, FragmentDef>,
    /// Declared variables; used to validate provided bindings on plan reuse.
    pub variable_definitions: Vec<ast::VariableDefinition>,
}
39
impl QueryPlan {
    /// Check that every required variable declared by the planned query is
    /// satisfied by `provided_variables` (or has a declared default).
    pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
        validate_required_variables(&self.variable_definitions, provided_variables)
    }

    /// Build one plan per root field of `query`.
    ///
    /// `variables` are needed here only to evaluate @skip/@include during
    /// field collection; the filter itself is compiled with variable
    /// references still unresolved so the plan can be reused with different
    /// bindings at execution time.
    pub fn from_query(
        query: &ast::Query,
        fragments: &HashMap<String, FragmentDef>,
        variables: &HashMap<String, ast::Value>,
    ) -> Result<Vec<Self>> {
        let root_fields = collect_fields(&query.selection_set, fragments, variables, None)?;

        let mut plans = Vec::new();
        for field in root_fields {
            let collection = field.name.clone();
            let filter = extract_filter_from_args(&field.arguments)?;
            let compiled_filter = if let Some(ref f) = filter {
                Some(compile_filter(f)?)
            } else {
                None
            };

            let sub_fields = collect_fields(
                &field.selection_set,
                fragments,
                variables,
                Some(&field.name),
            )?;

            // Both pagination styles are extracted: limit/offset (classic)
            // and first/after (cursor). `last`/`before` are currently unused.
            let (limit, offset) = extract_pagination(&field.arguments);
            let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
            let orderings = extract_order_by(&field.arguments);
            // Connection shape is detected from the selection set, not args.
            let is_connection = sub_fields
                .iter()
                .any(|f| f.name == "edges" || f.name == "pageInfo");

            // Any lookup-style argument on a sub-field forces the slower
            // projection path that can fetch from other collections.
            let has_lookups = sub_fields.iter().any(|f| {
                f.arguments.iter().any(|arg| {
                    arg.name == "collection"
                        || arg.name == "localField"
                        || arg.name == "foreignField"
                })
            });

            plans.push(QueryPlan {
                collection,
                filter,
                compiled_filter,
                projection: sub_fields,
                // `limit:` wins over `first:` when both are present.
                limit: limit.or(first),
                offset,
                after,
                orderings,
                is_connection,
                has_lookups,
                fragments: fragments.clone(),
                variable_definitions: query.variable_definitions.clone(),
            });
        }
        Ok(plans)
    }
}
102
/// Execute an AQL query string against the database
///
/// Resolution order, fastest first:
/// 1. plan-cache hit on the trimmed query text's hash → run the cached plan;
/// 2. single-query document → compile a plan, cache it, run it;
/// 3. anything else (multi-operation, mutations, multi-root queries) →
///    resolve variables into the AST and run the full document interpreter.
pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Compute query hash for cache lookup (ignoring whitespace variations)
    // NOTE(review): only leading/trailing whitespace is normalized via trim();
    // internal whitespace differences still produce distinct cache keys.
    let query_key = {
        let mut hasher = DefaultHasher::new();
        aql.trim().hash(&mut hasher);
        hasher.finish()
    };

    // Prepare variables for this execution
    let vars: HashMap<String, ast::Value> = options
        .variables
        .iter()
        .map(|(k, v)| (k.clone(), json_to_aql_value(v.clone())))
        .collect();

    // HIGH-SPEED PATH: Check for pre-compiled Query Plan
    if let Some(plan) = db.plan_cache.get(&query_key) {
        plan.validate(&vars)?;
        return execute_plan(db, &plan, &vars, &options).await;
    }

    // SLOW PATH: Parse and Compile
    let mut doc = super::parse(aql)?;

    // Extract first query operation for planning
    if let Some(Operation::Query(query)) = doc
        .operations
        .iter()
        .find(|op| matches!(op, Operation::Query(_)))
    {
        let fragments: HashMap<String, FragmentDef> = doc
            .operations
            .iter()
            .filter_map(|op| {
                if let Operation::FragmentDefinition(f) = op {
                    Some((f.name.clone(), f.clone()))
                } else {
                    None
                }
            })
            .collect();

        let plans = QueryPlan::from_query(query, &fragments, &vars)?;
        // Only single-root-field queries are cacheable; multi-plan documents
        // fall through to the document interpreter below.
        if plans.len() == 1 {
            let plan = Arc::new(plans[0].clone());
            // Validate BEFORE inserting so an invalid plan is never cached.
            plan.validate(&vars)?;
            db.plan_cache.insert(query_key, Arc::clone(&plan));

            return execute_plan(db, &plan, &vars, &options).await;
        }
    }

    // FALLBACK: Normal execution for complex documents (but WITH our prepared vars)
    // Merge declared variable defaults for any var not explicitly provided.
    let mut vars = vars;
    for op in &doc.operations {
        if let Operation::Query(q) = op {
            for var_def in &q.variable_definitions {
                if !vars.contains_key(&var_def.name) {
                    if let Some(default) = &var_def.default_value {
                        vars.insert(var_def.name.clone(), default.clone());
                    }
                }
            }
        }
    }

    // We must resolve variables in the AST before executing the document path
    // Validator error codes are mapped onto executor-level error codes here.
    super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
        let code = match e.code {
            super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
            super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
            _ => ErrorCode::QueryError,
        };
        AqlError::new(code, e.to_string())
    })?;

    execute_document(db, &doc, &options).await
}
186
/// Execute a pre-compiled [`QueryPlan`] with the given runtime `variables`.
///
/// Pipeline order matters: index/scan → sort → cursor drain → offset →
/// connection-or-limit → projection (groupBy / aggregate / plain / lookups).
async fn execute_plan(
    db: &Aurora,
    plan: &QueryPlan,
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let collection_name = &plan.collection;

    // Determine effective limit for the scan
    let effective_limit = plan.limit;

    // 1. Resolve Indexed Equality (Fast path)
    let mut indexed_docs = None;
    if let Some(ref f) = plan.filter {
        // We need a version of find_indexed that uses our runtime variables
        if let Some((field, val)) =
            find_indexed_equality_filter_runtime(f, db, collection_name, variables)
        {
            let db_val = aql_value_to_db_value(&val, variables)?;
            let ids = db.get_ids_from_index(collection_name, &field, &db_val);

            let mut docs = Vec::with_capacity(ids.len());
            for id in ids {
                if let Some(doc) = db.get_document(collection_name, &id)? {
                    // Check if doc matches the REST of the plan's compiled filter
                    if let Some(ref cf) = plan.compiled_filter {
                        if matches_filter(&doc, cf, variables) {
                            docs.push(doc);
                        }
                    } else {
                        docs.push(doc);
                    }
                }
            }
            indexed_docs = Some(docs);
        }
    }

    let mut docs = if let Some(d) = indexed_docs {
        d
    } else {
        // Fallback to scan
        let vars_arc = Arc::new(variables.clone());
        let cf_clone = plan.compiled_filter.clone();
        let filter_fn = move |doc: &Document| {
            cf_clone
                .as_ref()
                .map(|f| matches_filter(doc, f, &vars_arc))
                .unwrap_or(true)
        };

        // Scan-limit is only valid when there is no cursor and no ordering —
        // orderings require a full scan to find the correct top-N.
        let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
            None
        } else {
            plan.limit.map(|l| {
                // Connections fetch one extra doc — presumably so the page
                // boundary (hasNextPage) can be detected; confirm in
                // execute_connection.
                let base = if plan.is_connection { l + 1 } else { l };
                base + plan.offset
            })
        };
        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
    };

    // 2. Sort (must happen before cursor drain and offset)
    if !plan.orderings.is_empty() {
        apply_ordering(&mut docs, &plan.orderings);
    }

    // 2a. Cursor pagination: skip past the `after` cursor
    if let Some(ref cursor) = plan.after {
        if let Some(pos) = docs.iter().position(|d| &d.id == cursor) {
            docs.drain(0..=pos);
        }
    }

    // 2b. Apply offset
    if plan.offset > 0 {
        if plan.offset < docs.len() {
            docs.drain(0..plan.offset);
        } else {
            docs.clear();
        }
    }

    // 3. Handle Connection vs Flat List
    if plan.is_connection {
        return Ok(ExecutionResult::Query(execute_connection(
            docs,
            &plan.projection,
            effective_limit,
            &plan.fragments,
            variables,
        )));
    }

    // 3a. Apply limit for flat list (after ordering so we get correct top-N)
    if let Some(l) = effective_limit {
        docs.truncate(l);
    }

    // 4. Project (Flat List)
    if options.apply_projections && !plan.projection.is_empty() {
        // Check if we have an aggregate or groupBy field
        let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
        let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");

        if let Some(f) = group_by_field {
            // Handle GroupBy
            let field_name = f
                .arguments
                .iter()
                .find(|a| a.name == "field")
                .and_then(|a| match &a.value {
                    ast::Value::String(s) => Some(s),
                    _ => None,
                });

            if let Some(group_field) = field_name {
                // Bucket documents by the stringified value of the group
                // field; missing values all land in the "null" bucket.
                let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
                for d in docs {
                    let key = d
                        .data
                        .get(group_field)
                        .map(|v| match v {
                            Value::String(s) => s.clone(),
                            _ => v.to_string(),
                        })
                        .unwrap_or_else(|| "null".to_string());
                    groups.entry(key).or_default().push(d);
                }

                // NOTE(review): HashMap iteration means group output order is
                // nondeterministic unless an explicit ordering is applied.
                let mut group_docs = Vec::with_capacity(groups.len());
                for (key, group_items) in groups {
                    let mut data = HashMap::new();
                    for selection in &f.selection_set {
                        if let Selection::Field(sub_f) = selection {
                            let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
                            match sub_f.name.as_str() {
                                "key" => {
                                    data.insert(alias.clone(), Value::String(key.clone()));
                                }
                                "count" => {
                                    data.insert(
                                        alias.clone(),
                                        Value::Int(group_items.len() as i64),
                                    );
                                }
                                "nodes" => {
                                    let sub_fields = collect_fields(
                                        &sub_f.selection_set,
                                        &plan.fragments,
                                        variables,
                                        None,
                                    )
                                    .unwrap_or_default();

                                    let projected_nodes = group_items
                                        .iter()
                                        .map(|d| {
                                            let node_data =
                                                apply_projection(d.clone(), &sub_fields).data;
                                            Value::Object(node_data)
                                        })
                                        .collect();
                                    data.insert(alias.clone(), Value::Array(projected_nodes));
                                }
                                "aggregate" => {
                                    let agg_res =
                                        compute_aggregates(&group_items, &sub_f.selection_set);
                                    data.insert(alias.clone(), Value::Object(agg_res));
                                }
                                _ => {}
                            }
                        }
                    }
                    group_docs.push(Document {
                        id: format!("group:{}", key),
                        data,
                    });
                }
                docs = group_docs;
            }
        } else if let Some(agg) = agg_field {
            // If aggregate is the ONLY field, we return a single summary document
            // If there are other fields, we currently follow the behavior of adding the agg to each doc
            // (Test expectations suggest a single doc if only agg is asked)
            let agg_result = compute_aggregates(&docs, &agg.selection_set);
            let alias = agg.alias.as_ref().unwrap_or(&agg.name);

            if plan.projection.len() == 1 {
                let mut data = HashMap::new();
                data.insert(alias.clone(), Value::Object(agg_result));
                docs = vec![Document {
                    id: "aggregate".to_string(),
                    data,
                }];
            } else {
                // Add aggregate to each document
                docs = docs
                    .into_iter()
                    .map(|mut d| {
                        d.data
                            .insert(alias.clone(), Value::Object(agg_result.clone()));
                        apply_projection(d, &plan.projection)
                    })
                    .collect();
            }
        } else if !plan.has_lookups {
            docs = docs
                .into_iter()
                .map(|d| apply_projection(d, &plan.projection))
                .collect();
        } else {
            // Slow path for lookups
            let mut projected = Vec::with_capacity(docs.len());
            for d in docs {
                let (proj_doc, _) =
                    apply_projection_with_lookups(db, d, &plan.projection, variables).await?;
                projected.push(proj_doc);
            }
            docs = projected;
        }
    }

    Ok(ExecutionResult::Query(QueryResult {
        collection: collection_name.clone(),
        documents: docs,
        total_count: None,
        deferred_fields: vec![],
        explain: None,
    }))
}
420
421fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
422    let mut results = HashMap::new();
423
424    for selection in selections {
425        if let Selection::Field(f) = selection {
426            let alias = f.alias.as_ref().unwrap_or(&f.name);
427            let value = match f.name.as_str() {
428                "count" => Value::Int(docs.len() as i64),
429                "sum" => {
430                    let field =
431                        f.arguments
432                            .iter()
433                            .find(|a| a.name == "field")
434                            .and_then(|a| match &a.value {
435                                ast::Value::String(s) => Some(s),
436                                _ => None,
437                            });
438
439                    if let Some(field_name) = field {
440                        let sum: f64 = docs
441                            .iter()
442                            .filter_map(|d| d.data.get(field_name))
443                            .filter_map(|v| match v {
444                                Value::Int(i) => Some(*i as f64),
445                                Value::Float(f) => Some(*f),
446                                _ => None,
447                            })
448                            .sum();
449                        Value::Float(sum)
450                    } else {
451                        Value::Null
452                    }
453                }
454                "avg" => {
455                    let field =
456                        f.arguments
457                            .iter()
458                            .find(|a| a.name == "field")
459                            .and_then(|a| match &a.value {
460                                ast::Value::String(s) => Some(s),
461                                _ => None,
462                            });
463
464                    if let Some(field_name) = field
465                        && !docs.is_empty()
466                    {
467                        let values: Vec<f64> = docs
468                            .iter()
469                            .filter_map(|d| d.data.get(field_name))
470                            .filter_map(|v| match v {
471                                Value::Int(i) => Some(*i as f64),
472                                Value::Float(f) => Some(*f),
473                                _ => None,
474                            })
475                            .collect();
476
477                        if values.is_empty() {
478                            Value::Null
479                        } else {
480                            let sum: f64 = values.iter().sum();
481                            Value::Float(sum / values.len() as f64)
482                        }
483                    } else {
484                        Value::Null
485                    }
486                }
487                "min" => {
488                    let field =
489                        f.arguments
490                            .iter()
491                            .find(|a| a.name == "field")
492                            .and_then(|a| match &a.value {
493                                ast::Value::String(s) => Some(s),
494                                _ => None,
495                            });
496
497                    if let Some(field_name) = field
498                        && !docs.is_empty()
499                    {
500                        let min = docs
501                            .iter()
502                            .filter_map(|d| d.data.get(field_name))
503                            .filter_map(|v| match v {
504                                Value::Int(i) => Some(*i as f64),
505                                Value::Float(f) => Some(*f),
506                                _ => None,
507                            })
508                            .fold(f64::INFINITY, f64::min);
509
510                        if min == f64::INFINITY {
511                            Value::Null
512                        } else {
513                            Value::Float(min)
514                        }
515                    } else {
516                        Value::Null
517                    }
518                }
519                "max" => {
520                    let field =
521                        f.arguments
522                            .iter()
523                            .find(|a| a.name == "field")
524                            .and_then(|a| match &a.value {
525                                ast::Value::String(s) => Some(s),
526                                _ => None,
527                            });
528
529                    if let Some(field_name) = field
530                        && !docs.is_empty()
531                    {
532                        let max = docs
533                            .iter()
534                            .filter_map(|d| d.data.get(field_name))
535                            .filter_map(|v| match v {
536                                Value::Int(i) => Some(*i as f64),
537                                Value::Float(f) => Some(*f),
538                                _ => None,
539                            })
540                            .fold(f64::NEG_INFINITY, f64::max);
541
542                        if max == f64::NEG_INFINITY {
543                            Value::Null
544                        } else {
545                            Value::Float(max)
546                        }
547                    } else {
548                        Value::Null
549                    }
550                }
551                _ => Value::Null,
552            };
553            results.insert(alias.clone(), value);
554        }
555    }
556
557    results
558}
559
560fn find_indexed_equality_filter_runtime(
561    filter: &ast::Filter,
562    db: &Aurora,
563    collection: &str,
564    variables: &HashMap<String, ast::Value>,
565) -> Option<(String, ast::Value)> {
566    match filter {
567        ast::Filter::Eq(field, val) => {
568            if field == "id" || db.has_index(collection, field) {
569                let resolved = resolve_if_variable(val, variables);
570                return Some((field.clone(), resolved.clone()));
571            }
572        }
573        ast::Filter::And(filters) => {
574            for f in filters {
575                if let Some(res) =
576                    find_indexed_equality_filter_runtime(f, db, collection, variables)
577                {
578                    return Some(res);
579                }
580            }
581        }
582        _ => {}
583    }
584    None
585}
586
587/// Helper to flatten a selection set (processing fragments) into a list of fields
588fn collect_fields(
589    selection_set: &[Selection],
590    fragments: &HashMap<String, FragmentDef>,
591    variable_values: &HashMap<String, ast::Value>,
592    parent_type: Option<&str>,
593) -> Result<Vec<Field>> {
594    let mut fields = Vec::new();
595
596    for selection in selection_set {
597        match selection {
598            Selection::Field(field) => {
599                if should_include(&field.directives, variable_values)? {
600                    fields.push(field.clone());
601                }
602            }
603            Selection::FragmentSpread(name) => {
604                if let Some(fragment) = fragments.get(name) {
605                    let type_match = if let Some(parent) = parent_type {
606                        parent == fragment.type_condition
607                    } else {
608                        true
609                    };
610
611                    if type_match {
612                        let fragment_fields = collect_fields(
613                            &fragment.selection_set,
614                            fragments,
615                            variable_values,
616                            parent_type,
617                        )?;
618                        fields.extend(fragment_fields);
619                    }
620                }
621            }
622            Selection::InlineFragment(inline) => {
623                let type_match = if let Some(parent) = parent_type {
624                    parent == inline.type_condition
625                } else {
626                    true
627                };
628
629                if type_match {
630                    let inline_fields = collect_fields(
631                        &inline.selection_set,
632                        fragments,
633                        variable_values,
634                        parent_type,
635                    )?;
636                    fields.extend(inline_fields);
637                }
638            }
639        }
640    }
641
642    Ok(fields)
643}
644
645/// Check if a field/fragment should be included based on @skip/@include
646fn should_include(
647    directives: &[ast::Directive],
648    variables: &HashMap<String, ast::Value>,
649) -> Result<bool> {
650    for dir in directives {
651        if dir.name == "skip" {
652            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
653                let should_skip = resolve_boolean_arg(&arg.value, variables)?;
654                if should_skip {
655                    return Ok(false);
656                }
657            }
658        } else if dir.name == "include" {
659            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
660                let should_include = resolve_boolean_arg(&arg.value, variables)?;
661                if !should_include {
662                    return Ok(false);
663                }
664            }
665        }
666    }
667    Ok(true)
668}
669
670fn resolve_boolean_arg(
671    value: &ast::Value,
672    variables: &HashMap<String, ast::Value>,
673) -> Result<bool> {
674    match value {
675        ast::Value::Boolean(b) => Ok(*b),
676        ast::Value::Variable(name) => {
677            if let Some(val) = variables.get(name) {
678                match val {
679                    ast::Value::Boolean(b) => Ok(*b),
680                    _ => Err(AqlError::new(
681                        ErrorCode::TypeError,
682                        format!("Variable '{}' is not a boolean, got {:?}", name, val),
683                    )),
684                }
685            } else {
686                Err(AqlError::new(
687                    ErrorCode::UndefinedVariable,
688                    format!("Variable '{}' is not defined", name),
689                ))
690            }
691        }
692        _ => Err(AqlError::new(
693            ErrorCode::TypeError,
694            format!("Expected boolean value, got {:?}", value),
695        )),
696    }
697}
698
699/// Validate that all required variables are provided
700fn validate_required_variables(
701    variable_definitions: &[ast::VariableDefinition],
702    provided_variables: &HashMap<String, ast::Value>,
703) -> Result<()> {
704    for var_def in variable_definitions {
705        if var_def.var_type.is_required {
706            if !provided_variables.contains_key(&var_def.name) {
707                if var_def.default_value.is_none() {
708                    return Err(AqlError::new(
709                        ErrorCode::UndefinedVariable,
710                        format!(
711                            "Required variable '{}' (type: {}{}) is not provided",
712                            var_def.name,
713                            var_def.var_type.name,
714                            if var_def.var_type.is_required {
715                                "!"
716                            } else {
717                                ""
718                            }
719                        ),
720                    ));
721                }
722            }
723        }
724    }
725    Ok(())
726}
727
/// Result of executing an AQL operation
#[derive(Debug)]
pub enum ExecutionResult {
    /// Read result: documents (possibly projected/aggregated) from one query.
    Query(QueryResult),
    /// Write result: affected count and any returned documents.
    Mutation(MutationResult),
    /// Live change-stream subscription handle.
    Subscription(SubscriptionResult),
    /// Results of multiple executable operations, in document order.
    Batch(Vec<ExecutionResult>),
    /// Result of a schema operation.
    Schema(SchemaResult),
    /// Result of a migration run.
    Migration(MigrationResult),
}
738
/// Outcome of a schema operation.
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Name of the schema operation performed.
    pub operation: String,
    /// Collection the operation targeted.
    pub collection: String,
    /// Human-readable outcome status.
    pub status: String,
}
745
/// Outcome of applying a migration.
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Migration version identifier.
    pub version: String,
    /// Number of migration steps that were applied.
    pub steps_applied: usize,
    /// Human-readable outcome status.
    pub status: String,
}
752
/// Serializable description of planned operations and their estimated cost.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionPlan {
    /// Ordered, human-readable operation descriptions.
    pub operations: Vec<String>,
    /// Relative cost estimate — units not specified here; TODO confirm at
    /// the site that populates it.
    pub estimated_cost: f64,
}
758
/// Query execution result
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Collection the query ran against.
    pub collection: String,
    /// Matching documents (possibly projected, grouped, or aggregated).
    pub documents: Vec<Document>,
    /// Total match count when computed; `None` otherwise.
    pub total_count: Option<usize>,
    /// Fields marked @defer — omitted from documents, listed here
    pub deferred_fields: Vec<String>,
    /// Explain metadata when @explain is present on the query
    pub explain: Option<ExplainResult>,
}
770
/// Execution plan metadata returned by @explain
#[derive(Debug, Clone, Default)]
pub struct ExplainResult {
    /// Collection that was queried.
    pub collection: String,
    /// Number of documents examined during execution.
    pub docs_scanned: usize,
    /// Whether an index satisfied (part of) the filter.
    pub index_used: bool,
    /// Wall-clock execution time in milliseconds.
    pub elapsed_ms: u128,
}
779
/// Mutation execution result
#[derive(Debug, Clone)]
pub struct MutationResult {
    /// Mutation verb that was executed.
    pub operation: String,
    /// Collection the mutation targeted.
    pub collection: String,
    /// Number of documents affected.
    pub affected_count: usize,
    /// Documents echoed back to the caller, when the mutation selects them.
    pub returned_documents: Vec<Document>,
}
788
/// Subscription result
#[derive(Debug)]
pub struct SubscriptionResult {
    /// Identifier for managing/cancelling the subscription.
    pub subscription_id: String,
    /// Collection being watched.
    pub collection: String,
    /// Live change stream; presumably `None` when the listener is delivered
    /// through another channel — TODO confirm at the creation site.
    pub stream: Option<crate::pubsub::ChangeListener>,
}
796
/// Execution options
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    /// Skip pre-execution validation (defaults to false).
    pub skip_validation: bool,
    /// Apply field projections to result documents (defaults to true).
    pub apply_projections: bool,
    /// Variable bindings provided by the caller, as raw JSON values.
    pub variables: HashMap<String, JsonValue>,
}
804
805impl Default for ExecutionOptions {
806    fn default() -> Self {
807        Self {
808            skip_validation: false,
809            apply_projections: true,
810            variables: HashMap::new(),
811        }
812    }
813}
814
815impl ExecutionOptions {
816    pub fn new() -> Self {
817        Self::default()
818    }
819
820    pub fn with_variables(mut self, vars: HashMap<String, JsonValue>) -> Self {
821        self.variables = vars;
822        self
823    }
824
825    pub fn skip_validation(mut self) -> Self {
826        self.skip_validation = true;
827        self
828    }
829}
830
831fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
832    match v {
833        serde_json::Value::Null => ast::Value::Null,
834        serde_json::Value::Bool(b) => ast::Value::Boolean(b),
835        serde_json::Value::Number(n) => {
836            if let Some(i) = n.as_i64() {
837                ast::Value::Int(i)
838            } else if let Some(f) = n.as_f64() {
839                ast::Value::Float(f)
840            } else {
841                ast::Value::Null
842            }
843        }
844        serde_json::Value::String(s) => ast::Value::String(s),
845        serde_json::Value::Array(arr) => {
846            ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
847        }
848        serde_json::Value::Object(map) => ast::Value::Object(
849            map.into_iter()
850                .map(|(k, v)| (k, json_to_aql_value(v)))
851                .collect(),
852        ),
853    }
854}
855
/// Execute a parsed AQL document
///
/// Converts caller-supplied JSON variables into AST values, indexes fragment
/// definitions by name (so fragment spreads can be resolved later), then
/// executes every non-fragment operation in order. A single executable
/// operation returns its result directly; several are wrapped in
/// `ExecutionResult::Batch`.
pub async fn execute_document(
    db: &Aurora,
    doc: &ast::Document,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    if doc.operations.is_empty() {
        return Err(AqlError::new(
            ErrorCode::QueryError,
            "No operations in document".to_string(),
        ));
    }

    // Variables arrive as JSON; the executor works with AST values throughout.
    let vars: HashMap<String, ast::Value> = options
        .variables
        .iter()
        .map(|(k, v)| (k.clone(), json_to_aql_value(v.clone())))
        .collect();

    // Fragment definitions are not executable; collect them by name for lookup.
    let fragments: HashMap<String, FragmentDef> = doc
        .operations
        .iter()
        .filter_map(|op| {
            if let Operation::FragmentDefinition(frag) = op {
                Some((frag.name.clone(), frag.clone()))
            } else {
                None
            }
        })
        .collect();

    // Everything except fragment definitions is executable.
    let executable_ops: Vec<&Operation> = doc
        .operations
        .iter()
        .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
        .collect();

    if executable_ops.is_empty() {
        return Err(AqlError::new(
            ErrorCode::QueryError,
            "No executable operations in document".to_string(),
        ));
    }

    if executable_ops.len() == 1 {
        execute_operation(db, executable_ops[0], &vars, options, &fragments).await
    } else {
        // Operations run sequentially; the first error aborts the whole batch.
        let mut results = Vec::new();
        for op in executable_ops {
            results.push(execute_operation(db, op, &vars, options, &fragments).await?);
        }
        Ok(ExecutionResult::Batch(results))
    }
}
910
/// Dispatch one executable operation to its type-specific executor.
///
/// Operation kinds this executor does not handle fall through to an empty
/// query result rather than producing an error.
async fn execute_operation(
    db: &Aurora,
    op: &Operation,
    vars: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<ExecutionResult> {
    match op {
        Operation::Query(query) => execute_query(db, query, vars, options, fragments).await,
        Operation::Mutation(mutation) => {
            execute_mutation(db, mutation, vars, options, fragments).await
        }
        Operation::Subscription(sub) => execute_subscription(db, sub, vars, options).await,
        Operation::Schema(schema) => execute_schema(db, schema, options).await,
        Operation::Migration(migration) => execute_migration(db, migration, options).await,
        Operation::Introspection(intro) => execute_introspection(db, intro).await,
        Operation::Handler(handler) => execute_handler_registration(db, handler, options).await,
        // Unsupported operation kinds degrade to an empty result set.
        _ => Ok(ExecutionResult::Query(QueryResult {
            collection: String::new(),
            documents: vec![],
            total_count: None,
            deferred_fields: vec![],
            explain: None,
        })),
    }
}
937
/// Execute a query operation: one `QueryResult` per root field.
///
/// When the query carries an `@explain` directive, each per-field result is
/// annotated with scan statistics and timing.
async fn execute_query(
    db: &Aurora,
    query: &ast::Query,
    vars: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<ExecutionResult> {
    validate_required_variables(&query.variable_definitions, vars)?;
    let has_explain = query.directives.iter().any(|d| d.name == "explain");
    let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;
    let mut results = Vec::new();
    for field in &root_fields {
        let sub_fields = collect_fields(&field.selection_set, fragments, vars, Some(&field.name))?;
        let start = std::time::Instant::now();
        let mut result =
            execute_collection_query(db, field, &sub_fields, vars, options, fragments).await?;
        if has_explain {
            let elapsed_ms = start.elapsed().as_millis();
            // Heuristic only: reports "index used" whenever a `where` argument
            // was present and the query returned documents — it does not
            // inspect the actual access path taken.
            let index_used =
                field.arguments.iter().any(|a| a.name == "where") && !result.documents.is_empty();
            result.explain = Some(ExplainResult {
                collection: result.collection.clone(),
                docs_scanned: result.documents.len(),
                index_used,
                elapsed_ms,
            });
        }
        results.push(result);
    }
    // A single root field unwraps to a plain query result; several become a batch.
    if results.len() == 1 {
        Ok(ExecutionResult::Query(results.remove(0)))
    } else {
        Ok(ExecutionResult::Batch(
            results.into_iter().map(ExecutionResult::Query).collect(),
        ))
    }
}
975
/// Execute a query against one collection.
///
/// Pipeline: full-text `search:` short-circuit → filter (with an index
/// fast-path for equality predicates on indexed fields) → inline `validate:`
/// filtering → ordering → cursor (`after`) handling → connection-style
/// results OR offset/limit slicing → projections and lookups → window
/// functions → downsampling.
async fn execute_collection_query(
    db: &Aurora,
    field: &ast::Field,
    sub_fields: &[ast::Field],
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<QueryResult> {
    let collection_name = &field.name;

    // ── Search args ────────────────────────────────────────────────────────
    // `search: { query: "...", fields: [...], fuzzy: true }` overrides normal scan.
    if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
        return execute_search_query(
            db,
            collection_name,
            search_arg,
            sub_fields,
            field,
            variables,
            options,
        )
        .await;
    }

    let filter = extract_filter_from_args(&field.arguments)?;
    let (limit, offset) = extract_pagination(&field.arguments);
    let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
    // Compile the filter once; the closure below runs it per document.
    let compiled_filter = if let Some(ref f) = filter {
        Some(compile_filter(f)?)
    } else {
        None
    };
    let vars_arc = Arc::new(variables.clone());
    let filter_fn = move |doc: &Document| {
        compiled_filter
            .as_ref()
            .map(|f| matches_filter(doc, f, &vars_arc))
            .unwrap_or(true)
    };

    // Index fast-path: an equality predicate on an indexed field lets us fetch
    // candidate ids from the index instead of scanning the whole collection.
    // The full filter is still re-applied to every candidate document.
    let indexed_docs = if let Some(ref f) = filter {
        match find_indexed_equality_filter(f, db, collection_name) {
            Some((field_name, val)) => {
                let db_val = aql_value_to_db_value(&val, variables)?;
                let ids = db.get_ids_from_index(collection_name, &field_name, &db_val);
                let mut docs = Vec::with_capacity(ids.len());
                for id in ids {
                    if let Some(doc) = db.get_document(collection_name, &id)? {
                        if filter_fn(&doc) {
                            docs.push(doc);
                        }
                    }
                }
                Some(docs)
            }
            None => None,
        }
    } else {
        None
    };

    // Relay-style connection query: the selection asks for edges/pageInfo.
    let is_connection = sub_fields
        .iter()
        .any(|f| f.name == "edges" || f.name == "pageInfo");

    let orderings = extract_order_by(&field.arguments);

    let mut docs = if let Some(docs) = indexed_docs {
        docs
    } else {
        // Early-stop scanning is only valid when there is no cursor (after) and no
        // ordering.  With ordering we must see ALL docs before we can pick the top-N;
        // with a cursor we don't know how far into the result set the cursor lies.
        let scan_limit = if after.is_some() || !orderings.is_empty() {
            None
        } else {
            limit.or(first).map(|l| {
                // Connections scan one extra doc — presumably for has-next-page
                // detection inside execute_connection; confirm there.
                let base = if is_connection { l + 1 } else { l };
                base + offset
            })
        };
        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
    };

    // Validation args
    // `validate: { fieldName: { format: "email" } }` filters out non-conforming docs
    if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
        docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
    }

    // Sort before any pagination so offset/cursor operate on ordered results.
    if !orderings.is_empty() {
        apply_ordering(&mut docs, &orderings);
    }

    // Cursor pagination: skip past the `after` cursor before slicing
    if let Some(ref cursor) = after {
        if let Some(pos) = docs.iter().position(|d| &d.id == cursor) {
            docs.drain(0..=pos);
        }
    }

    // NOTE(review): connection results return before `offset` is applied, so
    // offset is effectively ignored for edges/pageInfo queries — confirm intended.
    if is_connection {
        return Ok(execute_connection(
            docs,
            sub_fields,
            limit.or(first),
            fragments,
            variables,
        ));
    }

    // Offset pagination: skip the first `offset` documents
    if offset > 0 {
        if offset < docs.len() {
            docs.drain(0..offset);
        } else {
            docs.clear();
        }
    }

    // Apply limit after offset
    if let Some(l) = limit.or(first) {
        docs.truncate(l);
    }

    // Projections & Lookups
    // A sub-field carrying `collection`/`localField` args is a cross-collection lookup.
    let has_lookups = sub_fields.iter().any(|f| {
        f.arguments
            .iter()
            .any(|a| a.name == "collection" || a.name == "localField")
    });

    let mut deferred_fields = Vec::new();

    if options.apply_projections && !sub_fields.is_empty() {
        if has_lookups {
            // Lookup projection is async (reads other collections), so it runs
            // sequentially per document.
            let mut projected = Vec::with_capacity(docs.len());
            for d in docs {
                let (proj_doc, deferred) =
                    apply_projection_with_lookups(db, d, sub_fields, variables).await?;
                projected.push(proj_doc);
                // Deferred field names are the same for every doc; capture once.
                if deferred_fields.is_empty() {
                    deferred_fields = deferred;
                }
            }
            docs = projected;
        } else {
            let mut all_deferred = Vec::new();
            docs = docs
                .into_iter()
                .map(|d| {
                    let (proj, deferred) = apply_projection_and_defer(d, sub_fields);
                    if all_deferred.is_empty() && !deferred.is_empty() {
                        all_deferred = deferred;
                    }
                    proj
                })
                .collect();
            deferred_fields = all_deferred;
        }
    }

    // ── Window functions ───────────────────────────────────────────────────
    for sf in sub_fields {
        if sf.name == "windowFunc" {
            let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
            let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
            let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
            let wsize = arg_i64(&sf.arguments, "windowSize").unwrap_or(3) as usize;
            apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
        }
    }

    // ── Downsample ─────────────────────────────────────────────────────────
    if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
        let interval =
            arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
        let aggregation =
            arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
        // The downsample field's sub-selection names the value columns to aggregate.
        let ds_sub: Vec<String> =
            collect_fields(&ds_field.selection_set, fragments, variables, None)
                .unwrap_or_default()
                .iter()
                .map(|f| f.name.clone())
                .collect();
        docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
    }

    Ok(QueryResult {
        collection: collection_name.clone(),
        documents: docs,
        total_count: None,
        deferred_fields,
        explain: None,
    })
}
1174
/// Execute a search query using SearchBuilder
///
/// Builds a full-text search from the `search` argument (query string,
/// optional field list, optional fuzzy flag), applies the field's `limit`
/// argument if present, then optionally projects the selected sub-fields.
async fn execute_search_query(
    db: &Aurora,
    collection: &str,
    search_arg: &ast::Argument,
    sub_fields: &[ast::Field],
    field: &ast::Field,
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
) -> Result<QueryResult> {
    // Deeply resolve variable references inside the search argument (e.g. query: $term)
    let resolved_search_val = resolve_ast_deep(&search_arg.value, variables);
    let (query_str, search_fields, fuzzy) = extract_search_params(&resolved_search_val);
    let (limit, _) = extract_pagination(&field.arguments);

    let mut builder = db.search(collection).query(&query_str);
    if fuzzy {
        // Fuzzy matching with an edit distance of 1.
        builder = builder.fuzzy(1);
    }
    if let Some(l) = limit {
        builder = builder.limit(l);
    }

    // An empty field list means "search the default/all fields" (None).
    let mut docs = builder
        .collect_with_fields(if search_fields.is_empty() {
            None
        } else {
            Some(&search_fields)
        })
        .await?;

    if options.apply_projections && !sub_fields.is_empty() {
        // Deferred (@defer) fields are ignored for search results.
        docs = docs
            .into_iter()
            .map(|d| {
                let (proj, _) = apply_projection_and_defer(d, sub_fields);
                proj
            })
            .collect();
    }

    Ok(QueryResult {
        collection: collection.to_string(),
        documents: docs,
        total_count: None,
        deferred_fields: vec![],
        explain: None,
    })
}
1224
1225fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
1226    let mut query = String::new();
1227    let mut fields = Vec::new();
1228    let mut fuzzy = false;
1229    if let ast::Value::Object(m) = v {
1230        if let Some(ast::Value::String(q)) = m.get("query") {
1231            query = q.clone();
1232        }
1233        if let Some(ast::Value::Array(arr)) = m.get("fields") {
1234            for item in arr {
1235                if let ast::Value::String(s) = item {
1236                    fields.push(s.clone());
1237                }
1238            }
1239        }
1240        if let Some(ast::Value::Boolean(b)) = m.get("fuzzy") {
1241            fuzzy = *b;
1242        }
1243    }
1244    (query, fields, fuzzy)
1245}
1246
1247fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
1248    if let ast::Value::Object(rules) = &validate_arg.value {
1249        for (field_name, constraints_val) in rules {
1250            if let ast::Value::Object(constraints) = constraints_val {
1251                if let Some(field_val) = doc.data.get(field_name) {
1252                    for (constraint_name, constraint_val) in constraints {
1253                        if !check_inline_constraint(field_val, constraint_name, constraint_val) {
1254                            return false;
1255                        }
1256                    }
1257                }
1258            }
1259        }
1260    }
1261    true
1262}
1263
1264fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
1265    match constraint {
1266        "format" => {
1267            if let (Value::String(s), ast::Value::String(fmt)) = (value, constraint_val) {
1268                return match fmt.as_str() {
1269                    "email" => {
1270                        s.contains('@')
1271                            && s.split('@')
1272                                .nth(1)
1273                                .map(|d| d.contains('.'))
1274                                .unwrap_or(false)
1275                    }
1276                    "url" => s.starts_with("http://") || s.starts_with("https://"),
1277                    "uuid" => uuid::Uuid::parse_str(s).is_ok(),
1278                    _ => true,
1279                };
1280            }
1281            true
1282        }
1283        "min" => {
1284            let n = match value {
1285                Value::Int(i) => *i as f64,
1286                Value::Float(f) => *f,
1287                _ => return true,
1288            };
1289            let min = match constraint_val {
1290                ast::Value::Float(f) => *f,
1291                ast::Value::Int(i) => *i as f64,
1292                _ => return true,
1293            };
1294            n >= min
1295        }
1296        "max" => {
1297            let n = match value {
1298                Value::Int(i) => *i as f64,
1299                Value::Float(f) => *f,
1300                _ => return true,
1301            };
1302            let max = match constraint_val {
1303                ast::Value::Float(f) => *f,
1304                ast::Value::Int(i) => *i as f64,
1305                _ => return true,
1306            };
1307            n <= max
1308        }
1309        "minLength" => {
1310            if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1311                return s.len() >= *n as usize;
1312            }
1313            true
1314        }
1315        "maxLength" => {
1316            if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1317                return s.len() <= *n as usize;
1318            }
1319            true
1320        }
1321        "pattern" => {
1322            if let (Value::String(s), ast::Value::String(pat)) = (value, constraint_val) {
1323                if let Ok(re) = regex::Regex::new(pat) {
1324                    return re.is_match(s);
1325                }
1326            }
1327            true
1328        }
1329        _ => true,
1330    }
1331}
1332
1333fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
1334    args.iter().find(|a| a.name == name).and_then(|a| {
1335        if let ast::Value::String(s) = &a.value {
1336            Some(s.clone())
1337        } else {
1338            None
1339        }
1340    })
1341}
1342
1343fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
1344    args.iter().find(|a| a.name == name).and_then(|a| {
1345        if let ast::Value::Int(i) = &a.value {
1346            Some(*i)
1347        } else {
1348            None
1349        }
1350    })
1351}
1352
1353/// Apply projection and collect @defer'd field names separately
1354fn apply_projection_and_defer(mut doc: Document, fields: &[ast::Field]) -> (Document, Vec<String>) {
1355    if fields.is_empty() {
1356        return (doc, vec![]);
1357    }
1358    let mut proj = HashMap::new();
1359    let mut deferred = Vec::new();
1360
1361    for f in fields {
1362        // @defer directive: omit from response, record field name
1363        if f.directives.iter().any(|d| d.name == "defer") {
1364            deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
1365            continue;
1366        }
1367        // Computed field: name starts with __compute__ sentinel
1368        if f.name == "__compute__" {
1369            let alias = f.alias.as_deref().unwrap_or("computed");
1370            if let Some(expr) = f.arguments.iter().find(|a| a.name == "expr") {
1371                if let ast::Value::String(template) = &expr.value {
1372                    let result = eval_template(template, &doc.data);
1373                    proj.insert(alias.to_string(), Value::String(result));
1374                }
1375            }
1376            continue;
1377        }
1378        if f.name == "id" {
1379            proj.insert(
1380                f.alias.as_ref().unwrap_or(&f.name).clone(),
1381                Value::String(doc.id.clone()),
1382            );
1383        } else if let Some(v) = doc.data.get(&f.name) {
1384            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1385        }
1386    }
1387    doc.data = proj;
1388    (doc, deferred)
1389}
1390
1391/// Simple `$fieldName` template evaluator
1392fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
1393    let mut result = template.to_string();
1394    for (k, v) in data {
1395        let placeholder = format!("${}", k);
1396        if result.contains(&placeholder) {
1397            result = result.replace(&placeholder, &v.to_string());
1398        }
1399    }
1400    result
1401}
1402
1403/// Apply window function (rollingAvg, rollingSum, rollingMin, rollingMax) on the docs Vec
1404fn apply_window_function(
1405    docs: &mut Vec<Document>,
1406    alias: &str,
1407    field: &str,
1408    function: &str,
1409    window: usize,
1410) {
1411    if docs.is_empty() || window == 0 {
1412        return;
1413    }
1414    let values: Vec<Option<f64>> = docs
1415        .iter()
1416        .map(|d| match d.data.get(field) {
1417            Some(Value::Int(i)) => Some(*i as f64),
1418            Some(Value::Float(f)) => Some(*f),
1419            _ => None,
1420        })
1421        .collect();
1422
1423    for (i, doc) in docs.iter_mut().enumerate() {
1424        let start = if i + 1 >= window { i + 1 - window } else { 0 };
1425        let window_vals: Vec<f64> = values[start..=i].iter().filter_map(|v| *v).collect();
1426        if window_vals.is_empty() {
1427            continue;
1428        }
1429        let result = match function {
1430            "rollingAvg" | "avg" => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
1431            "rollingSum" | "sum" => window_vals.iter().sum::<f64>(),
1432            "rollingMin" | "min" => window_vals.iter().cloned().fold(f64::INFINITY, f64::min),
1433            "rollingMax" | "max" => window_vals
1434                .iter()
1435                .cloned()
1436                .fold(f64::NEG_INFINITY, f64::max),
1437            _ => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
1438        };
1439        doc.data.insert(alias.to_string(), Value::Float(result));
1440    }
1441}
1442
/// Downsample time-series: bucket docs by interval, aggregate value fields
///
/// Documents are grouped into fixed-width time buckets keyed on the first
/// recognized timestamp field ("timestamp", "ts", "created_at" or "time";
/// RFC 3339 strings or integer epoch seconds). Each bucket becomes one
/// synthetic document carrying the bucket start time, a document count, and
/// one aggregated value per requested field. Documents with no recognizable
/// timestamp are appended unchanged after the buckets.
fn apply_downsample(
    docs: Vec<Document>,
    interval: &str,
    aggregation: &str,
    value_fields: &[String],
) -> Vec<Document> {
    let interval_secs: i64 = parse_interval(interval);
    if interval_secs <= 0 {
        // Unparseable or zero-width interval: leave the data untouched.
        return docs;
    }

    // Group by bucket
    let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
        std::collections::BTreeMap::new();
    let mut leftover = Vec::new();

    for doc in docs {
        // Try common timestamp field names
        let ts = ["timestamp", "ts", "created_at", "time"]
            .iter()
            .find_map(|&k| doc.data.get(k))
            .and_then(|v| match v {
                Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
                    .ok()
                    .map(|dt| dt.timestamp()),
                Value::Int(i) => Some(*i),
                _ => None,
            });

        if let Some(t) = ts {
            // Truncate the timestamp to its interval bucket (integer division).
            let bucket = (t / interval_secs) * interval_secs;
            buckets.entry(bucket).or_default().push(doc);
        } else {
            leftover.push(doc);
        }
    }

    // Build one synthetic document per bucket (BTreeMap keeps them time-ordered).
    let mut result = Vec::new();
    for (bucket_ts, group) in buckets {
        let mut data = HashMap::new();
        data.insert(
            "timestamp".to_string(),
            Value::String(
                chrono::DateTime::from_timestamp(bucket_ts, 0)
                    .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
                    .unwrap_or_default(),
            ),
        );
        data.insert("count".to_string(), Value::Int(group.len() as i64));

        for field in value_fields {
            if field == "timestamp" || field == "count" {
                // Reserved output fields; never aggregated.
                continue;
            }
            let nums: Vec<f64> = group
                .iter()
                .filter_map(|d| match d.data.get(field) {
                    Some(Value::Int(i)) => Some(*i as f64),
                    Some(Value::Float(f)) => Some(*f),
                    _ => None,
                })
                .collect();
            if nums.is_empty() {
                continue;
            }
            let agg = match aggregation {
                "sum" => nums.iter().sum::<f64>(),
                "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
                "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
                "count" => nums.len() as f64,
                _ => nums.iter().sum::<f64>() / nums.len() as f64, // avg default
            };
            data.insert(field.clone(), Value::Float(agg));
        }

        // The bucket's epoch-second start doubles as the synthetic document id.
        result.push(Document {
            id: bucket_ts.to_string(),
            data,
        });
    }

    result.extend(leftover);
    result
}
1528
/// Parse an interval string into seconds.
///
/// Accepts `"30s"`, `"5m"`, `"2h"`, `"1d"` or a bare number of seconds.
/// A suffixed value whose numeric part fails to parse yields 0; a bare
/// value that fails to parse falls back to 3600 (one hour).
fn parse_interval(s: &str) -> i64 {
    let s = s.trim();
    // Numeric part of a suffixed interval, scaled by the unit multiplier.
    let suffixed = |multiplier: i64| s[..s.len() - 1].parse::<i64>().unwrap_or(0) * multiplier;

    match s.as_bytes().last() {
        Some(b's') => suffixed(1),
        Some(b'm') => suffixed(60),
        Some(b'h') => suffixed(3600),
        Some(b'd') => suffixed(86400),
        _ => s.parse().unwrap_or(3600),
    }
}
1543
/// Register an event handler. Spawns a background task that fires a mutation
/// each time the trigger event is received on the relevant collection.
///
/// A trigger collection of `*` (or an unrecognized trigger kind) listens to
/// every collection. Returns a synthetic one-document result describing the
/// registration. NOTE(review): the spawned task runs until the pubsub
/// channel closes; no deregistration path is visible in this function.
async fn execute_handler_registration(
    db: &Aurora,
    handler: &ast::HandlerDef,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    use crate::pubsub::events::ChangeType;

    let collection = match &handler.trigger {
        ast::HandlerTrigger::Insert { collection }
        | ast::HandlerTrigger::Update { collection }
        | ast::HandlerTrigger::Delete { collection } => {
            collection.as_deref().unwrap_or("*").to_string()
        }
        _ => "*".to_string(),
    };

    // `None` means "fire on any change type".
    let trigger_type = match &handler.trigger {
        ast::HandlerTrigger::Insert { .. } => Some(ChangeType::Insert),
        ast::HandlerTrigger::Update { .. } => Some(ChangeType::Update),
        ast::HandlerTrigger::Delete { .. } => Some(ChangeType::Delete),
        _ => None,
    };

    let mut listener = if collection == "*" {
        db.pubsub.listen_all()
    } else {
        db.pubsub.listen(collection.clone())
    };

    let db_clone = db.clone();
    let action = handler.action.clone();
    let handler_name = handler.name.clone();

    tokio::spawn(async move {
        loop {
            match listener.recv().await {
                Ok(event) => {
                    // Check event type matches trigger
                    let matches = trigger_type
                        .as_ref()
                        .map(|t| &event.change_type == t)
                        .unwrap_or(true);
                    if !matches {
                        continue;
                    }

                    // Build context from the triggering event.
                    // Always set $_id from event.id — for deletes, event.document is None
                    // but the document ID is still in event.id.
                    let mut vars = HashMap::new();
                    vars.insert("_id".to_string(), ast::Value::String(event.id.clone()));
                    if let Some(doc) = &event.document {
                        // Overlay field variables from the document data (insert/update)
                        for (k, v) in &doc.data {
                            vars.insert(format!("_{}", k), db_value_to_ast_value(v));
                        }
                    }

                    // Action failures are intentionally swallowed: one bad event
                    // must not kill the listener loop.
                    let _ = execute_mutation_op(
                        &db_clone,
                        &action,
                        &vars,
                        &HashMap::new(),
                        &ExecutionOptions::default(),
                        &HashMap::new(),
                    )
                    .await;
                }
                Err(_) => {
                    eprintln!("[handler:{}] channel closed, stopping", handler_name);
                    break;
                }
            }
        }
    });

    eprintln!(
        "[handler] '{}' registered on '{}'",
        handler.name, collection
    );

    // Report the registration back to the caller as a synthetic query result.
    let mut data = HashMap::new();
    data.insert("name".to_string(), Value::String(handler.name.clone()));
    data.insert("collection".to_string(), Value::String(collection));
    data.insert(
        "status".to_string(),
        Value::String("registered".to_string()),
    );

    Ok(ExecutionResult::Query(QueryResult {
        collection: "__handler".to_string(),
        documents: vec![Document {
            id: handler.name.clone(),
            data,
        }],
        total_count: Some(1),
        deferred_fields: vec![],
        explain: None,
    }))
}
1646
1647fn db_value_to_ast_value(v: &Value) -> ast::Value {
1648    match v {
1649        Value::Null => ast::Value::Null,
1650        Value::Bool(b) => ast::Value::Boolean(*b),
1651        Value::Int(i) => ast::Value::Int(*i),
1652        Value::Float(f) => ast::Value::Float(*f),
1653        Value::String(s) => ast::Value::String(s.clone()),
1654        Value::Uuid(u) => ast::Value::String(u.to_string()),
1655        Value::DateTime(dt) => ast::Value::String(dt.to_rfc3339()),
1656        Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_ast_value).collect()),
1657        Value::Object(m) => ast::Value::Object(
1658            m.iter()
1659                .map(|(k, v)| (k.clone(), db_value_to_ast_value(v)))
1660                .collect(),
1661        ),
1662    }
1663}
1664
1665async fn execute_mutation(
1666    db: &Aurora,
1667    mutation: &ast::Mutation,
1668    vars: &HashMap<String, ast::Value>,
1669    options: &ExecutionOptions,
1670    fragments: &HashMap<String, FragmentDef>,
1671) -> Result<ExecutionResult> {
1672    use crate::transaction::ACTIVE_TRANSACTION_ID;
1673
1674    validate_required_variables(&mutation.variable_definitions, vars)?;
1675
1676    // If there's already an active transaction in scope, run ops directly inside
1677    // it — the caller owns commit/rollback. Creating a nested transaction would
1678    // shadow the outer ACTIVE_TRANSACTION_ID and prevent the outer rollback from
1679    // undoing these writes.
1680    let already_in_tx = ACTIVE_TRANSACTION_ID
1681        .try_with(|id| *id)
1682        .ok()
1683        .and_then(|id| db.transaction_manager.active_transactions.get(&id))
1684        .is_some();
1685
1686    if already_in_tx {
1687        let mut results = Vec::new();
1688        let mut context = HashMap::new();
1689        for mut_op in &mutation.operations {
1690            let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
1691            if let Some(alias) = &mut_op.alias {
1692                if let Some(doc) = res.returned_documents.first() {
1693                    let mut m = serde_json::Map::new();
1694                    for (k, v) in &doc.data {
1695                        m.insert(k.clone(), aurora_value_to_json_value(v));
1696                    }
1697                    m.insert("id".to_string(), JsonValue::String(doc.id.clone()));
1698                    context.insert(alias.clone(), JsonValue::Object(m));
1699                }
1700            }
1701            results.push(res);
1702        }
1703        return if results.len() == 1 {
1704            Ok(ExecutionResult::Mutation(results.remove(0)))
1705        } else {
1706            Ok(ExecutionResult::Batch(
1707                results.into_iter().map(ExecutionResult::Mutation).collect(),
1708            ))
1709        };
1710    }
1711
1712    // No active transaction — wrap the mutation block in one so a failure in any
1713    // operation rolls back all previous operations in the same block.
1714    let tx_id = db.begin_transaction().await;
1715
1716    let exec_result = ACTIVE_TRANSACTION_ID
1717        .scope(tx_id, async {
1718            let mut results = Vec::new();
1719            let mut context = HashMap::new();
1720            for mut_op in &mutation.operations {
1721                let res =
1722                    execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
1723                if let Some(alias) = &mut_op.alias {
1724                    if let Some(doc) = res.returned_documents.first() {
1725                        let mut m = serde_json::Map::new();
1726                        for (k, v) in &doc.data {
1727                            m.insert(k.clone(), aurora_value_to_json_value(v));
1728                        }
1729                        m.insert("id".to_string(), JsonValue::String(doc.id.clone()));
1730                        context.insert(alias.clone(), JsonValue::Object(m));
1731                    }
1732                }
1733                results.push(res);
1734            }
1735            Ok::<_, crate::error::AqlError>(results)
1736        })
1737        .await;
1738
1739    match exec_result {
1740        Ok(mut results) => {
1741            db.commit_transaction(tx_id).await?;
1742            if results.len() == 1 {
1743                Ok(ExecutionResult::Mutation(results.remove(0)))
1744            } else {
1745                Ok(ExecutionResult::Batch(
1746                    results.into_iter().map(ExecutionResult::Mutation).collect(),
1747                ))
1748            }
1749        }
1750        Err(e) => {
1751            let _ = db.rollback_transaction(tx_id).await;
1752            Err(e)
1753        }
1754    }
1755}
1756
1757fn execute_mutation_op<'a>(
1758    db: &'a Aurora,
1759    mut_op: &'a ast::MutationOperation,
1760    variables: &'a HashMap<String, ast::Value>,
1761    context: &'a ExecutionContext,
1762    options: &'a ExecutionOptions,
1763    fragments: &'a HashMap<String, FragmentDef>,
1764) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
1765    use futures::future::FutureExt;
1766    async move {
1767        match &mut_op.operation {
1768            MutationOp::Insert { collection, data } => {
1769                let resolved = resolve_value(data, variables, context);
1770                let doc = db
1771                    .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
1772                    .await?;
1773                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1774                    let fields = collect_fields(
1775                        &mut_op.selection_set,
1776                        fragments,
1777                        variables,
1778                        Some(collection),
1779                    )
1780                    .unwrap_or_default();
1781                    vec![apply_projection(doc, &fields)]
1782                } else {
1783                    vec![doc]
1784                };
1785                Ok(MutationResult {
1786                    operation: "insert".to_string(),
1787                    collection: collection.clone(),
1788                    affected_count: 1,
1789                    returned_documents: returned,
1790                })
1791            }
1792            MutationOp::Update {
1793                collection,
1794                filter,
1795                data,
1796            } => {
1797                let cf = if let Some(f) = filter {
1798                    Some(compile_filter(f)?)
1799                } else {
1800                    None
1801                };
1802                let update_data =
1803                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
1804
1805                // Scan and update matching documents
1806                let mut affected = 0;
1807                let mut returned = Vec::new();
1808
1809                // For now, we use a simple scan-and-update.
1810                // In production, we'd use indices if the filter allows.
1811                let vars_arc = Arc::new(variables.clone());
1812                let cf_arc = cf.map(Arc::new);
1813
1814                let matches = db.scan_and_filter(
1815                    collection,
1816                    |doc| {
1817                        if let Some(ref filter) = cf_arc {
1818                            matches_filter(doc, filter, &vars_arc)
1819                        } else {
1820                            true
1821                        }
1822                    },
1823                    None,
1824                )?;
1825
1826                let fields = if !mut_op.selection_set.is_empty() {
1827                    Some(
1828                        collect_fields(
1829                            &mut_op.selection_set,
1830                            fragments,
1831                            variables,
1832                            Some(collection),
1833                        )
1834                        .unwrap_or_default(),
1835                    )
1836                } else {
1837                    None
1838                };
1839
1840                for doc in matches {
1841                    let mut new_data = doc.data.clone();
1842                    for (k, v) in &update_data {
1843                        let applied = apply_field_modifier(new_data.get(k), v);
1844                        new_data.insert(k.clone(), applied);
1845                    }
1846
1847                    let updated_doc = db
1848                        .aql_update_document(collection, &doc.id, new_data)
1849                        .await?;
1850
1851                    affected += 1;
1852                    if let Some(ref f) = fields {
1853                        returned.push(apply_projection(updated_doc, f));
1854                    }
1855                }
1856
1857                Ok(MutationResult {
1858                    operation: "update".to_string(),
1859                    collection: collection.clone(),
1860                    affected_count: affected,
1861                    returned_documents: returned,
1862                })
1863            }
1864            MutationOp::Delete { collection, filter } => {
1865                let cf = if let Some(f) = filter {
1866                    Some(compile_filter(f)?)
1867                } else {
1868                    None
1869                };
1870
1871                let mut affected = 0;
1872                let vars_arc = Arc::new(variables.clone());
1873                let cf_arc = cf.map(Arc::new);
1874
1875                let matches = db.scan_and_filter(
1876                    collection,
1877                    |doc| {
1878                        if let Some(ref filter) = cf_arc {
1879                            matches_filter(doc, filter, &vars_arc)
1880                        } else {
1881                            true
1882                        }
1883                    },
1884                    None,
1885                )?;
1886
1887                for doc in matches {
1888                    db.aql_delete_document(collection, &doc.id).await?;
1889                    affected += 1;
1890                }
1891
1892                Ok(MutationResult {
1893                    operation: "delete".to_string(),
1894                    collection: collection.clone(),
1895                    affected_count: affected,
1896                    returned_documents: vec![],
1897                })
1898            }
1899            MutationOp::InsertMany { collection, data } => {
1900                let mut affected = 0;
1901                let mut returned = Vec::new();
1902                for item in data {
1903                    let resolved = resolve_value(item, variables, context);
1904                    let doc = db
1905                        .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
1906                        .await?;
1907                    affected += 1;
1908                    if !mut_op.selection_set.is_empty() && options.apply_projections {
1909                        let fields = collect_fields(
1910                            &mut_op.selection_set,
1911                            fragments,
1912                            variables,
1913                            Some(collection),
1914                        )
1915                        .unwrap_or_default();
1916                        returned.push(apply_projection(doc, &fields));
1917                    } else {
1918                        returned.push(doc);
1919                    }
1920                }
1921                Ok(MutationResult {
1922                    operation: "insertMany".to_string(),
1923                    collection: collection.clone(),
1924                    affected_count: affected,
1925                    returned_documents: returned,
1926                })
1927            }
1928            MutationOp::Upsert {
1929                collection,
1930                filter,
1931                data,
1932            } => {
1933                let update_data =
1934                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
1935                let cf = if let Some(f) = filter {
1936                    Some(compile_filter(f)?)
1937                } else {
1938                    None
1939                };
1940                let vars_arc = Arc::new(variables.clone());
1941                let cf_arc = cf.map(Arc::new);
1942                let matches = db.scan_and_filter(
1943                    collection,
1944                    |doc| {
1945                        if let Some(ref filter) = cf_arc {
1946                            matches_filter(doc, filter, &vars_arc)
1947                        } else {
1948                            true
1949                        }
1950                    },
1951                    Some(1),
1952                )?;
1953                let doc = if let Some(existing) = matches.into_iter().next() {
1954                    db.aql_update_document(collection, &existing.id, update_data)
1955                        .await?
1956                } else {
1957                    db.aql_insert(collection, update_data).await?
1958                };
1959                Ok(MutationResult {
1960                    operation: "upsert".to_string(),
1961                    collection: collection.clone(),
1962                    affected_count: 1,
1963                    returned_documents: vec![doc],
1964                })
1965            }
1966            MutationOp::Transaction { operations } => {
1967                let mut all_returned = Vec::new();
1968                let mut total_affected = 0;
1969                for op in operations {
1970                    let res =
1971                        execute_mutation_op(db, op, variables, context, options, fragments).await?;
1972                    total_affected += res.affected_count;
1973                    all_returned.extend(res.returned_documents);
1974                }
1975                Ok(MutationResult {
1976                    operation: "transaction".to_string(),
1977                    collection: String::new(),
1978                    affected_count: total_affected,
1979                    returned_documents: all_returned,
1980                })
1981            }
1982            MutationOp::EnqueueJobs {
1983                job_type,
1984                payloads,
1985                priority,
1986                max_retries,
1987            } => {
1988                let workers = db.workers.as_ref().ok_or_else(|| {
1989                    AqlError::new(
1990                        ErrorCode::QueryError,
1991                        "Worker system not initialised".to_string(),
1992                    )
1993                })?;
1994                let job_priority = match priority {
1995                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
1996                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
1997                    ast::JobPriority::High => crate::workers::JobPriority::High,
1998                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
1999                };
2000                let mut returned = Vec::new();
2001                for payload in payloads {
2002                    let resolved = resolve_value(payload, variables, context);
2003                    let json_payload: std::collections::HashMap<String, serde_json::Value> =
2004                        if let ast::Value::Object(map) = &resolved {
2005                            map.iter()
2006                                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2007                                .collect()
2008                        } else {
2009                            std::collections::HashMap::new()
2010                        };
2011                    let mut job =
2012                        crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2013                    for (k, v) in json_payload {
2014                        job = job.add_field(k, v);
2015                    }
2016                    if let Some(retries) = max_retries {
2017                        job = job.with_max_retries(*retries);
2018                    }
2019                    let job_id = workers.enqueue(job).await?;
2020                    let mut doc = crate::types::Document::new();
2021                    doc.id = job_id.clone();
2022                    doc.data.insert("job_id".to_string(), Value::String(job_id));
2023                    doc.data
2024                        .insert("job_type".to_string(), Value::String(job_type.clone()));
2025                    doc.data
2026                        .insert("status".to_string(), Value::String("pending".to_string()));
2027                    returned.push(doc);
2028                }
2029                let count = returned.len();
2030                Ok(MutationResult {
2031                    operation: "enqueueJobs".to_string(),
2032                    collection: "__jobs".to_string(),
2033                    affected_count: count,
2034                    returned_documents: returned,
2035                })
2036            }
2037            MutationOp::Import { collection, data } => {
2038                let mut affected = 0;
2039                let mut returned = Vec::new();
2040                let fields = if !mut_op.selection_set.is_empty() {
2041                    Some(
2042                        collect_fields(
2043                            &mut_op.selection_set,
2044                            fragments,
2045                            variables,
2046                            Some(collection),
2047                        )
2048                        .unwrap_or_default(),
2049                    )
2050                } else {
2051                    None
2052                };
2053                for item in data {
2054                    let resolved = resolve_value(item, variables, context);
2055                    let map = aql_value_to_hashmap(&resolved, variables)?;
2056                    let doc = db.aql_insert(collection, map).await?;
2057                    affected += 1;
2058                    if let Some(ref f) = fields {
2059                        returned.push(apply_projection(doc, f));
2060                    }
2061                }
2062                Ok(MutationResult {
2063                    operation: "import".to_string(),
2064                    collection: collection.clone(),
2065                    affected_count: affected,
2066                    returned_documents: returned,
2067                })
2068            }
2069            MutationOp::Export {
2070                collection,
2071                format: _,
2072            } => {
2073                let docs = db.scan_and_filter(collection, |_| true, None)?;
2074                let fields = if !mut_op.selection_set.is_empty() {
2075                    Some(
2076                        collect_fields(
2077                            &mut_op.selection_set,
2078                            fragments,
2079                            variables,
2080                            Some(collection),
2081                        )
2082                        .unwrap_or_default(),
2083                    )
2084                } else {
2085                    None
2086                };
2087                let returned: Vec<Document> = if let Some(ref f) = fields {
2088                    docs.into_iter().map(|d| apply_projection(d, f)).collect()
2089                } else {
2090                    docs
2091                };
2092                let count = returned.len();
2093                Ok(MutationResult {
2094                    operation: "export".to_string(),
2095                    collection: collection.clone(),
2096                    affected_count: count,
2097                    returned_documents: returned,
2098                })
2099            }
2100            MutationOp::EnqueueJob {
2101                job_type,
2102                payload,
2103                priority,
2104                scheduled_at,
2105                max_retries,
2106            } => {
2107                let workers = db.workers.as_ref().ok_or_else(|| {
2108                    AqlError::new(
2109                        ErrorCode::QueryError,
2110                        "Worker system not initialised".to_string(),
2111                    )
2112                })?;
2113
2114                // Convert AQL priority to Job priority
2115                let job_priority = match priority {
2116                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
2117                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2118                    ast::JobPriority::High => crate::workers::JobPriority::High,
2119                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2120                };
2121
2122                // Convert AQL Value payload to serde_json payload
2123                let resolved = resolve_value(payload, variables, context);
2124                let json_payload: std::collections::HashMap<String, serde_json::Value> =
2125                    if let ast::Value::Object(map) = &resolved {
2126                        map.iter()
2127                            .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2128                            .collect()
2129                    } else {
2130                        std::collections::HashMap::new()
2131                    };
2132
2133                let mut job =
2134                    crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2135
2136                for (k, v) in json_payload {
2137                    job = job.add_field(k, v);
2138                }
2139
2140                if let Some(retries) = max_retries {
2141                    job = job.with_max_retries(*retries);
2142                }
2143
2144                if let Some(scheduled) = scheduled_at {
2145                    if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
2146                        job = job.scheduled_at(dt);
2147                    }
2148                }
2149
2150                let job_id = workers.enqueue(job).await?;
2151
2152                let mut doc = crate::types::Document::new();
2153                doc.id = job_id.clone();
2154                doc.data
2155                    .insert("job_id".to_string(), crate::types::Value::String(job_id));
2156                doc.data.insert(
2157                    "job_type".to_string(),
2158                    crate::types::Value::String(job_type.clone()),
2159                );
2160                doc.data.insert(
2161                    "status".to_string(),
2162                    crate::types::Value::String("pending".to_string()),
2163                );
2164
2165                Ok(MutationResult {
2166                    operation: "enqueueJob".to_string(),
2167                    collection: "__jobs".to_string(),
2168                    affected_count: 1,
2169                    returned_documents: vec![doc],
2170                })
2171            }
2172        }
2173    }
2174    .boxed()
2175}
2176
2177async fn execute_subscription(
2178    db: &Aurora,
2179    sub: &ast::Subscription,
2180    vars: &HashMap<String, ast::Value>,
2181    _options: &ExecutionOptions,
2182) -> Result<ExecutionResult> {
2183    let vars: HashMap<String, ast::Value> = vars.clone();
2184
2185    let selection = sub.selection_set.first().ok_or_else(|| {
2186        AqlError::new(
2187            ErrorCode::QueryError,
2188            "Subscription must have a selection".to_string(),
2189        )
2190    })?;
2191
2192    if let Selection::Field(f) = selection {
2193        let collection = f.name.clone();
2194        let filter = extract_filter_from_args(&f.arguments)?;
2195
2196        let mut listener = db.pubsub.listen(&collection);
2197
2198        if let Some(f) = filter {
2199            let event_filter = ast_filter_to_event_filter(&f, &vars)?;
2200            listener = listener.filter(event_filter);
2201        }
2202
2203        Ok(ExecutionResult::Subscription(SubscriptionResult {
2204            subscription_id: uuid::Uuid::new_v4().to_string(),
2205            collection,
2206            stream: Some(listener),
2207        }))
2208    } else {
2209        Err(AqlError::new(
2210            ErrorCode::QueryError,
2211            "Invalid subscription selection".to_string(),
2212        ))
2213    }
2214}
2215
2216fn ast_filter_to_event_filter(
2217    filter: &AqlFilter,
2218    vars: &HashMap<String, ast::Value>,
2219) -> Result<crate::pubsub::EventFilter> {
2220    use crate::pubsub::EventFilter;
2221
2222    match filter {
2223        AqlFilter::Eq(f, v) => Ok(EventFilter::FieldEquals(
2224            f.clone(),
2225            aql_value_to_db_value(v, vars)?,
2226        )),
2227        AqlFilter::Ne(f, v) => Ok(EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?)),
2228        AqlFilter::Gt(f, v) => Ok(EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?)),
2229        AqlFilter::Gte(f, v) => Ok(EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?)),
2230        AqlFilter::Lt(f, v) => Ok(EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?)),
2231        AqlFilter::Lte(f, v) => Ok(EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?)),
2232        AqlFilter::In(f, v) => Ok(EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?)),
2233        AqlFilter::NotIn(f, v) => Ok(EventFilter::NotIn(
2234            f.clone(),
2235            aql_value_to_db_value(v, vars)?,
2236        )),
2237        AqlFilter::Contains(f, v) => Ok(EventFilter::Contains(
2238            f.clone(),
2239            aql_value_to_db_value(v, vars)?,
2240        )),
2241        // ContainsAny/ContainsAll don't have EventFilter equivalents; fall back to Contains
2242        AqlFilter::ContainsAny(f, v) | AqlFilter::ContainsAll(f, v) => Ok(EventFilter::Contains(
2243            f.clone(),
2244            aql_value_to_db_value(v, vars)?,
2245        )),
2246        AqlFilter::StartsWith(f, v) => Ok(EventFilter::StartsWith(
2247            f.clone(),
2248            aql_value_to_db_value(v, vars)?,
2249        )),
2250        AqlFilter::EndsWith(f, v) => Ok(EventFilter::EndsWith(
2251            f.clone(),
2252            aql_value_to_db_value(v, vars)?,
2253        )),
2254        AqlFilter::Matches(f, v) => {
2255            let pattern = match aql_value_to_db_value(v, vars)? {
2256                crate::types::Value::String(s) => s,
2257                other => other.to_string(),
2258            };
2259            let re = regex::Regex::new(&pattern).map_err(|e| {
2260                crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
2261            })?;
2262            Ok(EventFilter::Matches(f.clone(), re))
2263        }
2264        AqlFilter::IsNull(f) => Ok(EventFilter::IsNull(f.clone())),
2265        AqlFilter::IsNotNull(f) => Ok(EventFilter::IsNotNull(f.clone())),
2266        AqlFilter::And(filters) => {
2267            let mut mapped = Vec::new();
2268            for f in filters {
2269                mapped.push(ast_filter_to_event_filter(f, vars)?);
2270            }
2271            Ok(EventFilter::And(mapped))
2272        }
2273        AqlFilter::Or(filters) => {
2274            let mut mapped = Vec::new();
2275            for f in filters {
2276                mapped.push(ast_filter_to_event_filter(f, vars)?);
2277            }
2278            Ok(EventFilter::Or(mapped))
2279        }
2280        AqlFilter::Not(f) => Ok(EventFilter::Not(Box::new(ast_filter_to_event_filter(
2281            f, vars,
2282        )?))),
2283    }
2284}
2285
2286async fn execute_introspection(
2287    db: &Aurora,
2288    intro: &ast::IntrospectionQuery,
2289) -> Result<ExecutionResult> {
2290    let names = db.list_collection_names();
2291    let want_fields = intro.fields.is_empty()
2292        || intro
2293            .fields
2294            .iter()
2295            .any(|f| f == "collections" || f == "fields");
2296
2297    let documents: Vec<Document> = names
2298        .iter()
2299        .filter_map(|name| {
2300            // Skip internal system collections from introspection unless asked
2301            if name.starts_with('_') {
2302                return None;
2303            }
2304            let col = db.get_collection_definition(name).ok()?;
2305            let mut data = HashMap::new();
2306            data.insert("name".to_string(), Value::String(name.clone()));
2307
2308            if want_fields {
2309                let field_list: Vec<Value> = col
2310                    .fields
2311                    .iter()
2312                    .map(|(fname, fdef)| {
2313                        let mut fdata = HashMap::new();
2314                        fdata.insert("name".to_string(), Value::String(fname.clone()));
2315                        fdata.insert(
2316                            "type".to_string(),
2317                            Value::String(fdef.field_type.to_string()),
2318                        );
2319                        fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
2320                        fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
2321                        fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
2322                        if !fdef.validations.is_empty() {
2323                            let vcons: Vec<Value> = fdef
2324                                .validations
2325                                .iter()
2326                                .map(|c| Value::String(format!("{:?}", c)))
2327                                .collect();
2328                            fdata.insert("validations".to_string(), Value::Array(vcons));
2329                        }
2330                        Value::Object(fdata)
2331                    })
2332                    .collect();
2333                data.insert("fields".to_string(), Value::Array(field_list));
2334            }
2335
2336            Some(Document {
2337                id: name.clone(),
2338                data,
2339            })
2340        })
2341        .collect();
2342
2343    let count = documents.len();
2344    Ok(ExecutionResult::Query(QueryResult {
2345        collection: "__schema".to_string(),
2346        documents,
2347        total_count: Some(count),
2348        deferred_fields: vec![],
2349        explain: None,
2350    }))
2351}
2352
2353async fn execute_schema(
2354    db: &Aurora,
2355    schema: &ast::Schema,
2356    _options: &ExecutionOptions,
2357) -> Result<ExecutionResult> {
2358    let mut last_collection = String::new();
2359
2360    for op in &schema.operations {
2361        match op {
2362            ast::SchemaOp::DefineCollection {
2363                name,
2364                fields,
2365                if_not_exists,
2366                ..
2367            } => {
2368                last_collection = name.clone();
2369
2370                // If if_not_exists is true, check if it already exists
2371                if *if_not_exists {
2372                    if db.get_collection_definition(name).is_ok() {
2373                        continue;
2374                    }
2375                }
2376
2377                let mut field_defs = Vec::new();
2378                for field in fields {
2379                    field_defs.push((field.name.as_str(), build_field_def(field)));
2380                }
2381                db.new_collection(name, field_defs).await?;
2382            }
2383            ast::SchemaOp::AlterCollection { name, actions } => {
2384                last_collection = name.clone();
2385                for action in actions {
2386                    match action {
2387                        ast::AlterAction::AddField { field, default } => {
2388                            let def = build_field_def(field);
2389                            db.add_field_to_schema(name, field.name.clone(), def)
2390                                .await?;
2391                            if let Some(default_val) = default {
2392                                let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
2393                                let docs = db.get_all_collection(name).await?;
2394                                for doc in docs {
2395                                    if !doc.data.contains_key(&field.name) {
2396                                        db.update_document(
2397                                            name,
2398                                            &doc.id,
2399                                            vec![(&field.name, db_val.clone())],
2400                                        )
2401                                        .await?;
2402                                    }
2403                                }
2404                            }
2405                        }
2406                        ast::AlterAction::DropField(field_name) => {
2407                            db.drop_field_from_schema(name, field_name.clone()).await?;
2408                        }
2409                        ast::AlterAction::RenameField { from, to } => {
2410                            db.rename_field_in_schema(name, from.clone(), to.clone())
2411                                .await?;
2412                            let docs = db.get_all_collection(name).await?;
2413                            for mut doc in docs {
2414                                if let Some(val) = doc.data.remove(from.as_str()) {
2415                                    doc.data.insert(to.clone(), val);
2416                                    let key = format!("{}:{}", name, doc.id);
2417                                    db.put(key, serde_json::to_vec(&doc)?, None).await?;
2418                                }
2419                            }
2420                        }
2421                        ast::AlterAction::ModifyField(field) => {
2422                            db.modify_field_in_schema(
2423                                name,
2424                                field.name.clone(),
2425                                build_field_def(field),
2426                            )
2427                            .await?;
2428                        }
2429                    }
2430                }
2431            }
2432            ast::SchemaOp::DropCollection { name, .. } => {
2433                db.drop_collection_schema(name).await?;
2434                last_collection = name.clone();
2435            }
2436        }
2437    }
2438
2439    Ok(ExecutionResult::Schema(SchemaResult {
2440        operation: "schema".to_string(),
2441        collection: last_collection,
2442        status: "done".to_string(),
2443    }))
2444}
2445
2446fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
2447    let scalar = match anno.name.to_lowercase().as_str() {
2448        "string" => ScalarType::String,
2449        "int" | "integer" => ScalarType::Int,
2450        "float" | "double" => ScalarType::Float,
2451        "bool" | "boolean" => ScalarType::Bool,
2452        "uuid" => ScalarType::Uuid,
2453        "object" => ScalarType::Object,
2454        "array" => ScalarType::Array,
2455        _ => ScalarType::Any,
2456    };
2457
2458    if anno.is_array {
2459        FieldType::Array(scalar)
2460    } else {
2461        match scalar {
2462            ScalarType::Object => FieldType::Object,
2463            ScalarType::Any => FieldType::Any,
2464            _ => FieldType::Scalar(scalar),
2465        }
2466    }
2467}
2468
2469/// Parse @validate directive arguments into FieldValidationConstraints
2470fn parse_validate_directive(
2471    directive: &ast::Directive,
2472) -> Vec<crate::types::FieldValidationConstraint> {
2473    use crate::types::FieldValidationConstraint as FVC;
2474    let mut constraints = Vec::new();
2475    for arg in &directive.arguments {
2476        match arg.name.as_str() {
2477            "format" => {
2478                if let ast::Value::String(s) = &arg.value {
2479                    constraints.push(FVC::Format(s.clone()));
2480                }
2481            }
2482            "min" => {
2483                let n = match &arg.value {
2484                    ast::Value::Float(f) => Some(*f),
2485                    ast::Value::Int(i) => Some(*i as f64),
2486                    _ => None,
2487                };
2488                if let Some(n) = n {
2489                    constraints.push(FVC::Min(n));
2490                }
2491            }
2492            "max" => {
2493                let n = match &arg.value {
2494                    ast::Value::Float(f) => Some(*f),
2495                    ast::Value::Int(i) => Some(*i as f64),
2496                    _ => None,
2497                };
2498                if let Some(n) = n {
2499                    constraints.push(FVC::Max(n));
2500                }
2501            }
2502            "minLength" => {
2503                if let ast::Value::Int(i) = &arg.value {
2504                    constraints.push(FVC::MinLength(*i));
2505                }
2506            }
2507            "maxLength" => {
2508                if let ast::Value::Int(i) = &arg.value {
2509                    constraints.push(FVC::MaxLength(*i));
2510                }
2511            }
2512            "pattern" => {
2513                if let ast::Value::String(s) = &arg.value {
2514                    constraints.push(FVC::Pattern(s.clone()));
2515                }
2516            }
2517            _ => {}
2518        }
2519    }
2520    constraints
2521}
2522
2523/// Build a FieldDefinition from a field's type + directives
2524fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
2525    let field_type = map_ast_type(&field.field_type);
2526    let mut indexed = false;
2527    let mut unique = false;
2528    let mut validations = Vec::new();
2529    for directive in &field.directives {
2530        match directive.name.as_str() {
2531            "indexed" | "index" => indexed = true,
2532            "unique" => {
2533                unique = true;
2534                indexed = true;
2535            }
2536            "primary" => {
2537                indexed = true;
2538                unique = true;
2539            }
2540            "validate" => validations.extend(parse_validate_directive(directive)),
2541            _ => {}
2542        }
2543    }
2544    FieldDefinition {
2545        field_type,
2546        unique,
2547        indexed,
2548        nullable: !field.field_type.is_required,
2549        validations,
2550    }
2551}
2552
/// Run a multi-step migration, applying each step's schema/data actions in
/// order and recording applied versions so re-runs are idempotent.
///
/// Steps whose version is already recorded (per `is_migration_applied`) are
/// skipped with a log line. The returned `MigrationResult` carries the last
/// step's version (whether or not it ran) and a status of `"applied"` when at
/// least one step ran, `"skipped"` otherwise.
async fn execute_migration(
    db: &Aurora,
    migration: &ast::Migration,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let mut steps_applied = 0;
    // Tracks the most recently visited step version for the result payload;
    // stays empty when the migration has no steps.
    let mut last_version = String::new();

    for step in &migration.steps {
        last_version = step.version.clone();

        // Idempotency guard: never re-apply a recorded version.
        if db.is_migration_applied(&step.version).await? {
            eprintln!(
                "[migration] version '{}' already applied — skipping",
                step.version
            );
            continue;
        }

        eprintln!("[migration] applying version '{}'", step.version);

        for action in &step.actions {
            match action {
                ast::MigrationAction::Schema(schema_op) => {
                    execute_single_schema_op(db, schema_op).await?;
                }
                ast::MigrationAction::DataMigration(dm) => {
                    execute_data_migration(db, dm).await?;
                }
            }
        }

        // Mark applied only after every action in the step succeeded; a
        // failure above propagates via `?` and leaves the version unrecorded.
        db.mark_migration_applied(&step.version).await?;
        steps_applied += 1;
        eprintln!("[migration] version '{}' applied", step.version);
    }

    let status = if steps_applied > 0 {
        "applied".to_string()
    } else {
        "skipped".to_string()
    };

    Ok(ExecutionResult::Migration(MigrationResult {
        version: last_version,
        steps_applied,
        status,
    }))
}
2602
/// Execute a single SchemaOp — shared by execute_schema and execute_migration.
///
/// Handles the three schema operations:
/// - `DefineCollection`: creates the collection; a no-op when `if_not_exists`
///   is set and the collection definition already resolves.
/// - `AlterCollection`: applies each add/drop/rename/modify action in order,
///   backfilling or rewriting existing documents where required.
/// - `DropCollection`: drops the schema; a no-op when `if_exists` is set and
///   the collection definition does not resolve.
async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
    match op {
        ast::SchemaOp::DefineCollection {
            name,
            fields,
            if_not_exists,
            ..
        } => {
            // IF NOT EXISTS: silently succeed when already defined.
            if *if_not_exists && db.get_collection_definition(name).is_ok() {
                return Ok(());
            }
            let field_defs: Vec<(&str, FieldDefinition)> = fields
                .iter()
                .map(|f| (f.name.as_str(), build_field_def(f)))
                .collect();
            db.new_collection(name, field_defs).await?;
        }
        ast::SchemaOp::AlterCollection { name, actions } => {
            for action in actions {
                match action {
                    ast::AlterAction::AddField { field, default } => {
                        db.add_field_to_schema(name, field.name.clone(), build_field_def(field))
                            .await?;
                        // Backfill: when a default is given, write it into every
                        // existing document that doesn't already carry the field.
                        // No query variables exist here, hence the empty map.
                        if let Some(default_val) = default {
                            let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
                            let docs = db.get_all_collection(name).await?;
                            for doc in docs {
                                if !doc.data.contains_key(&field.name) {
                                    db.update_document(
                                        name,
                                        &doc.id,
                                        vec![(&field.name, db_val.clone())],
                                    )
                                    .await?;
                                }
                            }
                        }
                    }
                    ast::AlterAction::DropField(field_name) => {
                        // Schema-only change: existing document data for the
                        // dropped field is not rewritten here.
                        db.drop_field_from_schema(name, field_name.clone()).await?;
                    }
                    ast::AlterAction::RenameField { from, to } => {
                        db.rename_field_in_schema(name, from.clone(), to.clone())
                            .await?;
                        // Rename the field key in every existing document (one read-modify-write per doc)
                        let docs = db.get_all_collection(name).await?;
                        for mut doc in docs {
                            if let Some(val) = doc.data.remove(from.as_str()) {
                                doc.data.insert(to.clone(), val);
                                // Documents are stored under "collection:id" keys,
                                // written back directly as serialized JSON.
                                let key = format!("{}:{}", name, doc.id);
                                db.put(key, serde_json::to_vec(&doc)?, None).await?;
                            }
                        }
                    }
                    ast::AlterAction::ModifyField(field) => {
                        db.modify_field_in_schema(name, field.name.clone(), build_field_def(field))
                            .await?;
                    }
                }
            }
        }
        ast::SchemaOp::DropCollection { name, if_exists } => {
            // IF EXISTS: silently succeed when the collection is not defined.
            if *if_exists && db.get_collection_definition(name).is_err() {
                return Ok(());
            }
            db.drop_collection_schema(name).await?;
        }
    }
    Ok(())
}
2674
2675/// Apply a `migrate data in <collection> { set field = expr where { ... } }` block.
2676async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
2677    let docs = db.get_all_collection(&dm.collection).await?;
2678
2679    for doc in docs {
2680        for transform in &dm.transforms {
2681            let matches = match &transform.filter {
2682                Some(filter) => {
2683                    let compiled = compile_filter(filter)?;
2684                    matches_filter(&doc, &compiled, &HashMap::new())
2685                }
2686                None => true,
2687            };
2688
2689            if matches {
2690                let new_value = eval_migration_expr(&transform.expression, &doc);
2691                let mut updates = HashMap::new();
2692                updates.insert(transform.field.clone(), new_value);
2693                db.aql_update_document(&dm.collection, &doc.id, updates)
2694                    .await?;
2695            }
2696        }
2697    }
2698    Ok(())
2699}
2700
2701/// Evaluate a migration expression string to a `Value`.
2702///
2703/// Supported forms (evaluated in order):
2704/// - String literal:  `"hello"` → `Value::String("hello")`
2705/// - Boolean:         `true` / `false`
2706/// - Null:            `null`
2707/// - Integer:         `42`
2708/// - Float:           `3.14`
2709/// - Field reference: `field_name` (looks up the field in the current document)
2710fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
2711    let expr = expr.trim();
2712
2713    if expr.starts_with('"') && expr.ends_with('"') && expr.len() >= 2 {
2714        return Value::String(expr[1..expr.len() - 1].to_string());
2715    }
2716    if expr == "true" {
2717        return Value::Bool(true);
2718    }
2719    if expr == "false" {
2720        return Value::Bool(false);
2721    }
2722    if expr == "null" {
2723        return Value::Null;
2724    }
2725    if let Ok(n) = expr.parse::<i64>() {
2726        return Value::Int(n);
2727    }
2728    if let Ok(f) = expr.parse::<f64>() {
2729        return Value::Float(f);
2730    }
2731    if let Some(v) = doc.data.get(expr) {
2732        return v.clone();
2733    }
2734
2735    Value::Null
2736}
2737
2738fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
2739    for a in args {
2740        if a.name == "where" || a.name == "filter" {
2741            return Ok(Some(value_to_filter(&a.value)?));
2742        }
2743    }
2744    Ok(None)
2745}
2746
2747fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
2748    let mut orderings = Vec::new();
2749    for a in args {
2750        if a.name == "orderBy" {
2751            match &a.value {
2752                ast::Value::String(f) => orderings.push(ast::Ordering {
2753                    field: f.clone(),
2754                    direction: ast::SortDirection::Asc,
2755                }),
2756                ast::Value::Object(obj) => {
2757                    // Formal syntax: { field: "fieldname", direction: ASC|DESC }
2758                    if let (Some(ast::Value::String(field_name)), Some(dir_val)) =
2759                        (obj.get("field"), obj.get("direction"))
2760                    {
2761                        let direction = match dir_val {
2762                            ast::Value::Enum(s) | ast::Value::String(s) => {
2763                                if s.to_uppercase() == "DESC" {
2764                                    ast::SortDirection::Desc
2765                                } else {
2766                                    ast::SortDirection::Asc
2767                                }
2768                            }
2769                            _ => ast::SortDirection::Asc,
2770                        };
2771                        orderings.push(ast::Ordering {
2772                            field: field_name.clone(),
2773                            direction,
2774                        });
2775                    } else {
2776                        // Shorthand: { fieldName: "ASC", fieldName2: "DESC" }
2777                        for (field, dir_val) in obj {
2778                            let direction = match dir_val {
2779                                ast::Value::Enum(s) | ast::Value::String(s) => {
2780                                    if s.to_uppercase() == "DESC" {
2781                                        ast::SortDirection::Desc
2782                                    } else {
2783                                        ast::SortDirection::Asc
2784                                    }
2785                                }
2786                                _ => ast::SortDirection::Asc,
2787                            };
2788                            orderings.push(ast::Ordering {
2789                                field: field.clone(),
2790                                direction,
2791                            });
2792                        }
2793                    }
2794                }
2795                _ => {}
2796            }
2797        }
2798    }
2799    orderings
2800}
2801
2802fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
2803    docs.sort_by(|a, b| {
2804        for o in orderings {
2805            let cmp = compare_values(a.data.get(&o.field), b.data.get(&o.field));
2806            if cmp != std::cmp::Ordering::Equal {
2807                return match o.direction {
2808                    ast::SortDirection::Asc => cmp,
2809                    ast::SortDirection::Desc => cmp.reverse(),
2810                };
2811            }
2812        }
2813        std::cmp::Ordering::Equal
2814    });
2815}
2816
2817fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
2818    match (a, b) {
2819        (None, None) => std::cmp::Ordering::Equal,
2820        (None, Some(_)) => std::cmp::Ordering::Less,
2821        (Some(_), None) => std::cmp::Ordering::Greater,
2822        (Some(av), Some(bv)) => av.partial_cmp(bv).unwrap_or(std::cmp::Ordering::Equal),
2823    }
2824}
2825
2826pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
2827    let (mut limit, mut offset) = (None, 0);
2828    for a in args {
2829        match a.name.as_str() {
2830            "limit" | "first" => {
2831                if let ast::Value::Int(n) = a.value {
2832                    limit = Some(n as usize);
2833                }
2834            }
2835            "offset" | "skip" => {
2836                if let ast::Value::Int(n) = a.value {
2837                    offset = n as usize;
2838                }
2839            }
2840            _ => {}
2841        }
2842    }
2843    (limit, offset)
2844}
2845
2846fn extract_cursor_pagination(
2847    args: &[ast::Argument],
2848) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
2849    let (mut first, mut after, mut last, mut before) = (None, None, None, None);
2850    for a in args {
2851        match a.name.as_str() {
2852            "first" => {
2853                if let ast::Value::Int(n) = a.value {
2854                    first = Some(n as usize);
2855                }
2856            }
2857            "after" => {
2858                if let ast::Value::String(s) = &a.value {
2859                    after = Some(s.clone());
2860                }
2861            }
2862            "last" => {
2863                if let ast::Value::Int(n) = a.value {
2864                    last = Some(n as usize);
2865                }
2866            }
2867            "before" => {
2868                if let ast::Value::String(s) = &a.value {
2869                    before = Some(s.clone());
2870                }
2871            }
2872            _ => {}
2873        }
2874    }
2875    (first, after, last, before)
2876}
2877
2878fn execute_connection(
2879    mut docs: Vec<Document>,
2880    sub_fields: &[ast::Field],
2881    limit: Option<usize>,
2882    fragments: &HashMap<String, FragmentDef>,
2883    variables: &HashMap<String, ast::Value>,
2884) -> QueryResult {
2885    let has_next_page = if let Some(l) = limit {
2886        docs.len() > l
2887    } else {
2888        false
2889    };
2890
2891    if has_next_page {
2892        docs.truncate(limit.unwrap());
2893    }
2894
2895    let mut edges = Vec::with_capacity(docs.len());
2896    let mut end_cursor = String::new();
2897
2898    // Find the 'node' selection fields once (potentially expanding fragments)
2899    let node_fields = if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
2900        let edges_sub_fields =
2901            collect_fields(&edges_field.selection_set, fragments, variables, None)
2902                .unwrap_or_default();
2903        if let Some(node_field) = edges_sub_fields.into_iter().find(|f| f.name == "node") {
2904            collect_fields(&node_field.selection_set, fragments, variables, None)
2905                .unwrap_or_default()
2906        } else {
2907            Vec::new()
2908        }
2909    } else {
2910        Vec::new()
2911    };
2912
2913    for doc in &docs {
2914        let cursor = doc.id.clone();
2915        end_cursor = cursor.clone();
2916
2917        let mut edge_data = HashMap::new();
2918        edge_data.insert("cursor".to_string(), Value::String(cursor));
2919
2920        let node_doc = if node_fields.is_empty() {
2921            doc.clone()
2922        } else {
2923            apply_projection(doc.clone(), &node_fields)
2924        };
2925
2926        edge_data.insert("node".to_string(), Value::Object(node_doc.data));
2927        edges.push(Value::Object(edge_data));
2928    }
2929
2930    let mut page_info = HashMap::new();
2931    page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
2932    page_info.insert("endCursor".to_string(), Value::String(end_cursor));
2933
2934    let mut conn_data = HashMap::new();
2935    conn_data.insert("edges".to_string(), Value::Array(edges));
2936    conn_data.insert("pageInfo".to_string(), Value::Object(page_info));
2937
2938    QueryResult {
2939        collection: String::new(),
2940        documents: vec![Document {
2941            id: "connection".to_string(),
2942            data: conn_data,
2943        }],
2944        total_count: None,
2945        deferred_fields: vec![],
2946        explain: None,
2947    }
2948}
2949
/// Evaluate a compiled filter against a single document.
///
/// Missing-field semantics: positive operators (`Eq`, the ordered compares,
/// `In`, `Contains`, `StartsWith`, …) are false when the field is absent,
/// while the negative operators `Ne` and `NotIn` are true, and `IsNull`
/// treats an absent field as null. Variable references inside filter values
/// are resolved through `vars`; values that fail conversion to a DB value
/// fall back to the same absent-field default for that operator.
pub fn matches_filter(
    doc: &Document,
    filter: &CompiledFilter,
    vars: &HashMap<String, ast::Value>,
) -> bool {
    match filter {
        CompiledFilter::Eq(f, v) => doc
            .data
            .get(f)
            .map_or(false, |dv| values_equal(dv, v, vars)),
        // Ne is true for missing fields: "absent != value" holds.
        CompiledFilter::Ne(f, v) => doc
            .data
            .get(f)
            .map_or(true, |dv| !values_equal(dv, v, vars)),
        // Ordered comparisons rely on Value's ordering; mixed or
        // unconvertible operands fail closed (false).
        CompiledFilter::Gt(f, v) => doc.data.get(f).map_or(false, |dv| {
            if let Ok(bv) = aql_value_to_db_value(v, vars) {
                return dv > &bv;
            }
            false
        }),
        CompiledFilter::Gte(f, v) => doc.data.get(f).map_or(false, |dv| {
            if let Ok(bv) = aql_value_to_db_value(v, vars) {
                return dv >= &bv;
            }
            false
        }),
        CompiledFilter::Lt(f, v) => doc.data.get(f).map_or(false, |dv| {
            if let Ok(bv) = aql_value_to_db_value(v, vars) {
                return dv < &bv;
            }
            false
        }),
        CompiledFilter::Lte(f, v) => doc.data.get(f).map_or(false, |dv| {
            if let Ok(bv) = aql_value_to_db_value(v, vars) {
                return dv <= &bv;
            }
            false
        }),
        // In/NotIn expect the filter value to convert to an array; anything
        // else falls back to the operator's missing-field default.
        CompiledFilter::In(f, v) => doc.data.get(f).map_or(false, |dv| {
            if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
                return arr.contains(dv);
            }
            false
        }),
        CompiledFilter::NotIn(f, v) => doc.data.get(f).map_or(true, |dv| {
            if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
                return !arr.contains(dv);
            }
            true
        }),
        // Contains is dual-purpose: substring match on string fields,
        // membership test on array fields.
        CompiledFilter::Contains(f, v) => {
            if let Some(dv) = doc.data.get(f) {
                match (dv, resolve_if_variable(v, vars)) {
                    (Value::String(s), ast::Value::String(sub)) => s.contains(sub),
                    (Value::Array(arr), _) => {
                        if let Ok(bv) = aql_value_to_db_value(v, vars) {
                            return arr.contains(&bv);
                        }
                        false
                    }
                    _ => false,
                }
            } else {
                false
            }
        }
        // Field array must contain at least one of the provided values
        CompiledFilter::ContainsAny(f, v) => {
            if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
                (doc.data.get(f), aql_value_to_db_value(v, vars))
            {
                check_arr.iter().any(|item| field_arr.contains(item))
            } else {
                false
            }
        }
        // Field array must contain all of the provided values
        CompiledFilter::ContainsAll(f, v) => {
            if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
                (doc.data.get(f), aql_value_to_db_value(v, vars))
            {
                check_arr.iter().all(|item| field_arr.contains(item))
            } else {
                false
            }
        }
        CompiledFilter::StartsWith(f, v) => {
            if let (Some(Value::String(s)), ast::Value::String(pre)) =
                (doc.data.get(f), resolve_if_variable(v, vars))
            {
                s.starts_with(pre)
            } else {
                false
            }
        }
        CompiledFilter::EndsWith(f, v) => {
            if let (Some(Value::String(s)), ast::Value::String(suf)) =
                (doc.data.get(f), resolve_if_variable(v, vars))
            {
                s.ends_with(suf)
            } else {
                false
            }
        }
        // The regex is precompiled at filter-compile time; only string
        // fields can match.
        CompiledFilter::Matches(f, re) => {
            if let Some(Value::String(s)) = doc.data.get(f) {
                re.is_match(s)
            } else {
                false
            }
        }
        // An absent field counts as null.
        CompiledFilter::IsNull(f) => doc.data.get(f).map_or(true, |v| matches!(v, Value::Null)),
        CompiledFilter::IsNotNull(f) => {
            doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
        }
        // Boolean combinators recurse; And/Or short-circuit via all/any.
        CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
        CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
        CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
    }
}
3070
3071/// Apply an update modifier object to an existing field value.
3072/// Modifiers: `{ increment: N }`, `{ decrement: N }`, `{ push: V }`,
3073/// `{ pull: V }`, `{ addToSet: V }`.
3074/// Any non-modifier object is returned as-is (plain field replace).
3075fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
3076    if let Value::Object(modifier) = new_val {
3077        if let Some(delta) = modifier.get("increment") {
3078            match (existing, delta) {
3079                (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
3080                (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
3081                (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
3082                _ => {}
3083            }
3084        }
3085        if let Some(delta) = modifier.get("decrement") {
3086            match (existing, delta) {
3087                (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
3088                (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
3089                (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
3090                _ => {}
3091            }
3092        }
3093        if let Some(item) = modifier.get("push") {
3094            if let Some(Value::Array(mut arr)) = existing.cloned() {
3095                arr.push(item.clone());
3096                return Value::Array(arr);
3097            }
3098            return Value::Array(vec![item.clone()]);
3099        }
3100        if let Some(item) = modifier.get("pull") {
3101            if let Some(Value::Array(arr)) = existing {
3102                let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
3103                return Value::Array(filtered);
3104            }
3105            return Value::Array(vec![]);
3106        }
3107        if let Some(item) = modifier.get("addToSet") {
3108            if let Some(Value::Array(mut arr)) = existing.cloned() {
3109                if !arr.contains(item) {
3110                    arr.push(item.clone());
3111                }
3112                return Value::Array(arr);
3113            }
3114            return Value::Array(vec![item.clone()]);
3115        }
3116    }
3117    new_val.clone()
3118}
3119
3120fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
3121    if let Ok(bv) = aql_value_to_db_value(av, vars) {
3122        return dv == &bv;
3123    }
3124    false
3125}
3126
3127fn resolve_if_variable<'a>(
3128    v: &'a ast::Value,
3129    vars: &'a HashMap<String, ast::Value>,
3130) -> &'a ast::Value {
3131    if let ast::Value::Variable(n) = v {
3132        vars.get(n).unwrap_or(v)
3133    } else {
3134        v
3135    }
3136}
3137
3138pub fn apply_projection(doc: Document, fields: &[ast::Field]) -> Document {
3139    let (projected, _) = apply_projection_and_defer(doc, fields);
3140    projected
3141}
3142
3143/// Apply projections, resolving lookup fields from foreign collections.
3144/// Returns (projected_doc, deferred_field_names).
3145async fn apply_projection_with_lookups(
3146    db: &Aurora,
3147    mut doc: Document,
3148    fields: &[ast::Field],
3149    vars: &HashMap<String, ast::Value>,
3150) -> Result<(Document, Vec<String>)> {
3151    if fields.is_empty() {
3152        return Ok((doc, vec![]));
3153    }
3154    let mut proj = HashMap::new();
3155    let mut deferred = Vec::new();
3156
3157    for f in fields {
3158        // @defer
3159        if f.directives.iter().any(|d| d.name == "defer") {
3160            deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
3161            continue;
3162        }
3163
3164        // Lookup field: has collection + localField + foreignField arguments
3165        let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
3166        let local_arg = f.arguments.iter().find(|a| a.name == "localField");
3167        let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
3168
3169        if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
3170            // Resolve variable references in lookup arguments
3171            let c_val = resolve_if_variable(&c.value, vars);
3172            let lf_val = resolve_if_variable(&lf.value, vars);
3173            let ff_val = resolve_if_variable(&ff.value, vars);
3174            let foreign_coll = if let ast::Value::String(s) = c_val {
3175                s.as_str()
3176            } else {
3177                continue;
3178            };
3179            let local_field = if let ast::Value::String(s) = lf_val {
3180                s.as_str()
3181            } else {
3182                continue;
3183            };
3184            let foreign_field = if let ast::Value::String(s) = ff_val {
3185                s.as_str()
3186            } else {
3187                continue;
3188            };
3189
3190            // Get the local field value to match against the foreign collection
3191            let local_val = doc.data.get(local_field).cloned().or_else(|| {
3192                if local_field == "id" {
3193                    Some(Value::String(doc.id.clone()))
3194                } else {
3195                    None
3196                }
3197            });
3198
3199            if let Some(match_val) = local_val {
3200                // Optional filter — deeply resolve variables inside it before compiling
3201                let extra_filter = f
3202                    .arguments
3203                    .iter()
3204                    .find(|a| a.name == "where")
3205                    .and_then(|a| {
3206                        let resolved = resolve_ast_deep(&a.value, vars);
3207                        value_to_filter(&resolved).ok()
3208                    });
3209
3210                let vars_arc = Arc::new(vars.clone());
3211                let foreign_docs = db.scan_and_filter(
3212                    foreign_coll,
3213                    |fdoc| {
3214                        let field_match = fdoc
3215                            .data
3216                            .get(foreign_field)
3217                            .map(|v| values_equal_db(v, &match_val))
3218                            .unwrap_or(foreign_field == "id" && fdoc.id == match_val.to_string());
3219                        if !field_match {
3220                            return false;
3221                        }
3222                        if let Some(ref ef) = extra_filter {
3223                            let compiled = compile_filter(ef)
3224                                .unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
3225                            matches_filter(fdoc, &compiled, &vars_arc)
3226                        } else {
3227                            true
3228                        }
3229                    },
3230                    None,
3231                )?;
3232
3233                // Apply sub-projection to foreign docs
3234                let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
3235                    foreign_docs
3236                        .into_iter()
3237                        .map(|fd| Value::Object(fd.data))
3238                        .collect()
3239                } else {
3240                    let sub_fields: Vec<ast::Field> = f
3241                        .selection_set
3242                        .iter()
3243                        .filter_map(|sel| {
3244                            if let Selection::Field(sf) = sel {
3245                                Some(sf.clone())
3246                            } else {
3247                                None
3248                            }
3249                        })
3250                        .collect();
3251                    foreign_docs
3252                        .into_iter()
3253                        .map(|fd| {
3254                            let (proj_fd, _) = apply_projection_and_defer(fd, &sub_fields);
3255                            Value::Object(proj_fd.data)
3256                        })
3257                        .collect()
3258                };
3259
3260                let alias = f.alias.as_ref().unwrap_or(&f.name).clone();
3261                proj.insert(alias, Value::Array(sub_projected));
3262            }
3263            continue;
3264        }
3265
3266        // Computed field
3267        if f.name == "__compute__" {
3268            let alias = f.alias.as_deref().unwrap_or("computed");
3269            if let Some(expr) = f.arguments.iter().find(|a| a.name == "expr") {
3270                let resolved_expr = resolve_if_variable(&expr.value, vars);
3271                if let ast::Value::String(template) = resolved_expr {
3272                    let result = eval_template(template, &doc.data);
3273                    proj.insert(alias.to_string(), Value::String(result));
3274                }
3275            }
3276            continue;
3277        }
3278
3279        // Normal field
3280        if f.name == "id" {
3281            proj.insert(
3282                f.alias.as_ref().unwrap_or(&f.name).clone(),
3283                Value::String(doc.id.clone()),
3284            );
3285        } else if let Some(v) = doc.data.get(&f.name) {
3286            proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
3287        }
3288    }
3289    doc.data = proj;
3290    Ok((doc, deferred))
3291}
3292
3293fn values_equal_db(a: &Value, b: &Value) -> bool {
3294    match (a, b) {
3295        (Value::String(s1), Value::String(s2)) => s1 == s2,
3296        (Value::Int(i1), Value::Int(i2)) => i1 == i2,
3297        (Value::Float(f1), Value::Float(f2)) => (f1 - f2).abs() < f64::EPSILON,
3298        (Value::Bool(b1), Value::Bool(b2)) => b1 == b2,
3299        _ => false,
3300    }
3301}
3302
3303pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
3304    let resolved = resolve_if_variable(v, vars);
3305    match resolved {
3306        ast::Value::Int(i) => Ok(Value::Int(*i)),
3307        ast::Value::Float(f) => Ok(Value::Float(*f)),
3308        ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
3309        ast::Value::String(s) => Ok(Value::String(s.clone())),
3310        ast::Value::Enum(s) => Ok(Value::String(s.clone())),
3311        ast::Value::Null => Ok(Value::Null),
3312        ast::Value::Variable(name) => Err(AqlError::new(
3313            ErrorCode::UndefinedVariable,
3314            format!("Variable '{}' not found", name),
3315        )),
3316        ast::Value::Array(arr) => {
3317            let mut vals = Vec::with_capacity(arr.len());
3318            for v in arr {
3319                vals.push(aql_value_to_db_value(v, vars)?);
3320            }
3321            Ok(Value::Array(vals))
3322        }
3323        ast::Value::Object(obj) => {
3324            let mut map = HashMap::with_capacity(obj.len());
3325            for (k, v) in obj {
3326                map.insert(k.clone(), aql_value_to_db_value(v, vars)?);
3327            }
3328            Ok(Value::Object(map))
3329        }
3330    }
3331}
3332
3333/// Convert AQL AST value to serde_json::Value for job payloads
3334fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
3335    match v {
3336        ast::Value::Null => serde_json::Value::Null,
3337        ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
3338        ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
3339        ast::Value::Float(f) => serde_json::Number::from_f64(*f)
3340            .map(serde_json::Value::Number)
3341            .unwrap_or(serde_json::Value::Null),
3342        ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
3343        ast::Value::Array(arr) => {
3344            serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
3345        }
3346        ast::Value::Object(obj) => serde_json::Value::Object(
3347            obj.iter()
3348                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
3349                .collect(),
3350        ),
3351        ast::Value::Variable(_) => serde_json::Value::Null,
3352    }
3353}
3354
3355fn aql_value_to_hashmap(
3356    v: &ast::Value,
3357    vars: &HashMap<String, ast::Value>,
3358) -> Result<HashMap<String, Value>> {
3359    if let ast::Value::Object(m) = resolve_if_variable(v, vars) {
3360        let mut res = HashMap::new();
3361        for (k, v) in m {
3362            res.insert(k.clone(), aql_value_to_db_value(v, vars)?);
3363        }
3364        Ok(res)
3365    } else {
3366        Err(AqlError::new(
3367            ErrorCode::QueryError,
3368            "Data must be object".to_string(),
3369        ))
3370    }
3371}
3372
3373fn aurora_value_to_json_value(v: &Value) -> JsonValue {
3374    match v {
3375        Value::Null => JsonValue::Null,
3376        Value::String(s) => JsonValue::String(s.clone()),
3377        Value::Int(i) => JsonValue::Number((*i).into()),
3378        Value::Float(f) => serde_json::Number::from_f64(*f)
3379            .map(JsonValue::Number)
3380            .unwrap_or(JsonValue::Null),
3381        Value::Bool(b) => JsonValue::Bool(*b),
3382        Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
3383        Value::Object(m) => {
3384            let mut jm = serde_json::Map::new();
3385            for (k, v) in m {
3386                jm.insert(k.clone(), aurora_value_to_json_value(v));
3387            }
3388            JsonValue::Object(jm)
3389        }
3390        Value::Uuid(u) => JsonValue::String(u.to_string()),
3391        Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
3392    }
3393}
3394
3395/// Find an equality filter on an indexed field to allow O(1) lookup short-circuit
3396fn find_indexed_equality_filter(
3397    filter: &AqlFilter,
3398    db: &Aurora,
3399    collection: &str,
3400) -> Option<(String, ast::Value)> {
3401    match filter {
3402        AqlFilter::Eq(field, val) => {
3403            if field == "id" || db.has_index(collection, field) {
3404                Some((field.clone(), val.clone()))
3405            } else {
3406                None
3407            }
3408        }
3409        AqlFilter::And(filters) => {
3410            for f in filters {
3411                if let Some(res) = find_indexed_equality_filter(f, db, collection) {
3412                    return Some(res);
3413                }
3414            }
3415            None
3416        }
3417        _ => None,
3418    }
3419}
3420
3421pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
3422    if let ast::Value::Object(m) = v {
3423        let mut fs = Vec::new();
3424        for (k, val) in m {
3425            match k.as_str() {
3426                "or" => {
3427                    if let ast::Value::Array(arr) = val {
3428                        let mut sub = Vec::new();
3429                        for item in arr {
3430                            sub.push(value_to_filter(item)?);
3431                        }
3432                        return Ok(AqlFilter::Or(sub));
3433                    }
3434                }
3435                "and" => {
3436                    if let ast::Value::Array(arr) = val {
3437                        let mut sub = Vec::new();
3438                        for item in arr {
3439                            sub.push(value_to_filter(item)?);
3440                        }
3441                        return Ok(AqlFilter::And(sub));
3442                    }
3443                }
3444                "not" => {
3445                    return Ok(AqlFilter::Not(Box::new(value_to_filter(val)?)));
3446                }
3447                field => {
3448                    if let ast::Value::Object(ops) = val {
3449                        for (op, ov) in ops {
3450                            match op.as_str() {
3451                                "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
3452                                "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
3453                                "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
3454                                "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
3455                                "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
3456                                "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
3457                                "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
3458                                "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
3459                                "contains" => {
3460                                    fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
3461                                }
3462                                "containsAny" => {
3463                                    fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
3464                                }
3465                                "containsAll" => {
3466                                    fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
3467                                }
3468                                "startsWith" => {
3469                                    fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
3470                                }
3471                                "endsWith" => {
3472                                    fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
3473                                }
3474                                "matches" => {
3475                                    fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
3476                                }
3477                                _ => {}
3478                            }
3479                        }
3480                    }
3481                }
3482            }
3483        }
3484        if fs.is_empty() {
3485            Ok(AqlFilter::And(vec![]))
3486        } else if fs.len() == 1 {
3487            Ok(fs.remove(0))
3488        } else {
3489            Ok(AqlFilter::And(fs))
3490        }
3491    } else {
3492        Err(AqlError::new(
3493            ErrorCode::QueryError,
3494            "Filter must be object".to_string(),
3495        ))
3496    }
3497}
3498
3499fn resolve_value(
3500    v: &ast::Value,
3501    vars: &HashMap<String, ast::Value>,
3502    _ctx: &ExecutionContext,
3503) -> ast::Value {
3504    match v {
3505        ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
3506        _ => v.clone(),
3507    }
3508}
3509
3510/// Recursively resolve all variable references inside an AST value.
3511/// Unlike `resolve_value` (which only handles a top-level `$var`), this walks
3512/// into objects and arrays so nested references like
3513/// `{ query: $term }` or `{ status: { eq: $activeStatus } }` are fully resolved.
3514fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
3515    match v {
3516        ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
3517        ast::Value::Object(m) => ast::Value::Object(
3518            m.iter()
3519                .map(|(k, val)| (k.clone(), resolve_ast_deep(val, vars)))
3520                .collect(),
3521        ),
3522        ast::Value::Array(arr) => {
3523            ast::Value::Array(arr.iter().map(|val| resolve_ast_deep(val, vars)).collect())
3524        }
3525        _ => v.clone(),
3526    }
3527}
3528
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
    use tempfile::TempDir;

    /// End-to-end smoke test: create a collection, insert a document via an
    /// AQL mutation, then read it back with an AQL query.
    #[tokio::test]
    async fn test_executor_integration() {
        let tmp = TempDir::new().unwrap();
        let config = AuroraConfig {
            db_path: tmp.path().join("test.db"),
            enable_write_buffering: false,
            durability_mode: DurabilityMode::Strict,
            ..Default::default()
        };
        let db = Aurora::with_config(config).await.unwrap();

        // A single indexed, nullable string field is enough for this test.
        let name_field = crate::types::FieldDefinition {
            field_type: FieldType::SCALAR_STRING,
            unique: false,
            indexed: true,
            nullable: true,
            ..Default::default()
        };
        db.new_collection("users", vec![("name", name_field)])
            .await
            .unwrap();

        let insert_aql =
            r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#;
        let _ = execute(&db, insert_aql, ExecutionOptions::new())
            .await
            .unwrap();

        let result = execute(&db, "query { users { name } }", ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Query(q) => assert_eq!(q.documents.len(), 1),
            _ => panic!("Expected query"),
        }
    }
}
3577}