vegafusion_runtime/transform/
sequence.rs

1use crate::expression::compiler::compile;
2use crate::expression::compiler::config::CompilationConfig;
3use crate::transform::TransformTrait;
4
5use crate::data::util::SessionContextUtils;
6use crate::expression::compiler::utils::ExprHelpers;
7use async_trait::async_trait;
8use datafusion::prelude::{DataFrame, SessionContext};
9use std::sync::Arc;
10use vegafusion_common::arrow::array::{ArrayRef, Float64Array};
11use vegafusion_common::arrow::datatypes::DataType;
12use vegafusion_common::arrow::datatypes::{Field, Schema, SchemaRef};
13use vegafusion_common::arrow::record_batch::RecordBatch;
14use vegafusion_common::data::scalar::ScalarValueHelpers;
15use vegafusion_common::data::table::VegaFusionTable;
16use vegafusion_common::error::Result;
17use vegafusion_core::proto::gen::transforms::Sequence;
18use vegafusion_core::task_graph::task_value::TaskValue;
19
20#[async_trait]
21impl TransformTrait for Sequence {
22    async fn eval(
23        &self,
24        dataframe: DataFrame,
25        config: &CompilationConfig,
26    ) -> Result<(DataFrame, Vec<TaskValue>)> {
27        let start_expr = compile(self.start.as_ref().unwrap(), config, None)?;
28        let start_scalar = start_expr.eval_to_scalar()?;
29        let start = start_scalar.to_f64()?;
30
31        let stop_expr = compile(self.stop.as_ref().unwrap(), config, None)?;
32        let stop_scalar = stop_expr.eval_to_scalar()?;
33        let stop = stop_scalar.to_f64()?;
34
35        let step = if let Some(step_signal) = &self.step {
36            let step_expr = compile(step_signal, config, None)?;
37            let step_scalar = step_expr.eval_to_scalar()?;
38            step_scalar.to_f64()?
39        } else if stop >= start {
40            1.0
41        } else {
42            -1.0
43        };
44
45        let capacity = ((stop - start).abs() / step.abs()).ceil() as usize;
46        let mut data_builder = Float64Array::builder(capacity);
47        let mut val = start;
48        if start <= stop && step > 0.0 {
49            while val < stop {
50                data_builder.append_value(val);
51                val += step;
52            }
53        } else if step < 0.0 {
54            while val > stop {
55                data_builder.append_value(val);
56                val += step;
57            }
58        }
59        let data_array = Arc::new(data_builder.finish()) as ArrayRef;
60        let col_name = self.r#as.clone().unwrap_or_else(|| "data".to_string());
61        let data_schema = Arc::new(Schema::new(vec![Field::new(
62            &col_name,
63            DataType::Float64,
64            true,
65        )])) as SchemaRef;
66        let data_batch = RecordBatch::try_new(data_schema, vec![data_array])?;
67        let data_table = VegaFusionTable::from(data_batch);
68
69        // Build session context from input DataFrame
70        let (state, _) = dataframe.into_parts();
71        let ctx = SessionContext::from(state);
72        let result = ctx.vegafusion_table(data_table.with_ordering()?).await?;
73
74        Ok((result, Default::default()))
75    }
76}