vegafusion_core/planning/
split_domain_data.rs1use crate::proto::gen::tasks::Variable;
2use crate::spec::chart::{ChartSpec, MutChartVisitor};
3use crate::spec::data::DataSpec;
4use crate::spec::scale::{
5 ScaleDataReferenceOrSignalSpec, ScaleDataReferenceSort, ScaleDataReferenceSortParameters,
6 ScaleDomainSpec, ScaleFieldReferenceSpec, ScaleFieldsReferencesSpec, ScaleSpec, ScaleTypeSpec,
7};
8use crate::spec::transform::aggregate::AggregateOpSpec;
9use crate::task_graph::graph::ScopedVariable;
10use crate::task_graph::scope::TaskScope;
11use itertools::Itertools;
12use std::collections::HashMap;
13use vegafusion_common::error::Result;
14use vegafusion_common::escape::escape_field;
15
16pub fn split_domain_data(
21 spec: &mut ChartSpec,
22) -> Result<HashMap<ScopedVariable, (ScopedVariable, String)>> {
23 let task_scope = spec.to_task_scope()?;
24 let mut visitor = SplitScaleDomainVisitor::new(&task_scope);
25 spec.walk_mut(&mut visitor)?;
26 for (scope, data) in visitor.new_datasets {
27 if scope.is_empty() {
28 spec.data.push(data);
29 } else {
30 let group = spec.get_nested_group_mut(scope.as_slice())?;
31 group.data.push(data);
32 }
33 }
34
35 Ok(visitor.domain_dataset_fields)
36}
37
38#[derive(Debug, Clone)]
39pub struct SplitScaleDomainVisitor<'a> {
40 pub task_scope: &'a TaskScope,
41 pub new_datasets: Vec<(Vec<u32>, DataSpec)>,
42 pub domain_dataset_fields: HashMap<ScopedVariable, (ScopedVariable, String)>,
43 pub nested_regex: regex::Regex,
44}
45
46impl<'a> SplitScaleDomainVisitor<'a> {
47 pub fn new(task_scope: &'a TaskScope) -> Self {
48 let nested_regex = regex::Regex::new(r#"[^\\]\."#).unwrap();
50 Self {
51 new_datasets: Vec::new(),
52 task_scope,
53 domain_dataset_fields: Default::default(),
54 nested_regex,
55 }
56 }
57}
58
59impl MutChartVisitor for SplitScaleDomainVisitor<'_> {
60 fn visit_scale(&mut self, scale: &mut ScaleSpec, scope: &[u32]) -> Result<()> {
61 if let Some(domain) = scale.domain.clone() {
62 match domain {
63 ScaleDomainSpec::FieldReference(field_ref) => {
64 self.split_field_reference_domain(scale, scope, &field_ref)?;
65 }
66 ScaleDomainSpec::FieldsReferences(fields_ref) => {
67 self.split_fields_reference_domain(scale, scope, &fields_ref)?;
68 }
69 _ => {}
70 }
71 }
72 Ok(())
73 }
74}
75
76impl SplitScaleDomainVisitor<'_> {
77 fn split_fields_reference_domain(
78 &mut self,
79 scale: &mut ScaleSpec,
80 scope: &[u32],
81 fields_ref: &ScaleFieldsReferencesSpec,
82 ) -> Result<()> {
83 let discrete_scale = scale.type_.clone().unwrap_or_default().is_discrete();
84 let (new_datasets, new_dataset_scope, new_domain) = if discrete_scale {
85 let (sort_field, sort_op) = match &fields_ref.sort {
87 Some(ScaleDataReferenceSort::Parameters(sort_params)) => {
88 (sort_params.field.clone(), sort_params.op.clone())
89 }
90 _ => (None, None),
91 };
92
93 let mut new_datasets = Vec::new();
95 let mut new_dataset_scope = Vec::new();
96 let mut new_fields = Vec::new();
97 for (field_index, field_ref) in fields_ref.fields.iter().enumerate() {
98 if let ScaleDataReferenceOrSignalSpec::Reference(field_ref) = field_ref {
99 let field_name = &field_ref.field;
100 let data_name = field_ref.data.clone();
101 let scope_suffix = Self::build_scope_suffix(scope);
102
103 let new_data_name = format!(
104 "{}_{}_domain_{}{}_{}",
105 data_name, scale.name, field_name, scope_suffix, field_index
106 );
107
108 let new_data = Self::make_discrete_domain_data(
109 &data_name,
110 &new_data_name,
111 field_name,
112 sort_field.clone(),
113 sort_op.clone(),
114 )?;
115 new_datasets.push(new_data);
116
117 let mut new_field_ref = field_ref.clone();
119 new_field_ref.data = new_data_name.clone();
120 new_fields.push(new_field_ref);
121
122 let resolved = self
124 .task_scope
125 .resolve_scope(&Variable::new_data(data_name.as_str()), scope)?;
126 new_dataset_scope.push(resolved.scope);
127 }
128 }
129
130 let sort = match &fields_ref.sort {
132 Some(ScaleDataReferenceSort::Parameters(sort_params)) => Some(
133 ScaleDataReferenceSort::Parameters(ScaleDataReferenceSortParameters {
134 op: Some(AggregateOpSpec::Max),
135 field: Some("sort_field".to_string()),
136 ..sort_params.clone()
137 }),
138 ),
139 sort => sort.clone(),
140 };
141
142 let new_domain = ScaleDomainSpec::FieldsReferences(ScaleFieldsReferencesSpec {
143 fields: new_fields
144 .into_iter()
145 .map(ScaleDataReferenceOrSignalSpec::Reference)
146 .collect(),
147 sort,
148 extra: Default::default(),
149 });
150
151 (new_datasets, new_dataset_scope, new_domain)
152 } else {
153 return Ok(());
155 };
156
157 scale.domain = Some(new_domain);
159
160 for (new_dataset, scope) in new_datasets.into_iter().zip(new_dataset_scope) {
161 self.new_datasets.push((scope, new_dataset));
163 }
164
165 Ok(())
166 }
167
168 fn split_field_reference_domain(
169 &mut self,
170 scale: &mut ScaleSpec,
171 scope: &[u32],
172 field_ref: &ScaleFieldReferenceSpec,
173 ) -> Result<()> {
174 let discrete_scale = scale.type_.clone().unwrap_or_default().is_discrete();
175 let data_name = field_ref.data.clone();
176 let data_var = (Variable::new_data(&data_name), Vec::from(scope));
177 let field_name = &field_ref.field;
178
179 if self.nested_regex.is_match(field_name) {
181 return Ok(());
183 }
184
185 let scope_suffix = Self::build_scope_suffix(scope);
186
187 let new_data_name = format!(
188 "{}_{}_domain_{}{}",
189 data_name, scale.name, field_name, scope_suffix
190 );
191 let new_data_var = (Variable::new_data(&new_data_name), Vec::from(scope));
192 self.domain_dataset_fields
193 .insert(new_data_var, (data_var, field_name.clone()));
194
195 let (new_data, new_domain) = if discrete_scale {
196 let (sort_field, sort_op) =
198 if let Some(ScaleDataReferenceSort::Parameters(sort_params)) = &field_ref.sort {
199 (sort_params.field.clone(), sort_params.op.clone())
200 } else {
201 (None, None)
202 };
203
204 let new_data = Self::make_discrete_domain_data(
205 &data_name,
206 &new_data_name,
207 field_name,
208 sort_field,
209 sort_op,
210 )?;
211
212 let sort = match &field_ref.sort {
214 Some(ScaleDataReferenceSort::Parameters(sort_params)) => Some(
215 ScaleDataReferenceSort::Parameters(ScaleDataReferenceSortParameters {
216 op: Some(AggregateOpSpec::Max),
217 field: Some("sort_field".to_string()),
218 ..sort_params.clone()
219 }),
220 ),
221 sort => sort.clone(),
222 };
223
224 let new_domain = ScaleDomainSpec::FieldReference(ScaleFieldReferenceSpec {
225 data: new_data_name,
226 field: field_name.clone(),
227 sort,
228 extra: Default::default(),
229 });
230
231 (new_data, new_domain)
232 } else if matches!(
233 scale.type_.clone().unwrap_or_default(),
234 ScaleTypeSpec::Linear
235 ) {
236 let new_data: DataSpec = serde_json::from_value(serde_json::json!(
238 {
239 "name": new_data_name,
240 "source": data_name,
241 "transform": [
242 {
243 "type": "formula",
244 "as": field_name,
245 "expr": format!("+datum['{field_name}']")
246 }, {
247 "type": "aggregate",
248 "fields": [field_name, field_name],
249 "ops": ["min", "max"],
250 "as": ["min", "max"],
251 "groupby": []
252 }
253 ]
254 }
255 ))?;
256
257 let new_domain: ScaleDomainSpec = serde_json::from_value(serde_json::json!([
259 {
260 "signal":
261 format!(
262 "(data(\"{}\")[0] || {{}}).min",
263 escape_field(&new_data_name)
264 )
265 },
266 {
267 "signal":
268 format!(
269 "(data(\"{}\")[0] || {{}}).max",
270 escape_field(&new_data_name)
271 )
272 }
273 ]))?;
274
275 (new_data, new_domain)
276 } else {
277 return Ok(());
279 };
280
281 scale.domain = Some(new_domain);
283
284 let resolved = self
286 .task_scope
287 .resolve_scope(&Variable::new_data(data_name.as_str()), scope)?;
288
289 self.new_datasets.push((resolved.scope, new_data));
291 Ok(())
292 }
293
294 fn build_scope_suffix(scope: &[u32]) -> String {
295 let mut scope_suffix = scope.iter().map(|s| s.to_string()).join("_");
297 if !scope_suffix.is_empty() {
298 scope_suffix.insert(0, '_');
299 }
300 scope_suffix
301 }
302
303 fn make_discrete_domain_data(
306 data_name: &str,
307 new_data_name: &str,
308 field_name: &String,
309 sort_field: Option<String>,
310 sort_op: Option<AggregateOpSpec>,
311 ) -> Result<DataSpec> {
312 Ok(if let Some(sort_op) = sort_op {
313 let sort_field = sort_field.unwrap_or_else(|| field_name.clone());
315 serde_json::from_value(serde_json::json!(
316 {
317 "name": new_data_name,
318 "source": data_name,
319 "transform": [
320 {
321 "type": "aggregate",
322 "as": ["sort_field"],
323 "groupby": [field_name],
324 "ops": [sort_op],
325 "fields": [sort_field]
326 }
327 ]
328 }
329 ))?
330 } else {
331 serde_json::from_value(serde_json::json!(
333 {
334 "name": new_data_name,
335 "source": data_name,
336 "transform": [
337 {
338 "type": "aggregate",
339 "as": [],
340 "groupby": [field_name],
341 "ops": [],
342 "fields": []
343 }, {
344 "type": "formula",
345 "as": "sort_field",
346 "expr": format!("datum['{field_name}']")
347 }
348 ]
349 }
350 ))?
351 })
352 }
353}