1#![allow(
22 clippy::unnecessary_literal_bound,
23 clippy::cast_possible_truncation,
24 clippy::cast_possible_wrap,
25 clippy::cast_precision_loss,
26 clippy::cast_sign_loss,
27 clippy::items_after_statements,
28 clippy::float_cmp,
29 clippy::match_same_arms,
30 clippy::similar_names
31)]
32
33use std::collections::VecDeque;
34
35use fsqlite_error::{FrankenError, Result};
36use fsqlite_types::SqliteValue;
37
38use crate::{FunctionRegistry, WindowFunction};
39
40pub struct RowNumberState {
45 counter: i64,
46}
47
48pub struct RowNumberFunc;
49
50impl WindowFunction for RowNumberFunc {
51 type State = RowNumberState;
52
53 fn initial_state(&self) -> Self::State {
54 RowNumberState { counter: 0 }
55 }
56
57 fn step(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
58 state.counter += 1;
59 Ok(())
60 }
61
62 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
63 state.counter -= 1;
64 Ok(())
65 }
66
67 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
68 Ok(SqliteValue::Integer(state.counter))
69 }
70
71 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
72 Ok(SqliteValue::Integer(state.counter))
73 }
74
75 fn num_args(&self) -> i32 {
76 0
77 }
78
79 fn name(&self) -> &str {
80 "row_number"
81 }
82}
83
84pub struct RankState {
89 row_number: i64,
90 rank: i64,
91 last_order_value: Option<SqliteValue>,
92}
93
94pub struct RankFunc;
95
96impl WindowFunction for RankFunc {
97 type State = RankState;
98
99 fn initial_state(&self) -> Self::State {
100 RankState {
101 row_number: 0,
102 rank: 0,
103 last_order_value: None,
104 }
105 }
106
107 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
108 state.row_number += 1;
109 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
110 let is_new_peer = match &state.last_order_value {
111 None => true,
112 Some(last) => ¤t != last,
113 };
114 if is_new_peer {
115 state.rank = state.row_number;
116 state.last_order_value = Some(current);
117 }
118 Ok(())
119 }
120
121 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
122 Ok(())
124 }
125
126 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
127 Ok(SqliteValue::Integer(state.rank))
128 }
129
130 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
131 Ok(SqliteValue::Integer(state.rank))
132 }
133
134 fn num_args(&self) -> i32 {
135 -1
136 }
137
138 fn min_args(&self) -> i32 {
139 0
140 }
141
142 fn max_args(&self) -> Option<i32> {
143 Some(0)
144 }
145
146 fn name(&self) -> &str {
147 "rank"
148 }
149}
150
151pub struct DenseRankState {
156 dense_rank: i64,
157 last_order_value: Option<SqliteValue>,
158}
159
160pub struct DenseRankFunc;
161
162impl WindowFunction for DenseRankFunc {
163 type State = DenseRankState;
164
165 fn initial_state(&self) -> Self::State {
166 DenseRankState {
167 dense_rank: 0,
168 last_order_value: None,
169 }
170 }
171
172 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
173 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
174 let is_new_peer = match &state.last_order_value {
175 None => true,
176 Some(last) => ¤t != last,
177 };
178 if is_new_peer {
179 state.dense_rank += 1;
180 state.last_order_value = Some(current);
181 }
182 Ok(())
183 }
184
185 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
186 Ok(())
187 }
188
189 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
190 Ok(SqliteValue::Integer(state.dense_rank))
191 }
192
193 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
194 Ok(SqliteValue::Integer(state.dense_rank))
195 }
196
197 fn num_args(&self) -> i32 {
198 -1
199 }
200
201 fn min_args(&self) -> i32 {
202 0
203 }
204
205 fn max_args(&self) -> Option<i32> {
206 Some(0)
207 }
208
209 fn name(&self) -> &str {
210 "dense_rank"
211 }
212}
213
214pub struct PercentRankState {
224 partition_size: i64,
225 ranks: Vec<i64>,
226 cursor: usize,
227 step_row_number: i64,
228 current_rank: i64,
229 last_order_value: Option<SqliteValue>,
230}
231
232pub struct PercentRankFunc;
233
234impl WindowFunction for PercentRankFunc {
235 type State = PercentRankState;
236
237 fn initial_state(&self) -> Self::State {
238 PercentRankState {
239 partition_size: 0,
240 ranks: Vec::new(),
241 cursor: 0,
242 step_row_number: 0,
243 current_rank: 0,
244 last_order_value: None,
245 }
246 }
247
248 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
249 state.step_row_number += 1;
250 state.partition_size += 1;
251 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
252 let is_new_peer = match &state.last_order_value {
253 None => true,
254 Some(last) => ¤t != last,
255 };
256 if is_new_peer {
257 state.current_rank = state.step_row_number;
258 state.last_order_value = Some(current);
259 }
260 state.ranks.push(state.current_rank);
261 Ok(())
262 }
263
264 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
265 state.cursor += 1;
266 Ok(())
267 }
268
269 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
270 if state.partition_size <= 1 {
271 return Ok(SqliteValue::Float(0.0));
272 }
273 let rank = state.ranks.get(state.cursor).copied().unwrap_or(1);
274 let pr = (rank - 1) as f64 / (state.partition_size - 1) as f64;
275 Ok(SqliteValue::Float(pr))
276 }
277
278 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
279 self.value(&state)
280 }
281
282 fn num_args(&self) -> i32 {
283 -1
284 }
285
286 fn min_args(&self) -> i32 {
287 0
288 }
289
290 fn max_args(&self) -> Option<i32> {
291 Some(0)
292 }
293
294 fn name(&self) -> &str {
295 "percent_rank"
296 }
297}
298
299pub struct CumeDistState {
308 partition_size: i64,
309 current_row: usize,
310 cume_positions: Vec<i64>,
311 peer_start: usize,
312 last_order_value: Option<SqliteValue>,
313}
314
315pub struct CumeDistFunc;
316
317impl WindowFunction for CumeDistFunc {
318 type State = CumeDistState;
319
320 fn initial_state(&self) -> Self::State {
321 CumeDistState {
322 partition_size: 0,
323 current_row: 0,
324 cume_positions: Vec::new(),
325 peer_start: 0,
326 last_order_value: None,
327 }
328 }
329
330 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
331 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
332 let is_new_peer = match &state.last_order_value {
333 None => true,
334 Some(last) => ¤t != last,
335 };
336 if is_new_peer {
337 let peer_end = state.partition_size;
338 if let Some(slots) = state.cume_positions.get_mut(state.peer_start..) {
339 for slot in slots {
340 *slot = peer_end;
341 }
342 }
343 state.peer_start = state.cume_positions.len();
344 state.last_order_value = Some(current);
345 }
346 state.partition_size += 1;
347 state.cume_positions.push(0);
348 Ok(())
349 }
350
351 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
352 state.current_row += 1;
353 Ok(())
354 }
355
356 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
357 if state.partition_size == 0 {
358 return Ok(SqliteValue::Float(0.0));
359 }
360 let peer_end = state
361 .cume_positions
362 .get(state.current_row)
363 .copied()
364 .filter(|position| *position != 0)
365 .unwrap_or(state.partition_size);
366 let cd = peer_end as f64 / state.partition_size as f64;
367 Ok(SqliteValue::Float(cd))
368 }
369
370 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
371 self.value(&state)
372 }
373
374 fn num_args(&self) -> i32 {
375 -1
376 }
377
378 fn min_args(&self) -> i32 {
379 0
380 }
381
382 fn max_args(&self) -> Option<i32> {
383 Some(0)
384 }
385
386 fn name(&self) -> &str {
387 "cume_dist"
388 }
389}
390
391pub struct NtileState {
401 partition_size: i64,
402 n: i64,
403 current_row: i64,
404}
405
406pub struct NtileFunc;
407
408const INVALID_NTILE_ARGUMENT: &str = "argument of ntile must be a positive integer";
409
410impl WindowFunction for NtileFunc {
411 type State = NtileState;
412
413 fn initial_state(&self) -> Self::State {
414 NtileState {
415 partition_size: 0,
416 n: 1,
417 current_row: 0,
418 }
419 }
420
421 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
422 if state.partition_size == 0 {
423 let n = args.first().map_or(0, SqliteValue::to_integer);
424 if n <= 0 {
425 return Err(FrankenError::function_error(INVALID_NTILE_ARGUMENT));
426 }
427 state.n = n;
428 }
429 state.partition_size += 1;
430 Ok(())
431 }
432
433 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
434 state.current_row += 1;
435 Ok(())
436 }
437
438 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
439 if state.partition_size == 0 {
440 return Ok(SqliteValue::Integer(1));
441 }
442 let n = state.n;
443 let sz = state.partition_size;
444 let row = state.current_row + 1; let base = sz / n;
449 let extra = sz % n;
450 let large_rows = extra * (base + 1);
452
453 let bucket = if row <= large_rows {
454 (row - 1) / (base + 1) + 1
456 } else {
457 let adjusted = row - large_rows;
459 if base == 0 {
460 extra + adjusted
462 } else {
463 extra + (adjusted - 1) / base + 1
464 }
465 };
466 Ok(SqliteValue::Integer(bucket))
467 }
468
469 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
470 self.value(&state)
471 }
472
473 fn num_args(&self) -> i32 {
474 1
475 }
476
477 fn name(&self) -> &str {
478 "ntile"
479 }
480}
481
/// Returns the length in bytes of the longest numeric literal at the start of
/// `bytes`, or 0 when there is none.
///
/// Accepted shape: optional sign, digits with an optional `.` and fraction
/// digits (at least one digit overall), then an optional exponent
/// (`e`/`E`, optional sign, one or more digits). An exponent marker that is
/// not followed by digits is not counted as part of the number.
fn numeric_prefix_len(bytes: &[u8]) -> usize {
    // Length of the run of ASCII digits beginning at `from`.
    let digit_run = |from: usize| {
        bytes.get(from..).map_or(0, |rest| {
            rest.iter().take_while(|byte| byte.is_ascii_digit()).count()
        })
    };
    let is_sign_at = |at: usize| matches!(bytes.get(at), Some(b'+' | b'-'));

    let mut pos = usize::from(is_sign_at(0));
    let integer_digits = digit_run(pos);
    pos += integer_digits;

    let mut fraction_digits = 0;
    if bytes.get(pos) == Some(&b'.') {
        pos += 1;
        fraction_digits = digit_run(pos);
        pos += fraction_digits;
    }

    // A lone sign and/or dot is not a number.
    if integer_digits + fraction_digits == 0 {
        return 0;
    }

    let mantissa_end = pos;
    if !matches!(bytes.get(pos), Some(b'e' | b'E')) {
        return mantissa_end;
    }

    let mut exponent_pos = pos + 1;
    if is_sign_at(exponent_pos) {
        exponent_pos += 1;
    }
    match digit_run(exponent_pos) {
        // Dangling exponent marker: the number stops at the mantissa.
        0 => mantissa_end,
        exponent_digits => exponent_pos + exponent_digits,
    }
}
536
/// Returns `bytes` with leading ASCII whitespace (as defined by
/// `u8::is_ascii_whitespace`) removed.
fn trim_ascii_start(bytes: &[u8]) -> &[u8] {
    let mut rest = bytes;
    while let [first, tail @ ..] = rest {
        if !first.is_ascii_whitespace() {
            break;
        }
        rest = tail;
    }
    rest
}
544
545fn lag_lead_bytes_offset(bytes: &[u8]) -> Option<i64> {
546 let trimmed = trim_ascii_start(bytes);
547 let prefix_len = numeric_prefix_len(trimmed);
548 if prefix_len == 0 {
549 return Some(0);
550 }
551 let prefix = trimmed
552 .get(..prefix_len)
553 .and_then(|bytes| std::str::from_utf8(bytes).ok())?;
554 if prefix
555 .as_bytes()
556 .iter()
557 .any(|byte| matches!(*byte, b'.' | b'e' | b'E'))
558 {
559 prefix.parse().ok().and_then(integral_f64_to_i64)
560 } else {
561 prefix
562 .parse()
563 .ok()
564 .or_else(|| prefix.parse().ok().and_then(integral_f64_to_i64))
565 }
566}
567
568fn lag_lead_text_offset(text: &str) -> Option<i64> {
569 lag_lead_bytes_offset(text.as_bytes())
570}
571
572fn lag_lead_offset_arg(value: Option<&SqliteValue>) -> Option<i64> {
573 match value {
574 None => Some(1),
575 Some(SqliteValue::Null) => None,
576 Some(SqliteValue::Integer(offset)) => Some(*offset),
577 Some(SqliteValue::Float(offset)) => integral_f64_to_i64(*offset),
578 Some(SqliteValue::Text(text)) => lag_lead_text_offset(text),
579 Some(SqliteValue::Blob(bytes)) => lag_lead_bytes_offset(bytes),
580 }
581}
582
583pub struct LagState {
585 buffer: Vec<SqliteValue>,
586 offsets: Vec<Option<i64>>,
587 defaults: Vec<SqliteValue>,
588 current_row: i64,
589}
590
591pub struct LagFunc;
592
593impl WindowFunction for LagFunc {
594 type State = LagState;
595
596 fn initial_state(&self) -> Self::State {
597 LagState {
598 buffer: Vec::new(),
599 offsets: Vec::new(),
600 defaults: Vec::new(),
601 current_row: 0,
602 }
603 }
604
605 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
606 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
607 let offset = lag_lead_offset_arg(args.get(1));
608 let default_val = args.get(2).cloned().unwrap_or(SqliteValue::Null);
609 state.buffer.push(val);
610 state.offsets.push(offset);
611 state.defaults.push(default_val);
612 Ok(())
613 }
614
615 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
616 state.current_row += 1;
617 Ok(())
618 }
619
620 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
621 let current_index = usize::try_from(state.current_row).unwrap_or(usize::MAX);
622 let default_val = state
623 .defaults
624 .get(current_index)
625 .cloned()
626 .unwrap_or(SqliteValue::Null);
627 let Some(offset) = state.offsets.get(current_index).copied().flatten() else {
628 return Ok(default_val);
629 };
630 let target = state.current_row - offset;
631 let Ok(target_index) = usize::try_from(target) else {
632 return Ok(default_val);
633 };
634 Ok(state
635 .buffer
636 .get(target_index)
637 .cloned()
638 .unwrap_or(default_val))
639 }
640
641 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
642 self.value(&state)
643 }
644
645 fn num_args(&self) -> i32 {
646 -1 }
648
649 fn min_args(&self) -> i32 {
650 1
651 }
652
653 fn max_args(&self) -> Option<i32> {
654 Some(3)
655 }
656
657 fn name(&self) -> &str {
658 "lag"
659 }
660}
661
662pub struct LeadState {
668 buffer: Vec<SqliteValue>,
669 offsets: Vec<Option<i64>>,
670 defaults: Vec<SqliteValue>,
671 current_row: i64,
672}
673
674pub struct LeadFunc;
675
676impl WindowFunction for LeadFunc {
677 type State = LeadState;
678
679 fn initial_state(&self) -> Self::State {
680 LeadState {
681 buffer: Vec::new(),
682 offsets: Vec::new(),
683 defaults: Vec::new(),
684 current_row: 0,
685 }
686 }
687
688 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
689 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
690 let offset = lag_lead_offset_arg(args.get(1));
691 let default_val = args.get(2).cloned().unwrap_or(SqliteValue::Null);
692 state.buffer.push(val);
693 state.offsets.push(offset);
694 state.defaults.push(default_val);
695 Ok(())
696 }
697
698 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
699 state.current_row += 1;
700 Ok(())
701 }
702
703 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
704 let current_index = usize::try_from(state.current_row).unwrap_or(usize::MAX);
705 let default_val = state
706 .defaults
707 .get(current_index)
708 .cloned()
709 .unwrap_or(SqliteValue::Null);
710 let Some(offset) = state.offsets.get(current_index).copied().flatten() else {
711 return Ok(default_val);
712 };
713 let target = state.current_row + offset;
714 if target < 0 || target >= state.buffer.len() as i64 {
715 return Ok(default_val);
716 }
717 Ok(state.buffer[target as usize].clone())
718 }
719
720 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
721 self.value(&state)
722 }
723
724 fn num_args(&self) -> i32 {
725 -1
726 }
727
728 fn min_args(&self) -> i32 {
729 1
730 }
731
732 fn max_args(&self) -> Option<i32> {
733 Some(3)
734 }
735
736 fn name(&self) -> &str {
737 "lead"
738 }
739}
740
741pub struct FirstValueState {
746 first: Option<SqliteValue>,
747}
748
749pub struct FirstValueFunc;
750
751impl WindowFunction for FirstValueFunc {
752 type State = FirstValueState;
753
754 fn initial_state(&self) -> Self::State {
755 FirstValueState { first: None }
756 }
757
758 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
759 if state.first.is_none() {
760 state.first = Some(args.first().cloned().unwrap_or(SqliteValue::Null));
761 }
762 Ok(())
763 }
764
765 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
766 state.first = None;
771 Ok(())
772 }
773
774 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
775 Ok(state.first.clone().unwrap_or(SqliteValue::Null))
776 }
777
778 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
779 Ok(state.first.unwrap_or(SqliteValue::Null))
780 }
781
782 fn num_args(&self) -> i32 {
783 1
784 }
785
786 fn name(&self) -> &str {
787 "first_value"
788 }
789}
790
791pub struct LastValueState {
796 frame: VecDeque<SqliteValue>,
797}
798
799pub struct LastValueFunc;
800
801impl WindowFunction for LastValueFunc {
802 type State = LastValueState;
803
804 fn initial_state(&self) -> Self::State {
805 LastValueState {
806 frame: VecDeque::new(),
807 }
808 }
809
810 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
811 state
812 .frame
813 .push_back(args.first().cloned().unwrap_or(SqliteValue::Null));
814 Ok(())
815 }
816
817 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
818 state.frame.pop_front();
819 Ok(())
820 }
821
822 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
823 Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
824 }
825
826 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
827 Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
828 }
829
830 fn num_args(&self) -> i32 {
831 1
832 }
833
834 fn name(&self) -> &str {
835 "last_value"
836 }
837}
838
/// Accumulator for `nth_value(expr, N)`.
pub struct NthValueState {
    /// Values currently in the frame, oldest at the front.
    frame: VecDeque<SqliteValue>,
    /// 1-based position to report; starts at 1 and is overwritten by `step`
    /// whenever the frame is empty.
    n: i64,
}

/// The `nth_value()` window function.
pub struct NthValueFunc;

/// Error message raised when the second argument of `nth_value` is missing or
/// is not a positive integer.
const INVALID_NTH_VALUE_ARGUMENT: &str = "second argument to nth_value must be a positive integer";
851
/// Converts a float to `i64` only when the conversion is exact.
///
/// Returns `None` for NaN, infinities, values with a fractional part, and
/// values outside `[-2^63, 2^63)`.
fn integral_f64_to_i64(value: f64) -> Option<i64> {
    // -2^63 is exactly representable; 2^63 is the first value that does not fit.
    const LOWER_INCLUSIVE: f64 = -9_223_372_036_854_775_808.0;
    const UPPER_EXCLUSIVE: f64 = 9_223_372_036_854_775_808.0;

    let is_whole = value.is_finite() && value.fract() == 0.0;
    let fits_i64 = (LOWER_INCLUSIVE..UPPER_EXCLUSIVE).contains(&value);
    if is_whole && fits_i64 {
        Some(value as i64)
    } else {
        None
    }
}
864
865fn parse_integral_text(text: &str) -> Option<i64> {
866 let trimmed = text.trim();
867 if trimmed.is_empty() {
868 return None;
869 }
870 trimmed
871 .parse()
872 .ok()
873 .or_else(|| trimmed.parse().ok().and_then(integral_f64_to_i64))
874}
875
876fn nth_value_positive_integer_arg(value: Option<&SqliteValue>) -> Result<i64> {
877 let Some(value) = value else {
878 return Err(FrankenError::function_error(INVALID_NTH_VALUE_ARGUMENT));
879 };
880 let n = match value {
881 SqliteValue::Integer(n) => Some(*n),
882 SqliteValue::Float(n) => integral_f64_to_i64(*n),
883 SqliteValue::Text(text) => parse_integral_text(text),
884 SqliteValue::Null | SqliteValue::Blob(_) => None,
885 };
886 match n {
887 Some(n) if n > 0 => Ok(n),
888 _ => Err(FrankenError::function_error(INVALID_NTH_VALUE_ARGUMENT)),
889 }
890}
891
892impl WindowFunction for NthValueFunc {
893 type State = NthValueState;
894
895 fn initial_state(&self) -> Self::State {
896 NthValueState {
897 frame: VecDeque::new(),
898 n: 1,
899 }
900 }
901
902 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
903 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
904 let n = nth_value_positive_integer_arg(args.get(1))?;
905 if state.frame.is_empty() {
907 state.n = n;
908 }
909 state.frame.push_back(val);
910 Ok(())
911 }
912
913 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
914 state.frame.pop_front();
915 Ok(())
916 }
917
918 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
919 let idx = (state.n - 1) as usize;
920 Ok(state.frame.get(idx).cloned().unwrap_or(SqliteValue::Null))
921 }
922
923 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
924 self.value(&state)
925 }
926
927 fn num_args(&self) -> i32 {
928 2
929 }
930
931 fn name(&self) -> &str {
932 "nth_value"
933 }
934}
935
936pub struct WindowSumState {
942 sum: f64,
943 err: f64,
944 has_value: bool,
945 is_int: bool,
946 int_sum: i64,
947 overflowed: bool,
948}
949
950#[inline]
952fn kbn_step(sum: &mut f64, err: &mut f64, value: f64) {
953 let s = *sum;
954 let t = s + value;
955 if s.abs() > value.abs() {
956 *err += (s - t) + value;
957 } else {
958 *err += (value - t) + s;
959 }
960 *sum = t;
961}
962
963pub struct WindowSumFunc;
964
965impl WindowFunction for WindowSumFunc {
966 type State = WindowSumState;
967
968 fn initial_state(&self) -> Self::State {
969 WindowSumState {
970 sum: 0.0,
971 err: 0.0,
972 has_value: false,
973 is_int: true,
974 int_sum: 0,
975 overflowed: false,
976 }
977 }
978
979 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
980 if args.is_empty() || args[0].is_null() {
981 return Ok(());
982 }
983 let value = args[0].to_sum_numeric_value();
984 if value.is_null() {
985 return Ok(());
986 }
987 state.has_value = true;
988 match value {
989 SqliteValue::Integer(i) => {
990 if state.is_int && !state.overflowed {
991 match state.int_sum.checked_add(i) {
992 Some(s) => state.int_sum = s,
993 None => state.overflowed = true,
994 }
995 }
996 kbn_step(&mut state.sum, &mut state.err, i as f64);
997 }
998 SqliteValue::Float(f) => {
999 state.is_int = false;
1000 kbn_step(&mut state.sum, &mut state.err, f);
1001 }
1002 SqliteValue::Null | SqliteValue::Text(_) | SqliteValue::Blob(_) => {}
1003 }
1004 Ok(())
1005 }
1006
1007 fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1008 if args.is_empty() || args[0].is_null() {
1009 return Ok(());
1010 }
1011 let value = args[0].to_sum_numeric_value();
1012 match value {
1013 SqliteValue::Integer(i) => {
1014 if state.is_int && !state.overflowed {
1015 match state.int_sum.checked_sub(i) {
1016 Some(s) => state.int_sum = s,
1017 None => state.overflowed = true,
1018 }
1019 }
1020 kbn_step(&mut state.sum, &mut state.err, -(i as f64));
1021 }
1022 SqliteValue::Float(f) => {
1023 state.is_int = false;
1024 kbn_step(&mut state.sum, &mut state.err, -f);
1025 }
1026 SqliteValue::Null | SqliteValue::Text(_) | SqliteValue::Blob(_) => {}
1027 }
1028 Ok(())
1029 }
1030
1031 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1032 if !state.has_value {
1033 return Ok(SqliteValue::Null);
1034 }
1035 if state.is_int && state.overflowed {
1036 return Err(FrankenError::IntegerOverflow);
1037 }
1038 if state.is_int {
1039 Ok(SqliteValue::Integer(state.int_sum))
1040 } else {
1041 Ok(SqliteValue::Float(state.sum + state.err))
1042 }
1043 }
1044
1045 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1046 self.value(&state)
1047 }
1048
1049 fn num_args(&self) -> i32 {
1050 1
1051 }
1052
1053 fn name(&self) -> &str {
1054 "SUM"
1055 }
1056}
1057
1058pub struct WindowTotalFunc;
1059
1060impl WindowFunction for WindowTotalFunc {
1061 type State = f64;
1062
1063 fn initial_state(&self) -> Self::State {
1064 0.0
1065 }
1066
1067 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1068 if !args.is_empty() && !args[0].is_null() {
1069 *state += args[0].to_float();
1070 }
1071 Ok(())
1072 }
1073
1074 fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1075 if !args.is_empty() && !args[0].is_null() {
1076 *state -= args[0].to_float();
1077 }
1078 Ok(())
1079 }
1080
1081 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1082 Ok(SqliteValue::Float(*state))
1083 }
1084
1085 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1086 Ok(SqliteValue::Float(state))
1087 }
1088
1089 fn num_args(&self) -> i32 {
1090 1
1091 }
1092
1093 fn name(&self) -> &str {
1094 "TOTAL"
1095 }
1096}
1097
1098pub struct WindowCountState {
1099 count: i64,
1100}
1101
1102pub struct WindowCountFunc;
1103
1104impl WindowFunction for WindowCountFunc {
1105 type State = WindowCountState;
1106
1107 fn initial_state(&self) -> Self::State {
1108 WindowCountState { count: 0 }
1109 }
1110
1111 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1112 if args.is_empty() || !args[0].is_null() {
1114 state.count += 1;
1115 }
1116 Ok(())
1117 }
1118
1119 fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1120 if args.is_empty() || !args[0].is_null() {
1121 state.count -= 1;
1122 }
1123 Ok(())
1124 }
1125
1126 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1127 Ok(SqliteValue::Integer(state.count))
1128 }
1129
1130 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1131 Ok(SqliteValue::Integer(state.count))
1132 }
1133
1134 fn num_args(&self) -> i32 {
1135 -1 }
1137
1138 fn min_args(&self) -> i32 {
1139 0
1140 }
1141
1142 fn max_args(&self) -> Option<i32> {
1143 Some(1)
1144 }
1145
1146 fn name(&self) -> &str {
1147 "COUNT"
1148 }
1149}
1150
1151pub struct WindowMinState {
1152 min: Option<SqliteValue>,
1153}
1154
1155pub struct WindowMinFunc;
1156
1157impl WindowFunction for WindowMinFunc {
1158 type State = WindowMinState;
1159
1160 fn initial_state(&self) -> Self::State {
1161 WindowMinState { min: None }
1162 }
1163
1164 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1165 if args.is_empty() || args[0].is_null() {
1166 return Ok(());
1167 }
1168 state.min = Some(match state.min.take() {
1169 None => args[0].clone(),
1170 Some(cur) => {
1171 if cmp_values(&args[0], &cur) == std::cmp::Ordering::Less {
1172 args[0].clone()
1173 } else {
1174 cur
1175 }
1176 }
1177 });
1178 Ok(())
1179 }
1180
1181 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1182 Ok(())
1184 }
1185
1186 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1187 Ok(state.min.clone().unwrap_or(SqliteValue::Null))
1188 }
1189
1190 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1191 Ok(state.min.unwrap_or(SqliteValue::Null))
1192 }
1193
1194 fn num_args(&self) -> i32 {
1195 1
1196 }
1197
1198 fn name(&self) -> &str {
1199 "MIN"
1200 }
1201}
1202
1203pub struct WindowMaxState {
1204 max: Option<SqliteValue>,
1205}
1206
1207pub struct WindowMaxFunc;
1208
1209impl WindowFunction for WindowMaxFunc {
1210 type State = WindowMaxState;
1211
1212 fn initial_state(&self) -> Self::State {
1213 WindowMaxState { max: None }
1214 }
1215
1216 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1217 if args.is_empty() || args[0].is_null() {
1218 return Ok(());
1219 }
1220 state.max = Some(match state.max.take() {
1221 None => args[0].clone(),
1222 Some(cur) => {
1223 if cmp_values(&args[0], &cur) == std::cmp::Ordering::Greater {
1224 args[0].clone()
1225 } else {
1226 cur
1227 }
1228 }
1229 });
1230 Ok(())
1231 }
1232
1233 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1234 Ok(())
1235 }
1236
1237 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1238 Ok(state.max.clone().unwrap_or(SqliteValue::Null))
1239 }
1240
1241 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1242 Ok(state.max.unwrap_or(SqliteValue::Null))
1243 }
1244
1245 fn num_args(&self) -> i32 {
1246 1
1247 }
1248
1249 fn name(&self) -> &str {
1250 "MAX"
1251 }
1252}
1253
1254pub struct WindowAvgState {
1255 sum: f64,
1256 err: f64,
1257 count: i64,
1258}
1259
1260pub struct WindowAvgFunc;
1261
1262impl WindowFunction for WindowAvgFunc {
1263 type State = WindowAvgState;
1264
1265 fn initial_state(&self) -> Self::State {
1266 WindowAvgState {
1267 sum: 0.0,
1268 err: 0.0,
1269 count: 0,
1270 }
1271 }
1272
1273 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1274 if args.is_empty() || args[0].is_null() {
1275 return Ok(());
1276 }
1277 kbn_step(&mut state.sum, &mut state.err, args[0].to_float());
1278 state.count += 1;
1279 Ok(())
1280 }
1281
1282 fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1283 if args.is_empty() || args[0].is_null() {
1284 return Ok(());
1285 }
1286 kbn_step(&mut state.sum, &mut state.err, -args[0].to_float());
1287 state.count -= 1;
1288 Ok(())
1289 }
1290
1291 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1292 if state.count == 0 {
1293 Ok(SqliteValue::Null)
1294 } else {
1295 #[allow(clippy::cast_precision_loss)]
1296 Ok(SqliteValue::Float(
1297 (state.sum + state.err) / state.count as f64,
1298 ))
1299 }
1300 }
1301
1302 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1303 self.value(&state)
1304 }
1305
1306 fn num_args(&self) -> i32 {
1307 1
1308 }
1309
1310 fn name(&self) -> &str {
1311 "AVG"
1312 }
1313}
1314
/// Accumulator shared by the windowed `group_concat` and `string_agg`.
pub struct WindowGroupConcatState {
    /// Concatenation built so far (values interleaved with separators).
    result: String,
    /// `true` once at least one non-NULL value has been appended.
    has_value: bool,
}
1319
1320fn window_group_concat_step(state: &mut WindowGroupConcatState, args: &[SqliteValue]) {
1321 if args.is_empty() || args[0].is_null() {
1322 return;
1323 }
1324 if state.has_value {
1325 match args.get(1) {
1326 Some(separator) if !separator.is_null() => {
1327 if let Some(text) = separator.as_text_str() {
1328 state.result.push_str(text);
1329 } else {
1330 state.result.push_str(&separator.to_text());
1331 }
1332 }
1333 Some(_) => {}
1334 None => state.result.push(','),
1335 }
1336 }
1337 if let Some(text) = args[0].as_text_str() {
1338 state.result.push_str(text);
1339 } else {
1340 state.result.push_str(&args[0].to_text());
1341 }
1342 state.has_value = true;
1343}
1344
1345fn window_group_concat_value(state: &WindowGroupConcatState) -> SqliteValue {
1346 if state.has_value {
1347 SqliteValue::Text(state.result.clone().into())
1348 } else {
1349 SqliteValue::Null
1350 }
1351}
1352
1353pub struct WindowGroupConcatFunc;
1354
1355impl WindowFunction for WindowGroupConcatFunc {
1356 type State = WindowGroupConcatState;
1357
1358 fn initial_state(&self) -> Self::State {
1359 WindowGroupConcatState {
1360 result: String::new(),
1361 has_value: false,
1362 }
1363 }
1364
1365 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1366 window_group_concat_step(state, args);
1367 Ok(())
1368 }
1369
1370 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1371 Ok(())
1374 }
1375
1376 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1377 Ok(window_group_concat_value(state))
1378 }
1379
1380 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1381 Ok(window_group_concat_value(&state))
1382 }
1383
1384 fn num_args(&self) -> i32 {
1385 -1
1386 }
1387
1388 fn min_args(&self) -> i32 {
1389 1
1390 }
1391
1392 fn max_args(&self) -> Option<i32> {
1393 Some(2)
1394 }
1395
1396 fn name(&self) -> &str {
1397 "group_concat"
1398 }
1399}
1400
1401pub struct WindowStringAggFunc;
1402
1403impl WindowFunction for WindowStringAggFunc {
1404 type State = WindowGroupConcatState;
1405
1406 fn initial_state(&self) -> Self::State {
1407 WindowGroupConcatState {
1408 result: String::new(),
1409 has_value: false,
1410 }
1411 }
1412
1413 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1414 window_group_concat_step(state, args);
1415 Ok(())
1416 }
1417
1418 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1419 Ok(())
1420 }
1421
1422 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1423 Ok(window_group_concat_value(state))
1424 }
1425
1426 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1427 Ok(window_group_concat_value(&state))
1428 }
1429
1430 fn num_args(&self) -> i32 {
1431 2
1432 }
1433
1434 fn name(&self) -> &str {
1435 "string_agg"
1436 }
1437}
1438
/// Orders two values by delegating to `SqliteValue`'s own `cmp`.
///
/// Used by the windowed `MIN` / `MAX` implementations in this file.
pub fn cmp_values(a: &SqliteValue, b: &SqliteValue) -> std::cmp::Ordering {
    a.cmp(b)
}
1443
/// Registers every window function defined in this file with `registry`.
///
/// The ranking/navigation functions are registered first, followed by the
/// window-capable aggregates.
pub fn register_window_builtins(registry: &mut FunctionRegistry) {
    // Ranking and navigation functions.
    registry.register_window(RowNumberFunc);
    registry.register_window(RankFunc);
    registry.register_window(DenseRankFunc);
    registry.register_window(PercentRankFunc);
    registry.register_window(CumeDistFunc);
    registry.register_window(NtileFunc);
    registry.register_window(LagFunc);
    registry.register_window(LeadFunc);
    registry.register_window(FirstValueFunc);
    registry.register_window(LastValueFunc);
    registry.register_window(NthValueFunc);

    // Aggregates usable as window functions.
    registry.register_window(WindowSumFunc);
    registry.register_window(WindowTotalFunc);
    registry.register_window(WindowCountFunc);
    registry.register_window(WindowMinFunc);
    registry.register_window(WindowMaxFunc);
    registry.register_window(WindowAvgFunc);
    registry.register_window(WindowGroupConcatFunc);
    registry.register_window(WindowStringAggFunc);
}
1470
1471#[cfg(test)]
1474mod tests {
1475 use super::*;
1476
1477 fn int(v: i64) -> SqliteValue {
1478 SqliteValue::Integer(v)
1479 }
1480
1481 fn float(v: f64) -> SqliteValue {
1482 SqliteValue::Float(v)
1483 }
1484
1485 fn text(s: &str) -> SqliteValue {
1486 SqliteValue::Text(s.into())
1487 }
1488
1489 fn blob(bytes: &[u8]) -> SqliteValue {
1490 SqliteValue::Blob(std::sync::Arc::from(bytes))
1491 }
1492
1493 fn null() -> SqliteValue {
1494 SqliteValue::Null
1495 }
1496
1497 fn assert_function_error(err: FrankenError, expected: &str) {
1498 assert!(
1499 matches!(&err, FrankenError::FunctionError(message) if message == expected),
1500 "expected function error {expected:?}, got {err:?}"
1501 );
1502 }
1503
1504 fn assert_float_near(value: &SqliteValue, expected: f64) {
1505 assert!(
1506 matches!(value, SqliteValue::Float(_)),
1507 "expected Float, got {value:?}"
1508 );
1509 if let SqliteValue::Float(actual) = value {
1510 assert!(
1511 (*actual - expected).abs() < 1e-10,
1512 "expected {expected}, got {actual}"
1513 );
1514 }
1515 }
1516
1517 fn run_window_partition<F: WindowFunction>(
1521 func: &F,
1522 rows: &[Vec<SqliteValue>],
1523 ) -> Vec<SqliteValue> {
1524 let mut state = func.initial_state();
1525 let mut results = Vec::new();
1526 for row in rows {
1527 func.step(&mut state, row).unwrap();
1528 results.push(func.value(&state).unwrap());
1529 }
1530 results
1531 }
1532
1533 fn run_window_two_pass<F: WindowFunction>(
1538 func: &F,
1539 rows: &[Vec<SqliteValue>],
1540 ) -> Vec<SqliteValue> {
1541 let mut state = func.initial_state();
1542 for row in rows {
1544 func.step(&mut state, row).unwrap();
1545 }
1546 let mut results = Vec::new();
1548 for (i, _) in rows.iter().enumerate() {
1549 results.push(func.value(&state).unwrap());
1550 if i < rows.len() - 1 {
1551 func.inverse(&mut state, &[]).unwrap();
1552 }
1553 }
1554 results
1555 }
1556
1557 #[test]
1560 fn test_row_number_basic() {
1561 let results =
1562 run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![], vec![], vec![]]);
1563 assert_eq!(results, vec![int(1), int(2), int(3), int(4), int(5)]);
1564 }
1565
1566 #[test]
1567 fn test_row_number_partition_reset() {
1568 let r1 = run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![]]);
1570 assert_eq!(r1, vec![int(1), int(2), int(3)]);
1571
1572 let r2 = run_window_partition(&RowNumberFunc, &[vec![], vec![]]);
1574 assert_eq!(r2, vec![int(1), int(2)]);
1575 }
1576
1577 #[test]
1580 fn test_rank_with_ties() {
1581 let results = run_window_partition(
1583 &RankFunc,
1584 &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1585 );
1586 assert_eq!(results, vec![int(1), int(2), int(2), int(4)]);
1587 }
1588
1589 #[test]
1590 fn test_rank_no_ties() {
1591 let results =
1592 run_window_partition(&RankFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
1593 assert_eq!(results, vec![int(1), int(2), int(3)]);
1594 }
1595
1596 #[test]
1599 fn test_dense_rank_with_ties() {
1600 let results = run_window_partition(
1602 &DenseRankFunc,
1603 &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1604 );
1605 assert_eq!(results, vec![int(1), int(2), int(2), int(3)]);
1606 }
1607
1608 #[test]
1609 fn test_dense_rank_multiple_ties() {
1610 let results = run_window_partition(
1612 &DenseRankFunc,
1613 &[
1614 vec![int(1)],
1615 vec![int(1)],
1616 vec![int(2)],
1617 vec![int(2)],
1618 vec![int(3)],
1619 ],
1620 );
1621 assert_eq!(results, vec![int(1), int(1), int(2), int(2), int(3)]);
1622 }
1623
1624 #[test]
1627 fn test_percent_rank_single_row() {
1628 let results = run_window_two_pass(&PercentRankFunc, &[vec![int(1)]]);
1629 assert_eq!(results, vec![SqliteValue::Float(0.0)]);
1630 }
1631
1632 #[test]
1633 fn test_percent_rank_formula() {
1634 let results = run_window_two_pass(
1637 &PercentRankFunc,
1638 &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1639 );
1640 assert_float_near(&results[0], 0.0);
1645 assert_float_near(&results[1], 1.0 / 3.0);
1646 assert_float_near(&results[2], 1.0 / 3.0);
1647 assert_float_near(&results[3], 1.0);
1648 }
1649
1650 #[test]
1653 fn test_cume_dist_distinct() {
1654 let results = run_window_two_pass(
1656 &CumeDistFunc,
1657 &[vec![int(1)], vec![int(2)], vec![int(3)], vec![int(4)]],
1658 );
1659 for (i, expected) in [0.25, 0.5, 0.75, 1.0].iter().enumerate() {
1660 assert_float_near(&results[i], *expected);
1661 }
1662 }
1663
1664 #[test]
1665 fn test_cume_dist_with_ties() {
1666 let results = run_window_two_pass(
1668 &CumeDistFunc,
1669 &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
1670 );
1671 for (i, expected) in [0.25, 0.75, 0.75, 1.0].iter().enumerate() {
1672 assert_float_near(&results[i], *expected);
1673 }
1674 }
1675
1676 #[test]
1677 fn test_cume_dist_without_order_treats_partition_as_one_peer_group() {
1678 let results = run_window_two_pass(&CumeDistFunc, &[vec![], vec![], vec![]]);
1679 for value in results {
1680 assert_float_near(&value, 1.0);
1681 }
1682 }
1683
1684 #[test]
1687 fn test_ntile_even() {
1688 let rows: Vec<Vec<SqliteValue>> = (0..8).map(|_| vec![int(4)]).collect();
1690 let results = run_window_two_pass(&NtileFunc, &rows);
1691 assert_eq!(
1692 results,
1693 vec![
1694 int(1),
1695 int(1),
1696 int(2),
1697 int(2),
1698 int(3),
1699 int(3),
1700 int(4),
1701 int(4)
1702 ]
1703 );
1704 }
1705
1706 #[test]
1707 fn test_ntile_uneven() {
1708 let rows: Vec<Vec<SqliteValue>> = (0..10).map(|_| vec![int(3)]).collect();
1710 let results = run_window_two_pass(&NtileFunc, &rows);
1711 assert_eq!(
1712 results,
1713 vec![
1714 int(1),
1715 int(1),
1716 int(1),
1717 int(1),
1718 int(2),
1719 int(2),
1720 int(2),
1721 int(3),
1722 int(3),
1723 int(3)
1724 ]
1725 );
1726 }
1727
1728 #[test]
1729 fn test_ntile_more_buckets_than_rows() {
1730 let rows: Vec<Vec<SqliteValue>> = (0..3).map(|_| vec![int(10)]).collect();
1732 let results = run_window_two_pass(&NtileFunc, &rows);
1733 assert_eq!(results, vec![int(1), int(2), int(3)]);
1734 }
1735
1736 #[test]
1737 fn test_ntile_rejects_non_positive_argument() {
1738 for n in [0, -1] {
1739 let mut state = NtileFunc.initial_state();
1740 let err = NtileFunc.step(&mut state, &[int(n)]).unwrap_err();
1741 assert_function_error(err, INVALID_NTILE_ARGUMENT);
1742 }
1743 }
1744
1745 #[test]
1748 fn test_lag_default() {
1749 let results = run_window_two_pass(&LagFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
1751 assert_eq!(results, vec![null(), int(10), int(20)]);
1752 }
1753
1754 #[test]
1755 fn test_lag_offset_3() {
1756 let results = run_window_two_pass(
1758 &LagFunc,
1759 &[
1760 vec![int(10), int(3)],
1761 vec![int(20), int(3)],
1762 vec![int(30), int(3)],
1763 vec![int(40), int(3)],
1764 vec![int(50), int(3)],
1765 ],
1766 );
1767 assert_eq!(results, vec![null(), null(), null(), int(10), int(20)]);
1768 }
1769
1770 #[test]
1771 fn test_lag_default_value() {
1772 let results = run_window_two_pass(
1774 &LagFunc,
1775 &[
1776 vec![int(10), int(1), int(-1)],
1777 vec![int(20), int(1), int(-1)],
1778 ],
1779 );
1780 assert_eq!(results, vec![int(-1), int(10)]);
1781 }
1782
1783 #[test]
1784 fn test_lag_null_offset_returns_default_for_each_row() {
1785 let results = run_window_two_pass(
1786 &LagFunc,
1787 &[
1788 vec![int(10), null(), text("N/A")],
1789 vec![int(20), null(), text("N/A")],
1790 ],
1791 );
1792 assert_eq!(results, vec![text("N/A"), text("N/A")]);
1793 }
1794
1795 #[test]
1796 fn test_lag_uses_current_row_offset_and_default() {
1797 let results = run_window_two_pass(
1798 &LagFunc,
1799 &[
1800 vec![int(10), int(1), text("first")],
1801 vec![int(20), null(), text("null-offset")],
1802 vec![int(30), int(1), text("third")],
1803 vec![int(40), int(2), text("fourth")],
1804 ],
1805 );
1806 assert_eq!(
1807 results,
1808 vec![text("first"), text("null-offset"), int(20), int(20)]
1809 );
1810 }
1811
1812 #[test]
1813 fn test_lag_negative_offset_reads_following_row() {
1814 let results = run_window_two_pass(
1815 &LagFunc,
1816 &[
1817 vec![int(10), int(-1), text("N/A")],
1818 vec![int(20), int(-1), text("N/A")],
1819 vec![int(30), int(-1), text("N/A")],
1820 ],
1821 );
1822 assert_eq!(results, vec![int(20), int(30), text("N/A")]);
1823 }
1824
1825 #[test]
1826 fn test_lag_fractional_offset_uses_default() {
1827 let results = run_window_two_pass(
1828 &LagFunc,
1829 &[
1830 vec![int(10), float(1.5), text("N/A")],
1831 vec![int(20), float(1.5), text("N/A")],
1832 vec![int(30), float(1.5), text("N/A")],
1833 ],
1834 );
1835 assert_eq!(results, vec![text("N/A"), text("N/A"), text("N/A")]);
1836 }
1837
1838 #[test]
1839 fn test_lag_nonnumeric_text_offset_reads_current_row() {
1840 let results = run_window_two_pass(
1841 &LagFunc,
1842 &[
1843 vec![int(10), text("abc"), text("N/A")],
1844 vec![int(20), text("abc"), text("N/A")],
1845 vec![int(30), text("abc"), text("N/A")],
1846 ],
1847 );
1848 assert_eq!(results, vec![int(10), int(20), int(30)]);
1849 }
1850
1851 #[test]
1852 fn test_lag_integral_text_prefix_offset() {
1853 let results = run_window_two_pass(
1854 &LagFunc,
1855 &[
1856 vec![int(10), text("2.0x"), text("N/A")],
1857 vec![int(20), text("2e0x"), text("N/A")],
1858 vec![int(30), text("2x"), text("N/A")],
1859 ],
1860 );
1861 assert_eq!(results, vec![text("N/A"), text("N/A"), int(10)]);
1862 }
1863
1864 #[test]
1867 fn test_lead_default() {
1868 let func = LeadFunc;
1872 let mut state = func.initial_state();
1873 let rows = [int(10), int(20), int(30)];
1874
1875 for row in &rows {
1877 func.step(&mut state, std::slice::from_ref(row)).unwrap();
1878 }
1879
1880 let mut results = Vec::new();
1882 for _ in &rows {
1883 results.push(func.value(&state).unwrap());
1884 func.inverse(&mut state, &[]).unwrap();
1885 }
1886 assert_eq!(results, vec![int(20), int(30), null()]);
1887 }
1888
1889 #[test]
1890 fn test_lead_offset_2() {
1891 let func = LeadFunc;
1892 let mut state = func.initial_state();
1893 let rows = [int(10), int(20), int(30), int(40), int(50)];
1894
1895 for row in &rows {
1896 func.step(&mut state, &[row.clone(), int(2)]).unwrap();
1897 }
1898
1899 let mut results = Vec::new();
1900 for _ in &rows {
1901 results.push(func.value(&state).unwrap());
1902 func.inverse(&mut state, &[]).unwrap();
1903 }
1904 assert_eq!(results, vec![int(30), int(40), int(50), null(), null()]);
1905 }
1906
1907 #[test]
1908 fn test_lead_default_value() {
1909 let func = LeadFunc;
1910 let mut state = func.initial_state();
1911 let rows = [int(10), int(20)];
1912
1913 for row in &rows {
1914 func.step(&mut state, &[row.clone(), int(1), text("N/A")])
1915 .unwrap();
1916 }
1917
1918 let mut results = Vec::new();
1919 for _ in &rows {
1920 results.push(func.value(&state).unwrap());
1921 func.inverse(&mut state, &[]).unwrap();
1922 }
1923 assert_eq!(results, vec![int(20), text("N/A")]);
1924 }
1925
1926 #[test]
1927 fn test_lead_null_offset_returns_default_for_each_row() {
1928 let func = LeadFunc;
1929 let mut state = func.initial_state();
1930 let rows = [int(10), int(20)];
1931
1932 for row in &rows {
1933 func.step(&mut state, &[row.clone(), null(), text("N/A")])
1934 .unwrap();
1935 }
1936
1937 let mut results = Vec::new();
1938 for _ in &rows {
1939 results.push(func.value(&state).unwrap());
1940 func.inverse(&mut state, &[]).unwrap();
1941 }
1942 assert_eq!(results, vec![text("N/A"), text("N/A")]);
1943 }
1944
1945 #[test]
1946 fn test_lead_uses_current_row_offset_and_default() {
1947 let func = LeadFunc;
1948 let mut state = func.initial_state();
1949 let rows = [
1950 vec![int(10), int(1), text("first")],
1951 vec![int(20), null(), text("null-offset")],
1952 vec![int(30), int(1), text("third")],
1953 ];
1954
1955 for row in &rows {
1956 func.step(&mut state, row).unwrap();
1957 }
1958
1959 let mut results = Vec::new();
1960 for _ in &rows {
1961 results.push(func.value(&state).unwrap());
1962 func.inverse(&mut state, &[]).unwrap();
1963 }
1964 assert_eq!(results, vec![int(20), text("null-offset"), text("third")]);
1965 }
1966
1967 #[test]
1968 fn test_lead_negative_offset_reads_previous_row() {
1969 let func = LeadFunc;
1970 let mut state = func.initial_state();
1971 let rows = [int(10), int(20), int(30)];
1972
1973 for row in &rows {
1974 func.step(&mut state, &[row.clone(), int(-1), text("N/A")])
1975 .unwrap();
1976 }
1977
1978 let mut results = Vec::new();
1979 for _ in &rows {
1980 results.push(func.value(&state).unwrap());
1981 func.inverse(&mut state, &[]).unwrap();
1982 }
1983 assert_eq!(results, vec![text("N/A"), int(10), int(20)]);
1984 }
1985
1986 #[test]
1987 fn test_lead_fractional_offset_uses_default() {
1988 let results = run_window_two_pass(
1989 &LeadFunc,
1990 &[
1991 vec![int(10), float(1.5), text("N/A")],
1992 vec![int(20), float(1.5), text("N/A")],
1993 vec![int(30), float(1.5), text("N/A")],
1994 ],
1995 );
1996 assert_eq!(results, vec![text("N/A"), text("N/A"), text("N/A")]);
1997 }
1998
1999 #[test]
2000 fn test_lead_integral_blob_offset() {
2001 let results = run_window_two_pass(
2002 &LeadFunc,
2003 &[
2004 vec![int(10), blob(b"2.0x"), text("N/A")],
2005 vec![int(20), blob(b"2e0x"), text("N/A")],
2006 vec![int(30), blob(b"2x"), text("N/A")],
2007 ],
2008 );
2009 assert_eq!(results, vec![int(30), text("N/A"), text("N/A")]);
2010 }
2011
2012 #[test]
2015 fn test_first_value_basic() {
2016 let results = run_window_partition(
2017 &FirstValueFunc,
2018 &[vec![int(10)], vec![int(20)], vec![int(30)]],
2019 );
2020 assert_eq!(results, vec![int(10), int(10), int(10)]);
2023 }
2024
2025 #[test]
2028 fn test_last_value_default_frame() {
2029 let results = run_window_partition(
2032 &LastValueFunc,
2033 &[vec![int(10)], vec![int(20)], vec![int(30)]],
2034 );
2035 assert_eq!(results, vec![int(10), int(20), int(30)]);
2036 }
2037
2038 #[test]
2039 fn test_last_value_unbounded_following() {
2040 let func = LastValueFunc;
2043 let mut state = func.initial_state();
2044 func.step(&mut state, &[int(10)]).unwrap();
2045 func.step(&mut state, &[int(20)]).unwrap();
2046 func.step(&mut state, &[int(30)]).unwrap();
2047 assert_eq!(func.value(&state).unwrap(), int(30));
2048 }
2049
2050 #[test]
2053 fn test_nth_value_basic() {
2054 let func = NthValueFunc;
2055 let mut state = func.initial_state();
2056 func.step(&mut state, &[int(10), int(3)]).unwrap();
2058 func.step(&mut state, &[int(20), int(3)]).unwrap();
2059 func.step(&mut state, &[int(30), int(3)]).unwrap();
2060 func.step(&mut state, &[int(40), int(3)]).unwrap();
2061 func.step(&mut state, &[int(50), int(3)]).unwrap();
2062 assert_eq!(func.value(&state).unwrap(), int(30));
2063 }
2064
2065 #[test]
2066 fn test_nth_value_out_of_range() {
2067 let func = NthValueFunc;
2068 let mut state = func.initial_state();
2069 func.step(&mut state, &[int(10), int(100)]).unwrap();
2070 func.step(&mut state, &[int(20), int(100)]).unwrap();
2071 assert_eq!(func.value(&state).unwrap(), null());
2073 }
2074
2075 #[test]
2076 fn test_nth_value_n_zero() {
2077 let func = NthValueFunc;
2078 let mut state = func.initial_state();
2079 let err = func.step(&mut state, &[int(10), int(0)]).unwrap_err();
2080 assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2081 }
2082
2083 #[test]
2084 fn test_nth_value_rejects_negative_n() {
2085 let func = NthValueFunc;
2086 let mut state = func.initial_state();
2087 let err = func.step(&mut state, &[int(10), int(-1)]).unwrap_err();
2088 assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2089 }
2090
2091 #[test]
2092 fn test_nth_value_accepts_integral_real_n() {
2093 let func = NthValueFunc;
2094 let mut state = func.initial_state();
2095 func.step(&mut state, &[int(10), float(2.0)]).unwrap();
2096 func.step(&mut state, &[int(20), float(2.0)]).unwrap();
2097 assert_eq!(func.value(&state).unwrap(), int(20));
2098 }
2099
2100 #[test]
2101 fn test_nth_value_accepts_integral_text_n() {
2102 let func = NthValueFunc;
2103 let mut state = func.initial_state();
2104 func.step(&mut state, &[int(10), text("2e0")]).unwrap();
2105 func.step(&mut state, &[int(20), text("2.0")]).unwrap();
2106 assert_eq!(func.value(&state).unwrap(), int(20));
2107 }
2108
2109 #[test]
2110 fn test_nth_value_rejects_fractional_real_n() {
2111 let func = NthValueFunc;
2112 let mut state = func.initial_state();
2113 let err = func.step(&mut state, &[int(10), float(1.5)]).unwrap_err();
2114 assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2115 }
2116
2117 #[test]
2118 fn test_nth_value_rejects_fractional_text_n() {
2119 let func = NthValueFunc;
2120 let mut state = func.initial_state();
2121 let err = func.step(&mut state, &[int(10), text("1.5")]).unwrap_err();
2122 assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2123 }
2124
2125 #[test]
2126 fn test_nth_value_rejects_text_numeric_prefix_n() {
2127 let func = NthValueFunc;
2128 let mut state = func.initial_state();
2129 let err = func.step(&mut state, &[int(10), text("2x")]).unwrap_err();
2130 assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2131 }
2132
2133 #[test]
2134 fn test_nth_value_rejects_blob_integer_n() {
2135 let func = NthValueFunc;
2136 let mut state = func.initial_state();
2137 let err = func.step(&mut state, &[int(10), blob(b"2")]).unwrap_err();
2138 assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2139 }
2140
2141 #[test]
2142 fn test_nth_value_rejects_non_positive_n_after_first_row() {
2143 let func = NthValueFunc;
2144 let mut state = func.initial_state();
2145 func.step(&mut state, &[int(10), int(1)]).unwrap();
2146 let err = func.step(&mut state, &[int(20), int(0)]).unwrap_err();
2147 assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
2148 }
2149
2150 #[test]
2153 fn test_window_sum_text_integer_literals_stay_integer() {
2154 let results = run_window_partition(&WindowSumFunc, &[vec![text("1")], vec![text("2")]]);
2155 assert_eq!(results, vec![int(1), int(3)]);
2156 }
2157
2158 #[test]
2159 fn test_window_sum_non_numeric_text_returns_real_zero() {
2160 let results = run_window_partition(&WindowSumFunc, &[vec![text("abc")]]);
2161 assert_eq!(results, vec![float(0.0)]);
2162 }
2163
2164 #[test]
2165 fn test_window_sum_overflow_suppressed_after_float_input() {
2166 let func = WindowSumFunc;
2167 let mut state = func.initial_state();
2168
2169 func.step(&mut state, &[int(i64::MAX)]).unwrap();
2170 func.step(&mut state, &[int(1)]).unwrap();
2171 assert_eq!(
2172 func.value(&state).unwrap_err().to_string(),
2173 "integer overflow"
2174 );
2175
2176 func.step(&mut state, &[float(0.5)]).unwrap();
2177 assert!(matches!(func.value(&state).unwrap(), SqliteValue::Float(_)));
2178 }
2179
2180 #[test]
2183 fn test_window_min_max_use_sqlite_storage_class_order() {
2184 let text_value = text("z");
2185 let blob_value = blob(b"\0");
2186
2187 let mut min_state = WindowMinFunc.initial_state();
2188 WindowMinFunc
2189 .step(&mut min_state, std::slice::from_ref(&blob_value))
2190 .unwrap();
2191 WindowMinFunc
2192 .step(&mut min_state, std::slice::from_ref(&text_value))
2193 .unwrap();
2194 assert_eq!(WindowMinFunc.value(&min_state).unwrap(), text_value);
2195
2196 let mut max_state = WindowMaxFunc.initial_state();
2197 WindowMaxFunc
2198 .step(&mut max_state, std::slice::from_ref(&text_value))
2199 .unwrap();
2200 WindowMaxFunc
2201 .step(&mut max_state, std::slice::from_ref(&blob_value))
2202 .unwrap();
2203 assert_eq!(WindowMaxFunc.value(&max_state).unwrap(), blob_value);
2204 }
2205
2206 #[test]
2209 fn test_window_group_concat_running_default_separator() {
2210 let results = run_window_partition(
2211 &WindowGroupConcatFunc,
2212 &[vec![text("a")], vec![text("b")], vec![text("c")]],
2213 );
2214 assert_eq!(results, vec![text("a"), text("a,b"), text("a,b,c")]);
2215 }
2216
2217 #[test]
2218 fn test_window_group_concat_running_custom_separator() {
2219 let results = run_window_partition(
2220 &WindowGroupConcatFunc,
2221 &[
2222 vec![text("a"), text(" | ")],
2223 vec![text("b"), text(" | ")],
2224 vec![text("c"), text(" | ")],
2225 ],
2226 );
2227 assert_eq!(results, vec![text("a"), text("a | b"), text("a | b | c")]);
2228 }
2229
2230 #[test]
2231 fn test_window_group_concat_skips_null_and_uses_current_row_separator() {
2232 let results = run_window_partition(
2233 &WindowGroupConcatFunc,
2234 &[
2235 vec![text("a"), text("-")],
2236 vec![null(), text("?")],
2237 vec![text("b"), text("+")],
2238 vec![text("c"), text("*")],
2239 ],
2240 );
2241 assert_eq!(
2242 results,
2243 vec![text("a"), text("a"), text("a+b"), text("a+b*c")]
2244 );
2245 }
2246
2247 #[test]
2248 fn test_window_string_agg_alias_through_registry() {
2249 let mut reg = FunctionRegistry::new();
2250 register_window_builtins(&mut reg);
2251
2252 let sa = reg.find_window("string_agg", 2).unwrap();
2253 let mut state = sa.initial_state();
2254 sa.step(&mut state, &[text("a"), text(";")]).unwrap();
2255 assert_eq!(sa.value(&state).unwrap(), text("a"));
2256 sa.step(&mut state, &[text("b"), text(";")]).unwrap();
2257 assert_eq!(sa.value(&state).unwrap(), text("a;b"));
2258 }
2259
2260 #[test]
2263 fn test_register_window_builtins_all_present() {
2264 let mut reg = FunctionRegistry::new();
2265 register_window_builtins(&mut reg);
2266
2267 let expected_variadic = [
2268 "row_number",
2269 "rank",
2270 "dense_rank",
2271 "percent_rank",
2272 "cume_dist",
2273 "lag",
2274 "lead",
2275 ];
2276 for name in expected_variadic {
2277 assert!(
2278 reg.find_window(name, 0).is_some()
2279 || reg.find_window(name, 1).is_some()
2280 || reg.find_window(name, -1).is_some(),
2281 "window function '{name}' not registered"
2282 );
2283 }
2284
2285 assert!(
2286 reg.find_window("ntile", 1).is_some(),
2287 "ntile(1) not registered"
2288 );
2289 assert!(
2290 reg.find_window("first_value", 1).is_some(),
2291 "first_value(1) not registered"
2292 );
2293 assert!(
2294 reg.find_window("last_value", 1).is_some(),
2295 "last_value(1) not registered"
2296 );
2297 assert!(
2298 reg.find_window("nth_value", 2).is_some(),
2299 "nth_value(2) not registered"
2300 );
2301 assert!(
2302 reg.find_window("group_concat", 1).is_some(),
2303 "group_concat(1) not registered"
2304 );
2305 assert!(
2306 reg.find_window("group_concat", 2).is_some(),
2307 "group_concat(2) not registered"
2308 );
2309 assert!(
2310 reg.find_window("string_agg", 2).is_some(),
2311 "string_agg(2) not registered"
2312 );
2313
2314 for (name, arity) in [
2315 ("rank", 1),
2316 ("dense_rank", 1),
2317 ("percent_rank", 1),
2318 ("cume_dist", 1),
2319 ("lag", 0),
2320 ("lag", 4),
2321 ("lead", 0),
2322 ("lead", 4),
2323 ("count", 2),
2324 ("group_concat", 0),
2325 ("group_concat", 3),
2326 ] {
2327 let f = reg
2328 .find_window(name, arity)
2329 .expect("known window with wrong arity returns erroring window");
2330 let mut state = f.initial_state();
2331 let err = f
2332 .step(&mut state, &[])
2333 .expect_err("invalid window arity should fail");
2334 let expected = format!("wrong number of arguments to function {name}()");
2335 assert!(
2336 matches!(&err, FrankenError::FunctionError(message) if message == &expected),
2337 "unexpected error for {name}/{arity}: {err:?}"
2338 );
2339 }
2340 }
2341
2342 #[test]
2345 fn test_e2e_window_row_number_through_registry() {
2346 let mut reg = FunctionRegistry::new();
2347 register_window_builtins(&mut reg);
2348
2349 let rn = reg.find_window("row_number", 0).unwrap();
2350 let mut state = rn.initial_state();
2351 rn.step(&mut state, &[]).unwrap();
2352 assert_eq!(rn.value(&state).unwrap(), int(1));
2353 rn.step(&mut state, &[]).unwrap();
2354 assert_eq!(rn.value(&state).unwrap(), int(2));
2355 rn.step(&mut state, &[]).unwrap();
2356 assert_eq!(rn.value(&state).unwrap(), int(3));
2357 }
2358
2359 #[test]
2360 fn test_e2e_window_rank_through_registry() {
2361 let mut reg = FunctionRegistry::new();
2362 register_window_builtins(&mut reg);
2363
2364 let rank = reg.find_window("rank", 0).unwrap();
2365 let mut state = rank.initial_state();
2366 rank.step(&mut state, &[int(1)]).unwrap();
2368 assert_eq!(rank.value(&state).unwrap(), int(1));
2369 rank.step(&mut state, &[int(2)]).unwrap();
2370 assert_eq!(rank.value(&state).unwrap(), int(2));
2371 rank.step(&mut state, &[int(2)]).unwrap();
2372 assert_eq!(rank.value(&state).unwrap(), int(2));
2373 rank.step(&mut state, &[int(3)]).unwrap();
2374 assert_eq!(rank.value(&state).unwrap(), int(4));
2375 }
2376}