Skip to main content

fraiseql_core/runtime/
window_parser.rs

1//! Window Query Parser
2//!
3//! Parses GraphQL window queries into `WindowRequest` for execution.
4//!
5//! # GraphQL Query Format
6//!
7//! ```graphql
8//! query {
9//!   sales_window(
10//!     where: { customer_id: { _eq: "uuid-123" } }
11//!     orderBy: { occurred_at: ASC }
12//!     limit: 100
13//!   ) {
14//!     revenue
15//!     category
16//!     rank: row_number(partitionBy: ["category"], orderBy: { revenue: DESC })
17//!     running_total: sum(field: "revenue", orderBy: { occurred_at: ASC })
18//!     prev_revenue: lag(field: "revenue", offset: 1, default: 0)
19//!   }
20//! }
21//! ```
22//!
23//! # JSON Query Format
24//!
25//! ```json
26//! {
27//!   "table": "tf_sales",
28//!   "select": [
29//!     {"type": "measure", "name": "revenue", "alias": "revenue"},
30//!     {"type": "dimension", "path": "category", "alias": "category"}
31//!   ],
32//!   "windows": [
33//!     {
34//!       "function": {"type": "row_number"},
35//!       "alias": "rank",
36//!       "partitionBy": [{"type": "dimension", "path": "category"}],
37//!       "orderBy": [{"field": "revenue", "direction": "DESC"}]
38//!     },
39//!     {
40//!       "function": {"type": "running_sum", "measure": "revenue"},
41//!       "alias": "running_total",
42//!       "orderBy": [{"field": "occurred_at", "direction": "ASC"}],
43//!       "frame": {"frame_type": "ROWS", "start": {"type": "unbounded_preceding"}, "end": {"type": "current_row"}}
44//!     }
45//!   ],
46//!   "orderBy": [{"field": "occurred_at", "direction": "ASC"}],
47//!   "limit": 100
48//! }
49//! ```
50
51use serde_json::Value;
52
53use crate::{
54    compiler::{
55        aggregation::OrderDirection,
56        fact_table::FactTableMetadata,
57        window_functions::{
58            FrameBoundary, FrameExclusion, FrameType, PartitionByColumn, WindowFrame,
59            WindowFunctionRequest, WindowFunctionSpec, WindowOrderBy, WindowRequest,
60            WindowSelectColumn,
61        },
62    },
63    db::where_clause::{WhereClause, WhereOperator},
64    error::{FraiseQLError, Result},
65};
66
/// Stateless parser that turns the JSON form of a window query into a `WindowRequest`.
///
/// The type holds no data; all functionality is exposed through associated
/// functions, with [`WindowQueryParser::parse`] as the entry point.
#[derive(Debug, Clone, Copy, Default)]
pub struct WindowQueryParser;
69
70impl WindowQueryParser {
71    /// Parse a window query JSON into `WindowRequest`.
72    ///
73    /// # Arguments
74    ///
75    /// * `query_json` - JSON representation of the window query
76    /// * `_metadata` - Fact table metadata (for validation, optional future use)
77    ///
78    /// # Errors
79    ///
80    /// Returns error if the query structure is invalid.
81    pub fn parse(query_json: &Value, _metadata: &FactTableMetadata) -> Result<WindowRequest> {
82        // Extract table name
83        let table_name = query_json
84            .get("table")
85            .and_then(|v| v.as_str())
86            .ok_or_else(|| FraiseQLError::Validation {
87                message: "Missing 'table' field in window query".to_string(),
88                path:    None,
89            })?
90            .to_string();
91
92        // Parse SELECT columns
93        let select = if let Some(select_array) = query_json.get("select") {
94            Self::parse_select_columns(select_array)?
95        } else {
96            vec![]
97        };
98
99        // Parse window functions
100        let windows = if let Some(windows_array) = query_json.get("windows") {
101            Self::parse_window_functions(windows_array)?
102        } else {
103            vec![]
104        };
105
106        // Parse WHERE clause
107        let where_clause = if let Some(where_obj) = query_json.get("where") {
108            Some(Self::parse_where_clause(where_obj)?)
109        } else {
110            None
111        };
112
113        // Parse final ORDER BY
114        let order_by = if let Some(order_array) = query_json.get("orderBy") {
115            Self::parse_order_by(order_array)?
116        } else {
117            vec![]
118        };
119
120        // Parse LIMIT/OFFSET
121        let limit = query_json
122            .get("limit")
123            .and_then(|v| v.as_u64())
124            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
125
126        let offset = query_json
127            .get("offset")
128            .and_then(|v| v.as_u64())
129            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
130
131        Ok(WindowRequest {
132            table_name,
133            select,
134            windows,
135            where_clause,
136            order_by,
137            limit,
138            offset,
139        })
140    }
141
142    /// Parse SELECT columns from JSON array.
143    fn parse_select_columns(select_array: &Value) -> Result<Vec<WindowSelectColumn>> {
144        let Some(arr) = select_array.as_array() else {
145            return Ok(vec![]);
146        };
147
148        arr.iter().map(Self::parse_single_select_column).collect()
149    }
150
151    fn parse_single_select_column(col: &Value) -> Result<WindowSelectColumn> {
152        let col_type =
153            col.get("type")
154                .and_then(|v| v.as_str())
155                .ok_or_else(|| FraiseQLError::Validation {
156                    message: "Missing 'type' in select column".to_string(),
157                    path:    None,
158                })?;
159
160        let alias = col
161            .get("alias")
162            .and_then(|v| v.as_str())
163            .ok_or_else(|| FraiseQLError::Validation {
164                message: "Missing 'alias' in select column".to_string(),
165                path:    None,
166            })?
167            .to_string();
168
169        match col_type {
170            "measure" => {
171                let name = col
172                    .get("name")
173                    .and_then(|v| v.as_str())
174                    .ok_or_else(|| FraiseQLError::Validation {
175                        message: "Missing 'name' in measure select column".to_string(),
176                        path:    None,
177                    })?
178                    .to_string();
179                Ok(WindowSelectColumn::Measure { name, alias })
180            },
181            "dimension" => {
182                let path = col
183                    .get("path")
184                    .and_then(|v| v.as_str())
185                    .ok_or_else(|| FraiseQLError::Validation {
186                        message: "Missing 'path' in dimension select column".to_string(),
187                        path:    None,
188                    })?
189                    .to_string();
190                Ok(WindowSelectColumn::Dimension { path, alias })
191            },
192            "filter" => {
193                let name = col
194                    .get("name")
195                    .and_then(|v| v.as_str())
196                    .ok_or_else(|| FraiseQLError::Validation {
197                        message: "Missing 'name' in filter select column".to_string(),
198                        path:    None,
199                    })?
200                    .to_string();
201                Ok(WindowSelectColumn::Filter { name, alias })
202            },
203            _ => Err(FraiseQLError::Validation {
204                message: format!("Unknown select column type: {col_type}"),
205                path:    None,
206            }),
207        }
208    }
209
210    /// Parse window functions from JSON array.
211    fn parse_window_functions(windows_array: &Value) -> Result<Vec<WindowFunctionRequest>> {
212        let Some(arr) = windows_array.as_array() else {
213            return Ok(vec![]);
214        };
215
216        arr.iter().map(Self::parse_single_window_function).collect()
217    }
218
219    fn parse_single_window_function(window: &Value) -> Result<WindowFunctionRequest> {
220        // Parse function spec
221        let function = window
222            .get("function")
223            .ok_or_else(|| FraiseQLError::Validation {
224                message: "Missing 'function' in window definition".to_string(),
225                path:    None,
226            })
227            .and_then(Self::parse_function_spec)?;
228
229        // Parse alias
230        let alias = window
231            .get("alias")
232            .and_then(|v| v.as_str())
233            .ok_or_else(|| FraiseQLError::Validation {
234                message: "Missing 'alias' in window definition".to_string(),
235                path:    None,
236            })?
237            .to_string();
238
239        // Parse PARTITION BY
240        let partition_by = if let Some(partition_array) = window.get("partitionBy") {
241            Self::parse_partition_by(partition_array)?
242        } else {
243            vec![]
244        };
245
246        // Parse ORDER BY within window
247        let order_by = if let Some(order_array) = window.get("orderBy") {
248            Self::parse_order_by(order_array)?
249        } else {
250            vec![]
251        };
252
253        // Parse frame
254        let frame = window.get("frame").map(Self::parse_frame).transpose()?;
255
256        Ok(WindowFunctionRequest {
257            function,
258            alias,
259            partition_by,
260            order_by,
261            frame,
262        })
263    }
264
265    /// Parse window function specification.
266    fn parse_function_spec(func: &Value) -> Result<WindowFunctionSpec> {
267        let func_type =
268            func.get("type")
269                .and_then(|v| v.as_str())
270                .ok_or_else(|| FraiseQLError::Validation {
271                    message: "Missing 'type' in function spec".to_string(),
272                    path:    None,
273                })?;
274
275        match func_type {
276            // Ranking functions
277            "row_number" => Ok(WindowFunctionSpec::RowNumber),
278            "rank" => Ok(WindowFunctionSpec::Rank),
279            "dense_rank" => Ok(WindowFunctionSpec::DenseRank),
280            "ntile" => {
281                let n = u32::try_from(func.get("n").and_then(|v| v.as_u64()).ok_or_else(|| {
282                    FraiseQLError::Validation {
283                        message: "Missing 'n' in NTILE function".to_string(),
284                        path:    None,
285                    }
286                })?)
287                .unwrap_or(u32::MAX);
288                Ok(WindowFunctionSpec::Ntile { n })
289            },
290            "percent_rank" => Ok(WindowFunctionSpec::PercentRank),
291            "cume_dist" => Ok(WindowFunctionSpec::CumeDist),
292
293            // Value functions
294            "lag" => {
295                let field = Self::extract_string_field(func, "field")?;
296                let offset =
297                    i32::try_from(func.get("offset").and_then(|v| v.as_i64()).unwrap_or(1))
298                        .unwrap_or(1);
299                let default = func.get("default").cloned();
300                Ok(WindowFunctionSpec::Lag {
301                    field,
302                    offset,
303                    default,
304                })
305            },
306            "lead" => {
307                let field = Self::extract_string_field(func, "field")?;
308                let offset =
309                    i32::try_from(func.get("offset").and_then(|v| v.as_i64()).unwrap_or(1))
310                        .unwrap_or(1);
311                let default = func.get("default").cloned();
312                Ok(WindowFunctionSpec::Lead {
313                    field,
314                    offset,
315                    default,
316                })
317            },
318            "first_value" => {
319                let field = Self::extract_string_field(func, "field")?;
320                Ok(WindowFunctionSpec::FirstValue { field })
321            },
322            "last_value" => {
323                let field = Self::extract_string_field(func, "field")?;
324                Ok(WindowFunctionSpec::LastValue { field })
325            },
326            "nth_value" => {
327                let field = Self::extract_string_field(func, "field")?;
328                let n = u32::try_from(func.get("n").and_then(|v| v.as_u64()).ok_or_else(|| {
329                    FraiseQLError::Validation {
330                        message: "Missing 'n' in NTH_VALUE function".to_string(),
331                        path:    None,
332                    }
333                })?)
334                .unwrap_or(u32::MAX);
335                Ok(WindowFunctionSpec::NthValue { field, n })
336            },
337
338            // Aggregate as window functions
339            "running_sum" => {
340                let measure = Self::extract_string_field(func, "measure")?;
341                Ok(WindowFunctionSpec::RunningSum { measure })
342            },
343            "running_avg" => {
344                let measure = Self::extract_string_field(func, "measure")?;
345                Ok(WindowFunctionSpec::RunningAvg { measure })
346            },
347            "running_count" => {
348                if let Some(field) = func.get("field").and_then(|v| v.as_str()) {
349                    Ok(WindowFunctionSpec::RunningCountField {
350                        field: field.to_string(),
351                    })
352                } else {
353                    Ok(WindowFunctionSpec::RunningCount)
354                }
355            },
356            "running_min" => {
357                let measure = Self::extract_string_field(func, "measure")?;
358                Ok(WindowFunctionSpec::RunningMin { measure })
359            },
360            "running_max" => {
361                let measure = Self::extract_string_field(func, "measure")?;
362                Ok(WindowFunctionSpec::RunningMax { measure })
363            },
364            "running_stddev" => {
365                let measure = Self::extract_string_field(func, "measure")?;
366                Ok(WindowFunctionSpec::RunningStddev { measure })
367            },
368            "running_variance" => {
369                let measure = Self::extract_string_field(func, "measure")?;
370                Ok(WindowFunctionSpec::RunningVariance { measure })
371            },
372
373            _ => Err(FraiseQLError::Validation {
374                message: format!("Unknown window function type: {func_type}"),
375                path:    None,
376            }),
377        }
378    }
379
380    /// Extract a required string field from JSON object.
381    fn extract_string_field(obj: &Value, field_name: &str) -> Result<String> {
382        obj.get(field_name).and_then(|v| v.as_str()).map(String::from).ok_or_else(|| {
383            FraiseQLError::Validation {
384                message: format!("Missing '{field_name}' in function spec"),
385                path:    None,
386            }
387        })
388    }
389
390    /// Parse PARTITION BY from JSON array.
391    fn parse_partition_by(partition_array: &Value) -> Result<Vec<PartitionByColumn>> {
392        let Some(arr) = partition_array.as_array() else {
393            return Ok(vec![]);
394        };
395
396        arr.iter()
397            .map(|item| {
398                let col_type = item.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
399                    FraiseQLError::Validation {
400                        message: "Missing 'type' in partitionBy column".to_string(),
401                        path:    None,
402                    }
403                })?;
404
405                match col_type {
406                    "dimension" => {
407                        let path = item
408                            .get("path")
409                            .and_then(|v| v.as_str())
410                            .ok_or_else(|| FraiseQLError::Validation {
411                                message: "Missing 'path' in dimension partition column".to_string(),
412                                path:    None,
413                            })?
414                            .to_string();
415                        Ok(PartitionByColumn::Dimension { path })
416                    },
417                    "filter" => {
418                        let name = item
419                            .get("name")
420                            .and_then(|v| v.as_str())
421                            .ok_or_else(|| FraiseQLError::Validation {
422                                message: "Missing 'name' in filter partition column".to_string(),
423                                path:    None,
424                            })?
425                            .to_string();
426                        Ok(PartitionByColumn::Filter { name })
427                    },
428                    "measure" => {
429                        let name = item
430                            .get("name")
431                            .and_then(|v| v.as_str())
432                            .ok_or_else(|| FraiseQLError::Validation {
433                                message: "Missing 'name' in measure partition column".to_string(),
434                                path:    None,
435                            })?
436                            .to_string();
437                        Ok(PartitionByColumn::Measure { name })
438                    },
439                    _ => Err(FraiseQLError::Validation {
440                        message: format!("Unknown partition column type: {col_type}"),
441                        path:    None,
442                    }),
443                }
444            })
445            .collect()
446    }
447
448    /// Parse ORDER BY from JSON array.
449    fn parse_order_by(order_array: &Value) -> Result<Vec<WindowOrderBy>> {
450        let Some(arr) = order_array.as_array() else {
451            return Ok(vec![]);
452        };
453
454        arr.iter()
455            .map(|item| {
456                let field = item
457                    .get("field")
458                    .and_then(|v| v.as_str())
459                    .ok_or_else(|| FraiseQLError::Validation {
460                        message: "Missing 'field' in orderBy".to_string(),
461                        path:    None,
462                    })?
463                    .to_string();
464
465                let direction = match item.get("direction").and_then(|v| v.as_str()) {
466                    Some("DESC" | "desc") => OrderDirection::Desc,
467                    _ => OrderDirection::Asc,
468                };
469
470                Ok(WindowOrderBy { field, direction })
471            })
472            .collect()
473    }
474
475    /// Parse WHERE clause from JSON.
476    fn parse_where_clause(where_obj: &Value) -> Result<WhereClause> {
477        let Some(obj) = where_obj.as_object() else {
478            return Ok(WhereClause::And(vec![]));
479        };
480
481        let mut conditions = Vec::new();
482
483        for (key, value) in obj {
484            // Parse field_operator format (e.g., "customer_id_eq" -> field="customer_id",
485            // operator="eq")
486            if let Some((field, operator_str)) = Self::parse_where_field_and_operator(key)? {
487                let operator = WhereOperator::from_str(operator_str)?;
488
489                conditions.push(WhereClause::Field {
490                    path: vec![field.to_string()],
491                    operator,
492                    value: value.clone(),
493                });
494            }
495        }
496
497        Ok(WhereClause::And(conditions))
498    }
499
500    /// Parse WHERE field and operator from key.
501    fn parse_where_field_and_operator(key: &str) -> Result<Option<(&str, &str)>> {
502        if let Some(last_underscore) = key.rfind('_') {
503            let field = &key[..last_underscore];
504            let operator = &key[last_underscore + 1..];
505
506            match WhereOperator::from_str(operator) {
507                Ok(_) => Ok(Some((field, operator))),
508                Err(_) => Ok(None),
509            }
510        } else {
511            Ok(None)
512        }
513    }
514
515    /// Parse window frame from JSON.
516    fn parse_frame(frame: &Value) -> Result<WindowFrame> {
517        let frame_type = match frame.get("frame_type").and_then(|v| v.as_str()) {
518            Some("ROWS") => FrameType::Rows,
519            Some("RANGE") => FrameType::Range,
520            Some("GROUPS") => FrameType::Groups,
521            _ => {
522                return Err(FraiseQLError::Validation {
523                    message: "Invalid or missing 'frame_type' in frame".to_string(),
524                    path:    None,
525                });
526            },
527        };
528
529        let start = frame
530            .get("start")
531            .ok_or_else(|| FraiseQLError::Validation {
532                message: "Missing 'start' in frame".to_string(),
533                path:    None,
534            })
535            .and_then(Self::parse_frame_boundary)?;
536
537        let end = frame
538            .get("end")
539            .ok_or_else(|| FraiseQLError::Validation {
540                message: "Missing 'end' in frame".to_string(),
541                path:    None,
542            })
543            .and_then(Self::parse_frame_boundary)?;
544
545        let exclusion = frame.get("exclusion").and_then(|v| v.as_str()).map(|s| match s {
546            "current_row" => FrameExclusion::CurrentRow,
547            "group" => FrameExclusion::Group,
548            "ties" => FrameExclusion::Ties,
549            _ => FrameExclusion::NoOthers,
550        });
551
552        Ok(WindowFrame {
553            frame_type,
554            start,
555            end,
556            exclusion,
557        })
558    }
559
560    /// Parse frame boundary from JSON.
561    fn parse_frame_boundary(boundary: &Value) -> Result<FrameBoundary> {
562        let boundary_type = boundary.get("type").and_then(|v| v.as_str()).ok_or_else(|| {
563            FraiseQLError::Validation {
564                message: "Missing 'type' in frame boundary".to_string(),
565                path:    None,
566            }
567        })?;
568
569        match boundary_type {
570            "unbounded_preceding" => Ok(FrameBoundary::UnboundedPreceding),
571            "n_preceding" => {
572                let n =
573                    u32::try_from(boundary.get("n").and_then(|v| v.as_u64()).ok_or_else(|| {
574                        FraiseQLError::Validation {
575                            message: "Missing 'n' in N PRECEDING boundary".to_string(),
576                            path:    None,
577                        }
578                    })?)
579                    .unwrap_or(u32::MAX);
580                Ok(FrameBoundary::NPreceding { n })
581            },
582            "current_row" => Ok(FrameBoundary::CurrentRow),
583            "n_following" => {
584                let n =
585                    u32::try_from(boundary.get("n").and_then(|v| v.as_u64()).ok_or_else(|| {
586                        FraiseQLError::Validation {
587                            message: "Missing 'n' in N FOLLOWING boundary".to_string(),
588                            path:    None,
589                        }
590                    })?)
591                    .unwrap_or(u32::MAX);
592                Ok(FrameBoundary::NFollowing { n })
593            },
594            "unbounded_following" => Ok(FrameBoundary::UnboundedFollowing),
595            _ => Err(FraiseQLError::Validation {
596                message: format!("Unknown frame boundary type: {boundary_type}"),
597                path:    None,
598            }),
599        }
600    }
601}
602
603#[cfg(test)]
604mod tests {
605    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
606
607    use serde_json::json;
608
609    use super::*;
610    use crate::compiler::fact_table::{DimensionColumn, FilterColumn, MeasureColumn, SqlType};
611
612    fn create_test_metadata() -> FactTableMetadata {
613        FactTableMetadata {
614            table_name:           "tf_sales".to_string(),
615            measures:             vec![
616                MeasureColumn {
617                    name:     "revenue".to_string(),
618                    sql_type: SqlType::Decimal,
619                    nullable: false,
620                },
621                MeasureColumn {
622                    name:     "quantity".to_string(),
623                    sql_type: SqlType::Int,
624                    nullable: false,
625                },
626            ],
627            dimensions:           DimensionColumn {
628                name:  "dimensions".to_string(),
629                paths: vec![],
630            },
631            denormalized_filters: vec![
632                FilterColumn {
633                    name:     "customer_id".to_string(),
634                    sql_type: SqlType::Uuid,
635                    indexed:  true,
636                },
637                FilterColumn {
638                    name:     "occurred_at".to_string(),
639                    sql_type: SqlType::Timestamp,
640                    indexed:  true,
641                },
642            ],
643            calendar_dimensions:  vec![],
644        }
645    }
646
647    #[test]
648    fn test_parse_simple_window_query() {
649        let metadata = create_test_metadata();
650        let query = json!({
651            "table": "tf_sales",
652            "select": [
653                {"type": "measure", "name": "revenue", "alias": "revenue"}
654            ],
655            "windows": [
656                {
657                    "function": {"type": "row_number"},
658                    "alias": "rank",
659                    "partitionBy": [{"type": "dimension", "path": "category"}],
660                    "orderBy": [{"field": "revenue", "direction": "DESC"}]
661                }
662            ]
663        });
664
665        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
666
667        assert_eq!(request.table_name, "tf_sales");
668        assert_eq!(request.select.len(), 1);
669        assert_eq!(request.windows.len(), 1);
670        assert_eq!(request.windows[0].alias, "rank");
671        assert!(matches!(request.windows[0].function, WindowFunctionSpec::RowNumber));
672    }
673
674    #[test]
675    fn test_parse_running_sum() {
676        let metadata = create_test_metadata();
677        let query = json!({
678            "table": "tf_sales",
679            "select": [],
680            "windows": [
681                {
682                    "function": {"type": "running_sum", "measure": "revenue"},
683                    "alias": "running_total",
684                    "orderBy": [{"field": "occurred_at", "direction": "ASC"}],
685                    "frame": {
686                        "frame_type": "ROWS",
687                        "start": {"type": "unbounded_preceding"},
688                        "end": {"type": "current_row"}
689                    }
690                }
691            ]
692        });
693
694        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
695
696        assert_eq!(request.windows.len(), 1);
697        match &request.windows[0].function {
698            WindowFunctionSpec::RunningSum { measure } => {
699                assert_eq!(measure, "revenue");
700            },
701            _ => panic!("Expected RunningSum function"),
702        }
703        assert!(request.windows[0].frame.is_some());
704    }
705
706    #[test]
707    fn test_parse_lag_function() {
708        let metadata = create_test_metadata();
709        let query = json!({
710            "table": "tf_sales",
711            "windows": [
712                {
713                    "function": {"type": "lag", "field": "revenue", "offset": 1, "default": 0},
714                    "alias": "prev_revenue",
715                    "orderBy": [{"field": "occurred_at"}]
716                }
717            ]
718        });
719
720        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
721
722        match &request.windows[0].function {
723            WindowFunctionSpec::Lag {
724                field,
725                offset,
726                default,
727            } => {
728                assert_eq!(field, "revenue");
729                assert_eq!(*offset, 1);
730                assert!(default.is_some());
731            },
732            _ => panic!("Expected Lag function"),
733        }
734    }
735
736    #[test]
737    fn test_parse_ntile_function() {
738        let metadata = create_test_metadata();
739        let query = json!({
740            "table": "tf_sales",
741            "windows": [
742                {
743                    "function": {"type": "ntile", "n": 4},
744                    "alias": "quartile",
745                    "orderBy": [{"field": "revenue", "direction": "DESC"}]
746                }
747            ]
748        });
749
750        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
751
752        match &request.windows[0].function {
753            WindowFunctionSpec::Ntile { n } => {
754                assert_eq!(*n, 4);
755            },
756            _ => panic!("Expected Ntile function"),
757        }
758    }
759
760    #[test]
761    fn test_parse_select_columns() {
762        let metadata = create_test_metadata();
763        let query = json!({
764            "table": "tf_sales",
765            "select": [
766                {"type": "measure", "name": "revenue", "alias": "rev"},
767                {"type": "dimension", "path": "category", "alias": "cat"},
768                {"type": "filter", "name": "occurred_at", "alias": "date"}
769            ]
770        });
771
772        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
773
774        assert_eq!(request.select.len(), 3);
775        assert!(matches!(
776            &request.select[0],
777            WindowSelectColumn::Measure { name, alias } if name == "revenue" && alias == "rev"
778        ));
779        assert!(matches!(
780            &request.select[1],
781            WindowSelectColumn::Dimension { path, alias } if path == "category" && alias == "cat"
782        ));
783        assert!(matches!(
784            &request.select[2],
785            WindowSelectColumn::Filter { name, alias } if name == "occurred_at" && alias == "date"
786        ));
787    }
788
789    #[test]
790    fn test_parse_partition_by() {
791        let metadata = create_test_metadata();
792        let query = json!({
793            "table": "tf_sales",
794            "windows": [
795                {
796                    "function": {"type": "row_number"},
797                    "alias": "rank",
798                    "partitionBy": [
799                        {"type": "dimension", "path": "category"},
800                        {"type": "filter", "name": "customer_id"}
801                    ],
802                    "orderBy": []
803                }
804            ]
805        });
806
807        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
808
809        assert_eq!(request.windows[0].partition_by.len(), 2);
810        assert!(matches!(
811            &request.windows[0].partition_by[0],
812            PartitionByColumn::Dimension { path } if path == "category"
813        ));
814        assert!(matches!(
815            &request.windows[0].partition_by[1],
816            PartitionByColumn::Filter { name } if name == "customer_id"
817        ));
818    }
819
820    #[test]
821    fn test_parse_limit_offset() {
822        let metadata = create_test_metadata();
823        let query = json!({
824            "table": "tf_sales",
825            "limit": 100,
826            "offset": 50
827        });
828
829        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
830
831        assert_eq!(request.limit, Some(100));
832        assert_eq!(request.offset, Some(50));
833    }
834
835    #[test]
836    fn test_parse_final_order_by() {
837        let metadata = create_test_metadata();
838        let query = json!({
839            "table": "tf_sales",
840            "orderBy": [
841                {"field": "revenue", "direction": "DESC"},
842                {"field": "occurred_at", "direction": "ASC"}
843            ]
844        });
845
846        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
847
848        assert_eq!(request.order_by.len(), 2);
849        assert_eq!(request.order_by[0].field, "revenue");
850        assert_eq!(request.order_by[0].direction, OrderDirection::Desc);
851        assert_eq!(request.order_by[1].field, "occurred_at");
852        assert_eq!(request.order_by[1].direction, OrderDirection::Asc);
853    }
854
855    #[test]
856    fn test_parse_complex_window_query() {
857        let metadata = create_test_metadata();
858        let query = json!({
859            "table": "tf_sales",
860            "select": [
861                {"type": "measure", "name": "revenue", "alias": "revenue"},
862                {"type": "dimension", "path": "category", "alias": "category"}
863            ],
864            "windows": [
865                {
866                    "function": {"type": "row_number"},
867                    "alias": "rank",
868                    "partitionBy": [{"type": "dimension", "path": "category"}],
869                    "orderBy": [{"field": "revenue", "direction": "DESC"}]
870                },
871                {
872                    "function": {"type": "running_sum", "measure": "revenue"},
873                    "alias": "running_total",
874                    "partitionBy": [{"type": "dimension", "path": "category"}],
875                    "orderBy": [{"field": "occurred_at", "direction": "ASC"}],
876                    "frame": {
877                        "frame_type": "ROWS",
878                        "start": {"type": "unbounded_preceding"},
879                        "end": {"type": "current_row"}
880                    }
881                },
882                {
883                    "function": {"type": "lag", "field": "revenue", "offset": 1},
884                    "alias": "prev_revenue",
885                    "partitionBy": [{"type": "dimension", "path": "category"}],
886                    "orderBy": [{"field": "occurred_at", "direction": "ASC"}]
887                }
888            ],
889            "orderBy": [
890                {"field": "category", "direction": "ASC"},
891                {"field": "revenue", "direction": "DESC"}
892            ],
893            "limit": 100
894        });
895
896        let request = WindowQueryParser::parse(&query, &metadata).unwrap();
897
898        assert_eq!(request.table_name, "tf_sales");
899        assert_eq!(request.select.len(), 2);
900        assert_eq!(request.windows.len(), 3);
901        assert_eq!(request.order_by.len(), 2);
902        assert_eq!(request.limit, Some(100));
903    }
904
905    #[test]
906    fn test_parse_error_missing_table() {
907        let metadata = create_test_metadata();
908        let query = json!({
909            "select": [],
910            "windows": []
911        });
912
913        let result = WindowQueryParser::parse(&query, &metadata);
914        let err = result.expect_err("expected Err for missing table field");
915        assert!(err.to_string().contains("table"), "unexpected error message: {err}");
916    }
917
918    #[test]
919    fn test_parse_error_invalid_function_type() {
920        let metadata = create_test_metadata();
921        let query = json!({
922            "table": "tf_sales",
923            "windows": [
924                {
925                    "function": {"type": "invalid_function"},
926                    "alias": "test"
927                }
928            ]
929        });
930
931        let result = WindowQueryParser::parse(&query, &metadata);
932        let err = result.expect_err("expected Err for invalid window function type");
933        assert!(err.to_string().contains("Unknown"), "unexpected error message: {err}");
934    }
935}