1use std::{any::Any, collections::HashMap, sync::Arc};
2
3use crate::proto::gen::pretransform::pre_transform_values_warning::WarningType as ValuesWarningType;
4use crate::{
5 data::dataset::VegaFusionDataset,
6 planning::{
7 apply_pre_transform::apply_pre_transform_datasets,
8 destringify_selection_datetimes::destringify_selection_datetimes,
9 plan::{PlannerConfig, SpecPlan},
10 watch::{ExportUpdateArrow, ExportUpdateNamespace},
11 },
12 proto::gen::{
13 pretransform::{
14 pre_transform_extract_warning, PlannerWarning, PreTransformExtractOpts,
15 PreTransformExtractWarning, PreTransformRowLimitWarning, PreTransformSpecOpts,
16 PreTransformSpecWarning, PreTransformValuesOpts, PreTransformValuesWarning,
17 },
18 tasks::{NodeValueIndex, TaskGraph, TzConfig, VariableNamespace},
19 },
20 spec::{chart::ChartSpec, values::MissingNullOrValue},
21 task_graph::{
22 graph::ScopedVariable,
23 task_value::{NamedTaskValue, TaskValue},
24 },
25};
26use async_trait::async_trait;
27use vegafusion_common::{
28 data::table::VegaFusionTable,
29 error::{Result, ResultWithContext, VegaFusionError},
30};
31
32#[derive(Clone, Debug)]
33pub struct PreTransformExtractTable {
34 pub name: String,
35 pub scope: Vec<u32>,
36 pub table: VegaFusionTable,
37}
38
39#[async_trait]
40pub trait VegaFusionRuntimeTrait: Send + Sync {
41 fn as_any(&self) -> &dyn Any;
42
43 async fn query_request(
44 &self,
45 task_graph: Arc<TaskGraph>,
46 indices: &[NodeValueIndex],
47 inline_datasets: &HashMap<String, VegaFusionDataset>,
48 ) -> Result<Vec<NamedTaskValue>>;
49
50 async fn pre_transform_spec_plan(
51 &self,
52 spec: &ChartSpec,
53 local_tz: &str,
54 default_input_tz: &Option<String>,
55 preserve_interactivity: bool,
56 inline_datasets: &HashMap<String, VegaFusionDataset>,
57 keep_variables: Vec<ScopedVariable>,
58 ) -> Result<(SpecPlan, Vec<ExportUpdateArrow>)> {
59 let plan = SpecPlan::try_new(
61 spec,
62 &PlannerConfig::pre_transformed_spec_config(preserve_interactivity, keep_variables),
63 )?;
64
65 let dataset_fingerprints = inline_datasets
67 .iter()
68 .map(|(k, ds)| (k.clone(), ds.fingerprint()))
69 .collect::<HashMap<_, _>>();
70
71 let tz_config = TzConfig {
73 local_tz: local_tz.to_string(),
74 default_input_tz: default_input_tz.clone(),
75 };
76 let task_scope = plan.server_spec.to_task_scope().unwrap();
77 let tasks = plan
78 .server_spec
79 .to_tasks(&tz_config, &dataset_fingerprints)
80 .unwrap();
81 let task_graph = TaskGraph::new(tasks, &task_scope).unwrap();
82 let task_graph_mapping = task_graph.build_mapping();
83
84 let mut init = Vec::new();
86 let task_graph = Arc::new(task_graph);
87 let indices: Vec<NodeValueIndex> = plan
88 .comm_plan
89 .server_to_client
90 .iter()
91 .filter_map(|var| task_graph_mapping.get(var).cloned())
92 .collect();
93
94 let response_values = self
95 .query_request(task_graph.clone(), &indices, inline_datasets)
96 .await
97 .with_context(|| "Failed to query node values")?;
98
99 for (var, response_value) in plan.comm_plan.server_to_client.iter().zip(response_values) {
100 init.push(ExportUpdateArrow {
101 namespace: ExportUpdateNamespace::try_from(var.0.namespace()).unwrap(),
102 name: var.0.name.clone(),
103 scope: var.1.clone(),
104 value: response_value.value,
105 });
106 }
107 Ok((plan, init))
108 }
109
110 async fn pre_transform_spec(
111 &self,
112 spec: &ChartSpec,
113 inline_datasets: &HashMap<String, VegaFusionDataset>,
114 options: &PreTransformSpecOpts,
115 ) -> Result<(ChartSpec, Vec<PreTransformSpecWarning>)> {
116 let input_spec = spec;
117
118 let keep_variables: Vec<ScopedVariable> = options
119 .keep_variables
120 .clone()
121 .into_iter()
122 .map(|var| (var.variable.unwrap(), var.scope))
123 .collect();
124 let (plan, init) = self
125 .pre_transform_spec_plan(
126 spec,
127 &options.local_tz,
128 &options.default_input_tz,
129 options.preserve_interactivity,
130 inline_datasets,
131 keep_variables,
132 )
133 .await?;
134
135 apply_pre_transform_datasets(input_spec, &plan, init, options.row_limit)
136 }
137
138 async fn pre_transform_extract(
139 &self,
140 spec: &ChartSpec,
141 inline_datasets: &HashMap<String, VegaFusionDataset>,
142 options: &PreTransformExtractOpts,
143 ) -> Result<(
144 ChartSpec,
145 Vec<PreTransformExtractTable>,
146 Vec<PreTransformExtractWarning>,
147 )> {
148 let input_spec = spec;
149 let keep_variables: Vec<ScopedVariable> = options
150 .keep_variables
151 .clone()
152 .into_iter()
153 .map(|var| (var.variable.unwrap(), var.scope))
154 .collect();
155
156 let (plan, init) = self
157 .pre_transform_spec_plan(
158 spec,
159 &options.local_tz,
160 &options.default_input_tz,
161 options.preserve_interactivity,
162 inline_datasets,
163 keep_variables,
164 )
165 .await?;
166
167 let mut spec = plan.client_spec.clone();
169 let mut datasets: Vec<PreTransformExtractTable> = Vec::new();
170 let extract_threshold = options.extract_threshold as usize;
171
172 for export_update in init {
173 let scope = export_update.scope.clone();
174 let name = export_update.name.as_str();
175 match export_update.namespace {
176 ExportUpdateNamespace::Signal => {
177 let signal = spec.get_nested_signal_mut(&scope, name)?;
179 signal.value = MissingNullOrValue::Value(export_update.value.to_json()?);
180 }
181 ExportUpdateNamespace::Data => {
182 let data = spec.get_nested_data_mut(&scope, name)?;
183
184 let input_values =
189 input_spec
190 .get_nested_data(&scope, name)
191 .ok()
192 .and_then(|data| {
193 if data.transform.is_empty() {
194 data.values.clone()
195 } else {
196 None
197 }
198 });
199 if let Some(input_values) = input_values {
200 data.values = Some(input_values);
202 } else if let TaskValue::Table(table) = export_update.value {
203 if table.num_rows() <= extract_threshold {
204 data.values = Some(table.to_json()?);
206 } else {
207 datasets.push(PreTransformExtractTable {
209 name: export_update.name,
210 scope: export_update.scope,
211 table,
212 });
213 }
214 } else {
215 return Err(VegaFusionError::internal(
216 "Expected Data TaskValue to be an Table",
217 ));
218 }
219 }
220 }
221 }
222
223 destringify_selection_datetimes(&mut spec)?;
225
226 let mut warnings: Vec<PreTransformExtractWarning> = Vec::new();
228
229 for planner_warning in &plan.warnings {
231 warnings.push(PreTransformExtractWarning {
232 warning_type: Some(pre_transform_extract_warning::WarningType::Planner(
233 PlannerWarning {
234 message: planner_warning.message(),
235 },
236 )),
237 });
238 }
239
240 Ok((spec, datasets, warnings))
241 }
242
243 async fn pre_transform_values(
244 &self,
245 spec: &ChartSpec,
246 variables: &[ScopedVariable],
247 inline_datasets: &HashMap<String, VegaFusionDataset>,
248 options: &PreTransformValuesOpts,
249 ) -> Result<(Vec<TaskValue>, Vec<PreTransformValuesWarning>)> {
250 for var in variables {
252 let scope = var.1.as_slice();
253 let variable = var.0.clone();
254 let name = variable.name.clone();
255 let namespace = variable.clone().ns();
256
257 match namespace {
258 VariableNamespace::Signal => {
259 if spec.get_nested_signal(scope, &name).is_err() {
260 return Err(VegaFusionError::pre_transform(format!(
261 "No signal named {} with scope {:?}",
262 name, scope
263 )));
264 }
265 }
266 VariableNamespace::Data => {
267 if spec.get_nested_data(scope, &name).is_err() {
268 return Err(VegaFusionError::pre_transform(format!(
269 "No dataset named {} with scope {:?}",
270 name, scope
271 )));
272 }
273 }
274 VariableNamespace::Scale => {
275 return Err(VegaFusionError::pre_transform(format!(
276 "pre_transform_values does not support scale variable {:?}",
277 variable
278 )))
279 }
280 }
281 }
282
283 let keep_variables = Vec::from(variables);
286
287 let plan = SpecPlan::try_new(
289 spec,
290 &PlannerConfig {
291 stringify_local_datetimes: false,
292 extract_inline_data: true,
293 split_domain_data: false,
294 projection_pushdown: false,
295 allow_client_to_server_comms: true,
296 keep_variables,
297 ..Default::default()
298 },
299 )?;
300
301 let dataset_fingerprints = inline_datasets
303 .iter()
304 .map(|(k, ds)| (k.clone(), ds.fingerprint()))
305 .collect::<HashMap<_, _>>();
306
307 let tz_config = TzConfig {
309 local_tz: options.local_tz.to_string(),
310 default_input_tz: options.default_input_tz.clone(),
311 };
312 let task_scope = plan.server_spec.to_task_scope().unwrap();
313 let tasks = plan
314 .server_spec
315 .to_tasks(&tz_config, &dataset_fingerprints)?;
316 let task_graph = TaskGraph::new(tasks, &task_scope).unwrap();
317 let task_graph_mapping = task_graph.build_mapping();
318
319 let mut warnings: Vec<PreTransformValuesWarning> = Vec::new();
320
321 for planner_warning in &plan.warnings {
323 warnings.push(PreTransformValuesWarning {
324 warning_type: Some(ValuesWarningType::Planner(PlannerWarning {
325 message: planner_warning.message(),
326 })),
327 });
328 }
329
330 let indices: Vec<_> = variables
332 .iter()
333 .map(|var| {
334 if let Some(index) = task_graph_mapping.get(&(var.0.clone(), var.1.clone())) {
335 Ok(*index)
336 } else {
337 Err(VegaFusionError::pre_transform(format!(
338 "Requested variable {var:?}\n requires transforms or signal \
339 expressions that are not yet supported"
340 )))
341 }
342 })
343 .collect::<Result<Vec<_>>>()?;
344
345 let named_task_values = self
347 .query_request(Arc::new(task_graph.clone()), &indices, inline_datasets)
348 .await?;
349
350 let mut task_values: Vec<TaskValue> = Vec::new();
352 let row_limit = options.row_limit.map(|l| l as usize);
353 for named_task_value in named_task_values {
354 let value = named_task_value.value;
355 let variable = named_task_value.variable;
356
357 let value = if let (Some(row_limit), TaskValue::Table(table)) = (row_limit, &value) {
359 warnings.push(PreTransformValuesWarning {
360 warning_type: Some(ValuesWarningType::RowLimit(PreTransformRowLimitWarning {
361 datasets: vec![variable.clone()],
362 })),
363 });
364 TaskValue::Table(table.head(row_limit))
365 } else {
366 value
367 };
368
369 task_values.push(value);
370 }
371
372 Ok((task_values, warnings))
373 }
374}