Skip to main content

reddb_server/storage/query/executors/
window.rs

1//! Window Functions Executor
2//!
3//! Provides SQL standard window functions for analytical queries.
4//!
5//! # Window Function Types
6//!
7//! **Ranking Functions:**
8//! - `ROW_NUMBER()`: Sequential number within partition
9//! - `RANK()`: Rank with gaps for ties
10//! - `DENSE_RANK()`: Rank without gaps for ties
11//! - `NTILE(n)`: Divide partition into n buckets
12//! - `PERCENT_RANK()`: Relative rank as percentage
13//! - `CUME_DIST()`: Cumulative distribution
14//!
15//! **Value Functions:**
16//! - `FIRST_VALUE(x)`: First value in frame
17//! - `LAST_VALUE(x)`: Last value in frame
18//! - `NTH_VALUE(x, n)`: Nth value in frame
19//! - `LAG(x, n, default)`: Value n rows before current
20//! - `LEAD(x, n, default)`: Value n rows after current
21//!
22//! **Aggregate Functions (with OVER):**
23//! - All standard aggregates (SUM, AVG, COUNT, MIN, MAX, etc.)
24//!
25//! # Frame Specification
26//!
27//! Frames define the subset of partition rows for each computation:
28//! - `ROWS`: Physical row-based boundaries
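//! - `RANGE`: Value-based logical boundaries (`CURRENT ROW` spans all peer rows; numeric `n PRECEDING` / `n FOLLOWING` offsets are currently applied as row offsets)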
30//! - `GROUPS`: Groups of peer rows
31//!
32//! # Implementation
33//!
34//! Window functions are evaluated in three phases:
35//! 1. **Partition**: Group rows by PARTITION BY columns
36//! 2. **Order**: Sort each partition by ORDER BY columns
37//! 3. **Compute**: Apply window function with frame for each row
38
39use std::cmp::Ordering;
40use std::collections::HashMap;
41
42use super::super::engine::binding::{Binding, Value, Var};
43use super::aggregation::create_aggregator;
44use super::value_compare::{total_compare_values, values_equal};
45
46// ============================================================================
47// Window Function Types
48// ============================================================================
49
50/// Type of window function
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum WindowFuncType {
53    // Ranking functions
54    RowNumber,
55    Rank,
56    DenseRank,
57    Ntile(i64),
58    PercentRank,
59    CumeDist,
60
61    // Value functions
62    FirstValue(Var),
63    LastValue(Var),
64    NthValue(Var, i64),
65    Lag(Var, i64, Option<Value>),
66    Lead(Var, i64, Option<Value>),
67
68    // Aggregate functions with OVER
69    Aggregate(String, Var),
70}
71
72impl WindowFuncType {
73    /// Create a ROW_NUMBER function
74    pub fn row_number() -> Self {
75        Self::RowNumber
76    }
77
78    /// Create a RANK function
79    pub fn rank() -> Self {
80        Self::Rank
81    }
82
83    /// Create a DENSE_RANK function
84    pub fn dense_rank() -> Self {
85        Self::DenseRank
86    }
87
88    /// Create an NTILE function
89    pub fn ntile(n: i64) -> Self {
90        Self::Ntile(n)
91    }
92
93    /// Create a LAG function
94    pub fn lag(var: Var, offset: i64, default: Option<Value>) -> Self {
95        Self::Lag(var, offset, default)
96    }
97
98    /// Create a LEAD function
99    pub fn lead(var: Var, offset: i64, default: Option<Value>) -> Self {
100        Self::Lead(var, offset, default)
101    }
102
103    /// Create a FIRST_VALUE function
104    pub fn first_value(var: Var) -> Self {
105        Self::FirstValue(var)
106    }
107
108    /// Create a LAST_VALUE function
109    pub fn last_value(var: Var) -> Self {
110        Self::LastValue(var)
111    }
112
113    /// Create an aggregate window function
114    pub fn aggregate(name: &str, var: Var) -> Self {
115        Self::Aggregate(name.to_uppercase(), var)
116    }
117}
118
119// ============================================================================
120// Frame Specification
121// ============================================================================
122
/// Unit in which a window frame's boundaries are measured.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FrameType {
    /// `ROWS`: boundaries count physical rows.
    Rows,
    /// `RANGE`: value-based boundaries.
    Range,
    /// `GROUPS`: boundaries count peer groups (rows with equal ORDER BY values).
    Groups,
}
133
/// One end (start or end) of a window frame.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum FrameBound {
    /// `UNBOUNDED PRECEDING`: the first row of the partition.
    UnboundedPreceding,
    /// `UNBOUNDED FOLLOWING`: the last row of the partition.
    UnboundedFollowing,
    /// `CURRENT ROW`; this is the `Default` bound.
    #[default]
    CurrentRow,
    /// `n PRECEDING`
    Preceding(i64),
    /// `n FOLLOWING`
    Following(i64),
}
149
150/// Frame specification for window function
151#[derive(Debug, Clone)]
152pub struct FrameSpec {
153    /// Frame type (ROWS, RANGE, GROUPS)
154    pub frame_type: FrameType,
155    /// Start boundary
156    pub start: FrameBound,
157    /// End boundary
158    pub end: FrameBound,
159    /// Exclude option (CURRENT ROW, GROUP, TIES, NO OTHERS)
160    pub exclude: FrameExclude,
161}
162
/// Which rows, if any, an `EXCLUDE` clause removes from a frame.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FrameExclude {
    /// `EXCLUDE NO OTHERS`; this is the `Default`.
    #[default]
    NoOthers,
    /// `EXCLUDE CURRENT ROW`
    CurrentRow,
    /// `EXCLUDE GROUP`
    Group,
    /// `EXCLUDE TIES`
    Ties,
}
176
177impl Default for FrameSpec {
178    /// Default frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
179    fn default() -> Self {
180        Self {
181            frame_type: FrameType::Range,
182            start: FrameBound::UnboundedPreceding,
183            end: FrameBound::CurrentRow,
184            exclude: FrameExclude::NoOthers,
185        }
186    }
187}
188
189impl FrameSpec {
190    /// Create ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
191    pub fn entire_partition() -> Self {
192        Self {
193            frame_type: FrameType::Rows,
194            start: FrameBound::UnboundedPreceding,
195            end: FrameBound::UnboundedFollowing,
196            exclude: FrameExclude::NoOthers,
197        }
198    }
199
200    /// Create ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
201    pub fn running() -> Self {
202        Self {
203            frame_type: FrameType::Rows,
204            start: FrameBound::UnboundedPreceding,
205            end: FrameBound::CurrentRow,
206            exclude: FrameExclude::NoOthers,
207        }
208    }
209
210    /// Create ROWS BETWEEN n PRECEDING AND CURRENT ROW (sliding window)
211    pub fn sliding(n: i64) -> Self {
212        Self {
213            frame_type: FrameType::Rows,
214            start: FrameBound::Preceding(n),
215            end: FrameBound::CurrentRow,
216            exclude: FrameExclude::NoOthers,
217        }
218    }
219}
220
221// ============================================================================
222// Window Definition
223// ============================================================================
224
/// Direction of an `ORDER BY` key; ascending is the `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SortDirection {
    /// `ASC`
    #[default]
    Asc,
    /// `DESC`
    Desc,
}
232
/// Placement of missing values in an `ORDER BY` key; `First` is the `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum NullsOrder {
    /// `NULLS FIRST`
    #[default]
    First,
    /// `NULLS LAST`
    Last,
}
240
241/// Order-by specification for window
242#[derive(Debug, Clone)]
243pub struct WindowOrderBy {
244    /// Variable to sort by
245    pub var: Var,
246    /// Sort direction
247    pub direction: SortDirection,
248    /// Null handling
249    pub nulls: NullsOrder,
250}
251
252impl WindowOrderBy {
253    pub fn new(var: Var) -> Self {
254        Self {
255            var,
256            direction: SortDirection::Asc,
257            nulls: NullsOrder::Last,
258        }
259    }
260
261    pub fn desc(mut self) -> Self {
262        self.direction = SortDirection::Desc;
263        self
264    }
265
266    pub fn nulls_first(mut self) -> Self {
267        self.nulls = NullsOrder::First;
268        self
269    }
270}
271
272/// Complete window definition
273#[derive(Debug, Clone, Default)]
274pub struct WindowDef {
275    /// Optional window name
276    pub name: Option<String>,
277    /// Partition by variables
278    pub partition_by: Vec<Var>,
279    /// Order by specifications
280    pub order_by: Vec<WindowOrderBy>,
281    /// Frame specification
282    pub frame: FrameSpec,
283}
284
285impl WindowDef {
286    /// Create a window with partition by
287    pub fn partition_by(vars: Vec<Var>) -> Self {
288        Self {
289            partition_by: vars,
290            ..Default::default()
291        }
292    }
293
294    /// Add order by
295    pub fn with_order_by(mut self, order: Vec<WindowOrderBy>) -> Self {
296        self.order_by = order;
297        self
298    }
299
300    /// Set frame specification
301    pub fn with_frame(mut self, frame: FrameSpec) -> Self {
302        self.frame = frame;
303        self
304    }
305}
306
307// ============================================================================
308// Window Function Application
309// ============================================================================
310
311/// A window function to apply
312#[derive(Debug, Clone)]
313pub struct WindowFunc {
314    /// The function type
315    pub func_type: WindowFuncType,
316    /// Result variable name
317    pub result_var: Var,
318    /// Window definition
319    pub window: WindowDef,
320}
321
322impl WindowFunc {
323    /// Create a new window function
324    pub fn new(func_type: WindowFuncType, result_var: Var, window: WindowDef) -> Self {
325        Self {
326            func_type,
327            result_var,
328            window,
329        }
330    }
331}
332
333// ============================================================================
334// Window Executor
335// ============================================================================
336
/// Stateless evaluator for window functions; all work happens in associated functions.
pub struct WindowExecutor;
339
340#[derive(Debug, Clone)]
341struct IndexedBinding {
342    index: usize,
343    binding: Binding,
344}
345
346impl WindowExecutor {
347    /// Execute window functions on bindings
348    pub fn execute(bindings: Vec<Binding>, functions: &[WindowFunc]) -> Vec<Binding> {
349        if bindings.is_empty() || functions.is_empty() {
350            return bindings;
351        }
352
353        // We need to execute each window function
354        let mut result = bindings;
355
356        for func in functions {
357            result = Self::apply_window_function(&result, func);
358        }
359
360        result
361    }
362
363    /// Apply a single window function
364    fn apply_window_function(bindings: &[Binding], func: &WindowFunc) -> Vec<Binding> {
365        // Step 1: Partition the data
366        let partitions = Self::partition_bindings(bindings, &func.window.partition_by);
367
368        // Step 2: For each partition, sort and compute
369        let mut result: Vec<Option<Binding>> = vec![None; bindings.len()];
370
371        for (_key, mut partition) in partitions {
372            // Sort within partition
373            Self::sort_partition(&mut partition, &func.window.order_by);
374
375            // Compute window function for each row
376            let computed = Self::compute_for_partition(&partition, func);
377            for entry in computed {
378                if entry.index < result.len() {
379                    result[entry.index] = Some(entry.binding);
380                }
381            }
382        }
383
384        result
385            .into_iter()
386            .enumerate()
387            .map(|(idx, binding)| binding.unwrap_or_else(|| bindings[idx].clone()))
388            .collect()
389    }
390
391    /// Partition bindings by partition-by variables
392    fn partition_bindings(
393        bindings: &[Binding],
394        partition_by: &[Var],
395    ) -> Vec<(Vec<Option<Value>>, Vec<IndexedBinding>)> {
396        if partition_by.is_empty() {
397            // No partitioning - single partition with all rows
398            let entries = bindings
399                .iter()
400                .cloned()
401                .enumerate()
402                .map(|(index, binding)| IndexedBinding { index, binding })
403                .collect();
404            return vec![(vec![], entries)];
405        }
406
407        let mut partitions: HashMap<Vec<Option<Value>>, Vec<IndexedBinding>> = HashMap::new();
408        let mut key_order: Vec<Vec<Option<Value>>> = Vec::new();
409
410        for (index, binding) in bindings.iter().cloned().enumerate() {
411            let key_values: Vec<Option<Value>> = partition_by
412                .iter()
413                .map(|v| binding.get(v).cloned())
414                .collect();
415
416            if !partitions.contains_key(&key_values) {
417                key_order.push(key_values.clone());
418            }
419
420            partitions
421                .entry(key_values)
422                .or_default()
423                .push(IndexedBinding { index, binding });
424        }
425
426        // Maintain insertion order
427        key_order
428            .into_iter()
429            .filter_map(|values| partitions.remove(&values).map(|rows| (values, rows)))
430            .collect()
431    }
432
433    /// Sort partition by order-by specifications
434    fn sort_partition(partition: &mut [IndexedBinding], order_by: &[WindowOrderBy]) {
435        if order_by.is_empty() {
436            return;
437        }
438
439        partition.sort_by(|a, b| {
440            for spec in order_by {
441                let val_a = a.binding.get(&spec.var);
442                let val_b = b.binding.get(&spec.var);
443
444                let cmp = match (val_a, val_b) {
445                    (None, None) => Ordering::Equal,
446                    (None, Some(_)) => match spec.nulls {
447                        NullsOrder::First => Ordering::Less,
448                        NullsOrder::Last => Ordering::Greater,
449                    },
450                    (Some(_), None) => match spec.nulls {
451                        NullsOrder::First => Ordering::Greater,
452                        NullsOrder::Last => Ordering::Less,
453                    },
454                    (Some(a), Some(b)) => total_compare_values(a, b),
455                };
456
457                if cmp != Ordering::Equal {
458                    return match spec.direction {
459                        SortDirection::Asc => cmp,
460                        SortDirection::Desc => cmp.reverse(),
461                    };
462                }
463            }
464            a.index.cmp(&b.index)
465        });
466    }
467
468    /// Compute window function for each row in a partition
469    fn compute_for_partition(
470        partition: &[IndexedBinding],
471        func: &WindowFunc,
472    ) -> Vec<IndexedBinding> {
473        let partition_size = partition.len();
474
475        // Pre-compute peer groups for ranking functions
476        let peer_groups = Self::compute_peer_groups(partition, &func.window.order_by);
477
478        partition
479            .iter()
480            .enumerate()
481            .map(|(row_idx, indexed)| {
482                let value =
483                    Self::compute_value(partition, row_idx, &peer_groups, partition_size, func);
484
485                // Add result to binding
486                let result_binding = Binding::one(func.result_var.clone(), value);
487                let binding = indexed
488                    .binding
489                    .merge(&result_binding)
490                    .unwrap_or_else(|| indexed.binding.clone());
491                IndexedBinding {
492                    index: indexed.index,
493                    binding,
494                }
495            })
496            .collect()
497    }
498
499    /// Compute peer groups (rows with same ORDER BY values)
500    fn compute_peer_groups(partition: &[IndexedBinding], order_by: &[WindowOrderBy]) -> Vec<usize> {
501        if order_by.is_empty() {
502            // No ordering - all rows are peers (single group)
503            return vec![0; partition.len()];
504        }
505
506        let mut groups = Vec::with_capacity(partition.len());
507        let mut current_group = 0;
508
509        for (idx, indexed) in partition.iter().enumerate() {
510            if idx == 0 {
511                groups.push(0);
512                continue;
513            }
514
515            let prev = &partition[idx - 1].binding;
516            let binding = &indexed.binding;
517            let is_peer = order_by.iter().all(|spec| {
518                let a = prev.get(&spec.var);
519                let b = binding.get(&spec.var);
520                match (a, b) {
521                    (None, None) => true,
522                    (Some(va), Some(vb)) => values_equal(va, vb),
523                    _ => false,
524                }
525            });
526
527            if !is_peer {
528                current_group += 1;
529            }
530            groups.push(current_group);
531        }
532
533        groups
534    }
535
536    /// Compute value for a single row
537    fn compute_value(
538        partition: &[IndexedBinding],
539        row_idx: usize,
540        peer_groups: &[usize],
541        partition_size: usize,
542        func: &WindowFunc,
543    ) -> Value {
544        match &func.func_type {
545            // Ranking functions
546            WindowFuncType::RowNumber => Value::Integer((row_idx + 1) as i64),
547
548            WindowFuncType::Rank => {
549                // Rank = position of first row in current peer group + 1
550                let current_group = peer_groups[row_idx];
551                let first_in_group = peer_groups
552                    .iter()
553                    .position(|&g| g == current_group)
554                    .unwrap();
555                Value::Integer((first_in_group + 1) as i64)
556            }
557
558            WindowFuncType::DenseRank => {
559                // Dense rank = peer group number + 1
560                Value::Integer((peer_groups[row_idx] + 1) as i64)
561            }
562
563            WindowFuncType::Ntile(n) => {
564                // Divide into n buckets
565                let n = *n as usize;
566                if n == 0 || partition_size == 0 {
567                    return Value::Null;
568                }
569                let bucket_size = partition_size / n;
570                let remainder = partition_size % n;
571
572                // Rows are distributed: first `remainder` buckets get one extra row
573                let mut row = 0;
574                let mut bucket = 1;
575                for i in 0..n {
576                    let size = bucket_size + if i < remainder { 1 } else { 0 };
577                    if row_idx < row + size {
578                        bucket = i + 1;
579                        break;
580                    }
581                    row += size;
582                }
583                Value::Integer(bucket as i64)
584            }
585
586            WindowFuncType::PercentRank => {
587                // (rank - 1) / (partition_size - 1)
588                if partition_size <= 1 {
589                    return Value::Float(0.0);
590                }
591                let current_group = peer_groups[row_idx];
592                let first_in_group = peer_groups
593                    .iter()
594                    .position(|&g| g == current_group)
595                    .unwrap();
596                let rank = first_in_group as f64;
597                Value::Float(rank / (partition_size - 1) as f64)
598            }
599
600            WindowFuncType::CumeDist => {
601                // count of rows <= current row / partition_size
602                let current_group = peer_groups[row_idx];
603                // Count all rows up to and including current peer group
604                let count = peer_groups.iter().filter(|&&g| g <= current_group).count();
605                Value::Float(count as f64 / partition_size as f64)
606            }
607
608            // Value functions
609            WindowFuncType::FirstValue(var) => {
610                // Get frame bounds
611                let (start, _) = Self::get_frame_bounds(
612                    row_idx,
613                    partition_size,
614                    peer_groups,
615                    &func.window.frame,
616                );
617                partition
618                    .get(start)
619                    .and_then(|b| b.binding.get(var))
620                    .cloned()
621                    .unwrap_or(Value::Null)
622            }
623
624            WindowFuncType::LastValue(var) => {
625                let (_, end) = Self::get_frame_bounds(
626                    row_idx,
627                    partition_size,
628                    peer_groups,
629                    &func.window.frame,
630                );
631                // End is exclusive, so use end - 1
632                if end > 0 {
633                    partition
634                        .get(end - 1)
635                        .and_then(|b| b.binding.get(var))
636                        .cloned()
637                        .unwrap_or(Value::Null)
638                } else {
639                    Value::Null
640                }
641            }
642
643            WindowFuncType::NthValue(var, n) => {
644                let (start, end) = Self::get_frame_bounds(
645                    row_idx,
646                    partition_size,
647                    peer_groups,
648                    &func.window.frame,
649                );
650                let n = *n as usize;
651                if n == 0 {
652                    return Value::Null;
653                }
654                let target_idx = start + n - 1;
655                if target_idx < end {
656                    partition
657                        .get(target_idx)
658                        .and_then(|b| b.binding.get(var))
659                        .cloned()
660                        .unwrap_or(Value::Null)
661                } else {
662                    Value::Null
663                }
664            }
665
666            WindowFuncType::Lag(var, offset, default) => {
667                let offset = *offset as usize;
668                if row_idx >= offset {
669                    partition
670                        .get(row_idx - offset)
671                        .and_then(|b| b.binding.get(var))
672                        .cloned()
673                        .unwrap_or_else(|| default.clone().unwrap_or(Value::Null))
674                } else {
675                    default.clone().unwrap_or(Value::Null)
676                }
677            }
678
679            WindowFuncType::Lead(var, offset, default) => {
680                let offset = *offset as usize;
681                let target = row_idx + offset;
682                if target < partition_size {
683                    partition
684                        .get(target)
685                        .and_then(|b| b.binding.get(var))
686                        .cloned()
687                        .unwrap_or_else(|| default.clone().unwrap_or(Value::Null))
688                } else {
689                    default.clone().unwrap_or(Value::Null)
690                }
691            }
692
693            // Aggregate functions
694            WindowFuncType::Aggregate(agg_name, var) => {
695                let (start, end) = Self::get_frame_bounds(
696                    row_idx,
697                    partition_size,
698                    peer_groups,
699                    &func.window.frame,
700                );
701
702                if let Some(mut aggregator) = create_aggregator(agg_name) {
703                    for i in start..end {
704                        if let Some(binding) = partition.get(i) {
705                            let value = binding.binding.get(var);
706                            aggregator.accumulate(value);
707                        }
708                    }
709                    aggregator.finalize()
710                } else {
711                    Value::Null
712                }
713            }
714        }
715    }
716
717    /// Get frame bounds (start, end) for current row
718    /// Returns (inclusive start, exclusive end)
719    fn get_frame_bounds(
720        row_idx: usize,
721        partition_size: usize,
722        peer_groups: &[usize],
723        frame: &FrameSpec,
724    ) -> (usize, usize) {
725        let start = match &frame.start {
726            FrameBound::UnboundedPreceding => 0,
727            FrameBound::CurrentRow => {
728                match frame.frame_type {
729                    FrameType::Rows => row_idx,
730                    FrameType::Range | FrameType::Groups => {
731                        // Start of current peer group
732                        let group = peer_groups[row_idx];
733                        peer_groups
734                            .iter()
735                            .position(|&g| g == group)
736                            .unwrap_or(row_idx)
737                    }
738                }
739            }
740            FrameBound::Preceding(n) => {
741                match frame.frame_type {
742                    FrameType::Rows => row_idx.saturating_sub(*n as usize),
743                    FrameType::Groups => {
744                        // n groups preceding
745                        let current_group = peer_groups[row_idx];
746                        let target_group = current_group.saturating_sub(*n as usize);
747                        peer_groups
748                            .iter()
749                            .position(|&g| g == target_group)
750                            .unwrap_or(0)
751                    }
752                    FrameType::Range => row_idx.saturating_sub(*n as usize),
753                }
754            }
755            FrameBound::Following(n) => match frame.frame_type {
756                FrameType::Rows => (row_idx + *n as usize).min(partition_size),
757                FrameType::Groups => {
758                    let current_group = peer_groups[row_idx];
759                    let target_group = current_group + *n as usize;
760                    peer_groups
761                        .iter()
762                        .position(|&g| g >= target_group)
763                        .unwrap_or(partition_size)
764                }
765                FrameType::Range => (row_idx + *n as usize).min(partition_size),
766            },
767            FrameBound::UnboundedFollowing => partition_size,
768        };
769
770        let end = match &frame.end {
771            FrameBound::UnboundedFollowing => partition_size,
772            FrameBound::CurrentRow => {
773                match frame.frame_type {
774                    FrameType::Rows => row_idx + 1,
775                    FrameType::Range | FrameType::Groups => {
776                        // End of current peer group (exclusive)
777                        let group = peer_groups[row_idx];
778                        peer_groups
779                            .iter()
780                            .position(|&g| g > group)
781                            .unwrap_or(partition_size)
782                    }
783                }
784            }
785            FrameBound::Preceding(n) => match frame.frame_type {
786                FrameType::Rows => row_idx.saturating_sub(*n as usize) + 1,
787                FrameType::Groups => {
788                    let current_group = peer_groups[row_idx];
789                    let target_group = current_group.saturating_sub(*n as usize);
790                    peer_groups
791                        .iter()
792                        .position(|&g| g > target_group)
793                        .unwrap_or(partition_size)
794                }
795                FrameType::Range => row_idx.saturating_sub(*n as usize) + 1,
796            },
797            FrameBound::Following(n) => match frame.frame_type {
798                FrameType::Rows => (row_idx + *n as usize + 1).min(partition_size),
799                FrameType::Groups => {
800                    let current_group = peer_groups[row_idx];
801                    let target_group = current_group + *n as usize;
802                    peer_groups
803                        .iter()
804                        .position(|&g| g > target_group)
805                        .unwrap_or(partition_size)
806                }
807                FrameType::Range => (row_idx + *n as usize + 1).min(partition_size),
808            },
809            FrameBound::UnboundedPreceding => 0, // Invalid but handle gracefully
810        };
811
812        (
813            start.min(partition_size),
814            end.min(partition_size).max(start),
815        )
816    }
817}
818
819// ============================================================================
820// Tests
821// ============================================================================
822
823#[cfg(test)]
824mod tests {
825    use super::*;
826
827    fn make_binding(pairs: &[(&str, Value)]) -> Binding {
828        if pairs.is_empty() {
829            return Binding::empty();
830        }
831
832        let mut result = Binding::one(Var::new(pairs[0].0), pairs[0].1.clone());
833
834        for (k, v) in pairs.iter().skip(1) {
835            let next = Binding::one(Var::new(k), v.clone());
836            result = result.merge(&next).unwrap_or(result);
837        }
838
839        result
840    }
841
842    fn get_values(bindings: &[Binding], var: &str) -> Vec<i64> {
843        let v = Var::new(var);
844        bindings
845            .iter()
846            .filter_map(|b| b.get(&v))
847            .filter_map(|v| match v {
848                Value::Integer(i) => Some(*i),
849                _ => None,
850            })
851            .collect()
852    }
853
854    #[test]
855    fn test_row_number() {
856        let bindings = vec![
857            make_binding(&[
858                ("dept", Value::String("A".to_string())),
859                ("salary", Value::Integer(100)),
860            ]),
861            make_binding(&[
862                ("dept", Value::String("A".to_string())),
863                ("salary", Value::Integer(200)),
864            ]),
865            make_binding(&[
866                ("dept", Value::String("B".to_string())),
867                ("salary", Value::Integer(150)),
868            ]),
869        ];
870
871        let func = WindowFunc::new(
872            WindowFuncType::RowNumber,
873            Var::new("rn"),
874            WindowDef::partition_by(vec![Var::new("dept")])
875                .with_order_by(vec![WindowOrderBy::new(Var::new("salary"))]),
876        );
877
878        let result = WindowExecutor::execute(bindings, &[func]);
879
880        // Each partition should have row numbers starting from 1
881        let rns = get_values(&result, "rn");
882        assert_eq!(rns, vec![1, 2, 1]); // A: 1,2 and B: 1
883    }
884
885    #[test]
886    fn test_rank_with_ties() {
887        let bindings = vec![
888            make_binding(&[("score", Value::Integer(100))]),
889            make_binding(&[("score", Value::Integer(100))]), // Tie
890            make_binding(&[("score", Value::Integer(90))]),
891            make_binding(&[("score", Value::Integer(80))]),
892        ];
893
894        let func = WindowFunc::new(
895            WindowFuncType::Rank,
896            Var::new("rank"),
897            WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("score")).desc()]),
898        );
899
900        let result = WindowExecutor::execute(bindings, &[func]);
901        let ranks = get_values(&result, "rank");
902
903        // 100 = rank 1, 100 = rank 1 (tie), 90 = rank 3 (skip 2), 80 = rank 4
904        assert_eq!(ranks, vec![1, 1, 3, 4]);
905    }
906
907    #[test]
908    fn test_dense_rank() {
909        let bindings = vec![
910            make_binding(&[("score", Value::Integer(100))]),
911            make_binding(&[("score", Value::Integer(100))]),
912            make_binding(&[("score", Value::Integer(90))]),
913        ];
914
915        let func = WindowFunc::new(
916            WindowFuncType::DenseRank,
917            Var::new("drank"),
918            WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("score")).desc()]),
919        );
920
921        let result = WindowExecutor::execute(bindings, &[func]);
922        let ranks = get_values(&result, "drank");
923
924        // No gaps: 1, 1, 2
925        assert_eq!(ranks, vec![1, 1, 2]);
926    }
927
928    #[test]
929    fn test_ntile() {
930        let bindings: Vec<Binding> = (1..=10)
931            .map(|i| make_binding(&[("val", Value::Integer(i))]))
932            .collect();
933
934        let func = WindowFunc::new(
935            WindowFuncType::Ntile(4),
936            Var::new("bucket"),
937            WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("val"))]),
938        );
939
940        let result = WindowExecutor::execute(bindings, &[func]);
941        let buckets = get_values(&result, "bucket");
942
943        // 10 rows into 4 buckets: 3, 3, 2, 2 rows per bucket
944        assert_eq!(buckets, vec![1, 1, 1, 2, 2, 2, 3, 3, 4, 4]);
945    }
946
947    #[test]
948    fn test_lag_lead() {
949        let bindings: Vec<Binding> = (1..=5)
950            .map(|i| make_binding(&[("val", Value::Integer(i))]))
951            .collect();
952
953        let lag_func = WindowFunc::new(
954            WindowFuncType::Lag(Var::new("val"), 1, Some(Value::Integer(0))),
955            Var::new("prev"),
956            WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("val"))]),
957        );
958
959        let lead_func = WindowFunc::new(
960            WindowFuncType::Lead(Var::new("val"), 1, Some(Value::Integer(0))),
961            Var::new("next"),
962            WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("val"))]),
963        );
964
965        let result = WindowExecutor::execute(bindings, &[lag_func, lead_func]);
966        let prevs = get_values(&result, "prev");
967        let nexts = get_values(&result, "next");
968
969        assert_eq!(prevs, vec![0, 1, 2, 3, 4]); // LAG(val, 1, 0)
970        assert_eq!(nexts, vec![2, 3, 4, 5, 0]); // LEAD(val, 1, 0)
971    }
972
973    #[test]
974    fn test_running_sum() {
975        let bindings: Vec<Binding> = (1..=5)
976            .map(|i| make_binding(&[("val", Value::Integer(i))]))
977            .collect();
978
979        let func = WindowFunc::new(
980            WindowFuncType::Aggregate("SUM".to_string(), Var::new("val")),
981            Var::new("running_sum"),
982            WindowDef::default()
983                .with_order_by(vec![WindowOrderBy::new(Var::new("val"))])
984                .with_frame(FrameSpec::running()),
985        );
986
987        let result = WindowExecutor::execute(bindings, &[func]);
988        let sums = get_values(&result, "running_sum");
989
990        // Running sum: 1, 1+2=3, 1+2+3=6, ...
991        assert_eq!(sums, vec![1, 3, 6, 10, 15]);
992    }
993
994    #[test]
995    fn test_first_last_value() {
996        let bindings: Vec<Binding> = (1..=5)
997            .map(|i| make_binding(&[("val", Value::Integer(i))]))
998            .collect();
999
1000        let first_func = WindowFunc::new(
1001            WindowFuncType::FirstValue(Var::new("val")),
1002            Var::new("first"),
1003            WindowDef::default()
1004                .with_order_by(vec![WindowOrderBy::new(Var::new("val"))])
1005                .with_frame(FrameSpec::entire_partition()),
1006        );
1007
1008        let last_func = WindowFunc::new(
1009            WindowFuncType::LastValue(Var::new("val")),
1010            Var::new("last"),
1011            WindowDef::default()
1012                .with_order_by(vec![WindowOrderBy::new(Var::new("val"))])
1013                .with_frame(FrameSpec::entire_partition()),
1014        );
1015
1016        let result = WindowExecutor::execute(bindings, &[first_func, last_func]);
1017        let firsts = get_values(&result, "first");
1018        let lasts = get_values(&result, "last");
1019
1020        assert_eq!(firsts, vec![1, 1, 1, 1, 1]); // First value of partition
1021        assert_eq!(lasts, vec![5, 5, 5, 5, 5]); // Last value of partition
1022    }
1023
1024    #[test]
1025    fn test_partitioned_sum() {
1026        let bindings = vec![
1027            make_binding(&[
1028                ("dept", Value::String("A".to_string())),
1029                ("salary", Value::Integer(100)),
1030            ]),
1031            make_binding(&[
1032                ("dept", Value::String("A".to_string())),
1033                ("salary", Value::Integer(200)),
1034            ]),
1035            make_binding(&[
1036                ("dept", Value::String("B".to_string())),
1037                ("salary", Value::Integer(150)),
1038            ]),
1039            make_binding(&[
1040                ("dept", Value::String("B".to_string())),
1041                ("salary", Value::Integer(250)),
1042            ]),
1043        ];
1044
1045        let func = WindowFunc::new(
1046            WindowFuncType::Aggregate("SUM".to_string(), Var::new("salary")),
1047            Var::new("dept_total"),
1048            WindowDef::partition_by(vec![Var::new("dept")])
1049                .with_frame(FrameSpec::entire_partition()),
1050        );
1051
1052        let result = WindowExecutor::execute(bindings, &[func]);
1053        let totals = get_values(&result, "dept_total");
1054
1055        // A: 100+200=300, B: 150+250=400
1056        assert_eq!(totals, vec![300, 300, 400, 400]);
1057    }
1058
1059    #[test]
1060    fn test_window_preserves_input_order() {
1061        let bindings = vec![
1062            make_binding(&[
1063                ("dept", Value::String("A".to_string())),
1064                ("seq", Value::Integer(1)),
1065            ]),
1066            make_binding(&[
1067                ("dept", Value::String("B".to_string())),
1068                ("seq", Value::Integer(1)),
1069            ]),
1070            make_binding(&[
1071                ("dept", Value::String("A".to_string())),
1072                ("seq", Value::Integer(2)),
1073            ]),
1074            make_binding(&[
1075                ("dept", Value::String("B".to_string())),
1076                ("seq", Value::Integer(2)),
1077            ]),
1078        ];
1079
1080        let func = WindowFunc::new(
1081            WindowFuncType::RowNumber,
1082            Var::new("rn"),
1083            WindowDef::partition_by(vec![Var::new("dept")])
1084                .with_order_by(vec![WindowOrderBy::new(Var::new("seq"))]),
1085        );
1086
1087        let result = WindowExecutor::execute(bindings, &[func]);
1088        let dept_var = Var::new("dept");
1089        let depts: Vec<String> = result
1090            .iter()
1091            .filter_map(|b| b.get(&dept_var))
1092            .filter_map(|v| match v {
1093                Value::String(s) => Some(s.clone()),
1094                _ => None,
1095            })
1096            .collect();
1097
1098        assert_eq!(depts, vec!["A", "B", "A", "B"]);
1099        assert_eq!(get_values(&result, "rn"), vec![1, 1, 2, 2]);
1100    }
1101
1102    #[test]
1103    fn test_percent_rank() {
1104        let bindings: Vec<Binding> = (1..=4)
1105            .map(|i| make_binding(&[("val", Value::Integer(i))]))
1106            .collect();
1107
1108        let func = WindowFunc::new(
1109            WindowFuncType::PercentRank,
1110            Var::new("prank"),
1111            WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("val"))]),
1112        );
1113
1114        let result = WindowExecutor::execute(bindings, &[func]);
1115
1116        // Check percent ranks: 0, 0.333..., 0.666..., 1.0
1117        for (i, binding) in result.iter().enumerate() {
1118            if let Some(Value::Float(pr)) = binding.get(&Var::new("prank")) {
1119                let expected = i as f64 / 3.0;
1120                assert!(
1121                    (pr - expected).abs() < 0.001,
1122                    "Row {}: expected {}, got {}",
1123                    i,
1124                    expected,
1125                    pr
1126                );
1127            }
1128        }
1129    }
1130}