1use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8use std::time::Instant;
9
10use anyhow::{anyhow, Result};
11use tracing::{debug, info};
12
13use crate::data::data_view::DataView;
14use crate::data::datatable::{DataTable, DataValue};
15use crate::sql::parser::ast::{
16 FrameBound, FrameUnit, OrderByItem, SortDirection, SqlExpression, WindowSpec,
17};
18
19#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
22struct PartitionKey(String);
23
24impl PartitionKey {
25 fn from_values(values: Vec<DataValue>) -> Self {
27 let key_parts: Vec<String> = values
29 .iter()
30 .map(|v| match v {
31 DataValue::String(s) => format!("S:{}", s),
32 DataValue::InternedString(s) => format!("S:{}", s),
33 DataValue::Integer(i) => format!("I:{}", i),
34 DataValue::Float(f) => format!("F:{}", f),
35 DataValue::Boolean(b) => format!("B:{}", b),
36 DataValue::DateTime(dt) => format!("D:{}", dt),
37 DataValue::Vector(v) => {
38 let components: Vec<String> = v.iter().map(|f| f.to_string()).collect();
39 format!("V:[{}]", components.join(","))
40 }
41 DataValue::Null => "N".to_string(),
42 })
43 .collect();
44 let key = key_parts.join("|");
45 PartitionKey(key)
46 }
47}
48
/// The rows of one partition in window order, with a reverse index for
/// constant-time "where is this row?" lookups.
#[derive(Debug, Clone)]
pub struct OrderedPartition {
    /// Row indices in partition order.
    rows: Vec<usize>,

    /// Maps a row index to its position inside `rows`.
    row_positions: HashMap<usize, usize>,
}

impl OrderedPartition {
    /// Wraps already-ordered `rows` and builds the row -> position index.
    fn new(rows: Vec<usize>) -> Self {
        let row_positions: HashMap<usize, usize> = rows
            .iter()
            .enumerate()
            .map(|(pos, &row_idx)| (row_idx, pos))
            .collect();

        Self {
            rows,
            row_positions,
        }
    }

    /// Returns the row located `offset` positions away from `current_row`
    /// (negative = earlier, positive = later).
    ///
    /// Returns `None` when `current_row` is not in this partition or the
    /// target position falls outside it. The arithmetic is checked, so extreme
    /// offsets such as `i32::MIN` / `i32::MAX` yield `None` instead of
    /// overflowing.
    pub fn get_row_at_offset(&self, current_row: usize, offset: i32) -> Option<usize> {
        let current_pos = *self.row_positions.get(&current_row)?;
        let target_pos = if offset >= 0 {
            current_pos.checked_add(offset as usize)?
        } else {
            current_pos.checked_sub(offset.unsigned_abs() as usize)?
        };
        self.rows.get(target_pos).copied()
    }

    /// Returns the zero-based position of `row_index` in this partition, if present.
    pub fn get_position(&self, row_index: usize) -> Option<usize> {
        self.row_positions.get(&row_index).copied()
    }

    /// Returns the first row of the partition, or `None` when it is empty.
    pub fn first_row(&self) -> Option<usize> {
        self.rows.first().copied()
    }

    /// Returns the last row of the partition, or `None` when it is empty.
    pub fn last_row(&self) -> Option<usize> {
        self.rows.last().copied()
    }
}
102
103pub struct WindowContext {
105 source: Arc<DataView>,
107
108 partitions: BTreeMap<PartitionKey, OrderedPartition>,
110
111 row_to_partition: HashMap<usize, PartitionKey>,
113
114 spec: WindowSpec,
116}
117
118impl WindowContext {
119 pub fn new(
121 view: Arc<DataView>,
122 partition_by: Vec<String>,
123 order_by: Vec<OrderByItem>,
124 ) -> Result<Self> {
125 Self::new_with_spec(
126 view,
127 WindowSpec {
128 partition_by,
129 order_by,
130 frame: None,
131 },
132 )
133 }
134
135 pub fn new_with_spec(view: Arc<DataView>, spec: WindowSpec) -> Result<Self> {
137 let overall_start = Instant::now();
138 let partition_by = spec.partition_by.clone();
139 let order_by = spec.order_by.clone();
140 let row_count = view.row_count();
141
142 if partition_by.is_empty() {
144 info!(
145 "Creating single partition (no PARTITION BY) for {} rows",
146 row_count
147 );
148 let partition_start = Instant::now();
149
150 let single_partition = Self::create_single_partition(&view, &order_by)?;
151 let partition_key = PartitionKey::from_values(vec![]);
152
153 let mut row_to_partition = HashMap::new();
155 for &row_idx in &single_partition.rows {
156 row_to_partition.insert(row_idx, partition_key.clone());
157 }
158
159 let mut partitions = BTreeMap::new();
160 partitions.insert(partition_key, single_partition);
161
162 info!(
163 "Single partition created in {:.2}ms (1 partition, {} rows)",
164 partition_start.elapsed().as_secs_f64() * 1000.0,
165 row_count
166 );
167
168 info!(
169 "WindowContext::new_with_spec (single partition) took {:.2}ms total",
170 overall_start.elapsed().as_secs_f64() * 1000.0
171 );
172
173 return Ok(Self {
174 source: view,
175 partitions,
176 row_to_partition,
177 spec,
178 });
179 }
180
181 info!(
183 "Creating partitions with PARTITION BY for {} rows",
184 row_count
185 );
186 let partition_start = Instant::now();
187
188 let mut partition_map: BTreeMap<PartitionKey, Vec<usize>> = BTreeMap::new();
189 let mut row_to_partition = HashMap::new();
190
191 let source_table = view.source();
193 let partition_col_indices: Vec<usize> = partition_by
194 .iter()
195 .map(|col| {
196 source_table
197 .get_column_index(col)
198 .ok_or_else(|| anyhow!("Invalid partition column: {}", col))
199 })
200 .collect::<Result<Vec<_>>>()?;
201
202 let grouping_start = Instant::now();
204 for row_idx in view.get_visible_rows() {
205 let mut key_values = Vec::new();
207 for &col_idx in &partition_col_indices {
208 let value = source_table
209 .get_value(row_idx, col_idx)
210 .ok_or_else(|| anyhow!("Failed to get value for partition"))?
211 .clone();
212 key_values.push(value);
213 }
214 let key = PartitionKey::from_values(key_values);
215
216 partition_map.entry(key.clone()).or_default().push(row_idx);
218 row_to_partition.insert(row_idx, key);
219 }
220
221 info!(
222 "Partition grouping took {:.2}ms ({} partitions created)",
223 grouping_start.elapsed().as_secs_f64() * 1000.0,
224 partition_map.len()
225 );
226
227 let sort_start = Instant::now();
229 let mut partitions = BTreeMap::new();
230 let partition_count = partition_map.len();
231
232 for (key, mut rows) in partition_map {
233 if !order_by.is_empty() {
235 Self::sort_rows(&mut rows, source_table, &order_by)?;
236 }
237
238 partitions.insert(key, OrderedPartition::new(rows));
239 }
240
241 info!(
242 "Partition sorting took {:.2}ms ({} partitions, ORDER BY: {})",
243 sort_start.elapsed().as_secs_f64() * 1000.0,
244 partition_count,
245 !order_by.is_empty()
246 );
247
248 info!(
249 "Total partition creation took {:.2}ms",
250 partition_start.elapsed().as_secs_f64() * 1000.0
251 );
252
253 info!(
254 "WindowContext::new_with_spec (multi-partition) took {:.2}ms total",
255 overall_start.elapsed().as_secs_f64() * 1000.0
256 );
257
258 Ok(Self {
259 source: view,
260 partitions,
261 row_to_partition,
262 spec,
263 })
264 }
265
266 fn create_single_partition(
268 view: &DataView,
269 order_by: &[OrderByItem],
270 ) -> Result<OrderedPartition> {
271 let mut rows: Vec<usize> = view.get_visible_rows();
272
273 if !order_by.is_empty() {
274 let sort_start = Instant::now();
275 Self::sort_rows(&mut rows, view.source(), order_by)?;
276 debug!(
277 "Single partition sort took {:.2}ms ({} rows)",
278 sort_start.elapsed().as_secs_f64() * 1000.0,
279 rows.len()
280 );
281 }
282
283 Ok(OrderedPartition::new(rows))
284 }
285
286 fn sort_rows(rows: &mut Vec<usize>, table: &DataTable, order_by: &[OrderByItem]) -> Result<()> {
288 let prep_start = Instant::now();
289
290 let sort_cols: Vec<(usize, bool)> = order_by
292 .iter()
293 .map(|col| {
294 let column_name = match &col.expr {
296 SqlExpression::Column(col_ref) => &col_ref.name,
297 _ => {
298 return Err(anyhow!("Window function ORDER BY only supports simple columns, not expressions"));
299 }
300 };
301 let idx = table
302 .get_column_index(column_name)
303 .ok_or_else(|| anyhow!("Invalid ORDER BY column: {}", column_name))?;
304 let ascending = matches!(col.direction, SortDirection::Asc);
305 Ok((idx, ascending))
306 })
307 .collect::<Result<Vec<_>>>()?;
308
309 debug!(
310 "Sort preparation took {:.2}μs ({} sort columns)",
311 prep_start.elapsed().as_micros(),
312 sort_cols.len()
313 );
314
315 let sort_start = Instant::now();
316
317 rows.sort_by(|&a, &b| {
319 for &(col_idx, ascending) in &sort_cols {
320 let val_a = table.get_value(a, col_idx);
321 let val_b = table.get_value(b, col_idx);
322
323 match (val_a, val_b) {
324 (None, None) => continue,
325 (None, Some(_)) => {
326 return if ascending {
327 std::cmp::Ordering::Less
328 } else {
329 std::cmp::Ordering::Greater
330 }
331 }
332 (Some(_), None) => {
333 return if ascending {
334 std::cmp::Ordering::Greater
335 } else {
336 std::cmp::Ordering::Less
337 }
338 }
339 (Some(v_a), Some(v_b)) => {
340 let ord = v_a.partial_cmp(&v_b).unwrap_or(std::cmp::Ordering::Equal);
342 if ord != std::cmp::Ordering::Equal {
343 return if ascending { ord } else { ord.reverse() };
344 }
345 }
346 }
347 }
348 std::cmp::Ordering::Equal
349 });
350
351 debug!(
352 "Actual sort operation took {:.2}μs ({} rows)",
353 sort_start.elapsed().as_micros(),
354 rows.len()
355 );
356
357 Ok(())
358 }
359
360 pub fn get_offset_value(
362 &self,
363 current_row: usize,
364 offset: i32,
365 column: &str,
366 ) -> Option<DataValue> {
367 let start = Instant::now();
370
371 let partition_lookup_start = Instant::now();
373 let partition_key = self.row_to_partition.get(¤t_row)?;
374 let partition = self.partitions.get(partition_key)?;
375 let partition_lookup_time = partition_lookup_start.elapsed();
376
377 let offset_nav_start = Instant::now();
379 let target_row = partition.get_row_at_offset(current_row, offset)?;
380 let offset_nav_time = offset_nav_start.elapsed();
381
382 let value_access_start = Instant::now();
384 let source_table = self.source.source();
385 let col_idx = source_table.get_column_index(column)?;
386 let value = source_table.get_value(target_row, col_idx).cloned();
387 let value_access_time = value_access_start.elapsed();
388
389 let total_time = start.elapsed();
391 if total_time.as_micros() > 10 {
392 debug!(
393 "get_offset_value slow: total={:.2}μs, partition_lookup={:.2}μs, offset_nav={:.2}μs, value_access={:.2}μs",
394 total_time.as_micros(),
395 partition_lookup_time.as_micros(),
396 offset_nav_time.as_micros(),
397 value_access_time.as_micros()
398 );
399 }
400
401 value
402 }
403
404 pub fn get_row_number(&self, row_index: usize) -> usize {
406 if let Some(partition_key) = self.row_to_partition.get(&row_index) {
407 if let Some(partition) = self.partitions.get(partition_key) {
408 if let Some(position) = partition.get_position(row_index) {
409 return position + 1; }
411 }
412 }
413 0 }
415
416 pub fn get_frame_first_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
418 let frame_rows = self.get_frame_rows(row_index);
419 if frame_rows.is_empty() {
420 return Some(DataValue::Null);
421 }
422
423 let source_table = self.source.source();
424 let col_idx = source_table.get_column_index(column)?;
425
426 let first_row = frame_rows[0];
428 source_table.get_value(first_row, col_idx).cloned()
429 }
430
431 pub fn get_frame_last_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
433 let frame_rows = self.get_frame_rows(row_index);
434 if frame_rows.is_empty() {
435 return Some(DataValue::Null);
436 }
437
438 let source_table = self.source.source();
439 let col_idx = source_table.get_column_index(column)?;
440
441 let last_row = frame_rows[frame_rows.len() - 1];
443 source_table.get_value(last_row, col_idx).cloned()
444 }
445
446 pub fn get_first_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
448 let partition_key = self.row_to_partition.get(&row_index)?;
449 let partition = self.partitions.get(partition_key)?;
450 let first_row = partition.first_row()?;
451
452 let source_table = self.source.source();
453 let col_idx = source_table.get_column_index(column)?;
454 source_table.get_value(first_row, col_idx).cloned()
455 }
456
457 pub fn get_last_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
459 let partition_key = self.row_to_partition.get(&row_index)?;
460 let partition = self.partitions.get(partition_key)?;
461 let last_row = partition.last_row()?;
462
463 let source_table = self.source.source();
464 let col_idx = source_table.get_column_index(column)?;
465 source_table.get_value(last_row, col_idx).cloned()
466 }
467
468 pub fn partition_count(&self) -> usize {
470 self.partitions.len()
471 }
472
473 pub fn has_partitions(&self) -> bool {
475 !self.spec.partition_by.is_empty()
476 }
477
478 pub fn has_frame(&self) -> bool {
480 self.spec.frame.is_some()
481 }
482
483 pub fn is_running_aggregate_frame(&self) -> bool {
485 if let Some(frame) = &self.spec.frame {
486 matches!(frame.start, FrameBound::UnboundedPreceding) && frame.end.is_none()
487 } else {
489 false
490 }
491 }
492
493 pub fn source(&self) -> &DataTable {
495 self.source.source()
496 }
497
498 pub fn get_frame_rows(&self, row_index: usize) -> Vec<usize> {
500 let partition_key = match self.row_to_partition.get(&row_index) {
502 Some(key) => key,
503 None => return vec![],
504 };
505
506 let partition = match self.partitions.get(partition_key) {
507 Some(p) => p,
508 None => return vec![],
509 };
510
511 let current_pos = match partition.get_position(row_index) {
513 Some(pos) => pos as i64,
514 None => return vec![],
515 };
516
517 let frame = match &self.spec.frame {
519 Some(f) => f,
520 None => return partition.rows.clone(),
521 };
522
523 let (start_pos, end_pos) = match frame.unit {
525 FrameUnit::Rows => {
526 let start =
528 self.calculate_frame_position(&frame.start, current_pos, partition.rows.len());
529 let end = match &frame.end {
530 Some(bound) => {
531 self.calculate_frame_position(bound, current_pos, partition.rows.len())
532 }
533 None => current_pos, };
535 (start, end)
536 }
537 FrameUnit::Range => {
538 let start =
541 self.calculate_frame_position(&frame.start, current_pos, partition.rows.len());
542 let end = match &frame.end {
543 Some(bound) => {
544 self.calculate_frame_position(bound, current_pos, partition.rows.len())
545 }
546 None => current_pos,
547 };
548 (start, end)
549 }
550 };
551
552 let mut frame_rows = Vec::new();
554 for i in start_pos..=end_pos {
555 if i >= 0 && (i as usize) < partition.rows.len() {
556 frame_rows.push(partition.rows[i as usize]);
557 }
558 }
559
560 frame_rows
561 }
562
563 fn calculate_frame_position(
565 &self,
566 bound: &FrameBound,
567 current_pos: i64,
568 partition_size: usize,
569 ) -> i64 {
570 match bound {
571 FrameBound::UnboundedPreceding => 0,
572 FrameBound::UnboundedFollowing => partition_size as i64 - 1,
573 FrameBound::CurrentRow => current_pos,
574 FrameBound::Preceding(n) => current_pos - n,
575 FrameBound::Following(n) => current_pos + n,
576 }
577 }
578
579 pub fn get_frame_sum(&self, row_index: usize, column: &str) -> Option<DataValue> {
581 let frame_rows = self.get_frame_rows(row_index);
582 if frame_rows.is_empty() {
583 return Some(DataValue::Null);
584 }
585
586 let source_table = self.source.source();
587 let col_idx = source_table.get_column_index(column)?;
588
589 let mut sum = 0.0;
590 let mut has_float = false;
591 let mut has_value = false;
592
593 for &row_idx in &frame_rows {
595 if let Some(value) = source_table.get_value(row_idx, col_idx) {
596 match value {
597 DataValue::Integer(i) => {
598 sum += *i as f64;
599 has_value = true;
600 }
601 DataValue::Float(f) => {
602 sum += f;
603 has_float = true;
604 has_value = true;
605 }
606 DataValue::Null => {
607 }
609 _ => {
610 return Some(DataValue::Null);
612 }
613 }
614 }
615 }
616
617 if !has_value {
618 return Some(DataValue::Null);
619 }
620
621 if !has_float && sum.fract() == 0.0 && sum >= i64::MIN as f64 && sum <= i64::MAX as f64 {
623 Some(DataValue::Integer(sum as i64))
624 } else {
625 Some(DataValue::Float(sum))
626 }
627 }
628
629 pub fn get_frame_count(&self, row_index: usize, column: Option<&str>) -> Option<DataValue> {
631 let frame_rows = self.get_frame_rows(row_index);
632 if frame_rows.is_empty() {
633 return Some(DataValue::Integer(0));
634 }
635
636 if let Some(col_name) = column {
637 let source_table = self.source.source();
639 let col_idx = source_table.get_column_index(col_name)?;
640
641 let count = frame_rows
642 .iter()
643 .filter_map(|&row_idx| source_table.get_value(row_idx, col_idx))
644 .filter(|v| !matches!(v, DataValue::Null))
645 .count();
646
647 Some(DataValue::Integer(count as i64))
648 } else {
649 Some(DataValue::Integer(frame_rows.len() as i64))
651 }
652 }
653
654 pub fn get_frame_avg(&self, row_index: usize, column: &str) -> Option<DataValue> {
656 let frame_rows = self.get_frame_rows(row_index);
657 if frame_rows.is_empty() {
658 return Some(DataValue::Null);
659 }
660
661 let source_table = self.source.source();
662 let col_idx = source_table.get_column_index(column)?;
663
664 let mut sum = 0.0;
665 let mut count = 0;
666
667 for &row_idx in &frame_rows {
669 if let Some(value) = source_table.get_value(row_idx, col_idx) {
670 match value {
671 DataValue::Integer(i) => {
672 sum += *i as f64;
673 count += 1;
674 }
675 DataValue::Float(f) => {
676 sum += f;
677 count += 1;
678 }
679 DataValue::Null => {
680 }
682 _ => {
683 return Some(DataValue::Null);
685 }
686 }
687 }
688 }
689
690 if count == 0 {
691 return Some(DataValue::Null);
692 }
693
694 Some(DataValue::Float(sum / count as f64))
695 }
696
697 pub fn get_frame_stddev(&self, row_index: usize, column: &str) -> Option<DataValue> {
699 let variance = self.get_frame_variance(row_index, column)?;
700 match variance {
701 DataValue::Float(v) => Some(DataValue::Float(v.sqrt())),
702 DataValue::Null => Some(DataValue::Null),
703 _ => Some(DataValue::Null),
704 }
705 }
706
707 pub fn get_frame_variance(&self, row_index: usize, column: &str) -> Option<DataValue> {
709 let frame_rows = self.get_frame_rows(row_index);
710 if frame_rows.is_empty() {
711 return Some(DataValue::Null);
712 }
713
714 let source_table = self.source.source();
715 let col_idx = source_table.get_column_index(column)?;
716
717 let mut values = Vec::new();
718
719 for &row_idx in &frame_rows {
721 if let Some(value) = source_table.get_value(row_idx, col_idx) {
722 match value {
723 DataValue::Integer(i) => values.push(*i as f64),
724 DataValue::Float(f) => values.push(*f),
725 DataValue::Null => {
726 }
728 _ => {
729 return Some(DataValue::Null);
731 }
732 }
733 }
734 }
735
736 if values.is_empty() {
737 return Some(DataValue::Null);
738 }
739
740 if values.len() == 1 {
741 return Some(DataValue::Float(0.0));
743 }
744
745 let mean = values.iter().sum::<f64>() / values.len() as f64;
747
748 let variance =
750 values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (values.len() - 1) as f64;
751
752 Some(DataValue::Float(variance))
753 }
754
755 pub fn get_partition_sum(&self, row_index: usize, column: &str) -> Option<DataValue> {
757 let partition_key = self.row_to_partition.get(&row_index)?;
758 let partition = self.partitions.get(partition_key)?;
759 let source_table = self.source.source();
760 let col_idx = source_table.get_column_index(column)?;
761
762 let mut sum = 0.0;
763 let mut has_float = false;
764 let mut has_value = false;
765
766 for &row_idx in &partition.rows {
768 if let Some(value) = source_table.get_value(row_idx, col_idx) {
769 match value {
770 DataValue::Integer(i) => {
771 sum += *i as f64;
772 has_value = true;
773 }
774 DataValue::Float(f) => {
775 sum += f;
776 has_float = true;
777 has_value = true;
778 }
779 DataValue::Null => {
780 }
782 _ => {
783 return Some(DataValue::Null);
785 }
786 }
787 }
788 }
789
790 if !has_value {
791 return Some(DataValue::Null);
792 }
793
794 if !has_float && sum.fract() == 0.0 && sum >= i64::MIN as f64 && sum <= i64::MAX as f64 {
796 Some(DataValue::Integer(sum as i64))
797 } else {
798 Some(DataValue::Float(sum))
799 }
800 }
801
802 pub fn get_partition_count(&self, row_index: usize, column: Option<&str>) -> Option<DataValue> {
804 let partition_key = self.row_to_partition.get(&row_index)?;
805 let partition = self.partitions.get(partition_key)?;
806
807 if let Some(col_name) = column {
808 let source_table = self.source.source();
810 let col_idx = source_table.get_column_index(col_name)?;
811
812 let count = partition
813 .rows
814 .iter()
815 .filter_map(|&row_idx| source_table.get_value(row_idx, col_idx))
816 .filter(|v| !matches!(v, DataValue::Null))
817 .count();
818
819 Some(DataValue::Integer(count as i64))
820 } else {
821 Some(DataValue::Integer(partition.rows.len() as i64))
823 }
824 }
825
826 pub fn get_partition_avg(&self, row_index: usize, column: &str) -> Option<DataValue> {
828 let partition_key = self.row_to_partition.get(&row_index)?;
829 let partition = self.partitions.get(partition_key)?;
830 let source_table = self.source.source();
831 let col_idx = source_table.get_column_index(column)?;
832
833 let mut sum = 0.0;
834 let mut count = 0;
835
836 for &row_idx in &partition.rows {
838 if let Some(value) = source_table.get_value(row_idx, col_idx) {
839 match value {
840 DataValue::Integer(i) => {
841 sum += *i as f64;
842 count += 1;
843 }
844 DataValue::Float(f) => {
845 sum += f;
846 count += 1;
847 }
848 DataValue::Null => {
849 }
851 _ => {
852 return Some(DataValue::Null);
854 }
855 }
856 }
857 }
858
859 if count == 0 {
860 Some(DataValue::Null)
861 } else {
862 Some(DataValue::Float(sum / count as f64))
863 }
864 }
865
866 pub fn get_partition_min(&self, row_index: usize, column: &str) -> Option<DataValue> {
868 let partition_key = self.row_to_partition.get(&row_index)?;
869 let partition = self.partitions.get(partition_key)?;
870 let source_table = self.source.source();
871 let col_idx = source_table.get_column_index(column)?;
872
873 let mut min_value: Option<DataValue> = None;
874
875 for &row_idx in &partition.rows {
876 if let Some(value) = source_table.get_value(row_idx, col_idx) {
877 if !matches!(value, DataValue::Null) {
878 match &min_value {
879 None => min_value = Some(value.clone()),
880 Some(current_min) => {
881 use crate::data::datavalue_compare::compare_datavalues;
882 if compare_datavalues(value, current_min).is_lt() {
883 min_value = Some(value.clone());
884 }
885 }
886 }
887 }
888 }
889 }
890
891 min_value.or(Some(DataValue::Null))
892 }
893
894 pub fn get_partition_max(&self, row_index: usize, column: &str) -> Option<DataValue> {
896 let partition_key = self.row_to_partition.get(&row_index)?;
897 let partition = self.partitions.get(partition_key)?;
898 let source_table = self.source.source();
899 let col_idx = source_table.get_column_index(column)?;
900
901 let mut max_value: Option<DataValue> = None;
902
903 for &row_idx in &partition.rows {
904 if let Some(value) = source_table.get_value(row_idx, col_idx) {
905 if !matches!(value, DataValue::Null) {
906 match &max_value {
907 None => max_value = Some(value.clone()),
908 Some(current_max) => {
909 use crate::data::datavalue_compare::compare_datavalues;
910 if compare_datavalues(value, current_max).is_gt() {
911 max_value = Some(value.clone());
912 }
913 }
914 }
915 }
916 }
917 }
918
919 max_value.or(Some(DataValue::Null))
920 }
921
922 pub fn get_partition_rows(&self, row_index: usize) -> Vec<usize> {
924 if let Some(partition_key) = self.row_to_partition.get(&row_index) {
925 if let Some(partition) = self.partitions.get(partition_key) {
926 return partition.rows.clone();
927 }
928 }
929 vec![]
930 }
931
932 pub fn get_frame_min(&self, row_index: usize, column: &str) -> Option<DataValue> {
934 let frame_rows = self.get_frame_rows(row_index);
935 let source_table = self.source.source();
936 let col_idx = source_table.get_column_index(column)?;
937
938 let mut min_value: Option<DataValue> = None;
939
940 for &frame_row_idx in &frame_rows {
941 if let Some(value) = source_table.get_value(frame_row_idx, col_idx) {
942 if !matches!(value, DataValue::Null) {
943 match &min_value {
944 None => min_value = Some(value.clone()),
945 Some(current_min) => {
946 use crate::data::datavalue_compare::compare_datavalues;
947 if compare_datavalues(value, current_min).is_lt() {
948 min_value = Some(value.clone());
949 }
950 }
951 }
952 }
953 }
954 }
955
956 min_value.or(Some(DataValue::Null))
957 }
958
959 pub fn get_frame_max(&self, row_index: usize, column: &str) -> Option<DataValue> {
961 let frame_rows = self.get_frame_rows(row_index);
962 let source_table = self.source.source();
963 let col_idx = source_table.get_column_index(column)?;
964
965 let mut max_value: Option<DataValue> = None;
966
967 for &frame_row_idx in &frame_rows {
968 if let Some(value) = source_table.get_value(frame_row_idx, col_idx) {
969 if !matches!(value, DataValue::Null) {
970 match &max_value {
971 None => max_value = Some(value.clone()),
972 Some(current_max) => {
973 use crate::data::datavalue_compare::compare_datavalues;
974 if compare_datavalues(value, current_max).is_gt() {
975 max_value = Some(value.clone());
976 }
977 }
978 }
979 }
980 }
981 }
982
983 max_value.or(Some(DataValue::Null))
984 }
985
986 pub fn evaluate_lag_batch(
989 &self,
990 visible_rows: &[usize],
991 column_name: &str,
992 offset: i64,
993 ) -> Result<Vec<DataValue>> {
994 let start = Instant::now();
995 let mut results = Vec::with_capacity(visible_rows.len());
996
997 let source_table = self.source.source();
999 let _ = source_table
1000 .get_column_index(column_name)
1001 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1002
1003 for &row_idx in visible_rows {
1004 let value =
1006 if let Some(val) = self.get_offset_value(row_idx, -(offset as i32), column_name) {
1007 val
1008 } else {
1009 DataValue::Null
1010 };
1011 results.push(value);
1012 }
1013
1014 debug!(
1015 "evaluate_lag_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1016 visible_rows.len(),
1017 start.elapsed().as_secs_f64() * 1000.0,
1018 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1019 );
1020
1021 Ok(results)
1022 }
1023
1024 pub fn evaluate_lead_batch(
1026 &self,
1027 visible_rows: &[usize],
1028 column_name: &str,
1029 offset: i64,
1030 ) -> Result<Vec<DataValue>> {
1031 let start = Instant::now();
1032 let mut results = Vec::with_capacity(visible_rows.len());
1033
1034 let source_table = self.source.source();
1036 let _ = source_table
1037 .get_column_index(column_name)
1038 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1039
1040 for &row_idx in visible_rows {
1041 let value =
1043 if let Some(val) = self.get_offset_value(row_idx, offset as i32, column_name) {
1044 val
1045 } else {
1046 DataValue::Null
1047 };
1048 results.push(value);
1049 }
1050
1051 debug!(
1052 "evaluate_lead_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1053 visible_rows.len(),
1054 start.elapsed().as_secs_f64() * 1000.0,
1055 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1056 );
1057
1058 Ok(results)
1059 }
1060
1061 pub fn evaluate_row_number_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1063 let start = Instant::now();
1064 let mut results = Vec::with_capacity(visible_rows.len());
1065
1066 for &row_idx in visible_rows {
1067 let row_num = self.get_row_number(row_idx);
1068 if row_num > 0 {
1069 results.push(DataValue::Integer(row_num as i64));
1070 } else {
1071 results.push(DataValue::Null);
1073 }
1074 }
1075
1076 debug!(
1077 "evaluate_row_number_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1078 visible_rows.len(),
1079 start.elapsed().as_secs_f64() * 1000.0,
1080 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1081 );
1082
1083 Ok(results)
1084 }
1085
1086 pub fn get_rank(&self, row_index: usize) -> i64 {
1089 if let Some(partition_key) = self.row_to_partition.get(&row_index) {
1090 if let Some(partition) = self.partitions.get(partition_key) {
1091 if let Some(position) = partition.get_position(row_index) {
1093 let mut rows_before = 0;
1094
1095 for i in 0..position {
1097 let prev_row = partition.rows[i];
1098 if self.compare_rows_for_rank(prev_row, row_index) < 0 {
1099 rows_before += 1;
1100 }
1101 }
1102
1103 let rank = rows_before + 1;
1104 return rank;
1105 }
1106 }
1107 }
1108 1 }
1110
1111 pub fn get_dense_rank(&self, row_index: usize) -> i64 {
1114 if let Some(partition_key) = self.row_to_partition.get(&row_index) {
1115 if let Some(partition) = self.partitions.get(partition_key) {
1116 if let Some(position) = partition.get_position(row_index) {
1117 let mut dense_rank = 1;
1118 let mut last_value_seen = None;
1119
1120 for i in 0..position {
1122 let prev_row = partition.rows[i];
1123 let cmp = self.compare_rows_for_rank(prev_row, row_index);
1124
1125 if cmp < 0 {
1126 if last_value_seen.is_none()
1128 || last_value_seen.map_or(true, |last| {
1129 self.compare_rows_for_rank(last, prev_row) != 0
1130 })
1131 {
1132 dense_rank += 1;
1133 last_value_seen = Some(prev_row);
1134 }
1135 }
1136 }
1137
1138 return dense_rank;
1139 }
1140 }
1141 }
1142 1 }
1144
1145 fn compare_rows_for_rank(&self, row1: usize, row2: usize) -> i32 {
1148 let source_table = self.source.source();
1149
1150 for order_item in &self.spec.order_by {
1151 if let SqlExpression::Column(col) = &order_item.expr {
1152 if let Some(col_idx) = source_table.get_column_index(&col.name) {
1153 let val1 = source_table.get_value(row1, col_idx);
1154 let val2 = source_table.get_value(row2, col_idx);
1155
1156 let cmp = match (val1, val2) {
1157 (Some(v1), Some(v2)) => {
1158 crate::data::datavalue_compare::compare_datavalues(v1, v2)
1159 }
1160 (None, Some(_)) => std::cmp::Ordering::Less,
1161 (Some(_), None) => std::cmp::Ordering::Greater,
1162 (None, None) => std::cmp::Ordering::Equal,
1163 };
1164
1165 if cmp != std::cmp::Ordering::Equal {
1166 let result = match order_item.direction {
1167 SortDirection::Asc => cmp,
1168 SortDirection::Desc => cmp.reverse(),
1169 };
1170
1171 return match result {
1172 std::cmp::Ordering::Less => -1,
1173 std::cmp::Ordering::Equal => 0,
1174 std::cmp::Ordering::Greater => 1,
1175 };
1176 }
1177 }
1178 }
1179 }
1180
1181 0 }
1183
1184 pub fn evaluate_rank_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1186 let start = Instant::now();
1187 let mut results = Vec::with_capacity(visible_rows.len());
1188
1189 for &row_idx in visible_rows {
1190 let rank = self.get_rank(row_idx);
1191 results.push(DataValue::Integer(rank));
1192 }
1193
1194 debug!(
1195 "evaluate_rank_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1196 visible_rows.len(),
1197 start.elapsed().as_secs_f64() * 1000.0,
1198 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1199 );
1200
1201 Ok(results)
1202 }
1203
1204 pub fn evaluate_dense_rank_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1206 let start = Instant::now();
1207 let mut results = Vec::with_capacity(visible_rows.len());
1208
1209 for &row_idx in visible_rows {
1210 let rank = self.get_dense_rank(row_idx);
1211 results.push(DataValue::Integer(rank));
1212 }
1213
1214 debug!(
1215 "evaluate_dense_rank_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1216 visible_rows.len(),
1217 start.elapsed().as_secs_f64() * 1000.0,
1218 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1219 );
1220
1221 Ok(results)
1222 }
1223
1224 pub fn evaluate_sum_batch(
1226 &self,
1227 visible_rows: &[usize],
1228 column_name: &str,
1229 ) -> Result<Vec<DataValue>> {
1230 let start = Instant::now();
1231 let mut results = Vec::with_capacity(visible_rows.len());
1232
1233 let is_running_aggregate = self.is_running_aggregate_frame();
1235
1236 if is_running_aggregate && !visible_rows.is_empty() {
1237 debug!(
1239 "Using optimized running sum for {} rows",
1240 visible_rows.len()
1241 );
1242
1243 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1245 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1246 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1247 partition_groups
1248 .entry(partition_key.clone())
1249 .or_insert_with(Vec::new)
1250 .push((idx, row_idx));
1251 }
1252 }
1253
1254 let source_table = self.source.source();
1255 let col_idx = source_table
1256 .get_column_index(column_name)
1257 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1258
1259 results.resize(visible_rows.len(), DataValue::Null);
1261
1262 for (_partition_key, rows) in partition_groups {
1264 if let Some(partition) = self.partitions.get(&_partition_key) {
1265 let mut running_sum = 0.0;
1266 let mut has_float = false;
1267 let mut position_to_sum: HashMap<usize, DataValue> = HashMap::new();
1268
1269 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1271 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1272 match value {
1273 DataValue::Integer(i) => {
1274 running_sum += *i as f64;
1275 }
1276 DataValue::Float(f) => {
1277 running_sum += f;
1278 has_float = true;
1279 }
1280 DataValue::Null => {
1281 }
1283 _ => {
1284 }
1286 }
1287 }
1288
1289 if !has_float
1291 && running_sum.fract() == 0.0
1292 && running_sum >= i64::MIN as f64
1293 && running_sum <= i64::MAX as f64
1294 {
1295 position_to_sum.insert(pos, DataValue::Integer(running_sum as i64));
1296 } else {
1297 position_to_sum.insert(pos, DataValue::Float(running_sum));
1298 }
1299 }
1300
1301 for (result_idx, row_idx) in rows {
1303 if let Some(pos) = partition.get_position(row_idx) {
1304 results[result_idx] = position_to_sum
1305 .get(&pos)
1306 .cloned()
1307 .unwrap_or(DataValue::Null);
1308 }
1309 }
1310 }
1311 }
1312 } else {
1313 for &row_idx in visible_rows {
1315 let value = if self.has_frame() {
1316 self.get_frame_sum(row_idx, column_name)
1317 .unwrap_or(DataValue::Null)
1318 } else {
1319 self.get_partition_sum(row_idx, column_name)
1320 .unwrap_or(DataValue::Null)
1321 };
1322 results.push(value);
1323 }
1324 }
1325
1326 debug!(
1327 "evaluate_sum_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1328 visible_rows.len(),
1329 start.elapsed().as_secs_f64() * 1000.0,
1330 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1331 );
1332
1333 Ok(results)
1334 }
1335
1336 pub fn evaluate_avg_batch(
1338 &self,
1339 visible_rows: &[usize],
1340 column_name: &str,
1341 ) -> Result<Vec<DataValue>> {
1342 let start = Instant::now();
1343 let mut results = Vec::with_capacity(visible_rows.len());
1344
1345 let is_running_aggregate = self.is_running_aggregate_frame();
1347
1348 if is_running_aggregate && !visible_rows.is_empty() {
1349 debug!(
1351 "Using optimized running average for {} rows",
1352 visible_rows.len()
1353 );
1354
1355 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1357 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1358 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1359 partition_groups
1360 .entry(partition_key.clone())
1361 .or_insert_with(Vec::new)
1362 .push((idx, row_idx));
1363 }
1364 }
1365
1366 let source_table = self.source.source();
1367 let col_idx = source_table
1368 .get_column_index(column_name)
1369 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1370
1371 results.resize(visible_rows.len(), DataValue::Null);
1373
1374 for (_partition_key, rows) in partition_groups {
1376 if let Some(partition) = self.partitions.get(&_partition_key) {
1377 let mut running_sum = 0.0;
1378 let mut count = 0;
1379 let mut position_to_avg: HashMap<usize, DataValue> = HashMap::new();
1380
1381 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1383 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1384 match value {
1385 DataValue::Integer(i) => {
1386 running_sum += *i as f64;
1387 count += 1;
1388 }
1389 DataValue::Float(f) => {
1390 running_sum += f;
1391 count += 1;
1392 }
1393 DataValue::Null => {
1394 }
1396 _ => {
1397 }
1399 }
1400 }
1401
1402 if count > 0 {
1404 position_to_avg
1405 .insert(pos, DataValue::Float(running_sum / count as f64));
1406 } else {
1407 position_to_avg.insert(pos, DataValue::Null);
1408 }
1409 }
1410
1411 for (result_idx, row_idx) in rows {
1413 if let Some(pos) = partition.get_position(row_idx) {
1414 results[result_idx] = position_to_avg
1415 .get(&pos)
1416 .cloned()
1417 .unwrap_or(DataValue::Null);
1418 }
1419 }
1420 }
1421 }
1422 } else {
1423 for &row_idx in visible_rows {
1425 let value = if self.has_frame() {
1426 self.get_frame_avg(row_idx, column_name)
1427 .unwrap_or(DataValue::Null)
1428 } else {
1429 self.get_partition_avg(row_idx, column_name)
1430 .unwrap_or(DataValue::Null)
1431 };
1432 results.push(value);
1433 }
1434 }
1435
1436 debug!(
1437 "evaluate_avg_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1438 visible_rows.len(),
1439 start.elapsed().as_secs_f64() * 1000.0,
1440 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1441 );
1442
1443 Ok(results)
1444 }
1445
1446 pub fn evaluate_min_batch(
1448 &self,
1449 visible_rows: &[usize],
1450 column_name: &str,
1451 ) -> Result<Vec<DataValue>> {
1452 let start = Instant::now();
1453 let mut results = Vec::with_capacity(visible_rows.len());
1454
1455 let is_running_aggregate = self.is_running_aggregate_frame();
1457
1458 if is_running_aggregate && !visible_rows.is_empty() {
1459 debug!(
1461 "Using optimized running minimum for {} rows",
1462 visible_rows.len()
1463 );
1464
1465 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1467 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1468 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1469 partition_groups
1470 .entry(partition_key.clone())
1471 .or_insert_with(Vec::new)
1472 .push((idx, row_idx));
1473 }
1474 }
1475
1476 let source_table = self.source.source();
1477 let col_idx = source_table
1478 .get_column_index(column_name)
1479 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1480
1481 results.resize(visible_rows.len(), DataValue::Null);
1483
1484 for (_partition_key, rows) in partition_groups {
1486 if let Some(partition) = self.partitions.get(&_partition_key) {
1487 let mut running_min: Option<DataValue> = None;
1488 let mut position_to_min: HashMap<usize, DataValue> = HashMap::new();
1489
1490 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1492 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1493 if !matches!(value, DataValue::Null) {
1494 match &running_min {
1495 None => running_min = Some(value.clone()),
1496 Some(current_min) => {
1497 use crate::data::datavalue_compare::compare_datavalues;
1498 if compare_datavalues(value, current_min).is_lt() {
1499 running_min = Some(value.clone());
1500 }
1501 }
1502 }
1503 }
1504 }
1505
1506 position_to_min.insert(pos, running_min.clone().unwrap_or(DataValue::Null));
1508 }
1509
1510 for (result_idx, row_idx) in rows {
1512 if let Some(pos) = partition.get_position(row_idx) {
1513 results[result_idx] = position_to_min
1514 .get(&pos)
1515 .cloned()
1516 .unwrap_or(DataValue::Null);
1517 }
1518 }
1519 }
1520 }
1521 } else {
1522 for &row_idx in visible_rows {
1524 let value = if self.has_frame() {
1525 self.get_frame_min(row_idx, column_name)
1526 .unwrap_or(DataValue::Null)
1527 } else {
1528 self.get_partition_min(row_idx, column_name)
1529 .unwrap_or(DataValue::Null)
1530 };
1531 results.push(value);
1532 }
1533 }
1534
1535 debug!(
1536 "evaluate_min_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1537 visible_rows.len(),
1538 start.elapsed().as_secs_f64() * 1000.0,
1539 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1540 );
1541
1542 Ok(results)
1543 }
1544
1545 pub fn evaluate_max_batch(
1547 &self,
1548 visible_rows: &[usize],
1549 column_name: &str,
1550 ) -> Result<Vec<DataValue>> {
1551 let start = Instant::now();
1552 let mut results = Vec::with_capacity(visible_rows.len());
1553
1554 let is_running_aggregate = self.is_running_aggregate_frame();
1556
1557 if is_running_aggregate && !visible_rows.is_empty() {
1558 debug!(
1560 "Using optimized running maximum for {} rows",
1561 visible_rows.len()
1562 );
1563
1564 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1566 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1567 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1568 partition_groups
1569 .entry(partition_key.clone())
1570 .or_insert_with(Vec::new)
1571 .push((idx, row_idx));
1572 }
1573 }
1574
1575 let source_table = self.source.source();
1576 let col_idx = source_table
1577 .get_column_index(column_name)
1578 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1579
1580 results.resize(visible_rows.len(), DataValue::Null);
1582
1583 for (_partition_key, rows) in partition_groups {
1585 if let Some(partition) = self.partitions.get(&_partition_key) {
1586 let mut running_max: Option<DataValue> = None;
1587 let mut position_to_max: HashMap<usize, DataValue> = HashMap::new();
1588
1589 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1591 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1592 if !matches!(value, DataValue::Null) {
1593 match &running_max {
1594 None => running_max = Some(value.clone()),
1595 Some(current_max) => {
1596 use crate::data::datavalue_compare::compare_datavalues;
1597 if compare_datavalues(value, current_max).is_gt() {
1598 running_max = Some(value.clone());
1599 }
1600 }
1601 }
1602 }
1603 }
1604
1605 position_to_max.insert(pos, running_max.clone().unwrap_or(DataValue::Null));
1607 }
1608
1609 for (result_idx, row_idx) in rows {
1611 if let Some(pos) = partition.get_position(row_idx) {
1612 results[result_idx] = position_to_max
1613 .get(&pos)
1614 .cloned()
1615 .unwrap_or(DataValue::Null);
1616 }
1617 }
1618 }
1619 }
1620 } else {
1621 for &row_idx in visible_rows {
1623 let value = if self.has_frame() {
1624 self.get_frame_max(row_idx, column_name)
1625 .unwrap_or(DataValue::Null)
1626 } else {
1627 self.get_partition_max(row_idx, column_name)
1628 .unwrap_or(DataValue::Null)
1629 };
1630 results.push(value);
1631 }
1632 }
1633
1634 debug!(
1635 "evaluate_max_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1636 visible_rows.len(),
1637 start.elapsed().as_secs_f64() * 1000.0,
1638 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1639 );
1640
1641 Ok(results)
1642 }
1643
1644 pub fn evaluate_count_batch(
1646 &self,
1647 visible_rows: &[usize],
1648 column_name: Option<&str>,
1649 ) -> Result<Vec<DataValue>> {
1650 let start = Instant::now();
1651 let mut results = Vec::with_capacity(visible_rows.len());
1652
1653 let is_running_aggregate = self.is_running_aggregate_frame();
1655
1656 if is_running_aggregate && !visible_rows.is_empty() {
1657 debug!(
1659 "Using optimized running count for {} rows",
1660 visible_rows.len()
1661 );
1662
1663 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1665 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1666 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1667 partition_groups
1668 .entry(partition_key.clone())
1669 .or_insert_with(Vec::new)
1670 .push((idx, row_idx));
1671 }
1672 }
1673
1674 let source_table = self.source.source();
1675 let col_idx = if let Some(col_name) = column_name {
1676 source_table.get_column_index(col_name)
1677 } else {
1678 None
1679 };
1680
1681 results.resize(visible_rows.len(), DataValue::Null);
1683
1684 for (_partition_key, rows) in partition_groups {
1686 if let Some(partition) = self.partitions.get(&_partition_key) {
1687 let mut running_count = 0i64;
1688 let mut position_to_count: HashMap<usize, DataValue> = HashMap::new();
1689
1690 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1692 if let Some(col_idx) = col_idx {
1693 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1695 if !matches!(value, DataValue::Null) {
1696 running_count += 1;
1697 }
1698 }
1699 } else {
1700 running_count += 1;
1702 }
1703
1704 position_to_count.insert(pos, DataValue::Integer(running_count));
1706 }
1707
1708 for (result_idx, row_idx) in rows {
1710 if let Some(pos) = partition.get_position(row_idx) {
1711 results[result_idx] = position_to_count
1712 .get(&pos)
1713 .cloned()
1714 .unwrap_or(DataValue::Null);
1715 }
1716 }
1717 }
1718 }
1719 } else {
1720 for &row_idx in visible_rows {
1722 let value = if self.has_frame() {
1723 self.get_frame_count(row_idx, column_name)
1724 .unwrap_or(DataValue::Null)
1725 } else {
1726 self.get_partition_count(row_idx, column_name)
1727 .unwrap_or(DataValue::Null)
1728 };
1729 results.push(value);
1730 }
1731 }
1732
1733 debug!(
1734 "evaluate_count_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1735 visible_rows.len(),
1736 start.elapsed().as_secs_f64() * 1000.0,
1737 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1738 );
1739
1740 Ok(results)
1741 }
1742
1743 pub fn evaluate_first_value_batch(
1745 &self,
1746 visible_rows: &[usize],
1747 column_name: &str,
1748 ) -> Result<Vec<DataValue>> {
1749 let start = Instant::now();
1750 let mut results = Vec::with_capacity(visible_rows.len());
1751
1752 for &row_idx in visible_rows {
1753 let value = if self.has_frame() {
1754 self.get_frame_first_value(row_idx, column_name)
1755 .unwrap_or(DataValue::Null)
1756 } else {
1757 self.get_first_value(row_idx, column_name)
1758 .unwrap_or(DataValue::Null)
1759 };
1760 results.push(value);
1761 }
1762
1763 debug!(
1764 "evaluate_first_value_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1765 visible_rows.len(),
1766 start.elapsed().as_secs_f64() * 1000.0,
1767 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1768 );
1769
1770 Ok(results)
1771 }
1772
1773 pub fn evaluate_last_value_batch(
1775 &self,
1776 visible_rows: &[usize],
1777 column_name: &str,
1778 ) -> Result<Vec<DataValue>> {
1779 let start = Instant::now();
1780 let mut results = Vec::with_capacity(visible_rows.len());
1781
1782 for &row_idx in visible_rows {
1783 let value = if self.has_frame() {
1784 self.get_frame_last_value(row_idx, column_name)
1785 .unwrap_or(DataValue::Null)
1786 } else {
1787 self.get_last_value(row_idx, column_name)
1788 .unwrap_or(DataValue::Null)
1789 };
1790 results.push(value);
1791 }
1792
1793 debug!(
1794 "evaluate_last_value_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1795 visible_rows.len(),
1796 start.elapsed().as_secs_f64() * 1000.0,
1797 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1798 );
1799
1800 Ok(results)
1801 }
1802}