// vegafusion_core/planning/split_domain_data.rs

1use crate::proto::gen::tasks::Variable;
2use crate::spec::chart::{ChartSpec, MutChartVisitor};
3use crate::spec::data::DataSpec;
4use crate::spec::scale::{
5    ScaleDataReferenceOrSignalSpec, ScaleDataReferenceSort, ScaleDataReferenceSortParameters,
6    ScaleDomainSpec, ScaleFieldReferenceSpec, ScaleFieldsReferencesSpec, ScaleSpec, ScaleTypeSpec,
7};
8use crate::spec::transform::aggregate::AggregateOpSpec;
9use crate::task_graph::graph::ScopedVariable;
10use crate::task_graph::scope::TaskScope;
11use itertools::Itertools;
12use std::collections::HashMap;
13use vegafusion_common::error::Result;
14use vegafusion_common::escape::escape_field;
15
16/// This optimization extracts the intensive data processing from scale.domain.data specifications
17/// into dedicated datasets. Domain calculations can't be entirely evaluated on the server, but
18/// this approach still allows the heavy data processing to happen on the server, and to avoid
19/// serializing the full dataset to send to the client.
20pub fn split_domain_data(
21    spec: &mut ChartSpec,
22) -> Result<HashMap<ScopedVariable, (ScopedVariable, String)>> {
23    let task_scope = spec.to_task_scope()?;
24    let mut visitor = SplitScaleDomainVisitor::new(&task_scope);
25    spec.walk_mut(&mut visitor)?;
26    for (scope, data) in visitor.new_datasets {
27        if scope.is_empty() {
28            spec.data.push(data);
29        } else {
30            let group = spec.get_nested_group_mut(scope.as_slice())?;
31            group.data.push(data);
32        }
33    }
34
35    Ok(visitor.domain_dataset_fields)
36}
37
/// Mutable chart visitor that rewrites scale domains so their heavy data
/// processing can happen in dedicated datasets (see `split_domain_data`).
#[derive(Debug, Clone)]
pub struct SplitScaleDomainVisitor<'a> {
    // Task scope used to resolve the scope of datasets referenced by scale domains
    pub task_scope: &'a TaskScope,
    // Datasets created while visiting, paired with the chart scope they should be
    // inserted under; attached to the chart after the walk completes
    pub new_datasets: Vec<(Vec<u32>, DataSpec)>,
    // Maps each newly created domain dataset to the (source dataset, field name)
    // pair it was derived from
    pub domain_dataset_fields: HashMap<ScopedVariable, (ScopedVariable, String)>,
    // Regex matching unescaped dot characters, used to detect nested field references
    pub nested_regex: regex::Regex,
}
45
46impl<'a> SplitScaleDomainVisitor<'a> {
47    pub fn new(task_scope: &'a TaskScope) -> Self {
48        // Regex matching unescaped dot characters
49        let nested_regex = regex::Regex::new(r#"[^\\]\."#).unwrap();
50        Self {
51            new_datasets: Vec::new(),
52            task_scope,
53            domain_dataset_fields: Default::default(),
54            nested_regex,
55        }
56    }
57}
58
59impl MutChartVisitor for SplitScaleDomainVisitor<'_> {
60    fn visit_scale(&mut self, scale: &mut ScaleSpec, scope: &[u32]) -> Result<()> {
61        if let Some(domain) = scale.domain.clone() {
62            match domain {
63                ScaleDomainSpec::FieldReference(field_ref) => {
64                    self.split_field_reference_domain(scale, scope, &field_ref)?;
65                }
66                ScaleDomainSpec::FieldsReferences(fields_ref) => {
67                    self.split_fields_reference_domain(scale, scope, &fields_ref)?;
68                }
69                _ => {}
70            }
71        }
72        Ok(())
73    }
74}
75
76impl SplitScaleDomainVisitor<'_> {
77    fn split_fields_reference_domain(
78        &mut self,
79        scale: &mut ScaleSpec,
80        scope: &[u32],
81        fields_ref: &ScaleFieldsReferencesSpec,
82    ) -> Result<()> {
83        let discrete_scale = scale.type_.clone().unwrap_or_default().is_discrete();
84        let (new_datasets, new_dataset_scope, new_domain) = if discrete_scale {
85            // Extract sort field and op
86            let (sort_field, sort_op) = match &fields_ref.sort {
87                Some(ScaleDataReferenceSort::Parameters(sort_params)) => {
88                    (sort_params.field.clone(), sort_params.op.clone())
89                }
90                _ => (None, None),
91            };
92
93            // Iterate over data fields
94            let mut new_datasets = Vec::new();
95            let mut new_dataset_scope = Vec::new();
96            let mut new_fields = Vec::new();
97            for (field_index, field_ref) in fields_ref.fields.iter().enumerate() {
98                if let ScaleDataReferenceOrSignalSpec::Reference(field_ref) = field_ref {
99                    let field_name = &field_ref.field;
100                    let data_name = field_ref.data.clone();
101                    let scope_suffix = Self::build_scope_suffix(scope);
102
103                    let new_data_name = format!(
104                        "{}_{}_domain_{}{}_{}",
105                        data_name, scale.name, field_name, scope_suffix, field_index
106                    );
107
108                    let new_data = Self::make_discrete_domain_data(
109                        &data_name,
110                        &new_data_name,
111                        field_name,
112                        sort_field.clone(),
113                        sort_op.clone(),
114                    )?;
115                    new_datasets.push(new_data);
116
117                    // Compute new domain field
118                    let mut new_field_ref = field_ref.clone();
119                    new_field_ref.data = new_data_name.clone();
120                    new_fields.push(new_field_ref);
121
122                    // Compute scope for the original referenced dataset
123                    let resolved = self
124                        .task_scope
125                        .resolve_scope(&Variable::new_data(data_name.as_str()), scope)?;
126                    new_dataset_scope.push(resolved.scope);
127                }
128            }
129
130            // Create new domain specification that uses the new fields
131            let sort = match &fields_ref.sort {
132                Some(ScaleDataReferenceSort::Parameters(sort_params)) => Some(
133                    ScaleDataReferenceSort::Parameters(ScaleDataReferenceSortParameters {
134                        op: Some(AggregateOpSpec::Max),
135                        field: Some("sort_field".to_string()),
136                        ..sort_params.clone()
137                    }),
138                ),
139                sort => sort.clone(),
140            };
141
142            let new_domain = ScaleDomainSpec::FieldsReferences(ScaleFieldsReferencesSpec {
143                fields: new_fields
144                    .into_iter()
145                    .map(ScaleDataReferenceOrSignalSpec::Reference)
146                    .collect(),
147                sort,
148                extra: Default::default(),
149            });
150
151            (new_datasets, new_dataset_scope, new_domain)
152        } else {
153            // Scale type not supported
154            return Ok(());
155        };
156
157        // Overwrite scale domain with new domain
158        scale.domain = Some(new_domain);
159
160        for (new_dataset, scope) in new_datasets.into_iter().zip(new_dataset_scope) {
161            // Add new dataset at current scope
162            self.new_datasets.push((scope, new_dataset));
163        }
164
165        Ok(())
166    }
167
168    fn split_field_reference_domain(
169        &mut self,
170        scale: &mut ScaleSpec,
171        scope: &[u32],
172        field_ref: &ScaleFieldReferenceSpec,
173    ) -> Result<()> {
174        let discrete_scale = scale.type_.clone().unwrap_or_default().is_discrete();
175        let data_name = field_ref.data.clone();
176        let data_var = (Variable::new_data(&data_name), Vec::from(scope));
177        let field_name = &field_ref.field;
178
179        // Validate whether we can do anything
180        if self.nested_regex.is_match(field_name) {
181            // Nested fields not supported
182            return Ok(());
183        }
184
185        let scope_suffix = Self::build_scope_suffix(scope);
186
187        let new_data_name = format!(
188            "{}_{}_domain_{}{}",
189            data_name, scale.name, field_name, scope_suffix
190        );
191        let new_data_var = (Variable::new_data(&new_data_name), Vec::from(scope));
192        self.domain_dataset_fields
193            .insert(new_data_var, (data_var, field_name.clone()));
194
195        let (new_data, new_domain) = if discrete_scale {
196            // Extract sort field and op
197            let (sort_field, sort_op) =
198                if let Some(ScaleDataReferenceSort::Parameters(sort_params)) = &field_ref.sort {
199                    (sort_params.field.clone(), sort_params.op.clone())
200                } else {
201                    (None, None)
202                };
203
204            let new_data = Self::make_discrete_domain_data(
205                &data_name,
206                &new_data_name,
207                field_name,
208                sort_field,
209                sort_op,
210            )?;
211
212            // Create new domain specification that uses the new dataset
213            let sort = match &field_ref.sort {
214                Some(ScaleDataReferenceSort::Parameters(sort_params)) => Some(
215                    ScaleDataReferenceSort::Parameters(ScaleDataReferenceSortParameters {
216                        op: Some(AggregateOpSpec::Max),
217                        field: Some("sort_field".to_string()),
218                        ..sort_params.clone()
219                    }),
220                ),
221                sort => sort.clone(),
222            };
223
224            let new_domain = ScaleDomainSpec::FieldReference(ScaleFieldReferenceSpec {
225                data: new_data_name,
226                field: field_name.clone(),
227                sort,
228                extra: Default::default(),
229            });
230
231            (new_data, new_domain)
232        } else if matches!(
233            scale.type_.clone().unwrap_or_default(),
234            ScaleTypeSpec::Linear
235        ) {
236            // Create derived dataset that performs the min/max calculations
237            let new_data: DataSpec = serde_json::from_value(serde_json::json!(
238                {
239                    "name": new_data_name,
240                    "source": data_name,
241                    "transform": [
242                        {
243                            "type": "formula",
244                            "as": field_name,
245                            "expr": format!("+datum['{field_name}']")
246                        }, {
247                            "type": "aggregate",
248                            "fields": [field_name, field_name],
249                            "ops": ["min", "max"],
250                            "as": ["min", "max"],
251                             "groupby": []
252                        }
253                    ]
254                }
255            ))?;
256
257            // Create new domain specification that uses the new dataset
258            let new_domain: ScaleDomainSpec = serde_json::from_value(serde_json::json!([
259                {
260                    "signal":
261                        format!(
262                            "(data(\"{}\")[0] || {{}}).min",
263                            escape_field(&new_data_name)
264                        )
265                },
266                {
267                    "signal":
268                        format!(
269                            "(data(\"{}\")[0] || {{}}).max",
270                            escape_field(&new_data_name)
271                        )
272                }
273            ]))?;
274
275            (new_data, new_domain)
276        } else {
277            // Unsupported scale type
278            return Ok(());
279        };
280
281        // Overwrite scale domain with new domain
282        scale.domain = Some(new_domain);
283
284        // Add new dataset at same scope as source dataset
285        let resolved = self
286            .task_scope
287            .resolve_scope(&Variable::new_data(data_name.as_str()), scope)?;
288
289        // Add new dataset at current scope
290        self.new_datasets.push((resolved.scope, new_data));
291        Ok(())
292    }
293
294    fn build_scope_suffix(scope: &[u32]) -> String {
295        // Build suffix using scope
296        let mut scope_suffix = scope.iter().map(|s| s.to_string()).join("_");
297        if !scope_suffix.is_empty() {
298            scope_suffix.insert(0, '_');
299        }
300        scope_suffix
301    }
302
303    /// Make a Vega dataset that computes the discrete values of an input dataset with
304    /// an optional sorting field
305    fn make_discrete_domain_data(
306        data_name: &str,
307        new_data_name: &str,
308        field_name: &String,
309        sort_field: Option<String>,
310        sort_op: Option<AggregateOpSpec>,
311    ) -> Result<DataSpec> {
312        Ok(if let Some(sort_op) = sort_op {
313            // Will sort by the result of an aggregation operation
314            let sort_field = sort_field.unwrap_or_else(|| field_name.clone());
315            serde_json::from_value(serde_json::json!(
316                {
317                    "name": new_data_name,
318                    "source": data_name,
319                    "transform": [
320                        {
321                            "type": "aggregate",
322                            "as": ["sort_field"],
323                            "groupby": [field_name],
324                            "ops": [sort_op],
325                            "fields": [sort_field]
326                        }
327                    ]
328               }
329            ))?
330        } else {
331            // Will sort by the grouped field values
332            serde_json::from_value(serde_json::json!(
333                {
334                    "name": new_data_name,
335                    "source": data_name,
336                    "transform": [
337                        {
338                            "type": "aggregate",
339                            "as": [],
340                            "groupby": [field_name],
341                            "ops": [],
342                            "fields": []
343                        }, {
344                            "type": "formula",
345                            "as": "sort_field",
346                            "expr": format!("datum['{field_name}']")
347                        }
348                    ]
349               }
350            ))?
351        })
352    }
353}