//! datui_lib/widgets/datatable.rs — data table widget state and file loaders.

1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10    buffer::Buffer,
11    layout::Rect,
12    style::{Color, Modifier, Style},
13    text::{Line, Span},
14    widgets::{
15        Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16    },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions};
25use polars::io::csv::read::NullValues;
26use polars::lazy::frame::pivot::pivot_stable;
27use std::io::{BufReader, Read};
28
29use calamine::{open_workbook_auto, Data, Reader};
30use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
31use orc_rust::ArrowReaderBuilder;
32use tempfile::NamedTempFile;
33
34use arrow::array::types::{
35    Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
36    TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
37};
38use arrow::array::{Array, AsArray};
39use arrow::record_batch::RecordBatch;
40
41fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
42    let e = col(PlSmallStr::from_static(""));
43    let expr = match agg {
44        PivotAggregation::Last => e.last(),
45        PivotAggregation::First => e.first(),
46        PivotAggregation::Min => e.min(),
47        PivotAggregation::Max => e.max(),
48        PivotAggregation::Avg => e.mean(),
49        PivotAggregation::Med => e.median(),
50        PivotAggregation::Std => e.std(1),
51        PivotAggregation::Count => e.len(),
52    };
53    Ok(expr)
54}
55
/// State backing the data table widget: the Polars LazyFrame under view,
/// buffered DataFrame slices for rendering, scroll/selection positions, and
/// all active view transformations (filters, sorts, queries, grouping,
/// pivot/melt).
pub struct DataTableState {
    /// Frame currently being viewed (after any filters/queries/pivots).
    pub lf: LazyFrame,
    /// Frame as originally loaded; `reset` restores the view from this.
    original_lf: LazyFrame,
    df: Option<DataFrame>,        // Scrollable columns dataframe
    locked_df: Option<DataFrame>, // Locked columns dataframe
    /// ratatui table selection/offset state.
    pub table_state: TableState,
    /// Absolute index of the first visible row in the current view.
    pub start_row: usize,
    /// Rows that fit in the viewport; 0 until layout is known.
    pub visible_rows: usize,
    /// Index of the leftmost visible scrollable column.
    pub termcol_index: usize,
    /// Number of scrollable columns that fit in the viewport.
    pub visible_termcols: usize,
    /// Last Polars error, shown in the main view unless suppressed.
    pub error: Option<PolarsError>,
    pub suppress_error_display: bool, // When true, don't show errors in main view (e.g., when query input is active)
    /// Schema of `lf`; re-fetched whenever the column set may have changed.
    pub schema: Arc<Schema>,
    /// Total row count of the current view (0 until counted).
    pub num_rows: usize,
    /// When true, collect() skips the len() query.
    num_rows_valid: bool,
    /// Active filter statements applied to the view.
    filters: Vec<FilterStatement>,
    /// Columns the view is sorted by, in priority order.
    sort_columns: Vec<String>,
    sort_ascending: bool,
    /// Last executed query (Query tab); counterpart of `active_sql_query`.
    pub active_query: String,
    /// Last executed SQL (Sql tab). Independent from active_query; only one applies to current view.
    pub active_sql_query: String,
    /// Last executed fuzzy search (Fuzzy tab). Independent from active_query/active_sql_query.
    pub active_fuzzy_query: String,
    column_order: Vec<String>,   // Order of columns for display
    locked_columns_count: usize, // Number of locked columns (from left)
    /// Grouped view of the data, set while grouping is active.
    grouped_lf: Option<LazyFrame>,
    drilled_down_group_index: Option<usize>, // Index of the group we're viewing
    pub drilled_down_group_key: Option<Vec<String>>, // Key values of the drilled down group
    pub drilled_down_group_key_columns: Option<Vec<String>>, // Key column names of the drilled down group
    /// Pages to buffer ahead of the viewport (constructor default: 3).
    pages_lookahead: usize,
    /// Pages to buffer behind the viewport (constructor default: 3).
    pages_lookback: usize,
    max_buffered_rows: usize, // 0 = no limit
    max_buffered_mb: usize,   // 0 = no limit
    /// Absolute start of the row range currently held in the buffer.
    buffered_start_row: usize,
    /// Absolute end of the row range currently held in the buffer.
    buffered_end_row: usize,
    /// Full buffered DataFrame (all columns in column_order) for the current buffer range.
    /// When set, column scroll (scroll_left/scroll_right) only re-slices columns without re-collecting from LazyFrame.
    buffered_df: Option<DataFrame>,
    // Presumably the buffer-edge distance that triggers a refill; set once
    // visible_rows is known — TODO confirm against the scrolling code.
    proximity_threshold: usize,
    /// Show a row-number column (from options).
    row_numbers: bool,
    /// First displayed row number (from options; constructor default: 1).
    row_start_index: usize,
    /// Last applied pivot spec, if current lf is result of a pivot. Used for templates.
    last_pivot_spec: Option<PivotSpec>,
    /// Last applied melt spec, if current lf is result of a melt. Used for templates.
    last_melt_spec: Option<MeltSpec>,
    /// When set, dataset was loaded with hive partitioning; partition column names for Info panel and predicate pushdown.
    pub partition_columns: Option<Vec<String>>,
    /// When set, decompressed CSV was written to this temp file; kept alive so the file exists for lazy scan.
    decompress_temp_file: Option<NamedTempFile>,
    /// When true, use Polars streaming engine for LazyFrame collect when the streaming feature is enabled.
    pub polars_streaming: bool,
    /// When true, cast Date/Datetime pivot index columns to Int32 before pivot (workaround for Polars 0.52).
    workaround_pivot_date_index: bool,
}
111
/// Inferred type for an Excel column (preserves numbers, bools, dates; avoids stringifying).
#[derive(Clone, Copy)]
enum ExcelColType {
    /// Integer cells, or float cells whose values are all whole numbers.
    Int64,
    /// Floating-point cells with at least one fractional value.
    Float64,
    /// Boolean cells.
    Boolean,
    /// Text cells; also the fallback for empty/unrecognized columns.
    Utf8,
    /// Date/datetime cells where every parsed value is at midnight.
    Date,
    /// Date/datetime cells carrying a time-of-day component.
    Datetime,
}
122
123impl DataTableState {
124    pub fn new(
125        lf: LazyFrame,
126        pages_lookahead: Option<usize>,
127        pages_lookback: Option<usize>,
128        max_buffered_rows: Option<usize>,
129        max_buffered_mb: Option<usize>,
130        polars_streaming: bool,
131    ) -> Result<Self> {
132        let schema = lf.clone().collect_schema()?;
133        let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
134        Ok(Self {
135            original_lf: lf.clone(),
136            lf,
137            df: None,
138            locked_df: None,
139            table_state: TableState::default(),
140            start_row: 0,
141            visible_rows: 0,
142            termcol_index: 0,
143            visible_termcols: 0,
144            error: None,
145            suppress_error_display: false,
146            schema,
147            num_rows: 0,
148            num_rows_valid: false,
149            filters: Vec::new(),
150            sort_columns: Vec::new(),
151            sort_ascending: true,
152            active_query: String::new(),
153            active_sql_query: String::new(),
154            active_fuzzy_query: String::new(),
155            column_order,
156            locked_columns_count: 0,
157            grouped_lf: None,
158            drilled_down_group_index: None,
159            drilled_down_group_key: None,
160            drilled_down_group_key_columns: None,
161            pages_lookahead: pages_lookahead.unwrap_or(3),
162            pages_lookback: pages_lookback.unwrap_or(3),
163            max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
164            max_buffered_mb: max_buffered_mb.unwrap_or(512),
165            buffered_start_row: 0,
166            buffered_end_row: 0,
167            buffered_df: None,
168            proximity_threshold: 0, // Will be set when visible_rows is known
169            row_numbers: false,     // Will be set from options
170            row_start_index: 1,     // Will be set from options
171            last_pivot_spec: None,
172            last_melt_spec: None,
173            partition_columns: None,
174            decompress_temp_file: None,
175            polars_streaming,
176            workaround_pivot_date_index: true,
177        })
178    }
179
180    /// Create state from an existing LazyFrame (e.g. from Python or in-memory). Uses OpenOptions for display/buffer settings.
181    pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
182        let mut state = Self::new(
183            lf,
184            options.pages_lookahead,
185            options.pages_lookback,
186            options.max_buffered_rows,
187            options.max_buffered_mb,
188            options.polars_streaming,
189        )?;
190        state.row_numbers = options.row_numbers;
191        state.row_start_index = options.row_start_index;
192        state.workaround_pivot_date_index = options.workaround_pivot_date_index;
193        Ok(state)
194    }
195
196    /// Create state from a pre-collected schema and LazyFrame (for phased loading). Does not call collect_schema();
197    /// df is None so the UI can render headers while the first collect() runs.
198    /// When `partition_columns` is Some (e.g. hive), column order is partition cols first.
199    pub fn from_schema_and_lazyframe(
200        schema: Arc<Schema>,
201        lf: LazyFrame,
202        options: &crate::OpenOptions,
203        partition_columns: Option<Vec<String>>,
204    ) -> Result<Self> {
205        let column_order: Vec<String> = if let Some(ref part) = partition_columns {
206            let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
207            let rest: Vec<String> = schema
208                .iter_names()
209                .map(|s| s.to_string())
210                .filter(|c| !part_set.contains(c.as_str()))
211                .collect();
212            part.iter().cloned().chain(rest).collect()
213        } else {
214            schema.iter_names().map(|s| s.to_string()).collect()
215        };
216        Ok(Self {
217            original_lf: lf.clone(),
218            lf,
219            df: None,
220            locked_df: None,
221            table_state: TableState::default(),
222            start_row: 0,
223            visible_rows: 0,
224            termcol_index: 0,
225            visible_termcols: 0,
226            error: None,
227            suppress_error_display: false,
228            schema,
229            num_rows: 0,
230            num_rows_valid: false,
231            filters: Vec::new(),
232            sort_columns: Vec::new(),
233            sort_ascending: true,
234            active_query: String::new(),
235            active_sql_query: String::new(),
236            active_fuzzy_query: String::new(),
237            column_order,
238            locked_columns_count: 0,
239            grouped_lf: None,
240            drilled_down_group_index: None,
241            drilled_down_group_key: None,
242            drilled_down_group_key_columns: None,
243            pages_lookahead: options.pages_lookahead.unwrap_or(3),
244            pages_lookback: options.pages_lookback.unwrap_or(3),
245            max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
246            max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
247            buffered_start_row: 0,
248            buffered_end_row: 0,
249            buffered_df: None,
250            proximity_threshold: 0,
251            row_numbers: options.row_numbers,
252            row_start_index: options.row_start_index,
253            last_pivot_spec: None,
254            last_melt_spec: None,
255            partition_columns,
256            decompress_temp_file: None,
257            polars_streaming: options.polars_streaming,
258            workaround_pivot_date_index: options.workaround_pivot_date_index,
259        })
260    }
261
262    /// Reset LazyFrame and view state to original_lf. Schema is re-fetched so it matches
263    /// after a previous query/SQL that may have changed columns. Caller should call
264    /// collect() afterward if display update is needed (reset/query/fuzzy do; sql_query
265    /// relies on event loop Collect).
266    fn reset_lf_to_original(&mut self) {
267        self.invalidate_num_rows();
268        self.lf = self.original_lf.clone();
269        self.schema = self
270            .original_lf
271            .clone()
272            .collect_schema()
273            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
274        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
275        self.active_query.clear();
276        self.active_sql_query.clear();
277        self.active_fuzzy_query.clear();
278        self.locked_columns_count = 0;
279        self.filters.clear();
280        self.sort_columns.clear();
281        self.sort_ascending = true;
282        self.start_row = 0;
283        self.termcol_index = 0;
284        self.drilled_down_group_index = None;
285        self.drilled_down_group_key = None;
286        self.drilled_down_group_key_columns = None;
287        self.grouped_lf = None;
288        self.buffered_start_row = 0;
289        self.buffered_end_row = 0;
290        self.buffered_df = None;
291        self.table_state.select(Some(0));
292    }
293
294    pub fn reset(&mut self) {
295        self.reset_lf_to_original();
296        self.error = None;
297        self.suppress_error_display = false;
298        self.last_pivot_spec = None;
299        self.last_melt_spec = None;
300        self.collect();
301        if self.num_rows > 0 {
302            self.start_row = 0;
303        }
304    }
305
306    pub fn from_parquet(
307        path: &Path,
308        pages_lookahead: Option<usize>,
309        pages_lookback: Option<usize>,
310        max_buffered_rows: Option<usize>,
311        max_buffered_mb: Option<usize>,
312        row_numbers: bool,
313        row_start_index: usize,
314    ) -> Result<Self> {
315        let path_str = path.as_os_str().to_string_lossy();
316        let is_glob = path_str.contains('*');
317        let pl_path = PlPath::Local(Arc::from(path));
318        let args = ScanArgsParquet {
319            glob: is_glob,
320            ..Default::default()
321        };
322        let lf = LazyFrame::scan_parquet(pl_path, args)?;
323        let mut state = Self::new(
324            lf,
325            pages_lookahead,
326            pages_lookback,
327            max_buffered_rows,
328            max_buffered_mb,
329            true,
330        )?;
331        state.row_numbers = row_numbers;
332        state.row_start_index = row_start_index;
333        Ok(state)
334    }
335
336    /// Load multiple Parquet files and concatenate them into one LazyFrame (same schema assumed).
337    pub fn from_parquet_paths(
338        paths: &[impl AsRef<Path>],
339        pages_lookahead: Option<usize>,
340        pages_lookback: Option<usize>,
341        max_buffered_rows: Option<usize>,
342        max_buffered_mb: Option<usize>,
343        row_numbers: bool,
344        row_start_index: usize,
345    ) -> Result<Self> {
346        if paths.is_empty() {
347            return Err(color_eyre::eyre::eyre!("No paths provided"));
348        }
349        if paths.len() == 1 {
350            return Self::from_parquet(
351                paths[0].as_ref(),
352                pages_lookahead,
353                pages_lookback,
354                max_buffered_rows,
355                max_buffered_mb,
356                row_numbers,
357                row_start_index,
358            );
359        }
360        let mut lazy_frames = Vec::with_capacity(paths.len());
361        for p in paths {
362            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
363            let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
364            lazy_frames.push(lf);
365        }
366        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
367        let mut state = Self::new(
368            lf,
369            pages_lookahead,
370            pages_lookback,
371            max_buffered_rows,
372            max_buffered_mb,
373            true,
374        )?;
375        state.row_numbers = row_numbers;
376        state.row_start_index = row_start_index;
377        Ok(state)
378    }
379
380    /// Load a single Arrow IPC / Feather v2 file (lazy).
381    pub fn from_ipc(
382        path: &Path,
383        pages_lookahead: Option<usize>,
384        pages_lookback: Option<usize>,
385        max_buffered_rows: Option<usize>,
386        max_buffered_mb: Option<usize>,
387        row_numbers: bool,
388        row_start_index: usize,
389    ) -> Result<Self> {
390        let pl_path = PlPath::Local(Arc::from(path));
391        let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
392        let mut state = Self::new(
393            lf,
394            pages_lookahead,
395            pages_lookback,
396            max_buffered_rows,
397            max_buffered_mb,
398            true,
399        )?;
400        state.row_numbers = row_numbers;
401        state.row_start_index = row_start_index;
402        Ok(state)
403    }
404
405    /// Load multiple Arrow IPC / Feather files and concatenate into one LazyFrame.
406    pub fn from_ipc_paths(
407        paths: &[impl AsRef<Path>],
408        pages_lookahead: Option<usize>,
409        pages_lookback: Option<usize>,
410        max_buffered_rows: Option<usize>,
411        max_buffered_mb: Option<usize>,
412        row_numbers: bool,
413        row_start_index: usize,
414    ) -> Result<Self> {
415        if paths.is_empty() {
416            return Err(color_eyre::eyre::eyre!("No paths provided"));
417        }
418        if paths.len() == 1 {
419            return Self::from_ipc(
420                paths[0].as_ref(),
421                pages_lookahead,
422                pages_lookback,
423                max_buffered_rows,
424                max_buffered_mb,
425                row_numbers,
426                row_start_index,
427            );
428        }
429        let mut lazy_frames = Vec::with_capacity(paths.len());
430        for p in paths {
431            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
432            let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
433            lazy_frames.push(lf);
434        }
435        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
436        let mut state = Self::new(
437            lf,
438            pages_lookahead,
439            pages_lookback,
440            max_buffered_rows,
441            max_buffered_mb,
442            true,
443        )?;
444        state.row_numbers = row_numbers;
445        state.row_start_index = row_start_index;
446        Ok(state)
447    }
448
449    /// Load a single Avro file (eager read, then lazy).
450    pub fn from_avro(
451        path: &Path,
452        pages_lookahead: Option<usize>,
453        pages_lookback: Option<usize>,
454        max_buffered_rows: Option<usize>,
455        max_buffered_mb: Option<usize>,
456        row_numbers: bool,
457        row_start_index: usize,
458    ) -> Result<Self> {
459        let file = File::open(path)?;
460        let df = polars::io::avro::AvroReader::new(file).finish()?;
461        let lf = df.lazy();
462        let mut state = Self::new(
463            lf,
464            pages_lookahead,
465            pages_lookback,
466            max_buffered_rows,
467            max_buffered_mb,
468            true,
469        )?;
470        state.row_numbers = row_numbers;
471        state.row_start_index = row_start_index;
472        Ok(state)
473    }
474
475    /// Load multiple Avro files and concatenate into one LazyFrame.
476    pub fn from_avro_paths(
477        paths: &[impl AsRef<Path>],
478        pages_lookahead: Option<usize>,
479        pages_lookback: Option<usize>,
480        max_buffered_rows: Option<usize>,
481        max_buffered_mb: Option<usize>,
482        row_numbers: bool,
483        row_start_index: usize,
484    ) -> Result<Self> {
485        if paths.is_empty() {
486            return Err(color_eyre::eyre::eyre!("No paths provided"));
487        }
488        if paths.len() == 1 {
489            return Self::from_avro(
490                paths[0].as_ref(),
491                pages_lookahead,
492                pages_lookback,
493                max_buffered_rows,
494                max_buffered_mb,
495                row_numbers,
496                row_start_index,
497            );
498        }
499        let mut lazy_frames = Vec::with_capacity(paths.len());
500        for p in paths {
501            let file = File::open(p.as_ref())?;
502            let df = polars::io::avro::AvroReader::new(file).finish()?;
503            lazy_frames.push(df.lazy());
504        }
505        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
506        let mut state = Self::new(
507            lf,
508            pages_lookahead,
509            pages_lookback,
510            max_buffered_rows,
511            max_buffered_mb,
512            true,
513        )?;
514        state.row_numbers = row_numbers;
515        state.row_start_index = row_start_index;
516        Ok(state)
517    }
518
    /// Load a single Excel file (xls, xlsx, xlsm, xlsb) using calamine (eager read, then lazy).
    /// Sheet is selected by 0-based index or name via `excel_sheet`.
    ///
    /// The first row is treated as the header row; empty header cells become
    /// `column_{i}` (1-based). Each column's type is inferred via
    /// `excel_infer_column_type` before building its Polars Series.
    #[allow(clippy::too_many_arguments)]
    pub fn from_excel(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        excel_sheet: Option<&str>,
    ) -> Result<Self> {
        let mut workbook =
            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
        let sheet_names = workbook.sheet_names().to_vec();
        if sheet_names.is_empty() {
            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
        }
        // Sheet selection: a numeric selector is treated as a 0-based index,
        // anything else as a sheet name. NOTE(review): a sheet literally named
        // e.g. "3" can therefore not be selected by name — confirm acceptable.
        let range = if let Some(sheet_sel) = excel_sheet {
            if let Ok(idx) = sheet_sel.parse::<usize>() {
                workbook
                    .worksheet_range_at(idx)
                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            } else {
                workbook
                    .worksheet_range(sheet_sel)
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            }
        } else {
            // Default: first worksheet.
            workbook
                .worksheet_range_at(0)
                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
        };
        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
        // Empty sheet: return state over an empty DataFrame rather than erroring.
        if rows.is_empty() {
            let empty_df = DataFrame::new(vec![])?;
            let mut state = Self::new(
                empty_df.lazy(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                true,
            )?;
            state.row_numbers = row_numbers;
            state.row_start_index = row_start_index;
            return Ok(state);
        }
        // First row is the header; non-string header cells are stringified.
        let headers: Vec<String> = rows[0]
            .iter()
            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
            .collect();
        let n_cols = headers.len();
        let mut series_vec = Vec::with_capacity(n_cols);
        for (col_idx, header) in headers.iter().enumerate() {
            // Gather this column's cells; rows shorter than the header yield None.
            let col_cells: Vec<Option<&Data>> =
                rows[1..].iter().map(|row| row.get(col_idx)).collect();
            let inferred = Self::excel_infer_column_type(&col_cells);
            // Empty header cells get a positional name (1-based).
            let name = if header.is_empty() {
                format!("column_{}", col_idx + 1)
            } else {
                header.clone()
            };
            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
            series_vec.push(series.into());
        }
        let df = DataFrame::new(series_vec)?;
        let mut state = Self::new(
            df.lazy(),
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
601
    /// Infers column type: prefers Int64 for whole-number floats; infers Date/Datetime for
    /// calamine DateTime/DateTimeIso or for string columns that parse as ISO date/datetime.
    ///
    /// Precedence: string columns are decided first (datetime-parse or Utf8),
    /// then Int64, then Date/Datetime, then Float64 (demoted to Int64 when all
    /// values are whole), then Boolean; anything else falls back to Utf8.
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        for cell in cells.iter().flatten() {
            // Any string cell ends the scan early: the string branch below
            // re-examines the cells itself.
            if CalamineTrait::is_string(*cell) {
                has_string = true;
                break;
            }
            // Datetime cells also set the float flag, though the has_datetime
            // branch below takes precedence over the float branch.
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // Promote to Date/Datetime only when at least one cell parses and
            // every non-empty cell parses.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            // NOTE(review): int wins over datetime here, so a mixed
            // int/datetime column becomes Int64 — confirm this ordering is
            // intentional.
            ExcelColType::Int64
        } else if has_datetime {
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // Float columns whose values are all whole (within 1e-10) read as Int64.
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            // No recognizable cells (e.g. an entirely empty column).
            ExcelColType::Utf8
        }
    }
673
674    /// True if every cell that parses as datetime has time 00:00:00.
675    fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
676        let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
677        cells
678            .iter()
679            .flatten()
680            .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
681            .all(|dt| dt.time() == midnight)
682    }
683
684    /// Converts a calamine cell to NaiveDateTime (Excel serial, DateTimeIso, or parseable string).
685    fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
686        use calamine::DataType;
687        if let Some(dt) = cell.as_datetime() {
688            return Some(dt);
689        }
690        let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
691        Self::parse_naive_datetime_str(s)
692    }
693
694    /// Parses an ISO-style date/datetime string; tries FORMATS in order.
695    fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
696        let s = s.trim();
697        if s.is_empty() {
698            return None;
699        }
700        const FORMATS: &[&str] = &[
701            "%Y-%m-%dT%H:%M:%S%.f",
702            "%Y-%m-%dT%H:%M:%S",
703            "%Y-%m-%d %H:%M:%S%.f",
704            "%Y-%m-%d %H:%M:%S",
705            "%Y-%m-%d",
706        ];
707        for fmt in FORMATS {
708            if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
709                return Some(dt);
710            }
711        }
712        if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
713            return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
714        }
715        None
716    }
717
718    /// Build a Polars Series from a column of calamine cells using the inferred type.
719    fn excel_column_to_series(
720        name: &str,
721        cells: &[Option<&Data>],
722        col_type: ExcelColType,
723    ) -> Result<Series> {
724        use calamine::DataType as CalamineTrait;
725        use polars::datatypes::TimeUnit;
726        let series = match col_type {
727            ExcelColType::Int64 => {
728                let v: Vec<Option<i64>> = cells
729                    .iter()
730                    .map(|c| c.and_then(|cell| cell.as_i64()))
731                    .collect();
732                Series::new(name.into(), v)
733            }
734            ExcelColType::Float64 => {
735                let v: Vec<Option<f64>> = cells
736                    .iter()
737                    .map(|c| c.and_then(|cell| cell.as_f64()))
738                    .collect();
739                Series::new(name.into(), v)
740            }
741            ExcelColType::Boolean => {
742                let v: Vec<Option<bool>> = cells
743                    .iter()
744                    .map(|c| c.and_then(|cell| cell.get_bool()))
745                    .collect();
746                Series::new(name.into(), v)
747            }
748            ExcelColType::Utf8 => {
749                let v: Vec<Option<String>> = cells
750                    .iter()
751                    .map(|c| c.and_then(|cell| cell.as_string()))
752                    .collect();
753                Series::new(name.into(), v)
754            }
755            ExcelColType::Date => {
756                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
757                let v: Vec<Option<i32>> = cells
758                    .iter()
759                    .map(|c| {
760                        c.and_then(Self::excel_cell_to_naive_datetime)
761                            .map(|dt| (dt.date() - epoch).num_days() as i32)
762                    })
763                    .collect();
764                Series::new(name.into(), v).cast(&DataType::Date)?
765            }
766            ExcelColType::Datetime => {
767                let v: Vec<Option<i64>> = cells
768                    .iter()
769                    .map(|c| {
770                        c.and_then(Self::excel_cell_to_naive_datetime)
771                            .map(|dt| dt.and_utc().timestamp_micros())
772                    })
773                    .collect();
774                Series::new(name.into(), v)
775                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
776            }
777        };
778        Ok(series)
779    }
780
781    /// Load a single ORC file (eager read via orc-rust → Arrow, then convert to Polars, then lazy).
782    /// ORC is read fully into memory; see loading-data docs for large-file notes.
783    pub fn from_orc(
784        path: &Path,
785        pages_lookahead: Option<usize>,
786        pages_lookback: Option<usize>,
787        max_buffered_rows: Option<usize>,
788        max_buffered_mb: Option<usize>,
789        row_numbers: bool,
790        row_start_index: usize,
791    ) -> Result<Self> {
792        let file = File::open(path)?;
793        let reader = ArrowReaderBuilder::try_new(file)
794            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
795            .build();
796        let batches: Vec<RecordBatch> = reader
797            .collect::<std::result::Result<Vec<_>, _>>()
798            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
799        let df = Self::arrow_record_batches_to_dataframe(&batches)?;
800        let lf = df.lazy();
801        let mut state = Self::new(
802            lf,
803            pages_lookahead,
804            pages_lookback,
805            max_buffered_rows,
806            max_buffered_mb,
807            true,
808        )?;
809        state.row_numbers = row_numbers;
810        state.row_start_index = row_start_index;
811        Ok(state)
812    }
813
814    /// Load multiple ORC files and concatenate into one LazyFrame.
815    pub fn from_orc_paths(
816        paths: &[impl AsRef<Path>],
817        pages_lookahead: Option<usize>,
818        pages_lookback: Option<usize>,
819        max_buffered_rows: Option<usize>,
820        max_buffered_mb: Option<usize>,
821        row_numbers: bool,
822        row_start_index: usize,
823    ) -> Result<Self> {
824        if paths.is_empty() {
825            return Err(color_eyre::eyre::eyre!("No paths provided"));
826        }
827        if paths.len() == 1 {
828            return Self::from_orc(
829                paths[0].as_ref(),
830                pages_lookahead,
831                pages_lookback,
832                max_buffered_rows,
833                max_buffered_mb,
834                row_numbers,
835                row_start_index,
836            );
837        }
838        let mut lazy_frames = Vec::with_capacity(paths.len());
839        for p in paths {
840            let file = File::open(p.as_ref())?;
841            let reader = ArrowReaderBuilder::try_new(file)
842                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
843                .build();
844            let batches: Vec<RecordBatch> = reader
845                .collect::<std::result::Result<Vec<_>, _>>()
846                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
847            let df = Self::arrow_record_batches_to_dataframe(&batches)?;
848            lazy_frames.push(df.lazy());
849        }
850        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
851        let mut state = Self::new(
852            lf,
853            pages_lookahead,
854            pages_lookback,
855            max_buffered_rows,
856            max_buffered_mb,
857            true,
858        )?;
859        state.row_numbers = row_numbers;
860        state.row_start_index = row_start_index;
861        Ok(state)
862    }
863
864    /// Convert Arrow (arrow crate 57) RecordBatches to Polars DataFrame by value (ORC uses
865    /// arrow 57; Polars uses polars-arrow, so we cannot use Series::from_arrow).
866    fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
867        if batches.is_empty() {
868            return Ok(DataFrame::new(vec![])?);
869        }
870        let mut all_dfs = Vec::with_capacity(batches.len());
871        for batch in batches {
872            let n_cols = batch.num_columns();
873            let schema = batch.schema();
874            let mut series_vec = Vec::with_capacity(n_cols);
875            for (i, col) in batch.columns().iter().enumerate() {
876                let name = schema.field(i).name().as_str();
877                let s = Self::arrow_array_to_polars_series(name, col)?;
878                series_vec.push(s.into());
879            }
880            let df = DataFrame::new(series_vec)?;
881            all_dfs.push(df);
882        }
883        let mut out = all_dfs.remove(0);
884        for df in all_dfs {
885            out = out.vstack(&df)?;
886        }
887        Ok(out)
888    }
889
890    fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
891        use arrow::datatypes::DataType as ArrowDataType;
892        let len = array.len();
893        match array.data_type() {
894            ArrowDataType::Int8 => {
895                let a = array
896                    .as_primitive_opt::<Int8Type>()
897                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
898                let v: Vec<Option<i8>> = (0..len)
899                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
900                    .collect();
901                Ok(Series::new(name.into(), v))
902            }
903            ArrowDataType::Int16 => {
904                let a = array
905                    .as_primitive_opt::<Int16Type>()
906                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
907                let v: Vec<Option<i16>> = (0..len)
908                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
909                    .collect();
910                Ok(Series::new(name.into(), v))
911            }
912            ArrowDataType::Int32 => {
913                let a = array
914                    .as_primitive_opt::<Int32Type>()
915                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
916                let v: Vec<Option<i32>> = (0..len)
917                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
918                    .collect();
919                Ok(Series::new(name.into(), v))
920            }
921            ArrowDataType::Int64 => {
922                let a = array
923                    .as_primitive_opt::<Int64Type>()
924                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
925                let v: Vec<Option<i64>> = (0..len)
926                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
927                    .collect();
928                Ok(Series::new(name.into(), v))
929            }
930            ArrowDataType::UInt8 => {
931                let a = array
932                    .as_primitive_opt::<UInt8Type>()
933                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
934                let v: Vec<Option<i64>> = (0..len)
935                    .map(|i| {
936                        if a.is_null(i) {
937                            None
938                        } else {
939                            Some(a.value(i) as i64)
940                        }
941                    })
942                    .collect();
943                Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
944            }
945            ArrowDataType::UInt16 => {
946                let a = array
947                    .as_primitive_opt::<UInt16Type>()
948                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
949                let v: Vec<Option<i64>> = (0..len)
950                    .map(|i| {
951                        if a.is_null(i) {
952                            None
953                        } else {
954                            Some(a.value(i) as i64)
955                        }
956                    })
957                    .collect();
958                Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
959            }
960            ArrowDataType::UInt32 => {
961                let a = array
962                    .as_primitive_opt::<UInt32Type>()
963                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
964                let v: Vec<Option<u32>> = (0..len)
965                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
966                    .collect();
967                Ok(Series::new(name.into(), v))
968            }
969            ArrowDataType::UInt64 => {
970                let a = array
971                    .as_primitive_opt::<UInt64Type>()
972                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
973                let v: Vec<Option<u64>> = (0..len)
974                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
975                    .collect();
976                Ok(Series::new(name.into(), v))
977            }
978            ArrowDataType::Float32 => {
979                let a = array
980                    .as_primitive_opt::<Float32Type>()
981                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
982                let v: Vec<Option<f32>> = (0..len)
983                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
984                    .collect();
985                Ok(Series::new(name.into(), v))
986            }
987            ArrowDataType::Float64 => {
988                let a = array
989                    .as_primitive_opt::<Float64Type>()
990                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
991                let v: Vec<Option<f64>> = (0..len)
992                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
993                    .collect();
994                Ok(Series::new(name.into(), v))
995            }
996            ArrowDataType::Boolean => {
997                let a = array
998                    .as_boolean_opt()
999                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
1000                let v: Vec<Option<bool>> = (0..len)
1001                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1002                    .collect();
1003                Ok(Series::new(name.into(), v))
1004            }
1005            ArrowDataType::Utf8 => {
1006                let a = array
1007                    .as_string_opt::<i32>()
1008                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
1009                let v: Vec<Option<String>> = (0..len)
1010                    .map(|i| {
1011                        if a.is_null(i) {
1012                            None
1013                        } else {
1014                            Some(a.value(i).to_string())
1015                        }
1016                    })
1017                    .collect();
1018                Ok(Series::new(name.into(), v))
1019            }
1020            ArrowDataType::LargeUtf8 => {
1021                let a = array
1022                    .as_string_opt::<i64>()
1023                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
1024                let v: Vec<Option<String>> = (0..len)
1025                    .map(|i| {
1026                        if a.is_null(i) {
1027                            None
1028                        } else {
1029                            Some(a.value(i).to_string())
1030                        }
1031                    })
1032                    .collect();
1033                Ok(Series::new(name.into(), v))
1034            }
1035            ArrowDataType::Date32 => {
1036                let a = array
1037                    .as_primitive_opt::<Date32Type>()
1038                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
1039                let v: Vec<Option<i32>> = (0..len)
1040                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1041                    .collect();
1042                Ok(Series::new(name.into(), v))
1043            }
1044            ArrowDataType::Date64 => {
1045                let a = array
1046                    .as_primitive_opt::<Date64Type>()
1047                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
1048                let v: Vec<Option<i64>> = (0..len)
1049                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1050                    .collect();
1051                Ok(Series::new(name.into(), v))
1052            }
1053            ArrowDataType::Timestamp(_, _) => {
1054                let a = array
1055                    .as_primitive_opt::<TimestampMillisecondType>()
1056                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
1057                let v: Vec<Option<i64>> = (0..len)
1058                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1059                    .collect();
1060                Ok(Series::new(name.into(), v))
1061            }
1062            other => Err(color_eyre::eyre::eyre!(
1063                "ORC: unsupported column type {:?} for column '{}'",
1064                other,
1065                name
1066            )),
1067        }
1068    }
1069
1070    /// Build a LazyFrame for hive-partitioned Parquet only (no schema collection, no partition discovery).
1071    /// Use this for phased loading so "Scanning input" is instant; schema and partition handling happen in DoLoadSchema.
1072    pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1073        let path_str = path.as_os_str().to_string_lossy();
1074        let is_glob = path_str.contains('*');
1075        let pl_path = PlPath::Local(Arc::from(path));
1076        let args = ScanArgsParquet {
1077            hive_options: HiveOptions::new_enabled(),
1078            glob: is_glob,
1079            ..Default::default()
1080        };
1081        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1082    }
1083
1084    /// Build a LazyFrame for hive-partitioned Parquet with a pre-computed schema (avoids slow collect_schema across all files).
1085    pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1086        let path_str = path.as_os_str().to_string_lossy();
1087        let is_glob = path_str.contains('*');
1088        let pl_path = PlPath::Local(Arc::from(path));
1089        let args = ScanArgsParquet {
1090            schema: Some(schema),
1091            hive_options: HiveOptions::new_enabled(),
1092            glob: is_glob,
1093            ..Default::default()
1094        };
1095        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1096    }
1097
1098    /// Find the first parquet file along a single spine of a hive-partitioned directory (same walk as partition discovery).
1099    /// Returns `None` if the directory is empty or has no parquet files along that spine.
1100    fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
1101        const MAX_DEPTH: usize = 64;
1102        Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
1103    }
1104
1105    fn first_parquet_file_spine(
1106        path: &Path,
1107        depth: usize,
1108        max_depth: usize,
1109    ) -> Option<std::path::PathBuf> {
1110        if depth >= max_depth {
1111            return None;
1112        }
1113        let entries = fs::read_dir(path).ok()?;
1114        let mut first_partition_child: Option<std::path::PathBuf> = None;
1115        for entry in entries.flatten() {
1116            let child = entry.path();
1117            if child.is_file() {
1118                if child
1119                    .extension()
1120                    .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
1121                {
1122                    return Some(child);
1123                }
1124            } else if child.is_dir() {
1125                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1126                    if name.contains('=') && first_partition_child.is_none() {
1127                        first_partition_child = Some(child);
1128                    }
1129                }
1130            }
1131        }
1132        first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
1133    }
1134
1135    /// Read schema from a single parquet file (metadata only, no data scan). Used to avoid collect_schema() over many files.
1136    fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1137        let file = File::open(path)?;
1138        let mut reader = ParquetReader::new(file);
1139        let arrow_schema = reader.schema()?;
1140        let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1141        Ok(Arc::new(schema))
1142    }
1143
1144    /// Infer schema from one parquet file in a hive directory and merge with partition columns (Utf8).
1145    /// Returns (merged_schema, partition_columns). Use with scan_parquet_hive_with_schema to avoid slow collect_schema().
1146    /// Only supported when path is a directory (not a glob). Returns Err if no parquet file found or read fails.
1147    pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1148        let partition_columns = Self::discover_hive_partition_columns(path);
1149        let one_file = Self::first_parquet_file_in_hive_dir(path)
1150            .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1151        let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1152        let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1153        let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1154        for name in &partition_columns {
1155            merged.with_column(name.clone().into(), DataType::String);
1156        }
1157        for (name, dtype) in file_schema.iter() {
1158            if !part_set.contains(name.as_str()) {
1159                merged.with_column(name.clone(), dtype.clone());
1160            }
1161        }
1162        Ok((Arc::new(merged), partition_columns))
1163    }
1164
1165    /// Discover hive partition column names (public for phased loading). Directory: single-spine walk; glob: parse pattern.
1166    pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1167        if path.is_dir() {
1168            Self::discover_partition_columns_from_path(path)
1169        } else {
1170            Self::discover_partition_columns_from_glob_pattern(path)
1171        }
1172    }
1173
1174    /// Discover hive partition column names from a directory path by walking a single
1175    /// "spine" (one branch) of key=value directories. Partition keys are uniform across
1176    /// the tree, so we only need one path to infer [year, month, day] etc. Returns columns
1177    /// in path order. Stops after max_depth levels to avoid runaway on malformed trees.
1178    fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1179        const MAX_PARTITION_DEPTH: usize = 64;
1180        let mut columns = Vec::<String>::new();
1181        let mut seen = HashSet::<String>::new();
1182        Self::discover_partition_columns_spine(
1183            path,
1184            &mut columns,
1185            &mut seen,
1186            0,
1187            MAX_PARTITION_DEPTH,
1188        );
1189        columns
1190    }
1191
1192    /// Walk one branch: at this directory, find the first child that is a key=value dir,
1193    /// record the key (if not already seen), then recurse into that one child only.
1194    /// This does O(depth) read_dir calls instead of walking the entire tree.
1195    fn discover_partition_columns_spine(
1196        path: &Path,
1197        columns: &mut Vec<String>,
1198        seen: &mut HashSet<String>,
1199        depth: usize,
1200        max_depth: usize,
1201    ) {
1202        if depth >= max_depth {
1203            return;
1204        }
1205        let Ok(entries) = fs::read_dir(path) else {
1206            return;
1207        };
1208        let mut first_partition_child: Option<std::path::PathBuf> = None;
1209        for entry in entries.flatten() {
1210            let child = entry.path();
1211            if child.is_dir() {
1212                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1213                    if let Some((key, _)) = name.split_once('=') {
1214                        if !key.is_empty() && seen.insert(key.to_string()) {
1215                            columns.push(key.to_string());
1216                        }
1217                        if first_partition_child.is_none() {
1218                            first_partition_child = Some(child);
1219                        }
1220                        break;
1221                    }
1222                }
1223            }
1224        }
1225        if let Some(one) = first_partition_child {
1226            Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1227        }
1228    }
1229
1230    /// Infer partition column names from a glob pattern path (e.g. "data/year=*/month=*/*.parquet").
1231    fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1232        let path_str = path.as_os_str().to_string_lossy();
1233        let mut columns = Vec::<String>::new();
1234        let mut seen = HashSet::<String>::new();
1235        for segment in path_str.split('/') {
1236            if let Some((key, rest)) = segment.split_once('=') {
1237                if !key.is_empty()
1238                    && (rest == "*" || !rest.contains('*'))
1239                    && seen.insert(key.to_string())
1240                {
1241                    columns.push(key.to_string());
1242                }
1243            }
1244        }
1245        columns
1246    }
1247
    /// Load Parquet with Hive partitioning from a directory or glob path.
    /// When path is a directory, partition columns are discovered from path structure.
    /// When path contains glob (e.g. `**/*.parquet`), partition columns are inferred from the pattern (e.g. `year=*/month=*`).
    /// Partition columns are moved to the left in the initial LazyFrame before state is created.
    ///
    /// **Performance**: The slow part is Polars, not our code. `scan_parquet` + `collect_schema()` trigger
    /// path expansion (full directory tree or glob) and parquet metadata reads; we only do a single-spine
    /// walk for partition key discovery and cheap schema/select work.
    ///
    /// # Errors
    /// Propagates Polars errors from `scan_parquet`/`collect_schema` and from `Self::new`.
    pub fn from_parquet_hive(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        // Phase 1: lazy scan with hive support; glob expansion only when '*' is present.
        let path_str = path.as_os_str().to_string_lossy();
        let is_glob = path_str.contains('*');
        let pl_path = PlPath::Local(Arc::from(path));
        let args = ScanArgsParquet {
            hive_options: HiveOptions::new_enabled(),
            glob: is_glob,
            ..Default::default()
        };
        let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
        // collect_schema forces path expansion + metadata reads — the expensive step.
        let schema = lf.collect_schema()?;

        // Phase 2: discover partition keys from the path (directory walk or glob parse).
        let mut discovered = if path.is_dir() {
            Self::discover_partition_columns_from_path(path)
        } else {
            Self::discover_partition_columns_from_glob_pattern(path)
        };

        // Fallback: glob like "**/*.parquet" has no key= in the pattern, so discovery is empty.
        // Try discovering from a directory prefix (e.g. path.parent() or walk up until we find a dir).
        if discovered.is_empty() {
            let mut dir = path;
            while !dir.is_dir() {
                match dir.parent() {
                    Some(p) => dir = p,
                    None => break,
                }
            }
            if dir.is_dir() {
                discovered = Self::discover_partition_columns_from_path(dir);
            }
        }

        // Keep only discovered keys that Polars actually surfaced as schema columns.
        let partition_columns: Vec<String> = discovered
            .into_iter()
            .filter(|c| schema.contains(c.as_str()))
            .collect();

        // Phase 3: compute the display order — partition columns first, then the rest
        // in schema order.
        let new_order: Vec<String> = if partition_columns.is_empty() {
            schema.iter_names().map(|s| s.to_string()).collect()
        } else {
            let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
            let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
            let rest: Vec<String> = all_names
                .into_iter()
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            partition_columns.iter().cloned().chain(rest).collect()
        };

        // Reorder the lazy plan itself only when there are partition columns to move.
        if !partition_columns.is_empty() {
            let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
            lf = lf.select(exprs);
        }

        // Phase 4: build the table state and record partition metadata.
        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        state.partition_columns = if partition_columns.is_empty() {
            None
        } else {
            Some(partition_columns)
        };
        // Ensure display order is partition-first (Self::new uses schema order; be explicit).
        state.set_column_order(new_order);
        Ok(state)
    }
1338
    /// Set whether the row-number column is displayed.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }
1342
    /// Flip the row-number column display on/off.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }
1346
    /// Row number display start (0 or 1); used by go-to-line to interpret user input.
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1351
1352    /// Decompress a compressed file to a temp file for lazy CSV scan.
1353    fn decompress_compressed_csv_to_temp(
1354        path: &Path,
1355        compression: CompressionFormat,
1356        temp_dir: &Path,
1357    ) -> Result<NamedTempFile> {
1358        let mut temp = NamedTempFile::new_in(temp_dir)?;
1359        let out = temp.as_file_mut();
1360        let mut reader: Box<dyn Read> = match compression {
1361            CompressionFormat::Gzip => {
1362                let f = File::open(path)?;
1363                Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1364            }
1365            CompressionFormat::Zstd => {
1366                let f = File::open(path)?;
1367                Box::new(zstd::Decoder::new(BufReader::new(f))?)
1368            }
1369            CompressionFormat::Bzip2 => {
1370                let f = File::open(path)?;
1371                Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1372            }
1373            CompressionFormat::Xz => {
1374                let f = File::open(path)?;
1375                Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1376            }
1377        };
1378        std::io::copy(&mut reader, out)?;
1379        out.sync_all()?;
1380        Ok(temp)
1381    }
1382
1383    /// Parse null value specs: "VAL" -> global, "COL=VAL" -> per-column (first '=' separates).
1384    fn parse_null_value_specs(specs: &[String]) -> (Vec<String>, Vec<(String, String)>) {
1385        let mut global = Vec::new();
1386        let mut per_column = Vec::new();
1387        for s in specs {
1388            if let Some(i) = s.find('=') {
1389                let (col, val) = (s[..i].to_string(), s[i + 1..].to_string());
1390                per_column.push((col, val));
1391            } else {
1392                global.push(s.clone());
1393            }
1394        }
1395        (global, per_column)
1396    }
1397
1398    /// Build Polars NullValues from parsed specs. When both global and per_column are set, schema is required (caller does schema scan).
1399    fn build_polars_null_values(
1400        global: &[String],
1401        per_column: &[(String, String)],
1402        schema: Option<&Schema>,
1403    ) -> Option<NullValues> {
1404        if global.is_empty() && per_column.is_empty() {
1405            return None;
1406        }
1407        if per_column.is_empty() {
1408            let vals: Vec<PlSmallStr> = global
1409                .iter()
1410                .map(|s| PlSmallStr::from(s.as_str()))
1411                .collect();
1412            return Some(if vals.len() == 1 {
1413                NullValues::AllColumnsSingle(vals[0].clone())
1414            } else {
1415                NullValues::AllColumns(vals)
1416            });
1417        }
1418        if global.is_empty() {
1419            let pairs: Vec<(PlSmallStr, PlSmallStr)> = per_column
1420                .iter()
1421                .map(|(c, v)| (PlSmallStr::from(c.as_str()), PlSmallStr::from(v.as_str())))
1422                .collect();
1423            return Some(NullValues::Named(pairs));
1424        }
1425        let schema = schema?;
1426        let mut pairs: Vec<(PlSmallStr, PlSmallStr)> = Vec::new();
1427        let first_global = PlSmallStr::from(global[0].as_str());
1428        for (name, _) in schema.iter() {
1429            let col_name = name.as_str();
1430            let val = per_column
1431                .iter()
1432                .rev()
1433                .find(|(c, _)| c == col_name)
1434                .map(|(_, v)| PlSmallStr::from(v.as_str()))
1435                .unwrap_or_else(|| first_global.clone());
1436            pairs.push((PlSmallStr::from(col_name), val));
1437        }
1438        Some(NullValues::Named(pairs))
1439    }
1440
1441    /// Infer CSV schema with minimal read (one row) for building null_values when both global and per-column are set.
1442    fn csv_schema_for_null_values(path: &Path, options: &OpenOptions) -> Result<Arc<Schema>> {
1443        let pl_path = PlPath::Local(Arc::from(path));
1444        let mut reader = LazyCsvReader::new(pl_path).with_n_rows(Some(1));
1445        if let Some(skip_lines) = options.skip_lines {
1446            reader = reader.with_skip_lines(skip_lines);
1447        }
1448        if let Some(skip_rows) = options.skip_rows {
1449            reader = reader.with_skip_rows(skip_rows);
1450        }
1451        if let Some(has_header) = options.has_header {
1452            reader = reader.with_has_header(has_header);
1453        }
1454        reader = reader.with_try_parse_dates(options.parse_dates);
1455        let mut lf = reader.finish()?;
1456        lf.collect_schema().map_err(color_eyre::eyre::Report::from)
1457    }
1458
1459    /// Build Polars NullValues from options; path_for_schema required when both global and per-column specs are set.
1460    fn build_null_values_for_csv(
1461        options: &OpenOptions,
1462        path_for_schema: Option<&Path>,
1463    ) -> Result<Option<NullValues>> {
1464        let specs = match &options.null_values {
1465            None => return Ok(None),
1466            Some(s) if s.is_empty() => return Ok(None),
1467            Some(s) => s.as_slice(),
1468        };
1469        let (global, per_column) = Self::parse_null_value_specs(specs);
1470        let nv = if !global.is_empty() && !per_column.is_empty() {
1471            let path = path_for_schema.ok_or_else(|| {
1472                color_eyre::eyre::eyre!(
1473                    "Internal error: path required for null_values with both global and per-column"
1474                )
1475            })?;
1476            let schema = Self::csv_schema_for_null_values(path, options)?;
1477            Self::build_polars_null_values(&global, &per_column, Some(schema.as_ref()))
1478        } else {
1479            Self::build_polars_null_values(&global, &per_column, None)
1480        };
1481        Ok(nv)
1482    }
1483
1484    pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1485        let nv = Self::build_null_values_for_csv(options, Some(path))?;
1486
1487        // Determine compression format: explicit option, or auto-detect from extension
1488        let compression = options
1489            .compression
1490            .or_else(|| CompressionFormat::from_extension(path));
1491
1492        if let Some(compression) = compression {
1493            if options.decompress_in_memory {
1494                // Eager read: decompress into memory, then CSV read
1495                match compression {
1496                    CompressionFormat::Gzip | CompressionFormat::Zstd => {
1497                        let mut read_options = CsvReadOptions::default();
1498                        if let Some(skip_lines) = options.skip_lines {
1499                            read_options.skip_lines = skip_lines;
1500                        }
1501                        if let Some(skip_rows) = options.skip_rows {
1502                            read_options.skip_rows = skip_rows;
1503                        }
1504                        if let Some(has_header) = options.has_header {
1505                            read_options.has_header = has_header;
1506                        }
1507                        read_options = read_options.map_parse_options(|opts| {
1508                            let o = opts.with_try_parse_dates(options.parse_dates);
1509                            match &nv {
1510                                Some(n) => o.with_null_values(Some(n.clone())),
1511                                None => o,
1512                            }
1513                        });
1514                        let df = read_options
1515                            .try_into_reader_with_file_path(Some(path.into()))?
1516                            .finish()?;
1517                        let lf = df.lazy();
1518                        let mut state = Self::new(
1519                            lf,
1520                            options.pages_lookahead,
1521                            options.pages_lookback,
1522                            options.max_buffered_rows,
1523                            options.max_buffered_mb,
1524                            options.polars_streaming,
1525                        )?;
1526                        state.row_numbers = options.row_numbers;
1527                        state.row_start_index = options.row_start_index;
1528                        Ok(state)
1529                    }
1530                    CompressionFormat::Bzip2 => {
1531                        let file = File::open(path)?;
1532                        let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1533                        let mut decompressed = Vec::new();
1534                        decoder.read_to_end(&mut decompressed)?;
1535                        let mut read_options = CsvReadOptions::default();
1536                        if let Some(skip_lines) = options.skip_lines {
1537                            read_options.skip_lines = skip_lines;
1538                        }
1539                        if let Some(skip_rows) = options.skip_rows {
1540                            read_options.skip_rows = skip_rows;
1541                        }
1542                        if let Some(has_header) = options.has_header {
1543                            read_options.has_header = has_header;
1544                        }
1545                        read_options = read_options.map_parse_options(|opts| {
1546                            let o = opts.with_try_parse_dates(options.parse_dates);
1547                            match &nv {
1548                                Some(n) => o.with_null_values(Some(n.clone())),
1549                                None => o,
1550                            }
1551                        });
1552                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1553                            .with_options(read_options)
1554                            .finish()?;
1555                        let lf = df.lazy();
1556                        let mut state = Self::new(
1557                            lf,
1558                            options.pages_lookahead,
1559                            options.pages_lookback,
1560                            options.max_buffered_rows,
1561                            options.max_buffered_mb,
1562                            options.polars_streaming,
1563                        )?;
1564                        state.row_numbers = options.row_numbers;
1565                        state.row_start_index = options.row_start_index;
1566                        Ok(state)
1567                    }
1568                    CompressionFormat::Xz => {
1569                        let file = File::open(path)?;
1570                        let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1571                        let mut decompressed = Vec::new();
1572                        decoder.read_to_end(&mut decompressed)?;
1573                        let mut read_options = CsvReadOptions::default();
1574                        if let Some(skip_lines) = options.skip_lines {
1575                            read_options.skip_lines = skip_lines;
1576                        }
1577                        if let Some(skip_rows) = options.skip_rows {
1578                            read_options.skip_rows = skip_rows;
1579                        }
1580                        if let Some(has_header) = options.has_header {
1581                            read_options.has_header = has_header;
1582                        }
1583                        read_options = read_options.map_parse_options(|opts| {
1584                            let o = opts.with_try_parse_dates(options.parse_dates);
1585                            match &nv {
1586                                Some(n) => o.with_null_values(Some(n.clone())),
1587                                None => o,
1588                            }
1589                        });
1590                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1591                            .with_options(read_options)
1592                            .finish()?;
1593                        let lf = df.lazy();
1594                        let mut state = Self::new(
1595                            lf,
1596                            options.pages_lookahead,
1597                            options.pages_lookback,
1598                            options.max_buffered_rows,
1599                            options.max_buffered_mb,
1600                            options.polars_streaming,
1601                        )?;
1602                        state.row_numbers = options.row_numbers;
1603                        state.row_start_index = options.row_start_index;
1604                        Ok(state)
1605                    }
1606                }
1607            } else {
1608                // Decompress to temp file, then lazy scan
1609                let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1610                let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1611                let nv_temp = Self::build_null_values_for_csv(options, Some(temp.path()))?;
1612                let mut state = Self::from_csv_customize(
1613                    temp.path(),
1614                    options.pages_lookahead,
1615                    options.pages_lookback,
1616                    options.max_buffered_rows,
1617                    options.max_buffered_mb,
1618                    |mut reader| {
1619                        if let Some(skip_lines) = options.skip_lines {
1620                            reader = reader.with_skip_lines(skip_lines);
1621                        }
1622                        if let Some(skip_rows) = options.skip_rows {
1623                            reader = reader.with_skip_rows(skip_rows);
1624                        }
1625                        if let Some(has_header) = options.has_header {
1626                            reader = reader.with_has_header(has_header);
1627                        }
1628                        reader = reader.with_try_parse_dates(options.parse_dates);
1629                        reader = match &nv_temp {
1630                            Some(n) => reader
1631                                .map_parse_options(|opts| opts.with_null_values(Some(n.clone()))),
1632                            None => reader,
1633                        };
1634                        reader
1635                    },
1636                )?;
1637                state.row_numbers = options.row_numbers;
1638                state.row_start_index = options.row_start_index;
1639                state.decompress_temp_file = Some(temp);
1640                Ok(state)
1641            }
1642        } else {
1643            // For uncompressed files, use lazy scanning (more efficient)
1644            let mut state = Self::from_csv_customize(
1645                path,
1646                options.pages_lookahead,
1647                options.pages_lookback,
1648                options.max_buffered_rows,
1649                options.max_buffered_mb,
1650                |mut reader| {
1651                    if let Some(skip_lines) = options.skip_lines {
1652                        reader = reader.with_skip_lines(skip_lines);
1653                    }
1654                    if let Some(skip_rows) = options.skip_rows {
1655                        reader = reader.with_skip_rows(skip_rows);
1656                    }
1657                    if let Some(has_header) = options.has_header {
1658                        reader = reader.with_has_header(has_header);
1659                    }
1660                    reader = reader.with_try_parse_dates(options.parse_dates);
1661                    reader = match &nv {
1662                        Some(n) => {
1663                            reader.map_parse_options(|opts| opts.with_null_values(Some(n.clone())))
1664                        }
1665                        None => reader,
1666                    };
1667                    reader
1668                },
1669            )?;
1670            state.row_numbers = options.row_numbers;
1671            Ok(state)
1672        }
1673    }
1674
1675    pub fn from_csv_customize<F>(
1676        path: &Path,
1677        pages_lookahead: Option<usize>,
1678        pages_lookback: Option<usize>,
1679        max_buffered_rows: Option<usize>,
1680        max_buffered_mb: Option<usize>,
1681        func: F,
1682    ) -> Result<Self>
1683    where
1684        F: FnOnce(LazyCsvReader) -> LazyCsvReader,
1685    {
1686        let pl_path = PlPath::Local(Arc::from(path));
1687        let reader = LazyCsvReader::new(pl_path);
1688        let lf = func(reader).finish()?;
1689        Self::new(
1690            lf,
1691            pages_lookahead,
1692            pages_lookback,
1693            max_buffered_rows,
1694            max_buffered_mb,
1695            true,
1696        )
1697    }
1698
1699    /// Load multiple CSV files (uncompressed) and concatenate into one LazyFrame.
1700    pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
1701        if paths.is_empty() {
1702            return Err(color_eyre::eyre::eyre!("No paths provided"));
1703        }
1704        if paths.len() == 1 {
1705            return Self::from_csv(paths[0].as_ref(), options);
1706        }
1707        let nv = Self::build_null_values_for_csv(options, Some(paths[0].as_ref()))?;
1708        let mut lazy_frames = Vec::with_capacity(paths.len());
1709        for p in paths {
1710            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1711            let mut reader = LazyCsvReader::new(pl_path);
1712            if let Some(skip_lines) = options.skip_lines {
1713                reader = reader.with_skip_lines(skip_lines);
1714            }
1715            if let Some(skip_rows) = options.skip_rows {
1716                reader = reader.with_skip_rows(skip_rows);
1717            }
1718            if let Some(has_header) = options.has_header {
1719                reader = reader.with_has_header(has_header);
1720            }
1721            reader = reader.with_try_parse_dates(options.parse_dates);
1722            reader = match &nv {
1723                Some(n) => reader.map_parse_options(|opts| opts.with_null_values(Some(n.clone()))),
1724                None => reader,
1725            };
1726            let lf = reader.finish()?;
1727            lazy_frames.push(lf);
1728        }
1729        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1730        let mut state = Self::new(
1731            lf,
1732            options.pages_lookahead,
1733            options.pages_lookback,
1734            options.max_buffered_rows,
1735            options.max_buffered_mb,
1736            options.polars_streaming,
1737        )?;
1738        state.row_numbers = options.row_numbers;
1739        state.row_start_index = options.row_start_index;
1740        Ok(state)
1741    }
1742
1743    pub fn from_ndjson(
1744        path: &Path,
1745        pages_lookahead: Option<usize>,
1746        pages_lookback: Option<usize>,
1747        max_buffered_rows: Option<usize>,
1748        max_buffered_mb: Option<usize>,
1749        row_numbers: bool,
1750        row_start_index: usize,
1751    ) -> Result<Self> {
1752        let pl_path = PlPath::Local(Arc::from(path));
1753        let lf = LazyJsonLineReader::new(pl_path).finish()?;
1754        let mut state = Self::new(
1755            lf,
1756            pages_lookahead,
1757            pages_lookback,
1758            max_buffered_rows,
1759            max_buffered_mb,
1760            true,
1761        )?;
1762        state.row_numbers = row_numbers;
1763        state.row_start_index = row_start_index;
1764        Ok(state)
1765    }
1766
1767    /// Load multiple NDJSON files and concatenate into one LazyFrame.
1768    pub fn from_ndjson_paths(
1769        paths: &[impl AsRef<Path>],
1770        pages_lookahead: Option<usize>,
1771        pages_lookback: Option<usize>,
1772        max_buffered_rows: Option<usize>,
1773        max_buffered_mb: Option<usize>,
1774        row_numbers: bool,
1775        row_start_index: usize,
1776    ) -> Result<Self> {
1777        if paths.is_empty() {
1778            return Err(color_eyre::eyre::eyre!("No paths provided"));
1779        }
1780        if paths.len() == 1 {
1781            return Self::from_ndjson(
1782                paths[0].as_ref(),
1783                pages_lookahead,
1784                pages_lookback,
1785                max_buffered_rows,
1786                max_buffered_mb,
1787                row_numbers,
1788                row_start_index,
1789            );
1790        }
1791        let mut lazy_frames = Vec::with_capacity(paths.len());
1792        for p in paths {
1793            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1794            let lf = LazyJsonLineReader::new(pl_path).finish()?;
1795            lazy_frames.push(lf);
1796        }
1797        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1798        let mut state = Self::new(
1799            lf,
1800            pages_lookahead,
1801            pages_lookback,
1802            max_buffered_rows,
1803            max_buffered_mb,
1804            true,
1805        )?;
1806        state.row_numbers = row_numbers;
1807        state.row_start_index = row_start_index;
1808        Ok(state)
1809    }
1810
    /// Open a whole-document JSON file (a top-level array of records).
    ///
    /// Thin wrapper over [`Self::from_json_with_format`] with `JsonFormat::Json`.
    pub fn from_json(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1831
    /// Open a JSON Lines file (one JSON object per line).
    ///
    /// Thin wrapper over [`Self::from_json_with_format`] with `JsonFormat::JsonLines`.
    pub fn from_json_lines(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1852
1853    #[allow(clippy::too_many_arguments)]
1854    fn from_json_with_format(
1855        path: &Path,
1856        pages_lookahead: Option<usize>,
1857        pages_lookback: Option<usize>,
1858        max_buffered_rows: Option<usize>,
1859        max_buffered_mb: Option<usize>,
1860        row_numbers: bool,
1861        row_start_index: usize,
1862        format: JsonFormat,
1863    ) -> Result<Self> {
1864        let file = File::open(path)?;
1865        let lf = JsonReader::new(file)
1866            .with_json_format(format)
1867            .finish()?
1868            .lazy();
1869        let mut state = Self::new(
1870            lf,
1871            pages_lookahead,
1872            pages_lookback,
1873            max_buffered_rows,
1874            max_buffered_mb,
1875            true,
1876        )?;
1877        state.row_numbers = row_numbers;
1878        state.row_start_index = row_start_index;
1879        Ok(state)
1880    }
1881
    /// Load multiple JSON (array) files and concatenate into one LazyFrame.
    ///
    /// Thin wrapper over [`Self::from_json_with_format_paths`] with `JsonFormat::Json`.
    pub fn from_json_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1903
    /// Load multiple JSON Lines files and concatenate into one LazyFrame.
    ///
    /// Thin wrapper over [`Self::from_json_with_format_paths`] with `JsonFormat::JsonLines`.
    pub fn from_json_lines_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1925
1926    #[allow(clippy::too_many_arguments)]
1927    fn from_json_with_format_paths(
1928        paths: &[impl AsRef<Path>],
1929        pages_lookahead: Option<usize>,
1930        pages_lookback: Option<usize>,
1931        max_buffered_rows: Option<usize>,
1932        max_buffered_mb: Option<usize>,
1933        row_numbers: bool,
1934        row_start_index: usize,
1935        format: JsonFormat,
1936    ) -> Result<Self> {
1937        if paths.is_empty() {
1938            return Err(color_eyre::eyre::eyre!("No paths provided"));
1939        }
1940        if paths.len() == 1 {
1941            return Self::from_json_with_format(
1942                paths[0].as_ref(),
1943                pages_lookahead,
1944                pages_lookback,
1945                max_buffered_rows,
1946                max_buffered_mb,
1947                row_numbers,
1948                row_start_index,
1949                format,
1950            );
1951        }
1952        let mut lazy_frames = Vec::with_capacity(paths.len());
1953        for p in paths {
1954            let file = File::open(p.as_ref())?;
1955            let lf = match &format {
1956                JsonFormat::Json => JsonReader::new(file)
1957                    .with_json_format(JsonFormat::Json)
1958                    .finish()?
1959                    .lazy(),
1960                JsonFormat::JsonLines => JsonReader::new(file)
1961                    .with_json_format(JsonFormat::JsonLines)
1962                    .finish()?
1963                    .lazy(),
1964            };
1965            lazy_frames.push(lf);
1966        }
1967        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1968        let mut state = Self::new(
1969            lf,
1970            pages_lookahead,
1971            pages_lookback,
1972            max_buffered_rows,
1973            max_buffered_mb,
1974            true,
1975        )?;
1976        state.row_numbers = row_numbers;
1977        state.row_start_index = row_start_index;
1978        Ok(state)
1979    }
1980
1981    #[allow(clippy::too_many_arguments)]
1982    pub fn from_delimited(
1983        path: &Path,
1984        delimiter: u8,
1985        pages_lookahead: Option<usize>,
1986        pages_lookback: Option<usize>,
1987        max_buffered_rows: Option<usize>,
1988        max_buffered_mb: Option<usize>,
1989        row_numbers: bool,
1990        row_start_index: usize,
1991    ) -> Result<Self> {
1992        let pl_path = PlPath::Local(Arc::from(path));
1993        let reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
1994        let lf = reader.finish()?;
1995        let mut state = Self::new(
1996            lf,
1997            pages_lookahead,
1998            pages_lookback,
1999            max_buffered_rows,
2000            max_buffered_mb,
2001            true,
2002        )?;
2003        state.row_numbers = row_numbers;
2004        state.row_start_index = row_start_index;
2005        Ok(state)
2006    }
2007
2008    /// Returns true if a scroll by `rows` would trigger a collect (view would leave the buffer).
2009    /// Used so the UI only shows the throbber when actual data loading will occur.
2010    pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
2011        if rows < 0 && self.start_row == 0 {
2012            return false;
2013        }
2014        let new_start_row = if self.start_row as i64 + rows <= 0 {
2015            0
2016        } else {
2017            if let Some(df) = self.df.as_ref() {
2018                if rows > 0 && df.shape().0 <= self.visible_rows {
2019                    return false;
2020                }
2021            }
2022            (self.start_row as i64 + rows) as usize
2023        };
2024        let view_end = new_start_row
2025            + self
2026                .visible_rows
2027                .min(self.num_rows.saturating_sub(new_start_row));
2028        let within_buffer = new_start_row >= self.buffered_start_row
2029            && view_end <= self.buffered_end_row
2030            && self.buffered_end_row > 0;
2031        !within_buffer
2032    }
2033
2034    fn slide_table(&mut self, rows: i64) {
2035        if rows < 0 && self.start_row == 0 {
2036            return;
2037        }
2038
2039        let new_start_row = if self.start_row as i64 + rows <= 0 {
2040            0
2041        } else {
2042            if let Some(df) = self.df.as_ref() {
2043                if rows > 0 && df.shape().0 <= self.visible_rows {
2044                    return;
2045                }
2046            }
2047            (self.start_row as i64 + rows) as usize
2048        };
2049
2050        // Call collect() only when view is outside buffer; otherwise just update start_row.
2051        let view_end = new_start_row
2052            + self
2053                .visible_rows
2054                .min(self.num_rows.saturating_sub(new_start_row));
2055        let within_buffer = new_start_row >= self.buffered_start_row
2056            && view_end <= self.buffered_end_row
2057            && self.buffered_end_row > 0;
2058
2059        if within_buffer {
2060            self.start_row = new_start_row;
2061            return;
2062        }
2063
2064        self.start_row = new_start_row;
2065        self.collect();
2066    }
2067
2068    pub fn collect(&mut self) {
2069        // Update proximity threshold based on visible rows
2070        if self.visible_rows > 0 {
2071            self.proximity_threshold = self.visible_rows;
2072        }
2073
2074        // Run len() only when lf has changed (query, filter, sort, pivot, melt, reset, drill).
2075        if !self.num_rows_valid {
2076            self.num_rows =
2077                match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
2078                    Ok(df) => {
2079                        if let Some(col) = df.get(0) {
2080                            if let Some(AnyValue::UInt32(len)) = col.first() {
2081                                *len as usize
2082                            } else {
2083                                0
2084                            }
2085                        } else {
2086                            0
2087                        }
2088                    }
2089                    Err(_) => 0,
2090                };
2091            self.num_rows_valid = true;
2092        }
2093
2094        if self.num_rows > 0 {
2095            let max_start = self.num_rows.saturating_sub(1);
2096            if self.start_row > max_start {
2097                self.start_row = max_start;
2098            }
2099        } else {
2100            self.start_row = 0;
2101            self.buffered_start_row = 0;
2102            self.buffered_end_row = 0;
2103            self.buffered_df = None;
2104            self.df = None;
2105            self.locked_df = None;
2106            return;
2107        }
2108
2109        // Proximity-based buffer logic
2110        let view_start = self.start_row;
2111        let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);
2112
2113        // Check if current view is within buffered range
2114        let within_buffer = view_start >= self.buffered_start_row
2115            && view_end <= self.buffered_end_row
2116            && self.buffered_end_row > 0;
2117
2118        // Buffer grows incrementally: initial load and each expansion add only a few pages (lookahead + lookback).
2119        // clamp_buffer_to_max_size caps at max_buffered_rows and slides the window when at cap.
2120        let page_rows = self.visible_rows.max(1);
2121
2122        if within_buffer {
2123            let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
2124            let dist_to_end = self.buffered_end_row.saturating_sub(view_end);
2125
2126            let needs_expansion_back =
2127                dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
2128            let needs_expansion_forward =
2129                dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;
2130
2131            if !needs_expansion_back && !needs_expansion_forward {
2132                // Column scroll only: reuse cached full buffer and re-slice into locked/scroll columns.
2133                let expected_len = self
2134                    .buffered_end_row
2135                    .saturating_sub(self.buffered_start_row);
2136                if self
2137                    .buffered_df
2138                    .as_ref()
2139                    .is_some_and(|b| b.height() == expected_len)
2140                {
2141                    self.slice_buffer_into_display();
2142                    if self.table_state.selected().is_none() {
2143                        self.table_state.select(Some(0));
2144                    }
2145                    return;
2146                }
2147                self.load_buffer(self.buffered_start_row, self.buffered_end_row);
2148                if self.table_state.selected().is_none() {
2149                    self.table_state.select(Some(0));
2150                }
2151                return;
2152            }
2153
2154            let mut new_buffer_start = if needs_expansion_back {
2155                view_start.saturating_sub(self.pages_lookback * page_rows)
2156            } else {
2157                self.buffered_start_row
2158            };
2159
2160            let mut new_buffer_end = if needs_expansion_forward {
2161                (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
2162            } else {
2163                self.buffered_end_row
2164            };
2165
2166            self.clamp_buffer_to_max_size(
2167                view_start,
2168                view_end,
2169                &mut new_buffer_start,
2170                &mut new_buffer_end,
2171            );
2172            self.load_buffer(new_buffer_start, new_buffer_end);
2173        } else {
2174            // Outside buffer: either extend the previous buffer (so it grows) or load a fresh small window.
2175            // Only extend when the view is "close" to the existing buffer (e.g. user paged down a bit).
2176            // A big jump (e.g. jump to end) should load just a window around the new view, not extend
2177            // the buffer across the whole dataset.
2178            let mut new_buffer_start;
2179            let mut new_buffer_end;
2180
2181            let had_buffer = self.buffered_end_row > 0;
2182            let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
2183            let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;
2184
2185            let extend_forward_ok = scrolled_past_end
2186                && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
2187            let extend_backward_ok = scrolled_past_start
2188                && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;
2189
2190            if extend_forward_ok {
2191                // View is just a few pages past buffer end; extend forward.
2192                new_buffer_start = self.buffered_start_row;
2193                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2194            } else if extend_backward_ok {
2195                // View is just a few pages before buffer start; extend backward.
2196                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2197                new_buffer_end = self.buffered_end_row;
2198            } else if scrolled_past_end || scrolled_past_start {
2199                // Big jump (e.g. jump to end or jump to start): load a fresh window around the view.
2200                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2201                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2202                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2203                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2204                if current_len < min_initial_len {
2205                    let need = min_initial_len.saturating_sub(current_len);
2206                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2207                    let can_extend_start = new_buffer_start;
2208                    if can_extend_end >= need {
2209                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2210                    } else if can_extend_start >= need {
2211                        new_buffer_start = new_buffer_start.saturating_sub(need);
2212                    } else {
2213                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2214                        new_buffer_start =
2215                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2216                    }
2217                }
2218            } else {
2219                // No buffer yet or big jump: load a fresh small window (view ± a few pages).
2220                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2221                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2222
2223                // Ensure at least (1 + lookahead + lookback) pages so buffer size is consistent (e.g. 364 at 52 visible).
2224                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2225                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2226                if current_len < min_initial_len {
2227                    let need = min_initial_len.saturating_sub(current_len);
2228                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2229                    let can_extend_start = new_buffer_start;
2230                    if can_extend_end >= need {
2231                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2232                    } else if can_extend_start >= need {
2233                        new_buffer_start = new_buffer_start.saturating_sub(need);
2234                    } else {
2235                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2236                        new_buffer_start =
2237                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2238                    }
2239                }
2240            }
2241
2242            self.clamp_buffer_to_max_size(
2243                view_start,
2244                view_end,
2245                &mut new_buffer_start,
2246                &mut new_buffer_end,
2247            );
2248            self.load_buffer(new_buffer_start, new_buffer_end);
2249        }
2250
2251        self.slice_from_buffer();
2252        if self.table_state.selected().is_none() {
2253            self.table_state.select(Some(0));
2254        }
2255    }
2256
    /// Mark the cached `num_rows` as stale. Call whenever `lf` is mutated so the
    /// next `ensure_num_rows` re-runs the row-count query.
    fn invalidate_num_rows(&mut self) {
        self.num_rows_valid = false;
    }
2261
2262    /// Returns the cached row count when valid (same value shown in the control bar). Use this to
2263    /// avoid an extra full scan for analysis/describe when the table has already been collected.
2264    pub fn num_rows_if_valid(&self) -> Option<usize> {
2265        if self.num_rows_valid {
2266            Some(self.num_rows)
2267        } else {
2268            None
2269        }
2270    }
2271
    /// Clamp the requested buffer window to `max_buffered_rows`; when at the cap,
    /// slide the window so the visible view stays inside it.
    ///
    /// `buffer_start`/`buffer_end` are adjusted in place. A `max_buffered_rows` of 0
    /// means "no limit" and leaves the window untouched.
    fn clamp_buffer_to_max_size(
        &self,
        view_start: usize,
        view_end: usize,
        buffer_start: &mut usize,
        buffer_end: &mut usize,
    ) {
        // 0 = unlimited buffer; nothing to clamp.
        if self.max_buffered_rows == 0 {
            return;
        }
        let max_len = self.max_buffered_rows;
        let requested_len = buffer_end.saturating_sub(*buffer_start);
        // Already within the cap: keep the requested window as-is.
        if requested_len <= max_len {
            return;
        }
        let view_len = view_end.saturating_sub(view_start);
        if view_len >= max_len {
            // View alone fills (or exceeds) the cap: anchor the buffer at the view start.
            *buffer_start = view_start;
            *buffer_end = (view_start + max_len).min(self.num_rows);
        } else {
            // Center the capped window around the view: half the slack after the view,
            // the rest before it; then re-derive start/end so the window never drops
            // the view start and never exceeds num_rows.
            let half = (max_len - view_len) / 2;
            *buffer_end = (view_end + half).min(self.num_rows);
            *buffer_start = (*buffer_end).saturating_sub(max_len);
            if *buffer_start > view_start {
                *buffer_start = view_start;
            }
            *buffer_end = (*buffer_start + max_len).min(self.num_rows);
        }
    }
2302
    /// Collect rows `[buffer_start, buffer_end)` of `lf` and split them into the
    /// display buffers: `buffered_df` (all columns), `locked_df` (first
    /// `locked_columns_count` columns, if any) and `df` (scrollable columns after
    /// the locked ones plus the horizontal `termcol_index` offset).
    ///
    /// On any Polars failure the error is stored in `self.error` and the method
    /// returns early, leaving previously loaded buffers untouched. On success any
    /// stale error is cleared and `buffered_start_row`/`buffered_end_row` are updated.
    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
        let buffer_size = buffer_end.saturating_sub(buffer_start);
        if buffer_size == 0 {
            return;
        }

        // Project columns in display order so the buffer layout matches the UI.
        let all_columns: Vec<_> = self
            .column_order
            .iter()
            .map(|name| col(name.as_str()))
            .collect();

        let use_streaming = self.polars_streaming;
        let mut full_df = match collect_lazy(
            self.lf
                .clone()
                .select(all_columns)
                .slice(buffer_start as i64, buffer_size as u32),
            use_streaming,
        ) {
            Ok(df) => df,
            Err(e) => {
                self.error = Some(e);
                return;
            }
        };

        // Optional memory cap: estimate bytes/row and trim the slice so the buffer
        // stays under max_buffered_mb. effective_buffer_end tracks the trimmed extent.
        let mut effective_buffer_end = buffer_end;
        if self.max_buffered_mb > 0 {
            let size = full_df.estimated_size();
            let max_bytes = self.max_buffered_mb * 1024 * 1024;
            if size > max_bytes {
                let rows = full_df.height();
                if rows > 0 {
                    let bytes_per_row = size / rows;
                    // .max(1) guards the division against a 0 bytes/row estimate.
                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
                    if max_rows < rows {
                        full_df = full_df.slice(0, max_rows);
                        effective_buffer_end = buffer_start + max_rows;
                    }
                }
            }
        }

        // Split off the locked (pinned-left) columns, if any.
        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            let locked_df = match full_df.select(locked_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            // Grouped frames carry List columns which must be rendered as strings.
            self.locked_df = if self.is_grouped() {
                match self.format_grouped_dataframe(locked_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(locked_df)
            };
        } else {
            self.locked_df = None;
        }

        // Scrollable columns start after the locked block plus the horizontal offset.
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else {
            let scroll_df = match full_df.select(scroll_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.df = if self.is_grouped() {
                match self.format_grouped_dataframe(scroll_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(scroll_df)
            };
        }
        // Reaching this point means the load succeeded; clear any stale error.
        if self.error.is_some() {
            self.error = None;
        }
        self.buffered_start_row = buffer_start;
        self.buffered_end_row = effective_buffer_end;
        self.buffered_df = Some(full_df);
    }
2415
    /// Recompute `locked_df` and `df` from the cached full buffer without re-collecting.
    /// Used when only `termcol_index` (or the locked-column count) changed.
    ///
    /// Unlike `load_buffer`, select/format failures here are silently dropped
    /// (`.ok()` / `if let Ok`), leaving the previous display frames in place.
    fn slice_buffer_into_display(&mut self) {
        // No buffer loaded yet: nothing to slice.
        let full_df = match self.buffered_df.as_ref() {
            Some(df) => df,
            None => return,
        };

        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            if let Ok(locked_df) = full_df.select(locked_names) {
                // Grouped frames need List columns rendered as strings.
                self.locked_df = if self.is_grouped() {
                    self.format_grouped_dataframe(locked_df).ok()
                } else {
                    Some(locked_df)
                };
            }
        } else {
            self.locked_df = None;
        }

        // Scrollable columns: everything after the locked block + horizontal offset.
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else if let Ok(scroll_df) = full_df.select(scroll_names) {
            self.df = if self.is_grouped() {
                self.format_grouped_dataframe(scroll_df).ok()
            } else {
                Some(scroll_df)
            };
        }
    }
2457
    /// Intentional no-op kept as a named step in the collect pipeline.
    ///
    /// The buffer holds the full range [buffered_start_row, buffered_end_row);
    /// the displayed window [start_row, start_row + visible_rows) is a subset of it.
    /// Slicing of the displayed portion happens at render time based on the offset,
    /// so nothing needs to be done here.
    fn slice_from_buffer(&mut self) {
        // Buffer contains the full range [buffered_start_row, buffered_end_row)
        // The displayed portion [start_row, start_row + visible_rows) is a subset
        // We'll slice the displayed portion when rendering based on offset
        // No action needed here - the buffer is stored, slicing happens at render time
    }
2464
    /// Render List-typed columns of a grouped DataFrame as display strings.
    ///
    /// Each list cell becomes "[a, b, …]"; lists longer than 10 elements are
    /// truncated to the first 10 and suffixed with "…] (N items)". Non-list
    /// columns are passed through unchanged. Null list cells stay null
    /// (the `Option` is propagated through the collected Series).
    fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
        let schema = df.schema();
        let mut new_series = Vec::new();

        for (col_name, dtype) in schema.iter() {
            let col = df.column(col_name)?;
            if matches!(dtype, DataType::List(_)) {
                let string_series: Series = col
                    .list()?
                    .into_iter()
                    .map(|opt_list| {
                        opt_list.map(|list_series| {
                            // Show at most the first 10 values of each list.
                            let values: Vec<String> = list_series
                                .iter()
                                .take(10)
                                .map(|v| v.str_value().to_string())
                                .collect();
                            if list_series.len() > 10 {
                                format!("[{}...] ({} items)", values.join(", "), list_series.len())
                            } else {
                                format!("[{}]", values.join(", "))
                            }
                        })
                    })
                    .collect();
                // Collected series loses its name; restore the original column name.
                new_series.push(string_series.with_name(col_name.as_str().into()).into());
            } else {
                new_series.push(col.clone());
            }
        }

        Ok(DataFrame::new(new_series)?)
    }
2498
2499    pub fn select_next(&mut self) {
2500        self.table_state.select_next();
2501        if let Some(selected) = self.table_state.selected() {
2502            if selected >= self.visible_rows && self.visible_rows > 0 {
2503                self.slide_table(1);
2504            }
2505        }
2506    }
2507
2508    pub fn page_down(&mut self) {
2509        self.slide_table(self.visible_rows as i64);
2510    }
2511
2512    pub fn select_previous(&mut self) {
2513        if let Some(selected) = self.table_state.selected() {
2514            self.table_state.select_previous();
2515            if selected == 0 && self.start_row > 0 {
2516                self.slide_table(-1);
2517            }
2518        } else {
2519            self.table_state.select(Some(0));
2520        }
2521    }
2522
2523    pub fn scroll_to(&mut self, index: usize) {
2524        if self.start_row == index {
2525            return;
2526        }
2527
2528        if index == 0 {
2529            self.start_row = 0;
2530            self.collect();
2531            self.start_row = 0;
2532        } else {
2533            self.start_row = index;
2534            self.collect();
2535        }
2536    }
2537
2538    /// Scroll so that the given row index is centered in the view when possible (respects table bounds).
2539    /// Selects that row. Used by go-to-line.
2540    pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2541        self.ensure_num_rows();
2542        if self.num_rows == 0 || self.visible_rows == 0 {
2543            return;
2544        }
2545        let center_offset = self.visible_rows / 2;
2546        let mut start_row = row_index.saturating_sub(center_offset);
2547        let max_start = self.num_rows.saturating_sub(self.visible_rows);
2548        start_row = start_row.min(max_start);
2549
2550        if self.start_row == start_row {
2551            let display_idx = row_index
2552                .saturating_sub(start_row)
2553                .min(self.visible_rows.saturating_sub(1));
2554            self.table_state.select(Some(display_idx));
2555            return;
2556        }
2557
2558        self.start_row = start_row;
2559        self.collect();
2560        let display_idx = row_index
2561            .saturating_sub(start_row)
2562            .min(self.visible_rows.saturating_sub(1));
2563        self.table_state.select(Some(display_idx));
2564    }
2565
2566    /// Ensure num_rows is up to date (runs len() query if needed). Used before scroll_to_end.
2567    fn ensure_num_rows(&mut self) {
2568        if self.num_rows_valid {
2569            return;
2570        }
2571        if self.visible_rows > 0 {
2572            self.proximity_threshold = self.visible_rows;
2573        }
2574        self.num_rows = match self.lf.clone().select([len()]).collect() {
2575            Ok(df) => {
2576                if let Some(col) = df.get(0) {
2577                    if let Some(AnyValue::UInt32(len)) = col.first() {
2578                        *len as usize
2579                    } else {
2580                        0
2581                    }
2582                } else {
2583                    0
2584                }
2585            }
2586            Err(_) => 0,
2587        };
2588        self.num_rows_valid = true;
2589    }
2590
2591    /// Jump to the last page; buffer is trimmed/loaded as needed. Selects the last row.
2592    pub fn scroll_to_end(&mut self) {
2593        self.ensure_num_rows();
2594        if self.num_rows == 0 {
2595            self.start_row = 0;
2596            self.buffered_start_row = 0;
2597            self.buffered_end_row = 0;
2598            return;
2599        }
2600        let end_start = self.num_rows.saturating_sub(self.visible_rows);
2601        if self.start_row == end_start {
2602            self.select_last_visible_row();
2603            return;
2604        }
2605        self.start_row = end_start;
2606        self.collect();
2607        self.select_last_visible_row();
2608    }
2609
2610    /// Set table selection to the last row in the current view (for use after scroll_to_end).
2611    fn select_last_visible_row(&mut self) {
2612        if self.num_rows == 0 {
2613            return;
2614        }
2615        let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
2616        let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
2617        self.table_state.select(Some(sel));
2618    }
2619
2620    pub fn half_page_down(&mut self) {
2621        let half = (self.visible_rows / 2).max(1) as i64;
2622        self.slide_table(half);
2623    }
2624
2625    pub fn half_page_up(&mut self) {
2626        if self.start_row == 0 {
2627            return;
2628        }
2629        let half = (self.visible_rows / 2).max(1) as i64;
2630        self.slide_table(-half);
2631    }
2632
2633    pub fn page_up(&mut self) {
2634        if self.start_row == 0 {
2635            return;
2636        }
2637        self.slide_table(-(self.visible_rows as i64));
2638    }
2639
2640    pub fn scroll_right(&mut self) {
2641        let max_scroll = self
2642            .column_order
2643            .len()
2644            .saturating_sub(self.locked_columns_count);
2645        if self.termcol_index < max_scroll.saturating_sub(1) {
2646            self.termcol_index += 1;
2647            self.collect();
2648        }
2649    }
2650
2651    pub fn scroll_left(&mut self) {
2652        if self.termcol_index > 0 {
2653            self.termcol_index -= 1;
2654            self.collect();
2655        }
2656    }
2657
2658    pub fn headers(&self) -> Vec<String> {
2659        self.column_order.clone()
2660    }
2661
2662    pub fn set_column_order(&mut self, order: Vec<String>) {
2663        self.column_order = order;
2664        self.buffered_start_row = 0;
2665        self.buffered_end_row = 0;
2666        self.buffered_df = None;
2667        self.collect();
2668    }
2669
2670    pub fn set_locked_columns(&mut self, count: usize) {
2671        self.locked_columns_count = count.min(self.column_order.len());
2672        self.buffered_start_row = 0;
2673        self.buffered_end_row = 0;
2674        self.buffered_df = None;
2675        self.collect();
2676    }
2677
    /// Number of columns currently pinned to the left of the view.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }
2681
    // Getter methods for template creation
    /// Filter statements currently applied to the table.
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }
2686
    /// Columns the table is currently sorted by.
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }
2690
    /// Whether the current sort is ascending.
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }
2694
    /// Column names in display order.
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }
2698
    /// The currently active query string (empty when none).
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }
2702
    /// The currently active SQL query string (empty when none).
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }
2706
    /// The currently active fuzzy-search query string (empty when none).
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }
2710
    /// Spec of the most recent pivot, if the table is currently pivoted.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }
2714
    /// Spec of the most recent melt, if the table is currently melted.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }
2718
2719    pub fn is_grouped(&self) -> bool {
2720        self.schema
2721            .iter()
2722            .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
2723    }
2724
2725    pub fn group_key_columns(&self) -> Vec<String> {
2726        self.schema
2727            .iter()
2728            .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
2729            .map(|(name, _)| name.to_string())
2730            .collect()
2731    }
2732
2733    pub fn group_value_columns(&self) -> Vec<String> {
2734        self.schema
2735            .iter()
2736            .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
2737            .map(|(name, _)| name.to_string())
2738            .collect()
2739    }
2740
2741    /// Estimated heap size in bytes of the currently buffered slice (locked + scrollable), if collected.
2742    pub fn buffered_memory_bytes(&self) -> Option<usize> {
2743        let locked = self
2744            .locked_df
2745            .as_ref()
2746            .map(|df| df.estimated_size())
2747            .unwrap_or(0);
2748        let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
2749        if locked == 0 && scroll == 0 {
2750            None
2751        } else {
2752            Some(locked + scroll)
2753        }
2754    }
2755
    /// Number of rows currently in the buffer; 0 if no buffer is loaded.
    pub fn buffered_rows(&self) -> usize {
        self.buffered_end_row
            .saturating_sub(self.buffered_start_row)
    }
2761
    /// Current scrollable display buffer. `None` until the first `collect()`.
    pub fn display_df(&self) -> Option<&DataFrame> {
        self.df.as_ref()
    }
2766
2767    /// Visible-window slice of the display buffer (same as passed to render_dataframe).
2768    pub fn display_slice_df(&self) -> Option<DataFrame> {
2769        let df = self.df.as_ref()?;
2770        let offset = self.start_row.saturating_sub(self.buffered_start_row);
2771        let slice_len = self.visible_rows.min(df.height().saturating_sub(offset));
2772        if offset < df.height() && slice_len > 0 {
2773            Some(df.slice(offset as i64, slice_len))
2774        } else {
2775            None
2776        }
2777    }
2778
    /// Maximum buffer size in rows (0 = no limit).
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }
2783
    /// Maximum buffer size in MiB (0 = no limit).
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
2788
    /// Expand one row of a grouped frame into a flat per-row frame and make it the
    /// active `lf`. The previous grouped frame is stashed in `grouped_lf` so
    /// `drill_up` can restore it.
    ///
    /// Key columns are broadcast as constant series across the group's rows; each
    /// List value column is unnested into a plain column. No-op when the table is
    /// not grouped; errors when `group_index` is out of bounds or the frame has no
    /// List columns.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        if !self.is_grouped() {
            return Ok(());
        }

        // Remember the grouped frame so drill_up() can restore it.
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // First pass over key columns: capture stringified key values for display
        // (e.g. breadcrumbs); the values are fetched again below as typed series.
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // Row count of the drilled-down frame = length of the first list cell.
        // A non-List cell yields 0 rows.
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Second pass over key columns: broadcast each key value into a constant
        // series of row_count elements, preserving the native type where matched
        // and falling back to its string form otherwise.
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    // Unhandled dtypes (dates, decimals, …) are broadcast as strings.
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // Unnest each List value column into a plain column. Non-List cells are
        // silently skipped — NOTE(review): if list lengths differ across value
        // columns, DataFrame::new below fails; confirm upstream guarantees.
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Swap in the drilled-down frame and reset view state.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
2896
2897    pub fn drill_up(&mut self) -> Result<()> {
2898        if let Some(grouped_lf) = self.grouped_lf.take() {
2899            self.invalidate_num_rows();
2900            self.lf = grouped_lf;
2901            self.schema = self.lf.clone().collect_schema()?;
2902            self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
2903            self.drilled_down_group_index = None;
2904            self.drilled_down_group_key = None;
2905            self.drilled_down_group_key_columns = None;
2906            self.start_row = 0;
2907            self.termcol_index = 0;
2908            self.locked_columns_count = 0;
2909            self.table_state.select(Some(0));
2910            self.collect();
2911            Ok(())
2912        } else {
2913            Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
2914        }
2915    }
2916
2917    pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
2918        Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
2919    }
2920
2921    pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
2922        crate::statistics::AnalysisContext {
2923            has_query: !self.active_query.is_empty(),
2924            query: self.active_query.clone(),
2925            has_filters: !self.filters.is_empty(),
2926            filter_count: self.filters.len(),
2927            is_drilled_down: self.is_drilled_down(),
2928            group_key: self.drilled_down_group_key.clone(),
2929            group_columns: self.drilled_down_group_key_columns.clone(),
2930        }
2931    }
2932
2933    /// Polars 0.52 pivot_stable panics (from_physical Date/UInt32) when index is Date/Datetime. Cast to Int32, restore after.
2934    /// Returns (modified df, list of (column name, original dtype) to restore after pivot).
2935    fn cast_temporal_index_columns_for_pivot(
2936        df: &DataFrame,
2937        index: &[String],
2938    ) -> Result<(DataFrame, Vec<(String, DataType)>)> {
2939        let mut out = df.clone();
2940        let mut restore = Vec::new();
2941        for name in index {
2942            if let Ok(s) = out.column(name) {
2943                let dtype = s.dtype();
2944                if matches!(dtype, DataType::Date | DataType::Datetime(_, _)) {
2945                    restore.push((name.clone(), dtype.clone()));
2946                    let casted = s.cast(&DataType::Int32)?;
2947                    out.with_column(casted)?;
2948                }
2949            }
2950        }
2951        Ok((out, restore))
2952    }
2953
2954    /// Restore Date/Datetime types on index columns after pivot.
2955    fn restore_temporal_index_columns_after_pivot(
2956        pivoted: &mut DataFrame,
2957        restore: &[(String, DataType)],
2958    ) -> Result<()> {
2959        for (name, dtype) in restore {
2960            if let Ok(s) = pivoted.column(name) {
2961                let restored = s.cast(dtype)?;
2962                pivoted.with_column(restored)?;
2963            }
2964        }
2965        Ok(())
2966    }
2967
    /// Pivot the current `LazyFrame` (long → wide). Never uses `original_lf`.
    /// Collects current `lf`, runs `pivot_stable`, then replaces `lf` with result.
    /// We use pivot_stable for all aggregation types: Polars' non-stable pivot() prints
    /// "unstable pivot not yet supported, using stable pivot" to stdout, which corrupts the TUI.
    pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
        // Pivot is eager: collect the frame first.
        let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
        let agg_expr = pivot_agg_expr(spec.aggregation)?;
        // Empty index list means "no index columns" (None), not an empty selection.
        let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
        let index_opt = if index_str.is_empty() {
            None
        } else {
            Some(index_str)
        };

        // Optional workaround for the Date/Datetime-index panic in pivot_stable;
        // temporal index columns are cast to Int32 and restored after the pivot.
        let (df_for_pivot, temporal_index_restore) = if self.workaround_pivot_date_index {
            let (df_w, restore) =
                Self::cast_temporal_index_columns_for_pivot(&df, spec.index.as_slice())?;
            (df_w, Some(restore))
        } else {
            (df.clone(), None)
        };
        // Default to sorting the new columns when the spec doesn't say.
        let sort_new_columns = spec.sort_columns.unwrap_or(true);
        let mut pivoted = pivot_stable(
            &df_for_pivot,
            [spec.pivot_column.as_str()],
            index_opt,
            Some([spec.value_column.as_str()]),
            sort_new_columns,
            Some(agg_expr),
            None,
        )?;
        if let Some(restore) = &temporal_index_restore {
            Self::restore_temporal_index_columns_after_pivot(&mut pivoted, restore)?;
        }

        // A pivot supersedes any previous melt.
        self.last_pivot_spec = Some(spec.clone());
        self.last_melt_spec = None;
        self.replace_lf_after_reshape(pivoted.lazy())?;
        Ok(())
    }
3008
3009    /// Melt the current `LazyFrame` (wide → long). Never uses `original_lf`.
3010    pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
3011        let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
3012        let index = cols(spec.index.iter().map(|s| s.as_str()));
3013        let args = UnpivotArgsDSL {
3014            on,
3015            index,
3016            variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
3017            value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
3018        };
3019        let lf = self.lf.clone().unpivot(args);
3020        self.last_melt_spec = Some(spec.clone());
3021        self.last_pivot_spec = None;
3022        self.replace_lf_after_reshape(lf)?;
3023        Ok(())
3024    }
3025
3026    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
3027        self.invalidate_num_rows();
3028        self.lf = lf;
3029        self.schema = self.lf.clone().collect_schema()?;
3030        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3031        self.filters.clear();
3032        self.sort_columns.clear();
3033        self.active_query.clear();
3034        self.active_sql_query.clear();
3035        self.active_fuzzy_query.clear();
3036        self.error = None;
3037        self.df = None;
3038        self.locked_df = None;
3039        self.grouped_lf = None;
3040        self.drilled_down_group_index = None;
3041        self.drilled_down_group_key = None;
3042        self.drilled_down_group_key_columns = None;
3043        self.start_row = 0;
3044        self.termcol_index = 0;
3045        self.locked_columns_count = 0;
3046        self.buffered_start_row = 0;
3047        self.buffered_end_row = 0;
3048        self.buffered_df = None;
3049        self.table_state.select(Some(0));
3050        self.collect();
3051        Ok(())
3052    }
3053
    /// True while the view is drilled into a single group
    /// (cleared by query/SQL/fuzzy/reshape state resets).
    pub fn is_drilled_down(&self) -> bool {
        self.drilled_down_group_index.is_some()
    }
3057
    /// Rebuild `self.lf` by applying the active filter statements and sort to the
    /// current frame, then re-collect the visible window.
    ///
    /// Filter statements are folded into one predicate using each statement's own
    /// AND/OR connector. With no sort columns but a descending direction, the
    /// frame is simply reversed.
    fn apply_transformations(&mut self) {
        let mut lf = self.lf.clone();
        let mut final_expr: Option<Expr> = None;

        for filter in &self.filters {
            let col_expr = col(&filter.column);
            // Coerce the filter's string value to a literal matching the column
            // dtype so comparisons are numeric/boolean where possible; if parsing
            // fails, fall back to comparing against the raw string.
            let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
                match dtype {
                    DataType::Float32 | DataType::Float64 => filter
                        .value
                        .parse::<f64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
                        .value
                        .parse::<i64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                        filter
                            .value
                            .parse::<u64>()
                            .map(lit)
                            .unwrap_or_else(|_| lit(filter.value.as_str()))
                    }
                    DataType::Boolean => filter
                        .value
                        .parse::<bool>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    _ => lit(filter.value.as_str()),
                }
            } else {
                // Column not in the schema: compare as a plain string literal.
                lit(filter.value.as_str())
            };

            let op_expr = match filter.operator {
                FilterOperator::Eq => col_expr.eq(val_lit),
                FilterOperator::NotEq => col_expr.neq(val_lit),
                FilterOperator::Gt => col_expr.gt(val_lit),
                FilterOperator::Lt => col_expr.lt(val_lit),
                FilterOperator::GtEq => col_expr.gt_eq(val_lit),
                FilterOperator::LtEq => col_expr.lt_eq(val_lit),
                FilterOperator::Contains => {
                    // Literal (non-regex) substring match; `val_lit` is not used
                    // by these two arms — the raw string value is matched instead.
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val))
                }
                FilterOperator::NotContains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val)).not()
                }
            };

            // Fold this statement into the combined predicate via its connector.
            if let Some(current) = final_expr {
                final_expr = Some(match filter.logical_op {
                    LogicalOperator::And => current.and(op_expr),
                    LogicalOperator::Or => current.or(op_expr),
                });
            } else {
                final_expr = Some(op_expr);
            }
        }

        if let Some(e) = final_expr {
            lf = lf.filter(e);
        }

        if !self.sort_columns.is_empty() {
            // One descending flag per sort column, all driven by `sort_ascending`.
            let options = SortMultipleOptions {
                descending: self
                    .sort_columns
                    .iter()
                    .map(|_| !self.sort_ascending)
                    .collect(),
                ..Default::default()
            };
            lf = lf.sort_by_exprs(
                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
                options,
            );
        } else if !self.sort_ascending {
            // No sort columns: "descending" simply reverses the frame.
            lf = lf.reverse();
        }

        self.invalidate_num_rows();
        self.lf = lf;
        self.collect();
    }
3146
    /// Set the sort columns and direction, then re-apply all active filters and
    /// the new sort via `apply_transformations`.
    pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
        self.sort_columns = columns;
        self.sort_ascending = ascending;
        // Drop the buffered row window so collect() refetches in the new order.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.apply_transformations();
    }
3155
3156    pub fn reverse(&mut self) {
3157        self.sort_ascending = !self.sort_ascending;
3158
3159        self.buffered_start_row = 0;
3160        self.buffered_end_row = 0;
3161        self.buffered_df = None;
3162
3163        if !self.sort_columns.is_empty() {
3164            let options = SortMultipleOptions {
3165                descending: self
3166                    .sort_columns
3167                    .iter()
3168                    .map(|_| !self.sort_ascending)
3169                    .collect(),
3170                ..Default::default()
3171            };
3172            self.invalidate_num_rows();
3173            self.lf = self.lf.clone().sort_by_exprs(
3174                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
3175                options,
3176            );
3177            self.collect();
3178        } else {
3179            self.invalidate_num_rows();
3180            self.lf = self.lf.clone().reverse();
3181            self.collect();
3182        }
3183    }
3184
    /// Replace the active filter statements and re-apply all transformations.
    pub fn filter(&mut self, filters: Vec<FilterStatement>) {
        self.filters = filters;
        // Drop the buffered row window so collect() refetches the filtered rows.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.apply_transformations();
    }
3192
    /// Run a datui query string (parsed by `parse_query`) against `original_lf`.
    ///
    /// Supports column selection, a `where` filter, and group-by aggregation;
    /// group-key columns end up locked (pinned left). On parse or schema errors
    /// the existing view state is left untouched and the error is stored for
    /// display. An empty query resets to the original frame.
    pub fn query(&mut self, query: String) {
        self.error = None;

        let trimmed_query = query.trim();
        if trimmed_query.is_empty() {
            // Empty query: restore the untransformed frame.
            self.reset_lf_to_original();
            self.collect();
            return;
        }

        match parse_query(&query) {
            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
                // Always start from the pristine frame, not the current view.
                let mut lf = self.original_lf.clone();
                let mut schema_opt: Option<Arc<Schema>> = None;

                // Apply filter first (where clause)
                if let Some(f) = filter {
                    lf = lf.filter(f);
                }

                if !group_by_cols.is_empty() {
                    if !cols.is_empty() {
                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
                    } else {
                        let schema = match lf.clone().collect_schema() {
                            Ok(s) => s,
                            Err(e) => {
                                self.error = Some(e);
                                return; // Don't modify state on error
                            }
                        };
                        let all_columns: Vec<String> =
                            schema.iter_names().map(|s| s.to_string()).collect();

                        // In Polars, when you group_by and aggregate columns without explicit aggregation functions,
                        // Polars automatically collects the values as lists. We need to aggregate all columns
                        // except the group columns to avoid duplicates.
                        let mut agg_exprs = Vec::new();
                        for col_name in &all_columns {
                            if !group_by_col_names.contains(col_name) {
                                agg_exprs.push(col(col_name));
                            }
                        }

                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
                    }
                    // Sort by the result's group-key column names (first N columns after agg).
                    // Works for aliased or plain names without relying on parser-derived names.
                    let schema = match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    };
                    schema_opt = Some(schema.clone());
                    let sort_exprs: Vec<Expr> = schema
                        .iter_names()
                        .take(group_by_cols.len())
                        .map(|n| col(n.as_str()))
                        .collect();
                    lf = lf.sort_by_exprs(sort_exprs, Default::default());
                } else if !cols.is_empty() {
                    lf = lf.select(cols);
                }

                // Reuse the schema already resolved in the group-by branch;
                // otherwise resolve it now (still before mutating any state).
                let schema = match schema_opt {
                    Some(s) => s,
                    None => match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    },
                };

                self.schema = schema;
                self.invalidate_num_rows();
                self.lf = lf;
                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();

                // Lock grouped columns if by clause was used
                // Only lock the columns specified in the 'by' clause, not the value columns
                if !group_by_col_names.is_empty() {
                    // Group columns appear first in Polars results, so count consecutive
                    // columns from the start that are in group_by_col_names
                    let mut locked_count = 0;
                    for col_name in &self.column_order {
                        if group_by_col_names.contains(col_name) {
                            locked_count += 1;
                        } else {
                            // Once we hit a non-group column, we've passed all group columns
                            break;
                        }
                    }
                    self.locked_columns_count = locked_count;
                } else {
                    self.locked_columns_count = 0;
                }

                // Clear filters when using query
                self.filters.clear();
                self.sort_columns.clear();
                self.sort_ascending = true;
                self.start_row = 0;
                self.termcol_index = 0;
                self.active_query = query;
                self.buffered_start_row = 0;
                self.buffered_end_row = 0;
                self.buffered_df = None;
                // Reset drill-down state when applying new query
                self.drilled_down_group_index = None;
                self.drilled_down_group_key = None;
                self.drilled_down_group_key_columns = None;
                self.grouped_lf = None;
                // Reset table state selection
                self.table_state.select(Some(0));
                // Collect will clamp start_row to valid range, but we want to ensure it's 0
                // So we set it to 0, collect (which may clamp it), then ensure it's 0 again
                self.collect();
                // After collect(), ensure we're at the top (collect() may have clamped if num_rows was wrong)
                // But if num_rows > 0, we want start_row = 0 to show the first row
                if self.num_rows > 0 {
                    self.start_row = 0;
                }
            }
            Err(e) => {
                // Parse errors are already user-facing strings; store as ComputeError
                self.error = Some(PolarsError::ComputeError(e.into()));
            }
        }
    }
3326
3327    /// Execute a SQL query against the current LazyFrame (registered as table "df").
3328    /// Empty SQL resets to original state. Does not call collect(); the event loop does that via AppEvent::Collect.
3329    pub fn sql_query(&mut self, sql: String) {
3330        self.error = None;
3331        let trimmed = sql.trim();
3332        if trimmed.is_empty() {
3333            self.reset_lf_to_original();
3334            return;
3335        }
3336
3337        #[cfg(feature = "sql")]
3338        {
3339            use polars_sql::SQLContext;
3340            let mut ctx = SQLContext::new();
3341            ctx.register("df", self.lf.clone());
3342            match ctx.execute(trimmed) {
3343                Ok(result_lf) => {
3344                    let schema = match result_lf.clone().collect_schema() {
3345                        Ok(s) => s,
3346                        Err(e) => {
3347                            self.error = Some(e);
3348                            return;
3349                        }
3350                    };
3351                    self.schema = schema;
3352                    self.invalidate_num_rows();
3353                    self.lf = result_lf;
3354                    self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3355                    self.active_sql_query = sql;
3356                    self.locked_columns_count = 0;
3357                    self.filters.clear();
3358                    self.sort_columns.clear();
3359                    self.sort_ascending = true;
3360                    self.start_row = 0;
3361                    self.termcol_index = 0;
3362                    self.drilled_down_group_index = None;
3363                    self.drilled_down_group_key = None;
3364                    self.drilled_down_group_key_columns = None;
3365                    self.grouped_lf = None;
3366                    self.buffered_start_row = 0;
3367                    self.buffered_end_row = 0;
3368                    self.buffered_df = None;
3369                    self.table_state.select(Some(0));
3370                }
3371                Err(e) => {
3372                    self.error = Some(e);
3373                }
3374            }
3375        }
3376
3377        #[cfg(not(feature = "sql"))]
3378        {
3379            self.error = Some(PolarsError::ComputeError(
3380                format!("SQL support not compiled in (build with --features sql)").into(),
3381            ));
3382        }
3383    }
3384
3385    /// Fuzzy search: filter rows where any string column matches the query.
3386    /// Query is split on whitespace; each token must match (in order, case-insensitive) in some string column.
3387    /// Empty query resets to original_lf.
3388    pub fn fuzzy_search(&mut self, query: String) {
3389        self.error = None;
3390        let trimmed = query.trim();
3391        if trimmed.is_empty() {
3392            self.reset_lf_to_original();
3393            self.collect();
3394            return;
3395        }
3396        let string_cols: Vec<String> = self
3397            .schema
3398            .iter()
3399            .filter(|(_, dtype)| dtype.is_string())
3400            .map(|(name, _)| name.to_string())
3401            .collect();
3402        if string_cols.is_empty() {
3403            self.error = Some(PolarsError::ComputeError(
3404                "Fuzzy search requires at least one string column".into(),
3405            ));
3406            return;
3407        }
3408        let tokens: Vec<&str> = trimmed
3409            .split_whitespace()
3410            .filter(|s| !s.is_empty())
3411            .collect();
3412        let token_exprs: Vec<Expr> = tokens
3413            .iter()
3414            .map(|token| {
3415                let pattern = fuzzy_token_regex(token);
3416                string_cols
3417                    .iter()
3418                    .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
3419                    .reduce(|a, b| a.or(b))
3420                    .unwrap()
3421            })
3422            .collect();
3423        let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
3424        self.lf = self.original_lf.clone().filter(combined);
3425        self.filters.clear();
3426        self.sort_columns.clear();
3427        self.active_query.clear();
3428        self.active_sql_query.clear();
3429        self.active_fuzzy_query = query;
3430        // Reset view and buffer so collect() runs on the new lf
3431        self.locked_columns_count = 0;
3432        self.start_row = 0;
3433        self.termcol_index = 0;
3434        self.drilled_down_group_index = None;
3435        self.drilled_down_group_key = None;
3436        self.drilled_down_group_key_columns = None;
3437        self.grouped_lf = None;
3438        self.buffered_start_row = 0;
3439        self.buffered_end_row = 0;
3440        self.buffered_df = None;
3441        self.table_state.select(Some(0));
3442        self.invalidate_num_rows();
3443        self.collect();
3444    }
3445}
3446
3447/// Case-insensitive regex for one token: chars in order with `.*` between.
3448pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3449    let inner: String =
3450        token
3451            .chars()
3452            .map(|c| regex::escape(&c.to_string()))
3453            .fold(String::new(), |mut s, e| {
3454                if !s.is_empty() {
3455                    s.push_str(".*");
3456                }
3457                s.push_str(&e);
3458                s
3459            });
3460    format!("(?i).*{}.*", inner)
3461}
3462
/// Styling configuration for the data table widget (a ratatui `StatefulWidget`
/// rendered with `DataTableState`).
pub struct DataTable {
    /// Background color of the header row.
    pub header_bg: Color,
    /// Foreground color of the header row.
    pub header_fg: Color,
    /// Foreground color of the row-number gutter.
    pub row_numbers_fg: Color,
    /// Foreground color of the separator drawn after the locked-column area.
    pub separator_fg: Color,
    /// Horizontal spacing (in terminal cells) between columns.
    pub table_cell_padding: u16,
    /// Optional background for odd rows (zebra striping); `None` disables it.
    pub alternate_row_bg: Option<Color>,
    /// When true, colorize cells by column type using the optional colors below.
    pub column_colors: bool,
    /// Color for string columns (only used when `column_colors` is true).
    pub str_col: Option<Color>,
    /// Color for integer columns (signed and unsigned).
    pub int_col: Option<Color>,
    /// Color for float columns.
    pub float_col: Option<Color>,
    /// Color for boolean columns.
    pub bool_col: Option<Color>,
    /// Color for date/datetime/time/duration columns.
    pub temporal_col: Option<Color>,
}
3478
impl Default for DataTable {
    /// Conservative defaults: dark header strip (xterm color 236) with white
    /// text, one cell of padding, and all optional coloring disabled.
    fn default() -> Self {
        Self {
            header_bg: Color::Indexed(236),
            header_fg: Color::White,
            row_numbers_fg: Color::DarkGray,
            separator_fg: Color::White,
            table_cell_padding: 1,
            alternate_row_bg: None,
            column_colors: false,
            str_col: None,
            int_col: None,
            float_col: None,
            bool_col: None,
            temporal_col: None,
        }
    }
}
3497
/// Parameters for rendering the row numbers column.
struct RowNumbersParams {
    /// First visible data row (offset into the full frame).
    start_row: usize,
    /// Number of rows the viewport can display.
    visible_rows: usize,
    /// Total number of data rows.
    num_rows: usize,
    /// Offset added to produce the displayed row number (e.g. 1 for 1-based).
    row_start_index: usize,
    /// Currently selected row within the viewport, if any.
    selected_row: Option<usize>,
}
3506
impl DataTable {
    /// Construct a `DataTable` with the default color scheme.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder: set the four base colors (header bg/fg, row-number fg, separator fg).
    pub fn with_colors(
        mut self,
        header_bg: Color,
        header_fg: Color,
        row_numbers_fg: Color,
        separator_fg: Color,
    ) -> Self {
        self.header_bg = header_bg;
        self.header_fg = header_fg;
        self.row_numbers_fg = row_numbers_fg;
        self.separator_fg = separator_fg;
        self
    }

    /// Builder: set the horizontal padding (in cells) between columns.
    pub fn with_cell_padding(mut self, padding: u16) -> Self {
        self.table_cell_padding = padding;
        self
    }

    /// Builder: set (or clear with `None`) the background used for odd rows.
    pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
        self.alternate_row_bg = color;
        self
    }

    /// Enable column-type coloring and set colors for string, int, float, bool, and temporal columns.
    pub fn with_column_type_colors(
        mut self,
        str_col: Color,
        int_col: Color,
        float_col: Color,
        bool_col: Color,
        temporal_col: Color,
    ) -> Self {
        self.column_colors = true;
        self.str_col = Some(str_col);
        self.int_col = Some(int_col);
        self.float_col = Some(float_col);
        self.bool_col = Some(bool_col);
        self.temporal_col = Some(temporal_col);
        self
    }

    /// Return the color for a column dtype when column_colors is enabled.
    /// Disabled coloring and unmapped dtypes (nested, etc.) yield `None`.
    fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
        if !self.column_colors {
            return None;
        }
        match dtype {
            DataType::String => self.str_col,
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64 => self.int_col,
            DataType::Float32 | DataType::Float64 => self.float_col,
            DataType::Boolean => self.bool_col,
            DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
                self.temporal_col
            }
            _ => None,
        }
    }

    /// Render `df` into `area` as a ratatui `Table`.
    ///
    /// Columns are sized to fit their visible content and laid out left to right
    /// until the area width is exhausted. If the first column that no longer fits
    /// is a string column, it is shown truncated to the remaining width;
    /// otherwise it and all later columns are dropped.
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        // make each column as wide as it needs to be to fit the content
        let (height, cols) = df.shape();

        // Widths start at the length of each column name.
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        // One (initially empty) cell vector per data row.
        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // Cap measured/rendered rows to what fits under the one-line header.
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                // Nulls render as empty strings rather than a "null" marker.
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            // Use > not >= so the last column is shown when it fits exactly (no padding needed after it)
            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // Overflowing string column: show it truncated to the leftover width.
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                // Overflowing non-string column: stop laying out columns here.
                break;
            }
        }

        widths.truncate(visible_columns);
        // Convert rows to `Row`s, applying the alternate background to odd rows.
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // With a Reset background, avoid setting bg so the terminal default shows.
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }

    /// Render the row-number gutter: a header-styled filler line followed by
    /// right-aligned numbers whose backgrounds mirror the main table's rows.
    fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
        // Header row: same style as the rest of the column headers (fill full width so color matches)
        let header_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let header_fill = " ".repeat(area.width as usize);
        Paragraph::new(header_fill).style(header_style).render(
            Rect {
                x: area.x,
                y: area.y,
                width: area.width,
                height: 1,
            },
            buf,
        );

        // Only render up to the actual number of rows in the data
        let rows_to_render = params
            .visible_rows
            .min(params.num_rows.saturating_sub(params.start_row));

        if rows_to_render == 0 {
            return;
        }

        // Calculate width needed for largest row number
        let max_row_num =
            params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
        let max_width = max_row_num.to_string().len();

        // Render row numbers
        for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
            let row_num = params.start_row + row_idx + params.row_start_index;
            let row_num_text = row_num.to_string();

            // Right-align row numbers within the available width
            let padding = max_width.saturating_sub(row_num_text.len());
            let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);

            // Match main table background: default when row is even (or no alternate);
            // when alternate_row_bg is set, odd rows use that background.
            // When selected: same background as row (no inversion), foreground = terminal default.
            let is_selected = params.selected_row == Some(row_idx);
            let (fg, bg) = if is_selected {
                (
                    Color::Reset,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            } else {
                (
                    self.row_numbers_fg,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            };
            let row_num_style = match bg {
                Some(bg_color) => Style::default().fg(fg).bg(bg_color),
                None => Style::default().fg(fg),
            };

            let y = area.y + row_idx as u16 + 1; // +1 for header row
            if y < area.y + area.height {
                Paragraph::new(padded_text).style(row_num_style).render(
                    Rect {
                        x: area.x,
                        y,
                        width: area.width,
                        height: 1,
                    },
                    buf,
                );
            }
        }
    }
}
3767
3768impl StatefulWidget for DataTable {
3769    type State = DataTableState;
3770
3771    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
3772        state.visible_termcols = area.width as usize;
3773        let new_visible_rows = if area.height > 0 {
3774            (area.height - 1) as usize
3775        } else {
3776            0
3777        };
3778        let needs_collect = new_visible_rows != state.visible_rows;
3779        state.visible_rows = new_visible_rows;
3780
3781        if let Some(selected) = state.table_state.selected() {
3782            if selected >= state.visible_rows {
3783                state.table_state.select(Some(state.visible_rows - 1))
3784            }
3785        }
3786
3787        if needs_collect {
3788            state.collect();
3789        }
3790
3791        // Only show errors in main view if not suppressed (e.g., when query input is active)
3792        // Query errors should only be shown in the query input frame
3793        if let Some(error) = state.error.as_ref() {
3794            if !state.suppress_error_display {
3795                Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
3796                    .centered()
3797                    .block(
3798                        Block::default()
3799                            .borders(Borders::NONE)
3800                            .padding(Padding::top(area.height / 2)),
3801                    )
3802                    .wrap(ratatui::widgets::Wrap { trim: true })
3803                    .render(area, buf);
3804                return;
3805            }
3806            // If suppress_error_display is true, continue rendering the table normally
3807        }
3808
3809        // Calculate row number column width if enabled
3810        let row_num_width = if state.row_numbers {
3811            let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; // +1 for 1-based, +1 for potential
3812            max_row_num.to_string().len().max(1) as u16 + 1 // +1 for spacing
3813        } else {
3814            0
3815        };
3816
3817        // Calculate locked columns width if any
3818        let mut locked_width = row_num_width;
3819        if let Some(locked_df) = state.locked_df.as_ref() {
3820            let (_, cols) = locked_df.shape();
3821            for col_index in 0..cols {
3822                let col_name = locked_df.get_column_names()[col_index];
3823                let mut max_len = col_name.chars().count() as u16;
3824                let col_data = &locked_df[col_index];
3825                for row_index in 0..locked_df.height().min(state.visible_rows) {
3826                    let value = col_data.get(row_index).unwrap();
3827                    let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
3828                        Cow::Borrowed("")
3829                    } else {
3830                        value.str_value()
3831                    };
3832                    let len = val_str.chars().count() as u16;
3833                    max_len = max_len.max(len);
3834                }
3835                locked_width += max_len + 1;
3836            }
3837        }
3838
3839        // Split area into locked and scrollable parts
3840        if locked_width > row_num_width && locked_width < area.width {
3841            let locked_area = Rect {
3842                x: area.x,
3843                y: area.y,
3844                width: locked_width,
3845                height: area.height,
3846            };
3847            let separator_x = locked_area.x + locked_area.width;
3848
3849            // If row numbers are enabled, render them first in a separate area
3850            if state.row_numbers {
3851                let row_num_area = Rect {
3852                    x: area.x,
3853                    y: area.y,
3854                    width: row_num_width,
3855                    height: area.height,
3856                };
3857                self.render_row_numbers(
3858                    row_num_area,
3859                    buf,
3860                    RowNumbersParams {
3861                        start_row: state.start_row,
3862                        visible_rows: state.visible_rows,
3863                        num_rows: state.num_rows,
3864                        row_start_index: state.row_start_index,
3865                        selected_row: state.table_state.selected(),
3866                    },
3867                );
3868            }
3869            let scrollable_area = Rect {
3870                x: separator_x + 1,
3871                y: area.y,
3872                width: area.width.saturating_sub(locked_width + 1),
3873                height: area.height,
3874            };
3875
3876            // Render locked columns (no background shading, just the vertical separator)
3877            if let Some(locked_df) = state.locked_df.as_ref() {
3878                // Adjust locked_area to account for row numbers if present
3879                let adjusted_locked_area = if state.row_numbers {
3880                    Rect {
3881                        x: area.x + row_num_width,
3882                        y: area.y,
3883                        width: locked_width - row_num_width,
3884                        height: area.height,
3885                    }
3886                } else {
3887                    locked_area
3888                };
3889
3890                // Slice buffer to visible portion
3891                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3892                let slice_len = state
3893                    .visible_rows
3894                    .min(locked_df.height().saturating_sub(offset));
3895                if offset < locked_df.height() && slice_len > 0 {
3896                    let sliced_df = locked_df.slice(offset as i64, slice_len);
3897                    self.render_dataframe(
3898                        &sliced_df,
3899                        adjusted_locked_area,
3900                        buf,
3901                        &mut state.table_state,
3902                        false,
3903                        state.start_row,
3904                    );
3905                }
3906            }
3907
3908            // Draw vertical separator line
3909            let separator_x_adjusted = if state.row_numbers {
3910                area.x + row_num_width + (locked_width - row_num_width)
3911            } else {
3912                separator_x
3913            };
3914            for y in area.y..area.y + area.height {
3915                let cell = &mut buf[(separator_x_adjusted, y)];
3916                cell.set_char('│');
3917                cell.set_style(Style::default().fg(self.separator_fg));
3918            }
3919
3920            // Adjust scrollable area to account for row numbers
3921            let adjusted_scrollable_area = if state.row_numbers {
3922                Rect {
3923                    x: separator_x_adjusted + 1,
3924                    y: area.y,
3925                    width: area.width.saturating_sub(locked_width + 1),
3926                    height: area.height,
3927                }
3928            } else {
3929                scrollable_area
3930            };
3931
3932            // Render scrollable columns
3933            if let Some(df) = state.df.as_ref() {
3934                // Slice buffer to visible portion
3935                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3936                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3937                if offset < df.height() && slice_len > 0 {
3938                    let sliced_df = df.slice(offset as i64, slice_len);
3939                    self.render_dataframe(
3940                        &sliced_df,
3941                        adjusted_scrollable_area,
3942                        buf,
3943                        &mut state.table_state,
3944                        false,
3945                        state.start_row,
3946                    );
3947                }
3948            }
3949        } else if let Some(df) = state.df.as_ref() {
3950            // No locked columns, render normally
3951            // If row numbers are enabled, render them first
3952            if state.row_numbers {
3953                let row_num_area = Rect {
3954                    x: area.x,
3955                    y: area.y,
3956                    width: row_num_width,
3957                    height: area.height,
3958                };
3959                self.render_row_numbers(
3960                    row_num_area,
3961                    buf,
3962                    RowNumbersParams {
3963                        start_row: state.start_row,
3964                        visible_rows: state.visible_rows,
3965                        num_rows: state.num_rows,
3966                        row_start_index: state.row_start_index,
3967                        selected_row: state.table_state.selected(),
3968                    },
3969                );
3970
3971                // Adjust data area to exclude row number column
3972                let data_area = Rect {
3973                    x: area.x + row_num_width,
3974                    y: area.y,
3975                    width: area.width.saturating_sub(row_num_width),
3976                    height: area.height,
3977                };
3978
3979                // Slice buffer to visible portion
3980                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3981                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3982                if offset < df.height() && slice_len > 0 {
3983                    let sliced_df = df.slice(offset as i64, slice_len);
3984                    self.render_dataframe(
3985                        &sliced_df,
3986                        data_area,
3987                        buf,
3988                        &mut state.table_state,
3989                        false,
3990                        state.start_row,
3991                    );
3992                }
3993            } else {
3994                // Slice buffer to visible portion
3995                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3996                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3997                if offset < df.height() && slice_len > 0 {
3998                    let sliced_df = df.slice(offset as i64, slice_len);
3999                    self.render_dataframe(
4000                        &sliced_df,
4001                        area,
4002                        buf,
4003                        &mut state.table_state,
4004                        false,
4005                        state.start_row,
4006                    );
4007                }
4008            }
4009        } else if !state.column_order.is_empty() {
4010            // Empty result (0 rows) but we have a schema - show empty table with header, no rows
4011            let empty_columns: Vec<_> = state
4012                .column_order
4013                .iter()
4014                .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
4015                .collect();
4016            if let Ok(empty_df) = DataFrame::new(empty_columns) {
4017                if state.row_numbers {
4018                    let row_num_area = Rect {
4019                        x: area.x,
4020                        y: area.y,
4021                        width: row_num_width,
4022                        height: area.height,
4023                    };
4024                    self.render_row_numbers(
4025                        row_num_area,
4026                        buf,
4027                        RowNumbersParams {
4028                            start_row: 0,
4029                            visible_rows: state.visible_rows,
4030                            num_rows: 0,
4031                            row_start_index: state.row_start_index,
4032                            selected_row: None,
4033                        },
4034                    );
4035                    let data_area = Rect {
4036                        x: area.x + row_num_width,
4037                        y: area.y,
4038                        width: area.width.saturating_sub(row_num_width),
4039                        height: area.height,
4040                    };
4041                    self.render_dataframe(
4042                        &empty_df,
4043                        data_area,
4044                        buf,
4045                        &mut state.table_state,
4046                        false,
4047                        0,
4048                    );
4049                } else {
4050                    self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
4051                }
4052            } else {
4053                Paragraph::new("No data").render(area, buf);
4054            }
4055        } else {
4056            // Truly empty: no schema, not loaded, or blank file
4057            Paragraph::new("No data").render(area, buf);
4058        }
4059    }
4060}
4061
4062#[cfg(test)]
4063mod tests {
4064    use super::*;
4065    use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
4066    use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
4067
4068    fn create_test_lf() -> LazyFrame {
4069        df! (
4070            "a" => &[1, 2, 3],
4071            "b" => &["x", "y", "z"]
4072        )
4073        .unwrap()
4074        .lazy()
4075    }
4076
4077    fn create_large_test_lf() -> LazyFrame {
4078        df! (
4079            "a" => (0..100).collect::<Vec<i32>>(),
4080            "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
4081            "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
4082            "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
4083        )
4084        .unwrap()
4085        .lazy()
4086    }
4087
4088    #[test]
4089    fn test_from_csv() {
4090        // Ensure sample data is generated before running test
4091        // Test uncompressed CSV loading
4092        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
4093        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
4094        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
4095    }
4096
4097    #[test]
4098    fn test_from_csv_gzipped() {
4099        // Ensure sample data is generated before running test
4100        // Test gzipped CSV loading
4101        let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
4102        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
4103        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
4104    }
4105
4106    #[test]
4107    fn test_from_parquet() {
4108        // Ensure sample data is generated before running test
4109        let path = crate::tests::sample_data_dir().join("people.parquet");
4110        let state = DataTableState::from_parquet(&path, None, None, None, None, false, 1).unwrap();
4111        assert!(!state.schema.is_empty());
4112    }
4113
4114    #[test]
4115    fn test_from_ipc() {
4116        use polars::prelude::IpcWriter;
4117        use std::io::BufWriter;
4118        let mut df = df!(
4119            "x" => &[1_i32, 2, 3],
4120            "y" => &["a", "b", "c"]
4121        )
4122        .unwrap();
4123        let dir = std::env::temp_dir();
4124        let path = dir.join("datui_test_ipc.arrow");
4125        let file = std::fs::File::create(&path).unwrap();
4126        let mut writer = BufWriter::new(file);
4127        IpcWriter::new(&mut writer).finish(&mut df).unwrap();
4128        drop(writer);
4129        let state = DataTableState::from_ipc(&path, None, None, None, None, false, 1).unwrap();
4130        assert_eq!(state.schema.len(), 2);
4131        assert!(state.schema.contains("x"));
4132        assert!(state.schema.contains("y"));
4133        let _ = std::fs::remove_file(&path);
4134    }
4135
4136    #[test]
4137    fn test_from_avro() {
4138        use polars::io::avro::AvroWriter;
4139        use std::io::BufWriter;
4140        let mut df = df!(
4141            "id" => &[1_i32, 2, 3],
4142            "name" => &["alice", "bob", "carol"]
4143        )
4144        .unwrap();
4145        let dir = std::env::temp_dir();
4146        let path = dir.join("datui_test_avro.avro");
4147        let file = std::fs::File::create(&path).unwrap();
4148        let mut writer = BufWriter::new(file);
4149        AvroWriter::new(&mut writer).finish(&mut df).unwrap();
4150        drop(writer);
4151        let state = DataTableState::from_avro(&path, None, None, None, None, false, 1).unwrap();
4152        assert_eq!(state.schema.len(), 2);
4153        assert!(state.schema.contains("id"));
4154        assert!(state.schema.contains("name"));
4155        let _ = std::fs::remove_file(&path);
4156    }
4157
4158    #[test]
4159    fn test_from_orc() {
4160        use arrow::array::{Int64Array, StringArray};
4161        use arrow::datatypes::{DataType, Field, Schema};
4162        use arrow::record_batch::RecordBatch;
4163        use orc_rust::ArrowWriterBuilder;
4164        use std::io::BufWriter;
4165        use std::sync::Arc;
4166
4167        let schema = Arc::new(Schema::new(vec![
4168            Field::new("id", DataType::Int64, false),
4169            Field::new("name", DataType::Utf8, false),
4170        ]));
4171        let id_array = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
4172        let name_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
4173        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, name_array]).unwrap();
4174
4175        let dir = std::env::temp_dir();
4176        let path = dir.join("datui_test_orc.orc");
4177        let file = std::fs::File::create(&path).unwrap();
4178        let writer = BufWriter::new(file);
4179        let mut orc_writer = ArrowWriterBuilder::new(writer, schema).try_build().unwrap();
4180        orc_writer.write(&batch).unwrap();
4181        orc_writer.close().unwrap();
4182
4183        let state = DataTableState::from_orc(&path, None, None, None, None, false, 1).unwrap();
4184        assert_eq!(state.schema.len(), 2);
4185        assert!(state.schema.contains("id"));
4186        assert!(state.schema.contains("name"));
4187        let _ = std::fs::remove_file(&path);
4188    }
4189
4190    #[test]
4191    fn test_filter() {
4192        let lf = create_test_lf();
4193        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4194        let filters = vec![FilterStatement {
4195            column: "a".to_string(),
4196            operator: FilterOperator::Gt,
4197            value: "2".to_string(),
4198            logical_op: LogicalOperator::And,
4199        }];
4200        state.filter(filters);
4201        let df = state.lf.clone().collect().unwrap();
4202        assert_eq!(df.shape().0, 1);
4203        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4204    }
4205
4206    #[test]
4207    fn test_sort() {
4208        let lf = create_test_lf();
4209        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4210        state.sort(vec!["a".to_string()], false);
4211        let df = state.lf.clone().collect().unwrap();
4212        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4213    }
4214
4215    #[test]
4216    fn test_query() {
4217        let lf = create_test_lf();
4218        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4219        state.query("select b where a = 2".to_string());
4220        let df = state.lf.clone().collect().unwrap();
4221        assert_eq!(df.shape(), (1, 1));
4222        assert_eq!(
4223            df.column("b").unwrap().get(0).unwrap(),
4224            AnyValue::String("y")
4225        );
4226    }
4227
    /// End-to-end checks for the query language's date/string accessors and
    /// the empty-result cleanup path. Each `state.query(...)` call below
    /// appears to be evaluated against the original frame rather than the
    /// previous query's result (the second query selects `event_date`, which
    /// the first query's projection dropped) — NOTE(review): confirm against
    /// the query implementation.
    #[test]
    fn test_query_date_accessors() {
        use chrono::NaiveDate;
        let df = df!(
            "event_date" => [
                NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
                NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
                NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
            ],
            "name" => &["a", "b", "c"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();

        // Select with date accessors
        state.query("select name, year: event_date.year, month: event_date.month".to_string());
        assert!(
            state.error.is_none(),
            "query should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (3, 3));
        // year is Int32, month is Int8 (polars' dt.year()/dt.month() widths).
        assert_eq!(
            df.column("year").unwrap().get(0).unwrap(),
            AnyValue::Int32(2024)
        );
        assert_eq!(
            df.column("month").unwrap().get(0).unwrap(),
            AnyValue::Int8(1)
        );
        assert_eq!(
            df.column("month").unwrap().get(1).unwrap(),
            AnyValue::Int8(6)
        );

        // Filter with date accessor
        state.query("select name, event_date where event_date.month = 12".to_string());
        assert!(
            state.error.is_none(),
            "filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1);
        assert_eq!(
            df.column("name").unwrap().get(0).unwrap(),
            AnyValue::String("c")
        );

        // Filter with YYYY.MM.DD date literal
        state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
        assert!(
            state.error.is_none(),
            "date literal filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(
            df.height(),
            2,
            "2024-06-20 and 2024-12-31 are after 2024-06-15"
        );

        // String accessors: upper, lower, len, ends_with
        state.query(
            "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
                .to_string(),
        );
        assert!(
            state.error.is_none(),
            "string accessors should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
        assert_eq!(
            df.column("upper_name").unwrap().get(0).unwrap(),
            AnyValue::String("C")
        );

        // Query that returns 0 rows: df and locked_df must be cleared for correct empty-table render
        state.query("select where event_date.date = 2020.01.01".to_string());
        assert!(state.error.is_none());
        assert_eq!(state.num_rows, 0);
        // collect() with a visible window must drop the stale buffered frames.
        state.visible_rows = 10;
        state.collect();
        assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
        assert!(
            state.locked_df.is_none(),
            "locked_df must be cleared when num_rows is 0"
        );
    }
4322
4323    #[test]
4324    fn test_select_next_previous() {
4325        let lf = create_large_test_lf();
4326        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4327        state.visible_rows = 10;
4328        state.table_state.select(Some(5));
4329
4330        state.select_next();
4331        assert_eq!(state.table_state.selected(), Some(6));
4332
4333        state.select_previous();
4334        assert_eq!(state.table_state.selected(), Some(5));
4335    }
4336
4337    #[test]
4338    fn test_page_up_down() {
4339        let lf = create_large_test_lf();
4340        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4341        state.visible_rows = 20;
4342        state.collect();
4343
4344        assert_eq!(state.start_row, 0);
4345        state.page_down();
4346        assert_eq!(state.start_row, 20);
4347        state.page_down();
4348        assert_eq!(state.start_row, 40);
4349        state.page_up();
4350        assert_eq!(state.start_row, 20);
4351        state.page_up();
4352        assert_eq!(state.start_row, 0);
4353    }
4354
4355    #[test]
4356    fn test_scroll_left_right() {
4357        let lf = create_large_test_lf();
4358        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4359        assert_eq!(state.termcol_index, 0);
4360        state.scroll_right();
4361        assert_eq!(state.termcol_index, 1);
4362        state.scroll_right();
4363        assert_eq!(state.termcol_index, 2);
4364        state.scroll_left();
4365        assert_eq!(state.termcol_index, 1);
4366        state.scroll_left();
4367        assert_eq!(state.termcol_index, 0);
4368    }
4369
4370    #[test]
4371    fn test_reverse() {
4372        let lf = create_test_lf();
4373        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4374        state.sort(vec!["a".to_string()], true);
4375        assert_eq!(
4376            state
4377                .lf
4378                .clone()
4379                .collect()
4380                .unwrap()
4381                .column("a")
4382                .unwrap()
4383                .get(0)
4384                .unwrap(),
4385            AnyValue::Int32(1)
4386        );
4387        state.reverse();
4388        assert_eq!(
4389            state
4390                .lf
4391                .clone()
4392                .collect()
4393                .unwrap()
4394                .column("a")
4395                .unwrap()
4396                .get(0)
4397                .unwrap(),
4398            AnyValue::Int32(3)
4399        );
4400    }
4401
4402    #[test]
4403    fn test_filter_multiple() {
4404        let lf = create_large_test_lf();
4405        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4406        let filters = vec![
4407            FilterStatement {
4408                column: "c".to_string(),
4409                operator: FilterOperator::Eq,
4410                value: "1".to_string(),
4411                logical_op: LogicalOperator::And,
4412            },
4413            FilterStatement {
4414                column: "d".to_string(),
4415                operator: FilterOperator::Eq,
4416                value: "2".to_string(),
4417                logical_op: LogicalOperator::And,
4418            },
4419        ];
4420        state.filter(filters);
4421        let df = state.lf.clone().collect().unwrap();
4422        assert_eq!(df.shape().0, 7);
4423    }
4424
4425    #[test]
4426    fn test_filter_and_sort() {
4427        let lf = create_large_test_lf();
4428        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4429        let filters = vec![FilterStatement {
4430            column: "c".to_string(),
4431            operator: FilterOperator::Eq,
4432            value: "1".to_string(),
4433            logical_op: LogicalOperator::And,
4434        }];
4435        state.filter(filters);
4436        state.sort(vec!["a".to_string()], false);
4437        let df = state.lf.clone().collect().unwrap();
4438        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
4439    }
4440
4441    /// Minimal long-format data for pivot tests: id, date, key, value.
4442    /// Includes duplicates for aggregation (e.g. (1,d1,A) appears twice).
4443    fn create_pivot_long_lf() -> LazyFrame {
4444        let df = df!(
4445            "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
4446            "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
4447            "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
4448            "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
4449        )
4450        .unwrap();
4451        df.lazy()
4452    }
4453
4454    /// Wide-format data for melt tests: id, date, c1, c2, c3.
4455    fn create_melt_wide_lf() -> LazyFrame {
4456        let df = df!(
4457            "id" => &[1_i32, 2, 3],
4458            "date" => &["d1", "d2", "d3"],
4459            "c1" => &[10.0_f64, 20.0, 30.0],
4460            "c2" => &[11.0, 21.0, 31.0],
4461            "c3" => &[12.0, 22.0, 32.0],
4462        )
4463        .unwrap();
4464        df.lazy()
4465    }
4466
4467    #[test]
4468    fn test_pivot_basic() {
4469        let lf = create_pivot_long_lf();
4470        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4471        let spec = PivotSpec {
4472            index: vec!["id".to_string(), "date".to_string()],
4473            pivot_column: "key".to_string(),
4474            value_column: "value".to_string(),
4475            aggregation: PivotAggregation::Last,
4476            sort_columns: None,
4477        };
4478        state.pivot(&spec).unwrap();
4479        let df = state.lf.clone().collect().unwrap();
4480        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4481        assert!(names.contains(&"id"));
4482        assert!(names.contains(&"date"));
4483        assert!(names.contains(&"A"));
4484        assert!(names.contains(&"B"));
4485        assert!(names.contains(&"C"));
4486        assert_eq!(df.height(), 2);
4487    }
4488
4489    #[test]
4490    fn test_pivot_aggregation_last() {
4491        let lf = create_pivot_long_lf();
4492        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4493        let spec = PivotSpec {
4494            index: vec!["id".to_string(), "date".to_string()],
4495            pivot_column: "key".to_string(),
4496            value_column: "value".to_string(),
4497            aggregation: PivotAggregation::Last,
4498            sort_columns: None,
4499        };
4500        state.pivot(&spec).unwrap();
4501        let df = state.lf.clone().collect().unwrap();
4502        let a_col = df.column("A").unwrap();
4503        let row0 = a_col.get(0).unwrap();
4504        let row1 = a_col.get(1).unwrap();
4505        assert_eq!(row0, AnyValue::Float64(11.0));
4506        assert_eq!(row1, AnyValue::Float64(40.0));
4507    }
4508
4509    #[test]
4510    fn test_pivot_aggregation_first() {
4511        let lf = create_pivot_long_lf();
4512        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4513        let spec = PivotSpec {
4514            index: vec!["id".to_string(), "date".to_string()],
4515            pivot_column: "key".to_string(),
4516            value_column: "value".to_string(),
4517            aggregation: PivotAggregation::First,
4518            sort_columns: None,
4519        };
4520        state.pivot(&spec).unwrap();
4521        let df = state.lf.clone().collect().unwrap();
4522        let a_col = df.column("A").unwrap();
4523        assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
4524        assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
4525    }
4526
4527    #[test]
4528    fn test_pivot_aggregation_min_max() {
4529        let lf = create_pivot_long_lf();
4530        let mut state_min = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4531        state_min
4532            .pivot(&PivotSpec {
4533                index: vec!["id".to_string(), "date".to_string()],
4534                pivot_column: "key".to_string(),
4535                value_column: "value".to_string(),
4536                aggregation: PivotAggregation::Min,
4537                sort_columns: None,
4538            })
4539            .unwrap();
4540        let df_min = state_min.lf.clone().collect().unwrap();
4541        assert_eq!(
4542            df_min.column("A").unwrap().get(0).unwrap(),
4543            AnyValue::Float64(10.0)
4544        );
4545
4546        let mut state_max = DataTableState::new(lf, None, None, None, None, true).unwrap();
4547        state_max
4548            .pivot(&PivotSpec {
4549                index: vec!["id".to_string(), "date".to_string()],
4550                pivot_column: "key".to_string(),
4551                value_column: "value".to_string(),
4552                aggregation: PivotAggregation::Max,
4553                sort_columns: None,
4554            })
4555            .unwrap();
4556        let df_max = state_max.lf.clone().collect().unwrap();
4557        assert_eq!(
4558            df_max.column("A").unwrap().get(0).unwrap(),
4559            AnyValue::Float64(11.0)
4560        );
4561    }
4562
4563    #[test]
4564    fn test_pivot_aggregation_avg_count() {
4565        let lf = create_pivot_long_lf();
4566        let mut state_avg = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4567        state_avg
4568            .pivot(&PivotSpec {
4569                index: vec!["id".to_string(), "date".to_string()],
4570                pivot_column: "key".to_string(),
4571                value_column: "value".to_string(),
4572                aggregation: PivotAggregation::Avg,
4573                sort_columns: None,
4574            })
4575            .unwrap();
4576        let df_avg = state_avg.lf.clone().collect().unwrap();
4577        let a = df_avg.column("A").unwrap().get(0).unwrap();
4578        if let AnyValue::Float64(x) = a {
4579            assert!((x - 10.5).abs() < 1e-6);
4580        } else {
4581            panic!("expected float");
4582        }
4583
4584        let mut state_count = DataTableState::new(lf, None, None, None, None, true).unwrap();
4585        state_count
4586            .pivot(&PivotSpec {
4587                index: vec!["id".to_string(), "date".to_string()],
4588                pivot_column: "key".to_string(),
4589                value_column: "value".to_string(),
4590                aggregation: PivotAggregation::Count,
4591                sort_columns: None,
4592            })
4593            .unwrap();
4594        let df_count = state_count.lf.clone().collect().unwrap();
4595        let a = df_count.column("A").unwrap().get(0).unwrap();
4596        assert_eq!(a, AnyValue::UInt32(2));
4597    }
4598
4599    #[test]
4600    fn test_pivot_string_first_last() {
4601        let df = df!(
4602            "id" => &[1_i32, 1, 2, 2],
4603            "key" => &["X", "Y", "X", "Y"],
4604            "value" => &["low", "mid", "high", "mid"],
4605        )
4606        .unwrap();
4607        let lf = df.lazy();
4608        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4609        let spec = PivotSpec {
4610            index: vec!["id".to_string()],
4611            pivot_column: "key".to_string(),
4612            value_column: "value".to_string(),
4613            aggregation: PivotAggregation::Last,
4614            sort_columns: None,
4615        };
4616        state.pivot(&spec).unwrap();
4617        let out = state.lf.clone().collect().unwrap();
4618        assert_eq!(
4619            out.column("X").unwrap().get(0).unwrap(),
4620            AnyValue::String("low")
4621        );
4622        assert_eq!(
4623            out.column("Y").unwrap().get(0).unwrap(),
4624            AnyValue::String("mid")
4625        );
4626    }
4627
4628    #[test]
4629    fn test_melt_basic() {
4630        let lf = create_melt_wide_lf();
4631        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4632        let spec = MeltSpec {
4633            index: vec!["id".to_string(), "date".to_string()],
4634            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4635            variable_name: "variable".to_string(),
4636            value_name: "value".to_string(),
4637        };
4638        state.melt(&spec).unwrap();
4639        let df = state.lf.clone().collect().unwrap();
4640        assert_eq!(df.height(), 9);
4641        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4642        assert!(names.contains(&"variable"));
4643        assert!(names.contains(&"value"));
4644        assert!(names.contains(&"id"));
4645        assert!(names.contains(&"date"));
4646    }
4647
4648    #[test]
4649    fn test_melt_all_except_index() {
4650        let lf = create_melt_wide_lf();
4651        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4652        let spec = MeltSpec {
4653            index: vec!["id".to_string(), "date".to_string()],
4654            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4655            variable_name: "var".to_string(),
4656            value_name: "val".to_string(),
4657        };
4658        state.melt(&spec).unwrap();
4659        let df = state.lf.clone().collect().unwrap();
4660        assert!(df.column("var").is_ok());
4661        assert!(df.column("val").is_ok());
4662    }
4663
4664    #[test]
4665    fn test_pivot_on_current_view_after_filter() {
4666        let lf = create_pivot_long_lf();
4667        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4668        state.filter(vec![FilterStatement {
4669            column: "id".to_string(),
4670            operator: FilterOperator::Eq,
4671            value: "1".to_string(),
4672            logical_op: LogicalOperator::And,
4673        }]);
4674        let spec = PivotSpec {
4675            index: vec!["id".to_string(), "date".to_string()],
4676            pivot_column: "key".to_string(),
4677            value_column: "value".to_string(),
4678            aggregation: PivotAggregation::Last,
4679            sort_columns: None,
4680        };
4681        state.pivot(&spec).unwrap();
4682        let df = state.lf.clone().collect().unwrap();
4683        assert_eq!(df.height(), 1);
4684        let id_col = df.column("id").unwrap();
4685        assert_eq!(id_col.get(0).unwrap(), AnyValue::Int32(1));
4686    }
4687
4688    #[test]
4689    fn test_fuzzy_token_regex() {
4690        assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
4691        assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
4692        // Regex-special characters are escaped
4693        let pat = fuzzy_token_regex("[");
4694        assert!(pat.contains("\\["));
4695    }
4696
4697    #[test]
4698    fn test_fuzzy_search() {
4699        // Filter logic is covered by test_fuzzy_search_regex_direct. This test runs the full
4700        // path through DataTableState; it requires sample data (CSV with string column).
4701        crate::tests::ensure_sample_data();
4702        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
4703        let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
4704        state.visible_rows = 10;
4705        state.collect();
4706        let before = state.num_rows;
4707        state.fuzzy_search("string".to_string());
4708        assert!(state.error.is_none(), "{:?}", state.error);
4709        assert!(state.num_rows <= before, "fuzzy search should filter rows");
4710        state.fuzzy_search("".to_string());
4711        state.collect();
4712        assert_eq!(state.num_rows, before, "empty fuzzy search should reset");
4713        assert!(state.get_active_fuzzy_query().is_empty());
4714    }
4715
4716    #[test]
4717    fn test_fuzzy_search_regex_direct() {
4718        // Sanity check: Polars str().contains with our regex matches "alice" for pattern ".*a.*l.*i.*"
4719        let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
4720        let pattern = fuzzy_token_regex("alice");
4721        let out = lf
4722            .filter(col("name").str().contains(lit(pattern.clone()), false))
4723            .collect()
4724            .unwrap();
4725        assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);
4726
4727        // Two columns OR (as in fuzzy_search)
4728        let lf2 = df!(
4729            "id" => &[1i32, 2, 3],
4730            "name" => &["alice", "bob", "carol"],
4731            "city" => &["NYC", "LA", "Boston"]
4732        )
4733        .unwrap()
4734        .lazy();
4735        let pat = fuzzy_token_regex("alice");
4736        let expr = col("name")
4737            .str()
4738            .contains(lit(pat.clone()), false)
4739            .or(col("city").str().contains(lit(pat), false));
4740        let out2 = lf2.clone().filter(expr).collect().unwrap();
4741        assert_eq!(out2.height(), 1);
4742
4743        // Replicate exact fuzzy_search logic: schema from original_lf, string_cols, then filter
4744        let schema = lf2.clone().collect_schema().unwrap();
4745        let string_cols: Vec<String> = schema
4746            .iter()
4747            .filter(|(_, dtype)| dtype.is_string())
4748            .map(|(name, _)| name.to_string())
4749            .collect();
4750        assert!(
4751            !string_cols.is_empty(),
4752            "df! string cols should be detected"
4753        );
4754        let pattern = fuzzy_token_regex("alice");
4755        let token_expr = string_cols
4756            .iter()
4757            .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
4758            .reduce(|a, b| a.or(b))
4759            .unwrap();
4760        let out3 = lf2.filter(token_expr).collect().unwrap();
4761        assert_eq!(
4762            out3.height(),
4763            1,
4764            "fuzzy_search-style filter should match 1 row"
4765        );
4766    }
4767
4768    #[test]
4769    fn test_fuzzy_search_no_string_columns() {
4770        let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
4771            .unwrap()
4772            .lazy();
4773        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4774        state.fuzzy_search("x".to_string());
4775        assert!(state.error.is_some());
4776    }
4777
4778    /// By-queries must produce results sorted by the group columns (age_group, then team)
4779    /// so that output order is deterministic and practical. Raw data is deliberately out of order.
4780    #[test]
4781    fn test_by_query_result_sorted_by_group_columns() {
4782        // Build a small table: age_group (1-5, out of order), team (Red/Blue/Green), score (0-100)
4783        let df = df!(
4784            "age_group" => &[3i64, 1, 5, 2, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
4785            "team" => &[
4786                "Red", "Blue", "Green", "Red", "Blue", "Green", "Green", "Red", "Blue",
4787                "Green", "Red", "Blue", "Red", "Blue", "Green",
4788            ],
4789            "score" => &[50.0f64, 10.0, 90.0, 20.0, 30.0, 40.0, 60.0, 70.0, 80.0, 15.0, 25.0, 35.0, 45.0, 55.0, 65.0],
4790        )
4791        .unwrap();
4792        let lf = df.lazy();
4793        let options = crate::OpenOptions::default();
4794        let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4795        state.query("select avg score by age_group, team".to_string());
4796        assert!(
4797            state.error.is_none(),
4798            "query should succeed: {:?}",
4799            state.error
4800        );
4801        let result = state.lf.collect().unwrap();
4802        // Result must be sorted by group columns (age_group, then team)
4803        let sorted = result
4804            .sort(
4805                ["age_group", "team"],
4806                SortMultipleOptions::default().with_order_descending(false),
4807            )
4808            .unwrap();
4809        assert_eq!(
4810            result, sorted,
4811            "by-query result must be sorted by (age_group, team)"
4812        );
4813    }
4814
4815    /// Computed group keys (e.g. Fare: 1+floor Fare % 25) must be sorted by their result column
4816    /// values, not by re-evaluating the expression on the result.
4817    #[test]
4818    fn test_by_query_computed_group_key_sorted_by_result_column() {
4819        let df = df!(
4820            "x" => &[7.0f64, 12.0, 3.0, 22.0, 17.0, 8.0],
4821            "v" => &[1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0],
4822        )
4823        .unwrap();
4824        let lf = df.lazy();
4825        let options = crate::OpenOptions::default();
4826        let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4827        // bucket: 1+floor(x)%3 -> values 1,2,3; raw x order 7,12,3,22,17,8 -> buckets 2,2,1,2,2,2
4828        state.query("select sum v by bucket: 1+floor x % 3".to_string());
4829        assert!(
4830            state.error.is_none(),
4831            "query should succeed: {:?}",
4832            state.error
4833        );
4834        let result = state.lf.collect().unwrap();
4835        let bucket = result.column("bucket").unwrap();
4836        // Must be sorted by bucket (1, 2, 3)
4837        for i in 1..result.height() {
4838            let prev: i64 = bucket.get(i - 1).unwrap().try_extract().unwrap_or(0);
4839            let curr: i64 = bucket.get(i).unwrap().try_extract().unwrap_or(0);
4840            assert!(
4841                curr >= prev,
4842                "bucket column must be sorted: {} then {}",
4843                prev,
4844                curr
4845            );
4846        }
4847    }
4848}