Skip to main content

datui_lib/widgets/
datatable.rs

1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10    buffer::Buffer,
11    layout::Rect,
12    style::{Color, Modifier, Style},
13    text::{Line, Span},
14    widgets::{
15        Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16    },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions};
25use polars::lazy::frame::pivot::pivot_stable;
26use std::io::{BufReader, Read};
27
28use calamine::{open_workbook_auto, Data, Reader};
29use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
30use orc_rust::ArrowReaderBuilder;
31use tempfile::NamedTempFile;
32
33use arrow::array::types::{
34    Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
35    TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
36};
37use arrow::array::{Array, AsArray};
38use arrow::record_batch::RecordBatch;
39
40fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
41    let e = col(PlSmallStr::from_static(""));
42    let expr = match agg {
43        PivotAggregation::Last => e.last(),
44        PivotAggregation::First => e.first(),
45        PivotAggregation::Min => e.min(),
46        PivotAggregation::Max => e.max(),
47        PivotAggregation::Avg => e.mean(),
48        PivotAggregation::Med => e.median(),
49        PivotAggregation::Std => e.std(1),
50        PivotAggregation::Count => e.len(),
51    };
52    Ok(expr)
53}
54
/// View-model for the data table widget: the Polars [`LazyFrame`] being browsed plus
/// all scrolling, buffering, filter/sort/query, grouping and pivot/melt state.
pub struct DataTableState {
    /// LazyFrame currently backing the view (after queries/filters/sorts/pivots).
    pub lf: LazyFrame,
    /// As-loaded LazyFrame; `reset_lf_to_original` restores `lf` and `schema` from it.
    original_lf: LazyFrame,
    df: Option<DataFrame>,        // Scrollable columns dataframe
    locked_df: Option<DataFrame>, // Locked columns dataframe
    /// ratatui table selection/offset state.
    pub table_state: TableState,
    /// Index of the first row of the current view window.
    pub start_row: usize,
    /// Row capacity of the viewport; 0 until the layout size is known.
    pub visible_rows: usize,
    /// Scroll position among the scrollable (non-locked) columns. // presumably first visible column index — confirm against render code
    pub termcol_index: usize,
    /// How many columns fit in the terminal; 0 until the layout size is known.
    pub visible_termcols: usize,
    /// Most recent Polars error, shown in the main view unless suppressed.
    pub error: Option<PolarsError>,
    pub suppress_error_display: bool, // When true, don't show errors in main view (e.g., when query input is active)
    /// Schema of `lf`; re-fetched whenever `lf` is reset to the original.
    pub schema: Arc<Schema>,
    /// Row count of the current view; only trustworthy when `num_rows_valid` is true.
    pub num_rows: usize,
    /// When true, collect() skips the len() query.
    num_rows_valid: bool,
    /// Currently applied filter statements (Filter modal).
    filters: Vec<FilterStatement>,
    /// Columns the view is sorted by, in priority order.
    sort_columns: Vec<String>,
    /// Sort direction shared by all `sort_columns`.
    sort_ascending: bool,
    /// Last executed query (Query tab); cleared on reset.
    pub active_query: String,
    /// Last executed SQL (Sql tab). Independent from active_query; only one applies to current view.
    pub active_sql_query: String,
    /// Last executed fuzzy search (Fuzzy tab). Independent from active_query/active_sql_query.
    pub active_fuzzy_query: String,
    column_order: Vec<String>,   // Order of columns for display
    locked_columns_count: usize, // Number of locked columns (from left)
    /// Group-by view, when grouping is active. // NOTE(review): semantics inferred from name — confirm in group-by code
    grouped_lf: Option<LazyFrame>,
    drilled_down_group_index: Option<usize>, // Index of the group we're viewing
    pub drilled_down_group_key: Option<Vec<String>>, // Key values of the drilled down group
    pub drilled_down_group_key_columns: Option<Vec<String>>, // Key column names of the drilled down group
    /// Pages to buffer ahead of the viewport (constructor default: 3).
    pages_lookahead: usize,
    /// Pages to buffer behind the viewport (constructor default: 3).
    pages_lookback: usize,
    max_buffered_rows: usize, // 0 = no limit
    max_buffered_mb: usize,   // 0 = no limit
    /// First absolute row held in `buffered_df`.
    buffered_start_row: usize,
    /// One-past-last absolute row held in `buffered_df`.
    buffered_end_row: usize,
    /// Full buffered DataFrame (all columns in column_order) for the current buffer range.
    /// When set, column scroll (scroll_left/scroll_right) only re-slices columns without re-collecting from LazyFrame.
    buffered_df: Option<DataFrame>,
    proximity_threshold: usize, // Will be set when visible_rows is known
    /// Whether to display a row-number column (from OpenOptions).
    row_numbers: bool,
    /// Number shown for the first row (from OpenOptions; constructor default: 1).
    row_start_index: usize,
    /// Last applied pivot spec, if current lf is result of a pivot. Used for templates.
    last_pivot_spec: Option<PivotSpec>,
    /// Last applied melt spec, if current lf is result of a melt. Used for templates.
    last_melt_spec: Option<MeltSpec>,
    /// When set, dataset was loaded with hive partitioning; partition column names for Info panel and predicate pushdown.
    pub partition_columns: Option<Vec<String>>,
    /// When set, decompressed CSV was written to this temp file; kept alive so the file exists for lazy scan.
    decompress_temp_file: Option<NamedTempFile>,
    /// When true, use Polars streaming engine for LazyFrame collect when the streaming feature is enabled.
    pub polars_streaming: bool,
    /// When true, cast Date/Datetime pivot index columns to Int32 before pivot (workaround for Polars 0.52).
    workaround_pivot_date_index: bool,
}
110
/// Inferred type for an Excel column (preserves numbers, bools, dates; avoids stringifying).
#[derive(Clone, Copy)]
enum ExcelColType {
    /// Integer column (also chosen when every float cell is a whole number).
    Int64,
    /// Floating-point column.
    Float64,
    /// Boolean column.
    Boolean,
    /// Text column; also the fallback when nothing else matches.
    Utf8,
    /// Date column: datetime-like cells whose parsed time is always midnight.
    Date,
    /// Datetime column: datetime-like cells with at least one non-midnight time.
    Datetime,
}
121
122impl DataTableState {
    /// Build state from a LazyFrame, materializing its schema up front.
    ///
    /// `pages_lookahead`/`pages_lookback` default to 3 when `None`,
    /// `max_buffered_rows` to 100_000 and `max_buffered_mb` to 512.
    /// Display options (`row_numbers`, `row_start_index`) start at their
    /// defaults and are overwritten by the `from_*` constructors.
    ///
    /// # Errors
    /// Fails if the schema cannot be collected from `lf`.
    pub fn new(
        lf: LazyFrame,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        polars_streaming: bool,
    ) -> Result<Self> {
        let schema = lf.clone().collect_schema()?;
        // Initial display order is simply the schema order.
        let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: pages_lookahead.unwrap_or(3),
            pages_lookback: pages_lookback.unwrap_or(3),
            max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0, // Will be set when visible_rows is known
            row_numbers: false,     // Will be set from options
            row_start_index: 1,     // Will be set from options
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns: None,
            decompress_temp_file: None,
            polars_streaming,
            workaround_pivot_date_index: true,
        })
    }
178
179    /// Create state from an existing LazyFrame (e.g. from Python or in-memory). Uses OpenOptions for display/buffer settings.
180    pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
181        let mut state = Self::new(
182            lf,
183            options.pages_lookahead,
184            options.pages_lookback,
185            options.max_buffered_rows,
186            options.max_buffered_mb,
187            options.polars_streaming,
188        )?;
189        state.row_numbers = options.row_numbers;
190        state.row_start_index = options.row_start_index;
191        state.workaround_pivot_date_index = options.workaround_pivot_date_index;
192        Ok(state)
193    }
194
    /// Create state from a pre-collected schema and LazyFrame (for phased loading). Does not call collect_schema();
    /// df is None so the UI can render headers while the first collect() runs.
    /// When `partition_columns` is Some (e.g. hive), column order is partition cols first.
    ///
    /// Never actually fails; `Result` is kept for symmetry with the other constructors.
    pub fn from_schema_and_lazyframe(
        schema: Arc<Schema>,
        lf: LazyFrame,
        options: &crate::OpenOptions,
        partition_columns: Option<Vec<String>>,
    ) -> Result<Self> {
        // Display order: partition columns first (in their given order), then the
        // remaining schema columns in schema order; plain schema order otherwise.
        let column_order: Vec<String> = if let Some(ref part) = partition_columns {
            let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
            let rest: Vec<String> = schema
                .iter_names()
                .map(|s| s.to_string())
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            part.iter().cloned().chain(rest).collect()
        } else {
            schema.iter_names().map(|s| s.to_string()).collect()
        };
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: options.pages_lookahead.unwrap_or(3),
            pages_lookback: options.pages_lookback.unwrap_or(3),
            max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: options.row_numbers,
            row_start_index: options.row_start_index,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns,
            decompress_temp_file: None,
            polars_streaming: options.polars_streaming,
            workaround_pivot_date_index: options.workaround_pivot_date_index,
        })
    }
260
    /// Reset LazyFrame and view state to original_lf. Schema is re-fetched so it matches
    /// after a previous query/SQL that may have changed columns. Caller should call
    /// collect() afterward if display update is needed (reset/query/fuzzy do; sql_query
    /// relies on event loop Collect).
    fn reset_lf_to_original(&mut self) {
        self.invalidate_num_rows();
        self.lf = self.original_lf.clone();
        // Re-derive the schema from the original frame; on failure fall back to an
        // empty schema rather than keeping a stale one from the previous view.
        self.schema = self
            .original_lf
            .clone()
            .collect_schema()
            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Clear every view-mutating setting: queries, filters, sorts, column locking.
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.locked_columns_count = 0;
        self.filters.clear();
        self.sort_columns.clear();
        self.sort_ascending = true;
        // Scroll back to the top-left corner of the table.
        self.start_row = 0;
        self.termcol_index = 0;
        // Drop any group-by / drill-down context.
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        // Invalidate the row buffer so the next collect() refills it.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        // Put the selection back on the first row.
        self.table_state.select(Some(0));
    }
292
293    pub fn reset(&mut self) {
294        self.reset_lf_to_original();
295        self.error = None;
296        self.suppress_error_display = false;
297        self.last_pivot_spec = None;
298        self.last_melt_spec = None;
299        self.collect();
300        if self.num_rows > 0 {
301            self.start_row = 0;
302        }
303    }
304
305    pub fn from_parquet(
306        path: &Path,
307        pages_lookahead: Option<usize>,
308        pages_lookback: Option<usize>,
309        max_buffered_rows: Option<usize>,
310        max_buffered_mb: Option<usize>,
311        row_numbers: bool,
312        row_start_index: usize,
313    ) -> Result<Self> {
314        let path_str = path.as_os_str().to_string_lossy();
315        let is_glob = path_str.contains('*');
316        let pl_path = PlPath::Local(Arc::from(path));
317        let args = ScanArgsParquet {
318            glob: is_glob,
319            ..Default::default()
320        };
321        let lf = LazyFrame::scan_parquet(pl_path, args)?;
322        let mut state = Self::new(
323            lf,
324            pages_lookahead,
325            pages_lookback,
326            max_buffered_rows,
327            max_buffered_mb,
328            true,
329        )?;
330        state.row_numbers = row_numbers;
331        state.row_start_index = row_start_index;
332        Ok(state)
333    }
334
335    /// Load multiple Parquet files and concatenate them into one LazyFrame (same schema assumed).
336    pub fn from_parquet_paths(
337        paths: &[impl AsRef<Path>],
338        pages_lookahead: Option<usize>,
339        pages_lookback: Option<usize>,
340        max_buffered_rows: Option<usize>,
341        max_buffered_mb: Option<usize>,
342        row_numbers: bool,
343        row_start_index: usize,
344    ) -> Result<Self> {
345        if paths.is_empty() {
346            return Err(color_eyre::eyre::eyre!("No paths provided"));
347        }
348        if paths.len() == 1 {
349            return Self::from_parquet(
350                paths[0].as_ref(),
351                pages_lookahead,
352                pages_lookback,
353                max_buffered_rows,
354                max_buffered_mb,
355                row_numbers,
356                row_start_index,
357            );
358        }
359        let mut lazy_frames = Vec::with_capacity(paths.len());
360        for p in paths {
361            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
362            let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
363            lazy_frames.push(lf);
364        }
365        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
366        let mut state = Self::new(
367            lf,
368            pages_lookahead,
369            pages_lookback,
370            max_buffered_rows,
371            max_buffered_mb,
372            true,
373        )?;
374        state.row_numbers = row_numbers;
375        state.row_start_index = row_start_index;
376        Ok(state)
377    }
378
379    /// Load a single Arrow IPC / Feather v2 file (lazy).
380    pub fn from_ipc(
381        path: &Path,
382        pages_lookahead: Option<usize>,
383        pages_lookback: Option<usize>,
384        max_buffered_rows: Option<usize>,
385        max_buffered_mb: Option<usize>,
386        row_numbers: bool,
387        row_start_index: usize,
388    ) -> Result<Self> {
389        let pl_path = PlPath::Local(Arc::from(path));
390        let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
391        let mut state = Self::new(
392            lf,
393            pages_lookahead,
394            pages_lookback,
395            max_buffered_rows,
396            max_buffered_mb,
397            true,
398        )?;
399        state.row_numbers = row_numbers;
400        state.row_start_index = row_start_index;
401        Ok(state)
402    }
403
404    /// Load multiple Arrow IPC / Feather files and concatenate into one LazyFrame.
405    pub fn from_ipc_paths(
406        paths: &[impl AsRef<Path>],
407        pages_lookahead: Option<usize>,
408        pages_lookback: Option<usize>,
409        max_buffered_rows: Option<usize>,
410        max_buffered_mb: Option<usize>,
411        row_numbers: bool,
412        row_start_index: usize,
413    ) -> Result<Self> {
414        if paths.is_empty() {
415            return Err(color_eyre::eyre::eyre!("No paths provided"));
416        }
417        if paths.len() == 1 {
418            return Self::from_ipc(
419                paths[0].as_ref(),
420                pages_lookahead,
421                pages_lookback,
422                max_buffered_rows,
423                max_buffered_mb,
424                row_numbers,
425                row_start_index,
426            );
427        }
428        let mut lazy_frames = Vec::with_capacity(paths.len());
429        for p in paths {
430            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
431            let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
432            lazy_frames.push(lf);
433        }
434        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
435        let mut state = Self::new(
436            lf,
437            pages_lookahead,
438            pages_lookback,
439            max_buffered_rows,
440            max_buffered_mb,
441            true,
442        )?;
443        state.row_numbers = row_numbers;
444        state.row_start_index = row_start_index;
445        Ok(state)
446    }
447
448    /// Load a single Avro file (eager read, then lazy).
449    pub fn from_avro(
450        path: &Path,
451        pages_lookahead: Option<usize>,
452        pages_lookback: Option<usize>,
453        max_buffered_rows: Option<usize>,
454        max_buffered_mb: Option<usize>,
455        row_numbers: bool,
456        row_start_index: usize,
457    ) -> Result<Self> {
458        let file = File::open(path)?;
459        let df = polars::io::avro::AvroReader::new(file).finish()?;
460        let lf = df.lazy();
461        let mut state = Self::new(
462            lf,
463            pages_lookahead,
464            pages_lookback,
465            max_buffered_rows,
466            max_buffered_mb,
467            true,
468        )?;
469        state.row_numbers = row_numbers;
470        state.row_start_index = row_start_index;
471        Ok(state)
472    }
473
474    /// Load multiple Avro files and concatenate into one LazyFrame.
475    pub fn from_avro_paths(
476        paths: &[impl AsRef<Path>],
477        pages_lookahead: Option<usize>,
478        pages_lookback: Option<usize>,
479        max_buffered_rows: Option<usize>,
480        max_buffered_mb: Option<usize>,
481        row_numbers: bool,
482        row_start_index: usize,
483    ) -> Result<Self> {
484        if paths.is_empty() {
485            return Err(color_eyre::eyre::eyre!("No paths provided"));
486        }
487        if paths.len() == 1 {
488            return Self::from_avro(
489                paths[0].as_ref(),
490                pages_lookahead,
491                pages_lookback,
492                max_buffered_rows,
493                max_buffered_mb,
494                row_numbers,
495                row_start_index,
496            );
497        }
498        let mut lazy_frames = Vec::with_capacity(paths.len());
499        for p in paths {
500            let file = File::open(p.as_ref())?;
501            let df = polars::io::avro::AvroReader::new(file).finish()?;
502            lazy_frames.push(df.lazy());
503        }
504        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
505        let mut state = Self::new(
506            lf,
507            pages_lookahead,
508            pages_lookback,
509            max_buffered_rows,
510            max_buffered_mb,
511            true,
512        )?;
513        state.row_numbers = row_numbers;
514        state.row_start_index = row_start_index;
515        Ok(state)
516    }
517
    /// Load a single Excel file (xls, xlsx, xlsm, xlsb) using calamine (eager read, then lazy).
    /// Sheet is selected by 0-based index or name via `excel_sheet`.
    #[allow(clippy::too_many_arguments)]
    pub fn from_excel(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        excel_sheet: Option<&str>,
    ) -> Result<Self> {
        let mut workbook =
            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
        let sheet_names = workbook.sheet_names().to_vec();
        if sheet_names.is_empty() {
            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
        }
        // Sheet selection: a selector that parses as usize is treated as a 0-based
        // index; otherwise it is looked up as a sheet name. Default: first sheet.
        // NOTE(review): a sheet literally named "2" is therefore unreachable by name.
        let range = if let Some(sheet_sel) = excel_sheet {
            if let Ok(idx) = sheet_sel.parse::<usize>() {
                workbook
                    .worksheet_range_at(idx)
                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            } else {
                workbook
                    .worksheet_range(sheet_sel)
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            }
        } else {
            workbook
                .worksheet_range_at(0)
                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
        };
        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
        // An empty sheet yields an empty (zero-column) DataFrame, not an error.
        if rows.is_empty() {
            let empty_df = DataFrame::new(vec![])?;
            let mut state = Self::new(
                empty_df.lazy(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                true,
            )?;
            state.row_numbers = row_numbers;
            state.row_start_index = row_start_index;
            return Ok(state);
        }
        // The first row supplies the headers; non-string cells are stringified.
        let headers: Vec<String> = rows[0]
            .iter()
            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
            .collect();
        let n_cols = headers.len();
        let mut series_vec = Vec::with_capacity(n_cols);
        for (col_idx, header) in headers.iter().enumerate() {
            // Gather this column's cells from all data rows; short rows yield None.
            let col_cells: Vec<Option<&Data>> =
                rows[1..].iter().map(|row| row.get(col_idx)).collect();
            let inferred = Self::excel_infer_column_type(&col_cells);
            // Empty headers get a synthetic 1-based name ("column_1", ...).
            let name = if header.is_empty() {
                format!("column_{}", col_idx + 1)
            } else {
                header.clone()
            };
            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
            series_vec.push(series.into());
        }
        let df = DataFrame::new(series_vec)?;
        let mut state = Self::new(
            df.lazy(),
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
600
    /// Infers column type: prefers Int64 for whole-number floats; infers Date/Datetime for
    /// calamine DateTime/DateTimeIso or for string columns that parse as ISO date/datetime.
    ///
    /// When cell kinds are mixed, the priority is: String > Int > Datetime > Float > Bool,
    /// with Utf8 as the final fallback (e.g. an all-empty column).
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        for cell in cells.iter().flatten() {
            // Any string cell short-circuits the scan: the column becomes text,
            // or — checked below — a column of date-like strings.
            if CalamineTrait::is_string(*cell) {
                has_string = true;
                break;
            }
            // Datetime cells also set has_float, so a mixed float/datetime column
            // ends up numeric rather than falling through to Utf8.
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // A string column is treated as temporal only when at least one cell
            // parses as a datetime AND every non-empty cell parses.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                // All-midnight timestamps collapse to a plain Date column.
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            ExcelColType::Int64
        } else if has_datetime {
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // Floats that are all whole (within 1e-10) are narrowed to Int64.
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            ExcelColType::Utf8
        }
    }
672
673    /// True if every cell that parses as datetime has time 00:00:00.
674    fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
675        let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
676        cells
677            .iter()
678            .flatten()
679            .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
680            .all(|dt| dt.time() == midnight)
681    }
682
683    /// Converts a calamine cell to NaiveDateTime (Excel serial, DateTimeIso, or parseable string).
684    fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
685        use calamine::DataType;
686        if let Some(dt) = cell.as_datetime() {
687            return Some(dt);
688        }
689        let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
690        Self::parse_naive_datetime_str(s)
691    }
692
693    /// Parses an ISO-style date/datetime string; tries FORMATS in order.
694    fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
695        let s = s.trim();
696        if s.is_empty() {
697            return None;
698        }
699        const FORMATS: &[&str] = &[
700            "%Y-%m-%dT%H:%M:%S%.f",
701            "%Y-%m-%dT%H:%M:%S",
702            "%Y-%m-%d %H:%M:%S%.f",
703            "%Y-%m-%d %H:%M:%S",
704            "%Y-%m-%d",
705        ];
706        for fmt in FORMATS {
707            if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
708                return Some(dt);
709            }
710        }
711        if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
712            return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
713        }
714        None
715    }
716
    /// Build a Polars Series from a column of calamine cells using the inferred type.
    ///
    /// Missing cells and cells that fail the target conversion both become nulls.
    ///
    /// # Errors
    /// Fails only if the Date/Datetime cast is rejected by Polars.
    fn excel_column_to_series(
        name: &str,
        cells: &[Option<&Data>],
        col_type: ExcelColType,
    ) -> Result<Series> {
        use calamine::DataType as CalamineTrait;
        use polars::datatypes::TimeUnit;
        let series = match col_type {
            ExcelColType::Int64 => {
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_i64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Float64 => {
                let v: Vec<Option<f64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_f64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Boolean => {
                let v: Vec<Option<bool>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.get_bool()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Utf8 => {
                let v: Vec<Option<String>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_string()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Date => {
                // Dates are encoded as days since 1970-01-01 (i32), then cast to Date.
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
                let v: Vec<Option<i32>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| (dt.date() - epoch).num_days() as i32)
                    })
                    .collect();
                Series::new(name.into(), v).cast(&DataType::Date)?
            }
            ExcelColType::Datetime => {
                // Datetimes are encoded as microseconds since the Unix epoch (treated
                // as UTC instants), then cast to a timezone-naive Datetime(us).
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| dt.and_utc().timestamp_micros())
                    })
                    .collect();
                Series::new(name.into(), v)
                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
            }
        };
        Ok(series)
    }
779
780    /// Load a single ORC file (eager read via orc-rust → Arrow, then convert to Polars, then lazy).
781    /// ORC is read fully into memory; see loading-data docs for large-file notes.
782    pub fn from_orc(
783        path: &Path,
784        pages_lookahead: Option<usize>,
785        pages_lookback: Option<usize>,
786        max_buffered_rows: Option<usize>,
787        max_buffered_mb: Option<usize>,
788        row_numbers: bool,
789        row_start_index: usize,
790    ) -> Result<Self> {
791        let file = File::open(path)?;
792        let reader = ArrowReaderBuilder::try_new(file)
793            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
794            .build();
795        let batches: Vec<RecordBatch> = reader
796            .collect::<std::result::Result<Vec<_>, _>>()
797            .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
798        let df = Self::arrow_record_batches_to_dataframe(&batches)?;
799        let lf = df.lazy();
800        let mut state = Self::new(
801            lf,
802            pages_lookahead,
803            pages_lookback,
804            max_buffered_rows,
805            max_buffered_mb,
806            true,
807        )?;
808        state.row_numbers = row_numbers;
809        state.row_start_index = row_start_index;
810        Ok(state)
811    }
812
813    /// Load multiple ORC files and concatenate into one LazyFrame.
814    pub fn from_orc_paths(
815        paths: &[impl AsRef<Path>],
816        pages_lookahead: Option<usize>,
817        pages_lookback: Option<usize>,
818        max_buffered_rows: Option<usize>,
819        max_buffered_mb: Option<usize>,
820        row_numbers: bool,
821        row_start_index: usize,
822    ) -> Result<Self> {
823        if paths.is_empty() {
824            return Err(color_eyre::eyre::eyre!("No paths provided"));
825        }
826        if paths.len() == 1 {
827            return Self::from_orc(
828                paths[0].as_ref(),
829                pages_lookahead,
830                pages_lookback,
831                max_buffered_rows,
832                max_buffered_mb,
833                row_numbers,
834                row_start_index,
835            );
836        }
837        let mut lazy_frames = Vec::with_capacity(paths.len());
838        for p in paths {
839            let file = File::open(p.as_ref())?;
840            let reader = ArrowReaderBuilder::try_new(file)
841                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
842                .build();
843            let batches: Vec<RecordBatch> = reader
844                .collect::<std::result::Result<Vec<_>, _>>()
845                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
846            let df = Self::arrow_record_batches_to_dataframe(&batches)?;
847            lazy_frames.push(df.lazy());
848        }
849        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
850        let mut state = Self::new(
851            lf,
852            pages_lookahead,
853            pages_lookback,
854            max_buffered_rows,
855            max_buffered_mb,
856            true,
857        )?;
858        state.row_numbers = row_numbers;
859        state.row_start_index = row_start_index;
860        Ok(state)
861    }
862
863    /// Convert Arrow (arrow crate 57) RecordBatches to Polars DataFrame by value (ORC uses
864    /// arrow 57; Polars uses polars-arrow, so we cannot use Series::from_arrow).
865    fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
866        if batches.is_empty() {
867            return Ok(DataFrame::new(vec![])?);
868        }
869        let mut all_dfs = Vec::with_capacity(batches.len());
870        for batch in batches {
871            let n_cols = batch.num_columns();
872            let schema = batch.schema();
873            let mut series_vec = Vec::with_capacity(n_cols);
874            for (i, col) in batch.columns().iter().enumerate() {
875                let name = schema.field(i).name().as_str();
876                let s = Self::arrow_array_to_polars_series(name, col)?;
877                series_vec.push(s.into());
878            }
879            let df = DataFrame::new(series_vec)?;
880            all_dfs.push(df);
881        }
882        let mut out = all_dfs.remove(0);
883        for df in all_dfs {
884            out = out.vstack(&df)?;
885        }
886        Ok(out)
887    }
888
889    fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
890        use arrow::datatypes::DataType as ArrowDataType;
891        let len = array.len();
892        match array.data_type() {
893            ArrowDataType::Int8 => {
894                let a = array
895                    .as_primitive_opt::<Int8Type>()
896                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
897                let v: Vec<Option<i8>> = (0..len)
898                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
899                    .collect();
900                Ok(Series::new(name.into(), v))
901            }
902            ArrowDataType::Int16 => {
903                let a = array
904                    .as_primitive_opt::<Int16Type>()
905                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
906                let v: Vec<Option<i16>> = (0..len)
907                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
908                    .collect();
909                Ok(Series::new(name.into(), v))
910            }
911            ArrowDataType::Int32 => {
912                let a = array
913                    .as_primitive_opt::<Int32Type>()
914                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
915                let v: Vec<Option<i32>> = (0..len)
916                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
917                    .collect();
918                Ok(Series::new(name.into(), v))
919            }
920            ArrowDataType::Int64 => {
921                let a = array
922                    .as_primitive_opt::<Int64Type>()
923                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
924                let v: Vec<Option<i64>> = (0..len)
925                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
926                    .collect();
927                Ok(Series::new(name.into(), v))
928            }
929            ArrowDataType::UInt8 => {
930                let a = array
931                    .as_primitive_opt::<UInt8Type>()
932                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
933                let v: Vec<Option<i64>> = (0..len)
934                    .map(|i| {
935                        if a.is_null(i) {
936                            None
937                        } else {
938                            Some(a.value(i) as i64)
939                        }
940                    })
941                    .collect();
942                Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
943            }
944            ArrowDataType::UInt16 => {
945                let a = array
946                    .as_primitive_opt::<UInt16Type>()
947                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
948                let v: Vec<Option<i64>> = (0..len)
949                    .map(|i| {
950                        if a.is_null(i) {
951                            None
952                        } else {
953                            Some(a.value(i) as i64)
954                        }
955                    })
956                    .collect();
957                Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
958            }
959            ArrowDataType::UInt32 => {
960                let a = array
961                    .as_primitive_opt::<UInt32Type>()
962                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
963                let v: Vec<Option<u32>> = (0..len)
964                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
965                    .collect();
966                Ok(Series::new(name.into(), v))
967            }
968            ArrowDataType::UInt64 => {
969                let a = array
970                    .as_primitive_opt::<UInt64Type>()
971                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
972                let v: Vec<Option<u64>> = (0..len)
973                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
974                    .collect();
975                Ok(Series::new(name.into(), v))
976            }
977            ArrowDataType::Float32 => {
978                let a = array
979                    .as_primitive_opt::<Float32Type>()
980                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
981                let v: Vec<Option<f32>> = (0..len)
982                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
983                    .collect();
984                Ok(Series::new(name.into(), v))
985            }
986            ArrowDataType::Float64 => {
987                let a = array
988                    .as_primitive_opt::<Float64Type>()
989                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
990                let v: Vec<Option<f64>> = (0..len)
991                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
992                    .collect();
993                Ok(Series::new(name.into(), v))
994            }
995            ArrowDataType::Boolean => {
996                let a = array
997                    .as_boolean_opt()
998                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
999                let v: Vec<Option<bool>> = (0..len)
1000                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1001                    .collect();
1002                Ok(Series::new(name.into(), v))
1003            }
1004            ArrowDataType::Utf8 => {
1005                let a = array
1006                    .as_string_opt::<i32>()
1007                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
1008                let v: Vec<Option<String>> = (0..len)
1009                    .map(|i| {
1010                        if a.is_null(i) {
1011                            None
1012                        } else {
1013                            Some(a.value(i).to_string())
1014                        }
1015                    })
1016                    .collect();
1017                Ok(Series::new(name.into(), v))
1018            }
1019            ArrowDataType::LargeUtf8 => {
1020                let a = array
1021                    .as_string_opt::<i64>()
1022                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
1023                let v: Vec<Option<String>> = (0..len)
1024                    .map(|i| {
1025                        if a.is_null(i) {
1026                            None
1027                        } else {
1028                            Some(a.value(i).to_string())
1029                        }
1030                    })
1031                    .collect();
1032                Ok(Series::new(name.into(), v))
1033            }
1034            ArrowDataType::Date32 => {
1035                let a = array
1036                    .as_primitive_opt::<Date32Type>()
1037                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
1038                let v: Vec<Option<i32>> = (0..len)
1039                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1040                    .collect();
1041                Ok(Series::new(name.into(), v))
1042            }
1043            ArrowDataType::Date64 => {
1044                let a = array
1045                    .as_primitive_opt::<Date64Type>()
1046                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
1047                let v: Vec<Option<i64>> = (0..len)
1048                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1049                    .collect();
1050                Ok(Series::new(name.into(), v))
1051            }
1052            ArrowDataType::Timestamp(_, _) => {
1053                let a = array
1054                    .as_primitive_opt::<TimestampMillisecondType>()
1055                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
1056                let v: Vec<Option<i64>> = (0..len)
1057                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1058                    .collect();
1059                Ok(Series::new(name.into(), v))
1060            }
1061            other => Err(color_eyre::eyre::eyre!(
1062                "ORC: unsupported column type {:?} for column '{}'",
1063                other,
1064                name
1065            )),
1066        }
1067    }
1068
1069    /// Build a LazyFrame for hive-partitioned Parquet only (no schema collection, no partition discovery).
1070    /// Use this for phased loading so "Scanning input" is instant; schema and partition handling happen in DoLoadSchema.
1071    pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1072        let path_str = path.as_os_str().to_string_lossy();
1073        let is_glob = path_str.contains('*');
1074        let pl_path = PlPath::Local(Arc::from(path));
1075        let args = ScanArgsParquet {
1076            hive_options: HiveOptions::new_enabled(),
1077            glob: is_glob,
1078            ..Default::default()
1079        };
1080        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1081    }
1082
1083    /// Build a LazyFrame for hive-partitioned Parquet with a pre-computed schema (avoids slow collect_schema across all files).
1084    pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1085        let path_str = path.as_os_str().to_string_lossy();
1086        let is_glob = path_str.contains('*');
1087        let pl_path = PlPath::Local(Arc::from(path));
1088        let args = ScanArgsParquet {
1089            schema: Some(schema),
1090            hive_options: HiveOptions::new_enabled(),
1091            glob: is_glob,
1092            ..Default::default()
1093        };
1094        LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1095    }
1096
1097    /// Find the first parquet file along a single spine of a hive-partitioned directory (same walk as partition discovery).
1098    /// Returns `None` if the directory is empty or has no parquet files along that spine.
1099    fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
1100        const MAX_DEPTH: usize = 64;
1101        Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
1102    }
1103
1104    fn first_parquet_file_spine(
1105        path: &Path,
1106        depth: usize,
1107        max_depth: usize,
1108    ) -> Option<std::path::PathBuf> {
1109        if depth >= max_depth {
1110            return None;
1111        }
1112        let entries = fs::read_dir(path).ok()?;
1113        let mut first_partition_child: Option<std::path::PathBuf> = None;
1114        for entry in entries.flatten() {
1115            let child = entry.path();
1116            if child.is_file() {
1117                if child
1118                    .extension()
1119                    .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
1120                {
1121                    return Some(child);
1122                }
1123            } else if child.is_dir() {
1124                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1125                    if name.contains('=') && first_partition_child.is_none() {
1126                        first_partition_child = Some(child);
1127                    }
1128                }
1129            }
1130        }
1131        first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
1132    }
1133
1134    /// Read schema from a single parquet file (metadata only, no data scan). Used to avoid collect_schema() over many files.
1135    fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1136        let file = File::open(path)?;
1137        let mut reader = ParquetReader::new(file);
1138        let arrow_schema = reader.schema()?;
1139        let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1140        Ok(Arc::new(schema))
1141    }
1142
1143    /// Infer schema from one parquet file in a hive directory and merge with partition columns (Utf8).
1144    /// Returns (merged_schema, partition_columns). Use with scan_parquet_hive_with_schema to avoid slow collect_schema().
1145    /// Only supported when path is a directory (not a glob). Returns Err if no parquet file found or read fails.
1146    pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1147        let partition_columns = Self::discover_hive_partition_columns(path);
1148        let one_file = Self::first_parquet_file_in_hive_dir(path)
1149            .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1150        let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1151        let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1152        let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1153        for name in &partition_columns {
1154            merged.with_column(name.clone().into(), DataType::String);
1155        }
1156        for (name, dtype) in file_schema.iter() {
1157            if !part_set.contains(name.as_str()) {
1158                merged.with_column(name.clone(), dtype.clone());
1159            }
1160        }
1161        Ok((Arc::new(merged), partition_columns))
1162    }
1163
1164    /// Discover hive partition column names (public for phased loading). Directory: single-spine walk; glob: parse pattern.
1165    pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1166        if path.is_dir() {
1167            Self::discover_partition_columns_from_path(path)
1168        } else {
1169            Self::discover_partition_columns_from_glob_pattern(path)
1170        }
1171    }
1172
1173    /// Discover hive partition column names from a directory path by walking a single
1174    /// "spine" (one branch) of key=value directories. Partition keys are uniform across
1175    /// the tree, so we only need one path to infer [year, month, day] etc. Returns columns
1176    /// in path order. Stops after max_depth levels to avoid runaway on malformed trees.
1177    fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1178        const MAX_PARTITION_DEPTH: usize = 64;
1179        let mut columns = Vec::<String>::new();
1180        let mut seen = HashSet::<String>::new();
1181        Self::discover_partition_columns_spine(
1182            path,
1183            &mut columns,
1184            &mut seen,
1185            0,
1186            MAX_PARTITION_DEPTH,
1187        );
1188        columns
1189    }
1190
1191    /// Walk one branch: at this directory, find the first child that is a key=value dir,
1192    /// record the key (if not already seen), then recurse into that one child only.
1193    /// This does O(depth) read_dir calls instead of walking the entire tree.
1194    fn discover_partition_columns_spine(
1195        path: &Path,
1196        columns: &mut Vec<String>,
1197        seen: &mut HashSet<String>,
1198        depth: usize,
1199        max_depth: usize,
1200    ) {
1201        if depth >= max_depth {
1202            return;
1203        }
1204        let Ok(entries) = fs::read_dir(path) else {
1205            return;
1206        };
1207        let mut first_partition_child: Option<std::path::PathBuf> = None;
1208        for entry in entries.flatten() {
1209            let child = entry.path();
1210            if child.is_dir() {
1211                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1212                    if let Some((key, _)) = name.split_once('=') {
1213                        if !key.is_empty() && seen.insert(key.to_string()) {
1214                            columns.push(key.to_string());
1215                        }
1216                        if first_partition_child.is_none() {
1217                            first_partition_child = Some(child);
1218                        }
1219                        break;
1220                    }
1221                }
1222            }
1223        }
1224        if let Some(one) = first_partition_child {
1225            Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1226        }
1227    }
1228
1229    /// Infer partition column names from a glob pattern path (e.g. "data/year=*/month=*/*.parquet").
1230    fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1231        let path_str = path.as_os_str().to_string_lossy();
1232        let mut columns = Vec::<String>::new();
1233        let mut seen = HashSet::<String>::new();
1234        for segment in path_str.split('/') {
1235            if let Some((key, rest)) = segment.split_once('=') {
1236                if !key.is_empty()
1237                    && (rest == "*" || !rest.contains('*'))
1238                    && seen.insert(key.to_string())
1239                {
1240                    columns.push(key.to_string());
1241                }
1242            }
1243        }
1244        columns
1245    }
1246
1247    /// Load Parquet with Hive partitioning from a directory or glob path.
1248    /// When path is a directory, partition columns are discovered from path structure.
1249    /// When path contains glob (e.g. `**/*.parquet`), partition columns are inferred from the pattern (e.g. `year=*/month=*`).
1250    /// Partition columns are moved to the left in the initial LazyFrame before state is created.
1251    ///
1252    /// **Performance**: The slow part is Polars, not our code. `scan_parquet` + `collect_schema()` trigger
1253    /// path expansion (full directory tree or glob) and parquet metadata reads; we only do a single-spine
1254    /// walk for partition key discovery and cheap schema/select work.
1255    pub fn from_parquet_hive(
1256        path: &Path,
1257        pages_lookahead: Option<usize>,
1258        pages_lookback: Option<usize>,
1259        max_buffered_rows: Option<usize>,
1260        max_buffered_mb: Option<usize>,
1261        row_numbers: bool,
1262        row_start_index: usize,
1263    ) -> Result<Self> {
1264        let path_str = path.as_os_str().to_string_lossy();
1265        let is_glob = path_str.contains('*');
1266        let pl_path = PlPath::Local(Arc::from(path));
1267        let args = ScanArgsParquet {
1268            hive_options: HiveOptions::new_enabled(),
1269            glob: is_glob,
1270            ..Default::default()
1271        };
1272        let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
1273        let schema = lf.collect_schema()?;
1274
1275        let mut discovered = if path.is_dir() {
1276            Self::discover_partition_columns_from_path(path)
1277        } else {
1278            Self::discover_partition_columns_from_glob_pattern(path)
1279        };
1280
1281        // Fallback: glob like "**/*.parquet" has no key= in the pattern, so discovery is empty.
1282        // Try discovering from a directory prefix (e.g. path.parent() or walk up until we find a dir).
1283        if discovered.is_empty() {
1284            let mut dir = path;
1285            while !dir.is_dir() {
1286                match dir.parent() {
1287                    Some(p) => dir = p,
1288                    None => break,
1289                }
1290            }
1291            if dir.is_dir() {
1292                discovered = Self::discover_partition_columns_from_path(dir);
1293            }
1294        }
1295
1296        let partition_columns: Vec<String> = discovered
1297            .into_iter()
1298            .filter(|c| schema.contains(c.as_str()))
1299            .collect();
1300
1301        let new_order: Vec<String> = if partition_columns.is_empty() {
1302            schema.iter_names().map(|s| s.to_string()).collect()
1303        } else {
1304            let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1305            let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
1306            let rest: Vec<String> = all_names
1307                .into_iter()
1308                .filter(|c| !part_set.contains(c.as_str()))
1309                .collect();
1310            partition_columns.iter().cloned().chain(rest).collect()
1311        };
1312
1313        if !partition_columns.is_empty() {
1314            let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
1315            lf = lf.select(exprs);
1316        }
1317
1318        let mut state = Self::new(
1319            lf,
1320            pages_lookahead,
1321            pages_lookback,
1322            max_buffered_rows,
1323            max_buffered_mb,
1324            true,
1325        )?;
1326        state.row_numbers = row_numbers;
1327        state.row_start_index = row_start_index;
1328        state.partition_columns = if partition_columns.is_empty() {
1329            None
1330        } else {
1331            Some(partition_columns)
1332        };
1333        // Ensure display order is partition-first (Self::new uses schema order; be explicit).
1334        state.set_column_order(new_order);
1335        Ok(state)
1336    }
1337
    /// Enable or disable the row-number column explicitly.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }

    /// Flip row-number display on/off.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }

    /// Row number display start (0 or 1); used by go-to-line to interpret user input.
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1350
1351    /// Decompress a compressed file to a temp file for lazy CSV scan.
1352    fn decompress_compressed_csv_to_temp(
1353        path: &Path,
1354        compression: CompressionFormat,
1355        temp_dir: &Path,
1356    ) -> Result<NamedTempFile> {
1357        let mut temp = NamedTempFile::new_in(temp_dir)?;
1358        let out = temp.as_file_mut();
1359        let mut reader: Box<dyn Read> = match compression {
1360            CompressionFormat::Gzip => {
1361                let f = File::open(path)?;
1362                Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1363            }
1364            CompressionFormat::Zstd => {
1365                let f = File::open(path)?;
1366                Box::new(zstd::Decoder::new(BufReader::new(f))?)
1367            }
1368            CompressionFormat::Bzip2 => {
1369                let f = File::open(path)?;
1370                Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1371            }
1372            CompressionFormat::Xz => {
1373                let f = File::open(path)?;
1374                Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1375            }
1376        };
1377        std::io::copy(&mut reader, out)?;
1378        out.sync_all()?;
1379        Ok(temp)
1380    }
1381
1382    pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1383        // Determine compression format: explicit option, or auto-detect from extension
1384        let compression = options
1385            .compression
1386            .or_else(|| CompressionFormat::from_extension(path));
1387
1388        if let Some(compression) = compression {
1389            if options.decompress_in_memory {
1390                // Eager read: decompress into memory, then CSV read
1391                match compression {
1392                    CompressionFormat::Gzip | CompressionFormat::Zstd => {
1393                        let mut read_options = CsvReadOptions::default();
1394                        if let Some(skip_lines) = options.skip_lines {
1395                            read_options.skip_lines = skip_lines;
1396                        }
1397                        if let Some(skip_rows) = options.skip_rows {
1398                            read_options.skip_rows = skip_rows;
1399                        }
1400                        if let Some(has_header) = options.has_header {
1401                            read_options.has_header = has_header;
1402                        }
1403                        read_options = read_options.map_parse_options(|opts| {
1404                            opts.with_try_parse_dates(options.parse_dates)
1405                        });
1406                        let df = read_options
1407                            .try_into_reader_with_file_path(Some(path.into()))?
1408                            .finish()?;
1409                        let lf = df.lazy();
1410                        let mut state = Self::new(
1411                            lf,
1412                            options.pages_lookahead,
1413                            options.pages_lookback,
1414                            options.max_buffered_rows,
1415                            options.max_buffered_mb,
1416                            options.polars_streaming,
1417                        )?;
1418                        state.row_numbers = options.row_numbers;
1419                        state.row_start_index = options.row_start_index;
1420                        Ok(state)
1421                    }
1422                    CompressionFormat::Bzip2 => {
1423                        let file = File::open(path)?;
1424                        let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1425                        let mut decompressed = Vec::new();
1426                        decoder.read_to_end(&mut decompressed)?;
1427                        let mut read_options = CsvReadOptions::default();
1428                        if let Some(skip_lines) = options.skip_lines {
1429                            read_options.skip_lines = skip_lines;
1430                        }
1431                        if let Some(skip_rows) = options.skip_rows {
1432                            read_options.skip_rows = skip_rows;
1433                        }
1434                        if let Some(has_header) = options.has_header {
1435                            read_options.has_header = has_header;
1436                        }
1437                        read_options = read_options.map_parse_options(|opts| {
1438                            opts.with_try_parse_dates(options.parse_dates)
1439                        });
1440                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1441                            .with_options(read_options)
1442                            .finish()?;
1443                        let lf = df.lazy();
1444                        let mut state = Self::new(
1445                            lf,
1446                            options.pages_lookahead,
1447                            options.pages_lookback,
1448                            options.max_buffered_rows,
1449                            options.max_buffered_mb,
1450                            options.polars_streaming,
1451                        )?;
1452                        state.row_numbers = options.row_numbers;
1453                        state.row_start_index = options.row_start_index;
1454                        Ok(state)
1455                    }
1456                    CompressionFormat::Xz => {
1457                        let file = File::open(path)?;
1458                        let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1459                        let mut decompressed = Vec::new();
1460                        decoder.read_to_end(&mut decompressed)?;
1461                        let mut read_options = CsvReadOptions::default();
1462                        if let Some(skip_lines) = options.skip_lines {
1463                            read_options.skip_lines = skip_lines;
1464                        }
1465                        if let Some(skip_rows) = options.skip_rows {
1466                            read_options.skip_rows = skip_rows;
1467                        }
1468                        if let Some(has_header) = options.has_header {
1469                            read_options.has_header = has_header;
1470                        }
1471                        read_options = read_options.map_parse_options(|opts| {
1472                            opts.with_try_parse_dates(options.parse_dates)
1473                        });
1474                        let df = CsvReader::new(std::io::Cursor::new(decompressed))
1475                            .with_options(read_options)
1476                            .finish()?;
1477                        let lf = df.lazy();
1478                        let mut state = Self::new(
1479                            lf,
1480                            options.pages_lookahead,
1481                            options.pages_lookback,
1482                            options.max_buffered_rows,
1483                            options.max_buffered_mb,
1484                            options.polars_streaming,
1485                        )?;
1486                        state.row_numbers = options.row_numbers;
1487                        state.row_start_index = options.row_start_index;
1488                        Ok(state)
1489                    }
1490                }
1491            } else {
1492                // Decompress to temp file, then lazy scan
1493                let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1494                let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1495                let mut state = Self::from_csv_customize(
1496                    temp.path(),
1497                    options.pages_lookahead,
1498                    options.pages_lookback,
1499                    options.max_buffered_rows,
1500                    options.max_buffered_mb,
1501                    |mut reader| {
1502                        if let Some(skip_lines) = options.skip_lines {
1503                            reader = reader.with_skip_lines(skip_lines);
1504                        }
1505                        if let Some(skip_rows) = options.skip_rows {
1506                            reader = reader.with_skip_rows(skip_rows);
1507                        }
1508                        if let Some(has_header) = options.has_header {
1509                            reader = reader.with_has_header(has_header);
1510                        }
1511                        reader = reader.with_try_parse_dates(options.parse_dates);
1512                        reader
1513                    },
1514                )?;
1515                state.row_numbers = options.row_numbers;
1516                state.row_start_index = options.row_start_index;
1517                state.decompress_temp_file = Some(temp);
1518                Ok(state)
1519            }
1520        } else {
1521            // For uncompressed files, use lazy scanning (more efficient)
1522            let mut state = Self::from_csv_customize(
1523                path,
1524                options.pages_lookahead,
1525                options.pages_lookback,
1526                options.max_buffered_rows,
1527                options.max_buffered_mb,
1528                |mut reader| {
1529                    if let Some(skip_lines) = options.skip_lines {
1530                        reader = reader.with_skip_lines(skip_lines);
1531                    }
1532                    if let Some(skip_rows) = options.skip_rows {
1533                        reader = reader.with_skip_rows(skip_rows);
1534                    }
1535                    if let Some(has_header) = options.has_header {
1536                        reader = reader.with_has_header(has_header);
1537                    }
1538                    reader = reader.with_try_parse_dates(options.parse_dates);
1539                    reader
1540                },
1541            )?;
1542            state.row_numbers = options.row_numbers;
1543            Ok(state)
1544        }
1545    }
1546
1547    pub fn from_csv_customize<F>(
1548        path: &Path,
1549        pages_lookahead: Option<usize>,
1550        pages_lookback: Option<usize>,
1551        max_buffered_rows: Option<usize>,
1552        max_buffered_mb: Option<usize>,
1553        func: F,
1554    ) -> Result<Self>
1555    where
1556        F: FnOnce(LazyCsvReader) -> LazyCsvReader,
1557    {
1558        let pl_path = PlPath::Local(Arc::from(path));
1559        let reader = LazyCsvReader::new(pl_path);
1560        let lf = func(reader).finish()?;
1561        Self::new(
1562            lf,
1563            pages_lookahead,
1564            pages_lookback,
1565            max_buffered_rows,
1566            max_buffered_mb,
1567            true,
1568        )
1569    }
1570
1571    /// Load multiple CSV files (uncompressed) and concatenate into one LazyFrame.
1572    pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
1573        if paths.is_empty() {
1574            return Err(color_eyre::eyre::eyre!("No paths provided"));
1575        }
1576        if paths.len() == 1 {
1577            return Self::from_csv(paths[0].as_ref(), options);
1578        }
1579        let mut lazy_frames = Vec::with_capacity(paths.len());
1580        for p in paths {
1581            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1582            let mut reader = LazyCsvReader::new(pl_path);
1583            if let Some(skip_lines) = options.skip_lines {
1584                reader = reader.with_skip_lines(skip_lines);
1585            }
1586            if let Some(skip_rows) = options.skip_rows {
1587                reader = reader.with_skip_rows(skip_rows);
1588            }
1589            if let Some(has_header) = options.has_header {
1590                reader = reader.with_has_header(has_header);
1591            }
1592            reader = reader.with_try_parse_dates(options.parse_dates);
1593            let lf = reader.finish()?;
1594            lazy_frames.push(lf);
1595        }
1596        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1597        let mut state = Self::new(
1598            lf,
1599            options.pages_lookahead,
1600            options.pages_lookback,
1601            options.max_buffered_rows,
1602            options.max_buffered_mb,
1603            options.polars_streaming,
1604        )?;
1605        state.row_numbers = options.row_numbers;
1606        state.row_start_index = options.row_start_index;
1607        Ok(state)
1608    }
1609
1610    pub fn from_ndjson(
1611        path: &Path,
1612        pages_lookahead: Option<usize>,
1613        pages_lookback: Option<usize>,
1614        max_buffered_rows: Option<usize>,
1615        max_buffered_mb: Option<usize>,
1616        row_numbers: bool,
1617        row_start_index: usize,
1618    ) -> Result<Self> {
1619        let pl_path = PlPath::Local(Arc::from(path));
1620        let lf = LazyJsonLineReader::new(pl_path).finish()?;
1621        let mut state = Self::new(
1622            lf,
1623            pages_lookahead,
1624            pages_lookback,
1625            max_buffered_rows,
1626            max_buffered_mb,
1627            true,
1628        )?;
1629        state.row_numbers = row_numbers;
1630        state.row_start_index = row_start_index;
1631        Ok(state)
1632    }
1633
1634    /// Load multiple NDJSON files and concatenate into one LazyFrame.
1635    pub fn from_ndjson_paths(
1636        paths: &[impl AsRef<Path>],
1637        pages_lookahead: Option<usize>,
1638        pages_lookback: Option<usize>,
1639        max_buffered_rows: Option<usize>,
1640        max_buffered_mb: Option<usize>,
1641        row_numbers: bool,
1642        row_start_index: usize,
1643    ) -> Result<Self> {
1644        if paths.is_empty() {
1645            return Err(color_eyre::eyre::eyre!("No paths provided"));
1646        }
1647        if paths.len() == 1 {
1648            return Self::from_ndjson(
1649                paths[0].as_ref(),
1650                pages_lookahead,
1651                pages_lookback,
1652                max_buffered_rows,
1653                max_buffered_mb,
1654                row_numbers,
1655                row_start_index,
1656            );
1657        }
1658        let mut lazy_frames = Vec::with_capacity(paths.len());
1659        for p in paths {
1660            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1661            let lf = LazyJsonLineReader::new(pl_path).finish()?;
1662            lazy_frames.push(lf);
1663        }
1664        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1665        let mut state = Self::new(
1666            lf,
1667            pages_lookahead,
1668            pages_lookback,
1669            max_buffered_rows,
1670            max_buffered_mb,
1671            true,
1672        )?;
1673        state.row_numbers = row_numbers;
1674        state.row_start_index = row_start_index;
1675        Ok(state)
1676    }
1677
    /// Load a whole-document JSON file.
    ///
    /// Thin wrapper over `from_json_with_format` with `JsonFormat::Json`;
    /// all paging/display parameters are forwarded unchanged.
    pub fn from_json(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1698
    /// Load a JSON Lines file through the eager JSON reader.
    ///
    /// Thin wrapper over `from_json_with_format` with `JsonFormat::JsonLines`;
    /// all paging/display parameters are forwarded unchanged. (For lazy
    /// scanning of NDJSON see `from_ndjson`.)
    pub fn from_json_lines(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1719
1720    #[allow(clippy::too_many_arguments)]
1721    fn from_json_with_format(
1722        path: &Path,
1723        pages_lookahead: Option<usize>,
1724        pages_lookback: Option<usize>,
1725        max_buffered_rows: Option<usize>,
1726        max_buffered_mb: Option<usize>,
1727        row_numbers: bool,
1728        row_start_index: usize,
1729        format: JsonFormat,
1730    ) -> Result<Self> {
1731        let file = File::open(path)?;
1732        let lf = JsonReader::new(file)
1733            .with_json_format(format)
1734            .finish()?
1735            .lazy();
1736        let mut state = Self::new(
1737            lf,
1738            pages_lookahead,
1739            pages_lookback,
1740            max_buffered_rows,
1741            max_buffered_mb,
1742            true,
1743        )?;
1744        state.row_numbers = row_numbers;
1745        state.row_start_index = row_start_index;
1746        Ok(state)
1747    }
1748
    /// Load multiple JSON (array) files and concatenate into one LazyFrame.
    ///
    /// Thin wrapper over `from_json_with_format_paths` with `JsonFormat::Json`.
    pub fn from_json_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1770
    /// Load multiple JSON Lines files and concatenate into one LazyFrame.
    ///
    /// Thin wrapper over `from_json_with_format_paths` with
    /// `JsonFormat::JsonLines`.
    pub fn from_json_lines_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1792
1793    #[allow(clippy::too_many_arguments)]
1794    fn from_json_with_format_paths(
1795        paths: &[impl AsRef<Path>],
1796        pages_lookahead: Option<usize>,
1797        pages_lookback: Option<usize>,
1798        max_buffered_rows: Option<usize>,
1799        max_buffered_mb: Option<usize>,
1800        row_numbers: bool,
1801        row_start_index: usize,
1802        format: JsonFormat,
1803    ) -> Result<Self> {
1804        if paths.is_empty() {
1805            return Err(color_eyre::eyre::eyre!("No paths provided"));
1806        }
1807        if paths.len() == 1 {
1808            return Self::from_json_with_format(
1809                paths[0].as_ref(),
1810                pages_lookahead,
1811                pages_lookback,
1812                max_buffered_rows,
1813                max_buffered_mb,
1814                row_numbers,
1815                row_start_index,
1816                format,
1817            );
1818        }
1819        let mut lazy_frames = Vec::with_capacity(paths.len());
1820        for p in paths {
1821            let file = File::open(p.as_ref())?;
1822            let lf = match &format {
1823                JsonFormat::Json => JsonReader::new(file)
1824                    .with_json_format(JsonFormat::Json)
1825                    .finish()?
1826                    .lazy(),
1827                JsonFormat::JsonLines => JsonReader::new(file)
1828                    .with_json_format(JsonFormat::JsonLines)
1829                    .finish()?
1830                    .lazy(),
1831            };
1832            lazy_frames.push(lf);
1833        }
1834        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1835        let mut state = Self::new(
1836            lf,
1837            pages_lookahead,
1838            pages_lookback,
1839            max_buffered_rows,
1840            max_buffered_mb,
1841            true,
1842        )?;
1843        state.row_numbers = row_numbers;
1844        state.row_start_index = row_start_index;
1845        Ok(state)
1846    }
1847
1848    #[allow(clippy::too_many_arguments)]
1849    pub fn from_delimited(
1850        path: &Path,
1851        delimiter: u8,
1852        pages_lookahead: Option<usize>,
1853        pages_lookback: Option<usize>,
1854        max_buffered_rows: Option<usize>,
1855        max_buffered_mb: Option<usize>,
1856        row_numbers: bool,
1857        row_start_index: usize,
1858    ) -> Result<Self> {
1859        let pl_path = PlPath::Local(Arc::from(path));
1860        let reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
1861        let lf = reader.finish()?;
1862        let mut state = Self::new(
1863            lf,
1864            pages_lookahead,
1865            pages_lookback,
1866            max_buffered_rows,
1867            max_buffered_mb,
1868            true,
1869        )?;
1870        state.row_numbers = row_numbers;
1871        state.row_start_index = row_start_index;
1872        Ok(state)
1873    }
1874
1875    /// Returns true if a scroll by `rows` would trigger a collect (view would leave the buffer).
1876    /// Used so the UI only shows the throbber when actual data loading will occur.
1877    pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
1878        if rows < 0 && self.start_row == 0 {
1879            return false;
1880        }
1881        let new_start_row = if self.start_row as i64 + rows <= 0 {
1882            0
1883        } else {
1884            if let Some(df) = self.df.as_ref() {
1885                if rows > 0 && df.shape().0 <= self.visible_rows {
1886                    return false;
1887                }
1888            }
1889            (self.start_row as i64 + rows) as usize
1890        };
1891        let view_end = new_start_row
1892            + self
1893                .visible_rows
1894                .min(self.num_rows.saturating_sub(new_start_row));
1895        let within_buffer = new_start_row >= self.buffered_start_row
1896            && view_end <= self.buffered_end_row
1897            && self.buffered_end_row > 0;
1898        !within_buffer
1899    }
1900
1901    fn slide_table(&mut self, rows: i64) {
1902        if rows < 0 && self.start_row == 0 {
1903            return;
1904        }
1905
1906        let new_start_row = if self.start_row as i64 + rows <= 0 {
1907            0
1908        } else {
1909            if let Some(df) = self.df.as_ref() {
1910                if rows > 0 && df.shape().0 <= self.visible_rows {
1911                    return;
1912                }
1913            }
1914            (self.start_row as i64 + rows) as usize
1915        };
1916
1917        // Call collect() only when view is outside buffer; otherwise just update start_row.
1918        let view_end = new_start_row
1919            + self
1920                .visible_rows
1921                .min(self.num_rows.saturating_sub(new_start_row));
1922        let within_buffer = new_start_row >= self.buffered_start_row
1923            && view_end <= self.buffered_end_row
1924            && self.buffered_end_row > 0;
1925
1926        if within_buffer {
1927            self.start_row = new_start_row;
1928            return;
1929        }
1930
1931        self.start_row = new_start_row;
1932        self.collect();
1933    }
1934
1935    pub fn collect(&mut self) {
1936        // Update proximity threshold based on visible rows
1937        if self.visible_rows > 0 {
1938            self.proximity_threshold = self.visible_rows;
1939        }
1940
1941        // Run len() only when lf has changed (query, filter, sort, pivot, melt, reset, drill).
1942        if !self.num_rows_valid {
1943            self.num_rows =
1944                match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
1945                    Ok(df) => {
1946                        if let Some(col) = df.get(0) {
1947                            if let Some(AnyValue::UInt32(len)) = col.first() {
1948                                *len as usize
1949                            } else {
1950                                0
1951                            }
1952                        } else {
1953                            0
1954                        }
1955                    }
1956                    Err(_) => 0,
1957                };
1958            self.num_rows_valid = true;
1959        }
1960
1961        if self.num_rows > 0 {
1962            let max_start = self.num_rows.saturating_sub(1);
1963            if self.start_row > max_start {
1964                self.start_row = max_start;
1965            }
1966        } else {
1967            self.start_row = 0;
1968            self.buffered_start_row = 0;
1969            self.buffered_end_row = 0;
1970            self.buffered_df = None;
1971            self.df = None;
1972            self.locked_df = None;
1973            return;
1974        }
1975
1976        // Proximity-based buffer logic
1977        let view_start = self.start_row;
1978        let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);
1979
1980        // Check if current view is within buffered range
1981        let within_buffer = view_start >= self.buffered_start_row
1982            && view_end <= self.buffered_end_row
1983            && self.buffered_end_row > 0;
1984
1985        // Buffer grows incrementally: initial load and each expansion add only a few pages (lookahead + lookback).
1986        // clamp_buffer_to_max_size caps at max_buffered_rows and slides the window when at cap.
1987        let page_rows = self.visible_rows.max(1);
1988
1989        if within_buffer {
1990            let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
1991            let dist_to_end = self.buffered_end_row.saturating_sub(view_end);
1992
1993            let needs_expansion_back =
1994                dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
1995            let needs_expansion_forward =
1996                dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;
1997
1998            if !needs_expansion_back && !needs_expansion_forward {
1999                // Column scroll only: reuse cached full buffer and re-slice into locked/scroll columns.
2000                let expected_len = self
2001                    .buffered_end_row
2002                    .saturating_sub(self.buffered_start_row);
2003                if self
2004                    .buffered_df
2005                    .as_ref()
2006                    .is_some_and(|b| b.height() == expected_len)
2007                {
2008                    self.slice_buffer_into_display();
2009                    if self.table_state.selected().is_none() {
2010                        self.table_state.select(Some(0));
2011                    }
2012                    return;
2013                }
2014                self.load_buffer(self.buffered_start_row, self.buffered_end_row);
2015                if self.table_state.selected().is_none() {
2016                    self.table_state.select(Some(0));
2017                }
2018                return;
2019            }
2020
2021            let mut new_buffer_start = if needs_expansion_back {
2022                view_start.saturating_sub(self.pages_lookback * page_rows)
2023            } else {
2024                self.buffered_start_row
2025            };
2026
2027            let mut new_buffer_end = if needs_expansion_forward {
2028                (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
2029            } else {
2030                self.buffered_end_row
2031            };
2032
2033            self.clamp_buffer_to_max_size(
2034                view_start,
2035                view_end,
2036                &mut new_buffer_start,
2037                &mut new_buffer_end,
2038            );
2039            self.load_buffer(new_buffer_start, new_buffer_end);
2040        } else {
2041            // Outside buffer: either extend the previous buffer (so it grows) or load a fresh small window.
2042            // Only extend when the view is "close" to the existing buffer (e.g. user paged down a bit).
2043            // A big jump (e.g. jump to end) should load just a window around the new view, not extend
2044            // the buffer across the whole dataset.
2045            let mut new_buffer_start;
2046            let mut new_buffer_end;
2047
2048            let had_buffer = self.buffered_end_row > 0;
2049            let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
2050            let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;
2051
2052            let extend_forward_ok = scrolled_past_end
2053                && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
2054            let extend_backward_ok = scrolled_past_start
2055                && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;
2056
2057            if extend_forward_ok {
2058                // View is just a few pages past buffer end; extend forward.
2059                new_buffer_start = self.buffered_start_row;
2060                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2061            } else if extend_backward_ok {
2062                // View is just a few pages before buffer start; extend backward.
2063                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2064                new_buffer_end = self.buffered_end_row;
2065            } else if scrolled_past_end || scrolled_past_start {
2066                // Big jump (e.g. jump to end or jump to start): load a fresh window around the view.
2067                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2068                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2069                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2070                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2071                if current_len < min_initial_len {
2072                    let need = min_initial_len.saturating_sub(current_len);
2073                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2074                    let can_extend_start = new_buffer_start;
2075                    if can_extend_end >= need {
2076                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2077                    } else if can_extend_start >= need {
2078                        new_buffer_start = new_buffer_start.saturating_sub(need);
2079                    } else {
2080                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2081                        new_buffer_start =
2082                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2083                    }
2084                }
2085            } else {
2086                // No buffer yet or big jump: load a fresh small window (view ± a few pages).
2087                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2088                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2089
2090                // Ensure at least (1 + lookahead + lookback) pages so buffer size is consistent (e.g. 364 at 52 visible).
2091                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2092                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2093                if current_len < min_initial_len {
2094                    let need = min_initial_len.saturating_sub(current_len);
2095                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2096                    let can_extend_start = new_buffer_start;
2097                    if can_extend_end >= need {
2098                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2099                    } else if can_extend_start >= need {
2100                        new_buffer_start = new_buffer_start.saturating_sub(need);
2101                    } else {
2102                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2103                        new_buffer_start =
2104                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2105                    }
2106                }
2107            }
2108
2109            self.clamp_buffer_to_max_size(
2110                view_start,
2111                view_end,
2112                &mut new_buffer_start,
2113                &mut new_buffer_end,
2114            );
2115            self.load_buffer(new_buffer_start, new_buffer_end);
2116        }
2117
2118        self.slice_from_buffer();
2119        if self.table_state.selected().is_none() {
2120            self.table_state.select(Some(0));
2121        }
2122    }
2123
    /// Invalidate num_rows cache when lf is mutated.
    ///
    /// The next `collect()` sees `num_rows_valid == false` and re-runs the
    /// `len()` query against `self.lf` to refresh `self.num_rows`.
    fn invalidate_num_rows(&mut self) {
        self.num_rows_valid = false;
    }
2128
2129    /// Returns the cached row count when valid (same value shown in the control bar). Use this to
2130    /// avoid an extra full scan for analysis/describe when the table has already been collected.
2131    pub fn num_rows_if_valid(&self) -> Option<usize> {
2132        if self.num_rows_valid {
2133            Some(self.num_rows)
2134        } else {
2135            None
2136        }
2137    }
2138
2139    /// Clamp buffer to max_buffered_rows; when at cap, slide window to keep view inside.
2140    fn clamp_buffer_to_max_size(
2141        &self,
2142        view_start: usize,
2143        view_end: usize,
2144        buffer_start: &mut usize,
2145        buffer_end: &mut usize,
2146    ) {
2147        if self.max_buffered_rows == 0 {
2148            return;
2149        }
2150        let max_len = self.max_buffered_rows;
2151        let requested_len = buffer_end.saturating_sub(*buffer_start);
2152        if requested_len <= max_len {
2153            return;
2154        }
2155        let view_len = view_end.saturating_sub(view_start);
2156        if view_len >= max_len {
2157            *buffer_start = view_start;
2158            *buffer_end = (view_start + max_len).min(self.num_rows);
2159        } else {
2160            let half = (max_len - view_len) / 2;
2161            *buffer_end = (view_end + half).min(self.num_rows);
2162            *buffer_start = (*buffer_end).saturating_sub(max_len);
2163            if *buffer_start > view_start {
2164                *buffer_start = view_start;
2165            }
2166            *buffer_end = (*buffer_start + max_len).min(self.num_rows);
2167        }
2168    }
2169
    /// Collect rows [buffer_start, buffer_end) from the LazyFrame into the
    /// in-memory buffer, then split it into the locked-column and scrollable
    /// DataFrames used for rendering.
    ///
    /// On any Polars error the error is stored in `self.error` and the method
    /// returns early, leaving the previous buffer state in place. On success
    /// any stale error is cleared and the buffered range markers are updated.
    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
        let buffer_size = buffer_end.saturating_sub(buffer_start);
        if buffer_size == 0 {
            return;
        }

        // Select all columns in display order so positional take/skip below
        // matches the UI's column layout.
        let all_columns: Vec<_> = self
            .column_order
            .iter()
            .map(|name| col(name.as_str()))
            .collect();

        let use_streaming = self.polars_streaming;
        // Materialize only the requested row window.
        let mut full_df = match collect_lazy(
            self.lf
                .clone()
                .select(all_columns)
                .slice(buffer_start as i64, buffer_size as u32),
            use_streaming,
        ) {
            Ok(df) => df,
            Err(e) => {
                self.error = Some(e);
                return;
            }
        };

        // Memory cap: estimate bytes/row and truncate the buffer so it stays
        // under max_buffered_mb (0 disables the cap). The effective end row
        // shrinks accordingly so the buffered range stays accurate.
        let mut effective_buffer_end = buffer_end;
        if self.max_buffered_mb > 0 {
            let size = full_df.estimated_size();
            let max_bytes = self.max_buffered_mb * 1024 * 1024;
            if size > max_bytes {
                let rows = full_df.height();
                if rows > 0 {
                    let bytes_per_row = size / rows;
                    // max(1) guards against division by zero for tiny rows.
                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
                    if max_rows < rows {
                        full_df = full_df.slice(0, max_rows);
                        effective_buffer_end = buffer_start + max_rows;
                    }
                }
            }
        }

        // Locked columns (the first `locked_columns_count` of column_order)
        // get their own DataFrame so they can be rendered frozen at the left.
        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            let locked_df = match full_df.select(locked_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            // Grouped view gets display formatting applied; formatting errors
            // are converted into a user-facing PolarsError.
            self.locked_df = if self.is_grouped() {
                match self.format_grouped_dataframe(locked_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(locked_df)
            };
        } else {
            self.locked_df = None;
        }

        // Scrollable columns: everything after the locked columns, offset by
        // the horizontal scroll position (termcol_index).
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            // Scrolled past the last column: nothing scrollable to show.
            self.df = None;
        } else {
            let scroll_df = match full_df.select(scroll_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            // Same grouped-formatting treatment as the locked columns above.
            self.df = if self.is_grouped() {
                match self.format_grouped_dataframe(scroll_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(scroll_df)
            };
        }
        // Reaching here means the load succeeded: clear any stale error and
        // record the (possibly memory-capped) buffered range.
        if self.error.is_some() {
            self.error = None;
        }
        self.buffered_start_row = buffer_start;
        self.buffered_end_row = effective_buffer_end;
        self.buffered_df = Some(full_df);
    }
2282
2283    /// Recompute locked_df and df from the cached full buffer. Used when only termcol_index (or locked columns) changed.
2284    fn slice_buffer_into_display(&mut self) {
2285        let full_df = match self.buffered_df.as_ref() {
2286            Some(df) => df,
2287            None => return,
2288        };
2289
2290        if self.locked_columns_count > 0 {
2291            let locked_names: Vec<&str> = self
2292                .column_order
2293                .iter()
2294                .take(self.locked_columns_count)
2295                .map(|s| s.as_str())
2296                .collect();
2297            if let Ok(locked_df) = full_df.select(locked_names) {
2298                self.locked_df = if self.is_grouped() {
2299                    self.format_grouped_dataframe(locked_df).ok()
2300                } else {
2301                    Some(locked_df)
2302                };
2303            }
2304        } else {
2305            self.locked_df = None;
2306        }
2307
2308        let scroll_names: Vec<&str> = self
2309            .column_order
2310            .iter()
2311            .skip(self.locked_columns_count + self.termcol_index)
2312            .map(|s| s.as_str())
2313            .collect();
2314        if scroll_names.is_empty() {
2315            self.df = None;
2316        } else if let Ok(scroll_df) = full_df.select(scroll_names) {
2317            self.df = if self.is_grouped() {
2318                self.format_grouped_dataframe(scroll_df).ok()
2319            } else {
2320                Some(scroll_df)
2321            };
2322        }
2323    }
2324
    fn slice_from_buffer(&mut self) {
        // Intentionally a no-op. The buffer already holds the full range
        // [buffered_start_row, buffered_end_row); the visible window
        // [start_row, start_row + visible_rows) is carved out of it at render
        // time based on the offset, so nothing needs to be materialized here.
    }
2331
2332    fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
2333        let schema = df.schema();
2334        let mut new_series = Vec::new();
2335
2336        for (col_name, dtype) in schema.iter() {
2337            let col = df.column(col_name)?;
2338            if matches!(dtype, DataType::List(_)) {
2339                let string_series: Series = col
2340                    .list()?
2341                    .into_iter()
2342                    .map(|opt_list| {
2343                        opt_list.map(|list_series| {
2344                            let values: Vec<String> = list_series
2345                                .iter()
2346                                .take(10)
2347                                .map(|v| v.str_value().to_string())
2348                                .collect();
2349                            if list_series.len() > 10 {
2350                                format!("[{}...] ({} items)", values.join(", "), list_series.len())
2351                            } else {
2352                                format!("[{}]", values.join(", "))
2353                            }
2354                        })
2355                    })
2356                    .collect();
2357                new_series.push(string_series.with_name(col_name.as_str().into()).into());
2358            } else {
2359                new_series.push(col.clone());
2360            }
2361        }
2362
2363        Ok(DataFrame::new(new_series)?)
2364    }
2365
2366    pub fn select_next(&mut self) {
2367        self.table_state.select_next();
2368        if let Some(selected) = self.table_state.selected() {
2369            if selected >= self.visible_rows && self.visible_rows > 0 {
2370                self.slide_table(1);
2371            }
2372        }
2373    }
2374
2375    pub fn page_down(&mut self) {
2376        self.slide_table(self.visible_rows as i64);
2377    }
2378
2379    pub fn select_previous(&mut self) {
2380        if let Some(selected) = self.table_state.selected() {
2381            self.table_state.select_previous();
2382            if selected == 0 && self.start_row > 0 {
2383                self.slide_table(-1);
2384            }
2385        } else {
2386            self.table_state.select(Some(0));
2387        }
2388    }
2389
2390    pub fn scroll_to(&mut self, index: usize) {
2391        if self.start_row == index {
2392            return;
2393        }
2394
2395        if index == 0 {
2396            self.start_row = 0;
2397            self.collect();
2398            self.start_row = 0;
2399        } else {
2400            self.start_row = index;
2401            self.collect();
2402        }
2403    }
2404
2405    /// Scroll so that the given row index is centered in the view when possible (respects table bounds).
2406    /// Selects that row. Used by go-to-line.
2407    pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2408        self.ensure_num_rows();
2409        if self.num_rows == 0 || self.visible_rows == 0 {
2410            return;
2411        }
2412        let center_offset = self.visible_rows / 2;
2413        let mut start_row = row_index.saturating_sub(center_offset);
2414        let max_start = self.num_rows.saturating_sub(self.visible_rows);
2415        start_row = start_row.min(max_start);
2416
2417        if self.start_row == start_row {
2418            let display_idx = row_index
2419                .saturating_sub(start_row)
2420                .min(self.visible_rows.saturating_sub(1));
2421            self.table_state.select(Some(display_idx));
2422            return;
2423        }
2424
2425        self.start_row = start_row;
2426        self.collect();
2427        let display_idx = row_index
2428            .saturating_sub(start_row)
2429            .min(self.visible_rows.saturating_sub(1));
2430        self.table_state.select(Some(display_idx));
2431    }
2432
2433    /// Ensure num_rows is up to date (runs len() query if needed). Used before scroll_to_end.
2434    fn ensure_num_rows(&mut self) {
2435        if self.num_rows_valid {
2436            return;
2437        }
2438        if self.visible_rows > 0 {
2439            self.proximity_threshold = self.visible_rows;
2440        }
2441        self.num_rows = match self.lf.clone().select([len()]).collect() {
2442            Ok(df) => {
2443                if let Some(col) = df.get(0) {
2444                    if let Some(AnyValue::UInt32(len)) = col.first() {
2445                        *len as usize
2446                    } else {
2447                        0
2448                    }
2449                } else {
2450                    0
2451                }
2452            }
2453            Err(_) => 0,
2454        };
2455        self.num_rows_valid = true;
2456    }
2457
2458    /// Jump to the last page; buffer is trimmed/loaded as needed. Selects the last row.
2459    pub fn scroll_to_end(&mut self) {
2460        self.ensure_num_rows();
2461        if self.num_rows == 0 {
2462            self.start_row = 0;
2463            self.buffered_start_row = 0;
2464            self.buffered_end_row = 0;
2465            return;
2466        }
2467        let end_start = self.num_rows.saturating_sub(self.visible_rows);
2468        if self.start_row == end_start {
2469            self.select_last_visible_row();
2470            return;
2471        }
2472        self.start_row = end_start;
2473        self.collect();
2474        self.select_last_visible_row();
2475    }
2476
2477    /// Set table selection to the last row in the current view (for use after scroll_to_end).
2478    fn select_last_visible_row(&mut self) {
2479        if self.num_rows == 0 {
2480            return;
2481        }
2482        let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
2483        let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
2484        self.table_state.select(Some(sel));
2485    }
2486
2487    pub fn half_page_down(&mut self) {
2488        let half = (self.visible_rows / 2).max(1) as i64;
2489        self.slide_table(half);
2490    }
2491
2492    pub fn half_page_up(&mut self) {
2493        if self.start_row == 0 {
2494            return;
2495        }
2496        let half = (self.visible_rows / 2).max(1) as i64;
2497        self.slide_table(-half);
2498    }
2499
2500    pub fn page_up(&mut self) {
2501        if self.start_row == 0 {
2502            return;
2503        }
2504        self.slide_table(-(self.visible_rows as i64));
2505    }
2506
2507    pub fn scroll_right(&mut self) {
2508        let max_scroll = self
2509            .column_order
2510            .len()
2511            .saturating_sub(self.locked_columns_count);
2512        if self.termcol_index < max_scroll.saturating_sub(1) {
2513            self.termcol_index += 1;
2514            self.collect();
2515        }
2516    }
2517
2518    pub fn scroll_left(&mut self) {
2519        if self.termcol_index > 0 {
2520            self.termcol_index -= 1;
2521            self.collect();
2522        }
2523    }
2524
2525    pub fn headers(&self) -> Vec<String> {
2526        self.column_order.clone()
2527    }
2528
2529    pub fn set_column_order(&mut self, order: Vec<String>) {
2530        self.column_order = order;
2531        self.buffered_start_row = 0;
2532        self.buffered_end_row = 0;
2533        self.buffered_df = None;
2534        self.collect();
2535    }
2536
2537    pub fn set_locked_columns(&mut self, count: usize) {
2538        self.locked_columns_count = count.min(self.column_order.len());
2539        self.buffered_start_row = 0;
2540        self.buffered_end_row = 0;
2541        self.buffered_df = None;
2542        self.collect();
2543    }
2544
    /// Number of columns currently locked (pinned) on the left edge.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }
2548
    // Getter methods for template creation
    /// Active filter statements, in the order they are applied.
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }
2553
    /// Columns the current sort is keyed on (empty when no sort is active).
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }
2557
    /// Current sort direction; `true` means ascending.
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }
2561
    /// Current display order of columns (locked columns first).
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }
2565
    /// The active query string; empty when no query is applied.
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }
2569
    /// The active SQL query string; empty when no SQL query is applied.
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }
2573
    /// The active fuzzy-search query string; empty when no fuzzy query is applied.
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }
2577
    /// Spec of the most recent pivot, if the current frame came from `pivot()`.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }
2581
    /// Spec of the most recent melt, if the current frame came from `melt()`.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }
2585
2586    pub fn is_grouped(&self) -> bool {
2587        self.schema
2588            .iter()
2589            .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
2590    }
2591
2592    pub fn group_key_columns(&self) -> Vec<String> {
2593        self.schema
2594            .iter()
2595            .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
2596            .map(|(name, _)| name.to_string())
2597            .collect()
2598    }
2599
2600    pub fn group_value_columns(&self) -> Vec<String> {
2601        self.schema
2602            .iter()
2603            .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
2604            .map(|(name, _)| name.to_string())
2605            .collect()
2606    }
2607
2608    /// Estimated heap size in bytes of the currently buffered slice (locked + scrollable), if collected.
2609    pub fn buffered_memory_bytes(&self) -> Option<usize> {
2610        let locked = self
2611            .locked_df
2612            .as_ref()
2613            .map(|df| df.estimated_size())
2614            .unwrap_or(0);
2615        let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
2616        if locked == 0 && scroll == 0 {
2617            None
2618        } else {
2619            Some(locked + scroll)
2620        }
2621    }
2622
2623    /// Number of rows currently in the buffer. 0 if no buffer loaded.
2624    pub fn buffered_rows(&self) -> usize {
2625        self.buffered_end_row
2626            .saturating_sub(self.buffered_start_row)
2627    }
2628
    /// Current scrollable display buffer (columns after the locked set).
    /// None until first collect().
    pub fn display_df(&self) -> Option<&DataFrame> {
        self.df.as_ref()
    }
2633
2634    /// Visible-window slice of the display buffer (same as passed to render_dataframe).
2635    pub fn display_slice_df(&self) -> Option<DataFrame> {
2636        let df = self.df.as_ref()?;
2637        let offset = self.start_row.saturating_sub(self.buffered_start_row);
2638        let slice_len = self.visible_rows.min(df.height().saturating_sub(offset));
2639        if offset < df.height() && slice_len > 0 {
2640            Some(df.slice(offset as i64, slice_len))
2641        } else {
2642            None
2643        }
2644    }
2645
    /// Maximum buffer size in rows (0 = no limit). See also `max_buffered_mb`.
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }
2650
    /// Maximum buffer size in MiB (0 = no limit). When exceeded, the freshly
    /// collected buffer is trimmed row-wise using its estimated bytes-per-row.
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
2655
    /// Expand one row of a grouped frame into its underlying rows.
    ///
    /// Saves the grouped frame in `grouped_lf` (so `drill_up()` can restore it),
    /// materializes the selected group's key values as constant columns and its
    /// List values as regular columns, then replaces `lf` with that per-group
    /// frame and resets the view state.
    ///
    /// Returns `Ok(())` without doing anything when the frame is not grouped;
    /// errors when `group_index` is out of bounds or no List columns exist.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        if !self.is_grouped() {
            return Ok(());
        }

        // Remember the grouped frame so drill_up() can restore it.
        // NOTE(review): this is set before the bounds check below, so a failed
        // drill-down leaves grouped_lf populated — confirm whether that is intended.
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // Capture the selected group's key values (as display strings) for the UI.
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // Row count of the expanded frame = length of the first value column's
        // list for this group (0 when the cell is not a List).
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Replicate each key value row_count times, preserving its native dtype
        // for the common primitive types; anything else falls back to its
        // string rendering.
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // Unwrap each value column's list for this group into a plain column.
        // Non-List cells are silently skipped.
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Swap in the expanded frame and reset scroll/selection state.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
2763
2764    pub fn drill_up(&mut self) -> Result<()> {
2765        if let Some(grouped_lf) = self.grouped_lf.take() {
2766            self.invalidate_num_rows();
2767            self.lf = grouped_lf;
2768            self.schema = self.lf.clone().collect_schema()?;
2769            self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
2770            self.drilled_down_group_index = None;
2771            self.drilled_down_group_key = None;
2772            self.drilled_down_group_key_columns = None;
2773            self.start_row = 0;
2774            self.termcol_index = 0;
2775            self.locked_columns_count = 0;
2776            self.table_state.select(Some(0));
2777            self.collect();
2778            Ok(())
2779        } else {
2780            Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
2781        }
2782    }
2783
    /// Materialize the current `lf` into a DataFrame for analysis/statistics views.
    pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
        Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
    }
2787
2788    pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
2789        crate::statistics::AnalysisContext {
2790            has_query: !self.active_query.is_empty(),
2791            query: self.active_query.clone(),
2792            has_filters: !self.filters.is_empty(),
2793            filter_count: self.filters.len(),
2794            is_drilled_down: self.is_drilled_down(),
2795            group_key: self.drilled_down_group_key.clone(),
2796            group_columns: self.drilled_down_group_key_columns.clone(),
2797        }
2798    }
2799
2800    /// Polars 0.52 pivot_stable panics (from_physical Date/UInt32) when index is Date/Datetime. Cast to Int32, restore after.
2801    /// Returns (modified df, list of (column name, original dtype) to restore after pivot).
2802    fn cast_temporal_index_columns_for_pivot(
2803        df: &DataFrame,
2804        index: &[String],
2805    ) -> Result<(DataFrame, Vec<(String, DataType)>)> {
2806        let mut out = df.clone();
2807        let mut restore = Vec::new();
2808        for name in index {
2809            if let Ok(s) = out.column(name) {
2810                let dtype = s.dtype();
2811                if matches!(dtype, DataType::Date | DataType::Datetime(_, _)) {
2812                    restore.push((name.clone(), dtype.clone()));
2813                    let casted = s.cast(&DataType::Int32)?;
2814                    out.with_column(casted)?;
2815                }
2816            }
2817        }
2818        Ok((out, restore))
2819    }
2820
2821    /// Restore Date/Datetime types on index columns after pivot.
2822    fn restore_temporal_index_columns_after_pivot(
2823        pivoted: &mut DataFrame,
2824        restore: &[(String, DataType)],
2825    ) -> Result<()> {
2826        for (name, dtype) in restore {
2827            if let Ok(s) = pivoted.column(name) {
2828                let restored = s.cast(dtype)?;
2829                pivoted.with_column(restored)?;
2830            }
2831        }
2832        Ok(())
2833    }
2834
    /// Pivot the current `LazyFrame` (long → wide). Never uses `original_lf`.
    /// Collects current `lf`, runs `pivot_stable`, then replaces `lf` with result.
    /// We use pivot_stable for all aggregation types: Polars' non-stable pivot() prints
    /// "unstable pivot not yet supported, using stable pivot" to stdout, which corrupts the TUI.
    pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
        // pivot_stable operates on a materialized DataFrame.
        let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
        let agg_expr = pivot_agg_expr(spec.aggregation)?;
        let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
        // Empty index list → pass None to pivot_stable (presumably letting it
        // derive the index itself — confirm against the polars docs).
        let index_opt = if index_str.is_empty() {
            None
        } else {
            Some(index_str)
        };

        // Optional workaround: temporarily cast Date/Datetime index columns to
        // Int32 (see cast_temporal_index_columns_for_pivot) and restore after.
        let (df_for_pivot, temporal_index_restore) = if self.workaround_pivot_date_index {
            let (df_w, restore) =
                Self::cast_temporal_index_columns_for_pivot(&df, spec.index.as_slice())?;
            (df_w, Some(restore))
        } else {
            (df.clone(), None)
        };
        // Sort the newly created columns unless the spec says otherwise.
        let sort_new_columns = spec.sort_columns.unwrap_or(true);
        let mut pivoted = pivot_stable(
            &df_for_pivot,
            [spec.pivot_column.as_str()],
            index_opt,
            Some([spec.value_column.as_str()]),
            sort_new_columns,
            Some(agg_expr),
            None,
        )?;
        if let Some(restore) = &temporal_index_restore {
            Self::restore_temporal_index_columns_after_pivot(&mut pivoted, restore)?;
        }

        // Record the reshape (for templates/inspection) and swap in the result.
        self.last_pivot_spec = Some(spec.clone());
        self.last_melt_spec = None;
        self.replace_lf_after_reshape(pivoted.lazy())?;
        Ok(())
    }
2875
2876    /// Melt the current `LazyFrame` (wide → long). Never uses `original_lf`.
2877    pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
2878        let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
2879        let index = cols(spec.index.iter().map(|s| s.as_str()));
2880        let args = UnpivotArgsDSL {
2881            on,
2882            index,
2883            variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
2884            value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
2885        };
2886        let lf = self.lf.clone().unpivot(args);
2887        self.last_melt_spec = Some(spec.clone());
2888        self.last_pivot_spec = None;
2889        self.replace_lf_after_reshape(lf)?;
2890        Ok(())
2891    }
2892
    /// Install a reshaped (pivoted/melted) frame as the new `lf` and reset
    /// every piece of view state that referred to the old column layout.
    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
        self.invalidate_num_rows();
        self.lf = lf;
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Filters/sorts/queries referenced pre-reshape columns; drop them all.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.error = None;
        // Invalidate cached frames and any drill-down state.
        self.df = None;
        self.locked_df = None;
        self.grouped_lf = None;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        // Reset scroll, column window, and buffer bookkeeping before re-collecting.
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.collect();
        Ok(())
    }
2920
    /// True while a single group is expanded via `drill_down_into_group()`.
    pub fn is_drilled_down(&self) -> bool {
        self.drilled_down_group_index.is_some()
    }
2924
    /// Re-derive `lf` by applying the stored filters and sort settings to the
    /// current frame, then re-collect the display buffer.
    fn apply_transformations(&mut self) {
        let mut lf = self.lf.clone();
        let mut final_expr: Option<Expr> = None;

        // Fold all filter statements into a single boolean expression, chaining
        // each one onto the accumulator with its AND/OR logical operator.
        for filter in &self.filters {
            let col_expr = col(&filter.column);
            // Parse the filter's string value into the column's dtype when
            // possible; on parse failure (or unknown column/dtype), fall back
            // to comparing against the raw string literal.
            let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
                match dtype {
                    DataType::Float32 | DataType::Float64 => filter
                        .value
                        .parse::<f64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
                        .value
                        .parse::<i64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                        filter
                            .value
                            .parse::<u64>()
                            .map(lit)
                            .unwrap_or_else(|_| lit(filter.value.as_str()))
                    }
                    DataType::Boolean => filter
                        .value
                        .parse::<bool>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    _ => lit(filter.value.as_str()),
                }
            } else {
                lit(filter.value.as_str())
            };

            // Contains/NotContains are substring (literal) matches on the
            // string representation; the rest are ordinary comparisons.
            let op_expr = match filter.operator {
                FilterOperator::Eq => col_expr.eq(val_lit),
                FilterOperator::NotEq => col_expr.neq(val_lit),
                FilterOperator::Gt => col_expr.gt(val_lit),
                FilterOperator::Lt => col_expr.lt(val_lit),
                FilterOperator::GtEq => col_expr.gt_eq(val_lit),
                FilterOperator::LtEq => col_expr.lt_eq(val_lit),
                FilterOperator::Contains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val))
                }
                FilterOperator::NotContains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val)).not()
                }
            };

            if let Some(current) = final_expr {
                final_expr = Some(match filter.logical_op {
                    LogicalOperator::And => current.and(op_expr),
                    LogicalOperator::Or => current.or(op_expr),
                });
            } else {
                final_expr = Some(op_expr);
            }
        }

        if let Some(e) = final_expr {
            lf = lf.filter(e);
        }

        if !self.sort_columns.is_empty() {
            // One descending flag per sort column, all driven by sort_ascending.
            let options = SortMultipleOptions {
                descending: self
                    .sort_columns
                    .iter()
                    .map(|_| !self.sort_ascending)
                    .collect(),
                ..Default::default()
            };
            lf = lf.sort_by_exprs(
                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
                options,
            );
        } else if !self.sort_ascending {
            // No sort keys but descending requested: just reverse row order.
            lf = lf.reverse();
        }

        self.invalidate_num_rows();
        self.lf = lf;
        self.collect();
    }
3013
3014    pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
3015        self.sort_columns = columns;
3016        self.sort_ascending = ascending;
3017        self.buffered_start_row = 0;
3018        self.buffered_end_row = 0;
3019        self.buffered_df = None;
3020        self.apply_transformations();
3021    }
3022
3023    pub fn reverse(&mut self) {
3024        self.sort_ascending = !self.sort_ascending;
3025
3026        self.buffered_start_row = 0;
3027        self.buffered_end_row = 0;
3028        self.buffered_df = None;
3029
3030        if !self.sort_columns.is_empty() {
3031            let options = SortMultipleOptions {
3032                descending: self
3033                    .sort_columns
3034                    .iter()
3035                    .map(|_| !self.sort_ascending)
3036                    .collect(),
3037                ..Default::default()
3038            };
3039            self.invalidate_num_rows();
3040            self.lf = self.lf.clone().sort_by_exprs(
3041                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
3042                options,
3043            );
3044            self.collect();
3045        } else {
3046            self.invalidate_num_rows();
3047            self.lf = self.lf.clone().reverse();
3048            self.collect();
3049        }
3050    }
3051
3052    pub fn filter(&mut self, filters: Vec<FilterStatement>) {
3053        self.filters = filters;
3054        self.buffered_start_row = 0;
3055        self.buffered_end_row = 0;
3056        self.buffered_df = None;
3057        self.apply_transformations();
3058    }
3059
    /// Apply a datui query string (see `parse_query`) against the original
    /// LazyFrame, replacing any previously applied transformations.
    ///
    /// An empty/whitespace query resets the view to the original frame.
    /// On a parse or schema error, `self.error` is set and the visible
    /// state is left untouched; on success, filters/sorts/drill-down state
    /// are cleared and the view is scrolled back to the top.
    pub fn query(&mut self, query: String) {
        self.error = None;

        let trimmed_query = query.trim();
        if trimmed_query.is_empty() {
            self.reset_lf_to_original();
            self.collect();
            return;
        }

        match parse_query(&query) {
            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
                // Queries always start from the pristine frame, not the
                // currently filtered/sorted one.
                let mut lf = self.original_lf.clone();
                let mut schema_opt: Option<Arc<Schema>> = None;

                // Apply filter first (where clause)
                if let Some(f) = filter {
                    lf = lf.filter(f);
                }

                if !group_by_cols.is_empty() {
                    if !cols.is_empty() {
                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
                    } else {
                        // No explicit aggregations: aggregate every
                        // non-group column so the result still has them.
                        let schema = match lf.clone().collect_schema() {
                            Ok(s) => s,
                            Err(e) => {
                                self.error = Some(e);
                                return; // Don't modify state on error
                            }
                        };
                        let all_columns: Vec<String> =
                            schema.iter_names().map(|s| s.to_string()).collect();

                        // In Polars, when you group_by and aggregate columns without explicit aggregation functions,
                        // Polars automatically collects the values as lists. We need to aggregate all columns
                        // except the group columns to avoid duplicates.
                        let mut agg_exprs = Vec::new();
                        for col_name in &all_columns {
                            if !group_by_col_names.contains(col_name) {
                                agg_exprs.push(col(col_name));
                            }
                        }

                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
                    }
                    // Sort by the result's group-key column names (first N columns after agg).
                    // Works for aliased or plain names without relying on parser-derived names.
                    let schema = match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    };
                    schema_opt = Some(schema.clone());
                    let sort_exprs: Vec<Expr> = schema
                        .iter_names()
                        .take(group_by_cols.len())
                        .map(|n| col(n.as_str()))
                        .collect();
                    lf = lf.sort_by_exprs(sort_exprs, Default::default());
                } else if !cols.is_empty() {
                    lf = lf.select(cols);
                }

                // Reuse the schema resolved in the group-by branch when
                // available; otherwise resolve it now (still before any
                // state mutation, so errors leave the old view intact).
                let schema = match schema_opt {
                    Some(s) => s,
                    None => match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    },
                };

                self.schema = schema;
                self.invalidate_num_rows();
                self.lf = lf;
                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();

                // Lock grouped columns if by clause was used
                // Only lock the columns specified in the 'by' clause, not the value columns
                if !group_by_col_names.is_empty() {
                    // Group columns appear first in Polars results, so count consecutive
                    // columns from the start that are in group_by_col_names
                    let mut locked_count = 0;
                    for col_name in &self.column_order {
                        if group_by_col_names.contains(col_name) {
                            locked_count += 1;
                        } else {
                            // Once we hit a non-group column, we've passed all group columns
                            break;
                        }
                    }
                    self.locked_columns_count = locked_count;
                } else {
                    self.locked_columns_count = 0;
                }

                // Clear filters when using query
                self.filters.clear();
                self.sort_columns.clear();
                self.sort_ascending = true;
                self.start_row = 0;
                self.termcol_index = 0;
                self.active_query = query;
                self.buffered_start_row = 0;
                self.buffered_end_row = 0;
                self.buffered_df = None;
                // Reset drill-down state when applying new query
                self.drilled_down_group_index = None;
                self.drilled_down_group_key = None;
                self.drilled_down_group_key_columns = None;
                self.grouped_lf = None;
                // Reset table state selection
                self.table_state.select(Some(0));
                // Collect will clamp start_row to valid range, but we want to ensure it's 0
                // So we set it to 0, collect (which may clamp it), then ensure it's 0 again
                self.collect();
                // After collect(), ensure we're at the top (collect() may have clamped if num_rows was wrong)
                // But if num_rows > 0, we want start_row = 0 to show the first row
                if self.num_rows > 0 {
                    self.start_row = 0;
                }
            }
            Err(e) => {
                // Parse errors are already user-facing strings; store as ComputeError
                self.error = Some(PolarsError::ComputeError(e.into()));
            }
        }
    }
3193
3194    /// Execute a SQL query against the current LazyFrame (registered as table "df").
3195    /// Empty SQL resets to original state. Does not call collect(); the event loop does that via AppEvent::Collect.
3196    pub fn sql_query(&mut self, sql: String) {
3197        self.error = None;
3198        let trimmed = sql.trim();
3199        if trimmed.is_empty() {
3200            self.reset_lf_to_original();
3201            return;
3202        }
3203
3204        #[cfg(feature = "sql")]
3205        {
3206            use polars_sql::SQLContext;
3207            let mut ctx = SQLContext::new();
3208            ctx.register("df", self.lf.clone());
3209            match ctx.execute(trimmed) {
3210                Ok(result_lf) => {
3211                    let schema = match result_lf.clone().collect_schema() {
3212                        Ok(s) => s,
3213                        Err(e) => {
3214                            self.error = Some(e);
3215                            return;
3216                        }
3217                    };
3218                    self.schema = schema;
3219                    self.invalidate_num_rows();
3220                    self.lf = result_lf;
3221                    self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3222                    self.active_sql_query = sql;
3223                    self.locked_columns_count = 0;
3224                    self.filters.clear();
3225                    self.sort_columns.clear();
3226                    self.sort_ascending = true;
3227                    self.start_row = 0;
3228                    self.termcol_index = 0;
3229                    self.drilled_down_group_index = None;
3230                    self.drilled_down_group_key = None;
3231                    self.drilled_down_group_key_columns = None;
3232                    self.grouped_lf = None;
3233                    self.buffered_start_row = 0;
3234                    self.buffered_end_row = 0;
3235                    self.buffered_df = None;
3236                    self.table_state.select(Some(0));
3237                }
3238                Err(e) => {
3239                    self.error = Some(e);
3240                }
3241            }
3242        }
3243
3244        #[cfg(not(feature = "sql"))]
3245        {
3246            self.error = Some(PolarsError::ComputeError(
3247                format!("SQL support not compiled in (build with --features sql)").into(),
3248            ));
3249        }
3250    }
3251
3252    /// Fuzzy search: filter rows where any string column matches the query.
3253    /// Query is split on whitespace; each token must match (in order, case-insensitive) in some string column.
3254    /// Empty query resets to original_lf.
3255    pub fn fuzzy_search(&mut self, query: String) {
3256        self.error = None;
3257        let trimmed = query.trim();
3258        if trimmed.is_empty() {
3259            self.reset_lf_to_original();
3260            self.collect();
3261            return;
3262        }
3263        let string_cols: Vec<String> = self
3264            .schema
3265            .iter()
3266            .filter(|(_, dtype)| dtype.is_string())
3267            .map(|(name, _)| name.to_string())
3268            .collect();
3269        if string_cols.is_empty() {
3270            self.error = Some(PolarsError::ComputeError(
3271                "Fuzzy search requires at least one string column".into(),
3272            ));
3273            return;
3274        }
3275        let tokens: Vec<&str> = trimmed
3276            .split_whitespace()
3277            .filter(|s| !s.is_empty())
3278            .collect();
3279        let token_exprs: Vec<Expr> = tokens
3280            .iter()
3281            .map(|token| {
3282                let pattern = fuzzy_token_regex(token);
3283                string_cols
3284                    .iter()
3285                    .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
3286                    .reduce(|a, b| a.or(b))
3287                    .unwrap()
3288            })
3289            .collect();
3290        let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
3291        self.lf = self.original_lf.clone().filter(combined);
3292        self.filters.clear();
3293        self.sort_columns.clear();
3294        self.active_query.clear();
3295        self.active_sql_query.clear();
3296        self.active_fuzzy_query = query;
3297        // Reset view and buffer so collect() runs on the new lf
3298        self.locked_columns_count = 0;
3299        self.start_row = 0;
3300        self.termcol_index = 0;
3301        self.drilled_down_group_index = None;
3302        self.drilled_down_group_key = None;
3303        self.drilled_down_group_key_columns = None;
3304        self.grouped_lf = None;
3305        self.buffered_start_row = 0;
3306        self.buffered_end_row = 0;
3307        self.buffered_df = None;
3308        self.table_state.select(Some(0));
3309        self.invalidate_num_rows();
3310        self.collect();
3311    }
3312}
3313
3314/// Case-insensitive regex for one token: chars in order with `.*` between.
3315pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3316    let inner: String =
3317        token
3318            .chars()
3319            .map(|c| regex::escape(&c.to_string()))
3320            .fold(String::new(), |mut s, e| {
3321                if !s.is_empty() {
3322                    s.push_str(".*");
3323                }
3324                s.push_str(&e);
3325                s
3326            });
3327    format!("(?i).*{}.*", inner)
3328}
3329
/// Render-time configuration for the data table widget (colors, padding).
/// All runtime data (frames, scroll position, selection) lives in
/// `DataTableState`.
pub struct DataTable {
    /// Background color of the header row.
    pub header_bg: Color,
    /// Foreground color of the header row.
    pub header_fg: Color,
    /// Foreground color of the row-number gutter.
    pub row_numbers_fg: Color,
    /// Color of the vertical separator drawn after locked columns.
    pub separator_fg: Color,
    /// Number of blank cells rendered between adjacent columns.
    pub table_cell_padding: u16,
    /// Optional background for odd rows (zebra striping); `None` disables it.
    pub alternate_row_bg: Option<Color>,
    /// When true, colorize cells by column type using the optional colors below.
    pub column_colors: bool,
    pub str_col: Option<Color>,
    pub int_col: Option<Color>,
    pub float_col: Option<Color>,
    pub bool_col: Option<Color>,
    pub temporal_col: Option<Color>,
}
3345
impl Default for DataTable {
    /// Default theme: dark-gray (256-color index 236) header background,
    /// white text, no row striping, and column-type coloring disabled.
    fn default() -> Self {
        Self {
            header_bg: Color::Indexed(236),
            header_fg: Color::White,
            row_numbers_fg: Color::DarkGray,
            separator_fg: Color::White,
            table_cell_padding: 1,
            alternate_row_bg: None,
            column_colors: false,
            str_col: None,
            int_col: None,
            float_col: None,
            bool_col: None,
            temporal_col: None,
        }
    }
}
3364
/// Parameters for rendering the row numbers column.
struct RowNumbersParams {
    /// First data row (0-based index into the dataset) shown at the top.
    start_row: usize,
    /// Number of rows the viewport can display.
    visible_rows: usize,
    /// Total rows in the dataset; numbering stops once this is exhausted.
    num_rows: usize,
    /// Offset added to the displayed number (first shown value is
    /// `start_row + row_start_index`).
    row_start_index: usize,
    /// Currently selected viewport row, if any (styled differently).
    selected_row: Option<usize>,
}
3373
impl DataTable {
    /// Construct a `DataTable` with the default theme (see `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder: override the four base colors (header background/foreground,
    /// row-number gutter, locked-column separator).
    pub fn with_colors(
        mut self,
        header_bg: Color,
        header_fg: Color,
        row_numbers_fg: Color,
        separator_fg: Color,
    ) -> Self {
        self.header_bg = header_bg;
        self.header_fg = header_fg;
        self.row_numbers_fg = row_numbers_fg;
        self.separator_fg = separator_fg;
        self
    }

    /// Builder: set the number of blank cells between adjacent columns.
    pub fn with_cell_padding(mut self, padding: u16) -> Self {
        self.table_cell_padding = padding;
        self
    }

    /// Builder: background color for odd rows (`None` disables striping).
    pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
        self.alternate_row_bg = color;
        self
    }

    /// Enable column-type coloring and set colors for string, int, float, bool, and temporal columns.
    pub fn with_column_type_colors(
        mut self,
        str_col: Color,
        int_col: Color,
        float_col: Color,
        bool_col: Color,
        temporal_col: Color,
    ) -> Self {
        self.column_colors = true;
        self.str_col = Some(str_col);
        self.int_col = Some(int_col);
        self.float_col = Some(float_col);
        self.bool_col = Some(bool_col);
        self.temporal_col = Some(temporal_col);
        self
    }

    /// Return the color for a column dtype when column_colors is enabled.
    /// Unmapped dtypes (nested types, etc.) fall through to `None`.
    fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
        if !self.column_colors {
            return None;
        }
        match dtype {
            DataType::String => self.str_col,
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64 => self.int_col,
            DataType::Float32 | DataType::Float64 => self.float_col,
            DataType::Boolean => self.bool_col,
            DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
                self.temporal_col
            }
            _ => None,
        }
    }

    /// Render `df` as a ratatui `Table` into `area`.
    ///
    /// Columns are sized to their widest visible cell and added left to
    /// right until the area width is exhausted; a string column that would
    /// overflow is truncated to the remaining width, any other overflowing
    /// column is dropped. The two trailing `_`-prefixed parameters are
    /// currently unused by this function.
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        // make each column as wide as it needs to be to fit the content
        let (height, cols) = df.shape();

        // widths starts at the length of each column name (header must fit)
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        // rows is a vector initialized to a vector of length "height" empty rows
        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // One screen line is reserved for the header row.
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            // Build this column's cells while tracking its widest value.
            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                // Nulls render as empty cells rather than "null".
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            // Use > not >= so the last column is shown when it fits exactly (no padding needed after it)
            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // Overflowing string column: truncate it to the leftover
                // width and stop adding columns.
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                // Non-string column that doesn't fit: drop it and the rest.
                break;
            }
        }

        widths.truncate(visible_columns);
        // convert rows to a vector of Row, with optional alternate row background
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                // Odd rows get the alternate background when configured.
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // Color::Reset as header_bg means "terminal default" — skip bg().
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }

    /// Render the row-number gutter: a header-styled spacer line followed by
    /// right-aligned row numbers whose backgrounds mirror the main table's
    /// zebra striping.
    fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
        // Header row: same style as the rest of the column headers (fill full width so color matches)
        let header_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let header_fill = " ".repeat(area.width as usize);
        Paragraph::new(header_fill).style(header_style).render(
            Rect {
                x: area.x,
                y: area.y,
                width: area.width,
                height: 1,
            },
            buf,
        );

        // Only render up to the actual number of rows in the data
        let rows_to_render = params
            .visible_rows
            .min(params.num_rows.saturating_sub(params.start_row));

        if rows_to_render == 0 {
            return;
        }

        // Calculate width needed for largest row number
        let max_row_num =
            params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
        let max_width = max_row_num.to_string().len();

        // Render row numbers
        for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
            let row_num = params.start_row + row_idx + params.row_start_index;
            let row_num_text = row_num.to_string();

            // Right-align row numbers within the available width
            let padding = max_width.saturating_sub(row_num_text.len());
            let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);

            // Match main table background: default when row is even (or no alternate);
            // when alternate_row_bg is set, odd rows use that background.
            // When selected: same background as row (no inversion), foreground = terminal default.
            let is_selected = params.selected_row == Some(row_idx);
            let (fg, bg) = if is_selected {
                (
                    Color::Reset,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            } else {
                (
                    self.row_numbers_fg,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            };
            let row_num_style = match bg {
                Some(bg_color) => Style::default().fg(fg).bg(bg_color),
                None => Style::default().fg(fg),
            };

            let y = area.y + row_idx as u16 + 1; // +1 for header row
            if y < area.y + area.height {
                Paragraph::new(padded_text).style(row_num_style).render(
                    Rect {
                        x: area.x,
                        y,
                        width: area.width,
                        height: 1,
                    },
                    buf,
                );
            }
        }
    }
}
3634
3635impl StatefulWidget for DataTable {
3636    type State = DataTableState;
3637
3638    fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
3639        state.visible_termcols = area.width as usize;
3640        let new_visible_rows = if area.height > 0 {
3641            (area.height - 1) as usize
3642        } else {
3643            0
3644        };
3645        let needs_collect = new_visible_rows != state.visible_rows;
3646        state.visible_rows = new_visible_rows;
3647
3648        if let Some(selected) = state.table_state.selected() {
3649            if selected >= state.visible_rows {
3650                state.table_state.select(Some(state.visible_rows - 1))
3651            }
3652        }
3653
3654        if needs_collect {
3655            state.collect();
3656        }
3657
3658        // Only show errors in main view if not suppressed (e.g., when query input is active)
3659        // Query errors should only be shown in the query input frame
3660        if let Some(error) = state.error.as_ref() {
3661            if !state.suppress_error_display {
3662                Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
3663                    .centered()
3664                    .block(
3665                        Block::default()
3666                            .borders(Borders::NONE)
3667                            .padding(Padding::top(area.height / 2)),
3668                    )
3669                    .wrap(ratatui::widgets::Wrap { trim: true })
3670                    .render(area, buf);
3671                return;
3672            }
3673            // If suppress_error_display is true, continue rendering the table normally
3674        }
3675
3676        // Calculate row number column width if enabled
3677        let row_num_width = if state.row_numbers {
3678            let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; // +1 for 1-based, +1 for potential
3679            max_row_num.to_string().len().max(1) as u16 + 1 // +1 for spacing
3680        } else {
3681            0
3682        };
3683
3684        // Calculate locked columns width if any
3685        let mut locked_width = row_num_width;
3686        if let Some(locked_df) = state.locked_df.as_ref() {
3687            let (_, cols) = locked_df.shape();
3688            for col_index in 0..cols {
3689                let col_name = locked_df.get_column_names()[col_index];
3690                let mut max_len = col_name.chars().count() as u16;
3691                let col_data = &locked_df[col_index];
3692                for row_index in 0..locked_df.height().min(state.visible_rows) {
3693                    let value = col_data.get(row_index).unwrap();
3694                    let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
3695                        Cow::Borrowed("")
3696                    } else {
3697                        value.str_value()
3698                    };
3699                    let len = val_str.chars().count() as u16;
3700                    max_len = max_len.max(len);
3701                }
3702                locked_width += max_len + 1;
3703            }
3704        }
3705
3706        // Split area into locked and scrollable parts
3707        if locked_width > row_num_width && locked_width < area.width {
3708            let locked_area = Rect {
3709                x: area.x,
3710                y: area.y,
3711                width: locked_width,
3712                height: area.height,
3713            };
3714            let separator_x = locked_area.x + locked_area.width;
3715
3716            // If row numbers are enabled, render them first in a separate area
3717            if state.row_numbers {
3718                let row_num_area = Rect {
3719                    x: area.x,
3720                    y: area.y,
3721                    width: row_num_width,
3722                    height: area.height,
3723                };
3724                self.render_row_numbers(
3725                    row_num_area,
3726                    buf,
3727                    RowNumbersParams {
3728                        start_row: state.start_row,
3729                        visible_rows: state.visible_rows,
3730                        num_rows: state.num_rows,
3731                        row_start_index: state.row_start_index,
3732                        selected_row: state.table_state.selected(),
3733                    },
3734                );
3735            }
3736            let scrollable_area = Rect {
3737                x: separator_x + 1,
3738                y: area.y,
3739                width: area.width.saturating_sub(locked_width + 1),
3740                height: area.height,
3741            };
3742
3743            // Render locked columns (no background shading, just the vertical separator)
3744            if let Some(locked_df) = state.locked_df.as_ref() {
3745                // Adjust locked_area to account for row numbers if present
3746                let adjusted_locked_area = if state.row_numbers {
3747                    Rect {
3748                        x: area.x + row_num_width,
3749                        y: area.y,
3750                        width: locked_width - row_num_width,
3751                        height: area.height,
3752                    }
3753                } else {
3754                    locked_area
3755                };
3756
3757                // Slice buffer to visible portion
3758                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3759                let slice_len = state
3760                    .visible_rows
3761                    .min(locked_df.height().saturating_sub(offset));
3762                if offset < locked_df.height() && slice_len > 0 {
3763                    let sliced_df = locked_df.slice(offset as i64, slice_len);
3764                    self.render_dataframe(
3765                        &sliced_df,
3766                        adjusted_locked_area,
3767                        buf,
3768                        &mut state.table_state,
3769                        false,
3770                        state.start_row,
3771                    );
3772                }
3773            }
3774
3775            // Draw vertical separator line
3776            let separator_x_adjusted = if state.row_numbers {
3777                area.x + row_num_width + (locked_width - row_num_width)
3778            } else {
3779                separator_x
3780            };
3781            for y in area.y..area.y + area.height {
3782                let cell = &mut buf[(separator_x_adjusted, y)];
3783                cell.set_char('│');
3784                cell.set_style(Style::default().fg(self.separator_fg));
3785            }
3786
3787            // Adjust scrollable area to account for row numbers
3788            let adjusted_scrollable_area = if state.row_numbers {
3789                Rect {
3790                    x: separator_x_adjusted + 1,
3791                    y: area.y,
3792                    width: area.width.saturating_sub(locked_width + 1),
3793                    height: area.height,
3794                }
3795            } else {
3796                scrollable_area
3797            };
3798
3799            // Render scrollable columns
3800            if let Some(df) = state.df.as_ref() {
3801                // Slice buffer to visible portion
3802                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3803                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3804                if offset < df.height() && slice_len > 0 {
3805                    let sliced_df = df.slice(offset as i64, slice_len);
3806                    self.render_dataframe(
3807                        &sliced_df,
3808                        adjusted_scrollable_area,
3809                        buf,
3810                        &mut state.table_state,
3811                        false,
3812                        state.start_row,
3813                    );
3814                }
3815            }
3816        } else if let Some(df) = state.df.as_ref() {
3817            // No locked columns, render normally
3818            // If row numbers are enabled, render them first
3819            if state.row_numbers {
3820                let row_num_area = Rect {
3821                    x: area.x,
3822                    y: area.y,
3823                    width: row_num_width,
3824                    height: area.height,
3825                };
3826                self.render_row_numbers(
3827                    row_num_area,
3828                    buf,
3829                    RowNumbersParams {
3830                        start_row: state.start_row,
3831                        visible_rows: state.visible_rows,
3832                        num_rows: state.num_rows,
3833                        row_start_index: state.row_start_index,
3834                        selected_row: state.table_state.selected(),
3835                    },
3836                );
3837
3838                // Adjust data area to exclude row number column
3839                let data_area = Rect {
3840                    x: area.x + row_num_width,
3841                    y: area.y,
3842                    width: area.width.saturating_sub(row_num_width),
3843                    height: area.height,
3844                };
3845
3846                // Slice buffer to visible portion
3847                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3848                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3849                if offset < df.height() && slice_len > 0 {
3850                    let sliced_df = df.slice(offset as i64, slice_len);
3851                    self.render_dataframe(
3852                        &sliced_df,
3853                        data_area,
3854                        buf,
3855                        &mut state.table_state,
3856                        false,
3857                        state.start_row,
3858                    );
3859                }
3860            } else {
3861                // Slice buffer to visible portion
3862                let offset = state.start_row.saturating_sub(state.buffered_start_row);
3863                let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3864                if offset < df.height() && slice_len > 0 {
3865                    let sliced_df = df.slice(offset as i64, slice_len);
3866                    self.render_dataframe(
3867                        &sliced_df,
3868                        area,
3869                        buf,
3870                        &mut state.table_state,
3871                        false,
3872                        state.start_row,
3873                    );
3874                }
3875            }
3876        } else if !state.column_order.is_empty() {
3877            // Empty result (0 rows) but we have a schema - show empty table with header, no rows
3878            let empty_columns: Vec<_> = state
3879                .column_order
3880                .iter()
3881                .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
3882                .collect();
3883            if let Ok(empty_df) = DataFrame::new(empty_columns) {
3884                if state.row_numbers {
3885                    let row_num_area = Rect {
3886                        x: area.x,
3887                        y: area.y,
3888                        width: row_num_width,
3889                        height: area.height,
3890                    };
3891                    self.render_row_numbers(
3892                        row_num_area,
3893                        buf,
3894                        RowNumbersParams {
3895                            start_row: 0,
3896                            visible_rows: state.visible_rows,
3897                            num_rows: 0,
3898                            row_start_index: state.row_start_index,
3899                            selected_row: None,
3900                        },
3901                    );
3902                    let data_area = Rect {
3903                        x: area.x + row_num_width,
3904                        y: area.y,
3905                        width: area.width.saturating_sub(row_num_width),
3906                        height: area.height,
3907                    };
3908                    self.render_dataframe(
3909                        &empty_df,
3910                        data_area,
3911                        buf,
3912                        &mut state.table_state,
3913                        false,
3914                        0,
3915                    );
3916                } else {
3917                    self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
3918                }
3919            } else {
3920                Paragraph::new("No data").render(area, buf);
3921            }
3922        } else {
3923            // Truly empty: no schema, not loaded, or blank file
3924            Paragraph::new("No data").render(area, buf);
3925        }
3926    }
3927}
3928
3929#[cfg(test)]
3930mod tests {
3931    use super::*;
3932    use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
3933    use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
3934
3935    fn create_test_lf() -> LazyFrame {
3936        df! (
3937            "a" => &[1, 2, 3],
3938            "b" => &["x", "y", "z"]
3939        )
3940        .unwrap()
3941        .lazy()
3942    }
3943
3944    fn create_large_test_lf() -> LazyFrame {
3945        df! (
3946            "a" => (0..100).collect::<Vec<i32>>(),
3947            "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
3948            "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
3949            "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
3950        )
3951        .unwrap()
3952        .lazy()
3953    }
3954
3955    #[test]
3956    fn test_from_csv() {
3957        // Ensure sample data is generated before running test
3958        // Test uncompressed CSV loading
3959        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
3960        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
3961        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
3962    }
3963
3964    #[test]
3965    fn test_from_csv_gzipped() {
3966        // Ensure sample data is generated before running test
3967        // Test gzipped CSV loading
3968        let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
3969        let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); // Uses default buffer params from options
3970        assert_eq!(state.schema.len(), 6); // id, integer_col, float_col, string_col, boolean_col, date_col
3971    }
3972
3973    #[test]
3974    fn test_from_parquet() {
3975        // Ensure sample data is generated before running test
3976        let path = crate::tests::sample_data_dir().join("people.parquet");
3977        let state = DataTableState::from_parquet(&path, None, None, None, None, false, 1).unwrap();
3978        assert!(!state.schema.is_empty());
3979    }
3980
3981    #[test]
3982    fn test_from_ipc() {
3983        use polars::prelude::IpcWriter;
3984        use std::io::BufWriter;
3985        let mut df = df!(
3986            "x" => &[1_i32, 2, 3],
3987            "y" => &["a", "b", "c"]
3988        )
3989        .unwrap();
3990        let dir = std::env::temp_dir();
3991        let path = dir.join("datui_test_ipc.arrow");
3992        let file = std::fs::File::create(&path).unwrap();
3993        let mut writer = BufWriter::new(file);
3994        IpcWriter::new(&mut writer).finish(&mut df).unwrap();
3995        drop(writer);
3996        let state = DataTableState::from_ipc(&path, None, None, None, None, false, 1).unwrap();
3997        assert_eq!(state.schema.len(), 2);
3998        assert!(state.schema.contains("x"));
3999        assert!(state.schema.contains("y"));
4000        let _ = std::fs::remove_file(&path);
4001    }
4002
4003    #[test]
4004    fn test_from_avro() {
4005        use polars::io::avro::AvroWriter;
4006        use std::io::BufWriter;
4007        let mut df = df!(
4008            "id" => &[1_i32, 2, 3],
4009            "name" => &["alice", "bob", "carol"]
4010        )
4011        .unwrap();
4012        let dir = std::env::temp_dir();
4013        let path = dir.join("datui_test_avro.avro");
4014        let file = std::fs::File::create(&path).unwrap();
4015        let mut writer = BufWriter::new(file);
4016        AvroWriter::new(&mut writer).finish(&mut df).unwrap();
4017        drop(writer);
4018        let state = DataTableState::from_avro(&path, None, None, None, None, false, 1).unwrap();
4019        assert_eq!(state.schema.len(), 2);
4020        assert!(state.schema.contains("id"));
4021        assert!(state.schema.contains("name"));
4022        let _ = std::fs::remove_file(&path);
4023    }
4024
4025    #[test]
4026    fn test_from_orc() {
4027        use arrow::array::{Int64Array, StringArray};
4028        use arrow::datatypes::{DataType, Field, Schema};
4029        use arrow::record_batch::RecordBatch;
4030        use orc_rust::ArrowWriterBuilder;
4031        use std::io::BufWriter;
4032        use std::sync::Arc;
4033
4034        let schema = Arc::new(Schema::new(vec![
4035            Field::new("id", DataType::Int64, false),
4036            Field::new("name", DataType::Utf8, false),
4037        ]));
4038        let id_array = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
4039        let name_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
4040        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, name_array]).unwrap();
4041
4042        let dir = std::env::temp_dir();
4043        let path = dir.join("datui_test_orc.orc");
4044        let file = std::fs::File::create(&path).unwrap();
4045        let writer = BufWriter::new(file);
4046        let mut orc_writer = ArrowWriterBuilder::new(writer, schema).try_build().unwrap();
4047        orc_writer.write(&batch).unwrap();
4048        orc_writer.close().unwrap();
4049
4050        let state = DataTableState::from_orc(&path, None, None, None, None, false, 1).unwrap();
4051        assert_eq!(state.schema.len(), 2);
4052        assert!(state.schema.contains("id"));
4053        assert!(state.schema.contains("name"));
4054        let _ = std::fs::remove_file(&path);
4055    }
4056
4057    #[test]
4058    fn test_filter() {
4059        let lf = create_test_lf();
4060        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4061        let filters = vec![FilterStatement {
4062            column: "a".to_string(),
4063            operator: FilterOperator::Gt,
4064            value: "2".to_string(),
4065            logical_op: LogicalOperator::And,
4066        }];
4067        state.filter(filters);
4068        let df = state.lf.clone().collect().unwrap();
4069        assert_eq!(df.shape().0, 1);
4070        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4071    }
4072
4073    #[test]
4074    fn test_sort() {
4075        let lf = create_test_lf();
4076        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4077        state.sort(vec!["a".to_string()], false);
4078        let df = state.lf.clone().collect().unwrap();
4079        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4080    }
4081
4082    #[test]
4083    fn test_query() {
4084        let lf = create_test_lf();
4085        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4086        state.query("select b where a = 2".to_string());
4087        let df = state.lf.clone().collect().unwrap();
4088        assert_eq!(df.shape(), (1, 1));
4089        assert_eq!(
4090            df.column("b").unwrap().get(0).unwrap(),
4091            AnyValue::String("y")
4092        );
4093    }
4094
    #[test]
    fn test_query_date_accessors() {
        use chrono::NaiveDate;
        // Fixture: three 2024 dates (Jan 15, Jun 20, Dec 31) paired with names a/b/c.
        let df = df!(
            "event_date" => [
                NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
                NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
                NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
            ],
            "name" => &["a", "b", "c"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        // NOTE(review): later queries reference `event_date` even after an earlier
        // select dropped it, so `query` appears to re-run against the original
        // frame — i.e. each call below is independent. Confirm against `query`'s impl.

        // Select with date accessors
        state.query("select name, year: event_date.year, month: event_date.month".to_string());
        assert!(
            state.error.is_none(),
            "query should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (3, 3));
        // `.year` materializes as Int32 and `.month` as Int8, per the asserts below.
        assert_eq!(
            df.column("year").unwrap().get(0).unwrap(),
            AnyValue::Int32(2024)
        );
        assert_eq!(
            df.column("month").unwrap().get(0).unwrap(),
            AnyValue::Int8(1)
        );
        assert_eq!(
            df.column("month").unwrap().get(1).unwrap(),
            AnyValue::Int8(6)
        );

        // Filter with date accessor
        state.query("select name, event_date where event_date.month = 12".to_string());
        assert!(
            state.error.is_none(),
            "filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        // Only 2024-12-31 has month == 12, so exactly one row ("c") survives.
        assert_eq!(df.height(), 1);
        assert_eq!(
            df.column("name").unwrap().get(0).unwrap(),
            AnyValue::String("c")
        );

        // Filter with YYYY.MM.DD date literal
        state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
        assert!(
            state.error.is_none(),
            "date literal filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(
            df.height(),
            2,
            "2024-06-20 and 2024-12-31 are after 2024-06-15"
        );

        // String accessors: upper, lower, len, ends_with
        state.query(
            "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
                .to_string(),
        );
        assert!(
            state.error.is_none(),
            "string accessors should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
        assert_eq!(
            df.column("upper_name").unwrap().get(0).unwrap(),
            AnyValue::String("C")
        );

        // Query that returns 0 rows: df and locked_df must be cleared for correct empty-table render
        state.query("select where event_date.date = 2020.01.01".to_string());
        assert!(state.error.is_none());
        assert_eq!(state.num_rows, 0);
        state.visible_rows = 10;
        state.collect();
        assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
        assert!(
            state.locked_df.is_none(),
            "locked_df must be cleared when num_rows is 0"
        );
    }
4189
4190    #[test]
4191    fn test_select_next_previous() {
4192        let lf = create_large_test_lf();
4193        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4194        state.visible_rows = 10;
4195        state.table_state.select(Some(5));
4196
4197        state.select_next();
4198        assert_eq!(state.table_state.selected(), Some(6));
4199
4200        state.select_previous();
4201        assert_eq!(state.table_state.selected(), Some(5));
4202    }
4203
4204    #[test]
4205    fn test_page_up_down() {
4206        let lf = create_large_test_lf();
4207        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4208        state.visible_rows = 20;
4209        state.collect();
4210
4211        assert_eq!(state.start_row, 0);
4212        state.page_down();
4213        assert_eq!(state.start_row, 20);
4214        state.page_down();
4215        assert_eq!(state.start_row, 40);
4216        state.page_up();
4217        assert_eq!(state.start_row, 20);
4218        state.page_up();
4219        assert_eq!(state.start_row, 0);
4220    }
4221
4222    #[test]
4223    fn test_scroll_left_right() {
4224        let lf = create_large_test_lf();
4225        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4226        assert_eq!(state.termcol_index, 0);
4227        state.scroll_right();
4228        assert_eq!(state.termcol_index, 1);
4229        state.scroll_right();
4230        assert_eq!(state.termcol_index, 2);
4231        state.scroll_left();
4232        assert_eq!(state.termcol_index, 1);
4233        state.scroll_left();
4234        assert_eq!(state.termcol_index, 0);
4235    }
4236
4237    #[test]
4238    fn test_reverse() {
4239        let lf = create_test_lf();
4240        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4241        state.sort(vec!["a".to_string()], true);
4242        assert_eq!(
4243            state
4244                .lf
4245                .clone()
4246                .collect()
4247                .unwrap()
4248                .column("a")
4249                .unwrap()
4250                .get(0)
4251                .unwrap(),
4252            AnyValue::Int32(1)
4253        );
4254        state.reverse();
4255        assert_eq!(
4256            state
4257                .lf
4258                .clone()
4259                .collect()
4260                .unwrap()
4261                .column("a")
4262                .unwrap()
4263                .get(0)
4264                .unwrap(),
4265            AnyValue::Int32(3)
4266        );
4267    }
4268
4269    #[test]
4270    fn test_filter_multiple() {
4271        let lf = create_large_test_lf();
4272        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4273        let filters = vec![
4274            FilterStatement {
4275                column: "c".to_string(),
4276                operator: FilterOperator::Eq,
4277                value: "1".to_string(),
4278                logical_op: LogicalOperator::And,
4279            },
4280            FilterStatement {
4281                column: "d".to_string(),
4282                operator: FilterOperator::Eq,
4283                value: "2".to_string(),
4284                logical_op: LogicalOperator::And,
4285            },
4286        ];
4287        state.filter(filters);
4288        let df = state.lf.clone().collect().unwrap();
4289        assert_eq!(df.shape().0, 7);
4290    }
4291
4292    #[test]
4293    fn test_filter_and_sort() {
4294        let lf = create_large_test_lf();
4295        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4296        let filters = vec![FilterStatement {
4297            column: "c".to_string(),
4298            operator: FilterOperator::Eq,
4299            value: "1".to_string(),
4300            logical_op: LogicalOperator::And,
4301        }];
4302        state.filter(filters);
4303        state.sort(vec!["a".to_string()], false);
4304        let df = state.lf.clone().collect().unwrap();
4305        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
4306    }
4307
4308    /// Minimal long-format data for pivot tests: id, date, key, value.
4309    /// Includes duplicates for aggregation (e.g. (1,d1,A) appears twice).
4310    fn create_pivot_long_lf() -> LazyFrame {
4311        let df = df!(
4312            "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
4313            "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
4314            "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
4315            "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
4316        )
4317        .unwrap();
4318        df.lazy()
4319    }
4320
4321    /// Wide-format data for melt tests: id, date, c1, c2, c3.
4322    fn create_melt_wide_lf() -> LazyFrame {
4323        let df = df!(
4324            "id" => &[1_i32, 2, 3],
4325            "date" => &["d1", "d2", "d3"],
4326            "c1" => &[10.0_f64, 20.0, 30.0],
4327            "c2" => &[11.0, 21.0, 31.0],
4328            "c3" => &[12.0, 22.0, 32.0],
4329        )
4330        .unwrap();
4331        df.lazy()
4332    }
4333
4334    #[test]
4335    fn test_pivot_basic() {
4336        let lf = create_pivot_long_lf();
4337        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4338        let spec = PivotSpec {
4339            index: vec!["id".to_string(), "date".to_string()],
4340            pivot_column: "key".to_string(),
4341            value_column: "value".to_string(),
4342            aggregation: PivotAggregation::Last,
4343            sort_columns: None,
4344        };
4345        state.pivot(&spec).unwrap();
4346        let df = state.lf.clone().collect().unwrap();
4347        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4348        assert!(names.contains(&"id"));
4349        assert!(names.contains(&"date"));
4350        assert!(names.contains(&"A"));
4351        assert!(names.contains(&"B"));
4352        assert!(names.contains(&"C"));
4353        assert_eq!(df.height(), 2);
4354    }
4355
4356    #[test]
4357    fn test_pivot_aggregation_last() {
4358        let lf = create_pivot_long_lf();
4359        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4360        let spec = PivotSpec {
4361            index: vec!["id".to_string(), "date".to_string()],
4362            pivot_column: "key".to_string(),
4363            value_column: "value".to_string(),
4364            aggregation: PivotAggregation::Last,
4365            sort_columns: None,
4366        };
4367        state.pivot(&spec).unwrap();
4368        let df = state.lf.clone().collect().unwrap();
4369        let a_col = df.column("A").unwrap();
4370        let row0 = a_col.get(0).unwrap();
4371        let row1 = a_col.get(1).unwrap();
4372        assert_eq!(row0, AnyValue::Float64(11.0));
4373        assert_eq!(row1, AnyValue::Float64(40.0));
4374    }
4375
4376    #[test]
4377    fn test_pivot_aggregation_first() {
4378        let lf = create_pivot_long_lf();
4379        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4380        let spec = PivotSpec {
4381            index: vec!["id".to_string(), "date".to_string()],
4382            pivot_column: "key".to_string(),
4383            value_column: "value".to_string(),
4384            aggregation: PivotAggregation::First,
4385            sort_columns: None,
4386        };
4387        state.pivot(&spec).unwrap();
4388        let df = state.lf.clone().collect().unwrap();
4389        let a_col = df.column("A").unwrap();
4390        assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
4391        assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
4392    }
4393
4394    #[test]
4395    fn test_pivot_aggregation_min_max() {
4396        let lf = create_pivot_long_lf();
4397        let mut state_min = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4398        state_min
4399            .pivot(&PivotSpec {
4400                index: vec!["id".to_string(), "date".to_string()],
4401                pivot_column: "key".to_string(),
4402                value_column: "value".to_string(),
4403                aggregation: PivotAggregation::Min,
4404                sort_columns: None,
4405            })
4406            .unwrap();
4407        let df_min = state_min.lf.clone().collect().unwrap();
4408        assert_eq!(
4409            df_min.column("A").unwrap().get(0).unwrap(),
4410            AnyValue::Float64(10.0)
4411        );
4412
4413        let mut state_max = DataTableState::new(lf, None, None, None, None, true).unwrap();
4414        state_max
4415            .pivot(&PivotSpec {
4416                index: vec!["id".to_string(), "date".to_string()],
4417                pivot_column: "key".to_string(),
4418                value_column: "value".to_string(),
4419                aggregation: PivotAggregation::Max,
4420                sort_columns: None,
4421            })
4422            .unwrap();
4423        let df_max = state_max.lf.clone().collect().unwrap();
4424        assert_eq!(
4425            df_max.column("A").unwrap().get(0).unwrap(),
4426            AnyValue::Float64(11.0)
4427        );
4428    }
4429
4430    #[test]
4431    fn test_pivot_aggregation_avg_count() {
4432        let lf = create_pivot_long_lf();
4433        let mut state_avg = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4434        state_avg
4435            .pivot(&PivotSpec {
4436                index: vec!["id".to_string(), "date".to_string()],
4437                pivot_column: "key".to_string(),
4438                value_column: "value".to_string(),
4439                aggregation: PivotAggregation::Avg,
4440                sort_columns: None,
4441            })
4442            .unwrap();
4443        let df_avg = state_avg.lf.clone().collect().unwrap();
4444        let a = df_avg.column("A").unwrap().get(0).unwrap();
4445        if let AnyValue::Float64(x) = a {
4446            assert!((x - 10.5).abs() < 1e-6);
4447        } else {
4448            panic!("expected float");
4449        }
4450
4451        let mut state_count = DataTableState::new(lf, None, None, None, None, true).unwrap();
4452        state_count
4453            .pivot(&PivotSpec {
4454                index: vec!["id".to_string(), "date".to_string()],
4455                pivot_column: "key".to_string(),
4456                value_column: "value".to_string(),
4457                aggregation: PivotAggregation::Count,
4458                sort_columns: None,
4459            })
4460            .unwrap();
4461        let df_count = state_count.lf.clone().collect().unwrap();
4462        let a = df_count.column("A").unwrap().get(0).unwrap();
4463        assert_eq!(a, AnyValue::UInt32(2));
4464    }
4465
4466    #[test]
4467    fn test_pivot_string_first_last() {
4468        let df = df!(
4469            "id" => &[1_i32, 1, 2, 2],
4470            "key" => &["X", "Y", "X", "Y"],
4471            "value" => &["low", "mid", "high", "mid"],
4472        )
4473        .unwrap();
4474        let lf = df.lazy();
4475        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4476        let spec = PivotSpec {
4477            index: vec!["id".to_string()],
4478            pivot_column: "key".to_string(),
4479            value_column: "value".to_string(),
4480            aggregation: PivotAggregation::Last,
4481            sort_columns: None,
4482        };
4483        state.pivot(&spec).unwrap();
4484        let out = state.lf.clone().collect().unwrap();
4485        assert_eq!(
4486            out.column("X").unwrap().get(0).unwrap(),
4487            AnyValue::String("low")
4488        );
4489        assert_eq!(
4490            out.column("Y").unwrap().get(0).unwrap(),
4491            AnyValue::String("mid")
4492        );
4493    }
4494
4495    #[test]
4496    fn test_melt_basic() {
4497        let lf = create_melt_wide_lf();
4498        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4499        let spec = MeltSpec {
4500            index: vec!["id".to_string(), "date".to_string()],
4501            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4502            variable_name: "variable".to_string(),
4503            value_name: "value".to_string(),
4504        };
4505        state.melt(&spec).unwrap();
4506        let df = state.lf.clone().collect().unwrap();
4507        assert_eq!(df.height(), 9);
4508        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4509        assert!(names.contains(&"variable"));
4510        assert!(names.contains(&"value"));
4511        assert!(names.contains(&"id"));
4512        assert!(names.contains(&"date"));
4513    }
4514
4515    #[test]
4516    fn test_melt_all_except_index() {
4517        let lf = create_melt_wide_lf();
4518        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4519        let spec = MeltSpec {
4520            index: vec!["id".to_string(), "date".to_string()],
4521            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4522            variable_name: "var".to_string(),
4523            value_name: "val".to_string(),
4524        };
4525        state.melt(&spec).unwrap();
4526        let df = state.lf.clone().collect().unwrap();
4527        assert!(df.column("var").is_ok());
4528        assert!(df.column("val").is_ok());
4529    }
4530
4531    #[test]
4532    fn test_pivot_on_current_view_after_filter() {
4533        let lf = create_pivot_long_lf();
4534        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4535        state.filter(vec![FilterStatement {
4536            column: "id".to_string(),
4537            operator: FilterOperator::Eq,
4538            value: "1".to_string(),
4539            logical_op: LogicalOperator::And,
4540        }]);
4541        let spec = PivotSpec {
4542            index: vec!["id".to_string(), "date".to_string()],
4543            pivot_column: "key".to_string(),
4544            value_column: "value".to_string(),
4545            aggregation: PivotAggregation::Last,
4546            sort_columns: None,
4547        };
4548        state.pivot(&spec).unwrap();
4549        let df = state.lf.clone().collect().unwrap();
4550        assert_eq!(df.height(), 1);
4551        let id_col = df.column("id").unwrap();
4552        assert_eq!(id_col.get(0).unwrap(), AnyValue::Int32(1));
4553    }
4554
4555    #[test]
4556    fn test_fuzzy_token_regex() {
4557        assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
4558        assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
4559        // Regex-special characters are escaped
4560        let pat = fuzzy_token_regex("[");
4561        assert!(pat.contains("\\["));
4562    }
4563
4564    #[test]
4565    fn test_fuzzy_search() {
4566        // Filter logic is covered by test_fuzzy_search_regex_direct. This test runs the full
4567        // path through DataTableState; it requires sample data (CSV with string column).
4568        crate::tests::ensure_sample_data();
4569        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
4570        let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
4571        state.visible_rows = 10;
4572        state.collect();
4573        let before = state.num_rows;
4574        state.fuzzy_search("string".to_string());
4575        assert!(state.error.is_none(), "{:?}", state.error);
4576        assert!(state.num_rows <= before, "fuzzy search should filter rows");
4577        state.fuzzy_search("".to_string());
4578        state.collect();
4579        assert_eq!(state.num_rows, before, "empty fuzzy search should reset");
4580        assert!(state.get_active_fuzzy_query().is_empty());
4581    }
4582
    #[test]
    fn test_fuzzy_search_regex_direct() {
        // Sanity check: Polars str().contains with our regex matches "alice" for pattern ".*a.*l.*i.*"
        let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
        let pattern = fuzzy_token_regex("alice");
        // Second arg `false` = no strict/literal mode; pattern is treated as a regex.
        let out = lf
            .filter(col("name").str().contains(lit(pattern.clone()), false))
            .collect()
            .unwrap();
        assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);

        // Two columns OR (as in fuzzy_search)
        let lf2 = df!(
            "id" => &[1i32, 2, 3],
            "name" => &["alice", "bob", "carol"],
            "city" => &["NYC", "LA", "Boston"]
        )
        .unwrap()
        .lazy();
        let pat = fuzzy_token_regex("alice");
        // A row matches if ANY string column matches — mirrors fuzzy_search's OR chain.
        let expr = col("name")
            .str()
            .contains(lit(pat.clone()), false)
            .or(col("city").str().contains(lit(pat), false));
        let out2 = lf2.clone().filter(expr).collect().unwrap();
        assert_eq!(out2.height(), 1);

        // Replicate exact fuzzy_search logic: schema from original_lf, string_cols, then filter
        // Step 1: discover the string-typed columns from the schema.
        let schema = lf2.clone().collect_schema().unwrap();
        let string_cols: Vec<String> = schema
            .iter()
            .filter(|(_, dtype)| dtype.is_string())
            .map(|(name, _)| name.to_string())
            .collect();
        assert!(
            !string_cols.is_empty(),
            "df! string cols should be detected"
        );
        // Step 2: OR-reduce one `contains` predicate per string column.
        let pattern = fuzzy_token_regex("alice");
        let token_expr = string_cols
            .iter()
            .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
            .reduce(|a, b| a.or(b))
            .unwrap();
        // Step 3: apply the combined predicate — only the "alice" row survives.
        let out3 = lf2.filter(token_expr).collect().unwrap();
        assert_eq!(
            out3.height(),
            1,
            "fuzzy_search-style filter should match 1 row"
        );
    }
4634
4635    #[test]
4636    fn test_fuzzy_search_no_string_columns() {
4637        let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
4638            .unwrap()
4639            .lazy();
4640        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4641        state.fuzzy_search("x".to_string());
4642        assert!(state.error.is_some());
4643    }
4644
4645    /// By-queries must produce results sorted by the group columns (age_group, then team)
4646    /// so that output order is deterministic and practical. Raw data is deliberately out of order.
4647    #[test]
4648    fn test_by_query_result_sorted_by_group_columns() {
4649        // Build a small table: age_group (1-5, out of order), team (Red/Blue/Green), score (0-100)
4650        let df = df!(
4651            "age_group" => &[3i64, 1, 5, 2, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
4652            "team" => &[
4653                "Red", "Blue", "Green", "Red", "Blue", "Green", "Green", "Red", "Blue",
4654                "Green", "Red", "Blue", "Red", "Blue", "Green",
4655            ],
4656            "score" => &[50.0f64, 10.0, 90.0, 20.0, 30.0, 40.0, 60.0, 70.0, 80.0, 15.0, 25.0, 35.0, 45.0, 55.0, 65.0],
4657        )
4658        .unwrap();
4659        let lf = df.lazy();
4660        let options = crate::OpenOptions::default();
4661        let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4662        state.query("select avg score by age_group, team".to_string());
4663        assert!(
4664            state.error.is_none(),
4665            "query should succeed: {:?}",
4666            state.error
4667        );
4668        let result = state.lf.collect().unwrap();
4669        // Result must be sorted by group columns (age_group, then team)
4670        let sorted = result
4671            .sort(
4672                ["age_group", "team"],
4673                SortMultipleOptions::default().with_order_descending(false),
4674            )
4675            .unwrap();
4676        assert_eq!(
4677            result, sorted,
4678            "by-query result must be sorted by (age_group, team)"
4679        );
4680    }
4681
4682    /// Computed group keys (e.g. Fare: 1+floor Fare % 25) must be sorted by their result column
4683    /// values, not by re-evaluating the expression on the result.
4684    #[test]
4685    fn test_by_query_computed_group_key_sorted_by_result_column() {
4686        let df = df!(
4687            "x" => &[7.0f64, 12.0, 3.0, 22.0, 17.0, 8.0],
4688            "v" => &[1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0],
4689        )
4690        .unwrap();
4691        let lf = df.lazy();
4692        let options = crate::OpenOptions::default();
4693        let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4694        // bucket: 1+floor(x)%3 -> values 1,2,3; raw x order 7,12,3,22,17,8 -> buckets 2,2,1,2,2,2
4695        state.query("select sum v by bucket: 1+floor x % 3".to_string());
4696        assert!(
4697            state.error.is_none(),
4698            "query should succeed: {:?}",
4699            state.error
4700        );
4701        let result = state.lf.collect().unwrap();
4702        let bucket = result.column("bucket").unwrap();
4703        // Must be sorted by bucket (1, 2, 3)
4704        for i in 1..result.height() {
4705            let prev: i64 = bucket.get(i - 1).unwrap().try_extract().unwrap_or(0);
4706            let curr: i64 = bucket.get(i).unwrap().try_extract().unwrap_or(0);
4707            assert!(
4708                curr >= prev,
4709                "bucket column must be sorted: {} then {}",
4710                prev,
4711                curr
4712            );
4713        }
4714    }
4715}