Skip to main content

fsqlite_func/
window_builtins.rs

1//! Built-in window functions (S13.5, bd-14i6).
2//!
3//! Implements: row_number, rank, dense_rank, percent_rank, cume_dist,
4//! ntile, lag, lead, first_value, last_value, nth_value, plus
5//! aggregate-as-window variants including group_concat and string_agg.
6//!
7//! These functions implement the [`WindowFunction`] trait.  The VDBE is
8//! responsible for partitioning, ordering, and frame management; these
9//! implementations provide the per-row computation logic.
10//!
11//! # Design Notes
12//!
13//! Pure-numbering functions (row_number, rank, dense_rank) track position
14//! via step() and expose the current value through value().  The ORDER BY
15//! column is passed as args\[0\] so the function can detect peer-group
16//! boundaries.
17//!
18//! Buffer-based functions (lag, lead, first_value, last_value, nth_value)
19//! maintain an internal VecDeque of values and expose frame-relative
20//! access through value().
21#![allow(
22    clippy::unnecessary_literal_bound,
23    clippy::cast_possible_truncation,
24    clippy::cast_possible_wrap,
25    clippy::cast_precision_loss,
26    clippy::cast_sign_loss,
27    clippy::items_after_statements,
28    clippy::float_cmp,
29    clippy::match_same_arms,
30    clippy::similar_names
31)]
32
33use std::collections::VecDeque;
34
35use fsqlite_error::{FrankenError, Result};
36use fsqlite_types::SqliteValue;
37
38use crate::{FunctionRegistry, WindowFunction};
39
40// ═══════════════════════════════════════════════════════════════════════════
41// row_number()
42// ═══════════════════════════════════════════════════════════════════════════
43
44pub struct RowNumberState {
45    counter: i64,
46}
47
48pub struct RowNumberFunc;
49
50impl WindowFunction for RowNumberFunc {
51    type State = RowNumberState;
52
53    fn initial_state(&self) -> Self::State {
54        RowNumberState { counter: 0 }
55    }
56
57    fn step(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
58        state.counter += 1;
59        Ok(())
60    }
61
62    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
63        state.counter -= 1;
64        Ok(())
65    }
66
67    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
68        Ok(SqliteValue::Integer(state.counter))
69    }
70
71    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
72        Ok(SqliteValue::Integer(state.counter))
73    }
74
75    fn num_args(&self) -> i32 {
76        0
77    }
78
79    fn name(&self) -> &str {
80        "row_number"
81    }
82}
83
84// ═══════════════════════════════════════════════════════════════════════════
85// rank()
86// ═══════════════════════════════════════════════════════════════════════════
87
88pub struct RankState {
89    row_number: i64,
90    rank: i64,
91    last_order_value: Option<SqliteValue>,
92}
93
94pub struct RankFunc;
95
96impl WindowFunction for RankFunc {
97    type State = RankState;
98
99    fn initial_state(&self) -> Self::State {
100        RankState {
101            row_number: 0,
102            rank: 0,
103            last_order_value: None,
104        }
105    }
106
107    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
108        state.row_number += 1;
109        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
110        let is_new_peer = match &state.last_order_value {
111            None => true,
112            Some(last) => &current != last,
113        };
114        if is_new_peer {
115            state.rank = state.row_number;
116            state.last_order_value = Some(current);
117        }
118        Ok(())
119    }
120
121    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
122        // rank() uses UNBOUNDED PRECEDING to CURRENT ROW; inverse is a no-op.
123        Ok(())
124    }
125
126    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
127        Ok(SqliteValue::Integer(state.rank))
128    }
129
130    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
131        Ok(SqliteValue::Integer(state.rank))
132    }
133
134    fn num_args(&self) -> i32 {
135        -1
136    }
137
138    fn min_args(&self) -> i32 {
139        0
140    }
141
142    fn max_args(&self) -> Option<i32> {
143        Some(0)
144    }
145
146    fn name(&self) -> &str {
147        "rank"
148    }
149}
150
151// ═══════════════════════════════════════════════════════════════════════════
152// dense_rank()
153// ═══════════════════════════════════════════════════════════════════════════
154
155pub struct DenseRankState {
156    dense_rank: i64,
157    last_order_value: Option<SqliteValue>,
158}
159
160pub struct DenseRankFunc;
161
162impl WindowFunction for DenseRankFunc {
163    type State = DenseRankState;
164
165    fn initial_state(&self) -> Self::State {
166        DenseRankState {
167            dense_rank: 0,
168            last_order_value: None,
169        }
170    }
171
172    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
173        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
174        let is_new_peer = match &state.last_order_value {
175            None => true,
176            Some(last) => &current != last,
177        };
178        if is_new_peer {
179            state.dense_rank += 1;
180            state.last_order_value = Some(current);
181        }
182        Ok(())
183    }
184
185    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
186        Ok(())
187    }
188
189    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
190        Ok(SqliteValue::Integer(state.dense_rank))
191    }
192
193    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
194        Ok(SqliteValue::Integer(state.dense_rank))
195    }
196
197    fn num_args(&self) -> i32 {
198        -1
199    }
200
201    fn min_args(&self) -> i32 {
202        0
203    }
204
205    fn max_args(&self) -> Option<i32> {
206        Some(0)
207    }
208
209    fn name(&self) -> &str {
210        "dense_rank"
211    }
212}
213
214// ═══════════════════════════════════════════════════════════════════════════
215// percent_rank()
216// ═══════════════════════════════════════════════════════════════════════════
217
218/// State for `percent_rank()`.
219///
220/// The VDBE must step() all rows in the partition first (to compute ranks
221/// and partition size), then iterate by calling value() and inverse() for
222/// each output row.
223pub struct PercentRankState {
224    partition_size: i64,
225    ranks: Vec<i64>,
226    cursor: usize,
227    step_row_number: i64,
228    current_rank: i64,
229    last_order_value: Option<SqliteValue>,
230}
231
232pub struct PercentRankFunc;
233
234impl WindowFunction for PercentRankFunc {
235    type State = PercentRankState;
236
237    fn initial_state(&self) -> Self::State {
238        PercentRankState {
239            partition_size: 0,
240            ranks: Vec::new(),
241            cursor: 0,
242            step_row_number: 0,
243            current_rank: 0,
244            last_order_value: None,
245        }
246    }
247
248    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
249        state.step_row_number += 1;
250        state.partition_size += 1;
251        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
252        let is_new_peer = match &state.last_order_value {
253            None => true,
254            Some(last) => &current != last,
255        };
256        if is_new_peer {
257            state.current_rank = state.step_row_number;
258            state.last_order_value = Some(current);
259        }
260        state.ranks.push(state.current_rank);
261        Ok(())
262    }
263
264    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
265        state.cursor += 1;
266        Ok(())
267    }
268
269    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
270        if state.partition_size <= 1 {
271            return Ok(SqliteValue::Float(0.0));
272        }
273        let rank = state.ranks.get(state.cursor).copied().unwrap_or(1);
274        let pr = (rank - 1) as f64 / (state.partition_size - 1) as f64;
275        Ok(SqliteValue::Float(pr))
276    }
277
278    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
279        self.value(&state)
280    }
281
282    fn num_args(&self) -> i32 {
283        -1
284    }
285
286    fn min_args(&self) -> i32 {
287        0
288    }
289
290    fn max_args(&self) -> Option<i32> {
291        Some(0)
292    }
293
294    fn name(&self) -> &str {
295        "percent_rank"
296    }
297}
298
299// ═══════════════════════════════════════════════════════════════════════════
300// cume_dist()
301// ═══════════════════════════════════════════════════════════════════════════
302
303/// State for `cume_dist()`.
304///
305/// The VDBE must step() all rows first, then iterate with value()+inverse().
306/// cume_dist = (last peer position) / partition_size.
307pub struct CumeDistState {
308    partition_size: i64,
309    current_row: usize,
310    cume_positions: Vec<i64>,
311    peer_start: usize,
312    last_order_value: Option<SqliteValue>,
313}
314
315pub struct CumeDistFunc;
316
317impl WindowFunction for CumeDistFunc {
318    type State = CumeDistState;
319
320    fn initial_state(&self) -> Self::State {
321        CumeDistState {
322            partition_size: 0,
323            current_row: 0,
324            cume_positions: Vec::new(),
325            peer_start: 0,
326            last_order_value: None,
327        }
328    }
329
330    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
331        let current = args.first().cloned().unwrap_or(SqliteValue::Null);
332        let is_new_peer = match &state.last_order_value {
333            None => true,
334            Some(last) => &current != last,
335        };
336        if is_new_peer {
337            let peer_end = state.partition_size;
338            if let Some(slots) = state.cume_positions.get_mut(state.peer_start..) {
339                for slot in slots {
340                    *slot = peer_end;
341                }
342            }
343            state.peer_start = state.cume_positions.len();
344            state.last_order_value = Some(current);
345        }
346        state.partition_size += 1;
347        state.cume_positions.push(0);
348        Ok(())
349    }
350
351    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
352        state.current_row += 1;
353        Ok(())
354    }
355
356    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
357        if state.partition_size == 0 {
358            return Ok(SqliteValue::Float(0.0));
359        }
360        let peer_end = state
361            .cume_positions
362            .get(state.current_row)
363            .copied()
364            .filter(|position| *position != 0)
365            .unwrap_or(state.partition_size);
366        let cd = peer_end as f64 / state.partition_size as f64;
367        Ok(SqliteValue::Float(cd))
368    }
369
370    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
371        self.value(&state)
372    }
373
374    fn num_args(&self) -> i32 {
375        -1
376    }
377
378    fn min_args(&self) -> i32 {
379        0
380    }
381
382    fn max_args(&self) -> Option<i32> {
383        Some(0)
384    }
385
386    fn name(&self) -> &str {
387        "cume_dist"
388    }
389}
390
391// ═══════════════════════════════════════════════════════════════════════════
392// ntile(N)
393// ═══════════════════════════════════════════════════════════════════════════
394
395/// State for `ntile(N)`.
396///
397/// The VDBE must step() all rows first, then iterate with value()+inverse().
398/// Distributes rows into N groups.  If partition_size % N != 0, the first
399/// (partition_size % N) groups get one extra row.
400pub struct NtileState {
401    partition_size: i64,
402    n: i64,
403    current_row: i64,
404}
405
406pub struct NtileFunc;
407
408const INVALID_NTILE_ARGUMENT: &str = "argument of ntile must be a positive integer";
409
410impl WindowFunction for NtileFunc {
411    type State = NtileState;
412
413    fn initial_state(&self) -> Self::State {
414        NtileState {
415            partition_size: 0,
416            n: 1,
417            current_row: 0,
418        }
419    }
420
421    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
422        if state.partition_size == 0 {
423            let n = args.first().map_or(0, SqliteValue::to_integer);
424            if n <= 0 {
425                return Err(FrankenError::function_error(INVALID_NTILE_ARGUMENT));
426            }
427            state.n = n;
428        }
429        state.partition_size += 1;
430        Ok(())
431    }
432
433    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
434        state.current_row += 1;
435        Ok(())
436    }
437
438    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
439        if state.partition_size == 0 {
440            return Ok(SqliteValue::Integer(1));
441        }
442        let n = state.n;
443        let sz = state.partition_size;
444        let row = state.current_row + 1; // 1-based
445
446        // Group size: first (sz % n) groups get (sz / n + 1) rows,
447        // remaining groups get (sz / n) rows.
448        let base = sz / n;
449        let extra = sz % n;
450        // Rows in "large" groups: extra * (base + 1).
451        let large_rows = extra * (base + 1);
452
453        let bucket = if row <= large_rows {
454            // In one of the first `extra` groups (each of size base+1).
455            (row - 1) / (base + 1) + 1
456        } else {
457            // In one of the remaining groups (each of size base).
458            let adjusted = row - large_rows;
459            if base == 0 {
460                // More buckets than rows; each remaining row gets its own bucket.
461                extra + adjusted
462            } else {
463                extra + (adjusted - 1) / base + 1
464            }
465        };
466        Ok(SqliteValue::Integer(bucket))
467    }
468
469    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
470        self.value(&state)
471    }
472
473    fn num_args(&self) -> i32 {
474        1
475    }
476
477    fn name(&self) -> &str {
478        "ntile"
479    }
480}
481
482// ═══════════════════════════════════════════════════════════════════════════
483// lag(X [, offset [, default]])
484// ═══════════════════════════════════════════════════════════════════════════
485
/// Returns the length in bytes of the longest numeric literal at the start
/// of `bytes`, or 0 when there is none.
///
/// Accepted shape: optional sign, digits with an optional `.` and more
/// digits (at least one digit overall), then an optional exponent
/// (`e`/`E`, optional sign, one or more digits).  An exponent marker that is
/// not followed by digits is not counted as part of the literal.
fn numeric_prefix_len(bytes: &[u8]) -> usize {
    // Length of the run of ASCII digits beginning at `from`.
    let digit_run = |from: usize| {
        bytes
            .iter()
            .skip(from)
            .take_while(|byte| byte.is_ascii_digit())
            .count()
    };
    let is_sign_at = |pos: usize| matches!(bytes.get(pos), Some(b'+' | b'-'));

    let mut pos = usize::from(is_sign_at(0));

    let int_digits = digit_run(pos);
    pos += int_digits;

    let mut frac_digits = 0;
    if let Some(b'.') = bytes.get(pos) {
        pos += 1;
        frac_digits = digit_run(pos);
        pos += frac_digits;
    }

    if int_digits + frac_digits == 0 {
        return 0;
    }

    let mantissa_end = pos;
    if !matches!(bytes.get(pos), Some(b'e' | b'E')) {
        return mantissa_end;
    }
    pos += 1;
    if is_sign_at(pos) {
        pos += 1;
    }
    match digit_run(pos) {
        // "1e" / "1e+": the exponent is incomplete, keep only the mantissa.
        0 => mantissa_end,
        exp_digits => pos + exp_digits,
    }
}
536
/// Returns `bytes` with its leading ASCII whitespace removed (as defined by
/// `u8::is_ascii_whitespace`); the result is empty when every byte is
/// whitespace.
fn trim_ascii_start(bytes: &[u8]) -> &[u8] {
    let mut rest = bytes;
    while let Some((first, tail)) = rest.split_first() {
        if !first.is_ascii_whitespace() {
            break;
        }
        rest = tail;
    }
    rest
}
544
545fn lag_lead_bytes_offset(bytes: &[u8]) -> Option<i64> {
546    let trimmed = trim_ascii_start(bytes);
547    let prefix_len = numeric_prefix_len(trimmed);
548    if prefix_len == 0 {
549        return Some(0);
550    }
551    let prefix = trimmed
552        .get(..prefix_len)
553        .and_then(|bytes| std::str::from_utf8(bytes).ok())?;
554    if prefix
555        .as_bytes()
556        .iter()
557        .any(|byte| matches!(*byte, b'.' | b'e' | b'E'))
558    {
559        prefix.parse().ok().and_then(integral_f64_to_i64)
560    } else {
561        prefix
562            .parse()
563            .ok()
564            .or_else(|| prefix.parse().ok().and_then(integral_f64_to_i64))
565    }
566}
567
568fn lag_lead_text_offset(text: &str) -> Option<i64> {
569    lag_lead_bytes_offset(text.as_bytes())
570}
571
572fn lag_lead_offset_arg(value: Option<&SqliteValue>) -> Option<i64> {
573    match value {
574        None => Some(1),
575        Some(SqliteValue::Null) => None,
576        Some(SqliteValue::Integer(offset)) => Some(*offset),
577        Some(SqliteValue::Float(offset)) => integral_f64_to_i64(*offset),
578        Some(SqliteValue::Text(text)) => lag_lead_text_offset(text),
579        Some(SqliteValue::Blob(bytes)) => lag_lead_bytes_offset(bytes),
580    }
581}
582
583/// State for `lag()`: maintains a buffer of previous values.
584pub struct LagState {
585    buffer: Vec<SqliteValue>,
586    offsets: Vec<Option<i64>>,
587    defaults: Vec<SqliteValue>,
588    current_row: i64,
589}
590
591pub struct LagFunc;
592
593impl WindowFunction for LagFunc {
594    type State = LagState;
595
596    fn initial_state(&self) -> Self::State {
597        LagState {
598            buffer: Vec::new(),
599            offsets: Vec::new(),
600            defaults: Vec::new(),
601            current_row: 0,
602        }
603    }
604
605    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
606        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
607        let offset = lag_lead_offset_arg(args.get(1));
608        let default_val = args.get(2).cloned().unwrap_or(SqliteValue::Null);
609        state.buffer.push(val);
610        state.offsets.push(offset);
611        state.defaults.push(default_val);
612        Ok(())
613    }
614
615    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
616        state.current_row += 1;
617        Ok(())
618    }
619
620    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
621        let current_index = usize::try_from(state.current_row).unwrap_or(usize::MAX);
622        let default_val = state
623            .defaults
624            .get(current_index)
625            .cloned()
626            .unwrap_or(SqliteValue::Null);
627        let Some(offset) = state.offsets.get(current_index).copied().flatten() else {
628            return Ok(default_val);
629        };
630        let target = state.current_row - offset;
631        let Ok(target_index) = usize::try_from(target) else {
632            return Ok(default_val);
633        };
634        Ok(state
635            .buffer
636            .get(target_index)
637            .cloned()
638            .unwrap_or(default_val))
639    }
640
641    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
642        self.value(&state)
643    }
644
645    fn num_args(&self) -> i32 {
646        -1 // 1, 2, or 3 args
647    }
648
649    fn min_args(&self) -> i32 {
650        1
651    }
652
653    fn max_args(&self) -> Option<i32> {
654        Some(3)
655    }
656
657    fn name(&self) -> &str {
658        "lag"
659    }
660}
661
662// ═══════════════════════════════════════════════════════════════════════════
663// lead(X [, offset [, default]])
664// ═══════════════════════════════════════════════════════════════════════════
665
666/// State for `lead()`: maintains a buffer and reads ahead.
667pub struct LeadState {
668    buffer: Vec<SqliteValue>,
669    offsets: Vec<Option<i64>>,
670    defaults: Vec<SqliteValue>,
671    current_row: i64,
672}
673
674pub struct LeadFunc;
675
676impl WindowFunction for LeadFunc {
677    type State = LeadState;
678
679    fn initial_state(&self) -> Self::State {
680        LeadState {
681            buffer: Vec::new(),
682            offsets: Vec::new(),
683            defaults: Vec::new(),
684            current_row: 0,
685        }
686    }
687
688    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
689        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
690        let offset = lag_lead_offset_arg(args.get(1));
691        let default_val = args.get(2).cloned().unwrap_or(SqliteValue::Null);
692        state.buffer.push(val);
693        state.offsets.push(offset);
694        state.defaults.push(default_val);
695        Ok(())
696    }
697
698    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
699        state.current_row += 1;
700        Ok(())
701    }
702
703    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
704        let current_index = usize::try_from(state.current_row).unwrap_or(usize::MAX);
705        let default_val = state
706            .defaults
707            .get(current_index)
708            .cloned()
709            .unwrap_or(SqliteValue::Null);
710        let Some(offset) = state.offsets.get(current_index).copied().flatten() else {
711            return Ok(default_val);
712        };
713        let target = state.current_row + offset;
714        if target < 0 || target >= state.buffer.len() as i64 {
715            return Ok(default_val);
716        }
717        Ok(state.buffer[target as usize].clone())
718    }
719
720    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
721        self.value(&state)
722    }
723
724    fn num_args(&self) -> i32 {
725        -1
726    }
727
728    fn min_args(&self) -> i32 {
729        1
730    }
731
732    fn max_args(&self) -> Option<i32> {
733        Some(3)
734    }
735
736    fn name(&self) -> &str {
737        "lead"
738    }
739}
740
741// ═══════════════════════════════════════════════════════════════════════════
742// first_value(X)
743// ═══════════════════════════════════════════════════════════════════════════
744
745pub struct FirstValueState {
746    first: Option<SqliteValue>,
747}
748
749pub struct FirstValueFunc;
750
751impl WindowFunction for FirstValueFunc {
752    type State = FirstValueState;
753
754    fn initial_state(&self) -> Self::State {
755        FirstValueState { first: None }
756    }
757
758    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
759        if state.first.is_none() {
760            state.first = Some(args.first().cloned().unwrap_or(SqliteValue::Null));
761        }
762        Ok(())
763    }
764
765    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
766        // When the first row exits the frame, we need to clear so the next
767        // step() captures the new first value.  For simplicity, we use a
768        // VecDeque-based approach in FirstValueFrameFunc below.  This basic
769        // version handles the common UNBOUNDED PRECEDING case correctly.
770        state.first = None;
771        Ok(())
772    }
773
774    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
775        Ok(state.first.clone().unwrap_or(SqliteValue::Null))
776    }
777
778    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
779        Ok(state.first.unwrap_or(SqliteValue::Null))
780    }
781
782    fn num_args(&self) -> i32 {
783        1
784    }
785
786    fn name(&self) -> &str {
787        "first_value"
788    }
789}
790
791// ═══════════════════════════════════════════════════════════════════════════
792// last_value(X)
793// ═══════════════════════════════════════════════════════════════════════════
794
795pub struct LastValueState {
796    frame: VecDeque<SqliteValue>,
797}
798
799pub struct LastValueFunc;
800
801impl WindowFunction for LastValueFunc {
802    type State = LastValueState;
803
804    fn initial_state(&self) -> Self::State {
805        LastValueState {
806            frame: VecDeque::new(),
807        }
808    }
809
810    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
811        state
812            .frame
813            .push_back(args.first().cloned().unwrap_or(SqliteValue::Null));
814        Ok(())
815    }
816
817    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
818        state.frame.pop_front();
819        Ok(())
820    }
821
822    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
823        Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
824    }
825
826    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
827        Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
828    }
829
830    fn num_args(&self) -> i32 {
831        1
832    }
833
834    fn name(&self) -> &str {
835        "last_value"
836    }
837}
838
839// ═══════════════════════════════════════════════════════════════════════════
840// nth_value(X, N)
841// ═══════════════════════════════════════════════════════════════════════════
842
/// State for `nth_value()`.
pub struct NthValueState {
    // Values currently in the frame: step() pushes to the back, inverse()
    // pops from the front.
    frame: VecDeque<SqliteValue>,
    // 1-based position read by value(); captured from the second argument
    // whenever step() runs while the frame is empty.
    n: i64,
}

/// `nth_value(X, N)` window function.
pub struct NthValueFunc;

// Error text used when the second argument of nth_value is missing or does
// not resolve to an integer greater than zero.
const INVALID_NTH_VALUE_ARGUMENT: &str = "second argument to nth_value must be a positive integer";
851
/// Converts `value` to `i64` only when the conversion is exact.
///
/// Returns `None` for NaN, infinities, values with a fractional part, and
/// values outside `[-2^63, 2^63)`.
fn integral_f64_to_i64(value: f64) -> Option<i64> {
    // i64::MIN is exactly representable; i64::MAX is not, so the upper bound
    // is the exclusive 2^63.
    const LOWER_INCLUSIVE: f64 = -9_223_372_036_854_775_808.0;
    const UPPER_EXCLUSIVE: f64 = 9_223_372_036_854_775_808.0;

    let is_whole = value.is_finite() && value.fract() == 0.0;
    let fits = value >= LOWER_INCLUSIVE && value < UPPER_EXCLUSIVE;
    (is_whole && fits).then(|| value as i64)
}
864
865fn parse_integral_text(text: &str) -> Option<i64> {
866    let trimmed = text.trim();
867    if trimmed.is_empty() {
868        return None;
869    }
870    trimmed
871        .parse()
872        .ok()
873        .or_else(|| trimmed.parse().ok().and_then(integral_f64_to_i64))
874}
875
876fn nth_value_positive_integer_arg(value: Option<&SqliteValue>) -> Result<i64> {
877    let Some(value) = value else {
878        return Err(FrankenError::function_error(INVALID_NTH_VALUE_ARGUMENT));
879    };
880    let n = match value {
881        SqliteValue::Integer(n) => Some(*n),
882        SqliteValue::Float(n) => integral_f64_to_i64(*n),
883        SqliteValue::Text(text) => parse_integral_text(text),
884        SqliteValue::Null | SqliteValue::Blob(_) => None,
885    };
886    match n {
887        Some(n) if n > 0 => Ok(n),
888        _ => Err(FrankenError::function_error(INVALID_NTH_VALUE_ARGUMENT)),
889    }
890}
891
892impl WindowFunction for NthValueFunc {
893    type State = NthValueState;
894
895    fn initial_state(&self) -> Self::State {
896        NthValueState {
897            frame: VecDeque::new(),
898            n: 1,
899        }
900    }
901
902    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
903        let val = args.first().cloned().unwrap_or(SqliteValue::Null);
904        let n = nth_value_positive_integer_arg(args.get(1))?;
905        // Capture N from second arg on first call.
906        if state.frame.is_empty() {
907            state.n = n;
908        }
909        state.frame.push_back(val);
910        Ok(())
911    }
912
913    fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
914        state.frame.pop_front();
915        Ok(())
916    }
917
918    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
919        let idx = (state.n - 1) as usize;
920        Ok(state.frame.get(idx).cloned().unwrap_or(SqliteValue::Null))
921    }
922
923    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
924        self.value(&state)
925    }
926
927    fn num_args(&self) -> i32 {
928        2
929    }
930
931    fn name(&self) -> &str {
932        "nth_value"
933    }
934}
935
936// ═══════════════════════════════════════════════════════════════════════════
937// Aggregate-as-window functions: SUM, AVG, COUNT, MIN, MAX, TOTAL
938// SQLite allows any aggregate function to be used as a window function.
939// ═══════════════════════════════════════════════════════════════════════════
940
941pub struct WindowSumState {
942    sum: f64,
943    err: f64,
944    has_value: bool,
945    is_int: bool,
946    int_sum: i64,
947    overflowed: bool,
948}
949
950/// Kahan-Babuska-Neumaier compensated summation step (matches C SQLite func.c:1871).
951#[inline]
952fn kbn_step(sum: &mut f64, err: &mut f64, value: f64) {
953    let s = *sum;
954    let t = s + value;
955    if s.abs() > value.abs() {
956        *err += (s - t) + value;
957    } else {
958        *err += (value - t) + s;
959    }
960    *sum = t;
961}
962
963pub struct WindowSumFunc;
964
965impl WindowFunction for WindowSumFunc {
966    type State = WindowSumState;
967
968    fn initial_state(&self) -> Self::State {
969        WindowSumState {
970            sum: 0.0,
971            err: 0.0,
972            has_value: false,
973            is_int: true,
974            int_sum: 0,
975            overflowed: false,
976        }
977    }
978
979    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
980        if args.is_empty() || args[0].is_null() {
981            return Ok(());
982        }
983        let value = args[0].to_sum_numeric_value();
984        if value.is_null() {
985            return Ok(());
986        }
987        state.has_value = true;
988        match value {
989            SqliteValue::Integer(i) => {
990                if state.is_int && !state.overflowed {
991                    match state.int_sum.checked_add(i) {
992                        Some(s) => state.int_sum = s,
993                        None => state.overflowed = true,
994                    }
995                }
996                kbn_step(&mut state.sum, &mut state.err, i as f64);
997            }
998            SqliteValue::Float(f) => {
999                state.is_int = false;
1000                kbn_step(&mut state.sum, &mut state.err, f);
1001            }
1002            SqliteValue::Null | SqliteValue::Text(_) | SqliteValue::Blob(_) => {}
1003        }
1004        Ok(())
1005    }
1006
1007    fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1008        if args.is_empty() || args[0].is_null() {
1009            return Ok(());
1010        }
1011        let value = args[0].to_sum_numeric_value();
1012        match value {
1013            SqliteValue::Integer(i) => {
1014                if state.is_int && !state.overflowed {
1015                    match state.int_sum.checked_sub(i) {
1016                        Some(s) => state.int_sum = s,
1017                        None => state.overflowed = true,
1018                    }
1019                }
1020                kbn_step(&mut state.sum, &mut state.err, -(i as f64));
1021            }
1022            SqliteValue::Float(f) => {
1023                state.is_int = false;
1024                kbn_step(&mut state.sum, &mut state.err, -f);
1025            }
1026            SqliteValue::Null | SqliteValue::Text(_) | SqliteValue::Blob(_) => {}
1027        }
1028        Ok(())
1029    }
1030
1031    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1032        if !state.has_value {
1033            return Ok(SqliteValue::Null);
1034        }
1035        if state.is_int && state.overflowed {
1036            return Err(FrankenError::IntegerOverflow);
1037        }
1038        if state.is_int {
1039            Ok(SqliteValue::Integer(state.int_sum))
1040        } else {
1041            Ok(SqliteValue::Float(state.sum + state.err))
1042        }
1043    }
1044
1045    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1046        self.value(&state)
1047    }
1048
1049    fn num_args(&self) -> i32 {
1050        1
1051    }
1052
1053    fn name(&self) -> &str {
1054        "SUM"
1055    }
1056}
1057
1058pub struct WindowTotalFunc;
1059
1060impl WindowFunction for WindowTotalFunc {
1061    type State = f64;
1062
1063    fn initial_state(&self) -> Self::State {
1064        0.0
1065    }
1066
1067    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1068        if !args.is_empty() && !args[0].is_null() {
1069            *state += args[0].to_float();
1070        }
1071        Ok(())
1072    }
1073
1074    fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1075        if !args.is_empty() && !args[0].is_null() {
1076            *state -= args[0].to_float();
1077        }
1078        Ok(())
1079    }
1080
1081    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1082        Ok(SqliteValue::Float(*state))
1083    }
1084
1085    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1086        Ok(SqliteValue::Float(state))
1087    }
1088
1089    fn num_args(&self) -> i32 {
1090        1
1091    }
1092
1093    fn name(&self) -> &str {
1094        "TOTAL"
1095    }
1096}
1097
1098pub struct WindowCountState {
1099    count: i64,
1100}
1101
1102pub struct WindowCountFunc;
1103
1104impl WindowFunction for WindowCountFunc {
1105    type State = WindowCountState;
1106
1107    fn initial_state(&self) -> Self::State {
1108        WindowCountState { count: 0 }
1109    }
1110
1111    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1112        // COUNT(*) has 0 args → count all rows; COUNT(x) skips NULLs.
1113        if args.is_empty() || !args[0].is_null() {
1114            state.count += 1;
1115        }
1116        Ok(())
1117    }
1118
1119    fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1120        if args.is_empty() || !args[0].is_null() {
1121            state.count -= 1;
1122        }
1123        Ok(())
1124    }
1125
1126    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1127        Ok(SqliteValue::Integer(state.count))
1128    }
1129
1130    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1131        Ok(SqliteValue::Integer(state.count))
1132    }
1133
1134    fn num_args(&self) -> i32 {
1135        -1 // variadic: COUNT(*) = 0 args, COUNT(x) = 1 arg
1136    }
1137
1138    fn min_args(&self) -> i32 {
1139        0
1140    }
1141
1142    fn max_args(&self) -> Option<i32> {
1143        Some(1)
1144    }
1145
1146    fn name(&self) -> &str {
1147        "COUNT"
1148    }
1149}
1150
1151pub struct WindowMinState {
1152    min: Option<SqliteValue>,
1153}
1154
1155pub struct WindowMinFunc;
1156
1157impl WindowFunction for WindowMinFunc {
1158    type State = WindowMinState;
1159
1160    fn initial_state(&self) -> Self::State {
1161        WindowMinState { min: None }
1162    }
1163
1164    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1165        if args.is_empty() || args[0].is_null() {
1166            return Ok(());
1167        }
1168        state.min = Some(match state.min.take() {
1169            None => args[0].clone(),
1170            Some(cur) => {
1171                if cmp_values(&args[0], &cur) == std::cmp::Ordering::Less {
1172                    args[0].clone()
1173                } else {
1174                    cur
1175                }
1176            }
1177        });
1178        Ok(())
1179    }
1180
1181    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1182        // MIN inverse is not efficiently invertible; no-op for unbounded frames.
1183        Ok(())
1184    }
1185
1186    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1187        Ok(state.min.clone().unwrap_or(SqliteValue::Null))
1188    }
1189
1190    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1191        Ok(state.min.unwrap_or(SqliteValue::Null))
1192    }
1193
1194    fn num_args(&self) -> i32 {
1195        1
1196    }
1197
1198    fn name(&self) -> &str {
1199        "MIN"
1200    }
1201}
1202
1203pub struct WindowMaxState {
1204    max: Option<SqliteValue>,
1205}
1206
1207pub struct WindowMaxFunc;
1208
1209impl WindowFunction for WindowMaxFunc {
1210    type State = WindowMaxState;
1211
1212    fn initial_state(&self) -> Self::State {
1213        WindowMaxState { max: None }
1214    }
1215
1216    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1217        if args.is_empty() || args[0].is_null() {
1218            return Ok(());
1219        }
1220        state.max = Some(match state.max.take() {
1221            None => args[0].clone(),
1222            Some(cur) => {
1223                if cmp_values(&args[0], &cur) == std::cmp::Ordering::Greater {
1224                    args[0].clone()
1225                } else {
1226                    cur
1227                }
1228            }
1229        });
1230        Ok(())
1231    }
1232
1233    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1234        Ok(())
1235    }
1236
1237    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1238        Ok(state.max.clone().unwrap_or(SqliteValue::Null))
1239    }
1240
1241    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1242        Ok(state.max.unwrap_or(SqliteValue::Null))
1243    }
1244
1245    fn num_args(&self) -> i32 {
1246        1
1247    }
1248
1249    fn name(&self) -> &str {
1250        "MAX"
1251    }
1252}
1253
1254pub struct WindowAvgState {
1255    sum: f64,
1256    err: f64,
1257    count: i64,
1258}
1259
1260pub struct WindowAvgFunc;
1261
1262impl WindowFunction for WindowAvgFunc {
1263    type State = WindowAvgState;
1264
1265    fn initial_state(&self) -> Self::State {
1266        WindowAvgState {
1267            sum: 0.0,
1268            err: 0.0,
1269            count: 0,
1270        }
1271    }
1272
1273    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1274        if args.is_empty() || args[0].is_null() {
1275            return Ok(());
1276        }
1277        kbn_step(&mut state.sum, &mut state.err, args[0].to_float());
1278        state.count += 1;
1279        Ok(())
1280    }
1281
1282    fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1283        if args.is_empty() || args[0].is_null() {
1284            return Ok(());
1285        }
1286        kbn_step(&mut state.sum, &mut state.err, -args[0].to_float());
1287        state.count -= 1;
1288        Ok(())
1289    }
1290
1291    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1292        if state.count == 0 {
1293            Ok(SqliteValue::Null)
1294        } else {
1295            #[allow(clippy::cast_precision_loss)]
1296            Ok(SqliteValue::Float(
1297                (state.sum + state.err) / state.count as f64,
1298            ))
1299        }
1300    }
1301
1302    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1303        self.value(&state)
1304    }
1305
1306    fn num_args(&self) -> i32 {
1307        1
1308    }
1309
1310    fn name(&self) -> &str {
1311        "AVG"
1312    }
1313}
1314
/// Accumulator shared by the `group_concat` and `string_agg` window functions.
pub struct WindowGroupConcatState {
    /// Concatenation built so far, separators included.
    result: String,
    /// Whether at least one non-NULL value has been appended to `result`.
    has_value: bool,
}
1319
1320fn window_group_concat_step(state: &mut WindowGroupConcatState, args: &[SqliteValue]) {
1321    if args.is_empty() || args[0].is_null() {
1322        return;
1323    }
1324    if state.has_value {
1325        match args.get(1) {
1326            Some(separator) if !separator.is_null() => {
1327                if let Some(text) = separator.as_text_str() {
1328                    state.result.push_str(text);
1329                } else {
1330                    state.result.push_str(&separator.to_text());
1331                }
1332            }
1333            Some(_) => {}
1334            None => state.result.push(','),
1335        }
1336    }
1337    if let Some(text) = args[0].as_text_str() {
1338        state.result.push_str(text);
1339    } else {
1340        state.result.push_str(&args[0].to_text());
1341    }
1342    state.has_value = true;
1343}
1344
1345fn window_group_concat_value(state: &WindowGroupConcatState) -> SqliteValue {
1346    if state.has_value {
1347        SqliteValue::Text(state.result.clone().into())
1348    } else {
1349        SqliteValue::Null
1350    }
1351}
1352
1353pub struct WindowGroupConcatFunc;
1354
1355impl WindowFunction for WindowGroupConcatFunc {
1356    type State = WindowGroupConcatState;
1357
1358    fn initial_state(&self) -> Self::State {
1359        WindowGroupConcatState {
1360            result: String::new(),
1361            has_value: false,
1362        }
1363    }
1364
1365    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1366        window_group_concat_step(state, args);
1367        Ok(())
1368    }
1369
1370    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1371        // Sliding ROWS/RANGE/GROUPS frames are recomputed by the connection-level
1372        // window executor; the remaining paths never evict rows from the left edge.
1373        Ok(())
1374    }
1375
1376    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1377        Ok(window_group_concat_value(state))
1378    }
1379
1380    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1381        Ok(window_group_concat_value(&state))
1382    }
1383
1384    fn num_args(&self) -> i32 {
1385        -1
1386    }
1387
1388    fn min_args(&self) -> i32 {
1389        1
1390    }
1391
1392    fn max_args(&self) -> Option<i32> {
1393        Some(2)
1394    }
1395
1396    fn name(&self) -> &str {
1397        "group_concat"
1398    }
1399}
1400
1401pub struct WindowStringAggFunc;
1402
1403impl WindowFunction for WindowStringAggFunc {
1404    type State = WindowGroupConcatState;
1405
1406    fn initial_state(&self) -> Self::State {
1407        WindowGroupConcatState {
1408            result: String::new(),
1409            has_value: false,
1410        }
1411    }
1412
1413    fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1414        window_group_concat_step(state, args);
1415        Ok(())
1416    }
1417
1418    fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1419        Ok(())
1420    }
1421
1422    fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1423        Ok(window_group_concat_value(state))
1424    }
1425
1426    fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1427        Ok(window_group_concat_value(&state))
1428    }
1429
1430    fn num_args(&self) -> i32 {
1431        2
1432    }
1433
1434    fn name(&self) -> &str {
1435        "string_agg"
1436    }
1437}
1438
1439/// Compare two SQLite values using type-aware ordering.
1440pub fn cmp_values(a: &SqliteValue, b: &SqliteValue) -> std::cmp::Ordering {
1441    a.cmp(b)
1442}
1443
1444// ── Registration ──────────────────────────────────────────────────────────
1445
1446/// Register all S13.5 window functions.
1447pub fn register_window_builtins(registry: &mut FunctionRegistry) {
1448    registry.register_window(RowNumberFunc);
1449    registry.register_window(RankFunc);
1450    registry.register_window(DenseRankFunc);
1451    registry.register_window(PercentRankFunc);
1452    registry.register_window(CumeDistFunc);
1453    registry.register_window(NtileFunc);
1454    registry.register_window(LagFunc);
1455    registry.register_window(LeadFunc);
1456    registry.register_window(FirstValueFunc);
1457    registry.register_window(LastValueFunc);
1458    registry.register_window(NthValueFunc);
1459
1460    // Aggregate-as-window functions (SQLite allows any aggregate as a window fn).
1461    registry.register_window(WindowSumFunc);
1462    registry.register_window(WindowTotalFunc);
1463    registry.register_window(WindowCountFunc);
1464    registry.register_window(WindowMinFunc);
1465    registry.register_window(WindowMaxFunc);
1466    registry.register_window(WindowAvgFunc);
1467    registry.register_window(WindowGroupConcatFunc);
1468    registry.register_window(WindowStringAggFunc);
1469}
1470
1471// ── Tests ─────────────────────────────────────────────────────────────────
1472
1473#[cfg(test)]
1474mod tests {
1475    use super::*;
1476
1477    fn int(v: i64) -> SqliteValue {
1478        SqliteValue::Integer(v)
1479    }
1480
1481    fn float(v: f64) -> SqliteValue {
1482        SqliteValue::Float(v)
1483    }
1484
1485    fn text(s: &str) -> SqliteValue {
1486        SqliteValue::Text(s.into())
1487    }
1488
1489    fn blob(bytes: &[u8]) -> SqliteValue {
1490        SqliteValue::Blob(std::sync::Arc::from(bytes))
1491    }
1492
1493    fn null() -> SqliteValue {
1494        SqliteValue::Null
1495    }
1496
1497    fn assert_function_error(err: FrankenError, expected: &str) {
1498        assert!(
1499            matches!(&err, FrankenError::FunctionError(message) if message == expected),
1500            "expected function error {expected:?}, got {err:?}"
1501        );
1502    }
1503
1504    fn assert_float_near(value: &SqliteValue, expected: f64) {
1505        assert!(
1506            matches!(value, SqliteValue::Float(_)),
1507            "expected Float, got {value:?}"
1508        );
1509        if let SqliteValue::Float(actual) = value {
1510            assert!(
1511                (*actual - expected).abs() < 1e-10,
1512                "expected {expected}, got {actual}"
1513            );
1514        }
1515    }
1516
1517    /// Simulate a partition by calling step() for each row, collecting
1518    /// value() after each step.  Returns the vector of per-row results.
1519    /// Suitable for progressive functions (row_number, rank, dense_rank).
1520    fn run_window_partition<F: WindowFunction>(
1521        func: &F,
1522        rows: &[Vec<SqliteValue>],
1523    ) -> Vec<SqliteValue> {
1524        let mut state = func.initial_state();
1525        let mut results = Vec::new();
1526        for row in rows {
1527            func.step(&mut state, row).unwrap();
1528            results.push(func.value(&state).unwrap());
1529        }
1530        results
1531    }
1532
1533    /// Two-pass partition simulation: step all rows first (pass 1), then
1534    /// iterate calling value()+inverse() for each row (pass 2).
1535    /// Required for functions that need full partition size (ntile,
1536    /// percent_rank, cume_dist).
1537    fn run_window_two_pass<F: WindowFunction>(
1538        func: &F,
1539        rows: &[Vec<SqliteValue>],
1540    ) -> Vec<SqliteValue> {
1541        let mut state = func.initial_state();
1542        // Pass 1: step all rows.
1543        for row in rows {
1544            func.step(&mut state, row).unwrap();
1545        }
1546        // Pass 2: read values, advance cursor via inverse().
1547        let mut results = Vec::new();
1548        for (i, _) in rows.iter().enumerate() {
1549            results.push(func.value(&state).unwrap());
1550            if i < rows.len() - 1 {
1551                func.inverse(&mut state, &[]).unwrap();
1552            }
1553        }
1554        results
1555    }
1556
1557    // ── row_number ───────────────────────────────────────────────────
1558
1559    #[test]
1560    fn test_row_number_basic() {
1561        let results =
1562            run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![], vec![], vec![]]);
1563        assert_eq!(results, vec![int(1), int(2), int(3), int(4), int(5)]);
1564    }
1565
1566    #[test]
1567    fn test_row_number_partition_reset() {
1568        // Partition 1: 3 rows.
1569        let r1 = run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![]]);
1570        assert_eq!(r1, vec![int(1), int(2), int(3)]);
1571
1572        // Partition 2: 2 rows (fresh state).
1573        let r2 = run_window_partition(&RowNumberFunc, &[vec![], vec![]]);
1574        assert_eq!(r2, vec![int(1), int(2)]);
1575    }
1576
1577    // ── rank ─────────────────────────────────────────────────────────
1578
1579    #[test]
1580    fn test_rank_with_ties() {
1581        // Values: [1, 2, 2, 3] -> ranks: [1, 2, 2, 4]
1582        let results = run_window_partition(
1583            &RankFunc,
1584            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1585        );
1586        assert_eq!(results, vec![int(1), int(2), int(2), int(4)]);
1587    }
1588
1589    #[test]
1590    fn test_rank_no_ties() {
1591        let results =
1592            run_window_partition(&RankFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
1593        assert_eq!(results, vec![int(1), int(2), int(3)]);
1594    }
1595
1596    // ── dense_rank ───────────────────────────────────────────────────
1597
1598    #[test]
1599    fn test_dense_rank_with_ties() {
1600        // Values: [1, 2, 2, 3] -> dense_ranks: [1, 2, 2, 3]
1601        let results = run_window_partition(
1602            &DenseRankFunc,
1603            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1604        );
1605        assert_eq!(results, vec![int(1), int(2), int(2), int(3)]);
1606    }
1607
1608    #[test]
1609    fn test_dense_rank_multiple_ties() {
1610        // Values: [1, 1, 2, 2, 3] -> dense_ranks: [1, 1, 2, 2, 3]
1611        let results = run_window_partition(
1612            &DenseRankFunc,
1613            &[
1614                vec![int(1)],
1615                vec![int(1)],
1616                vec![int(2)],
1617                vec![int(2)],
1618                vec![int(3)],
1619            ],
1620        );
1621        assert_eq!(results, vec![int(1), int(1), int(2), int(2), int(3)]);
1622    }
1623
1624    // ── percent_rank ─────────────────────────────────────────────────
1625
1626    #[test]
1627    fn test_percent_rank_single_row() {
1628        let results = run_window_two_pass(&PercentRankFunc, &[vec![int(1)]]);
1629        assert_eq!(results, vec![SqliteValue::Float(0.0)]);
1630    }
1631
1632    #[test]
1633    fn test_percent_rank_formula() {
1634        // 4 rows, values [1, 2, 2, 3] -> ranks [1, 2, 2, 4]
1635        // percent_rank = (rank - 1) / (N - 1) = (rank - 1) / 3
1636        let results = run_window_two_pass(
1637            &PercentRankFunc,
1638            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1639        );
1640        // Row 1: (1-1)/3 = 0.0
1641        // Row 2: (2-1)/3 = 0.333...
1642        // Row 3: (2-1)/3 = 0.333... (same rank as row 2)
1643        // Row 4: (4-1)/3 = 1.0
1644        assert_float_near(&results[0], 0.0);
1645        assert_float_near(&results[1], 1.0 / 3.0);
1646        assert_float_near(&results[2], 1.0 / 3.0);
1647        assert_float_near(&results[3], 1.0);
1648    }
1649
1650    // ── cume_dist ────────────────────────────────────────────────────
1651
1652    #[test]
1653    fn test_cume_dist_distinct() {
1654        // 4 distinct values: cume_dist = [0.25, 0.5, 0.75, 1.0]
1655        let results = run_window_two_pass(
1656            &CumeDistFunc,
1657            &[vec![int(1)], vec![int(2)], vec![int(3)], vec![int(4)]],
1658        );
1659        for (i, expected) in [0.25, 0.5, 0.75, 1.0].iter().enumerate() {
1660            assert_float_near(&results[i], *expected);
1661        }
1662    }
1663
1664    #[test]
1665    fn test_cume_dist_with_ties() {
1666        // Values [1, 2, 2, 3] -> last peer positions [1, 3, 3, 4].
1667        let results = run_window_two_pass(
1668            &CumeDistFunc,
1669            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1670        );
1671        for (i, expected) in [0.25, 0.75, 0.75, 1.0].iter().enumerate() {
1672            assert_float_near(&results[i], *expected);
1673        }
1674    }
1675
1676    #[test]
1677    fn test_cume_dist_without_order_treats_partition_as_one_peer_group() {
1678        let results = run_window_two_pass(&CumeDistFunc, &[vec![], vec![], vec![]]);
1679        for value in results {
1680            assert_float_near(&value, 1.0);
1681        }
1682    }
1683
1684    // ── ntile ────────────────────────────────────────────────────────
1685
1686    #[test]
1687    fn test_ntile_even() {
1688        // ntile(4) over 8 rows: groups of 2 each -> [1,1,2,2,3,3,4,4]
1689        let rows: Vec<Vec<SqliteValue>> = (0..8).map(|_| vec![int(4)]).collect();
1690        let results = run_window_two_pass(&NtileFunc, &rows);
1691        assert_eq!(
1692            results,
1693            vec![
1694                int(1),
1695                int(1),
1696                int(2),
1697                int(2),
1698                int(3),
1699                int(3),
1700                int(4),
1701                int(4)
1702            ]
1703        );
1704    }
1705
1706    #[test]
1707    fn test_ntile_uneven() {
1708        // ntile(3) over 10 rows: groups of 4,3,3
1709        let rows: Vec<Vec<SqliteValue>> = (0..10).map(|_| vec![int(3)]).collect();
1710        let results = run_window_two_pass(&NtileFunc, &rows);
1711        assert_eq!(
1712            results,
1713            vec![
1714                int(1),
1715                int(1),
1716                int(1),
1717                int(1),
1718                int(2),
1719                int(2),
1720                int(2),
1721                int(3),
1722                int(3),
1723                int(3)
1724            ]
1725        );
1726    }
1727
1728    #[test]
1729    fn test_ntile_more_buckets_than_rows() {
1730        // ntile(10) over 3 rows: [1, 2, 3]
1731        let rows: Vec<Vec<SqliteValue>> = (0..3).map(|_| vec![int(10)]).collect();
1732        let results = run_window_two_pass(&NtileFunc, &rows);
1733        assert_eq!(results, vec![int(1), int(2), int(3)]);
1734    }
1735
1736    #[test]
1737    fn test_ntile_rejects_non_positive_argument() {
1738        for n in [0, -1] {
1739            let mut state = NtileFunc.initial_state();
1740            let err = NtileFunc.step(&mut state, &[int(n)]).unwrap_err();
1741            assert_function_error(err, INVALID_NTILE_ARGUMENT);
1742        }
1743    }
1744
1745    // ── lag ──────────────────────────────────────────────────────────
1746
1747    #[test]
1748    fn test_lag_default() {
1749        // lag(X) with default offset=1: previous row's value, NULL for first.
1750        let results = run_window_two_pass(&LagFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
1751        assert_eq!(results, vec![null(), int(10), int(20)]);
1752    }
1753
1754    #[test]
1755    fn test_lag_offset_3() {
1756        // lag(X, 3): 3 rows back.
1757        let results = run_window_two_pass(
1758            &LagFunc,
1759            &[
1760                vec![int(10), int(3)],
1761                vec![int(20), int(3)],
1762                vec![int(30), int(3)],
1763                vec![int(40), int(3)],
1764                vec![int(50), int(3)],
1765            ],
1766        );
1767        assert_eq!(results, vec![null(), null(), null(), int(10), int(20)]);
1768    }
1769
1770    #[test]
1771    fn test_lag_default_value() {
1772        // lag(X, 1, -1): returns -1 when no previous row.
1773        let results = run_window_two_pass(
1774            &LagFunc,
1775            &[
1776                vec![int(10), int(1), int(-1)],
1777                vec![int(20), int(1), int(-1)],
1778            ],
1779        );
1780        assert_eq!(results, vec![int(-1), int(10)]);
1781    }
1782
1783    #[test]
1784    fn test_lag_null_offset_returns_default_for_each_row() {
1785        let results = run_window_two_pass(
1786            &LagFunc,
1787            &[
1788                vec![int(10), null(), text("N/A")],
1789                vec![int(20), null(), text("N/A")],
1790            ],
1791        );
1792        assert_eq!(results, vec![text("N/A"), text("N/A")]);
1793    }
1794
1795    #[test]
1796    fn test_lag_uses_current_row_offset_and_default() {
1797        let results = run_window_two_pass(
1798            &LagFunc,
1799            &[
1800                vec![int(10), int(1), text("first")],
1801                vec![int(20), null(), text("null-offset")],
1802                vec![int(30), int(1), text("third")],
1803                vec![int(40), int(2), text("fourth")],
1804            ],
1805        );
1806        assert_eq!(
1807            results,
1808            vec![text("first"), text("null-offset"), int(20), int(20)]
1809        );
1810    }
1811
1812    #[test]
1813    fn test_lag_negative_offset_reads_following_row() {
1814        let results = run_window_two_pass(
1815            &LagFunc,
1816            &[
1817                vec![int(10), int(-1), text("N/A")],
1818                vec![int(20), int(-1), text("N/A")],
1819                vec![int(30), int(-1), text("N/A")],
1820            ],
1821        );
1822        assert_eq!(results, vec![int(20), int(30), text("N/A")]);
1823    }
1824
1825    #[test]
1826    fn test_lag_fractional_offset_uses_default() {
1827        let results = run_window_two_pass(
1828            &LagFunc,
1829            &[
1830                vec![int(10), float(1.5), text("N/A")],
1831                vec![int(20), float(1.5), text("N/A")],
1832                vec![int(30), float(1.5), text("N/A")],
1833            ],
1834        );
1835        assert_eq!(results, vec![text("N/A"), text("N/A"), text("N/A")]);
1836    }
1837
1838    #[test]
1839    fn test_lag_nonnumeric_text_offset_reads_current_row() {
1840        let results = run_window_two_pass(
1841            &LagFunc,
1842            &[
1843                vec![int(10), text("abc"), text("N/A")],
1844                vec![int(20), text("abc"), text("N/A")],
1845                vec![int(30), text("abc"), text("N/A")],
1846            ],
1847        );
1848        assert_eq!(results, vec![int(10), int(20), int(30)]);
1849    }
1850
1851    #[test]
1852    fn test_lag_integral_text_prefix_offset() {
1853        let results = run_window_two_pass(
1854            &LagFunc,
1855            &[
1856                vec![int(10), text("2.0x"), text("N/A")],
1857                vec![int(20), text("2e0x"), text("N/A")],
1858                vec![int(30), text("2x"), text("N/A")],
1859            ],
1860        );
1861        assert_eq!(results, vec![text("N/A"), text("N/A"), int(10)]);
1862    }
1863
1864    // ── lead ─────────────────────────────────────────────────────────
1865
1866    #[test]
1867    fn test_lead_default() {
1868        // lead(X): next row's value, NULL for last.
1869        // For lead, we need to step all rows first (to build the buffer),
1870        // then call inverse + value for each row.
1871        let func = LeadFunc;
1872        let mut state = func.initial_state();
1873        let rows = [int(10), int(20), int(30)];
1874
1875        // Step all rows to build the buffer.
1876        for row in &rows {
1877            func.step(&mut state, std::slice::from_ref(row)).unwrap();
1878        }
1879
1880        // Now iterate: first row is at current_row=0.
1881        let mut results = Vec::new();
1882        for _ in &rows {
1883            results.push(func.value(&state).unwrap());
1884            func.inverse(&mut state, &[]).unwrap();
1885        }
1886        assert_eq!(results, vec![int(20), int(30), null()]);
1887    }
1888
1889    #[test]
1890    fn test_lead_offset_2() {
1891        let func = LeadFunc;
1892        let mut state = func.initial_state();
1893        let rows = [int(10), int(20), int(30), int(40), int(50)];
1894
1895        for row in &rows {
1896            func.step(&mut state, &[row.clone(), int(2)]).unwrap();
1897        }
1898
1899        let mut results = Vec::new();
1900        for _ in &rows {
1901            results.push(func.value(&state).unwrap());
1902            func.inverse(&mut state, &[]).unwrap();
1903        }
1904        assert_eq!(results, vec![int(30), int(40), int(50), null(), null()]);
1905    }
1906
1907    #[test]
1908    fn test_lead_default_value() {
1909        let func = LeadFunc;
1910        let mut state = func.initial_state();
1911        let rows = [int(10), int(20)];
1912
1913        for row in &rows {
1914            func.step(&mut state, &[row.clone(), int(1), text("N/A")])
1915                .unwrap();
1916        }
1917
1918        let mut results = Vec::new();
1919        for _ in &rows {
1920            results.push(func.value(&state).unwrap());
1921            func.inverse(&mut state, &[]).unwrap();
1922        }
1923        assert_eq!(results, vec![int(20), text("N/A")]);
1924    }
1925
1926    #[test]
1927    fn test_lead_null_offset_returns_default_for_each_row() {
1928        let func = LeadFunc;
1929        let mut state = func.initial_state();
1930        let rows = [int(10), int(20)];
1931
1932        for row in &rows {
1933            func.step(&mut state, &[row.clone(), null(), text("N/A")])
1934                .unwrap();
1935        }
1936
1937        let mut results = Vec::new();
1938        for _ in &rows {
1939            results.push(func.value(&state).unwrap());
1940            func.inverse(&mut state, &[]).unwrap();
1941        }
1942        assert_eq!(results, vec![text("N/A"), text("N/A")]);
1943    }
1944
1945    #[test]
1946    fn test_lead_uses_current_row_offset_and_default() {
1947        let func = LeadFunc;
1948        let mut state = func.initial_state();
1949        let rows = [
1950            vec![int(10), int(1), text("first")],
1951            vec![int(20), null(), text("null-offset")],
1952            vec![int(30), int(1), text("third")],
1953        ];
1954
1955        for row in &rows {
1956            func.step(&mut state, row).unwrap();
1957        }
1958
1959        let mut results = Vec::new();
1960        for _ in &rows {
1961            results.push(func.value(&state).unwrap());
1962            func.inverse(&mut state, &[]).unwrap();
1963        }
1964        assert_eq!(results, vec![int(20), text("null-offset"), text("third")]);
1965    }
1966
1967    #[test]
1968    fn test_lead_negative_offset_reads_previous_row() {
1969        let func = LeadFunc;
1970        let mut state = func.initial_state();
1971        let rows = [int(10), int(20), int(30)];
1972
1973        for row in &rows {
1974            func.step(&mut state, &[row.clone(), int(-1), text("N/A")])
1975                .unwrap();
1976        }
1977
1978        let mut results = Vec::new();
1979        for _ in &rows {
1980            results.push(func.value(&state).unwrap());
1981            func.inverse(&mut state, &[]).unwrap();
1982        }
1983        assert_eq!(results, vec![text("N/A"), int(10), int(20)]);
1984    }
1985
1986    #[test]
1987    fn test_lead_fractional_offset_uses_default() {
1988        let results = run_window_two_pass(
1989            &LeadFunc,
1990            &[
1991                vec![int(10), float(1.5), text("N/A")],
1992                vec![int(20), float(1.5), text("N/A")],
1993                vec![int(30), float(1.5), text("N/A")],
1994            ],
1995        );
1996        assert_eq!(results, vec![text("N/A"), text("N/A"), text("N/A")]);
1997    }
1998
1999    #[test]
2000    fn test_lead_integral_blob_offset() {
2001        let results = run_window_two_pass(
2002            &LeadFunc,
2003            &[
2004                vec![int(10), blob(b"2.0x"), text("N/A")],
2005                vec![int(20), blob(b"2e0x"), text("N/A")],
2006                vec![int(30), blob(b"2x"), text("N/A")],
2007            ],
2008        );
2009        assert_eq!(results, vec![int(30), text("N/A"), text("N/A")]);
2010    }
2011
2012    // ── first_value ──────────────────────────────────────────────────
2013
2014    #[test]
2015    fn test_first_value_basic() {
2016        let results = run_window_partition(
2017            &FirstValueFunc,
2018            &[vec![int(10)], vec![int(20)], vec![int(30)]],
2019        );
2020        // With default frame (UNBOUNDED PRECEDING to CURRENT ROW),
2021        // first_value is always the first row's value.
2022        assert_eq!(results, vec![int(10), int(10), int(10)]);
2023    }
2024
2025    // ── last_value ───────────────────────────────────────────────────
2026
2027    #[test]
2028    fn test_last_value_default_frame() {
2029        // With default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
2030        // last_value returns the current row's value.
2031        let results = run_window_partition(
2032            &LastValueFunc,
2033            &[vec![int(10)], vec![int(20)], vec![int(30)]],
2034        );
2035        assert_eq!(results, vec![int(10), int(20), int(30)]);
2036    }
2037
2038    #[test]
2039    fn test_last_value_unbounded_following() {
2040        // With UNBOUNDED FOLLOWING frame, step all rows first,
2041        // then value() returns the true last value.
2042        let func = LastValueFunc;
2043        let mut state = func.initial_state();
2044        func.step(&mut state, &[int(10)]).unwrap();
2045        func.step(&mut state, &[int(20)]).unwrap();
2046        func.step(&mut state, &[int(30)]).unwrap();
2047        assert_eq!(func.value(&state).unwrap(), int(30));
2048    }
2049
2050    // ── nth_value ────────────────────────────────────────────────────
2051
2052    #[test]
2053    fn test_nth_value_basic() {
2054        let func = NthValueFunc;
2055        let mut state = func.initial_state();
2056        // Step 5 rows; N=3.
2057        func.step(&mut state, &[int(10), int(3)]).unwrap();
2058        func.step(&mut state, &[int(20), int(3)]).unwrap();
2059        func.step(&mut state, &[int(30), int(3)]).unwrap();
2060        func.step(&mut state, &[int(40), int(3)]).unwrap();
2061        func.step(&mut state, &[int(50), int(3)]).unwrap();
2062        assert_eq!(func.value(&state).unwrap(), int(30));
2063    }
2064
2065    #[test]
2066    fn test_nth_value_out_of_range() {
2067        let func = NthValueFunc;
2068        let mut state = func.initial_state();
2069        func.step(&mut state, &[int(10), int(100)]).unwrap();
2070        func.step(&mut state, &[int(20), int(100)]).unwrap();
2071        // Frame has 2 rows but N=100.
2072        assert_eq!(func.value(&state).unwrap(), null());
2073    }
2074
2075    #[test]
2076    fn test_nth_value_n_zero() {
2077        let func = NthValueFunc;
2078        let mut state = func.initial_state();
2079        let err = func.step(&mut state, &[int(10), int(0)]).unwrap_err();
2080        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2081    }
2082
2083    #[test]
2084    fn test_nth_value_rejects_negative_n() {
2085        let func = NthValueFunc;
2086        let mut state = func.initial_state();
2087        let err = func.step(&mut state, &[int(10), int(-1)]).unwrap_err();
2088        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2089    }
2090
2091    #[test]
2092    fn test_nth_value_accepts_integral_real_n() {
2093        let func = NthValueFunc;
2094        let mut state = func.initial_state();
2095        func.step(&mut state, &[int(10), float(2.0)]).unwrap();
2096        func.step(&mut state, &[int(20), float(2.0)]).unwrap();
2097        assert_eq!(func.value(&state).unwrap(), int(20));
2098    }
2099
2100    #[test]
2101    fn test_nth_value_accepts_integral_text_n() {
2102        let func = NthValueFunc;
2103        let mut state = func.initial_state();
2104        func.step(&mut state, &[int(10), text("2e0")]).unwrap();
2105        func.step(&mut state, &[int(20), text("2.0")]).unwrap();
2106        assert_eq!(func.value(&state).unwrap(), int(20));
2107    }
2108
2109    #[test]
2110    fn test_nth_value_rejects_fractional_real_n() {
2111        let func = NthValueFunc;
2112        let mut state = func.initial_state();
2113        let err = func.step(&mut state, &[int(10), float(1.5)]).unwrap_err();
2114        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2115    }
2116
2117    #[test]
2118    fn test_nth_value_rejects_fractional_text_n() {
2119        let func = NthValueFunc;
2120        let mut state = func.initial_state();
2121        let err = func.step(&mut state, &[int(10), text("1.5")]).unwrap_err();
2122        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2123    }
2124
2125    #[test]
2126    fn test_nth_value_rejects_text_numeric_prefix_n() {
2127        let func = NthValueFunc;
2128        let mut state = func.initial_state();
2129        let err = func.step(&mut state, &[int(10), text("2x")]).unwrap_err();
2130        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2131    }
2132
2133    #[test]
2134    fn test_nth_value_rejects_blob_integer_n() {
2135        let func = NthValueFunc;
2136        let mut state = func.initial_state();
2137        let err = func.step(&mut state, &[int(10), blob(b"2")]).unwrap_err();
2138        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2139    }
2140
2141    #[test]
2142    fn test_nth_value_rejects_non_positive_n_after_first_row() {
2143        let func = NthValueFunc;
2144        let mut state = func.initial_state();
2145        func.step(&mut state, &[int(10), int(1)]).unwrap();
2146        let err = func.step(&mut state, &[int(20), int(0)]).unwrap_err();
2147        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2148    }
2149
2150    // ── sum / total / avg ────────────────────────────────────────────
2151
2152    #[test]
2153    fn test_window_sum_text_integer_literals_stay_integer() {
2154        let results = run_window_partition(&WindowSumFunc, &[vec![text("1")], vec![text("2")]]);
2155        assert_eq!(results, vec![int(1), int(3)]);
2156    }
2157
2158    #[test]
2159    fn test_window_sum_non_numeric_text_returns_real_zero() {
2160        let results = run_window_partition(&WindowSumFunc, &[vec![text("abc")]]);
2161        assert_eq!(results, vec![float(0.0)]);
2162    }
2163
2164    #[test]
2165    fn test_window_sum_overflow_suppressed_after_float_input() {
2166        let func = WindowSumFunc;
2167        let mut state = func.initial_state();
2168
2169        func.step(&mut state, &[int(i64::MAX)]).unwrap();
2170        func.step(&mut state, &[int(1)]).unwrap();
2171        assert_eq!(
2172            func.value(&state).unwrap_err().to_string(),
2173            "integer overflow"
2174        );
2175
2176        func.step(&mut state, &[float(0.5)]).unwrap();
2177        assert!(matches!(func.value(&state).unwrap(), SqliteValue::Float(_)));
2178    }
2179
2180    // ── min / max ────────────────────────────────────────────────────
2181
2182    #[test]
2183    fn test_window_min_max_use_sqlite_storage_class_order() {
2184        let text_value = text("z");
2185        let blob_value = blob(b"\0");
2186
2187        let mut min_state = WindowMinFunc.initial_state();
2188        WindowMinFunc
2189            .step(&mut min_state, std::slice::from_ref(&blob_value))
2190            .unwrap();
2191        WindowMinFunc
2192            .step(&mut min_state, std::slice::from_ref(&text_value))
2193            .unwrap();
2194        assert_eq!(WindowMinFunc.value(&min_state).unwrap(), text_value);
2195
2196        let mut max_state = WindowMaxFunc.initial_state();
2197        WindowMaxFunc
2198            .step(&mut max_state, std::slice::from_ref(&text_value))
2199            .unwrap();
2200        WindowMaxFunc
2201            .step(&mut max_state, std::slice::from_ref(&blob_value))
2202            .unwrap();
2203        assert_eq!(WindowMaxFunc.value(&max_state).unwrap(), blob_value);
2204    }
2205
2206    // ── group_concat / string_agg ────────────────────────────────────
2207
2208    #[test]
2209    fn test_window_group_concat_running_default_separator() {
2210        let results = run_window_partition(
2211            &WindowGroupConcatFunc,
2212            &[vec![text("a")], vec![text("b")], vec![text("c")]],
2213        );
2214        assert_eq!(results, vec![text("a"), text("a,b"), text("a,b,c")]);
2215    }
2216
2217    #[test]
2218    fn test_window_group_concat_running_custom_separator() {
2219        let results = run_window_partition(
2220            &WindowGroupConcatFunc,
2221            &[
2222                vec![text("a"), text(" | ")],
2223                vec![text("b"), text(" | ")],
2224                vec![text("c"), text(" | ")],
2225            ],
2226        );
2227        assert_eq!(results, vec![text("a"), text("a | b"), text("a | b | c")]);
2228    }
2229
2230    #[test]
2231    fn test_window_group_concat_skips_null_and_uses_current_row_separator() {
2232        let results = run_window_partition(
2233            &WindowGroupConcatFunc,
2234            &[
2235                vec![text("a"), text("-")],
2236                vec![null(), text("?")],
2237                vec![text("b"), text("+")],
2238                vec![text("c"), text("*")],
2239            ],
2240        );
2241        assert_eq!(
2242            results,
2243            vec![text("a"), text("a"), text("a+b"), text("a+b*c")]
2244        );
2245    }
2246
2247    #[test]
2248    fn test_window_string_agg_alias_through_registry() {
2249        let mut reg = FunctionRegistry::new();
2250        register_window_builtins(&mut reg);
2251
2252        let sa = reg.find_window("string_agg", 2).unwrap();
2253        let mut state = sa.initial_state();
2254        sa.step(&mut state, &[text("a"), text(";")]).unwrap();
2255        assert_eq!(sa.value(&state).unwrap(), text("a"));
2256        sa.step(&mut state, &[text("b"), text(";")]).unwrap();
2257        assert_eq!(sa.value(&state).unwrap(), text("a;b"));
2258    }
2259
2260    // ── Registration ─────────────────────────────────────────────────
2261
2262    #[test]
2263    fn test_register_window_builtins_all_present() {
2264        let mut reg = FunctionRegistry::new();
2265        register_window_builtins(&mut reg);
2266
2267        let expected_variadic = [
2268            "row_number",
2269            "rank",
2270            "dense_rank",
2271            "percent_rank",
2272            "cume_dist",
2273            "lag",
2274            "lead",
2275        ];
2276        for name in expected_variadic {
2277            assert!(
2278                reg.find_window(name, 0).is_some()
2279                    || reg.find_window(name, 1).is_some()
2280                    || reg.find_window(name, -1).is_some(),
2281                "window function '{name}' not registered"
2282            );
2283        }
2284
2285        assert!(
2286            reg.find_window("ntile", 1).is_some(),
2287            "ntile(1) not registered"
2288        );
2289        assert!(
2290            reg.find_window("first_value", 1).is_some(),
2291            "first_value(1) not registered"
2292        );
2293        assert!(
2294            reg.find_window("last_value", 1).is_some(),
2295            "last_value(1) not registered"
2296        );
2297        assert!(
2298            reg.find_window("nth_value", 2).is_some(),
2299            "nth_value(2) not registered"
2300        );
2301        assert!(
2302            reg.find_window("group_concat", 1).is_some(),
2303            "group_concat(1) not registered"
2304        );
2305        assert!(
2306            reg.find_window("group_concat", 2).is_some(),
2307            "group_concat(2) not registered"
2308        );
2309        assert!(
2310            reg.find_window("string_agg", 2).is_some(),
2311            "string_agg(2) not registered"
2312        );
2313
2314        for (name, arity) in [
2315            ("rank", 1),
2316            ("dense_rank", 1),
2317            ("percent_rank", 1),
2318            ("cume_dist", 1),
2319            ("lag", 0),
2320            ("lag", 4),
2321            ("lead", 0),
2322            ("lead", 4),
2323            ("count", 2),
2324            ("group_concat", 0),
2325            ("group_concat", 3),
2326        ] {
2327            let f = reg
2328                .find_window(name, arity)
2329                .expect("known window with wrong arity returns erroring window");
2330            let mut state = f.initial_state();
2331            let err = f
2332                .step(&mut state, &[])
2333                .expect_err("invalid window arity should fail");
2334            let expected = format!("wrong number of arguments to function {name}()");
2335            assert!(
2336                matches!(&err, FrankenError::FunctionError(message) if message == &expected),
2337                "unexpected error for {name}/{arity}: {err:?}"
2338            );
2339        }
2340    }
2341
2342    // ── E2E: full lifecycle through registry ─────────────────────────
2343
2344    #[test]
2345    fn test_e2e_window_row_number_through_registry() {
2346        let mut reg = FunctionRegistry::new();
2347        register_window_builtins(&mut reg);
2348
2349        let rn = reg.find_window("row_number", 0).unwrap();
2350        let mut state = rn.initial_state();
2351        rn.step(&mut state, &[]).unwrap();
2352        assert_eq!(rn.value(&state).unwrap(), int(1));
2353        rn.step(&mut state, &[]).unwrap();
2354        assert_eq!(rn.value(&state).unwrap(), int(2));
2355        rn.step(&mut state, &[]).unwrap();
2356        assert_eq!(rn.value(&state).unwrap(), int(3));
2357    }
2358
2359    #[test]
2360    fn test_e2e_window_rank_through_registry() {
2361        let mut reg = FunctionRegistry::new();
2362        register_window_builtins(&mut reg);
2363
2364        let rank = reg.find_window("rank", 0).unwrap();
2365        let mut state = rank.initial_state();
2366        // [1, 2, 2, 3] -> [1, 2, 2, 4]
2367        rank.step(&mut state, &[int(1)]).unwrap();
2368        assert_eq!(rank.value(&state).unwrap(), int(1));
2369        rank.step(&mut state, &[int(2)]).unwrap();
2370        assert_eq!(rank.value(&state).unwrap(), int(2));
2371        rank.step(&mut state, &[int(2)]).unwrap();
2372        assert_eq!(rank.value(&state).unwrap(), int(2));
2373        rank.step(&mut state, &[int(3)]).unwrap();
2374        assert_eq!(rank.value(&state).unwrap(), int(4));
2375    }
2376}