//! `datui_lib/widgets/datatable.rs` — data-table widget state: loads tabular
//! files (Parquet, IPC, Avro, Excel, ORC, …) into Polars LazyFrames and
//! manages buffered, scrollable display state.

1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10    buffer::Buffer,
11    layout::Rect,
12    style::{Color, Modifier, Style},
13    text::{Line, Span},
14    widgets::{
15        Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16    },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions};
25use polars::lazy::frame::pivot::pivot_stable;
26use std::io::{BufReader, Read};
27
28use calamine::{open_workbook_auto, Data, Reader};
29use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
30use orc_rust::ArrowReaderBuilder;
31use tempfile::NamedTempFile;
32
33use arrow::array::types::{
34    Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
35    TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
36};
37use arrow::array::{Array, AsArray};
38use arrow::record_batch::RecordBatch;
39
40fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
41    let e = col(PlSmallStr::from_static(""));
42    let expr = match agg {
43        PivotAggregation::Last => e.last(),
44        PivotAggregation::First => e.first(),
45        PivotAggregation::Min => e.min(),
46        PivotAggregation::Max => e.max(),
47        PivotAggregation::Avg => e.mean(),
48        PivotAggregation::Med => e.median(),
49        PivotAggregation::Std => e.std(1),
50        PivotAggregation::Count => e.len(),
51    };
52    Ok(expr)
53}
54
/// Stateful model behind the data table widget: wraps the current [`LazyFrame`],
/// a buffered slice of collected rows, and all view state (scroll position,
/// sort, filters, grouping, pivot/melt history).
pub struct DataTableState {
    /// LazyFrame currently backing the view (after filters/sorts/queries).
    pub lf: LazyFrame,
    /// Pristine LazyFrame as loaded; `reset_lf_to_original` restores from this.
    original_lf: LazyFrame,
    df: Option<DataFrame>,        // Scrollable columns dataframe
    locked_df: Option<DataFrame>, // Locked columns dataframe
    /// ratatui table selection state.
    pub table_state: TableState,
    /// Absolute index of the first visible row.
    pub start_row: usize,
    /// Number of rows the viewport can display.
    pub visible_rows: usize,
    /// Index of the first visible scrollable column.
    pub termcol_index: usize,
    /// Number of columns the viewport can display.
    pub visible_termcols: usize,
    /// Last Polars error raised by a collect/query, if any.
    pub error: Option<PolarsError>,
    pub suppress_error_display: bool, // When true, don't show errors in main view (e.g., when query input is active)
    /// Schema of the current LazyFrame.
    pub schema: Arc<Schema>,
    /// Total row count of the current result set (meaningful when `num_rows_valid`).
    pub num_rows: usize,
    /// When true, collect() skips the len() query.
    num_rows_valid: bool,
    /// Active filter statements applied to the view.
    filters: Vec<FilterStatement>,
    /// Columns currently sorted by (empty = unsorted).
    sort_columns: Vec<String>,
    /// Direction shared by all `sort_columns`.
    sort_ascending: bool,
    /// Last executed query (Query tab); see also active_sql_query/active_fuzzy_query.
    pub active_query: String,
    /// Last executed SQL (Sql tab). Independent from active_query; only one applies to current view.
    pub active_sql_query: String,
    /// Last executed fuzzy search (Fuzzy tab). Independent from active_query/active_sql_query.
    pub active_fuzzy_query: String,
    column_order: Vec<String>,   // Order of columns for display
    locked_columns_count: usize, // Number of locked columns (from left)
    /// Grouped LazyFrame when a group-by view is active.
    grouped_lf: Option<LazyFrame>,
    drilled_down_group_index: Option<usize>, // Index of the group we're viewing
    pub drilled_down_group_key: Option<Vec<String>>, // Key values of the drilled down group
    pub drilled_down_group_key_columns: Option<Vec<String>>, // Key column names of the drilled down group
    /// Pages to pre-buffer ahead of the viewport.
    pages_lookahead: usize,
    /// Pages to pre-buffer behind the viewport.
    pages_lookback: usize,
    max_buffered_rows: usize, // 0 = no limit
    max_buffered_mb: usize,   // 0 = no limit
    /// Absolute start of the row range materialized in `buffered_df`.
    buffered_start_row: usize,
    /// Absolute end of the row range materialized in `buffered_df`.
    buffered_end_row: usize,
    /// Full buffered DataFrame (all columns in column_order) for the current buffer range.
    /// When set, column scroll (scroll_left/scroll_right) only re-slices columns without re-collecting from LazyFrame.
    buffered_df: Option<DataFrame>,
    // NOTE(review): appears to control how close to the buffer edge the cursor
    // may get before re-buffering — confirm in collect(). Set once visible_rows is known.
    proximity_threshold: usize,
    /// When true, render a row-number column (from options).
    row_numbers: bool,
    /// Index shown for the first row, typically 0 or 1 (from options).
    row_start_index: usize,
    /// Last applied pivot spec, if current lf is result of a pivot. Used for templates.
    last_pivot_spec: Option<PivotSpec>,
    /// Last applied melt spec, if current lf is result of a melt. Used for templates.
    last_melt_spec: Option<MeltSpec>,
    /// When set, dataset was loaded with hive partitioning; partition column names for Info panel and predicate pushdown.
    pub partition_columns: Option<Vec<String>>,
    /// When set, decompressed CSV was written to this temp file; kept alive so the file exists for lazy scan.
    decompress_temp_file: Option<NamedTempFile>,
    /// When true, use Polars streaming engine for LazyFrame collect when the streaming feature is enabled.
    pub polars_streaming: bool,
}
108
/// Inferred type for an Excel column (preserves numbers, bools, dates; avoids stringifying).
#[derive(Clone, Copy)]
enum ExcelColType {
    /// Whole numbers; also chosen for float columns whose values are all integral.
    Int64,
    /// Floating-point numbers.
    Float64,
    /// Booleans.
    Boolean,
    /// Strings, and the fallback when nothing more specific applies.
    Utf8,
    /// Calendar dates (every parsed datetime in the column is at midnight).
    Date,
    /// Date + time values (built as microsecond-precision Datetime series).
    Datetime,
}
119
120impl DataTableState {
    /// Build state from a LazyFrame, eagerly resolving its schema (one
    /// `collect_schema` call). Display column order follows schema order.
    ///
    /// `None` options fall back to defaults: 3 pages lookahead/lookback,
    /// 100_000 buffered rows, 512 MB buffer cap. Display options
    /// (`row_numbers`, `row_start_index`) start at false/1 and are overridden
    /// by the `from_*` loaders after construction.
    pub fn new(
        lf: LazyFrame,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        polars_streaming: bool,
    ) -> Result<Self> {
        let schema = lf.clone().collect_schema()?;
        let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
        Ok(Self {
            // Keep a pristine copy so reset() can restore the original dataset.
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            // Forces the first collect() to run the len() query.
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: pages_lookahead.unwrap_or(3),
            pages_lookback: pages_lookback.unwrap_or(3),
            max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0, // Will be set when visible_rows is known
            row_numbers: false,     // Will be set from options
            row_start_index: 1,     // Will be set from options
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns: None,
            decompress_temp_file: None,
            polars_streaming,
        })
    }
175
176    /// Create state from an existing LazyFrame (e.g. from Python or in-memory). Uses OpenOptions for display/buffer settings.
177    pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
178        let mut state = Self::new(
179            lf,
180            options.pages_lookahead,
181            options.pages_lookback,
182            options.max_buffered_rows,
183            options.max_buffered_mb,
184            options.polars_streaming,
185        )?;
186        state.row_numbers = options.row_numbers;
187        state.row_start_index = options.row_start_index;
188        Ok(state)
189    }
190
    /// Create state from a pre-collected schema and LazyFrame (for phased loading). Does not call collect_schema();
    /// df is None so the UI can render headers while the first collect() runs.
    /// When `partition_columns` is Some (e.g. hive), column order is partition cols first.
    pub fn from_schema_and_lazyframe(
        schema: Arc<Schema>,
        lf: LazyFrame,
        options: &crate::OpenOptions,
        partition_columns: Option<Vec<String>>,
    ) -> Result<Self> {
        // Display order: hive partition columns first, then the remaining
        // schema columns in schema order (partition names filtered out so they
        // are not listed twice).
        let column_order: Vec<String> = if let Some(ref part) = partition_columns {
            let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
            let rest: Vec<String> = schema
                .iter_names()
                .map(|s| s.to_string())
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            part.iter().cloned().chain(rest).collect()
        } else {
            schema.iter_names().map(|s| s.to_string()).collect()
        };
        Ok(Self {
            // Keep a pristine copy so reset() can restore the original dataset.
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            // Forces the first collect() to run the len() query.
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: options.pages_lookahead.unwrap_or(3),
            pages_lookback: options.pages_lookback.unwrap_or(3),
            max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0, // set once visible_rows is known
            row_numbers: options.row_numbers,
            row_start_index: options.row_start_index,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns,
            decompress_temp_file: None,
            polars_streaming: options.polars_streaming,
        })
    }
255
    /// Reset LazyFrame and view state to original_lf. Schema is re-fetched so it matches
    /// after a previous query/SQL that may have changed columns. Caller should call
    /// collect() afterward if display update is needed (reset/query/fuzzy do; sql_query
    /// relies on event loop Collect).
    fn reset_lf_to_original(&mut self) {
        // Row count must be recomputed against the restored frame.
        self.invalidate_num_rows();
        self.lf = self.original_lf.clone();
        // Re-fetch the schema; on failure fall back to an empty schema rather
        // than propagating the error from here.
        self.schema = self
            .original_lf
            .clone()
            .collect_schema()
            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Clear every derived view state: queries, locks, filters, sort.
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.locked_columns_count = 0;
        self.filters.clear();
        self.sort_columns.clear();
        self.sort_ascending = true;
        // Scroll back to the top-left corner.
        self.start_row = 0;
        self.termcol_index = 0;
        // Leave any group drill-down / grouped view.
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        // Drop the row buffer so the next collect() re-materializes from scratch.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
    }
287
    /// Full reset to the original dataset: restores lf/schema/view state,
    /// clears errors and pivot/melt history, then re-collects for display.
    pub fn reset(&mut self) {
        self.reset_lf_to_original();
        self.error = None;
        self.suppress_error_display = false;
        self.last_pivot_spec = None;
        self.last_melt_spec = None;
        self.collect();
        if self.num_rows > 0 {
            // NOTE(review): start_row was already zeroed by reset_lf_to_original;
            // this guard only matters if collect() moved it — confirm.
            self.start_row = 0;
        }
    }
299
300    pub fn from_parquet(
301        path: &Path,
302        pages_lookahead: Option<usize>,
303        pages_lookback: Option<usize>,
304        max_buffered_rows: Option<usize>,
305        max_buffered_mb: Option<usize>,
306        row_numbers: bool,
307        row_start_index: usize,
308    ) -> Result<Self> {
309        let path_str = path.as_os_str().to_string_lossy();
310        let is_glob = path_str.contains('*');
311        let pl_path = PlPath::Local(Arc::from(path));
312        let args = ScanArgsParquet {
313            glob: is_glob,
314            ..Default::default()
315        };
316        let lf = LazyFrame::scan_parquet(pl_path, args)?;
317        let mut state = Self::new(
318            lf,
319            pages_lookahead,
320            pages_lookback,
321            max_buffered_rows,
322            max_buffered_mb,
323            true,
324        )?;
325        state.row_numbers = row_numbers;
326        state.row_start_index = row_start_index;
327        Ok(state)
328    }
329
330    /// Load multiple Parquet files and concatenate them into one LazyFrame (same schema assumed).
331    pub fn from_parquet_paths(
332        paths: &[impl AsRef<Path>],
333        pages_lookahead: Option<usize>,
334        pages_lookback: Option<usize>,
335        max_buffered_rows: Option<usize>,
336        max_buffered_mb: Option<usize>,
337        row_numbers: bool,
338        row_start_index: usize,
339    ) -> Result<Self> {
340        if paths.is_empty() {
341            return Err(color_eyre::eyre::eyre!("No paths provided"));
342        }
343        if paths.len() == 1 {
344            return Self::from_parquet(
345                paths[0].as_ref(),
346                pages_lookahead,
347                pages_lookback,
348                max_buffered_rows,
349                max_buffered_mb,
350                row_numbers,
351                row_start_index,
352            );
353        }
354        let mut lazy_frames = Vec::with_capacity(paths.len());
355        for p in paths {
356            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
357            let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
358            lazy_frames.push(lf);
359        }
360        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
361        let mut state = Self::new(
362            lf,
363            pages_lookahead,
364            pages_lookback,
365            max_buffered_rows,
366            max_buffered_mb,
367            true,
368        )?;
369        state.row_numbers = row_numbers;
370        state.row_start_index = row_start_index;
371        Ok(state)
372    }
373
374    /// Load a single Arrow IPC / Feather v2 file (lazy).
375    pub fn from_ipc(
376        path: &Path,
377        pages_lookahead: Option<usize>,
378        pages_lookback: Option<usize>,
379        max_buffered_rows: Option<usize>,
380        max_buffered_mb: Option<usize>,
381        row_numbers: bool,
382        row_start_index: usize,
383    ) -> Result<Self> {
384        let pl_path = PlPath::Local(Arc::from(path));
385        let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
386        let mut state = Self::new(
387            lf,
388            pages_lookahead,
389            pages_lookback,
390            max_buffered_rows,
391            max_buffered_mb,
392            true,
393        )?;
394        state.row_numbers = row_numbers;
395        state.row_start_index = row_start_index;
396        Ok(state)
397    }
398
399    /// Load multiple Arrow IPC / Feather files and concatenate into one LazyFrame.
400    pub fn from_ipc_paths(
401        paths: &[impl AsRef<Path>],
402        pages_lookahead: Option<usize>,
403        pages_lookback: Option<usize>,
404        max_buffered_rows: Option<usize>,
405        max_buffered_mb: Option<usize>,
406        row_numbers: bool,
407        row_start_index: usize,
408    ) -> Result<Self> {
409        if paths.is_empty() {
410            return Err(color_eyre::eyre::eyre!("No paths provided"));
411        }
412        if paths.len() == 1 {
413            return Self::from_ipc(
414                paths[0].as_ref(),
415                pages_lookahead,
416                pages_lookback,
417                max_buffered_rows,
418                max_buffered_mb,
419                row_numbers,
420                row_start_index,
421            );
422        }
423        let mut lazy_frames = Vec::with_capacity(paths.len());
424        for p in paths {
425            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
426            let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
427            lazy_frames.push(lf);
428        }
429        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
430        let mut state = Self::new(
431            lf,
432            pages_lookahead,
433            pages_lookback,
434            max_buffered_rows,
435            max_buffered_mb,
436            true,
437        )?;
438        state.row_numbers = row_numbers;
439        state.row_start_index = row_start_index;
440        Ok(state)
441    }
442
443    /// Load a single Avro file (eager read, then lazy).
444    pub fn from_avro(
445        path: &Path,
446        pages_lookahead: Option<usize>,
447        pages_lookback: Option<usize>,
448        max_buffered_rows: Option<usize>,
449        max_buffered_mb: Option<usize>,
450        row_numbers: bool,
451        row_start_index: usize,
452    ) -> Result<Self> {
453        let file = File::open(path)?;
454        let df = polars::io::avro::AvroReader::new(file).finish()?;
455        let lf = df.lazy();
456        let mut state = Self::new(
457            lf,
458            pages_lookahead,
459            pages_lookback,
460            max_buffered_rows,
461            max_buffered_mb,
462            true,
463        )?;
464        state.row_numbers = row_numbers;
465        state.row_start_index = row_start_index;
466        Ok(state)
467    }
468
469    /// Load multiple Avro files and concatenate into one LazyFrame.
470    pub fn from_avro_paths(
471        paths: &[impl AsRef<Path>],
472        pages_lookahead: Option<usize>,
473        pages_lookback: Option<usize>,
474        max_buffered_rows: Option<usize>,
475        max_buffered_mb: Option<usize>,
476        row_numbers: bool,
477        row_start_index: usize,
478    ) -> Result<Self> {
479        if paths.is_empty() {
480            return Err(color_eyre::eyre::eyre!("No paths provided"));
481        }
482        if paths.len() == 1 {
483            return Self::from_avro(
484                paths[0].as_ref(),
485                pages_lookahead,
486                pages_lookback,
487                max_buffered_rows,
488                max_buffered_mb,
489                row_numbers,
490                row_start_index,
491            );
492        }
493        let mut lazy_frames = Vec::with_capacity(paths.len());
494        for p in paths {
495            let file = File::open(p.as_ref())?;
496            let df = polars::io::avro::AvroReader::new(file).finish()?;
497            lazy_frames.push(df.lazy());
498        }
499        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
500        let mut state = Self::new(
501            lf,
502            pages_lookahead,
503            pages_lookback,
504            max_buffered_rows,
505            max_buffered_mb,
506            true,
507        )?;
508        state.row_numbers = row_numbers;
509        state.row_start_index = row_start_index;
510        Ok(state)
511    }
512
513    /// Load a single Excel file (xls, xlsx, xlsm, xlsb) using calamine (eager read, then lazy).
514    /// Sheet is selected by 0-based index or name via `excel_sheet`.
515    #[allow(clippy::too_many_arguments)]
516    pub fn from_excel(
517        path: &Path,
518        pages_lookahead: Option<usize>,
519        pages_lookback: Option<usize>,
520        max_buffered_rows: Option<usize>,
521        max_buffered_mb: Option<usize>,
522        row_numbers: bool,
523        row_start_index: usize,
524        excel_sheet: Option<&str>,
525    ) -> Result<Self> {
526        let mut workbook =
527            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
528        let sheet_names = workbook.sheet_names().to_vec();
529        if sheet_names.is_empty() {
530            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
531        }
532        let range = if let Some(sheet_sel) = excel_sheet {
533            if let Ok(idx) = sheet_sel.parse::<usize>() {
534                workbook
535                    .worksheet_range_at(idx)
536                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
537                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
538            } else {
539                workbook
540                    .worksheet_range(sheet_sel)
541                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
542            }
543        } else {
544            workbook
545                .worksheet_range_at(0)
546                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
547                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
548        };
549        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
550        if rows.is_empty() {
551            let empty_df = DataFrame::new(vec![])?;
552            let mut state = Self::new(
553                empty_df.lazy(),
554                pages_lookahead,
555                pages_lookback,
556                max_buffered_rows,
557                max_buffered_mb,
558                true,
559            )?;
560            state.row_numbers = row_numbers;
561            state.row_start_index = row_start_index;
562            return Ok(state);
563        }
564        let headers: Vec<String> = rows[0]
565            .iter()
566            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
567            .collect();
568        let n_cols = headers.len();
569        let mut series_vec = Vec::with_capacity(n_cols);
570        for (col_idx, header) in headers.iter().enumerate() {
571            let col_cells: Vec<Option<&Data>> =
572                rows[1..].iter().map(|row| row.get(col_idx)).collect();
573            let inferred = Self::excel_infer_column_type(&col_cells);
574            let name = if header.is_empty() {
575                format!("column_{}", col_idx + 1)
576            } else {
577                header.clone()
578            };
579            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
580            series_vec.push(series.into());
581        }
582        let df = DataFrame::new(series_vec)?;
583        let mut state = Self::new(
584            df.lazy(),
585            pages_lookahead,
586            pages_lookback,
587            max_buffered_rows,
588            max_buffered_mb,
589            true,
590        )?;
591        state.row_numbers = row_numbers;
592        state.row_start_index = row_start_index;
593        Ok(state)
594    }
595
    /// Infers column type: prefers Int64 for whole-number floats; infers Date/Datetime for
    /// calamine DateTime/DateTimeIso or for string columns that parse as ISO date/datetime.
    ///
    /// Decision precedence after the scan: string > int > datetime > float >
    /// bool > Utf8 fallback. The first string cell stops the scan entirely, so
    /// any flags collected before it are ignored.
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        // One pass over non-empty cells recording which calamine kinds occur.
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        for cell in cells.iter().flatten() {
            if CalamineTrait::is_string(*cell) {
                has_string = true;
                break; // any string cell makes the column string-ish; stop scanning
            }
            // Datetime cells also set the float flag here; the `has_datetime`
            // branch below takes precedence over `has_float`.
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // String column: still treat as Date/Datetime when at least one cell
            // parses and every non-empty cell parses as ISO date/datetime.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            ExcelColType::Int64
        } else if has_datetime {
            // All-midnight datetimes collapse to a plain Date column.
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // Floats that are all whole numbers (within 1e-10) promote to Int64.
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            ExcelColType::Utf8
        }
    }
667
668    /// True if every cell that parses as datetime has time 00:00:00.
669    fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
670        let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
671        cells
672            .iter()
673            .flatten()
674            .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
675            .all(|dt| dt.time() == midnight)
676    }
677
678    /// Converts a calamine cell to NaiveDateTime (Excel serial, DateTimeIso, or parseable string).
679    fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
680        use calamine::DataType;
681        if let Some(dt) = cell.as_datetime() {
682            return Some(dt);
683        }
684        let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
685        Self::parse_naive_datetime_str(s)
686    }
687
688    /// Parses an ISO-style date/datetime string; tries FORMATS in order.
689    fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
690        let s = s.trim();
691        if s.is_empty() {
692            return None;
693        }
694        const FORMATS: &[&str] = &[
695            "%Y-%m-%dT%H:%M:%S%.f",
696            "%Y-%m-%dT%H:%M:%S",
697            "%Y-%m-%d %H:%M:%S%.f",
698            "%Y-%m-%d %H:%M:%S",
699            "%Y-%m-%d",
700        ];
701        for fmt in FORMATS {
702            if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
703                return Some(dt);
704            }
705        }
706        if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
707            return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
708        }
709        None
710    }
711
712    /// Build a Polars Series from a column of calamine cells using the inferred type.
713    fn excel_column_to_series(
714        name: &str,
715        cells: &[Option<&Data>],
716        col_type: ExcelColType,
717    ) -> Result<Series> {
718        use calamine::DataType as CalamineTrait;
719        use polars::datatypes::TimeUnit;
720        let series = match col_type {
721            ExcelColType::Int64 => {
722                let v: Vec<Option<i64>> = cells
723                    .iter()
724                    .map(|c| c.and_then(|cell| cell.as_i64()))
725                    .collect();
726                Series::new(name.into(), v)
727            }
728            ExcelColType::Float64 => {
729                let v: Vec<Option<f64>> = cells
730                    .iter()
731                    .map(|c| c.and_then(|cell| cell.as_f64()))
732                    .collect();
733                Series::new(name.into(), v)
734            }
735            ExcelColType::Boolean => {
736                let v: Vec<Option<bool>> = cells
737                    .iter()
738                    .map(|c| c.and_then(|cell| cell.get_bool()))
739                    .collect();
740                Series::new(name.into(), v)
741            }
742            ExcelColType::Utf8 => {
743                let v: Vec<Option<String>> = cells
744                    .iter()
745                    .map(|c| c.and_then(|cell| cell.as_string()))
746                    .collect();
747                Series::new(name.into(), v)
748            }
749            ExcelColType::Date => {
750                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
751                let v: Vec<Option<i32>> = cells
752                    .iter()
753                    .map(|c| {
754                        c.and_then(Self::excel_cell_to_naive_datetime)
755                            .map(|dt| (dt.date() - epoch).num_days() as i32)
756                    })
757                    .collect();
758                Series::new(name.into(), v).cast(&DataType::Date)?
759            }
760            ExcelColType::Datetime => {
761                let v: Vec<Option<i64>> = cells
762                    .iter()
763                    .map(|c| {
764                        c.and_then(Self::excel_cell_to_naive_datetime)
765                            .map(|dt| dt.and_utc().timestamp_micros())
766                    })
767                    .collect();
768                Series::new(name.into(), v)
769                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
770            }
771        };
772        Ok(series)
773    }
774
775    /// Load a single ORC file (eager read via orc-rust → Arrow, then convert to Polars, then lazy).
776    /// ORC is read fully into memory; see loading-data docs for large-file notes.
777    pub fn from_orc(
778        path: &Path,
779        pages_lookahead: Option<usize>,
780        pages_lookback: Option<usize>,
781        max_buffered_rows: Option<usize>,
782        max_buffered_mb: Option<usize>,
783        row_numbers: bool,
784        row_start_index: usize,
785    ) -> Result<Self> {
786        let file = File::open(path)?;
787        let reader = ArrowReaderBuilder::try_new(file)
788            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
789            .build();
790        let batches: Vec<RecordBatch> = reader
791            .collect::<std::result::Result<Vec<_>, _>>()
792            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
793        let df = Self::arrow_record_batches_to_dataframe(&batches)?;
794        let lf = df.lazy();
795        let mut state = Self::new(
796            lf,
797            pages_lookahead,
798            pages_lookback,
799            max_buffered_rows,
800            max_buffered_mb,
801            true,
802        )?;
803        state.row_numbers = row_numbers;
804        state.row_start_index = row_start_index;
805        Ok(state)
806    }
807
808    /// Load multiple ORC files and concatenate into one LazyFrame.
809    pub fn from_orc_paths(
810        paths: &[impl AsRef<Path>],
811        pages_lookahead: Option<usize>,
812        pages_lookback: Option<usize>,
813        max_buffered_rows: Option<usize>,
814        max_buffered_mb: Option<usize>,
815        row_numbers: bool,
816        row_start_index: usize,
817    ) -> Result<Self> {
818        if paths.is_empty() {
819            return Err(color_eyre::eyre::eyre!("No paths provided"));
820        }
821        if paths.len() == 1 {
822            return Self::from_orc(
823                paths[0].as_ref(),
824                pages_lookahead,
825                pages_lookback,
826                max_buffered_rows,
827                max_buffered_mb,
828                row_numbers,
829                row_start_index,
830            );
831        }
832        let mut lazy_frames = Vec::with_capacity(paths.len());
833        for p in paths {
834            let file = File::open(p.as_ref())?;
835            let reader = ArrowReaderBuilder::try_new(file)
836                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
837                .build();
838            let batches: Vec<RecordBatch> = reader
839                .collect::<std::result::Result<Vec<_>, _>>()
840                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
841            let df = Self::arrow_record_batches_to_dataframe(&batches)?;
842            lazy_frames.push(df.lazy());
843        }
844        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
845        let mut state = Self::new(
846            lf,
847            pages_lookahead,
848            pages_lookback,
849            max_buffered_rows,
850            max_buffered_mb,
851            true,
852        )?;
853        state.row_numbers = row_numbers;
854        state.row_start_index = row_start_index;
855        Ok(state)
856    }
857
858    /// Convert Arrow (arrow crate 57) RecordBatches to Polars DataFrame by value (ORC uses
859    /// arrow 57; Polars uses polars-arrow, so we cannot use Series::from_arrow).
860    fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
861        if batches.is_empty() {
862            return Ok(DataFrame::new(vec![])?);
863        }
864        let mut all_dfs = Vec::with_capacity(batches.len());
865        for batch in batches {
866            let n_cols = batch.num_columns();
867            let schema = batch.schema();
868            let mut series_vec = Vec::with_capacity(n_cols);
869            for (i, col) in batch.columns().iter().enumerate() {
870                let name = schema.field(i).name().as_str();
871                let s = Self::arrow_array_to_polars_series(name, col)?;
872                series_vec.push(s.into());
873            }
874            let df = DataFrame::new(series_vec)?;
875            all_dfs.push(df);
876        }
877        let mut out = all_dfs.remove(0);
878        for df in all_dfs {
879            out = out.vstack(&df)?;
880        }
881        Ok(out)
882    }
883
884    fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
885        use arrow::datatypes::DataType as ArrowDataType;
886        let len = array.len();
887        match array.data_type() {
888            ArrowDataType::Int8 => {
889                let a = array
890                    .as_primitive_opt::<Int8Type>()
891                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
892                let v: Vec<Option<i8>> = (0..len)
893                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
894                    .collect();
895                Ok(Series::new(name.into(), v))
896            }
897            ArrowDataType::Int16 => {
898                let a = array
899                    .as_primitive_opt::<Int16Type>()
900                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
901                let v: Vec<Option<i16>> = (0..len)
902                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
903                    .collect();
904                Ok(Series::new(name.into(), v))
905            }
906            ArrowDataType::Int32 => {
907                let a = array
908                    .as_primitive_opt::<Int32Type>()
909                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
910                let v: Vec<Option<i32>> = (0..len)
911                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
912                    .collect();
913                Ok(Series::new(name.into(), v))
914            }
915            ArrowDataType::Int64 => {
916                let a = array
917                    .as_primitive_opt::<Int64Type>()
918                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
919                let v: Vec<Option<i64>> = (0..len)
920                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
921                    .collect();
922                Ok(Series::new(name.into(), v))
923            }
924            ArrowDataType::UInt8 => {
925                let a = array
926                    .as_primitive_opt::<UInt8Type>()
927                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
928                let v: Vec<Option<i64>> = (0..len)
929                    .map(|i| {
930                        if a.is_null(i) {
931                            None
932                        } else {
933                            Some(a.value(i) as i64)
934                        }
935                    })
936                    .collect();
937                Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
938            }
939            ArrowDataType::UInt16 => {
940                let a = array
941                    .as_primitive_opt::<UInt16Type>()
942                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
943                let v: Vec<Option<i64>> = (0..len)
944                    .map(|i| {
945                        if a.is_null(i) {
946                            None
947                        } else {
948                            Some(a.value(i) as i64)
949                        }
950                    })
951                    .collect();
952                Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
953            }
954            ArrowDataType::UInt32 => {
955                let a = array
956                    .as_primitive_opt::<UInt32Type>()
957                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
958                let v: Vec<Option<u32>> = (0..len)
959                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
960                    .collect();
961                Ok(Series::new(name.into(), v))
962            }
963            ArrowDataType::UInt64 => {
964                let a = array
965                    .as_primitive_opt::<UInt64Type>()
966                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
967                let v: Vec<Option<u64>> = (0..len)
968                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
969                    .collect();
970                Ok(Series::new(name.into(), v))
971            }
972            ArrowDataType::Float32 => {
973                let a = array
974                    .as_primitive_opt::<Float32Type>()
975                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
976                let v: Vec<Option<f32>> = (0..len)
977                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
978                    .collect();
979                Ok(Series::new(name.into(), v))
980            }
981            ArrowDataType::Float64 => {
982                let a = array
983                    .as_primitive_opt::<Float64Type>()
984                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
985                let v: Vec<Option<f64>> = (0..len)
986                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
987                    .collect();
988                Ok(Series::new(name.into(), v))
989            }
990            ArrowDataType::Boolean => {
991                let a = array
992                    .as_boolean_opt()
993                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
994                let v: Vec<Option<bool>> = (0..len)
995                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
996                    .collect();
997                Ok(Series::new(name.into(), v))
998            }
999            ArrowDataType::Utf8 => {
1000                let a = array
1001                    .as_string_opt::<i32>()
1002                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
1003                let v: Vec<Option<String>> = (0..len)
1004                    .map(|i| {
1005                        if a.is_null(i) {
1006                            None
1007                        } else {
1008                            Some(a.value(i).to_string())
1009                        }
1010                    })
1011                    .collect();
1012                Ok(Series::new(name.into(), v))
1013            }
1014            ArrowDataType::LargeUtf8 => {
1015                let a = array
1016                    .as_string_opt::<i64>()
1017                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
1018                let v: Vec<Option<String>> = (0..len)
1019                    .map(|i| {
1020                        if a.is_null(i) {
1021                            None
1022                        } else {
1023                            Some(a.value(i).to_string())
1024                        }
1025                    })
1026                    .collect();
1027                Ok(Series::new(name.into(), v))
1028            }
1029            ArrowDataType::Date32 => {
1030                let a = array
1031                    .as_primitive_opt::<Date32Type>()
1032                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
1033                let v: Vec<Option<i32>> = (0..len)
1034                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1035                    .collect();
1036                Ok(Series::new(name.into(), v))
1037            }
1038            ArrowDataType::Date64 => {
1039                let a = array
1040                    .as_primitive_opt::<Date64Type>()
1041                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
1042                let v: Vec<Option<i64>> = (0..len)
1043                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1044                    .collect();
1045                Ok(Series::new(name.into(), v))
1046            }
1047            ArrowDataType::Timestamp(_, _) => {
1048                let a = array
1049                    .as_primitive_opt::<TimestampMillisecondType>()
1050                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
1051                let v: Vec<Option<i64>> = (0..len)
1052                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1053                    .collect();
1054                Ok(Series::new(name.into(), v))
1055            }
1056            other => Err(color_eyre::eyre::eyre!(
1057                "ORC: unsupported column type {:?} for column '{}'",
1058                other,
1059                name
1060            )),
1061        }
1062    }
1063
1064    /// Build a LazyFrame for hive-partitioned Parquet only (no schema collection, no partition discovery).
1065    /// Use this for phased loading so "Scanning input" is instant; schema and partition handling happen in DoLoadSchema.
1066    pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1067        let path_str = path.as_os_str().to_string_lossy();
1068        let is_glob = path_str.contains('*');
1069        let pl_path = PlPath::Local(Arc::from(path));
1070        let args = ScanArgsParquet {
1071            hive_options: HiveOptions::new_enabled(),
1072            glob: is_glob,
1073            ..Default::default()
1074        };
1075        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1076    }
1077
1078    /// Build a LazyFrame for hive-partitioned Parquet with a pre-computed schema (avoids slow collect_schema across all files).
1079    pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1080        let path_str = path.as_os_str().to_string_lossy();
1081        let is_glob = path_str.contains('*');
1082        let pl_path = PlPath::Local(Arc::from(path));
1083        let args = ScanArgsParquet {
1084            schema: Some(schema),
1085            hive_options: HiveOptions::new_enabled(),
1086            glob: is_glob,
1087            ..Default::default()
1088        };
1089        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1090    }
1091
1092    /// Find the first parquet file along a single spine of a hive-partitioned directory (same walk as partition discovery).
1093    /// Returns `None` if the directory is empty or has no parquet files along that spine.
1094    fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
1095        const MAX_DEPTH: usize = 64;
1096        Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
1097    }
1098
1099    fn first_parquet_file_spine(
1100        path: &Path,
1101        depth: usize,
1102        max_depth: usize,
1103    ) -> Option<std::path::PathBuf> {
1104        if depth >= max_depth {
1105            return None;
1106        }
1107        let entries = fs::read_dir(path).ok()?;
1108        let mut first_partition_child: Option<std::path::PathBuf> = None;
1109        for entry in entries.flatten() {
1110            let child = entry.path();
1111            if child.is_file() {
1112                if child
1113                    .extension()
1114                    .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
1115                {
1116                    return Some(child);
1117                }
1118            } else if child.is_dir() {
1119                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1120                    if name.contains('=') && first_partition_child.is_none() {
1121                        first_partition_child = Some(child);
1122                    }
1123                }
1124            }
1125        }
1126        first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
1127    }
1128
1129    /// Read schema from a single parquet file (metadata only, no data scan). Used to avoid collect_schema() over many files.
1130    fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1131        let file = File::open(path)?;
1132        let mut reader = ParquetReader::new(file);
1133        let arrow_schema = reader.schema()?;
1134        let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1135        Ok(Arc::new(schema))
1136    }
1137
1138    /// Infer schema from one parquet file in a hive directory and merge with partition columns (Utf8).
1139    /// Returns (merged_schema, partition_columns). Use with scan_parquet_hive_with_schema to avoid slow collect_schema().
1140    /// Only supported when path is a directory (not a glob). Returns Err if no parquet file found or read fails.
1141    pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1142        let partition_columns = Self::discover_hive_partition_columns(path);
1143        let one_file = Self::first_parquet_file_in_hive_dir(path)
1144            .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1145        let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1146        let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1147        let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1148        for name in &partition_columns {
1149            merged.with_column(name.clone().into(), DataType::String);
1150        }
1151        for (name, dtype) in file_schema.iter() {
1152            if !part_set.contains(name.as_str()) {
1153                merged.with_column(name.clone(), dtype.clone());
1154            }
1155        }
1156        Ok((Arc::new(merged), partition_columns))
1157    }
1158
1159    /// Discover hive partition column names (public for phased loading). Directory: single-spine walk; glob: parse pattern.
1160    pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1161        if path.is_dir() {
1162            Self::discover_partition_columns_from_path(path)
1163        } else {
1164            Self::discover_partition_columns_from_glob_pattern(path)
1165        }
1166    }
1167
1168    /// Discover hive partition column names from a directory path by walking a single
1169    /// "spine" (one branch) of key=value directories. Partition keys are uniform across
1170    /// the tree, so we only need one path to infer [year, month, day] etc. Returns columns
1171    /// in path order. Stops after max_depth levels to avoid runaway on malformed trees.
1172    fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1173        const MAX_PARTITION_DEPTH: usize = 64;
1174        let mut columns = Vec::<String>::new();
1175        let mut seen = HashSet::<String>::new();
1176        Self::discover_partition_columns_spine(
1177            path,
1178            &mut columns,
1179            &mut seen,
1180            0,
1181            MAX_PARTITION_DEPTH,
1182        );
1183        columns
1184    }
1185
1186    /// Walk one branch: at this directory, find the first child that is a key=value dir,
1187    /// record the key (if not already seen), then recurse into that one child only.
1188    /// This does O(depth) read_dir calls instead of walking the entire tree.
1189    fn discover_partition_columns_spine(
1190        path: &Path,
1191        columns: &mut Vec<String>,
1192        seen: &mut HashSet<String>,
1193        depth: usize,
1194        max_depth: usize,
1195    ) {
1196        if depth >= max_depth {
1197            return;
1198        }
1199        let Ok(entries) = fs::read_dir(path) else {
1200            return;
1201        };
1202        let mut first_partition_child: Option<std::path::PathBuf> = None;
1203        for entry in entries.flatten() {
1204            let child = entry.path();
1205            if child.is_dir() {
1206                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1207                    if let Some((key, _)) = name.split_once('=') {
1208                        if !key.is_empty() && seen.insert(key.to_string()) {
1209                            columns.push(key.to_string());
1210                        }
1211                        if first_partition_child.is_none() {
1212                            first_partition_child = Some(child);
1213                        }
1214                        break;
1215                    }
1216                }
1217            }
1218        }
1219        if let Some(one) = first_partition_child {
1220            Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1221        }
1222    }
1223
1224    /// Infer partition column names from a glob pattern path (e.g. "data/year=*/month=*/*.parquet").
1225    fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1226        let path_str = path.as_os_str().to_string_lossy();
1227        let mut columns = Vec::<String>::new();
1228        let mut seen = HashSet::<String>::new();
1229        for segment in path_str.split('/') {
1230            if let Some((key, rest)) = segment.split_once('=') {
1231                if !key.is_empty()
1232                    && (rest == "*" || !rest.contains('*'))
1233                    && seen.insert(key.to_string())
1234                {
1235                    columns.push(key.to_string());
1236                }
1237            }
1238        }
1239        columns
1240    }
1241
    /// Load Parquet with Hive partitioning from a directory or glob path.
    /// When path is a directory, partition columns are discovered from path structure.
    /// When path contains glob (e.g. `**/*.parquet`), partition columns are inferred from the pattern (e.g. `year=*/month=*`).
    /// Partition columns are moved to the left in the initial LazyFrame before state is created.
    ///
    /// **Performance**: The slow part is Polars, not our code. `scan_parquet` + `collect_schema()` trigger
    /// path expansion (full directory tree or glob) and parquet metadata reads; we only do a single-spine
    /// walk for partition key discovery and cheap schema/select work.
    pub fn from_parquet_hive(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        // Glob expansion is only enabled when the path actually contains a wildcard.
        let path_str = path.as_os_str().to_string_lossy();
        let is_glob = path_str.contains('*');
        let pl_path = PlPath::Local(Arc::from(path));
        let args = ScanArgsParquet {
            hive_options: HiveOptions::new_enabled(),
            glob: is_glob,
            ..Default::default()
        };
        let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
        // The expensive step: Polars reads parquet metadata across all matched files.
        let schema = lf.collect_schema()?;

        // Partition keys from the directory spine, or parsed out of the glob pattern.
        let mut discovered = if path.is_dir() {
            Self::discover_partition_columns_from_path(path)
        } else {
            Self::discover_partition_columns_from_glob_pattern(path)
        };

        // Fallback: glob like "**/*.parquet" has no key= in the pattern, so discovery is empty.
        // Try discovering from a directory prefix (e.g. path.parent() or walk up until we find a dir).
        if discovered.is_empty() {
            let mut dir = path;
            while !dir.is_dir() {
                match dir.parent() {
                    Some(p) => dir = p,
                    None => break,
                }
            }
            if dir.is_dir() {
                discovered = Self::discover_partition_columns_from_path(dir);
            }
        }

        // Keep only discovered keys that actually exist in the scanned schema.
        let partition_columns: Vec<String> = discovered
            .into_iter()
            .filter(|c| schema.contains(c.as_str()))
            .collect();

        // Desired display order: partition keys first, then remaining columns in schema order.
        let new_order: Vec<String> = if partition_columns.is_empty() {
            schema.iter_names().map(|s| s.to_string()).collect()
        } else {
            let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
            let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
            let rest: Vec<String> = all_names
                .into_iter()
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            partition_columns.iter().cloned().chain(rest).collect()
        };

        // Reorder the frame itself only when there are partition keys to front-load.
        if !partition_columns.is_empty() {
            let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
            lf = lf.select(exprs);
        }

        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        state.partition_columns = if partition_columns.is_empty() {
            None
        } else {
            Some(partition_columns)
        };
        // Ensure display order is partition-first (Self::new uses schema order; be explicit).
        state.set_column_order(new_order);
        Ok(state)
    }
1332
    /// Enable or disable the row-number display flag.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }
1336
    /// Flip the row-number display flag.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }
1340
    /// Row number display start (0 or 1); used by go-to-line to interpret user input.
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1345
1346    /// Decompress a compressed file to a temp file for lazy CSV scan.
1347    fn decompress_compressed_csv_to_temp(
1348        path: &Path,
1349        compression: CompressionFormat,
1350        temp_dir: &Path,
1351    ) -> Result<NamedTempFile> {
1352        let mut temp = NamedTempFile::new_in(temp_dir)?;
1353        let out = temp.as_file_mut();
1354        let mut reader: Box<dyn Read> = match compression {
1355            CompressionFormat::Gzip => {
1356                let f = File::open(path)?;
1357                Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1358            }
1359            CompressionFormat::Zstd => {
1360                let f = File::open(path)?;
1361                Box::new(zstd::Decoder::new(BufReader::new(f))?)
1362            }
1363            CompressionFormat::Bzip2 => {
1364                let f = File::open(path)?;
1365                Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1366            }
1367            CompressionFormat::Xz => {
1368                let f = File::open(path)?;
1369                Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1370            }
1371        };
1372        std::io::copy(&mut reader, out)?;
1373        out.sync_all()?;
1374        Ok(temp)
1375    }
1376
1377    pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1378        // Determine compression format: explicit option, or auto-detect from extension
1379        let compression = options
1380            .compression
1381            .or_else(|| CompressionFormat::from_extension(path));
1382
1383        if let Some(compression) = compression {
1384            if options.decompress_in_memory {
1385                // Eager read: decompress into memory, then CSV read
1386                match compression {
1387                    CompressionFormat::Gzip | CompressionFormat::Zstd => {
1388                        let mut read_options = CsvReadOptions::default();
1389                        if let Some(skip_lines) = options.skip_lines {
1390                            read_options.skip_lines = skip_lines;
1391                        }
1392                        if let Some(skip_rows) = options.skip_rows {
1393                            read_options.skip_rows = skip_rows;
1394                        }
1395                        if let Some(has_header) = options.has_header {
1396                            read_options.has_header = has_header;
1397                        }
1398                        read_options = read_options.map_parse_options(|opts| {
1399                            opts.with_try_parse_dates(options.parse_dates)
1400                        });
1401                        let df = read_options
1402                            .try_into_reader_with_file_path(Some(path.into()))?
1403                            .finish()?;
1404                        let lf = df.lazy();
1405                        let mut state = Self::new(
1406                            lf,
1407                            options.pages_lookahead,
1408                            options.pages_lookback,
1409                            options.max_buffered_rows,
1410                            options.max_buffered_mb,
1411                            options.polars_streaming,
1412                        )?;
1413                        state.row_numbers = options.row_numbers;
1414                        state.row_start_index = options.row_start_index;
1415                        Ok(state)
1416                    }
1417                    CompressionFormat::Bzip2 => {
1418                        let file = File::open(path)?;
1419                        let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1420                        let mut decompressed = Vec::new();
1421                        decoder.read_to_end(&mut decompressed)?;
1422                        let mut read_options = CsvReadOptions::default();
1423                        if let Some(skip_lines) = options.skip_lines {
1424                            read_options.skip_lines = skip_lines;
1425                        }
1426                        if let Some(skip_rows) = options.skip_rows {
1427                            read_options.skip_rows = skip_rows;
1428                        }
1429                        if let Some(has_header) = options.has_header {
1430                            read_options.has_header = has_header;
1431                        }
1432                        read_options = read_options.map_parse_options(|opts| {
1433                            opts.with_try_parse_dates(options.parse_dates)
1434                        });
1435                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1436                            .with_options(read_options)
1437                            .finish()?;
1438                        let lf = df.lazy();
1439                        let mut state = Self::new(
1440                            lf,
1441                            options.pages_lookahead,
1442                            options.pages_lookback,
1443                            options.max_buffered_rows,
1444                            options.max_buffered_mb,
1445                            options.polars_streaming,
1446                        )?;
1447                        state.row_numbers = options.row_numbers;
1448                        state.row_start_index = options.row_start_index;
1449                        Ok(state)
1450                    }
1451                    CompressionFormat::Xz => {
1452                        let file = File::open(path)?;
1453                        let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1454                        let mut decompressed = Vec::new();
1455                        decoder.read_to_end(&mut decompressed)?;
1456                        let mut read_options = CsvReadOptions::default();
1457                        if let Some(skip_lines) = options.skip_lines {
1458                            read_options.skip_lines = skip_lines;
1459                        }
1460                        if let Some(skip_rows) = options.skip_rows {
1461                            read_options.skip_rows = skip_rows;
1462                        }
1463                        if let Some(has_header) = options.has_header {
1464                            read_options.has_header = has_header;
1465                        }
1466                        read_options = read_options.map_parse_options(|opts| {
1467                            opts.with_try_parse_dates(options.parse_dates)
1468                        });
1469                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1470                            .with_options(read_options)
1471                            .finish()?;
1472                        let lf = df.lazy();
1473                        let mut state = Self::new(
1474                            lf,
1475                            options.pages_lookahead,
1476                            options.pages_lookback,
1477                            options.max_buffered_rows,
1478                            options.max_buffered_mb,
1479                            options.polars_streaming,
1480                        )?;
1481                        state.row_numbers = options.row_numbers;
1482                        state.row_start_index = options.row_start_index;
1483                        Ok(state)
1484                    }
1485                }
1486            } else {
1487                // Decompress to temp file, then lazy scan
1488                let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1489                let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1490                let mut state = Self::from_csv_customize(
1491                    temp.path(),
1492                    options.pages_lookahead,
1493                    options.pages_lookback,
1494                    options.max_buffered_rows,
1495                    options.max_buffered_mb,
1496                    |mut reader| {
1497                        if let Some(skip_lines) = options.skip_lines {
1498                            reader = reader.with_skip_lines(skip_lines);
1499                        }
1500                        if let Some(skip_rows) = options.skip_rows {
1501                            reader = reader.with_skip_rows(skip_rows);
1502                        }
1503                        if let Some(has_header) = options.has_header {
1504                            reader = reader.with_has_header(has_header);
1505                        }
1506                        reader = reader.with_try_parse_dates(options.parse_dates);
1507                        reader
1508                    },
1509                )?;
1510                state.row_numbers = options.row_numbers;
1511                state.row_start_index = options.row_start_index;
1512                state.decompress_temp_file = Some(temp);
1513                Ok(state)
1514            }
1515        } else {
1516            // For uncompressed files, use lazy scanning (more efficient)
1517            let mut state = Self::from_csv_customize(
1518                path,
1519                options.pages_lookahead,
1520                options.pages_lookback,
1521                options.max_buffered_rows,
1522                options.max_buffered_mb,
1523                |mut reader| {
1524                    if let Some(skip_lines) = options.skip_lines {
1525                        reader = reader.with_skip_lines(skip_lines);
1526                    }
1527                    if let Some(skip_rows) = options.skip_rows {
1528                        reader = reader.with_skip_rows(skip_rows);
1529                    }
1530                    if let Some(has_header) = options.has_header {
1531                        reader = reader.with_has_header(has_header);
1532                    }
1533                    reader = reader.with_try_parse_dates(options.parse_dates);
1534                    reader
1535                },
1536            )?;
1537            state.row_numbers = options.row_numbers;
1538            Ok(state)
1539        }
1540    }
1541
1542    pub fn from_csv_customize<F>(
1543        path: &Path,
1544        pages_lookahead: Option<usize>,
1545        pages_lookback: Option<usize>,
1546        max_buffered_rows: Option<usize>,
1547        max_buffered_mb: Option<usize>,
1548        func: F,
1549    ) -> Result<Self>
1550    where
1551        F: FnOnce(LazyCsvReader) -> LazyCsvReader,
1552    {
1553        let pl_path = PlPath::Local(Arc::from(path));
1554        let reader = LazyCsvReader::new(pl_path);
1555        let lf = func(reader).finish()?;
1556        Self::new(
1557            lf,
1558            pages_lookahead,
1559            pages_lookback,
1560            max_buffered_rows,
1561            max_buffered_mb,
1562            true,
1563        )
1564    }
1565
1566    /// Load multiple CSV files (uncompressed) and concatenate into one LazyFrame.
1567    pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
1568        if paths.is_empty() {
1569            return Err(color_eyre::eyre::eyre!("No paths provided"));
1570        }
1571        if paths.len() == 1 {
1572            return Self::from_csv(paths[0].as_ref(), options);
1573        }
1574        let mut lazy_frames = Vec::with_capacity(paths.len());
1575        for p in paths {
1576            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1577            let mut reader = LazyCsvReader::new(pl_path);
1578            if let Some(skip_lines) = options.skip_lines {
1579                reader = reader.with_skip_lines(skip_lines);
1580            }
1581            if let Some(skip_rows) = options.skip_rows {
1582                reader = reader.with_skip_rows(skip_rows);
1583            }
1584            if let Some(has_header) = options.has_header {
1585                reader = reader.with_has_header(has_header);
1586            }
1587            reader = reader.with_try_parse_dates(options.parse_dates);
1588            let lf = reader.finish()?;
1589            lazy_frames.push(lf);
1590        }
1591        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1592        let mut state = Self::new(
1593            lf,
1594            options.pages_lookahead,
1595            options.pages_lookback,
1596            options.max_buffered_rows,
1597            options.max_buffered_mb,
1598            options.polars_streaming,
1599        )?;
1600        state.row_numbers = options.row_numbers;
1601        state.row_start_index = options.row_start_index;
1602        Ok(state)
1603    }
1604
1605    pub fn from_ndjson(
1606        path: &Path,
1607        pages_lookahead: Option<usize>,
1608        pages_lookback: Option<usize>,
1609        max_buffered_rows: Option<usize>,
1610        max_buffered_mb: Option<usize>,
1611        row_numbers: bool,
1612        row_start_index: usize,
1613    ) -> Result<Self> {
1614        let pl_path = PlPath::Local(Arc::from(path));
1615        let lf = LazyJsonLineReader::new(pl_path).finish()?;
1616        let mut state = Self::new(
1617            lf,
1618            pages_lookahead,
1619            pages_lookback,
1620            max_buffered_rows,
1621            max_buffered_mb,
1622            true,
1623        )?;
1624        state.row_numbers = row_numbers;
1625        state.row_start_index = row_start_index;
1626        Ok(state)
1627    }
1628
1629    /// Load multiple NDJSON files and concatenate into one LazyFrame.
1630    pub fn from_ndjson_paths(
1631        paths: &[impl AsRef<Path>],
1632        pages_lookahead: Option<usize>,
1633        pages_lookback: Option<usize>,
1634        max_buffered_rows: Option<usize>,
1635        max_buffered_mb: Option<usize>,
1636        row_numbers: bool,
1637        row_start_index: usize,
1638    ) -> Result<Self> {
1639        if paths.is_empty() {
1640            return Err(color_eyre::eyre::eyre!("No paths provided"));
1641        }
1642        if paths.len() == 1 {
1643            return Self::from_ndjson(
1644                paths[0].as_ref(),
1645                pages_lookahead,
1646                pages_lookback,
1647                max_buffered_rows,
1648                max_buffered_mb,
1649                row_numbers,
1650                row_start_index,
1651            );
1652        }
1653        let mut lazy_frames = Vec::with_capacity(paths.len());
1654        for p in paths {
1655            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1656            let lf = LazyJsonLineReader::new(pl_path).finish()?;
1657            lazy_frames.push(lf);
1658        }
1659        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1660        let mut state = Self::new(
1661            lf,
1662            pages_lookahead,
1663            pages_lookback,
1664            max_buffered_rows,
1665            max_buffered_mb,
1666            true,
1667        )?;
1668        state.row_numbers = row_numbers;
1669        state.row_start_index = row_start_index;
1670        Ok(state)
1671    }
1672
    /// Build a table state from a JSON file holding a top-level array of records.
    ///
    /// Thin forwarder to [`Self::from_json_with_format`] with [`JsonFormat::Json`].
    pub fn from_json(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1693
    /// Build a table state from a JSON Lines file (one JSON object per line).
    ///
    /// Thin forwarder to [`Self::from_json_with_format`] with [`JsonFormat::JsonLines`].
    pub fn from_json_lines(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1714
1715    #[allow(clippy::too_many_arguments)]
1716    fn from_json_with_format(
1717        path: &Path,
1718        pages_lookahead: Option<usize>,
1719        pages_lookback: Option<usize>,
1720        max_buffered_rows: Option<usize>,
1721        max_buffered_mb: Option<usize>,
1722        row_numbers: bool,
1723        row_start_index: usize,
1724        format: JsonFormat,
1725    ) -> Result<Self> {
1726        let file = File::open(path)?;
1727        let lf = JsonReader::new(file)
1728            .with_json_format(format)
1729            .finish()?
1730            .lazy();
1731        let mut state = Self::new(
1732            lf,
1733            pages_lookahead,
1734            pages_lookback,
1735            max_buffered_rows,
1736            max_buffered_mb,
1737            true,
1738        )?;
1739        state.row_numbers = row_numbers;
1740        state.row_start_index = row_start_index;
1741        Ok(state)
1742    }
1743
    /// Load multiple JSON (array) files and concatenate into one LazyFrame.
    ///
    /// Thin forwarder to [`Self::from_json_with_format_paths`] with [`JsonFormat::Json`].
    pub fn from_json_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1765
    /// Load multiple JSON Lines files and concatenate into one LazyFrame.
    ///
    /// Thin forwarder to [`Self::from_json_with_format_paths`] with [`JsonFormat::JsonLines`].
    pub fn from_json_lines_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1787
1788    #[allow(clippy::too_many_arguments)]
1789    fn from_json_with_format_paths(
1790        paths: &[impl AsRef<Path>],
1791        pages_lookahead: Option<usize>,
1792        pages_lookback: Option<usize>,
1793        max_buffered_rows: Option<usize>,
1794        max_buffered_mb: Option<usize>,
1795        row_numbers: bool,
1796        row_start_index: usize,
1797        format: JsonFormat,
1798    ) -> Result<Self> {
1799        if paths.is_empty() {
1800            return Err(color_eyre::eyre::eyre!("No paths provided"));
1801        }
1802        if paths.len() == 1 {
1803            return Self::from_json_with_format(
1804                paths[0].as_ref(),
1805                pages_lookahead,
1806                pages_lookback,
1807                max_buffered_rows,
1808                max_buffered_mb,
1809                row_numbers,
1810                row_start_index,
1811                format,
1812            );
1813        }
1814        let mut lazy_frames = Vec::with_capacity(paths.len());
1815        for p in paths {
1816            let file = File::open(p.as_ref())?;
1817            let lf = match &format {
1818                JsonFormat::Json => JsonReader::new(file)
1819                    .with_json_format(JsonFormat::Json)
1820                    .finish()?
1821                    .lazy(),
1822                JsonFormat::JsonLines => JsonReader::new(file)
1823                    .with_json_format(JsonFormat::JsonLines)
1824                    .finish()?
1825                    .lazy(),
1826            };
1827            lazy_frames.push(lf);
1828        }
1829        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1830        let mut state = Self::new(
1831            lf,
1832            pages_lookahead,
1833            pages_lookback,
1834            max_buffered_rows,
1835            max_buffered_mb,
1836            true,
1837        )?;
1838        state.row_numbers = row_numbers;
1839        state.row_start_index = row_start_index;
1840        Ok(state)
1841    }
1842
1843    #[allow(clippy::too_many_arguments)]
1844    pub fn from_delimited(
1845        path: &Path,
1846        delimiter: u8,
1847        pages_lookahead: Option<usize>,
1848        pages_lookback: Option<usize>,
1849        max_buffered_rows: Option<usize>,
1850        max_buffered_mb: Option<usize>,
1851        row_numbers: bool,
1852        row_start_index: usize,
1853    ) -> Result<Self> {
1854        let pl_path = PlPath::Local(Arc::from(path));
1855        let reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
1856        let lf = reader.finish()?;
1857        let mut state = Self::new(
1858            lf,
1859            pages_lookahead,
1860            pages_lookback,
1861            max_buffered_rows,
1862            max_buffered_mb,
1863            true,
1864        )?;
1865        state.row_numbers = row_numbers;
1866        state.row_start_index = row_start_index;
1867        Ok(state)
1868    }
1869
1870    /// Returns true if a scroll by `rows` would trigger a collect (view would leave the buffer).
1871    /// Used so the UI only shows the throbber when actual data loading will occur.
1872    pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
1873        if rows < 0 && self.start_row == 0 {
1874            return false;
1875        }
1876        let new_start_row = if self.start_row as i64 + rows <= 0 {
1877            0
1878        } else {
1879            if let Some(df) = self.df.as_ref() {
1880                if rows > 0 && df.shape().0 <= self.visible_rows {
1881                    return false;
1882                }
1883            }
1884            (self.start_row as i64 + rows) as usize
1885        };
1886        let view_end = new_start_row
1887            + self
1888                .visible_rows
1889                .min(self.num_rows.saturating_sub(new_start_row));
1890        let within_buffer = new_start_row >= self.buffered_start_row
1891            && view_end <= self.buffered_end_row
1892            && self.buffered_end_row > 0;
1893        !within_buffer
1894    }
1895
1896    fn slide_table(&mut self, rows: i64) {
1897        if rows < 0 && self.start_row == 0 {
1898            return;
1899        }
1900
1901        let new_start_row = if self.start_row as i64 + rows <= 0 {
1902            0
1903        } else {
1904            if let Some(df) = self.df.as_ref() {
1905                if rows > 0 && df.shape().0 <= self.visible_rows {
1906                    return;
1907                }
1908            }
1909            (self.start_row as i64 + rows) as usize
1910        };
1911
1912        // Call collect() only when view is outside buffer; otherwise just update start_row.
1913        let view_end = new_start_row
1914            + self
1915                .visible_rows
1916                .min(self.num_rows.saturating_sub(new_start_row));
1917        let within_buffer = new_start_row >= self.buffered_start_row
1918            && view_end <= self.buffered_end_row
1919            && self.buffered_end_row > 0;
1920
1921        if within_buffer {
1922            self.start_row = new_start_row;
1923            return;
1924        }
1925
1926        self.start_row = new_start_row;
1927        self.collect();
1928    }
1929
1930    pub fn collect(&mut self) {
1931        // Update proximity threshold based on visible rows
1932        if self.visible_rows > 0 {
1933            self.proximity_threshold = self.visible_rows;
1934        }
1935
1936        // Run len() only when lf has changed (query, filter, sort, pivot, melt, reset, drill).
1937        if !self.num_rows_valid {
1938            self.num_rows =
1939                match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
1940                    Ok(df) => {
1941                        if let Some(col) = df.get(0) {
1942                            if let Some(AnyValue::UInt32(len)) = col.first() {
1943                                *len as usize
1944                            } else {
1945                                0
1946                            }
1947                        } else {
1948                            0
1949                        }
1950                    }
1951                    Err(_) => 0,
1952                };
1953            self.num_rows_valid = true;
1954        }
1955
1956        if self.num_rows > 0 {
1957            let max_start = self.num_rows.saturating_sub(1);
1958            if self.start_row > max_start {
1959                self.start_row = max_start;
1960            }
1961        } else {
1962            self.start_row = 0;
1963            self.buffered_start_row = 0;
1964            self.buffered_end_row = 0;
1965            self.buffered_df = None;
1966            self.df = None;
1967            self.locked_df = None;
1968            return;
1969        }
1970
1971        // Proximity-based buffer logic
1972        let view_start = self.start_row;
1973        let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);
1974
1975        // Check if current view is within buffered range
1976        let within_buffer = view_start >= self.buffered_start_row
1977            && view_end <= self.buffered_end_row
1978            && self.buffered_end_row > 0;
1979
1980        // Buffer grows incrementally: initial load and each expansion add only a few pages (lookahead + lookback).
1981        // clamp_buffer_to_max_size caps at max_buffered_rows and slides the window when at cap.
1982        let page_rows = self.visible_rows.max(1);
1983
1984        if within_buffer {
1985            let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
1986            let dist_to_end = self.buffered_end_row.saturating_sub(view_end);
1987
1988            let needs_expansion_back =
1989                dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
1990            let needs_expansion_forward =
1991                dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;
1992
1993            if !needs_expansion_back && !needs_expansion_forward {
1994                // Column scroll only: reuse cached full buffer and re-slice into locked/scroll columns.
1995                let expected_len = self
1996                    .buffered_end_row
1997                    .saturating_sub(self.buffered_start_row);
1998                if self
1999                    .buffered_df
2000                    .as_ref()
2001                    .is_some_and(|b| b.height() == expected_len)
2002                {
2003                    self.slice_buffer_into_display();
2004                    if self.table_state.selected().is_none() {
2005                        self.table_state.select(Some(0));
2006                    }
2007                    return;
2008                }
2009                self.load_buffer(self.buffered_start_row, self.buffered_end_row);
2010                if self.table_state.selected().is_none() {
2011                    self.table_state.select(Some(0));
2012                }
2013                return;
2014            }
2015
2016            let mut new_buffer_start = if needs_expansion_back {
2017                view_start.saturating_sub(self.pages_lookback * page_rows)
2018            } else {
2019                self.buffered_start_row
2020            };
2021
2022            let mut new_buffer_end = if needs_expansion_forward {
2023                (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
2024            } else {
2025                self.buffered_end_row
2026            };
2027
2028            self.clamp_buffer_to_max_size(
2029                view_start,
2030                view_end,
2031                &mut new_buffer_start,
2032                &mut new_buffer_end,
2033            );
2034            self.load_buffer(new_buffer_start, new_buffer_end);
2035        } else {
2036            // Outside buffer: either extend the previous buffer (so it grows) or load a fresh small window.
2037            // Only extend when the view is "close" to the existing buffer (e.g. user paged down a bit).
2038            // A big jump (e.g. jump to end) should load just a window around the new view, not extend
2039            // the buffer across the whole dataset.
2040            let mut new_buffer_start;
2041            let mut new_buffer_end;
2042
2043            let had_buffer = self.buffered_end_row > 0;
2044            let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
2045            let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;
2046
2047            let extend_forward_ok = scrolled_past_end
2048                && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
2049            let extend_backward_ok = scrolled_past_start
2050                && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;
2051
2052            if extend_forward_ok {
2053                // View is just a few pages past buffer end; extend forward.
2054                new_buffer_start = self.buffered_start_row;
2055                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2056            } else if extend_backward_ok {
2057                // View is just a few pages before buffer start; extend backward.
2058                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2059                new_buffer_end = self.buffered_end_row;
2060            } else if scrolled_past_end || scrolled_past_start {
2061                // Big jump (e.g. jump to end or jump to start): load a fresh window around the view.
2062                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2063                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2064                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2065                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2066                if current_len < min_initial_len {
2067                    let need = min_initial_len.saturating_sub(current_len);
2068                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2069                    let can_extend_start = new_buffer_start;
2070                    if can_extend_end >= need {
2071                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2072                    } else if can_extend_start >= need {
2073                        new_buffer_start = new_buffer_start.saturating_sub(need);
2074                    } else {
2075                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2076                        new_buffer_start =
2077                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2078                    }
2079                }
2080            } else {
2081                // No buffer yet or big jump: load a fresh small window (view ± a few pages).
2082                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2083                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2084
2085                // Ensure at least (1 + lookahead + lookback) pages so buffer size is consistent (e.g. 364 at 52 visible).
2086                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2087                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2088                if current_len < min_initial_len {
2089                    let need = min_initial_len.saturating_sub(current_len);
2090                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2091                    let can_extend_start = new_buffer_start;
2092                    if can_extend_end >= need {
2093                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2094                    } else if can_extend_start >= need {
2095                        new_buffer_start = new_buffer_start.saturating_sub(need);
2096                    } else {
2097                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2098                        new_buffer_start =
2099                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2100                    }
2101                }
2102            }
2103
2104            self.clamp_buffer_to_max_size(
2105                view_start,
2106                view_end,
2107                &mut new_buffer_start,
2108                &mut new_buffer_end,
2109            );
2110            self.load_buffer(new_buffer_start, new_buffer_end);
2111        }
2112
2113        self.slice_from_buffer();
2114        if self.table_state.selected().is_none() {
2115            self.table_state.select(Some(0));
2116        }
2117    }
2118
    /// Invalidate num_rows cache when lf is mutated.
    ///
    /// Call after any operation that changes the row count of `lf` (query,
    /// filter, pivot, melt, reset, drill) so the next `collect()` re-runs the
    /// `len()` scan instead of trusting the stale count.
    fn invalidate_num_rows(&mut self) {
        self.num_rows_valid = false;
    }
2123
2124    /// Returns the cached row count when valid (same value shown in the control bar). Use this to
2125    /// avoid an extra full scan for analysis/describe when the table has already been collected.
2126    pub fn num_rows_if_valid(&self) -> Option<usize> {
2127        if self.num_rows_valid {
2128            Some(self.num_rows)
2129        } else {
2130            None
2131        }
2132    }
2133
2134    /// Clamp buffer to max_buffered_rows; when at cap, slide window to keep view inside.
2135    fn clamp_buffer_to_max_size(
2136        &self,
2137        view_start: usize,
2138        view_end: usize,
2139        buffer_start: &mut usize,
2140        buffer_end: &mut usize,
2141    ) {
2142        if self.max_buffered_rows == 0 {
2143            return;
2144        }
2145        let max_len = self.max_buffered_rows;
2146        let requested_len = buffer_end.saturating_sub(*buffer_start);
2147        if requested_len <= max_len {
2148            return;
2149        }
2150        let view_len = view_end.saturating_sub(view_start);
2151        if view_len >= max_len {
2152            *buffer_start = view_start;
2153            *buffer_end = (view_start + max_len).min(self.num_rows);
2154        } else {
2155            let half = (max_len - view_len) / 2;
2156            *buffer_end = (view_end + half).min(self.num_rows);
2157            *buffer_start = (*buffer_end).saturating_sub(max_len);
2158            if *buffer_start > view_start {
2159                *buffer_start = view_start;
2160            }
2161            *buffer_end = (*buffer_start + max_len).min(self.num_rows);
2162        }
2163    }
2164
2165    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
2166        let buffer_size = buffer_end.saturating_sub(buffer_start);
2167        if buffer_size == 0 {
2168            return;
2169        }
2170
2171        let all_columns: Vec<_> = self
2172            .column_order
2173            .iter()
2174            .map(|name| col(name.as_str()))
2175            .collect();
2176
2177        let mut full_df = match collect_lazy(
2178            self.lf
2179                .clone()
2180                .select(all_columns)
2181                .slice(buffer_start as i64, buffer_size as u32),
2182            self.polars_streaming,
2183        ) {
2184            Ok(df) => df,
2185            Err(e) => {
2186                self.error = Some(e);
2187                return;
2188            }
2189        };
2190
2191        let mut effective_buffer_end = buffer_end;
2192        if self.max_buffered_mb > 0 {
2193            let size = full_df.estimated_size();
2194            let max_bytes = self.max_buffered_mb * 1024 * 1024;
2195            if size > max_bytes {
2196                let rows = full_df.height();
2197                if rows > 0 {
2198                    let bytes_per_row = size / rows;
2199                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
2200                    if max_rows < rows {
2201                        full_df = full_df.slice(0, max_rows);
2202                        effective_buffer_end = buffer_start + max_rows;
2203                    }
2204                }
2205            }
2206        }
2207
2208        if self.locked_columns_count > 0 {
2209            let locked_names: Vec<&str> = self
2210                .column_order
2211                .iter()
2212                .take(self.locked_columns_count)
2213                .map(|s| s.as_str())
2214                .collect();
2215            let locked_df = match full_df.select(locked_names) {
2216                Ok(df) => df,
2217                Err(e) => {
2218                    self.error = Some(e);
2219                    return;
2220                }
2221            };
2222            self.locked_df = if self.is_grouped() {
2223                match self.format_grouped_dataframe(locked_df) {
2224                    Ok(formatted_df) => Some(formatted_df),
2225                    Err(e) => {
2226                        self.error = Some(PolarsError::ComputeError(
2227                            crate::error_display::user_message_from_report(&e, None).into(),
2228                        ));
2229                        return;
2230                    }
2231                }
2232            } else {
2233                Some(locked_df)
2234            };
2235        } else {
2236            self.locked_df = None;
2237        }
2238
2239        let scroll_names: Vec<&str> = self
2240            .column_order
2241            .iter()
2242            .skip(self.locked_columns_count + self.termcol_index)
2243            .map(|s| s.as_str())
2244            .collect();
2245        if scroll_names.is_empty() {
2246            self.df = None;
2247        } else {
2248            let scroll_df = match full_df.select(scroll_names) {
2249                Ok(df) => df,
2250                Err(e) => {
2251                    self.error = Some(e);
2252                    return;
2253                }
2254            };
2255            self.df = if self.is_grouped() {
2256                match self.format_grouped_dataframe(scroll_df) {
2257                    Ok(formatted_df) => Some(formatted_df),
2258                    Err(e) => {
2259                        self.error = Some(PolarsError::ComputeError(
2260                            crate::error_display::user_message_from_report(&e, None).into(),
2261                        ));
2262                        return;
2263                    }
2264                }
2265            } else {
2266                Some(scroll_df)
2267            };
2268        }
2269        if self.error.is_some() {
2270            self.error = None;
2271        }
2272        self.buffered_start_row = buffer_start;
2273        self.buffered_end_row = effective_buffer_end;
2274        self.buffered_df = Some(full_df);
2275    }
2276
    /// Recompute locked_df and df from the cached full buffer. Used when only termcol_index (or locked columns) changed.
    ///
    /// Cheap compared to `collect()`: no lazy query is executed, only column
    /// projections out of the already-materialized `buffered_df`.
    fn slice_buffer_into_display(&mut self) {
        // Nothing to do until a buffer has been collected.
        let full_df = match self.buffered_df.as_ref() {
            Some(df) => df,
            None => return,
        };

        // Pinned columns: the first `locked_columns_count` names in display order.
        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            // NOTE(review): unlike collect(), a failed select here is silently
            // ignored and the previous locked_df is kept — confirm intended.
            if let Ok(locked_df) = full_df.select(locked_names) {
                self.locked_df = if self.is_grouped() {
                    self.format_grouped_dataframe(locked_df).ok()
                } else {
                    Some(locked_df)
                };
            }
        } else {
            self.locked_df = None;
        }

        // Scrollable columns: everything after the locked block, shifted right
        // by the current horizontal scroll offset (termcol_index).
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            // Scrolled past the last column: nothing scrollable to show.
            self.df = None;
        } else if let Ok(scroll_df) = full_df.select(scroll_names) {
            self.df = if self.is_grouped() {
                self.format_grouped_dataframe(scroll_df).ok()
            } else {
                Some(scroll_df)
            };
        }
    }
2318
    fn slice_from_buffer(&mut self) {
        // Intentionally a no-op. The buffer holds the full range
        // [buffered_start_row, buffered_end_row); the displayed window
        // [start_row, start_row + visible_rows) is a subset of it and is
        // sliced out lazily at render time based on the current offset,
        // so nothing needs to happen when only the offset moves.
    }
2325
2326    fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
2327        let schema = df.schema();
2328        let mut new_series = Vec::new();
2329
2330        for (col_name, dtype) in schema.iter() {
2331            let col = df.column(col_name)?;
2332            if matches!(dtype, DataType::List(_)) {
2333                let string_series: Series = col
2334                    .list()?
2335                    .into_iter()
2336                    .map(|opt_list| {
2337                        opt_list.map(|list_series| {
2338                            let values: Vec<String> = list_series
2339                                .iter()
2340                                .take(10)
2341                                .map(|v| v.str_value().to_string())
2342                                .collect();
2343                            if list_series.len() > 10 {
2344                                format!("[{}...] ({} items)", values.join(", "), list_series.len())
2345                            } else {
2346                                format!("[{}]", values.join(", "))
2347                            }
2348                        })
2349                    })
2350                    .collect();
2351                new_series.push(string_series.with_name(col_name.as_str().into()).into());
2352            } else {
2353                new_series.push(col.clone());
2354            }
2355        }
2356
2357        Ok(DataFrame::new(new_series)?)
2358    }
2359
2360    pub fn select_next(&mut self) {
2361        self.table_state.select_next();
2362        if let Some(selected) = self.table_state.selected() {
2363            if selected >= self.visible_rows && self.visible_rows > 0 {
2364                self.slide_table(1);
2365            }
2366        }
2367    }
2368
    pub fn page_down(&mut self) {
        // Advance the data window by one full page of visible rows.
        self.slide_table(self.visible_rows as i64);
    }
2372
2373    pub fn select_previous(&mut self) {
2374        if let Some(selected) = self.table_state.selected() {
2375            self.table_state.select_previous();
2376            if selected == 0 && self.start_row > 0 {
2377                self.slide_table(-1);
2378            }
2379        } else {
2380            self.table_state.select(Some(0));
2381        }
2382    }
2383
    /// Jump the top of the viewport to `index`, re-collecting the buffer.
    /// No-op when the view already starts at that row.
    pub fn scroll_to(&mut self, index: usize) {
        if self.start_row == index {
            return;
        }

        if index == 0 {
            self.start_row = 0;
            self.collect();
            // NOTE(review): start_row is re-set after collect(); presumably
            // collect() can adjust start_row and a jump to the very top must
            // stick — confirm before collapsing this branch into the other.
            self.start_row = 0;
        } else {
            self.start_row = index;
            self.collect();
        }
    }
2398
2399    /// Scroll so that the given row index is centered in the view when possible (respects table bounds).
2400    /// Selects that row. Used by go-to-line.
2401    pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2402        self.ensure_num_rows();
2403        if self.num_rows == 0 || self.visible_rows == 0 {
2404            return;
2405        }
2406        let center_offset = self.visible_rows / 2;
2407        let mut start_row = row_index.saturating_sub(center_offset);
2408        let max_start = self.num_rows.saturating_sub(self.visible_rows);
2409        start_row = start_row.min(max_start);
2410
2411        if self.start_row == start_row {
2412            let display_idx = row_index
2413                .saturating_sub(start_row)
2414                .min(self.visible_rows.saturating_sub(1));
2415            self.table_state.select(Some(display_idx));
2416            return;
2417        }
2418
2419        self.start_row = start_row;
2420        self.collect();
2421        let display_idx = row_index
2422            .saturating_sub(start_row)
2423            .min(self.visible_rows.saturating_sub(1));
2424        self.table_state.select(Some(display_idx));
2425    }
2426
2427    /// Ensure num_rows is up to date (runs len() query if needed). Used before scroll_to_end.
2428    fn ensure_num_rows(&mut self) {
2429        if self.num_rows_valid {
2430            return;
2431        }
2432        if self.visible_rows > 0 {
2433            self.proximity_threshold = self.visible_rows;
2434        }
2435        self.num_rows = match self.lf.clone().select([len()]).collect() {
2436            Ok(df) => {
2437                if let Some(col) = df.get(0) {
2438                    if let Some(AnyValue::UInt32(len)) = col.first() {
2439                        *len as usize
2440                    } else {
2441                        0
2442                    }
2443                } else {
2444                    0
2445                }
2446            }
2447            Err(_) => 0,
2448        };
2449        self.num_rows_valid = true;
2450    }
2451
2452    /// Jump to the last page; buffer is trimmed/loaded as needed. Selects the last row.
2453    pub fn scroll_to_end(&mut self) {
2454        self.ensure_num_rows();
2455        if self.num_rows == 0 {
2456            self.start_row = 0;
2457            self.buffered_start_row = 0;
2458            self.buffered_end_row = 0;
2459            return;
2460        }
2461        let end_start = self.num_rows.saturating_sub(self.visible_rows);
2462        if self.start_row == end_start {
2463            self.select_last_visible_row();
2464            return;
2465        }
2466        self.start_row = end_start;
2467        self.collect();
2468        self.select_last_visible_row();
2469    }
2470
2471    /// Set table selection to the last row in the current view (for use after scroll_to_end).
2472    fn select_last_visible_row(&mut self) {
2473        if self.num_rows == 0 {
2474            return;
2475        }
2476        let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
2477        let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
2478        self.table_state.select(Some(sel));
2479    }
2480
    pub fn half_page_down(&mut self) {
        // Scroll down by half a page, always at least one row.
        let half = (self.visible_rows / 2).max(1) as i64;
        self.slide_table(half);
    }

    pub fn half_page_up(&mut self) {
        // Scroll up by half a page (at least one row); no-op at the top.
        if self.start_row == 0 {
            return;
        }
        let half = (self.visible_rows / 2).max(1) as i64;
        self.slide_table(-half);
    }

    pub fn page_up(&mut self) {
        // Scroll up by one full page; no-op when already at the top.
        if self.start_row == 0 {
            return;
        }
        self.slide_table(-(self.visible_rows as i64));
    }
2500
2501    pub fn scroll_right(&mut self) {
2502        let max_scroll = self
2503            .column_order
2504            .len()
2505            .saturating_sub(self.locked_columns_count);
2506        if self.termcol_index < max_scroll.saturating_sub(1) {
2507            self.termcol_index += 1;
2508            self.collect();
2509        }
2510    }
2511
    pub fn scroll_left(&mut self) {
        // Scroll the viewport one column to the left; no-op at the leftmost
        // scrollable position.
        if self.termcol_index > 0 {
            self.termcol_index -= 1;
            self.collect();
        }
    }
2518
    pub fn headers(&self) -> Vec<String> {
        // Current display order of all columns (locked + scrollable).
        self.column_order.clone()
    }

    pub fn set_column_order(&mut self, order: Vec<String>) {
        // Replace the display order and drop the row buffer so the next
        // collect() re-materializes with the new column layout.
        self.column_order = order;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.collect();
    }

    pub fn set_locked_columns(&mut self, count: usize) {
        // Pin the first `count` columns (clamped to the number of columns)
        // and rebuild the buffer from scratch.
        self.locked_columns_count = count.min(self.column_order.len());
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.collect();
    }
2538
    /// Number of columns pinned to the left of the scrollable region.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }

    // Getter methods for template creation
    /// Active structured filter statements, in application order.
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }

    /// Columns the table is currently sorted by (empty when unsorted).
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }

    /// Current sort direction; `false` means descending/reversed.
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }

    /// Current display order of columns.
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }

    /// Text of the active structured query ("" when none).
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }

    /// Text of the active SQL query ("" when none).
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }

    /// Text of the active fuzzy-search query ("" when none).
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }

    /// Spec of the most recent pivot, if the current frame came from one.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }

    /// Spec of the most recent melt, if the current frame came from one.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }
2579
2580    pub fn is_grouped(&self) -> bool {
2581        self.schema
2582            .iter()
2583            .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
2584    }
2585
2586    pub fn group_key_columns(&self) -> Vec<String> {
2587        self.schema
2588            .iter()
2589            .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
2590            .map(|(name, _)| name.to_string())
2591            .collect()
2592    }
2593
2594    pub fn group_value_columns(&self) -> Vec<String> {
2595        self.schema
2596            .iter()
2597            .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
2598            .map(|(name, _)| name.to_string())
2599            .collect()
2600    }
2601
2602    /// Estimated heap size in bytes of the currently buffered slice (locked + scrollable), if collected.
2603    pub fn buffered_memory_bytes(&self) -> Option<usize> {
2604        let locked = self
2605            .locked_df
2606            .as_ref()
2607            .map(|df| df.estimated_size())
2608            .unwrap_or(0);
2609        let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
2610        if locked == 0 && scroll == 0 {
2611            None
2612        } else {
2613            Some(locked + scroll)
2614        }
2615    }
2616
    /// Number of rows currently in the buffer. 0 if no buffer loaded.
    pub fn buffered_rows(&self) -> usize {
        // End is exclusive, so the difference is the row count.
        self.buffered_end_row
            .saturating_sub(self.buffered_start_row)
    }

    /// Maximum buffer size in rows (0 = no limit).
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }

    /// Maximum buffer size in MiB (0 = no limit).
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
2632
    /// Expand one row of a grouped (List-column) frame into a flat frame:
    /// key columns are broadcast to constants, List columns are exploded into
    /// ordinary columns. Saves the grouped frame so `drill_up` can restore it.
    ///
    /// # Errors
    /// Returns an error when `group_index` is out of bounds, the grouped frame
    /// fails to collect, or the grouped data has no value (List) columns.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        // Silently ignore drill requests on non-grouped data.
        if !self.is_grouped() {
            return Ok(());
        }

        // Remember the grouped frame so drill_up() can restore it.
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // Record the selected group's key values (as strings) for display.
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // The exploded frame's row count comes from the first value column's
        // list length at this group index (0 if it is not actually a list).
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Broadcast each key value into a constant column of `row_count` rows,
        // preserving the native dtype where possible (string fallback otherwise).
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    // Dtypes without a direct constructor fall back to strings.
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // Each value column's inner list becomes an ordinary column.
        // Non-list values are silently skipped here.
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Swap in the exploded frame and reset view state to the top-left.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
2740
2741    pub fn drill_up(&mut self) -> Result<()> {
2742        if let Some(grouped_lf) = self.grouped_lf.take() {
2743            self.invalidate_num_rows();
2744            self.lf = grouped_lf;
2745            self.schema = self.lf.clone().collect_schema()?;
2746            self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
2747            self.drilled_down_group_index = None;
2748            self.drilled_down_group_key = None;
2749            self.drilled_down_group_key_columns = None;
2750            self.start_row = 0;
2751            self.termcol_index = 0;
2752            self.locked_columns_count = 0;
2753            self.table_state.select(Some(0));
2754            self.collect();
2755            Ok(())
2756        } else {
2757            Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
2758        }
2759    }
2760
    /// Materialize the current (possibly filtered/queried) frame for analysis.
    pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
        Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
    }

    /// Snapshot of the transformations currently applied, for labeling
    /// analysis output (query text, filter count, drill-down key, ...).
    pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
        crate::statistics::AnalysisContext {
            has_query: !self.active_query.is_empty(),
            query: self.active_query.clone(),
            has_filters: !self.filters.is_empty(),
            filter_count: self.filters.len(),
            is_drilled_down: self.is_drilled_down(),
            group_key: self.drilled_down_group_key.clone(),
            group_columns: self.drilled_down_group_key_columns.clone(),
        }
    }
2776
2777    /// Pivot the current `LazyFrame` (long → wide). Never uses `original_lf`.
2778    /// Collects current `lf`, runs `pivot_stable`, then replaces `lf` with result.
2779    /// We use pivot_stable for all aggregation types: Polars' non-stable pivot() prints
2780    /// "unstable pivot not yet supported, using stable pivot" to stdout, which corrupts the TUI.
2781    pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
2782        let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
2783        let agg_expr = pivot_agg_expr(spec.aggregation)?;
2784        let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
2785        let index_opt = if index_str.is_empty() {
2786            None
2787        } else {
2788            Some(index_str)
2789        };
2790        let pivoted = pivot_stable(
2791            &df,
2792            [spec.pivot_column.as_str()],
2793            index_opt,
2794            Some([spec.value_column.as_str()]),
2795            spec.sort_columns,
2796            Some(agg_expr),
2797            None,
2798        )?;
2799        self.last_pivot_spec = Some(spec.clone());
2800        self.last_melt_spec = None;
2801        self.replace_lf_after_reshape(pivoted.lazy())?;
2802        Ok(())
2803    }
2804
2805    /// Melt the current `LazyFrame` (wide → long). Never uses `original_lf`.
2806    pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
2807        let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
2808        let index = cols(spec.index.iter().map(|s| s.as_str()));
2809        let args = UnpivotArgsDSL {
2810            on,
2811            index,
2812            variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
2813            value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
2814        };
2815        let lf = self.lf.clone().unpivot(args);
2816        self.last_melt_spec = Some(spec.clone());
2817        self.last_pivot_spec = None;
2818        self.replace_lf_after_reshape(lf)?;
2819        Ok(())
2820    }
2821
    /// Install a reshaped (pivoted/melted) frame as the new `lf`, clearing
    /// every transformation and view state that referred to the old shape:
    /// filters, sorts, queries, drill-down state, locked columns, the buffer,
    /// and the scroll/selection position. Ends with a fresh `collect()`.
    ///
    /// # Errors
    /// Fails when the new frame's schema cannot be collected.
    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
        self.invalidate_num_rows();
        self.lf = lf;
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Old-shape transformations no longer apply.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.error = None;
        // Drop materialized frames and drill-down bookkeeping.
        self.df = None;
        self.locked_df = None;
        self.grouped_lf = None;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        // Reset the viewport to the top-left and rebuild the buffer.
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.collect();
        Ok(())
    }
2849
    /// True while viewing a single exploded group (see `drill_down_into_group`).
    pub fn is_drilled_down(&self) -> bool {
        self.drilled_down_group_index.is_some()
    }
2853
    /// Rebuild `lf` from the current filter list and sort settings, then
    /// re-collect. Filters are combined left-to-right with each statement's
    /// own logical operator (and/or); sort direction applies to all sort
    /// columns at once.
    fn apply_transformations(&mut self) {
        let mut lf = self.lf.clone();
        let mut final_expr: Option<Expr> = None;

        for filter in &self.filters {
            let col_expr = col(&filter.column);
            // Parse the filter value into the column's dtype where possible;
            // on parse failure (or unknown column) fall back to comparing
            // against the raw string literal.
            let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
                match dtype {
                    DataType::Float32 | DataType::Float64 => filter
                        .value
                        .parse::<f64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
                        .value
                        .parse::<i64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                        filter
                            .value
                            .parse::<u64>()
                            .map(lit)
                            .unwrap_or_else(|_| lit(filter.value.as_str()))
                    }
                    DataType::Boolean => filter
                        .value
                        .parse::<bool>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    _ => lit(filter.value.as_str()),
                }
            } else {
                lit(filter.value.as_str())
            };

            // Map the filter operator onto the corresponding polars expression.
            let op_expr = match filter.operator {
                FilterOperator::Eq => col_expr.eq(val_lit),
                FilterOperator::NotEq => col_expr.neq(val_lit),
                FilterOperator::Gt => col_expr.gt(val_lit),
                FilterOperator::Lt => col_expr.lt(val_lit),
                FilterOperator::GtEq => col_expr.gt_eq(val_lit),
                FilterOperator::LtEq => col_expr.lt_eq(val_lit),
                FilterOperator::Contains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val))
                }
                FilterOperator::NotContains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val)).not()
                }
            };

            // Fold into the running predicate using this statement's connector.
            if let Some(current) = final_expr {
                final_expr = Some(match filter.logical_op {
                    LogicalOperator::And => current.and(op_expr),
                    LogicalOperator::Or => current.or(op_expr),
                });
            } else {
                final_expr = Some(op_expr);
            }
        }

        if let Some(e) = final_expr {
            lf = lf.filter(e);
        }

        if !self.sort_columns.is_empty() {
            // One descending flag per sort column, all driven by sort_ascending.
            let options = SortMultipleOptions {
                descending: self
                    .sort_columns
                    .iter()
                    .map(|_| !self.sort_ascending)
                    .collect(),
                ..Default::default()
            };
            lf = lf.sort_by_exprs(
                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
                options,
            );
        } else if !self.sort_ascending {
            // No sort columns but "descending" requested: reverse row order.
            lf = lf.reverse();
        }

        self.invalidate_num_rows();
        self.lf = lf;
        self.collect();
    }
2942
    /// Set the sort columns and direction, drop the row buffer, and rebuild
    /// the frame via `apply_transformations`.
    pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
        self.sort_columns = columns;
        self.sort_ascending = ascending;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.apply_transformations();
    }
2951
2952    pub fn reverse(&mut self) {
2953        self.sort_ascending = !self.sort_ascending;
2954
2955        self.buffered_start_row = 0;
2956        self.buffered_end_row = 0;
2957        self.buffered_df = None;
2958
2959        if !self.sort_columns.is_empty() {
2960            let options = SortMultipleOptions {
2961                descending: self
2962                    .sort_columns
2963                    .iter()
2964                    .map(|_| !self.sort_ascending)
2965                    .collect(),
2966                ..Default::default()
2967            };
2968            self.invalidate_num_rows();
2969            self.lf = self.lf.clone().sort_by_exprs(
2970                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
2971                options,
2972            );
2973            self.collect();
2974        } else {
2975            self.invalidate_num_rows();
2976            self.lf = self.lf.clone().reverse();
2977            self.collect();
2978        }
2979    }
2980
    /// Replace the filter list, drop the row buffer, and rebuild the frame
    /// via `apply_transformations`.
    pub fn filter(&mut self, filters: Vec<FilterStatement>) {
        self.filters = filters;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.apply_transformations();
    }
2988
2989    pub fn query(&mut self, query: String) {
2990        self.error = None;
2991
2992        let trimmed_query = query.trim();
2993        if trimmed_query.is_empty() {
2994            self.reset_lf_to_original();
2995            self.collect();
2996            return;
2997        }
2998
2999        match parse_query(&query) {
3000            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
3001                let mut lf = self.original_lf.clone();
3002                let mut schema_opt: Option<Arc<Schema>> = None;
3003
3004                // Apply filter first (where clause)
3005                if let Some(f) = filter {
3006                    lf = lf.filter(f);
3007                }
3008
3009                if !group_by_cols.is_empty() {
3010                    if !cols.is_empty() {
3011                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
3012                    } else {
3013                        let schema = match lf.clone().collect_schema() {
3014                            Ok(s) => s,
3015                            Err(e) => {
3016                                self.error = Some(e);
3017                                return; // Don't modify state on error
3018                            }
3019                        };
3020                        let all_columns: Vec<String> =
3021                            schema.iter_names().map(|s| s.to_string()).collect();
3022
3023                        // In Polars, when you group_by and aggregate columns without explicit aggregation functions,
3024                        // Polars automatically collects the values as lists. We need to aggregate all columns
3025                        // except the group columns to avoid duplicates.
3026                        let mut agg_exprs = Vec::new();
3027                        for col_name in &all_columns {
3028                            if !group_by_col_names.contains(col_name) {
3029                                agg_exprs.push(col(col_name));
3030                            }
3031                        }
3032
3033                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
3034                    }
3035                    // Sort by the result's group-key column names (first N columns after agg).
3036                    // Works for aliased or plain names without relying on parser-derived names.
3037                    let schema = match lf.collect_schema() {
3038                        Ok(s) => s,
3039                        Err(e) => {
3040                            self.error = Some(e);
3041                            return;
3042                        }
3043                    };
3044                    schema_opt = Some(schema.clone());
3045                    let sort_exprs: Vec<Expr> = schema
3046                        .iter_names()
3047                        .take(group_by_cols.len())
3048                        .map(|n| col(n.as_str()))
3049                        .collect();
3050                    lf = lf.sort_by_exprs(sort_exprs, Default::default());
3051                } else if !cols.is_empty() {
3052                    lf = lf.select(cols);
3053                }
3054
3055                let schema = match schema_opt {
3056                    Some(s) => s,
3057                    None => match lf.collect_schema() {
3058                        Ok(s) => s,
3059                        Err(e) => {
3060                            self.error = Some(e);
3061                            return;
3062                        }
3063                    },
3064                };
3065
3066                self.schema = schema;
3067                self.invalidate_num_rows();
3068                self.lf = lf;
3069                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3070
3071                // Lock grouped columns if by clause was used
3072                // Only lock the columns specified in the 'by' clause, not the value columns
3073                if !group_by_col_names.is_empty() {
3074                    // Group columns appear first in Polars results, so count consecutive
3075                    // columns from the start that are in group_by_col_names
3076                    let mut locked_count = 0;
3077                    for col_name in &self.column_order {
3078                        if group_by_col_names.contains(col_name) {
3079                            locked_count += 1;
3080                        } else {
3081                            // Once we hit a non-group column, we've passed all group columns
3082                            break;
3083                        }
3084                    }
3085                    self.locked_columns_count = locked_count;
3086                } else {
3087                    self.locked_columns_count = 0;
3088                }
3089
3090                // Clear filters when using query
3091                self.filters.clear();
3092                self.sort_columns.clear();
3093                self.sort_ascending = true;
3094                self.start_row = 0;
3095                self.termcol_index = 0;
3096                self.active_query = query;
3097                self.buffered_start_row = 0;
3098                self.buffered_end_row = 0;
3099                self.buffered_df = None;
3100                // Reset drill-down state when applying new query
3101                self.drilled_down_group_index = None;
3102                self.drilled_down_group_key = None;
3103                self.drilled_down_group_key_columns = None;
3104                self.grouped_lf = None;
3105                // Reset table state selection
3106                self.table_state.select(Some(0));
3107                // Collect will clamp start_row to valid range, but we want to ensure it's 0
3108                // So we set it to 0, collect (which may clamp it), then ensure it's 0 again
3109                self.collect();
3110                // After collect(), ensure we're at the top (collect() may have clamped if num_rows was wrong)
3111                // But if num_rows > 0, we want start_row = 0 to show the first row
3112                if self.num_rows > 0 {
3113                    self.start_row = 0;
3114                }
3115            }
3116            Err(e) => {
3117                // Parse errors are already user-facing strings; store as ComputeError
3118                self.error = Some(PolarsError::ComputeError(e.into()));
3119            }
3120        }
3121    }
3122
3123    /// Execute a SQL query against the current LazyFrame (registered as table "df").
3124    /// Empty SQL resets to original state. Does not call collect(); the event loop does that via AppEvent::Collect.
3125    pub fn sql_query(&mut self, sql: String) {
3126        self.error = None;
3127        let trimmed = sql.trim();
3128        if trimmed.is_empty() {
3129            self.reset_lf_to_original();
3130            return;
3131        }
3132
3133        #[cfg(feature = "sql")]
3134        {
3135            use polars_sql::SQLContext;
3136            let mut ctx = SQLContext::new();
3137            ctx.register("df", self.lf.clone());
3138            match ctx.execute(trimmed) {
3139                Ok(result_lf) => {
3140                    let schema = match result_lf.clone().collect_schema() {
3141                        Ok(s) => s,
3142                        Err(e) => {
3143                            self.error = Some(e);
3144                            return;
3145                        }
3146                    };
3147                    self.schema = schema;
3148                    self.invalidate_num_rows();
3149                    self.lf = result_lf;
3150                    self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3151                    self.active_sql_query = sql;
3152                    self.locked_columns_count = 0;
3153                    self.filters.clear();
3154                    self.sort_columns.clear();
3155                    self.sort_ascending = true;
3156                    self.start_row = 0;
3157                    self.termcol_index = 0;
3158                    self.drilled_down_group_index = None;
3159                    self.drilled_down_group_key = None;
3160                    self.drilled_down_group_key_columns = None;
3161                    self.grouped_lf = None;
3162                    self.buffered_start_row = 0;
3163                    self.buffered_end_row = 0;
3164                    self.buffered_df = None;
3165                    self.table_state.select(Some(0));
3166                }
3167                Err(e) => {
3168                    self.error = Some(e);
3169                }
3170            }
3171        }
3172
3173        #[cfg(not(feature = "sql"))]
3174        {
3175            self.error = Some(PolarsError::ComputeError(
3176                format!("SQL support not compiled in (build with --features sql)").into(),
3177            ));
3178        }
3179    }
3180
3181    /// Fuzzy search: filter rows where any string column matches the query.
3182    /// Query is split on whitespace; each token must match (in order, case-insensitive) in some string column.
3183    /// Empty query resets to original_lf.
3184    pub fn fuzzy_search(&mut self, query: String) {
3185        self.error = None;
3186        let trimmed = query.trim();
3187        if trimmed.is_empty() {
3188            self.reset_lf_to_original();
3189            self.collect();
3190            return;
3191        }
3192        let string_cols: Vec<String> = self
3193            .schema
3194            .iter()
3195            .filter(|(_, dtype)| dtype.is_string())
3196            .map(|(name, _)| name.to_string())
3197            .collect();
3198        if string_cols.is_empty() {
3199            self.error = Some(PolarsError::ComputeError(
3200                "Fuzzy search requires at least one string column".into(),
3201            ));
3202            return;
3203        }
3204        let tokens: Vec<&str> = trimmed
3205            .split_whitespace()
3206            .filter(|s| !s.is_empty())
3207            .collect();
3208        let token_exprs: Vec<Expr> = tokens
3209            .iter()
3210            .map(|token| {
3211                let pattern = fuzzy_token_regex(token);
3212                string_cols
3213                    .iter()
3214                    .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
3215                    .reduce(|a, b| a.or(b))
3216                    .unwrap()
3217            })
3218            .collect();
3219        let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
3220        self.lf = self.original_lf.clone().filter(combined);
3221        self.filters.clear();
3222        self.sort_columns.clear();
3223        self.active_query.clear();
3224        self.active_sql_query.clear();
3225        self.active_fuzzy_query = query;
3226        // Reset view and buffer so collect() runs on the new lf
3227        self.locked_columns_count = 0;
3228        self.start_row = 0;
3229        self.termcol_index = 0;
3230        self.drilled_down_group_index = None;
3231        self.drilled_down_group_key = None;
3232        self.drilled_down_group_key_columns = None;
3233        self.grouped_lf = None;
3234        self.buffered_start_row = 0;
3235        self.buffered_end_row = 0;
3236        self.buffered_df = None;
3237        self.table_state.select(Some(0));
3238        self.invalidate_num_rows();
3239        self.collect();
3240    }
3241}
3242
3243/// Case-insensitive regex for one token: chars in order with `.*` between.
3244pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3245    let inner: String =
3246        token
3247            .chars()
3248            .map(|c| regex::escape(&c.to_string()))
3249            .fold(String::new(), |mut s, e| {
3250                if !s.is_empty() {
3251                    s.push_str(".*");
3252                }
3253                s.push_str(&e);
3254                s
3255            });
3256    format!("(?i).*{}.*", inner)
3257}
3258
/// Stateless rendering configuration for the data table widget.
///
/// Holds only colors and spacing; the data, scrolling, and selection state
/// live in `DataTableState`, supplied separately to `StatefulWidget::render`.
pub struct DataTable {
    /// Background color of the header row.
    pub header_bg: Color,
    /// Foreground color of the header row.
    pub header_fg: Color,
    /// Foreground color for the row-number gutter (when enabled in state).
    pub row_numbers_fg: Color,
    /// Color of the vertical separator between locked and scrollable columns.
    pub separator_fg: Color,
    /// Horizontal spacing (in terminal cells) between columns.
    pub table_cell_padding: u16,
    /// Optional background applied to odd rows (zebra striping); `None` disables it.
    pub alternate_row_bg: Option<Color>,
    /// When true, colorize cells by column type using the optional colors below.
    pub column_colors: bool,
    /// Cell color for string columns (used only when `column_colors` is true).
    pub str_col: Option<Color>,
    /// Cell color for integer columns (signed and unsigned).
    pub int_col: Option<Color>,
    /// Cell color for float columns.
    pub float_col: Option<Color>,
    /// Cell color for boolean columns.
    pub bool_col: Option<Color>,
    /// Cell color for date/datetime/time/duration columns.
    pub temporal_col: Option<Color>,
}
3274
3275impl Default for DataTable {
3276    fn default() -> Self {
3277        Self {
3278            header_bg: Color::Indexed(236),
3279            header_fg: Color::White,
3280            row_numbers_fg: Color::DarkGray,
3281            separator_fg: Color::White,
3282            table_cell_padding: 1,
3283            alternate_row_bg: None,
3284            column_colors: false,
3285            str_col: None,
3286            int_col: None,
3287            float_col: None,
3288            bool_col: None,
3289            temporal_col: None,
3290        }
3291    }
3292}
3293
3294/// Parameters for rendering the row numbers column.
/// Parameters for rendering the row numbers column.
struct RowNumbersParams {
    // First data row (0-based index into the full result set) shown at the top.
    start_row: usize,
    // Number of data rows the viewport can display (header line excluded).
    visible_rows: usize,
    // Total number of rows in the current result set.
    num_rows: usize,
    // Display offset added to each row number (e.g. 1 for 1-based numbering
    // — presumably; confirm against DataTableState's row_start_index usage).
    row_start_index: usize,
    // Index of the highlighted row within the visible window, if any.
    selected_row: Option<usize>,
}
3302
impl DataTable {
    /// Create a widget with the default color scheme (see `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder: set the four base colors in one call.
    pub fn with_colors(
        mut self,
        header_bg: Color,
        header_fg: Color,
        row_numbers_fg: Color,
        separator_fg: Color,
    ) -> Self {
        self.header_bg = header_bg;
        self.header_fg = header_fg;
        self.row_numbers_fg = row_numbers_fg;
        self.separator_fg = separator_fg;
        self
    }

    /// Builder: set the horizontal padding (in cells) between columns.
    pub fn with_cell_padding(mut self, padding: u16) -> Self {
        self.table_cell_padding = padding;
        self
    }

    /// Builder: set (or clear with `None`) the background used for odd rows.
    pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
        self.alternate_row_bg = color;
        self
    }

    /// Enable column-type coloring and set colors for string, int, float, bool, and temporal columns.
    pub fn with_column_type_colors(
        mut self,
        str_col: Color,
        int_col: Color,
        float_col: Color,
        bool_col: Color,
        temporal_col: Color,
    ) -> Self {
        self.column_colors = true;
        self.str_col = Some(str_col);
        self.int_col = Some(int_col);
        self.float_col = Some(float_col);
        self.bool_col = Some(bool_col);
        self.temporal_col = Some(temporal_col);
        self
    }

    /// Return the color for a column dtype when column_colors is enabled.
    /// Returns `None` when coloring is disabled or the dtype has no mapping.
    fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
        if !self.column_colors {
            return None;
        }
        match dtype {
            DataType::String => self.str_col,
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64 => self.int_col,
            DataType::Float32 | DataType::Float64 => self.float_col,
            DataType::Boolean => self.bool_col,
            DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
                self.temporal_col
            }
            _ => None,
        }
    }

    /// Render `df` as a ratatui `Table` into `area`, sizing each column to its
    /// widest visible value and dropping trailing columns that do not fit.
    ///
    /// NOTE(review): `_row_numbers` and `_start_row_offset` are currently
    /// unused here; the row-number gutter is drawn separately by
    /// `render_row_numbers`.
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        // make each column as wide as it needs to be to fit the content
        let (height, cols) = df.shape();

        // widths starts at the length of each column name
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        // rows is a vector initialized to a vector of length "height" empty rows
        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // Only materialize cells for rows that fit in the area; one line is
        // reserved for the header.
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                // Render nulls as empty strings rather than a textual "null".
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            // Use > not >= so the last column is shown when it fits exactly (no padding needed after it)
            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // A string column that overflows is truncated to the remaining
                // width so the user still sees a partial value; other dtypes
                // are dropped entirely (else branch below).
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                break;
            }
        }

        widths.truncate(visible_columns);
        // convert rows to a vector of Row, with optional alternate row background
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                // Zebra striping: odd rows get alternate_row_bg when set.
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // Avoid painting any background when header_bg is the terminal default.
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }

    /// Draw the row-number gutter: a header-styled filler line followed by
    /// right-aligned row numbers whose backgrounds match the data rows.
    fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
        // Header row: same style as the rest of the column headers (fill full width so color matches)
        let header_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let header_fill = " ".repeat(area.width as usize);
        Paragraph::new(header_fill).style(header_style).render(
            Rect {
                x: area.x,
                y: area.y,
                width: area.width,
                height: 1,
            },
            buf,
        );

        // Only render up to the actual number of rows in the data
        let rows_to_render = params
            .visible_rows
            .min(params.num_rows.saturating_sub(params.start_row));

        if rows_to_render == 0 {
            return;
        }

        // Calculate width needed for largest row number
        let max_row_num =
            params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
        let max_width = max_row_num.to_string().len();

        // Render row numbers
        for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
            let row_num = params.start_row + row_idx + params.row_start_index;
            let row_num_text = row_num.to_string();

            // Right-align row numbers within the available width
            let padding = max_width.saturating_sub(row_num_text.len());
            let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);

            // Match main table background: default when row is even (or no alternate);
            // when alternate_row_bg is set, odd rows use that background.
            // When selected: same background as row (no inversion), foreground = terminal default.
            let is_selected = params.selected_row == Some(row_idx);
            let (fg, bg) = if is_selected {
                (
                    Color::Reset,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            } else {
                (
                    self.row_numbers_fg,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            };
            let row_num_style = match bg {
                Some(bg_color) => Style::default().fg(fg).bg(bg_color),
                None => Style::default().fg(fg),
            };

            let y = area.y + row_idx as u16 + 1; // +1 for header row
            if y < area.y + area.height {
                Paragraph::new(padded_text).style(row_num_style).render(
                    Rect {
                        x: area.x,
                        y,
                        width: area.width,
                        height: 1,
                    },
                    buf,
                );
            }
        }
    }
}
3563
3564impl StatefulWidget for DataTable {
3565    type State = DataTableState;
3566
3567    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
3568        state.visible_termcols = area.width as usize;
3569        let new_visible_rows = if area.height > 0 {
3570            (area.height - 1) as usize
3571        } else {
3572            0
3573        };
3574        let needs_collect = new_visible_rows != state.visible_rows;
3575        state.visible_rows = new_visible_rows;
3576
3577        if let Some(selected) = state.table_state.selected() {
3578            if selected >= state.visible_rows {
3579                state.table_state.select(Some(state.visible_rows - 1))
3580            }
3581        }
3582
3583        if needs_collect {
3584            state.collect();
3585        }
3586
3587        // Only show errors in main view if not suppressed (e.g., when query input is active)
3588        // Query errors should only be shown in the query input frame
3589        if let Some(error) = state.error.as_ref() {
3590            if !state.suppress_error_display {
3591                Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
3592                    .centered()
3593                    .block(
3594                        Block::default()
3595                            .borders(Borders::NONE)
3596                            .padding(Padding::top(area.height / 2)),
3597                    )
3598                    .wrap(ratatui::widgets::Wrap { trim: true })
3599                    .render(area, buf);
3600                return;
3601            }
3602            // If suppress_error_display is true, continue rendering the table normally
3603        }
3604
3605        // Calculate row number column width if enabled
3606        let row_num_width = if state.row_numbers {
3607            let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; // +1 for 1-based, +1 for potential
3608            max_row_num.to_string().len().max(1) as u16 + 1 // +1 for spacing
3609        } else {
3610            0
3611        };
3612
3613        // Calculate locked columns width if any
3614        let mut locked_width = row_num_width;
3615        if let Some(locked_df) = state.locked_df.as_ref() {
3616            let (_, cols) = locked_df.shape();
3617            for col_index in 0..cols {
3618                let col_name = locked_df.get_column_names()[col_index];
3619                let mut max_len = col_name.chars().count() as u16;
3620                let col_data = &locked_df[col_index];
3621                for row_index in 0..locked_df.height().min(state.visible_rows) {
3622                    let value = col_data.get(row_index).unwrap();
3623                    let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
3624                        Cow::Borrowed("")
3625                    } else {
3626                        value.str_value()
3627                    };
3628                    let len = val_str.chars().count() as u16;
3629                    max_len = max_len.max(len);
3630                }
3631                locked_width += max_len + 1;
3632            }
3633        }
3634
3635        // Split area into locked and scrollable parts
3636        if locked_width > row_num_width && locked_width < area.width {
3637            let locked_area = Rect {
3638                x: area.x,
3639                y: area.y,
3640                width: locked_width,
3641                height: area.height,
3642            };
3643            let separator_x = locked_area.x + locked_area.width;
3644
3645            // If row numbers are enabled, render them first in a separate area
3646            if state.row_numbers {
3647                let row_num_area = Rect {
3648                    x: area.x,
3649                    y: area.y,
3650                    width: row_num_width,
3651                    height: area.height,
3652                };
3653                self.render_row_numbers(
3654                    row_num_area,
3655                    buf,
3656                    RowNumbersParams {
3657                        start_row: state.start_row,
3658                        visible_rows: state.visible_rows,
3659                        num_rows: state.num_rows,
3660                        row_start_index: state.row_start_index,
3661                        selected_row: state.table_state.selected(),
3662                    },
3663                );
3664            }
3665            let scrollable_area = Rect {
3666                x: separator_x + 1,
3667                y: area.y,
3668                width: area.width.saturating_sub(locked_width + 1),
3669                height: area.height,
3670            };
3671
3672            // Render locked columns (no background shading, just the vertical separator)
3673            if let Some(locked_df) = state.locked_df.as_ref() {
3674                // Adjust locked_area to account for row numbers if present
3675                let adjusted_locked_area = if state.row_numbers {
3676                    Rect {
3677                        x: area.x + row_num_width,
3678                        y: area.y,
3679                        width: locked_width - row_num_width,
3680                        height: area.height,
3681                    }
3682                } else {
3683                    locked_area
3684                };
3685
3686                // Slice buffer to visible portion
3687                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3688                let slice_len = state
3689                    .visible_rows
3690                    .min(locked_df.height().saturating_sub(offset));
3691                if offset < locked_df.height() && slice_len > 0 {
3692                    let sliced_df = locked_df.slice(offset as i64, slice_len);
3693                    self.render_dataframe(
3694                        &sliced_df,
3695                        adjusted_locked_area,
3696                        buf,
3697                        &mut state.table_state,
3698                        false,
3699                        state.start_row,
3700                    );
3701                }
3702            }
3703
3704            // Draw vertical separator line
3705            let separator_x_adjusted = if state.row_numbers {
3706                area.x + row_num_width + (locked_width - row_num_width)
3707            } else {
3708                separator_x
3709            };
3710            for y in area.y..area.y + area.height {
3711                let cell = &mut buf[(separator_x_adjusted, y)];
3712                cell.set_char('│');
3713                cell.set_style(Style::default().fg(self.separator_fg));
3714            }
3715
3716            // Adjust scrollable area to account for row numbers
3717            let adjusted_scrollable_area = if state.row_numbers {
3718                Rect {
3719                    x: separator_x_adjusted + 1,
3720                    y: area.y,
3721                    width: area.width.saturating_sub(locked_width + 1),
3722                    height: area.height,
3723                }
3724            } else {
3725                scrollable_area
3726            };
3727
3728            // Render scrollable columns
3729            if let Some(df) = state.df.as_ref() {
3730                // Slice buffer to visible portion
3731                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3732                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3733                if offset < df.height() && slice_len > 0 {
3734                    let sliced_df = df.slice(offset as i64, slice_len);
3735                    self.render_dataframe(
3736                        &sliced_df,
3737                        adjusted_scrollable_area,
3738                        buf,
3739                        &mut state.table_state,
3740                        false,
3741                        state.start_row,
3742                    );
3743                }
3744            }
3745        } else if let Some(df) = state.df.as_ref() {
3746            // No locked columns, render normally
3747            // If row numbers are enabled, render them first
3748            if state.row_numbers {
3749                let row_num_area = Rect {
3750                    x: area.x,
3751                    y: area.y,
3752                    width: row_num_width,
3753                    height: area.height,
3754                };
3755                self.render_row_numbers(
3756                    row_num_area,
3757                    buf,
3758                    RowNumbersParams {
3759                        start_row: state.start_row,
3760                        visible_rows: state.visible_rows,
3761                        num_rows: state.num_rows,
3762                        row_start_index: state.row_start_index,
3763                        selected_row: state.table_state.selected(),
3764                    },
3765                );
3766
3767                // Adjust data area to exclude row number column
3768                let data_area = Rect {
3769                    x: area.x + row_num_width,
3770                    y: area.y,
3771                    width: area.width.saturating_sub(row_num_width),
3772                    height: area.height,
3773                };
3774
3775                // Slice buffer to visible portion
3776                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3777                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3778                if offset < df.height() && slice_len > 0 {
3779                    let sliced_df = df.slice(offset as i64, slice_len);
3780                    self.render_dataframe(
3781                        &sliced_df,
3782                        data_area,
3783                        buf,
3784                        &mut state.table_state,
3785                        false,
3786                        state.start_row,
3787                    );
3788                }
3789            } else {
3790                // Slice buffer to visible portion
3791                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3792                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3793                if offset < df.height() && slice_len > 0 {
3794                    let sliced_df = df.slice(offset as i64, slice_len);
3795                    self.render_dataframe(
3796                        &sliced_df,
3797                        area,
3798                        buf,
3799                        &mut state.table_state,
3800                        false,
3801                        state.start_row,
3802                    );
3803                }
3804            }
3805        } else if !state.column_order.is_empty() {
3806            // Empty result (0 rows) but we have a schema - show empty table with header, no rows
3807            let empty_columns: Vec<_> = state
3808                .column_order
3809                .iter()
3810                .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
3811                .collect();
3812            if let Ok(empty_df) = DataFrame::new(empty_columns) {
3813                if state.row_numbers {
3814                    let row_num_area = Rect {
3815                        x: area.x,
3816                        y: area.y,
3817                        width: row_num_width,
3818                        height: area.height,
3819                    };
3820                    self.render_row_numbers(
3821                        row_num_area,
3822                        buf,
3823                        RowNumbersParams {
3824                            start_row: 0,
3825                            visible_rows: state.visible_rows,
3826                            num_rows: 0,
3827                            row_start_index: state.row_start_index,
3828                            selected_row: None,
3829                        },
3830                    );
3831                    let data_area = Rect {
3832                        x: area.x + row_num_width,
3833                        y: area.y,
3834                        width: area.width.saturating_sub(row_num_width),
3835                        height: area.height,
3836                    };
3837                    self.render_dataframe(
3838                        &empty_df,
3839                        data_area,
3840                        buf,
3841                        &mut state.table_state,
3842                        false,
3843                        0,
3844                    );
3845                } else {
3846                    self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
3847                }
3848            } else {
3849                Paragraph::new("No data").render(area, buf);
3850            }
3851        } else {
3852            // Truly empty: no schema, not loaded, or blank file
3853            Paragraph::new("No data").render(area, buf);
3854        }
3855    }
3856}
3857
3858#[cfg(test)]
3859mod tests {
3860    use super::*;
3861    use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
3862    use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
3863
3864    fn create_test_lf() -> LazyFrame {
3865        df! (
3866            "a" => &[1, 2, 3],
3867            "b" => &["x", "y", "z"]
3868        )
3869        .unwrap()
3870        .lazy()
3871    }
3872
3873    fn create_large_test_lf() -> LazyFrame {
3874        df! (
3875            "a" => (0..100).collect::<Vec<i32>>(),
3876            "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
3877            "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
3878            "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
3879        )
3880        .unwrap()
3881        .lazy()
3882    }
3883
3884    #[test]
3885    fn test_from_csv() {
3886        // Ensure sample data is generated before running test
3887        // Test uncompressed CSV loading
3888        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
3889        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
3890        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
3891    }
3892
3893    #[test]
3894    fn test_from_csv_gzipped() {
3895        // Ensure sample data is generated before running test
3896        // Test gzipped CSV loading
3897        let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
3898        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
3899        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
3900    }
3901
3902    #[test]
3903    fn test_from_parquet() {
3904        // Ensure sample data is generated before running test
3905        let path = crate::tests::sample_data_dir().join("people.parquet");
3906        let state = DataTableState::from_parquet(&path, None, None, None, None, false, 1).unwrap();
3907        assert!(!state.schema.is_empty());
3908    }
3909
3910    #[test]
3911    fn test_from_ipc() {
3912        use polars::prelude::IpcWriter;
3913        use std::io::BufWriter;
3914        let mut df = df!(
3915            "x" => &[1_i32, 2, 3],
3916            "y" => &["a", "b", "c"]
3917        )
3918        .unwrap();
3919        let dir = std::env::temp_dir();
3920        let path = dir.join("datui_test_ipc.arrow");
3921        let file = std::fs::File::create(&path).unwrap();
3922        let mut writer = BufWriter::new(file);
3923        IpcWriter::new(&mut writer).finish(&mut df).unwrap();
3924        drop(writer);
3925        let state = DataTableState::from_ipc(&path, None, None, None, None, false, 1).unwrap();
3926        assert_eq!(state.schema.len(), 2);
3927        assert!(state.schema.contains("x"));
3928        assert!(state.schema.contains("y"));
3929        let _ = std::fs::remove_file(&path);
3930    }
3931
3932    #[test]
3933    fn test_from_avro() {
3934        use polars::io::avro::AvroWriter;
3935        use std::io::BufWriter;
3936        let mut df = df!(
3937            "id" => &[1_i32, 2, 3],
3938            "name" => &["alice", "bob", "carol"]
3939        )
3940        .unwrap();
3941        let dir = std::env::temp_dir();
3942        let path = dir.join("datui_test_avro.avro");
3943        let file = std::fs::File::create(&path).unwrap();
3944        let mut writer = BufWriter::new(file);
3945        AvroWriter::new(&mut writer).finish(&mut df).unwrap();
3946        drop(writer);
3947        let state = DataTableState::from_avro(&path, None, None, None, None, false, 1).unwrap();
3948        assert_eq!(state.schema.len(), 2);
3949        assert!(state.schema.contains("id"));
3950        assert!(state.schema.contains("name"));
3951        let _ = std::fs::remove_file(&path);
3952    }
3953
    #[test]
    fn test_from_orc() {
        use arrow::array::{Int64Array, StringArray};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::record_batch::RecordBatch;
        use orc_rust::ArrowWriterBuilder;
        use std::io::BufWriter;
        use std::sync::Arc;

        // Build a two-column Arrow batch (id: Int64, name: Utf8) in memory.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let id_array = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
        let name_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, name_array]).unwrap();

        // Write the batch out as an ORC file in the OS temp directory.
        let dir = std::env::temp_dir();
        let path = dir.join("datui_test_orc.orc");
        let file = std::fs::File::create(&path).unwrap();
        let writer = BufWriter::new(file);
        let mut orc_writer = ArrowWriterBuilder::new(writer, schema).try_build().unwrap();
        orc_writer.write(&batch).unwrap();
        // Finalize the writer before the file is read back.
        orc_writer.close().unwrap();

        // Round-trip through DataTableState and verify both columns survive.
        let state = DataTableState::from_orc(&path, None, None, None, None, false, 1).unwrap();
        assert_eq!(state.schema.len(), 2);
        assert!(state.schema.contains("id"));
        assert!(state.schema.contains("name"));
        // Best-effort cleanup of the temp file.
        let _ = std::fs::remove_file(&path);
    }
3985
3986    #[test]
3987    fn test_filter() {
3988        let lf = create_test_lf();
3989        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
3990        let filters = vec![FilterStatement {
3991            column: "a".to_string(),
3992            operator: FilterOperator::Gt,
3993            value: "2".to_string(),
3994            logical_op: LogicalOperator::And,
3995        }];
3996        state.filter(filters);
3997        let df = state.lf.clone().collect().unwrap();
3998        assert_eq!(df.shape().0, 1);
3999        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4000    }
4001
4002    #[test]
4003    fn test_sort() {
4004        let lf = create_test_lf();
4005        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4006        state.sort(vec!["a".to_string()], false);
4007        let df = state.lf.clone().collect().unwrap();
4008        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4009    }
4010
4011    #[test]
4012    fn test_query() {
4013        let lf = create_test_lf();
4014        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4015        state.query("select b where a = 2".to_string());
4016        let df = state.lf.clone().collect().unwrap();
4017        assert_eq!(df.shape(), (1, 1));
4018        assert_eq!(
4019            df.column("b").unwrap().get(0).unwrap(),
4020            AnyValue::String("y")
4021        );
4022    }
4023
    #[test]
    fn test_query_date_accessors() {
        // End-to-end coverage of the query language's date/string accessors,
        // date literals, and the empty-result bookkeeping in collect().
        use chrono::NaiveDate;
        let df = df!(
            "event_date" => [
                NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
                NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
                NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
            ],
            "name" => &["a", "b", "c"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();

        // Select with date accessors (.year yields Int32, .month yields Int8).
        state.query("select name, year: event_date.year, month: event_date.month".to_string());
        assert!(
            state.error.is_none(),
            "query should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (3, 3));
        assert_eq!(
            df.column("year").unwrap().get(0).unwrap(),
            AnyValue::Int32(2024)
        );
        assert_eq!(
            df.column("month").unwrap().get(0).unwrap(),
            AnyValue::Int8(1)
        );
        assert_eq!(
            df.column("month").unwrap().get(1).unwrap(),
            AnyValue::Int8(6)
        );

        // Filter with date accessor: only the December row matches month = 12.
        state.query("select name, event_date where event_date.month = 12".to_string());
        assert!(
            state.error.is_none(),
            "filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1);
        assert_eq!(
            df.column("name").unwrap().get(0).unwrap(),
            AnyValue::String("c")
        );

        // Filter with YYYY.MM.DD date literal
        state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
        assert!(
            state.error.is_none(),
            "date literal filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(
            df.height(),
            2,
            "2024-06-20 and 2024-12-31 are after 2024-06-15"
        );

        // String accessors: upper, lower, len, ends_with
        state.query(
            "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
                .to_string(),
        );
        assert!(
            state.error.is_none(),
            "string accessors should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
        assert_eq!(
            df.column("upper_name").unwrap().get(0).unwrap(),
            AnyValue::String("C")
        );

        // Query that returns 0 rows: df and locked_df must be cleared for correct empty-table render
        state.query("select where event_date.date = 2020.01.01".to_string());
        assert!(state.error.is_none());
        assert_eq!(state.num_rows, 0);
        state.visible_rows = 10;
        state.collect();
        assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
        assert!(
            state.locked_df.is_none(),
            "locked_df must be cleared when num_rows is 0"
        );
    }
4118
4119    #[test]
4120    fn test_select_next_previous() {
4121        let lf = create_large_test_lf();
4122        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4123        state.visible_rows = 10;
4124        state.table_state.select(Some(5));
4125
4126        state.select_next();
4127        assert_eq!(state.table_state.selected(), Some(6));
4128
4129        state.select_previous();
4130        assert_eq!(state.table_state.selected(), Some(5));
4131    }
4132
4133    #[test]
4134    fn test_page_up_down() {
4135        let lf = create_large_test_lf();
4136        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4137        state.visible_rows = 20;
4138        state.collect();
4139
4140        assert_eq!(state.start_row, 0);
4141        state.page_down();
4142        assert_eq!(state.start_row, 20);
4143        state.page_down();
4144        assert_eq!(state.start_row, 40);
4145        state.page_up();
4146        assert_eq!(state.start_row, 20);
4147        state.page_up();
4148        assert_eq!(state.start_row, 0);
4149    }
4150
4151    #[test]
4152    fn test_scroll_left_right() {
4153        let lf = create_large_test_lf();
4154        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4155        assert_eq!(state.termcol_index, 0);
4156        state.scroll_right();
4157        assert_eq!(state.termcol_index, 1);
4158        state.scroll_right();
4159        assert_eq!(state.termcol_index, 2);
4160        state.scroll_left();
4161        assert_eq!(state.termcol_index, 1);
4162        state.scroll_left();
4163        assert_eq!(state.termcol_index, 0);
4164    }
4165
4166    #[test]
4167    fn test_reverse() {
4168        let lf = create_test_lf();
4169        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4170        state.sort(vec!["a".to_string()], true);
4171        assert_eq!(
4172            state
4173                .lf
4174                .clone()
4175                .collect()
4176                .unwrap()
4177                .column("a")
4178                .unwrap()
4179                .get(0)
4180                .unwrap(),
4181            AnyValue::Int32(1)
4182        );
4183        state.reverse();
4184        assert_eq!(
4185            state
4186                .lf
4187                .clone()
4188                .collect()
4189                .unwrap()
4190                .column("a")
4191                .unwrap()
4192                .get(0)
4193                .unwrap(),
4194            AnyValue::Int32(3)
4195        );
4196    }
4197
4198    #[test]
4199    fn test_filter_multiple() {
4200        let lf = create_large_test_lf();
4201        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4202        let filters = vec![
4203            FilterStatement {
4204                column: "c".to_string(),
4205                operator: FilterOperator::Eq,
4206                value: "1".to_string(),
4207                logical_op: LogicalOperator::And,
4208            },
4209            FilterStatement {
4210                column: "d".to_string(),
4211                operator: FilterOperator::Eq,
4212                value: "2".to_string(),
4213                logical_op: LogicalOperator::And,
4214            },
4215        ];
4216        state.filter(filters);
4217        let df = state.lf.clone().collect().unwrap();
4218        assert_eq!(df.shape().0, 7);
4219    }
4220
4221    #[test]
4222    fn test_filter_and_sort() {
4223        let lf = create_large_test_lf();
4224        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4225        let filters = vec![FilterStatement {
4226            column: "c".to_string(),
4227            operator: FilterOperator::Eq,
4228            value: "1".to_string(),
4229            logical_op: LogicalOperator::And,
4230        }];
4231        state.filter(filters);
4232        state.sort(vec!["a".to_string()], false);
4233        let df = state.lf.clone().collect().unwrap();
4234        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
4235    }
4236
4237    /// Minimal long-format data for pivot tests: id, date, key, value.
4238    /// Includes duplicates for aggregation (e.g. (1,d1,A) appears twice).
4239    fn create_pivot_long_lf() -> LazyFrame {
4240        let df = df!(
4241            "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
4242            "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
4243            "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
4244            "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
4245        )
4246        .unwrap();
4247        df.lazy()
4248    }
4249
4250    /// Wide-format data for melt tests: id, date, c1, c2, c3.
4251    fn create_melt_wide_lf() -> LazyFrame {
4252        let df = df!(
4253            "id" => &[1_i32, 2, 3],
4254            "date" => &["d1", "d2", "d3"],
4255            "c1" => &[10.0_f64, 20.0, 30.0],
4256            "c2" => &[11.0, 21.0, 31.0],
4257            "c3" => &[12.0, 22.0, 32.0],
4258        )
4259        .unwrap();
4260        df.lazy()
4261    }
4262
4263    #[test]
4264    fn test_pivot_basic() {
4265        let lf = create_pivot_long_lf();
4266        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4267        let spec = PivotSpec {
4268            index: vec!["id".to_string(), "date".to_string()],
4269            pivot_column: "key".to_string(),
4270            value_column: "value".to_string(),
4271            aggregation: PivotAggregation::Last,
4272            sort_columns: false,
4273        };
4274        state.pivot(&spec).unwrap();
4275        let df = state.lf.clone().collect().unwrap();
4276        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4277        assert!(names.contains(&"id"));
4278        assert!(names.contains(&"date"));
4279        assert!(names.contains(&"A"));
4280        assert!(names.contains(&"B"));
4281        assert!(names.contains(&"C"));
4282        assert_eq!(df.height(), 2);
4283    }
4284
4285    #[test]
4286    fn test_pivot_aggregation_last() {
4287        let lf = create_pivot_long_lf();
4288        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4289        let spec = PivotSpec {
4290            index: vec!["id".to_string(), "date".to_string()],
4291            pivot_column: "key".to_string(),
4292            value_column: "value".to_string(),
4293            aggregation: PivotAggregation::Last,
4294            sort_columns: false,
4295        };
4296        state.pivot(&spec).unwrap();
4297        let df = state.lf.clone().collect().unwrap();
4298        let a_col = df.column("A").unwrap();
4299        let row0 = a_col.get(0).unwrap();
4300        let row1 = a_col.get(1).unwrap();
4301        assert_eq!(row0, AnyValue::Float64(11.0));
4302        assert_eq!(row1, AnyValue::Float64(40.0));
4303    }
4304
4305    #[test]
4306    fn test_pivot_aggregation_first() {
4307        let lf = create_pivot_long_lf();
4308        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4309        let spec = PivotSpec {
4310            index: vec!["id".to_string(), "date".to_string()],
4311            pivot_column: "key".to_string(),
4312            value_column: "value".to_string(),
4313            aggregation: PivotAggregation::First,
4314            sort_columns: false,
4315        };
4316        state.pivot(&spec).unwrap();
4317        let df = state.lf.clone().collect().unwrap();
4318        let a_col = df.column("A").unwrap();
4319        assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
4320        assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
4321    }
4322
4323    #[test]
4324    fn test_pivot_aggregation_min_max() {
4325        let lf = create_pivot_long_lf();
4326        let mut state_min = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4327        state_min
4328            .pivot(&PivotSpec {
4329                index: vec!["id".to_string(), "date".to_string()],
4330                pivot_column: "key".to_string(),
4331                value_column: "value".to_string(),
4332                aggregation: PivotAggregation::Min,
4333                sort_columns: false,
4334            })
4335            .unwrap();
4336        let df_min = state_min.lf.clone().collect().unwrap();
4337        assert_eq!(
4338            df_min.column("A").unwrap().get(0).unwrap(),
4339            AnyValue::Float64(10.0)
4340        );
4341
4342        let mut state_max = DataTableState::new(lf, None, None, None, None, true).unwrap();
4343        state_max
4344            .pivot(&PivotSpec {
4345                index: vec!["id".to_string(), "date".to_string()],
4346                pivot_column: "key".to_string(),
4347                value_column: "value".to_string(),
4348                aggregation: PivotAggregation::Max,
4349                sort_columns: false,
4350            })
4351            .unwrap();
4352        let df_max = state_max.lf.clone().collect().unwrap();
4353        assert_eq!(
4354            df_max.column("A").unwrap().get(0).unwrap(),
4355            AnyValue::Float64(11.0)
4356        );
4357    }
4358
4359    #[test]
4360    fn test_pivot_aggregation_avg_count() {
4361        let lf = create_pivot_long_lf();
4362        let mut state_avg = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4363        state_avg
4364            .pivot(&PivotSpec {
4365                index: vec!["id".to_string(), "date".to_string()],
4366                pivot_column: "key".to_string(),
4367                value_column: "value".to_string(),
4368                aggregation: PivotAggregation::Avg,
4369                sort_columns: false,
4370            })
4371            .unwrap();
4372        let df_avg = state_avg.lf.clone().collect().unwrap();
4373        let a = df_avg.column("A").unwrap().get(0).unwrap();
4374        if let AnyValue::Float64(x) = a {
4375            assert!((x - 10.5).abs() < 1e-6);
4376        } else {
4377            panic!("expected float");
4378        }
4379
4380        let mut state_count = DataTableState::new(lf, None, None, None, None, true).unwrap();
4381        state_count
4382            .pivot(&PivotSpec {
4383                index: vec!["id".to_string(), "date".to_string()],
4384                pivot_column: "key".to_string(),
4385                value_column: "value".to_string(),
4386                aggregation: PivotAggregation::Count,
4387                sort_columns: false,
4388            })
4389            .unwrap();
4390        let df_count = state_count.lf.clone().collect().unwrap();
4391        let a = df_count.column("A").unwrap().get(0).unwrap();
4392        assert_eq!(a, AnyValue::UInt32(2));
4393    }
4394
4395    #[test]
4396    fn test_pivot_string_first_last() {
4397        let df = df!(
4398            "id" => &[1_i32, 1, 2, 2],
4399            "key" => &["X", "Y", "X", "Y"],
4400            "value" => &["low", "mid", "high", "mid"],
4401        )
4402        .unwrap();
4403        let lf = df.lazy();
4404        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4405        let spec = PivotSpec {
4406            index: vec!["id".to_string()],
4407            pivot_column: "key".to_string(),
4408            value_column: "value".to_string(),
4409            aggregation: PivotAggregation::Last,
4410            sort_columns: false,
4411        };
4412        state.pivot(&spec).unwrap();
4413        let out = state.lf.clone().collect().unwrap();
4414        assert_eq!(
4415            out.column("X").unwrap().get(0).unwrap(),
4416            AnyValue::String("low")
4417        );
4418        assert_eq!(
4419            out.column("Y").unwrap().get(0).unwrap(),
4420            AnyValue::String("mid")
4421        );
4422    }
4423
4424    #[test]
4425    fn test_melt_basic() {
4426        let lf = create_melt_wide_lf();
4427        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4428        let spec = MeltSpec {
4429            index: vec!["id".to_string(), "date".to_string()],
4430            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4431            variable_name: "variable".to_string(),
4432            value_name: "value".to_string(),
4433        };
4434        state.melt(&spec).unwrap();
4435        let df = state.lf.clone().collect().unwrap();
4436        assert_eq!(df.height(), 9);
4437        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4438        assert!(names.contains(&"variable"));
4439        assert!(names.contains(&"value"));
4440        assert!(names.contains(&"id"));
4441        assert!(names.contains(&"date"));
4442    }
4443
4444    #[test]
4445    fn test_melt_all_except_index() {
4446        let lf = create_melt_wide_lf();
4447        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4448        let spec = MeltSpec {
4449            index: vec!["id".to_string(), "date".to_string()],
4450            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4451            variable_name: "var".to_string(),
4452            value_name: "val".to_string(),
4453        };
4454        state.melt(&spec).unwrap();
4455        let df = state.lf.clone().collect().unwrap();
4456        assert!(df.column("var").is_ok());
4457        assert!(df.column("val").is_ok());
4458    }
4459
4460    #[test]
4461    fn test_pivot_on_current_view_after_filter() {
4462        let lf = create_pivot_long_lf();
4463        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4464        state.filter(vec![FilterStatement {
4465            column: "id".to_string(),
4466            operator: FilterOperator::Eq,
4467            value: "1".to_string(),
4468            logical_op: LogicalOperator::And,
4469        }]);
4470        let spec = PivotSpec {
4471            index: vec!["id".to_string(), "date".to_string()],
4472            pivot_column: "key".to_string(),
4473            value_column: "value".to_string(),
4474            aggregation: PivotAggregation::Last,
4475            sort_columns: false,
4476        };
4477        state.pivot(&spec).unwrap();
4478        let df = state.lf.clone().collect().unwrap();
4479        assert_eq!(df.height(), 1);
4480        let id_col = df.column("id").unwrap();
4481        assert_eq!(id_col.get(0).unwrap(), AnyValue::Int32(1));
4482    }
4483
4484    #[test]
4485    fn test_fuzzy_token_regex() {
4486        assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
4487        assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
4488        // Regex-special characters are escaped
4489        let pat = fuzzy_token_regex("[");
4490        assert!(pat.contains("\\["));
4491    }
4492
4493    #[test]
4494    fn test_fuzzy_search() {
4495        // Filter logic is covered by test_fuzzy_search_regex_direct. This test runs the full
4496        // path through DataTableState; it requires sample data (CSV with string column).
4497        crate::tests::ensure_sample_data();
4498        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
4499        let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
4500        state.visible_rows = 10;
4501        state.collect();
4502        let before = state.num_rows;
4503        state.fuzzy_search("string".to_string());
4504        assert!(state.error.is_none(), "{:?}", state.error);
4505        assert!(state.num_rows <= before, "fuzzy search should filter rows");
4506        state.fuzzy_search("".to_string());
4507        state.collect();
4508        assert_eq!(state.num_rows, before, "empty fuzzy search should reset");
4509        assert!(state.get_active_fuzzy_query().is_empty());
4510    }
4511
    #[test]
    fn test_fuzzy_search_regex_direct() {
        // Verifies the three building blocks of fuzzy_search against plain polars:
        // single-column match, OR across columns, and schema-driven column choice.

        // Sanity check: Polars str().contains with our regex matches "alice" for pattern ".*a.*l.*i.*"
        let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
        let pattern = fuzzy_token_regex("alice");
        let out = lf
            .filter(col("name").str().contains(lit(pattern.clone()), false))
            .collect()
            .unwrap();
        assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);

        // Two columns OR (as in fuzzy_search)
        let lf2 = df!(
            "id" => &[1i32, 2, 3],
            "name" => &["alice", "bob", "carol"],
            "city" => &["NYC", "LA", "Boston"]
        )
        .unwrap()
        .lazy();
        let pat = fuzzy_token_regex("alice");
        let expr = col("name")
            .str()
            .contains(lit(pat.clone()), false)
            .or(col("city").str().contains(lit(pat), false));
        let out2 = lf2.clone().filter(expr).collect().unwrap();
        assert_eq!(out2.height(), 1);

        // Replicate exact fuzzy_search logic: schema from original_lf, string_cols, then filter
        let schema = lf2.clone().collect_schema().unwrap();
        let string_cols: Vec<String> = schema
            .iter()
            .filter(|(_, dtype)| dtype.is_string())
            .map(|(name, _)| name.to_string())
            .collect();
        assert!(
            !string_cols.is_empty(),
            "df! string cols should be detected"
        );
        let pattern = fuzzy_token_regex("alice");
        // OR-reduce one contains() expression per string column, exactly as
        // fuzzy_search builds its filter.
        let token_expr = string_cols
            .iter()
            .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
            .reduce(|a, b| a.or(b))
            .unwrap();
        let out3 = lf2.filter(token_expr).collect().unwrap();
        assert_eq!(
            out3.height(),
            1,
            "fuzzy_search-style filter should match 1 row"
        );
    }
4563
4564    #[test]
4565    fn test_fuzzy_search_no_string_columns() {
4566        let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
4567            .unwrap()
4568            .lazy();
4569        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4570        state.fuzzy_search("x".to_string());
4571        assert!(state.error.is_some());
4572    }
4573
4574    /// By-queries must produce results sorted by the group columns (age_group, then team)
4575    /// so that output order is deterministic and practical. Raw data is deliberately out of order.
4576    #[test]
4577    fn test_by_query_result_sorted_by_group_columns() {
4578        // Build a small table: age_group (1-5, out of order), team (Red/Blue/Green), score (0-100)
4579        let df = df!(
4580            "age_group" => &[3i64, 1, 5, 2, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
4581            "team" => &[
4582                "Red", "Blue", "Green", "Red", "Blue", "Green", "Green", "Red", "Blue",
4583                "Green", "Red", "Blue", "Red", "Blue", "Green",
4584            ],
4585            "score" => &[50.0f64, 10.0, 90.0, 20.0, 30.0, 40.0, 60.0, 70.0, 80.0, 15.0, 25.0, 35.0, 45.0, 55.0, 65.0],
4586        )
4587        .unwrap();
4588        let lf = df.lazy();
4589        let options = crate::OpenOptions::default();
4590        let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4591        state.query("select avg score by age_group, team".to_string());
4592        assert!(
4593            state.error.is_none(),
4594            "query should succeed: {:?}",
4595            state.error
4596        );
4597        let result = state.lf.collect().unwrap();
4598        // Result must be sorted by group columns (age_group, then team)
4599        let sorted = result
4600            .sort(
4601                ["age_group", "team"],
4602                SortMultipleOptions::default().with_order_descending(false),
4603            )
4604            .unwrap();
4605        assert_eq!(
4606            result, sorted,
4607            "by-query result must be sorted by (age_group, team)"
4608        );
4609    }
4610
4611    /// Computed group keys (e.g. Fare: 1+floor Fare % 25) must be sorted by their result column
4612    /// values, not by re-evaluating the expression on the result.
4613    #[test]
4614    fn test_by_query_computed_group_key_sorted_by_result_column() {
4615        let df = df!(
4616            "x" => &[7.0f64, 12.0, 3.0, 22.0, 17.0, 8.0],
4617            "v" => &[1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0],
4618        )
4619        .unwrap();
4620        let lf = df.lazy();
4621        let options = crate::OpenOptions::default();
4622        let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4623        // bucket: 1+floor(x)%3 -> values 1,2,3; raw x order 7,12,3,22,17,8 -> buckets 2,2,1,2,2,2
4624        state.query("select sum v by bucket: 1+floor x % 3".to_string());
4625        assert!(
4626            state.error.is_none(),
4627            "query should succeed: {:?}",
4628            state.error
4629        );
4630        let result = state.lf.collect().unwrap();
4631        let bucket = result.column("bucket").unwrap();
4632        // Must be sorted by bucket (1, 2, 3)
4633        for i in 1..result.height() {
4634            let prev: i64 = bucket.get(i - 1).unwrap().try_extract().unwrap_or(0);
4635            let curr: i64 = bucket.get(i).unwrap().try_extract().unwrap_or(0);
4636            assert!(
4637                curr >= prev,
4638                "bucket column must be sorted: {} then {}",
4639                prev,
4640                curr
4641            );
4642        }
4643    }
4644}