vegafusion_core/spec/
data.rs1use crate::error::Result;
2use crate::planning::plan::PlannerConfig;
3use crate::proto::gen::tasks::Variable;
4use crate::spec::chart::ChartSpec;
5use crate::spec::transform::TransformSpec;
6use crate::spec::values::StringOrSignalSpec;
7use crate::task_graph::scope::TaskScope;
8use itertools::sorted;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::{HashMap, HashSet};
12use vegafusion_common::data::table::VegaFusionTable;
13use vegafusion_common::error::VegaFusionError;
14
15#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
16pub struct DataSpec {
17 pub name: String,
18
19 #[serde(skip_serializing_if = "Option::is_none")]
20 pub source: Option<String>,
21
22 #[serde(skip_serializing_if = "Option::is_none")]
23 pub url: Option<StringOrSignalSpec>,
24
25 #[serde(skip_serializing_if = "Option::is_none")]
26 pub format: Option<DataFormatSpec>,
27
28 #[serde(skip_serializing_if = "Option::is_none")]
29 pub values: Option<Value>,
30
31 #[serde(default, skip_serializing_if = "Vec::is_empty")]
32 pub transform: Vec<TransformSpec>,
33
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub on: Option<Value>,
36
37 #[serde(flatten)]
38 pub extra: HashMap<String, Value>,
39}
40
41impl DataSpec {
42 pub fn output_signals(&self) -> Vec<String> {
43 let mut signals: HashSet<String> = Default::default();
44
45 for tx in &self.transform {
46 signals.extend(tx.output_signals())
47 }
48
49 sorted(signals).collect()
50 }
51
52 pub fn supported(
53 &self,
54 planner_config: &PlannerConfig,
55 task_scope: &TaskScope,
56 scope: &[u32],
57 ) -> DependencyNodeSupported {
58 if let Some(Some(format_type)) = self.format.as_ref().map(|fmt| fmt.type_.clone()) {
59 if !matches!(format_type.as_str(), "csv" | "tsv" | "arrow" | "json") {
60 return DependencyNodeSupported::Unsupported;
62 }
63 }
64
65 if let Some(values) = &self.values {
67 if !planner_config.extract_inline_data {
68 return DependencyNodeSupported::Unsupported;
69 }
70 if !matches!(values, Value::Array(_)) {
71 return DependencyNodeSupported::Unsupported;
72 }
73 if VegaFusionTable::from_json(values).is_err() {
74 return DependencyNodeSupported::Unsupported;
76 }
77 }
78
79 let all_supported = self
80 .transform
81 .iter()
82 .all(|tx| tx.supported_and_allowed(planner_config, task_scope, scope));
83 if all_supported {
84 DependencyNodeSupported::Supported
85 } else if self.url.is_some() {
86 DependencyNodeSupported::PartiallySupported
87 } else {
88 match self.transform.first() {
89 Some(tx) if tx.supported_and_allowed(planner_config, task_scope, scope) => {
90 DependencyNodeSupported::PartiallySupported
91 }
92 _ => DependencyNodeSupported::Unsupported,
93 }
94 }
95 }
96
97 pub fn is_selection_store(&self) -> bool {
98 self.name.ends_with("_store")
99 && self.url.is_none()
100 && self.source.is_none()
101 && self.transform.is_empty()
102 }
103
104 pub fn local_datetime_columns_produced(
105 &self,
106 chart_spec: &ChartSpec,
107 task_scope: &TaskScope,
108 usage_scope: &[u32],
109 ) -> Result<Vec<String>> {
110 let mut output_local_datetime_columns = if let Some(source) = &self.source {
112 let source_var = Variable::new_data(source);
115 let resolved = task_scope.resolve_scope(&source_var, usage_scope)?;
116 let source_data = chart_spec.get_nested_data(resolved.scope.as_slice(), source)?;
117 source_data.local_datetime_columns_produced(
118 chart_spec,
119 task_scope,
120 resolved.scope.as_slice(),
121 )?
122 } else {
123 Default::default()
125 };
126
127 if let Some(DataFormatParseSpec::Object(parse)) =
129 self.format.as_ref().and_then(|format| format.parse.clone())
130 {
131 for (field, format) in parse {
132 if format == "date" {
133 output_local_datetime_columns.push(field.clone())
134 }
135 }
136 }
137
138 for tx in &self.transform {
140 output_local_datetime_columns =
141 tx.local_datetime_columns_produced(output_local_datetime_columns.as_slice())
142 }
143
144 Ok(output_local_datetime_columns)
145 }
146
147 pub fn fuse_into(&self, child: &mut DataSpec) -> Result<()> {
150 if Some(&self.name) != child.source.as_ref() {
151 return Err(VegaFusionError::internal(format!(
152 "Incompatible fuse dataset names {:?} and {:?}",
153 self.name, child.source
154 )));
155 }
156 if self.on.is_some() {
157 return Err(VegaFusionError::internal(
158 "Cannot fuse dataset with \"on\" trigger",
159 ));
160 }
161
162 child.source = self.source.clone();
164 child.url = self.url.clone();
165 child.format = self.format.clone();
166 child.values = self.values.clone();
167
168 let mut new_transforms = self.transform.clone();
170 new_transforms.extend(child.transform.clone());
171 child.transform = new_transforms;
172 Ok(())
173 }
174
175 pub fn has_aggregate(&self) -> bool {
176 self.transform
177 .iter()
178 .any(|tx| matches!(tx, &TransformSpec::Aggregate(_)))
179 }
180}
181
182#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
183pub enum DependencyNodeSupported {
184 Supported,
185 PartiallySupported,
186 Unsupported,
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
190pub struct DataFormatSpec {
191 #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
192 pub type_: Option<String>,
193
194 #[serde(skip_serializing_if = "Option::is_none")]
195 pub parse: Option<DataFormatParseSpec>,
196
197 #[serde(flatten)]
198 pub extra: HashMap<String, Value>,
199}
200
201#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
202#[serde(untagged)]
203pub enum DataFormatParseSpec {
204 Auto(String),
205 Object(HashMap<String, String>),
206}