chartml_core/plugin/
transform.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow::array::RecordBatch;
5use arrow::datatypes::SchemaRef;
6use async_trait::async_trait;
7use indexmap::IndexMap;
8
9use crate::data::DataTable;
10use crate::error::ChartError;
11use crate::spec::TransformSpec;
12
13#[derive(Debug, Clone, Default)]
15pub struct TransformContext {
16 pub params: HashMap<String, serde_json::Value>,
18}
19
20#[derive(Debug, Clone)]
22pub struct TransformResult {
23 pub data: DataTable,
24 pub metadata: HashMap<String, serde_json::Value>,
25}
26
27#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
40#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
41pub trait TransformMiddleware: Send + Sync {
42 async fn transform(
44 &self,
45 sources: &IndexMap<String, DataTable>,
46 spec: &TransformSpec,
47 context: &TransformContext,
48 ) -> Result<TransformResult, ChartError>;
49
50 async fn transform_batches(
57 &self,
58 sources: &IndexMap<String, (SchemaRef, Vec<RecordBatch>)>,
59 spec: &TransformSpec,
60 context: &TransformContext,
61 ) -> Result<TransformResult, ChartError> {
62 let mut data_tables = IndexMap::with_capacity(sources.len());
63 for (name, (schema, batches)) in sources {
64 let batch = if batches.is_empty() {
65 RecordBatch::new_empty(Arc::clone(schema))
66 } else {
67 arrow::compute::concat_batches(schema, batches).map_err(|e| {
68 ChartError::DataError(format!(
69 "Failed to concat batches for source '{}': {}",
70 name, e
71 ))
72 })?
73 };
74 data_tables.insert(name.clone(), DataTable::from_record_batch(batch));
75 }
76 self.transform(&data_tables, spec, context).await
77 }
78}