1use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17#[allow(unused_imports)]
19use chrono::TimeZone as _;
20
21pub type ExecutionContext = HashMap<String, JsonValue>;
22
23#[derive(Debug, Clone)]
25pub struct QueryPlan {
26 pub collection: String,
27 pub filter: Option<ast::Filter>,
28 pub compiled_filter: Option<CompiledFilter>,
29 pub projection: Vec<ast::Field>,
30 pub limit: Option<usize>,
31 pub offset: usize,
32 pub after: Option<String>,
33 pub orderings: Vec<ast::Ordering>,
34 pub is_connection: bool,
35 pub has_lookups: bool,
36 pub fragments: HashMap<String, FragmentDef>,
37 pub variable_definitions: Vec<ast::VariableDefinition>,
38 pub directives: Vec<ast::Directive>,
39 pub lookup_dependencies: Vec<String>,
41}
42
43impl QueryPlan {
44 pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
45 validate_required_variables(&self.variable_definitions, provided_variables)
46 }
47
48 pub fn from_query(
49 db: &Aurora,
50 query: &ast::Query,
51 fragments: &HashMap<String, FragmentDef>,
52 variables: &HashMap<String, ast::Value>,
53 ) -> Result<Vec<Self>> {
54 let root_fields = collect_fields(&query.selection_set, fragments, variables, None)?;
55
56 let mut plans = Vec::new();
57 for field in root_fields {
58 let collection = field.name.clone();
59 let filter = extract_filter_from_args(&field.arguments)?;
60 let compiled_filter = if let Some(ref f) = filter {
61 Some(compile_filter(f)?)
62 } else {
63 None
64 };
65
66 let sub_fields = collect_fields(
67 &field.selection_set,
68 fragments,
69 variables,
70 Some(&field.name),
71 )?;
72
73 let (limit, offset) = extract_pagination(&field.arguments);
74 let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
75 let orderings = extract_order_by(&field.arguments);
76 let is_connection = sub_fields
77 .iter()
78 .any(|f| f.name == "edges" || f.name == "pageInfo");
79
80 let lookup_dependencies =
81 identify_lookup_dependencies(db, &collection, &sub_fields, variables);
82 let has_lookups = !lookup_dependencies.is_empty();
83
84 plans.push(QueryPlan {
85 collection,
86 filter,
87 compiled_filter,
88 projection: sub_fields,
89 limit: limit.or(first),
90 offset,
91 after,
92 orderings,
93 is_connection,
94 has_lookups,
95 fragments: fragments.clone(),
96 variable_definitions: query.variable_definitions.clone(),
97 directives: field.directives.clone(),
98 lookup_dependencies,
99 });
100 }
101 Ok(plans)
102 }
103}
104
105pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
107 use std::collections::hash_map::DefaultHasher;
108 use std::hash::{Hash, Hasher};
109
110 let query_key = {
112 let mut hasher = DefaultHasher::new();
113 aql.trim().hash(&mut hasher);
114 hasher.finish()
115 };
116
117 let vars: HashMap<String, ast::Value> = options.variables.clone();
119
120 if let Some(plan) = db.plan_cache.get(&query_key) {
123 plan.validate(&vars)?;
124 return execute_plan(db, &plan, &vars, &options).await;
125 }
126
127 let mut doc = super::parse(aql)?;
129 println!("Parsed document with {} operations", doc.operations.len());
130
131 if let Some(Operation::Query(query)) = doc
133 .operations
134 .iter()
135 .find(|op| matches!(op, Operation::Query(_)))
136 {
137 let fragments: HashMap<String, FragmentDef> = doc
138 .operations
139 .iter()
140 .filter_map(|op| {
141 if let Operation::FragmentDefinition(f) = op {
142 Some((f.name.clone(), f.clone()))
143 } else {
144 None
145 }
146 })
147 .collect();
148
149 let root_fields = collect_fields(&query.selection_set, &fragments, &vars, None)?;
154 let has_search = root_fields
155 .iter()
156 .any(|f| f.arguments.iter().any(|a| a.name == "search"));
157
158 if !has_search {
159 let plans = QueryPlan::from_query(db, query, &fragments, &vars)?;
160 if plans.len() == 1 {
161 let plan = Arc::new(plans[0].clone());
162 plan.validate(&vars)?;
163 db.plan_cache.insert(query_key, Arc::clone(&plan));
164
165 return execute_plan(db, &plan, &vars, &options).await;
166 }
167 }
168 }
169
170 let mut vars = vars;
173 for op in &doc.operations {
174 if let Operation::Query(q) = op {
175 for var_def in &q.variable_definitions {
176 if !vars.contains_key(&var_def.name) {
177 if let Some(default) = &var_def.default_value {
178 vars.insert(var_def.name.clone(), default.clone());
179 }
180 }
181 }
182 }
183 }
184
185 super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
187 let code = match e.code {
188 super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
189 super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
190 _ => ErrorCode::QueryError,
191 };
192 AqlError::new(code, e.to_string())
193 })?;
194
195 execute_document(db, &doc, &options).await
196}
197
198pub async fn execute_plan(
199 db: &Aurora,
200 plan: &QueryPlan,
201 variables: &HashMap<String, ast::Value>,
202 options: &ExecutionOptions,
203) -> Result<ExecutionResult> {
204 check_permissions(&plan.directives, options, false)?;
206
207 let collection_name = &plan.collection;
208
209 let effective_limit = plan.limit;
211
212 let mut indexed_docs = None;
214 if let Some(ref f) = plan.filter {
215 if let Some((field, val)) =
217 find_indexed_equality_filter_runtime(f, db, collection_name, variables)
218 {
219 let index_ready = field == "_sid" || !db.is_index_building(collection_name, &field);
222 if index_ready {
223 let mut db_val = aql_value_to_db_value(&val, variables)?;
224
225 if let Ok(col_def) = db.get_collection_definition(collection_name) {
227 if let Some(f_def) = col_def.fields.get(&field) {
228 db_val = db_val.coerce_to(&f_def.field_type);
229 }
230 }
231
232 let ids = if field == "_sid" {
233 match &db_val {
234 Value::String(s) => vec![s.clone()],
235 Value::Uuid(u) => vec![u.to_string()],
236 _ => vec![],
237 }
238 } else {
239 db.get_ids_from_index(collection_name, &field, &db_val)
240 };
241
242 let mut docs = Vec::with_capacity(ids.len());
243 for id in ids {
244 if let Some(doc) = db.get_document(collection_name, &id)? {
245 if let Some(ref cf) = plan.compiled_filter {
247 if matches_filter(&doc, cf, variables) {
248 docs.push(doc);
249 }
250 } else {
251 docs.push(doc);
252 }
253 }
254 }
255
256 indexed_docs = Some(docs);
257 } }
259 }
260
261 let mut docs = if let Some(d) = indexed_docs {
262 d
263 } else {
264 let vars_arc = Arc::new(variables.clone());
266 let cf_clone = plan.compiled_filter.clone();
267 let filter_fn = move |doc: &Document| {
268 cf_clone
269 .as_ref()
270 .map(|f| matches_filter(doc, f, &vars_arc))
271 .unwrap_or(true)
272 };
273
274 let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
277 None
278 } else {
279 plan.limit.map(|l| {
280 let base = if plan.is_connection { l + 1 } else { l };
281 base + plan.offset
282 })
283 };
284 db.scan_and_filter(collection_name, filter_fn, scan_limit)?
285 };
286
287 if !plan.orderings.is_empty() {
289 apply_ordering(&mut docs, &plan.orderings);
290 }
291
292 if let Some(ref cursor) = plan.after {
294 if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
295 docs.drain(0..=pos);
296 }
297 }
298
299 if plan.offset > 0 {
301 if plan.offset < docs.len() {
302 docs.drain(0..plan.offset);
303 } else {
304 docs.clear();
305 }
306 }
307
308 if plan.is_connection {
310 return Ok(ExecutionResult::Query(execute_connection(
311 docs,
312 &plan.projection,
313 effective_limit,
314 &plan.fragments,
315 variables,
316 options,
317 )?));
318 }
319
320 if let Some(l) = effective_limit {
322 docs.truncate(l);
323 }
324
325 if options.apply_projections && !plan.projection.is_empty() {
327 let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
329 let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");
330
331 if let Some(f) = group_by_field {
332 let field_name = f
334 .arguments
335 .iter()
336 .find(|a| a.name == "field")
337 .and_then(|a| match &a.value {
338 ast::Value::String(s) => Some(s),
339 _ => None,
340 });
341
342 if let Some(group_field) = field_name {
343 let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
344 for d in docs {
345 let key = d
346 .data
347 .get(group_field)
348 .map(|v| match v {
349 Value::String(s) => s.clone(),
350 _ => v.to_string(),
351 })
352 .unwrap_or_else(|| "null".to_string());
353 groups.entry(key).or_default().push(d);
354 }
355
356 let mut group_docs = Vec::with_capacity(groups.len());
357 for (key, group_items) in groups {
358 let mut data = HashMap::new();
359 for selection in &f.selection_set {
360 if let Selection::Field(sub_f) = selection {
361 let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
362 match sub_f.name.as_str() {
363 "key" => {
364 data.insert(alias.clone(), Value::String(key.clone()));
365 }
366 "count" => {
367 data.insert(
368 alias.clone(),
369 Value::Int(group_items.len() as i64),
370 );
371 }
372 "nodes" => {
373 let sub_fields = collect_fields(
374 &sub_f.selection_set,
375 &plan.fragments,
376 variables,
377 None,
378 )
379 .unwrap_or_default();
380
381 let mut projected_nodes = Vec::with_capacity(group_items.len());
382 for d in &group_items {
383 let node_doc =
384 apply_projection(d.clone(), &sub_fields, options)?;
385 projected_nodes.push(Value::Object(node_doc.data));
386 }
387 data.insert(alias.clone(), Value::Array(projected_nodes));
388 }
389 "aggregate" => {
390 let agg_res =
391 compute_aggregates(&group_items, &sub_f.selection_set);
392 data.insert(alias.clone(), Value::Object(agg_res));
393 }
394 _ => {}
395 }
396 }
397 }
398 group_docs.push(Document {
399 _sid: format!("group:{}", key),
400 data,
401 });
402 }
403 docs = group_docs;
404 }
405 } else if let Some(agg) = agg_field {
406 let agg_result = compute_aggregates(&docs, &agg.selection_set);
410 let alias = agg.alias.as_ref().unwrap_or(&agg.name);
411
412 if plan.projection.len() == 1 {
413 let mut data = HashMap::new();
414 data.insert(alias.clone(), Value::Object(agg_result));
415 docs = vec![Document {
416 _sid: "aggregate".to_string(),
417 data,
418 }];
419 } else {
420 let mut projected = Vec::with_capacity(docs.len());
422 for mut d in docs {
423 d.data
424 .insert(alias.clone(), Value::Object(agg_result.clone()));
425 projected.push(apply_projection(d, &plan.projection, options)?);
426 }
427 docs = projected;
428 }
429 } else if !plan.has_lookups {
430 let mut projected = Vec::with_capacity(docs.len());
431 for d in docs {
432 projected.push(apply_projection(d, &plan.projection, options)?);
433 }
434 docs = projected;
435 } else {
436 let mut projected = Vec::with_capacity(docs.len());
438 for d in docs {
439 let (proj_doc, _) = apply_projection_with_lookups(
440 db,
441 d,
442 collection_name,
443 &plan.projection,
444 &plan.fragments,
445 variables,
446 options,
447 &plan.lookup_dependencies,
448 )
449 .await?;
450 projected.push(proj_doc);
451 }
452 docs = projected;
453 }
454 }
455
456 Ok(ExecutionResult::Query(QueryResult {
457 collection: collection_name.clone(),
458 documents: docs,
459 total_count: None,
460 deferred_fields: vec![],
461 explain: None,
462 }))
463}
464
465fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
466 let mut results = HashMap::new();
467
468 for selection in selections {
469 if let Selection::Field(f) = selection {
470 let alias = f.alias.as_ref().unwrap_or(&f.name);
471 let value = match f.name.as_str() {
472 "count" => Value::Int(docs.len() as i64),
473 "sum" => {
474 let field =
475 f.arguments
476 .iter()
477 .find(|a| a.name == "field")
478 .and_then(|a| match &a.value {
479 ast::Value::String(s) => Some(s),
480 _ => None,
481 });
482
483 if let Some(field_name) = field {
484 let sum: f64 = docs
485 .iter()
486 .filter_map(|d| d.data.get(field_name))
487 .filter_map(|v| match v {
488 Value::Int(i) => Some(*i as f64),
489 Value::Float(f) => Some(*f),
490 _ => None,
491 })
492 .sum();
493 Value::Float(sum)
494 } else {
495 Value::Null
496 }
497 }
498 "avg" => {
499 let field =
500 f.arguments
501 .iter()
502 .find(|a| a.name == "field")
503 .and_then(|a| match &a.value {
504 ast::Value::String(s) => Some(s),
505 _ => None,
506 });
507
508 if let Some(field_name) = field
509 && !docs.is_empty()
510 {
511 let values: Vec<f64> = docs
512 .iter()
513 .filter_map(|d| d.data.get(field_name))
514 .filter_map(|v| match v {
515 Value::Int(i) => Some(*i as f64),
516 Value::Float(f) => Some(*f),
517 _ => None,
518 })
519 .collect();
520
521 if values.is_empty() {
522 Value::Null
523 } else {
524 let sum: f64 = values.iter().sum();
525 Value::Float(sum / values.len() as f64)
526 }
527 } else {
528 Value::Null
529 }
530 }
531 "min" => {
532 let field =
533 f.arguments
534 .iter()
535 .find(|a| a.name == "field")
536 .and_then(|a| match &a.value {
537 ast::Value::String(s) => Some(s),
538 _ => None,
539 });
540
541 if let Some(field_name) = field
542 && !docs.is_empty()
543 {
544 let min = docs
545 .iter()
546 .filter_map(|d| d.data.get(field_name))
547 .filter_map(|v| match v {
548 Value::Int(i) => Some(*i as f64),
549 Value::Float(f) => Some(*f),
550 _ => None,
551 })
552 .fold(f64::INFINITY, f64::min);
553
554 if min == f64::INFINITY {
555 Value::Null
556 } else {
557 Value::Float(min)
558 }
559 } else {
560 Value::Null
561 }
562 }
563 "max" => {
564 let field =
565 f.arguments
566 .iter()
567 .find(|a| a.name == "field")
568 .and_then(|a| match &a.value {
569 ast::Value::String(s) => Some(s),
570 _ => None,
571 });
572
573 if let Some(field_name) = field
574 && !docs.is_empty()
575 {
576 let max = docs
577 .iter()
578 .filter_map(|d| d.data.get(field_name))
579 .filter_map(|v| match v {
580 Value::Int(i) => Some(*i as f64),
581 Value::Float(f) => Some(*f),
582 _ => None,
583 })
584 .fold(f64::NEG_INFINITY, f64::max);
585
586 if max == f64::NEG_INFINITY {
587 Value::Null
588 } else {
589 Value::Float(max)
590 }
591 } else {
592 Value::Null
593 }
594 }
595 _ => Value::Null,
596 };
597 results.insert(alias.clone(), value);
598 }
599 }
600
601 results
602}
603
604fn find_indexed_equality_filter_runtime(
605 filter: &ast::Filter,
606 db: &Aurora,
607 collection: &str,
608 variables: &HashMap<String, ast::Value>,
609) -> Option<(String, ast::Value)> {
610 match filter {
611 ast::Filter::Eq(field, val) => {
612 if field == "_sid" || db.has_index(collection, field) {
613 let resolved = resolve_if_variable(val, variables);
614 return Some((field.clone(), resolved.clone()));
615 }
616 }
617 ast::Filter::And(filters) => {
618 for f in filters {
619 if let Some(res) =
620 find_indexed_equality_filter_runtime(f, db, collection, variables)
621 {
622 return Some(res);
623 }
624 }
625 }
626 _ => {}
627 }
628 None
629}
630
631fn collect_fields(
633 selection_set: &[Selection],
634 fragments: &HashMap<String, FragmentDef>,
635 variable_values: &HashMap<String, ast::Value>,
636 parent_type: Option<&str>,
637) -> Result<Vec<Field>> {
638 let mut fields = Vec::new();
639
640 for selection in selection_set {
641 match selection {
642 Selection::Field(field) => {
643 if should_include(&field.directives, variable_values)? {
644 fields.push(field.clone());
645 }
646 }
647 Selection::FragmentSpread(name) => {
648 if let Some(fragment) = fragments.get(name) {
649 let type_match = if let Some(parent) = parent_type {
650 parent == fragment.type_condition
651 } else {
652 true
653 };
654
655 if type_match {
656 let fragment_fields = collect_fields(
657 &fragment.selection_set,
658 fragments,
659 variable_values,
660 parent_type,
661 )?;
662 fields.extend(fragment_fields);
663 }
664 }
665 }
666 Selection::InlineFragment(inline) => {
667 let type_match = if let Some(parent) = parent_type {
668 parent == inline.type_condition
669 } else {
670 true
671 };
672
673 if type_match {
674 let inline_fields = collect_fields(
675 &inline.selection_set,
676 fragments,
677 variable_values,
678 parent_type,
679 )?;
680 fields.extend(inline_fields);
681 }
682 }
683 Selection::ComputedField(cf) => {
684 let (expr_val, _complex_expr) = match &cf.expression {
685 ast::ComputedExpression::TemplateString(s) => (s.clone(), None),
686 _ => (
687 "complex".to_string(),
688 Some(ast::Expression::Literal(ast::Value::Null)),
689 ), };
691
692 let mut field = Field {
693 alias: Some(cf.alias.clone()),
694 name: "__compute__".to_string(),
695 arguments: vec![ast::Argument {
696 name: "expr".to_string(),
697 value: ast::Value::String(expr_val),
698 }],
699 directives: Vec::new(),
700 selection_set: Vec::new(),
701 computed_expression: None,
702 };
703
704 if let ast::ComputedExpression::TemplateString(s) = &cf.expression {
705 field.arguments[0].value = ast::Value::String(s.clone());
706 }
707 match &cf.expression {
708 ast::ComputedExpression::TemplateString(_) => {}
709 ast::ComputedExpression::StandardExpression(e) => {
710 field.computed_expression = Some(e.clone());
711 }
712 ast::ComputedExpression::FunctionCall { name, args } => {
713 field.computed_expression = Some(ast::Expression::FunctionCall {
714 name: name.clone(),
715 args: args.clone(),
716 });
717 }
718 ast::ComputedExpression::PipeExpression { .. } => {
719 }
721 ast::ComputedExpression::SqlExpression(s) => {
722 field.computed_expression = Some(ast::Expression::Literal(
723 ast::Value::String(format!("SQL: {}", s)),
724 ));
725 }
726 ast::ComputedExpression::AggregateFunction { .. } => {
727 }
729 }
730
731 fields.push(field);
732 }
733 }
734 }
735 Ok(fields)
736}
737
738fn should_include(
740 directives: &[ast::Directive],
741 variables: &HashMap<String, ast::Value>,
742) -> Result<bool> {
743 for dir in directives {
744 if dir.name == "skip" {
745 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
746 let should_skip = resolve_boolean_arg(&arg.value, variables)?;
747 if should_skip {
748 return Ok(false);
749 }
750 }
751 } else if dir.name == "include" {
752 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
753 let should_include = resolve_boolean_arg(&arg.value, variables)?;
754 if !should_include {
755 return Ok(false);
756 }
757 }
758 }
759 }
760 Ok(true)
761}
762
763fn check_permissions(
766 directives: &[ast::Directive],
767 options: &ExecutionOptions,
768 is_write: bool,
769) -> Result<()> {
770 for directive in directives {
771 match directive.name.as_str() {
772 "auth" => {
773 if options.user_role.is_none() {
774 return Err(AqlError::new(
775 ErrorCode::Unauthorized,
776 "Authentication required for this operation".to_string(),
777 ));
778 }
779 }
780 "require" | "allow" => {
781 let required_role = directive
782 .arguments
783 .iter()
784 .find(|a| {
785 if is_write {
786 a.name == "write" || a.name == "role"
787 } else {
788 a.name == "read" || a.name == "role"
789 }
790 })
791 .and_then(|a| {
792 if let ast::Value::String(s) = &a.value {
793 Some(s.as_str())
794 } else {
795 None
796 }
797 });
798
799 if let Some(role) = required_role {
800 if options.user_role.as_deref() != Some(role) {
801 return Err(AqlError::new(
802 ErrorCode::Forbidden,
803 format!("Role '{}' required for this operation", role),
804 ));
805 }
806 }
807 }
808 _ => {}
809 }
810 }
811 Ok(())
812}
813
814fn resolve_boolean_arg(
815 value: &ast::Value,
816 variables: &HashMap<String, ast::Value>,
817) -> Result<bool> {
818 match value {
819 ast::Value::Boolean(b) => Ok(*b),
820 ast::Value::Variable(name) => {
821 if let Some(val) = variables.get(name) {
822 match val {
823 ast::Value::Boolean(b) => Ok(*b),
824 _ => Err(AqlError::new(
825 ErrorCode::TypeError,
826 format!("Variable '{}' is not a boolean, got {:?}", name, val),
827 )),
828 }
829 } else {
830 Err(AqlError::new(
831 ErrorCode::UndefinedVariable,
832 format!("Variable '{}' is not defined", name),
833 ))
834 }
835 }
836 _ => Err(AqlError::new(
837 ErrorCode::TypeError,
838 format!("Expected boolean value, got {:?}", value),
839 )),
840 }
841}
842
843fn validate_required_variables(
845 variable_definitions: &[ast::VariableDefinition],
846 provided_variables: &HashMap<String, ast::Value>,
847) -> Result<()> {
848 for var_def in variable_definitions {
849 if var_def.var_type.is_required {
850 if !provided_variables.contains_key(&var_def.name) {
851 if var_def.default_value.is_none() {
852 return Err(AqlError::new(
853 ErrorCode::UndefinedVariable,
854 format!(
855 "Required variable '{}' (type: {}{}) is not provided",
856 var_def.name,
857 var_def.var_type.name,
858 if var_def.var_type.is_required {
859 "!"
860 } else {
861 ""
862 }
863 ),
864 ));
865 }
866 }
867 }
868 }
869 Ok(())
870}
871
872#[derive(Debug)]
877pub enum ExecutionResult {
878 Query(QueryResult),
880 Mutation(MutationResult),
882 Subscription(SubscriptionResult),
884 Batch(Vec<ExecutionResult>),
886 Schema(SchemaResult),
888 Migration(MigrationResult),
890}
891
892impl ExecutionResult {
893 pub fn as_query(&self) -> Option<&QueryResult> {
895 if let Self::Query(q) = self {
896 Some(q)
897 } else {
898 None
899 }
900 }
901
902 pub fn into_query(self) -> Option<QueryResult> {
904 if let Self::Query(q) = self {
905 Some(q)
906 } else {
907 None
908 }
909 }
910
911 pub fn as_mutation(&self) -> Option<&MutationResult> {
913 if let Self::Mutation(m) = self {
914 Some(m)
915 } else {
916 None
917 }
918 }
919
920 pub fn into_mutation(self) -> Option<MutationResult> {
922 if let Self::Mutation(m) = self {
923 Some(m)
924 } else {
925 None
926 }
927 }
928
929 pub fn as_subscription(&self) -> Option<&SubscriptionResult> {
931 if let Self::Subscription(s) = self {
932 Some(s)
933 } else {
934 None
935 }
936 }
937
938 pub fn into_subscription(self) -> Option<SubscriptionResult> {
940 if let Self::Subscription(s) = self {
941 Some(s)
942 } else {
943 None
944 }
945 }
946
947 pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
952 match self {
953 Self::Query(q) => q.bind(),
954 Self::Mutation(m) => m.bind(),
955 Self::Batch(results) => {
956 let mut all = Vec::new();
957 for r in results {
958 all.extend(r.bind()?);
959 }
960 Ok(all)
961 }
962 _ => Err(AqlError::new(
963 ErrorCode::QueryError,
964 "Cannot bind results from schema or migration operations".to_string(),
965 )),
966 }
967 }
968
969 pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
973 match self {
974 Self::Query(q) => q.bind_first(),
975 Self::Mutation(m) => m.bind_first(),
976 Self::Batch(mut results) => {
977 if results.is_empty() {
978 return Err(AqlError::new(
979 ErrorCode::NotFound,
980 "No documents found to bind".to_string(),
981 ));
982 }
983 results.remove(0).bind_first()
984 }
985 _ => Err(AqlError::new(
986 ErrorCode::QueryError,
987 "Cannot bind results from schema or migration operations".to_string(),
988 )),
989 }
990 }
991}
992
/// Payload of [`ExecutionResult::Schema`].
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Label of the schema operation that was run.
    pub operation: String,
    /// Collection the operation applied to.
    pub collection: String,
    /// Status text for the operation.
    pub status: String,
}
999
/// Payload of [`ExecutionResult::Migration`].
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Version label of the migration.
    pub version: String,
    /// Number of migration steps that were applied.
    pub steps_applied: usize,
    /// Status text for the migration.
    pub status: String,
}
1006
1007#[derive(Debug, Clone, Serialize)]
1008pub struct ExecutionPlan {
1009 pub operations: Vec<String>,
1010 pub estimated_cost: f64,
1011}
1012
1013#[derive(Debug, Clone)]
1015pub struct QueryResult {
1016 pub collection: String,
1018 pub documents: Vec<Document>,
1020 pub total_count: Option<usize>,
1022 pub deferred_fields: Vec<String>,
1024 pub explain: Option<ExplainResult>,
1026}
1027
1028impl QueryResult {
1029 pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
1031 self.documents
1032 .into_iter()
1033 .map(|d| d.bind())
1034 .collect::<Result<Vec<T>>>()
1035 }
1036
1037 pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
1039 self.documents
1040 .into_iter()
1041 .next()
1042 .ok_or_else(|| {
1043 AqlError::new(
1044 ErrorCode::NotFound,
1045 "No documents found to bind".to_string(),
1046 )
1047 })?
1048 .bind()
1049 }
1050}
1051
/// Execution statistics attached to a [`QueryResult`] when the query carries
/// an `explain` directive.
#[derive(Debug, Clone, Default)]
pub struct ExplainResult {
    /// Name of the collection the statistics refer to.
    pub collection: String,
    /// Document count; the query executor fills this with the number of
    /// documents in the result.
    pub docs_scanned: usize,
    /// Whether an index is considered to have been used; the query executor
    /// sets this when a `where` argument is present and the result is
    /// non-empty.
    pub index_used: bool,
    /// Wall-clock duration of the collection query, in milliseconds.
    pub elapsed_ms: u128,
}
1064
1065#[derive(Debug, Clone)]
1067pub struct MutationResult {
1068 pub operation: String,
1070 pub collection: String,
1072 pub affected_count: usize,
1074 pub returned_documents: Vec<Document>,
1076}
1077
1078impl MutationResult {
1079 pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
1081 self.returned_documents
1082 .into_iter()
1083 .map(|d| d.bind())
1084 .collect::<Result<Vec<T>>>()
1085 }
1086
1087 pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
1089 self.returned_documents
1090 .into_iter()
1091 .next()
1092 .ok_or_else(|| {
1093 AqlError::new(
1094 ErrorCode::NotFound,
1095 "No documents found to bind".to_string(),
1096 )
1097 })?
1098 .bind()
1099 }
1100}
1101
1102#[derive(Debug)]
1104pub struct SubscriptionResult {
1105 pub subscription_id: String,
1107 pub collection: String,
1109 pub stream: Option<crate::pubsub::ChangeListener>,
1111}
1112
1113#[derive(Debug, Clone)]
1115pub struct ExecutionOptions {
1116 pub skip_validation: bool,
1117 pub apply_projections: bool,
1118 pub debug_audit: bool,
1119 pub variables: HashMap<String, ast::Value>,
1120 pub user_role: Option<String>,
1122}
1123
1124impl Default for ExecutionOptions {
1125 fn default() -> Self {
1126 Self {
1127 skip_validation: false,
1128 apply_projections: true,
1129 debug_audit: false,
1130 variables: HashMap::new(),
1131 user_role: None,
1132 }
1133 }
1134}
1135
1136impl ExecutionOptions {
1137 pub fn new() -> Self {
1138 Self::default()
1139 }
1140
1141 pub fn with_variables(mut self, vars: HashMap<String, ast::Value>) -> Self {
1142 self.variables = vars;
1143 self
1144 }
1145
1146 pub fn with_role(mut self, role: impl Into<String>) -> Self {
1147 self.user_role = Some(role.into());
1148 self
1149 }
1150
1151 pub fn skip_validation(mut self) -> Self {
1152 self.skip_validation = true;
1153 self
1154 }
1155}
1156
1157pub(crate) fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
1158 match v {
1159 serde_json::Value::Null => ast::Value::Null,
1160 serde_json::Value::Bool(b) => ast::Value::Boolean(b),
1161 serde_json::Value::Number(n) => {
1162 if let Some(i) = n.as_i64() {
1163 ast::Value::Int(i)
1164 } else if let Some(f) = n.as_f64() {
1165 ast::Value::Float(f)
1166 } else {
1167 ast::Value::Null
1168 }
1169 }
1170 serde_json::Value::String(s) => ast::Value::String(s),
1171 serde_json::Value::Array(arr) => {
1172 ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
1173 }
1174 serde_json::Value::Object(map) => ast::Value::Object(
1175 map.into_iter()
1176 .map(|(k, v)| (k, json_to_aql_value(v)))
1177 .collect(),
1178 ),
1179 }
1180}
1181
1182pub async fn execute_document(
1184 db: &Aurora,
1185 doc: &ast::Document,
1186 options: &ExecutionOptions,
1187) -> Result<ExecutionResult> {
1188 if doc.operations.is_empty() {
1189 return Err(AqlError::new(
1190 ErrorCode::QueryError,
1191 "No operations in document".to_string(),
1192 ));
1193 }
1194
1195 println!("execute_document: first op: {:?}", doc.operations[0]);
1196
1197 let vars: HashMap<String, ast::Value> = options.variables.clone();
1198
1199 let fragments: HashMap<String, FragmentDef> = doc
1200 .operations
1201 .iter()
1202 .filter_map(|op| {
1203 if let Operation::FragmentDefinition(frag) = op {
1204 Some((frag.name.clone(), frag.clone()))
1205 } else {
1206 None
1207 }
1208 })
1209 .collect();
1210
1211 let executable_ops: Vec<&Operation> = doc
1212 .operations
1213 .iter()
1214 .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
1215 .collect();
1216
1217 if executable_ops.is_empty() {
1218 return Err(AqlError::new(
1219 ErrorCode::QueryError,
1220 "No executable operations in document".to_string(),
1221 ));
1222 }
1223
1224 if executable_ops.len() == 1 {
1225 execute_operation(db, executable_ops[0], &vars, options, &fragments).await
1226 } else {
1227 let mut results = Vec::new();
1228 for op in executable_ops {
1229 results.push(execute_operation(db, op, &vars, options, &fragments).await?);
1230 }
1231 Ok(ExecutionResult::Batch(results))
1232 }
1233}
1234
1235async fn execute_operation(
1236 db: &Aurora,
1237 op: &Operation,
1238 vars: &HashMap<String, ast::Value>,
1239 options: &ExecutionOptions,
1240 fragments: &HashMap<String, FragmentDef>,
1241) -> Result<ExecutionResult> {
1242 match op {
1243 Operation::Query(query) => execute_query(db, query, vars, options, fragments).await,
1244 Operation::Mutation(mutation) => {
1245 execute_mutation(db, mutation, vars, options, fragments).await
1246 }
1247 Operation::Subscription(sub) => execute_subscription(db, sub, vars, options).await,
1248 Operation::Schema(schema) => execute_schema(db, schema, options).await,
1249 Operation::Migration(migration) => execute_migration(db, migration, options).await,
1250 Operation::Introspection(intro) => execute_introspection(db, intro).await,
1251 Operation::Handler(handler) => execute_handler_registration(db, handler, options).await,
1252 _ => Ok(ExecutionResult::Query(QueryResult {
1253 collection: String::new(),
1254 documents: vec![],
1255 total_count: None,
1256 deferred_fields: vec![],
1257 explain: None,
1258 })),
1259 }
1260}
1261
1262async fn execute_query(
1263 db: &Aurora,
1264 query: &ast::Query,
1265 vars: &HashMap<String, ast::Value>,
1266 options: &ExecutionOptions,
1267 fragments: &HashMap<String, FragmentDef>,
1268) -> Result<ExecutionResult> {
1269 validate_required_variables(&query.variable_definitions, vars)?;
1270 let has_explain = query.directives.iter().any(|d| d.name == "explain");
1271 let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;
1272 let mut results = Vec::new();
1273 for field in &root_fields {
1274 let sub_fields = collect_fields(&field.selection_set, fragments, vars, Some(&field.name))?;
1275 let start = std::time::Instant::now();
1276 let mut result =
1277 execute_collection_query(db, field, &sub_fields, vars, options, fragments).await?;
1278 if has_explain {
1279 let elapsed_ms = start.elapsed().as_millis();
1280 let index_used =
1281 field.arguments.iter().any(|a| a.name == "where") && !result.documents.is_empty();
1282 result.explain = Some(ExplainResult {
1283 collection: result.collection.clone(),
1284 docs_scanned: result.documents.len(),
1285 index_used,
1286 elapsed_ms,
1287 });
1288 }
1289 results.push(result);
1290 }
1291 if results.len() == 1 {
1292 Ok(ExecutionResult::Query(results.remove(0)))
1293 } else {
1294 Ok(ExecutionResult::Batch(
1295 results.into_iter().map(ExecutionResult::Query).collect(),
1296 ))
1297 }
1298}
1299
1300async fn execute_collection_query(
1301 db: &Aurora,
1302 field: &ast::Field,
1303 sub_fields: &[ast::Field],
1304 variables: &HashMap<String, ast::Value>,
1305 options: &ExecutionOptions,
1306 fragments: &HashMap<String, FragmentDef>,
1307) -> Result<QueryResult> {
1308 let collection_name = &field.name;
1309
1310 check_permissions(&field.directives, options, false)?;
1312
1313 if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
1316 return execute_search_query(
1317 db,
1318 collection_name,
1319 search_arg,
1320 sub_fields,
1321 field,
1322 variables,
1323 options,
1324 )
1325 .await;
1326 }
1327
1328 let filter = extract_filter_from_args(&field.arguments)?;
1329 let (limit, offset) = extract_pagination(&field.arguments);
1330 let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
1331 let compiled_filter = if let Some(ref f) = filter {
1332 Some(compile_filter(f)?)
1333 } else {
1334 None
1335 };
1336 let vars_arc = Arc::new(variables.clone());
1337 let filter_fn = move |doc: &Document| {
1338 compiled_filter
1339 .as_ref()
1340 .map(|f| matches_filter(doc, f, &vars_arc))
1341 .unwrap_or(true)
1342 };
1343
1344 let indexed_docs = if let Some(ref f) = filter {
1345 match find_indexed_equality_filter(f, db, collection_name) {
1346 Some((field_name, val))
1347 if field_name == "_sid" || !db.is_index_building(collection_name, &field_name) =>
1348 {
1349 let db_val = aql_value_to_db_value(&val, variables)?;
1350 let ids = if field_name == "_sid" {
1351 match &db_val {
1352 Value::String(s) => vec![s.clone()],
1353 Value::Uuid(u) => vec![u.to_string()],
1354 _ => vec![],
1355 }
1356 } else {
1357 db.get_ids_from_index(collection_name, &field_name, &db_val)
1358 };
1359
1360 let mut docs = Vec::with_capacity(ids.len());
1361 for id in ids {
1362 if let Some(doc) = db.get_document(collection_name, &id)? {
1363 if filter_fn(&doc) {
1364 docs.push(doc);
1365 }
1366 }
1367 }
1368 Some(docs)
1369 }
1370 _ => None,
1372 }
1373 } else {
1374 None
1375 };
1376
1377 let is_connection = sub_fields
1378 .iter()
1379 .any(|f| f.name == "edges" || f.name == "pageInfo");
1380
1381 let orderings = extract_order_by(&field.arguments);
1382
1383 let mut docs = if let Some(docs) = indexed_docs {
1384 docs
1385 } else {
1386 let scan_limit = if after.is_some() || !orderings.is_empty() {
1390 None
1391 } else {
1392 limit.or(first).map(|l| {
1393 let base = if is_connection { l + 1 } else { l };
1394 base + offset
1395 })
1396 };
1397 db.scan_and_filter(collection_name, filter_fn, scan_limit)?
1398 };
1399
1400 if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
1403 docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
1404 }
1405
1406 if !orderings.is_empty() {
1408 apply_ordering(&mut docs, &orderings);
1409 }
1410
1411 if let Some(ref cursor) = after {
1413 if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
1414 docs.drain(0..=pos);
1415 }
1416 }
1417
1418 if is_connection {
1419 return Ok(execute_connection(
1420 docs,
1421 sub_fields,
1422 limit.or(first),
1423 fragments,
1424 variables,
1425 options,
1426 )?);
1427 }
1428
1429 if offset > 0 {
1431 if offset < docs.len() {
1432 docs.drain(0..offset);
1433 } else {
1434 docs.clear();
1435 }
1436 }
1437
1438 if let Some(l) = limit.or(first) {
1440 docs.truncate(l);
1441 }
1442
1443 let lookup_dependencies =
1445 identify_lookup_dependencies(db, collection_name, sub_fields, variables);
1446 let has_lookups = !lookup_dependencies.is_empty();
1447
1448 let mut deferred_fields = Vec::new();
1449
1450 if options.apply_projections && !sub_fields.is_empty() {
1451 if has_lookups {
1452 let mut projected = Vec::with_capacity(docs.len());
1453 for d in docs {
1454 let (proj_doc, deferred) = apply_projection_with_lookups(
1455 db,
1456 d,
1457 collection_name,
1458 sub_fields,
1459 fragments,
1460 variables,
1461 options,
1462 &lookup_dependencies,
1463 )
1464 .await?;
1465 projected.push(proj_doc);
1466 if deferred_fields.is_empty() {
1467 deferred_fields = deferred;
1468 }
1469 }
1470 docs = projected;
1471 } else {
1472 let mut all_deferred = Vec::new();
1473 let mut projected = Vec::with_capacity(docs.len());
1474 for d in docs {
1475 let (proj, deferred) =
1476 apply_projection_and_defer(d, sub_fields, variables, options)?;
1477 if all_deferred.is_empty() && !deferred.is_empty() {
1478 all_deferred = deferred;
1479 }
1480 projected.push(proj);
1481 }
1482 docs = projected;
1483 deferred_fields = all_deferred;
1484 }
1485 }
1486
1487 for sf in sub_fields {
1489 if sf.name == "windowFunc" {
1490 let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
1491 let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
1492 let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
1493 let wsize = arg_i64(&sf.arguments, "windowSize").unwrap_or(3) as usize;
1494 apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
1495 }
1496 }
1497
1498 if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
1500 let interval =
1501 arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
1502 let aggregation =
1503 arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
1504 let ds_sub: Vec<String> =
1505 collect_fields(&ds_field.selection_set, fragments, variables, None)
1506 .unwrap_or_default()
1507 .iter()
1508 .map(|f| f.name.clone())
1509 .collect();
1510 docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
1511 }
1512
1513 Ok(QueryResult {
1514 collection: collection_name.clone(),
1515 documents: docs,
1516 total_count: None,
1517 deferred_fields,
1518 explain: None,
1519 })
1520}
1521
1522async fn execute_search_query(
1524 db: &Aurora,
1525 collection: &str,
1526 search_arg: &ast::Argument,
1527 sub_fields: &[ast::Field],
1528 field: &ast::Field,
1529 variables: &HashMap<String, ast::Value>,
1530 options: &ExecutionOptions,
1531) -> Result<QueryResult> {
1532 let resolved_search_val = resolve_ast_deep(&search_arg.value, variables);
1534 let (query_str, search_fields, fuzzy) = extract_search_params(&resolved_search_val);
1535 let (limit, _) = extract_pagination(&field.arguments);
1536
1537 let mut builder = db.search(collection).query(&query_str);
1538 if fuzzy {
1539 builder = builder.fuzzy(1);
1540 }
1541 if let Some(l) = limit {
1542 builder = builder.limit(l);
1543 }
1544
1545 let mut docs = builder
1546 .collect_with_fields(if search_fields.is_empty() {
1547 None
1548 } else {
1549 Some(&search_fields)
1550 })
1551 .await?;
1552
1553 if options.apply_projections && !sub_fields.is_empty() {
1554 let mut projected = Vec::with_capacity(docs.len());
1555 for d in docs {
1556 let (proj, _) = apply_projection_and_defer(d, sub_fields, variables, options)?;
1557 projected.push(proj);
1558 }
1559 docs = projected;
1560 }
1561
1562 Ok(QueryResult {
1563 collection: collection.to_string(),
1564 documents: docs,
1565 total_count: None,
1566 deferred_fields: vec![],
1567 explain: None,
1568 })
1569}
1570
1571fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
1572 let mut query = String::new();
1573 let mut fields = Vec::new();
1574 let mut fuzzy = false;
1575 if let ast::Value::Object(m) = v {
1576 if let Some(ast::Value::String(q)) = m.get("query") {
1577 query = q.clone();
1578 }
1579 if let Some(ast::Value::Array(arr)) = m.get("fields") {
1580 for item in arr {
1581 if let ast::Value::String(s) = item {
1582 fields.push(s.clone());
1583 }
1584 }
1585 }
1586 if let Some(ast::Value::Boolean(b)) = m.get("fuzzy") {
1587 fuzzy = *b;
1588 }
1589 }
1590 (query, fields, fuzzy)
1591}
1592
1593fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
1594 if let ast::Value::Object(rules) = &validate_arg.value {
1595 for (field_name, constraints_val) in rules {
1596 if let ast::Value::Object(constraints) = constraints_val {
1597 if let Some(field_val) = doc.data.get(field_name) {
1598 for (constraint_name, constraint_val) in constraints {
1599 if !check_inline_constraint(field_val, constraint_name, constraint_val) {
1600 return false;
1601 }
1602 }
1603 }
1604 }
1605 }
1606 }
1607 true
1608}
1609
1610fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
1611 match constraint {
1612 "format" => {
1613 if let (Value::String(s), ast::Value::String(fmt)) = (value, constraint_val) {
1614 return match fmt.as_str() {
1615 "email" => {
1616 s.contains('@')
1617 && s.split('@')
1618 .nth(1)
1619 .map(|d| d.contains('.'))
1620 .unwrap_or(false)
1621 }
1622 "url" => s.starts_with("http://") || s.starts_with("https://"),
1623 "uuid" => uuid::Uuid::parse_str(s).is_ok(),
1624 _ => true,
1625 };
1626 }
1627 true
1628 }
1629 "min" => {
1630 let n = match value {
1631 Value::Int(i) => *i as f64,
1632 Value::Float(f) => *f,
1633 _ => return true,
1634 };
1635 let min = match constraint_val {
1636 ast::Value::Float(f) => *f,
1637 ast::Value::Int(i) => *i as f64,
1638 _ => return true,
1639 };
1640 n >= min
1641 }
1642 "max" => {
1643 let n = match value {
1644 Value::Int(i) => *i as f64,
1645 Value::Float(f) => *f,
1646 _ => return true,
1647 };
1648 let max = match constraint_val {
1649 ast::Value::Float(f) => *f,
1650 ast::Value::Int(i) => *i as f64,
1651 _ => return true,
1652 };
1653 n <= max
1654 }
1655 "minLength" => {
1656 if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1657 return s.len() >= *n as usize;
1658 }
1659 true
1660 }
1661 "maxLength" => {
1662 if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1663 return s.len() <= *n as usize;
1664 }
1665 true
1666 }
1667 "pattern" => {
1668 if let (Value::String(s), ast::Value::String(pat)) = (value, constraint_val) {
1669 if let Ok(re) = regex::Regex::new(pat) {
1670 return re.is_match(s);
1671 }
1672 }
1673 true
1674 }
1675 _ => true,
1676 }
1677}
1678
1679fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
1680 args.iter().find(|a| a.name == name).and_then(|a| {
1681 if let ast::Value::String(s) = &a.value {
1682 Some(s.clone())
1683 } else {
1684 None
1685 }
1686 })
1687}
1688
1689fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
1690 args.iter().find(|a| a.name == name).and_then(|a| {
1691 if let ast::Value::Int(i) = &a.value {
1692 Some(*i)
1693 } else {
1694 None
1695 }
1696 })
1697}
1698
1699fn identify_lookup_dependencies(
1700 db: &Aurora,
1701 collection: &str,
1702 fields: &[ast::Field],
1703 variables: &HashMap<String, ast::Value>,
1704) -> Vec<String> {
1705 let mut deps = Vec::new();
1706 for f in fields {
1707 let mut found = false;
1708 if let (Some(lf), Some(_c), Some(_ff)) = (
1709 f.arguments.iter().find(|a| a.name == "localField"),
1710 f.arguments.iter().find(|a| a.name == "collection"),
1711 f.arguments.iter().find(|a| a.name == "foreignField"),
1712 ) {
1713 if let ast::Value::String(s) = resolve_if_variable(&lf.value, variables) {
1714 deps.push(s.clone());
1715 }
1716 found = true;
1717 }
1718 if !found {
1719 if let Ok(col_def) = db.get_collection_definition(collection) {
1720 if let Some(f_def) = col_def.fields.get(&f.name) {
1721 if let Some(_rel) = &f_def.relation {
1722 deps.push(f.name.clone());
1723 }
1724 }
1725 }
1726 }
1727 }
1728 deps
1729}
1730
1731fn apply_projection_and_defer(
1733 mut doc: Document,
1734 fields: &[ast::Field],
1735 vars: &HashMap<String, ast::Value>,
1736 options: &ExecutionOptions,
1737) -> Result<(Document, Vec<String>)> {
1738 if fields.is_empty() {
1739 return Ok((doc, vec![]));
1740 }
1741 let mut proj = HashMap::new();
1742 let mut deferred = Vec::new();
1743
1744 for f in fields {
1745 check_permissions(&f.directives, options, false)?;
1747
1748 if f.directives.iter().any(|d| d.name == "defer") {
1750 deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
1751 continue;
1752 }
1753 if f.name == "__compute__" {
1755 let alias = f.alias.as_deref().unwrap_or("computed");
1756 if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
1757 if let ast::Value::String(template) = &expr_arg.value {
1759 let result = eval_template(template, &doc.data);
1760 proj.insert(alias.to_string(), Value::String(result));
1761 } else if let Some(expr) = f.computed_expression.as_ref() {
1762 let result = eval_expression(expr, &doc.data, vars);
1764 proj.insert(alias.to_string(), result);
1765 }
1766 }
1767 continue;
1768 }
1769 if f.name == "id" {
1770 if let Some(v) = doc.data.get("id") {
1772 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1773 } else {
1774 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Null);
1776 }
1777 } else if f.name == "_sid" {
1778 proj.insert(
1779 f.alias.as_ref().unwrap_or(&f.name).clone(),
1780 Value::String(doc._sid.clone()),
1781 );
1782 } else if let Some(v) = doc.data.get(&f.name) {
1783 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1784 }
1785 }
1786 doc.data = proj;
1787 Ok((doc, deferred))
1788}
1789fn eval_expression(
1791 expr: &ast::Expression,
1792 data: &HashMap<String, Value>,
1793 variables: &HashMap<String, ast::Value>,
1794) -> Value {
1795 match expr {
1796 ast::Expression::Literal(v) => match aql_value_to_db_value(v, variables) {
1797 Ok(dv) => dv,
1798 Err(_) => Value::Null,
1799 },
1800 ast::Expression::Variable(name) => {
1801 if let Some(v) = variables.get(name) {
1802 match aql_value_to_db_value(v, variables) {
1803 Ok(dv) => dv,
1804 Err(_) => Value::Null,
1805 }
1806 } else {
1807 Value::Null
1808 }
1809 }
1810 ast::Expression::FieldAccess(parts) => {
1811 let mut current = if parts[0] == "_sid" {
1812 Some(Value::String(
1813 data.get("_sid")
1814 .cloned()
1815 .and_then(|v| v.as_str().map(|s| s.to_string()))
1816 .unwrap_or_default(),
1817 ))
1818 } else {
1819 data.get(&parts[0]).cloned()
1820 };
1821
1822 for part in parts.iter().skip(1) {
1823 if let Some(Value::Object(map)) = current {
1824 current = map.get(part).cloned();
1825 } else {
1826 current = None;
1827 break;
1828 }
1829 }
1830 current.unwrap_or(Value::Null)
1831 }
1832 ast::Expression::Binary { op, left, right } => {
1833 let lv = eval_expression(left, data, variables);
1834 let rv = eval_expression(right, data, variables);
1835 eval_binary_op(*op, lv, rv)
1836 }
1837 ast::Expression::Unary { op, expr } => {
1838 let v = eval_expression(expr, data, variables);
1839 eval_unary_op(*op, v)
1840 }
1841 ast::Expression::Ternary {
1842 condition,
1843 then_expr,
1844 else_expr,
1845 } => {
1846 let cv = eval_expression(condition, data, variables);
1847 let is_true = match cv {
1848 Value::Bool(b) => b,
1849 Value::Null => false,
1850 Value::Int(i) => i != 0,
1851 Value::Float(f) => f != 0.0,
1852 Value::String(s) => !s.is_empty(),
1853 _ => true,
1854 };
1855 if is_true {
1856 eval_expression(then_expr, data, variables)
1857 } else {
1858 eval_expression(else_expr, data, variables)
1859 }
1860 }
1861 ast::Expression::FunctionCall { name, args } => {
1862 let evaluated_args: Vec<Value> = args
1863 .iter()
1864 .map(|a| eval_expression(a, data, variables))
1865 .collect();
1866 eval_function_call(name, evaluated_args)
1867 }
1868 }
1869}
1870
1871fn eval_binary_op(op: ast::BinaryOp, left: Value, right: Value) -> Value {
1872 match op {
1873 ast::BinaryOp::Add => match (left, right) {
1874 (Value::Int(a), Value::Int(b)) => Value::Int(a + b),
1875 (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 + b),
1876 (Value::Float(a), Value::Int(b)) => Value::Float(a + b as f64),
1877 (Value::Float(a), Value::Float(b)) => Value::Float(a + b),
1878 (Value::String(mut a), Value::String(b)) => {
1879 a.push_str(&b);
1880 Value::String(a)
1881 }
1882 _ => Value::Null,
1883 },
1884 ast::BinaryOp::Sub => match (left, right) {
1885 (Value::Int(a), Value::Int(b)) => Value::Int(a - b),
1886 (Value::Float(a), Value::Float(b)) => Value::Float(a - b),
1887 _ => Value::Null,
1888 },
1889 ast::BinaryOp::Mul => match (left, right) {
1890 (Value::Int(a), Value::Int(b)) => Value::Int(a * b),
1891 (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 * b),
1892 (Value::Float(a), Value::Int(b)) => Value::Float(a * b as f64),
1893 (Value::Float(a), Value::Float(b)) => Value::Float(a * b),
1894 _ => Value::Null,
1895 },
1896 ast::BinaryOp::Div => match (left, right) {
1897 (Value::Int(a), Value::Int(b)) if b != 0 => Value::Int(a / b),
1898 (Value::Float(a), Value::Float(b)) if b != 0.0 => Value::Float(a / b),
1899 _ => Value::Null,
1900 },
1901 ast::BinaryOp::Eq => Value::Bool(left == right),
1902 ast::BinaryOp::Ne => Value::Bool(left != right),
1903 ast::BinaryOp::Gt => Value::Bool(left > right),
1904 ast::BinaryOp::Gte => Value::Bool(left >= right),
1905 ast::BinaryOp::Lt => Value::Bool(left < right),
1906 ast::BinaryOp::Lte => Value::Bool(left <= right),
1907 ast::BinaryOp::And => {
1908 let lb = matches!(left, Value::Bool(true));
1909 let rb = matches!(right, Value::Bool(true));
1910 Value::Bool(lb && rb)
1911 }
1912 ast::BinaryOp::Or => {
1913 let lb = matches!(left, Value::Bool(true));
1914 let rb = matches!(right, Value::Bool(true));
1915 Value::Bool(lb || rb)
1916 }
1917 _ => Value::Null,
1918 }
1919}
1920
1921fn eval_unary_op(op: ast::UnaryOp, val: Value) -> Value {
1922 match op {
1923 ast::UnaryOp::Not => match val {
1924 Value::Bool(b) => Value::Bool(!b),
1925 Value::Null => Value::Bool(true),
1926 _ => Value::Bool(false),
1927 },
1928 ast::UnaryOp::Neg => match val {
1929 Value::Int(i) => Value::Int(-i),
1930 Value::Float(f) => Value::Float(-f),
1931 _ => Value::Null,
1932 },
1933 }
1934}
1935
1936fn eval_function_call(name: &str, args: Vec<Value>) -> Value {
1937 match name {
1938 "upper" | "uppercase" => {
1939 if let Some(Value::String(s)) = args.get(0) {
1940 Value::String(s.to_uppercase())
1941 } else {
1942 Value::Null
1943 }
1944 }
1945 "lower" | "lowercase" => {
1946 if let Some(Value::String(s)) = args.get(0) {
1947 Value::String(s.to_lowercase())
1948 } else {
1949 Value::Null
1950 }
1951 }
1952 "len" | "length" => match args.get(0) {
1953 Some(Value::String(s)) => Value::Int(s.len() as i64),
1954 Some(Value::Array(a)) => Value::Int(a.len() as i64),
1955 Some(Value::Object(o)) => Value::Int(o.len() as i64),
1956 _ => Value::Null,
1957 },
1958 _ => Value::Null,
1959 }
1960}
1961
1962fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
1964 let mut result = template.to_string();
1965 for (k, v) in data {
1966 let p1 = format!("${{{}}}", k);
1967 let p2 = format!("${}", k);
1968 let v_str = match v {
1969 Value::String(s) => s.clone(),
1970 _ => v.to_string(),
1971 };
1972 if result.contains(&p1) {
1973 result = result.replace(&p1, &v_str);
1974 }
1975 if result.contains(&p2) {
1976 result = result.replace(&p2, &v_str);
1977 }
1978 }
1979 result
1980}
1981
1982fn apply_window_function(
1984 docs: &mut Vec<Document>,
1985 alias: &str,
1986 field: &str,
1987 function: &str,
1988 window: usize,
1989) {
1990 if docs.is_empty() || window == 0 {
1991 return;
1992 }
1993 let values: Vec<Option<f64>> = docs
1994 .iter()
1995 .map(|d| match d.data.get(field) {
1996 Some(Value::Int(i)) => Some(*i as f64),
1997 Some(Value::Float(f)) => Some(*f),
1998 _ => None,
1999 })
2000 .collect();
2001
2002 for (i, doc) in docs.iter_mut().enumerate() {
2003 let start = if i + 1 >= window { i + 1 - window } else { 0 };
2004 let window_vals: Vec<f64> = values[start..=i].iter().filter_map(|v| *v).collect();
2005 if window_vals.is_empty() {
2006 continue;
2007 }
2008 let result = match function {
2009 "rollingAvg" | "avg" => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
2010 "rollingSum" | "sum" => window_vals.iter().sum::<f64>(),
2011 "rollingMin" | "min" => window_vals.iter().cloned().fold(f64::INFINITY, f64::min),
2012 "rollingMax" | "max" => window_vals
2013 .iter()
2014 .cloned()
2015 .fold(f64::NEG_INFINITY, f64::max),
2016 _ => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
2017 };
2018 doc.data.insert(alias.to_string(), Value::Float(result));
2019 }
2020}
2021
2022fn apply_downsample(
2024 docs: Vec<Document>,
2025 interval: &str,
2026 aggregation: &str,
2027 value_fields: &[String],
2028) -> Vec<Document> {
2029 let interval_secs: i64 = parse_interval(interval);
2030 if interval_secs <= 0 {
2031 return docs;
2032 }
2033
2034 let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
2036 std::collections::BTreeMap::new();
2037 let mut leftover = Vec::new();
2038
2039 for doc in docs {
2040 let ts = ["timestamp", "ts", "created_at", "time"]
2042 .iter()
2043 .find_map(|&k| doc.data.get(k))
2044 .and_then(|v| match v {
2045 Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
2046 .ok()
2047 .map(|dt| dt.timestamp()),
2048 Value::Int(i) => Some(*i),
2049 _ => None,
2050 });
2051
2052 if let Some(t) = ts {
2053 let bucket = (t / interval_secs) * interval_secs;
2054 buckets.entry(bucket).or_default().push(doc);
2055 } else {
2056 leftover.push(doc);
2057 }
2058 }
2059
2060 let mut result = Vec::new();
2061 for (bucket_ts, group) in buckets {
2062 let mut data = HashMap::new();
2063 data.insert(
2064 "timestamp".to_string(),
2065 Value::String(
2066 chrono::DateTime::from_timestamp(bucket_ts, 0)
2067 .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
2068 .unwrap_or_default(),
2069 ),
2070 );
2071 data.insert("count".to_string(), Value::Int(group.len() as i64));
2072
2073 for field in value_fields {
2074 if field == "timestamp" || field == "count" {
2075 continue;
2076 }
2077 let nums: Vec<f64> = group
2078 .iter()
2079 .filter_map(|d| match d.data.get(field) {
2080 Some(Value::Int(i)) => Some(*i as f64),
2081 Some(Value::Float(f)) => Some(*f),
2082 _ => None,
2083 })
2084 .collect();
2085 if nums.is_empty() {
2086 continue;
2087 }
2088 let agg = match aggregation {
2089 "sum" => nums.iter().sum::<f64>(),
2090 "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
2091 "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
2092 "count" => nums.len() as f64,
2093 _ => nums.iter().sum::<f64>() / nums.len() as f64, };
2095 data.insert(field.clone(), Value::Float(agg));
2096 }
2097
2098 result.push(Document {
2099 _sid: bucket_ts.to_string(),
2100 data,
2101 });
2102 }
2103
2104 result.extend(leftover);
2105 result
2106}
2107
/// Parses an interval such as `30s`, `5m`, `1h` or `2d` into seconds.
///
/// Surrounding whitespace is ignored. With a unit suffix (`s`, `m`, `h`, `d`), a numeric
/// part that does not parse, or whose conversion to seconds would overflow `i64`, yields
/// `0`. Without a suffix the text is read as a plain number of seconds, falling back to
/// `3600` when it does not parse.
fn parse_interval(s: &str) -> i64 {
    const UNITS: [(char, i64); 4] = [('s', 1), ('m', 60), ('h', 3600), ('d', 86400)];

    let s = s.trim();
    for &(suffix, seconds_per_unit) in UNITS.iter() {
        if let Some(number) = s.strip_suffix(suffix) {
            return number
                .parse::<i64>()
                .ok()
                .and_then(|n| n.checked_mul(seconds_per_unit))
                .unwrap_or(0);
        }
    }
    s.parse().unwrap_or(3600)
}
2122
2123async fn execute_handler_registration(
2126 db: &Aurora,
2127 handler: &ast::HandlerDef,
2128 _options: &ExecutionOptions,
2129) -> Result<ExecutionResult> {
2130 use crate::pubsub::events::ChangeType;
2131
2132 let collection = match &handler.trigger {
2133 ast::HandlerTrigger::Insert { collection }
2134 | ast::HandlerTrigger::Update { collection }
2135 | ast::HandlerTrigger::Delete { collection } => {
2136 collection.as_deref().unwrap_or("*").to_string()
2137 }
2138 _ => "*".to_string(),
2139 };
2140
2141 let trigger_type = match &handler.trigger {
2142 ast::HandlerTrigger::Insert { .. } => Some(ChangeType::Insert),
2143 ast::HandlerTrigger::Update { .. } => Some(ChangeType::Update),
2144 ast::HandlerTrigger::Delete { .. } => Some(ChangeType::Delete),
2145 _ => None,
2146 };
2147
2148 let mut listener = if collection == "*" {
2149 db.pubsub.listen_all()
2150 } else {
2151 db.pubsub.listen(collection.clone())
2152 };
2153
2154 let db_clone = db.clone();
2155 let action = handler.action.clone();
2156 let handler_name = handler.name.clone();
2157
2158 tokio::spawn(async move {
2159 loop {
2160 match listener.recv().await {
2161 Ok(event) => {
2162 let matches = trigger_type
2164 .as_ref()
2165 .map(|t| &event.change_type == t)
2166 .unwrap_or(true);
2167 if !matches {
2168 continue;
2169 }
2170
2171 let mut vars = HashMap::new();
2175 vars.insert("_id".to_string(), ast::Value::String(event._sid.clone()));
2176 if let Some(doc) = &event.document {
2177 for (k, v) in &doc.data {
2179 vars.insert(format!("_{}", k), db_value_to_ast_value(v));
2180 }
2181 }
2182
2183 let _ = execute_mutation_op(
2184 &db_clone,
2185 &action,
2186 &vars,
2187 &HashMap::new(),
2188 &ExecutionOptions::default(),
2189 &HashMap::new(),
2190 )
2191 .await;
2192 }
2193 Err(_) => {
2194 eprintln!("[handler:{}] channel closed, stopping", handler_name);
2195 break;
2196 }
2197 }
2198 }
2199 });
2200
2201 eprintln!(
2202 "[handler] '{}' registered on '{}'",
2203 handler.name, collection
2204 );
2205
2206 let mut data = HashMap::new();
2207 data.insert("name".to_string(), Value::String(handler.name.clone()));
2208 data.insert("collection".to_string(), Value::String(collection));
2209 data.insert(
2210 "status".to_string(),
2211 Value::String("registered".to_string()),
2212 );
2213
2214 Ok(ExecutionResult::Query(QueryResult {
2215 collection: "__handler".to_string(),
2216 documents: vec![Document {
2217 _sid: handler.name.clone(),
2218 data,
2219 }],
2220 total_count: Some(1),
2221 deferred_fields: vec![],
2222 explain: None,
2223 }))
2224}
2225
2226fn db_value_to_ast_value(v: &Value) -> ast::Value {
2227 match v {
2228 Value::Null => ast::Value::Null,
2229 Value::Bool(b) => ast::Value::Boolean(*b),
2230 Value::Int(i) => ast::Value::Int(*i),
2231 Value::Float(f) => ast::Value::Float(*f),
2232 Value::String(s) => ast::Value::String(s.clone()),
2233 Value::Uuid(u) => ast::Value::String(u.to_string()),
2234 Value::DateTime(dt) => ast::Value::String(dt.to_rfc3339()),
2235 Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_ast_value).collect()),
2236 Value::Object(m) => ast::Value::Object(
2237 m.iter()
2238 .map(|(k, v)| (k.clone(), db_value_to_ast_value(v)))
2239 .collect(),
2240 ),
2241 }
2242}
2243
2244async fn execute_mutation(
2245 db: &Aurora,
2246 mutation: &ast::Mutation,
2247 vars: &HashMap<String, ast::Value>,
2248 options: &ExecutionOptions,
2249 fragments: &HashMap<String, FragmentDef>,
2250) -> Result<ExecutionResult> {
2251 use crate::transaction::ACTIVE_TRANSACTION_ID;
2252
2253 validate_required_variables(&mutation.variable_definitions, vars)?;
2254
2255 check_permissions(&mutation.directives, options, true)?;
2257
2258 let already_in_tx = ACTIVE_TRANSACTION_ID
2263 .try_with(|id| *id)
2264 .ok()
2265 .and_then(|id| db.transaction_manager.active_transactions.get(&id))
2266 .is_some();
2267
2268 if already_in_tx {
2269 let mut results = Vec::new();
2270 let mut context = HashMap::new();
2271 for mut_op in &mutation.operations {
2272 let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
2273 if let Some(alias) = &mut_op.alias {
2274 if let Some(doc) = res.returned_documents.first() {
2275 let mut m = serde_json::Map::new();
2276 for (k, v) in &doc.data {
2277 m.insert(k.clone(), aurora_value_to_json_value(v));
2278 }
2279 m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
2280 context.insert(alias.clone(), JsonValue::Object(m));
2281 }
2282 }
2283 results.push(res);
2284 }
2285 return if results.len() == 1 {
2286 Ok(ExecutionResult::Mutation(results.remove(0)))
2287 } else {
2288 Ok(ExecutionResult::Batch(
2289 results.into_iter().map(ExecutionResult::Mutation).collect(),
2290 ))
2291 };
2292 }
2293
2294 let tx_id = db.begin_transaction().await;
2297
2298 let exec_result = ACTIVE_TRANSACTION_ID
2299 .scope(tx_id, async {
2300 let mut results = Vec::new();
2301 let mut context = HashMap::new();
2302 for mut_op in &mutation.operations {
2303 let res =
2304 execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
2305 if let Some(alias) = &mut_op.alias {
2306 if let Some(doc) = res.returned_documents.first() {
2307 let mut m = serde_json::Map::new();
2308 for (k, v) in &doc.data {
2309 m.insert(k.clone(), aurora_value_to_json_value(v));
2310 }
2311 m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
2312 context.insert(alias.clone(), JsonValue::Object(m));
2313 }
2314 }
2315 results.push(res);
2316 }
2317 Ok::<_, crate::error::AqlError>(results)
2318 })
2319 .await;
2320
2321 match exec_result {
2322 Ok(mut results) => {
2323 db.commit_transaction(tx_id).await?;
2324 if results.len() == 1 {
2325 Ok(ExecutionResult::Mutation(results.remove(0)))
2326 } else {
2327 Ok(ExecutionResult::Batch(
2328 results.into_iter().map(ExecutionResult::Mutation).collect(),
2329 ))
2330 }
2331 }
2332 Err(e) => {
2333 let _ = db.rollback_transaction(tx_id).await;
2334 Err(e)
2335 }
2336 }
2337}
2338
2339fn execute_mutation_op<'a>(
2340 db: &'a Aurora,
2341 mut_op: &'a ast::MutationOperation,
2342 variables: &'a HashMap<String, ast::Value>,
2343 context: &'a ExecutionContext,
2344 options: &'a ExecutionOptions,
2345 fragments: &'a HashMap<String, FragmentDef>,
2346) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
2347 use futures::future::FutureExt;
2348 async move {
2349 check_permissions(&mut_op.directives, options, true)?;
2351
2352 match &mut_op.operation {
2353 MutationOp::Insert { collection, data } => {
2354 let resolved = resolve_value(data, variables, context);
2355 let doc = db
2356 .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
2357 .await?;
2358 let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
2359 let fields = collect_fields(
2360 &mut_op.selection_set,
2361 fragments,
2362 variables,
2363 Some(collection),
2364 )
2365 .unwrap_or_default();
2366 vec![apply_projection(doc, &fields, options)?]
2367 } else {
2368 vec![doc]
2369 };
2370
2371 Ok(MutationResult {
2372 operation: "insert".to_string(),
2373 collection: collection.clone(),
2374 affected_count: 1,
2375 returned_documents: returned,
2376 })
2377 }
2378 MutationOp::Update {
2379 collection,
2380 filter,
2381 data,
2382 } => {
2383 let cf = if let Some(f) = filter {
2384 Some(compile_filter(f)?)
2385 } else {
2386 None
2387 };
2388 let update_data =
2389 aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
2390
2391 let mut affected = 0;
2393 let mut returned = Vec::new();
2394
2395 let vars_arc = Arc::new(variables.clone());
2398 let cf_arc = cf.map(Arc::new);
2399
2400 let matches = db.scan_and_filter(
2401 collection,
2402 |doc| {
2403 if let Some(ref filter) = cf_arc {
2404 matches_filter(doc, filter, &vars_arc)
2405 } else {
2406 true
2407 }
2408 },
2409 None,
2410 )?;
2411
2412 let fields = if !mut_op.selection_set.is_empty() {
2413 Some(
2414 collect_fields(
2415 &mut_op.selection_set,
2416 fragments,
2417 variables,
2418 Some(collection),
2419 )
2420 .unwrap_or_default(),
2421 )
2422 } else {
2423 None
2424 };
2425
2426 for doc in matches {
2427 let mut new_data = doc.data.clone();
2428 for (k, v) in &update_data {
2429 let applied = apply_field_modifier(new_data.get(k), v);
2430 new_data.insert(k.clone(), applied);
2431 }
2432
2433 let updated_doc = db
2434 .aql_update_document(collection, &doc._sid, new_data)
2435 .await?;
2436
2437 affected += 1;
2438 if let Some(ref f) = fields {
2439 returned.push(apply_projection(updated_doc, f, options)?);
2440 }
2441 }
2442
2443 Ok(MutationResult {
2444 operation: "update".to_string(),
2445 collection: collection.clone(),
2446 affected_count: affected,
2447 returned_documents: returned,
2448 })
2449 }
2450 MutationOp::Delete { collection, filter } => {
2451 let cf = if let Some(f) = filter {
2452 Some(compile_filter(f)?)
2453 } else {
2454 None
2455 };
2456
2457 let mut affected = 0;
2458 let vars_arc = Arc::new(variables.clone());
2459 let cf_arc = cf.map(Arc::new);
2460
2461 let matches = db.scan_and_filter(
2462 collection,
2463 |doc| {
2464 if let Some(ref filter) = cf_arc {
2465 matches_filter(doc, filter, &vars_arc)
2466 } else {
2467 true
2468 }
2469 },
2470 None,
2471 )?;
2472
2473 for doc in matches {
2474 db.aql_delete_document(collection, &doc._sid).await?;
2475 affected += 1;
2476 }
2477
2478 Ok(MutationResult {
2479 operation: "delete".to_string(),
2480 collection: collection.clone(),
2481 affected_count: affected,
2482 returned_documents: vec![],
2483 })
2484 }
2485 MutationOp::InsertMany { collection, data } => {
2486 let resolved = resolve_value(data, variables, context);
2487 let items = match resolved {
2488 ast::Value::Array(arr) => arr,
2489 _ => {
2490 return Err(AqlError::new(
2491 ErrorCode::QueryError,
2492 "insertMany data must be an array".to_string(),
2493 ));
2494 }
2495 };
2496
2497 let mut affected = 0;
2498 let mut returned = Vec::new();
2499 for item in items {
2500 let doc = db
2501 .aql_insert(collection, aql_value_to_hashmap(&item, variables)?)
2502 .await?;
2503 affected += 1;
2504 if !mut_op.selection_set.is_empty() && options.apply_projections {
2505 let fields = collect_fields(
2506 &mut_op.selection_set,
2507 fragments,
2508 variables,
2509 Some(collection),
2510 )
2511 .unwrap_or_default();
2512 returned.push(apply_projection(doc, &fields, options)?);
2513 } else {
2514 returned.push(doc);
2515 }
2516 }
2517 Ok(MutationResult {
2518 operation: "insertMany".to_string(),
2519 collection: collection.clone(),
2520 affected_count: affected,
2521 returned_documents: returned,
2522 })
2523 }
2524 MutationOp::Upsert {
2525 collection,
2526 filter,
2527 data,
2528 } => {
2529 let update_data =
2530 aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
2531 let cf = if let Some(f) = filter {
2532 Some(compile_filter(f)?)
2533 } else {
2534 None
2535 };
2536 let vars_arc = Arc::new(variables.clone());
2537 let cf_arc = cf.map(Arc::new);
2538 let matches = db.scan_and_filter(
2539 collection,
2540 |doc| {
2541 if let Some(ref filter) = cf_arc {
2542 matches_filter(doc, filter, &vars_arc)
2543 } else {
2544 true
2545 }
2546 },
2547 Some(1),
2548 )?;
2549 let doc = if let Some(existing) = matches.into_iter().next() {
2550 db.aql_update_document(collection, &existing._sid, update_data)
2551 .await?
2552 } else {
2553 db.aql_insert(collection, update_data).await?
2554 };
2555
2556 let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
2557 let fields = collect_fields(
2558 &mut_op.selection_set,
2559 fragments,
2560 variables,
2561 Some(collection),
2562 )
2563 .unwrap_or_default();
2564 vec![apply_projection(doc, &fields, options)?]
2565 } else {
2566 vec![doc]
2567 };
2568
2569 Ok(MutationResult {
2570 operation: "upsert".to_string(),
2571 collection: collection.clone(),
2572 affected_count: 1,
2573 returned_documents: returned,
2574 })
2575 }
2576 MutationOp::Transaction { operations } => {
2577 let mut all_returned = Vec::new();
2578 let mut total_affected = 0;
2579 for op in operations {
2580 let res =
2581 execute_mutation_op(db, op, variables, context, options, fragments).await?;
2582 total_affected += res.affected_count;
2583 all_returned.extend(res.returned_documents);
2584 }
2585 Ok(MutationResult {
2586 operation: "transaction".to_string(),
2587 collection: String::new(),
2588 affected_count: total_affected,
2589 returned_documents: all_returned,
2590 })
2591 }
2592 MutationOp::EnqueueJobs {
2593 job_type,
2594 payloads,
2595 priority,
2596 max_retries,
2597 } => {
2598 let workers = db.workers.as_ref().ok_or_else(|| {
2599 AqlError::new(
2600 ErrorCode::QueryError,
2601 "Worker system not initialised".to_string(),
2602 )
2603 })?;
2604 let job_priority = match priority {
2605 ast::JobPriority::Low => crate::workers::JobPriority::Low,
2606 ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2607 ast::JobPriority::High => crate::workers::JobPriority::High,
2608 ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2609 };
2610 let mut returned = Vec::new();
2611 for payload in payloads {
2612 let resolved = resolve_value(payload, variables, context);
2613 let json_payload: std::collections::HashMap<String, serde_json::Value> =
2614 if let ast::Value::Object(map) = &resolved {
2615 map.iter()
2616 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2617 .collect()
2618 } else {
2619 std::collections::HashMap::new()
2620 };
2621 let mut job =
2622 crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2623 for (k, v) in json_payload {
2624 job = job.add_field(k, v);
2625 }
2626 if let Some(retries) = max_retries {
2627 job = job.with_max_retries(*retries);
2628 }
2629 let job_id = workers.enqueue(job).await?;
2630 let mut doc = crate::types::Document::new();
2631 doc._sid = job_id.clone();
2632 doc.data.insert("job_id".to_string(), Value::String(job_id));
2633 doc.data
2634 .insert("job_type".to_string(), Value::String(job_type.clone()));
2635 doc.data
2636 .insert("status".to_string(), Value::String("pending".to_string()));
2637 returned.push(doc);
2638 }
2639 let count = returned.len();
2640 Ok(MutationResult {
2641 operation: "enqueueJobs".to_string(),
2642 collection: "__jobs".to_string(),
2643 affected_count: count,
2644 returned_documents: returned,
2645 })
2646 }
2647 MutationOp::Import { collection, data } => {
2648 let mut affected = 0;
2649 let mut returned = Vec::new();
2650 let fields = if !mut_op.selection_set.is_empty() {
2651 Some(
2652 collect_fields(
2653 &mut_op.selection_set,
2654 fragments,
2655 variables,
2656 Some(collection),
2657 )
2658 .unwrap_or_default(),
2659 )
2660 } else {
2661 None
2662 };
2663 for item in data {
2664 let resolved = resolve_value(item, variables, context);
2665 let map = aql_value_to_hashmap(&resolved, variables)?;
2666 let doc = db.aql_insert(collection, map).await?;
2667 affected += 1;
2668 if let Some(ref f) = fields {
2669 returned.push(apply_projection(doc, f, options)?);
2670 }
2671 }
2672 Ok(MutationResult {
2673 operation: "import".to_string(),
2674 collection: collection.clone(),
2675 affected_count: affected,
2676 returned_documents: returned,
2677 })
2678 }
2679 MutationOp::Export {
2680 collection,
2681 format: _,
2682 } => {
2683 let docs = db.scan_and_filter(collection, |_| true, None)?;
2684 let fields = if !mut_op.selection_set.is_empty() {
2685 Some(
2686 collect_fields(
2687 &mut_op.selection_set,
2688 fragments,
2689 variables,
2690 Some(collection),
2691 )
2692 .unwrap_or_default(),
2693 )
2694 } else {
2695 None
2696 };
2697 let mut returned = Vec::with_capacity(docs.len());
2698 if let Some(ref f) = fields {
2699 for d in docs {
2700 returned.push(apply_projection(d, f, options)?);
2701 }
2702 } else {
2703 returned = docs;
2704 }
2705 let count = returned.len();
2706 Ok(MutationResult {
2707 operation: "export".to_string(),
2708 collection: collection.clone(),
2709 affected_count: count,
2710 returned_documents: returned,
2711 })
2712 }
2713 MutationOp::EnqueueJob {
2714 job_type,
2715 payload,
2716 priority,
2717 scheduled_at,
2718 max_retries,
2719 } => {
2720 let workers = db.workers.as_ref().ok_or_else(|| {
2721 AqlError::new(
2722 ErrorCode::QueryError,
2723 "Worker system not initialised".to_string(),
2724 )
2725 })?;
2726
2727 let job_priority = match priority {
2729 ast::JobPriority::Low => crate::workers::JobPriority::Low,
2730 ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2731 ast::JobPriority::High => crate::workers::JobPriority::High,
2732 ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2733 };
2734
2735 let resolved = resolve_value(payload, variables, context);
2737 let json_payload: std::collections::HashMap<String, serde_json::Value> =
2738 if let ast::Value::Object(map) = &resolved {
2739 map.iter()
2740 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2741 .collect()
2742 } else {
2743 std::collections::HashMap::new()
2744 };
2745
2746 let mut job =
2747 crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2748
2749 for (k, v) in json_payload {
2750 job = job.add_field(k, v);
2751 }
2752
2753 if let Some(retries) = max_retries {
2754 job = job.with_max_retries(*retries);
2755 }
2756
2757 if let Some(scheduled) = scheduled_at {
2758 if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
2759 job = job.scheduled_at(dt);
2760 }
2761 }
2762
2763 let job_id = workers.enqueue(job).await?;
2764
2765 let mut doc = crate::types::Document::new();
2766 doc._sid = job_id.clone();
2767 doc.data
2768 .insert("job_id".to_string(), crate::types::Value::String(job_id));
2769 doc.data.insert(
2770 "job_type".to_string(),
2771 crate::types::Value::String(job_type.clone()),
2772 );
2773 doc.data.insert(
2774 "status".to_string(),
2775 crate::types::Value::String("pending".to_string()),
2776 );
2777
2778 Ok(MutationResult {
2779 operation: "enqueueJob".to_string(),
2780 collection: "__jobs".to_string(),
2781 affected_count: 1,
2782 returned_documents: vec![doc],
2783 })
2784 }
2785 }
2786 }
2787 .boxed()
2788}
2789
2790async fn execute_subscription(
2791 db: &Aurora,
2792 sub: &ast::Subscription,
2793 vars: &HashMap<String, ast::Value>,
2794 _options: &ExecutionOptions,
2795) -> Result<ExecutionResult> {
2796 let vars: HashMap<String, ast::Value> = vars.clone();
2797
2798 let selection = sub.selection_set.first().ok_or_else(|| {
2799 AqlError::new(
2800 ErrorCode::QueryError,
2801 "Subscription must have a selection".to_string(),
2802 )
2803 })?;
2804
2805 if let Selection::Field(f) = selection {
2806 let collection = f.name.clone();
2807 let filter = extract_filter_from_args(&f.arguments)?;
2808
2809 let mut listener = db.pubsub.listen(&collection);
2810
2811 if let Some(f) = filter {
2812 let event_filter = ast_filter_to_event_filter(&f, &vars)?;
2813 listener = listener.filter(event_filter);
2814 }
2815
2816 Ok(ExecutionResult::Subscription(SubscriptionResult {
2817 subscription_id: uuid::Uuid::new_v4().to_string(),
2818 collection,
2819 stream: Some(listener),
2820 }))
2821 } else {
2822 Err(AqlError::new(
2823 ErrorCode::QueryError,
2824 "Invalid subscription selection".to_string(),
2825 ))
2826 }
2827}
2828
2829fn ast_filter_to_event_filter(
2830 filter: &AqlFilter,
2831 vars: &HashMap<String, ast::Value>,
2832) -> Result<crate::pubsub::EventFilter> {
2833 use crate::pubsub::EventFilter;
2834
2835 match filter {
2836 AqlFilter::Eq(f, v) => Ok(EventFilter::FieldEquals(
2837 f.clone(),
2838 aql_value_to_db_value(v, vars)?,
2839 )),
2840 AqlFilter::Ne(f, v) => Ok(EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?)),
2841 AqlFilter::Gt(f, v) => Ok(EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?)),
2842 AqlFilter::Gte(f, v) => Ok(EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?)),
2843 AqlFilter::Lt(f, v) => Ok(EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?)),
2844 AqlFilter::Lte(f, v) => Ok(EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?)),
2845 AqlFilter::In(f, v) => Ok(EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?)),
2846 AqlFilter::NotIn(f, v) => Ok(EventFilter::NotIn(
2847 f.clone(),
2848 aql_value_to_db_value(v, vars)?,
2849 )),
2850 AqlFilter::Contains(f, v) => Ok(EventFilter::Contains(
2851 f.clone(),
2852 aql_value_to_db_value(v, vars)?,
2853 )),
2854 AqlFilter::ContainsAny(f, v) | AqlFilter::ContainsAll(f, v) => Ok(EventFilter::Contains(
2856 f.clone(),
2857 aql_value_to_db_value(v, vars)?,
2858 )),
2859 AqlFilter::StartsWith(f, v) => Ok(EventFilter::StartsWith(
2860 f.clone(),
2861 aql_value_to_db_value(v, vars)?,
2862 )),
2863 AqlFilter::EndsWith(f, v) => Ok(EventFilter::EndsWith(
2864 f.clone(),
2865 aql_value_to_db_value(v, vars)?,
2866 )),
2867 AqlFilter::Matches(f, v) => {
2868 let pattern = match aql_value_to_db_value(v, vars)? {
2869 crate::types::Value::String(s) => s,
2870 other => other.to_string(),
2871 };
2872 let re = regex::Regex::new(&pattern).map_err(|e| {
2873 crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
2874 })?;
2875 Ok(EventFilter::Matches(f.clone(), re))
2876 }
2877 AqlFilter::IsNull(f) => Ok(EventFilter::IsNull(f.clone())),
2878 AqlFilter::IsNotNull(f) => Ok(EventFilter::IsNotNull(f.clone())),
2879 AqlFilter::And(filters) => {
2880 let mut mapped = Vec::new();
2881 for f in filters {
2882 mapped.push(ast_filter_to_event_filter(f, vars)?);
2883 }
2884 Ok(EventFilter::And(mapped))
2885 }
2886 AqlFilter::Or(filters) => {
2887 let mut mapped = Vec::new();
2888 for f in filters {
2889 mapped.push(ast_filter_to_event_filter(f, vars)?);
2890 }
2891 Ok(EventFilter::Or(mapped))
2892 }
2893 AqlFilter::Not(f) => Ok(EventFilter::Not(Box::new(ast_filter_to_event_filter(
2894 f, vars,
2895 )?))),
2896 }
2897}
2898
2899async fn execute_introspection(
2900 db: &Aurora,
2901 intro: &ast::IntrospectionQuery,
2902) -> Result<ExecutionResult> {
2903 let names = db.list_collection_names();
2904 let want_fields = intro.fields.is_empty()
2905 || intro
2906 .fields
2907 .iter()
2908 .any(|f| f == "collections" || f == "fields");
2909
2910 let documents: Vec<Document> = names
2911 .iter()
2912 .filter_map(|name| {
2913 if name.starts_with('_') {
2915 return None;
2916 }
2917 let col = db.get_collection_definition(name).ok()?;
2918 let mut data = HashMap::new();
2919 data.insert("name".to_string(), Value::String(name.clone()));
2920
2921 if want_fields {
2922 let field_list: Vec<Value> = col
2923 .fields
2924 .iter()
2925 .map(|(fname, fdef)| {
2926 let mut fdata = HashMap::new();
2927 fdata.insert("name".to_string(), Value::String(fname.clone()));
2928 fdata.insert(
2929 "type".to_string(),
2930 Value::String(fdef.field_type.to_string()),
2931 );
2932 fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
2933 fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
2934 fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
2935 if !fdef.validations.is_empty() {
2936 let vcons: Vec<Value> = fdef
2937 .validations
2938 .iter()
2939 .map(|c| Value::String(format!("{:?}", c)))
2940 .collect();
2941 fdata.insert("validations".to_string(), Value::Array(vcons));
2942 }
2943 Value::Object(fdata)
2944 })
2945 .collect();
2946 data.insert("fields".to_string(), Value::Array(field_list));
2947 }
2948
2949 Some(Document {
2950 _sid: name.clone(),
2951 data,
2952 })
2953 })
2954 .collect();
2955
2956 let count = documents.len();
2957 Ok(ExecutionResult::Query(QueryResult {
2958 collection: "__schema".to_string(),
2959 documents,
2960 total_count: Some(count),
2961 deferred_fields: vec![],
2962 explain: None,
2963 }))
2964}
2965
2966async fn execute_schema(
2967 db: &Aurora,
2968 schema: &ast::Schema,
2969 _options: &ExecutionOptions,
2970) -> Result<ExecutionResult> {
2971 let mut last_collection = String::new();
2972
2973 for op in &schema.operations {
2974 match op {
2975 ast::SchemaOp::DefineCollection {
2976 name,
2977 fields,
2978 if_not_exists,
2979 ..
2980 } => {
2981 last_collection = name.clone();
2982
2983 if *if_not_exists {
2985 if db.get_collection_definition(name).is_ok() {
2986 continue;
2987 }
2988 }
2989
2990 let mut field_defs = Vec::new();
2991 for field in fields {
2992 field_defs.push((field.name.as_str(), build_field_def(field)));
2993 }
2994 db.new_collection(name, field_defs).await?;
2995 }
2996 ast::SchemaOp::AlterCollection { name, actions } => {
2997 last_collection = name.clone();
2998 for action in actions {
2999 match action {
3000 ast::AlterAction::AddField { field, default } => {
3001 let def = build_field_def(field);
3002 db.add_field_to_schema(name, field.name.clone(), def)
3003 .await?;
3004 if let Some(default_val) = default {
3005 let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
3006 let docs = db.get_all_collection(name).await?;
3007 for doc in docs {
3008 if !doc.data.contains_key(&field.name) {
3009 db.update_document(
3010 name,
3011 &doc._sid,
3012 vec![(&field.name, db_val.clone())],
3013 )
3014 .await?;
3015 }
3016 }
3017 }
3018 }
3019 ast::AlterAction::DropField(field_name) => {
3020 db.drop_field_from_schema(name, field_name.clone()).await?;
3021 }
3022 ast::AlterAction::RenameField { from, to } => {
3023 db.rename_field_in_schema(name, from.clone(), to.clone())
3024 .await?;
3025 let docs = db.get_all_collection(name).await?;
3026 for mut doc in docs {
3027 if let Some(val) = doc.data.remove(from.as_str()) {
3028 doc.data.insert(to.clone(), val);
3029 let key = format!("{}:{}", name, doc._sid);
3030 db.put(key, serde_json::to_vec(&doc)?, None).await?;
3031 }
3032 }
3033 }
3034 ast::AlterAction::ModifyField(field) => {
3035 db.modify_field_in_schema(
3036 name,
3037 field.name.clone(),
3038 build_field_def(field),
3039 )
3040 .await?;
3041 }
3042 }
3043 }
3044 }
3045 ast::SchemaOp::DropCollection { name, .. } => {
3046 db.drop_collection_schema(name).await?;
3047 last_collection = name.clone();
3048 }
3049 }
3050 }
3051
3052 Ok(ExecutionResult::Schema(SchemaResult {
3053 operation: "schema".to_string(),
3054 collection: last_collection,
3055 status: "done".to_string(),
3056 }))
3057}
3058
3059fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
3060 let scalar = match anno.name.to_lowercase().as_str() {
3061 "string" => ScalarType::String,
3062 "int" | "integer" => ScalarType::Int,
3063 "float" | "double" => ScalarType::Float,
3064 "bool" | "boolean" => ScalarType::Bool,
3065 "uuid" => ScalarType::Uuid,
3066 "object" => ScalarType::Object,
3067 "array" => ScalarType::Array,
3068 _ => ScalarType::Any,
3069 };
3070
3071 if anno.is_array {
3072 FieldType::Array(scalar)
3073 } else {
3074 match scalar {
3075 ScalarType::Object => FieldType::Object,
3076 ScalarType::Any => FieldType::Any,
3077 _ => FieldType::Scalar(scalar),
3078 }
3079 }
3080}
3081
3082fn parse_validate_directive(
3084 directive: &ast::Directive,
3085) -> Vec<crate::types::FieldValidationConstraint> {
3086 use crate::types::FieldValidationConstraint as FVC;
3087 let mut constraints = Vec::new();
3088 for arg in &directive.arguments {
3089 match arg.name.as_str() {
3090 "format" => {
3091 if let ast::Value::String(s) = &arg.value {
3092 constraints.push(FVC::Format(s.clone()));
3093 }
3094 }
3095 "min" => {
3096 let n = match &arg.value {
3097 ast::Value::Float(f) => Some(*f),
3098 ast::Value::Int(i) => Some(*i as f64),
3099 _ => None,
3100 };
3101 if let Some(n) = n {
3102 constraints.push(FVC::Min(n));
3103 }
3104 }
3105 "max" => {
3106 let n = match &arg.value {
3107 ast::Value::Float(f) => Some(*f),
3108 ast::Value::Int(i) => Some(*i as f64),
3109 _ => None,
3110 };
3111 if let Some(n) = n {
3112 constraints.push(FVC::Max(n));
3113 }
3114 }
3115 "minLength" => {
3116 if let ast::Value::Int(i) = &arg.value {
3117 constraints.push(FVC::MinLength(*i));
3118 }
3119 }
3120 "maxLength" => {
3121 if let ast::Value::Int(i) = &arg.value {
3122 constraints.push(FVC::MaxLength(*i));
3123 }
3124 }
3125 "pattern" => {
3126 if let ast::Value::String(s) = &arg.value {
3127 constraints.push(FVC::Pattern(s.clone()));
3128 }
3129 }
3130 _ => {}
3131 }
3132 }
3133 constraints
3134}
3135
3136fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
3138 let field_type = map_ast_type(&field.field_type);
3139 let mut indexed = false;
3140 let mut unique = false;
3141 let mut relation = None;
3142 let mut validations = Vec::new();
3143 for directive in &field.directives {
3144 match directive.name.as_str() {
3145 "indexed" | "index" => indexed = true,
3146 "unique" => {
3147 unique = true;
3148 indexed = true;
3149 }
3150 "primary" => {
3151 indexed = true;
3152 unique = true;
3153 }
3154 "relation" => {
3155 let to = directive
3156 .arguments
3157 .iter()
3158 .find(|a| a.name == "to" || a.name == "collection")
3159 .and_then(|a| {
3160 if let ast::Value::String(s) = &a.value {
3161 Some(s.clone())
3162 } else {
3163 None
3164 }
3165 })
3166 .unwrap_or_default();
3167 let key = directive
3168 .arguments
3169 .iter()
3170 .find(|a| a.name == "key" || a.name == "field")
3171 .and_then(|a| {
3172 if let ast::Value::String(s) = &a.value {
3173 Some(s.clone())
3174 } else {
3175 None
3176 }
3177 })
3178 .unwrap_or_else(|| "id".to_string());
3179 if !to.is_empty() {
3180 relation = Some(crate::types::Relation { to, key });
3181 }
3182 }
3183 "validate" => validations.extend(parse_validate_directive(directive)),
3184 _ => {}
3185 }
3186 }
3187 FieldDefinition {
3188 field_type,
3189 unique,
3190 indexed,
3191 nullable: !field.field_type.is_required,
3192 validations,
3193 relation,
3194 }
3195}
3196
3197async fn execute_migration(
3198 db: &Aurora,
3199 migration: &ast::Migration,
3200 _options: &ExecutionOptions,
3201) -> Result<ExecutionResult> {
3202 let mut steps_applied = 0;
3203 let mut last_version = String::new();
3204
3205 for step in &migration.steps {
3206 last_version = step.version.clone();
3207
3208 if db.is_migration_applied(&step.version).await? {
3209 eprintln!(
3210 "[migration] version '{}' already applied — skipping",
3211 step.version
3212 );
3213 continue;
3214 }
3215
3216 eprintln!("[migration] applying version '{}'", step.version);
3217
3218 for action in &step.actions {
3219 match action {
3220 ast::MigrationAction::Schema(schema_op) => {
3221 execute_single_schema_op(db, schema_op).await?;
3222 }
3223 ast::MigrationAction::DataMigration(dm) => {
3224 execute_data_migration(db, dm).await?;
3225 }
3226 }
3227 }
3228
3229 db.mark_migration_applied(&step.version).await?;
3230 steps_applied += 1;
3231 eprintln!("[migration] version '{}' applied", step.version);
3232 }
3233
3234 let status = if steps_applied > 0 {
3235 "applied".to_string()
3236 } else {
3237 "skipped".to_string()
3238 };
3239
3240 Ok(ExecutionResult::Migration(MigrationResult {
3241 version: last_version,
3242 steps_applied,
3243 status,
3244 }))
3245}
3246
3247async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
3249 match op {
3250 ast::SchemaOp::DefineCollection {
3251 name,
3252 fields,
3253 if_not_exists,
3254 ..
3255 } => {
3256 if *if_not_exists && db.get_collection_definition(name).is_ok() {
3257 return Ok(());
3258 }
3259 let field_defs: Vec<(&str, FieldDefinition)> = fields
3260 .iter()
3261 .map(|f| (f.name.as_str(), build_field_def(f)))
3262 .collect();
3263 db.new_collection(name, field_defs).await?;
3264 }
3265 ast::SchemaOp::AlterCollection { name, actions } => {
3266 for action in actions {
3267 match action {
3268 ast::AlterAction::AddField { field, default } => {
3269 db.add_field_to_schema(name, field.name.clone(), build_field_def(field))
3270 .await?;
3271 if let Some(default_val) = default {
3272 let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
3273 let docs = db.get_all_collection(name).await?;
3274 for doc in docs {
3275 if !doc.data.contains_key(&field.name) {
3276 db.update_document(
3277 name,
3278 &doc._sid,
3279 vec![(&field.name, db_val.clone())],
3280 )
3281 .await?;
3282 }
3283 }
3284 }
3285 }
3286 ast::AlterAction::DropField(field_name) => {
3287 db.drop_field_from_schema(name, field_name.clone()).await?;
3288 }
3289 ast::AlterAction::RenameField { from, to } => {
3290 db.rename_field_in_schema(name, from.clone(), to.clone())
3291 .await?;
3292 let docs = db.get_all_collection(name).await?;
3294 for mut doc in docs {
3295 if let Some(val) = doc.data.remove(from.as_str()) {
3296 doc.data.insert(to.clone(), val);
3297 let key = format!("{}:{}", name, doc._sid);
3298 db.put(key, serde_json::to_vec(&doc)?, None).await?;
3299 }
3300 }
3301 }
3302 ast::AlterAction::ModifyField(field) => {
3303 db.modify_field_in_schema(name, field.name.clone(), build_field_def(field))
3304 .await?;
3305 }
3306 }
3307 }
3308 }
3309 ast::SchemaOp::DropCollection { name, if_exists } => {
3310 if *if_exists && db.get_collection_definition(name).is_err() {
3311 return Ok(());
3312 }
3313 db.drop_collection_schema(name).await?;
3314 }
3315 }
3316 Ok(())
3317}
3318
3319async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
3321 let docs = db.get_all_collection(&dm.collection).await?;
3322
3323 for doc in docs {
3324 for transform in &dm.transforms {
3325 let matches = match &transform.filter {
3326 Some(filter) => {
3327 let compiled = compile_filter(filter)?;
3328 matches_filter(&doc, &compiled, &HashMap::new())
3329 }
3330 None => true,
3331 };
3332
3333 if matches {
3334 let new_value = eval_migration_expr(&transform.expression, &doc);
3335 let mut updates = HashMap::new();
3336 updates.insert(transform.field.clone(), new_value);
3337 db.aql_update_document(&dm.collection, &doc._sid, updates)
3338 .await?;
3339 }
3340 }
3341 }
3342 Ok(())
3343}
3344
3345fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
3355 let expr = expr.trim();
3356
3357 if expr.starts_with('"') && expr.ends_with('"') && expr.len() >= 2 {
3358 return Value::String(expr[1..expr.len() - 1].to_string());
3359 }
3360 if expr == "true" {
3361 return Value::Bool(true);
3362 }
3363 if expr == "false" {
3364 return Value::Bool(false);
3365 }
3366 if expr == "null" {
3367 return Value::Null;
3368 }
3369 if let Ok(n) = expr.parse::<i64>() {
3370 return Value::Int(n);
3371 }
3372 if let Ok(f) = expr.parse::<f64>() {
3373 return Value::Float(f);
3374 }
3375 if let Some(v) = doc.data.get(expr) {
3376 return v.clone();
3377 }
3378
3379 Value::Null
3380}
3381
3382fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
3383 for a in args {
3384 if a.name == "where" || a.name == "filter" {
3385 return Ok(Some(value_to_filter(&a.value)?));
3386 }
3387 }
3388 Ok(None)
3389}
3390
3391fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
3392 let mut orderings = Vec::new();
3393 for a in args {
3394 if a.name == "orderBy" {
3395 match &a.value {
3396 ast::Value::String(f) => orderings.push(ast::Ordering {
3397 field: f.clone(),
3398 direction: ast::SortDirection::Asc,
3399 }),
3400 ast::Value::Object(obj) => {
3401 if let (Some(ast::Value::String(field_name)), Some(dir_val)) =
3403 (obj.get("field"), obj.get("direction"))
3404 {
3405 let direction = match dir_val {
3406 ast::Value::Enum(s) | ast::Value::String(s) => {
3407 if s.to_uppercase() == "DESC" {
3408 ast::SortDirection::Desc
3409 } else {
3410 ast::SortDirection::Asc
3411 }
3412 }
3413 _ => ast::SortDirection::Asc,
3414 };
3415 orderings.push(ast::Ordering {
3416 field: field_name.clone(),
3417 direction,
3418 });
3419 } else {
3420 for (field, dir_val) in obj {
3422 let direction = match dir_val {
3423 ast::Value::Enum(s) | ast::Value::String(s) => {
3424 if s.to_uppercase() == "DESC" {
3425 ast::SortDirection::Desc
3426 } else {
3427 ast::SortDirection::Asc
3428 }
3429 }
3430 _ => ast::SortDirection::Asc,
3431 };
3432 orderings.push(ast::Ordering {
3433 field: field.clone(),
3434 direction,
3435 });
3436 }
3437 }
3438 }
3439 _ => {}
3440 }
3441 }
3442 }
3443 orderings
3444}
3445
3446fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
3447 docs.sort_by(|a, b| {
3448 for o in orderings {
3449 let cmp = compare_values(a.data.get(&o.field), b.data.get(&o.field));
3450 if cmp != std::cmp::Ordering::Equal {
3451 return match o.direction {
3452 ast::SortDirection::Asc => cmp,
3453 ast::SortDirection::Desc => cmp.reverse(),
3454 };
3455 }
3456 }
3457 std::cmp::Ordering::Equal
3458 });
3459}
3460
3461fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
3462 match (a, b) {
3463 (None, None) => std::cmp::Ordering::Equal,
3464 (None, Some(_)) => std::cmp::Ordering::Less,
3465 (Some(_), None) => std::cmp::Ordering::Greater,
3466 (Some(av), Some(bv)) => av.partial_cmp(bv).unwrap_or(std::cmp::Ordering::Equal),
3467 }
3468}
3469
3470pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
3471 let (mut limit, mut offset) = (None, 0);
3472 for a in args {
3473 match a.name.as_str() {
3474 "limit" | "first" => {
3475 if let ast::Value::Int(n) = a.value {
3476 limit = Some(n as usize);
3477 }
3478 }
3479 "offset" | "skip" => {
3480 if let ast::Value::Int(n) = a.value {
3481 offset = n as usize;
3482 }
3483 }
3484 _ => {}
3485 }
3486 }
3487 (limit, offset)
3488}
3489
3490fn extract_cursor_pagination(
3491 args: &[ast::Argument],
3492) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
3493 let (mut first, mut after, mut last, mut before) = (None, None, None, None);
3494 for a in args {
3495 match a.name.as_str() {
3496 "first" => {
3497 if let ast::Value::Int(n) = a.value {
3498 first = Some(n as usize);
3499 }
3500 }
3501 "after" => {
3502 if let ast::Value::String(s) = &a.value {
3503 after = Some(s.clone());
3504 }
3505 }
3506 "last" => {
3507 if let ast::Value::Int(n) = a.value {
3508 last = Some(n as usize);
3509 }
3510 }
3511 "before" => {
3512 if let ast::Value::String(s) = &a.value {
3513 before = Some(s.clone());
3514 }
3515 }
3516 _ => {}
3517 }
3518 }
3519 (first, after, last, before)
3520}
3521
3522fn execute_connection(
3523 mut docs: Vec<Document>,
3524 sub_fields: &[ast::Field],
3525 limit: Option<usize>,
3526 fragments: &HashMap<String, FragmentDef>,
3527 variables: &HashMap<String, ast::Value>,
3528 options: &ExecutionOptions,
3529) -> Result<QueryResult> {
3530 let has_next_page = if let Some(l) = limit {
3531 docs.len() > l
3532 } else {
3533 false
3534 };
3535
3536 if has_next_page {
3537 docs.truncate(limit.unwrap());
3538 }
3539
3540 let mut edges = Vec::with_capacity(docs.len());
3541 let mut end_cursor = String::new();
3542
3543 let node_fields = if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
3545 let edges_sub_fields =
3546 collect_fields(&edges_field.selection_set, fragments, variables, None)
3547 .unwrap_or_default();
3548 if let Some(node_field) = edges_sub_fields.into_iter().find(|f| f.name == "node") {
3549 collect_fields(&node_field.selection_set, fragments, variables, None)
3550 .unwrap_or_default()
3551 } else {
3552 Vec::new()
3553 }
3554 } else {
3555 Vec::new()
3556 };
3557
3558 for doc in docs {
3559 let cursor = doc._sid.clone();
3560 end_cursor = cursor.clone();
3561
3562 let mut edge_data = HashMap::new();
3563 edge_data.insert("cursor".to_string(), Value::String(cursor));
3564
3565 let node_doc = if node_fields.is_empty() {
3566 doc
3567 } else {
3568 apply_projection(doc, &node_fields, options)?
3569 };
3570
3571 edge_data.insert("node".to_string(), Value::Object(node_doc.data));
3572 edges.push(Value::Object(edge_data));
3573 }
3574
3575 let mut page_info = HashMap::new();
3576 page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
3577 page_info.insert("endCursor".to_string(), Value::String(end_cursor));
3578
3579 let mut conn_data = HashMap::new();
3580 conn_data.insert("edges".to_string(), Value::Array(edges));
3581 conn_data.insert("pageInfo".to_string(), Value::Object(page_info));
3582
3583 Ok(QueryResult {
3584 collection: String::new(),
3585 documents: vec![Document {
3586 _sid: "connection".to_string(),
3587 data: conn_data,
3588 }],
3589 total_count: None,
3590 deferred_fields: vec![],
3591 explain: None,
3592 })
3593}
3594
3595pub fn matches_filter(
3596 doc: &Document,
3597 filter: &CompiledFilter,
3598 vars: &HashMap<String, ast::Value>,
3599) -> bool {
3600 match filter {
3601 CompiledFilter::Eq(f, v) => {
3602 if let Some(dv) = doc.data.get(f) {
3603 values_equal(dv, v, vars)
3604 } else if f == "_sid" {
3605 values_equal(&Value::String(doc._sid.clone()), v, vars)
3606 } else {
3607 false
3608 }
3609 }
3610 CompiledFilter::Ne(f, v) => {
3611 if let Some(dv) = doc.data.get(f) {
3612 !values_equal(dv, v, vars)
3613 } else if f == "_sid" {
3614 !values_equal(&Value::String(doc._sid.clone()), v, vars)
3615 } else {
3616 true
3617 }
3618 }
3619 CompiledFilter::Gt(f, v) => {
3620 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3621 if f == "_sid" {
3622 Some(Value::String(doc._sid.clone()))
3623 } else {
3624 None
3625 }
3626 });
3627 dv_opt.map_or(false, |dv| {
3628 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3629 return dv > bv;
3630 }
3631 false
3632 })
3633 }
3634 CompiledFilter::Gte(f, v) => {
3635 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3636 if f == "_sid" {
3637 Some(Value::String(doc._sid.clone()))
3638 } else {
3639 None
3640 }
3641 });
3642 dv_opt.map_or(false, |dv| {
3643 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3644 return dv >= bv;
3645 }
3646 false
3647 })
3648 }
3649 CompiledFilter::Lt(f, v) => {
3650 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3651 if f == "_sid" {
3652 Some(Value::String(doc._sid.clone()))
3653 } else {
3654 None
3655 }
3656 });
3657 dv_opt.map_or(false, |dv| {
3658 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3659 return dv < bv;
3660 }
3661 false
3662 })
3663 }
3664 CompiledFilter::Lte(f, v) => {
3665 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3666 if f == "_sid" {
3667 Some(Value::String(doc._sid.clone()))
3668 } else {
3669 None
3670 }
3671 });
3672 dv_opt.map_or(false, |dv| {
3673 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3674 return dv <= bv;
3675 }
3676 false
3677 })
3678 }
3679 CompiledFilter::In(f, v) => {
3680 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3681 if f == "_sid" {
3682 Some(Value::String(doc._sid.clone()))
3683 } else {
3684 None
3685 }
3686 });
3687 dv_opt.map_or(false, |dv| {
3688 if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3689 return arr.contains(&dv);
3690 }
3691 false
3692 })
3693 }
3694 CompiledFilter::NotIn(f, v) => {
3695 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3696 if f == "_sid" {
3697 Some(Value::String(doc._sid.clone()))
3698 } else {
3699 None
3700 }
3701 });
3702 dv_opt.map_or(true, |dv| {
3703 if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3704 return !arr.contains(&dv);
3705 }
3706 true
3707 })
3708 }
3709 CompiledFilter::Contains(f, v) => {
3710 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3711 if f == "_sid" {
3712 Some(Value::String(doc._sid.clone()))
3713 } else {
3714 None
3715 }
3716 });
3717 if let Some(dv) = dv_opt {
3718 match (dv, resolve_if_variable(v, vars)) {
3719 (Value::String(s), ast::Value::String(sub)) => s.contains(&*sub),
3720 (Value::Array(arr), _) => {
3721 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3722 return arr.contains(&bv);
3723 }
3724 false
3725 }
3726 _ => false,
3727 }
3728 } else {
3729 false
3730 }
3731 }
3732 CompiledFilter::ContainsAny(f, v) => {
3734 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3735 if f == "_sid" {
3736 Some(Value::String(doc._sid.clone()))
3737 } else {
3738 None
3739 }
3740 });
3741 if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3742 (dv_opt, aql_value_to_db_value(v, vars))
3743 {
3744 check_arr.iter().any(|item| field_arr.contains(item))
3745 } else {
3746 false
3747 }
3748 }
3749 CompiledFilter::ContainsAll(f, v) => {
3751 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3752 if f == "_sid" {
3753 Some(Value::String(doc._sid.clone()))
3754 } else {
3755 None
3756 }
3757 });
3758 if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3759 (dv_opt, aql_value_to_db_value(v, vars))
3760 {
3761 check_arr.iter().all(|item| field_arr.contains(item))
3762 } else {
3763 false
3764 }
3765 }
3766 CompiledFilter::StartsWith(f, v) => {
3767 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3768 if f == "_sid" {
3769 Some(Value::String(doc._sid.clone()))
3770 } else {
3771 None
3772 }
3773 });
3774 if let (Some(Value::String(s)), ast::Value::String(pre)) =
3775 (dv_opt, resolve_if_variable(v, vars))
3776 {
3777 s.starts_with(&*pre)
3778 } else {
3779 false
3780 }
3781 }
3782 CompiledFilter::EndsWith(f, v) => {
3783 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3784 if f == "_sid" {
3785 Some(Value::String(doc._sid.clone()))
3786 } else {
3787 None
3788 }
3789 });
3790 if let (Some(Value::String(s)), ast::Value::String(suf)) =
3791 (dv_opt, resolve_if_variable(v, vars))
3792 {
3793 s.ends_with(&*suf)
3794 } else {
3795 false
3796 }
3797 }
3798 CompiledFilter::Matches(f, re) => {
3799 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3800 if f == "_sid" {
3801 Some(Value::String(doc._sid.clone()))
3802 } else {
3803 None
3804 }
3805 });
3806 if let Some(Value::String(s)) = dv_opt {
3807 re.is_match(&s)
3808 } else {
3809 false
3810 }
3811 }
3812 CompiledFilter::IsNull(f) => {
3813 if f == "_sid" {
3814 false } else {
3816 doc.data.get(f).map_or(true, |v| matches!(v, Value::Null))
3817 }
3818 }
3819 CompiledFilter::IsNotNull(f) => {
3820 if f == "_sid" {
3821 true } else {
3823 doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
3824 }
3825 }
3826 CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
3827 CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
3828 CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
3829 }
3830}
3831
3832fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
3837 if let Value::Object(modifier) = new_val {
3838 if let Some(delta) = modifier.get("increment") {
3839 match (existing, delta) {
3840 (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
3841 (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
3842 (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
3843 _ => {}
3844 }
3845 }
3846 if let Some(delta) = modifier.get("decrement") {
3847 match (existing, delta) {
3848 (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
3849 (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
3850 (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
3851 _ => {}
3852 }
3853 }
3854 if let Some(item) = modifier.get("push") {
3855 if let Some(Value::Array(mut arr)) = existing.cloned() {
3856 arr.push(item.clone());
3857 return Value::Array(arr);
3858 }
3859 return Value::Array(vec![item.clone()]);
3860 }
3861 if let Some(item) = modifier.get("pull") {
3862 if let Some(Value::Array(arr)) = existing {
3863 let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
3864 return Value::Array(filtered);
3865 }
3866 return Value::Array(vec![]);
3867 }
3868 if let Some(item) = modifier.get("addToSet") {
3869 if let Some(Value::Array(mut arr)) = existing.cloned() {
3870 if !arr.contains(item) {
3871 arr.push(item.clone());
3872 }
3873 return Value::Array(arr);
3874 }
3875 return Value::Array(vec![item.clone()]);
3876 }
3877 }
3878 new_val.clone()
3879}
3880
3881fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
3882 if let Ok(bv) = aql_value_to_db_value(av, vars) {
3883 return dv == &bv;
3884 }
3885 false
3886}
3887
3888fn resolve_if_variable<'a>(
3889 v: &'a ast::Value,
3890 vars: &'a HashMap<String, ast::Value>,
3891) -> &'a ast::Value {
3892 if let ast::Value::Variable(n) = v {
3893 vars.get(n).unwrap_or(v)
3894 } else {
3895 v
3896 }
3897}
3898
3899pub fn apply_projection(
3900 doc: Document,
3901 fields: &[ast::Field],
3902 options: &ExecutionOptions,
3903) -> Result<Document> {
3904 let (projected, _) = apply_projection_and_defer(doc, fields, &options.variables, options)?;
3905 Ok(projected)
3906}
3907
3908async fn apply_projection_with_lookups(
3911 db: &Aurora,
3912 mut doc: Document,
3913 collection_name: &str,
3914 fields: &[ast::Field],
3915 fragments: &HashMap<String, FragmentDef>,
3916 vars: &HashMap<String, ast::Value>,
3917 options: &ExecutionOptions,
3918 pinned_fields: &[String],
3919) -> Result<(Document, Vec<String>)> {
3920 if fields.is_empty() {
3921 return Ok((doc, vec![]));
3922 }
3923 let mut proj = HashMap::new();
3924 let mut deferred = Vec::new();
3925
3926 for f in fields {
3928 check_permissions(&f.directives, options, false)?;
3930
3931 if f.directives.iter().any(|d| d.name == "defer") {
3933 deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
3934 continue;
3935 }
3936
3937 let mut lookup_info: Option<(String, String, String)> = None;
3938 let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
3939 let local_arg = f.arguments.iter().find(|a| a.name == "localField");
3940 let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
3941
3942 if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
3943 let c_val = resolve_if_variable(&c.value, vars);
3944 let lf_val = resolve_if_variable(&lf.value, vars);
3945 let ff_val = resolve_if_variable(&ff.value, vars);
3946
3947 if let (
3948 ast::Value::String(foreign_coll),
3949 ast::Value::String(local_field),
3950 ast::Value::String(foreign_field),
3951 ) = (c_val, lf_val, ff_val)
3952 {
3953 lookup_info = Some((
3954 foreign_coll.clone(),
3955 local_field.clone(),
3956 foreign_field.clone(),
3957 ));
3958 }
3959 } else if let Ok(col_def) = db.get_collection_definition(collection_name) {
3960 if let Some(f_def) = col_def.fields.get(&f.name) {
3961 if let Some(rel) = &f_def.relation {
3962 lookup_info = Some((rel.to.clone(), f.name.clone(), rel.key.clone()));
3963 }
3964 }
3965 }
3966
3967 if let Some((foreign_coll, local_field, foreign_field)) = lookup_info {
3968 let local_val = doc.data.get(local_field.as_str()).cloned().or_else(|| {
3969 if local_field == "_sid" {
3970 Some(Value::String(doc._sid.clone()))
3971 } else {
3972 None
3973 }
3974 });
3975
3976 if let Some(match_val) = local_val {
3977 let extra_filter = f
3978 .arguments
3979 .iter()
3980 .find(|a| a.name == "where")
3981 .and_then(|a| {
3982 let resolved = resolve_ast_deep(&a.value, vars);
3983 value_to_filter(&resolved).ok()
3984 });
3985
3986 let vars_arc = Arc::new(vars.clone());
3987 let foreign_docs = db.scan_and_filter(
3988 &foreign_coll,
3989 |fdoc| {
3990 let field_match = fdoc
3991 .data
3992 .get(foreign_field.as_str())
3993 .map(|v| values_equal_db(v, &match_val))
3994 .unwrap_or(
3995 foreign_field == "_sid" && fdoc._sid == match_val.to_string(),
3996 );
3997 if !field_match {
3998 return false;
3999 }
4000 if let Some(ref ef) = extra_filter {
4001 let compiled = compile_filter(ef)
4002 .unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
4003 matches_filter(fdoc, &compiled, &vars_arc)
4004 } else {
4005 true
4006 }
4007 },
4008 None,
4009 )?;
4010
4011 let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
4012 foreign_docs
4013 .into_iter()
4014 .map(|fd| Value::Object(fd.data))
4015 .collect()
4016 } else {
4017 let sub_fields: Vec<ast::Field> =
4018 collect_fields(&f.selection_set, fragments, vars, Some(&foreign_coll))?;
4019 let mut sub_results = Vec::with_capacity(foreign_docs.len());
4020 for fd in foreign_docs {
4021 let (proj_fd, _) =
4022 apply_projection_and_defer(fd, &sub_fields, vars, options)?;
4023 sub_results.push(Value::Object(proj_fd.data));
4024 }
4025 sub_results
4026 };
4027
4028 proj.insert(
4029 f.alias.as_ref().unwrap_or(&f.name).clone(),
4030 Value::Array(sub_projected),
4031 );
4032 }
4033 continue;
4034 }
4035
4036 if f.name == "__compute__" {
4038 let alias = f.alias.as_deref().unwrap_or("computed");
4039 if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
4040 if let ast::Value::String(template) = &expr_arg.value {
4041 let result = eval_template(template, &doc.data);
4042 proj.insert(alias.to_string(), Value::String(result));
4043 } else if let Some(expr) = f.computed_expression.as_ref() {
4044 let result = eval_expression(expr, &doc.data, vars);
4045 proj.insert(alias.to_string(), result);
4046 }
4047 }
4048 continue;
4049 }
4050
4051 if f.name == "id" {
4053 let v = doc
4054 .data
4055 .get("id")
4056 .cloned()
4057 .unwrap_or(Value::String(doc._sid.clone()));
4058 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v);
4059 } else if f.name == "_sid" {
4060 proj.insert(
4061 f.alias.as_ref().unwrap_or(&f.name).clone(),
4062 Value::String(doc._sid.clone()),
4063 );
4064 } else if let Some(v) = doc.data.get(&f.name) {
4065 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
4066 }
4067 }
4068
4069 for pinned in pinned_fields {
4071 if !proj.contains_key(pinned) {
4072 if let Some(v) = doc.data.get(pinned) {
4073 proj.insert(pinned.clone(), v.clone());
4074 }
4075 }
4076 }
4077
4078 doc.data = proj;
4079 Ok((doc, deferred))
4080}
4081
4082fn values_equal_db(a: &Value, b: &Value) -> bool {
4083 a == b
4084}
4085
4086pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
4087 let resolved = resolve_if_variable(v, vars);
4088 match resolved {
4089 ast::Value::Int(i) => Ok(Value::Int(*i)),
4090 ast::Value::Float(f) => Ok(Value::Float(*f)),
4091 ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
4092 ast::Value::String(s) => Ok(Value::String(s.clone())),
4093 ast::Value::Enum(s) => Ok(Value::String(s.clone())),
4094 ast::Value::Null => Ok(Value::Null),
4095 ast::Value::Variable(name) => Err(AqlError::new(
4096 ErrorCode::UndefinedVariable,
4097 format!("Variable '{}' not found", name),
4098 )),
4099 ast::Value::Array(arr) => {
4100 let mut vals = Vec::with_capacity(arr.len());
4101 for v in arr {
4102 vals.push(aql_value_to_db_value(v, vars)?);
4103 }
4104 Ok(Value::Array(vals))
4105 }
4106 ast::Value::Object(obj) => {
4107 let mut map = HashMap::with_capacity(obj.len());
4108 for (k, v) in obj {
4109 map.insert(k.clone(), aql_value_to_db_value(v, vars)?);
4110 }
4111 Ok(Value::Object(map))
4112 }
4113 }
4114}
4115
4116fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
4118 match v {
4119 ast::Value::Null => serde_json::Value::Null,
4120 ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
4121 ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
4122 ast::Value::Float(f) => serde_json::Number::from_f64(*f)
4123 .map(serde_json::Value::Number)
4124 .unwrap_or(serde_json::Value::Null),
4125 ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
4126 ast::Value::Array(arr) => {
4127 serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
4128 }
4129 ast::Value::Object(obj) => serde_json::Value::Object(
4130 obj.iter()
4131 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
4132 .collect(),
4133 ),
4134 ast::Value::Variable(_) => serde_json::Value::Null,
4135 }
4136}
4137
4138fn aql_value_to_hashmap(
4139 v: &ast::Value,
4140 vars: &HashMap<String, ast::Value>,
4141) -> Result<HashMap<String, Value>> {
4142 if let ast::Value::Object(m) = resolve_if_variable(v, vars) {
4143 let mut res = HashMap::new();
4144 for (k, v) in m {
4145 res.insert(k.clone(), aql_value_to_db_value(v, vars)?);
4146 }
4147 Ok(res)
4148 } else {
4149 Err(AqlError::new(
4150 ErrorCode::QueryError,
4151 "Data must be object".to_string(),
4152 ))
4153 }
4154}
4155
4156fn aurora_value_to_json_value(v: &Value) -> JsonValue {
4157 match v {
4158 Value::Null => JsonValue::Null,
4159 Value::String(s) => JsonValue::String(s.clone()),
4160 Value::Int(i) => JsonValue::Number((*i).into()),
4161 Value::Float(f) => serde_json::Number::from_f64(*f)
4162 .map(JsonValue::Number)
4163 .unwrap_or(JsonValue::Null),
4164 Value::Bool(b) => JsonValue::Bool(*b),
4165 Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
4166 Value::Object(m) => {
4167 let mut jm = serde_json::Map::new();
4168 for (k, v) in m {
4169 jm.insert(k.clone(), aurora_value_to_json_value(v));
4170 }
4171 JsonValue::Object(jm)
4172 }
4173 Value::Uuid(u) => JsonValue::String(u.to_string()),
4174 Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
4175 }
4176}
4177
4178fn find_indexed_equality_filter(
4180 filter: &AqlFilter,
4181 db: &Aurora,
4182 collection: &str,
4183) -> Option<(String, ast::Value)> {
4184 match filter {
4185 AqlFilter::Eq(field, val) => {
4186 if field == "_sid" || db.has_index(collection, field) {
4187 Some((field.clone(), val.clone()))
4188 } else {
4189 None
4190 }
4191 }
4192 AqlFilter::And(filters) => {
4193 for f in filters {
4194 if let Some(res) = find_indexed_equality_filter(f, db, collection) {
4195 return Some(res);
4196 }
4197 }
4198 None
4199 }
4200 _ => None,
4201 }
4202}
4203
4204pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
4205 if let ast::Value::Object(m) = v {
4206 let mut fs = Vec::new();
4207 for (k, val) in m {
4208 match k.as_str() {
4209 "or" => {
4210 if let ast::Value::Array(arr) = val {
4211 let mut sub = Vec::new();
4212 for item in arr {
4213 sub.push(value_to_filter(item)?);
4214 }
4215 return Ok(AqlFilter::Or(sub));
4216 }
4217 }
4218 "and" => {
4219 if let ast::Value::Array(arr) = val {
4220 let mut sub = Vec::new();
4221 for item in arr {
4222 sub.push(value_to_filter(item)?);
4223 }
4224 return Ok(AqlFilter::And(sub));
4225 }
4226 }
4227 "not" => {
4228 return Ok(AqlFilter::Not(Box::new(value_to_filter(val)?)));
4229 }
4230 field => {
4231 if let ast::Value::Object(ops) = val {
4232 for (op, ov) in ops {
4233 match op.as_str() {
4234 "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
4235 "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
4236 "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
4237 "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
4238 "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
4239 "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
4240 "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
4241 "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
4242 "contains" => {
4243 fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
4244 }
4245 "containsAny" => {
4246 fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
4247 }
4248 "containsAll" => {
4249 fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
4250 }
4251 "startsWith" => {
4252 fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
4253 }
4254 "endsWith" => {
4255 fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
4256 }
4257 "matches" => {
4258 fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
4259 }
4260 _ => {}
4261 }
4262 }
4263 }
4264 }
4265 }
4266 }
4267 if fs.is_empty() {
4268 Ok(AqlFilter::And(vec![]))
4269 } else if fs.len() == 1 {
4270 Ok(fs.remove(0))
4271 } else {
4272 Ok(AqlFilter::And(fs))
4273 }
4274 } else {
4275 Err(AqlError::new(
4276 ErrorCode::QueryError,
4277 "Filter must be object".to_string(),
4278 ))
4279 }
4280}
4281
4282fn resolve_value(
4283 v: &ast::Value,
4284 vars: &HashMap<String, ast::Value>,
4285 _ctx: &ExecutionContext,
4286) -> ast::Value {
4287 match v {
4288 ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
4289 _ => v.clone(),
4290 }
4291}
4292
4293fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
4298 match v {
4299 ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
4300 ast::Value::Object(m) => ast::Value::Object(
4301 m.iter()
4302 .map(|(k, val)| (k.clone(), resolve_ast_deep(val, vars)))
4303 .collect(),
4304 ),
4305 ast::Value::Array(arr) => {
4306 ast::Value::Array(arr.iter().map(|val| resolve_ast_deep(val, vars)).collect())
4307 }
4308 _ => v.clone(),
4309 }
4310}
4311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
    use tempfile::TempDir;

    /// End-to-end check: create a collection, insert one document through a mutation, then
    /// query the collection and expect exactly one document back.
    #[tokio::test]
    async fn test_executor_integration() {
        let td = TempDir::new().unwrap();
        let config = AuroraConfig {
            db_path: td.path().join("test.db"),
            enable_write_buffering: false,
            durability_mode: DurabilityMode::Strict,
            ..Default::default()
        };
        let db = Aurora::with_config(config).await.unwrap();

        let name_field = crate::types::FieldDefinition {
            field_type: FieldType::SCALAR_STRING,
            unique: false,
            indexed: true,
            nullable: true,
            ..Default::default()
        };
        db.new_collection("users", vec![("name", name_field)])
            .await
            .unwrap();

        let insert =
            r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#;
        let _ = execute(&db, insert, ExecutionOptions::new())
            .await
            .unwrap();

        let res = execute(&db, "query { users { name } }", ExecutionOptions::new())
            .await
            .unwrap();
        match res {
            ExecutionResult::Query(q) => assert_eq!(q.documents.len(), 1),
            _ => panic!("Expected query"),
        }
    }
}