Skip to main content

aurora_db/parser/
executor.rs

1//! AQL Executor - Connects parsed AQL to Aurora operations
2//!
3//! Provides the bridge between AQL AST and Aurora's database operations.
4//! All operations (queries, mutations, subscriptions) go through execute().
5
6use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15
/// Alias for a string-keyed map of `serde_json` values.
///
/// NOTE(review): nothing in the visible part of this file uses this alias;
/// presumably it is consumed by callers elsewhere — confirm before changing it.
pub type ExecutionContext = HashMap<String, JsonValue>;
17
18/// Helper to flatten a selection set (processing fragments) into a list of fields
19fn collect_fields(
20    selection_set: &[Selection],
21    fragments: &HashMap<String, FragmentDef>,
22    variable_values: &HashMap<String, ast::Value>,
23    parent_type: Option<&str>,
24) -> Result<Vec<Field>> {
25    let mut fields = Vec::new();
26
27    for selection in selection_set {
28        match selection {
29            Selection::Field(field) => {
30                // Check directives for @skip or @include
31                if should_include(&field.directives, variable_values)? {
32                    fields.push(field.clone());
33                }
34            }
35            Selection::FragmentSpread(name) => {
36                if let Some(fragment) = fragments.get(name) {
37                    // Check type condition if parent type is known
38                    let type_match = if let Some(parent) = parent_type {
39                        parent == fragment.type_condition
40                    } else {
41                        true // If parent type unknown, assume match or skip check
42                    };
43
44                    if type_match {
45                        let fragment_fields = collect_fields(
46                            &fragment.selection_set,
47                            fragments,
48                            variable_values,
49                            parent_type,
50                        )?;
51                        fields.extend(fragment_fields);
52                    }
53                }
54            }
55            Selection::InlineFragment(inline) => {
56                // Check type condition if parent type is known
57                let type_match = if let Some(parent) = parent_type {
58                    parent == inline.type_condition
59                } else {
60                    true // If parent type unknown, assume match
61                };
62
63                if type_match {
64                    let inline_fields = collect_fields(
65                        &inline.selection_set,
66                        fragments,
67                        variable_values,
68                        parent_type,
69                    )?;
70                    fields.extend(inline_fields);
71                }
72            }
73        }
74    }
75
76    Ok(fields)
77}
78
79/// Check if a field/fragment should be included based on @skip/@include
80fn should_include(
81    directives: &[ast::Directive],
82    variables: &HashMap<String, ast::Value>,
83) -> Result<bool> {
84    for dir in directives {
85        if dir.name == "skip" {
86            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
87                let should_skip = resolve_boolean_arg(&arg.value, variables)?;
88                if should_skip {
89                    return Ok(false);
90                }
91            }
92        } else if dir.name == "include" {
93            if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
94                let should_include = resolve_boolean_arg(&arg.value, variables)?;
95                if !should_include {
96                    return Ok(false);
97                }
98            }
99        }
100    }
101    Ok(true)
102}
103
104fn resolve_boolean_arg(
105    value: &ast::Value,
106    variables: &HashMap<String, ast::Value>,
107) -> Result<bool> {
108    match value {
109        ast::Value::Boolean(b) => Ok(*b),
110        ast::Value::Variable(name) => {
111            if let Some(val) = variables.get(name) {
112                match val {
113                    ast::Value::Boolean(b) => Ok(*b),
114                    _ => Err(AqlError::new(
115                        ErrorCode::TypeError,
116                        format!("Variable '{}' is not a boolean, got {:?}", name, val),
117                    )),
118                }
119            } else {
120                Err(AqlError::new(
121                    ErrorCode::UndefinedVariable,
122                    format!("Variable '{}' is not defined", name),
123                ))
124            }
125        }
126        _ => Err(AqlError::new(
127            ErrorCode::TypeError,
128            format!("Expected boolean value, got {:?}", value),
129        )),
130    }
131}
132
133/// Validate that all required variables are provided
134fn validate_required_variables(
135    variable_definitions: &[ast::VariableDefinition],
136    provided_variables: &HashMap<String, ast::Value>,
137) -> Result<()> {
138    for var_def in variable_definitions {
139        // Check if variable is required (has ! in type annotation)
140        if var_def.var_type.is_required {
141            // Check if variable is provided
142            if !provided_variables.contains_key(&var_def.name) {
143                // Check if there's a default value
144                if var_def.default_value.is_none() {
145                    return Err(AqlError::new(
146                        ErrorCode::UndefinedVariable,
147                        format!(
148                            "Required variable '{}' (type: {}{}) is not provided",
149                            var_def.name,
150                            var_def.var_type.name,
151                            if var_def.var_type.is_required {
152                                "!"
153                            } else {
154                                ""
155                            }
156                        ),
157                    ));
158                }
159            }
160        }
161    }
162    Ok(())
163}
164
/// Result of executing an AQL operation
///
/// Returned by `execute`, `execute_document` and the per-operation executors
/// in this module.
#[derive(Debug)]
pub enum ExecutionResult {
    /// Query result with documents
    ///
    /// Also returned as an empty placeholder for fragment and handler
    /// definitions, with the collection set to `"__fragment"` or
    /// `"__handler"` respectively.
    Query(QueryResult),
    /// Mutation result with affected documents
    Mutation(MutationResult),
    /// Subscription ID for reactive queries
    Subscription(SubscriptionResult),
    /// Multiple results for batch operations
    ///
    /// Produced when a document contains more than one executable operation,
    /// or when a single query selects more than one root field.
    Batch(Vec<ExecutionResult>),
    /// Schema operation result
    Schema(SchemaResult),
    /// Migration operation result
    Migration(MigrationResult),
}
181
/// Outcome of a schema operation, carried by `ExecutionResult::Schema`.
///
/// NOTE(review): the values are filled in by `execute_schema`, which is not
/// visible here; the field descriptions below are inferred from the names only.
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Presumably the kind of schema operation that ran — confirm against `execute_schema`.
    pub operation: String,
    /// Presumably the collection the operation targeted — confirm against `execute_schema`.
    pub collection: String,
    /// Free-form status text; the set of possible values is not visible here.
    pub status: String,
}
188
/// Outcome of a migration operation, carried by `ExecutionResult::Migration`.
///
/// NOTE(review): the values are filled in by `execute_migration`, which is not
/// visible here; the field descriptions below are inferred from the names only.
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Presumably the migration version that was processed — confirm against `execute_migration`.
    pub version: String,
    /// Presumably the number of migration steps that were applied — confirm against `execute_migration`.
    pub steps_applied: usize,
    /// Free-form status text; the set of possible values is not visible here.
    pub status: String,
}
195
/// Serializable description of how a request would be executed.
///
/// NOTE(review): nothing in the visible part of this file constructs this
/// type; the field descriptions below are inferred from the names only.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionPlan {
    /// Presumably one human-readable entry per planned operation — TODO confirm.
    pub operations: Vec<String>,
    /// Presumably a relative cost estimate; the unit is not visible here — TODO confirm.
    pub estimated_cost: f64,
}
201
/// Query execution result
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Name of the queried collection (the root field name of the query).
    ///
    /// Empty when the query selected no root fields; `"__fragment"` or
    /// `"__handler"` for the placeholder results of non-executable definitions.
    pub collection: String,
    /// Resulting documents.
    ///
    /// Depending on the selection this is the (optionally projected) list of
    /// matching documents, one document per group for `groupBy`, a single
    /// `"aggregation_result"` document for `aggregate`, or a single
    /// `"connection"` document holding `edges` / `pageInfo`.
    pub documents: Vec<Document>,
    /// Number of documents returned by the filtered scan, counted before
    /// offset/limit or cursor pagination is applied.
    ///
    /// NOTE(review): when the scan is given a limit hint this is presumably
    /// capped by that hint rather than being the full match count — confirm
    /// against `Aurora::scan_and_filter`.
    pub total_count: Option<usize>,
}
209
/// Mutation execution result
///
/// NOTE(review): the values are filled in by `execute_mutation`, which is not
/// visible here; the field descriptions below are inferred from the names only.
#[derive(Debug, Clone)]
pub struct MutationResult {
    /// Presumably the kind of mutation that ran — confirm against `execute_mutation`.
    pub operation: String,
    /// Presumably the collection the mutation targeted — confirm against `execute_mutation`.
    pub collection: String,
    /// Presumably the number of documents the mutation affected — confirm against `execute_mutation`.
    pub affected_count: usize,
    /// Presumably the documents handed back to the caller after the mutation — confirm against `execute_mutation`.
    pub returned_documents: Vec<Document>,
}
218
/// Subscription result
///
/// NOTE(review): the values are filled in by `execute_subscription`, which is
/// not visible here; the field descriptions below are inferred from the names
/// and types only.
#[derive(Debug)]
pub struct SubscriptionResult {
    /// Identifier of the created subscription.
    pub subscription_id: String,
    /// Presumably the collection being watched — confirm against `execute_subscription`.
    pub collection: String,
    /// Change listener for the subscription, when one is available.
    /// NOTE(review): the conditions under which this is `None` are not visible here.
    pub stream: Option<crate::pubsub::ChangeListener>,
}
226
/// Execution options
///
/// NOTE(review): the derived `Default` yields `apply_projections = false`
/// (the `bool` default), whereas `ExecutionOptions::new()` sets it to `true`.
/// Callers get different projection behavior depending on which constructor
/// they use — confirm that this is intended.
#[derive(Debug, Clone, Default)]
pub struct ExecutionOptions {
    /// Skip validation (for performance when you trust the query)
    pub skip_validation: bool,
    /// Apply projections (return only requested fields)
    ///
    /// Consulted by list-mode collection queries: when `false`, matching
    /// documents are returned whole even if a selection set is present.
    pub apply_projections: bool,
    /// Variable values for parameterized queries
    ///
    /// Handed to the parser as a JSON object when executing an AQL string.
    pub variables: HashMap<String, JsonValue>,
}
237
238impl ExecutionOptions {
239    pub fn new() -> Self {
240        Self {
241            skip_validation: false,
242            apply_projections: true,
243            variables: HashMap::new(),
244        }
245    }
246
247    pub fn with_variables(mut self, vars: HashMap<String, JsonValue>) -> Self {
248        self.variables = vars;
249        self
250    }
251
252    pub fn skip_validation(mut self) -> Self {
253        self.skip_validation = true;
254        self
255    }
256}
257
258/// Execute an AQL query string against the database
259pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
260    // Parse the AQL string
261    let variables = serde_json::Value::Object(options.variables.clone().into_iter().collect());
262    let doc = super::parse_with_variables(aql, variables)?;
263
264    // Execute each operation
265    execute_document(db, &doc, &options).await
266}
267
268/// Execute a parsed AQL document
269pub async fn execute_document(
270    db: &Aurora,
271    doc: &ast::Document,
272    options: &ExecutionOptions,
273) -> Result<ExecutionResult> {
274    if doc.operations.is_empty() {
275        return Err(AqlError::new(
276            ErrorCode::QueryError,
277            "No operations in document".to_string(),
278        ));
279    }
280
281    // Collect fragments from the document for reference
282    let fragments: HashMap<String, FragmentDef> = doc
283        .operations
284        .iter()
285        .filter_map(|op| {
286            if let Operation::FragmentDefinition(frag) = op {
287                Some((frag.name.clone(), frag.clone()))
288            } else {
289                None
290            }
291        })
292        .collect();
293
294    // Execute only non-fragment operations
295    let executable_ops: Vec<&Operation> = doc
296        .operations
297        .iter()
298        .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
299        .collect();
300
301    if executable_ops.is_empty() {
302        return Err(AqlError::new(
303            ErrorCode::QueryError,
304            "No executable operations in document".to_string(),
305        ));
306    }
307
308    if executable_ops.len() == 1 {
309        execute_operation(db, executable_ops[0], options, &fragments).await
310    } else {
311        let mut results = Vec::new();
312        for op in executable_ops {
313            results.push(execute_operation(db, op, options, &fragments).await?);
314        }
315        Ok(ExecutionResult::Batch(results))
316    }
317}
318
319/// Execute a single operation
320async fn execute_operation(
321    db: &Aurora,
322    op: &Operation,
323    options: &ExecutionOptions,
324    fragments: &HashMap<String, FragmentDef>,
325) -> Result<ExecutionResult> {
326    match op {
327        Operation::Query(query) => execute_query(db, query, options, fragments).await,
328        Operation::Mutation(mutation) => execute_mutation(db, mutation, options, fragments).await,
329        Operation::Subscription(sub) => execute_subscription(db, sub, options).await,
330        Operation::Schema(schema) => execute_schema(db, schema, options).await,
331        Operation::Migration(migration) => execute_migration(db, migration, options).await,
332        Operation::FragmentDefinition(_) => {
333            // Fragment definitions are stored for reference, not executed directly
334            Ok(ExecutionResult::Query(QueryResult {
335                collection: "__fragment".to_string(),
336                documents: vec![],
337                total_count: Some(0),
338            }))
339        }
340        Operation::Introspection(intro) => execute_introspection(db, intro).await,
341        Operation::Handler(_) => {
342            // Handler definitions are registered, not executed as queries
343            Ok(ExecutionResult::Query(QueryResult {
344                collection: "__handler".to_string(),
345                documents: vec![],
346                total_count: Some(0),
347            }))
348        }
349    }
350}
351
352/// Execute a query operation
353async fn execute_query(
354    db: &Aurora,
355    query: &ast::Query,
356    options: &ExecutionOptions,
357    fragments: &HashMap<String, FragmentDef>,
358) -> Result<ExecutionResult> {
359    // Validate required variables are provided
360    validate_required_variables(&query.variable_definitions, &query.variables_values)?;
361
362    // Collect fields (flatten fragments)
363    let root_fields = collect_fields(
364        &query.selection_set,
365        fragments,
366        &query.variables_values,
367        None,
368    )?;
369
370    // For each field in selection_set, it represents a collection query
371    if root_fields.is_empty() {
372        return Ok(ExecutionResult::Query(QueryResult {
373            collection: String::new(),
374            documents: vec![],
375            total_count: Some(0),
376        }));
377    }
378
379    // Execute each top-level field as a collection query
380    let mut results = Vec::new();
381    for field in &root_fields {
382        let result =
383            execute_collection_query(db, field, &query.variables_values, options, fragments)
384                .await?;
385        results.push(result);
386    }
387
388    if results.len() == 1 {
389        Ok(ExecutionResult::Query(results.remove(0)))
390    } else {
391        Ok(ExecutionResult::Batch(
392            results.into_iter().map(ExecutionResult::Query).collect(),
393        ))
394    }
395}
396
397/// Execute a collection query (single field with selection set)
398async fn execute_collection_query(
399    db: &Aurora,
400    field: &ast::Field,
401    variables: &HashMap<String, ast::Value>,
402    options: &ExecutionOptions,
403    fragments: &HashMap<String, FragmentDef>,
404) -> Result<QueryResult> {
405    let collection_name = &field.name;
406
407    // Collect sub-fields
408    // The parent type for sub-fields is the collection name (which is the field name here)
409    let sub_fields = collect_fields(
410        &field.selection_set,
411        fragments,
412        variables,
413        Some(&field.name),
414    )?;
415    let filter = extract_filter_from_args(&field.arguments)?;
416
417    // Extract pagination arguments
418    let (limit, offset) = extract_pagination(&field.arguments);
419    let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
420
421    // Determines if we are in Connection Mode (Relay style)
422    // Check if selection set asks for "edges" or "pageInfo"
423    let is_connection = sub_fields
424        .iter()
425        .any(|f| f.name == "edges" || f.name == "pageInfo");
426
427    // Extract and apply orderBy if present
428    let orderings = extract_order_by(&field.arguments);
429
430    // Compile filter once
431    let compiled_filter = if let Some(ref f) = filter {
432        Some(compile_filter(f)?)
433    } else {
434        None
435    };
436
437    // Create filter closure
438    let filter_fn = |doc: &Document| {
439        compiled_filter
440            .as_ref()
441            .map(|f| matches_filter(doc, f, variables))
442            .unwrap_or(true)
443    };
444
445    // Optimization: If no sorting is required, we can limit the scan
446    let target = if orderings.is_empty() {
447        limit.map(|l| l + offset)
448    } else {
449        None
450    };
451
452    let mut filtered_docs_iter = db.scan_and_filter(collection_name, filter_fn, target)?;
453
454    // Extract and apply orderBy if present
455    let orderings = extract_order_by(&field.arguments);
456    if !orderings.is_empty() {
457        apply_ordering(&mut filtered_docs_iter, &orderings);
458    }
459
460    let total_count = filtered_docs_iter.len();
461
462    let final_docs = if is_connection {
463        // Sort by ID for stable, consistent pagination ordering
464        // This ensures cursor-based pagination works correctly across queries
465        filtered_docs_iter.sort_by(|a, b| a.id.cmp(&b.id));
466
467        // 1. Filter by 'after' cursor
468        if let Some(after_cursor) = after {
469            // Decode cursor to get the ID (or sort key)
470            if let Ok(after_id) = decode_cursor(&after_cursor) {
471                // Assuming sort by ID for now.
472                // Find index of document with this ID
473                if let Some(pos) = filtered_docs_iter.iter().position(|d| d.id == after_id) {
474                    // Skip up to and including the cursor
475                    filtered_docs_iter = filtered_docs_iter.into_iter().skip(pos + 1).collect();
476                }
477            }
478        }
479
480        let has_next_page = if let Some(l) = first {
481            filtered_docs_iter.len() > l
482        } else {
483            false
484        };
485
486        // Apply 'first' limit
487        if let Some(l) = first {
488            filtered_docs_iter.truncate(l);
489        }
490
491        // Construct edges
492        let mut edges = Vec::new();
493        let mut end_cursor = None;
494
495        for doc in filtered_docs_iter {
496            let cursor = encode_cursor(&Value::String(doc.id.clone()));
497            end_cursor = Some(cursor.clone());
498
499            let mut edge_data = HashMap::new();
500            edge_data.insert("cursor".to_string(), Value::String(cursor));
501
502            // Process 'node' selection
503            // Find 'edges' field in selection set, then 'node' subfield
504            if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
505                let edges_sub_fields =
506                    collect_fields(&edges_field.selection_set, fragments, variables, None)?;
507
508                if let Some(node_field) = edges_sub_fields.iter().find(|f| f.name == "node") {
509                    // Apply projection to node with lookup support
510                    let node_doc = apply_projection_with_lookups(
511                        db,
512                        doc.clone(), // Pass a clone for projection
513                        &node_field.selection_set,
514                        variables,
515                        fragments,
516                        collection_name,
517                    )
518                    .await?;
519                    edge_data.insert("node".to_string(), Value::Object(node_doc.data));
520                }
521            }
522
523            edges.push(Value::Object(edge_data));
524        }
525
526        // Construct pageInfo
527        let mut page_info_data = HashMap::new();
528        page_info_data.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
529        if let Some(ec) = end_cursor {
530            page_info_data.insert("endCursor".to_string(), Value::String(ec));
531        } else {
532            page_info_data.insert("endCursor".to_string(), Value::Null);
533        }
534
535        // Construct result wrapper
536        let mut conn_data = HashMap::new();
537        conn_data.insert("edges".to_string(), Value::Array(edges));
538        conn_data.insert("pageInfo".to_string(), Value::Object(page_info_data));
539
540        vec![Document {
541            id: "connection".to_string(),
542            data: conn_data,
543        }]
544    } else {
545        // --- List Mode (Legacy/Standard) ---
546
547        // Apply limit/offset
548        let paginated_docs: Vec<Document> = filtered_docs_iter
549            .into_iter()
550            .skip(offset)
551            .take(limit.unwrap_or(usize::MAX))
552            .collect();
553
554        // Check for aggregation... (existing logic)
555        let has_aggregation =
556            !sub_fields.is_empty() && sub_fields.iter().any(|f| f.name == "aggregate");
557
558        // Check for groupBy
559        let group_by_field = if !sub_fields.is_empty() {
560            sub_fields.iter().find(|f| f.name == "groupBy")
561        } else {
562            None
563        };
564
565        if let Some(gb_field) = group_by_field {
566            execute_group_by(&paginated_docs, gb_field, fragments, variables)?
567        } else if has_aggregation {
568            let agg_doc = execute_aggregation(&paginated_docs, &sub_fields, fragments, variables)?;
569            vec![agg_doc]
570        } else if options.apply_projections && !sub_fields.is_empty() {
571            let mut projected = Vec::new();
572            for doc in paginated_docs {
573                projected.push(
574                    apply_projection_with_lookups(
575                        db,
576                        doc,
577                        &field.selection_set,
578                        variables,
579                        fragments,
580                        collection_name,
581                    )
582                    .await?,
583                );
584            }
585            projected
586        } else {
587            paginated_docs
588        }
589    };
590
591    Ok(QueryResult {
592        collection: collection_name.clone(),
593        documents: final_docs,
594        total_count: Some(total_count),
595    })
596}
597
598/// Execute GroupBy on a set of documents
599fn execute_group_by(
600    docs: &[Document],
601    group_by_field: &ast::Field,
602    fragments: &HashMap<String, FragmentDef>,
603    variables: &HashMap<String, ast::Value>,
604) -> Result<Vec<Document>> {
605    // 1. Identify the grouping key field name
606    let key_field_name = group_by_field
607        .arguments
608        .iter()
609        .find(|a| a.name == "field")
610        .and_then(|a| match &a.value {
611            ast::Value::String(s) => Some(s.clone()),
612            _ => None,
613        })
614        .ok_or_else(|| {
615            AqlError::new(
616                ErrorCode::QueryError,
617                "groupBy requires a 'field' argument".to_string(),
618            )
619        })?;
620
621    // 2. Group documents
622    let mut groups: HashMap<String, Vec<&Document>> = HashMap::new();
623
624    for doc in docs {
625        let val = doc.data.get(&key_field_name).unwrap_or(&Value::Null);
626        let key_str = match val {
627            Value::String(s) => s.clone(),
628            Value::Int(i) => i.to_string(),
629            Value::Float(f) => f.to_string(),
630            Value::Bool(b) => b.to_string(),
631            Value::Null => "null".to_string(),
632            _ => format!("{:?}", val), // Fallback
633        };
634
635        groups.entry(key_str).or_default().push(doc);
636    }
637
638    // 3. Construct result documents for each group
639    let mut result_docs = Vec::new();
640
641    for (group_key, group_docs) in groups {
642        let mut group_data = HashMap::new();
643
644        // Process selection set for the group
645        // groupBy { key, count, nodes { ... }, aggregate { ... } }
646        let group_fields =
647            collect_fields(&group_by_field.selection_set, fragments, variables, None)?;
648
649        for field in &group_fields {
650            let alias = field.alias.as_ref().unwrap_or(&field.name).clone();
651
652            match field.name.as_str() {
653                "key" => {
654                    group_data.insert(alias, Value::String(group_key.clone()));
655                }
656                "count" => {
657                    group_data.insert(alias, Value::Int(group_docs.len() as i64));
658                }
659                "nodes" => {
660                    // Return the documents in this group
661                    let nodes: Vec<Value> = group_docs
662                        .iter()
663                        .map(|d| {
664                            if !field.selection_set.is_empty() {
665                                // Need apply_projection that handles Selection... or update apply_projection signature
666                                // Actually apply_projection_with_lookups is async, but this is sync.
667                                // Simplified projection for now as apply_projection helper not fully updated.
668                                // Re-using existing structure but ignoring deep lookups for simple projection
669                                let proj_fields = collect_fields(
670                                    &field.selection_set,
671                                    fragments,
672                                    variables,
673                                    None,
674                                )
675                                .unwrap_or_default();
676                                let proj = apply_projection((*d).clone(), &proj_fields);
677                                Value::Object(proj.data)
678                            } else {
679                                Value::Object(d.data.clone())
680                            }
681                        })
682                        .collect();
683                    group_data.insert(alias, Value::Array(nodes));
684                }
685                "aggregate" => {
686                    // Run aggregation on this group's documents
687                    let group_docs_owned: Vec<Document> =
688                        group_docs.iter().map(|d| (*d).clone()).collect();
689                    // Note regarding aggregation sub-selections: existing parser puts them in generic selection_set
690                    // or specialized structure. If existing aggregation parsing resulted in Field with nested Fields,
691                    // we need to make sure execute_aggregation handles Vec<Selection> or we collect it.
692                    // The group_by field -> aggregate -> selection_set contains function calls.
693                    // Those are fields.
694                    let agg_result = execute_aggregation(
695                        &group_docs_owned,
696                        &[field.clone()],
697                        fragments,
698                        variables,
699                    )?;
700                    // Flatten result into group_data
701                    for (k, v) in agg_result.data {
702                        group_data.insert(k, v);
703                    }
704                }
705                _ => {
706                    // Ignore unknown fields
707                }
708            }
709        }
710
711        result_docs.push(Document {
712            id: format!("group_{}", group_key),
713            data: group_data,
714        });
715    }
716
717    Ok(result_docs)
718}
719
720/// Execute aggregation over a set of documents
721fn execute_aggregation(
722    docs: &[Document],
723    selection_set: &[ast::Field],
724    fragments: &HashMap<String, FragmentDef>,
725    variables: &HashMap<String, ast::Value>,
726) -> Result<Document> {
727    let mut result_data = HashMap::new();
728
729    for field in selection_set {
730        let alias = field.alias.as_ref().unwrap_or(&field.name).clone();
731
732        if field.name == "aggregate" {
733            // Process the aggregation block
734            // e.g. aggregate { count, avg(field: "age") }
735            let mut agg_result = HashMap::new();
736
737            let agg_fields = collect_fields(&field.selection_set, fragments, variables, None)?;
738
739            for agg_fn in agg_fields {
740                // Aggregation functions inside are Fields
741
742                let agg_alias = agg_fn.alias.as_ref().unwrap_or(&agg_fn.name).clone();
743                let agg_name = &agg_fn.name;
744
745                let value = match agg_name.as_str() {
746                    "count" => Value::Int(docs.len() as i64),
747                    "sum" | "avg" | "min" | "max" => {
748                        // Extract target field from arguments
749                        // e.g. sum(field: "age") or sum(fields: ["a", "b"])
750                        let target_field = agg_fn
751                            .arguments
752                            .iter()
753                            .find(|a| a.name == "field")
754                            .and_then(|a| match &a.value {
755                                ast::Value::String(s) => Some(s.clone()),
756                                _ => None,
757                            })
758                            .ok_or_else(|| {
759                                AqlError::new(
760                                    ErrorCode::QueryError,
761                                    format!(
762                                        "Aggregation '{}' requires a 'field' argument",
763                                        agg_name
764                                    ),
765                                )
766                            })?;
767
768                        // Collect values
769                        let values: Vec<f64> = docs
770                            .iter()
771                            .filter_map(|d| {
772                                d.data.get(&target_field).and_then(|v| match v {
773                                    Value::Int(i) => Some(*i as f64),
774                                    Value::Float(f) => Some(*f),
775                                    _ => None,
776                                })
777                            })
778                            .collect();
779
780                        match agg_name.as_str() {
781                            "sum" => Value::Float(values.iter().sum()),
782                            "avg" => {
783                                if values.is_empty() {
784                                    Value::Null
785                                } else {
786                                    let sum: f64 = values.iter().sum();
787                                    Value::Float(sum / values.len() as f64)
788                                }
789                            }
790                            "min" => values
791                                .iter()
792                                .fold(None, |min, val| match min {
793                                    None => Some(*val),
794                                    Some(m) => Some(if *val < m { *val } else { m }),
795                                })
796                                .map(Value::Float)
797                                .unwrap_or(Value::Null),
798                            "max" => values
799                                .iter()
800                                .fold(None, |max, val| match max {
801                                    None => Some(*val),
802                                    Some(m) => Some(if *val > m { *val } else { m }),
803                                })
804                                .map(Value::Float)
805                                .unwrap_or(Value::Null),
806                            _ => Value::Null, // Should be unreachable
807                        }
808                    }
809                    _ => {
810                        return Err(AqlError::new(
811                            ErrorCode::QueryError,
812                            format!("Unknown aggregation function '{}'", agg_name),
813                        ));
814                    }
815                };
816
817                agg_result.insert(agg_alias, value);
818            }
819
820            result_data.insert(alias, Value::Object(agg_result));
821        }
822    }
823
824    Ok(Document {
825        id: "aggregation_result".to_string(),
826        data: result_data,
827    })
828}
829
830/// Execute a lookup (cross-collection join) for a parent document
831async fn execute_lookup(
832    db: &Aurora,
833    parent_doc: &Document,
834    lookup: &ast::LookupSelection,
835    variables: &HashMap<String, ast::Value>,
836    fragments: &HashMap<String, FragmentDef>,
837) -> Result<Value> {
838    // Get the local field value from the parent document
839    let local_value = parent_doc.data.get(&lookup.local_field);
840
841    if local_value.is_none() {
842        return Ok(Value::Array(vec![]));
843    }
844
845    let local_value = local_value.unwrap();
846
847    // Query the foreign collection
848    let foreign_docs = db.aql_get_all_collection(&lookup.collection).await?;
849
850    // Filter to documents where foreign_field matches local_value
851    let matching_docs: Vec<Document> = foreign_docs
852        .into_iter()
853        .filter(|doc| {
854            if let Some(foreign_val) = doc.data.get(&lookup.foreign_field) {
855                db_values_equal(foreign_val, local_value)
856            } else {
857                false
858            }
859        })
860        .collect();
861
862    // Apply optional filter if present
863    let filtered_docs = if let Some(ref filter) = lookup.filter {
864        let compiled_filter = compile_filter(filter)?;
865        matching_docs
866            .into_iter()
867            .filter(|doc| matches_filter(doc, &compiled_filter, variables))
868            .collect()
869    } else {
870        matching_docs
871    };
872
873    // Apply projection from selection_set
874    let projected: Vec<Value> = filtered_docs
875        .into_iter()
876        .map(|doc| {
877            // Convert Selection to Field for projection
878            // Collect fields from the lookup's selection set
879            let fields = collect_fields(
880                &lookup.selection_set,
881                fragments,
882                variables,
883                Some(&lookup.collection),
884            )
885            .unwrap_or_default();
886
887            if fields.is_empty() {
888                Value::Object(doc.data)
889            } else {
890                let projected_doc = apply_projection(doc, &fields);
891                Value::Object(projected_doc.data)
892            }
893        })
894        .collect();
895
896    Ok(Value::Array(projected))
897}
898
899/// Check if two database values are equal (for join matching)
900fn db_values_equal(a: &Value, b: &Value) -> bool {
901    match (a, b) {
902        (Value::String(s1), Value::String(s2)) => s1 == s2,
903        (Value::Int(i1), Value::Int(i2)) => i1 == i2,
904        (Value::Float(f1), Value::Float(f2)) => (f1 - f2).abs() < f64::EPSILON,
905        (Value::Bool(b1), Value::Bool(b2)) => b1 == b2,
906        (Value::Null, Value::Null) => true,
907        // Cross-type comparisons for IDs (string vs int)
908        (Value::String(s), Value::Int(i)) => s.parse::<i64>().ok() == Some(*i),
909        (Value::Int(i), Value::String(s)) => s.parse::<i64>().ok() == Some(*i),
910        _ => false,
911    }
912}
913
914/// Apply projection (select specific fields) with support for lookups and fragments
915async fn apply_projection_with_lookups(
916    db: &Aurora,
917    doc: Document,
918    selection_set: &[ast::Selection],
919    variables: &HashMap<String, ast::Value>,
920    fragments: &HashMap<String, FragmentDef>,
921    collection: &str,
922) -> Result<Document> {
923    let fields = collect_fields(selection_set, fragments, variables, Some(collection))?;
924
925    if fields.is_empty() {
926        return Ok(doc);
927    }
928
929    let mut projected_data = HashMap::new();
930
931    // Always include id (prefer the document ID)
932    if !doc.id.is_empty() {
933        projected_data.insert("id".to_string(), Value::String(doc.id.clone()));
934    } else if let Some(id_val) = doc.data.get("id") {
935        projected_data.insert("id".to_string(), id_val.clone());
936    }
937
938    for field in fields {
939        let field_name = field.alias.as_ref().unwrap_or(&field.name);
940        let source_name = &field.name;
941
942        // Check if this is a lookup field (starts with "lookup" keyword in name or has lookup args)
943        let is_lookup = field.arguments.iter().any(|arg| {
944            arg.name == "collection" || arg.name == "localField" || arg.name == "foreignField"
945        });
946
947        if is_lookup {
948            // Parse lookup from field arguments
949            // Extract where filter if present
950            let filter = extract_filter_from_args(&field.arguments).ok().flatten();
951
952            let collection =
953                extract_string_arg(&field.arguments, "collection").ok_or_else(|| {
954                    AqlError::new(
955                        ErrorCode::QueryError,
956                        "lookup requires 'collection' argument".to_string(),
957                    )
958                })?;
959            let local_field =
960                extract_string_arg(&field.arguments, "localField").ok_or_else(|| {
961                    AqlError::new(
962                        ErrorCode::QueryError,
963                        "lookup requires 'localField' argument".to_string(),
964                    )
965                })?;
966            let foreign_field =
967                extract_string_arg(&field.arguments, "foreignField").ok_or_else(|| {
968                    AqlError::new(
969                        ErrorCode::QueryError,
970                        "lookup requires 'foreignField' argument".to_string(),
971                    )
972                })?;
973
974            let lookup = ast::LookupSelection {
975                collection,
976                local_field,
977                foreign_field,
978                filter,
979                selection_set: field.selection_set.clone(),
980            };
981
982            let lookup_result = execute_lookup(db, &doc, &lookup, variables, fragments).await?;
983            projected_data.insert(field_name.clone(), lookup_result);
984        } else if let Some(value) = doc.data.get(source_name) {
985            projected_data.insert(field_name.clone(), value.clone());
986        }
987    }
988
989    Ok(Document {
990        id: doc.id,
991        data: projected_data,
992    })
993}
994
995/// Extract string value from arguments
996fn extract_string_arg(args: &[ast::Argument], name: &str) -> Option<String> {
997    args.iter().find(|a| a.name == name).and_then(|a| {
998        if let ast::Value::String(s) = &a.value {
999            Some(s.clone())
1000        } else {
1001            None
1002        }
1003    })
1004}
1005
1006/// Validate a document against validation rules
1007pub fn validate_document(doc: &Document, rules: &[ast::ValidationRule]) -> Result<Vec<String>> {
1008    let mut errors = Vec::new();
1009
1010    for rule in rules {
1011        let field_value = doc.data.get(&rule.field);
1012
1013        for constraint in &rule.constraints {
1014            match constraint {
1015                ast::ValidationConstraint::Format(format) => {
1016                    if let Some(Value::String(s)) = field_value {
1017                        match format.as_str() {
1018                            "email" => {
1019                                if !s.contains('@') || !s.contains('.') {
1020                                    errors.push(format!("{}: invalid email format", rule.field));
1021                                }
1022                            }
1023                            "url" => {
1024                                if !s.starts_with("http://") && !s.starts_with("https://") {
1025                                    errors.push(format!("{}: invalid URL format", rule.field));
1026                                }
1027                            }
1028                            "uuid" => {
1029                                if uuid::Uuid::parse_str(s).is_err() {
1030                                    errors.push(format!("{}: invalid UUID format", rule.field));
1031                                }
1032                            }
1033                            _ => {}
1034                        }
1035                    }
1036                }
1037                ast::ValidationConstraint::Min(min) => {
1038                    let valid = match field_value {
1039                        Some(Value::Int(i)) => (*i as f64) >= *min,
1040                        Some(Value::Float(f)) => *f >= *min,
1041                        _ => true,
1042                    };
1043                    if !valid {
1044                        errors.push(format!("{}: value must be >= {}", rule.field, min));
1045                    }
1046                }
1047                ast::ValidationConstraint::Max(max) => {
1048                    let valid = match field_value {
1049                        Some(Value::Int(i)) => (*i as f64) <= *max,
1050                        Some(Value::Float(f)) => *f <= *max,
1051                        _ => true,
1052                    };
1053                    if !valid {
1054                        errors.push(format!("{}: value must be <= {}", rule.field, max));
1055                    }
1056                }
1057                ast::ValidationConstraint::MinLength(min_len) => {
1058                    if let Some(Value::String(s)) = field_value {
1059                        if (s.len() as i64) < *min_len {
1060                            errors.push(format!("{}: length must be >= {}", rule.field, min_len));
1061                        }
1062                    }
1063                }
1064                ast::ValidationConstraint::MaxLength(max_len) => {
1065                    if let Some(Value::String(s)) = field_value {
1066                        if (s.len() as i64) > *max_len {
1067                            errors.push(format!("{}: length must be <= {}", rule.field, max_len));
1068                        }
1069                    }
1070                }
1071                ast::ValidationConstraint::Pattern(pattern) => {
1072                    if let Some(Value::String(s)) = field_value {
1073                        // Use RegexBuilder with size limits to prevent ReDoS attacks
1074                        match regex::RegexBuilder::new(pattern)
1075                            .size_limit(10_000_000) // 10MB compiled regex size limit
1076                            .dfa_size_limit(2_000_000) // 2MB DFA size limit
1077                            .build()
1078                        {
1079                            Ok(re) => {
1080                                if !re.is_match(s) {
1081                                    errors.push(format!(
1082                                        "{}: does not match pattern '{}'",
1083                                        rule.field, pattern
1084                                    ));
1085                                }
1086                            }
1087                            Err(e) => {
1088                                errors.push(format!(
1089                                    "{}: regex pattern '{}' is too complex or invalid: {}",
1090                                    rule.field, pattern, e
1091                                ));
1092                            }
1093                        }
1094                    }
1095                }
1096            }
1097        }
1098    }
1099
1100    Ok(errors)
1101}
1102
1103/// Execute downsample operation for time-series data
1104/// Groups data by time interval and applies aggregation
1105pub fn execute_downsample(
1106    docs: &[Document],
1107    interval: &str,
1108    aggregation: &str,
1109    time_field: &str,
1110    value_field: &str,
1111) -> Result<Vec<Document>> {
1112    // Parse interval (e.g., "1h", "5m", "1d")
1113    let interval_secs = parse_interval(interval)?;
1114
1115    // Group documents by time bucket
1116    let mut buckets: HashMap<i64, Vec<&Document>> = HashMap::new();
1117
1118    for doc in docs {
1119        if let Some(Value::Int(ts)) = doc.data.get(time_field) {
1120            let bucket = (*ts / interval_secs) * interval_secs;
1121            buckets.entry(bucket).or_default().push(doc);
1122        }
1123    }
1124
1125    // Apply aggregation to each bucket
1126    let mut result_docs = Vec::new();
1127    let mut sorted_buckets: Vec<_> = buckets.into_iter().collect();
1128    sorted_buckets.sort_by_key(|(k, _)| *k);
1129
1130    for (bucket_ts, bucket_docs) in sorted_buckets {
1131        let values: Vec<f64> = bucket_docs
1132            .iter()
1133            .filter_map(|d| match d.data.get(value_field) {
1134                Some(Value::Int(i)) => Some(*i as f64),
1135                Some(Value::Float(f)) => Some(*f),
1136                _ => None,
1137            })
1138            .collect();
1139
1140        let agg_value = match aggregation {
1141            "avg" | "average" => {
1142                if values.is_empty() {
1143                    0.0
1144                } else {
1145                    values.iter().sum::<f64>() / values.len() as f64
1146                }
1147            }
1148            "sum" => values.iter().sum(),
1149            "min" => values.iter().cloned().fold(f64::INFINITY, f64::min),
1150            "max" => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
1151            "count" => values.len() as f64,
1152            "first" => *values.first().unwrap_or(&0.0),
1153            "last" => *values.last().unwrap_or(&0.0),
1154            _ => values.iter().sum::<f64>() / values.len().max(1) as f64,
1155        };
1156
1157        let mut data = HashMap::new();
1158        data.insert(time_field.to_string(), Value::Int(bucket_ts));
1159        data.insert(value_field.to_string(), Value::Float(agg_value));
1160        data.insert("count".to_string(), Value::Int(bucket_docs.len() as i64));
1161
1162        result_docs.push(Document {
1163            id: format!("bucket_{}", bucket_ts),
1164            data,
1165        });
1166    }
1167
1168    Ok(result_docs)
1169}
1170
1171/// Parse interval string to seconds (e.g., "1h" -> 3600, "5m" -> 300)
1172fn parse_interval(interval: &str) -> Result<i64> {
1173    let interval = interval.trim().to_lowercase();
1174    let (num_str, unit) = interval.split_at(interval.len().saturating_sub(1));
1175    let num: i64 = num_str.parse().unwrap_or(1);
1176    let multiplier = match unit {
1177        "s" => 1,
1178        "m" => 60,
1179        "h" => 3600,
1180        "d" => 86400,
1181        "w" => 604800,
1182        _ => {
1183            return Err(AqlError::new(
1184                ErrorCode::QueryError,
1185                format!("Invalid interval unit '{}'", unit),
1186            ));
1187        }
1188    };
1189    Ok(num * multiplier)
1190}
1191
1192/// Execute window function on documents
1193pub fn execute_window_function(
1194    docs: &[Document],
1195    field: &str,
1196    function: &str,
1197    window_size: usize,
1198) -> Result<Vec<Document>> {
1199    if window_size == 0 {
1200        return Err(AqlError::new(
1201            ErrorCode::QueryError,
1202            "window_size must be >= 1".to_string(),
1203        ));
1204    }
1205    let mut result_docs = Vec::new();
1206
1207    for (i, doc) in docs.iter().enumerate() {
1208        let window_start = i.saturating_sub(window_size - 1);
1209        let window: Vec<f64> = docs[window_start..=i]
1210            .iter()
1211            .filter_map(|d| match d.data.get(field) {
1212                Some(Value::Int(v)) => Some(*v as f64),
1213                Some(Value::Float(v)) => Some(*v),
1214                _ => None,
1215            })
1216            .collect();
1217
1218        let window_value = match function {
1219            "ROW_NUMBER" | "row_number" => (i + 1) as f64,
1220            "RANK" | "rank" => (i + 1) as f64, // Simplified
1221            "SUM" | "sum" | "running_sum" => window.iter().sum(),
1222            "AVG" | "avg" | "moving_avg" => {
1223                if window.is_empty() {
1224                    0.0
1225                } else {
1226                    window.iter().sum::<f64>() / window.len() as f64
1227                }
1228            }
1229            "MIN" | "min" => window.iter().cloned().fold(f64::INFINITY, f64::min),
1230            "MAX" | "max" => window.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
1231            "COUNT" | "count" => window.len() as f64,
1232            _ => 0.0,
1233        };
1234
1235        let mut new_data = doc.data.clone();
1236        new_data.insert(
1237            format!("{}_window", function.to_lowercase()),
1238            Value::Float(window_value),
1239        );
1240
1241        result_docs.push(Document {
1242            id: doc.id.clone(),
1243            data: new_data,
1244        });
1245    }
1246
1247    Ok(result_docs)
1248}
1249
1250/// Execute a mutation operation
1251async fn execute_mutation(
1252    db: &Aurora,
1253    mutation: &ast::Mutation,
1254    options: &ExecutionOptions,
1255    fragments: &HashMap<String, FragmentDef>,
1256) -> Result<ExecutionResult> {
1257    // Validate required variables are provided
1258    validate_required_variables(&mutation.variable_definitions, &mutation.variables_values)?;
1259
1260    let mut results = Vec::new();
1261    let mut context: ExecutionContext = HashMap::new();
1262
1263    for mut_op in &mutation.operations {
1264        let result = execute_mutation_op(
1265            db,
1266            mut_op,
1267            &mutation.variables_values,
1268            &context,
1269            options,
1270            fragments,
1271        )
1272        .await?;
1273
1274        // Update context if alias is present
1275        if let Some(alias) = &mut_op.alias {
1276            if let Some(doc) = result.returned_documents.first() {
1277                // Manually convert map to JsonValue::Object
1278                let mut json_map = serde_json::Map::new();
1279                for (k, v) in &doc.data {
1280                    json_map.insert(k.clone(), aurora_value_to_json_value(v));
1281                }
1282
1283                json_map.insert("id".to_string(), JsonValue::String(doc.id.clone()));
1284
1285                let doc_json = JsonValue::Object(json_map);
1286
1287                context.insert(alias.clone(), doc_json);
1288            }
1289        }
1290
1291        results.push(result);
1292    }
1293
1294    if results.len() == 1 {
1295        Ok(ExecutionResult::Mutation(results.remove(0)))
1296    } else {
1297        Ok(ExecutionResult::Batch(
1298            results.into_iter().map(ExecutionResult::Mutation).collect(),
1299        ))
1300    }
1301}
1302
1303use base64::{Engine as _, engine::general_purpose};
1304use futures::future::{BoxFuture, FutureExt}; // Added this import
1305
1306/// Execute a single mutation operation
1307fn execute_mutation_op<'a>(
1308    db: &'a Aurora,
1309    mut_op: &'a ast::MutationOperation,
1310    variables: &'a HashMap<String, ast::Value>,
1311    context: &'a ExecutionContext,
1312    options: &'a ExecutionOptions,
1313    fragments: &'a HashMap<String, FragmentDef>,
1314) -> BoxFuture<'a, Result<MutationResult>> {
1315    async move {
1316        match &mut_op.operation {
1317            MutationOp::Insert { collection, data } => {
1318                let resolved_data = resolve_value(data, variables, context);
1319                let doc_data = aql_value_to_hashmap(&resolved_data)?;
1320                let doc = db.aql_insert(collection, doc_data).await?;
1321
1322                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1323                    // Flatten selection struct for projection
1324                    // We use the collection name from the mutation as parent type
1325                    // Using passed fragments
1326                    let fields = collect_fields(
1327                        &mut_op.selection_set,
1328                        fragments,
1329                        variables,
1330                        Some(collection),
1331                    )
1332                    .unwrap_or_default();
1333
1334                    vec![apply_projection(doc.clone(), &fields)]
1335                } else {
1336                    vec![doc]
1337                };
1338
1339                Ok(MutationResult {
1340                    operation: "insert".to_string(),
1341                    collection: collection.clone(),
1342                    affected_count: 1,
1343                    returned_documents: returned,
1344                })
1345            }
1346
1347            MutationOp::InsertMany { collection, data } => {
1348                let mut docs = Vec::new();
1349                for item in data {
1350                    let resolved_item = resolve_value(item, variables, context);
1351                    let doc_data = aql_value_to_hashmap(&resolved_item)?;
1352                    let doc = db.aql_insert(collection, doc_data).await?;
1353                    docs.push(doc);
1354                }
1355
1356                let count = docs.len();
1357                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1358                    docs.into_iter()
1359                        .map(|d| {
1360                            let fields = collect_fields(
1361                                &mut_op.selection_set,
1362                                fragments,
1363                                variables,
1364                                Some(collection),
1365                            )
1366                            .unwrap_or_default();
1367                            apply_projection(d, &fields)
1368                        })
1369                        .collect()
1370                } else {
1371                    docs
1372                };
1373
1374                Ok(MutationResult {
1375                    operation: "insertMany".to_string(),
1376                    collection: collection.clone(),
1377                    affected_count: count,
1378                    returned_documents: returned,
1379                })
1380            }
1381
1382            MutationOp::Update {
1383                collection,
1384                filter,
1385                data,
1386            } => {
1387                let resolved_data = resolve_value(data, variables, context);
1388                let update_data = aql_value_to_hashmap(&resolved_data)?;
1389
1390                let compiled_filter = if let Some(f) = filter {
1391                    Some(compile_filter(f)?)
1392                } else {
1393                    None
1394                };
1395
1396                let filter_fn = |doc: &Document| {
1397                    compiled_filter
1398                        .as_ref()
1399                        .map(|f| matches_filter(doc, f, variables))
1400                        .unwrap_or(true)
1401                };
1402                // Get only matching docs
1403                let matching_docs = db.scan_and_filter(collection, filter_fn, None)?;
1404
1405                let mut affected = 0;
1406                let mut returned = Vec::new();
1407
1408                for doc in matching_docs {
1409                    let updated_doc = db
1410                        .aql_update_document(collection, &doc.id, update_data.clone())
1411                        .await?;
1412                    returned.push(updated_doc);
1413                    affected += 1;
1414                }
1415
1416                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1417                    returned
1418                        .into_iter()
1419                        .map(|d| {
1420                            let fields = collect_fields(
1421                                &mut_op.selection_set,
1422                                fragments,
1423                                variables,
1424                                Some(collection),
1425                            )
1426                            .unwrap_or_default();
1427                            apply_projection(d, &fields)
1428                        })
1429                        .collect()
1430                } else {
1431                    returned
1432                };
1433
1434                Ok(MutationResult {
1435                    operation: "update".to_string(),
1436                    collection: collection.clone(),
1437                    affected_count: affected,
1438                    returned_documents: returned,
1439                })
1440            }
1441
1442            MutationOp::Upsert {
1443                collection,
1444                filter,
1445                data,
1446            } => {
1447                // For upsert, try update first, if no matches then insert
1448                let resolved_data = resolve_value(data, variables, context);
1449                let update_data = aql_value_to_hashmap(&resolved_data)?;
1450
1451                let compiled_filter = if let Some(f) = filter {
1452                    Some(compile_filter(f)?)
1453                } else {
1454                    None
1455                };
1456
1457                let filter_fn = |doc: &Document| {
1458                    compiled_filter
1459                        .as_ref()
1460                        .map(|f| matches_filter(doc, f, variables))
1461                        .unwrap_or(false) // For upsert, no filter means match nothing (trigger Insert)
1462                };
1463
1464                let matching = db.scan_and_filter(collection, filter_fn, None)?;
1465
1466                if matching.is_empty() {
1467                    // Insert
1468                    let doc = db.aql_insert(collection, update_data).await?;
1469                    Ok(MutationResult {
1470                        operation: "upsert(insert)".to_string(),
1471                        collection: collection.clone(),
1472                        affected_count: 1,
1473                        returned_documents: vec![doc],
1474                    })
1475                } else {
1476                    // Update matching documents
1477                    let mut affected = 0;
1478                    let mut returned = Vec::new();
1479
1480                    for doc in matching {
1481                        let updated_doc = db
1482                            .aql_update_document(collection, &doc.id, update_data.clone())
1483                            .await?;
1484                        returned.push(updated_doc);
1485                        affected += 1;
1486                    }
1487
1488                    Ok(MutationResult {
1489                        operation: "upsert(update)".to_string(),
1490                        collection: collection.clone(),
1491                        affected_count: affected,
1492                        returned_documents: returned,
1493                    })
1494                }
1495            }
1496
1497            MutationOp::Delete { collection, filter } => {
1498                let compiled_filter = if let Some(f) = filter {
1499                    Some(compile_filter(f)?)
1500                } else {
1501                    None
1502                };
1503
1504                let filter_fn = |doc: &Document| {
1505                    compiled_filter
1506                        .as_ref()
1507                        .map(|f| matches_filter(doc, f, variables))
1508                        .unwrap_or(true) // Delete all if no filter
1509                };
1510
1511                let matching_docs = db.scan_and_filter(collection, filter_fn, None)?;
1512
1513                let mut affected = 0;
1514                let mut returned = Vec::new();
1515
1516                for doc in matching_docs {
1517                    let deleted_doc = db.aql_delete_document(collection, &doc.id).await?;
1518                    returned.push(deleted_doc);
1519                    affected += 1;
1520                }
1521
1522                Ok(MutationResult {
1523                    operation: "delete".to_string(),
1524                    collection: collection.clone(),
1525                    affected_count: affected,
1526                    returned_documents: returned,
1527                })
1528            }
1529
1530            MutationOp::EnqueueJob {
1531                job_type,
1532                payload,
1533                priority,
1534                scheduled_at,
1535                max_retries,
1536            } => {
1537                let workers = db
1538                    .workers
1539                    .as_ref()
1540                    .ok_or_else(|| AqlError::invalid_operation("Worker system not initialized"))?;
1541
1542                let mut job = crate::workers::Job::new(job_type);
1543
1544                // Payload
1545
1546                let resolved_payload = resolve_value(payload, variables, context);
1547                if let ast::Value::Object(p) = resolved_payload {
1548                    for (k, v) in p {
1549                        let db_val = aql_value_to_db_value(&v)?;
1550                        let json_val: serde_json::Value =
1551                            serde_json::to_value(&db_val).map_err(|e| {
1552                                AqlError::new(ErrorCode::SerializationError, e.to_string())
1553                            })?;
1554                        let key_str = k.to_string();
1555                        job = job.add_field(key_str, json_val);
1556                    }
1557                }
1558
1559                // Priority
1560                let p_enum = match priority {
1561                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
1562                    ast::JobPriority::High => crate::workers::JobPriority::High,
1563                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
1564                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
1565                };
1566                job = job.with_priority(p_enum); // Scheduled At
1567                if let Some(s_str) = scheduled_at {
1568                    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s_str) {
1569                        job = job.scheduled_at(dt.with_timezone(&chrono::Utc));
1570                    }
1571                }
1572
1573                // Max Retries
1574                if let Some(retries) = max_retries {
1575                    job = job.with_max_retries((*retries).try_into().unwrap_or(3));
1576                }
1577
1578                let job_id = workers.enqueue(job).await?;
1579
1580                Ok(MutationResult {
1581                    operation: "enqueueJob".to_string(),
1582                    collection: "jobs".to_string(),
1583                    affected_count: 1,
1584                    returned_documents: vec![Document {
1585                        id: job_id,
1586                        data: HashMap::new(),
1587                    }],
1588                })
1589            }
1590
1591            MutationOp::Transaction { operations } => {
1592                // Execute in transaction
1593                let tx = db.aql_begin_transaction()?;
1594                let mut results = Vec::new();
1595
1596                for inner_op in operations {
1597                    match execute_mutation_op(db, inner_op, variables, context, options, fragments)
1598                        .await
1599                    {
1600                        Ok(result) => results.push(result),
1601                        Err(e) => {
1602                            let _ = db.aql_rollback_transaction(tx).await;
1603                            return Err(e);
1604                        }
1605                    }
1606                }
1607
1608                db.aql_commit_transaction(tx).await?;
1609
1610                let total_affected: usize = results.iter().map(|r| r.affected_count).sum();
1611                let all_returned: Vec<Document> = results
1612                    .into_iter()
1613                    .flat_map(|r| r.returned_documents)
1614                    .collect();
1615
1616                Ok(MutationResult {
1617                    operation: "transaction".to_string(),
1618                    collection: "multiple".to_string(),
1619                    affected_count: total_affected,
1620                    returned_documents: all_returned,
1621                })
1622            }
1623        }
1624    }
1625    .boxed()
1626}
1627
1628/// Execute a subscription operation
1629async fn execute_subscription(
1630    db: &Aurora,
1631    sub: &ast::Subscription,
1632    _options: &ExecutionOptions,
1633) -> Result<ExecutionResult> {
1634    let collection = sub
1635        .selection_set
1636        .first()
1637        .and_then(|sel| match sel {
1638            Selection::Field(f) => Some(f.name.clone()),
1639            _ => None, // Subscriptions shouldn't use fragments for top level usually?
1640        })
1641        .unwrap_or_default();
1642
1643    if collection.is_empty() {
1644        return Err(AqlError::new(
1645            ErrorCode::QueryError,
1646            "Subscription must select a collection".to_string(),
1647        ));
1648    }
1649
1650    // Create listener
1651    let mut listener = db.pubsub.listen(&collection);
1652
1653    // Apply filter if present
1654    // We look at the first field's arguments for 'where'
1655    if let Some(selection) = sub.selection_set.first() {
1656        if let Selection::Field(field) = selection {
1657            let filter_opt = extract_filter_from_args(&field.arguments)?;
1658            if let Some(aql_filter) = filter_opt {
1659                if let Some(event_filter) = convert_aql_filter_to_event_filter(&aql_filter) {
1660                    listener = listener.filter(event_filter);
1661                } else {
1662                    return Err(AqlError::new(
1663                        ErrorCode::QueryError,
1664                        "Unsupported filter operator in subscription",
1665                    ));
1666                }
1667            }
1668        }
1669    }
1670
1671    Ok(ExecutionResult::Subscription(SubscriptionResult {
1672        subscription_id: uuid::Uuid::new_v4().to_string(),
1673        collection,
1674        stream: Some(listener),
1675    }))
1676}
1677
1678/// Execute an introspection query (__schema)
1679async fn execute_introspection(
1680    db: &Aurora,
1681    intro: &ast::IntrospectionQuery,
1682) -> Result<ExecutionResult> {
1683    let mut result_data = HashMap::new();
1684
1685    // Get all collection names from stats
1686    let collection_stats = db.get_collection_stats().unwrap_or_default();
1687    let collection_names: Vec<String> = collection_stats.keys().cloned().collect();
1688
1689    for field_name in &intro.fields {
1690        match field_name.as_str() {
1691            "collections" => {
1692                // List all collections
1693                let collection_list: Vec<Value> = collection_names
1694                    .iter()
1695                    .map(|name| Value::String(name.clone()))
1696                    .collect();
1697                result_data.insert("collections".to_string(), Value::Array(collection_list));
1698            }
1699            "fields" => {
1700                // Get fields for all collections
1701                let mut all_fields = HashMap::new();
1702                for name in &collection_names {
1703                    if let Ok(coll) = db.get_collection_definition(name) {
1704                        let field_names: Vec<Value> = coll
1705                            .fields
1706                            .keys()
1707                            .map(|k| Value::String(k.clone()))
1708                            .collect();
1709                        all_fields.insert(name.clone(), Value::Array(field_names));
1710                    }
1711                }
1712                result_data.insert("fields".to_string(), Value::Object(all_fields));
1713            }
1714            "relations" => {
1715                // Relations are not yet implemented, return empty
1716                result_data.insert("relations".to_string(), Value::Array(vec![]));
1717            }
1718            _ => {
1719                // Unknown introspection field, ignore
1720            }
1721        }
1722    }
1723
1724    Ok(ExecutionResult::Query(QueryResult {
1725        collection: "__schema".to_string(),
1726        documents: vec![Document {
1727            id: "__schema".to_string(),
1728            data: result_data,
1729        }],
1730        total_count: Some(1),
1731    }))
1732}
1733
1734/// Helper to convert AST FieldDef to DB FieldDefinition
1735fn convert_ast_field_to_db_field(field: &ast::FieldDef) -> Result<crate::types::FieldDefinition> {
1736    use crate::types::{FieldDefinition, FieldType, ScalarType};
1737
1738    // Map Type Name
1739    let base_type = match field.field_type.name.as_str() {
1740        "String" | "ID" | "Email" | "URL" | "PhoneNumber" | "DateTime" | "Date" | "Time" => {
1741            ScalarType::String
1742        }
1743        "Int" => ScalarType::Int,
1744        "Float" => ScalarType::Float,
1745        "Boolean" => ScalarType::Bool,
1746        "Uuid" => ScalarType::Uuid,
1747        "Object" | "Json" => ScalarType::Object,
1748        "Any" => ScalarType::Any,
1749        "Array" => ScalarType::Array,
1750        _ => ScalarType::Any,
1751    };
1752
1753    // Construct final field type (Scalar or Array)
1754    let field_type = if field.field_type.is_array || field.field_type.name == "Array" {
1755        FieldType::Array(base_type)
1756    } else if field.field_type.name == "Object" || field.field_type.name == "Json" {
1757        FieldType::Object
1758    } else {
1759        FieldType::Scalar(base_type)
1760    };
1761
1762    // Parse Directives
1763    let mut unique = false;
1764    let mut indexed = false;
1765
1766    for directive in &field.directives {
1767        match directive.name.as_str() {
1768            "unique" => unique = true,
1769            "index" | "indexed" => indexed = true,
1770            _ => {}
1771        }
1772    }
1773
1774    // Validation matches DB logic
1775    if matches!(field_type, FieldType::Any) && (unique || indexed) {
1776        return Err(AqlError::new(
1777            ErrorCode::InvalidDefinition,
1778            format!(
1779                "Field '{}' of type 'Any' cannot be unique or indexed.",
1780                field.name
1781            ),
1782        ));
1783    }
1784
1785    Ok(FieldDefinition {
1786        field_type,
1787        unique,
1788        indexed,
1789        nullable: !field.field_type.is_required,
1790    })
1791}
1792
1793/// Execute a schema operation
1794async fn execute_schema(
1795    db: &Aurora,
1796    schema: &ast::Schema,
1797    _options: &ExecutionOptions,
1798) -> Result<ExecutionResult> {
1799    let mut results = Vec::new();
1800
1801    for op in &schema.operations {
1802        match op {
1803            ast::SchemaOp::DefineCollection {
1804                name,
1805                if_not_exists,
1806                fields,
1807                directives: _,
1808            } => {
1809                // Check if exists
1810                if *if_not_exists && db.get_collection_definition(name).is_ok() {
1811                    results.push(ExecutionResult::Schema(SchemaResult {
1812                        operation: "defineCollection".to_string(),
1813                        collection: name.clone(),
1814                        status: "skipped (exists)".to_string(),
1815                    }));
1816                    continue;
1817                }
1818
1819                // Call DB method to create collection
1820                let mut db_fields = std::collections::HashMap::new();
1821                for f in fields {
1822                    let def = convert_ast_field_to_db_field(f)?;
1823                    db_fields.insert(f.name.clone(), def);
1824                }
1825
1826                db.create_collection_schema(name, db_fields).await?;
1827
1828                results.push(ExecutionResult::Schema(SchemaResult {
1829                    operation: "defineCollection".to_string(),
1830                    collection: name.clone(),
1831                    status: "created".to_string(),
1832                }));
1833            }
1834            ast::SchemaOp::AlterCollection { name, actions } => {
1835                for action in actions {
1836                    match action {
1837                        ast::AlterAction::AddField(field_def) => {
1838                            let def = convert_ast_field_to_db_field(field_def)?;
1839                            db.add_field_to_schema(name, field_def.name.clone(), def)
1840                                .await?;
1841                        }
1842                        ast::AlterAction::DropField(field_name) => {
1843                            db.drop_field_from_schema(name, field_name.clone()).await?;
1844                        }
1845                        ast::AlterAction::RenameField { from, to } => {
1846                            db.rename_field_in_schema(name, from.clone(), to.clone())
1847                                .await?;
1848                        }
1849                        ast::AlterAction::ModifyField(field_def) => {
1850                            let def = convert_ast_field_to_db_field(field_def)?;
1851                            db.modify_field_in_schema(name, field_def.name.clone(), def)
1852                                .await?;
1853                        }
1854                    }
1855                }
1856                results.push(ExecutionResult::Schema(SchemaResult {
1857                    operation: "alterCollection".to_string(),
1858                    collection: name.clone(),
1859                    status: "modified".to_string(),
1860                }));
1861            }
1862            ast::SchemaOp::DropCollection { name, if_exists } => {
1863                if *if_exists && db.get_collection_definition(name).is_err() {
1864                    results.push(ExecutionResult::Schema(SchemaResult {
1865                        operation: "dropCollection".to_string(),
1866                        collection: name.clone(),
1867                        status: "skipped (not found)".to_string(),
1868                    }));
1869                    continue;
1870                }
1871
1872                db.drop_collection_schema(name).await?;
1873
1874                results.push(ExecutionResult::Schema(SchemaResult {
1875                    operation: "dropCollection".to_string(),
1876                    collection: name.clone(),
1877                    status: "dropped".to_string(),
1878                }));
1879            }
1880        }
1881    }
1882
1883    if results.len() == 1 {
1884        Ok(results.remove(0))
1885    } else {
1886        Ok(ExecutionResult::Batch(results))
1887    }
1888}
1889
1890/// Execute a migration operation
1891async fn execute_migration(
1892    db: &Aurora,
1893    migration: &ast::Migration,
1894    options: &ExecutionOptions,
1895) -> Result<ExecutionResult> {
1896    let mut results = Vec::new();
1897
1898    for step in &migration.steps {
1899        // Check if migration version already applied
1900        if db.is_migration_applied(&step.version).await? {
1901            continue;
1902        }
1903
1904        let mut applied_count = 0;
1905        for action in &step.actions {
1906            match action {
1907                ast::MigrationAction::Schema(schema_op) => {
1908                    // Re-use schema execution logic by wrapping in a temporary Schema struct.
1909                    let schema = ast::Schema {
1910                        operations: vec![schema_op.clone()],
1911                    };
1912                    execute_schema(db, &schema, options).await?;
1913                    applied_count += 1;
1914                }
1915                ast::MigrationAction::DataMigration(data_mig) => {
1916                    // Perform data transformation
1917                    // 1. Scan collection
1918                    // 2. Apply Rhai transforms
1919                    // 3. Update documents
1920                    let collection = &data_mig.collection;
1921                    let docs = db.aql_get_all_collection(collection).await?;
1922                    let engine = crate::computed::ComputedEngine::new();
1923
1924                    for doc in docs {
1925                        // Apply transforms to this doc
1926                        let mut updated_data = doc.data.clone();
1927                        let mut changed = false;
1928
1929                        for transform in &data_mig.transforms {
1930                            // Check if filter matches (if present)
1931                            let matches_filter = match &transform.filter {
1932                                Some(f) => check_ast_filter_match(f, &doc),
1933                                None => true,
1934                            };
1935
1936                            if matches_filter {
1937                                // Evaluate the Rhai expression
1938                                if let Some(new_value) =
1939                                    engine.evaluate(&transform.expression, &doc)
1940                                {
1941                                    updated_data.insert(transform.field.clone(), new_value);
1942                                    changed = true;
1943                                }
1944                            }
1945                        }
1946
1947                        if changed {
1948                            db.aql_update_document(collection, &doc.id, updated_data)
1949                                .await?;
1950                        }
1951                    }
1952                    applied_count += 1;
1953                }
1954            }
1955        }
1956
1957        db.mark_migration_applied(&step.version).await?;
1958
1959        results.push(ExecutionResult::Migration(MigrationResult {
1960            version: step.version.clone(),
1961            steps_applied: applied_count,
1962            status: "applied".to_string(),
1963        }));
1964    }
1965
1966    // Return a summary Migration result
1967    let total_applied = results
1968        .iter()
1969        .map(|r| {
1970            if let ExecutionResult::Migration(m) = r {
1971                m.steps_applied
1972            } else {
1973                0
1974            }
1975        })
1976        .sum();
1977
1978    if results.is_empty() {
1979        // All migrations were already applied
1980        Ok(ExecutionResult::Migration(MigrationResult {
1981            version: migration
1982                .steps
1983                .first()
1984                .map(|s| s.version.clone())
1985                .unwrap_or_default(),
1986            steps_applied: 0,
1987            status: "skipped (already applied)".to_string(),
1988        }))
1989    } else if results.len() == 1 {
1990        Ok(results.remove(0))
1991    } else {
1992        // Return a summary result
1993        Ok(ExecutionResult::Migration(MigrationResult {
1994            version: "batch".to_string(),
1995            steps_applied: total_applied,
1996            status: "applied".to_string(),
1997        }))
1998    }
1999}
2000
2001// Helper functions
2002
2003/// Extract filter from field arguments
2004fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
2005    for arg in args {
2006        if arg.name == "where" || arg.name == "filter" {
2007            return Ok(Some(value_to_filter(&arg.value)?));
2008        }
2009    }
2010    Ok(None)
2011}
2012
2013/// Extract orderBy from arguments
2014/// Supports: orderBy: "field", orderBy: { field: "name", direction: ASC }
2015/// or orderBy: [{ field: "a", direction: ASC }, { field: "b", direction: DESC }]
2016fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
2017    let mut orderings = Vec::new();
2018
2019    for arg in args {
2020        if arg.name == "orderBy" {
2021            match &arg.value {
2022                // Simple string: orderBy: "fieldName"
2023                ast::Value::String(field) => {
2024                    orderings.push(ast::Ordering {
2025                        field: field.clone(),
2026                        direction: ast::SortDirection::Asc,
2027                    });
2028                }
2029                // Object: orderBy: { field: "x", direction: ASC }
2030                ast::Value::Object(map) => {
2031                    if let Some(ordering) = parse_ordering_object(map) {
2032                        orderings.push(ordering);
2033                    }
2034                }
2035                // Array: orderBy: [{ field: "a", direction: ASC }, ...]
2036                ast::Value::Array(arr) => {
2037                    for val in arr {
2038                        if let ast::Value::Object(map) = val {
2039                            if let Some(ordering) = parse_ordering_object(map) {
2040                                orderings.push(ordering);
2041                            }
2042                        }
2043                    }
2044                }
2045                _ => {}
2046            }
2047        }
2048    }
2049
2050    orderings
2051}
2052
2053/// Parse an ordering object { field: "x", direction: ASC } OR { x: ASC }
2054fn parse_ordering_object(map: &HashMap<String, ast::Value>) -> Option<ast::Ordering> {
2055    // 1. Try explicit format: { field: "x", direction: ASC }
2056    if let Some(ast::Value::String(field_name)) = map.get("field") {
2057        let direction = map
2058            .get("direction")
2059            .and_then(|v| match v {
2060                ast::Value::String(s) | ast::Value::Enum(s) => match s.to_uppercase().as_str() {
2061                    "ASC" | "ASCENDING" => Some(ast::SortDirection::Asc),
2062                    "DESC" | "DESCENDING" => Some(ast::SortDirection::Desc),
2063                    _ => None,
2064                },
2065                _ => None,
2066            })
2067            .unwrap_or(ast::SortDirection::Asc);
2068
2069        return Some(ast::Ordering {
2070            field: field_name.clone(),
2071            direction,
2072        });
2073    }
2074
2075    // 2. Try shorthand format: { score: DESC } (single key)
2076    if map.len() == 1 {
2077        let (key, val) = map.iter().next().unwrap();
2078        // Check if value is a valid direction
2079        let direction = match val {
2080            ast::Value::String(s) | ast::Value::Enum(s) => match s.to_uppercase().as_str() {
2081                "ASC" | "ASCENDING" => Some(ast::SortDirection::Asc),
2082                "DESC" | "DESCENDING" => Some(ast::SortDirection::Desc),
2083                _ => None,
2084            },
2085            _ => None,
2086        };
2087
2088        if let Some(dir) = direction {
2089            return Some(ast::Ordering {
2090                field: key.clone(),
2091                direction: dir,
2092            });
2093        }
2094    }
2095
2096    None
2097}
2098
2099/// Apply ordering to documents
2100fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
2101    if orderings.is_empty() {
2102        return;
2103    }
2104
2105    docs.sort_by(|a, b| {
2106        for ordering in orderings {
2107            let a_val = a.data.get(&ordering.field);
2108            let b_val = b.data.get(&ordering.field);
2109
2110            let cmp = compare_values(a_val, b_val);
2111
2112            if cmp != std::cmp::Ordering::Equal {
2113                return match ordering.direction {
2114                    ast::SortDirection::Asc => cmp,
2115                    ast::SortDirection::Desc => cmp.reverse(),
2116                };
2117            }
2118        }
2119        std::cmp::Ordering::Equal
2120    });
2121}
2122
2123/// Compare two optional values for sorting
2124fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
2125    match (a, b) {
2126        (None, None) => std::cmp::Ordering::Equal,
2127        (None, Some(_)) => std::cmp::Ordering::Less,
2128        (Some(_), None) => std::cmp::Ordering::Greater,
2129        (Some(av), Some(bv)) => {
2130            match (av, bv) {
2131                (Value::Int(ai), Value::Int(bi)) => ai.cmp(bi),
2132                (Value::Float(af), Value::Float(bf)) => {
2133                    af.partial_cmp(bf).unwrap_or(std::cmp::Ordering::Equal)
2134                }
2135                (Value::String(as_), Value::String(bs)) => as_.cmp(bs),
2136                (Value::Bool(ab), Value::Bool(bb)) => ab.cmp(bb),
2137                // Cross-type: convert to string
2138                _ => format!("{:?}", av).cmp(&format!("{:?}", bv)),
2139            }
2140        }
2141    }
2142}
2143
2144/// Convert AQL Filter to EventFilter
2145fn convert_aql_filter_to_event_filter(filter: &AqlFilter) -> Option<crate::pubsub::EventFilter> {
2146    use crate::pubsub::EventFilter;
2147
2148    match filter {
2149        AqlFilter::Eq(field, value) => {
2150            let db_val = aql_value_to_db_value(value).ok()?;
2151            Some(EventFilter::FieldEquals(field.clone(), db_val))
2152        }
2153        AqlFilter::Gt(field, value) => {
2154            let db_val = aql_value_to_db_value(value).ok()?;
2155            Some(EventFilter::Gt(field.clone(), db_val))
2156        }
2157        AqlFilter::Gte(field, value) => {
2158            let db_val = aql_value_to_db_value(value).ok()?;
2159            Some(EventFilter::Gte(field.clone(), db_val))
2160        }
2161        AqlFilter::Lt(field, value) => {
2162            let db_val = aql_value_to_db_value(value).ok()?;
2163            Some(EventFilter::Lt(field.clone(), db_val))
2164        }
2165        AqlFilter::Lte(field, value) => {
2166            let db_val = aql_value_to_db_value(value).ok()?;
2167            Some(EventFilter::Lte(field.clone(), db_val))
2168        }
2169        AqlFilter::Ne(field, value) => {
2170            let db_val = aql_value_to_db_value(value).ok()?;
2171            Some(EventFilter::Ne(field.clone(), db_val))
2172        }
2173        AqlFilter::In(field, value) => {
2174            let db_val = aql_value_to_db_value(value).ok()?;
2175            Some(EventFilter::In(field.clone(), db_val))
2176        }
2177        AqlFilter::NotIn(field, value) => {
2178            let db_val = aql_value_to_db_value(value).ok()?;
2179            Some(EventFilter::NotIn(field.clone(), db_val))
2180        }
2181        AqlFilter::And(filters) => {
2182            let mut event_filters = Vec::new();
2183            for f in filters {
2184                if let Some(ef) = convert_aql_filter_to_event_filter(f) {
2185                    event_filters.push(ef);
2186                } else {
2187                    return None; // Cannot fully convert
2188                }
2189            }
2190            Some(EventFilter::And(event_filters))
2191        }
2192        AqlFilter::Or(filters) => {
2193            let mut event_filters = Vec::new();
2194            for f in filters {
2195                if let Some(ef) = convert_aql_filter_to_event_filter(f) {
2196                    event_filters.push(ef);
2197                } else {
2198                    return None;
2199                }
2200            }
2201            Some(EventFilter::Or(event_filters))
2202        }
2203        AqlFilter::Not(filter) => {
2204            convert_aql_filter_to_event_filter(filter).map(|f| EventFilter::Not(Box::new(f)))
2205        }
2206        AqlFilter::Contains(field, value) => {
2207            let db_val = aql_value_to_db_value(value).ok()?;
2208            Some(EventFilter::Contains(field.clone(), db_val))
2209        }
2210        AqlFilter::StartsWith(field, value) => {
2211            let db_val = aql_value_to_db_value(value).ok()?;
2212            Some(EventFilter::StartsWith(field.clone(), db_val))
2213        }
2214        AqlFilter::EndsWith(field, value) => {
2215            let db_val = aql_value_to_db_value(value).ok()?;
2216            Some(EventFilter::EndsWith(field.clone(), db_val))
2217        }
2218        AqlFilter::IsNull(field) => Some(EventFilter::IsNull(field.clone())),
2219        AqlFilter::IsNotNull(field) => Some(EventFilter::IsNotNull(field.clone())),
2220
2221        // Unsupported
2222        AqlFilter::Matches(_, _) => None,
2223    }
2224}
2225
2226/// Extract pagination from field arguments
2227pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
2228    let mut limit = None;
2229    let mut offset = 0;
2230
2231    for arg in args {
2232        match arg.name.as_str() {
2233            "limit" | "first" | "take" => {
2234                if let ast::Value::Int(n) = arg.value {
2235                    limit = Some(n as usize);
2236                }
2237            }
2238            "offset" | "skip" => {
2239                if let ast::Value::Int(n) = arg.value {
2240                    offset = n as usize;
2241                }
2242            }
2243            _ => {}
2244        }
2245    }
2246
2247    (limit, offset)
2248}
2249
2250fn extract_cursor_pagination(
2251    args: &[ast::Argument],
2252) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
2253    let mut first = None;
2254    let mut after = None;
2255    let mut last = None;
2256    let mut before = None;
2257
2258    for arg in args {
2259        match arg.name.as_str() {
2260            "first" => {
2261                if let ast::Value::Int(n) = arg.value {
2262                    first = Some(n as usize);
2263                }
2264            }
2265            "after" => {
2266                if let ast::Value::String(ref s) = arg.value {
2267                    after = Some(s.clone());
2268                }
2269            }
2270            "last" => {
2271                if let ast::Value::Int(n) = arg.value {
2272                    last = Some(n as usize);
2273                }
2274            }
2275            "before" => {
2276                if let ast::Value::String(ref s) = arg.value {
2277                    before = Some(s.clone());
2278                }
2279            }
2280            _ => {}
2281        }
2282    }
2283
2284    (first, after, last, before)
2285}
2286
2287fn encode_cursor(val: &Value) -> String {
2288    let s = match val {
2289        Value::String(s) => s.clone(),
2290        _ => String::new(),
2291    };
2292    general_purpose::STANDARD.encode(s)
2293}
2294
2295fn decode_cursor(cursor: &str) -> Result<String> {
2296    let bytes = general_purpose::STANDARD
2297        .decode(cursor)
2298        .map_err(|_| AqlError::new(ErrorCode::QueryError, "Invalid cursor".to_string()))?;
2299    String::from_utf8(bytes)
2300        .map_err(|_| AqlError::new(ErrorCode::QueryError, "Invalid cursor UTF-8".to_string()))
2301}
2302
2303fn get_doc_value_at_path<'a>(doc: &'a Document, path: &str) -> Option<&'a Value> {
2304    // Optimization: Check for exact key match first (supports "user.name" as a literal key)
2305    if let Some(val) = doc.data.get(path) {
2306        return Some(val);
2307    }
2308
2309    if !path.contains('.') {
2310        return None;
2311    }
2312
2313    let parts: Vec<&str> = path.split('.').collect();
2314    let mut current = doc.data.get(parts[0])?;
2315
2316    for &part in &parts[1..] {
2317        if let Value::Object(map) = current {
2318            current = map.get(part)?;
2319        } else {
2320            return None;
2321        }
2322    }
2323
2324    Some(current)
2325}
2326
2327/// Check if a document matches a filter
2328pub fn matches_filter(
2329    doc: &Document,
2330    filter: &CompiledFilter,
2331    variables: &HashMap<String, ast::Value>,
2332) -> bool {
2333    match filter {
2334        CompiledFilter::Eq(field, value) => get_doc_value_at_path(doc, field)
2335            .map(|v| values_equal(v, value, variables))
2336            .unwrap_or(false),
2337        CompiledFilter::Ne(field, value) => get_doc_value_at_path(doc, field)
2338            .map(|v| !values_bit_equal(v, value, variables))
2339            .unwrap_or(true),
2340        CompiledFilter::Gt(field, value) => get_doc_value_at_path(doc, field)
2341            .map(|v| value_compare(v, value, variables) == Some(std::cmp::Ordering::Greater))
2342            .unwrap_or(false),
2343        CompiledFilter::Gte(field, value) => get_doc_value_at_path(doc, field)
2344            .map(|v| {
2345                matches!(
2346                    value_compare(v, value, variables),
2347                    Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
2348                )
2349            })
2350            .unwrap_or(false),
2351        CompiledFilter::Lt(field, value) => get_doc_value_at_path(doc, field)
2352            .map(|v| value_compare(v, value, variables) == Some(std::cmp::Ordering::Less))
2353            .unwrap_or(false),
2354        CompiledFilter::Lte(field, value) => get_doc_value_at_path(doc, field)
2355            .map(|v| {
2356                matches!(
2357                    value_compare(v, value, variables),
2358                    Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
2359                )
2360            })
2361            .unwrap_or(false),
2362        CompiledFilter::In(field, value) => {
2363            if let ast::Value::Array(arr) = value {
2364                get_doc_value_at_path(doc, field)
2365                    .map(|v| arr.iter().any(|item| values_equal(v, item, variables)))
2366                    .unwrap_or(false)
2367            } else {
2368                false
2369            }
2370        }
2371        CompiledFilter::NotIn(field, value) => {
2372            if let ast::Value::Array(arr) = value {
2373                get_doc_value_at_path(doc, field)
2374                    .map(|v| !arr.iter().any(|item| values_equal(v, item, variables)))
2375                    .unwrap_or(true)
2376            } else {
2377                true
2378            }
2379        }
2380        CompiledFilter::Contains(field, value) => {
2381            match (get_doc_value_at_path(doc, field), value) {
2382                (Some(Value::String(doc_val)), ast::Value::String(search)) => {
2383                    doc_val.contains(search)
2384                }
2385                (Some(Value::Array(doc_arr)), ast::Value::String(search)) => {
2386                    doc_arr.iter().any(|v| {
2387                        if let Value::String(s) = v {
2388                            s == search
2389                        } else {
2390                            false
2391                        }
2392                    })
2393                }
2394                _ => false,
2395            }
2396        }
2397        CompiledFilter::StartsWith(field, value) => {
2398            if let (Some(Value::String(doc_val)), ast::Value::String(prefix)) =
2399                (get_doc_value_at_path(doc, field), value)
2400            {
2401                doc_val.starts_with(prefix)
2402            } else {
2403                false
2404            }
2405        }
2406        CompiledFilter::EndsWith(field, value) => {
2407            if let (Some(Value::String(doc_val)), ast::Value::String(suffix)) =
2408                (get_doc_value_at_path(doc, field), value)
2409            {
2410                doc_val.ends_with(suffix)
2411            } else {
2412                false
2413            }
2414        }
2415        CompiledFilter::Matches(field, regex) => {
2416            if let Some(Value::String(doc_val)) = get_doc_value_at_path(doc, field) {
2417                regex.is_match(doc_val)
2418            } else {
2419                false
2420            }
2421        }
2422        CompiledFilter::IsNull(field) => get_doc_value_at_path(doc, field)
2423            .map(|v| matches!(v, Value::Null))
2424            .unwrap_or(true),
2425        CompiledFilter::IsNotNull(field) => get_doc_value_at_path(doc, field)
2426            .map(|v| !matches!(v, Value::Null))
2427            .unwrap_or(false),
2428        CompiledFilter::And(filters) => filters.iter().all(|f| matches_filter(doc, f, variables)),
2429        CompiledFilter::Or(filters) => filters.iter().any(|f| matches_filter(doc, f, variables)),
2430        CompiledFilter::Not(filter) => !matches_filter(doc, filter, variables),
2431    }
2432}
2433
2434/// Check if two values are equal
2435fn values_equal(
2436    db_val: &Value,
2437    aql_val: &ast::Value,
2438    variables: &HashMap<String, ast::Value>,
2439) -> bool {
2440    let resolved = resolve_if_variable(aql_val, variables);
2441    match (db_val, resolved) {
2442        (Value::Null, ast::Value::Null) => true,
2443        (Value::Bool(a), ast::Value::Boolean(b)) => *a == *b,
2444        (Value::Int(a), ast::Value::Int(b)) => *a == *b,
2445        (Value::Float(a), ast::Value::Float(b)) => (*a - *b).abs() < f64::EPSILON,
2446        (Value::Float(a), ast::Value::Int(b)) => (*a - (*b as f64)).abs() < f64::EPSILON,
2447        (Value::Int(a), ast::Value::Float(b)) => ((*a as f64) - *b).abs() < f64::EPSILON,
2448        (Value::String(a), ast::Value::String(b)) => a == b,
2449        _ => false,
2450    }
2451}
2452
2453/// Check if two values are equal using strict bitwise comparison (no type coercion, strict float bits)
2454fn values_bit_equal(
2455    db_val: &Value,
2456    aql_val: &ast::Value,
2457    variables: &HashMap<String, ast::Value>,
2458) -> bool {
2459    let resolved = resolve_if_variable(aql_val, variables);
2460    match (db_val, resolved) {
2461        (Value::Null, ast::Value::Null) => true,
2462        (Value::Bool(a), ast::Value::Boolean(b)) => *a == *b,
2463        (Value::Int(a), ast::Value::Int(b)) => *a == *b,
2464        (Value::Float(a), ast::Value::Float(b)) => a.to_bits() == b.to_bits(),
2465        (Value::String(a), ast::Value::String(b)) => a == b,
2466        // No mixed type int/float comparison for bit equality
2467        _ => false,
2468    }
2469}
2470
2471/// Compare two values
2472fn value_compare(
2473    db_val: &Value,
2474    aql_val: &ast::Value,
2475    variables: &HashMap<String, ast::Value>,
2476) -> Option<std::cmp::Ordering> {
2477    let resolved = resolve_if_variable(aql_val, variables);
2478    match (db_val, resolved) {
2479        (Value::Int(a), ast::Value::Int(b)) => Some(a.cmp(b)),
2480        (Value::Float(a), ast::Value::Float(b)) => a.partial_cmp(b),
2481        (Value::Float(a), ast::Value::Int(b)) => a.partial_cmp(&(*b as f64)),
2482        (Value::Int(a), ast::Value::Float(b)) => (*a as f64).partial_cmp(b),
2483        (Value::String(a), ast::Value::String(b)) => Some(a.cmp(b)),
2484        _ => None,
2485    }
2486}
2487
2488/// Resolve a variable reference
2489fn resolve_if_variable<'a>(
2490    val: &'a ast::Value,
2491    variables: &'a HashMap<String, ast::Value>,
2492) -> &'a ast::Value {
2493    if let ast::Value::Variable(name) = val {
2494        variables.get(name).unwrap_or(val)
2495    } else {
2496        val
2497    }
2498}
2499
2500/// Apply projection to a document (keep only selected fields)
2501pub fn apply_projection(mut doc: Document, fields: &[ast::Field]) -> Document {
2502    if fields.is_empty() {
2503        return doc;
2504    }
2505
2506    let mut projected_data = HashMap::new();
2507
2508    // Always include id
2509    if let Some(id_val) = doc.data.get("id") {
2510        projected_data.insert("id".to_string(), id_val.clone());
2511    }
2512
2513    for field in fields {
2514        let field_name = field.alias.as_ref().unwrap_or(&field.name);
2515        let source_name = &field.name;
2516
2517        if let Some(value) = doc.data.get(source_name) {
2518            projected_data.insert(field_name.clone(), value.clone());
2519        }
2520    }
2521
2522    doc.data = projected_data;
2523    doc
2524}
2525
2526/// Convert AQL Value to DB Value
2527pub fn aql_value_to_db_value(val: &ast::Value) -> Result<Value> {
2528    match val {
2529        ast::Value::Null => Ok(Value::Null),
2530        ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
2531        ast::Value::Int(i) => Ok(Value::Int(*i)),
2532        ast::Value::Float(f) => Ok(Value::Float(*f)),
2533        ast::Value::String(s) => Ok(Value::String(s.clone())),
2534        ast::Value::Array(arr) => {
2535            let converted: Result<Vec<Value>> = arr.iter().map(aql_value_to_db_value).collect();
2536            Ok(Value::Array(converted?))
2537        }
2538        ast::Value::Object(map) => {
2539            let mut converted = HashMap::new();
2540            for (k, v) in map {
2541                converted.insert(k.clone(), aql_value_to_db_value(v)?);
2542            }
2543            Ok(Value::Object(converted))
2544        }
2545        ast::Value::Variable(name) => Err(AqlError::new(
2546            ErrorCode::QueryError,
2547            format!("Unresolved variable: {}", name),
2548        )),
2549        ast::Value::Enum(e) => Ok(Value::String(e.clone())),
2550    }
2551}
2552
2553/// Convert AQL Value to HashMap (for insert/update data)
2554fn aql_value_to_hashmap(val: &ast::Value) -> Result<HashMap<String, Value>> {
2555    match val {
2556        ast::Value::Object(map) => {
2557            let mut converted = HashMap::new();
2558            for (k, v) in map {
2559                converted.insert(k.clone(), aql_value_to_db_value(v)?);
2560            }
2561            Ok(converted)
2562        }
2563        _ => Err(AqlError::new(
2564            ErrorCode::QueryError,
2565            "Data must be an object".to_string(),
2566        )),
2567    }
2568}
2569
2570/// Convert DB Value to AQL Value
2571pub fn db_value_to_aql_value(val: &Value) -> ast::Value {
2572    match val {
2573        Value::Null => ast::Value::Null,
2574        Value::Bool(b) => ast::Value::Boolean(*b),
2575        Value::Int(i) => ast::Value::Int(*i),
2576        Value::Float(f) => ast::Value::Float(*f),
2577        Value::String(s) => ast::Value::String(s.clone()),
2578        Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_aql_value).collect()),
2579        Value::Object(map) => ast::Value::Object(
2580            map.iter()
2581                .map(|(k, v)| (k.clone(), db_value_to_aql_value(v)))
2582                .collect(),
2583        ),
2584        Value::Uuid(u) => ast::Value::String(u.to_string()),
2585    }
2586}
2587
2588/// Convert filter from AST Value
2589pub fn value_to_filter(value: &ast::Value) -> Result<AqlFilter> {
2590    match value {
2591        ast::Value::Object(map) => {
2592            let mut filters = Vec::new();
2593            for (key, val) in map {
2594                match key.as_str() {
2595                    "and" => {
2596                        if let ast::Value::Array(arr) = val {
2597                            let sub: Result<Vec<_>> = arr.iter().map(value_to_filter).collect();
2598                            filters.push(AqlFilter::And(sub?));
2599                        }
2600                    }
2601                    "or" => {
2602                        if let ast::Value::Array(arr) = val {
2603                            let sub: Result<Vec<_>> = arr.iter().map(value_to_filter).collect();
2604                            filters.push(AqlFilter::Or(sub?));
2605                        }
2606                    }
2607                    "not" => filters.push(AqlFilter::Not(Box::new(value_to_filter(val)?))),
2608                    field => {
2609                        if let ast::Value::Object(ops) = val {
2610                            for (op, op_val) in ops {
2611                                let f = match op.as_str() {
2612                                    "eq" => AqlFilter::Eq(field.to_string(), op_val.clone()),
2613                                    "ne" => AqlFilter::Ne(field.to_string(), op_val.clone()),
2614                                    "gt" => AqlFilter::Gt(field.to_string(), op_val.clone()),
2615                                    "gte" => AqlFilter::Gte(field.to_string(), op_val.clone()),
2616                                    "lt" => AqlFilter::Lt(field.to_string(), op_val.clone()),
2617                                    "lte" => AqlFilter::Lte(field.to_string(), op_val.clone()),
2618                                    "in" => AqlFilter::In(field.to_string(), op_val.clone()),
2619                                    "nin" => AqlFilter::NotIn(field.to_string(), op_val.clone()),
2620                                    "contains" => {
2621                                        AqlFilter::Contains(field.to_string(), op_val.clone())
2622                                    }
2623                                    "startsWith" => {
2624                                        AqlFilter::StartsWith(field.to_string(), op_val.clone())
2625                                    }
2626                                    "endsWith" => {
2627                                        AqlFilter::EndsWith(field.to_string(), op_val.clone())
2628                                    }
2629                                    "isNull" => AqlFilter::IsNull(field.to_string()),
2630                                    "isNotNull" => AqlFilter::IsNotNull(field.to_string()),
2631                                    _ => continue,
2632                                };
2633                                filters.push(f);
2634                            }
2635                        }
2636                    }
2637                }
2638            }
2639            if filters.len() == 1 {
2640                Ok(filters.remove(0))
2641            } else {
2642                Ok(AqlFilter::And(filters))
2643            }
2644        }
2645        _ => Err(AqlError::new(
2646            ErrorCode::QueryError,
2647            "Filter must be an object".to_string(),
2648        )),
2649    }
2650}
2651
2652/// Check if a document matches an AST filter
2653fn check_ast_filter_match(filter: &ast::Filter, doc: &Document) -> bool {
2654    match filter {
2655        ast::Filter::Eq(field, val) => check_cmp(doc, field, val, |a, b| a == b),
2656        ast::Filter::Ne(field, val) => check_cmp(doc, field, val, |a, b| a != b),
2657        ast::Filter::Gt(field, val) => check_cmp(doc, field, val, |a, b| a > b),
2658        ast::Filter::Gte(field, val) => check_cmp(doc, field, val, |a, b| a >= b),
2659        ast::Filter::Lt(field, val) => check_cmp(doc, field, val, |a, b| a < b),
2660        ast::Filter::Lte(field, val) => check_cmp(doc, field, val, |a, b| a <= b),
2661        ast::Filter::In(field, val) => {
2662            if let Ok(db_val) = aql_value_to_db_value(val) {
2663                if let Some(doc_val) = doc.data.get(field) {
2664                    if let Value::Array(arr) = db_val {
2665                        return arr.contains(doc_val);
2666                    }
2667                }
2668            }
2669            false
2670        }
2671        ast::Filter::And(filters) => filters.iter().all(|f| check_ast_filter_match(f, doc)),
2672        ast::Filter::Or(filters) => filters.iter().any(|f| check_ast_filter_match(f, doc)),
2673        ast::Filter::Not(filter) => !check_ast_filter_match(filter, doc),
2674        _ => true, // Ignore other filters for now
2675    }
2676}
2677
2678fn check_cmp<F>(doc: &Document, field: &str, val: &ast::Value, op: F) -> bool
2679where
2680    F: Fn(&Value, &Value) -> bool,
2681{
2682    if let Some(doc_val) = doc.data.get(field) {
2683        if let Ok(cmp_val) = aql_value_to_db_value(val) {
2684            return op(doc_val, &cmp_val);
2685        }
2686    }
2687    false
2688}
2689
2690// Dynamic Resolution Helpers
2691
2692/// Recursively resolve values, replacing strings starting with $ with context values
2693fn resolve_value(
2694    val: &ast::Value,
2695    variables: &HashMap<String, ast::Value>,
2696    context: &ExecutionContext,
2697) -> ast::Value {
2698    match val {
2699        ast::Value::Variable(name) => {
2700            if let Some(v) = variables.get(name) {
2701                v.clone()
2702            } else {
2703                // Variable not found - log warning and return Null
2704                // This allows optional variables but provides visibility for debugging
2705                #[cfg(debug_assertions)]
2706                eprintln!(
2707                    "Warning: Variable '{}' not found in resolve_value, defaulting to Null",
2708                    name
2709                );
2710
2711                // TODO: Consider making this an error in strict mode
2712                // For now, returning Null maintains backward compatibility
2713                ast::Value::Null
2714            }
2715        }
2716        ast::Value::String(s) if s.starts_with('$') => {
2717            // Context resolution (e.g. results from previous ops)
2718            match resolve_variable_path(s, context) {
2719                Some(v) => v,
2720                None => val.clone(),
2721            }
2722        }
2723        ast::Value::Array(arr) => ast::Value::Array(
2724            arr.iter()
2725                .map(|v| resolve_value(v, variables, context))
2726                .collect(),
2727        ),
2728        ast::Value::Object(map) => {
2729            let mut resolved_map = HashMap::new();
2730            for (k, v) in map {
2731                resolved_map.insert(k.clone(), resolve_value(v, variables, context));
2732            }
2733            ast::Value::Object(resolved_map)
2734        }
2735        _ => val.clone(),
2736    }
2737}
2738
2739/// Resolve a variable path like "$alias.field.subfield"
2740fn resolve_variable_path(path: &str, context: &ExecutionContext) -> Option<ast::Value> {
2741    let path = path.trim_start_matches('$');
2742    let parts: Vec<&str> = path.split('.').collect();
2743
2744    if parts.is_empty() {
2745        return None;
2746    }
2747
2748    // First part is the alias
2749    let alias = parts[0];
2750    let mut current_value = context.get(alias)?;
2751
2752    // Traverse remaining parts
2753    for part in &parts[1..] {
2754        match current_value {
2755            serde_json::Value::Object(map) => {
2756                current_value = map.get(*part)?;
2757            }
2758            serde_json::Value::Array(arr) => {
2759                // Support array indexing? e.g. "users.0.id"
2760                if let Ok(idx) = part.parse::<usize>() {
2761                    current_value = arr.get(idx)?;
2762                } else {
2763                    return None;
2764                }
2765            }
2766            _ => return None,
2767        }
2768    }
2769
2770    // Convert serde_json::Value back to ast::Value
2771    Some(json_to_ast_value(current_value))
2772}
2773
2774fn json_to_ast_value(json: &serde_json::Value) -> ast::Value {
2775    match json {
2776        serde_json::Value::Null => ast::Value::Null,
2777        serde_json::Value::Bool(b) => ast::Value::Boolean(*b),
2778        serde_json::Value::Number(n) => {
2779            if let Some(i) = n.as_i64() {
2780                ast::Value::Int(i)
2781            } else if let Some(f) = n.as_f64() {
2782                ast::Value::Float(f)
2783            } else {
2784                ast::Value::Null // Should handle u64 appropriately if needed
2785            }
2786        }
2787        serde_json::Value::String(s) => ast::Value::String(s.clone()),
2788        serde_json::Value::Array(arr) => {
2789            ast::Value::Array(arr.iter().map(json_to_ast_value).collect())
2790        }
2791        serde_json::Value::Object(map) => {
2792            let mut new_map = HashMap::new();
2793            for (k, v) in map {
2794                new_map.insert(k.clone(), json_to_ast_value(v));
2795            }
2796            ast::Value::Object(new_map)
2797        }
2798    }
2799}
2800
2801fn aurora_value_to_json_value(v: &Value) -> JsonValue {
2802    match v {
2803        Value::Null => JsonValue::Null,
2804        Value::String(s) => JsonValue::String(s.clone()),
2805        Value::Int(i) => JsonValue::Number((*i).into()),
2806        Value::Float(f) => {
2807            if let Some(n) = serde_json::Number::from_f64(*f) {
2808                JsonValue::Number(n)
2809            } else {
2810                JsonValue::Null
2811            }
2812        }
2813        Value::Bool(b) => JsonValue::Bool(*b),
2814        Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
2815        Value::Object(map) => {
2816            let mut json_map = serde_json::Map::new();
2817            for (k, v) in map {
2818                json_map.insert(k.clone(), aurora_value_to_json_value(v));
2819            }
2820            JsonValue::Object(json_map)
2821        }
2822        Value::Uuid(u) => JsonValue::String(u.to_string()),
2823    }
2824}
2825
2826#[cfg(test)]
2827mod tests {
2828    use super::*;
2829
2830    #[test]
2831    fn test_aql_value_conversion() {
2832        let aql_val = ast::Value::Object({
2833            let mut map = HashMap::new();
2834            map.insert("name".to_string(), ast::Value::String("John".to_string()));
2835            map.insert("age".to_string(), ast::Value::Int(30));
2836            map
2837        });
2838
2839        let db_val = aql_value_to_db_value(&aql_val).unwrap();
2840        if let Value::Object(map) = db_val {
2841            assert_eq!(map.get("name"), Some(&Value::String("John".to_string())));
2842            assert_eq!(map.get("age"), Some(&Value::Int(30)));
2843        } else {
2844            panic!("Expected Object");
2845        }
2846    }
2847
2848    #[test]
2849    fn test_matches_filter_eq() {
2850        let mut doc = Document::new();
2851        doc.data
2852            .insert("name".to_string(), Value::String("Alice".to_string()));
2853        doc.data.insert("age".to_string(), Value::Int(25));
2854
2855        let filter = AqlFilter::Eq("name".to_string(), ast::Value::String("Alice".to_string()));
2856        let compiled = compile_filter(&filter).unwrap();
2857        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2858
2859        let filter = AqlFilter::Eq("name".to_string(), ast::Value::String("Bob".to_string()));
2860        let compiled = compile_filter(&filter).unwrap();
2861        assert!(!matches_filter(&doc, &compiled, &HashMap::new()));
2862    }
2863
2864    #[test]
2865    fn test_matches_filter_comparison() {
2866        let mut doc = Document::new();
2867        doc.data.insert("age".to_string(), Value::Int(25));
2868
2869        let filter = AqlFilter::Gt("age".to_string(), ast::Value::Int(20));
2870        let compiled = compile_filter(&filter).unwrap();
2871        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2872
2873        let filter = AqlFilter::Gt("age".to_string(), ast::Value::Int(30));
2874        let compiled = compile_filter(&filter).unwrap();
2875        assert!(!matches_filter(&doc, &compiled, &HashMap::new()));
2876
2877        let filter = AqlFilter::Gte("age".to_string(), ast::Value::Int(25));
2878        let compiled = compile_filter(&filter).unwrap();
2879        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2880
2881        let filter = AqlFilter::Lt("age".to_string(), ast::Value::Int(30));
2882        let compiled = compile_filter(&filter).unwrap();
2883        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2884    }
2885
2886    #[test]
2887    fn test_matches_filter_and_or() {
2888        let mut doc = Document::new();
2889        doc.data
2890            .insert("name".to_string(), Value::String("Alice".to_string()));
2891        doc.data.insert("age".to_string(), Value::Int(25));
2892
2893        let filter = AqlFilter::And(vec![
2894            AqlFilter::Eq("name".to_string(), ast::Value::String("Alice".to_string())),
2895            AqlFilter::Gte("age".to_string(), ast::Value::Int(18)),
2896        ]);
2897        let compiled = compile_filter(&filter).unwrap();
2898        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2899
2900        let filter = AqlFilter::Or(vec![
2901            AqlFilter::Eq("name".to_string(), ast::Value::String("Bob".to_string())),
2902            AqlFilter::Gte("age".to_string(), ast::Value::Int(18)),
2903        ]);
2904        let compiled = compile_filter(&filter).unwrap();
2905        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2906    }
2907
2908    #[test]
2909    fn test_matches_filter_string_ops() {
2910        let mut doc = Document::new();
2911        doc.data.insert(
2912            "email".to_string(),
2913            Value::String("alice@example.com".to_string()),
2914        );
2915
2916        let filter = AqlFilter::Contains(
2917            "email".to_string(),
2918            ast::Value::String("example".to_string()),
2919        );
2920        let compiled = compile_filter(&filter).unwrap();
2921        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2922
2923        let filter =
2924            AqlFilter::StartsWith("email".to_string(), ast::Value::String("alice".to_string()));
2925        let compiled = compile_filter(&filter).unwrap();
2926        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2927
2928        let filter =
2929            AqlFilter::EndsWith("email".to_string(), ast::Value::String(".com".to_string()));
2930        let compiled = compile_filter(&filter).unwrap();
2931        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2932    }
2933
2934    #[test]
2935    fn test_matches_filter_in() {
2936        let mut doc = Document::new();
2937        doc.data
2938            .insert("status".to_string(), Value::String("active".to_string()));
2939
2940        let filter = AqlFilter::In(
2941            "status".to_string(),
2942            ast::Value::Array(vec![
2943                ast::Value::String("active".to_string()),
2944                ast::Value::String("pending".to_string()),
2945            ]),
2946        );
2947        let compiled = compile_filter(&filter).unwrap();
2948        assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2949
2950        let filter = AqlFilter::In(
2951            "status".to_string(),
2952            ast::Value::Array(vec![ast::Value::String("inactive".to_string())]),
2953        );
2954        let compiled = compile_filter(&filter).unwrap();
2955        assert!(!matches_filter(&doc, &compiled, &HashMap::new()));
2956    }
2957
2958    #[test]
2959    fn test_apply_projection() {
2960        let mut doc = Document::new();
2961        doc.data
2962            .insert("id".to_string(), Value::String("123".to_string()));
2963        doc.data
2964            .insert("name".to_string(), Value::String("Alice".to_string()));
2965        doc.data.insert(
2966            "email".to_string(),
2967            Value::String("alice@example.com".to_string()),
2968        );
2969        doc.data
2970            .insert("password".to_string(), Value::String("secret".to_string()));
2971
2972        let fields = vec![
2973            ast::Field {
2974                alias: None,
2975                name: "id".to_string(),
2976                arguments: vec![],
2977                directives: vec![],
2978                selection_set: vec![],
2979            },
2980            ast::Field {
2981                alias: None,
2982                name: "name".to_string(),
2983                arguments: vec![],
2984                directives: vec![],
2985                selection_set: vec![],
2986            },
2987        ];
2988
2989        let projected = apply_projection(doc, &fields);
2990        assert_eq!(projected.data.len(), 2);
2991        assert!(projected.data.contains_key("id"));
2992        assert!(projected.data.contains_key("name"));
2993        assert!(!projected.data.contains_key("email"));
2994        assert!(!projected.data.contains_key("password"));
2995    }
2996
2997    #[test]
2998    fn test_apply_projection_with_alias() {
2999        let mut doc = Document::new();
3000        doc.data
3001            .insert("first_name".to_string(), Value::String("Alice".to_string()));
3002
3003        let fields = vec![ast::Field {
3004            alias: Some("name".to_string()),
3005            name: "first_name".to_string(),
3006            arguments: vec![],
3007            directives: vec![],
3008            selection_set: vec![],
3009        }];
3010
3011        let projected = apply_projection(doc, &fields);
3012        assert!(projected.data.contains_key("name"));
3013        assert!(!projected.data.contains_key("first_name"));
3014    }
3015
3016    #[test]
3017    fn test_extract_pagination() {
3018        let args = vec![
3019            ast::Argument {
3020                name: "limit".to_string(),
3021                value: ast::Value::Int(10),
3022            },
3023            ast::Argument {
3024                name: "offset".to_string(),
3025                value: ast::Value::Int(20),
3026            },
3027        ];
3028
3029        let (limit, offset) = extract_pagination(&args);
3030        assert_eq!(limit, Some(10));
3031        assert_eq!(offset, 20);
3032    }
3033
3034    #[test]
3035    fn test_matches_filter_with_variables() {
3036        let mut doc = Document::new();
3037        doc.data.insert("age".to_string(), Value::Int(25));
3038
3039        let mut variables = HashMap::new();
3040        variables.insert("minAge".to_string(), ast::Value::Int(18));
3041
3042        let filter = AqlFilter::Gte(
3043            "age".to_string(),
3044            ast::Value::Variable("minAge".to_string()),
3045        );
3046        let compiled = compile_filter(&filter).unwrap();
3047        assert!(matches_filter(&doc, &compiled, &variables));
3048    }
3049
3050    #[test]
3051    fn test_values_bit_equal() {
3052        let vars = HashMap::new();
3053
3054        // 1. Floats
3055        let f1 = Value::Float(1.0);
3056        let f1_copy = ast::Value::Float(1.0);
3057        assert!(values_bit_equal(&f1, &f1_copy, &vars));
3058
3059        // 0.0 vs -0.0 (Should be DIFFERENT in bitwise)
3060        let f_zero = Value::Float(0.0);
3061        let f_neg_zero = ast::Value::Float(-0.0);
3062        assert!(!values_bit_equal(&f_zero, &f_neg_zero, &vars));
3063
3064        // NaN vs NaN (Should be SAME in bitwise if same NaN struct, typically distinct NaNs differ)
3065        // Standard == says NaN != NaN. Bitwise can say they are equal if bits match.
3066        let nan = f64::NAN;
3067        let v_nan = Value::Float(nan);
3068        let a_nan = ast::Value::Float(nan);
3069        // We expect bit equality to hold for the exact same NaN
3070        assert!(values_bit_equal(&v_nan, &a_nan, &vars));
3071
3072        // 2. Int vs Float (No coercion)
3073        let i1 = Value::Int(1);
3074        let f1_as_val = ast::Value::Float(1.0);
3075        assert!(!values_bit_equal(&i1, &f1_as_val, &vars));
3076    }
3077
3078    #[test]
3079    fn test_ne_filter_strictness() {
3080        let mut doc = Document::new();
3081        doc.data.insert("val".to_string(), Value::Float(0.0));
3082        doc.data.insert("intVal".to_string(), Value::Int(1));
3083
3084        let vars = HashMap::new();
3085
3086        // Ne 0.0 should be FALSE for 0.0
3087        let filter = AqlFilter::Ne("val".to_string(), ast::Value::Float(0.0));
3088        let compiled = compile_filter(&filter).unwrap();
3089        assert!(!matches_filter(&doc, &compiled, &vars));
3090
3091        // Ne -0.0 should be TRUE for 0.0 (bits differ)
3092        let filter = AqlFilter::Ne("val".to_string(), ast::Value::Float(-0.0));
3093        let compiled = compile_filter(&filter).unwrap();
3094        assert!(matches_filter(&doc, &compiled, &vars));
3095
3096        // Ne 1.0 (float) should be TRUE for 1 (int) because types differ (no coercion)
3097        let filter = AqlFilter::Ne("intVal".to_string(), ast::Value::Float(1.0));
3098        let compiled = compile_filter(&filter).unwrap();
3099        assert!(matches_filter(&doc, &compiled, &vars));
3100    }
3101
3102    #[tokio::test]
3103    async fn test_executor_integration() {
3104        use crate::Aurora;
3105        use tempfile::TempDir;
3106
3107        // Setup - use synchronous config to ensure writes are visible immediately
3108        let temp_dir = TempDir::new().unwrap();
3109        let db_path = temp_dir.path().join("test.db");
3110        let config = crate::AuroraConfig {
3111            db_path,
3112            enable_write_buffering: false,
3113            durability_mode: crate::DurabilityMode::Synchronous,
3114            ..Default::default()
3115        };
3116        let db = Aurora::with_config(config).unwrap();
3117
3118        // Create collection schema first
3119        db.new_collection(
3120            "users",
3121            vec![
3122                ("name".to_string(), crate::FieldType::String, false),
3123                ("age".to_string(), crate::FieldType::Int, false),
3124                ("active".to_string(), crate::FieldType::Bool, false),
3125            ],
3126        )
3127        .await
3128        .unwrap();
3129
3130        // 1. Test Mutation: Insert
3131        let insert_query = r#"
3132            mutation {
3133                insertInto(collection: "users", data: {
3134                    name: "Alice",
3135                    age: 30,
3136                    active: true
3137                }) {
3138                    id
3139                    name
3140                }
3141            }
3142        "#;
3143
3144        let result = execute(&db, insert_query, ExecutionOptions::new())
3145            .await
3146            .unwrap();
3147        match result {
3148            ExecutionResult::Mutation(res) => {
3149                assert_eq!(res.affected_count, 1);
3150                assert_eq!(res.returned_documents.len(), 1);
3151                assert_eq!(
3152                    res.returned_documents[0].data.get("name"),
3153                    Some(&Value::String("Alice".to_string()))
3154                );
3155            }
3156            _ => panic!("Expected mutation result"),
3157        }
3158
3159        // 2. Test Query: Get with filter
3160        let query = r#"
3161            query {
3162                users {
3163                    name
3164                    age
3165                }
3166            }
3167        "#;
3168
3169        let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
3170        match result {
3171            ExecutionResult::Query(res) => {
3172                assert_eq!(res.documents.len(), 1);
3173                assert_eq!(
3174                    res.documents[0].data.get("name"),
3175                    Some(&Value::String("Alice".to_string()))
3176                );
3177                assert_eq!(res.documents[0].data.get("age"), Some(&Value::Int(30)));
3178            }
3179            _ => panic!("Expected query result"),
3180        }
3181
3182        // 3. Test Mutation: Delete
3183        let delete_query = r#"
3184            mutation {
3185                deleteFrom(collection: "users", filter: { name: { eq: "Alice" } }) {
3186                    id
3187                }
3188            }
3189        "#;
3190
3191        let result = execute(&db, delete_query, ExecutionOptions::new())
3192            .await
3193            .unwrap();
3194        match result {
3195            ExecutionResult::Mutation(res) => {
3196                assert_eq!(res.affected_count, 1);
3197            }
3198            _ => panic!("Expected mutation result"),
3199        }
3200
3201        // 4. Verify Delete
3202        let query = r#"
3203            query {
3204                users {
3205                    name
3206                }
3207            }
3208        "#;
3209
3210        let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
3211        match result {
3212            ExecutionResult::Query(res) => {
3213                assert_eq!(res.documents.len(), 0);
3214            }
3215            _ => panic!("Expected query result"),
3216        }
3217    }
3218
/// Exercises `db_values_equal`, the value comparison that lookup matching relies on.
///
/// This is a plain synchronous test: nothing in it awaits, so no async runtime is
/// started for it.
#[test]
fn test_lookup_cross_collection_join() {
    let text = |s: &str| Value::String(s.to_string());

    // Each case is (left, right, expected equality, description used on failure).
    let cases = vec![
        (text("user1"), text("user1"), true, "same string values"),
        (text("user1"), text("user2"), false, "different string values"),
        (Value::Int(42), Value::Int(42), true, "int comparison"),
        (text("123"), Value::Int(123), true, "cross-type: string can match int"),
        (Value::Null, Value::Null, true, "null comparison"),
        (Value::Bool(true), Value::Bool(true), true, "bool comparison (equal)"),
        (Value::Bool(true), Value::Bool(false), false, "bool comparison (different)"),
    ];

    for (left, right, expected, description) in &cases {
        assert_eq!(
            db_values_equal(left, right),
            *expected,
            "db_values_equal mismatch for case: {}",
            description
        );
    }
}
3251
// Note: The original full integration test for lookup (test_lookup_integration) was removed
// because it depended on schema auto-creation, which has pre-existing issues in the executor.
// test_lookup_cross_collection_join above validates the core db_values_equal matching logic,
// and test_lookup_integration_with_schema below exercises execute_lookup end to end against
// explicitly defined schemas.
3255
/// Covers both `orderBy` argument shapes handled by `extract_order_by` and checks that
/// `apply_ordering` arranges documents by a string field in ascending order.
#[test]
fn test_order_by_extraction_and_sorting() {
    // Wraps a value in the single `orderBy` argument passed to `extract_order_by`.
    let order_by = |value: ast::Value| {
        vec![ast::Argument {
            name: "orderBy".to_string(),
            value,
        }]
    };

    // Shape 1: a bare string names the field; the test expects an ascending direction.
    let string_args = order_by(ast::Value::String("name".to_string()));
    let from_string = extract_order_by(&string_args);
    assert_eq!(from_string.len(), 1);
    assert_eq!(from_string[0].field, "name");
    assert_eq!(from_string[0].direction, ast::SortDirection::Asc);

    // Shape 2: an object carrying explicit `field` and `direction` entries.
    let spec: HashMap<String, ast::Value> = vec![
        ("field".to_string(), ast::Value::String("age".to_string())),
        (
            "direction".to_string(),
            ast::Value::Enum("DESC".to_string()),
        ),
    ]
    .into_iter()
    .collect();
    let object_args = order_by(ast::Value::Object(spec));
    let from_object = extract_order_by(&object_args);
    assert_eq!(from_object.len(), 1);
    assert_eq!(from_object[0].field, "age");
    assert_eq!(from_object[0].direction, ast::SortDirection::Desc);

    // Sorting: three documents inserted out of alphabetical order.
    let named = |id: &str, name: &str| Document {
        id: id.to_string(),
        data: std::iter::once(("name".to_string(), Value::String(name.to_string()))).collect(),
    };
    let mut docs = vec![
        named("1", "Charlie"),
        named("2", "Alice"),
        named("3", "Bob"),
    ];

    let ascending_by_name = vec![ast::Ordering {
        field: "name".to_string(),
        direction: ast::SortDirection::Asc,
    }];
    apply_ordering(&mut docs, &ascending_by_name);

    // Index directly so a missing document still fails the test loudly.
    for (position, expected) in ["Alice", "Bob", "Charlie"].iter().enumerate() {
        assert_eq!(
            docs[position].data.get("name"),
            Some(&Value::String((*expected).to_string()))
        );
    }
}
3331
/// Checks that `validate_document` reports one error per rule for a document in which
/// every validated field is expected to fail its constraint.
#[test]
fn test_validation() {
    let fields = vec![
        ("email", Value::String("invalid".to_string())),
        ("age", Value::Int(15)),
        ("name", Value::String("Ab".to_string())),
    ];
    let doc = Document {
        id: "1".to_string(),
        data: fields
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect(),
    };

    // One single-constraint rule per field.
    let rule = |field: &str, constraint: ast::ValidationConstraint| ast::ValidationRule {
        field: field.to_string(),
        constraints: vec![constraint],
    };
    let rules = vec![
        rule(
            "email",
            ast::ValidationConstraint::Format("email".to_string()),
        ),
        rule("age", ast::ValidationConstraint::Min(18.0)),
        rule("name", ast::ValidationConstraint::MinLength(3)),
    ];

    let errors = validate_document(&doc, &rules).unwrap();
    assert_eq!(errors.len(), 3);

    // Each offending field name must appear in at least one error message.
    for field in &["email", "age", "name"] {
        assert!(errors.iter().any(|e| e.contains(*field)));
    }
}
3366
/// Checks that `execute_downsample` with a "1m" interval produces two buckets for
/// samples at timestamps 0, 30 and 120.
#[test]
fn test_downsample() {
    // Builds a sample document holding a `timestamp` and a `value` entry.
    let sample = |id: &str, timestamp: Value, reading: Value| Document {
        id: id.to_string(),
        data: vec![
            ("timestamp".to_string(), timestamp),
            ("value".to_string(), reading),
        ]
        .into_iter()
        .collect(),
    };

    let docs = vec![
        sample("1", Value::Int(0), Value::Float(10.0)),
        sample("2", Value::Int(30), Value::Float(20.0)),
        sample("3", Value::Int(120), Value::Float(30.0)),
    ];

    // Downsample to 1 minute (60s) buckets using the average of `value`.
    let buckets = execute_downsample(&docs, "1m", "avg", "timestamp", "value").unwrap();

    // Timestamps 0 and 30 are expected to share a bucket; 120 gets its own.
    assert_eq!(buckets.len(), 2);
}
3405
/// Checks the moving average computed by `execute_window_function` with a window of 2.
#[test]
fn test_window_function() {
    let docs: Vec<Document> = vec![
        ("1", Value::Float(10.0)),
        ("2", Value::Float(20.0)),
        ("3", Value::Float(30.0)),
    ]
    .into_iter()
    .map(|(id, reading)| Document {
        id: id.to_string(),
        data: std::iter::once(("value".to_string(), reading)).collect(),
    })
    .collect();

    // Moving average with window size 2
    let windowed = execute_window_function(&docs, "value", "avg", 2).unwrap();
    assert_eq!(windowed.len(), 3);

    // Expected averages: [10] = 10, then [10, 20] = 15, then [20, 30] = 25.
    let expected = vec![Value::Float(10.0), Value::Float(15.0), Value::Float(25.0)];
    for (position, want) in expected.iter().enumerate() {
        assert_eq!(windowed[position].data.get("avg_window"), Some(want));
    }
}
3446
3447    #[tokio::test]
3448    async fn test_lookup_integration_with_schema() {
3449        use crate::Aurora;
3450        use crate::AuroraConfig;
3451        use tempfile::TempDir;
3452
3453        let temp_dir = TempDir::new().unwrap();
3454        let db_path = temp_dir.path().join("lookup_test.db");
3455
3456        let config = AuroraConfig {
3457            db_path,
3458            enable_write_buffering: false,
3459            durability_mode: crate::DurabilityMode::Synchronous,
3460            ..Default::default()
3461        };
3462        let db = Aurora::with_config(config).unwrap();
3463
3464        // 1. Create users schema first
3465        let define_users = r#"
3466            schema {
3467                define collection users if not exists {
3468                    userId: String
3469                    name: String
3470                }
3471            }
3472        "#;
3473        execute(&db, define_users, ExecutionOptions::new())
3474            .await
3475            .unwrap();
3476
3477        // 2. Create orders schema
3478        let define_orders = r#"
3479            schema {
3480                define collection orders if not exists {
3481                    orderId: String
3482                    userId: String
3483                    total: Int
3484                }
3485            }
3486        "#;
3487        execute(&db, define_orders, ExecutionOptions::new())
3488            .await
3489            .unwrap();
3490
3491        // 3. Insert user
3492        let insert_user = r#"
3493            mutation {
3494                insertInto(collection: "users", data: {
3495                    userId: "user1",
3496                    name: "Alice"
3497                }) { id userId name }
3498            }
3499        "#;
3500        let user_result = execute(&db, insert_user, ExecutionOptions::new())
3501            .await
3502            .unwrap();
3503
3504        let user_doc = match user_result {
3505            ExecutionResult::Mutation(res) => {
3506                assert_eq!(res.affected_count, 1);
3507                res.returned_documents[0].clone()
3508            }
3509            _ => panic!("Expected mutation result"),
3510        };
3511
3512        // 4. Insert orders
3513        let insert_order1 = r#"
3514            mutation {
3515                insertInto(collection: "orders", data: {
3516                    orderId: "order1",
3517                    userId: "user1",
3518                    total: 100
3519                }) { id }
3520            }
3521        "#;
3522        execute(&db, insert_order1, ExecutionOptions::new())
3523            .await
3524            .unwrap();
3525
3526        let insert_order2 = r#"
3527            mutation {
3528                insertInto(collection: "orders", data: {
3529                    orderId: "order2",
3530                    userId: "user1",
3531                    total: 250
3532                }) { id }
3533            }
3534        "#;
3535        execute(&db, insert_order2, ExecutionOptions::new())
3536            .await
3537            .unwrap();
3538
3539        // 5. Verify orders exist via query
3540        let query = r#"query { orders { orderId userId total } }"#;
3541        let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
3542        match result {
3543            ExecutionResult::Query(res) => {
3544                assert_eq!(res.documents.len(), 2, "Should have 2 orders");
3545            }
3546            _ => panic!("Expected query result"),
3547        }
3548
3549        // 6. Test lookup function
3550        let lookup = ast::LookupSelection {
3551            collection: "orders".to_string(),
3552            local_field: "userId".to_string(),
3553            foreign_field: "userId".to_string(),
3554            filter: None,
3555            selection_set: vec![],
3556        };
3557
3558        let lookup_result =
3559            execute_lookup(&db, &user_doc, &lookup, &HashMap::new(), &HashMap::new())
3560                .await
3561                .unwrap();
3562        if let Value::Array(found_orders) = lookup_result {
3563            assert_eq!(found_orders.len(), 2, "Should find 2 orders for user1");
3564        } else {
3565            panic!("Expected array result from lookup");
3566        }
3567    }
3568
3569    #[tokio::test]
3570    async fn test_sdl_integration() {
3571        use crate::Aurora;
3572        use crate::AuroraConfig;
3573        use tempfile::TempDir;
3574
3575        // Setup
3576        let temp_dir = TempDir::new().unwrap();
3577        let db_path = temp_dir.path().join("test_sdl.db");
3578
3579        let config = AuroraConfig {
3580            db_path,
3581            enable_write_buffering: false,
3582            durability_mode: crate::DurabilityMode::Synchronous,
3583            ..Default::default()
3584        };
3585        let db = Aurora::with_config(config).unwrap();
3586
3587        // 1. Define Collection Schema
3588        let define_schema = r#"
3589            schema {
3590                define collection products if not exists {
3591                    name: String @unique
3592                    price: Float @indexed
3593                    category: String
3594                }
3595            }
3596        "#;
3597
3598        let result = execute(&db, define_schema, ExecutionOptions::new())
3599            .await
3600            .unwrap();
3601        match result {
3602            ExecutionResult::Schema(res) => {
3603                assert_eq!(res.status, "created");
3604                assert_eq!(res.collection, "products");
3605            }
3606            _ => panic!("Expected schema result"),
3607        }
3608
3609        // Verify schema is persisted (indirectly via duplicate creation attempt)
3610        let result = execute(&db, define_schema, ExecutionOptions::new())
3611            .await
3612            .unwrap();
3613        match result {
3614            ExecutionResult::Schema(res) => {
3615                // With IF NOT EXISTS, the second attempt should return "skipped (exists)"
3616                // but both "created" (race) and "skipped (exists)" are acceptable
3617                assert!(
3618                    res.status == "skipped (exists)" || res.status == "created",
3619                    "Unexpected status: {}",
3620                    res.status
3621                );
3622            }
3623            _ => panic!("Expected schema result for duplicate"),
3624        }
3625
3626        // 2. Alter Collection
3627        let alter_schema = r#"
3628            schema {
3629                alter collection products {
3630                    add stock: Int @indexed
3631                }
3632            }
3633        "#;
3634
3635        let result = execute(&db, alter_schema, ExecutionOptions::new())
3636            .await
3637            .unwrap();
3638        match result {
3639            ExecutionResult::Schema(res) => {
3640                assert_eq!(res.status, "modified");
3641            }
3642            _ => panic!("Expected schema result for alter"),
3643        }
3644
3645        // 2b. Rename Field
3646        let rename_schema = r#"
3647            schema {
3648                alter collection products {
3649                    rename category to cat
3650                }
3651            }
3652        "#;
3653        let result = execute(&db, rename_schema, ExecutionOptions::new())
3654            .await
3655            .unwrap();
3656        match result {
3657            ExecutionResult::Schema(res) => {
3658                assert_eq!(res.status, "modified");
3659            }
3660            _ => panic!("Expected schema result for rename"),
3661        }
3662
3663        // 2c. Modify Field
3664        let modify_schema = r#"
3665            schema {
3666                alter collection products {
3667                    modify price: Float
3668                }
3669            }
3670        "#;
3671        let result = execute(&db, modify_schema, ExecutionOptions::new())
3672            .await
3673            .unwrap();
3674        match result {
3675            ExecutionResult::Schema(res) => {
3676                assert_eq!(res.status, "modified");
3677            }
3678            _ => panic!("Expected schema result for modify"),
3679        }
3680
3681        // 3. Migration
3682        let migration = r#"
3683            migrate {
3684                 "v1": {
3685                     alter collection products {
3686                         add description: String
3687                     }
3688                 }
3689            }
3690        "#;
3691
3692        let result = execute(&db, migration, ExecutionOptions::new())
3693            .await
3694            .unwrap();
3695        match result {
3696            ExecutionResult::Migration(res) => {
3697                assert_eq!(res.steps_applied, 1);
3698            }
3699            _ => panic!("Expected migration result"),
3700        }
3701
3702        // Migration idempotency: running the same migration again should skip applied versions
3703        let result = execute(&db, migration, ExecutionOptions::new())
3704            .await
3705            .unwrap();
3706        match result {
3707            ExecutionResult::Migration(res) => {
3708                // Second run should skip the already-applied migration
3709                assert_eq!(
3710                    res.steps_applied, 0,
3711                    "Migration should be idempotent - version already applied"
3712                );
3713            }
3714            _ => panic!("Expected Migration result for idempotency check"),
3715        }
3716
3717        // 4. Drop Collection
3718        let drop_schema = r#"
3719            schema {
3720                drop collection products
3721            }
3722        "#;
3723
3724        let result = execute(&db, drop_schema, ExecutionOptions::new())
3725            .await
3726            .unwrap();
3727        match result {
3728            ExecutionResult::Schema(res) => {
3729                assert_eq!(res.status, "dropped");
3730            }
3731            _ => panic!("Expected schema result for drop"),
3732        }
3733    }
3734
3735    #[tokio::test]
3736    async fn test_dynamic_variable_resolution() {
3737        use crate::Aurora;
3738        use tempfile::TempDir;
3739
3740        // Setup
3741        let temp_dir = TempDir::new().unwrap();
3742        // Setup
3743
3744        let db_path = temp_dir.path().join("test_dynamic.db");
3745
3746        // Use synchronous config to ensure writes are visible immediately
3747        let config = crate::AuroraConfig {
3748            db_path,
3749            enable_write_buffering: false,
3750            durability_mode: crate::DurabilityMode::Synchronous,
3751            ..Default::default()
3752        };
3753        let db = Aurora::with_config(config).unwrap();
3754
3755        // Create collection schemas
3756        db.new_collection(
3757            "users",
3758            vec![
3759                ("name", crate::FieldType::String, false),
3760                ("profile", crate::FieldType::Any, false),
3761            ],
3762        )
3763        .await
3764        .unwrap();
3765
3766        db.new_collection(
3767            "orders",
3768            vec![
3769                ("user_id", crate::FieldType::String, false),
3770                ("theme", crate::FieldType::String, false),
3771            ],
3772        )
3773        .await
3774        .unwrap();
3775
3776        db.new_collection(
3777            "user_settings",
3778            vec![
3779                ("user_id", crate::FieldType::String, false),
3780                ("theme", crate::FieldType::String, false),
3781            ],
3782        )
3783        .await
3784        .unwrap();
3785
3786        // Initialize workers for job test
3787
3788        let mutation = r#"
3789            mutation DynamicFlow {
3790                user: insertInto(collection: "users", data: { 
3791                    name: "John", 
3792                    profile: { settings: { theme: "dark" } } 
3793                }) {
3794                    id
3795                    name
3796                    profile
3797                }
3798                
3799                order: insertInto(collection: "orders", data: { 
3800                    user_id: "$user.id",
3801                    theme: "$user.profile.settings.theme"
3802                }) {
3803                    id
3804                    user_id
3805                    theme
3806                }
3807                
3808                job: enqueueJob(
3809                    jobType: "send_email",
3810                    payload: {
3811                        orderId: "$order.id",
3812                        userId: "$order.user_id",
3813                        theme: "$order.theme"
3814                    }
3815                )
3816            }
3817        "#;
3818
3819        let result = execute(&db, mutation, ExecutionOptions::new())
3820            .await
3821            .unwrap();
3822
3823        match result {
3824            ExecutionResult::Mutation(_res) => {
3825                // Multi-op mutations return Batch, not single Mutation
3826                panic!("Expected Batch result for multi-op mutation, got Mutation");
3827            }
3828            ExecutionResult::Batch(results) => {
3829                assert_eq!(results.len(), 3);
3830
3831                // Verify data in collections
3832                // 1. User
3833                let users = db.aql_get_all_collection("users").await.unwrap();
3834                assert_eq!(users.len(), 1);
3835                let user_id = &users[0].id;
3836
3837                // 2. Order
3838                let orders = db.aql_get_all_collection("orders").await.unwrap();
3839                assert_eq!(orders.len(), 1);
3840
3841                // Verify resolved values in order
3842                let order_doc = &orders[0];
3843                assert_eq!(
3844                    order_doc.data.get("user_id"),
3845                    Some(&Value::String(user_id.clone()))
3846                );
3847                assert_eq!(
3848                    order_doc.data.get("theme"),
3849                    Some(&Value::String("dark".to_string()))
3850                );
3851            }
3852            _ => panic!("Expected Batch result"),
3853        }
3854    }
3855}