vegafusion_core/spec/data.rs

1use crate::error::Result;
2use crate::planning::plan::PlannerConfig;
3use crate::proto::gen::tasks::Variable;
4use crate::spec::chart::ChartSpec;
5use crate::spec::transform::TransformSpec;
6use crate::spec::values::StringOrSignalSpec;
7use crate::task_graph::scope::TaskScope;
8use itertools::sorted;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::{HashMap, HashSet};
12use vegafusion_common::data::table::VegaFusionTable;
13use vegafusion_common::error::VegaFusionError;
14
/// Specification of a single Vega dataset entry, mirroring the JSON shape of
/// a member of a chart's `data` array. Unknown properties are preserved in
/// `extra` so specs round-trip losslessly through serialization.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct DataSpec {
    // Name of the dataset; used to resolve `source` references between datasets
    pub name: String,

    // Name of a parent dataset this one derives from, if any
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,

    // URL to load data from; may be a literal string or a signal expression
    #[serde(skip_serializing_if = "Option::is_none")]
    pub url: Option<StringOrSignalSpec>,

    // How to read/parse the loaded or inline data
    #[serde(skip_serializing_if = "Option::is_none")]
    pub format: Option<DataFormatSpec>,

    // Inline data values; expected to be a JSON array when supported
    #[serde(skip_serializing_if = "Option::is_none")]
    pub values: Option<Value>,

    // Transforms applied to the dataset, in order
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub transform: Vec<TransformSpec>,

    // Vega "on" trigger definitions — NOTE(review): kept opaque as raw JSON;
    // presence of this field blocks fusing (see `fuse_into`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub on: Option<Value>,

    // Any additional properties not modeled above, captured for round-tripping
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
40
41impl DataSpec {
42    pub fn output_signals(&self) -> Vec<String> {
43        let mut signals: HashSet<String> = Default::default();
44
45        for tx in &self.transform {
46            signals.extend(tx.output_signals())
47        }
48
49        sorted(signals).collect()
50    }
51
52    pub fn supported(
53        &self,
54        planner_config: &PlannerConfig,
55        task_scope: &TaskScope,
56        scope: &[u32],
57    ) -> DependencyNodeSupported {
58        if let Some(Some(format_type)) = self.format.as_ref().map(|fmt| fmt.type_.clone()) {
59            if !matches!(format_type.as_str(), "csv" | "tsv" | "arrow" | "json") {
60                // We don't know how to read the data, so full node is unsupported
61                return DependencyNodeSupported::Unsupported;
62            }
63        }
64
65        // Check if inline values array is supported
66        if let Some(values) = &self.values {
67            if !planner_config.extract_inline_data {
68                return DependencyNodeSupported::Unsupported;
69            }
70            if !matches!(values, Value::Array(_)) {
71                return DependencyNodeSupported::Unsupported;
72            }
73            if VegaFusionTable::from_json(values).is_err() {
74                // Failed to read inline JSON as arrow, so unsupported
75                return DependencyNodeSupported::Unsupported;
76            }
77        }
78
79        let all_supported = self
80            .transform
81            .iter()
82            .all(|tx| tx.supported_and_allowed(planner_config, task_scope, scope));
83        if all_supported {
84            DependencyNodeSupported::Supported
85        } else if self.url.is_some() {
86            DependencyNodeSupported::PartiallySupported
87        } else {
88            match self.transform.first() {
89                Some(tx) if tx.supported_and_allowed(planner_config, task_scope, scope) => {
90                    DependencyNodeSupported::PartiallySupported
91                }
92                _ => DependencyNodeSupported::Unsupported,
93            }
94        }
95    }
96
97    pub fn is_selection_store(&self) -> bool {
98        self.name.ends_with("_store")
99            && self.url.is_none()
100            && self.source.is_none()
101            && self.transform.is_empty()
102    }
103
104    pub fn local_datetime_columns_produced(
105        &self,
106        chart_spec: &ChartSpec,
107        task_scope: &TaskScope,
108        usage_scope: &[u32],
109    ) -> Result<Vec<String>> {
110        // Initialize output_local_datetime_columns
111        let mut output_local_datetime_columns = if let Some(source) = &self.source {
112            // We have a parent dataset, so init output_local_datetime_columns to be those
113            // local datetime columns produced by the parent
114            let source_var = Variable::new_data(source);
115            let resolved = task_scope.resolve_scope(&source_var, usage_scope)?;
116            let source_data = chart_spec.get_nested_data(resolved.scope.as_slice(), source)?;
117            source_data.local_datetime_columns_produced(
118                chart_spec,
119                task_scope,
120                resolved.scope.as_slice(),
121            )?
122        } else {
123            // No parent dataset, so input local datetime columns is empty
124            Default::default()
125        };
126
127        // Add any fields that are parsed as local datetimes
128        if let Some(DataFormatParseSpec::Object(parse)) =
129            self.format.as_ref().and_then(|format| format.parse.clone())
130        {
131            for (field, format) in parse {
132                if format == "date" {
133                    output_local_datetime_columns.push(field.clone())
134                }
135            }
136        }
137
138        // Propagate output_local_datetime_columns through transforms
139        for tx in &self.transform {
140            output_local_datetime_columns =
141                tx.local_datetime_columns_produced(output_local_datetime_columns.as_slice())
142        }
143
144        Ok(output_local_datetime_columns)
145    }
146
147    /// Fuse this dataset into a child dataset. This mutates the child to include this dataset's
148    /// source data and transforms. The name of the child is preserved.
149    pub fn fuse_into(&self, child: &mut DataSpec) -> Result<()> {
150        if Some(&self.name) != child.source.as_ref() {
151            return Err(VegaFusionError::internal(format!(
152                "Incompatible fuse dataset names {:?} and {:?}",
153                self.name, child.source
154            )));
155        }
156        if self.on.is_some() {
157            return Err(VegaFusionError::internal(
158                "Cannot fuse dataset with \"on\" trigger",
159            ));
160        }
161
162        // Copy over source dataset info
163        child.source = self.source.clone();
164        child.url = self.url.clone();
165        child.format = self.format.clone();
166        child.values = self.values.clone();
167
168        // Prepend this dataset's transforms
169        let mut new_transforms = self.transform.clone();
170        new_transforms.extend(child.transform.clone());
171        child.transform = new_transforms;
172        Ok(())
173    }
174
175    pub fn has_aggregate(&self) -> bool {
176        self.transform
177            .iter()
178            .any(|tx| matches!(tx, &TransformSpec::Aggregate(_)))
179    }
180}
181
/// Degree to which a dependency-graph node can be evaluated by VegaFusion
/// (see `DataSpec::supported`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
pub enum DependencyNodeSupported {
    // The node (all of its transforms) is fully supported
    Supported,
    // Only part of the node is supported (e.g. the data load and/or a
    // leading prefix of its transforms)
    PartiallySupported,
    // The node cannot be evaluated by VegaFusion at all
    Unsupported,
}
188
/// Vega data "format" specification controlling how raw data is read.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DataFormatSpec {
    // Format type, serialized as "type"; VegaFusion knows how to read
    // "csv", "tsv", "arrow", and "json" (see `DataSpec::supported`)
    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
    pub type_: Option<String>,

    // Field parse instructions: either a single mode string or a per-field map
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parse: Option<DataFormatParseSpec>,

    // Additional format properties not modeled above, kept for round-tripping
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
200
/// The "parse" member of a data format spec.
///
/// NOTE: this enum is `untagged`, so variant order matters for
/// deserialization — a bare string is tried as `Auto` before `Object`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DataFormatParseSpec {
    // A single parse-mode string — presumably "auto"; TODO confirm allowed values
    Auto(String),
    // Map from field name to its parse type (e.g. "date", which marks the
    // field as a local datetime column)
    Object(HashMap<String, String>),
}