vegafusion_core/runtime/
runtime.rs

1use std::{any::Any, collections::HashMap, sync::Arc};
2
3use crate::proto::gen::pretransform::pre_transform_values_warning::WarningType as ValuesWarningType;
4use crate::{
5    data::dataset::VegaFusionDataset,
6    planning::{
7        apply_pre_transform::apply_pre_transform_datasets,
8        destringify_selection_datetimes::destringify_selection_datetimes,
9        plan::{PlannerConfig, SpecPlan},
10        watch::{ExportUpdateArrow, ExportUpdateNamespace},
11    },
12    proto::gen::{
13        pretransform::{
14            pre_transform_extract_warning, PlannerWarning, PreTransformExtractOpts,
15            PreTransformExtractWarning, PreTransformRowLimitWarning, PreTransformSpecOpts,
16            PreTransformSpecWarning, PreTransformValuesOpts, PreTransformValuesWarning,
17        },
18        tasks::{NodeValueIndex, TaskGraph, TzConfig, VariableNamespace},
19    },
20    spec::{chart::ChartSpec, values::MissingNullOrValue},
21    task_graph::{
22        graph::ScopedVariable,
23        task_value::{NamedTaskValue, TaskValue},
24    },
25};
26use async_trait::async_trait;
27use vegafusion_common::{
28    data::table::VegaFusionTable,
29    error::{Result, ResultWithContext, VegaFusionError},
30};
31
32#[derive(Clone, Debug)]
33pub struct PreTransformExtractTable {
34    pub name: String,
35    pub scope: Vec<u32>,
36    pub table: VegaFusionTable,
37}
38
39#[async_trait]
40pub trait VegaFusionRuntimeTrait: Send + Sync {
41    fn as_any(&self) -> &dyn Any;
42
43    async fn query_request(
44        &self,
45        task_graph: Arc<TaskGraph>,
46        indices: &[NodeValueIndex],
47        inline_datasets: &HashMap<String, VegaFusionDataset>,
48    ) -> Result<Vec<NamedTaskValue>>;
49
50    async fn pre_transform_spec_plan(
51        &self,
52        spec: &ChartSpec,
53        local_tz: &str,
54        default_input_tz: &Option<String>,
55        preserve_interactivity: bool,
56        inline_datasets: &HashMap<String, VegaFusionDataset>,
57        keep_variables: Vec<ScopedVariable>,
58    ) -> Result<(SpecPlan, Vec<ExportUpdateArrow>)> {
59        // Create spec plan
60        let plan = SpecPlan::try_new(
61            spec,
62            &PlannerConfig::pre_transformed_spec_config(preserve_interactivity, keep_variables),
63        )?;
64
65        // Extract inline dataset fingerprints
66        let dataset_fingerprints = inline_datasets
67            .iter()
68            .map(|(k, ds)| (k.clone(), ds.fingerprint()))
69            .collect::<HashMap<_, _>>();
70
71        // Create task graph for server spec
72        let tz_config = TzConfig {
73            local_tz: local_tz.to_string(),
74            default_input_tz: default_input_tz.clone(),
75        };
76        let task_scope = plan.server_spec.to_task_scope().unwrap();
77        let tasks = plan
78            .server_spec
79            .to_tasks(&tz_config, &dataset_fingerprints)
80            .unwrap();
81        let task_graph = TaskGraph::new(tasks, &task_scope).unwrap();
82        let task_graph_mapping = task_graph.build_mapping();
83
84        // Gather values of server-to-client values
85        let mut init = Vec::new();
86        let task_graph = Arc::new(task_graph);
87        let indices: Vec<NodeValueIndex> = plan
88            .comm_plan
89            .server_to_client
90            .iter()
91            .filter_map(|var| task_graph_mapping.get(var).cloned())
92            .collect();
93
94        let response_values = self
95            .query_request(task_graph.clone(), &indices, inline_datasets)
96            .await
97            .with_context(|| "Failed to query node values")?;
98
99        for (var, response_value) in plan.comm_plan.server_to_client.iter().zip(response_values) {
100            init.push(ExportUpdateArrow {
101                namespace: ExportUpdateNamespace::try_from(var.0.namespace()).unwrap(),
102                name: var.0.name.clone(),
103                scope: var.1.clone(),
104                value: response_value.value,
105            });
106        }
107        Ok((plan, init))
108    }
109
110    async fn pre_transform_spec(
111        &self,
112        spec: &ChartSpec,
113        inline_datasets: &HashMap<String, VegaFusionDataset>,
114        options: &PreTransformSpecOpts,
115    ) -> Result<(ChartSpec, Vec<PreTransformSpecWarning>)> {
116        let input_spec = spec;
117
118        let keep_variables: Vec<ScopedVariable> = options
119            .keep_variables
120            .clone()
121            .into_iter()
122            .map(|var| (var.variable.unwrap(), var.scope))
123            .collect();
124        let (plan, init) = self
125            .pre_transform_spec_plan(
126                spec,
127                &options.local_tz,
128                &options.default_input_tz,
129                options.preserve_interactivity,
130                inline_datasets,
131                keep_variables,
132            )
133            .await?;
134
135        apply_pre_transform_datasets(input_spec, &plan, init, options.row_limit)
136    }
137
138    async fn pre_transform_extract(
139        &self,
140        spec: &ChartSpec,
141        inline_datasets: &HashMap<String, VegaFusionDataset>,
142        options: &PreTransformExtractOpts,
143    ) -> Result<(
144        ChartSpec,
145        Vec<PreTransformExtractTable>,
146        Vec<PreTransformExtractWarning>,
147    )> {
148        let input_spec = spec;
149        let keep_variables: Vec<ScopedVariable> = options
150            .keep_variables
151            .clone()
152            .into_iter()
153            .map(|var| (var.variable.unwrap(), var.scope))
154            .collect();
155
156        let (plan, init) = self
157            .pre_transform_spec_plan(
158                spec,
159                &options.local_tz,
160                &options.default_input_tz,
161                options.preserve_interactivity,
162                inline_datasets,
163                keep_variables,
164            )
165            .await?;
166
167        // Update client spec with server values
168        let mut spec = plan.client_spec.clone();
169        let mut datasets: Vec<PreTransformExtractTable> = Vec::new();
170        let extract_threshold = options.extract_threshold as usize;
171
172        for export_update in init {
173            let scope = export_update.scope.clone();
174            let name = export_update.name.as_str();
175            match export_update.namespace {
176                ExportUpdateNamespace::Signal => {
177                    // Always inline signal values
178                    let signal = spec.get_nested_signal_mut(&scope, name)?;
179                    signal.value = MissingNullOrValue::Value(export_update.value.to_json()?);
180                }
181                ExportUpdateNamespace::Data => {
182                    let data = spec.get_nested_data_mut(&scope, name)?;
183
184                    // If the input dataset includes inline values and no transforms,
185                    // copy the input JSON directly to avoid the case where round-tripping
186                    // through Arrow homogenizes mixed type arrays.
187                    // E.g. round tripping may turn [1, "two"] into ["1", "two"]
188                    let input_values =
189                        input_spec
190                            .get_nested_data(&scope, name)
191                            .ok()
192                            .and_then(|data| {
193                                if data.transform.is_empty() {
194                                    data.values.clone()
195                                } else {
196                                    None
197                                }
198                            });
199                    if let Some(input_values) = input_values {
200                        // Set inline value
201                        data.values = Some(input_values);
202                    } else if let TaskValue::Table(table) = export_update.value {
203                        if table.num_rows() <= extract_threshold {
204                            // Inline small tables
205                            data.values = Some(table.to_json()?);
206                        } else {
207                            // Extract non-small tables
208                            datasets.push(PreTransformExtractTable {
209                                name: export_update.name,
210                                scope: export_update.scope,
211                                table,
212                            });
213                        }
214                    } else {
215                        return Err(VegaFusionError::internal(
216                            "Expected Data TaskValue to be an Table",
217                        ));
218                    }
219                }
220            }
221        }
222
223        // Destringify datetime strings in selection store datasets
224        destringify_selection_datetimes(&mut spec)?;
225
226        // Build warnings
227        let mut warnings: Vec<PreTransformExtractWarning> = Vec::new();
228
229        // Add planner warnings
230        for planner_warning in &plan.warnings {
231            warnings.push(PreTransformExtractWarning {
232                warning_type: Some(pre_transform_extract_warning::WarningType::Planner(
233                    PlannerWarning {
234                        message: planner_warning.message(),
235                    },
236                )),
237            });
238        }
239
240        Ok((spec, datasets, warnings))
241    }
242
243    async fn pre_transform_values(
244        &self,
245        spec: &ChartSpec,
246        variables: &[ScopedVariable],
247        inline_datasets: &HashMap<String, VegaFusionDataset>,
248        options: &PreTransformValuesOpts,
249    ) -> Result<(Vec<TaskValue>, Vec<PreTransformValuesWarning>)> {
250        // Check that requested variables exist and collect indices
251        for var in variables {
252            let scope = var.1.as_slice();
253            let variable = var.0.clone();
254            let name = variable.name.clone();
255            let namespace = variable.clone().ns();
256
257            match namespace {
258                VariableNamespace::Signal => {
259                    if spec.get_nested_signal(scope, &name).is_err() {
260                        return Err(VegaFusionError::pre_transform(format!(
261                            "No signal named {} with scope {:?}",
262                            name, scope
263                        )));
264                    }
265                }
266                VariableNamespace::Data => {
267                    if spec.get_nested_data(scope, &name).is_err() {
268                        return Err(VegaFusionError::pre_transform(format!(
269                            "No dataset named {} with scope {:?}",
270                            name, scope
271                        )));
272                    }
273                }
274                VariableNamespace::Scale => {
275                    return Err(VegaFusionError::pre_transform(format!(
276                        "pre_transform_values does not support scale variable {:?}",
277                        variable
278                    )))
279                }
280            }
281        }
282
283        // Make sure planner keeps the requested variables, event
284        // if they are not used elsewhere in the spec
285        let keep_variables = Vec::from(variables);
286
287        // Create spec plan
288        let plan = SpecPlan::try_new(
289            spec,
290            &PlannerConfig {
291                stringify_local_datetimes: false,
292                extract_inline_data: true,
293                split_domain_data: false,
294                projection_pushdown: false,
295                allow_client_to_server_comms: true,
296                keep_variables,
297                ..Default::default()
298            },
299        )?;
300
301        // Extract inline dataset fingerprints
302        let dataset_fingerprints = inline_datasets
303            .iter()
304            .map(|(k, ds)| (k.clone(), ds.fingerprint()))
305            .collect::<HashMap<_, _>>();
306
307        // Create task graph for server spec
308        let tz_config = TzConfig {
309            local_tz: options.local_tz.to_string(),
310            default_input_tz: options.default_input_tz.clone(),
311        };
312        let task_scope = plan.server_spec.to_task_scope().unwrap();
313        let tasks = plan
314            .server_spec
315            .to_tasks(&tz_config, &dataset_fingerprints)?;
316        let task_graph = TaskGraph::new(tasks, &task_scope).unwrap();
317        let task_graph_mapping = task_graph.build_mapping();
318
319        let mut warnings: Vec<PreTransformValuesWarning> = Vec::new();
320
321        // Add planner warnings
322        for planner_warning in &plan.warnings {
323            warnings.push(PreTransformValuesWarning {
324                warning_type: Some(ValuesWarningType::Planner(PlannerWarning {
325                    message: planner_warning.message(),
326                })),
327            });
328        }
329
330        // Collect node indices for variables
331        let indices: Vec<_> = variables
332            .iter()
333            .map(|var| {
334                if let Some(index) = task_graph_mapping.get(&(var.0.clone(), var.1.clone())) {
335                    Ok(*index)
336                } else {
337                    Err(VegaFusionError::pre_transform(format!(
338                        "Requested variable {var:?}\n requires transforms or signal \
339                            expressions that are not yet supported"
340                    )))
341                }
342            })
343            .collect::<Result<Vec<_>>>()?;
344
345        // perform query
346        let named_task_values = self
347            .query_request(Arc::new(task_graph.clone()), &indices, inline_datasets)
348            .await?;
349
350        // Collect values and handle row limit
351        let mut task_values: Vec<TaskValue> = Vec::new();
352        let row_limit = options.row_limit.map(|l| l as usize);
353        for named_task_value in named_task_values {
354            let value = named_task_value.value;
355            let variable = named_task_value.variable;
356
357            // Apply row_limit
358            let value = if let (Some(row_limit), TaskValue::Table(table)) = (row_limit, &value) {
359                warnings.push(PreTransformValuesWarning {
360                    warning_type: Some(ValuesWarningType::RowLimit(PreTransformRowLimitWarning {
361                        datasets: vec![variable.clone()],
362                    })),
363                });
364                TaskValue::Table(table.head(row_limit))
365            } else {
366                value
367            };
368
369            task_values.push(value);
370        }
371
372        Ok((task_values, warnings))
373    }
374}