vegafusion_core/planning/projection_pushdown.rs

1use crate::expression::column_usage::{
2    ColumnUsage, DatasetsColumnUsage, GetDatasetsColumnUsage, VlSelectionFields,
3};
4use crate::expression::parser::parse;
5use crate::planning::dependency_graph::build_dependency_graph;
6use crate::proto::gen::tasks::{Variable, VariableNamespace};
7use crate::spec::chart::{ChartSpec, ChartVisitor, MutChartVisitor};
8use crate::spec::data::DataSpec;
9use crate::spec::mark::{
10    EncodingOffset, MarkEncodeSpec, MarkEncodingField, MarkEncodingSpec, MarkSpec,
11};
12use crate::spec::scale::{
13    ScaleDataReferenceOrSignalSpec, ScaleDataReferenceSort, ScaleDomainSpec,
14    ScaleFieldReferenceSpec, ScaleRangeSpec, ScaleSpec,
15};
16use crate::spec::signal::{SignalOnEventSpec, SignalSpec};
17use crate::spec::transform::project::ProjectTransformSpec;
18use crate::spec::transform::{TransformColumns, TransformSpec};
19use crate::task_graph::graph::ScopedVariable;
20use crate::task_graph::scope::TaskScope;
21use itertools::{sorted, Itertools};
22use petgraph::algo::toposort;
23use std::collections::HashMap;
24use vegafusion_common::arrow::array::StringArray;
25use vegafusion_common::data::table::VegaFusionTable;
26use vegafusion_common::error::Result;
27use vegafusion_common::escape::{escape_field, unescape_field};
28
29/// This planning phase attempts to identify the precise subset of columns that are required
30/// of each dataset. If this can be determined for a particular dataset, then a projection
31/// transform is appended to the dataset's transform array. If it cannot be determined, then
32/// no change is made.
33pub fn projection_pushdown(chart_spec: &mut ChartSpec) -> Result<()> {
34    let datum_var = None;
35    let usage_scope = Vec::new();
36    let task_scope = chart_spec.to_task_scope()?;
37
38    // Collect field usage for vlSelectionTest datasets
39    let mut vl_selection_visitor = CollectVlSelectionTestFieldsVisitor::new(task_scope.clone());
40    chart_spec.walk(&mut vl_selection_visitor)?;
41    let vl_selection_fields = vl_selection_visitor.vl_selection_fields;
42
43    let datasets_column_usage = chart_spec.datasets_column_usage(
44        &datum_var,
45        usage_scope.as_slice(),
46        &task_scope,
47        &vl_selection_fields,
48    );
49
50    let mut visitor = InsertProjectionVisitor::new(&datasets_column_usage);
51    chart_spec.walk_mut(&mut visitor)?;
52    Ok(())
53}
54
55/// Get column usage info for the top-level root datasets of a Vega spec
56/// Returns map from dataset name to either:
57///   - None if column usage could not be determined
58///   - Vec<String> of the referenced columns if column usage could be determined precisely
59pub fn get_column_usage(chart_spec: &ChartSpec) -> Result<HashMap<String, Option<Vec<String>>>> {
60    let mut chart_spec = chart_spec.clone();
61
62    // split root nodes that have transforms so that the usage we compute refers to the source
63    // data, not the result after transforms
64    let mut new_data_specs: Vec<DataSpec> = Vec::new();
65    let suffix = "__column_usage_root";
66    for data_spec in &mut chart_spec.data {
67        if data_spec.source.is_none() && !data_spec.transform.is_empty() {
68            // This is a root dataset that has transforms, so we split it
69            let name = data_spec.name.clone();
70            let mut transforms = Vec::new();
71            transforms.append(&mut data_spec.transform);
72            let root_name = format!("{name}{suffix}");
73            data_spec.name = root_name.clone();
74
75            let new_spec = DataSpec {
76                name: name.clone(),
77                source: Some(root_name),
78                transform: transforms,
79                ..Default::default()
80            };
81
82            new_data_specs.push(new_spec);
83        }
84    }
85
86    chart_spec.data.append(&mut new_data_specs);
87
88    let datum_var = None;
89    let usage_scope = Vec::new();
90    let task_scope = chart_spec.to_task_scope()?;
91
92    // Collect field usage for vlSelectionTest datasets
93    let mut vl_selection_visitor = CollectVlSelectionTestFieldsVisitor::new(task_scope.clone());
94    chart_spec.walk(&mut vl_selection_visitor)?;
95    let vl_selection_fields = vl_selection_visitor.vl_selection_fields;
96
97    let datasets_column_usage = chart_spec.datasets_column_usage(
98        &datum_var,
99        usage_scope.as_slice(),
100        &task_scope,
101        &vl_selection_fields,
102    );
103
104    let mut root_dataset_columns: HashMap<String, Option<Vec<String>>> = HashMap::new();
105    for data_spec in &chart_spec.data {
106        if data_spec.source.is_none() {
107            let var = Variable::new(VariableNamespace::Data, &data_spec.name);
108            let scoped_var = (var, Vec::new());
109            let column_usage = datasets_column_usage
110                .usages
111                .get(&scoped_var)
112                .unwrap_or(&ColumnUsage::Unknown);
113
114            // Remove root dataset suffix that was added above
115            let original_name = data_spec
116                .name
117                .strip_suffix(suffix)
118                .unwrap_or(&data_spec.name)
119                .to_string();
120
121            match column_usage {
122                ColumnUsage::Unknown => {
123                    root_dataset_columns.insert(original_name.clone(), None);
124                }
125                ColumnUsage::Known(used) => {
126                    root_dataset_columns.insert(
127                        original_name.clone(),
128                        Some(used.iter().cloned().sorted().collect()),
129                    );
130                }
131            }
132        }
133    }
134    Ok(root_dataset_columns)
135}
136
137impl GetDatasetsColumnUsage for MarkEncodingField {
138    fn datasets_column_usage(
139        &self,
140        datum_var: &Option<ScopedVariable>,
141        _usage_scope: &[u32],
142        _task_scope: &TaskScope,
143        _vl_selection_fields: &VlSelectionFields,
144    ) -> DatasetsColumnUsage {
145        if let Some(datum_var) = datum_var {
146            let column_usage = match self {
147                MarkEncodingField::Field(field) => {
148                    if field.contains('.') || field.contains('[') {
149                        // Specification of a nested column like "target['x']" or "source.x"
150                        // (https://vega.github.io/vega/docs/types/#Field)
151                        // Eventually we could add a separate parser to identify the column portion,
152                        // but for now just declare as unknown column usage
153                        ColumnUsage::Unknown
154                    } else {
155                        ColumnUsage::empty().with_column(&unescape_field(field))
156                    }
157                }
158                MarkEncodingField::Object(field_object) => {
159                    // Field is an object that should have a "field" property.
160                    // Eventually we can add support for this form, for now declare as unknown
161                    // column usage
162                    if field_object.signal.is_some() {
163                        // Dynamically determined field
164                        ColumnUsage::Unknown
165                    } else if let Some(field) = &field_object.datum {
166                        // Just like specifying a string
167                        ColumnUsage::empty().with_column(&unescape_field(field))
168                    } else {
169                        ColumnUsage::empty()
170                    }
171                }
172            };
173            DatasetsColumnUsage::empty().with_column_usage(datum_var, column_usage)
174        } else {
175            DatasetsColumnUsage::empty()
176        }
177    }
178}
179
180impl GetDatasetsColumnUsage for MarkEncodingSpec {
181    fn datasets_column_usage(
182        &self,
183        datum_var: &Option<ScopedVariable>,
184        usage_scope: &[u32],
185        task_scope: &TaskScope,
186        vl_selection_fields: &VlSelectionFields,
187    ) -> DatasetsColumnUsage {
188        let mut usage = DatasetsColumnUsage::empty();
189
190        if let Some(datum_var) = datum_var {
191            // Handle direct field references
192            if let Some(field) = &self.field {
193                usage = usage.union(&field.datasets_column_usage(
194                    &Some(datum_var.clone()),
195                    usage_scope,
196                    task_scope,
197                    vl_selection_fields,
198                ))
199            }
200
201            // Handle signal
202            if let Some(signal) = &self.signal {
203                match parse(signal) {
204                    Ok(parsed) => {
205                        usage = usage.union(&parsed.datasets_column_usage(
206                            &Some(datum_var.clone()),
207                            usage_scope,
208                            task_scope,
209                            vl_selection_fields,
210                        ))
211                    }
212                    Err(_) => {
213                        // Failed to parse expression, unknown column usage
214                        usage = usage.with_unknown_usage(datum_var);
215                    }
216                }
217            }
218
219            // Handle test expression
220            if let Some(signal) = &self.test {
221                match parse(signal) {
222                    Ok(parsed) => {
223                        usage = usage.union(&parsed.datasets_column_usage(
224                            &Some(datum_var.clone()),
225                            usage_scope,
226                            task_scope,
227                            vl_selection_fields,
228                        ))
229                    }
230                    Err(_) => {
231                        // Failed to parse expression, unknown column usage
232                        usage = usage.with_unknown_usage(datum_var);
233                    }
234                }
235            }
236
237            // Handle offset
238            if let Some(EncodingOffset::Encoding(offset)) = &self.offset {
239                usage = usage.union(&offset.datasets_column_usage(
240                    &Some(datum_var.clone()),
241                    usage_scope,
242                    task_scope,
243                    vl_selection_fields,
244                ))
245            }
246        }
247        usage
248    }
249}
250
251impl GetDatasetsColumnUsage for MarkEncodeSpec {
252    fn datasets_column_usage(
253        &self,
254        datum_var: &Option<ScopedVariable>,
255        usage_scope: &[u32],
256        task_scope: &TaskScope,
257        vl_selection_fields: &VlSelectionFields,
258    ) -> DatasetsColumnUsage {
259        // Initialize empty usage
260        let mut usage = DatasetsColumnUsage::empty();
261
262        // Iterate over all encoding channels
263        for encoding_spec in self.encodings.values() {
264            for encoding_or_list in encoding_spec.channels.values() {
265                for encoding in encoding_or_list.to_vec() {
266                    usage = usage.union(&encoding.datasets_column_usage(
267                        datum_var,
268                        usage_scope,
269                        task_scope,
270                        vl_selection_fields,
271                    ))
272                }
273            }
274        }
275
276        usage
277    }
278}
279
280impl GetDatasetsColumnUsage for MarkSpec {
281    fn datasets_column_usage(
282        &self,
283        _datum_var: &Option<ScopedVariable>,
284        usage_scope: &[u32],
285        task_scope: &TaskScope,
286        vl_selection_fields: &VlSelectionFields,
287    ) -> DatasetsColumnUsage {
288        // Initialize empty usage
289        let mut usage = DatasetsColumnUsage::empty();
290        if self.type_ == "group" {
291            // group marks with data, signals, scales, marks
292            for sig in &self.signals {
293                usage = usage.union(&sig.datasets_column_usage(
294                    &None,
295                    usage_scope,
296                    task_scope,
297                    vl_selection_fields,
298                ))
299            }
300
301            for scale in &self.scales {
302                usage = usage.union(&scale.datasets_column_usage(
303                    &None,
304                    usage_scope,
305                    task_scope,
306                    vl_selection_fields,
307                ))
308            }
309
310            // Data is handled at chart-level
311
312            // Handle group from->facet->name. In this case, a new dataset is named for the
313            // subsets of the input dataset. For now, this means we don't know what columns
314            // from the input dataset are used. In the future, we could track which columns of
315            // the subset datasets are used.
316            if let Some(facet) = self.from.as_ref().and_then(|from| from.facet.clone()) {
317                let facet_data_var = Variable::new_data(&facet.data);
318                let parent_scope = &usage_scope[0..usage_scope.len() - 1];
319                if let Ok(resolved) = task_scope.resolve_scope(&facet_data_var, parent_scope) {
320                    let scoped_facet_data_var = (resolved.var, resolved.scope);
321                    usage = usage.with_unknown_usage(&scoped_facet_data_var);
322                }
323            }
324
325            // Handle group mark with from->data. For now, this results in unknown usage because
326            // the data columns can be used by outside of the encoding channels
327            // (e.g. in the title object) with the parent variable
328            if let Some(data) = self.from.as_ref().and_then(|from| from.data.clone()) {
329                let from_data_var = Variable::new_data(&data);
330                if let Ok(resolved) = task_scope.resolve_scope(&from_data_var, usage_scope) {
331                    let scoped_from_data_var = (resolved.var, resolved.scope);
332                    usage = usage.with_unknown_usage(&scoped_from_data_var);
333                }
334            }
335
336            let mut child_group_idx = 0;
337            for mark in &self.marks {
338                if mark.type_ == "group" {
339                    let mut child_usage_scope = Vec::from(usage_scope);
340                    child_usage_scope.push(child_group_idx as u32);
341                    usage = usage.union(&mark.datasets_column_usage(
342                        &None,
343                        child_usage_scope.as_slice(),
344                        task_scope,
345                        vl_selection_fields,
346                    ));
347                    child_group_idx += 1;
348                } else {
349                    usage = usage.union(&mark.datasets_column_usage(
350                        &None,
351                        usage_scope,
352                        task_scope,
353                        vl_selection_fields,
354                    ))
355                }
356            }
357        } else {
358            // non-group marks
359            if let Some(from) = &self.from {
360                if let Some(data_name) = &from.data {
361                    let data_var = Variable::new_data(data_name);
362                    if let Ok(resolved) = task_scope.resolve_scope(&data_var, usage_scope) {
363                        let scoped_datum_var: ScopedVariable = (resolved.var, resolved.scope);
364
365                        // Add alias from mark name to dataset
366                        if let Some(name) = &self.name {
367                            let mark_data_var: ScopedVariable =
368                                (Variable::new_data(name), Vec::from(usage_scope));
369                            usage = usage.with_alias(mark_data_var, scoped_datum_var.clone());
370                        }
371
372                        if let Some(encode) = &self.encode {
373                            usage = usage.union(&encode.datasets_column_usage(
374                                &Some(scoped_datum_var.clone()),
375                                usage_scope,
376                                task_scope,
377                                vl_selection_fields,
378                            ))
379                        }
380
381                        // Handle sort expression
382                        if let Some(sort) = &self.sort {
383                            let sort_fields = sort.field.to_vec();
384                            for sort_field in sort_fields {
385                                if let Ok(parsed) = parse(&sort_field) {
386                                    usage = usage.union(&parsed.datasets_column_usage(
387                                        &Some(scoped_datum_var.clone()),
388                                        usage_scope,
389                                        task_scope,
390                                        vl_selection_fields,
391                                    ));
392                                }
393                            }
394                        }
395
396                        // Check for mark-level transforms. We don't look inside of these yet,
397                        // so we don't know which columns are used
398                        if !self.transform.is_empty() {
399                            usage = usage.with_unknown_usage(&scoped_datum_var);
400                        }
401                    }
402                }
403            }
404        }
405
406        // All marks with "from" data source
407
408        usage
409    }
410}
411
412impl GetDatasetsColumnUsage for ScaleFieldReferenceSpec {
413    fn datasets_column_usage(
414        &self,
415        _datum_var: &Option<ScopedVariable>,
416        usage_scope: &[u32],
417        task_scope: &TaskScope,
418        _vl_selection_fields: &VlSelectionFields,
419    ) -> DatasetsColumnUsage {
420        let mut usage = DatasetsColumnUsage::empty();
421        let data_var = Variable::new_data(&self.data);
422        if let Ok(resolved) = task_scope.resolve_scope(&data_var, usage_scope) {
423            let scoped_datum_var: ScopedVariable = (resolved.var, resolved.scope);
424
425            // Handle field
426            usage = usage.with_column_usage(
427                &scoped_datum_var,
428                ColumnUsage::from(unescape_field(&self.field).as_str()),
429            );
430
431            // Handle sort field
432            if let Some(ScaleDataReferenceSort::Parameters(sort_params)) = &self.sort {
433                if let Some(sort_field) = &sort_params.field {
434                    usage = usage.with_column_usage(
435                        &scoped_datum_var,
436                        ColumnUsage::from(unescape_field(sort_field).as_str()),
437                    );
438                }
439            }
440        }
441
442        usage
443    }
444}
445
446impl GetDatasetsColumnUsage for ScaleDomainSpec {
447    fn datasets_column_usage(
448        &self,
449        _datum_var: &Option<ScopedVariable>,
450        usage_scope: &[u32],
451        task_scope: &TaskScope,
452        vl_selection_fields: &VlSelectionFields,
453    ) -> DatasetsColumnUsage {
454        let mut usage = DatasetsColumnUsage::empty();
455        let mut scale_data_refs = Vec::new();
456        let mut signals = Vec::new();
457        let mut sort = None;
458
459        match &self {
460            ScaleDomainSpec::FieldReference(field_ref) => {
461                scale_data_refs.push(field_ref.clone());
462                sort = field_ref.sort.clone();
463            }
464            ScaleDomainSpec::FieldsReference(fields_ref) => {
465                scale_data_refs.extend(fields_ref.to_field_references());
466            }
467            ScaleDomainSpec::FieldsReferences(fields_refs) => {
468                for v in &fields_refs.fields {
469                    match v {
470                        ScaleDataReferenceOrSignalSpec::Reference(scale_data_ref) => {
471                            scale_data_refs.push(scale_data_ref.clone());
472                        }
473                        ScaleDataReferenceOrSignalSpec::Signal(signal) => {
474                            signals.push(signal);
475                        }
476                    }
477                    sort = fields_refs.sort.clone();
478                }
479            }
480            _ => {}
481        };
482        for scale_data_ref in scale_data_refs {
483            // Push sort field down in the case of FieldsReference
484            let mut scale_data_ref = scale_data_ref.clone();
485            scale_data_ref.sort = sort.clone();
486
487            usage = usage.union(&scale_data_ref.datasets_column_usage(
488                &None,
489                usage_scope,
490                task_scope,
491                vl_selection_fields,
492            ))
493        }
494
495        // Handle signals
496        for signal in signals {
497            if let Ok(expr) = parse(&signal.signal) {
498                usage = usage.union(&expr.datasets_column_usage(
499                    &None,
500                    usage_scope,
501                    task_scope,
502                    vl_selection_fields,
503                ))
504            }
505        }
506        usage
507    }
508}
509
510impl GetDatasetsColumnUsage for ScaleRangeSpec {
511    fn datasets_column_usage(
512        &self,
513        _datum_var: &Option<ScopedVariable>,
514        usage_scope: &[u32],
515        task_scope: &TaskScope,
516        vl_selection_fields: &VlSelectionFields,
517    ) -> DatasetsColumnUsage {
518        let mut usage = DatasetsColumnUsage::empty();
519        if let ScaleRangeSpec::Reference(data_ref) = &self {
520            usage = usage.union(&data_ref.datasets_column_usage(
521                &None,
522                usage_scope,
523                task_scope,
524                vl_selection_fields,
525            ))
526        }
527        usage
528    }
529}
530
531impl GetDatasetsColumnUsage for ScaleSpec {
532    fn datasets_column_usage(
533        &self,
534        _datum_var: &Option<ScopedVariable>,
535        usage_scope: &[u32],
536        task_scope: &TaskScope,
537        vl_selection_fields: &VlSelectionFields,
538    ) -> DatasetsColumnUsage {
539        let mut usage = DatasetsColumnUsage::empty();
540        if let Some(domain) = &self.domain {
541            usage = usage.union(&domain.datasets_column_usage(
542                &None,
543                usage_scope,
544                task_scope,
545                vl_selection_fields,
546            ))
547        }
548
549        if let Some(range) = &self.range {
550            usage = usage.union(&range.datasets_column_usage(
551                &None,
552                usage_scope,
553                task_scope,
554                vl_selection_fields,
555            ))
556        }
557        usage
558    }
559}
560
561impl GetDatasetsColumnUsage for SignalSpec {
562    fn datasets_column_usage(
563        &self,
564        _datum_var: &Option<ScopedVariable>,
565        usage_scope: &[u32],
566        task_scope: &TaskScope,
567        vl_selection_fields: &VlSelectionFields,
568    ) -> DatasetsColumnUsage {
569        let mut usage = DatasetsColumnUsage::empty();
570        let mut expr_strs = Vec::new();
571
572        // Collect all expression strings used in the signal definition
573        // init
574        if let Some(init) = &self.init {
575            expr_strs.push(init.clone())
576        }
577
578        // update
579        if let Some(update) = &self.update {
580            expr_strs.push(update.clone())
581        }
582
583        // on
584        for sig_on in &self.on {
585            expr_strs.push(sig_on.update.clone());
586            for sig_event in sig_on.events.to_vec() {
587                if let SignalOnEventSpec::Signal(signal) = sig_event {
588                    expr_strs.push(signal.signal.clone());
589                }
590            }
591        }
592
593        for expr_str in expr_strs {
594            if let Ok(parsed) = parse(&expr_str) {
595                usage = usage.union(&parsed.datasets_column_usage(
596                    &None,
597                    usage_scope,
598                    task_scope,
599                    vl_selection_fields,
600                ))
601            }
602        }
603
604        usage
605    }
606}
607
608impl GetDatasetsColumnUsage for ChartSpec {
609    fn datasets_column_usage(
610        &self,
611        _datum_var: &Option<ScopedVariable>,
612        _usage_scope: &[u32],
613        task_scope: &TaskScope,
614        vl_selection_fields: &VlSelectionFields,
615    ) -> DatasetsColumnUsage {
616        // Initialize empty usage
617        let mut usage = DatasetsColumnUsage::empty();
618
619        // group marks with data, signals, scales, marks
620        for sig in &self.signals {
621            usage =
622                usage.union(&sig.datasets_column_usage(&None, &[], task_scope, vl_selection_fields))
623        }
624
625        for scale in &self.scales {
626            usage = usage.union(&scale.datasets_column_usage(
627                &None,
628                &[],
629                task_scope,
630                vl_selection_fields,
631            ))
632        }
633
634        let mut child_group_idx = 0;
635        for mark in &self.marks {
636            if mark.type_ == "group" {
637                let child_usage_scope = vec![child_group_idx as u32];
638                usage = usage.union(&mark.datasets_column_usage(
639                    &None,
640                    child_usage_scope.as_slice(),
641                    task_scope,
642                    vl_selection_fields,
643                ));
644                child_group_idx += 1;
645            } else {
646                usage = usage.union(&mark.datasets_column_usage(
647                    &None,
648                    &[],
649                    task_scope,
650                    vl_selection_fields,
651                ))
652            }
653        }
654
655        // Handle data
656        // Here we need to be careful to traverse datasets in rever topological order.
657        if let Ok((dep_graph, _)) = build_dependency_graph(self, &Default::default()) {
658            if let Ok(node_indexes) = toposort(&dep_graph, None) {
659                // Iterate over dependencies in reverse topological order
660                for node_idx in node_indexes.iter().rev() {
661                    let (scoped_dep_var, _) = dep_graph
662                        .node_weight(*node_idx)
663                        .expect("Expected node in graph");
664                    if matches!(scoped_dep_var.0.ns(), VariableNamespace::Data) {
665                        if let Ok(data) = self
666                            .get_nested_data(scoped_dep_var.1.as_slice(), &scoped_dep_var.0.name)
667                        {
668                            usage = usage.union(&datasets_column_usage_for_data(
669                                data,
670                                &usage,
671                                scoped_dep_var.1.as_slice(),
672                                task_scope,
673                                vl_selection_fields,
674                            ));
675                        }
676                    }
677                }
678            }
679        }
680
681        usage
682    }
683}
684
685/// We need a separate interface for getting dataset column usage from datasets to account
686/// for the fact that determining the usage of a dataset requires information about the usage
687/// of itself.
688fn datasets_column_usage_for_data(
689    data: &DataSpec,
690    usage: &DatasetsColumnUsage,
691    usage_scope: &[u32],
692    task_scope: &TaskScope,
693    vl_selection_fields: &VlSelectionFields,
694) -> DatasetsColumnUsage {
695    let mut usage = usage.clone();
696    if let Some(source) = &data.source {
697        let source_var = Variable::new_data(source);
698        if let Ok(resolved) = task_scope.resolve_scope(&source_var, usage_scope) {
699            let scoped_source_var = (resolved.var, resolved.scope);
700            let datum_var = Some(scoped_source_var.clone());
701
702            // Maintain collection of the columns that have been produced so far
703            let mut all_produced = ColumnUsage::empty();
704
705            // Track whether all transforms in the pipeline are pass through.
706            let mut all_passthrough = true;
707
708            // iterate through transforms
709            for tx in &data.transform {
710                let tx_cols =
711                    tx.transform_columns(&datum_var, usage_scope, task_scope, vl_selection_fields);
712                match tx_cols {
713                    TransformColumns::PassThrough {
714                        usage: tx_usage,
715                        produced: tx_produced,
716                    } => {
717                        // Remove previously created columns from tx_usage
718                        let tx_usage =
719                            tx_usage.without_column_usage(&scoped_source_var, &all_produced);
720
721                        // Add used columns
722                        usage = usage.union(&tx_usage);
723
724                        // Update produced columns
725                        all_produced = all_produced.union(&tx_produced);
726                    }
727                    TransformColumns::Overwrite {
728                        usage: tx_usage, ..
729                    } => {
730                        // Remove previously created columns from tx_usage
731                        let tx_usage =
732                            tx_usage.without_column_usage(&scoped_source_var, &all_produced);
733
734                        // Add used columns
735                        usage = usage.union(&tx_usage);
736
737                        // Downstream transforms no longer have access to source data columns,
738                        // so we're done
739                        all_passthrough = false;
740                        break;
741                    }
742                    TransformColumns::Unknown => {
743                        // All bets are off
744                        usage = usage.with_unknown_usage(&scoped_source_var);
745                        all_passthrough = false;
746                        break;
747                    }
748                }
749            }
750
751            // If all transforms were passthrough, then we may need to propagate the
752            // column usages of this dataset to it's source
753            if all_passthrough {
754                let self_var = Variable::new_data(&data.name);
755                let self_scoped_var: ScopedVariable = (self_var, Vec::from(usage_scope));
756                if let Some(self_usage) = usage.usages.get(&self_scoped_var) {
757                    let self_usage_not_produced = self_usage.difference(&all_produced);
758                    usage = usage.with_column_usage(&scoped_source_var, self_usage_not_produced);
759                }
760            }
761        }
762    }
763
764    // Check for lookup transform and ensure that all columns are kept from the looked up
765    // dataset
766    for tx in &data.transform {
767        if let TransformSpec::Lookup(lookup) = tx {
768            let lookup_from_var = Variable::new_data(&lookup.from);
769            if let Ok(resolved) = task_scope.resolve_scope(&lookup_from_var, usage_scope) {
770                let lookup_data_var = (resolved.var, resolved.scope);
771                usage = usage.with_unknown_usage(&lookup_data_var);
772            }
773        }
774    }
775    usage
776}
777
778/// Visitor to collect the non-UTC time scales
779struct InsertProjectionVisitor<'a> {
780    pub columns_usage: &'a DatasetsColumnUsage,
781}
782
783impl<'a> InsertProjectionVisitor<'a> {
784    pub fn new(columns_usage: &'a DatasetsColumnUsage) -> Self {
785        Self { columns_usage }
786    }
787}
788
789impl MutChartVisitor for InsertProjectionVisitor<'_> {
790    fn visit_data(&mut self, data: &mut DataSpec, scope: &[u32]) -> Result<()> {
791        let data_var = Variable::new_data(&data.name);
792        let scoped_data_var = (data_var, Vec::from(scope));
793        if let Some(ColumnUsage::Known(columns)) = self.columns_usage.usages.get(&scoped_data_var) {
794            if !columns.is_empty() {
795                // We know exactly which columns are required of this dataset (and it's not none),
796                // so we can append a projection transform to limit the columns that are produced
797                // Note: empty strings here seem to break vega, filter them out
798                let proj_fields: Vec<_> = sorted(columns)
799                    .filter(|&f| !f.is_empty())
800                    .cloned()
801                    .map(|f| escape_field(&f))
802                    .collect();
803
804                let proj_transform = TransformSpec::Project(ProjectTransformSpec {
805                    fields: proj_fields,
806                    extra: Default::default(),
807                });
808                let transforms = &mut data.transform;
809                transforms.push(proj_transform);
810            }
811        }
812        Ok(())
813    }
814}
815
816/// Visitor to collect the columns used in vl_selection_test datasets.
817/// Note: This is a bit of a hack which relies on implementation details of how Vega-Lite
818/// generates Vega. This may break in the future if Vega-Lite changes how selections are
819/// represented.
820#[derive(Clone)]
821pub struct CollectVlSelectionTestFieldsVisitor {
822    pub vl_selection_fields: VlSelectionFields,
823    pub task_scope: TaskScope,
824}
825
826impl CollectVlSelectionTestFieldsVisitor {
827    pub fn new(task_scope: TaskScope) -> Self {
828        Self {
829            vl_selection_fields: Default::default(),
830            task_scope,
831        }
832    }
833}
834
835impl ChartVisitor for CollectVlSelectionTestFieldsVisitor {
836    fn visit_signal(&mut self, signal: &SignalSpec, scope: &[u32]) -> Result<()> {
837        // Look for signal named {name}_tuple_fields with structure like
838        //
839        // {
840        //   "name": "brush_tuple_fields",
841        //   "value": [
842        //     {"field": "Miles_per_Gallon", "channel": "x", "type": "R"},
843        //     {"field": "Horsepower", "channel": "y", "type": "R"}
844        //   ]
845        // }
846        //
847        // or, for point selections
848        //
849        //   "value": [{"field": "year_date", "type": "E"}]
850        //
851        // With a corresponding dataset named "{name}_store". If we fine this pair, then use the
852        // "field" entries in {name}_tuple_fields as column usage fields.
853        if signal.name.ends_with("_tuple_fields") {
854            // Build name of potential store
855            let dataset_name = signal.name.trim_end_matches("_tuple_fields").to_string();
856            let mut store_name = dataset_name;
857            store_name.push_str("_store");
858            let store_var = Variable::new_data(&store_name);
859
860            // Try to re
861            if let Ok(resolved) = self.task_scope.resolve_scope(&store_var, scope) {
862                let scoped_brush_var: ScopedVariable = (resolved.var, resolved.scope);
863
864                if let Some(value) = &signal.value.as_option() {
865                    if let Ok(table) = VegaFusionTable::from_json(value) {
866                        // Check that we have "field", "channel", and "type" columns
867                        let schema = &table.schema;
868                        if schema.field_with_name("type").is_ok() {
869                            if let Ok(field_index) = schema.index_of("field") {
870                                if let Ok(batch) = table.to_record_batch() {
871                                    let field_array = batch.column(field_index);
872                                    if let Some(field_array) =
873                                        field_array.as_any().downcast_ref::<StringArray>()
874                                    {
875                                        for col in field_array.iter().flatten() {
876                                            let usage = self
877                                                .vl_selection_fields
878                                                .entry(scoped_brush_var.clone())
879                                                .or_insert_with(ColumnUsage::empty);
880
881                                            *usage = usage.with_column(col);
882                                        }
883                                    }
884                                }
885                            }
886                        }
887                    }
888                }
889            }
890        }
891        Ok(())
892    }
893}
894
#[cfg(test)]
mod tests {
    // Unit tests for `datasets_column_usage` on marks, encodings, scales and signals, run
    // against a root-level task scope that contains the `brush2_store` and `dataA` datasets.
    use crate::expression::column_usage::{
        ColumnUsage, DatasetsColumnUsage, GetDatasetsColumnUsage, VlSelectionFields,
    };
    use crate::proto::gen::tasks::Variable;

    use crate::spec::mark::{MarkEncodeSpec, MarkSpec};
    use crate::spec::scale::ScaleSpec;
    use crate::spec::signal::SignalSpec;
    use crate::task_graph::graph::ScopedVariable;
    use crate::task_graph::scope::TaskScope;
    use serde_json::json;

    /// Selection fields fixture: the root-scope `brush2_store` selection is defined over
    /// columns AA, BB and CC.
    fn selection_fields() -> VlSelectionFields {
        vec![(
            (Variable::new_data("brush2_store"), Vec::new()),
            ColumnUsage::from(vec!["AA", "BB", "CC"].as_slice()),
        )]
        .into_iter()
        .collect()
    }

    /// Task scope fixture with the `brush2_store` and `dataA` datasets registered at the root
    /// scope.
    fn task_scope() -> TaskScope {
        let mut task_scope = TaskScope::new();
        task_scope
            .add_variable(&Variable::new_data("brush2_store"), &[])
            .unwrap();
        task_scope
            .add_variable(&Variable::new_data("dataA"), &[])
            .unwrap();
        task_scope
    }

    #[test]
    fn test_mark_encoding_column_known_usage() {
        // Define selection dataset fields
        let selection_fields = selection_fields();

        let encodings: MarkEncodeSpec = serde_json::from_value(json!({
            "update": {
                "x": {"field": "one", "scale": "scale_a"},
                "y": [
                    {"field": "three", "scale": "scale_a", "test": "datum.two > 7"},
                    {"value": 23},
                ],
                "opacity": [
                    {"signal": "datum['four'] * 2", "test": "vlSelectionTest('brush2_store', datum)"},
                    {"value": 0.3},
                ]
            }
        })).unwrap();

        // Build dataset_column_usage args
        let datum_var: ScopedVariable = (Variable::new_data("dataA"), Vec::new());
        let usage_scope = Vec::new();
        let task_scope = task_scope();

        let usage = encodings.datasets_column_usage(
            &Some(datum_var.clone()),
            &usage_scope,
            &task_scope,
            &selection_fields,
        );

        // With selection fields available, the vlSelectionTest adds the store's fields
        // (AA, BB, CC) to the datum's known usage, while brush2_store itself is unknown.
        let expected = DatasetsColumnUsage::empty()
            .with_column_usage(
                &datum_var,
                ColumnUsage::from(vec!["AA", "BB", "CC", "one", "two", "three", "four"].as_slice()),
            )
            .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new()));

        assert_eq!(usage, expected);

        // Without selection fields, the datum's column usage should be unknown
        let usage = encodings.datasets_column_usage(
            &Some(datum_var.clone()),
            &usage_scope,
            &task_scope,
            &Default::default(),
        );
        let expected = DatasetsColumnUsage::empty()
            .with_unknown_usage(&datum_var)
            .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new()));

        assert_eq!(usage, expected);
    }

    #[test]
    fn test_mark_with_known_usage() {
        // Define selection dataset fields
        let selection_fields = selection_fields();

        // The mark's `from.data` makes dataA the datum dataset, so no datum var is passed below
        let mark: MarkSpec = serde_json::from_value(json!({
            "type": "rect",
            "from": {"data": "dataA"},
            "encode": {
                "init": {
                    "x": {"field": "one", "scale": "scale_a"},
                    "y": [
                        {"field": "three", "scale": "scale_a", "test": "datum.two > 7"},
                        {"value": 23},
                    ],
                },
                "update": {
                    "opacity": [
                        {"signal": "datum['four'] * 2", "test": "vlSelectionTest('brush2_store', datum)"},
                        {"value": 0.3},
                    ]
                }
            }
        })).unwrap();

        // Build dataset_column_usage args
        let usage_scope = Vec::new();
        let task_scope = task_scope();

        let usage = mark.datasets_column_usage(&None, &usage_scope, &task_scope, &selection_fields);

        let expected = DatasetsColumnUsage::empty()
            .with_column_usage(
                &(Variable::new_data("dataA"), Vec::new()),
                ColumnUsage::from(vec!["AA", "BB", "CC", "one", "two", "three", "four"].as_slice()),
            )
            .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new()));

        assert_eq!(usage, expected);
    }

    #[test]
    fn test_scale_usage() {
        // NOTE(review): Vega names the scale type with the "type" key; "scale": "quantize"
        // below is probably meant to be "type": "quantize". Confirm before relying on the
        // scale type in this test.
        let scale: ScaleSpec = serde_json::from_value(json!({
            "name": "color",
            "scale": "quantize",
            "domain": {"data": "dataA", "field": "colZ"},
            "range": {"scheme": "blues", "count": 7}
        }))
        .unwrap();

        // Build dataset_column_usage args
        let usage_scope = Vec::new();
        let task_scope = task_scope();

        let usage =
            scale.datasets_column_usage(&None, &usage_scope, &task_scope, &Default::default());

        // A data-driven domain uses exactly the referenced field of the referenced dataset
        let expected = DatasetsColumnUsage::empty().with_column_usage(
            &(Variable::new_data("dataA"), Vec::new()),
            ColumnUsage::from(vec!["colZ"].as_slice()),
        );

        assert_eq!(usage, expected);
    }

    #[test]
    fn test_signal_usage() {
        let signal: SignalSpec = serde_json::from_value(json!({
            "name": "indexDate",
            "description": "A date value that updates in response to mousemove.",
            "update": "length(data('brush2_store'))",
            "on": [{"events": "mousemove", "update": "length(data('dataA'))"}]
        }))
        .unwrap();

        // Build dataset_column_usage args
        let usage_scope = Vec::new();
        let task_scope = task_scope();

        let usage =
            signal.datasets_column_usage(&None, &usage_scope, &task_scope, &Default::default());

        // data('...') references in both `update` and `on` make usage of both datasets unknown
        let expected = DatasetsColumnUsage::empty()
            .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new()))
            .with_unknown_usage(&(Variable::new_data("dataA"), Vec::new()));

        assert_eq!(usage, expected);
    }
}