Skip to main content

fraiseql_core/compiler/
window_functions.rs

1//! Window Function Planning Module
2//!
3//! Generates execution plans for SQL window functions.
4//!
5//! # Architecture
6//!
7//! ```text
8//! WindowRequest (high-level, semantic)
9//!      ↓
10//! WindowPlanner::plan() (validates against FactTableMetadata)
11//!      ↓
12//! WindowExecutionPlan (low-level, SQL expressions)
13//!      ↓
14//! WindowSqlGenerator (database-specific SQL)
15//! ```
16//!
17//! # Window Functions
18//!
19//! Window functions perform calculations across sets of table rows that are related
20//! to the current row, without collapsing them into a single output row like GROUP BY.
21//!
22//! ## Function Types
23//!
24//! ### Ranking Functions
25//! - `ROW_NUMBER()` - Sequential number within partition
26//! - `RANK()` - Rank with gaps for ties
27//! - `DENSE_RANK()` - Rank without gaps
28//! - `NTILE(n)` - Divide rows into n groups
29//! - `PERCENT_RANK()` - Relative rank (0 to 1)
30//! - `CUME_DIST()` - Cumulative distribution
31//!
32//! ### Value Functions
33//! - `LAG(field, offset)` - Value from previous row
34//! - `LEAD(field, offset)` - Value from next row
35//! - `FIRST_VALUE(field)` - First value in window
36//! - `LAST_VALUE(field)` - Last value in window
37//! - `NTH_VALUE(field, n)` - Nth value in window
38//!
39//! ### Aggregate as Window
40//! - `SUM(field) OVER (...)` - Running total
41//! - `AVG(field) OVER (...)` - Moving average
42//! - `COUNT(*) OVER (...)` - Running count
43//!
44//! # High-Level Example (WindowRequest)
45//!
46//! ```rust,ignore
47//! use fraiseql_core::compiler::window_functions::*;
48//!
49//! let request = WindowRequest {
50//!     table_name: "tf_sales".to_string(),
51//!     select: vec![
52//!         WindowSelectColumn::Measure { name: "revenue".to_string(), alias: "revenue".to_string() },
53//!         WindowSelectColumn::Dimension { path: "category".to_string(), alias: "category".to_string() },
54//!     ],
55//!     windows: vec![
56//!         WindowFunctionRequest {
57//!             function: WindowFunctionSpec::RowNumber,
58//!             alias: "rank".to_string(),
59//!             partition_by: vec![PartitionByColumn::Dimension { path: "category".to_string() }],
60//!             order_by: vec![WindowOrderBy { field: "revenue".to_string(), direction: OrderDirection::Desc }],
61//!             frame: None,
62//!         },
63//!     ],
64//!     where_clause: None,
65//!     order_by: vec![],
66//!     limit: Some(100),
67//!     offset: None,
68//! };
69//!
70//! let plan = WindowPlanner::plan(request, metadata)?;
71//! ```
72//!
73//! # SQL Example (WindowExecutionPlan output)
74//!
75//! ```sql
76//! -- Running total
77//! SELECT
78//!     occurred_at,
79//!     revenue,
80//!     SUM(revenue) OVER (
81//!         ORDER BY occurred_at
82//!         ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
83//!     ) as running_total
84//! FROM tf_sales;
85//!
86//! -- Ranking
87//! SELECT
88//!     category,
89//!     revenue,
90//!     ROW_NUMBER() OVER (
91//!         PARTITION BY category
92//!         ORDER BY revenue DESC
93//!     ) as rank
94//! FROM tf_sales;
95//! ```
96
97use serde::{Deserialize, Serialize};
98
99use crate::{
100    compiler::{
101        aggregation::{OrderByClause, OrderDirection},
102        fact_table::FactTableMetadata,
103    },
104    db::where_clause::WhereClause,
105    error::{FraiseQLError, Result},
106};
107
108// =============================================================================
109// High-Level Types (WindowRequest) - Semantic names, validated against metadata
110// =============================================================================
111
112/// High-level window query request using semantic field names.
113///
114/// This is the user-facing API that uses measure names and dimension paths
115/// instead of raw SQL expressions. It gets validated and converted to
116/// `WindowExecutionPlan` by `WindowPlanner::plan()`.
117///
118/// # Example
119///
120/// ```rust,ignore
121/// let request = WindowRequest {
122///     table_name: "tf_sales".to_string(),
123///     select: vec![
124///         WindowSelectColumn::Measure { name: "revenue".to_string(), alias: "revenue".to_string() },
125///         WindowSelectColumn::Dimension { path: "category".to_string(), alias: "category".to_string() },
126///     ],
127///     windows: vec![WindowFunctionRequest {
128///         function: WindowFunctionSpec::RunningSum { measure: "revenue".to_string() },
129///         alias: "running_total".to_string(),
130///         partition_by: vec![],
131///         order_by: vec![WindowOrderBy { field: "occurred_at".to_string(), direction: OrderDirection::Asc }],
132///         frame: Some(WindowFrame { ... }),
133///     }],
134///     where_clause: None,
135///     order_by: vec![],
136///     limit: Some(100),
137///     offset: None,
138/// };
139/// ```
140#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
141pub struct WindowRequest {
142    /// Fact table name (e.g., "tf_sales")
143    pub table_name: String,
144
145    /// Columns to select (measures, dimensions, filters)
146    pub select: Vec<WindowSelectColumn>,
147
148    /// Window function specifications
149    pub windows: Vec<WindowFunctionRequest>,
150
151    /// WHERE clause filters (applied before window computation)
152    pub where_clause: Option<WhereClause>,
153
154    /// Final ORDER BY (after window computation)
155    pub order_by: Vec<WindowOrderBy>,
156
157    /// Result limit
158    pub limit: Option<u32>,
159
160    /// Result offset
161    pub offset: Option<u32>,
162}
163
164/// Column selection for window query (semantic names).
165#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
166#[serde(tag = "type", rename_all = "snake_case")]
167pub enum WindowSelectColumn {
168    /// Select a measure column (e.g., "revenue")
169    Measure {
170        /// Measure name from FactTableMetadata
171        name:  String,
172        /// Result alias
173        alias: String,
174    },
175
176    /// Select a dimension from JSONB (e.g., "category")
177    Dimension {
178        /// Dimension path in JSONB
179        path:  String,
180        /// Result alias
181        alias: String,
182    },
183
184    /// Select a denormalized filter column (e.g., "customer_id", "occurred_at")
185    Filter {
186        /// Filter column name
187        name:  String,
188        /// Result alias
189        alias: String,
190    },
191}
192
193impl WindowSelectColumn {
194    /// Get the result alias for this selection.
195    #[must_use]
196    pub fn alias(&self) -> &str {
197        match self {
198            Self::Measure { alias, .. }
199            | Self::Dimension { alias, .. }
200            | Self::Filter { alias, .. } => alias,
201        }
202    }
203}
204
205/// Window function request (high-level, semantic).
206#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
207pub struct WindowFunctionRequest {
208    /// Window function type and parameters
209    pub function: WindowFunctionSpec,
210
211    /// Result column alias
212    pub alias: String,
213
214    /// PARTITION BY columns (semantic names)
215    pub partition_by: Vec<PartitionByColumn>,
216
217    /// ORDER BY within window
218    pub order_by: Vec<WindowOrderBy>,
219
220    /// Window frame specification
221    pub frame: Option<WindowFrame>,
222}
223
224/// Window function specification using semantic field names.
225///
226/// Unlike `WindowFunctionType` which uses raw SQL expressions,
227/// this uses measure/dimension names that get validated against metadata.
228#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
229#[serde(tag = "type", rename_all = "snake_case")]
230pub enum WindowFunctionSpec {
231    // =========================================================================
232    // Ranking Functions (no field reference needed)
233    // =========================================================================
234    /// ROW_NUMBER() - Sequential number within partition
235    RowNumber,
236
237    /// RANK() - Rank with gaps for ties
238    Rank,
239
240    /// DENSE_RANK() - Rank without gaps
241    DenseRank,
242
243    /// NTILE(n) - Divide rows into n groups
244    Ntile {
245        /// Number of groups
246        n: u32,
247    },
248
249    /// PERCENT_RANK() - Relative rank (0 to 1)
250    PercentRank,
251
252    /// CUME_DIST() - Cumulative distribution
253    CumeDist,
254
255    // =========================================================================
256    // Value Functions (reference measures or dimensions)
257    // =========================================================================
258    /// LAG(field, offset, default) - Value from previous row
259    Lag {
260        /// Measure or dimension name
261        field:   String,
262        /// Row offset (default: 1)
263        offset:  i32,
264        /// Default value when no previous row
265        default: Option<serde_json::Value>,
266    },
267
268    /// LEAD(field, offset, default) - Value from next row
269    Lead {
270        /// Measure or dimension name
271        field:   String,
272        /// Row offset (default: 1)
273        offset:  i32,
274        /// Default value when no next row
275        default: Option<serde_json::Value>,
276    },
277
278    /// FIRST_VALUE(field) - First value in window frame
279    FirstValue {
280        /// Measure or dimension name
281        field: String,
282    },
283
284    /// LAST_VALUE(field) - Last value in window frame
285    LastValue {
286        /// Measure or dimension name
287        field: String,
288    },
289
290    /// NTH_VALUE(field, n) - Nth value in window frame
291    NthValue {
292        /// Measure or dimension name
293        field: String,
294        /// Position (1-indexed)
295        n:     u32,
296    },
297
298    // =========================================================================
299    // Aggregate as Window Functions (reference measures)
300    // =========================================================================
301    /// SUM(measure) OVER (...) - Running total
302    RunningSum {
303        /// Measure name
304        measure: String,
305    },
306
307    /// AVG(measure) OVER (...) - Moving average
308    RunningAvg {
309        /// Measure name
310        measure: String,
311    },
312
313    /// COUNT(*) OVER (...) - Running count
314    RunningCount,
315
316    /// COUNT(field) OVER (...) - Running count of non-null values
317    RunningCountField {
318        /// Measure or dimension name
319        field: String,
320    },
321
322    /// MIN(measure) OVER (...) - Running minimum
323    RunningMin {
324        /// Measure name
325        measure: String,
326    },
327
328    /// MAX(measure) OVER (...) - Running maximum
329    RunningMax {
330        /// Measure name
331        measure: String,
332    },
333
334    /// STDDEV(measure) OVER (...) - Running standard deviation
335    RunningStddev {
336        /// Measure name
337        measure: String,
338    },
339
340    /// VARIANCE(measure) OVER (...) - Running variance
341    RunningVariance {
342        /// Measure name
343        measure: String,
344    },
345}
346
347/// PARTITION BY column specification (semantic).
348#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
349#[serde(tag = "type", rename_all = "snake_case")]
350pub enum PartitionByColumn {
351    /// Partition by dimension from JSONB
352    Dimension {
353        /// Dimension path
354        path: String,
355    },
356
357    /// Partition by denormalized filter column
358    Filter {
359        /// Filter column name
360        name: String,
361    },
362
363    /// Partition by measure (rare but valid)
364    Measure {
365        /// Measure name
366        name: String,
367    },
368}
369
370/// ORDER BY clause for window functions (semantic field names).
371#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
372pub struct WindowOrderBy {
373    /// Field name (measure, dimension, or filter)
374    pub field: String,
375
376    /// Sort direction
377    pub direction: OrderDirection,
378}
379
380// =============================================================================
381// Low-Level Types (WindowExecutionPlan) - SQL expressions, ready for execution
382// =============================================================================
383
384/// Window function execution plan
385#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct WindowExecutionPlan {
387    /// Source table name
388    pub table: String,
389
390    /// Regular SELECT columns (non-window)
391    pub select: Vec<SelectColumn>,
392
393    /// Window function expressions
394    pub windows: Vec<WindowFunction>,
395
396    /// WHERE clause filter
397    pub where_clause: Option<WhereClause>,
398
399    /// Final ORDER BY (after window computation)
400    pub order_by: Vec<OrderByClause>,
401
402    /// Result limit
403    pub limit: Option<u32>,
404
405    /// Result offset
406    pub offset: Option<u32>,
407}
408
409/// Regular SELECT column
410#[derive(Debug, Clone, Serialize, Deserialize)]
411pub struct SelectColumn {
412    /// Column expression (e.g., "revenue", "data->>'category'")
413    pub expression: String,
414
415    /// Result alias
416    pub alias: String,
417}
418
419/// Window function specification
420#[derive(Debug, Clone, Serialize, Deserialize)]
421pub struct WindowFunction {
422    /// Window function type
423    pub function: WindowFunctionType,
424
425    /// Result column alias
426    pub alias: String,
427
428    /// PARTITION BY columns
429    pub partition_by: Vec<String>,
430
431    /// ORDER BY within window
432    pub order_by: Vec<OrderByClause>,
433
434    /// Window frame specification
435    pub frame: Option<WindowFrame>,
436}
437
438/// Window function types
439#[derive(Debug, Clone, Serialize, Deserialize)]
440#[serde(tag = "type", rename_all = "snake_case")]
441pub enum WindowFunctionType {
442    // Ranking functions
443    /// ROW_NUMBER() - Sequential number within partition
444    RowNumber,
445
446    /// RANK() - Rank with gaps for ties
447    Rank,
448
449    /// DENSE_RANK() - Rank without gaps
450    DenseRank,
451
452    /// NTILE(n) - Divide rows into n groups
453    Ntile {
454        /// Number of groups
455        n: u32,
456    },
457
458    /// PERCENT_RANK() - Relative rank (0 to 1)
459    PercentRank,
460
461    /// CUME_DIST() - Cumulative distribution
462    CumeDist,
463
464    // Value functions
465    /// LAG(field, offset, default) - Value from previous row
466    Lag {
467        /// Field name
468        field:   String,
469        /// Row offset
470        offset:  i32,
471        /// Default value
472        default: Option<serde_json::Value>,
473    },
474
475    /// LEAD(field, offset, default) - Value from next row
476    Lead {
477        /// Field name
478        field:   String,
479        /// Row offset
480        offset:  i32,
481        /// Default value
482        default: Option<serde_json::Value>,
483    },
484
485    /// FIRST_VALUE(field) - First value in window
486    FirstValue {
487        /// Field name
488        field: String,
489    },
490
491    /// LAST_VALUE(field) - Last value in window
492    LastValue {
493        /// Field name
494        field: String,
495    },
496
497    /// NTH_VALUE(field, n) - Nth value in window
498    NthValue {
499        /// Field name
500        field: String,
501        /// Position
502        n:     u32,
503    },
504
505    // Aggregate as window functions
506    /// SUM(field) OVER (...) - Running total
507    Sum {
508        /// Field name
509        field: String,
510    },
511
512    /// AVG(field) OVER (...) - Moving average
513    Avg {
514        /// Field name
515        field: String,
516    },
517
518    /// COUNT(*) OVER (...) - Running count
519    Count {
520        /// Field name
521        field: Option<String>,
522    },
523
524    /// MIN(field) OVER (...) - Running minimum
525    Min {
526        /// Field name
527        field: String,
528    },
529
530    /// MAX(field) OVER (...) - Running maximum
531    Max {
532        /// Field name
533        field: String,
534    },
535
536    /// STDDEV(field) OVER (...) - Running standard deviation
537    Stddev {
538        /// Field name
539        field: String,
540    },
541
542    /// VARIANCE(field) OVER (...) - Running variance
543    Variance {
544        /// Field name
545        field: String,
546    },
547}
548
549/// Window frame specification
550#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
551pub struct WindowFrame {
552    /// Frame type (ROWS, RANGE, GROUPS)
553    pub frame_type: FrameType,
554
555    /// Frame start boundary
556    pub start: FrameBoundary,
557
558    /// Frame end boundary
559    pub end: FrameBoundary,
560
561    /// Frame exclusion (PostgreSQL only)
562    pub exclusion: Option<FrameExclusion>,
563}
564
565/// Window frame type
566#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
567#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
568pub enum FrameType {
569    /// ROWS frame - Physical rows
570    Rows,
571
572    /// RANGE frame - Logical range based on ORDER BY
573    Range,
574
575    /// GROUPS frame - Peer groups (PostgreSQL only)
576    Groups,
577}
578
579/// Window frame boundary
580#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
581#[serde(tag = "type", rename_all = "snake_case")]
582pub enum FrameBoundary {
583    /// UNBOUNDED PRECEDING
584    UnboundedPreceding,
585
586    /// N PRECEDING
587    NPreceding {
588        /// Number of rows
589        n: u32,
590    },
591
592    /// CURRENT ROW
593    CurrentRow,
594
595    /// N FOLLOWING
596    NFollowing {
597        /// Number of rows
598        n: u32,
599    },
600
601    /// UNBOUNDED FOLLOWING
602    UnboundedFollowing,
603}
604
605/// Frame exclusion mode (PostgreSQL)
606#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
607#[serde(rename_all = "snake_case")]
608pub enum FrameExclusion {
609    /// EXCLUDE CURRENT ROW
610    CurrentRow,
611
612    /// EXCLUDE GROUP
613    Group,
614
615    /// EXCLUDE TIES
616    Ties,
617
618    /// EXCLUDE NO OTHERS
619    NoOthers,
620}
621
622/// Window function plan generator
623pub struct WindowFunctionPlanner;
624
625impl WindowFunctionPlanner {
626    /// Generate window function execution plan from JSON query
627    ///
628    /// # Example Query Format
629    ///
630    /// ```json
631    /// {
632    ///   "table": "tf_sales",
633    ///   "select": ["revenue", "category"],
634    ///   "windows": [
635    ///     {
636    ///       "function": {"row_number": {}},
637    ///       "alias": "rank",
638    ///       "partitionBy": ["data->>'category'"],
639    ///       "orderBy": [{"field": "revenue", "direction": "DESC"}]
640    ///     }
641    ///   ],
642    ///   "limit": 10
643    /// }
644    /// ```
645    pub fn plan(
646        query: &serde_json::Value,
647        _metadata: &FactTableMetadata,
648    ) -> Result<WindowExecutionPlan> {
649        // Parse table name
650        let table = query["table"]
651            .as_str()
652            .ok_or_else(|| FraiseQLError::validation("Missing 'table' field"))?
653            .to_string();
654
655        // Parse SELECT columns
656        let select = Self::parse_select_columns(query)?;
657
658        // Parse window functions
659        let windows = Self::parse_window_functions(query)?;
660
661        // Parse WHERE clause (placeholder - full implementation would parse actual conditions)
662        let where_clause = query.get("where").map(|_| WhereClause::And(vec![]));
663
664        // Parse ORDER BY
665        let order_by = query
666            .get("orderBy")
667            .and_then(|v| v.as_array())
668            .map(|arr| {
669                arr.iter()
670                    .filter_map(|item| {
671                        let direction = match item.get("direction").and_then(|d| d.as_str()) {
672                            Some("DESC") => OrderDirection::Desc,
673                            _ => OrderDirection::Asc,
674                        };
675                        Some(OrderByClause {
676                            field: item["field"].as_str()?.to_string(),
677                            direction,
678                        })
679                    })
680                    .collect()
681            })
682            .unwrap_or_default();
683
684        // Parse LIMIT/OFFSET
685        let limit = query.get("limit").and_then(|v| v.as_u64()).map(|n| n as u32);
686        let offset = query.get("offset").and_then(|v| v.as_u64()).map(|n| n as u32);
687
688        Ok(WindowExecutionPlan {
689            table,
690            select,
691            windows,
692            where_clause,
693            order_by,
694            limit,
695            offset,
696        })
697    }
698
699    fn parse_select_columns(query: &serde_json::Value) -> Result<Vec<SelectColumn>> {
700        let default_array = vec![];
701        let select = query.get("select").and_then(|s| s.as_array()).unwrap_or(&default_array);
702
703        let columns = select
704            .iter()
705            .filter_map(|col| {
706                if let Some(col_str) = col.as_str() {
707                    Some(SelectColumn {
708                        expression: col_str.to_string(),
709                        alias:      col_str.to_string(),
710                    })
711                } else {
712                    None
713                }
714            })
715            .collect();
716
717        Ok(columns)
718    }
719
720    fn parse_window_functions(query: &serde_json::Value) -> Result<Vec<WindowFunction>> {
721        let default_array = vec![];
722        let windows = query.get("windows").and_then(|w| w.as_array()).unwrap_or(&default_array);
723
724        windows.iter().map(|window| Self::parse_single_window(window)).collect()
725    }
726
727    fn parse_single_window(window: &serde_json::Value) -> Result<WindowFunction> {
728        let function = Self::parse_window_function_type(&window["function"])?;
729        let alias = window["alias"]
730            .as_str()
731            .ok_or_else(|| FraiseQLError::validation("Missing 'alias' in window function"))?
732            .to_string();
733
734        let partition_by = window
735            .get("partitionBy")
736            .and_then(|p| p.as_array())
737            .map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect())
738            .unwrap_or_default();
739
740        let order_by = window
741            .get("orderBy")
742            .and_then(|o| o.as_array())
743            .map(|arr| {
744                arr.iter()
745                    .filter_map(|item| {
746                        let direction = match item.get("direction").and_then(|d| d.as_str()) {
747                            Some("DESC") => OrderDirection::Desc,
748                            _ => OrderDirection::Asc,
749                        };
750                        Some(OrderByClause {
751                            field: item["field"].as_str()?.to_string(),
752                            direction,
753                        })
754                    })
755                    .collect()
756            })
757            .unwrap_or_default();
758
759        let frame = window.get("frame").map(Self::parse_window_frame).transpose()?;
760
761        Ok(WindowFunction {
762            function,
763            alias,
764            partition_by,
765            order_by,
766            frame,
767        })
768    }
769
770    fn parse_window_function_type(func: &serde_json::Value) -> Result<WindowFunctionType> {
771        // Try each function type
772        if func.get("row_number").is_some() {
773            return Ok(WindowFunctionType::RowNumber);
774        }
775        if func.get("rank").is_some() {
776            return Ok(WindowFunctionType::Rank);
777        }
778        if func.get("dense_rank").is_some() {
779            return Ok(WindowFunctionType::DenseRank);
780        }
781        if let Some(ntile) = func.get("ntile") {
782            let n = ntile["n"]
783                .as_u64()
784                .ok_or_else(|| FraiseQLError::validation("Missing 'n' in NTILE function"))?
785                as u32;
786            return Ok(WindowFunctionType::Ntile { n });
787        }
788        if func.get("percent_rank").is_some() {
789            return Ok(WindowFunctionType::PercentRank);
790        }
791        if func.get("cume_dist").is_some() {
792            return Ok(WindowFunctionType::CumeDist);
793        }
794
795        // Value functions
796        if let Some(lag) = func.get("lag") {
797            let field = lag["field"]
798                .as_str()
799                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in LAG"))?
800                .to_string();
801            let offset = lag.get("offset").and_then(|o| o.as_i64()).unwrap_or(1) as i32;
802            let default = lag.get("default").cloned();
803            return Ok(WindowFunctionType::Lag {
804                field,
805                offset,
806                default,
807            });
808        }
809        if let Some(lead) = func.get("lead") {
810            let field = lead["field"]
811                .as_str()
812                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in LEAD"))?
813                .to_string();
814            let offset = lead.get("offset").and_then(|o| o.as_i64()).unwrap_or(1) as i32;
815            let default = lead.get("default").cloned();
816            return Ok(WindowFunctionType::Lead {
817                field,
818                offset,
819                default,
820            });
821        }
822        if let Some(first_val) = func.get("first_value") {
823            let field = first_val["field"]
824                .as_str()
825                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in FIRST_VALUE"))?
826                .to_string();
827            return Ok(WindowFunctionType::FirstValue { field });
828        }
829        if let Some(last_val) = func.get("last_value") {
830            let field = last_val["field"]
831                .as_str()
832                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in LAST_VALUE"))?
833                .to_string();
834            return Ok(WindowFunctionType::LastValue { field });
835        }
836        if let Some(nth_val) = func.get("nth_value") {
837            let field = nth_val["field"]
838                .as_str()
839                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in NTH_VALUE"))?
840                .to_string();
841            let n = nth_val["n"]
842                .as_u64()
843                .ok_or_else(|| FraiseQLError::validation("Missing 'n' in NTH_VALUE"))?
844                as u32;
845            return Ok(WindowFunctionType::NthValue { field, n });
846        }
847
848        // Aggregate as window
849        if let Some(sum) = func.get("sum") {
850            let field = sum["field"]
851                .as_str()
852                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in SUM"))?
853                .to_string();
854            return Ok(WindowFunctionType::Sum { field });
855        }
856        if let Some(avg) = func.get("avg") {
857            let field = avg["field"]
858                .as_str()
859                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in AVG"))?
860                .to_string();
861            return Ok(WindowFunctionType::Avg { field });
862        }
863        if let Some(count) = func.get("count") {
864            let field = count.get("field").and_then(|f| f.as_str()).map(String::from);
865            return Ok(WindowFunctionType::Count { field });
866        }
867        if let Some(min) = func.get("min") {
868            let field = min["field"]
869                .as_str()
870                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in MIN"))?
871                .to_string();
872            return Ok(WindowFunctionType::Min { field });
873        }
874        if let Some(max) = func.get("max") {
875            let field = max["field"]
876                .as_str()
877                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in MAX"))?
878                .to_string();
879            return Ok(WindowFunctionType::Max { field });
880        }
881        if let Some(stddev) = func.get("stddev") {
882            let field = stddev["field"]
883                .as_str()
884                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in STDDEV"))?
885                .to_string();
886            return Ok(WindowFunctionType::Stddev { field });
887        }
888        if let Some(variance) = func.get("variance") {
889            let field = variance["field"]
890                .as_str()
891                .ok_or_else(|| FraiseQLError::validation("Missing 'field' in VARIANCE"))?
892                .to_string();
893            return Ok(WindowFunctionType::Variance { field });
894        }
895
896        Err(FraiseQLError::validation("Unknown window function type"))
897    }
898
899    fn parse_window_frame(frame: &serde_json::Value) -> Result<WindowFrame> {
900        let frame_type = match frame["frame_type"].as_str() {
901            Some("ROWS") => FrameType::Rows,
902            Some("RANGE") => FrameType::Range,
903            Some("GROUPS") => FrameType::Groups,
904            _ => return Err(FraiseQLError::validation("Invalid or missing 'frame_type'")),
905        };
906
907        let start = Self::parse_frame_boundary(&frame["start"])?;
908        let end = Self::parse_frame_boundary(&frame["end"])?;
909        let exclusion = frame.get("exclusion").map(|e| match e.as_str() {
910            Some("current_row") => FrameExclusion::CurrentRow,
911            Some("group") => FrameExclusion::Group,
912            Some("ties") => FrameExclusion::Ties,
913            Some("no_others") => FrameExclusion::NoOthers,
914            _ => FrameExclusion::NoOthers,
915        });
916
917        Ok(WindowFrame {
918            frame_type,
919            start,
920            end,
921            exclusion,
922        })
923    }
924
925    fn parse_frame_boundary(boundary: &serde_json::Value) -> Result<FrameBoundary> {
926        match boundary["type"].as_str() {
927            Some("unbounded_preceding") => Ok(FrameBoundary::UnboundedPreceding),
928            Some("n_preceding") => {
929                let n = boundary["n"]
930                    .as_u64()
931                    .ok_or_else(|| FraiseQLError::validation("Missing 'n' in N PRECEDING"))?
932                    as u32;
933                Ok(FrameBoundary::NPreceding { n })
934            },
935            Some("current_row") => Ok(FrameBoundary::CurrentRow),
936            Some("n_following") => {
937                let n = boundary["n"]
938                    .as_u64()
939                    .ok_or_else(|| FraiseQLError::validation("Missing 'n' in N FOLLOWING"))?
940                    as u32;
941                Ok(FrameBoundary::NFollowing { n })
942            },
943            Some("unbounded_following") => Ok(FrameBoundary::UnboundedFollowing),
944            _ => Err(FraiseQLError::validation("Invalid frame boundary type")),
945        }
946    }
947
948    /// Validate window function plan
949    pub fn validate(
950        plan: &WindowExecutionPlan,
951        _metadata: &FactTableMetadata,
952        database_target: crate::db::types::DatabaseType,
953    ) -> Result<()> {
954        use crate::db::types::DatabaseType;
955
956        // Validate frame type supported by database
957        for window in &plan.windows {
958            if let Some(frame) = &window.frame {
959                if frame.frame_type == FrameType::Groups {
960                    if !matches!(database_target, DatabaseType::PostgreSQL) {
961                        return Err(FraiseQLError::validation(
962                            "GROUPS frame type only supported on PostgreSQL",
963                        ));
964                    }
965                }
966
967                // Validate frame exclusion (PostgreSQL only)
968                if frame.exclusion.is_some() && !matches!(database_target, DatabaseType::PostgreSQL)
969                {
970                    return Err(FraiseQLError::validation(
971                        "Frame exclusion only supported on PostgreSQL",
972                    ));
973                }
974            }
975
976            // Validate PERCENT_RANK and CUME_DIST (not in SQLite)
977            match window.function {
978                WindowFunctionType::PercentRank | WindowFunctionType::CumeDist => {
979                    if matches!(database_target, DatabaseType::SQLite) {
980                        return Err(FraiseQLError::validation(
981                            "PERCENT_RANK and CUME_DIST not supported on SQLite",
982                        ));
983                    }
984                },
985                _ => {},
986            }
987        }
988
989        Ok(())
990    }
991}
992
993// =============================================================================
994// WindowPlanner - Converts high-level WindowRequest to WindowExecutionPlan
995// =============================================================================
996
997/// High-level window planner that validates semantic names against metadata.
998///
999/// Converts `WindowRequest` (user-friendly semantic names) to `WindowExecutionPlan`
1000/// (SQL expressions ready for execution).
1001///
1002/// # Example
1003///
1004/// ```rust,ignore
1005/// let request = WindowRequest { ... };
1006/// let metadata = FactTableMetadata { ... };
1007/// let plan = WindowPlanner::plan(request, metadata)?;
1008/// // plan now has SQL expressions like "dimensions->>'category'" instead of "category"
1009/// ```
1010pub struct WindowPlanner;
1011
1012impl WindowPlanner {
1013    /// Convert high-level WindowRequest to executable WindowExecutionPlan.
1014    ///
1015    /// # Arguments
1016    ///
1017    /// * `request` - High-level window request with semantic names
1018    /// * `metadata` - Fact table metadata for validation and expression generation
1019    ///
1020    /// # Errors
1021    ///
1022    /// Returns error if:
1023    /// - Referenced measures don't exist in metadata
1024    /// - Referenced filter columns don't exist
1025    /// - Window function field references are invalid
1026    pub fn plan(
1027        request: WindowRequest,
1028        metadata: FactTableMetadata,
1029    ) -> Result<WindowExecutionPlan> {
1030        // Convert select columns to SQL expressions
1031        let select = Self::convert_select_columns(&request.select, &metadata)?;
1032
1033        // Convert window functions to SQL expressions
1034        let windows = Self::convert_window_functions(&request.windows, &metadata)?;
1035
1036        // Convert final ORDER BY to SQL expressions
1037        let order_by = Self::convert_order_by(&request.order_by, &metadata)?;
1038
1039        Ok(WindowExecutionPlan {
1040            table: request.table_name,
1041            select,
1042            windows,
1043            where_clause: request.where_clause,
1044            order_by,
1045            limit: request.limit,
1046            offset: request.offset,
1047        })
1048    }
1049
1050    /// Convert semantic select columns to SQL expressions.
1051    fn convert_select_columns(
1052        columns: &[WindowSelectColumn],
1053        metadata: &FactTableMetadata,
1054    ) -> Result<Vec<SelectColumn>> {
1055        columns
1056            .iter()
1057            .map(|col| Self::convert_single_select_column(col, metadata))
1058            .collect()
1059    }
1060
1061    fn convert_single_select_column(
1062        column: &WindowSelectColumn,
1063        metadata: &FactTableMetadata,
1064    ) -> Result<SelectColumn> {
1065        match column {
1066            WindowSelectColumn::Measure { name, alias } => {
1067                // Validate measure exists
1068                if !metadata.measures.iter().any(|m| m.name == *name) {
1069                    return Err(FraiseQLError::Validation {
1070                        message: format!(
1071                            "Measure '{}' not found in fact table '{}'",
1072                            name, metadata.table_name
1073                        ),
1074                        path:    None,
1075                    });
1076                }
1077                // Measure columns are direct SQL columns
1078                Ok(SelectColumn {
1079                    expression: name.clone(),
1080                    alias:      alias.clone(),
1081                })
1082            },
1083            WindowSelectColumn::Dimension { path, alias } => {
1084                // Dimension from JSONB - generate extraction expression
1085                let expression = format!("{}->>'{}'", metadata.dimensions.name, path);
1086                Ok(SelectColumn {
1087                    expression,
1088                    alias: alias.clone(),
1089                })
1090            },
1091            WindowSelectColumn::Filter { name, alias } => {
1092                // Validate filter column exists
1093                if !metadata.denormalized_filters.iter().any(|f| f.name == *name) {
1094                    return Err(FraiseQLError::Validation {
1095                        message: format!(
1096                            "Filter column '{}' not found in fact table '{}'",
1097                            name, metadata.table_name
1098                        ),
1099                        path:    None,
1100                    });
1101                }
1102                // Filter columns are direct SQL columns
1103                Ok(SelectColumn {
1104                    expression: name.clone(),
1105                    alias:      alias.clone(),
1106                })
1107            },
1108        }
1109    }
1110
1111    /// Convert semantic window functions to SQL expressions.
1112    fn convert_window_functions(
1113        windows: &[WindowFunctionRequest],
1114        metadata: &FactTableMetadata,
1115    ) -> Result<Vec<WindowFunction>> {
1116        windows
1117            .iter()
1118            .map(|w| Self::convert_single_window_function(w, metadata))
1119            .collect()
1120    }
1121
1122    fn convert_single_window_function(
1123        request: &WindowFunctionRequest,
1124        metadata: &FactTableMetadata,
1125    ) -> Result<WindowFunction> {
1126        // Convert function spec to function type
1127        let function = Self::convert_function_spec(&request.function, metadata)?;
1128
1129        // Convert PARTITION BY columns to SQL expressions
1130        let partition_by = request
1131            .partition_by
1132            .iter()
1133            .map(|p| Self::convert_partition_by(p, metadata))
1134            .collect::<Result<Vec<_>>>()?;
1135
1136        // Convert ORDER BY within window to SQL expressions
1137        let order_by = request
1138            .order_by
1139            .iter()
1140            .map(|o| Self::convert_window_order_by(o, metadata))
1141            .collect::<Result<Vec<_>>>()?;
1142
1143        Ok(WindowFunction {
1144            function,
1145            alias: request.alias.clone(),
1146            partition_by,
1147            order_by,
1148            frame: request.frame.clone(),
1149        })
1150    }
1151
1152    /// Convert high-level function spec to low-level function type with SQL expressions.
1153    fn convert_function_spec(
1154        spec: &WindowFunctionSpec,
1155        metadata: &FactTableMetadata,
1156    ) -> Result<WindowFunctionType> {
1157        match spec {
1158            // Ranking functions - no field conversion needed
1159            WindowFunctionSpec::RowNumber => Ok(WindowFunctionType::RowNumber),
1160            WindowFunctionSpec::Rank => Ok(WindowFunctionType::Rank),
1161            WindowFunctionSpec::DenseRank => Ok(WindowFunctionType::DenseRank),
1162            WindowFunctionSpec::Ntile { n } => Ok(WindowFunctionType::Ntile { n: *n }),
1163            WindowFunctionSpec::PercentRank => Ok(WindowFunctionType::PercentRank),
1164            WindowFunctionSpec::CumeDist => Ok(WindowFunctionType::CumeDist),
1165
1166            // Value functions - need field conversion
1167            WindowFunctionSpec::Lag {
1168                field,
1169                offset,
1170                default,
1171            } => {
1172                let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1173                Ok(WindowFunctionType::Lag {
1174                    field:   sql_field,
1175                    offset:  *offset,
1176                    default: default.clone(),
1177                })
1178            },
1179            WindowFunctionSpec::Lead {
1180                field,
1181                offset,
1182                default,
1183            } => {
1184                let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1185                Ok(WindowFunctionType::Lead {
1186                    field:   sql_field,
1187                    offset:  *offset,
1188                    default: default.clone(),
1189                })
1190            },
1191            WindowFunctionSpec::FirstValue { field } => {
1192                let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1193                Ok(WindowFunctionType::FirstValue { field: sql_field })
1194            },
1195            WindowFunctionSpec::LastValue { field } => {
1196                let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1197                Ok(WindowFunctionType::LastValue { field: sql_field })
1198            },
1199            WindowFunctionSpec::NthValue { field, n } => {
1200                let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1201                Ok(WindowFunctionType::NthValue {
1202                    field: sql_field,
1203                    n:     *n,
1204                })
1205            },
1206
1207            // Aggregate as window functions - need measure conversion
1208            WindowFunctionSpec::RunningSum { measure } => {
1209                Self::validate_measure(measure, metadata)?;
1210                Ok(WindowFunctionType::Sum {
1211                    field: measure.clone(),
1212                })
1213            },
1214            WindowFunctionSpec::RunningAvg { measure } => {
1215                Self::validate_measure(measure, metadata)?;
1216                Ok(WindowFunctionType::Avg {
1217                    field: measure.clone(),
1218                })
1219            },
1220            WindowFunctionSpec::RunningCount => Ok(WindowFunctionType::Count { field: None }),
1221            WindowFunctionSpec::RunningCountField { field } => {
1222                let sql_field = Self::resolve_field_to_sql(field, metadata)?;
1223                Ok(WindowFunctionType::Count {
1224                    field: Some(sql_field),
1225                })
1226            },
1227            WindowFunctionSpec::RunningMin { measure } => {
1228                Self::validate_measure(measure, metadata)?;
1229                Ok(WindowFunctionType::Min {
1230                    field: measure.clone(),
1231                })
1232            },
1233            WindowFunctionSpec::RunningMax { measure } => {
1234                Self::validate_measure(measure, metadata)?;
1235                Ok(WindowFunctionType::Max {
1236                    field: measure.clone(),
1237                })
1238            },
1239            WindowFunctionSpec::RunningStddev { measure } => {
1240                Self::validate_measure(measure, metadata)?;
1241                Ok(WindowFunctionType::Stddev {
1242                    field: measure.clone(),
1243                })
1244            },
1245            WindowFunctionSpec::RunningVariance { measure } => {
1246                Self::validate_measure(measure, metadata)?;
1247                Ok(WindowFunctionType::Variance {
1248                    field: measure.clone(),
1249                })
1250            },
1251        }
1252    }
1253
1254    /// Convert PARTITION BY column to SQL expression.
1255    fn convert_partition_by(
1256        partition: &PartitionByColumn,
1257        metadata: &FactTableMetadata,
1258    ) -> Result<String> {
1259        match partition {
1260            PartitionByColumn::Dimension { path } => {
1261                Ok(format!("{}->>'{}'", metadata.dimensions.name, path))
1262            },
1263            PartitionByColumn::Filter { name } => {
1264                if !metadata.denormalized_filters.iter().any(|f| f.name == *name) {
1265                    return Err(FraiseQLError::Validation {
1266                        message: format!(
1267                            "Filter column '{}' not found in fact table '{}'",
1268                            name, metadata.table_name
1269                        ),
1270                        path:    None,
1271                    });
1272                }
1273                Ok(name.clone())
1274            },
1275            PartitionByColumn::Measure { name } => {
1276                Self::validate_measure(name, metadata)?;
1277                Ok(name.clone())
1278            },
1279        }
1280    }
1281
1282    /// Convert window ORDER BY to SQL expression.
1283    fn convert_window_order_by(
1284        order: &WindowOrderBy,
1285        metadata: &FactTableMetadata,
1286    ) -> Result<OrderByClause> {
1287        let field = Self::resolve_field_to_sql(&order.field, metadata)?;
1288        Ok(OrderByClause {
1289            field,
1290            direction: order.direction,
1291        })
1292    }
1293
1294    /// Convert final ORDER BY to SQL expressions.
1295    fn convert_order_by(
1296        orders: &[WindowOrderBy],
1297        metadata: &FactTableMetadata,
1298    ) -> Result<Vec<OrderByClause>> {
1299        orders.iter().map(|o| Self::convert_window_order_by(o, metadata)).collect()
1300    }
1301
1302    /// Resolve a semantic field name to its SQL expression.
1303    ///
1304    /// Priority:
1305    /// 1. Check if it's a measure (direct column)
1306    /// 2. Check if it's a filter column (direct column)
1307    /// 3. Assume it's a dimension path (JSONB extraction)
1308    fn resolve_field_to_sql(field: &str, metadata: &FactTableMetadata) -> Result<String> {
1309        // Check if it's a measure
1310        if metadata.measures.iter().any(|m| m.name == field) {
1311            return Ok(field.to_string());
1312        }
1313
1314        // Check if it's a filter column
1315        if metadata.denormalized_filters.iter().any(|f| f.name == field) {
1316            return Ok(field.to_string());
1317        }
1318
1319        // Assume it's a dimension path
1320        Ok(format!("{}->>'{}'", metadata.dimensions.name, field))
1321    }
1322
1323    /// Validate that a measure exists in metadata.
1324    fn validate_measure(measure: &str, metadata: &FactTableMetadata) -> Result<()> {
1325        if !metadata.measures.iter().any(|m| m.name == *measure) {
1326            return Err(FraiseQLError::Validation {
1327                message: format!(
1328                    "Measure '{}' not found in fact table '{}'",
1329                    measure, metadata.table_name
1330                ),
1331                path:    None,
1332            });
1333        }
1334        Ok(())
1335    }
1336}
1337
1338#[cfg(test)]
1339mod tests {
1340    use super::*;
1341    use crate::compiler::fact_table::{DimensionColumn, FilterColumn, MeasureColumn, SqlType};
1342
1343    fn create_test_metadata() -> FactTableMetadata {
1344        FactTableMetadata {
1345            table_name:           "tf_sales".to_string(),
1346            measures:             vec![
1347                MeasureColumn {
1348                    name:     "revenue".to_string(),
1349                    sql_type: SqlType::Decimal,
1350                    nullable: false,
1351                },
1352                MeasureColumn {
1353                    name:     "quantity".to_string(),
1354                    sql_type: SqlType::Int,
1355                    nullable: false,
1356                },
1357            ],
1358            dimensions:           DimensionColumn {
1359                name:  "dimensions".to_string(),
1360                paths: vec![],
1361            },
1362            denormalized_filters: vec![
1363                FilterColumn {
1364                    name:     "customer_id".to_string(),
1365                    sql_type: SqlType::Uuid,
1366                    indexed:  true,
1367                },
1368                FilterColumn {
1369                    name:     "occurred_at".to_string(),
1370                    sql_type: SqlType::Timestamp,
1371                    indexed:  true,
1372                },
1373            ],
1374            calendar_dimensions:  vec![],
1375        }
1376    }
1377
1378    // =============================================================================
1379    // Test Helpers
1380    // =============================================================================
1381
1382    /// Helper to serialize test objects without panicking
1383    fn serialize_json<T: serde::Serialize>(value: &T) -> String {
1384        serde_json::to_string(value).expect("serialization should succeed for test objects")
1385    }
1386
1387    /// Helper to deserialize test JSON without panicking
1388    fn deserialize_json<'a, T: serde::Deserialize<'a>>(json: &'a str) -> T {
1389        serde_json::from_str(json).expect("deserialization should succeed for valid test JSON")
1390    }
1391
1392    // =============================================================================
1393    // Tests
1394    // =============================================================================
1395
1396    #[test]
1397    fn test_window_function_type_serialization() {
1398        let func = WindowFunctionType::RowNumber;
1399        let json = serialize_json(&func);
1400        assert_eq!(json, r#"{"type":"row_number"}"#);
1401    }
1402
1403    #[test]
1404    fn test_frame_type_serialization() {
1405        let frame_type = FrameType::Rows;
1406        let json = serialize_json(&frame_type);
1407        assert_eq!(json, r#""ROWS""#);
1408    }
1409
1410    #[test]
1411    fn test_frame_boundary_unbounded() {
1412        let boundary = FrameBoundary::UnboundedPreceding;
1413        let json = serialize_json(&boundary);
1414        assert!(json.contains("unbounded_preceding"));
1415    }
1416
1417    #[test]
1418    fn test_frame_boundary_n_preceding() {
1419        let boundary = FrameBoundary::NPreceding { n: 5 };
1420        let json = serialize_json(&boundary);
1421        assert!(json.contains("n_preceding"));
1422        assert!(json.contains("\"n\":5"));
1423    }
1424
1425    #[test]
1426    fn test_parse_row_number_query() {
1427        let metadata = create_test_metadata();
1428        let query = serde_json::json!({
1429            "table": "tf_sales",
1430            "select": ["revenue"],
1431            "windows": [{
1432                "function": {"row_number": {}},
1433                "alias": "rank",
1434                "partitionBy": ["category"],
1435                "orderBy": [{"field": "revenue", "direction": "DESC"}]
1436            }]
1437        });
1438
1439        let plan =
1440            WindowFunctionPlanner::plan(&query, &metadata).expect("window plan should succeed");
1441
1442        assert_eq!(plan.table, "tf_sales");
1443        assert_eq!(plan.windows.len(), 1);
1444        assert_eq!(plan.windows[0].alias, "rank");
1445        assert!(matches!(plan.windows[0].function, WindowFunctionType::RowNumber));
1446    }
1447
1448    #[test]
1449    fn test_parse_lag_function() {
1450        let metadata = create_test_metadata();
1451        let query = serde_json::json!({
1452            "table": "tf_sales",
1453            "windows": [{
1454                "function": {
1455                    "lag": {
1456                        "field": "revenue",
1457                        "offset": 1,
1458                        "default": 0
1459                    }
1460                },
1461                "alias": "prev_revenue",
1462                "orderBy": [{"field": "occurred_at"}]
1463            }]
1464        });
1465
1466        let plan =
1467            WindowFunctionPlanner::plan(&query, &metadata).expect("window plan should succeed");
1468
1469        match &plan.windows[0].function {
1470            WindowFunctionType::Lag {
1471                field,
1472                offset,
1473                default,
1474            } => {
1475                assert_eq!(field, "revenue");
1476                assert_eq!(*offset, 1);
1477                assert!(default.is_some());
1478            },
1479            _ => panic!("Expected LAG function"),
1480        }
1481    }
1482
1483    #[test]
1484    fn test_validate_groups_frame_postgres_only() {
1485        use crate::db::types::DatabaseType;
1486
1487        let metadata = create_test_metadata();
1488        let plan = WindowExecutionPlan {
1489            table:        "tf_sales".to_string(),
1490            select:       vec![],
1491            windows:      vec![WindowFunction {
1492                function:     WindowFunctionType::RowNumber,
1493                alias:        "rank".to_string(),
1494                partition_by: vec![],
1495                order_by:     vec![],
1496                frame:        Some(WindowFrame {
1497                    frame_type: FrameType::Groups,
1498                    start:      FrameBoundary::UnboundedPreceding,
1499                    end:        FrameBoundary::CurrentRow,
1500                    exclusion:  None,
1501                }),
1502            }],
1503            where_clause: None,
1504            order_by:     vec![],
1505            limit:        None,
1506            offset:       None,
1507        };
1508
1509        // Should pass for PostgreSQL
1510        assert!(
1511            WindowFunctionPlanner::validate(&plan, &metadata, DatabaseType::PostgreSQL).is_ok()
1512        );
1513
1514        // Should fail for MySQL
1515        assert!(WindowFunctionPlanner::validate(&plan, &metadata, DatabaseType::MySQL).is_err());
1516    }
1517
1518    // =============================================================================
1519    // WindowPlanner Tests (High-Level -> Low-Level conversion)
1520    // =============================================================================
1521
1522    #[test]
1523    fn test_window_planner_basic_request() {
1524        let metadata = create_test_metadata();
1525        let request = WindowRequest {
1526            table_name:   "tf_sales".to_string(),
1527            select:       vec![
1528                WindowSelectColumn::Measure {
1529                    name:  "revenue".to_string(),
1530                    alias: "revenue".to_string(),
1531                },
1532                WindowSelectColumn::Dimension {
1533                    path:  "category".to_string(),
1534                    alias: "category".to_string(),
1535                },
1536            ],
1537            windows:      vec![WindowFunctionRequest {
1538                function:     WindowFunctionSpec::RowNumber,
1539                alias:        "rank".to_string(),
1540                partition_by: vec![PartitionByColumn::Dimension {
1541                    path: "category".to_string(),
1542                }],
1543                order_by:     vec![WindowOrderBy {
1544                    field:     "revenue".to_string(),
1545                    direction: OrderDirection::Desc,
1546                }],
1547                frame:        None,
1548            }],
1549            where_clause: None,
1550            order_by:     vec![],
1551            limit:        Some(100),
1552            offset:       None,
1553        };
1554
1555        let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1556
1557        assert_eq!(plan.table, "tf_sales");
1558        assert_eq!(plan.select.len(), 2);
1559        assert_eq!(plan.select[0].expression, "revenue");
1560        assert_eq!(plan.select[0].alias, "revenue");
1561        assert_eq!(plan.select[1].expression, "dimensions->>'category'");
1562        assert_eq!(plan.select[1].alias, "category");
1563
1564        assert_eq!(plan.windows.len(), 1);
1565        assert_eq!(plan.windows[0].alias, "rank");
1566        assert!(matches!(plan.windows[0].function, WindowFunctionType::RowNumber));
1567        assert_eq!(plan.windows[0].partition_by, vec!["dimensions->>'category'"]);
1568        assert_eq!(plan.windows[0].order_by.len(), 1);
1569        assert_eq!(plan.windows[0].order_by[0].field, "revenue");
1570        assert_eq!(plan.windows[0].order_by[0].direction, OrderDirection::Desc);
1571
1572        assert_eq!(plan.limit, Some(100));
1573    }
1574
1575    #[test]
1576    fn test_window_planner_running_sum() {
1577        let metadata = create_test_metadata();
1578        let request = WindowRequest {
1579            table_name:   "tf_sales".to_string(),
1580            select:       vec![WindowSelectColumn::Measure {
1581                name:  "revenue".to_string(),
1582                alias: "revenue".to_string(),
1583            }],
1584            windows:      vec![WindowFunctionRequest {
1585                function:     WindowFunctionSpec::RunningSum {
1586                    measure: "revenue".to_string(),
1587                },
1588                alias:        "running_total".to_string(),
1589                partition_by: vec![],
1590                order_by:     vec![WindowOrderBy {
1591                    field:     "occurred_at".to_string(),
1592                    direction: OrderDirection::Asc,
1593                }],
1594                frame:        Some(WindowFrame {
1595                    frame_type: FrameType::Rows,
1596                    start:      FrameBoundary::UnboundedPreceding,
1597                    end:        FrameBoundary::CurrentRow,
1598                    exclusion:  None,
1599                }),
1600            }],
1601            where_clause: None,
1602            order_by:     vec![],
1603            limit:        None,
1604            offset:       None,
1605        };
1606
1607        let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1608
1609        assert_eq!(plan.windows.len(), 1);
1610        match &plan.windows[0].function {
1611            WindowFunctionType::Sum { field } => {
1612                assert_eq!(field, "revenue");
1613            },
1614            _ => panic!("Expected Sum function"),
1615        }
1616        assert_eq!(plan.windows[0].alias, "running_total");
1617        assert!(plan.windows[0].frame.is_some());
1618    }
1619
1620    #[test]
1621    fn test_window_planner_filter_column() {
1622        let metadata = create_test_metadata();
1623        let request = WindowRequest {
1624            table_name:   "tf_sales".to_string(),
1625            select:       vec![WindowSelectColumn::Filter {
1626                name:  "occurred_at".to_string(),
1627                alias: "date".to_string(),
1628            }],
1629            windows:      vec![],
1630            where_clause: None,
1631            order_by:     vec![],
1632            limit:        None,
1633            offset:       None,
1634        };
1635
1636        let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1637
1638        assert_eq!(plan.select.len(), 1);
1639        assert_eq!(plan.select[0].expression, "occurred_at");
1640        assert_eq!(plan.select[0].alias, "date");
1641    }
1642
1643    #[test]
1644    fn test_window_planner_invalid_measure() {
1645        let metadata = create_test_metadata();
1646        let request = WindowRequest {
1647            table_name:   "tf_sales".to_string(),
1648            select:       vec![WindowSelectColumn::Measure {
1649                name:  "nonexistent".to_string(),
1650                alias: "alias".to_string(),
1651            }],
1652            windows:      vec![],
1653            where_clause: None,
1654            order_by:     vec![],
1655            limit:        None,
1656            offset:       None,
1657        };
1658
1659        let result = WindowPlanner::plan(request, metadata);
1660        assert!(result.is_err());
1661        assert!(result.unwrap_err().to_string().contains("not found"));
1662    }
1663
1664    #[test]
1665    fn test_window_planner_invalid_filter() {
1666        let metadata = create_test_metadata();
1667        let request = WindowRequest {
1668            table_name:   "tf_sales".to_string(),
1669            select:       vec![WindowSelectColumn::Filter {
1670                name:  "nonexistent_filter".to_string(),
1671                alias: "alias".to_string(),
1672            }],
1673            windows:      vec![],
1674            where_clause: None,
1675            order_by:     vec![],
1676            limit:        None,
1677            offset:       None,
1678        };
1679
1680        let result = WindowPlanner::plan(request, metadata);
1681        assert!(result.is_err());
1682        assert!(result.unwrap_err().to_string().contains("not found"));
1683    }
1684
1685    #[test]
1686    fn test_window_planner_lag_function() {
1687        let metadata = create_test_metadata();
1688        let request = WindowRequest {
1689            table_name:   "tf_sales".to_string(),
1690            select:       vec![],
1691            windows:      vec![WindowFunctionRequest {
1692                function:     WindowFunctionSpec::Lag {
1693                    field:   "revenue".to_string(),
1694                    offset:  1,
1695                    default: Some(serde_json::json!(0)),
1696                },
1697                alias:        "prev_revenue".to_string(),
1698                partition_by: vec![],
1699                order_by:     vec![WindowOrderBy {
1700                    field:     "occurred_at".to_string(),
1701                    direction: OrderDirection::Asc,
1702                }],
1703                frame:        None,
1704            }],
1705            where_clause: None,
1706            order_by:     vec![],
1707            limit:        None,
1708            offset:       None,
1709        };
1710
1711        let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1712
1713        match &plan.windows[0].function {
1714            WindowFunctionType::Lag {
1715                field,
1716                offset,
1717                default,
1718            } => {
1719                assert_eq!(field, "revenue"); // measure stays as-is
1720                assert_eq!(*offset, 1);
1721                assert!(default.is_some());
1722            },
1723            _ => panic!("Expected Lag function"),
1724        }
1725    }
1726
1727    #[test]
1728    fn test_window_planner_dimension_field_in_lag() {
1729        let metadata = create_test_metadata();
1730        let request = WindowRequest {
1731            table_name:   "tf_sales".to_string(),
1732            select:       vec![],
1733            windows:      vec![WindowFunctionRequest {
1734                function:     WindowFunctionSpec::Lag {
1735                    field:   "category".to_string(), // dimension path
1736                    offset:  1,
1737                    default: None,
1738                },
1739                alias:        "prev_category".to_string(),
1740                partition_by: vec![],
1741                order_by:     vec![WindowOrderBy {
1742                    field:     "occurred_at".to_string(),
1743                    direction: OrderDirection::Asc,
1744                }],
1745                frame:        None,
1746            }],
1747            where_clause: None,
1748            order_by:     vec![],
1749            limit:        None,
1750            offset:       None,
1751        };
1752
1753        let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1754
1755        match &plan.windows[0].function {
1756            WindowFunctionType::Lag { field, .. } => {
1757                // dimension gets converted to JSONB extraction
1758                assert_eq!(field, "dimensions->>'category'");
1759            },
1760            _ => panic!("Expected Lag function"),
1761        }
1762    }
1763
1764    #[test]
1765    fn test_window_planner_partition_by_filter() {
1766        let metadata = create_test_metadata();
1767        let request = WindowRequest {
1768            table_name:   "tf_sales".to_string(),
1769            select:       vec![],
1770            windows:      vec![WindowFunctionRequest {
1771                function:     WindowFunctionSpec::RowNumber,
1772                alias:        "rank".to_string(),
1773                partition_by: vec![PartitionByColumn::Filter {
1774                    name: "customer_id".to_string(),
1775                }],
1776                order_by:     vec![],
1777                frame:        None,
1778            }],
1779            where_clause: None,
1780            order_by:     vec![],
1781            limit:        None,
1782            offset:       None,
1783        };
1784
1785        let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1786
1787        assert_eq!(plan.windows[0].partition_by, vec!["customer_id"]);
1788    }
1789
1790    #[test]
1791    fn test_window_planner_final_order_by() {
1792        let metadata = create_test_metadata();
1793        let request = WindowRequest {
1794            table_name:   "tf_sales".to_string(),
1795            select:       vec![],
1796            windows:      vec![],
1797            where_clause: None,
1798            order_by:     vec![
1799                WindowOrderBy {
1800                    field:     "revenue".to_string(),
1801                    direction: OrderDirection::Desc,
1802                },
1803                WindowOrderBy {
1804                    field:     "category".to_string(), // dimension
1805                    direction: OrderDirection::Asc,
1806                },
1807            ],
1808            limit:        None,
1809            offset:       None,
1810        };
1811
1812        let plan = WindowPlanner::plan(request, metadata).expect("window plan should succeed");
1813
1814        assert_eq!(plan.order_by.len(), 2);
1815        assert_eq!(plan.order_by[0].field, "revenue");
1816        assert_eq!(plan.order_by[0].direction, OrderDirection::Desc);
1817        assert_eq!(plan.order_by[1].field, "dimensions->>'category'");
1818        assert_eq!(plan.order_by[1].direction, OrderDirection::Asc);
1819    }
1820
1821    #[test]
1822    fn test_window_request_serialization() {
1823        let request = WindowRequest {
1824            table_name:   "tf_sales".to_string(),
1825            select:       vec![WindowSelectColumn::Measure {
1826                name:  "revenue".to_string(),
1827                alias: "revenue".to_string(),
1828            }],
1829            windows:      vec![WindowFunctionRequest {
1830                function:     WindowFunctionSpec::RowNumber,
1831                alias:        "rank".to_string(),
1832                partition_by: vec![],
1833                order_by:     vec![],
1834                frame:        None,
1835            }],
1836            where_clause: None,
1837            order_by:     vec![],
1838            limit:        Some(10),
1839            offset:       None,
1840        };
1841
1842        // Should serialize without panic
1843        let json = serialize_json(&request);
1844        assert!(json.contains("tf_sales"));
1845        assert!(json.contains("revenue"));
1846        assert!(json.contains("row_number"));
1847
1848        // Should deserialize back
1849        let deserialized: WindowRequest = deserialize_json(&json);
1850        assert_eq!(deserialized.table_name, "tf_sales");
1851        assert_eq!(deserialized.limit, Some(10));
1852    }
1853
1854    #[test]
1855    fn test_window_function_spec_serialization() {
1856        let spec = WindowFunctionSpec::RunningSum {
1857            measure: "revenue".to_string(),
1858        };
1859        let json = serialize_json(&spec);
1860        assert!(json.contains("running_sum"));
1861        assert!(json.contains("revenue"));
1862
1863        let spec2 = WindowFunctionSpec::Ntile { n: 4 };
1864        let json2 = serialize_json(&spec2);
1865        assert!(json2.contains("ntile"));
1866        assert!(json2.contains("4"));
1867    }
1868}