1use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8use std::time::Instant;
9
10use anyhow::{anyhow, Result};
11use tracing::{debug, info};
12
13use crate::data::data_view::DataView;
14use crate::data::datatable::{DataTable, DataValue};
15use crate::sql::parser::ast::{
16 FrameBound, FrameUnit, OrderByItem, SortDirection, SqlExpression, WindowSpec,
17};
18
19#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
22struct PartitionKey(String);
23
24impl PartitionKey {
25 fn from_values(values: Vec<DataValue>) -> Self {
27 let key_parts: Vec<String> = values
29 .iter()
30 .map(|v| match v {
31 DataValue::String(s) => format!("S:{}", s),
32 DataValue::InternedString(s) => format!("S:{}", s),
33 DataValue::Integer(i) => format!("I:{}", i),
34 DataValue::Float(f) => format!("F:{}", f),
35 DataValue::Boolean(b) => format!("B:{}", b),
36 DataValue::DateTime(dt) => format!("D:{}", dt),
37 DataValue::Vector(v) => {
38 let components: Vec<String> = v.iter().map(|f| f.to_string()).collect();
39 format!("V:[{}]", components.join(","))
40 }
41 DataValue::Null => "N".to_string(),
42 })
43 .collect();
44 let key = key_parts.join("|");
45 PartitionKey(key)
46 }
47}
48
/// The rows of one partition in window order, with a reverse index from row
/// index to position.
#[derive(Debug, Clone)]
pub struct OrderedPartition {
    /// Row indices in the order they were handed to [`OrderedPartition::new`].
    rows: Vec<usize>,

    /// Maps a row index to its zero-based position inside `rows`.
    row_positions: HashMap<usize, usize>,
}

impl OrderedPartition {
    /// Wraps an already ordered list of row indices and builds the
    /// row-index -> position lookup table.
    fn new(rows: Vec<usize>) -> Self {
        let row_positions: HashMap<usize, usize> = rows
            .iter()
            .enumerate()
            .map(|(pos, &row_idx)| (row_idx, pos))
            .collect();

        Self {
            rows,
            row_positions,
        }
    }

    /// Returns the row that sits `offset` positions away from `current_row`
    /// (negative = earlier, positive = later).
    ///
    /// Returns `None` when `current_row` is not part of this partition or the
    /// target position falls outside of it. The arithmetic is done on `usize`
    /// with overflow checks, so extreme offsets such as `i32::MAX` simply
    /// yield `None` instead of overflowing.
    pub fn get_row_at_offset(&self, current_row: usize, offset: i32) -> Option<usize> {
        let current_pos = *self.row_positions.get(&current_row)?;
        let distance = offset.unsigned_abs() as usize;

        let target_pos = if offset >= 0 {
            current_pos.checked_add(distance)?
        } else {
            current_pos.checked_sub(distance)?
        };

        self.rows.get(target_pos).copied()
    }

    /// Returns the zero-based position of `row_index` within the partition,
    /// or `None` if the row does not belong to it.
    pub fn get_position(&self, row_index: usize) -> Option<usize> {
        self.row_positions.get(&row_index).copied()
    }

    /// Returns the first row of the partition, or `None` if it is empty.
    pub fn first_row(&self) -> Option<usize> {
        self.rows.first().copied()
    }

    /// Returns the last row of the partition, or `None` if it is empty.
    pub fn last_row(&self) -> Option<usize> {
        self.rows.last().copied()
    }
}
102
103pub struct WindowContext {
105 source: Arc<DataView>,
107
108 partitions: BTreeMap<PartitionKey, OrderedPartition>,
110
111 row_to_partition: HashMap<usize, PartitionKey>,
113
114 spec: WindowSpec,
116}
117
118impl WindowContext {
119 pub fn new(
121 view: Arc<DataView>,
122 partition_by: Vec<String>,
123 order_by: Vec<OrderByItem>,
124 ) -> Result<Self> {
125 Self::new_with_spec(
126 view,
127 WindowSpec {
128 partition_by,
129 order_by,
130 frame: None,
131 },
132 )
133 }
134
135 pub fn new_with_spec(view: Arc<DataView>, spec: WindowSpec) -> Result<Self> {
137 let overall_start = Instant::now();
138 let partition_by = spec.partition_by.clone();
139 let order_by = spec.order_by.clone();
140 let row_count = view.row_count();
141
142 if partition_by.is_empty() {
144 info!(
145 "Creating single partition (no PARTITION BY) for {} rows",
146 row_count
147 );
148 let partition_start = Instant::now();
149
150 let single_partition = Self::create_single_partition(&view, &order_by)?;
151 let partition_key = PartitionKey::from_values(vec![]);
152
153 let mut row_to_partition = HashMap::new();
155 for &row_idx in &single_partition.rows {
156 row_to_partition.insert(row_idx, partition_key.clone());
157 }
158
159 let mut partitions = BTreeMap::new();
160 partitions.insert(partition_key, single_partition);
161
162 info!(
163 "Single partition created in {:.2}ms (1 partition, {} rows)",
164 partition_start.elapsed().as_secs_f64() * 1000.0,
165 row_count
166 );
167
168 info!(
169 "WindowContext::new_with_spec (single partition) took {:.2}ms total",
170 overall_start.elapsed().as_secs_f64() * 1000.0
171 );
172
173 return Ok(Self {
174 source: view,
175 partitions,
176 row_to_partition,
177 spec,
178 });
179 }
180
181 info!(
183 "Creating partitions with PARTITION BY for {} rows",
184 row_count
185 );
186 let partition_start = Instant::now();
187
188 let mut partition_map: BTreeMap<PartitionKey, Vec<usize>> = BTreeMap::new();
189 let mut row_to_partition = HashMap::new();
190
191 let source_table = view.source();
193 let partition_col_indices: Vec<usize> = partition_by
194 .iter()
195 .map(|col| {
196 source_table
198 .get_column_index(col)
199 .or_else(|| source_table.find_column_by_qualified_name(col))
200 .or_else(|| {
201 if let Some(dot_pos) = col.find('.') {
203 let table = &col[..dot_pos];
204 let column = &col[dot_pos + 1..];
205 source_table.find_column_flexible(column, Some(table))
206 } else {
207 None
208 }
209 })
210 .ok_or_else(|| anyhow!("Invalid partition column: {}", col))
211 })
212 .collect::<Result<Vec<_>>>()?;
213
214 let grouping_start = Instant::now();
216 for row_idx in view.get_visible_rows() {
217 let mut key_values = Vec::new();
219 for &col_idx in &partition_col_indices {
220 let value = source_table
221 .get_value(row_idx, col_idx)
222 .ok_or_else(|| anyhow!("Failed to get value for partition"))?
223 .clone();
224 key_values.push(value);
225 }
226 let key = PartitionKey::from_values(key_values);
227
228 partition_map.entry(key.clone()).or_default().push(row_idx);
230 row_to_partition.insert(row_idx, key);
231 }
232
233 info!(
234 "Partition grouping took {:.2}ms ({} partitions created)",
235 grouping_start.elapsed().as_secs_f64() * 1000.0,
236 partition_map.len()
237 );
238
239 let sort_start = Instant::now();
241 let mut partitions = BTreeMap::new();
242 let partition_count = partition_map.len();
243
244 for (key, mut rows) in partition_map {
245 if !order_by.is_empty() {
247 Self::sort_rows(&mut rows, source_table, &order_by)?;
248 }
249
250 partitions.insert(key, OrderedPartition::new(rows));
251 }
252
253 info!(
254 "Partition sorting took {:.2}ms ({} partitions, ORDER BY: {})",
255 sort_start.elapsed().as_secs_f64() * 1000.0,
256 partition_count,
257 !order_by.is_empty()
258 );
259
260 info!(
261 "Total partition creation took {:.2}ms",
262 partition_start.elapsed().as_secs_f64() * 1000.0
263 );
264
265 info!(
266 "WindowContext::new_with_spec (multi-partition) took {:.2}ms total",
267 overall_start.elapsed().as_secs_f64() * 1000.0
268 );
269
270 Ok(Self {
271 source: view,
272 partitions,
273 row_to_partition,
274 spec,
275 })
276 }
277
278 fn create_single_partition(
280 view: &DataView,
281 order_by: &[OrderByItem],
282 ) -> Result<OrderedPartition> {
283 let mut rows: Vec<usize> = view.get_visible_rows();
284
285 if !order_by.is_empty() {
286 let sort_start = Instant::now();
287 Self::sort_rows(&mut rows, view.source(), order_by)?;
288 debug!(
289 "Single partition sort took {:.2}ms ({} rows)",
290 sort_start.elapsed().as_secs_f64() * 1000.0,
291 rows.len()
292 );
293 }
294
295 Ok(OrderedPartition::new(rows))
296 }
297
298 fn sort_rows(rows: &mut Vec<usize>, table: &DataTable, order_by: &[OrderByItem]) -> Result<()> {
300 let prep_start = Instant::now();
301
302 let sort_cols: Vec<(usize, bool)> = order_by
304 .iter()
305 .map(|col| {
306 let column_name = match &col.expr {
308 SqlExpression::Column(col_ref) => &col_ref.name,
309 _ => {
310 return Err(anyhow!("Window function ORDER BY only supports simple columns, not expressions"));
311 }
312 };
313 let idx = table
314 .get_column_index(column_name)
315 .ok_or_else(|| anyhow!("Invalid ORDER BY column: {}", column_name))?;
316 let ascending = matches!(col.direction, SortDirection::Asc);
317 Ok((idx, ascending))
318 })
319 .collect::<Result<Vec<_>>>()?;
320
321 debug!(
322 "Sort preparation took {:.2}μs ({} sort columns)",
323 prep_start.elapsed().as_micros(),
324 sort_cols.len()
325 );
326
327 let sort_start = Instant::now();
328
329 rows.sort_by(|&a, &b| {
331 for &(col_idx, ascending) in &sort_cols {
332 let val_a = table.get_value(a, col_idx);
333 let val_b = table.get_value(b, col_idx);
334
335 match (val_a, val_b) {
336 (None, None) => continue,
337 (None, Some(_)) => {
338 return if ascending {
339 std::cmp::Ordering::Less
340 } else {
341 std::cmp::Ordering::Greater
342 }
343 }
344 (Some(_), None) => {
345 return if ascending {
346 std::cmp::Ordering::Greater
347 } else {
348 std::cmp::Ordering::Less
349 }
350 }
351 (Some(v_a), Some(v_b)) => {
352 let ord = v_a.partial_cmp(&v_b).unwrap_or(std::cmp::Ordering::Equal);
354 if ord != std::cmp::Ordering::Equal {
355 return if ascending { ord } else { ord.reverse() };
356 }
357 }
358 }
359 }
360 std::cmp::Ordering::Equal
361 });
362
363 debug!(
364 "Actual sort operation took {:.2}μs ({} rows)",
365 sort_start.elapsed().as_micros(),
366 rows.len()
367 );
368
369 Ok(())
370 }
371
372 pub fn get_offset_value(
374 &self,
375 current_row: usize,
376 offset: i32,
377 column: &str,
378 ) -> Option<DataValue> {
379 let start = Instant::now();
382
383 let partition_lookup_start = Instant::now();
385 let partition_key = self.row_to_partition.get(¤t_row)?;
386 let partition = self.partitions.get(partition_key)?;
387 let partition_lookup_time = partition_lookup_start.elapsed();
388
389 let offset_nav_start = Instant::now();
391 let target_row = partition.get_row_at_offset(current_row, offset)?;
392 let offset_nav_time = offset_nav_start.elapsed();
393
394 let value_access_start = Instant::now();
396 let source_table = self.source.source();
397 let col_idx = source_table.get_column_index(column)?;
398 let value = source_table.get_value(target_row, col_idx).cloned();
399 let value_access_time = value_access_start.elapsed();
400
401 let total_time = start.elapsed();
403 if total_time.as_micros() > 10 {
404 debug!(
405 "get_offset_value slow: total={:.2}μs, partition_lookup={:.2}μs, offset_nav={:.2}μs, value_access={:.2}μs",
406 total_time.as_micros(),
407 partition_lookup_time.as_micros(),
408 offset_nav_time.as_micros(),
409 value_access_time.as_micros()
410 );
411 }
412
413 value
414 }
415
416 pub fn get_row_number(&self, row_index: usize) -> usize {
418 if let Some(partition_key) = self.row_to_partition.get(&row_index) {
419 if let Some(partition) = self.partitions.get(partition_key) {
420 if let Some(position) = partition.get_position(row_index) {
421 return position + 1; }
423 }
424 }
425 0 }
427
428 pub fn get_frame_first_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
430 let frame_rows = self.get_frame_rows(row_index);
431 if frame_rows.is_empty() {
432 return Some(DataValue::Null);
433 }
434
435 let source_table = self.source.source();
436 let col_idx = source_table.get_column_index(column)?;
437
438 let first_row = frame_rows[0];
440 source_table.get_value(first_row, col_idx).cloned()
441 }
442
443 pub fn get_frame_last_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
445 let frame_rows = self.get_frame_rows(row_index);
446 if frame_rows.is_empty() {
447 return Some(DataValue::Null);
448 }
449
450 let source_table = self.source.source();
451 let col_idx = source_table.get_column_index(column)?;
452
453 let last_row = frame_rows[frame_rows.len() - 1];
455 source_table.get_value(last_row, col_idx).cloned()
456 }
457
458 pub fn get_first_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
460 let partition_key = self.row_to_partition.get(&row_index)?;
461 let partition = self.partitions.get(partition_key)?;
462 let first_row = partition.first_row()?;
463
464 let source_table = self.source.source();
465 let col_idx = source_table.get_column_index(column)?;
466 source_table.get_value(first_row, col_idx).cloned()
467 }
468
469 pub fn get_last_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
471 let partition_key = self.row_to_partition.get(&row_index)?;
472 let partition = self.partitions.get(partition_key)?;
473 let last_row = partition.last_row()?;
474
475 let source_table = self.source.source();
476 let col_idx = source_table.get_column_index(column)?;
477 source_table.get_value(last_row, col_idx).cloned()
478 }
479
480 pub fn partition_count(&self) -> usize {
482 self.partitions.len()
483 }
484
485 pub fn has_partitions(&self) -> bool {
487 !self.spec.partition_by.is_empty()
488 }
489
490 pub fn has_frame(&self) -> bool {
492 self.spec.frame.is_some()
493 }
494
495 pub fn is_running_aggregate_frame(&self) -> bool {
497 if let Some(frame) = &self.spec.frame {
498 matches!(frame.start, FrameBound::UnboundedPreceding) && frame.end.is_none()
499 } else {
501 false
502 }
503 }
504
505 pub fn source(&self) -> &DataTable {
507 self.source.source()
508 }
509
510 pub fn get_frame_rows(&self, row_index: usize) -> Vec<usize> {
512 let partition_key = match self.row_to_partition.get(&row_index) {
514 Some(key) => key,
515 None => return vec![],
516 };
517
518 let partition = match self.partitions.get(partition_key) {
519 Some(p) => p,
520 None => return vec![],
521 };
522
523 let current_pos = match partition.get_position(row_index) {
525 Some(pos) => pos as i64,
526 None => return vec![],
527 };
528
529 let frame = match &self.spec.frame {
531 Some(f) => f,
532 None => return partition.rows.clone(),
533 };
534
535 let (start_pos, end_pos) = match frame.unit {
537 FrameUnit::Rows => {
538 let start =
540 self.calculate_frame_position(&frame.start, current_pos, partition.rows.len());
541 let end = match &frame.end {
542 Some(bound) => {
543 self.calculate_frame_position(bound, current_pos, partition.rows.len())
544 }
545 None => current_pos, };
547 (start, end)
548 }
549 FrameUnit::Range => {
550 let start =
553 self.calculate_frame_position(&frame.start, current_pos, partition.rows.len());
554 let end = match &frame.end {
555 Some(bound) => {
556 self.calculate_frame_position(bound, current_pos, partition.rows.len())
557 }
558 None => current_pos,
559 };
560 (start, end)
561 }
562 };
563
564 let mut frame_rows = Vec::new();
566 for i in start_pos..=end_pos {
567 if i >= 0 && (i as usize) < partition.rows.len() {
568 frame_rows.push(partition.rows[i as usize]);
569 }
570 }
571
572 frame_rows
573 }
574
575 fn calculate_frame_position(
577 &self,
578 bound: &FrameBound,
579 current_pos: i64,
580 partition_size: usize,
581 ) -> i64 {
582 match bound {
583 FrameBound::UnboundedPreceding => 0,
584 FrameBound::UnboundedFollowing => partition_size as i64 - 1,
585 FrameBound::CurrentRow => current_pos,
586 FrameBound::Preceding(n) => current_pos - n,
587 FrameBound::Following(n) => current_pos + n,
588 }
589 }
590
591 pub fn get_frame_sum(&self, row_index: usize, column: &str) -> Option<DataValue> {
593 let frame_rows = self.get_frame_rows(row_index);
594 if frame_rows.is_empty() {
595 return Some(DataValue::Null);
596 }
597
598 let source_table = self.source.source();
599 let col_idx = source_table.get_column_index(column)?;
600
601 let mut sum = 0.0;
602 let mut has_float = false;
603 let mut has_value = false;
604
605 for &row_idx in &frame_rows {
607 if let Some(value) = source_table.get_value(row_idx, col_idx) {
608 match value {
609 DataValue::Integer(i) => {
610 sum += *i as f64;
611 has_value = true;
612 }
613 DataValue::Float(f) => {
614 sum += f;
615 has_float = true;
616 has_value = true;
617 }
618 DataValue::Null => {
619 }
621 _ => {
622 return Some(DataValue::Null);
624 }
625 }
626 }
627 }
628
629 if !has_value {
630 return Some(DataValue::Null);
631 }
632
633 if !has_float && sum.fract() == 0.0 && sum >= i64::MIN as f64 && sum <= i64::MAX as f64 {
635 Some(DataValue::Integer(sum as i64))
636 } else {
637 Some(DataValue::Float(sum))
638 }
639 }
640
641 pub fn get_frame_count(&self, row_index: usize, column: Option<&str>) -> Option<DataValue> {
643 let frame_rows = self.get_frame_rows(row_index);
644 if frame_rows.is_empty() {
645 return Some(DataValue::Integer(0));
646 }
647
648 if let Some(col_name) = column {
649 let source_table = self.source.source();
651 let col_idx = source_table.get_column_index(col_name)?;
652
653 let count = frame_rows
654 .iter()
655 .filter_map(|&row_idx| source_table.get_value(row_idx, col_idx))
656 .filter(|v| !matches!(v, DataValue::Null))
657 .count();
658
659 Some(DataValue::Integer(count as i64))
660 } else {
661 Some(DataValue::Integer(frame_rows.len() as i64))
663 }
664 }
665
666 pub fn get_frame_avg(&self, row_index: usize, column: &str) -> Option<DataValue> {
668 let frame_rows = self.get_frame_rows(row_index);
669 if frame_rows.is_empty() {
670 return Some(DataValue::Null);
671 }
672
673 let source_table = self.source.source();
674 let col_idx = source_table.get_column_index(column)?;
675
676 let mut sum = 0.0;
677 let mut count = 0;
678
679 for &row_idx in &frame_rows {
681 if let Some(value) = source_table.get_value(row_idx, col_idx) {
682 match value {
683 DataValue::Integer(i) => {
684 sum += *i as f64;
685 count += 1;
686 }
687 DataValue::Float(f) => {
688 sum += f;
689 count += 1;
690 }
691 DataValue::Null => {
692 }
694 _ => {
695 return Some(DataValue::Null);
697 }
698 }
699 }
700 }
701
702 if count == 0 {
703 return Some(DataValue::Null);
704 }
705
706 Some(DataValue::Float(sum / count as f64))
707 }
708
709 pub fn get_frame_stddev(&self, row_index: usize, column: &str) -> Option<DataValue> {
711 let variance = self.get_frame_variance(row_index, column)?;
712 match variance {
713 DataValue::Float(v) => Some(DataValue::Float(v.sqrt())),
714 DataValue::Null => Some(DataValue::Null),
715 _ => Some(DataValue::Null),
716 }
717 }
718
719 pub fn get_frame_variance(&self, row_index: usize, column: &str) -> Option<DataValue> {
721 let frame_rows = self.get_frame_rows(row_index);
722 if frame_rows.is_empty() {
723 return Some(DataValue::Null);
724 }
725
726 let source_table = self.source.source();
727 let col_idx = source_table.get_column_index(column)?;
728
729 let mut values = Vec::new();
730
731 for &row_idx in &frame_rows {
733 if let Some(value) = source_table.get_value(row_idx, col_idx) {
734 match value {
735 DataValue::Integer(i) => values.push(*i as f64),
736 DataValue::Float(f) => values.push(*f),
737 DataValue::Null => {
738 }
740 _ => {
741 return Some(DataValue::Null);
743 }
744 }
745 }
746 }
747
748 if values.is_empty() {
749 return Some(DataValue::Null);
750 }
751
752 if values.len() == 1 {
753 return Some(DataValue::Float(0.0));
755 }
756
757 let mean = values.iter().sum::<f64>() / values.len() as f64;
759
760 let variance =
762 values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (values.len() - 1) as f64;
763
764 Some(DataValue::Float(variance))
765 }
766
767 pub fn get_partition_sum(&self, row_index: usize, column: &str) -> Option<DataValue> {
769 let partition_key = self.row_to_partition.get(&row_index)?;
770 let partition = self.partitions.get(partition_key)?;
771 let source_table = self.source.source();
772 let col_idx = source_table.get_column_index(column)?;
773
774 let mut sum = 0.0;
775 let mut has_float = false;
776 let mut has_value = false;
777
778 for &row_idx in &partition.rows {
780 if let Some(value) = source_table.get_value(row_idx, col_idx) {
781 match value {
782 DataValue::Integer(i) => {
783 sum += *i as f64;
784 has_value = true;
785 }
786 DataValue::Float(f) => {
787 sum += f;
788 has_float = true;
789 has_value = true;
790 }
791 DataValue::Null => {
792 }
794 _ => {
795 return Some(DataValue::Null);
797 }
798 }
799 }
800 }
801
802 if !has_value {
803 return Some(DataValue::Null);
804 }
805
806 if !has_float && sum.fract() == 0.0 && sum >= i64::MIN as f64 && sum <= i64::MAX as f64 {
808 Some(DataValue::Integer(sum as i64))
809 } else {
810 Some(DataValue::Float(sum))
811 }
812 }
813
814 pub fn get_partition_count(&self, row_index: usize, column: Option<&str>) -> Option<DataValue> {
816 let partition_key = self.row_to_partition.get(&row_index)?;
817 let partition = self.partitions.get(partition_key)?;
818
819 if let Some(col_name) = column {
820 let source_table = self.source.source();
822 let col_idx = source_table.get_column_index(col_name)?;
823
824 let count = partition
825 .rows
826 .iter()
827 .filter_map(|&row_idx| source_table.get_value(row_idx, col_idx))
828 .filter(|v| !matches!(v, DataValue::Null))
829 .count();
830
831 Some(DataValue::Integer(count as i64))
832 } else {
833 Some(DataValue::Integer(partition.rows.len() as i64))
835 }
836 }
837
838 pub fn get_partition_avg(&self, row_index: usize, column: &str) -> Option<DataValue> {
840 let partition_key = self.row_to_partition.get(&row_index)?;
841 let partition = self.partitions.get(partition_key)?;
842 let source_table = self.source.source();
843 let col_idx = source_table.get_column_index(column)?;
844
845 let mut sum = 0.0;
846 let mut count = 0;
847
848 for &row_idx in &partition.rows {
850 if let Some(value) = source_table.get_value(row_idx, col_idx) {
851 match value {
852 DataValue::Integer(i) => {
853 sum += *i as f64;
854 count += 1;
855 }
856 DataValue::Float(f) => {
857 sum += f;
858 count += 1;
859 }
860 DataValue::Null => {
861 }
863 _ => {
864 return Some(DataValue::Null);
866 }
867 }
868 }
869 }
870
871 if count == 0 {
872 Some(DataValue::Null)
873 } else {
874 Some(DataValue::Float(sum / count as f64))
875 }
876 }
877
878 pub fn get_partition_min(&self, row_index: usize, column: &str) -> Option<DataValue> {
880 let partition_key = self.row_to_partition.get(&row_index)?;
881 let partition = self.partitions.get(partition_key)?;
882 let source_table = self.source.source();
883 let col_idx = source_table.get_column_index(column)?;
884
885 let mut min_value: Option<DataValue> = None;
886
887 for &row_idx in &partition.rows {
888 if let Some(value) = source_table.get_value(row_idx, col_idx) {
889 if !matches!(value, DataValue::Null) {
890 match &min_value {
891 None => min_value = Some(value.clone()),
892 Some(current_min) => {
893 use crate::data::datavalue_compare::compare_datavalues;
894 if compare_datavalues(value, current_min).is_lt() {
895 min_value = Some(value.clone());
896 }
897 }
898 }
899 }
900 }
901 }
902
903 min_value.or(Some(DataValue::Null))
904 }
905
906 pub fn get_partition_max(&self, row_index: usize, column: &str) -> Option<DataValue> {
908 let partition_key = self.row_to_partition.get(&row_index)?;
909 let partition = self.partitions.get(partition_key)?;
910 let source_table = self.source.source();
911 let col_idx = source_table.get_column_index(column)?;
912
913 let mut max_value: Option<DataValue> = None;
914
915 for &row_idx in &partition.rows {
916 if let Some(value) = source_table.get_value(row_idx, col_idx) {
917 if !matches!(value, DataValue::Null) {
918 match &max_value {
919 None => max_value = Some(value.clone()),
920 Some(current_max) => {
921 use crate::data::datavalue_compare::compare_datavalues;
922 if compare_datavalues(value, current_max).is_gt() {
923 max_value = Some(value.clone());
924 }
925 }
926 }
927 }
928 }
929 }
930
931 max_value.or(Some(DataValue::Null))
932 }
933
934 pub fn get_partition_rows(&self, row_index: usize) -> Vec<usize> {
936 if let Some(partition_key) = self.row_to_partition.get(&row_index) {
937 if let Some(partition) = self.partitions.get(partition_key) {
938 return partition.rows.clone();
939 }
940 }
941 vec![]
942 }
943
944 pub fn get_frame_min(&self, row_index: usize, column: &str) -> Option<DataValue> {
946 let frame_rows = self.get_frame_rows(row_index);
947 let source_table = self.source.source();
948 let col_idx = source_table.get_column_index(column)?;
949
950 let mut min_value: Option<DataValue> = None;
951
952 for &frame_row_idx in &frame_rows {
953 if let Some(value) = source_table.get_value(frame_row_idx, col_idx) {
954 if !matches!(value, DataValue::Null) {
955 match &min_value {
956 None => min_value = Some(value.clone()),
957 Some(current_min) => {
958 use crate::data::datavalue_compare::compare_datavalues;
959 if compare_datavalues(value, current_min).is_lt() {
960 min_value = Some(value.clone());
961 }
962 }
963 }
964 }
965 }
966 }
967
968 min_value.or(Some(DataValue::Null))
969 }
970
971 pub fn get_frame_max(&self, row_index: usize, column: &str) -> Option<DataValue> {
973 let frame_rows = self.get_frame_rows(row_index);
974 let source_table = self.source.source();
975 let col_idx = source_table.get_column_index(column)?;
976
977 let mut max_value: Option<DataValue> = None;
978
979 for &frame_row_idx in &frame_rows {
980 if let Some(value) = source_table.get_value(frame_row_idx, col_idx) {
981 if !matches!(value, DataValue::Null) {
982 match &max_value {
983 None => max_value = Some(value.clone()),
984 Some(current_max) => {
985 use crate::data::datavalue_compare::compare_datavalues;
986 if compare_datavalues(value, current_max).is_gt() {
987 max_value = Some(value.clone());
988 }
989 }
990 }
991 }
992 }
993 }
994
995 max_value.or(Some(DataValue::Null))
996 }
997
998 pub fn evaluate_lag_batch(
1001 &self,
1002 visible_rows: &[usize],
1003 column_name: &str,
1004 offset: i64,
1005 ) -> Result<Vec<DataValue>> {
1006 let start = Instant::now();
1007 let mut results = Vec::with_capacity(visible_rows.len());
1008
1009 let source_table = self.source.source();
1011 let _ = source_table
1012 .get_column_index(column_name)
1013 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1014
1015 for &row_idx in visible_rows {
1016 let value =
1018 if let Some(val) = self.get_offset_value(row_idx, -(offset as i32), column_name) {
1019 val
1020 } else {
1021 DataValue::Null
1022 };
1023 results.push(value);
1024 }
1025
1026 debug!(
1027 "evaluate_lag_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1028 visible_rows.len(),
1029 start.elapsed().as_secs_f64() * 1000.0,
1030 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1031 );
1032
1033 Ok(results)
1034 }
1035
1036 pub fn evaluate_lead_batch(
1038 &self,
1039 visible_rows: &[usize],
1040 column_name: &str,
1041 offset: i64,
1042 ) -> Result<Vec<DataValue>> {
1043 let start = Instant::now();
1044 let mut results = Vec::with_capacity(visible_rows.len());
1045
1046 let source_table = self.source.source();
1048 let _ = source_table
1049 .get_column_index(column_name)
1050 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1051
1052 for &row_idx in visible_rows {
1053 let value =
1055 if let Some(val) = self.get_offset_value(row_idx, offset as i32, column_name) {
1056 val
1057 } else {
1058 DataValue::Null
1059 };
1060 results.push(value);
1061 }
1062
1063 debug!(
1064 "evaluate_lead_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1065 visible_rows.len(),
1066 start.elapsed().as_secs_f64() * 1000.0,
1067 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1068 );
1069
1070 Ok(results)
1071 }
1072
1073 pub fn evaluate_row_number_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1075 let start = Instant::now();
1076 let mut results = Vec::with_capacity(visible_rows.len());
1077
1078 for &row_idx in visible_rows {
1079 let row_num = self.get_row_number(row_idx);
1080 if row_num > 0 {
1081 results.push(DataValue::Integer(row_num as i64));
1082 } else {
1083 results.push(DataValue::Null);
1085 }
1086 }
1087
1088 debug!(
1089 "evaluate_row_number_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1090 visible_rows.len(),
1091 start.elapsed().as_secs_f64() * 1000.0,
1092 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1093 );
1094
1095 Ok(results)
1096 }
1097
1098 pub fn get_rank(&self, row_index: usize) -> i64 {
1101 if let Some(partition_key) = self.row_to_partition.get(&row_index) {
1102 if let Some(partition) = self.partitions.get(partition_key) {
1103 if let Some(position) = partition.get_position(row_index) {
1105 let mut rows_before = 0;
1106
1107 for i in 0..position {
1109 let prev_row = partition.rows[i];
1110 if self.compare_rows_for_rank(prev_row, row_index) < 0 {
1111 rows_before += 1;
1112 }
1113 }
1114
1115 let rank = rows_before + 1;
1116 return rank;
1117 }
1118 }
1119 }
1120 1 }
1122
1123 pub fn get_dense_rank(&self, row_index: usize) -> i64 {
1126 if let Some(partition_key) = self.row_to_partition.get(&row_index) {
1127 if let Some(partition) = self.partitions.get(partition_key) {
1128 if let Some(position) = partition.get_position(row_index) {
1129 let mut dense_rank = 1;
1130 let mut last_value_seen = None;
1131
1132 for i in 0..position {
1134 let prev_row = partition.rows[i];
1135 let cmp = self.compare_rows_for_rank(prev_row, row_index);
1136
1137 if cmp < 0 {
1138 if last_value_seen.is_none()
1140 || last_value_seen.map_or(true, |last| {
1141 self.compare_rows_for_rank(last, prev_row) != 0
1142 })
1143 {
1144 dense_rank += 1;
1145 last_value_seen = Some(prev_row);
1146 }
1147 }
1148 }
1149
1150 return dense_rank;
1151 }
1152 }
1153 }
1154 1 }
1156
1157 fn compare_rows_for_rank(&self, row1: usize, row2: usize) -> i32 {
1160 let source_table = self.source.source();
1161
1162 for order_item in &self.spec.order_by {
1163 if let SqlExpression::Column(col) = &order_item.expr {
1164 if let Some(col_idx) = source_table.get_column_index(&col.name) {
1165 let val1 = source_table.get_value(row1, col_idx);
1166 let val2 = source_table.get_value(row2, col_idx);
1167
1168 let cmp = match (val1, val2) {
1169 (Some(v1), Some(v2)) => {
1170 crate::data::datavalue_compare::compare_datavalues(v1, v2)
1171 }
1172 (None, Some(_)) => std::cmp::Ordering::Less,
1173 (Some(_), None) => std::cmp::Ordering::Greater,
1174 (None, None) => std::cmp::Ordering::Equal,
1175 };
1176
1177 if cmp != std::cmp::Ordering::Equal {
1178 let result = match order_item.direction {
1179 SortDirection::Asc => cmp,
1180 SortDirection::Desc => cmp.reverse(),
1181 };
1182
1183 return match result {
1184 std::cmp::Ordering::Less => -1,
1185 std::cmp::Ordering::Equal => 0,
1186 std::cmp::Ordering::Greater => 1,
1187 };
1188 }
1189 }
1190 }
1191 }
1192
1193 0 }
1195
1196 pub fn evaluate_rank_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1198 let start = Instant::now();
1199 let mut results = Vec::with_capacity(visible_rows.len());
1200
1201 for &row_idx in visible_rows {
1202 let rank = self.get_rank(row_idx);
1203 results.push(DataValue::Integer(rank));
1204 }
1205
1206 debug!(
1207 "evaluate_rank_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1208 visible_rows.len(),
1209 start.elapsed().as_secs_f64() * 1000.0,
1210 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1211 );
1212
1213 Ok(results)
1214 }
1215
1216 pub fn evaluate_dense_rank_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1218 let start = Instant::now();
1219 let mut results = Vec::with_capacity(visible_rows.len());
1220
1221 for &row_idx in visible_rows {
1222 let rank = self.get_dense_rank(row_idx);
1223 results.push(DataValue::Integer(rank));
1224 }
1225
1226 debug!(
1227 "evaluate_dense_rank_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1228 visible_rows.len(),
1229 start.elapsed().as_secs_f64() * 1000.0,
1230 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1231 );
1232
1233 Ok(results)
1234 }
1235
1236 pub fn evaluate_sum_batch(
1238 &self,
1239 visible_rows: &[usize],
1240 column_name: &str,
1241 ) -> Result<Vec<DataValue>> {
1242 let start = Instant::now();
1243 let mut results = Vec::with_capacity(visible_rows.len());
1244
1245 let is_running_aggregate = self.is_running_aggregate_frame();
1247
1248 if is_running_aggregate && !visible_rows.is_empty() {
1249 debug!(
1251 "Using optimized running sum for {} rows",
1252 visible_rows.len()
1253 );
1254
1255 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1257 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1258 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1259 partition_groups
1260 .entry(partition_key.clone())
1261 .or_insert_with(Vec::new)
1262 .push((idx, row_idx));
1263 }
1264 }
1265
1266 let source_table = self.source.source();
1267 let col_idx = source_table
1268 .get_column_index(column_name)
1269 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1270
1271 results.resize(visible_rows.len(), DataValue::Null);
1273
1274 for (_partition_key, rows) in partition_groups {
1276 if let Some(partition) = self.partitions.get(&_partition_key) {
1277 let mut running_sum = 0.0;
1278 let mut has_float = false;
1279 let mut position_to_sum: HashMap<usize, DataValue> = HashMap::new();
1280
1281 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1283 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1284 match value {
1285 DataValue::Integer(i) => {
1286 running_sum += *i as f64;
1287 }
1288 DataValue::Float(f) => {
1289 running_sum += f;
1290 has_float = true;
1291 }
1292 DataValue::Null => {
1293 }
1295 _ => {
1296 }
1298 }
1299 }
1300
1301 if !has_float
1303 && running_sum.fract() == 0.0
1304 && running_sum >= i64::MIN as f64
1305 && running_sum <= i64::MAX as f64
1306 {
1307 position_to_sum.insert(pos, DataValue::Integer(running_sum as i64));
1308 } else {
1309 position_to_sum.insert(pos, DataValue::Float(running_sum));
1310 }
1311 }
1312
1313 for (result_idx, row_idx) in rows {
1315 if let Some(pos) = partition.get_position(row_idx) {
1316 results[result_idx] = position_to_sum
1317 .get(&pos)
1318 .cloned()
1319 .unwrap_or(DataValue::Null);
1320 }
1321 }
1322 }
1323 }
1324 } else {
1325 for &row_idx in visible_rows {
1327 let value = if self.has_frame() {
1328 self.get_frame_sum(row_idx, column_name)
1329 .unwrap_or(DataValue::Null)
1330 } else {
1331 self.get_partition_sum(row_idx, column_name)
1332 .unwrap_or(DataValue::Null)
1333 };
1334 results.push(value);
1335 }
1336 }
1337
1338 debug!(
1339 "evaluate_sum_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1340 visible_rows.len(),
1341 start.elapsed().as_secs_f64() * 1000.0,
1342 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1343 );
1344
1345 Ok(results)
1346 }
1347
1348 pub fn evaluate_avg_batch(
1350 &self,
1351 visible_rows: &[usize],
1352 column_name: &str,
1353 ) -> Result<Vec<DataValue>> {
1354 let start = Instant::now();
1355 let mut results = Vec::with_capacity(visible_rows.len());
1356
1357 let is_running_aggregate = self.is_running_aggregate_frame();
1359
1360 if is_running_aggregate && !visible_rows.is_empty() {
1361 debug!(
1363 "Using optimized running average for {} rows",
1364 visible_rows.len()
1365 );
1366
1367 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1369 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1370 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1371 partition_groups
1372 .entry(partition_key.clone())
1373 .or_insert_with(Vec::new)
1374 .push((idx, row_idx));
1375 }
1376 }
1377
1378 let source_table = self.source.source();
1379 let col_idx = source_table
1380 .get_column_index(column_name)
1381 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1382
1383 results.resize(visible_rows.len(), DataValue::Null);
1385
1386 for (_partition_key, rows) in partition_groups {
1388 if let Some(partition) = self.partitions.get(&_partition_key) {
1389 let mut running_sum = 0.0;
1390 let mut count = 0;
1391 let mut position_to_avg: HashMap<usize, DataValue> = HashMap::new();
1392
1393 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1395 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1396 match value {
1397 DataValue::Integer(i) => {
1398 running_sum += *i as f64;
1399 count += 1;
1400 }
1401 DataValue::Float(f) => {
1402 running_sum += f;
1403 count += 1;
1404 }
1405 DataValue::Null => {
1406 }
1408 _ => {
1409 }
1411 }
1412 }
1413
1414 if count > 0 {
1416 position_to_avg
1417 .insert(pos, DataValue::Float(running_sum / count as f64));
1418 } else {
1419 position_to_avg.insert(pos, DataValue::Null);
1420 }
1421 }
1422
1423 for (result_idx, row_idx) in rows {
1425 if let Some(pos) = partition.get_position(row_idx) {
1426 results[result_idx] = position_to_avg
1427 .get(&pos)
1428 .cloned()
1429 .unwrap_or(DataValue::Null);
1430 }
1431 }
1432 }
1433 }
1434 } else {
1435 for &row_idx in visible_rows {
1437 let value = if self.has_frame() {
1438 self.get_frame_avg(row_idx, column_name)
1439 .unwrap_or(DataValue::Null)
1440 } else {
1441 self.get_partition_avg(row_idx, column_name)
1442 .unwrap_or(DataValue::Null)
1443 };
1444 results.push(value);
1445 }
1446 }
1447
1448 debug!(
1449 "evaluate_avg_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1450 visible_rows.len(),
1451 start.elapsed().as_secs_f64() * 1000.0,
1452 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1453 );
1454
1455 Ok(results)
1456 }
1457
1458 pub fn evaluate_min_batch(
1460 &self,
1461 visible_rows: &[usize],
1462 column_name: &str,
1463 ) -> Result<Vec<DataValue>> {
1464 let start = Instant::now();
1465 let mut results = Vec::with_capacity(visible_rows.len());
1466
1467 let is_running_aggregate = self.is_running_aggregate_frame();
1469
1470 if is_running_aggregate && !visible_rows.is_empty() {
1471 debug!(
1473 "Using optimized running minimum for {} rows",
1474 visible_rows.len()
1475 );
1476
1477 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1479 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1480 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1481 partition_groups
1482 .entry(partition_key.clone())
1483 .or_insert_with(Vec::new)
1484 .push((idx, row_idx));
1485 }
1486 }
1487
1488 let source_table = self.source.source();
1489 let col_idx = source_table
1490 .get_column_index(column_name)
1491 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1492
1493 results.resize(visible_rows.len(), DataValue::Null);
1495
1496 for (_partition_key, rows) in partition_groups {
1498 if let Some(partition) = self.partitions.get(&_partition_key) {
1499 let mut running_min: Option<DataValue> = None;
1500 let mut position_to_min: HashMap<usize, DataValue> = HashMap::new();
1501
1502 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1504 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1505 if !matches!(value, DataValue::Null) {
1506 match &running_min {
1507 None => running_min = Some(value.clone()),
1508 Some(current_min) => {
1509 use crate::data::datavalue_compare::compare_datavalues;
1510 if compare_datavalues(value, current_min).is_lt() {
1511 running_min = Some(value.clone());
1512 }
1513 }
1514 }
1515 }
1516 }
1517
1518 position_to_min.insert(pos, running_min.clone().unwrap_or(DataValue::Null));
1520 }
1521
1522 for (result_idx, row_idx) in rows {
1524 if let Some(pos) = partition.get_position(row_idx) {
1525 results[result_idx] = position_to_min
1526 .get(&pos)
1527 .cloned()
1528 .unwrap_or(DataValue::Null);
1529 }
1530 }
1531 }
1532 }
1533 } else {
1534 for &row_idx in visible_rows {
1536 let value = if self.has_frame() {
1537 self.get_frame_min(row_idx, column_name)
1538 .unwrap_or(DataValue::Null)
1539 } else {
1540 self.get_partition_min(row_idx, column_name)
1541 .unwrap_or(DataValue::Null)
1542 };
1543 results.push(value);
1544 }
1545 }
1546
1547 debug!(
1548 "evaluate_min_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1549 visible_rows.len(),
1550 start.elapsed().as_secs_f64() * 1000.0,
1551 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1552 );
1553
1554 Ok(results)
1555 }
1556
1557 pub fn evaluate_max_batch(
1559 &self,
1560 visible_rows: &[usize],
1561 column_name: &str,
1562 ) -> Result<Vec<DataValue>> {
1563 let start = Instant::now();
1564 let mut results = Vec::with_capacity(visible_rows.len());
1565
1566 let is_running_aggregate = self.is_running_aggregate_frame();
1568
1569 if is_running_aggregate && !visible_rows.is_empty() {
1570 debug!(
1572 "Using optimized running maximum for {} rows",
1573 visible_rows.len()
1574 );
1575
1576 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1578 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1579 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1580 partition_groups
1581 .entry(partition_key.clone())
1582 .or_insert_with(Vec::new)
1583 .push((idx, row_idx));
1584 }
1585 }
1586
1587 let source_table = self.source.source();
1588 let col_idx = source_table
1589 .get_column_index(column_name)
1590 .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1591
1592 results.resize(visible_rows.len(), DataValue::Null);
1594
1595 for (_partition_key, rows) in partition_groups {
1597 if let Some(partition) = self.partitions.get(&_partition_key) {
1598 let mut running_max: Option<DataValue> = None;
1599 let mut position_to_max: HashMap<usize, DataValue> = HashMap::new();
1600
1601 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1603 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1604 if !matches!(value, DataValue::Null) {
1605 match &running_max {
1606 None => running_max = Some(value.clone()),
1607 Some(current_max) => {
1608 use crate::data::datavalue_compare::compare_datavalues;
1609 if compare_datavalues(value, current_max).is_gt() {
1610 running_max = Some(value.clone());
1611 }
1612 }
1613 }
1614 }
1615 }
1616
1617 position_to_max.insert(pos, running_max.clone().unwrap_or(DataValue::Null));
1619 }
1620
1621 for (result_idx, row_idx) in rows {
1623 if let Some(pos) = partition.get_position(row_idx) {
1624 results[result_idx] = position_to_max
1625 .get(&pos)
1626 .cloned()
1627 .unwrap_or(DataValue::Null);
1628 }
1629 }
1630 }
1631 }
1632 } else {
1633 for &row_idx in visible_rows {
1635 let value = if self.has_frame() {
1636 self.get_frame_max(row_idx, column_name)
1637 .unwrap_or(DataValue::Null)
1638 } else {
1639 self.get_partition_max(row_idx, column_name)
1640 .unwrap_or(DataValue::Null)
1641 };
1642 results.push(value);
1643 }
1644 }
1645
1646 debug!(
1647 "evaluate_max_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1648 visible_rows.len(),
1649 start.elapsed().as_secs_f64() * 1000.0,
1650 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1651 );
1652
1653 Ok(results)
1654 }
1655
1656 pub fn evaluate_count_batch(
1658 &self,
1659 visible_rows: &[usize],
1660 column_name: Option<&str>,
1661 ) -> Result<Vec<DataValue>> {
1662 let start = Instant::now();
1663 let mut results = Vec::with_capacity(visible_rows.len());
1664
1665 let is_running_aggregate = self.is_running_aggregate_frame();
1667
1668 if is_running_aggregate && !visible_rows.is_empty() {
1669 debug!(
1671 "Using optimized running count for {} rows",
1672 visible_rows.len()
1673 );
1674
1675 let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1677 for (idx, &row_idx) in visible_rows.iter().enumerate() {
1678 if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1679 partition_groups
1680 .entry(partition_key.clone())
1681 .or_insert_with(Vec::new)
1682 .push((idx, row_idx));
1683 }
1684 }
1685
1686 let source_table = self.source.source();
1687 let col_idx = if let Some(col_name) = column_name {
1688 source_table.get_column_index(col_name)
1689 } else {
1690 None
1691 };
1692
1693 results.resize(visible_rows.len(), DataValue::Null);
1695
1696 for (_partition_key, rows) in partition_groups {
1698 if let Some(partition) = self.partitions.get(&_partition_key) {
1699 let mut running_count = 0i64;
1700 let mut position_to_count: HashMap<usize, DataValue> = HashMap::new();
1701
1702 for (pos, &row_idx) in partition.rows.iter().enumerate() {
1704 if let Some(col_idx) = col_idx {
1705 if let Some(value) = source_table.get_value(row_idx, col_idx) {
1707 if !matches!(value, DataValue::Null) {
1708 running_count += 1;
1709 }
1710 }
1711 } else {
1712 running_count += 1;
1714 }
1715
1716 position_to_count.insert(pos, DataValue::Integer(running_count));
1718 }
1719
1720 for (result_idx, row_idx) in rows {
1722 if let Some(pos) = partition.get_position(row_idx) {
1723 results[result_idx] = position_to_count
1724 .get(&pos)
1725 .cloned()
1726 .unwrap_or(DataValue::Null);
1727 }
1728 }
1729 }
1730 }
1731 } else {
1732 for &row_idx in visible_rows {
1734 let value = if self.has_frame() {
1735 self.get_frame_count(row_idx, column_name)
1736 .unwrap_or(DataValue::Null)
1737 } else {
1738 self.get_partition_count(row_idx, column_name)
1739 .unwrap_or(DataValue::Null)
1740 };
1741 results.push(value);
1742 }
1743 }
1744
1745 debug!(
1746 "evaluate_count_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1747 visible_rows.len(),
1748 start.elapsed().as_secs_f64() * 1000.0,
1749 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1750 );
1751
1752 Ok(results)
1753 }
1754
1755 pub fn evaluate_first_value_batch(
1757 &self,
1758 visible_rows: &[usize],
1759 column_name: &str,
1760 ) -> Result<Vec<DataValue>> {
1761 let start = Instant::now();
1762 let mut results = Vec::with_capacity(visible_rows.len());
1763
1764 for &row_idx in visible_rows {
1765 let value = if self.has_frame() {
1766 self.get_frame_first_value(row_idx, column_name)
1767 .unwrap_or(DataValue::Null)
1768 } else {
1769 self.get_first_value(row_idx, column_name)
1770 .unwrap_or(DataValue::Null)
1771 };
1772 results.push(value);
1773 }
1774
1775 debug!(
1776 "evaluate_first_value_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1777 visible_rows.len(),
1778 start.elapsed().as_secs_f64() * 1000.0,
1779 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1780 );
1781
1782 Ok(results)
1783 }
1784
1785 pub fn evaluate_last_value_batch(
1787 &self,
1788 visible_rows: &[usize],
1789 column_name: &str,
1790 ) -> Result<Vec<DataValue>> {
1791 let start = Instant::now();
1792 let mut results = Vec::with_capacity(visible_rows.len());
1793
1794 for &row_idx in visible_rows {
1795 let value = if self.has_frame() {
1796 self.get_frame_last_value(row_idx, column_name)
1797 .unwrap_or(DataValue::Null)
1798 } else {
1799 self.get_last_value(row_idx, column_name)
1800 .unwrap_or(DataValue::Null)
1801 };
1802 results.push(value);
1803 }
1804
1805 debug!(
1806 "evaluate_last_value_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1807 visible_rows.len(),
1808 start.elapsed().as_secs_f64() * 1000.0,
1809 start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1810 );
1811
1812 Ok(results)
1813 }
1814}