use crate::errors::journal::ControllerError;
use anyhow::{Error as AnyError, anyhow};
use arrow::array::Array;
use datafusion::common::arrow::array::{AsArray, RecordBatch};
use datafusion::logical_expr::sqlparser::parser::ParserError;
use datafusion::prelude::{SQLOptions, SessionContext};
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::Parser;
use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};
pub async fn execute_query_collect(
datafusion: &SessionContext,
query: &str,
) -> Result<Vec<RecordBatch>, AnyError> {
let options = SQLOptions::new()
.with_allow_ddl(false)
.with_allow_dml(false);
let df = datafusion
.sql_with_options(query, options)
.await
.map_err(|e| anyhow!("error compiling query '{query}': {e}"))?;
df.collect()
.await
.map_err(|e| anyhow!("error executing query '{query}': {e}"))
}
/// Runs a query that is expected to yield exactly one value and returns that
/// value as a `String`.
///
/// # Errors
///
/// Fails if the query cannot be compiled or executed, if the result is not
/// exactly one batch containing one row and one column, or if the single
/// column cannot be read as a string by `array_to_string`.
pub async fn execute_singleton_query(
    datafusion: &SessionContext,
    query: &str,
) -> Result<String, AnyError> {
    let batches = execute_query_collect(datafusion, query).await?;

    // Exactly one batch is expected; anything else is reported as an internal error.
    let [batch] = batches.as_slice() else {
        return Err(anyhow!(
            "internal error: query '{query}' returned {} batches; expected: 1",
            batches.len()
        ));
    };

    let row_count = batch.num_rows();
    if row_count != 1 {
        return Err(anyhow!(
            "internal error: query '{query}' returned {} rows; expected: 1",
            row_count
        ));
    }

    let column_count = batch.num_columns();
    if column_count != 1 {
        return Err(anyhow!(
            "internal error: query '{query}' returned {} columns; expected: 1",
            column_count
        ));
    }

    match array_to_string(batch.column(0)) {
        Some(value) => Ok(value),
        None => Err(anyhow!(
            "internal error: cannot retrieve the output of query '{query}' as a string"
        )),
    }
}
/// Extracts the first element of a string-typed Arrow array as an owned
/// `String`.
///
/// The array is first tried as a string-view array and then as a string
/// array with `i32` offsets.
///
/// Returns `None` when the array is empty or is neither of those two types.
/// Null-ness of the first element is not checked here.
pub fn array_to_string(array: &dyn Array) -> Option<String> {
    // `value(0)` panics on an empty array; report "no value" instead.
    if array.is_empty() {
        return None;
    }

    if let Some(views) = array.as_string_view_opt() {
        return Some(views.value(0).to_string());
    }

    array
        .as_string_opt::<i32>()
        .map(|strings| strings.value(0).to_string())
}
/// Checks that `expr` can be parsed as a SQL expression using the generic
/// dialect. The parsed expression itself is discarded.
///
/// # Errors
///
/// Returns the `ParserError` produced while tokenizing the input or while
/// parsing the expression.
pub fn validate_sql_expression(expr: &str) -> Result<(), ParserError> {
    let outcome = Parser::new(&GenericDialect)
        .try_with_sql(expr)?
        .parse_expr();

    // Only success or failure matters to the caller.
    outcome.map(|_| ())
}
/// Renders `expr` as a SQL literal appropriate for the column's type.
///
/// `TIMESTAMP` columns produce `timestamp '<expr>'`, `DATE` columns produce
/// `date '<expr>'`, and every other type returns `expr` unchanged. The text
/// of `expr` is inserted verbatim, without any quoting or escaping.
pub fn timestamp_to_sql_expression(column_type: &ColumnType, expr: &str) -> String {
    if matches!(column_type.typ, SqlType::Timestamp) {
        return format!("timestamp '{expr}'");
    }
    if matches!(column_type.typ, SqlType::Date) {
        return format!("date '{expr}'");
    }
    expr.to_owned()
}
/// Verifies that the type of the `timestamp` column is one of the types
/// accepted for a timestamp column: an integral type, `DATE`, or `TIMESTAMP`.
///
/// * `endpoint_name` - endpoint name attached to the reported error.
/// * `timestamp` - the schema field being validated.
/// * `docs` - text appended to the end of the error message.
///
/// # Errors
///
/// Returns an invalid-transport-configuration error naming the column and
/// its JSON-serialized type when the type is not supported.
pub fn validate_timestamp_type(
    endpoint_name: &str,
    timestamp: &Field,
    docs: &str,
) -> Result<(), ControllerError> {
    let is_supported = timestamp.columntype.is_integral_type()
        || matches!(
            &timestamp.columntype.typ,
            SqlType::Date | SqlType::Timestamp
        );
    if is_supported {
        return Ok(());
    }

    Err(ControllerError::invalid_transport_configuration(
        endpoint_name,
        &format!(
            "timestamp column '{}' has unsupported type {}; supported types for 'timestamp_column' are integer types, DATE, and TIMESTAMP; {docs}",
            timestamp.name,
            // NOTE(review): panics if the column type cannot be serialized to
            // JSON; presumably this cannot happen for a schema type - confirm.
            serde_json::to_string(&timestamp.columntype).unwrap()
        ),
    ))
}
/// Validates that `timestamp_column` in `schema` can be used as a timestamp
/// column.
///
/// The checks run in this order:
/// 1. the column exists in `schema`;
/// 2. its type passes `validate_timestamp_type`;
/// 3. it has a `LATENESS` attribute;
/// 4. the `LATENESS` attribute parses as a SQL expression;
/// 5. the `LATENESS` expression is not reported as zero by DataFusion.
///
/// `docs` is appended to most error messages.
///
/// # Errors
///
/// Every failed check is reported as an invalid-transport-configuration
/// error for `endpoint_name`.
pub async fn validate_timestamp_column(
    endpoint_name: &str,
    timestamp_column: &str,
    datafusion: &SessionContext,
    schema: &Relation,
    docs: &str,
) -> Result<(), ControllerError> {
    // All failures in this function share the same error constructor.
    let config_error = |message: String| {
        ControllerError::invalid_transport_configuration(endpoint_name, &message)
    };

    let field = schema.field(timestamp_column).ok_or_else(|| {
        config_error(format!(
            "timestamp column '{timestamp_column}' not found in table schema"
        ))
    })?;

    validate_timestamp_type(endpoint_name, field, docs)?;

    let lateness = field.lateness.as_ref().ok_or_else(|| {
        config_error(format!(
            "timestamp column '{timestamp_column}' does not have a LATENESS attribute; {docs}"
        ))
    })?;

    if let Err(e) = validate_sql_expression(lateness) {
        return Err(config_error(format!(
            "error parsing LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': {e}; {docs}"
        )));
    }

    // Zero test that does not depend on the expression's type: `x + x = x`
    // is evaluated by DataFusion and the boolean is returned as a string.
    let zero_check_query =
        format!("select cast((({lateness} + {lateness}) = {lateness}) as string)");
    let is_zero = match execute_singleton_query(datafusion, &zero_check_query).await {
        Ok(answer) => answer,
        Err(e) => return Err(config_error(e.to_string())),
    };

    if is_zero == "true" {
        return Err(config_error(format!(
            "invalid LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': LATENESS must be greater than zero; {docs}"
        )));
    }

    Ok(())
}