Skip to main content

fsqlite_func/
window_builtins.rs

1//! Built-in window functions (S13.5, bd-14i6).
2//!
3//! Implements: row_number, rank, dense_rank, percent_rank, cume_dist,
4//! ntile, lag, lead, first_value, last_value, nth_value, plus
5//! aggregate-as-window variants including group_concat and string_agg.
6//!
7//! These functions implement the [`WindowFunction`] trait.  The VDBE is
8//! responsible for partitioning, ordering, and frame management; these
9//! implementations provide the per-row computation logic.
10//!
11//! # Design Notes
12//!
13//! Pure-numbering functions (row_number, rank, dense_rank) track position
14//! via step() and expose the current value through value().  The ORDER BY
15//! column is passed as args\[0\] so the function can detect peer-group
16//! boundaries.
17//!
18//! Buffer-based functions (lag, lead, first_value, last_value, nth_value)
19//! maintain an internal VecDeque of values and expose frame-relative
20//! access through value().
21#![allow(
22    clippy::unnecessary_literal_bound,
23    clippy::cast_possible_truncation,
24    clippy::cast_possible_wrap,
25    clippy::cast_precision_loss,
26    clippy::cast_sign_loss,
27    clippy::items_after_statements,
28    clippy::float_cmp,
29    clippy::match_same_arms,
30    clippy::similar_names
31)]
32
33use std::collections::VecDeque;
34
35use fsqlite_error::{FrankenError, Result};
36use fsqlite_types::SqliteValue;
37
38use crate::{FunctionRegistry, WindowFunction};
39
40// ═══════════════════════════════════════════════════════════════════════════
41// row_number()
42// ═══════════════════════════════════════════════════════════════════════════
43
44pub struct RowNumberState {
45    counter: i64,
46}
47
48pub struct RowNumberFunc;
49
50impl WindowFunction for RowNumberFunc {
51    type State = RowNumberState;
52
53    fn initial_state(&self) -> Self::State {
54        RowNumberState { counter: 0 }
55    }
56
57    fn step(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
58        state.counter += 1;
59        Ok(())
60    }
61
62    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
63        state.counter -= 1;
64        Ok(())
65    }
66
67    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
68        Ok(SqliteValue::Integer(state.counter))
69    }
70
71    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
72        Ok(SqliteValue::Integer(state.counter))
73    }
74
75    fn num_args(&self) -> i32 {
76        0
77    }
78
79    fn name(&self) -> &str {
80        "row_number"
81    }
82}
83
84// ═══════════════════════════════════════════════════════════════════════════
85// rank()
86// ═══════════════════════════════════════════════════════════════════════════
87
88pub struct RankState {
89    row_number: i64,
90    rank: i64,
91    last_order_value: Option<SqliteValue>,
92}
93
94pub struct RankFunc;
95
96impl WindowFunction for RankFunc {
97    type State = RankState;
98
99    fn initial_state(&self) -> Self::State {
100        RankState {
101            row_number: 0,
102            rank: 0,
103            last_order_value: None,
104        }
105    }
106
107    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
108        state.row_number += 1;
109        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
110        let is_new_peer = match &state.last_order_value {
111            None => true,
112            Some(last) => &current != last,
113        };
114        if is_new_peer {
115            state.rank = state.row_number;
116            state.last_order_value = Some(current);
117        }
118        Ok(())
119    }
120
121    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
122        // rank() uses UNBOUNDED PRECEDING to CURRENT ROW; inverse is a no-op.
123        Ok(())
124    }
125
126    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
127        Ok(SqliteValue::Integer(state.rank))
128    }
129
130    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
131        Ok(SqliteValue::Integer(state.rank))
132    }
133
134    fn num_args(&self) -> i32 {
135        -1
136    }
137
138    fn min_args(&self) -> i32 {
139        0
140    }
141
142    fn max_args(&self) -> Option<i32> {
143        Some(0)
144    }
145
146    fn name(&self) -> &str {
147        "rank"
148    }
149}
150
151// ═══════════════════════════════════════════════════════════════════════════
152// dense_rank()
153// ═══════════════════════════════════════════════════════════════════════════
154
155pub struct DenseRankState {
156    dense_rank: i64,
157    last_order_value: Option<SqliteValue>,
158}
159
160pub struct DenseRankFunc;
161
162impl WindowFunction for DenseRankFunc {
163    type State = DenseRankState;
164
165    fn initial_state(&self) -> Self::State {
166        DenseRankState {
167            dense_rank: 0,
168            last_order_value: None,
169        }
170    }
171
172    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
173        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
174        let is_new_peer = match &state.last_order_value {
175            None => true,
176            Some(last) => &current != last,
177        };
178        if is_new_peer {
179            state.dense_rank += 1;
180            state.last_order_value = Some(current);
181        }
182        Ok(())
183    }
184
185    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
186        Ok(())
187    }
188
189    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
190        Ok(SqliteValue::Integer(state.dense_rank))
191    }
192
193    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
194        Ok(SqliteValue::Integer(state.dense_rank))
195    }
196
197    fn num_args(&self) -> i32 {
198        -1
199    }
200
201    fn min_args(&self) -> i32 {
202        0
203    }
204
205    fn max_args(&self) -> Option<i32> {
206        Some(0)
207    }
208
209    fn name(&self) -> &str {
210        "dense_rank"
211    }
212}
213
214// ═══════════════════════════════════════════════════════════════════════════
215// percent_rank()
216// ═══════════════════════════════════════════════════════════════════════════
217
218/// State for `percent_rank()`.
219///
220/// The VDBE must step() all rows in the partition first (to compute ranks
221/// and partition size), then iterate by calling value() and inverse() for
222/// each output row.
223pub struct PercentRankState {
224    partition_size: i64,
225    ranks: Vec<i64>,
226    cursor: usize,
227    step_row_number: i64,
228    current_rank: i64,
229    last_order_value: Option<SqliteValue>,
230}
231
232pub struct PercentRankFunc;
233
234impl WindowFunction for PercentRankFunc {
235    type State = PercentRankState;
236
237    fn initial_state(&self) -> Self::State {
238        PercentRankState {
239            partition_size: 0,
240            ranks: Vec::new(),
241            cursor: 0,
242            step_row_number: 0,
243            current_rank: 0,
244            last_order_value: None,
245        }
246    }
247
248    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
249        state.step_row_number += 1;
250        state.partition_size += 1;
251        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
252        let is_new_peer = match &state.last_order_value {
253            None => true,
254            Some(last) => &current != last,
255        };
256        if is_new_peer {
257            state.current_rank = state.step_row_number;
258            state.last_order_value = Some(current);
259        }
260        state.ranks.push(state.current_rank);
261        Ok(())
262    }
263
264    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
265        state.cursor += 1;
266        Ok(())
267    }
268
269    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
270        if state.partition_size <= 1 {
271            return Ok(SqliteValue::Float(0.0));
272        }
273        let rank = state.ranks.get(state.cursor).copied().unwrap_or(1);
274        let pr = (rank - 1) as f64 / (state.partition_size - 1) as f64;
275        Ok(SqliteValue::Float(pr))
276    }
277
278    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
279        self.value(&state)
280    }
281
282    fn num_args(&self) -> i32 {
283        -1
284    }
285
286    fn min_args(&self) -> i32 {
287        0
288    }
289
290    fn max_args(&self) -> Option<i32> {
291        Some(0)
292    }
293
294    fn name(&self) -> &str {
295        "percent_rank"
296    }
297}
298
299// ═══════════════════════════════════════════════════════════════════════════
300// cume_dist()
301// ═══════════════════════════════════════════════════════════════════════════
302
303/// State for `cume_dist()`.
304///
305/// The VDBE must step() all rows first, then iterate with value()+inverse().
306/// cume_dist = (last peer position) / partition_size.
307pub struct CumeDistState {
308    partition_size: i64,
309    current_row: usize,
310    cume_positions: Vec<i64>,
311    peer_start: usize,
312    last_order_value: Option<SqliteValue>,
313}
314
315pub struct CumeDistFunc;
316
317impl WindowFunction for CumeDistFunc {
318    type State = CumeDistState;
319
320    fn initial_state(&self) -> Self::State {
321        CumeDistState {
322            partition_size: 0,
323            current_row: 0,
324            cume_positions: Vec::new(),
325            peer_start: 0,
326            last_order_value: None,
327        }
328    }
329
330    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
331        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
332        let is_new_peer = match &state.last_order_value {
333            None => true,
334            Some(last) => &current != last,
335        };
336        if is_new_peer {
337            let peer_end = state.partition_size;
338            if let Some(slots) = state.cume_positions.get_mut(state.peer_start..) {
339                for slot in slots {
340                    *slot = peer_end;
341                }
342            }
343            state.peer_start = state.cume_positions.len();
344            state.last_order_value = Some(current);
345        }
346        state.partition_size += 1;
347        state.cume_positions.push(0);
348        Ok(())
349    }
350
351    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
352        state.current_row += 1;
353        Ok(())
354    }
355
356    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
357        if state.partition_size == 0 {
358            return Ok(SqliteValue::Float(0.0));
359        }
360        let peer_end = state
361            .cume_positions
362            .get(state.current_row)
363            .copied()
364            .filter(|position| *position != 0)
365            .unwrap_or(state.partition_size);
366        let cd = peer_end as f64 / state.partition_size as f64;
367        Ok(SqliteValue::Float(cd))
368    }
369
370    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
371        self.value(&state)
372    }
373
374    fn num_args(&self) -> i32 {
375        -1
376    }
377
378    fn min_args(&self) -> i32 {
379        0
380    }
381
382    fn max_args(&self) -> Option<i32> {
383        Some(0)
384    }
385
386    fn name(&self) -> &str {
387        "cume_dist"
388    }
389}
390
391// ═══════════════════════════════════════════════════════════════════════════
392// ntile(N)
393// ═══════════════════════════════════════════════════════════════════════════
394
395/// State for `ntile(N)`.
396///
397/// The VDBE must step() all rows first, then iterate with value()+inverse().
398/// Distributes rows into N groups.  If partition_size % N != 0, the first
399/// (partition_size % N) groups get one extra row.
400pub struct NtileState {
401    partition_size: i64,
402    n: i64,
403    current_row: i64,
404}
405
406pub struct NtileFunc;
407
408const INVALID_NTILE_ARGUMENT: &str = "argument of ntile must be a positive integer";
409
410impl WindowFunction for NtileFunc {
411    type State = NtileState;
412
413    fn initial_state(&self) -> Self::State {
414        NtileState {
415            partition_size: 0,
416            n: 1,
417            current_row: 0,
418        }
419    }
420
421    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
422        if state.partition_size == 0 {
423            let n = args.first().map_or(0, SqliteValue::to_integer);
424            if n <= 0 {
425                return Err(FrankenError::function_error(INVALID_NTILE_ARGUMENT));
426            }
427            state.n = n;
428        }
429        state.partition_size += 1;
430        Ok(())
431    }
432
433    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
434        state.current_row += 1;
435        Ok(())
436    }
437
438    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
439        if state.partition_size == 0 {
440            return Ok(SqliteValue::Integer(1));
441        }
442        let n = state.n;
443        let sz = state.partition_size;
444        let row = state.current_row + 1; // 1-based
445
446        // Group size: first (sz % n) groups get (sz / n + 1) rows,
447        // remaining groups get (sz / n) rows.
448        let base = sz / n;
449        let extra = sz % n;
450        // Rows in "large" groups: extra * (base + 1).
451        let large_rows = extra * (base + 1);
452
453        let bucket = if row <= large_rows {
454            // In one of the first `extra` groups (each of size base+1).
455            (row - 1) / (base + 1) + 1
456        } else {
457            // In one of the remaining groups (each of size base).
458            let adjusted = row - large_rows;
459            if base == 0 {
460                // More buckets than rows; each remaining row gets its own bucket.
461                extra + adjusted
462            } else {
463                extra + (adjusted - 1) / base + 1
464            }
465        };
466        Ok(SqliteValue::Integer(bucket))
467    }
468
469    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
470        self.value(&state)
471    }
472
473    fn num_args(&self) -> i32 {
474        1
475    }
476
477    fn name(&self) -> &str {
478        "ntile"
479    }
480}
481
482// ═══════════════════════════════════════════════════════════════════════════
483// lag(X [, offset [, default]])
484// ═══════════════════════════════════════════════════════════════════════════
485
486fn numeric_prefix_len(bytes: &[u8]) -> usize {
487    let mut idx = 0;
488    if bytes
489        .get(idx)
490        .is_some_and(|byte| matches!(*byte, b'+' | b'-'))
491    {
492        idx += 1;
493    }
494
495    let mut saw_digit = false;
496    while bytes.get(idx).is_some_and(u8::is_ascii_digit) {
497        idx += 1;
498        saw_digit = true;
499    }
500
501    if bytes.get(idx) == Some(&b'.') {
502        idx += 1;
503        while bytes.get(idx).is_some_and(u8::is_ascii_digit) {
504            idx += 1;
505            saw_digit = true;
506        }
507    }
508
509    if !saw_digit {
510        return 0;
511    }
512
513    let mantissa_end = idx;
514    if bytes
515        .get(idx)
516        .is_some_and(|byte| matches!(*byte, b'e' | b'E'))
517    {
518        idx += 1;
519        if bytes
520            .get(idx)
521            .is_some_and(|byte| matches!(*byte, b'+' | b'-'))
522        {
523            idx += 1;
524        }
525        let exp_start = idx;
526        while bytes.get(idx).is_some_and(u8::is_ascii_digit) {
527            idx += 1;
528        }
529        if idx == exp_start {
530            return mantissa_end;
531        }
532    }
533
534    idx
535}
536
537fn trim_ascii_start(bytes: &[u8]) -> &[u8] {
538    let start = bytes
539        .iter()
540        .position(|byte| !byte.is_ascii_whitespace())
541        .unwrap_or(bytes.len());
542    bytes.get(start..).unwrap_or(&[])
543}
544
545fn lag_lead_bytes_offset(bytes: &[u8]) -> Option<i64> {
546    let trimmed = trim_ascii_start(bytes);
547    let prefix_len = numeric_prefix_len(trimmed);
548    if prefix_len == 0 {
549        return Some(0);
550    }
551    let prefix = trimmed
552        .get(..prefix_len)
553        .and_then(|bytes| std::str::from_utf8(bytes).ok())?;
554    if prefix
555        .as_bytes()
556        .iter()
557        .any(|byte| matches!(*byte, b'.' | b'e' | b'E'))
558    {
559        prefix.parse().ok().and_then(integral_f64_to_i64)
560    } else {
561        prefix
562            .parse()
563            .ok()
564            .or_else(|| prefix.parse().ok().and_then(integral_f64_to_i64))
565    }
566}
567
568fn lag_lead_text_offset(text: &str) -> Option<i64> {
569    lag_lead_bytes_offset(text.as_bytes())
570}
571
572fn lag_lead_offset_arg(value: Option<&SqliteValue>) -> Option<i64> {
573    match value {
574        None => Some(1),
575        Some(SqliteValue::Null) => None,
576        Some(SqliteValue::Integer(offset)) => Some(*offset),
577        Some(SqliteValue::Float(offset)) => integral_f64_to_i64(*offset),
578        Some(SqliteValue::Text(text)) => lag_lead_text_offset(text),
579        Some(SqliteValue::Blob(bytes)) => lag_lead_bytes_offset(bytes),
580    }
581}
582
583/// State for `lag()`: maintains a buffer of previous values.
584pub struct LagState {
585    buffer: Vec<SqliteValue>,
586    offsets: Vec<Option<i64>>,
587    defaults: Vec<SqliteValue>,
588    current_row: i64,
589}
590
591pub struct LagFunc;
592
593impl WindowFunction for LagFunc {
594    type State = LagState;
595
596    fn initial_state(&self) -> Self::State {
597        LagState {
598            buffer: Vec::new(),
599            offsets: Vec::new(),
600            defaults: Vec::new(),
601            current_row: 0,
602        }
603    }
604
605    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
606        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
607        let offset = lag_lead_offset_arg(args.get(1));
608        let default_val = args.get(2).cloned().unwrap_or(SqliteValue::Null);
609        state.buffer.push(val);
610        state.offsets.push(offset);
611        state.defaults.push(default_val);
612        Ok(())
613    }
614
615    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
616        state.current_row += 1;
617        Ok(())
618    }
619
620    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
621        let current_index = usize::try_from(state.current_row).unwrap_or(usize::MAX);
622        let default_val = state
623            .defaults
624            .get(current_index)
625            .cloned()
626            .unwrap_or(SqliteValue::Null);
627        let Some(offset) = state.offsets.get(current_index).copied().flatten() else {
628            return Ok(default_val);
629        };
630        let target = state.current_row - offset;
631        let Ok(target_index) = usize::try_from(target) else {
632            return Ok(default_val);
633        };
634        Ok(state
635            .buffer
636            .get(target_index)
637            .cloned()
638            .unwrap_or(default_val))
639    }
640
641    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
642        self.value(&state)
643    }
644
645    fn num_args(&self) -> i32 {
646        -1 // 1, 2, or 3 args
647    }
648
649    fn min_args(&self) -> i32 {
650        1
651    }
652
653    fn max_args(&self) -> Option<i32> {
654        Some(3)
655    }
656
657    fn name(&self) -> &str {
658        "lag"
659    }
660}
661
662// ═══════════════════════════════════════════════════════════════════════════
663// lead(X [, offset [, default]])
664// ═══════════════════════════════════════════════════════════════════════════
665
666/// State for `lead()`: maintains a buffer and reads ahead.
667pub struct LeadState {
668    buffer: Vec<SqliteValue>,
669    offsets: Vec<Option<i64>>,
670    defaults: Vec<SqliteValue>,
671    current_row: i64,
672}
673
674pub struct LeadFunc;
675
676impl WindowFunction for LeadFunc {
677    type State = LeadState;
678
679    fn initial_state(&self) -> Self::State {
680        LeadState {
681            buffer: Vec::new(),
682            offsets: Vec::new(),
683            defaults: Vec::new(),
684            current_row: 0,
685        }
686    }
687
688    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
689        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
690        let offset = lag_lead_offset_arg(args.get(1));
691        let default_val = args.get(2).cloned().unwrap_or(SqliteValue::Null);
692        state.buffer.push(val);
693        state.offsets.push(offset);
694        state.defaults.push(default_val);
695        Ok(())
696    }
697
698    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
699        state.current_row += 1;
700        Ok(())
701    }
702
703    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
704        let current_index = usize::try_from(state.current_row).unwrap_or(usize::MAX);
705        let default_val = state
706            .defaults
707            .get(current_index)
708            .cloned()
709            .unwrap_or(SqliteValue::Null);
710        let Some(offset) = state.offsets.get(current_index).copied().flatten() else {
711            return Ok(default_val);
712        };
713        let target = state.current_row + offset;
714        if target < 0 || target >= state.buffer.len() as i64 {
715            return Ok(default_val);
716        }
717        Ok(state.buffer[target as usize].clone())
718    }
719
720    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
721        self.value(&state)
722    }
723
724    fn num_args(&self) -> i32 {
725        -1
726    }
727
728    fn min_args(&self) -> i32 {
729        1
730    }
731
732    fn max_args(&self) -> Option<i32> {
733        Some(3)
734    }
735
736    fn name(&self) -> &str {
737        "lead"
738    }
739}
740
741// ═══════════════════════════════════════════════════════════════════════════
742// first_value(X)
743// ═══════════════════════════════════════════════════════════════════════════
744
745pub struct FirstValueState {
746    first: Option<SqliteValue>,
747}
748
749pub struct FirstValueFunc;
750
751impl WindowFunction for FirstValueFunc {
752    type State = FirstValueState;
753
754    fn initial_state(&self) -> Self::State {
755        FirstValueState { first: None }
756    }
757
758    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
759        if state.first.is_none() {
760            state.first = Some(args.first().cloned().unwrap_or(SqliteValue::Null));
761        }
762        Ok(())
763    }
764
765    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
766        // When the first row exits the frame, we need to clear so the next
767        // step() captures the new first value.  For simplicity, we use a
768        // VecDeque-based approach in FirstValueFrameFunc below.  This basic
769        // version handles the common UNBOUNDED PRECEDING case correctly.
770        state.first = None;
771        Ok(())
772    }
773
774    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
775        Ok(state.first.clone().unwrap_or(SqliteValue::Null))
776    }
777
778    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
779        Ok(state.first.unwrap_or(SqliteValue::Null))
780    }
781
782    fn num_args(&self) -> i32 {
783        1
784    }
785
786    fn name(&self) -> &str {
787        "first_value"
788    }
789}
790
791// ═══════════════════════════════════════════════════════════════════════════
792// last_value(X)
793// ═══════════════════════════════════════════════════════════════════════════
794
795pub struct LastValueState {
796    frame: VecDeque<SqliteValue>,
797}
798
799pub struct LastValueFunc;
800
801impl WindowFunction for LastValueFunc {
802    type State = LastValueState;
803
804    fn initial_state(&self) -> Self::State {
805        LastValueState {
806            frame: VecDeque::new(),
807        }
808    }
809
810    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
811        state
812            .frame
813            .push_back(args.first().cloned().unwrap_or(SqliteValue::Null));
814        Ok(())
815    }
816
817    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
818        state.frame.pop_front();
819        Ok(())
820    }
821
822    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
823        Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
824    }
825
826    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
827        Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
828    }
829
830    fn num_args(&self) -> i32 {
831        1
832    }
833
834    fn name(&self) -> &str {
835        "last_value"
836    }
837}
838
839// ═══════════════════════════════════════════════════════════════════════════
840// nth_value(X, N)
841// ═══════════════════════════════════════════════════════════════════════════
842
843pub struct NthValueState {
844    frame: VecDeque<SqliteValue>,
845    n: i64,
846}
847
848pub struct NthValueFunc;
849
850const INVALID_NTH_VALUE_ARGUMENT: &str = "second argument to nth_value must be a positive integer";
851
852fn integral_f64_to_i64(value: f64) -> Option<i64> {
853    const I64_MIN_AS_F64: f64 = -9_223_372_036_854_775_808.0;
854    const I64_MAX_EXCLUSIVE_AS_F64: f64 = 9_223_372_036_854_775_808.0;
855
856    if !value.is_finite()
857        || value.fract() != 0.0
858        || !(I64_MIN_AS_F64..I64_MAX_EXCLUSIVE_AS_F64).contains(&value)
859    {
860        return None;
861    }
862    Some(value as i64)
863}
864
865fn parse_integral_text(text: &str) -> Option<i64> {
866    let trimmed = text.trim();
867    if trimmed.is_empty() {
868        return None;
869    }
870    trimmed
871        .parse()
872        .ok()
873        .or_else(|| trimmed.parse().ok().and_then(integral_f64_to_i64))
874}
875
876fn nth_value_positive_integer_arg(value: Option<&SqliteValue>) -> Result<i64> {
877    let Some(value) = value else {
878        return Err(FrankenError::function_error(INVALID_NTH_VALUE_ARGUMENT));
879    };
880    let n = match value {
881        SqliteValue::Integer(n) => Some(*n),
882        SqliteValue::Float(n) => integral_f64_to_i64(*n),
883        SqliteValue::Text(text) => parse_integral_text(text),
884        SqliteValue::Null | SqliteValue::Blob(_) => None,
885    };
886    match n {
887        Some(n) if n > 0 => Ok(n),
888        _ => Err(FrankenError::function_error(INVALID_NTH_VALUE_ARGUMENT)),
889    }
890}
891
892impl WindowFunction for NthValueFunc {
893    type State = NthValueState;
894
895    fn initial_state(&self) -> Self::State {
896        NthValueState {
897            frame: VecDeque::new(),
898            n: 1,
899        }
900    }
901
902    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
903        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
904        let n = nth_value_positive_integer_arg(args.get(1))?;
905        // Capture N from second arg on first call.
906        if state.frame.is_empty() {
907            state.n = n;
908        }
909        state.frame.push_back(val);
910        Ok(())
911    }
912
913    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
914        state.frame.pop_front();
915        Ok(())
916    }
917
918    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
919        let idx = (state.n - 1) as usize;
920        Ok(state.frame.get(idx).cloned().unwrap_or(SqliteValue::Null))
921    }
922
923    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
924        self.value(&state)
925    }
926
927    fn num_args(&self) -> i32 {
928        2
929    }
930
931    fn name(&self) -> &str {
932        "nth_value"
933    }
934}
935
936// ═══════════════════════════════════════════════════════════════════════════
937// Aggregate-as-window functions: SUM, AVG, COUNT, MIN, MAX, TOTAL
938// SQLite allows any aggregate function to be used as a window function.
939// ═══════════════════════════════════════════════════════════════════════════
940
941pub struct WindowSumState {
942    sum: f64,
943    err: f64,
944    has_value: bool,
945    is_int: bool,
946    int_sum: i64,
947    overflowed: bool,
948}
949
950/// Kahan-Babuska-Neumaier compensated summation step (matches C SQLite func.c:1871).
951#[inline]
952fn kbn_step(sum: &mut f64, err: &mut f64, value: f64) {
953    let s = *sum;
954    let t = s + value;
955    if s.abs() > value.abs() {
956        *err += (s - t) + value;
957    } else {
958        *err += (value - t) + s;
959    }
960    *sum = t;
961}
962
963pub struct WindowSumFunc;
964
965impl WindowFunction for WindowSumFunc {
966    type State = WindowSumState;
967
968    fn initial_state(&self) -> Self::State {
969        WindowSumState {
970            sum: 0.0,
971            err: 0.0,
972            has_value: false,
973            is_int: true,
974            int_sum: 0,
975            overflowed: false,
976        }
977    }
978
979    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
980        if args.is_empty() || args[0].is_null() {
981            return Ok(());
982        }
983        let value = args[0].to_sum_numeric_value();
984        if value.is_null() {
985            return Ok(());
986        }
987        state.has_value = true;
988        match value {
989            SqliteValue::Integer(i) => {
990                if state.is_int && !state.overflowed {
991                    match state.int_sum.checked_add(i) {
992                        Some(s) => state.int_sum = s,
993                        None => state.overflowed = true,
994                    }
995                }
996                kbn_step(&mut state.sum, &mut state.err, i as f64);
997            }
998            SqliteValue::Float(f) => {
999                state.is_int = false;
1000                kbn_step(&mut state.sum, &mut state.err, f);
1001            }
1002            SqliteValue::Null | SqliteValue::Text(_) | SqliteValue::Blob(_) => {}
1003        }
1004        Ok(())
1005    }
1006
1007    fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1008        if args.is_empty() || args[0].is_null() {
1009            return Ok(());
1010        }
1011        let value = args[0].to_sum_numeric_value();
1012        match value {
1013            SqliteValue::Integer(i) => {
1014                if state.is_int && !state.overflowed {
1015                    match state.int_sum.checked_sub(i) {
1016                        Some(s) => state.int_sum = s,
1017                        None => state.overflowed = true,
1018                    }
1019                }
1020                kbn_step(&mut state.sum, &mut state.err, -(i as f64));
1021            }
1022            SqliteValue::Float(f) => {
1023                state.is_int = false;
1024                kbn_step(&mut state.sum, &mut state.err, -f);
1025            }
1026            SqliteValue::Null | SqliteValue::Text(_) | SqliteValue::Blob(_) => {}
1027        }
1028        Ok(())
1029    }
1030
1031    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1032        if !state.has_value {
1033            return Ok(SqliteValue::Null);
1034        }
1035        if state.is_int && state.overflowed {
1036            return Err(FrankenError::IntegerOverflow);
1037        }
1038        if state.is_int {
1039            Ok(SqliteValue::Integer(state.int_sum))
1040        } else {
1041            Ok(SqliteValue::Float(state.sum + state.err))
1042        }
1043    }
1044
1045    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1046        self.value(&state)
1047    }
1048
1049    fn num_args(&self) -> i32 {
1050        1
1051    }
1052
1053    fn name(&self) -> &str {
1054        "SUM"
1055    }
1056}
1057
1058pub struct WindowTotalFunc;
1059
1060impl WindowFunction for WindowTotalFunc {
1061    type State = f64;
1062
1063    fn initial_state(&self) -> Self::State {
1064        0.0
1065    }
1066
1067    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1068        if !args.is_empty() && !args[0].is_null() {
1069            *state += args[0].to_float();
1070        }
1071        Ok(())
1072    }
1073
1074    fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1075        if !args.is_empty() && !args[0].is_null() {
1076            *state -= args[0].to_float();
1077        }
1078        Ok(())
1079    }
1080
1081    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1082        Ok(SqliteValue::Float(*state))
1083    }
1084
1085    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1086        Ok(SqliteValue::Float(state))
1087    }
1088
1089    fn num_args(&self) -> i32 {
1090        1
1091    }
1092
1093    fn name(&self) -> &str {
1094        "TOTAL"
1095    }
1096}
1097
1098pub struct WindowCountState {
1099    count: i64,
1100}
1101
1102pub struct WindowCountFunc;
1103
1104impl WindowFunction for WindowCountFunc {
1105    type State = WindowCountState;
1106
1107    fn initial_state(&self) -> Self::State {
1108        WindowCountState { count: 0 }
1109    }
1110
1111    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1112        // COUNT(*) has 0 args → count all rows; COUNT(x) skips NULLs.
1113        if args.is_empty() || !args[0].is_null() {
1114            state.count += 1;
1115        }
1116        Ok(())
1117    }
1118
1119    fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1120        if args.is_empty() || !args[0].is_null() {
1121            state.count -= 1;
1122        }
1123        Ok(())
1124    }
1125
1126    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1127        Ok(SqliteValue::Integer(state.count))
1128    }
1129
1130    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1131        Ok(SqliteValue::Integer(state.count))
1132    }
1133
1134    fn num_args(&self) -> i32 {
1135        -1 // variadic: COUNT(*) = 0 args, COUNT(x) = 1 arg
1136    }
1137
1138    fn min_args(&self) -> i32 {
1139        0
1140    }
1141
1142    fn max_args(&self) -> Option<i32> {
1143        Some(1)
1144    }
1145
1146    fn name(&self) -> &str {
1147        "COUNT"
1148    }
1149}
1150
1151pub struct WindowMinState {
1152    min: Option<SqliteValue>,
1153}
1154
1155pub struct WindowMinFunc;
1156
1157impl WindowFunction for WindowMinFunc {
1158    type State = WindowMinState;
1159
1160    fn initial_state(&self) -> Self::State {
1161        WindowMinState { min: None }
1162    }
1163
1164    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1165        if args.is_empty() || args[0].is_null() {
1166            return Ok(());
1167        }
1168        state.min = Some(match state.min.take() {
1169            None => args[0].clone(),
1170            Some(cur) => {
1171                if cmp_values(&args[0], &cur) == std::cmp::Ordering::Less {
1172                    args[0].clone()
1173                } else {
1174                    cur
1175                }
1176            }
1177        });
1178        Ok(())
1179    }
1180
1181    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1182        // MIN inverse is not efficiently invertible; no-op for unbounded frames.
1183        Ok(())
1184    }
1185
1186    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1187        Ok(state.min.clone().unwrap_or(SqliteValue::Null))
1188    }
1189
1190    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1191        Ok(state.min.unwrap_or(SqliteValue::Null))
1192    }
1193
1194    fn num_args(&self) -> i32 {
1195        1
1196    }
1197
1198    fn name(&self) -> &str {
1199        "MIN"
1200    }
1201}
1202
1203pub struct WindowMaxState {
1204    max: Option<SqliteValue>,
1205}
1206
1207pub struct WindowMaxFunc;
1208
1209impl WindowFunction for WindowMaxFunc {
1210    type State = WindowMaxState;
1211
1212    fn initial_state(&self) -> Self::State {
1213        WindowMaxState { max: None }
1214    }
1215
1216    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1217        if args.is_empty() || args[0].is_null() {
1218            return Ok(());
1219        }
1220        state.max = Some(match state.max.take() {
1221            None => args[0].clone(),
1222            Some(cur) => {
1223                if cmp_values(&args[0], &cur) == std::cmp::Ordering::Greater {
1224                    args[0].clone()
1225                } else {
1226                    cur
1227                }
1228            }
1229        });
1230        Ok(())
1231    }
1232
1233    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1234        Ok(())
1235    }
1236
1237    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1238        Ok(state.max.clone().unwrap_or(SqliteValue::Null))
1239    }
1240
1241    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1242        Ok(state.max.unwrap_or(SqliteValue::Null))
1243    }
1244
1245    fn num_args(&self) -> i32 {
1246        1
1247    }
1248
1249    fn name(&self) -> &str {
1250        "MAX"
1251    }
1252}
1253
1254pub struct WindowAvgState {
1255    sum: f64,
1256    err: f64,
1257    count: i64,
1258}
1259
1260pub struct WindowAvgFunc;
1261
1262impl WindowFunction for WindowAvgFunc {
1263    type State = WindowAvgState;
1264
1265    fn initial_state(&self) -> Self::State {
1266        WindowAvgState {
1267            sum: 0.0,
1268            err: 0.0,
1269            count: 0,
1270        }
1271    }
1272
1273    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1274        if args.is_empty() || args[0].is_null() {
1275            return Ok(());
1276        }
1277        kbn_step(&mut state.sum, &mut state.err, args[0].to_float());
1278        state.count += 1;
1279        Ok(())
1280    }
1281
1282    fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1283        if args.is_empty() || args[0].is_null() {
1284            return Ok(());
1285        }
1286        kbn_step(&mut state.sum, &mut state.err, -args[0].to_float());
1287        state.count -= 1;
1288        Ok(())
1289    }
1290
1291    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1292        if state.count == 0 {
1293            Ok(SqliteValue::Null)
1294        } else {
1295            #[allow(clippy::cast_precision_loss)]
1296            Ok(SqliteValue::Float(
1297                (state.sum + state.err) / state.count as f64,
1298            ))
1299        }
1300    }
1301
1302    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1303        self.value(&state)
1304    }
1305
1306    fn num_args(&self) -> i32 {
1307        1
1308    }
1309
1310    fn name(&self) -> &str {
1311        "AVG"
1312    }
1313}
1314
1315pub struct WindowGroupConcatState {
1316    result: String,
1317    has_value: bool,
1318}
1319
1320fn window_group_concat_step(state: &mut WindowGroupConcatState, args: &[SqliteValue]) {
1321    if args.is_empty() || args[0].is_null() {
1322        return;
1323    }
1324    if state.has_value {
1325        match args.get(1) {
1326            Some(separator) if !separator.is_null() => {
1327                if let Some(text) = separator.as_text_str() {
1328                    state.result.push_str(text);
1329                } else {
1330                    state.result.push_str(&separator.to_text());
1331                }
1332            }
1333            Some(_) => {}
1334            None => state.result.push(','),
1335        }
1336    }
1337    if let Some(text) = args[0].as_text_str() {
1338        state.result.push_str(text);
1339    } else {
1340        state.result.push_str(&args[0].to_text());
1341    }
1342    state.has_value = true;
1343}
1344
1345fn window_group_concat_value(state: &WindowGroupConcatState) -> SqliteValue {
1346    if state.has_value {
1347        SqliteValue::Text(state.result.clone().into())
1348    } else {
1349        SqliteValue::Null
1350    }
1351}
1352
1353pub struct WindowGroupConcatFunc;
1354
1355impl WindowFunction for WindowGroupConcatFunc {
1356    type State = WindowGroupConcatState;
1357
1358    fn initial_state(&self) -> Self::State {
1359        WindowGroupConcatState {
1360            result: String::new(),
1361            has_value: false,
1362        }
1363    }
1364
1365    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1366        window_group_concat_step(state, args);
1367        Ok(())
1368    }
1369
1370    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1371        // Sliding ROWS/RANGE/GROUPS frames are recomputed by the connection-level
1372        // window executor; the remaining paths never evict rows from the left edge.
1373        Ok(())
1374    }
1375
1376    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1377        Ok(window_group_concat_value(state))
1378    }
1379
1380    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1381        Ok(window_group_concat_value(&state))
1382    }
1383
1384    fn num_args(&self) -> i32 {
1385        -1
1386    }
1387
1388    fn min_args(&self) -> i32 {
1389        1
1390    }
1391
1392    fn max_args(&self) -> Option<i32> {
1393        Some(2)
1394    }
1395
1396    fn name(&self) -> &str {
1397        "group_concat"
1398    }
1399}
1400
1401pub struct WindowStringAggFunc;
1402
1403impl WindowFunction for WindowStringAggFunc {
1404    type State = WindowGroupConcatState;
1405
1406    fn initial_state(&self) -> Self::State {
1407        WindowGroupConcatState {
1408            result: String::new(),
1409            has_value: false,
1410        }
1411    }
1412
1413    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1414        window_group_concat_step(state, args);
1415        Ok(())
1416    }
1417
1418    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1419        Ok(())
1420    }
1421
1422    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1423        Ok(window_group_concat_value(state))
1424    }
1425
1426    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1427        Ok(window_group_concat_value(&state))
1428    }
1429
1430    fn num_args(&self) -> i32 {
1431        2
1432    }
1433
1434    fn name(&self) -> &str {
1435        "string_agg"
1436    }
1437}
1438
1439/// Compare two SQLite values using type-aware ordering.
1440pub fn cmp_values(a: &SqliteValue, b: &SqliteValue) -> std::cmp::Ordering {
1441    a.cmp(b)
1442}
1443
1444// ── Registration ──────────────────────────────────────────────────────────
1445
1446/// Register all S13.5 window functions.
1447pub fn register_window_builtins(registry: &mut FunctionRegistry) {
1448    registry.register_window(RowNumberFunc);
1449    registry.register_window(RankFunc);
1450    registry.register_window(DenseRankFunc);
1451    registry.register_window(PercentRankFunc);
1452    registry.register_window(CumeDistFunc);
1453    registry.register_window(NtileFunc);
1454    registry.register_window(LagFunc);
1455    registry.register_window(LeadFunc);
1456    registry.register_window(FirstValueFunc);
1457    registry.register_window(LastValueFunc);
1458    registry.register_window(NthValueFunc);
1459
1460    // Aggregate-as-window functions (SQLite allows any aggregate as a window fn).
1461    registry.register_window(WindowSumFunc);
1462    registry.register_window(WindowTotalFunc);
1463    registry.register_window(WindowCountFunc);
1464    registry.register_window(WindowMinFunc);
1465    registry.register_window(WindowMaxFunc);
1466    registry.register_window(WindowAvgFunc);
1467    registry.register_window(WindowGroupConcatFunc);
1468    registry.register_window(WindowStringAggFunc);
1469}
1470
1471// ── Tests ─────────────────────────────────────────────────────────────────
1472
1473#[cfg(test)]
1474mod tests {
1475    use super::*;
1476
1477    fn int(v: i64) -> SqliteValue {
1478        SqliteValue::Integer(v)
1479    }
1480
1481    fn float(v: f64) -> SqliteValue {
1482        SqliteValue::Float(v)
1483    }
1484
1485    fn text(s: &str) -> SqliteValue {
1486        SqliteValue::Text(s.into())
1487    }
1488
1489    fn blob(bytes: &[u8]) -> SqliteValue {
1490        SqliteValue::Blob(std::sync::Arc::from(bytes))
1491    }
1492
1493    fn null() -> SqliteValue {
1494        SqliteValue::Null
1495    }
1496
1497    fn assert_function_error(err: FrankenError, expected: &str) {
1498        assert!(
1499            matches!(&err, FrankenError::FunctionError(message) if message == expected),
1500            "expected function error {expected:?}, got {err:?}"
1501        );
1502    }
1503
1504    fn assert_float_near(value: &SqliteValue, expected: f64) {
1505        assert!(
1506            matches!(value, SqliteValue::Float(_)),
1507            "expected Float, got {value:?}"
1508        );
1509        if let SqliteValue::Float(actual) = value {
1510            assert!(
1511                (*actual - expected).abs() < 1e-10,
1512                "expected {expected}, got {actual}"
1513            );
1514        }
1515    }
1516
1517    /// Simulate a partition by calling step() for each row, collecting
1518    /// value() after each step.  Returns the vector of per-row results.
1519    /// Suitable for progressive functions (row_number, rank, dense_rank).
1520    fn run_window_partition<F: WindowFunction>(
1521        func: &F,
1522        rows: &[Vec<SqliteValue>],
1523    ) -> Vec<SqliteValue> {
1524        let mut state = func.initial_state();
1525        let mut results = Vec::new();
1526        for row in rows {
1527            func.step(&mut state, row).unwrap();
1528            results.push(func.value(&state).unwrap());
1529        }
1530        results
1531    }
1532
1533    /// Two-pass partition simulation: step all rows first (pass 1), then
1534    /// iterate calling value()+inverse() for each row (pass 2).
1535    /// Required for functions that need full partition size (ntile,
1536    /// percent_rank, cume_dist).
1537    fn run_window_two_pass<F: WindowFunction>(
1538        func: &F,
1539        rows: &[Vec<SqliteValue>],
1540    ) -> Vec<SqliteValue> {
1541        let mut state = func.initial_state();
1542        // Pass 1: step all rows.
1543        for row in rows {
1544            func.step(&mut state, row).unwrap();
1545        }
1546        // Pass 2: read values, advance cursor via inverse().
1547        let mut results = Vec::new();
1548        for (i, _) in rows.iter().enumerate() {
1549            results.push(func.value(&state).unwrap());
1550            if i < rows.len() - 1 {
1551                func.inverse(&mut state, &[]).unwrap();
1552            }
1553        }
1554        results
1555    }
1556
1557    // ── row_number ───────────────────────────────────────────────────
1558
1559    #[test]
1560    fn test_row_number_basic() {
1561        let results =
1562            run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![], vec![], vec![]]);
1563        assert_eq!(results, vec![int(1), int(2), int(3), int(4), int(5)]);
1564    }
1565
1566    #[test]
1567    fn test_row_number_partition_reset() {
1568        // Partition 1: 3 rows.
1569        let r1 = run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![]]);
1570        assert_eq!(r1, vec![int(1), int(2), int(3)]);
1571
1572        // Partition 2: 2 rows (fresh state).
1573        let r2 = run_window_partition(&RowNumberFunc, &[vec![], vec![]]);
1574        assert_eq!(r2, vec![int(1), int(2)]);
1575    }
1576
1577    // ── rank ─────────────────────────────────────────────────────────
1578
1579    #[test]
1580    fn test_rank_with_ties() {
1581        // Values: [1, 2, 2, 3] -> ranks: [1, 2, 2, 4]
1582        let results = run_window_partition(
1583            &RankFunc,
1584            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1585        );
1586        assert_eq!(results, vec![int(1), int(2), int(2), int(4)]);
1587    }
1588
1589    #[test]
1590    fn test_rank_no_ties() {
1591        let results =
1592            run_window_partition(&RankFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
1593        assert_eq!(results, vec![int(1), int(2), int(3)]);
1594    }
1595
1596    // ── dense_rank ───────────────────────────────────────────────────
1597
1598    #[test]
1599    fn test_dense_rank_with_ties() {
1600        // Values: [1, 2, 2, 3] -> dense_ranks: [1, 2, 2, 3]
1601        let results = run_window_partition(
1602            &DenseRankFunc,
1603            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1604        );
1605        assert_eq!(results, vec![int(1), int(2), int(2), int(3)]);
1606    }
1607
1608    #[test]
1609    fn test_dense_rank_multiple_ties() {
1610        // Values: [1, 1, 2, 2, 3] -> dense_ranks: [1, 1, 2, 2, 3]
1611        let results = run_window_partition(
1612            &DenseRankFunc,
1613            &[
1614                vec![int(1)],
1615                vec![int(1)],
1616                vec![int(2)],
1617                vec![int(2)],
1618                vec![int(3)],
1619            ],
1620        );
1621        assert_eq!(results, vec![int(1), int(1), int(2), int(2), int(3)]);
1622    }
1623
1624    // ── percent_rank ─────────────────────────────────────────────────
1625
1626    #[test]
1627    fn test_percent_rank_single_row() {
1628        let results = run_window_two_pass(&PercentRankFunc, &[vec![int(1)]]);
1629        assert_eq!(results, vec![SqliteValue::Float(0.0)]);
1630    }
1631
1632    #[test]
1633    fn test_percent_rank_formula() {
1634        // 4 rows, values [1, 2, 2, 3] -> ranks [1, 2, 2, 4]
1635        // percent_rank = (rank - 1) / (N - 1) = (rank - 1) / 3
1636        let results = run_window_two_pass(
1637            &PercentRankFunc,
1638            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1639        );
1640        // Row 1: (1-1)/3 = 0.0
1641        // Row 2: (2-1)/3 = 0.333...
1642        // Row 3: (2-1)/3 = 0.333... (same rank as row 2)
1643        // Row 4: (4-1)/3 = 1.0
1644        assert_float_near(&results[0], 0.0);
1645        assert_float_near(&results[1], 1.0 / 3.0);
1646        assert_float_near(&results[2], 1.0 / 3.0);
1647        assert_float_near(&results[3], 1.0);
1648    }
1649
1650    #[test]
1651    fn test_percent_rank_without_order_treats_partition_as_one_peer_group() {
1652        let results = run_window_two_pass(&PercentRankFunc, &[vec![], vec![], vec![]]);
1653        for value in results {
1654            assert_float_near(&value, 0.0);
1655        }
1656    }
1657
1658    // ── cume_dist ────────────────────────────────────────────────────
1659
1660    #[test]
1661    fn test_cume_dist_distinct() {
1662        // 4 distinct values: cume_dist = [0.25, 0.5, 0.75, 1.0]
1663        let results = run_window_two_pass(
1664            &CumeDistFunc,
1665            &[vec![int(1)], vec![int(2)], vec![int(3)], vec![int(4)]],
1666        );
1667        for (i, expected) in [0.25, 0.5, 0.75, 1.0].iter().enumerate() {
1668            assert_float_near(&results[i], *expected);
1669        }
1670    }
1671
1672    #[test]
1673    fn test_cume_dist_with_ties() {
1674        // Values [1, 2, 2, 3] -> last peer positions [1, 3, 3, 4].
1675        let results = run_window_two_pass(
1676            &CumeDistFunc,
1677            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1678        );
1679        for (i, expected) in [0.25, 0.75, 0.75, 1.0].iter().enumerate() {
1680            assert_float_near(&results[i], *expected);
1681        }
1682    }
1683
1684    #[test]
1685    fn test_cume_dist_without_order_treats_partition_as_one_peer_group() {
1686        let results = run_window_two_pass(&CumeDistFunc, &[vec![], vec![], vec![]]);
1687        for value in results {
1688            assert_float_near(&value, 1.0);
1689        }
1690    }
1691
1692    #[test]
1693    fn test_cume_dist_null_peers_share_same_peer_group() {
1694        let results =
1695            run_window_two_pass(&CumeDistFunc, &[vec![null()], vec![null()], vec![int(1)]]);
1696        assert_float_near(&results[0], 2.0 / 3.0);
1697        assert_float_near(&results[1], 2.0 / 3.0);
1698        assert_float_near(&results[2], 1.0);
1699    }
1700
1701    // ── ntile ────────────────────────────────────────────────────────
1702
1703    #[test]
1704    fn test_ntile_even() {
1705        // ntile(4) over 8 rows: groups of 2 each -> [1,1,2,2,3,3,4,4]
1706        let rows: Vec<Vec<SqliteValue>> = (0..8).map(|_| vec![int(4)]).collect();
1707        let results = run_window_two_pass(&NtileFunc, &rows);
1708        assert_eq!(
1709            results,
1710            vec![
1711                int(1),
1712                int(1),
1713                int(2),
1714                int(2),
1715                int(3),
1716                int(3),
1717                int(4),
1718                int(4)
1719            ]
1720        );
1721    }
1722
1723    #[test]
1724    fn test_ntile_uneven() {
1725        // ntile(3) over 10 rows: groups of 4,3,3
1726        let rows: Vec<Vec<SqliteValue>> = (0..10).map(|_| vec![int(3)]).collect();
1727        let results = run_window_two_pass(&NtileFunc, &rows);
1728        assert_eq!(
1729            results,
1730            vec![
1731                int(1),
1732                int(1),
1733                int(1),
1734                int(1),
1735                int(2),
1736                int(2),
1737                int(2),
1738                int(3),
1739                int(3),
1740                int(3)
1741            ]
1742        );
1743    }
1744
1745    #[test]
1746    fn test_ntile_more_buckets_than_rows() {
1747        // ntile(10) over 3 rows: [1, 2, 3]
1748        let rows: Vec<Vec<SqliteValue>> = (0..3).map(|_| vec![int(10)]).collect();
1749        let results = run_window_two_pass(&NtileFunc, &rows);
1750        assert_eq!(results, vec![int(1), int(2), int(3)]);
1751    }
1752
1753    #[test]
1754    fn test_ntile_rejects_non_positive_argument() {
1755        for n in [0, -1] {
1756            let mut state = NtileFunc.initial_state();
1757            let err = NtileFunc.step(&mut state, &[int(n)]).unwrap_err();
1758            assert_function_error(err, INVALID_NTILE_ARGUMENT);
1759        }
1760    }
1761
1762    // ── lag ──────────────────────────────────────────────────────────
1763
1764    #[test]
1765    fn test_lag_default() {
1766        // lag(X) with default offset=1: previous row's value, NULL for first.
1767        let results = run_window_two_pass(&LagFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
1768        assert_eq!(results, vec![null(), int(10), int(20)]);
1769    }
1770
1771    #[test]
1772    fn test_lag_offset_3() {
1773        // lag(X, 3): 3 rows back.
1774        let results = run_window_two_pass(
1775            &LagFunc,
1776            &[
1777                vec![int(10), int(3)],
1778                vec![int(20), int(3)],
1779                vec![int(30), int(3)],
1780                vec![int(40), int(3)],
1781                vec![int(50), int(3)],
1782            ],
1783        );
1784        assert_eq!(results, vec![null(), null(), null(), int(10), int(20)]);
1785    }
1786
1787    #[test]
1788    fn test_lag_default_value() {
1789        // lag(X, 1, -1): returns -1 when no previous row.
1790        let results = run_window_two_pass(
1791            &LagFunc,
1792            &[
1793                vec![int(10), int(1), int(-1)],
1794                vec![int(20), int(1), int(-1)],
1795            ],
1796        );
1797        assert_eq!(results, vec![int(-1), int(10)]);
1798    }
1799
1800    #[test]
1801    fn test_lag_null_offset_returns_default_for_each_row() {
1802        let results = run_window_two_pass(
1803            &LagFunc,
1804            &[
1805                vec![int(10), null(), text("N/A")],
1806                vec![int(20), null(), text("N/A")],
1807            ],
1808        );
1809        assert_eq!(results, vec![text("N/A"), text("N/A")]);
1810    }
1811
1812    #[test]
1813    fn test_lag_uses_current_row_offset_and_default() {
1814        let results = run_window_two_pass(
1815            &LagFunc,
1816            &[
1817                vec![int(10), int(1), text("first")],
1818                vec![int(20), null(), text("null-offset")],
1819                vec![int(30), int(1), text("third")],
1820                vec![int(40), int(2), text("fourth")],
1821            ],
1822        );
1823        assert_eq!(
1824            results,
1825            vec![text("first"), text("null-offset"), int(20), int(20)]
1826        );
1827    }
1828
1829    #[test]
1830    fn test_lag_negative_offset_reads_following_row() {
1831        let results = run_window_two_pass(
1832            &LagFunc,
1833            &[
1834                vec![int(10), int(-1), text("N/A")],
1835                vec![int(20), int(-1), text("N/A")],
1836                vec![int(30), int(-1), text("N/A")],
1837            ],
1838        );
1839        assert_eq!(results, vec![int(20), int(30), text("N/A")]);
1840    }
1841
1842    #[test]
1843    fn test_lag_fractional_offset_uses_default() {
1844        let results = run_window_two_pass(
1845            &LagFunc,
1846            &[
1847                vec![int(10), float(1.5), text("N/A")],
1848                vec![int(20), float(1.5), text("N/A")],
1849                vec![int(30), float(1.5), text("N/A")],
1850            ],
1851        );
1852        assert_eq!(results, vec![text("N/A"), text("N/A"), text("N/A")]);
1853    }
1854
1855    #[test]
1856    fn test_lag_nonnumeric_text_offset_reads_current_row() {
1857        let results = run_window_two_pass(
1858            &LagFunc,
1859            &[
1860                vec![int(10), text("abc"), text("N/A")],
1861                vec![int(20), text("abc"), text("N/A")],
1862                vec![int(30), text("abc"), text("N/A")],
1863            ],
1864        );
1865        assert_eq!(results, vec![int(10), int(20), int(30)]);
1866    }
1867
1868    #[test]
1869    fn test_lag_integral_text_prefix_offset() {
1870        let results = run_window_two_pass(
1871            &LagFunc,
1872            &[
1873                vec![int(10), text("2.0x"), text("N/A")],
1874                vec![int(20), text("2e0x"), text("N/A")],
1875                vec![int(30), text("2x"), text("N/A")],
1876            ],
1877        );
1878        assert_eq!(results, vec![text("N/A"), text("N/A"), int(10)]);
1879    }
1880
1881    // ── lead ─────────────────────────────────────────────────────────
1882
1883    #[test]
1884    fn test_lead_default() {
1885        // lead(X): next row's value, NULL for last.
1886        // For lead, we need to step all rows first (to build the buffer),
1887        // then call inverse + value for each row.
1888        let func = LeadFunc;
1889        let mut state = func.initial_state();
1890        let rows = [int(10), int(20), int(30)];
1891
1892        // Step all rows to build the buffer.
1893        for row in &rows {
1894            func.step(&mut state, std::slice::from_ref(row)).unwrap();
1895        }
1896
1897        // Now iterate: first row is at current_row=0.
1898        let mut results = Vec::new();
1899        for _ in &rows {
1900            results.push(func.value(&state).unwrap());
1901            func.inverse(&mut state, &[]).unwrap();
1902        }
1903        assert_eq!(results, vec![int(20), int(30), null()]);
1904    }
1905
1906    #[test]
1907    fn test_lead_offset_2() {
1908        let func = LeadFunc;
1909        let mut state = func.initial_state();
1910        let rows = [int(10), int(20), int(30), int(40), int(50)];
1911
1912        for row in &rows {
1913            func.step(&mut state, &[row.clone(), int(2)]).unwrap();
1914        }
1915
1916        let mut results = Vec::new();
1917        for _ in &rows {
1918            results.push(func.value(&state).unwrap());
1919            func.inverse(&mut state, &[]).unwrap();
1920        }
1921        assert_eq!(results, vec![int(30), int(40), int(50), null(), null()]);
1922    }
1923
1924    #[test]
1925    fn test_lead_default_value() {
1926        let func = LeadFunc;
1927        let mut state = func.initial_state();
1928        let rows = [int(10), int(20)];
1929
1930        for row in &rows {
1931            func.step(&mut state, &[row.clone(), int(1), text("N/A")])
1932                .unwrap();
1933        }
1934
1935        let mut results = Vec::new();
1936        for _ in &rows {
1937            results.push(func.value(&state).unwrap());
1938            func.inverse(&mut state, &[]).unwrap();
1939        }
1940        assert_eq!(results, vec![int(20), text("N/A")]);
1941    }
1942
1943    #[test]
1944    fn test_lead_null_offset_returns_default_for_each_row() {
1945        let func = LeadFunc;
1946        let mut state = func.initial_state();
1947        let rows = [int(10), int(20)];
1948
1949        for row in &rows {
1950            func.step(&mut state, &[row.clone(), null(), text("N/A")])
1951                .unwrap();
1952        }
1953
1954        let mut results = Vec::new();
1955        for _ in &rows {
1956            results.push(func.value(&state).unwrap());
1957            func.inverse(&mut state, &[]).unwrap();
1958        }
1959        assert_eq!(results, vec![text("N/A"), text("N/A")]);
1960    }
1961
1962    #[test]
1963    fn test_lead_uses_current_row_offset_and_default() {
1964        let func = LeadFunc;
1965        let mut state = func.initial_state();
1966        let rows = [
1967            vec![int(10), int(1), text("first")],
1968            vec![int(20), null(), text("null-offset")],
1969            vec![int(30), int(1), text("third")],
1970        ];
1971
1972        for row in &rows {
1973            func.step(&mut state, row).unwrap();
1974        }
1975
1976        let mut results = Vec::new();
1977        for _ in &rows {
1978            results.push(func.value(&state).unwrap());
1979            func.inverse(&mut state, &[]).unwrap();
1980        }
1981        assert_eq!(results, vec![int(20), text("null-offset"), text("third")]);
1982    }
1983
1984    #[test]
1985    fn test_lead_negative_offset_reads_previous_row() {
1986        let func = LeadFunc;
1987        let mut state = func.initial_state();
1988        let rows = [int(10), int(20), int(30)];
1989
1990        for row in &rows {
1991            func.step(&mut state, &[row.clone(), int(-1), text("N/A")])
1992                .unwrap();
1993        }
1994
1995        let mut results = Vec::new();
1996        for _ in &rows {
1997            results.push(func.value(&state).unwrap());
1998            func.inverse(&mut state, &[]).unwrap();
1999        }
2000        assert_eq!(results, vec![text("N/A"), int(10), int(20)]);
2001    }
2002
2003    #[test]
2004    fn test_lead_fractional_offset_uses_default() {
2005        let results = run_window_two_pass(
2006            &LeadFunc,
2007            &[
2008                vec![int(10), float(1.5), text("N/A")],
2009                vec![int(20), float(1.5), text("N/A")],
2010                vec![int(30), float(1.5), text("N/A")],
2011            ],
2012        );
2013        assert_eq!(results, vec![text("N/A"), text("N/A"), text("N/A")]);
2014    }
2015
2016    #[test]
2017    fn test_lead_integral_blob_offset() {
2018        let results = run_window_two_pass(
2019            &LeadFunc,
2020            &[
2021                vec![int(10), blob(b"2.0x"), text("N/A")],
2022                vec![int(20), blob(b"2e0x"), text("N/A")],
2023                vec![int(30), blob(b"2x"), text("N/A")],
2024            ],
2025        );
2026        assert_eq!(results, vec![int(30), text("N/A"), text("N/A")]);
2027    }
2028
2029    // ── first_value ──────────────────────────────────────────────────
2030
2031    #[test]
2032    fn test_first_value_basic() {
2033        let results = run_window_partition(
2034            &FirstValueFunc,
2035            &[vec![int(10)], vec![int(20)], vec![int(30)]],
2036        );
2037        // With default frame (UNBOUNDED PRECEDING to CURRENT ROW),
2038        // first_value is always the first row's value.
2039        assert_eq!(results, vec![int(10), int(10), int(10)]);
2040    }
2041
2042    // ── last_value ───────────────────────────────────────────────────
2043
2044    #[test]
2045    fn test_last_value_default_frame() {
2046        // With default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
2047        // last_value returns the current row's value.
2048        let results = run_window_partition(
2049            &LastValueFunc,
2050            &[vec![int(10)], vec![int(20)], vec![int(30)]],
2051        );
2052        assert_eq!(results, vec![int(10), int(20), int(30)]);
2053    }
2054
2055    #[test]
2056    fn test_last_value_unbounded_following() {
2057        // With UNBOUNDED FOLLOWING frame, step all rows first,
2058        // then value() returns the true last value.
2059        let func = LastValueFunc;
2060        let mut state = func.initial_state();
2061        func.step(&mut state, &[int(10)]).unwrap();
2062        func.step(&mut state, &[int(20)]).unwrap();
2063        func.step(&mut state, &[int(30)]).unwrap();
2064        assert_eq!(func.value(&state).unwrap(), int(30));
2065    }
2066
2067    // ── nth_value ────────────────────────────────────────────────────
2068
2069    #[test]
2070    fn test_nth_value_basic() {
2071        let func = NthValueFunc;
2072        let mut state = func.initial_state();
2073        // Step 5 rows; N=3.
2074        func.step(&mut state, &[int(10), int(3)]).unwrap();
2075        func.step(&mut state, &[int(20), int(3)]).unwrap();
2076        func.step(&mut state, &[int(30), int(3)]).unwrap();
2077        func.step(&mut state, &[int(40), int(3)]).unwrap();
2078        func.step(&mut state, &[int(50), int(3)]).unwrap();
2079        assert_eq!(func.value(&state).unwrap(), int(30));
2080    }
2081
2082    #[test]
2083    fn test_nth_value_out_of_range() {
2084        let func = NthValueFunc;
2085        let mut state = func.initial_state();
2086        func.step(&mut state, &[int(10), int(100)]).unwrap();
2087        func.step(&mut state, &[int(20), int(100)]).unwrap();
2088        // Frame has 2 rows but N=100.
2089        assert_eq!(func.value(&state).unwrap(), null());
2090    }
2091
2092    #[test]
2093    fn test_nth_value_n_zero() {
2094        let func = NthValueFunc;
2095        let mut state = func.initial_state();
2096        let err = func.step(&mut state, &[int(10), int(0)]).unwrap_err();
2097        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2098    }
2099
2100    #[test]
2101    fn test_nth_value_rejects_negative_n() {
2102        let func = NthValueFunc;
2103        let mut state = func.initial_state();
2104        let err = func.step(&mut state, &[int(10), int(-1)]).unwrap_err();
2105        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2106    }
2107
2108    #[test]
2109    fn test_nth_value_accepts_integral_real_n() {
2110        let func = NthValueFunc;
2111        let mut state = func.initial_state();
2112        func.step(&mut state, &[int(10), float(2.0)]).unwrap();
2113        func.step(&mut state, &[int(20), float(2.0)]).unwrap();
2114        assert_eq!(func.value(&state).unwrap(), int(20));
2115    }
2116
2117    #[test]
2118    fn test_nth_value_accepts_integral_text_n() {
2119        let func = NthValueFunc;
2120        let mut state = func.initial_state();
2121        func.step(&mut state, &[int(10), text("2e0")]).unwrap();
2122        func.step(&mut state, &[int(20), text("2.0")]).unwrap();
2123        assert_eq!(func.value(&state).unwrap(), int(20));
2124    }
2125
2126    #[test]
2127    fn test_nth_value_rejects_fractional_real_n() {
2128        let func = NthValueFunc;
2129        let mut state = func.initial_state();
2130        let err = func.step(&mut state, &[int(10), float(1.5)]).unwrap_err();
2131        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2132    }
2133
2134    #[test]
2135    fn test_nth_value_rejects_fractional_text_n() {
2136        let func = NthValueFunc;
2137        let mut state = func.initial_state();
2138        let err = func.step(&mut state, &[int(10), text("1.5")]).unwrap_err();
2139        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2140    }
2141
2142    #[test]
2143    fn test_nth_value_rejects_text_numeric_prefix_n() {
2144        let func = NthValueFunc;
2145        let mut state = func.initial_state();
2146        let err = func.step(&mut state, &[int(10), text("2x")]).unwrap_err();
2147        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2148    }
2149
2150    #[test]
2151    fn test_nth_value_rejects_blob_integer_n() {
2152        let func = NthValueFunc;
2153        let mut state = func.initial_state();
2154        let err = func.step(&mut state, &[int(10), blob(b"2")]).unwrap_err();
2155        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2156    }
2157
2158    #[test]
2159    fn test_nth_value_rejects_non_positive_n_after_first_row() {
2160        let func = NthValueFunc;
2161        let mut state = func.initial_state();
2162        func.step(&mut state, &[int(10), int(1)]).unwrap();
2163        let err = func.step(&mut state, &[int(20), int(0)]).unwrap_err();
2164        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2165    }
2166
2167    // ── sum / total / avg ────────────────────────────────────────────
2168
2169    #[test]
2170    fn test_window_sum_text_integer_literals_stay_integer() {
2171        let results = run_window_partition(&WindowSumFunc, &[vec![text("1")], vec![text("2")]]);
2172        assert_eq!(results, vec![int(1), int(3)]);
2173    }
2174
2175    #[test]
2176    fn test_window_sum_non_numeric_text_returns_real_zero() {
2177        let results = run_window_partition(&WindowSumFunc, &[vec![text("abc")]]);
2178        assert_eq!(results, vec![float(0.0)]);
2179    }
2180
2181    #[test]
2182    fn test_window_sum_overflow_suppressed_after_float_input() {
2183        let func = WindowSumFunc;
2184        let mut state = func.initial_state();
2185
2186        func.step(&mut state, &[int(i64::MAX)]).unwrap();
2187        func.step(&mut state, &[int(1)]).unwrap();
2188        assert_eq!(
2189            func.value(&state).unwrap_err().to_string(),
2190            "integer overflow"
2191        );
2192
2193        func.step(&mut state, &[float(0.5)]).unwrap();
2194        assert!(matches!(func.value(&state).unwrap(), SqliteValue::Float(_)));
2195    }
2196
2197    // ── min / max ────────────────────────────────────────────────────
2198
2199    #[test]
2200    fn test_window_min_max_use_sqlite_storage_class_order() {
2201        let text_value = text("z");
2202        let blob_value = blob(b"\0");
2203
2204        let mut min_state = WindowMinFunc.initial_state();
2205        WindowMinFunc
2206            .step(&mut min_state, std::slice::from_ref(&blob_value))
2207            .unwrap();
2208        WindowMinFunc
2209            .step(&mut min_state, std::slice::from_ref(&text_value))
2210            .unwrap();
2211        assert_eq!(WindowMinFunc.value(&min_state).unwrap(), text_value);
2212
2213        let mut max_state = WindowMaxFunc.initial_state();
2214        WindowMaxFunc
2215            .step(&mut max_state, std::slice::from_ref(&text_value))
2216            .unwrap();
2217        WindowMaxFunc
2218            .step(&mut max_state, std::slice::from_ref(&blob_value))
2219            .unwrap();
2220        assert_eq!(WindowMaxFunc.value(&max_state).unwrap(), blob_value);
2221    }
2222
2223    // ── group_concat / string_agg ────────────────────────────────────
2224
2225    #[test]
2226    fn test_window_group_concat_running_default_separator() {
2227        let results = run_window_partition(
2228            &WindowGroupConcatFunc,
2229            &[vec![text("a")], vec![text("b")], vec![text("c")]],
2230        );
2231        assert_eq!(results, vec![text("a"), text("a,b"), text("a,b,c")]);
2232    }
2233
2234    #[test]
2235    fn test_window_group_concat_running_custom_separator() {
2236        let results = run_window_partition(
2237            &WindowGroupConcatFunc,
2238            &[
2239                vec![text("a"), text(" | ")],
2240                vec![text("b"), text(" | ")],
2241                vec![text("c"), text(" | ")],
2242            ],
2243        );
2244        assert_eq!(results, vec![text("a"), text("a | b"), text("a | b | c")]);
2245    }
2246
2247    #[test]
2248    fn test_window_group_concat_skips_null_and_uses_current_row_separator() {
2249        let results = run_window_partition(
2250            &WindowGroupConcatFunc,
2251            &[
2252                vec![text("a"), text("-")],
2253                vec![null(), text("?")],
2254                vec![text("b"), text("+")],
2255                vec![text("c"), text("*")],
2256            ],
2257        );
2258        assert_eq!(
2259            results,
2260            vec![text("a"), text("a"), text("a+b"), text("a+b*c")]
2261        );
2262    }
2263
2264    #[test]
2265    fn test_window_string_agg_alias_through_registry() {
2266        let mut reg = FunctionRegistry::new();
2267        register_window_builtins(&mut reg);
2268
2269        let sa = reg.find_window("string_agg", 2).unwrap();
2270        let mut state = sa.initial_state();
2271        sa.step(&mut state, &[text("a"), text(";")]).unwrap();
2272        assert_eq!(sa.value(&state).unwrap(), text("a"));
2273        sa.step(&mut state, &[text("b"), text(";")]).unwrap();
2274        assert_eq!(sa.value(&state).unwrap(), text("a;b"));
2275    }
2276
2277    // ── Registration ─────────────────────────────────────────────────
2278
2279    #[test]
2280    fn test_register_window_builtins_all_present() {
2281        let mut reg = FunctionRegistry::new();
2282        register_window_builtins(&mut reg);
2283
2284        let expected_variadic = [
2285            "row_number",
2286            "rank",
2287            "dense_rank",
2288            "percent_rank",
2289            "cume_dist",
2290            "lag",
2291            "lead",
2292        ];
2293        for name in expected_variadic {
2294            assert!(
2295                reg.find_window(name, 0).is_some()
2296                    || reg.find_window(name, 1).is_some()
2297                    || reg.find_window(name, -1).is_some(),
2298                "window function '{name}' not registered"
2299            );
2300        }
2301
2302        assert!(
2303            reg.find_window("ntile", 1).is_some(),
2304            "ntile(1) not registered"
2305        );
2306        assert!(
2307            reg.find_window("first_value", 1).is_some(),
2308            "first_value(1) not registered"
2309        );
2310        assert!(
2311            reg.find_window("last_value", 1).is_some(),
2312            "last_value(1) not registered"
2313        );
2314        assert!(
2315            reg.find_window("nth_value", 2).is_some(),
2316            "nth_value(2) not registered"
2317        );
2318        assert!(
2319            reg.find_window("group_concat", 1).is_some(),
2320            "group_concat(1) not registered"
2321        );
2322        assert!(
2323            reg.find_window("group_concat", 2).is_some(),
2324            "group_concat(2) not registered"
2325        );
2326        assert!(
2327            reg.find_window("string_agg", 2).is_some(),
2328            "string_agg(2) not registered"
2329        );
2330
2331        for (name, arity) in [
2332            ("rank", 1),
2333            ("dense_rank", 1),
2334            ("percent_rank", 1),
2335            ("cume_dist", 1),
2336            ("lag", 0),
2337            ("lag", 4),
2338            ("lead", 0),
2339            ("lead", 4),
2340            ("count", 2),
2341            ("group_concat", 0),
2342            ("group_concat", 3),
2343        ] {
2344            let f = reg
2345                .find_window(name, arity)
2346                .expect("known window with wrong arity returns erroring window");
2347            let mut state = f.initial_state();
2348            let err = f
2349                .step(&mut state, &[])
2350                .expect_err("invalid window arity should fail");
2351            let expected = format!("wrong number of arguments to function {name}()");
2352            assert!(
2353                matches!(&err, FrankenError::FunctionError(message) if message == &expected),
2354                "unexpected error for {name}/{arity}: {err:?}"
2355            );
2356        }
2357    }
2358
2359    // ── E2E: full lifecycle through registry ─────────────────────────
2360
2361    #[test]
2362    fn test_e2e_window_row_number_through_registry() {
2363        let mut reg = FunctionRegistry::new();
2364        register_window_builtins(&mut reg);
2365
2366        let rn = reg.find_window("row_number", 0).unwrap();
2367        let mut state = rn.initial_state();
2368        rn.step(&mut state, &[]).unwrap();
2369        assert_eq!(rn.value(&state).unwrap(), int(1));
2370        rn.step(&mut state, &[]).unwrap();
2371        assert_eq!(rn.value(&state).unwrap(), int(2));
2372        rn.step(&mut state, &[]).unwrap();
2373        assert_eq!(rn.value(&state).unwrap(), int(3));
2374    }
2375
2376    #[test]
2377    fn test_e2e_window_rank_through_registry() {
2378        let mut reg = FunctionRegistry::new();
2379        register_window_builtins(&mut reg);
2380
2381        let rank = reg.find_window("rank", 0).unwrap();
2382        let mut state = rank.initial_state();
2383        // [1, 2, 2, 3] -> [1, 2, 2, 4]
2384        rank.step(&mut state, &[int(1)]).unwrap();
2385        assert_eq!(rank.value(&state).unwrap(), int(1));
2386        rank.step(&mut state, &[int(2)]).unwrap();
2387        assert_eq!(rank.value(&state).unwrap(), int(2));
2388        rank.step(&mut state, &[int(2)]).unwrap();
2389        assert_eq!(rank.value(&state).unwrap(), int(2));
2390        rank.step(&mut state, &[int(3)]).unwrap();
2391        assert_eq!(rank.value(&state).unwrap(), int(4));
2392    }
2393}