1use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
7use super::executor_utils::{CompiledFilter, compile_filter};
8
9use crate::Aurora;
10use crate::error::{AqlError, ErrorCode, Result};
11use crate::types::{Document, Value};
12use serde::Serialize;
13use serde_json::Value as JsonValue;
14use std::collections::HashMap;
15
/// Per-mutation execution context: maps an operation's alias to the JSON
/// form of its first returned document (with `id` injected), so later
/// operations in the same mutation can reference earlier results.
pub type ExecutionContext = HashMap<String, JsonValue>;
17
18fn collect_fields(
20 selection_set: &[Selection],
21 fragments: &HashMap<String, FragmentDef>,
22 variable_values: &HashMap<String, ast::Value>,
23 parent_type: Option<&str>,
24) -> Result<Vec<Field>> {
25 let mut fields = Vec::new();
26
27 for selection in selection_set {
28 match selection {
29 Selection::Field(field) => {
30 if should_include(&field.directives, variable_values)? {
32 fields.push(field.clone());
33 }
34 }
35 Selection::FragmentSpread(name) => {
36 if let Some(fragment) = fragments.get(name) {
37 let type_match = if let Some(parent) = parent_type {
39 parent == fragment.type_condition
40 } else {
41 true };
43
44 if type_match {
45 let fragment_fields = collect_fields(
46 &fragment.selection_set,
47 fragments,
48 variable_values,
49 parent_type,
50 )?;
51 fields.extend(fragment_fields);
52 }
53 }
54 }
55 Selection::InlineFragment(inline) => {
56 let type_match = if let Some(parent) = parent_type {
58 parent == inline.type_condition
59 } else {
60 true };
62
63 if type_match {
64 let inline_fields = collect_fields(
65 &inline.selection_set,
66 fragments,
67 variable_values,
68 parent_type,
69 )?;
70 fields.extend(inline_fields);
71 }
72 }
73 }
74 }
75
76 Ok(fields)
77}
78
79fn should_include(
81 directives: &[ast::Directive],
82 variables: &HashMap<String, ast::Value>,
83) -> Result<bool> {
84 for dir in directives {
85 if dir.name == "skip" {
86 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
87 let should_skip = resolve_boolean_arg(&arg.value, variables)?;
88 if should_skip {
89 return Ok(false);
90 }
91 }
92 } else if dir.name == "include" {
93 if let Some(arg) = dir.arguments.iter().find(|a| a.name == "if") {
94 let should_include = resolve_boolean_arg(&arg.value, variables)?;
95 if !should_include {
96 return Ok(false);
97 }
98 }
99 }
100 }
101 Ok(true)
102}
103
104fn resolve_boolean_arg(
105 value: &ast::Value,
106 variables: &HashMap<String, ast::Value>,
107) -> Result<bool> {
108 match value {
109 ast::Value::Boolean(b) => Ok(*b),
110 ast::Value::Variable(name) => {
111 if let Some(val) = variables.get(name) {
112 match val {
113 ast::Value::Boolean(b) => Ok(*b),
114 _ => Err(AqlError::new(
115 ErrorCode::TypeError,
116 format!("Variable '{}' is not a boolean, got {:?}", name, val),
117 )),
118 }
119 } else {
120 Err(AqlError::new(
121 ErrorCode::UndefinedVariable,
122 format!("Variable '{}' is not defined", name),
123 ))
124 }
125 }
126 _ => Err(AqlError::new(
127 ErrorCode::TypeError,
128 format!("Expected boolean value, got {:?}", value),
129 )),
130 }
131}
132
133fn validate_required_variables(
135 variable_definitions: &[ast::VariableDefinition],
136 provided_variables: &HashMap<String, ast::Value>,
137) -> Result<()> {
138 for var_def in variable_definitions {
139 if var_def.var_type.is_required {
141 if !provided_variables.contains_key(&var_def.name) {
143 if var_def.default_value.is_none() {
145 return Err(AqlError::new(
146 ErrorCode::UndefinedVariable,
147 format!(
148 "Required variable '{}' (type: {}{}) is not provided",
149 var_def.name,
150 var_def.var_type.name,
151 if var_def.var_type.is_required {
152 "!"
153 } else {
154 ""
155 }
156 ),
157 ));
158 }
159 }
160 }
161 }
162 Ok(())
163}
164
/// Top-level outcome of executing an AQL document — one variant per
/// operation category.
#[derive(Debug)]
pub enum ExecutionResult {
    /// Result of a read query.
    Query(QueryResult),
    /// Result of a mutation.
    Mutation(MutationResult),
    /// Handle for an established subscription.
    Subscription(SubscriptionResult),
    /// Results of several operations executed in document order.
    Batch(Vec<ExecutionResult>),
    /// Result of a schema operation.
    Schema(SchemaResult),
    /// Result of a migration run.
    Migration(MigrationResult),
}
181
/// Outcome of a schema operation on a collection.
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Name of the schema operation that was performed.
    pub operation: String,
    /// Target collection.
    pub collection: String,
    /// Human-readable status string.
    pub status: String,
}
188
/// Outcome of running a migration.
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Migration version identifier.
    pub version: String,
    /// Number of migration steps that were applied.
    pub steps_applied: usize,
    /// Human-readable status string.
    pub status: String,
}
195
/// Serializable description of how a query would execute.
///
/// NOTE(review): not constructed anywhere in this file — presumably
/// populated by a planner elsewhere; field meanings inferred from names.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionPlan {
    /// Ordered operation descriptions.
    pub operations: Vec<String>,
    /// Estimated cost of the plan (units unspecified here).
    pub estimated_cost: f64,
}
201
/// Documents returned for one collection query.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Collection the query ran against (may be a synthetic name such as
    /// "__fragment" / "__handler" / "connection" for special results).
    pub collection: String,
    /// The result documents (raw, projected, aggregated, or grouped).
    pub documents: Vec<Document>,
    /// Count of matching documents before pagination, when known.
    pub total_count: Option<usize>,
}
209
/// Outcome of a single mutation operation.
#[derive(Debug, Clone)]
pub struct MutationResult {
    /// Operation name, e.g. "insert" or "insertMany".
    pub operation: String,
    /// Collection that was mutated.
    pub collection: String,
    /// Number of documents affected.
    pub affected_count: usize,
    /// Documents returned, optionally projected through the selection set.
    pub returned_documents: Vec<Document>,
}
218
/// Handle describing an active subscription.
#[derive(Debug)]
pub struct SubscriptionResult {
    /// Identifier for this subscription.
    pub subscription_id: String,
    /// Collection being watched.
    pub collection: String,
    /// Change stream, when one was established.
    pub stream: Option<crate::pubsub::ChangeListener>,
}
226
/// Knobs controlling how a parsed AQL document is executed.
///
/// NOTE(review): the derived `Default` leaves `apply_projections` as
/// `false`, while `ExecutionOptions::new()` sets it to `true` — confirm
/// this divergence is intentional.
#[derive(Debug, Clone, Default)]
pub struct ExecutionOptions {
    /// Presumably skips document validation when true — not referenced in
    /// this part of the file; verify against the rest of the module.
    pub skip_validation: bool,
    /// When true, selection-set projections are applied to query and
    /// mutation results.
    pub apply_projections: bool,
    /// Caller-provided variable bindings, passed to the parser.
    pub variables: HashMap<String, JsonValue>,
}
237
238impl ExecutionOptions {
239 pub fn new() -> Self {
240 Self {
241 skip_validation: false,
242 apply_projections: true,
243 variables: HashMap::new(),
244 }
245 }
246
247 pub fn with_variables(mut self, vars: HashMap<String, JsonValue>) -> Self {
248 self.variables = vars;
249 self
250 }
251
252 pub fn skip_validation(mut self) -> Self {
253 self.skip_validation = true;
254 self
255 }
256}
257
258pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
260 let variables = serde_json::Value::Object(options.variables.clone().into_iter().collect());
262 let doc = super::parse_with_variables(aql, variables)?;
263
264 execute_document(db, &doc, &options).await
266}
267
268pub async fn execute_document(
270 db: &Aurora,
271 doc: &ast::Document,
272 options: &ExecutionOptions,
273) -> Result<ExecutionResult> {
274 if doc.operations.is_empty() {
275 return Err(AqlError::new(
276 ErrorCode::QueryError,
277 "No operations in document".to_string(),
278 ));
279 }
280
281 let fragments: HashMap<String, FragmentDef> = doc
283 .operations
284 .iter()
285 .filter_map(|op| {
286 if let Operation::FragmentDefinition(frag) = op {
287 Some((frag.name.clone(), frag.clone()))
288 } else {
289 None
290 }
291 })
292 .collect();
293
294 let executable_ops: Vec<&Operation> = doc
296 .operations
297 .iter()
298 .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
299 .collect();
300
301 if executable_ops.is_empty() {
302 return Err(AqlError::new(
303 ErrorCode::QueryError,
304 "No executable operations in document".to_string(),
305 ));
306 }
307
308 if executable_ops.len() == 1 {
309 execute_operation(db, executable_ops[0], options, &fragments).await
310 } else {
311 let mut results = Vec::new();
312 for op in executable_ops {
313 results.push(execute_operation(db, op, options, &fragments).await?);
314 }
315 Ok(ExecutionResult::Batch(results))
316 }
317}
318
319async fn execute_operation(
321 db: &Aurora,
322 op: &Operation,
323 options: &ExecutionOptions,
324 fragments: &HashMap<String, FragmentDef>,
325) -> Result<ExecutionResult> {
326 match op {
327 Operation::Query(query) => execute_query(db, query, options, fragments).await,
328 Operation::Mutation(mutation) => execute_mutation(db, mutation, options, fragments).await,
329 Operation::Subscription(sub) => execute_subscription(db, sub, options).await,
330 Operation::Schema(schema) => execute_schema(db, schema, options).await,
331 Operation::Migration(migration) => execute_migration(db, migration, options).await,
332 Operation::FragmentDefinition(_) => {
333 Ok(ExecutionResult::Query(QueryResult {
335 collection: "__fragment".to_string(),
336 documents: vec![],
337 total_count: Some(0),
338 }))
339 }
340 Operation::Introspection(intro) => execute_introspection(db, intro).await,
341 Operation::Handler(_) => {
342 Ok(ExecutionResult::Query(QueryResult {
344 collection: "__handler".to_string(),
345 documents: vec![],
346 total_count: Some(0),
347 }))
348 }
349 }
350}
351
352async fn execute_query(
354 db: &Aurora,
355 query: &ast::Query,
356 options: &ExecutionOptions,
357 fragments: &HashMap<String, FragmentDef>,
358) -> Result<ExecutionResult> {
359 validate_required_variables(&query.variable_definitions, &query.variables_values)?;
361
362 let root_fields = collect_fields(
364 &query.selection_set,
365 fragments,
366 &query.variables_values,
367 None,
368 )?;
369
370 if root_fields.is_empty() {
372 return Ok(ExecutionResult::Query(QueryResult {
373 collection: String::new(),
374 documents: vec![],
375 total_count: Some(0),
376 }));
377 }
378
379 let mut results = Vec::new();
381 for field in &root_fields {
382 let result =
383 execute_collection_query(db, field, &query.variables_values, options, fragments)
384 .await?;
385 results.push(result);
386 }
387
388 if results.len() == 1 {
389 Ok(ExecutionResult::Query(results.remove(0)))
390 } else {
391 Ok(ExecutionResult::Batch(
392 results.into_iter().map(ExecutionResult::Query).collect(),
393 ))
394 }
395}
396
397async fn execute_collection_query(
399 db: &Aurora,
400 field: &ast::Field,
401 variables: &HashMap<String, ast::Value>,
402 options: &ExecutionOptions,
403 fragments: &HashMap<String, FragmentDef>,
404) -> Result<QueryResult> {
405 let collection_name = &field.name;
406
407 let sub_fields = collect_fields(
410 &field.selection_set,
411 fragments,
412 variables,
413 Some(&field.name),
414 )?;
415 let filter = extract_filter_from_args(&field.arguments)?;
416
417 let (limit, offset) = extract_pagination(&field.arguments);
419 let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
420
421 let is_connection = sub_fields
424 .iter()
425 .any(|f| f.name == "edges" || f.name == "pageInfo");
426
427 let orderings = extract_order_by(&field.arguments);
429
430 let compiled_filter = if let Some(ref f) = filter {
432 Some(compile_filter(f)?)
433 } else {
434 None
435 };
436
437 let filter_fn = |doc: &Document| {
439 compiled_filter
440 .as_ref()
441 .map(|f| matches_filter(doc, f, variables))
442 .unwrap_or(true)
443 };
444
445 let target = if orderings.is_empty() {
447 limit.map(|l| l + offset)
448 } else {
449 None
450 };
451
452 let mut filtered_docs_iter = db.scan_and_filter(collection_name, filter_fn, target)?;
453
454 let orderings = extract_order_by(&field.arguments);
456 if !orderings.is_empty() {
457 apply_ordering(&mut filtered_docs_iter, &orderings);
458 }
459
460 let total_count = filtered_docs_iter.len();
461
462 let final_docs = if is_connection {
463 filtered_docs_iter.sort_by(|a, b| a.id.cmp(&b.id));
466
467 if let Some(after_cursor) = after {
469 if let Ok(after_id) = decode_cursor(&after_cursor) {
471 if let Some(pos) = filtered_docs_iter.iter().position(|d| d.id == after_id) {
474 filtered_docs_iter = filtered_docs_iter.into_iter().skip(pos + 1).collect();
476 }
477 }
478 }
479
480 let has_next_page = if let Some(l) = first {
481 filtered_docs_iter.len() > l
482 } else {
483 false
484 };
485
486 if let Some(l) = first {
488 filtered_docs_iter.truncate(l);
489 }
490
491 let mut edges = Vec::new();
493 let mut end_cursor = None;
494
495 for doc in filtered_docs_iter {
496 let cursor = encode_cursor(&Value::String(doc.id.clone()));
497 end_cursor = Some(cursor.clone());
498
499 let mut edge_data = HashMap::new();
500 edge_data.insert("cursor".to_string(), Value::String(cursor));
501
502 if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
505 let edges_sub_fields =
506 collect_fields(&edges_field.selection_set, fragments, variables, None)?;
507
508 if let Some(node_field) = edges_sub_fields.iter().find(|f| f.name == "node") {
509 let node_doc = apply_projection_with_lookups(
511 db,
512 doc.clone(), &node_field.selection_set,
514 variables,
515 fragments,
516 collection_name,
517 )
518 .await?;
519 edge_data.insert("node".to_string(), Value::Object(node_doc.data));
520 }
521 }
522
523 edges.push(Value::Object(edge_data));
524 }
525
526 let mut page_info_data = HashMap::new();
528 page_info_data.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
529 if let Some(ec) = end_cursor {
530 page_info_data.insert("endCursor".to_string(), Value::String(ec));
531 } else {
532 page_info_data.insert("endCursor".to_string(), Value::Null);
533 }
534
535 let mut conn_data = HashMap::new();
537 conn_data.insert("edges".to_string(), Value::Array(edges));
538 conn_data.insert("pageInfo".to_string(), Value::Object(page_info_data));
539
540 vec![Document {
541 id: "connection".to_string(),
542 data: conn_data,
543 }]
544 } else {
545 let paginated_docs: Vec<Document> = filtered_docs_iter
549 .into_iter()
550 .skip(offset)
551 .take(limit.unwrap_or(usize::MAX))
552 .collect();
553
554 let has_aggregation =
556 !sub_fields.is_empty() && sub_fields.iter().any(|f| f.name == "aggregate");
557
558 let group_by_field = if !sub_fields.is_empty() {
560 sub_fields.iter().find(|f| f.name == "groupBy")
561 } else {
562 None
563 };
564
565 if let Some(gb_field) = group_by_field {
566 execute_group_by(&paginated_docs, gb_field, fragments, variables)?
567 } else if has_aggregation {
568 let agg_doc = execute_aggregation(&paginated_docs, &sub_fields, fragments, variables)?;
569 vec![agg_doc]
570 } else if options.apply_projections && !sub_fields.is_empty() {
571 let mut projected = Vec::new();
572 for doc in paginated_docs {
573 projected.push(
574 apply_projection_with_lookups(
575 db,
576 doc,
577 &field.selection_set,
578 variables,
579 fragments,
580 collection_name,
581 )
582 .await?,
583 );
584 }
585 projected
586 } else {
587 paginated_docs
588 }
589 };
590
591 Ok(QueryResult {
592 collection: collection_name.clone(),
593 documents: final_docs,
594 total_count: Some(total_count),
595 })
596}
597
598fn execute_group_by(
600 docs: &[Document],
601 group_by_field: &ast::Field,
602 fragments: &HashMap<String, FragmentDef>,
603 variables: &HashMap<String, ast::Value>,
604) -> Result<Vec<Document>> {
605 let key_field_name = group_by_field
607 .arguments
608 .iter()
609 .find(|a| a.name == "field")
610 .and_then(|a| match &a.value {
611 ast::Value::String(s) => Some(s.clone()),
612 _ => None,
613 })
614 .ok_or_else(|| {
615 AqlError::new(
616 ErrorCode::QueryError,
617 "groupBy requires a 'field' argument".to_string(),
618 )
619 })?;
620
621 let mut groups: HashMap<String, Vec<&Document>> = HashMap::new();
623
624 for doc in docs {
625 let val = doc.data.get(&key_field_name).unwrap_or(&Value::Null);
626 let key_str = match val {
627 Value::String(s) => s.clone(),
628 Value::Int(i) => i.to_string(),
629 Value::Float(f) => f.to_string(),
630 Value::Bool(b) => b.to_string(),
631 Value::Null => "null".to_string(),
632 _ => format!("{:?}", val), };
634
635 groups.entry(key_str).or_default().push(doc);
636 }
637
638 let mut result_docs = Vec::new();
640
641 for (group_key, group_docs) in groups {
642 let mut group_data = HashMap::new();
643
644 let group_fields =
647 collect_fields(&group_by_field.selection_set, fragments, variables, None)?;
648
649 for field in &group_fields {
650 let alias = field.alias.as_ref().unwrap_or(&field.name).clone();
651
652 match field.name.as_str() {
653 "key" => {
654 group_data.insert(alias, Value::String(group_key.clone()));
655 }
656 "count" => {
657 group_data.insert(alias, Value::Int(group_docs.len() as i64));
658 }
659 "nodes" => {
660 let nodes: Vec<Value> = group_docs
662 .iter()
663 .map(|d| {
664 if !field.selection_set.is_empty() {
665 let proj_fields = collect_fields(
670 &field.selection_set,
671 fragments,
672 variables,
673 None,
674 )
675 .unwrap_or_default();
676 let proj = apply_projection((*d).clone(), &proj_fields);
677 Value::Object(proj.data)
678 } else {
679 Value::Object(d.data.clone())
680 }
681 })
682 .collect();
683 group_data.insert(alias, Value::Array(nodes));
684 }
685 "aggregate" => {
686 let group_docs_owned: Vec<Document> =
688 group_docs.iter().map(|d| (*d).clone()).collect();
689 let agg_result = execute_aggregation(
695 &group_docs_owned,
696 &[field.clone()],
697 fragments,
698 variables,
699 )?;
700 for (k, v) in agg_result.data {
702 group_data.insert(k, v);
703 }
704 }
705 _ => {
706 }
708 }
709 }
710
711 result_docs.push(Document {
712 id: format!("group_{}", group_key),
713 data: group_data,
714 });
715 }
716
717 Ok(result_docs)
718}
719
/// Evaluates `aggregate { ... }` selections over `docs`.
///
/// Supported functions: `count` (no argument) and `sum` / `avg` / `min` /
/// `max`, each requiring a `field` argument naming a numeric field.
/// Non-numeric and missing values are skipped; `avg`, `min` and `max`
/// yield `Null` when no numeric values are found.
///
/// Returns one synthetic document (id "aggregation_result") mapping each
/// `aggregate` alias to an object of per-function results.
fn execute_aggregation(
    docs: &[Document],
    selection_set: &[ast::Field],
    fragments: &HashMap<String, FragmentDef>,
    variables: &HashMap<String, ast::Value>,
) -> Result<Document> {
    let mut result_data = HashMap::new();

    for field in selection_set {
        let alias = field.alias.as_ref().unwrap_or(&field.name).clone();

        // Only `aggregate` fields participate; anything else is ignored here.
        if field.name == "aggregate" {
            let mut agg_result = HashMap::new();

            let agg_fields = collect_fields(&field.selection_set, fragments, variables, None)?;

            for agg_fn in agg_fields {
                let agg_alias = agg_fn.alias.as_ref().unwrap_or(&agg_fn.name).clone();
                let agg_name = &agg_fn.name;

                let value = match agg_name.as_str() {
                    "count" => Value::Int(docs.len() as i64),
                    "sum" | "avg" | "min" | "max" => {
                        // These functions aggregate over a named document field.
                        let target_field = agg_fn
                            .arguments
                            .iter()
                            .find(|a| a.name == "field")
                            .and_then(|a| match &a.value {
                                ast::Value::String(s) => Some(s.clone()),
                                _ => None,
                            })
                            .ok_or_else(|| {
                                AqlError::new(
                                    ErrorCode::QueryError,
                                    format!(
                                        "Aggregation '{}' requires a 'field' argument",
                                        agg_name
                                    ),
                                )
                            })?;

                        // Collect numeric values; ints are widened to f64.
                        let values: Vec<f64> = docs
                            .iter()
                            .filter_map(|d| {
                                d.data.get(&target_field).and_then(|v| match v {
                                    Value::Int(i) => Some(*i as f64),
                                    Value::Float(f) => Some(*f),
                                    _ => None,
                                })
                            })
                            .collect();

                        match agg_name.as_str() {
                            "sum" => Value::Float(values.iter().sum()),
                            "avg" => {
                                if values.is_empty() {
                                    Value::Null
                                } else {
                                    let sum: f64 = values.iter().sum();
                                    Value::Float(sum / values.len() as f64)
                                }
                            }
                            "min" => values
                                .iter()
                                .fold(None, |min, val| match min {
                                    None => Some(*val),
                                    Some(m) => Some(if *val < m { *val } else { m }),
                                })
                                .map(Value::Float)
                                .unwrap_or(Value::Null),
                            "max" => values
                                .iter()
                                .fold(None, |max, val| match max {
                                    None => Some(*val),
                                    Some(m) => Some(if *val > m { *val } else { m }),
                                })
                                .map(Value::Float)
                                .unwrap_or(Value::Null),
                            // Unreachable: outer arm restricts to the four names.
                            _ => Value::Null,
                        }
                    }
                    _ => {
                        return Err(AqlError::new(
                            ErrorCode::QueryError,
                            format!("Unknown aggregation function '{}'", agg_name),
                        ));
                    }
                };

                agg_result.insert(agg_alias, value);
            }

            result_data.insert(alias, Value::Object(agg_result));
        }
    }

    Ok(Document {
        id: "aggregation_result".to_string(),
        data: result_data,
    })
}
829
830async fn execute_lookup(
832 db: &Aurora,
833 parent_doc: &Document,
834 lookup: &ast::LookupSelection,
835 variables: &HashMap<String, ast::Value>,
836 fragments: &HashMap<String, FragmentDef>,
837) -> Result<Value> {
838 let local_value = parent_doc.data.get(&lookup.local_field);
840
841 if local_value.is_none() {
842 return Ok(Value::Array(vec![]));
843 }
844
845 let local_value = local_value.unwrap();
846
847 let foreign_docs = db.aql_get_all_collection(&lookup.collection).await?;
849
850 let matching_docs: Vec<Document> = foreign_docs
852 .into_iter()
853 .filter(|doc| {
854 if let Some(foreign_val) = doc.data.get(&lookup.foreign_field) {
855 db_values_equal(foreign_val, local_value)
856 } else {
857 false
858 }
859 })
860 .collect();
861
862 let filtered_docs = if let Some(ref filter) = lookup.filter {
864 let compiled_filter = compile_filter(filter)?;
865 matching_docs
866 .into_iter()
867 .filter(|doc| matches_filter(doc, &compiled_filter, variables))
868 .collect()
869 } else {
870 matching_docs
871 };
872
873 let projected: Vec<Value> = filtered_docs
875 .into_iter()
876 .map(|doc| {
877 let fields = collect_fields(
880 &lookup.selection_set,
881 fragments,
882 variables,
883 Some(&lookup.collection),
884 )
885 .unwrap_or_default();
886
887 if fields.is_empty() {
888 Value::Object(doc.data)
889 } else {
890 let projected_doc = apply_projection(doc, &fields);
891 Value::Object(projected_doc.data)
892 }
893 })
894 .collect();
895
896 Ok(Value::Array(projected))
897}
898
899fn db_values_equal(a: &Value, b: &Value) -> bool {
901 match (a, b) {
902 (Value::String(s1), Value::String(s2)) => s1 == s2,
903 (Value::Int(i1), Value::Int(i2)) => i1 == i2,
904 (Value::Float(f1), Value::Float(f2)) => (f1 - f2).abs() < f64::EPSILON,
905 (Value::Bool(b1), Value::Bool(b2)) => b1 == b2,
906 (Value::Null, Value::Null) => true,
907 (Value::String(s), Value::Int(i)) => s.parse::<i64>().ok() == Some(*i),
909 (Value::Int(i), Value::String(s)) => s.parse::<i64>().ok() == Some(*i),
910 _ => false,
911 }
912}
913
914async fn apply_projection_with_lookups(
916 db: &Aurora,
917 doc: Document,
918 selection_set: &[ast::Selection],
919 variables: &HashMap<String, ast::Value>,
920 fragments: &HashMap<String, FragmentDef>,
921 collection: &str,
922) -> Result<Document> {
923 let fields = collect_fields(selection_set, fragments, variables, Some(collection))?;
924
925 if fields.is_empty() {
926 return Ok(doc);
927 }
928
929 let mut projected_data = HashMap::new();
930
931 if !doc.id.is_empty() {
933 projected_data.insert("id".to_string(), Value::String(doc.id.clone()));
934 } else if let Some(id_val) = doc.data.get("id") {
935 projected_data.insert("id".to_string(), id_val.clone());
936 }
937
938 for field in fields {
939 let field_name = field.alias.as_ref().unwrap_or(&field.name);
940 let source_name = &field.name;
941
942 let is_lookup = field.arguments.iter().any(|arg| {
944 arg.name == "collection" || arg.name == "localField" || arg.name == "foreignField"
945 });
946
947 if is_lookup {
948 let filter = extract_filter_from_args(&field.arguments).ok().flatten();
951
952 let collection =
953 extract_string_arg(&field.arguments, "collection").ok_or_else(|| {
954 AqlError::new(
955 ErrorCode::QueryError,
956 "lookup requires 'collection' argument".to_string(),
957 )
958 })?;
959 let local_field =
960 extract_string_arg(&field.arguments, "localField").ok_or_else(|| {
961 AqlError::new(
962 ErrorCode::QueryError,
963 "lookup requires 'localField' argument".to_string(),
964 )
965 })?;
966 let foreign_field =
967 extract_string_arg(&field.arguments, "foreignField").ok_or_else(|| {
968 AqlError::new(
969 ErrorCode::QueryError,
970 "lookup requires 'foreignField' argument".to_string(),
971 )
972 })?;
973
974 let lookup = ast::LookupSelection {
975 collection,
976 local_field,
977 foreign_field,
978 filter,
979 selection_set: field.selection_set.clone(),
980 };
981
982 let lookup_result = execute_lookup(db, &doc, &lookup, variables, fragments).await?;
983 projected_data.insert(field_name.clone(), lookup_result);
984 } else if let Some(value) = doc.data.get(source_name) {
985 projected_data.insert(field_name.clone(), value.clone());
986 }
987 }
988
989 Ok(Document {
990 id: doc.id,
991 data: projected_data,
992 })
993}
994
995fn extract_string_arg(args: &[ast::Argument], name: &str) -> Option<String> {
997 args.iter().find(|a| a.name == name).and_then(|a| {
998 if let ast::Value::String(s) = &a.value {
999 Some(s.clone())
1000 } else {
1001 None
1002 }
1003 })
1004}
1005
/// Checks `doc` against a list of validation rules, returning human-readable
/// violation messages (empty vector = valid).
///
/// Missing fields and type mismatches pass every constraint — constraints
/// only bite when the field is present with a compatible type.
pub fn validate_document(doc: &Document, rules: &[ast::ValidationRule]) -> Result<Vec<String>> {
    let mut errors = Vec::new();

    for rule in rules {
        let field_value = doc.data.get(&rule.field);

        for constraint in &rule.constraints {
            match constraint {
                // Format checks apply only to string values.
                ast::ValidationConstraint::Format(format) => {
                    if let Some(Value::String(s)) = field_value {
                        match format.as_str() {
                            // Deliberately loose: just '@' and '.', not RFC 5322.
                            "email" => {
                                if !s.contains('@') || !s.contains('.') {
                                    errors.push(format!("{}: invalid email format", rule.field));
                                }
                            }
                            "url" => {
                                if !s.starts_with("http://") && !s.starts_with("https://") {
                                    errors.push(format!("{}: invalid URL format", rule.field));
                                }
                            }
                            "uuid" => {
                                if uuid::Uuid::parse_str(s).is_err() {
                                    errors.push(format!("{}: invalid UUID format", rule.field));
                                }
                            }
                            // Unknown formats are accepted silently.
                            _ => {}
                        }
                    }
                }
                // Numeric lower bound; non-numeric / missing values pass.
                ast::ValidationConstraint::Min(min) => {
                    let valid = match field_value {
                        Some(Value::Int(i)) => (*i as f64) >= *min,
                        Some(Value::Float(f)) => *f >= *min,
                        _ => true,
                    };
                    if !valid {
                        errors.push(format!("{}: value must be >= {}", rule.field, min));
                    }
                }
                // Numeric upper bound; non-numeric / missing values pass.
                ast::ValidationConstraint::Max(max) => {
                    let valid = match field_value {
                        Some(Value::Int(i)) => (*i as f64) <= *max,
                        Some(Value::Float(f)) => *f <= *max,
                        _ => true,
                    };
                    if !valid {
                        errors.push(format!("{}: value must be <= {}", rule.field, max));
                    }
                }
                // Length bounds use byte length (`str::len`), not chars.
                ast::ValidationConstraint::MinLength(min_len) => {
                    if let Some(Value::String(s)) = field_value {
                        if (s.len() as i64) < *min_len {
                            errors.push(format!("{}: length must be >= {}", rule.field, min_len));
                        }
                    }
                }
                ast::ValidationConstraint::MaxLength(max_len) => {
                    if let Some(Value::String(s)) = field_value {
                        if (s.len() as i64) > *max_len {
                            errors.push(format!("{}: length must be <= {}", rule.field, max_len));
                        }
                    }
                }
                ast::ValidationConstraint::Pattern(pattern) => {
                    if let Some(Value::String(s)) = field_value {
                        // Size limits guard against pathological patterns;
                        // build failures are reported as violations rather
                        // than hard errors.
                        match regex::RegexBuilder::new(pattern)
                            .size_limit(10_000_000)
                            .dfa_size_limit(2_000_000)
                            .build()
                        {
                            Ok(re) => {
                                if !re.is_match(s) {
                                    errors.push(format!(
                                        "{}: does not match pattern '{}'",
                                        rule.field, pattern
                                    ));
                                }
                            }
                            Err(e) => {
                                errors.push(format!(
                                    "{}: regex pattern '{}' is too complex or invalid: {}",
                                    rule.field, pattern, e
                                ));
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(errors)
}
1102
1103pub fn execute_downsample(
1106 docs: &[Document],
1107 interval: &str,
1108 aggregation: &str,
1109 time_field: &str,
1110 value_field: &str,
1111) -> Result<Vec<Document>> {
1112 let interval_secs = parse_interval(interval)?;
1114
1115 let mut buckets: HashMap<i64, Vec<&Document>> = HashMap::new();
1117
1118 for doc in docs {
1119 if let Some(Value::Int(ts)) = doc.data.get(time_field) {
1120 let bucket = (*ts / interval_secs) * interval_secs;
1121 buckets.entry(bucket).or_default().push(doc);
1122 }
1123 }
1124
1125 let mut result_docs = Vec::new();
1127 let mut sorted_buckets: Vec<_> = buckets.into_iter().collect();
1128 sorted_buckets.sort_by_key(|(k, _)| *k);
1129
1130 for (bucket_ts, bucket_docs) in sorted_buckets {
1131 let values: Vec<f64> = bucket_docs
1132 .iter()
1133 .filter_map(|d| match d.data.get(value_field) {
1134 Some(Value::Int(i)) => Some(*i as f64),
1135 Some(Value::Float(f)) => Some(*f),
1136 _ => None,
1137 })
1138 .collect();
1139
1140 let agg_value = match aggregation {
1141 "avg" | "average" => {
1142 if values.is_empty() {
1143 0.0
1144 } else {
1145 values.iter().sum::<f64>() / values.len() as f64
1146 }
1147 }
1148 "sum" => values.iter().sum(),
1149 "min" => values.iter().cloned().fold(f64::INFINITY, f64::min),
1150 "max" => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
1151 "count" => values.len() as f64,
1152 "first" => *values.first().unwrap_or(&0.0),
1153 "last" => *values.last().unwrap_or(&0.0),
1154 _ => values.iter().sum::<f64>() / values.len().max(1) as f64,
1155 };
1156
1157 let mut data = HashMap::new();
1158 data.insert(time_field.to_string(), Value::Int(bucket_ts));
1159 data.insert(value_field.to_string(), Value::Float(agg_value));
1160 data.insert("count".to_string(), Value::Int(bucket_docs.len() as i64));
1161
1162 result_docs.push(Document {
1163 id: format!("bucket_{}", bucket_ts),
1164 data,
1165 });
1166 }
1167
1168 Ok(result_docs)
1169}
1170
1171fn parse_interval(interval: &str) -> Result<i64> {
1173 let interval = interval.trim().to_lowercase();
1174 let (num_str, unit) = interval.split_at(interval.len().saturating_sub(1));
1175 let num: i64 = num_str.parse().unwrap_or(1);
1176 let multiplier = match unit {
1177 "s" => 1,
1178 "m" => 60,
1179 "h" => 3600,
1180 "d" => 86400,
1181 "w" => 604800,
1182 _ => {
1183 return Err(AqlError::new(
1184 ErrorCode::QueryError,
1185 format!("Invalid interval unit '{}'", unit),
1186 ));
1187 }
1188 };
1189 Ok(num * multiplier)
1190}
1191
/// Computes a trailing sliding-window value per document and attaches it as
/// `"{function}_window"` (always an `f64`) alongside the original data.
///
/// The window covers up to `window_size` documents ending at the current
/// index; only numeric values of `field` contribute.
///
/// NOTE(review): "RANK" currently behaves identically to "ROW_NUMBER" (no
/// tie handling) — confirm whether true rank semantics are intended.
/// Unknown function names yield 0.0.
pub fn execute_window_function(
    docs: &[Document],
    field: &str,
    function: &str,
    window_size: usize,
) -> Result<Vec<Document>> {
    // A zero-sized window would make the trailing-window arithmetic
    // meaningless, so reject it up front.
    if window_size == 0 {
        return Err(AqlError::new(
            ErrorCode::QueryError,
            "window_size must be >= 1".to_string(),
        ));
    }
    let mut result_docs = Vec::new();

    for (i, doc) in docs.iter().enumerate() {
        // Trailing window [i - window_size + 1, i], clamped at the start.
        let window_start = i.saturating_sub(window_size - 1);
        let window: Vec<f64> = docs[window_start..=i]
            .iter()
            .filter_map(|d| match d.data.get(field) {
                Some(Value::Int(v)) => Some(*v as f64),
                Some(Value::Float(v)) => Some(*v),
                _ => None,
            })
            .collect();

        let window_value = match function {
            "ROW_NUMBER" | "row_number" => (i + 1) as f64,
            "RANK" | "rank" => (i + 1) as f64,
            "SUM" | "sum" | "running_sum" => window.iter().sum(),
            "AVG" | "avg" | "moving_avg" => {
                if window.is_empty() {
                    0.0
                } else {
                    window.iter().sum::<f64>() / window.len() as f64
                }
            }
            "MIN" | "min" => window.iter().cloned().fold(f64::INFINITY, f64::min),
            "MAX" | "max" => window.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
            "COUNT" | "count" => window.len() as f64,
            _ => 0.0,
        };

        let mut new_data = doc.data.clone();
        new_data.insert(
            format!("{}_window", function.to_lowercase()),
            Value::Float(window_value),
        );

        result_docs.push(Document {
            id: doc.id.clone(),
            data: new_data,
        });
    }

    Ok(result_docs)
}
1249
1250async fn execute_mutation(
1252 db: &Aurora,
1253 mutation: &ast::Mutation,
1254 options: &ExecutionOptions,
1255 fragments: &HashMap<String, FragmentDef>,
1256) -> Result<ExecutionResult> {
1257 validate_required_variables(&mutation.variable_definitions, &mutation.variables_values)?;
1259
1260 let mut results = Vec::new();
1261 let mut context: ExecutionContext = HashMap::new();
1262
1263 for mut_op in &mutation.operations {
1264 let result = execute_mutation_op(
1265 db,
1266 mut_op,
1267 &mutation.variables_values,
1268 &context,
1269 options,
1270 fragments,
1271 )
1272 .await?;
1273
1274 if let Some(alias) = &mut_op.alias {
1276 if let Some(doc) = result.returned_documents.first() {
1277 let mut json_map = serde_json::Map::new();
1279 for (k, v) in &doc.data {
1280 json_map.insert(k.clone(), aurora_value_to_json_value(v));
1281 }
1282
1283 json_map.insert("id".to_string(), JsonValue::String(doc.id.clone()));
1284
1285 let doc_json = JsonValue::Object(json_map);
1286
1287 context.insert(alias.clone(), doc_json);
1288 }
1289 }
1290
1291 results.push(result);
1292 }
1293
1294 if results.len() == 1 {
1295 Ok(ExecutionResult::Mutation(results.remove(0)))
1296 } else {
1297 Ok(ExecutionResult::Batch(
1298 results.into_iter().map(ExecutionResult::Mutation).collect(),
1299 ))
1300 }
1301}
1302
1303use base64::{Engine as _, engine::general_purpose};
1304use futures::future::{BoxFuture, FutureExt}; fn execute_mutation_op<'a>(
1308 db: &'a Aurora,
1309 mut_op: &'a ast::MutationOperation,
1310 variables: &'a HashMap<String, ast::Value>,
1311 context: &'a ExecutionContext,
1312 options: &'a ExecutionOptions,
1313 fragments: &'a HashMap<String, FragmentDef>,
1314) -> BoxFuture<'a, Result<MutationResult>> {
1315 async move {
1316 match &mut_op.operation {
1317 MutationOp::Insert { collection, data } => {
1318 let resolved_data = resolve_value(data, variables, context);
1319 let doc_data = aql_value_to_hashmap(&resolved_data)?;
1320 let doc = db.aql_insert(collection, doc_data).await?;
1321
1322 let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1323 let fields = collect_fields(
1327 &mut_op.selection_set,
1328 fragments,
1329 variables,
1330 Some(collection),
1331 )
1332 .unwrap_or_default();
1333
1334 vec![apply_projection(doc.clone(), &fields)]
1335 } else {
1336 vec![doc]
1337 };
1338
1339 Ok(MutationResult {
1340 operation: "insert".to_string(),
1341 collection: collection.clone(),
1342 affected_count: 1,
1343 returned_documents: returned,
1344 })
1345 }
1346
1347 MutationOp::InsertMany { collection, data } => {
1348 let mut docs = Vec::new();
1349 for item in data {
1350 let resolved_item = resolve_value(item, variables, context);
1351 let doc_data = aql_value_to_hashmap(&resolved_item)?;
1352 let doc = db.aql_insert(collection, doc_data).await?;
1353 docs.push(doc);
1354 }
1355
1356 let count = docs.len();
1357 let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1358 docs.into_iter()
1359 .map(|d| {
1360 let fields = collect_fields(
1361 &mut_op.selection_set,
1362 fragments,
1363 variables,
1364 Some(collection),
1365 )
1366 .unwrap_or_default();
1367 apply_projection(d, &fields)
1368 })
1369 .collect()
1370 } else {
1371 docs
1372 };
1373
1374 Ok(MutationResult {
1375 operation: "insertMany".to_string(),
1376 collection: collection.clone(),
1377 affected_count: count,
1378 returned_documents: returned,
1379 })
1380 }
1381
1382 MutationOp::Update {
1383 collection,
1384 filter,
1385 data,
1386 } => {
1387 let resolved_data = resolve_value(data, variables, context);
1388 let update_data = aql_value_to_hashmap(&resolved_data)?;
1389
1390 let compiled_filter = if let Some(f) = filter {
1391 Some(compile_filter(f)?)
1392 } else {
1393 None
1394 };
1395
1396 let filter_fn = |doc: &Document| {
1397 compiled_filter
1398 .as_ref()
1399 .map(|f| matches_filter(doc, f, variables))
1400 .unwrap_or(true)
1401 };
1402 let matching_docs = db.scan_and_filter(collection, filter_fn, None)?;
1404
1405 let mut affected = 0;
1406 let mut returned = Vec::new();
1407
1408 for doc in matching_docs {
1409 let updated_doc = db
1410 .aql_update_document(collection, &doc.id, update_data.clone())
1411 .await?;
1412 returned.push(updated_doc);
1413 affected += 1;
1414 }
1415
1416 let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
1417 returned
1418 .into_iter()
1419 .map(|d| {
1420 let fields = collect_fields(
1421 &mut_op.selection_set,
1422 fragments,
1423 variables,
1424 Some(collection),
1425 )
1426 .unwrap_or_default();
1427 apply_projection(d, &fields)
1428 })
1429 .collect()
1430 } else {
1431 returned
1432 };
1433
1434 Ok(MutationResult {
1435 operation: "update".to_string(),
1436 collection: collection.clone(),
1437 affected_count: affected,
1438 returned_documents: returned,
1439 })
1440 }
1441
1442 MutationOp::Upsert {
1443 collection,
1444 filter,
1445 data,
1446 } => {
1447 let resolved_data = resolve_value(data, variables, context);
1449 let update_data = aql_value_to_hashmap(&resolved_data)?;
1450
1451 let compiled_filter = if let Some(f) = filter {
1452 Some(compile_filter(f)?)
1453 } else {
1454 None
1455 };
1456
1457 let filter_fn = |doc: &Document| {
1458 compiled_filter
1459 .as_ref()
1460 .map(|f| matches_filter(doc, f, variables))
1461 .unwrap_or(false) };
1463
1464 let matching = db.scan_and_filter(collection, filter_fn, None)?;
1465
1466 if matching.is_empty() {
1467 let doc = db.aql_insert(collection, update_data).await?;
1469 Ok(MutationResult {
1470 operation: "upsert(insert)".to_string(),
1471 collection: collection.clone(),
1472 affected_count: 1,
1473 returned_documents: vec![doc],
1474 })
1475 } else {
1476 let mut affected = 0;
1478 let mut returned = Vec::new();
1479
1480 for doc in matching {
1481 let updated_doc = db
1482 .aql_update_document(collection, &doc.id, update_data.clone())
1483 .await?;
1484 returned.push(updated_doc);
1485 affected += 1;
1486 }
1487
1488 Ok(MutationResult {
1489 operation: "upsert(update)".to_string(),
1490 collection: collection.clone(),
1491 affected_count: affected,
1492 returned_documents: returned,
1493 })
1494 }
1495 }
1496
1497 MutationOp::Delete { collection, filter } => {
1498 let compiled_filter = if let Some(f) = filter {
1499 Some(compile_filter(f)?)
1500 } else {
1501 None
1502 };
1503
1504 let filter_fn = |doc: &Document| {
1505 compiled_filter
1506 .as_ref()
1507 .map(|f| matches_filter(doc, f, variables))
1508 .unwrap_or(true) };
1510
1511 let matching_docs = db.scan_and_filter(collection, filter_fn, None)?;
1512
1513 let mut affected = 0;
1514 let mut returned = Vec::new();
1515
1516 for doc in matching_docs {
1517 let deleted_doc = db.aql_delete_document(collection, &doc.id).await?;
1518 returned.push(deleted_doc);
1519 affected += 1;
1520 }
1521
1522 Ok(MutationResult {
1523 operation: "delete".to_string(),
1524 collection: collection.clone(),
1525 affected_count: affected,
1526 returned_documents: returned,
1527 })
1528 }
1529
1530 MutationOp::EnqueueJob {
1531 job_type,
1532 payload,
1533 priority,
1534 scheduled_at,
1535 max_retries,
1536 } => {
1537 let workers = db
1538 .workers
1539 .as_ref()
1540 .ok_or_else(|| AqlError::invalid_operation("Worker system not initialized"))?;
1541
1542 let mut job = crate::workers::Job::new(job_type);
1543
1544 let resolved_payload = resolve_value(payload, variables, context);
1547 if let ast::Value::Object(p) = resolved_payload {
1548 for (k, v) in p {
1549 let db_val = aql_value_to_db_value(&v)?;
1550 let json_val: serde_json::Value =
1551 serde_json::to_value(&db_val).map_err(|e| {
1552 AqlError::new(ErrorCode::SerializationError, e.to_string())
1553 })?;
1554 let key_str = k.to_string();
1555 job = job.add_field(key_str, json_val);
1556 }
1557 }
1558
1559 let p_enum = match priority {
1561 ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
1562 ast::JobPriority::High => crate::workers::JobPriority::High,
1563 ast::JobPriority::Low => crate::workers::JobPriority::Low,
1564 ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
1565 };
1566 job = job.with_priority(p_enum); if let Some(s_str) = scheduled_at {
1568 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s_str) {
1569 job = job.scheduled_at(dt.with_timezone(&chrono::Utc));
1570 }
1571 }
1572
1573 if let Some(retries) = max_retries {
1575 job = job.with_max_retries((*retries).try_into().unwrap_or(3));
1576 }
1577
1578 let job_id = workers.enqueue(job).await?;
1579
1580 Ok(MutationResult {
1581 operation: "enqueueJob".to_string(),
1582 collection: "jobs".to_string(),
1583 affected_count: 1,
1584 returned_documents: vec![Document {
1585 id: job_id,
1586 data: HashMap::new(),
1587 }],
1588 })
1589 }
1590
1591 MutationOp::Transaction { operations } => {
1592 let tx = db.aql_begin_transaction()?;
1594 let mut results = Vec::new();
1595
1596 for inner_op in operations {
1597 match execute_mutation_op(db, inner_op, variables, context, options, fragments)
1598 .await
1599 {
1600 Ok(result) => results.push(result),
1601 Err(e) => {
1602 let _ = db.aql_rollback_transaction(tx).await;
1603 return Err(e);
1604 }
1605 }
1606 }
1607
1608 db.aql_commit_transaction(tx).await?;
1609
1610 let total_affected: usize = results.iter().map(|r| r.affected_count).sum();
1611 let all_returned: Vec<Document> = results
1612 .into_iter()
1613 .flat_map(|r| r.returned_documents)
1614 .collect();
1615
1616 Ok(MutationResult {
1617 operation: "transaction".to_string(),
1618 collection: "multiple".to_string(),
1619 affected_count: total_affected,
1620 returned_documents: all_returned,
1621 })
1622 }
1623 }
1624 }
1625 .boxed()
1626}
1627
1628async fn execute_subscription(
1630 db: &Aurora,
1631 sub: &ast::Subscription,
1632 _options: &ExecutionOptions,
1633) -> Result<ExecutionResult> {
1634 let collection = sub
1635 .selection_set
1636 .first()
1637 .and_then(|sel| match sel {
1638 Selection::Field(f) => Some(f.name.clone()),
1639 _ => None, })
1641 .unwrap_or_default();
1642
1643 if collection.is_empty() {
1644 return Err(AqlError::new(
1645 ErrorCode::QueryError,
1646 "Subscription must select a collection".to_string(),
1647 ));
1648 }
1649
1650 let mut listener = db.pubsub.listen(&collection);
1652
1653 if let Some(selection) = sub.selection_set.first() {
1656 if let Selection::Field(field) = selection {
1657 let filter_opt = extract_filter_from_args(&field.arguments)?;
1658 if let Some(aql_filter) = filter_opt {
1659 if let Some(event_filter) = convert_aql_filter_to_event_filter(&aql_filter) {
1660 listener = listener.filter(event_filter);
1661 } else {
1662 return Err(AqlError::new(
1663 ErrorCode::QueryError,
1664 "Unsupported filter operator in subscription",
1665 ));
1666 }
1667 }
1668 }
1669 }
1670
1671 Ok(ExecutionResult::Subscription(SubscriptionResult {
1672 subscription_id: uuid::Uuid::new_v4().to_string(),
1673 collection,
1674 stream: Some(listener),
1675 }))
1676}
1677
1678async fn execute_introspection(
1680 db: &Aurora,
1681 intro: &ast::IntrospectionQuery,
1682) -> Result<ExecutionResult> {
1683 let mut result_data = HashMap::new();
1684
1685 let collection_stats = db.get_collection_stats().unwrap_or_default();
1687 let collection_names: Vec<String> = collection_stats.keys().cloned().collect();
1688
1689 for field_name in &intro.fields {
1690 match field_name.as_str() {
1691 "collections" => {
1692 let collection_list: Vec<Value> = collection_names
1694 .iter()
1695 .map(|name| Value::String(name.clone()))
1696 .collect();
1697 result_data.insert("collections".to_string(), Value::Array(collection_list));
1698 }
1699 "fields" => {
1700 let mut all_fields = HashMap::new();
1702 for name in &collection_names {
1703 if let Ok(coll) = db.get_collection_definition(name) {
1704 let field_names: Vec<Value> = coll
1705 .fields
1706 .keys()
1707 .map(|k| Value::String(k.clone()))
1708 .collect();
1709 all_fields.insert(name.clone(), Value::Array(field_names));
1710 }
1711 }
1712 result_data.insert("fields".to_string(), Value::Object(all_fields));
1713 }
1714 "relations" => {
1715 result_data.insert("relations".to_string(), Value::Array(vec![]));
1717 }
1718 _ => {
1719 }
1721 }
1722 }
1723
1724 Ok(ExecutionResult::Query(QueryResult {
1725 collection: "__schema".to_string(),
1726 documents: vec![Document {
1727 id: "__schema".to_string(),
1728 data: result_data,
1729 }],
1730 total_count: Some(1),
1731 }))
1732}
1733
1734fn convert_ast_field_to_db_field(field: &ast::FieldDef) -> Result<crate::types::FieldDefinition> {
1736 use crate::types::{FieldDefinition, FieldType, ScalarType};
1737
1738 let base_type = match field.field_type.name.as_str() {
1740 "String" | "ID" | "Email" | "URL" | "PhoneNumber" | "DateTime" | "Date" | "Time" => {
1741 ScalarType::String
1742 }
1743 "Int" => ScalarType::Int,
1744 "Float" => ScalarType::Float,
1745 "Boolean" => ScalarType::Bool,
1746 "Uuid" => ScalarType::Uuid,
1747 "Object" | "Json" => ScalarType::Object,
1748 "Any" => ScalarType::Any,
1749 "Array" => ScalarType::Array,
1750 _ => ScalarType::Any,
1751 };
1752
1753 let field_type = if field.field_type.is_array || field.field_type.name == "Array" {
1755 FieldType::Array(base_type)
1756 } else if field.field_type.name == "Object" || field.field_type.name == "Json" {
1757 FieldType::Object
1758 } else {
1759 FieldType::Scalar(base_type)
1760 };
1761
1762 let mut unique = false;
1764 let mut indexed = false;
1765
1766 for directive in &field.directives {
1767 match directive.name.as_str() {
1768 "unique" => unique = true,
1769 "index" | "indexed" => indexed = true,
1770 _ => {}
1771 }
1772 }
1773
1774 if matches!(field_type, FieldType::Any) && (unique || indexed) {
1776 return Err(AqlError::new(
1777 ErrorCode::InvalidDefinition,
1778 format!(
1779 "Field '{}' of type 'Any' cannot be unique or indexed.",
1780 field.name
1781 ),
1782 ));
1783 }
1784
1785 Ok(FieldDefinition {
1786 field_type,
1787 unique,
1788 indexed,
1789 nullable: !field.field_type.is_required,
1790 })
1791}
1792
/// Applies a sequence of schema operations (define / alter / drop collection)
/// in order, collecting one `SchemaResult` per operation.
///
/// A single operation is unwrapped from the batch on return; multiple
/// operations come back as `ExecutionResult::Batch`.
async fn execute_schema(
    db: &Aurora,
    schema: &ast::Schema,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let mut results = Vec::new();

    for op in &schema.operations {
        match op {
            ast::SchemaOp::DefineCollection {
                name,
                if_not_exists,
                fields,
                directives: _,
            } => {
                // IF NOT EXISTS: report "skipped" instead of erroring when the
                // collection already has a definition.
                if *if_not_exists && db.get_collection_definition(name).is_ok() {
                    results.push(ExecutionResult::Schema(SchemaResult {
                        operation: "defineCollection".to_string(),
                        collection: name.clone(),
                        status: "skipped (exists)".to_string(),
                    }));
                    continue;
                }

                // Convert each AST field to a storage definition; a bad field
                // aborts the whole schema batch via `?`.
                let mut db_fields = std::collections::HashMap::new();
                for f in fields {
                    let def = convert_ast_field_to_db_field(f)?;
                    db_fields.insert(f.name.clone(), def);
                }

                db.create_collection_schema(name, db_fields).await?;

                results.push(ExecutionResult::Schema(SchemaResult {
                    operation: "defineCollection".to_string(),
                    collection: name.clone(),
                    status: "created".to_string(),
                }));
            }
            ast::SchemaOp::AlterCollection { name, actions } => {
                // Actions are applied sequentially; a failure leaves earlier
                // actions in this ALTER already applied.
                for action in actions {
                    match action {
                        ast::AlterAction::AddField(field_def) => {
                            let def = convert_ast_field_to_db_field(field_def)?;
                            db.add_field_to_schema(name, field_def.name.clone(), def)
                                .await?;
                        }
                        ast::AlterAction::DropField(field_name) => {
                            db.drop_field_from_schema(name, field_name.clone()).await?;
                        }
                        ast::AlterAction::RenameField { from, to } => {
                            db.rename_field_in_schema(name, from.clone(), to.clone())
                                .await?;
                        }
                        ast::AlterAction::ModifyField(field_def) => {
                            let def = convert_ast_field_to_db_field(field_def)?;
                            db.modify_field_in_schema(name, field_def.name.clone(), def)
                                .await?;
                        }
                    }
                }
                results.push(ExecutionResult::Schema(SchemaResult {
                    operation: "alterCollection".to_string(),
                    collection: name.clone(),
                    status: "modified".to_string(),
                }));
            }
            ast::SchemaOp::DropCollection { name, if_exists } => {
                // IF EXISTS: report "skipped" instead of erroring when the
                // collection has no definition.
                if *if_exists && db.get_collection_definition(name).is_err() {
                    results.push(ExecutionResult::Schema(SchemaResult {
                        operation: "dropCollection".to_string(),
                        collection: name.clone(),
                        status: "skipped (not found)".to_string(),
                    }));
                    continue;
                }

                db.drop_collection_schema(name).await?;

                results.push(ExecutionResult::Schema(SchemaResult {
                    operation: "dropCollection".to_string(),
                    collection: name.clone(),
                    status: "dropped".to_string(),
                }));
            }
        }
    }

    // Unwrap a lone result so single-operation callers get a plain Schema result.
    if results.len() == 1 {
        Ok(results.remove(0))
    } else {
        Ok(ExecutionResult::Batch(results))
    }
}
1889
/// Runs migration steps that have not yet been applied, in declaration order.
///
/// Each step may contain schema operations and data migrations; after all of
/// a step's actions succeed, the step's version is recorded as applied.
/// Already-applied steps are skipped entirely.
async fn execute_migration(
    db: &Aurora,
    migration: &ast::Migration,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let mut results = Vec::new();

    for step in &migration.steps {
        // Idempotence: skip steps recorded as already applied.
        if db.is_migration_applied(&step.version).await? {
            continue;
        }

        let mut applied_count = 0;
        for action in &step.actions {
            match action {
                ast::MigrationAction::Schema(schema_op) => {
                    // Wrap the single op in a Schema batch and reuse the
                    // schema executor; its result is discarded here.
                    let schema = ast::Schema {
                        operations: vec![schema_op.clone()],
                    };
                    execute_schema(db, &schema, options).await?;
                    applied_count += 1;
                }
                ast::MigrationAction::DataMigration(data_mig) => {
                    // Full-collection rewrite: load every document and apply
                    // each transform whose filter (if any) matches.
                    let collection = &data_mig.collection;
                    let docs = db.aql_get_all_collection(collection).await?;
                    let engine = crate::computed::ComputedEngine::new();

                    for doc in docs {
                        let mut updated_data = doc.data.clone();
                        let mut changed = false;

                        for transform in &data_mig.transforms {
                            let matches_filter = match &transform.filter {
                                Some(f) => check_ast_filter_match(f, &doc),
                                None => true,
                            };

                            if matches_filter {
                                // NOTE(review): transforms see the ORIGINAL
                                // doc, not earlier transforms' output —
                                // presumably intentional; confirm.
                                if let Some(new_value) =
                                    engine.evaluate(&transform.expression, &doc)
                                {
                                    updated_data.insert(transform.field.clone(), new_value);
                                    changed = true;
                                }
                            }
                        }

                        // Only write back documents that actually changed.
                        if changed {
                            db.aql_update_document(collection, &doc.id, updated_data)
                                .await?;
                        }
                    }
                    applied_count += 1;
                }
            }
        }

        // Record the step as applied only after every action succeeded.
        db.mark_migration_applied(&step.version).await?;

        results.push(ExecutionResult::Migration(MigrationResult {
            version: step.version.clone(),
            steps_applied: applied_count,
            status: "applied".to_string(),
        }));
    }

    // Sum action counts across all freshly-applied steps (used for the batch case).
    let total_applied = results
        .iter()
        .map(|r| {
            if let ExecutionResult::Migration(m) = r {
                m.steps_applied
            } else {
                0
            }
        })
        .sum();

    if results.is_empty() {
        // Every step was already applied: report a no-op keyed to the first version.
        Ok(ExecutionResult::Migration(MigrationResult {
            version: migration
                .steps
                .first()
                .map(|s| s.version.clone())
                .unwrap_or_default(),
            steps_applied: 0,
            status: "skipped (already applied)".to_string(),
        }))
    } else if results.len() == 1 {
        Ok(results.remove(0))
    } else {
        // Multiple steps applied: collapse into a synthetic "batch" result.
        Ok(ExecutionResult::Migration(MigrationResult {
            version: "batch".to_string(),
            steps_applied: total_applied,
            status: "applied".to_string(),
        }))
    }
}
2000
2001fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
2005 for arg in args {
2006 if arg.name == "where" || arg.name == "filter" {
2007 return Ok(Some(value_to_filter(&arg.value)?));
2008 }
2009 }
2010 Ok(None)
2011}
2012
2013fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
2017 let mut orderings = Vec::new();
2018
2019 for arg in args {
2020 if arg.name == "orderBy" {
2021 match &arg.value {
2022 ast::Value::String(field) => {
2024 orderings.push(ast::Ordering {
2025 field: field.clone(),
2026 direction: ast::SortDirection::Asc,
2027 });
2028 }
2029 ast::Value::Object(map) => {
2031 if let Some(ordering) = parse_ordering_object(map) {
2032 orderings.push(ordering);
2033 }
2034 }
2035 ast::Value::Array(arr) => {
2037 for val in arr {
2038 if let ast::Value::Object(map) = val {
2039 if let Some(ordering) = parse_ordering_object(map) {
2040 orderings.push(ordering);
2041 }
2042 }
2043 }
2044 }
2045 _ => {}
2046 }
2047 }
2048 }
2049
2050 orderings
2051}
2052
2053fn parse_ordering_object(map: &HashMap<String, ast::Value>) -> Option<ast::Ordering> {
2055 if let Some(ast::Value::String(field_name)) = map.get("field") {
2057 let direction = map
2058 .get("direction")
2059 .and_then(|v| match v {
2060 ast::Value::String(s) | ast::Value::Enum(s) => match s.to_uppercase().as_str() {
2061 "ASC" | "ASCENDING" => Some(ast::SortDirection::Asc),
2062 "DESC" | "DESCENDING" => Some(ast::SortDirection::Desc),
2063 _ => None,
2064 },
2065 _ => None,
2066 })
2067 .unwrap_or(ast::SortDirection::Asc);
2068
2069 return Some(ast::Ordering {
2070 field: field_name.clone(),
2071 direction,
2072 });
2073 }
2074
2075 if map.len() == 1 {
2077 let (key, val) = map.iter().next().unwrap();
2078 let direction = match val {
2080 ast::Value::String(s) | ast::Value::Enum(s) => match s.to_uppercase().as_str() {
2081 "ASC" | "ASCENDING" => Some(ast::SortDirection::Asc),
2082 "DESC" | "DESCENDING" => Some(ast::SortDirection::Desc),
2083 _ => None,
2084 },
2085 _ => None,
2086 };
2087
2088 if let Some(dir) = direction {
2089 return Some(ast::Ordering {
2090 field: key.clone(),
2091 direction: dir,
2092 });
2093 }
2094 }
2095
2096 None
2097}
2098
2099fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
2101 if orderings.is_empty() {
2102 return;
2103 }
2104
2105 docs.sort_by(|a, b| {
2106 for ordering in orderings {
2107 let a_val = a.data.get(&ordering.field);
2108 let b_val = b.data.get(&ordering.field);
2109
2110 let cmp = compare_values(a_val, b_val);
2111
2112 if cmp != std::cmp::Ordering::Equal {
2113 return match ordering.direction {
2114 ast::SortDirection::Asc => cmp,
2115 ast::SortDirection::Desc => cmp.reverse(),
2116 };
2117 }
2118 }
2119 std::cmp::Ordering::Equal
2120 });
2121}
2122
2123fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
2125 match (a, b) {
2126 (None, None) => std::cmp::Ordering::Equal,
2127 (None, Some(_)) => std::cmp::Ordering::Less,
2128 (Some(_), None) => std::cmp::Ordering::Greater,
2129 (Some(av), Some(bv)) => {
2130 match (av, bv) {
2131 (Value::Int(ai), Value::Int(bi)) => ai.cmp(bi),
2132 (Value::Float(af), Value::Float(bf)) => {
2133 af.partial_cmp(bf).unwrap_or(std::cmp::Ordering::Equal)
2134 }
2135 (Value::String(as_), Value::String(bs)) => as_.cmp(bs),
2136 (Value::Bool(ab), Value::Bool(bb)) => ab.cmp(bb),
2137 _ => format!("{:?}", av).cmp(&format!("{:?}", bv)),
2139 }
2140 }
2141 }
2142}
2143
2144fn convert_aql_filter_to_event_filter(filter: &AqlFilter) -> Option<crate::pubsub::EventFilter> {
2146 use crate::pubsub::EventFilter;
2147
2148 match filter {
2149 AqlFilter::Eq(field, value) => {
2150 let db_val = aql_value_to_db_value(value).ok()?;
2151 Some(EventFilter::FieldEquals(field.clone(), db_val))
2152 }
2153 AqlFilter::Gt(field, value) => {
2154 let db_val = aql_value_to_db_value(value).ok()?;
2155 Some(EventFilter::Gt(field.clone(), db_val))
2156 }
2157 AqlFilter::Gte(field, value) => {
2158 let db_val = aql_value_to_db_value(value).ok()?;
2159 Some(EventFilter::Gte(field.clone(), db_val))
2160 }
2161 AqlFilter::Lt(field, value) => {
2162 let db_val = aql_value_to_db_value(value).ok()?;
2163 Some(EventFilter::Lt(field.clone(), db_val))
2164 }
2165 AqlFilter::Lte(field, value) => {
2166 let db_val = aql_value_to_db_value(value).ok()?;
2167 Some(EventFilter::Lte(field.clone(), db_val))
2168 }
2169 AqlFilter::Ne(field, value) => {
2170 let db_val = aql_value_to_db_value(value).ok()?;
2171 Some(EventFilter::Ne(field.clone(), db_val))
2172 }
2173 AqlFilter::In(field, value) => {
2174 let db_val = aql_value_to_db_value(value).ok()?;
2175 Some(EventFilter::In(field.clone(), db_val))
2176 }
2177 AqlFilter::NotIn(field, value) => {
2178 let db_val = aql_value_to_db_value(value).ok()?;
2179 Some(EventFilter::NotIn(field.clone(), db_val))
2180 }
2181 AqlFilter::And(filters) => {
2182 let mut event_filters = Vec::new();
2183 for f in filters {
2184 if let Some(ef) = convert_aql_filter_to_event_filter(f) {
2185 event_filters.push(ef);
2186 } else {
2187 return None; }
2189 }
2190 Some(EventFilter::And(event_filters))
2191 }
2192 AqlFilter::Or(filters) => {
2193 let mut event_filters = Vec::new();
2194 for f in filters {
2195 if let Some(ef) = convert_aql_filter_to_event_filter(f) {
2196 event_filters.push(ef);
2197 } else {
2198 return None;
2199 }
2200 }
2201 Some(EventFilter::Or(event_filters))
2202 }
2203 AqlFilter::Not(filter) => {
2204 convert_aql_filter_to_event_filter(filter).map(|f| EventFilter::Not(Box::new(f)))
2205 }
2206 AqlFilter::Contains(field, value) => {
2207 let db_val = aql_value_to_db_value(value).ok()?;
2208 Some(EventFilter::Contains(field.clone(), db_val))
2209 }
2210 AqlFilter::StartsWith(field, value) => {
2211 let db_val = aql_value_to_db_value(value).ok()?;
2212 Some(EventFilter::StartsWith(field.clone(), db_val))
2213 }
2214 AqlFilter::EndsWith(field, value) => {
2215 let db_val = aql_value_to_db_value(value).ok()?;
2216 Some(EventFilter::EndsWith(field.clone(), db_val))
2217 }
2218 AqlFilter::IsNull(field) => Some(EventFilter::IsNull(field.clone())),
2219 AqlFilter::IsNotNull(field) => Some(EventFilter::IsNotNull(field.clone())),
2220
2221 AqlFilter::Matches(_, _) => None,
2223 }
2224}
2225
2226pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
2228 let mut limit = None;
2229 let mut offset = 0;
2230
2231 for arg in args {
2232 match arg.name.as_str() {
2233 "limit" | "first" | "take" => {
2234 if let ast::Value::Int(n) = arg.value {
2235 limit = Some(n as usize);
2236 }
2237 }
2238 "offset" | "skip" => {
2239 if let ast::Value::Int(n) = arg.value {
2240 offset = n as usize;
2241 }
2242 }
2243 _ => {}
2244 }
2245 }
2246
2247 (limit, offset)
2248}
2249
2250fn extract_cursor_pagination(
2251 args: &[ast::Argument],
2252) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
2253 let mut first = None;
2254 let mut after = None;
2255 let mut last = None;
2256 let mut before = None;
2257
2258 for arg in args {
2259 match arg.name.as_str() {
2260 "first" => {
2261 if let ast::Value::Int(n) = arg.value {
2262 first = Some(n as usize);
2263 }
2264 }
2265 "after" => {
2266 if let ast::Value::String(ref s) = arg.value {
2267 after = Some(s.clone());
2268 }
2269 }
2270 "last" => {
2271 if let ast::Value::Int(n) = arg.value {
2272 last = Some(n as usize);
2273 }
2274 }
2275 "before" => {
2276 if let ast::Value::String(ref s) = arg.value {
2277 before = Some(s.clone());
2278 }
2279 }
2280 _ => {}
2281 }
2282 }
2283
2284 (first, after, last, before)
2285}
2286
2287fn encode_cursor(val: &Value) -> String {
2288 let s = match val {
2289 Value::String(s) => s.clone(),
2290 _ => String::new(),
2291 };
2292 general_purpose::STANDARD.encode(s)
2293}
2294
2295fn decode_cursor(cursor: &str) -> Result<String> {
2296 let bytes = general_purpose::STANDARD
2297 .decode(cursor)
2298 .map_err(|_| AqlError::new(ErrorCode::QueryError, "Invalid cursor".to_string()))?;
2299 String::from_utf8(bytes)
2300 .map_err(|_| AqlError::new(ErrorCode::QueryError, "Invalid cursor UTF-8".to_string()))
2301}
2302
2303fn get_doc_value_at_path<'a>(doc: &'a Document, path: &str) -> Option<&'a Value> {
2304 if let Some(val) = doc.data.get(path) {
2306 return Some(val);
2307 }
2308
2309 if !path.contains('.') {
2310 return None;
2311 }
2312
2313 let parts: Vec<&str> = path.split('.').collect();
2314 let mut current = doc.data.get(parts[0])?;
2315
2316 for &part in &parts[1..] {
2317 if let Value::Object(map) = current {
2318 current = map.get(part)?;
2319 } else {
2320 return None;
2321 }
2322 }
2323
2324 Some(current)
2325}
2326
/// Evaluates a compiled filter against a document, resolving `$variable`
/// operands from `variables`.
///
/// Missing-field semantics are asymmetric by design: positive predicates
/// (Eq, Gt, Contains, ...) are `false` when the field is absent, while
/// negative predicates (Ne, NotIn) and IsNull are `true`.
pub fn matches_filter(
    doc: &Document,
    filter: &CompiledFilter,
    variables: &HashMap<String, ast::Value>,
) -> bool {
    match filter {
        CompiledFilter::Eq(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| values_equal(v, value, variables))
            .unwrap_or(false),
        // NOTE(review): Ne uses bitwise float equality (`values_bit_equal`,
        // no Int/Float cross-type arms) while Eq uses epsilon/cross-type
        // equality (`values_equal`), so e.g. an Int(5) field can satisfy
        // BOTH Eq(5.0) and Ne(5.0). Presumably intentional (NaN handling) —
        // confirm before relying on it.
        CompiledFilter::Ne(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| !values_bit_equal(v, value, variables))
            .unwrap_or(true),
        CompiledFilter::Gt(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| value_compare(v, value, variables) == Some(std::cmp::Ordering::Greater))
            .unwrap_or(false),
        CompiledFilter::Gte(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| {
                matches!(
                    value_compare(v, value, variables),
                    Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
                )
            })
            .unwrap_or(false),
        CompiledFilter::Lt(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| value_compare(v, value, variables) == Some(std::cmp::Ordering::Less))
            .unwrap_or(false),
        CompiledFilter::Lte(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| {
                matches!(
                    value_compare(v, value, variables),
                    Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
                )
            })
            .unwrap_or(false),
        // In/NotIn require the operand to be an array literal; any other
        // operand makes In vacuously false and NotIn vacuously true.
        CompiledFilter::In(field, value) => {
            if let ast::Value::Array(arr) = value {
                get_doc_value_at_path(doc, field)
                    .map(|v| arr.iter().any(|item| values_equal(v, item, variables)))
                    .unwrap_or(false)
            } else {
                false
            }
        }
        CompiledFilter::NotIn(field, value) => {
            if let ast::Value::Array(arr) = value {
                get_doc_value_at_path(doc, field)
                    .map(|v| !arr.iter().any(|item| values_equal(v, item, variables)))
                    .unwrap_or(true)
            } else {
                true
            }
        }
        // Contains: substring test on strings; exact-element membership on
        // arrays of strings. Other shapes never match.
        CompiledFilter::Contains(field, value) => {
            match (get_doc_value_at_path(doc, field), value) {
                (Some(Value::String(doc_val)), ast::Value::String(search)) => {
                    doc_val.contains(search)
                }
                (Some(Value::Array(doc_arr)), ast::Value::String(search)) => {
                    doc_arr.iter().any(|v| {
                        if let Value::String(s) = v {
                            s == search
                        } else {
                            false
                        }
                    })
                }
                _ => false,
            }
        }
        CompiledFilter::StartsWith(field, value) => {
            if let (Some(Value::String(doc_val)), ast::Value::String(prefix)) =
                (get_doc_value_at_path(doc, field), value)
            {
                doc_val.starts_with(prefix)
            } else {
                false
            }
        }
        CompiledFilter::EndsWith(field, value) => {
            if let (Some(Value::String(doc_val)), ast::Value::String(suffix)) =
                (get_doc_value_at_path(doc, field), value)
            {
                doc_val.ends_with(suffix)
            } else {
                false
            }
        }
        // Regex was compiled up front by `compile_filter`; only string fields match.
        CompiledFilter::Matches(field, regex) => {
            if let Some(Value::String(doc_val)) = get_doc_value_at_path(doc, field) {
                regex.is_match(doc_val)
            } else {
                false
            }
        }
        // An absent field counts as null; an explicit Value::Null also does.
        CompiledFilter::IsNull(field) => get_doc_value_at_path(doc, field)
            .map(|v| matches!(v, Value::Null))
            .unwrap_or(true),
        CompiledFilter::IsNotNull(field) => get_doc_value_at_path(doc, field)
            .map(|v| !matches!(v, Value::Null))
            .unwrap_or(false),
        CompiledFilter::And(filters) => filters.iter().all(|f| matches_filter(doc, f, variables)),
        CompiledFilter::Or(filters) => filters.iter().any(|f| matches_filter(doc, f, variables)),
        CompiledFilter::Not(filter) => !matches_filter(doc, filter, variables),
    }
}
2433
2434fn values_equal(
2436 db_val: &Value,
2437 aql_val: &ast::Value,
2438 variables: &HashMap<String, ast::Value>,
2439) -> bool {
2440 let resolved = resolve_if_variable(aql_val, variables);
2441 match (db_val, resolved) {
2442 (Value::Null, ast::Value::Null) => true,
2443 (Value::Bool(a), ast::Value::Boolean(b)) => *a == *b,
2444 (Value::Int(a), ast::Value::Int(b)) => *a == *b,
2445 (Value::Float(a), ast::Value::Float(b)) => (*a - *b).abs() < f64::EPSILON,
2446 (Value::Float(a), ast::Value::Int(b)) => (*a - (*b as f64)).abs() < f64::EPSILON,
2447 (Value::Int(a), ast::Value::Float(b)) => ((*a as f64) - *b).abs() < f64::EPSILON,
2448 (Value::String(a), ast::Value::String(b)) => a == b,
2449 _ => false,
2450 }
2451}
2452
2453fn values_bit_equal(
2455 db_val: &Value,
2456 aql_val: &ast::Value,
2457 variables: &HashMap<String, ast::Value>,
2458) -> bool {
2459 let resolved = resolve_if_variable(aql_val, variables);
2460 match (db_val, resolved) {
2461 (Value::Null, ast::Value::Null) => true,
2462 (Value::Bool(a), ast::Value::Boolean(b)) => *a == *b,
2463 (Value::Int(a), ast::Value::Int(b)) => *a == *b,
2464 (Value::Float(a), ast::Value::Float(b)) => a.to_bits() == b.to_bits(),
2465 (Value::String(a), ast::Value::String(b)) => a == b,
2466 _ => false,
2468 }
2469}
2470
2471fn value_compare(
2473 db_val: &Value,
2474 aql_val: &ast::Value,
2475 variables: &HashMap<String, ast::Value>,
2476) -> Option<std::cmp::Ordering> {
2477 let resolved = resolve_if_variable(aql_val, variables);
2478 match (db_val, resolved) {
2479 (Value::Int(a), ast::Value::Int(b)) => Some(a.cmp(b)),
2480 (Value::Float(a), ast::Value::Float(b)) => a.partial_cmp(b),
2481 (Value::Float(a), ast::Value::Int(b)) => a.partial_cmp(&(*b as f64)),
2482 (Value::Int(a), ast::Value::Float(b)) => (*a as f64).partial_cmp(b),
2483 (Value::String(a), ast::Value::String(b)) => Some(a.cmp(b)),
2484 _ => None,
2485 }
2486}
2487
2488fn resolve_if_variable<'a>(
2490 val: &'a ast::Value,
2491 variables: &'a HashMap<String, ast::Value>,
2492) -> &'a ast::Value {
2493 if let ast::Value::Variable(name) = val {
2494 variables.get(name).unwrap_or(val)
2495 } else {
2496 val
2497 }
2498}
2499
2500pub fn apply_projection(mut doc: Document, fields: &[ast::Field]) -> Document {
2502 if fields.is_empty() {
2503 return doc;
2504 }
2505
2506 let mut projected_data = HashMap::new();
2507
2508 if let Some(id_val) = doc.data.get("id") {
2510 projected_data.insert("id".to_string(), id_val.clone());
2511 }
2512
2513 for field in fields {
2514 let field_name = field.alias.as_ref().unwrap_or(&field.name);
2515 let source_name = &field.name;
2516
2517 if let Some(value) = doc.data.get(source_name) {
2518 projected_data.insert(field_name.clone(), value.clone());
2519 }
2520 }
2521
2522 doc.data = projected_data;
2523 doc
2524}
2525
2526pub fn aql_value_to_db_value(val: &ast::Value) -> Result<Value> {
2528 match val {
2529 ast::Value::Null => Ok(Value::Null),
2530 ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
2531 ast::Value::Int(i) => Ok(Value::Int(*i)),
2532 ast::Value::Float(f) => Ok(Value::Float(*f)),
2533 ast::Value::String(s) => Ok(Value::String(s.clone())),
2534 ast::Value::Array(arr) => {
2535 let converted: Result<Vec<Value>> = arr.iter().map(aql_value_to_db_value).collect();
2536 Ok(Value::Array(converted?))
2537 }
2538 ast::Value::Object(map) => {
2539 let mut converted = HashMap::new();
2540 for (k, v) in map {
2541 converted.insert(k.clone(), aql_value_to_db_value(v)?);
2542 }
2543 Ok(Value::Object(converted))
2544 }
2545 ast::Value::Variable(name) => Err(AqlError::new(
2546 ErrorCode::QueryError,
2547 format!("Unresolved variable: {}", name),
2548 )),
2549 ast::Value::Enum(e) => Ok(Value::String(e.clone())),
2550 }
2551}
2552
2553fn aql_value_to_hashmap(val: &ast::Value) -> Result<HashMap<String, Value>> {
2555 match val {
2556 ast::Value::Object(map) => {
2557 let mut converted = HashMap::new();
2558 for (k, v) in map {
2559 converted.insert(k.clone(), aql_value_to_db_value(v)?);
2560 }
2561 Ok(converted)
2562 }
2563 _ => Err(AqlError::new(
2564 ErrorCode::QueryError,
2565 "Data must be an object".to_string(),
2566 )),
2567 }
2568}
2569
2570pub fn db_value_to_aql_value(val: &Value) -> ast::Value {
2572 match val {
2573 Value::Null => ast::Value::Null,
2574 Value::Bool(b) => ast::Value::Boolean(*b),
2575 Value::Int(i) => ast::Value::Int(*i),
2576 Value::Float(f) => ast::Value::Float(*f),
2577 Value::String(s) => ast::Value::String(s.clone()),
2578 Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_aql_value).collect()),
2579 Value::Object(map) => ast::Value::Object(
2580 map.iter()
2581 .map(|(k, v)| (k.clone(), db_value_to_aql_value(v)))
2582 .collect(),
2583 ),
2584 Value::Uuid(u) => ast::Value::String(u.to_string()),
2585 }
2586}
2587
2588pub fn value_to_filter(value: &ast::Value) -> Result<AqlFilter> {
2590 match value {
2591 ast::Value::Object(map) => {
2592 let mut filters = Vec::new();
2593 for (key, val) in map {
2594 match key.as_str() {
2595 "and" => {
2596 if let ast::Value::Array(arr) = val {
2597 let sub: Result<Vec<_>> = arr.iter().map(value_to_filter).collect();
2598 filters.push(AqlFilter::And(sub?));
2599 }
2600 }
2601 "or" => {
2602 if let ast::Value::Array(arr) = val {
2603 let sub: Result<Vec<_>> = arr.iter().map(value_to_filter).collect();
2604 filters.push(AqlFilter::Or(sub?));
2605 }
2606 }
2607 "not" => filters.push(AqlFilter::Not(Box::new(value_to_filter(val)?))),
2608 field => {
2609 if let ast::Value::Object(ops) = val {
2610 for (op, op_val) in ops {
2611 let f = match op.as_str() {
2612 "eq" => AqlFilter::Eq(field.to_string(), op_val.clone()),
2613 "ne" => AqlFilter::Ne(field.to_string(), op_val.clone()),
2614 "gt" => AqlFilter::Gt(field.to_string(), op_val.clone()),
2615 "gte" => AqlFilter::Gte(field.to_string(), op_val.clone()),
2616 "lt" => AqlFilter::Lt(field.to_string(), op_val.clone()),
2617 "lte" => AqlFilter::Lte(field.to_string(), op_val.clone()),
2618 "in" => AqlFilter::In(field.to_string(), op_val.clone()),
2619 "nin" => AqlFilter::NotIn(field.to_string(), op_val.clone()),
2620 "contains" => {
2621 AqlFilter::Contains(field.to_string(), op_val.clone())
2622 }
2623 "startsWith" => {
2624 AqlFilter::StartsWith(field.to_string(), op_val.clone())
2625 }
2626 "endsWith" => {
2627 AqlFilter::EndsWith(field.to_string(), op_val.clone())
2628 }
2629 "isNull" => AqlFilter::IsNull(field.to_string()),
2630 "isNotNull" => AqlFilter::IsNotNull(field.to_string()),
2631 _ => continue,
2632 };
2633 filters.push(f);
2634 }
2635 }
2636 }
2637 }
2638 }
2639 if filters.len() == 1 {
2640 Ok(filters.remove(0))
2641 } else {
2642 Ok(AqlFilter::And(filters))
2643 }
2644 }
2645 _ => Err(AqlError::new(
2646 ErrorCode::QueryError,
2647 "Filter must be an object".to_string(),
2648 )),
2649 }
2650}
2651
2652fn check_ast_filter_match(filter: &ast::Filter, doc: &Document) -> bool {
2654 match filter {
2655 ast::Filter::Eq(field, val) => check_cmp(doc, field, val, |a, b| a == b),
2656 ast::Filter::Ne(field, val) => check_cmp(doc, field, val, |a, b| a != b),
2657 ast::Filter::Gt(field, val) => check_cmp(doc, field, val, |a, b| a > b),
2658 ast::Filter::Gte(field, val) => check_cmp(doc, field, val, |a, b| a >= b),
2659 ast::Filter::Lt(field, val) => check_cmp(doc, field, val, |a, b| a < b),
2660 ast::Filter::Lte(field, val) => check_cmp(doc, field, val, |a, b| a <= b),
2661 ast::Filter::In(field, val) => {
2662 if let Ok(db_val) = aql_value_to_db_value(val) {
2663 if let Some(doc_val) = doc.data.get(field) {
2664 if let Value::Array(arr) = db_val {
2665 return arr.contains(doc_val);
2666 }
2667 }
2668 }
2669 false
2670 }
2671 ast::Filter::And(filters) => filters.iter().all(|f| check_ast_filter_match(f, doc)),
2672 ast::Filter::Or(filters) => filters.iter().any(|f| check_ast_filter_match(f, doc)),
2673 ast::Filter::Not(filter) => !check_ast_filter_match(filter, doc),
2674 _ => true, }
2676}
2677
2678fn check_cmp<F>(doc: &Document, field: &str, val: &ast::Value, op: F) -> bool
2679where
2680 F: Fn(&Value, &Value) -> bool,
2681{
2682 if let Some(doc_val) = doc.data.get(field) {
2683 if let Ok(cmp_val) = aql_value_to_db_value(val) {
2684 return op(doc_val, &cmp_val);
2685 }
2686 }
2687 false
2688}
2689
2690fn resolve_value(
2694 val: &ast::Value,
2695 variables: &HashMap<String, ast::Value>,
2696 context: &ExecutionContext,
2697) -> ast::Value {
2698 match val {
2699 ast::Value::Variable(name) => {
2700 if let Some(v) = variables.get(name) {
2701 v.clone()
2702 } else {
2703 #[cfg(debug_assertions)]
2706 eprintln!(
2707 "Warning: Variable '{}' not found in resolve_value, defaulting to Null",
2708 name
2709 );
2710
2711 ast::Value::Null
2714 }
2715 }
2716 ast::Value::String(s) if s.starts_with('$') => {
2717 match resolve_variable_path(s, context) {
2719 Some(v) => v,
2720 None => val.clone(),
2721 }
2722 }
2723 ast::Value::Array(arr) => ast::Value::Array(
2724 arr.iter()
2725 .map(|v| resolve_value(v, variables, context))
2726 .collect(),
2727 ),
2728 ast::Value::Object(map) => {
2729 let mut resolved_map = HashMap::new();
2730 for (k, v) in map {
2731 resolved_map.insert(k.clone(), resolve_value(v, variables, context));
2732 }
2733 ast::Value::Object(resolved_map)
2734 }
2735 _ => val.clone(),
2736 }
2737}
2738
2739fn resolve_variable_path(path: &str, context: &ExecutionContext) -> Option<ast::Value> {
2741 let path = path.trim_start_matches('$');
2742 let parts: Vec<&str> = path.split('.').collect();
2743
2744 if parts.is_empty() {
2745 return None;
2746 }
2747
2748 let alias = parts[0];
2750 let mut current_value = context.get(alias)?;
2751
2752 for part in &parts[1..] {
2754 match current_value {
2755 serde_json::Value::Object(map) => {
2756 current_value = map.get(*part)?;
2757 }
2758 serde_json::Value::Array(arr) => {
2759 if let Ok(idx) = part.parse::<usize>() {
2761 current_value = arr.get(idx)?;
2762 } else {
2763 return None;
2764 }
2765 }
2766 _ => return None,
2767 }
2768 }
2769
2770 Some(json_to_ast_value(current_value))
2772}
2773
2774fn json_to_ast_value(json: &serde_json::Value) -> ast::Value {
2775 match json {
2776 serde_json::Value::Null => ast::Value::Null,
2777 serde_json::Value::Bool(b) => ast::Value::Boolean(*b),
2778 serde_json::Value::Number(n) => {
2779 if let Some(i) = n.as_i64() {
2780 ast::Value::Int(i)
2781 } else if let Some(f) = n.as_f64() {
2782 ast::Value::Float(f)
2783 } else {
2784 ast::Value::Null }
2786 }
2787 serde_json::Value::String(s) => ast::Value::String(s.clone()),
2788 serde_json::Value::Array(arr) => {
2789 ast::Value::Array(arr.iter().map(json_to_ast_value).collect())
2790 }
2791 serde_json::Value::Object(map) => {
2792 let mut new_map = HashMap::new();
2793 for (k, v) in map {
2794 new_map.insert(k.clone(), json_to_ast_value(v));
2795 }
2796 ast::Value::Object(new_map)
2797 }
2798 }
2799}
2800
2801fn aurora_value_to_json_value(v: &Value) -> JsonValue {
2802 match v {
2803 Value::Null => JsonValue::Null,
2804 Value::String(s) => JsonValue::String(s.clone()),
2805 Value::Int(i) => JsonValue::Number((*i).into()),
2806 Value::Float(f) => {
2807 if let Some(n) = serde_json::Number::from_f64(*f) {
2808 JsonValue::Number(n)
2809 } else {
2810 JsonValue::Null
2811 }
2812 }
2813 Value::Bool(b) => JsonValue::Bool(*b),
2814 Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
2815 Value::Object(map) => {
2816 let mut json_map = serde_json::Map::new();
2817 for (k, v) in map {
2818 json_map.insert(k.clone(), aurora_value_to_json_value(v));
2819 }
2820 JsonValue::Object(json_map)
2821 }
2822 Value::Uuid(u) => JsonValue::String(u.to_string()),
2823 }
2824}
2825
2826#[cfg(test)]
2827mod tests {
2828 use super::*;
2829
2830 #[test]
2831 fn test_aql_value_conversion() {
2832 let aql_val = ast::Value::Object({
2833 let mut map = HashMap::new();
2834 map.insert("name".to_string(), ast::Value::String("John".to_string()));
2835 map.insert("age".to_string(), ast::Value::Int(30));
2836 map
2837 });
2838
2839 let db_val = aql_value_to_db_value(&aql_val).unwrap();
2840 if let Value::Object(map) = db_val {
2841 assert_eq!(map.get("name"), Some(&Value::String("John".to_string())));
2842 assert_eq!(map.get("age"), Some(&Value::Int(30)));
2843 } else {
2844 panic!("Expected Object");
2845 }
2846 }
2847
2848 #[test]
2849 fn test_matches_filter_eq() {
2850 let mut doc = Document::new();
2851 doc.data
2852 .insert("name".to_string(), Value::String("Alice".to_string()));
2853 doc.data.insert("age".to_string(), Value::Int(25));
2854
2855 let filter = AqlFilter::Eq("name".to_string(), ast::Value::String("Alice".to_string()));
2856 let compiled = compile_filter(&filter).unwrap();
2857 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2858
2859 let filter = AqlFilter::Eq("name".to_string(), ast::Value::String("Bob".to_string()));
2860 let compiled = compile_filter(&filter).unwrap();
2861 assert!(!matches_filter(&doc, &compiled, &HashMap::new()));
2862 }
2863
2864 #[test]
2865 fn test_matches_filter_comparison() {
2866 let mut doc = Document::new();
2867 doc.data.insert("age".to_string(), Value::Int(25));
2868
2869 let filter = AqlFilter::Gt("age".to_string(), ast::Value::Int(20));
2870 let compiled = compile_filter(&filter).unwrap();
2871 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2872
2873 let filter = AqlFilter::Gt("age".to_string(), ast::Value::Int(30));
2874 let compiled = compile_filter(&filter).unwrap();
2875 assert!(!matches_filter(&doc, &compiled, &HashMap::new()));
2876
2877 let filter = AqlFilter::Gte("age".to_string(), ast::Value::Int(25));
2878 let compiled = compile_filter(&filter).unwrap();
2879 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2880
2881 let filter = AqlFilter::Lt("age".to_string(), ast::Value::Int(30));
2882 let compiled = compile_filter(&filter).unwrap();
2883 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2884 }
2885
2886 #[test]
2887 fn test_matches_filter_and_or() {
2888 let mut doc = Document::new();
2889 doc.data
2890 .insert("name".to_string(), Value::String("Alice".to_string()));
2891 doc.data.insert("age".to_string(), Value::Int(25));
2892
2893 let filter = AqlFilter::And(vec![
2894 AqlFilter::Eq("name".to_string(), ast::Value::String("Alice".to_string())),
2895 AqlFilter::Gte("age".to_string(), ast::Value::Int(18)),
2896 ]);
2897 let compiled = compile_filter(&filter).unwrap();
2898 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2899
2900 let filter = AqlFilter::Or(vec![
2901 AqlFilter::Eq("name".to_string(), ast::Value::String("Bob".to_string())),
2902 AqlFilter::Gte("age".to_string(), ast::Value::Int(18)),
2903 ]);
2904 let compiled = compile_filter(&filter).unwrap();
2905 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2906 }
2907
2908 #[test]
2909 fn test_matches_filter_string_ops() {
2910 let mut doc = Document::new();
2911 doc.data.insert(
2912 "email".to_string(),
2913 Value::String("alice@example.com".to_string()),
2914 );
2915
2916 let filter = AqlFilter::Contains(
2917 "email".to_string(),
2918 ast::Value::String("example".to_string()),
2919 );
2920 let compiled = compile_filter(&filter).unwrap();
2921 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2922
2923 let filter =
2924 AqlFilter::StartsWith("email".to_string(), ast::Value::String("alice".to_string()));
2925 let compiled = compile_filter(&filter).unwrap();
2926 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2927
2928 let filter =
2929 AqlFilter::EndsWith("email".to_string(), ast::Value::String(".com".to_string()));
2930 let compiled = compile_filter(&filter).unwrap();
2931 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2932 }
2933
2934 #[test]
2935 fn test_matches_filter_in() {
2936 let mut doc = Document::new();
2937 doc.data
2938 .insert("status".to_string(), Value::String("active".to_string()));
2939
2940 let filter = AqlFilter::In(
2941 "status".to_string(),
2942 ast::Value::Array(vec![
2943 ast::Value::String("active".to_string()),
2944 ast::Value::String("pending".to_string()),
2945 ]),
2946 );
2947 let compiled = compile_filter(&filter).unwrap();
2948 assert!(matches_filter(&doc, &compiled, &HashMap::new()));
2949
2950 let filter = AqlFilter::In(
2951 "status".to_string(),
2952 ast::Value::Array(vec![ast::Value::String("inactive".to_string())]),
2953 );
2954 let compiled = compile_filter(&filter).unwrap();
2955 assert!(!matches_filter(&doc, &compiled, &HashMap::new()));
2956 }
2957
2958 #[test]
2959 fn test_apply_projection() {
2960 let mut doc = Document::new();
2961 doc.data
2962 .insert("id".to_string(), Value::String("123".to_string()));
2963 doc.data
2964 .insert("name".to_string(), Value::String("Alice".to_string()));
2965 doc.data.insert(
2966 "email".to_string(),
2967 Value::String("alice@example.com".to_string()),
2968 );
2969 doc.data
2970 .insert("password".to_string(), Value::String("secret".to_string()));
2971
2972 let fields = vec![
2973 ast::Field {
2974 alias: None,
2975 name: "id".to_string(),
2976 arguments: vec![],
2977 directives: vec![],
2978 selection_set: vec![],
2979 },
2980 ast::Field {
2981 alias: None,
2982 name: "name".to_string(),
2983 arguments: vec![],
2984 directives: vec![],
2985 selection_set: vec![],
2986 },
2987 ];
2988
2989 let projected = apply_projection(doc, &fields);
2990 assert_eq!(projected.data.len(), 2);
2991 assert!(projected.data.contains_key("id"));
2992 assert!(projected.data.contains_key("name"));
2993 assert!(!projected.data.contains_key("email"));
2994 assert!(!projected.data.contains_key("password"));
2995 }
2996
2997 #[test]
2998 fn test_apply_projection_with_alias() {
2999 let mut doc = Document::new();
3000 doc.data
3001 .insert("first_name".to_string(), Value::String("Alice".to_string()));
3002
3003 let fields = vec![ast::Field {
3004 alias: Some("name".to_string()),
3005 name: "first_name".to_string(),
3006 arguments: vec![],
3007 directives: vec![],
3008 selection_set: vec![],
3009 }];
3010
3011 let projected = apply_projection(doc, &fields);
3012 assert!(projected.data.contains_key("name"));
3013 assert!(!projected.data.contains_key("first_name"));
3014 }
3015
3016 #[test]
3017 fn test_extract_pagination() {
3018 let args = vec![
3019 ast::Argument {
3020 name: "limit".to_string(),
3021 value: ast::Value::Int(10),
3022 },
3023 ast::Argument {
3024 name: "offset".to_string(),
3025 value: ast::Value::Int(20),
3026 },
3027 ];
3028
3029 let (limit, offset) = extract_pagination(&args);
3030 assert_eq!(limit, Some(10));
3031 assert_eq!(offset, 20);
3032 }
3033
3034 #[test]
3035 fn test_matches_filter_with_variables() {
3036 let mut doc = Document::new();
3037 doc.data.insert("age".to_string(), Value::Int(25));
3038
3039 let mut variables = HashMap::new();
3040 variables.insert("minAge".to_string(), ast::Value::Int(18));
3041
3042 let filter = AqlFilter::Gte(
3043 "age".to_string(),
3044 ast::Value::Variable("minAge".to_string()),
3045 );
3046 let compiled = compile_filter(&filter).unwrap();
3047 assert!(matches_filter(&doc, &compiled, &variables));
3048 }
3049
3050 #[test]
3051 fn test_values_bit_equal() {
3052 let vars = HashMap::new();
3053
3054 let f1 = Value::Float(1.0);
3056 let f1_copy = ast::Value::Float(1.0);
3057 assert!(values_bit_equal(&f1, &f1_copy, &vars));
3058
3059 let f_zero = Value::Float(0.0);
3061 let f_neg_zero = ast::Value::Float(-0.0);
3062 assert!(!values_bit_equal(&f_zero, &f_neg_zero, &vars));
3063
3064 let nan = f64::NAN;
3067 let v_nan = Value::Float(nan);
3068 let a_nan = ast::Value::Float(nan);
3069 assert!(values_bit_equal(&v_nan, &a_nan, &vars));
3071
3072 let i1 = Value::Int(1);
3074 let f1_as_val = ast::Value::Float(1.0);
3075 assert!(!values_bit_equal(&i1, &f1_as_val, &vars));
3076 }
3077
3078 #[test]
3079 fn test_ne_filter_strictness() {
3080 let mut doc = Document::new();
3081 doc.data.insert("val".to_string(), Value::Float(0.0));
3082 doc.data.insert("intVal".to_string(), Value::Int(1));
3083
3084 let vars = HashMap::new();
3085
3086 let filter = AqlFilter::Ne("val".to_string(), ast::Value::Float(0.0));
3088 let compiled = compile_filter(&filter).unwrap();
3089 assert!(!matches_filter(&doc, &compiled, &vars));
3090
3091 let filter = AqlFilter::Ne("val".to_string(), ast::Value::Float(-0.0));
3093 let compiled = compile_filter(&filter).unwrap();
3094 assert!(matches_filter(&doc, &compiled, &vars));
3095
3096 let filter = AqlFilter::Ne("intVal".to_string(), ast::Value::Float(1.0));
3098 let compiled = compile_filter(&filter).unwrap();
3099 assert!(matches_filter(&doc, &compiled, &vars));
3100 }
3101
3102 #[tokio::test]
3103 async fn test_executor_integration() {
3104 use crate::Aurora;
3105 use tempfile::TempDir;
3106
3107 let temp_dir = TempDir::new().unwrap();
3109 let db_path = temp_dir.path().join("test.db");
3110 let config = crate::AuroraConfig {
3111 db_path,
3112 enable_write_buffering: false,
3113 durability_mode: crate::DurabilityMode::Synchronous,
3114 ..Default::default()
3115 };
3116 let db = Aurora::with_config(config).unwrap();
3117
3118 db.new_collection(
3120 "users",
3121 vec![
3122 ("name".to_string(), crate::FieldType::String, false),
3123 ("age".to_string(), crate::FieldType::Int, false),
3124 ("active".to_string(), crate::FieldType::Bool, false),
3125 ],
3126 )
3127 .await
3128 .unwrap();
3129
3130 let insert_query = r#"
3132 mutation {
3133 insertInto(collection: "users", data: {
3134 name: "Alice",
3135 age: 30,
3136 active: true
3137 }) {
3138 id
3139 name
3140 }
3141 }
3142 "#;
3143
3144 let result = execute(&db, insert_query, ExecutionOptions::new())
3145 .await
3146 .unwrap();
3147 match result {
3148 ExecutionResult::Mutation(res) => {
3149 assert_eq!(res.affected_count, 1);
3150 assert_eq!(res.returned_documents.len(), 1);
3151 assert_eq!(
3152 res.returned_documents[0].data.get("name"),
3153 Some(&Value::String("Alice".to_string()))
3154 );
3155 }
3156 _ => panic!("Expected mutation result"),
3157 }
3158
3159 let query = r#"
3161 query {
3162 users {
3163 name
3164 age
3165 }
3166 }
3167 "#;
3168
3169 let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
3170 match result {
3171 ExecutionResult::Query(res) => {
3172 assert_eq!(res.documents.len(), 1);
3173 assert_eq!(
3174 res.documents[0].data.get("name"),
3175 Some(&Value::String("Alice".to_string()))
3176 );
3177 assert_eq!(res.documents[0].data.get("age"), Some(&Value::Int(30)));
3178 }
3179 _ => panic!("Expected query result"),
3180 }
3181
3182 let delete_query = r#"
3184 mutation {
3185 deleteFrom(collection: "users", filter: { name: { eq: "Alice" } }) {
3186 id
3187 }
3188 }
3189 "#;
3190
3191 let result = execute(&db, delete_query, ExecutionOptions::new())
3192 .await
3193 .unwrap();
3194 match result {
3195 ExecutionResult::Mutation(res) => {
3196 assert_eq!(res.affected_count, 1);
3197 }
3198 _ => panic!("Expected mutation result"),
3199 }
3200
3201 let query = r#"
3203 query {
3204 users {
3205 name
3206 }
3207 }
3208 "#;
3209
3210 let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
3211 match result {
3212 ExecutionResult::Query(res) => {
3213 assert_eq!(res.documents.len(), 0);
3214 }
3215 _ => panic!("Expected query result"),
3216 }
3217 }
3218
3219 #[tokio::test]
3220 async fn test_lookup_cross_collection_join() {
3221 assert!(db_values_equal(
3225 &Value::String("user1".to_string()),
3226 &Value::String("user1".to_string())
3227 ));
3228
3229 assert!(!db_values_equal(
3231 &Value::String("user1".to_string()),
3232 &Value::String("user2".to_string())
3233 ));
3234
3235 assert!(db_values_equal(&Value::Int(42), &Value::Int(42)));
3237
3238 assert!(db_values_equal(
3240 &Value::String("123".to_string()),
3241 &Value::Int(123)
3242 ));
3243
3244 assert!(db_values_equal(&Value::Null, &Value::Null));
3246
3247 assert!(db_values_equal(&Value::Bool(true), &Value::Bool(true)));
3249 assert!(!db_values_equal(&Value::Bool(true), &Value::Bool(false)));
3250 }
3251
3252 #[test]
3257 fn test_order_by_extraction_and_sorting() {
3258 let args = vec![ast::Argument {
3260 name: "orderBy".to_string(),
3261 value: ast::Value::String("name".to_string()),
3262 }];
3263 let orderings = extract_order_by(&args);
3264 assert_eq!(orderings.len(), 1);
3265 assert_eq!(orderings[0].field, "name");
3266 assert_eq!(orderings[0].direction, ast::SortDirection::Asc);
3267
3268 let mut order_map = HashMap::new();
3270 order_map.insert("field".to_string(), ast::Value::String("age".to_string()));
3271 order_map.insert(
3272 "direction".to_string(),
3273 ast::Value::Enum("DESC".to_string()),
3274 );
3275 let args = vec![ast::Argument {
3276 name: "orderBy".to_string(),
3277 value: ast::Value::Object(order_map),
3278 }];
3279 let orderings = extract_order_by(&args);
3280 assert_eq!(orderings.len(), 1);
3281 assert_eq!(orderings[0].field, "age");
3282 assert_eq!(orderings[0].direction, ast::SortDirection::Desc);
3283
3284 let mut docs = vec![
3286 Document {
3287 id: "1".to_string(),
3288 data: {
3289 let mut m = HashMap::new();
3290 m.insert("name".to_string(), Value::String("Charlie".to_string()));
3291 m
3292 },
3293 },
3294 Document {
3295 id: "2".to_string(),
3296 data: {
3297 let mut m = HashMap::new();
3298 m.insert("name".to_string(), Value::String("Alice".to_string()));
3299 m
3300 },
3301 },
3302 Document {
3303 id: "3".to_string(),
3304 data: {
3305 let mut m = HashMap::new();
3306 m.insert("name".to_string(), Value::String("Bob".to_string()));
3307 m
3308 },
3309 },
3310 ];
3311
3312 let orderings = vec![ast::Ordering {
3313 field: "name".to_string(),
3314 direction: ast::SortDirection::Asc,
3315 }];
3316 apply_ordering(&mut docs, &orderings);
3317
3318 assert_eq!(
3319 docs[0].data.get("name"),
3320 Some(&Value::String("Alice".to_string()))
3321 );
3322 assert_eq!(
3323 docs[1].data.get("name"),
3324 Some(&Value::String("Bob".to_string()))
3325 );
3326 assert_eq!(
3327 docs[2].data.get("name"),
3328 Some(&Value::String("Charlie".to_string()))
3329 );
3330 }
3331
3332 #[test]
3333 fn test_validation() {
3334 let doc = Document {
3335 id: "1".to_string(),
3336 data: {
3337 let mut m = HashMap::new();
3338 m.insert("email".to_string(), Value::String("invalid".to_string()));
3339 m.insert("age".to_string(), Value::Int(15));
3340 m.insert("name".to_string(), Value::String("Ab".to_string()));
3341 m
3342 },
3343 };
3344
3345 let rules = vec![
3346 ast::ValidationRule {
3347 field: "email".to_string(),
3348 constraints: vec![ast::ValidationConstraint::Format("email".to_string())],
3349 },
3350 ast::ValidationRule {
3351 field: "age".to_string(),
3352 constraints: vec![ast::ValidationConstraint::Min(18.0)],
3353 },
3354 ast::ValidationRule {
3355 field: "name".to_string(),
3356 constraints: vec![ast::ValidationConstraint::MinLength(3)],
3357 },
3358 ];
3359
3360 let errors = validate_document(&doc, &rules).unwrap();
3361 assert_eq!(errors.len(), 3);
3362 assert!(errors.iter().any(|e| e.contains("email")));
3363 assert!(errors.iter().any(|e| e.contains("age")));
3364 assert!(errors.iter().any(|e| e.contains("name")));
3365 }
3366
3367 #[test]
3368 fn test_downsample() {
3369 let docs = vec![
3370 Document {
3371 id: "1".to_string(),
3372 data: {
3373 let mut m = HashMap::new();
3374 m.insert("timestamp".to_string(), Value::Int(0));
3375 m.insert("value".to_string(), Value::Float(10.0));
3376 m
3377 },
3378 },
3379 Document {
3380 id: "2".to_string(),
3381 data: {
3382 let mut m = HashMap::new();
3383 m.insert("timestamp".to_string(), Value::Int(30));
3384 m.insert("value".to_string(), Value::Float(20.0));
3385 m
3386 },
3387 },
3388 Document {
3389 id: "3".to_string(),
3390 data: {
3391 let mut m = HashMap::new();
3392 m.insert("timestamp".to_string(), Value::Int(120));
3393 m.insert("value".to_string(), Value::Float(30.0));
3394 m
3395 },
3396 },
3397 ];
3398
3399 let result = execute_downsample(&docs, "1m", "avg", "timestamp", "value").unwrap();
3401
3402 assert_eq!(result.len(), 2);
3404 }
3405
3406 #[test]
3407 fn test_window_function() {
3408 let docs = vec![
3409 Document {
3410 id: "1".to_string(),
3411 data: {
3412 let mut m = HashMap::new();
3413 m.insert("value".to_string(), Value::Float(10.0));
3414 m
3415 },
3416 },
3417 Document {
3418 id: "2".to_string(),
3419 data: {
3420 let mut m = HashMap::new();
3421 m.insert("value".to_string(), Value::Float(20.0));
3422 m
3423 },
3424 },
3425 Document {
3426 id: "3".to_string(),
3427 data: {
3428 let mut m = HashMap::new();
3429 m.insert("value".to_string(), Value::Float(30.0));
3430 m
3431 },
3432 },
3433 ];
3434
3435 let result = execute_window_function(&docs, "value", "avg", 2).unwrap();
3437 assert_eq!(result.len(), 3);
3438
3439 assert_eq!(result[0].data.get("avg_window"), Some(&Value::Float(10.0)));
3441 assert_eq!(result[1].data.get("avg_window"), Some(&Value::Float(15.0)));
3443 assert_eq!(result[2].data.get("avg_window"), Some(&Value::Float(25.0)));
3445 }
3446
3447 #[tokio::test]
3448 async fn test_lookup_integration_with_schema() {
3449 use crate::Aurora;
3450 use crate::AuroraConfig;
3451 use tempfile::TempDir;
3452
3453 let temp_dir = TempDir::new().unwrap();
3454 let db_path = temp_dir.path().join("lookup_test.db");
3455
3456 let config = AuroraConfig {
3457 db_path,
3458 enable_write_buffering: false,
3459 durability_mode: crate::DurabilityMode::Synchronous,
3460 ..Default::default()
3461 };
3462 let db = Aurora::with_config(config).unwrap();
3463
3464 let define_users = r#"
3466 schema {
3467 define collection users if not exists {
3468 userId: String
3469 name: String
3470 }
3471 }
3472 "#;
3473 execute(&db, define_users, ExecutionOptions::new())
3474 .await
3475 .unwrap();
3476
3477 let define_orders = r#"
3479 schema {
3480 define collection orders if not exists {
3481 orderId: String
3482 userId: String
3483 total: Int
3484 }
3485 }
3486 "#;
3487 execute(&db, define_orders, ExecutionOptions::new())
3488 .await
3489 .unwrap();
3490
3491 let insert_user = r#"
3493 mutation {
3494 insertInto(collection: "users", data: {
3495 userId: "user1",
3496 name: "Alice"
3497 }) { id userId name }
3498 }
3499 "#;
3500 let user_result = execute(&db, insert_user, ExecutionOptions::new())
3501 .await
3502 .unwrap();
3503
3504 let user_doc = match user_result {
3505 ExecutionResult::Mutation(res) => {
3506 assert_eq!(res.affected_count, 1);
3507 res.returned_documents[0].clone()
3508 }
3509 _ => panic!("Expected mutation result"),
3510 };
3511
3512 let insert_order1 = r#"
3514 mutation {
3515 insertInto(collection: "orders", data: {
3516 orderId: "order1",
3517 userId: "user1",
3518 total: 100
3519 }) { id }
3520 }
3521 "#;
3522 execute(&db, insert_order1, ExecutionOptions::new())
3523 .await
3524 .unwrap();
3525
3526 let insert_order2 = r#"
3527 mutation {
3528 insertInto(collection: "orders", data: {
3529 orderId: "order2",
3530 userId: "user1",
3531 total: 250
3532 }) { id }
3533 }
3534 "#;
3535 execute(&db, insert_order2, ExecutionOptions::new())
3536 .await
3537 .unwrap();
3538
3539 let query = r#"query { orders { orderId userId total } }"#;
3541 let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
3542 match result {
3543 ExecutionResult::Query(res) => {
3544 assert_eq!(res.documents.len(), 2, "Should have 2 orders");
3545 }
3546 _ => panic!("Expected query result"),
3547 }
3548
3549 let lookup = ast::LookupSelection {
3551 collection: "orders".to_string(),
3552 local_field: "userId".to_string(),
3553 foreign_field: "userId".to_string(),
3554 filter: None,
3555 selection_set: vec![],
3556 };
3557
3558 let lookup_result =
3559 execute_lookup(&db, &user_doc, &lookup, &HashMap::new(), &HashMap::new())
3560 .await
3561 .unwrap();
3562 if let Value::Array(found_orders) = lookup_result {
3563 assert_eq!(found_orders.len(), 2, "Should find 2 orders for user1");
3564 } else {
3565 panic!("Expected array result from lookup");
3566 }
3567 }
3568
3569 #[tokio::test]
3570 async fn test_sdl_integration() {
3571 use crate::Aurora;
3572 use crate::AuroraConfig;
3573 use tempfile::TempDir;
3574
3575 let temp_dir = TempDir::new().unwrap();
3577 let db_path = temp_dir.path().join("test_sdl.db");
3578
3579 let config = AuroraConfig {
3580 db_path,
3581 enable_write_buffering: false,
3582 durability_mode: crate::DurabilityMode::Synchronous,
3583 ..Default::default()
3584 };
3585 let db = Aurora::with_config(config).unwrap();
3586
3587 let define_schema = r#"
3589 schema {
3590 define collection products if not exists {
3591 name: String @unique
3592 price: Float @indexed
3593 category: String
3594 }
3595 }
3596 "#;
3597
3598 let result = execute(&db, define_schema, ExecutionOptions::new())
3599 .await
3600 .unwrap();
3601 match result {
3602 ExecutionResult::Schema(res) => {
3603 assert_eq!(res.status, "created");
3604 assert_eq!(res.collection, "products");
3605 }
3606 _ => panic!("Expected schema result"),
3607 }
3608
3609 let result = execute(&db, define_schema, ExecutionOptions::new())
3611 .await
3612 .unwrap();
3613 match result {
3614 ExecutionResult::Schema(res) => {
3615 assert!(
3618 res.status == "skipped (exists)" || res.status == "created",
3619 "Unexpected status: {}",
3620 res.status
3621 );
3622 }
3623 _ => panic!("Expected schema result for duplicate"),
3624 }
3625
3626 let alter_schema = r#"
3628 schema {
3629 alter collection products {
3630 add stock: Int @indexed
3631 }
3632 }
3633 "#;
3634
3635 let result = execute(&db, alter_schema, ExecutionOptions::new())
3636 .await
3637 .unwrap();
3638 match result {
3639 ExecutionResult::Schema(res) => {
3640 assert_eq!(res.status, "modified");
3641 }
3642 _ => panic!("Expected schema result for alter"),
3643 }
3644
3645 let rename_schema = r#"
3647 schema {
3648 alter collection products {
3649 rename category to cat
3650 }
3651 }
3652 "#;
3653 let result = execute(&db, rename_schema, ExecutionOptions::new())
3654 .await
3655 .unwrap();
3656 match result {
3657 ExecutionResult::Schema(res) => {
3658 assert_eq!(res.status, "modified");
3659 }
3660 _ => panic!("Expected schema result for rename"),
3661 }
3662
3663 let modify_schema = r#"
3665 schema {
3666 alter collection products {
3667 modify price: Float
3668 }
3669 }
3670 "#;
3671 let result = execute(&db, modify_schema, ExecutionOptions::new())
3672 .await
3673 .unwrap();
3674 match result {
3675 ExecutionResult::Schema(res) => {
3676 assert_eq!(res.status, "modified");
3677 }
3678 _ => panic!("Expected schema result for modify"),
3679 }
3680
3681 let migration = r#"
3683 migrate {
3684 "v1": {
3685 alter collection products {
3686 add description: String
3687 }
3688 }
3689 }
3690 "#;
3691
3692 let result = execute(&db, migration, ExecutionOptions::new())
3693 .await
3694 .unwrap();
3695 match result {
3696 ExecutionResult::Migration(res) => {
3697 assert_eq!(res.steps_applied, 1);
3698 }
3699 _ => panic!("Expected migration result"),
3700 }
3701
3702 let result = execute(&db, migration, ExecutionOptions::new())
3704 .await
3705 .unwrap();
3706 match result {
3707 ExecutionResult::Migration(res) => {
3708 assert_eq!(
3710 res.steps_applied, 0,
3711 "Migration should be idempotent - version already applied"
3712 );
3713 }
3714 _ => panic!("Expected Migration result for idempotency check"),
3715 }
3716
3717 let drop_schema = r#"
3719 schema {
3720 drop collection products
3721 }
3722 "#;
3723
3724 let result = execute(&db, drop_schema, ExecutionOptions::new())
3725 .await
3726 .unwrap();
3727 match result {
3728 ExecutionResult::Schema(res) => {
3729 assert_eq!(res.status, "dropped");
3730 }
3731 _ => panic!("Expected schema result for drop"),
3732 }
3733 }
3734
3735 #[tokio::test]
3736 async fn test_dynamic_variable_resolution() {
3737 use crate::Aurora;
3738 use tempfile::TempDir;
3739
3740 let temp_dir = TempDir::new().unwrap();
3742 let db_path = temp_dir.path().join("test_dynamic.db");
3745
3746 let config = crate::AuroraConfig {
3748 db_path,
3749 enable_write_buffering: false,
3750 durability_mode: crate::DurabilityMode::Synchronous,
3751 ..Default::default()
3752 };
3753 let db = Aurora::with_config(config).unwrap();
3754
3755 db.new_collection(
3757 "users",
3758 vec![
3759 ("name", crate::FieldType::String, false),
3760 ("profile", crate::FieldType::Any, false),
3761 ],
3762 )
3763 .await
3764 .unwrap();
3765
3766 db.new_collection(
3767 "orders",
3768 vec![
3769 ("user_id", crate::FieldType::String, false),
3770 ("theme", crate::FieldType::String, false),
3771 ],
3772 )
3773 .await
3774 .unwrap();
3775
3776 db.new_collection(
3777 "user_settings",
3778 vec![
3779 ("user_id", crate::FieldType::String, false),
3780 ("theme", crate::FieldType::String, false),
3781 ],
3782 )
3783 .await
3784 .unwrap();
3785
3786 let mutation = r#"
3789 mutation DynamicFlow {
3790 user: insertInto(collection: "users", data: {
3791 name: "John",
3792 profile: { settings: { theme: "dark" } }
3793 }) {
3794 id
3795 name
3796 profile
3797 }
3798
3799 order: insertInto(collection: "orders", data: {
3800 user_id: "$user.id",
3801 theme: "$user.profile.settings.theme"
3802 }) {
3803 id
3804 user_id
3805 theme
3806 }
3807
3808 job: enqueueJob(
3809 jobType: "send_email",
3810 payload: {
3811 orderId: "$order.id",
3812 userId: "$order.user_id",
3813 theme: "$order.theme"
3814 }
3815 )
3816 }
3817 "#;
3818
3819 let result = execute(&db, mutation, ExecutionOptions::new())
3820 .await
3821 .unwrap();
3822
3823 match result {
3824 ExecutionResult::Mutation(_res) => {
3825 panic!("Expected Batch result for multi-op mutation, got Mutation");
3827 }
3828 ExecutionResult::Batch(results) => {
3829 assert_eq!(results.len(), 3);
3830
3831 let users = db.aql_get_all_collection("users").await.unwrap();
3834 assert_eq!(users.len(), 1);
3835 let user_id = &users[0].id;
3836
3837 let orders = db.aql_get_all_collection("orders").await.unwrap();
3839 assert_eq!(orders.len(), 1);
3840
3841 let order_doc = &orders[0];
3843 assert_eq!(
3844 order_doc.data.get("user_id"),
3845 Some(&Value::String(user_id.clone()))
3846 );
3847 assert_eq!(
3848 order_doc.data.get("theme"),
3849 Some(&Value::String("dark".to_string()))
3850 );
3851 }
3852 _ => panic!("Expected Batch result"),
3853 }
3854 }
3855}