use super::ast::{self, Filter as AqlFilter, MutationOp, Operation};

use crate::Aurora;
use crate::error::{AqlError, ErrorCode, Result};
use crate::types::{Document, Value};
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::HashMap;

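/// Execution context shared between chained mutation operations: maps a mutation's
/// alias to the JSON representation of the document that operation produced.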
pub type ExecutionContext = HashMap<String, JsonValue>;

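/// The outcome of executing a single AQL operation, or a batch of them.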
#[derive(Debug)]
pub enum ExecutionResult {
    Query(QueryResult),
    Mutation(MutationResult),
    Subscription(SubscriptionResult),
    Batch(Vec<ExecutionResult>),
    Schema(SchemaResult),
    Migration(MigrationResult),
}

#[derive(Debug, Clone)]
pub struct SchemaResult {
    pub operation: String,
    pub collection: String,
    pub status: String,
}

#[derive(Debug, Clone)]
pub struct MigrationResult {
    pub version: String,
    pub steps_applied: usize,
    pub status: String,
}

#[derive(Debug, Clone, Serialize)]
pub struct ExecutionPlan {
    pub operations: Vec<String>,
    pub estimated_cost: f64,
}

#[derive(Debug, Clone)]
pub struct QueryResult {
    pub collection: String,
    pub documents: Vec<Document>,
    pub total_count: Option<usize>,
}

#[derive(Debug, Clone)]
pub struct MutationResult {
    pub operation: String,
    pub collection: String,
    pub affected_count: usize,
    pub returned_documents: Vec<Document>,
}

#[derive(Debug)]
pub struct SubscriptionResult {
    pub subscription_id: String,
    pub collection: String,
    pub stream: Option<crate::pubsub::ChangeListener>,
}

#[derive(Debug, Clone, Default)]
pub struct ExecutionOptions {
    pub skip_validation: bool,
    pub apply_projections: bool,
    pub variables: HashMap<String, JsonValue>,
}

impl ExecutionOptions {
    pub fn new() -> Self {
        Self {
            skip_validation: false,
            apply_projections: true,
            variables: HashMap::new(),
        }
    }

    pub fn with_variables(mut self, vars: HashMap<String, JsonValue>) -> Self {
        self.variables = vars;
        self
    }

    pub fn skip_validation(mut self) -> Self {
        self.skip_validation = true;
        self
    }
}

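/// Parses an AQL string (substituting the provided variables) and executes the resulting
/// document against the database.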
pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
    let variables = serde_json::Value::Object(options.variables.clone().into_iter().collect());
    let doc = super::parse_with_variables(aql, variables)?;

    execute_document(db, &doc, &options).await
}

pub async fn execute_document(
    db: &Aurora,
    doc: &ast::Document,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    if doc.operations.is_empty() {
        return Err(AqlError::new(
            ErrorCode::QueryError,
            "No operations in document".to_string(),
        ));
    }

    if doc.operations.len() == 1 {
        execute_operation(db, &doc.operations[0], options).await
    } else {
        let mut results = Vec::new();
        for op in &doc.operations {
            results.push(execute_operation(db, op, options).await?);
        }
        Ok(ExecutionResult::Batch(results))
    }
}

async fn execute_operation(
    db: &Aurora,
    op: &Operation,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    match op {
        Operation::Query(query) => execute_query(db, query, options).await,
        Operation::Mutation(mutation) => execute_mutation(db, mutation, options).await,
        Operation::Subscription(sub) => execute_subscription(db, sub, options).await,
        Operation::Schema(schema) => execute_schema(db, schema, options).await,
        Operation::Migration(migration) => execute_migration(db, migration, options).await,
        Operation::FragmentDefinition(_) => {
            Ok(ExecutionResult::Query(QueryResult {
                collection: "__fragment".to_string(),
                documents: vec![],
                total_count: Some(0),
            }))
        }
        Operation::Introspection(intro) => execute_introspection(db, intro).await,
        Operation::Handler(_) => {
            Ok(ExecutionResult::Query(QueryResult {
                collection: "__handler".to_string(),
                documents: vec![],
                total_count: Some(0),
            }))
        }
    }
}

async fn execute_query(
    db: &Aurora,
    query: &ast::Query,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    if query.selection_set.is_empty() {
        return Ok(ExecutionResult::Query(QueryResult {
            collection: String::new(),
            documents: vec![],
            total_count: Some(0),
        }));
    }

    let mut results = Vec::new();
    for field in &query.selection_set {
        let result = execute_collection_query(db, field, &query.variables_values, options).await?;
        results.push(result);
    }

    if results.len() == 1 {
        Ok(ExecutionResult::Query(results.remove(0)))
    } else {
        Ok(ExecutionResult::Batch(
            results.into_iter().map(ExecutionResult::Query).collect(),
        ))
    }
}

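/// Executes one collection selection: applies filtering, ordering, offset/limit or
/// cursor-based (connection-style) pagination, then grouping, aggregation, or field
/// projection depending on the selection set.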
async fn execute_collection_query(
    db: &Aurora,
    field: &ast::Field,
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
) -> Result<QueryResult> {
    let collection_name = &field.name;

    let filter = extract_filter_from_args(&field.arguments)?;

    let (limit, offset) = extract_pagination(&field.arguments);
    let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);

    let is_connection = field
        .selection_set
        .iter()
        .any(|f| f.name == "edges" || f.name == "pageInfo");

    let all_docs = db.aql_get_all_collection(collection_name).await?;

    let mut filtered_docs_iter: Vec<Document> = if let Some(ref aql_filter) = filter {
        all_docs
            .into_iter()
            .filter(|doc| matches_filter(doc, aql_filter, variables))
            .collect()
    } else {
        all_docs
    };

    let orderings = extract_order_by(&field.arguments);
    if !orderings.is_empty() {
        apply_ordering(&mut filtered_docs_iter, &orderings);
    }

    let total_count = filtered_docs_iter.len();

    let final_docs = if is_connection {
        filtered_docs_iter.sort_by(|a, b| a.id.cmp(&b.id));

        if let Some(after_cursor) = after {
            if let Ok(after_id) = decode_cursor(&after_cursor) {
                if let Some(pos) = filtered_docs_iter.iter().position(|d| d.id == after_id) {
                    filtered_docs_iter = filtered_docs_iter.into_iter().skip(pos + 1).collect();
                }
            }
        }

        let has_next_page = if let Some(l) = first {
            filtered_docs_iter.len() > l
        } else {
            false
        };

        if let Some(l) = first {
            filtered_docs_iter.truncate(l);
        }

        let mut edges = Vec::new();
        let mut end_cursor = None;

        for doc in filtered_docs_iter {
            let cursor = encode_cursor(&Value::String(doc.id.clone()));
            end_cursor = Some(cursor.clone());

            let mut edge_data = HashMap::new();
            edge_data.insert("cursor".to_string(), Value::String(cursor));

            if let Some(edges_field) = field.selection_set.iter().find(|f| f.name == "edges") {
                if let Some(node_field) =
                    edges_field.selection_set.iter().find(|f| f.name == "node")
                {
                    let node_doc = apply_projection_with_lookups(
                        db,
                        doc,
                        &node_field.selection_set,
                        variables,
                    )
                    .await?;
                    edge_data.insert("node".to_string(), Value::Object(node_doc.data));
                }
            }

            edges.push(Value::Object(edge_data));
        }

        let mut page_info_data = HashMap::new();
        page_info_data.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
        if let Some(ec) = end_cursor {
            page_info_data.insert("endCursor".to_string(), Value::String(ec));
        } else {
            page_info_data.insert("endCursor".to_string(), Value::Null);
        }

        let mut conn_data = HashMap::new();
        conn_data.insert("edges".to_string(), Value::Array(edges));
        conn_data.insert("pageInfo".to_string(), Value::Object(page_info_data));

        vec![Document {
            id: "connection".to_string(),
            data: conn_data,
        }]
    } else {
        let paginated_docs: Vec<Document> = filtered_docs_iter
            .into_iter()
            .skip(offset)
            .take(limit.unwrap_or(usize::MAX))
            .collect();

        let has_aggregation = !field.selection_set.is_empty()
            && field.selection_set.iter().any(|f| f.name == "aggregate");

        let group_by_field = if !field.selection_set.is_empty() {
            field.selection_set.iter().find(|f| f.name == "groupBy")
        } else {
            None
        };

        if let Some(gb_field) = group_by_field {
            execute_group_by(&paginated_docs, gb_field)?
        } else if has_aggregation {
            let agg_doc = execute_aggregation(&paginated_docs, &field.selection_set)?;
            vec![agg_doc]
        } else if options.apply_projections && !field.selection_set.is_empty() {
            let mut projected = Vec::new();
            for doc in paginated_docs {
                projected.push(
                    apply_projection_with_lookups(db, doc, &field.selection_set, variables).await?,
                );
            }
            projected
        } else {
            paginated_docs
        }
    };

    Ok(QueryResult {
        collection: collection_name.clone(),
        documents: final_docs,
        total_count: Some(total_count),
    })
}

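/// Groups documents by the field named in the `field` argument and materializes the
/// `key`, `count`, `nodes`, and `aggregate` selections for each group.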
fn execute_group_by(docs: &[Document], group_by_field: &ast::Field) -> Result<Vec<Document>> {
    let key_field_name = group_by_field
        .arguments
        .iter()
        .find(|a| a.name == "field")
        .and_then(|a| match &a.value {
            ast::Value::String(s) => Some(s.clone()),
            _ => None,
        })
        .ok_or_else(|| {
            AqlError::new(
                ErrorCode::QueryError,
                "groupBy requires a 'field' argument".to_string(),
            )
        })?;

    let mut groups: HashMap<String, Vec<&Document>> = HashMap::new();

    for doc in docs {
        let val = doc.data.get(&key_field_name).unwrap_or(&Value::Null);
        let key_str = match val {
            Value::String(s) => s.clone(),
            Value::Int(i) => i.to_string(),
            Value::Float(f) => f.to_string(),
            Value::Bool(b) => b.to_string(),
            Value::Null => "null".to_string(),
            _ => format!("{:?}", val),
        };

        groups.entry(key_str).or_default().push(doc);
    }

    let mut result_docs = Vec::new();

    for (group_key, group_docs) in groups {
        let mut group_data = HashMap::new();

        for field in &group_by_field.selection_set {
            let alias = field.alias.as_ref().unwrap_or(&field.name).clone();

            match field.name.as_str() {
                "key" => {
                    group_data.insert(alias, Value::String(group_key.clone()));
                }
                "count" => {
                    group_data.insert(alias, Value::Int(group_docs.len() as i64));
                }
                "nodes" => {
                    let nodes: Vec<Value> = group_docs
                        .iter()
                        .map(|d| {
                            if !field.selection_set.is_empty() {
                                let proj = apply_projection((*d).clone(), &field.selection_set);
                                Value::Object(proj.data)
                            } else {
                                Value::Object(d.data.clone())
                            }
                        })
                        .collect();
                    group_data.insert(alias, Value::Array(nodes));
                }
                "aggregate" => {
                    let group_docs_owned: Vec<Document> =
                        group_docs.iter().map(|d| (*d).clone()).collect();
                    let agg_result = execute_aggregation(&group_docs_owned, &[field.clone()])?;
                    for (k, v) in agg_result.data {
                        group_data.insert(k, v);
                    }
                }
                _ => {}
            }
        }

        result_docs.push(Document {
            id: format!("group_{}", group_key),
            data: group_data,
        });
    }

    Ok(result_docs)
}

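/// Computes the `aggregate` selection (count, sum, avg, min, max) over a slice of documents.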
fn execute_aggregation(docs: &[Document], selection_set: &[ast::Field]) -> Result<Document> {
    let mut result_data = HashMap::new();

    for field in selection_set {
        let alias = field.alias.as_ref().unwrap_or(&field.name).clone();

        if field.name == "aggregate" {
            let mut agg_result = HashMap::new();

            for agg_fn in &field.selection_set {
                let agg_alias = agg_fn.alias.as_ref().unwrap_or(&agg_fn.name).clone();
                let agg_name = &agg_fn.name;

                let value = match agg_name.as_str() {
                    "count" => Value::Int(docs.len() as i64),
                    "sum" | "avg" | "min" | "max" => {
                        let target_field = agg_fn
                            .arguments
                            .iter()
                            .find(|a| a.name == "field")
                            .and_then(|a| match &a.value {
                                ast::Value::String(s) => Some(s.clone()),
                                _ => None,
                            })
                            .ok_or_else(|| {
                                AqlError::new(
                                    ErrorCode::QueryError,
                                    format!(
                                        "Aggregation '{}' requires a 'field' argument",
                                        agg_name
                                    ),
                                )
                            })?;

                        let values: Vec<f64> = docs
                            .iter()
                            .filter_map(|d| {
                                d.data.get(&target_field).and_then(|v| match v {
                                    Value::Int(i) => Some(*i as f64),
                                    Value::Float(f) => Some(*f),
                                    _ => None,
                                })
                            })
                            .collect();

                        match agg_name.as_str() {
                            "sum" => Value::Float(values.iter().sum()),
                            "avg" => {
                                if values.is_empty() {
                                    Value::Null
                                } else {
                                    let sum: f64 = values.iter().sum();
                                    Value::Float(sum / values.len() as f64)
                                }
                            }
                            "min" => values
                                .iter()
                                .fold(None, |min, val| match min {
                                    None => Some(*val),
                                    Some(m) => Some(if *val < m { *val } else { m }),
                                })
                                .map(Value::Float)
                                .unwrap_or(Value::Null),
                            "max" => values
                                .iter()
                                .fold(None, |max, val| match max {
                                    None => Some(*val),
                                    Some(m) => Some(if *val > m { *val } else { m }),
                                })
                                .map(Value::Float)
                                .unwrap_or(Value::Null),
                            _ => Value::Null,
                        }
                    }
                    _ => {
                        return Err(AqlError::new(
                            ErrorCode::QueryError,
                            format!("Unknown aggregation function '{}'", agg_name),
                        ));
                    }
                };

                agg_result.insert(agg_alias, value);
            }

            result_data.insert(alias, Value::Object(agg_result));
        }
    }

    Ok(Document {
        id: "aggregation_result".to_string(),
        data: result_data,
    })
}

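/// Resolves a lookup (join-like) selection: loads the foreign collection and returns the
/// documents whose foreign field matches the parent document's local field, optionally
/// filtered and projected.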
async fn execute_lookup(
    db: &Aurora,
    parent_doc: &Document,
    lookup: &ast::LookupSelection,
    variables: &HashMap<String, ast::Value>,
) -> Result<Value> {
    let local_value = parent_doc.data.get(&lookup.local_field);

    if local_value.is_none() {
        return Ok(Value::Array(vec![]));
    }

    let local_value = local_value.unwrap();

    let foreign_docs = db.aql_get_all_collection(&lookup.collection).await?;

    let matching_docs: Vec<Document> = foreign_docs
        .into_iter()
        .filter(|doc| {
            if let Some(foreign_val) = doc.data.get(&lookup.foreign_field) {
                db_values_equal(foreign_val, local_value)
            } else {
                false
            }
        })
        .collect();

    let filtered_docs = if let Some(ref filter) = lookup.filter {
        matching_docs
            .into_iter()
            .filter(|doc| matches_filter(doc, filter, variables))
            .collect()
    } else {
        matching_docs
    };

    let projected: Vec<Value> = filtered_docs
        .into_iter()
        .map(|doc| {
            let fields: Vec<ast::Field> = lookup
                .selection_set
                .iter()
                .filter_map(|sel| {
                    if let ast::Selection::Field(f) = sel {
                        Some(f.clone())
                    } else {
                        None
                    }
                })
                .collect();

            if fields.is_empty() {
                Value::Object(doc.data)
            } else {
                let projected_doc = apply_projection(doc, &fields);
                Value::Object(projected_doc.data)
            }
        })
        .collect();

    Ok(Value::Array(projected))
}

fn db_values_equal(a: &Value, b: &Value) -> bool {
    match (a, b) {
        (Value::String(s1), Value::String(s2)) => s1 == s2,
        (Value::Int(i1), Value::Int(i2)) => i1 == i2,
        (Value::Float(f1), Value::Float(f2)) => (f1 - f2).abs() < f64::EPSILON,
        (Value::Bool(b1), Value::Bool(b2)) => b1 == b2,
        (Value::Null, Value::Null) => true,
        (Value::String(s), Value::Int(i)) => s.parse::<i64>().ok() == Some(*i),
        (Value::Int(i), Value::String(s)) => s.parse::<i64>().ok() == Some(*i),
        _ => false,
    }
}

pub async fn apply_projection_with_lookups(
    db: &Aurora,
    doc: Document,
    fields: &[ast::Field],
    variables: &HashMap<String, ast::Value>,
) -> Result<Document> {
    if fields.is_empty() {
        return Ok(doc);
    }

    let mut projected_data = HashMap::new();

    if let Some(id_val) = doc.data.get("id") {
        projected_data.insert("id".to_string(), id_val.clone());
    }
    projected_data.insert("id".to_string(), Value::String(doc.id.clone()));

    for field in fields {
        let field_name = field.alias.as_ref().unwrap_or(&field.name);
        let source_name = &field.name;

        let is_lookup = field.arguments.iter().any(|arg| {
            arg.name == "collection" || arg.name == "localField" || arg.name == "foreignField"
        });

        if is_lookup {
            let filter = extract_filter_from_args(&field.arguments).ok().flatten();

            let lookup = ast::LookupSelection {
                collection: extract_string_arg(&field.arguments, "collection").unwrap_or_default(),
                local_field: extract_string_arg(&field.arguments, "localField").unwrap_or_default(),
                foreign_field: extract_string_arg(&field.arguments, "foreignField")
                    .unwrap_or_default(),
                filter,
                selection_set: field
                    .selection_set
                    .iter()
                    .map(|f| ast::Selection::Field(f.clone()))
                    .collect(),
            };

            let lookup_result = execute_lookup(db, &doc, &lookup, variables).await?;
            projected_data.insert(field_name.clone(), lookup_result);
        } else if let Some(value) = doc.data.get(source_name) {
            projected_data.insert(field_name.clone(), value.clone());
        }
    }

    Ok(Document {
        id: doc.id,
        data: projected_data,
    })
}

fn extract_string_arg(args: &[ast::Argument], name: &str) -> Option<String> {
    args.iter().find(|a| a.name == name).and_then(|a| {
        if let ast::Value::String(s) = &a.value {
            Some(s.clone())
        } else {
            None
        }
    })
}

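/// Validates a document against a set of validation rules, returning human-readable
/// error messages (an empty vector means the document passed).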
pub fn validate_document(doc: &Document, rules: &[ast::ValidationRule]) -> Result<Vec<String>> {
    let mut errors = Vec::new();

    for rule in rules {
        let field_value = doc.data.get(&rule.field);

        for constraint in &rule.constraints {
            match constraint {
                ast::ValidationConstraint::Format(format) => {
                    if let Some(Value::String(s)) = field_value {
                        match format.as_str() {
                            "email" => {
                                if !s.contains('@') || !s.contains('.') {
                                    errors.push(format!("{}: invalid email format", rule.field));
                                }
                            }
                            "url" => {
                                if !s.starts_with("http://") && !s.starts_with("https://") {
                                    errors.push(format!("{}: invalid URL format", rule.field));
                                }
                            }
                            "uuid" => {
                                if uuid::Uuid::parse_str(s).is_err() {
                                    errors.push(format!("{}: invalid UUID format", rule.field));
                                }
                            }
                            _ => {}
                        }
                    }
                }
                ast::ValidationConstraint::Min(min) => {
                    let valid = match field_value {
                        Some(Value::Int(i)) => (*i as f64) >= *min,
                        Some(Value::Float(f)) => *f >= *min,
                        _ => true,
                    };
                    if !valid {
                        errors.push(format!("{}: value must be >= {}", rule.field, min));
                    }
                }
                ast::ValidationConstraint::Max(max) => {
                    let valid = match field_value {
                        Some(Value::Int(i)) => (*i as f64) <= *max,
                        Some(Value::Float(f)) => *f <= *max,
                        _ => true,
                    };
                    if !valid {
                        errors.push(format!("{}: value must be <= {}", rule.field, max));
                    }
                }
                ast::ValidationConstraint::MinLength(min_len) => {
                    if let Some(Value::String(s)) = field_value {
                        if (s.len() as i64) < *min_len {
                            errors.push(format!("{}: length must be >= {}", rule.field, min_len));
                        }
                    }
                }
                ast::ValidationConstraint::MaxLength(max_len) => {
                    if let Some(Value::String(s)) = field_value {
                        if (s.len() as i64) > *max_len {
                            errors.push(format!("{}: length must be <= {}", rule.field, max_len));
                        }
                    }
                }
                ast::ValidationConstraint::Pattern(pattern) => {
                    if let Some(Value::String(s)) = field_value {
                        if let Ok(re) = regex::Regex::new(pattern) {
                            if !re.is_match(s) {
                                errors.push(format!(
                                    "{}: does not match pattern '{}'",
                                    rule.field, pattern
                                ));
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(errors)
}

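/// Downsamples time-series documents into fixed-width buckets derived from `interval`
/// (e.g. "30s", "5m", "1h"), aggregating `value_field` within each bucket.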
pub fn execute_downsample(
    docs: &[Document],
    interval: &str,
    aggregation: &str,
    time_field: &str,
    value_field: &str,
) -> Result<Vec<Document>> {
    let interval_secs = parse_interval(interval)?;

    let mut buckets: HashMap<i64, Vec<&Document>> = HashMap::new();

    for doc in docs {
        if let Some(Value::Int(ts)) = doc.data.get(time_field) {
            let bucket = (*ts / interval_secs) * interval_secs;
            buckets.entry(bucket).or_default().push(doc);
        }
    }

    let mut result_docs = Vec::new();
    let mut sorted_buckets: Vec<_> = buckets.into_iter().collect();
    sorted_buckets.sort_by_key(|(k, _)| *k);

    for (bucket_ts, bucket_docs) in sorted_buckets {
        let values: Vec<f64> = bucket_docs
            .iter()
            .filter_map(|d| match d.data.get(value_field) {
                Some(Value::Int(i)) => Some(*i as f64),
                Some(Value::Float(f)) => Some(*f),
                _ => None,
            })
            .collect();

        let agg_value = match aggregation {
            "avg" | "average" => {
                if values.is_empty() {
                    0.0
                } else {
                    values.iter().sum::<f64>() / values.len() as f64
                }
            }
            "sum" => values.iter().sum(),
            "min" => values.iter().cloned().fold(f64::INFINITY, f64::min),
            "max" => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
            "count" => values.len() as f64,
            "first" => *values.first().unwrap_or(&0.0),
            "last" => *values.last().unwrap_or(&0.0),
            _ => values.iter().sum::<f64>() / values.len().max(1) as f64,
        };

        let mut data = HashMap::new();
        data.insert(time_field.to_string(), Value::Int(bucket_ts));
        data.insert(value_field.to_string(), Value::Float(agg_value));
        data.insert("count".to_string(), Value::Int(bucket_docs.len() as i64));

        result_docs.push(Document {
            id: format!("bucket_{}", bucket_ts),
            data,
        });
    }

    Ok(result_docs)
}

fn parse_interval(interval: &str) -> Result<i64> {
    let interval = interval.trim().to_lowercase();
    let (num_str, unit) = interval.split_at(interval.len().saturating_sub(1));
    let num: i64 = num_str.parse().unwrap_or(1);
    let multiplier = match unit {
        "s" => 1,
        "m" => 60,
        "h" => 3600,
        "d" => 86400,
        "w" => 604800,
        _ => {
            return Err(AqlError::new(
                ErrorCode::QueryError,
                format!("Invalid interval unit '{}'", unit),
            ))
        }
    };
    Ok(num * multiplier)
}

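/// Applies a sliding-window function (row_number, rank, sum, avg, min, max, count) over
/// `field`, storing the result in a `<function>_window` key on each returned document.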
pub fn execute_window_function(
    docs: &[Document],
    field: &str,
    function: &str,
    window_size: usize,
) -> Result<Vec<Document>> {
    let mut result_docs = Vec::new();

    for (i, doc) in docs.iter().enumerate() {
        let window_start = i.saturating_sub(window_size - 1);
        let window: Vec<f64> = docs[window_start..=i]
            .iter()
            .filter_map(|d| match d.data.get(field) {
                Some(Value::Int(v)) => Some(*v as f64),
                Some(Value::Float(v)) => Some(*v),
                _ => None,
            })
            .collect();

        let window_value = match function {
            "ROW_NUMBER" | "row_number" => (i + 1) as f64,
            "RANK" | "rank" => (i + 1) as f64,
            "SUM" | "sum" | "running_sum" => window.iter().sum(),
            "AVG" | "avg" | "moving_avg" => {
                if window.is_empty() {
                    0.0
                } else {
                    window.iter().sum::<f64>() / window.len() as f64
                }
            }
            "MIN" | "min" => window.iter().cloned().fold(f64::INFINITY, f64::min),
            "MAX" | "max" => window.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
            "COUNT" | "count" => window.len() as f64,
            _ => 0.0,
        };

        let mut new_data = doc.data.clone();
        new_data.insert(
            format!("{}_window", function.to_lowercase()),
            Value::Float(window_value),
        );

        result_docs.push(Document {
            id: doc.id.clone(),
            data: new_data,
        });
    }

    Ok(result_docs)
}

async fn execute_mutation(
    db: &Aurora,
    mutation: &ast::Mutation,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let mut results = Vec::new();
    let mut context: ExecutionContext = HashMap::new();

    for mut_op in &mutation.operations {
        let result =
            execute_mutation_op(db, mut_op, &mutation.variables_values, &context, options).await?;

        if let Some(alias) = &mut_op.alias {
            if let Some(doc) = result.returned_documents.first() {
                let mut json_map = serde_json::Map::new();
                for (k, v) in &doc.data {
                    json_map.insert(k.clone(), aurora_value_to_json_value(v));
                }

                json_map.insert("id".to_string(), JsonValue::String(doc.id.clone()));

                let doc_json = JsonValue::Object(json_map);

                context.insert(alias.clone(), doc_json);
            }
        }

        results.push(result);
    }

    if results.len() == 1 {
        Ok(ExecutionResult::Mutation(results.remove(0)))
    } else {
        Ok(ExecutionResult::Batch(
            results.into_iter().map(ExecutionResult::Mutation).collect(),
        ))
    }
}

use base64::{Engine as _, engine::general_purpose};
use futures::future::{BoxFuture, FutureExt};

fn execute_mutation_op<'a>(
    db: &'a Aurora,
    mut_op: &'a ast::MutationOperation,
    variables: &'a HashMap<String, ast::Value>,
    context: &'a ExecutionContext,
    options: &'a ExecutionOptions,
) -> BoxFuture<'a, Result<MutationResult>> {
    async move {
        match &mut_op.operation {
            MutationOp::Insert { collection, data } => {
                let resolved_data = resolve_value(data, variables, context);
                let doc_data = aql_value_to_hashmap(&resolved_data)?;
                let doc = db.aql_insert(collection, doc_data).await?;

                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
                    vec![apply_projection(doc.clone(), &mut_op.selection_set)]
                } else {
                    vec![doc]
                };

                Ok(MutationResult {
                    operation: "insert".to_string(),
                    collection: collection.clone(),
                    affected_count: 1,
                    returned_documents: returned,
                })
            }

            MutationOp::InsertMany { collection, data } => {
                let mut docs = Vec::new();
                for item in data {
                    let resolved_item = resolve_value(item, variables, context);
                    let doc_data = aql_value_to_hashmap(&resolved_item)?;
                    let doc = db.aql_insert(collection, doc_data).await?;
                    docs.push(doc);
                }

                let count = docs.len();
                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
                    docs.into_iter()
                        .map(|d| apply_projection(d, &mut_op.selection_set))
                        .collect()
                } else {
                    docs
                };

                Ok(MutationResult {
                    operation: "insertMany".to_string(),
                    collection: collection.clone(),
                    affected_count: count,
                    returned_documents: returned,
                })
            }

            MutationOp::Update {
                collection,
                filter,
                data,
            } => {
                let resolved_data = resolve_value(data, variables, context);
                let update_data = aql_value_to_hashmap(&resolved_data)?;
                let all_docs = db.aql_get_all_collection(collection).await?;

                let mut affected = 0;
                let mut returned = Vec::new();

                for doc in all_docs {
                    let should_update = filter
                        .as_ref()
                        .map(|f| matches_filter(&doc, f, variables))
                        .unwrap_or(true);

                    if should_update {
                        let updated_doc = db
                            .aql_update_document(collection, &doc.id, update_data.clone())
                            .await?;
                        returned.push(updated_doc);
                        affected += 1;
                    }
                }

                let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
                    returned
                        .into_iter()
                        .map(|d| apply_projection(d, &mut_op.selection_set))
                        .collect()
                } else {
                    returned
                };

                Ok(MutationResult {
                    operation: "update".to_string(),
                    collection: collection.clone(),
                    affected_count: affected,
                    returned_documents: returned,
                })
            }

            MutationOp::Upsert {
                collection,
                filter,
                data,
            } => {
                let resolved_data = resolve_value(data, variables, context);
                let update_data = aql_value_to_hashmap(&resolved_data)?;
                let all_docs = db.aql_get_all_collection(collection).await?;

                let matching: Vec<_> = all_docs
                    .iter()
                    .filter(|doc| {
                        filter
                            .as_ref()
                            .map(|f| matches_filter(doc, f, variables))
                            .unwrap_or(false)
                    })
                    .collect();

                if matching.is_empty() {
                    let doc = db.aql_insert(collection, update_data).await?;
                    Ok(MutationResult {
                        operation: "upsert(insert)".to_string(),
                        collection: collection.clone(),
                        affected_count: 1,
                        returned_documents: vec![doc],
                    })
                } else {
                    let mut affected = 0;
                    let mut returned = Vec::new();

                    for doc in matching {
                        let updated_doc = db
                            .aql_update_document(collection, &doc.id, update_data.clone())
                            .await?;
                        returned.push(updated_doc);
                        affected += 1;
                    }

                    Ok(MutationResult {
                        operation: "upsert(update)".to_string(),
                        collection: collection.clone(),
                        affected_count: affected,
                        returned_documents: returned,
                    })
                }
            }

            MutationOp::Delete { collection, filter } => {
                let all_docs = db.aql_get_all_collection(collection).await?;
                let mut affected = 0;
                let mut returned = Vec::new();

                for doc in all_docs {
                    let should_delete = filter
                        .as_ref()
                        .map(|f| matches_filter(&doc, f, variables))
                        .unwrap_or(true);

                    if should_delete {
                        let deleted_doc = db.aql_delete_document(collection, &doc.id).await?;
                        returned.push(deleted_doc);
                        affected += 1;
                    }
                }

                Ok(MutationResult {
                    operation: "delete".to_string(),
                    collection: collection.clone(),
                    affected_count: affected,
                    returned_documents: returned,
                })
            }

            MutationOp::EnqueueJob {
                job_type,
                payload,
                priority,
                scheduled_at,
                max_retries,
            } => {
                let workers = db
                    .workers
                    .as_ref()
                    .ok_or_else(|| AqlError::invalid_operation("Worker system not initialized"))?;

                let mut job = crate::workers::Job::new(job_type);

                let resolved_payload = resolve_value(payload, variables, context);
                if let ast::Value::Object(p) = resolved_payload {
                    for (k, v) in p {
                        let db_val = aql_value_to_db_value(&v)?;
                        let json_val: serde_json::Value =
                            serde_json::to_value(&db_val).map_err(|e| {
                                AqlError::new(ErrorCode::SerializationError, e.to_string())
                            })?;
                        let key_str = k.to_string();
                        job = job.add_field(key_str, json_val);
                    }
                }

                let p_enum = match priority {
                    ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
                    ast::JobPriority::High => crate::workers::JobPriority::High,
                    ast::JobPriority::Low => crate::workers::JobPriority::Low,
                    ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
                };
                job = job.with_priority(p_enum);

                if let Some(s_str) = scheduled_at {
                    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s_str) {
                        job = job.scheduled_at(dt.with_timezone(&chrono::Utc));
                    }
                }

                if let Some(retries) = max_retries {
                    job = job.with_max_retries((*retries).try_into().unwrap_or(3));
                }

                let job_id = workers.enqueue(job).await?;

                Ok(MutationResult {
                    operation: "enqueueJob".to_string(),
                    collection: "jobs".to_string(),
                    affected_count: 1,
                    returned_documents: vec![Document {
                        id: job_id,
                        data: HashMap::new(),
                    }],
                })
            }

            MutationOp::Transaction { operations } => {
                let tx = db.aql_begin_transaction()?;
                let mut results = Vec::new();

                for inner_op in operations {
                    match execute_mutation_op(db, inner_op, variables, context, options).await {
                        Ok(result) => results.push(result),
                        Err(e) => {
                            let _ = db.aql_rollback_transaction(tx).await;
                            return Err(e);
                        }
                    }
                }

                db.aql_commit_transaction(tx).await?;

                let total_affected: usize = results.iter().map(|r| r.affected_count).sum();
                let all_returned: Vec<Document> = results
                    .into_iter()
                    .flat_map(|r| r.returned_documents)
                    .collect();

                Ok(MutationResult {
                    operation: "transaction".to_string(),
                    collection: "multiple".to_string(),
                    affected_count: total_affected,
                    returned_documents: all_returned,
                })
            }
        }
    }
    .boxed()
}

async fn execute_subscription(
    db: &Aurora,
    sub: &ast::Subscription,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let collection = sub
        .selection_set
        .first()
        .map(|f| f.name.clone())
        .unwrap_or_default();

    if collection.is_empty() {
        return Err(AqlError::new(
            ErrorCode::QueryError,
            "Subscription must select a collection".to_string(),
        ));
    }

    let mut listener = db.pubsub.listen(&collection);

    if let Some(field) = sub.selection_set.first() {
        let filter_opt = extract_filter_from_args(&field.arguments)?;
        if let Some(aql_filter) = filter_opt {
            if let Some(event_filter) = convert_aql_filter_to_event_filter(&aql_filter) {
                listener = listener.filter(event_filter);
            }
        }
    }

    Ok(ExecutionResult::Subscription(SubscriptionResult {
        subscription_id: uuid::Uuid::new_v4().to_string(),
        collection,
        stream: Some(listener),
    }))
}

async fn execute_introspection(
    db: &Aurora,
    intro: &ast::IntrospectionQuery,
) -> Result<ExecutionResult> {
    let mut result_data = HashMap::new();

    let collection_stats = db.get_collection_stats().unwrap_or_default();
    let collection_names: Vec<String> = collection_stats.keys().cloned().collect();

    for field_name in &intro.fields {
        match field_name.as_str() {
            "collections" => {
                let collection_list: Vec<Value> = collection_names
                    .iter()
                    .map(|name| Value::String(name.clone()))
                    .collect();
                result_data.insert("collections".to_string(), Value::Array(collection_list));
            }
            "fields" => {
                let mut all_fields = HashMap::new();
                for name in &collection_names {
                    if let Ok(coll) = db.get_collection_definition(name) {
                        let field_names: Vec<Value> = coll
                            .fields
                            .keys()
                            .map(|k| Value::String(k.clone()))
                            .collect();
                        all_fields.insert(name.clone(), Value::Array(field_names));
                    }
                }
                result_data.insert("fields".to_string(), Value::Object(all_fields));
            }
            "relations" => {
                result_data.insert("relations".to_string(), Value::Array(vec![]));
            }
            _ => {}
        }
    }

    Ok(ExecutionResult::Query(QueryResult {
        collection: "__schema".to_string(),
        documents: vec![Document {
            id: "__schema".to_string(),
            data: result_data,
        }],
        total_count: Some(1),
    }))
}

fn convert_ast_field_to_db_field(field: &ast::FieldDef) -> Result<crate::types::FieldDefinition> {
    use crate::types::{FieldDefinition, FieldType};

    let field_type = match field.field_type.name.as_str() {
        "String" | "ID" | "Email" | "URL" | "PhoneNumber" | "DateTime" | "Date" | "Time" => {
            FieldType::String
        }
        "Int" => FieldType::Int,
        "Float" => FieldType::Float,
        "Boolean" => FieldType::Bool,
        "Uuid" => FieldType::Uuid,
        "Object" | "Json" => FieldType::Object,
        "Any" => FieldType::Any,
        "Array" => FieldType::Array,
        _ => FieldType::Any,
    };

    let field_type = if field.field_type.is_array {
        FieldType::Array
    } else {
        field_type
    };

    let mut unique = false;
    let mut indexed = false;

    for directive in &field.directives {
        match directive.name.as_str() {
            "unique" => unique = true,
            "index" | "indexed" => indexed = true,
            _ => {}
        }
    }

    if matches!(field_type, FieldType::Any) && (unique || indexed) {
        return Err(AqlError::new(
            ErrorCode::InvalidDefinition,
            format!(
                "Field '{}' of type 'Any' cannot be unique or indexed.",
                field.name
            ),
        ));
    }

    Ok(FieldDefinition {
        field_type,
        unique,
        indexed,
    })
}

async fn execute_schema(
    db: &Aurora,
    schema: &ast::Schema,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let mut results = Vec::new();

    for op in &schema.operations {
        match op {
            ast::SchemaOp::DefineCollection {
                name,
                if_not_exists,
                fields,
                directives: _,
            } => {
                if *if_not_exists && db.get_collection_definition(name).is_ok() {
                    results.push(ExecutionResult::Schema(SchemaResult {
                        operation: "defineCollection".to_string(),
                        collection: name.clone(),
                        status: "skipped (exists)".to_string(),
                    }));
                    continue;
                }

                let mut db_fields = std::collections::HashMap::new();
                for f in fields {
                    let def = convert_ast_field_to_db_field(f)?;
                    db_fields.insert(f.name.clone(), def);
                }

                db.create_collection_schema(name, db_fields).await?;

                results.push(ExecutionResult::Schema(SchemaResult {
                    operation: "defineCollection".to_string(),
                    collection: name.clone(),
                    status: "created".to_string(),
                }));
            }
            ast::SchemaOp::AlterCollection { name, actions } => {
                for action in actions {
                    match action {
                        ast::AlterAction::AddField(field_def) => {
                            let def = convert_ast_field_to_db_field(field_def)?;
                            db.add_field_to_schema(name, field_def.name.clone(), def)
                                .await?;
                        }
                        ast::AlterAction::DropField(field_name) => {
                            db.drop_field_from_schema(name, field_name.clone()).await?;
                        }
                        ast::AlterAction::RenameField { from, to } => {
                            db.rename_field_in_schema(name, from.clone(), to.clone())
                                .await?;
                        }
                        ast::AlterAction::ModifyField(field_def) => {
                            let def = convert_ast_field_to_db_field(field_def)?;
                            db.modify_field_in_schema(name, field_def.name.clone(), def)
                                .await?;
                        }
                    }
                }
                results.push(ExecutionResult::Schema(SchemaResult {
                    operation: "alterCollection".to_string(),
                    collection: name.clone(),
                    status: "modified".to_string(),
                }));
            }
            ast::SchemaOp::DropCollection { name, if_exists } => {
                if *if_exists && db.get_collection_definition(name).is_err() {
                    results.push(ExecutionResult::Schema(SchemaResult {
                        operation: "dropCollection".to_string(),
                        collection: name.clone(),
                        status: "skipped (not found)".to_string(),
                    }));
                    continue;
                }

                db.drop_collection_schema(name).await?;

                results.push(ExecutionResult::Schema(SchemaResult {
                    operation: "dropCollection".to_string(),
                    collection: name.clone(),
                    status: "dropped".to_string(),
                }));
            }
        }
    }

    if results.len() == 1 {
        Ok(results.remove(0))
    } else {
        Ok(ExecutionResult::Batch(results))
    }
}

async fn execute_migration(
    db: &Aurora,
    migration: &ast::Migration,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let mut results = Vec::new();

    for step in &migration.steps {
        if db.is_migration_applied(&step.version).await? {
            continue;
        }

        let mut applied_count = 0;
        for action in &step.actions {
            match action {
                ast::MigrationAction::Schema(schema_op) => {
                    let schema = ast::Schema {
                        operations: vec![schema_op.clone()],
                    };
                    execute_schema(db, &schema, options).await?;
                    applied_count += 1;
                }
                ast::MigrationAction::DataMigration(data_mig) => {
                    let collection = &data_mig.collection;
                    let docs = db.aql_get_all_collection(collection).await?;
                    let engine = crate::computed::ComputedEngine::new();

                    for doc in docs {
                        let mut updated_data = doc.data.clone();
                        let mut changed = false;

                        for transform in &data_mig.transforms {
                            let matches_filter = match &transform.filter {
                                Some(f) => check_ast_filter_match(f, &doc),
                                None => true,
                            };

                            if matches_filter {
                                if let Some(new_value) =
                                    engine.evaluate(&transform.expression, &doc)
                                {
                                    updated_data.insert(transform.field.clone(), new_value);
                                    changed = true;
                                }
                            }
                        }

                        if changed {
                            db.aql_update_document(collection, &doc.id, updated_data)
                                .await?;
                        }
                    }
                    applied_count += 1;
                }
            }
        }

        db.mark_migration_applied(&step.version).await?;

        results.push(ExecutionResult::Migration(MigrationResult {
            version: step.version.clone(),
            steps_applied: applied_count,
            status: "applied".to_string(),
        }));
    }

    let total_applied = results
        .iter()
        .map(|r| {
            if let ExecutionResult::Migration(m) = r {
                m.steps_applied
            } else {
                0
            }
        })
        .sum();

    if results.is_empty() {
        Ok(ExecutionResult::Migration(MigrationResult {
            version: migration
                .steps
                .first()
                .map(|s| s.version.clone())
                .unwrap_or_default(),
            steps_applied: 0,
            status: "skipped (already applied)".to_string(),
        }))
    } else if results.len() == 1 {
        Ok(results.remove(0))
    } else {
        Ok(ExecutionResult::Migration(MigrationResult {
            version: "batch".to_string(),
            steps_applied: total_applied,
            status: "applied".to_string(),
        }))
    }
}

fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
    for arg in args {
        if arg.name == "where" || arg.name == "filter" {
            return Ok(Some(value_to_filter(&arg.value)?));
        }
    }
    Ok(None)
}

fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
    let mut orderings = Vec::new();

    for arg in args {
        if arg.name == "orderBy" {
            match &arg.value {
                ast::Value::String(field) => {
                    orderings.push(ast::Ordering {
                        field: field.clone(),
                        direction: ast::SortDirection::Asc,
                    });
                }
                ast::Value::Object(map) => {
                    if let Some(ordering) = parse_ordering_object(map) {
                        orderings.push(ordering);
                    }
                }
                ast::Value::Array(arr) => {
                    for val in arr {
                        if let ast::Value::Object(map) = val {
                            if let Some(ordering) = parse_ordering_object(map) {
                                orderings.push(ordering);
                            }
                        }
                    }
                }
                _ => {}
            }
        }
    }

    orderings
}

fn parse_ordering_object(map: &HashMap<String, ast::Value>) -> Option<ast::Ordering> {
    let field = map.get("field").and_then(|v| {
        if let ast::Value::String(s) = v {
            Some(s.clone())
        } else {
            None
        }
    })?;

    let direction = map
        .get("direction")
        .and_then(|v| match v {
            ast::Value::String(s) | ast::Value::Enum(s) => match s.to_uppercase().as_str() {
                "ASC" | "ASCENDING" => Some(ast::SortDirection::Asc),
                "DESC" | "DESCENDING" => Some(ast::SortDirection::Desc),
                _ => None,
            },
            _ => None,
        })
        .unwrap_or(ast::SortDirection::Asc);

    Some(ast::Ordering { field, direction })
}

fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
    if orderings.is_empty() {
        return;
    }

    docs.sort_by(|a, b| {
        for ordering in orderings {
            let a_val = a.data.get(&ordering.field);
            let b_val = b.data.get(&ordering.field);

            let cmp = compare_values(a_val, b_val);

            if cmp != std::cmp::Ordering::Equal {
                return match ordering.direction {
                    ast::SortDirection::Asc => cmp,
                    ast::SortDirection::Desc => cmp.reverse(),
                };
            }
        }
        std::cmp::Ordering::Equal
    });
}

fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
    match (a, b) {
        (None, None) => std::cmp::Ordering::Equal,
        (None, Some(_)) => std::cmp::Ordering::Less,
        (Some(_), None) => std::cmp::Ordering::Greater,
        (Some(av), Some(bv)) => {
            match (av, bv) {
                (Value::Int(ai), Value::Int(bi)) => ai.cmp(bi),
                (Value::Float(af), Value::Float(bf)) => {
                    af.partial_cmp(bf).unwrap_or(std::cmp::Ordering::Equal)
                }
                (Value::String(as_), Value::String(bs)) => as_.cmp(bs),
                (Value::Bool(ab), Value::Bool(bb)) => ab.cmp(bb),
                _ => format!("{:?}", av).cmp(&format!("{:?}", bv)),
            }
        }
    }
}

fn convert_aql_filter_to_event_filter(filter: &AqlFilter) -> Option<crate::pubsub::EventFilter> {
    use crate::pubsub::EventFilter;

    match filter {
        AqlFilter::Eq(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::FieldEquals(field.clone(), db_val))
        }
        AqlFilter::Gt(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::Gt(field.clone(), db_val))
        }
        AqlFilter::Gte(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::Gte(field.clone(), db_val))
        }
        AqlFilter::Lt(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::Lt(field.clone(), db_val))
        }
        AqlFilter::Lte(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::Lte(field.clone(), db_val))
        }
        AqlFilter::Ne(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::Ne(field.clone(), db_val))
        }
        AqlFilter::In(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::In(field.clone(), db_val))
        }
        AqlFilter::NotIn(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::NotIn(field.clone(), db_val))
        }
        AqlFilter::And(filters) => {
            let mut event_filters = Vec::new();
            for f in filters {
                if let Some(ef) = convert_aql_filter_to_event_filter(f) {
                    event_filters.push(ef);
                } else {
                    return None;
                }
            }
            Some(EventFilter::And(event_filters))
        }
        AqlFilter::Or(filters) => {
            let mut event_filters = Vec::new();
            for f in filters {
                if let Some(ef) = convert_aql_filter_to_event_filter(f) {
                    event_filters.push(ef);
                } else {
                    return None;
                }
            }
            Some(EventFilter::Or(event_filters))
        }
        AqlFilter::Not(filter) => {
            convert_aql_filter_to_event_filter(filter).map(|f| EventFilter::Not(Box::new(f)))
        }
        AqlFilter::Contains(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::Contains(field.clone(), db_val))
        }
        AqlFilter::StartsWith(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::StartsWith(field.clone(), db_val))
        }
        AqlFilter::EndsWith(field, value) => {
            let db_val = aql_value_to_db_value(value).ok()?;
            Some(EventFilter::EndsWith(field.clone(), db_val))
        }
        AqlFilter::IsNull(field) => Some(EventFilter::IsNull(field.clone())),
        AqlFilter::IsNotNull(field) => Some(EventFilter::IsNotNull(field.clone())),

        AqlFilter::Matches(_, _) => None,
    }
}

pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
    let mut limit = None;
    let mut offset = 0;

    for arg in args {
        match arg.name.as_str() {
            "limit" | "first" | "take" => {
                if let ast::Value::Int(n) = arg.value {
                    limit = Some(n as usize);
                }
            }
            "offset" | "skip" => {
                if let ast::Value::Int(n) = arg.value {
                    offset = n as usize;
                }
            }
            _ => {}
        }
    }

    (limit, offset)
}

fn extract_cursor_pagination(
    args: &[ast::Argument],
) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
    let mut first = None;
    let mut after = None;
    let mut last = None;
    let mut before = None;

    for arg in args {
        match arg.name.as_str() {
            "first" => {
                if let ast::Value::Int(n) = arg.value {
                    first = Some(n as usize);
                }
            }
            "after" => {
                if let ast::Value::String(ref s) = arg.value {
                    after = Some(s.clone());
                }
            }
            "last" => {
                if let ast::Value::Int(n) = arg.value {
                    last = Some(n as usize);
                }
            }
            "before" => {
                if let ast::Value::String(ref s) = arg.value {
                    before = Some(s.clone());
                }
            }
            _ => {}
        }
    }

    (first, after, last, before)
}

fn encode_cursor(val: &Value) -> String {
    let s = match val {
        Value::String(s) => s.clone(),
        _ => String::new(),
    };
    general_purpose::STANDARD.encode(s)
}

fn decode_cursor(cursor: &str) -> Result<String> {
    let bytes = general_purpose::STANDARD
        .decode(cursor)
        .map_err(|_| AqlError::new(ErrorCode::QueryError, "Invalid cursor".to_string()))?;
    String::from_utf8(bytes)
        .map_err(|_| AqlError::new(ErrorCode::QueryError, "Invalid cursor UTF-8".to_string()))
}

fn get_doc_value_at_path<'a>(doc: &'a Document, path: &str) -> Option<&'a Value> {
    if !path.contains('.') {
        return doc.data.get(path);
    }

    let parts: Vec<&str> = path.split('.').collect();
    let mut current = doc.data.get(parts[0])?;

    for &part in &parts[1..] {
        if let Value::Object(map) = current {
            current = map.get(part)?;
        } else {
            return None;
        }
    }

    Some(current)
}

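/// Returns true if `doc` satisfies `filter`. Variable references are resolved against
/// `variables`, and nested fields can be addressed with dot-separated paths.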
pub fn matches_filter(
    doc: &Document,
    filter: &AqlFilter,
    variables: &HashMap<String, ast::Value>,
) -> bool {
    match filter {
        AqlFilter::Eq(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| values_equal(v, value, variables))
            .unwrap_or(false),
        AqlFilter::Ne(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| !values_equal(v, value, variables))
            .unwrap_or(true),
        AqlFilter::Gt(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| value_compare(v, value, variables) == Some(std::cmp::Ordering::Greater))
            .unwrap_or(false),
        AqlFilter::Gte(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| {
                matches!(
                    value_compare(v, value, variables),
                    Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
                )
            })
            .unwrap_or(false),
        AqlFilter::Lt(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| value_compare(v, value, variables) == Some(std::cmp::Ordering::Less))
            .unwrap_or(false),
        AqlFilter::Lte(field, value) => get_doc_value_at_path(doc, field)
            .map(|v| {
                matches!(
                    value_compare(v, value, variables),
                    Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
                )
            })
            .unwrap_or(false),
        AqlFilter::In(field, value) => {
            if let ast::Value::Array(arr) = value {
                get_doc_value_at_path(doc, field)
                    .map(|v| arr.iter().any(|item| values_equal(v, item, variables)))
                    .unwrap_or(false)
            } else {
                false
            }
        }
        AqlFilter::NotIn(field, value) => {
            if let ast::Value::Array(arr) = value {
                get_doc_value_at_path(doc, field)
                    .map(|v| !arr.iter().any(|item| values_equal(v, item, variables)))
                    .unwrap_or(true)
            } else {
                true
            }
        }
        AqlFilter::Contains(field, value) => {
            if let (Some(Value::String(doc_val)), ast::Value::String(search)) =
                (get_doc_value_at_path(doc, field), value)
            {
                doc_val.contains(search)
            } else {
                false
            }
        }
        AqlFilter::StartsWith(field, value) => {
            if let (Some(Value::String(doc_val)), ast::Value::String(prefix)) =
                (get_doc_value_at_path(doc, field), value)
            {
                doc_val.starts_with(prefix)
            } else {
                false
            }
        }
        AqlFilter::EndsWith(field, value) => {
            if let (Some(Value::String(doc_val)), ast::Value::String(suffix)) =
                (get_doc_value_at_path(doc, field), value)
            {
                doc_val.ends_with(suffix)
            } else {
                false
            }
        }
        AqlFilter::Matches(field, value) => {
            if let (Some(Value::String(doc_val)), ast::Value::String(pattern)) =
                (get_doc_value_at_path(doc, field), value)
            {
                doc_val.contains(pattern)
            } else {
                false
            }
        }
        AqlFilter::IsNull(field) => get_doc_value_at_path(doc, field)
            .map(|v| matches!(v, Value::Null))
            .unwrap_or(true),
        AqlFilter::IsNotNull(field) => get_doc_value_at_path(doc, field)
            .map(|v| !matches!(v, Value::Null))
            .unwrap_or(false),
        AqlFilter::And(filters) => filters.iter().all(|f| matches_filter(doc, f, variables)),
        AqlFilter::Or(filters) => filters.iter().any(|f| matches_filter(doc, f, variables)),
        AqlFilter::Not(filter) => !matches_filter(doc, filter, variables),
    }
}

fn values_equal(
    db_val: &Value,
    aql_val: &ast::Value,
    variables: &HashMap<String, ast::Value>,
) -> bool {
    let resolved = resolve_if_variable(aql_val, variables);
    match (db_val, resolved) {
        (Value::Null, ast::Value::Null) => true,
        (Value::Bool(a), ast::Value::Boolean(b)) => *a == *b,
        (Value::Int(a), ast::Value::Int(b)) => *a == *b,
        (Value::Float(a), ast::Value::Float(b)) => (*a - *b).abs() < f64::EPSILON,
        (Value::Float(a), ast::Value::Int(b)) => (*a - (*b as f64)).abs() < f64::EPSILON,
        (Value::Int(a), ast::Value::Float(b)) => ((*a as f64) - *b).abs() < f64::EPSILON,
        (Value::String(a), ast::Value::String(b)) => a == b,
        _ => false,
    }
}

fn value_compare(
    db_val: &Value,
    aql_val: &ast::Value,
    variables: &HashMap<String, ast::Value>,
) -> Option<std::cmp::Ordering> {
    let resolved = resolve_if_variable(aql_val, variables);
    match (db_val, resolved) {
        (Value::Int(a), ast::Value::Int(b)) => Some(a.cmp(b)),
        (Value::Float(a), ast::Value::Float(b)) => a.partial_cmp(b),
        (Value::Float(a), ast::Value::Int(b)) => a.partial_cmp(&(*b as f64)),
        (Value::Int(a), ast::Value::Float(b)) => (*a as f64).partial_cmp(b),
        (Value::String(a), ast::Value::String(b)) => Some(a.cmp(b)),
        _ => None,
    }
}

fn resolve_if_variable<'a>(
    val: &'a ast::Value,
    variables: &'a HashMap<String, ast::Value>,
) -> &'a ast::Value {
    if let ast::Value::Variable(name) = val {
        variables.get(name).unwrap_or(val)
    } else {
        val
    }
}

pub fn apply_projection(mut doc: Document, fields: &[ast::Field]) -> Document {
    if fields.is_empty() {
        return doc;
    }

    let mut projected_data = HashMap::new();

    if let Some(id_val) = doc.data.get("id") {
        projected_data.insert("id".to_string(), id_val.clone());
    }

    for field in fields {
        let field_name = field.alias.as_ref().unwrap_or(&field.name);
        let source_name = &field.name;

        if let Some(value) = doc.data.get(source_name) {
            projected_data.insert(field_name.clone(), value.clone());
        }
    }

    doc.data = projected_data;
    doc
}

pub fn aql_value_to_db_value(val: &ast::Value) -> Result<Value> {
    match val {
        ast::Value::Null => Ok(Value::Null),
        ast::Value::Boolean(b) => Ok(Value::Bool(*b)),
        ast::Value::Int(i) => Ok(Value::Int(*i)),
        ast::Value::Float(f) => Ok(Value::Float(*f)),
        ast::Value::String(s) => Ok(Value::String(s.clone())),
        ast::Value::Array(arr) => {
            let converted: Result<Vec<Value>> = arr.iter().map(aql_value_to_db_value).collect();
            Ok(Value::Array(converted?))
        }
        ast::Value::Object(map) => {
            let mut converted = HashMap::new();
            for (k, v) in map {
                converted.insert(k.clone(), aql_value_to_db_value(v)?);
            }
            Ok(Value::Object(converted))
        }
        ast::Value::Variable(name) => Err(AqlError::new(
            ErrorCode::QueryError,
            format!("Unresolved variable: {}", name),
        )),
        ast::Value::Enum(e) => Ok(Value::String(e.clone())),
    }
}

fn aql_value_to_hashmap(val: &ast::Value) -> Result<HashMap<String, Value>> {
    match val {
        ast::Value::Object(map) => {
            let mut converted = HashMap::new();
            for (k, v) in map {
                converted.insert(k.clone(), aql_value_to_db_value(v)?);
            }
            Ok(converted)
        }
        _ => Err(AqlError::new(
            ErrorCode::QueryError,
            "Data must be an object".to_string(),
        )),
    }
}

pub fn db_value_to_aql_value(val: &Value) -> ast::Value {
    match val {
        Value::Null => ast::Value::Null,
        Value::Bool(b) => ast::Value::Boolean(*b),
        Value::Int(i) => ast::Value::Int(*i),
        Value::Float(f) => ast::Value::Float(*f),
        Value::String(s) => ast::Value::String(s.clone()),
        Value::Array(arr) => ast::Value::Array(arr.iter().map(db_value_to_aql_value).collect()),
        Value::Object(map) => ast::Value::Object(
            map.iter()
                .map(|(k, v)| (k.clone(), db_value_to_aql_value(v)))
                .collect(),
        ),
        Value::Uuid(u) => ast::Value::String(u.to_string()),
    }
}

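/// Converts a `where`/`filter` argument object into an `AqlFilter` tree. Per-field operator
/// keys (eq, ne, gt, gte, lt, lte, in, nin, contains, startsWith, endsWith, isNull,
/// isNotNull) are combined with an implicit AND; `and`, `or`, and `not` nest explicitly.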
pub fn value_to_filter(value: &ast::Value) -> Result<AqlFilter> {
    match value {
        ast::Value::Object(map) => {
            let mut filters = Vec::new();
            for (key, val) in map {
                match key.as_str() {
                    "and" => {
                        if let ast::Value::Array(arr) = val {
                            let sub: Result<Vec<_>> = arr.iter().map(value_to_filter).collect();
                            filters.push(AqlFilter::And(sub?));
                        }
                    }
                    "or" => {
                        if let ast::Value::Array(arr) = val {
                            let sub: Result<Vec<_>> = arr.iter().map(value_to_filter).collect();
                            filters.push(AqlFilter::Or(sub?));
                        }
                    }
                    "not" => filters.push(AqlFilter::Not(Box::new(value_to_filter(val)?))),
                    field => {
                        if let ast::Value::Object(ops) = val {
                            for (op, op_val) in ops {
                                let f = match op.as_str() {
                                    "eq" => AqlFilter::Eq(field.to_string(), op_val.clone()),
                                    "ne" => AqlFilter::Ne(field.to_string(), op_val.clone()),
                                    "gt" => AqlFilter::Gt(field.to_string(), op_val.clone()),
                                    "gte" => AqlFilter::Gte(field.to_string(), op_val.clone()),
                                    "lt" => AqlFilter::Lt(field.to_string(), op_val.clone()),
                                    "lte" => AqlFilter::Lte(field.to_string(), op_val.clone()),
                                    "in" => AqlFilter::In(field.to_string(), op_val.clone()),
                                    "nin" => AqlFilter::NotIn(field.to_string(), op_val.clone()),
                                    "contains" => {
                                        AqlFilter::Contains(field.to_string(), op_val.clone())
                                    }
                                    "startsWith" => {
                                        AqlFilter::StartsWith(field.to_string(), op_val.clone())
                                    }
                                    "endsWith" => {
                                        AqlFilter::EndsWith(field.to_string(), op_val.clone())
                                    }
                                    "isNull" => AqlFilter::IsNull(field.to_string()),
                                    "isNotNull" => AqlFilter::IsNotNull(field.to_string()),
                                    _ => continue,
                                };
                                filters.push(f);
                            }
                        }
                    }
                }
            }
            if filters.len() == 1 {
                Ok(filters.remove(0))
            } else {
                Ok(AqlFilter::And(filters))
            }
        }
        _ => Err(AqlError::new(
            ErrorCode::QueryError,
            "Filter must be an object".to_string(),
        )),
    }
}

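/// Evaluates an `ast::Filter` directly against a document, without variable
/// substitution. A small sketch (marked `ignore`; assumes a populated `doc`):
///
/// ```ignore
/// let filter = ast::Filter::Gt("age".to_string(), ast::Value::Int(18));
/// let is_adult = check_ast_filter_match(&filter, &doc);
/// ```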
fn check_ast_filter_match(filter: &ast::Filter, doc: &Document) -> bool {
    match filter {
        ast::Filter::Eq(field, val) => check_cmp(doc, field, val, |a, b| a == b),
        ast::Filter::Ne(field, val) => check_cmp(doc, field, val, |a, b| a != b),
        ast::Filter::Gt(field, val) => check_cmp(doc, field, val, |a, b| a > b),
        ast::Filter::Gte(field, val) => check_cmp(doc, field, val, |a, b| a >= b),
        ast::Filter::Lt(field, val) => check_cmp(doc, field, val, |a, b| a < b),
        ast::Filter::Lte(field, val) => check_cmp(doc, field, val, |a, b| a <= b),
        ast::Filter::In(field, val) => {
            if let Ok(db_val) = aql_value_to_db_value(val) {
                if let Some(doc_val) = doc.data.get(field) {
                    if let Value::Array(arr) = db_val {
                        return arr.contains(doc_val);
                    }
                }
            }
            false
        }
        ast::Filter::And(filters) => filters.iter().all(|f| check_ast_filter_match(f, doc)),
        ast::Filter::Or(filters) => filters.iter().any(|f| check_ast_filter_match(f, doc)),
        ast::Filter::Not(filter) => !check_ast_filter_match(filter, doc),
        // Operators not handled here (string matching, null checks) are
        // treated as matching.
        _ => true,
    }
}

/// Compares a document field against an AQL literal using `op`, returning
/// `false` if the field is missing or the literal cannot be converted.
fn check_cmp<F>(doc: &Document, field: &str, val: &ast::Value, op: F) -> bool
where
    F: Fn(&Value, &Value) -> bool,
{
    if let Some(doc_val) = doc.data.get(field) {
        if let Ok(cmp_val) = aql_value_to_db_value(val) {
            return op(doc_val, &cmp_val);
        }
    }
    false
}

/// Resolves variables and `$alias.path` references in a value, recursing into
/// arrays and objects. Unresolvable references are left unchanged.
fn resolve_value(
    val: &ast::Value,
    variables: &HashMap<String, ast::Value>,
    context: &ExecutionContext,
) -> ast::Value {
    match val {
        ast::Value::Variable(name) => {
            if let Some(v) = variables.get(name) {
                v.clone()
            } else {
                val.clone()
            }
        }
        ast::Value::String(s) if s.starts_with('$') => {
            match resolve_variable_path(s, context) {
                Some(v) => v,
                None => val.clone(),
            }
        }
        ast::Value::Array(arr) => ast::Value::Array(
            arr.iter()
                .map(|v| resolve_value(v, variables, context))
                .collect(),
        ),
        ast::Value::Object(map) => {
            let mut resolved_map = HashMap::new();
            for (k, v) in map {
                resolved_map.insert(k.clone(), resolve_value(v, variables, context));
            }
            ast::Value::Object(resolved_map)
        }
        _ => val.clone(),
    }
}

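/// Walks a `$alias.path.to.field` reference through the execution context.
///
/// The first segment names an aliased prior result; each following segment
/// indexes into a JSON object by key or into a JSON array by numeric index.
/// Sketch (marked `ignore`; the context contents are an assumption for
/// illustration):
///
/// ```ignore
/// // context["user"] == {"profile": {"settings": {"theme": "dark"}}}
/// let theme = resolve_variable_path("$user.profile.settings.theme", &context);
/// assert!(matches!(theme, Some(ast::Value::String(s)) if s == "dark"));
/// ```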
fn resolve_variable_path(path: &str, context: &ExecutionContext) -> Option<ast::Value> {
    let path = path.trim_start_matches('$');
    let parts: Vec<&str> = path.split('.').collect();

    if parts.is_empty() {
        return None;
    }

    let alias = parts[0];
    let mut current_value = context.get(alias)?;

    for part in &parts[1..] {
        match current_value {
            serde_json::Value::Object(map) => {
                current_value = map.get(*part)?;
            }
            serde_json::Value::Array(arr) => {
                if let Ok(idx) = part.parse::<usize>() {
                    current_value = arr.get(idx)?;
                } else {
                    return None;
                }
            }
            _ => return None,
        }
    }

    Some(json_to_ast_value(current_value))
}

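/// Converts a `serde_json::Value` into an AQL literal, preferring `Int` over
/// `Float` whenever the number fits in an `i64`. Sketch (marked `ignore`):
///
/// ```ignore
/// let v = json_to_ast_value(&serde_json::json!({ "n": 1, "x": 1.5 }));
/// // => Object({ "n": Int(1), "x": Float(1.5) })
/// ```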
fn json_to_ast_value(json: &serde_json::Value) -> ast::Value {
    match json {
        serde_json::Value::Null => ast::Value::Null,
        serde_json::Value::Bool(b) => ast::Value::Boolean(*b),
        serde_json::Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                ast::Value::Int(i)
            } else if let Some(f) = n.as_f64() {
                ast::Value::Float(f)
            } else {
                ast::Value::Null
            }
        }
        serde_json::Value::String(s) => ast::Value::String(s.clone()),
        serde_json::Value::Array(arr) => {
            ast::Value::Array(arr.iter().map(json_to_ast_value).collect())
        }
        serde_json::Value::Object(map) => {
            let mut new_map = HashMap::new();
            for (k, v) in map {
                new_map.insert(k.clone(), json_to_ast_value(v));
            }
            ast::Value::Object(new_map)
        }
    }
}

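/// Converts a storage `Value` into a `serde_json::Value` for the execution
/// context. Floats that JSON cannot represent (NaN, infinity) become `null`.
/// Sketch (marked `ignore`):
///
/// ```ignore
/// assert_eq!(aurora_value_to_json_value(&Value::Int(7)), serde_json::json!(7));
/// assert_eq!(
///     aurora_value_to_json_value(&Value::Float(f64::NAN)),
///     JsonValue::Null
/// );
/// ```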
fn aurora_value_to_json_value(v: &Value) -> JsonValue {
    match v {
        Value::Null => JsonValue::Null,
        Value::String(s) => JsonValue::String(s.clone()),
        Value::Int(i) => JsonValue::Number((*i).into()),
        Value::Float(f) => {
            if let Some(n) = serde_json::Number::from_f64(*f) {
                JsonValue::Number(n)
            } else {
                JsonValue::Null
            }
        }
        Value::Bool(b) => JsonValue::Bool(*b),
        Value::Array(arr) => JsonValue::Array(arr.iter().map(aurora_value_to_json_value).collect()),
        Value::Object(map) => {
            let mut json_map = serde_json::Map::new();
            for (k, v) in map {
                json_map.insert(k.clone(), aurora_value_to_json_value(v));
            }
            JsonValue::Object(json_map)
        }
        Value::Uuid(u) => JsonValue::String(u.to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_aql_value_conversion() {
        let aql_val = ast::Value::Object({
            let mut map = HashMap::new();
            map.insert("name".to_string(), ast::Value::String("John".to_string()));
            map.insert("age".to_string(), ast::Value::Int(30));
            map
        });

        let db_val = aql_value_to_db_value(&aql_val).unwrap();
        if let Value::Object(map) = db_val {
            assert_eq!(map.get("name"), Some(&Value::String("John".to_string())));
            assert_eq!(map.get("age"), Some(&Value::Int(30)));
        } else {
            panic!("Expected Object");
        }
    }

    #[test]
    fn test_matches_filter_eq() {
        let mut doc = Document::new();
        doc.data
            .insert("name".to_string(), Value::String("Alice".to_string()));
        doc.data.insert("age".to_string(), Value::Int(25));

        let filter = AqlFilter::Eq("name".to_string(), ast::Value::String("Alice".to_string()));
        assert!(matches_filter(&doc, &filter, &HashMap::new()));

        let filter = AqlFilter::Eq("name".to_string(), ast::Value::String("Bob".to_string()));
        assert!(!matches_filter(&doc, &filter, &HashMap::new()));
    }

    #[test]
    fn test_matches_filter_comparison() {
        let mut doc = Document::new();
        doc.data.insert("age".to_string(), Value::Int(25));

        let filter = AqlFilter::Gt("age".to_string(), ast::Value::Int(20));
        assert!(matches_filter(&doc, &filter, &HashMap::new()));

        let filter = AqlFilter::Gt("age".to_string(), ast::Value::Int(30));
        assert!(!matches_filter(&doc, &filter, &HashMap::new()));

        let filter = AqlFilter::Gte("age".to_string(), ast::Value::Int(25));
        assert!(matches_filter(&doc, &filter, &HashMap::new()));

        let filter = AqlFilter::Lt("age".to_string(), ast::Value::Int(30));
        assert!(matches_filter(&doc, &filter, &HashMap::new()));
    }

    #[test]
    fn test_matches_filter_and_or() {
        let mut doc = Document::new();
        doc.data
            .insert("name".to_string(), Value::String("Alice".to_string()));
        doc.data.insert("age".to_string(), Value::Int(25));

        let filter = AqlFilter::And(vec![
            AqlFilter::Eq("name".to_string(), ast::Value::String("Alice".to_string())),
            AqlFilter::Gte("age".to_string(), ast::Value::Int(18)),
        ]);
        assert!(matches_filter(&doc, &filter, &HashMap::new()));

        let filter = AqlFilter::Or(vec![
            AqlFilter::Eq("name".to_string(), ast::Value::String("Bob".to_string())),
            AqlFilter::Gte("age".to_string(), ast::Value::Int(18)),
        ]);
        assert!(matches_filter(&doc, &filter, &HashMap::new()));
    }

    #[test]
    fn test_matches_filter_string_ops() {
        let mut doc = Document::new();
        doc.data.insert(
            "email".to_string(),
            Value::String("alice@example.com".to_string()),
        );

        let filter = AqlFilter::Contains(
            "email".to_string(),
            ast::Value::String("example".to_string()),
        );
        assert!(matches_filter(&doc, &filter, &HashMap::new()));

        let filter =
            AqlFilter::StartsWith("email".to_string(), ast::Value::String("alice".to_string()));
        assert!(matches_filter(&doc, &filter, &HashMap::new()));

        let filter =
            AqlFilter::EndsWith("email".to_string(), ast::Value::String(".com".to_string()));
        assert!(matches_filter(&doc, &filter, &HashMap::new()));
    }

    #[test]
    fn test_matches_filter_in() {
        let mut doc = Document::new();
        doc.data
            .insert("status".to_string(), Value::String("active".to_string()));

        let filter = AqlFilter::In(
            "status".to_string(),
            ast::Value::Array(vec![
                ast::Value::String("active".to_string()),
                ast::Value::String("pending".to_string()),
            ]),
        );
        assert!(matches_filter(&doc, &filter, &HashMap::new()));

        let filter = AqlFilter::In(
            "status".to_string(),
            ast::Value::Array(vec![ast::Value::String("inactive".to_string())]),
        );
        assert!(!matches_filter(&doc, &filter, &HashMap::new()));
    }

    #[test]
    fn test_apply_projection() {
        let mut doc = Document::new();
        doc.data
            .insert("id".to_string(), Value::String("123".to_string()));
        doc.data
            .insert("name".to_string(), Value::String("Alice".to_string()));
        doc.data.insert(
            "email".to_string(),
            Value::String("alice@example.com".to_string()),
        );
        doc.data
            .insert("password".to_string(), Value::String("secret".to_string()));

        let fields = vec![
            ast::Field {
                alias: None,
                name: "id".to_string(),
                arguments: vec![],
                directives: vec![],
                selection_set: vec![],
            },
            ast::Field {
                alias: None,
                name: "name".to_string(),
                arguments: vec![],
                directives: vec![],
                selection_set: vec![],
            },
        ];

        let projected = apply_projection(doc, &fields);
        assert_eq!(projected.data.len(), 2);
        assert!(projected.data.contains_key("id"));
        assert!(projected.data.contains_key("name"));
        assert!(!projected.data.contains_key("email"));
        assert!(!projected.data.contains_key("password"));
    }

    #[test]
    fn test_apply_projection_with_alias() {
        let mut doc = Document::new();
        doc.data
            .insert("first_name".to_string(), Value::String("Alice".to_string()));

        let fields = vec![ast::Field {
            alias: Some("name".to_string()),
            name: "first_name".to_string(),
            arguments: vec![],
            directives: vec![],
            selection_set: vec![],
        }];

        let projected = apply_projection(doc, &fields);
        assert!(projected.data.contains_key("name"));
        assert!(!projected.data.contains_key("first_name"));
    }

    #[test]
    fn test_extract_pagination() {
        let args = vec![
            ast::Argument {
                name: "limit".to_string(),
                value: ast::Value::Int(10),
            },
            ast::Argument {
                name: "offset".to_string(),
                value: ast::Value::Int(20),
            },
        ];

        let (limit, offset) = extract_pagination(&args);
        assert_eq!(limit, Some(10));
        assert_eq!(offset, 20);
    }

    #[test]
    fn test_matches_filter_with_variables() {
        let mut doc = Document::new();
        doc.data.insert("age".to_string(), Value::Int(25));

        let mut variables = HashMap::new();
        variables.insert("minAge".to_string(), ast::Value::Int(18));

        let filter = AqlFilter::Gte(
            "age".to_string(),
            ast::Value::Variable("minAge".to_string()),
        );
        assert!(matches_filter(&doc, &filter, &variables));
    }

    #[tokio::test]
    async fn test_executor_integration() {
        use crate::Aurora;
        use tempfile::TempDir;

        // Fresh database in a temp directory with write buffering disabled.
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let config = crate::AuroraConfig {
            db_path,
            enable_write_buffering: false,
            durability_mode: crate::DurabilityMode::Synchronous,
            ..Default::default()
        };
        let db = Aurora::with_config(config).unwrap();

        db.new_collection(
            "users",
            vec![
                ("name", crate::FieldType::String, false),
                ("age", crate::FieldType::Int, false),
                ("active", crate::FieldType::Bool, false),
            ],
        )
        .await
        .unwrap();

        // Insert one document.
        let insert_query = r#"
            mutation {
                insertInto(collection: "users", data: {
                    name: "Alice",
                    age: 30,
                    active: true
                }) {
                    id
                    name
                }
            }
        "#;

        let result = execute(&db, insert_query, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Mutation(res) => {
                assert_eq!(res.affected_count, 1);
                assert_eq!(res.returned_documents.len(), 1);
                assert_eq!(
                    res.returned_documents[0].data.get("name"),
                    Some(&Value::String("Alice".to_string()))
                );
            }
            _ => panic!("Expected mutation result"),
        }

        // Read it back.
        let query = r#"
            query {
                users {
                    name
                    age
                }
            }
        "#;

        let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
        match result {
            ExecutionResult::Query(res) => {
                assert_eq!(res.documents.len(), 1);
                assert_eq!(
                    res.documents[0].data.get("name"),
                    Some(&Value::String("Alice".to_string()))
                );
                assert_eq!(res.documents[0].data.get("age"), Some(&Value::Int(30)));
            }
            _ => panic!("Expected query result"),
        }

        // Delete it again via a filter.
        let delete_query = r#"
            mutation {
                deleteFrom(collection: "users", filter: { name: { eq: "Alice" } }) {
                    id
                }
            }
        "#;

        let result = execute(&db, delete_query, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Mutation(res) => {
                assert_eq!(res.affected_count, 1);
            }
            _ => panic!("Expected mutation result"),
        }

        // The collection should now be empty.
        let query = r#"
            query {
                users {
                    name
                }
            }
        "#;

        let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
        match result {
            ExecutionResult::Query(res) => {
                assert_eq!(res.documents.len(), 0);
            }
            _ => panic!("Expected query result"),
        }
    }

    #[tokio::test]
    async fn test_lookup_cross_collection_join() {
        // Unit-level checks of db_values_equal, the value comparison used to
        // match local and foreign keys in cross-collection lookups.
        assert!(db_values_equal(
            &Value::String("user1".to_string()),
            &Value::String("user1".to_string())
        ));

        assert!(!db_values_equal(
            &Value::String("user1".to_string()),
            &Value::String("user2".to_string())
        ));

        assert!(db_values_equal(&Value::Int(42), &Value::Int(42)));

        // String and Int values with the same numeric content are treated as equal.
        assert!(db_values_equal(
            &Value::String("123".to_string()),
            &Value::Int(123)
        ));

        assert!(db_values_equal(&Value::Null, &Value::Null));

        assert!(db_values_equal(&Value::Bool(true), &Value::Bool(true)));
        assert!(!db_values_equal(&Value::Bool(true), &Value::Bool(false)));
    }

    #[test]
    fn test_order_by_extraction_and_sorting() {
        // A bare string orderBy argument defaults to ascending order.
        let args = vec![ast::Argument {
            name: "orderBy".to_string(),
            value: ast::Value::String("name".to_string()),
        }];
        let orderings = extract_order_by(&args);
        assert_eq!(orderings.len(), 1);
        assert_eq!(orderings[0].field, "name");
        assert_eq!(orderings[0].direction, ast::SortDirection::Asc);

        // The object form carries an explicit direction.
        let mut order_map = HashMap::new();
        order_map.insert("field".to_string(), ast::Value::String("age".to_string()));
        order_map.insert(
            "direction".to_string(),
            ast::Value::Enum("DESC".to_string()),
        );
        let args = vec![ast::Argument {
            name: "orderBy".to_string(),
            value: ast::Value::Object(order_map),
        }];
        let orderings = extract_order_by(&args);
        assert_eq!(orderings.len(), 1);
        assert_eq!(orderings[0].field, "age");
        assert_eq!(orderings[0].direction, ast::SortDirection::Desc);

        // apply_ordering sorts documents in place.
        let mut docs = vec![
            Document {
                id: "1".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("name".to_string(), Value::String("Charlie".to_string()));
                    m
                },
            },
            Document {
                id: "2".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("name".to_string(), Value::String("Alice".to_string()));
                    m
                },
            },
            Document {
                id: "3".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("name".to_string(), Value::String("Bob".to_string()));
                    m
                },
            },
        ];

        let orderings = vec![ast::Ordering {
            field: "name".to_string(),
            direction: ast::SortDirection::Asc,
        }];
        apply_ordering(&mut docs, &orderings);

        assert_eq!(
            docs[0].data.get("name"),
            Some(&Value::String("Alice".to_string()))
        );
        assert_eq!(
            docs[1].data.get("name"),
            Some(&Value::String("Bob".to_string()))
        );
        assert_eq!(
            docs[2].data.get("name"),
            Some(&Value::String("Charlie".to_string()))
        );
    }

    #[test]
    fn test_validation() {
        let doc = Document {
            id: "1".to_string(),
            data: {
                let mut m = HashMap::new();
                m.insert("email".to_string(), Value::String("invalid".to_string()));
                m.insert("age".to_string(), Value::Int(15));
                m.insert("name".to_string(), Value::String("Ab".to_string()));
                m
            },
        };

        let rules = vec![
            ast::ValidationRule {
                field: "email".to_string(),
                constraints: vec![ast::ValidationConstraint::Format("email".to_string())],
            },
            ast::ValidationRule {
                field: "age".to_string(),
                constraints: vec![ast::ValidationConstraint::Min(18.0)],
            },
            ast::ValidationRule {
                field: "name".to_string(),
                constraints: vec![ast::ValidationConstraint::MinLength(3)],
            },
        ];

        // Every field violates its constraint, so three errors are expected.
        let errors = validate_document(&doc, &rules).unwrap();
        assert_eq!(errors.len(), 3);
        assert!(errors.iter().any(|e| e.contains("email")));
        assert!(errors.iter().any(|e| e.contains("age")));
        assert!(errors.iter().any(|e| e.contains("name")));
    }

    #[test]
    fn test_downsample() {
        let docs = vec![
            Document {
                id: "1".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("timestamp".to_string(), Value::Int(0));
                    m.insert("value".to_string(), Value::Float(10.0));
                    m
                },
            },
            Document {
                id: "2".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("timestamp".to_string(), Value::Int(30));
                    m.insert("value".to_string(), Value::Float(20.0));
                    m
                },
            },
            Document {
                id: "3".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("timestamp".to_string(), Value::Int(120));
                    m.insert("value".to_string(), Value::Float(30.0));
                    m
                },
            },
        ];

        let result = execute_downsample(&docs, "1m", "avg", "timestamp", "value").unwrap();

        // Timestamps 0 and 30 fall into the first one-minute bucket; 120 starts
        // a second bucket, so two aggregated rows come back.
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_window_function() {
        let docs = vec![
            Document {
                id: "1".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("value".to_string(), Value::Float(10.0));
                    m
                },
            },
            Document {
                id: "2".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("value".to_string(), Value::Float(20.0));
                    m
                },
            },
            Document {
                id: "3".to_string(),
                data: {
                    let mut m = HashMap::new();
                    m.insert("value".to_string(), Value::Float(30.0));
                    m
                },
            },
        ];

        // Rolling average over a window of 2 documents.
        let result = execute_window_function(&docs, "value", "avg", 2).unwrap();
        assert_eq!(result.len(), 3);

        // avg(10) = 10, avg(10, 20) = 15, avg(20, 30) = 25.
        assert_eq!(result[0].data.get("avg_window"), Some(&Value::Float(10.0)));
        assert_eq!(result[1].data.get("avg_window"), Some(&Value::Float(15.0)));
        assert_eq!(result[2].data.get("avg_window"), Some(&Value::Float(25.0)));
    }

    #[tokio::test]
    async fn test_lookup_integration_with_schema() {
        use crate::Aurora;
        use crate::AuroraConfig;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("lookup_test.db");

        let config = AuroraConfig {
            db_path,
            enable_write_buffering: false,
            durability_mode: crate::DurabilityMode::Synchronous,
            ..Default::default()
        };
        let db = Aurora::with_config(config).unwrap();

        let define_users = r#"
            schema {
                define collection users if not exists {
                    userId: String
                    name: String
                }
            }
        "#;
        execute(&db, define_users, ExecutionOptions::new())
            .await
            .unwrap();

        let define_orders = r#"
            schema {
                define collection orders if not exists {
                    orderId: String
                    userId: String
                    total: Int
                }
            }
        "#;
        execute(&db, define_orders, ExecutionOptions::new())
            .await
            .unwrap();

        let insert_user = r#"
            mutation {
                insertInto(collection: "users", data: {
                    userId: "user1",
                    name: "Alice"
                }) { id userId name }
            }
        "#;
        let user_result = execute(&db, insert_user, ExecutionOptions::new())
            .await
            .unwrap();

        let user_doc = match user_result {
            ExecutionResult::Mutation(res) => {
                assert_eq!(res.affected_count, 1);
                res.returned_documents[0].clone()
            }
            _ => panic!("Expected mutation result"),
        };

        let insert_order1 = r#"
            mutation {
                insertInto(collection: "orders", data: {
                    orderId: "order1",
                    userId: "user1",
                    total: 100
                }) { id }
            }
        "#;
        execute(&db, insert_order1, ExecutionOptions::new())
            .await
            .unwrap();

        let insert_order2 = r#"
            mutation {
                insertInto(collection: "orders", data: {
                    orderId: "order2",
                    userId: "user1",
                    total: 250
                }) { id }
            }
        "#;
        execute(&db, insert_order2, ExecutionOptions::new())
            .await
            .unwrap();

        // Sanity check: both orders are visible before running the lookup.
        let query = r#"query { orders { orderId userId total } }"#;
        let result = execute(&db, query, ExecutionOptions::new()).await.unwrap();
        match result {
            ExecutionResult::Query(res) => {
                assert_eq!(res.documents.len(), 2, "Should have 2 orders");
            }
            _ => panic!("Expected query result"),
        }

        // Join users.userId -> orders.userId for the inserted user.
        let lookup = ast::LookupSelection {
            collection: "orders".to_string(),
            local_field: "userId".to_string(),
            foreign_field: "userId".to_string(),
            filter: None,
            selection_set: vec![],
        };

        let lookup_result = execute_lookup(&db, &user_doc, &lookup, &HashMap::new())
            .await
            .unwrap();
        if let Value::Array(found_orders) = lookup_result {
            assert_eq!(found_orders.len(), 2, "Should find 2 orders for user1");
        } else {
            panic!("Expected array result from lookup");
        }
    }

    #[tokio::test]
    async fn test_sdl_integration() {
        use crate::Aurora;
        use crate::AuroraConfig;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test_sdl.db");

        let config = AuroraConfig {
            db_path,
            enable_write_buffering: false,
            durability_mode: crate::DurabilityMode::Synchronous,
            ..Default::default()
        };
        let db = Aurora::with_config(config).unwrap();

        let define_schema = r#"
            schema {
                define collection products if not exists {
                    name: String @unique
                    price: Float @indexed
                    category: String
                }
            }
        "#;

        let result = execute(&db, define_schema, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Schema(res) => {
                assert_eq!(res.status, "created");
                assert_eq!(res.collection, "products");
            }
            _ => panic!("Expected schema result"),
        }

        // Re-running the same definition with "if not exists" must not fail.
        let result = execute(&db, define_schema, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Schema(res) => {
                assert!(
                    res.status == "skipped (exists)" || res.status == "created",
                    "Unexpected status: {}",
                    res.status
                );
            }
            _ => panic!("Expected schema result for duplicate"),
        }

        let alter_schema = r#"
            schema {
                alter collection products {
                    add stock: Int @indexed
                }
            }
        "#;

        let result = execute(&db, alter_schema, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Schema(res) => {
                assert_eq!(res.status, "modified");
            }
            _ => panic!("Expected schema result for alter"),
        }

        let rename_schema = r#"
            schema {
                alter collection products {
                    rename category to cat
                }
            }
        "#;
        let result = execute(&db, rename_schema, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Schema(res) => {
                assert_eq!(res.status, "modified");
            }
            _ => panic!("Expected schema result for rename"),
        }

        let modify_schema = r#"
            schema {
                alter collection products {
                    modify price: Float
                }
            }
        "#;
        let result = execute(&db, modify_schema, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Schema(res) => {
                assert_eq!(res.status, "modified");
            }
            _ => panic!("Expected schema result for modify"),
        }

        let migration = r#"
            migrate {
                "v1": {
                    alter collection products {
                        add description: String
                    }
                }
            }
        "#;

        let result = execute(&db, migration, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Migration(res) => {
                assert_eq!(res.steps_applied, 1);
            }
            _ => panic!("Expected migration result"),
        }

        // Running the same migration version again should apply nothing.
        let result = execute(&db, migration, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Migration(res) => {
                assert_eq!(
                    res.steps_applied, 0,
                    "Migration should be idempotent - version already applied"
                );
            }
            _ => panic!("Expected Migration result for idempotency check"),
        }

        let drop_schema = r#"
            schema {
                drop collection products
            }
        "#;

        let result = execute(&db, drop_schema, ExecutionOptions::new())
            .await
            .unwrap();
        match result {
            ExecutionResult::Schema(res) => {
                assert_eq!(res.status, "dropped");
            }
            _ => panic!("Expected schema result for drop"),
        }
    }

    #[tokio::test]
    async fn test_dynamic_variable_resolution() {
        use crate::Aurora;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test_dynamic.db");

        let config = crate::AuroraConfig {
            db_path,
            enable_write_buffering: false,
            durability_mode: crate::DurabilityMode::Synchronous,
            ..Default::default()
        };
        let db = Aurora::with_config(config).unwrap();

        db.new_collection(
            "users",
            vec![
                ("name", crate::FieldType::String, false),
                ("profile", crate::FieldType::Any, false),
            ],
        )
        .await
        .unwrap();

        db.new_collection(
            "orders",
            vec![
                ("user_id", crate::FieldType::String, false),
                ("theme", crate::FieldType::String, false),
            ],
        )
        .await
        .unwrap();

        db.new_collection(
            "user_settings",
            vec![
                ("user_id", crate::FieldType::String, false),
                ("theme", crate::FieldType::String, false),
            ],
        )
        .await
        .unwrap();

        // Later operations reference earlier aliased results through
        // "$alias.path" strings, which the executor resolves at runtime.
        let mutation = r#"
            mutation DynamicFlow {
                user: insertInto(collection: "users", data: {
                    name: "John",
                    profile: { settings: { theme: "dark" } }
                }) {
                    id
                    name
                    profile
                }

                order: insertInto(collection: "orders", data: {
                    user_id: "$user.id",
                    theme: "$user.profile.settings.theme"
                }) {
                    id
                    user_id
                    theme
                }

                job: enqueueJob(
                    jobType: "send_email",
                    payload: {
                        orderId: "$order.id",
                        userId: "$order.user_id",
                        theme: "$order.theme"
                    }
                )
            }
        "#;

        let result = execute(&db, mutation, ExecutionOptions::new())
            .await
            .unwrap();

        match result {
            ExecutionResult::Mutation(_res) => {
                panic!("Expected Batch result for multi-op mutation, got Mutation");
            }
            ExecutionResult::Batch(results) => {
                assert_eq!(results.len(), 3);

                let users = db.aql_get_all_collection("users").await.unwrap();
                assert_eq!(users.len(), 1);
                let user_id = &users[0].id;

                let orders = db.aql_get_all_collection("orders").await.unwrap();
                assert_eq!(orders.len(), 1);

                // The "$user.id" and "$user.profile.settings.theme" references
                // must have been substituted with the inserted user's values.
                let order_doc = &orders[0];
                assert_eq!(
                    order_doc.data.get("user_id"),
                    Some(&Value::String(user_id.clone()))
                );
                assert_eq!(
                    order_doc.data.get("theme"),
                    Some(&Value::String("dark".to_string()))
                );
            }
            _ => panic!("Expected Batch result"),
        }
    }
}