use std::collections::HashMap;
use std::sync::Arc;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use indexmap::IndexMap;
use crate::data::DataTable;
use crate::error::ChartError;
use crate::spec::TransformSpec;
/// Extra parameters passed alongside a [`TransformSpec`] to a
/// [`TransformMiddleware`] invocation.
///
/// The default value has an empty parameter map.
#[derive(Default, Clone, Debug)]
pub struct TransformContext {
    /// Named, JSON-valued parameters. Which keys are meaningful is
    /// presumably up to each transform implementation — NOTE(review): confirm.
    pub params: HashMap<String, serde_json::Value>,
}
/// Output of a [`TransformMiddleware`]: the resulting table plus
/// free-form, JSON-valued metadata.
#[derive(Clone, Debug)]
pub struct TransformResult {
    /// The transformed table.
    pub data: DataTable,
    /// Additional information reported by the transform, keyed by name.
    /// The set of keys is not fixed here — NOTE(review): confirm per implementation.
    pub metadata: HashMap<String, serde_json::Value>,
}
/// A pluggable data-transformation stage.
///
/// Implementors receive a set of named input tables plus a [`TransformSpec`]
/// and produce a single [`TransformResult`]. Outside `wasm32` the trait is
/// expanded with `async_trait` (futures must be `Send`); on `wasm32` it uses
/// `async_trait(?Send)` so non-`Send` futures are allowed.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait TransformMiddleware: Send + Sync {
    /// Applies `spec` to the named `sources`, with `context` supplying
    /// additional parameters.
    ///
    /// # Errors
    ///
    /// Returns a [`ChartError`] when the transform cannot be applied; the
    /// specific failure modes are implementation-defined.
    async fn transform(
        &self,
        sources: &IndexMap<String, DataTable>,
        spec: &TransformSpec,
        context: &TransformContext,
    ) -> Result<TransformResult, ChartError>;

    /// Arrow-native entry point: each source is a schema plus zero or more
    /// record batches.
    ///
    /// The default implementation collapses every source's batches into one
    /// `RecordBatch` (a zero-row batch with the given schema when the source
    /// has no batches), wraps it via `DataTable::from_record_batch`, and
    /// delegates to [`TransformMiddleware::transform`]. Source order is kept.
    ///
    /// # Errors
    ///
    /// Returns [`ChartError::DataError`] naming the offending source if its
    /// batches cannot be concatenated, and otherwise whatever `transform`
    /// returns.
    async fn transform_batches(
        &self,
        sources: &IndexMap<String, (SchemaRef, Vec<RecordBatch>)>,
        spec: &TransformSpec,
        context: &TransformContext,
    ) -> Result<TransformResult, ChartError> {
        let tables = sources
            .iter()
            .map(
                |(source_name, (source_schema, source_batches))| -> Result<(String, DataTable), ChartError> {
                    let merged = match source_batches.as_slice() {
                        // No batches: still produce a correctly-typed, zero-row table.
                        [] => RecordBatch::new_empty(Arc::clone(source_schema)),
                        _ => arrow::compute::concat_batches(source_schema, source_batches)
                            .map_err(|err| {
                                ChartError::DataError(format!(
                                    "Failed to concat batches for source '{}': {}",
                                    source_name, err
                                ))
                            })?,
                    };
                    Ok((source_name.clone(), DataTable::from_record_batch(merged)))
                },
            )
            // Short-circuits on the first source that fails to concatenate.
            .collect::<Result<IndexMap<String, DataTable>, ChartError>>()?;

        self.transform(&tables, spec, context).await
    }
}