use crate::input::proto::substrait;
use crate::output::diagnostic;
use crate::output::extension;
use crate::output::type_system::data;
use crate::parse::context;
use crate::parse::expressions;
use crate::parse::extensions;
use crate::parse::sorts;
use crate::parse::types;
use crate::util;
use crate::util::string::Describe;
/// The kind of function call being parsed; stored in
/// `FunctionContext::function_type`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FunctionType {
/// Used by `parse_scalar_function`.
Scalar,
/// Used by `parse_aggregate_function`.
Aggregate,
/// Used by `parse_window_function`.
Window,
}
/// A single argument of a function call, as produced by the argument parsers
/// in this module.
#[derive(Clone, Debug, PartialEq, Default)]
pub enum FunctionArgument {
/// Placeholder and `Default` value. The parsers in this module fall back to
/// it when no parsed argument is available, and when a legacy optional enum
/// argument is left unspecified.
#[default]
Unresolved,
/// A value argument: the data type recorded in the parse context after
/// parsing the expression, together with the expression itself.
Value(data::Type, expressions::Expression),
/// A data type passed as an argument.
Type(data::Type),
/// An enumeration option, stored as a string.
Enum(String),
}
impl Describe for FunctionArgument {
    /// Writes a description of the argument to `f`, passing `limit` through
    /// to whatever is being described.
    ///
    /// Unresolved arguments are written as `!`, values and types delegate to
    /// their own `describe()`, and enum options are written via
    /// `util::string::describe_identifier`.
    fn describe(
        &self,
        f: &mut std::fmt::Formatter<'_>,
        limit: util::string::Limit,
    ) -> std::fmt::Result {
        match self {
            Self::Unresolved => f.write_str("!"),
            // The data type stored alongside a value is not part of its
            // description; only the expression is described.
            Self::Value(_, expression) => expression.describe(f, limit),
            Self::Type(data_type) => data_type.describe(f, limit),
            Self::Enum(variant) => util::string::describe_identifier(f, variant, limit),
        }
    }
}
// Display implementation for function arguments; forwards to the formatting
// of the value returned by `display()`.
impl std::fmt::Display for FunctionArgument {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// NOTE(review): `display()` is not defined in this file; presumably it is
// provided by the `Describe` trait imported above — confirm.
self.display().fmt(f)
}
}
/// A function option, as produced by `parse_function_option`.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct FunctionOption {
/// Copy of the `name` field of the parsed option message.
pub name: String,
/// Copy of the `preference` field of the parsed option message.
/// `parse_function_option` only returns an option when this list is
/// non-empty.
pub preference: Vec<String>,
}
/// Information gathered about a function call while parsing it; passed to
/// `FunctionBinding::new`.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct FunctionContext {
/// Whether the call is a scalar, aggregate, or window function call.
pub function_type: FunctionType,
/// The arguments bound to the function, in the order they were parsed.
pub arguments: Vec<FunctionArgument>,
/// The options that were parsed successfully for this call.
pub options: Vec<FunctionOption>,
/// The data type obtained from the call's `output_type` field.
pub return_type: data::Type,
}
/// The result of binding a function call; constructed by
/// `FunctionBinding::new`.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct FunctionBinding {
/// Reference to the function that was called. This is the default reference
/// when no resolution result was available.
pub function: extension::simple::function::Reference,
/// `Expression::Function` built from the function's name and the call's
/// arguments.
pub expression: expressions::Expression,
/// Return type of the call, copied from the `FunctionContext`.
pub return_type: data::Type,
}
impl FunctionBinding {
    /// Builds the binding for a function call.
    ///
    /// When `functions` is `Some`, a `NotYetImplemented` warning is pushed to
    /// `parse_context` (the call is not actually matched against the
    /// definition) and the function reference is taken from `as_item()`.
    /// Otherwise the default reference is used.
    ///
    /// The resulting expression is `Expression::Function`, built from the
    /// reference's name and a clone of the arguments in `function_context`.
    /// The return type is cloned from `function_context` unchanged.
    pub fn new(
        functions: Option<&extension::simple::function::ResolutionResult>,
        function_context: &FunctionContext,
        parse_context: &mut context::Context,
    ) -> FunctionBinding {
        let function = match functions {
            Some(resolved) => {
                diagnostic!(
                    parse_context,
                    Warning,
                    NotYetImplemented,
                    "matching function calls with their definitions"
                );
                resolved.as_item()
            }
            None => Default::default(),
        };

        // The name must be copied out before `function` is moved into the
        // binding below.
        let expression = expressions::Expression::Function(
            function.name.to_string(),
            function_context.arguments.clone(),
        );

        FunctionBinding {
            function,
            expression,
            return_type: function_context.return_type.clone(),
        }
    }
}
/// Parses the argument type oneof of a function argument.
///
/// Enum options are converted to a string. Type and value arguments are first
/// parsed with `types::parse_type` / `expressions::parse_expression` (errors
/// are propagated), after which the data type is read back from the parse
/// context `y`.
fn parse_function_argument_type(
    x: &substrait::function_argument::ArgType,
    y: &mut context::Context,
) -> diagnostic::Result<FunctionArgument> {
    let argument = match x {
        substrait::function_argument::ArgType::Enum(variant) => {
            FunctionArgument::Enum(variant.to_string())
        }
        substrait::function_argument::ArgType::Type(data_type) => {
            types::parse_type(data_type, y)?;
            FunctionArgument::Type(y.data_type())
        }
        substrait::function_argument::ArgType::Value(value) => {
            // Parse first, so the context's data type is read after parsing.
            let expression = expressions::parse_expression(value, y)?;
            FunctionArgument::Value(y.data_type(), expression)
        }
    };
    Ok(argument)
}
/// Parses a function argument message.
///
/// `arg_type` is handled as a required field using
/// `parse_function_argument_type`. When that yields no parsed argument, the
/// default (`FunctionArgument::Unresolved`) is returned instead. This
/// function itself always returns `Ok`.
fn parse_function_argument(
    x: &substrait::FunctionArgument,
    y: &mut context::Context,
) -> diagnostic::Result<FunctionArgument> {
    let parsed = proto_required_field!(x, y, arg_type, parse_function_argument_type);
    Ok(parsed.1.unwrap_or_default())
}
fn parse_function_option(
x: &substrait::FunctionOption,
y: &mut context::Context,
) -> diagnostic::Result<FunctionOption> {
proto_primitive_field!(x, y, name);
proto_required_repeated_field!(x, y, preference);
if x.preference.is_empty() {
let err = cause!(IllegalValue, "at least one option must be specified");
diagnostic!(y, Error, err.clone());
comment!(
y,
"To leave an option unspecified, simply don't add an entry to `options`"
);
Err(err)
} else {
Ok(FunctionOption {
name: x.name.clone(),
preference: x.preference.clone(),
})
}
}
/// Parses a function argument specified through the legacy `args` syntax,
/// where arguments are plain expressions.
///
/// Delegates to `expressions::parse_legacy_function_argument` and converts its
/// result: values are paired with the data type read from `y`, and specified
/// enum options are passed through. An unspecified enum option produces a
/// `Deprecation` error diagnostic and resolves to
/// `FunctionArgument::Unresolved`.
///
/// # Errors
///
/// Propagates any error from `expressions::parse_legacy_function_argument`.
fn parse_legacy_function_argument(
    x: &substrait::Expression,
    y: &mut context::Context,
) -> diagnostic::Result<FunctionArgument> {
    let parsed = expressions::parse_legacy_function_argument(x, y)?;
    let argument = match parsed {
        expressions::ExpressionOrEnum::Value(expression) => {
            FunctionArgument::Value(y.data_type(), expression)
        }
        expressions::ExpressionOrEnum::Enum(Some(variant)) => FunctionArgument::Enum(variant),
        expressions::ExpressionOrEnum::Enum(None) => {
            diagnostic!(
                y,
                Error,
                Deprecation,
                "support for optional enum arguments was removed in Substrait 0.20.0 (#342)"
            );
            FunctionArgument::Unresolved
        }
    };
    Ok(argument)
}
/// Reconciles the argument lists obtained from the `arguments` field and from
/// the legacy `args` field, returning the list to use.
///
/// - No legacy arguments: `arguments` is returned as-is.
/// - Only legacy arguments: a `Deprecation` warning is emitted and
///   `legacy_arguments` is returned.
/// - Both present: `arguments` is returned. If the two lists differ, an
///   `IllegalValue` error and an explanatory comment are emitted.
fn handle_legacy_arguments(
    y: &mut context::Context,
    arguments: Vec<FunctionArgument>,
    legacy_arguments: Vec<FunctionArgument>,
) -> Vec<FunctionArgument> {
    if legacy_arguments.is_empty() {
        return arguments;
    }

    if arguments.is_empty() {
        diagnostic!(
            y,
            Warning,
            Deprecation,
            "the args field for specifying function arguments was deprecated Substrait 0.3.0 (#161)"
        );
        return legacy_arguments;
    }

    // Both syntaxes were used; they are only allowed to coexist if they agree.
    if arguments != legacy_arguments {
        diagnostic!(
            y,
            Error,
            IllegalValue,
            "mismatch between v0.3+ and legacy function argument specification"
        );
        comment!(
            y,
            "If both the v0.3+ and legacy syntax is used to specify function \
            arguments, please make sure both map to the same arguments. If \
            the argument pack cannot be represented using the legacy syntax, \
            do not use it."
        );
    }
    arguments
}
pub fn parse_scalar_function(
x: &substrait::expression::ScalarFunction,
y: &mut context::Context,
) -> diagnostic::Result<expressions::Expression> {
let functions = proto_primitive_field!(
x,
y,
function_reference,
extensions::simple::parse_function_reference
)
.1;
#[allow(deprecated)]
let legacy_arguments = proto_repeated_field!(x, y, args, parse_legacy_function_argument)
.1
.into_iter()
.map(|x| x.unwrap_or_default())
.collect();
let arguments = proto_repeated_field!(x, y, arguments, parse_function_argument)
.1
.into_iter()
.map(|x| x.unwrap_or_default())
.collect();
let options = proto_repeated_field!(x, y, options, parse_function_option)
.1
.into_iter()
.flatten()
.collect();
let return_type = proto_required_field!(x, y, output_type, types::parse_type)
.0
.data_type();
let arguments = handle_legacy_arguments(y, arguments, legacy_arguments);
let context = FunctionContext {
function_type: FunctionType::Scalar,
arguments,
options,
return_type,
};
let binding = FunctionBinding::new(functions.as_ref(), &context, y);
y.set_data_type(binding.return_type);
describe!(y, Expression, "{}", binding.expression);
summary!(y, "Scalar function call: {:#}", binding.expression);
Ok(binding.expression)
}
/// Parses a window function bound.
///
/// The bound itself is not inspected (`_x` is unused); only a
/// `NotYetImplemented` warning is pushed to `y`. Always returns `Ok(())`.
fn parse_bound(
_x: &substrait::expression::window_function::Bound,
y: &mut context::Context,
) -> diagnostic::Result<()> {
diagnostic!(
y,
Warning,
NotYetImplemented,
"validation of window function bounds"
);
Ok(())
}
pub fn parse_window_function(
x: &substrait::expression::WindowFunction,
y: &mut context::Context,
) -> diagnostic::Result<expressions::Expression> {
let functions = proto_primitive_field!(
x,
y,
function_reference,
extensions::simple::parse_function_reference
)
.1;
#[allow(deprecated)]
let legacy_arguments = proto_repeated_field!(x, y, args, parse_legacy_function_argument)
.1
.into_iter()
.map(|x| x.unwrap_or_default())
.collect();
let arguments = proto_repeated_field!(x, y, arguments, parse_function_argument)
.1
.into_iter()
.map(|x| x.unwrap_or_default())
.collect();
let options = proto_repeated_field!(x, y, options, parse_function_option)
.1
.into_iter()
.flatten()
.collect();
let return_type = proto_required_field!(x, y, output_type, types::parse_type)
.0
.data_type();
proto_repeated_field!(x, y, partitions, expressions::parse_expression);
proto_repeated_field!(x, y, sorts, sorts::parse_sort_field);
proto_field!(x, y, upper_bound, parse_bound);
proto_field!(x, y, lower_bound, parse_bound);
proto_enum_field!(x, y, phase, substrait::AggregationPhase);
let arguments = handle_legacy_arguments(y, arguments, legacy_arguments);
let context = FunctionContext {
function_type: FunctionType::Window,
arguments,
options,
return_type,
};
let binding = FunctionBinding::new(functions.as_ref(), &context, y);
y.set_data_type(binding.return_type);
describe!(y, Expression, "{}", binding.expression);
summary!(y, "Window function call: {:#}", binding.expression);
Ok(binding.expression)
}
pub fn parse_aggregate_function(
x: &substrait::AggregateFunction,
y: &mut context::Context,
) -> diagnostic::Result<expressions::Expression> {
let functions = proto_primitive_field!(
x,
y,
function_reference,
extensions::simple::parse_function_reference
)
.1;
#[allow(deprecated)]
let legacy_arguments = proto_repeated_field!(x, y, args, parse_legacy_function_argument)
.1
.into_iter()
.map(|x| x.unwrap_or_default())
.collect();
let arguments = proto_repeated_field!(x, y, arguments, parse_function_argument)
.1
.into_iter()
.map(|x| x.unwrap_or_default())
.collect();
let options = proto_repeated_field!(x, y, options, parse_function_option)
.1
.into_iter()
.flatten()
.collect();
let return_type = proto_required_field!(x, y, output_type, types::parse_type)
.0
.data_type();
proto_repeated_field!(x, y, sorts, sorts::parse_sort_field);
proto_enum_field!(x, y, phase, substrait::AggregationPhase);
proto_enum_field!(
x,
y,
invocation,
substrait::aggregate_function::AggregationInvocation
);
let arguments = handle_legacy_arguments(y, arguments, legacy_arguments);
let context = FunctionContext {
function_type: FunctionType::Aggregate,
arguments,
options,
return_type,
};
let binding = FunctionBinding::new(functions.as_ref(), &context, y);
y.set_data_type(binding.return_type);
describe!(y, Expression, "{}", binding.expression);
summary!(y, "Aggregate function call: {:#}", binding.expression);
Ok(binding.expression)
}