Skip to main content

datui_lib/widgets/
datatable.rs

1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10    buffer::Buffer,
11    layout::Rect,
12    style::{Color, Modifier, Style},
13    text::{Line, Span},
14    widgets::{
15        Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16    },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions};
25use polars::lazy::frame::pivot::pivot_stable;
26use std::io::{BufReader, Read};
27
28use calamine::{open_workbook_auto, Data, Reader};
29use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
30use orc_rust::ArrowReaderBuilder;
31use tempfile::NamedTempFile;
32
33use arrow::array::types::{
34    Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
35    TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
36};
37use arrow::array::{Array, AsArray};
38use arrow::record_batch::RecordBatch;
39
40fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
41    let e = col(PlSmallStr::from_static(""));
42    let expr = match agg {
43        PivotAggregation::Last => e.last(),
44        PivotAggregation::First => e.first(),
45        PivotAggregation::Min => e.min(),
46        PivotAggregation::Max => e.max(),
47        PivotAggregation::Avg => e.mean(),
48        PivotAggregation::Med => e.median(),
49        PivotAggregation::Std => e.std(1),
50        PivotAggregation::Count => e.len(),
51    };
52    Ok(expr)
53}
54
/// Central state for the data-table widget: the lazy query plan, the currently
/// buffered/materialized rows, scroll positions, and all view transformations
/// (filters, sorts, queries, grouping, pivot/melt).
pub struct DataTableState {
    /// Current LazyFrame with all active transformations applied.
    pub lf: LazyFrame,
    // Pristine LazyFrame as loaded; reset_lf_to_original() restores from this.
    original_lf: LazyFrame,
    df: Option<DataFrame>,        // Scrollable columns dataframe
    locked_df: Option<DataFrame>, // Locked columns dataframe
    /// Ratatui table selection/scroll state.
    pub table_state: TableState,
    /// Absolute index of the first visible row.
    pub start_row: usize,
    /// Number of rows that fit in the viewport (set during render).
    pub visible_rows: usize,
    /// Index of the first visible (non-locked) column.
    pub termcol_index: usize,
    /// Number of columns that fit in the viewport (set during render).
    pub visible_termcols: usize,
    /// Last Polars error from a collect/transform, if any.
    pub error: Option<PolarsError>,
    pub suppress_error_display: bool, // When true, don't show errors in main view (e.g., when query input is active)
    /// Schema of the current `lf`.
    pub schema: Arc<Schema>,
    /// Total row count of the current view (valid only when num_rows_valid).
    pub num_rows: usize,
    /// When true, collect() skips the len() query.
    num_rows_valid: bool,
    // Active filter statements combined into the lazy plan.
    filters: Vec<FilterStatement>,
    // Columns currently sorted by, in priority order.
    sort_columns: Vec<String>,
    sort_ascending: bool,
    pub active_query: String,
    /// Last executed SQL (Sql tab). Independent from active_query; only one applies to current view.
    pub active_sql_query: String,
    /// Last executed fuzzy search (Fuzzy tab). Independent from active_query/active_sql_query.
    pub active_fuzzy_query: String,
    column_order: Vec<String>,   // Order of columns for display
    locked_columns_count: usize, // Number of locked columns (from left)
    // When grouping is active, the grouped LazyFrame; None otherwise.
    grouped_lf: Option<LazyFrame>,
    drilled_down_group_index: Option<usize>, // Index of the group we're viewing
    pub drilled_down_group_key: Option<Vec<String>>, // Key values of the drilled down group
    pub drilled_down_group_key_columns: Option<Vec<String>>, // Key column names of the drilled down group
    // Number of pages to prefetch ahead of / behind the viewport.
    pages_lookahead: usize,
    pages_lookback: usize,
    max_buffered_rows: usize, // 0 = no limit
    max_buffered_mb: usize,   // 0 = no limit
    // Absolute row range currently held in buffered_df.
    buffered_start_row: usize,
    buffered_end_row: usize,
    /// Full buffered DataFrame (all columns in column_order) for the current buffer range.
    /// When set, column scroll (scroll_left/scroll_right) only re-slices columns without re-collecting from LazyFrame.
    buffered_df: Option<DataFrame>,
    // Distance (rows) from buffer edge at which a re-buffer is triggered.
    proximity_threshold: usize,
    // Whether to render a leading row-number column.
    row_numbers: bool,
    // First row's displayed number (commonly 0 or 1).
    row_start_index: usize,
    /// Last applied pivot spec, if current lf is result of a pivot. Used for templates.
    last_pivot_spec: Option<PivotSpec>,
    /// Last applied melt spec, if current lf is result of a melt. Used for templates.
    last_melt_spec: Option<MeltSpec>,
    /// When set, dataset was loaded with hive partitioning; partition column names for Info panel and predicate pushdown.
    pub partition_columns: Option<Vec<String>>,
    /// When set, decompressed CSV was written to this temp file; kept alive so the file exists for lazy scan.
    decompress_temp_file: Option<NamedTempFile>,
    /// When true, use Polars streaming engine for LazyFrame collect when the streaming feature is enabled.
    pub polars_streaming: bool,
}
108
/// Inferred type for an Excel column (preserves numbers, bools, dates; avoids stringifying).
#[derive(Clone, Copy)]
enum ExcelColType {
    /// Whole numbers (also chosen when every float in the column is integral).
    Int64,
    /// Floating-point values.
    Float64,
    /// Boolean cells.
    Boolean,
    /// Fallback: plain text.
    Utf8,
    /// Date-only values (every parsed datetime is at midnight).
    Date,
    /// Full timestamps.
    Datetime,
}
119
120impl DataTableState {
    /// Build a state around `lf`, eagerly resolving its schema (one `collect_schema` call).
    ///
    /// Buffering knobs fall back to defaults when `None`: 3 pages lookahead/lookback,
    /// 100_000 max buffered rows, 512 MB max buffer. Display options (`row_numbers`,
    /// `row_start_index`) start at placeholder defaults; callers override them afterward.
    pub fn new(
        lf: LazyFrame,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        polars_streaming: bool,
    ) -> Result<Self> {
        // Resolve the schema up front so headers can render before any data collect.
        let schema = lf.clone().collect_schema()?;
        // Initial display order mirrors the schema order.
        let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: pages_lookahead.unwrap_or(3),
            pages_lookback: pages_lookback.unwrap_or(3),
            max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0, // Will be set when visible_rows is known
            row_numbers: false,     // Will be set from options
            row_start_index: 1,     // Will be set from options
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns: None,
            decompress_temp_file: None,
            polars_streaming,
        })
    }
175
176    /// Create state from an existing LazyFrame (e.g. from Python or in-memory). Uses OpenOptions for display/buffer settings.
177    pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
178        let mut state = Self::new(
179            lf,
180            options.pages_lookahead,
181            options.pages_lookback,
182            options.max_buffered_rows,
183            options.max_buffered_mb,
184            options.polars_streaming,
185        )?;
186        state.row_numbers = options.row_numbers;
187        state.row_start_index = options.row_start_index;
188        Ok(state)
189    }
190
    /// Create state from a pre-collected schema and LazyFrame (for phased loading). Does not call collect_schema();
    /// df is None so the UI can render headers while the first collect() runs.
    /// When `partition_columns` is Some (e.g. hive), column order is partition cols first.
    pub fn from_schema_and_lazyframe(
        schema: Arc<Schema>,
        lf: LazyFrame,
        options: &crate::OpenOptions,
        partition_columns: Option<Vec<String>>,
    ) -> Result<Self> {
        // Display order: partition columns first (in their given order), then the
        // remaining schema columns in schema order; otherwise just schema order.
        let column_order: Vec<String> = if let Some(ref part) = partition_columns {
            let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
            let rest: Vec<String> = schema
                .iter_names()
                .map(|s| s.to_string())
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            part.iter().cloned().chain(rest).collect()
        } else {
            schema.iter_names().map(|s| s.to_string()).collect()
        };
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: options.pages_lookahead.unwrap_or(3),
            pages_lookback: options.pages_lookback.unwrap_or(3),
            max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: options.row_numbers,
            row_start_index: options.row_start_index,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns,
            decompress_temp_file: None,
            polars_streaming: options.polars_streaming,
        })
    }
255
    /// Reset LazyFrame and view state to original_lf. Schema is re-fetched so it matches
    /// after a previous query/SQL that may have changed columns. Caller should call
    /// collect() afterward if display update is needed (reset/query/fuzzy do; sql_query
    /// relies on event loop Collect).
    fn reset_lf_to_original(&mut self) {
        // Row count no longer matches; force the next collect() to re-run len().
        self.invalidate_num_rows();
        self.lf = self.original_lf.clone();
        // Fall back to an empty schema if schema resolution fails; error surfaces later on collect.
        self.schema = self
            .original_lf
            .clone()
            .collect_schema()
            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Clear all active query text (only one of these applies to a view at a time).
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.locked_columns_count = 0;
        self.filters.clear();
        self.sort_columns.clear();
        self.sort_ascending = true;
        // Scroll back to the top-left corner.
        self.start_row = 0;
        self.termcol_index = 0;
        // Drop grouping / drill-down state.
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        // Invalidate the row buffer so the next collect() re-materializes data.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
    }
287
    /// Full user-facing reset: restore the original LazyFrame, clear error and
    /// pivot/melt bookkeeping, then re-collect so the visible data is refreshed.
    pub fn reset(&mut self) {
        self.reset_lf_to_original();
        // Clear error state, which reset_lf_to_original intentionally leaves alone.
        self.error = None;
        self.suppress_error_display = false;
        // The view no longer derives from a pivot/melt; forget the specs.
        self.last_pivot_spec = None;
        self.last_melt_spec = None;
        self.collect();
        if self.num_rows > 0 {
            self.start_row = 0;
        }
    }
299
300    pub fn from_parquet(
301        path: &Path,
302        pages_lookahead: Option<usize>,
303        pages_lookback: Option<usize>,
304        max_buffered_rows: Option<usize>,
305        max_buffered_mb: Option<usize>,
306        row_numbers: bool,
307        row_start_index: usize,
308    ) -> Result<Self> {
309        let path_str = path.as_os_str().to_string_lossy();
310        let is_glob = path_str.contains('*');
311        let pl_path = PlPath::Local(Arc::from(path));
312        let args = ScanArgsParquet {
313            glob: is_glob,
314            ..Default::default()
315        };
316        let lf = LazyFrame::scan_parquet(pl_path, args)?;
317        let mut state = Self::new(
318            lf,
319            pages_lookahead,
320            pages_lookback,
321            max_buffered_rows,
322            max_buffered_mb,
323            true,
324        )?;
325        state.row_numbers = row_numbers;
326        state.row_start_index = row_start_index;
327        Ok(state)
328    }
329
330    /// Load multiple Parquet files and concatenate them into one LazyFrame (same schema assumed).
331    pub fn from_parquet_paths(
332        paths: &[impl AsRef<Path>],
333        pages_lookahead: Option<usize>,
334        pages_lookback: Option<usize>,
335        max_buffered_rows: Option<usize>,
336        max_buffered_mb: Option<usize>,
337        row_numbers: bool,
338        row_start_index: usize,
339    ) -> Result<Self> {
340        if paths.is_empty() {
341            return Err(color_eyre::eyre::eyre!("No paths provided"));
342        }
343        if paths.len() == 1 {
344            return Self::from_parquet(
345                paths[0].as_ref(),
346                pages_lookahead,
347                pages_lookback,
348                max_buffered_rows,
349                max_buffered_mb,
350                row_numbers,
351                row_start_index,
352            );
353        }
354        let mut lazy_frames = Vec::with_capacity(paths.len());
355        for p in paths {
356            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
357            let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
358            lazy_frames.push(lf);
359        }
360        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
361        let mut state = Self::new(
362            lf,
363            pages_lookahead,
364            pages_lookback,
365            max_buffered_rows,
366            max_buffered_mb,
367            true,
368        )?;
369        state.row_numbers = row_numbers;
370        state.row_start_index = row_start_index;
371        Ok(state)
372    }
373
374    /// Load a single Arrow IPC / Feather v2 file (lazy).
375    pub fn from_ipc(
376        path: &Path,
377        pages_lookahead: Option<usize>,
378        pages_lookback: Option<usize>,
379        max_buffered_rows: Option<usize>,
380        max_buffered_mb: Option<usize>,
381        row_numbers: bool,
382        row_start_index: usize,
383    ) -> Result<Self> {
384        let pl_path = PlPath::Local(Arc::from(path));
385        let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
386        let mut state = Self::new(
387            lf,
388            pages_lookahead,
389            pages_lookback,
390            max_buffered_rows,
391            max_buffered_mb,
392            true,
393        )?;
394        state.row_numbers = row_numbers;
395        state.row_start_index = row_start_index;
396        Ok(state)
397    }
398
399    /// Load multiple Arrow IPC / Feather files and concatenate into one LazyFrame.
400    pub fn from_ipc_paths(
401        paths: &[impl AsRef<Path>],
402        pages_lookahead: Option<usize>,
403        pages_lookback: Option<usize>,
404        max_buffered_rows: Option<usize>,
405        max_buffered_mb: Option<usize>,
406        row_numbers: bool,
407        row_start_index: usize,
408    ) -> Result<Self> {
409        if paths.is_empty() {
410            return Err(color_eyre::eyre::eyre!("No paths provided"));
411        }
412        if paths.len() == 1 {
413            return Self::from_ipc(
414                paths[0].as_ref(),
415                pages_lookahead,
416                pages_lookback,
417                max_buffered_rows,
418                max_buffered_mb,
419                row_numbers,
420                row_start_index,
421            );
422        }
423        let mut lazy_frames = Vec::with_capacity(paths.len());
424        for p in paths {
425            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
426            let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
427            lazy_frames.push(lf);
428        }
429        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
430        let mut state = Self::new(
431            lf,
432            pages_lookahead,
433            pages_lookback,
434            max_buffered_rows,
435            max_buffered_mb,
436            true,
437        )?;
438        state.row_numbers = row_numbers;
439        state.row_start_index = row_start_index;
440        Ok(state)
441    }
442
443    /// Load a single Avro file (eager read, then lazy).
444    pub fn from_avro(
445        path: &Path,
446        pages_lookahead: Option<usize>,
447        pages_lookback: Option<usize>,
448        max_buffered_rows: Option<usize>,
449        max_buffered_mb: Option<usize>,
450        row_numbers: bool,
451        row_start_index: usize,
452    ) -> Result<Self> {
453        let file = File::open(path)?;
454        let df = polars::io::avro::AvroReader::new(file).finish()?;
455        let lf = df.lazy();
456        let mut state = Self::new(
457            lf,
458            pages_lookahead,
459            pages_lookback,
460            max_buffered_rows,
461            max_buffered_mb,
462            true,
463        )?;
464        state.row_numbers = row_numbers;
465        state.row_start_index = row_start_index;
466        Ok(state)
467    }
468
469    /// Load multiple Avro files and concatenate into one LazyFrame.
470    pub fn from_avro_paths(
471        paths: &[impl AsRef<Path>],
472        pages_lookahead: Option<usize>,
473        pages_lookback: Option<usize>,
474        max_buffered_rows: Option<usize>,
475        max_buffered_mb: Option<usize>,
476        row_numbers: bool,
477        row_start_index: usize,
478    ) -> Result<Self> {
479        if paths.is_empty() {
480            return Err(color_eyre::eyre::eyre!("No paths provided"));
481        }
482        if paths.len() == 1 {
483            return Self::from_avro(
484                paths[0].as_ref(),
485                pages_lookahead,
486                pages_lookback,
487                max_buffered_rows,
488                max_buffered_mb,
489                row_numbers,
490                row_start_index,
491            );
492        }
493        let mut lazy_frames = Vec::with_capacity(paths.len());
494        for p in paths {
495            let file = File::open(p.as_ref())?;
496            let df = polars::io::avro::AvroReader::new(file).finish()?;
497            lazy_frames.push(df.lazy());
498        }
499        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
500        let mut state = Self::new(
501            lf,
502            pages_lookahead,
503            pages_lookback,
504            max_buffered_rows,
505            max_buffered_mb,
506            true,
507        )?;
508        state.row_numbers = row_numbers;
509        state.row_start_index = row_start_index;
510        Ok(state)
511    }
512
    /// Load a single Excel file (xls, xlsx, xlsm, xlsb) using calamine (eager read, then lazy).
    /// Sheet is selected by 0-based index or name via `excel_sheet`.
    #[allow(clippy::too_many_arguments)]
    pub fn from_excel(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        excel_sheet: Option<&str>,
    ) -> Result<Self> {
        let mut workbook =
            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
        let sheet_names = workbook.sheet_names().to_vec();
        if sheet_names.is_empty() {
            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
        }
        // Sheet selection: numeric selector is treated as a 0-based index, anything
        // else as a sheet name; no selector means the first sheet.
        let range = if let Some(sheet_sel) = excel_sheet {
            if let Ok(idx) = sheet_sel.parse::<usize>() {
                workbook
                    .worksheet_range_at(idx)
                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            } else {
                workbook
                    .worksheet_range(sheet_sel)
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            }
        } else {
            workbook
                .worksheet_range_at(0)
                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
        };
        // Materialize all cells; whole sheet is held in memory.
        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
        if rows.is_empty() {
            // Empty sheet: produce a zero-column, zero-row frame rather than an error.
            let empty_df = DataFrame::new(vec![])?;
            let mut state = Self::new(
                empty_df.lazy(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                true,
            )?;
            state.row_numbers = row_numbers;
            state.row_start_index = row_start_index;
            return Ok(state);
        }
        // First row is treated as the header row.
        let headers: Vec<String> = rows[0]
            .iter()
            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
            .collect();
        let n_cols = headers.len();
        let mut series_vec = Vec::with_capacity(n_cols);
        // Build one Series per header column from the data rows (rows[1..]).
        for (col_idx, header) in headers.iter().enumerate() {
            // row.get() yields None for ragged rows shorter than the header row.
            let col_cells: Vec<Option<&Data>> =
                rows[1..].iter().map(|row| row.get(col_idx)).collect();
            let inferred = Self::excel_infer_column_type(&col_cells);
            // Empty headers get synthetic 1-based names ("column_1", ...).
            let name = if header.is_empty() {
                format!("column_{}", col_idx + 1)
            } else {
                header.clone()
            };
            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
            series_vec.push(series.into());
        }
        let df = DataFrame::new(series_vec)?;
        let mut state = Self::new(
            df.lazy(),
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
595
    /// Infers column type: prefers Int64 for whole-number floats; infers Date/Datetime for
    /// calamine DateTime/DateTimeIso or for string columns that parse as ISO date/datetime.
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        for cell in cells.iter().flatten() {
            // A single string cell decides the scan: stop early and handle below.
            // Note: the other has_* flags may be incomplete after this break.
            if CalamineTrait::is_string(*cell) {
                has_string = true;
                break;
            }
            // Datetime cells are numeric serials in Excel, so they also count as float.
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // String column: treat it as a date/datetime column only if at least one
            // cell parses and every non-empty cell parses.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            // NOTE(review): a column mixing int cells with non-whole float cells lands
            // here and becomes Int64, which would truncate the floats — confirm whether
            // mixed int/float columns occur in practice.
            ExcelColType::Int64
        } else if has_datetime {
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // All-float column whose values are all integral (within 1e-10) is Int64.
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            // Empty or unrecognized column: fall back to text.
            ExcelColType::Utf8
        }
    }
667
668    /// True if every cell that parses as datetime has time 00:00:00.
669    fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
670        let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
671        cells
672            .iter()
673            .flatten()
674            .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
675            .all(|dt| dt.time() == midnight)
676    }
677
678    /// Converts a calamine cell to NaiveDateTime (Excel serial, DateTimeIso, or parseable string).
679    fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
680        use calamine::DataType;
681        if let Some(dt) = cell.as_datetime() {
682            return Some(dt);
683        }
684        let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
685        Self::parse_naive_datetime_str(s)
686    }
687
688    /// Parses an ISO-style date/datetime string; tries FORMATS in order.
689    fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
690        let s = s.trim();
691        if s.is_empty() {
692            return None;
693        }
694        const FORMATS: &[&str] = &[
695            "%Y-%m-%dT%H:%M:%S%.f",
696            "%Y-%m-%dT%H:%M:%S",
697            "%Y-%m-%d %H:%M:%S%.f",
698            "%Y-%m-%d %H:%M:%S",
699            "%Y-%m-%d",
700        ];
701        for fmt in FORMATS {
702            if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
703                return Some(dt);
704            }
705        }
706        if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
707            return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
708        }
709        None
710    }
711
    /// Build a Polars Series from a column of calamine cells using the inferred type.
    fn excel_column_to_series(
        name: &str,
        cells: &[Option<&Data>],
        col_type: ExcelColType,
    ) -> Result<Series> {
        use calamine::DataType as CalamineTrait;
        use polars::datatypes::TimeUnit;
        // Each arm maps cells to Option<primitive>; unconvertible cells become null.
        let series = match col_type {
            ExcelColType::Int64 => {
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_i64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Float64 => {
                let v: Vec<Option<f64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_f64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Boolean => {
                let v: Vec<Option<bool>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.get_bool()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Utf8 => {
                let v: Vec<Option<String>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_string()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Date => {
                // Polars Date is days since the Unix epoch (1970-01-01) as i32.
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
                let v: Vec<Option<i32>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| (dt.date() - epoch).num_days() as i32)
                    })
                    .collect();
                Series::new(name.into(), v).cast(&DataType::Date)?
            }
            ExcelColType::Datetime => {
                // Polars Datetime here is microseconds since the Unix epoch, no timezone.
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| dt.and_utc().timestamp_micros())
                    })
                    .collect();
                Series::new(name.into(), v)
                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
            }
        };
        Ok(series)
    }
774
775    /// Load a single ORC file (eager read via orc-rust → Arrow, then convert to Polars, then lazy).
776    /// ORC is read fully into memory; see loading-data docs for large-file notes.
777    pub fn from_orc(
778        path: &Path,
779        pages_lookahead: Option<usize>,
780        pages_lookback: Option<usize>,
781        max_buffered_rows: Option<usize>,
782        max_buffered_mb: Option<usize>,
783        row_numbers: bool,
784        row_start_index: usize,
785    ) -> Result<Self> {
786        let file = File::open(path)?;
787        let reader = ArrowReaderBuilder::try_new(file)
788            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
789            .build();
790        let batches: Vec<RecordBatch> = reader
791            .collect::<std::result::Result<Vec<_>, _>>()
792            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
793        let df = Self::arrow_record_batches_to_dataframe(&batches)?;
794        let lf = df.lazy();
795        let mut state = Self::new(
796            lf,
797            pages_lookahead,
798            pages_lookback,
799            max_buffered_rows,
800            max_buffered_mb,
801            true,
802        )?;
803        state.row_numbers = row_numbers;
804        state.row_start_index = row_start_index;
805        Ok(state)
806    }
807
808    /// Load multiple ORC files and concatenate into one LazyFrame.
809    pub fn from_orc_paths(
810        paths: &[impl AsRef<Path>],
811        pages_lookahead: Option<usize>,
812        pages_lookback: Option<usize>,
813        max_buffered_rows: Option<usize>,
814        max_buffered_mb: Option<usize>,
815        row_numbers: bool,
816        row_start_index: usize,
817    ) -> Result<Self> {
818        if paths.is_empty() {
819            return Err(color_eyre::eyre::eyre!("No paths provided"));
820        }
821        if paths.len() == 1 {
822            return Self::from_orc(
823                paths[0].as_ref(),
824                pages_lookahead,
825                pages_lookback,
826                max_buffered_rows,
827                max_buffered_mb,
828                row_numbers,
829                row_start_index,
830            );
831        }
832        let mut lazy_frames = Vec::with_capacity(paths.len());
833        for p in paths {
834            let file = File::open(p.as_ref())?;
835            let reader = ArrowReaderBuilder::try_new(file)
836                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
837                .build();
838            let batches: Vec<RecordBatch> = reader
839                .collect::<std::result::Result<Vec<_>, _>>()
840                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
841            let df = Self::arrow_record_batches_to_dataframe(&batches)?;
842            lazy_frames.push(df.lazy());
843        }
844        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
845        let mut state = Self::new(
846            lf,
847            pages_lookahead,
848            pages_lookback,
849            max_buffered_rows,
850            max_buffered_mb,
851            true,
852        )?;
853        state.row_numbers = row_numbers;
854        state.row_start_index = row_start_index;
855        Ok(state)
856    }
857
858    /// Convert Arrow (arrow crate 57) RecordBatches to Polars DataFrame by value (ORC uses
859    /// arrow 57; Polars uses polars-arrow, so we cannot use Series::from_arrow).
860    fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
861        if batches.is_empty() {
862            return Ok(DataFrame::new(vec![])?);
863        }
864        let mut all_dfs = Vec::with_capacity(batches.len());
865        for batch in batches {
866            let n_cols = batch.num_columns();
867            let schema = batch.schema();
868            let mut series_vec = Vec::with_capacity(n_cols);
869            for (i, col) in batch.columns().iter().enumerate() {
870                let name = schema.field(i).name().as_str();
871                let s = Self::arrow_array_to_polars_series(name, col)?;
872                series_vec.push(s.into());
873            }
874            let df = DataFrame::new(series_vec)?;
875            all_dfs.push(df);
876        }
877        let mut out = all_dfs.remove(0);
878        for df in all_dfs {
879            out = out.vstack(&df)?;
880        }
881        Ok(out)
882    }
883
884    fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
885        use arrow::datatypes::DataType as ArrowDataType;
886        let len = array.len();
887        match array.data_type() {
888            ArrowDataType::Int8 => {
889                let a = array
890                    .as_primitive_opt::<Int8Type>()
891                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
892                let v: Vec<Option<i8>> = (0..len)
893                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
894                    .collect();
895                Ok(Series::new(name.into(), v))
896            }
897            ArrowDataType::Int16 => {
898                let a = array
899                    .as_primitive_opt::<Int16Type>()
900                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
901                let v: Vec<Option<i16>> = (0..len)
902                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
903                    .collect();
904                Ok(Series::new(name.into(), v))
905            }
906            ArrowDataType::Int32 => {
907                let a = array
908                    .as_primitive_opt::<Int32Type>()
909                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
910                let v: Vec<Option<i32>> = (0..len)
911                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
912                    .collect();
913                Ok(Series::new(name.into(), v))
914            }
915            ArrowDataType::Int64 => {
916                let a = array
917                    .as_primitive_opt::<Int64Type>()
918                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
919                let v: Vec<Option<i64>> = (0..len)
920                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
921                    .collect();
922                Ok(Series::new(name.into(), v))
923            }
924            ArrowDataType::UInt8 => {
925                let a = array
926                    .as_primitive_opt::<UInt8Type>()
927                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
928                let v: Vec<Option<i64>> = (0..len)
929                    .map(|i| {
930                        if a.is_null(i) {
931                            None
932                        } else {
933                            Some(a.value(i) as i64)
934                        }
935                    })
936                    .collect();
937                Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
938            }
939            ArrowDataType::UInt16 => {
940                let a = array
941                    .as_primitive_opt::<UInt16Type>()
942                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
943                let v: Vec<Option<i64>> = (0..len)
944                    .map(|i| {
945                        if a.is_null(i) {
946                            None
947                        } else {
948                            Some(a.value(i) as i64)
949                        }
950                    })
951                    .collect();
952                Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
953            }
954            ArrowDataType::UInt32 => {
955                let a = array
956                    .as_primitive_opt::<UInt32Type>()
957                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
958                let v: Vec<Option<u32>> = (0..len)
959                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
960                    .collect();
961                Ok(Series::new(name.into(), v))
962            }
963            ArrowDataType::UInt64 => {
964                let a = array
965                    .as_primitive_opt::<UInt64Type>()
966                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
967                let v: Vec<Option<u64>> = (0..len)
968                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
969                    .collect();
970                Ok(Series::new(name.into(), v))
971            }
972            ArrowDataType::Float32 => {
973                let a = array
974                    .as_primitive_opt::<Float32Type>()
975                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
976                let v: Vec<Option<f32>> = (0..len)
977                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
978                    .collect();
979                Ok(Series::new(name.into(), v))
980            }
981            ArrowDataType::Float64 => {
982                let a = array
983                    .as_primitive_opt::<Float64Type>()
984                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
985                let v: Vec<Option<f64>> = (0..len)
986                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
987                    .collect();
988                Ok(Series::new(name.into(), v))
989            }
990            ArrowDataType::Boolean => {
991                let a = array
992                    .as_boolean_opt()
993                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
994                let v: Vec<Option<bool>> = (0..len)
995                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
996                    .collect();
997                Ok(Series::new(name.into(), v))
998            }
999            ArrowDataType::Utf8 => {
1000                let a = array
1001                    .as_string_opt::<i32>()
1002                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
1003                let v: Vec<Option<String>> = (0..len)
1004                    .map(|i| {
1005                        if a.is_null(i) {
1006                            None
1007                        } else {
1008                            Some(a.value(i).to_string())
1009                        }
1010                    })
1011                    .collect();
1012                Ok(Series::new(name.into(), v))
1013            }
1014            ArrowDataType::LargeUtf8 => {
1015                let a = array
1016                    .as_string_opt::<i64>()
1017                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
1018                let v: Vec<Option<String>> = (0..len)
1019                    .map(|i| {
1020                        if a.is_null(i) {
1021                            None
1022                        } else {
1023                            Some(a.value(i).to_string())
1024                        }
1025                    })
1026                    .collect();
1027                Ok(Series::new(name.into(), v))
1028            }
1029            ArrowDataType::Date32 => {
1030                let a = array
1031                    .as_primitive_opt::<Date32Type>()
1032                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
1033                let v: Vec<Option<i32>> = (0..len)
1034                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1035                    .collect();
1036                Ok(Series::new(name.into(), v))
1037            }
1038            ArrowDataType::Date64 => {
1039                let a = array
1040                    .as_primitive_opt::<Date64Type>()
1041                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
1042                let v: Vec<Option<i64>> = (0..len)
1043                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1044                    .collect();
1045                Ok(Series::new(name.into(), v))
1046            }
1047            ArrowDataType::Timestamp(_, _) => {
1048                let a = array
1049                    .as_primitive_opt::<TimestampMillisecondType>()
1050                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
1051                let v: Vec<Option<i64>> = (0..len)
1052                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1053                    .collect();
1054                Ok(Series::new(name.into(), v))
1055            }
1056            other => Err(color_eyre::eyre::eyre!(
1057                "ORC: unsupported column type {:?} for column '{}'",
1058                other,
1059                name
1060            )),
1061        }
1062    }
1063
1064    /// Build a LazyFrame for hive-partitioned Parquet only (no schema collection, no partition discovery).
1065    /// Use this for phased loading so "Scanning input" is instant; schema and partition handling happen in DoLoadSchema.
1066    pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1067        let path_str = path.as_os_str().to_string_lossy();
1068        let is_glob = path_str.contains('*');
1069        let pl_path = PlPath::Local(Arc::from(path));
1070        let args = ScanArgsParquet {
1071            hive_options: HiveOptions::new_enabled(),
1072            glob: is_glob,
1073            ..Default::default()
1074        };
1075        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1076    }
1077
1078    /// Build a LazyFrame for hive-partitioned Parquet with a pre-computed schema (avoids slow collect_schema across all files).
1079    pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1080        let path_str = path.as_os_str().to_string_lossy();
1081        let is_glob = path_str.contains('*');
1082        let pl_path = PlPath::Local(Arc::from(path));
1083        let args = ScanArgsParquet {
1084            schema: Some(schema),
1085            hive_options: HiveOptions::new_enabled(),
1086            glob: is_glob,
1087            ..Default::default()
1088        };
1089        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1090    }
1091
1092    /// Find the first parquet file along a single spine of a hive-partitioned directory (same walk as partition discovery).
1093    /// Returns `None` if the directory is empty or has no parquet files along that spine.
1094    fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
1095        const MAX_DEPTH: usize = 64;
1096        Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
1097    }
1098
1099    fn first_parquet_file_spine(
1100        path: &Path,
1101        depth: usize,
1102        max_depth: usize,
1103    ) -> Option<std::path::PathBuf> {
1104        if depth >= max_depth {
1105            return None;
1106        }
1107        let entries = fs::read_dir(path).ok()?;
1108        let mut first_partition_child: Option<std::path::PathBuf> = None;
1109        for entry in entries.flatten() {
1110            let child = entry.path();
1111            if child.is_file() {
1112                if child
1113                    .extension()
1114                    .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
1115                {
1116                    return Some(child);
1117                }
1118            } else if child.is_dir() {
1119                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1120                    if name.contains('=') && first_partition_child.is_none() {
1121                        first_partition_child = Some(child);
1122                    }
1123                }
1124            }
1125        }
1126        first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
1127    }
1128
1129    /// Read schema from a single parquet file (metadata only, no data scan). Used to avoid collect_schema() over many files.
1130    fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1131        let file = File::open(path)?;
1132        let mut reader = ParquetReader::new(file);
1133        let arrow_schema = reader.schema()?;
1134        let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1135        Ok(Arc::new(schema))
1136    }
1137
1138    /// Infer schema from one parquet file in a hive directory and merge with partition columns (Utf8).
1139    /// Returns (merged_schema, partition_columns). Use with scan_parquet_hive_with_schema to avoid slow collect_schema().
1140    /// Only supported when path is a directory (not a glob). Returns Err if no parquet file found or read fails.
1141    pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1142        let partition_columns = Self::discover_hive_partition_columns(path);
1143        let one_file = Self::first_parquet_file_in_hive_dir(path)
1144            .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1145        let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1146        let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1147        let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1148        for name in &partition_columns {
1149            merged.with_column(name.clone().into(), DataType::String);
1150        }
1151        for (name, dtype) in file_schema.iter() {
1152            if !part_set.contains(name.as_str()) {
1153                merged.with_column(name.clone(), dtype.clone());
1154            }
1155        }
1156        Ok((Arc::new(merged), partition_columns))
1157    }
1158
1159    /// Discover hive partition column names (public for phased loading). Directory: single-spine walk; glob: parse pattern.
1160    pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1161        if path.is_dir() {
1162            Self::discover_partition_columns_from_path(path)
1163        } else {
1164            Self::discover_partition_columns_from_glob_pattern(path)
1165        }
1166    }
1167
1168    /// Discover hive partition column names from a directory path by walking a single
1169    /// "spine" (one branch) of key=value directories. Partition keys are uniform across
1170    /// the tree, so we only need one path to infer [year, month, day] etc. Returns columns
1171    /// in path order. Stops after max_depth levels to avoid runaway on malformed trees.
1172    fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1173        const MAX_PARTITION_DEPTH: usize = 64;
1174        let mut columns = Vec::<String>::new();
1175        let mut seen = HashSet::<String>::new();
1176        Self::discover_partition_columns_spine(
1177            path,
1178            &mut columns,
1179            &mut seen,
1180            0,
1181            MAX_PARTITION_DEPTH,
1182        );
1183        columns
1184    }
1185
1186    /// Walk one branch: at this directory, find the first child that is a key=value dir,
1187    /// record the key (if not already seen), then recurse into that one child only.
1188    /// This does O(depth) read_dir calls instead of walking the entire tree.
1189    fn discover_partition_columns_spine(
1190        path: &Path,
1191        columns: &mut Vec<String>,
1192        seen: &mut HashSet<String>,
1193        depth: usize,
1194        max_depth: usize,
1195    ) {
1196        if depth >= max_depth {
1197            return;
1198        }
1199        let Ok(entries) = fs::read_dir(path) else {
1200            return;
1201        };
1202        let mut first_partition_child: Option<std::path::PathBuf> = None;
1203        for entry in entries.flatten() {
1204            let child = entry.path();
1205            if child.is_dir() {
1206                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1207                    if let Some((key, _)) = name.split_once('=') {
1208                        if !key.is_empty() && seen.insert(key.to_string()) {
1209                            columns.push(key.to_string());
1210                        }
1211                        if first_partition_child.is_none() {
1212                            first_partition_child = Some(child);
1213                        }
1214                        break;
1215                    }
1216                }
1217            }
1218        }
1219        if let Some(one) = first_partition_child {
1220            Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1221        }
1222    }
1223
1224    /// Infer partition column names from a glob pattern path (e.g. "data/year=*/month=*/*.parquet").
1225    fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1226        let path_str = path.as_os_str().to_string_lossy();
1227        let mut columns = Vec::<String>::new();
1228        let mut seen = HashSet::<String>::new();
1229        for segment in path_str.split('/') {
1230            if let Some((key, rest)) = segment.split_once('=') {
1231                if !key.is_empty()
1232                    && (rest == "*" || !rest.contains('*'))
1233                    && seen.insert(key.to_string())
1234                {
1235                    columns.push(key.to_string());
1236                }
1237            }
1238        }
1239        columns
1240    }
1241
    /// Load Parquet with Hive partitioning from a directory or glob path.
    /// When path is a directory, partition columns are discovered from path structure.
    /// When path contains glob (e.g. `**/*.parquet`), partition columns are inferred from the pattern (e.g. `year=*/month=*`).
    /// Partition columns are moved to the left in the initial LazyFrame before state is created.
    ///
    /// **Performance**: The slow part is Polars, not our code. `scan_parquet` + `collect_schema()` trigger
    /// path expansion (full directory tree or glob) and parquet metadata reads; we only do a single-spine
    /// walk for partition key discovery and cheap schema/select work.
    pub fn from_parquet_hive(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        // Glob expansion only when the path literally contains '*'.
        let path_str = path.as_os_str().to_string_lossy();
        let is_glob = path_str.contains('*');
        let pl_path = PlPath::Local(Arc::from(path));
        let args = ScanArgsParquet {
            hive_options: HiveOptions::new_enabled(),
            glob: is_glob,
            ..Default::default()
        };
        let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
        // The expensive call: expands paths and reads parquet metadata across files.
        let schema = lf.collect_schema()?;

        // Partition keys from directory names (key=value) or the glob pattern itself.
        let mut discovered = if path.is_dir() {
            Self::discover_partition_columns_from_path(path)
        } else {
            Self::discover_partition_columns_from_glob_pattern(path)
        };

        // Fallback: glob like "**/*.parquet" has no key= in the pattern, so discovery is empty.
        // Try discovering from a directory prefix (e.g. path.parent() or walk up until we find a dir).
        if discovered.is_empty() {
            let mut dir = path;
            while !dir.is_dir() {
                match dir.parent() {
                    Some(p) => dir = p,
                    None => break,
                }
            }
            if dir.is_dir() {
                discovered = Self::discover_partition_columns_from_path(dir);
            }
        }

        // Keep only discovered keys that the scanned schema actually contains.
        let partition_columns: Vec<String> = discovered
            .into_iter()
            .filter(|c| schema.contains(c.as_str()))
            .collect();

        // Target column order: partition columns first, then the remaining columns
        // in their original schema order.
        let new_order: Vec<String> = if partition_columns.is_empty() {
            schema.iter_names().map(|s| s.to_string()).collect()
        } else {
            let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
            let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
            let rest: Vec<String> = all_names
                .into_iter()
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            partition_columns.iter().cloned().chain(rest).collect()
        };

        // Reorder via select only when there is something to move.
        if !partition_columns.is_empty() {
            let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
            lf = lf.select(exprs);
        }

        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        state.partition_columns = if partition_columns.is_empty() {
            None
        } else {
            Some(partition_columns)
        };
        // Ensure display order is partition-first (Self::new uses schema order; be explicit).
        state.set_column_order(new_order);
        Ok(state)
    }
1332
    /// Enable or disable the row-number column in the table display.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }
1336
    /// Flip the row-number column display on/off.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }
1340
    /// Row number display start (0 or 1); used by go-to-line to interpret user input.
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1345
1346    /// Decompress a compressed file to a temp file for lazy CSV scan.
1347    fn decompress_compressed_csv_to_temp(
1348        path: &Path,
1349        compression: CompressionFormat,
1350        temp_dir: &Path,
1351    ) -> Result<NamedTempFile> {
1352        let mut temp = NamedTempFile::new_in(temp_dir)?;
1353        let out = temp.as_file_mut();
1354        let mut reader: Box<dyn Read> = match compression {
1355            CompressionFormat::Gzip => {
1356                let f = File::open(path)?;
1357                Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1358            }
1359            CompressionFormat::Zstd => {
1360                let f = File::open(path)?;
1361                Box::new(zstd::Decoder::new(BufReader::new(f))?)
1362            }
1363            CompressionFormat::Bzip2 => {
1364                let f = File::open(path)?;
1365                Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1366            }
1367            CompressionFormat::Xz => {
1368                let f = File::open(path)?;
1369                Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1370            }
1371        };
1372        std::io::copy(&mut reader, out)?;
1373        out.sync_all()?;
1374        Ok(temp)
1375    }
1376
1377    pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1378        // Determine compression format: explicit option, or auto-detect from extension
1379        let compression = options
1380            .compression
1381            .or_else(|| CompressionFormat::from_extension(path));
1382
1383        if let Some(compression) = compression {
1384            if options.decompress_in_memory {
1385                // Eager read: decompress into memory, then CSV read
1386                match compression {
1387                    CompressionFormat::Gzip | CompressionFormat::Zstd => {
1388                        let mut read_options = CsvReadOptions::default();
1389                        if let Some(skip_lines) = options.skip_lines {
1390                            read_options.skip_lines = skip_lines;
1391                        }
1392                        if let Some(skip_rows) = options.skip_rows {
1393                            read_options.skip_rows = skip_rows;
1394                        }
1395                        if let Some(has_header) = options.has_header {
1396                            read_options.has_header = has_header;
1397                        }
1398                        read_options = read_options.map_parse_options(|opts| {
1399                            opts.with_try_parse_dates(options.parse_dates)
1400                        });
1401                        let df = read_options
1402                            .try_into_reader_with_file_path(Some(path.into()))?
1403                            .finish()?;
1404                        let lf = df.lazy();
1405                        let mut state = Self::new(
1406                            lf,
1407                            options.pages_lookahead,
1408                            options.pages_lookback,
1409                            options.max_buffered_rows,
1410                            options.max_buffered_mb,
1411                            options.polars_streaming,
1412                        )?;
1413                        state.row_numbers = options.row_numbers;
1414                        state.row_start_index = options.row_start_index;
1415                        Ok(state)
1416                    }
1417                    CompressionFormat::Bzip2 => {
1418                        let file = File::open(path)?;
1419                        let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1420                        let mut decompressed = Vec::new();
1421                        decoder.read_to_end(&mut decompressed)?;
1422                        let mut read_options = CsvReadOptions::default();
1423                        if let Some(skip_lines) = options.skip_lines {
1424                            read_options.skip_lines = skip_lines;
1425                        }
1426                        if let Some(skip_rows) = options.skip_rows {
1427                            read_options.skip_rows = skip_rows;
1428                        }
1429                        if let Some(has_header) = options.has_header {
1430                            read_options.has_header = has_header;
1431                        }
1432                        read_options = read_options.map_parse_options(|opts| {
1433                            opts.with_try_parse_dates(options.parse_dates)
1434                        });
1435                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1436                            .with_options(read_options)
1437                            .finish()?;
1438                        let lf = df.lazy();
1439                        let mut state = Self::new(
1440                            lf,
1441                            options.pages_lookahead,
1442                            options.pages_lookback,
1443                            options.max_buffered_rows,
1444                            options.max_buffered_mb,
1445                            options.polars_streaming,
1446                        )?;
1447                        state.row_numbers = options.row_numbers;
1448                        state.row_start_index = options.row_start_index;
1449                        Ok(state)
1450                    }
1451                    CompressionFormat::Xz => {
1452                        let file = File::open(path)?;
1453                        let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1454                        let mut decompressed = Vec::new();
1455                        decoder.read_to_end(&mut decompressed)?;
1456                        let mut read_options = CsvReadOptions::default();
1457                        if let Some(skip_lines) = options.skip_lines {
1458                            read_options.skip_lines = skip_lines;
1459                        }
1460                        if let Some(skip_rows) = options.skip_rows {
1461                            read_options.skip_rows = skip_rows;
1462                        }
1463                        if let Some(has_header) = options.has_header {
1464                            read_options.has_header = has_header;
1465                        }
1466                        read_options = read_options.map_parse_options(|opts| {
1467                            opts.with_try_parse_dates(options.parse_dates)
1468                        });
1469                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1470                            .with_options(read_options)
1471                            .finish()?;
1472                        let lf = df.lazy();
1473                        let mut state = Self::new(
1474                            lf,
1475                            options.pages_lookahead,
1476                            options.pages_lookback,
1477                            options.max_buffered_rows,
1478                            options.max_buffered_mb,
1479                            options.polars_streaming,
1480                        )?;
1481                        state.row_numbers = options.row_numbers;
1482                        state.row_start_index = options.row_start_index;
1483                        Ok(state)
1484                    }
1485                }
1486            } else {
1487                // Decompress to temp file, then lazy scan
1488                let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1489                let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1490                let mut state = Self::from_csv_customize(
1491                    temp.path(),
1492                    options.pages_lookahead,
1493                    options.pages_lookback,
1494                    options.max_buffered_rows,
1495                    options.max_buffered_mb,
1496                    |mut reader| {
1497                        if let Some(skip_lines) = options.skip_lines {
1498                            reader = reader.with_skip_lines(skip_lines);
1499                        }
1500                        if let Some(skip_rows) = options.skip_rows {
1501                            reader = reader.with_skip_rows(skip_rows);
1502                        }
1503                        if let Some(has_header) = options.has_header {
1504                            reader = reader.with_has_header(has_header);
1505                        }
1506                        reader = reader.with_try_parse_dates(options.parse_dates);
1507                        reader
1508                    },
1509                )?;
1510                state.row_numbers = options.row_numbers;
1511                state.row_start_index = options.row_start_index;
1512                state.decompress_temp_file = Some(temp);
1513                Ok(state)
1514            }
1515        } else {
1516            // For uncompressed files, use lazy scanning (more efficient)
1517            let mut state = Self::from_csv_customize(
1518                path,
1519                options.pages_lookahead,
1520                options.pages_lookback,
1521                options.max_buffered_rows,
1522                options.max_buffered_mb,
1523                |mut reader| {
1524                    if let Some(skip_lines) = options.skip_lines {
1525                        reader = reader.with_skip_lines(skip_lines);
1526                    }
1527                    if let Some(skip_rows) = options.skip_rows {
1528                        reader = reader.with_skip_rows(skip_rows);
1529                    }
1530                    if let Some(has_header) = options.has_header {
1531                        reader = reader.with_has_header(has_header);
1532                    }
1533                    reader = reader.with_try_parse_dates(options.parse_dates);
1534                    reader
1535                },
1536            )?;
1537            state.row_numbers = options.row_numbers;
1538            Ok(state)
1539        }
1540    }
1541
1542    pub fn from_csv_customize<F>(
1543        path: &Path,
1544        pages_lookahead: Option<usize>,
1545        pages_lookback: Option<usize>,
1546        max_buffered_rows: Option<usize>,
1547        max_buffered_mb: Option<usize>,
1548        func: F,
1549    ) -> Result<Self>
1550    where
1551        F: FnOnce(LazyCsvReader) -> LazyCsvReader,
1552    {
1553        let pl_path = PlPath::Local(Arc::from(path));
1554        let reader = LazyCsvReader::new(pl_path);
1555        let lf = func(reader).finish()?;
1556        Self::new(
1557            lf,
1558            pages_lookahead,
1559            pages_lookback,
1560            max_buffered_rows,
1561            max_buffered_mb,
1562            true,
1563        )
1564    }
1565
1566    /// Load multiple CSV files (uncompressed) and concatenate into one LazyFrame.
1567    pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
1568        if paths.is_empty() {
1569            return Err(color_eyre::eyre::eyre!("No paths provided"));
1570        }
1571        if paths.len() == 1 {
1572            return Self::from_csv(paths[0].as_ref(), options);
1573        }
1574        let mut lazy_frames = Vec::with_capacity(paths.len());
1575        for p in paths {
1576            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1577            let mut reader = LazyCsvReader::new(pl_path);
1578            if let Some(skip_lines) = options.skip_lines {
1579                reader = reader.with_skip_lines(skip_lines);
1580            }
1581            if let Some(skip_rows) = options.skip_rows {
1582                reader = reader.with_skip_rows(skip_rows);
1583            }
1584            if let Some(has_header) = options.has_header {
1585                reader = reader.with_has_header(has_header);
1586            }
1587            reader = reader.with_try_parse_dates(options.parse_dates);
1588            let lf = reader.finish()?;
1589            lazy_frames.push(lf);
1590        }
1591        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1592        let mut state = Self::new(
1593            lf,
1594            options.pages_lookahead,
1595            options.pages_lookback,
1596            options.max_buffered_rows,
1597            options.max_buffered_mb,
1598            options.polars_streaming,
1599        )?;
1600        state.row_numbers = options.row_numbers;
1601        state.row_start_index = options.row_start_index;
1602        Ok(state)
1603    }
1604
1605    pub fn from_ndjson(
1606        path: &Path,
1607        pages_lookahead: Option<usize>,
1608        pages_lookback: Option<usize>,
1609        max_buffered_rows: Option<usize>,
1610        max_buffered_mb: Option<usize>,
1611        row_numbers: bool,
1612        row_start_index: usize,
1613    ) -> Result<Self> {
1614        let pl_path = PlPath::Local(Arc::from(path));
1615        let lf = LazyJsonLineReader::new(pl_path).finish()?;
1616        let mut state = Self::new(
1617            lf,
1618            pages_lookahead,
1619            pages_lookback,
1620            max_buffered_rows,
1621            max_buffered_mb,
1622            true,
1623        )?;
1624        state.row_numbers = row_numbers;
1625        state.row_start_index = row_start_index;
1626        Ok(state)
1627    }
1628
1629    /// Load multiple NDJSON files and concatenate into one LazyFrame.
1630    pub fn from_ndjson_paths(
1631        paths: &[impl AsRef<Path>],
1632        pages_lookahead: Option<usize>,
1633        pages_lookback: Option<usize>,
1634        max_buffered_rows: Option<usize>,
1635        max_buffered_mb: Option<usize>,
1636        row_numbers: bool,
1637        row_start_index: usize,
1638    ) -> Result<Self> {
1639        if paths.is_empty() {
1640            return Err(color_eyre::eyre::eyre!("No paths provided"));
1641        }
1642        if paths.len() == 1 {
1643            return Self::from_ndjson(
1644                paths[0].as_ref(),
1645                pages_lookahead,
1646                pages_lookback,
1647                max_buffered_rows,
1648                max_buffered_mb,
1649                row_numbers,
1650                row_start_index,
1651            );
1652        }
1653        let mut lazy_frames = Vec::with_capacity(paths.len());
1654        for p in paths {
1655            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1656            let lf = LazyJsonLineReader::new(pl_path).finish()?;
1657            lazy_frames.push(lf);
1658        }
1659        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1660        let mut state = Self::new(
1661            lf,
1662            pages_lookahead,
1663            pages_lookback,
1664            max_buffered_rows,
1665            max_buffered_mb,
1666            true,
1667        )?;
1668        state.row_numbers = row_numbers;
1669        state.row_start_index = row_start_index;
1670        Ok(state)
1671    }
1672
    /// Load a whole-file JSON document (top-level array) into a table.
    ///
    /// Thin forwarding wrapper: delegates to `from_json_with_format` with
    /// `JsonFormat::Json`; buffering and row-number options pass through unchanged.
    pub fn from_json(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1693
    /// Load a JSON Lines file (one JSON object per line) into a table.
    ///
    /// Thin forwarding wrapper: delegates to `from_json_with_format` with
    /// `JsonFormat::JsonLines`; buffering and row-number options pass through unchanged.
    pub fn from_json_lines(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1714
1715    #[allow(clippy::too_many_arguments)]
1716    fn from_json_with_format(
1717        path: &Path,
1718        pages_lookahead: Option<usize>,
1719        pages_lookback: Option<usize>,
1720        max_buffered_rows: Option<usize>,
1721        max_buffered_mb: Option<usize>,
1722        row_numbers: bool,
1723        row_start_index: usize,
1724        format: JsonFormat,
1725    ) -> Result<Self> {
1726        let file = File::open(path)?;
1727        let lf = JsonReader::new(file)
1728            .with_json_format(format)
1729            .finish()?
1730            .lazy();
1731        let mut state = Self::new(
1732            lf,
1733            pages_lookahead,
1734            pages_lookback,
1735            max_buffered_rows,
1736            max_buffered_mb,
1737            true,
1738        )?;
1739        state.row_numbers = row_numbers;
1740        state.row_start_index = row_start_index;
1741        Ok(state)
1742    }
1743
    /// Load multiple JSON (array) files and concatenate into one LazyFrame.
    ///
    /// Thin forwarding wrapper: delegates to `from_json_with_format_paths` with
    /// `JsonFormat::Json`; see that method for the actual read/concat logic.
    pub fn from_json_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1765
    /// Load multiple JSON Lines files and concatenate into one LazyFrame.
    ///
    /// Thin forwarding wrapper: delegates to `from_json_with_format_paths` with
    /// `JsonFormat::JsonLines`; see that method for the actual read/concat logic.
    pub fn from_json_lines_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1787
1788    #[allow(clippy::too_many_arguments)]
1789    fn from_json_with_format_paths(
1790        paths: &[impl AsRef<Path>],
1791        pages_lookahead: Option<usize>,
1792        pages_lookback: Option<usize>,
1793        max_buffered_rows: Option<usize>,
1794        max_buffered_mb: Option<usize>,
1795        row_numbers: bool,
1796        row_start_index: usize,
1797        format: JsonFormat,
1798    ) -> Result<Self> {
1799        if paths.is_empty() {
1800            return Err(color_eyre::eyre::eyre!("No paths provided"));
1801        }
1802        if paths.len() == 1 {
1803            return Self::from_json_with_format(
1804                paths[0].as_ref(),
1805                pages_lookahead,
1806                pages_lookback,
1807                max_buffered_rows,
1808                max_buffered_mb,
1809                row_numbers,
1810                row_start_index,
1811                format,
1812            );
1813        }
1814        let mut lazy_frames = Vec::with_capacity(paths.len());
1815        for p in paths {
1816            let file = File::open(p.as_ref())?;
1817            let lf = match &format {
1818                JsonFormat::Json => JsonReader::new(file)
1819                    .with_json_format(JsonFormat::Json)
1820                    .finish()?
1821                    .lazy(),
1822                JsonFormat::JsonLines => JsonReader::new(file)
1823                    .with_json_format(JsonFormat::JsonLines)
1824                    .finish()?
1825                    .lazy(),
1826            };
1827            lazy_frames.push(lf);
1828        }
1829        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1830        let mut state = Self::new(
1831            lf,
1832            pages_lookahead,
1833            pages_lookback,
1834            max_buffered_rows,
1835            max_buffered_mb,
1836            true,
1837        )?;
1838        state.row_numbers = row_numbers;
1839        state.row_start_index = row_start_index;
1840        Ok(state)
1841    }
1842
1843    #[allow(clippy::too_many_arguments)]
1844    pub fn from_delimited(
1845        path: &Path,
1846        delimiter: u8,
1847        pages_lookahead: Option<usize>,
1848        pages_lookback: Option<usize>,
1849        max_buffered_rows: Option<usize>,
1850        max_buffered_mb: Option<usize>,
1851        row_numbers: bool,
1852        row_start_index: usize,
1853    ) -> Result<Self> {
1854        let pl_path = PlPath::Local(Arc::from(path));
1855        let reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
1856        let lf = reader.finish()?;
1857        let mut state = Self::new(
1858            lf,
1859            pages_lookahead,
1860            pages_lookback,
1861            max_buffered_rows,
1862            max_buffered_mb,
1863            true,
1864        )?;
1865        state.row_numbers = row_numbers;
1866        state.row_start_index = row_start_index;
1867        Ok(state)
1868    }
1869
    /// Returns true if a scroll by `rows` would trigger a collect (view would leave the buffer).
    /// Used so the UI only shows the throbber when actual data loading will occur.
    ///
    /// NOTE(review): this is a side-effect-free mirror of the window math in
    /// `slide_table`; keep the two in lockstep when changing either.
    pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
        // Scrolling up while already at the top never moves the view.
        if rows < 0 && self.start_row == 0 {
            return false;
        }
        let new_start_row = if self.start_row as i64 + rows <= 0 {
            0
        } else {
            if let Some(df) = self.df.as_ref() {
                // All loaded rows already fit on screen: scrolling down is a no-op.
                if rows > 0 && df.shape().0 <= self.visible_rows {
                    return false;
                }
            }
            (self.start_row as i64 + rows) as usize
        };
        // End of the would-be view, clamped to the remaining row count.
        let view_end = new_start_row
            + self
                .visible_rows
                .min(self.num_rows.saturating_sub(new_start_row));
        // `buffered_end_row > 0` distinguishes "no buffer yet" from a real buffer.
        let within_buffer = new_start_row >= self.buffered_start_row
            && view_end <= self.buffered_end_row
            && self.buffered_end_row > 0;
        !within_buffer
    }
1895
    /// Move the view by `rows` (negative = up), collecting new data only when the
    /// view leaves the buffered window.
    ///
    /// NOTE(review): the window math here is mirrored (without side effects) in
    /// `scroll_would_trigger_collect`; keep the two in lockstep when changing either.
    fn slide_table(&mut self, rows: i64) {
        // Scrolling up while already at the top is a no-op.
        if rows < 0 && self.start_row == 0 {
            return;
        }

        let new_start_row = if self.start_row as i64 + rows <= 0 {
            0
        } else {
            if let Some(df) = self.df.as_ref() {
                // All loaded rows already fit on screen: nothing to scroll down to.
                if rows > 0 && df.shape().0 <= self.visible_rows {
                    return;
                }
            }
            (self.start_row as i64 + rows) as usize
        };

        // Call collect() only when view is outside buffer; otherwise just update start_row.
        let view_end = new_start_row
            + self
                .visible_rows
                .min(self.num_rows.saturating_sub(new_start_row));
        let within_buffer = new_start_row >= self.buffered_start_row
            && view_end <= self.buffered_end_row
            && self.buffered_end_row > 0;

        if within_buffer {
            self.start_row = new_start_row;
            return;
        }

        self.start_row = new_start_row;
        self.collect();
    }
1929
1930    pub fn collect(&mut self) {
1931        // Update proximity threshold based on visible rows
1932        if self.visible_rows > 0 {
1933            self.proximity_threshold = self.visible_rows;
1934        }
1935
1936        // Run len() only when lf has changed (query, filter, sort, pivot, melt, reset, drill).
1937        if !self.num_rows_valid {
1938            self.num_rows =
1939                match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
1940                    Ok(df) => {
1941                        if let Some(col) = df.get(0) {
1942                            if let Some(AnyValue::UInt32(len)) = col.first() {
1943                                *len as usize
1944                            } else {
1945                                0
1946                            }
1947                        } else {
1948                            0
1949                        }
1950                    }
1951                    Err(_) => 0,
1952                };
1953            self.num_rows_valid = true;
1954        }
1955
1956        if self.num_rows > 0 {
1957            let max_start = self.num_rows.saturating_sub(1);
1958            if self.start_row > max_start {
1959                self.start_row = max_start;
1960            }
1961        } else {
1962            self.start_row = 0;
1963            self.buffered_start_row = 0;
1964            self.buffered_end_row = 0;
1965            self.buffered_df = None;
1966            self.df = None;
1967            self.locked_df = None;
1968            return;
1969        }
1970
1971        // Proximity-based buffer logic
1972        let view_start = self.start_row;
1973        let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);
1974
1975        // Check if current view is within buffered range
1976        let within_buffer = view_start >= self.buffered_start_row
1977            && view_end <= self.buffered_end_row
1978            && self.buffered_end_row > 0;
1979
1980        // Buffer grows incrementally: initial load and each expansion add only a few pages (lookahead + lookback).
1981        // clamp_buffer_to_max_size caps at max_buffered_rows and slides the window when at cap.
1982        let page_rows = self.visible_rows.max(1);
1983
1984        if within_buffer {
1985            let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
1986            let dist_to_end = self.buffered_end_row.saturating_sub(view_end);
1987
1988            let needs_expansion_back =
1989                dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
1990            let needs_expansion_forward =
1991                dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;
1992
1993            if !needs_expansion_back && !needs_expansion_forward {
1994                // Column scroll only: reuse cached full buffer and re-slice into locked/scroll columns.
1995                let expected_len = self
1996                    .buffered_end_row
1997                    .saturating_sub(self.buffered_start_row);
1998                if self
1999                    .buffered_df
2000                    .as_ref()
2001                    .is_some_and(|b| b.height() == expected_len)
2002                {
2003                    self.slice_buffer_into_display();
2004                    if self.table_state.selected().is_none() {
2005                        self.table_state.select(Some(0));
2006                    }
2007                    return;
2008                }
2009                self.load_buffer(self.buffered_start_row, self.buffered_end_row);
2010                if self.table_state.selected().is_none() {
2011                    self.table_state.select(Some(0));
2012                }
2013                return;
2014            }
2015
2016            let mut new_buffer_start = if needs_expansion_back {
2017                view_start.saturating_sub(self.pages_lookback * page_rows)
2018            } else {
2019                self.buffered_start_row
2020            };
2021
2022            let mut new_buffer_end = if needs_expansion_forward {
2023                (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
2024            } else {
2025                self.buffered_end_row
2026            };
2027
2028            self.clamp_buffer_to_max_size(
2029                view_start,
2030                view_end,
2031                &mut new_buffer_start,
2032                &mut new_buffer_end,
2033            );
2034            self.load_buffer(new_buffer_start, new_buffer_end);
2035        } else {
2036            // Outside buffer: either extend the previous buffer (so it grows) or load a fresh small window.
2037            // Only extend when the view is "close" to the existing buffer (e.g. user paged down a bit).
2038            // A big jump (e.g. jump to end) should load just a window around the new view, not extend
2039            // the buffer across the whole dataset.
2040            let mut new_buffer_start;
2041            let mut new_buffer_end;
2042
2043            let had_buffer = self.buffered_end_row > 0;
2044            let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
2045            let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;
2046
2047            let extend_forward_ok = scrolled_past_end
2048                && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
2049            let extend_backward_ok = scrolled_past_start
2050                && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;
2051
2052            if extend_forward_ok {
2053                // View is just a few pages past buffer end; extend forward.
2054                new_buffer_start = self.buffered_start_row;
2055                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2056            } else if extend_backward_ok {
2057                // View is just a few pages before buffer start; extend backward.
2058                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2059                new_buffer_end = self.buffered_end_row;
2060            } else if scrolled_past_end || scrolled_past_start {
2061                // Big jump (e.g. jump to end or jump to start): load a fresh window around the view.
2062                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2063                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2064                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2065                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2066                if current_len < min_initial_len {
2067                    let need = min_initial_len.saturating_sub(current_len);
2068                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2069                    let can_extend_start = new_buffer_start;
2070                    if can_extend_end >= need {
2071                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2072                    } else if can_extend_start >= need {
2073                        new_buffer_start = new_buffer_start.saturating_sub(need);
2074                    } else {
2075                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2076                        new_buffer_start =
2077                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2078                    }
2079                }
2080            } else {
2081                // No buffer yet or big jump: load a fresh small window (view ± a few pages).
2082                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2083                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2084
2085                // Ensure at least (1 + lookahead + lookback) pages so buffer size is consistent (e.g. 364 at 52 visible).
2086                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2087                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2088                if current_len < min_initial_len {
2089                    let need = min_initial_len.saturating_sub(current_len);
2090                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2091                    let can_extend_start = new_buffer_start;
2092                    if can_extend_end >= need {
2093                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2094                    } else if can_extend_start >= need {
2095                        new_buffer_start = new_buffer_start.saturating_sub(need);
2096                    } else {
2097                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2098                        new_buffer_start =
2099                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2100                    }
2101                }
2102            }
2103
2104            self.clamp_buffer_to_max_size(
2105                view_start,
2106                view_end,
2107                &mut new_buffer_start,
2108                &mut new_buffer_end,
2109            );
2110            self.load_buffer(new_buffer_start, new_buffer_end);
2111        }
2112
2113        self.slice_from_buffer();
2114        if self.table_state.selected().is_none() {
2115            self.table_state.select(Some(0));
2116        }
2117    }
2118
    /// Invalidate num_rows cache when lf is mutated.
    ///
    /// The next `collect()` will re-run the `len()` query to refresh the count.
    fn invalidate_num_rows(&mut self) {
        self.num_rows_valid = false;
    }
2123
2124    /// Returns the cached row count when valid (same value shown in the control bar). Use this to
2125    /// avoid an extra full scan for analysis/describe when the table has already been collected.
2126    pub fn num_rows_if_valid(&self) -> Option<usize> {
2127        if self.num_rows_valid {
2128            Some(self.num_rows)
2129        } else {
2130            None
2131        }
2132    }
2133
2134    /// Clamp buffer to max_buffered_rows; when at cap, slide window to keep view inside.
2135    fn clamp_buffer_to_max_size(
2136        &self,
2137        view_start: usize,
2138        view_end: usize,
2139        buffer_start: &mut usize,
2140        buffer_end: &mut usize,
2141    ) {
2142        if self.max_buffered_rows == 0 {
2143            return;
2144        }
2145        let max_len = self.max_buffered_rows;
2146        let requested_len = buffer_end.saturating_sub(*buffer_start);
2147        if requested_len <= max_len {
2148            return;
2149        }
2150        let view_len = view_end.saturating_sub(view_start);
2151        if view_len >= max_len {
2152            *buffer_start = view_start;
2153            *buffer_end = (view_start + max_len).min(self.num_rows);
2154        } else {
2155            let half = (max_len - view_len) / 2;
2156            *buffer_end = (view_end + half).min(self.num_rows);
2157            *buffer_start = (*buffer_end).saturating_sub(max_len);
2158            if *buffer_start > view_start {
2159                *buffer_start = view_start;
2160            }
2161            *buffer_end = (*buffer_start + max_len).min(self.num_rows);
2162        }
2163    }
2164
2165    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
2166        let buffer_size = buffer_end.saturating_sub(buffer_start);
2167        if buffer_size == 0 {
2168            return;
2169        }
2170
2171        let all_columns: Vec<_> = self
2172            .column_order
2173            .iter()
2174            .map(|name| col(name.as_str()))
2175            .collect();
2176
2177        let mut full_df = match collect_lazy(
2178            self.lf
2179                .clone()
2180                .select(all_columns)
2181                .slice(buffer_start as i64, buffer_size as u32),
2182            self.polars_streaming,
2183        ) {
2184            Ok(df) => df,
2185            Err(e) => {
2186                self.error = Some(e);
2187                return;
2188            }
2189        };
2190
2191        let mut effective_buffer_end = buffer_end;
2192        if self.max_buffered_mb > 0 {
2193            let size = full_df.estimated_size();
2194            let max_bytes = self.max_buffered_mb * 1024 * 1024;
2195            if size > max_bytes {
2196                let rows = full_df.height();
2197                if rows > 0 {
2198                    let bytes_per_row = size / rows;
2199                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
2200                    if max_rows < rows {
2201                        full_df = full_df.slice(0, max_rows);
2202                        effective_buffer_end = buffer_start + max_rows;
2203                    }
2204                }
2205            }
2206        }
2207
2208        if self.locked_columns_count > 0 {
2209            let locked_names: Vec<&str> = self
2210                .column_order
2211                .iter()
2212                .take(self.locked_columns_count)
2213                .map(|s| s.as_str())
2214                .collect();
2215            let locked_df = match full_df.select(locked_names) {
2216                Ok(df) => df,
2217                Err(e) => {
2218                    self.error = Some(e);
2219                    return;
2220                }
2221            };
2222            self.locked_df = if self.is_grouped() {
2223                match self.format_grouped_dataframe(locked_df) {
2224                    Ok(formatted_df) => Some(formatted_df),
2225                    Err(e) => {
2226                        self.error = Some(PolarsError::ComputeError(
2227                            crate::error_display::user_message_from_report(&e, None).into(),
2228                        ));
2229                        return;
2230                    }
2231                }
2232            } else {
2233                Some(locked_df)
2234            };
2235        } else {
2236            self.locked_df = None;
2237        }
2238
2239        let scroll_names: Vec<&str> = self
2240            .column_order
2241            .iter()
2242            .skip(self.locked_columns_count + self.termcol_index)
2243            .map(|s| s.as_str())
2244            .collect();
2245        if scroll_names.is_empty() {
2246            self.df = None;
2247        } else {
2248            let scroll_df = match full_df.select(scroll_names) {
2249                Ok(df) => df,
2250                Err(e) => {
2251                    self.error = Some(e);
2252                    return;
2253                }
2254            };
2255            self.df = if self.is_grouped() {
2256                match self.format_grouped_dataframe(scroll_df) {
2257                    Ok(formatted_df) => Some(formatted_df),
2258                    Err(e) => {
2259                        self.error = Some(PolarsError::ComputeError(
2260                            crate::error_display::user_message_from_report(&e, None).into(),
2261                        ));
2262                        return;
2263                    }
2264                }
2265            } else {
2266                Some(scroll_df)
2267            };
2268        }
2269        if self.error.is_some() {
2270            self.error = None;
2271        }
2272        self.buffered_start_row = buffer_start;
2273        self.buffered_end_row = effective_buffer_end;
2274        self.buffered_df = Some(full_df);
2275    }
2276
    /// Recompute locked_df and df from the cached full buffer. Used when only termcol_index (or locked columns) changed.
    ///
    /// Unlike the collect path, failures here are swallowed (`select` errors are
    /// ignored, `format_grouped_dataframe` errors become `None`) — the previous
    /// display frames are left in place instead of surfacing an error.
    fn slice_buffer_into_display(&mut self) {
        // Nothing to slice until a buffer has been collected.
        let full_df = match self.buffered_df.as_ref() {
            Some(df) => df,
            None => return,
        };

        if self.locked_columns_count > 0 {
            // Locked columns are the first `locked_columns_count` names in the
            // display order; they stay pinned regardless of horizontal scroll.
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            if let Ok(locked_df) = full_df.select(locked_names) {
                self.locked_df = if self.is_grouped() {
                    // Grouped frames render List columns as string previews.
                    self.format_grouped_dataframe(locked_df).ok()
                } else {
                    Some(locked_df)
                };
            }
        } else {
            self.locked_df = None;
        }

        // Scrollable columns: everything after the locked block, offset by the
        // current horizontal scroll position.
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else if let Ok(scroll_df) = full_df.select(scroll_names) {
            // NOTE(review): if the select fails, self.df keeps its previous
            // (possibly stale) value — confirm this is intentional.
            self.df = if self.is_grouped() {
                self.format_grouped_dataframe(scroll_df).ok()
            } else {
                Some(scroll_df)
            };
        }
    }
2318
    /// Intentional no-op: the buffer already covers the full range
    /// [buffered_start_row, buffered_end_row), and the visible window is
    /// sliced out of it at render time, so nothing needs recomputing here.
    fn slice_from_buffer(&mut self) {
        // Buffer contains the full range [buffered_start_row, buffered_end_row)
        // The displayed portion [start_row, start_row + visible_rows) is a subset
        // We'll slice the displayed portion when rendering based on offset
        // No action needed here - the buffer is stored, slicing happens at render time
    }
2325
2326    fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
2327        let schema = df.schema();
2328        let mut new_series = Vec::new();
2329
2330        for (col_name, dtype) in schema.iter() {
2331            let col = df.column(col_name)?;
2332            if matches!(dtype, DataType::List(_)) {
2333                let string_series: Series = col
2334                    .list()?
2335                    .into_iter()
2336                    .map(|opt_list| {
2337                        opt_list.map(|list_series| {
2338                            let values: Vec<String> = list_series
2339                                .iter()
2340                                .take(10)
2341                                .map(|v| v.str_value().to_string())
2342                                .collect();
2343                            if list_series.len() > 10 {
2344                                format!("[{}...] ({} items)", values.join(", "), list_series.len())
2345                            } else {
2346                                format!("[{}]", values.join(", "))
2347                            }
2348                        })
2349                    })
2350                    .collect();
2351                new_series.push(string_series.with_name(col_name.as_str().into()).into());
2352            } else {
2353                new_series.push(col.clone());
2354            }
2355        }
2356
2357        Ok(DataFrame::new(new_series)?)
2358    }
2359
2360    pub fn select_next(&mut self) {
2361        self.table_state.select_next();
2362        if let Some(selected) = self.table_state.selected() {
2363            if selected >= self.visible_rows && self.visible_rows > 0 {
2364                self.slide_table(1);
2365            }
2366        }
2367    }
2368
2369    pub fn page_down(&mut self) {
2370        self.slide_table(self.visible_rows as i64);
2371    }
2372
2373    pub fn select_previous(&mut self) {
2374        if let Some(selected) = self.table_state.selected() {
2375            self.table_state.select_previous();
2376            if selected == 0 && self.start_row > 0 {
2377                self.slide_table(-1);
2378            }
2379        } else {
2380            self.table_state.select(Some(0));
2381        }
2382    }
2383
2384    pub fn scroll_to(&mut self, index: usize) {
2385        if self.start_row == index {
2386            return;
2387        }
2388
2389        if index == 0 {
2390            self.start_row = 0;
2391            self.collect();
2392            self.start_row = 0;
2393        } else {
2394            self.start_row = index;
2395            self.collect();
2396        }
2397    }
2398
2399    /// Scroll so that the given row index is centered in the view when possible (respects table bounds).
2400    /// Selects that row. Used by go-to-line.
2401    pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2402        self.ensure_num_rows();
2403        if self.num_rows == 0 || self.visible_rows == 0 {
2404            return;
2405        }
2406        let center_offset = self.visible_rows / 2;
2407        let mut start_row = row_index.saturating_sub(center_offset);
2408        let max_start = self.num_rows.saturating_sub(self.visible_rows);
2409        start_row = start_row.min(max_start);
2410
2411        if self.start_row == start_row {
2412            let display_idx = row_index
2413                .saturating_sub(start_row)
2414                .min(self.visible_rows.saturating_sub(1));
2415            self.table_state.select(Some(display_idx));
2416            return;
2417        }
2418
2419        self.start_row = start_row;
2420        self.collect();
2421        let display_idx = row_index
2422            .saturating_sub(start_row)
2423            .min(self.visible_rows.saturating_sub(1));
2424        self.table_state.select(Some(display_idx));
2425    }
2426
2427    /// Ensure num_rows is up to date (runs len() query if needed). Used before scroll_to_end.
2428    fn ensure_num_rows(&mut self) {
2429        if self.num_rows_valid {
2430            return;
2431        }
2432        if self.visible_rows > 0 {
2433            self.proximity_threshold = self.visible_rows;
2434        }
2435        self.num_rows = match self.lf.clone().select([len()]).collect() {
2436            Ok(df) => {
2437                if let Some(col) = df.get(0) {
2438                    if let Some(AnyValue::UInt32(len)) = col.first() {
2439                        *len as usize
2440                    } else {
2441                        0
2442                    }
2443                } else {
2444                    0
2445                }
2446            }
2447            Err(_) => 0,
2448        };
2449        self.num_rows_valid = true;
2450    }
2451
2452    /// Jump to the last page; buffer is trimmed/loaded as needed. Selects the last row.
2453    pub fn scroll_to_end(&mut self) {
2454        self.ensure_num_rows();
2455        if self.num_rows == 0 {
2456            self.start_row = 0;
2457            self.buffered_start_row = 0;
2458            self.buffered_end_row = 0;
2459            return;
2460        }
2461        let end_start = self.num_rows.saturating_sub(self.visible_rows);
2462        if self.start_row == end_start {
2463            self.select_last_visible_row();
2464            return;
2465        }
2466        self.start_row = end_start;
2467        self.collect();
2468        self.select_last_visible_row();
2469    }
2470
2471    /// Set table selection to the last row in the current view (for use after scroll_to_end).
2472    fn select_last_visible_row(&mut self) {
2473        if self.num_rows == 0 {
2474            return;
2475        }
2476        let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
2477        let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
2478        self.table_state.select(Some(sel));
2479    }
2480
2481    pub fn half_page_down(&mut self) {
2482        let half = (self.visible_rows / 2).max(1) as i64;
2483        self.slide_table(half);
2484    }
2485
2486    pub fn half_page_up(&mut self) {
2487        if self.start_row == 0 {
2488            return;
2489        }
2490        let half = (self.visible_rows / 2).max(1) as i64;
2491        self.slide_table(-half);
2492    }
2493
2494    pub fn page_up(&mut self) {
2495        if self.start_row == 0 {
2496            return;
2497        }
2498        self.slide_table(-(self.visible_rows as i64));
2499    }
2500
2501    pub fn scroll_right(&mut self) {
2502        let max_scroll = self
2503            .column_order
2504            .len()
2505            .saturating_sub(self.locked_columns_count);
2506        if self.termcol_index < max_scroll.saturating_sub(1) {
2507            self.termcol_index += 1;
2508            self.collect();
2509        }
2510    }
2511
2512    pub fn scroll_left(&mut self) {
2513        if self.termcol_index > 0 {
2514            self.termcol_index -= 1;
2515            self.collect();
2516        }
2517    }
2518
2519    pub fn headers(&self) -> Vec<String> {
2520        self.column_order.clone()
2521    }
2522
2523    pub fn set_column_order(&mut self, order: Vec<String>) {
2524        self.column_order = order;
2525        self.buffered_start_row = 0;
2526        self.buffered_end_row = 0;
2527        self.buffered_df = None;
2528        self.collect();
2529    }
2530
2531    pub fn set_locked_columns(&mut self, count: usize) {
2532        self.locked_columns_count = count.min(self.column_order.len());
2533        self.buffered_start_row = 0;
2534        self.buffered_end_row = 0;
2535        self.buffered_df = None;
2536        self.collect();
2537    }
2538
    /// Number of columns currently pinned to the left of the view.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }

    // Getter methods for template creation
    /// Structured filter statements currently applied to the frame.
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }

    /// Columns the frame is currently sorted by (empty when unsorted).
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }

    /// Whether the active sort (or default row order) is ascending.
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }

    /// Current display order of columns.
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }

    /// The active structured query string (empty when none).
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }

    /// The active SQL query string (empty when none).
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }

    /// The active fuzzy-search query string (empty when none).
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }

    /// Spec of the most recent pivot, if the current frame came from one.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }

    /// Spec of the most recent melt, if the current frame came from one.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }
2579
2580    pub fn is_grouped(&self) -> bool {
2581        self.schema
2582            .iter()
2583            .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
2584    }
2585
2586    pub fn group_key_columns(&self) -> Vec<String> {
2587        self.schema
2588            .iter()
2589            .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
2590            .map(|(name, _)| name.to_string())
2591            .collect()
2592    }
2593
2594    pub fn group_value_columns(&self) -> Vec<String> {
2595        self.schema
2596            .iter()
2597            .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
2598            .map(|(name, _)| name.to_string())
2599            .collect()
2600    }
2601
2602    /// Estimated heap size in bytes of the currently buffered slice (locked + scrollable), if collected.
2603    pub fn buffered_memory_bytes(&self) -> Option<usize> {
2604        let locked = self
2605            .locked_df
2606            .as_ref()
2607            .map(|df| df.estimated_size())
2608            .unwrap_or(0);
2609        let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
2610        if locked == 0 && scroll == 0 {
2611            None
2612        } else {
2613            Some(locked + scroll)
2614        }
2615    }
2616
    /// Number of rows currently in the buffer. 0 if no buffer loaded.
    pub fn buffered_rows(&self) -> usize {
        self.buffered_end_row
            .saturating_sub(self.buffered_start_row)
    }

    /// Maximum buffer size in rows (0 = no limit).
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }

    /// Maximum buffer size in MiB (0 = no limit).
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
2632
    /// Replace the current grouped frame with the expanded contents of one
    /// group row: key columns are broadcast to constant series of the group's
    /// length, and each `List` value column is unnested into its element series.
    ///
    /// The grouped frame is stashed in `grouped_lf` so `drill_up` can restore it.
    /// No-op (Ok) when the frame is not grouped; errors when `group_index` is
    /// out of bounds.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        if !self.is_grouped() {
            return Ok(());
        }

        // Remember the grouped view so drill_up() can return to it.
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // Record the group key (as display strings) for the analysis context.
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // The first value column's list length defines the row count of the
        // drilled-down frame (0 if the cell is unexpectedly not a list).
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Broadcast each key value into a constant column of `row_count` rows,
        // preserving the native type where a direct match exists and falling
        // back to the string rendering otherwise.
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // Unnest each value column's list cell into a plain series.
        // Non-list cells are silently skipped.
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Swap in the drilled-down frame and reset view state.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
2740
2741    pub fn drill_up(&mut self) -> Result<()> {
2742        if let Some(grouped_lf) = self.grouped_lf.take() {
2743            self.invalidate_num_rows();
2744            self.lf = grouped_lf;
2745            self.schema = self.lf.clone().collect_schema()?;
2746            self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
2747            self.drilled_down_group_index = None;
2748            self.drilled_down_group_key = None;
2749            self.drilled_down_group_key_columns = None;
2750            self.start_row = 0;
2751            self.termcol_index = 0;
2752            self.locked_columns_count = 0;
2753            self.table_state.select(Some(0));
2754            self.collect();
2755            Ok(())
2756        } else {
2757            Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
2758        }
2759    }
2760
2761    pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
2762        Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
2763    }
2764
    /// Snapshot of the query/filter/drill-down state, passed alongside
    /// analysis results so they can be labeled with their provenance.
    pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
        crate::statistics::AnalysisContext {
            has_query: !self.active_query.is_empty(),
            query: self.active_query.clone(),
            has_filters: !self.filters.is_empty(),
            filter_count: self.filters.len(),
            is_drilled_down: self.is_drilled_down(),
            group_key: self.drilled_down_group_key.clone(),
            group_columns: self.drilled_down_group_key_columns.clone(),
        }
    }
2776
2777    /// Pivot the current `LazyFrame` (long → wide). Never uses `original_lf`.
2778    /// Collects current `lf`, runs `pivot_stable`, then replaces `lf` with result.
2779    /// We use pivot_stable for all aggregation types: Polars' non-stable pivot() prints
2780    /// "unstable pivot not yet supported, using stable pivot" to stdout, which corrupts the TUI.
2781    pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
2782        let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
2783        let agg_expr = pivot_agg_expr(spec.aggregation)?;
2784        let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
2785        let index_opt = if index_str.is_empty() {
2786            None
2787        } else {
2788            Some(index_str)
2789        };
2790        let pivoted = pivot_stable(
2791            &df,
2792            [spec.pivot_column.as_str()],
2793            index_opt,
2794            Some([spec.value_column.as_str()]),
2795            spec.sort_columns,
2796            Some(agg_expr),
2797            None,
2798        )?;
2799        self.last_pivot_spec = Some(spec.clone());
2800        self.last_melt_spec = None;
2801        self.replace_lf_after_reshape(pivoted.lazy())?;
2802        Ok(())
2803    }
2804
2805    /// Melt the current `LazyFrame` (wide → long). Never uses `original_lf`.
2806    pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
2807        let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
2808        let index = cols(spec.index.iter().map(|s| s.as_str()));
2809        let args = UnpivotArgsDSL {
2810            on,
2811            index,
2812            variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
2813            value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
2814        };
2815        let lf = self.lf.clone().unpivot(args);
2816        self.last_melt_spec = Some(spec.clone());
2817        self.last_pivot_spec = None;
2818        self.replace_lf_after_reshape(lf)?;
2819        Ok(())
2820    }
2821
    /// Install a freshly reshaped `LazyFrame` (pivot/melt result) and reset
    /// every piece of view state that referred to the old frame: queries,
    /// filters, sorts, cached frames, drill-down state, scrolling, locking,
    /// and the row buffer. Finishes with a re-collect.
    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
        self.invalidate_num_rows();
        self.lf = lf;
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Queries/filters/sorts targeted the pre-reshape columns; drop them.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.error = None;
        // Cached display frames and drill-down state no longer apply.
        self.df = None;
        self.locked_df = None;
        self.grouped_lf = None;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        // Reset scrolling, column locking, and the row buffer, then reload.
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.collect();
        Ok(())
    }
2849
    /// True while viewing the expanded contents of a single group
    /// (i.e. after `drill_down_into_group` and before `drill_up`).
    pub fn is_drilled_down(&self) -> bool {
        self.drilled_down_group_index.is_some()
    }
2853
    /// Rebuild `lf` by applying the stored filters and sort settings on top of
    /// the current frame, then re-collect the view.
    ///
    /// Filters are folded left-to-right into one predicate, each statement's
    /// `logical_op` combining it with the accumulated expression. Filter
    /// values are parsed to match the column's dtype where possible, falling
    /// back to a string literal when parsing fails.
    fn apply_transformations(&mut self) {
        let mut lf = self.lf.clone();
        let mut final_expr: Option<Expr> = None;

        for filter in &self.filters {
            let col_expr = col(&filter.column);
            // Type the literal from the schema so numeric/bool comparisons
            // compare values, not strings. Unknown columns compare as strings.
            let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
                match dtype {
                    DataType::Float32 | DataType::Float64 => filter
                        .value
                        .parse::<f64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
                        .value
                        .parse::<i64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                        filter
                            .value
                            .parse::<u64>()
                            .map(lit)
                            .unwrap_or_else(|_| lit(filter.value.as_str()))
                    }
                    DataType::Boolean => filter
                        .value
                        .parse::<bool>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    _ => lit(filter.value.as_str()),
                }
            } else {
                lit(filter.value.as_str())
            };

            // NOTE(review): Contains/NotContains use `.str()` and so assume a
            // string column — confirm behavior on non-string columns.
            let op_expr = match filter.operator {
                FilterOperator::Eq => col_expr.eq(val_lit),
                FilterOperator::NotEq => col_expr.neq(val_lit),
                FilterOperator::Gt => col_expr.gt(val_lit),
                FilterOperator::Lt => col_expr.lt(val_lit),
                FilterOperator::GtEq => col_expr.gt_eq(val_lit),
                FilterOperator::LtEq => col_expr.lt_eq(val_lit),
                FilterOperator::Contains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val))
                }
                FilterOperator::NotContains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val)).not()
                }
            };

            // Fold this statement into the accumulated predicate.
            if let Some(current) = final_expr {
                final_expr = Some(match filter.logical_op {
                    LogicalOperator::And => current.and(op_expr),
                    LogicalOperator::Or => current.or(op_expr),
                });
            } else {
                final_expr = Some(op_expr);
            }
        }

        if let Some(e) = final_expr {
            lf = lf.filter(e);
        }

        // Explicit sort columns win; otherwise a descending default order is
        // approximated by reversing the frame.
        if !self.sort_columns.is_empty() {
            let options = SortMultipleOptions {
                descending: self
                    .sort_columns
                    .iter()
                    .map(|_| !self.sort_ascending)
                    .collect(),
                ..Default::default()
            };
            lf = lf.sort_by_exprs(
                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
                options,
            );
        } else if !self.sort_ascending {
            lf = lf.reverse();
        }

        self.invalidate_num_rows();
        self.lf = lf;
        self.collect();
    }
2942
2943    pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
2944        self.sort_columns = columns;
2945        self.sort_ascending = ascending;
2946        self.buffered_start_row = 0;
2947        self.buffered_end_row = 0;
2948        self.buffered_df = None;
2949        self.apply_transformations();
2950    }
2951
2952    pub fn reverse(&mut self) {
2953        self.sort_ascending = !self.sort_ascending;
2954
2955        self.buffered_start_row = 0;
2956        self.buffered_end_row = 0;
2957        self.buffered_df = None;
2958
2959        if !self.sort_columns.is_empty() {
2960            let options = SortMultipleOptions {
2961                descending: self
2962                    .sort_columns
2963                    .iter()
2964                    .map(|_| !self.sort_ascending)
2965                    .collect(),
2966                ..Default::default()
2967            };
2968            self.invalidate_num_rows();
2969            self.lf = self.lf.clone().sort_by_exprs(
2970                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
2971                options,
2972            );
2973            self.collect();
2974        } else {
2975            self.invalidate_num_rows();
2976            self.lf = self.lf.clone().reverse();
2977            self.collect();
2978        }
2979    }
2980
2981    pub fn filter(&mut self, filters: Vec<FilterStatement>) {
2982        self.filters = filters;
2983        self.buffered_start_row = 0;
2984        self.buffered_end_row = 0;
2985        self.buffered_df = None;
2986        self.apply_transformations();
2987    }
2988
2989    pub fn query(&mut self, query: String) {
2990        self.error = None;
2991
2992        let trimmed_query = query.trim();
2993        if trimmed_query.is_empty() {
2994            self.reset_lf_to_original();
2995            self.collect();
2996            return;
2997        }
2998
2999        match parse_query(&query) {
3000            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
3001                let mut lf = self.original_lf.clone();
3002
3003                // Apply filter first (where clause)
3004                if let Some(f) = filter {
3005                    lf = lf.filter(f);
3006                }
3007
3008                if !group_by_cols.is_empty() {
3009                    if !cols.is_empty() {
3010                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
3011                        lf = lf.sort_by_exprs(group_by_cols.clone(), Default::default());
3012                    } else {
3013                        let schema = match lf.clone().collect_schema() {
3014                            Ok(s) => s,
3015                            Err(e) => {
3016                                self.error = Some(e);
3017                                return; // Don't modify state on error
3018                            }
3019                        };
3020                        let all_columns: Vec<String> =
3021                            schema.iter_names().map(|s| s.to_string()).collect();
3022
3023                        // In Polars, when you group_by and aggregate columns without explicit aggregation functions,
3024                        // Polars automatically collects the values as lists. We need to aggregate all columns
3025                        // except the group columns to avoid duplicates.
3026                        let mut agg_exprs = Vec::new();
3027                        for col_name in &all_columns {
3028                            if !group_by_col_names.contains(col_name) {
3029                                agg_exprs.push(col(col_name));
3030                            }
3031                        }
3032
3033                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
3034                        lf = lf.sort_by_exprs(group_by_cols.clone(), Default::default());
3035                    }
3036                } else if !cols.is_empty() {
3037                    lf = lf.select(cols);
3038                }
3039
3040                let schema = match lf.collect_schema() {
3041                    Ok(schema) => schema,
3042                    Err(e) => {
3043                        self.error = Some(e);
3044                        return;
3045                    }
3046                };
3047
3048                self.schema = schema;
3049                self.invalidate_num_rows();
3050                self.lf = lf;
3051                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3052
3053                // Lock grouped columns if by clause was used
3054                // Only lock the columns specified in the 'by' clause, not the value columns
3055                if !group_by_col_names.is_empty() {
3056                    // Group columns appear first in Polars results, so count consecutive
3057                    // columns from the start that are in group_by_col_names
3058                    let mut locked_count = 0;
3059                    for col_name in &self.column_order {
3060                        if group_by_col_names.contains(col_name) {
3061                            locked_count += 1;
3062                        } else {
3063                            // Once we hit a non-group column, we've passed all group columns
3064                            break;
3065                        }
3066                    }
3067                    self.locked_columns_count = locked_count;
3068                } else {
3069                    self.locked_columns_count = 0;
3070                }
3071
3072                // Clear filters when using query
3073                self.filters.clear();
3074                self.sort_columns.clear();
3075                self.sort_ascending = true;
3076                self.start_row = 0;
3077                self.termcol_index = 0;
3078                self.active_query = query;
3079                self.buffered_start_row = 0;
3080                self.buffered_end_row = 0;
3081                self.buffered_df = None;
3082                // Reset drill-down state when applying new query
3083                self.drilled_down_group_index = None;
3084                self.drilled_down_group_key = None;
3085                self.drilled_down_group_key_columns = None;
3086                self.grouped_lf = None;
3087                // Reset table state selection
3088                self.table_state.select(Some(0));
3089                // Collect will clamp start_row to valid range, but we want to ensure it's 0
3090                // So we set it to 0, collect (which may clamp it), then ensure it's 0 again
3091                self.collect();
3092                // After collect(), ensure we're at the top (collect() may have clamped if num_rows was wrong)
3093                // But if num_rows > 0, we want start_row = 0 to show the first row
3094                if self.num_rows > 0 {
3095                    self.start_row = 0;
3096                }
3097            }
3098            Err(e) => {
3099                // Parse errors are already user-facing strings; store as ComputeError
3100                self.error = Some(PolarsError::ComputeError(e.into()));
3101            }
3102        }
3103    }
3104
3105    /// Execute a SQL query against the current LazyFrame (registered as table "df").
3106    /// Empty SQL resets to original state. Does not call collect(); the event loop does that via AppEvent::Collect.
3107    pub fn sql_query(&mut self, sql: String) {
3108        self.error = None;
3109        let trimmed = sql.trim();
3110        if trimmed.is_empty() {
3111            self.reset_lf_to_original();
3112            return;
3113        }
3114
3115        #[cfg(feature = "sql")]
3116        {
3117            use polars_sql::SQLContext;
3118            let mut ctx = SQLContext::new();
3119            ctx.register("df", self.lf.clone());
3120            match ctx.execute(trimmed) {
3121                Ok(result_lf) => {
3122                    let schema = match result_lf.clone().collect_schema() {
3123                        Ok(s) => s,
3124                        Err(e) => {
3125                            self.error = Some(e);
3126                            return;
3127                        }
3128                    };
3129                    self.schema = schema;
3130                    self.invalidate_num_rows();
3131                    self.lf = result_lf;
3132                    self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3133                    self.active_sql_query = sql;
3134                    self.locked_columns_count = 0;
3135                    self.filters.clear();
3136                    self.sort_columns.clear();
3137                    self.sort_ascending = true;
3138                    self.start_row = 0;
3139                    self.termcol_index = 0;
3140                    self.drilled_down_group_index = None;
3141                    self.drilled_down_group_key = None;
3142                    self.drilled_down_group_key_columns = None;
3143                    self.grouped_lf = None;
3144                    self.buffered_start_row = 0;
3145                    self.buffered_end_row = 0;
3146                    self.buffered_df = None;
3147                    self.table_state.select(Some(0));
3148                }
3149                Err(e) => {
3150                    self.error = Some(e);
3151                }
3152            }
3153        }
3154
3155        #[cfg(not(feature = "sql"))]
3156        {
3157            self.error = Some(PolarsError::ComputeError(
3158                format!("SQL support not compiled in (build with --features sql)").into(),
3159            ));
3160        }
3161    }
3162
3163    /// Fuzzy search: filter rows where any string column matches the query.
3164    /// Query is split on whitespace; each token must match (in order, case-insensitive) in some string column.
3165    /// Empty query resets to original_lf.
3166    pub fn fuzzy_search(&mut self, query: String) {
3167        self.error = None;
3168        let trimmed = query.trim();
3169        if trimmed.is_empty() {
3170            self.reset_lf_to_original();
3171            self.collect();
3172            return;
3173        }
3174        let string_cols: Vec<String> = self
3175            .schema
3176            .iter()
3177            .filter(|(_, dtype)| dtype.is_string())
3178            .map(|(name, _)| name.to_string())
3179            .collect();
3180        if string_cols.is_empty() {
3181            self.error = Some(PolarsError::ComputeError(
3182                "Fuzzy search requires at least one string column".into(),
3183            ));
3184            return;
3185        }
3186        let tokens: Vec<&str> = trimmed
3187            .split_whitespace()
3188            .filter(|s| !s.is_empty())
3189            .collect();
3190        let token_exprs: Vec<Expr> = tokens
3191            .iter()
3192            .map(|token| {
3193                let pattern = fuzzy_token_regex(token);
3194                string_cols
3195                    .iter()
3196                    .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
3197                    .reduce(|a, b| a.or(b))
3198                    .unwrap()
3199            })
3200            .collect();
3201        let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
3202        self.lf = self.original_lf.clone().filter(combined);
3203        self.filters.clear();
3204        self.sort_columns.clear();
3205        self.active_query.clear();
3206        self.active_sql_query.clear();
3207        self.active_fuzzy_query = query;
3208        // Reset view and buffer so collect() runs on the new lf
3209        self.locked_columns_count = 0;
3210        self.start_row = 0;
3211        self.termcol_index = 0;
3212        self.drilled_down_group_index = None;
3213        self.drilled_down_group_key = None;
3214        self.drilled_down_group_key_columns = None;
3215        self.grouped_lf = None;
3216        self.buffered_start_row = 0;
3217        self.buffered_end_row = 0;
3218        self.buffered_df = None;
3219        self.table_state.select(Some(0));
3220        self.invalidate_num_rows();
3221        self.collect();
3222    }
3223}
3224
3225/// Case-insensitive regex for one token: chars in order with `.*` between.
3226pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3227    let inner: String =
3228        token
3229            .chars()
3230            .map(|c| regex::escape(&c.to_string()))
3231            .fold(String::new(), |mut s, e| {
3232                if !s.is_empty() {
3233                    s.push_str(".*");
3234                }
3235                s.push_str(&e);
3236                s
3237            });
3238    format!("(?i).*{}.*", inner)
3239}
3240
/// Styling configuration for the data-table widget. Rendering state lives in
/// `DataTableState`; this struct only carries colors and spacing and is
/// consumed by the `StatefulWidget` implementation below.
pub struct DataTable {
    /// Background color of the header row.
    pub header_bg: Color,
    /// Foreground color of the header row.
    pub header_fg: Color,
    /// Foreground color of the row-number gutter.
    pub row_numbers_fg: Color,
    /// Color of the vertical separator drawn after locked columns.
    pub separator_fg: Color,
    /// Spacing (in terminal cells) between columns.
    pub table_cell_padding: u16,
    /// Optional background for odd rows (zebra striping); `None` disables it.
    pub alternate_row_bg: Option<Color>,
    /// When true, colorize cells by column type using the optional colors below.
    pub column_colors: bool,
    /// Color for String columns (only used when `column_colors` is true).
    pub str_col: Option<Color>,
    /// Color for integer columns (only used when `column_colors` is true).
    pub int_col: Option<Color>,
    /// Color for float columns (only used when `column_colors` is true).
    pub float_col: Option<Color>,
    /// Color for boolean columns (only used when `column_colors` is true).
    pub bool_col: Option<Color>,
    /// Color for date/datetime/time/duration columns (only used when `column_colors` is true).
    pub temporal_col: Option<Color>,
}
3256
3257impl Default for DataTable {
3258    fn default() -> Self {
3259        Self {
3260            header_bg: Color::Indexed(236),
3261            header_fg: Color::White,
3262            row_numbers_fg: Color::DarkGray,
3263            separator_fg: Color::White,
3264            table_cell_padding: 1,
3265            alternate_row_bg: None,
3266            column_colors: false,
3267            str_col: None,
3268            int_col: None,
3269            float_col: None,
3270            bool_col: None,
3271            temporal_col: None,
3272        }
3273    }
3274}
3275
/// Parameters for rendering the row numbers column.
struct RowNumbersParams {
    /// First data row currently scrolled into view (0-based).
    start_row: usize,
    /// Number of rows the viewport can display.
    visible_rows: usize,
    /// Total number of rows in the data set (caps how many numbers are drawn).
    num_rows: usize,
    /// Offset added to each displayed number (e.g. 1 for 1-based numbering).
    row_start_index: usize,
    /// Viewport-relative index of the selected row, if any.
    selected_row: Option<usize>,
}
3284
impl DataTable {
    /// Create a renderer with the default styling.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder: set the header, row-number, and locked-column-separator colors.
    pub fn with_colors(
        mut self,
        header_bg: Color,
        header_fg: Color,
        row_numbers_fg: Color,
        separator_fg: Color,
    ) -> Self {
        self.header_bg = header_bg;
        self.header_fg = header_fg;
        self.row_numbers_fg = row_numbers_fg;
        self.separator_fg = separator_fg;
        self
    }

    /// Builder: set the spacing (in terminal cells) between columns.
    pub fn with_cell_padding(mut self, padding: u16) -> Self {
        self.table_cell_padding = padding;
        self
    }

    /// Builder: set (or disable with `None`) the background used on odd rows.
    pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
        self.alternate_row_bg = color;
        self
    }

    /// Enable column-type coloring and set colors for string, int, float, bool, and temporal columns.
    pub fn with_column_type_colors(
        mut self,
        str_col: Color,
        int_col: Color,
        float_col: Color,
        bool_col: Color,
        temporal_col: Color,
    ) -> Self {
        self.column_colors = true;
        self.str_col = Some(str_col);
        self.int_col = Some(int_col);
        self.float_col = Some(float_col);
        self.bool_col = Some(bool_col);
        self.temporal_col = Some(temporal_col);
        self
    }

    /// Return the color for a column dtype when column_colors is enabled.
    /// Returns `None` when coloring is disabled or the dtype has no configured
    /// color (e.g. nested/other types fall through the catch-all arm).
    fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
        if !self.column_colors {
            return None;
        }
        match dtype {
            DataType::String => self.str_col,
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64 => self.int_col,
            DataType::Float32 | DataType::Float64 => self.float_col,
            DataType::Boolean => self.bool_col,
            DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
                self.temporal_col
            }
            _ => None,
        }
    }

    /// Render `df` into `area`. Columns are sized to their visible content;
    /// columns that don't fit are dropped, except that a String column which
    /// only partially fits is shown truncated as the last visible column.
    ///
    /// `_row_numbers` and `_start_row_offset` are currently unused here
    /// (row numbers are drawn separately by `render_row_numbers`).
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        // make each column as wide as it needs to be to fit the content
        let (height, cols) = df.shape();

        // widths starts at the length of each column name
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        // rows is a vector initialized to a vector of length "height" empty rows
        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // One line of the area is reserved for the header row.
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            // Build this column's cells while tracking the widest rendered value.
            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                // Nulls render as empty cells rather than a "null" placeholder.
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            // Use > not >= so the last column is shown when it fits exactly (no padding needed after it)
            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // Overflowing String column: show it truncated in the leftover
                // width and stop — it becomes the last visible column.
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                // Non-String column that doesn't fit: drop it and everything after.
                break;
            }
        }

        widths.truncate(visible_columns);
        // convert rows to a vector of Row, with optional alternate row background
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // With header_bg == Reset, skip the bg so the terminal default shows through.
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }

    /// Render the row-number gutter: a header-colored top line, then
    /// right-aligned row numbers mirroring the main table's zebra striping
    /// and selection styling.
    fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
        // Header row: same style as the rest of the column headers (fill full width so color matches)
        let header_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let header_fill = " ".repeat(area.width as usize);
        Paragraph::new(header_fill).style(header_style).render(
            Rect {
                x: area.x,
                y: area.y,
                width: area.width,
                height: 1,
            },
            buf,
        );

        // Only render up to the actual number of rows in the data
        let rows_to_render = params
            .visible_rows
            .min(params.num_rows.saturating_sub(params.start_row));

        if rows_to_render == 0 {
            return;
        }

        // Calculate width needed for largest row number
        let max_row_num =
            params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
        let max_width = max_row_num.to_string().len();

        // Render row numbers
        for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
            let row_num = params.start_row + row_idx + params.row_start_index;
            let row_num_text = row_num.to_string();

            // Right-align row numbers within the available width
            let padding = max_width.saturating_sub(row_num_text.len());
            let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);

            // Match main table background: default when row is even (or no alternate);
            // when alternate_row_bg is set, odd rows use that background.
            // When selected: same background as row (no inversion), foreground = terminal default.
            let is_selected = params.selected_row == Some(row_idx);
            let (fg, bg) = if is_selected {
                (
                    Color::Reset,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            } else {
                (
                    self.row_numbers_fg,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            };
            let row_num_style = match bg {
                Some(bg_color) => Style::default().fg(fg).bg(bg_color),
                None => Style::default().fg(fg),
            };

            let y = area.y + row_idx as u16 + 1; // +1 for header row
            // Guard against drawing past the bottom of the allotted area.
            if y < area.y + area.height {
                Paragraph::new(padded_text).style(row_num_style).render(
                    Rect {
                        x: area.x,
                        y,
                        width: area.width,
                        height: 1,
                    },
                    buf,
                );
            }
        }
    }
}
3545
3546impl StatefulWidget for DataTable {
3547    type State = DataTableState;
3548
3549    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
3550        state.visible_termcols = area.width as usize;
3551        let new_visible_rows = if area.height > 0 {
3552            (area.height - 1) as usize
3553        } else {
3554            0
3555        };
3556        let needs_collect = new_visible_rows != state.visible_rows;
3557        state.visible_rows = new_visible_rows;
3558
3559        if let Some(selected) = state.table_state.selected() {
3560            if selected >= state.visible_rows {
3561                state.table_state.select(Some(state.visible_rows - 1))
3562            }
3563        }
3564
3565        if needs_collect {
3566            state.collect();
3567        }
3568
3569        // Only show errors in main view if not suppressed (e.g., when query input is active)
3570        // Query errors should only be shown in the query input frame
3571        if let Some(error) = state.error.as_ref() {
3572            if !state.suppress_error_display {
3573                Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
3574                    .centered()
3575                    .block(
3576                        Block::default()
3577                            .borders(Borders::NONE)
3578                            .padding(Padding::top(area.height / 2)),
3579                    )
3580                    .wrap(ratatui::widgets::Wrap { trim: true })
3581                    .render(area, buf);
3582                return;
3583            }
3584            // If suppress_error_display is true, continue rendering the table normally
3585        }
3586
3587        // Calculate row number column width if enabled
3588        let row_num_width = if state.row_numbers {
3589            let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; // +1 for 1-based, +1 for potential
3590            max_row_num.to_string().len().max(1) as u16 + 1 // +1 for spacing
3591        } else {
3592            0
3593        };
3594
3595        // Calculate locked columns width if any
3596        let mut locked_width = row_num_width;
3597        if let Some(locked_df) = state.locked_df.as_ref() {
3598            let (_, cols) = locked_df.shape();
3599            for col_index in 0..cols {
3600                let col_name = locked_df.get_column_names()[col_index];
3601                let mut max_len = col_name.chars().count() as u16;
3602                let col_data = &locked_df[col_index];
3603                for row_index in 0..locked_df.height().min(state.visible_rows) {
3604                    let value = col_data.get(row_index).unwrap();
3605                    let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
3606                        Cow::Borrowed("")
3607                    } else {
3608                        value.str_value()
3609                    };
3610                    let len = val_str.chars().count() as u16;
3611                    max_len = max_len.max(len);
3612                }
3613                locked_width += max_len + 1;
3614            }
3615        }
3616
3617        // Split area into locked and scrollable parts
3618        if locked_width > row_num_width && locked_width < area.width {
3619            let locked_area = Rect {
3620                x: area.x,
3621                y: area.y,
3622                width: locked_width,
3623                height: area.height,
3624            };
3625            let separator_x = locked_area.x + locked_area.width;
3626
3627            // If row numbers are enabled, render them first in a separate area
3628            if state.row_numbers {
3629                let row_num_area = Rect {
3630                    x: area.x,
3631                    y: area.y,
3632                    width: row_num_width,
3633                    height: area.height,
3634                };
3635                self.render_row_numbers(
3636                    row_num_area,
3637                    buf,
3638                    RowNumbersParams {
3639                        start_row: state.start_row,
3640                        visible_rows: state.visible_rows,
3641                        num_rows: state.num_rows,
3642                        row_start_index: state.row_start_index,
3643                        selected_row: state.table_state.selected(),
3644                    },
3645                );
3646            }
3647            let scrollable_area = Rect {
3648                x: separator_x + 1,
3649                y: area.y,
3650                width: area.width.saturating_sub(locked_width + 1),
3651                height: area.height,
3652            };
3653
3654            // Render locked columns (no background shading, just the vertical separator)
3655            if let Some(locked_df) = state.locked_df.as_ref() {
3656                // Adjust locked_area to account for row numbers if present
3657                let adjusted_locked_area = if state.row_numbers {
3658                    Rect {
3659                        x: area.x + row_num_width,
3660                        y: area.y,
3661                        width: locked_width - row_num_width,
3662                        height: area.height,
3663                    }
3664                } else {
3665                    locked_area
3666                };
3667
3668                // Slice buffer to visible portion
3669                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3670                let slice_len = state
3671                    .visible_rows
3672                    .min(locked_df.height().saturating_sub(offset));
3673                if offset < locked_df.height() && slice_len > 0 {
3674                    let sliced_df = locked_df.slice(offset as i64, slice_len);
3675                    self.render_dataframe(
3676                        &sliced_df,
3677                        adjusted_locked_area,
3678                        buf,
3679                        &mut state.table_state,
3680                        false,
3681                        state.start_row,
3682                    );
3683                }
3684            }
3685
3686            // Draw vertical separator line
3687            let separator_x_adjusted = if state.row_numbers {
3688                area.x + row_num_width + (locked_width - row_num_width)
3689            } else {
3690                separator_x
3691            };
3692            for y in area.y..area.y + area.height {
3693                let cell = &mut buf[(separator_x_adjusted, y)];
3694                cell.set_char('│');
3695                cell.set_style(Style::default().fg(self.separator_fg));
3696            }
3697
3698            // Adjust scrollable area to account for row numbers
3699            let adjusted_scrollable_area = if state.row_numbers {
3700                Rect {
3701                    x: separator_x_adjusted + 1,
3702                    y: area.y,
3703                    width: area.width.saturating_sub(locked_width + 1),
3704                    height: area.height,
3705                }
3706            } else {
3707                scrollable_area
3708            };
3709
3710            // Render scrollable columns
3711            if let Some(df) = state.df.as_ref() {
3712                // Slice buffer to visible portion
3713                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3714                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3715                if offset < df.height() && slice_len > 0 {
3716                    let sliced_df = df.slice(offset as i64, slice_len);
3717                    self.render_dataframe(
3718                        &sliced_df,
3719                        adjusted_scrollable_area,
3720                        buf,
3721                        &mut state.table_state,
3722                        false,
3723                        state.start_row,
3724                    );
3725                }
3726            }
3727        } else if let Some(df) = state.df.as_ref() {
3728            // No locked columns, render normally
3729            // If row numbers are enabled, render them first
3730            if state.row_numbers {
3731                let row_num_area = Rect {
3732                    x: area.x,
3733                    y: area.y,
3734                    width: row_num_width,
3735                    height: area.height,
3736                };
3737                self.render_row_numbers(
3738                    row_num_area,
3739                    buf,
3740                    RowNumbersParams {
3741                        start_row: state.start_row,
3742                        visible_rows: state.visible_rows,
3743                        num_rows: state.num_rows,
3744                        row_start_index: state.row_start_index,
3745                        selected_row: state.table_state.selected(),
3746                    },
3747                );
3748
3749                // Adjust data area to exclude row number column
3750                let data_area = Rect {
3751                    x: area.x + row_num_width,
3752                    y: area.y,
3753                    width: area.width.saturating_sub(row_num_width),
3754                    height: area.height,
3755                };
3756
3757                // Slice buffer to visible portion
3758                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3759                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3760                if offset < df.height() && slice_len > 0 {
3761                    let sliced_df = df.slice(offset as i64, slice_len);
3762                    self.render_dataframe(
3763                        &sliced_df,
3764                        data_area,
3765                        buf,
3766                        &mut state.table_state,
3767                        false,
3768                        state.start_row,
3769                    );
3770                }
3771            } else {
3772                // Slice buffer to visible portion
3773                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3774                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3775                if offset < df.height() && slice_len > 0 {
3776                    let sliced_df = df.slice(offset as i64, slice_len);
3777                    self.render_dataframe(
3778                        &sliced_df,
3779                        area,
3780                        buf,
3781                        &mut state.table_state,
3782                        false,
3783                        state.start_row,
3784                    );
3785                }
3786            }
3787        } else if !state.column_order.is_empty() {
3788            // Empty result (0 rows) but we have a schema - show empty table with header, no rows
3789            let empty_columns: Vec<_> = state
3790                .column_order
3791                .iter()
3792                .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
3793                .collect();
3794            if let Ok(empty_df) = DataFrame::new(empty_columns) {
3795                if state.row_numbers {
3796                    let row_num_area = Rect {
3797                        x: area.x,
3798                        y: area.y,
3799                        width: row_num_width,
3800                        height: area.height,
3801                    };
3802                    self.render_row_numbers(
3803                        row_num_area,
3804                        buf,
3805                        RowNumbersParams {
3806                            start_row: 0,
3807                            visible_rows: state.visible_rows,
3808                            num_rows: 0,
3809                            row_start_index: state.row_start_index,
3810                            selected_row: None,
3811                        },
3812                    );
3813                    let data_area = Rect {
3814                        x: area.x + row_num_width,
3815                        y: area.y,
3816                        width: area.width.saturating_sub(row_num_width),
3817                        height: area.height,
3818                    };
3819                    self.render_dataframe(
3820                        &empty_df,
3821                        data_area,
3822                        buf,
3823                        &mut state.table_state,
3824                        false,
3825                        0,
3826                    );
3827                } else {
3828                    self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
3829                }
3830            } else {
3831                Paragraph::new("No data").render(area, buf);
3832            }
3833        } else {
3834            // Truly empty: no schema, not loaded, or blank file
3835            Paragraph::new("No data").render(area, buf);
3836        }
3837    }
3838}
3839
3840#[cfg(test)]
3841mod tests {
3842    use super::*;
3843    use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
3844    use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
3845
3846    fn create_test_lf() -> LazyFrame {
3847        df! (
3848            "a" => &[1, 2, 3],
3849            "b" => &["x", "y", "z"]
3850        )
3851        .unwrap()
3852        .lazy()
3853    }
3854
3855    fn create_large_test_lf() -> LazyFrame {
3856        df! (
3857            "a" => (0..100).collect::<Vec<i32>>(),
3858            "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
3859            "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
3860            "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
3861        )
3862        .unwrap()
3863        .lazy()
3864    }
3865
3866    #[test]
3867    fn test_from_csv() {
3868        // Ensure sample data is generated before running test
3869        // Test uncompressed CSV loading
3870        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
3871        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
3872        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
3873    }
3874
3875    #[test]
3876    fn test_from_csv_gzipped() {
3877        // Ensure sample data is generated before running test
3878        // Test gzipped CSV loading
3879        let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
3880        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
3881        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
3882    }
3883
3884    #[test]
3885    fn test_from_parquet() {
3886        // Ensure sample data is generated before running test
3887        let path = crate::tests::sample_data_dir().join("people.parquet");
3888        let state = DataTableState::from_parquet(&path, None, None, None, None, false, 1).unwrap();
3889        assert!(!state.schema.is_empty());
3890    }
3891
3892    #[test]
3893    fn test_from_ipc() {
3894        use polars::prelude::IpcWriter;
3895        use std::io::BufWriter;
3896        let mut df = df!(
3897            "x" => &[1_i32, 2, 3],
3898            "y" => &["a", "b", "c"]
3899        )
3900        .unwrap();
3901        let dir = std::env::temp_dir();
3902        let path = dir.join("datui_test_ipc.arrow");
3903        let file = std::fs::File::create(&path).unwrap();
3904        let mut writer = BufWriter::new(file);
3905        IpcWriter::new(&mut writer).finish(&mut df).unwrap();
3906        drop(writer);
3907        let state = DataTableState::from_ipc(&path, None, None, None, None, false, 1).unwrap();
3908        assert_eq!(state.schema.len(), 2);
3909        assert!(state.schema.contains("x"));
3910        assert!(state.schema.contains("y"));
3911        let _ = std::fs::remove_file(&path);
3912    }
3913
3914    #[test]
3915    fn test_from_avro() {
3916        use polars::io::avro::AvroWriter;
3917        use std::io::BufWriter;
3918        let mut df = df!(
3919            "id" => &[1_i32, 2, 3],
3920            "name" => &["alice", "bob", "carol"]
3921        )
3922        .unwrap();
3923        let dir = std::env::temp_dir();
3924        let path = dir.join("datui_test_avro.avro");
3925        let file = std::fs::File::create(&path).unwrap();
3926        let mut writer = BufWriter::new(file);
3927        AvroWriter::new(&mut writer).finish(&mut df).unwrap();
3928        drop(writer);
3929        let state = DataTableState::from_avro(&path, None, None, None, None, false, 1).unwrap();
3930        assert_eq!(state.schema.len(), 2);
3931        assert!(state.schema.contains("id"));
3932        assert!(state.schema.contains("name"));
3933        let _ = std::fs::remove_file(&path);
3934    }
3935
3936    #[test]
3937    fn test_from_orc() {
3938        use arrow::array::{Int64Array, StringArray};
3939        use arrow::datatypes::{DataType, Field, Schema};
3940        use arrow::record_batch::RecordBatch;
3941        use orc_rust::ArrowWriterBuilder;
3942        use std::io::BufWriter;
3943        use std::sync::Arc;
3944
3945        let schema = Arc::new(Schema::new(vec![
3946            Field::new("id", DataType::Int64, false),
3947            Field::new("name", DataType::Utf8, false),
3948        ]));
3949        let id_array = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
3950        let name_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
3951        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, name_array]).unwrap();
3952
3953        let dir = std::env::temp_dir();
3954        let path = dir.join("datui_test_orc.orc");
3955        let file = std::fs::File::create(&path).unwrap();
3956        let writer = BufWriter::new(file);
3957        let mut orc_writer = ArrowWriterBuilder::new(writer, schema).try_build().unwrap();
3958        orc_writer.write(&batch).unwrap();
3959        orc_writer.close().unwrap();
3960
3961        let state = DataTableState::from_orc(&path, None, None, None, None, false, 1).unwrap();
3962        assert_eq!(state.schema.len(), 2);
3963        assert!(state.schema.contains("id"));
3964        assert!(state.schema.contains("name"));
3965        let _ = std::fs::remove_file(&path);
3966    }
3967
3968    #[test]
3969    fn test_filter() {
3970        let lf = create_test_lf();
3971        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
3972        let filters = vec![FilterStatement {
3973            column: "a".to_string(),
3974            operator: FilterOperator::Gt,
3975            value: "2".to_string(),
3976            logical_op: LogicalOperator::And,
3977        }];
3978        state.filter(filters);
3979        let df = state.lf.clone().collect().unwrap();
3980        assert_eq!(df.shape().0, 1);
3981        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
3982    }
3983
3984    #[test]
3985    fn test_sort() {
3986        let lf = create_test_lf();
3987        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
3988        state.sort(vec!["a".to_string()], false);
3989        let df = state.lf.clone().collect().unwrap();
3990        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
3991    }
3992
3993    #[test]
3994    fn test_query() {
3995        let lf = create_test_lf();
3996        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
3997        state.query("select b where a = 2".to_string());
3998        let df = state.lf.clone().collect().unwrap();
3999        assert_eq!(df.shape(), (1, 1));
4000        assert_eq!(
4001            df.column("b").unwrap().get(0).unwrap(),
4002            AnyValue::String("y")
4003        );
4004    }
4005
    /// End-to-end coverage of query() accessors: date parts (.year/.month/.date),
    /// date literals (YYYY.MM.DD), string accessors (.upper/.len/.ends_with), and
    /// the empty-result path. Each query() call replaces the previous view, so
    /// the steps below are order-dependent.
    #[test]
    fn test_query_date_accessors() {
        use chrono::NaiveDate;
        let df = df!(
            "event_date" => [
                NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
                NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
                NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
            ],
            "name" => &["a", "b", "c"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();

        // Select with date accessors
        state.query("select name, year: event_date.year, month: event_date.month".to_string());
        assert!(
            state.error.is_none(),
            "query should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (3, 3));
        // .year yields Int32, .month yields Int8 (asserted exactly below).
        assert_eq!(
            df.column("year").unwrap().get(0).unwrap(),
            AnyValue::Int32(2024)
        );
        assert_eq!(
            df.column("month").unwrap().get(0).unwrap(),
            AnyValue::Int8(1)
        );
        assert_eq!(
            df.column("month").unwrap().get(1).unwrap(),
            AnyValue::Int8(6)
        );

        // Filter with date accessor
        state.query("select name, event_date where event_date.month = 12".to_string());
        assert!(
            state.error.is_none(),
            "filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1);
        assert_eq!(
            df.column("name").unwrap().get(0).unwrap(),
            AnyValue::String("c")
        );

        // Filter with YYYY.MM.DD date literal
        state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
        assert!(
            state.error.is_none(),
            "date literal filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(
            df.height(),
            2,
            "2024-06-20 and 2024-12-31 are after 2024-06-15"
        );

        // String accessors: upper, lower, len, ends_with
        state.query(
            "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
                .to_string(),
        );
        assert!(
            state.error.is_none(),
            "string accessors should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
        assert_eq!(
            df.column("upper_name").unwrap().get(0).unwrap(),
            AnyValue::String("C")
        );

        // Query that returns 0 rows: df and locked_df must be cleared for correct empty-table render
        state.query("select where event_date.date = 2020.01.01".to_string());
        assert!(state.error.is_none());
        assert_eq!(state.num_rows, 0);
        state.visible_rows = 10;
        state.collect();
        assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
        assert!(
            state.locked_df.is_none(),
            "locked_df must be cleared when num_rows is 0"
        );
    }
4100
4101    #[test]
4102    fn test_select_next_previous() {
4103        let lf = create_large_test_lf();
4104        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4105        state.visible_rows = 10;
4106        state.table_state.select(Some(5));
4107
4108        state.select_next();
4109        assert_eq!(state.table_state.selected(), Some(6));
4110
4111        state.select_previous();
4112        assert_eq!(state.table_state.selected(), Some(5));
4113    }
4114
4115    #[test]
4116    fn test_page_up_down() {
4117        let lf = create_large_test_lf();
4118        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4119        state.visible_rows = 20;
4120        state.collect();
4121
4122        assert_eq!(state.start_row, 0);
4123        state.page_down();
4124        assert_eq!(state.start_row, 20);
4125        state.page_down();
4126        assert_eq!(state.start_row, 40);
4127        state.page_up();
4128        assert_eq!(state.start_row, 20);
4129        state.page_up();
4130        assert_eq!(state.start_row, 0);
4131    }
4132
4133    #[test]
4134    fn test_scroll_left_right() {
4135        let lf = create_large_test_lf();
4136        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4137        assert_eq!(state.termcol_index, 0);
4138        state.scroll_right();
4139        assert_eq!(state.termcol_index, 1);
4140        state.scroll_right();
4141        assert_eq!(state.termcol_index, 2);
4142        state.scroll_left();
4143        assert_eq!(state.termcol_index, 1);
4144        state.scroll_left();
4145        assert_eq!(state.termcol_index, 0);
4146    }
4147
4148    #[test]
4149    fn test_reverse() {
4150        let lf = create_test_lf();
4151        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4152        state.sort(vec!["a".to_string()], true);
4153        assert_eq!(
4154            state
4155                .lf
4156                .clone()
4157                .collect()
4158                .unwrap()
4159                .column("a")
4160                .unwrap()
4161                .get(0)
4162                .unwrap(),
4163            AnyValue::Int32(1)
4164        );
4165        state.reverse();
4166        assert_eq!(
4167            state
4168                .lf
4169                .clone()
4170                .collect()
4171                .unwrap()
4172                .column("a")
4173                .unwrap()
4174                .get(0)
4175                .unwrap(),
4176            AnyValue::Int32(3)
4177        );
4178    }
4179
4180    #[test]
4181    fn test_filter_multiple() {
4182        let lf = create_large_test_lf();
4183        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4184        let filters = vec![
4185            FilterStatement {
4186                column: "c".to_string(),
4187                operator: FilterOperator::Eq,
4188                value: "1".to_string(),
4189                logical_op: LogicalOperator::And,
4190            },
4191            FilterStatement {
4192                column: "d".to_string(),
4193                operator: FilterOperator::Eq,
4194                value: "2".to_string(),
4195                logical_op: LogicalOperator::And,
4196            },
4197        ];
4198        state.filter(filters);
4199        let df = state.lf.clone().collect().unwrap();
4200        assert_eq!(df.shape().0, 7);
4201    }
4202
4203    #[test]
4204    fn test_filter_and_sort() {
4205        let lf = create_large_test_lf();
4206        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4207        let filters = vec![FilterStatement {
4208            column: "c".to_string(),
4209            operator: FilterOperator::Eq,
4210            value: "1".to_string(),
4211            logical_op: LogicalOperator::And,
4212        }];
4213        state.filter(filters);
4214        state.sort(vec!["a".to_string()], false);
4215        let df = state.lf.clone().collect().unwrap();
4216        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
4217    }
4218
4219    /// Minimal long-format data for pivot tests: id, date, key, value.
4220    /// Includes duplicates for aggregation (e.g. (1,d1,A) appears twice).
4221    fn create_pivot_long_lf() -> LazyFrame {
4222        let df = df!(
4223            "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
4224            "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
4225            "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
4226            "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
4227        )
4228        .unwrap();
4229        df.lazy()
4230    }
4231
4232    /// Wide-format data for melt tests: id, date, c1, c2, c3.
4233    fn create_melt_wide_lf() -> LazyFrame {
4234        let df = df!(
4235            "id" => &[1_i32, 2, 3],
4236            "date" => &["d1", "d2", "d3"],
4237            "c1" => &[10.0_f64, 20.0, 30.0],
4238            "c2" => &[11.0, 21.0, 31.0],
4239            "c3" => &[12.0, 22.0, 32.0],
4240        )
4241        .unwrap();
4242        df.lazy()
4243    }
4244
4245    #[test]
4246    fn test_pivot_basic() {
4247        let lf = create_pivot_long_lf();
4248        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4249        let spec = PivotSpec {
4250            index: vec!["id".to_string(), "date".to_string()],
4251            pivot_column: "key".to_string(),
4252            value_column: "value".to_string(),
4253            aggregation: PivotAggregation::Last,
4254            sort_columns: false,
4255        };
4256        state.pivot(&spec).unwrap();
4257        let df = state.lf.clone().collect().unwrap();
4258        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4259        assert!(names.contains(&"id"));
4260        assert!(names.contains(&"date"));
4261        assert!(names.contains(&"A"));
4262        assert!(names.contains(&"B"));
4263        assert!(names.contains(&"C"));
4264        assert_eq!(df.height(), 2);
4265    }
4266
4267    #[test]
4268    fn test_pivot_aggregation_last() {
4269        let lf = create_pivot_long_lf();
4270        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4271        let spec = PivotSpec {
4272            index: vec!["id".to_string(), "date".to_string()],
4273            pivot_column: "key".to_string(),
4274            value_column: "value".to_string(),
4275            aggregation: PivotAggregation::Last,
4276            sort_columns: false,
4277        };
4278        state.pivot(&spec).unwrap();
4279        let df = state.lf.clone().collect().unwrap();
4280        let a_col = df.column("A").unwrap();
4281        let row0 = a_col.get(0).unwrap();
4282        let row1 = a_col.get(1).unwrap();
4283        assert_eq!(row0, AnyValue::Float64(11.0));
4284        assert_eq!(row1, AnyValue::Float64(40.0));
4285    }
4286
4287    #[test]
4288    fn test_pivot_aggregation_first() {
4289        let lf = create_pivot_long_lf();
4290        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4291        let spec = PivotSpec {
4292            index: vec!["id".to_string(), "date".to_string()],
4293            pivot_column: "key".to_string(),
4294            value_column: "value".to_string(),
4295            aggregation: PivotAggregation::First,
4296            sort_columns: false,
4297        };
4298        state.pivot(&spec).unwrap();
4299        let df = state.lf.clone().collect().unwrap();
4300        let a_col = df.column("A").unwrap();
4301        assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
4302        assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
4303    }
4304
4305    #[test]
4306    fn test_pivot_aggregation_min_max() {
4307        let lf = create_pivot_long_lf();
4308        let mut state_min = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4309        state_min
4310            .pivot(&PivotSpec {
4311                index: vec!["id".to_string(), "date".to_string()],
4312                pivot_column: "key".to_string(),
4313                value_column: "value".to_string(),
4314                aggregation: PivotAggregation::Min,
4315                sort_columns: false,
4316            })
4317            .unwrap();
4318        let df_min = state_min.lf.clone().collect().unwrap();
4319        assert_eq!(
4320            df_min.column("A").unwrap().get(0).unwrap(),
4321            AnyValue::Float64(10.0)
4322        );
4323
4324        let mut state_max = DataTableState::new(lf, None, None, None, None, true).unwrap();
4325        state_max
4326            .pivot(&PivotSpec {
4327                index: vec!["id".to_string(), "date".to_string()],
4328                pivot_column: "key".to_string(),
4329                value_column: "value".to_string(),
4330                aggregation: PivotAggregation::Max,
4331                sort_columns: false,
4332            })
4333            .unwrap();
4334        let df_max = state_max.lf.clone().collect().unwrap();
4335        assert_eq!(
4336            df_max.column("A").unwrap().get(0).unwrap(),
4337            AnyValue::Float64(11.0)
4338        );
4339    }
4340
4341    #[test]
4342    fn test_pivot_aggregation_avg_count() {
4343        let lf = create_pivot_long_lf();
4344        let mut state_avg = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4345        state_avg
4346            .pivot(&PivotSpec {
4347                index: vec!["id".to_string(), "date".to_string()],
4348                pivot_column: "key".to_string(),
4349                value_column: "value".to_string(),
4350                aggregation: PivotAggregation::Avg,
4351                sort_columns: false,
4352            })
4353            .unwrap();
4354        let df_avg = state_avg.lf.clone().collect().unwrap();
4355        let a = df_avg.column("A").unwrap().get(0).unwrap();
4356        if let AnyValue::Float64(x) = a {
4357            assert!((x - 10.5).abs() < 1e-6);
4358        } else {
4359            panic!("expected float");
4360        }
4361
4362        let mut state_count = DataTableState::new(lf, None, None, None, None, true).unwrap();
4363        state_count
4364            .pivot(&PivotSpec {
4365                index: vec!["id".to_string(), "date".to_string()],
4366                pivot_column: "key".to_string(),
4367                value_column: "value".to_string(),
4368                aggregation: PivotAggregation::Count,
4369                sort_columns: false,
4370            })
4371            .unwrap();
4372        let df_count = state_count.lf.clone().collect().unwrap();
4373        let a = df_count.column("A").unwrap().get(0).unwrap();
4374        assert_eq!(a, AnyValue::UInt32(2));
4375    }
4376
4377    #[test]
4378    fn test_pivot_string_first_last() {
4379        let df = df!(
4380            "id" => &[1_i32, 1, 2, 2],
4381            "key" => &["X", "Y", "X", "Y"],
4382            "value" => &["low", "mid", "high", "mid"],
4383        )
4384        .unwrap();
4385        let lf = df.lazy();
4386        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4387        let spec = PivotSpec {
4388            index: vec!["id".to_string()],
4389            pivot_column: "key".to_string(),
4390            value_column: "value".to_string(),
4391            aggregation: PivotAggregation::Last,
4392            sort_columns: false,
4393        };
4394        state.pivot(&spec).unwrap();
4395        let out = state.lf.clone().collect().unwrap();
4396        assert_eq!(
4397            out.column("X").unwrap().get(0).unwrap(),
4398            AnyValue::String("low")
4399        );
4400        assert_eq!(
4401            out.column("Y").unwrap().get(0).unwrap(),
4402            AnyValue::String("mid")
4403        );
4404    }
4405
4406    #[test]
4407    fn test_melt_basic() {
4408        let lf = create_melt_wide_lf();
4409        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4410        let spec = MeltSpec {
4411            index: vec!["id".to_string(), "date".to_string()],
4412            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4413            variable_name: "variable".to_string(),
4414            value_name: "value".to_string(),
4415        };
4416        state.melt(&spec).unwrap();
4417        let df = state.lf.clone().collect().unwrap();
4418        assert_eq!(df.height(), 9);
4419        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4420        assert!(names.contains(&"variable"));
4421        assert!(names.contains(&"value"));
4422        assert!(names.contains(&"id"));
4423        assert!(names.contains(&"date"));
4424    }
4425
    // NOTE(review): despite the name, this spec lists the value columns
    // explicitly rather than exercising an "all columns except the index"
    // default — confirm whether MeltSpec treats an empty value_columns list
    // as "everything except index" and, if so, cover that path here.
    #[test]
    fn test_melt_all_except_index() {
        let lf = create_melt_wide_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = MeltSpec {
            index: vec!["id".to_string(), "date".to_string()],
            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
            variable_name: "var".to_string(),
            value_name: "val".to_string(),
        };
        state.melt(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        // Custom variable/value names must appear in the melted output.
        assert!(df.column("var").is_ok());
        assert!(df.column("val").is_ok());
    }
4441
4442    #[test]
4443    fn test_pivot_on_current_view_after_filter() {
4444        let lf = create_pivot_long_lf();
4445        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4446        state.filter(vec![FilterStatement {
4447            column: "id".to_string(),
4448            operator: FilterOperator::Eq,
4449            value: "1".to_string(),
4450            logical_op: LogicalOperator::And,
4451        }]);
4452        let spec = PivotSpec {
4453            index: vec!["id".to_string(), "date".to_string()],
4454            pivot_column: "key".to_string(),
4455            value_column: "value".to_string(),
4456            aggregation: PivotAggregation::Last,
4457            sort_columns: false,
4458        };
4459        state.pivot(&spec).unwrap();
4460        let df = state.lf.clone().collect().unwrap();
4461        assert_eq!(df.height(), 1);
4462        let id_col = df.column("id").unwrap();
4463        assert_eq!(id_col.get(0).unwrap(), AnyValue::Int32(1));
4464    }
4465
4466    #[test]
4467    fn test_fuzzy_token_regex() {
4468        assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
4469        assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
4470        // Regex-special characters are escaped
4471        let pat = fuzzy_token_regex("[");
4472        assert!(pat.contains("\\["));
4473    }
4474
4475    #[test]
4476    fn test_fuzzy_search() {
4477        // Filter logic is covered by test_fuzzy_search_regex_direct. This test runs the full
4478        // path through DataTableState; it requires sample data (CSV with string column).
4479        crate::tests::ensure_sample_data();
4480        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
4481        let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
4482        state.visible_rows = 10;
4483        state.collect();
4484        let before = state.num_rows;
4485        state.fuzzy_search("string".to_string());
4486        assert!(state.error.is_none(), "{:?}", state.error);
4487        assert!(state.num_rows <= before, "fuzzy search should filter rows");
4488        state.fuzzy_search("".to_string());
4489        state.collect();
4490        assert_eq!(state.num_rows, before, "empty fuzzy search should reset");
4491        assert!(state.get_active_fuzzy_query().is_empty());
4492    }
4493
    /// Verifies the fuzzy-search filtering logic directly against Polars,
    /// without going through DataTableState: single-column match, a two-column
    /// OR expression, and finally a step-by-step replica of fuzzy_search's own
    /// pipeline (schema scan -> string columns -> OR-reduced contains filter).
    #[test]
    fn test_fuzzy_search_regex_direct() {
        // Sanity check: Polars str().contains with our regex matches "alice" for pattern ".*a.*l.*i.*"
        let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
        let pattern = fuzzy_token_regex("alice");
        let out = lf
            .filter(col("name").str().contains(lit(pattern.clone()), false))
            .collect()
            .unwrap();
        assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);

        // Two columns OR (as in fuzzy_search)
        let lf2 = df!(
            "id" => &[1i32, 2, 3],
            "name" => &["alice", "bob", "carol"],
            "city" => &["NYC", "LA", "Boston"]
        )
        .unwrap()
        .lazy();
        let pat = fuzzy_token_regex("alice");
        let expr = col("name")
            .str()
            .contains(lit(pat.clone()), false)
            .or(col("city").str().contains(lit(pat), false));
        let out2 = lf2.clone().filter(expr).collect().unwrap();
        assert_eq!(out2.height(), 1);

        // Replicate exact fuzzy_search logic: schema from original_lf, string_cols, then filter
        let schema = lf2.clone().collect_schema().unwrap();
        let string_cols: Vec<String> = schema
            .iter()
            .filter(|(_, dtype)| dtype.is_string())
            .map(|(name, _)| name.to_string())
            .collect();
        assert!(
            !string_cols.is_empty(),
            "df! string cols should be detected"
        );
        let pattern = fuzzy_token_regex("alice");
        // OR-combine one contains() test per string column, as fuzzy_search does.
        let token_expr = string_cols
            .iter()
            .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
            .reduce(|a, b| a.or(b))
            .unwrap();
        let out3 = lf2.filter(token_expr).collect().unwrap();
        assert_eq!(
            out3.height(),
            1,
            "fuzzy_search-style filter should match 1 row"
        );
    }
4545
4546    #[test]
4547    fn test_fuzzy_search_no_string_columns() {
4548        let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
4549            .unwrap()
4550            .lazy();
4551        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4552        state.fuzzy_search("x".to_string());
4553        assert!(state.error.is_some());
4554    }
4555}