Skip to main content

sql_cli/sql/
window_context.rs

1//! Window function context for managing partitioned and ordered data views
2//!
3//! This module provides the WindowContext helper class that enables window functions
4//! like LAG, LEAD, ROW_NUMBER, etc. by managing partitions and ordering.
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8use std::time::Instant;
9
10use anyhow::{anyhow, Result};
11use tracing::{debug, info};
12
13use crate::data::data_view::DataView;
14use crate::data::datatable::{DataTable, DataValue};
15use crate::sql::parser::ast::{
16    FrameBound, FrameUnit, OrderByItem, SortDirection, SqlExpression, WindowSpec,
17};
18
19/// Key for identifying a partition (combination of partition column values)
20/// We use String representation for now since DataValue doesn't impl Ord
21#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
22struct PartitionKey(String);
23
24impl PartitionKey {
25    /// Create a partition key from data values
26    fn from_values(values: Vec<DataValue>) -> Self {
27        // Create a unique string representation
28        let key_parts: Vec<String> = values
29            .iter()
30            .map(|v| match v {
31                DataValue::String(s) => format!("S:{}", s),
32                DataValue::InternedString(s) => format!("S:{}", s),
33                DataValue::Integer(i) => format!("I:{}", i),
34                DataValue::Float(f) => format!("F:{}", f),
35                DataValue::Boolean(b) => format!("B:{}", b),
36                DataValue::DateTime(dt) => format!("D:{}", dt),
37                DataValue::Vector(v) => {
38                    let components: Vec<String> = v.iter().map(|f| f.to_string()).collect();
39                    format!("V:[{}]", components.join(","))
40                }
41                DataValue::Null => "N".to_string(),
42            })
43            .collect();
44        let key = key_parts.join("|");
45        PartitionKey(key)
46    }
47}
48
/// A single partition: the row indices that belong to it, in window order.
#[derive(Debug, Clone)]
pub struct OrderedPartition {
    /// Original row indices from the DataView, in sorted order
    rows: Vec<usize>,

    /// Reverse lookup: row index -> 0-based position inside `rows`
    row_positions: HashMap<usize, usize>,
}

impl OrderedPartition {
    /// Create a partition from row indices that are already in their final order.
    fn new(rows: Vec<usize>) -> Self {
        let row_positions: HashMap<usize, usize> = rows
            .iter()
            .enumerate()
            .map(|(pos, &row_idx)| (row_idx, pos))
            .collect();

        Self {
            rows,
            row_positions,
        }
    }

    /// Return the row index located `offset` positions away from `current_row`.
    ///
    /// A negative offset looks backwards, a positive one forwards. Returns `None`
    /// when `current_row` is not part of this partition or when the target position
    /// falls outside of it.
    pub fn get_row_at_offset(&self, current_row: usize, offset: i32) -> Option<usize> {
        let current_pos = *self.row_positions.get(&current_row)?;

        // Work in i64 with a checked add: i32 arithmetic overflows for extreme
        // offsets and truncates positions/lengths beyond i32::MAX.
        let target_pos = (current_pos as i64).checked_add(i64::from(offset))?;
        let partition_len = self.rows.len() as i64;

        if (0..partition_len).contains(&target_pos) {
            Some(self.rows[target_pos as usize])
        } else {
            None
        }
    }

    /// Get the 0-based position of `row_index` inside this partition, if present.
    pub fn get_position(&self, row_index: usize) -> Option<usize> {
        self.row_positions.get(&row_index).copied()
    }

    /// Get the first row index in this partition (`None` when the partition is empty).
    pub fn first_row(&self) -> Option<usize> {
        self.rows.first().copied()
    }

    /// Get the last row index in this partition (`None` when the partition is empty).
    pub fn last_row(&self) -> Option<usize> {
        self.rows.last().copied()
    }
}
102
103/// Context for evaluating window functions
104pub struct WindowContext {
105    /// Source data view
106    source: Arc<DataView>,
107
108    /// Partitions with their ordered rows
109    partitions: BTreeMap<PartitionKey, OrderedPartition>,
110
111    /// Mapping from row index to its partition key
112    row_to_partition: HashMap<usize, PartitionKey>,
113
114    /// Window specification
115    spec: WindowSpec,
116}
117
118impl WindowContext {
119    /// Create a new window context with partitioning and ordering
120    pub fn new(
121        view: Arc<DataView>,
122        partition_by: Vec<String>,
123        order_by: Vec<OrderByItem>,
124    ) -> Result<Self> {
125        Self::new_with_spec(
126            view,
127            WindowSpec {
128                partition_by,
129                order_by,
130                frame: None,
131            },
132        )
133    }
134
135    /// Create a new window context with a full window specification
136    pub fn new_with_spec(view: Arc<DataView>, spec: WindowSpec) -> Result<Self> {
137        let overall_start = Instant::now();
138        let partition_by = spec.partition_by.clone();
139        let order_by = spec.order_by.clone();
140        let row_count = view.row_count();
141
142        // If no partition columns, treat entire view as single partition
143        if partition_by.is_empty() {
144            info!(
145                "Creating single partition (no PARTITION BY) for {} rows",
146                row_count
147            );
148            let partition_start = Instant::now();
149
150            let single_partition = Self::create_single_partition(&view, &order_by)?;
151            let partition_key = PartitionKey::from_values(vec![]);
152
153            // Build row-to-partition mapping
154            let mut row_to_partition = HashMap::new();
155            for &row_idx in &single_partition.rows {
156                row_to_partition.insert(row_idx, partition_key.clone());
157            }
158
159            let mut partitions = BTreeMap::new();
160            partitions.insert(partition_key, single_partition);
161
162            info!(
163                "Single partition created in {:.2}ms (1 partition, {} rows)",
164                partition_start.elapsed().as_secs_f64() * 1000.0,
165                row_count
166            );
167
168            info!(
169                "WindowContext::new_with_spec (single partition) took {:.2}ms total",
170                overall_start.elapsed().as_secs_f64() * 1000.0
171            );
172
173            return Ok(Self {
174                source: view,
175                partitions,
176                row_to_partition,
177                spec,
178            });
179        }
180
181        // Create partitions based on partition_by columns
182        info!(
183            "Creating partitions with PARTITION BY for {} rows",
184            row_count
185        );
186        let partition_start = Instant::now();
187
188        let mut partition_map: BTreeMap<PartitionKey, Vec<usize>> = BTreeMap::new();
189        let mut row_to_partition = HashMap::new();
190
191        // Get column indices for partition columns
192        let source_table = view.source();
193        let partition_col_indices: Vec<usize> = partition_by
194            .iter()
195            .map(|col| {
196                // Try simple name first, then qualified name, then flexible lookup
197                source_table
198                    .get_column_index(col)
199                    .or_else(|| source_table.find_column_by_qualified_name(col))
200                    .or_else(|| {
201                        // Handle "table.column" by splitting and trying flexible lookup
202                        if let Some(dot_pos) = col.find('.') {
203                            let table = &col[..dot_pos];
204                            let column = &col[dot_pos + 1..];
205                            source_table.find_column_flexible(column, Some(table))
206                        } else {
207                            None
208                        }
209                    })
210                    .ok_or_else(|| anyhow!("Invalid partition column: {}", col))
211            })
212            .collect::<Result<Vec<_>>>()?;
213
214        // Group rows by partition key
215        let grouping_start = Instant::now();
216        for row_idx in view.get_visible_rows() {
217            // Build partition key from row values
218            let mut key_values = Vec::new();
219            for &col_idx in &partition_col_indices {
220                let value = source_table
221                    .get_value(row_idx, col_idx)
222                    .ok_or_else(|| anyhow!("Failed to get value for partition"))?
223                    .clone();
224                key_values.push(value);
225            }
226            let key = PartitionKey::from_values(key_values);
227
228            // Add row to partition
229            partition_map.entry(key.clone()).or_default().push(row_idx);
230            row_to_partition.insert(row_idx, key);
231        }
232
233        info!(
234            "Partition grouping took {:.2}ms ({} partitions created)",
235            grouping_start.elapsed().as_secs_f64() * 1000.0,
236            partition_map.len()
237        );
238
239        // Sort each partition according to ORDER BY
240        let sort_start = Instant::now();
241        let mut partitions = BTreeMap::new();
242        let partition_count = partition_map.len();
243
244        for (key, mut rows) in partition_map {
245            // Sort rows within partition
246            if !order_by.is_empty() {
247                Self::sort_rows(&mut rows, source_table, &order_by)?;
248            }
249
250            partitions.insert(key, OrderedPartition::new(rows));
251        }
252
253        info!(
254            "Partition sorting took {:.2}ms ({} partitions, ORDER BY: {})",
255            sort_start.elapsed().as_secs_f64() * 1000.0,
256            partition_count,
257            !order_by.is_empty()
258        );
259
260        info!(
261            "Total partition creation took {:.2}ms",
262            partition_start.elapsed().as_secs_f64() * 1000.0
263        );
264
265        info!(
266            "WindowContext::new_with_spec (multi-partition) took {:.2}ms total",
267            overall_start.elapsed().as_secs_f64() * 1000.0
268        );
269
270        Ok(Self {
271            source: view,
272            partitions,
273            row_to_partition,
274            spec,
275        })
276    }
277
278    /// Create a single partition from the entire view
279    fn create_single_partition(
280        view: &DataView,
281        order_by: &[OrderByItem],
282    ) -> Result<OrderedPartition> {
283        let mut rows: Vec<usize> = view.get_visible_rows();
284
285        if !order_by.is_empty() {
286            let sort_start = Instant::now();
287            Self::sort_rows(&mut rows, view.source(), order_by)?;
288            debug!(
289                "Single partition sort took {:.2}ms ({} rows)",
290                sort_start.elapsed().as_secs_f64() * 1000.0,
291                rows.len()
292            );
293        }
294
295        Ok(OrderedPartition::new(rows))
296    }
297
298    /// Sort row indices according to ORDER BY specification
299    fn sort_rows(rows: &mut Vec<usize>, table: &DataTable, order_by: &[OrderByItem]) -> Result<()> {
300        let prep_start = Instant::now();
301
302        // Get column indices for ORDER BY columns
303        let sort_cols: Vec<(usize, bool)> = order_by
304            .iter()
305            .map(|col| {
306                // Extract column name from expression (currently only supports simple columns)
307                let column_name = match &col.expr {
308                    SqlExpression::Column(col_ref) => &col_ref.name,
309                    _ => {
310                        return Err(anyhow!("Window function ORDER BY only supports simple columns, not expressions"));
311                    }
312                };
313                let idx = table
314                    .get_column_index(column_name)
315                    .ok_or_else(|| anyhow!("Invalid ORDER BY column: {}", column_name))?;
316                let ascending = matches!(col.direction, SortDirection::Asc);
317                Ok((idx, ascending))
318            })
319            .collect::<Result<Vec<_>>>()?;
320
321        debug!(
322            "Sort preparation took {:.2}μs ({} sort columns)",
323            prep_start.elapsed().as_micros(),
324            sort_cols.len()
325        );
326
327        let sort_start = Instant::now();
328
329        // Sort rows based on column values
330        rows.sort_by(|&a, &b| {
331            for &(col_idx, ascending) in &sort_cols {
332                let val_a = table.get_value(a, col_idx);
333                let val_b = table.get_value(b, col_idx);
334
335                match (val_a, val_b) {
336                    (None, None) => continue,
337                    (None, Some(_)) => {
338                        return if ascending {
339                            std::cmp::Ordering::Less
340                        } else {
341                            std::cmp::Ordering::Greater
342                        }
343                    }
344                    (Some(_), None) => {
345                        return if ascending {
346                            std::cmp::Ordering::Greater
347                        } else {
348                            std::cmp::Ordering::Less
349                        }
350                    }
351                    (Some(v_a), Some(v_b)) => {
352                        // DataValue only implements PartialOrd, not Ord
353                        let ord = v_a.partial_cmp(&v_b).unwrap_or(std::cmp::Ordering::Equal);
354                        if ord != std::cmp::Ordering::Equal {
355                            return if ascending { ord } else { ord.reverse() };
356                        }
357                    }
358                }
359            }
360            std::cmp::Ordering::Equal
361        });
362
363        debug!(
364            "Actual sort operation took {:.2}μs ({} rows)",
365            sort_start.elapsed().as_micros(),
366            rows.len()
367        );
368
369        Ok(())
370    }
371
372    /// Get value at offset from current row (for LAG/LEAD)
373    pub fn get_offset_value(
374        &self,
375        current_row: usize,
376        offset: i32,
377        column: &str,
378    ) -> Option<DataValue> {
379        // Note: This method is called once per row, so we use trace-level logging
380        // to avoid overwhelming the debug output
381        let start = Instant::now();
382
383        // Find which partition this row belongs to
384        let partition_lookup_start = Instant::now();
385        let partition_key = self.row_to_partition.get(&current_row)?;
386        let partition = self.partitions.get(partition_key)?;
387        let partition_lookup_time = partition_lookup_start.elapsed();
388
389        // Navigate to target row
390        let offset_nav_start = Instant::now();
391        let target_row = partition.get_row_at_offset(current_row, offset)?;
392        let offset_nav_time = offset_nav_start.elapsed();
393
394        // Get column value from target row
395        let value_access_start = Instant::now();
396        let source_table = self.source.source();
397        let col_idx = source_table.get_column_index(column)?;
398        let value = source_table.get_value(target_row, col_idx).cloned();
399        let value_access_time = value_access_start.elapsed();
400
401        // Only log if this takes more than 10 microseconds (to avoid spam)
402        let total_time = start.elapsed();
403        if total_time.as_micros() > 10 {
404            debug!(
405                "get_offset_value slow: total={:.2}μs, partition_lookup={:.2}μs, offset_nav={:.2}μs, value_access={:.2}μs",
406                total_time.as_micros(),
407                partition_lookup_time.as_micros(),
408                offset_nav_time.as_micros(),
409                value_access_time.as_micros()
410            );
411        }
412
413        value
414    }
415
416    /// Get row number within partition (1-based)
417    pub fn get_row_number(&self, row_index: usize) -> usize {
418        if let Some(partition_key) = self.row_to_partition.get(&row_index) {
419            if let Some(partition) = self.partitions.get(partition_key) {
420                if let Some(position) = partition.get_position(row_index) {
421                    return position + 1; // Convert to 1-based
422                }
423            }
424        }
425        0 // Should not happen for valid row
426    }
427
428    /// Get first value in frame
429    pub fn get_frame_first_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
430        let frame_rows = self.get_frame_rows(row_index);
431        if frame_rows.is_empty() {
432            return Some(DataValue::Null);
433        }
434
435        let source_table = self.source.source();
436        let col_idx = source_table.get_column_index(column)?;
437
438        // Get the first row in the frame
439        let first_row = frame_rows[0];
440        source_table.get_value(first_row, col_idx).cloned()
441    }
442
443    /// Get last value in frame
444    pub fn get_frame_last_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
445        let frame_rows = self.get_frame_rows(row_index);
446        if frame_rows.is_empty() {
447            return Some(DataValue::Null);
448        }
449
450        let source_table = self.source.source();
451        let col_idx = source_table.get_column_index(column)?;
452
453        // Get the last row in the frame
454        let last_row = frame_rows[frame_rows.len() - 1];
455        source_table.get_value(last_row, col_idx).cloned()
456    }
457
458    /// Get first value in partition
459    pub fn get_first_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
460        let partition_key = self.row_to_partition.get(&row_index)?;
461        let partition = self.partitions.get(partition_key)?;
462        let first_row = partition.first_row()?;
463
464        let source_table = self.source.source();
465        let col_idx = source_table.get_column_index(column)?;
466        source_table.get_value(first_row, col_idx).cloned()
467    }
468
469    /// Get last value in partition
470    pub fn get_last_value(&self, row_index: usize, column: &str) -> Option<DataValue> {
471        let partition_key = self.row_to_partition.get(&row_index)?;
472        let partition = self.partitions.get(partition_key)?;
473        let last_row = partition.last_row()?;
474
475        let source_table = self.source.source();
476        let col_idx = source_table.get_column_index(column)?;
477        source_table.get_value(last_row, col_idx).cloned()
478    }
479
480    /// Get the number of partitions
481    pub fn partition_count(&self) -> usize {
482        self.partitions.len()
483    }
484
485    /// Check if context has partitions (vs single window)
486    pub fn has_partitions(&self) -> bool {
487        !self.spec.partition_by.is_empty()
488    }
489
490    /// Check if context has a window frame specification
491    pub fn has_frame(&self) -> bool {
492        self.spec.frame.is_some()
493    }
494
495    /// Check if this is a running aggregate frame (UNBOUNDED PRECEDING to CURRENT ROW)
496    pub fn is_running_aggregate_frame(&self) -> bool {
497        if let Some(frame) = &self.spec.frame {
498            matches!(frame.start, FrameBound::UnboundedPreceding) && frame.end.is_none()
499        // None means CURRENT ROW
500        } else {
501            false
502        }
503    }
504
505    /// Get the source DataView
506    pub fn source(&self) -> &DataTable {
507        self.source.source()
508    }
509
510    /// Get row indices within the window frame for a given row
511    pub fn get_frame_rows(&self, row_index: usize) -> Vec<usize> {
512        // Find which partition this row belongs to
513        let partition_key = match self.row_to_partition.get(&row_index) {
514            Some(key) => key,
515            None => return vec![],
516        };
517
518        let partition = match self.partitions.get(partition_key) {
519            Some(p) => p,
520            None => return vec![],
521        };
522
523        // Get current row's position in partition
524        let current_pos = match partition.get_position(row_index) {
525            Some(pos) => pos as i64,
526            None => return vec![],
527        };
528
529        // If no frame specified, return entire partition (default behavior)
530        let frame = match &self.spec.frame {
531            Some(f) => f,
532            None => return partition.rows.clone(),
533        };
534
535        // Calculate frame bounds
536        let (start_pos, end_pos) = match frame.unit {
537            FrameUnit::Rows => {
538                // ROWS frame - based on physical row positions
539                let start =
540                    self.calculate_frame_position(&frame.start, current_pos, partition.rows.len());
541                let end = match &frame.end {
542                    Some(bound) => {
543                        self.calculate_frame_position(bound, current_pos, partition.rows.len())
544                    }
545                    None => current_pos, // Default to CURRENT ROW
546                };
547                (start, end)
548            }
549            FrameUnit::Range => {
550                // RANGE frame - based on ORDER BY values (not yet fully implemented)
551                // For now, treat like ROWS
552                let start =
553                    self.calculate_frame_position(&frame.start, current_pos, partition.rows.len());
554                let end = match &frame.end {
555                    Some(bound) => {
556                        self.calculate_frame_position(bound, current_pos, partition.rows.len())
557                    }
558                    None => current_pos,
559                };
560                (start, end)
561            }
562        };
563
564        // Collect rows within frame bounds
565        let mut frame_rows = Vec::new();
566        for i in start_pos..=end_pos {
567            if i >= 0 && (i as usize) < partition.rows.len() {
568                frame_rows.push(partition.rows[i as usize]);
569            }
570        }
571
572        frame_rows
573    }
574
575    /// Calculate absolute position from frame bound
576    fn calculate_frame_position(
577        &self,
578        bound: &FrameBound,
579        current_pos: i64,
580        partition_size: usize,
581    ) -> i64 {
582        match bound {
583            FrameBound::UnboundedPreceding => 0,
584            FrameBound::UnboundedFollowing => partition_size as i64 - 1,
585            FrameBound::CurrentRow => current_pos,
586            FrameBound::Preceding(n) => current_pos - n,
587            FrameBound::Following(n) => current_pos + n,
588        }
589    }
590
591    /// Calculate sum of a column within the window frame for the given row
592    pub fn get_frame_sum(&self, row_index: usize, column: &str) -> Option<DataValue> {
593        let frame_rows = self.get_frame_rows(row_index);
594        if frame_rows.is_empty() {
595            return Some(DataValue::Null);
596        }
597
598        let source_table = self.source.source();
599        let col_idx = source_table.get_column_index(column)?;
600
601        let mut sum = 0.0;
602        let mut has_float = false;
603        let mut has_value = false;
604
605        // Sum all values in the frame
606        for &row_idx in &frame_rows {
607            if let Some(value) = source_table.get_value(row_idx, col_idx) {
608                match value {
609                    DataValue::Integer(i) => {
610                        sum += *i as f64;
611                        has_value = true;
612                    }
613                    DataValue::Float(f) => {
614                        sum += f;
615                        has_float = true;
616                        has_value = true;
617                    }
618                    DataValue::Null => {
619                        // Skip NULL values
620                    }
621                    _ => {
622                        // Non-numeric values - return NULL
623                        return Some(DataValue::Null);
624                    }
625                }
626            }
627        }
628
629        if !has_value {
630            return Some(DataValue::Null);
631        }
632
633        // Return as integer if all values were integers and sum is whole
634        if !has_float && sum.fract() == 0.0 && sum >= i64::MIN as f64 && sum <= i64::MAX as f64 {
635            Some(DataValue::Integer(sum as i64))
636        } else {
637            Some(DataValue::Float(sum))
638        }
639    }
640
641    /// Calculate count within the window frame
642    pub fn get_frame_count(&self, row_index: usize, column: Option<&str>) -> Option<DataValue> {
643        let frame_rows = self.get_frame_rows(row_index);
644        if frame_rows.is_empty() {
645            return Some(DataValue::Integer(0));
646        }
647
648        if let Some(col_name) = column {
649            // COUNT(column) - count non-null values in frame
650            let source_table = self.source.source();
651            let col_idx = source_table.get_column_index(col_name)?;
652
653            let count = frame_rows
654                .iter()
655                .filter_map(|&row_idx| source_table.get_value(row_idx, col_idx))
656                .filter(|v| !matches!(v, DataValue::Null))
657                .count();
658
659            Some(DataValue::Integer(count as i64))
660        } else {
661            // COUNT(*) - count all rows in frame
662            Some(DataValue::Integer(frame_rows.len() as i64))
663        }
664    }
665
666    /// Calculate average of a column within the window frame
667    pub fn get_frame_avg(&self, row_index: usize, column: &str) -> Option<DataValue> {
668        let frame_rows = self.get_frame_rows(row_index);
669        if frame_rows.is_empty() {
670            return Some(DataValue::Null);
671        }
672
673        let source_table = self.source.source();
674        let col_idx = source_table.get_column_index(column)?;
675
676        let mut sum = 0.0;
677        let mut count = 0;
678
679        // Sum all non-null values in the frame
680        for &row_idx in &frame_rows {
681            if let Some(value) = source_table.get_value(row_idx, col_idx) {
682                match value {
683                    DataValue::Integer(i) => {
684                        sum += *i as f64;
685                        count += 1;
686                    }
687                    DataValue::Float(f) => {
688                        sum += f;
689                        count += 1;
690                    }
691                    DataValue::Null => {
692                        // Skip NULL values
693                    }
694                    _ => {
695                        // Non-numeric values - return NULL
696                        return Some(DataValue::Null);
697                    }
698                }
699            }
700        }
701
702        if count == 0 {
703            return Some(DataValue::Null);
704        }
705
706        Some(DataValue::Float(sum / count as f64))
707    }
708
709    /// Calculate standard deviation within the window frame (sample stddev)
710    pub fn get_frame_stddev(&self, row_index: usize, column: &str) -> Option<DataValue> {
711        let variance = self.get_frame_variance(row_index, column)?;
712        match variance {
713            DataValue::Float(v) => Some(DataValue::Float(v.sqrt())),
714            DataValue::Null => Some(DataValue::Null),
715            _ => Some(DataValue::Null),
716        }
717    }
718
719    /// Calculate variance within the window frame (sample variance with n-1)
720    pub fn get_frame_variance(&self, row_index: usize, column: &str) -> Option<DataValue> {
721        let frame_rows = self.get_frame_rows(row_index);
722        if frame_rows.is_empty() {
723            return Some(DataValue::Null);
724        }
725
726        let source_table = self.source.source();
727        let col_idx = source_table.get_column_index(column)?;
728
729        let mut values = Vec::new();
730
731        // Collect all non-null values in the frame
732        for &row_idx in &frame_rows {
733            if let Some(value) = source_table.get_value(row_idx, col_idx) {
734                match value {
735                    DataValue::Integer(i) => values.push(*i as f64),
736                    DataValue::Float(f) => values.push(*f),
737                    DataValue::Null => {
738                        // Skip NULL values
739                    }
740                    _ => {
741                        // Non-numeric values - return NULL
742                        return Some(DataValue::Null);
743                    }
744                }
745            }
746        }
747
748        if values.is_empty() {
749            return Some(DataValue::Null);
750        }
751
752        if values.len() == 1 {
753            // Variance of single value is 0
754            return Some(DataValue::Float(0.0));
755        }
756
757        // Calculate mean
758        let mean = values.iter().sum::<f64>() / values.len() as f64;
759
760        // Calculate sample variance (n-1 denominator)
761        let variance =
762            values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (values.len() - 1) as f64;
763
764        Some(DataValue::Float(variance))
765    }
766
767    /// Calculate sum of a column over the partition containing the given row
768    pub fn get_partition_sum(&self, row_index: usize, column: &str) -> Option<DataValue> {
769        let partition_key = self.row_to_partition.get(&row_index)?;
770        let partition = self.partitions.get(partition_key)?;
771        let source_table = self.source.source();
772        let col_idx = source_table.get_column_index(column)?;
773
774        let mut sum = 0.0;
775        let mut has_float = false;
776        let mut has_value = false;
777
778        // Sum all values in the partition
779        for &row_idx in &partition.rows {
780            if let Some(value) = source_table.get_value(row_idx, col_idx) {
781                match value {
782                    DataValue::Integer(i) => {
783                        sum += *i as f64;
784                        has_value = true;
785                    }
786                    DataValue::Float(f) => {
787                        sum += f;
788                        has_float = true;
789                        has_value = true;
790                    }
791                    DataValue::Null => {
792                        // Skip NULL values
793                    }
794                    _ => {
795                        // Non-numeric values - return NULL
796                        return Some(DataValue::Null);
797                    }
798                }
799            }
800        }
801
802        if !has_value {
803            return Some(DataValue::Null);
804        }
805
806        // Return as integer if all values were integers and sum is whole
807        if !has_float && sum.fract() == 0.0 && sum >= i64::MIN as f64 && sum <= i64::MAX as f64 {
808            Some(DataValue::Integer(sum as i64))
809        } else {
810            Some(DataValue::Float(sum))
811        }
812    }
813
814    /// Calculate count of non-null values in a column over the partition
815    pub fn get_partition_count(&self, row_index: usize, column: Option<&str>) -> Option<DataValue> {
816        let partition_key = self.row_to_partition.get(&row_index)?;
817        let partition = self.partitions.get(partition_key)?;
818
819        if let Some(col_name) = column {
820            // COUNT(column) - count non-null values
821            let source_table = self.source.source();
822            let col_idx = source_table.get_column_index(col_name)?;
823
824            let count = partition
825                .rows
826                .iter()
827                .filter_map(|&row_idx| source_table.get_value(row_idx, col_idx))
828                .filter(|v| !matches!(v, DataValue::Null))
829                .count();
830
831            Some(DataValue::Integer(count as i64))
832        } else {
833            // COUNT(*) - count all rows in partition
834            Some(DataValue::Integer(partition.rows.len() as i64))
835        }
836    }
837
838    /// Calculate average of a column over the partition containing the given row
839    pub fn get_partition_avg(&self, row_index: usize, column: &str) -> Option<DataValue> {
840        let partition_key = self.row_to_partition.get(&row_index)?;
841        let partition = self.partitions.get(partition_key)?;
842        let source_table = self.source.source();
843        let col_idx = source_table.get_column_index(column)?;
844
845        let mut sum = 0.0;
846        let mut count = 0;
847
848        // Sum all values in the partition
849        for &row_idx in &partition.rows {
850            if let Some(value) = source_table.get_value(row_idx, col_idx) {
851                match value {
852                    DataValue::Integer(i) => {
853                        sum += *i as f64;
854                        count += 1;
855                    }
856                    DataValue::Float(f) => {
857                        sum += f;
858                        count += 1;
859                    }
860                    DataValue::Null => {
861                        // Skip NULL values
862                    }
863                    _ => {
864                        // Non-numeric values - return NULL
865                        return Some(DataValue::Null);
866                    }
867                }
868            }
869        }
870
871        if count == 0 {
872            Some(DataValue::Null)
873        } else {
874            Some(DataValue::Float(sum / count as f64))
875        }
876    }
877
878    /// Calculate minimum value of a column over the partition containing the given row
879    pub fn get_partition_min(&self, row_index: usize, column: &str) -> Option<DataValue> {
880        let partition_key = self.row_to_partition.get(&row_index)?;
881        let partition = self.partitions.get(partition_key)?;
882        let source_table = self.source.source();
883        let col_idx = source_table.get_column_index(column)?;
884
885        let mut min_value: Option<DataValue> = None;
886
887        for &row_idx in &partition.rows {
888            if let Some(value) = source_table.get_value(row_idx, col_idx) {
889                if !matches!(value, DataValue::Null) {
890                    match &min_value {
891                        None => min_value = Some(value.clone()),
892                        Some(current_min) => {
893                            use crate::data::datavalue_compare::compare_datavalues;
894                            if compare_datavalues(value, current_min).is_lt() {
895                                min_value = Some(value.clone());
896                            }
897                        }
898                    }
899                }
900            }
901        }
902
903        min_value.or(Some(DataValue::Null))
904    }
905
906    /// Calculate maximum value of a column over the partition containing the given row
907    pub fn get_partition_max(&self, row_index: usize, column: &str) -> Option<DataValue> {
908        let partition_key = self.row_to_partition.get(&row_index)?;
909        let partition = self.partitions.get(partition_key)?;
910        let source_table = self.source.source();
911        let col_idx = source_table.get_column_index(column)?;
912
913        let mut max_value: Option<DataValue> = None;
914
915        for &row_idx in &partition.rows {
916            if let Some(value) = source_table.get_value(row_idx, col_idx) {
917                if !matches!(value, DataValue::Null) {
918                    match &max_value {
919                        None => max_value = Some(value.clone()),
920                        Some(current_max) => {
921                            use crate::data::datavalue_compare::compare_datavalues;
922                            if compare_datavalues(value, current_max).is_gt() {
923                                max_value = Some(value.clone());
924                            }
925                        }
926                    }
927                }
928            }
929        }
930
931        max_value.or(Some(DataValue::Null))
932    }
933
934    /// Get all row indices in the partition containing the given row
935    pub fn get_partition_rows(&self, row_index: usize) -> Vec<usize> {
936        if let Some(partition_key) = self.row_to_partition.get(&row_index) {
937            if let Some(partition) = self.partitions.get(partition_key) {
938                return partition.rows.clone();
939            }
940        }
941        vec![]
942    }
943
944    /// Calculate minimum value within the frame for a given row
945    pub fn get_frame_min(&self, row_index: usize, column: &str) -> Option<DataValue> {
946        let frame_rows = self.get_frame_rows(row_index);
947        let source_table = self.source.source();
948        let col_idx = source_table.get_column_index(column)?;
949
950        let mut min_value: Option<DataValue> = None;
951
952        for &frame_row_idx in &frame_rows {
953            if let Some(value) = source_table.get_value(frame_row_idx, col_idx) {
954                if !matches!(value, DataValue::Null) {
955                    match &min_value {
956                        None => min_value = Some(value.clone()),
957                        Some(current_min) => {
958                            use crate::data::datavalue_compare::compare_datavalues;
959                            if compare_datavalues(value, current_min).is_lt() {
960                                min_value = Some(value.clone());
961                            }
962                        }
963                    }
964                }
965            }
966        }
967
968        min_value.or(Some(DataValue::Null))
969    }
970
971    /// Calculate maximum value within the frame for a given row
972    pub fn get_frame_max(&self, row_index: usize, column: &str) -> Option<DataValue> {
973        let frame_rows = self.get_frame_rows(row_index);
974        let source_table = self.source.source();
975        let col_idx = source_table.get_column_index(column)?;
976
977        let mut max_value: Option<DataValue> = None;
978
979        for &frame_row_idx in &frame_rows {
980            if let Some(value) = source_table.get_value(frame_row_idx, col_idx) {
981                if !matches!(value, DataValue::Null) {
982                    match &max_value {
983                        None => max_value = Some(value.clone()),
984                        Some(current_max) => {
985                            use crate::data::datavalue_compare::compare_datavalues;
986                            if compare_datavalues(value, current_max).is_gt() {
987                                max_value = Some(value.clone());
988                            }
989                        }
990                    }
991                }
992            }
993        }
994
995        max_value.or(Some(DataValue::Null))
996    }
997
998    /// Batch evaluate LAG function for multiple rows
999    /// This is significantly faster than calling get_offset_value for each row
1000    pub fn evaluate_lag_batch(
1001        &self,
1002        visible_rows: &[usize],
1003        column_name: &str,
1004        offset: i64,
1005    ) -> Result<Vec<DataValue>> {
1006        let start = Instant::now();
1007        let mut results = Vec::with_capacity(visible_rows.len());
1008
1009        // Validate column exists
1010        let source_table = self.source.source();
1011        let _ = source_table
1012            .get_column_index(column_name)
1013            .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1014
1015        for &row_idx in visible_rows {
1016            // LAG uses negative offset internally
1017            let value =
1018                if let Some(val) = self.get_offset_value(row_idx, -(offset as i32), column_name) {
1019                    val
1020                } else {
1021                    DataValue::Null
1022                };
1023            results.push(value);
1024        }
1025
1026        debug!(
1027            "evaluate_lag_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1028            visible_rows.len(),
1029            start.elapsed().as_secs_f64() * 1000.0,
1030            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1031        );
1032
1033        Ok(results)
1034    }
1035
1036    /// Batch evaluate LEAD function for multiple rows
1037    pub fn evaluate_lead_batch(
1038        &self,
1039        visible_rows: &[usize],
1040        column_name: &str,
1041        offset: i64,
1042    ) -> Result<Vec<DataValue>> {
1043        let start = Instant::now();
1044        let mut results = Vec::with_capacity(visible_rows.len());
1045
1046        // Validate column exists
1047        let source_table = self.source.source();
1048        let _ = source_table
1049            .get_column_index(column_name)
1050            .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1051
1052        for &row_idx in visible_rows {
1053            // LEAD uses positive offset
1054            let value =
1055                if let Some(val) = self.get_offset_value(row_idx, offset as i32, column_name) {
1056                    val
1057                } else {
1058                    DataValue::Null
1059                };
1060            results.push(value);
1061        }
1062
1063        debug!(
1064            "evaluate_lead_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1065            visible_rows.len(),
1066            start.elapsed().as_secs_f64() * 1000.0,
1067            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1068        );
1069
1070        Ok(results)
1071    }
1072
1073    /// Batch evaluate ROW_NUMBER function for multiple rows
1074    pub fn evaluate_row_number_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1075        let start = Instant::now();
1076        let mut results = Vec::with_capacity(visible_rows.len());
1077
1078        for &row_idx in visible_rows {
1079            let row_num = self.get_row_number(row_idx);
1080            if row_num > 0 {
1081                results.push(DataValue::Integer(row_num as i64));
1082            } else {
1083                // This shouldn't happen if WindowContext is properly constructed
1084                results.push(DataValue::Null);
1085            }
1086        }
1087
1088        debug!(
1089            "evaluate_row_number_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1090            visible_rows.len(),
1091            start.elapsed().as_secs_f64() * 1000.0,
1092            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1093        );
1094
1095        Ok(results)
1096    }
1097
1098    /// Get rank of a row within its partition
1099    /// Ties get the same rank, and next rank(s) are skipped
1100    pub fn get_rank(&self, row_index: usize) -> i64 {
1101        if let Some(partition_key) = self.row_to_partition.get(&row_index) {
1102            if let Some(partition) = self.partitions.get(partition_key) {
1103                // Get the value at this row for comparison
1104                if let Some(position) = partition.get_position(row_index) {
1105                    let mut rows_before = 0;
1106
1107                    // Compare with all previous rows in the partition
1108                    for i in 0..position {
1109                        let prev_row = partition.rows[i];
1110                        if self.compare_rows_for_rank(prev_row, row_index) < 0 {
1111                            rows_before += 1;
1112                        }
1113                    }
1114
1115                    let rank = rows_before + 1;
1116                    return rank;
1117                }
1118            }
1119        }
1120        1 // Default if not found
1121    }
1122
1123    /// Get dense rank of a row within its partition
1124    /// Ties get the same rank, but ranks are not skipped
1125    pub fn get_dense_rank(&self, row_index: usize) -> i64 {
1126        if let Some(partition_key) = self.row_to_partition.get(&row_index) {
1127            if let Some(partition) = self.partitions.get(partition_key) {
1128                if let Some(position) = partition.get_position(row_index) {
1129                    let mut dense_rank = 1;
1130                    let mut last_value_seen = None;
1131
1132                    // Look at all rows before this one
1133                    for i in 0..position {
1134                        let prev_row = partition.rows[i];
1135                        let cmp = self.compare_rows_for_rank(prev_row, row_index);
1136
1137                        if cmp < 0 {
1138                            // This row has a better rank
1139                            if last_value_seen.is_none()
1140                                || last_value_seen.map_or(true, |last| {
1141                                    self.compare_rows_for_rank(last, prev_row) != 0
1142                                })
1143                            {
1144                                dense_rank += 1;
1145                                last_value_seen = Some(prev_row);
1146                            }
1147                        }
1148                    }
1149
1150                    return dense_rank;
1151                }
1152            }
1153        }
1154        1 // Default if not found
1155    }
1156
1157    /// Compare two rows based on ORDER BY columns for ranking
1158    /// Returns -1 if row1 < row2, 0 if equal, 1 if row1 > row2
1159    fn compare_rows_for_rank(&self, row1: usize, row2: usize) -> i32 {
1160        let source_table = self.source.source();
1161
1162        for order_item in &self.spec.order_by {
1163            if let SqlExpression::Column(col) = &order_item.expr {
1164                if let Some(col_idx) = source_table.get_column_index(&col.name) {
1165                    let val1 = source_table.get_value(row1, col_idx);
1166                    let val2 = source_table.get_value(row2, col_idx);
1167
1168                    let cmp = match (val1, val2) {
1169                        (Some(v1), Some(v2)) => {
1170                            crate::data::datavalue_compare::compare_datavalues(v1, v2)
1171                        }
1172                        (None, Some(_)) => std::cmp::Ordering::Less,
1173                        (Some(_), None) => std::cmp::Ordering::Greater,
1174                        (None, None) => std::cmp::Ordering::Equal,
1175                    };
1176
1177                    if cmp != std::cmp::Ordering::Equal {
1178                        let result = match order_item.direction {
1179                            SortDirection::Asc => cmp,
1180                            SortDirection::Desc => cmp.reverse(),
1181                        };
1182
1183                        return match result {
1184                            std::cmp::Ordering::Less => -1,
1185                            std::cmp::Ordering::Equal => 0,
1186                            std::cmp::Ordering::Greater => 1,
1187                        };
1188                    }
1189                }
1190            }
1191        }
1192
1193        0 // All columns equal
1194    }
1195
1196    /// Batch evaluate RANK function for multiple rows
1197    pub fn evaluate_rank_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1198        let start = Instant::now();
1199        let mut results = Vec::with_capacity(visible_rows.len());
1200
1201        for &row_idx in visible_rows {
1202            let rank = self.get_rank(row_idx);
1203            results.push(DataValue::Integer(rank));
1204        }
1205
1206        debug!(
1207            "evaluate_rank_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1208            visible_rows.len(),
1209            start.elapsed().as_secs_f64() * 1000.0,
1210            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1211        );
1212
1213        Ok(results)
1214    }
1215
1216    /// Batch evaluate DENSE_RANK function for multiple rows
1217    pub fn evaluate_dense_rank_batch(&self, visible_rows: &[usize]) -> Result<Vec<DataValue>> {
1218        let start = Instant::now();
1219        let mut results = Vec::with_capacity(visible_rows.len());
1220
1221        for &row_idx in visible_rows {
1222            let rank = self.get_dense_rank(row_idx);
1223            results.push(DataValue::Integer(rank));
1224        }
1225
1226        debug!(
1227            "evaluate_dense_rank_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1228            visible_rows.len(),
1229            start.elapsed().as_secs_f64() * 1000.0,
1230            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1231        );
1232
1233        Ok(results)
1234    }
1235
1236    /// Batch evaluate SUM window aggregate
1237    pub fn evaluate_sum_batch(
1238        &self,
1239        visible_rows: &[usize],
1240        column_name: &str,
1241    ) -> Result<Vec<DataValue>> {
1242        let start = Instant::now();
1243        let mut results = Vec::with_capacity(visible_rows.len());
1244
1245        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1246        let is_running_aggregate = self.is_running_aggregate_frame();
1247
1248        if is_running_aggregate && !visible_rows.is_empty() {
1249            // Optimized path for running aggregates
1250            debug!(
1251                "Using optimized running sum for {} rows",
1252                visible_rows.len()
1253            );
1254
1255            // Group visible rows by partition
1256            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1257            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1258                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1259                    partition_groups
1260                        .entry(partition_key.clone())
1261                        .or_insert_with(Vec::new)
1262                        .push((idx, row_idx));
1263                }
1264            }
1265
1266            let source_table = self.source.source();
1267            let col_idx = source_table
1268                .get_column_index(column_name)
1269                .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1270
1271            // Initialize results with nulls
1272            results.resize(visible_rows.len(), DataValue::Null);
1273
1274            // Process each partition
1275            for (_partition_key, rows) in partition_groups {
1276                if let Some(partition) = self.partitions.get(&_partition_key) {
1277                    let mut running_sum = 0.0;
1278                    let mut has_float = false;
1279                    let mut position_to_sum: HashMap<usize, DataValue> = HashMap::new();
1280
1281                    // Calculate running sum for all positions in this partition
1282                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1283                        if let Some(value) = source_table.get_value(row_idx, col_idx) {
1284                            match value {
1285                                DataValue::Integer(i) => {
1286                                    running_sum += *i as f64;
1287                                }
1288                                DataValue::Float(f) => {
1289                                    running_sum += f;
1290                                    has_float = true;
1291                                }
1292                                DataValue::Null => {
1293                                    // Skip NULL values
1294                                }
1295                                _ => {
1296                                    // Non-numeric value
1297                                }
1298                            }
1299                        }
1300
1301                        // Store the running sum for this position
1302                        if !has_float
1303                            && running_sum.fract() == 0.0
1304                            && running_sum >= i64::MIN as f64
1305                            && running_sum <= i64::MAX as f64
1306                        {
1307                            position_to_sum.insert(pos, DataValue::Integer(running_sum as i64));
1308                        } else {
1309                            position_to_sum.insert(pos, DataValue::Float(running_sum));
1310                        }
1311                    }
1312
1313                    // Fill in results for visible rows in this partition
1314                    for (result_idx, row_idx) in rows {
1315                        if let Some(pos) = partition.get_position(row_idx) {
1316                            results[result_idx] = position_to_sum
1317                                .get(&pos)
1318                                .cloned()
1319                                .unwrap_or(DataValue::Null);
1320                        }
1321                    }
1322                }
1323            }
1324        } else {
1325            // Regular path for non-running aggregates or other frame types
1326            for &row_idx in visible_rows {
1327                let value = if self.has_frame() {
1328                    self.get_frame_sum(row_idx, column_name)
1329                        .unwrap_or(DataValue::Null)
1330                } else {
1331                    self.get_partition_sum(row_idx, column_name)
1332                        .unwrap_or(DataValue::Null)
1333                };
1334                results.push(value);
1335            }
1336        }
1337
1338        debug!(
1339            "evaluate_sum_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1340            visible_rows.len(),
1341            start.elapsed().as_secs_f64() * 1000.0,
1342            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1343        );
1344
1345        Ok(results)
1346    }
1347
1348    /// Batch evaluate AVG window aggregate
1349    pub fn evaluate_avg_batch(
1350        &self,
1351        visible_rows: &[usize],
1352        column_name: &str,
1353    ) -> Result<Vec<DataValue>> {
1354        let start = Instant::now();
1355        let mut results = Vec::with_capacity(visible_rows.len());
1356
1357        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1358        let is_running_aggregate = self.is_running_aggregate_frame();
1359
1360        if is_running_aggregate && !visible_rows.is_empty() {
1361            // Optimized path for running averages
1362            debug!(
1363                "Using optimized running average for {} rows",
1364                visible_rows.len()
1365            );
1366
1367            // Group visible rows by partition
1368            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1369            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1370                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1371                    partition_groups
1372                        .entry(partition_key.clone())
1373                        .or_insert_with(Vec::new)
1374                        .push((idx, row_idx));
1375                }
1376            }
1377
1378            let source_table = self.source.source();
1379            let col_idx = source_table
1380                .get_column_index(column_name)
1381                .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1382
1383            // Initialize results with nulls
1384            results.resize(visible_rows.len(), DataValue::Null);
1385
1386            // Process each partition
1387            for (_partition_key, rows) in partition_groups {
1388                if let Some(partition) = self.partitions.get(&_partition_key) {
1389                    let mut running_sum = 0.0;
1390                    let mut count = 0;
1391                    let mut position_to_avg: HashMap<usize, DataValue> = HashMap::new();
1392
1393                    // Calculate running average for all positions in this partition
1394                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1395                        if let Some(value) = source_table.get_value(row_idx, col_idx) {
1396                            match value {
1397                                DataValue::Integer(i) => {
1398                                    running_sum += *i as f64;
1399                                    count += 1;
1400                                }
1401                                DataValue::Float(f) => {
1402                                    running_sum += f;
1403                                    count += 1;
1404                                }
1405                                DataValue::Null => {
1406                                    // Skip NULL values
1407                                }
1408                                _ => {
1409                                    // Non-numeric value
1410                                }
1411                            }
1412                        }
1413
1414                        // Store the running average for this position
1415                        if count > 0 {
1416                            position_to_avg
1417                                .insert(pos, DataValue::Float(running_sum / count as f64));
1418                        } else {
1419                            position_to_avg.insert(pos, DataValue::Null);
1420                        }
1421                    }
1422
1423                    // Fill in results for visible rows in this partition
1424                    for (result_idx, row_idx) in rows {
1425                        if let Some(pos) = partition.get_position(row_idx) {
1426                            results[result_idx] = position_to_avg
1427                                .get(&pos)
1428                                .cloned()
1429                                .unwrap_or(DataValue::Null);
1430                        }
1431                    }
1432                }
1433            }
1434        } else {
1435            // Regular path for non-running aggregates or other frame types
1436            for &row_idx in visible_rows {
1437                let value = if self.has_frame() {
1438                    self.get_frame_avg(row_idx, column_name)
1439                        .unwrap_or(DataValue::Null)
1440                } else {
1441                    self.get_partition_avg(row_idx, column_name)
1442                        .unwrap_or(DataValue::Null)
1443                };
1444                results.push(value);
1445            }
1446        }
1447
1448        debug!(
1449            "evaluate_avg_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1450            visible_rows.len(),
1451            start.elapsed().as_secs_f64() * 1000.0,
1452            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1453        );
1454
1455        Ok(results)
1456    }
1457
1458    /// Batch evaluate MIN window aggregate
1459    pub fn evaluate_min_batch(
1460        &self,
1461        visible_rows: &[usize],
1462        column_name: &str,
1463    ) -> Result<Vec<DataValue>> {
1464        let start = Instant::now();
1465        let mut results = Vec::with_capacity(visible_rows.len());
1466
1467        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1468        let is_running_aggregate = self.is_running_aggregate_frame();
1469
1470        if is_running_aggregate && !visible_rows.is_empty() {
1471            // Optimized path for running minimum
1472            debug!(
1473                "Using optimized running minimum for {} rows",
1474                visible_rows.len()
1475            );
1476
1477            // Group visible rows by partition
1478            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1479            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1480                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1481                    partition_groups
1482                        .entry(partition_key.clone())
1483                        .or_insert_with(Vec::new)
1484                        .push((idx, row_idx));
1485                }
1486            }
1487
1488            let source_table = self.source.source();
1489            let col_idx = source_table
1490                .get_column_index(column_name)
1491                .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1492
1493            // Initialize results with nulls
1494            results.resize(visible_rows.len(), DataValue::Null);
1495
1496            // Process each partition
1497            for (_partition_key, rows) in partition_groups {
1498                if let Some(partition) = self.partitions.get(&_partition_key) {
1499                    let mut running_min: Option<DataValue> = None;
1500                    let mut position_to_min: HashMap<usize, DataValue> = HashMap::new();
1501
1502                    // Calculate running minimum for all positions in this partition
1503                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1504                        if let Some(value) = source_table.get_value(row_idx, col_idx) {
1505                            if !matches!(value, DataValue::Null) {
1506                                match &running_min {
1507                                    None => running_min = Some(value.clone()),
1508                                    Some(current_min) => {
1509                                        use crate::data::datavalue_compare::compare_datavalues;
1510                                        if compare_datavalues(value, current_min).is_lt() {
1511                                            running_min = Some(value.clone());
1512                                        }
1513                                    }
1514                                }
1515                            }
1516                        }
1517
1518                        // Store the running minimum for this position
1519                        position_to_min.insert(pos, running_min.clone().unwrap_or(DataValue::Null));
1520                    }
1521
1522                    // Fill in results for visible rows in this partition
1523                    for (result_idx, row_idx) in rows {
1524                        if let Some(pos) = partition.get_position(row_idx) {
1525                            results[result_idx] = position_to_min
1526                                .get(&pos)
1527                                .cloned()
1528                                .unwrap_or(DataValue::Null);
1529                        }
1530                    }
1531                }
1532            }
1533        } else {
1534            // Regular path for non-running aggregates or other frame types
1535            for &row_idx in visible_rows {
1536                let value = if self.has_frame() {
1537                    self.get_frame_min(row_idx, column_name)
1538                        .unwrap_or(DataValue::Null)
1539                } else {
1540                    self.get_partition_min(row_idx, column_name)
1541                        .unwrap_or(DataValue::Null)
1542                };
1543                results.push(value);
1544            }
1545        }
1546
1547        debug!(
1548            "evaluate_min_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1549            visible_rows.len(),
1550            start.elapsed().as_secs_f64() * 1000.0,
1551            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1552        );
1553
1554        Ok(results)
1555    }
1556
1557    /// Batch evaluate MAX window aggregate
1558    pub fn evaluate_max_batch(
1559        &self,
1560        visible_rows: &[usize],
1561        column_name: &str,
1562    ) -> Result<Vec<DataValue>> {
1563        let start = Instant::now();
1564        let mut results = Vec::with_capacity(visible_rows.len());
1565
1566        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1567        let is_running_aggregate = self.is_running_aggregate_frame();
1568
1569        if is_running_aggregate && !visible_rows.is_empty() {
1570            // Optimized path for running maximum
1571            debug!(
1572                "Using optimized running maximum for {} rows",
1573                visible_rows.len()
1574            );
1575
1576            // Group visible rows by partition
1577            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1578            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1579                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1580                    partition_groups
1581                        .entry(partition_key.clone())
1582                        .or_insert_with(Vec::new)
1583                        .push((idx, row_idx));
1584                }
1585            }
1586
1587            let source_table = self.source.source();
1588            let col_idx = source_table
1589                .get_column_index(column_name)
1590                .ok_or_else(|| anyhow!("Column '{}' not found", column_name))?;
1591
1592            // Initialize results with nulls
1593            results.resize(visible_rows.len(), DataValue::Null);
1594
1595            // Process each partition
1596            for (_partition_key, rows) in partition_groups {
1597                if let Some(partition) = self.partitions.get(&_partition_key) {
1598                    let mut running_max: Option<DataValue> = None;
1599                    let mut position_to_max: HashMap<usize, DataValue> = HashMap::new();
1600
1601                    // Calculate running maximum for all positions in this partition
1602                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1603                        if let Some(value) = source_table.get_value(row_idx, col_idx) {
1604                            if !matches!(value, DataValue::Null) {
1605                                match &running_max {
1606                                    None => running_max = Some(value.clone()),
1607                                    Some(current_max) => {
1608                                        use crate::data::datavalue_compare::compare_datavalues;
1609                                        if compare_datavalues(value, current_max).is_gt() {
1610                                            running_max = Some(value.clone());
1611                                        }
1612                                    }
1613                                }
1614                            }
1615                        }
1616
1617                        // Store the running maximum for this position
1618                        position_to_max.insert(pos, running_max.clone().unwrap_or(DataValue::Null));
1619                    }
1620
1621                    // Fill in results for visible rows in this partition
1622                    for (result_idx, row_idx) in rows {
1623                        if let Some(pos) = partition.get_position(row_idx) {
1624                            results[result_idx] = position_to_max
1625                                .get(&pos)
1626                                .cloned()
1627                                .unwrap_or(DataValue::Null);
1628                        }
1629                    }
1630                }
1631            }
1632        } else {
1633            // Regular path for non-running aggregates or other frame types
1634            for &row_idx in visible_rows {
1635                let value = if self.has_frame() {
1636                    self.get_frame_max(row_idx, column_name)
1637                        .unwrap_or(DataValue::Null)
1638                } else {
1639                    self.get_partition_max(row_idx, column_name)
1640                        .unwrap_or(DataValue::Null)
1641                };
1642                results.push(value);
1643            }
1644        }
1645
1646        debug!(
1647            "evaluate_max_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1648            visible_rows.len(),
1649            start.elapsed().as_secs_f64() * 1000.0,
1650            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1651        );
1652
1653        Ok(results)
1654    }
1655
1656    /// Batch evaluate COUNT window aggregate
1657    pub fn evaluate_count_batch(
1658        &self,
1659        visible_rows: &[usize],
1660        column_name: Option<&str>,
1661    ) -> Result<Vec<DataValue>> {
1662        let start = Instant::now();
1663        let mut results = Vec::with_capacity(visible_rows.len());
1664
1665        // Check if this is a running aggregate (UNBOUNDED PRECEDING to CURRENT ROW)
1666        let is_running_aggregate = self.is_running_aggregate_frame();
1667
1668        if is_running_aggregate && !visible_rows.is_empty() {
1669            // Optimized path for running count
1670            debug!(
1671                "Using optimized running count for {} rows",
1672                visible_rows.len()
1673            );
1674
1675            // Group visible rows by partition
1676            let mut partition_groups: BTreeMap<PartitionKey, Vec<(usize, usize)>> = BTreeMap::new();
1677            for (idx, &row_idx) in visible_rows.iter().enumerate() {
1678                if let Some(partition_key) = self.row_to_partition.get(&row_idx) {
1679                    partition_groups
1680                        .entry(partition_key.clone())
1681                        .or_insert_with(Vec::new)
1682                        .push((idx, row_idx));
1683                }
1684            }
1685
1686            let source_table = self.source.source();
1687            let col_idx = if let Some(col_name) = column_name {
1688                source_table.get_column_index(col_name)
1689            } else {
1690                None
1691            };
1692
1693            // Initialize results with nulls
1694            results.resize(visible_rows.len(), DataValue::Null);
1695
1696            // Process each partition
1697            for (_partition_key, rows) in partition_groups {
1698                if let Some(partition) = self.partitions.get(&_partition_key) {
1699                    let mut running_count = 0i64;
1700                    let mut position_to_count: HashMap<usize, DataValue> = HashMap::new();
1701
1702                    // Calculate running count for all positions in this partition
1703                    for (pos, &row_idx) in partition.rows.iter().enumerate() {
1704                        if let Some(col_idx) = col_idx {
1705                            // COUNT(column) - count non-null values
1706                            if let Some(value) = source_table.get_value(row_idx, col_idx) {
1707                                if !matches!(value, DataValue::Null) {
1708                                    running_count += 1;
1709                                }
1710                            }
1711                        } else {
1712                            // COUNT(*) - always increment
1713                            running_count += 1;
1714                        }
1715
1716                        // Store the running count for this position
1717                        position_to_count.insert(pos, DataValue::Integer(running_count));
1718                    }
1719
1720                    // Fill in results for visible rows in this partition
1721                    for (result_idx, row_idx) in rows {
1722                        if let Some(pos) = partition.get_position(row_idx) {
1723                            results[result_idx] = position_to_count
1724                                .get(&pos)
1725                                .cloned()
1726                                .unwrap_or(DataValue::Null);
1727                        }
1728                    }
1729                }
1730            }
1731        } else {
1732            // Regular path for non-running aggregates or other frame types
1733            for &row_idx in visible_rows {
1734                let value = if self.has_frame() {
1735                    self.get_frame_count(row_idx, column_name)
1736                        .unwrap_or(DataValue::Null)
1737                } else {
1738                    self.get_partition_count(row_idx, column_name)
1739                        .unwrap_or(DataValue::Null)
1740                };
1741                results.push(value);
1742            }
1743        }
1744
1745        debug!(
1746            "evaluate_count_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1747            visible_rows.len(),
1748            start.elapsed().as_secs_f64() * 1000.0,
1749            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1750        );
1751
1752        Ok(results)
1753    }
1754
1755    /// Batch evaluate FIRST_VALUE window function
1756    pub fn evaluate_first_value_batch(
1757        &self,
1758        visible_rows: &[usize],
1759        column_name: &str,
1760    ) -> Result<Vec<DataValue>> {
1761        let start = Instant::now();
1762        let mut results = Vec::with_capacity(visible_rows.len());
1763
1764        for &row_idx in visible_rows {
1765            let value = if self.has_frame() {
1766                self.get_frame_first_value(row_idx, column_name)
1767                    .unwrap_or(DataValue::Null)
1768            } else {
1769                self.get_first_value(row_idx, column_name)
1770                    .unwrap_or(DataValue::Null)
1771            };
1772            results.push(value);
1773        }
1774
1775        debug!(
1776            "evaluate_first_value_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1777            visible_rows.len(),
1778            start.elapsed().as_secs_f64() * 1000.0,
1779            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1780        );
1781
1782        Ok(results)
1783    }
1784
1785    /// Batch evaluate LAST_VALUE window function
1786    pub fn evaluate_last_value_batch(
1787        &self,
1788        visible_rows: &[usize],
1789        column_name: &str,
1790    ) -> Result<Vec<DataValue>> {
1791        let start = Instant::now();
1792        let mut results = Vec::with_capacity(visible_rows.len());
1793
1794        for &row_idx in visible_rows {
1795            let value = if self.has_frame() {
1796                self.get_frame_last_value(row_idx, column_name)
1797                    .unwrap_or(DataValue::Null)
1798            } else {
1799                self.get_last_value(row_idx, column_name)
1800                    .unwrap_or(DataValue::Null)
1801            };
1802            results.push(value);
1803        }
1804
1805        debug!(
1806            "evaluate_last_value_batch: {} rows in {:.3}ms ({:.2}μs/row)",
1807            visible_rows.len(),
1808            start.elapsed().as_secs_f64() * 1000.0,
1809            start.elapsed().as_micros() as f64 / visible_rows.len() as f64
1810        );
1811
1812        Ok(results)
1813    }
1814}