1use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15use std::sync::Arc;
16
17#[allow(unused_imports)]
19use chrono::TimeZone as _;
20
/// Named JSON value bindings available during execution.
/// NOTE(review): not referenced in this module's visible code — presumably
/// consumed by callers or handlers elsewhere; confirm before removing.
pub type ExecutionContext = HashMap<String, JsonValue>;
22
/// A pre-compiled, cacheable execution plan for a single root query field.
///
/// Built once by [`QueryPlan::from_query`] and cached keyed on the query
/// text, so repeated executions skip parsing and filter compilation.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Target collection (the root field's name).
    pub collection: String,
    /// The raw AST filter from the `where`-style arguments, if any.
    pub filter: Option<ast::Filter>,
    /// Pre-compiled form of `filter`, evaluated per document at runtime.
    pub compiled_filter: Option<CompiledFilter>,
    /// Sub-fields selected under the root field.
    pub projection: Vec<ast::Field>,
    /// Page size from `limit` (or the cursor-style `first`) argument.
    pub limit: Option<usize>,
    /// Number of leading documents to drop (offset pagination).
    pub offset: usize,
    /// Cursor (`_sid`) for cursor pagination; results start after this doc.
    pub after: Option<String>,
    /// Sort specifications from the `orderBy` argument.
    pub orderings: Vec<ast::Ordering>,
    /// True when the selection uses connection shape (`edges`/`pageInfo`).
    pub is_connection: bool,
    /// True when any sub-field carries join-style lookup arguments.
    pub has_lookups: bool,
    /// Fragment definitions captured at plan time for later projection.
    pub fragments: HashMap<String, FragmentDef>,
    /// Variable declarations, revalidated on each cached execution.
    pub variable_definitions: Vec<ast::VariableDefinition>,
}
39
40impl QueryPlan {
41 pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
42 validate_required_variables(&self.variable_definitions, provided_variables)
43 }
44
45 pub fn from_query(
46 query: &ast::Query,
47 fragments: &HashMap<String, FragmentDef>,
48 variables: &HashMap<String, ast::Value>,
49 ) -> Result<Vec<Self>> {
50 let root_fields = collect_fields(&query.selection_set, fragments, variables, None)?;
51
52 let mut plans = Vec::new();
53 for field in root_fields {
54 let collection = field.name.clone();
55 let filter = extract_filter_from_args(&field.arguments)?;
56 let compiled_filter = if let Some(ref f) = filter {
57 Some(compile_filter(f)?)
58 } else {
59 None
60 };
61
62 let sub_fields = collect_fields(
63 &field.selection_set,
64 fragments,
65 variables,
66 Some(&field.name),
67 )?;
68
69 let (limit, offset) = extract_pagination(&field.arguments);
70 let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
71 let orderings = extract_order_by(&field.arguments);
72 let is_connection = sub_fields
73 .iter()
74 .any(|f| f.name == "edges" || f.name == "pageInfo");
75
76 let has_lookups = sub_fields.iter().any(|f| {
77 f.arguments.iter().any(|arg| {
78 arg.name == "collection"
79 || arg.name == "localField"
80 || arg.name == "foreignField"
81 })
82 });
83
84 plans.push(QueryPlan {
85 collection,
86 filter,
87 compiled_filter,
88 projection: sub_fields,
89 limit: limit.or(first),
90 offset,
91 after,
92 orderings,
93 is_connection,
94 has_lookups,
95 fragments: fragments.clone(),
96 variable_definitions: query.variable_definitions.clone(),
97 });
98 }
99 Ok(plans)
100 }
101}
102
/// Top-level entry point: executes an AQL string against the database.
///
/// Fast path: documents whose first query has no `search` argument and
/// compiles to exactly one [`QueryPlan`] are planned once and cached, keyed
/// by a hash of the trimmed query text. Subsequent calls with the same text
/// skip parsing entirely. Everything else falls back to a full parse,
/// variable resolution and [`execute_document`].
pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Plan-cache key: hash of the whitespace-trimmed query text.
    let query_key = {
        let mut hasher = DefaultHasher::new();
        aql.trim().hash(&mut hasher);
        hasher.finish()
    };

    // Convert the caller's JSON variables into AST values once.
    let vars: HashMap<String, ast::Value> = options
        .variables
        .iter()
        .map(|(k, v)| (k.clone(), json_to_aql_value(v.clone())))
        .collect();

    // Cached-plan fast path.
    // NOTE(review): unlike the slow path below, declared variable defaults
    // are NOT merged into `vars` here (nor on the first planned execution) —
    // confirm that plan execution resolves defaults elsewhere.
    if let Some(plan) = db.plan_cache.get(&query_key) {
        plan.validate(&vars)?;
        return execute_plan(db, &plan, &vars, &options).await;
    }

    let mut doc = super::parse(aql)?;

    // First execution: try to compile and cache a plan for the first query
    // operation, but only when it has no `search` argument.
    if let Some(Operation::Query(query)) = doc
        .operations
        .iter()
        .find(|op| matches!(op, Operation::Query(_)))
    {
        let fragments: HashMap<String, FragmentDef> = doc
            .operations
            .iter()
            .filter_map(|op| {
                if let Operation::FragmentDefinition(f) = op {
                    Some((f.name.clone(), f.clone()))
                } else {
                    None
                }
            })
            .collect();

        let root_fields = collect_fields(&query.selection_set, &fragments, &vars, None)?;
        let has_search = root_fields.iter().any(|f| f.arguments.iter().any(|a| a.name == "search"));

        if !has_search {
            let plans = QueryPlan::from_query(query, &fragments, &vars)?;
            // Only single-plan documents are cacheable.
            if plans.len() == 1 {
                let plan = Arc::new(plans[0].clone());
                plan.validate(&vars)?;
                db.plan_cache.insert(query_key, Arc::clone(&plan));

                return execute_plan(db, &plan, &vars, &options).await;
            }
        }
    }

    // Slow path: first merge declared defaults for any missing variables…
    let mut vars = vars;
    for op in &doc.operations {
        if let Operation::Query(q) = op {
            for var_def in &q.variable_definitions {
                if !vars.contains_key(&var_def.name) {
                    if let Some(default) = &var_def.default_value {
                        vars.insert(var_def.name.clone(), default.clone());
                    }
                }
            }
        }
    }

    // …then substitute variables into the AST, translating validator error
    // codes into this module's error codes.
    super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
        let code = match e.code {
            super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
            super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
            _ => ErrorCode::QueryError,
        };
        AqlError::new(code, e.to_string())
    })?;

    execute_document(db, &doc, &options).await
}
196
/// Executes a cached [`QueryPlan`].
///
/// Pipeline: index-backed lookup (single equality on `_sid` or an indexed
/// field) or filtered scan → ordering → cursor (`after`) → offset →
/// connection shaping OR limit → groupBy / aggregate / projection / lookups.
/// The stage order is load-bearing; do not reorder.
async fn execute_plan(
    db: &Aurora,
    plan: &QueryPlan,
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let collection_name = &plan.collection;

    let effective_limit = plan.limit;

    // Index fast path: resolve one equality predicate through an index.
    let mut indexed_docs = None;
    if let Some(ref f) = plan.filter {
        if let Some((field, val)) =
            find_indexed_equality_filter_runtime(f, db, collection_name, variables)
        {
            // Skip secondary indexes that are still being built; `_sid`
            // lookups never depend on one.
            let index_ready = field == "_sid" || !db.is_index_building(collection_name, &field);
            if index_ready {
                let db_val = aql_value_to_db_value(&val, variables)?;
                let ids = if field == "_sid" {
                    // Primary-key lookup: the value itself is the id.
                    match &db_val {
                        Value::String(s) => vec![s.clone()],
                        Value::Uuid(u) => vec![u.to_string()],
                        _ => vec![],
                    }
                } else {
                    db.get_ids_from_index(collection_name, &field, &db_val)
                };

                let mut docs = Vec::with_capacity(ids.len());
                for id in ids {
                    if let Some(doc) = db.get_document(collection_name, &id)? {
                        // Re-check the full filter: the index only covered a
                        // single equality clause of it.
                        if let Some(ref cf) = plan.compiled_filter {
                            if matches_filter(&doc, cf, variables) {
                                docs.push(doc);
                            }
                        } else {
                            docs.push(doc);
                        }
                    }
                }

                indexed_docs = Some(docs);
            }
        }
    }

    let mut docs = if let Some(d) = indexed_docs {
        d
    } else {
        // Scan path: evaluate the compiled filter per document.
        let vars_arc = Arc::new(variables.clone());
        let cf_clone = plan.compiled_filter.clone();
        let filter_fn = move |doc: &Document| {
            cf_clone
                .as_ref()
                .map(|f| matches_filter(doc, f, &vars_arc))
                .unwrap_or(true)
        };

        // An early-exit scan limit is only safe when no cursor or sort needs
        // the complete result set.
        let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
            None
        } else {
            plan.limit.map(|l| {
                // Connections fetch one extra doc to decide hasNextPage.
                let base = if plan.is_connection { l + 1 } else { l };
                base + plan.offset
            })
        };
        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
    };

    if !plan.orderings.is_empty() {
        apply_ordering(&mut docs, &plan.orderings);
    }

    // Cursor pagination: drop everything up to and including the cursor doc.
    if let Some(ref cursor) = plan.after {
        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
            docs.drain(0..=pos);
        }
    }

    // Offset pagination; an offset past the end empties the result.
    if plan.offset > 0 {
        if plan.offset < docs.len() {
            docs.drain(0..plan.offset);
        } else {
            docs.clear();
        }
    }

    // Connection-shaped queries are finished by a dedicated helper.
    if plan.is_connection {
        return Ok(ExecutionResult::Query(execute_connection(
            docs,
            &plan.projection,
            effective_limit,
            &plan.fragments,
            variables,
            options,
        )));
    }

    if let Some(l) = effective_limit {
        docs.truncate(l);
    }

    if options.apply_projections && !plan.projection.is_empty() {
        let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
        let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");

        if let Some(f) = group_by_field {
            // groupBy(field: "…") — bucket documents by the field's string
            // form; missing values fall into a "null" bucket.
            let field_name = f
                .arguments
                .iter()
                .find(|a| a.name == "field")
                .and_then(|a| match &a.value {
                    ast::Value::String(s) => Some(s),
                    _ => None,
                });

            if let Some(group_field) = field_name {
                let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
                for d in docs {
                    let key = d
                        .data
                        .get(group_field)
                        .map(|v| match v {
                            Value::String(s) => s.clone(),
                            _ => v.to_string(),
                        })
                        .unwrap_or_else(|| "null".to_string());
                    groups.entry(key).or_default().push(d);
                }

                // Materialize one synthetic document per group, honoring the
                // groupBy sub-selections (key / count / nodes / aggregate).
                let mut group_docs = Vec::with_capacity(groups.len());
                for (key, group_items) in groups {
                    let mut data = HashMap::new();
                    for selection in &f.selection_set {
                        if let Selection::Field(sub_f) = selection {
                            let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
                            match sub_f.name.as_str() {
                                "key" => {
                                    data.insert(alias.clone(), Value::String(key.clone()));
                                }
                                "count" => {
                                    data.insert(
                                        alias.clone(),
                                        Value::Int(group_items.len() as i64),
                                    );
                                }
                                "nodes" => {
                                    // Project each member doc with the nodes
                                    // sub-selection (fragments expanded).
                                    let sub_fields = collect_fields(
                                        &sub_f.selection_set,
                                        &plan.fragments,
                                        variables,
                                        None,
                                    )
                                    .unwrap_or_default();

                                    let projected_nodes = group_items
                                        .iter()
                                        .map(|d| {
                                            let node_data =
                                                apply_projection(d.clone(), &sub_fields, options).data;
                                            Value::Object(node_data)
                                        })
                                        .collect();
                                    data.insert(alias.clone(), Value::Array(projected_nodes));
                                }
                                "aggregate" => {
                                    let agg_res =
                                        compute_aggregates(&group_items, &sub_f.selection_set);
                                    data.insert(alias.clone(), Value::Object(agg_res));
                                }
                                _ => {}
                            }
                        }
                    }
                    group_docs.push(Document {
                        _sid: format!("group:{}", key),
                        data,
                    });
                }
                docs = group_docs;
            }
        } else if let Some(agg) = agg_field {
            // Top-level aggregate selection over the result set.
            let agg_result = compute_aggregates(&docs, &agg.selection_set);
            let alias = agg.alias.as_ref().unwrap_or(&agg.name);

            if plan.projection.len() == 1 {
                // aggregate is the only selection: collapse to one doc.
                let mut data = HashMap::new();
                data.insert(alias.clone(), Value::Object(agg_result));
                docs = vec![Document {
                    _sid: "aggregate".to_string(),
                    data,
                }];
            } else {
                // aggregate alongside other fields: attach it to every doc.
                docs = docs
                    .into_iter()
                    .map(|mut d| {
                        d.data
                            .insert(alias.clone(), Value::Object(agg_result.clone()));
                        apply_projection(d, &plan.projection, options)
                    })
                    .collect();
            }
        } else if !plan.has_lookups {
            // Plain projection: no joins needed.
            docs = docs
                .into_iter()
                .map(|d| apply_projection(d, &plan.projection, options))
                .collect();
        } else {
            // Lookups resolve asynchronously against other collections.
            let mut projected = Vec::with_capacity(docs.len());
            for d in docs {
                let (proj_doc, _) =
                    apply_projection_with_lookups(db, d, &plan.projection, variables, options).await?;
                projected.push(proj_doc);
            }
            docs = projected;
        }
    }

    Ok(ExecutionResult::Query(QueryResult {
        collection: collection_name.clone(),
        documents: docs,
        total_count: None,
        deferred_fields: vec![],
        explain: None,
    }))
}
445
446fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
447 let mut results = HashMap::new();
448
449 for selection in selections {
450 if let Selection::Field(f) = selection {
451 let alias = f.alias.as_ref().unwrap_or(&f.name);
452 let value = match f.name.as_str() {
453 "count" => Value::Int(docs.len() as i64),
454 "sum" => {
455 let field =
456 f.arguments
457 .iter()
458 .find(|a| a.name == "field")
459 .and_then(|a| match &a.value {
460 ast::Value::String(s) => Some(s),
461 _ => None,
462 });
463
464 if let Some(field_name) = field {
465 let sum: f64 = docs
466 .iter()
467 .filter_map(|d| d.data.get(field_name))
468 .filter_map(|v| match v {
469 Value::Int(i) => Some(*i as f64),
470 Value::Float(f) => Some(*f),
471 _ => None,
472 })
473 .sum();
474 Value::Float(sum)
475 } else {
476 Value::Null
477 }
478 }
479 "avg" => {
480 let field =
481 f.arguments
482 .iter()
483 .find(|a| a.name == "field")
484 .and_then(|a| match &a.value {
485 ast::Value::String(s) => Some(s),
486 _ => None,
487 });
488
489 if let Some(field_name) = field
490 && !docs.is_empty()
491 {
492 let values: Vec<f64> = docs
493 .iter()
494 .filter_map(|d| d.data.get(field_name))
495 .filter_map(|v| match v {
496 Value::Int(i) => Some(*i as f64),
497 Value::Float(f) => Some(*f),
498 _ => None,
499 })
500 .collect();
501
502 if values.is_empty() {
503 Value::Null
504 } else {
505 let sum: f64 = values.iter().sum();
506 Value::Float(sum / values.len() as f64)
507 }
508 } else {
509 Value::Null
510 }
511 }
512 "min" => {
513 let field =
514 f.arguments
515 .iter()
516 .find(|a| a.name == "field")
517 .and_then(|a| match &a.value {
518 ast::Value::String(s) => Some(s),
519 _ => None,
520 });
521
522 if let Some(field_name) = field
523 && !docs.is_empty()
524 {
525 let min = docs
526 .iter()
527 .filter_map(|d| d.data.get(field_name))
528 .filter_map(|v| match v {
529 Value::Int(i) => Some(*i as f64),
530 Value::Float(f) => Some(*f),
531 _ => None,
532 })
533 .fold(f64::INFINITY, f64::min);
534
535 if min == f64::INFINITY {
536 Value::Null
537 } else {
538 Value::Float(min)
539 }
540 } else {
541 Value::Null
542 }
543 }
544 "max" => {
545 let field =
546 f.arguments
547 .iter()
548 .find(|a| a.name == "field")
549 .and_then(|a| match &a.value {
550 ast::Value::String(s) => Some(s),
551 _ => None,
552 });
553
554 if let Some(field_name) = field
555 && !docs.is_empty()
556 {
557 let max = docs
558 .iter()
559 .filter_map(|d| d.data.get(field_name))
560 .filter_map(|v| match v {
561 Value::Int(i) => Some(*i as f64),
562 Value::Float(f) => Some(*f),
563 _ => None,
564 })
565 .fold(f64::NEG_INFINITY, f64::max);
566
567 if max == f64::NEG_INFINITY {
568 Value::Null
569 } else {
570 Value::Float(max)
571 }
572 } else {
573 Value::Null
574 }
575 }
576 _ => Value::Null,
577 };
578 results.insert(alias.clone(), value);
579 }
580 }
581
582 results
583}
584
585fn find_indexed_equality_filter_runtime(
586 filter: &ast::Filter,
587 db: &Aurora,
588 collection: &str,
589 variables: &HashMap<String, ast::Value>,
590) -> Option<(String, ast::Value)> {
591 match filter {
592 ast::Filter::Eq(field, val) => {
593 if field == "_sid" || db.has_index(collection, field) {
594 let resolved = resolve_if_variable(val, variables);
595 return Some((field.clone(), resolved.clone()));
596 }
597 }
598 ast::Filter::And(filters) => {
599 for f in filters {
600 if let Some(res) =
601 find_indexed_equality_filter_runtime(f, db, collection, variables)
602 {
603 return Some(res);
604 }
605 }
606 }
607 _ => {}
608 }
609 None
610}
611
612fn collect_fields(
614 selection_set: &[Selection],
615 fragments: &HashMap<String, FragmentDef>,
616 variable_values: &HashMap<String, ast::Value>,
617 parent_type: Option<&str>,
618) -> Result<Vec<Field>> {
619 let mut fields = Vec::new();
620
621 for selection in selection_set {
622 match selection {
623 Selection::Field(field) => {
624 if should_include(&field.directives, variable_values)? {
625 fields.push(field.clone());
626 }
627 }
628 Selection::FragmentSpread(name) => {
629 if let Some(fragment) = fragments.get(name) {
630 let type_match = if let Some(parent) = parent_type {
631 parent == fragment.type_condition
632 } else {
633 true
634 };
635
636 if type_match {
637 let fragment_fields = collect_fields(
638 &fragment.selection_set,
639 fragments,
640 variable_values,
641 parent_type,
642 )?;
643 fields.extend(fragment_fields);
644 }
645 }
646 }
647 Selection::InlineFragment(inline) => {
648 let type_match = if let Some(parent) = parent_type {
649 parent == inline.type_condition
650 } else {
651 true
652 };
653
654 if type_match {
655 let inline_fields = collect_fields(
656 &inline.selection_set,
657 fragments,
658 variable_values,
659 parent_type,
660 )?;
661 fields.extend(inline_fields);
662 }
663 }
664 }
665 }
666
667 Ok(fields)
668}
669
670fn should_include(
672 directives: &[ast::Directive],
673 variables: &HashMap<String, ast::Value>,
674) -> Result<bool> {
675 for dir in directives {
676 if dir.name == "skip" {
677 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
678 let should_skip = resolve_boolean_arg(&arg.value, variables)?;
679 if should_skip {
680 return Ok(false);
681 }
682 }
683 } else if dir.name == "include" {
684 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
685 let should_include = resolve_boolean_arg(&arg.value, variables)?;
686 if !should_include {
687 return Ok(false);
688 }
689 }
690 }
691 }
692 Ok(true)
693}
694
695fn resolve_boolean_arg(
696 value: &ast::Value,
697 variables: &HashMap<String, ast::Value>,
698) -> Result<bool> {
699 match value {
700 ast::Value::Boolean(b) => Ok(*b),
701 ast::Value::Variable(name) => {
702 if let Some(val) = variables.get(name) {
703 match val {
704 ast::Value::Boolean(b) => Ok(*b),
705 _ => Err(AqlError::new(
706 ErrorCode::TypeError,
707 format!("Variable '{}' is not a boolean, got {:?}", name, val),
708 )),
709 }
710 } else {
711 Err(AqlError::new(
712 ErrorCode::UndefinedVariable,
713 format!("Variable '{}' is not defined", name),
714 ))
715 }
716 }
717 _ => Err(AqlError::new(
718 ErrorCode::TypeError,
719 format!("Expected boolean value, got {:?}", value),
720 )),
721 }
722}
723
724fn validate_required_variables(
726 variable_definitions: &[ast::VariableDefinition],
727 provided_variables: &HashMap<String, ast::Value>,
728) -> Result<()> {
729 for var_def in variable_definitions {
730 if var_def.var_type.is_required {
731 if !provided_variables.contains_key(&var_def.name) {
732 if var_def.default_value.is_none() {
733 return Err(AqlError::new(
734 ErrorCode::UndefinedVariable,
735 format!(
736 "Required variable '{}' (type: {}{}) is not provided",
737 var_def.name,
738 var_def.var_type.name,
739 if var_def.var_type.is_required {
740 "!"
741 } else {
742 ""
743 }
744 ),
745 ));
746 }
747 }
748 }
749 }
750 Ok(())
751}
752
/// The outcome of executing an AQL operation (or a batch of them).
#[derive(Debug)]
pub enum ExecutionResult {
    /// Result of a read query.
    Query(QueryResult),
    /// Result of a mutation.
    Mutation(MutationResult),
    /// Handle for an established subscription.
    Subscription(SubscriptionResult),
    /// Results of multiple operations executed in sequence.
    Batch(Vec<ExecutionResult>),
    /// Result of a schema (DDL) operation.
    Schema(SchemaResult),
    /// Result of applying a migration.
    Migration(MigrationResult),
}
763
/// Outcome of a schema operation on a collection.
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Name of the schema operation that was performed.
    pub operation: String,
    /// Collection the operation targeted.
    pub collection: String,
    /// Human-readable status of the operation.
    pub status: String,
}
770
/// Outcome of applying a migration.
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Migration version identifier.
    pub version: String,
    /// Number of migration steps that were applied.
    pub steps_applied: usize,
    /// Human-readable status of the migration.
    pub status: String,
}
777
/// Serializable description of planned operations and their estimated cost.
/// NOTE(review): not constructed in this module's visible code — presumably
/// produced by planner/explain machinery elsewhere; confirm.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionPlan {
    /// Ordered, human-readable operation descriptions.
    pub operations: Vec<String>,
    /// Planner's cost estimate for the whole plan.
    pub estimated_cost: f64,
}
783
/// Result of a single query against one collection.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Collection the query ran against.
    pub collection: String,
    /// The (possibly projected) matching documents.
    pub documents: Vec<Document>,
    /// Total match count, when computed (currently set to `None` by the
    /// executors in this module).
    pub total_count: Option<usize>,
    /// Names of fields whose resolution was deferred by projection.
    pub deferred_fields: Vec<String>,
    /// Populated only when the query carried an `@explain` directive.
    pub explain: Option<ExplainResult>,
}
795
/// Diagnostics attached to a query executed with `@explain`.
#[derive(Debug, Clone, Default)]
pub struct ExplainResult {
    /// Collection that was queried.
    pub collection: String,
    /// Number of documents in the final result set (as reported by
    /// `execute_query`, this is the result count, not a true scan count).
    pub docs_scanned: usize,
    /// Heuristic flag — see the note in `execute_query`.
    pub index_used: bool,
    /// Wall-clock execution time in milliseconds.
    pub elapsed_ms: u128,
}
804
/// Result of a mutation operation.
#[derive(Debug, Clone)]
pub struct MutationResult {
    /// Name of the mutation that ran.
    pub operation: String,
    /// Collection the mutation targeted.
    pub collection: String,
    /// Number of documents affected.
    pub affected_count: usize,
    /// Documents returned by the mutation's selection set, if any.
    pub returned_documents: Vec<Document>,
}
813
/// Handle returned for a subscription operation.
#[derive(Debug)]
pub struct SubscriptionResult {
    /// Identifier of this subscription instance.
    pub subscription_id: String,
    /// Collection whose changes are being watched.
    pub collection: String,
    /// Change-event listener, when a stream was established.
    pub stream: Option<crate::pubsub::ChangeListener>,
}
821
/// Options controlling how an AQL document is executed.
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    /// When true, bypass validation steps.
    pub skip_validation: bool,
    /// When true (the default), apply field projections to results.
    pub apply_projections: bool,
    /// When true, emit debug/audit information.
    pub debug_audit: bool,
    /// Caller-supplied variable bindings as JSON values.
    pub variables: HashMap<String, JsonValue>,
}
830
831impl Default for ExecutionOptions {
832 fn default() -> Self {
833 Self {
834 skip_validation: false,
835 apply_projections: true,
836 debug_audit: false,
837 variables: HashMap::new(),
838 }
839 }
840}
841
842impl ExecutionOptions {
843 pub fn new() -> Self {
844 Self::default()
845 }
846
847 pub fn with_variables(mut self, vars: HashMap<String, JsonValue>) -> Self {
848 self.variables = vars;
849 self
850 }
851
852 pub fn skip_validation(mut self) -> Self {
853 self.skip_validation = true;
854 self
855 }
856}
857
858fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
859 match v {
860 serde_json::Value::Null => ast::Value::Null,
861 serde_json::Value::Bool(b) => ast::Value::Boolean(b),
862 serde_json::Value::Number(n) => {
863 if let Some(i) = n.as_i64() {
864 ast::Value::Int(i)
865 } else if let Some(f) = n.as_f64() {
866 ast::Value::Float(f)
867 } else {
868 ast::Value::Null
869 }
870 }
871 serde_json::Value::String(s) => ast::Value::String(s),
872 serde_json::Value::Array(arr) => {
873 ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
874 }
875 serde_json::Value::Object(map) => ast::Value::Object(
876 map.into_iter()
877 .map(|(k, v)| (k, json_to_aql_value(v)))
878 .collect(),
879 ),
880 }
881}
882
883pub async fn execute_document(
885 db: &Aurora,
886 doc: &ast::Document,
887 options: &ExecutionOptions,
888) -> Result<ExecutionResult> {
889 if doc.operations.is_empty() {
890 return Err(AqlError::new(
891 ErrorCode::QueryError,
892 "No operations in document".to_string(),
893 ));
894 }
895
896 let vars: HashMap<String, ast::Value> = options
897 .variables
898 .iter()
899 .map(|(k, v)| (k.clone(), json_to_aql_value(v.clone())))
900 .collect();
901
902 let fragments: HashMap<String, FragmentDef> = doc
903 .operations
904 .iter()
905 .filter_map(|op| {
906 if let Operation::FragmentDefinition(frag) = op {
907 Some((frag.name.clone(), frag.clone()))
908 } else {
909 None
910 }
911 })
912 .collect();
913
914 let executable_ops: Vec<&Operation> = doc
915 .operations
916 .iter()
917 .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
918 .collect();
919
920 if executable_ops.is_empty() {
921 return Err(AqlError::new(
922 ErrorCode::QueryError,
923 "No executable operations in document".to_string(),
924 ));
925 }
926
927 if executable_ops.len() == 1 {
928 execute_operation(db, executable_ops[0], &vars, options, &fragments).await
929 } else {
930 let mut results = Vec::new();
931 for op in executable_ops {
932 results.push(execute_operation(db, op, &vars, options, &fragments).await?);
933 }
934 Ok(ExecutionResult::Batch(results))
935 }
936}
937
/// Dispatches a single parsed operation to its type-specific executor.
async fn execute_operation(
    db: &Aurora,
    op: &Operation,
    vars: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<ExecutionResult> {
    match op {
        Operation::Query(query) => execute_query(db, query, vars, options, fragments).await,
        Operation::Mutation(mutation) => {
            execute_mutation(db, mutation, vars, options, fragments).await
        }
        Operation::Subscription(sub) => execute_subscription(db, sub, vars, options).await,
        Operation::Schema(schema) => execute_schema(db, schema, options).await,
        Operation::Migration(migration) => execute_migration(db, migration, options).await,
        Operation::Introspection(intro) => execute_introspection(db, intro).await,
        Operation::Handler(handler) => execute_handler_registration(db, handler, options).await,
        // Any remaining operation kind (fragment definitions are filtered
        // out by execute_document before dispatch) yields an empty result.
        // NOTE(review): this catch-all will silently swallow future
        // Operation variants — consider matching them explicitly.
        _ => Ok(ExecutionResult::Query(QueryResult {
            collection: String::new(),
            documents: vec![],
            total_count: None,
            deferred_fields: vec![],
            explain: None,
        })),
    }
}
964
/// Executes a query operation: validates variables, runs each root field as
/// a collection query, and (with `@explain`) attaches timing diagnostics.
///
/// A single root field returns its result directly; multiple root fields
/// return a `Batch` of per-field results.
async fn execute_query(
    db: &Aurora,
    query: &ast::Query,
    vars: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<ExecutionResult> {
    validate_required_variables(&query.variable_definitions, vars)?;
    let has_explain = query.directives.iter().any(|d| d.name == "explain");
    let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;
    let mut results = Vec::new();
    for field in &root_fields {
        let sub_fields = collect_fields(&field.selection_set, fragments, vars, Some(&field.name))?;
        let start = std::time::Instant::now();
        let mut result =
            execute_collection_query(db, field, &sub_fields, vars, options, fragments).await?;
        if has_explain {
            let elapsed_ms = start.elapsed().as_millis();
            // NOTE(review): this is a heuristic, not a report from the
            // storage layer — "has a where argument and returned documents"
            // does not prove an index was actually consulted.
            let index_used =
                field.arguments.iter().any(|a| a.name == "where") && !result.documents.is_empty();
            result.explain = Some(ExplainResult {
                collection: result.collection.clone(),
                docs_scanned: result.documents.len(),
                index_used,
                elapsed_ms,
            });
        }
        results.push(result);
    }
    if results.len() == 1 {
        Ok(ExecutionResult::Query(results.remove(0)))
    } else {
        Ok(ExecutionResult::Batch(
            results.into_iter().map(ExecutionResult::Query).collect(),
        ))
    }
}
1002
/// Executes one root field as a query against its collection.
///
/// Search queries short-circuit to [`execute_search_query`]. Otherwise the
/// pipeline mirrors `execute_plan`: index lookup or scan → inline `validate`
/// filtering → ordering → cursor → connection shaping OR offset/limit →
/// projections (with or without lookups) → window functions → downsampling.
async fn execute_collection_query(
    db: &Aurora,
    field: &ast::Field,
    sub_fields: &[ast::Field],
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<QueryResult> {
    let collection_name = &field.name;

    // Full-text search takes a completely separate path.
    if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
        return execute_search_query(
            db,
            collection_name,
            search_arg,
            sub_fields,
            field,
            variables,
            options,
        )
        .await;
    }

    let filter = extract_filter_from_args(&field.arguments)?;
    let (limit, offset) = extract_pagination(&field.arguments);
    let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
    let compiled_filter = if let Some(ref f) = filter {
        Some(compile_filter(f)?)
    } else {
        None
    };
    let vars_arc = Arc::new(variables.clone());
    let filter_fn = move |doc: &Document| {
        compiled_filter
            .as_ref()
            .map(|f| matches_filter(doc, f, &vars_arc))
            .unwrap_or(true)
    };

    // Index fast path: a single equality predicate served via an index (or
    // the `_sid` primary key), skipped while the index is still building.
    let indexed_docs = if let Some(ref f) = filter {
        match find_indexed_equality_filter(f, db, collection_name) {
            Some((field_name, val))
                if field_name == "_sid"
                    || !db.is_index_building(collection_name, &field_name) =>
            {
                let db_val = aql_value_to_db_value(&val, variables)?;
                let ids = if field_name == "_sid" {
                    match &db_val {
                        Value::String(s) => vec![s.clone()],
                        Value::Uuid(u) => vec![u.to_string()],
                        _ => vec![],
                    }
                } else {
                    db.get_ids_from_index(collection_name, &field_name, &db_val)
                };

                let mut docs = Vec::with_capacity(ids.len());
                for id in ids {
                    if let Some(doc) = db.get_document(collection_name, &id)? {
                        // Re-check the full filter: the index only covered
                        // one equality clause.
                        if filter_fn(&doc) {
                            docs.push(doc);
                        }
                    }
                }
                Some(docs)
            }
            _ => None,
        }
    } else {
        None
    };

    let is_connection = sub_fields
        .iter()
        .any(|f| f.name == "edges" || f.name == "pageInfo");

    let orderings = extract_order_by(&field.arguments);

    let mut docs = if let Some(docs) = indexed_docs {
        docs
    } else {
        // Early-exit scan limit is only safe without cursor or ordering,
        // both of which need the complete result set.
        let scan_limit = if after.is_some() || !orderings.is_empty() {
            None
        } else {
            limit.or(first).map(|l| {
                // Connections fetch one extra doc to decide hasNextPage.
                let base = if is_connection { l + 1 } else { l };
                base + offset
            })
        };
        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
    };

    // Inline `validate:` argument filters documents by ad-hoc constraints.
    if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
        docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
    }

    if !orderings.is_empty() {
        apply_ordering(&mut docs, &orderings);
    }

    // Cursor pagination: drop up to and including the cursor document.
    if let Some(ref cursor) = after {
        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
            docs.drain(0..=pos);
        }
    }

    // Connection shaping bypasses the plain offset/limit handling below.
    if is_connection {
        return Ok(execute_connection(
            docs,
            sub_fields,
            limit.or(first),
            fragments,
            variables,
            options,
        ));
    }

    if offset > 0 {
        if offset < docs.len() {
            docs.drain(0..offset);
        } else {
            docs.clear();
        }
    }

    if let Some(l) = limit.or(first) {
        docs.truncate(l);
    }

    // Join-style arguments on any sub-field signal a lookup.
    // NOTE(review): unlike QueryPlan::from_query, this check omits
    // "foreignField" — confirm whether the two should agree.
    let has_lookups = sub_fields.iter().any(|f| {
        f.arguments
            .iter()
            .any(|a| a.name == "collection" || a.name == "localField")
    });

    let mut deferred_fields = Vec::new();

    if options.apply_projections && !sub_fields.is_empty() {
        if has_lookups {
            // Lookups resolve asynchronously against other collections.
            let mut projected = Vec::with_capacity(docs.len());
            for d in docs {
                let (proj_doc, deferred) =
                    apply_projection_with_lookups(db, d, sub_fields, variables, options).await?;
                projected.push(proj_doc);
                // Deferred field names are taken from the first doc only.
                if deferred_fields.is_empty() {
                    deferred_fields = deferred;
                }
            }
            docs = projected;
        } else {
            let mut all_deferred = Vec::new();
            docs = docs
                .into_iter()
                .map(|d| {
                    let (proj, deferred) = apply_projection_and_defer(d, sub_fields, options);
                    if all_deferred.is_empty() && !deferred.is_empty() {
                        all_deferred = deferred;
                    }
                    proj
                })
                .collect();
            deferred_fields = all_deferred;
        }
    }

    // windowFunc(field:, function:, windowSize:) sub-fields add rolling
    // computations over the (already projected) documents.
    for sf in sub_fields {
        if sf.name == "windowFunc" {
            let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
            let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
            let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
            let wsize = arg_i64(&sf.arguments, "windowSize").unwrap_or(3) as usize;
            apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
        }
    }

    // downsample(interval:, aggregation:) replaces the documents with
    // time-bucketed aggregates of the selected sub-fields.
    if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
        let interval =
            arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
        let aggregation =
            arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
        let ds_sub: Vec<String> =
            collect_fields(&ds_field.selection_set, fragments, variables, None)
                .unwrap_or_default()
                .iter()
                .map(|f| f.name.clone())
                .collect();
        docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
    }

    Ok(QueryResult {
        collection: collection_name.clone(),
        documents: docs,
        total_count: None,
        deferred_fields,
        explain: None,
    })
}
1215
1216async fn execute_search_query(
1218 db: &Aurora,
1219 collection: &str,
1220 search_arg: &ast::Argument,
1221 sub_fields: &[ast::Field],
1222 field: &ast::Field,
1223 variables: &HashMap<String, ast::Value>,
1224 options: &ExecutionOptions,
1225) -> Result<QueryResult> {
1226 let resolved_search_val = resolve_ast_deep(&search_arg.value, variables);
1228 let (query_str, search_fields, fuzzy) = extract_search_params(&resolved_search_val);
1229 let (limit, _) = extract_pagination(&field.arguments);
1230
1231 let mut builder = db.search(collection).query(&query_str);
1232 if fuzzy {
1233 builder = builder.fuzzy(1);
1234 }
1235 if let Some(l) = limit {
1236 builder = builder.limit(l);
1237 }
1238
1239 let mut docs = builder
1240 .collect_with_fields(if search_fields.is_empty() {
1241 None
1242 } else {
1243 Some(&search_fields)
1244 })
1245 .await?;
1246
1247 if options.apply_projections && !sub_fields.is_empty() {
1248 docs = docs
1249 .into_iter()
1250 .map(|d| {
1251 let (proj, _) = apply_projection_and_defer(d, sub_fields, options);
1252 proj
1253 })
1254 .collect();
1255 }
1256
1257 Ok(QueryResult {
1258 collection: collection.to_string(),
1259 documents: docs,
1260 total_count: None,
1261 deferred_fields: vec![],
1262 explain: None,
1263 })
1264}
1265
1266fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
1267 let mut query = String::new();
1268 let mut fields = Vec::new();
1269 let mut fuzzy = false;
1270 if let ast::Value::Object(m) = v {
1271 if let Some(ast::Value::String(q)) = m.get("query") {
1272 query = q.clone();
1273 }
1274 if let Some(ast::Value::Array(arr)) = m.get("fields") {
1275 for item in arr {
1276 if let ast::Value::String(s) = item {
1277 fields.push(s.clone());
1278 }
1279 }
1280 }
1281 if let Some(ast::Value::Boolean(b)) = m.get("fuzzy") {
1282 fuzzy = *b;
1283 }
1284 }
1285 (query, fields, fuzzy)
1286}
1287
1288fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
1289 if let ast::Value::Object(rules) = &validate_arg.value {
1290 for (field_name, constraints_val) in rules {
1291 if let ast::Value::Object(constraints) = constraints_val {
1292 if let Some(field_val) = doc.data.get(field_name) {
1293 for (constraint_name, constraint_val) in constraints {
1294 if !check_inline_constraint(field_val, constraint_name, constraint_val) {
1295 return false;
1296 }
1297 }
1298 }
1299 }
1300 }
1301 }
1302 true
1303}
1304
1305fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
1306 match constraint {
1307 "format" => {
1308 if let (Value::String(s), ast::Value::String(fmt)) = (value, constraint_val) {
1309 return match fmt.as_str() {
1310 "email" => {
1311 s.contains('@')
1312 && s.split('@')
1313 .nth(1)
1314 .map(|d| d.contains('.'))
1315 .unwrap_or(false)
1316 }
1317 "url" => s.starts_with("http://") || s.starts_with("https://"),
1318 "uuid" => uuid::Uuid::parse_str(s).is_ok(),
1319 _ => true,
1320 };
1321 }
1322 true
1323 }
1324 "min" => {
1325 let n = match value {
1326 Value::Int(i) => *i as f64,
1327 Value::Float(f) => *f,
1328 _ => return true,
1329 };
1330 let min = match constraint_val {
1331 ast::Value::Float(f) => *f,
1332 ast::Value::Int(i) => *i as f64,
1333 _ => return true,
1334 };
1335 n >= min
1336 }
1337 "max" => {
1338 let n = match value {
1339 Value::Int(i) => *i as f64,
1340 Value::Float(f) => *f,
1341 _ => return true,
1342 };
1343 let max = match constraint_val {
1344 ast::Value::Float(f) => *f,
1345 ast::Value::Int(i) => *i as f64,
1346 _ => return true,
1347 };
1348 n <= max
1349 }
1350 "minLength" => {
1351 if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1352 return s.len() >= *n as usize;
1353 }
1354 true
1355 }
1356 "maxLength" => {
1357 if let (Value::String(s), ast::Value::Int(n)) = (value, constraint_val) {
1358 return s.len() <= *n as usize;
1359 }
1360 true
1361 }
1362 "pattern" => {
1363 if let (Value::String(s), ast::Value::String(pat)) = (value, constraint_val) {
1364 if let Ok(re) = regex::Regex::new(pat) {
1365 return re.is_match(s);
1366 }
1367 }
1368 true
1369 }
1370 _ => true,
1371 }
1372}
1373
1374fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
1375 args.iter().find(|a| a.name == name).and_then(|a| {
1376 if let ast::Value::String(s) = &a.value {
1377 Some(s.clone())
1378 } else {
1379 None
1380 }
1381 })
1382}
1383
1384fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
1385 args.iter().find(|a| a.name == name).and_then(|a| {
1386 if let ast::Value::Int(i) = &a.value {
1387 Some(*i)
1388 } else {
1389 None
1390 }
1391 })
1392}
1393
1394fn apply_projection_and_defer(
1396 mut doc: Document,
1397 fields: &[ast::Field],
1398 _options: &ExecutionOptions,
1399) -> (Document, Vec<String>) {
1400 if fields.is_empty() {
1401 return (doc, vec![]);
1402 }
1403 let mut proj = HashMap::new();
1404 let mut deferred = Vec::new();
1405
1406 for f in fields {
1407 if f.directives.iter().any(|d| d.name == "defer") {
1409 deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
1410 continue;
1411 }
1412 if f.name == "__compute__" {
1414 let alias = f.alias.as_deref().unwrap_or("computed");
1415 if let Some(expr) = f.arguments.iter().find(|a| a.name == "expr") {
1416 if let ast::Value::String(template) = &expr.value {
1417 let result = eval_template(template, &doc.data);
1418 proj.insert(alias.to_string(), Value::String(result));
1419 }
1420 }
1421 continue;
1422 }
1423 if f.name == "id" {
1424 if let Some(v) = doc.data.get("id") {
1426 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1427 } else {
1428 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Null);
1430 }
1431 } else if f.name == "_sid" {
1432 proj.insert(
1433 f.alias.as_ref().unwrap_or(&f.name).clone(),
1434 Value::String(doc._sid.clone()),
1435 );
1436 } else if let Some(v) = doc.data.get(&f.name) {
1437 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
1438 }
1439 }
1440 doc.data = proj;
1441 (doc, deferred)
1442}
1443
1444fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
1446 let mut result = template.to_string();
1447 for (k, v) in data {
1448 let placeholder = format!("${}", k);
1449 if result.contains(&placeholder) {
1450 result = result.replace(&placeholder, &v.to_string());
1451 }
1452 }
1453 result
1454}
1455
1456fn apply_window_function(
1458 docs: &mut Vec<Document>,
1459 alias: &str,
1460 field: &str,
1461 function: &str,
1462 window: usize,
1463) {
1464 if docs.is_empty() || window == 0 {
1465 return;
1466 }
1467 let values: Vec<Option<f64>> = docs
1468 .iter()
1469 .map(|d| match d.data.get(field) {
1470 Some(Value::Int(i)) => Some(*i as f64),
1471 Some(Value::Float(f)) => Some(*f),
1472 _ => None,
1473 })
1474 .collect();
1475
1476 for (i, doc) in docs.iter_mut().enumerate() {
1477 let start = if i + 1 >= window { i + 1 - window } else { 0 };
1478 let window_vals: Vec<f64> = values[start..=i].iter().filter_map(|v| *v).collect();
1479 if window_vals.is_empty() {
1480 continue;
1481 }
1482 let result = match function {
1483 "rollingAvg" | "avg" => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
1484 "rollingSum" | "sum" => window_vals.iter().sum::<f64>(),
1485 "rollingMin" | "min" => window_vals.iter().cloned().fold(f64::INFINITY, f64::min),
1486 "rollingMax" | "max" => window_vals
1487 .iter()
1488 .cloned()
1489 .fold(f64::NEG_INFINITY, f64::max),
1490 _ => window_vals.iter().sum::<f64>() / window_vals.len() as f64,
1491 };
1492 doc.data.insert(alias.to_string(), Value::Float(result));
1493 }
1494}
1495
1496fn apply_downsample(
1498 docs: Vec<Document>,
1499 interval: &str,
1500 aggregation: &str,
1501 value_fields: &[String],
1502) -> Vec<Document> {
1503 let interval_secs: i64 = parse_interval(interval);
1504 if interval_secs <= 0 {
1505 return docs;
1506 }
1507
1508 let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
1510 std::collections::BTreeMap::new();
1511 let mut leftover = Vec::new();
1512
1513 for doc in docs {
1514 let ts = ["timestamp", "ts", "created_at", "time"]
1516 .iter()
1517 .find_map(|&k| doc.data.get(k))
1518 .and_then(|v| match v {
1519 Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
1520 .ok()
1521 .map(|dt| dt.timestamp()),
1522 Value::Int(i) => Some(*i),
1523 _ => None,
1524 });
1525
1526 if let Some(t) = ts {
1527 let bucket = (t / interval_secs) * interval_secs;
1528 buckets.entry(bucket).or_default().push(doc);
1529 } else {
1530 leftover.push(doc);
1531 }
1532 }
1533
1534 let mut result = Vec::new();
1535 for (bucket_ts, group) in buckets {
1536 let mut data = HashMap::new();
1537 data.insert(
1538 "timestamp".to_string(),
1539 Value::String(
1540 chrono::DateTime::from_timestamp(bucket_ts, 0)
1541 .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
1542 .unwrap_or_default(),
1543 ),
1544 );
1545 data.insert("count".to_string(), Value::Int(group.len() as i64));
1546
1547 for field in value_fields {
1548 if field == "timestamp" || field == "count" {
1549 continue;
1550 }
1551 let nums: Vec<f64> = group
1552 .iter()
1553 .filter_map(|d| match d.data.get(field) {
1554 Some(Value::Int(i)) => Some(*i as f64),
1555 Some(Value::Float(f)) => Some(*f),
1556 _ => None,
1557 })
1558 .collect();
1559 if nums.is_empty() {
1560 continue;
1561 }
1562 let agg = match aggregation {
1563 "sum" => nums.iter().sum::<f64>(),
1564 "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
1565 "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
1566 "count" => nums.len() as f64,
1567 _ => nums.iter().sum::<f64>() / nums.len() as f64, };
1569 data.insert(field.clone(), Value::Float(agg));
1570 }
1571
1572 result.push(Document {
1573 _sid: bucket_ts.to_string(),
1574 data,
1575 });
1576 }
1577
1578 result.extend(leftover);
1579 result
1580}
1581
/// Parse a human interval spec (`"30s"`, `"5m"`, `"2h"`, `"1d"`, or bare
/// seconds) into a number of seconds.
///
/// A malformed number with a recognised suffix yields 0 (callers treat
/// non-positive intervals as "no downsampling"), while a bare value that
/// fails to parse falls back to one hour.
fn parse_interval(s: &str) -> i64 {
    let spec = s.trim();
    let unit = [('s', 1), ('m', 60), ('h', 3600), ('d', 86400)]
        .iter()
        .copied()
        .find(|(suffix, _)| spec.ends_with(*suffix));

    match unit {
        Some((suffix, scale)) => spec
            .strip_suffix(suffix)
            .and_then(|num| num.parse::<i64>().ok())
            .map(|n| n * scale)
            .unwrap_or(0),
        None => spec.parse().unwrap_or(3600),
    }
}
1596
/// Register a persistent event handler: spawns a background task that
/// listens for change events and re-runs the handler's action as a
/// mutation operation for every matching event.
///
/// Returns immediately with a synthetic `__handler` document describing
/// the registration; the spawned task keeps running until the pub/sub
/// channel closes.
async fn execute_handler_registration(
    db: &Aurora,
    handler: &ast::HandlerDef,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    use crate::pubsub::events::ChangeType;

    // Collection the trigger is scoped to; "*" means listen to everything.
    let collection = match &handler.trigger {
        ast::HandlerTrigger::Insert { collection }
        | ast::HandlerTrigger::Update { collection }
        | ast::HandlerTrigger::Delete { collection } => {
            collection.as_deref().unwrap_or("*").to_string()
        }
        _ => "*".to_string(),
    };

    // Change type to filter on; `None` means react to every change type.
    let trigger_type = match &handler.trigger {
        ast::HandlerTrigger::Insert { .. } => Some(ChangeType::Insert),
        ast::HandlerTrigger::Update { .. } => Some(ChangeType::Update),
        ast::HandlerTrigger::Delete { .. } => Some(ChangeType::Delete),
        _ => None,
    };

    let mut listener = if collection == "*" {
        db.pubsub.listen_all()
    } else {
        db.pubsub.listen(collection.clone())
    };

    // Owned clones moved into the background task below.
    let db_clone = db.clone();
    let action = handler.action.clone();
    let handler_name = handler.name.clone();

    tokio::spawn(async move {
        loop {
            match listener.recv().await {
                Ok(event) => {
                    // Skip events whose change type doesn't match the trigger.
                    let matches = trigger_type
                        .as_ref()
                        .map(|t| &event.change_type == t)
                        .unwrap_or(true);
                    if !matches {
                        continue;
                    }

                    // Expose the event to the action as `_id` plus one
                    // `_<field>` variable per document field.
                    let mut vars = HashMap::new();
                    vars.insert("_id".to_string(), ast::Value::String(event._sid.clone()));
                    if let Some(doc) = &event.document {
                        for (k, v) in &doc.data {
                            vars.insert(format!("_{}", k), db_value_to_ast_value(v));
                        }
                    }

                    // Action failures are deliberately ignored: one bad
                    // event must not kill the listener loop.
                    let _ = execute_mutation_op(
                        &db_clone,
                        &action,
                        &vars,
                        &HashMap::new(),
                        &ExecutionOptions::default(),
                        &HashMap::new(),
                    )
                    .await;
                }
                Err(_) => {
                    eprintln!("[handler:{}] channel closed, stopping", handler_name);
                    break;
                }
            }
        }
    });

    eprintln!(
        "[handler] '{}' registered on '{}'",
        handler.name, collection
    );

    // Synthetic result document describing the registration.
    let mut data = HashMap::new();
    data.insert("name".to_string(), Value::String(handler.name.clone()));
    data.insert("collection".to_string(), Value::String(collection));
    data.insert(
        "status".to_string(),
        Value::String("registered".to_string()),
    );

    Ok(ExecutionResult::Query(QueryResult {
        collection: "__handler".to_string(),
        documents: vec![Document {
            _sid: handler.name.clone(),
            data,
        }],
        total_count: Some(1),
        deferred_fields: vec![],
        explain: None,
    }))
}
1699
1700fn db_value_to_ast_value(v: &Value) -> ast::Value {
1701 match v {
1702 Value::Null => ast::Value::Null,
1703 Value::Bool(b) => ast::Value::Boolean(*b),
1704 Value::Int(i) => ast::Value::Int(*i),
1705 Value::Float(f) => ast::Value::Float(*f),
1706 Value::String(s) => ast::Value::String(s.clone()),
1707 Value::Uuid(u) => ast::Value::String(u.to_string()),
1708 Value::DateTime(dt) => ast::Value::String(dt.to_rfc3339()),
1709 Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_ast_value).collect()),
1710 Value::Object(m) => ast::Value::Object(
1711 m.iter()
1712 .map(|(k, v)| (k.clone(), db_value_to_ast_value(v)))
1713 .collect(),
1714 ),
1715 }
1716}
1717
/// Execute a mutation document: validates variables, then runs every
/// operation, wrapping them in a transaction when one is not already
/// active on this task.
///
/// Aliased operations publish their first returned document (plus its id)
/// into a shared context so later operations in the same mutation can
/// reference earlier results. On any error inside the implicit
/// transaction, everything is rolled back. A single operation yields a
/// `Mutation` result; multiple operations yield a `Batch`.
async fn execute_mutation(
    db: &Aurora,
    mutation: &ast::Mutation,
    vars: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<ExecutionResult> {
    use crate::transaction::ACTIVE_TRANSACTION_ID;

    validate_required_variables(&mutation.variable_definitions, vars)?;

    // True when a caller already holds a live transaction in task-local
    // storage; in that case we must not open a nested one.
    let already_in_tx = ACTIVE_TRANSACTION_ID
        .try_with(|id| *id)
        .ok()
        .and_then(|id| db.transaction_manager.active_transactions.get(&id))
        .is_some();

    if already_in_tx {
        let mut results = Vec::new();
        let mut context = HashMap::new();
        for mut_op in &mutation.operations {
            let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
            // Publish the first returned document under the alias for
            // later operations in this mutation.
            if let Some(alias) = &mut_op.alias {
                if let Some(doc) = res.returned_documents.first() {
                    let mut m = serde_json::Map::new();
                    for (k, v) in &doc.data {
                        m.insert(k.clone(), aurora_value_to_json_value(v));
                    }
                    m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
                    context.insert(alias.clone(), JsonValue::Object(m));
                }
            }
            results.push(res);
        }
        return if results.len() == 1 {
            Ok(ExecutionResult::Mutation(results.remove(0)))
        } else {
            Ok(ExecutionResult::Batch(
                results.into_iter().map(ExecutionResult::Mutation).collect(),
            ))
        };
    }

    // No active transaction: open one and scope its id into task-local
    // storage so nested operations participate in it.
    let tx_id = db.begin_transaction().await;

    let exec_result = ACTIVE_TRANSACTION_ID
        .scope(tx_id, async {
            let mut results = Vec::new();
            let mut context = HashMap::new();
            for mut_op in &mutation.operations {
                let res =
                    execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
                // Same alias-publishing behaviour as the in-transaction path.
                if let Some(alias) = &mut_op.alias {
                    if let Some(doc) = res.returned_documents.first() {
                        let mut m = serde_json::Map::new();
                        for (k, v) in &doc.data {
                            m.insert(k.clone(), aurora_value_to_json_value(v));
                        }
                        m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
                        context.insert(alias.clone(), JsonValue::Object(m));
                    }
                }
                results.push(res);
            }
            Ok::<_, crate::error::AqlError>(results)
        })
        .await;

    // Commit on success; roll back on the first error. Rollback failures
    // are deliberately ignored — the original error is what gets reported.
    match exec_result {
        Ok(mut results) => {
            db.commit_transaction(tx_id).await?;
            if results.len() == 1 {
                Ok(ExecutionResult::Mutation(results.remove(0)))
            } else {
                Ok(ExecutionResult::Batch(
                    results.into_iter().map(ExecutionResult::Mutation).collect(),
                ))
            }
        }
        Err(e) => {
            let _ = db.rollback_transaction(tx_id).await;
            Err(e)
        }
    }
}
1809
/// Execute a single mutation operation against the database.
///
/// Returns a boxed future because the `Transaction` variant recurses into
/// this function. `context` carries documents published by earlier aliased
/// operations in the same mutation; `variables` are the query variables.
/// When the operation carries a selection set (and
/// `options.apply_projections` allows), returned documents are projected
/// down to the selected fields.
fn execute_mutation_op<'a>(
    db: &'a Aurora,
    mut_op: &'a ast::MutationOperation,
    variables: &'a HashMap<String, ast::Value>,
    context: &'a ExecutionContext,
    options: &'a ExecutionOptions,
    fragments: &'a HashMap<String, FragmentDef>,
) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
    use futures::future::FutureExt;
    async move {
        match &mut_op.operation {
            // Single-document insert.
            MutationOp::Insert { collection, data } => {
                let resolved = resolve_value(data, variables, context);
                let doc = db
                    .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
                    .await?;
                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
                    let fields = collect_fields(
                        &mut_op.selection_set,
                        fragments,
                        variables,
                        Some(collection),
                    )
                    .unwrap_or_default();
                    vec![apply_projection(doc, &fields, options)]
                } else {
                    vec![doc]
                };
                Ok(MutationResult {
                    operation: "insert".to_string(),
                    collection: collection.clone(),
                    affected_count: 1,
                    returned_documents: returned,
                })
            }
            // Filtered update: scans the collection, applies field
            // modifiers to every match, one write per document.
            MutationOp::Update {
                collection,
                filter,
                data,
            } => {
                let cf = if let Some(f) = filter {
                    Some(compile_filter(f)?)
                } else {
                    None
                };
                let update_data =
                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;

                let mut affected = 0;
                let mut returned = Vec::new();

                // Arc'd so the scan closure can share them cheaply.
                let vars_arc = Arc::new(variables.clone());
                let cf_arc = cf.map(Arc::new);

                // A missing filter matches everything.
                let matches = db.scan_and_filter(
                    collection,
                    |doc| {
                        if let Some(ref filter) = cf_arc {
                            matches_filter(doc, filter, &vars_arc)
                        } else {
                            true
                        }
                    },
                    None,
                )?;

                // Only collect projection fields (and return documents)
                // when the operation has a selection set.
                let fields = if !mut_op.selection_set.is_empty() {
                    Some(
                        collect_fields(
                            &mut_op.selection_set,
                            fragments,
                            variables,
                            Some(collection),
                        )
                        .unwrap_or_default(),
                    )
                } else {
                    None
                };

                for doc in matches {
                    let mut new_data = doc.data.clone();
                    // apply_field_modifier merges each update entry with
                    // the field's current value (modifier semantics live
                    // in that helper).
                    for (k, v) in &update_data {
                        let applied = apply_field_modifier(new_data.get(k), v);
                        new_data.insert(k.clone(), applied);
                    }

                    let updated_doc = db
                        .aql_update_document(collection, &doc._sid, new_data)
                        .await?;

                    affected += 1;
                    if let Some(ref f) = fields {
                        returned.push(apply_projection(updated_doc, f, options));
                    }
                }

                Ok(MutationResult {
                    operation: "update".to_string(),
                    collection: collection.clone(),
                    affected_count: affected,
                    returned_documents: returned,
                })
            }
            // Filtered delete; returns no documents, only a count.
            MutationOp::Delete { collection, filter } => {
                let cf = if let Some(f) = filter {
                    Some(compile_filter(f)?)
                } else {
                    None
                };

                let mut affected = 0;
                let vars_arc = Arc::new(variables.clone());
                let cf_arc = cf.map(Arc::new);

                // A missing filter matches (and deletes) everything.
                let matches = db.scan_and_filter(
                    collection,
                    |doc| {
                        if let Some(ref filter) = cf_arc {
                            matches_filter(doc, filter, &vars_arc)
                        } else {
                            true
                        }
                    },
                    None,
                )?;

                for doc in matches {
                    db.aql_delete_document(collection, &doc._sid).await?;
                    affected += 1;
                }

                Ok(MutationResult {
                    operation: "delete".to_string(),
                    collection: collection.clone(),
                    affected_count: affected,
                    returned_documents: vec![],
                })
            }
            // Batch insert; each item is resolved and inserted separately.
            MutationOp::InsertMany { collection, data } => {
                let mut affected = 0;
                let mut returned = Vec::new();
                for item in data {
                    let resolved = resolve_value(item, variables, context);
                    let doc = db
                        .aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
                        .await?;
                    affected += 1;
                    if !mut_op.selection_set.is_empty() && options.apply_projections {
                        let fields = collect_fields(
                            &mut_op.selection_set,
                            fragments,
                            variables,
                            Some(collection),
                        )
                        .unwrap_or_default();
                        returned.push(apply_projection(doc, &fields, options));
                    } else {
                        returned.push(doc);
                    }
                }
                Ok(MutationResult {
                    operation: "insertMany".to_string(),
                    collection: collection.clone(),
                    affected_count: affected,
                    returned_documents: returned,
                })
            }
            // Update the first filter match, or insert when none exists.
            MutationOp::Upsert {
                collection,
                filter,
                data,
            } => {
                let update_data =
                    aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
                let cf = if let Some(f) = filter {
                    Some(compile_filter(f)?)
                } else {
                    None
                };
                let vars_arc = Arc::new(variables.clone());
                let cf_arc = cf.map(Arc::new);
                // Limit 1: only the first match is updated.
                let matches = db.scan_and_filter(
                    collection,
                    |doc| {
                        if let Some(ref filter) = cf_arc {
                            matches_filter(doc, filter, &vars_arc)
                        } else {
                            true
                        }
                    },
                    Some(1),
                )?;
                let doc = if let Some(existing) = matches.into_iter().next() {
                    db.aql_update_document(collection, &existing._sid, update_data)
                        .await?
                } else {
                    db.aql_insert(collection, update_data).await?
                };
                Ok(MutationResult {
                    operation: "upsert".to_string(),
                    collection: collection.clone(),
                    affected_count: 1,
                    returned_documents: vec![doc],
                })
            }
            // Nested operation list; recurses into this function. The
            // actual transaction boundary is managed by execute_mutation.
            MutationOp::Transaction { operations } => {
                let mut all_returned = Vec::new();
                let mut total_affected = 0;
                for op in operations {
                    let res =
                        execute_mutation_op(db, op, variables, context, options, fragments).await?;
                    total_affected += res.affected_count;
                    all_returned.extend(res.returned_documents);
                }
                Ok(MutationResult {
                    operation: "transaction".to_string(),
                    collection: String::new(),
                    affected_count: total_affected,
                    returned_documents: all_returned,
                })
            }
            // Enqueue one background job per payload object.
            MutationOp::EnqueueJobs {
                job_type,
                payloads,
                priority,
                max_retries,
            } => {
                let workers = db.workers.as_ref().ok_or_else(|| {
                    AqlError::new(
                        ErrorCode::QueryError,
                        "Worker system not initialised".to_string(),
                    )
                })?;
                let job_priority = match priority {
                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
                    ast::JobPriority::High => crate::workers::JobPriority::High,
                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
                };
                let mut returned = Vec::new();
                for payload in payloads {
                    let resolved = resolve_value(payload, variables, context);
                    // Non-object payloads degrade to an empty field map.
                    let json_payload: std::collections::HashMap<String, serde_json::Value> =
                        if let ast::Value::Object(map) = &resolved {
                            map.iter()
                                .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
                                .collect()
                        } else {
                            std::collections::HashMap::new()
                        };
                    let mut job =
                        crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
                    for (k, v) in json_payload {
                        job = job.add_field(k, v);
                    }
                    if let Some(retries) = max_retries {
                        job = job.with_max_retries(*retries);
                    }
                    let job_id = workers.enqueue(job).await?;
                    // Synthetic document describing the queued job.
                    let mut doc = crate::types::Document::new();
                    doc._sid = job_id.clone();
                    doc.data.insert("job_id".to_string(), Value::String(job_id));
                    doc.data
                        .insert("job_type".to_string(), Value::String(job_type.clone()));
                    doc.data
                        .insert("status".to_string(), Value::String("pending".to_string()));
                    returned.push(doc);
                }
                let count = returned.len();
                Ok(MutationResult {
                    operation: "enqueueJobs".to_string(),
                    collection: "__jobs".to_string(),
                    affected_count: count,
                    returned_documents: returned,
                })
            }
            // Bulk load: inserts every item, optionally projecting results.
            MutationOp::Import { collection, data } => {
                let mut affected = 0;
                let mut returned = Vec::new();
                let fields = if !mut_op.selection_set.is_empty() {
                    Some(
                        collect_fields(
                            &mut_op.selection_set,
                            fragments,
                            variables,
                            Some(collection),
                        )
                        .unwrap_or_default(),
                    )
                } else {
                    None
                };
                for item in data {
                    let resolved = resolve_value(item, variables, context);
                    let map = aql_value_to_hashmap(&resolved, variables)?;
                    let doc = db.aql_insert(collection, map).await?;
                    affected += 1;
                    if let Some(ref f) = fields {
                        returned.push(apply_projection(doc, f, options));
                    }
                }
                Ok(MutationResult {
                    operation: "import".to_string(),
                    collection: collection.clone(),
                    affected_count: affected,
                    returned_documents: returned,
                })
            }
            // Dump the whole collection; `format` is currently unused here.
            MutationOp::Export {
                collection,
                format: _,
            } => {
                let docs = db.scan_and_filter(collection, |_| true, None)?;
                let fields = if !mut_op.selection_set.is_empty() {
                    Some(
                        collect_fields(
                            &mut_op.selection_set,
                            fragments,
                            variables,
                            Some(collection),
                        )
                        .unwrap_or_default(),
                    )
                } else {
                    None
                };
                let returned: Vec<Document> = if let Some(ref f) = fields {
                    docs.into_iter().map(|d| apply_projection(d, f, options)).collect()
                } else {
                    docs
                };
                let count = returned.len();
                Ok(MutationResult {
                    operation: "export".to_string(),
                    collection: collection.clone(),
                    affected_count: count,
                    returned_documents: returned,
                })
            }
            // Enqueue a single background job, with optional scheduling.
            MutationOp::EnqueueJob {
                job_type,
                payload,
                priority,
                scheduled_at,
                max_retries,
            } => {
                let workers = db.workers.as_ref().ok_or_else(|| {
                    AqlError::new(
                        ErrorCode::QueryError,
                        "Worker system not initialised".to_string(),
                    )
                })?;

                let job_priority = match priority {
                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
                    ast::JobPriority::High => crate::workers::JobPriority::High,
                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
                };

                let resolved = resolve_value(payload, variables, context);
                // Non-object payloads degrade to an empty field map.
                let json_payload: std::collections::HashMap<String, serde_json::Value> =
                    if let ast::Value::Object(map) = &resolved {
                        map.iter()
                            .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
                            .collect()
                    } else {
                        std::collections::HashMap::new()
                    };

                let mut job =
                    crate::workers::Job::new(job_type.clone()).with_priority(job_priority);

                for (k, v) in json_payload {
                    job = job.add_field(k, v);
                }

                if let Some(retries) = max_retries {
                    job = job.with_max_retries(*retries);
                }

                // An unparseable scheduled_at is silently ignored and the
                // job runs as soon as possible.
                if let Some(scheduled) = scheduled_at {
                    if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
                        job = job.scheduled_at(dt);
                    }
                }

                let job_id = workers.enqueue(job).await?;

                // Synthetic document describing the queued job.
                let mut doc = crate::types::Document::new();
                doc._sid = job_id.clone();
                doc.data
                    .insert("job_id".to_string(), crate::types::Value::String(job_id));
                doc.data.insert(
                    "job_type".to_string(),
                    crate::types::Value::String(job_type.clone()),
                );
                doc.data.insert(
                    "status".to_string(),
                    crate::types::Value::String("pending".to_string()),
                );

                Ok(MutationResult {
                    operation: "enqueueJob".to_string(),
                    collection: "__jobs".to_string(),
                    affected_count: 1,
                    returned_documents: vec![doc],
                })
            }
        }
    }
    .boxed()
}
2229
2230async fn execute_subscription(
2231 db: &Aurora,
2232 sub: &ast::Subscription,
2233 vars: &HashMap<String, ast::Value>,
2234 _options: &ExecutionOptions,
2235) -> Result<ExecutionResult> {
2236 let vars: HashMap<String, ast::Value> = vars.clone();
2237
2238 let selection = sub.selection_set.first().ok_or_else(|| {
2239 AqlError::new(
2240 ErrorCode::QueryError,
2241 "Subscription must have a selection".to_string(),
2242 )
2243 })?;
2244
2245 if let Selection::Field(f) = selection {
2246 let collection = f.name.clone();
2247 let filter = extract_filter_from_args(&f.arguments)?;
2248
2249 let mut listener = db.pubsub.listen(&collection);
2250
2251 if let Some(f) = filter {
2252 let event_filter = ast_filter_to_event_filter(&f, &vars)?;
2253 listener = listener.filter(event_filter);
2254 }
2255
2256 Ok(ExecutionResult::Subscription(SubscriptionResult {
2257 subscription_id: uuid::Uuid::new_v4().to_string(),
2258 collection,
2259 stream: Some(listener),
2260 }))
2261 } else {
2262 Err(AqlError::new(
2263 ErrorCode::QueryError,
2264 "Invalid subscription selection".to_string(),
2265 ))
2266 }
2267}
2268
2269fn ast_filter_to_event_filter(
2270 filter: &AqlFilter,
2271 vars: &HashMap<String, ast::Value>,
2272) -> Result<crate::pubsub::EventFilter> {
2273 use crate::pubsub::EventFilter;
2274
2275 match filter {
2276 AqlFilter::Eq(f, v) => Ok(EventFilter::FieldEquals(
2277 f.clone(),
2278 aql_value_to_db_value(v, vars)?,
2279 )),
2280 AqlFilter::Ne(f, v) => Ok(EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?)),
2281 AqlFilter::Gt(f, v) => Ok(EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?)),
2282 AqlFilter::Gte(f, v) => Ok(EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?)),
2283 AqlFilter::Lt(f, v) => Ok(EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?)),
2284 AqlFilter::Lte(f, v) => Ok(EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?)),
2285 AqlFilter::In(f, v) => Ok(EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?)),
2286 AqlFilter::NotIn(f, v) => Ok(EventFilter::NotIn(
2287 f.clone(),
2288 aql_value_to_db_value(v, vars)?,
2289 )),
2290 AqlFilter::Contains(f, v) => Ok(EventFilter::Contains(
2291 f.clone(),
2292 aql_value_to_db_value(v, vars)?,
2293 )),
2294 AqlFilter::ContainsAny(f, v) | AqlFilter::ContainsAll(f, v) => Ok(EventFilter::Contains(
2296 f.clone(),
2297 aql_value_to_db_value(v, vars)?,
2298 )),
2299 AqlFilter::StartsWith(f, v) => Ok(EventFilter::StartsWith(
2300 f.clone(),
2301 aql_value_to_db_value(v, vars)?,
2302 )),
2303 AqlFilter::EndsWith(f, v) => Ok(EventFilter::EndsWith(
2304 f.clone(),
2305 aql_value_to_db_value(v, vars)?,
2306 )),
2307 AqlFilter::Matches(f, v) => {
2308 let pattern = match aql_value_to_db_value(v, vars)? {
2309 crate::types::Value::String(s) => s,
2310 other => other.to_string(),
2311 };
2312 let re = regex::Regex::new(&pattern).map_err(|e| {
2313 crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
2314 })?;
2315 Ok(EventFilter::Matches(f.clone(), re))
2316 }
2317 AqlFilter::IsNull(f) => Ok(EventFilter::IsNull(f.clone())),
2318 AqlFilter::IsNotNull(f) => Ok(EventFilter::IsNotNull(f.clone())),
2319 AqlFilter::And(filters) => {
2320 let mut mapped = Vec::new();
2321 for f in filters {
2322 mapped.push(ast_filter_to_event_filter(f, vars)?);
2323 }
2324 Ok(EventFilter::And(mapped))
2325 }
2326 AqlFilter::Or(filters) => {
2327 let mut mapped = Vec::new();
2328 for f in filters {
2329 mapped.push(ast_filter_to_event_filter(f, vars)?);
2330 }
2331 Ok(EventFilter::Or(mapped))
2332 }
2333 AqlFilter::Not(f) => Ok(EventFilter::Not(Box::new(ast_filter_to_event_filter(
2334 f, vars,
2335 )?))),
2336 }
2337}
2338
2339async fn execute_introspection(
2340 db: &Aurora,
2341 intro: &ast::IntrospectionQuery,
2342) -> Result<ExecutionResult> {
2343 let names = db.list_collection_names();
2344 let want_fields = intro.fields.is_empty()
2345 || intro
2346 .fields
2347 .iter()
2348 .any(|f| f == "collections" || f == "fields");
2349
2350 let documents: Vec<Document> = names
2351 .iter()
2352 .filter_map(|name| {
2353 if name.starts_with('_') {
2355 return None;
2356 }
2357 let col = db.get_collection_definition(name).ok()?;
2358 let mut data = HashMap::new();
2359 data.insert("name".to_string(), Value::String(name.clone()));
2360
2361 if want_fields {
2362 let field_list: Vec<Value> = col
2363 .fields
2364 .iter()
2365 .map(|(fname, fdef)| {
2366 let mut fdata = HashMap::new();
2367 fdata.insert("name".to_string(), Value::String(fname.clone()));
2368 fdata.insert(
2369 "type".to_string(),
2370 Value::String(fdef.field_type.to_string()),
2371 );
2372 fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
2373 fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
2374 fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
2375 if !fdef.validations.is_empty() {
2376 let vcons: Vec<Value> = fdef
2377 .validations
2378 .iter()
2379 .map(|c| Value::String(format!("{:?}", c)))
2380 .collect();
2381 fdata.insert("validations".to_string(), Value::Array(vcons));
2382 }
2383 Value::Object(fdata)
2384 })
2385 .collect();
2386 data.insert("fields".to_string(), Value::Array(field_list));
2387 }
2388
2389 Some(Document {
2390 _sid: name.clone(),
2391 data,
2392 })
2393 })
2394 .collect();
2395
2396 let count = documents.len();
2397 Ok(ExecutionResult::Query(QueryResult {
2398 collection: "__schema".to_string(),
2399 documents,
2400 total_count: Some(count),
2401 deferred_fields: vec![],
2402 explain: None,
2403 }))
2404}
2405
/// Apply a sequence of schema operations (define / alter / drop
/// collection) in order.
///
/// `AlterCollection` actions also migrate existing documents where needed:
/// `AddField` with a default backfills documents missing the field, and
/// `RenameField` moves values and rewrites each stored record. The result
/// reports the last collection touched.
async fn execute_schema(
    db: &Aurora,
    schema: &ast::Schema,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    // Name of the last collection touched, reported in the result.
    let mut last_collection = String::new();

    for op in &schema.operations {
        match op {
            ast::SchemaOp::DefineCollection {
                name,
                fields,
                if_not_exists,
                ..
            } => {
                last_collection = name.clone();

                // "if not exists": silently skip when already defined.
                if *if_not_exists {
                    if db.get_collection_definition(name).is_ok() {
                        continue;
                    }
                }

                let mut field_defs = Vec::new();
                for field in fields {
                    field_defs.push((field.name.as_str(), build_field_def(field)));
                }
                db.new_collection(name, field_defs).await?;
            }
            ast::SchemaOp::AlterCollection { name, actions } => {
                last_collection = name.clone();
                for action in actions {
                    match action {
                        ast::AlterAction::AddField { field, default } => {
                            let def = build_field_def(field);
                            db.add_field_to_schema(name, field.name.clone(), def)
                                .await?;
                            // Backfill the default into documents that do
                            // not yet carry the new field.
                            if let Some(default_val) = default {
                                let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
                                let docs = db.get_all_collection(name).await?;
                                for doc in docs {
                                    if !doc.data.contains_key(&field.name) {
                                        db.update_document(
                                            name,
                                            &doc._sid,
                                            vec![(&field.name, db_val.clone())],
                                        )
                                        .await?;
                                    }
                                }
                            }
                        }
                        ast::AlterAction::DropField(field_name) => {
                            db.drop_field_from_schema(name, field_name.clone()).await?;
                        }
                        ast::AlterAction::RenameField { from, to } => {
                            db.rename_field_in_schema(name, from.clone(), to.clone())
                                .await?;
                            // Migrate existing documents: move the value to
                            // the new key and rewrite the stored record
                            // under its "<collection>:<sid>" storage key.
                            let docs = db.get_all_collection(name).await?;
                            for mut doc in docs {
                                if let Some(val) = doc.data.remove(from.as_str()) {
                                    doc.data.insert(to.clone(), val);
                                    let key = format!("{}:{}", name, doc._sid);
                                    db.put(key, serde_json::to_vec(&doc)?, None).await?;
                                }
                            }
                        }
                        ast::AlterAction::ModifyField(field) => {
                            db.modify_field_in_schema(
                                name,
                                field.name.clone(),
                                build_field_def(field),
                            )
                            .await?;
                        }
                    }
                }
            }
            ast::SchemaOp::DropCollection { name, .. } => {
                db.drop_collection_schema(name).await?;
                last_collection = name.clone();
            }
        }
    }

    Ok(ExecutionResult::Schema(SchemaResult {
        operation: "schema".to_string(),
        collection: last_collection,
        status: "done".to_string(),
    }))
}
2498
2499fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
2500 let scalar = match anno.name.to_lowercase().as_str() {
2501 "string" => ScalarType::String,
2502 "int" | "integer" => ScalarType::Int,
2503 "float" | "double" => ScalarType::Float,
2504 "bool" | "boolean" => ScalarType::Bool,
2505 "uuid" => ScalarType::Uuid,
2506 "object" => ScalarType::Object,
2507 "array" => ScalarType::Array,
2508 _ => ScalarType::Any,
2509 };
2510
2511 if anno.is_array {
2512 FieldType::Array(scalar)
2513 } else {
2514 match scalar {
2515 ScalarType::Object => FieldType::Object,
2516 ScalarType::Any => FieldType::Any,
2517 _ => FieldType::Scalar(scalar),
2518 }
2519 }
2520}
2521
2522fn parse_validate_directive(
2524 directive: &ast::Directive,
2525) -> Vec<crate::types::FieldValidationConstraint> {
2526 use crate::types::FieldValidationConstraint as FVC;
2527 let mut constraints = Vec::new();
2528 for arg in &directive.arguments {
2529 match arg.name.as_str() {
2530 "format" => {
2531 if let ast::Value::String(s) = &arg.value {
2532 constraints.push(FVC::Format(s.clone()));
2533 }
2534 }
2535 "min" => {
2536 let n = match &arg.value {
2537 ast::Value::Float(f) => Some(*f),
2538 ast::Value::Int(i) => Some(*i as f64),
2539 _ => None,
2540 };
2541 if let Some(n) = n {
2542 constraints.push(FVC::Min(n));
2543 }
2544 }
2545 "max" => {
2546 let n = match &arg.value {
2547 ast::Value::Float(f) => Some(*f),
2548 ast::Value::Int(i) => Some(*i as f64),
2549 _ => None,
2550 };
2551 if let Some(n) = n {
2552 constraints.push(FVC::Max(n));
2553 }
2554 }
2555 "minLength" => {
2556 if let ast::Value::Int(i) = &arg.value {
2557 constraints.push(FVC::MinLength(*i));
2558 }
2559 }
2560 "maxLength" => {
2561 if let ast::Value::Int(i) = &arg.value {
2562 constraints.push(FVC::MaxLength(*i));
2563 }
2564 }
2565 "pattern" => {
2566 if let ast::Value::String(s) = &arg.value {
2567 constraints.push(FVC::Pattern(s.clone()));
2568 }
2569 }
2570 _ => {}
2571 }
2572 }
2573 constraints
2574}
2575
2576fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
2578 let field_type = map_ast_type(&field.field_type);
2579 let mut indexed = false;
2580 let mut unique = false;
2581 let mut validations = Vec::new();
2582 for directive in &field.directives {
2583 match directive.name.as_str() {
2584 "indexed" | "index" => indexed = true,
2585 "unique" => {
2586 unique = true;
2587 indexed = true;
2588 }
2589 "primary" => {
2590 indexed = true;
2591 unique = true;
2592 }
2593 "validate" => validations.extend(parse_validate_directive(directive)),
2594 _ => {}
2595 }
2596 }
2597 FieldDefinition {
2598 field_type,
2599 unique,
2600 indexed,
2601 nullable: !field.field_type.is_required,
2602 validations,
2603 }
2604}
2605
2606async fn execute_migration(
2607 db: &Aurora,
2608 migration: &ast::Migration,
2609 _options: &ExecutionOptions,
2610) -> Result<ExecutionResult> {
2611 let mut steps_applied = 0;
2612 let mut last_version = String::new();
2613
2614 for step in &migration.steps {
2615 last_version = step.version.clone();
2616
2617 if db.is_migration_applied(&step.version).await? {
2618 eprintln!(
2619 "[migration] version '{}' already applied — skipping",
2620 step.version
2621 );
2622 continue;
2623 }
2624
2625 eprintln!("[migration] applying version '{}'", step.version);
2626
2627 for action in &step.actions {
2628 match action {
2629 ast::MigrationAction::Schema(schema_op) => {
2630 execute_single_schema_op(db, schema_op).await?;
2631 }
2632 ast::MigrationAction::DataMigration(dm) => {
2633 execute_data_migration(db, dm).await?;
2634 }
2635 }
2636 }
2637
2638 db.mark_migration_applied(&step.version).await?;
2639 steps_applied += 1;
2640 eprintln!("[migration] version '{}' applied", step.version);
2641 }
2642
2643 let status = if steps_applied > 0 {
2644 "applied".to_string()
2645 } else {
2646 "skipped".to_string()
2647 };
2648
2649 Ok(ExecutionResult::Migration(MigrationResult {
2650 version: last_version,
2651 steps_applied,
2652 status,
2653 }))
2654}
2655
/// Applies a single schema operation, honouring the `if not exists` /
/// `if exists` guards.
///
/// Shared by migrations (and usable by `schema { ... }` execution) so both
/// paths have identical semantics.
async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
    match op {
        ast::SchemaOp::DefineCollection {
            name,
            fields,
            if_not_exists,
            ..
        } => {
            // `define collection if not exists` is a no-op when the
            // collection is already defined.
            if *if_not_exists && db.get_collection_definition(name).is_ok() {
                return Ok(());
            }
            let field_defs: Vec<(&str, FieldDefinition)> = fields
                .iter()
                .map(|f| (f.name.as_str(), build_field_def(f)))
                .collect();
            db.new_collection(name, field_defs).await?;
        }
        ast::SchemaOp::AlterCollection { name, actions } => {
            for action in actions {
                match action {
                    ast::AlterAction::AddField { field, default } => {
                        db.add_field_to_schema(name, field.name.clone(), build_field_def(field))
                            .await?;
                        // Backfill: existing documents missing the new field
                        // receive the default value, when one was provided.
                        if let Some(default_val) = default {
                            let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
                            let docs = db.get_all_collection(name).await?;
                            for doc in docs {
                                if !doc.data.contains_key(&field.name) {
                                    db.update_document(
                                        name,
                                        &doc._sid,
                                        vec![(&field.name, db_val.clone())],
                                    )
                                    .await?;
                                }
                            }
                        }
                    }
                    ast::AlterAction::DropField(field_name) => {
                        db.drop_field_from_schema(name, field_name.clone()).await?;
                    }
                    ast::AlterAction::RenameField { from, to } => {
                        db.rename_field_in_schema(name, from.clone(), to.clone())
                            .await?;
                        // Rewrite each stored document so its value moves to
                        // the new key; documents without the old key are left
                        // untouched.
                        let docs = db.get_all_collection(name).await?;
                        for mut doc in docs {
                            if let Some(val) = doc.data.remove(from.as_str()) {
                                doc.data.insert(to.clone(), val);
                                let key = format!("{}:{}", name, doc._sid);
                                db.put(key, serde_json::to_vec(&doc)?, None).await?;
                            }
                        }
                    }
                    ast::AlterAction::ModifyField(field) => {
                        db.modify_field_in_schema(name, field.name.clone(), build_field_def(field))
                            .await?;
                    }
                }
            }
        }
        ast::SchemaOp::DropCollection { name, if_exists } => {
            // `drop collection if exists` is a no-op on a missing collection.
            if *if_exists && db.get_collection_definition(name).is_err() {
                return Ok(());
            }
            db.drop_collection_schema(name).await?;
        }
    }
    Ok(())
}
2727
2728async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
2730 let docs = db.get_all_collection(&dm.collection).await?;
2731
2732 for doc in docs {
2733 for transform in &dm.transforms {
2734 let matches = match &transform.filter {
2735 Some(filter) => {
2736 let compiled = compile_filter(filter)?;
2737 matches_filter(&doc, &compiled, &HashMap::new())
2738 }
2739 None => true,
2740 };
2741
2742 if matches {
2743 let new_value = eval_migration_expr(&transform.expression, &doc);
2744 let mut updates = HashMap::new();
2745 updates.insert(transform.field.clone(), new_value);
2746 db.aql_update_document(&dm.collection, &doc._sid, updates)
2747 .await?;
2748 }
2749 }
2750 }
2751 Ok(())
2752}
2753
2754fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
2764 let expr = expr.trim();
2765
2766 if expr.starts_with('"') && expr.ends_with('"') && expr.len() >= 2 {
2767 return Value::String(expr[1..expr.len() - 1].to_string());
2768 }
2769 if expr == "true" {
2770 return Value::Bool(true);
2771 }
2772 if expr == "false" {
2773 return Value::Bool(false);
2774 }
2775 if expr == "null" {
2776 return Value::Null;
2777 }
2778 if let Ok(n) = expr.parse::<i64>() {
2779 return Value::Int(n);
2780 }
2781 if let Ok(f) = expr.parse::<f64>() {
2782 return Value::Float(f);
2783 }
2784 if let Some(v) = doc.data.get(expr) {
2785 return v.clone();
2786 }
2787
2788 Value::Null
2789}
2790
2791fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
2792 for a in args {
2793 if a.name == "where" || a.name == "filter" {
2794 return Ok(Some(value_to_filter(&a.value)?));
2795 }
2796 }
2797 Ok(None)
2798}
2799
2800fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
2801 let mut orderings = Vec::new();
2802 for a in args {
2803 if a.name == "orderBy" {
2804 match &a.value {
2805 ast::Value::String(f) => orderings.push(ast::Ordering {
2806 field: f.clone(),
2807 direction: ast::SortDirection::Asc,
2808 }),
2809 ast::Value::Object(obj) => {
2810 if let (Some(ast::Value::String(field_name)), Some(dir_val)) =
2812 (obj.get("field"), obj.get("direction"))
2813 {
2814 let direction = match dir_val {
2815 ast::Value::Enum(s) | ast::Value::String(s) => {
2816 if s.to_uppercase() == "DESC" {
2817 ast::SortDirection::Desc
2818 } else {
2819 ast::SortDirection::Asc
2820 }
2821 }
2822 _ => ast::SortDirection::Asc,
2823 };
2824 orderings.push(ast::Ordering {
2825 field: field_name.clone(),
2826 direction,
2827 });
2828 } else {
2829 for (field, dir_val) in obj {
2831 let direction = match dir_val {
2832 ast::Value::Enum(s) | ast::Value::String(s) => {
2833 if s.to_uppercase() == "DESC" {
2834 ast::SortDirection::Desc
2835 } else {
2836 ast::SortDirection::Asc
2837 }
2838 }
2839 _ => ast::SortDirection::Asc,
2840 };
2841 orderings.push(ast::Ordering {
2842 field: field.clone(),
2843 direction,
2844 });
2845 }
2846 }
2847 }
2848 _ => {}
2849 }
2850 }
2851 }
2852 orderings
2853}
2854
2855fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
2856 docs.sort_by(|a, b| {
2857 for o in orderings {
2858 let cmp = compare_values(a.data.get(&o.field), b.data.get(&o.field));
2859 if cmp != std::cmp::Ordering::Equal {
2860 return match o.direction {
2861 ast::SortDirection::Asc => cmp,
2862 ast::SortDirection::Desc => cmp.reverse(),
2863 };
2864 }
2865 }
2866 std::cmp::Ordering::Equal
2867 });
2868}
2869
2870fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
2871 match (a, b) {
2872 (None, None) => std::cmp::Ordering::Equal,
2873 (None, Some(_)) => std::cmp::Ordering::Less,
2874 (Some(_), None) => std::cmp::Ordering::Greater,
2875 (Some(av), Some(bv)) => av.partial_cmp(bv).unwrap_or(std::cmp::Ordering::Equal),
2876 }
2877}
2878
2879pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
2880 let (mut limit, mut offset) = (None, 0);
2881 for a in args {
2882 match a.name.as_str() {
2883 "limit" | "first" => {
2884 if let ast::Value::Int(n) = a.value {
2885 limit = Some(n as usize);
2886 }
2887 }
2888 "offset" | "skip" => {
2889 if let ast::Value::Int(n) = a.value {
2890 offset = n as usize;
2891 }
2892 }
2893 _ => {}
2894 }
2895 }
2896 (limit, offset)
2897}
2898
2899fn extract_cursor_pagination(
2900 args: &[ast::Argument],
2901) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
2902 let (mut first, mut after, mut last, mut before) = (None, None, None, None);
2903 for a in args {
2904 match a.name.as_str() {
2905 "first" => {
2906 if let ast::Value::Int(n) = a.value {
2907 first = Some(n as usize);
2908 }
2909 }
2910 "after" => {
2911 if let ast::Value::String(s) = &a.value {
2912 after = Some(s.clone());
2913 }
2914 }
2915 "last" => {
2916 if let ast::Value::Int(n) = a.value {
2917 last = Some(n as usize);
2918 }
2919 }
2920 "before" => {
2921 if let ast::Value::String(s) = &a.value {
2922 before = Some(s.clone());
2923 }
2924 }
2925 _ => {}
2926 }
2927 }
2928 (first, after, last, before)
2929}
2930
2931fn execute_connection(
2932 mut docs: Vec<Document>,
2933 sub_fields: &[ast::Field],
2934 limit: Option<usize>,
2935 fragments: &HashMap<String, FragmentDef>,
2936 variables: &HashMap<String, ast::Value>,
2937 options: &ExecutionOptions,
2938) -> QueryResult {
2939 let has_next_page = if let Some(l) = limit {
2940 docs.len() > l
2941 } else {
2942 false
2943 };
2944
2945 if has_next_page {
2946 docs.truncate(limit.unwrap());
2947 }
2948
2949 let mut edges = Vec::with_capacity(docs.len());
2950 let mut end_cursor = String::new();
2951
2952 let node_fields = if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
2954 let edges_sub_fields =
2955 collect_fields(&edges_field.selection_set, fragments, variables, None)
2956 .unwrap_or_default();
2957 if let Some(node_field) = edges_sub_fields.into_iter().find(|f| f.name == "node") {
2958 collect_fields(&node_field.selection_set, fragments, variables, None)
2959 .unwrap_or_default()
2960 } else {
2961 Vec::new()
2962 }
2963 } else {
2964 Vec::new()
2965 };
2966
2967 for doc in &docs {
2968 let cursor = doc._sid.clone();
2969 end_cursor = cursor.clone();
2970
2971 let mut edge_data = HashMap::new();
2972 edge_data.insert("cursor".to_string(), Value::String(cursor));
2973
2974 let node_doc = if node_fields.is_empty() {
2975 doc.clone()
2976 } else {
2977 apply_projection(doc.clone(), &node_fields, options)
2978 };
2979
2980 edge_data.insert("node".to_string(), Value::Object(node_doc.data));
2981 edges.push(Value::Object(edge_data));
2982 }
2983
2984 let mut page_info = HashMap::new();
2985 page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
2986 page_info.insert("endCursor".to_string(), Value::String(end_cursor));
2987
2988 let mut conn_data = HashMap::new();
2989 conn_data.insert("edges".to_string(), Value::Array(edges));
2990 conn_data.insert("pageInfo".to_string(), Value::Object(page_info));
2991
2992 QueryResult {
2993 collection: String::new(),
2994 documents: vec![Document {
2995 _sid: "connection".to_string(),
2996 data: conn_data,
2997 }],
2998 total_count: None,
2999 deferred_fields: vec![],
3000 explain: None,
3001 }
3002}
3003
3004pub fn matches_filter(
3005 doc: &Document,
3006 filter: &CompiledFilter,
3007 vars: &HashMap<String, ast::Value>,
3008) -> bool {
3009 match filter {
3010 CompiledFilter::Eq(f, v) => {
3011 if let Some(dv) = doc.data.get(f) {
3012 values_equal(dv, v, vars)
3013 } else if f == "_sid" {
3014 values_equal(&Value::String(doc._sid.clone()), v, vars)
3015 } else {
3016 false
3017 }
3018 }
3019 CompiledFilter::Ne(f, v) => {
3020 if let Some(dv) = doc.data.get(f) {
3021 !values_equal(dv, v, vars)
3022 } else if f == "_sid" {
3023 !values_equal(&Value::String(doc._sid.clone()), v, vars)
3024 } else {
3025 true
3026 }
3027 }
3028 CompiledFilter::Gt(f, v) => {
3029 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3030 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3031 });
3032 dv_opt.map_or(false, |dv| {
3033 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3034 return dv > bv;
3035 }
3036 false
3037 })
3038 }
3039 CompiledFilter::Gte(f, v) => {
3040 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3041 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3042 });
3043 dv_opt.map_or(false, |dv| {
3044 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3045 return dv >= bv;
3046 }
3047 false
3048 })
3049 }
3050 CompiledFilter::Lt(f, v) => {
3051 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3052 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3053 });
3054 dv_opt.map_or(false, |dv| {
3055 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3056 return dv < bv;
3057 }
3058 false
3059 })
3060 }
3061 CompiledFilter::Lte(f, v) => {
3062 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3063 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3064 });
3065 dv_opt.map_or(false, |dv| {
3066 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3067 return dv <= bv;
3068 }
3069 false
3070 })
3071 }
3072 CompiledFilter::In(f, v) => {
3073 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3074 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3075 });
3076 dv_opt.map_or(false, |dv| {
3077 if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3078 return arr.contains(&dv);
3079 }
3080 false
3081 })
3082 }
3083 CompiledFilter::NotIn(f, v) => {
3084 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3085 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3086 });
3087 dv_opt.map_or(true, |dv| {
3088 if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
3089 return !arr.contains(&dv);
3090 }
3091 true
3092 })
3093 }
3094 CompiledFilter::Contains(f, v) => {
3095 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3096 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3097 });
3098 if let Some(dv) = dv_opt {
3099 match (dv, resolve_if_variable(v, vars)) {
3100 (Value::String(s), ast::Value::String(sub)) => s.contains(&*sub),
3101 (Value::Array(arr), _) => {
3102 if let Ok(bv) = aql_value_to_db_value(v, vars) {
3103 return arr.contains(&bv);
3104 }
3105 false
3106 }
3107 _ => false,
3108 }
3109 } else {
3110 false
3111 }
3112 }
3113 CompiledFilter::ContainsAny(f, v) => {
3115 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3116 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3117 });
3118 if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3119 (dv_opt, aql_value_to_db_value(v, vars))
3120 {
3121 check_arr.iter().any(|item| field_arr.contains(item))
3122 } else {
3123 false
3124 }
3125 }
3126 CompiledFilter::ContainsAll(f, v) => {
3128 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3129 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3130 });
3131 if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
3132 (dv_opt, aql_value_to_db_value(v, vars))
3133 {
3134 check_arr.iter().all(|item| field_arr.contains(item))
3135 } else {
3136 false
3137 }
3138 }
3139 CompiledFilter::StartsWith(f, v) => {
3140 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3141 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3142 });
3143 if let (Some(Value::String(s)), ast::Value::String(pre)) =
3144 (dv_opt, resolve_if_variable(v, vars))
3145 {
3146 s.starts_with(&*pre)
3147 } else {
3148 false
3149 }
3150 }
3151 CompiledFilter::EndsWith(f, v) => {
3152 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3153 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3154 });
3155 if let (Some(Value::String(s)), ast::Value::String(suf)) =
3156 (dv_opt, resolve_if_variable(v, vars))
3157 {
3158 s.ends_with(&*suf)
3159 } else {
3160 false
3161 }
3162 }
3163 CompiledFilter::Matches(f, re) => {
3164 let dv_opt = doc.data.get(f).cloned().or_else(|| {
3165 if f == "_sid" { Some(Value::String(doc._sid.clone())) } else { None }
3166 });
3167 if let Some(Value::String(s)) = dv_opt {
3168 re.is_match(&s)
3169 } else {
3170 false
3171 }
3172 }
3173 CompiledFilter::IsNull(f) => {
3174 if f == "_sid" {
3175 false } else {
3177 doc.data.get(f).map_or(true, |v| matches!(v, Value::Null))
3178 }
3179 }
3180 CompiledFilter::IsNotNull(f) => {
3181 if f == "_sid" {
3182 true } else {
3184 doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
3185 }
3186 }
3187 CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
3188 CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
3189 CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
3190 }
3191}
3192
3193fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
3198 if let Value::Object(modifier) = new_val {
3199 if let Some(delta) = modifier.get("increment") {
3200 match (existing, delta) {
3201 (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
3202 (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
3203 (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
3204 _ => {}
3205 }
3206 }
3207 if let Some(delta) = modifier.get("decrement") {
3208 match (existing, delta) {
3209 (Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
3210 (Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
3211 (Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
3212 _ => {}
3213 }
3214 }
3215 if let Some(item) = modifier.get("push") {
3216 if let Some(Value::Array(mut arr)) = existing.cloned() {
3217 arr.push(item.clone());
3218 return Value::Array(arr);
3219 }
3220 return Value::Array(vec![item.clone()]);
3221 }
3222 if let Some(item) = modifier.get("pull") {
3223 if let Some(Value::Array(arr)) = existing {
3224 let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
3225 return Value::Array(filtered);
3226 }
3227 return Value::Array(vec![]);
3228 }
3229 if let Some(item) = modifier.get("addToSet") {
3230 if let Some(Value::Array(mut arr)) = existing.cloned() {
3231 if !arr.contains(item) {
3232 arr.push(item.clone());
3233 }
3234 return Value::Array(arr);
3235 }
3236 return Value::Array(vec![item.clone()]);
3237 }
3238 }
3239 new_val.clone()
3240}
3241
3242fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
3243 if let Ok(bv) = aql_value_to_db_value(av, vars) {
3244 return dv == &bv;
3245 }
3246 false
3247}
3248
3249fn resolve_if_variable<'a>(
3250 v: &'a ast::Value,
3251 vars: &'a HashMap<String, ast::Value>,
3252) -> &'a ast::Value {
3253 if let ast::Value::Variable(n) = v {
3254 vars.get(n).unwrap_or(v)
3255 } else {
3256 v
3257 }
3258}
3259
3260pub fn apply_projection(
3261 doc: Document,
3262 fields: &[ast::Field],
3263 options: &ExecutionOptions,
3264) -> Document {
3265 let (projected, _) = apply_projection_and_defer(doc, fields, options);
3266 projected
3267}
3268
3269async fn apply_projection_with_lookups(
3272 db: &Aurora,
3273 mut doc: Document,
3274 fields: &[ast::Field],
3275 vars: &HashMap<String, ast::Value>,
3276 options: &ExecutionOptions,
3277) -> Result<(Document, Vec<String>)> {
3278 if fields.is_empty() {
3279 return Ok((doc, vec![]));
3280 }
3281 let mut proj = HashMap::new();
3282 let mut deferred = Vec::new();
3283
3284 for f in fields {
3285 if f.directives.iter().any(|d| d.name == "defer") {
3287 deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
3288 continue;
3289 }
3290
3291 let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
3293 let local_arg = f.arguments.iter().find(|a| a.name == "localField");
3294 let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
3295
3296 if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
3297 let c_val = resolve_if_variable(&c.value, vars);
3299 let lf_val = resolve_if_variable(&lf.value, vars);
3300 let ff_val = resolve_if_variable(&ff.value, vars);
3301 let foreign_coll = if let ast::Value::String(s) = c_val {
3302 s.as_str()
3303 } else {
3304 continue;
3305 };
3306 let local_field = if let ast::Value::String(s) = lf_val {
3307 s.as_str()
3308 } else {
3309 continue;
3310 };
3311 let foreign_field = if let ast::Value::String(s) = ff_val {
3312 s.as_str()
3313 } else {
3314 continue;
3315 };
3316
3317 let local_val = doc.data.get(local_field).cloned().or_else(|| {
3319 if local_field == "_sid" {
3320 Some(Value::String(doc._sid.clone()))
3321 } else {
3322 None
3323 }
3324 });
3325
3326 if let Some(match_val) = local_val {
3327 let extra_filter = f
3329 .arguments
3330 .iter()
3331 .find(|a| a.name == "where")
3332 .and_then(|a| {
3333 let resolved = resolve_ast_deep(&a.value, vars);
3334 value_to_filter(&resolved).ok()
3335 });
3336
3337 let vars_arc = Arc::new(vars.clone());
3338 let foreign_docs = db.scan_and_filter(
3339 foreign_coll,
3340 |fdoc| {
3341 let field_match = fdoc
3342 .data
3343 .get(foreign_field)
3344 .map(|v| values_equal_db(v, &match_val))
3345 .unwrap_or(foreign_field == "_sid" && fdoc._sid == match_val.to_string());
3346 if !field_match {
3347 return false;
3348 }
3349 if let Some(ref ef) = extra_filter {
3350 let compiled = compile_filter(ef)
3351 .unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
3352 matches_filter(fdoc, &compiled, &vars_arc)
3353 } else {
3354 true
3355 }
3356 },
3357 None,
3358 )?;
3359
3360 let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
3362 foreign_docs
3363 .into_iter()
3364 .map(|fd| Value::Object(fd.data))
3365 .collect()
3366 } else {
3367 let sub_fields: Vec<ast::Field> = f
3368 .selection_set
3369 .iter()
3370 .filter_map(|sel| {
3371 if let Selection::Field(sf) = sel {
3372 Some(sf.clone())
3373 } else {
3374 None
3375 }
3376 })
3377 .collect();
3378 foreign_docs
3379 .into_iter()
3380 .map(|fd| {
3381 let (proj_fd, _) = apply_projection_and_defer(fd, &sub_fields, options);
3382 Value::Object(proj_fd.data)
3383 })
3384 .collect()
3385 };
3386
3387 let alias = f.alias.as_ref().unwrap_or(&f.name).clone();
3388 proj.insert(alias, Value::Array(sub_projected));
3389 }
3390 continue;
3391 }
3392
3393 if f.name == "__compute__" {
3395 let alias = f.alias.as_deref().unwrap_or("computed");
3396 if let Some(expr) = f.arguments.iter().find(|a| a.name == "expr") {
3397 let resolved_expr = resolve_if_variable(&expr.value, vars);
3398 if let ast::Value::String(template) = resolved_expr {
3399 let result = eval_template(template, &doc.data);
3400 proj.insert(alias.to_string(), Value::String(result));
3401 }
3402 }
3403 continue;
3404 }
3405
3406 if f.name == "id" {
3408 if let Some(v) = doc.data.get("id") {
3409 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
3410 } else {
3411 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), Value::Null);
3412 }
3413 } else if f.name == "_sid" {
3414 proj.insert(
3415 f.alias.as_ref().unwrap_or(&f.name).clone(),
3416 Value::String(doc._sid.clone()),
3417 );
3418 } else if let Some(v) = doc.data.get(&f.name) {
3419 proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
3420 }
3421 }
3422 doc.data = proj;
3423 Ok((doc, deferred))
3424}
3425
3426fn values_equal_db(a: &Value, b: &Value) -> bool {
3427 match (a, b) {
3428 (Value::String(s1), Value::String(s2)) => s1 == s2,
3429 (Value::Int(i1), Value::Int(i2)) => i1 == i2,
3430 (Value::Float(f1), Value::Float(f2)) => (f1 - f2).abs() < f64::EPSILON,
3431 (Value::Bool(b1), Value::Bool(b2)) => b1 == b2,
3432 _ => false,
3433 }
3434}
3435
3436pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
3437 let resolved = resolve_if_variable(v, vars);
3438 match resolved {
3439 ast::Value::Int(i) => Ok(Value::Int(*i)),
3440 ast::Value::Float(f) => Ok(Value::Float(*f)),
3441 ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
3442 ast::Value::String(s) => Ok(Value::String(s.clone())),
3443 ast::Value::Enum(s) => Ok(Value::String(s.clone())),
3444 ast::Value::Null => Ok(Value::Null),
3445 ast::Value::Variable(name) => Err(AqlError::new(
3446 ErrorCode::UndefinedVariable,
3447 format!("Variable '{}' not found", name),
3448 )),
3449 ast::Value::Array(arr) => {
3450 let mut vals = Vec::with_capacity(arr.len());
3451 for v in arr {
3452 vals.push(aql_value_to_db_value(v, vars)?);
3453 }
3454 Ok(Value::Array(vals))
3455 }
3456 ast::Value::Object(obj) => {
3457 let mut map = HashMap::with_capacity(obj.len());
3458 for (k, v) in obj {
3459 map.insert(k.clone(), aql_value_to_db_value(v, vars)?);
3460 }
3461 Ok(Value::Object(map))
3462 }
3463 }
3464}
3465
3466fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
3468 match v {
3469 ast::Value::Null => serde_json::Value::Null,
3470 ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
3471 ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
3472 ast::Value::Float(f) => serde_json::Number::from_f64(*f)
3473 .map(serde_json::Value::Number)
3474 .unwrap_or(serde_json::Value::Null),
3475 ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
3476 ast::Value::Array(arr) => {
3477 serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
3478 }
3479 ast::Value::Object(obj) => serde_json::Value::Object(
3480 obj.iter()
3481 .map(|(k, v)| (k.clone(), aql_value_to_json(v)))
3482 .collect(),
3483 ),
3484 ast::Value::Variable(_) => serde_json::Value::Null,
3485 }
3486}
3487
3488fn aql_value_to_hashmap(
3489 v: &ast::Value,
3490 vars: &HashMap<String, ast::Value>,
3491) -> Result<HashMap<String, Value>> {
3492 if let ast::Value::Object(m) = resolve_if_variable(v, vars) {
3493 let mut res = HashMap::new();
3494 for (k, v) in m {
3495 res.insert(k.clone(), aql_value_to_db_value(v, vars)?);
3496 }
3497 Ok(res)
3498 } else {
3499 Err(AqlError::new(
3500 ErrorCode::QueryError,
3501 "Data must be object".to_string(),
3502 ))
3503 }
3504}
3505
3506fn aurora_value_to_json_value(v: &Value) -> JsonValue {
3507 match v {
3508 Value::Null => JsonValue::Null,
3509 Value::String(s) => JsonValue::String(s.clone()),
3510 Value::Int(i) => JsonValue::Number((*i).into()),
3511 Value::Float(f) => serde_json::Number::from_f64(*f)
3512 .map(JsonValue::Number)
3513 .unwrap_or(JsonValue::Null),
3514 Value::Bool(b) => JsonValue::Bool(*b),
3515 Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
3516 Value::Object(m) => {
3517 let mut jm = serde_json::Map::new();
3518 for (k, v) in m {
3519 jm.insert(k.clone(), aurora_value_to_json_value(v));
3520 }
3521 JsonValue::Object(jm)
3522 }
3523 Value::Uuid(u) => JsonValue::String(u.to_string()),
3524 Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
3525 }
3526}
3527
3528fn find_indexed_equality_filter(
3530 filter: &AqlFilter,
3531 db: &Aurora,
3532 collection: &str,
3533) -> Option<(String, ast::Value)> {
3534 match filter {
3535 AqlFilter::Eq(field, val) => {
3536 if field == "_sid" || db.has_index(collection, field) {
3537 Some((field.clone(), val.clone()))
3538 } else {
3539 None
3540 }
3541 }
3542 AqlFilter::And(filters) => {
3543 for f in filters {
3544 if let Some(res) = find_indexed_equality_filter(f, db, collection) {
3545 return Some(res);
3546 }
3547 }
3548 None
3549 }
3550 _ => None,
3551 }
3552}
3553
/// Converts an object-shaped AQL value into an [`AqlFilter`] tree.
///
/// Keys are interpreted as: `or` / `and` (array of sub-filters), `not`
/// (single sub-filter), or a field name mapped to an object of operators
/// (`eq`, `ne`, `gt`, `gte`, `lt`, `lte`, `in`, `notin`, `contains`,
/// `containsAny`, `containsAll`, `startsWith`, `endsWith`, `matches`).
/// Unrecognised operators are silently ignored. Multiple field predicates
/// combine into an `And`; a single predicate is returned bare; an empty
/// object yields `And([])` (matches everything).
///
/// NOTE(review): an `or`/`and`/`not` key returns immediately, discarding any
/// other keys in the same object — when combinators are mixed with field
/// keys, which one "wins" depends on the underlying map's iteration order.
/// Presumably filters carry a single combinator per object; worth confirming
/// at the parser level.
///
/// # Errors
/// Returns a `QueryError` when `v` is not an object.
pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
    if let ast::Value::Object(m) = v {
        let mut fs = Vec::new();
        for (k, val) in m {
            match k.as_str() {
                "or" => {
                    if let ast::Value::Array(arr) = val {
                        let mut sub = Vec::new();
                        for item in arr {
                            sub.push(value_to_filter(item)?);
                        }
                        return Ok(AqlFilter::Or(sub));
                    }
                }
                "and" => {
                    if let ast::Value::Array(arr) = val {
                        let mut sub = Vec::new();
                        for item in arr {
                            sub.push(value_to_filter(item)?);
                        }
                        return Ok(AqlFilter::And(sub));
                    }
                }
                "not" => {
                    return Ok(AqlFilter::Not(Box::new(value_to_filter(val)?)));
                }
                field => {
                    // Field predicate: { field: { op: value, ... } }.
                    if let ast::Value::Object(ops) = val {
                        for (op, ov) in ops {
                            match op.as_str() {
                                "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
                                "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
                                "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
                                "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
                                "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
                                "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
                                "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
                                "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
                                "contains" => {
                                    fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
                                }
                                "containsAny" => {
                                    fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
                                }
                                "containsAll" => {
                                    fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
                                }
                                "startsWith" => {
                                    fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
                                }
                                "endsWith" => {
                                    fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
                                }
                                "matches" => {
                                    fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
                                }
                                _ => {}
                            }
                        }
                    }
                }
            }
        }
        if fs.is_empty() {
            Ok(AqlFilter::And(vec![]))
        } else if fs.len() == 1 {
            Ok(fs.remove(0))
        } else {
            Ok(AqlFilter::And(fs))
        }
    } else {
        Err(AqlError::new(
            ErrorCode::QueryError,
            "Filter must be object".to_string(),
        ))
    }
}
3631
3632fn resolve_value(
3633 v: &ast::Value,
3634 vars: &HashMap<String, ast::Value>,
3635 _ctx: &ExecutionContext,
3636) -> ast::Value {
3637 match v {
3638 ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
3639 _ => v.clone(),
3640 }
3641}
3642
3643fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
3648 match v {
3649 ast::Value::Variable(n) => vars.get(n).cloned().unwrap_or(ast::Value::Null),
3650 ast::Value::Object(m) => ast::Value::Object(
3651 m.iter()
3652 .map(|(k, val)| (k.clone(), resolve_ast_deep(val, vars)))
3653 .collect(),
3654 ),
3655 ast::Value::Array(arr) => {
3656 ast::Value::Array(arr.iter().map(|val| resolve_ast_deep(val, vars)).collect())
3657 }
3658 _ => v.clone(),
3659 }
3660}
3661
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
    use tempfile::TempDir;

    /// End-to-end smoke test: open a throwaway database, create a collection,
    /// insert one row via an AQL mutation, then read it back with a query and
    /// confirm exactly one document is returned.
    #[tokio::test]
    async fn test_executor_integration() {
        let tmp = TempDir::new().unwrap();
        // Strict durability + no write buffering so the insert is visible to
        // the follow-up query without any flush timing concerns.
        let config = AuroraConfig {
            db_path: tmp.path().join("test.db"),
            enable_write_buffering: false,
            durability_mode: DurabilityMode::Strict,
            ..Default::default()
        };
        let db = Aurora::with_config(config).await.unwrap();

        let name_field = crate::types::FieldDefinition {
            field_type: FieldType::SCALAR_STRING,
            unique: false,
            indexed: true,
            nullable: true,
            ..Default::default()
        };
        db.new_collection("users", vec![("name", name_field)])
            .await
            .unwrap();

        let insert = r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#;
        let _ = execute(&db, insert, ExecutionOptions::new()).await.unwrap();

        let res = execute(&db, "query { users { name } }", ExecutionOptions::new())
            .await
            .unwrap();
        match res {
            ExecutionResult::Query(q) => assert_eq!(q.documents.len(), 1),
            _ => panic!("Expected query"),
        }
    }
}