1use crate::expression::column_usage::{
2 ColumnUsage, DatasetsColumnUsage, GetDatasetsColumnUsage, VlSelectionFields,
3};
4use crate::expression::parser::parse;
5use crate::planning::dependency_graph::build_dependency_graph;
6use crate::proto::gen::tasks::{Variable, VariableNamespace};
7use crate::spec::chart::{ChartSpec, ChartVisitor, MutChartVisitor};
8use crate::spec::data::DataSpec;
9use crate::spec::mark::{
10 EncodingOffset, MarkEncodeSpec, MarkEncodingField, MarkEncodingSpec, MarkSpec,
11};
12use crate::spec::scale::{
13 ScaleDataReferenceOrSignalSpec, ScaleDataReferenceSort, ScaleDomainSpec,
14 ScaleFieldReferenceSpec, ScaleRangeSpec, ScaleSpec,
15};
16use crate::spec::signal::{SignalOnEventSpec, SignalSpec};
17use crate::spec::transform::project::ProjectTransformSpec;
18use crate::spec::transform::{TransformColumns, TransformSpec};
19use crate::task_graph::graph::ScopedVariable;
20use crate::task_graph::scope::TaskScope;
21use itertools::{sorted, Itertools};
22use petgraph::algo::toposort;
23use std::collections::HashMap;
24use vegafusion_common::arrow::array::StringArray;
25use vegafusion_common::data::table::VegaFusionTable;
26use vegafusion_common::error::Result;
27use vegafusion_common::escape::{escape_field, unescape_field};
28
29pub fn projection_pushdown(chart_spec: &mut ChartSpec) -> Result<()> {
34 let datum_var = None;
35 let usage_scope = Vec::new();
36 let task_scope = chart_spec.to_task_scope()?;
37
38 let mut vl_selection_visitor = CollectVlSelectionTestFieldsVisitor::new(task_scope.clone());
40 chart_spec.walk(&mut vl_selection_visitor)?;
41 let vl_selection_fields = vl_selection_visitor.vl_selection_fields;
42
43 let datasets_column_usage = chart_spec.datasets_column_usage(
44 &datum_var,
45 usage_scope.as_slice(),
46 &task_scope,
47 &vl_selection_fields,
48 );
49
50 let mut visitor = InsertProjectionVisitor::new(&datasets_column_usage);
51 chart_spec.walk_mut(&mut visitor)?;
52 Ok(())
53}
54
55pub fn get_column_usage(chart_spec: &ChartSpec) -> Result<HashMap<String, Option<Vec<String>>>> {
60 let mut chart_spec = chart_spec.clone();
61
62 let mut new_data_specs: Vec<DataSpec> = Vec::new();
65 let suffix = "__column_usage_root";
66 for data_spec in &mut chart_spec.data {
67 if data_spec.source.is_none() && !data_spec.transform.is_empty() {
68 let name = data_spec.name.clone();
70 let mut transforms = Vec::new();
71 transforms.append(&mut data_spec.transform);
72 let root_name = format!("{name}{suffix}");
73 data_spec.name = root_name.clone();
74
75 let new_spec = DataSpec {
76 name: name.clone(),
77 source: Some(root_name),
78 transform: transforms,
79 ..Default::default()
80 };
81
82 new_data_specs.push(new_spec);
83 }
84 }
85
86 chart_spec.data.append(&mut new_data_specs);
87
88 let datum_var = None;
89 let usage_scope = Vec::new();
90 let task_scope = chart_spec.to_task_scope()?;
91
92 let mut vl_selection_visitor = CollectVlSelectionTestFieldsVisitor::new(task_scope.clone());
94 chart_spec.walk(&mut vl_selection_visitor)?;
95 let vl_selection_fields = vl_selection_visitor.vl_selection_fields;
96
97 let datasets_column_usage = chart_spec.datasets_column_usage(
98 &datum_var,
99 usage_scope.as_slice(),
100 &task_scope,
101 &vl_selection_fields,
102 );
103
104 let mut root_dataset_columns: HashMap<String, Option<Vec<String>>> = HashMap::new();
105 for data_spec in &chart_spec.data {
106 if data_spec.source.is_none() {
107 let var = Variable::new(VariableNamespace::Data, &data_spec.name);
108 let scoped_var = (var, Vec::new());
109 let column_usage = datasets_column_usage
110 .usages
111 .get(&scoped_var)
112 .unwrap_or(&ColumnUsage::Unknown);
113
114 let original_name = data_spec
116 .name
117 .strip_suffix(suffix)
118 .unwrap_or(&data_spec.name)
119 .to_string();
120
121 match column_usage {
122 ColumnUsage::Unknown => {
123 root_dataset_columns.insert(original_name.clone(), None);
124 }
125 ColumnUsage::Known(used) => {
126 root_dataset_columns.insert(
127 original_name.clone(),
128 Some(used.iter().cloned().sorted().collect()),
129 );
130 }
131 }
132 }
133 }
134 Ok(root_dataset_columns)
135}
136
137impl GetDatasetsColumnUsage for MarkEncodingField {
138 fn datasets_column_usage(
139 &self,
140 datum_var: &Option<ScopedVariable>,
141 _usage_scope: &[u32],
142 _task_scope: &TaskScope,
143 _vl_selection_fields: &VlSelectionFields,
144 ) -> DatasetsColumnUsage {
145 if let Some(datum_var) = datum_var {
146 let column_usage = match self {
147 MarkEncodingField::Field(field) => {
148 if field.contains('.') || field.contains('[') {
149 ColumnUsage::Unknown
154 } else {
155 ColumnUsage::empty().with_column(&unescape_field(field))
156 }
157 }
158 MarkEncodingField::Object(field_object) => {
159 if field_object.signal.is_some() {
163 ColumnUsage::Unknown
165 } else if let Some(field) = &field_object.datum {
166 ColumnUsage::empty().with_column(&unescape_field(field))
168 } else {
169 ColumnUsage::empty()
170 }
171 }
172 };
173 DatasetsColumnUsage::empty().with_column_usage(datum_var, column_usage)
174 } else {
175 DatasetsColumnUsage::empty()
176 }
177 }
178}
179
180impl GetDatasetsColumnUsage for MarkEncodingSpec {
181 fn datasets_column_usage(
182 &self,
183 datum_var: &Option<ScopedVariable>,
184 usage_scope: &[u32],
185 task_scope: &TaskScope,
186 vl_selection_fields: &VlSelectionFields,
187 ) -> DatasetsColumnUsage {
188 let mut usage = DatasetsColumnUsage::empty();
189
190 if let Some(datum_var) = datum_var {
191 if let Some(field) = &self.field {
193 usage = usage.union(&field.datasets_column_usage(
194 &Some(datum_var.clone()),
195 usage_scope,
196 task_scope,
197 vl_selection_fields,
198 ))
199 }
200
201 if let Some(signal) = &self.signal {
203 match parse(signal) {
204 Ok(parsed) => {
205 usage = usage.union(&parsed.datasets_column_usage(
206 &Some(datum_var.clone()),
207 usage_scope,
208 task_scope,
209 vl_selection_fields,
210 ))
211 }
212 Err(_) => {
213 usage = usage.with_unknown_usage(datum_var);
215 }
216 }
217 }
218
219 if let Some(signal) = &self.test {
221 match parse(signal) {
222 Ok(parsed) => {
223 usage = usage.union(&parsed.datasets_column_usage(
224 &Some(datum_var.clone()),
225 usage_scope,
226 task_scope,
227 vl_selection_fields,
228 ))
229 }
230 Err(_) => {
231 usage = usage.with_unknown_usage(datum_var);
233 }
234 }
235 }
236
237 if let Some(EncodingOffset::Encoding(offset)) = &self.offset {
239 usage = usage.union(&offset.datasets_column_usage(
240 &Some(datum_var.clone()),
241 usage_scope,
242 task_scope,
243 vl_selection_fields,
244 ))
245 }
246 }
247 usage
248 }
249}
250
251impl GetDatasetsColumnUsage for MarkEncodeSpec {
252 fn datasets_column_usage(
253 &self,
254 datum_var: &Option<ScopedVariable>,
255 usage_scope: &[u32],
256 task_scope: &TaskScope,
257 vl_selection_fields: &VlSelectionFields,
258 ) -> DatasetsColumnUsage {
259 let mut usage = DatasetsColumnUsage::empty();
261
262 for encoding_spec in self.encodings.values() {
264 for encoding_or_list in encoding_spec.channels.values() {
265 for encoding in encoding_or_list.to_vec() {
266 usage = usage.union(&encoding.datasets_column_usage(
267 datum_var,
268 usage_scope,
269 task_scope,
270 vl_selection_fields,
271 ))
272 }
273 }
274 }
275
276 usage
277 }
278}
279
280impl GetDatasetsColumnUsage for MarkSpec {
281 fn datasets_column_usage(
282 &self,
283 _datum_var: &Option<ScopedVariable>,
284 usage_scope: &[u32],
285 task_scope: &TaskScope,
286 vl_selection_fields: &VlSelectionFields,
287 ) -> DatasetsColumnUsage {
288 let mut usage = DatasetsColumnUsage::empty();
290 if self.type_ == "group" {
291 for sig in &self.signals {
293 usage = usage.union(&sig.datasets_column_usage(
294 &None,
295 usage_scope,
296 task_scope,
297 vl_selection_fields,
298 ))
299 }
300
301 for scale in &self.scales {
302 usage = usage.union(&scale.datasets_column_usage(
303 &None,
304 usage_scope,
305 task_scope,
306 vl_selection_fields,
307 ))
308 }
309
310 if let Some(facet) = self.from.as_ref().and_then(|from| from.facet.clone()) {
317 let facet_data_var = Variable::new_data(&facet.data);
318 let parent_scope = &usage_scope[0..usage_scope.len() - 1];
319 if let Ok(resolved) = task_scope.resolve_scope(&facet_data_var, parent_scope) {
320 let scoped_facet_data_var = (resolved.var, resolved.scope);
321 usage = usage.with_unknown_usage(&scoped_facet_data_var);
322 }
323 }
324
325 if let Some(data) = self.from.as_ref().and_then(|from| from.data.clone()) {
329 let from_data_var = Variable::new_data(&data);
330 if let Ok(resolved) = task_scope.resolve_scope(&from_data_var, usage_scope) {
331 let scoped_from_data_var = (resolved.var, resolved.scope);
332 usage = usage.with_unknown_usage(&scoped_from_data_var);
333 }
334 }
335
336 let mut child_group_idx = 0;
337 for mark in &self.marks {
338 if mark.type_ == "group" {
339 let mut child_usage_scope = Vec::from(usage_scope);
340 child_usage_scope.push(child_group_idx as u32);
341 usage = usage.union(&mark.datasets_column_usage(
342 &None,
343 child_usage_scope.as_slice(),
344 task_scope,
345 vl_selection_fields,
346 ));
347 child_group_idx += 1;
348 } else {
349 usage = usage.union(&mark.datasets_column_usage(
350 &None,
351 usage_scope,
352 task_scope,
353 vl_selection_fields,
354 ))
355 }
356 }
357 } else {
358 if let Some(from) = &self.from {
360 if let Some(data_name) = &from.data {
361 let data_var = Variable::new_data(data_name);
362 if let Ok(resolved) = task_scope.resolve_scope(&data_var, usage_scope) {
363 let scoped_datum_var: ScopedVariable = (resolved.var, resolved.scope);
364
365 if let Some(name) = &self.name {
367 let mark_data_var: ScopedVariable =
368 (Variable::new_data(name), Vec::from(usage_scope));
369 usage = usage.with_alias(mark_data_var, scoped_datum_var.clone());
370 }
371
372 if let Some(encode) = &self.encode {
373 usage = usage.union(&encode.datasets_column_usage(
374 &Some(scoped_datum_var.clone()),
375 usage_scope,
376 task_scope,
377 vl_selection_fields,
378 ))
379 }
380
381 if let Some(sort) = &self.sort {
383 let sort_fields = sort.field.to_vec();
384 for sort_field in sort_fields {
385 if let Ok(parsed) = parse(&sort_field) {
386 usage = usage.union(&parsed.datasets_column_usage(
387 &Some(scoped_datum_var.clone()),
388 usage_scope,
389 task_scope,
390 vl_selection_fields,
391 ));
392 }
393 }
394 }
395
396 if !self.transform.is_empty() {
399 usage = usage.with_unknown_usage(&scoped_datum_var);
400 }
401 }
402 }
403 }
404 }
405
406 usage
409 }
410}
411
412impl GetDatasetsColumnUsage for ScaleFieldReferenceSpec {
413 fn datasets_column_usage(
414 &self,
415 _datum_var: &Option<ScopedVariable>,
416 usage_scope: &[u32],
417 task_scope: &TaskScope,
418 _vl_selection_fields: &VlSelectionFields,
419 ) -> DatasetsColumnUsage {
420 let mut usage = DatasetsColumnUsage::empty();
421 let data_var = Variable::new_data(&self.data);
422 if let Ok(resolved) = task_scope.resolve_scope(&data_var, usage_scope) {
423 let scoped_datum_var: ScopedVariable = (resolved.var, resolved.scope);
424
425 usage = usage.with_column_usage(
427 &scoped_datum_var,
428 ColumnUsage::from(unescape_field(&self.field).as_str()),
429 );
430
431 if let Some(ScaleDataReferenceSort::Parameters(sort_params)) = &self.sort {
433 if let Some(sort_field) = &sort_params.field {
434 usage = usage.with_column_usage(
435 &scoped_datum_var,
436 ColumnUsage::from(unescape_field(sort_field).as_str()),
437 );
438 }
439 }
440 }
441
442 usage
443 }
444}
445
446impl GetDatasetsColumnUsage for ScaleDomainSpec {
447 fn datasets_column_usage(
448 &self,
449 _datum_var: &Option<ScopedVariable>,
450 usage_scope: &[u32],
451 task_scope: &TaskScope,
452 vl_selection_fields: &VlSelectionFields,
453 ) -> DatasetsColumnUsage {
454 let mut usage = DatasetsColumnUsage::empty();
455 let mut scale_data_refs = Vec::new();
456 let mut signals = Vec::new();
457 let mut sort = None;
458
459 match &self {
460 ScaleDomainSpec::FieldReference(field_ref) => {
461 scale_data_refs.push(field_ref.clone());
462 sort = field_ref.sort.clone();
463 }
464 ScaleDomainSpec::FieldsReference(fields_ref) => {
465 scale_data_refs.extend(fields_ref.to_field_references());
466 }
467 ScaleDomainSpec::FieldsReferences(fields_refs) => {
468 for v in &fields_refs.fields {
469 match v {
470 ScaleDataReferenceOrSignalSpec::Reference(scale_data_ref) => {
471 scale_data_refs.push(scale_data_ref.clone());
472 }
473 ScaleDataReferenceOrSignalSpec::Signal(signal) => {
474 signals.push(signal);
475 }
476 }
477 sort = fields_refs.sort.clone();
478 }
479 }
480 _ => {}
481 };
482 for scale_data_ref in scale_data_refs {
483 let mut scale_data_ref = scale_data_ref.clone();
485 scale_data_ref.sort = sort.clone();
486
487 usage = usage.union(&scale_data_ref.datasets_column_usage(
488 &None,
489 usage_scope,
490 task_scope,
491 vl_selection_fields,
492 ))
493 }
494
495 for signal in signals {
497 if let Ok(expr) = parse(&signal.signal) {
498 usage = usage.union(&expr.datasets_column_usage(
499 &None,
500 usage_scope,
501 task_scope,
502 vl_selection_fields,
503 ))
504 }
505 }
506 usage
507 }
508}
509
510impl GetDatasetsColumnUsage for ScaleRangeSpec {
511 fn datasets_column_usage(
512 &self,
513 _datum_var: &Option<ScopedVariable>,
514 usage_scope: &[u32],
515 task_scope: &TaskScope,
516 vl_selection_fields: &VlSelectionFields,
517 ) -> DatasetsColumnUsage {
518 let mut usage = DatasetsColumnUsage::empty();
519 if let ScaleRangeSpec::Reference(data_ref) = &self {
520 usage = usage.union(&data_ref.datasets_column_usage(
521 &None,
522 usage_scope,
523 task_scope,
524 vl_selection_fields,
525 ))
526 }
527 usage
528 }
529}
530
531impl GetDatasetsColumnUsage for ScaleSpec {
532 fn datasets_column_usage(
533 &self,
534 _datum_var: &Option<ScopedVariable>,
535 usage_scope: &[u32],
536 task_scope: &TaskScope,
537 vl_selection_fields: &VlSelectionFields,
538 ) -> DatasetsColumnUsage {
539 let mut usage = DatasetsColumnUsage::empty();
540 if let Some(domain) = &self.domain {
541 usage = usage.union(&domain.datasets_column_usage(
542 &None,
543 usage_scope,
544 task_scope,
545 vl_selection_fields,
546 ))
547 }
548
549 if let Some(range) = &self.range {
550 usage = usage.union(&range.datasets_column_usage(
551 &None,
552 usage_scope,
553 task_scope,
554 vl_selection_fields,
555 ))
556 }
557 usage
558 }
559}
560
561impl GetDatasetsColumnUsage for SignalSpec {
562 fn datasets_column_usage(
563 &self,
564 _datum_var: &Option<ScopedVariable>,
565 usage_scope: &[u32],
566 task_scope: &TaskScope,
567 vl_selection_fields: &VlSelectionFields,
568 ) -> DatasetsColumnUsage {
569 let mut usage = DatasetsColumnUsage::empty();
570 let mut expr_strs = Vec::new();
571
572 if let Some(init) = &self.init {
575 expr_strs.push(init.clone())
576 }
577
578 if let Some(update) = &self.update {
580 expr_strs.push(update.clone())
581 }
582
583 for sig_on in &self.on {
585 expr_strs.push(sig_on.update.clone());
586 for sig_event in sig_on.events.to_vec() {
587 if let SignalOnEventSpec::Signal(signal) = sig_event {
588 expr_strs.push(signal.signal.clone());
589 }
590 }
591 }
592
593 for expr_str in expr_strs {
594 if let Ok(parsed) = parse(&expr_str) {
595 usage = usage.union(&parsed.datasets_column_usage(
596 &None,
597 usage_scope,
598 task_scope,
599 vl_selection_fields,
600 ))
601 }
602 }
603
604 usage
605 }
606}
607
608impl GetDatasetsColumnUsage for ChartSpec {
609 fn datasets_column_usage(
610 &self,
611 _datum_var: &Option<ScopedVariable>,
612 _usage_scope: &[u32],
613 task_scope: &TaskScope,
614 vl_selection_fields: &VlSelectionFields,
615 ) -> DatasetsColumnUsage {
616 let mut usage = DatasetsColumnUsage::empty();
618
619 for sig in &self.signals {
621 usage =
622 usage.union(&sig.datasets_column_usage(&None, &[], task_scope, vl_selection_fields))
623 }
624
625 for scale in &self.scales {
626 usage = usage.union(&scale.datasets_column_usage(
627 &None,
628 &[],
629 task_scope,
630 vl_selection_fields,
631 ))
632 }
633
634 let mut child_group_idx = 0;
635 for mark in &self.marks {
636 if mark.type_ == "group" {
637 let child_usage_scope = vec![child_group_idx as u32];
638 usage = usage.union(&mark.datasets_column_usage(
639 &None,
640 child_usage_scope.as_slice(),
641 task_scope,
642 vl_selection_fields,
643 ));
644 child_group_idx += 1;
645 } else {
646 usage = usage.union(&mark.datasets_column_usage(
647 &None,
648 &[],
649 task_scope,
650 vl_selection_fields,
651 ))
652 }
653 }
654
655 if let Ok((dep_graph, _)) = build_dependency_graph(self, &Default::default()) {
658 if let Ok(node_indexes) = toposort(&dep_graph, None) {
659 for node_idx in node_indexes.iter().rev() {
661 let (scoped_dep_var, _) = dep_graph
662 .node_weight(*node_idx)
663 .expect("Expected node in graph");
664 if matches!(scoped_dep_var.0.ns(), VariableNamespace::Data) {
665 if let Ok(data) = self
666 .get_nested_data(scoped_dep_var.1.as_slice(), &scoped_dep_var.0.name)
667 {
668 usage = usage.union(&datasets_column_usage_for_data(
669 data,
670 &usage,
671 scoped_dep_var.1.as_slice(),
672 task_scope,
673 vl_selection_fields,
674 ));
675 }
676 }
677 }
678 }
679 }
680
681 usage
682 }
683}
684
685fn datasets_column_usage_for_data(
689 data: &DataSpec,
690 usage: &DatasetsColumnUsage,
691 usage_scope: &[u32],
692 task_scope: &TaskScope,
693 vl_selection_fields: &VlSelectionFields,
694) -> DatasetsColumnUsage {
695 let mut usage = usage.clone();
696 if let Some(source) = &data.source {
697 let source_var = Variable::new_data(source);
698 if let Ok(resolved) = task_scope.resolve_scope(&source_var, usage_scope) {
699 let scoped_source_var = (resolved.var, resolved.scope);
700 let datum_var = Some(scoped_source_var.clone());
701
702 let mut all_produced = ColumnUsage::empty();
704
705 let mut all_passthrough = true;
707
708 for tx in &data.transform {
710 let tx_cols =
711 tx.transform_columns(&datum_var, usage_scope, task_scope, vl_selection_fields);
712 match tx_cols {
713 TransformColumns::PassThrough {
714 usage: tx_usage,
715 produced: tx_produced,
716 } => {
717 let tx_usage =
719 tx_usage.without_column_usage(&scoped_source_var, &all_produced);
720
721 usage = usage.union(&tx_usage);
723
724 all_produced = all_produced.union(&tx_produced);
726 }
727 TransformColumns::Overwrite {
728 usage: tx_usage, ..
729 } => {
730 let tx_usage =
732 tx_usage.without_column_usage(&scoped_source_var, &all_produced);
733
734 usage = usage.union(&tx_usage);
736
737 all_passthrough = false;
740 break;
741 }
742 TransformColumns::Unknown => {
743 usage = usage.with_unknown_usage(&scoped_source_var);
745 all_passthrough = false;
746 break;
747 }
748 }
749 }
750
751 if all_passthrough {
754 let self_var = Variable::new_data(&data.name);
755 let self_scoped_var: ScopedVariable = (self_var, Vec::from(usage_scope));
756 if let Some(self_usage) = usage.usages.get(&self_scoped_var) {
757 let self_usage_not_produced = self_usage.difference(&all_produced);
758 usage = usage.with_column_usage(&scoped_source_var, self_usage_not_produced);
759 }
760 }
761 }
762 }
763
764 for tx in &data.transform {
767 if let TransformSpec::Lookup(lookup) = tx {
768 let lookup_from_var = Variable::new_data(&lookup.from);
769 if let Ok(resolved) = task_scope.resolve_scope(&lookup_from_var, usage_scope) {
770 let lookup_data_var = (resolved.var, resolved.scope);
771 usage = usage.with_unknown_usage(&lookup_data_var);
772 }
773 }
774 }
775 usage
776}
777
778struct InsertProjectionVisitor<'a> {
780 pub columns_usage: &'a DatasetsColumnUsage,
781}
782
783impl<'a> InsertProjectionVisitor<'a> {
784 pub fn new(columns_usage: &'a DatasetsColumnUsage) -> Self {
785 Self { columns_usage }
786 }
787}
788
789impl MutChartVisitor for InsertProjectionVisitor<'_> {
790 fn visit_data(&mut self, data: &mut DataSpec, scope: &[u32]) -> Result<()> {
791 let data_var = Variable::new_data(&data.name);
792 let scoped_data_var = (data_var, Vec::from(scope));
793 if let Some(ColumnUsage::Known(columns)) = self.columns_usage.usages.get(&scoped_data_var) {
794 if !columns.is_empty() {
795 let proj_fields: Vec<_> = sorted(columns)
799 .filter(|&f| !f.is_empty())
800 .cloned()
801 .map(|f| escape_field(&f))
802 .collect();
803
804 let proj_transform = TransformSpec::Project(ProjectTransformSpec {
805 fields: proj_fields,
806 extra: Default::default(),
807 });
808 let transforms = &mut data.transform;
809 transforms.push(proj_transform);
810 }
811 }
812 Ok(())
813 }
814}
815
816#[derive(Clone)]
821pub struct CollectVlSelectionTestFieldsVisitor {
822 pub vl_selection_fields: VlSelectionFields,
823 pub task_scope: TaskScope,
824}
825
826impl CollectVlSelectionTestFieldsVisitor {
827 pub fn new(task_scope: TaskScope) -> Self {
828 Self {
829 vl_selection_fields: Default::default(),
830 task_scope,
831 }
832 }
833}
834
835impl ChartVisitor for CollectVlSelectionTestFieldsVisitor {
836 fn visit_signal(&mut self, signal: &SignalSpec, scope: &[u32]) -> Result<()> {
837 if signal.name.ends_with("_tuple_fields") {
854 let dataset_name = signal.name.trim_end_matches("_tuple_fields").to_string();
856 let mut store_name = dataset_name;
857 store_name.push_str("_store");
858 let store_var = Variable::new_data(&store_name);
859
860 if let Ok(resolved) = self.task_scope.resolve_scope(&store_var, scope) {
862 let scoped_brush_var: ScopedVariable = (resolved.var, resolved.scope);
863
864 if let Some(value) = &signal.value.as_option() {
865 if let Ok(table) = VegaFusionTable::from_json(value) {
866 let schema = &table.schema;
868 if schema.field_with_name("type").is_ok() {
869 if let Ok(field_index) = schema.index_of("field") {
870 if let Ok(batch) = table.to_record_batch() {
871 let field_array = batch.column(field_index);
872 if let Some(field_array) =
873 field_array.as_any().downcast_ref::<StringArray>()
874 {
875 for col in field_array.iter().flatten() {
876 let usage = self
877 .vl_selection_fields
878 .entry(scoped_brush_var.clone())
879 .or_insert_with(ColumnUsage::empty);
880
881 *usage = usage.with_column(col);
882 }
883 }
884 }
885 }
886 }
887 }
888 }
889 }
890 }
891 Ok(())
892 }
893}
894
895#[cfg(test)]
896mod tests {
897 use crate::expression::column_usage::{
898 ColumnUsage, DatasetsColumnUsage, GetDatasetsColumnUsage, VlSelectionFields,
899 };
900 use crate::proto::gen::tasks::Variable;
901
902 use crate::spec::mark::{MarkEncodeSpec, MarkSpec};
903 use crate::spec::scale::ScaleSpec;
904 use crate::spec::signal::SignalSpec;
905 use crate::task_graph::graph::ScopedVariable;
906 use crate::task_graph::scope::TaskScope;
907 use serde_json::json;
908
909 fn selection_fields() -> VlSelectionFields {
910 vec![(
911 (Variable::new_data("brush2_store"), Vec::new()),
912 ColumnUsage::from(vec!["AA", "BB", "CC"].as_slice()),
913 )]
914 .into_iter()
915 .collect()
916 }
917
918 fn task_scope() -> TaskScope {
919 let mut task_scope = TaskScope::new();
920 task_scope
921 .add_variable(&Variable::new_data("brush2_store"), &[])
922 .unwrap();
923 task_scope
924 .add_variable(&Variable::new_data("dataA"), &[])
925 .unwrap();
926 task_scope
927 }
928
929 #[test]
930 fn test_mark_encoding_column_known_usage() {
931 let selection_fields = selection_fields();
933
934 let encodings: MarkEncodeSpec = serde_json::from_value(json!({
935 "update": {
936 "x": {"field": "one", "scale": "scale_a"},
937 "y": [
938 {"field": "three", "scale": "scale_a", "test": "datum.two > 7"},
939 {"value": 23},
940 ],
941 "opacity": [
942 {"signal": "datum['four'] * 2", "test": "vlSelectionTest('brush2_store', datum)"},
943 {"value": 0.3},
944 ]
945 }
946 })).unwrap();
947
948 let datum_var: ScopedVariable = (Variable::new_data("dataA"), Vec::new());
950 let usage_scope = Vec::new();
951 let task_scope = task_scope();
952
953 let usage = encodings.datasets_column_usage(
954 &Some(datum_var.clone()),
955 &usage_scope,
956 &task_scope,
957 &selection_fields,
958 );
959
960 let expected = DatasetsColumnUsage::empty()
961 .with_column_usage(
962 &datum_var,
963 ColumnUsage::from(vec!["AA", "BB", "CC", "one", "two", "three", "four"].as_slice()),
964 )
965 .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new()));
966
967 assert_eq!(usage, expected);
968
969 let usage = encodings.datasets_column_usage(
971 &Some(datum_var.clone()),
972 &usage_scope,
973 &task_scope,
974 &Default::default(),
975 );
976 let expected = DatasetsColumnUsage::empty()
977 .with_unknown_usage(&datum_var)
978 .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new()));
979
980 assert_eq!(usage, expected);
981 }
982
983 #[test]
984 fn test_mark_with_known_usage() {
985 let selection_fields = selection_fields();
987
988 let mark: MarkSpec = serde_json::from_value(json!({
989 "type": "rect",
990 "from": {"data": "dataA"},
991 "encode": {
992 "init": {
993 "x": {"field": "one", "scale": "scale_a"},
994 "y": [
995 {"field": "three", "scale": "scale_a", "test": "datum.two > 7"},
996 {"value": 23},
997 ],
998 },
999 "update": {
1000 "opacity": [
1001 {"signal": "datum['four'] * 2", "test": "vlSelectionTest('brush2_store', datum)"},
1002 {"value": 0.3},
1003 ]
1004 }
1005 }
1006 })).unwrap();
1007
1008 let usage_scope = Vec::new();
1010 let task_scope = task_scope();
1011
1012 let usage = mark.datasets_column_usage(&None, &usage_scope, &task_scope, &selection_fields);
1013
1014 let expected = DatasetsColumnUsage::empty()
1015 .with_column_usage(
1016 &(Variable::new_data("dataA"), Vec::new()),
1017 ColumnUsage::from(vec!["AA", "BB", "CC", "one", "two", "three", "four"].as_slice()),
1018 )
1019 .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new()));
1020
1021 assert_eq!(usage, expected);
1022 }
1023
1024 #[test]
1025 fn test_scale_usage() {
1026 let scale: ScaleSpec = serde_json::from_value(json!({
1027 "name": "color",
1028 "scale": "quantize",
1029 "domain": {"data": "dataA", "field": "colZ"},
1030 "range": {"scheme": "blues", "count": 7}
1031 }))
1032 .unwrap();
1033
1034 let usage_scope = Vec::new();
1036 let task_scope = task_scope();
1037
1038 let usage =
1039 scale.datasets_column_usage(&None, &usage_scope, &task_scope, &Default::default());
1040
1041 let expected = DatasetsColumnUsage::empty().with_column_usage(
1042 &(Variable::new_data("dataA"), Vec::new()),
1043 ColumnUsage::from(vec!["colZ"].as_slice()),
1044 );
1045
1046 assert_eq!(usage, expected);
1047 }
1048
1049 #[test]
1050 fn test_signal_usage() {
1051 let signal: SignalSpec = serde_json::from_value(json!({
1052 "name": "indexDate",
1053 "description": "A date value that updates in response to mousemove.",
1054 "update": "length(data('brush2_store'))",
1055 "on": [{"events": "mousemove", "update": "length(data('dataA'))"}]
1056 }))
1057 .unwrap();
1058
1059 let usage_scope = Vec::new();
1061 let task_scope = task_scope();
1062
1063 let usage =
1064 signal.datasets_column_usage(&None, &usage_scope, &task_scope, &Default::default());
1065
1066 let expected = DatasetsColumnUsage::empty()
1067 .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new()))
1068 .with_unknown_usage(&(Variable::new_data("dataA"), Vec::new()));
1069
1070 assert_eq!(usage, expected);
1071 }
1072}