1use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17#[allow(unused_imports)]
19use chrono::TimeZone as _;
20
21pub type ExecutionContext = HashMap<String, JsonValue>;
22
23#[derive(Debug, Clone)]
25pub struct QueryPlan {
26 pub collection: String,
27 pub filter: Option<ast::Filter>,
28 pub compiled_filter: Option<CompiledFilter>,
29 pub projection: Vec<ast::Field>,
30 pub limit: Option<usize>,
31 pub offset: usize,
32 pub after: Option<String>,
33 pub orderings: Vec<ast::Ordering>,
34 pub is_connection: bool,
35 pub has_lookups: bool,
36 pub fragments: HashMap<String, FragmentDef>,
37 pub variable_definitions: Vec<ast::VariableDefinition>,
38}
39
40impl QueryPlan {
41 pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
42 validate_required_variables(&self.variable_definitions, provided_variables)
43 }
44
45 pub fn from_query(
46 query: &ast::Query,
47 fragments: &HashMap<String, FragmentDef>,
48 variables: &HashMap<String, ast::Value>,
49 ) -> Result<Vec<Self>> {
50 let root_fields = collect_fields(&query.selection_set, fragments, variables, None)?;
51
52 let mut plans = Vec::new();
53 for field in root_fields {
54 let collection = field.name.clone();
55 let filter = extract_filter_from_args(&field.arguments)?;
56 let compiled_filter = if let Some(ref f) = filter {
57 Some(compile_filter(f)?)
58 } else {
59 None
60 };
61
62 let sub_fields = collect_fields(
63 &field.selection_set,
64 fragments,
65 variables,
66 Some(&field.name),
67 )?;
68
69 let (limit, offset) = extract_pagination(&field.arguments);
70 let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
71 let orderings = extract_order_by(&field.arguments);
72 let is_connection = sub_fields
73 .iter()
74 .any(|f| f.name == "edges" || f.name == "pageInfo");
75
76 let has_lookups = sub_fields.iter().any(|f| {
77 f.arguments.iter().any(|arg| {
78 arg.name == "collection"
79 || arg.name == "localField"
80 || arg.name == "foreignField"
81 })
82 });
83
84 plans.push(QueryPlan {
85 collection,
86 filter,
87 compiled_filter,
88 projection: sub_fields,
89 limit: limit.or(first),
90 offset,
91 after,
92 orderings,
93 is_connection,
94 has_lookups,
95 fragments: fragments.clone(),
96 variable_definitions: query.variable_definitions.clone(),
97 });
98 }
99 Ok(plans)
100 }
101}
102
103pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
105 use std::collections::hash_map::DefaultHasher;
106 use std::hash::{Hash, Hasher};
107
108 let query_key = {
110 let mut hasher = DefaultHasher::new();
111 aql.trim().hash(&mut hasher);
112 hasher.finish()
113 };
114
115 let vars: HashMap<String, ast::Value> = options
117 .variables
118 .iter()
119 .map(|(k, v)| (k.clone(), json_to_aql_value(v.clone())))
120 .collect();
121
122 if let Some(plan) = db.plan_cache.get(&query_key) {
124 plan.validate(&vars)?;
125 return execute_plan(db, &plan, &vars, &options).await;
126 }
127
128 let mut doc = super::parse(aql)?;
130
131 if let Some(Operation::Query(query)) = doc
133 .operations
134 .iter()
135 .find(|op| matches!(op, Operation::Query(_)))
136 {
137 let fragments: HashMap<String, FragmentDef> = doc
138 .operations
139 .iter()
140 .filter_map(|op| {
141 if let Operation::FragmentDefinition(f) = op {
142 Some((f.name.clone(), f.clone()))
143 } else {
144 None
145 }
146 })
147 .collect();
148
149 let plans = QueryPlan::from_query(query, &fragments, &vars)?;
150 if plans.len() == 1 {
151 let plan = Arc::new(plans[0].clone());
152 plan.validate(&vars)?;
153 db.plan_cache.insert(query_key, Arc::clone(&plan));
154
155 return execute_plan(db, &plan, &vars, &options).await;
156 }
157 }
158
159 let mut vars = vars;
162 for op in &doc.operations {
163 if let Operation::Query(q) = op {
164 for var_def in &q.variable_definitions {
165 if !vars.contains_key(&var_def.name) {
166 if let Some(default) = &var_def.default_value {
167 vars.insert(var_def.name.clone(), default.clone());
168 }
169 }
170 }
171 }
172 }
173
174 super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
176 let code = match e.code {
177 super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
178 super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
179 _ => ErrorCode::QueryError,
180 };
181 AqlError::new(code, e.to_string())
182 })?;
183
184 execute_document(db, &doc, &options).await
185}
186
187async fn execute_plan(
188 db: &Aurora,
189 plan: &QueryPlan,
190 variables: &HashMap<String, ast::Value>,
191 options: &ExecutionOptions,
192) -> Result<ExecutionResult> {
193 let collection_name = &plan.collection;
194
195 let effective_limit = plan.limit;
197
198 let mut indexed_docs = None;
200 if let Some(ref f) = plan.filter {
201 if let Some((field, val)) =
203 find_indexed_equality_filter_runtime(f, db, collection_name, variables)
204 {
205 let db_val = aql_value_to_db_value(&val, variables)?;
206 let ids = db.get_ids_from_index(collection_name, &field, &db_val);
207
208 let mut docs = Vec::with_capacity(ids.len());
209 for id in ids {
210 if let Some(doc) = db.get_document(collection_name, &id)? {
211 if let Some(ref cf) = plan.compiled_filter {
213 if matches_filter(&doc, cf, variables) {
214 docs.push(doc);
215 }
216 } else {
217 docs.push(doc);
218 }
219 }
220 }
221 indexed_docs = Some(docs);
222 }
223 }
224
225 let mut docs = if let Some(d) = indexed_docs {
226 d
227 } else {
228 let vars_arc = Arc::new(variables.clone());
230 let cf_clone = plan.compiled_filter.clone();
231 let filter_fn = move |doc: &Document| {
232 cf_clone
233 .as_ref()
234 .map(|f| matches_filter(doc, f, &vars_arc))
235 .unwrap_or(true)
236 };
237
238 let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
241 None
242 } else {
243 plan.limit.map(|l| {
244 let base = if plan.is_connection { l + 1 } else { l };
245 base + plan.offset
246 })
247 };
248 db.scan_and_filter(collection_name, filter_fn, scan_limit)?
249 };
250
251 if !plan.orderings.is_empty() {
253 apply_ordering(&mut docs, &plan.orderings);
254 }
255
256 if let Some(ref cursor) = plan.after {
258 if let Some(pos) = docs.iter().position(|d| &d.id == cursor) {
259 docs.drain(0..=pos);
260 }
261 }
262
263 if plan.offset > 0 {
265 if plan.offset < docs.len() {
266 docs.drain(0..plan.offset);
267 } else {
268 docs.clear();
269 }
270 }
271
272 if plan.is_connection {
274 return Ok(ExecutionResult::Query(execute_connection(
275 docs,
276 &plan.projection,
277 effective_limit,
278 &plan.fragments,
279 variables,
280 )));
281 }
282
283 if let Some(l) = effective_limit {
285 docs.truncate(l);
286 }
287
288 if options.apply_projections && !plan.projection.is_empty() {
290 let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
292 let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");
293
294 if let Some(f) = group_by_field {
295 let field_name = f
297 .arguments
298 .iter()
299 .find(|a| a.name == "field")
300 .and_then(|a| match &a.value {
301 ast::Value::String(s) => Some(s),
302 _ => None,
303 });
304
305 if let Some(group_field) = field_name {
306 let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
307 for d in docs {
308 let key = d
309 .data
310 .get(group_field)
311 .map(|v| match v {
312 Value::String(s) => s.clone(),
313 _ => v.to_string(),
314 })
315 .unwrap_or_else(|| "null".to_string());
316 groups.entry(key).or_default().push(d);
317 }
318
319 let mut group_docs = Vec::with_capacity(groups.len());
320 for (key, group_items) in groups {
321 let mut data = HashMap::new();
322 for selection in &f.selection_set {
323 if let Selection::Field(sub_f) = selection {
324 let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
325 match sub_f.name.as_str() {
326 "key" => {
327 data.insert(alias.clone(), Value::String(key.clone()));
328 }
329 "count" => {
330 data.insert(
331 alias.clone(),
332 Value::Int(group_items.len() as i64),
333 );
334 }
335 "nodes" => {
336 let sub_fields = collect_fields(
337 &sub_f.selection_set,
338 &plan.fragments,
339 variables,
340 None,
341 )
342 .unwrap_or_default();
343
344 let projected_nodes = group_items
345 .iter()
346 .map(|d| {
347 let node_data =
348 apply_projection(d.clone(), &sub_fields).data;
349 Value::Object(node_data)
350 })
351 .collect();
352 data.insert(alias.clone(), Value::Array(projected_nodes));
353 }
354 "aggregate" => {
355 let agg_res =
356 compute_aggregates(&group_items, &sub_f.selection_set);
357 data.insert(alias.clone(), Value::Object(agg_res));
358 }
359 _ => {}
360 }
361 }
362 }
363 group_docs.push(Document {
364 id: format!("group:{}", key),
365 data,
366 });
367 }
368 docs = group_docs;
369 }
370 } else if let Some(agg) = agg_field {
371 let agg_result = compute_aggregates(&docs, &agg.selection_set);
375 let alias = agg.alias.as_ref().unwrap_or(&agg.name);
376
377 if plan.projection.len() == 1 {
378 let mut data = HashMap::new();
379 data.insert(alias.clone(), Value::Object(agg_result));
380 docs = vec![Document {
381 id: "aggregate".to_string(),
382 data,
383 }];
384 } else {
385 docs = docs
387 .into_iter()
388 .map(|mut d| {
389 d.data
390 .insert(alias.clone(), Value::Object(agg_result.clone()));
391 apply_projection(d, &plan.projection)
392 })
393 .collect();
394 }
395 } else if !plan.has_lookups {
396 docs = docs
397 .into_iter()
398 .map(|d| apply_projection(d, &plan.projection))
399 .collect();
400 } else {
401 let mut projected = Vec::with_capacity(docs.len());
403 for d in docs {
404 let (proj_doc, _) =
405 apply_projection_with_lookups(db, d, &plan.projection, variables).await?;
406 projected.push(proj_doc);
407 }
408 docs = projected;
409 }
410 }
411
412 Ok(ExecutionResult::Query(QueryResult {
413 collection: collection_name.clone(),
414 documents: docs,
415 total_count: None,
416 deferred_fields: vec![],
417 explain: None,
418 }))
419}
420
421fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
422 let mut results = HashMap::new();
423
424 for selection in selections {
425 if let Selection::Field(f) = selection {
426 let alias = f.alias.as_ref().unwrap_or(&f.name);
427 let value = match f.name.as_str() {
428 "count" => Value::Int(docs.len() as i64),
429 "sum" => {
430 let field =
431 f.arguments
432 .iter()
433 .find(|a| a.name == "field")
434 .and_then(|a| match &a.value {
435 ast::Value::String(s) => Some(s),
436 _ => None,
437 });
438
439 if let Some(field_name) = field {
440 let sum: f64 = docs
441 .iter()
442 .filter_map(|d| d.data.get(field_name))
443 .filter_map(|v| match v {
444 Value::Int(i) => Some(*i as f64),
445 Value::Float(f) => Some(*f),
446 _ => None,
447 })
448 .sum();
449 Value::Float(sum)
450 } else {
451 Value::Null
452 }
453 }
454 "avg" => {
455 let field =
456 f.arguments
457 .iter()
458 .find(|a| a.name == "field")
459 .and_then(|a| match &a.value {
460 ast::Value::String(s) => Some(s),
461 _ => None,
462 });
463
464 if let Some(field_name) = field
465 && !docs.is_empty()
466 {
467 let values: Vec<f64> = docs
468 .iter()
469 .filter_map(|d| d.data.get(field_name))
470 .filter_map(|v| match v {
471 Value::Int(i) => Some(*i as f64),
472 Value::Float(f) => Some(*f),
473 _ => None,
474 })
475 .collect();
476
477 if values.is_empty() {
478 Value::Null
479 } else {
480 let sum: f64 = values.iter().sum();
481 Value::Float(sum / values.len() as f64)
482 }
483 } else {
484 Value::Null
485 }
486 }
487 "min" => {
488 let field =
489 f.arguments
490 .iter()
491 .find(|a| a.name == "field")
492 .and_then(|a| match &a.value {
493 ast::Value::String(s) => Some(s),
494 _ => None,
495 });
496
497 if let Some(field_name) = field
498 && !docs.is_empty()
499 {
500 let min = docs
501 .iter()
502 .filter_map(|d| d.data.get(field_name))
503 .filter_map(|v| match v {
504 Value::Int(i) => Some(*i as f64),
505 Value::Float(f) => Some(*f),
506 _ => None,
507 })
508 .fold(f64::INFINITY, f64::min);
509
510 if min == f64::INFINITY {
511 Value::Null
512 } else {
513 Value::Float(min)
514 }
515 } else {
516 Value::Null
517 }
518 }
519 "max" => {
520 let field =
521 f.arguments
522 .iter()
523 .find(|a| a.name == "field")
524 .and_then(|a| match &a.value {
525 ast::Value::String(s) => Some(s),
526 _ => None,
527 });
528
529 if let Some(field_name) = field
530 && !docs.is_empty()
531 {
532 let max = docs
533 .iter()
534 .filter_map(|d| d.data.get(field_name))
535 .filter_map(|v| match v {
536 Value::Int(i) => Some(*i as f64),
537 Value::Float(f) => Some(*f),
538 _ => None,
539 })
540 .fold(f64::NEG_INFINITY, f64::max);
541
542 if max == f64::NEG_INFINITY {
543 Value::Null
544 } else {
545 Value::Float(max)
546 }
547 } else {
548 Value::Null
549 }
550 }
551 _ => Value::Null,
552 };
553 results.insert(alias.clone(), value);
554 }
555 }
556
557 results
558}
559
560fn find_indexed_equality_filter_runtime(
561 filter: &ast::Filter,
562 db: &Aurora,
563 collection: &str,
564 variables: &HashMap<String, ast::Value>,
565) -> Option<(String, ast::Value)> {
566 match filter {
567 ast::Filter::Eq(field, val) => {
568 if field == "id" || db.has_index(collection, field) {
569 let resolved = resolve_if_variable(val, variables);
570 return Some((field.clone(), resolved.clone()));
571 }
572 }
573 ast::Filter::And(filters) => {
574 for f in filters {
575 if let Some(res) =
576 find_indexed_equality_filter_runtime(f, db, collection, variables)
577 {
578 return Some(res);
579 }
580 }
581 }
582 _ => {}
583 }
584 None
585}
586
587fn collect_fields(
589 selection_set: &[Selection],
590 fragments: &HashMap<String, FragmentDef>,
591 variable_values: &HashMap<String, ast::Value>,
592 parent_type: Option<&str>,
593) -> Result<Vec<Field>> {
594 let mut fields = Vec::new();
595
596 for selection in selection_set {
597 match selection {
598 Selection::Field(field) => {
599 if should_include(&field.directives, variable_values)? {
600 fields.push(field.clone());
601 }
602 }
603 Selection::FragmentSpread(name) => {
604 if let Some(fragment) = fragments.get(name) {
605 let type_match = if let Some(parent) = parent_type {
606 parent == fragment.type_condition
607 } else {
608 true
609 };
610
611 if type_match {
612 let fragment_fields = collect_fields(
613 &fragment.selection_set,
614 fragments,
615 variable_values,
616 parent_type,
617 )?;
618 fields.extend(fragment_fields);
619 }
620 }
621 }
622 Selection::InlineFragment(inline) => {
623 let type_match = if let Some(parent) = parent_type {
624 parent == inline.type_condition
625 } else {
626 true
627 };
628
629 if type_match {
630 let inline_fields = collect_fields(
631 &inline.selection_set,
632 fragments,
633 variable_values,
634 parent_type,
635 )?;
636 fields.extend(inline_fields);
637 }
638 }
639 }
640 }
641
642 Ok(fields)
643}
644
645fn should_include(
647 directives: &[ast::Directive],
648 variables: &HashMap<String, ast::Value>,
649) -> Result<bool> {
650 for dir in directives {
651 if dir.name == "skip" {
652 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
653 let should_skip = resolve_boolean_arg(&arg.value, variables)?;
654 if should_skip {
655 return Ok(false);
656 }
657 }
658 } else if dir.name == "include" {
659 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
660 let should_include = resolve_boolean_arg(&arg.value, variables)?;
661 if !should_include {
662 return Ok(false);
663 }
664 }
665 }
666 }
667 Ok(true)
668}
669
670fn resolve_boolean_arg(
671 value: &ast::Value,
672 variables: &HashMap<String, ast::Value>,
673) -> Result<bool> {
674 match value {
675 ast::Value::Boolean(b) => Ok(*b),
676 ast::Value::Variable(name) => {
677 if let Some(val) = variables.get(name) {
678 match val {
679 ast::Value::Boolean(b) => Ok(*b),
680 _ => Err(AqlError::new(
681 ErrorCode::TypeError,
682 format!("Variable '{}' is not a boolean, got {:?}", name, val),
683 )),
684 }
685 } else {
686 Err(AqlError::new(
687 ErrorCode::UndefinedVariable,
688 format!("Variable '{}' is not defined", name),
689 ))
690 }
691 }
692 _ => Err(AqlError::new(
693 ErrorCode::TypeError,
694 format!("Expected boolean value, got {:?}", value),
695 )),
696 }
697}
698
699fn validate_required_variables(
701 variable_definitions: &[ast::VariableDefinition],
702 provided_variables: &HashMap<String, ast::Value>,
703) -> Result<()> {
704 for var_def in variable_definitions {
705 if var_def.var_type.is_required {
706 if !provided_variables.contains_key(&var_def.name) {
707 if var_def.default_value.is_none() {
708 return Err(AqlError::new(
709 ErrorCode::UndefinedVariable,
710 format!(
711 "Required variable '{}' (type: {}{}) is not provided",
712 var_def.name,
713 var_def.var_type.name,
714 if var_def.var_type.is_required {
715 "!"
716 } else {
717 ""
718 }
719 ),
720 ));
721 }
722 }
723 }
724 }
725 Ok(())
726}
727
728#[derive(Debug)]
730pub enum ExecutionResult {
731 Query(QueryResult),
732 Mutation(MutationResult),
733 Subscription(SubscriptionResult),
734 Batch(Vec<ExecutionResult>),
735 Schema(SchemaResult),
736 Migration(MigrationResult),
737}
738
/// Summary of a schema operation.
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Name of the schema operation that ran.
    pub operation: String,
    /// Collection the operation targeted.
    pub collection: String,
    /// Status text reported for the operation.
    pub status: String,
}
745
/// Summary of a migration run.
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Version identifier of the migration.
    pub version: String,
    /// Number of migration steps that were applied.
    pub steps_applied: usize,
    /// Status text reported for the migration.
    pub status: String,
}
752
753#[derive(Debug, Clone, Serialize)]
754pub struct ExecutionPlan {
755 pub operations: Vec<String>,
756 pub estimated_cost: f64,
757}
758
759#[derive(Debug, Clone)]
761pub struct QueryResult {
762 pub collection: String,
763 pub documents: Vec<Document>,
764 pub total_count: Option<usize>,
765 pub deferred_fields: Vec<String>,
767 pub explain: Option<ExplainResult>,
769}
770
/// Execution statistics attached to a query result for `@explain`.
#[derive(Debug, Clone, Default)]
pub struct ExplainResult {
    /// Collection the statistics refer to.
    pub collection: String,
    /// Document count reported for the query.
    pub docs_scanned: usize,
    /// Whether an index is reported as having been used.
    pub index_used: bool,
    /// Wall-clock time spent on the collection query, in milliseconds.
    pub elapsed_ms: u128,
}
779
780#[derive(Debug, Clone)]
782pub struct MutationResult {
783 pub operation: String,
784 pub collection: String,
785 pub affected_count: usize,
786 pub returned_documents: Vec<Document>,
787}
788
789#[derive(Debug)]
791pub struct SubscriptionResult {
792 pub subscription_id: String,
793 pub collection: String,
794 pub stream: Option<crate::pubsub::ChangeListener>,
795}
796
797#[derive(Debug, Clone)]
799pub struct ExecutionOptions {
800 pub skip_validation: bool,
801 pub apply_projections: bool,
802 pub variables: HashMap<String, JsonValue>,
803}
804
805impl Default for ExecutionOptions {
806 fn default() -> Self {
807 Self {
808 skip_validation: false,
809 apply_projections: true,
810 variables: HashMap::new(),
811 }
812 }
813}
814
815impl ExecutionOptions {
816 pub fn new() -> Self {
817 Self::default()
818 }
819
820 pub fn with_variables(mut self, vars: HashMap<String, JsonValue>) -> Self {
821 self.variables = vars;
822 self
823 }
824
825 pub fn skip_validation(mut self) -> Self {
826 self.skip_validation = true;
827 self
828 }
829}
830
831fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
832 match v {
833 serde_json::Value::Null => ast::Value::Null,
834 serde_json::Value::Bool(b) => ast::Value::Boolean(b),
835 serde_json::Value::Number(n) => {
836 if let Some(i) = n.as_i64() {
837 ast::Value::Int(i)
838 } else if let Some(f) = n.as_f64() {
839 ast::Value::Float(f)
840 } else {
841 ast::Value::Null
842 }
843 }
844 serde_json::Value::String(s) => ast::Value::String(s),
845 serde_json::Value::Array(arr) => {
846 ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
847 }
848 serde_json::Value::Object(map) => ast::Value::Object(
849 map.into_iter()
850 .map(|(k, v)| (k, json_to_aql_value(v)))
851 .collect(),
852 ),
853 }
854}
855
856pub async fn execute_document(
858 db: &Aurora,
859 doc: &ast::Document,
860 options: &ExecutionOptions,
861) -> Result<ExecutionResult> {
862 if doc.operations.is_empty() {
863 return Err(AqlError::new(
864 ErrorCode::QueryError,
865 "No operations in document".to_string(),
866 ));
867 }
868
869 let vars: HashMap<String, ast::Value> = options
870 .variables
871 .iter()
872 .map(|(k, v)| (k.clone(), json_to_aql_value(v.clone())))
873 .collect();
874
875 let fragments: HashMap<String, FragmentDef> = doc
876 .operations
877 .iter()
878 .filter_map(|op| {
879 if let Operation::FragmentDefinition(frag) = op {
880 Some((frag.name.clone(), frag.clone()))
881 } else {
882 None
883 }
884 })
885 .collect();
886
887 let executable_ops: Vec<&Operation> = doc
888 .operations
889 .iter()
890 .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
891 .collect();
892
893 if executable_ops.is_empty() {
894 return Err(AqlError::new(
895 ErrorCode::QueryError,
896 "No executable operations in document".to_string(),
897 ));
898 }
899
900 if executable_ops.len() == 1 {
901 execute_operation(db, executable_ops[0], &vars, options, &fragments).await
902 } else {
903 let mut results = Vec::new();
904 for op in executable_ops {
905 results.push(execute_operation(db, op, &vars, options, &fragments).await?);
906 }
907 Ok(ExecutionResult::Batch(results))
908 }
909}
910
911async fn execute_operation(
912 db: &Aurora,
913 op: &Operation,
914 vars: &HashMap<String, ast::Value>,
915 options: &ExecutionOptions,
916 fragments: &HashMap<String, FragmentDef>,
917) -> Result<ExecutionResult> {
918 match op {
919 Operation::Query(query) => execute_query(db, query, vars, options, fragments).await,
920 Operation::Mutation(mutation) => {
921 execute_mutation(db, mutation, vars, options, fragments).await
922 }
923 Operation::Subscription(sub) => execute_subscription(db, sub, vars, options).await,
924 Operation::Schema(schema) => execute_schema(db, schema, options).await,
925 Operation::Migration(migration) => execute_migration(db, migration, options).await,
926 Operation::Introspection(intro) => execute_introspection(db, intro).await,
927 Operation::Handler(handler) => execute_handler_registration(db, handler, options).await,
928 _ => Ok(ExecutionResult::Query(QueryResult {
929 collection: String::new(),
930 documents: vec![],
931 total_count: None,
932 deferred_fields: vec![],
933 explain: None,
934 })),
935 }
936}
937
938async fn execute_query(
939 db: &Aurora,
940 query: &ast::Query,
941 vars: &HashMap<String, ast::Value>,
942 options: &ExecutionOptions,
943 fragments: &HashMap<String, FragmentDef>,
944) -> Result<ExecutionResult> {
945 validate_required_variables(&query.variable_definitions, vars)?;
946 let has_explain = query.directives.iter().any(|d| d.name == "explain");
947 let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;
948 let mut results = Vec::new();
949 for field in &root_fields {
950 let sub_fields = collect_fields(&field.selection_set, fragments, vars, Some(&field.name))?;
951 let start = std::time::Instant::now();
952 let mut result =
953 execute_collection_query(db, field, &sub_fields, vars, options, fragments).await?;
954 if has_explain {
955 let elapsed_ms = start.elapsed().as_millis();
956 let index_used =
957 field.arguments.iter().any(|a| a.name == "where") && !result.documents.is_empty();
958 result.explain = Some(ExplainResult {
959 collection: result.collection.clone(),
960 docs_scanned: result.documents.len(),
961 index_used,
962 elapsed_ms,
963 });
964 }
965 results.push(result);
966 }
967 if results.len() == 1 {
968 Ok(ExecutionResult::Query(results.remove(0)))
969 } else {
970 Ok(ExecutionResult::Batch(
971 results.into_iter().map(ExecutionResult::Query).collect(),
972 ))
973 }
974}
975
976async fn execute_collection_query(
977 db: &Aurora,
978 field: &ast::Field,
979 sub_fields: &[ast::Field],
980 variables: &HashMap<String, ast::Value>,
981 options: &ExecutionOptions,
982 fragments: &HashMap<String, FragmentDef>,
983) -> Result<QueryResult> {
984 let collection_name = &field.name;
985
986 if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
989 return execute_search_query(
990 db,
991 collection_name,
992 search_arg,
993 sub_fields,
994 field,
995 variables,
996 options,
997 )
998 .await;
999 }
1000
1001 let filter = extract_filter_from_args(&field.arguments)?;
1002 let (limit, offset) = extract_pagination(&field.arguments);
1003 let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
1004 let compiled_filter = if let Some(ref f) = filter {
1005 Some(compile_filter(f)?)
1006 } else {
1007 None
1008 };
1009 let vars_arc = Arc::new(variables.clone());
1010 let filter_fn = move |doc: &Document| {
1011 compiled_filter
1012 .as_ref()
1013 .map(|f| matches_filter(doc, f, &vars_arc))
1014 .unwrap_or(true)
1015 };
1016
1017 let indexed_docs = if let Some(ref f) = filter {
1018 match find_indexed_equality_filter(f, db, collection_name) {
1019 Some((field_name, val)) => {
1020 let db_val = aql_value_to_db_value(&val, variables)?;
1021 let ids = db.get_ids_from_index(collection_name, &field_name, &db_val);
1022 let mut docs = Vec::with_capacity(ids.len());
1023 for id in ids {
1024 if let Some(doc) = db.get_document(collection_name, &id)? {
1025 if filter_fn(&doc) {
1026 docs.push(doc);
1027 }
1028 }
1029 }
1030 Some(docs)
1031 }
1032 None => None,
1033 }
1034 } else {
1035 None
1036 };
1037
1038 let is_connection = sub_fields
1039 .iter()
1040 .any(|f| f.name == "edges" || f.name == "pageInfo");
1041
1042 let orderings = extract_order_by(&field.arguments);
1043
1044 let mut docs = if let Some(docs) = indexed_docs {
1045 docs
1046 } else {
1047 let scan_limit = if after.is_some() || !orderings.is_empty() {
1051 None
1052 } else {
1053 limit.or(first).map(|l| {
1054 let base = if is_connection { l + 1 } else { l };
1055 base + offset
1056 })
1057 };
1058 db.scan_and_filter(collection_name, filter_fn, scan_limit)?
1059 };
1060
1061 if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
1064 docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
1065 }
1066
1067 if !orderings.is_empty() {
1069 apply_ordering(&mut docs, &orderings);
1070 }
1071
1072 if let Some(ref cursor) = after {
1074 if let Some(pos) = docs.iter().position(|d| &d.id == cursor) {
1075 docs.drain(0..=pos);
1076 }
1077 }
1078
1079 if is_connection {
1080 return Ok(execute_connection(
1081 docs,
1082 sub_fields,
1083 limit.or(first),
1084 fragments,
1085 variables,
1086 ));
1087 }
1088
1089 if offset > 0 {
1091 if offset < docs.len() {
1092 docs.drain(0..offset);
1093 } else {
1094 docs.clear();
1095 }
1096 }
1097
1098 if let Some(l) = limit.or(first) {
1100 docs.truncate(l);
1101 }
1102
1103 let has_lookups = sub_fields.iter().any(|f| {
1105 f.arguments
1106 .iter()
1107 .any(|a| a.name == "collection" || a.name == "localField")
1108 });
1109
1110 let mut deferred_fields = Vec::new();
1111
1112 if options.apply_projections && !sub_fields.is_empty() {
1113 if has_lookups {
1114 let mut projected = Vec::with_capacity(docs.len());
1115 for d in docs {
1116 let (proj_doc, deferred) =
1117 apply_projection_with_lookups(db, d, sub_fields, variables).await?;
1118 projected.push(proj_doc);
1119 if deferred_fields.is_empty() {
1120 deferred_fields = deferred;
1121 }
1122 }
1123 docs = projected;
1124 } else {
1125 let mut all_deferred = Vec::new();
1126 docs = docs
1127 .into_iter()
1128 .map(|d| {
1129 let (proj, deferred) = apply_projection_and_defer(d, sub_fields);
1130 if all_deferred.is_empty() && !deferred.is_empty() {
1131 all_deferred = deferred;
1132 }
1133 proj
1134 })
1135 .collect();
1136 deferred_fields = all_deferred;
1137 }
1138 }
1139
1140 for sf in sub_fields {
1142 if sf.name == "windowFunc" {
1143 let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
1144 let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
1145 let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
1146 let wsize = arg_i64(&sf.arguments, "windowSize").unwrap_or(3) as usize;
1147 apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
1148 }
1149 }
1150
1151 if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
1153 let interval =
1154 arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
1155 let aggregation =
1156 arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
1157 let ds_sub: Vec<String> =
1158 collect_fields(&ds_field.selection_set, fragments, variables, None)
1159 .unwrap_or_default()
1160 .iter()
1161 .map(|f| f.name.clone())
1162 .collect();
1163 docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
1164 }
1165
1166 Ok(QueryResult {
1167 collection: collection_name.clone(),
1168 documents: docs,
1169 total_count: None,
1170 deferred_fields,
1171 explain: None,
1172 })
1173}
1174
1175async fn execute_search_query(
1177 db: &Aurora,
1178 collection: &str,
1179 search_arg: &ast::Argument,
1180 sub_fields: &[ast::Field],
1181 field: &ast::Field,
1182 variables: &HashMap<String, ast::Value>,
1183 options: &ExecutionOptions,
1184) -> Result<QueryResult> {
1185 let resolved_search_val = resolve_ast_deep(&search_arg.value, variables);
1187 let (query_str, search_fields, fuzzy) = extract_search_params(&resolved_search_val);
1188 let (limit, _) = extract_pagination(&field.arguments);
1189
1190 let mut builder = db.search(collection).query(&query_str);
1191 if fuzzy {
1192 builder = builder.fuzzy(1);
1193 }
1194 if let Some(l) = limit {
1195 builder = builder.limit(l);
1196 }
1197
1198 let mut docs = builder
1199 .collect_with_fields(if search_fields.is_empty() {
1200 None
1201 } else {
1202 Some(&search_fields)
1203 })
1204 .await?;
1205
1206 if options.apply_projections && !sub_fields.is_empty() {
1207 docs = docs
1208 .into_iter()
1209 .map(|d| {
1210 let (proj, _) = apply_projection_and_defer(d, sub_fields);
1211 proj
1212 })
1213 .collect();
1214 }
1215
1216 Ok(QueryResult {
1217 collection: collection.to_string(),
1218 documents: docs,
1219 total_count: None,
1220 deferred_fields: vec![],
1221 explain: None,
1222 })
1223}
1224
1225fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
1226 let mut query = String::new();
1227 let mut fields = Vec::new();
1228 let mut fuzzy = false;
1229 if let ast::Value::Object(m) = v {
1230 if let Some(ast::Value::String(q)) = m.get("query") {
1231 query = q.clone();
1232 }
1233 if let Some(ast::Value::Array(arr)) = m.get("fields") {
1234 for item in arr {
1235 if let ast::Value::String(s) = item {
1236 fields.push(s.clone());
1237 }
1238 }
1239 }
1240 if let Some(ast::Value::Boolean(b)) = m.get("fuzzy") {
1241 fuzzy = *b;
1242 }
1243 }
1244 (query, fields, fuzzy)
1245}
1246
1247fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
1248 if let ast::Value::Object(rules) = &validate_arg.value {
1249 for (field_name, constraints_val) in rules {
1250 if let ast::Value::Object(constraints) = constraints_val {
1251 if let Some(field_val) = doc.data.get(field_name) {
1252 for (constraint_name, constraint_val) in constraints {
1253 if !check_inline_constraint(field_val, constraint_name, constraint_val) {
1254 return false;
1255 }
1256 }
1257 }
1258 }
1259 }
1260 }
1261 true
1262}
1263
1264fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
1265 match constraint {
1266 "format" => {
1267 if let (Value::String(s), ast::Value::String(fmt)) = (value, constraint_val) {
1268 return match fmt.as_str() {
1269 "email" => {
1270 s.contains('@')
1271 && s.split('@')
1272 .nth(1)
1273 .map(|d| d.contains('.'))
1274 .unwrap_or(false)
1275 }
1276 "url" => s.starts_with("http://") || s.starts_with("https://"),
1277 "uuid" => uuid::Uuid::parse_str(s).is_ok(),
1278 _ => true,
1279 };
1280 }
1281 true
1282 }
1283 "min" => {
1284 let n = match value {
1285 Value::Int(i) => *i as f64,
1286 Value::Float(f) => *f,
1287 _ => return true,
1288 };
1289 let min = match constraint_val {
1290 ast::Value::Float(f) => *f,
1291 ast::Value::Int(i) => *i as f64,
1292 _ => return true,
1293 };
1294 n >= min
1295 }
1296 "max" => {
1297 let n = match value {
1298 Value::Int(i) => *i as f64,
1299 Value::Float(f) => *f,
1300 _ => return true,
1301 };
1302 let max = match constraint_val {
1303 ast::Value::Float(f) => *f,
1304 ast::Value::Int(i) => *i as f64,
1305 _ => return true,
1306 };
1307 n <= max
1308 }
1309 "minLength" => {
1310 if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1311 return s.len() >= *n as usize;
1312 }
1313 true
1314 }
1315 "maxLength" => {
1316 if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1317 return s.len() <= *n as usize;
1318 }
1319 true
1320 }
1321 "pattern" => {
1322 if let (Value::String(s), ast::Value::String(pat)) = (value, constraint_val) {
1323 if let Ok(re) = regex::Regex::new(pat) {
1324 return re.is_match(s);
1325 }
1326 }
1327 true
1328 }
1329 _ => true,
1330 }
1331}
1332
1333fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
1334 args.iter().find(|a| a.name == name).and_then(|a| {
1335 if let ast::Value::String(s) = &a.value {
1336 Some(s.clone())
1337 } else {
1338 None
1339 }
1340 })
1341}
1342
1343fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
1344 args.iter().find(|a| a.name == name).and_then(|a| {
1345 if let ast::Value::Int(i) = &a.value {
1346 Some(*i)
1347 } else {
1348 None
1349 }
1350 })
1351}
1352
1353fn apply_projection_and_defer(mut doc: Document, fields: &[ast::Field]) -> (Document, Vec<String>) {
1355 if fields.is_empty() {
1356 return (doc, vec![]);
1357 }
1358 let mut proj = HashMap::new();
1359 let mut deferred = Vec::new();
1360
1361 for f in fields {
1362 if f.directives.iter().any(|d| d.name == "defer") {
1364 deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
1365 continue;
1366 }
1367 if f.name == "__compute__" {
1369 let alias = f.alias.as_deref().unwrap_or("computed");
1370 if let Some(expr) = f.arguments.iter().find(|a| a.name == "expr") {
1371 if let ast::Value::String(template) = &expr.value {
1372 let result = eval_template(template, &doc.data);
1373 proj.insert(alias.to_string(), Value::String(result));
1374 }
1375 }
1376 continue;
1377 }
1378 if f.name == "id" {
1379 proj.insert(
1380 f.alias.as_ref().unwrap_or(&f.name).clone(),
1381 Value::String(doc.id.clone()),
1382 );
1383 } else if let Some(v) = doc.data.get(&f.name) {
1384 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1385 }
1386 }
1387 doc.data = proj;
1388 (doc, deferred)
1389}
1390
1391fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
1393 let mut result = template.to_string();
1394 for (k, v) in data {
1395 let placeholder = format!("${}", k);
1396 if result.contains(&placeholder) {
1397 result = result.replace(&placeholder, &v.to_string());
1398 }
1399 }
1400 result
1401}
1402
1403fn apply_window_function(
1405 docs: &mut Vec<Document>,
1406 alias: &str,
1407 field: &str,
1408 function: &str,
1409 window: usize,
1410) {
1411 if docs.is_empty() || window == 0 {
1412 return;
1413 }
1414 let values: Vec<Option<f64>> = docs
1415 .iter()
1416 .map(|d| match d.data.get(field) {
1417 Some(Value::Int(i)) => Some(*i as f64),
1418 Some(Value::Float(f)) => Some(*f),
1419 _ => None,
1420 })
1421 .collect();
1422
1423 for (i, doc) in docs.iter_mut().enumerate() {
1424 let start = if i + 1 >= window { i + 1 - window } else { 0 };
1425 let window_vals: Vec<f64> = values[start..=i].iter().filter_map(|v| *v).collect();
1426 if window_vals.is_empty() {
1427 continue;
1428 }
1429 let result = match function {
1430 "rollingAvg" | "avg" => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
1431 "rollingSum" | "sum" => window_vals.iter().sum::<f64>(),
1432 "rollingMin" | "min" => window_vals.iter().cloned().fold(f64::INFINITY, f64::min),
1433 "rollingMax" | "max" => window_vals
1434 .iter()
1435 .cloned()
1436 .fold(f64::NEG_INFINITY, f64::max),
1437 _ => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
1438 };
1439 doc.data.insert(alias.to_string(), Value::Float(result));
1440 }
1441}
1442
1443fn apply_downsample(
1445 docs: Vec<Document>,
1446 interval: &str,
1447 aggregation: &str,
1448 value_fields: &[String],
1449) -> Vec<Document> {
1450 let interval_secs: i64 = parse_interval(interval);
1451 if interval_secs <= 0 {
1452 return docs;
1453 }
1454
1455 let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
1457 std::collections::BTreeMap::new();
1458 let mut leftover = Vec::new();
1459
1460 for doc in docs {
1461 let ts = ["timestamp", "ts", "created_at", "time"]
1463 .iter()
1464 .find_map(|&k| doc.data.get(k))
1465 .and_then(|v| match v {
1466 Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
1467 .ok()
1468 .map(|dt| dt.timestamp()),
1469 Value::Int(i) => Some(*i),
1470 _ => None,
1471 });
1472
1473 if let Some(t) = ts {
1474 let bucket = (t / interval_secs) * interval_secs;
1475 buckets.entry(bucket).or_default().push(doc);
1476 } else {
1477 leftover.push(doc);
1478 }
1479 }
1480
1481 let mut result = Vec::new();
1482 for (bucket_ts, group) in buckets {
1483 let mut data = HashMap::new();
1484 data.insert(
1485 "timestamp".to_string(),
1486 Value::String(
1487 chrono::DateTime::from_timestamp(bucket_ts, 0)
1488 .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
1489 .unwrap_or_default(),
1490 ),
1491 );
1492 data.insert("count".to_string(), Value::Int(group.len() as i64));
1493
1494 for field in value_fields {
1495 if field == "timestamp" || field == "count" {
1496 continue;
1497 }
1498 let nums: Vec<f64> = group
1499 .iter()
1500 .filter_map(|d| match d.data.get(field) {
1501 Some(Value::Int(i)) => Some(*i as f64),
1502 Some(Value::Float(f)) => Some(*f),
1503 _ => None,
1504 })
1505 .collect();
1506 if nums.is_empty() {
1507 continue;
1508 }
1509 let agg = match aggregation {
1510 "sum" => nums.iter().sum::<f64>(),
1511 "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
1512 "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
1513 "count" => nums.len() as f64,
1514 _ => nums.iter().sum::<f64>() / nums.len() as f64, };
1516 data.insert(field.clone(), Value::Float(agg));
1517 }
1518
1519 result.push(Document {
1520 id: bucket_ts.to_string(),
1521 data,
1522 });
1523 }
1524
1525 result.extend(leftover);
1526 result
1527}
1528
/// Parses an interval such as `"30s"`, `"5m"`, `"2h"` or `"1d"` into seconds.
///
/// Surrounding whitespace is ignored. Results:
/// - `<n>s` / `<n>m` / `<n>h` / `<n>d`: `n` scaled to seconds; `0` when `n` is
///   not a valid integer or the scaled value does not fit in an `i64`;
/// - no unit suffix: the number itself (seconds), or `3600` when the text is
///   not a valid integer (including the empty string).
fn parse_interval(s: &str) -> i64 {
    const UNITS: [(char, i64); 4] = [('s', 1), ('m', 60), ('h', 3_600), ('d', 86_400)];

    let s = s.trim();
    for &(suffix, unit_secs) in UNITS.iter() {
        if let Some(amount) = s.strip_suffix(suffix) {
            return amount
                .parse::<i64>()
                .ok()
                // Checked: an unchecked multiply would panic in debug builds
                // and silently wrap in release builds for huge inputs.
                .and_then(|n| n.checked_mul(unit_secs))
                .unwrap_or(0);
        }
    }
    s.parse().unwrap_or(3600)
}
1543
1544async fn execute_handler_registration(
1547 db: &Aurora,
1548 handler: &ast::HandlerDef,
1549 _options: &ExecutionOptions,
1550) -> Result<ExecutionResult> {
1551 use crate::pubsub::events::ChangeType;
1552
1553 let collection = match &handler.trigger {
1554 ast::HandlerTrigger::Insert { collection }
1555 | ast::HandlerTrigger::Update { collection }
1556 | ast::HandlerTrigger::Delete { collection } => {
1557 collection.as_deref().unwrap_or("*").to_string()
1558 }
1559 _ => "*".to_string(),
1560 };
1561
1562 let trigger_type = match &handler.trigger {
1563 ast::HandlerTrigger::Insert { .. } => Some(ChangeType::Insert),
1564 ast::HandlerTrigger::Update { .. } => Some(ChangeType::Update),
1565 ast::HandlerTrigger::Delete { .. } => Some(ChangeType::Delete),
1566 _ => None,
1567 };
1568
1569 let mut listener = if collection == "*" {
1570 db.pubsub.listen_all()
1571 } else {
1572 db.pubsub.listen(collection.clone())
1573 };
1574
1575 let db_clone = db.clone();
1576 let action = handler.action.clone();
1577 let handler_name = handler.name.clone();
1578
1579 tokio::spawn(async move {
1580 loop {
1581 match listener.recv().await {
1582 Ok(event) => {
1583 let matches = trigger_type
1585 .as_ref()
1586 .map(|t| &event.change_type == t)
1587 .unwrap_or(true);
1588 if !matches {
1589 continue;
1590 }
1591
1592 let mut vars = HashMap::new();
1596 vars.insert("_id".to_string(), ast::Value::String(event.id.clone()));
1597 if let Some(doc) = &event.document {
1598 for (k, v) in &doc.data {
1600 vars.insert(format!("_{}", k), db_value_to_ast_value(v));
1601 }
1602 }
1603
1604 let _ = execute_mutation_op(
1605 &db_clone,
1606 &action,
1607 &vars,
1608 &HashMap::new(),
1609 &ExecutionOptions::default(),
1610 &HashMap::new(),
1611 )
1612 .await;
1613 }
1614 Err(_) => {
1615 eprintln!("[handler:{}] channel closed, stopping", handler_name);
1616 break;
1617 }
1618 }
1619 }
1620 });
1621
1622 eprintln!(
1623 "[handler] '{}' registered on '{}'",
1624 handler.name, collection
1625 );
1626
1627 let mut data = HashMap::new();
1628 data.insert("name".to_string(), Value::String(handler.name.clone()));
1629 data.insert("collection".to_string(), Value::String(collection));
1630 data.insert(
1631 "status".to_string(),
1632 Value::String("registered".to_string()),
1633 );
1634
1635 Ok(ExecutionResult::Query(QueryResult {
1636 collection: "__handler".to_string(),
1637 documents: vec![Document {
1638 id: handler.name.clone(),
1639 data,
1640 }],
1641 total_count: Some(1),
1642 deferred_fields: vec![],
1643 explain: None,
1644 }))
1645}
1646
1647fn db_value_to_ast_value(v: &Value) -> ast::Value {
1648 match v {
1649 Value::Null => ast::Value::Null,
1650 Value::Bool(b) => ast::Value::Boolean(*b),
1651 Value::Int(i) => ast::Value::Int(*i),
1652 Value::Float(f) => ast::Value::Float(*f),
1653 Value::String(s) => ast::Value::String(s.clone()),
1654 Value::Uuid(u) => ast::Value::String(u.to_string()),
1655 Value::DateTime(dt) => ast::Value::String(dt.to_rfc3339()),
1656 Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_ast_value).collect()),
1657 Value::Object(m) => ast::Value::Object(
1658 m.iter()
1659 .map(|(k, v)| (k.clone(), db_value_to_ast_value(v)))
1660 .collect(),
1661 ),
1662 }
1663}
1664
1665async fn execute_mutation(
1666 db: &Aurora,
1667 mutation: &ast::Mutation,
1668 vars: &HashMap<String, ast::Value>,
1669 options: &ExecutionOptions,
1670 fragments: &HashMap<String, FragmentDef>,
1671) -> Result<ExecutionResult> {
1672 use crate::transaction::ACTIVE_TRANSACTION_ID;
1673
1674 validate_required_variables(&mutation.variable_definitions, vars)?;
1675
1676 let already_in_tx = ACTIVE_TRANSACTION_ID
1681 .try_with(|id| *id)
1682 .ok()
1683 .and_then(|id| db.transaction_manager.active_transactions.get(&id))
1684 .is_some();
1685
1686 if already_in_tx {
1687 let mut results = Vec::new();
1688 let mut context = HashMap::new();
1689 for mut_op in &mutation.operations {
1690 let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
1691 if let Some(alias) = &mut_op.alias {
1692 if let Some(doc) = res.returned_documents.first() {
1693 let mut m = serde_json::Map::new();
1694 for (k, v) in &doc.data {
1695 m.insert(k.clone(), aurora_value_to_json_value(v));
1696 }
1697 m.insert("id".to_string(), JsonValue::String(doc.id.clone()));
1698 context.insert(alias.clone(), JsonValue::Object(m));
1699 }
1700 }
1701 results.push(res);
1702 }
1703 return if results.len() == 1 {
1704 Ok(ExecutionResult::Mutation(results.remove(0)))
1705 } else {
1706 Ok(ExecutionResult::Batch(
1707 results.into_iter().map(ExecutionResult::Mutation).collect(),
1708 ))
1709 };
1710 }
1711
1712 let tx_id = db.begin_transaction().await;
1715
1716 let exec_result = ACTIVE_TRANSACTION_ID
1717 .scope(tx_id, async {
1718 let mut results = Vec::new();
1719 let mut context = HashMap::new();
1720 for mut_op in &mutation.operations {
1721 let res =
1722 execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
1723 if let Some(alias) = &mut_op.alias {
1724 if let Some(doc) = res.returned_documents.first() {
1725 let mut m = serde_json::Map::new();
1726 for (k, v) in &doc.data {
1727 m.insert(k.clone(), aurora_value_to_json_value(v));
1728 }
1729 m.insert("id".to_string(), JsonValue::String(doc.id.clone()));
1730 context.insert(alias.clone(), JsonValue::Object(m));
1731 }
1732 }
1733 results.push(res);
1734 }
1735 Ok::<_, crate::error::AqlError>(results)
1736 })
1737 .await;
1738
1739 match exec_result {
1740 Ok(mut results) => {
1741 db.commit_transaction(tx_id).await?;
1742 if results.len() == 1 {
1743 Ok(ExecutionResult::Mutation(results.remove(0)))
1744 } else {
1745 Ok(ExecutionResult::Batch(
1746 results.into_iter().map(ExecutionResult::Mutation).collect(),
1747 ))
1748 }
1749 }
1750 Err(e) => {
1751 let _ = db.rollback_transaction(tx_id).await;
1752 Err(e)
1753 }
1754 }
1755}
1756
1757fn execute_mutation_op<'a>(
1758 db: &'a Aurora,
1759 mut_op: &'a ast::MutationOperation,
1760 variables: &'a HashMap<String, ast::Value>,
1761 context: &'a ExecutionContext,
1762 options: &'a ExecutionOptions,
1763 fragments: &'a HashMap<String, FragmentDef>,
1764) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
1765 use futures::future::FutureExt;
1766 async move {
1767 match &mut_op.operation {
1768 MutationOp::Insert { collection, data } => {
1769 let resolved = resolve_value(data, variables, context);
1770 let doc = db
1771 .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
1772 .await?;
1773 let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1774 let fields = collect_fields(
1775 &mut_op.selection_set,
1776 fragments,
1777 variables,
1778 Some(collection),
1779 )
1780 .unwrap_or_default();
1781 vec![apply_projection(doc, &fields)]
1782 } else {
1783 vec![doc]
1784 };
1785 Ok(MutationResult {
1786 operation: "insert".to_string(),
1787 collection: collection.clone(),
1788 affected_count: 1,
1789 returned_documents: returned,
1790 })
1791 }
1792 MutationOp::Update {
1793 collection,
1794 filter,
1795 data,
1796 } => {
1797 let cf = if let Some(f) = filter {
1798 Some(compile_filter(f)?)
1799 } else {
1800 None
1801 };
1802 let update_data =
1803 aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
1804
1805 let mut affected = 0;
1807 let mut returned = Vec::new();
1808
1809 let vars_arc = Arc::new(variables.clone());
1812 let cf_arc = cf.map(Arc::new);
1813
1814 let matches = db.scan_and_filter(
1815 collection,
1816 |doc| {
1817 if let Some(ref filter) = cf_arc {
1818 matches_filter(doc, filter, &vars_arc)
1819 } else {
1820 true
1821 }
1822 },
1823 None,
1824 )?;
1825
1826 let fields = if !mut_op.selection_set.is_empty() {
1827 Some(
1828 collect_fields(
1829 &mut_op.selection_set,
1830 fragments,
1831 variables,
1832 Some(collection),
1833 )
1834 .unwrap_or_default(),
1835 )
1836 } else {
1837 None
1838 };
1839
1840 for doc in matches {
1841 let mut new_data = doc.data.clone();
1842 for (k, v) in &update_data {
1843 let applied = apply_field_modifier(new_data.get(k), v);
1844 new_data.insert(k.clone(), applied);
1845 }
1846
1847 let updated_doc = db
1848 .aql_update_document(collection, &doc.id, new_data)
1849 .await?;
1850
1851 affected += 1;
1852 if let Some(ref f) = fields {
1853 returned.push(apply_projection(updated_doc, f));
1854 }
1855 }
1856
1857 Ok(MutationResult {
1858 operation: "update".to_string(),
1859 collection: collection.clone(),
1860 affected_count: affected,
1861 returned_documents: returned,
1862 })
1863 }
1864 MutationOp::Delete { collection, filter } => {
1865 let cf = if let Some(f) = filter {
1866 Some(compile_filter(f)?)
1867 } else {
1868 None
1869 };
1870
1871 let mut affected = 0;
1872 let vars_arc = Arc::new(variables.clone());
1873 let cf_arc = cf.map(Arc::new);
1874
1875 let matches = db.scan_and_filter(
1876 collection,
1877 |doc| {
1878 if let Some(ref filter) = cf_arc {
1879 matches_filter(doc, filter, &vars_arc)
1880 } else {
1881 true
1882 }
1883 },
1884 None,
1885 )?;
1886
1887 for doc in matches {
1888 db.aql_delete_document(collection, &doc.id).await?;
1889 affected += 1;
1890 }
1891
1892 Ok(MutationResult {
1893 operation: "delete".to_string(),
1894 collection: collection.clone(),
1895 affected_count: affected,
1896 returned_documents: vec![],
1897 })
1898 }
1899 MutationOp::InsertMany { collection, data } => {
1900 let mut affected = 0;
1901 let mut returned = Vec::new();
1902 for item in data {
1903 let resolved = resolve_value(item, variables, context);
1904 let doc = db
1905 .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
1906 .await?;
1907 affected += 1;
1908 if !mut_op.selection_set.is_empty() && options.apply_projections {
1909 let fields = collect_fields(
1910 &mut_op.selection_set,
1911 fragments,
1912 variables,
1913 Some(collection),
1914 )
1915 .unwrap_or_default();
1916 returned.push(apply_projection(doc, &fields));
1917 } else {
1918 returned.push(doc);
1919 }
1920 }
1921 Ok(MutationResult {
1922 operation: "insertMany".to_string(),
1923 collection: collection.clone(),
1924 affected_count: affected,
1925 returned_documents: returned,
1926 })
1927 }
1928 MutationOp::Upsert {
1929 collection,
1930 filter,
1931 data,
1932 } => {
1933 let update_data =
1934 aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
1935 let cf = if let Some(f) = filter {
1936 Some(compile_filter(f)?)
1937 } else {
1938 None
1939 };
1940 let vars_arc = Arc::new(variables.clone());
1941 let cf_arc = cf.map(Arc::new);
1942 let matches = db.scan_and_filter(
1943 collection,
1944 |doc| {
1945 if let Some(ref filter) = cf_arc {
1946 matches_filter(doc, filter, &vars_arc)
1947 } else {
1948 true
1949 }
1950 },
1951 Some(1),
1952 )?;
1953 let doc = if let Some(existing) = matches.into_iter().next() {
1954 db.aql_update_document(collection, &existing.id, update_data)
1955 .await?
1956 } else {
1957 db.aql_insert(collection, update_data).await?
1958 };
1959 Ok(MutationResult {
1960 operation: "upsert".to_string(),
1961 collection: collection.clone(),
1962 affected_count: 1,
1963 returned_documents: vec![doc],
1964 })
1965 }
1966 MutationOp::Transaction { operations } => {
1967 let mut all_returned = Vec::new();
1968 let mut total_affected = 0;
1969 for op in operations {
1970 let res =
1971 execute_mutation_op(db, op, variables, context, options, fragments).await?;
1972 total_affected += res.affected_count;
1973 all_returned.extend(res.returned_documents);
1974 }
1975 Ok(MutationResult {
1976 operation: "transaction".to_string(),
1977 collection: String::new(),
1978 affected_count: total_affected,
1979 returned_documents: all_returned,
1980 })
1981 }
1982 MutationOp::EnqueueJobs {
1983 job_type,
1984 payloads,
1985 priority,
1986 max_retries,
1987 } => {
1988 let workers = db.workers.as_ref().ok_or_else(|| {
1989 AqlError::new(
1990 ErrorCode::QueryError,
1991 "Worker system not initialised".to_string(),
1992 )
1993 })?;
1994 let job_priority = match priority {
1995 ast::JobPriority::Low => crate::workers::JobPriority::Low,
1996 ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
1997 ast::JobPriority::High => crate::workers::JobPriority::High,
1998 ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
1999 };
2000 let mut returned = Vec::new();
2001 for payload in payloads {
2002 let resolved = resolve_value(payload, variables, context);
2003 let json_payload: std::collections::HashMap<String, serde_json::Value> =
2004 if let ast::Value::Object(map) = &resolved {
2005 map.iter()
2006 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2007 .collect()
2008 } else {
2009 std::collections::HashMap::new()
2010 };
2011 let mut job =
2012 crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2013 for (k, v) in json_payload {
2014 job = job.add_field(k, v);
2015 }
2016 if let Some(retries) = max_retries {
2017 job = job.with_max_retries(*retries);
2018 }
2019 let job_id = workers.enqueue(job).await?;
2020 let mut doc = crate::types::Document::new();
2021 doc.id = job_id.clone();
2022 doc.data.insert("job_id".to_string(), Value::String(job_id));
2023 doc.data
2024 .insert("job_type".to_string(), Value::String(job_type.clone()));
2025 doc.data
2026 .insert("status".to_string(), Value::String("pending".to_string()));
2027 returned.push(doc);
2028 }
2029 let count = returned.len();
2030 Ok(MutationResult {
2031 operation: "enqueueJobs".to_string(),
2032 collection: "__jobs".to_string(),
2033 affected_count: count,
2034 returned_documents: returned,
2035 })
2036 }
2037 MutationOp::Import { collection, data } => {
2038 let mut affected = 0;
2039 let mut returned = Vec::new();
2040 let fields = if !mut_op.selection_set.is_empty() {
2041 Some(
2042 collect_fields(
2043 &mut_op.selection_set,
2044 fragments,
2045 variables,
2046 Some(collection),
2047 )
2048 .unwrap_or_default(),
2049 )
2050 } else {
2051 None
2052 };
2053 for item in data {
2054 let resolved = resolve_value(item, variables, context);
2055 let map = aql_value_to_hashmap(&resolved, variables)?;
2056 let doc = db.aql_insert(collection, map).await?;
2057 affected += 1;
2058 if let Some(ref f) = fields {
2059 returned.push(apply_projection(doc, f));
2060 }
2061 }
2062 Ok(MutationResult {
2063 operation: "import".to_string(),
2064 collection: collection.clone(),
2065 affected_count: affected,
2066 returned_documents: returned,
2067 })
2068 }
2069 MutationOp::Export {
2070 collection,
2071 format: _,
2072 } => {
2073 let docs = db.scan_and_filter(collection, |_| true, None)?;
2074 let fields = if !mut_op.selection_set.is_empty() {
2075 Some(
2076 collect_fields(
2077 &mut_op.selection_set,
2078 fragments,
2079 variables,
2080 Some(collection),
2081 )
2082 .unwrap_or_default(),
2083 )
2084 } else {
2085 None
2086 };
2087 let returned: Vec<Document> = if let Some(ref f) = fields {
2088 docs.into_iter().map(|d| apply_projection(d, f)).collect()
2089 } else {
2090 docs
2091 };
2092 let count = returned.len();
2093 Ok(MutationResult {
2094 operation: "export".to_string(),
2095 collection: collection.clone(),
2096 affected_count: count,
2097 returned_documents: returned,
2098 })
2099 }
2100 MutationOp::EnqueueJob {
2101 job_type,
2102 payload,
2103 priority,
2104 scheduled_at,
2105 max_retries,
2106 } => {
2107 let workers = db.workers.as_ref().ok_or_else(|| {
2108 AqlError::new(
2109 ErrorCode::QueryError,
2110 "Worker system not initialised".to_string(),
2111 )
2112 })?;
2113
2114 let job_priority = match priority {
2116 ast::JobPriority::Low => crate::workers::JobPriority::Low,
2117 ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
2118 ast::JobPriority::High => crate::workers::JobPriority::High,
2119 ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
2120 };
2121
2122 let resolved = resolve_value(payload, variables, context);
2124 let json_payload: std::collections::HashMap<String, serde_json::Value> =
2125 if let ast::Value::Object(map) = &resolved {
2126 map.iter()
2127 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
2128 .collect()
2129 } else {
2130 std::collections::HashMap::new()
2131 };
2132
2133 let mut job =
2134 crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
2135
2136 for (k, v) in json_payload {
2137 job = job.add_field(k, v);
2138 }
2139
2140 if let Some(retries) = max_retries {
2141 job = job.with_max_retries(*retries);
2142 }
2143
2144 if let Some(scheduled) = scheduled_at {
2145 if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
2146 job = job.scheduled_at(dt);
2147 }
2148 }
2149
2150 let job_id = workers.enqueue(job).await?;
2151
2152 let mut doc = crate::types::Document::new();
2153 doc.id = job_id.clone();
2154 doc.data
2155 .insert("job_id".to_string(), crate::types::Value::String(job_id));
2156 doc.data.insert(
2157 "job_type".to_string(),
2158 crate::types::Value::String(job_type.clone()),
2159 );
2160 doc.data.insert(
2161 "status".to_string(),
2162 crate::types::Value::String("pending".to_string()),
2163 );
2164
2165 Ok(MutationResult {
2166 operation: "enqueueJob".to_string(),
2167 collection: "__jobs".to_string(),
2168 affected_count: 1,
2169 returned_documents: vec![doc],
2170 })
2171 }
2172 }
2173 }
2174 .boxed()
2175}
2176
2177async fn execute_subscription(
2178 db: &Aurora,
2179 sub: &ast::Subscription,
2180 vars: &HashMap<String, ast::Value>,
2181 _options: &ExecutionOptions,
2182) -> Result<ExecutionResult> {
2183 let vars: HashMap<String, ast::Value> = vars.clone();
2184
2185 let selection = sub.selection_set.first().ok_or_else(|| {
2186 AqlError::new(
2187 ErrorCode::QueryError,
2188 "Subscription must have a selection".to_string(),
2189 )
2190 })?;
2191
2192 if let Selection::Field(f) = selection {
2193 let collection = f.name.clone();
2194 let filter = extract_filter_from_args(&f.arguments)?;
2195
2196 let mut listener = db.pubsub.listen(&collection);
2197
2198 if let Some(f) = filter {
2199 let event_filter = ast_filter_to_event_filter(&f, &vars)?;
2200 listener = listener.filter(event_filter);
2201 }
2202
2203 Ok(ExecutionResult::Subscription(SubscriptionResult {
2204 subscription_id: uuid::Uuid::new_v4().to_string(),
2205 collection,
2206 stream: Some(listener),
2207 }))
2208 } else {
2209 Err(AqlError::new(
2210 ErrorCode::QueryError,
2211 "Invalid subscription selection".to_string(),
2212 ))
2213 }
2214}
2215
2216fn ast_filter_to_event_filter(
2217 filter: &AqlFilter,
2218 vars: &HashMap<String, ast::Value>,
2219) -> Result<crate::pubsub::EventFilter> {
2220 use crate::pubsub::EventFilter;
2221
2222 match filter {
2223 AqlFilter::Eq(f, v) => Ok(EventFilter::FieldEquals(
2224 f.clone(),
2225 aql_value_to_db_value(v, vars)?,
2226 )),
2227 AqlFilter::Ne(f, v) => Ok(EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?)),
2228 AqlFilter::Gt(f, v) => Ok(EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?)),
2229 AqlFilter::Gte(f, v) => Ok(EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?)),
2230 AqlFilter::Lt(f, v) => Ok(EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?)),
2231 AqlFilter::Lte(f, v) => Ok(EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?)),
2232 AqlFilter::In(f, v) => Ok(EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?)),
2233 AqlFilter::NotIn(f, v) => Ok(EventFilter::NotIn(
2234 f.clone(),
2235 aql_value_to_db_value(v, vars)?,
2236 )),
2237 AqlFilter::Contains(f, v) => Ok(EventFilter::Contains(
2238 f.clone(),
2239 aql_value_to_db_value(v, vars)?,
2240 )),
2241 AqlFilter::ContainsAny(f, v) | AqlFilter::ContainsAll(f, v) => Ok(EventFilter::Contains(
2243 f.clone(),
2244 aql_value_to_db_value(v, vars)?,
2245 )),
2246 AqlFilter::StartsWith(f, v) => Ok(EventFilter::StartsWith(
2247 f.clone(),
2248 aql_value_to_db_value(v, vars)?,
2249 )),
2250 AqlFilter::EndsWith(f, v) => Ok(EventFilter::EndsWith(
2251 f.clone(),
2252 aql_value_to_db_value(v, vars)?,
2253 )),
2254 AqlFilter::Matches(f, v) => {
2255 let pattern = match aql_value_to_db_value(v, vars)? {
2256 crate::types::Value::String(s) => s,
2257 other => other.to_string(),
2258 };
2259 let re = regex::Regex::new(&pattern).map_err(|e| {
2260 crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
2261 })?;
2262 Ok(EventFilter::Matches(f.clone(), re))
2263 }
2264 AqlFilter::IsNull(f) => Ok(EventFilter::IsNull(f.clone())),
2265 AqlFilter::IsNotNull(f) => Ok(EventFilter::IsNotNull(f.clone())),
2266 AqlFilter::And(filters) => {
2267 let mut mapped = Vec::new();
2268 for f in filters {
2269 mapped.push(ast_filter_to_event_filter(f, vars)?);
2270 }
2271 Ok(EventFilter::And(mapped))
2272 }
2273 AqlFilter::Or(filters) => {
2274 let mut mapped = Vec::new();
2275 for f in filters {
2276 mapped.push(ast_filter_to_event_filter(f, vars)?);
2277 }
2278 Ok(EventFilter::Or(mapped))
2279 }
2280 AqlFilter::Not(f) => Ok(EventFilter::Not(Box::new(ast_filter_to_event_filter(
2281 f, vars,
2282 )?))),
2283 }
2284}
2285
2286async fn execute_introspection(
2287 db: &Aurora,
2288 intro: &ast::IntrospectionQuery,
2289) -> Result<ExecutionResult> {
2290 let names = db.list_collection_names();
2291 let want_fields = intro.fields.is_empty()
2292 || intro
2293 .fields
2294 .iter()
2295 .any(|f| f == "collections" || f == "fields");
2296
2297 let documents: Vec<Document> = names
2298 .iter()
2299 .filter_map(|name| {
2300 if name.starts_with('_') {
2302 return None;
2303 }
2304 let col = db.get_collection_definition(name).ok()?;
2305 let mut data = HashMap::new();
2306 data.insert("name".to_string(), Value::String(name.clone()));
2307
2308 if want_fields {
2309 let field_list: Vec<Value> = col
2310 .fields
2311 .iter()
2312 .map(|(fname, fdef)| {
2313 let mut fdata = HashMap::new();
2314 fdata.insert("name".to_string(), Value::String(fname.clone()));
2315 fdata.insert(
2316 "type".to_string(),
2317 Value::String(fdef.field_type.to_string()),
2318 );
2319 fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
2320 fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
2321 fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
2322 if !fdef.validations.is_empty() {
2323 let vcons: Vec<Value> = fdef
2324 .validations
2325 .iter()
2326 .map(|c| Value::String(format!("{:?}", c)))
2327 .collect();
2328 fdata.insert("validations".to_string(), Value::Array(vcons));
2329 }
2330 Value::Object(fdata)
2331 })
2332 .collect();
2333 data.insert("fields".to_string(), Value::Array(field_list));
2334 }
2335
2336 Some(Document {
2337 id: name.clone(),
2338 data,
2339 })
2340 })
2341 .collect();
2342
2343 let count = documents.len();
2344 Ok(ExecutionResult::Query(QueryResult {
2345 collection: "__schema".to_string(),
2346 documents,
2347 total_count: Some(count),
2348 deferred_fields: vec![],
2349 explain: None,
2350 }))
2351}
2352
2353async fn execute_schema(
2354 db: &Aurora,
2355 schema: &ast::Schema,
2356 _options: &ExecutionOptions,
2357) -> Result<ExecutionResult> {
2358 let mut last_collection = String::new();
2359
2360 for op in &schema.operations {
2361 match op {
2362 ast::SchemaOp::DefineCollection {
2363 name,
2364 fields,
2365 if_not_exists,
2366 ..
2367 } => {
2368 last_collection = name.clone();
2369
2370 if *if_not_exists {
2372 if db.get_collection_definition(name).is_ok() {
2373 continue;
2374 }
2375 }
2376
2377 let mut field_defs = Vec::new();
2378 for field in fields {
2379 field_defs.push((field.name.as_str(), build_field_def(field)));
2380 }
2381 db.new_collection(name, field_defs).await?;
2382 }
2383 ast::SchemaOp::AlterCollection { name, actions } => {
2384 last_collection = name.clone();
2385 for action in actions {
2386 match action {
2387 ast::AlterAction::AddField { field, default } => {
2388 let def = build_field_def(field);
2389 db.add_field_to_schema(name, field.name.clone(), def)
2390 .await?;
2391 if let Some(default_val) = default {
2392 let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
2393 let docs = db.get_all_collection(name).await?;
2394 for doc in docs {
2395 if !doc.data.contains_key(&field.name) {
2396 db.update_document(
2397 name,
2398 &doc.id,
2399 vec![(&field.name, db_val.clone())],
2400 )
2401 .await?;
2402 }
2403 }
2404 }
2405 }
2406 ast::AlterAction::DropField(field_name) => {
2407 db.drop_field_from_schema(name, field_name.clone()).await?;
2408 }
2409 ast::AlterAction::RenameField { from, to } => {
2410 db.rename_field_in_schema(name, from.clone(), to.clone())
2411 .await?;
2412 let docs = db.get_all_collection(name).await?;
2413 for mut doc in docs {
2414 if let Some(val) = doc.data.remove(from.as_str()) {
2415 doc.data.insert(to.clone(), val);
2416 let key = format!("{}:{}", name, doc.id);
2417 db.put(key, serde_json::to_vec(&doc)?, None).await?;
2418 }
2419 }
2420 }
2421 ast::AlterAction::ModifyField(field) => {
2422 db.modify_field_in_schema(
2423 name,
2424 field.name.clone(),
2425 build_field_def(field),
2426 )
2427 .await?;
2428 }
2429 }
2430 }
2431 }
2432 ast::SchemaOp::DropCollection { name, .. } => {
2433 db.drop_collection_schema(name).await?;
2434 last_collection = name.clone();
2435 }
2436 }
2437 }
2438
2439 Ok(ExecutionResult::Schema(SchemaResult {
2440 operation: "schema".to_string(),
2441 collection: last_collection,
2442 status: "done".to_string(),
2443 }))
2444}
2445
2446fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
2447 let scalar = match anno.name.to_lowercase().as_str() {
2448 "string" => ScalarType::String,
2449 "int" | "integer" => ScalarType::Int,
2450 "float" | "double" => ScalarType::Float,
2451 "bool" | "boolean" => ScalarType::Bool,
2452 "uuid" => ScalarType::Uuid,
2453 "object" => ScalarType::Object,
2454 "array" => ScalarType::Array,
2455 _ => ScalarType::Any,
2456 };
2457
2458 if anno.is_array {
2459 FieldType::Array(scalar)
2460 } else {
2461 match scalar {
2462 ScalarType::Object => FieldType::Object,
2463 ScalarType::Any => FieldType::Any,
2464 _ => FieldType::Scalar(scalar),
2465 }
2466 }
2467}
2468
2469fn parse_validate_directive(
2471 directive: &ast::Directive,
2472) -> Vec<crate::types::FieldValidationConstraint> {
2473 use crate::types::FieldValidationConstraint as FVC;
2474 let mut constraints = Vec::new();
2475 for arg in &directive.arguments {
2476 match arg.name.as_str() {
2477 "format" => {
2478 if let ast::Value::String(s) = &arg.value {
2479 constraints.push(FVC::Format(s.clone()));
2480 }
2481 }
2482 "min" => {
2483 let n = match &arg.value {
2484 ast::Value::Float(f) => Some(*f),
2485 ast::Value::Int(i) => Some(*i as f64),
2486 _ => None,
2487 };
2488 if let Some(n) = n {
2489 constraints.push(FVC::Min(n));
2490 }
2491 }
2492 "max" => {
2493 let n = match &arg.value {
2494 ast::Value::Float(f) => Some(*f),
2495 ast::Value::Int(i) => Some(*i as f64),
2496 _ => None,
2497 };
2498 if let Some(n) = n {
2499 constraints.push(FVC::Max(n));
2500 }
2501 }
2502 "minLength" => {
2503 if let ast::Value::Int(i) = &arg.value {
2504 constraints.push(FVC::MinLength(*i));
2505 }
2506 }
2507 "maxLength" => {
2508 if let ast::Value::Int(i) = &arg.value {
2509 constraints.push(FVC::MaxLength(*i));
2510 }
2511 }
2512 "pattern" => {
2513 if let ast::Value::String(s) = &arg.value {
2514 constraints.push(FVC::Pattern(s.clone()));
2515 }
2516 }
2517 _ => {}
2518 }
2519 }
2520 constraints
2521}
2522
2523fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
2525 let field_type = map_ast_type(&field.field_type);
2526 let mut indexed = false;
2527 let mut unique = false;
2528 let mut validations = Vec::new();
2529 for directive in &field.directives {
2530 match directive.name.as_str() {
2531 "indexed" | "index" => indexed = true,
2532 "unique" => {
2533 unique = true;
2534 indexed = true;
2535 }
2536 "primary" => {
2537 indexed = true;
2538 unique = true;
2539 }
2540 "validate" => validations.extend(parse_validate_directive(directive)),
2541 _ => {}
2542 }
2543 }
2544 FieldDefinition {
2545 field_type,
2546 unique,
2547 indexed,
2548 nullable: !field.field_type.is_required,
2549 validations,
2550 }
2551}
2552
2553async fn execute_migration(
2554 db: &Aurora,
2555 migration: &ast::Migration,
2556 _options: &ExecutionOptions,
2557) -> Result<ExecutionResult> {
2558 let mut steps_applied = 0;
2559 let mut last_version = String::new();
2560
2561 for step in &migration.steps {
2562 last_version = step.version.clone();
2563
2564 if db.is_migration_applied(&step.version).await? {
2565 eprintln!(
2566 "[migration] version '{}' already applied — skipping",
2567 step.version
2568 );
2569 continue;
2570 }
2571
2572 eprintln!("[migration] applying version '{}'", step.version);
2573
2574 for action in &step.actions {
2575 match action {
2576 ast::MigrationAction::Schema(schema_op) => {
2577 execute_single_schema_op(db, schema_op).await?;
2578 }
2579 ast::MigrationAction::DataMigration(dm) => {
2580 execute_data_migration(db, dm).await?;
2581 }
2582 }
2583 }
2584
2585 db.mark_migration_applied(&step.version).await?;
2586 steps_applied += 1;
2587 eprintln!("[migration] version '{}' applied", step.version);
2588 }
2589
2590 let status = if steps_applied > 0 {
2591 "applied".to_string()
2592 } else {
2593 "skipped".to_string()
2594 };
2595
2596 Ok(ExecutionResult::Migration(MigrationResult {
2597 version: last_version,
2598 steps_applied,
2599 status,
2600 }))
2601}
2602
2603async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
2605 match op {
2606 ast::SchemaOp::DefineCollection {
2607 name,
2608 fields,
2609 if_not_exists,
2610 ..
2611 } => {
2612 if *if_not_exists && db.get_collection_definition(name).is_ok() {
2613 return Ok(());
2614 }
2615 let field_defs: Vec<(&str, FieldDefinition)> = fields
2616 .iter()
2617 .map(|f| (f.name.as_str(), build_field_def(f)))
2618 .collect();
2619 db.new_collection(name, field_defs).await?;
2620 }
2621 ast::SchemaOp::AlterCollection { name, actions } => {
2622 for action in actions {
2623 match action {
2624 ast::AlterAction::AddField { field, default } => {
2625 db.add_field_to_schema(name, field.name.clone(), build_field_def(field))
2626 .await?;
2627 if let Some(default_val) = default {
2628 let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
2629 let docs = db.get_all_collection(name).await?;
2630 for doc in docs {
2631 if !doc.data.contains_key(&field.name) {
2632 db.update_document(
2633 name,
2634 &doc.id,
2635 vec![(&field.name, db_val.clone())],
2636 )
2637 .await?;
2638 }
2639 }
2640 }
2641 }
2642 ast::AlterAction::DropField(field_name) => {
2643 db.drop_field_from_schema(name, field_name.clone()).await?;
2644 }
2645 ast::AlterAction::RenameField { from, to } => {
2646 db.rename_field_in_schema(name, from.clone(), to.clone())
2647 .await?;
2648 let docs = db.get_all_collection(name).await?;
2650 for mut doc in docs {
2651 if let Some(val) = doc.data.remove(from.as_str()) {
2652 doc.data.insert(to.clone(), val);
2653 let key = format!("{}:{}", name, doc.id);
2654 db.put(key, serde_json::to_vec(&doc)?, None).await?;
2655 }
2656 }
2657 }
2658 ast::AlterAction::ModifyField(field) => {
2659 db.modify_field_in_schema(name, field.name.clone(), build_field_def(field))
2660 .await?;
2661 }
2662 }
2663 }
2664 }
2665 ast::SchemaOp::DropCollection { name, if_exists } => {
2666 if *if_exists && db.get_collection_definition(name).is_err() {
2667 return Ok(());
2668 }
2669 db.drop_collection_schema(name).await?;
2670 }
2671 }
2672 Ok(())
2673}
2674
2675async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
2677 let docs = db.get_all_collection(&dm.collection).await?;
2678
2679 for doc in docs {
2680 for transform in &dm.transforms {
2681 let matches = match &transform.filter {
2682 Some(filter) => {
2683 let compiled = compile_filter(filter)?;
2684 matches_filter(&doc, &compiled, &HashMap::new())
2685 }
2686 None => true,
2687 };
2688
2689 if matches {
2690 let new_value = eval_migration_expr(&transform.expression, &doc);
2691 let mut updates = HashMap::new();
2692 updates.insert(transform.field.clone(), new_value);
2693 db.aql_update_document(&dm.collection, &doc.id, updates)
2694 .await?;
2695 }
2696 }
2697 }
2698 Ok(())
2699}
2700
2701fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
2711 let expr = expr.trim();
2712
2713 if expr.starts_with('"') && expr.ends_with('"') && expr.len() >= 2 {
2714 return Value::String(expr[1..expr.len() - 1].to_string());
2715 }
2716 if expr == "true" {
2717 return Value::Bool(true);
2718 }
2719 if expr == "false" {
2720 return Value::Bool(false);
2721 }
2722 if expr == "null" {
2723 return Value::Null;
2724 }
2725 if let Ok(n) = expr.parse::<i64>() {
2726 return Value::Int(n);
2727 }
2728 if let Ok(f) = expr.parse::<f64>() {
2729 return Value::Float(f);
2730 }
2731 if let Some(v) = doc.data.get(expr) {
2732 return v.clone();
2733 }
2734
2735 Value::Null
2736}
2737
2738fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
2739 for a in args {
2740 if a.name == "where" || a.name == "filter" {
2741 return Ok(Some(value_to_filter(&a.value)?));
2742 }
2743 }
2744 Ok(None)
2745}
2746
2747fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
2748 let mut orderings = Vec::new();
2749 for a in args {
2750 if a.name == "orderBy" {
2751 match &a.value {
2752 ast::Value::String(f) => orderings.push(ast::Ordering {
2753 field: f.clone(),
2754 direction: ast::SortDirection::Asc,
2755 }),
2756 ast::Value::Object(obj) => {
2757 if let (Some(ast::Value::String(field_name)), Some(dir_val)) =
2759 (obj.get("field"), obj.get("direction"))
2760 {
2761 let direction = match dir_val {
2762 ast::Value::Enum(s) | ast::Value::String(s) => {
2763 if s.to_uppercase() == "DESC" {
2764 ast::SortDirection::Desc
2765 } else {
2766 ast::SortDirection::Asc
2767 }
2768 }
2769 _ => ast::SortDirection::Asc,
2770 };
2771 orderings.push(ast::Ordering {
2772 field: field_name.clone(),
2773 direction,
2774 });
2775 } else {
2776 for (field, dir_val) in obj {
2778 let direction = match dir_val {
2779 ast::Value::Enum(s) | ast::Value::String(s) => {
2780 if s.to_uppercase() == "DESC" {
2781 ast::SortDirection::Desc
2782 } else {
2783 ast::SortDirection::Asc
2784 }
2785 }
2786 _ => ast::SortDirection::Asc,
2787 };
2788 orderings.push(ast::Ordering {
2789 field: field.clone(),
2790 direction,
2791 });
2792 }
2793 }
2794 }
2795 _ => {}
2796 }
2797 }
2798 }
2799 orderings
2800}
2801
2802fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
2803 docs.sort_by(|a, b| {
2804 for o in orderings {
2805 let cmp = compare_values(a.data.get(&o.field), b.data.get(&o.field));
2806 if cmp != std::cmp::Ordering::Equal {
2807 return match o.direction {
2808 ast::SortDirection::Asc => cmp,
2809 ast::SortDirection::Desc => cmp.reverse(),
2810 };
2811 }
2812 }
2813 std::cmp::Ordering::Equal
2814 });
2815}
2816
2817fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
2818 match (a, b) {
2819 (None, None) => std::cmp::Ordering::Equal,
2820 (None, Some(_)) => std::cmp::Ordering::Less,
2821 (Some(_), None) => std::cmp::Ordering::Greater,
2822 (Some(av), Some(bv)) => av.partial_cmp(bv).unwrap_or(std::cmp::Ordering::Equal),
2823 }
2824}
2825
2826pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
2827 let (mut limit, mut offset) = (None, 0);
2828 for a in args {
2829 match a.name.as_str() {
2830 "limit" | "first" => {
2831 if let ast::Value::Int(n) = a.value {
2832 limit = Some(n as usize);
2833 }
2834 }
2835 "offset" | "skip" => {
2836 if let ast::Value::Int(n) = a.value {
2837 offset = n as usize;
2838 }
2839 }
2840 _ => {}
2841 }
2842 }
2843 (limit, offset)
2844}
2845
2846fn extract_cursor_pagination(
2847 args: &[ast::Argument],
2848) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
2849 let (mut first, mut after, mut last, mut before) = (None, None, None, None);
2850 for a in args {
2851 match a.name.as_str() {
2852 "first" => {
2853 if let ast::Value::Int(n) = a.value {
2854 first = Some(n as usize);
2855 }
2856 }
2857 "after" => {
2858 if let ast::Value::String(s) = &a.value {
2859 after = Some(s.clone());
2860 }
2861 }
2862 "last" => {
2863 if let ast::Value::Int(n) = a.value {
2864 last = Some(n as usize);
2865 }
2866 }
2867 "before" => {
2868 if let ast::Value::String(s) = &a.value {
2869 before = Some(s.clone());
2870 }
2871 }
2872 _ => {}
2873 }
2874 }
2875 (first, after, last, before)
2876}
2877
2878fn execute_connection(
2879 mut docs: Vec<Document>,
2880 sub_fields: &[ast::Field],
2881 limit: Option<usize>,
2882 fragments: &HashMap<String, FragmentDef>,
2883 variables: &HashMap<String, ast::Value>,
2884) -> QueryResult {
2885 let has_next_page = if let Some(l) = limit {
2886 docs.len() > l
2887 } else {
2888 false
2889 };
2890
2891 if has_next_page {
2892 docs.truncate(limit.unwrap());
2893 }
2894
2895 let mut edges = Vec::with_capacity(docs.len());
2896 let mut end_cursor = String::new();
2897
2898 let node_fields = if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
2900 let edges_sub_fields =
2901 collect_fields(&edges_field.selection_set, fragments, variables, None)
2902 .unwrap_or_default();
2903 if let Some(node_field) = edges_sub_fields.into_iter().find(|f| f.name == "node") {
2904 collect_fields(&node_field.selection_set, fragments, variables, None)
2905 .unwrap_or_default()
2906 } else {
2907 Vec::new()
2908 }
2909 } else {
2910 Vec::new()
2911 };
2912
2913 for doc in &docs {
2914 let cursor = doc.id.clone();
2915 end_cursor = cursor.clone();
2916
2917 let mut edge_data = HashMap::new();
2918 edge_data.insert("cursor".to_string(), Value::String(cursor));
2919
2920 let node_doc = if node_fields.is_empty() {
2921 doc.clone()
2922 } else {
2923 apply_projection(doc.clone(), &node_fields)
2924 };
2925
2926 edge_data.insert("node".to_string(), Value::Object(node_doc.data));
2927 edges.push(Value::Object(edge_data));
2928 }
2929
2930 let mut page_info = HashMap::new();
2931 page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
2932 page_info.insert("endCursor".to_string(), Value::String(end_cursor));
2933
2934 let mut conn_data = HashMap::new();
2935 conn_data.insert("edges".to_string(), Value::Array(edges));
2936 conn_data.insert("pageInfo".to_string(), Value::Object(page_info));
2937
2938 QueryResult {
2939 collection: String::new(),
2940 documents: vec![Document {
2941 id: "connection".to_string(),
2942 data: conn_data,
2943 }],
2944 total_count: None,
2945 deferred_fields: vec![],
2946 explain: None,
2947 }
2948}
2949
2950pub fn matches_filter(
2951 doc: &Document,
2952 filter: &CompiledFilter,
2953 vars: &HashMap<String, ast::Value>,
2954) -> bool {
2955 match filter {
2956 CompiledFilter::Eq(f, v) => doc
2957 .data
2958 .get(f)
2959 .map_or(false, |dv| values_equal(dv, v, vars)),
2960 CompiledFilter::Ne(f, v) => doc
2961 .data
2962 .get(f)
2963 .map_or(true, |dv| !values_equal(dv, v, vars)),
2964 CompiledFilter::Gt(f, v) => doc.data.get(f).map_or(false, |dv| {
2965 if let Ok(bv) = aql_value_to_db_value(v, vars) {
2966 return dv > &bv;
2967 }
2968 false
2969 }),
2970 CompiledFilter::Gte(f, v) => doc.data.get(f).map_or(false, |dv| {
2971 if let Ok(bv) = aql_value_to_db_value(v, vars) {
2972 return dv >= &bv;
2973 }
2974 false
2975 }),
2976 CompiledFilter::Lt(f, v) => doc.data.get(f).map_or(false, |dv| {
2977 if let Ok(bv) = aql_value_to_db_value(v, vars) {
2978 return dv < &bv;
2979 }
2980 false
2981 }),
2982 CompiledFilter::Lte(f, v) => doc.data.get(f).map_or(false, |dv| {
2983 if let Ok(bv) = aql_value_to_db_value(v, vars) {
2984 return dv <= &bv;
2985 }
2986 false
2987 }),
2988 CompiledFilter::In(f, v) => doc.data.get(f).map_or(false, |dv| {
2989 if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
2990 return arr.contains(dv);
2991 }
2992 false
2993 }),
2994 CompiledFilter::NotIn(f, v) => doc.data.get(f).map_or(true, |dv| {
2995 if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
2996 return !arr.contains(dv);
2997 }
2998 true
2999 }),
3000 CompiledFilter::Contains(f, v) => {
3001 if let Some(dv) = doc.data.get(f) {
3002 match (dv, resolve_if_variable(v, vars)) {
3003 (Value::String(s), ast::Value::String(sub)) => s.contains(sub),
3004 (Value::Array(arr), _) => {
3005 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3006 return arr.contains(&bv);
3007 }
3008 false
3009 }
3010 _ => false,
3011 }
3012 } else {
3013 false
3014 }
3015 }
3016 CompiledFilter::ContainsAny(f, v) => {
3018 if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3019 (doc.data.get(f), aql_value_to_db_value(v, vars))
3020 {
3021 check_arr.iter().any(|item| field_arr.contains(item))
3022 } else {
3023 false
3024 }
3025 }
3026 CompiledFilter::ContainsAll(f, v) => {
3028 if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3029 (doc.data.get(f), aql_value_to_db_value(v, vars))
3030 {
3031 check_arr.iter().all(|item| field_arr.contains(item))
3032 } else {
3033 false
3034 }
3035 }
3036 CompiledFilter::StartsWith(f, v) => {
3037 if let (Some(Value::String(s)), ast::Value::String(pre)) =
3038 (doc.data.get(f), resolve_if_variable(v, vars))
3039 {
3040 s.starts_with(pre)
3041 } else {
3042 false
3043 }
3044 }
3045 CompiledFilter::EndsWith(f, v) => {
3046 if let (Some(Value::String(s)), ast::Value::String(suf)) =
3047 (doc.data.get(f), resolve_if_variable(v, vars))
3048 {
3049 s.ends_with(suf)
3050 } else {
3051 false
3052 }
3053 }
3054 CompiledFilter::Matches(f, re) => {
3055 if let Some(Value::String(s)) = doc.data.get(f) {
3056 re.is_match(s)
3057 } else {
3058 false
3059 }
3060 }
3061 CompiledFilter::IsNull(f) => doc.data.get(f).map_or(true, |v| matches!(v, Value::Null)),
3062 CompiledFilter::IsNotNull(f) => {
3063 doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
3064 }
3065 CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
3066 CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
3067 CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
3068 }
3069}
3070
3071fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
3076 if let Value::Object(modifier) = new_val {
3077 if let Some(delta) = modifier.get("increment") {
3078 match (existing, delta) {
3079 (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
3080 (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
3081 (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
3082 _ => {}
3083 }
3084 }
3085 if let Some(delta) = modifier.get("decrement") {
3086 match (existing, delta) {
3087 (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
3088 (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
3089 (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
3090 _ => {}
3091 }
3092 }
3093 if let Some(item) = modifier.get("push") {
3094 if let Some(Value::Array(mut arr)) = existing.cloned() {
3095 arr.push(item.clone());
3096 return Value::Array(arr);
3097 }
3098 return Value::Array(vec![item.clone()]);
3099 }
3100 if let Some(item) = modifier.get("pull") {
3101 if let Some(Value::Array(arr)) = existing {
3102 let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
3103 return Value::Array(filtered);
3104 }
3105 return Value::Array(vec![]);
3106 }
3107 if let Some(item) = modifier.get("addToSet") {
3108 if let Some(Value::Array(mut arr)) = existing.cloned() {
3109 if !arr.contains(item) {
3110 arr.push(item.clone());
3111 }
3112 return Value::Array(arr);
3113 }
3114 return Value::Array(vec![item.clone()]);
3115 }
3116 }
3117 new_val.clone()
3118}
3119
3120fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
3121 if let Ok(bv) = aql_value_to_db_value(av, vars) {
3122 return dv == &bv;
3123 }
3124 false
3125}
3126
3127fn resolve_if_variable<'a>(
3128 v: &'a ast::Value,
3129 vars: &'a HashMap<String, ast::Value>,
3130) -> &'a ast::Value {
3131 if let ast::Value::Variable(n) = v {
3132 vars.get(n).unwrap_or(v)
3133 } else {
3134 v
3135 }
3136}
3137
3138pub fn apply_projection(doc: Document, fields: &[ast::Field]) -> Document {
3139 let (projected, _) = apply_projection_and_defer(doc, fields);
3140 projected
3141}
3142
3143async fn apply_projection_with_lookups(
3146 db: &Aurora,
3147 mut doc: Document,
3148 fields: &[ast::Field],
3149 vars: &HashMap<String, ast::Value>,
3150) -> Result<(Document, Vec<String>)> {
3151 if fields.is_empty() {
3152 return Ok((doc, vec![]));
3153 }
3154 let mut proj = HashMap::new();
3155 let mut deferred = Vec::new();
3156
3157 for f in fields {
3158 if f.directives.iter().any(|d| d.name == "defer") {
3160 deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
3161 continue;
3162 }
3163
3164 let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
3166 let local_arg = f.arguments.iter().find(|a| a.name == "localField");
3167 let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
3168
3169 if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
3170 let c_val = resolve_if_variable(&c.value, vars);
3172 let lf_val = resolve_if_variable(&lf.value, vars);
3173 let ff_val = resolve_if_variable(&ff.value, vars);
3174 let foreign_coll = if let ast::Value::String(s) = c_val {
3175 s.as_str()
3176 } else {
3177 continue;
3178 };
3179 let local_field = if let ast::Value::String(s) = lf_val {
3180 s.as_str()
3181 } else {
3182 continue;
3183 };
3184 let foreign_field = if let ast::Value::String(s) = ff_val {
3185 s.as_str()
3186 } else {
3187 continue;
3188 };
3189
3190 let local_val = doc.data.get(local_field).cloned().or_else(|| {
3192 if local_field == "id" {
3193 Some(Value::String(doc.id.clone()))
3194 } else {
3195 None
3196 }
3197 });
3198
3199 if let Some(match_val) = local_val {
3200 let extra_filter = f
3202 .arguments
3203 .iter()
3204 .find(|a| a.name == "where")
3205 .and_then(|a| {
3206 let resolved = resolve_ast_deep(&a.value, vars);
3207 value_to_filter(&resolved).ok()
3208 });
3209
3210 let vars_arc = Arc::new(vars.clone());
3211 let foreign_docs = db.scan_and_filter(
3212 foreign_coll,
3213 |fdoc| {
3214 let field_match = fdoc
3215 .data
3216 .get(foreign_field)
3217 .map(|v| values_equal_db(v, &match_val))
3218 .unwrap_or(foreign_field == "id" && fdoc.id == match_val.to_string());
3219 if !field_match {
3220 return false;
3221 }
3222 if let Some(ref ef) = extra_filter {
3223 let compiled = compile_filter(ef)
3224 .unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
3225 matches_filter(fdoc, &compiled, &vars_arc)
3226 } else {
3227 true
3228 }
3229 },
3230 None,
3231 )?;
3232
3233 let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
3235 foreign_docs
3236 .into_iter()
3237 .map(|fd| Value::Object(fd.data))
3238 .collect()
3239 } else {
3240 let sub_fields: Vec<ast::Field> = f
3241 .selection_set
3242 .iter()
3243 .filter_map(|sel| {
3244 if let Selection::Field(sf) = sel {
3245 Some(sf.clone())
3246 } else {
3247 None
3248 }
3249 })
3250 .collect();
3251 foreign_docs
3252 .into_iter()
3253 .map(|fd| {
3254 let (proj_fd, _) = apply_projection_and_defer(fd, &sub_fields);
3255 Value::Object(proj_fd.data)
3256 })
3257 .collect()
3258 };
3259
3260 let alias = f.alias.as_ref().unwrap_or(&f.name).clone();
3261 proj.insert(alias, Value::Array(sub_projected));
3262 }
3263 continue;
3264 }
3265
3266 if f.name == "__compute__" {
3268 let alias = f.alias.as_deref().unwrap_or("computed");
3269 if let Some(expr) = f.arguments.iter().find(|a| a.name == "expr") {
3270 let resolved_expr = resolve_if_variable(&expr.value, vars);
3271 if let ast::Value::String(template) = resolved_expr {
3272 let result = eval_template(template, &doc.data);
3273 proj.insert(alias.to_string(), Value::String(result));
3274 }
3275 }
3276 continue;
3277 }
3278
3279 if f.name == "id" {
3281 proj.insert(
3282 f.alias.as_ref().unwrap_or(&f.name).clone(),
3283 Value::String(doc.id.clone()),
3284 );
3285 } else if let Some(v) = doc.data.get(&f.name) {
3286 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
3287 }
3288 }
3289 doc.data = proj;
3290 Ok((doc, deferred))
3291}
3292
3293fn values_equal_db(a: &Value, b: &Value) -> bool {
3294 match (a, b) {
3295 (Value::String(s1), Value::String(s2)) => s1 == s2,
3296 (Value::Int(i1), Value::Int(i2)) => i1 == i2,
3297 (Value::Float(f1), Value::Float(f2)) => (f1 - f2).abs() < f64::EPSILON,
3298 (Value::Bool(b1), Value::Bool(b2)) => b1 == b2,
3299 _ => false,
3300 }
3301}
3302
3303pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
3304 let resolved = resolve_if_variable(v, vars);
3305 match resolved {
3306 ast::Value::Int(i) => Ok(Value::Int(*i)),
3307 ast::Value::Float(f) => Ok(Value::Float(*f)),
3308 ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
3309 ast::Value::String(s) => Ok(Value::String(s.clone())),
3310 ast::Value::Enum(s) => Ok(Value::String(s.clone())),
3311 ast::Value::Null => Ok(Value::Null),
3312 ast::Value::Variable(name) => Err(AqlError::new(
3313 ErrorCode::UndefinedVariable,
3314 format!("Variable '{}' not found", name),
3315 )),
3316 ast::Value::Array(arr) => {
3317 let mut vals = Vec::with_capacity(arr.len());
3318 for v in arr {
3319 vals.push(aql_value_to_db_value(v, vars)?);
3320 }
3321 Ok(Value::Array(vals))
3322 }
3323 ast::Value::Object(obj) => {
3324 let mut map = HashMap::with_capacity(obj.len());
3325 for (k, v) in obj {
3326 map.insert(k.clone(), aql_value_to_db_value(v, vars)?);
3327 }
3328 Ok(Value::Object(map))
3329 }
3330 }
3331}
3332
3333fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
3335 match v {
3336 ast::Value::Null => serde_json::Value::Null,
3337 ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
3338 ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
3339 ast::Value::Float(f) => serde_json::Number::from_f64(*f)
3340 .map(serde_json::Value::Number)
3341 .unwrap_or(serde_json::Value::Null),
3342 ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
3343 ast::Value::Array(arr) => {
3344 serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
3345 }
3346 ast::Value::Object(obj) => serde_json::Value::Object(
3347 obj.iter()
3348 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
3349 .collect(),
3350 ),
3351 ast::Value::Variable(_) => serde_json::Value::Null,
3352 }
3353}
3354
3355fn aql_value_to_hashmap(
3356 v: &ast::Value,
3357 vars: &HashMap<String, ast::Value>,
3358) -> Result<HashMap<String, Value>> {
3359 if let ast::Value::Object(m) = resolve_if_variable(v, vars) {
3360 let mut res = HashMap::new();
3361 for (k, v) in m {
3362 res.insert(k.clone(), aql_value_to_db_value(v, vars)?);
3363 }
3364 Ok(res)
3365 } else {
3366 Err(AqlError::new(
3367 ErrorCode::QueryError,
3368 "Data must be object".to_string(),
3369 ))
3370 }
3371}
3372
3373fn aurora_value_to_json_value(v: &Value) -> JsonValue {
3374 match v {
3375 Value::Null => JsonValue::Null,
3376 Value::String(s) => JsonValue::String(s.clone()),
3377 Value::Int(i) => JsonValue::Number((*i).into()),
3378 Value::Float(f) => serde_json::Number::from_f64(*f)
3379 .map(JsonValue::Number)
3380 .unwrap_or(JsonValue::Null),
3381 Value::Bool(b) => JsonValue::Bool(*b),
3382 Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
3383 Value::Object(m) => {
3384 let mut jm = serde_json::Map::new();
3385 for (k, v) in m {
3386 jm.insert(k.clone(), aurora_value_to_json_value(v));
3387 }
3388 JsonValue::Object(jm)
3389 }
3390 Value::Uuid(u) => JsonValue::String(u.to_string()),
3391 Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
3392 }
3393}
3394
3395fn find_indexed_equality_filter(
3397 filter: &AqlFilter,
3398 db: &Aurora,
3399 collection: &str,
3400) -> Option<(String, ast::Value)> {
3401 match filter {
3402 AqlFilter::Eq(field, val) => {
3403 if field == "id" || db.has_index(collection, field) {
3404 Some((field.clone(), val.clone()))
3405 } else {
3406 None
3407 }
3408 }
3409 AqlFilter::And(filters) => {
3410 for f in filters {
3411 if let Some(res) = find_indexed_equality_filter(f, db, collection) {
3412 return Some(res);
3413 }
3414 }
3415 None
3416 }
3417 _ => None,
3418 }
3419}
3420
3421pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
3422 if let ast::Value::Object(m) = v {
3423 let mut fs = Vec::new();
3424 for (k, val) in m {
3425 match k.as_str() {
3426 "or" => {
3427 if let ast::Value::Array(arr) = val {
3428 let mut sub = Vec::new();
3429 for item in arr {
3430 sub.push(value_to_filter(item)?);
3431 }
3432 return Ok(AqlFilter::Or(sub));
3433 }
3434 }
3435 "and" => {
3436 if let ast::Value::Array(arr) = val {
3437 let mut sub = Vec::new();
3438 for item in arr {
3439 sub.push(value_to_filter(item)?);
3440 }
3441 return Ok(AqlFilter::And(sub));
3442 }
3443 }
3444 "not" => {
3445 return Ok(AqlFilter::Not(Box::new(value_to_filter(val)?)));
3446 }
3447 field => {
3448 if let ast::Value::Object(ops) = val {
3449 for (op, ov) in ops {
3450 match op.as_str() {
3451 "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
3452 "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
3453 "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
3454 "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
3455 "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
3456 "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
3457 "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
3458 "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
3459 "contains" => {
3460 fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
3461 }
3462 "containsAny" => {
3463 fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
3464 }
3465 "containsAll" => {
3466 fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
3467 }
3468 "startsWith" => {
3469 fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
3470 }
3471 "endsWith" => {
3472 fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
3473 }
3474 "matches" => {
3475 fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
3476 }
3477 _ => {}
3478 }
3479 }
3480 }
3481 }
3482 }
3483 }
3484 if fs.is_empty() {
3485 Ok(AqlFilter::And(vec![]))
3486 } else if fs.len() == 1 {
3487 Ok(fs.remove(0))
3488 } else {
3489 Ok(AqlFilter::And(fs))
3490 }
3491 } else {
3492 Err(AqlError::new(
3493 ErrorCode::QueryError,
3494 "Filter must be object".to_string(),
3495 ))
3496 }
3497}
3498
3499fn resolve_value(
3500 v: &ast::Value,
3501 vars: &HashMap<String, ast::Value>,
3502 _ctx: &ExecutionContext,
3503) -> ast::Value {
3504 match v {
3505 ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
3506 _ => v.clone(),
3507 }
3508}
3509
3510fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
3515 match v {
3516 ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
3517 ast::Value::Object(m) => ast::Value::Object(
3518 m.iter()
3519 .map(|(k, val)| (k.clone(), resolve_ast_deep(val, vars)))
3520 .collect(),
3521 ),
3522 ast::Value::Array(arr) => {
3523 ast::Value::Array(arr.iter().map(|val| resolve_ast_deep(val, vars)).collect())
3524 }
3525 _ => v.clone(),
3526 }
3527}
3528
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
    use tempfile::TempDir;

    /// Smoke test: creates a `users` collection in a temporary database,
    /// inserts one document through a mutation and checks that a query
    /// returns exactly one document.
    #[tokio::test]
    async fn test_executor_integration() {
        let scratch = TempDir::new().unwrap();
        let config = AuroraConfig {
            db_path: scratch.path().join("test.db"),
            enable_write_buffering: false,
            durability_mode: DurabilityMode::Strict,
            ..Default::default()
        };
        let db = Aurora::with_config(config).await.unwrap();

        let name_field = crate::types::FieldDefinition {
            field_type: FieldType::SCALAR_STRING,
            unique: false,
            indexed: true,
            nullable: true,
            ..Default::default()
        };
        db.new_collection("users", vec![("name", name_field)])
            .await
            .unwrap();

        let insert = r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#;
        let _ = execute(&db, insert, ExecutionOptions::new())
            .await
            .unwrap();

        let res = execute(&db, "query { users { name } }", ExecutionOptions::new())
            .await
            .unwrap();
        match res {
            ExecutionResult::Query(q) => assert_eq!(q.documents.len(), 1),
            _ => panic!("Expected query"),
        }
    }
}