// feldera_adapterlib/utils/datafusion.rs

use crate::errors::journal::ControllerError;
use anyhow::{Error as AnyError, anyhow};
use arrow::array::Array;
use datafusion::common::arrow::array::{AsArray, RecordBatch};
use datafusion::logical_expr::sqlparser::parser::ParserError;
use datafusion::prelude::{SQLOptions, SessionContext};
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::Parser;
use datafusion::sql::sqlparser::tokenizer::Token;
use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};

11pub async fn execute_query_collect(
13 datafusion: &SessionContext,
14 query: &str,
15) -> Result<Vec<RecordBatch>, AnyError> {
16 let options = SQLOptions::new()
17 .with_allow_ddl(false)
18 .with_allow_dml(false);
19
20 let df = datafusion
21 .sql_with_options(query, options)
22 .await
23 .map_err(|e| anyhow!("error compiling query '{query}': {e}"))?;
24
25 df.collect()
26 .await
27 .map_err(|e| anyhow!("error executing query '{query}': {e}"))
28}
29
30pub async fn execute_singleton_query(
32 datafusion: &SessionContext,
33 query: &str,
34) -> Result<String, AnyError> {
35 let result = execute_query_collect(datafusion, query).await?;
36 if result.len() != 1 {
37 return Err(anyhow!(
38 "internal error: query '{query}' returned {} batches; expected: 1",
39 result.len()
40 ));
41 }
42
43 if result[0].num_rows() != 1 {
44 return Err(anyhow!(
45 "internal error: query '{query}' returned {} rows; expected: 1",
46 result[0].num_rows()
47 ));
48 }
49
50 if result[0].num_columns() != 1 {
51 return Err(anyhow!(
52 "internal error: query '{query}' returned {} columns; expected: 1",
53 result[0].num_columns()
54 ));
55 }
56
57 let column0 = result[0].column(0);
58
59 array_to_string(column0).ok_or_else(|| {
60 anyhow!("internal error: cannot retrieve the output of query '{query}' as a string")
61 })
62}
63
64pub fn array_to_string(array: &dyn Array) -> Option<String> {
65 if let Some(string_view_array) = array.as_string_view_opt() {
66 Some(string_view_array.value(0).to_string())
67 } else {
68 array
69 .as_string_opt::<i32>()
70 .map(|array| array.value(0).to_string())
71 }
72}
73
74pub fn validate_sql_expression(expr: &str) -> Result<(), ParserError> {
76 let mut parser = Parser::new(&GenericDialect).try_with_sql(expr)?;
77 parser.parse_expr()?;
78
79 Ok(())
80}
81
82pub fn timestamp_to_sql_expression(column_type: &ColumnType, expr: &str) -> String {
85 match column_type.typ {
86 SqlType::Timestamp => format!("timestamp '{expr}'"),
87 SqlType::Date => format!("date '{expr}'"),
88 _ => expr.to_string(),
89 }
90}
91
92pub fn validate_timestamp_type(
94 endpoint_name: &str,
95 timestamp: &Field,
96 docs: &str,
97) -> Result<(), ControllerError> {
98 if !timestamp.columntype.is_integral_type()
99 && !matches!(
100 ×tamp.columntype.typ,
101 SqlType::Date | SqlType::Timestamp
102 )
103 {
104 return Err(ControllerError::invalid_transport_configuration(
105 endpoint_name,
106 &format!(
107 "timestamp column '{}' has unsupported type {}; supported types for 'timestamp_column' are integer types, DATE, and TIMESTAMP; {docs}",
108 timestamp.name,
109 serde_json::to_string(×tamp.columntype).unwrap()
110 ),
111 ));
112 }
113
114 Ok(())
115}
116
117pub async fn validate_timestamp_column(
119 endpoint_name: &str,
120 timestamp_column: &str,
121 datafusion: &SessionContext,
122 schema: &Relation,
123 docs: &str,
124) -> Result<(), ControllerError> {
125 let Some(field) = schema.field(timestamp_column) else {
127 return Err(ControllerError::invalid_transport_configuration(
128 endpoint_name,
129 &format!("timestamp column '{timestamp_column}' not found in table schema"),
130 ));
131 };
132
133 validate_timestamp_type(endpoint_name, field, docs)?;
135
136 let Some(lateness) = &field.lateness else {
138 return Err(ControllerError::invalid_transport_configuration(
139 endpoint_name,
140 &format!(
141 "timestamp column '{timestamp_column}' does not have a LATENESS attribute; {docs}"
142 ),
143 ));
144 };
145
146 validate_sql_expression(lateness).map_err(|e|
148 ControllerError::invalid_transport_configuration(
149 endpoint_name,
150 &format!("error parsing LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': {e}; {docs}"),
151 ),
152 )?;
153
154 let is_zero = execute_singleton_query(
158 datafusion,
159 &format!("select cast((({lateness} + {lateness}) = {lateness}) as string)"),
160 )
161 .await
162 .map_err(|e| ControllerError::invalid_transport_configuration(endpoint_name, &e.to_string()))?;
163
164 if &is_zero == "true" {
165 return Err(ControllerError::invalid_transport_configuration(
166 endpoint_name,
167 &format!(
168 "invalid LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': LATENESS must be greater than zero; {docs}"
169 ),
170 ));
171 }
172
173 Ok(())
174}