sql_cli/sql/
window_context.rs

1//! Window function context for managing partitioned and ordered data views
2//!
3//! This module provides the WindowContext helper class that enables window functions
4//! like LAG, LEAD, ROW_NUMBER, etc. by managing partitions and ordering.
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8use std::time::Instant;
9
10use anyhow::{anyhow, Result};
11use tracing::{debug, info};
12
13use crate::data::data_view::DataView;
14use crate::data::datatable::{DataTable, DataValue};
15use crate::sql::parser::ast::{
16    FrameBound, FrameUnit, OrderByItem, SortDirection, SqlExpression, WindowSpec,
17};
18
19/// Key for identifying a partition (combination of partition column values)
20/// We use String representation for now since DataValue doesn't impl Ord
21#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
22struct PartitionKey(String);
23
24impl PartitionKey {
25    /// Create a partition key from data values
26    fn from_values(values: Vec<DataValue>) -> Self {
27        // Create a unique string representation
28        let key_parts: Vec<String> = values
29            .iter()
30            .map(|v| match v {
31                DataValue::String(s) => format!("S:{}", s),
32                DataValue::InternedString(s) => format!("S:{}", s),
33                DataValue::Integer(i) => format!("I:{}", i),
34                DataValue::Float(f) => format!("F:{}", f),
35                DataValue::Boolean(b) => format!("B:{}", b),
36                DataValue::DateTime(dt) => format!("D:{}", dt),
37                DataValue::Vector(v) => {
38                    let components: Vec<String> = v.iter().map(|f| f.to_string()).collect();
39                    format!("V:[{}]", components.join(","))
40                }
41                DataValue::Null => "N".to_string(),
42            })
43            .collect();
44        let key = key_parts.join("|");
45        PartitionKey(key)
46    }
47}
48
/// One partition's row indices, kept in window (ORDER BY) order, plus a reverse
/// index from row index to position.
#[derive(Debug, Clone)]
pub struct OrderedPartition {
    /// Original row indices from the DataView, in sorted (window) order
    rows: Vec<usize>,

    /// Reverse lookup: row index -> position within `rows`
    row_positions: HashMap<usize, usize>,
}

impl OrderedPartition {
    /// Create a partition from row indices that are already in window order.
    fn new(rows: Vec<usize>) -> Self {
        let row_positions: HashMap<usize, usize> = rows
            .iter()
            .enumerate()
            .map(|(pos, &row_idx)| (row_idx, pos))
            .collect();

        Self {
            rows,
            row_positions,
        }
    }

    /// Return the row index `offset` positions away from `current_row` in this
    /// partition (negative = earlier, positive = later), as used by LAG/LEAD.
    ///
    /// Returns `None` if `current_row` is not in this partition or the target
    /// position falls outside it.
    pub fn get_row_at_offset(&self, current_row: usize, offset: i32) -> Option<usize> {
        let current_pos = *self.row_positions.get(&current_row)?;
        // Widen to i64 before adding. The old `usize as i32 + offset` arithmetic
        // overflowed (panicking in debug builds) for offsets like
        // LEAD(x, 2147483647), and truncated positions above i32::MAX.
        let target_pos = i64::try_from(current_pos)
            .ok()?
            .checked_add(i64::from(offset))?;
        let target_pos = usize::try_from(target_pos).ok()?;
        self.rows.get(target_pos).copied()
    }

    /// Position of `row_index` within this partition (0-based), if present.
    pub fn get_position(&self, row_index: usize) -> Option<usize> {
        self.row_positions.get(&row_index).copied()
    }

    /// First row index in window order, or `None` for an empty partition.
    pub fn first_row(&self) -> Option<usize> {
        self.rows.first().copied()
    }

    /// Last row index in window order, or `None` for an empty partition.
    pub fn last_row(&self) -> Option<usize> {
        self.rows.last().copied()
    }
}
102
103/// Context for evaluating window functions
104pub struct WindowContext {
105    /// Source data view
106    source: Arc<DataView>,
107
108    /// Partitions with their ordered rows
109    partitions: BTreeMap<PartitionKey, OrderedPartition>,
110
111    /// Mapping from row index to its partition key
112    row_to_partition: HashMap<usize, PartitionKey>,
113
114    /// Window specification
115    spec: WindowSpec,
116}
117
118impl WindowContext {
119    /// Create a new window context with partitioning and ordering
120    pub fn new(
121        view: Arc<DataView>,
122        partition_by: Vec<String>,
123        order_by: Vec<OrderByItem>,
124    ) -> Result<Self> {
125        Self::new_with_spec(
126            view,
127            WindowSpec {
128                partition_by,
129                order_by,
130                frame: None,
131            },
132        )
133    }
134
135    /// Create a new window context with a full window specification
136    pub fn new_with_spec(view: Arc<DataView>, spec: WindowSpec) -> Result<Self> {
137        let overall_start = Instant::now();
138        let partition_by = spec.partition_by.clone();
139        let order_by = spec.order_by.clone();
140        let row_count = view.row_count();
141
142        // If no partition columns, treat entire view as single partition
143        if partition_by.is_empty() {
144            info!(
145                "Creating single partition (no PARTITION BY) for {} rows",
146                row_count
147            );
148            let partition_start = Instant::now();
149
150            let single_partition = Self::create_single_partition(&view, &order_by)?;
151            let partition_key = PartitionKey::from_values(vec![]);
152
153            // Build row-to-partition mapping
154            let mut row_to_partition = HashMap::new();
155            for &row_idx in &single_partition.rows {
156                row_to_partition.insert(row_idx, partition_key.clone());
157            }
158
159            let mut partitions = BTreeMap::new();
160            partitions.insert(partition_key, single_partition);
161
162            info!(
163                "Single partition created in {:.2}ms (1 partition, {} rows)",
164                partition_start.elapsed().as_secs_f64() * 1000.0,
165                row_count
166            );
167
168            info!(
169                "WindowContext::new_with_spec (single partition) took {:.2}ms total",
170                overall_start.elapsed().as_secs_f64() * 1000.0
171            );
172
173            return Ok(Self {
174                source: view,
175                partitions,
176                row_to_partition,
177                spec,
178            });
179        }
180
181        // Create partitions based on partition_by columns
182        info!(
183            "Creating partitions with PARTITION BY for {} rows",
184            row_count
185        );
186        let partition_start = Instant::now();
187
188        let mut partition_map: BTreeMap<PartitionKey, Vec<usize>> = BTreeMap::new();
189        let mut row_to_partition = HashMap::new();
190
191        // Get column indices for partition columns
192        let source_table = view.source();
193        let partition_col_indices: Vec<usize> = partition_by
194            .iter()
195            .map(|col| {
196                source_table
197                    .get_column_index(col)
198                    .ok_or_else(|| anyhow!("Invalid partition column: {}", col))
199            })
200            .collect::<Result<Vec<_>>>()?;
201
202        // Group rows by partition key
203        let grouping_start = Instant::now();
204        for row_idx in view.get_visible_rows() {
205            // Build partition key from row values
206            let mut key_values = Vec::new();
207            for &col_idx in &partition_col_indices {
208                let value = source_table
209                    .get_value(row_idx, col_idx)
210                    .ok_or_else(|| anyhow!("Failed to get value for partition"))?
211                    .clone();
212                key_values.push(value);
213            }
214            let key = PartitionKey::from_values(key_values);
215
216            // Add row to partition
217            partition_map.entry(key.clone()).or_default().push(row_idx);
218            row_to_partition.insert(row_idx, key);
219        }
220
221        info!(
222            "Partition grouping took {:.2}ms ({} partitions created)",
223            grouping_start.elapsed().as_secs_f64() * 1000.0,
224            partition_map.len()
225        );
226
227        // Sort each partition according to ORDER BY
228        let sort_start = Instant::now();
229        let mut partitions = BTreeMap::new();
230        let partition_count = partition_map.len();
231
232        for (key, mut rows) in partition_map {
233            // Sort rows within partition
234            if !order_by.is_empty() {
235                Self::sort_rows(&mut rows, source_table, &order_by)?;
236            }
237
238            partitions.insert(key, OrderedPartition::new(rows));
239        }
240
241        info!(
242            "Partition sorting took {:.2}ms ({} partitions, ORDER BY: {})",
243            sort_start.elapsed().as_secs_f64() * 1000.0,
244            partition_count,
245            !order_by.is_empty()
246        );
247
248        info!(
249            "Total partition creation took {:.2}ms",
250            partition_start.elapsed().as_secs_f64() * 1000.0
251        );
252
253        info!(
254            "WindowContext::new_with_spec (multi-partition) took {:.2}ms total",
255            overall_start.elapsed().as_secs_f64() * 1000.0
256        );
257
258        Ok(Self {
259            source: view,
260            partitions,
261            row_to_partition,
262            spec,
263        })
264    }
265
266    /// Create a single partition from the entire view
267    fn create_single_partition(
268        view: &DataView,
269        order_by: &[OrderByItem],
270    ) -> Result<OrderedPartition> {
271        let mut rows: Vec<usize> = view.get_visible_rows();
272
273        if !order_by.is_empty() {
274            let sort_start = Instant::now();
275            Self::sort_rows(&mut rows, view.source(), order_by)?;
276            debug!(
277                "Single partition sort took {:.2}ms ({} rows)",
278                sort_start.elapsed().as_secs_f64() * 1000.0,
279                rows.len()
280            );
281        }
282
283        Ok(OrderedPartition::new(rows))
284    }
285
286    /// Sort row indices according to ORDER BY specification
287    fn sort_rows(rows: &mut Vec<usize>, table: &DataTable, order_by: &[OrderByItem]) -> Result<()> {
288        let prep_start = Instant::now();
289
290        // Get column indices for ORDER BY columns
291        let sort_cols: Vec<(usize, bool)> = order_by
292            .iter()
293            .map(|col| {
294                // Extract column name from expression (currently only supports simple columns)
295                let column_name = match &col.expr {
296                    SqlExpression::Column(col_ref) => &col_ref.name,
297                    _ => {
298                        return Err(anyhow!("Window function ORDER BY only supports simple columns, not expressions"));
299                    }
300                };
301                let idx = table
302                    .get_column_index(column_name)
303                    .ok_or_else(|| anyhow!("Invalid ORDER BY column: {}", column_name))?;
304                let ascending = matches!(col.direction, SortDirection::Asc);
305                Ok((idx, ascending))
306            })
307            .collect::<Result<Vec<_>>>()?;
308
309        debug!(
310            "Sort preparation took {:.2}μs ({} sort columns)",
311            prep_start.elapsed().as_micros(),
312            sort_cols.len()
313        );
314
315        let sort_start = Instant::now();
316
317        // Sort rows based on column values
318        rows.sort_by(|&a, &b| {
319            for &(col_idx, ascending) in &sort_cols {
320                let val_a = table.get_value(a, col_idx);
321                let val_b = table.get_value(b, col_idx);
322
323                match (val_a, val_b) {
324                    (None, None) => continue,
325                    (None, Some(_)) => {
326                        return if ascending {
327                            std::cmp::Ordering::Less
328                        } else {
329                            std::cmp::Ordering::Greater
330                        }
331                    }
332                    (Some(_), None) => {
333                        return if ascending {
334                            std::cmp::Ordering::Greater
335                        } else {
336                            std::cmp::Ordering::Less
337                        }
338                    }
339                    (Some(v_a), Some(v_b)) => {
340                        // DataValue only implements PartialOrd, not Ord
341                        let ord = v_a.partial_cmp(&v_b).unwrap_or(std::cmp::Ordering::Equal);
342                        if ord != std::cmp::Ordering::Equal {
343                            return if ascending { ord } else { ord.reverse() };
344                        }
345                    }
346                }
347            }
348            std::cmp::Ordering::Equal
349        });
350
351        debug!(
352            "Actual sort operation took {:.2}μs ({} rows)",
353            sort_start.elapsed().as_micros(),
354            rows.len()
355        );
356
357        Ok(())
358    }
359
360    /// Get value at offset from current row (for LAG/LEAD)
361    pub fn get_offset_value(
362        &self,
363        current_row: usize,
364        offset: i32,
365        column: &str,
366    ) -> Option<DataValue> {
367        // Note: This method is called once per row, so we use trace-level logging
368        // to avoid overwhelming the debug output
369        let start = Instant::now();
370
371        // Find which partition this row belongs to
372        let partition_lookup_start = Instant::now();
373        let partition_key = self.row_to_partition.get(&current_row)?;
374        let partition = self.partitions.get(partition_key)?;
375        let partition_lookup_time = partition_lookup_start.elapsed();
376
377        // Navigate to target row
378        let offset_nav_start = Instant::now();
379        let target_row = partition.get_row_at_offset(current_row, offset)?;
380        let offset_nav_time = offset_nav_start.elapsed();
381
382        // Get column value from target row
383        let value_access_start = Instant::now();
384        let source_table = self.source.source();
385        let col_idx = source_table.get_column_index(column)?;
386        let value = source_table.get_value(target_row, col_idx).cloned();
387        let value_access_time = value_access_start.elapsed();
388
389        // Only log if this takes more than 10 microseconds (to avoid spam)
390        let total_time = start.elapsed();
391        if total_time.as_micros() > 10 {
392            debug!(
393                "get_offset_value slow: total={:.2}μs, partition_lookup={:.2}μs, offset_nav={:.2}μs, value_access={:.2}μs",
394                total_time.as_micros(),
395                partition_lookup_time.as_micros(),
396                offset_nav_time.as_micros(),
397                value_access_time.as_micros()
398            );
399        }
400
401        value
402    }
403
404    /// Get row number within partition (1-based)
405    pub fn get_row_number(&self, row_index: usize) -> usize {
406        if let Some(partition_key) = self.row_to_partition.get(&row_index) {
407            if let Some(partition) = self.partitions.get(partition_key) {
408                if let Some(position) = partition.get_position(row_index) {
409                    return position + 1; // Convert to 1-based
410                }
411            }
412        }
413        0 // Should not happen for valid row
414    }
415
416    /// Get first value in frame
417    pub fn get_frame_first_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
418        let frame_rows = self.get_frame_rows(row_index);
419        if frame_rows.is_empty() {
420            return Some(DataValue::Null);
421        }
422
423        let source_table = self.source.source();
424        let col_idx = source_table.get_column_index(column)?;
425
426        // Get the first row in the frame
427        let first_row = frame_rows[0];
428        source_table.get_value(first_row, col_idx).cloned()
429    }
430
431    /// Get last value in frame
432    pub fn get_frame_last_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
433        let frame_rows = self.get_frame_rows(row_index);
434        if frame_rows.is_empty() {
435            return Some(DataValue::Null);
436        }
437
438        let source_table = self.source.source();
439        let col_idx = source_table.get_column_index(column)?;
440
441        // Get the last row in the frame
442        let last_row = frame_rows[frame_rows.len() - 1];
443        source_table.get_value(last_row, col_idx).cloned()
444    }
445
446    /// Get first value in partition
447    pub fn get_first_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
448        let partition_key = self.row_to_partition.get(&row_index)?;
449        let partition = self.partitions.get(partition_key)?;
450        let first_row = partition.first_row()?;
451
452        let source_table = self.source.source();
453        let col_idx = source_table.get_column_index(column)?;
454        source_table.get_value(first_row, col_idx).cloned()
455    }
456
457    /// Get last value in partition
458    pub fn get_last_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
459        let partition_key = self.row_to_partition.get(&row_index)?;
460        let partition = self.partitions.get(partition_key)?;
461        let last_row = partition.last_row()?;
462
463        let source_table = self.source.source();
464        let col_idx = source_table.get_column_index(column)?;
465        source_table.get_value(last_row, col_idx).cloned()
466    }
467
468    /// Get the number of partitions
469    pub fn partition_count(&self) -> usize {
470        self.partitions.len()
471    }
472
473    /// Check if context has partitions (vs single window)
474    pub fn has_partitions(&self) -> bool {
475        !self.spec.partition_by.is_empty()
476    }
477
478    /// Check if context has a window frame specification
479    pub fn has_frame(&self) -> bool {
480        self.spec.frame.is_some()
481    }
482
483    /// Check if this is a running aggregate frame (UNBOUNDED PRECEDING to CURRENT ROW)
484    pub fn is_running_aggregate_frame(&self) -> bool {
485        if let Some(frame) = &self.spec.frame {
486            matches!(frame.start, FrameBound::UnboundedPreceding) && frame.end.is_none()
487        // None means CURRENT ROW
488        } else {
489            false
490        }
491    }
492
493    /// Get the source DataView
494    pub fn source(&self) -> &DataTable {
495        self.source.source()
496    }
497
498    /// Get row indices within the window frame for a given row
499    pub fn get_frame_rows(&self, row_index: usize) -> Vec<usize> {
500        // Find which partition this row belongs to
501        let partition_key = match self.row_to_partition.get(&row_index) {
502            Some(key) => key,
503            None => return vec![],
504        };
505
506        let partition = match self.partitions.get(partition_key) {
507            Some(p) => p,
508            None => return vec![],
509        };
510
511        // Get current row's position in partition
512        let current_pos = match partition.get_position(row_index) {
513            Some(pos) => pos as i64,
514            None => return vec![],
515        };
516
517        // If no frame specified, return entire partition (default behavior)
518        let frame = match &self.spec.frame {
519            Some(f) => f,
520            None => return partition.rows.clone(),
521        };
522
523        // Calculate frame bounds
524        let (start_pos, end_pos) = match frame.unit {
525            FrameUnit::Rows => {
526                // ROWS frame - based on physical row positions
527                let start =
528                    self.calculate_frame_position(&frame.start, current_pos, partition.rows.len());
529                let end = match &frame.end {
530                    Some(bound) => {
531                        self.calculate_frame_position(bound, current_pos, partition.rows.len())
532                    }
533                    None => current_pos, // Default to CURRENT ROW
534                };
535                (start, end)
536            }
537            FrameUnit::Range => {
538                // RANGE frame - based on ORDER BY values (not yet fully implemented)
539                // For now, treat like ROWS
540                let start =
541                    self.calculate_frame_position(&frame.start, current_pos, partition.rows.len());
542                let end = match &frame.end {
543                    Some(bound) => {
544                        self.calculate_frame_position(bound, current_pos, partition.rows.len())
545                    }
546                    None => current_pos,
547                };
548                (start, end)
549            }
550        };
551
552        // Collect rows within frame bounds
553        let mut frame_rows = Vec::new();
554        for i in start_pos..=end_pos {
555            if i >= 0 && (i as usize) < partition.rows.len() {
556                frame_rows.push(partition.rows[i as usize]);
557            }
558        }
559
560        frame_rows
561    }
562
563    /// Calculate absolute position from frame bound
564    fn calculate_frame_position(
565        &self,
566        bound: &FrameBound,
567        current_pos: i64,
568        partition_size: usize,
569    ) -> i64 {
570        match bound {
571            FrameBound::UnboundedPreceding => 0,
572            FrameBound::UnboundedFollowing => partition_size as i64 - 1,
573            FrameBound::CurrentRow => current_pos,
574            FrameBound::Preceding(n) => current_pos - n,
575            FrameBound::Following(n) => current_pos + n,
576        }
577    }
578
579    /// Calculate sum of a column within the window frame for the given row
580    pub fn get_frame_sum(&self, row_index: usize, column: &str) -> Option<DataValue> {
581        let frame_rows = self.get_frame_rows(row_index);
582        if frame_rows.is_empty() {
583            return Some(DataValue::Null);
584        }
585
586        let source_table = self.source.source();
587        let col_idx = source_table.get_column_index(column)?;
588
589        let mut sum = 0.0;
590        let mut has_float = false;
591        let mut has_value = false;
592
593        // Sum all values in the frame
594        for &row_idx in &frame_rows {
595            if let Some(value) = source_table.get_value(row_idx, col_idx) {
596                match value {
597                    DataValue::Integer(i) => {
598                        sum += *i as f64;
599                        has_value = true;
600                    }
601                    DataValue::Float(f) => {
602                        sum += f;
603                        has_float = true;
604                        has_value = true;
605                    }
606                    DataValue::Null => {
607                        // Skip NULL values
608                    }
609                    _ => {
610                        // Non-numeric values - return NULL
611                        return Some(DataValue::Null);
612                    }
613                }
614            }
615        }
616
617        if !has_value {
618            return Some(DataValue::Null);
619        }
620
621        // Return as integer if all values were integers and sum is whole
622        if !has_float && sum.fract() == 0.0 && sum >= i64::MIN as f64 && sum <= i64::MAX as f64 {
623            Some(DataValue::Integer(sum as i64))
624        } else {
625            Some(DataValue::Float(sum))
626        }
627    }
628
629    /// Calculate count within the window frame
630    pub fn get_frame_count(&self, row_index: usize, column: Option<&str>) -> Option<DataValue> {
631        let frame_rows = self.get_frame_rows(row_index);
632        if frame_rows.is_empty() {
633            return Some(DataValue::Integer(0));
634        }
635
636        if let Some(col_name) = column {
637            // COUNT(column) - count non-null values in frame
638            let source_table = self.source.source();
639            let col_idx = source_table.get_column_index(col_name)?;
640
641            let count = frame_rows
642                .iter()
643                .filter_map(|&row_idx| source_table.get_value(row_idx, col_idx))
644                .filter(|v| !matches!(v, DataValue::Null))
645                .count();
646
647            Some(DataValue::Integer(count as i64))
648        } else {
649            // COUNT(*) - count all rows in frame
650            Some(DataValue::Integer(frame_rows.len() as i64))
651        }
652    }
653
654    /// Calculate average of a column within the window frame
655    pub fn get_frame_avg(&self, row_index: usize, column: &str) -> Option<DataValue> {
656        let frame_rows = self.get_frame_rows(row_index);
657        if frame_rows.is_empty() {
658            return Some(DataValue::Null);
659        }
660
661        let source_table = self.source.source();
662        let col_idx = source_table.get_column_index(column)?;
663
664        let mut sum = 0.0;
665        let mut count = 0;
666
667        // Sum all non-null values in the frame
668        for &row_idx in &frame_rows {
669            if let Some(value) = source_table.get_value(row_idx, col_idx) {
670                match value {
671                    DataValue::Integer(i) => {
672                        sum += *i as f64;
673                        count += 1;
674                    }
675                    DataValue::Float(f) => {
676                        sum += f;
677                        count += 1;
678                    }
679                    DataValue::Null => {
680                        // Skip NULL values
681                    }
682                    _ => {
683                        // Non-numeric values - return NULL
684                        return Some(DataValue::Null);
685                    }
686                }
687            }
688        }
689
690        if count == 0 {
691            return Some(DataValue::Null);
692        }
693
694        Some(DataValue::Float(sum / count as f64))
695    }
696
697    /// Calculate standard deviation within the window frame (sample stddev)
698    pub fn get_frame_stddev(&self, row_index: usize, column: &str) -> Option<DataValue> {
699        let variance = self.get_frame_variance(row_index, column)?;
700        match variance {
701            DataValue::Float(v) => Some(DataValue::Float(v.sqrt())),
702            DataValue::Null => Some(DataValue::Null),
703            _ => Some(DataValue::Null),
704        }
705    }
706
707    /// Calculate variance within the window frame (sample variance with n-1)
708    pub fn get_frame_variance(&self, row_index: usize, column: &str) -> Option<DataValue> {
709        let frame_rows = self.get_frame_rows(row_index);
710        if frame_rows.is_empty() {
711            return Some(DataValue::Null);
712        }
713
714        let source_table = self.source.source();
715        let col_idx = source_table.get_column_index(column)?;
716
717        let mut values = Vec::new();
718
719        // Collect all non-null values in the frame
720        for &row_idx in &frame_rows {
721            if let Some(value) = source_table.get_value(row_idx, col_idx) {
722                match value {
723                    DataValue::Integer(i) => values.push(*i as f64),
724                    DataValue::Float(f) => values.push(*f),
725                    DataValue::Null => {
726                        // Skip NULL values
727                    }
728                    _ => {
729                        // Non-numeric values - return NULL
730                        return Some(DataValue::Null);
731                    }
732                }
733            }
734        }
735
736        if values.is_empty() {
737            return Some(DataValue::Null);
738        }
739
740        if values.len() == 1 {
741            // Variance of single value is 0
742            return Some(DataValue::Float(0.0));
743        }
744
745        // Calculate mean
746        let mean = values.iter().sum::<f64>() / values.len() as f64;
747
748        // Calculate sample variance (n-1 denominator)
749        let variance =
750            values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (values.len() - 1) as f64;
751
752        Some(DataValue::Float(variance))
753    }
754
755    /// Calculate sum of a column over the partition containing the given row
756    pub fn get_partition_sum(&self, row_index: usize, column: &str) -> Option<DataValue> {
757        let partition_key = self.row_to_partition.get(&row_index)?;
758        let partition = self.partitions.get(partition_key)?;
759        let source_table = self.source.source();
760        let col_idx = source_table.get_column_index(column)?;
761
762        let mut sum = 0.0;
763        let mut has_float = false;
764        let mut has_value = false;
765
766        // Sum all values in the partition
767        for &row_idx in &partition.rows {
768            if let Some(value) = source_table.get_value(row_idx, col_idx) {
769                match value {
770                    DataValue::Integer(i) => {
771                        sum += *i as f64;
772                        has_value = true;
773                    }
774                    DataValue::Float(f) => {
775                        sum += f;
776                        has_float = true;
777                        has_value = true;
778                    }
779                    DataValue::Null => {
780                        // Skip NULL values
781                    }
782                    _ => {
783                        // Non-numeric values - return NULL
784                        return Some(DataValue::Null);
785                    }
786                }
787            }
788        }
789
790        if !has_value {
791            return Some(DataValue::Null);
792        }
793
794        // Return as integer if all values were integers and sum is whole
795        if !has_float && sum.fract() == 0.0 && sum >= i64::MIN as f64 && sum <= i64::MAX as f64 {
796            Some(DataValue::Integer(sum as i64))
797        } else {
798            Some(DataValue::Float(sum))
799        }
800    }
801
802    /// Calculate count of non-null values in a column over the partition
803    pub fn get_partition_count(&self, row_index: usize, column: Option<&str>) -> Option<DataValue> {
804        let partition_key = self.row_to_partition.get(&row_index)?;
805        let partition = self.partitions.get(partition_key)?;
806
807        if let Some(col_name) = column {
808            // COUNT(column) - count non-null values
809            let source_table = self.source.source();
810            let col_idx = source_table.get_column_index(col_name)?;
811
812            let count = partition
813                .rows
814                .iter()
815                .filter_map(|&row_idx| source_table.get_value(row_idx, col_idx))
816                .filter(|v| !matches!(v, DataValue::Null))
817                .count();
818
819            Some(DataValue::Integer(count as i64))
820        } else {
821            // COUNT(*) - count all rows in partition
822            Some(DataValue::Integer(partition.rows.len() as i64))
823        }
824    }
825
826    /// Calculate average of a column over the partition containing the given row
827    pub fn get_partition_avg(&self, row_index: usize, column: &str) -> Option<DataValue> {
828        let partition_key = self.row_to_partition.get(&row_index)?;
829        let partition = self.partitions.get(partition_key)?;
830        let source_table = self.source.source();
831        let col_idx = source_table.get_column_index(column)?;
832
833        let mut sum = 0.0;
834        let mut count = 0;
835
836        // Sum all values in the partition
837        for &row_idx in &partition.rows {
838            if let Some(value) = source_table.get_value(row_idx, col_idx) {
839                match value {
840                    DataValue::Integer(i) => {
841                        sum += *i as f64;
842                        count += 1;
843                    }
844                    DataValue::Float(f) => {
845                        sum += f;
846                        count += 1;
847                    }
848                    DataValue::Null => {
849                        // Skip NULL values
850                    }
851                    _ => {
852                        // Non-numeric values - return NULL
853                        return Some(DataValue::Null);
854                    }
855                }
856            }
857        }
858
859        if count == 0 {
860            Some(DataValue::Null)
861        } else {
862            Some(DataValue::Float(sum / count as f64))
863        }
864    }
865
866    /// Calculate minimum value of a column over the partition containing the given row
867    pub fn get_partition_min(&self, row_index: usize, column: &str) -> Option<DataValue> {
868        let partition_key = self.row_to_partition.get(&row_index)?;
869        let partition = self.partitions.get(partition_key)?;
870        let source_table = self.source.source();
871        let col_idx = source_table.get_column_index(column)?;
872
873        let mut min_value: Option<DataValue> = None;
874
875        for &row_idx in &partition.rows {
876            if let Some(value) = source_table.get_value(row_idx, col_idx) {
877                if !matches!(value, DataValue::Null) {
878                    match &min_value {
879                        None => min_value = Some(value.clone()),
880                        Some(current_min) => {
881                            use crate::data::datavalue_compare::compare_datavalues;
882                            if compare_datavalues(value, current_min).is_lt() {
883                                min_value = Some(value.clone());
884                            }
885                        }
886                    }
887                }
888            }
889        }
890
891        min_value.or(Some(DataValue::Null))
892    }
893
894    /// Calculate maximum value of a column over the partition containing the given row
895    pub fn get_partition_max(&self, row_index: usize, column: &str) -> Option<DataValue> {
896        let partition_key = self.row_to_partition.get(&row_index)?;
897        let partition = self.partitions.get(partition_key)?;
898        let source_table = self.source.source();
899        let col_idx = source_table.get_column_index(column)?;
900
901        let mut max_value: Option<DataValue> = None;
902
903        for &row_idx in &partition.rows {
904            if let Some(value) = source_table.get_value(row_idx, col_idx) {
905                if !matches!(value, DataValue::Null) {
906                    match &max_value {
907                        None => max_value = Some(value.clone()),
908                        Some(current_max) => {
909                            use crate::data::datavalue_compare::compare_datavalues;
910                            if compare_datavalues(value, current_max).is_gt() {
911                                max_value = Some(value.clone());
912                            }
913                        }
914                    }
915                }
916            }
917        }
918
919        max_value.or(Some(DataValue::Null))
920    }
921
922    /// Get all row indices in the partition containing the given row
923    pub fn get_partition_rows(&self, row_index: usize) -> Vec<usize> {
924        if let Some(partition_key) = self.row_to_partition.get(&row_index) {
925            if let Some(partition) = self.partitions.get(partition_key) {
926                return partition.rows.clone();
927            }
928        }
929        vec![]
930    }
931
932    /// Calculate minimum value within the frame for a given row
933    pub fn get_frame_min(&self, row_index: usize, column: &str) -> Option<DataValue> {
934        let frame_rows = self.get_frame_rows(row_index);
935        let source_table = self.source.source();
936        let col_idx = source_table.get_column_index(column)?;
937
938        let mut min_value: Option<DataValue> = None;
939
940        for &frame_row_idx in &frame_rows {
941            if let Some(value) = source_table.get_value(frame_row_idx, col_idx) {
942                if !matches!(value, DataValue::Null) {
943                    match &min_value {
944                        None => min_value = Some(value.clone()),
945                        Some(current_min) => {
946                            use crate::data::datavalue_compare::compare_datavalues;
947                            if compare_datavalues(value, current_min).is_lt() {
948                                min_value = Some(value.clone());
949                            }
950                        }
951                    }
952                }
953            }
954        }
955
956        min_value.or(Some(DataValue::Null))
957    }
958
959    /// Calculate maximum value within the frame for a given row
960    pub fn get_frame_max(&self, row_index: usize, column: &str) -> Option<DataValue> {
961        let frame_rows = self.get_frame_rows(row_index);
962        let source_table = self.source.source();
963        let col_idx = source_table.get_column_index(column)?;
964
965        let mut max_value: Option<DataValue> = None;
966
967        for &frame_row_idx in &frame_rows {
968            if let Some(value) = source_table.get_value(frame_row_idx, col_idx) {
969                if !matches!(value, DataValue::Null) {
970                    match &max_value {
971                        None => max_value = Some(value.clone()),
972                        Some(current_max) => {
973                            use crate::data::datavalue_compare::compare_datavalues;
974                            if compare_datavalues(value, current_max).is_gt() {
975                                max_value = Some(value.clone());
976                            }
977                        }
978                    }
979                }
980            }
981        }
982
983        max_value.or(Some(DataValue::Null))
984    }
985
986    /// Batch evaluate LAG function for multiple rows
987    /// This is significantly faster than calling get_offset_value for each row
988    pub fn evaluate_lag_batch(
989        &self,
990        visible_rows: &[usize],
991        column_name: &str,
992        offset: i64,
993    ) -> Result<Vec<DataValue>> {
994        let start = Instant::now();
995        let mut results = Vec::with_capacity(visible_rows.len());
996
997        // Validate column exists
998        let source_table = self.source.source();
999        let _ = source_table
1000            .get_column_index(column_name)
1001            .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1002
1003        for &row_idx in visible_rows {
1004            // LAG uses negative offset internally
1005            let value =
1006                if let Some(val) = self.get_offset_value(row_idx, -(offset as i32), column_name) {
1007                    val
1008                } else {
1009                    DataValue::Null
1010                };
1011            results.push(value);
1012        }
1013
1014        debug!(
1015            "evaluate_lag_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1016            visible_rows.len(),
1017            start.elapsed().as_secs_f64() * 1000.0,
1018            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1019        );
1020
1021        Ok(results)
1022    }
1023
1024    /// Batch evaluate LEAD function for multiple rows
1025    pub fn evaluate_lead_batch(
1026        &self,
1027        visible_rows: &[usize],
1028        column_name: &str,
1029        offset: i64,
1030    ) -> Result<Vec<DataValue>> {
1031        let start = Instant::now();
1032        let mut results = Vec::with_capacity(visible_rows.len());
1033
1034        // Validate column exists
1035        let source_table = self.source.source();
1036        let _ = source_table
1037            .get_column_index(column_name)
1038            .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1039
1040        for &row_idx in visible_rows {
1041            // LEAD uses positive offset
1042            let value =
1043                if let Some(val) = self.get_offset_value(row_idx, offset as i32, column_name) {
1044                    val
1045                } else {
1046                    DataValue::Null
1047                };
1048            results.push(value);
1049        }
1050
1051        debug!(
1052            "evaluate_lead_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1053            visible_rows.len(),
1054            start.elapsed().as_secs_f64() * 1000.0,
1055            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1056        );
1057
1058        Ok(results)
1059    }
1060
1061    /// Batch evaluate ROW_NUMBER function for multiple rows
1062    pub fn evaluate_row_number_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1063        let start = Instant::now();
1064        let mut results = Vec::with_capacity(visible_rows.len());
1065
1066        for &row_idx in visible_rows {
1067            let row_num = self.get_row_number(row_idx);
1068            if row_num > 0 {
1069                results.push(DataValue::Integer(row_num as i64));
1070            } else {
1071                // This shouldn't happen if WindowContext is properly constructed
1072                results.push(DataValue::Null);
1073            }
1074        }
1075
1076        debug!(
1077            "evaluate_row_number_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1078            visible_rows.len(),
1079            start.elapsed().as_secs_f64() * 1000.0,
1080            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1081        );
1082
1083        Ok(results)
1084    }
1085
1086    /// Get rank of a row within its partition
1087    /// Ties get the same rank, and next rank(s) are skipped
1088    pub fn get_rank(&self, row_index: usize) -> i64 {
1089        if let Some(partition_key) = self.row_to_partition.get(&row_index) {
1090            if let Some(partition) = self.partitions.get(partition_key) {
1091                // Get the value at this row for comparison
1092                if let Some(position) = partition.get_position(row_index) {
1093                    let mut rows_before = 0;
1094
1095                    // Compare with all previous rows in the partition
1096                    for i in 0..position {
1097                        let prev_row = partition.rows[i];
1098                        if self.compare_rows_for_rank(prev_row, row_index) < 0 {
1099                            rows_before += 1;
1100                        }
1101                    }
1102
1103                    let rank = rows_before + 1;
1104                    return rank;
1105                }
1106            }
1107        }
1108        1 // Default if not found
1109    }
1110
1111    /// Get dense rank of a row within its partition
1112    /// Ties get the same rank, but ranks are not skipped
1113    pub fn get_dense_rank(&self, row_index: usize) -> i64 {
1114        if let Some(partition_key) = self.row_to_partition.get(&row_index) {
1115            if let Some(partition) = self.partitions.get(partition_key) {
1116                if let Some(position) = partition.get_position(row_index) {
1117                    let mut dense_rank = 1;
1118                    let mut last_value_seen = None;
1119
1120                    // Look at all rows before this one
1121                    for i in 0..position {
1122                        let prev_row = partition.rows[i];
1123                        let cmp = self.compare_rows_for_rank(prev_row, row_index);
1124
1125                        if cmp < 0 {
1126                            // This row has a better rank
1127                            if last_value_seen.is_none()
1128                                || last_value_seen.map_or(true, |last| {
1129                                    self.compare_rows_for_rank(last, prev_row) != 0
1130                                })
1131                            {
1132                                dense_rank += 1;
1133                                last_value_seen = Some(prev_row);
1134                            }
1135                        }
1136                    }
1137
1138                    return dense_rank;
1139                }
1140            }
1141        }
1142        1 // Default if not found
1143    }
1144
1145    /// Compare two rows based on ORDER BY columns for ranking
1146    /// Returns -1 if row1 < row2, 0 if equal, 1 if row1 > row2
1147    fn compare_rows_for_rank(&self, row1: usize, row2: usize) -> i32 {
1148        let source_table = self.source.source();
1149
1150        for order_item in &self.spec.order_by {
1151            if let SqlExpression::Column(col) = &order_item.expr {
1152                if let Some(col_idx) = source_table.get_column_index(&col.name) {
1153                    let val1 = source_table.get_value(row1, col_idx);
1154                    let val2 = source_table.get_value(row2, col_idx);
1155
1156                    let cmp = match (val1, val2) {
1157                        (Some(v1), Some(v2)) => {
1158                            crate::data::datavalue_compare::compare_datavalues(v1, v2)
1159                        }
1160                        (None, Some(_)) => std::cmp::Ordering::Less,
1161                        (Some(_), None) => std::cmp::Ordering::Greater,
1162                        (None, None) => std::cmp::Ordering::Equal,
1163                    };
1164
1165                    if cmp != std::cmp::Ordering::Equal {
1166                        let result = match order_item.direction {
1167                            SortDirection::Asc => cmp,
1168                            SortDirection::Desc => cmp.reverse(),
1169                        };
1170
1171                        return match result {
1172                            std::cmp::Ordering::Less => -1,
1173                            std::cmp::Ordering::Equal => 0,
1174                            std::cmp::Ordering::Greater => 1,
1175                        };
1176                    }
1177                }
1178            }
1179        }
1180
1181        0 // All columns equal
1182    }
1183
1184    /// Batch evaluate RANK function for multiple rows
1185    pub fn evaluate_rank_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1186        let start = Instant::now();
1187        let mut results = Vec::with_capacity(visible_rows.len());
1188
1189        for &row_idx in visible_rows {
1190            let rank = self.get_rank(row_idx);
1191            results.push(DataValue::Integer(rank));
1192        }
1193
1194        debug!(
1195            "evaluate_rank_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1196            visible_rows.len(),
1197            start.elapsed().as_secs_f64() * 1000.0,
1198            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1199        );
1200
1201        Ok(results)
1202    }
1203
1204    /// Batch evaluate DENSE_RANK function for multiple rows
1205    pub fn evaluate_dense_rank_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1206        let start = Instant::now();
1207        let mut results = Vec::with_capacity(visible_rows.len());
1208
1209        for &row_idx in visible_rows {
1210            let rank = self.get_dense_rank(row_idx);
1211            results.push(DataValue::Integer(rank));
1212        }
1213
1214        debug!(
1215            "evaluate_dense_rank_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1216            visible_rows.len(),
1217            start.elapsed().as_secs_f64() * 1000.0,
1218            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1219        );
1220
1221        Ok(results)
1222    }
1223
1224    /// Batch evaluate SUM window aggregate
1225    pub fn evaluate_sum_batch(
1226        &self,
1227        visible_rows: &[usize],
1228        column_name: &str,
1229    ) -> Result<Vec<DataValue>> {
1230        let start = Instant::now();
1231        let mut results = Vec::with_capacity(visible_rows.len());
1232
1233        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1234        let is_running_aggregate = self.is_running_aggregate_frame();
1235
1236        if is_running_aggregate && !visible_rows.is_empty() {
1237            // Optimized path for running aggregates
1238            debug!(
1239                "Using optimized running sum for {} rows",
1240                visible_rows.len()
1241            );
1242
1243            // Group visible rows by partition
1244            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1245            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1246                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1247                    partition_groups
1248                        .entry(partition_key.clone())
1249                        .or_insert_with(Vec::new)
1250                        .push((idx, row_idx));
1251                }
1252            }
1253
1254            let source_table = self.source.source();
1255            let col_idx = source_table
1256                .get_column_index(column_name)
1257                .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1258
1259            // Initialize results with nulls
1260            results.resize(visible_rows.len(), DataValue::Null);
1261
1262            // Process each partition
1263            for (_partition_key, rows) in partition_groups {
1264                if let Some(partition) = self.partitions.get(&_partition_key) {
1265                    let mut running_sum = 0.0;
1266                    let mut has_float = false;
1267                    let mut position_to_sum: HashMap<usize, DataValue> = HashMap::new();
1268
1269                    // Calculate running sum for all positions in this partition
1270                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1271                        if let Some(value) = source_table.get_value(row_idx, col_idx) {
1272                            match value {
1273                                DataValue::Integer(i) => {
1274                                    running_sum += *i as f64;
1275                                }
1276                                DataValue::Float(f) => {
1277                                    running_sum += f;
1278                                    has_float = true;
1279                                }
1280                                DataValue::Null => {
1281                                    // Skip NULL values
1282                                }
1283                                _ => {
1284                                    // Non-numeric value
1285                                }
1286                            }
1287                        }
1288
1289                        // Store the running sum for this position
1290                        if !has_float
1291                            && running_sum.fract() == 0.0
1292                            && running_sum >= i64::MIN as f64
1293                            && running_sum <= i64::MAX as f64
1294                        {
1295                            position_to_sum.insert(pos, DataValue::Integer(running_sum as i64));
1296                        } else {
1297                            position_to_sum.insert(pos, DataValue::Float(running_sum));
1298                        }
1299                    }
1300
1301                    // Fill in results for visible rows in this partition
1302                    for (result_idx, row_idx) in rows {
1303                        if let Some(pos) = partition.get_position(row_idx) {
1304                            results[result_idx] = position_to_sum
1305                                .get(&pos)
1306                                .cloned()
1307                                .unwrap_or(DataValue::Null);
1308                        }
1309                    }
1310                }
1311            }
1312        } else {
1313            // Regular path for non-running aggregates or other frame types
1314            for &row_idx in visible_rows {
1315                let value = if self.has_frame() {
1316                    self.get_frame_sum(row_idx, column_name)
1317                        .unwrap_or(DataValue::Null)
1318                } else {
1319                    self.get_partition_sum(row_idx, column_name)
1320                        .unwrap_or(DataValue::Null)
1321                };
1322                results.push(value);
1323            }
1324        }
1325
1326        debug!(
1327            "evaluate_sum_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1328            visible_rows.len(),
1329            start.elapsed().as_secs_f64() * 1000.0,
1330            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1331        );
1332
1333        Ok(results)
1334    }
1335
1336    /// Batch evaluate AVG window aggregate
1337    pub fn evaluate_avg_batch(
1338        &self,
1339        visible_rows: &[usize],
1340        column_name: &str,
1341    ) -> Result<Vec<DataValue>> {
1342        let start = Instant::now();
1343        let mut results = Vec::with_capacity(visible_rows.len());
1344
1345        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1346        let is_running_aggregate = self.is_running_aggregate_frame();
1347
1348        if is_running_aggregate && !visible_rows.is_empty() {
1349            // Optimized path for running averages
1350            debug!(
1351                "Using optimized running average for {} rows",
1352                visible_rows.len()
1353            );
1354
1355            // Group visible rows by partition
1356            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1357            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1358                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1359                    partition_groups
1360                        .entry(partition_key.clone())
1361                        .or_insert_with(Vec::new)
1362                        .push((idx, row_idx));
1363                }
1364            }
1365
1366            let source_table = self.source.source();
1367            let col_idx = source_table
1368                .get_column_index(column_name)
1369                .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1370
1371            // Initialize results with nulls
1372            results.resize(visible_rows.len(), DataValue::Null);
1373
1374            // Process each partition
1375            for (_partition_key, rows) in partition_groups {
1376                if let Some(partition) = self.partitions.get(&_partition_key) {
1377                    let mut running_sum = 0.0;
1378                    let mut count = 0;
1379                    let mut position_to_avg: HashMap<usize, DataValue> = HashMap::new();
1380
1381                    // Calculate running average for all positions in this partition
1382                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1383                        if let Some(value) = source_table.get_value(row_idx, col_idx) {
1384                            match value {
1385                                DataValue::Integer(i) => {
1386                                    running_sum += *i as f64;
1387                                    count += 1;
1388                                }
1389                                DataValue::Float(f) => {
1390                                    running_sum += f;
1391                                    count += 1;
1392                                }
1393                                DataValue::Null => {
1394                                    // Skip NULL values
1395                                }
1396                                _ => {
1397                                    // Non-numeric value
1398                                }
1399                            }
1400                        }
1401
1402                        // Store the running average for this position
1403                        if count > 0 {
1404                            position_to_avg
1405                                .insert(pos, DataValue::Float(running_sum / count as f64));
1406                        } else {
1407                            position_to_avg.insert(pos, DataValue::Null);
1408                        }
1409                    }
1410
1411                    // Fill in results for visible rows in this partition
1412                    for (result_idx, row_idx) in rows {
1413                        if let Some(pos) = partition.get_position(row_idx) {
1414                            results[result_idx] = position_to_avg
1415                                .get(&pos)
1416                                .cloned()
1417                                .unwrap_or(DataValue::Null);
1418                        }
1419                    }
1420                }
1421            }
1422        } else {
1423            // Regular path for non-running aggregates or other frame types
1424            for &row_idx in visible_rows {
1425                let value = if self.has_frame() {
1426                    self.get_frame_avg(row_idx, column_name)
1427                        .unwrap_or(DataValue::Null)
1428                } else {
1429                    self.get_partition_avg(row_idx, column_name)
1430                        .unwrap_or(DataValue::Null)
1431                };
1432                results.push(value);
1433            }
1434        }
1435
1436        debug!(
1437            "evaluate_avg_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1438            visible_rows.len(),
1439            start.elapsed().as_secs_f64() * 1000.0,
1440            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1441        );
1442
1443        Ok(results)
1444    }
1445
1446    /// Batch evaluate MIN window aggregate
1447    pub fn evaluate_min_batch(
1448        &self,
1449        visible_rows: &[usize],
1450        column_name: &str,
1451    ) -> Result<Vec<DataValue>> {
1452        let start = Instant::now();
1453        let mut results = Vec::with_capacity(visible_rows.len());
1454
1455        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1456        let is_running_aggregate = self.is_running_aggregate_frame();
1457
1458        if is_running_aggregate && !visible_rows.is_empty() {
1459            // Optimized path for running minimum
1460            debug!(
1461                "Using optimized running minimum for {} rows",
1462                visible_rows.len()
1463            );
1464
1465            // Group visible rows by partition
1466            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1467            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1468                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1469                    partition_groups
1470                        .entry(partition_key.clone())
1471                        .or_insert_with(Vec::new)
1472                        .push((idx, row_idx));
1473                }
1474            }
1475
1476            let source_table = self.source.source();
1477            let col_idx = source_table
1478                .get_column_index(column_name)
1479                .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1480
1481            // Initialize results with nulls
1482            results.resize(visible_rows.len(), DataValue::Null);
1483
1484            // Process each partition
1485            for (_partition_key, rows) in partition_groups {
1486                if let Some(partition) = self.partitions.get(&_partition_key) {
1487                    let mut running_min: Option<DataValue> = None;
1488                    let mut position_to_min: HashMap<usize, DataValue> = HashMap::new();
1489
1490                    // Calculate running minimum for all positions in this partition
1491                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1492                        if let Some(value) = source_table.get_value(row_idx, col_idx) {
1493                            if !matches!(value, DataValue::Null) {
1494                                match &running_min {
1495                                    None => running_min = Some(value.clone()),
1496                                    Some(current_min) => {
1497                                        use crate::data::datavalue_compare::compare_datavalues;
1498                                        if compare_datavalues(value, current_min).is_lt() {
1499                                            running_min = Some(value.clone());
1500                                        }
1501                                    }
1502                                }
1503                            }
1504                        }
1505
1506                        // Store the running minimum for this position
1507                        position_to_min.insert(pos, running_min.clone().unwrap_or(DataValue::Null));
1508                    }
1509
1510                    // Fill in results for visible rows in this partition
1511                    for (result_idx, row_idx) in rows {
1512                        if let Some(pos) = partition.get_position(row_idx) {
1513                            results[result_idx] = position_to_min
1514                                .get(&pos)
1515                                .cloned()
1516                                .unwrap_or(DataValue::Null);
1517                        }
1518                    }
1519                }
1520            }
1521        } else {
1522            // Regular path for non-running aggregates or other frame types
1523            for &row_idx in visible_rows {
1524                let value = if self.has_frame() {
1525                    self.get_frame_min(row_idx, column_name)
1526                        .unwrap_or(DataValue::Null)
1527                } else {
1528                    self.get_partition_min(row_idx, column_name)
1529                        .unwrap_or(DataValue::Null)
1530                };
1531                results.push(value);
1532            }
1533        }
1534
1535        debug!(
1536            "evaluate_min_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1537            visible_rows.len(),
1538            start.elapsed().as_secs_f64() * 1000.0,
1539            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1540        );
1541
1542        Ok(results)
1543    }
1544
1545    /// Batch evaluate MAX window aggregate
1546    pub fn evaluate_max_batch(
1547        &self,
1548        visible_rows: &[usize],
1549        column_name: &str,
1550    ) -> Result<Vec<DataValue>> {
1551        let start = Instant::now();
1552        let mut results = Vec::with_capacity(visible_rows.len());
1553
1554        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1555        let is_running_aggregate = self.is_running_aggregate_frame();
1556
1557        if is_running_aggregate && !visible_rows.is_empty() {
1558            // Optimized path for running maximum
1559            debug!(
1560                "Using optimized running maximum for {} rows",
1561                visible_rows.len()
1562            );
1563
1564            // Group visible rows by partition
1565            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1566            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1567                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1568                    partition_groups
1569                        .entry(partition_key.clone())
1570                        .or_insert_with(Vec::new)
1571                        .push((idx, row_idx));
1572                }
1573            }
1574
1575            let source_table = self.source.source();
1576            let col_idx = source_table
1577                .get_column_index(column_name)
1578                .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1579
1580            // Initialize results with nulls
1581            results.resize(visible_rows.len(), DataValue::Null);
1582
1583            // Process each partition
1584            for (_partition_key, rows) in partition_groups {
1585                if let Some(partition) = self.partitions.get(&_partition_key) {
1586                    let mut running_max: Option<DataValue> = None;
1587                    let mut position_to_max: HashMap<usize, DataValue> = HashMap::new();
1588
1589                    // Calculate running maximum for all positions in this partition
1590                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1591                        if let Some(value) = source_table.get_value(row_idx, col_idx) {
1592                            if !matches!(value, DataValue::Null) {
1593                                match &running_max {
1594                                    None => running_max = Some(value.clone()),
1595                                    Some(current_max) => {
1596                                        use crate::data::datavalue_compare::compare_datavalues;
1597                                        if compare_datavalues(value, current_max).is_gt() {
1598                                            running_max = Some(value.clone());
1599                                        }
1600                                    }
1601                                }
1602                            }
1603                        }
1604
1605                        // Store the running maximum for this position
1606                        position_to_max.insert(pos, running_max.clone().unwrap_or(DataValue::Null));
1607                    }
1608
1609                    // Fill in results for visible rows in this partition
1610                    for (result_idx, row_idx) in rows {
1611                        if let Some(pos) = partition.get_position(row_idx) {
1612                            results[result_idx] = position_to_max
1613                                .get(&pos)
1614                                .cloned()
1615                                .unwrap_or(DataValue::Null);
1616                        }
1617                    }
1618                }
1619            }
1620        } else {
1621            // Regular path for non-running aggregates or other frame types
1622            for &row_idx in visible_rows {
1623                let value = if self.has_frame() {
1624                    self.get_frame_max(row_idx, column_name)
1625                        .unwrap_or(DataValue::Null)
1626                } else {
1627                    self.get_partition_max(row_idx, column_name)
1628                        .unwrap_or(DataValue::Null)
1629                };
1630                results.push(value);
1631            }
1632        }
1633
1634        debug!(
1635            "evaluate_max_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1636            visible_rows.len(),
1637            start.elapsed().as_secs_f64() * 1000.0,
1638            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1639        );
1640
1641        Ok(results)
1642    }
1643
1644    /// Batch evaluate COUNT window aggregate
1645    pub fn evaluate_count_batch(
1646        &self,
1647        visible_rows: &[usize],
1648        column_name: Option<&str>,
1649    ) -> Result<Vec<DataValue>> {
1650        let start = Instant::now();
1651        let mut results = Vec::with_capacity(visible_rows.len());
1652
1653        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1654        let is_running_aggregate = self.is_running_aggregate_frame();
1655
1656        if is_running_aggregate && !visible_rows.is_empty() {
1657            // Optimized path for running count
1658            debug!(
1659                "Using optimized running count for {} rows",
1660                visible_rows.len()
1661            );
1662
1663            // Group visible rows by partition
1664            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1665            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1666                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1667                    partition_groups
1668                        .entry(partition_key.clone())
1669                        .or_insert_with(Vec::new)
1670                        .push((idx, row_idx));
1671                }
1672            }
1673
1674            let source_table = self.source.source();
1675            let col_idx = if let Some(col_name) = column_name {
1676                source_table.get_column_index(col_name)
1677            } else {
1678                None
1679            };
1680
1681            // Initialize results with nulls
1682            results.resize(visible_rows.len(), DataValue::Null);
1683
1684            // Process each partition
1685            for (_partition_key, rows) in partition_groups {
1686                if let Some(partition) = self.partitions.get(&_partition_key) {
1687                    let mut running_count = 0i64;
1688                    let mut position_to_count: HashMap<usize, DataValue> = HashMap::new();
1689
1690                    // Calculate running count for all positions in this partition
1691                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1692                        if let Some(col_idx) = col_idx {
1693                            // COUNT(column) - count non-null values
1694                            if let Some(value) = source_table.get_value(row_idx, col_idx) {
1695                                if !matches!(value, DataValue::Null) {
1696                                    running_count += 1;
1697                                }
1698                            }
1699                        } else {
1700                            // COUNT(*) - always increment
1701                            running_count += 1;
1702                        }
1703
1704                        // Store the running count for this position
1705                        position_to_count.insert(pos, DataValue::Integer(running_count));
1706                    }
1707
1708                    // Fill in results for visible rows in this partition
1709                    for (result_idx, row_idx) in rows {
1710                        if let Some(pos) = partition.get_position(row_idx) {
1711                            results[result_idx] = position_to_count
1712                                .get(&pos)
1713                                .cloned()
1714                                .unwrap_or(DataValue::Null);
1715                        }
1716                    }
1717                }
1718            }
1719        } else {
1720            // Regular path for non-running aggregates or other frame types
1721            for &row_idx in visible_rows {
1722                let value = if self.has_frame() {
1723                    self.get_frame_count(row_idx, column_name)
1724                        .unwrap_or(DataValue::Null)
1725                } else {
1726                    self.get_partition_count(row_idx, column_name)
1727                        .unwrap_or(DataValue::Null)
1728                };
1729                results.push(value);
1730            }
1731        }
1732
1733        debug!(
1734            "evaluate_count_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1735            visible_rows.len(),
1736            start.elapsed().as_secs_f64() * 1000.0,
1737            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1738        );
1739
1740        Ok(results)
1741    }
1742
1743    /// Batch evaluate FIRST_VALUE window function
1744    pub fn evaluate_first_value_batch(
1745        &self,
1746        visible_rows: &[usize],
1747        column_name: &str,
1748    ) -> Result<Vec<DataValue>> {
1749        let start = Instant::now();
1750        let mut results = Vec::with_capacity(visible_rows.len());
1751
1752        for &row_idx in visible_rows {
1753            let value = if self.has_frame() {
1754                self.get_frame_first_value(row_idx, column_name)
1755                    .unwrap_or(DataValue::Null)
1756            } else {
1757                self.get_first_value(row_idx, column_name)
1758                    .unwrap_or(DataValue::Null)
1759            };
1760            results.push(value);
1761        }
1762
1763        debug!(
1764            "evaluate_first_value_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1765            visible_rows.len(),
1766            start.elapsed().as_secs_f64() * 1000.0,
1767            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1768        );
1769
1770        Ok(results)
1771    }
1772
1773    /// Batch evaluate LAST_VALUE window function
1774    pub fn evaluate_last_value_batch(
1775        &self,
1776        visible_rows: &[usize],
1777        column_name: &str,
1778    ) -> Result<Vec<DataValue>> {
1779        let start = Instant::now();
1780        let mut results = Vec::with_capacity(visible_rows.len());
1781
1782        for &row_idx in visible_rows {
1783            let value = if self.has_frame() {
1784                self.get_frame_last_value(row_idx, column_name)
1785                    .unwrap_or(DataValue::Null)
1786            } else {
1787                self.get_last_value(row_idx, column_name)
1788                    .unwrap_or(DataValue::Null)
1789            };
1790            results.push(value);
1791        }
1792
1793        debug!(
1794            "evaluate_last_value_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1795            visible_rows.len(),
1796            start.elapsed().as_secs_f64() * 1000.0,
1797            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1798        );
1799
1800        Ok(results)
1801    }
1802}