vegafusion_core/spec/transform/window.rs

1use crate::expression::column_usage::{ColumnUsage, DatasetsColumnUsage, VlSelectionFields};
2use crate::spec::transform::aggregate::AggregateOpSpec;
3use crate::spec::transform::{TransformColumns, TransformSpecTrait};
4use crate::spec::values::{CompareSpec, Field};
5use crate::task_graph::graph::ScopedVariable;
6use crate::task_graph::scope::TaskScope;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use vegafusion_common::escape::unescape_field;
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub struct WindowTransformSpec {
14    #[serde(skip_serializing_if = "Option::is_none")]
15    pub sort: Option<CompareSpec>,
16
17    #[serde(skip_serializing_if = "Option::is_none")]
18    pub groupby: Option<Vec<Field>>,
19
20    pub ops: Vec<WindowTransformOpSpec>,
21
22    pub fields: Vec<Option<Field>>,
23
24    #[serde(skip_serializing_if = "Option::is_none")]
25    pub params: Option<Vec<Value>>,
26
27    #[serde(rename = "as", skip_serializing_if = "Option::is_none")]
28    pub as_: Option<Vec<Option<String>>>,
29
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub frame: Option<[Value; 2]>,
32
33    #[serde(rename = "ignorePeers", skip_serializing_if = "Option::is_none")]
34    pub ignore_peers: Option<bool>,
35
36    #[serde(flatten)]
37    pub extra: HashMap<String, Value>,
38}
39
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
41#[serde(rename_all = "lowercase")]
42pub enum WindowOpSpec {
43    #[serde(rename = "row_number")]
44    RowNumber,
45    Rank,
46
47    #[serde(rename = "dense_rank")]
48    DenseRank,
49
50    #[serde(rename = "percent_rank")]
51    PercentileRank,
52
53    #[serde(rename = "cume_dist")]
54    CumeDist,
55    NTile,
56    Lag,
57    Lead,
58
59    #[serde(rename = "first_value")]
60    FirstValue,
61
62    #[serde(rename = "last_value")]
63    LastValue,
64
65    #[serde(rename = "nth_value")]
66    NthValue,
67
68    #[serde(rename = "prev_value")]
69    PrevValue,
70
71    #[serde(rename = "next_value")]
72    NextValue,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
76#[serde(untagged)]
77pub enum WindowTransformOpSpec {
78    Aggregate(AggregateOpSpec),
79    Window(WindowOpSpec),
80}
81
82impl TransformSpecTrait for WindowTransformSpec {
83    fn supported(&self) -> bool {
84        // Check for supported aggregation op
85        use AggregateOpSpec::*;
86        use WindowOpSpec::*;
87        for op in &self.ops {
88            match op {
89                WindowTransformOpSpec::Aggregate(op) => {
90                    if !matches!(
91                        op,
92                        Count
93                            | Sum
94                            | Mean
95                            | Average
96                            | Min
97                            | Max
98                            | Values
99                            | Variance
100                            | Variancep
101                            | Stdev
102                            | Stdevp
103                            | Q1
104                            | Q3
105                    ) {
106                        // Unsupported aggregation op
107                        return false;
108                    }
109                }
110                WindowTransformOpSpec::Window(op) => {
111                    if !matches!(
112                        op,
113                        RowNumber
114                            | Rank
115                            | DenseRank
116                            | PercentileRank
117                            | CumeDist
118                            | FirstValue
119                            | LastValue
120                    ) {
121                        // Unsupported window op
122                        return false;
123                    }
124                }
125            }
126        }
127
128        true
129    }
130
131    fn transform_columns(
132        &self,
133        datum_var: &Option<ScopedVariable>,
134        _usage_scope: &[u32],
135        _task_scope: &TaskScope,
136        _vl_selection_fields: &VlSelectionFields,
137    ) -> TransformColumns {
138        if let Some(datum_var) = datum_var {
139            // Compute produced columns
140            // Only handle the case where "as" contains a list of strings with length matching ops
141            let ops = self.ops.clone();
142            let as_: Vec<_> = self
143                .as_
144                .clone()
145                .unwrap_or_default()
146                .iter()
147                .cloned()
148                .collect::<Option<Vec<_>>>()
149                .unwrap_or_default();
150            let produced = if ops.len() == as_.len() {
151                ColumnUsage::from(as_.as_slice())
152            } else {
153                ColumnUsage::Unknown
154            };
155
156            // Compute used columns (both groupby, fields, and sort)
157            let mut usage_cols: Vec<_> = self
158                .groupby
159                .clone()
160                .unwrap_or_default()
161                .iter()
162                .map(|field| unescape_field(&field.field()))
163                .collect();
164            for field in self.fields.iter().flatten() {
165                // Ignore empty fields, which vega-lite sometimes produces instead of null
166                if !field.field().trim().is_empty() {
167                    usage_cols.push(unescape_field(&field.field()))
168                }
169            }
170            if let Some(sort) = &self.sort {
171                let unescaped_sort_fields: Vec<_> = sort
172                    .field
173                    .to_vec()
174                    .iter()
175                    .map(|f| unescape_field(f))
176                    .collect();
177                usage_cols.extend(unescaped_sort_fields)
178            }
179
180            let col_usage = ColumnUsage::from(usage_cols.as_slice());
181            let usage = DatasetsColumnUsage::empty().with_column_usage(datum_var, col_usage);
182            TransformColumns::PassThrough { usage, produced }
183        } else {
184            TransformColumns::Unknown
185        }
186    }
187
188    fn local_datetime_columns_produced(
189        &self,
190        input_local_datetime_columns: &[String],
191    ) -> Vec<String> {
192        // Keep input local datetime columns as window passes through all input columns
193        // and doesn't create any local datetime columns
194        Vec::from(input_local_datetime_columns)
195    }
196}