Skip to main content

fsqlite_func/
window_builtins.rs

//! Built-in window functions (S13.5, bd-14i6).
//!
//! Implements: row_number, rank, dense_rank, percent_rank, cume_dist,
//! ntile, lag, lead, first_value, last_value, nth_value.
//!
//! These functions implement the [`WindowFunction`] trait.  The VDBE is
//! responsible for partitioning, ordering, and frame management; these
//! implementations provide the per-row computation logic.
//!
//! # Design Notes
//!
//! Pure-numbering functions (row_number, rank, dense_rank) track position
//! via step() and expose the current value through value().  The ORDER BY
//! column is passed as args\[0\] so the function can detect peer-group
//! boundaries.
//!
//! Buffer-based functions (lag, lead, first_value, last_value, nth_value)
//! keep the argument values they have been stepped with in internal state
//! and expose frame-relative access through value().
20#![allow(
21    clippy::unnecessary_literal_bound,
22    clippy::cast_possible_truncation,
23    clippy::cast_possible_wrap,
24    clippy::cast_precision_loss,
25    clippy::cast_sign_loss,
26    clippy::items_after_statements,
27    clippy::float_cmp,
28    clippy::match_same_arms,
29    clippy::similar_names
30)]
31
32use std::collections::VecDeque;
33
34use fsqlite_error::Result;
35use fsqlite_types::SqliteValue;
36
37use crate::{FunctionRegistry, WindowFunction};
38
39// ═══════════════════════════════════════════════════════════════════════════
40// row_number()
41// ═══════════════════════════════════════════════════════════════════════════
42
43pub struct RowNumberState {
44    counter: i64,
45}
46
47pub struct RowNumberFunc;
48
49impl WindowFunction for RowNumberFunc {
50    type State = RowNumberState;
51
52    fn initial_state(&self) -> Self::State {
53        RowNumberState { counter: 0 }
54    }
55
56    fn step(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
57        state.counter += 1;
58        Ok(())
59    }
60
61    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
62        state.counter -= 1;
63        Ok(())
64    }
65
66    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
67        Ok(SqliteValue::Integer(state.counter))
68    }
69
70    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
71        Ok(SqliteValue::Integer(state.counter))
72    }
73
74    fn num_args(&self) -> i32 {
75        0
76    }
77
78    fn name(&self) -> &str {
79        "row_number"
80    }
81}
82
83// ═══════════════════════════════════════════════════════════════════════════
84// rank()
85// ═══════════════════════════════════════════════════════════════════════════
86
87pub struct RankState {
88    row_number: i64,
89    rank: i64,
90    last_order_value: Option<SqliteValue>,
91}
92
93pub struct RankFunc;
94
95impl WindowFunction for RankFunc {
96    type State = RankState;
97
98    fn initial_state(&self) -> Self::State {
99        RankState {
100            row_number: 0,
101            rank: 0,
102            last_order_value: None,
103        }
104    }
105
106    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
107        state.row_number += 1;
108        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
109        let is_new_peer = match &state.last_order_value {
110            None => true,
111            Some(last) => &current != last,
112        };
113        if is_new_peer {
114            state.rank = state.row_number;
115            state.last_order_value = Some(current);
116        }
117        Ok(())
118    }
119
120    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
121        // rank() uses UNBOUNDED PRECEDING to CURRENT ROW; inverse is a no-op.
122        Ok(())
123    }
124
125    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
126        Ok(SqliteValue::Integer(state.rank))
127    }
128
129    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
130        Ok(SqliteValue::Integer(state.rank))
131    }
132
133    fn num_args(&self) -> i32 {
134        -1
135    }
136
137    fn name(&self) -> &str {
138        "rank"
139    }
140}
141
142// ═══════════════════════════════════════════════════════════════════════════
143// dense_rank()
144// ═══════════════════════════════════════════════════════════════════════════
145
146pub struct DenseRankState {
147    dense_rank: i64,
148    last_order_value: Option<SqliteValue>,
149}
150
151pub struct DenseRankFunc;
152
153impl WindowFunction for DenseRankFunc {
154    type State = DenseRankState;
155
156    fn initial_state(&self) -> Self::State {
157        DenseRankState {
158            dense_rank: 0,
159            last_order_value: None,
160        }
161    }
162
163    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
164        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
165        let is_new_peer = match &state.last_order_value {
166            None => true,
167            Some(last) => &current != last,
168        };
169        if is_new_peer {
170            state.dense_rank += 1;
171            state.last_order_value = Some(current);
172        }
173        Ok(())
174    }
175
176    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
177        Ok(())
178    }
179
180    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
181        Ok(SqliteValue::Integer(state.dense_rank))
182    }
183
184    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
185        Ok(SqliteValue::Integer(state.dense_rank))
186    }
187
188    fn num_args(&self) -> i32 {
189        -1
190    }
191
192    fn name(&self) -> &str {
193        "dense_rank"
194    }
195}
196
197// ═══════════════════════════════════════════════════════════════════════════
198// percent_rank()
199// ═══════════════════════════════════════════════════════════════════════════
200
201/// State for `percent_rank()`.
202///
203/// The VDBE must step() all rows in the partition first (to compute ranks
204/// and partition size), then iterate by calling value() and inverse() for
205/// each output row.
206pub struct PercentRankState {
207    partition_size: i64,
208    ranks: Vec<i64>,
209    cursor: usize,
210    step_row_number: i64,
211    current_rank: i64,
212    last_order_value: Option<SqliteValue>,
213}
214
215pub struct PercentRankFunc;
216
217impl WindowFunction for PercentRankFunc {
218    type State = PercentRankState;
219
220    fn initial_state(&self) -> Self::State {
221        PercentRankState {
222            partition_size: 0,
223            ranks: Vec::new(),
224            cursor: 0,
225            step_row_number: 0,
226            current_rank: 0,
227            last_order_value: None,
228        }
229    }
230
231    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
232        state.step_row_number += 1;
233        state.partition_size += 1;
234        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
235        let is_new_peer = match &state.last_order_value {
236            None => true,
237            Some(last) => &current != last,
238        };
239        if is_new_peer {
240            state.current_rank = state.step_row_number;
241            state.last_order_value = Some(current);
242        }
243        state.ranks.push(state.current_rank);
244        Ok(())
245    }
246
247    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
248        state.cursor += 1;
249        Ok(())
250    }
251
252    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
253        if state.partition_size <= 1 {
254            return Ok(SqliteValue::Float(0.0));
255        }
256        let rank = state.ranks.get(state.cursor).copied().unwrap_or(1);
257        let pr = (rank - 1) as f64 / (state.partition_size - 1) as f64;
258        Ok(SqliteValue::Float(pr))
259    }
260
261    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
262        self.value(&state)
263    }
264
265    fn num_args(&self) -> i32 {
266        -1
267    }
268
269    fn name(&self) -> &str {
270        "percent_rank"
271    }
272}
273
274// ═══════════════════════════════════════════════════════════════════════════
275// cume_dist()
276// ═══════════════════════════════════════════════════════════════════════════
277
278/// State for `cume_dist()`.
279///
280/// The VDBE must step() all rows first, then iterate with value()+inverse().
281/// cume_dist = (current_position) / partition_size.
282pub struct CumeDistState {
283    partition_size: i64,
284    current_row: i64,
285}
286
287pub struct CumeDistFunc;
288
289impl WindowFunction for CumeDistFunc {
290    type State = CumeDistState;
291
292    fn initial_state(&self) -> Self::State {
293        CumeDistState {
294            partition_size: 0,
295            current_row: 0,
296        }
297    }
298
299    fn step(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
300        state.partition_size += 1;
301        Ok(())
302    }
303
304    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
305        state.current_row += 1;
306        Ok(())
307    }
308
309    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
310        if state.partition_size == 0 {
311            return Ok(SqliteValue::Float(0.0));
312        }
313        let cd = (state.current_row + 1) as f64 / state.partition_size as f64;
314        Ok(SqliteValue::Float(cd))
315    }
316
317    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
318        self.value(&state)
319    }
320
321    fn num_args(&self) -> i32 {
322        -1
323    }
324
325    fn name(&self) -> &str {
326        "cume_dist"
327    }
328}
329
330// ═══════════════════════════════════════════════════════════════════════════
331// ntile(N)
332// ═══════════════════════════════════════════════════════════════════════════
333
334/// State for `ntile(N)`.
335///
336/// The VDBE must step() all rows first, then iterate with value()+inverse().
337/// Distributes rows into N groups.  If partition_size % N != 0, the first
338/// (partition_size % N) groups get one extra row.
339pub struct NtileState {
340    partition_size: i64,
341    n: i64,
342    current_row: i64,
343}
344
345pub struct NtileFunc;
346
347impl WindowFunction for NtileFunc {
348    type State = NtileState;
349
350    fn initial_state(&self) -> Self::State {
351        NtileState {
352            partition_size: 0,
353            n: 1,
354            current_row: 0,
355        }
356    }
357
358    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
359        state.partition_size += 1;
360        if state.partition_size == 1 {
361            let n = args.first().map_or(1, |v| v.to_integer().max(1));
362            state.n = n;
363        }
364        Ok(())
365    }
366
367    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
368        state.current_row += 1;
369        Ok(())
370    }
371
372    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
373        if state.n <= 0 || state.partition_size == 0 {
374            return Ok(SqliteValue::Integer(1));
375        }
376        let n = state.n;
377        let sz = state.partition_size;
378        let row = state.current_row + 1; // 1-based
379
380        // Group size: first (sz % n) groups get (sz / n + 1) rows,
381        // remaining groups get (sz / n) rows.
382        let base = sz / n;
383        let extra = sz % n;
384        // Rows in "large" groups: extra * (base + 1).
385        let large_rows = extra * (base + 1);
386
387        let bucket = if row <= large_rows {
388            // In one of the first `extra` groups (each of size base+1).
389            (row - 1) / (base + 1) + 1
390        } else {
391            // In one of the remaining groups (each of size base).
392            let adjusted = row - large_rows;
393            if base == 0 {
394                // More buckets than rows; each remaining row gets its own bucket.
395                extra + adjusted
396            } else {
397                extra + (adjusted - 1) / base + 1
398            }
399        };
400        Ok(SqliteValue::Integer(bucket))
401    }
402
403    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
404        self.value(&state)
405    }
406
407    fn num_args(&self) -> i32 {
408        1
409    }
410
411    fn name(&self) -> &str {
412        "ntile"
413    }
414}
415
416// ═══════════════════════════════════════════════════════════════════════════
417// lag(X [, offset [, default]])
418// ═══════════════════════════════════════════════════════════════════════════
419
420/// State for `lag()`: maintains a buffer of previous values.
421pub struct LagState {
422    buffer: VecDeque<SqliteValue>,
423    offset: i64,
424    default_val: SqliteValue,
425    row_number: i64,
426}
427
428pub struct LagFunc;
429
430impl WindowFunction for LagFunc {
431    type State = LagState;
432
433    fn initial_state(&self) -> Self::State {
434        LagState {
435            buffer: VecDeque::new(),
436            offset: 1,
437            default_val: SqliteValue::Null,
438            row_number: 0,
439        }
440    }
441
442    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
443        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
444        // Capture offset and default on first call.
445        if state.row_number == 0 {
446            if let Some(off) = args.get(1) {
447                state.offset = off.to_integer().max(0);
448            }
449            if let Some(def) = args.get(2) {
450                state.default_val = def.clone();
451            }
452        }
453        state.row_number += 1;
454        state.buffer.push_back(val);
455        Ok(())
456    }
457
458    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
459        Ok(())
460    }
461
462    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
463        let idx = state.row_number - state.offset;
464        if idx < 1 || idx > state.buffer.len() as i64 {
465            return Ok(state.default_val.clone());
466        }
467        Ok(state.buffer[(idx - 1) as usize].clone())
468    }
469
470    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
471        self.value(&state)
472    }
473
474    fn num_args(&self) -> i32 {
475        -1 // 1, 2, or 3 args
476    }
477
478    fn name(&self) -> &str {
479        "lag"
480    }
481}
482
483// ═══════════════════════════════════════════════════════════════════════════
484// lead(X [, offset [, default]])
485// ═══════════════════════════════════════════════════════════════════════════
486
487/// State for `lead()`: maintains a buffer and reads ahead.
488pub struct LeadState {
489    buffer: Vec<SqliteValue>,
490    offset: i64,
491    default_val: SqliteValue,
492    current_row: i64,
493}
494
495pub struct LeadFunc;
496
497impl WindowFunction for LeadFunc {
498    type State = LeadState;
499
500    fn initial_state(&self) -> Self::State {
501        LeadState {
502            buffer: Vec::new(),
503            offset: 1,
504            default_val: SqliteValue::Null,
505            current_row: 0,
506        }
507    }
508
509    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
510        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
511        if state.buffer.is_empty() {
512            if let Some(off) = args.get(1) {
513                state.offset = off.to_integer().max(0);
514            }
515            if let Some(def) = args.get(2) {
516                state.default_val = def.clone();
517            }
518        }
519        state.buffer.push(val);
520        Ok(())
521    }
522
523    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
524        state.current_row += 1;
525        Ok(())
526    }
527
528    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
529        let target = state.current_row + state.offset;
530        if target < 0 || target >= state.buffer.len() as i64 {
531            return Ok(state.default_val.clone());
532        }
533        Ok(state.buffer[target as usize].clone())
534    }
535
536    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
537        self.value(&state)
538    }
539
540    fn num_args(&self) -> i32 {
541        -1
542    }
543
544    fn name(&self) -> &str {
545        "lead"
546    }
547}
548
549// ═══════════════════════════════════════════════════════════════════════════
550// first_value(X)
551// ═══════════════════════════════════════════════════════════════════════════
552
553pub struct FirstValueState {
554    first: Option<SqliteValue>,
555}
556
557pub struct FirstValueFunc;
558
559impl WindowFunction for FirstValueFunc {
560    type State = FirstValueState;
561
562    fn initial_state(&self) -> Self::State {
563        FirstValueState { first: None }
564    }
565
566    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
567        if state.first.is_none() {
568            state.first = Some(args.first().cloned().unwrap_or(SqliteValue::Null));
569        }
570        Ok(())
571    }
572
573    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
574        // When the first row exits the frame, we need to clear so the next
575        // step() captures the new first value.  For simplicity, we use a
576        // VecDeque-based approach in FirstValueFrameFunc below.  This basic
577        // version handles the common UNBOUNDED PRECEDING case correctly.
578        state.first = None;
579        Ok(())
580    }
581
582    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
583        Ok(state.first.clone().unwrap_or(SqliteValue::Null))
584    }
585
586    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
587        Ok(state.first.unwrap_or(SqliteValue::Null))
588    }
589
590    fn num_args(&self) -> i32 {
591        1
592    }
593
594    fn name(&self) -> &str {
595        "first_value"
596    }
597}
598
599// ═══════════════════════════════════════════════════════════════════════════
600// last_value(X)
601// ═══════════════════════════════════════════════════════════════════════════
602
603pub struct LastValueState {
604    frame: VecDeque<SqliteValue>,
605}
606
607pub struct LastValueFunc;
608
609impl WindowFunction for LastValueFunc {
610    type State = LastValueState;
611
612    fn initial_state(&self) -> Self::State {
613        LastValueState {
614            frame: VecDeque::new(),
615        }
616    }
617
618    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
619        state
620            .frame
621            .push_back(args.first().cloned().unwrap_or(SqliteValue::Null));
622        Ok(())
623    }
624
625    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
626        state.frame.pop_front();
627        Ok(())
628    }
629
630    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
631        Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
632    }
633
634    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
635        Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
636    }
637
638    fn num_args(&self) -> i32 {
639        1
640    }
641
642    fn name(&self) -> &str {
643        "last_value"
644    }
645}
646
647// ═══════════════════════════════════════════════════════════════════════════
648// nth_value(X, N)
649// ═══════════════════════════════════════════════════════════════════════════
650
651pub struct NthValueState {
652    frame: VecDeque<SqliteValue>,
653    n: i64,
654}
655
656pub struct NthValueFunc;
657
658impl WindowFunction for NthValueFunc {
659    type State = NthValueState;
660
661    fn initial_state(&self) -> Self::State {
662        NthValueState {
663            frame: VecDeque::new(),
664            n: 1,
665        }
666    }
667
668    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
669        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
670        // Capture N from second arg on first call.
671        if state.frame.is_empty() {
672            if let Some(n_arg) = args.get(1) {
673                state.n = n_arg.to_integer();
674            }
675        }
676        state.frame.push_back(val);
677        Ok(())
678    }
679
680    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
681        state.frame.pop_front();
682        Ok(())
683    }
684
685    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
686        // nth_value is 1-based.  N <= 0 is an error per SQLite docs,
687        // but we return NULL to be safe (the VDBE should validate N).
688        if state.n <= 0 {
689            return Ok(SqliteValue::Null);
690        }
691        let idx = (state.n - 1) as usize;
692        Ok(state.frame.get(idx).cloned().unwrap_or(SqliteValue::Null))
693    }
694
695    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
696        self.value(&state)
697    }
698
699    fn num_args(&self) -> i32 {
700        2
701    }
702
703    fn name(&self) -> &str {
704        "nth_value"
705    }
706}
707
708// ── Registration ──────────────────────────────────────────────────────────
709
710/// Register all S13.5 window functions.
711pub fn register_window_builtins(registry: &mut FunctionRegistry) {
712    registry.register_window(RowNumberFunc);
713    registry.register_window(RankFunc);
714    registry.register_window(DenseRankFunc);
715    registry.register_window(PercentRankFunc);
716    registry.register_window(CumeDistFunc);
717    registry.register_window(NtileFunc);
718    registry.register_window(LagFunc);
719    registry.register_window(LeadFunc);
720    registry.register_window(FirstValueFunc);
721    registry.register_window(LastValueFunc);
722    registry.register_window(NthValueFunc);
723}
724
725// ── Tests ─────────────────────────────────────────────────────────────────
726
727#[cfg(test)]
728mod tests {
729    use super::*;
730
731    fn int(v: i64) -> SqliteValue {
732        SqliteValue::Integer(v)
733    }
734
735    fn text(s: &str) -> SqliteValue {
736        SqliteValue::Text(s.to_owned())
737    }
738
739    fn null() -> SqliteValue {
740        SqliteValue::Null
741    }
742
743    /// Simulate a partition by calling step() for each row, collecting
744    /// value() after each step.  Returns the vector of per-row results.
745    /// Suitable for progressive functions (row_number, rank, dense_rank).
746    fn run_window_partition<F: WindowFunction>(
747        func: &F,
748        rows: &[Vec<SqliteValue>],
749    ) -> Vec<SqliteValue> {
750        let mut state = func.initial_state();
751        let mut results = Vec::new();
752        for row in rows {
753            func.step(&mut state, row).unwrap();
754            results.push(func.value(&state).unwrap());
755        }
756        results
757    }
758
759    /// Two-pass partition simulation: step all rows first (pass 1), then
760    /// iterate calling value()+inverse() for each row (pass 2).
761    /// Required for functions that need full partition size (ntile,
762    /// percent_rank, cume_dist).
763    fn run_window_two_pass<F: WindowFunction>(
764        func: &F,
765        rows: &[Vec<SqliteValue>],
766    ) -> Vec<SqliteValue> {
767        let mut state = func.initial_state();
768        // Pass 1: step all rows.
769        for row in rows {
770            func.step(&mut state, row).unwrap();
771        }
772        // Pass 2: read values, advance cursor via inverse().
773        let mut results = Vec::new();
774        for (i, _) in rows.iter().enumerate() {
775            results.push(func.value(&state).unwrap());
776            if i < rows.len() - 1 {
777                func.inverse(&mut state, &[]).unwrap();
778            }
779        }
780        results
781    }
782
783    // ── row_number ───────────────────────────────────────────────────
784
785    #[test]
786    fn test_row_number_basic() {
787        let results =
788            run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![], vec![], vec![]]);
789        assert_eq!(results, vec![int(1), int(2), int(3), int(4), int(5)]);
790    }
791
792    #[test]
793    fn test_row_number_partition_reset() {
794        // Partition 1: 3 rows.
795        let r1 = run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![]]);
796        assert_eq!(r1, vec![int(1), int(2), int(3)]);
797
798        // Partition 2: 2 rows (fresh state).
799        let r2 = run_window_partition(&RowNumberFunc, &[vec![], vec![]]);
800        assert_eq!(r2, vec![int(1), int(2)]);
801    }
802
803    // ── rank ─────────────────────────────────────────────────────────
804
805    #[test]
806    fn test_rank_with_ties() {
807        // Values: [1, 2, 2, 3] -> ranks: [1, 2, 2, 4]
808        let results = run_window_partition(
809            &RankFunc,
810            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
811        );
812        assert_eq!(results, vec![int(1), int(2), int(2), int(4)]);
813    }
814
815    #[test]
816    fn test_rank_no_ties() {
817        let results =
818            run_window_partition(&RankFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
819        assert_eq!(results, vec![int(1), int(2), int(3)]);
820    }
821
822    // ── dense_rank ───────────────────────────────────────────────────
823
824    #[test]
825    fn test_dense_rank_with_ties() {
826        // Values: [1, 2, 2, 3] -> dense_ranks: [1, 2, 2, 3]
827        let results = run_window_partition(
828            &DenseRankFunc,
829            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
830        );
831        assert_eq!(results, vec![int(1), int(2), int(2), int(3)]);
832    }
833
834    #[test]
835    fn test_dense_rank_multiple_ties() {
836        // Values: [1, 1, 2, 2, 3] -> dense_ranks: [1, 1, 2, 2, 3]
837        let results = run_window_partition(
838            &DenseRankFunc,
839            &[
840                vec![int(1)],
841                vec![int(1)],
842                vec![int(2)],
843                vec![int(2)],
844                vec![int(3)],
845            ],
846        );
847        assert_eq!(results, vec![int(1), int(1), int(2), int(2), int(3)]);
848    }
849
850    // ── percent_rank ─────────────────────────────────────────────────
851
852    #[test]
853    fn test_percent_rank_single_row() {
854        let results = run_window_two_pass(&PercentRankFunc, &[vec![int(1)]]);
855        assert_eq!(results, vec![SqliteValue::Float(0.0)]);
856    }
857
858    #[test]
859    fn test_percent_rank_formula() {
860        // 4 rows, values [1, 2, 2, 3] -> ranks [1, 2, 2, 4]
861        // percent_rank = (rank - 1) / (N - 1) = (rank - 1) / 3
862        let results = run_window_two_pass(
863            &PercentRankFunc,
864            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
865        );
866        // Row 1: (1-1)/3 = 0.0
867        // Row 2: (2-1)/3 = 0.333...
868        // Row 3: (2-1)/3 = 0.333... (same rank as row 2)
869        // Row 4: (4-1)/3 = 1.0
870        match &results[0] {
871            SqliteValue::Float(v) => assert!((*v - 0.0).abs() < 1e-10),
872            other => panic!("expected Float, got {other:?}"),
873        }
874        match &results[1] {
875            SqliteValue::Float(v) => assert!((*v - 1.0 / 3.0).abs() < 1e-10),
876            other => panic!("expected Float, got {other:?}"),
877        }
878        match &results[2] {
879            SqliteValue::Float(v) => assert!((*v - 1.0 / 3.0).abs() < 1e-10),
880            other => panic!("expected Float, got {other:?}"),
881        }
882        match &results[3] {
883            SqliteValue::Float(v) => assert!((*v - 1.0).abs() < 1e-10),
884            other => panic!("expected Float, got {other:?}"),
885        }
886    }
887
888    // ── cume_dist ────────────────────────────────────────────────────
889
890    #[test]
891    fn test_cume_dist_distinct() {
892        // 4 distinct values: cume_dist = [0.25, 0.5, 0.75, 1.0]
893        let results = run_window_two_pass(
894            &CumeDistFunc,
895            &[vec![int(1)], vec![int(2)], vec![int(3)], vec![int(4)]],
896        );
897        for (i, expected) in [0.25, 0.5, 0.75, 1.0].iter().enumerate() {
898            match &results[i] {
899                SqliteValue::Float(v) => {
900                    assert!(
901                        (*v - expected).abs() < 1e-10,
902                        "row {i}: expected {expected}, got {v}"
903                    );
904                }
905                other => panic!("expected Float, got {other:?}"),
906            }
907        }
908    }
909
910    // ── ntile ────────────────────────────────────────────────────────
911
912    #[test]
913    fn test_ntile_even() {
914        // ntile(4) over 8 rows: groups of 2 each -> [1,1,2,2,3,3,4,4]
915        let rows: Vec<Vec<SqliteValue>> = (0..8).map(|_| vec![int(4)]).collect();
916        let results = run_window_two_pass(&NtileFunc, &rows);
917        assert_eq!(
918            results,
919            vec![
920                int(1),
921                int(1),
922                int(2),
923                int(2),
924                int(3),
925                int(3),
926                int(4),
927                int(4)
928            ]
929        );
930    }
931
932    #[test]
933    fn test_ntile_uneven() {
934        // ntile(3) over 10 rows: groups of 4,3,3
935        let rows: Vec<Vec<SqliteValue>> = (0..10).map(|_| vec![int(3)]).collect();
936        let results = run_window_two_pass(&NtileFunc, &rows);
937        assert_eq!(
938            results,
939            vec![
940                int(1),
941                int(1),
942                int(1),
943                int(1),
944                int(2),
945                int(2),
946                int(2),
947                int(3),
948                int(3),
949                int(3)
950            ]
951        );
952    }
953
954    #[test]
955    fn test_ntile_more_buckets_than_rows() {
956        // ntile(10) over 3 rows: [1, 2, 3]
957        let rows: Vec<Vec<SqliteValue>> = (0..3).map(|_| vec![int(10)]).collect();
958        let results = run_window_two_pass(&NtileFunc, &rows);
959        assert_eq!(results, vec![int(1), int(2), int(3)]);
960    }
961
962    // ── lag ──────────────────────────────────────────────────────────
963
964    #[test]
965    fn test_lag_default() {
966        // lag(X) with default offset=1: previous row's value, NULL for first.
967        let results =
968            run_window_partition(&LagFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
969        assert_eq!(results, vec![null(), int(10), int(20)]);
970    }
971
972    #[test]
973    fn test_lag_offset_3() {
974        // lag(X, 3): 3 rows back.
975        let results = run_window_partition(
976            &LagFunc,
977            &[
978                vec![int(10), int(3)],
979                vec![int(20), int(3)],
980                vec![int(30), int(3)],
981                vec![int(40), int(3)],
982                vec![int(50), int(3)],
983            ],
984        );
985        assert_eq!(results, vec![null(), null(), null(), int(10), int(20)]);
986    }
987
988    #[test]
989    fn test_lag_default_value() {
990        // lag(X, 1, -1): returns -1 when no previous row.
991        let results = run_window_partition(
992            &LagFunc,
993            &[
994                vec![int(10), int(1), int(-1)],
995                vec![int(20), int(1), int(-1)],
996            ],
997        );
998        assert_eq!(results, vec![int(-1), int(10)]);
999    }
1000
1001    // ── lead ─────────────────────────────────────────────────────────
1002
1003    #[test]
1004    fn test_lead_default() {
1005        // lead(X): next row's value, NULL for last.
1006        // For lead, we need to step all rows first (to build the buffer),
1007        // then call inverse + value for each row.
1008        let func = LeadFunc;
1009        let mut state = func.initial_state();
1010        let rows = [int(10), int(20), int(30)];
1011
1012        // Step all rows to build the buffer.
1013        for row in &rows {
1014            func.step(&mut state, std::slice::from_ref(row)).unwrap();
1015        }
1016
1017        // Now iterate: first row is at current_row=0.
1018        let mut results = Vec::new();
1019        for _ in &rows {
1020            results.push(func.value(&state).unwrap());
1021            func.inverse(&mut state, &[]).unwrap();
1022        }
1023        assert_eq!(results, vec![int(20), int(30), null()]);
1024    }
1025
1026    #[test]
1027    fn test_lead_offset_2() {
1028        let func = LeadFunc;
1029        let mut state = func.initial_state();
1030        let rows = [int(10), int(20), int(30), int(40), int(50)];
1031
1032        for row in &rows {
1033            func.step(&mut state, &[row.clone(), int(2)]).unwrap();
1034        }
1035
1036        let mut results = Vec::new();
1037        for _ in &rows {
1038            results.push(func.value(&state).unwrap());
1039            func.inverse(&mut state, &[]).unwrap();
1040        }
1041        assert_eq!(results, vec![int(30), int(40), int(50), null(), null()]);
1042    }
1043
1044    #[test]
1045    fn test_lead_default_value() {
1046        let func = LeadFunc;
1047        let mut state = func.initial_state();
1048        let rows = [int(10), int(20)];
1049
1050        for row in &rows {
1051            func.step(&mut state, &[row.clone(), int(1), text("N/A")])
1052                .unwrap();
1053        }
1054
1055        let mut results = Vec::new();
1056        for _ in &rows {
1057            results.push(func.value(&state).unwrap());
1058            func.inverse(&mut state, &[]).unwrap();
1059        }
1060        assert_eq!(results, vec![int(20), text("N/A")]);
1061    }
1062
1063    // ── first_value ──────────────────────────────────────────────────
1064
1065    #[test]
1066    fn test_first_value_basic() {
1067        let results = run_window_partition(
1068            &FirstValueFunc,
1069            &[vec![int(10)], vec![int(20)], vec![int(30)]],
1070        );
1071        // With default frame (UNBOUNDED PRECEDING to CURRENT ROW),
1072        // first_value is always the first row's value.
1073        assert_eq!(results, vec![int(10), int(10), int(10)]);
1074    }
1075
1076    // ── last_value ───────────────────────────────────────────────────
1077
1078    #[test]
1079    fn test_last_value_default_frame() {
1080        // With default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
1081        // last_value returns the current row's value.
1082        let results = run_window_partition(
1083            &LastValueFunc,
1084            &[vec![int(10)], vec![int(20)], vec![int(30)]],
1085        );
1086        assert_eq!(results, vec![int(10), int(20), int(30)]);
1087    }
1088
1089    #[test]
1090    fn test_last_value_unbounded_following() {
1091        // With UNBOUNDED FOLLOWING frame, step all rows first,
1092        // then value() returns the true last value.
1093        let func = LastValueFunc;
1094        let mut state = func.initial_state();
1095        func.step(&mut state, &[int(10)]).unwrap();
1096        func.step(&mut state, &[int(20)]).unwrap();
1097        func.step(&mut state, &[int(30)]).unwrap();
1098        assert_eq!(func.value(&state).unwrap(), int(30));
1099    }
1100
1101    // ── nth_value ────────────────────────────────────────────────────
1102
1103    #[test]
1104    fn test_nth_value_basic() {
1105        let func = NthValueFunc;
1106        let mut state = func.initial_state();
1107        // Step 5 rows; N=3.
1108        func.step(&mut state, &[int(10), int(3)]).unwrap();
1109        func.step(&mut state, &[int(20), int(3)]).unwrap();
1110        func.step(&mut state, &[int(30), int(3)]).unwrap();
1111        func.step(&mut state, &[int(40), int(3)]).unwrap();
1112        func.step(&mut state, &[int(50), int(3)]).unwrap();
1113        assert_eq!(func.value(&state).unwrap(), int(30));
1114    }
1115
1116    #[test]
1117    fn test_nth_value_out_of_range() {
1118        let func = NthValueFunc;
1119        let mut state = func.initial_state();
1120        func.step(&mut state, &[int(10), int(100)]).unwrap();
1121        func.step(&mut state, &[int(20), int(100)]).unwrap();
1122        // Frame has 2 rows but N=100.
1123        assert_eq!(func.value(&state).unwrap(), null());
1124    }
1125
1126    #[test]
1127    fn test_nth_value_n_zero() {
1128        // nth_value(X, 0) returns NULL (0 is invalid, 1-based).
1129        let func = NthValueFunc;
1130        let mut state = func.initial_state();
1131        func.step(&mut state, &[int(10), int(0)]).unwrap();
1132        assert_eq!(func.value(&state).unwrap(), null());
1133    }
1134
1135    // ── Registration ─────────────────────────────────────────────────
1136
1137    #[test]
1138    fn test_register_window_builtins_all_present() {
1139        let mut reg = FunctionRegistry::new();
1140        register_window_builtins(&mut reg);
1141
1142        let expected_variadic = [
1143            "row_number",
1144            "rank",
1145            "dense_rank",
1146            "percent_rank",
1147            "cume_dist",
1148            "lag",
1149            "lead",
1150        ];
1151        for name in expected_variadic {
1152            assert!(
1153                reg.find_window(name, 0).is_some()
1154                    || reg.find_window(name, 1).is_some()
1155                    || reg.find_window(name, -1).is_some(),
1156                "window function '{name}' not registered"
1157            );
1158        }
1159
1160        assert!(
1161            reg.find_window("ntile", 1).is_some(),
1162            "ntile(1) not registered"
1163        );
1164        assert!(
1165            reg.find_window("first_value", 1).is_some(),
1166            "first_value(1) not registered"
1167        );
1168        assert!(
1169            reg.find_window("last_value", 1).is_some(),
1170            "last_value(1) not registered"
1171        );
1172        assert!(
1173            reg.find_window("nth_value", 2).is_some(),
1174            "nth_value(2) not registered"
1175        );
1176    }
1177
1178    // ── E2E: full lifecycle through registry ─────────────────────────
1179
1180    #[test]
1181    fn test_e2e_window_row_number_through_registry() {
1182        let mut reg = FunctionRegistry::new();
1183        register_window_builtins(&mut reg);
1184
1185        let rn = reg.find_window("row_number", 0).unwrap();
1186        let mut state = rn.initial_state();
1187        rn.step(&mut state, &[]).unwrap();
1188        assert_eq!(rn.value(&state).unwrap(), int(1));
1189        rn.step(&mut state, &[]).unwrap();
1190        assert_eq!(rn.value(&state).unwrap(), int(2));
1191        rn.step(&mut state, &[]).unwrap();
1192        assert_eq!(rn.value(&state).unwrap(), int(3));
1193    }
1194
1195    #[test]
1196    fn test_e2e_window_rank_through_registry() {
1197        let mut reg = FunctionRegistry::new();
1198        register_window_builtins(&mut reg);
1199
1200        let rank = reg.find_window("rank", 1).unwrap();
1201        let mut state = rank.initial_state();
1202        // [1, 2, 2, 3] -> [1, 2, 2, 4]
1203        rank.step(&mut state, &[int(1)]).unwrap();
1204        assert_eq!(rank.value(&state).unwrap(), int(1));
1205        rank.step(&mut state, &[int(2)]).unwrap();
1206        assert_eq!(rank.value(&state).unwrap(), int(2));
1207        rank.step(&mut state, &[int(2)]).unwrap();
1208        assert_eq!(rank.value(&state).unwrap(), int(2));
1209        rank.step(&mut state, &[int(3)]).unwrap();
1210        assert_eq!(rank.value(&state).unwrap(), int(4));
1211    }
1212}