1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
use crate::{
    base::database::{dataframe_to_record_batch, record_batch_to_dataframe},
    sql::transform::RecordBatchExpr,
};
use arrow::record_batch::RecordBatch;
use dyn_partial_eq::DynPartialEq;
use polars::prelude::{IntoLazy, LazyFrame};
use serde::{Deserialize, Serialize};

/// A result expression, used to transform the results of a query.
///
/// It owns a single boxed [`RecordBatchExpr`] trait object. That object is applied to a
/// query's output `RecordBatch` by [`ResultExpr::transform_results`].
#[derive(Debug, DynPartialEq, PartialEq, Serialize, Deserialize)]
pub struct ResultExpr {
    /// The transformation applied to the query's result `RecordBatch`.
    transformation: Box<dyn RecordBatchExpr>,
}

impl ResultExpr {
    /// Create a new `ResultExpr` node with the provided transformation to be applied to the input record batch.
    pub fn new(transformation: Box<dyn RecordBatchExpr>) -> Self {
        Self { transformation }
    }
}

/// Converts a result `RecordBatch` into a polars `LazyFrame`.
///
/// Also returns the number of rows in the input batch. The count is read before the batch
/// is consumed by the conversion. Returns `None` when `record_batch_to_dataframe` returns
/// `None`.
pub(super) fn record_batch_to_lazy_frame(result_batch: RecordBatch) -> Option<(LazyFrame, usize)> {
    let row_count = result_batch.num_rows();
    record_batch_to_dataframe(result_batch).map(|frame| (frame.lazy(), row_count))
}
/// Evaluates `lazy_frame` and converts the collected result back into a `RecordBatch`.
///
/// Rows containing a NULL are dropped before collection (see the note in the body).
/// Returns `None` in two cases: collecting the frame fails, or `dataframe_to_record_batch`
/// returns `None`.
pub(super) fn lazy_frame_to_record_batch(lazy_frame: LazyFrame) -> Option<RecordBatch> {
    // Post-processing currently excludes NULLs because the prover does not support them.
    // Excluding them now avoids future complications.
    // The trade-off is that aggregations over empty data no longer yield NULL results.
    // For example, `SELECT MAX(i), COUNT(i), SUM(i), MIN(i) FROM table WHERE s = 'nonexist'`
    // now omits the entire row. Previously it returned `null, 0, 0, null`.
    // This is acceptable because
    // `SELECT MAX(i), COUNT(i), SUM(i) FROM table WHERE s = 'nonexist' GROUP BY f`
    // behaves the same way.
    //
    // TODO: Revisit if we add NULL support to the prover.
    let collected = lazy_frame.drop_nulls(None).collect().ok()?;
    dataframe_to_record_batch(collected)
}

impl ResultExpr {
    /// Applies the stored transformation to a query's result `RecordBatch`.
    ///
    /// Returns whatever the transformation's `apply_transformation` returns. `None` is
    /// passed through unchanged.
    pub fn transform_results(&self, result_batch: RecordBatch) -> Option<RecordBatch> {
        let transform = &self.transformation;
        transform.apply_transformation(result_batch)
    }
}