1use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17#[allow(unused_imports)]
19use chrono::TimeZone as _;
20
/// Map from string keys to raw JSON values.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// carries caller-supplied, per-execution context — confirm against callers.
pub type ExecutionContext = HashMap<String, JsonValue>;
22
/// Pre-computed execution plan for a single root field of a query.
///
/// Built by `QueryPlan::from_query` and consumed by `execute_plan`.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Name of the root field; used as the collection name when executing.
    pub collection: String,
    /// Filter extracted from the root field's arguments, if any.
    pub filter: Option<ast::Filter>,
    /// Compiled form of `filter`; `Some` exactly when `filter` is `Some`.
    pub compiled_filter: Option<CompiledFilter>,
    /// Fields collected from the root field's selection set.
    pub projection: Vec<ast::Field>,
    /// Maximum number of documents: the explicit limit, else the cursor `first` value.
    pub limit: Option<usize>,
    /// Number of leading documents to drop before the limit is applied.
    pub offset: usize,
    /// Cursor (`_sid` of a document); results start after the matching document.
    pub after: Option<String>,
    /// Orderings extracted from the root field's arguments.
    pub orderings: Vec<ast::Ordering>,
    /// True when the projection selects `edges` or `pageInfo`.
    pub is_connection: bool,
    /// True when `lookup_dependencies` is non-empty.
    pub has_lookups: bool,
    /// Copy of the document's fragment definitions, keyed by fragment name.
    pub fragments: HashMap<String, FragmentDef>,
    /// Variable definitions of the owning query (used by `validate`).
    pub variable_definitions: Vec<ast::VariableDefinition>,
    /// Directives attached to the root field (checked for permissions at execution).
    pub directives: Vec<ast::Directive>,
    /// Result of `identify_lookup_dependencies` for this root field.
    pub lookup_dependencies: Vec<String>,
}
42
43impl QueryPlan {
44 pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
45 validate_required_variables(&self.variable_definitions, provided_variables)
46 }
47
48 pub fn from_query(
49 db: &Aurora,
50 query: &ast::Query,
51 fragments: &HashMap<String, FragmentDef>,
52 variables: &HashMap<String, ast::Value>,
53 ) -> Result<Vec<Self>> {
54 let root_fields = collect_fields(&query.selection_set, fragments, variables, None)?;
55
56 let mut plans = Vec::new();
57 for field in root_fields {
58 let collection = field.name.clone();
59 let filter = extract_filter_from_args(&field.arguments)?;
60 let compiled_filter = if let Some(ref f) = filter {
61 Some(compile_filter(f)?)
62 } else {
63 None
64 };
65
66 let sub_fields = collect_fields(
67 &field.selection_set,
68 fragments,
69 variables,
70 Some(&field.name),
71 )?;
72
73 let (limit, offset) = extract_pagination(&field.arguments);
74 let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
75 let orderings = extract_order_by(&field.arguments);
76 let is_connection = sub_fields
77 .iter()
78 .any(|f| f.name == "edges" || f.name == "pageInfo");
79
80 let lookup_dependencies =
81 identify_lookup_dependencies(db, &collection, &sub_fields, variables);
82 let has_lookups = !lookup_dependencies.is_empty();
83
84 plans.push(QueryPlan {
85 collection,
86 filter,
87 compiled_filter,
88 projection: sub_fields,
89 limit: limit.or(first),
90 offset,
91 after,
92 orderings,
93 is_connection,
94 has_lookups,
95 fragments: fragments.clone(),
96 variable_definitions: query.variable_definitions.clone(),
97 directives: field.directives.clone(),
98 lookup_dependencies,
99 });
100 }
101 Ok(plans)
102 }
103}
104
105pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
107 use std::collections::hash_map::DefaultHasher;
108 use std::hash::{Hash, Hasher};
109
110 let query_key = {
112 let mut hasher = DefaultHasher::new();
113 aql.trim().hash(&mut hasher);
114 hasher.finish()
115 };
116
117 let vars: HashMap<String, ast::Value> = options.variables.clone();
119
120 if let Some(plan) = db.plan_cache.get(&query_key) {
123 plan.validate(&vars)?;
124 return execute_plan(db, &plan, &vars, &options).await;
125 }
126
127 let mut doc = super::parse(aql)?;
129 println!("Parsed document with {} operations", doc.operations.len());
130
131 if let Some(Operation::Query(query)) = doc
133 .operations
134 .iter()
135 .find(|op| matches!(op, Operation::Query(_)))
136 {
137 let fragments: HashMap<String, FragmentDef> = doc
138 .operations
139 .iter()
140 .filter_map(|op| {
141 if let Operation::FragmentDefinition(f) = op {
142 Some((f.name.clone(), f.clone()))
143 } else {
144 None
145 }
146 })
147 .collect();
148
149 let root_fields = collect_fields(&query.selection_set, &fragments, &vars, None)?;
154 let has_search = root_fields
155 .iter()
156 .any(|f| f.arguments.iter().any(|a| a.name == "search"));
157
158 if !has_search {
159 let plans = QueryPlan::from_query(db, query, &fragments, &vars)?;
160 if plans.len() == 1 {
161 let plan = Arc::new(plans[0].clone());
162 plan.validate(&vars)?;
163 db.plan_cache.insert(query_key, Arc::clone(&plan));
164
165 return execute_plan(db, &plan, &vars, &options).await;
166 }
167 }
168 }
169
170 let mut vars = vars;
173 for op in &doc.operations {
174 if let Operation::Query(q) = op {
175 for var_def in &q.variable_definitions {
176 if !vars.contains_key(&var_def.name) {
177 if let Some(default) = &var_def.default_value {
178 vars.insert(var_def.name.clone(), default.clone());
179 }
180 }
181 }
182 }
183 }
184
185 super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
187 let code = match e.code {
188 super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
189 super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
190 _ => ErrorCode::QueryError,
191 };
192 AqlError::new(code, e.to_string())
193 })?;
194
195 execute_document(db, &doc, &options).await
196}
197
198pub async fn execute_plan(
199 db: &Aurora,
200 plan: &QueryPlan,
201 variables: &HashMap<String, ast::Value>,
202 options: &ExecutionOptions,
203) -> Result<ExecutionResult> {
204 check_permissions(&plan.directives, options, false)?;
206
207 let collection_name = &plan.collection;
208
209 let effective_limit = plan.limit;
211
212 let mut indexed_docs = None;
214 if let Some(ref f) = plan.filter {
215 if let Some((field, val)) =
217 find_indexed_equality_filter_runtime(f, db, collection_name, variables)
218 {
219 let index_ready = field == "_sid" || !db.is_index_building(collection_name, &field);
222 if index_ready {
223 let mut db_val = aql_value_to_db_value(&val, variables)?;
224
225 if let Ok(col_def) = db.get_collection_definition(collection_name) {
227 if let Some(f_def) = col_def.fields.get(&field) {
228 db_val = db_val.coerce_to(&f_def.field_type);
229 }
230 }
231
232 let ids = if field == "_sid" {
233 match &db_val {
234 Value::String(s) => vec![s.clone()],
235 Value::Uuid(u) => vec![u.to_string()],
236 _ => vec![],
237 }
238 } else {
239 db.get_ids_from_index(collection_name, &field, &db_val)
240 };
241
242 let mut docs = Vec::with_capacity(ids.len());
243 for id in ids {
244 if let Some(doc) = db.get_document(collection_name, &id)? {
245 if let Some(ref cf) = plan.compiled_filter {
247 if matches_filter(&doc, cf, variables) {
248 docs.push(doc);
249 }
250 } else {
251 docs.push(doc);
252 }
253 }
254 }
255
256 indexed_docs = Some(docs);
257 } }
259 }
260
261 let mut docs = if let Some(d) = indexed_docs {
262 d
263 } else {
264 let vars_arc = Arc::new(variables.clone());
266 let cf_clone = plan.compiled_filter.clone();
267 let filter_fn = move |doc: &Document| {
268 cf_clone
269 .as_ref()
270 .map(|f| matches_filter(doc, f, &vars_arc))
271 .unwrap_or(true)
272 };
273
274 let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
277 None
278 } else {
279 plan.limit.map(|l| {
280 let base = if plan.is_connection { l + 1 } else { l };
281 base + plan.offset
282 })
283 };
284 db.scan_and_filter(collection_name, filter_fn, scan_limit)?
285 };
286
287 if !plan.orderings.is_empty() {
289 apply_ordering(&mut docs, &plan.orderings);
290 }
291
292 if let Some(ref cursor) = plan.after {
294 if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
295 docs.drain(0..=pos);
296 }
297 }
298
299 if plan.offset > 0 {
301 if plan.offset < docs.len() {
302 docs.drain(0..plan.offset);
303 } else {
304 docs.clear();
305 }
306 }
307
308 if plan.is_connection {
310 return Ok(ExecutionResult::Query(execute_connection(
311 docs,
312 &plan.projection,
313 effective_limit,
314 &plan.fragments,
315 variables,
316 options,
317 )?));
318 }
319
320 if let Some(l) = effective_limit {
322 docs.truncate(l);
323 }
324
325 if options.apply_projections && !plan.projection.is_empty() {
327 let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
329 let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");
330
331 if let Some(f) = group_by_field {
332 let field_name = f
334 .arguments
335 .iter()
336 .find(|a| a.name == "field")
337 .and_then(|a| match &a.value {
338 ast::Value::String(s) => Some(s),
339 _ => None,
340 });
341
342 if let Some(group_field) = field_name {
343 let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
344 for d in docs {
345 let key = d
346 .data
347 .get(group_field)
348 .map(|v| match v {
349 Value::String(s) => s.clone(),
350 _ => v.to_string(),
351 })
352 .unwrap_or_else(|| "null".to_string());
353 groups.entry(key).or_default().push(d);
354 }
355
356 let mut group_docs = Vec::with_capacity(groups.len());
357 for (key, group_items) in groups {
358 let mut data = HashMap::new();
359 for selection in &f.selection_set {
360 if let Selection::Field(sub_f) = selection {
361 let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
362 match sub_f.name.as_str() {
363 "key" => {
364 data.insert(alias.clone(), Value::String(key.clone()));
365 }
366 "count" => {
367 data.insert(
368 alias.clone(),
369 Value::Int(group_items.len() as i64),
370 );
371 }
372 "nodes" => {
373 let sub_fields = collect_fields(
374 &sub_f.selection_set,
375 &plan.fragments,
376 variables,
377 None,
378 )
379 .unwrap_or_default();
380
381 let mut projected_nodes = Vec::with_capacity(group_items.len());
382 for d in &group_items {
383 let node_doc =
384 apply_projection(d.clone(), &sub_fields, options)?;
385 projected_nodes.push(Value::Object(node_doc.data));
386 }
387 data.insert(alias.clone(), Value::Array(projected_nodes));
388 }
389 "aggregate" => {
390 let agg_res =
391 compute_aggregates(&group_items, &sub_f.selection_set);
392 data.insert(alias.clone(), Value::Object(agg_res));
393 }
394 _ => {}
395 }
396 }
397 }
398 group_docs.push(Document {
399 _sid: format!("group:{}", key),
400 data,
401 });
402 }
403 docs = group_docs;
404 }
405 } else if let Some(agg) = agg_field {
406 let agg_result = compute_aggregates(&docs, &agg.selection_set);
410 let alias = agg.alias.as_ref().unwrap_or(&agg.name);
411
412 if plan.projection.len() == 1 {
413 let mut data = HashMap::new();
414 data.insert(alias.clone(), Value::Object(agg_result));
415 docs = vec![Document {
416 _sid: "aggregate".to_string(),
417 data,
418 }];
419 } else {
420 let mut projected = Vec::with_capacity(docs.len());
422 for mut d in docs {
423 d.data
424 .insert(alias.clone(), Value::Object(agg_result.clone()));
425 projected.push(apply_projection(d, &plan.projection, options)?);
426 }
427 docs = projected;
428 }
429 } else if !plan.has_lookups {
430 let mut projected = Vec::with_capacity(docs.len());
431 for d in docs {
432 projected.push(apply_projection(d, &plan.projection, options)?);
433 }
434 docs = projected;
435 } else {
436 let mut projected = Vec::with_capacity(docs.len());
438 for d in docs {
439 let (proj_doc, _) = apply_projection_with_lookups(
440 db,
441 d,
442 collection_name,
443 &plan.projection,
444 &plan.fragments,
445 variables,
446 options,
447 &plan.lookup_dependencies,
448 )
449 .await?;
450 projected.push(proj_doc);
451 }
452 docs = projected;
453 }
454 }
455
456 Ok(ExecutionResult::Query(QueryResult {
457 collection: collection_name.clone(),
458 documents: docs,
459 total_count: None,
460 deferred_fields: vec![],
461 explain: None,
462 }))
463}
464
465fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
466 let mut results = HashMap::new();
467
468 for selection in selections {
469 if let Selection::Field(f) = selection {
470 let alias = f.alias.as_ref().unwrap_or(&f.name);
471 let value = match f.name.as_str() {
472 "count" => Value::Int(docs.len() as i64),
473 "sum" => {
474 let field =
475 f.arguments
476 .iter()
477 .find(|a| a.name == "field")
478 .and_then(|a| match &a.value {
479 ast::Value::String(s) => Some(s),
480 _ => None,
481 });
482
483 if let Some(field_name) = field {
484 let sum: f64 = docs
485 .iter()
486 .filter_map(|d| d.data.get(field_name))
487 .filter_map(|v| match v {
488 Value::Int(i) => Some(*i as f64),
489 Value::Float(f) => Some(*f),
490 _ => None,
491 })
492 .sum();
493 Value::Float(sum)
494 } else {
495 Value::Null
496 }
497 }
498 "avg" => {
499 let field =
500 f.arguments
501 .iter()
502 .find(|a| a.name == "field")
503 .and_then(|a| match &a.value {
504 ast::Value::String(s) => Some(s),
505 _ => None,
506 });
507
508 if let Some(field_name) = field
509 && !docs.is_empty()
510 {
511 let values: Vec<f64> = docs
512 .iter()
513 .filter_map(|d| d.data.get(field_name))
514 .filter_map(|v| match v {
515 Value::Int(i) => Some(*i as f64),
516 Value::Float(f) => Some(*f),
517 _ => None,
518 })
519 .collect();
520
521 if values.is_empty() {
522 Value::Null
523 } else {
524 let sum: f64 = values.iter().sum();
525 Value::Float(sum / values.len() as f64)
526 }
527 } else {
528 Value::Null
529 }
530 }
531 "min" => {
532 let field =
533 f.arguments
534 .iter()
535 .find(|a| a.name == "field")
536 .and_then(|a| match &a.value {
537 ast::Value::String(s) => Some(s),
538 _ => None,
539 });
540
541 if let Some(field_name) = field
542 && !docs.is_empty()
543 {
544 let min = docs
545 .iter()
546 .filter_map(|d| d.data.get(field_name))
547 .filter_map(|v| match v {
548 Value::Int(i) => Some(*i as f64),
549 Value::Float(f) => Some(*f),
550 _ => None,
551 })
552 .fold(f64::INFINITY, f64::min);
553
554 if min == f64::INFINITY {
555 Value::Null
556 } else {
557 Value::Float(min)
558 }
559 } else {
560 Value::Null
561 }
562 }
563 "max" => {
564 let field =
565 f.arguments
566 .iter()
567 .find(|a| a.name == "field")
568 .and_then(|a| match &a.value {
569 ast::Value::String(s) => Some(s),
570 _ => None,
571 });
572
573 if let Some(field_name) = field
574 && !docs.is_empty()
575 {
576 let max = docs
577 .iter()
578 .filter_map(|d| d.data.get(field_name))
579 .filter_map(|v| match v {
580 Value::Int(i) => Some(*i as f64),
581 Value::Float(f) => Some(*f),
582 _ => None,
583 })
584 .fold(f64::NEG_INFINITY, f64::max);
585
586 if max == f64::NEG_INFINITY {
587 Value::Null
588 } else {
589 Value::Float(max)
590 }
591 } else {
592 Value::Null
593 }
594 }
595 _ => Value::Null,
596 };
597 results.insert(alias.clone(), value);
598 }
599 }
600
601 results
602}
603
604fn find_indexed_equality_filter_runtime(
605 filter: &ast::Filter,
606 db: &Aurora,
607 collection: &str,
608 variables: &HashMap<String, ast::Value>,
609) -> Option<(String, ast::Value)> {
610 match filter {
611 ast::Filter::Eq(field, val) => {
612 if field == "_sid" || db.has_index(collection, field) {
613 let resolved = resolve_if_variable(val, variables);
614 return Some((field.clone(), resolved.clone()));
615 }
616 }
617 ast::Filter::And(filters) => {
618 for f in filters {
619 if let Some(res) =
620 find_indexed_equality_filter_runtime(f, db, collection, variables)
621 {
622 return Some(res);
623 }
624 }
625 }
626 _ => {}
627 }
628 None
629}
630
631fn collect_fields(
633 selection_set: &[Selection],
634 fragments: &HashMap<String, FragmentDef>,
635 variable_values: &HashMap<String, ast::Value>,
636 parent_type: Option<&str>,
637) -> Result<Vec<Field>> {
638 let mut fields = Vec::new();
639
640 for selection in selection_set {
641 match selection {
642 Selection::Field(field) => {
643 if should_include(&field.directives, variable_values)? {
644 fields.push(field.clone());
645 }
646 }
647 Selection::FragmentSpread(name) => {
648 if let Some(fragment) = fragments.get(name) {
649 let type_match = if let Some(parent) = parent_type {
650 parent == fragment.type_condition
651 } else {
652 true
653 };
654
655 if type_match {
656 let fragment_fields = collect_fields(
657 &fragment.selection_set,
658 fragments,
659 variable_values,
660 parent_type,
661 )?;
662 fields.extend(fragment_fields);
663 }
664 }
665 }
666 Selection::InlineFragment(inline) => {
667 let type_match = if let Some(parent) = parent_type {
668 parent == inline.type_condition
669 } else {
670 true
671 };
672
673 if type_match {
674 let inline_fields = collect_fields(
675 &inline.selection_set,
676 fragments,
677 variable_values,
678 parent_type,
679 )?;
680 fields.extend(inline_fields);
681 }
682 }
683 Selection::ComputedField(cf) => {
684 let (expr_val, _complex_expr) = match &cf.expression {
685 ast::ComputedExpression::TemplateString(s) => (s.clone(), None),
686 _ => ("complex".to_string(), Some(ast::Expression::Literal(ast::Value::Null))), };
688
689 let mut field = Field {
690 alias: Some(cf.alias.clone()),
691 name: "__compute__".to_string(),
692 arguments: vec![ast::Argument {
693 name: "expr".to_string(),
694 value: ast::Value::String(expr_val),
695 }],
696 directives: Vec::new(),
697 selection_set: Vec::new(),
698 computed_expression: None,
699 };
700
701 if let ast::ComputedExpression::TemplateString(s) = &cf.expression {
702 field.arguments[0].value = ast::Value::String(s.clone());
703 }
704 match &cf.expression {
705 ast::ComputedExpression::TemplateString(_) => {}
706 ast::ComputedExpression::StandardExpression(e) => {
707 field.computed_expression = Some(e.clone());
708 }
709 ast::ComputedExpression::FunctionCall { name, args } => {
710 field.computed_expression = Some(ast::Expression::FunctionCall {
711 name: name.clone(),
712 args: args.clone(),
713 });
714 }
715 ast::ComputedExpression::PipeExpression { .. } => {
716 }
718 ast::ComputedExpression::SqlExpression(s) => {
719 field.computed_expression = Some(ast::Expression::Literal(ast::Value::String(format!("SQL: {}", s))));
720 }
721 ast::ComputedExpression::AggregateFunction { .. } => {
722 }
724 }
725
726 fields.push(field);
727 }
728 }
729 }
730 Ok(fields)
731}
732
733fn should_include(
735 directives: &[ast::Directive],
736 variables: &HashMap<String, ast::Value>,
737) -> Result<bool> {
738 for dir in directives {
739 if dir.name == "skip" {
740 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
741 let should_skip = resolve_boolean_arg(&arg.value, variables)?;
742 if should_skip {
743 return Ok(false);
744 }
745 }
746 } else if dir.name == "include" {
747 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
748 let should_include = resolve_boolean_arg(&arg.value, variables)?;
749 if !should_include {
750 return Ok(false);
751 }
752 }
753 }
754 }
755 Ok(true)
756}
757
758fn check_permissions(
761 directives: &[ast::Directive],
762 options: &ExecutionOptions,
763 is_write: bool,
764) -> Result<()> {
765 for directive in directives {
766 match directive.name.as_str() {
767 "auth" => {
768 if options.user_role.is_none() {
769 return Err(AqlError::new(
770 ErrorCode::Unauthorized,
771 "Authentication required for this operation".to_string(),
772 ));
773 }
774 }
775 "require" | "allow" => {
776 let required_role = directive
777 .arguments
778 .iter()
779 .find(|a| {
780 if is_write {
781 a.name == "write" || a.name == "role"
782 } else {
783 a.name == "read" || a.name == "role"
784 }
785 })
786 .and_then(|a| {
787 if let ast::Value::String(s) = &a.value {
788 Some(s.as_str())
789 } else {
790 None
791 }
792 });
793
794 if let Some(role) = required_role {
795 if options.user_role.as_deref() != Some(role) {
796 return Err(AqlError::new(
797 ErrorCode::Forbidden,
798 format!("Role '{}' required for this operation", role),
799 ));
800 }
801 }
802 }
803 _ => {}
804 }
805 }
806 Ok(())
807}
808
809fn resolve_boolean_arg(
810 value: &ast::Value,
811 variables: &HashMap<String, ast::Value>,
812) -> Result<bool> {
813 match value {
814 ast::Value::Boolean(b) => Ok(*b),
815 ast::Value::Variable(name) => {
816 if let Some(val) = variables.get(name) {
817 match val {
818 ast::Value::Boolean(b) => Ok(*b),
819 _ => Err(AqlError::new(
820 ErrorCode::TypeError,
821 format!("Variable '{}' is not a boolean, got {:?}", name, val),
822 )),
823 }
824 } else {
825 Err(AqlError::new(
826 ErrorCode::UndefinedVariable,
827 format!("Variable '{}' is not defined", name),
828 ))
829 }
830 }
831 _ => Err(AqlError::new(
832 ErrorCode::TypeError,
833 format!("Expected boolean value, got {:?}", value),
834 )),
835 }
836}
837
838fn validate_required_variables(
840 variable_definitions: &[ast::VariableDefinition],
841 provided_variables: &HashMap<String, ast::Value>,
842) -> Result<()> {
843 for var_def in variable_definitions {
844 if var_def.var_type.is_required {
845 if !provided_variables.contains_key(&var_def.name) {
846 if var_def.default_value.is_none() {
847 return Err(AqlError::new(
848 ErrorCode::UndefinedVariable,
849 format!(
850 "Required variable '{}' (type: {}{}) is not provided",
851 var_def.name,
852 var_def.var_type.name,
853 if var_def.var_type.is_required {
854 "!"
855 } else {
856 ""
857 }
858 ),
859 ));
860 }
861 }
862 }
863 }
864 Ok(())
865}
866
/// Outcome of executing an AQL operation or document.
#[derive(Debug)]
pub enum ExecutionResult {
    /// Result of a query over a single collection (root field).
    Query(QueryResult),
    /// Result of a mutation.
    Mutation(MutationResult),
    /// Result of registering a subscription.
    Subscription(SubscriptionResult),
    /// Results of several operations, or of several root fields of one query.
    Batch(Vec<ExecutionResult>),
    /// Result of a schema operation.
    Schema(SchemaResult),
    /// Result of a migration.
    Migration(MigrationResult),
}
886
887impl ExecutionResult {
888 pub fn as_query(&self) -> Option<&QueryResult> {
890 if let Self::Query(q) = self {
891 Some(q)
892 } else {
893 None
894 }
895 }
896
897 pub fn into_query(self) -> Option<QueryResult> {
899 if let Self::Query(q) = self {
900 Some(q)
901 } else {
902 None
903 }
904 }
905
906 pub fn as_mutation(&self) -> Option<&MutationResult> {
908 if let Self::Mutation(m) = self {
909 Some(m)
910 } else {
911 None
912 }
913 }
914
915 pub fn into_mutation(self) -> Option<MutationResult> {
917 if let Self::Mutation(m) = self {
918 Some(m)
919 } else {
920 None
921 }
922 }
923
924 pub fn as_subscription(&self) -> Option<&SubscriptionResult> {
926 if let Self::Subscription(s) = self {
927 Some(s)
928 } else {
929 None
930 }
931 }
932
933 pub fn into_subscription(self) -> Option<SubscriptionResult> {
935 if let Self::Subscription(s) = self {
936 Some(s)
937 } else {
938 None
939 }
940 }
941
942 pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
947 match self {
948 Self::Query(q) => q.bind(),
949 Self::Mutation(m) => m.bind(),
950 Self::Batch(results) => {
951 let mut all = Vec::new();
952 for r in results {
953 all.extend(r.bind()?);
954 }
955 Ok(all)
956 }
957 _ => Err(AqlError::new(
958 ErrorCode::QueryError,
959 "Cannot bind results from schema or migration operations".to_string(),
960 )),
961 }
962 }
963
964 pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
968 match self {
969 Self::Query(q) => q.bind_first(),
970 Self::Mutation(m) => m.bind_first(),
971 Self::Batch(mut results) => {
972 if results.is_empty() {
973 return Err(AqlError::new(
974 ErrorCode::NotFound,
975 "No documents found to bind".to_string(),
976 ));
977 }
978 results.remove(0).bind_first()
979 }
980 _ => Err(AqlError::new(
981 ErrorCode::QueryError,
982 "Cannot bind results from schema or migration operations".to_string(),
983 )),
984 }
985 }
986}
987
/// Outcome of a schema operation.
///
/// NOTE(review): the producer is outside the visible part of this file, so
/// the exact values of these strings should be confirmed there.
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Name of the schema operation that ran.
    pub operation: String,
    /// Collection the operation applied to.
    pub collection: String,
    /// Status text reported for the operation.
    pub status: String,
}
994
/// Outcome of a migration.
///
/// NOTE(review): the producer is outside the visible part of this file;
/// confirm field semantics there.
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Migration version identifier.
    pub version: String,
    /// Number of migration steps applied.
    pub steps_applied: usize,
    /// Status text reported for the migration.
    pub status: String,
}
1001
/// Serializable summary of a plan.
///
/// NOTE(review): not constructed in the visible part of this file;
/// presumably used for explain/debug output — confirm.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionPlan {
    /// Textual descriptions of the planned operations.
    pub operations: Vec<String>,
    /// Estimated cost of the plan (unit not visible here).
    pub estimated_cost: f64,
}
1007
/// Documents returned by a query against one collection.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Name of the queried collection (empty for the placeholder result of an
    /// unhandled operation).
    pub collection: String,
    /// Resulting documents, after filtering, pagination and projection.
    pub documents: Vec<Document>,
    /// Total match count when known; `execute_plan` always leaves it `None`.
    pub total_count: Option<usize>,
    /// Names of deferred fields; empty in every result built in the visible code.
    pub deferred_fields: Vec<String>,
    /// Execution statistics, filled in when the query carries `@explain`.
    pub explain: Option<ExplainResult>,
}
1022
1023impl QueryResult {
1024 pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
1026 self.documents
1027 .into_iter()
1028 .map(|d| d.bind())
1029 .collect::<Result<Vec<T>>>()
1030 }
1031
1032 pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
1034 self.documents
1035 .into_iter()
1036 .next()
1037 .ok_or_else(|| {
1038 AqlError::new(
1039 ErrorCode::NotFound,
1040 "No documents found to bind".to_string(),
1041 )
1042 })?
1043 .bind()
1044 }
1045}
1046
/// Execution statistics attached to a [`QueryResult`] for `@explain` queries.
#[derive(Debug, Clone, Default)]
pub struct ExplainResult {
    /// Collection the statistics refer to.
    pub collection: String,
    /// As populated by `execute_query`, this is the number of documents
    /// *returned*, not the number physically scanned.
    pub docs_scanned: usize,
    /// As populated by `execute_query`: true when the root field has a `where`
    /// argument and the result is non-empty (a heuristic, not a measurement).
    pub index_used: bool,
    /// Wall-clock time of the collection query, in milliseconds.
    pub elapsed_ms: u128,
}
1059
/// Outcome of a mutation.
///
/// NOTE(review): the producer is outside the visible part of this file;
/// confirm the exact values of `operation` there.
#[derive(Debug, Clone)]
pub struct MutationResult {
    /// Name of the mutation operation that ran.
    pub operation: String,
    /// Collection the mutation applied to.
    pub collection: String,
    /// Number of documents affected.
    pub affected_count: usize,
    /// Documents returned by the mutation; these are what `bind` deserializes.
    pub returned_documents: Vec<Document>,
}
1072
1073impl MutationResult {
1074 pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
1076 self.returned_documents
1077 .into_iter()
1078 .map(|d| d.bind())
1079 .collect::<Result<Vec<T>>>()
1080 }
1081
1082 pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
1084 self.returned_documents
1085 .into_iter()
1086 .next()
1087 .ok_or_else(|| {
1088 AqlError::new(
1089 ErrorCode::NotFound,
1090 "No documents found to bind".to_string(),
1091 )
1092 })?
1093 .bind()
1094 }
1095}
1096
/// Outcome of registering a subscription.
#[derive(Debug)]
pub struct SubscriptionResult {
    /// Identifier of the subscription.
    pub subscription_id: String,
    /// Collection being observed.
    pub collection: String,
    /// Change listener for the subscription, when one is available.
    /// NOTE(review): the conditions under which this is `None` are not visible here.
    pub stream: Option<crate::pubsub::ChangeListener>,
}
1107
/// Per-call settings for AQL execution.
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    /// Requests that validation be skipped (default `false`).
    /// NOTE(review): not read in the visible part of this file.
    pub skip_validation: bool,
    /// When `false`, `execute_plan` returns documents without applying the
    /// projection (default `true`).
    pub apply_projections: bool,
    /// Debug/audit flag (default `false`).
    /// NOTE(review): not read in the visible part of this file.
    pub debug_audit: bool,
    /// Variable bindings supplied by the caller.
    pub variables: HashMap<String, ast::Value>,
    /// Role of the caller; `None` means unauthenticated for `@auth` checks.
    pub user_role: Option<String>,
}
1118
1119impl Default for ExecutionOptions {
1120 fn default() -> Self {
1121 Self {
1122 skip_validation: false,
1123 apply_projections: true,
1124 debug_audit: false,
1125 variables: HashMap::new(),
1126 user_role: None,
1127 }
1128 }
1129}
1130
1131impl ExecutionOptions {
1132 pub fn new() -> Self {
1133 Self::default()
1134 }
1135
1136 pub fn with_variables(mut self, vars: HashMap<String, ast::Value>) -> Self {
1137 self.variables = vars;
1138 self
1139 }
1140
1141 pub fn with_role(mut self, role: impl Into<String>) -> Self {
1142 self.user_role = Some(role.into());
1143 self
1144 }
1145
1146 pub fn skip_validation(mut self) -> Self {
1147 self.skip_validation = true;
1148 self
1149 }
1150}
1151
1152pub(crate) fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
1153 match v {
1154 serde_json::Value::Null => ast::Value::Null,
1155 serde_json::Value::Bool(b) => ast::Value::Boolean(b),
1156 serde_json::Value::Number(n) => {
1157 if let Some(i) = n.as_i64() {
1158 ast::Value::Int(i)
1159 } else if let Some(f) = n.as_f64() {
1160 ast::Value::Float(f)
1161 } else {
1162 ast::Value::Null
1163 }
1164 }
1165 serde_json::Value::String(s) => ast::Value::String(s),
1166 serde_json::Value::Array(arr) => {
1167 ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
1168 }
1169 serde_json::Value::Object(map) => ast::Value::Object(
1170 map.into_iter()
1171 .map(|(k, v)| (k, json_to_aql_value(v)))
1172 .collect(),
1173 ),
1174 }
1175}
1176
1177pub async fn execute_document(
1179 db: &Aurora,
1180 doc: &ast::Document,
1181 options: &ExecutionOptions,
1182) -> Result<ExecutionResult> {
1183 if doc.operations.is_empty() {
1184 return Err(AqlError::new(
1185 ErrorCode::QueryError,
1186 "No operations in document".to_string(),
1187 ));
1188 }
1189
1190 println!("execute_document: first op: {:?}", doc.operations[0]);
1191
1192 let vars: HashMap<String, ast::Value> = options.variables.clone();
1193
1194 let fragments: HashMap<String, FragmentDef> = doc
1195 .operations
1196 .iter()
1197 .filter_map(|op| {
1198 if let Operation::FragmentDefinition(frag) = op {
1199 Some((frag.name.clone(), frag.clone()))
1200 } else {
1201 None
1202 }
1203 })
1204 .collect();
1205
1206 let executable_ops: Vec<&Operation> = doc
1207 .operations
1208 .iter()
1209 .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
1210 .collect();
1211
1212 if executable_ops.is_empty() {
1213 return Err(AqlError::new(
1214 ErrorCode::QueryError,
1215 "No executable operations in document".to_string(),
1216 ));
1217 }
1218
1219 if executable_ops.len() == 1 {
1220 execute_operation(db, executable_ops[0], &vars, options, &fragments).await
1221 } else {
1222 let mut results = Vec::new();
1223 for op in executable_ops {
1224 results.push(execute_operation(db, op, &vars, options, &fragments).await?);
1225 }
1226 Ok(ExecutionResult::Batch(results))
1227 }
1228}
1229
1230async fn execute_operation(
1231 db: &Aurora,
1232 op: &Operation,
1233 vars: &HashMap<String, ast::Value>,
1234 options: &ExecutionOptions,
1235 fragments: &HashMap<String, FragmentDef>,
1236) -> Result<ExecutionResult> {
1237 match op {
1238 Operation::Query(query) => execute_query(db, query, vars, options, fragments).await,
1239 Operation::Mutation(mutation) => {
1240 execute_mutation(db, mutation, vars, options, fragments).await
1241 }
1242 Operation::Subscription(sub) => execute_subscription(db, sub, vars, options).await,
1243 Operation::Schema(schema) => execute_schema(db, schema, options).await,
1244 Operation::Migration(migration) => execute_migration(db, migration, options).await,
1245 Operation::Introspection(intro) => execute_introspection(db, intro).await,
1246 Operation::Handler(handler) => execute_handler_registration(db, handler, options).await,
1247 _ => Ok(ExecutionResult::Query(QueryResult {
1248 collection: String::new(),
1249 documents: vec![],
1250 total_count: None,
1251 deferred_fields: vec![],
1252 explain: None,
1253 })),
1254 }
1255}
1256
1257async fn execute_query(
1258 db: &Aurora,
1259 query: &ast::Query,
1260 vars: &HashMap<String, ast::Value>,
1261 options: &ExecutionOptions,
1262 fragments: &HashMap<String, FragmentDef>,
1263) -> Result<ExecutionResult> {
1264 validate_required_variables(&query.variable_definitions, vars)?;
1265 let has_explain = query.directives.iter().any(|d| d.name == "explain");
1266 let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;
1267 let mut results = Vec::new();
1268 for field in &root_fields {
1269 let sub_fields = collect_fields(&field.selection_set, fragments, vars, Some(&field.name))?;
1270 let start = std::time::Instant::now();
1271 let mut result =
1272 execute_collection_query(db, field, &sub_fields, vars, options, fragments).await?;
1273 if has_explain {
1274 let elapsed_ms = start.elapsed().as_millis();
1275 let index_used =
1276 field.arguments.iter().any(|a| a.name == "where") && !result.documents.is_empty();
1277 result.explain = Some(ExplainResult {
1278 collection: result.collection.clone(),
1279 docs_scanned: result.documents.len(),
1280 index_used,
1281 elapsed_ms,
1282 });
1283 }
1284 results.push(result);
1285 }
1286 if results.len() == 1 {
1287 Ok(ExecutionResult::Query(results.remove(0)))
1288 } else {
1289 Ok(ExecutionResult::Batch(
1290 results.into_iter().map(ExecutionResult::Query).collect(),
1291 ))
1292 }
1293}
1294
1295async fn execute_collection_query(
1296 db: &Aurora,
1297 field: &ast::Field,
1298 sub_fields: &[ast::Field],
1299 variables: &HashMap<String, ast::Value>,
1300 options: &ExecutionOptions,
1301 fragments: &HashMap<String, FragmentDef>,
1302) -> Result<QueryResult> {
1303 let collection_name = &field.name;
1304
1305 check_permissions(&field.directives, options, false)?;
1307
1308 if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
1311 return execute_search_query(
1312 db,
1313 collection_name,
1314 search_arg,
1315 sub_fields,
1316 field,
1317 variables,
1318 options,
1319 )
1320 .await;
1321 }
1322
1323 let filter = extract_filter_from_args(&field.arguments)?;
1324 let (limit, offset) = extract_pagination(&field.arguments);
1325 let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
1326 let compiled_filter = if let Some(ref f) = filter {
1327 Some(compile_filter(f)?)
1328 } else {
1329 None
1330 };
1331 let vars_arc = Arc::new(variables.clone());
1332 let filter_fn = move |doc: &Document| {
1333 compiled_filter
1334 .as_ref()
1335 .map(|f| matches_filter(doc, f, &vars_arc))
1336 .unwrap_or(true)
1337 };
1338
1339 let indexed_docs = if let Some(ref f) = filter {
1340 match find_indexed_equality_filter(f, db, collection_name) {
1341 Some((field_name, val))
1342 if field_name == "_sid" || !db.is_index_building(collection_name, &field_name) =>
1343 {
1344 let db_val = aql_value_to_db_value(&val, variables)?;
1345 let ids = if field_name == "_sid" {
1346 match &db_val {
1347 Value::String(s) => vec![s.clone()],
1348 Value::Uuid(u) => vec![u.to_string()],
1349 _ => vec![],
1350 }
1351 } else {
1352 db.get_ids_from_index(collection_name, &field_name, &db_val)
1353 };
1354
1355 let mut docs = Vec::with_capacity(ids.len());
1356 for id in ids {
1357 if let Some(doc) = db.get_document(collection_name, &id)? {
1358 if filter_fn(&doc) {
1359 docs.push(doc);
1360 }
1361 }
1362 }
1363 Some(docs)
1364 }
1365 _ => None,
1367 }
1368 } else {
1369 None
1370 };
1371
1372 let is_connection = sub_fields
1373 .iter()
1374 .any(|f| f.name == "edges" || f.name == "pageInfo");
1375
1376 let orderings = extract_order_by(&field.arguments);
1377
1378 let mut docs = if let Some(docs) = indexed_docs {
1379 docs
1380 } else {
1381 let scan_limit = if after.is_some() || !orderings.is_empty() {
1385 None
1386 } else {
1387 limit.or(first).map(|l| {
1388 let base = if is_connection { l + 1 } else { l };
1389 base + offset
1390 })
1391 };
1392 db.scan_and_filter(collection_name, filter_fn, scan_limit)?
1393 };
1394
1395 if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
1398 docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
1399 }
1400
1401 if !orderings.is_empty() {
1403 apply_ordering(&mut docs, &orderings);
1404 }
1405
1406 if let Some(ref cursor) = after {
1408 if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
1409 docs.drain(0..=pos);
1410 }
1411 }
1412
1413 if is_connection {
1414 return Ok(execute_connection(
1415 docs,
1416 sub_fields,
1417 limit.or(first),
1418 fragments,
1419 variables,
1420 options,
1421 )?);
1422 }
1423
1424 if offset > 0 {
1426 if offset < docs.len() {
1427 docs.drain(0..offset);
1428 } else {
1429 docs.clear();
1430 }
1431 }
1432
1433 if let Some(l) = limit.or(first) {
1435 docs.truncate(l);
1436 }
1437
1438 let lookup_dependencies =
1440 identify_lookup_dependencies(db, collection_name, sub_fields, variables);
1441 let has_lookups = !lookup_dependencies.is_empty();
1442
1443 let mut deferred_fields = Vec::new();
1444
1445 if options.apply_projections && !sub_fields.is_empty() {
1446 if has_lookups {
1447 let mut projected = Vec::with_capacity(docs.len());
1448 for d in docs {
1449 let (proj_doc, deferred) = apply_projection_with_lookups(
1450 db,
1451 d,
1452 collection_name,
1453 sub_fields,
1454 fragments,
1455 variables,
1456 options,
1457 &lookup_dependencies,
1458 )
1459 .await?;
1460 projected.push(proj_doc);
1461 if deferred_fields.is_empty() {
1462 deferred_fields = deferred;
1463 }
1464 }
1465 docs = projected;
1466 } else {
1467 let mut all_deferred = Vec::new();
1468 let mut projected = Vec::with_capacity(docs.len());
1469 for d in docs {
1470 let (proj, deferred) =
1471 apply_projection_and_defer(d, sub_fields, variables, options)?;
1472 if all_deferred.is_empty() && !deferred.is_empty() {
1473 all_deferred = deferred;
1474 }
1475 projected.push(proj);
1476 }
1477 docs = projected;
1478 deferred_fields = all_deferred;
1479 }
1480 }
1481
1482 for sf in sub_fields {
1484 if sf.name == "windowFunc" {
1485 let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
1486 let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
1487 let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
1488 let wsize = arg_i64(&sf.arguments, "windowSize").unwrap_or(3) as usize;
1489 apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
1490 }
1491 }
1492
1493 if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
1495 let interval =
1496 arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
1497 let aggregation =
1498 arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
1499 let ds_sub: Vec<String> =
1500 collect_fields(&ds_field.selection_set, fragments, variables, None)
1501 .unwrap_or_default()
1502 .iter()
1503 .map(|f| f.name.clone())
1504 .collect();
1505 docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
1506 }
1507
1508 Ok(QueryResult {
1509 collection: collection_name.clone(),
1510 documents: docs,
1511 total_count: None,
1512 deferred_fields,
1513 explain: None,
1514 })
1515}
1516
1517async fn execute_search_query(
1519 db: &Aurora,
1520 collection: &str,
1521 search_arg: &ast::Argument,
1522 sub_fields: &[ast::Field],
1523 field: &ast::Field,
1524 variables: &HashMap<String, ast::Value>,
1525 options: &ExecutionOptions,
1526) -> Result<QueryResult> {
1527 let resolved_search_val = resolve_ast_deep(&search_arg.value, variables);
1529 let (query_str, search_fields, fuzzy) = extract_search_params(&resolved_search_val);
1530 let (limit, _) = extract_pagination(&field.arguments);
1531
1532 let mut builder = db.search(collection).query(&query_str);
1533 if fuzzy {
1534 builder = builder.fuzzy(1);
1535 }
1536 if let Some(l) = limit {
1537 builder = builder.limit(l);
1538 }
1539
1540 let mut docs = builder
1541 .collect_with_fields(if search_fields.is_empty() {
1542 None
1543 } else {
1544 Some(&search_fields)
1545 })
1546 .await?;
1547
1548 if options.apply_projections && !sub_fields.is_empty() {
1549 let mut projected = Vec::with_capacity(docs.len());
1550 for d in docs {
1551 let (proj, _) = apply_projection_and_defer(d, sub_fields, variables, options)?;
1552 projected.push(proj);
1553 }
1554 docs = projected;
1555 }
1556
1557 Ok(QueryResult {
1558 collection: collection.to_string(),
1559 documents: docs,
1560 total_count: None,
1561 deferred_fields: vec![],
1562 explain: None,
1563 })
1564}
1565
1566fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
1567 let mut query = String::new();
1568 let mut fields = Vec::new();
1569 let mut fuzzy = false;
1570 if let ast::Value::Object(m) = v {
1571 if let Some(ast::Value::String(q)) = m.get("query") {
1572 query = q.clone();
1573 }
1574 if let Some(ast::Value::Array(arr)) = m.get("fields") {
1575 for item in arr {
1576 if let ast::Value::String(s) = item {
1577 fields.push(s.clone());
1578 }
1579 }
1580 }
1581 if let Some(ast::Value::Boolean(b)) = m.get("fuzzy") {
1582 fuzzy = *b;
1583 }
1584 }
1585 (query, fields, fuzzy)
1586}
1587
1588fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
1589 if let ast::Value::Object(rules) = &validate_arg.value {
1590 for (field_name, constraints_val) in rules {
1591 if let ast::Value::Object(constraints) = constraints_val {
1592 if let Some(field_val) = doc.data.get(field_name) {
1593 for (constraint_name, constraint_val) in constraints {
1594 if !check_inline_constraint(field_val, constraint_name, constraint_val) {
1595 return false;
1596 }
1597 }
1598 }
1599 }
1600 }
1601 }
1602 true
1603}
1604
1605fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
1606 match constraint {
1607 "format" => {
1608 if let (Value::String(s), ast::Value::String(fmt)) = (value, constraint_val) {
1609 return match fmt.as_str() {
1610 "email" => {
1611 s.contains('@')
1612 && s.split('@')
1613 .nth(1)
1614 .map(|d| d.contains('.'))
1615 .unwrap_or(false)
1616 }
1617 "url" => s.starts_with("http://") || s.starts_with("https://"),
1618 "uuid" => uuid::Uuid::parse_str(s).is_ok(),
1619 _ => true,
1620 };
1621 }
1622 true
1623 }
1624 "min" => {
1625 let n = match value {
1626 Value::Int(i) => *i as f64,
1627 Value::Float(f) => *f,
1628 _ => return true,
1629 };
1630 let min = match constraint_val {
1631 ast::Value::Float(f) => *f,
1632 ast::Value::Int(i) => *i as f64,
1633 _ => return true,
1634 };
1635 n >= min
1636 }
1637 "max" => {
1638 let n = match value {
1639 Value::Int(i) => *i as f64,
1640 Value::Float(f) => *f,
1641 _ => return true,
1642 };
1643 let max = match constraint_val {
1644 ast::Value::Float(f) => *f,
1645 ast::Value::Int(i) => *i as f64,
1646 _ => return true,
1647 };
1648 n <= max
1649 }
1650 "minLength" => {
1651 if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1652 return s.len() >= *n as usize;
1653 }
1654 true
1655 }
1656 "maxLength" => {
1657 if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1658 return s.len() <= *n as usize;
1659 }
1660 true
1661 }
1662 "pattern" => {
1663 if let (Value::String(s), ast::Value::String(pat)) = (value, constraint_val) {
1664 if let Ok(re) = regex::Regex::new(pat) {
1665 return re.is_match(s);
1666 }
1667 }
1668 true
1669 }
1670 _ => true,
1671 }
1672}
1673
1674fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
1675 args.iter().find(|a| a.name == name).and_then(|a| {
1676 if let ast::Value::String(s) = &a.value {
1677 Some(s.clone())
1678 } else {
1679 None
1680 }
1681 })
1682}
1683
1684fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
1685 args.iter().find(|a| a.name == name).and_then(|a| {
1686 if let ast::Value::Int(i) = &a.value {
1687 Some(*i)
1688 } else {
1689 None
1690 }
1691 })
1692}
1693
1694fn identify_lookup_dependencies(
1695 db: &Aurora,
1696 collection: &str,
1697 fields: &[ast::Field],
1698 variables: &HashMap<String, ast::Value>,
1699) -> Vec<String> {
1700 let mut deps = Vec::new();
1701 for f in fields {
1702 let mut found = false;
1703 if let (Some(lf), Some(_c), Some(_ff)) = (
1704 f.arguments.iter().find(|a| a.name == "localField"),
1705 f.arguments.iter().find(|a| a.name == "collection"),
1706 f.arguments.iter().find(|a| a.name == "foreignField"),
1707 ) {
1708 if let ast::Value::String(s) = resolve_if_variable(&lf.value, variables) {
1709 deps.push(s.clone());
1710 }
1711 found = true;
1712 }
1713 if !found {
1714 if let Ok(col_def) = db.get_collection_definition(collection) {
1715 if let Some(f_def) = col_def.fields.get(&f.name) {
1716 if let Some(_rel) = &f_def.relation {
1717 deps.push(f.name.clone());
1718 }
1719 }
1720 }
1721 }
1722 }
1723 deps
1724}
1725
1726fn apply_projection_and_defer(
1728 mut doc: Document,
1729 fields: &[ast::Field],
1730 vars: &HashMap<String, ast::Value>,
1731 options: &ExecutionOptions,
1732) -> Result<(Document, Vec<String>)> {
1733 if fields.is_empty() {
1734 return Ok((doc, vec![]));
1735 }
1736 let mut proj = HashMap::new();
1737 let mut deferred = Vec::new();
1738
1739 for f in fields {
1740 check_permissions(&f.directives, options, false)?;
1742
1743 if f.directives.iter().any(|d| d.name == "defer") {
1745 deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
1746 continue;
1747 }
1748 if f.name == "__compute__" {
1750 let alias = f.alias.as_deref().unwrap_or("computed");
1751 if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
1752 if let ast::Value::String(template) = &expr_arg.value {
1754 let result = eval_template(template, &doc.data);
1755 proj.insert(alias.to_string(), Value::String(result));
1756 } else if let Some(expr) = f.computed_expression.as_ref() {
1757 let result = eval_expression(expr, &doc.data, vars);
1759 proj.insert(alias.to_string(), result);
1760 }
1761 }
1762 continue;
1763 }
1764 if f.name == "id" {
1765 if let Some(v) = doc.data.get("id") {
1767 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1768 } else {
1769 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Null);
1771 }
1772 } else if f.name == "_sid" {
1773 proj.insert(
1774 f.alias.as_ref().unwrap_or(&f.name).clone(),
1775 Value::String(doc._sid.clone()),
1776 );
1777 } else if let Some(v) = doc.data.get(&f.name) {
1778 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1779 }
1780 }
1781 doc.data = proj;
1782 Ok((doc, deferred))
1783}
1784fn eval_expression(
1786 expr: &ast::Expression,
1787 data: &HashMap<String, Value>,
1788 variables: &HashMap<String, ast::Value>,
1789) -> Value {
1790 match expr {
1791 ast::Expression::Literal(v) => match aql_value_to_db_value(v, variables) {
1792 Ok(dv) => dv,
1793 Err(_) => Value::Null,
1794 },
1795 ast::Expression::Variable(name) => {
1796 if let Some(v) = variables.get(name) {
1797 match aql_value_to_db_value(v, variables) {
1798 Ok(dv) => dv,
1799 Err(_) => Value::Null,
1800 }
1801 } else {
1802 Value::Null
1803 }
1804 }
1805 ast::Expression::FieldAccess(parts) => {
1806 let mut current = if parts[0] == "_sid" {
1807 Some(Value::String(data.get("_sid").cloned().and_then(|v| v.as_str().map(|s| s.to_string())).unwrap_or_default()))
1808 } else {
1809 data.get(&parts[0]).cloned()
1810 };
1811
1812 for part in parts.iter().skip(1) {
1813 if let Some(Value::Object(map)) = current {
1814 current = map.get(part).cloned();
1815 } else {
1816 current = None;
1817 break;
1818 }
1819 }
1820 current.unwrap_or(Value::Null)
1821 }
1822 ast::Expression::Binary { op, left, right } => {
1823 let lv = eval_expression(left, data, variables);
1824 let rv = eval_expression(right, data, variables);
1825 eval_binary_op(*op, lv, rv)
1826 }
1827 ast::Expression::Unary { op, expr } => {
1828 let v = eval_expression(expr, data, variables);
1829 eval_unary_op(*op, v)
1830 }
1831 ast::Expression::Ternary {
1832 condition,
1833 then_expr,
1834 else_expr,
1835 } => {
1836 let cv = eval_expression(condition, data, variables);
1837 let is_true = match cv {
1838 Value::Bool(b) => b,
1839 Value::Null => false,
1840 Value::Int(i) => i != 0,
1841 Value::Float(f) => f != 0.0,
1842 Value::String(s) => !s.is_empty(),
1843 _ => true,
1844 };
1845 if is_true {
1846 eval_expression(then_expr, data, variables)
1847 } else {
1848 eval_expression(else_expr, data, variables)
1849 }
1850 }
1851 ast::Expression::FunctionCall { name, args } => {
1852 let evaluated_args: Vec<Value> = args
1853 .iter()
1854 .map(|a| eval_expression(a, data, variables))
1855 .collect();
1856 eval_function_call(name, evaluated_args)
1857 }
1858 }
1859}
1860
1861fn eval_binary_op(op: ast::BinaryOp, left: Value, right: Value) -> Value {
1862 match op {
1863 ast::BinaryOp::Add => match (left, right) {
1864 (Value::Int(a), Value::Int(b)) => Value::Int(a + b),
1865 (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 + b),
1866 (Value::Float(a), Value::Int(b)) => Value::Float(a + b as f64),
1867 (Value::Float(a), Value::Float(b)) => Value::Float(a + b),
1868 (Value::String(mut a), Value::String(b)) => {
1869 a.push_str(&b);
1870 Value::String(a)
1871 }
1872 _ => Value::Null,
1873 },
1874 ast::BinaryOp::Sub => match (left, right) {
1875 (Value::Int(a), Value::Int(b)) => Value::Int(a - b),
1876 (Value::Float(a), Value::Float(b)) => Value::Float(a - b),
1877 _ => Value::Null,
1878 },
1879 ast::BinaryOp::Mul => match (left, right) {
1880 (Value::Int(a), Value::Int(b)) => Value::Int(a * b),
1881 (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 * b),
1882 (Value::Float(a), Value::Int(b)) => Value::Float(a * b as f64),
1883 (Value::Float(a), Value::Float(b)) => Value::Float(a * b),
1884 _ => Value::Null,
1885 },
1886 ast::BinaryOp::Div => match (left, right) {
1887 (Value::Int(a), Value::Int(b)) if b != 0 => Value::Int(a / b),
1888 (Value::Float(a), Value::Float(b)) if b != 0.0 => Value::Float(a / b),
1889 _ => Value::Null,
1890 },
1891 ast::BinaryOp::Eq => Value::Bool(left == right),
1892 ast::BinaryOp::Ne => Value::Bool(left != right),
1893 ast::BinaryOp::Gt => Value::Bool(left > right),
1894 ast::BinaryOp::Gte => Value::Bool(left >= right),
1895 ast::BinaryOp::Lt => Value::Bool(left < right),
1896 ast::BinaryOp::Lte => Value::Bool(left <= right),
1897 ast::BinaryOp::And => {
1898 let lb = matches!(left, Value::Bool(true));
1899 let rb = matches!(right, Value::Bool(true));
1900 Value::Bool(lb && rb)
1901 }
1902 ast::BinaryOp::Or => {
1903 let lb = matches!(left, Value::Bool(true));
1904 let rb = matches!(right, Value::Bool(true));
1905 Value::Bool(lb || rb)
1906 }
1907 _ => Value::Null,
1908 }
1909}
1910
1911fn eval_unary_op(op: ast::UnaryOp, val: Value) -> Value {
1912 match op {
1913 ast::UnaryOp::Not => match val {
1914 Value::Bool(b) => Value::Bool(!b),
1915 Value::Null => Value::Bool(true),
1916 _ => Value::Bool(false),
1917 },
1918 ast::UnaryOp::Neg => match val {
1919 Value::Int(i) => Value::Int(-i),
1920 Value::Float(f) => Value::Float(-f),
1921 _ => Value::Null,
1922 },
1923 }
1924}
1925
1926fn eval_function_call(name: &str, args: Vec<Value>) -> Value {
1927 match name {
1928 "upper" | "uppercase" => {
1929 if let Some(Value::String(s)) = args.get(0) {
1930 Value::String(s.to_uppercase())
1931 } else {
1932 Value::Null
1933 }
1934 }
1935 "lower" | "lowercase" => {
1936 if let Some(Value::String(s)) = args.get(0) {
1937 Value::String(s.to_lowercase())
1938 } else {
1939 Value::Null
1940 }
1941 }
1942 "len" | "length" => match args.get(0) {
1943 Some(Value::String(s)) => Value::Int(s.len() as i64),
1944 Some(Value::Array(a)) => Value::Int(a.len() as i64),
1945 Some(Value::Object(o)) => Value::Int(o.len() as i64),
1946 _ => Value::Null,
1947 },
1948 _ => Value::Null,
1949 }
1950}
1951
1952fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
1954 let mut result = template.to_string();
1955 for (k, v) in data {
1956 let p1 = format!("${{{}}}", k);
1957 let p2 = format!("${}", k);
1958 let v_str = match v {
1959 Value::String(s) => s.clone(),
1960 _ => v.to_string(),
1961 };
1962 if result.contains(&p1) {
1963 result = result.replace(&p1, &v_str);
1964 }
1965 if result.contains(&p2) {
1966 result = result.replace(&p2, &v_str);
1967 }
1968 }
1969 result
1970}
1971
1972fn apply_window_function(
1974 docs: &mut Vec<Document>,
1975 alias: &str,
1976 field: &str,
1977 function: &str,
1978 window: usize,
1979) {
1980 if docs.is_empty() || window == 0 {
1981 return;
1982 }
1983 let values: Vec<Option<f64>> = docs
1984 .iter()
1985 .map(|d| match d.data.get(field) {
1986 Some(Value::Int(i)) => Some(*i as f64),
1987 Some(Value::Float(f)) => Some(*f),
1988 _ => None,
1989 })
1990 .collect();
1991
1992 for (i, doc) in docs.iter_mut().enumerate() {
1993 let start = if i + 1 >= window { i + 1 - window } else { 0 };
1994 let window_vals: Vec<f64> = values[start..=i].iter().filter_map(|v| *v).collect();
1995 if window_vals.is_empty() {
1996 continue;
1997 }
1998 let result = match function {
1999 "rollingAvg" | "avg" => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
2000 "rollingSum" | "sum" => window_vals.iter().sum::<f64>(),
2001 "rollingMin" | "min" => window_vals.iter().cloned().fold(f64::INFINITY, f64::min),
2002 "rollingMax" | "max" => window_vals
2003 .iter()
2004 .cloned()
2005 .fold(f64::NEG_INFINITY, f64::max),
2006 _ => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
2007 };
2008 doc.data.insert(alias.to_string(), Value::Float(result));
2009 }
2010}
2011
2012fn apply_downsample(
2014 docs: Vec<Document>,
2015 interval: &str,
2016 aggregation: &str,
2017 value_fields: &[String],
2018) -> Vec<Document> {
2019 let interval_secs: i64 = parse_interval(interval);
2020 if interval_secs <= 0 {
2021 return docs;
2022 }
2023
2024 let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
2026 std::collections::BTreeMap::new();
2027 let mut leftover = Vec::new();
2028
2029 for doc in docs {
2030 let ts = ["timestamp", "ts", "created_at", "time"]
2032 .iter()
2033 .find_map(|&k| doc.data.get(k))
2034 .and_then(|v| match v {
2035 Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
2036 .ok()
2037 .map(|dt| dt.timestamp()),
2038 Value::Int(i) => Some(*i),
2039 _ => None,
2040 });
2041
2042 if let Some(t) = ts {
2043 let bucket = (t / interval_secs) * interval_secs;
2044 buckets.entry(bucket).or_default().push(doc);
2045 } else {
2046 leftover.push(doc);
2047 }
2048 }
2049
2050 let mut result = Vec::new();
2051 for (bucket_ts, group) in buckets {
2052 let mut data = HashMap::new();
2053 data.insert(
2054 "timestamp".to_string(),
2055 Value::String(
2056 chrono::DateTime::from_timestamp(bucket_ts, 0)
2057 .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
2058 .unwrap_or_default(),
2059 ),
2060 );
2061 data.insert("count".to_string(), Value::Int(group.len() as i64));
2062
2063 for field in value_fields {
2064 if field == "timestamp" || field == "count" {
2065 continue;
2066 }
2067 let nums: Vec<f64> = group
2068 .iter()
2069 .filter_map(|d| match d.data.get(field) {
2070 Some(Value::Int(i)) => Some(*i as f64),
2071 Some(Value::Float(f)) => Some(*f),
2072 _ => None,
2073 })
2074 .collect();
2075 if nums.is_empty() {
2076 continue;
2077 }
2078 let agg = match aggregation {
2079 "sum" => nums.iter().sum::<f64>(),
2080 "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
2081 "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
2082 "count" => nums.len() as f64,
2083 _ => nums.iter().sum::<f64>() / nums.len() as f64, };
2085 data.insert(field.clone(), Value::Float(agg));
2086 }
2087
2088 result.push(Document {
2089 _sid: bucket_ts.to_string(),
2090 data,
2091 });
2092 }
2093
2094 result.extend(leftover);
2095 result
2096}
2097
/// Parses an interval such as `30s`, `5m`, `2h` or `1d` into seconds.
///
/// Surrounding whitespace is ignored. With a unit suffix (`s`, `m`, `h`, `d`)
/// the numeric part is scaled accordingly; a numeric part that does not parse,
/// or whose scaled value would overflow `i64`, yields `0`. Without a unit
/// suffix the text is read as a plain number of seconds, falling back to
/// `3600` when it does not parse.
fn parse_interval(s: &str) -> i64 {
    const UNITS: [(char, i64); 4] = [('s', 1), ('m', 60), ('h', 3600), ('d', 86400)];

    let s = s.trim();
    for &(suffix, secs_per_unit) in UNITS.iter() {
        if let Some(count) = s.strip_suffix(suffix) {
            return count
                .parse::<i64>()
                .ok()
                // Treat an overflowing product like an unparsable number.
                .and_then(|n| n.checked_mul(secs_per_unit))
                .unwrap_or(0);
        }
    }
    s.parse().unwrap_or(3600)
}
2112
2113async fn execute_handler_registration(
2116 db: &Aurora,
2117 handler: &ast::HandlerDef,
2118 _options: &ExecutionOptions,
2119) -> Result<ExecutionResult> {
2120 use crate::pubsub::events::ChangeType;
2121
2122 let collection = match &handler.trigger {
2123 ast::HandlerTrigger::Insert { collection }
2124 | ast::HandlerTrigger::Update { collection }
2125 | ast::HandlerTrigger::Delete { collection } => {
2126 collection.as_deref().unwrap_or("*").to_string()
2127 }
2128 _ => "*".to_string(),
2129 };
2130
2131 let trigger_type = match &handler.trigger {
2132 ast::HandlerTrigger::Insert { .. } => Some(ChangeType::Insert),
2133 ast::HandlerTrigger::Update { .. } => Some(ChangeType::Update),
2134 ast::HandlerTrigger::Delete { .. } => Some(ChangeType::Delete),
2135 _ => None,
2136 };
2137
2138 let mut listener = if collection == "*" {
2139 db.pubsub.listen_all()
2140 } else {
2141 db.pubsub.listen(collection.clone())
2142 };
2143
2144 let db_clone = db.clone();
2145 let action = handler.action.clone();
2146 let handler_name = handler.name.clone();
2147
2148 tokio::spawn(async move {
2149 loop {
2150 match listener.recv().await {
2151 Ok(event) => {
2152 let matches = trigger_type
2154 .as_ref()
2155 .map(|t| &event.change_type == t)
2156 .unwrap_or(true);
2157 if !matches {
2158 continue;
2159 }
2160
2161 let mut vars = HashMap::new();
2165 vars.insert("_id".to_string(), ast::Value::String(event._sid.clone()));
2166 if let Some(doc) = &event.document {
2167 for (k, v) in &doc.data {
2169 vars.insert(format!("_{}", k), db_value_to_ast_value(v));
2170 }
2171 }
2172
2173 let _ = execute_mutation_op(
2174 &db_clone,
2175 &action,
2176 &vars,
2177 &HashMap::new(),
2178 &ExecutionOptions::default(),
2179 &HashMap::new(),
2180 )
2181 .await;
2182 }
2183 Err(_) => {
2184 eprintln!("[handler:{}] channel closed, stopping", handler_name);
2185 break;
2186 }
2187 }
2188 }
2189 });
2190
2191 eprintln!(
2192 "[handler] '{}' registered on '{}'",
2193 handler.name, collection
2194 );
2195
2196 let mut data = HashMap::new();
2197 data.insert("name".to_string(), Value::String(handler.name.clone()));
2198 data.insert("collection".to_string(), Value::String(collection));
2199 data.insert(
2200 "status".to_string(),
2201 Value::String("registered".to_string()),
2202 );
2203
2204 Ok(ExecutionResult::Query(QueryResult {
2205 collection: "__handler".to_string(),
2206 documents: vec![Document {
2207 _sid: handler.name.clone(),
2208 data,
2209 }],
2210 total_count: Some(1),
2211 deferred_fields: vec![],
2212 explain: None,
2213 }))
2214}
2215
2216fn db_value_to_ast_value(v: &Value) -> ast::Value {
2217 match v {
2218 Value::Null => ast::Value::Null,
2219 Value::Bool(b) => ast::Value::Boolean(*b),
2220 Value::Int(i) => ast::Value::Int(*i),
2221 Value::Float(f) => ast::Value::Float(*f),
2222 Value::String(s) => ast::Value::String(s.clone()),
2223 Value::Uuid(u) => ast::Value::String(u.to_string()),
2224 Value::DateTime(dt) => ast::Value::String(dt.to_rfc3339()),
2225 Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_ast_value).collect()),
2226 Value::Object(m) => ast::Value::Object(
2227 m.iter()
2228 .map(|(k, v)| (k.clone(), db_value_to_ast_value(v)))
2229 .collect(),
2230 ),
2231 }
2232}
2233
2234async fn execute_mutation(
2235 db: &Aurora,
2236 mutation: &ast::Mutation,
2237 vars: &HashMap<String, ast::Value>,
2238 options: &ExecutionOptions,
2239 fragments: &HashMap<String, FragmentDef>,
2240) -> Result<ExecutionResult> {
2241 use crate::transaction::ACTIVE_TRANSACTION_ID;
2242
2243 validate_required_variables(&mutation.variable_definitions, vars)?;
2244
2245 check_permissions(&mutation.directives, options, true)?;
2247
2248 let already_in_tx = ACTIVE_TRANSACTION_ID
2253 .try_with(|id| *id)
2254 .ok()
2255 .and_then(|id| db.transaction_manager.active_transactions.get(&id))
2256 .is_some();
2257
2258 if already_in_tx {
2259 let mut results = Vec::new();
2260 let mut context = HashMap::new();
2261 for mut_op in &mutation.operations {
2262 let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
2263 if let Some(alias) = &mut_op.alias {
2264 if let Some(doc) = res.returned_documents.first() {
2265 let mut m = serde_json::Map::new();
2266 for (k, v) in &doc.data {
2267 m.insert(k.clone(), aurora_value_to_json_value(v));
2268 }
2269 m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
2270 context.insert(alias.clone(), JsonValue::Object(m));
2271 }
2272 }
2273 results.push(res);
2274 }
2275 return if results.len() == 1 {
2276 Ok(ExecutionResult::Mutation(results.remove(0)))
2277 } else {
2278 Ok(ExecutionResult::Batch(
2279 results.into_iter().map(ExecutionResult::Mutation).collect(),
2280 ))
2281 };
2282 }
2283
2284 let tx_id = db.begin_transaction().await;
2287
2288 let exec_result = ACTIVE_TRANSACTION_ID
2289 .scope(tx_id, async {
2290 let mut results = Vec::new();
2291 let mut context = HashMap::new();
2292 for mut_op in &mutation.operations {
2293 let res =
2294 execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
2295 if let Some(alias) = &mut_op.alias {
2296 if let Some(doc) = res.returned_documents.first() {
2297 let mut m = serde_json::Map::new();
2298 for (k, v) in &doc.data {
2299 m.insert(k.clone(), aurora_value_to_json_value(v));
2300 }
2301 m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
2302 context.insert(alias.clone(), JsonValue::Object(m));
2303 }
2304 }
2305 results.push(res);
2306 }
2307 Ok::<_, crate::error::AqlError>(results)
2308 })
2309 .await;
2310
2311 match exec_result {
2312 Ok(mut results) => {
2313 db.commit_transaction(tx_id).await?;
2314 if results.len() == 1 {
2315 Ok(ExecutionResult::Mutation(results.remove(0)))
2316 } else {
2317 Ok(ExecutionResult::Batch(
2318 results.into_iter().map(ExecutionResult::Mutation).collect(),
2319 ))
2320 }
2321 }
2322 Err(e) => {
2323 let _ = db.rollback_transaction(tx_id).await;
2324 Err(e)
2325 }
2326 }
2327}
2328
2329fn execute_mutation_op<'a>(
2330 db: &'a Aurora,
2331 mut_op: &'a ast::MutationOperation,
2332 variables: &'a HashMap<String, ast::Value>,
2333 context: &'a ExecutionContext,
2334 options: &'a ExecutionOptions,
2335 fragments: &'a HashMap<String, FragmentDef>,
2336) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
2337 use futures::future::FutureExt;
2338 async move {
2339 check_permissions(&mut_op.directives, options, true)?;
2341
2342 match &mut_op.operation {
2343 MutationOp::Insert { collection, data } => {
2344 let resolved = resolve_value(data, variables, context);
2345 let doc = db
2346 .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
2347 .await?;
2348 let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
2349 let fields = collect_fields(
2350 &mut_op.selection_set,
2351 fragments,
2352 variables,
2353 Some(collection),
2354 )
2355 .unwrap_or_default();
2356 vec![apply_projection(doc, &fields, options)?]
2357 } else {
2358 vec![doc]
2359 };
2360
2361 Ok(MutationResult {
2362 operation: "insert".to_string(),
2363 collection: collection.clone(),
2364 affected_count: 1,
2365 returned_documents: returned,
2366 })
2367 }
2368 MutationOp::Update {
2369 collection,
2370 filter,
2371 data,
2372 } => {
2373 let cf = if let Some(f) = filter {
2374 Some(compile_filter(f)?)
2375 } else {
2376 None
2377 };
2378 let update_data =
2379 aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
2380
2381 let mut affected = 0;
2383 let mut returned = Vec::new();
2384
2385 let vars_arc = Arc::new(variables.clone());
2388 let cf_arc = cf.map(Arc::new);
2389
2390 let matches = db.scan_and_filter(
2391 collection,
2392 |doc| {
2393 if let Some(ref filter) = cf_arc {
2394 matches_filter(doc, filter, &vars_arc)
2395 } else {
2396 true
2397 }
2398 },
2399 None,
2400 )?;
2401
2402 let fields = if !mut_op.selection_set.is_empty() {
2403 Some(
2404 collect_fields(
2405 &mut_op.selection_set,
2406 fragments,
2407 variables,
2408 Some(collection),
2409 )
2410 .unwrap_or_default(),
2411 )
2412 } else {
2413 None
2414 };
2415
2416 for doc in matches {
2417 let mut new_data = doc.data.clone();
2418 for (k, v) in &update_data {
2419 let applied = apply_field_modifier(new_data.get(k), v);
2420 new_data.insert(k.clone(), applied);
2421 }
2422
2423 let updated_doc = db
2424 .aql_update_document(collection, &doc._sid, new_data)
2425 .await?;
2426
2427 affected += 1;
2428 if let Some(ref f) = fields {
2429 returned.push(apply_projection(updated_doc, f, options)?);
2430 }
2431 }
2432
2433 Ok(MutationResult {
2434 operation: "update".to_string(),
2435 collection: collection.clone(),
2436 affected_count: affected,
2437 returned_documents: returned,
2438 })
2439 }
2440 MutationOp::Delete { collection, filter } => {
2441 let cf = if let Some(f) = filter {
2442 Some(compile_filter(f)?)
2443 } else {
2444 None
2445 };
2446
2447 let mut affected = 0;
2448 let vars_arc = Arc::new(variables.clone());
2449 let cf_arc = cf.map(Arc::new);
2450
2451 let matches = db.scan_and_filter(
2452 collection,
2453 |doc| {
2454 if let Some(ref filter) = cf_arc {
2455 matches_filter(doc, filter, &vars_arc)
2456 } else {
2457 true
2458 }
2459 },
2460 None,
2461 )?;
2462
2463 for doc in matches {
2464 db.aql_delete_document(collection, &doc._sid).await?;
2465 affected += 1;
2466 }
2467
2468 Ok(MutationResult {
2469 operation: "delete".to_string(),
2470 collection: collection.clone(),
2471 affected_count: affected,
2472 returned_documents: vec![],
2473 })
2474 }
2475 MutationOp::InsertMany { collection, data } => {
2476 let resolved = resolve_value(data, variables, context);
2477 let items = match resolved {
2478 ast::Value::Array(arr) => arr,
2479 _ => {
2480 return Err(AqlError::new(
2481 ErrorCode::QueryError,
2482 "insertMany data must be an array".to_string(),
2483 ));
2484 }
2485 };
2486
2487 let mut affected = 0;
2488 let mut returned = Vec::new();
2489 for item in items {
2490 let doc = db
2491 .aql_insert(collection, aql_value_to_hashmap(&item, variables)?)
2492 .await?;
2493 affected += 1;
2494 if !mut_op.selection_set.is_empty() && options.apply_projections {
2495 let fields = collect_fields(
2496 &mut_op.selection_set,
2497 fragments,
2498 variables,
2499 Some(collection),
2500 )
2501 .unwrap_or_default();
2502 returned.push(apply_projection(doc, &fields, options)?);
2503 } else {
2504 returned.push(doc);
2505 }
2506 }
2507 Ok(MutationResult {
2508 operation: "insertMany".to_string(),
2509 collection: collection.clone(),
2510 affected_count: affected,
2511 returned_documents: returned,
2512 })
2513 }
2514 MutationOp::Upsert {
2515 collection,
2516 filter,
2517 data,
2518 } => {
2519 let update_data =
2520 aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
2521 let cf = if let Some(f) = filter {
2522 Some(compile_filter(f)?)
2523 } else {
2524 None
2525 };
2526 let vars_arc = Arc::new(variables.clone());
2527 let cf_arc = cf.map(Arc::new);
2528 let matches = db.scan_and_filter(
2529 collection,
2530 |doc| {
2531 if let Some(ref filter) = cf_arc {
2532 matches_filter(doc, filter, &vars_arc)
2533 } else {
2534 true
2535 }
2536 },
2537 Some(1),
2538 )?;
2539 let doc = if let Some(existing) = matches.into_iter().next() {
2540 db.aql_update_document(collection, &existing._sid, update_data)
2541 .await?
2542 } else {
2543 db.aql_insert(collection, update_data).await?
2544 };
2545
2546 let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
2547 let fields = collect_fields(
2548 &mut_op.selection_set,
2549 fragments,
2550 variables,
2551 Some(collection),
2552 )
2553 .unwrap_or_default();
2554 vec![apply_projection(doc, &fields, options)?]
2555 } else {
2556 vec![doc]
2557 };
2558
2559 Ok(MutationResult {
2560 operation: "upsert".to_string(),
2561 collection: collection.clone(),
2562 affected_count: 1,
2563 returned_documents: returned,
2564 })
2565 }
2566 MutationOp::Transaction { operations } => {
2567 let mut all_returned = Vec::new();
2568 let mut total_affected = 0;
2569 for op in operations {
2570 let res =
2571 execute_mutation_op(db, op, variables, context, options, fragments).await?;
2572 total_affected += res.affected_count;
2573 all_returned.extend(res.returned_documents);
2574 }
2575 Ok(MutationResult {
2576 operation: "transaction".to_string(),
2577 collection: String::new(),
2578 affected_count: total_affected,
2579 returned_documents: all_returned,
2580 })
2581 }
2582 MutationOp::EnqueueJobs {
2583 job_type,
2584 payloads,
2585 priority,
2586 max_retries,
2587 } => {
2588 let workers = db.workers.as_ref().ok_or_else(|| {
2589 AqlError::new(
2590 ErrorCode::QueryError,
2591 "Worker system not initialised".to_string(),
2592 )
2593 })?;
2594 let job_priority = match priority {
2595 ast::JobPriority::Low => crate::workers::JobPriority::Low,
2596 ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2597 ast::JobPriority::High => crate::workers::JobPriority::High,
2598 ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2599 };
2600 let mut returned = Vec::new();
2601 for payload in payloads {
2602 let resolved = resolve_value(payload, variables, context);
2603 let json_payload: std::collections::HashMap<String, serde_json::Value> =
2604 if let ast::Value::Object(map) = &resolved {
2605 map.iter()
2606 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2607 .collect()
2608 } else {
2609 std::collections::HashMap::new()
2610 };
2611 let mut job =
2612 crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2613 for (k, v) in json_payload {
2614 job = job.add_field(k, v);
2615 }
2616 if let Some(retries) = max_retries {
2617 job = job.with_max_retries(*retries);
2618 }
2619 let job_id = workers.enqueue(job).await?;
2620 let mut doc = crate::types::Document::new();
2621 doc._sid = job_id.clone();
2622 doc.data.insert("job_id".to_string(), Value::String(job_id));
2623 doc.data
2624 .insert("job_type".to_string(), Value::String(job_type.clone()));
2625 doc.data
2626 .insert("status".to_string(), Value::String("pending".to_string()));
2627 returned.push(doc);
2628 }
2629 let count = returned.len();
2630 Ok(MutationResult {
2631 operation: "enqueueJobs".to_string(),
2632 collection: "__jobs".to_string(),
2633 affected_count: count,
2634 returned_documents: returned,
2635 })
2636 }
2637 MutationOp::Import { collection, data } => {
2638 let mut affected = 0;
2639 let mut returned = Vec::new();
2640 let fields = if !mut_op.selection_set.is_empty() {
2641 Some(
2642 collect_fields(
2643 &mut_op.selection_set,
2644 fragments,
2645 variables,
2646 Some(collection),
2647 )
2648 .unwrap_or_default(),
2649 )
2650 } else {
2651 None
2652 };
2653 for item in data {
2654 let resolved = resolve_value(item, variables, context);
2655 let map = aql_value_to_hashmap(&resolved, variables)?;
2656 let doc = db.aql_insert(collection, map).await?;
2657 affected += 1;
2658 if let Some(ref f) = fields {
2659 returned.push(apply_projection(doc, f, options)?);
2660 }
2661 }
2662 Ok(MutationResult {
2663 operation: "import".to_string(),
2664 collection: collection.clone(),
2665 affected_count: affected,
2666 returned_documents: returned,
2667 })
2668 }
2669 MutationOp::Export {
2670 collection,
2671 format: _,
2672 } => {
2673 let docs = db.scan_and_filter(collection, |_| true, None)?;
2674 let fields = if !mut_op.selection_set.is_empty() {
2675 Some(
2676 collect_fields(
2677 &mut_op.selection_set,
2678 fragments,
2679 variables,
2680 Some(collection),
2681 )
2682 .unwrap_or_default(),
2683 )
2684 } else {
2685 None
2686 };
2687 let mut returned = Vec::with_capacity(docs.len());
2688 if let Some(ref f) = fields {
2689 for d in docs {
2690 returned.push(apply_projection(d, f, options)?);
2691 }
2692 } else {
2693 returned = docs;
2694 }
2695 let count = returned.len();
2696 Ok(MutationResult {
2697 operation: "export".to_string(),
2698 collection: collection.clone(),
2699 affected_count: count,
2700 returned_documents: returned,
2701 })
2702 }
2703 MutationOp::EnqueueJob {
2704 job_type,
2705 payload,
2706 priority,
2707 scheduled_at,
2708 max_retries,
2709 } => {
2710 let workers = db.workers.as_ref().ok_or_else(|| {
2711 AqlError::new(
2712 ErrorCode::QueryError,
2713 "Worker system not initialised".to_string(),
2714 )
2715 })?;
2716
2717 let job_priority = match priority {
2719 ast::JobPriority::Low => crate::workers::JobPriority::Low,
2720 ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2721 ast::JobPriority::High => crate::workers::JobPriority::High,
2722 ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2723 };
2724
2725 let resolved = resolve_value(payload, variables, context);
2727 let json_payload: std::collections::HashMap<String, serde_json::Value> =
2728 if let ast::Value::Object(map) = &resolved {
2729 map.iter()
2730 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2731 .collect()
2732 } else {
2733 std::collections::HashMap::new()
2734 };
2735
2736 let mut job =
2737 crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2738
2739 for (k, v) in json_payload {
2740 job = job.add_field(k, v);
2741 }
2742
2743 if let Some(retries) = max_retries {
2744 job = job.with_max_retries(*retries);
2745 }
2746
2747 if let Some(scheduled) = scheduled_at {
2748 if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
2749 job = job.scheduled_at(dt);
2750 }
2751 }
2752
2753 let job_id = workers.enqueue(job).await?;
2754
2755 let mut doc = crate::types::Document::new();
2756 doc._sid = job_id.clone();
2757 doc.data
2758 .insert("job_id".to_string(), crate::types::Value::String(job_id));
2759 doc.data.insert(
2760 "job_type".to_string(),
2761 crate::types::Value::String(job_type.clone()),
2762 );
2763 doc.data.insert(
2764 "status".to_string(),
2765 crate::types::Value::String("pending".to_string()),
2766 );
2767
2768 Ok(MutationResult {
2769 operation: "enqueueJob".to_string(),
2770 collection: "__jobs".to_string(),
2771 affected_count: 1,
2772 returned_documents: vec![doc],
2773 })
2774 }
2775 }
2776 }
2777 .boxed()
2778}
2779
2780async fn execute_subscription(
2781 db: &Aurora,
2782 sub: &ast::Subscription,
2783 vars: &HashMap<String, ast::Value>,
2784 _options: &ExecutionOptions,
2785) -> Result<ExecutionResult> {
2786 let vars: HashMap<String, ast::Value> = vars.clone();
2787
2788 let selection = sub.selection_set.first().ok_or_else(|| {
2789 AqlError::new(
2790 ErrorCode::QueryError,
2791 "Subscription must have a selection".to_string(),
2792 )
2793 })?;
2794
2795 if let Selection::Field(f) = selection {
2796 let collection = f.name.clone();
2797 let filter = extract_filter_from_args(&f.arguments)?;
2798
2799 let mut listener = db.pubsub.listen(&collection);
2800
2801 if let Some(f) = filter {
2802 let event_filter = ast_filter_to_event_filter(&f, &vars)?;
2803 listener = listener.filter(event_filter);
2804 }
2805
2806 Ok(ExecutionResult::Subscription(SubscriptionResult {
2807 subscription_id: uuid::Uuid::new_v4().to_string(),
2808 collection,
2809 stream: Some(listener),
2810 }))
2811 } else {
2812 Err(AqlError::new(
2813 ErrorCode::QueryError,
2814 "Invalid subscription selection".to_string(),
2815 ))
2816 }
2817}
2818
2819fn ast_filter_to_event_filter(
2820 filter: &AqlFilter,
2821 vars: &HashMap<String, ast::Value>,
2822) -> Result<crate::pubsub::EventFilter> {
2823 use crate::pubsub::EventFilter;
2824
2825 match filter {
2826 AqlFilter::Eq(f, v) => Ok(EventFilter::FieldEquals(
2827 f.clone(),
2828 aql_value_to_db_value(v, vars)?,
2829 )),
2830 AqlFilter::Ne(f, v) => Ok(EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?)),
2831 AqlFilter::Gt(f, v) => Ok(EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?)),
2832 AqlFilter::Gte(f, v) => Ok(EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?)),
2833 AqlFilter::Lt(f, v) => Ok(EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?)),
2834 AqlFilter::Lte(f, v) => Ok(EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?)),
2835 AqlFilter::In(f, v) => Ok(EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?)),
2836 AqlFilter::NotIn(f, v) => Ok(EventFilter::NotIn(
2837 f.clone(),
2838 aql_value_to_db_value(v, vars)?,
2839 )),
2840 AqlFilter::Contains(f, v) => Ok(EventFilter::Contains(
2841 f.clone(),
2842 aql_value_to_db_value(v, vars)?,
2843 )),
2844 AqlFilter::ContainsAny(f, v) | AqlFilter::ContainsAll(f, v) => Ok(EventFilter::Contains(
2846 f.clone(),
2847 aql_value_to_db_value(v, vars)?,
2848 )),
2849 AqlFilter::StartsWith(f, v) => Ok(EventFilter::StartsWith(
2850 f.clone(),
2851 aql_value_to_db_value(v, vars)?,
2852 )),
2853 AqlFilter::EndsWith(f, v) => Ok(EventFilter::EndsWith(
2854 f.clone(),
2855 aql_value_to_db_value(v, vars)?,
2856 )),
2857 AqlFilter::Matches(f, v) => {
2858 let pattern = match aql_value_to_db_value(v, vars)? {
2859 crate::types::Value::String(s) => s,
2860 other => other.to_string(),
2861 };
2862 let re = regex::Regex::new(&pattern).map_err(|e| {
2863 crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
2864 })?;
2865 Ok(EventFilter::Matches(f.clone(), re))
2866 }
2867 AqlFilter::IsNull(f) => Ok(EventFilter::IsNull(f.clone())),
2868 AqlFilter::IsNotNull(f) => Ok(EventFilter::IsNotNull(f.clone())),
2869 AqlFilter::And(filters) => {
2870 let mut mapped = Vec::new();
2871 for f in filters {
2872 mapped.push(ast_filter_to_event_filter(f, vars)?);
2873 }
2874 Ok(EventFilter::And(mapped))
2875 }
2876 AqlFilter::Or(filters) => {
2877 let mut mapped = Vec::new();
2878 for f in filters {
2879 mapped.push(ast_filter_to_event_filter(f, vars)?);
2880 }
2881 Ok(EventFilter::Or(mapped))
2882 }
2883 AqlFilter::Not(f) => Ok(EventFilter::Not(Box::new(ast_filter_to_event_filter(
2884 f, vars,
2885 )?))),
2886 }
2887}
2888
2889async fn execute_introspection(
2890 db: &Aurora,
2891 intro: &ast::IntrospectionQuery,
2892) -> Result<ExecutionResult> {
2893 let names = db.list_collection_names();
2894 let want_fields = intro.fields.is_empty()
2895 || intro
2896 .fields
2897 .iter()
2898 .any(|f| f == "collections" || f == "fields");
2899
2900 let documents: Vec<Document> = names
2901 .iter()
2902 .filter_map(|name| {
2903 if name.starts_with('_') {
2905 return None;
2906 }
2907 let col = db.get_collection_definition(name).ok()?;
2908 let mut data = HashMap::new();
2909 data.insert("name".to_string(), Value::String(name.clone()));
2910
2911 if want_fields {
2912 let field_list: Vec<Value> = col
2913 .fields
2914 .iter()
2915 .map(|(fname, fdef)| {
2916 let mut fdata = HashMap::new();
2917 fdata.insert("name".to_string(), Value::String(fname.clone()));
2918 fdata.insert(
2919 "type".to_string(),
2920 Value::String(fdef.field_type.to_string()),
2921 );
2922 fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
2923 fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
2924 fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
2925 if !fdef.validations.is_empty() {
2926 let vcons: Vec<Value> = fdef
2927 .validations
2928 .iter()
2929 .map(|c| Value::String(format!("{:?}", c)))
2930 .collect();
2931 fdata.insert("validations".to_string(), Value::Array(vcons));
2932 }
2933 Value::Object(fdata)
2934 })
2935 .collect();
2936 data.insert("fields".to_string(), Value::Array(field_list));
2937 }
2938
2939 Some(Document {
2940 _sid: name.clone(),
2941 data,
2942 })
2943 })
2944 .collect();
2945
2946 let count = documents.len();
2947 Ok(ExecutionResult::Query(QueryResult {
2948 collection: "__schema".to_string(),
2949 documents,
2950 total_count: Some(count),
2951 deferred_fields: vec![],
2952 explain: None,
2953 }))
2954}
2955
2956async fn execute_schema(
2957 db: &Aurora,
2958 schema: &ast::Schema,
2959 _options: &ExecutionOptions,
2960) -> Result<ExecutionResult> {
2961 let mut last_collection = String::new();
2962
2963 for op in &schema.operations {
2964 match op {
2965 ast::SchemaOp::DefineCollection {
2966 name,
2967 fields,
2968 if_not_exists,
2969 ..
2970 } => {
2971 last_collection = name.clone();
2972
2973 if *if_not_exists {
2975 if db.get_collection_definition(name).is_ok() {
2976 continue;
2977 }
2978 }
2979
2980 let mut field_defs = Vec::new();
2981 for field in fields {
2982 field_defs.push((field.name.as_str(), build_field_def(field)));
2983 }
2984 db.new_collection(name, field_defs).await?;
2985 }
2986 ast::SchemaOp::AlterCollection { name, actions } => {
2987 last_collection = name.clone();
2988 for action in actions {
2989 match action {
2990 ast::AlterAction::AddField { field, default } => {
2991 let def = build_field_def(field);
2992 db.add_field_to_schema(name, field.name.clone(), def)
2993 .await?;
2994 if let Some(default_val) = default {
2995 let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
2996 let docs = db.get_all_collection(name).await?;
2997 for doc in docs {
2998 if !doc.data.contains_key(&field.name) {
2999 db.update_document(
3000 name,
3001 &doc._sid,
3002 vec![(&field.name, db_val.clone())],
3003 )
3004 .await?;
3005 }
3006 }
3007 }
3008 }
3009 ast::AlterAction::DropField(field_name) => {
3010 db.drop_field_from_schema(name, field_name.clone()).await?;
3011 }
3012 ast::AlterAction::RenameField { from, to } => {
3013 db.rename_field_in_schema(name, from.clone(), to.clone())
3014 .await?;
3015 let docs = db.get_all_collection(name).await?;
3016 for mut doc in docs {
3017 if let Some(val) = doc.data.remove(from.as_str()) {
3018 doc.data.insert(to.clone(), val);
3019 let key = format!("{}:{}", name, doc._sid);
3020 db.put(key, serde_json::to_vec(&doc)?, None).await?;
3021 }
3022 }
3023 }
3024 ast::AlterAction::ModifyField(field) => {
3025 db.modify_field_in_schema(
3026 name,
3027 field.name.clone(),
3028 build_field_def(field),
3029 )
3030 .await?;
3031 }
3032 }
3033 }
3034 }
3035 ast::SchemaOp::DropCollection { name, .. } => {
3036 db.drop_collection_schema(name).await?;
3037 last_collection = name.clone();
3038 }
3039 }
3040 }
3041
3042 Ok(ExecutionResult::Schema(SchemaResult {
3043 operation: "schema".to_string(),
3044 collection: last_collection,
3045 status: "done".to_string(),
3046 }))
3047}
3048
3049fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
3050 let scalar = match anno.name.to_lowercase().as_str() {
3051 "string" => ScalarType::String,
3052 "int" | "integer" => ScalarType::Int,
3053 "float" | "double" => ScalarType::Float,
3054 "bool" | "boolean" => ScalarType::Bool,
3055 "uuid" => ScalarType::Uuid,
3056 "object" => ScalarType::Object,
3057 "array" => ScalarType::Array,
3058 _ => ScalarType::Any,
3059 };
3060
3061 if anno.is_array {
3062 FieldType::Array(scalar)
3063 } else {
3064 match scalar {
3065 ScalarType::Object => FieldType::Object,
3066 ScalarType::Any => FieldType::Any,
3067 _ => FieldType::Scalar(scalar),
3068 }
3069 }
3070}
3071
3072fn parse_validate_directive(
3074 directive: &ast::Directive,
3075) -> Vec<crate::types::FieldValidationConstraint> {
3076 use crate::types::FieldValidationConstraint as FVC;
3077 let mut constraints = Vec::new();
3078 for arg in &directive.arguments {
3079 match arg.name.as_str() {
3080 "format" => {
3081 if let ast::Value::String(s) = &arg.value {
3082 constraints.push(FVC::Format(s.clone()));
3083 }
3084 }
3085 "min" => {
3086 let n = match &arg.value {
3087 ast::Value::Float(f) => Some(*f),
3088 ast::Value::Int(i) => Some(*i as f64),
3089 _ => None,
3090 };
3091 if let Some(n) = n {
3092 constraints.push(FVC::Min(n));
3093 }
3094 }
3095 "max" => {
3096 let n = match &arg.value {
3097 ast::Value::Float(f) => Some(*f),
3098 ast::Value::Int(i) => Some(*i as f64),
3099 _ => None,
3100 };
3101 if let Some(n) = n {
3102 constraints.push(FVC::Max(n));
3103 }
3104 }
3105 "minLength" => {
3106 if let ast::Value::Int(i) = &arg.value {
3107 constraints.push(FVC::MinLength(*i));
3108 }
3109 }
3110 "maxLength" => {
3111 if let ast::Value::Int(i) = &arg.value {
3112 constraints.push(FVC::MaxLength(*i));
3113 }
3114 }
3115 "pattern" => {
3116 if let ast::Value::String(s) = &arg.value {
3117 constraints.push(FVC::Pattern(s.clone()));
3118 }
3119 }
3120 _ => {}
3121 }
3122 }
3123 constraints
3124}
3125
3126fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
3128 let field_type = map_ast_type(&field.field_type);
3129 let mut indexed = false;
3130 let mut unique = false;
3131 let mut relation = None;
3132 let mut validations = Vec::new();
3133 for directive in &field.directives {
3134 match directive.name.as_str() {
3135 "indexed" | "index" => indexed = true,
3136 "unique" => {
3137 unique = true;
3138 indexed = true;
3139 }
3140 "primary" => {
3141 indexed = true;
3142 unique = true;
3143 }
3144 "relation" => {
3145 let to = directive
3146 .arguments
3147 .iter()
3148 .find(|a| a.name == "to" || a.name == "collection")
3149 .and_then(|a| {
3150 if let ast::Value::String(s) = &a.value {
3151 Some(s.clone())
3152 } else {
3153 None
3154 }
3155 })
3156 .unwrap_or_default();
3157 let key = directive
3158 .arguments
3159 .iter()
3160 .find(|a| a.name == "key" || a.name == "field")
3161 .and_then(|a| {
3162 if let ast::Value::String(s) = &a.value {
3163 Some(s.clone())
3164 } else {
3165 None
3166 }
3167 })
3168 .unwrap_or_else(|| "id".to_string());
3169 if !to.is_empty() {
3170 relation = Some(crate::types::Relation { to, key });
3171 }
3172 }
3173 "validate" => validations.extend(parse_validate_directive(directive)),
3174 _ => {}
3175 }
3176 }
3177 FieldDefinition {
3178 field_type,
3179 unique,
3180 indexed,
3181 nullable: !field.field_type.is_required,
3182 validations,
3183 relation,
3184 }
3185}
3186
3187async fn execute_migration(
3188 db: &Aurora,
3189 migration: &ast::Migration,
3190 _options: &ExecutionOptions,
3191) -> Result<ExecutionResult> {
3192 let mut steps_applied = 0;
3193 let mut last_version = String::new();
3194
3195 for step in &migration.steps {
3196 last_version = step.version.clone();
3197
3198 if db.is_migration_applied(&step.version).await? {
3199 eprintln!(
3200 "[migration] version '{}' already applied — skipping",
3201 step.version
3202 );
3203 continue;
3204 }
3205
3206 eprintln!("[migration] applying version '{}'", step.version);
3207
3208 for action in &step.actions {
3209 match action {
3210 ast::MigrationAction::Schema(schema_op) => {
3211 execute_single_schema_op(db, schema_op).await?;
3212 }
3213 ast::MigrationAction::DataMigration(dm) => {
3214 execute_data_migration(db, dm).await?;
3215 }
3216 }
3217 }
3218
3219 db.mark_migration_applied(&step.version).await?;
3220 steps_applied += 1;
3221 eprintln!("[migration] version '{}' applied", step.version);
3222 }
3223
3224 let status = if steps_applied > 0 {
3225 "applied".to_string()
3226 } else {
3227 "skipped".to_string()
3228 };
3229
3230 Ok(ExecutionResult::Migration(MigrationResult {
3231 version: last_version,
3232 steps_applied,
3233 status,
3234 }))
3235}
3236
3237async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
3239 match op {
3240 ast::SchemaOp::DefineCollection {
3241 name,
3242 fields,
3243 if_not_exists,
3244 ..
3245 } => {
3246 if *if_not_exists && db.get_collection_definition(name).is_ok() {
3247 return Ok(());
3248 }
3249 let field_defs: Vec<(&str, FieldDefinition)> = fields
3250 .iter()
3251 .map(|f| (f.name.as_str(), build_field_def(f)))
3252 .collect();
3253 db.new_collection(name, field_defs).await?;
3254 }
3255 ast::SchemaOp::AlterCollection { name, actions } => {
3256 for action in actions {
3257 match action {
3258 ast::AlterAction::AddField { field, default } => {
3259 db.add_field_to_schema(name, field.name.clone(), build_field_def(field))
3260 .await?;
3261 if let Some(default_val) = default {
3262 let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
3263 let docs = db.get_all_collection(name).await?;
3264 for doc in docs {
3265 if !doc.data.contains_key(&field.name) {
3266 db.update_document(
3267 name,
3268 &doc._sid,
3269 vec![(&field.name, db_val.clone())],
3270 )
3271 .await?;
3272 }
3273 }
3274 }
3275 }
3276 ast::AlterAction::DropField(field_name) => {
3277 db.drop_field_from_schema(name, field_name.clone()).await?;
3278 }
3279 ast::AlterAction::RenameField { from, to } => {
3280 db.rename_field_in_schema(name, from.clone(), to.clone())
3281 .await?;
3282 let docs = db.get_all_collection(name).await?;
3284 for mut doc in docs {
3285 if let Some(val) = doc.data.remove(from.as_str()) {
3286 doc.data.insert(to.clone(), val);
3287 let key = format!("{}:{}", name, doc._sid);
3288 db.put(key, serde_json::to_vec(&doc)?, None).await?;
3289 }
3290 }
3291 }
3292 ast::AlterAction::ModifyField(field) => {
3293 db.modify_field_in_schema(name, field.name.clone(), build_field_def(field))
3294 .await?;
3295 }
3296 }
3297 }
3298 }
3299 ast::SchemaOp::DropCollection { name, if_exists } => {
3300 if *if_exists && db.get_collection_definition(name).is_err() {
3301 return Ok(());
3302 }
3303 db.drop_collection_schema(name).await?;
3304 }
3305 }
3306 Ok(())
3307}
3308
3309async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
3311 let docs = db.get_all_collection(&dm.collection).await?;
3312
3313 for doc in docs {
3314 for transform in &dm.transforms {
3315 let matches = match &transform.filter {
3316 Some(filter) => {
3317 let compiled = compile_filter(filter)?;
3318 matches_filter(&doc, &compiled, &HashMap::new())
3319 }
3320 None => true,
3321 };
3322
3323 if matches {
3324 let new_value = eval_migration_expr(&transform.expression, &doc);
3325 let mut updates = HashMap::new();
3326 updates.insert(transform.field.clone(), new_value);
3327 db.aql_update_document(&dm.collection, &doc._sid, updates)
3328 .await?;
3329 }
3330 }
3331 }
3332 Ok(())
3333}
3334
3335fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
3345 let expr = expr.trim();
3346
3347 if expr.starts_with('"') && expr.ends_with('"') && expr.len() >= 2 {
3348 return Value::String(expr[1..expr.len() - 1].to_string());
3349 }
3350 if expr == "true" {
3351 return Value::Bool(true);
3352 }
3353 if expr == "false" {
3354 return Value::Bool(false);
3355 }
3356 if expr == "null" {
3357 return Value::Null;
3358 }
3359 if let Ok(n) = expr.parse::<i64>() {
3360 return Value::Int(n);
3361 }
3362 if let Ok(f) = expr.parse::<f64>() {
3363 return Value::Float(f);
3364 }
3365 if let Some(v) = doc.data.get(expr) {
3366 return v.clone();
3367 }
3368
3369 Value::Null
3370}
3371
3372fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
3373 for a in args {
3374 if a.name == "where" || a.name == "filter" {
3375 return Ok(Some(value_to_filter(&a.value)?));
3376 }
3377 }
3378 Ok(None)
3379}
3380
3381fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
3382 let mut orderings = Vec::new();
3383 for a in args {
3384 if a.name == "orderBy" {
3385 match &a.value {
3386 ast::Value::String(f) => orderings.push(ast::Ordering {
3387 field: f.clone(),
3388 direction: ast::SortDirection::Asc,
3389 }),
3390 ast::Value::Object(obj) => {
3391 if let (Some(ast::Value::String(field_name)), Some(dir_val)) =
3393 (obj.get("field"), obj.get("direction"))
3394 {
3395 let direction = match dir_val {
3396 ast::Value::Enum(s) | ast::Value::String(s) => {
3397 if s.to_uppercase() == "DESC" {
3398 ast::SortDirection::Desc
3399 } else {
3400 ast::SortDirection::Asc
3401 }
3402 }
3403 _ => ast::SortDirection::Asc,
3404 };
3405 orderings.push(ast::Ordering {
3406 field: field_name.clone(),
3407 direction,
3408 });
3409 } else {
3410 for (field, dir_val) in obj {
3412 let direction = match dir_val {
3413 ast::Value::Enum(s) | ast::Value::String(s) => {
3414 if s.to_uppercase() == "DESC" {
3415 ast::SortDirection::Desc
3416 } else {
3417 ast::SortDirection::Asc
3418 }
3419 }
3420 _ => ast::SortDirection::Asc,
3421 };
3422 orderings.push(ast::Ordering {
3423 field: field.clone(),
3424 direction,
3425 });
3426 }
3427 }
3428 }
3429 _ => {}
3430 }
3431 }
3432 }
3433 orderings
3434}
3435
3436fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
3437 docs.sort_by(|a, b| {
3438 for o in orderings {
3439 let cmp = compare_values(a.data.get(&o.field), b.data.get(&o.field));
3440 if cmp != std::cmp::Ordering::Equal {
3441 return match o.direction {
3442 ast::SortDirection::Asc => cmp,
3443 ast::SortDirection::Desc => cmp.reverse(),
3444 };
3445 }
3446 }
3447 std::cmp::Ordering::Equal
3448 });
3449}
3450
3451fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
3452 match (a, b) {
3453 (None, None) => std::cmp::Ordering::Equal,
3454 (None, Some(_)) => std::cmp::Ordering::Less,
3455 (Some(_), None) => std::cmp::Ordering::Greater,
3456 (Some(av), Some(bv)) => av.partial_cmp(bv).unwrap_or(std::cmp::Ordering::Equal),
3457 }
3458}
3459
3460pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
3461 let (mut limit, mut offset) = (None, 0);
3462 for a in args {
3463 match a.name.as_str() {
3464 "limit" | "first" => {
3465 if let ast::Value::Int(n) = a.value {
3466 limit = Some(n as usize);
3467 }
3468 }
3469 "offset" | "skip" => {
3470 if let ast::Value::Int(n) = a.value {
3471 offset = n as usize;
3472 }
3473 }
3474 _ => {}
3475 }
3476 }
3477 (limit, offset)
3478}
3479
3480fn extract_cursor_pagination(
3481 args: &[ast::Argument],
3482) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
3483 let (mut first, mut after, mut last, mut before) = (None, None, None, None);
3484 for a in args {
3485 match a.name.as_str() {
3486 "first" => {
3487 if let ast::Value::Int(n) = a.value {
3488 first = Some(n as usize);
3489 }
3490 }
3491 "after" => {
3492 if let ast::Value::String(s) = &a.value {
3493 after = Some(s.clone());
3494 }
3495 }
3496 "last" => {
3497 if let ast::Value::Int(n) = a.value {
3498 last = Some(n as usize);
3499 }
3500 }
3501 "before" => {
3502 if let ast::Value::String(s) = &a.value {
3503 before = Some(s.clone());
3504 }
3505 }
3506 _ => {}
3507 }
3508 }
3509 (first, after, last, before)
3510}
3511
3512fn execute_connection(
3513 mut docs: Vec<Document>,
3514 sub_fields: &[ast::Field],
3515 limit: Option<usize>,
3516 fragments: &HashMap<String, FragmentDef>,
3517 variables: &HashMap<String, ast::Value>,
3518 options: &ExecutionOptions,
3519) -> Result<QueryResult> {
3520 let has_next_page = if let Some(l) = limit {
3521 docs.len() > l
3522 } else {
3523 false
3524 };
3525
3526 if has_next_page {
3527 docs.truncate(limit.unwrap());
3528 }
3529
3530 let mut edges = Vec::with_capacity(docs.len());
3531 let mut end_cursor = String::new();
3532
3533 let node_fields = if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
3535 let edges_sub_fields =
3536 collect_fields(&edges_field.selection_set, fragments, variables, None)
3537 .unwrap_or_default();
3538 if let Some(node_field) = edges_sub_fields.into_iter().find(|f| f.name == "node") {
3539 collect_fields(&node_field.selection_set, fragments, variables, None)
3540 .unwrap_or_default()
3541 } else {
3542 Vec::new()
3543 }
3544 } else {
3545 Vec::new()
3546 };
3547
3548 for doc in docs {
3549 let cursor = doc._sid.clone();
3550 end_cursor = cursor.clone();
3551
3552 let mut edge_data = HashMap::new();
3553 edge_data.insert("cursor".to_string(), Value::String(cursor));
3554
3555 let node_doc = if node_fields.is_empty() {
3556 doc
3557 } else {
3558 apply_projection(doc, &node_fields, options)?
3559 };
3560
3561 edge_data.insert("node".to_string(), Value::Object(node_doc.data));
3562 edges.push(Value::Object(edge_data));
3563 }
3564
3565 let mut page_info = HashMap::new();
3566 page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
3567 page_info.insert("endCursor".to_string(), Value::String(end_cursor));
3568
3569 let mut conn_data = HashMap::new();
3570 conn_data.insert("edges".to_string(), Value::Array(edges));
3571 conn_data.insert("pageInfo".to_string(), Value::Object(page_info));
3572
3573 Ok(QueryResult {
3574 collection: String::new(),
3575 documents: vec![Document {
3576 _sid: "connection".to_string(),
3577 data: conn_data,
3578 }],
3579 total_count: None,
3580 deferred_fields: vec![],
3581 explain: None,
3582 })
3583}
3584
3585pub fn matches_filter(
3586 doc: &Document,
3587 filter: &CompiledFilter,
3588 vars: &HashMap<String, ast::Value>,
3589) -> bool {
3590 match filter {
3591 CompiledFilter::Eq(f, v) => {
3592 if let Some(dv) = doc.data.get(f) {
3593 values_equal(dv, v, vars)
3594 } else if f == "_sid" {
3595 values_equal(&Value::String(doc._sid.clone()), v, vars)
3596 } else {
3597 false
3598 }
3599 }
3600 CompiledFilter::Ne(f, v) => {
3601 if let Some(dv) = doc.data.get(f) {
3602 !values_equal(dv, v, vars)
3603 } else if f == "_sid" {
3604 !values_equal(&Value::String(doc._sid.clone()), v, vars)
3605 } else {
3606 true
3607 }
3608 }
3609 CompiledFilter::Gt(f, v) => {
3610 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3611 if f == "_sid" {
3612 Some(Value::String(doc._sid.clone()))
3613 } else {
3614 None
3615 }
3616 });
3617 dv_opt.map_or(false, |dv| {
3618 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3619 return dv > bv;
3620 }
3621 false
3622 })
3623 }
3624 CompiledFilter::Gte(f, v) => {
3625 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3626 if f == "_sid" {
3627 Some(Value::String(doc._sid.clone()))
3628 } else {
3629 None
3630 }
3631 });
3632 dv_opt.map_or(false, |dv| {
3633 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3634 return dv >= bv;
3635 }
3636 false
3637 })
3638 }
3639 CompiledFilter::Lt(f, v) => {
3640 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3641 if f == "_sid" {
3642 Some(Value::String(doc._sid.clone()))
3643 } else {
3644 None
3645 }
3646 });
3647 dv_opt.map_or(false, |dv| {
3648 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3649 return dv < bv;
3650 }
3651 false
3652 })
3653 }
3654 CompiledFilter::Lte(f, v) => {
3655 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3656 if f == "_sid" {
3657 Some(Value::String(doc._sid.clone()))
3658 } else {
3659 None
3660 }
3661 });
3662 dv_opt.map_or(false, |dv| {
3663 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3664 return dv <= bv;
3665 }
3666 false
3667 })
3668 }
3669 CompiledFilter::In(f, v) => {
3670 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3671 if f == "_sid" {
3672 Some(Value::String(doc._sid.clone()))
3673 } else {
3674 None
3675 }
3676 });
3677 dv_opt.map_or(false, |dv| {
3678 if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3679 return arr.contains(&dv);
3680 }
3681 false
3682 })
3683 }
3684 CompiledFilter::NotIn(f, v) => {
3685 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3686 if f == "_sid" {
3687 Some(Value::String(doc._sid.clone()))
3688 } else {
3689 None
3690 }
3691 });
3692 dv_opt.map_or(true, |dv| {
3693 if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3694 return !arr.contains(&dv);
3695 }
3696 true
3697 })
3698 }
3699 CompiledFilter::Contains(f, v) => {
3700 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3701 if f == "_sid" {
3702 Some(Value::String(doc._sid.clone()))
3703 } else {
3704 None
3705 }
3706 });
3707 if let Some(dv) = dv_opt {
3708 match (dv, resolve_if_variable(v, vars)) {
3709 (Value::String(s), ast::Value::String(sub)) => s.contains(&*sub),
3710 (Value::Array(arr), _) => {
3711 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3712 return arr.contains(&bv);
3713 }
3714 false
3715 }
3716 _ => false,
3717 }
3718 } else {
3719 false
3720 }
3721 }
3722 CompiledFilter::ContainsAny(f, v) => {
3724 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3725 if f == "_sid" {
3726 Some(Value::String(doc._sid.clone()))
3727 } else {
3728 None
3729 }
3730 });
3731 if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3732 (dv_opt, aql_value_to_db_value(v, vars))
3733 {
3734 check_arr.iter().any(|item| field_arr.contains(item))
3735 } else {
3736 false
3737 }
3738 }
3739 CompiledFilter::ContainsAll(f, v) => {
3741 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3742 if f == "_sid" {
3743 Some(Value::String(doc._sid.clone()))
3744 } else {
3745 None
3746 }
3747 });
3748 if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3749 (dv_opt, aql_value_to_db_value(v, vars))
3750 {
3751 check_arr.iter().all(|item| field_arr.contains(item))
3752 } else {
3753 false
3754 }
3755 }
3756 CompiledFilter::StartsWith(f, v) => {
3757 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3758 if f == "_sid" {
3759 Some(Value::String(doc._sid.clone()))
3760 } else {
3761 None
3762 }
3763 });
3764 if let (Some(Value::String(s)), ast::Value::String(pre)) =
3765 (dv_opt, resolve_if_variable(v, vars))
3766 {
3767 s.starts_with(&*pre)
3768 } else {
3769 false
3770 }
3771 }
3772 CompiledFilter::EndsWith(f, v) => {
3773 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3774 if f == "_sid" {
3775 Some(Value::String(doc._sid.clone()))
3776 } else {
3777 None
3778 }
3779 });
3780 if let (Some(Value::String(s)), ast::Value::String(suf)) =
3781 (dv_opt, resolve_if_variable(v, vars))
3782 {
3783 s.ends_with(&*suf)
3784 } else {
3785 false
3786 }
3787 }
3788 CompiledFilter::Matches(f, re) => {
3789 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3790 if f == "_sid" {
3791 Some(Value::String(doc._sid.clone()))
3792 } else {
3793 None
3794 }
3795 });
3796 if let Some(Value::String(s)) = dv_opt {
3797 re.is_match(&s)
3798 } else {
3799 false
3800 }
3801 }
3802 CompiledFilter::IsNull(f) => {
3803 if f == "_sid" {
3804 false } else {
3806 doc.data.get(f).map_or(true, |v| matches!(v, Value::Null))
3807 }
3808 }
3809 CompiledFilter::IsNotNull(f) => {
3810 if f == "_sid" {
3811 true } else {
3813 doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
3814 }
3815 }
3816 CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
3817 CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
3818 CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
3819 }
3820}
3821
3822fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
3827 if let Value::Object(modifier) = new_val {
3828 if let Some(delta) = modifier.get("increment") {
3829 match (existing, delta) {
3830 (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
3831 (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
3832 (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
3833 _ => {}
3834 }
3835 }
3836 if let Some(delta) = modifier.get("decrement") {
3837 match (existing, delta) {
3838 (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
3839 (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
3840 (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
3841 _ => {}
3842 }
3843 }
3844 if let Some(item) = modifier.get("push") {
3845 if let Some(Value::Array(mut arr)) = existing.cloned() {
3846 arr.push(item.clone());
3847 return Value::Array(arr);
3848 }
3849 return Value::Array(vec![item.clone()]);
3850 }
3851 if let Some(item) = modifier.get("pull") {
3852 if let Some(Value::Array(arr)) = existing {
3853 let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
3854 return Value::Array(filtered);
3855 }
3856 return Value::Array(vec![]);
3857 }
3858 if let Some(item) = modifier.get("addToSet") {
3859 if let Some(Value::Array(mut arr)) = existing.cloned() {
3860 if !arr.contains(item) {
3861 arr.push(item.clone());
3862 }
3863 return Value::Array(arr);
3864 }
3865 return Value::Array(vec![item.clone()]);
3866 }
3867 }
3868 new_val.clone()
3869}
3870
3871fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
3872 if let Ok(bv) = aql_value_to_db_value(av, vars) {
3873 return dv == &bv;
3874 }
3875 false
3876}
3877
3878fn resolve_if_variable<'a>(
3879 v: &'a ast::Value,
3880 vars: &'a HashMap<String, ast::Value>,
3881) -> &'a ast::Value {
3882 if let ast::Value::Variable(n) = v {
3883 vars.get(n).unwrap_or(v)
3884 } else {
3885 v
3886 }
3887}
3888
3889pub fn apply_projection(
3890 doc: Document,
3891 fields: &[ast::Field],
3892 options: &ExecutionOptions,
3893) -> Result<Document> {
3894 let (projected, _) = apply_projection_and_defer(doc, fields, &options.variables, options)?;
3895 Ok(projected)
3896}
3897
3898async fn apply_projection_with_lookups(
3901 db: &Aurora,
3902 mut doc: Document,
3903 collection_name: &str,
3904 fields: &[ast::Field],
3905 fragments: &HashMap<String, FragmentDef>,
3906 vars: &HashMap<String, ast::Value>,
3907 options: &ExecutionOptions,
3908 pinned_fields: &[String],
3909) -> Result<(Document, Vec<String>)> {
3910 if fields.is_empty() {
3911 return Ok((doc, vec![]));
3912 }
3913 let mut proj = HashMap::new();
3914 let mut deferred = Vec::new();
3915
3916 for f in fields {
3918 check_permissions(&f.directives, options, false)?;
3920
3921 if f.directives.iter().any(|d| d.name == "defer") {
3923 deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
3924 continue;
3925 }
3926
3927 let mut lookup_info: Option<(String, String, String)> = None;
3928 let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
3929 let local_arg = f.arguments.iter().find(|a| a.name == "localField");
3930 let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
3931
3932 if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
3933 let c_val = resolve_if_variable(&c.value, vars);
3934 let lf_val = resolve_if_variable(&lf.value, vars);
3935 let ff_val = resolve_if_variable(&ff.value, vars);
3936
3937 if let (
3938 ast::Value::String(foreign_coll),
3939 ast::Value::String(local_field),
3940 ast::Value::String(foreign_field),
3941 ) = (c_val, lf_val, ff_val)
3942 {
3943 lookup_info = Some((
3944 foreign_coll.clone(),
3945 local_field.clone(),
3946 foreign_field.clone(),
3947 ));
3948 }
3949 } else if let Ok(col_def) = db.get_collection_definition(collection_name) {
3950 if let Some(f_def) = col_def.fields.get(&f.name) {
3951 if let Some(rel) = &f_def.relation {
3952 lookup_info = Some((rel.to.clone(), f.name.clone(), rel.key.clone()));
3953 }
3954 }
3955 }
3956
3957 if let Some((foreign_coll, local_field, foreign_field)) = lookup_info {
3958 let local_val = doc.data.get(local_field.as_str()).cloned().or_else(|| {
3959 if local_field == "_sid" {
3960 Some(Value::String(doc._sid.clone()))
3961 } else {
3962 None
3963 }
3964 });
3965
3966 if let Some(match_val) = local_val {
3967 let extra_filter = f
3968 .arguments
3969 .iter()
3970 .find(|a| a.name == "where")
3971 .and_then(|a| {
3972 let resolved = resolve_ast_deep(&a.value, vars);
3973 value_to_filter(&resolved).ok()
3974 });
3975
3976 let vars_arc = Arc::new(vars.clone());
3977 let foreign_docs = db.scan_and_filter(
3978 &foreign_coll,
3979 |fdoc| {
3980 let field_match = fdoc
3981 .data
3982 .get(foreign_field.as_str())
3983 .map(|v| values_equal_db(v, &match_val))
3984 .unwrap_or(
3985 foreign_field == "_sid" && fdoc._sid == match_val.to_string(),
3986 );
3987 if !field_match {
3988 return false;
3989 }
3990 if let Some(ref ef) = extra_filter {
3991 let compiled = compile_filter(ef)
3992 .unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
3993 matches_filter(fdoc, &compiled, &vars_arc)
3994 } else {
3995 true
3996 }
3997 },
3998 None,
3999 )?;
4000
4001 let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
4002 foreign_docs
4003 .into_iter()
4004 .map(|fd| Value::Object(fd.data))
4005 .collect()
4006 } else {
4007 let sub_fields: Vec<ast::Field> =
4008 collect_fields(&f.selection_set, fragments, vars, Some(&foreign_coll))?;
4009 let mut sub_results = Vec::with_capacity(foreign_docs.len());
4010 for fd in foreign_docs {
4011 let (proj_fd, _) =
4012 apply_projection_and_defer(fd, &sub_fields, vars, options)?;
4013 sub_results.push(Value::Object(proj_fd.data));
4014 }
4015 sub_results
4016 };
4017
4018 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Array(sub_projected));
4019 }
4020 continue;
4021 }
4022
4023 if f.name == "__compute__" {
4025 let alias = f.alias.as_deref().unwrap_or("computed");
4026 if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
4027 if let ast::Value::String(template) = &expr_arg.value {
4028 let result = eval_template(template, &doc.data);
4029 proj.insert(alias.to_string(), Value::String(result));
4030 } else if let Some(expr) = f.computed_expression.as_ref() {
4031 let result = eval_expression(expr, &doc.data, vars);
4032 proj.insert(alias.to_string(), result);
4033 }
4034 }
4035 continue;
4036 }
4037
4038 if f.name == "id" {
4040 let v = doc.data.get("id").cloned().unwrap_or(Value::String(doc._sid.clone()));
4041 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v);
4042 } else if f.name == "_sid" {
4043 proj.insert(
4044 f.alias.as_ref().unwrap_or(&f.name).clone(),
4045 Value::String(doc._sid.clone()),
4046 );
4047 } else if let Some(v) = doc.data.get(&f.name) {
4048 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
4049 }
4050 }
4051
4052 for pinned in pinned_fields {
4054 if !proj.contains_key(pinned) {
4055 if let Some(v) = doc.data.get(pinned) {
4056 proj.insert(pinned.clone(), v.clone());
4057 }
4058 }
4059 }
4060
4061 doc.data = proj;
4062 Ok((doc, deferred))
4063}
4064
4065fn values_equal_db(a: &Value, b: &Value) -> bool {
4066 a == b
4067}
4068
4069pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
4070 let resolved = resolve_if_variable(v, vars);
4071 match resolved {
4072 ast::Value::Int(i) => Ok(Value::Int(*i)),
4073 ast::Value::Float(f) => Ok(Value::Float(*f)),
4074 ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
4075 ast::Value::String(s) => Ok(Value::String(s.clone())),
4076 ast::Value::Enum(s) => Ok(Value::String(s.clone())),
4077 ast::Value::Null => Ok(Value::Null),
4078 ast::Value::Variable(name) => Err(AqlError::new(
4079 ErrorCode::UndefinedVariable,
4080 format!("Variable '{}' not found", name),
4081 )),
4082 ast::Value::Array(arr) => {
4083 let mut vals = Vec::with_capacity(arr.len());
4084 for v in arr {
4085 vals.push(aql_value_to_db_value(v, vars)?);
4086 }
4087 Ok(Value::Array(vals))
4088 }
4089 ast::Value::Object(obj) => {
4090 let mut map = HashMap::with_capacity(obj.len());
4091 for (k, v) in obj {
4092 map.insert(k.clone(), aql_value_to_db_value(v, vars)?);
4093 }
4094 Ok(Value::Object(map))
4095 }
4096 }
4097}
4098
4099fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
4101 match v {
4102 ast::Value::Null => serde_json::Value::Null,
4103 ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
4104 ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
4105 ast::Value::Float(f) => serde_json::Number::from_f64(*f)
4106 .map(serde_json::Value::Number)
4107 .unwrap_or(serde_json::Value::Null),
4108 ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
4109 ast::Value::Array(arr) => {
4110 serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
4111 }
4112 ast::Value::Object(obj) => serde_json::Value::Object(
4113 obj.iter()
4114 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
4115 .collect(),
4116 ),
4117 ast::Value::Variable(_) => serde_json::Value::Null,
4118 }
4119}
4120
4121fn aql_value_to_hashmap(
4122 v: &ast::Value,
4123 vars: &HashMap<String, ast::Value>,
4124) -> Result<HashMap<String, Value>> {
4125 if let ast::Value::Object(m) = resolve_if_variable(v, vars) {
4126 let mut res = HashMap::new();
4127 for (k, v) in m {
4128 res.insert(k.clone(), aql_value_to_db_value(v, vars)?);
4129 }
4130 Ok(res)
4131 } else {
4132 Err(AqlError::new(
4133 ErrorCode::QueryError,
4134 "Data must be object".to_string(),
4135 ))
4136 }
4137}
4138
4139fn aurora_value_to_json_value(v: &Value) -> JsonValue {
4140 match v {
4141 Value::Null => JsonValue::Null,
4142 Value::String(s) => JsonValue::String(s.clone()),
4143 Value::Int(i) => JsonValue::Number((*i).into()),
4144 Value::Float(f) => serde_json::Number::from_f64(*f)
4145 .map(JsonValue::Number)
4146 .unwrap_or(JsonValue::Null),
4147 Value::Bool(b) => JsonValue::Bool(*b),
4148 Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
4149 Value::Object(m) => {
4150 let mut jm = serde_json::Map::new();
4151 for (k, v) in m {
4152 jm.insert(k.clone(), aurora_value_to_json_value(v));
4153 }
4154 JsonValue::Object(jm)
4155 }
4156 Value::Uuid(u) => JsonValue::String(u.to_string()),
4157 Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
4158 }
4159}
4160
4161fn find_indexed_equality_filter(
4163 filter: &AqlFilter,
4164 db: &Aurora,
4165 collection: &str,
4166) -> Option<(String, ast::Value)> {
4167 match filter {
4168 AqlFilter::Eq(field, val) => {
4169 if field == "_sid" || db.has_index(collection, field) {
4170 Some((field.clone(), val.clone()))
4171 } else {
4172 None
4173 }
4174 }
4175 AqlFilter::And(filters) => {
4176 for f in filters {
4177 if let Some(res) = find_indexed_equality_filter(f, db, collection) {
4178 return Some(res);
4179 }
4180 }
4181 None
4182 }
4183 _ => None,
4184 }
4185}
4186
4187pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
4188 if let ast::Value::Object(m) = v {
4189 let mut fs = Vec::new();
4190 for (k, val) in m {
4191 match k.as_str() {
4192 "or" => {
4193 if let ast::Value::Array(arr) = val {
4194 let mut sub = Vec::new();
4195 for item in arr {
4196 sub.push(value_to_filter(item)?);
4197 }
4198 return Ok(AqlFilter::Or(sub));
4199 }
4200 }
4201 "and" => {
4202 if let ast::Value::Array(arr) = val {
4203 let mut sub = Vec::new();
4204 for item in arr {
4205 sub.push(value_to_filter(item)?);
4206 }
4207 return Ok(AqlFilter::And(sub));
4208 }
4209 }
4210 "not" => {
4211 return Ok(AqlFilter::Not(Box::new(value_to_filter(val)?)));
4212 }
4213 field => {
4214 if let ast::Value::Object(ops) = val {
4215 for (op, ov) in ops {
4216 match op.as_str() {
4217 "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
4218 "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
4219 "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
4220 "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
4221 "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
4222 "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
4223 "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
4224 "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
4225 "contains" => {
4226 fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
4227 }
4228 "containsAny" => {
4229 fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
4230 }
4231 "containsAll" => {
4232 fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
4233 }
4234 "startsWith" => {
4235 fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
4236 }
4237 "endsWith" => {
4238 fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
4239 }
4240 "matches" => {
4241 fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
4242 }
4243 _ => {}
4244 }
4245 }
4246 }
4247 }
4248 }
4249 }
4250 if fs.is_empty() {
4251 Ok(AqlFilter::And(vec![]))
4252 } else if fs.len() == 1 {
4253 Ok(fs.remove(0))
4254 } else {
4255 Ok(AqlFilter::And(fs))
4256 }
4257 } else {
4258 Err(AqlError::new(
4259 ErrorCode::QueryError,
4260 "Filter must be object".to_string(),
4261 ))
4262 }
4263}
4264
4265fn resolve_value(
4266 v: &ast::Value,
4267 vars: &HashMap<String, ast::Value>,
4268 _ctx: &ExecutionContext,
4269) -> ast::Value {
4270 match v {
4271 ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
4272 _ => v.clone(),
4273 }
4274}
4275
4276fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
4281 match v {
4282 ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
4283 ast::Value::Object(m) => ast::Value::Object(
4284 m.iter()
4285 .map(|(k, val)| (k.clone(), resolve_ast_deep(val, vars)))
4286 .collect(),
4287 ),
4288 ast::Value::Array(arr) => {
4289 ast::Value::Array(arr.iter().map(|val| resolve_ast_deep(val, vars)).collect())
4290 }
4291 _ => v.clone(),
4292 }
4293}
4294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
    use tempfile::TempDir;

    /// Smoke test: create a collection, insert one document through a
    /// mutation, then read it back through a query.
    #[tokio::test]
    async fn test_executor_integration() {
        let temp_dir = TempDir::new().unwrap();
        let config = AuroraConfig {
            db_path: temp_dir.path().join("test.db"),
            enable_write_buffering: false,
            durability_mode: DurabilityMode::Strict,
            ..Default::default()
        };
        let db = Aurora::with_config(config).await.unwrap();

        let name_field = crate::types::FieldDefinition {
            field_type: FieldType::SCALAR_STRING,
            unique: false,
            indexed: true,
            nullable: true,
            ..Default::default()
        };
        db.new_collection("users", vec![("name", name_field)])
            .await
            .unwrap();

        let insert_mutation =
            r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#;
        let _ = execute(&db, insert_mutation, ExecutionOptions::new())
            .await
            .unwrap();

        let outcome = execute(&db, "query { users { name } }", ExecutionOptions::new())
            .await
            .unwrap();
        match outcome {
            ExecutionResult::Query(q) => assert_eq!(q.documents.len(), 1),
            _ => panic!("Expected query"),
        }
    }
}