vegafusion_runtime/transform/
sequence.rs1use crate::expression::compiler::compile;
2use crate::expression::compiler::config::CompilationConfig;
3use crate::transform::TransformTrait;
4
5use crate::data::util::SessionContextUtils;
6use crate::expression::compiler::utils::ExprHelpers;
7use async_trait::async_trait;
8use datafusion::prelude::{DataFrame, SessionContext};
9use std::sync::Arc;
10use vegafusion_common::arrow::array::{ArrayRef, Float64Array};
11use vegafusion_common::arrow::datatypes::DataType;
12use vegafusion_common::arrow::datatypes::{Field, Schema, SchemaRef};
13use vegafusion_common::arrow::record_batch::RecordBatch;
14use vegafusion_common::data::scalar::ScalarValueHelpers;
15use vegafusion_common::data::table::VegaFusionTable;
16use vegafusion_common::error::Result;
17use vegafusion_core::proto::gen::transforms::Sequence;
18use vegafusion_core::task_graph::task_value::TaskValue;
19
20#[async_trait]
21impl TransformTrait for Sequence {
22 async fn eval(
23 &self,
24 dataframe: DataFrame,
25 config: &CompilationConfig,
26 ) -> Result<(DataFrame, Vec<TaskValue>)> {
27 let start_expr = compile(self.start.as_ref().unwrap(), config, None)?;
28 let start_scalar = start_expr.eval_to_scalar()?;
29 let start = start_scalar.to_f64()?;
30
31 let stop_expr = compile(self.stop.as_ref().unwrap(), config, None)?;
32 let stop_scalar = stop_expr.eval_to_scalar()?;
33 let stop = stop_scalar.to_f64()?;
34
35 let step = if let Some(step_signal) = &self.step {
36 let step_expr = compile(step_signal, config, None)?;
37 let step_scalar = step_expr.eval_to_scalar()?;
38 step_scalar.to_f64()?
39 } else if stop >= start {
40 1.0
41 } else {
42 -1.0
43 };
44
45 let capacity = ((stop - start).abs() / step.abs()).ceil() as usize;
46 let mut data_builder = Float64Array::builder(capacity);
47 let mut val = start;
48 if start <= stop && step > 0.0 {
49 while val < stop {
50 data_builder.append_value(val);
51 val += step;
52 }
53 } else if step < 0.0 {
54 while val > stop {
55 data_builder.append_value(val);
56 val += step;
57 }
58 }
59 let data_array = Arc::new(data_builder.finish()) as ArrayRef;
60 let col_name = self.r#as.clone().unwrap_or_else(|| "data".to_string());
61 let data_schema = Arc::new(Schema::new(vec![Field::new(
62 &col_name,
63 DataType::Float64,
64 true,
65 )])) as SchemaRef;
66 let data_batch = RecordBatch::try_new(data_schema, vec![data_array])?;
67 let data_table = VegaFusionTable::from(data_batch);
68
69 let (state, _) = dataframe.into_parts();
71 let ctx = SessionContext::from(state);
72 let result = ctx.vegafusion_table(data_table.with_ordering()?).await?;
73
74 Ok((result, Default::default()))
75 }
76}