1use std::cmp::Ordering;
40use std::collections::HashMap;
41
42use super::super::engine::binding::{Binding, Value, Var};
43use super::aggregation::create_aggregator;
44use super::value_compare::{total_compare_values, values_equal};
45
46#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum WindowFuncType {
53 RowNumber,
55 Rank,
56 DenseRank,
57 Ntile(i64),
58 PercentRank,
59 CumeDist,
60
61 FirstValue(Var),
63 LastValue(Var),
64 NthValue(Var, i64),
65 Lag(Var, i64, Option<Value>),
66 Lead(Var, i64, Option<Value>),
67
68 Aggregate(String, Var),
70}
71
72impl WindowFuncType {
73 pub fn row_number() -> Self {
75 Self::RowNumber
76 }
77
78 pub fn rank() -> Self {
80 Self::Rank
81 }
82
83 pub fn dense_rank() -> Self {
85 Self::DenseRank
86 }
87
88 pub fn ntile(n: i64) -> Self {
90 Self::Ntile(n)
91 }
92
93 pub fn lag(var: Var, offset: i64, default: Option<Value>) -> Self {
95 Self::Lag(var, offset, default)
96 }
97
98 pub fn lead(var: Var, offset: i64, default: Option<Value>) -> Self {
100 Self::Lead(var, offset, default)
101 }
102
103 pub fn first_value(var: Var) -> Self {
105 Self::FirstValue(var)
106 }
107
108 pub fn last_value(var: Var) -> Self {
110 Self::LastValue(var)
111 }
112
113 pub fn aggregate(name: &str, var: Var) -> Self {
115 Self::Aggregate(name.to_uppercase(), var)
116 }
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
125pub enum FrameType {
126 Rows,
128 Range,
130 Groups,
132}
133
134#[derive(Debug, Clone, PartialEq, Eq, Default)]
136pub enum FrameBound {
137 UnboundedPreceding,
139 UnboundedFollowing,
141 #[default]
143 CurrentRow,
144 Preceding(i64),
146 Following(i64),
148}
149
150#[derive(Debug, Clone)]
152pub struct FrameSpec {
153 pub frame_type: FrameType,
155 pub start: FrameBound,
157 pub end: FrameBound,
159 pub exclude: FrameExclude,
161}
162
163#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
165pub enum FrameExclude {
166 #[default]
168 NoOthers,
169 CurrentRow,
171 Group,
173 Ties,
175}
176
177impl Default for FrameSpec {
178 fn default() -> Self {
180 Self {
181 frame_type: FrameType::Range,
182 start: FrameBound::UnboundedPreceding,
183 end: FrameBound::CurrentRow,
184 exclude: FrameExclude::NoOthers,
185 }
186 }
187}
188
189impl FrameSpec {
190 pub fn entire_partition() -> Self {
192 Self {
193 frame_type: FrameType::Rows,
194 start: FrameBound::UnboundedPreceding,
195 end: FrameBound::UnboundedFollowing,
196 exclude: FrameExclude::NoOthers,
197 }
198 }
199
200 pub fn running() -> Self {
202 Self {
203 frame_type: FrameType::Rows,
204 start: FrameBound::UnboundedPreceding,
205 end: FrameBound::CurrentRow,
206 exclude: FrameExclude::NoOthers,
207 }
208 }
209
210 pub fn sliding(n: i64) -> Self {
212 Self {
213 frame_type: FrameType::Rows,
214 start: FrameBound::Preceding(n),
215 end: FrameBound::CurrentRow,
216 exclude: FrameExclude::NoOthers,
217 }
218 }
219}
220
221#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
227pub enum SortDirection {
228 #[default]
229 Asc,
230 Desc,
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
235pub enum NullsOrder {
236 #[default]
237 First,
238 Last,
239}
240
241#[derive(Debug, Clone)]
243pub struct WindowOrderBy {
244 pub var: Var,
246 pub direction: SortDirection,
248 pub nulls: NullsOrder,
250}
251
252impl WindowOrderBy {
253 pub fn new(var: Var) -> Self {
254 Self {
255 var,
256 direction: SortDirection::Asc,
257 nulls: NullsOrder::Last,
258 }
259 }
260
261 pub fn desc(mut self) -> Self {
262 self.direction = SortDirection::Desc;
263 self
264 }
265
266 pub fn nulls_first(mut self) -> Self {
267 self.nulls = NullsOrder::First;
268 self
269 }
270}
271
272#[derive(Debug, Clone, Default)]
274pub struct WindowDef {
275 pub name: Option<String>,
277 pub partition_by: Vec<Var>,
279 pub order_by: Vec<WindowOrderBy>,
281 pub frame: FrameSpec,
283}
284
285impl WindowDef {
286 pub fn partition_by(vars: Vec<Var>) -> Self {
288 Self {
289 partition_by: vars,
290 ..Default::default()
291 }
292 }
293
294 pub fn with_order_by(mut self, order: Vec<WindowOrderBy>) -> Self {
296 self.order_by = order;
297 self
298 }
299
300 pub fn with_frame(mut self, frame: FrameSpec) -> Self {
302 self.frame = frame;
303 self
304 }
305}
306
307#[derive(Debug, Clone)]
313pub struct WindowFunc {
314 pub func_type: WindowFuncType,
316 pub result_var: Var,
318 pub window: WindowDef,
320}
321
322impl WindowFunc {
323 pub fn new(func_type: WindowFuncType, result_var: Var, window: WindowDef) -> Self {
325 Self {
326 func_type,
327 result_var,
328 window,
329 }
330 }
331}
332
333pub struct WindowExecutor;
339
340#[derive(Debug, Clone)]
341struct IndexedBinding {
342 index: usize,
343 binding: Binding,
344}
345
346impl WindowExecutor {
347 pub fn execute(bindings: Vec<Binding>, functions: &[WindowFunc]) -> Vec<Binding> {
349 if bindings.is_empty() || functions.is_empty() {
350 return bindings;
351 }
352
353 let mut result = bindings;
355
356 for func in functions {
357 result = Self::apply_window_function(&result, func);
358 }
359
360 result
361 }
362
363 fn apply_window_function(bindings: &[Binding], func: &WindowFunc) -> Vec<Binding> {
365 let partitions = Self::partition_bindings(bindings, &func.window.partition_by);
367
368 let mut result: Vec<Option<Binding>> = vec![None; bindings.len()];
370
371 for (_key, mut partition) in partitions {
372 Self::sort_partition(&mut partition, &func.window.order_by);
374
375 let computed = Self::compute_for_partition(&partition, func);
377 for entry in computed {
378 if entry.index < result.len() {
379 result[entry.index] = Some(entry.binding);
380 }
381 }
382 }
383
384 result
385 .into_iter()
386 .enumerate()
387 .map(|(idx, binding)| binding.unwrap_or_else(|| bindings[idx].clone()))
388 .collect()
389 }
390
391 fn partition_bindings(
393 bindings: &[Binding],
394 partition_by: &[Var],
395 ) -> Vec<(Vec<Option<Value>>, Vec<IndexedBinding>)> {
396 if partition_by.is_empty() {
397 let entries = bindings
399 .iter()
400 .cloned()
401 .enumerate()
402 .map(|(index, binding)| IndexedBinding { index, binding })
403 .collect();
404 return vec![(vec![], entries)];
405 }
406
407 let mut partitions: HashMap<Vec<Option<Value>>, Vec<IndexedBinding>> = HashMap::new();
408 let mut key_order: Vec<Vec<Option<Value>>> = Vec::new();
409
410 for (index, binding) in bindings.iter().cloned().enumerate() {
411 let key_values: Vec<Option<Value>> = partition_by
412 .iter()
413 .map(|v| binding.get(v).cloned())
414 .collect();
415
416 if !partitions.contains_key(&key_values) {
417 key_order.push(key_values.clone());
418 }
419
420 partitions
421 .entry(key_values)
422 .or_default()
423 .push(IndexedBinding { index, binding });
424 }
425
426 key_order
428 .into_iter()
429 .filter_map(|values| partitions.remove(&values).map(|rows| (values, rows)))
430 .collect()
431 }
432
433 fn sort_partition(partition: &mut [IndexedBinding], order_by: &[WindowOrderBy]) {
435 if order_by.is_empty() {
436 return;
437 }
438
439 partition.sort_by(|a, b| {
440 for spec in order_by {
441 let val_a = a.binding.get(&spec.var);
442 let val_b = b.binding.get(&spec.var);
443
444 let cmp = match (val_a, val_b) {
445 (None, None) => Ordering::Equal,
446 (None, Some(_)) => match spec.nulls {
447 NullsOrder::First => Ordering::Less,
448 NullsOrder::Last => Ordering::Greater,
449 },
450 (Some(_), None) => match spec.nulls {
451 NullsOrder::First => Ordering::Greater,
452 NullsOrder::Last => Ordering::Less,
453 },
454 (Some(a), Some(b)) => total_compare_values(a, b),
455 };
456
457 if cmp != Ordering::Equal {
458 return match spec.direction {
459 SortDirection::Asc => cmp,
460 SortDirection::Desc => cmp.reverse(),
461 };
462 }
463 }
464 a.index.cmp(&b.index)
465 });
466 }
467
468 fn compute_for_partition(
470 partition: &[IndexedBinding],
471 func: &WindowFunc,
472 ) -> Vec<IndexedBinding> {
473 let partition_size = partition.len();
474
475 let peer_groups = Self::compute_peer_groups(partition, &func.window.order_by);
477
478 partition
479 .iter()
480 .enumerate()
481 .map(|(row_idx, indexed)| {
482 let value =
483 Self::compute_value(partition, row_idx, &peer_groups, partition_size, func);
484
485 let result_binding = Binding::one(func.result_var.clone(), value);
487 let binding = indexed
488 .binding
489 .merge(&result_binding)
490 .unwrap_or_else(|| indexed.binding.clone());
491 IndexedBinding {
492 index: indexed.index,
493 binding,
494 }
495 })
496 .collect()
497 }
498
499 fn compute_peer_groups(partition: &[IndexedBinding], order_by: &[WindowOrderBy]) -> Vec<usize> {
501 if order_by.is_empty() {
502 return vec![0; partition.len()];
504 }
505
506 let mut groups = Vec::with_capacity(partition.len());
507 let mut current_group = 0;
508
509 for (idx, indexed) in partition.iter().enumerate() {
510 if idx == 0 {
511 groups.push(0);
512 continue;
513 }
514
515 let prev = &partition[idx - 1].binding;
516 let binding = &indexed.binding;
517 let is_peer = order_by.iter().all(|spec| {
518 let a = prev.get(&spec.var);
519 let b = binding.get(&spec.var);
520 match (a, b) {
521 (None, None) => true,
522 (Some(va), Some(vb)) => values_equal(va, vb),
523 _ => false,
524 }
525 });
526
527 if !is_peer {
528 current_group += 1;
529 }
530 groups.push(current_group);
531 }
532
533 groups
534 }
535
536 fn compute_value(
538 partition: &[IndexedBinding],
539 row_idx: usize,
540 peer_groups: &[usize],
541 partition_size: usize,
542 func: &WindowFunc,
543 ) -> Value {
544 match &func.func_type {
545 WindowFuncType::RowNumber => Value::Integer((row_idx + 1) as i64),
547
548 WindowFuncType::Rank => {
549 let current_group = peer_groups[row_idx];
551 let first_in_group = peer_groups
552 .iter()
553 .position(|&g| g == current_group)
554 .unwrap();
555 Value::Integer((first_in_group + 1) as i64)
556 }
557
558 WindowFuncType::DenseRank => {
559 Value::Integer((peer_groups[row_idx] + 1) as i64)
561 }
562
563 WindowFuncType::Ntile(n) => {
564 let n = *n as usize;
566 if n == 0 || partition_size == 0 {
567 return Value::Null;
568 }
569 let bucket_size = partition_size / n;
570 let remainder = partition_size % n;
571
572 let mut row = 0;
574 let mut bucket = 1;
575 for i in 0..n {
576 let size = bucket_size + if i < remainder { 1 } else { 0 };
577 if row_idx < row + size {
578 bucket = i + 1;
579 break;
580 }
581 row += size;
582 }
583 Value::Integer(bucket as i64)
584 }
585
586 WindowFuncType::PercentRank => {
587 if partition_size <= 1 {
589 return Value::Float(0.0);
590 }
591 let current_group = peer_groups[row_idx];
592 let first_in_group = peer_groups
593 .iter()
594 .position(|&g| g == current_group)
595 .unwrap();
596 let rank = first_in_group as f64;
597 Value::Float(rank / (partition_size - 1) as f64)
598 }
599
600 WindowFuncType::CumeDist => {
601 let current_group = peer_groups[row_idx];
603 let count = peer_groups.iter().filter(|&&g| g <= current_group).count();
605 Value::Float(count as f64 / partition_size as f64)
606 }
607
608 WindowFuncType::FirstValue(var) => {
610 let (start, _) = Self::get_frame_bounds(
612 row_idx,
613 partition_size,
614 peer_groups,
615 &func.window.frame,
616 );
617 partition
618 .get(start)
619 .and_then(|b| b.binding.get(var))
620 .cloned()
621 .unwrap_or(Value::Null)
622 }
623
624 WindowFuncType::LastValue(var) => {
625 let (_, end) = Self::get_frame_bounds(
626 row_idx,
627 partition_size,
628 peer_groups,
629 &func.window.frame,
630 );
631 if end > 0 {
633 partition
634 .get(end - 1)
635 .and_then(|b| b.binding.get(var))
636 .cloned()
637 .unwrap_or(Value::Null)
638 } else {
639 Value::Null
640 }
641 }
642
643 WindowFuncType::NthValue(var, n) => {
644 let (start, end) = Self::get_frame_bounds(
645 row_idx,
646 partition_size,
647 peer_groups,
648 &func.window.frame,
649 );
650 let n = *n as usize;
651 if n == 0 {
652 return Value::Null;
653 }
654 let target_idx = start + n - 1;
655 if target_idx < end {
656 partition
657 .get(target_idx)
658 .and_then(|b| b.binding.get(var))
659 .cloned()
660 .unwrap_or(Value::Null)
661 } else {
662 Value::Null
663 }
664 }
665
666 WindowFuncType::Lag(var, offset, default) => {
667 let offset = *offset as usize;
668 if row_idx >= offset {
669 partition
670 .get(row_idx - offset)
671 .and_then(|b| b.binding.get(var))
672 .cloned()
673 .unwrap_or_else(|| default.clone().unwrap_or(Value::Null))
674 } else {
675 default.clone().unwrap_or(Value::Null)
676 }
677 }
678
679 WindowFuncType::Lead(var, offset, default) => {
680 let offset = *offset as usize;
681 let target = row_idx + offset;
682 if target < partition_size {
683 partition
684 .get(target)
685 .and_then(|b| b.binding.get(var))
686 .cloned()
687 .unwrap_or_else(|| default.clone().unwrap_or(Value::Null))
688 } else {
689 default.clone().unwrap_or(Value::Null)
690 }
691 }
692
693 WindowFuncType::Aggregate(agg_name, var) => {
695 let (start, end) = Self::get_frame_bounds(
696 row_idx,
697 partition_size,
698 peer_groups,
699 &func.window.frame,
700 );
701
702 if let Some(mut aggregator) = create_aggregator(agg_name) {
703 for i in start..end {
704 if let Some(binding) = partition.get(i) {
705 let value = binding.binding.get(var);
706 aggregator.accumulate(value);
707 }
708 }
709 aggregator.finalize()
710 } else {
711 Value::Null
712 }
713 }
714 }
715 }
716
717 fn get_frame_bounds(
720 row_idx: usize,
721 partition_size: usize,
722 peer_groups: &[usize],
723 frame: &FrameSpec,
724 ) -> (usize, usize) {
725 let start = match &frame.start {
726 FrameBound::UnboundedPreceding => 0,
727 FrameBound::CurrentRow => {
728 match frame.frame_type {
729 FrameType::Rows => row_idx,
730 FrameType::Range | FrameType::Groups => {
731 let group = peer_groups[row_idx];
733 peer_groups
734 .iter()
735 .position(|&g| g == group)
736 .unwrap_or(row_idx)
737 }
738 }
739 }
740 FrameBound::Preceding(n) => {
741 match frame.frame_type {
742 FrameType::Rows => row_idx.saturating_sub(*n as usize),
743 FrameType::Groups => {
744 let current_group = peer_groups[row_idx];
746 let target_group = current_group.saturating_sub(*n as usize);
747 peer_groups
748 .iter()
749 .position(|&g| g == target_group)
750 .unwrap_or(0)
751 }
752 FrameType::Range => row_idx.saturating_sub(*n as usize),
753 }
754 }
755 FrameBound::Following(n) => match frame.frame_type {
756 FrameType::Rows => (row_idx + *n as usize).min(partition_size),
757 FrameType::Groups => {
758 let current_group = peer_groups[row_idx];
759 let target_group = current_group + *n as usize;
760 peer_groups
761 .iter()
762 .position(|&g| g >= target_group)
763 .unwrap_or(partition_size)
764 }
765 FrameType::Range => (row_idx + *n as usize).min(partition_size),
766 },
767 FrameBound::UnboundedFollowing => partition_size,
768 };
769
770 let end = match &frame.end {
771 FrameBound::UnboundedFollowing => partition_size,
772 FrameBound::CurrentRow => {
773 match frame.frame_type {
774 FrameType::Rows => row_idx + 1,
775 FrameType::Range | FrameType::Groups => {
776 let group = peer_groups[row_idx];
778 peer_groups
779 .iter()
780 .position(|&g| g > group)
781 .unwrap_or(partition_size)
782 }
783 }
784 }
785 FrameBound::Preceding(n) => match frame.frame_type {
786 FrameType::Rows => row_idx.saturating_sub(*n as usize) + 1,
787 FrameType::Groups => {
788 let current_group = peer_groups[row_idx];
789 let target_group = current_group.saturating_sub(*n as usize);
790 peer_groups
791 .iter()
792 .position(|&g| g > target_group)
793 .unwrap_or(partition_size)
794 }
795 FrameType::Range => row_idx.saturating_sub(*n as usize) + 1,
796 },
797 FrameBound::Following(n) => match frame.frame_type {
798 FrameType::Rows => (row_idx + *n as usize + 1).min(partition_size),
799 FrameType::Groups => {
800 let current_group = peer_groups[row_idx];
801 let target_group = current_group + *n as usize;
802 peer_groups
803 .iter()
804 .position(|&g| g > target_group)
805 .unwrap_or(partition_size)
806 }
807 FrameType::Range => (row_idx + *n as usize + 1).min(partition_size),
808 },
809 FrameBound::UnboundedPreceding => 0, };
811
812 (
813 start.min(partition_size),
814 end.min(partition_size).max(start),
815 )
816 }
817}
818
819#[cfg(test)]
824mod tests {
825 use super::*;
826
827 fn make_binding(pairs: &[(&str, Value)]) -> Binding {
828 if pairs.is_empty() {
829 return Binding::empty();
830 }
831
832 let mut result = Binding::one(Var::new(pairs[0].0), pairs[0].1.clone());
833
834 for (k, v) in pairs.iter().skip(1) {
835 let next = Binding::one(Var::new(k), v.clone());
836 result = result.merge(&next).unwrap_or(result);
837 }
838
839 result
840 }
841
842 fn get_values(bindings: &[Binding], var: &str) -> Vec<i64> {
843 let v = Var::new(var);
844 bindings
845 .iter()
846 .filter_map(|b| b.get(&v))
847 .filter_map(|v| match v {
848 Value::Integer(i) => Some(*i),
849 _ => None,
850 })
851 .collect()
852 }
853
854 #[test]
855 fn test_row_number() {
856 let bindings = vec![
857 make_binding(&[
858 ("dept", Value::String("A".to_string())),
859 ("salary", Value::Integer(100)),
860 ]),
861 make_binding(&[
862 ("dept", Value::String("A".to_string())),
863 ("salary", Value::Integer(200)),
864 ]),
865 make_binding(&[
866 ("dept", Value::String("B".to_string())),
867 ("salary", Value::Integer(150)),
868 ]),
869 ];
870
871 let func = WindowFunc::new(
872 WindowFuncType::RowNumber,
873 Var::new("rn"),
874 WindowDef::partition_by(vec![Var::new("dept")])
875 .with_order_by(vec![WindowOrderBy::new(Var::new("salary"))]),
876 );
877
878 let result = WindowExecutor::execute(bindings, &[func]);
879
880 let rns = get_values(&result, "rn");
882 assert_eq!(rns, vec![1, 2, 1]); }
884
885 #[test]
886 fn test_rank_with_ties() {
887 let bindings = vec![
888 make_binding(&[("score", Value::Integer(100))]),
889 make_binding(&[("score", Value::Integer(100))]), make_binding(&[("score", Value::Integer(90))]),
891 make_binding(&[("score", Value::Integer(80))]),
892 ];
893
894 let func = WindowFunc::new(
895 WindowFuncType::Rank,
896 Var::new("rank"),
897 WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("score")).desc()]),
898 );
899
900 let result = WindowExecutor::execute(bindings, &[func]);
901 let ranks = get_values(&result, "rank");
902
903 assert_eq!(ranks, vec![1, 1, 3, 4]);
905 }
906
907 #[test]
908 fn test_dense_rank() {
909 let bindings = vec![
910 make_binding(&[("score", Value::Integer(100))]),
911 make_binding(&[("score", Value::Integer(100))]),
912 make_binding(&[("score", Value::Integer(90))]),
913 ];
914
915 let func = WindowFunc::new(
916 WindowFuncType::DenseRank,
917 Var::new("drank"),
918 WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("score")).desc()]),
919 );
920
921 let result = WindowExecutor::execute(bindings, &[func]);
922 let ranks = get_values(&result, "drank");
923
924 assert_eq!(ranks, vec![1, 1, 2]);
926 }
927
928 #[test]
929 fn test_ntile() {
930 let bindings: Vec<Binding> = (1..=10)
931 .map(|i| make_binding(&[("val", Value::Integer(i))]))
932 .collect();
933
934 let func = WindowFunc::new(
935 WindowFuncType::Ntile(4),
936 Var::new("bucket"),
937 WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("val"))]),
938 );
939
940 let result = WindowExecutor::execute(bindings, &[func]);
941 let buckets = get_values(&result, "bucket");
942
943 assert_eq!(buckets, vec![1, 1, 1, 2, 2, 2, 3, 3, 4, 4]);
945 }
946
947 #[test]
948 fn test_lag_lead() {
949 let bindings: Vec<Binding> = (1..=5)
950 .map(|i| make_binding(&[("val", Value::Integer(i))]))
951 .collect();
952
953 let lag_func = WindowFunc::new(
954 WindowFuncType::Lag(Var::new("val"), 1, Some(Value::Integer(0))),
955 Var::new("prev"),
956 WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("val"))]),
957 );
958
959 let lead_func = WindowFunc::new(
960 WindowFuncType::Lead(Var::new("val"), 1, Some(Value::Integer(0))),
961 Var::new("next"),
962 WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("val"))]),
963 );
964
965 let result = WindowExecutor::execute(bindings, &[lag_func, lead_func]);
966 let prevs = get_values(&result, "prev");
967 let nexts = get_values(&result, "next");
968
969 assert_eq!(prevs, vec![0, 1, 2, 3, 4]); assert_eq!(nexts, vec![2, 3, 4, 5, 0]); }
972
973 #[test]
974 fn test_running_sum() {
975 let bindings: Vec<Binding> = (1..=5)
976 .map(|i| make_binding(&[("val", Value::Integer(i))]))
977 .collect();
978
979 let func = WindowFunc::new(
980 WindowFuncType::Aggregate("SUM".to_string(), Var::new("val")),
981 Var::new("running_sum"),
982 WindowDef::default()
983 .with_order_by(vec![WindowOrderBy::new(Var::new("val"))])
984 .with_frame(FrameSpec::running()),
985 );
986
987 let result = WindowExecutor::execute(bindings, &[func]);
988 let sums = get_values(&result, "running_sum");
989
990 assert_eq!(sums, vec![1, 3, 6, 10, 15]);
992 }
993
994 #[test]
995 fn test_first_last_value() {
996 let bindings: Vec<Binding> = (1..=5)
997 .map(|i| make_binding(&[("val", Value::Integer(i))]))
998 .collect();
999
1000 let first_func = WindowFunc::new(
1001 WindowFuncType::FirstValue(Var::new("val")),
1002 Var::new("first"),
1003 WindowDef::default()
1004 .with_order_by(vec![WindowOrderBy::new(Var::new("val"))])
1005 .with_frame(FrameSpec::entire_partition()),
1006 );
1007
1008 let last_func = WindowFunc::new(
1009 WindowFuncType::LastValue(Var::new("val")),
1010 Var::new("last"),
1011 WindowDef::default()
1012 .with_order_by(vec![WindowOrderBy::new(Var::new("val"))])
1013 .with_frame(FrameSpec::entire_partition()),
1014 );
1015
1016 let result = WindowExecutor::execute(bindings, &[first_func, last_func]);
1017 let firsts = get_values(&result, "first");
1018 let lasts = get_values(&result, "last");
1019
1020 assert_eq!(firsts, vec![1, 1, 1, 1, 1]); assert_eq!(lasts, vec![5, 5, 5, 5, 5]); }
1023
1024 #[test]
1025 fn test_partitioned_sum() {
1026 let bindings = vec![
1027 make_binding(&[
1028 ("dept", Value::String("A".to_string())),
1029 ("salary", Value::Integer(100)),
1030 ]),
1031 make_binding(&[
1032 ("dept", Value::String("A".to_string())),
1033 ("salary", Value::Integer(200)),
1034 ]),
1035 make_binding(&[
1036 ("dept", Value::String("B".to_string())),
1037 ("salary", Value::Integer(150)),
1038 ]),
1039 make_binding(&[
1040 ("dept", Value::String("B".to_string())),
1041 ("salary", Value::Integer(250)),
1042 ]),
1043 ];
1044
1045 let func = WindowFunc::new(
1046 WindowFuncType::Aggregate("SUM".to_string(), Var::new("salary")),
1047 Var::new("dept_total"),
1048 WindowDef::partition_by(vec![Var::new("dept")])
1049 .with_frame(FrameSpec::entire_partition()),
1050 );
1051
1052 let result = WindowExecutor::execute(bindings, &[func]);
1053 let totals = get_values(&result, "dept_total");
1054
1055 assert_eq!(totals, vec![300, 300, 400, 400]);
1057 }
1058
1059 #[test]
1060 fn test_window_preserves_input_order() {
1061 let bindings = vec![
1062 make_binding(&[
1063 ("dept", Value::String("A".to_string())),
1064 ("seq", Value::Integer(1)),
1065 ]),
1066 make_binding(&[
1067 ("dept", Value::String("B".to_string())),
1068 ("seq", Value::Integer(1)),
1069 ]),
1070 make_binding(&[
1071 ("dept", Value::String("A".to_string())),
1072 ("seq", Value::Integer(2)),
1073 ]),
1074 make_binding(&[
1075 ("dept", Value::String("B".to_string())),
1076 ("seq", Value::Integer(2)),
1077 ]),
1078 ];
1079
1080 let func = WindowFunc::new(
1081 WindowFuncType::RowNumber,
1082 Var::new("rn"),
1083 WindowDef::partition_by(vec![Var::new("dept")])
1084 .with_order_by(vec![WindowOrderBy::new(Var::new("seq"))]),
1085 );
1086
1087 let result = WindowExecutor::execute(bindings, &[func]);
1088 let dept_var = Var::new("dept");
1089 let depts: Vec<String> = result
1090 .iter()
1091 .filter_map(|b| b.get(&dept_var))
1092 .filter_map(|v| match v {
1093 Value::String(s) => Some(s.clone()),
1094 _ => None,
1095 })
1096 .collect();
1097
1098 assert_eq!(depts, vec!["A", "B", "A", "B"]);
1099 assert_eq!(get_values(&result, "rn"), vec![1, 1, 2, 2]);
1100 }
1101
1102 #[test]
1103 fn test_percent_rank() {
1104 let bindings: Vec<Binding> = (1..=4)
1105 .map(|i| make_binding(&[("val", Value::Integer(i))]))
1106 .collect();
1107
1108 let func = WindowFunc::new(
1109 WindowFuncType::PercentRank,
1110 Var::new("prank"),
1111 WindowDef::default().with_order_by(vec![WindowOrderBy::new(Var::new("val"))]),
1112 );
1113
1114 let result = WindowExecutor::execute(bindings, &[func]);
1115
1116 for (i, binding) in result.iter().enumerate() {
1118 if let Some(Value::Float(pr)) = binding.get(&Var::new("prank")) {
1119 let expected = i as f64 / 3.0;
1120 assert!(
1121 (pr - expected).abs() < 0.001,
1122 "Row {}: expected {}, got {}",
1123 i,
1124 expected,
1125 pr
1126 );
1127 }
1128 }
1129 }
1130}