use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use super::core::Expr;
use super::{schema::ExprSchema, ExprDataType};
use crate::distributed::core::dataframe::DistributedDataFrame;
use crate::distributed::execution::{ExecutionPlan, Operation};
use crate::error::Result;
/// Definition of a user-defined function (UDF) that can be rendered as a SQL
/// `CREATE FUNCTION` statement via [`UdfDefinition::to_sql`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdfDefinition {
/// Function name, emitted verbatim after `CREATE FUNCTION`.
pub name: String,
/// Declared return type, emitted after `RETURNS`.
pub return_type: ExprDataType,
/// Parameter types in positional order; `to_sql` names them `param0`, `param1`, ...
pub parameter_types: Vec<ExprDataType>,
/// Function body, emitted between single quotes after `AS`.
pub body: String,
}
impl UdfDefinition {
    /// Creates a UDF definition from its name, return type, parameter types and body.
    ///
    /// `name` and `body` accept anything convertible into a `String`.
    pub fn new(
        name: impl Into<String>,
        return_type: ExprDataType,
        parameter_types: Vec<ExprDataType>,
        body: impl Into<String>,
    ) -> Self {
        Self {
            name: name.into(),
            return_type,
            parameter_types,
            body: body.into(),
        }
    }

    /// Renders this definition as a `CREATE FUNCTION` statement.
    ///
    /// Parameters are named positionally (`param0`, `param1`, ...) and typed with the
    /// `Display` form of each entry in `parameter_types`. The body is emitted as a
    /// single-quoted string literal, with embedded single quotes doubled.
    pub fn to_sql(&self) -> String {
        let params = self
            .parameter_types
            .iter()
            .enumerate()
            .map(|(i, param_type)| format!("param{} {}", i, param_type))
            .collect::<Vec<_>>()
            .join(", ");

        // The body sits inside a single-quoted literal, so an unescaped `'` (for example
        // in `x = 'a'`) would terminate the literal early and corrupt the statement.
        let escaped_body = self.body.replace('\'', "''");

        format!(
            "CREATE FUNCTION {} ({}) RETURNS {} AS '{}'",
            self.name, params, self.return_type, escaped_body
        )
    }
}
/// An expression to project, optionally paired with the name its output should carry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnProjection {
/// Expression whose value forms the projected column.
pub expr: Expr,
/// Optional output name; when set, `to_sql` appends `AS <alias>` and
/// `output_name` returns it.
pub alias: Option<String>,
}
impl ColumnProjection {
    /// Builds a projection from an expression and an optional alias.
    pub fn new(expr: Expr, alias: Option<impl Into<String>>) -> Self {
        let alias: Option<String> = alias.map(Into::into);
        Self { expr, alias }
    }

    /// Builds a projection whose output is named `alias`.
    pub fn with_alias(expr: Expr, alias: impl Into<String>) -> Self {
        let alias = Some(alias.into());
        Self { expr, alias }
    }

    /// Builds an unaliased projection of the column called `name`.
    pub fn column(name: impl Into<String>) -> Self {
        let expr = Expr::col(name);
        Self { expr, alias: None }
    }

    /// Renders the projection as SQL: the expression's `Display` form, followed by
    /// `AS <alias>` when an alias is set.
    pub fn to_sql(&self) -> String {
        let rendered = self.expr.to_string();
        if let Some(alias) = &self.alias {
            format!("{} AS {}", rendered, alias)
        } else {
            rendered
        }
    }

    /// Returns the name of the column this projection produces.
    ///
    /// Precedence: the alias if present; otherwise the column name when the expression
    /// is a bare `Expr::Column`; otherwise `expr_` followed by the alphanumeric
    /// characters of the expression's `Debug` representation.
    pub fn output_name(&self) -> String {
        if let Some(alias) = &self.alias {
            return alias.clone();
        }
        if let Expr::Column(name) = &self.expr {
            return name.clone();
        }

        // No alias and not a plain column: synthesize a name from the Debug text,
        // keeping only alphanumeric characters.
        let debug_repr = format!("{:?}", self.expr);
        let mut generated = String::from("expr_");
        generated.extend(debug_repr.chars().filter(|c| c.is_alphanumeric()));
        generated
    }
}
/// Extension methods for expression-based projection, filtering and UDF registration.
/// Every method except `validate_projections` returns a [`DistributedDataFrame`].
pub trait ProjectionExt {
/// Projects the dataframe through the given `projections` and returns the result.
fn select_expr(&self, projections: &[ColumnProjection]) -> Result<DistributedDataFrame>;
/// Produces a dataframe with a column called `name` computed from `expr`.
fn with_column(&self, name: impl Into<String>, expr: Expr) -> Result<DistributedDataFrame>;
/// Filters the dataframe using `expr` as the predicate.
///
/// Unlike the other methods, this one takes `&mut self`.
fn filter_expr(&mut self, expr: Expr) -> Result<DistributedDataFrame>;
/// Submits the given UDF definitions and returns the resulting dataframe.
fn create_udf(&self, udfs: &[UdfDefinition]) -> Result<DistributedDataFrame>;
/// Validates `projections` against `schema`.
///
/// NOTE(review): the `DistributedDataFrame` implementation in this file performs no
/// checks and always returns `Ok(())` — confirm whether real validation is expected.
fn validate_projections(
&self,
projections: &[ColumnProjection],
schema: &ExprSchema,
) -> Result<()>;
}
impl ProjectionExt for DistributedDataFrame {
fn select_expr(&self, projections: &[ColumnProjection]) -> Result<DistributedDataFrame> {
let operation = Operation::Custom {
name: "select_expr".to_string(),
params: [(
"projections".to_string(),
serde_json::to_string(projections).unwrap_or_default(),
)]
.iter()
.cloned()
.collect(),
};
if self.is_lazy() {
let mut new_df = self.clone_empty();
let mut plan = ExecutionPlan::new(&self.id().to_string());
plan.add_operation(operation);
new_df.add_pending_operation(plan, vec![self.id().to_string()]);
Ok(new_df)
} else {
let mut plan = ExecutionPlan::new(&self.id().to_string());
plan.add_operation(operation);
self.execute_operation(plan, vec![self.id().to_string()])
}
}
fn with_column(&self, name: impl Into<String>, expr: Expr) -> Result<DistributedDataFrame> {
let name = name.into();
let projection = ColumnProjection::with_alias(expr, name.clone());
let operation = Operation::Custom {
name: "with_column".to_string(),
params: [
("column_name".to_string(), name),
(
"projection".to_string(),
serde_json::to_string(&projection).unwrap_or_default(),
),
]
.iter()
.cloned()
.collect(),
};
if self.is_lazy() {
let mut new_df = self.clone_empty();
let mut plan = ExecutionPlan::new(&self.id().to_string());
plan.add_operation(operation);
new_df.add_pending_operation(plan, vec![self.id().to_string()]);
Ok(new_df)
} else {
let mut plan = ExecutionPlan::new(&self.id().to_string());
plan.add_operation(operation);
self.execute_operation(plan, vec![self.id().to_string()])
}
}
fn filter_expr(&mut self, expr: Expr) -> Result<DistributedDataFrame> {
let filter_sql = format!("{:?}", expr);
self.filter(&filter_sql)
}
fn create_udf(&self, udfs: &[UdfDefinition]) -> Result<DistributedDataFrame> {
let operation = Operation::Custom {
name: "create_udf".to_string(),
params: [(
"udfs".to_string(),
serde_json::to_string(udfs).unwrap_or_default(),
)]
.iter()
.cloned()
.collect(),
};
if self.is_lazy() {
let mut new_df = self.clone_empty();
let mut plan = ExecutionPlan::new(&self.id().to_string());
plan.add_operation(operation);
new_df.add_pending_operation(plan, vec![self.id().to_string()]);
Ok(new_df)
} else {
let mut plan = ExecutionPlan::new(&self.id().to_string());
plan.add_operation(operation);
self.execute_operation(plan, vec![self.id().to_string()])
}
}
fn validate_projections(
&self,
projections: &[ColumnProjection],
schema: &ExprSchema,
) -> Result<()> {
Ok(())
}
}