1#![allow(
21 clippy::unnecessary_literal_bound,
22 clippy::cast_possible_truncation,
23 clippy::cast_possible_wrap,
24 clippy::cast_precision_loss,
25 clippy::cast_sign_loss,
26 clippy::items_after_statements,
27 clippy::float_cmp,
28 clippy::match_same_arms,
29 clippy::similar_names
30)]
31
32use std::collections::VecDeque;
33
34use fsqlite_error::Result;
35use fsqlite_types::SqliteValue;
36
37use crate::{FunctionRegistry, WindowFunction};
38
39pub struct RowNumberState {
44 counter: i64,
45}
46
47pub struct RowNumberFunc;
48
49impl WindowFunction for RowNumberFunc {
50 type State = RowNumberState;
51
52 fn initial_state(&self) -> Self::State {
53 RowNumberState { counter: 0 }
54 }
55
56 fn step(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
57 state.counter += 1;
58 Ok(())
59 }
60
61 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
62 state.counter -= 1;
63 Ok(())
64 }
65
66 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
67 Ok(SqliteValue::Integer(state.counter))
68 }
69
70 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
71 Ok(SqliteValue::Integer(state.counter))
72 }
73
74 fn num_args(&self) -> i32 {
75 0
76 }
77
78 fn name(&self) -> &str {
79 "row_number"
80 }
81}
82
83pub struct RankState {
88 row_number: i64,
89 rank: i64,
90 last_order_value: Option<SqliteValue>,
91}
92
93pub struct RankFunc;
94
95impl WindowFunction for RankFunc {
96 type State = RankState;
97
98 fn initial_state(&self) -> Self::State {
99 RankState {
100 row_number: 0,
101 rank: 0,
102 last_order_value: None,
103 }
104 }
105
106 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
107 state.row_number += 1;
108 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
109 let is_new_peer = match &state.last_order_value {
110 None => true,
111 Some(last) => ¤t != last,
112 };
113 if is_new_peer {
114 state.rank = state.row_number;
115 state.last_order_value = Some(current);
116 }
117 Ok(())
118 }
119
120 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
121 Ok(())
123 }
124
125 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
126 Ok(SqliteValue::Integer(state.rank))
127 }
128
129 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
130 Ok(SqliteValue::Integer(state.rank))
131 }
132
133 fn num_args(&self) -> i32 {
134 -1
135 }
136
137 fn name(&self) -> &str {
138 "rank"
139 }
140}
141
142pub struct DenseRankState {
147 dense_rank: i64,
148 last_order_value: Option<SqliteValue>,
149}
150
151pub struct DenseRankFunc;
152
153impl WindowFunction for DenseRankFunc {
154 type State = DenseRankState;
155
156 fn initial_state(&self) -> Self::State {
157 DenseRankState {
158 dense_rank: 0,
159 last_order_value: None,
160 }
161 }
162
163 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
164 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
165 let is_new_peer = match &state.last_order_value {
166 None => true,
167 Some(last) => ¤t != last,
168 };
169 if is_new_peer {
170 state.dense_rank += 1;
171 state.last_order_value = Some(current);
172 }
173 Ok(())
174 }
175
176 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
177 Ok(())
178 }
179
180 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
181 Ok(SqliteValue::Integer(state.dense_rank))
182 }
183
184 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
185 Ok(SqliteValue::Integer(state.dense_rank))
186 }
187
188 fn num_args(&self) -> i32 {
189 -1
190 }
191
192 fn name(&self) -> &str {
193 "dense_rank"
194 }
195}
196
197pub struct PercentRankState {
207 partition_size: i64,
208 ranks: Vec<i64>,
209 cursor: usize,
210 step_row_number: i64,
211 current_rank: i64,
212 last_order_value: Option<SqliteValue>,
213}
214
215pub struct PercentRankFunc;
216
217impl WindowFunction for PercentRankFunc {
218 type State = PercentRankState;
219
220 fn initial_state(&self) -> Self::State {
221 PercentRankState {
222 partition_size: 0,
223 ranks: Vec::new(),
224 cursor: 0,
225 step_row_number: 0,
226 current_rank: 0,
227 last_order_value: None,
228 }
229 }
230
231 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
232 state.step_row_number += 1;
233 state.partition_size += 1;
234 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
235 let is_new_peer = match &state.last_order_value {
236 None => true,
237 Some(last) => ¤t != last,
238 };
239 if is_new_peer {
240 state.current_rank = state.step_row_number;
241 state.last_order_value = Some(current);
242 }
243 state.ranks.push(state.current_rank);
244 Ok(())
245 }
246
247 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
248 state.cursor += 1;
249 Ok(())
250 }
251
252 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
253 if state.partition_size <= 1 {
254 return Ok(SqliteValue::Float(0.0));
255 }
256 let rank = state.ranks.get(state.cursor).copied().unwrap_or(1);
257 let pr = (rank - 1) as f64 / (state.partition_size - 1) as f64;
258 Ok(SqliteValue::Float(pr))
259 }
260
261 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
262 self.value(&state)
263 }
264
265 fn num_args(&self) -> i32 {
266 -1
267 }
268
269 fn name(&self) -> &str {
270 "percent_rank"
271 }
272}
273
274pub struct CumeDistState {
283 partition_size: i64,
284 current_row: i64,
285}
286
287pub struct CumeDistFunc;
288
289impl WindowFunction for CumeDistFunc {
290 type State = CumeDistState;
291
292 fn initial_state(&self) -> Self::State {
293 CumeDistState {
294 partition_size: 0,
295 current_row: 0,
296 }
297 }
298
299 fn step(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
300 state.partition_size += 1;
301 Ok(())
302 }
303
304 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
305 state.current_row += 1;
306 Ok(())
307 }
308
309 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
310 if state.partition_size == 0 {
311 return Ok(SqliteValue::Float(0.0));
312 }
313 let cd = (state.current_row + 1) as f64 / state.partition_size as f64;
314 Ok(SqliteValue::Float(cd))
315 }
316
317 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
318 self.value(&state)
319 }
320
321 fn num_args(&self) -> i32 {
322 -1
323 }
324
325 fn name(&self) -> &str {
326 "cume_dist"
327 }
328}
329
330pub struct NtileState {
340 partition_size: i64,
341 n: i64,
342 current_row: i64,
343}
344
345pub struct NtileFunc;
346
347impl WindowFunction for NtileFunc {
348 type State = NtileState;
349
350 fn initial_state(&self) -> Self::State {
351 NtileState {
352 partition_size: 0,
353 n: 1,
354 current_row: 0,
355 }
356 }
357
358 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
359 state.partition_size += 1;
360 if state.partition_size == 1 {
361 let n = args.first().map_or(1, |v| v.to_integer().max(1));
362 state.n = n;
363 }
364 Ok(())
365 }
366
367 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
368 state.current_row += 1;
369 Ok(())
370 }
371
372 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
373 if state.n <= 0 || state.partition_size == 0 {
374 return Ok(SqliteValue::Integer(1));
375 }
376 let n = state.n;
377 let sz = state.partition_size;
378 let row = state.current_row + 1; let base = sz / n;
383 let extra = sz % n;
384 let large_rows = extra * (base + 1);
386
387 let bucket = if row <= large_rows {
388 (row - 1) / (base + 1) + 1
390 } else {
391 let adjusted = row - large_rows;
393 if base == 0 {
394 extra + adjusted
396 } else {
397 extra + (adjusted - 1) / base + 1
398 }
399 };
400 Ok(SqliteValue::Integer(bucket))
401 }
402
403 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
404 self.value(&state)
405 }
406
407 fn num_args(&self) -> i32 {
408 1
409 }
410
411 fn name(&self) -> &str {
412 "ntile"
413 }
414}
415
416pub struct LagState {
422 buffer: VecDeque<SqliteValue>,
423 offset: i64,
424 default_val: SqliteValue,
425 row_number: i64,
426}
427
428pub struct LagFunc;
429
430impl WindowFunction for LagFunc {
431 type State = LagState;
432
433 fn initial_state(&self) -> Self::State {
434 LagState {
435 buffer: VecDeque::new(),
436 offset: 1,
437 default_val: SqliteValue::Null,
438 row_number: 0,
439 }
440 }
441
442 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
443 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
444 if state.row_number == 0 {
446 if let Some(off) = args.get(1) {
447 state.offset = off.to_integer().max(0);
448 }
449 if let Some(def) = args.get(2) {
450 state.default_val = def.clone();
451 }
452 }
453 state.row_number += 1;
454 state.buffer.push_back(val);
455 Ok(())
456 }
457
458 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
459 Ok(())
460 }
461
462 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
463 let idx = state.row_number - state.offset;
464 if idx < 1 || idx > state.buffer.len() as i64 {
465 return Ok(state.default_val.clone());
466 }
467 Ok(state.buffer[(idx - 1) as usize].clone())
468 }
469
470 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
471 self.value(&state)
472 }
473
474 fn num_args(&self) -> i32 {
475 -1 }
477
478 fn name(&self) -> &str {
479 "lag"
480 }
481}
482
483pub struct LeadState {
489 buffer: Vec<SqliteValue>,
490 offset: i64,
491 default_val: SqliteValue,
492 current_row: i64,
493}
494
495pub struct LeadFunc;
496
497impl WindowFunction for LeadFunc {
498 type State = LeadState;
499
500 fn initial_state(&self) -> Self::State {
501 LeadState {
502 buffer: Vec::new(),
503 offset: 1,
504 default_val: SqliteValue::Null,
505 current_row: 0,
506 }
507 }
508
509 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
510 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
511 if state.buffer.is_empty() {
512 if let Some(off) = args.get(1) {
513 state.offset = off.to_integer().max(0);
514 }
515 if let Some(def) = args.get(2) {
516 state.default_val = def.clone();
517 }
518 }
519 state.buffer.push(val);
520 Ok(())
521 }
522
523 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
524 state.current_row += 1;
525 Ok(())
526 }
527
528 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
529 let target = state.current_row + state.offset;
530 if target < 0 || target >= state.buffer.len() as i64 {
531 return Ok(state.default_val.clone());
532 }
533 Ok(state.buffer[target as usize].clone())
534 }
535
536 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
537 self.value(&state)
538 }
539
540 fn num_args(&self) -> i32 {
541 -1
542 }
543
544 fn name(&self) -> &str {
545 "lead"
546 }
547}
548
549pub struct FirstValueState {
554 first: Option<SqliteValue>,
555}
556
557pub struct FirstValueFunc;
558
559impl WindowFunction for FirstValueFunc {
560 type State = FirstValueState;
561
562 fn initial_state(&self) -> Self::State {
563 FirstValueState { first: None }
564 }
565
566 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
567 if state.first.is_none() {
568 state.first = Some(args.first().cloned().unwrap_or(SqliteValue::Null));
569 }
570 Ok(())
571 }
572
573 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
574 state.first = None;
579 Ok(())
580 }
581
582 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
583 Ok(state.first.clone().unwrap_or(SqliteValue::Null))
584 }
585
586 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
587 Ok(state.first.unwrap_or(SqliteValue::Null))
588 }
589
590 fn num_args(&self) -> i32 {
591 1
592 }
593
594 fn name(&self) -> &str {
595 "first_value"
596 }
597}
598
599pub struct LastValueState {
604 frame: VecDeque<SqliteValue>,
605}
606
607pub struct LastValueFunc;
608
609impl WindowFunction for LastValueFunc {
610 type State = LastValueState;
611
612 fn initial_state(&self) -> Self::State {
613 LastValueState {
614 frame: VecDeque::new(),
615 }
616 }
617
618 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
619 state
620 .frame
621 .push_back(args.first().cloned().unwrap_or(SqliteValue::Null));
622 Ok(())
623 }
624
625 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
626 state.frame.pop_front();
627 Ok(())
628 }
629
630 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
631 Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
632 }
633
634 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
635 Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
636 }
637
638 fn num_args(&self) -> i32 {
639 1
640 }
641
642 fn name(&self) -> &str {
643 "last_value"
644 }
645}
646
647pub struct NthValueState {
652 frame: VecDeque<SqliteValue>,
653 n: i64,
654}
655
656pub struct NthValueFunc;
657
658impl WindowFunction for NthValueFunc {
659 type State = NthValueState;
660
661 fn initial_state(&self) -> Self::State {
662 NthValueState {
663 frame: VecDeque::new(),
664 n: 1,
665 }
666 }
667
668 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
669 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
670 if state.frame.is_empty() {
672 if let Some(n_arg) = args.get(1) {
673 state.n = n_arg.to_integer();
674 }
675 }
676 state.frame.push_back(val);
677 Ok(())
678 }
679
680 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
681 state.frame.pop_front();
682 Ok(())
683 }
684
685 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
686 if state.n <= 0 {
689 return Ok(SqliteValue::Null);
690 }
691 let idx = (state.n - 1) as usize;
692 Ok(state.frame.get(idx).cloned().unwrap_or(SqliteValue::Null))
693 }
694
695 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
696 self.value(&state)
697 }
698
699 fn num_args(&self) -> i32 {
700 2
701 }
702
703 fn name(&self) -> &str {
704 "nth_value"
705 }
706}
707
708pub fn register_window_builtins(registry: &mut FunctionRegistry) {
712 registry.register_window(RowNumberFunc);
713 registry.register_window(RankFunc);
714 registry.register_window(DenseRankFunc);
715 registry.register_window(PercentRankFunc);
716 registry.register_window(CumeDistFunc);
717 registry.register_window(NtileFunc);
718 registry.register_window(LagFunc);
719 registry.register_window(LeadFunc);
720 registry.register_window(FirstValueFunc);
721 registry.register_window(LastValueFunc);
722 registry.register_window(NthValueFunc);
723}
724
725#[cfg(test)]
728mod tests {
729 use super::*;
730
731 fn int(v: i64) -> SqliteValue {
732 SqliteValue::Integer(v)
733 }
734
735 fn text(s: &str) -> SqliteValue {
736 SqliteValue::Text(s.to_owned())
737 }
738
739 fn null() -> SqliteValue {
740 SqliteValue::Null
741 }
742
743 fn run_window_partition<F: WindowFunction>(
747 func: &F,
748 rows: &[Vec<SqliteValue>],
749 ) -> Vec<SqliteValue> {
750 let mut state = func.initial_state();
751 let mut results = Vec::new();
752 for row in rows {
753 func.step(&mut state, row).unwrap();
754 results.push(func.value(&state).unwrap());
755 }
756 results
757 }
758
759 fn run_window_two_pass<F: WindowFunction>(
764 func: &F,
765 rows: &[Vec<SqliteValue>],
766 ) -> Vec<SqliteValue> {
767 let mut state = func.initial_state();
768 for row in rows {
770 func.step(&mut state, row).unwrap();
771 }
772 let mut results = Vec::new();
774 for (i, _) in rows.iter().enumerate() {
775 results.push(func.value(&state).unwrap());
776 if i < rows.len() - 1 {
777 func.inverse(&mut state, &[]).unwrap();
778 }
779 }
780 results
781 }
782
783 #[test]
786 fn test_row_number_basic() {
787 let results =
788 run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![], vec![], vec![]]);
789 assert_eq!(results, vec![int(1), int(2), int(3), int(4), int(5)]);
790 }
791
792 #[test]
793 fn test_row_number_partition_reset() {
794 let r1 = run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![]]);
796 assert_eq!(r1, vec![int(1), int(2), int(3)]);
797
798 let r2 = run_window_partition(&RowNumberFunc, &[vec![], vec![]]);
800 assert_eq!(r2, vec![int(1), int(2)]);
801 }
802
803 #[test]
806 fn test_rank_with_ties() {
807 let results = run_window_partition(
809 &RankFunc,
810 &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
811 );
812 assert_eq!(results, vec![int(1), int(2), int(2), int(4)]);
813 }
814
815 #[test]
816 fn test_rank_no_ties() {
817 let results =
818 run_window_partition(&RankFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
819 assert_eq!(results, vec![int(1), int(2), int(3)]);
820 }
821
822 #[test]
825 fn test_dense_rank_with_ties() {
826 let results = run_window_partition(
828 &DenseRankFunc,
829 &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
830 );
831 assert_eq!(results, vec![int(1), int(2), int(2), int(3)]);
832 }
833
834 #[test]
835 fn test_dense_rank_multiple_ties() {
836 let results = run_window_partition(
838 &DenseRankFunc,
839 &[
840 vec![int(1)],
841 vec![int(1)],
842 vec![int(2)],
843 vec![int(2)],
844 vec![int(3)],
845 ],
846 );
847 assert_eq!(results, vec![int(1), int(1), int(2), int(2), int(3)]);
848 }
849
850 #[test]
853 fn test_percent_rank_single_row() {
854 let results = run_window_two_pass(&PercentRankFunc, &[vec![int(1)]]);
855 assert_eq!(results, vec![SqliteValue::Float(0.0)]);
856 }
857
858 #[test]
859 fn test_percent_rank_formula() {
860 let results = run_window_two_pass(
863 &PercentRankFunc,
864 &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
865 );
866 match &results[0] {
871 SqliteValue::Float(v) => assert!((*v - 0.0).abs() < 1e-10),
872 other => panic!("expected Float, got {other:?}"),
873 }
874 match &results[1] {
875 SqliteValue::Float(v) => assert!((*v - 1.0 / 3.0).abs() < 1e-10),
876 other => panic!("expected Float, got {other:?}"),
877 }
878 match &results[2] {
879 SqliteValue::Float(v) => assert!((*v - 1.0 / 3.0).abs() < 1e-10),
880 other => panic!("expected Float, got {other:?}"),
881 }
882 match &results[3] {
883 SqliteValue::Float(v) => assert!((*v - 1.0).abs() < 1e-10),
884 other => panic!("expected Float, got {other:?}"),
885 }
886 }
887
888 #[test]
891 fn test_cume_dist_distinct() {
892 let results = run_window_two_pass(
894 &CumeDistFunc,
895 &[vec![int(1)], vec![int(2)], vec![int(3)], vec![int(4)]],
896 );
897 for (i, expected) in [0.25, 0.5, 0.75, 1.0].iter().enumerate() {
898 match &results[i] {
899 SqliteValue::Float(v) => {
900 assert!(
901 (*v - expected).abs() < 1e-10,
902 "row {i}: expected {expected}, got {v}"
903 );
904 }
905 other => panic!("expected Float, got {other:?}"),
906 }
907 }
908 }
909
910 #[test]
913 fn test_ntile_even() {
914 let rows: Vec<Vec<SqliteValue>> = (0..8).map(|_| vec![int(4)]).collect();
916 let results = run_window_two_pass(&NtileFunc, &rows);
917 assert_eq!(
918 results,
919 vec![
920 int(1),
921 int(1),
922 int(2),
923 int(2),
924 int(3),
925 int(3),
926 int(4),
927 int(4)
928 ]
929 );
930 }
931
932 #[test]
933 fn test_ntile_uneven() {
934 let rows: Vec<Vec<SqliteValue>> = (0..10).map(|_| vec![int(3)]).collect();
936 let results = run_window_two_pass(&NtileFunc, &rows);
937 assert_eq!(
938 results,
939 vec![
940 int(1),
941 int(1),
942 int(1),
943 int(1),
944 int(2),
945 int(2),
946 int(2),
947 int(3),
948 int(3),
949 int(3)
950 ]
951 );
952 }
953
954 #[test]
955 fn test_ntile_more_buckets_than_rows() {
956 let rows: Vec<Vec<SqliteValue>> = (0..3).map(|_| vec![int(10)]).collect();
958 let results = run_window_two_pass(&NtileFunc, &rows);
959 assert_eq!(results, vec![int(1), int(2), int(3)]);
960 }
961
962 #[test]
965 fn test_lag_default() {
966 let results =
968 run_window_partition(&LagFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
969 assert_eq!(results, vec![null(), int(10), int(20)]);
970 }
971
972 #[test]
973 fn test_lag_offset_3() {
974 let results = run_window_partition(
976 &LagFunc,
977 &[
978 vec![int(10), int(3)],
979 vec![int(20), int(3)],
980 vec![int(30), int(3)],
981 vec![int(40), int(3)],
982 vec![int(50), int(3)],
983 ],
984 );
985 assert_eq!(results, vec![null(), null(), null(), int(10), int(20)]);
986 }
987
988 #[test]
989 fn test_lag_default_value() {
990 let results = run_window_partition(
992 &LagFunc,
993 &[
994 vec![int(10), int(1), int(-1)],
995 vec![int(20), int(1), int(-1)],
996 ],
997 );
998 assert_eq!(results, vec![int(-1), int(10)]);
999 }
1000
1001 #[test]
1004 fn test_lead_default() {
1005 let func = LeadFunc;
1009 let mut state = func.initial_state();
1010 let rows = [int(10), int(20), int(30)];
1011
1012 for row in &rows {
1014 func.step(&mut state, std::slice::from_ref(row)).unwrap();
1015 }
1016
1017 let mut results = Vec::new();
1019 for _ in &rows {
1020 results.push(func.value(&state).unwrap());
1021 func.inverse(&mut state, &[]).unwrap();
1022 }
1023 assert_eq!(results, vec![int(20), int(30), null()]);
1024 }
1025
1026 #[test]
1027 fn test_lead_offset_2() {
1028 let func = LeadFunc;
1029 let mut state = func.initial_state();
1030 let rows = [int(10), int(20), int(30), int(40), int(50)];
1031
1032 for row in &rows {
1033 func.step(&mut state, &[row.clone(), int(2)]).unwrap();
1034 }
1035
1036 let mut results = Vec::new();
1037 for _ in &rows {
1038 results.push(func.value(&state).unwrap());
1039 func.inverse(&mut state, &[]).unwrap();
1040 }
1041 assert_eq!(results, vec![int(30), int(40), int(50), null(), null()]);
1042 }
1043
1044 #[test]
1045 fn test_lead_default_value() {
1046 let func = LeadFunc;
1047 let mut state = func.initial_state();
1048 let rows = [int(10), int(20)];
1049
1050 for row in &rows {
1051 func.step(&mut state, &[row.clone(), int(1), text("N/A")])
1052 .unwrap();
1053 }
1054
1055 let mut results = Vec::new();
1056 for _ in &rows {
1057 results.push(func.value(&state).unwrap());
1058 func.inverse(&mut state, &[]).unwrap();
1059 }
1060 assert_eq!(results, vec![int(20), text("N/A")]);
1061 }
1062
1063 #[test]
1066 fn test_first_value_basic() {
1067 let results = run_window_partition(
1068 &FirstValueFunc,
1069 &[vec![int(10)], vec![int(20)], vec![int(30)]],
1070 );
1071 assert_eq!(results, vec![int(10), int(10), int(10)]);
1074 }
1075
1076 #[test]
1079 fn test_last_value_default_frame() {
1080 let results = run_window_partition(
1083 &LastValueFunc,
1084 &[vec![int(10)], vec![int(20)], vec![int(30)]],
1085 );
1086 assert_eq!(results, vec![int(10), int(20), int(30)]);
1087 }
1088
1089 #[test]
1090 fn test_last_value_unbounded_following() {
1091 let func = LastValueFunc;
1094 let mut state = func.initial_state();
1095 func.step(&mut state, &[int(10)]).unwrap();
1096 func.step(&mut state, &[int(20)]).unwrap();
1097 func.step(&mut state, &[int(30)]).unwrap();
1098 assert_eq!(func.value(&state).unwrap(), int(30));
1099 }
1100
1101 #[test]
1104 fn test_nth_value_basic() {
1105 let func = NthValueFunc;
1106 let mut state = func.initial_state();
1107 func.step(&mut state, &[int(10), int(3)]).unwrap();
1109 func.step(&mut state, &[int(20), int(3)]).unwrap();
1110 func.step(&mut state, &[int(30), int(3)]).unwrap();
1111 func.step(&mut state, &[int(40), int(3)]).unwrap();
1112 func.step(&mut state, &[int(50), int(3)]).unwrap();
1113 assert_eq!(func.value(&state).unwrap(), int(30));
1114 }
1115
1116 #[test]
1117 fn test_nth_value_out_of_range() {
1118 let func = NthValueFunc;
1119 let mut state = func.initial_state();
1120 func.step(&mut state, &[int(10), int(100)]).unwrap();
1121 func.step(&mut state, &[int(20), int(100)]).unwrap();
1122 assert_eq!(func.value(&state).unwrap(), null());
1124 }
1125
1126 #[test]
1127 fn test_nth_value_n_zero() {
1128 let func = NthValueFunc;
1130 let mut state = func.initial_state();
1131 func.step(&mut state, &[int(10), int(0)]).unwrap();
1132 assert_eq!(func.value(&state).unwrap(), null());
1133 }
1134
1135 #[test]
1138 fn test_register_window_builtins_all_present() {
1139 let mut reg = FunctionRegistry::new();
1140 register_window_builtins(&mut reg);
1141
1142 let expected_variadic = [
1143 "row_number",
1144 "rank",
1145 "dense_rank",
1146 "percent_rank",
1147 "cume_dist",
1148 "lag",
1149 "lead",
1150 ];
1151 for name in expected_variadic {
1152 assert!(
1153 reg.find_window(name, 0).is_some()
1154 || reg.find_window(name, 1).is_some()
1155 || reg.find_window(name, -1).is_some(),
1156 "window function '{name}' not registered"
1157 );
1158 }
1159
1160 assert!(
1161 reg.find_window("ntile", 1).is_some(),
1162 "ntile(1) not registered"
1163 );
1164 assert!(
1165 reg.find_window("first_value", 1).is_some(),
1166 "first_value(1) not registered"
1167 );
1168 assert!(
1169 reg.find_window("last_value", 1).is_some(),
1170 "last_value(1) not registered"
1171 );
1172 assert!(
1173 reg.find_window("nth_value", 2).is_some(),
1174 "nth_value(2) not registered"
1175 );
1176 }
1177
1178 #[test]
1181 fn test_e2e_window_row_number_through_registry() {
1182 let mut reg = FunctionRegistry::new();
1183 register_window_builtins(&mut reg);
1184
1185 let rn = reg.find_window("row_number", 0).unwrap();
1186 let mut state = rn.initial_state();
1187 rn.step(&mut state, &[]).unwrap();
1188 assert_eq!(rn.value(&state).unwrap(), int(1));
1189 rn.step(&mut state, &[]).unwrap();
1190 assert_eq!(rn.value(&state).unwrap(), int(2));
1191 rn.step(&mut state, &[]).unwrap();
1192 assert_eq!(rn.value(&state).unwrap(), int(3));
1193 }
1194
1195 #[test]
1196 fn test_e2e_window_rank_through_registry() {
1197 let mut reg = FunctionRegistry::new();
1198 register_window_builtins(&mut reg);
1199
1200 let rank = reg.find_window("rank", 1).unwrap();
1201 let mut state = rank.initial_state();
1202 rank.step(&mut state, &[int(1)]).unwrap();
1204 assert_eq!(rank.value(&state).unwrap(), int(1));
1205 rank.step(&mut state, &[int(2)]).unwrap();
1206 assert_eq!(rank.value(&state).unwrap(), int(2));
1207 rank.step(&mut state, &[int(2)]).unwrap();
1208 assert_eq!(rank.value(&state).unwrap(), int(2));
1209 rank.step(&mut state, &[int(3)]).unwrap();
1210 assert_eq!(rank.value(&state).unwrap(), int(4));
1211 }
1212}