Skip to main content

datui_lib/widgets/
datatable.rs

1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10    buffer::Buffer,
11    layout::Rect,
12    style::{Color, Modifier, Style},
13    text::{Line, Span},
14    widgets::{
15        Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16    },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions, ParseStringsTarget};
25use polars::io::csv::read::NullValues;
26use polars::lazy::frame::pivot::pivot_stable;
27use polars::prelude::StrptimeOptions;
28use std::io::{BufReader, Read};
29
30use calamine::{open_workbook_auto, Data, Reader};
31use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
32use orc_rust::ArrowReaderBuilder;
33use tempfile::NamedTempFile;
34
35use arrow::array::types::{
36    Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
37    TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
38};
39use arrow::array::{Array, AsArray};
40use arrow::record_batch::RecordBatch;
41
42fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
43    let e = col(PlSmallStr::from_static(""));
44    let expr = match agg {
45        PivotAggregation::Last => e.last(),
46        PivotAggregation::First => e.first(),
47        PivotAggregation::Min => e.min(),
48        PivotAggregation::Max => e.max(),
49        PivotAggregation::Avg => e.mean(),
50        PivotAggregation::Med => e.median(),
51        PivotAggregation::Std => e.std(1),
52        PivotAggregation::Count => e.len(),
53    };
54    Ok(expr)
55}
56
/// State backing the data-table widget: the active and original LazyFrames,
/// the collected row window, scroll/selection positions, and every view
/// transformation (filters, sorts, queries, grouping, pivot/melt history).
pub struct DataTableState {
    /// Current LazyFrame the view is built from (after queries/filters/sorts).
    pub lf: LazyFrame,
    /// Pristine LazyFrame captured at construction; restored by reset.
    original_lf: LazyFrame,
    df: Option<DataFrame>,        // Scrollable columns dataframe
    locked_df: Option<DataFrame>, // Locked columns dataframe
    /// ratatui table selection/offset state.
    pub table_state: TableState,
    /// First visible row (absolute index into the current result).
    pub start_row: usize,
    /// Rows that fit in the viewport (0 until the first layout pass).
    pub visible_rows: usize,
    /// Index of the first visible scrollable terminal column.
    pub termcol_index: usize,
    /// Terminal columns that fit in the viewport.
    pub visible_termcols: usize,
    /// Most recent Polars error, kept for display.
    pub error: Option<PolarsError>,
    pub suppress_error_display: bool, // When true, don't show errors in main view (e.g., when query input is active)
    /// Schema of `lf`; refreshed whenever the frame changes shape.
    pub schema: Arc<Schema>,
    /// Total row count of the current frame (meaningful only while `num_rows_valid`).
    pub num_rows: usize,
    /// When true, collect() skips the len() query.
    num_rows_valid: bool,
    /// Filter statements currently applied to the frame.
    filters: Vec<FilterStatement>,
    /// Columns the frame is sorted by (empty = unsorted).
    sort_columns: Vec<String>,
    /// Sort direction shared by all `sort_columns`.
    sort_ascending: bool,
    /// Last executed query (Query tab).
    pub active_query: String,
    /// Last executed SQL (Sql tab). Independent from active_query; only one applies to current view.
    pub active_sql_query: String,
    /// Last executed fuzzy search (Fuzzy tab). Independent from active_query/active_sql_query.
    pub active_fuzzy_query: String,
    column_order: Vec<String>,   // Order of columns for display
    locked_columns_count: usize, // Number of locked columns (from left)
    /// Grouped view of the frame, when grouping is active.
    grouped_lf: Option<LazyFrame>,
    drilled_down_group_index: Option<usize>, // Index of the group we're viewing
    pub drilled_down_group_key: Option<Vec<String>>, // Key values of the drilled down group
    pub drilled_down_group_key_columns: Option<Vec<String>>, // Key column names of the drilled down group
    /// Pages to buffer ahead of the viewport.
    pages_lookahead: usize,
    /// Pages to buffer behind the viewport.
    pages_lookback: usize,
    max_buffered_rows: usize, // 0 = no limit
    max_buffered_mb: usize,   // 0 = no limit
    /// Absolute row index where the buffered window starts.
    buffered_start_row: usize,
    /// Absolute row index where the buffered window ends.
    // NOTE(review): presumed exclusive — confirm against collect().
    buffered_end_row: usize,
    /// Full buffered DataFrame (all columns in column_order) for the current buffer range.
    /// When set, column scroll (scroll_left/scroll_right) only re-slices columns without re-collecting from LazyFrame.
    buffered_df: Option<DataFrame>,
    /// Distance from the buffer edge that triggers a refill; derived from `visible_rows`.
    proximity_threshold: usize,
    /// Whether to render a row-number column.
    row_numbers: bool,
    /// Index shown for the first row (commonly 0 or 1; set from options).
    row_start_index: usize,
    /// Last applied pivot spec, if current lf is result of a pivot. Used for templates.
    last_pivot_spec: Option<PivotSpec>,
    /// Last applied melt spec, if current lf is result of a melt. Used for templates.
    last_melt_spec: Option<MeltSpec>,
    /// When set, dataset was loaded with hive partitioning; partition column names for Info panel and predicate pushdown.
    pub partition_columns: Option<Vec<String>>,
    /// When set, decompressed CSV was written to this temp file; kept alive so the file exists for lazy scan.
    decompress_temp_file: Option<NamedTempFile>,
    /// When true, use Polars streaming engine for LazyFrame collect when the streaming feature is enabled.
    pub polars_streaming: bool,
    /// When true, cast Date/Datetime pivot index columns to Int32 before pivot (workaround for Polars 0.52).
    workaround_pivot_date_index: bool,
}
112
/// Inferred type for an Excel column (preserves numbers, bools, dates; avoids stringifying).
#[derive(Clone, Copy)]
enum ExcelColType {
    /// Whole numbers (also chosen for float columns whose values are all integral).
    Int64,
    /// Fractional numeric values.
    Float64,
    /// True/false cells.
    Boolean,
    /// Plain text — the fallback when nothing more specific fits.
    Utf8,
    /// Calendar dates (datetime values whose time component is always midnight).
    Date,
    /// Date-and-time values.
    Datetime,
}
123
124impl DataTableState {
    /// Builds table state from a LazyFrame, eagerly resolving its schema.
    ///
    /// Buffering knobs fall back to defaults when `None`: 3 pages
    /// lookahead/lookback, 100_000 max buffered rows, 512 MB cap.
    /// `row_numbers`/`row_start_index` are placeholders here and are
    /// overwritten by the `from_*` constructors from options.
    ///
    /// # Errors
    /// Fails when the schema cannot be collected from `lf`.
    pub fn new(
        lf: LazyFrame,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        polars_streaming: bool,
    ) -> Result<Self> {
        // The schema determines the initial display column order.
        let schema = lf.clone().collect_schema()?;
        let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: pages_lookahead.unwrap_or(3),
            pages_lookback: pages_lookback.unwrap_or(3),
            max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0, // Will be set when visible_rows is known
            row_numbers: false,     // Will be set from options
            row_start_index: 1,     // Will be set from options
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns: None,
            decompress_temp_file: None,
            polars_streaming,
            workaround_pivot_date_index: true,
        })
    }
180
181    /// Create state from an existing LazyFrame (e.g. from Python or in-memory). Uses OpenOptions for display/buffer settings.
182    pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
183        let mut state = Self::new(
184            lf,
185            options.pages_lookahead,
186            options.pages_lookback,
187            options.max_buffered_rows,
188            options.max_buffered_mb,
189            options.polars_streaming,
190        )?;
191        state.row_numbers = options.row_numbers;
192        state.row_start_index = options.row_start_index;
193        state.workaround_pivot_date_index = options.workaround_pivot_date_index;
194        Ok(state)
195    }
196
    /// Create state from a pre-collected schema and LazyFrame (for phased loading). Does not call collect_schema();
    /// df is None so the UI can render headers while the first collect() runs.
    /// When `partition_columns` is Some (e.g. hive), column order is partition cols first.
    pub fn from_schema_and_lazyframe(
        schema: Arc<Schema>,
        lf: LazyFrame,
        options: &crate::OpenOptions,
        partition_columns: Option<Vec<String>>,
    ) -> Result<Self> {
        // Partition columns (when present) lead the display order; the
        // remaining schema columns follow in schema order, with the partition
        // names filtered out to avoid duplicates.
        let column_order: Vec<String> = if let Some(ref part) = partition_columns {
            let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
            let rest: Vec<String> = schema
                .iter_names()
                .map(|s| s.to_string())
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            part.iter().cloned().chain(rest).collect()
        } else {
            schema.iter_names().map(|s| s.to_string()).collect()
        };
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: options.pages_lookahead.unwrap_or(3),
            pages_lookback: options.pages_lookback.unwrap_or(3),
            max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: options.row_numbers,
            row_start_index: options.row_start_index,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns,
            decompress_temp_file: None,
            polars_streaming: options.polars_streaming,
            workaround_pivot_date_index: options.workaround_pivot_date_index,
        })
    }
262
    /// Reset LazyFrame and view state to original_lf. Schema is re-fetched so it matches
    /// after a previous query/SQL that may have changed columns. Caller should call
    /// collect() afterward if display update is needed (reset/query/fuzzy do; sql_query
    /// relies on event loop Collect).
    fn reset_lf_to_original(&mut self) {
        self.invalidate_num_rows();
        self.lf = self.original_lf.clone();
        // Re-derive schema from the pristine frame; on failure fall back to an
        // empty schema rather than keeping a stale one.
        self.schema = self
            .original_lf
            .clone()
            .collect_schema()
            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Clear every applied view transformation.
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.locked_columns_count = 0;
        self.filters.clear();
        self.sort_columns.clear();
        self.sort_ascending = true;
        // Rewind scroll positions and grouping/drill-down state.
        self.start_row = 0;
        self.termcol_index = 0;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        // Drop the row buffer so the next collect() refills from scratch.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
    }
294
295    pub fn reset(&mut self) {
296        self.reset_lf_to_original();
297        self.error = None;
298        self.suppress_error_display = false;
299        self.last_pivot_spec = None;
300        self.last_melt_spec = None;
301        self.collect();
302        if self.num_rows > 0 {
303            self.start_row = 0;
304        }
305    }
306
307    pub fn from_parquet(
308        path: &Path,
309        pages_lookahead: Option<usize>,
310        pages_lookback: Option<usize>,
311        max_buffered_rows: Option<usize>,
312        max_buffered_mb: Option<usize>,
313        row_numbers: bool,
314        row_start_index: usize,
315    ) -> Result<Self> {
316        let path_str = path.as_os_str().to_string_lossy();
317        let is_glob = path_str.contains('*');
318        let pl_path = PlPath::Local(Arc::from(path));
319        let args = ScanArgsParquet {
320            glob: is_glob,
321            ..Default::default()
322        };
323        let lf = LazyFrame::scan_parquet(pl_path, args)?;
324        let mut state = Self::new(
325            lf,
326            pages_lookahead,
327            pages_lookback,
328            max_buffered_rows,
329            max_buffered_mb,
330            true,
331        )?;
332        state.row_numbers = row_numbers;
333        state.row_start_index = row_start_index;
334        Ok(state)
335    }
336
337    /// Load multiple Parquet files and concatenate them into one LazyFrame (same schema assumed).
338    pub fn from_parquet_paths(
339        paths: &[impl AsRef<Path>],
340        pages_lookahead: Option<usize>,
341        pages_lookback: Option<usize>,
342        max_buffered_rows: Option<usize>,
343        max_buffered_mb: Option<usize>,
344        row_numbers: bool,
345        row_start_index: usize,
346    ) -> Result<Self> {
347        if paths.is_empty() {
348            return Err(color_eyre::eyre::eyre!("No paths provided"));
349        }
350        if paths.len() == 1 {
351            return Self::from_parquet(
352                paths[0].as_ref(),
353                pages_lookahead,
354                pages_lookback,
355                max_buffered_rows,
356                max_buffered_mb,
357                row_numbers,
358                row_start_index,
359            );
360        }
361        let mut lazy_frames = Vec::with_capacity(paths.len());
362        for p in paths {
363            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
364            let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
365            lazy_frames.push(lf);
366        }
367        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
368        let mut state = Self::new(
369            lf,
370            pages_lookahead,
371            pages_lookback,
372            max_buffered_rows,
373            max_buffered_mb,
374            true,
375        )?;
376        state.row_numbers = row_numbers;
377        state.row_start_index = row_start_index;
378        Ok(state)
379    }
380
381    /// Load a single Arrow IPC / Feather v2 file (lazy).
382    pub fn from_ipc(
383        path: &Path,
384        pages_lookahead: Option<usize>,
385        pages_lookback: Option<usize>,
386        max_buffered_rows: Option<usize>,
387        max_buffered_mb: Option<usize>,
388        row_numbers: bool,
389        row_start_index: usize,
390    ) -> Result<Self> {
391        let pl_path = PlPath::Local(Arc::from(path));
392        let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
393        let mut state = Self::new(
394            lf,
395            pages_lookahead,
396            pages_lookback,
397            max_buffered_rows,
398            max_buffered_mb,
399            true,
400        )?;
401        state.row_numbers = row_numbers;
402        state.row_start_index = row_start_index;
403        Ok(state)
404    }
405
406    /// Load multiple Arrow IPC / Feather files and concatenate into one LazyFrame.
407    pub fn from_ipc_paths(
408        paths: &[impl AsRef<Path>],
409        pages_lookahead: Option<usize>,
410        pages_lookback: Option<usize>,
411        max_buffered_rows: Option<usize>,
412        max_buffered_mb: Option<usize>,
413        row_numbers: bool,
414        row_start_index: usize,
415    ) -> Result<Self> {
416        if paths.is_empty() {
417            return Err(color_eyre::eyre::eyre!("No paths provided"));
418        }
419        if paths.len() == 1 {
420            return Self::from_ipc(
421                paths[0].as_ref(),
422                pages_lookahead,
423                pages_lookback,
424                max_buffered_rows,
425                max_buffered_mb,
426                row_numbers,
427                row_start_index,
428            );
429        }
430        let mut lazy_frames = Vec::with_capacity(paths.len());
431        for p in paths {
432            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
433            let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
434            lazy_frames.push(lf);
435        }
436        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
437        let mut state = Self::new(
438            lf,
439            pages_lookahead,
440            pages_lookback,
441            max_buffered_rows,
442            max_buffered_mb,
443            true,
444        )?;
445        state.row_numbers = row_numbers;
446        state.row_start_index = row_start_index;
447        Ok(state)
448    }
449
450    /// Load a single Avro file (eager read, then lazy).
451    pub fn from_avro(
452        path: &Path,
453        pages_lookahead: Option<usize>,
454        pages_lookback: Option<usize>,
455        max_buffered_rows: Option<usize>,
456        max_buffered_mb: Option<usize>,
457        row_numbers: bool,
458        row_start_index: usize,
459    ) -> Result<Self> {
460        let file = File::open(path)?;
461        let df = polars::io::avro::AvroReader::new(file).finish()?;
462        let lf = df.lazy();
463        let mut state = Self::new(
464            lf,
465            pages_lookahead,
466            pages_lookback,
467            max_buffered_rows,
468            max_buffered_mb,
469            true,
470        )?;
471        state.row_numbers = row_numbers;
472        state.row_start_index = row_start_index;
473        Ok(state)
474    }
475
476    /// Load multiple Avro files and concatenate into one LazyFrame.
477    pub fn from_avro_paths(
478        paths: &[impl AsRef<Path>],
479        pages_lookahead: Option<usize>,
480        pages_lookback: Option<usize>,
481        max_buffered_rows: Option<usize>,
482        max_buffered_mb: Option<usize>,
483        row_numbers: bool,
484        row_start_index: usize,
485    ) -> Result<Self> {
486        if paths.is_empty() {
487            return Err(color_eyre::eyre::eyre!("No paths provided"));
488        }
489        if paths.len() == 1 {
490            return Self::from_avro(
491                paths[0].as_ref(),
492                pages_lookahead,
493                pages_lookback,
494                max_buffered_rows,
495                max_buffered_mb,
496                row_numbers,
497                row_start_index,
498            );
499        }
500        let mut lazy_frames = Vec::with_capacity(paths.len());
501        for p in paths {
502            let file = File::open(p.as_ref())?;
503            let df = polars::io::avro::AvroReader::new(file).finish()?;
504            lazy_frames.push(df.lazy());
505        }
506        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
507        let mut state = Self::new(
508            lf,
509            pages_lookahead,
510            pages_lookback,
511            max_buffered_rows,
512            max_buffered_mb,
513            true,
514        )?;
515        state.row_numbers = row_numbers;
516        state.row_start_index = row_start_index;
517        Ok(state)
518    }
519
    /// Load a single Excel file (xls, xlsx, xlsm, xlsb) using calamine (eager read, then lazy).
    /// Sheet is selected by 0-based index or name via `excel_sheet`.
    #[allow(clippy::too_many_arguments)]
    pub fn from_excel(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        excel_sheet: Option<&str>,
    ) -> Result<Self> {
        let mut workbook =
            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
        let sheet_names = workbook.sheet_names().to_vec();
        if sheet_names.is_empty() {
            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
        }
        // Sheet selection: a numeric selector is treated as a 0-based index,
        // anything else as a sheet name; default is the first sheet.
        let range = if let Some(sheet_sel) = excel_sheet {
            if let Ok(idx) = sheet_sel.parse::<usize>() {
                workbook
                    .worksheet_range_at(idx)
                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            } else {
                workbook
                    .worksheet_range(sheet_sel)
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            }
        } else {
            workbook
                .worksheet_range_at(0)
                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
        };
        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
        // An entirely empty sheet produces an empty DataFrame, not an error.
        if rows.is_empty() {
            let empty_df = DataFrame::new(vec![])?;
            let mut state = Self::new(
                empty_df.lazy(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                true,
            )?;
            state.row_numbers = row_numbers;
            state.row_start_index = row_start_index;
            return Ok(state);
        }
        // First row is the header; non-string header cells are stringified.
        let headers: Vec<String> = rows[0]
            .iter()
            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
            .collect();
        let n_cols = headers.len();
        let mut series_vec = Vec::with_capacity(n_cols);
        // Build one Series per header column; rows shorter than the header
        // contribute None (row.get returns None past the end).
        for (col_idx, header) in headers.iter().enumerate() {
            let col_cells: Vec<Option<&Data>> =
                rows[1..].iter().map(|row| row.get(col_idx)).collect();
            let inferred = Self::excel_infer_column_type(&col_cells);
            // Empty header cells get a synthetic 1-based name.
            let name = if header.is_empty() {
                format!("column_{}", col_idx + 1)
            } else {
                header.clone()
            };
            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
            series_vec.push(series.into());
        }
        let df = DataFrame::new(series_vec)?;
        let mut state = Self::new(
            df.lazy(),
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
602
    /// Infers column type: prefers Int64 for whole-number floats; infers Date/Datetime for
    /// calamine DateTime/DateTimeIso or for string columns that parse as ISO date/datetime.
    ///
    /// Precedence: string (with date-parse check) > int > datetime > float > bool > Utf8 fallback.
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        // One pass over the present cells, recording which value kinds occur.
        for cell in cells.iter().flatten() {
            if CalamineTrait::is_string(*cell) {
                // A single string cell ends the scan; the string branch below
                // still checks whether the whole column parses as dates.
                has_string = true;
                break;
            }
            // Datetime cells also count as float (Excel stores them as serial numbers).
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // Treat a string column as Date/Datetime only when at least one cell
            // parses and every non-empty cell parses.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            ExcelColType::Int64
        } else if has_datetime {
            // All-midnight datetime columns are reported as plain dates.
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // Float columns whose values are all whole (within 1e-10) collapse to Int64.
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            // Empty or unrecognized columns default to text.
            ExcelColType::Utf8
        }
    }
674
675    /// True if every cell that parses as datetime has time 00:00:00.
676    fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
677        let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
678        cells
679            .iter()
680            .flatten()
681            .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
682            .all(|dt| dt.time() == midnight)
683    }
684
685    /// Converts a calamine cell to NaiveDateTime (Excel serial, DateTimeIso, or parseable string).
686    fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
687        use calamine::DataType;
688        if let Some(dt) = cell.as_datetime() {
689            return Some(dt);
690        }
691        let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
692        Self::parse_naive_datetime_str(s)
693    }
694
695    /// Parses an ISO-style date/datetime string; tries FORMATS in order.
696    fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
697        let s = s.trim();
698        if s.is_empty() {
699            return None;
700        }
701        const FORMATS: &[&str] = &[
702            "%Y-%m-%dT%H:%M:%S%.f",
703            "%Y-%m-%dT%H:%M:%S",
704            "%Y-%m-%d %H:%M:%S%.f",
705            "%Y-%m-%d %H:%M:%S",
706            "%Y-%m-%d",
707        ];
708        for fmt in FORMATS {
709            if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
710                return Some(dt);
711            }
712        }
713        if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
714            return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
715        }
716        None
717    }
718
    /// Build a Polars Series from a column of calamine cells using the inferred type.
    ///
    /// Cells that fail conversion for the chosen type become nulls (`None`).
    fn excel_column_to_series(
        name: &str,
        cells: &[Option<&Data>],
        col_type: ExcelColType,
    ) -> Result<Series> {
        use calamine::DataType as CalamineTrait;
        use polars::datatypes::TimeUnit;
        let series = match col_type {
            ExcelColType::Int64 => {
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_i64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Float64 => {
                let v: Vec<Option<f64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_f64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Boolean => {
                let v: Vec<Option<bool>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.get_bool()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Utf8 => {
                let v: Vec<Option<String>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_string()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Date => {
                // Dates are encoded as days since the Unix epoch, then cast to Date.
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
                let v: Vec<Option<i32>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| (dt.date() - epoch).num_days() as i32)
                    })
                    .collect();
                Series::new(name.into(), v).cast(&DataType::Date)?
            }
            ExcelColType::Datetime => {
                // Datetimes are encoded as microseconds since the Unix epoch (no timezone).
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| dt.and_utc().timestamp_micros())
                    })
                    .collect();
                Series::new(name.into(), v)
                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
            }
        };
        Ok(series)
    }
781
782    /// Load a single ORC file (eager read via orc-rust → Arrow, then convert to Polars, then lazy).
783    /// ORC is read fully into memory; see loading-data docs for large-file notes.
784    pub fn from_orc(
785        path: &Path,
786        pages_lookahead: Option<usize>,
787        pages_lookback: Option<usize>,
788        max_buffered_rows: Option<usize>,
789        max_buffered_mb: Option<usize>,
790        row_numbers: bool,
791        row_start_index: usize,
792    ) -> Result<Self> {
793        let file = File::open(path)?;
794        let reader = ArrowReaderBuilder::try_new(file)
795            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
796            .build();
797        let batches: Vec<RecordBatch> = reader
798            .collect::<std::result::Result<Vec<_>, _>>()
799            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
800        let df = Self::arrow_record_batches_to_dataframe(&batches)?;
801        let lf = df.lazy();
802        let mut state = Self::new(
803            lf,
804            pages_lookahead,
805            pages_lookback,
806            max_buffered_rows,
807            max_buffered_mb,
808            true,
809        )?;
810        state.row_numbers = row_numbers;
811        state.row_start_index = row_start_index;
812        Ok(state)
813    }
814
815    /// Load multiple ORC files and concatenate into one LazyFrame.
816    pub fn from_orc_paths(
817        paths: &[impl AsRef<Path>],
818        pages_lookahead: Option<usize>,
819        pages_lookback: Option<usize>,
820        max_buffered_rows: Option<usize>,
821        max_buffered_mb: Option<usize>,
822        row_numbers: bool,
823        row_start_index: usize,
824    ) -> Result<Self> {
825        if paths.is_empty() {
826            return Err(color_eyre::eyre::eyre!("No paths provided"));
827        }
828        if paths.len() == 1 {
829            return Self::from_orc(
830                paths[0].as_ref(),
831                pages_lookahead,
832                pages_lookback,
833                max_buffered_rows,
834                max_buffered_mb,
835                row_numbers,
836                row_start_index,
837            );
838        }
839        let mut lazy_frames = Vec::with_capacity(paths.len());
840        for p in paths {
841            let file = File::open(p.as_ref())?;
842            let reader = ArrowReaderBuilder::try_new(file)
843                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
844                .build();
845            let batches: Vec<RecordBatch> = reader
846                .collect::<std::result::Result<Vec<_>, _>>()
847                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
848            let df = Self::arrow_record_batches_to_dataframe(&batches)?;
849            lazy_frames.push(df.lazy());
850        }
851        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
852        let mut state = Self::new(
853            lf,
854            pages_lookahead,
855            pages_lookback,
856            max_buffered_rows,
857            max_buffered_mb,
858            true,
859        )?;
860        state.row_numbers = row_numbers;
861        state.row_start_index = row_start_index;
862        Ok(state)
863    }
864
865    /// Convert Arrow (arrow crate 57) RecordBatches to Polars DataFrame by value (ORC uses
866    /// arrow 57; Polars uses polars-arrow, so we cannot use Series::from_arrow).
867    fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
868        if batches.is_empty() {
869            return Ok(DataFrame::new(vec![])?);
870        }
871        let mut all_dfs = Vec::with_capacity(batches.len());
872        for batch in batches {
873            let n_cols = batch.num_columns();
874            let schema = batch.schema();
875            let mut series_vec = Vec::with_capacity(n_cols);
876            for (i, col) in batch.columns().iter().enumerate() {
877                let name = schema.field(i).name().as_str();
878                let s = Self::arrow_array_to_polars_series(name, col)?;
879                series_vec.push(s.into());
880            }
881            let df = DataFrame::new(series_vec)?;
882            all_dfs.push(df);
883        }
884        let mut out = all_dfs.remove(0);
885        for df in all_dfs {
886            out = out.vstack(&df)?;
887        }
888        Ok(out)
889    }
890
891    fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
892        use arrow::datatypes::DataType as ArrowDataType;
893        let len = array.len();
894        match array.data_type() {
895            ArrowDataType::Int8 => {
896                let a = array
897                    .as_primitive_opt::<Int8Type>()
898                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
899                let v: Vec<Option<i8>> = (0..len)
900                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
901                    .collect();
902                Ok(Series::new(name.into(), v))
903            }
904            ArrowDataType::Int16 => {
905                let a = array
906                    .as_primitive_opt::<Int16Type>()
907                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
908                let v: Vec<Option<i16>> = (0..len)
909                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
910                    .collect();
911                Ok(Series::new(name.into(), v))
912            }
913            ArrowDataType::Int32 => {
914                let a = array
915                    .as_primitive_opt::<Int32Type>()
916                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
917                let v: Vec<Option<i32>> = (0..len)
918                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
919                    .collect();
920                Ok(Series::new(name.into(), v))
921            }
922            ArrowDataType::Int64 => {
923                let a = array
924                    .as_primitive_opt::<Int64Type>()
925                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
926                let v: Vec<Option<i64>> = (0..len)
927                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
928                    .collect();
929                Ok(Series::new(name.into(), v))
930            }
931            ArrowDataType::UInt8 => {
932                let a = array
933                    .as_primitive_opt::<UInt8Type>()
934                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
935                let v: Vec<Option<i64>> = (0..len)
936                    .map(|i| {
937                        if a.is_null(i) {
938                            None
939                        } else {
940                            Some(a.value(i) as i64)
941                        }
942                    })
943                    .collect();
944                Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
945            }
946            ArrowDataType::UInt16 => {
947                let a = array
948                    .as_primitive_opt::<UInt16Type>()
949                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
950                let v: Vec<Option<i64>> = (0..len)
951                    .map(|i| {
952                        if a.is_null(i) {
953                            None
954                        } else {
955                            Some(a.value(i) as i64)
956                        }
957                    })
958                    .collect();
959                Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
960            }
961            ArrowDataType::UInt32 => {
962                let a = array
963                    .as_primitive_opt::<UInt32Type>()
964                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
965                let v: Vec<Option<u32>> = (0..len)
966                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
967                    .collect();
968                Ok(Series::new(name.into(), v))
969            }
970            ArrowDataType::UInt64 => {
971                let a = array
972                    .as_primitive_opt::<UInt64Type>()
973                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
974                let v: Vec<Option<u64>> = (0..len)
975                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
976                    .collect();
977                Ok(Series::new(name.into(), v))
978            }
979            ArrowDataType::Float32 => {
980                let a = array
981                    .as_primitive_opt::<Float32Type>()
982                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
983                let v: Vec<Option<f32>> = (0..len)
984                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
985                    .collect();
986                Ok(Series::new(name.into(), v))
987            }
988            ArrowDataType::Float64 => {
989                let a = array
990                    .as_primitive_opt::<Float64Type>()
991                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
992                let v: Vec<Option<f64>> = (0..len)
993                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
994                    .collect();
995                Ok(Series::new(name.into(), v))
996            }
997            ArrowDataType::Boolean => {
998                let a = array
999                    .as_boolean_opt()
1000                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
1001                let v: Vec<Option<bool>> = (0..len)
1002                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1003                    .collect();
1004                Ok(Series::new(name.into(), v))
1005            }
1006            ArrowDataType::Utf8 => {
1007                let a = array
1008                    .as_string_opt::<i32>()
1009                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
1010                let v: Vec<Option<String>> = (0..len)
1011                    .map(|i| {
1012                        if a.is_null(i) {
1013                            None
1014                        } else {
1015                            Some(a.value(i).to_string())
1016                        }
1017                    })
1018                    .collect();
1019                Ok(Series::new(name.into(), v))
1020            }
1021            ArrowDataType::LargeUtf8 => {
1022                let a = array
1023                    .as_string_opt::<i64>()
1024                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
1025                let v: Vec<Option<String>> = (0..len)
1026                    .map(|i| {
1027                        if a.is_null(i) {
1028                            None
1029                        } else {
1030                            Some(a.value(i).to_string())
1031                        }
1032                    })
1033                    .collect();
1034                Ok(Series::new(name.into(), v))
1035            }
1036            ArrowDataType::Date32 => {
1037                let a = array
1038                    .as_primitive_opt::<Date32Type>()
1039                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
1040                let v: Vec<Option<i32>> = (0..len)
1041                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1042                    .collect();
1043                Ok(Series::new(name.into(), v))
1044            }
1045            ArrowDataType::Date64 => {
1046                let a = array
1047                    .as_primitive_opt::<Date64Type>()
1048                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
1049                let v: Vec<Option<i64>> = (0..len)
1050                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1051                    .collect();
1052                Ok(Series::new(name.into(), v))
1053            }
1054            ArrowDataType::Timestamp(_, _) => {
1055                let a = array
1056                    .as_primitive_opt::<TimestampMillisecondType>()
1057                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
1058                let v: Vec<Option<i64>> = (0..len)
1059                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1060                    .collect();
1061                Ok(Series::new(name.into(), v))
1062            }
1063            other => Err(color_eyre::eyre::eyre!(
1064                "ORC: unsupported column type {:?} for column '{}'",
1065                other,
1066                name
1067            )),
1068        }
1069    }
1070
1071    /// Build a LazyFrame for hive-partitioned Parquet only (no schema collection, no partition discovery).
1072    /// Use this for phased loading so "Scanning input" is instant; schema and partition handling happen in DoLoadSchema.
1073    pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1074        let path_str = path.as_os_str().to_string_lossy();
1075        let is_glob = path_str.contains('*');
1076        let pl_path = PlPath::Local(Arc::from(path));
1077        let args = ScanArgsParquet {
1078            hive_options: HiveOptions::new_enabled(),
1079            glob: is_glob,
1080            ..Default::default()
1081        };
1082        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1083    }
1084
1085    /// Build a LazyFrame for hive-partitioned Parquet with a pre-computed schema (avoids slow collect_schema across all files).
1086    pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1087        let path_str = path.as_os_str().to_string_lossy();
1088        let is_glob = path_str.contains('*');
1089        let pl_path = PlPath::Local(Arc::from(path));
1090        let args = ScanArgsParquet {
1091            schema: Some(schema),
1092            hive_options: HiveOptions::new_enabled(),
1093            glob: is_glob,
1094            ..Default::default()
1095        };
1096        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1097    }
1098
1099    /// Find the first parquet file along a single spine of a hive-partitioned directory (same walk as partition discovery).
1100    /// Returns `None` if the directory is empty or has no parquet files along that spine.
1101    fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
1102        const MAX_DEPTH: usize = 64;
1103        Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
1104    }
1105
1106    fn first_parquet_file_spine(
1107        path: &Path,
1108        depth: usize,
1109        max_depth: usize,
1110    ) -> Option<std::path::PathBuf> {
1111        if depth >= max_depth {
1112            return None;
1113        }
1114        let entries = fs::read_dir(path).ok()?;
1115        let mut first_partition_child: Option<std::path::PathBuf> = None;
1116        for entry in entries.flatten() {
1117            let child = entry.path();
1118            if child.is_file() {
1119                if child
1120                    .extension()
1121                    .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
1122                {
1123                    return Some(child);
1124                }
1125            } else if child.is_dir() {
1126                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1127                    if name.contains('=') && first_partition_child.is_none() {
1128                        first_partition_child = Some(child);
1129                    }
1130                }
1131            }
1132        }
1133        first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
1134    }
1135
1136    /// Read schema from a single parquet file (metadata only, no data scan). Used to avoid collect_schema() over many files.
1137    fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1138        let file = File::open(path)?;
1139        let mut reader = ParquetReader::new(file);
1140        let arrow_schema = reader.schema()?;
1141        let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1142        Ok(Arc::new(schema))
1143    }
1144
1145    /// Infer schema from one parquet file in a hive directory and merge with partition columns (Utf8).
1146    /// Returns (merged_schema, partition_columns). Use with scan_parquet_hive_with_schema to avoid slow collect_schema().
1147    /// Only supported when path is a directory (not a glob). Returns Err if no parquet file found or read fails.
1148    pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1149        let partition_columns = Self::discover_hive_partition_columns(path);
1150        let one_file = Self::first_parquet_file_in_hive_dir(path)
1151            .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1152        let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1153        let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1154        let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1155        for name in &partition_columns {
1156            merged.with_column(name.clone().into(), DataType::String);
1157        }
1158        for (name, dtype) in file_schema.iter() {
1159            if !part_set.contains(name.as_str()) {
1160                merged.with_column(name.clone(), dtype.clone());
1161            }
1162        }
1163        Ok((Arc::new(merged), partition_columns))
1164    }
1165
1166    /// Discover hive partition column names (public for phased loading). Directory: single-spine walk; glob: parse pattern.
1167    pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1168        if path.is_dir() {
1169            Self::discover_partition_columns_from_path(path)
1170        } else {
1171            Self::discover_partition_columns_from_glob_pattern(path)
1172        }
1173    }
1174
1175    /// Discover hive partition column names from a directory path by walking a single
1176    /// "spine" (one branch) of key=value directories. Partition keys are uniform across
1177    /// the tree, so we only need one path to infer [year, month, day] etc. Returns columns
1178    /// in path order. Stops after max_depth levels to avoid runaway on malformed trees.
1179    fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1180        const MAX_PARTITION_DEPTH: usize = 64;
1181        let mut columns = Vec::<String>::new();
1182        let mut seen = HashSet::<String>::new();
1183        Self::discover_partition_columns_spine(
1184            path,
1185            &mut columns,
1186            &mut seen,
1187            0,
1188            MAX_PARTITION_DEPTH,
1189        );
1190        columns
1191    }
1192
1193    /// Walk one branch: at this directory, find the first child that is a key=value dir,
1194    /// record the key (if not already seen), then recurse into that one child only.
1195    /// This does O(depth) read_dir calls instead of walking the entire tree.
1196    fn discover_partition_columns_spine(
1197        path: &Path,
1198        columns: &mut Vec<String>,
1199        seen: &mut HashSet<String>,
1200        depth: usize,
1201        max_depth: usize,
1202    ) {
1203        if depth >= max_depth {
1204            return;
1205        }
1206        let Ok(entries) = fs::read_dir(path) else {
1207            return;
1208        };
1209        let mut first_partition_child: Option<std::path::PathBuf> = None;
1210        for entry in entries.flatten() {
1211            let child = entry.path();
1212            if child.is_dir() {
1213                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1214                    if let Some((key, _)) = name.split_once('=') {
1215                        if !key.is_empty() && seen.insert(key.to_string()) {
1216                            columns.push(key.to_string());
1217                        }
1218                        if first_partition_child.is_none() {
1219                            first_partition_child = Some(child);
1220                        }
1221                        break;
1222                    }
1223                }
1224            }
1225        }
1226        if let Some(one) = first_partition_child {
1227            Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1228        }
1229    }
1230
1231    /// Infer partition column names from a glob pattern path (e.g. "data/year=*/month=*/*.parquet").
1232    fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1233        let path_str = path.as_os_str().to_string_lossy();
1234        let mut columns = Vec::<String>::new();
1235        let mut seen = HashSet::<String>::new();
1236        for segment in path_str.split('/') {
1237            if let Some((key, rest)) = segment.split_once('=') {
1238                if !key.is_empty()
1239                    && (rest == "*" || !rest.contains('*'))
1240                    && seen.insert(key.to_string())
1241                {
1242                    columns.push(key.to_string());
1243                }
1244            }
1245        }
1246        columns
1247    }
1248
    /// Load Parquet with Hive partitioning from a directory or glob path.
    /// When path is a directory, partition columns are discovered from path structure.
    /// When path contains glob (e.g. `**/*.parquet`), partition columns are inferred from the pattern (e.g. `year=*/month=*`).
    /// Partition columns are moved to the left in the initial LazyFrame before state is created.
    ///
    /// **Performance**: The slow part is Polars, not our code. `scan_parquet` + `collect_schema()` trigger
    /// path expansion (full directory tree or glob) and parquet metadata reads; we only do a single-spine
    /// walk for partition key discovery and cheap schema/select work.
    pub fn from_parquet_hive(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        // Enable glob expansion only when the path actually contains a wildcard.
        let path_str = path.as_os_str().to_string_lossy();
        let is_glob = path_str.contains('*');
        let pl_path = PlPath::Local(Arc::from(path));
        let args = ScanArgsParquet {
            hive_options: HiveOptions::new_enabled(),
            glob: is_glob,
            ..Default::default()
        };
        let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
        // The expensive step: triggers path expansion + parquet metadata reads (see doc above).
        let schema = lf.collect_schema()?;

        // Cheap partition-key discovery: single-spine walk for a directory, pattern parse for a glob.
        let mut discovered = if path.is_dir() {
            Self::discover_partition_columns_from_path(path)
        } else {
            Self::discover_partition_columns_from_glob_pattern(path)
        };

        // Fallback: glob like "**/*.parquet" has no key= in the pattern, so discovery is empty.
        // Try discovering from a directory prefix (e.g. path.parent() or walk up until we find a dir).
        if discovered.is_empty() {
            let mut dir = path;
            while !dir.is_dir() {
                match dir.parent() {
                    Some(p) => dir = p,
                    None => break,
                }
            }
            if dir.is_dir() {
                discovered = Self::discover_partition_columns_from_path(dir);
            }
        }

        // Keep only discovered keys that Polars actually materialized as columns.
        let partition_columns: Vec<String> = discovered
            .into_iter()
            .filter(|c| schema.contains(c.as_str()))
            .collect();

        // Display order: partition columns first, then the remaining columns in schema order.
        let new_order: Vec<String> = if partition_columns.is_empty() {
            schema.iter_names().map(|s| s.to_string()).collect()
        } else {
            let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
            let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
            let rest: Vec<String> = all_names
                .into_iter()
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            partition_columns.iter().cloned().chain(rest).collect()
        };

        // Reorder in the plan itself only when there is something to move.
        if !partition_columns.is_empty() {
            let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
            lf = lf.select(exprs);
        }

        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        state.partition_columns = if partition_columns.is_empty() {
            None
        } else {
            Some(partition_columns)
        };
        // Ensure display order is partition-first (Self::new uses schema order; be explicit).
        state.set_column_order(new_order);
        Ok(state)
    }
1339
    /// Enable or disable the row-number display column.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }
1343
    /// Flip the row-number display column on/off.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }
1347
    /// Row number display start (0 or 1); used by go-to-line to interpret user input.
    /// Set at load time via the `row_start_index` parameter of the `from_*` constructors.
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1352
1353    /// Decompress a compressed file to a temp file for lazy CSV scan.
1354    fn decompress_compressed_csv_to_temp(
1355        path: &Path,
1356        compression: CompressionFormat,
1357        temp_dir: &Path,
1358    ) -> Result<NamedTempFile> {
1359        let mut temp = NamedTempFile::new_in(temp_dir)?;
1360        let out = temp.as_file_mut();
1361        let mut reader: Box<dyn Read> = match compression {
1362            CompressionFormat::Gzip => {
1363                let f = File::open(path)?;
1364                Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1365            }
1366            CompressionFormat::Zstd => {
1367                let f = File::open(path)?;
1368                Box::new(zstd::Decoder::new(BufReader::new(f))?)
1369            }
1370            CompressionFormat::Bzip2 => {
1371                let f = File::open(path)?;
1372                Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1373            }
1374            CompressionFormat::Xz => {
1375                let f = File::open(path)?;
1376                Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1377            }
1378        };
1379        std::io::copy(&mut reader, out)?;
1380        out.sync_all()?;
1381        Ok(temp)
1382    }
1383
1384    /// Parse null value specs: "VAL" -> global, "COL=VAL" -> per-column (first '=' separates).
1385    fn parse_null_value_specs(specs: &[String]) -> (Vec<String>, Vec<(String, String)>) {
1386        let mut global = Vec::new();
1387        let mut per_column = Vec::new();
1388        for s in specs {
1389            if let Some(i) = s.find('=') {
1390                let (col, val) = (s[..i].to_string(), s[i + 1..].to_string());
1391                per_column.push((col, val));
1392            } else {
1393                global.push(s.clone());
1394            }
1395        }
1396        (global, per_column)
1397    }
1398
1399    /// Build Polars NullValues from parsed specs. When both global and per_column are set, schema is required (caller does schema scan).
1400    fn build_polars_null_values(
1401        global: &[String],
1402        per_column: &[(String, String)],
1403        schema: Option<&Schema>,
1404    ) -> Option<NullValues> {
1405        if global.is_empty() && per_column.is_empty() {
1406            return None;
1407        }
1408        if per_column.is_empty() {
1409            let vals: Vec<PlSmallStr> = global
1410                .iter()
1411                .map(|s| PlSmallStr::from(s.as_str()))
1412                .collect();
1413            return Some(if vals.len() == 1 {
1414                NullValues::AllColumnsSingle(vals[0].clone())
1415            } else {
1416                NullValues::AllColumns(vals)
1417            });
1418        }
1419        if global.is_empty() {
1420            let pairs: Vec<(PlSmallStr, PlSmallStr)> = per_column
1421                .iter()
1422                .map(|(c, v)| (PlSmallStr::from(c.as_str()), PlSmallStr::from(v.as_str())))
1423                .collect();
1424            return Some(NullValues::Named(pairs));
1425        }
1426        let schema = schema?;
1427        let mut pairs: Vec<(PlSmallStr, PlSmallStr)> = Vec::new();
1428        let first_global = PlSmallStr::from(global[0].as_str());
1429        for (name, _) in schema.iter() {
1430            let col_name = name.as_str();
1431            let val = per_column
1432                .iter()
1433                .rev()
1434                .find(|(c, _)| c == col_name)
1435                .map(|(_, v)| PlSmallStr::from(v.as_str()))
1436                .unwrap_or_else(|| first_global.clone());
1437            pairs.push((PlSmallStr::from(col_name), val));
1438        }
1439        Some(NullValues::Named(pairs))
1440    }
1441
1442    /// Infer CSV schema with minimal read (one row) for building null_values when both global and per-column are set.
1443    fn csv_schema_for_null_values(path: &Path, options: &OpenOptions) -> Result<Arc<Schema>> {
1444        let pl_path = PlPath::Local(Arc::from(path));
1445        let mut reader = LazyCsvReader::new(pl_path).with_n_rows(Some(1));
1446        if let Some(skip_lines) = options.skip_lines {
1447            reader = reader.with_skip_lines(skip_lines);
1448        }
1449        if let Some(skip_rows) = options.skip_rows {
1450            reader = reader.with_skip_rows(skip_rows);
1451        }
1452        if let Some(has_header) = options.has_header {
1453            reader = reader.with_has_header(has_header);
1454        }
1455        reader = reader.with_try_parse_dates(options.csv_try_parse_dates());
1456        let mut lf = reader.finish()?;
1457        lf.collect_schema().map_err(color_eyre::eyre::Report::from)
1458    }
1459
1460    /// Build Polars NullValues from options; path_for_schema required when both global and per-column specs are set.
1461    fn build_null_values_for_csv(
1462        options: &OpenOptions,
1463        path_for_schema: Option<&Path>,
1464    ) -> Result<Option<NullValues>> {
1465        let specs = match &options.null_values {
1466            None => return Ok(None),
1467            Some(s) if s.is_empty() => return Ok(None),
1468            Some(s) => s.as_slice(),
1469        };
1470        let (global, per_column) = Self::parse_null_value_specs(specs);
1471        let nv = if !global.is_empty() && !per_column.is_empty() {
1472            let path = path_for_schema.ok_or_else(|| {
1473                color_eyre::eyre::eyre!(
1474                    "Internal error: path required for null_values with both global and per-column"
1475                )
1476            })?;
1477            let schema = Self::csv_schema_for_null_values(path, options)?;
1478            Self::build_polars_null_values(&global, &per_column, Some(schema.as_ref()))
1479        } else {
1480            Self::build_polars_null_values(&global, &per_column, None)
1481        };
1482        Ok(nv)
1483    }
1484
1485    /// Trim leading/trailing whitespace from CSV column names. Applied whenever we have a CSV LazyFrame.
1486    fn trim_csv_column_names(mut lf: LazyFrame) -> Result<LazyFrame> {
1487        let schema = lf.collect_schema()?;
1488        let names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
1489        let trimmed: Vec<String> = names.iter().map(|s| s.trim().to_string()).collect();
1490        if names == trimmed {
1491            return Ok(lf);
1492        }
1493        Ok(lf.rename(
1494            names.iter().map(|s| s.as_str()),
1495            trimmed.iter().map(|s| s.as_str()),
1496            false,
1497        ))
1498    }
1499
1500    /// If options.skip_tail_rows is set, run a count query and slice the LazyFrame to drop that many rows from the end. Used for CSV with trailing garbage/footer.
1501    fn apply_skip_tail_rows_csv(lf: LazyFrame, options: &OpenOptions) -> Result<LazyFrame> {
1502        let n = match options.skip_tail_rows {
1503            None | Some(0) => return Ok(lf),
1504            Some(n) => n,
1505        };
1506        let count_df = collect_lazy(lf.clone().select([len()]), options.polars_streaming)
1507            .map_err(color_eyre::eyre::Report::from)?;
1508        let total: u32 = if let Some(col) = count_df.get(0) {
1509            match col.first() {
1510                Some(AnyValue::UInt32(v)) => *v,
1511                _ => return Ok(lf),
1512            }
1513        } else {
1514            return Ok(lf);
1515        };
1516        let keep = total.saturating_sub(n as u32);
1517        Ok(lf.slice(0, keep))
1518    }
1519
1520    /// Try to detect a date format from a sample string (first format that parses).
1521    /// Returns None if no format matches, so we can avoid passing format: None to Polars (which can error).
1522    fn infer_date_format_from_sample(sample: &str) -> Option<&'static str> {
1523        const DATE_FMTS: &[&str] = &[
1524            "%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y%m%d", "%d-%m-%Y", "%d/%m/%Y", "%d.%m.%Y",
1525            "%m-%d-%Y", "%m/%d/%Y",
1526        ];
1527        DATE_FMTS
1528            .iter()
1529            .find(|fmt| NaiveDate::parse_from_str(sample, fmt).is_ok())
1530            .copied()
1531    }
1532
1533    /// Try to detect a datetime format from a sample string.
1534    fn infer_datetime_format_from_sample(sample: &str) -> Option<&'static str> {
1535        const DATETIME_FMTS: &[&str] = &[
1536            "%Y-%m-%dT%H:%M:%S%.f",
1537            "%Y-%m-%dT%H:%M:%S",
1538            "%Y-%m-%d %H:%M:%S%.f",
1539            "%Y-%m-%d %H:%M:%S",
1540            "%Y-%m-%d %H:%M",
1541            "%Y-%m-%d",
1542            "%d-%m-%YT%H:%M:%S%.f",
1543            "%d-%m-%YT%H:%M:%S",
1544            "%d-%m-%Y %H:%M:%S%.f",
1545            "%d-%m-%Y %H:%M:%S",
1546            "%d/%m/%YT%H:%M:%S%.f",
1547            "%d/%m/%YT%H:%M:%S",
1548            "%d/%m/%Y %H:%M:%S",
1549            "%Y%m%dT%H%M%S%.f",
1550            "%Y%m%d %H%M%S",
1551        ];
1552        DATETIME_FMTS
1553            .iter()
1554            .find(|fmt| NaiveDateTime::parse_from_str(sample, fmt).is_ok())
1555            .copied()
1556    }
1557
1558    /// Parse a string ChunkedArray into a Duration ChunkedArray (nanoseconds). Uses Polars duration
1559    /// format (e.g. `1d`, `2h30m`, `-1w2d`). Invalid or null inputs become null in the output.
1560    fn string_chunked_to_duration_ns(str_ca: &StringChunked) -> DurationChunked {
1561        let name = str_ca.name().clone();
1562        let vals: Vec<Option<i64>> = str_ca
1563            .iter()
1564            .map(|opt_s| {
1565                opt_s.and_then(|s| {
1566                    polars::time::Duration::try_parse(s)
1567                        .ok()
1568                        .map(|d| d.duration_ns())
1569                })
1570            })
1571            .collect();
1572        let int_ca = Int64Chunked::from_iter_options(name, vals.into_iter());
1573        int_ca.into_duration(TimeUnit::Nanoseconds)
1574    }
1575
1576    /// Try to detect a time format from a sample string (HH:MM:SS, HH:MM, with optional fractional seconds).
1577    fn infer_time_format_from_sample(sample: &str) -> Option<&'static str> {
1578        const TIME_FMTS: &[&str] = &[
1579            "%H:%M:%S%.9f",
1580            "%H:%M:%S%.6f",
1581            "%H:%M:%S%.3f",
1582            "%H:%M:%S",
1583            "%H:%M",
1584        ];
1585        TIME_FMTS
1586            .iter()
1587            .find(|fmt| NaiveTime::parse_from_str(sample, fmt).is_ok())
1588            .copied()
1589    }
1590
    /// Apply trim and type inference to CSV string columns when --parse-strings is enabled.
    /// Samples up to `options.parse_strings_sample_rows` rows to infer types, then overlays lazy exprs (trim then cast) on the LazyFrame.
    ///
    /// Inference is per column and conservative: a candidate type is accepted only
    /// if casting the (trimmed, blank-as-null) sample introduces no new nulls.
    /// Candidate order: Date → Datetime → Time → Duration → Int64 → Float64 → String.
    /// The resulting expressions run lazily over the full frame, not just the sample.
    fn apply_parse_strings_to_csv_lazyframe(
        lf: LazyFrame,
        options: &OpenOptions,
    ) -> Result<LazyFrame> {
        // No-op unless --parse-strings was requested.
        let target = match &options.parse_strings {
            None => return Ok(lf),
            Some(t) => t,
        };
        let sample_rows = options.parse_strings_sample_rows;
        // First sample collection is only used to discover which columns are strings.
        let sample_df = lf.clone().limit(sample_rows as u32).collect()?;
        let schema = sample_df.schema();
        // Only String-typed columns are candidates for re-parsing.
        let string_cols: Vec<String> = schema
            .iter()
            .filter(|(_name, dtype)| **dtype == DataType::String)
            .map(|(name, _)| name.to_string())
            .collect();
        let target_cols: Vec<String> = match target {
            ParseStringsTarget::All => string_cols,
            // An explicit column list is restricted to columns that are actually strings.
            ParseStringsTarget::Columns(c) => c
                .iter()
                .filter(|name| string_cols.contains(name))
                .cloned()
                .collect(),
        };
        if target_cols.is_empty() {
            return Ok(lf);
        }
        use polars::datatypes::TimeUnit;
        let whitespace_pat = lit(PlSmallStr::from_static(" \t\n\r"));
        // Re-collect sample with values trimmed so inference sees "1" not " 1 "
        let trim_sample_exprs: Vec<Expr> = target_cols
            .iter()
            .map(|c| {
                col(PlSmallStr::from(c.as_str()))
                    .str()
                    .strip_chars(whitespace_pat.clone())
                    .alias(PlSmallStr::from(c.as_str()))
            })
            .collect();
        // Treat blank (empty string after trim) as null so "all null" and accept_type use normalized semantics.
        let blank_to_null_exprs: Vec<Expr> = target_cols
            .iter()
            .map(|c| {
                let name = PlSmallStr::from(c.as_str());
                when(col(name.clone()).eq(lit(PlSmallStr::from_static(""))))
                    .then(Null {}.lit())
                    .otherwise(col(name.clone()))
                    .alias(name)
            })
            .collect();
        let sample_df = lf
            .clone()
            .limit(sample_rows as u32)
            .with_columns(trim_sample_exprs)
            .with_columns(blank_to_null_exprs)
            .collect()?;
        let mut exprs = Vec::with_capacity(target_cols.len());
        for col_name in &target_cols {
            let s = sample_df.column(col_name.as_str())?;
            let null_before = s.null_count();
            let len = s.len();
            // Accept type if we didn't introduce new nulls (null_after <= null_before).
            let accept_type = |null_after: usize| null_after <= null_before;
            // Inference order: Date → Datetime → Time → Duration → Int64 → Float64 → String.
            enum InferredType {
                Date,
                Datetime,
                Time,
                Duration,
                Int64,
                Float64,
                String,
            }
            let (inferred, date_fmt, datetime_fmt, time_fmt) = if null_before == len {
                // Column is all null (including blanks treated as null): leave as string.
                (InferredType::String, None, None, None)
            } else {
                match s.str() {
                    Err(_) => (InferredType::String, None, None, None),
                    Ok(str_ca) => {
                        // First non-empty sample value, used only to pick an explicit strptime format.
                        let first_val: Option<&str> = str_ca
                            .iter()
                            .find_map(|o: Option<&str>| o.filter(|s: &&str| !s.is_empty()));
                        let (mut t, mut date_fmt, mut datetime_fmt, mut time_fmt) = match str_ca
                            .as_date(None, true)
                        {
                            Ok(as_date) if accept_type(as_date.null_count()) => {
                                let fmt = first_val.and_then(Self::infer_date_format_from_sample);
                                // Only accept Date when we can also name a concrete format.
                                if fmt.is_some() {
                                    (InferredType::Date, fmt.map(String::from), None, None)
                                } else {
                                    (InferredType::String, None, None, None)
                                }
                            }
                            _ => (InferredType::String, None, None, None),
                        };
                        if matches!(t, InferredType::String) {
                            // as_datetime wants a per-row ambiguity policy column; use "raise" for every row.
                            let amb_name: &str = str_ca.name().as_ref();
                            let amb_series = Series::new(
                                PlSmallStr::from(amb_name),
                                vec!["raise"; str_ca.len()],
                            );
                            let amb_ca =
                                amb_series.str().map_err(color_eyre::eyre::Report::from)?;
                            (t, date_fmt, datetime_fmt, time_fmt) = match str_ca.as_datetime(
                                None,
                                TimeUnit::Microseconds,
                                true,
                                false,
                                None,
                                amb_ca,
                            ) {
                                Ok(as_dt) if accept_type(as_dt.null_count()) => {
                                    let fmt =
                                        first_val.and_then(Self::infer_datetime_format_from_sample);
                                    if fmt.is_some() {
                                        (InferredType::Datetime, None, fmt.map(String::from), None)
                                    } else {
                                        (InferredType::String, None, None, None)
                                    }
                                }
                                _ => (InferredType::String, None, None, None),
                            };
                        }
                        if matches!(t, InferredType::String) {
                            (t, date_fmt, datetime_fmt, time_fmt) = match str_ca.as_time(None, true)
                            {
                                Ok(as_time) if accept_type(as_time.null_count()) => {
                                    let fmt =
                                        first_val.and_then(Self::infer_time_format_from_sample);
                                    if fmt.is_some() {
                                        (InferredType::Time, None, None, fmt.map(String::from))
                                    } else {
                                        (InferredType::String, None, None, None)
                                    }
                                }
                                _ => (InferredType::String, None, None, None),
                            };
                        }
                        if matches!(t, InferredType::String) {
                            // Duration has no strptime path; parse the whole sample eagerly.
                            let duration_ca = Self::string_chunked_to_duration_ns(str_ca);
                            (t, date_fmt, datetime_fmt, time_fmt) =
                                if accept_type(duration_ca.null_count()) {
                                    (InferredType::Duration, None, None, None)
                                } else {
                                    (InferredType::String, None, None, None)
                                };
                        }
                        if matches!(t, InferredType::String) {
                            (t, date_fmt, datetime_fmt, time_fmt) =
                                match s.strict_cast(&DataType::Int64) {
                                    Ok(as_int) if accept_type(as_int.null_count()) => {
                                        (InferredType::Int64, None, None, None)
                                    }
                                    _ => (InferredType::String, None, None, None),
                                };
                        }
                        if matches!(t, InferredType::String) {
                            (t, date_fmt, datetime_fmt, time_fmt) =
                                match s.strict_cast(&DataType::Float64) {
                                    Ok(as_float) if accept_type(as_float.null_count()) => {
                                        (InferredType::Float64, None, None, None)
                                    }
                                    _ => (InferredType::String, None, None, None),
                                };
                        }
                        (t, date_fmt, datetime_fmt, time_fmt)
                    }
                }
            };
            // Expression applied lazily to the full frame: trim, then cast per inferred type.
            let base = col(PlSmallStr::from(col_name.as_str()))
                .str()
                .strip_chars(whitespace_pat.clone());
            // Treat blank as null in the applied pipeline so blanks become null in the result.
            let base_with_nulls = when(base.clone().eq(lit(PlSmallStr::from_static(""))))
                .then(Null {}.lit())
                .otherwise(base.clone());
            let expr = match inferred {
                InferredType::Date => {
                    let opts = StrptimeOptions {
                        format: date_fmt.as_deref().map(PlSmallStr::from),
                        strict: false,
                        exact: false,
                        cache: true,
                    };
                    base_with_nulls
                        .clone()
                        .str()
                        .to_date(opts)
                        .alias(PlSmallStr::from(col_name.as_str()))
                }
                InferredType::Datetime => {
                    let opts = StrptimeOptions {
                        format: datetime_fmt.as_deref().map(PlSmallStr::from),
                        strict: false,
                        exact: false,
                        cache: true,
                    };
                    base_with_nulls
                        .clone()
                        .str()
                        .to_datetime(
                            Some(TimeUnit::Microseconds),
                            None,
                            opts,
                            lit(PlSmallStr::from_static("raise")),
                        )
                        .alias(PlSmallStr::from(col_name.as_str()))
                }
                InferredType::Time => {
                    let opts = StrptimeOptions {
                        format: time_fmt.as_deref().map(PlSmallStr::from),
                        strict: false,
                        exact: true,
                        cache: true,
                    };
                    base_with_nulls
                        .clone()
                        .str()
                        .to_time(opts)
                        .alias(PlSmallStr::from(col_name.as_str()))
                }
                // No strptime for Duration in Polars; parse via map using Duration::try_parse.
                InferredType::Duration => base_with_nulls
                    .clone()
                    .map(
                        |c: Column| {
                            let str_ca = c.str()?;
                            let duration_ca = Self::string_chunked_to_duration_ns(str_ca);
                            Ok(duration_ca.into_column())
                        },
                        |_schema: &Schema, field: &Field| {
                            Ok(Field::new(
                                field.name().clone(),
                                DataType::Duration(TimeUnit::Nanoseconds),
                            ))
                        },
                    )
                    .alias(PlSmallStr::from(col_name.as_str())),
                InferredType::Int64 => base_with_nulls
                    .clone()
                    .cast(DataType::Int64)
                    .alias(PlSmallStr::from(col_name.as_str())),
                InferredType::Float64 => base_with_nulls
                    .cast(DataType::Float64)
                    .alias(PlSmallStr::from(col_name.as_str())),
                InferredType::String => base.alias(PlSmallStr::from(col_name.as_str())),
            };
            exprs.push(expr);
        }
        Ok(lf.with_columns(exprs))
    }
1845
1846    pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1847        let nv = Self::build_null_values_for_csv(options, Some(path))?;
1848
1849        // Determine compression format: explicit option, or auto-detect from extension
1850        let compression = options
1851            .compression
1852            .or_else(|| CompressionFormat::from_extension(path));
1853
1854        if let Some(compression) = compression {
1855            if options.decompress_in_memory {
1856                // Eager read: decompress into memory, then CSV read
1857                match compression {
1858                    CompressionFormat::Gzip | CompressionFormat::Zstd => {
1859                        let mut read_options = CsvReadOptions::default();
1860                        if let Some(skip_lines) = options.skip_lines {
1861                            read_options.skip_lines = skip_lines;
1862                        }
1863                        if let Some(skip_rows) = options.skip_rows {
1864                            read_options.skip_rows = skip_rows;
1865                        }
1866                        if let Some(has_header) = options.has_header {
1867                            read_options.has_header = has_header;
1868                        }
1869                        if let Some(n) = options.infer_schema_length {
1870                            read_options.infer_schema_length = Some(n);
1871                        }
1872                        read_options.ignore_errors = options.ignore_errors;
1873                        read_options = read_options.map_parse_options(|opts| {
1874                            let o = opts.with_try_parse_dates(options.csv_try_parse_dates());
1875                            match &nv {
1876                                Some(n) => o.with_null_values(Some(n.clone())),
1877                                None => o,
1878                            }
1879                        });
1880                        let df = read_options
1881                            .try_into_reader_with_file_path(Some(path.into()))?
1882                            .finish()?;
1883                        let mut lf = Self::trim_csv_column_names(df.lazy())?;
1884                        lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
1885                        lf = Self::apply_skip_tail_rows_csv(lf, options)?;
1886                        let mut state = Self::new(
1887                            lf,
1888                            options.pages_lookahead,
1889                            options.pages_lookback,
1890                            options.max_buffered_rows,
1891                            options.max_buffered_mb,
1892                            options.polars_streaming,
1893                        )?;
1894                        state.row_numbers = options.row_numbers;
1895                        state.row_start_index = options.row_start_index;
1896                        Ok(state)
1897                    }
1898                    CompressionFormat::Bzip2 => {
1899                        let file = File::open(path)?;
1900                        let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1901                        let mut decompressed = Vec::new();
1902                        decoder.read_to_end(&mut decompressed)?;
1903                        let mut read_options = CsvReadOptions::default();
1904                        if let Some(skip_lines) = options.skip_lines {
1905                            read_options.skip_lines = skip_lines;
1906                        }
1907                        if let Some(skip_rows) = options.skip_rows {
1908                            read_options.skip_rows = skip_rows;
1909                        }
1910                        if let Some(has_header) = options.has_header {
1911                            read_options.has_header = has_header;
1912                        }
1913                        if let Some(n) = options.infer_schema_length {
1914                            read_options.infer_schema_length = Some(n);
1915                        }
1916                        read_options.ignore_errors = options.ignore_errors;
1917                        read_options = read_options.map_parse_options(|opts| {
1918                            let o = opts.with_try_parse_dates(options.csv_try_parse_dates());
1919                            match &nv {
1920                                Some(n) => o.with_null_values(Some(n.clone())),
1921                                None => o,
1922                            }
1923                        });
1924                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1925                            .with_options(read_options)
1926                            .finish()?;
1927                        let mut lf = Self::trim_csv_column_names(df.lazy())?;
1928                        lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
1929                        lf = Self::apply_skip_tail_rows_csv(lf, options)?;
1930                        let mut state = Self::new(
1931                            lf,
1932                            options.pages_lookahead,
1933                            options.pages_lookback,
1934                            options.max_buffered_rows,
1935                            options.max_buffered_mb,
1936                            options.polars_streaming,
1937                        )?;
1938                        state.row_numbers = options.row_numbers;
1939                        state.row_start_index = options.row_start_index;
1940                        Ok(state)
1941                    }
1942                    CompressionFormat::Xz => {
1943                        let file = File::open(path)?;
1944                        let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1945                        let mut decompressed = Vec::new();
1946                        decoder.read_to_end(&mut decompressed)?;
1947                        let mut read_options = CsvReadOptions::default();
1948                        if let Some(skip_lines) = options.skip_lines {
1949                            read_options.skip_lines = skip_lines;
1950                        }
1951                        if let Some(skip_rows) = options.skip_rows {
1952                            read_options.skip_rows = skip_rows;
1953                        }
1954                        if let Some(has_header) = options.has_header {
1955                            read_options.has_header = has_header;
1956                        }
1957                        if let Some(n) = options.infer_schema_length {
1958                            read_options.infer_schema_length = Some(n);
1959                        }
1960                        read_options.ignore_errors = options.ignore_errors;
1961                        read_options = read_options.map_parse_options(|opts| {
1962                            let o = opts.with_try_parse_dates(options.csv_try_parse_dates());
1963                            match &nv {
1964                                Some(n) => o.with_null_values(Some(n.clone())),
1965                                None => o,
1966                            }
1967                        });
1968                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1969                            .with_options(read_options)
1970                            .finish()?;
1971                        let mut lf = Self::trim_csv_column_names(df.lazy())?;
1972                        lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
1973                        lf = Self::apply_skip_tail_rows_csv(lf, options)?;
1974                        let mut state = Self::new(
1975                            lf,
1976                            options.pages_lookahead,
1977                            options.pages_lookback,
1978                            options.max_buffered_rows,
1979                            options.max_buffered_mb,
1980                            options.polars_streaming,
1981                        )?;
1982                        state.row_numbers = options.row_numbers;
1983                        state.row_start_index = options.row_start_index;
1984                        Ok(state)
1985                    }
1986                }
1987            } else {
1988                // Decompress to temp file, then lazy scan
1989                let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1990                let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1991                let nv_temp = Self::build_null_values_for_csv(options, Some(temp.path()))?;
1992                let mut state = Self::from_csv_customize(
1993                    temp.path(),
1994                    options.pages_lookahead,
1995                    options.pages_lookback,
1996                    options.max_buffered_rows,
1997                    options.max_buffered_mb,
1998                    |mut reader| {
1999                        if let Some(skip_lines) = options.skip_lines {
2000                            reader = reader.with_skip_lines(skip_lines);
2001                        }
2002                        if let Some(skip_rows) = options.skip_rows {
2003                            reader = reader.with_skip_rows(skip_rows);
2004                        }
2005                        if let Some(has_header) = options.has_header {
2006                            reader = reader.with_has_header(has_header);
2007                        }
2008                        if let Some(n) = options.infer_schema_length {
2009                            reader = reader.with_infer_schema_length(Some(n));
2010                        }
2011                        reader = reader.with_ignore_errors(options.ignore_errors);
2012                        reader = reader.with_try_parse_dates(options.csv_try_parse_dates());
2013                        reader = match &nv_temp {
2014                            Some(n) => reader
2015                                .map_parse_options(|opts| opts.with_null_values(Some(n.clone()))),
2016                            None => reader,
2017                        };
2018                        reader
2019                    },
2020                )?;
2021                let mut lf = Self::trim_csv_column_names(std::mem::take(&mut state.lf))?;
2022                state.original_lf = lf.clone();
2023                state.schema = lf.clone().collect_schema()?;
2024                state.lf = lf.clone();
2025                if options.parse_strings.is_some() {
2026                    lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
2027                    state.original_lf = lf.clone();
2028                    state.schema = lf.clone().collect_schema()?;
2029                    state.lf = lf.clone();
2030                }
2031                lf = Self::apply_skip_tail_rows_csv(lf, options)?;
2032                state.original_lf = lf.clone();
2033                state.schema = lf.clone().collect_schema()?;
2034                state.lf = lf;
2035                state.row_numbers = options.row_numbers;
2036                state.row_start_index = options.row_start_index;
2037                state.decompress_temp_file = Some(temp);
2038                Ok(state)
2039            }
2040        } else {
2041            // For uncompressed files, use lazy scanning (more efficient)
2042            let mut state = Self::from_csv_customize(
2043                path,
2044                options.pages_lookahead,
2045                options.pages_lookback,
2046                options.max_buffered_rows,
2047                options.max_buffered_mb,
2048                |mut reader| {
2049                    if let Some(skip_lines) = options.skip_lines {
2050                        reader = reader.with_skip_lines(skip_lines);
2051                    }
2052                    if let Some(skip_rows) = options.skip_rows {
2053                        reader = reader.with_skip_rows(skip_rows);
2054                    }
2055                    if let Some(has_header) = options.has_header {
2056                        reader = reader.with_has_header(has_header);
2057                    }
2058                    if let Some(n) = options.infer_schema_length {
2059                        reader = reader.with_infer_schema_length(Some(n));
2060                    }
2061                    reader = reader.with_ignore_errors(options.ignore_errors);
2062                    reader = reader.with_try_parse_dates(options.csv_try_parse_dates());
2063                    reader = match &nv {
2064                        Some(n) => {
2065                            reader.map_parse_options(|opts| opts.with_null_values(Some(n.clone())))
2066                        }
2067                        None => reader,
2068                    };
2069                    reader
2070                },
2071            )?;
2072            let mut lf = Self::trim_csv_column_names(std::mem::take(&mut state.lf))?;
2073            state.original_lf = lf.clone();
2074            state.schema = lf.clone().collect_schema()?;
2075            state.lf = lf.clone();
2076            if options.parse_strings.is_some() {
2077                lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
2078                state.original_lf = lf.clone();
2079                state.schema = lf.clone().collect_schema()?;
2080                state.lf = lf.clone();
2081            }
2082            lf = Self::apply_skip_tail_rows_csv(lf, options)?;
2083            state.original_lf = lf.clone();
2084            state.schema = lf.clone().collect_schema()?;
2085            state.lf = lf;
2086            state.row_numbers = options.row_numbers;
2087            Ok(state)
2088        }
2089    }
2090
2091    pub fn from_csv_customize<F>(
2092        path: &Path,
2093        pages_lookahead: Option<usize>,
2094        pages_lookback: Option<usize>,
2095        max_buffered_rows: Option<usize>,
2096        max_buffered_mb: Option<usize>,
2097        func: F,
2098    ) -> Result<Self>
2099    where
2100        F: FnOnce(LazyCsvReader) -> LazyCsvReader,
2101    {
2102        let pl_path = PlPath::Local(Arc::from(path));
2103        let reader = LazyCsvReader::new(pl_path);
2104        let lf = func(reader).finish()?;
2105        Self::new(
2106            lf,
2107            pages_lookahead,
2108            pages_lookback,
2109            max_buffered_rows,
2110            max_buffered_mb,
2111            true,
2112        )
2113    }
2114
2115    /// Load multiple CSV files (uncompressed) and concatenate into one LazyFrame.
2116    pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
2117        if paths.is_empty() {
2118            return Err(color_eyre::eyre::eyre!("No paths provided"));
2119        }
2120        if paths.len() == 1 {
2121            return Self::from_csv(paths[0].as_ref(), options);
2122        }
2123        let nv = Self::build_null_values_for_csv(options, Some(paths[0].as_ref()))?;
2124        let mut lazy_frames = Vec::with_capacity(paths.len());
2125        for p in paths {
2126            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
2127            let mut reader = LazyCsvReader::new(pl_path);
2128            if let Some(skip_lines) = options.skip_lines {
2129                reader = reader.with_skip_lines(skip_lines);
2130            }
2131            if let Some(skip_rows) = options.skip_rows {
2132                reader = reader.with_skip_rows(skip_rows);
2133            }
2134            if let Some(has_header) = options.has_header {
2135                reader = reader.with_has_header(has_header);
2136            }
2137            if let Some(n) = options.infer_schema_length {
2138                reader = reader.with_infer_schema_length(Some(n));
2139            }
2140            reader = reader.with_ignore_errors(options.ignore_errors);
2141            reader = reader.with_try_parse_dates(options.csv_try_parse_dates());
2142            reader = match &nv {
2143                Some(n) => reader.map_parse_options(|opts| opts.with_null_values(Some(n.clone()))),
2144                None => reader,
2145            };
2146            let lf = reader.finish()?;
2147            lazy_frames.push(lf);
2148        }
2149        let mut lf = Self::trim_csv_column_names(polars::prelude::concat(
2150            lazy_frames.as_slice(),
2151            Default::default(),
2152        )?)?;
2153        lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
2154        lf = Self::apply_skip_tail_rows_csv(lf, options)?;
2155        let mut state = Self::new(
2156            lf,
2157            options.pages_lookahead,
2158            options.pages_lookback,
2159            options.max_buffered_rows,
2160            options.max_buffered_mb,
2161            options.polars_streaming,
2162        )?;
2163        state.row_numbers = options.row_numbers;
2164        state.row_start_index = options.row_start_index;
2165        Ok(state)
2166    }
2167
2168    pub fn from_ndjson(
2169        path: &Path,
2170        pages_lookahead: Option<usize>,
2171        pages_lookback: Option<usize>,
2172        max_buffered_rows: Option<usize>,
2173        max_buffered_mb: Option<usize>,
2174        row_numbers: bool,
2175        row_start_index: usize,
2176    ) -> Result<Self> {
2177        let pl_path = PlPath::Local(Arc::from(path));
2178        let lf = LazyJsonLineReader::new(pl_path).finish()?;
2179        let mut state = Self::new(
2180            lf,
2181            pages_lookahead,
2182            pages_lookback,
2183            max_buffered_rows,
2184            max_buffered_mb,
2185            true,
2186        )?;
2187        state.row_numbers = row_numbers;
2188        state.row_start_index = row_start_index;
2189        Ok(state)
2190    }
2191
2192    /// Load multiple NDJSON files and concatenate into one LazyFrame.
2193    pub fn from_ndjson_paths(
2194        paths: &[impl AsRef<Path>],
2195        pages_lookahead: Option<usize>,
2196        pages_lookback: Option<usize>,
2197        max_buffered_rows: Option<usize>,
2198        max_buffered_mb: Option<usize>,
2199        row_numbers: bool,
2200        row_start_index: usize,
2201    ) -> Result<Self> {
2202        if paths.is_empty() {
2203            return Err(color_eyre::eyre::eyre!("No paths provided"));
2204        }
2205        if paths.len() == 1 {
2206            return Self::from_ndjson(
2207                paths[0].as_ref(),
2208                pages_lookahead,
2209                pages_lookback,
2210                max_buffered_rows,
2211                max_buffered_mb,
2212                row_numbers,
2213                row_start_index,
2214            );
2215        }
2216        let mut lazy_frames = Vec::with_capacity(paths.len());
2217        for p in paths {
2218            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
2219            let lf = LazyJsonLineReader::new(pl_path).finish()?;
2220            lazy_frames.push(lf);
2221        }
2222        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
2223        let mut state = Self::new(
2224            lf,
2225            pages_lookahead,
2226            pages_lookback,
2227            max_buffered_rows,
2228            max_buffered_mb,
2229            true,
2230        )?;
2231        state.row_numbers = row_numbers;
2232        state.row_start_index = row_start_index;
2233        Ok(state)
2234    }
2235
    /// Load a single JSON file containing one top-level array of records.
    ///
    /// Delegates to `from_json_with_format` with `JsonFormat::Json`;
    /// `row_numbers` / `row_start_index` configure the optional row-number
    /// gutter on the resulting state.
    pub fn from_json(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
2256
    /// Load a single JSON Lines file (one JSON object per line).
    ///
    /// Delegates to `from_json_with_format` with `JsonFormat::JsonLines`;
    /// `row_numbers` / `row_start_index` configure the optional row-number
    /// gutter on the resulting state.
    pub fn from_json_lines(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
2277
2278    #[allow(clippy::too_many_arguments)]
2279    fn from_json_with_format(
2280        path: &Path,
2281        pages_lookahead: Option<usize>,
2282        pages_lookback: Option<usize>,
2283        max_buffered_rows: Option<usize>,
2284        max_buffered_mb: Option<usize>,
2285        row_numbers: bool,
2286        row_start_index: usize,
2287        format: JsonFormat,
2288    ) -> Result<Self> {
2289        let file = File::open(path)?;
2290        let lf = JsonReader::new(file)
2291            .with_json_format(format)
2292            .finish()?
2293            .lazy();
2294        let mut state = Self::new(
2295            lf,
2296            pages_lookahead,
2297            pages_lookback,
2298            max_buffered_rows,
2299            max_buffered_mb,
2300            true,
2301        )?;
2302        state.row_numbers = row_numbers;
2303        state.row_start_index = row_start_index;
2304        Ok(state)
2305    }
2306
    /// Load multiple JSON (array) files and concatenate into one LazyFrame.
    ///
    /// Delegates to `from_json_with_format_paths` with `JsonFormat::Json`.
    pub fn from_json_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
2328
    /// Load multiple JSON Lines files and concatenate into one LazyFrame.
    ///
    /// Delegates to `from_json_with_format_paths` with `JsonFormat::JsonLines`.
    pub fn from_json_lines_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
2350
2351    #[allow(clippy::too_many_arguments)]
2352    fn from_json_with_format_paths(
2353        paths: &[impl AsRef<Path>],
2354        pages_lookahead: Option<usize>,
2355        pages_lookback: Option<usize>,
2356        max_buffered_rows: Option<usize>,
2357        max_buffered_mb: Option<usize>,
2358        row_numbers: bool,
2359        row_start_index: usize,
2360        format: JsonFormat,
2361    ) -> Result<Self> {
2362        if paths.is_empty() {
2363            return Err(color_eyre::eyre::eyre!("No paths provided"));
2364        }
2365        if paths.len() == 1 {
2366            return Self::from_json_with_format(
2367                paths[0].as_ref(),
2368                pages_lookahead,
2369                pages_lookback,
2370                max_buffered_rows,
2371                max_buffered_mb,
2372                row_numbers,
2373                row_start_index,
2374                format,
2375            );
2376        }
2377        let mut lazy_frames = Vec::with_capacity(paths.len());
2378        for p in paths {
2379            let file = File::open(p.as_ref())?;
2380            let lf = match &format {
2381                JsonFormat::Json => JsonReader::new(file)
2382                    .with_json_format(JsonFormat::Json)
2383                    .finish()?
2384                    .lazy(),
2385                JsonFormat::JsonLines => JsonReader::new(file)
2386                    .with_json_format(JsonFormat::JsonLines)
2387                    .finish()?
2388                    .lazy(),
2389            };
2390            lazy_frames.push(lf);
2391        }
2392        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
2393        let mut state = Self::new(
2394            lf,
2395            pages_lookahead,
2396            pages_lookback,
2397            max_buffered_rows,
2398            max_buffered_mb,
2399            true,
2400        )?;
2401        state.row_numbers = row_numbers;
2402        state.row_start_index = row_start_index;
2403        Ok(state)
2404    }
2405
2406    pub fn from_delimited(path: &Path, delimiter: u8, options: &OpenOptions) -> Result<Self> {
2407        let pl_path = PlPath::Local(Arc::from(path));
2408        let mut reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
2409        if let Some(skip_lines) = options.skip_lines {
2410            reader = reader.with_skip_lines(skip_lines);
2411        }
2412        if let Some(skip_rows) = options.skip_rows {
2413            reader = reader.with_skip_rows(skip_rows);
2414        }
2415        if let Some(has_header) = options.has_header {
2416            reader = reader.with_has_header(has_header);
2417        }
2418        let lf = reader.finish()?;
2419        let mut state = Self::new(
2420            lf,
2421            options.pages_lookahead,
2422            options.pages_lookback,
2423            options.max_buffered_rows,
2424            options.max_buffered_mb,
2425            true,
2426        )?;
2427        state.row_numbers = options.row_numbers;
2428        state.row_start_index = options.row_start_index;
2429        Ok(state)
2430    }
2431
2432    /// Returns true if a scroll by `rows` would trigger a collect (view would leave the buffer).
2433    /// Used so the UI only shows the throbber when actual data loading will occur.
2434    pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
2435        if rows < 0 && self.start_row == 0 {
2436            return false;
2437        }
2438        let new_start_row = if self.start_row as i64 + rows <= 0 {
2439            0
2440        } else {
2441            if let Some(df) = self.df.as_ref() {
2442                if rows > 0 && df.shape().0 <= self.visible_rows {
2443                    return false;
2444                }
2445            }
2446            (self.start_row as i64 + rows) as usize
2447        };
2448        let view_end = new_start_row
2449            + self
2450                .visible_rows
2451                .min(self.num_rows.saturating_sub(new_start_row));
2452        let within_buffer = new_start_row >= self.buffered_start_row
2453            && view_end <= self.buffered_end_row
2454            && self.buffered_end_row > 0;
2455        !within_buffer
2456    }
2457
2458    fn slide_table(&mut self, rows: i64) {
2459        if rows < 0 && self.start_row == 0 {
2460            return;
2461        }
2462
2463        let new_start_row = if self.start_row as i64 + rows <= 0 {
2464            0
2465        } else {
2466            if let Some(df) = self.df.as_ref() {
2467                if rows > 0 && df.shape().0 <= self.visible_rows {
2468                    return;
2469                }
2470            }
2471            (self.start_row as i64 + rows) as usize
2472        };
2473
2474        // Call collect() only when view is outside buffer; otherwise just update start_row.
2475        let view_end = new_start_row
2476            + self
2477                .visible_rows
2478                .min(self.num_rows.saturating_sub(new_start_row));
2479        let within_buffer = new_start_row >= self.buffered_start_row
2480            && view_end <= self.buffered_end_row
2481            && self.buffered_end_row > 0;
2482
2483        if within_buffer {
2484            self.start_row = new_start_row;
2485            return;
2486        }
2487
2488        self.start_row = new_start_row;
2489        self.collect();
2490    }
2491
2492    pub fn collect(&mut self) {
2493        // Update proximity threshold based on visible rows
2494        if self.visible_rows > 0 {
2495            self.proximity_threshold = self.visible_rows;
2496        }
2497
2498        // Run len() only when lf has changed (query, filter, sort, pivot, melt, reset, drill).
2499        if !self.num_rows_valid {
2500            self.num_rows =
2501                match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
2502                    Ok(df) => {
2503                        if let Some(col) = df.get(0) {
2504                            if let Some(AnyValue::UInt32(len)) = col.first() {
2505                                *len as usize
2506                            } else {
2507                                0
2508                            }
2509                        } else {
2510                            0
2511                        }
2512                    }
2513                    Err(_) => 0,
2514                };
2515            self.num_rows_valid = true;
2516        }
2517
2518        if self.num_rows > 0 {
2519            let max_start = self.num_rows.saturating_sub(1);
2520            if self.start_row > max_start {
2521                self.start_row = max_start;
2522            }
2523        } else {
2524            self.start_row = 0;
2525            self.buffered_start_row = 0;
2526            self.buffered_end_row = 0;
2527            self.buffered_df = None;
2528            self.df = None;
2529            self.locked_df = None;
2530            return;
2531        }
2532
2533        // Proximity-based buffer logic
2534        let view_start = self.start_row;
2535        let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);
2536
2537        // Check if current view is within buffered range
2538        let within_buffer = view_start >= self.buffered_start_row
2539            && view_end <= self.buffered_end_row
2540            && self.buffered_end_row > 0;
2541
2542        // Buffer grows incrementally: initial load and each expansion add only a few pages (lookahead + lookback).
2543        // clamp_buffer_to_max_size caps at max_buffered_rows and slides the window when at cap.
2544        let page_rows = self.visible_rows.max(1);
2545
2546        if within_buffer {
2547            let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
2548            let dist_to_end = self.buffered_end_row.saturating_sub(view_end);
2549
2550            let needs_expansion_back =
2551                dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
2552            let needs_expansion_forward =
2553                dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;
2554
2555            if !needs_expansion_back && !needs_expansion_forward {
2556                // Column scroll only: reuse cached full buffer and re-slice into locked/scroll columns.
2557                let expected_len = self
2558                    .buffered_end_row
2559                    .saturating_sub(self.buffered_start_row);
2560                if self
2561                    .buffered_df
2562                    .as_ref()
2563                    .is_some_and(|b| b.height() == expected_len)
2564                {
2565                    self.slice_buffer_into_display();
2566                    if self.table_state.selected().is_none() {
2567                        self.table_state.select(Some(0));
2568                    }
2569                    return;
2570                }
2571                self.load_buffer(self.buffered_start_row, self.buffered_end_row);
2572                if self.table_state.selected().is_none() {
2573                    self.table_state.select(Some(0));
2574                }
2575                return;
2576            }
2577
2578            let mut new_buffer_start = if needs_expansion_back {
2579                view_start.saturating_sub(self.pages_lookback * page_rows)
2580            } else {
2581                self.buffered_start_row
2582            };
2583
2584            let mut new_buffer_end = if needs_expansion_forward {
2585                (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
2586            } else {
2587                self.buffered_end_row
2588            };
2589
2590            self.clamp_buffer_to_max_size(
2591                view_start,
2592                view_end,
2593                &mut new_buffer_start,
2594                &mut new_buffer_end,
2595            );
2596            self.load_buffer(new_buffer_start, new_buffer_end);
2597        } else {
2598            // Outside buffer: either extend the previous buffer (so it grows) or load a fresh small window.
2599            // Only extend when the view is "close" to the existing buffer (e.g. user paged down a bit).
2600            // A big jump (e.g. jump to end) should load just a window around the new view, not extend
2601            // the buffer across the whole dataset.
2602            let mut new_buffer_start;
2603            let mut new_buffer_end;
2604
2605            let had_buffer = self.buffered_end_row > 0;
2606            let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
2607            let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;
2608
2609            let extend_forward_ok = scrolled_past_end
2610                && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
2611            let extend_backward_ok = scrolled_past_start
2612                && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;
2613
2614            if extend_forward_ok {
2615                // View is just a few pages past buffer end; extend forward.
2616                new_buffer_start = self.buffered_start_row;
2617                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2618            } else if extend_backward_ok {
2619                // View is just a few pages before buffer start; extend backward.
2620                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2621                new_buffer_end = self.buffered_end_row;
2622            } else if scrolled_past_end || scrolled_past_start {
2623                // Big jump (e.g. jump to end or jump to start): load a fresh window around the view.
2624                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2625                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2626                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2627                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2628                if current_len < min_initial_len {
2629                    let need = min_initial_len.saturating_sub(current_len);
2630                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2631                    let can_extend_start = new_buffer_start;
2632                    if can_extend_end >= need {
2633                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2634                    } else if can_extend_start >= need {
2635                        new_buffer_start = new_buffer_start.saturating_sub(need);
2636                    } else {
2637                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2638                        new_buffer_start =
2639                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2640                    }
2641                }
2642            } else {
2643                // No buffer yet or big jump: load a fresh small window (view ± a few pages).
2644                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2645                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2646
2647                // Ensure at least (1 + lookahead + lookback) pages so buffer size is consistent (e.g. 364 at 52 visible).
2648                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2649                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2650                if current_len < min_initial_len {
2651                    let need = min_initial_len.saturating_sub(current_len);
2652                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2653                    let can_extend_start = new_buffer_start;
2654                    if can_extend_end >= need {
2655                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2656                    } else if can_extend_start >= need {
2657                        new_buffer_start = new_buffer_start.saturating_sub(need);
2658                    } else {
2659                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2660                        new_buffer_start =
2661                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2662                    }
2663                }
2664            }
2665
2666            self.clamp_buffer_to_max_size(
2667                view_start,
2668                view_end,
2669                &mut new_buffer_start,
2670                &mut new_buffer_end,
2671            );
2672            self.load_buffer(new_buffer_start, new_buffer_end);
2673        }
2674
2675        self.slice_from_buffer();
2676        if self.table_state.selected().is_none() {
2677            self.table_state.select(Some(0));
2678        }
2679    }
2680
    /// Invalidate num_rows cache when lf is mutated.
    ///
    /// The next `collect()` will re-run the `len()` query to refresh
    /// `num_rows` before any buffer work.
    fn invalidate_num_rows(&mut self) {
        self.num_rows_valid = false;
    }
2685
2686    /// Returns the cached row count when valid (same value shown in the control bar). Use this to
2687    /// avoid an extra full scan for analysis/describe when the table has already been collected.
2688    pub fn num_rows_if_valid(&self) -> Option<usize> {
2689        if self.num_rows_valid {
2690            Some(self.num_rows)
2691        } else {
2692            None
2693        }
2694    }
2695
    /// Clamp buffer to max_buffered_rows; when at cap, slide window to keep view inside.
    ///
    /// `buffer_start`/`buffer_end` are adjusted in place; a `max_buffered_rows`
    /// of 0 means "no cap". The adjustment order below is deliberate: pad after
    /// the view first, derive the start from the end, then re-derive the end.
    fn clamp_buffer_to_max_size(
        &self,
        view_start: usize,
        view_end: usize,
        buffer_start: &mut usize,
        buffer_end: &mut usize,
    ) {
        if self.max_buffered_rows == 0 {
            // Unlimited buffering: nothing to clamp.
            return;
        }
        let max_len = self.max_buffered_rows;
        let requested_len = buffer_end.saturating_sub(*buffer_start);
        if requested_len <= max_len {
            return;
        }
        let view_len = view_end.saturating_sub(view_start);
        if view_len >= max_len {
            // The view alone is at least as large as the cap: anchor the window
            // at the view start and truncate to max_len.
            *buffer_start = view_start;
            *buffer_end = (view_start + max_len).min(self.num_rows);
        } else {
            // Roughly center the capped window on the view: split the spare
            // capacity, extend past the view end, then derive the start.
            let half = (max_len - view_len) / 2;
            *buffer_end = (view_end + half).min(self.num_rows);
            *buffer_start = (*buffer_end).saturating_sub(max_len);
            // Never let the window start after the view; re-derive the end so
            // the total length stays at most max_len.
            if *buffer_start > view_start {
                *buffer_start = view_start;
            }
            *buffer_end = (*buffer_start + max_len).min(self.num_rows);
        }
    }
2726
    /// Collect rows [buffer_start, buffer_end) from the LazyFrame and split the
    /// result into the locked (pinned-left) and scrollable column panes.
    ///
    /// On success updates `buffered_df`, `buffered_start_row`,
    /// `buffered_end_row`, `locked_df` and `df`, and clears any previous
    /// error. On failure stores the error in `self.error` and returns early,
    /// leaving the previously buffered frames untouched.
    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
        let buffer_size = buffer_end.saturating_sub(buffer_start);
        if buffer_size == 0 {
            return;
        }

        // Select columns in display order so the buffer matches on-screen layout.
        let all_columns: Vec<_> = self
            .column_order
            .iter()
            .map(|name| col(name.as_str()))
            .collect();

        let use_streaming = self.polars_streaming;
        let mut full_df = match collect_lazy(
            self.lf
                .clone()
                .select(all_columns)
                .slice(buffer_start as i64, buffer_size as u32),
            use_streaming,
        ) {
            Ok(df) => df,
            Err(e) => {
                self.error = Some(e);
                return;
            }
        };

        // Enforce the optional memory cap: estimate bytes/row and truncate the
        // collected frame so the buffer stays under max_buffered_mb.
        let mut effective_buffer_end = buffer_end;
        if self.max_buffered_mb > 0 {
            let size = full_df.estimated_size();
            let max_bytes = self.max_buffered_mb * 1024 * 1024;
            if size > max_bytes {
                let rows = full_df.height();
                if rows > 0 {
                    let bytes_per_row = size / rows;
                    // max(1) guards the division below against bytes_per_row == 0.
                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
                    if max_rows < rows {
                        full_df = full_df.slice(0, max_rows);
                        effective_buffer_end = buffer_start + max_rows;
                    }
                }
            }
        }

        // Locked pane: the first locked_columns_count columns in display order.
        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            let locked_df = match full_df.select(locked_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            // Grouped views stringify List columns for display.
            self.locked_df = if self.is_grouped() {
                match self.format_grouped_dataframe(locked_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(locked_df)
            };
        } else {
            self.locked_df = None;
        }

        // Scrollable pane: everything after the locked columns, shifted right by
        // termcol_index (horizontal column scroll position).
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else {
            let scroll_df = match full_df.select(scroll_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.df = if self.is_grouped() {
                match self.format_grouped_dataframe(scroll_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(scroll_df)
            };
        }
        // Reaching this point means the load succeeded: clear any stale error.
        if self.error.is_some() {
            self.error = None;
        }
        self.buffered_start_row = buffer_start;
        self.buffered_end_row = effective_buffer_end;
        self.buffered_df = Some(full_df);
    }
2839
    /// Recompute locked_df and df from the cached full buffer. Used when only termcol_index (or locked columns) changed.
    fn slice_buffer_into_display(&mut self) {
        // Nothing to slice until collect() has populated the buffer.
        let full_df = match self.buffered_df.as_ref() {
            Some(df) => df,
            None => return,
        };

        if self.locked_columns_count > 0 {
            // The first `locked_columns_count` entries of column_order are the pinned columns.
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            // NOTE(review): if select() fails the previous locked_df is kept as-is —
            // presumably a deliberate keep-last-good choice; confirm.
            if let Ok(locked_df) = full_df.select(locked_names) {
                self.locked_df = if self.is_grouped() {
                    // Grouped frames hold List columns; render them as truncated strings.
                    self.format_grouped_dataframe(locked_df).ok()
                } else {
                    Some(locked_df)
                };
            }
        } else {
            self.locked_df = None;
        }

        // Scrollable columns start after the locked ones, offset by the horizontal scroll.
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else if let Ok(scroll_df) = full_df.select(scroll_names) {
            self.df = if self.is_grouped() {
                self.format_grouped_dataframe(scroll_df).ok()
            } else {
                Some(scroll_df)
            };
        }
    }
2881
    /// Intentionally a no-op: the buffer already holds the full collected range
    /// and the visible window is sliced out of it at render time.
    fn slice_from_buffer(&mut self) {
        // Buffer contains the full range [buffered_start_row, buffered_end_row)
        // The displayed portion [start_row, start_row + visible_rows) is a subset
        // We'll slice the displayed portion when rendering based on offset
        // No action needed here - the buffer is stored, slicing happens at render time
    }
2888
    /// Convert List columns (produced by group-by aggregation) into display
    /// strings, leaving all other columns untouched.
    ///
    /// Each list renders as `[v1, v2, ...]` truncated to its first 10 elements;
    /// longer lists get a `...` marker plus an `(N items)` suffix. Null lists
    /// stay null.
    fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
        let schema = df.schema();
        let mut new_series = Vec::new();

        for (col_name, dtype) in schema.iter() {
            let col = df.column(col_name)?;
            if matches!(dtype, DataType::List(_)) {
                // Map each row's list (or null) to its display string.
                let string_series: Series = col
                    .list()?
                    .into_iter()
                    .map(|opt_list| {
                        opt_list.map(|list_series| {
                            let values: Vec<String> = list_series
                                .iter()
                                .take(10)
                                .map(|v| v.str_value().to_string())
                                .collect();
                            if list_series.len() > 10 {
                                format!("[{}...] ({} items)", values.join(", "), list_series.len())
                            } else {
                                format!("[{}]", values.join(", "))
                            }
                        })
                    })
                    .collect();
                new_series.push(string_series.with_name(col_name.as_str().into()).into());
            } else {
                new_series.push(col.clone());
            }
        }

        Ok(DataFrame::new(new_series)?)
    }
2922
2923    pub fn select_next(&mut self) {
2924        self.table_state.select_next();
2925        if let Some(selected) = self.table_state.selected() {
2926            if selected >= self.visible_rows && self.visible_rows > 0 {
2927                self.slide_table(1);
2928            }
2929        }
2930    }
2931
    /// Scroll down by one full page (one screen's worth of rows).
    pub fn page_down(&mut self) {
        self.slide_table(self.visible_rows as i64);
    }
2935
2936    pub fn select_previous(&mut self) {
2937        if let Some(selected) = self.table_state.selected() {
2938            self.table_state.select_previous();
2939            if selected == 0 && self.start_row > 0 {
2940                self.slide_table(-1);
2941            }
2942        } else {
2943            self.table_state.select(Some(0));
2944        }
2945    }
2946
    /// Jump the view so it starts at absolute row `index`, re-collecting the
    /// buffer. No-op when already there.
    pub fn scroll_to(&mut self, index: usize) {
        if self.start_row == index {
            return;
        }

        if index == 0 {
            self.start_row = 0;
            self.collect();
            // NOTE(review): start_row is re-asserted after collect() — presumably
            // collect() can adjust it; confirm before merging the two branches.
            self.start_row = 0;
        } else {
            self.start_row = index;
            self.collect();
        }
    }
2961
2962    /// Scroll so that the given row index is centered in the view when possible (respects table bounds).
2963    /// Selects that row. Used by go-to-line.
2964    pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2965        self.ensure_num_rows();
2966        if self.num_rows == 0 || self.visible_rows == 0 {
2967            return;
2968        }
2969        let center_offset = self.visible_rows / 2;
2970        let mut start_row = row_index.saturating_sub(center_offset);
2971        let max_start = self.num_rows.saturating_sub(self.visible_rows);
2972        start_row = start_row.min(max_start);
2973
2974        if self.start_row == start_row {
2975            let display_idx = row_index
2976                .saturating_sub(start_row)
2977                .min(self.visible_rows.saturating_sub(1));
2978            self.table_state.select(Some(display_idx));
2979            return;
2980        }
2981
2982        self.start_row = start_row;
2983        self.collect();
2984        let display_idx = row_index
2985            .saturating_sub(start_row)
2986            .min(self.visible_rows.saturating_sub(1));
2987        self.table_state.select(Some(display_idx));
2988    }
2989
2990    /// Ensure num_rows is up to date (runs len() query if needed). Used before scroll_to_end.
2991    fn ensure_num_rows(&mut self) {
2992        if self.num_rows_valid {
2993            return;
2994        }
2995        if self.visible_rows > 0 {
2996            self.proximity_threshold = self.visible_rows;
2997        }
2998        self.num_rows = match self.lf.clone().select([len()]).collect() {
2999            Ok(df) => {
3000                if let Some(col) = df.get(0) {
3001                    if let Some(AnyValue::UInt32(len)) = col.first() {
3002                        *len as usize
3003                    } else {
3004                        0
3005                    }
3006                } else {
3007                    0
3008                }
3009            }
3010            Err(_) => 0,
3011        };
3012        self.num_rows_valid = true;
3013    }
3014
3015    /// Jump to the last page; buffer is trimmed/loaded as needed. Selects the last row.
3016    pub fn scroll_to_end(&mut self) {
3017        self.ensure_num_rows();
3018        if self.num_rows == 0 {
3019            self.start_row = 0;
3020            self.buffered_start_row = 0;
3021            self.buffered_end_row = 0;
3022            return;
3023        }
3024        let end_start = self.num_rows.saturating_sub(self.visible_rows);
3025        if self.start_row == end_start {
3026            self.select_last_visible_row();
3027            return;
3028        }
3029        self.start_row = end_start;
3030        self.collect();
3031        self.select_last_visible_row();
3032    }
3033
3034    /// Set table selection to the last row in the current view (for use after scroll_to_end).
3035    fn select_last_visible_row(&mut self) {
3036        if self.num_rows == 0 {
3037            return;
3038        }
3039        let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
3040        let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
3041        self.table_state.select(Some(sel));
3042    }
3043
3044    pub fn half_page_down(&mut self) {
3045        let half = (self.visible_rows / 2).max(1) as i64;
3046        self.slide_table(half);
3047    }
3048
3049    pub fn half_page_up(&mut self) {
3050        if self.start_row == 0 {
3051            return;
3052        }
3053        let half = (self.visible_rows / 2).max(1) as i64;
3054        self.slide_table(-half);
3055    }
3056
3057    pub fn page_up(&mut self) {
3058        if self.start_row == 0 {
3059            return;
3060        }
3061        self.slide_table(-(self.visible_rows as i64));
3062    }
3063
3064    pub fn scroll_right(&mut self) {
3065        let max_scroll = self
3066            .column_order
3067            .len()
3068            .saturating_sub(self.locked_columns_count);
3069        if self.termcol_index < max_scroll.saturating_sub(1) {
3070            self.termcol_index += 1;
3071            self.collect();
3072        }
3073    }
3074
3075    pub fn scroll_left(&mut self) {
3076        if self.termcol_index > 0 {
3077            self.termcol_index -= 1;
3078            self.collect();
3079        }
3080    }
3081
    /// All column names in display order (locked columns first). Returns a clone.
    pub fn headers(&self) -> Vec<String> {
        self.column_order.clone()
    }
3085
3086    pub fn set_column_order(&mut self, order: Vec<String>) {
3087        self.column_order = order;
3088        self.buffered_start_row = 0;
3089        self.buffered_end_row = 0;
3090        self.buffered_df = None;
3091        self.collect();
3092    }
3093
3094    pub fn set_locked_columns(&mut self, count: usize) {
3095        self.locked_columns_count = count.min(self.column_order.len());
3096        self.buffered_start_row = 0;
3097        self.buffered_end_row = 0;
3098        self.buffered_df = None;
3099        self.collect();
3100    }
3101
    /// Number of columns currently pinned to the left edge.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }
3105
    // Getter methods for template creation
    /// Active filter statements (empty when no filters are applied).
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }
3110
    /// Columns the view is currently sorted by (empty when unsorted).
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }
3114
    /// Current sort direction; `true` means ascending.
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }
3118
    /// Current display column order (locked columns first).
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }
3122
    /// The currently applied query string (empty when none).
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }
3126
    /// The currently applied SQL query string (empty when none).
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }
3130
    /// The currently applied fuzzy-search query string (empty when none).
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }
3134
    /// The most recent pivot spec, if the last reshape was a pivot.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }
3138
    /// The most recent melt spec, if the last reshape was a melt.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }
3142
    /// True when the schema contains any List column — the signature of a
    /// group-by result awaiting drill-down.
    pub fn is_grouped(&self) -> bool {
        self.schema
            .iter()
            .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
    }
3148
3149    pub fn group_key_columns(&self) -> Vec<String> {
3150        self.schema
3151            .iter()
3152            .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
3153            .map(|(name, _)| name.to_string())
3154            .collect()
3155    }
3156
3157    pub fn group_value_columns(&self) -> Vec<String> {
3158        self.schema
3159            .iter()
3160            .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
3161            .map(|(name, _)| name.to_string())
3162            .collect()
3163    }
3164
3165    /// Estimated heap size in bytes of the currently buffered slice (locked + scrollable), if collected.
3166    pub fn buffered_memory_bytes(&self) -> Option<usize> {
3167        let locked = self
3168            .locked_df
3169            .as_ref()
3170            .map(|df| df.estimated_size())
3171            .unwrap_or(0);
3172        let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
3173        if locked == 0 && scroll == 0 {
3174            None
3175        } else {
3176            Some(locked + scroll)
3177        }
3178    }
3179
    /// Number of rows currently in the buffer. 0 if no buffer loaded.
    pub fn buffered_rows(&self) -> usize {
        // Half-open range: [buffered_start_row, buffered_end_row).
        self.buffered_end_row
            .saturating_sub(self.buffered_start_row)
    }
3185
    /// Current scrollable display buffer. None until first collect().
    /// Holds only the scrollable (non-locked) columns; pinned columns live in `locked_df`.
    pub fn display_df(&self) -> Option<&DataFrame> {
        self.df.as_ref()
    }
3190
3191    /// Visible-window slice of the display buffer (same as passed to render_dataframe).
3192    pub fn display_slice_df(&self) -> Option<DataFrame> {
3193        let df = self.df.as_ref()?;
3194        let offset = self.start_row.saturating_sub(self.buffered_start_row);
3195        let slice_len = self.visible_rows.min(df.height().saturating_sub(offset));
3196        if offset < df.height() && slice_len > 0 {
3197            Some(df.slice(offset as i64, slice_len))
3198        } else {
3199            None
3200        }
3201    }
3202
    /// Maximum buffer size in rows (0 = no limit).
    /// NOTE(review): presumably consulted by collect() to cap buffer growth — confirm.
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }
3207
    /// Maximum buffer size in MiB (0 = no limit).
    /// NOTE(review): presumably consulted by collect() to cap buffer growth — confirm.
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
3212
    /// Expand one row of a grouped frame into a flat DataFrame for inspection.
    ///
    /// Group-key (non-List) columns are broadcast as constant columns of the
    /// group's row count, and each List column contributes its elements as
    /// rows. The grouped lazy frame is stashed in `grouped_lf` so `drill_up()`
    /// can restore it.
    ///
    /// No-op when the frame is not grouped; errors when `group_index` is out of
    /// bounds or there are no List columns to expand.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        if !self.is_grouped() {
            return Ok(());
        }

        // Save the grouped frame for drill_up().
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // Record the selected group's key values (surfaced in the analysis context).
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // Flattened row count = length of the first List column's list for this group.
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Broadcast each group-key value into a constant column of `row_count`
        // rows, preserving the native dtype for the directly-supported variants.
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    // Any other dtype falls back to its string representation.
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // Unwrap each List column's inner series as the flattened value rows.
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Swap the flattened frame in and reset all view state.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
3320
3321    pub fn drill_up(&mut self) -> Result<()> {
3322        if let Some(grouped_lf) = self.grouped_lf.take() {
3323            self.invalidate_num_rows();
3324            self.lf = grouped_lf;
3325            self.schema = self.lf.clone().collect_schema()?;
3326            self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3327            self.drilled_down_group_index = None;
3328            self.drilled_down_group_key = None;
3329            self.drilled_down_group_key_columns = None;
3330            self.start_row = 0;
3331            self.termcol_index = 0;
3332            self.locked_columns_count = 0;
3333            self.table_state.select(Some(0));
3334            self.collect();
3335            Ok(())
3336        } else {
3337            Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
3338        }
3339    }
3340
    /// Collect the current lazy frame into an eager DataFrame for analysis views.
    pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
        Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
    }
3344
    /// Snapshot the active query/filter/drill-down state for the statistics pane.
    pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
        crate::statistics::AnalysisContext {
            has_query: !self.active_query.is_empty(),
            query: self.active_query.clone(),
            has_filters: !self.filters.is_empty(),
            filter_count: self.filters.len(),
            is_drilled_down: self.is_drilled_down(),
            group_key: self.drilled_down_group_key.clone(),
            group_columns: self.drilled_down_group_key_columns.clone(),
        }
    }
3356
3357    /// Polars 0.52 pivot_stable panics (from_physical Date/UInt32) when index is Date/Datetime. Cast to Int32, restore after.
3358    /// Returns (modified df, list of (column name, original dtype) to restore after pivot).
3359    fn cast_temporal_index_columns_for_pivot(
3360        df: &DataFrame,
3361        index: &[String],
3362    ) -> Result<(DataFrame, Vec<(String, DataType)>)> {
3363        let mut out = df.clone();
3364        let mut restore = Vec::new();
3365        for name in index {
3366            if let Ok(s) = out.column(name) {
3367                let dtype = s.dtype();
3368                if matches!(dtype, DataType::Date | DataType::Datetime(_, _)) {
3369                    restore.push((name.clone(), dtype.clone()));
3370                    let casted = s.cast(&DataType::Int32)?;
3371                    out.with_column(casted)?;
3372                }
3373            }
3374        }
3375        Ok((out, restore))
3376    }
3377
3378    /// Restore Date/Datetime types on index columns after pivot.
3379    fn restore_temporal_index_columns_after_pivot(
3380        pivoted: &mut DataFrame,
3381        restore: &[(String, DataType)],
3382    ) -> Result<()> {
3383        for (name, dtype) in restore {
3384            if let Ok(s) = pivoted.column(name) {
3385                let restored = s.cast(dtype)?;
3386                pivoted.with_column(restored)?;
3387            }
3388        }
3389        Ok(())
3390    }
3391
3392    /// Pivot the current `LazyFrame` (long → wide). Never uses `original_lf`.
3393    /// Collects current `lf`, runs `pivot_stable`, then replaces `lf` with result.
3394    /// We use pivot_stable for all aggregation types: Polars' non-stable pivot() prints
3395    /// "unstable pivot not yet supported, using stable pivot" to stdout, which corrupts the TUI.
3396    pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
3397        let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
3398        let agg_expr = pivot_agg_expr(spec.aggregation)?;
3399        let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
3400        let index_opt = if index_str.is_empty() {
3401            None
3402        } else {
3403            Some(index_str)
3404        };
3405
3406        let (df_for_pivot, temporal_index_restore) = if self.workaround_pivot_date_index {
3407            let (df_w, restore) =
3408                Self::cast_temporal_index_columns_for_pivot(&df, spec.index.as_slice())?;
3409            (df_w, Some(restore))
3410        } else {
3411            (df.clone(), None)
3412        };
3413        let sort_new_columns = spec.sort_columns.unwrap_or(true);
3414        let mut pivoted = pivot_stable(
3415            &df_for_pivot,
3416            [spec.pivot_column.as_str()],
3417            index_opt,
3418            Some([spec.value_column.as_str()]),
3419            sort_new_columns,
3420            Some(agg_expr),
3421            None,
3422        )?;
3423        if let Some(restore) = &temporal_index_restore {
3424            Self::restore_temporal_index_columns_after_pivot(&mut pivoted, restore)?;
3425        }
3426
3427        self.last_pivot_spec = Some(spec.clone());
3428        self.last_melt_spec = None;
3429        self.replace_lf_after_reshape(pivoted.lazy())?;
3430        Ok(())
3431    }
3432
3433    /// Melt the current `LazyFrame` (wide → long). Never uses `original_lf`.
3434    pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
3435        let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
3436        let index = cols(spec.index.iter().map(|s| s.as_str()));
3437        let args = UnpivotArgsDSL {
3438            on,
3439            index,
3440            variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
3441            value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
3442        };
3443        let lf = self.lf.clone().unpivot(args);
3444        self.last_melt_spec = Some(spec.clone());
3445        self.last_pivot_spec = None;
3446        self.replace_lf_after_reshape(lf)?;
3447        Ok(())
3448    }
3449
    /// Install a reshaped (pivoted/melted) lazy frame as the new baseline:
    /// refresh the schema and column order, then reset every query, filter,
    /// sort, drill-down, scroll and buffer state before re-collecting.
    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
        self.invalidate_num_rows();
        self.lf = lf;
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // A reshape invalidates state built against the old schema.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.error = None;
        self.df = None;
        self.locked_df = None;
        self.grouped_lf = None;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.collect();
        Ok(())
    }
3477
    /// True while a group has been expanded via `drill_down_into_group`.
    pub fn is_drilled_down(&self) -> bool {
        self.drilled_down_group_index.is_some()
    }
3481
    /// Rebuild `lf` by applying the active filters and sort to it, then re-collect.
    ///
    /// Filter values are parsed to match the column dtype (numeric/bool) and fall
    /// back to string literals when parsing fails; statements chain left-to-right
    /// via each filter's AND/OR logical operator, with no precedence.
    fn apply_transformations(&mut self) {
        let mut lf = self.lf.clone();
        let mut final_expr: Option<Expr> = None;

        for filter in &self.filters {
            let col_expr = col(&filter.column);
            // Coerce the filter's text value to the column dtype where possible;
            // unparseable input degrades to a string literal comparison.
            let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
                match dtype {
                    DataType::Float32 | DataType::Float64 => filter
                        .value
                        .parse::<f64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
                        .value
                        .parse::<i64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                        filter
                            .value
                            .parse::<u64>()
                            .map(lit)
                            .unwrap_or_else(|_| lit(filter.value.as_str()))
                    }
                    DataType::Boolean => filter
                        .value
                        .parse::<bool>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    _ => lit(filter.value.as_str()),
                }
            } else {
                lit(filter.value.as_str())
            };

            let op_expr = match filter.operator {
                FilterOperator::Eq => col_expr.eq(val_lit),
                FilterOperator::NotEq => col_expr.neq(val_lit),
                FilterOperator::Gt => col_expr.gt(val_lit),
                FilterOperator::Lt => col_expr.lt(val_lit),
                FilterOperator::GtEq => col_expr.gt_eq(val_lit),
                FilterOperator::LtEq => col_expr.lt_eq(val_lit),
                FilterOperator::Contains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val))
                }
                FilterOperator::NotContains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val)).not()
                }
            };

            // Fold this statement into the accumulated predicate using its
            // logical operator.
            if let Some(current) = final_expr {
                final_expr = Some(match filter.logical_op {
                    LogicalOperator::And => current.and(op_expr),
                    LogicalOperator::Or => current.or(op_expr),
                });
            } else {
                final_expr = Some(op_expr);
            }
        }

        if let Some(e) = final_expr {
            lf = lf.filter(e);
        }

        if !self.sort_columns.is_empty() {
            let options = SortMultipleOptions {
                descending: self
                    .sort_columns
                    .iter()
                    .map(|_| !self.sort_ascending)
                    .collect(),
                ..Default::default()
            };
            lf = lf.sort_by_exprs(
                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
                options,
            );
        } else if !self.sort_ascending {
            // No explicit sort keys: "descending" is rendered as a plain reverse.
            lf = lf.reverse();
        }

        self.invalidate_num_rows();
        self.lf = lf;
        self.collect();
    }
3570
3571    pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
3572        self.sort_columns = columns;
3573        self.sort_ascending = ascending;
3574        self.buffered_start_row = 0;
3575        self.buffered_end_row = 0;
3576        self.buffered_df = None;
3577        self.apply_transformations();
3578    }
3579
3580    pub fn reverse(&mut self) {
3581        self.sort_ascending = !self.sort_ascending;
3582
3583        self.buffered_start_row = 0;
3584        self.buffered_end_row = 0;
3585        self.buffered_df = None;
3586
3587        if !self.sort_columns.is_empty() {
3588            let options = SortMultipleOptions {
3589                descending: self
3590                    .sort_columns
3591                    .iter()
3592                    .map(|_| !self.sort_ascending)
3593                    .collect(),
3594                ..Default::default()
3595            };
3596            self.invalidate_num_rows();
3597            self.lf = self.lf.clone().sort_by_exprs(
3598                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
3599                options,
3600            );
3601            self.collect();
3602        } else {
3603            self.invalidate_num_rows();
3604            self.lf = self.lf.clone().reverse();
3605            self.collect();
3606        }
3607    }
3608
3609    pub fn filter(&mut self, filters: Vec<FilterStatement>) {
3610        self.filters = filters;
3611        self.buffered_start_row = 0;
3612        self.buffered_end_row = 0;
3613        self.buffered_df = None;
3614        self.apply_transformations();
3615    }
3616
3617    pub fn query(&mut self, query: String) {
3618        self.error = None;
3619
3620        let trimmed_query = query.trim();
3621        if trimmed_query.is_empty() {
3622            self.reset_lf_to_original();
3623            self.collect();
3624            return;
3625        }
3626
3627        match parse_query(&query) {
3628            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
3629                let mut lf = self.original_lf.clone();
3630                let mut schema_opt: Option<Arc<Schema>> = None;
3631
3632                // Apply filter first (where clause)
3633                if let Some(f) = filter {
3634                    lf = lf.filter(f);
3635                }
3636
3637                if !group_by_cols.is_empty() {
3638                    if !cols.is_empty() {
3639                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
3640                    } else {
3641                        let schema = match lf.clone().collect_schema() {
3642                            Ok(s) => s,
3643                            Err(e) => {
3644                                self.error = Some(e);
3645                                return; // Don't modify state on error
3646                            }
3647                        };
3648                        let all_columns: Vec<String> =
3649                            schema.iter_names().map(|s| s.to_string()).collect();
3650
3651                        // In Polars, when you group_by and aggregate columns without explicit aggregation functions,
3652                        // Polars automatically collects the values as lists. We need to aggregate all columns
3653                        // except the group columns to avoid duplicates.
3654                        let mut agg_exprs = Vec::new();
3655                        for col_name in &all_columns {
3656                            if !group_by_col_names.contains(col_name) {
3657                                agg_exprs.push(col(col_name));
3658                            }
3659                        }
3660
3661                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
3662                    }
3663                    // Sort by the result's group-key column names (first N columns after agg).
3664                    // Works for aliased or plain names without relying on parser-derived names.
3665                    let schema = match lf.collect_schema() {
3666                        Ok(s) => s,
3667                        Err(e) => {
3668                            self.error = Some(e);
3669                            return;
3670                        }
3671                    };
3672                    schema_opt = Some(schema.clone());
3673                    let sort_exprs: Vec<Expr> = schema
3674                        .iter_names()
3675                        .take(group_by_cols.len())
3676                        .map(|n| col(n.as_str()))
3677                        .collect();
3678                    lf = lf.sort_by_exprs(sort_exprs, Default::default());
3679                } else if !cols.is_empty() {
3680                    lf = lf.select(cols);
3681                }
3682
3683                let schema = match schema_opt {
3684                    Some(s) => s,
3685                    None => match lf.collect_schema() {
3686                        Ok(s) => s,
3687                        Err(e) => {
3688                            self.error = Some(e);
3689                            return;
3690                        }
3691                    },
3692                };
3693
3694                self.schema = schema;
3695                self.invalidate_num_rows();
3696                self.lf = lf;
3697                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3698
3699                // Lock grouped columns if by clause was used
3700                // Only lock the columns specified in the 'by' clause, not the value columns
3701                if !group_by_col_names.is_empty() {
3702                    // Group columns appear first in Polars results, so count consecutive
3703                    // columns from the start that are in group_by_col_names
3704                    let mut locked_count = 0;
3705                    for col_name in &self.column_order {
3706                        if group_by_col_names.contains(col_name) {
3707                            locked_count += 1;
3708                        } else {
3709                            // Once we hit a non-group column, we've passed all group columns
3710                            break;
3711                        }
3712                    }
3713                    self.locked_columns_count = locked_count;
3714                } else {
3715                    self.locked_columns_count = 0;
3716                }
3717
3718                // Clear filters when using query
3719                self.filters.clear();
3720                self.sort_columns.clear();
3721                self.sort_ascending = true;
3722                self.start_row = 0;
3723                self.termcol_index = 0;
3724                self.active_query = query;
3725                self.buffered_start_row = 0;
3726                self.buffered_end_row = 0;
3727                self.buffered_df = None;
3728                // Reset drill-down state when applying new query
3729                self.drilled_down_group_index = None;
3730                self.drilled_down_group_key = None;
3731                self.drilled_down_group_key_columns = None;
3732                self.grouped_lf = None;
3733                // Reset table state selection
3734                self.table_state.select(Some(0));
3735                // Collect will clamp start_row to valid range, but we want to ensure it's 0
3736                // So we set it to 0, collect (which may clamp it), then ensure it's 0 again
3737                self.collect();
3738                // After collect(), ensure we're at the top (collect() may have clamped if num_rows was wrong)
3739                // But if num_rows > 0, we want start_row = 0 to show the first row
3740                if self.num_rows > 0 {
3741                    self.start_row = 0;
3742                }
3743            }
3744            Err(e) => {
3745                // Parse errors are already user-facing strings; store as ComputeError
3746                self.error = Some(PolarsError::ComputeError(e.into()));
3747            }
3748        }
3749    }
3750
3751    /// Execute a SQL query against the current LazyFrame (registered as table "df").
3752    /// Empty SQL resets to original state. Does not call collect(); the event loop does that via AppEvent::Collect.
3753    pub fn sql_query(&mut self, sql: String) {
3754        self.error = None;
3755        let trimmed = sql.trim();
3756        if trimmed.is_empty() {
3757            self.reset_lf_to_original();
3758            return;
3759        }
3760
3761        #[cfg(feature = "sql")]
3762        {
3763            use polars_sql::SQLContext;
3764            let mut ctx = SQLContext::new();
3765            ctx.register("df", self.lf.clone());
3766            match ctx.execute(trimmed) {
3767                Ok(result_lf) => {
3768                    let schema = match result_lf.clone().collect_schema() {
3769                        Ok(s) => s,
3770                        Err(e) => {
3771                            self.error = Some(e);
3772                            return;
3773                        }
3774                    };
3775                    self.schema = schema;
3776                    self.invalidate_num_rows();
3777                    self.lf = result_lf;
3778                    self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3779                    self.active_sql_query = sql;
3780                    self.locked_columns_count = 0;
3781                    self.filters.clear();
3782                    self.sort_columns.clear();
3783                    self.sort_ascending = true;
3784                    self.start_row = 0;
3785                    self.termcol_index = 0;
3786                    self.drilled_down_group_index = None;
3787                    self.drilled_down_group_key = None;
3788                    self.drilled_down_group_key_columns = None;
3789                    self.grouped_lf = None;
3790                    self.buffered_start_row = 0;
3791                    self.buffered_end_row = 0;
3792                    self.buffered_df = None;
3793                    self.table_state.select(Some(0));
3794                }
3795                Err(e) => {
3796                    self.error = Some(e);
3797                }
3798            }
3799        }
3800
3801        #[cfg(not(feature = "sql"))]
3802        {
3803            self.error = Some(PolarsError::ComputeError(
3804                format!("SQL support not compiled in (build with --features sql)").into(),
3805            ));
3806        }
3807    }
3808
3809    /// Fuzzy search: filter rows where any string column matches the query.
3810    /// Query is split on whitespace; each token must match (in order, case-insensitive) in some string column.
3811    /// Empty query resets to original_lf.
3812    pub fn fuzzy_search(&mut self, query: String) {
3813        self.error = None;
3814        let trimmed = query.trim();
3815        if trimmed.is_empty() {
3816            self.reset_lf_to_original();
3817            self.collect();
3818            return;
3819        }
3820        let string_cols: Vec<String> = self
3821            .schema
3822            .iter()
3823            .filter(|(_, dtype)| dtype.is_string())
3824            .map(|(name, _)| name.to_string())
3825            .collect();
3826        if string_cols.is_empty() {
3827            self.error = Some(PolarsError::ComputeError(
3828                "Fuzzy search requires at least one string column".into(),
3829            ));
3830            return;
3831        }
3832        let tokens: Vec<&str> = trimmed
3833            .split_whitespace()
3834            .filter(|s| !s.is_empty())
3835            .collect();
3836        let token_exprs: Vec<Expr> = tokens
3837            .iter()
3838            .map(|token| {
3839                let pattern = fuzzy_token_regex(token);
3840                string_cols
3841                    .iter()
3842                    .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
3843                    .reduce(|a, b| a.or(b))
3844                    .unwrap()
3845            })
3846            .collect();
3847        let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
3848        self.lf = self.original_lf.clone().filter(combined);
3849        self.filters.clear();
3850        self.sort_columns.clear();
3851        self.active_query.clear();
3852        self.active_sql_query.clear();
3853        self.active_fuzzy_query = query;
3854        // Reset view and buffer so collect() runs on the new lf
3855        self.locked_columns_count = 0;
3856        self.start_row = 0;
3857        self.termcol_index = 0;
3858        self.drilled_down_group_index = None;
3859        self.drilled_down_group_key = None;
3860        self.drilled_down_group_key_columns = None;
3861        self.grouped_lf = None;
3862        self.buffered_start_row = 0;
3863        self.buffered_end_row = 0;
3864        self.buffered_df = None;
3865        self.table_state.select(Some(0));
3866        self.invalidate_num_rows();
3867        self.collect();
3868    }
3869}
3870
3871/// Case-insensitive regex for one token: chars in order with `.*` between.
3872pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3873    let inner: String =
3874        token
3875            .chars()
3876            .map(|c| regex::escape(&c.to_string()))
3877            .fold(String::new(), |mut s, e| {
3878                if !s.is_empty() {
3879                    s.push_str(".*");
3880                }
3881                s.push_str(&e);
3882                s
3883            });
3884    format!("(?i).*{}.*", inner)
3885}
3886
pub struct DataTable {
    /// Background color of the header row.
    pub header_bg: Color,
    /// Foreground color of the header row.
    pub header_fg: Color,
    /// Foreground color of the row-number gutter.
    pub row_numbers_fg: Color,
    /// Color of the vertical separator drawn after locked columns.
    pub separator_fg: Color,
    /// Horizontal spacing (in cells) between table columns.
    pub table_cell_padding: u16,
    /// Optional background color applied to odd rows (zebra striping).
    pub alternate_row_bg: Option<Color>,
    /// When true, colorize cells by column type using the optional colors below.
    pub column_colors: bool,
    /// Color for string columns (only used when `column_colors` is true).
    pub str_col: Option<Color>,
    /// Color for integer columns (only used when `column_colors` is true).
    pub int_col: Option<Color>,
    /// Color for float columns (only used when `column_colors` is true).
    pub float_col: Option<Color>,
    /// Color for boolean columns (only used when `column_colors` is true).
    pub bool_col: Option<Color>,
    /// Color for date/datetime/time/duration columns (only used when `column_colors` is true).
    pub temporal_col: Option<Color>,
}
3902
3903impl Default for DataTable {
3904    fn default() -> Self {
3905        Self {
3906            header_bg: Color::Indexed(236),
3907            header_fg: Color::White,
3908            row_numbers_fg: Color::DarkGray,
3909            separator_fg: Color::White,
3910            table_cell_padding: 1,
3911            alternate_row_bg: None,
3912            column_colors: false,
3913            str_col: None,
3914            int_col: None,
3915            float_col: None,
3916            bool_col: None,
3917            temporal_col: None,
3918        }
3919    }
3920}
3921
/// Parameters for rendering the row numbers column.
struct RowNumbersParams {
    // First visible data row (0-based offset into the full result).
    start_row: usize,
    // Number of data rows the viewport can show (excludes the header line).
    visible_rows: usize,
    // Total number of rows in the result set.
    num_rows: usize,
    // Display offset added to the absolute row index — presumably 1 for
    // 1-based numbering; confirm at the call sites that build this struct.
    row_start_index: usize,
    // Viewport-relative index of the highlighted row, if any.
    selected_row: Option<usize>,
}
3930
3931impl DataTable {
3932    pub fn new() -> Self {
3933        Self::default()
3934    }
3935
3936    pub fn with_colors(
3937        mut self,
3938        header_bg: Color,
3939        header_fg: Color,
3940        row_numbers_fg: Color,
3941        separator_fg: Color,
3942    ) -> Self {
3943        self.header_bg = header_bg;
3944        self.header_fg = header_fg;
3945        self.row_numbers_fg = row_numbers_fg;
3946        self.separator_fg = separator_fg;
3947        self
3948    }
3949
3950    pub fn with_cell_padding(mut self, padding: u16) -> Self {
3951        self.table_cell_padding = padding;
3952        self
3953    }
3954
3955    pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
3956        self.alternate_row_bg = color;
3957        self
3958    }
3959
3960    /// Enable column-type coloring and set colors for string, int, float, bool, and temporal columns.
3961    pub fn with_column_type_colors(
3962        mut self,
3963        str_col: Color,
3964        int_col: Color,
3965        float_col: Color,
3966        bool_col: Color,
3967        temporal_col: Color,
3968    ) -> Self {
3969        self.column_colors = true;
3970        self.str_col = Some(str_col);
3971        self.int_col = Some(int_col);
3972        self.float_col = Some(float_col);
3973        self.bool_col = Some(bool_col);
3974        self.temporal_col = Some(temporal_col);
3975        self
3976    }
3977
3978    /// Return the color for a column dtype when column_colors is enabled.
3979    fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
3980        if !self.column_colors {
3981            return None;
3982        }
3983        match dtype {
3984            DataType::String => self.str_col,
3985            DataType::Int8
3986            | DataType::Int16
3987            | DataType::Int32
3988            | DataType::Int64
3989            | DataType::UInt8
3990            | DataType::UInt16
3991            | DataType::UInt32
3992            | DataType::UInt64 => self.int_col,
3993            DataType::Float32 | DataType::Float64 => self.float_col,
3994            DataType::Boolean => self.bool_col,
3995            DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
3996                self.temporal_col
3997            }
3998            _ => None,
3999        }
4000    }
4001
    /// Render `df` into `area`, sizing each column to its widest visible cell.
    ///
    /// Columns are laid out left to right until the area width is exhausted;
    /// a trailing string column that overflows is truncated to the remaining
    /// width so it stays partially visible. `state` drives the row highlight.
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        // make each column as wide as it needs to be to fit the content
        let (height, cols) = df.shape();

        // widths starts at the length of each column name
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        // rows is initialized to a vector of length "height" empty rows
        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // Cap cell generation at what fits vertically (one line is the header).
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                // Nulls render as empty cells rather than a "null" placeholder.
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            // Use > not >= so the last column is shown when it fits exactly (no padding needed after it)
            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // Overflowing string column: truncate it to the remaining width
                // so part of it is still visible, then stop the layout.
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                // Non-string column that doesn't fit: stop the layout here.
                break;
            }
        }

        widths.truncate(visible_columns);
        // convert rows to a vector of Row, with optional alternate row background
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // Color::Reset backgrounds are skipped so the terminal default shows through.
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }
4114
4115    fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
4116        // Header row: same style as the rest of the column headers (fill full width so color matches)
4117        let header_style = if self.header_bg == Color::Reset {
4118            Style::default().fg(self.header_fg)
4119        } else {
4120            Style::default().bg(self.header_bg).fg(self.header_fg)
4121        };
4122        let header_fill = " ".repeat(area.width as usize);
4123        Paragraph::new(header_fill).style(header_style).render(
4124            Rect {
4125                x: area.x,
4126                y: area.y,
4127                width: area.width,
4128                height: 1,
4129            },
4130            buf,
4131        );
4132
4133        // Only render up to the actual number of rows in the data
4134        let rows_to_render = params
4135            .visible_rows
4136            .min(params.num_rows.saturating_sub(params.start_row));
4137
4138        if rows_to_render == 0 {
4139            return;
4140        }
4141
4142        // Calculate width needed for largest row number
4143        let max_row_num =
4144            params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
4145        let max_width = max_row_num.to_string().len();
4146
4147        // Render row numbers
4148        for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
4149            let row_num = params.start_row + row_idx + params.row_start_index;
4150            let row_num_text = row_num.to_string();
4151
4152            // Right-align row numbers within the available width
4153            let padding = max_width.saturating_sub(row_num_text.len());
4154            let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);
4155
4156            // Match main table background: default when row is even (or no alternate);
4157            // when alternate_row_bg is set, odd rows use that background.
4158            // When selected: same background as row (no inversion), foreground = terminal default.
4159            let is_selected = params.selected_row == Some(row_idx);
4160            let (fg, bg) = if is_selected {
4161                (
4162                    Color::Reset,
4163                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
4164                )
4165            } else {
4166                (
4167                    self.row_numbers_fg,
4168                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
4169                )
4170            };
4171            let row_num_style = match bg {
4172                Some(bg_color) => Style::default().fg(fg).bg(bg_color),
4173                None => Style::default().fg(fg),
4174            };
4175
4176            let y = area.y + row_idx as u16 + 1; // +1 for header row
4177            if y < area.y + area.height {
4178                Paragraph::new(padded_text).style(row_num_style).render(
4179                    Rect {
4180                        x: area.x,
4181                        y,
4182                        width: area.width,
4183                        height: 1,
4184                    },
4185                    buf,
4186                );
4187            }
4188        }
4189    }
4190}
4191
4192impl StatefulWidget for DataTable {
4193    type State = DataTableState;
4194
4195    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
4196        state.visible_termcols = area.width as usize;
4197        let new_visible_rows = if area.height > 0 {
4198            (area.height - 1) as usize
4199        } else {
4200            0
4201        };
4202        let needs_collect = new_visible_rows != state.visible_rows;
4203        state.visible_rows = new_visible_rows;
4204
4205        if let Some(selected) = state.table_state.selected() {
4206            if selected >= state.visible_rows {
4207                state.table_state.select(Some(state.visible_rows - 1))
4208            }
4209        }
4210
4211        if needs_collect {
4212            state.collect();
4213        }
4214
4215        // Only show errors in main view if not suppressed (e.g., when query input is active)
4216        // Query errors should only be shown in the query input frame
4217        if let Some(error) = state.error.as_ref() {
4218            if !state.suppress_error_display {
4219                Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
4220                    .centered()
4221                    .block(
4222                        Block::default()
4223                            .borders(Borders::NONE)
4224                            .padding(Padding::top(area.height / 2)),
4225                    )
4226                    .wrap(ratatui::widgets::Wrap { trim: true })
4227                    .render(area, buf);
4228                return;
4229            }
4230            // If suppress_error_display is true, continue rendering the table normally
4231        }
4232
4233        // Calculate row number column width if enabled
4234        let row_num_width = if state.row_numbers {
4235            let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; // +1 for 1-based, +1 for potential
4236            max_row_num.to_string().len().max(1) as u16 + 1 // +1 for spacing
4237        } else {
4238            0
4239        };
4240
4241        // Calculate locked columns width if any
4242        let mut locked_width = row_num_width;
4243        if let Some(locked_df) = state.locked_df.as_ref() {
4244            let (_, cols) = locked_df.shape();
4245            for col_index in 0..cols {
4246                let col_name = locked_df.get_column_names()[col_index];
4247                let mut max_len = col_name.chars().count() as u16;
4248                let col_data = &locked_df[col_index];
4249                for row_index in 0..locked_df.height().min(state.visible_rows) {
4250                    let value = col_data.get(row_index).unwrap();
4251                    let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
4252                        Cow::Borrowed("")
4253                    } else {
4254                        value.str_value()
4255                    };
4256                    let len = val_str.chars().count() as u16;
4257                    max_len = max_len.max(len);
4258                }
4259                locked_width += max_len + 1;
4260            }
4261        }
4262
4263        // Split area into locked and scrollable parts
4264        if locked_width > row_num_width && locked_width < area.width {
4265            let locked_area = Rect {
4266                x: area.x,
4267                y: area.y,
4268                width: locked_width,
4269                height: area.height,
4270            };
4271            let separator_x = locked_area.x + locked_area.width;
4272
4273            // If row numbers are enabled, render them first in a separate area
4274            if state.row_numbers {
4275                let row_num_area = Rect {
4276                    x: area.x,
4277                    y: area.y,
4278                    width: row_num_width,
4279                    height: area.height,
4280                };
4281                self.render_row_numbers(
4282                    row_num_area,
4283                    buf,
4284                    RowNumbersParams {
4285                        start_row: state.start_row,
4286                        visible_rows: state.visible_rows,
4287                        num_rows: state.num_rows,
4288                        row_start_index: state.row_start_index,
4289                        selected_row: state.table_state.selected(),
4290                    },
4291                );
4292            }
4293            let scrollable_area = Rect {
4294                x: separator_x + 1,
4295                y: area.y,
4296                width: area.width.saturating_sub(locked_width + 1),
4297                height: area.height,
4298            };
4299
4300            // Render locked columns (no background shading, just the vertical separator)
4301            if let Some(locked_df) = state.locked_df.as_ref() {
4302                // Adjust locked_area to account for row numbers if present
4303                let adjusted_locked_area = if state.row_numbers {
4304                    Rect {
4305                        x: area.x + row_num_width,
4306                        y: area.y,
4307                        width: locked_width - row_num_width,
4308                        height: area.height,
4309                    }
4310                } else {
4311                    locked_area
4312                };
4313
4314                // Slice buffer to visible portion
4315                let offset = state.start_row.saturating_sub(state.buffered_start_row);
4316                let slice_len = state
4317                    .visible_rows
4318                    .min(locked_df.height().saturating_sub(offset));
4319                if offset < locked_df.height() && slice_len > 0 {
4320                    let sliced_df = locked_df.slice(offset as i64, slice_len);
4321                    self.render_dataframe(
4322                        &sliced_df,
4323                        adjusted_locked_area,
4324                        buf,
4325                        &mut state.table_state,
4326                        false,
4327                        state.start_row,
4328                    );
4329                }
4330            }
4331
4332            // Draw vertical separator line
4333            let separator_x_adjusted = if state.row_numbers {
4334                area.x + row_num_width + (locked_width - row_num_width)
4335            } else {
4336                separator_x
4337            };
4338            for y in area.y..area.y + area.height {
4339                let cell = &mut buf[(separator_x_adjusted, y)];
4340                cell.set_char('│');
4341                cell.set_style(Style::default().fg(self.separator_fg));
4342            }
4343
4344            // Adjust scrollable area to account for row numbers
4345            let adjusted_scrollable_area = if state.row_numbers {
4346                Rect {
4347                    x: separator_x_adjusted + 1,
4348                    y: area.y,
4349                    width: area.width.saturating_sub(locked_width + 1),
4350                    height: area.height,
4351                }
4352            } else {
4353                scrollable_area
4354            };
4355
4356            // Render scrollable columns
4357            if let Some(df) = state.df.as_ref() {
4358                // Slice buffer to visible portion
4359                let offset = state.start_row.saturating_sub(state.buffered_start_row);
4360                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
4361                if offset < df.height() && slice_len > 0 {
4362                    let sliced_df = df.slice(offset as i64, slice_len);
4363                    self.render_dataframe(
4364                        &sliced_df,
4365                        adjusted_scrollable_area,
4366                        buf,
4367                        &mut state.table_state,
4368                        false,
4369                        state.start_row,
4370                    );
4371                }
4372            }
4373        } else if let Some(df) = state.df.as_ref() {
4374            // No locked columns, render normally
4375            // If row numbers are enabled, render them first
4376            if state.row_numbers {
4377                let row_num_area = Rect {
4378                    x: area.x,
4379                    y: area.y,
4380                    width: row_num_width,
4381                    height: area.height,
4382                };
4383                self.render_row_numbers(
4384                    row_num_area,
4385                    buf,
4386                    RowNumbersParams {
4387                        start_row: state.start_row,
4388                        visible_rows: state.visible_rows,
4389                        num_rows: state.num_rows,
4390                        row_start_index: state.row_start_index,
4391                        selected_row: state.table_state.selected(),
4392                    },
4393                );
4394
4395                // Adjust data area to exclude row number column
4396                let data_area = Rect {
4397                    x: area.x + row_num_width,
4398                    y: area.y,
4399                    width: area.width.saturating_sub(row_num_width),
4400                    height: area.height,
4401                };
4402
4403                // Slice buffer to visible portion
4404                let offset = state.start_row.saturating_sub(state.buffered_start_row);
4405                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
4406                if offset < df.height() && slice_len > 0 {
4407                    let sliced_df = df.slice(offset as i64, slice_len);
4408                    self.render_dataframe(
4409                        &sliced_df,
4410                        data_area,
4411                        buf,
4412                        &mut state.table_state,
4413                        false,
4414                        state.start_row,
4415                    );
4416                }
4417            } else {
4418                // Slice buffer to visible portion
4419                let offset = state.start_row.saturating_sub(state.buffered_start_row);
4420                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
4421                if offset < df.height() && slice_len > 0 {
4422                    let sliced_df = df.slice(offset as i64, slice_len);
4423                    self.render_dataframe(
4424                        &sliced_df,
4425                        area,
4426                        buf,
4427                        &mut state.table_state,
4428                        false,
4429                        state.start_row,
4430                    );
4431                }
4432            }
4433        } else if !state.column_order.is_empty() {
4434            // Empty result (0 rows) but we have a schema - show empty table with header, no rows
4435            let empty_columns: Vec<_> = state
4436                .column_order
4437                .iter()
4438                .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
4439                .collect();
4440            if let Ok(empty_df) = DataFrame::new(empty_columns) {
4441                if state.row_numbers {
4442                    let row_num_area = Rect {
4443                        x: area.x,
4444                        y: area.y,
4445                        width: row_num_width,
4446                        height: area.height,
4447                    };
4448                    self.render_row_numbers(
4449                        row_num_area,
4450                        buf,
4451                        RowNumbersParams {
4452                            start_row: 0,
4453                            visible_rows: state.visible_rows,
4454                            num_rows: 0,
4455                            row_start_index: state.row_start_index,
4456                            selected_row: None,
4457                        },
4458                    );
4459                    let data_area = Rect {
4460                        x: area.x + row_num_width,
4461                        y: area.y,
4462                        width: area.width.saturating_sub(row_num_width),
4463                        height: area.height,
4464                    };
4465                    self.render_dataframe(
4466                        &empty_df,
4467                        data_area,
4468                        buf,
4469                        &mut state.table_state,
4470                        false,
4471                        0,
4472                    );
4473                } else {
4474                    self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
4475                }
4476            } else {
4477                Paragraph::new("No data").render(area, buf);
4478            }
4479        } else {
4480            // Truly empty: no schema, not loaded, or blank file
4481            Paragraph::new("No data").render(area, buf);
4482        }
4483    }
4484}
4485
4486#[cfg(test)]
4487mod tests {
4488    use super::*;
4489    use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
4490    use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
4491
4492    fn create_test_lf() -> LazyFrame {
4493        df! (
4494            "a" => &[1, 2, 3],
4495            "b" => &["x", "y", "z"]
4496        )
4497        .unwrap()
4498        .lazy()
4499    }
4500
4501    fn create_large_test_lf() -> LazyFrame {
4502        df! (
4503            "a" => (0..100).collect::<Vec<i32>>(),
4504            "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
4505            "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
4506            "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
4507        )
4508        .unwrap()
4509        .lazy()
4510    }
4511
4512    /// Pump Open(path, opts) and subsequent events until no event is returned.
4513    /// Returns true if a Crash event was seen.
4514    fn pump_open_until_done(
4515        app: &mut crate::App,
4516        path: std::path::PathBuf,
4517        opts: crate::OpenOptions,
4518    ) -> bool {
4519        use crate::AppEvent;
4520        let mut next = Some(AppEvent::Open(vec![path], opts));
4521        let mut saw_crash = false;
4522        while let Some(ev) = next.take() {
4523            if matches!(ev, AppEvent::Crash(_)) {
4524                saw_crash = true;
4525                break;
4526            }
4527            next = app.event(&ev);
4528        }
4529        saw_crash
4530    }
4531
4532    /// CSV with 100 int-like rows then "N/A" then more ints. With infer_schema_length=100, Polars
4533    /// infers Int from the first 100 rows; the parse error occurs at DoLoadBuffer (first collect),
4534    /// matching CLI crash. We pump app events (Open -> ... -> DoLoadBuffer) and assert we get Crash.
4535    #[test]
4536    fn test_infer_schema_length_csv_fails_with_short_inference() {
4537        use std::sync::mpsc;
4538
4539        let path = crate::tests::sample_data_dir().join("infer_schema_length_data.csv");
4540        let opts = crate::OpenOptions {
4541            infer_schema_length: Some(100),
4542            ..Default::default()
4543        };
4544
4545        let (tx, _rx) = mpsc::channel();
4546        let mut app = crate::App::new(tx);
4547
4548        assert!(
4549            pump_open_until_done(&mut app, path, opts),
4550            "expected Crash when loading with infer_schema_length=100 (N/A at row 101)"
4551        );
4552    }
4553
4554    #[test]
4555    fn test_infer_schema_length_csv_succeeds_with_longer_inference() {
4556        use std::sync::mpsc;
4557
4558        let path = crate::tests::sample_data_dir().join("infer_schema_length_data.csv");
4559        let opts = crate::OpenOptions {
4560            infer_schema_length: Some(101),
4561            ..Default::default()
4562        };
4563
4564        let (tx, _rx) = mpsc::channel();
4565        let mut app = crate::App::new(tx);
4566
4567        assert!(
4568            !pump_open_until_done(&mut app, path, opts),
4569            "load with infer_schema_length=101 should not crash"
4570        );
4571        let state = app.data_table_state.as_ref().unwrap();
4572        assert_eq!(state.schema.len(), 1);
4573        assert!(state.schema.contains("column"));
4574        assert_eq!(state.num_rows, 201);
4575    }
4576
4577    #[test]
4578    fn test_infer_schema_length_csv_succeeds_with_default() {
4579        use std::sync::mpsc;
4580
4581        let path = crate::tests::sample_data_dir().join("infer_schema_length_data.csv");
4582        let opts = crate::OpenOptions {
4583            infer_schema_length: Some(1000),
4584            ..Default::default()
4585        };
4586
4587        let (tx, _rx) = mpsc::channel();
4588        let mut app = crate::App::new(tx);
4589
4590        assert!(
4591            !pump_open_until_done(&mut app, path, opts),
4592            "load with infer_schema_length=1000 (default) should not crash"
4593        );
4594        let state = app.data_table_state.as_ref().unwrap();
4595        assert_eq!(state.schema.len(), 1);
4596        assert_eq!(state.num_rows, 201);
4597    }
4598
4599    #[test]
4600    fn test_from_csv() {
4601        // Ensure sample data is generated before running test
4602        // Test uncompressed CSV loading
4603        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
4604        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
4605        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
4606    }
4607
4608    #[test]
4609    fn test_from_csv_gzipped() {
4610        // Ensure sample data is generated before running test
4611        // Test gzipped CSV loading
4612        let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
4613        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
4614        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
4615    }
4616
4617    #[test]
4618    fn test_from_parquet() {
4619        // Ensure sample data is generated before running test
4620        let path = crate::tests::sample_data_dir().join("people.parquet");
4621        let state = DataTableState::from_parquet(&path, None, None, None, None, false, 1).unwrap();
4622        assert!(!state.schema.is_empty());
4623    }
4624
4625    #[test]
4626    fn test_from_ipc() {
4627        use polars::prelude::IpcWriter;
4628        use std::io::BufWriter;
4629        let mut df = df!(
4630            "x" => &[1_i32, 2, 3],
4631            "y" => &["a", "b", "c"]
4632        )
4633        .unwrap();
4634        let dir = std::env::temp_dir();
4635        let path = dir.join("datui_test_ipc.arrow");
4636        let file = std::fs::File::create(&path).unwrap();
4637        let mut writer = BufWriter::new(file);
4638        IpcWriter::new(&mut writer).finish(&mut df).unwrap();
4639        drop(writer);
4640        let state = DataTableState::from_ipc(&path, None, None, None, None, false, 1).unwrap();
4641        assert_eq!(state.schema.len(), 2);
4642        assert!(state.schema.contains("x"));
4643        assert!(state.schema.contains("y"));
4644        let _ = std::fs::remove_file(&path);
4645    }
4646
4647    #[test]
4648    fn test_from_avro() {
4649        use polars::io::avro::AvroWriter;
4650        use std::io::BufWriter;
4651        let mut df = df!(
4652            "id" => &[1_i32, 2, 3],
4653            "name" => &["alice", "bob", "carol"]
4654        )
4655        .unwrap();
4656        let dir = std::env::temp_dir();
4657        let path = dir.join("datui_test_avro.avro");
4658        let file = std::fs::File::create(&path).unwrap();
4659        let mut writer = BufWriter::new(file);
4660        AvroWriter::new(&mut writer).finish(&mut df).unwrap();
4661        drop(writer);
4662        let state = DataTableState::from_avro(&path, None, None, None, None, false, 1).unwrap();
4663        assert_eq!(state.schema.len(), 2);
4664        assert!(state.schema.contains("id"));
4665        assert!(state.schema.contains("name"));
4666        let _ = std::fs::remove_file(&path);
4667    }
4668
4669    #[test]
4670    fn test_from_orc() {
4671        use arrow::array::{Int64Array, StringArray};
4672        use arrow::datatypes::{DataType, Field, Schema};
4673        use arrow::record_batch::RecordBatch;
4674        use orc_rust::ArrowWriterBuilder;
4675        use std::io::BufWriter;
4676        use std::sync::Arc;
4677
4678        let schema = Arc::new(Schema::new(vec![
4679            Field::new("id", DataType::Int64, false),
4680            Field::new("name", DataType::Utf8, false),
4681        ]));
4682        let id_array = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
4683        let name_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
4684        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, name_array]).unwrap();
4685
4686        let dir = std::env::temp_dir();
4687        let path = dir.join("datui_test_orc.orc");
4688        let file = std::fs::File::create(&path).unwrap();
4689        let writer = BufWriter::new(file);
4690        let mut orc_writer = ArrowWriterBuilder::new(writer, schema).try_build().unwrap();
4691        orc_writer.write(&batch).unwrap();
4692        orc_writer.close().unwrap();
4693
4694        let state = DataTableState::from_orc(&path, None, None, None, None, false, 1).unwrap();
4695        assert_eq!(state.schema.len(), 2);
4696        assert!(state.schema.contains("id"));
4697        assert!(state.schema.contains("name"));
4698        let _ = std::fs::remove_file(&path);
4699    }
4700
4701    #[test]
4702    fn test_from_delimited_tsv_has_header() {
4703        let dir = std::env::temp_dir();
4704        let path = dir.join("datui_test_tsv_header.tsv");
4705        let content = "a\tb\tc\td\n1\t2\t3\t4\n5\t6\t7\t8\n";
4706        std::fs::write(&path, content).unwrap();
4707        let opts = OpenOptions {
4708            has_header: Some(true),
4709            ..Default::default()
4710        };
4711        let mut state = DataTableState::from_delimited(&path, b'\t', &opts).unwrap();
4712        state.collect();
4713        assert_eq!(state.schema.len(), 4);
4714        assert!(state.schema.contains("a"));
4715        assert!(state.schema.contains("b"));
4716        assert!(state.schema.contains("c"));
4717        assert!(state.schema.contains("d"));
4718        assert_eq!(state.num_rows, 2);
4719        let _ = std::fs::remove_file(&path);
4720    }
4721
4722    #[test]
4723    fn test_from_delimited_tsv_no_header() {
4724        let dir = std::env::temp_dir();
4725        let path = dir.join("datui_test_tsv_no_header.tsv");
4726        let content = "a\tb\tc\td\n1\t2\t3\t4\n5\t6\t7\t8\n";
4727        std::fs::write(&path, content).unwrap();
4728        let opts = OpenOptions {
4729            has_header: Some(false),
4730            ..Default::default()
4731        };
4732        let mut state = DataTableState::from_delimited(&path, b'\t', &opts).unwrap();
4733        state.collect();
4734        assert_eq!(state.schema.len(), 4);
4735        assert!(state.schema.contains("column_1"));
4736        assert!(state.schema.contains("column_2"));
4737        assert!(state.schema.contains("column_3"));
4738        assert!(state.schema.contains("column_4"));
4739        assert_eq!(state.num_rows, 3);
4740        let _ = std::fs::remove_file(&path);
4741    }
4742
4743    #[test]
4744    fn test_from_delimited_psv_no_header() {
4745        let dir = std::env::temp_dir();
4746        let path = dir.join("datui_test_psv_no_header.psv");
4747        let content = "x|y|z\n10|20|30\n40|50|60\n";
4748        std::fs::write(&path, content).unwrap();
4749        let opts = OpenOptions {
4750            has_header: Some(false),
4751            ..Default::default()
4752        };
4753        let mut state = DataTableState::from_delimited(&path, b'|', &opts).unwrap();
4754        state.collect();
4755        assert_eq!(state.schema.len(), 3);
4756        assert!(state.schema.contains("column_1"));
4757        assert!(state.schema.contains("column_2"));
4758        assert!(state.schema.contains("column_3"));
4759        assert_eq!(state.num_rows, 3);
4760        let _ = std::fs::remove_file(&path);
4761    }
4762
4763    #[test]
4764    fn test_filter() {
4765        let lf = create_test_lf();
4766        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4767        let filters = vec![FilterStatement {
4768            column: "a".to_string(),
4769            operator: FilterOperator::Gt,
4770            value: "2".to_string(),
4771            logical_op: LogicalOperator::And,
4772        }];
4773        state.filter(filters);
4774        let df = state.lf.clone().collect().unwrap();
4775        assert_eq!(df.shape().0, 1);
4776        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4777    }
4778
4779    #[test]
4780    fn test_sort() {
4781        let lf = create_test_lf();
4782        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4783        state.sort(vec!["a".to_string()], false);
4784        let df = state.lf.clone().collect().unwrap();
4785        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4786    }
4787
4788    #[test]
4789    fn test_query() {
4790        let lf = create_test_lf();
4791        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4792        state.query("select b where a = 2".to_string());
4793        let df = state.lf.clone().collect().unwrap();
4794        assert_eq!(df.shape(), (1, 1));
4795        assert_eq!(
4796            df.column("b").unwrap().get(0).unwrap(),
4797            AnyValue::String("y")
4798        );
4799    }
4800
    /// End-to-end coverage of query-language accessors on a Date column:
    /// dot accessors (.year/.month), YYYY.MM.DD date literals, string accessors
    /// (.upper/.len/.ends_with), and empty-result bookkeeping after collect().
    /// NOTE(review): later queries reference `event_date` even after an earlier
    /// query projected it away, so `query` presumably re-derives the view from
    /// the base frame each time — confirm against DataTableState::query.
    #[test]
    fn test_query_date_accessors() {
        use chrono::NaiveDate;
        // Three dated rows spread across 2024 so month/year filters can
        // distinguish them.
        let df = df!(
            "event_date" => [
                NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
                NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
                NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
            ],
            "name" => &["a", "b", "c"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();

        // Select with date accessors
        state.query("select name, year: event_date.year, month: event_date.month".to_string());
        assert!(
            state.error.is_none(),
            "query should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (3, 3));
        // year comes back as Int32, month as Int8 (polars temporal accessor dtypes).
        assert_eq!(
            df.column("year").unwrap().get(0).unwrap(),
            AnyValue::Int32(2024)
        );
        assert_eq!(
            df.column("month").unwrap().get(0).unwrap(),
            AnyValue::Int8(1)
        );
        assert_eq!(
            df.column("month").unwrap().get(1).unwrap(),
            AnyValue::Int8(6)
        );

        // Filter with date accessor
        state.query("select name, event_date where event_date.month = 12".to_string());
        assert!(
            state.error.is_none(),
            "filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1);
        assert_eq!(
            df.column("name").unwrap().get(0).unwrap(),
            AnyValue::String("c")
        );

        // Filter with YYYY.MM.DD date literal
        state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
        assert!(
            state.error.is_none(),
            "date literal filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(
            df.height(),
            2,
            "2024-06-20 and 2024-12-31 are after 2024-06-15"
        );

        // String accessors: upper, lower, len, ends_with
        state.query(
            "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
                .to_string(),
        );
        assert!(
            state.error.is_none(),
            "string accessors should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
        assert_eq!(
            df.column("upper_name").unwrap().get(0).unwrap(),
            AnyValue::String("C")
        );

        // Query that returns 0 rows: df and locked_df must be cleared for correct empty-table render
        state.query("select where event_date.date = 2020.01.01".to_string());
        assert!(state.error.is_none());
        assert_eq!(state.num_rows, 0);
        state.visible_rows = 10;
        state.collect();
        assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
        assert!(
            state.locked_df.is_none(),
            "locked_df must be cleared when num_rows is 0"
        );
    }
4895
4896    #[test]
4897    fn test_select_next_previous() {
4898        let lf = create_large_test_lf();
4899        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4900        state.visible_rows = 10;
4901        state.table_state.select(Some(5));
4902
4903        state.select_next();
4904        assert_eq!(state.table_state.selected(), Some(6));
4905
4906        state.select_previous();
4907        assert_eq!(state.table_state.selected(), Some(5));
4908    }
4909
4910    #[test]
4911    fn test_page_up_down() {
4912        let lf = create_large_test_lf();
4913        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4914        state.visible_rows = 20;
4915        state.collect();
4916
4917        assert_eq!(state.start_row, 0);
4918        state.page_down();
4919        assert_eq!(state.start_row, 20);
4920        state.page_down();
4921        assert_eq!(state.start_row, 40);
4922        state.page_up();
4923        assert_eq!(state.start_row, 20);
4924        state.page_up();
4925        assert_eq!(state.start_row, 0);
4926    }
4927
4928    #[test]
4929    fn test_scroll_left_right() {
4930        let lf = create_large_test_lf();
4931        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4932        assert_eq!(state.termcol_index, 0);
4933        state.scroll_right();
4934        assert_eq!(state.termcol_index, 1);
4935        state.scroll_right();
4936        assert_eq!(state.termcol_index, 2);
4937        state.scroll_left();
4938        assert_eq!(state.termcol_index, 1);
4939        state.scroll_left();
4940        assert_eq!(state.termcol_index, 0);
4941    }
4942
4943    #[test]
4944    fn test_reverse() {
4945        let lf = create_test_lf();
4946        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4947        state.sort(vec!["a".to_string()], true);
4948        assert_eq!(
4949            state
4950                .lf
4951                .clone()
4952                .collect()
4953                .unwrap()
4954                .column("a")
4955                .unwrap()
4956                .get(0)
4957                .unwrap(),
4958            AnyValue::Int32(1)
4959        );
4960        state.reverse();
4961        assert_eq!(
4962            state
4963                .lf
4964                .clone()
4965                .collect()
4966                .unwrap()
4967                .column("a")
4968                .unwrap()
4969                .get(0)
4970                .unwrap(),
4971            AnyValue::Int32(3)
4972        );
4973    }
4974
4975    #[test]
4976    fn test_filter_multiple() {
4977        let lf = create_large_test_lf();
4978        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4979        let filters = vec![
4980            FilterStatement {
4981                column: "c".to_string(),
4982                operator: FilterOperator::Eq,
4983                value: "1".to_string(),
4984                logical_op: LogicalOperator::And,
4985            },
4986            FilterStatement {
4987                column: "d".to_string(),
4988                operator: FilterOperator::Eq,
4989                value: "2".to_string(),
4990                logical_op: LogicalOperator::And,
4991            },
4992        ];
4993        state.filter(filters);
4994        let df = state.lf.clone().collect().unwrap();
4995        assert_eq!(df.shape().0, 7);
4996    }
4997
4998    #[test]
4999    fn test_filter_and_sort() {
5000        let lf = create_large_test_lf();
5001        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5002        let filters = vec![FilterStatement {
5003            column: "c".to_string(),
5004            operator: FilterOperator::Eq,
5005            value: "1".to_string(),
5006            logical_op: LogicalOperator::And,
5007        }];
5008        state.filter(filters);
5009        state.sort(vec!["a".to_string()], false);
5010        let df = state.lf.clone().collect().unwrap();
5011        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
5012    }
5013
5014    /// Minimal long-format data for pivot tests: id, date, key, value.
5015    /// Includes duplicates for aggregation (e.g. (1,d1,A) appears twice).
5016    fn create_pivot_long_lf() -> LazyFrame {
5017        let df = df!(
5018            "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
5019            "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
5020            "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
5021            "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
5022        )
5023        .unwrap();
5024        df.lazy()
5025    }
5026
5027    /// Wide-format data for melt tests: id, date, c1, c2, c3.
5028    fn create_melt_wide_lf() -> LazyFrame {
5029        let df = df!(
5030            "id" => &[1_i32, 2, 3],
5031            "date" => &["d1", "d2", "d3"],
5032            "c1" => &[10.0_f64, 20.0, 30.0],
5033            "c2" => &[11.0, 21.0, 31.0],
5034            "c3" => &[12.0, 22.0, 32.0],
5035        )
5036        .unwrap();
5037        df.lazy()
5038    }
5039
5040    #[test]
5041    fn test_pivot_basic() {
5042        let lf = create_pivot_long_lf();
5043        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5044        let spec = PivotSpec {
5045            index: vec!["id".to_string(), "date".to_string()],
5046            pivot_column: "key".to_string(),
5047            value_column: "value".to_string(),
5048            aggregation: PivotAggregation::Last,
5049            sort_columns: None,
5050        };
5051        state.pivot(&spec).unwrap();
5052        let df = state.lf.clone().collect().unwrap();
5053        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
5054        assert!(names.contains(&"id"));
5055        assert!(names.contains(&"date"));
5056        assert!(names.contains(&"A"));
5057        assert!(names.contains(&"B"));
5058        assert!(names.contains(&"C"));
5059        assert_eq!(df.height(), 2);
5060    }
5061
5062    #[test]
5063    fn test_pivot_aggregation_last() {
5064        let lf = create_pivot_long_lf();
5065        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5066        let spec = PivotSpec {
5067            index: vec!["id".to_string(), "date".to_string()],
5068            pivot_column: "key".to_string(),
5069            value_column: "value".to_string(),
5070            aggregation: PivotAggregation::Last,
5071            sort_columns: None,
5072        };
5073        state.pivot(&spec).unwrap();
5074        let df = state.lf.clone().collect().unwrap();
5075        let a_col = df.column("A").unwrap();
5076        let row0 = a_col.get(0).unwrap();
5077        let row1 = a_col.get(1).unwrap();
5078        assert_eq!(row0, AnyValue::Float64(11.0));
5079        assert_eq!(row1, AnyValue::Float64(40.0));
5080    }
5081
5082    #[test]
5083    fn test_pivot_aggregation_first() {
5084        let lf = create_pivot_long_lf();
5085        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5086        let spec = PivotSpec {
5087            index: vec!["id".to_string(), "date".to_string()],
5088            pivot_column: "key".to_string(),
5089            value_column: "value".to_string(),
5090            aggregation: PivotAggregation::First,
5091            sort_columns: None,
5092        };
5093        state.pivot(&spec).unwrap();
5094        let df = state.lf.clone().collect().unwrap();
5095        let a_col = df.column("A").unwrap();
5096        assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
5097        assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
5098    }
5099
5100    #[test]
5101    fn test_pivot_aggregation_min_max() {
5102        let lf = create_pivot_long_lf();
5103        let mut state_min = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
5104        state_min
5105            .pivot(&PivotSpec {
5106                index: vec!["id".to_string(), "date".to_string()],
5107                pivot_column: "key".to_string(),
5108                value_column: "value".to_string(),
5109                aggregation: PivotAggregation::Min,
5110                sort_columns: None,
5111            })
5112            .unwrap();
5113        let df_min = state_min.lf.clone().collect().unwrap();
5114        assert_eq!(
5115            df_min.column("A").unwrap().get(0).unwrap(),
5116            AnyValue::Float64(10.0)
5117        );
5118
5119        let mut state_max = DataTableState::new(lf, None, None, None, None, true).unwrap();
5120        state_max
5121            .pivot(&PivotSpec {
5122                index: vec!["id".to_string(), "date".to_string()],
5123                pivot_column: "key".to_string(),
5124                value_column: "value".to_string(),
5125                aggregation: PivotAggregation::Max,
5126                sort_columns: None,
5127            })
5128            .unwrap();
5129        let df_max = state_max.lf.clone().collect().unwrap();
5130        assert_eq!(
5131            df_max.column("A").unwrap().get(0).unwrap(),
5132            AnyValue::Float64(11.0)
5133        );
5134    }
5135
5136    #[test]
5137    fn test_pivot_aggregation_avg_count() {
5138        let lf = create_pivot_long_lf();
5139        let mut state_avg = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
5140        state_avg
5141            .pivot(&PivotSpec {
5142                index: vec!["id".to_string(), "date".to_string()],
5143                pivot_column: "key".to_string(),
5144                value_column: "value".to_string(),
5145                aggregation: PivotAggregation::Avg,
5146                sort_columns: None,
5147            })
5148            .unwrap();
5149        let df_avg = state_avg.lf.clone().collect().unwrap();
5150        let a = df_avg.column("A").unwrap().get(0).unwrap();
5151        if let AnyValue::Float64(x) = a {
5152            assert!((x - 10.5).abs() < 1e-6);
5153        } else {
5154            panic!("expected float");
5155        }
5156
5157        let mut state_count = DataTableState::new(lf, None, None, None, None, true).unwrap();
5158        state_count
5159            .pivot(&PivotSpec {
5160                index: vec!["id".to_string(), "date".to_string()],
5161                pivot_column: "key".to_string(),
5162                value_column: "value".to_string(),
5163                aggregation: PivotAggregation::Count,
5164                sort_columns: None,
5165            })
5166            .unwrap();
5167        let df_count = state_count.lf.clone().collect().unwrap();
5168        let a = df_count.column("A").unwrap().get(0).unwrap();
5169        assert_eq!(a, AnyValue::UInt32(2));
5170    }
5171
5172    #[test]
5173    fn test_pivot_string_first_last() {
5174        let df = df!(
5175            "id" => &[1_i32, 1, 2, 2],
5176            "key" => &["X", "Y", "X", "Y"],
5177            "value" => &["low", "mid", "high", "mid"],
5178        )
5179        .unwrap();
5180        let lf = df.lazy();
5181        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5182        let spec = PivotSpec {
5183            index: vec!["id".to_string()],
5184            pivot_column: "key".to_string(),
5185            value_column: "value".to_string(),
5186            aggregation: PivotAggregation::Last,
5187            sort_columns: None,
5188        };
5189        state.pivot(&spec).unwrap();
5190        let out = state.lf.clone().collect().unwrap();
5191        assert_eq!(
5192            out.column("X").unwrap().get(0).unwrap(),
5193            AnyValue::String("low")
5194        );
5195        assert_eq!(
5196            out.column("Y").unwrap().get(0).unwrap(),
5197            AnyValue::String("mid")
5198        );
5199    }
5200
5201    #[test]
5202    fn test_melt_basic() {
5203        let lf = create_melt_wide_lf();
5204        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5205        let spec = MeltSpec {
5206            index: vec!["id".to_string(), "date".to_string()],
5207            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
5208            variable_name: "variable".to_string(),
5209            value_name: "value".to_string(),
5210        };
5211        state.melt(&spec).unwrap();
5212        let df = state.lf.clone().collect().unwrap();
5213        assert_eq!(df.height(), 9);
5214        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
5215        assert!(names.contains(&"variable"));
5216        assert!(names.contains(&"value"));
5217        assert!(names.contains(&"id"));
5218        assert!(names.contains(&"date"));
5219    }
5220
5221    #[test]
5222    fn test_melt_all_except_index() {
5223        let lf = create_melt_wide_lf();
5224        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5225        let spec = MeltSpec {
5226            index: vec!["id".to_string(), "date".to_string()],
5227            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
5228            variable_name: "var".to_string(),
5229            value_name: "val".to_string(),
5230        };
5231        state.melt(&spec).unwrap();
5232        let df = state.lf.clone().collect().unwrap();
5233        assert!(df.column("var").is_ok());
5234        assert!(df.column("val").is_ok());
5235    }
5236
5237    #[test]
5238    fn test_pivot_on_current_view_after_filter() {
5239        let lf = create_pivot_long_lf();
5240        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5241        state.filter(vec![FilterStatement {
5242            column: "id".to_string(),
5243            operator: FilterOperator::Eq,
5244            value: "1".to_string(),
5245            logical_op: LogicalOperator::And,
5246        }]);
5247        let spec = PivotSpec {
5248            index: vec!["id".to_string(), "date".to_string()],
5249            pivot_column: "key".to_string(),
5250            value_column: "value".to_string(),
5251            aggregation: PivotAggregation::Last,
5252            sort_columns: None,
5253        };
5254        state.pivot(&spec).unwrap();
5255        let df = state.lf.clone().collect().unwrap();
5256        assert_eq!(df.height(), 1);
5257        let id_col = df.column("id").unwrap();
5258        assert_eq!(id_col.get(0).unwrap(), AnyValue::Int32(1));
5259    }
5260
5261    #[test]
5262    fn test_fuzzy_token_regex() {
5263        assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
5264        assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
5265        // Regex-special characters are escaped
5266        let pat = fuzzy_token_regex("[");
5267        assert!(pat.contains("\\["));
5268    }
5269
5270    #[test]
5271    fn test_fuzzy_search() {
5272        // Filter logic is covered by test_fuzzy_search_regex_direct. This test runs the full
5273        // path through DataTableState; it requires sample data (CSV with string column).
5274        crate::tests::ensure_sample_data();
5275        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
5276        let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
5277        state.visible_rows = 10;
5278        state.collect();
5279        let before = state.num_rows;
5280        state.fuzzy_search("string".to_string());
5281        assert!(state.error.is_none(), "{:?}", state.error);
5282        assert!(state.num_rows <= before, "fuzzy search should filter rows");
5283        state.fuzzy_search("".to_string());
5284        state.collect();
5285        assert_eq!(state.num_rows, before, "empty fuzzy search should reset");
5286        assert!(state.get_active_fuzzy_query().is_empty());
5287    }
5288
5289    #[test]
5290    fn test_fuzzy_search_regex_direct() {
5291        // Sanity check: Polars str().contains with our regex matches "alice" for pattern ".*a.*l.*i.*"
5292        let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
5293        let pattern = fuzzy_token_regex("alice");
5294        let out = lf
5295            .filter(col("name").str().contains(lit(pattern.clone()), false))
5296            .collect()
5297            .unwrap();
5298        assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);
5299
5300        // Two columns OR (as in fuzzy_search)
5301        let lf2 = df!(
5302            "id" => &[1i32, 2, 3],
5303            "name" => &["alice", "bob", "carol"],
5304            "city" => &["NYC", "LA", "Boston"]
5305        )
5306        .unwrap()
5307        .lazy();
5308        let pat = fuzzy_token_regex("alice");
5309        let expr = col("name")
5310            .str()
5311            .contains(lit(pat.clone()), false)
5312            .or(col("city").str().contains(lit(pat), false));
5313        let out2 = lf2.clone().filter(expr).collect().unwrap();
5314        assert_eq!(out2.height(), 1);
5315
5316        // Replicate exact fuzzy_search logic: schema from original_lf, string_cols, then filter
5317        let schema = lf2.clone().collect_schema().unwrap();
5318        let string_cols: Vec<String> = schema
5319            .iter()
5320            .filter(|(_, dtype)| dtype.is_string())
5321            .map(|(name, _)| name.to_string())
5322            .collect();
5323        assert!(
5324            !string_cols.is_empty(),
5325            "df! string cols should be detected"
5326        );
5327        let pattern = fuzzy_token_regex("alice");
5328        let token_expr = string_cols
5329            .iter()
5330            .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
5331            .reduce(|a, b| a.or(b))
5332            .unwrap();
5333        let out3 = lf2.filter(token_expr).collect().unwrap();
5334        assert_eq!(
5335            out3.height(),
5336            1,
5337            "fuzzy_search-style filter should match 1 row"
5338        );
5339    }
5340
5341    #[test]
5342    fn test_fuzzy_search_no_string_columns() {
5343        let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
5344            .unwrap()
5345            .lazy();
5346        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
5347        state.fuzzy_search("x".to_string());
5348        assert!(state.error.is_some());
5349    }
5350
5351    /// By-queries must produce results sorted by the group columns (age_group, then team)
5352    /// so that output order is deterministic and practical. Raw data is deliberately out of order.
5353    #[test]
5354    fn test_by_query_result_sorted_by_group_columns() {
5355        // Build a small table: age_group (1-5, out of order), team (Red/Blue/Green), score (0-100)
5356        let df = df!(
5357            "age_group" => &[3i64, 1, 5, 2, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
5358            "team" => &[
5359                "Red", "Blue", "Green", "Red", "Blue", "Green", "Green", "Red", "Blue",
5360                "Green", "Red", "Blue", "Red", "Blue", "Green",
5361            ],
5362            "score" => &[50.0f64, 10.0, 90.0, 20.0, 30.0, 40.0, 60.0, 70.0, 80.0, 15.0, 25.0, 35.0, 45.0, 55.0, 65.0],
5363        )
5364        .unwrap();
5365        let lf = df.lazy();
5366        let options = crate::OpenOptions::default();
5367        let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
5368        state.query("select avg score by age_group, team".to_string());
5369        assert!(
5370            state.error.is_none(),
5371            "query should succeed: {:?}",
5372            state.error
5373        );
5374        let result = state.lf.collect().unwrap();
5375        // Result must be sorted by group columns (age_group, then team)
5376        let sorted = result
5377            .sort(
5378                ["age_group", "team"],
5379                SortMultipleOptions::default().with_order_descending(false),
5380            )
5381            .unwrap();
5382        assert_eq!(
5383            result, sorted,
5384            "by-query result must be sorted by (age_group, team)"
5385        );
5386    }
5387
5388    /// Computed group keys (e.g. Fare: 1+floor Fare % 25) must be sorted by their result column
5389    /// values, not by re-evaluating the expression on the result.
5390    #[test]
5391    fn test_by_query_computed_group_key_sorted_by_result_column() {
5392        let df = df!(
5393            "x" => &[7.0f64, 12.0, 3.0, 22.0, 17.0, 8.0],
5394            "v" => &[1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0],
5395        )
5396        .unwrap();
5397        let lf = df.lazy();
5398        let options = crate::OpenOptions::default();
5399        let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
5400        // bucket: 1+floor(x)%3 -> values 1,2,3; raw x order 7,12,3,22,17,8 -> buckets 2,2,1,2,2,2
5401        state.query("select sum v by bucket: 1+floor x % 3".to_string());
5402        assert!(
5403            state.error.is_none(),
5404            "query should succeed: {:?}",
5405            state.error
5406        );
5407        let result = state.lf.collect().unwrap();
5408        let bucket = result.column("bucket").unwrap();
5409        // Must be sorted by bucket (1, 2, 3)
5410        for i in 1..result.height() {
5411            let prev: i64 = bucket.get(i - 1).unwrap().try_extract().unwrap_or(0);
5412            let curr: i64 = bucket.get(i).unwrap().try_extract().unwrap_or(0);
5413            assert!(
5414                curr >= prev,
5415                "bucket column must be sorted: {} then {}",
5416                prev,
5417                curr
5418            );
5419        }
5420    }
5421}