vegafusion_core/spec/transform/
window.rs1use crate::expression::column_usage::{ColumnUsage, DatasetsColumnUsage, VlSelectionFields};
2use crate::spec::transform::aggregate::AggregateOpSpec;
3use crate::spec::transform::{TransformColumns, TransformSpecTrait};
4use crate::spec::values::{CompareSpec, Field};
5use crate::task_graph::graph::ScopedVariable;
6use crate::task_graph::scope::TaskScope;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use vegafusion_common::escape::unescape_field;
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub struct WindowTransformSpec {
14 #[serde(skip_serializing_if = "Option::is_none")]
15 pub sort: Option<CompareSpec>,
16
17 #[serde(skip_serializing_if = "Option::is_none")]
18 pub groupby: Option<Vec<Field>>,
19
20 pub ops: Vec<WindowTransformOpSpec>,
21
22 pub fields: Vec<Option<Field>>,
23
24 #[serde(skip_serializing_if = "Option::is_none")]
25 pub params: Option<Vec<Value>>,
26
27 #[serde(rename = "as", skip_serializing_if = "Option::is_none")]
28 pub as_: Option<Vec<Option<String>>>,
29
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub frame: Option<[Value; 2]>,
32
33 #[serde(rename = "ignorePeers", skip_serializing_if = "Option::is_none")]
34 pub ignore_peers: Option<bool>,
35
36 #[serde(flatten)]
37 pub extra: HashMap<String, Value>,
38}
39
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
41#[serde(rename_all = "lowercase")]
42pub enum WindowOpSpec {
43 #[serde(rename = "row_number")]
44 RowNumber,
45 Rank,
46
47 #[serde(rename = "dense_rank")]
48 DenseRank,
49
50 #[serde(rename = "percent_rank")]
51 PercentileRank,
52
53 #[serde(rename = "cume_dist")]
54 CumeDist,
55 NTile,
56 Lag,
57 Lead,
58
59 #[serde(rename = "first_value")]
60 FirstValue,
61
62 #[serde(rename = "last_value")]
63 LastValue,
64
65 #[serde(rename = "nth_value")]
66 NthValue,
67
68 #[serde(rename = "prev_value")]
69 PrevValue,
70
71 #[serde(rename = "next_value")]
72 NextValue,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
76#[serde(untagged)]
77pub enum WindowTransformOpSpec {
78 Aggregate(AggregateOpSpec),
79 Window(WindowOpSpec),
80}
81
82impl TransformSpecTrait for WindowTransformSpec {
83 fn supported(&self) -> bool {
84 use AggregateOpSpec::*;
86 use WindowOpSpec::*;
87 for op in &self.ops {
88 match op {
89 WindowTransformOpSpec::Aggregate(op) => {
90 if !matches!(
91 op,
92 Count
93 | Sum
94 | Mean
95 | Average
96 | Min
97 | Max
98 | Values
99 | Variance
100 | Variancep
101 | Stdev
102 | Stdevp
103 | Q1
104 | Q3
105 ) {
106 return false;
108 }
109 }
110 WindowTransformOpSpec::Window(op) => {
111 if !matches!(
112 op,
113 RowNumber
114 | Rank
115 | DenseRank
116 | PercentileRank
117 | CumeDist
118 | FirstValue
119 | LastValue
120 ) {
121 return false;
123 }
124 }
125 }
126 }
127
128 true
129 }
130
131 fn transform_columns(
132 &self,
133 datum_var: &Option<ScopedVariable>,
134 _usage_scope: &[u32],
135 _task_scope: &TaskScope,
136 _vl_selection_fields: &VlSelectionFields,
137 ) -> TransformColumns {
138 if let Some(datum_var) = datum_var {
139 let ops = self.ops.clone();
142 let as_: Vec<_> = self
143 .as_
144 .clone()
145 .unwrap_or_default()
146 .iter()
147 .cloned()
148 .collect::<Option<Vec<_>>>()
149 .unwrap_or_default();
150 let produced = if ops.len() == as_.len() {
151 ColumnUsage::from(as_.as_slice())
152 } else {
153 ColumnUsage::Unknown
154 };
155
156 let mut usage_cols: Vec<_> = self
158 .groupby
159 .clone()
160 .unwrap_or_default()
161 .iter()
162 .map(|field| unescape_field(&field.field()))
163 .collect();
164 for field in self.fields.iter().flatten() {
165 if !field.field().trim().is_empty() {
167 usage_cols.push(unescape_field(&field.field()))
168 }
169 }
170 if let Some(sort) = &self.sort {
171 let unescaped_sort_fields: Vec<_> = sort
172 .field
173 .to_vec()
174 .iter()
175 .map(|f| unescape_field(f))
176 .collect();
177 usage_cols.extend(unescaped_sort_fields)
178 }
179
180 let col_usage = ColumnUsage::from(usage_cols.as_slice());
181 let usage = DatasetsColumnUsage::empty().with_column_usage(datum_var, col_usage);
182 TransformColumns::PassThrough { usage, produced }
183 } else {
184 TransformColumns::Unknown
185 }
186 }
187
188 fn local_datetime_columns_produced(
189 &self,
190 input_local_datetime_columns: &[String],
191 ) -> Vec<String> {
192 Vec::from(input_local_datetime_columns)
195 }
196}