use super::ast::{self, Field, Filter as AqlFilter, FragmentDef, MutationOp, Operation, Selection};
use super::executor_utils::{CompiledFilter, compile_filter};
use crate::Aurora;
use crate::error::{AqlError, ErrorCode, Result};
use crate::types::{Document, FieldDefinition, FieldType, ScalarType, Value};
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::sync::Arc;
#[allow(unused_imports)]
use chrono::TimeZone as _;
/// Free-form execution context: string keys mapped to arbitrary JSON values.
pub type ExecutionContext = HashMap<String, JsonValue>;
/// Pre-analysed form of a single root query field, built by
/// `QueryPlan::from_query` and consumed by `execute_plan`.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Name of the root field; used as the collection name.
    pub collection: String,
    /// Filter extracted from the root field's arguments, if any.
    pub filter: Option<ast::Filter>,
    /// Compiled form of `filter`, produced by `compile_filter`.
    pub compiled_filter: Option<CompiledFilter>,
    /// Fields selected under the root field (after fragment expansion).
    pub projection: Vec<ast::Field>,
    /// Maximum number of documents; the `limit` value, falling back to the
    /// cursor-pagination `first` value.
    pub limit: Option<usize>,
    /// Number of leading documents to drop.
    pub offset: usize,
    /// Cursor: the `_sid` of the document after which results start.
    pub after: Option<String>,
    /// Requested orderings, applied before cursor/offset handling.
    pub orderings: Vec<ast::Ordering>,
    /// True when the selection contains an `edges` or `pageInfo` field.
    pub is_connection: bool,
    /// True when `lookup_dependencies` is non-empty.
    pub has_lookups: bool,
    /// Copy of the fragment definitions available to the query.
    pub fragments: HashMap<String, FragmentDef>,
    /// Variable definitions declared by the originating query.
    pub variable_definitions: Vec<ast::VariableDefinition>,
    /// Directives attached to the root field; checked by `check_permissions`.
    pub directives: Vec<ast::Directive>,
    /// Result of `identify_lookup_dependencies` for this field.
    pub lookup_dependencies: Vec<String>,
}
impl QueryPlan {
    /// Verifies that every required variable declared by the originating query
    /// is either provided or has a default value.
    ///
    /// # Errors
    /// Returns the error produced by `validate_required_variables`.
    pub fn validate(&self, provided_variables: &HashMap<String, ast::Value>) -> Result<()> {
        let definitions = &self.variable_definitions;
        validate_required_variables(definitions, provided_variables)
    }

    /// Builds one plan per root field of `query`.
    ///
    /// Fragments are expanded and `@skip`/`@include` are evaluated against
    /// `variables` while the plan is built, so the resulting projection
    /// reflects the variable values passed here.
    ///
    /// # Errors
    /// Fails when field collection, filter extraction or filter compilation
    /// fails for any root field.
    pub fn from_query(
        db: &Aurora,
        query: &ast::Query,
        fragments: &HashMap<String, FragmentDef>,
        variables: &HashMap<String, ast::Value>,
    ) -> Result<Vec<Self>> {
        collect_fields(&query.selection_set, fragments, variables, None)?
            .into_iter()
            .map(|root| -> Result<Self> {
                let collection = root.name.clone();

                let filter = extract_filter_from_args(&root.arguments)?;
                let compiled_filter = filter
                    .as_ref()
                    .map(|f| compile_filter(f))
                    .transpose()?;

                let projection =
                    collect_fields(&root.selection_set, fragments, variables, Some(&root.name))?;

                // `last` / `before` are parsed but not used by the plan.
                let (limit, offset) = extract_pagination(&root.arguments);
                let (first, after, _last, _before) = extract_cursor_pagination(&root.arguments);
                let orderings = extract_order_by(&root.arguments);

                let is_connection = projection
                    .iter()
                    .any(|f| matches!(f.name.as_str(), "edges" | "pageInfo"));

                let lookup_dependencies =
                    identify_lookup_dependencies(db, &collection, &projection, variables);
                let has_lookups = !lookup_dependencies.is_empty();

                Ok(QueryPlan {
                    collection,
                    filter,
                    compiled_filter,
                    projection,
                    limit: limit.or(first),
                    offset,
                    after,
                    orderings,
                    is_connection,
                    has_lookups,
                    fragments: fragments.clone(),
                    variable_definitions: query.variable_definitions.clone(),
                    directives: root.directives.clone(),
                    lookup_dependencies,
                })
            })
            .collect()
    }
}
/// Parses and executes an AQL string.
///
/// A plan cache keyed by a hash of the trimmed query text is consulted first.
/// On a miss the text is parsed; when the document consists of exactly one
/// executable operation that is a query with a single, non-`search` root
/// field, a [`QueryPlan`] is built, cached and executed. Every other document
/// goes through variable resolution and [`execute_document`].
///
/// Default values declared in variable definitions are applied to missing
/// variables before either path runs.
///
/// # Errors
/// Returns parse errors, variable validation/resolution errors, and any error
/// raised while executing the plan or document.
pub async fn execute(db: &Aurora, aql: &str, options: ExecutionOptions) -> Result<ExecutionResult> {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let query_key = {
        let mut hasher = DefaultHasher::new();
        aql.trim().hash(&mut hasher);
        hasher.finish()
    };
    let mut vars: HashMap<String, ast::Value> = options.variables.clone();

    // NOTE(review): the cache key covers only the query text, while a plan's
    // projection is built with the variable values seen on first execution
    // (`@skip` / `@include`). Confirm whether variables should be part of the key.
    if let Some(plan) = db.plan_cache.get(&query_key) {
        apply_declared_variable_defaults(&plan.variable_definitions, &mut vars);
        plan.validate(&vars)?;
        return execute_plan(db, &plan, &vars, &options).await;
    }

    let mut doc = super::parse(aql)?;

    // Fill in declared defaults up front so the plan path and the document
    // path see the same variable set.
    for op in &doc.operations {
        if let Operation::Query(q) = op {
            apply_declared_variable_defaults(&q.variable_definitions, &mut vars);
        }
    }

    // The single-plan fast path returns immediately, so it is only safe when
    // the query is the sole executable operation; otherwise the remaining
    // operations would be silently dropped.
    let executable_op_count = doc
        .operations
        .iter()
        .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
        .count();
    if executable_op_count == 1 {
        if let Some(Operation::Query(query)) = doc
            .operations
            .iter()
            .find(|op| matches!(op, Operation::Query(_)))
        {
            let fragments: HashMap<String, FragmentDef> = doc
                .operations
                .iter()
                .filter_map(|op| {
                    if let Operation::FragmentDefinition(f) = op {
                        Some((f.name.clone(), f.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            let root_fields = collect_fields(&query.selection_set, &fragments, &vars, None)?;
            let has_search = root_fields
                .iter()
                .any(|f| f.arguments.iter().any(|a| a.name == "search"));
            if !has_search {
                let plans = QueryPlan::from_query(db, query, &fragments, &vars)?;
                if plans.len() == 1 {
                    let plan = Arc::new(plans[0].clone());
                    plan.validate(&vars)?;
                    db.plan_cache.insert(query_key, Arc::clone(&plan));
                    return execute_plan(db, &plan, &vars, &options).await;
                }
            }
        }
    }

    super::validator::resolve_variables(&mut doc, &vars).map_err(|e| {
        let code = match e.code {
            super::validator::ErrorCode::MissingRequiredVariable => ErrorCode::UndefinedVariable,
            super::validator::ErrorCode::TypeMismatch => ErrorCode::TypeError,
            _ => ErrorCode::QueryError,
        };
        AqlError::new(code, e.to_string())
    })?;
    execute_document(db, &doc, &options).await
}

/// Inserts the declared default for every variable that is missing from `vars`.
fn apply_declared_variable_defaults(
    definitions: &[ast::VariableDefinition],
    vars: &mut HashMap<String, ast::Value>,
) {
    for def in definitions {
        if vars.contains_key(&def.name) {
            continue;
        }
        if let Some(default) = &def.default_value {
            vars.insert(def.name.clone(), default.clone());
        }
    }
}
/// Executes a pre-built [`QueryPlan`] against `db`.
///
/// Steps, in order: permission check on the plan's directives; document
/// retrieval (index/`_sid` lookup when the filter contains a usable equality,
/// otherwise a filtered scan); ordering; `after` cursor; offset; then either
/// connection handling, or limit + projection (groupBy, aggregate, plain, or
/// lookup-aware projection).
///
/// groupBy results are emitted in the order in which each group key is first
/// encountered.
///
/// # Errors
/// Propagates permission, storage, value-conversion, field-collection and
/// projection errors.
pub async fn execute_plan(
    db: &Aurora,
    plan: &QueryPlan,
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    check_permissions(&plan.directives, options, false)?;
    let collection_name = &plan.collection;
    let effective_limit = plan.limit;

    // Try to answer the query from an index (or a direct `_sid` fetch).
    let mut indexed_docs = None;
    if let Some(ref f) = plan.filter {
        if let Some((field, val)) =
            find_indexed_equality_filter_runtime(f, db, collection_name, variables)
        {
            let index_ready = field == "_sid" || !db.is_index_building(collection_name, &field);
            if index_ready {
                let mut db_val = aql_value_to_db_value(&val, variables)?;
                // Coerce the lookup value to the declared field type when the
                // collection definition is available.
                if let Ok(col_def) = db.get_collection_definition(collection_name) {
                    if let Some(f_def) = col_def.fields.get(&field) {
                        db_val = db_val.coerce_to(&f_def.field_type);
                    }
                }
                let ids = if field == "_sid" {
                    match &db_val {
                        Value::String(s) => vec![s.clone()],
                        Value::Uuid(u) => vec![u.to_string()],
                        _ => vec![],
                    }
                } else {
                    db.get_ids_from_index(collection_name, &field, &db_val)
                };
                let mut docs = Vec::with_capacity(ids.len());
                for id in ids {
                    if let Some(doc) = db.get_document(collection_name, &id)? {
                        // Re-check the full filter: the index only covers one
                        // equality term.
                        if let Some(ref cf) = plan.compiled_filter {
                            if matches_filter(&doc, cf, variables) {
                                docs.push(doc);
                            }
                        } else {
                            docs.push(doc);
                        }
                    }
                }
                indexed_docs = Some(docs);
            }
        }
    }

    let mut docs = if let Some(d) = indexed_docs {
        d
    } else {
        let vars_arc = Arc::new(variables.clone());
        let cf_clone = plan.compiled_filter.clone();
        let filter_fn = move |doc: &Document| {
            cf_clone
                .as_ref()
                .map(|f| matches_filter(doc, f, &vars_arc))
                .unwrap_or(true)
        };
        // A scan can only stop early when no cursor or ordering has to see the
        // full result set. Connections fetch one extra document.
        let scan_limit = if plan.after.is_some() || !plan.orderings.is_empty() {
            None
        } else {
            plan.limit.map(|l| {
                let base = if plan.is_connection { l + 1 } else { l };
                base + plan.offset
            })
        };
        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
    };

    if !plan.orderings.is_empty() {
        apply_ordering(&mut docs, &plan.orderings);
    }
    if let Some(ref cursor) = plan.after {
        // Drop everything up to and including the cursor document.
        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
            docs.drain(0..=pos);
        }
    }
    if plan.offset > 0 {
        if plan.offset < docs.len() {
            docs.drain(0..plan.offset);
        } else {
            docs.clear();
        }
    }

    if plan.is_connection {
        return Ok(ExecutionResult::Query(execute_connection(
            docs,
            &plan.projection,
            effective_limit,
            &plan.fragments,
            variables,
            options,
        )?));
    }

    if let Some(l) = effective_limit {
        docs.truncate(l);
    }

    if options.apply_projections && !plan.projection.is_empty() {
        let agg_field = plan.projection.iter().find(|f| f.name == "aggregate");
        let group_by_field = plan.projection.iter().find(|f| f.name == "groupBy");
        if let Some(f) = group_by_field {
            let field_name = f
                .arguments
                .iter()
                .find(|a| a.name == "field")
                .and_then(|a| match &a.value {
                    ast::Value::String(s) => Some(s),
                    _ => None,
                });
            if let Some(group_field) = field_name {
                // Keep groups in first-seen order so results are deterministic;
                // `group_slots` maps a key to its position in `groups`.
                let mut group_slots: HashMap<String, usize> = HashMap::new();
                let mut groups: Vec<(String, Vec<Document>)> = Vec::new();
                for d in docs {
                    let key = d
                        .data
                        .get(group_field)
                        .map(|v| match v {
                            Value::String(s) => s.clone(),
                            _ => v.to_string(),
                        })
                        .unwrap_or_else(|| "null".to_string());
                    match group_slots.get(&key).copied() {
                        Some(slot) => groups[slot].1.push(d),
                        None => {
                            group_slots.insert(key.clone(), groups.len());
                            groups.push((key, vec![d]));
                        }
                    }
                }

                let mut group_docs = Vec::with_capacity(groups.len());
                for (key, group_items) in groups {
                    let mut data = HashMap::new();
                    for selection in &f.selection_set {
                        if let Selection::Field(sub_f) = selection {
                            let alias = sub_f.alias.as_ref().unwrap_or(&sub_f.name);
                            match sub_f.name.as_str() {
                                "key" => {
                                    data.insert(alias.clone(), Value::String(key.clone()));
                                }
                                "count" => {
                                    data.insert(
                                        alias.clone(),
                                        Value::Int(group_items.len() as i64),
                                    );
                                }
                                "nodes" => {
                                    // Propagate collection errors instead of
                                    // silently projecting nothing.
                                    let sub_fields = collect_fields(
                                        &sub_f.selection_set,
                                        &plan.fragments,
                                        variables,
                                        None,
                                    )?;
                                    let mut projected_nodes =
                                        Vec::with_capacity(group_items.len());
                                    for d in &group_items {
                                        let node_doc =
                                            apply_projection(d.clone(), &sub_fields, options)?;
                                        projected_nodes.push(Value::Object(node_doc.data));
                                    }
                                    data.insert(alias.clone(), Value::Array(projected_nodes));
                                }
                                "aggregate" => {
                                    let agg_res =
                                        compute_aggregates(&group_items, &sub_f.selection_set);
                                    data.insert(alias.clone(), Value::Object(agg_res));
                                }
                                _ => {}
                            }
                        }
                    }
                    group_docs.push(Document {
                        _sid: format!("group:{}", key),
                        data,
                    });
                }
                docs = group_docs;
            }
        } else if let Some(agg) = agg_field {
            let agg_result = compute_aggregates(&docs, &agg.selection_set);
            let alias = agg.alias.as_ref().unwrap_or(&agg.name);
            if plan.projection.len() == 1 {
                // Only the aggregate was requested: return one synthetic document.
                let mut data = HashMap::new();
                data.insert(alias.clone(), Value::Object(agg_result));
                docs = vec![Document {
                    _sid: "aggregate".to_string(),
                    data,
                }];
            } else {
                // Aggregate alongside regular fields: attach it to every document.
                let mut projected = Vec::with_capacity(docs.len());
                for mut d in docs {
                    d.data
                        .insert(alias.clone(), Value::Object(agg_result.clone()));
                    projected.push(apply_projection(d, &plan.projection, options)?);
                }
                docs = projected;
            }
        } else if !plan.has_lookups {
            let mut projected = Vec::with_capacity(docs.len());
            for d in docs {
                projected.push(apply_projection(d, &plan.projection, options)?);
            }
            docs = projected;
        } else {
            let mut projected = Vec::with_capacity(docs.len());
            for d in docs {
                let (proj_doc, _) = apply_projection_with_lookups(
                    db,
                    d,
                    collection_name,
                    &plan.projection,
                    &plan.fragments,
                    variables,
                    options,
                    &plan.lookup_dependencies,
                )
                .await?;
                projected.push(proj_doc);
            }
            docs = projected;
        }
    }

    Ok(ExecutionResult::Query(QueryResult {
        collection: collection_name.clone(),
        documents: docs,
        total_count: None,
        deferred_fields: vec![],
        explain: None,
    }))
}
fn compute_aggregates(docs: &[Document], selections: &[Selection]) -> HashMap<String, Value> {
let mut results = HashMap::new();
for selection in selections {
if let Selection::Field(f) = selection {
let alias = f.alias.as_ref().unwrap_or(&f.name);
let value = match f.name.as_str() {
"count" => Value::Int(docs.len() as i64),
"sum" => {
let field =
f.arguments
.iter()
.find(|a| a.name == "field")
.and_then(|a| match &a.value {
ast::Value::String(s) => Some(s),
_ => None,
});
if let Some(field_name) = field {
let sum: f64 = docs
.iter()
.filter_map(|d| d.data.get(field_name))
.filter_map(|v| match v {
Value::Int(i) => Some(*i as f64),
Value::Float(f) => Some(*f),
_ => None,
})
.sum();
Value::Float(sum)
} else {
Value::Null
}
}
"avg" => {
let field =
f.arguments
.iter()
.find(|a| a.name == "field")
.and_then(|a| match &a.value {
ast::Value::String(s) => Some(s),
_ => None,
});
if let Some(field_name) = field
&& !docs.is_empty()
{
let values: Vec<f64> = docs
.iter()
.filter_map(|d| d.data.get(field_name))
.filter_map(|v| match v {
Value::Int(i) => Some(*i as f64),
Value::Float(f) => Some(*f),
_ => None,
})
.collect();
if values.is_empty() {
Value::Null
} else {
let sum: f64 = values.iter().sum();
Value::Float(sum / values.len() as f64)
}
} else {
Value::Null
}
}
"min" => {
let field =
f.arguments
.iter()
.find(|a| a.name == "field")
.and_then(|a| match &a.value {
ast::Value::String(s) => Some(s),
_ => None,
});
if let Some(field_name) = field
&& !docs.is_empty()
{
let min = docs
.iter()
.filter_map(|d| d.data.get(field_name))
.filter_map(|v| match v {
Value::Int(i) => Some(*i as f64),
Value::Float(f) => Some(*f),
_ => None,
})
.fold(f64::INFINITY, f64::min);
if min == f64::INFINITY {
Value::Null
} else {
Value::Float(min)
}
} else {
Value::Null
}
}
"max" => {
let field =
f.arguments
.iter()
.find(|a| a.name == "field")
.and_then(|a| match &a.value {
ast::Value::String(s) => Some(s),
_ => None,
});
if let Some(field_name) = field
&& !docs.is_empty()
{
let max = docs
.iter()
.filter_map(|d| d.data.get(field_name))
.filter_map(|v| match v {
Value::Int(i) => Some(*i as f64),
Value::Float(f) => Some(*f),
_ => None,
})
.fold(f64::NEG_INFINITY, f64::max);
if max == f64::NEG_INFINITY {
Value::Null
} else {
Value::Float(max)
}
} else {
Value::Null
}
}
_ => Value::Null,
};
results.insert(alias.clone(), value);
}
}
results
}
/// Searches `filter` for an equality term that can be served by a direct
/// lookup: either on `_sid` or on a field for which `db.has_index` is true.
///
/// Only `Eq` terms and (recursively) the members of `And` are inspected; the
/// first match wins. The returned value has been passed through
/// `resolve_if_variable`.
fn find_indexed_equality_filter_runtime(
    filter: &ast::Filter,
    db: &Aurora,
    collection: &str,
    variables: &HashMap<String, ast::Value>,
) -> Option<(String, ast::Value)> {
    match filter {
        ast::Filter::Eq(field, val) if field == "_sid" || db.has_index(collection, field) => {
            let resolved = resolve_if_variable(val, variables);
            Some((field.clone(), resolved.clone()))
        }
        ast::Filter::And(members) => members.iter().find_map(|member| {
            find_indexed_equality_filter_runtime(member, db, collection, variables)
        }),
        _ => None,
    }
}
fn collect_fields(
selection_set: &[Selection],
fragments: &HashMap<String, FragmentDef>,
variable_values: &HashMap<String, ast::Value>,
parent_type: Option<&str>,
) -> Result<Vec<Field>> {
let mut fields = Vec::new();
for selection in selection_set {
match selection {
Selection::Field(field) => {
if should_include(&field.directives, variable_values)? {
fields.push(field.clone());
}
}
Selection::FragmentSpread(name) => {
if let Some(fragment) = fragments.get(name) {
let type_match = if let Some(parent) = parent_type {
parent == fragment.type_condition
} else {
true
};
if type_match {
let fragment_fields = collect_fields(
&fragment.selection_set,
fragments,
variable_values,
parent_type,
)?;
fields.extend(fragment_fields);
}
}
}
Selection::InlineFragment(inline) => {
let type_match = if let Some(parent) = parent_type {
parent == inline.type_condition
} else {
true
};
if type_match {
let inline_fields = collect_fields(
&inline.selection_set,
fragments,
variable_values,
parent_type,
)?;
fields.extend(inline_fields);
}
}
Selection::ComputedField(cf) => {
let (expr_val, _complex_expr) = match &cf.expression {
ast::ComputedExpression::TemplateString(s) => (s.clone(), None),
_ => (
"complex".to_string(),
Some(ast::Expression::Literal(ast::Value::Null)),
), };
let mut field = Field {
alias: Some(cf.alias.clone()),
name: "__compute__".to_string(),
arguments: vec![ast::Argument {
name: "expr".to_string(),
value: ast::Value::String(expr_val),
}],
directives: Vec::new(),
selection_set: Vec::new(),
computed_expression: None,
};
if let ast::ComputedExpression::TemplateString(s) = &cf.expression {
field.arguments[0].value = ast::Value::String(s.clone());
}
match &cf.expression {
ast::ComputedExpression::TemplateString(_) => {}
ast::ComputedExpression::StandardExpression(e) => {
field.computed_expression = Some(e.clone());
}
ast::ComputedExpression::FunctionCall { name, args } => {
field.computed_expression = Some(ast::Expression::FunctionCall {
name: name.clone(),
args: args.clone(),
});
}
ast::ComputedExpression::PipeExpression { .. } => {
}
ast::ComputedExpression::SqlExpression(s) => {
field.computed_expression = Some(ast::Expression::Literal(
ast::Value::String(format!("SQL: {}", s)),
));
}
ast::ComputedExpression::AggregateFunction { .. } => {
}
}
fields.push(field);
}
}
}
Ok(fields)
}
/// Evaluates `@skip(if:)` and `@include(if:)` directives.
///
/// Returns `Ok(false)` as soon as a `skip` condition is true or an `include`
/// condition is false; directives without an `if` argument and all other
/// directives are ignored.
///
/// # Errors
/// Propagates errors from `resolve_boolean_arg`.
fn should_include(
    directives: &[ast::Directive],
    variables: &HashMap<String, ast::Value>,
) -> Result<bool> {
    for directive in directives {
        // The condition value that causes the field to be dropped.
        let excluded_when = match directive.name.as_str() {
            "skip" => true,
            "include" => false,
            _ => continue,
        };
        let Some(condition) = directive.arguments.iter().find(|a| a.name == "if") else {
            continue;
        };
        if resolve_boolean_arg(&condition.value, variables)? == excluded_when {
            return Ok(false);
        }
    }
    Ok(true)
}
/// Enforces the `@auth`, `@require` and `@allow` directives.
///
/// * `@auth` requires `options.user_role` to be set.
/// * `@require` / `@allow` look at the first argument named `role`, or `write`
///   (for writes) / `read` (for reads). A string value must equal the caller's
///   role; a list value must contain it. Any other value type is rejected
///   rather than being treated as "no restriction". A directive without such
///   an argument imposes no restriction.
///
/// # Errors
/// `Unauthorized` when authentication is missing, `Forbidden` when the role
/// requirement is not met or cannot be interpreted.
fn check_permissions(
    directives: &[ast::Directive],
    options: &ExecutionOptions,
    is_write: bool,
) -> Result<()> {
    let scoped_arg = if is_write { "write" } else { "read" };
    let current_role = options.user_role.as_deref();

    for directive in directives {
        match directive.name.as_str() {
            "auth" => {
                if current_role.is_none() {
                    return Err(AqlError::new(
                        ErrorCode::Unauthorized,
                        "Authentication required for this operation".to_string(),
                    ));
                }
            }
            "require" | "allow" => {
                let Some(role_arg) = directive
                    .arguments
                    .iter()
                    .find(|a| a.name == scoped_arg || a.name == "role")
                else {
                    continue;
                };
                match &role_arg.value {
                    ast::Value::String(role) => {
                        if current_role != Some(role.as_str()) {
                            return Err(AqlError::new(
                                ErrorCode::Forbidden,
                                format!("Role '{}' required for this operation", role),
                            ));
                        }
                    }
                    ast::Value::Array(roles) => {
                        // Non-string entries are ignored, so they can never grant access.
                        let allowed: Vec<&str> = roles
                            .iter()
                            .filter_map(|r| match r {
                                ast::Value::String(s) => Some(s.as_str()),
                                _ => None,
                            })
                            .collect();
                        if !current_role.is_some_and(|r| allowed.contains(&r)) {
                            return Err(AqlError::new(
                                ErrorCode::Forbidden,
                                format!(
                                    "One of roles [{}] required for this operation",
                                    allowed.join(", ")
                                ),
                            ));
                        }
                    }
                    // Fail closed: an unreadable role specification must not
                    // grant access to everyone.
                    other => {
                        return Err(AqlError::new(
                            ErrorCode::Forbidden,
                            format!(
                                "Unsupported role specification {:?} in @{} directive",
                                other, directive.name
                            ),
                        ));
                    }
                }
            }
            _ => {}
        }
    }
    Ok(())
}
/// Resolves a directive argument to a boolean.
///
/// Accepts a boolean literal, or a variable reference whose bound value is a
/// boolean.
///
/// # Errors
/// `UndefinedVariable` when the referenced variable is not bound, `TypeError`
/// when the literal or the bound value is not a boolean.
fn resolve_boolean_arg(
    value: &ast::Value,
    variables: &HashMap<String, ast::Value>,
) -> Result<bool> {
    match value {
        ast::Value::Boolean(flag) => Ok(*flag),
        ast::Value::Variable(name) => match variables.get(name) {
            Some(ast::Value::Boolean(flag)) => Ok(*flag),
            Some(val) => Err(AqlError::new(
                ErrorCode::TypeError,
                format!("Variable '{}' is not a boolean, got {:?}", name, val),
            )),
            None => Err(AqlError::new(
                ErrorCode::UndefinedVariable,
                format!("Variable '{}' is not defined", name),
            )),
        },
        _ => Err(AqlError::new(
            ErrorCode::TypeError,
            format!("Expected boolean value, got {:?}", value),
        )),
    }
}
/// Ensures every required variable is either provided or has a default.
///
/// # Errors
/// `UndefinedVariable` for the first required variable (in declaration order)
/// that is neither present in `provided_variables` nor has a default value.
fn validate_required_variables(
    variable_definitions: &[ast::VariableDefinition],
    provided_variables: &HashMap<String, ast::Value>,
) -> Result<()> {
    let first_missing = variable_definitions.iter().find(|def| {
        def.var_type.is_required
            && !provided_variables.contains_key(&def.name)
            && def.default_value.is_none()
    });

    match first_missing {
        None => Ok(()),
        // Only required variables reach this arm, so the type is always
        // rendered with its `!` suffix.
        Some(def) => Err(AqlError::new(
            ErrorCode::UndefinedVariable,
            format!(
                "Required variable '{}' (type: {}{}) is not provided",
                def.name, def.var_type.name, "!"
            ),
        )),
    }
}
/// Outcome of executing an operation or a whole document.
#[derive(Debug)]
pub enum ExecutionResult {
    /// Result of a query.
    Query(QueryResult),
    /// Result of a mutation.
    Mutation(MutationResult),
    /// Result of a subscription.
    Subscription(SubscriptionResult),
    /// Results of several operations (or several root fields), in order.
    Batch(Vec<ExecutionResult>),
    /// Result of a schema operation.
    Schema(SchemaResult),
    /// Result of a migration operation.
    Migration(MigrationResult),
}
impl ExecutionResult {
    /// Borrows the inner query result, if this is a `Query`.
    pub fn as_query(&self) -> Option<&QueryResult> {
        match self {
            Self::Query(result) => Some(result),
            _ => None,
        }
    }

    /// Consumes `self` and returns the query result, if this is a `Query`.
    pub fn into_query(self) -> Option<QueryResult> {
        match self {
            Self::Query(result) => Some(result),
            _ => None,
        }
    }

    /// Borrows the inner mutation result, if this is a `Mutation`.
    pub fn as_mutation(&self) -> Option<&MutationResult> {
        match self {
            Self::Mutation(result) => Some(result),
            _ => None,
        }
    }

    /// Consumes `self` and returns the mutation result, if this is a `Mutation`.
    pub fn into_mutation(self) -> Option<MutationResult> {
        match self {
            Self::Mutation(result) => Some(result),
            _ => None,
        }
    }

    /// Borrows the inner subscription result, if this is a `Subscription`.
    pub fn as_subscription(&self) -> Option<&SubscriptionResult> {
        match self {
            Self::Subscription(result) => Some(result),
            _ => None,
        }
    }

    /// Consumes `self` and returns the subscription result, if this is a
    /// `Subscription`.
    pub fn into_subscription(self) -> Option<SubscriptionResult> {
        match self {
            Self::Subscription(result) => Some(result),
            _ => None,
        }
    }

    /// Deserializes all result documents into `T`.
    ///
    /// For a `Batch`, the documents of every member are concatenated in order.
    ///
    /// # Errors
    /// Fails when a document cannot be bound, or with `QueryError` for
    /// variants other than `Query`, `Mutation` and `Batch`.
    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
        match self {
            Self::Query(query) => query.bind(),
            Self::Mutation(mutation) => mutation.bind(),
            Self::Batch(members) => {
                let mut combined = Vec::new();
                for member in members {
                    let bound = member.bind::<T>()?;
                    combined.extend(bound);
                }
                Ok(combined)
            }
            Self::Subscription(_) | Self::Schema(_) | Self::Migration(_) => Err(AqlError::new(
                ErrorCode::QueryError,
                "Cannot bind results from schema or migration operations".to_string(),
            )),
        }
    }

    /// Deserializes the first result document into `T`.
    ///
    /// For a `Batch`, only the first member is consulted.
    ///
    /// # Errors
    /// `NotFound` when there is nothing to bind, `QueryError` for variants
    /// other than `Query`, `Mutation` and `Batch`, or a binding error.
    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
        match self {
            Self::Query(query) => query.bind_first(),
            Self::Mutation(mutation) => mutation.bind_first(),
            Self::Batch(members) => match members.into_iter().next() {
                Some(first) => first.bind_first(),
                None => Err(AqlError::new(
                    ErrorCode::NotFound,
                    "No documents found to bind".to_string(),
                )),
            },
            Self::Subscription(_) | Self::Schema(_) | Self::Migration(_) => Err(AqlError::new(
                ErrorCode::QueryError,
                "Cannot bind results from schema or migration operations".to_string(),
            )),
        }
    }
}
/// Summary of a schema operation.
#[derive(Debug, Clone)]
pub struct SchemaResult {
    /// Name of the schema operation that ran.
    pub operation: String,
    /// Collection the operation targeted.
    pub collection: String,
    /// Status text reported for the operation.
    pub status: String,
}
/// Summary of a migration operation.
#[derive(Debug, Clone)]
pub struct MigrationResult {
    /// Migration version identifier.
    pub version: String,
    /// Number of migration steps that were applied.
    pub steps_applied: usize,
    /// Status text reported for the migration.
    pub status: String,
}
/// Serializable description of an execution plan.
// NOTE(review): no producer or consumer of this type is visible in this part
// of the file; field meanings below are taken from the names only.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionPlan {
    /// Presumably human-readable descriptions of the planned operations.
    pub operations: Vec<String>,
    /// Presumably a relative cost estimate — units not visible here.
    pub estimated_cost: f64,
}
/// Documents produced by a query against a single collection.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Name of the queried collection.
    pub collection: String,
    /// Resulting (possibly projected) documents.
    pub documents: Vec<Document>,
    /// Total number of matches, when known; the executors in this part of the
    /// file leave it as `None`.
    pub total_count: Option<usize>,
    /// Names of fields whose resolution was deferred during projection.
    pub deferred_fields: Vec<String>,
    /// Explain data, filled in by `execute_query` when the query carries an
    /// `explain` directive.
    pub explain: Option<ExplainResult>,
}
impl QueryResult {
    /// Deserializes every document into `T`, stopping at the first failure.
    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
        let mut bound = Vec::with_capacity(self.documents.len());
        for document in self.documents {
            bound.push(document.bind()?);
        }
        Ok(bound)
    }

    /// Deserializes the first document into `T`.
    ///
    /// # Errors
    /// `NotFound` when the result holds no documents, or a binding error.
    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
        let Some(first) = self.documents.into_iter().next() else {
            return Err(AqlError::new(
                ErrorCode::NotFound,
                "No documents found to bind".to_string(),
            ));
        };
        first.bind()
    }
}
/// Diagnostic data attached to a query result when `explain` is requested.
#[derive(Debug, Clone, Default)]
pub struct ExplainResult {
    /// Name of the queried collection.
    pub collection: String,
    /// As populated by `execute_query`, the number of documents in the result.
    pub docs_scanned: usize,
    /// As populated by `execute_query`, true when the field had a `where`
    /// argument and the result was non-empty.
    pub index_used: bool,
    /// Wall-clock time spent executing the collection query, in milliseconds.
    pub elapsed_ms: u128,
}
/// Outcome of a mutation.
#[derive(Debug, Clone)]
pub struct MutationResult {
    /// Name of the mutation operation that ran.
    pub operation: String,
    /// Collection the mutation targeted.
    pub collection: String,
    /// Number of documents affected.
    pub affected_count: usize,
    /// Documents returned by the mutation; these are what `bind` and
    /// `bind_first` deserialize.
    pub returned_documents: Vec<Document>,
}
impl MutationResult {
    /// Deserializes every returned document into `T`, stopping at the first
    /// failure.
    pub fn bind<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>> {
        let mut bound = Vec::with_capacity(self.returned_documents.len());
        for document in self.returned_documents {
            bound.push(document.bind()?);
        }
        Ok(bound)
    }

    /// Deserializes the first returned document into `T`.
    ///
    /// # Errors
    /// `NotFound` when no documents were returned, or a binding error.
    pub fn bind_first<T: serde::de::DeserializeOwned>(self) -> Result<T> {
        let Some(first) = self.returned_documents.into_iter().next() else {
            return Err(AqlError::new(
                ErrorCode::NotFound,
                "No documents found to bind".to_string(),
            ));
        };
        first.bind()
    }
}
/// Handle returned when a subscription is created.
#[derive(Debug)]
pub struct SubscriptionResult {
    /// Identifier of the subscription.
    pub subscription_id: String,
    /// Collection the subscription refers to.
    pub collection: String,
    /// Change listener for the subscription, when one is attached.
    pub stream: Option<crate::pubsub::ChangeListener>,
}
/// Per-call execution settings.
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    /// Set by `skip_validation()`; defaults to `false`.
    pub skip_validation: bool,
    /// When true (the default), selected fields are projected onto results.
    pub apply_projections: bool,
    /// Defaults to `false`.
    // NOTE(review): not read anywhere in this part of the file.
    pub debug_audit: bool,
    /// Variable values supplied by the caller.
    pub variables: HashMap<String, ast::Value>,
    /// Role of the caller, checked by `@auth` / `@require` / `@allow`.
    pub user_role: Option<String>,
}
impl Default for ExecutionOptions {
    /// Defaults: projections enabled; validation not skipped; audit off; no
    /// variables; no caller role.
    fn default() -> Self {
        let variables = HashMap::new();
        let user_role = None;
        Self {
            apply_projections: true,
            skip_validation: false,
            debug_audit: false,
            variables,
            user_role,
        }
    }
}
impl ExecutionOptions {
    /// Creates options with the default settings.
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the variable map.
    pub fn with_variables(self, vars: HashMap<String, ast::Value>) -> Self {
        Self {
            variables: vars,
            ..self
        }
    }

    /// Sets the caller's role.
    pub fn with_role(self, role: impl Into<String>) -> Self {
        Self {
            user_role: Some(role.into()),
            ..self
        }
    }

    /// Turns the `skip_validation` flag on.
    pub fn skip_validation(self) -> Self {
        Self {
            skip_validation: true,
            ..self
        }
    }
}
pub(crate) fn json_to_aql_value(v: serde_json::Value) -> ast::Value {
match v {
serde_json::Value::Null => ast::Value::Null,
serde_json::Value::Bool(b) => ast::Value::Boolean(b),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
ast::Value::Int(i)
} else if let Some(f) = n.as_f64() {
ast::Value::Float(f)
} else {
ast::Value::Null
}
}
serde_json::Value::String(s) => ast::Value::String(s),
serde_json::Value::Array(arr) => {
ast::Value::Array(arr.into_iter().map(json_to_aql_value).collect())
}
serde_json::Value::Object(map) => ast::Value::Object(
map.into_iter()
.map(|(k, v)| (k, json_to_aql_value(v)))
.collect(),
),
}
}
/// Executes every executable operation of a parsed document.
///
/// Fragment definitions are gathered into a lookup table and are not executed
/// themselves. A single executable operation yields its result directly;
/// several yield an `ExecutionResult::Batch` in document order, stopping at
/// the first error.
///
/// # Errors
/// `QueryError` when the document has no operations or no executable
/// operations; otherwise whatever the operation returns.
pub async fn execute_document(
    db: &Aurora,
    doc: &ast::Document,
    options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    if doc.operations.is_empty() {
        return Err(AqlError::new(
            ErrorCode::QueryError,
            "No operations in document".to_string(),
        ));
    }

    let vars: HashMap<String, ast::Value> = options.variables.clone();
    let fragments: HashMap<String, FragmentDef> = doc
        .operations
        .iter()
        .filter_map(|op| {
            if let Operation::FragmentDefinition(frag) = op {
                Some((frag.name.clone(), frag.clone()))
            } else {
                None
            }
        })
        .collect();

    let executable_ops: Vec<&Operation> = doc
        .operations
        .iter()
        .filter(|op| !matches!(op, Operation::FragmentDefinition(_)))
        .collect();
    if executable_ops.is_empty() {
        return Err(AqlError::new(
            ErrorCode::QueryError,
            "No executable operations in document".to_string(),
        ));
    }

    if executable_ops.len() == 1 {
        execute_operation(db, executable_ops[0], &vars, options, &fragments).await
    } else {
        let mut results = Vec::with_capacity(executable_ops.len());
        for op in executable_ops {
            results.push(execute_operation(db, op, &vars, options, &fragments).await?);
        }
        Ok(ExecutionResult::Batch(results))
    }
}
/// Dispatches a single operation to its executor.
///
/// Operation kinds without a dedicated executor produce an empty query result
/// with an empty collection name.
async fn execute_operation(
    db: &Aurora,
    op: &Operation,
    vars: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<ExecutionResult> {
    match op {
        Operation::Query(q) => execute_query(db, q, vars, options, fragments).await,
        Operation::Mutation(m) => execute_mutation(db, m, vars, options, fragments).await,
        Operation::Subscription(s) => execute_subscription(db, s, vars, options).await,
        Operation::Schema(s) => execute_schema(db, s, options).await,
        Operation::Migration(m) => execute_migration(db, m, options).await,
        Operation::Introspection(i) => execute_introspection(db, i).await,
        Operation::Handler(h) => execute_handler_registration(db, h, options).await,
        _ => {
            let empty = QueryResult {
                collection: String::new(),
                documents: vec![],
                total_count: None,
                deferred_fields: vec![],
                explain: None,
            };
            Ok(ExecutionResult::Query(empty))
        }
    }
}
/// Executes every root field of a query as a collection query.
///
/// When the query carries an `explain` directive, each result is annotated
/// with timing and a coarse index-usage indicator. A single root field yields
/// `ExecutionResult::Query`; any other count yields a `Batch` of query results.
///
/// # Errors
/// Fails on missing required variables, field-collection errors, or errors
/// from the collection query.
async fn execute_query(
    db: &Aurora,
    query: &ast::Query,
    vars: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<ExecutionResult> {
    validate_required_variables(&query.variable_definitions, vars)?;

    let explain_requested = query.directives.iter().any(|d| d.name == "explain");
    let root_fields = collect_fields(&query.selection_set, fragments, vars, None)?;

    let mut outcomes = Vec::new();
    for root in &root_fields {
        let projection = collect_fields(&root.selection_set, fragments, vars, Some(&root.name))?;
        let started = std::time::Instant::now();
        let mut outcome =
            execute_collection_query(db, root, &projection, vars, options, fragments).await?;
        if explain_requested {
            let elapsed_ms = started.elapsed().as_millis();
            let has_where = root.arguments.iter().any(|a| a.name == "where");
            let index_used = has_where && !outcome.documents.is_empty();
            let docs_scanned = outcome.documents.len();
            let collection = outcome.collection.clone();
            outcome.explain = Some(ExplainResult {
                collection,
                docs_scanned,
                index_used,
                elapsed_ms,
            });
        }
        outcomes.push(outcome);
    }

    match outcomes.len() {
        1 => Ok(ExecutionResult::Query(outcomes.swap_remove(0))),
        _ => {
            let batch = outcomes.into_iter().map(ExecutionResult::Query).collect();
            Ok(ExecutionResult::Batch(batch))
        }
    }
}
/// Executes one root field as a query against the collection of the same name.
///
/// Steps, in order: permission check; delegation to `execute_search_query`
/// when a `search` argument is present; document retrieval (index/`_sid`
/// lookup for a usable equality filter, otherwise a filtered scan); the
/// `validate` argument; ordering; `after` cursor; connection handling (which
/// returns early); offset and limit; projection (with lookups or deferral);
/// `windowFunc` fields; and `downsample`.
///
/// # Errors
/// Propagates permission, filter, storage, value-conversion, field-collection
/// and projection errors.
async fn execute_collection_query(
    db: &Aurora,
    field: &ast::Field,
    sub_fields: &[ast::Field],
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
    fragments: &HashMap<String, FragmentDef>,
) -> Result<QueryResult> {
    let collection_name = &field.name;
    check_permissions(&field.directives, options, false)?;

    if let Some(search_arg) = field.arguments.iter().find(|a| a.name == "search") {
        return execute_search_query(
            db,
            collection_name,
            search_arg,
            sub_fields,
            field,
            variables,
            options,
        )
        .await;
    }

    let filter = extract_filter_from_args(&field.arguments)?;
    let (limit, offset) = extract_pagination(&field.arguments);
    // `last` / `before` are parsed but not used here.
    let (first, after, _last, _before) = extract_cursor_pagination(&field.arguments);
    let compiled_filter = if let Some(ref f) = filter {
        Some(compile_filter(f)?)
    } else {
        None
    };
    let vars_arc = Arc::new(variables.clone());
    let filter_fn = move |doc: &Document| {
        compiled_filter
            .as_ref()
            .map(|f| matches_filter(doc, f, &vars_arc))
            .unwrap_or(true)
    };

    // Try to answer the query from an index (or a direct `_sid` fetch).
    let indexed_docs = if let Some(ref f) = filter {
        match find_indexed_equality_filter(f, db, collection_name) {
            Some((field_name, val))
                if field_name == "_sid" || !db.is_index_building(collection_name, &field_name) =>
            {
                let db_val = aql_value_to_db_value(&val, variables)?;
                let ids = if field_name == "_sid" {
                    match &db_val {
                        Value::String(s) => vec![s.clone()],
                        Value::Uuid(u) => vec![u.to_string()],
                        _ => vec![],
                    }
                } else {
                    db.get_ids_from_index(collection_name, &field_name, &db_val)
                };
                let mut docs = Vec::with_capacity(ids.len());
                for id in ids {
                    if let Some(doc) = db.get_document(collection_name, &id)? {
                        // Re-check the full filter: the index only covers one
                        // equality term.
                        if filter_fn(&doc) {
                            docs.push(doc);
                        }
                    }
                }
                Some(docs)
            }
            _ => None,
        }
    } else {
        None
    };

    let is_connection = sub_fields
        .iter()
        .any(|f| f.name == "edges" || f.name == "pageInfo");
    let orderings = extract_order_by(&field.arguments);

    let mut docs = if let Some(docs) = indexed_docs {
        docs
    } else {
        // A scan can only stop early when no cursor or ordering has to see the
        // full result set. Connections fetch one extra document.
        let scan_limit = if after.is_some() || !orderings.is_empty() {
            None
        } else {
            limit.or(first).map(|l| {
                let base = if is_connection { l + 1 } else { l };
                base + offset
            })
        };
        db.scan_and_filter(collection_name, filter_fn, scan_limit)?
    };

    if let Some(validate_arg) = field.arguments.iter().find(|a| a.name == "validate") {
        docs.retain(|doc| doc_passes_validate_arg(doc, validate_arg));
    }
    if !orderings.is_empty() {
        apply_ordering(&mut docs, &orderings);
    }
    if let Some(ref cursor) = after {
        // Drop everything up to and including the cursor document.
        if let Some(pos) = docs.iter().position(|d| &d._sid == cursor) {
            docs.drain(0..=pos);
        }
    }

    if is_connection {
        return Ok(execute_connection(
            docs,
            sub_fields,
            limit.or(first),
            fragments,
            variables,
            options,
        )?);
    }

    if offset > 0 {
        if offset < docs.len() {
            docs.drain(0..offset);
        } else {
            docs.clear();
        }
    }
    if let Some(l) = limit.or(first) {
        docs.truncate(l);
    }

    let lookup_dependencies =
        identify_lookup_dependencies(db, collection_name, sub_fields, variables);
    let has_lookups = !lookup_dependencies.is_empty();
    let mut deferred_fields = Vec::new();
    if options.apply_projections && !sub_fields.is_empty() {
        if has_lookups {
            let mut projected = Vec::with_capacity(docs.len());
            for d in docs {
                let (proj_doc, deferred) = apply_projection_with_lookups(
                    db,
                    d,
                    collection_name,
                    sub_fields,
                    fragments,
                    variables,
                    options,
                    &lookup_dependencies,
                )
                .await?;
                projected.push(proj_doc);
                // Keep the first non-empty deferred list that is reported.
                if deferred_fields.is_empty() {
                    deferred_fields = deferred;
                }
            }
            docs = projected;
        } else {
            let mut all_deferred = Vec::new();
            let mut projected = Vec::with_capacity(docs.len());
            for d in docs {
                let (proj, deferred) =
                    apply_projection_and_defer(d, sub_fields, variables, options)?;
                if all_deferred.is_empty() && !deferred.is_empty() {
                    all_deferred = deferred;
                }
                projected.push(proj);
            }
            docs = projected;
            deferred_fields = all_deferred;
        }
    }

    for sf in sub_fields {
        if sf.name == "windowFunc" {
            let alias = sf.alias.as_ref().unwrap_or(&sf.name).clone();
            let wfield = arg_string(&sf.arguments, "field").unwrap_or_default();
            let func = arg_string(&sf.arguments, "function").unwrap_or_else(|| "avg".to_string());
            // A negative `windowSize` must not wrap into a huge `usize`; fall
            // back to the default window of 3 instead.
            let wsize = arg_i64(&sf.arguments, "windowSize")
                .and_then(|n| usize::try_from(n).ok())
                .unwrap_or(3);
            apply_window_function(&mut docs, &alias, &wfield, &func, wsize);
        }
    }

    if let Some(ds_field) = sub_fields.iter().find(|f| f.name == "downsample") {
        let interval =
            arg_string(&ds_field.arguments, "interval").unwrap_or_else(|| "1h".to_string());
        let aggregation =
            arg_string(&ds_field.arguments, "aggregation").unwrap_or_else(|| "avg".to_string());
        // Propagate collection errors instead of silently downsampling with an
        // empty field list.
        let ds_sub: Vec<String> =
            collect_fields(&ds_field.selection_set, fragments, variables, None)?
                .iter()
                .map(|f| f.name.clone())
                .collect();
        docs = apply_downsample(docs, &interval, &aggregation, &ds_sub);
    }

    Ok(QueryResult {
        collection: collection_name.clone(),
        documents: docs,
        total_count: None,
        deferred_fields,
        explain: None,
    })
}
/// Runs a full-text search against `collection` and projects the hits.
///
/// The `search` argument is resolved against `variables` and decomposed by
/// `extract_search_params`. Only the `limit` part of the pagination arguments
/// is honoured. Fuzzy matching, when requested, uses a distance of 1.
///
/// # Errors
/// Propagates search and projection errors.
async fn execute_search_query(
    db: &Aurora,
    collection: &str,
    search_arg: &ast::Argument,
    sub_fields: &[ast::Field],
    field: &ast::Field,
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
) -> Result<QueryResult> {
    let search_spec = resolve_ast_deep(&search_arg.value, variables);
    let (query_str, search_fields, fuzzy) = extract_search_params(&search_spec);
    let (limit, _) = extract_pagination(&field.arguments);

    let mut builder = db.search(collection).query(&query_str);
    if fuzzy {
        builder = builder.fuzzy(1);
    }
    if let Some(max_hits) = limit {
        builder = builder.limit(max_hits);
    }

    // An empty field list means "search all fields".
    let hits = builder
        .collect_with_fields(if search_fields.is_empty() {
            None
        } else {
            Some(&search_fields)
        })
        .await?;

    let wants_projection = options.apply_projections && !sub_fields.is_empty();
    let documents = if wants_projection {
        let mut projected = Vec::with_capacity(hits.len());
        for hit in hits {
            // Deferred field names are not reported for search results.
            let (doc, _) = apply_projection_and_defer(hit, sub_fields, variables, options)?;
            projected.push(doc);
        }
        projected
    } else {
        hits
    };

    Ok(QueryResult {
        collection: collection.to_string(),
        documents,
        total_count: None,
        deferred_fields: vec![],
        explain: None,
    })
}
/// Decomposes a `search` argument object into `(query, fields, fuzzy)`.
///
/// Reads the `query` string, the string entries of the `fields` array and the
/// `fuzzy` boolean. Anything missing or of another type falls back to an
/// empty string, an empty list and `false` respectively; a non-object value
/// yields all three fallbacks.
fn extract_search_params(v: &ast::Value) -> (String, Vec<String>, bool) {
    let ast::Value::Object(params) = v else {
        return (String::new(), Vec::new(), false);
    };

    let query = match params.get("query") {
        Some(ast::Value::String(text)) => text.clone(),
        _ => String::new(),
    };

    let fields = match params.get("fields") {
        Some(ast::Value::Array(items)) => items
            .iter()
            .filter_map(|item| match item {
                ast::Value::String(name) => Some(name.clone()),
                _ => None,
            })
            .collect(),
        _ => Vec::new(),
    };

    let fuzzy = matches!(params.get("fuzzy"), Some(ast::Value::Boolean(true)));

    (query, fields, fuzzy)
}
/// Returns `true` when `doc` satisfies every inline constraint in a `validate` argument.
///
/// The argument is expected to be an object mapping field names to objects of
/// `constraint name -> constraint value`. The check is lenient: a non-object argument, a
/// non-object rule set for a field, or a field that is absent from the document all count
/// as passing.
fn doc_passes_validate_arg(doc: &Document, validate_arg: &ast::Argument) -> bool {
    let rules = match &validate_arg.value {
        ast::Value::Object(rules) => rules,
        _ => return true,
    };

    rules
        .iter()
        .all(|(field_name, spec)| match (spec, doc.data.get(field_name)) {
            (ast::Value::Object(constraints), Some(actual)) => constraints
                .iter()
                .all(|(name, expected)| check_inline_constraint(actual, name, expected)),
            // Nothing to check: malformed rule set or field not present on the document.
            _ => true,
        })
}
/// Checks a single inline validation constraint against a stored value.
///
/// Supported constraints are `format` (`email`, `url`, `uuid`), `min`, `max`,
/// `minLength`, `maxLength` and `pattern`. The check is deliberately permissive: an
/// unknown constraint, an unknown format, a value or bound of an unexpected type, or a
/// regex that fails to compile all yield `true`.
///
/// String lengths are measured with `str::len`, i.e. in bytes.
///
/// Negative length bounds are handled explicitly instead of being cast with `as usize`
/// (which would wrap them into a huge positive number): a negative `minLength` is always
/// satisfied and a negative `maxLength` never is.
fn check_inline_constraint(value: &Value, constraint: &str, constraint_val: &ast::Value) -> bool {
    /// Numeric view of a stored value, if it is an integer or a float.
    fn stored_number(v: &Value) -> Option<f64> {
        match v {
            Value::Int(i) => Some(*i as f64),
            Value::Float(f) => Some(*f),
            _ => None,
        }
    }

    /// Numeric view of a constraint literal, if it is an integer or a float.
    fn bound_number(v: &ast::Value) -> Option<f64> {
        match v {
            ast::Value::Float(f) => Some(*f),
            ast::Value::Int(i) => Some(*i as f64),
            _ => None,
        }
    }

    match constraint {
        "format" => match (value, constraint_val) {
            (Value::String(s), ast::Value::String(fmt)) => match fmt.as_str() {
                // Minimal shape check: something after the first '@' that contains a dot.
                "email" => {
                    s.contains('@')
                        && s.split('@')
                            .nth(1)
                            .map(|domain| domain.contains('.'))
                            .unwrap_or(false)
                }
                "url" => s.starts_with("http://") || s.starts_with("https://"),
                "uuid" => uuid::Uuid::parse_str(s).is_ok(),
                _ => true,
            },
            _ => true,
        },
        "min" => match (stored_number(value), bound_number(constraint_val)) {
            (Some(n), Some(min)) => n >= min,
            _ => true,
        },
        "max" => match (stored_number(value), bound_number(constraint_val)) {
            (Some(n), Some(max)) => n <= max,
            _ => true,
        },
        "minLength" => match (value, constraint_val) {
            (Value::String(s), ast::Value::Int(n)) => {
                // Every length is >= a negative bound; compare in u64 to avoid wrap-around.
                *n <= 0 || s.len() as u64 >= *n as u64
            }
            _ => true,
        },
        "maxLength" => match (value, constraint_val) {
            (Value::String(s), ast::Value::Int(n)) => {
                // No length is <= a negative bound.
                *n >= 0 && s.len() as u64 <= *n as u64
            }
            _ => true,
        },
        "pattern" => match (value, constraint_val) {
            (Value::String(s), ast::Value::String(pat)) => match regex::Regex::new(pat) {
                Ok(re) => re.is_match(s),
                // Best effort: an invalid pattern does not reject the document.
                Err(_) => true,
            },
            _ => true,
        },
        _ => true,
    }
}
/// Returns the value of the first argument called `name` when that value is a string
/// literal; `None` if there is no such argument or it holds another kind of value.
fn arg_string(args: &[ast::Argument], name: &str) -> Option<String> {
    match &args.iter().find(|arg| arg.name == name)?.value {
        ast::Value::String(text) => Some(text.clone()),
        _ => None,
    }
}
/// Returns the value of the first argument called `name` when that value is an integer
/// literal; `None` if there is no such argument or it holds another kind of value.
fn arg_i64(args: &[ast::Argument], name: &str) -> Option<i64> {
    match &args.iter().find(|arg| arg.name == name)?.value {
        ast::Value::Int(number) => Some(*number),
        _ => None,
    }
}
/// Collects the local field names that the lookups among `fields` depend on.
///
/// Two kinds of field contribute:
/// * a field carrying all three of `localField`, `collection` and `foreignField`
///   arguments contributes the (variable-resolved) `localField` value when it is a string;
/// * otherwise, a field whose definition in `collection`'s schema has a relation
///   contributes its own name.
///
/// A schema that cannot be loaded is ignored rather than reported.
fn identify_lookup_dependencies(
    db: &Aurora,
    collection: &str,
    fields: &[ast::Field],
    variables: &HashMap<String, ast::Value>,
) -> Vec<String> {
    let mut deps = Vec::new();
    for field in fields {
        let local = field.arguments.iter().find(|a| a.name == "localField");
        let target = field.arguments.iter().find(|a| a.name == "collection");
        let foreign = field.arguments.iter().find(|a| a.name == "foreignField");

        match (local, target, foreign) {
            // Explicit lookup: all three arguments are present.
            (Some(local_arg), Some(_), Some(_)) => {
                if let ast::Value::String(s) = resolve_if_variable(&local_arg.value, variables) {
                    deps.push(s.clone());
                }
            }
            // Otherwise fall back to relations declared in the schema.
            _ => {
                if let Ok(definition) = db.get_collection_definition(collection) {
                    if let Some(field_def) = definition.fields.get(&field.name) {
                        if field_def.relation.is_some() {
                            deps.push(field.name.clone());
                        }
                    }
                }
            }
        }
    }
    deps
}
/// Reduces `doc` to the selected `fields` and reports which selections were deferred.
///
/// For every selected field, in order:
/// * its directives are permission-checked (a failure aborts the whole projection);
/// * a field carrying a `defer` directive is left out and its output key (alias, or name
///   when there is no alias) is added to the returned list;
/// * `__compute__` evaluates its `expr` argument — as a template when it is a string
///   literal, otherwise via the field's pre-parsed `computed_expression` if present — and
///   stores the result under the alias (default `"computed"`);
/// * `id` is always emitted, as `Null` when the document has no `id` entry;
/// * `_sid` emits the document's storage id;
/// * any other field is copied only when present in the document.
///
/// An empty selection returns the document untouched.
fn apply_projection_and_defer(
    mut doc: Document,
    fields: &[ast::Field],
    vars: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
) -> Result<(Document, Vec<String>)> {
    if fields.is_empty() {
        return Ok((doc, vec![]));
    }

    let mut proj = HashMap::new();
    let mut deferred = Vec::new();

    for f in fields {
        check_permissions(&f.directives, options, false)?;

        // Key under which this selection appears in the output.
        let output_key = || f.alias.as_ref().unwrap_or(&f.name).clone();

        if f.directives.iter().any(|d| d.name == "defer") {
            deferred.push(output_key());
            continue;
        }

        match f.name.as_str() {
            "__compute__" => {
                let alias = f.alias.as_deref().unwrap_or("computed");
                if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
                    match (&expr_arg.value, f.computed_expression.as_ref()) {
                        (ast::Value::String(template), _) => {
                            let rendered = eval_template(template, &doc.data);
                            proj.insert(alias.to_string(), Value::String(rendered));
                        }
                        (_, Some(expr)) => {
                            let computed = eval_expression(expr, &doc.data, vars);
                            proj.insert(alias.to_string(), computed);
                        }
                        _ => {}
                    }
                }
            }
            "id" => {
                let id_value = doc.data.get("id").cloned().unwrap_or(Value::Null);
                proj.insert(output_key(), id_value);
            }
            "_sid" => {
                proj.insert(output_key(), Value::String(doc._sid.clone()));
            }
            other => {
                if let Some(v) = doc.data.get(other) {
                    proj.insert(output_key(), v.clone());
                }
            }
        }
    }

    doc.data = proj;
    Ok((doc, deferred))
}
/// Evaluates a computed-field expression against a document's `data`.
///
/// Evaluation never fails: anything that cannot be resolved (unknown variable, missing
/// field, value that cannot be converted, unsupported operation) evaluates to
/// `Value::Null`.
///
/// * `Literal` / `Variable` — converted with `aql_value_to_db_value`.
/// * `FieldAccess` — the first segment is looked up in `data`, the remaining segments
///   descend through nested objects. A leading `_sid` segment always produces a string
///   (empty when `data` has no string `_sid` entry). An empty path yields `Null` instead
///   of panicking, and only the final value is cloned.
/// * `Ternary` — the condition is truthy unless it is `false`, `Null`, `0`, `0.0` or `""`.
/// * `Binary`, `Unary`, `FunctionCall` — operands/arguments are evaluated eagerly and
///   handed to `eval_binary_op`, `eval_unary_op` and `eval_function_call`.
fn eval_expression(
    expr: &ast::Expression,
    data: &HashMap<String, Value>,
    variables: &HashMap<String, ast::Value>,
) -> Value {
    match expr {
        ast::Expression::Literal(v) => {
            aql_value_to_db_value(v, variables).unwrap_or(Value::Null)
        }
        ast::Expression::Variable(name) => variables
            .get(name)
            .and_then(|v| aql_value_to_db_value(v, variables).ok())
            .unwrap_or(Value::Null),
        ast::Expression::FieldAccess(parts) => {
            let (root, rest) = match parts.split_first() {
                Some(split) => split,
                // Guard: an empty path has nothing to resolve.
                None => return Value::Null,
            };

            if root.as_str() == "_sid" {
                // `_sid` resolves to a string, and a string has no sub-fields to descend into.
                if !rest.is_empty() {
                    return Value::Null;
                }
                return Value::String(
                    data.get("_sid")
                        .and_then(|v| v.as_str().map(|s| s.to_string()))
                        .unwrap_or_default(),
                );
            }

            // Walk by reference; cloning happens once, on the value actually returned.
            let mut current = data.get(root);
            for part in rest {
                current = match current {
                    Some(Value::Object(map)) => map.get(part),
                    _ => None,
                };
                if current.is_none() {
                    break;
                }
            }
            current.cloned().unwrap_or(Value::Null)
        }
        ast::Expression::Binary { op, left, right } => {
            let lv = eval_expression(left, data, variables);
            let rv = eval_expression(right, data, variables);
            eval_binary_op(*op, lv, rv)
        }
        ast::Expression::Unary { op, expr } => {
            let v = eval_expression(expr, data, variables);
            eval_unary_op(*op, v)
        }
        ast::Expression::Ternary {
            condition,
            then_expr,
            else_expr,
        } => {
            let is_true = match eval_expression(condition, data, variables) {
                Value::Bool(b) => b,
                Value::Null => false,
                Value::Int(i) => i != 0,
                Value::Float(f) => f != 0.0,
                Value::String(s) => !s.is_empty(),
                _ => true,
            };
            if is_true {
                eval_expression(then_expr, data, variables)
            } else {
                eval_expression(else_expr, data, variables)
            }
        }
        ast::Expression::FunctionCall { name, args } => {
            let evaluated_args: Vec<Value> = args
                .iter()
                .map(|a| eval_expression(a, data, variables))
                .collect();
            eval_function_call(name, evaluated_args)
        }
    }
}
/// Applies a binary operator to two already-evaluated operands.
///
/// Arithmetic (`Add`, `Sub`, `Mul`, `Div`) is defined for Int/Int, Float/Float and mixed
/// Int/Float operands (mixed operands are computed as floats); `Add` additionally
/// concatenates two strings. Any other operand combination yields `Value::Null`.
///
/// Integer arithmetic is checked: an overflowing result, a division by zero, or
/// `i64::MIN / -1` yields `Value::Null` instead of panicking or silently wrapping.
/// Float division by `0.0` also yields `Null`.
///
/// Comparisons rely on `Value`'s own equality and ordering. `And` / `Or` treat only
/// `Bool(true)` as true. Operators not listed here evaluate to `Null`.
fn eval_binary_op(op: ast::BinaryOp, left: Value, right: Value) -> Value {
    match op {
        ast::BinaryOp::Add => match (left, right) {
            (Value::Int(a), Value::Int(b)) => a.checked_add(b).map_or(Value::Null, Value::Int),
            (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 + b),
            (Value::Float(a), Value::Int(b)) => Value::Float(a + b as f64),
            (Value::Float(a), Value::Float(b)) => Value::Float(a + b),
            (Value::String(mut a), Value::String(b)) => {
                a.push_str(&b);
                Value::String(a)
            }
            _ => Value::Null,
        },
        ast::BinaryOp::Sub => match (left, right) {
            (Value::Int(a), Value::Int(b)) => a.checked_sub(b).map_or(Value::Null, Value::Int),
            (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 - b),
            (Value::Float(a), Value::Int(b)) => Value::Float(a - b as f64),
            (Value::Float(a), Value::Float(b)) => Value::Float(a - b),
            _ => Value::Null,
        },
        ast::BinaryOp::Mul => match (left, right) {
            (Value::Int(a), Value::Int(b)) => a.checked_mul(b).map_or(Value::Null, Value::Int),
            (Value::Int(a), Value::Float(b)) => Value::Float(a as f64 * b),
            (Value::Float(a), Value::Int(b)) => Value::Float(a * b as f64),
            (Value::Float(a), Value::Float(b)) => Value::Float(a * b),
            _ => Value::Null,
        },
        ast::BinaryOp::Div => match (left, right) {
            // `checked_div` is `None` for a zero divisor and for `i64::MIN / -1`.
            (Value::Int(a), Value::Int(b)) => a.checked_div(b).map_or(Value::Null, Value::Int),
            (Value::Int(a), Value::Float(b)) if b != 0.0 => Value::Float(a as f64 / b),
            (Value::Float(a), Value::Int(b)) if b != 0 => Value::Float(a / b as f64),
            (Value::Float(a), Value::Float(b)) if b != 0.0 => Value::Float(a / b),
            _ => Value::Null,
        },
        ast::BinaryOp::Eq => Value::Bool(left == right),
        ast::BinaryOp::Ne => Value::Bool(left != right),
        ast::BinaryOp::Gt => Value::Bool(left > right),
        ast::BinaryOp::Gte => Value::Bool(left >= right),
        ast::BinaryOp::Lt => Value::Bool(left < right),
        ast::BinaryOp::Lte => Value::Bool(left <= right),
        ast::BinaryOp::And => {
            let lb = matches!(left, Value::Bool(true));
            let rb = matches!(right, Value::Bool(true));
            Value::Bool(lb && rb)
        }
        ast::BinaryOp::Or => {
            let lb = matches!(left, Value::Bool(true));
            let rb = matches!(right, Value::Bool(true));
            Value::Bool(lb || rb)
        }
        _ => Value::Null,
    }
}
/// Applies a unary operator to an already-evaluated operand.
///
/// * `Not` — negates a boolean; `Null` becomes `true`; every other value becomes `false`.
/// * `Neg` — negates an integer or float; other values yield `Null`. Negating `i64::MIN`
///   has no representable result and yields `Null` rather than overflowing.
fn eval_unary_op(op: ast::UnaryOp, val: Value) -> Value {
    match op {
        ast::UnaryOp::Not => match val {
            Value::Bool(b) => Value::Bool(!b),
            Value::Null => Value::Bool(true),
            _ => Value::Bool(false),
        },
        ast::UnaryOp::Neg => match val {
            Value::Int(i) => i.checked_neg().map_or(Value::Null, Value::Int),
            Value::Float(f) => Value::Float(-f),
            _ => Value::Null,
        },
    }
}
/// Evaluates a built-in expression function on its first argument.
///
/// * `upper` / `uppercase`, `lower` / `lowercase` — case conversion of a string.
/// * `len` / `length` — byte length of a string, or element count of an array or object.
///
/// An unknown function name, a missing argument or an argument of an unsupported type
/// yields `Value::Null`. Arguments after the first are ignored.
fn eval_function_call(name: &str, args: Vec<Value>) -> Value {
    let subject = args.first();
    match name {
        "upper" | "uppercase" => match subject {
            Some(Value::String(text)) => Value::String(text.to_uppercase()),
            _ => Value::Null,
        },
        "lower" | "lowercase" => match subject {
            Some(Value::String(text)) => Value::String(text.to_lowercase()),
            _ => Value::Null,
        },
        "len" | "length" => match subject {
            Some(Value::String(text)) => Value::Int(text.len() as i64),
            Some(Value::Array(items)) => Value::Int(items.len() as i64),
            Some(Value::Object(entries)) => Value::Int(entries.len() as i64),
            _ => Value::Null,
        },
        _ => Value::Null,
    }
}
/// Renders a `$field` / `${field}` template using the values in `data`.
///
/// The template is scanned once, left to right:
/// * `${key}` is replaced when `key` exists in `data`;
/// * otherwise `$` followed by the text of a key is replaced, choosing the longest key
///   that matches at that position (so `$name_full` is never mistaken for `$name`);
/// * a `$` that matches nothing is copied through unchanged.
///
/// String values are inserted verbatim; other values use their `Display` form. Inserted
/// text is never re-scanned, so the output does not depend on the iteration order of
/// `data` and a value containing `$` cannot trigger further substitutions.
fn eval_template(template: &str, data: &HashMap<String, Value>) -> String {
    /// Text form of a value as it should appear in the rendered template.
    fn render(v: &Value) -> String {
        match v {
            Value::String(s) => s.clone(),
            _ => v.to_string(),
        }
    }

    let mut out = String::with_capacity(template.len());
    let mut rest = template;

    while let Some(pos) = rest.find('$') {
        out.push_str(&rest[..pos]);
        let after = &rest[pos + 1..];

        // Braced form: `${key}`.
        if let Some(body) = after.strip_prefix('{') {
            if let Some(end) = body.find('}') {
                if let Some(v) = data.get(&body[..end]) {
                    out.push_str(&render(v));
                    rest = &body[end + 1..];
                    continue;
                }
            }
        }

        // Bare form: `$key`, preferring the longest key that matches here.
        let best = data
            .keys()
            .filter(|k| after.starts_with(k.as_str()))
            .max_by_key(|k| k.len());
        match best {
            Some(key) => {
                out.push_str(&render(&data[key]));
                rest = &after[key.len()..];
            }
            None => {
                out.push('$');
                rest = after;
            }
        }
    }

    out.push_str(rest);
    out
}
/// Adds a trailing-window aggregate of `field` to every document, stored under `alias`.
///
/// For the document at position `i` the window covers positions
/// `i + 1 - window ..= i` (clamped at the start of the list). Only integer and float
/// values of `field` take part; a document whose window contains no numeric value is left
/// untouched. `function` selects average (`rollingAvg` / `avg`), sum, minimum or maximum;
/// any unrecognised name falls back to the average. The result is always written as a
/// float. Nothing happens for an empty list or a window of zero.
fn apply_window_function(
    docs: &mut Vec<Document>,
    alias: &str,
    field: &str,
    function: &str,
    window: usize,
) {
    if window == 0 || docs.is_empty() {
        return;
    }

    // Snapshot the numeric inputs first so writes under `alias` cannot feed back into them.
    let numeric: Vec<Option<f64>> = docs
        .iter()
        .map(|doc| match doc.data.get(field) {
            Some(Value::Int(i)) => Some(*i as f64),
            Some(Value::Float(f)) => Some(*f),
            _ => None,
        })
        .collect();

    for (idx, doc) in docs.iter_mut().enumerate() {
        let first = (idx + 1).saturating_sub(window);
        let in_window: Vec<f64> = numeric[first..=idx].iter().copied().flatten().collect();
        if in_window.is_empty() {
            continue;
        }

        let mean = |vals: &[f64]| vals.iter().sum::<f64>() / vals.len() as f64;
        let aggregate = match function {
            "rollingSum" | "sum" => in_window.iter().sum::<f64>(),
            "rollingMin" | "min" => in_window.iter().cloned().fold(f64::INFINITY, f64::min),
            "rollingMax" | "max" => in_window
                .iter()
                .cloned()
                .fold(f64::NEG_INFINITY, f64::max),
            // "rollingAvg", "avg" and anything unrecognised.
            _ => mean(&in_window),
        };
        doc.data.insert(alias.to_string(), Value::Float(aggregate));
    }
}
/// Groups documents into fixed-width time buckets and aggregates the requested fields.
///
/// * `interval` is parsed by `parse_interval`; a non-positive result disables
///   downsampling and returns `docs` unchanged.
/// * A document's time is taken from the first of `timestamp`, `ts`, `created_at`, `time`
///   that is present: an RFC 3339 string or an integer (used directly as the bucket
///   coordinate, in the same unit as the parsed interval). Documents without a usable time
///   are appended, untouched, after the buckets.
/// * Each bucket becomes one document whose `_sid` is the bucket start, with `timestamp`
///   (bucket start as RFC 3339), `count` (documents in the bucket) and one float per entry
///   of `value_fields` that has at least one numeric value in the bucket.
/// * `aggregation` is `sum`, `min`, `max` or `count`; anything else means average.
///
/// Buckets are emitted in ascending time order. Bucket starts are computed with floor
/// semantics, so times before the epoch (negative values) land in the bucket that actually
/// contains them rather than being rounded toward zero.
fn apply_downsample(
    docs: Vec<Document>,
    interval: &str,
    aggregation: &str,
    value_fields: &[String],
) -> Vec<Document> {
    let interval_secs: i64 = parse_interval(interval);
    if interval_secs <= 0 {
        return docs;
    }

    let mut buckets: std::collections::BTreeMap<i64, Vec<Document>> =
        std::collections::BTreeMap::new();
    let mut leftover = Vec::new();

    for doc in docs {
        let ts = ["timestamp", "ts", "created_at", "time"]
            .iter()
            .find_map(|&k| doc.data.get(k))
            .and_then(|v| match v {
                Value::String(s) => chrono::DateTime::parse_from_rfc3339(s)
                    .ok()
                    .map(|dt| dt.timestamp()),
                Value::Int(i) => Some(*i),
                _ => None,
            });

        match ts {
            Some(t) => {
                // Floor to the bucket start; `rem_euclid` is always in `0..interval_secs`,
                // which keeps negative times in the correct bucket.
                let bucket = t.saturating_sub(t.rem_euclid(interval_secs));
                buckets.entry(bucket).or_default().push(doc);
            }
            None => leftover.push(doc),
        }
    }

    let mut result = Vec::with_capacity(buckets.len() + leftover.len());
    for (bucket_ts, group) in buckets {
        let mut data = HashMap::new();
        data.insert(
            "timestamp".to_string(),
            Value::String(
                chrono::DateTime::from_timestamp(bucket_ts, 0)
                    .map(|dt: chrono::DateTime<chrono::Utc>| dt.to_rfc3339())
                    .unwrap_or_default(),
            ),
        );
        data.insert("count".to_string(), Value::Int(group.len() as i64));

        for field in value_fields {
            // These two keys are produced by the bucket itself.
            if field == "timestamp" || field == "count" {
                continue;
            }
            let nums: Vec<f64> = group
                .iter()
                .filter_map(|d| match d.data.get(field) {
                    Some(Value::Int(i)) => Some(*i as f64),
                    Some(Value::Float(f)) => Some(*f),
                    _ => None,
                })
                .collect();
            if nums.is_empty() {
                continue;
            }
            let agg = match aggregation {
                "sum" => nums.iter().sum::<f64>(),
                "min" => nums.iter().cloned().fold(f64::INFINITY, f64::min),
                "max" => nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
                "count" => nums.len() as f64,
                // "avg" and anything unrecognised.
                _ => nums.iter().sum::<f64>() / nums.len() as f64,
            };
            data.insert(field.clone(), Value::Float(agg));
        }

        result.push(Document {
            _sid: bucket_ts.to_string(),
            data,
        });
    }

    result.extend(leftover);
    result
}
/// Parses an interval such as `"30s"`, `"15m"`, `"1h"` or `"2d"` into seconds.
///
/// Surrounding whitespace is ignored. The rules are:
/// * with a unit suffix (`s`, `m`, `h`, `d`): the numeric part times the unit; `0` when
///   the numeric part is not a valid integer or the product does not fit in an `i64`;
/// * without a unit suffix: the text is parsed as a plain number of seconds, falling back
///   to `3600` when it is not a valid integer.
///
/// Callers treat a non-positive result as "no usable interval".
fn parse_interval(s: &str) -> i64 {
    const UNITS: [(char, i64); 4] = [('s', 1), ('m', 60), ('h', 3600), ('d', 86_400)];

    let s = s.trim();
    for &(suffix, unit_secs) in UNITS.iter() {
        if let Some(count) = s.strip_suffix(suffix) {
            return count
                .parse::<i64>()
                .ok()
                // Checked so an absurdly large count cannot overflow (and panic in debug).
                .and_then(|n| n.checked_mul(unit_secs))
                .unwrap_or(0);
        }
    }
    s.parse().unwrap_or(3600)
}
/// Registers an event handler: subscribes to change events and spawns a background task
/// that runs the handler's mutation for every matching event.
///
/// * Insert/update/delete triggers listen on their collection (or on everything when no
///   collection is given) and only react to their own change type; any other trigger
///   listens on everything and reacts to every event.
/// * For each matching event the action runs with `$_id` bound to the event's `_sid` and,
///   when the event carries a document, one `$_<field>` variable per document field. The
///   action runs with default execution options and empty context/fragments.
/// * The task ends when receiving from the listener returns an error.
///
/// Returns a single-document result in the `__handler` pseudo-collection describing the
/// registration.
async fn execute_handler_registration(
    db: &Aurora,
    handler: &ast::HandlerDef,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    use crate::pubsub::events::ChangeType;

    // Derive both the collection scope and the change-type filter from the trigger.
    let (scoped_to, trigger_type) = match &handler.trigger {
        ast::HandlerTrigger::Insert { collection } => {
            (collection.as_deref(), Some(ChangeType::Insert))
        }
        ast::HandlerTrigger::Update { collection } => {
            (collection.as_deref(), Some(ChangeType::Update))
        }
        ast::HandlerTrigger::Delete { collection } => {
            (collection.as_deref(), Some(ChangeType::Delete))
        }
        _ => (None, None),
    };
    let collection = scoped_to.unwrap_or("*").to_string();

    let mut listener = if collection == "*" {
        db.pubsub.listen_all()
    } else {
        db.pubsub.listen(collection.clone())
    };

    let db_clone = db.clone();
    let action = handler.action.clone();
    let handler_name = handler.name.clone();

    tokio::spawn(async move {
        while let Ok(event) = listener.recv().await {
            let wanted = trigger_type
                .as_ref()
                .map_or(true, |expected| &event.change_type == expected);
            if !wanted {
                continue;
            }

            let mut vars = HashMap::new();
            vars.insert("_id".to_string(), ast::Value::String(event._sid.clone()));
            if let Some(doc) = &event.document {
                for (k, v) in &doc.data {
                    vars.insert(format!("_{}", k), db_value_to_ast_value(v));
                }
            }

            // NOTE(review): a failing action is ignored here without any logging.
            let _ = execute_mutation_op(
                &db_clone,
                &action,
                &vars,
                &HashMap::new(),
                &ExecutionOptions::default(),
                &HashMap::new(),
            )
            .await;
        }
        // The loop only exits when `recv` reports an error.
        eprintln!("[handler:{}] channel closed, stopping", handler_name);
    });

    eprintln!(
        "[handler] '{}' registered on '{}'",
        handler.name, collection
    );

    let data = vec![
        ("name".to_string(), Value::String(handler.name.clone())),
        ("collection".to_string(), Value::String(collection)),
        (
            "status".to_string(),
            Value::String("registered".to_string()),
        ),
    ]
    .into_iter()
    .collect();

    Ok(ExecutionResult::Query(QueryResult {
        collection: "__handler".to_string(),
        documents: vec![Document {
            _sid: handler.name.clone(),
            data,
        }],
        total_count: Some(1),
        deferred_fields: vec![],
        explain: None,
    }))
}
/// Converts a stored database value into the equivalent AQL AST value.
///
/// Scalars map one-to-one. UUIDs and date-times have no dedicated AST variant and are
/// rendered as strings (date-times in RFC 3339). Arrays and objects are converted
/// recursively.
fn db_value_to_ast_value(v: &Value) -> ast::Value {
    match v {
        // Containers: convert every element / entry.
        Value::Array(items) => {
            ast::Value::Array(items.iter().map(|item| db_value_to_ast_value(item)).collect())
        }
        Value::Object(entries) => ast::Value::Object(
            entries
                .iter()
                .map(|(key, inner)| (key.clone(), db_value_to_ast_value(inner)))
                .collect(),
        ),
        // Values represented as strings on the AST side.
        Value::String(text) => ast::Value::String(text.clone()),
        Value::Uuid(id) => ast::Value::String(id.to_string()),
        Value::DateTime(moment) => ast::Value::String(moment.to_rfc3339()),
        // Plain scalars.
        Value::Int(number) => ast::Value::Int(*number),
        Value::Float(number) => ast::Value::Float(*number),
        Value::Bool(flag) => ast::Value::Boolean(*flag),
        Value::Null => ast::Value::Null,
    }
}
async fn execute_mutation(
db: &Aurora,
mutation: &ast::Mutation,
vars: &HashMap<String, ast::Value>,
options: &ExecutionOptions,
fragments: &HashMap<String, FragmentDef>,
) -> Result<ExecutionResult> {
use crate::transaction::ACTIVE_TRANSACTION_ID;
validate_required_variables(&mutation.variable_definitions, vars)?;
check_permissions(&mutation.directives, options, true)?;
let already_in_tx = ACTIVE_TRANSACTION_ID
.try_with(|id| *id)
.ok()
.and_then(|id| db.transaction_manager.active_transactions.get(&id))
.is_some();
if already_in_tx {
let mut results = Vec::new();
let mut context = HashMap::new();
for mut_op in &mutation.operations {
let res = execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
if let Some(alias) = &mut_op.alias {
if let Some(doc) = res.returned_documents.first() {
let mut m = serde_json::Map::new();
for (k, v) in &doc.data {
m.insert(k.clone(), aurora_value_to_json_value(v));
}
m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
context.insert(alias.clone(), JsonValue::Object(m));
}
}
results.push(res);
}
return if results.len() == 1 {
Ok(ExecutionResult::Mutation(results.remove(0)))
} else {
Ok(ExecutionResult::Batch(
results.into_iter().map(ExecutionResult::Mutation).collect(),
))
};
}
let tx_id = db.begin_transaction().await;
let exec_result = ACTIVE_TRANSACTION_ID
.scope(tx_id, async {
let mut results = Vec::new();
let mut context = HashMap::new();
for mut_op in &mutation.operations {
let res =
execute_mutation_op(db, mut_op, vars, &context, options, fragments).await?;
if let Some(alias) = &mut_op.alias {
if let Some(doc) = res.returned_documents.first() {
let mut m = serde_json::Map::new();
for (k, v) in &doc.data {
m.insert(k.clone(), aurora_value_to_json_value(v));
}
m.insert("id".to_string(), JsonValue::String(doc._sid.clone()));
context.insert(alias.clone(), JsonValue::Object(m));
}
}
results.push(res);
}
Ok::<_, crate::error::AqlError>(results)
})
.await;
match exec_result {
Ok(mut results) => {
db.commit_transaction(tx_id).await?;
if results.len() == 1 {
Ok(ExecutionResult::Mutation(results.remove(0)))
} else {
Ok(ExecutionResult::Batch(
results.into_iter().map(ExecutionResult::Mutation).collect(),
))
}
}
Err(e) => {
let _ = db.rollback_transaction(tx_id).await;
Err(e)
}
}
}
/// Executes a single mutation operation and reports what it affected.
///
/// Returns a boxed future (rather than being an `async fn`) because the
/// `Transaction` arm calls this function recursively.
///
/// The operation's directives are permission-checked first. Input data is resolved
/// against `variables` and `context` (results of earlier aliased operations) before it
/// is converted for storage. When the operation has a selection set, returned documents
/// are projected through `apply_projection`; an error while collecting the selected
/// fields is swallowed and treated as an empty field list.
///
/// # Errors
/// Propagates permission failures, filter compilation errors, data conversion errors
/// and any error from the underlying database or worker calls.
fn execute_mutation_op<'a>(
db: &'a Aurora,
mut_op: &'a ast::MutationOperation,
variables: &'a HashMap<String, ast::Value>,
context: &'a ExecutionContext,
options: &'a ExecutionOptions,
fragments: &'a HashMap<String, FragmentDef>,
) -> futures::future::BoxFuture<'a, Result<MutationResult>> {
use futures::future::FutureExt;
async move {
check_permissions(&mut_op.directives, options, true)?;
match &mut_op.operation {
// Insert one document; returns it projected when a selection set is present and
// projections are enabled, otherwise returns it whole.
MutationOp::Insert { collection, data } => {
let resolved = resolve_value(data, variables, context);
let doc = db
.aql_insert(collection, aql_value_to_hashmap(&resolved, variables)?)
.await?;
let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
let fields = collect_fields(
&mut_op.selection_set,
fragments,
variables,
Some(collection),
)
.unwrap_or_default();
vec![apply_projection(doc, &fields, options)?]
} else {
vec![doc]
};
Ok(MutationResult {
operation: "insert".to_string(),
collection: collection.clone(),
affected_count: 1,
returned_documents: returned,
})
}
// Update every document matching the filter (all documents when there is none).
MutationOp::Update {
collection,
filter,
data,
} => {
let cf = if let Some(f) = filter {
Some(compile_filter(f)?)
} else {
None
};
let update_data =
aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
let mut affected = 0;
let mut returned = Vec::new();
let vars_arc = Arc::new(variables.clone());
let cf_arc = cf.map(Arc::new);
let matches = db.scan_and_filter(
collection,
|doc| {
if let Some(ref filter) = cf_arc {
matches_filter(doc, filter, &vars_arc)
} else {
true
}
},
None,
)?;
// NOTE(review): unlike Insert, documents are returned whenever a selection set
// exists, regardless of `options.apply_projections` — confirm this is intended.
let fields = if !mut_op.selection_set.is_empty() {
Some(
collect_fields(
&mut_op.selection_set,
fragments,
variables,
Some(collection),
)
.unwrap_or_default(),
)
} else {
None
};
for doc in matches {
// Start from the existing data and overlay each updated field, letting
// `apply_field_modifier` combine the old value with the requested one.
let mut new_data = doc.data.clone();
for (k, v) in &update_data {
let applied = apply_field_modifier(new_data.get(k), v);
new_data.insert(k.clone(), applied);
}
let updated_doc = db
.aql_update_document(collection, &doc._sid, new_data)
.await?;
affected += 1;
if let Some(ref f) = fields {
returned.push(apply_projection(updated_doc, f, options)?);
}
}
Ok(MutationResult {
operation: "update".to_string(),
collection: collection.clone(),
affected_count: affected,
returned_documents: returned,
})
}
// Delete every document matching the filter (all documents when there is none).
// Never returns documents.
MutationOp::Delete { collection, filter } => {
let cf = if let Some(f) = filter {
Some(compile_filter(f)?)
} else {
None
};
let mut affected = 0;
let vars_arc = Arc::new(variables.clone());
let cf_arc = cf.map(Arc::new);
let matches = db.scan_and_filter(
collection,
|doc| {
if let Some(ref filter) = cf_arc {
matches_filter(doc, filter, &vars_arc)
} else {
true
}
},
None,
)?;
for doc in matches {
db.aql_delete_document(collection, &doc._sid).await?;
affected += 1;
}
Ok(MutationResult {
operation: "delete".to_string(),
collection: collection.clone(),
affected_count: affected,
returned_documents: vec![],
})
}
// Insert each element of an array; the resolved data must be an array.
MutationOp::InsertMany { collection, data } => {
let resolved = resolve_value(data, variables, context);
let items = match resolved {
ast::Value::Array(arr) => arr,
_ => {
return Err(AqlError::new(
ErrorCode::QueryError,
"insertMany data must be an array".to_string(),
));
}
};
let mut affected = 0;
let mut returned = Vec::new();
for item in items {
let doc = db
.aql_insert(collection, aql_value_to_hashmap(&item, variables)?)
.await?;
affected += 1;
// The selected fields are re-collected for every inserted item.
if !mut_op.selection_set.is_empty() && options.apply_projections {
let fields = collect_fields(
&mut_op.selection_set,
fragments,
variables,
Some(collection),
)
.unwrap_or_default();
returned.push(apply_projection(doc, &fields, options)?);
} else {
returned.push(doc);
}
}
Ok(MutationResult {
operation: "insertMany".to_string(),
collection: collection.clone(),
affected_count: affected,
returned_documents: returned,
})
}
// Update the first matching document, or insert a new one when nothing matches.
MutationOp::Upsert {
collection,
filter,
data,
} => {
let update_data =
aql_value_to_hashmap(&resolve_value(data, variables, context), variables)?;
let cf = if let Some(f) = filter {
Some(compile_filter(f)?)
} else {
None
};
let vars_arc = Arc::new(variables.clone());
let cf_arc = cf.map(Arc::new);
// Only one match is needed, hence the limit of 1.
let matches = db.scan_and_filter(
collection,
|doc| {
if let Some(ref filter) = cf_arc {
matches_filter(doc, filter, &vars_arc)
} else {
true
}
},
Some(1),
)?;
// NOTE(review): unlike Update, the existing data is not merged with
// `update_data` here and field modifiers are not applied — verify how
// `aql_update_document` treats fields missing from the new data.
let doc = if let Some(existing) = matches.into_iter().next() {
db.aql_update_document(collection, &existing._sid, update_data)
.await?
} else {
db.aql_insert(collection, update_data).await?
};
let returned = if !mut_op.selection_set.is_empty() && options.apply_projections {
let fields = collect_fields(
&mut_op.selection_set,
fragments,
variables,
Some(collection),
)
.unwrap_or_default();
vec![apply_projection(doc, &fields, options)?]
} else {
vec![doc]
};
Ok(MutationResult {
operation: "upsert".to_string(),
collection: collection.clone(),
affected_count: 1,
returned_documents: returned,
})
}
// Run the nested operations in order and merge their counts and returned
// documents; the first failure aborts the rest. All nested operations see the
// same `context`; it is not extended between them here.
MutationOp::Transaction { operations } => {
let mut all_returned = Vec::new();
let mut total_affected = 0;
for op in operations {
let res =
execute_mutation_op(db, op, variables, context, options, fragments).await?;
total_affected += res.affected_count;
all_returned.extend(res.returned_documents);
}
Ok(MutationResult {
operation: "transaction".to_string(),
collection: String::new(),
affected_count: total_affected,
returned_documents: all_returned,
})
}
// Enqueue one background job per payload; requires the worker system.
MutationOp::EnqueueJobs {
job_type,
payloads,
priority,
max_retries,
} => {
let workers = db.workers.as_ref().ok_or_else(|| {
AqlError::new(
ErrorCode::QueryError,
"Worker system not initialised".to_string(),
)
})?;
let job_priority = match priority {
ast::JobPriority::Low => crate::workers::JobPriority::Low,
ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
ast::JobPriority::High => crate::workers::JobPriority::High,
ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
};
let mut returned = Vec::new();
for payload in payloads {
let resolved = resolve_value(payload, variables, context);
// A payload that is not an object is enqueued with no fields.
let json_payload: std::collections::HashMap<String, serde_json::Value> =
if let ast::Value::Object(map) = &resolved {
map.iter()
.map(|(k, v)| (k.clone(), aql_value_to_json(v)))
.collect()
} else {
std::collections::HashMap::new()
};
let mut job =
crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
for (k, v) in json_payload {
job = job.add_field(k, v);
}
if let Some(retries) = max_retries {
job = job.with_max_retries(*retries);
}
let job_id = workers.enqueue(job).await?;
// Describe the enqueued job as a synthetic document keyed by its job id.
let mut doc = crate::types::Document::new();
doc._sid = job_id.clone();
doc.data.insert("job_id".to_string(), Value::String(job_id));
doc.data
.insert("job_type".to_string(), Value::String(job_type.clone()));
doc.data
.insert("status".to_string(), Value::String("pending".to_string()));
returned.push(doc);
}
let count = returned.len();
Ok(MutationResult {
operation: "enqueueJobs".to_string(),
collection: "__jobs".to_string(),
affected_count: count,
returned_documents: returned,
})
}
// Insert every item of the import data; documents are returned (projected) only
// when a selection set is present.
MutationOp::Import { collection, data } => {
let mut affected = 0;
let mut returned = Vec::new();
let fields = if !mut_op.selection_set.is_empty() {
Some(
collect_fields(
&mut_op.selection_set,
fragments,
variables,
Some(collection),
)
.unwrap_or_default(),
)
} else {
None
};
for item in data {
let resolved = resolve_value(item, variables, context);
let map = aql_value_to_hashmap(&resolved, variables)?;
let doc = db.aql_insert(collection, map).await?;
affected += 1;
if let Some(ref f) = fields {
returned.push(apply_projection(doc, f, options)?);
}
}
Ok(MutationResult {
operation: "import".to_string(),
collection: collection.clone(),
affected_count: affected,
returned_documents: returned,
})
}
// Return every document of the collection, projected when a selection set is
// present. The requested export format is currently ignored.
MutationOp::Export {
collection,
format: _,
} => {
let docs = db.scan_and_filter(collection, |_| true, None)?;
let fields = if !mut_op.selection_set.is_empty() {
Some(
collect_fields(
&mut_op.selection_set,
fragments,
variables,
Some(collection),
)
.unwrap_or_default(),
)
} else {
None
};
let mut returned = Vec::with_capacity(docs.len());
if let Some(ref f) = fields {
for d in docs {
returned.push(apply_projection(d, f, options)?);
}
} else {
returned = docs;
}
let count = returned.len();
Ok(MutationResult {
operation: "export".to_string(),
collection: collection.clone(),
affected_count: count,
returned_documents: returned,
})
}
// Enqueue a single background job, optionally scheduled for a later time.
MutationOp::EnqueueJob {
job_type,
payload,
priority,
scheduled_at,
max_retries,
} => {
let workers = db.workers.as_ref().ok_or_else(|| {
AqlError::new(
ErrorCode::QueryError,
"Worker system not initialised".to_string(),
)
})?;
let job_priority = match priority {
ast::JobPriority::Low => crate::workers::JobPriority::Low,
ast::JobPriority::Normal => crate::workers::JobPriority::Normal,
ast::JobPriority::High => crate::workers::JobPriority::High,
ast::JobPriority::Critical => crate::workers::JobPriority::Critical,
};
let resolved = resolve_value(payload, variables, context);
// A payload that is not an object is enqueued with no fields.
let json_payload: std::collections::HashMap<String, serde_json::Value> =
if let ast::Value::Object(map) = &resolved {
map.iter()
.map(|(k, v)| (k.clone(), aql_value_to_json(v)))
.collect()
} else {
std::collections::HashMap::new()
};
let mut job =
crate::workers::Job::new(job_type.clone()).with_priority(job_priority);
for (k, v) in json_payload {
job = job.add_field(k, v);
}
if let Some(retries) = max_retries {
job = job.with_max_retries(*retries);
}
// A `scheduled_at` value that does not parse as a UTC date-time is silently
// ignored and the job is enqueued without a schedule.
if let Some(scheduled) = scheduled_at {
if let Ok(dt) = scheduled.parse::<chrono::DateTime<chrono::Utc>>() {
job = job.scheduled_at(dt);
}
}
let job_id = workers.enqueue(job).await?;
// Describe the enqueued job as a synthetic document keyed by its job id.
let mut doc = crate::types::Document::new();
doc._sid = job_id.clone();
doc.data
.insert("job_id".to_string(), crate::types::Value::String(job_id));
doc.data.insert(
"job_type".to_string(),
crate::types::Value::String(job_type.clone()),
);
doc.data.insert(
"status".to_string(),
crate::types::Value::String("pending".to_string()),
);
Ok(MutationResult {
operation: "enqueueJob".to_string(),
collection: "__jobs".to_string(),
affected_count: 1,
returned_documents: vec![doc],
})
}
}
}
.boxed()
}
/// Opens a change-event subscription for the collection named by the first selection.
///
/// The first entry of the subscription's selection set must be a field; its name is the
/// collection to listen on. When that field carries a filter argument, the filter is
/// translated with `ast_filter_to_event_filter` and attached to the listener. Selections
/// after the first are ignored.
///
/// # Errors
/// Returns a `QueryError` when the selection set is empty or its first entry is not a
/// field, and propagates errors from filter extraction or translation.
async fn execute_subscription(
    db: &Aurora,
    sub: &ast::Subscription,
    vars: &HashMap<String, ast::Value>,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let field = match sub.selection_set.first() {
        Some(Selection::Field(field)) => field,
        Some(_) => {
            return Err(AqlError::new(
                ErrorCode::QueryError,
                "Invalid subscription selection".to_string(),
            ));
        }
        None => {
            return Err(AqlError::new(
                ErrorCode::QueryError,
                "Subscription must have a selection".to_string(),
            ));
        }
    };

    let collection = field.name.clone();
    let filter = extract_filter_from_args(&field.arguments)?;

    let mut listener = db.pubsub.listen(&collection);
    if let Some(filter) = filter {
        // `vars` is only read, so it is borrowed directly instead of being cloned.
        let event_filter = ast_filter_to_event_filter(&filter, vars)?;
        listener = listener.filter(event_filter);
    }

    Ok(ExecutionResult::Subscription(SubscriptionResult {
        subscription_id: uuid::Uuid::new_v4().to_string(),
        collection,
        stream: Some(listener),
    }))
}
/// Translates an AQL filter into the pub/sub `EventFilter` used for subscriptions.
///
/// Comparison values are converted with `aql_value_to_db_value` (resolving variables
/// from `vars`). `And`, `Or` and `Not` are translated recursively. `Matches` compiles its
/// pattern into a regex up front; a non-string pattern value is stringified first.
///
/// Note that `ContainsAny` and `ContainsAll` are both mapped to the plain `Contains`
/// event filter.
///
/// # Errors
/// Fails when a value cannot be converted or a `Matches` pattern is not a valid regex.
fn ast_filter_to_event_filter(
    filter: &AqlFilter,
    vars: &HashMap<String, ast::Value>,
) -> Result<crate::pubsub::EventFilter> {
    use crate::pubsub::EventFilter;

    let translated = match filter {
        AqlFilter::Eq(f, v) => {
            EventFilter::FieldEquals(f.clone(), aql_value_to_db_value(v, vars)?)
        }
        AqlFilter::Ne(f, v) => EventFilter::Ne(f.clone(), aql_value_to_db_value(v, vars)?),
        AqlFilter::Gt(f, v) => EventFilter::Gt(f.clone(), aql_value_to_db_value(v, vars)?),
        AqlFilter::Gte(f, v) => EventFilter::Gte(f.clone(), aql_value_to_db_value(v, vars)?),
        AqlFilter::Lt(f, v) => EventFilter::Lt(f.clone(), aql_value_to_db_value(v, vars)?),
        AqlFilter::Lte(f, v) => EventFilter::Lte(f.clone(), aql_value_to_db_value(v, vars)?),
        AqlFilter::In(f, v) => EventFilter::In(f.clone(), aql_value_to_db_value(v, vars)?),
        AqlFilter::NotIn(f, v) => {
            EventFilter::NotIn(f.clone(), aql_value_to_db_value(v, vars)?)
        }
        // All three "contains" flavours share the same event filter.
        AqlFilter::Contains(f, v)
        | AqlFilter::ContainsAny(f, v)
        | AqlFilter::ContainsAll(f, v) => {
            EventFilter::Contains(f.clone(), aql_value_to_db_value(v, vars)?)
        }
        AqlFilter::StartsWith(f, v) => {
            EventFilter::StartsWith(f.clone(), aql_value_to_db_value(v, vars)?)
        }
        AqlFilter::EndsWith(f, v) => {
            EventFilter::EndsWith(f.clone(), aql_value_to_db_value(v, vars)?)
        }
        AqlFilter::Matches(f, v) => {
            let pattern = match aql_value_to_db_value(v, vars)? {
                crate::types::Value::String(s) => s,
                other => other.to_string(),
            };
            let re = regex::Regex::new(&pattern).map_err(|e| {
                crate::error::AqlError::invalid_operation(format!("Invalid regex pattern: {}", e))
            })?;
            EventFilter::Matches(f.clone(), re)
        }
        AqlFilter::IsNull(f) => EventFilter::IsNull(f.clone()),
        AqlFilter::IsNotNull(f) => EventFilter::IsNotNull(f.clone()),
        AqlFilter::And(filters) => EventFilter::And(
            filters
                .iter()
                .map(|inner| ast_filter_to_event_filter(inner, vars))
                .collect::<Result<Vec<_>>>()?,
        ),
        AqlFilter::Or(filters) => EventFilter::Or(
            filters
                .iter()
                .map(|inner| ast_filter_to_event_filter(inner, vars))
                .collect::<Result<Vec<_>>>()?,
        ),
        AqlFilter::Not(inner) => {
            EventFilter::Not(Box::new(ast_filter_to_event_filter(inner, vars)?))
        }
    };
    Ok(translated)
}
/// Answers a schema introspection query with one document per user collection.
///
/// Collections whose name starts with `_` are hidden, and collections whose definition
/// cannot be loaded are skipped. Each document has the collection name as `_sid` and as
/// `name`. Field details are included when the query requests no specific fields or
/// requests `collections` or `fields`; each field is described by `name`, `type`,
/// `required` (the inverse of nullable), `indexed`, `unique` and — only when the field has
/// any — `validations` (debug-formatted constraints).
///
/// The result lives in the `__schema` pseudo-collection.
async fn execute_introspection(
    db: &Aurora,
    intro: &ast::IntrospectionQuery,
) -> Result<ExecutionResult> {
    let names = db.list_collection_names();
    let want_fields = intro.fields.is_empty()
        || intro
            .fields
            .iter()
            .any(|f| f == "collections" || f == "fields");

    let mut documents: Vec<Document> = Vec::new();
    for name in names.iter() {
        if name.starts_with('_') {
            continue;
        }
        let col = match db.get_collection_definition(name) {
            Ok(col) => col,
            Err(_) => continue,
        };

        let mut data = HashMap::new();
        data.insert("name".to_string(), Value::String(name.clone()));

        if want_fields {
            let mut field_list: Vec<Value> = Vec::new();
            for (fname, fdef) in col.fields.iter() {
                let mut fdata = HashMap::new();
                fdata.insert("name".to_string(), Value::String(fname.clone()));
                fdata.insert(
                    "type".to_string(),
                    Value::String(fdef.field_type.to_string()),
                );
                fdata.insert("required".to_string(), Value::Bool(!fdef.nullable));
                fdata.insert("indexed".to_string(), Value::Bool(fdef.indexed));
                fdata.insert("unique".to_string(), Value::Bool(fdef.unique));
                if !fdef.validations.is_empty() {
                    let vcons: Vec<Value> = fdef
                        .validations
                        .iter()
                        .map(|c| Value::String(format!("{:?}", c)))
                        .collect();
                    fdata.insert("validations".to_string(), Value::Array(vcons));
                }
                field_list.push(Value::Object(fdata));
            }
            data.insert("fields".to_string(), Value::Array(field_list));
        }

        documents.push(Document {
            _sid: name.clone(),
            data,
        });
    }

    let count = documents.len();
    Ok(ExecutionResult::Query(QueryResult {
        collection: "__schema".to_string(),
        documents,
        total_count: Some(count),
        deferred_fields: vec![],
        explain: None,
    }))
}
async fn execute_schema(
db: &Aurora,
schema: &ast::Schema,
_options: &ExecutionOptions,
) -> Result<ExecutionResult> {
let mut last_collection = String::new();
for op in &schema.operations {
match op {
ast::SchemaOp::DefineCollection {
name,
fields,
if_not_exists,
..
} => {
last_collection = name.clone();
if *if_not_exists {
if db.get_collection_definition(name).is_ok() {
continue;
}
}
let mut field_defs = Vec::new();
for field in fields {
field_defs.push((field.name.as_str(), build_field_def(field)));
}
db.new_collection(name, field_defs).await?;
}
ast::SchemaOp::AlterCollection { name, actions } => {
last_collection = name.clone();
for action in actions {
match action {
ast::AlterAction::AddField { field, default } => {
let def = build_field_def(field);
db.add_field_to_schema(name, field.name.clone(), def)
.await?;
if let Some(default_val) = default {
let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
let docs = db.get_all_collection(name).await?;
for doc in docs {
if !doc.data.contains_key(&field.name) {
db.update_document(
name,
&doc._sid,
vec![(&field.name, db_val.clone())],
)
.await?;
}
}
}
}
ast::AlterAction::DropField(field_name) => {
db.drop_field_from_schema(name, field_name.clone()).await?;
}
ast::AlterAction::RenameField { from, to } => {
db.rename_field_in_schema(name, from.clone(), to.clone())
.await?;
let docs = db.get_all_collection(name).await?;
for mut doc in docs {
if let Some(val) = doc.data.remove(from.as_str()) {
doc.data.insert(to.clone(), val);
let key = format!("{}:{}", name, doc._sid);
db.put(key, serde_json::to_vec(&doc)?, None).await?;
}
}
}
ast::AlterAction::ModifyField(field) => {
db.modify_field_in_schema(
name,
field.name.clone(),
build_field_def(field),
)
.await?;
}
}
}
}
ast::SchemaOp::DropCollection { name, .. } => {
db.drop_collection_schema(name).await?;
last_collection = name.clone();
}
}
}
Ok(ExecutionResult::Schema(SchemaResult {
operation: "schema".to_string(),
collection: last_collection,
status: "done".to_string(),
}))
}
/// Translates a parsed type annotation into the storage-level [`FieldType`].
///
/// The type name is matched case-insensitively; unknown names map to the
/// `Any` scalar. An annotation flagged as an array becomes
/// `FieldType::Array` of that scalar. Otherwise `object` and unknown names
/// map to the dedicated `Object` / `Any` field types and everything else is
/// wrapped in `FieldType::Scalar`.
fn map_ast_type(anno: &ast::TypeAnnotation) -> FieldType {
    let lowered = anno.name.to_lowercase();
    let scalar = match lowered.as_str() {
        "string" => ScalarType::String,
        "int" | "integer" => ScalarType::Int,
        "float" | "double" => ScalarType::Float,
        "bool" | "boolean" => ScalarType::Bool,
        "uuid" => ScalarType::Uuid,
        "object" => ScalarType::Object,
        "array" => ScalarType::Array,
        _ => ScalarType::Any,
    };

    if anno.is_array {
        return FieldType::Array(scalar);
    }
    match scalar {
        ScalarType::Object => FieldType::Object,
        ScalarType::Any => FieldType::Any,
        other => FieldType::Scalar(other),
    }
}
/// Extracts validation constraints from the arguments of a `@validate`
/// directive.
///
/// Recognised arguments are `format` and `pattern` (string values), `min`
/// and `max` (integer or float values, stored as floats), and `minLength`
/// and `maxLength` (integer values). Unknown argument names, and known names
/// carrying a value of the wrong kind, are skipped without error. Constraints
/// are returned in argument order.
fn parse_validate_directive(
    directive: &ast::Directive,
) -> Vec<crate::types::FieldValidationConstraint> {
    use crate::types::FieldValidationConstraint as FVC;

    // Numeric bounds accept either literal kind and are widened to f64.
    let as_number = |value: &ast::Value| -> Option<f64> {
        match value {
            ast::Value::Float(f) => Some(*f),
            ast::Value::Int(i) => Some(*i as f64),
            _ => None,
        }
    };

    let mut constraints = Vec::new();
    for arg in &directive.arguments {
        let parsed = match (arg.name.as_str(), &arg.value) {
            ("format", ast::Value::String(s)) => Some(FVC::Format(s.clone())),
            ("min", value) => as_number(value).map(FVC::Min),
            ("max", value) => as_number(value).map(FVC::Max),
            ("minLength", ast::Value::Int(i)) => Some(FVC::MinLength(*i)),
            ("maxLength", ast::Value::Int(i)) => Some(FVC::MaxLength(*i)),
            ("pattern", ast::Value::String(s)) => Some(FVC::Pattern(s.clone())),
            _ => None,
        };
        constraints.extend(parsed);
    }
    constraints
}
/// Builds the storage-level [`FieldDefinition`] for a parsed field
/// declaration, interpreting its directives.
///
/// * `@indexed` / `@index` mark the field as indexed.
/// * `@unique` and `@primary` mark it as both unique and indexed.
/// * `@relation` records a relation when a non-empty target is given via a
///   string `to` (or `collection`) argument; the key comes from a string
///   `key` (or `field`) argument and defaults to `"id"`.
/// * `@validate` contributes validation constraints.
///
/// Other directives are ignored. The field is nullable unless its type
/// annotation is marked as required.
fn build_field_def(field: &ast::FieldDef) -> FieldDefinition {
    // String value of the first argument named one of `names`; `None` when no
    // such argument exists or when that argument is not a string.
    fn string_arg(directive: &ast::Directive, names: [&str; 2]) -> Option<String> {
        let arg = directive
            .arguments
            .iter()
            .find(|a| names.contains(&a.name.as_str()))?;
        match &arg.value {
            ast::Value::String(s) => Some(s.clone()),
            _ => None,
        }
    }

    let mut indexed = false;
    let mut unique = false;
    let mut relation = None;
    let mut validations = Vec::new();

    for directive in &field.directives {
        match directive.name.as_str() {
            "indexed" | "index" => indexed = true,
            "unique" | "primary" => {
                unique = true;
                indexed = true;
            }
            "relation" => {
                let to = string_arg(directive, ["to", "collection"]).unwrap_or_default();
                let key =
                    string_arg(directive, ["key", "field"]).unwrap_or_else(|| "id".to_string());
                if !to.is_empty() {
                    relation = Some(crate::types::Relation { to, key });
                }
            }
            "validate" => validations.extend(parse_validate_directive(directive)),
            _ => {}
        }
    }

    FieldDefinition {
        field_type: map_ast_type(&field.field_type),
        unique,
        indexed,
        nullable: !field.field_type.is_required,
        validations,
        relation,
    }
}
/// Runs the steps of a migration in order, skipping versions already applied.
///
/// For each step not yet recorded as applied, every action (schema operation
/// or data migration) is executed and the version is then marked as applied.
/// Progress is logged to stderr. The first failing action aborts the run with
/// its error; the failing step is not marked as applied.
///
/// The result carries the version of the last step visited (applied or
/// skipped), the number of steps applied, and a status of `"applied"` when at
/// least one step ran or `"skipped"` otherwise.
async fn execute_migration(
    db: &Aurora,
    migration: &ast::Migration,
    _options: &ExecutionOptions,
) -> Result<ExecutionResult> {
    let mut steps_applied = 0;
    let mut last_version = String::new();

    for step in &migration.steps {
        let version = &step.version;
        last_version = version.clone();

        let already_applied = db.is_migration_applied(version).await?;
        if already_applied {
            eprintln!(
                "[migration] version '{}' already applied — skipping",
                version
            );
            continue;
        }

        eprintln!("[migration] applying version '{}'", version);
        for action in &step.actions {
            match action {
                ast::MigrationAction::Schema(schema_op) => {
                    execute_single_schema_op(db, schema_op).await?;
                }
                ast::MigrationAction::DataMigration(dm) => {
                    execute_data_migration(db, dm).await?;
                }
            }
        }
        db.mark_migration_applied(version).await?;
        steps_applied += 1;
        eprintln!("[migration] version '{}' applied", version);
    }

    let status_label = if steps_applied == 0 {
        "skipped"
    } else {
        "applied"
    };
    Ok(ExecutionResult::Migration(MigrationResult {
        version: last_version,
        steps_applied,
        status: status_label.to_string(),
    }))
}
/// Executes one schema operation against the database.
///
/// * `DefineCollection` creates the collection, unless `if_not_exists` is set
///   and a definition already exists.
/// * `AlterCollection` applies its actions in order. Adding a field with a
///   default back-fills that default into every document lacking the field;
///   renaming a field rewrites every document that holds the old name.
/// * `DropCollection` drops the collection, unless `if_exists` is set and no
///   definition exists.
///
/// The first failing database call aborts the operation with its error;
/// changes already made are not undone here.
async fn execute_single_schema_op(db: &Aurora, op: &ast::SchemaOp) -> Result<()> {
    match op {
        ast::SchemaOp::DefineCollection {
            name,
            fields,
            if_not_exists,
            ..
        } => {
            let already_defined = *if_not_exists && db.get_collection_definition(name).is_ok();
            if !already_defined {
                let mut field_defs: Vec<(&str, FieldDefinition)> = Vec::new();
                for f in fields {
                    field_defs.push((f.name.as_str(), build_field_def(f)));
                }
                db.new_collection(name, field_defs).await?;
            }
        }
        ast::SchemaOp::AlterCollection { name, actions } => {
            for action in actions {
                match action {
                    ast::AlterAction::AddField { field, default } => {
                        let definition = build_field_def(field);
                        db.add_field_to_schema(name, field.name.clone(), definition)
                            .await?;
                        if let Some(default_val) = default {
                            // Defaults are literals: resolve with no variables.
                            let db_val = aql_value_to_db_value(default_val, &HashMap::new())?;
                            for doc in db.get_all_collection(name).await? {
                                if doc.data.contains_key(&field.name) {
                                    continue;
                                }
                                db.update_document(
                                    name,
                                    &doc._sid,
                                    vec![(&field.name, db_val.clone())],
                                )
                                .await?;
                            }
                        }
                    }
                    ast::AlterAction::DropField(field_name) => {
                        db.drop_field_from_schema(name, field_name.clone()).await?;
                    }
                    ast::AlterAction::RenameField { from, to } => {
                        db.rename_field_in_schema(name, from.clone(), to.clone())
                            .await?;
                        for mut doc in db.get_all_collection(name).await? {
                            // Only documents that carry the old name are rewritten.
                            if let Some(val) = doc.data.remove(from.as_str()) {
                                doc.data.insert(to.clone(), val);
                                let storage_key = format!("{}:{}", name, doc._sid);
                                let encoded = serde_json::to_vec(&doc)?;
                                db.put(storage_key, encoded, None).await?;
                            }
                        }
                    }
                    ast::AlterAction::ModifyField(field) => {
                        let definition = build_field_def(field);
                        db.modify_field_in_schema(name, field.name.clone(), definition)
                            .await?;
                    }
                }
            }
        }
        ast::SchemaOp::DropCollection { name, if_exists } => {
            let nothing_to_drop = *if_exists && db.get_collection_definition(name).is_err();
            if !nothing_to_drop {
                db.drop_collection_schema(name).await?;
            }
        }
    }
    Ok(())
}
/// Applies the transforms of a data migration to every document of the
/// target collection.
///
/// Each transform's optional filter is compiled exactly once, before any
/// document is touched, instead of once per document. An invalid filter
/// therefore fails the migration before any write happens.
///
/// For every document, each transform whose filter matches (or that has no
/// filter) evaluates its expression against the document as it was loaded
/// and writes the result to the transform's field. Transforms on the same
/// document do not see each other's writes. The first failing update aborts
/// the run with its error.
async fn execute_data_migration(db: &Aurora, dm: &ast::DataMigration) -> Result<()> {
    let docs = db.get_all_collection(&dm.collection).await?;

    // Loop-invariant: one compiled filter (or `None`) per transform.
    let compiled_filters = dm
        .transforms
        .iter()
        .map(|t| t.filter.as_ref().map(|f| compile_filter(f)).transpose())
        .collect::<Result<Vec<_>>>()?;
    // Migration filters are evaluated without query variables.
    let no_vars: HashMap<String, ast::Value> = HashMap::new();

    for doc in docs {
        for (transform, compiled) in dm.transforms.iter().zip(&compiled_filters) {
            let selected = compiled
                .as_ref()
                .map_or(true, |cf| matches_filter(&doc, cf, &no_vars));
            if !selected {
                continue;
            }
            let new_value = eval_migration_expr(&transform.expression, &doc);
            let mut updates = HashMap::new();
            updates.insert(transform.field.clone(), new_value);
            db.aql_update_document(&dm.collection, &doc._sid, updates)
                .await?;
        }
    }
    Ok(())
}
/// Evaluates a migration expression against a document.
///
/// After trimming whitespace the expression is interpreted, in this order, as:
/// a double-quoted string literal (quotes stripped, no escape processing),
/// the keywords `true` / `false` / `null`, an integer, a float, or the name
/// of a field on `doc`. Anything else, including a field the document does
/// not have, yields `Value::Null`.
fn eval_migration_expr(expr: &str, doc: &Document) -> Value {
    let token = expr.trim();

    // A lone `"` has no closing quote and is not a string literal.
    if let Some(inner) = token
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'))
    {
        return Value::String(inner.to_string());
    }

    match token {
        "true" => return Value::Bool(true),
        "false" => return Value::Bool(false),
        "null" => return Value::Null,
        _ => {}
    }

    if let Ok(int_val) = token.parse::<i64>() {
        return Value::Int(int_val);
    }
    if let Ok(float_val) = token.parse::<f64>() {
        return Value::Float(float_val);
    }

    // Fall back to a field reference.
    doc.data.get(token).cloned().unwrap_or(Value::Null)
}
/// Builds the filter from the first `where` or `filter` argument, if any.
///
/// Returns `Ok(None)` when neither argument is present, and the conversion
/// error when the argument's value cannot be turned into a filter.
fn extract_filter_from_args(args: &[ast::Argument]) -> Result<Option<AqlFilter>> {
    args.iter()
        .find(|arg| arg.name == "where" || arg.name == "filter")
        .map(|arg| value_to_filter(&arg.value))
        .transpose()
}
/// Collects sort orderings from every `orderBy` argument.
///
/// Three value shapes are understood:
/// * a string: sort ascending by that field;
/// * an object with a string `field` and a `direction`: one ordering;
/// * any other object: one ordering per entry, key = field name and
///   value = direction.
///
/// A direction is descending only when it is an enum or string that
/// upper-cases to `DESC`; anything else means ascending. Other value shapes
/// are ignored.
fn extract_order_by(args: &[ast::Argument]) -> Vec<ast::Ordering> {
    // Shared direction parsing for both object shapes.
    fn direction_of(value: &ast::Value) -> ast::SortDirection {
        match value {
            ast::Value::Enum(s) | ast::Value::String(s) if s.to_uppercase() == "DESC" => {
                ast::SortDirection::Desc
            }
            _ => ast::SortDirection::Asc,
        }
    }

    let mut orderings = Vec::new();
    for arg in args.iter().filter(|a| a.name == "orderBy") {
        match &arg.value {
            ast::Value::String(field) => orderings.push(ast::Ordering {
                field: field.clone(),
                direction: ast::SortDirection::Asc,
            }),
            ast::Value::Object(obj) => match (obj.get("field"), obj.get("direction")) {
                (Some(ast::Value::String(field_name)), Some(dir_val)) => {
                    orderings.push(ast::Ordering {
                        field: field_name.clone(),
                        direction: direction_of(dir_val),
                    });
                }
                // Shorthand form: `{ fieldName: DIRECTION, ... }`.
                _ => orderings.extend(obj.iter().map(|(field, dir_val)| ast::Ordering {
                    field: field.clone(),
                    direction: direction_of(dir_val),
                })),
            },
            _ => {}
        }
    }
    orderings
}
/// Sorts `docs` in place by the given orderings, most significant first.
///
/// Documents are compared field by field via `compare_values`; the first
/// ordering that distinguishes two documents decides, reversed when its
/// direction is descending. Documents equal under every ordering keep their
/// relative order (the sort is stable).
fn apply_ordering(docs: &mut [Document], orderings: &[ast::Ordering]) {
    use std::cmp::Ordering as CmpOrdering;

    docs.sort_by(|left, right| {
        orderings
            .iter()
            .map(|o| {
                let natural =
                    compare_values(left.data.get(&o.field), right.data.get(&o.field));
                match o.direction {
                    ast::SortDirection::Asc => natural,
                    ast::SortDirection::Desc => natural.reverse(),
                }
            })
            // Lazily stops at the first ordering that is not a tie.
            .find(|result| *result != CmpOrdering::Equal)
            .unwrap_or(CmpOrdering::Equal)
    });
}
/// Orders two optional field values for sorting.
///
/// A missing value sorts before any present value and two missing values are
/// equal. Two present values use `Value`'s partial ordering, with
/// incomparable pairs treated as equal.
fn compare_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
    match (a, b) {
        (Some(left), Some(right)) => left
            .partial_cmp(right)
            .unwrap_or(std::cmp::Ordering::Equal),
        // At least one side is missing: `false < true` puts absent first.
        (left, right) => left.is_some().cmp(&right.is_some()),
    }
}
/// Reads offset-style pagination from field arguments.
///
/// Returns `(limit, offset)`. `limit` comes from an integer `limit` or
/// `first` argument and `offset` from an integer `offset` or `skip`
/// argument; when several are present the last one wins. Non-integer values
/// are ignored.
///
/// Negative integers (and values that do not fit in `usize`) are ignored
/// rather than cast: a plain `as usize` cast would wrap `-1` into
/// `usize::MAX`, producing a nonsensical limit or offset.
pub fn extract_pagination(args: &[ast::Argument]) -> (Option<usize>, usize) {
    // Checked conversion; fully qualified so it does not depend on the
    // edition's prelude.
    fn to_count(n: i64) -> Option<usize> {
        <usize as std::convert::TryFrom<i64>>::try_from(n).ok()
    }

    let (mut limit, mut offset) = (None, 0);
    for a in args {
        match a.name.as_str() {
            "limit" | "first" => {
                if let ast::Value::Int(n) = a.value {
                    if let Some(count) = to_count(n) {
                        limit = Some(count);
                    }
                }
            }
            "offset" | "skip" => {
                if let ast::Value::Int(n) = a.value {
                    if let Some(count) = to_count(n) {
                        offset = count;
                    }
                }
            }
            _ => {}
        }
    }
    (limit, offset)
}
/// Reads cursor-style pagination from field arguments.
///
/// Returns `(first, after, last, before)`: `first` / `last` from integer
/// arguments and `after` / `before` from string arguments of the same names.
/// Arguments with a value of the wrong kind are ignored.
///
/// Negative counts (and values that do not fit in `usize`) are ignored
/// rather than cast: a plain `as usize` cast would wrap `-1` into
/// `usize::MAX`.
fn extract_cursor_pagination(
    args: &[ast::Argument],
) -> (Option<usize>, Option<String>, Option<usize>, Option<String>) {
    // Checked conversion; fully qualified so it does not depend on the
    // edition's prelude.
    fn to_count(n: i64) -> Option<usize> {
        <usize as std::convert::TryFrom<i64>>::try_from(n).ok()
    }

    let (mut first, mut after, mut last, mut before) = (None, None, None, None);
    for a in args {
        match a.name.as_str() {
            "first" => {
                if let ast::Value::Int(n) = a.value {
                    if let Some(count) = to_count(n) {
                        first = Some(count);
                    }
                }
            }
            "after" => {
                if let ast::Value::String(s) = &a.value {
                    after = Some(s.clone());
                }
            }
            "last" => {
                if let ast::Value::Int(n) = a.value {
                    if let Some(count) = to_count(n) {
                        last = Some(count);
                    }
                }
            }
            "before" => {
                if let ast::Value::String(s) = &a.value {
                    before = Some(s.clone());
                }
            }
            _ => {}
        }
    }
    (first, after, last, before)
}
/// Wraps documents in a Relay-style connection (`edges` + `pageInfo`).
///
/// * `docs`: the candidate documents. When `limit` is set and more than
///   `limit` documents are supplied, the surplus is dropped and
///   `hasNextPage` is reported as `true`.
/// * `sub_fields`: the connection's selection; the `edges { node { ... } }`
///   selection, when present, is used to project each node. Without it the
///   full document is returned as the node.
///
/// Each edge carries the document's `_sid` as `cursor`; `endCursor` is the
/// cursor of the last edge, or an empty string when there are none. The
/// result is a single synthetic document with `_sid` `"connection"`.
///
/// # Errors
///
/// Fails when the `edges` / `node` selections cannot be collected or when
/// projecting a node fails. Selection errors used to be swallowed, which
/// silently fell back to returning complete, unprojected documents.
fn execute_connection(
    mut docs: Vec<Document>,
    sub_fields: &[ast::Field],
    limit: Option<usize>,
    fragments: &HashMap<String, FragmentDef>,
    variables: &HashMap<String, ast::Value>,
    options: &ExecutionOptions,
) -> Result<QueryResult> {
    let mut has_next_page = false;
    if let Some(max) = limit {
        if docs.len() > max {
            has_next_page = true;
            docs.truncate(max);
        }
    }

    // Resolve the `edges { node { ... } }` selection, if the query has one.
    let mut node_fields = Vec::new();
    if let Some(edges_field) = sub_fields.iter().find(|f| f.name == "edges") {
        let edge_fields =
            collect_fields(&edges_field.selection_set, fragments, variables, None)?;
        if let Some(node_field) = edge_fields.into_iter().find(|f| f.name == "node") {
            node_fields = collect_fields(&node_field.selection_set, fragments, variables, None)?;
        }
    }

    let mut edges = Vec::with_capacity(docs.len());
    let mut end_cursor = String::new();
    for doc in docs {
        let cursor = doc._sid.clone();
        end_cursor = cursor.clone();
        let mut edge_data = HashMap::new();
        edge_data.insert("cursor".to_string(), Value::String(cursor));
        let node_doc = if node_fields.is_empty() {
            doc
        } else {
            apply_projection(doc, &node_fields, options)?
        };
        edge_data.insert("node".to_string(), Value::Object(node_doc.data));
        edges.push(Value::Object(edge_data));
    }

    let mut page_info = HashMap::new();
    page_info.insert("hasNextPage".to_string(), Value::Bool(has_next_page));
    page_info.insert("endCursor".to_string(), Value::String(end_cursor));

    let mut conn_data = HashMap::new();
    conn_data.insert("edges".to_string(), Value::Array(edges));
    conn_data.insert("pageInfo".to_string(), Value::Object(page_info));

    Ok(QueryResult {
        collection: String::new(),
        documents: vec![Document {
            _sid: "connection".to_string(),
            data: conn_data,
        }],
        total_count: None,
        deferred_fields: vec![],
        explain: None,
    })
}
pub fn matches_filter(
doc: &Document,
filter: &CompiledFilter,
vars: &HashMap<String, ast::Value>,
) -> bool {
match filter {
CompiledFilter::Eq(f, v) => {
if let Some(dv) = doc.data.get(f) {
values_equal(dv, v, vars)
} else if f == "_sid" {
values_equal(&Value::String(doc._sid.clone()), v, vars)
} else {
false
}
}
CompiledFilter::Ne(f, v) => {
if let Some(dv) = doc.data.get(f) {
!values_equal(dv, v, vars)
} else if f == "_sid" {
!values_equal(&Value::String(doc._sid.clone()), v, vars)
} else {
true
}
}
CompiledFilter::Gt(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
dv_opt.map_or(false, |dv| {
if let Ok(bv) = aql_value_to_db_value(v, vars) {
return dv > bv;
}
false
})
}
CompiledFilter::Gte(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
dv_opt.map_or(false, |dv| {
if let Ok(bv) = aql_value_to_db_value(v, vars) {
return dv >= bv;
}
false
})
}
CompiledFilter::Lt(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
dv_opt.map_or(false, |dv| {
if let Ok(bv) = aql_value_to_db_value(v, vars) {
return dv < bv;
}
false
})
}
CompiledFilter::Lte(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
dv_opt.map_or(false, |dv| {
if let Ok(bv) = aql_value_to_db_value(v, vars) {
return dv <= bv;
}
false
})
}
CompiledFilter::In(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
dv_opt.map_or(false, |dv| {
if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
return arr.contains(&dv);
}
false
})
}
CompiledFilter::NotIn(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
dv_opt.map_or(true, |dv| {
if let Ok(Value::Array(arr)) = aql_value_to_db_value(v, vars) {
return !arr.contains(&dv);
}
true
})
}
CompiledFilter::Contains(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
if let Some(dv) = dv_opt {
match (dv, resolve_if_variable(v, vars)) {
(Value::String(s), ast::Value::String(sub)) => s.contains(&*sub),
(Value::Array(arr), _) => {
if let Ok(bv) = aql_value_to_db_value(v, vars) {
return arr.contains(&bv);
}
false
}
_ => false,
}
} else {
false
}
}
CompiledFilter::ContainsAny(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
(dv_opt, aql_value_to_db_value(v, vars))
{
check_arr.iter().any(|item| field_arr.contains(item))
} else {
false
}
}
CompiledFilter::ContainsAll(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
if let (Some(Value::Array(field_arr)), Ok(Value::Array(check_arr))) =
(dv_opt, aql_value_to_db_value(v, vars))
{
check_arr.iter().all(|item| field_arr.contains(item))
} else {
false
}
}
CompiledFilter::StartsWith(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
if let (Some(Value::String(s)), ast::Value::String(pre)) =
(dv_opt, resolve_if_variable(v, vars))
{
s.starts_with(&*pre)
} else {
false
}
}
CompiledFilter::EndsWith(f, v) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
if let (Some(Value::String(s)), ast::Value::String(suf)) =
(dv_opt, resolve_if_variable(v, vars))
{
s.ends_with(&*suf)
} else {
false
}
}
CompiledFilter::Matches(f, re) => {
let dv_opt = doc.data.get(f).cloned().or_else(|| {
if f == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
if let Some(Value::String(s)) = dv_opt {
re.is_match(&s)
} else {
false
}
}
CompiledFilter::IsNull(f) => {
if f == "_sid" {
false } else {
doc.data.get(f).map_or(true, |v| matches!(v, Value::Null))
}
}
CompiledFilter::IsNotNull(f) => {
if f == "_sid" {
true } else {
doc.data.get(f).map_or(false, |v| !matches!(v, Value::Null))
}
}
CompiledFilter::And(fs) => fs.iter().all(|f| matches_filter(doc, f, vars)),
CompiledFilter::Or(fs) => fs.iter().any(|f| matches_filter(doc, f, vars)),
CompiledFilter::Not(f) => !matches_filter(doc, f, vars),
}
}
fn apply_field_modifier(existing: Option<&Value>, new_val: &Value) -> Value {
if let Value::Object(modifier) = new_val {
if let Some(delta) = modifier.get("increment") {
match (existing, delta) {
(Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c + d),
(Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c + d),
(Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 + d),
_ => {}
}
}
if let Some(delta) = modifier.get("decrement") {
match (existing, delta) {
(Some(Value::Int(c)), Value::Int(d)) => return Value::Int(c - d),
(Some(Value::Float(c)), Value::Float(d)) => return Value::Float(c - d),
(Some(Value::Int(c)), Value::Float(d)) => return Value::Float(*c as f64 - d),
_ => {}
}
}
if let Some(item) = modifier.get("push") {
if let Some(Value::Array(mut arr)) = existing.cloned() {
arr.push(item.clone());
return Value::Array(arr);
}
return Value::Array(vec![item.clone()]);
}
if let Some(item) = modifier.get("pull") {
if let Some(Value::Array(arr)) = existing {
let filtered: Vec<Value> = arr.iter().filter(|v| *v != item).cloned().collect();
return Value::Array(filtered);
}
return Value::Array(vec![]);
}
if let Some(item) = modifier.get("addToSet") {
if let Some(Value::Array(mut arr)) = existing.cloned() {
if !arr.contains(item) {
arr.push(item.clone());
}
return Value::Array(arr);
}
return Value::Array(vec![item.clone()]);
}
}
new_val.clone()
}
/// Compares a stored value with a query-side value.
///
/// The query value is first resolved (variables substituted) and converted
/// to a stored value; if that fails the two are considered unequal.
fn values_equal(dv: &Value, av: &ast::Value, vars: &HashMap<String, ast::Value>) -> bool {
    aql_value_to_db_value(av, vars).map_or(false, |converted| dv == &converted)
}
/// Substitutes a variable reference with its bound value.
///
/// Non-variable values are returned unchanged. A variable with no entry in
/// `vars` is also returned unchanged, i.e. still as a `Variable`. Only the
/// top level is resolved; nested values are not visited.
fn resolve_if_variable<'a>(
    v: &'a ast::Value,
    vars: &'a HashMap<String, ast::Value>,
) -> &'a ast::Value {
    match v {
        ast::Value::Variable(name) => vars.get(name).unwrap_or(v),
        _ => v,
    }
}
/// Projects a document down to the selected fields.
///
/// Delegates to `apply_projection_and_defer` using the variables carried by
/// `options`, and discards the deferred-field part of its result.
pub fn apply_projection(
    doc: Document,
    fields: &[ast::Field],
    options: &ExecutionOptions,
) -> Result<Document> {
    apply_projection_and_defer(doc, fields, &options.variables, options)
        .map(|(projected, _deferred)| projected)
}
async fn apply_projection_with_lookups(
db: &Aurora,
mut doc: Document,
collection_name: &str,
fields: &[ast::Field],
fragments: &HashMap<String, FragmentDef>,
vars: &HashMap<String, ast::Value>,
options: &ExecutionOptions,
pinned_fields: &[String],
) -> Result<(Document, Vec<String>)> {
if fields.is_empty() {
return Ok((doc, vec![]));
}
let mut proj = HashMap::new();
let mut deferred = Vec::new();
for f in fields {
check_permissions(&f.directives, options, false)?;
if f.directives.iter().any(|d| d.name == "defer") {
deferred.push(f.alias.as_ref().unwrap_or(&f.name).clone());
continue;
}
let mut lookup_info: Option<(String, String, String)> = None;
let coll_arg = f.arguments.iter().find(|a| a.name == "collection");
let local_arg = f.arguments.iter().find(|a| a.name == "localField");
let foreign_arg = f.arguments.iter().find(|a| a.name == "foreignField");
if let (Some(c), Some(lf), Some(ff)) = (coll_arg, local_arg, foreign_arg) {
let c_val = resolve_if_variable(&c.value, vars);
let lf_val = resolve_if_variable(&lf.value, vars);
let ff_val = resolve_if_variable(&ff.value, vars);
if let (
ast::Value::String(foreign_coll),
ast::Value::String(local_field),
ast::Value::String(foreign_field),
) = (c_val, lf_val, ff_val)
{
lookup_info = Some((
foreign_coll.clone(),
local_field.clone(),
foreign_field.clone(),
));
}
} else if let Ok(col_def) = db.get_collection_definition(collection_name) {
if let Some(f_def) = col_def.fields.get(&f.name) {
if let Some(rel) = &f_def.relation {
lookup_info = Some((rel.to.clone(), f.name.clone(), rel.key.clone()));
}
}
}
if let Some((foreign_coll, local_field, foreign_field)) = lookup_info {
let local_val = doc.data.get(local_field.as_str()).cloned().or_else(|| {
if local_field == "_sid" {
Some(Value::String(doc._sid.clone()))
} else {
None
}
});
if let Some(match_val) = local_val {
let extra_filter = f
.arguments
.iter()
.find(|a| a.name == "where")
.and_then(|a| {
let resolved = resolve_ast_deep(&a.value, vars);
value_to_filter(&resolved).ok()
});
let vars_arc = Arc::new(vars.clone());
let foreign_docs = db.scan_and_filter(
&foreign_coll,
|fdoc| {
let field_match = fdoc
.data
.get(foreign_field.as_str())
.map(|v| values_equal_db(v, &match_val))
.unwrap_or(
foreign_field == "_sid" && fdoc._sid == match_val.to_string(),
);
if !field_match {
return false;
}
if let Some(ref ef) = extra_filter {
let compiled = compile_filter(ef)
.unwrap_or(CompiledFilter::Eq("_".into(), ast::Value::Null));
matches_filter(fdoc, &compiled, &vars_arc)
} else {
true
}
},
None,
)?;
let sub_projected: Vec<Value> = if f.selection_set.is_empty() {
foreign_docs
.into_iter()
.map(|fd| Value::Object(fd.data))
.collect()
} else {
let sub_fields: Vec<ast::Field> =
collect_fields(&f.selection_set, fragments, vars, Some(&foreign_coll))?;
let mut sub_results = Vec::with_capacity(foreign_docs.len());
for fd in foreign_docs {
let (proj_fd, _) =
apply_projection_and_defer(fd, &sub_fields, vars, options)?;
sub_results.push(Value::Object(proj_fd.data));
}
sub_results
};
proj.insert(
f.alias.as_ref().unwrap_or(&f.name).clone(),
Value::Array(sub_projected),
);
}
continue;
}
if f.name == "__compute__" {
let alias = f.alias.as_deref().unwrap_or("computed");
if let Some(expr_arg) = f.arguments.iter().find(|a| a.name == "expr") {
if let ast::Value::String(template) = &expr_arg.value {
let result = eval_template(template, &doc.data);
proj.insert(alias.to_string(), Value::String(result));
} else if let Some(expr) = f.computed_expression.as_ref() {
let result = eval_expression(expr, &doc.data, vars);
proj.insert(alias.to_string(), result);
}
}
continue;
}
if f.name == "id" {
let v = doc
.data
.get("id")
.cloned()
.unwrap_or(Value::String(doc._sid.clone()));
proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v);
} else if f.name == "_sid" {
proj.insert(
f.alias.as_ref().unwrap_or(&f.name).clone(),
Value::String(doc._sid.clone()),
);
} else if let Some(v) = doc.data.get(&f.name) {
proj.insert(f.alias.as_ref().unwrap_or(&f.name).clone(), v.clone());
}
}
for pinned in pinned_fields {
if !proj.contains_key(pinned) {
if let Some(v) = doc.data.get(pinned) {
proj.insert(pinned.clone(), v.clone());
}
}
}
doc.data = proj;
Ok((doc, deferred))
}
/// Tests two stored values for equality using `Value`'s `PartialEq`.
fn values_equal_db(a: &Value, b: &Value) -> bool {
    PartialEq::eq(a, b)
}
/// Converts a query-side value into a stored value, resolving variables.
///
/// Scalars map one-to-one, enums become strings, and arrays and objects are
/// converted recursively.
///
/// # Errors
///
/// Returns `ErrorCode::UndefinedVariable` when the value, or any value
/// nested in it, is a variable without a binding in `vars`.
pub fn aql_value_to_db_value(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> Result<Value> {
    match resolve_if_variable(v, vars) {
        ast::Value::Null => Ok(Value::Null),
        ast::Value::Boolean(flag) => Ok(Value::Bool(*flag)),
        ast::Value::Int(int_val) => Ok(Value::Int(*int_val)),
        ast::Value::Float(float_val) => Ok(Value::Float(*float_val)),
        ast::Value::String(text) | ast::Value::Enum(text) => Ok(Value::String(text.clone())),
        // Still a variable after resolution: it has no binding.
        ast::Value::Variable(name) => Err(AqlError::new(
            ErrorCode::UndefinedVariable,
            format!("Variable '{}' not found", name),
        )),
        ast::Value::Array(items) => items
            .iter()
            .map(|item| aql_value_to_db_value(item, vars))
            .collect::<Result<Vec<_>>>()
            .map(Value::Array),
        ast::Value::Object(entries) => entries
            .iter()
            .map(|(key, item)| {
                aql_value_to_db_value(item, vars).map(|converted| (key.clone(), converted))
            })
            .collect::<Result<HashMap<_, _>>>()
            .map(Value::Object),
    }
}
fn aql_value_to_json(v: &ast::Value) -> serde_json::Value {
match v {
ast::Value::Null => serde_json::Value::Null,
ast::Value::Boolean(b) => serde_json::Value::Bool(*b),
ast::Value::Int(i) => serde_json::Value::Number((*i).into()),
ast::Value::Float(f) => serde_json::Number::from_f64(*f)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
ast::Value::String(s) | ast::Value::Enum(s) => serde_json::Value::String(s.clone()),
ast::Value::Array(arr) => {
serde_json::Value::Array(arr.iter().map(aql_value_to_json).collect())
}
ast::Value::Object(obj) => serde_json::Value::Object(
obj.iter()
.map(|(k, v)| (k.clone(), aql_value_to_json(v)))
.collect(),
),
ast::Value::Variable(_) => serde_json::Value::Null,
}
}
/// Converts an object value (or a variable bound to one) into a map of
/// stored values.
///
/// # Errors
///
/// Returns `ErrorCode::QueryError` ("Data must be object") when the resolved
/// value is not an object, and propagates conversion errors from its entries.
fn aql_value_to_hashmap(
    v: &ast::Value,
    vars: &HashMap<String, ast::Value>,
) -> Result<HashMap<String, Value>> {
    match resolve_if_variable(v, vars) {
        ast::Value::Object(entries) => entries
            .iter()
            .map(|(key, item)| {
                aql_value_to_db_value(item, vars).map(|converted| (key.clone(), converted))
            })
            .collect(),
        _ => Err(AqlError::new(
            ErrorCode::QueryError,
            "Data must be object".to_string(),
        )),
    }
}
/// Converts a stored value to JSON.
///
/// UUIDs are rendered through `to_string`, date-times as RFC 3339 strings,
/// and floats with no JSON representation (NaN, infinities) as `null`.
/// Arrays and objects are converted recursively.
fn aurora_value_to_json_value(v: &Value) -> JsonValue {
    match v {
        Value::Null => JsonValue::Null,
        Value::Bool(flag) => JsonValue::Bool(*flag),
        Value::Int(int_val) => JsonValue::Number((*int_val).into()),
        Value::Float(float_val) => match serde_json::Number::from_f64(*float_val) {
            Some(number) => JsonValue::Number(number),
            None => JsonValue::Null,
        },
        Value::String(text) => JsonValue::String(text.clone()),
        Value::Uuid(id) => JsonValue::String(id.to_string()),
        Value::DateTime(moment) => JsonValue::String(moment.to_rfc3339()),
        Value::Array(items) => {
            JsonValue::Array(items.iter().map(aurora_value_to_json_value).collect())
        }
        Value::Object(entries) => {
            let mut json_map = serde_json::Map::new();
            for (key, item) in entries {
                json_map.insert(key.clone(), aurora_value_to_json_value(item));
            }
            JsonValue::Object(json_map)
        }
    }
}
/// Finds an equality condition that can be answered from an index.
///
/// An `Eq` condition qualifies when its field is `_sid` or the database
/// reports an index on it for `collection`. `And` filters are searched
/// depth-first and the first qualifying condition wins; every other filter
/// kind yields `None`.
fn find_indexed_equality_filter(
    filter: &AqlFilter,
    db: &Aurora,
    collection: &str,
) -> Option<(String, ast::Value)> {
    match filter {
        AqlFilter::Eq(field, val) => {
            let usable = field == "_sid" || db.has_index(collection, field);
            usable.then(|| (field.clone(), val.clone()))
        }
        AqlFilter::And(filters) => filters
            .iter()
            .find_map(|inner| find_indexed_equality_filter(inner, db, collection)),
        _ => None,
    }
}
/// Converts a `where`-style object into a filter tree.
///
/// Every entry of the object contributes conditions that are AND-ed together:
/// * `or` / `and` with an array value combine the converted elements;
/// * `not` negates the converted value;
/// * any other key is a field name whose object value maps operator names
///   (`eq`, `ne`, `gt`, `gte`, `lt`, `lte`, `in`, `notin`, `contains`,
///   `containsAny`, `containsAll`, `startsWith`, `endsWith`, `matches`) to
///   operands.
///
/// Unknown operators, non-object field values, and `or` / `and` with a
/// non-array value are ignored. A single resulting condition is returned
/// directly; none yields an empty `And`.
///
/// Previously `or`, `and` and `not` returned immediately, silently dropping
/// every sibling condition, and which ones were dropped depended on the
/// map's iteration order. They now combine with their siblings. An object
/// whose only key is `or`, `and` or `not` converts exactly as before.
///
/// # Errors
///
/// Returns `ErrorCode::QueryError` ("Filter must be object") when `v`, or a
/// nested filter value, is not an object.
pub fn value_to_filter(v: &ast::Value) -> Result<AqlFilter> {
    let m = match v {
        ast::Value::Object(m) => m,
        _ => {
            return Err(AqlError::new(
                ErrorCode::QueryError,
                "Filter must be object".to_string(),
            ));
        }
    };

    let mut fs = Vec::new();
    for (k, val) in m {
        match k.as_str() {
            "or" => {
                if let ast::Value::Array(arr) = val {
                    let sub = arr
                        .iter()
                        .map(|item| value_to_filter(item))
                        .collect::<Result<Vec<_>>>()?;
                    fs.push(AqlFilter::Or(sub));
                }
            }
            "and" => {
                if let ast::Value::Array(arr) = val {
                    let sub = arr
                        .iter()
                        .map(|item| value_to_filter(item))
                        .collect::<Result<Vec<_>>>()?;
                    fs.push(AqlFilter::And(sub));
                }
            }
            "not" => fs.push(AqlFilter::Not(Box::new(value_to_filter(val)?))),
            field => {
                if let ast::Value::Object(ops) = val {
                    for (op, ov) in ops {
                        match op.as_str() {
                            "eq" => fs.push(AqlFilter::Eq(field.to_string(), ov.clone())),
                            "ne" => fs.push(AqlFilter::Ne(field.to_string(), ov.clone())),
                            "gt" => fs.push(AqlFilter::Gt(field.to_string(), ov.clone())),
                            "gte" => fs.push(AqlFilter::Gte(field.to_string(), ov.clone())),
                            "lt" => fs.push(AqlFilter::Lt(field.to_string(), ov.clone())),
                            "lte" => fs.push(AqlFilter::Lte(field.to_string(), ov.clone())),
                            "in" => fs.push(AqlFilter::In(field.to_string(), ov.clone())),
                            "notin" => fs.push(AqlFilter::NotIn(field.to_string(), ov.clone())),
                            "contains" => {
                                fs.push(AqlFilter::Contains(field.to_string(), ov.clone()))
                            }
                            "containsAny" => {
                                fs.push(AqlFilter::ContainsAny(field.to_string(), ov.clone()))
                            }
                            "containsAll" => {
                                fs.push(AqlFilter::ContainsAll(field.to_string(), ov.clone()))
                            }
                            "startsWith" => {
                                fs.push(AqlFilter::StartsWith(field.to_string(), ov.clone()))
                            }
                            "endsWith" => {
                                fs.push(AqlFilter::EndsWith(field.to_string(), ov.clone()))
                            }
                            "matches" => {
                                fs.push(AqlFilter::Matches(field.to_string(), ov.clone()))
                            }
                            _ => {}
                        }
                    }
                }
            }
        }
    }

    if fs.len() == 1 {
        Ok(fs.remove(0))
    } else {
        // Covers both "no conditions" (empty And) and "several conditions".
        Ok(AqlFilter::And(fs))
    }
}
/// Returns an owned copy of `v`, with a top-level variable replaced by its
/// binding from `vars` (`Null` when unbound). Nested values are not
/// resolved and the execution context is currently unused.
fn resolve_value(
    v: &ast::Value,
    vars: &HashMap<String, ast::Value>,
    _ctx: &ExecutionContext,
) -> ast::Value {
    if let ast::Value::Variable(name) = v {
        return vars.get(name).cloned().unwrap_or(ast::Value::Null);
    }
    v.clone()
}
/// Returns a copy of `v` with every variable, at any nesting depth inside
/// arrays and objects, replaced by its binding from `vars`. Unbound
/// variables become `Null`.
fn resolve_ast_deep(v: &ast::Value, vars: &HashMap<String, ast::Value>) -> ast::Value {
    match v {
        ast::Value::Variable(name) => match vars.get(name) {
            Some(bound) => bound.clone(),
            None => ast::Value::Null,
        },
        ast::Value::Array(items) => {
            let resolved = items
                .iter()
                .map(|item| resolve_ast_deep(item, vars))
                .collect();
            ast::Value::Array(resolved)
        }
        ast::Value::Object(entries) => {
            let resolved = entries
                .iter()
                .map(|(key, item)| (key.clone(), resolve_ast_deep(item, vars)))
                .collect();
            ast::Value::Object(resolved)
        }
        scalar => scalar.clone(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Aurora, AuroraConfig, DurabilityMode, FieldType};
    use tempfile::TempDir;

    /// End-to-end smoke test: define a collection, insert one document via a
    /// mutation, and read it back with a query.
    #[tokio::test]
    async fn test_executor_integration() {
        // The temporary directory must outlive the database handle.
        let td = TempDir::new().unwrap();
        let config = AuroraConfig {
            db_path: td.path().join("test.db"),
            enable_write_buffering: false,
            durability_mode: DurabilityMode::Strict,
            ..Default::default()
        };
        let db = Aurora::with_config(config).await.unwrap();

        let name_field = crate::types::FieldDefinition {
            field_type: FieldType::SCALAR_STRING,
            unique: false,
            indexed: true,
            nullable: true,
            ..Default::default()
        };
        db.new_collection("users", vec![("name", name_field)])
            .await
            .unwrap();

        let _ = execute(
            &db,
            r#"mutation { insertInto(collection: "users", data: { name: "Alice" }) { id name } }"#,
            ExecutionOptions::new(),
        )
        .await
        .unwrap();

        let res = execute(&db, "query { users { name } }", ExecutionOptions::new())
            .await
            .unwrap();
        match res {
            ExecutionResult::Query(q) => assert_eq!(q.documents.len(), 1),
            _ => panic!("Expected query"),
        }
    }
}