1#![allow(
22 clippy::unnecessary_literal_bound,
23 clippy::cast_possible_truncation,
24 clippy::cast_possible_wrap,
25 clippy::cast_precision_loss,
26 clippy::cast_sign_loss,
27 clippy::items_after_statements,
28 clippy::float_cmp,
29 clippy::match_same_arms,
30 clippy::similar_names
31)]
32
33use std::collections::VecDeque;
34
35use fsqlite_error::{FrankenError, Result};
36use fsqlite_types::SqliteValue;
37
38use crate::{FunctionRegistry, WindowFunction};
39
40pub struct RowNumberState {
45 counter: i64,
46}
47
48pub struct RowNumberFunc;
49
50impl WindowFunction for RowNumberFunc {
51 type State = RowNumberState;
52
53 fn initial_state(&self) -> Self::State {
54 RowNumberState { counter: 0 }
55 }
56
57 fn step(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
58 state.counter += 1;
59 Ok(())
60 }
61
62 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
63 state.counter -= 1;
64 Ok(())
65 }
66
67 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
68 Ok(SqliteValue::Integer(state.counter))
69 }
70
71 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
72 Ok(SqliteValue::Integer(state.counter))
73 }
74
75 fn num_args(&self) -> i32 {
76 0
77 }
78
79 fn name(&self) -> &str {
80 "row_number"
81 }
82}
83
84pub struct RankState {
89 row_number: i64,
90 rank: i64,
91 last_order_value: Option<SqliteValue>,
92}
93
94pub struct RankFunc;
95
96impl WindowFunction for RankFunc {
97 type State = RankState;
98
99 fn initial_state(&self) -> Self::State {
100 RankState {
101 row_number: 0,
102 rank: 0,
103 last_order_value: None,
104 }
105 }
106
107 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
108 state.row_number += 1;
109 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
110 let is_new_peer = match &state.last_order_value {
111 None => true,
112 Some(last) => ¤t != last,
113 };
114 if is_new_peer {
115 state.rank = state.row_number;
116 state.last_order_value = Some(current);
117 }
118 Ok(())
119 }
120
121 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
122 Ok(())
124 }
125
126 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
127 Ok(SqliteValue::Integer(state.rank))
128 }
129
130 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
131 Ok(SqliteValue::Integer(state.rank))
132 }
133
134 fn num_args(&self) -> i32 {
135 -1
136 }
137
138 fn min_args(&self) -> i32 {
139 0
140 }
141
142 fn max_args(&self) -> Option<i32> {
143 Some(0)
144 }
145
146 fn name(&self) -> &str {
147 "rank"
148 }
149}
150
151pub struct DenseRankState {
156 dense_rank: i64,
157 last_order_value: Option<SqliteValue>,
158}
159
160pub struct DenseRankFunc;
161
162impl WindowFunction for DenseRankFunc {
163 type State = DenseRankState;
164
165 fn initial_state(&self) -> Self::State {
166 DenseRankState {
167 dense_rank: 0,
168 last_order_value: None,
169 }
170 }
171
172 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
173 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
174 let is_new_peer = match &state.last_order_value {
175 None => true,
176 Some(last) => ¤t != last,
177 };
178 if is_new_peer {
179 state.dense_rank += 1;
180 state.last_order_value = Some(current);
181 }
182 Ok(())
183 }
184
185 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
186 Ok(())
187 }
188
189 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
190 Ok(SqliteValue::Integer(state.dense_rank))
191 }
192
193 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
194 Ok(SqliteValue::Integer(state.dense_rank))
195 }
196
197 fn num_args(&self) -> i32 {
198 -1
199 }
200
201 fn min_args(&self) -> i32 {
202 0
203 }
204
205 fn max_args(&self) -> Option<i32> {
206 Some(0)
207 }
208
209 fn name(&self) -> &str {
210 "dense_rank"
211 }
212}
213
214pub struct PercentRankState {
224 partition_size: i64,
225 ranks: Vec<i64>,
226 cursor: usize,
227 step_row_number: i64,
228 current_rank: i64,
229 last_order_value: Option<SqliteValue>,
230}
231
232pub struct PercentRankFunc;
233
234impl WindowFunction for PercentRankFunc {
235 type State = PercentRankState;
236
237 fn initial_state(&self) -> Self::State {
238 PercentRankState {
239 partition_size: 0,
240 ranks: Vec::new(),
241 cursor: 0,
242 step_row_number: 0,
243 current_rank: 0,
244 last_order_value: None,
245 }
246 }
247
248 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
249 state.step_row_number += 1;
250 state.partition_size += 1;
251 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
252 let is_new_peer = match &state.last_order_value {
253 None => true,
254 Some(last) => ¤t != last,
255 };
256 if is_new_peer {
257 state.current_rank = state.step_row_number;
258 state.last_order_value = Some(current);
259 }
260 state.ranks.push(state.current_rank);
261 Ok(())
262 }
263
264 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
265 state.cursor += 1;
266 Ok(())
267 }
268
269 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
270 if state.partition_size <= 1 {
271 return Ok(SqliteValue::Float(0.0));
272 }
273 let rank = state.ranks.get(state.cursor).copied().unwrap_or(1);
274 let pr = (rank - 1) as f64 / (state.partition_size - 1) as f64;
275 Ok(SqliteValue::Float(pr))
276 }
277
278 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
279 self.value(&state)
280 }
281
282 fn num_args(&self) -> i32 {
283 -1
284 }
285
286 fn min_args(&self) -> i32 {
287 0
288 }
289
290 fn max_args(&self) -> Option<i32> {
291 Some(0)
292 }
293
294 fn name(&self) -> &str {
295 "percent_rank"
296 }
297}
298
299pub struct CumeDistState {
308 partition_size: i64,
309 current_row: usize,
310 cume_positions: Vec<i64>,
311 peer_start: usize,
312 last_order_value: Option<SqliteValue>,
313}
314
315pub struct CumeDistFunc;
316
317impl WindowFunction for CumeDistFunc {
318 type State = CumeDistState;
319
320 fn initial_state(&self) -> Self::State {
321 CumeDistState {
322 partition_size: 0,
323 current_row: 0,
324 cume_positions: Vec::new(),
325 peer_start: 0,
326 last_order_value: None,
327 }
328 }
329
330 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
331 let current = args.first().cloned().unwrap_or(SqliteValue::Null);
332 let is_new_peer = match &state.last_order_value {
333 None => true,
334 Some(last) => ¤t != last,
335 };
336 if is_new_peer {
337 let peer_end = state.partition_size;
338 if let Some(slots) = state.cume_positions.get_mut(state.peer_start..) {
339 for slot in slots {
340 *slot = peer_end;
341 }
342 }
343 state.peer_start = state.cume_positions.len();
344 state.last_order_value = Some(current);
345 }
346 state.partition_size += 1;
347 state.cume_positions.push(0);
348 Ok(())
349 }
350
351 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
352 state.current_row += 1;
353 Ok(())
354 }
355
356 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
357 if state.partition_size == 0 {
358 return Ok(SqliteValue::Float(0.0));
359 }
360 let peer_end = state
361 .cume_positions
362 .get(state.current_row)
363 .copied()
364 .filter(|position| *position != 0)
365 .unwrap_or(state.partition_size);
366 let cd = peer_end as f64 / state.partition_size as f64;
367 Ok(SqliteValue::Float(cd))
368 }
369
370 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
371 self.value(&state)
372 }
373
374 fn num_args(&self) -> i32 {
375 -1
376 }
377
378 fn min_args(&self) -> i32 {
379 0
380 }
381
382 fn max_args(&self) -> Option<i32> {
383 Some(0)
384 }
385
386 fn name(&self) -> &str {
387 "cume_dist"
388 }
389}
390
391pub struct NtileState {
401 partition_size: i64,
402 n: i64,
403 current_row: i64,
404}
405
406pub struct NtileFunc;
407
408const INVALID_NTILE_ARGUMENT: &str = "argument of ntile must be a positive integer";
409
410impl WindowFunction for NtileFunc {
411 type State = NtileState;
412
413 fn initial_state(&self) -> Self::State {
414 NtileState {
415 partition_size: 0,
416 n: 1,
417 current_row: 0,
418 }
419 }
420
421 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
422 if state.partition_size == 0 {
423 let n = args.first().map_or(0, SqliteValue::to_integer);
424 if n <= 0 {
425 return Err(FrankenError::function_error(INVALID_NTILE_ARGUMENT));
426 }
427 state.n = n;
428 }
429 state.partition_size += 1;
430 Ok(())
431 }
432
433 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
434 state.current_row += 1;
435 Ok(())
436 }
437
438 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
439 if state.partition_size == 0 {
440 return Ok(SqliteValue::Integer(1));
441 }
442 let n = state.n;
443 let sz = state.partition_size;
444 let row = state.current_row + 1; let base = sz / n;
449 let extra = sz % n;
450 let large_rows = extra * (base + 1);
452
453 let bucket = if row <= large_rows {
454 (row - 1) / (base + 1) + 1
456 } else {
457 let adjusted = row - large_rows;
459 if base == 0 {
460 extra + adjusted
462 } else {
463 extra + (adjusted - 1) / base + 1
464 }
465 };
466 Ok(SqliteValue::Integer(bucket))
467 }
468
469 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
470 self.value(&state)
471 }
472
473 fn num_args(&self) -> i32 {
474 1
475 }
476
477 fn name(&self) -> &str {
478 "ntile"
479 }
480}
481
/// Returns the byte length of the longest numeric literal at the start of `bytes`.
///
/// Accepted shape: optional sign, digits, optional `.` plus digits (at least
/// one digit overall), then an optional exponent. The exponent counts only if
/// it has at least one digit. Returns 0 when there is no digit in the mantissa.
fn numeric_prefix_len(bytes: &[u8]) -> usize {
    let digits_from = |from: usize| {
        bytes
            .get(from..)
            .map_or(0, |rest| rest.iter().take_while(|b| b.is_ascii_digit()).count())
    };
    let is_sign = |at: usize| matches!(bytes.get(at), Some(b'+' | b'-'));

    let mut pos = usize::from(is_sign(0));
    let int_digits = digits_from(pos);
    pos += int_digits;

    let mut frac_digits = 0;
    if bytes.get(pos) == Some(&b'.') {
        pos += 1;
        frac_digits = digits_from(pos);
        pos += frac_digits;
    }

    if int_digits + frac_digits == 0 {
        return 0;
    }

    if matches!(bytes.get(pos), Some(b'e' | b'E')) {
        let mut exp_pos = pos + 1;
        if is_sign(exp_pos) {
            exp_pos += 1;
        }
        let exp_digits = digits_from(exp_pos);
        // An exponent marker without digits is not part of the number.
        if exp_digits > 0 {
            pos = exp_pos + exp_digits;
        }
    }
    pos
}
536
/// Returns `bytes` without its leading ASCII whitespace.
///
/// Uses `u8::is_ascii_whitespace`, which does not treat vertical tab as whitespace.
fn trim_ascii_start(bytes: &[u8]) -> &[u8] {
    let leading = bytes
        .iter()
        .take_while(|byte| byte.is_ascii_whitespace())
        .count();
    bytes.split_at(leading).1
}
544
545fn lag_lead_bytes_offset(bytes: &[u8]) -> Option<i64> {
546 let trimmed = trim_ascii_start(bytes);
547 let prefix_len = numeric_prefix_len(trimmed);
548 if prefix_len == 0 {
549 return Some(0);
550 }
551 let prefix = trimmed
552 .get(..prefix_len)
553 .and_then(|bytes| std::str::from_utf8(bytes).ok())?;
554 if prefix
555 .as_bytes()
556 .iter()
557 .any(|byte| matches!(*byte, b'.' | b'e' | b'E'))
558 {
559 prefix.parse().ok().and_then(integral_f64_to_i64)
560 } else {
561 prefix
562 .parse()
563 .ok()
564 .or_else(|| prefix.parse().ok().and_then(integral_f64_to_i64))
565 }
566}
567
568fn lag_lead_text_offset(text: &str) -> Option<i64> {
569 lag_lead_bytes_offset(text.as_bytes())
570}
571
572fn lag_lead_offset_arg(value: Option<&SqliteValue>) -> Option<i64> {
573 match value {
574 None => Some(1),
575 Some(SqliteValue::Null) => None,
576 Some(SqliteValue::Integer(offset)) => Some(*offset),
577 Some(SqliteValue::Float(offset)) => integral_f64_to_i64(*offset),
578 Some(SqliteValue::Text(text)) => lag_lead_text_offset(text),
579 Some(SqliteValue::Blob(bytes)) => lag_lead_bytes_offset(bytes),
580 }
581}
582
583pub struct LagState {
585 buffer: Vec<SqliteValue>,
586 offsets: Vec<Option<i64>>,
587 defaults: Vec<SqliteValue>,
588 current_row: i64,
589}
590
591pub struct LagFunc;
592
593impl WindowFunction for LagFunc {
594 type State = LagState;
595
596 fn initial_state(&self) -> Self::State {
597 LagState {
598 buffer: Vec::new(),
599 offsets: Vec::new(),
600 defaults: Vec::new(),
601 current_row: 0,
602 }
603 }
604
605 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
606 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
607 let offset = lag_lead_offset_arg(args.get(1));
608 let default_val = args.get(2).cloned().unwrap_or(SqliteValue::Null);
609 state.buffer.push(val);
610 state.offsets.push(offset);
611 state.defaults.push(default_val);
612 Ok(())
613 }
614
615 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
616 state.current_row += 1;
617 Ok(())
618 }
619
620 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
621 let current_index = usize::try_from(state.current_row).unwrap_or(usize::MAX);
622 let default_val = state
623 .defaults
624 .get(current_index)
625 .cloned()
626 .unwrap_or(SqliteValue::Null);
627 let Some(offset) = state.offsets.get(current_index).copied().flatten() else {
628 return Ok(default_val);
629 };
630 let target = state.current_row - offset;
631 let Ok(target_index) = usize::try_from(target) else {
632 return Ok(default_val);
633 };
634 Ok(state
635 .buffer
636 .get(target_index)
637 .cloned()
638 .unwrap_or(default_val))
639 }
640
641 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
642 self.value(&state)
643 }
644
645 fn num_args(&self) -> i32 {
646 -1 }
648
649 fn min_args(&self) -> i32 {
650 1
651 }
652
653 fn max_args(&self) -> Option<i32> {
654 Some(3)
655 }
656
657 fn name(&self) -> &str {
658 "lag"
659 }
660}
661
662pub struct LeadState {
668 buffer: Vec<SqliteValue>,
669 offsets: Vec<Option<i64>>,
670 defaults: Vec<SqliteValue>,
671 current_row: i64,
672}
673
674pub struct LeadFunc;
675
676impl WindowFunction for LeadFunc {
677 type State = LeadState;
678
679 fn initial_state(&self) -> Self::State {
680 LeadState {
681 buffer: Vec::new(),
682 offsets: Vec::new(),
683 defaults: Vec::new(),
684 current_row: 0,
685 }
686 }
687
688 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
689 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
690 let offset = lag_lead_offset_arg(args.get(1));
691 let default_val = args.get(2).cloned().unwrap_or(SqliteValue::Null);
692 state.buffer.push(val);
693 state.offsets.push(offset);
694 state.defaults.push(default_val);
695 Ok(())
696 }
697
698 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
699 state.current_row += 1;
700 Ok(())
701 }
702
703 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
704 let current_index = usize::try_from(state.current_row).unwrap_or(usize::MAX);
705 let default_val = state
706 .defaults
707 .get(current_index)
708 .cloned()
709 .unwrap_or(SqliteValue::Null);
710 let Some(offset) = state.offsets.get(current_index).copied().flatten() else {
711 return Ok(default_val);
712 };
713 let target = state.current_row + offset;
714 if target < 0 || target >= state.buffer.len() as i64 {
715 return Ok(default_val);
716 }
717 Ok(state.buffer[target as usize].clone())
718 }
719
720 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
721 self.value(&state)
722 }
723
724 fn num_args(&self) -> i32 {
725 -1
726 }
727
728 fn min_args(&self) -> i32 {
729 1
730 }
731
732 fn max_args(&self) -> Option<i32> {
733 Some(3)
734 }
735
736 fn name(&self) -> &str {
737 "lead"
738 }
739}
740
741pub struct FirstValueState {
746 first: Option<SqliteValue>,
747}
748
749pub struct FirstValueFunc;
750
751impl WindowFunction for FirstValueFunc {
752 type State = FirstValueState;
753
754 fn initial_state(&self) -> Self::State {
755 FirstValueState { first: None }
756 }
757
758 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
759 if state.first.is_none() {
760 state.first = Some(args.first().cloned().unwrap_or(SqliteValue::Null));
761 }
762 Ok(())
763 }
764
765 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
766 state.first = None;
771 Ok(())
772 }
773
774 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
775 Ok(state.first.clone().unwrap_or(SqliteValue::Null))
776 }
777
778 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
779 Ok(state.first.unwrap_or(SqliteValue::Null))
780 }
781
782 fn num_args(&self) -> i32 {
783 1
784 }
785
786 fn name(&self) -> &str {
787 "first_value"
788 }
789}
790
791pub struct LastValueState {
796 frame: VecDeque<SqliteValue>,
797}
798
799pub struct LastValueFunc;
800
801impl WindowFunction for LastValueFunc {
802 type State = LastValueState;
803
804 fn initial_state(&self) -> Self::State {
805 LastValueState {
806 frame: VecDeque::new(),
807 }
808 }
809
810 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
811 state
812 .frame
813 .push_back(args.first().cloned().unwrap_or(SqliteValue::Null));
814 Ok(())
815 }
816
817 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
818 state.frame.pop_front();
819 Ok(())
820 }
821
822 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
823 Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
824 }
825
826 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
827 Ok(state.frame.back().cloned().unwrap_or(SqliteValue::Null))
828 }
829
830 fn num_args(&self) -> i32 {
831 1
832 }
833
834 fn name(&self) -> &str {
835 "last_value"
836 }
837}
838
839pub struct NthValueState {
844 frame: VecDeque<SqliteValue>,
845 n: i64,
846}
847
848pub struct NthValueFunc;
849
850const INVALID_NTH_VALUE_ARGUMENT: &str = "second argument to nth_value must be a positive integer";
851
/// Converts `value` to `i64` only when it is a finite whole number in `i64` range.
///
/// The range is `[-2^63, 2^63)`. `-2^63` is exactly `i64::MIN`, while `2^63`
/// is one past `i64::MAX`, so the upper bound is exclusive.
fn integral_f64_to_i64(value: f64) -> Option<i64> {
    const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;

    let whole = value.is_finite() && value.fract() == 0.0;
    let in_range = value >= -TWO_POW_63 && value < TWO_POW_63;
    (whole && in_range).then_some(value as i64)
}
864
865fn parse_integral_text(text: &str) -> Option<i64> {
866 let trimmed = text.trim();
867 if trimmed.is_empty() {
868 return None;
869 }
870 trimmed
871 .parse()
872 .ok()
873 .or_else(|| trimmed.parse().ok().and_then(integral_f64_to_i64))
874}
875
876fn nth_value_positive_integer_arg(value: Option<&SqliteValue>) -> Result<i64> {
877 let Some(value) = value else {
878 return Err(FrankenError::function_error(INVALID_NTH_VALUE_ARGUMENT));
879 };
880 let n = match value {
881 SqliteValue::Integer(n) => Some(*n),
882 SqliteValue::Float(n) => integral_f64_to_i64(*n),
883 SqliteValue::Text(text) => parse_integral_text(text),
884 SqliteValue::Null | SqliteValue::Blob(_) => None,
885 };
886 match n {
887 Some(n) if n > 0 => Ok(n),
888 _ => Err(FrankenError::function_error(INVALID_NTH_VALUE_ARGUMENT)),
889 }
890}
891
892impl WindowFunction for NthValueFunc {
893 type State = NthValueState;
894
895 fn initial_state(&self) -> Self::State {
896 NthValueState {
897 frame: VecDeque::new(),
898 n: 1,
899 }
900 }
901
902 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
903 let val = args.first().cloned().unwrap_or(SqliteValue::Null);
904 let n = nth_value_positive_integer_arg(args.get(1))?;
905 if state.frame.is_empty() {
907 state.n = n;
908 }
909 state.frame.push_back(val);
910 Ok(())
911 }
912
913 fn inverse(&self, state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
914 state.frame.pop_front();
915 Ok(())
916 }
917
918 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
919 let idx = (state.n - 1) as usize;
920 Ok(state.frame.get(idx).cloned().unwrap_or(SqliteValue::Null))
921 }
922
923 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
924 self.value(&state)
925 }
926
927 fn num_args(&self) -> i32 {
928 2
929 }
930
931 fn name(&self) -> &str {
932 "nth_value"
933 }
934}
935
936pub struct WindowSumState {
942 sum: f64,
943 err: f64,
944 has_value: bool,
945 is_int: bool,
946 int_sum: i64,
947 overflowed: bool,
948}
949
950#[inline]
952fn kbn_step(sum: &mut f64, err: &mut f64, value: f64) {
953 let s = *sum;
954 let t = s + value;
955 if s.abs() > value.abs() {
956 *err += (s - t) + value;
957 } else {
958 *err += (value - t) + s;
959 }
960 *sum = t;
961}
962
963pub struct WindowSumFunc;
964
965impl WindowFunction for WindowSumFunc {
966 type State = WindowSumState;
967
968 fn initial_state(&self) -> Self::State {
969 WindowSumState {
970 sum: 0.0,
971 err: 0.0,
972 has_value: false,
973 is_int: true,
974 int_sum: 0,
975 overflowed: false,
976 }
977 }
978
979 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
980 if args.is_empty() || args[0].is_null() {
981 return Ok(());
982 }
983 let value = args[0].to_sum_numeric_value();
984 if value.is_null() {
985 return Ok(());
986 }
987 state.has_value = true;
988 match value {
989 SqliteValue::Integer(i) => {
990 if state.is_int && !state.overflowed {
991 match state.int_sum.checked_add(i) {
992 Some(s) => state.int_sum = s,
993 None => state.overflowed = true,
994 }
995 }
996 kbn_step(&mut state.sum, &mut state.err, i as f64);
997 }
998 SqliteValue::Float(f) => {
999 state.is_int = false;
1000 kbn_step(&mut state.sum, &mut state.err, f);
1001 }
1002 SqliteValue::Null | SqliteValue::Text(_) | SqliteValue::Blob(_) => {}
1003 }
1004 Ok(())
1005 }
1006
1007 fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1008 if args.is_empty() || args[0].is_null() {
1009 return Ok(());
1010 }
1011 let value = args[0].to_sum_numeric_value();
1012 match value {
1013 SqliteValue::Integer(i) => {
1014 if state.is_int && !state.overflowed {
1015 match state.int_sum.checked_sub(i) {
1016 Some(s) => state.int_sum = s,
1017 None => state.overflowed = true,
1018 }
1019 }
1020 kbn_step(&mut state.sum, &mut state.err, -(i as f64));
1021 }
1022 SqliteValue::Float(f) => {
1023 state.is_int = false;
1024 kbn_step(&mut state.sum, &mut state.err, -f);
1025 }
1026 SqliteValue::Null | SqliteValue::Text(_) | SqliteValue::Blob(_) => {}
1027 }
1028 Ok(())
1029 }
1030
1031 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1032 if !state.has_value {
1033 return Ok(SqliteValue::Null);
1034 }
1035 if state.is_int && state.overflowed {
1036 return Err(FrankenError::IntegerOverflow);
1037 }
1038 if state.is_int {
1039 Ok(SqliteValue::Integer(state.int_sum))
1040 } else {
1041 Ok(SqliteValue::Float(state.sum + state.err))
1042 }
1043 }
1044
1045 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1046 self.value(&state)
1047 }
1048
1049 fn num_args(&self) -> i32 {
1050 1
1051 }
1052
1053 fn name(&self) -> &str {
1054 "SUM"
1055 }
1056}
1057
1058pub struct WindowTotalFunc;
1059
1060impl WindowFunction for WindowTotalFunc {
1061 type State = f64;
1062
1063 fn initial_state(&self) -> Self::State {
1064 0.0
1065 }
1066
1067 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1068 if !args.is_empty() && !args[0].is_null() {
1069 *state += args[0].to_float();
1070 }
1071 Ok(())
1072 }
1073
1074 fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1075 if !args.is_empty() && !args[0].is_null() {
1076 *state -= args[0].to_float();
1077 }
1078 Ok(())
1079 }
1080
1081 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1082 Ok(SqliteValue::Float(*state))
1083 }
1084
1085 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1086 Ok(SqliteValue::Float(state))
1087 }
1088
1089 fn num_args(&self) -> i32 {
1090 1
1091 }
1092
1093 fn name(&self) -> &str {
1094 "TOTAL"
1095 }
1096}
1097
1098pub struct WindowCountState {
1099 count: i64,
1100}
1101
1102pub struct WindowCountFunc;
1103
1104impl WindowFunction for WindowCountFunc {
1105 type State = WindowCountState;
1106
1107 fn initial_state(&self) -> Self::State {
1108 WindowCountState { count: 0 }
1109 }
1110
1111 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1112 if args.is_empty() || !args[0].is_null() {
1114 state.count += 1;
1115 }
1116 Ok(())
1117 }
1118
1119 fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1120 if args.is_empty() || !args[0].is_null() {
1121 state.count -= 1;
1122 }
1123 Ok(())
1124 }
1125
1126 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1127 Ok(SqliteValue::Integer(state.count))
1128 }
1129
1130 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1131 Ok(SqliteValue::Integer(state.count))
1132 }
1133
1134 fn num_args(&self) -> i32 {
1135 -1 }
1137
1138 fn min_args(&self) -> i32 {
1139 0
1140 }
1141
1142 fn max_args(&self) -> Option<i32> {
1143 Some(1)
1144 }
1145
1146 fn name(&self) -> &str {
1147 "COUNT"
1148 }
1149}
1150
1151pub struct WindowMinState {
1152 min: Option<SqliteValue>,
1153}
1154
1155pub struct WindowMinFunc;
1156
1157impl WindowFunction for WindowMinFunc {
1158 type State = WindowMinState;
1159
1160 fn initial_state(&self) -> Self::State {
1161 WindowMinState { min: None }
1162 }
1163
1164 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1165 if args.is_empty() || args[0].is_null() {
1166 return Ok(());
1167 }
1168 state.min = Some(match state.min.take() {
1169 None => args[0].clone(),
1170 Some(cur) => {
1171 if cmp_values(&args[0], &cur) == std::cmp::Ordering::Less {
1172 args[0].clone()
1173 } else {
1174 cur
1175 }
1176 }
1177 });
1178 Ok(())
1179 }
1180
1181 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1182 Ok(())
1184 }
1185
1186 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1187 Ok(state.min.clone().unwrap_or(SqliteValue::Null))
1188 }
1189
1190 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1191 Ok(state.min.unwrap_or(SqliteValue::Null))
1192 }
1193
1194 fn num_args(&self) -> i32 {
1195 1
1196 }
1197
1198 fn name(&self) -> &str {
1199 "MIN"
1200 }
1201}
1202
1203pub struct WindowMaxState {
1204 max: Option<SqliteValue>,
1205}
1206
1207pub struct WindowMaxFunc;
1208
1209impl WindowFunction for WindowMaxFunc {
1210 type State = WindowMaxState;
1211
1212 fn initial_state(&self) -> Self::State {
1213 WindowMaxState { max: None }
1214 }
1215
1216 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1217 if args.is_empty() || args[0].is_null() {
1218 return Ok(());
1219 }
1220 state.max = Some(match state.max.take() {
1221 None => args[0].clone(),
1222 Some(cur) => {
1223 if cmp_values(&args[0], &cur) == std::cmp::Ordering::Greater {
1224 args[0].clone()
1225 } else {
1226 cur
1227 }
1228 }
1229 });
1230 Ok(())
1231 }
1232
1233 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1234 Ok(())
1235 }
1236
1237 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1238 Ok(state.max.clone().unwrap_or(SqliteValue::Null))
1239 }
1240
1241 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1242 Ok(state.max.unwrap_or(SqliteValue::Null))
1243 }
1244
1245 fn num_args(&self) -> i32 {
1246 1
1247 }
1248
1249 fn name(&self) -> &str {
1250 "MAX"
1251 }
1252}
1253
1254pub struct WindowAvgState {
1255 sum: f64,
1256 err: f64,
1257 count: i64,
1258}
1259
1260pub struct WindowAvgFunc;
1261
1262impl WindowFunction for WindowAvgFunc {
1263 type State = WindowAvgState;
1264
1265 fn initial_state(&self) -> Self::State {
1266 WindowAvgState {
1267 sum: 0.0,
1268 err: 0.0,
1269 count: 0,
1270 }
1271 }
1272
1273 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1274 if args.is_empty() || args[0].is_null() {
1275 return Ok(());
1276 }
1277 kbn_step(&mut state.sum, &mut state.err, args[0].to_float());
1278 state.count += 1;
1279 Ok(())
1280 }
1281
1282 fn inverse(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1283 if args.is_empty() || args[0].is_null() {
1284 return Ok(());
1285 }
1286 kbn_step(&mut state.sum, &mut state.err, -args[0].to_float());
1287 state.count -= 1;
1288 Ok(())
1289 }
1290
1291 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1292 if state.count == 0 {
1293 Ok(SqliteValue::Null)
1294 } else {
1295 #[allow(clippy::cast_precision_loss)]
1296 Ok(SqliteValue::Float(
1297 (state.sum + state.err) / state.count as f64,
1298 ))
1299 }
1300 }
1301
1302 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1303 self.value(&state)
1304 }
1305
1306 fn num_args(&self) -> i32 {
1307 1
1308 }
1309
1310 fn name(&self) -> &str {
1311 "AVG"
1312 }
1313}
1314
/// Accumulator shared by the `group_concat()` and `string_agg()` window functions.
pub struct WindowGroupConcatState {
    /// Concatenated text so far.
    result: String,
    /// Whether any non-NULL value has been appended (NULL result otherwise).
    has_value: bool,
}
1319
1320fn window_group_concat_step(state: &mut WindowGroupConcatState, args: &[SqliteValue]) {
1321 if args.is_empty() || args[0].is_null() {
1322 return;
1323 }
1324 if state.has_value {
1325 match args.get(1) {
1326 Some(separator) if !separator.is_null() => {
1327 if let Some(text) = separator.as_text_str() {
1328 state.result.push_str(text);
1329 } else {
1330 state.result.push_str(&separator.to_text());
1331 }
1332 }
1333 Some(_) => {}
1334 None => state.result.push(','),
1335 }
1336 }
1337 if let Some(text) = args[0].as_text_str() {
1338 state.result.push_str(text);
1339 } else {
1340 state.result.push_str(&args[0].to_text());
1341 }
1342 state.has_value = true;
1343}
1344
1345fn window_group_concat_value(state: &WindowGroupConcatState) -> SqliteValue {
1346 if state.has_value {
1347 SqliteValue::Text(state.result.clone().into())
1348 } else {
1349 SqliteValue::Null
1350 }
1351}
1352
1353pub struct WindowGroupConcatFunc;
1354
1355impl WindowFunction for WindowGroupConcatFunc {
1356 type State = WindowGroupConcatState;
1357
1358 fn initial_state(&self) -> Self::State {
1359 WindowGroupConcatState {
1360 result: String::new(),
1361 has_value: false,
1362 }
1363 }
1364
1365 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1366 window_group_concat_step(state, args);
1367 Ok(())
1368 }
1369
1370 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1371 Ok(())
1374 }
1375
1376 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1377 Ok(window_group_concat_value(state))
1378 }
1379
1380 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1381 Ok(window_group_concat_value(&state))
1382 }
1383
1384 fn num_args(&self) -> i32 {
1385 -1
1386 }
1387
1388 fn min_args(&self) -> i32 {
1389 1
1390 }
1391
1392 fn max_args(&self) -> Option<i32> {
1393 Some(2)
1394 }
1395
1396 fn name(&self) -> &str {
1397 "group_concat"
1398 }
1399}
1400
1401pub struct WindowStringAggFunc;
1402
1403impl WindowFunction for WindowStringAggFunc {
1404 type State = WindowGroupConcatState;
1405
1406 fn initial_state(&self) -> Self::State {
1407 WindowGroupConcatState {
1408 result: String::new(),
1409 has_value: false,
1410 }
1411 }
1412
1413 fn step(&self, state: &mut Self::State, args: &[SqliteValue]) -> Result<()> {
1414 window_group_concat_step(state, args);
1415 Ok(())
1416 }
1417
1418 fn inverse(&self, _state: &mut Self::State, _args: &[SqliteValue]) -> Result<()> {
1419 Ok(())
1420 }
1421
1422 fn value(&self, state: &Self::State) -> Result<SqliteValue> {
1423 Ok(window_group_concat_value(state))
1424 }
1425
1426 fn finalize(&self, state: Self::State) -> Result<SqliteValue> {
1427 Ok(window_group_concat_value(&state))
1428 }
1429
1430 fn num_args(&self) -> i32 {
1431 2
1432 }
1433
1434 fn name(&self) -> &str {
1435 "string_agg"
1436 }
1437}
1438
1439pub fn cmp_values(a: &SqliteValue, b: &SqliteValue) -> std::cmp::Ordering {
1441 a.cmp(b)
1442}
1443
/// Registers every built-in window function with `registry`.
///
/// The first group is the dedicated window functions:
/// - `row_number`, `rank`, `dense_rank`, `percent_rank`, `cume_dist`, `ntile`
/// - `lag`, `lead`, `first_value`, `last_value`, `nth_value`
///
/// The second group is the `Window*` aggregate implementations:
/// - `WindowSumFunc` through `WindowAvgFunc`
/// - the `group_concat` / `string_agg` pair
///
/// NOTE(review): the registration order is preserved as-is. Whether a later
/// registration with the same name/arity replaces an earlier one is decided by
/// `FunctionRegistry::register_window`, which is not visible here.
pub fn register_window_builtins(registry: &mut FunctionRegistry) {
    registry.register_window(RowNumberFunc);
    registry.register_window(RankFunc);
    registry.register_window(DenseRankFunc);
    registry.register_window(PercentRankFunc);
    registry.register_window(CumeDistFunc);
    registry.register_window(NtileFunc);
    registry.register_window(LagFunc);
    registry.register_window(LeadFunc);
    registry.register_window(FirstValueFunc);
    registry.register_window(LastValueFunc);
    registry.register_window(NthValueFunc);

    // Window variants of the aggregate functions.
    registry.register_window(WindowSumFunc);
    registry.register_window(WindowTotalFunc);
    registry.register_window(WindowCountFunc);
    registry.register_window(WindowMinFunc);
    registry.register_window(WindowMaxFunc);
    registry.register_window(WindowAvgFunc);
    registry.register_window(WindowGroupConcatFunc);
    // `string_agg` reuses the group-concat state/helpers under its own name.
    registry.register_window(WindowStringAggFunc);
}
1470
#[cfg(test)]
mod tests {
    //! Unit tests for the built-in window functions and their registration.
    //!
    //! Functions are driven directly through the `WindowFunction` trait by one
    //! of two drivers:
    //! - `run_window_partition`: a value is read after every step, i.e. a frame
    //!   that grows one row at a time.
    //! - `run_window_two_pass`: the whole partition is stepped first, then one
    //!   value is read per row, with `inverse` between reads.
    //!
    //! Registry tests go through `register_window_builtins`.

    use super::*;

    /// Shorthand for `SqliteValue::Integer`.
    fn int(v: i64) -> SqliteValue {
        SqliteValue::Integer(v)
    }

    /// Shorthand for `SqliteValue::Float`.
    fn float(v: f64) -> SqliteValue {
        SqliteValue::Float(v)
    }

    /// Shorthand for `SqliteValue::Text`.
    fn text(s: &str) -> SqliteValue {
        SqliteValue::Text(s.into())
    }

    /// Shorthand for `SqliteValue::Blob`.
    fn blob(bytes: &[u8]) -> SqliteValue {
        SqliteValue::Blob(std::sync::Arc::from(bytes))
    }

    /// Shorthand for `SqliteValue::Null`.
    fn null() -> SqliteValue {
        SqliteValue::Null
    }

    /// Asserts that `err` is a `FrankenError::FunctionError` whose message equals `expected`.
    fn assert_function_error(err: FrankenError, expected: &str) {
        assert!(
            matches!(&err, FrankenError::FunctionError(message) if message == expected),
            "expected function error {expected:?}, got {err:?}"
        );
    }

    /// Asserts that `value` is a `SqliteValue::Float` within `1e-10` of `expected`.
    fn assert_float_near(value: &SqliteValue, expected: f64) {
        assert!(
            matches!(value, SqliteValue::Float(_)),
            "expected Float, got {value:?}"
        );
        if let SqliteValue::Float(actual) = value {
            assert!(
                (*actual - expected).abs() < 1e-10,
                "expected {expected}, got {actual}"
            );
        }
    }

    /// Steps each row into a fresh state and records `value` right after every step.
    ///
    /// Panics if any `step` or `value` call fails.
    fn run_window_partition<F: WindowFunction>(
        func: &F,
        rows: &[Vec<SqliteValue>],
    ) -> Vec<SqliteValue> {
        let mut state = func.initial_state();
        let mut results = Vec::with_capacity(rows.len());
        for row in rows {
            func.step(&mut state, row).unwrap();
            results.push(func.value(&state).unwrap());
        }
        results
    }

    /// Steps the whole partition into a fresh state, then reads one `value` per row.
    ///
    /// Between consecutive reads (but not after the last one) `inverse` is called
    /// with no arguments; presumably this moves the function on to the next
    /// "current row". Panics if any call fails.
    fn run_window_two_pass<F: WindowFunction>(
        func: &F,
        rows: &[Vec<SqliteValue>],
    ) -> Vec<SqliteValue> {
        let mut state = func.initial_state();
        for row in rows {
            func.step(&mut state, row).unwrap();
        }
        let mut results = Vec::with_capacity(rows.len());
        for i in 0..rows.len() {
            results.push(func.value(&state).unwrap());
            // `i + 1 < len` instead of `i < len - 1`: no subtraction that could underflow.
            if i + 1 < rows.len() {
                func.inverse(&mut state, &[]).unwrap();
            }
        }
        results
    }

    #[test]
    fn test_row_number_basic() {
        let results =
            run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![], vec![], vec![]]);
        assert_eq!(results, vec![int(1), int(2), int(3), int(4), int(5)]);
    }

    #[test]
    fn test_row_number_partition_reset() {
        let r1 = run_window_partition(&RowNumberFunc, &[vec![], vec![], vec![]]);
        assert_eq!(r1, vec![int(1), int(2), int(3)]);

        let r2 = run_window_partition(&RowNumberFunc, &[vec![], vec![]]);
        assert_eq!(r2, vec![int(1), int(2)]);
    }

    #[test]
    fn test_rank_with_ties() {
        let results = run_window_partition(
            &RankFunc,
            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
        );
        assert_eq!(results, vec![int(1), int(2), int(2), int(4)]);
    }

    #[test]
    fn test_rank_no_ties() {
        let results =
            run_window_partition(&RankFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
        assert_eq!(results, vec![int(1), int(2), int(3)]);
    }

    #[test]
    fn test_dense_rank_with_ties() {
        let results = run_window_partition(
            &DenseRankFunc,
            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
        );
        assert_eq!(results, vec![int(1), int(2), int(2), int(3)]);
    }

    #[test]
    fn test_dense_rank_multiple_ties() {
        let results = run_window_partition(
            &DenseRankFunc,
            &[
                vec![int(1)],
                vec![int(1)],
                vec![int(2)],
                vec![int(2)],
                vec![int(3)],
            ],
        );
        assert_eq!(results, vec![int(1), int(1), int(2), int(2), int(3)]);
    }

    #[test]
    fn test_percent_rank_single_row() {
        let results = run_window_two_pass(&PercentRankFunc, &[vec![int(1)]]);
        assert_eq!(results, vec![SqliteValue::Float(0.0)]);
    }

    #[test]
    fn test_percent_rank_formula() {
        let results = run_window_two_pass(
            &PercentRankFunc,
            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
        );
        assert_float_near(&results[0], 0.0);
        assert_float_near(&results[1], 1.0 / 3.0);
        assert_float_near(&results[2], 1.0 / 3.0);
        assert_float_near(&results[3], 1.0);
    }

    #[test]
    fn test_percent_rank_without_order_treats_partition_as_one_peer_group() {
        let results = run_window_two_pass(&PercentRankFunc, &[vec![], vec![], vec![]]);
        for value in results {
            assert_float_near(&value, 0.0);
        }
    }

    #[test]
    fn test_cume_dist_distinct() {
        let results = run_window_two_pass(
            &CumeDistFunc,
            &[vec![int(1)], vec![int(2)], vec![int(3)], vec![int(4)]],
        );
        for (i, expected) in [0.25, 0.5, 0.75, 1.0].iter().enumerate() {
            assert_float_near(&results[i], *expected);
        }
    }

    #[test]
    fn test_cume_dist_with_ties() {
        let results = run_window_two_pass(
            &CumeDistFunc,
            &[vec![int(1)], vec![int(2)], vec![int(2)], vec![int(3)]],
        );
        for (i, expected) in [0.25, 0.75, 0.75, 1.0].iter().enumerate() {
            assert_float_near(&results[i], *expected);
        }
    }

    #[test]
    fn test_cume_dist_without_order_treats_partition_as_one_peer_group() {
        let results = run_window_two_pass(&CumeDistFunc, &[vec![], vec![], vec![]]);
        for value in results {
            assert_float_near(&value, 1.0);
        }
    }

    #[test]
    fn test_cume_dist_null_peers_share_same_peer_group() {
        let results =
            run_window_two_pass(&CumeDistFunc, &[vec![null()], vec![null()], vec![int(1)]]);
        assert_float_near(&results[0], 2.0 / 3.0);
        assert_float_near(&results[1], 2.0 / 3.0);
        assert_float_near(&results[2], 1.0);
    }

    #[test]
    fn test_ntile_even() {
        let rows: Vec<Vec<SqliteValue>> = (0..8).map(|_| vec![int(4)]).collect();
        let results = run_window_two_pass(&NtileFunc, &rows);
        assert_eq!(
            results,
            vec![
                int(1),
                int(1),
                int(2),
                int(2),
                int(3),
                int(3),
                int(4),
                int(4)
            ]
        );
    }

    #[test]
    fn test_ntile_uneven() {
        let rows: Vec<Vec<SqliteValue>> = (0..10).map(|_| vec![int(3)]).collect();
        let results = run_window_two_pass(&NtileFunc, &rows);
        assert_eq!(
            results,
            vec![
                int(1),
                int(1),
                int(1),
                int(1),
                int(2),
                int(2),
                int(2),
                int(3),
                int(3),
                int(3)
            ]
        );
    }

    #[test]
    fn test_ntile_more_buckets_than_rows() {
        let rows: Vec<Vec<SqliteValue>> = (0..3).map(|_| vec![int(10)]).collect();
        let results = run_window_two_pass(&NtileFunc, &rows);
        assert_eq!(results, vec![int(1), int(2), int(3)]);
    }

    #[test]
    fn test_ntile_rejects_non_positive_argument() {
        for n in [0, -1] {
            let mut state = NtileFunc.initial_state();
            let err = NtileFunc.step(&mut state, &[int(n)]).unwrap_err();
            assert_function_error(err, INVALID_NTILE_ARGUMENT);
        }
    }

    #[test]
    fn test_lag_default() {
        let results = run_window_two_pass(&LagFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
        assert_eq!(results, vec![null(), int(10), int(20)]);
    }

    #[test]
    fn test_lag_offset_3() {
        let results = run_window_two_pass(
            &LagFunc,
            &[
                vec![int(10), int(3)],
                vec![int(20), int(3)],
                vec![int(30), int(3)],
                vec![int(40), int(3)],
                vec![int(50), int(3)],
            ],
        );
        assert_eq!(results, vec![null(), null(), null(), int(10), int(20)]);
    }

    #[test]
    fn test_lag_default_value() {
        let results = run_window_two_pass(
            &LagFunc,
            &[
                vec![int(10), int(1), int(-1)],
                vec![int(20), int(1), int(-1)],
            ],
        );
        assert_eq!(results, vec![int(-1), int(10)]);
    }

    #[test]
    fn test_lag_null_offset_returns_default_for_each_row() {
        let results = run_window_two_pass(
            &LagFunc,
            &[
                vec![int(10), null(), text("N/A")],
                vec![int(20), null(), text("N/A")],
            ],
        );
        assert_eq!(results, vec![text("N/A"), text("N/A")]);
    }

    #[test]
    fn test_lag_uses_current_row_offset_and_default() {
        let results = run_window_two_pass(
            &LagFunc,
            &[
                vec![int(10), int(1), text("first")],
                vec![int(20), null(), text("null-offset")],
                vec![int(30), int(1), text("third")],
                vec![int(40), int(2), text("fourth")],
            ],
        );
        assert_eq!(
            results,
            vec![text("first"), text("null-offset"), int(20), int(20)]
        );
    }

    #[test]
    fn test_lag_negative_offset_reads_following_row() {
        let results = run_window_two_pass(
            &LagFunc,
            &[
                vec![int(10), int(-1), text("N/A")],
                vec![int(20), int(-1), text("N/A")],
                vec![int(30), int(-1), text("N/A")],
            ],
        );
        assert_eq!(results, vec![int(20), int(30), text("N/A")]);
    }

    #[test]
    fn test_lag_fractional_offset_uses_default() {
        let results = run_window_two_pass(
            &LagFunc,
            &[
                vec![int(10), float(1.5), text("N/A")],
                vec![int(20), float(1.5), text("N/A")],
                vec![int(30), float(1.5), text("N/A")],
            ],
        );
        assert_eq!(results, vec![text("N/A"), text("N/A"), text("N/A")]);
    }

    #[test]
    fn test_lag_nonnumeric_text_offset_reads_current_row() {
        let results = run_window_two_pass(
            &LagFunc,
            &[
                vec![int(10), text("abc"), text("N/A")],
                vec![int(20), text("abc"), text("N/A")],
                vec![int(30), text("abc"), text("N/A")],
            ],
        );
        assert_eq!(results, vec![int(10), int(20), int(30)]);
    }

    #[test]
    fn test_lag_integral_text_prefix_offset() {
        let results = run_window_two_pass(
            &LagFunc,
            &[
                vec![int(10), text("2.0x"), text("N/A")],
                vec![int(20), text("2e0x"), text("N/A")],
                vec![int(30), text("2x"), text("N/A")],
            ],
        );
        assert_eq!(results, vec![text("N/A"), text("N/A"), int(10)]);
    }

    #[test]
    fn test_lead_default() {
        let results =
            run_window_two_pass(&LeadFunc, &[vec![int(10)], vec![int(20)], vec![int(30)]]);
        assert_eq!(results, vec![int(20), int(30), null()]);
    }

    #[test]
    fn test_lead_offset_2() {
        let results = run_window_two_pass(
            &LeadFunc,
            &[
                vec![int(10), int(2)],
                vec![int(20), int(2)],
                vec![int(30), int(2)],
                vec![int(40), int(2)],
                vec![int(50), int(2)],
            ],
        );
        assert_eq!(results, vec![int(30), int(40), int(50), null(), null()]);
    }

    #[test]
    fn test_lead_default_value() {
        let results = run_window_two_pass(
            &LeadFunc,
            &[
                vec![int(10), int(1), text("N/A")],
                vec![int(20), int(1), text("N/A")],
            ],
        );
        assert_eq!(results, vec![int(20), text("N/A")]);
    }

    #[test]
    fn test_lead_null_offset_returns_default_for_each_row() {
        let results = run_window_two_pass(
            &LeadFunc,
            &[
                vec![int(10), null(), text("N/A")],
                vec![int(20), null(), text("N/A")],
            ],
        );
        assert_eq!(results, vec![text("N/A"), text("N/A")]);
    }

    #[test]
    fn test_lead_uses_current_row_offset_and_default() {
        let results = run_window_two_pass(
            &LeadFunc,
            &[
                vec![int(10), int(1), text("first")],
                vec![int(20), null(), text("null-offset")],
                vec![int(30), int(1), text("third")],
            ],
        );
        assert_eq!(results, vec![int(20), text("null-offset"), text("third")]);
    }

    #[test]
    fn test_lead_negative_offset_reads_previous_row() {
        let results = run_window_two_pass(
            &LeadFunc,
            &[
                vec![int(10), int(-1), text("N/A")],
                vec![int(20), int(-1), text("N/A")],
                vec![int(30), int(-1), text("N/A")],
            ],
        );
        assert_eq!(results, vec![text("N/A"), int(10), int(20)]);
    }

    #[test]
    fn test_lead_inverse_after_last_row_succeeds() {
        // `run_window_two_pass` skips the trailing `inverse`; pin that it is still harmless.
        let func = LeadFunc;
        let mut state = func.initial_state();
        func.step(&mut state, &[int(10)]).unwrap();
        assert_eq!(func.value(&state).unwrap(), null());
        func.inverse(&mut state, &[]).unwrap();
    }

    #[test]
    fn test_lead_fractional_offset_uses_default() {
        let results = run_window_two_pass(
            &LeadFunc,
            &[
                vec![int(10), float(1.5), text("N/A")],
                vec![int(20), float(1.5), text("N/A")],
                vec![int(30), float(1.5), text("N/A")],
            ],
        );
        assert_eq!(results, vec![text("N/A"), text("N/A"), text("N/A")]);
    }

    #[test]
    fn test_lead_integral_blob_offset() {
        let results = run_window_two_pass(
            &LeadFunc,
            &[
                vec![int(10), blob(b"2.0x"), text("N/A")],
                vec![int(20), blob(b"2e0x"), text("N/A")],
                vec![int(30), blob(b"2x"), text("N/A")],
            ],
        );
        assert_eq!(results, vec![int(30), text("N/A"), text("N/A")]);
    }

    #[test]
    fn test_first_value_basic() {
        let results = run_window_partition(
            &FirstValueFunc,
            &[vec![int(10)], vec![int(20)], vec![int(30)]],
        );
        assert_eq!(results, vec![int(10), int(10), int(10)]);
    }

    #[test]
    fn test_last_value_default_frame() {
        let results = run_window_partition(
            &LastValueFunc,
            &[vec![int(10)], vec![int(20)], vec![int(30)]],
        );
        assert_eq!(results, vec![int(10), int(20), int(30)]);
    }

    #[test]
    fn test_last_value_unbounded_following() {
        let func = LastValueFunc;
        let mut state = func.initial_state();
        func.step(&mut state, &[int(10)]).unwrap();
        func.step(&mut state, &[int(20)]).unwrap();
        func.step(&mut state, &[int(30)]).unwrap();
        assert_eq!(func.value(&state).unwrap(), int(30));
    }

    #[test]
    fn test_nth_value_basic() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        func.step(&mut state, &[int(10), int(3)]).unwrap();
        func.step(&mut state, &[int(20), int(3)]).unwrap();
        func.step(&mut state, &[int(30), int(3)]).unwrap();
        func.step(&mut state, &[int(40), int(3)]).unwrap();
        func.step(&mut state, &[int(50), int(3)]).unwrap();
        assert_eq!(func.value(&state).unwrap(), int(30));
    }

    #[test]
    fn test_nth_value_out_of_range() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        func.step(&mut state, &[int(10), int(100)]).unwrap();
        func.step(&mut state, &[int(20), int(100)]).unwrap();
        assert_eq!(func.value(&state).unwrap(), null());
    }

    #[test]
    fn test_nth_value_n_zero() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        let err = func.step(&mut state, &[int(10), int(0)]).unwrap_err();
        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
    }

    #[test]
    fn test_nth_value_rejects_negative_n() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        let err = func.step(&mut state, &[int(10), int(-1)]).unwrap_err();
        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
    }

    #[test]
    fn test_nth_value_accepts_integral_real_n() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        func.step(&mut state, &[int(10), float(2.0)]).unwrap();
        func.step(&mut state, &[int(20), float(2.0)]).unwrap();
        assert_eq!(func.value(&state).unwrap(), int(20));
    }

    #[test]
    fn test_nth_value_accepts_integral_text_n() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        func.step(&mut state, &[int(10), text("2e0")]).unwrap();
        func.step(&mut state, &[int(20), text("2.0")]).unwrap();
        assert_eq!(func.value(&state).unwrap(), int(20));
    }

    #[test]
    fn test_nth_value_rejects_fractional_real_n() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        let err = func.step(&mut state, &[int(10), float(1.5)]).unwrap_err();
        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
    }

    #[test]
    fn test_nth_value_rejects_fractional_text_n() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        let err = func.step(&mut state, &[int(10), text("1.5")]).unwrap_err();
        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
    }

    #[test]
    fn test_nth_value_rejects_text_numeric_prefix_n() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        let err = func.step(&mut state, &[int(10), text("2x")]).unwrap_err();
        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
    }

    #[test]
    fn test_nth_value_rejects_blob_integer_n() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        let err = func.step(&mut state, &[int(10), blob(b"2")]).unwrap_err();
        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
    }

    #[test]
    fn test_nth_value_rejects_non_positive_n_after_first_row() {
        let func = NthValueFunc;
        let mut state = func.initial_state();
        func.step(&mut state, &[int(10), int(1)]).unwrap();
        let err = func.step(&mut state, &[int(20), int(0)]).unwrap_err();
        assert_function_error(err, INVALID_NTH_VALUE_ARGUMENT);
    }

    #[test]
    fn test_window_sum_text_integer_literals_stay_integer() {
        let results = run_window_partition(&WindowSumFunc, &[vec![text("1")], vec![text("2")]]);
        assert_eq!(results, vec![int(1), int(3)]);
    }

    #[test]
    fn test_window_sum_non_numeric_text_returns_real_zero() {
        let results = run_window_partition(&WindowSumFunc, &[vec![text("abc")]]);
        assert_eq!(results, vec![float(0.0)]);
    }

    #[test]
    fn test_window_sum_overflow_suppressed_after_float_input() {
        let func = WindowSumFunc;
        let mut state = func.initial_state();

        func.step(&mut state, &[int(i64::MAX)]).unwrap();
        func.step(&mut state, &[int(1)]).unwrap();
        assert_eq!(
            func.value(&state).unwrap_err().to_string(),
            "integer overflow"
        );

        func.step(&mut state, &[float(0.5)]).unwrap();
        assert!(matches!(func.value(&state).unwrap(), SqliteValue::Float(_)));
    }

    #[test]
    fn test_window_min_max_use_sqlite_storage_class_order() {
        let text_value = text("z");
        let blob_value = blob(b"\0");

        let mut min_state = WindowMinFunc.initial_state();
        WindowMinFunc
            .step(&mut min_state, std::slice::from_ref(&blob_value))
            .unwrap();
        WindowMinFunc
            .step(&mut min_state, std::slice::from_ref(&text_value))
            .unwrap();
        assert_eq!(WindowMinFunc.value(&min_state).unwrap(), text_value);

        let mut max_state = WindowMaxFunc.initial_state();
        WindowMaxFunc
            .step(&mut max_state, std::slice::from_ref(&text_value))
            .unwrap();
        WindowMaxFunc
            .step(&mut max_state, std::slice::from_ref(&blob_value))
            .unwrap();
        assert_eq!(WindowMaxFunc.value(&max_state).unwrap(), blob_value);
    }

    #[test]
    fn test_cmp_values_delegates_to_ord() {
        let pairs = [
            (int(1), int(2)),
            (int(2), int(2)),
            (text("a"), text("b")),
            (text("z"), blob(b"\0")),
            (null(), int(0)),
        ];
        for (a, b) in &pairs {
            assert_eq!(cmp_values(a, b), a.cmp(b));
            assert_eq!(cmp_values(b, a), b.cmp(a));
        }
        assert_eq!(cmp_values(&int(1), &int(2)), std::cmp::Ordering::Less);
    }

    #[test]
    fn test_window_group_concat_running_default_separator() {
        let results = run_window_partition(
            &WindowGroupConcatFunc,
            &[vec![text("a")], vec![text("b")], vec![text("c")]],
        );
        assert_eq!(results, vec![text("a"), text("a,b"), text("a,b,c")]);
    }

    #[test]
    fn test_window_group_concat_running_custom_separator() {
        let results = run_window_partition(
            &WindowGroupConcatFunc,
            &[
                vec![text("a"), text(" | ")],
                vec![text("b"), text(" | ")],
                vec![text("c"), text(" | ")],
            ],
        );
        assert_eq!(results, vec![text("a"), text("a | b"), text("a | b | c")]);
    }

    #[test]
    fn test_window_group_concat_skips_null_and_uses_current_row_separator() {
        let results = run_window_partition(
            &WindowGroupConcatFunc,
            &[
                vec![text("a"), text("-")],
                vec![null(), text("?")],
                vec![text("b"), text("+")],
                vec![text("c"), text("*")],
            ],
        );
        assert_eq!(
            results,
            vec![text("a"), text("a"), text("a+b"), text("a+b*c")]
        );
    }

    #[test]
    fn test_window_string_agg_alias_through_registry() {
        let mut reg = FunctionRegistry::new();
        register_window_builtins(&mut reg);

        let sa = reg.find_window("string_agg", 2).unwrap();
        let mut state = sa.initial_state();
        sa.step(&mut state, &[text("a"), text(";")]).unwrap();
        assert_eq!(sa.value(&state).unwrap(), text("a"));
        sa.step(&mut state, &[text("b"), text(";")]).unwrap();
        assert_eq!(sa.value(&state).unwrap(), text("a;b"));
    }

    #[test]
    fn test_window_string_agg_finalize_matches_value() {
        let func = WindowStringAggFunc;
        let mut state = func.initial_state();
        func.step(&mut state, &[text("a"), text(";")]).unwrap();
        func.step(&mut state, &[text("b"), text(";")]).unwrap();
        let running = func.value(&state).unwrap();
        assert_eq!(running, text("a;b"));
        assert_eq!(func.finalize(state).unwrap(), running);
    }

    #[test]
    fn test_register_window_builtins_all_present() {
        let mut reg = FunctionRegistry::new();
        register_window_builtins(&mut reg);

        let expected_variadic = [
            "row_number",
            "rank",
            "dense_rank",
            "percent_rank",
            "cume_dist",
            "lag",
            "lead",
        ];
        for name in expected_variadic {
            assert!(
                reg.find_window(name, 0).is_some()
                    || reg.find_window(name, 1).is_some()
                    || reg.find_window(name, -1).is_some(),
                "window function '{name}' not registered"
            );
        }

        assert!(
            reg.find_window("ntile", 1).is_some(),
            "ntile(1) not registered"
        );
        assert!(
            reg.find_window("first_value", 1).is_some(),
            "first_value(1) not registered"
        );
        assert!(
            reg.find_window("last_value", 1).is_some(),
            "last_value(1) not registered"
        );
        assert!(
            reg.find_window("nth_value", 2).is_some(),
            "nth_value(2) not registered"
        );
        assert!(
            reg.find_window("group_concat", 1).is_some(),
            "group_concat(1) not registered"
        );
        assert!(
            reg.find_window("group_concat", 2).is_some(),
            "group_concat(2) not registered"
        );
        assert!(
            reg.find_window("string_agg", 2).is_some(),
            "string_agg(2) not registered"
        );

        for (name, arity) in [
            ("rank", 1),
            ("dense_rank", 1),
            ("percent_rank", 1),
            ("cume_dist", 1),
            ("lag", 0),
            ("lag", 4),
            ("lead", 0),
            ("lead", 4),
            ("count", 2),
            ("group_concat", 0),
            ("group_concat", 3),
        ] {
            let f = reg
                .find_window(name, arity)
                .expect("known window with wrong arity returns erroring window");
            let mut state = f.initial_state();
            let err = f
                .step(&mut state, &[])
                .expect_err("invalid window arity should fail");
            let expected = format!("wrong number of arguments to function {name}()");
            assert!(
                matches!(&err, FrankenError::FunctionError(message) if message == &expected),
                "unexpected error for {name}/{arity}: {err:?}"
            );
        }
    }

    #[test]
    fn test_e2e_window_row_number_through_registry() {
        let mut reg = FunctionRegistry::new();
        register_window_builtins(&mut reg);

        let rn = reg.find_window("row_number", 0).unwrap();
        let mut state = rn.initial_state();
        rn.step(&mut state, &[]).unwrap();
        assert_eq!(rn.value(&state).unwrap(), int(1));
        rn.step(&mut state, &[]).unwrap();
        assert_eq!(rn.value(&state).unwrap(), int(2));
        rn.step(&mut state, &[]).unwrap();
        assert_eq!(rn.value(&state).unwrap(), int(3));
    }

    #[test]
    fn test_e2e_window_rank_through_registry() {
        let mut reg = FunctionRegistry::new();
        register_window_builtins(&mut reg);

        let rank = reg.find_window("rank", 0).unwrap();
        let mut state = rank.initial_state();
        rank.step(&mut state, &[int(1)]).unwrap();
        assert_eq!(rank.value(&state).unwrap(), int(1));
        rank.step(&mut state, &[int(2)]).unwrap();
        assert_eq!(rank.value(&state).unwrap(), int(2));
        rank.step(&mut state, &[int(2)]).unwrap();
        assert_eq!(rank.value(&state).unwrap(), int(2));
        rank.step(&mut state, &[int(3)]).unwrap();
        assert_eq!(rank.value(&state).unwrap(), int(4));
    }
}