1use serde::{Deserialize, Serialize};
98
99use crate::{
100 compiler::{
101 aggregation::{OrderByClause, OrderDirection},
102 fact_table::FactTableMetadata,
103 },
104 db::where_clause::WhereClause,
105 error::{FraiseQLError, Result},
106};
107
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
/// Typed request for a window-function query.
///
/// `WindowPlanner::plan` converts a value of this type into a
/// [`WindowExecutionPlan`], resolving measure, filter and dimension names
/// against the fact table metadata.
pub struct WindowRequest {
    /// Table to query; copied unchanged into the plan's `table` field.
    pub table_name: String,

    /// Plain (non-window) columns to include in the result.
    pub select: Vec<WindowSelectColumn>,

    /// Window functions to compute, one output column per entry.
    pub windows: Vec<WindowFunctionRequest>,

    /// Optional row filter; passed through to the plan unchanged.
    pub where_clause: Option<WhereClause>,

    /// Ordering of the final result set (distinct from each window's own
    /// `order_by`).
    pub order_by: Vec<WindowOrderBy>,

    /// Optional maximum number of rows; passed through to the plan unchanged.
    pub limit: Option<u32>,

    /// Optional number of rows to skip; passed through to the plan unchanged.
    pub offset: Option<u32>,
}
163
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
/// A plain column requested alongside the window functions.
///
/// Serialized as an internally tagged object whose `type` is the snake_case
/// variant name (`measure`, `dimension`, `filter`).
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WindowSelectColumn {
    /// A measure column; `WindowPlanner` requires `name` to exist in the
    /// fact table's measures.
    Measure {
        /// Measure column name.
        name: String,
        /// Alias of the column in the result.
        alias: String,
    },

    /// A value extracted from the dimensions column with `->>`.
    Dimension {
        /// Key looked up inside the dimensions column.
        path: String,
        /// Alias of the column in the result.
        alias: String,
    },

    /// A denormalized filter column; `WindowPlanner` requires `name` to
    /// exist in the fact table's denormalized filters.
    Filter {
        /// Filter column name.
        name: String,
        /// Alias of the column in the result.
        alias: String,
    },
}
192
193impl WindowSelectColumn {
194 #[must_use]
196 pub fn alias(&self) -> &str {
197 match self {
198 Self::Measure { alias, .. }
199 | Self::Dimension { alias, .. }
200 | Self::Filter { alias, .. } => alias,
201 }
202 }
203}
204
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
/// One window function within a [`WindowRequest`], expressed with semantic
/// names that `WindowPlanner` resolves to SQL expressions.
pub struct WindowFunctionRequest {
    /// Which function to compute and its arguments.
    pub function: WindowFunctionSpec,

    /// Alias of the computed column; copied unchanged into the plan.
    pub alias: String,

    /// Columns forming the window's partition.
    pub partition_by: Vec<PartitionByColumn>,

    /// Ordering of rows inside the window.
    pub order_by: Vec<WindowOrderBy>,

    /// Optional explicit frame; copied unchanged into the plan.
    pub frame: Option<WindowFrame>,
}
223
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
/// Request-level description of a window function.
///
/// `WindowPlanner` maps each variant onto a [`WindowFunctionType`]: `field`
/// arguments are resolved to SQL expressions, `measure` arguments must name
/// an existing measure, and the `Running*` variants become the corresponding
/// aggregate (`Sum`, `Avg`, `Count`, `Min`, `Max`, `Stddev`, `Variance`).
///
/// Serialized as an internally tagged object whose `type` is the snake_case
/// variant name.
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WindowFunctionSpec {
    /// Maps to [`WindowFunctionType::RowNumber`].
    RowNumber,

    /// Maps to [`WindowFunctionType::Rank`].
    Rank,

    /// Maps to [`WindowFunctionType::DenseRank`].
    DenseRank,

    /// Maps to [`WindowFunctionType::Ntile`].
    Ntile {
        /// Bucket count, passed through unchanged.
        n: u32,
    },

    /// Maps to [`WindowFunctionType::PercentRank`].
    PercentRank,

    /// Maps to [`WindowFunctionType::CumeDist`].
    CumeDist,

    /// Maps to [`WindowFunctionType::Lag`].
    Lag {
        /// Measure, filter column or dimension path to read.
        field: String,
        /// Row offset, passed through unchanged.
        offset: i32,
        /// Optional fallback value, passed through unchanged.
        default: Option<serde_json::Value>,
    },

    /// Maps to [`WindowFunctionType::Lead`].
    Lead {
        /// Measure, filter column or dimension path to read.
        field: String,
        /// Row offset, passed through unchanged.
        offset: i32,
        /// Optional fallback value, passed through unchanged.
        default: Option<serde_json::Value>,
    },

    /// Maps to [`WindowFunctionType::FirstValue`].
    FirstValue {
        /// Measure, filter column or dimension path to read.
        field: String,
    },

    /// Maps to [`WindowFunctionType::LastValue`].
    LastValue {
        /// Measure, filter column or dimension path to read.
        field: String,
    },

    /// Maps to [`WindowFunctionType::NthValue`].
    NthValue {
        /// Measure, filter column or dimension path to read.
        field: String,
        /// Row position, passed through unchanged.
        n: u32,
    },

    /// Maps to [`WindowFunctionType::Sum`] over a validated measure.
    RunningSum {
        /// Name of an existing measure.
        measure: String,
    },

    /// Maps to [`WindowFunctionType::Avg`] over a validated measure.
    RunningAvg {
        /// Name of an existing measure.
        measure: String,
    },

    /// Maps to [`WindowFunctionType::Count`] with no field.
    RunningCount,

    /// Maps to [`WindowFunctionType::Count`] over a resolved field.
    RunningCountField {
        /// Measure, filter column or dimension path to count.
        field: String,
    },

    /// Maps to [`WindowFunctionType::Min`] over a validated measure.
    RunningMin {
        /// Name of an existing measure.
        measure: String,
    },

    /// Maps to [`WindowFunctionType::Max`] over a validated measure.
    RunningMax {
        /// Name of an existing measure.
        measure: String,
    },

    /// Maps to [`WindowFunctionType::Stddev`] over a validated measure.
    RunningStddev {
        /// Name of an existing measure.
        measure: String,
    },

    /// Maps to [`WindowFunctionType::Variance`] over a validated measure.
    RunningVariance {
        /// Name of an existing measure.
        measure: String,
    },
}
346
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
/// A column a window may be partitioned by.
///
/// Serialized as an internally tagged object whose `type` is the snake_case
/// variant name.
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PartitionByColumn {
    /// A value extracted from the dimensions column with `->>`.
    Dimension {
        /// Key looked up inside the dimensions column.
        path: String,
    },

    /// A denormalized filter column; must exist in the fact table metadata.
    Filter {
        /// Filter column name.
        name: String,
    },

    /// A measure column; must exist in the fact table metadata.
    Measure {
        /// Measure column name.
        name: String,
    },
}
369
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
/// Request-level ordering term; `WindowPlanner` resolves `field` to a SQL
/// expression and produces an `OrderByClause`.
pub struct WindowOrderBy {
    /// Measure name, filter column name, or dimension path.
    pub field: String,

    /// Sort direction, passed through unchanged.
    pub direction: OrderDirection,
}
379
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Resolved window-function query, produced by `WindowFunctionPlanner::plan`
/// (from JSON) or `WindowPlanner::plan` (from a [`WindowRequest`]).
pub struct WindowExecutionPlan {
    /// Table to query.
    pub table: String,

    /// Plain columns to select, as expression/alias pairs.
    pub select: Vec<SelectColumn>,

    /// Window functions to compute.
    pub windows: Vec<WindowFunction>,

    /// Optional row filter.
    pub where_clause: Option<WhereClause>,

    /// Ordering of the final result set.
    pub order_by: Vec<OrderByClause>,

    /// Optional maximum number of rows.
    pub limit: Option<u32>,

    /// Optional number of rows to skip.
    pub offset: Option<u32>,
}
408
#[derive(Debug, Clone, Serialize, Deserialize)]
/// A plain selected column in a [`WindowExecutionPlan`].
pub struct SelectColumn {
    /// Column name or expression (for dimensions, `<column>->>'<path>'`).
    pub expression: String,

    /// Alias of the column in the result.
    pub alias: String,
}
418
#[derive(Debug, Clone, Serialize, Deserialize)]
/// A single resolved window function in a [`WindowExecutionPlan`].
pub struct WindowFunction {
    /// Function to compute, with resolved field expressions.
    pub function: WindowFunctionType,

    /// Alias of the computed column.
    pub alias: String,

    /// Partition expressions.
    pub partition_by: Vec<String>,

    /// Ordering of rows inside the window.
    pub order_by: Vec<OrderByClause>,

    /// Optional explicit frame.
    pub frame: Option<WindowFrame>,
}
437
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Plan-level window function.
///
/// Serialized as an internally tagged object whose `type` is the snake_case
/// variant name, e.g. `{"type":"row_number"}`.
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WindowFunctionType {
    /// Row-number ranking function (JSON key `row_number`).
    RowNumber,

    /// Rank function (JSON key `rank`).
    Rank,

    /// Dense-rank function (JSON key `dense_rank`).
    DenseRank,

    /// NTILE function (JSON key `ntile`).
    Ntile {
        /// Bucket count.
        n: u32,
    },

    /// PERCENT_RANK; rejected for SQLite by `WindowFunctionPlanner::validate`.
    PercentRank,

    /// CUME_DIST; rejected for SQLite by `WindowFunctionPlanner::validate`.
    CumeDist,

    /// LAG function (JSON key `lag`).
    Lag {
        /// Field or expression to read.
        field: String,
        /// Row offset; the JSON planner defaults it to 1 when absent.
        offset: i32,
        /// Optional fallback value.
        default: Option<serde_json::Value>,
    },

    /// LEAD function (JSON key `lead`).
    Lead {
        /// Field or expression to read.
        field: String,
        /// Row offset; the JSON planner defaults it to 1 when absent.
        offset: i32,
        /// Optional fallback value.
        default: Option<serde_json::Value>,
    },

    /// FIRST_VALUE function (JSON key `first_value`).
    FirstValue {
        /// Field or expression to read.
        field: String,
    },

    /// LAST_VALUE function (JSON key `last_value`).
    LastValue {
        /// Field or expression to read.
        field: String,
    },

    /// NTH_VALUE function (JSON key `nth_value`).
    NthValue {
        /// Field or expression to read.
        field: String,
        /// Row position.
        n: u32,
    },

    /// SUM aggregate used as a window function.
    Sum {
        /// Field or expression to aggregate.
        field: String,
    },

    /// AVG aggregate used as a window function.
    Avg {
        /// Field or expression to aggregate.
        field: String,
    },

    /// COUNT aggregate used as a window function.
    Count {
        /// Field to count; `None` when no field was given.
        field: Option<String>,
    },

    /// MIN aggregate used as a window function.
    Min {
        /// Field or expression to aggregate.
        field: String,
    },

    /// MAX aggregate used as a window function.
    Max {
        /// Field or expression to aggregate.
        field: String,
    },

    /// STDDEV aggregate used as a window function.
    Stddev {
        /// Field or expression to aggregate.
        field: String,
    },

    /// VARIANCE aggregate used as a window function.
    Variance {
        /// Field or expression to aggregate.
        field: String,
    },
}
548
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
/// Explicit frame of a window function.
pub struct WindowFrame {
    /// Unit the frame is expressed in.
    pub frame_type: FrameType,

    /// First boundary of the frame.
    pub start: FrameBoundary,

    /// Last boundary of the frame.
    pub end: FrameBoundary,

    /// Optional exclusion; any value here is rejected by
    /// `WindowFunctionPlanner::validate` for non-PostgreSQL targets.
    pub exclusion: Option<FrameExclusion>,
}
564
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
/// Unit of a window frame; serialized in upper case (`"ROWS"`, `"RANGE"`,
/// `"GROUPS"`).
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum FrameType {
    /// `ROWS` frame.
    Rows,

    /// `RANGE` frame.
    Range,

    /// `GROUPS` frame; rejected by `WindowFunctionPlanner::validate` for
    /// non-PostgreSQL targets.
    Groups,
}
578
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
/// One end of a window frame.
///
/// Serialized as an internally tagged object whose `type` is the snake_case
/// variant name (e.g. `{"type":"n_preceding","n":5}`).
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FrameBoundary {
    /// `unbounded_preceding`.
    UnboundedPreceding,

    /// `n_preceding`.
    NPreceding {
        /// Distance before the current row.
        n: u32,
    },

    /// `current_row`.
    CurrentRow,

    /// `n_following`.
    NFollowing {
        /// Distance after the current row.
        n: u32,
    },

    /// `unbounded_following`.
    UnboundedFollowing,
}
604
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
/// Frame exclusion option; serialized in snake_case (`"current_row"`,
/// `"group"`, `"ties"`, `"no_others"`).
#[serde(rename_all = "snake_case")]
pub enum FrameExclusion {
    /// `current_row`.
    CurrentRow,

    /// `group`.
    Group,

    /// `ties`.
    Ties,

    /// `no_others`.
    NoOthers,
}
621
/// Stateless planner that builds a [`WindowExecutionPlan`] from a raw JSON
/// query and validates a plan against a target database.
pub struct WindowFunctionPlanner;
624
625impl WindowFunctionPlanner {
626 pub fn plan(
646 query: &serde_json::Value,
647 _metadata: &FactTableMetadata,
648 ) -> Result<WindowExecutionPlan> {
649 let table = query["table"]
651 .as_str()
652 .ok_or_else(|| FraiseQLError::validation("Missing 'table' field"))?
653 .to_string();
654
655 let select = Self::parse_select_columns(query)?;
657
658 let windows = Self::parse_window_functions(query)?;
660
661 let where_clause = query.get("where").map(|_| WhereClause::And(vec![]));
663
664 let order_by = query
666 .get("orderBy")
667 .and_then(|v| v.as_array())
668 .map(|arr| {
669 arr.iter()
670 .filter_map(|item| {
671 let direction = match item.get("direction").and_then(|d| d.as_str()) {
672 Some("DESC") => OrderDirection::Desc,
673 _ => OrderDirection::Asc,
674 };
675 Some(OrderByClause {
676 field: item["field"].as_str()?.to_string(),
677 direction,
678 })
679 })
680 .collect()
681 })
682 .unwrap_or_default();
683
684 let limit = query.get("limit").and_then(|v| v.as_u64()).map(|n| n as u32);
686 let offset = query.get("offset").and_then(|v| v.as_u64()).map(|n| n as u32);
687
688 Ok(WindowExecutionPlan {
689 table,
690 select,
691 windows,
692 where_clause,
693 order_by,
694 limit,
695 offset,
696 })
697 }
698
699 fn parse_select_columns(query: &serde_json::Value) -> Result<Vec<SelectColumn>> {
700 let default_array = vec![];
701 let select = query.get("select").and_then(|s| s.as_array()).unwrap_or(&default_array);
702
703 let columns = select
704 .iter()
705 .filter_map(|col| {
706 if let Some(col_str) = col.as_str() {
707 Some(SelectColumn {
708 expression: col_str.to_string(),
709 alias: col_str.to_string(),
710 })
711 } else {
712 None
713 }
714 })
715 .collect();
716
717 Ok(columns)
718 }
719
720 fn parse_window_functions(query: &serde_json::Value) -> Result<Vec<WindowFunction>> {
721 let default_array = vec![];
722 let windows = query.get("windows").and_then(|w| w.as_array()).unwrap_or(&default_array);
723
724 windows.iter().map(|window| Self::parse_single_window(window)).collect()
725 }
726
727 fn parse_single_window(window: &serde_json::Value) -> Result<WindowFunction> {
728 let function = Self::parse_window_function_type(&window["function"])?;
729 let alias = window["alias"]
730 .as_str()
731 .ok_or_else(|| FraiseQLError::validation("Missing 'alias' in window function"))?
732 .to_string();
733
734 let partition_by = window
735 .get("partitionBy")
736 .and_then(|p| p.as_array())
737 .map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect())
738 .unwrap_or_default();
739
740 let order_by = window
741 .get("orderBy")
742 .and_then(|o| o.as_array())
743 .map(|arr| {
744 arr.iter()
745 .filter_map(|item| {
746 let direction = match item.get("direction").and_then(|d| d.as_str()) {
747 Some("DESC") => OrderDirection::Desc,
748 _ => OrderDirection::Asc,
749 };
750 Some(OrderByClause {
751 field: item["field"].as_str()?.to_string(),
752 direction,
753 })
754 })
755 .collect()
756 })
757 .unwrap_or_default();
758
759 let frame = window.get("frame").map(Self::parse_window_frame).transpose()?;
760
761 Ok(WindowFunction {
762 function,
763 alias,
764 partition_by,
765 order_by,
766 frame,
767 })
768 }
769
770 fn parse_window_function_type(func: &serde_json::Value) -> Result<WindowFunctionType> {
771 if func.get("row_number").is_some() {
773 return Ok(WindowFunctionType::RowNumber);
774 }
775 if func.get("rank").is_some() {
776 return Ok(WindowFunctionType::Rank);
777 }
778 if func.get("dense_rank").is_some() {
779 return Ok(WindowFunctionType::DenseRank);
780 }
781 if let Some(ntile) = func.get("ntile") {
782 let n = ntile["n"]
783 .as_u64()
784 .ok_or_else(|| FraiseQLError::validation("Missing 'n' in NTILE function"))?
785 as u32;
786 return Ok(WindowFunctionType::Ntile { n });
787 }
788 if func.get("percent_rank").is_some() {
789 return Ok(WindowFunctionType::PercentRank);
790 }
791 if func.get("cume_dist").is_some() {
792 return Ok(WindowFunctionType::CumeDist);
793 }
794
795 if let Some(lag) = func.get("lag") {
797 let field = lag["field"]
798 .as_str()
799 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in LAG"))?
800 .to_string();
801 let offset = lag.get("offset").and_then(|o| o.as_i64()).unwrap_or(1) as i32;
802 let default = lag.get("default").cloned();
803 return Ok(WindowFunctionType::Lag {
804 field,
805 offset,
806 default,
807 });
808 }
809 if let Some(lead) = func.get("lead") {
810 let field = lead["field"]
811 .as_str()
812 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in LEAD"))?
813 .to_string();
814 let offset = lead.get("offset").and_then(|o| o.as_i64()).unwrap_or(1) as i32;
815 let default = lead.get("default").cloned();
816 return Ok(WindowFunctionType::Lead {
817 field,
818 offset,
819 default,
820 });
821 }
822 if let Some(first_val) = func.get("first_value") {
823 let field = first_val["field"]
824 .as_str()
825 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in FIRST_VALUE"))?
826 .to_string();
827 return Ok(WindowFunctionType::FirstValue { field });
828 }
829 if let Some(last_val) = func.get("last_value") {
830 let field = last_val["field"]
831 .as_str()
832 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in LAST_VALUE"))?
833 .to_string();
834 return Ok(WindowFunctionType::LastValue { field });
835 }
836 if let Some(nth_val) = func.get("nth_value") {
837 let field = nth_val["field"]
838 .as_str()
839 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in NTH_VALUE"))?
840 .to_string();
841 let n = nth_val["n"]
842 .as_u64()
843 .ok_or_else(|| FraiseQLError::validation("Missing 'n' in NTH_VALUE"))?
844 as u32;
845 return Ok(WindowFunctionType::NthValue { field, n });
846 }
847
848 if let Some(sum) = func.get("sum") {
850 let field = sum["field"]
851 .as_str()
852 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in SUM"))?
853 .to_string();
854 return Ok(WindowFunctionType::Sum { field });
855 }
856 if let Some(avg) = func.get("avg") {
857 let field = avg["field"]
858 .as_str()
859 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in AVG"))?
860 .to_string();
861 return Ok(WindowFunctionType::Avg { field });
862 }
863 if let Some(count) = func.get("count") {
864 let field = count.get("field").and_then(|f| f.as_str()).map(String::from);
865 return Ok(WindowFunctionType::Count { field });
866 }
867 if let Some(min) = func.get("min") {
868 let field = min["field"]
869 .as_str()
870 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in MIN"))?
871 .to_string();
872 return Ok(WindowFunctionType::Min { field });
873 }
874 if let Some(max) = func.get("max") {
875 let field = max["field"]
876 .as_str()
877 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in MAX"))?
878 .to_string();
879 return Ok(WindowFunctionType::Max { field });
880 }
881 if let Some(stddev) = func.get("stddev") {
882 let field = stddev["field"]
883 .as_str()
884 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in STDDEV"))?
885 .to_string();
886 return Ok(WindowFunctionType::Stddev { field });
887 }
888 if let Some(variance) = func.get("variance") {
889 let field = variance["field"]
890 .as_str()
891 .ok_or_else(|| FraiseQLError::validation("Missing 'field' in VARIANCE"))?
892 .to_string();
893 return Ok(WindowFunctionType::Variance { field });
894 }
895
896 Err(FraiseQLError::validation("Unknown window function type"))
897 }
898
899 fn parse_window_frame(frame: &serde_json::Value) -> Result<WindowFrame> {
900 let frame_type = match frame["frame_type"].as_str() {
901 Some("ROWS") => FrameType::Rows,
902 Some("RANGE") => FrameType::Range,
903 Some("GROUPS") => FrameType::Groups,
904 _ => return Err(FraiseQLError::validation("Invalid or missing 'frame_type'")),
905 };
906
907 let start = Self::parse_frame_boundary(&frame["start"])?;
908 let end = Self::parse_frame_boundary(&frame["end"])?;
909 let exclusion = frame.get("exclusion").map(|e| match e.as_str() {
910 Some("current_row") => FrameExclusion::CurrentRow,
911 Some("group") => FrameExclusion::Group,
912 Some("ties") => FrameExclusion::Ties,
913 Some("no_others") => FrameExclusion::NoOthers,
914 _ => FrameExclusion::NoOthers,
915 });
916
917 Ok(WindowFrame {
918 frame_type,
919 start,
920 end,
921 exclusion,
922 })
923 }
924
925 fn parse_frame_boundary(boundary: &serde_json::Value) -> Result<FrameBoundary> {
926 match boundary["type"].as_str() {
927 Some("unbounded_preceding") => Ok(FrameBoundary::UnboundedPreceding),
928 Some("n_preceding") => {
929 let n = boundary["n"]
930 .as_u64()
931 .ok_or_else(|| FraiseQLError::validation("Missing 'n' in N PRECEDING"))?
932 as u32;
933 Ok(FrameBoundary::NPreceding { n })
934 },
935 Some("current_row") => Ok(FrameBoundary::CurrentRow),
936 Some("n_following") => {
937 let n = boundary["n"]
938 .as_u64()
939 .ok_or_else(|| FraiseQLError::validation("Missing 'n' in N FOLLOWING"))?
940 as u32;
941 Ok(FrameBoundary::NFollowing { n })
942 },
943 Some("unbounded_following") => Ok(FrameBoundary::UnboundedFollowing),
944 _ => Err(FraiseQLError::validation("Invalid frame boundary type")),
945 }
946 }
947
948 pub fn validate(
950 plan: &WindowExecutionPlan,
951 _metadata: &FactTableMetadata,
952 database_target: crate::db::types::DatabaseType,
953 ) -> Result<()> {
954 use crate::db::types::DatabaseType;
955
956 for window in &plan.windows {
958 if let Some(frame) = &window.frame {
959 if frame.frame_type == FrameType::Groups {
960 if !matches!(database_target, DatabaseType::PostgreSQL) {
961 return Err(FraiseQLError::validation(
962 "GROUPS frame type only supported on PostgreSQL",
963 ));
964 }
965 }
966
967 if frame.exclusion.is_some() && !matches!(database_target, DatabaseType::PostgreSQL)
969 {
970 return Err(FraiseQLError::validation(
971 "Frame exclusion only supported on PostgreSQL",
972 ));
973 }
974 }
975
976 match window.function {
978 WindowFunctionType::PercentRank | WindowFunctionType::CumeDist => {
979 if matches!(database_target, DatabaseType::SQLite) {
980 return Err(FraiseQLError::validation(
981 "PERCENT_RANK and CUME_DIST not supported on SQLite",
982 ));
983 }
984 },
985 _ => {},
986 }
987 }
988
989 Ok(())
990 }
991}
992
/// Stateless planner that converts a typed [`WindowRequest`] into a
/// [`WindowExecutionPlan`], validating names against the fact table metadata.
pub struct WindowPlanner;
1011
1012impl WindowPlanner {
1013 pub fn plan(
1027 request: WindowRequest,
1028 metadata: FactTableMetadata,
1029 ) -> Result<WindowExecutionPlan> {
1030 let select = Self::convert_select_columns(&request.select, &metadata)?;
1032
1033 let windows = Self::convert_window_functions(&request.windows, &metadata)?;
1035
1036 let order_by = Self::convert_order_by(&request.order_by, &metadata)?;
1038
1039 Ok(WindowExecutionPlan {
1040 table: request.table_name,
1041 select,
1042 windows,
1043 where_clause: request.where_clause,
1044 order_by,
1045 limit: request.limit,
1046 offset: request.offset,
1047 })
1048 }
1049
1050 fn convert_select_columns(
1052 columns: &[WindowSelectColumn],
1053 metadata: &FactTableMetadata,
1054 ) -> Result<Vec<SelectColumn>> {
1055 columns
1056 .iter()
1057 .map(|col| Self::convert_single_select_column(col, metadata))
1058 .collect()
1059 }
1060
1061 fn convert_single_select_column(
1062 column: &WindowSelectColumn,
1063 metadata: &FactTableMetadata,
1064 ) -> Result<SelectColumn> {
1065 match column {
1066 WindowSelectColumn::Measure { name, alias } => {
1067 if !metadata.measures.iter().any(|m| m.name == *name) {
1069 return Err(FraiseQLError::Validation {
1070 message: format!(
1071 "Measure '{}' not found in fact table '{}'",
1072 name, metadata.table_name
1073 ),
1074 path: None,
1075 });
1076 }
1077 Ok(SelectColumn {
1079 expression: name.clone(),
1080 alias: alias.clone(),
1081 })
1082 },
1083 WindowSelectColumn::Dimension { path, alias } => {
1084 let expression = format!("{}->>'{}'", metadata.dimensions.name, path);
1086 Ok(SelectColumn {
1087 expression,
1088 alias: alias.clone(),
1089 })
1090 },
1091 WindowSelectColumn::Filter { name, alias } => {
1092 if !metadata.denormalized_filters.iter().any(|f| f.name == *name) {
1094 return Err(FraiseQLError::Validation {
1095 message: format!(
1096 "Filter column '{}' not found in fact table '{}'",
1097 name, metadata.table_name
1098 ),
1099 path: None,
1100 });
1101 }
1102 Ok(SelectColumn {
1104 expression: name.clone(),
1105 alias: alias.clone(),
1106 })
1107 },
1108 }
1109 }
1110
1111 fn convert_window_functions(
1113 windows: &[WindowFunctionRequest],
1114 metadata: &FactTableMetadata,
1115 ) -> Result<Vec<WindowFunction>> {
1116 windows
1117 .iter()
1118 .map(|w| Self::convert_single_window_function(w, metadata))
1119 .collect()
1120 }
1121
1122 fn convert_single_window_function(
1123 request: &WindowFunctionRequest,
1124 metadata: &FactTableMetadata,
1125 ) -> Result<WindowFunction> {
1126 let function = Self::convert_function_spec(&request.function, metadata)?;
1128
1129 let partition_by = request
1131 .partition_by
1132 .iter()
1133 .map(|p| Self::convert_partition_by(p, metadata))
1134 .collect::<Result<Vec<_>>>()?;
1135
1136 let order_by = request
1138 .order_by
1139 .iter()
1140 .map(|o| Self::convert_window_order_by(o, metadata))
1141 .collect::<Result<Vec<_>>>()?;
1142
1143 Ok(WindowFunction {
1144 function,
1145 alias: request.alias.clone(),
1146 partition_by,
1147 order_by,
1148 frame: request.frame.clone(),
1149 })
1150 }
1151
1152 fn convert_function_spec(
1154 spec: &WindowFunctionSpec,
1155 metadata: &FactTableMetadata,
1156 ) -> Result<WindowFunctionType> {
1157 match spec {
1158 WindowFunctionSpec::RowNumber => Ok(WindowFunctionType::RowNumber),
1160 WindowFunctionSpec::Rank => Ok(WindowFunctionType::Rank),
1161 WindowFunctionSpec::DenseRank => Ok(WindowFunctionType::DenseRank),
1162 WindowFunctionSpec::Ntile { n } => Ok(WindowFunctionType::Ntile { n: *n }),
1163 WindowFunctionSpec::PercentRank => Ok(WindowFunctionType::PercentRank),
1164 WindowFunctionSpec::CumeDist => Ok(WindowFunctionType::CumeDist),
1165
1166 WindowFunctionSpec::Lag {
1168 field,
1169 offset,
1170 default,
1171 } => {
1172 let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1173 Ok(WindowFunctionType::Lag {
1174 field: sql_field,
1175 offset: *offset,
1176 default: default.clone(),
1177 })
1178 },
1179 WindowFunctionSpec::Lead {
1180 field,
1181 offset,
1182 default,
1183 } => {
1184 let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1185 Ok(WindowFunctionType::Lead {
1186 field: sql_field,
1187 offset: *offset,
1188 default: default.clone(),
1189 })
1190 },
1191 WindowFunctionSpec::FirstValue { field } => {
1192 let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1193 Ok(WindowFunctionType::FirstValue { field: sql_field })
1194 },
1195 WindowFunctionSpec::LastValue { field } => {
1196 let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1197 Ok(WindowFunctionType::LastValue { field: sql_field })
1198 },
1199 WindowFunctionSpec::NthValue { field, n } => {
1200 let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1201 Ok(WindowFunctionType::NthValue {
1202 field: sql_field,
1203 n: *n,
1204 })
1205 },
1206
1207 WindowFunctionSpec::RunningSum { measure } => {
1209 Self::validate_measure(measure, metadata)?;
1210 Ok(WindowFunctionType::Sum {
1211 field: measure.clone(),
1212 })
1213 },
1214 WindowFunctionSpec::RunningAvg { measure } => {
1215 Self::validate_measure(measure, metadata)?;
1216 Ok(WindowFunctionType::Avg {
1217 field: measure.clone(),
1218 })
1219 },
1220 WindowFunctionSpec::RunningCount => Ok(WindowFunctionType::Count { field: None }),
1221 WindowFunctionSpec::RunningCountField { field } => {
1222 let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1223 Ok(WindowFunctionType::Count {
1224 field: Some(sql_field),
1225 })
1226 },
1227 WindowFunctionSpec::RunningMin { measure } => {
1228 Self::validate_measure(measure, metadata)?;
1229 Ok(WindowFunctionType::Min {
1230 field: measure.clone(),
1231 })
1232 },
1233 WindowFunctionSpec::RunningMax { measure } => {
1234 Self::validate_measure(measure, metadata)?;
1235 Ok(WindowFunctionType::Max {
1236 field: measure.clone(),
1237 })
1238 },
1239 WindowFunctionSpec::RunningStddev { measure } => {
1240 Self::validate_measure(measure, metadata)?;
1241 Ok(WindowFunctionType::Stddev {
1242 field: measure.clone(),
1243 })
1244 },
1245 WindowFunctionSpec::RunningVariance { measure } => {
1246 Self::validate_measure(measure, metadata)?;
1247 Ok(WindowFunctionType::Variance {
1248 field: measure.clone(),
1249 })
1250 },
1251 }
1252 }
1253
1254 fn convert_partition_by(
1256 partition: &PartitionByColumn,
1257 metadata: &FactTableMetadata,
1258 ) -> Result<String> {
1259 match partition {
1260 PartitionByColumn::Dimension { path } => {
1261 Ok(format!("{}->>'{}'", metadata.dimensions.name, path))
1262 },
1263 PartitionByColumn::Filter { name } => {
1264 if !metadata.denormalized_filters.iter().any(|f| f.name == *name) {
1265 return Err(FraiseQLError::Validation {
1266 message: format!(
1267 "Filter column '{}' not found in fact table '{}'",
1268 name, metadata.table_name
1269 ),
1270 path: None,
1271 });
1272 }
1273 Ok(name.clone())
1274 },
1275 PartitionByColumn::Measure { name } => {
1276 Self::validate_measure(name, metadata)?;
1277 Ok(name.clone())
1278 },
1279 }
1280 }
1281
1282 fn convert_window_order_by(
1284 order: &WindowOrderBy,
1285 metadata: &FactTableMetadata,
1286 ) -> Result<OrderByClause> {
1287 let field = Self::resolve_field_to_sql(&order.field, metadata)?;
1288 Ok(OrderByClause {
1289 field,
1290 direction: order.direction,
1291 })
1292 }
1293
1294 fn convert_order_by(
1296 orders: &[WindowOrderBy],
1297 metadata: &FactTableMetadata,
1298 ) -> Result<Vec<OrderByClause>> {
1299 orders.iter().map(|o| Self::convert_window_order_by(o, metadata)).collect()
1300 }
1301
1302 fn resolve_field_to_sql(field: &str, metadata: &FactTableMetadata) -> Result<String> {
1309 if metadata.measures.iter().any(|m| m.name == field) {
1311 return Ok(field.to_string());
1312 }
1313
1314 if metadata.denormalized_filters.iter().any(|f| f.name == field) {
1316 return Ok(field.to_string());
1317 }
1318
1319 Ok(format!("{}->>'{}'", metadata.dimensions.name, field))
1321 }
1322
1323 fn validate_measure(measure: &str, metadata: &FactTableMetadata) -> Result<()> {
1325 if !metadata.measures.iter().any(|m| m.name == *measure) {
1326 return Err(FraiseQLError::Validation {
1327 message: format!(
1328 "Measure '{}' not found in fact table '{}'",
1329 measure, metadata.table_name
1330 ),
1331 path: None,
1332 });
1333 }
1334 Ok(())
1335 }
1336}
1337
1338#[cfg(test)]
1339mod tests {
1340 use super::*;
1341 use crate::compiler::fact_table::{DimensionColumn, FilterColumn, MeasureColumn, SqlType};
1342
1343 fn create_test_metadata() -> FactTableMetadata {
1344 FactTableMetadata {
1345 table_name: "tf_sales".to_string(),
1346 measures: vec![
1347 MeasureColumn {
1348 name: "revenue".to_string(),
1349 sql_type: SqlType::Decimal,
1350 nullable: false,
1351 },
1352 MeasureColumn {
1353 name: "quantity".to_string(),
1354 sql_type: SqlType::Int,
1355 nullable: false,
1356 },
1357 ],
1358 dimensions: DimensionColumn {
1359 name: "dimensions".to_string(),
1360 paths: vec![],
1361 },
1362 denormalized_filters: vec![
1363 FilterColumn {
1364 name: "customer_id".to_string(),
1365 sql_type: SqlType::Uuid,
1366 indexed: true,
1367 },
1368 FilterColumn {
1369 name: "occurred_at".to_string(),
1370 sql_type: SqlType::Timestamp,
1371 indexed: true,
1372 },
1373 ],
1374 calendar_dimensions: vec![],
1375 }
1376 }
1377
1378 fn serialize_json<T: serde::Serialize>(value: &T) -> String {
1384 serde_json::to_string(value).expect("serialization should succeed for test objects")
1385 }
1386
1387 fn deserialize_json<'a, T: serde::Deserialize<'a>>(json: &'a str) -> T {
1389 serde_json::from_str(json).expect("deserialization should succeed for valid test JSON")
1390 }
1391
1392 #[test]
1397 fn test_window_function_type_serialization() {
1398 let func = WindowFunctionType::RowNumber;
1399 let json = serialize_json(&func);
1400 assert_eq!(json, r#"{"type":"row_number"}"#);
1401 }
1402
1403 #[test]
1404 fn test_frame_type_serialization() {
1405 let frame_type = FrameType::Rows;
1406 let json = serialize_json(&frame_type);
1407 assert_eq!(json, r#""ROWS""#);
1408 }
1409
1410 #[test]
1411 fn test_frame_boundary_unbounded() {
1412 let boundary = FrameBoundary::UnboundedPreceding;
1413 let json = serialize_json(&boundary);
1414 assert!(json.contains("unbounded_preceding"));
1415 }
1416
1417 #[test]
1418 fn test_frame_boundary_n_preceding() {
1419 let boundary = FrameBoundary::NPreceding { n: 5 };
1420 let json = serialize_json(&boundary);
1421 assert!(json.contains("n_preceding"));
1422 assert!(json.contains("\"n\":5"));
1423 }
1424
1425 #[test]
1426 fn test_parse_row_number_query() {
1427 let metadata = create_test_metadata();
1428 let query = serde_json::json!({
1429 "table": "tf_sales",
1430 "select": ["revenue"],
1431 "windows": [{
1432 "function": {"row_number": {}},
1433 "alias": "rank",
1434 "partitionBy": ["category"],
1435 "orderBy": [{"field": "revenue", "direction": "DESC"}]
1436 }]
1437 });
1438
1439 let plan =
1440 WindowFunctionPlanner::plan(&query, &metadata).expect("window plan should succeed");
1441
1442 assert_eq!(plan.table, "tf_sales");
1443 assert_eq!(plan.windows.len(), 1);
1444 assert_eq!(plan.windows[0].alias, "rank");
1445 assert!(matches!(plan.windows[0].function, WindowFunctionType::RowNumber));
1446 }
1447
1448 #[test]
1449 fn test_parse_lag_function() {
1450 let metadata = create_test_metadata();
1451 let query = serde_json::json!({
1452 "table": "tf_sales",
1453 "windows": [{
1454 "function": {
1455 "lag": {
1456 "field": "revenue",
1457 "offset": 1,
1458 "default": 0
1459 }
1460 },
1461 "alias": "prev_revenue",
1462 "orderBy": [{"field": "occurred_at"}]
1463 }]
1464 });
1465
1466 let plan =
1467 WindowFunctionPlanner::plan(&query, &metadata).expect("window plan should succeed");
1468
1469 match &plan.windows[0].function {
1470 WindowFunctionType::Lag {
1471 field,
1472 offset,
1473 default,
1474 } => {
1475 assert_eq!(field, "revenue");
1476 assert_eq!(*offset, 1);
1477 assert!(default.is_some());
1478 },
1479 _ => panic!("Expected LAG function"),
1480 }
1481 }
1482
1483 #[test]
1484 fn test_validate_groups_frame_postgres_only() {
1485 use crate::db::types::DatabaseType;
1486
1487 let metadata = create_test_metadata();
1488 let plan = WindowExecutionPlan {
1489 table: "tf_sales".to_string(),
1490 select: vec![],
1491 windows: vec![WindowFunction {
1492 function: WindowFunctionType::RowNumber,
1493 alias: "rank".to_string(),
1494 partition_by: vec![],
1495 order_by: vec![],
1496 frame: Some(WindowFrame {
1497 frame_type: FrameType::Groups,
1498 start: FrameBoundary::UnboundedPreceding,
1499 end: FrameBoundary::CurrentRow,
1500 exclusion: None,
1501 }),
1502 }],
1503 where_clause: None,
1504 order_by: vec![],
1505 limit: None,
1506 offset: None,
1507 };
1508
1509 assert!(
1511 WindowFunctionPlanner::validate(&plan, &metadata, DatabaseType::PostgreSQL).is_ok()
1512 );
1513
1514 assert!(WindowFunctionPlanner::validate(&plan, &metadata, DatabaseType::MySQL).is_err());
1516 }
1517
1518 #[test]
1523 fn test_window_planner_basic_request() {
1524 let metadata = create_test_metadata();
1525 let request = WindowRequest {
1526 table_name: "tf_sales".to_string(),
1527 select: vec![
1528 WindowSelectColumn::Measure {
1529 name: "revenue".to_string(),
1530 alias: "revenue".to_string(),
1531 },
1532 WindowSelectColumn::Dimension {
1533 path: "category".to_string(),
1534 alias: "category".to_string(),
1535 },
1536 ],
1537 windows: vec![WindowFunctionRequest {
1538 function: WindowFunctionSpec::RowNumber,
1539 alias: "rank".to_string(),
1540 partition_by: vec![PartitionByColumn::Dimension {
1541 path: "category".to_string(),
1542 }],
1543 order_by: vec![WindowOrderBy {
1544 field: "revenue".to_string(),
1545 direction: OrderDirection::Desc,
1546 }],
1547 frame: None,
1548 }],
1549 where_clause: None,
1550 order_by: vec![],
1551 limit: Some(100),
1552 offset: None,
1553 };
1554
1555 let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1556
1557 assert_eq!(plan.table, "tf_sales");
1558 assert_eq!(plan.select.len(), 2);
1559 assert_eq!(plan.select[0].expression, "revenue");
1560 assert_eq!(plan.select[0].alias, "revenue");
1561 assert_eq!(plan.select[1].expression, "dimensions->>'category'");
1562 assert_eq!(plan.select[1].alias, "category");
1563
1564 assert_eq!(plan.windows.len(), 1);
1565 assert_eq!(plan.windows[0].alias, "rank");
1566 assert!(matches!(plan.windows[0].function, WindowFunctionType::RowNumber));
1567 assert_eq!(plan.windows[0].partition_by, vec!["dimensions->>'category'"]);
1568 assert_eq!(plan.windows[0].order_by.len(), 1);
1569 assert_eq!(plan.windows[0].order_by[0].field, "revenue");
1570 assert_eq!(plan.windows[0].order_by[0].direction, OrderDirection::Desc);
1571
1572 assert_eq!(plan.limit, Some(100));
1573 }
1574
1575 #[test]
1576 fn test_window_planner_running_sum() {
1577 let metadata = create_test_metadata();
1578 let request = WindowRequest {
1579 table_name: "tf_sales".to_string(),
1580 select: vec![WindowSelectColumn::Measure {
1581 name: "revenue".to_string(),
1582 alias: "revenue".to_string(),
1583 }],
1584 windows: vec![WindowFunctionRequest {
1585 function: WindowFunctionSpec::RunningSum {
1586 measure: "revenue".to_string(),
1587 },
1588 alias: "running_total".to_string(),
1589 partition_by: vec![],
1590 order_by: vec![WindowOrderBy {
1591 field: "occurred_at".to_string(),
1592 direction: OrderDirection::Asc,
1593 }],
1594 frame: Some(WindowFrame {
1595 frame_type: FrameType::Rows,
1596 start: FrameBoundary::UnboundedPreceding,
1597 end: FrameBoundary::CurrentRow,
1598 exclusion: None,
1599 }),
1600 }],
1601 where_clause: None,
1602 order_by: vec![],
1603 limit: None,
1604 offset: None,
1605 };
1606
1607 let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1608
1609 assert_eq!(plan.windows.len(), 1);
1610 match &plan.windows[0].function {
1611 WindowFunctionType::Sum { field } => {
1612 assert_eq!(field, "revenue");
1613 },
1614 _ => panic!("Expected Sum function"),
1615 }
1616 assert_eq!(plan.windows[0].alias, "running_total");
1617 assert!(plan.windows[0].frame.is_some());
1618 }
1619
/// Selecting a single `Filter` column must produce exactly one planned select
/// entry whose expression is the column name and whose alias is the one
/// supplied in the request.
#[test]
fn test_window_planner_filter_column() {
    let metadata = create_test_metadata();

    let date_column = WindowSelectColumn::Filter {
        name: "occurred_at".to_owned(),
        alias: "date".to_owned(),
    };
    let request = WindowRequest {
        table_name: "tf_sales".to_owned(),
        select: vec![date_column],
        windows: Vec::new(),
        where_clause: None,
        order_by: Vec::new(),
        limit: None,
        offset: None,
    };

    let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");

    assert_eq!(plan.select.len(), 1);
    let column = &plan.select[0];
    assert_eq!(column.expression, "occurred_at");
    assert_eq!(column.alias, "date");
}
1642
/// Selecting a measure named `nonexistent` must make planning fail with an
/// error whose rendered message contains "not found".
#[test]
fn test_window_planner_invalid_measure() {
    let metadata = create_test_metadata();

    let unknown_measure = WindowSelectColumn::Measure {
        name: "nonexistent".to_owned(),
        alias: "alias".to_owned(),
    };
    let request = WindowRequest {
        table_name: "tf_sales".to_owned(),
        select: vec![unknown_measure],
        windows: Vec::new(),
        where_clause: None,
        order_by: Vec::new(),
        limit: None,
        offset: None,
    };

    let outcome = WindowPlanner::plan(request, metadata);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("not found"));
}
1663
/// Selecting a filter column named `nonexistent_filter` must make planning
/// fail with an error whose rendered message contains "not found".
#[test]
fn test_window_planner_invalid_filter() {
    let metadata = create_test_metadata();

    let unknown_filter = WindowSelectColumn::Filter {
        name: String::from("nonexistent_filter"),
        alias: String::from("alias"),
    };
    let request = WindowRequest {
        table_name: String::from("tf_sales"),
        select: vec![unknown_filter],
        windows: Vec::new(),
        where_clause: None,
        order_by: Vec::new(),
        limit: None,
        offset: None,
    };

    let outcome = WindowPlanner::plan(request, metadata);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("not found"));
}
1684
/// A `Lag` request over `revenue` with offset 1 and a JSON default must be
/// planned as a `Lag` window function that keeps the field name and offset
/// and still carries a default value.
#[test]
fn test_window_planner_lag_function() {
    let metadata = create_test_metadata();

    let lag_spec = WindowFunctionSpec::Lag {
        field: String::from("revenue"),
        offset: 1,
        default: Some(serde_json::json!(0)),
    };
    let chronological = WindowOrderBy {
        field: String::from("occurred_at"),
        direction: OrderDirection::Asc,
    };
    let request = WindowRequest {
        table_name: String::from("tf_sales"),
        select: Vec::new(),
        windows: vec![WindowFunctionRequest {
            function: lag_spec,
            alias: String::from("prev_revenue"),
            partition_by: Vec::new(),
            order_by: vec![chronological],
            frame: None,
        }],
        where_clause: None,
        order_by: Vec::new(),
        limit: None,
        offset: None,
    };

    let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");

    let planned = &plan.windows[0].function;
    if let WindowFunctionType::Lag {
        field,
        offset,
        default,
    } = planned
    {
        assert_eq!(field, "revenue");
        assert_eq!(*offset, 1);
        assert!(default.is_some());
    } else {
        panic!("Expected Lag function");
    }
}
1726
/// A `Lag` request naming the field `category` must be planned with the field
/// expression `dimensions->>'category'`.
///
/// NOTE(review): this presumably relies on the test metadata not listing
/// `category` as a measure or filter column — confirm against
/// `create_test_metadata`.
#[test]
fn test_window_planner_dimension_field_in_lag() {
    let metadata = create_test_metadata();

    let lag_spec = WindowFunctionSpec::Lag {
        field: "category".to_owned(),
        offset: 1,
        default: None,
    };
    let chronological = WindowOrderBy {
        field: "occurred_at".to_owned(),
        direction: OrderDirection::Asc,
    };
    let request = WindowRequest {
        table_name: "tf_sales".to_owned(),
        select: Vec::new(),
        windows: vec![WindowFunctionRequest {
            function: lag_spec,
            alias: "prev_category".to_owned(),
            partition_by: Vec::new(),
            order_by: vec![chronological],
            frame: None,
        }],
        where_clause: None,
        order_by: Vec::new(),
        limit: None,
        offset: None,
    };

    let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");

    let planned = &plan.windows[0].function;
    if let WindowFunctionType::Lag { field, .. } = planned {
        assert_eq!(field, "dimensions->>'category'");
    } else {
        panic!("Expected Lag function");
    }
}
1763
/// Partitioning a `RowNumber` window by the filter column `customer_id` must
/// plan a partition list containing exactly the bare column name.
#[test]
fn test_window_planner_partition_by_filter() {
    let metadata = create_test_metadata();

    let by_customer = PartitionByColumn::Filter {
        name: String::from("customer_id"),
    };
    let row_number = WindowFunctionRequest {
        function: WindowFunctionSpec::RowNumber,
        alias: String::from("rank"),
        partition_by: vec![by_customer],
        order_by: Vec::new(),
        frame: None,
    };
    let request = WindowRequest {
        table_name: String::from("tf_sales"),
        select: Vec::new(),
        windows: vec![row_number],
        where_clause: None,
        order_by: Vec::new(),
        limit: None,
        offset: None,
    };

    let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");

    let planned = &plan.windows[0];
    assert_eq!(planned.partition_by, ["customer_id"]);
}
1789
/// The request-level ORDER BY must be carried into the plan in the same
/// order and with the same directions: `revenue` stays as-is, while
/// `category` is planned as `dimensions->>'category'`.
#[test]
fn test_window_planner_final_order_by() {
    let metadata = create_test_metadata();

    let by_revenue_desc = WindowOrderBy {
        field: "revenue".to_owned(),
        direction: OrderDirection::Desc,
    };
    let by_category_asc = WindowOrderBy {
        field: "category".to_owned(),
        direction: OrderDirection::Asc,
    };
    let request = WindowRequest {
        table_name: "tf_sales".to_owned(),
        select: Vec::new(),
        windows: Vec::new(),
        where_clause: None,
        order_by: vec![by_revenue_desc, by_category_asc],
        limit: None,
        offset: None,
    };

    let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");

    assert_eq!(plan.order_by.len(), 2);

    let primary = &plan.order_by[0];
    assert_eq!(primary.field, "revenue");
    assert_eq!(primary.direction, OrderDirection::Desc);

    let secondary = &plan.order_by[1];
    assert_eq!(secondary.field, "dimensions->>'category'");
    assert_eq!(secondary.direction, OrderDirection::Asc);
}
1820
/// Serializes a `WindowRequest` to JSON, checks that the table name, measure
/// name and snake_case function tag appear in the output, then deserializes
/// it again and checks that the table name and limit survived.
#[test]
fn test_window_request_serialization() {
    let revenue = WindowSelectColumn::Measure {
        name: String::from("revenue"),
        alias: String::from("revenue"),
    };
    let row_number = WindowFunctionRequest {
        function: WindowFunctionSpec::RowNumber,
        alias: String::from("rank"),
        partition_by: Vec::new(),
        order_by: Vec::new(),
        frame: None,
    };
    let request = WindowRequest {
        table_name: String::from("tf_sales"),
        select: vec![revenue],
        windows: vec![row_number],
        where_clause: None,
        order_by: Vec::new(),
        limit: Some(10),
        offset: None,
    };

    let json = serialize_json(&request);
    for fragment in ["tf_sales", "revenue", "row_number"] {
        assert!(json.contains(fragment));
    }

    let round_tripped: WindowRequest = deserialize_json(&json);
    assert_eq!(round_tripped.table_name, "tf_sales");
    assert_eq!(round_tripped.limit, Some(10));
}
1853
/// Checks the JSON form of two `WindowFunctionSpec` variants: `RunningSum`
/// must mention its snake_case tag and measure name, and `Ntile` must mention
/// its tag and bucket count.
#[test]
fn test_window_function_spec_serialization() {
    let running_sum = WindowFunctionSpec::RunningSum {
        measure: String::from("revenue"),
    };
    let running_sum_json = serialize_json(&running_sum);
    for fragment in ["running_sum", "revenue"] {
        assert!(running_sum_json.contains(fragment));
    }

    let quartiles = WindowFunctionSpec::Ntile { n: 4 };
    let quartiles_json = serialize_json(&quartiles);
    for fragment in ["ntile", "4"] {
        assert!(quartiles_json.contains(fragment));
    }
}
1868}