// chartml_core/plugin/transform.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow::array::RecordBatch;
5use arrow::datatypes::SchemaRef;
6use async_trait::async_trait;
7use indexmap::IndexMap;
8
9use crate::data::DataTable;
10use crate::error::ChartError;
11use crate::spec::TransformSpec;
12
13/// Context available during transform execution.
14#[derive(Debug, Clone, Default)]
15pub struct TransformContext {
16    /// Parameter values resolved from the spec.
17    pub params: HashMap<String, serde_json::Value>,
18}
19
20/// Result of a transform operation.
21#[derive(Debug, Clone)]
22pub struct TransformResult {
23    pub data: DataTable,
24    pub metadata: HashMap<String, serde_json::Value>,
25}
26
27/// Transform middleware — processes data between fetch and render.
28///
29/// Receives a map of named source tables (insertion-ordered, matching the YAML
30/// `data:` map order) and produces a single result table that the renderer
31/// consumes. Implementations are expected to:
32///
33/// - Register every entry in `sources` so user-authored SQL can reference
34///   any source by its declared name.
35/// - For single-entry maps with a non-`"source"` key, additionally register
36///   the sole table under the alias `"source"` so legacy SQL referencing
37///   `FROM source` keeps working. Multi-entry maps are NOT aliased — the
38///   caller's SQL must use the explicit source names.
39#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
40#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
41pub trait TransformMiddleware: Send + Sync {
42    /// Transform input data according to the spec.
43    async fn transform(
44        &self,
45        sources: &IndexMap<String, DataTable>,
46        spec: &TransformSpec,
47        context: &TransformContext,
48    ) -> Result<TransformResult, ChartError>;
49
50    /// Batch-oriented transform. Receives multiple RecordBatches per source,
51    /// allowing implementations to register them into MemTable without
52    /// concatenation.
53    ///
54    /// Default concatenates batches into DataTables and delegates to
55    /// `transform()`. Override in `DataFusionTransform` to avoid the concat.
56    async fn transform_batches(
57        &self,
58        sources: &IndexMap<String, (SchemaRef, Vec<RecordBatch>)>,
59        spec: &TransformSpec,
60        context: &TransformContext,
61    ) -> Result<TransformResult, ChartError> {
62        let mut data_tables = IndexMap::with_capacity(sources.len());
63        for (name, (schema, batches)) in sources {
64            let batch = if batches.is_empty() {
65                RecordBatch::new_empty(Arc::clone(schema))
66            } else {
67                arrow::compute::concat_batches(schema, batches).map_err(|e| {
68                    ChartError::DataError(format!(
69                        "Failed to concat batches for source '{}': {}",
70                        name, e
71                    ))
72                })?
73            };
74            data_tables.insert(name.clone(), DataTable::from_record_batch(batch));
75        }
76        self.transform(&data_tables, spec, context).await
77    }
78}