vegafusion_core/planning/
optimize_server.rs

1use crate::error::Result;
2use crate::proto::gen::tasks::Variable;
3use crate::spec::chart::{ChartSpec, MutChartVisitor};
4use crate::spec::data::DataSpec;
5
6use std::collections::HashSet;
7
8/// This optimization pass examines data nodes that have been planned to execute on the server.
9/// For URL data nodes, if the node has external input vars in it's transforms, it is split
10/// into an upper part that has no dependencies. This way, the data isn't loaded from the external
11/// url repeatedly when the values of the input variables changes
12pub fn split_data_url_nodes(spec: &mut ChartSpec) -> Result<()> {
13    let mut visitor = SplitUrlDataNodeVisitor::new();
14    spec.walk_mut(&mut visitor)?;
15
16    for (parent_data, scope) in visitor.parent_url_data_nodes {
17        // Add parent data at appropriate scope
18        if scope.is_empty() {
19            // Add parent data node to spec
20            spec.data.push(parent_data)
21        } else {
22            // Add parent data node to spec
23            let parent_group = spec.get_nested_group_mut(scope.as_slice())?;
24            parent_group.data.push(parent_data);
25        }
26    }
27
28    Ok(())
29}
30
31#[derive(Debug, Clone, Default)]
32pub struct SplitUrlDataNodeVisitor {
33    pub parent_url_data_nodes: Vec<(DataSpec, Vec<u32>)>,
34}
35
36impl SplitUrlDataNodeVisitor {
37    pub fn new() -> Self {
38        Self {
39            parent_url_data_nodes: Default::default(),
40        }
41    }
42}
43
44impl MutChartVisitor for SplitUrlDataNodeVisitor {
45    fn visit_data(&mut self, data: &mut DataSpec, scope: &[u32]) -> crate::error::Result<()> {
46        if data.url.is_some() {
47            let mut pipeline_vars: HashSet<Variable> = HashSet::new();
48            let mut num_supported = 0;
49            for (i, tx) in data.transform.iter().enumerate() {
50                let has_external_input = !tx
51                    .input_vars()
52                    .unwrap_or_default()
53                    .iter()
54                    .all(|input_var| pipeline_vars.contains(&input_var.var));
55
56                // Add output signals so we know they are available later
57                for sig in &tx.output_signals() {
58                    pipeline_vars.insert(Variable::new_signal(sig));
59                }
60
61                if has_external_input {
62                    break;
63                } else {
64                    num_supported = i + 1
65                }
66            }
67
68            if num_supported < data.transform.len() {
69                // Perform split
70                let parents_transforms = Vec::from(&data.transform[..num_supported]);
71                let child_transforms = Vec::from(&data.transform[num_supported..]);
72
73                // Compute new name for parent data
74                let mut parent_name = data.name.clone();
75                parent_name.insert_str(0, "_parent_");
76
77                // Clone data for parent (with updated name)
78                let mut parent_data = data.clone();
79                parent_data.name = parent_name.clone();
80                parent_data.transform = parents_transforms;
81
82                // Save parent data node
83                self.parent_url_data_nodes
84                    .push((parent_data, Vec::from(scope)));
85
86                // Update child data spec:
87                //   - Same name
88                //   - Add source of parent
89                //   - Update remaining transforms
90                data.source = Some(parent_name.clone());
91                data.format = None;
92                data.values = None;
93                data.transform = child_transforms;
94                data.on = None;
95                data.url = None;
96            }
97        }
98        Ok(())
99    }
100}