Skip to main content

feldera_adapterlib/utils/
datafusion.rs

1use crate::errors::journal::ControllerError;
2use anyhow::{Error as AnyError, anyhow};
3use arrow::array::Array;
4use datafusion::common::arrow::array::{AsArray, RecordBatch};
5use datafusion::logical_expr::sqlparser::parser::ParserError;
6use datafusion::prelude::{SQLOptions, SessionContext};
7use datafusion::sql::sqlparser::dialect::GenericDialect;
8use datafusion::sql::sqlparser::parser::Parser;
9use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};
10
11/// Execute a SQL query and collect all results in a vector of `RecordBatch`'s.
12pub async fn execute_query_collect(
13    datafusion: &SessionContext,
14    query: &str,
15) -> Result<Vec<RecordBatch>, AnyError> {
16    let options = SQLOptions::new()
17        .with_allow_ddl(false)
18        .with_allow_dml(false);
19
20    let df = datafusion
21        .sql_with_options(query, options)
22        .await
23        .map_err(|e| anyhow!("error compiling query '{query}': {e}"))?;
24
25    df.collect()
26        .await
27        .map_err(|e| anyhow!("error executing query '{query}': {e}"))
28}
29
30/// Execute a SQL query that returns a result with exactly one row and column of type `string`.
31pub async fn execute_singleton_query(
32    datafusion: &SessionContext,
33    query: &str,
34) -> Result<String, AnyError> {
35    let result = execute_query_collect(datafusion, query).await?;
36    if result.len() != 1 {
37        return Err(anyhow!(
38            "internal error: query '{query}' returned {} batches; expected: 1",
39            result.len()
40        ));
41    }
42
43    if result[0].num_rows() != 1 {
44        return Err(anyhow!(
45            "internal error: query '{query}' returned {} rows; expected: 1",
46            result[0].num_rows()
47        ));
48    }
49
50    if result[0].num_columns() != 1 {
51        return Err(anyhow!(
52            "internal error: query '{query}' returned {} columns; expected: 1",
53            result[0].num_columns()
54        ));
55    }
56
57    let column0 = result[0].column(0);
58
59    array_to_string(column0).ok_or_else(|| {
60        anyhow!("internal error: cannot retrieve the output of query '{query}' as a string")
61    })
62}
63
64pub fn array_to_string(array: &dyn Array) -> Option<String> {
65    if let Some(string_view_array) = array.as_string_view_opt() {
66        Some(string_view_array.value(0).to_string())
67    } else {
68        array
69            .as_string_opt::<i32>()
70            .map(|array| array.value(0).to_string())
71    }
72}
73
74/// Parse expression only to validate it.
75pub fn validate_sql_expression(expr: &str) -> Result<(), ParserError> {
76    let mut parser = Parser::new(&GenericDialect).try_with_sql(expr)?;
77    parser.parse_expr()?;
78
79    Ok(())
80}
81
82/// Convert a value of the timestamp column returned by a SQL query into a valid
83/// SQL expression.
84pub fn timestamp_to_sql_expression(column_type: &ColumnType, expr: &str) -> String {
85    match column_type.typ {
86        SqlType::Timestamp => format!("timestamp '{expr}'"),
87        SqlType::Date => format!("date '{expr}'"),
88        _ => expr.to_string(),
89    }
90}
91
92/// Check that the `timestamp` field has one of supported types.
93pub fn validate_timestamp_type(
94    endpoint_name: &str,
95    timestamp: &Field,
96    docs: &str,
97) -> Result<(), ControllerError> {
98    if !timestamp.columntype.is_integral_type()
99        && !matches!(
100            &timestamp.columntype.typ,
101            SqlType::Date | SqlType::Timestamp
102        )
103    {
104        return Err(ControllerError::invalid_transport_configuration(
105            endpoint_name,
106            &format!(
107                "timestamp column '{}' has unsupported type {}; supported types for 'timestamp_column' are integer types, DATE, and TIMESTAMP; {docs}",
108                timestamp.name,
109                serde_json::to_string(&timestamp.columntype).unwrap()
110            ),
111        ));
112    }
113
114    Ok(())
115}
116
117/// Validate 'timestamp_column'.
118pub async fn validate_timestamp_column(
119    endpoint_name: &str,
120    timestamp_column: &str,
121    datafusion: &SessionContext,
122    schema: &Relation,
123    docs: &str,
124) -> Result<(), ControllerError> {
125    // Lookup column in the schema.
126    let Some(field) = schema.field(timestamp_column) else {
127        return Err(ControllerError::invalid_transport_configuration(
128            endpoint_name,
129            &format!("timestamp column '{timestamp_column}' not found in table schema"),
130        ));
131    };
132
133    // Field must have a supported type.
134    validate_timestamp_type(endpoint_name, field, docs)?;
135
136    // Column must have lateness.
137    let Some(lateness) = &field.lateness else {
138        return Err(ControllerError::invalid_transport_configuration(
139            endpoint_name,
140            &format!(
141                "timestamp column '{timestamp_column}' does not have a LATENESS attribute; {docs}"
142            ),
143        ));
144    };
145
146    // Validate lateness expression.
147    validate_sql_expression(lateness).map_err(|e|
148                ControllerError::invalid_transport_configuration(
149                    endpoint_name,
150                    &format!("error parsing LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': {e}; {docs}"),
151                ),
152            )?;
153
154    // Lateness has to be >0. Zero would mean that we need to ingest data strictly in order. If we need to support this case in the future,
155    // we could revert to our old (and very costly) strategy of issuing a single `select *` query with the 'ORDER BY timestamp_column' clause,
156    // which requires storing and sorting the entire collection locally.
157    let is_zero = execute_singleton_query(
158        datafusion,
159        &format!("select cast((({lateness} + {lateness}) = {lateness}) as string)"),
160    )
161    .await
162    .map_err(|e| ControllerError::invalid_transport_configuration(endpoint_name, &e.to_string()))?;
163
164    if &is_zero == "true" {
165        return Err(ControllerError::invalid_transport_configuration(
166            endpoint_name,
167            &format!(
168                "invalid LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': LATENESS must be greater than zero; {docs}"
169            ),
170        ));
171    }
172
173    Ok(())
174}