1use crate::error::{QueryError, Result};
43use crate::executor::filter::Value;
44use crate::executor::scan::{ColumnData, RecordBatch};
45use std::cmp::Ordering;
46use std::collections::HashMap;
47
48#[derive(Debug, Clone, PartialEq)]
50pub enum WindowFunction {
51 RowNumber,
53 Rank,
55 DenseRank,
57 Lag {
59 offset: usize,
61 default: Option<Value>,
63 },
64 Lead {
66 offset: usize,
68 default: Option<Value>,
70 },
71 FirstValue,
73 LastValue,
75 NthValue {
77 n: usize,
79 },
80}
81
82impl WindowFunction {
83 pub fn lag() -> Self {
85 WindowFunction::Lag {
86 offset: 1,
87 default: None,
88 }
89 }
90
91 pub fn lag_offset(offset: usize) -> Self {
93 WindowFunction::Lag {
94 offset,
95 default: None,
96 }
97 }
98
99 pub fn lag_offset_default(offset: usize, default: Value) -> Self {
101 WindowFunction::Lag {
102 offset,
103 default: Some(default),
104 }
105 }
106
107 pub fn lead() -> Self {
109 WindowFunction::Lead {
110 offset: 1,
111 default: None,
112 }
113 }
114
115 pub fn lead_offset(offset: usize) -> Self {
117 WindowFunction::Lead {
118 offset,
119 default: None,
120 }
121 }
122
123 pub fn lead_offset_default(offset: usize, default: Value) -> Self {
125 WindowFunction::Lead {
126 offset,
127 default: Some(default),
128 }
129 }
130
131 pub fn nth_value(n: usize) -> Self {
133 WindowFunction::NthValue { n }
134 }
135
136 pub fn reads_target(&self) -> bool {
141 matches!(
142 self,
143 WindowFunction::Lag { .. }
144 | WindowFunction::Lead { .. }
145 | WindowFunction::FirstValue
146 | WindowFunction::LastValue
147 | WindowFunction::NthValue { .. }
148 )
149 }
150}
151
/// One ORDER BY key of a window specification.
///
/// Values are compared with `compare_values`, which puts `Null` after every other value;
/// nulls therefore come last for `asc` keys and first for `desc` keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OrderKey {
    /// Index of the column to sort by.
    pub column: usize,
    /// `true` sorts ascending, `false` sorts descending.
    pub ascending: bool,
}

impl OrderKey {
    /// Ascending key on `column`.
    pub fn asc(column: usize) -> Self {
        Self {
            ascending: true,
            column,
        }
    }

    /// Descending key on `column`.
    pub fn desc(column: usize) -> Self {
        Self {
            ascending: false,
            ..Self::asc(column)
        }
    }
}
178
179#[derive(Debug, Clone, Default, PartialEq)]
181pub struct WindowSpec {
182 pub partition_by: Vec<usize>,
184 pub order_by: Vec<OrderKey>,
186}
187
188impl WindowSpec {
189 pub fn new(partition_by: Vec<usize>, order_by: Vec<OrderKey>) -> Self {
191 Self {
192 partition_by,
193 order_by,
194 }
195 }
196
197 pub fn ordered(order_by: Vec<OrderKey>) -> Self {
199 Self {
200 partition_by: Vec::new(),
201 order_by,
202 }
203 }
204}
205
206fn compare_values(left: &Value, right: &Value) -> Ordering {
215 match (left, right) {
218 (Value::Null, Value::Null) => return Ordering::Equal,
219 (Value::Null, _) => return Ordering::Greater,
220 (_, Value::Null) => return Ordering::Less,
221 _ => {}
222 }
223
224 match (left, right) {
225 (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
226 (Value::String(a), Value::String(b)) => a.cmp(b),
227
228 (Value::Int32(a), Value::Int32(b)) => a.cmp(b),
230 (Value::Int64(a), Value::Int64(b)) => a.cmp(b),
231
232 (Value::Int32(a), Value::Int64(b)) => (*a as i64).cmp(b),
234 (Value::Int64(a), Value::Int32(b)) => a.cmp(&(*b as i64)),
235
236 (Value::Float32(a), Value::Float32(b)) => float_cmp(*a as f64, *b as f64),
238 (Value::Float64(a), Value::Float64(b)) => float_cmp(*a, *b),
239
240 (Value::Float32(a), Value::Float64(b)) => float_cmp(*a as f64, *b),
242 (Value::Float64(a), Value::Float32(b)) => float_cmp(*a, *b as f64),
243
244 (Value::Int32(a), Value::Float32(b)) => float_cmp(*a as f64, *b as f64),
246 (Value::Int32(a), Value::Float64(b)) => float_cmp(*a as f64, *b),
247 (Value::Int64(a), Value::Float32(b)) => float_cmp(*a as f64, *b as f64),
248 (Value::Int64(a), Value::Float64(b)) => float_cmp(*a as f64, *b),
249 (Value::Float32(a), Value::Int32(b)) => float_cmp(*a as f64, *b as f64),
250 (Value::Float32(a), Value::Int64(b)) => float_cmp(*a as f64, *b as f64),
251 (Value::Float64(a), Value::Int32(b)) => float_cmp(*a, *b as f64),
252 (Value::Float64(a), Value::Int64(b)) => float_cmp(*a, *b as f64),
253
254 _ => Ordering::Equal,
258 }
259}
260
/// Compares two floats, treating every NaN as equal to other NaNs and greater than any number.
fn float_cmp(a: f64, b: f64) -> Ordering {
    // `partial_cmp` only fails when at least one side is NaN; ordering the `is_nan`
    // flags (`false < true`) then puts the NaN side last.
    a.partial_cmp(&b)
        .unwrap_or_else(|| a.is_nan().cmp(&b.is_nan()))
}
276
277fn order_keys_equal(a: &[Value], b: &[Value]) -> bool {
279 a.len() == b.len()
280 && a.iter()
281 .zip(b)
282 .all(|(x, y)| compare_values(x, y) == Ordering::Equal)
283}
284
285#[allow(unreachable_patterns)]
298fn partition_fingerprint(values: &[Value]) -> String {
299 let mut key = String::new();
300 for value in values {
301 match value {
302 Value::Null => key.push_str("N|"),
303 Value::Boolean(b) => {
304 key.push_str("b:");
305 key.push_str(if *b { "1" } else { "0" });
306 key.push('|');
307 }
308 Value::Int32(i) => {
309 key.push_str("i:");
310 key.push_str(&(*i as i64).to_string());
311 key.push('|');
312 }
313 Value::Int64(i) => {
314 key.push_str("i:");
315 key.push_str(&i.to_string());
316 key.push('|');
317 }
318 Value::Float32(fl) => {
319 key.push_str("f:");
320 key.push_str(&format!("{:?}", *fl as f64));
321 key.push('|');
322 }
323 Value::Float64(fl) => {
324 key.push_str("f:");
325 key.push_str(&format!("{:?}", fl));
326 key.push('|');
327 }
328 Value::String(s) => {
329 key.push_str("s:");
330 key.push_str(&s.len().to_string());
332 key.push(':');
333 key.push_str(s);
334 key.push('|');
335 }
336 other => {
339 key.push_str("x:");
340 key.push_str(&format!("{:?}", other));
341 key.push('|');
342 }
343 }
344 }
345 key
346}
347
348pub fn evaluate_window(
358 func: &WindowFunction,
359 spec: &WindowSpec,
360 num_rows: usize,
361 target_column: usize,
362 value_at: impl Fn(usize, usize) -> Value,
363) -> Result<Vec<Value>> {
364 if num_rows == 0 {
365 return Ok(Vec::new());
366 }
367
368 let partitions = build_partitions(spec, num_rows, &value_at);
370
371 let mut output = vec![Value::Null; num_rows];
373
374 for partition in &partitions {
375 let sorted = sort_partition(spec, partition, &value_at);
377
378 match func {
380 WindowFunction::RowNumber => {
381 assign_row_number(&sorted, &mut output);
382 }
383 WindowFunction::Rank => {
384 assign_rank(spec, &sorted, &value_at, &mut output, true);
385 }
386 WindowFunction::DenseRank => {
387 assign_rank(spec, &sorted, &value_at, &mut output, false);
388 }
389 WindowFunction::Lag { offset, default } => {
390 assign_lag_lead(
391 &sorted,
392 target_column,
393 &value_at,
394 default,
395 *offset as isize,
396 true,
397 &mut output,
398 );
399 }
400 WindowFunction::Lead { offset, default } => {
401 assign_lag_lead(
402 &sorted,
403 target_column,
404 &value_at,
405 default,
406 *offset as isize,
407 false,
408 &mut output,
409 );
410 }
411 WindowFunction::FirstValue => {
412 assign_nth_in_partition(&sorted, target_column, &value_at, Some(0), &mut output);
413 }
414 WindowFunction::LastValue => {
415 let last = sorted.len().checked_sub(1);
416 assign_nth_in_partition(&sorted, target_column, &value_at, last, &mut output);
417 }
418 WindowFunction::NthValue { n } => {
419 let index = n.checked_sub(1);
421 assign_nth_in_partition(&sorted, target_column, &value_at, index, &mut output);
422 }
423 }
424 }
425
426 Ok(output)
427}
428
429pub fn evaluate_window_batch(
435 func: &WindowFunction,
436 spec: &WindowSpec,
437 target_column: usize,
438 batch: &RecordBatch,
439) -> Result<Vec<Value>> {
440 let max_column = spec
441 .partition_by
442 .iter()
443 .chain(spec.order_by.iter().map(|key| &key.column))
444 .copied()
445 .max();
446
447 if let Some(max_column) = max_column {
448 if max_column >= batch.columns.len() {
449 return Err(QueryError::execution(format!(
450 "Window column index {} out of bounds (batch has {} columns)",
451 max_column,
452 batch.columns.len()
453 )));
454 }
455 }
456
457 if func.reads_target() && target_column >= batch.columns.len() {
458 return Err(QueryError::execution(format!(
459 "Window target column index {} out of bounds (batch has {} columns)",
460 target_column,
461 batch.columns.len()
462 )));
463 }
464
465 evaluate_window(func, spec, batch.num_rows, target_column, |row, column| {
466 column_value(&batch.columns[column], row)
467 })
468}
469
470fn column_value(column: &ColumnData, row_idx: usize) -> Value {
475 match column {
476 ColumnData::Boolean(data) => data
477 .get(row_idx)
478 .and_then(|v| v.as_ref())
479 .map(|&v| Value::Boolean(v))
480 .unwrap_or(Value::Null),
481 ColumnData::Int32(data) => data
482 .get(row_idx)
483 .and_then(|v| v.as_ref())
484 .map(|&v| Value::Int32(v))
485 .unwrap_or(Value::Null),
486 ColumnData::Int64(data) => data
487 .get(row_idx)
488 .and_then(|v| v.as_ref())
489 .map(|&v| Value::Int64(v))
490 .unwrap_or(Value::Null),
491 ColumnData::Float32(data) => data
492 .get(row_idx)
493 .and_then(|v| v.as_ref())
494 .map(|&v| Value::Float32(v))
495 .unwrap_or(Value::Null),
496 ColumnData::Float64(data) => data
497 .get(row_idx)
498 .and_then(|v| v.as_ref())
499 .map(|&v| Value::Float64(v))
500 .unwrap_or(Value::Null),
501 ColumnData::String(data) => data
502 .get(row_idx)
503 .and_then(|v| v.as_ref())
504 .map(|v| Value::String(v.clone()))
505 .unwrap_or(Value::Null),
506 ColumnData::Binary(_) => Value::Null,
507 }
508}
509
510fn build_partitions(
512 spec: &WindowSpec,
513 num_rows: usize,
514 value_at: &impl Fn(usize, usize) -> Value,
515) -> Vec<Vec<usize>> {
516 if spec.partition_by.is_empty() {
517 return vec![(0..num_rows).collect()];
519 }
520
521 let mut order: Vec<String> = Vec::new();
522 let mut groups: HashMap<String, Vec<usize>> = HashMap::new();
523
524 for row in 0..num_rows {
525 let key_values: Vec<Value> = spec
526 .partition_by
527 .iter()
528 .map(|&column| value_at(row, column))
529 .collect();
530 let fingerprint = partition_fingerprint(&key_values);
531
532 match groups.get_mut(&fingerprint) {
533 Some(bucket) => bucket.push(row),
534 None => {
535 order.push(fingerprint.clone());
536 groups.insert(fingerprint, vec![row]);
537 }
538 }
539 }
540
541 order
542 .into_iter()
543 .filter_map(|fingerprint| groups.remove(&fingerprint))
544 .collect()
545}
546
547fn sort_partition(
552 spec: &WindowSpec,
553 partition: &[usize],
554 value_at: &impl Fn(usize, usize) -> Value,
555) -> Vec<usize> {
556 let mut sorted = partition.to_vec();
557 if spec.order_by.is_empty() {
558 return sorted;
559 }
560
561 sorted.sort_by(|&a, &b| {
562 for key in &spec.order_by {
563 let left = value_at(a, key.column);
564 let right = value_at(b, key.column);
565 let mut ordering = compare_values(&left, &right);
566 if !key.ascending {
567 ordering = ordering.reverse();
568 }
569 if ordering != Ordering::Equal {
570 return ordering;
571 }
572 }
573 Ordering::Equal
574 });
575
576 sorted
577}
578
579fn order_key_values(
581 spec: &WindowSpec,
582 row: usize,
583 value_at: &impl Fn(usize, usize) -> Value,
584) -> Vec<Value> {
585 spec.order_by
586 .iter()
587 .map(|key| value_at(row, key.column))
588 .collect()
589}
590
591fn assign_row_number(sorted: &[usize], output: &mut [Value]) {
593 for (position, &row) in sorted.iter().enumerate() {
594 output[row] = Value::Int64((position + 1) as i64);
595 }
596}
597
598fn assign_rank(
605 spec: &WindowSpec,
606 sorted: &[usize],
607 value_at: &impl Fn(usize, usize) -> Value,
608 output: &mut [Value],
609 competition: bool,
610) {
611 let mut current_rank: i64 = 0;
612 let mut previous_key: Option<Vec<Value>> = None;
613
614 for (position, &row) in sorted.iter().enumerate() {
615 let ordinal = (position + 1) as i64;
618 let key = order_key_values(spec, row, value_at);
619
620 let is_new_group = match &previous_key {
621 None => true,
622 Some(prev) => !order_keys_equal(prev, &key),
623 };
624
625 if is_new_group {
626 current_rank = if competition {
627 ordinal
628 } else {
629 current_rank + 1
630 };
631 previous_key = Some(key);
632 }
633
634 output[row] = Value::Int64(current_rank);
635 }
636}
637
638fn assign_lag_lead(
644 sorted: &[usize],
645 target_column: usize,
646 value_at: &impl Fn(usize, usize) -> Value,
647 default: &Option<Value>,
648 offset: isize,
649 backward: bool,
650 output: &mut [Value],
651) {
652 let len = sorted.len() as isize;
653 let signed_offset = if backward { -offset } else { offset };
654
655 for (position, &row) in sorted.iter().enumerate() {
656 let target_index = position as isize + signed_offset;
657 let value = if target_index >= 0 && target_index < len {
658 let source_row = sorted[target_index as usize];
659 value_at(source_row, target_column)
660 } else {
661 default.clone().unwrap_or(Value::Null)
662 };
663 output[row] = value;
664 }
665}
666
667fn assign_nth_in_partition(
674 sorted: &[usize],
675 target_column: usize,
676 value_at: &impl Fn(usize, usize) -> Value,
677 index: Option<usize>,
678 output: &mut [Value],
679) {
680 let picked = match index {
681 Some(idx) => sorted
682 .get(idx)
683 .map(|&source_row| value_at(source_row, target_column))
684 .unwrap_or(Value::Null),
685 None => Value::Null,
686 };
687
688 for &row in sorted {
689 output[row] = picked.clone();
690 }
691}
692
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps column-major test data as the `(num_rows, value_at)` pair expected by
    /// `evaluate_window`; the row count is taken from the first column.
    fn table(columns: Vec<Vec<Value>>) -> (usize, impl Fn(usize, usize) -> Value) {
        let row_count = columns.first().map_or(0, Vec::len);
        let value_at = move |row: usize, col: usize| columns[col][row].clone();
        (row_count, value_at)
    }

    #[test]
    fn unit_row_number_basic() -> Result<()> {
        let (rows, value_at) = table(vec![vec![
            Value::Int64(30),
            Value::Int64(10),
            Value::Int64(20),
        ]]);
        let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);
        let out = evaluate_window(&WindowFunction::RowNumber, &spec, rows, 0, value_at)?;

        // Ascending order is 10 (row 1), 20 (row 2), 30 (row 0).
        let expected: [(usize, i64); 3] = [(1, 1), (2, 2), (0, 3)];
        for &(row, number) in expected.iter() {
            assert_eq!(out[row], Value::Int64(number));
        }
        Ok(())
    }

    #[test]
    fn unit_compare_nulls_last() {
        let cases = [
            (Value::Int64(1), Value::Null, Ordering::Less),
            (Value::Null, Value::Int64(1), Ordering::Greater),
            (Value::Null, Value::Null, Ordering::Equal),
        ];
        for (left, right, expected) in cases.iter() {
            assert_eq!(compare_values(left, right), *expected);
        }
    }

    #[test]
    fn unit_compare_mixed_numeric() {
        let below = compare_values(&Value::Int32(5), &Value::Float64(5.5));
        let same = compare_values(&Value::Float64(2.0), &Value::Int64(2));
        assert_eq!(below, Ordering::Less);
        assert_eq!(same, Ordering::Equal);
    }
}