vegafusion_core/planning/
stringify_local_datetimes.rs

1use crate::error::Result;
2use crate::planning::stitch::CommPlan;
3use crate::proto::gen::tasks::{Variable, VariableNamespace};
4use crate::spec::axis::{AxisFormatTypeSpec, AxisSpec};
5use crate::spec::chart::{ChartSpec, ChartVisitor, MutChartVisitor};
6use crate::spec::data::DataSpec;
7use crate::spec::mark::{MarkEncodingField, MarkSpec};
8use crate::spec::scale::{
9    ScaleDataReferenceOrSignalSpec, ScaleDomainSpec, ScaleSpec, ScaleTypeSpec,
10};
11use crate::spec::transform::formula::FormulaTransformSpec;
12use crate::spec::transform::TransformSpec;
13use crate::task_graph::graph::ScopedVariable;
14use crate::task_graph::scope::TaskScope;
15use itertools::sorted;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use vegafusion_common::escape::unescape_field;
19
20/// This planning phase converts select datetime columns from the default millisecond UTC
21/// representation to naive datetime strings in an "output timezone". This is only done for datetime
22/// columns that are scaled using a (non-utc) `time` scale in the client specification.
23///
24/// This is needed in order for the chart displayed by the client to be consistent regardless of
25/// the browser's local timezone.  Viewers from all timezones should see the chart displayed as
26/// it would look when generated by pure Vega in the `output_tz` timezone.
27pub fn stringify_local_datetimes(
28    server_spec: &mut ChartSpec,
29    client_spec: &mut ChartSpec,
30    comm_plan: &CommPlan,
31    domain_dataset_fields: &HashMap<ScopedVariable, (ScopedVariable, String)>,
32) -> Result<()> {
33    // Build task scope for client spec
34    let client_scope = client_spec.to_task_scope()?;
35
36    // Collect the name/scope of all time scales
37    let mut visitor = CollectScalesTypesVisitor::new();
38    client_spec.walk(&mut visitor)?;
39    let local_time_scales = visitor.local_time_scales;
40
41    // Gather candidate datasets
42    let server_to_client_datasets: HashSet<_> = comm_plan
43        .server_to_client
44        .iter()
45        .filter(|&var| var.0.namespace == VariableNamespace::Data as i32)
46        .cloned()
47        .collect();
48
49    let mut visitor = CollectCandidateDatasetMapping::new(
50        client_scope.clone(),
51        server_to_client_datasets.clone(),
52    );
53    client_spec.walk(&mut visitor)?;
54    let candidate_dataset_mapping = visitor.candidate_dataset_mapping;
55
56    // Collect data fields to convert to datetime strings
57    let mut visitor = CollectLocalTimeScaledFieldsVisitor::new(
58        client_scope,
59        local_time_scales,
60        candidate_dataset_mapping,
61    );
62    client_spec.walk(&mut visitor)?;
63
64    // Collect fields that are produced in datasets
65    let mut visitor = CollectLocalTimeDataFieldsVisitor::try_new(
66        visitor.local_datetime_fields,
67        &server_to_client_datasets,
68        server_spec,
69    )?;
70    server_spec.walk(&mut visitor)?;
71    let local_datetime_fields = visitor.local_datetime_fields;
72
73    // Add formula transforms to server spec
74    let server_scope = server_spec.to_task_scope()?;
75    let mut visitor = StringifyLocalDatetimeFieldsVisitor::new(
76        &local_datetime_fields,
77        &server_scope,
78        domain_dataset_fields,
79    );
80    server_spec.walk_mut(&mut visitor)?;
81
82    // Add format spec to client spec (to parse strings as local dates)
83    let mut visitor =
84        FormatLocalDatetimeFieldsVisitor::new(&local_datetime_fields, domain_dataset_fields);
85    client_spec.walk_mut(&mut visitor)?;
86
87    Ok(())
88}
89
90/// Visitor to collect the non-UTC time scales
91struct CollectScalesTypesVisitor {
92    pub local_time_scales: HashSet<ScopedVariable>,
93}
94
95impl CollectScalesTypesVisitor {
96    pub fn new() -> Self {
97        Self {
98            local_time_scales: Default::default(),
99        }
100    }
101}
102
103impl ChartVisitor for CollectScalesTypesVisitor {
104    fn visit_scale(&mut self, scale: &ScaleSpec, scope: &[u32]) -> Result<()> {
105        let var = (Variable::new_scale(&scale.name), Vec::from(scope));
106        if matches!(scale.type_, Some(ScaleTypeSpec::Time)) {
107            self.local_time_scales.insert(var);
108        }
109
110        Ok(())
111    }
112
113    fn visit_axis(&mut self, axis: &AxisSpec, scope: &[u32]) -> Result<()> {
114        if matches!(axis.format_type, Some(AxisFormatTypeSpec::Time)) {
115            let var = (Variable::new_scale(&axis.scale), Vec::from(scope));
116            self.local_time_scales.insert(var);
117        }
118        Ok(())
119    }
120}
121
122/// Visitor to collect mapping from mark dataset to server_to_client dataset
123/// (following facet aliasing)
124struct CollectCandidateDatasetMapping {
125    pub scope: TaskScope,
126    pub server_to_client_datasets: HashSet<ScopedVariable>,
127    pub candidate_dataset_mapping: HashMap<ScopedVariable, ScopedVariable>,
128}
129
130impl CollectCandidateDatasetMapping {
131    pub fn new(scope: TaskScope, server_to_client_datasets: HashSet<ScopedVariable>) -> Self {
132        // Initialize candidate_dataset_mapping with all server_to_client datasets
133        let candidate_dataset_mapping: HashMap<_, _> = server_to_client_datasets
134            .iter()
135            .map(|var| (var.clone(), var.clone()))
136            .collect();
137        Self {
138            scope,
139            server_to_client_datasets,
140            candidate_dataset_mapping,
141        }
142    }
143}
144
145impl ChartVisitor for CollectCandidateDatasetMapping {
146    fn visit_group_mark(&mut self, mark: &MarkSpec, scope: &[u32]) -> Result<()> {
147        // Add to candidate_dataset_mapping facet datasets that reference a server_to_scale dataset
148        if let Some(from) = &mark.from {
149            if let Some(facet) = &from.facet {
150                let data_var = Variable::new_data(&facet.data);
151                let resolved_data = self.scope.resolve_scope(&data_var, scope)?;
152                let resolved_data_scoped: ScopedVariable =
153                    (resolved_data.var.clone(), resolved_data.scope);
154
155                if self
156                    .server_to_client_datasets
157                    .contains(&resolved_data_scoped)
158                {
159                    let facet_var = Variable::new_data(&facet.name);
160                    let facet_scoped_var: ScopedVariable = (facet_var, Vec::from(scope));
161                    self.candidate_dataset_mapping
162                        .insert(facet_scoped_var, resolved_data_scoped);
163                }
164            }
165        }
166
167        Ok(())
168    }
169}
170
171/// Visitor to collect data fields that are scaled by a non-UTC time scale
172struct CollectLocalTimeScaledFieldsVisitor {
173    pub scope: TaskScope,
174    pub candidate_dataset_mapping: HashMap<ScopedVariable, ScopedVariable>,
175    pub local_time_scales: HashSet<ScopedVariable>,
176    pub local_datetime_fields: HashMap<ScopedVariable, HashSet<String>>,
177}
178
179impl CollectLocalTimeScaledFieldsVisitor {
180    pub fn new(
181        scope: TaskScope,
182        local_time_scales: HashSet<ScopedVariable>,
183        candidate_dataset_mapping: HashMap<ScopedVariable, ScopedVariable>,
184    ) -> Self {
185        Self {
186            scope,
187            candidate_dataset_mapping,
188            local_time_scales,
189            local_datetime_fields: Default::default(),
190        }
191    }
192}
193
194impl ChartVisitor for CollectLocalTimeScaledFieldsVisitor {
195    fn visit_non_group_mark(&mut self, mark: &MarkSpec, scope: &[u32]) -> Result<()> {
196        if let Some(mark_from) = &mark.from {
197            if let Some(mark_data) = &mark_from.data {
198                let mark_data_var = Variable::new_data(mark_data);
199                let resolved_mark_data = self.scope.resolve_scope(&mark_data_var, scope)?;
200                let resolved_mark_data_scoped =
201                    (resolved_mark_data.var.clone(), resolved_mark_data.scope);
202                if let Some(server_to_client_data_var) = self
203                    .candidate_dataset_mapping
204                    .get(&resolved_mark_data_scoped)
205                {
206                    // We've found a mark with a dataset that is eligible for date string
207                    // conversion
208                    if let Some(encode) = &mark.encode {
209                        for (_, encodings) in encode.encodings.iter() {
210                            for (_, channels) in encodings.channels.iter() {
211                                for channel in channels.to_vec() {
212                                    if let (Some(scale), Some(MarkEncodingField::Field(field))) =
213                                        (&channel.scale, &channel.field)
214                                    {
215                                        let scale_var = Variable::new_scale(scale);
216                                        let resolved_scale =
217                                            self.scope.resolve_scope(&scale_var, scope)?;
218                                        let resolved_scoped_scale = (
219                                            resolved_scale.var.clone(),
220                                            resolved_scale.scope.clone(),
221                                        );
222
223                                        if self.local_time_scales.contains(&resolved_scoped_scale) {
224                                            // Save off field for dataset
225                                            let entry = self
226                                                .local_datetime_fields
227                                                .entry(server_to_client_data_var.clone());
228                                            let fields = entry.or_default();
229                                            fields.insert(field.clone());
230                                        }
231                                    }
232                                }
233                            }
234                        }
235                    }
236                }
237            }
238        }
239        Ok(())
240    }
241
242    fn visit_scale(&mut self, scale: &ScaleSpec, scope: &[u32]) -> Result<()> {
243        let scale_var: ScopedVariable = (Variable::new_scale(&scale.name), Vec::from(scope));
244        if self.local_time_scales.contains(&scale_var) {
245            if let Some(domain) = &scale.domain {
246                let field_refs = match domain {
247                    ScaleDomainSpec::FieldReference(field_ref) => {
248                        vec![field_ref.clone()]
249                    }
250                    ScaleDomainSpec::FieldsReference(fields_ref) => {
251                        fields_ref.to_field_references()
252                    }
253                    ScaleDomainSpec::FieldsReferences(fields_ref) => fields_ref
254                        .fields
255                        .iter()
256                        .filter_map(|f| {
257                            if let ScaleDataReferenceOrSignalSpec::Reference(f) = f {
258                                Some(f.clone())
259                            } else {
260                                None
261                            }
262                        })
263                        .collect(),
264                    _ => Default::default(),
265                };
266                for field_ref in &field_refs {
267                    let data_var = Variable::new_data(&field_ref.data);
268                    if let Ok(resolved) = self.scope.resolve_scope(&data_var, scope) {
269                        let scoped_data_var = (resolved.var, resolved.scope);
270                        if self
271                            .candidate_dataset_mapping
272                            .contains_key(&scoped_data_var)
273                        {
274                            // Save off field for dataset
275                            let entry = self.local_datetime_fields.entry(scoped_data_var.clone());
276                            let fields = entry.or_default();
277                            fields.insert(field_ref.field.clone());
278                        }
279                    }
280                }
281            }
282        }
283        Ok(())
284    }
285}
286
287fn get_local_datetime_fields(
288    data_var: &ScopedVariable,
289    local_datetime_fields: &HashMap<ScopedVariable, HashSet<String>>,
290    domain_dataset_fields: &HashMap<ScopedVariable, (ScopedVariable, String)>,
291) -> HashSet<String> {
292    // Map dataset variable
293    if let Some(fields) = local_datetime_fields.get(data_var) {
294        fields.clone()
295    } else if let Some((mapped_var, field)) = domain_dataset_fields.get(data_var) {
296        if let Some(fields) = local_datetime_fields.get(mapped_var) {
297            if fields.contains(field) {
298                vec![field.clone()].into_iter().collect()
299            } else {
300                Default::default()
301            }
302        } else {
303            Default::default()
304        }
305    } else {
306        Default::default()
307    }
308}
309
310/// Visitor to collect local datetime columns produced by datasets
311struct CollectLocalTimeDataFieldsVisitor<'a> {
312    pub local_datetime_fields: HashMap<ScopedVariable, HashSet<String>>,
313    pub server_to_client_datasets: &'a HashSet<ScopedVariable>,
314    pub chart_spec: &'a ChartSpec,
315    pub task_scope: TaskScope,
316}
317
318impl<'a> CollectLocalTimeDataFieldsVisitor<'a> {
319    pub fn try_new(
320        local_datetime_fields: HashMap<ScopedVariable, HashSet<String>>,
321        server_to_client_datasets: &'a HashSet<ScopedVariable>,
322        chart_spec: &'a ChartSpec,
323    ) -> Result<Self> {
324        Ok(Self {
325            local_datetime_fields,
326            server_to_client_datasets,
327            chart_spec,
328            task_scope: chart_spec.to_task_scope()?,
329        })
330    }
331}
332
333impl ChartVisitor for CollectLocalTimeDataFieldsVisitor<'_> {
334    fn visit_data(&mut self, data: &DataSpec, scope: &[u32]) -> Result<()> {
335        // Add local datetime columns produced by the dataset.
336        // Note: This isn't quite complete, for derived datasets input_local_datetime_columns
337        //       should be computed from the parent dataset, but we won't be able to do this
338        //       in a visitor
339        let local_columns =
340            data.local_datetime_columns_produced(self.chart_spec, &self.task_scope, scope)?;
341        let dataset_var: ScopedVariable = (Variable::new_data(&data.name), Vec::from(scope));
342        if self.server_to_client_datasets.contains(&dataset_var) {
343            match self.local_datetime_fields.entry(dataset_var) {
344                Entry::Occupied(mut v) => {
345                    v.get_mut().extend(local_columns);
346                }
347                Entry::Vacant(v) => {
348                    v.insert(local_columns.into_iter().collect());
349                }
350            }
351        }
352        Ok(())
353    }
354}
355
356/// Visitor to stringify select datetime fields
357struct StringifyLocalDatetimeFieldsVisitor<'a> {
358    pub local_datetime_fields: &'a HashMap<ScopedVariable, HashSet<String>>,
359    pub scope: &'a TaskScope,
360    pub domain_dataset_fields: &'a HashMap<ScopedVariable, (ScopedVariable, String)>,
361}
362
363impl<'a> StringifyLocalDatetimeFieldsVisitor<'a> {
364    pub fn new(
365        local_datetime_fields: &'a HashMap<ScopedVariable, HashSet<String>>,
366        scope: &'a TaskScope,
367        domain_dataset_fields: &'a HashMap<ScopedVariable, (ScopedVariable, String)>,
368    ) -> Self {
369        Self {
370            local_datetime_fields,
371            scope,
372            domain_dataset_fields,
373        }
374    }
375}
376
377impl MutChartVisitor for StringifyLocalDatetimeFieldsVisitor<'_> {
378    fn visit_data(&mut self, data: &mut DataSpec, scope: &[u32]) -> Result<()> {
379        let data_var = (Variable::new_data(&data.name), Vec::from(scope));
380
381        // Map dataset variable
382        let fields = get_local_datetime_fields(
383            &data_var,
384            self.local_datetime_fields,
385            self.domain_dataset_fields,
386        );
387
388        for field in sorted(fields) {
389            let field = unescape_field(&field);
390            let expr_str =
391                format!("timeFormat(toDate(datum['{field}'], 'local'), '%Y-%m-%dT%H:%M:%S.%L')");
392
393            let transforms = &mut data.transform;
394            let transform = FormulaTransformSpec {
395                expr: expr_str,
396                as_: field,
397                extra: Default::default(),
398            };
399            transforms.push(TransformSpec::Formula(transform))
400        }
401
402        // Check if dataset is a child a stringified dataset. If so, we need to convert
403        // datetime strings back to the utc millisecond representation
404        if let Some(source) = &data.source {
405            let source_var = Variable::new_data(source);
406            let source_resolved = self.scope.resolve_scope(&source_var, scope)?;
407            let source_resolved_var = (source_resolved.var, source_resolved.scope);
408            if let Some(fields) = self.local_datetime_fields.get(&source_resolved_var) {
409                for field in sorted(fields) {
410                    let field = unescape_field(field);
411                    let expr_str = format!("toDate(datum['{field}'], 'local')");
412                    let transforms = &mut data.transform;
413                    let transform = FormulaTransformSpec {
414                        expr: expr_str,
415                        as_: field.to_string(),
416                        extra: Default::default(),
417                    };
418                    transforms.insert(0, TransformSpec::Formula(transform))
419                }
420            }
421        }
422
423        Ok(())
424    }
425}
426
427/// Visitor to add format parse specification for local dates
428struct FormatLocalDatetimeFieldsVisitor<'a> {
429    pub local_datetime_fields: &'a HashMap<ScopedVariable, HashSet<String>>,
430    pub domain_dataset_fields: &'a HashMap<ScopedVariable, (ScopedVariable, String)>,
431}
432
433impl<'a> FormatLocalDatetimeFieldsVisitor<'a> {
434    pub fn new(
435        local_datetime_fields: &'a HashMap<ScopedVariable, HashSet<String>>,
436        domain_dataset_fields: &'a HashMap<ScopedVariable, (ScopedVariable, String)>,
437    ) -> Self {
438        Self {
439            local_datetime_fields,
440            domain_dataset_fields,
441        }
442    }
443}
444
445impl MutChartVisitor for FormatLocalDatetimeFieldsVisitor<'_> {
446    fn visit_data(&mut self, data: &mut DataSpec, scope: &[u32]) -> Result<()> {
447        let data_var = (Variable::new_data(&data.name), Vec::from(scope));
448        let fields = get_local_datetime_fields(
449            &data_var,
450            self.local_datetime_fields,
451            self.domain_dataset_fields,
452        );
453        for field in sorted(fields) {
454            let field = unescape_field(&field);
455            let transforms = &mut data.transform;
456            let transform = FormulaTransformSpec {
457                expr: format!("toDate(datum['{field}'])"),
458                as_: field.to_string(),
459                extra: Default::default(),
460            };
461            transforms.insert(0, TransformSpec::Formula(transform))
462        }
463
464        Ok(())
465    }
466}