1use crate::error::Result;
2use crate::planning::stitch::CommPlan;
3use crate::proto::gen::tasks::{Variable, VariableNamespace};
4use crate::spec::axis::{AxisFormatTypeSpec, AxisSpec};
5use crate::spec::chart::{ChartSpec, ChartVisitor, MutChartVisitor};
6use crate::spec::data::DataSpec;
7use crate::spec::mark::{MarkEncodingField, MarkSpec};
8use crate::spec::scale::{
9 ScaleDataReferenceOrSignalSpec, ScaleDomainSpec, ScaleSpec, ScaleTypeSpec,
10};
11use crate::spec::transform::formula::FormulaTransformSpec;
12use crate::spec::transform::TransformSpec;
13use crate::task_graph::graph::ScopedVariable;
14use crate::task_graph::scope::TaskScope;
15use itertools::sorted;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use vegafusion_common::escape::unescape_field;
19
20pub fn stringify_local_datetimes(
28 server_spec: &mut ChartSpec,
29 client_spec: &mut ChartSpec,
30 comm_plan: &CommPlan,
31 domain_dataset_fields: &HashMap<ScopedVariable, (ScopedVariable, String)>,
32) -> Result<()> {
33 let client_scope = client_spec.to_task_scope()?;
35
36 let mut visitor = CollectScalesTypesVisitor::new();
38 client_spec.walk(&mut visitor)?;
39 let local_time_scales = visitor.local_time_scales;
40
41 let server_to_client_datasets: HashSet<_> = comm_plan
43 .server_to_client
44 .iter()
45 .filter(|&var| var.0.namespace == VariableNamespace::Data as i32)
46 .cloned()
47 .collect();
48
49 let mut visitor = CollectCandidateDatasetMapping::new(
50 client_scope.clone(),
51 server_to_client_datasets.clone(),
52 );
53 client_spec.walk(&mut visitor)?;
54 let candidate_dataset_mapping = visitor.candidate_dataset_mapping;
55
56 let mut visitor = CollectLocalTimeScaledFieldsVisitor::new(
58 client_scope,
59 local_time_scales,
60 candidate_dataset_mapping,
61 );
62 client_spec.walk(&mut visitor)?;
63
64 let mut visitor = CollectLocalTimeDataFieldsVisitor::try_new(
66 visitor.local_datetime_fields,
67 &server_to_client_datasets,
68 server_spec,
69 )?;
70 server_spec.walk(&mut visitor)?;
71 let local_datetime_fields = visitor.local_datetime_fields;
72
73 let server_scope = server_spec.to_task_scope()?;
75 let mut visitor = StringifyLocalDatetimeFieldsVisitor::new(
76 &local_datetime_fields,
77 &server_scope,
78 domain_dataset_fields,
79 );
80 server_spec.walk_mut(&mut visitor)?;
81
82 let mut visitor =
84 FormatLocalDatetimeFieldsVisitor::new(&local_datetime_fields, domain_dataset_fields);
85 client_spec.walk_mut(&mut visitor)?;
86
87 Ok(())
88}
89
90struct CollectScalesTypesVisitor {
92 pub local_time_scales: HashSet<ScopedVariable>,
93}
94
95impl CollectScalesTypesVisitor {
96 pub fn new() -> Self {
97 Self {
98 local_time_scales: Default::default(),
99 }
100 }
101}
102
103impl ChartVisitor for CollectScalesTypesVisitor {
104 fn visit_scale(&mut self, scale: &ScaleSpec, scope: &[u32]) -> Result<()> {
105 let var = (Variable::new_scale(&scale.name), Vec::from(scope));
106 if matches!(scale.type_, Some(ScaleTypeSpec::Time)) {
107 self.local_time_scales.insert(var);
108 }
109
110 Ok(())
111 }
112
113 fn visit_axis(&mut self, axis: &AxisSpec, scope: &[u32]) -> Result<()> {
114 if matches!(axis.format_type, Some(AxisFormatTypeSpec::Time)) {
115 let var = (Variable::new_scale(&axis.scale), Vec::from(scope));
116 self.local_time_scales.insert(var);
117 }
118 Ok(())
119 }
120}
121
122struct CollectCandidateDatasetMapping {
125 pub scope: TaskScope,
126 pub server_to_client_datasets: HashSet<ScopedVariable>,
127 pub candidate_dataset_mapping: HashMap<ScopedVariable, ScopedVariable>,
128}
129
130impl CollectCandidateDatasetMapping {
131 pub fn new(scope: TaskScope, server_to_client_datasets: HashSet<ScopedVariable>) -> Self {
132 let candidate_dataset_mapping: HashMap<_, _> = server_to_client_datasets
134 .iter()
135 .map(|var| (var.clone(), var.clone()))
136 .collect();
137 Self {
138 scope,
139 server_to_client_datasets,
140 candidate_dataset_mapping,
141 }
142 }
143}
144
145impl ChartVisitor for CollectCandidateDatasetMapping {
146 fn visit_group_mark(&mut self, mark: &MarkSpec, scope: &[u32]) -> Result<()> {
147 if let Some(from) = &mark.from {
149 if let Some(facet) = &from.facet {
150 let data_var = Variable::new_data(&facet.data);
151 let resolved_data = self.scope.resolve_scope(&data_var, scope)?;
152 let resolved_data_scoped: ScopedVariable =
153 (resolved_data.var.clone(), resolved_data.scope);
154
155 if self
156 .server_to_client_datasets
157 .contains(&resolved_data_scoped)
158 {
159 let facet_var = Variable::new_data(&facet.name);
160 let facet_scoped_var: ScopedVariable = (facet_var, Vec::from(scope));
161 self.candidate_dataset_mapping
162 .insert(facet_scoped_var, resolved_data_scoped);
163 }
164 }
165 }
166
167 Ok(())
168 }
169}
170
171struct CollectLocalTimeScaledFieldsVisitor {
173 pub scope: TaskScope,
174 pub candidate_dataset_mapping: HashMap<ScopedVariable, ScopedVariable>,
175 pub local_time_scales: HashSet<ScopedVariable>,
176 pub local_datetime_fields: HashMap<ScopedVariable, HashSet<String>>,
177}
178
179impl CollectLocalTimeScaledFieldsVisitor {
180 pub fn new(
181 scope: TaskScope,
182 local_time_scales: HashSet<ScopedVariable>,
183 candidate_dataset_mapping: HashMap<ScopedVariable, ScopedVariable>,
184 ) -> Self {
185 Self {
186 scope,
187 candidate_dataset_mapping,
188 local_time_scales,
189 local_datetime_fields: Default::default(),
190 }
191 }
192}
193
194impl ChartVisitor for CollectLocalTimeScaledFieldsVisitor {
195 fn visit_non_group_mark(&mut self, mark: &MarkSpec, scope: &[u32]) -> Result<()> {
196 if let Some(mark_from) = &mark.from {
197 if let Some(mark_data) = &mark_from.data {
198 let mark_data_var = Variable::new_data(mark_data);
199 let resolved_mark_data = self.scope.resolve_scope(&mark_data_var, scope)?;
200 let resolved_mark_data_scoped =
201 (resolved_mark_data.var.clone(), resolved_mark_data.scope);
202 if let Some(server_to_client_data_var) = self
203 .candidate_dataset_mapping
204 .get(&resolved_mark_data_scoped)
205 {
206 if let Some(encode) = &mark.encode {
209 for (_, encodings) in encode.encodings.iter() {
210 for (_, channels) in encodings.channels.iter() {
211 for channel in channels.to_vec() {
212 if let (Some(scale), Some(MarkEncodingField::Field(field))) =
213 (&channel.scale, &channel.field)
214 {
215 let scale_var = Variable::new_scale(scale);
216 let resolved_scale =
217 self.scope.resolve_scope(&scale_var, scope)?;
218 let resolved_scoped_scale = (
219 resolved_scale.var.clone(),
220 resolved_scale.scope.clone(),
221 );
222
223 if self.local_time_scales.contains(&resolved_scoped_scale) {
224 let entry = self
226 .local_datetime_fields
227 .entry(server_to_client_data_var.clone());
228 let fields = entry.or_default();
229 fields.insert(field.clone());
230 }
231 }
232 }
233 }
234 }
235 }
236 }
237 }
238 }
239 Ok(())
240 }
241
242 fn visit_scale(&mut self, scale: &ScaleSpec, scope: &[u32]) -> Result<()> {
243 let scale_var: ScopedVariable = (Variable::new_scale(&scale.name), Vec::from(scope));
244 if self.local_time_scales.contains(&scale_var) {
245 if let Some(domain) = &scale.domain {
246 let field_refs = match domain {
247 ScaleDomainSpec::FieldReference(field_ref) => {
248 vec![field_ref.clone()]
249 }
250 ScaleDomainSpec::FieldsReference(fields_ref) => {
251 fields_ref.to_field_references()
252 }
253 ScaleDomainSpec::FieldsReferences(fields_ref) => fields_ref
254 .fields
255 .iter()
256 .filter_map(|f| {
257 if let ScaleDataReferenceOrSignalSpec::Reference(f) = f {
258 Some(f.clone())
259 } else {
260 None
261 }
262 })
263 .collect(),
264 _ => Default::default(),
265 };
266 for field_ref in &field_refs {
267 let data_var = Variable::new_data(&field_ref.data);
268 if let Ok(resolved) = self.scope.resolve_scope(&data_var, scope) {
269 let scoped_data_var = (resolved.var, resolved.scope);
270 if self
271 .candidate_dataset_mapping
272 .contains_key(&scoped_data_var)
273 {
274 let entry = self.local_datetime_fields.entry(scoped_data_var.clone());
276 let fields = entry.or_default();
277 fields.insert(field_ref.field.clone());
278 }
279 }
280 }
281 }
282 }
283 Ok(())
284 }
285}
286
287fn get_local_datetime_fields(
288 data_var: &ScopedVariable,
289 local_datetime_fields: &HashMap<ScopedVariable, HashSet<String>>,
290 domain_dataset_fields: &HashMap<ScopedVariable, (ScopedVariable, String)>,
291) -> HashSet<String> {
292 if let Some(fields) = local_datetime_fields.get(data_var) {
294 fields.clone()
295 } else if let Some((mapped_var, field)) = domain_dataset_fields.get(data_var) {
296 if let Some(fields) = local_datetime_fields.get(mapped_var) {
297 if fields.contains(field) {
298 vec![field.clone()].into_iter().collect()
299 } else {
300 Default::default()
301 }
302 } else {
303 Default::default()
304 }
305 } else {
306 Default::default()
307 }
308}
309
310struct CollectLocalTimeDataFieldsVisitor<'a> {
312 pub local_datetime_fields: HashMap<ScopedVariable, HashSet<String>>,
313 pub server_to_client_datasets: &'a HashSet<ScopedVariable>,
314 pub chart_spec: &'a ChartSpec,
315 pub task_scope: TaskScope,
316}
317
318impl<'a> CollectLocalTimeDataFieldsVisitor<'a> {
319 pub fn try_new(
320 local_datetime_fields: HashMap<ScopedVariable, HashSet<String>>,
321 server_to_client_datasets: &'a HashSet<ScopedVariable>,
322 chart_spec: &'a ChartSpec,
323 ) -> Result<Self> {
324 Ok(Self {
325 local_datetime_fields,
326 server_to_client_datasets,
327 chart_spec,
328 task_scope: chart_spec.to_task_scope()?,
329 })
330 }
331}
332
333impl ChartVisitor for CollectLocalTimeDataFieldsVisitor<'_> {
334 fn visit_data(&mut self, data: &DataSpec, scope: &[u32]) -> Result<()> {
335 let local_columns =
340 data.local_datetime_columns_produced(self.chart_spec, &self.task_scope, scope)?;
341 let dataset_var: ScopedVariable = (Variable::new_data(&data.name), Vec::from(scope));
342 if self.server_to_client_datasets.contains(&dataset_var) {
343 match self.local_datetime_fields.entry(dataset_var) {
344 Entry::Occupied(mut v) => {
345 v.get_mut().extend(local_columns);
346 }
347 Entry::Vacant(v) => {
348 v.insert(local_columns.into_iter().collect());
349 }
350 }
351 }
352 Ok(())
353 }
354}
355
356struct StringifyLocalDatetimeFieldsVisitor<'a> {
358 pub local_datetime_fields: &'a HashMap<ScopedVariable, HashSet<String>>,
359 pub scope: &'a TaskScope,
360 pub domain_dataset_fields: &'a HashMap<ScopedVariable, (ScopedVariable, String)>,
361}
362
363impl<'a> StringifyLocalDatetimeFieldsVisitor<'a> {
364 pub fn new(
365 local_datetime_fields: &'a HashMap<ScopedVariable, HashSet<String>>,
366 scope: &'a TaskScope,
367 domain_dataset_fields: &'a HashMap<ScopedVariable, (ScopedVariable, String)>,
368 ) -> Self {
369 Self {
370 local_datetime_fields,
371 scope,
372 domain_dataset_fields,
373 }
374 }
375}
376
377impl MutChartVisitor for StringifyLocalDatetimeFieldsVisitor<'_> {
378 fn visit_data(&mut self, data: &mut DataSpec, scope: &[u32]) -> Result<()> {
379 let data_var = (Variable::new_data(&data.name), Vec::from(scope));
380
381 let fields = get_local_datetime_fields(
383 &data_var,
384 self.local_datetime_fields,
385 self.domain_dataset_fields,
386 );
387
388 for field in sorted(fields) {
389 let field = unescape_field(&field);
390 let expr_str =
391 format!("timeFormat(toDate(datum['{field}'], 'local'), '%Y-%m-%dT%H:%M:%S.%L')");
392
393 let transforms = &mut data.transform;
394 let transform = FormulaTransformSpec {
395 expr: expr_str,
396 as_: field,
397 extra: Default::default(),
398 };
399 transforms.push(TransformSpec::Formula(transform))
400 }
401
402 if let Some(source) = &data.source {
405 let source_var = Variable::new_data(source);
406 let source_resolved = self.scope.resolve_scope(&source_var, scope)?;
407 let source_resolved_var = (source_resolved.var, source_resolved.scope);
408 if let Some(fields) = self.local_datetime_fields.get(&source_resolved_var) {
409 for field in sorted(fields) {
410 let field = unescape_field(field);
411 let expr_str = format!("toDate(datum['{field}'], 'local')");
412 let transforms = &mut data.transform;
413 let transform = FormulaTransformSpec {
414 expr: expr_str,
415 as_: field.to_string(),
416 extra: Default::default(),
417 };
418 transforms.insert(0, TransformSpec::Formula(transform))
419 }
420 }
421 }
422
423 Ok(())
424 }
425}
426
427struct FormatLocalDatetimeFieldsVisitor<'a> {
429 pub local_datetime_fields: &'a HashMap<ScopedVariable, HashSet<String>>,
430 pub domain_dataset_fields: &'a HashMap<ScopedVariable, (ScopedVariable, String)>,
431}
432
433impl<'a> FormatLocalDatetimeFieldsVisitor<'a> {
434 pub fn new(
435 local_datetime_fields: &'a HashMap<ScopedVariable, HashSet<String>>,
436 domain_dataset_fields: &'a HashMap<ScopedVariable, (ScopedVariable, String)>,
437 ) -> Self {
438 Self {
439 local_datetime_fields,
440 domain_dataset_fields,
441 }
442 }
443}
444
445impl MutChartVisitor for FormatLocalDatetimeFieldsVisitor<'_> {
446 fn visit_data(&mut self, data: &mut DataSpec, scope: &[u32]) -> Result<()> {
447 let data_var = (Variable::new_data(&data.name), Vec::from(scope));
448 let fields = get_local_datetime_fields(
449 &data_var,
450 self.local_datetime_fields,
451 self.domain_dataset_fields,
452 );
453 for field in sorted(fields) {
454 let field = unescape_field(&field);
455 let transforms = &mut data.transform;
456 let transform = FormulaTransformSpec {
457 expr: format!("toDate(datum['{field}'])"),
458 as_: field.to_string(),
459 extra: Default::default(),
460 };
461 transforms.insert(0, TransformSpec::Formula(transform))
462 }
463
464 Ok(())
465 }
466}