1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10 buffer::Buffer,
11 layout::Rect,
12 style::{Color, Modifier, Style},
13 text::{Line, Span},
14 widgets::{
15 Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16 },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions};
25use polars::io::csv::read::NullValues;
26use polars::lazy::frame::pivot::pivot_stable;
27use std::io::{BufReader, Read};
28
29use calamine::{open_workbook_auto, Data, Reader};
30use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
31use orc_rust::ArrowReaderBuilder;
32use tempfile::NamedTempFile;
33
34use arrow::array::types::{
35 Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
36 TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
37};
38use arrow::array::{Array, AsArray};
39use arrow::record_batch::RecordBatch;
40
41fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
42 let e = col(PlSmallStr::from_static(""));
43 let expr = match agg {
44 PivotAggregation::Last => e.last(),
45 PivotAggregation::First => e.first(),
46 PivotAggregation::Min => e.min(),
47 PivotAggregation::Max => e.max(),
48 PivotAggregation::Avg => e.mean(),
49 PivotAggregation::Med => e.median(),
50 PivotAggregation::Std => e.std(1),
51 PivotAggregation::Count => e.len(),
52 };
53 Ok(expr)
54}
55
/// All state backing the interactive table view: the lazy frame being
/// browsed, the untouched original it can be reset to, windowed row
/// buffering bookkeeping, and the currently active transformations
/// (filters, sorts, queries, grouping/drill-down, pivot/melt).
pub struct DataTableState {
    /// Frame with all active transformations applied; what gets collected.
    pub lf: LazyFrame,
    /// Frame exactly as loaded; `reset` restores `lf` from this.
    original_lf: LazyFrame,
    /// Currently materialized page of rows, if any.
    df: Option<DataFrame>,
    /// Materialized data for the locked (pinned) columns, if any.
    locked_df: Option<DataFrame>,
    pub table_state: TableState,
    /// First visible row (absolute index into the full result).
    pub start_row: usize,
    pub visible_rows: usize,
    /// Index of the first visible terminal column.
    pub termcol_index: usize,
    pub visible_termcols: usize,
    /// Last polars error, surfaced to the UI unless suppressed.
    pub error: Option<PolarsError>,
    pub suppress_error_display: bool,
    pub schema: Arc<Schema>,
    pub num_rows: usize,
    /// Whether `num_rows` reflects the current `lf`.
    num_rows_valid: bool,
    filters: Vec<FilterStatement>,
    sort_columns: Vec<String>,
    sort_ascending: bool,
    pub active_query: String,
    pub active_sql_query: String,
    pub active_fuzzy_query: String,
    /// Display order of columns (partition columns may be fronted).
    column_order: Vec<String>,
    locked_columns_count: usize,
    /// Group-by view, when active.
    grouped_lf: Option<LazyFrame>,
    drilled_down_group_index: Option<usize>,
    pub drilled_down_group_key: Option<Vec<String>>,
    pub drilled_down_group_key_columns: Option<Vec<String>>,
    /// Pages of rows to prefetch after / before the viewport.
    pages_lookahead: usize,
    pages_lookback: usize,
    /// Caps on the buffered window (rows / megabytes).
    max_buffered_rows: usize,
    max_buffered_mb: usize,
    buffered_start_row: usize,
    buffered_end_row: usize,
    buffered_df: Option<DataFrame>,
    proximity_threshold: usize,
    /// Show a synthetic row-number column starting at `row_start_index`.
    row_numbers: bool,
    row_start_index: usize,
    last_pivot_spec: Option<PivotSpec>,
    last_melt_spec: Option<MeltSpec>,
    pub partition_columns: Option<Vec<String>>,
    /// Keeps a decompressed temp file alive for as long as it is scanned.
    decompress_temp_file: Option<NamedTempFile>,
    pub polars_streaming: bool,
    /// Workaround toggle for pivoting on date-typed index columns.
    workaround_pivot_date_index: bool,
}
111
/// Column dtype inferred for an Excel column before it is converted into a
/// polars `Series` (see `excel_infer_column_type` / `excel_column_to_series`).
#[derive(Clone, Copy)]
enum ExcelColType {
    Int64,
    Float64,
    Boolean,
    Utf8,
    /// Datetime values that are all at midnight — stored as a pure date.
    Date,
    Datetime,
}
122
123impl DataTableState {
    /// Create a state from a [`LazyFrame`], eagerly resolving its schema.
    ///
    /// Buffering knobs default when `None`: lookahead/lookback pages = 3,
    /// `max_buffered_rows` = 100_000, `max_buffered_mb` = 512.
    ///
    /// # Errors
    /// Fails if the schema cannot be collected from `lf`.
    pub fn new(
        lf: LazyFrame,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        polars_streaming: bool,
    ) -> Result<Self> {
        let schema = lf.clone().collect_schema()?;
        // Initial display order is simply the schema order.
        let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: pages_lookahead.unwrap_or(3),
            pages_lookback: pages_lookback.unwrap_or(3),
            max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: false,
            row_start_index: 1,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns: None,
            decompress_temp_file: None,
            polars_streaming,
            workaround_pivot_date_index: true,
        })
    }
179
180 pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
182 let mut state = Self::new(
183 lf,
184 options.pages_lookahead,
185 options.pages_lookback,
186 options.max_buffered_rows,
187 options.max_buffered_mb,
188 options.polars_streaming,
189 )?;
190 state.row_numbers = options.row_numbers;
191 state.row_start_index = options.row_start_index;
192 state.workaround_pivot_date_index = options.workaround_pivot_date_index;
193 Ok(state)
194 }
195
    /// Build a state from a pre-computed schema, skipping the (potentially
    /// expensive) `collect_schema` that [`Self::new`] performs.
    ///
    /// When `partition_columns` is given, those columns are fronted in the
    /// display order, followed by the remaining schema columns.
    pub fn from_schema_and_lazyframe(
        schema: Arc<Schema>,
        lf: LazyFrame,
        options: &crate::OpenOptions,
        partition_columns: Option<Vec<String>>,
    ) -> Result<Self> {
        let column_order: Vec<String> = if let Some(ref part) = partition_columns {
            // Partition columns first, then the rest in schema order.
            let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
            let rest: Vec<String> = schema
                .iter_names()
                .map(|s| s.to_string())
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            part.iter().cloned().chain(rest).collect()
        } else {
            schema.iter_names().map(|s| s.to_string()).collect()
        };
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: options.pages_lookahead.unwrap_or(3),
            pages_lookback: options.pages_lookback.unwrap_or(3),
            max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: options.row_numbers,
            row_start_index: options.row_start_index,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns,
            decompress_temp_file: None,
            polars_streaming: options.polars_streaming,
            workaround_pivot_date_index: options.workaround_pivot_date_index,
        })
    }
261
    /// Restore `lf` to the originally loaded frame and drop every derived
    /// transformation: queries, filters, sorts, column locking,
    /// grouping/drill-down state, the buffered window, and scroll position.
    ///
    /// Does NOT clear `error` or the pivot/melt history — callers such as
    /// [`Self::reset`] handle those separately.
    fn reset_lf_to_original(&mut self) {
        self.invalidate_num_rows();
        self.lf = self.original_lf.clone();
        // Re-resolve the schema; fall back to an empty schema if it can no
        // longer be collected.
        self.schema = self
            .original_lf
            .clone()
            .collect_schema()
            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.locked_columns_count = 0;
        self.filters.clear();
        self.sort_columns.clear();
        self.sort_ascending = true;
        self.start_row = 0;
        self.termcol_index = 0;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        // Put the cursor back on the first row.
        self.table_state.select(Some(0));
    }
293
    /// Fully reset the view: restore the original frame, clear error and
    /// pivot/melt state, then re-collect the visible data.
    pub fn reset(&mut self) {
        self.reset_lf_to_original();
        self.error = None;
        self.suppress_error_display = false;
        self.last_pivot_spec = None;
        self.last_melt_spec = None;
        self.collect();
        // Jump back to the top once fresh data is available.
        if self.num_rows > 0 {
            self.start_row = 0;
        }
    }
305
306 pub fn from_parquet(
307 path: &Path,
308 pages_lookahead: Option<usize>,
309 pages_lookback: Option<usize>,
310 max_buffered_rows: Option<usize>,
311 max_buffered_mb: Option<usize>,
312 row_numbers: bool,
313 row_start_index: usize,
314 ) -> Result<Self> {
315 let path_str = path.as_os_str().to_string_lossy();
316 let is_glob = path_str.contains('*');
317 let pl_path = PlPath::Local(Arc::from(path));
318 let args = ScanArgsParquet {
319 glob: is_glob,
320 ..Default::default()
321 };
322 let lf = LazyFrame::scan_parquet(pl_path, args)?;
323 let mut state = Self::new(
324 lf,
325 pages_lookahead,
326 pages_lookback,
327 max_buffered_rows,
328 max_buffered_mb,
329 true,
330 )?;
331 state.row_numbers = row_numbers;
332 state.row_start_index = row_start_index;
333 Ok(state)
334 }
335
336 pub fn from_parquet_paths(
338 paths: &[impl AsRef<Path>],
339 pages_lookahead: Option<usize>,
340 pages_lookback: Option<usize>,
341 max_buffered_rows: Option<usize>,
342 max_buffered_mb: Option<usize>,
343 row_numbers: bool,
344 row_start_index: usize,
345 ) -> Result<Self> {
346 if paths.is_empty() {
347 return Err(color_eyre::eyre::eyre!("No paths provided"));
348 }
349 if paths.len() == 1 {
350 return Self::from_parquet(
351 paths[0].as_ref(),
352 pages_lookahead,
353 pages_lookback,
354 max_buffered_rows,
355 max_buffered_mb,
356 row_numbers,
357 row_start_index,
358 );
359 }
360 let mut lazy_frames = Vec::with_capacity(paths.len());
361 for p in paths {
362 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
363 let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
364 lazy_frames.push(lf);
365 }
366 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
367 let mut state = Self::new(
368 lf,
369 pages_lookahead,
370 pages_lookback,
371 max_buffered_rows,
372 max_buffered_mb,
373 true,
374 )?;
375 state.row_numbers = row_numbers;
376 state.row_start_index = row_start_index;
377 Ok(state)
378 }
379
380 pub fn from_ipc(
382 path: &Path,
383 pages_lookahead: Option<usize>,
384 pages_lookback: Option<usize>,
385 max_buffered_rows: Option<usize>,
386 max_buffered_mb: Option<usize>,
387 row_numbers: bool,
388 row_start_index: usize,
389 ) -> Result<Self> {
390 let pl_path = PlPath::Local(Arc::from(path));
391 let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
392 let mut state = Self::new(
393 lf,
394 pages_lookahead,
395 pages_lookback,
396 max_buffered_rows,
397 max_buffered_mb,
398 true,
399 )?;
400 state.row_numbers = row_numbers;
401 state.row_start_index = row_start_index;
402 Ok(state)
403 }
404
    /// Open several Arrow IPC files and lazily concatenate them.
    /// A single path delegates to [`Self::from_ipc`]; an empty slice is an
    /// error.
    pub fn from_ipc_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        if paths.is_empty() {
            return Err(color_eyre::eyre::eyre!("No paths provided"));
        }
        if paths.len() == 1 {
            return Self::from_ipc(
                paths[0].as_ref(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                row_numbers,
                row_start_index,
            );
        }
        let mut lazy_frames = Vec::with_capacity(paths.len());
        for p in paths {
            let pl_path = PlPath::Local(Arc::from(p.as_ref()));
            let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
            lazy_frames.push(lf);
        }
        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
448
449 pub fn from_avro(
451 path: &Path,
452 pages_lookahead: Option<usize>,
453 pages_lookback: Option<usize>,
454 max_buffered_rows: Option<usize>,
455 max_buffered_mb: Option<usize>,
456 row_numbers: bool,
457 row_start_index: usize,
458 ) -> Result<Self> {
459 let file = File::open(path)?;
460 let df = polars::io::avro::AvroReader::new(file).finish()?;
461 let lf = df.lazy();
462 let mut state = Self::new(
463 lf,
464 pages_lookahead,
465 pages_lookback,
466 max_buffered_rows,
467 max_buffered_mb,
468 true,
469 )?;
470 state.row_numbers = row_numbers;
471 state.row_start_index = row_start_index;
472 Ok(state)
473 }
474
    /// Load several Avro files (each read eagerly) and concatenate them.
    /// A single path delegates to [`Self::from_avro`]; an empty slice is an
    /// error.
    pub fn from_avro_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        if paths.is_empty() {
            return Err(color_eyre::eyre::eyre!("No paths provided"));
        }
        if paths.len() == 1 {
            return Self::from_avro(
                paths[0].as_ref(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                row_numbers,
                row_start_index,
            );
        }
        let mut lazy_frames = Vec::with_capacity(paths.len());
        for p in paths {
            // Each Avro file is materialized fully before concatenation.
            let file = File::open(p.as_ref())?;
            let df = polars::io::avro::AvroReader::new(file).finish()?;
            lazy_frames.push(df.lazy());
        }
        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
518
519 #[allow(clippy::too_many_arguments)]
    /// Load a worksheet from an Excel workbook into an eager DataFrame.
    ///
    /// `excel_sheet` selects the worksheet: a string that parses as `usize`
    /// is treated as a zero-based sheet index, anything else as a sheet
    /// name; `None` means the first sheet. The first row supplies headers
    /// (empty headers become `column_<n>`); per-column dtypes are inferred
    /// via [`Self::excel_infer_column_type`].
    pub fn from_excel(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        excel_sheet: Option<&str>,
    ) -> Result<Self> {
        let mut workbook =
            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
        let sheet_names = workbook.sheet_names().to_vec();
        if sheet_names.is_empty() {
            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
        }
        // Resolve the requested worksheet range.
        let range = if let Some(sheet_sel) = excel_sheet {
            if let Ok(idx) = sheet_sel.parse::<usize>() {
                workbook
                    .worksheet_range_at(idx)
                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            } else {
                workbook
                    .worksheet_range(sheet_sel)
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            }
        } else {
            workbook
                .worksheet_range_at(0)
                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
        };
        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
        // A sheet without any rows yields an empty frame.
        if rows.is_empty() {
            let empty_df = DataFrame::new(vec![])?;
            let mut state = Self::new(
                empty_df.lazy(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                true,
            )?;
            state.row_numbers = row_numbers;
            state.row_start_index = row_start_index;
            return Ok(state);
        }
        // First row is the header row.
        let headers: Vec<String> = rows[0]
            .iter()
            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
            .collect();
        let n_cols = headers.len();
        let mut series_vec = Vec::with_capacity(n_cols);
        for (col_idx, header) in headers.iter().enumerate() {
            // Rows may be ragged; missing trailing cells read as None.
            let col_cells: Vec<Option<&Data>> =
                rows[1..].iter().map(|row| row.get(col_idx)).collect();
            let inferred = Self::excel_infer_column_type(&col_cells);
            let name = if header.is_empty() {
                format!("column_{}", col_idx + 1)
            } else {
                header.clone()
            };
            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
            series_vec.push(series.into());
        }
        let df = DataFrame::new(series_vec)?;
        let mut state = Self::new(
            df.lazy(),
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
601
    /// Infer a single dtype for an Excel column from its (possibly missing)
    /// cells.
    ///
    /// Precedence is deliberate: any string cell dominates — unless every
    /// non-empty value parses as a datetime, in which case the column is
    /// promoted to Date/Datetime. Otherwise: integers, then datetimes, then
    /// floats (demoted to Int64 when all values are whole), then booleans;
    /// an all-empty column falls back to Utf8.
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        for cell in cells.iter().flatten() {
            if CalamineTrait::is_string(*cell) {
                // A single string cell decides the column; stop scanning.
                has_string = true;
                break;
            }
            // Datetime cells are also flagged as float here (NOTE(review):
            // presumably because Excel stores datetimes as serial numbers —
            // confirm intent).
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // A string column whose non-empty values all parse as datetimes
            // is promoted to a temporal type.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            ExcelColType::Int64
        } else if has_datetime {
            // All-midnight datetimes collapse to a pure date column.
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // Whole-valued floats (within 1e-10) display better as integers;
            // missing values do not veto (is_none_or).
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            ExcelColType::Utf8
        }
    }
673
674 fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
676 let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
677 cells
678 .iter()
679 .flatten()
680 .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
681 .all(|dt| dt.time() == midnight)
682 }
683
684 fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
686 use calamine::DataType;
687 if let Some(dt) = cell.as_datetime() {
688 return Some(dt);
689 }
690 let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
691 Self::parse_naive_datetime_str(s)
692 }
693
694 fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
696 let s = s.trim();
697 if s.is_empty() {
698 return None;
699 }
700 const FORMATS: &[&str] = &[
701 "%Y-%m-%dT%H:%M:%S%.f",
702 "%Y-%m-%dT%H:%M:%S",
703 "%Y-%m-%d %H:%M:%S%.f",
704 "%Y-%m-%d %H:%M:%S",
705 "%Y-%m-%d",
706 ];
707 for fmt in FORMATS {
708 if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
709 return Some(dt);
710 }
711 }
712 if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
713 return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
714 }
715 None
716 }
717
    /// Build a polars [`Series`] named `name` from Excel cells using the
    /// previously inferred `col_type`. Missing or non-convertible cells
    /// become nulls.
    ///
    /// # Errors
    /// Propagates polars cast failures for the Date/Datetime variants.
    fn excel_column_to_series(
        name: &str,
        cells: &[Option<&Data>],
        col_type: ExcelColType,
    ) -> Result<Series> {
        use calamine::DataType as CalamineTrait;
        use polars::datatypes::TimeUnit;
        let series = match col_type {
            ExcelColType::Int64 => {
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_i64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Float64 => {
                let v: Vec<Option<f64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_f64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Boolean => {
                let v: Vec<Option<bool>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.get_bool()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Utf8 => {
                let v: Vec<Option<String>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_string()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Date => {
                // Days since the Unix epoch, then cast to the Date dtype.
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
                let v: Vec<Option<i32>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| (dt.date() - epoch).num_days() as i32)
                    })
                    .collect();
                Series::new(name.into(), v).cast(&DataType::Date)?
            }
            ExcelColType::Datetime => {
                // Naive datetimes are interpreted as UTC to obtain an epoch
                // offset in microseconds, matching the target dtype below.
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| dt.and_utc().timestamp_micros())
                    })
                    .collect();
                Series::new(name.into(), v)
                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
            }
        };
        Ok(series)
    }
780
781 pub fn from_orc(
784 path: &Path,
785 pages_lookahead: Option<usize>,
786 pages_lookback: Option<usize>,
787 max_buffered_rows: Option<usize>,
788 max_buffered_mb: Option<usize>,
789 row_numbers: bool,
790 row_start_index: usize,
791 ) -> Result<Self> {
792 let file = File::open(path)?;
793 let reader = ArrowReaderBuilder::try_new(file)
794 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
795 .build();
796 let batches: Vec<RecordBatch> = reader
797 .collect::<std::result::Result<Vec<_>, _>>()
798 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
799 let df = Self::arrow_record_batches_to_dataframe(&batches)?;
800 let lf = df.lazy();
801 let mut state = Self::new(
802 lf,
803 pages_lookahead,
804 pages_lookback,
805 max_buffered_rows,
806 max_buffered_mb,
807 true,
808 )?;
809 state.row_numbers = row_numbers;
810 state.row_start_index = row_start_index;
811 Ok(state)
812 }
813
    /// Load several ORC files (each read eagerly) and concatenate them.
    /// A single path delegates to [`Self::from_orc`]; an empty slice is an
    /// error.
    pub fn from_orc_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        if paths.is_empty() {
            return Err(color_eyre::eyre::eyre!("No paths provided"));
        }
        if paths.len() == 1 {
            return Self::from_orc(
                paths[0].as_ref(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                row_numbers,
                row_start_index,
            );
        }
        let mut lazy_frames = Vec::with_capacity(paths.len());
        for p in paths {
            // Each ORC file is fully materialized before concatenation.
            let file = File::open(p.as_ref())?;
            let reader = ArrowReaderBuilder::try_new(file)
                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
                .build();
            let batches: Vec<RecordBatch> = reader
                .collect::<std::result::Result<Vec<_>, _>>()
                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
            let df = Self::arrow_record_batches_to_dataframe(&batches)?;
            lazy_frames.push(df.lazy());
        }
        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
863
864 fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
867 if batches.is_empty() {
868 return Ok(DataFrame::new(vec![])?);
869 }
870 let mut all_dfs = Vec::with_capacity(batches.len());
871 for batch in batches {
872 let n_cols = batch.num_columns();
873 let schema = batch.schema();
874 let mut series_vec = Vec::with_capacity(n_cols);
875 for (i, col) in batch.columns().iter().enumerate() {
876 let name = schema.field(i).name().as_str();
877 let s = Self::arrow_array_to_polars_series(name, col)?;
878 series_vec.push(s.into());
879 }
880 let df = DataFrame::new(series_vec)?;
881 all_dfs.push(df);
882 }
883 let mut out = all_dfs.remove(0);
884 for df in all_dfs {
885 out = out.vstack(&df)?;
886 }
887 Ok(out)
888 }
889
890 fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
891 use arrow::datatypes::DataType as ArrowDataType;
892 let len = array.len();
893 match array.data_type() {
894 ArrowDataType::Int8 => {
895 let a = array
896 .as_primitive_opt::<Int8Type>()
897 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
898 let v: Vec<Option<i8>> = (0..len)
899 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
900 .collect();
901 Ok(Series::new(name.into(), v))
902 }
903 ArrowDataType::Int16 => {
904 let a = array
905 .as_primitive_opt::<Int16Type>()
906 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
907 let v: Vec<Option<i16>> = (0..len)
908 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
909 .collect();
910 Ok(Series::new(name.into(), v))
911 }
912 ArrowDataType::Int32 => {
913 let a = array
914 .as_primitive_opt::<Int32Type>()
915 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
916 let v: Vec<Option<i32>> = (0..len)
917 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
918 .collect();
919 Ok(Series::new(name.into(), v))
920 }
921 ArrowDataType::Int64 => {
922 let a = array
923 .as_primitive_opt::<Int64Type>()
924 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
925 let v: Vec<Option<i64>> = (0..len)
926 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
927 .collect();
928 Ok(Series::new(name.into(), v))
929 }
930 ArrowDataType::UInt8 => {
931 let a = array
932 .as_primitive_opt::<UInt8Type>()
933 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
934 let v: Vec<Option<i64>> = (0..len)
935 .map(|i| {
936 if a.is_null(i) {
937 None
938 } else {
939 Some(a.value(i) as i64)
940 }
941 })
942 .collect();
943 Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
944 }
945 ArrowDataType::UInt16 => {
946 let a = array
947 .as_primitive_opt::<UInt16Type>()
948 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
949 let v: Vec<Option<i64>> = (0..len)
950 .map(|i| {
951 if a.is_null(i) {
952 None
953 } else {
954 Some(a.value(i) as i64)
955 }
956 })
957 .collect();
958 Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
959 }
960 ArrowDataType::UInt32 => {
961 let a = array
962 .as_primitive_opt::<UInt32Type>()
963 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
964 let v: Vec<Option<u32>> = (0..len)
965 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
966 .collect();
967 Ok(Series::new(name.into(), v))
968 }
969 ArrowDataType::UInt64 => {
970 let a = array
971 .as_primitive_opt::<UInt64Type>()
972 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
973 let v: Vec<Option<u64>> = (0..len)
974 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
975 .collect();
976 Ok(Series::new(name.into(), v))
977 }
978 ArrowDataType::Float32 => {
979 let a = array
980 .as_primitive_opt::<Float32Type>()
981 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
982 let v: Vec<Option<f32>> = (0..len)
983 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
984 .collect();
985 Ok(Series::new(name.into(), v))
986 }
987 ArrowDataType::Float64 => {
988 let a = array
989 .as_primitive_opt::<Float64Type>()
990 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
991 let v: Vec<Option<f64>> = (0..len)
992 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
993 .collect();
994 Ok(Series::new(name.into(), v))
995 }
996 ArrowDataType::Boolean => {
997 let a = array
998 .as_boolean_opt()
999 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
1000 let v: Vec<Option<bool>> = (0..len)
1001 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1002 .collect();
1003 Ok(Series::new(name.into(), v))
1004 }
1005 ArrowDataType::Utf8 => {
1006 let a = array
1007 .as_string_opt::<i32>()
1008 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
1009 let v: Vec<Option<String>> = (0..len)
1010 .map(|i| {
1011 if a.is_null(i) {
1012 None
1013 } else {
1014 Some(a.value(i).to_string())
1015 }
1016 })
1017 .collect();
1018 Ok(Series::new(name.into(), v))
1019 }
1020 ArrowDataType::LargeUtf8 => {
1021 let a = array
1022 .as_string_opt::<i64>()
1023 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
1024 let v: Vec<Option<String>> = (0..len)
1025 .map(|i| {
1026 if a.is_null(i) {
1027 None
1028 } else {
1029 Some(a.value(i).to_string())
1030 }
1031 })
1032 .collect();
1033 Ok(Series::new(name.into(), v))
1034 }
1035 ArrowDataType::Date32 => {
1036 let a = array
1037 .as_primitive_opt::<Date32Type>()
1038 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
1039 let v: Vec<Option<i32>> = (0..len)
1040 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1041 .collect();
1042 Ok(Series::new(name.into(), v))
1043 }
1044 ArrowDataType::Date64 => {
1045 let a = array
1046 .as_primitive_opt::<Date64Type>()
1047 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
1048 let v: Vec<Option<i64>> = (0..len)
1049 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1050 .collect();
1051 Ok(Series::new(name.into(), v))
1052 }
1053 ArrowDataType::Timestamp(_, _) => {
1054 let a = array
1055 .as_primitive_opt::<TimestampMillisecondType>()
1056 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
1057 let v: Vec<Option<i64>> = (0..len)
1058 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1059 .collect();
1060 Ok(Series::new(name.into(), v))
1061 }
1062 other => Err(color_eyre::eyre::eyre!(
1063 "ORC: unsupported column type {:?} for column '{}'",
1064 other,
1065 name
1066 )),
1067 }
1068 }
1069
1070 pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1073 let path_str = path.as_os_str().to_string_lossy();
1074 let is_glob = path_str.contains('*');
1075 let pl_path = PlPath::Local(Arc::from(path));
1076 let args = ScanArgsParquet {
1077 hive_options: HiveOptions::new_enabled(),
1078 glob: is_glob,
1079 ..Default::default()
1080 };
1081 LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1082 }
1083
1084 pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1086 let path_str = path.as_os_str().to_string_lossy();
1087 let is_glob = path_str.contains('*');
1088 let pl_path = PlPath::Local(Arc::from(path));
1089 let args = ScanArgsParquet {
1090 schema: Some(schema),
1091 hive_options: HiveOptions::new_enabled(),
1092 glob: is_glob,
1093 ..Default::default()
1094 };
1095 LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1096 }
1097
1098 fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
1101 const MAX_DEPTH: usize = 64;
1102 Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
1103 }
1104
    /// Walk one level of a hive directory looking for a `.parquet` file.
    ///
    /// A parquet file anywhere in the current directory wins immediately;
    /// otherwise the walk descends into the first `key=value` partition
    /// subdirectory encountered. Recursion depth is capped by `max_depth`
    /// (guards against pathological layouts, e.g. symlink cycles).
    fn first_parquet_file_spine(
        path: &Path,
        depth: usize,
        max_depth: usize,
    ) -> Option<std::path::PathBuf> {
        if depth >= max_depth {
            return None;
        }
        let entries = fs::read_dir(path).ok()?;
        let mut first_partition_child: Option<std::path::PathBuf> = None;
        for entry in entries.flatten() {
            let child = entry.path();
            if child.is_file() {
                if child
                    .extension()
                    .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
                {
                    // File found at this level — done.
                    return Some(child);
                }
            } else if child.is_dir() {
                // Remember only the first partition-style directory; files
                // later in the iteration still take precedence above.
                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
                    if name.contains('=') && first_partition_child.is_none() {
                        first_partition_child = Some(child);
                    }
                }
            }
        }
        first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
    }
1134
1135 fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1137 let file = File::open(path)?;
1138 let mut reader = ParquetReader::new(file);
1139 let arrow_schema = reader.schema()?;
1140 let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1141 Ok(Arc::new(schema))
1142 }
1143
1144 pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1148 let partition_columns = Self::discover_hive_partition_columns(path);
1149 let one_file = Self::first_parquet_file_in_hive_dir(path)
1150 .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1151 let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1152 let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1153 let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1154 for name in &partition_columns {
1155 merged.with_column(name.clone().into(), DataType::String);
1156 }
1157 for (name, dtype) in file_schema.iter() {
1158 if !part_set.contains(name.as_str()) {
1159 merged.with_column(name.clone(), dtype.clone());
1160 }
1161 }
1162 Ok((Arc::new(merged), partition_columns))
1163 }
1164
1165 pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1167 if path.is_dir() {
1168 Self::discover_partition_columns_from_path(path)
1169 } else {
1170 Self::discover_partition_columns_from_glob_pattern(path)
1171 }
1172 }
1173
1174 fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1179 const MAX_PARTITION_DEPTH: usize = 64;
1180 let mut columns = Vec::<String>::new();
1181 let mut seen = HashSet::<String>::new();
1182 Self::discover_partition_columns_spine(
1183 path,
1184 &mut columns,
1185 &mut seen,
1186 0,
1187 MAX_PARTITION_DEPTH,
1188 );
1189 columns
1190 }
1191
1192 fn discover_partition_columns_spine(
1196 path: &Path,
1197 columns: &mut Vec<String>,
1198 seen: &mut HashSet<String>,
1199 depth: usize,
1200 max_depth: usize,
1201 ) {
1202 if depth >= max_depth {
1203 return;
1204 }
1205 let Ok(entries) = fs::read_dir(path) else {
1206 return;
1207 };
1208 let mut first_partition_child: Option<std::path::PathBuf> = None;
1209 for entry in entries.flatten() {
1210 let child = entry.path();
1211 if child.is_dir() {
1212 if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1213 if let Some((key, _)) = name.split_once('=') {
1214 if !key.is_empty() && seen.insert(key.to_string()) {
1215 columns.push(key.to_string());
1216 }
1217 if first_partition_child.is_none() {
1218 first_partition_child = Some(child);
1219 }
1220 break;
1221 }
1222 }
1223 }
1224 }
1225 if let Some(one) = first_partition_child {
1226 Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1227 }
1228 }
1229
1230 fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1232 let path_str = path.as_os_str().to_string_lossy();
1233 let mut columns = Vec::<String>::new();
1234 let mut seen = HashSet::<String>::new();
1235 for segment in path_str.split('/') {
1236 if let Some((key, rest)) = segment.split_once('=') {
1237 if !key.is_empty()
1238 && (rest == "*" || !rest.contains('*'))
1239 && seen.insert(key.to_string())
1240 {
1241 columns.push(key.to_string());
1242 }
1243 }
1244 }
1245 columns
1246 }
1247
1248 pub fn from_parquet_hive(
1257 path: &Path,
1258 pages_lookahead: Option<usize>,
1259 pages_lookback: Option<usize>,
1260 max_buffered_rows: Option<usize>,
1261 max_buffered_mb: Option<usize>,
1262 row_numbers: bool,
1263 row_start_index: usize,
1264 ) -> Result<Self> {
1265 let path_str = path.as_os_str().to_string_lossy();
1266 let is_glob = path_str.contains('*');
1267 let pl_path = PlPath::Local(Arc::from(path));
1268 let args = ScanArgsParquet {
1269 hive_options: HiveOptions::new_enabled(),
1270 glob: is_glob,
1271 ..Default::default()
1272 };
1273 let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
1274 let schema = lf.collect_schema()?;
1275
1276 let mut discovered = if path.is_dir() {
1277 Self::discover_partition_columns_from_path(path)
1278 } else {
1279 Self::discover_partition_columns_from_glob_pattern(path)
1280 };
1281
1282 if discovered.is_empty() {
1285 let mut dir = path;
1286 while !dir.is_dir() {
1287 match dir.parent() {
1288 Some(p) => dir = p,
1289 None => break,
1290 }
1291 }
1292 if dir.is_dir() {
1293 discovered = Self::discover_partition_columns_from_path(dir);
1294 }
1295 }
1296
1297 let partition_columns: Vec<String> = discovered
1298 .into_iter()
1299 .filter(|c| schema.contains(c.as_str()))
1300 .collect();
1301
1302 let new_order: Vec<String> = if partition_columns.is_empty() {
1303 schema.iter_names().map(|s| s.to_string()).collect()
1304 } else {
1305 let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1306 let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
1307 let rest: Vec<String> = all_names
1308 .into_iter()
1309 .filter(|c| !part_set.contains(c.as_str()))
1310 .collect();
1311 partition_columns.iter().cloned().chain(rest).collect()
1312 };
1313
1314 if !partition_columns.is_empty() {
1315 let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
1316 lf = lf.select(exprs);
1317 }
1318
1319 let mut state = Self::new(
1320 lf,
1321 pages_lookahead,
1322 pages_lookback,
1323 max_buffered_rows,
1324 max_buffered_mb,
1325 true,
1326 )?;
1327 state.row_numbers = row_numbers;
1328 state.row_start_index = row_start_index;
1329 state.partition_columns = if partition_columns.is_empty() {
1330 None
1331 } else {
1332 Some(partition_columns)
1333 };
1334 state.set_column_order(new_order);
1336 Ok(state)
1337 }
1338
    /// Enables or disables the leading row-number column in the rendered table.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }
1342
    /// Flips the row-number column on or off.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }
1346
    /// Returns the index displayed for the first row (e.g. 0- or 1-based).
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1351
1352 fn decompress_compressed_csv_to_temp(
1354 path: &Path,
1355 compression: CompressionFormat,
1356 temp_dir: &Path,
1357 ) -> Result<NamedTempFile> {
1358 let mut temp = NamedTempFile::new_in(temp_dir)?;
1359 let out = temp.as_file_mut();
1360 let mut reader: Box<dyn Read> = match compression {
1361 CompressionFormat::Gzip => {
1362 let f = File::open(path)?;
1363 Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1364 }
1365 CompressionFormat::Zstd => {
1366 let f = File::open(path)?;
1367 Box::new(zstd::Decoder::new(BufReader::new(f))?)
1368 }
1369 CompressionFormat::Bzip2 => {
1370 let f = File::open(path)?;
1371 Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1372 }
1373 CompressionFormat::Xz => {
1374 let f = File::open(path)?;
1375 Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1376 }
1377 };
1378 std::io::copy(&mut reader, out)?;
1379 out.sync_all()?;
1380 Ok(temp)
1381 }
1382
1383 fn parse_null_value_specs(specs: &[String]) -> (Vec<String>, Vec<(String, String)>) {
1385 let mut global = Vec::new();
1386 let mut per_column = Vec::new();
1387 for s in specs {
1388 if let Some(i) = s.find('=') {
1389 let (col, val) = (s[..i].to_string(), s[i + 1..].to_string());
1390 per_column.push((col, val));
1391 } else {
1392 global.push(s.clone());
1393 }
1394 }
1395 (global, per_column)
1396 }
1397
1398 fn build_polars_null_values(
1400 global: &[String],
1401 per_column: &[(String, String)],
1402 schema: Option<&Schema>,
1403 ) -> Option<NullValues> {
1404 if global.is_empty() && per_column.is_empty() {
1405 return None;
1406 }
1407 if per_column.is_empty() {
1408 let vals: Vec<PlSmallStr> = global
1409 .iter()
1410 .map(|s| PlSmallStr::from(s.as_str()))
1411 .collect();
1412 return Some(if vals.len() == 1 {
1413 NullValues::AllColumnsSingle(vals[0].clone())
1414 } else {
1415 NullValues::AllColumns(vals)
1416 });
1417 }
1418 if global.is_empty() {
1419 let pairs: Vec<(PlSmallStr, PlSmallStr)> = per_column
1420 .iter()
1421 .map(|(c, v)| (PlSmallStr::from(c.as_str()), PlSmallStr::from(v.as_str())))
1422 .collect();
1423 return Some(NullValues::Named(pairs));
1424 }
1425 let schema = schema?;
1426 let mut pairs: Vec<(PlSmallStr, PlSmallStr)> = Vec::new();
1427 let first_global = PlSmallStr::from(global[0].as_str());
1428 for (name, _) in schema.iter() {
1429 let col_name = name.as_str();
1430 let val = per_column
1431 .iter()
1432 .rev()
1433 .find(|(c, _)| c == col_name)
1434 .map(|(_, v)| PlSmallStr::from(v.as_str()))
1435 .unwrap_or_else(|| first_global.clone());
1436 pairs.push((PlSmallStr::from(col_name), val));
1437 }
1438 Some(NullValues::Named(pairs))
1439 }
1440
1441 fn csv_schema_for_null_values(path: &Path, options: &OpenOptions) -> Result<Arc<Schema>> {
1443 let pl_path = PlPath::Local(Arc::from(path));
1444 let mut reader = LazyCsvReader::new(pl_path).with_n_rows(Some(1));
1445 if let Some(skip_lines) = options.skip_lines {
1446 reader = reader.with_skip_lines(skip_lines);
1447 }
1448 if let Some(skip_rows) = options.skip_rows {
1449 reader = reader.with_skip_rows(skip_rows);
1450 }
1451 if let Some(has_header) = options.has_header {
1452 reader = reader.with_has_header(has_header);
1453 }
1454 reader = reader.with_try_parse_dates(options.parse_dates);
1455 let mut lf = reader.finish()?;
1456 lf.collect_schema().map_err(color_eyre::eyre::Report::from)
1457 }
1458
1459 fn build_null_values_for_csv(
1461 options: &OpenOptions,
1462 path_for_schema: Option<&Path>,
1463 ) -> Result<Option<NullValues>> {
1464 let specs = match &options.null_values {
1465 None => return Ok(None),
1466 Some(s) if s.is_empty() => return Ok(None),
1467 Some(s) => s.as_slice(),
1468 };
1469 let (global, per_column) = Self::parse_null_value_specs(specs);
1470 let nv = if !global.is_empty() && !per_column.is_empty() {
1471 let path = path_for_schema.ok_or_else(|| {
1472 color_eyre::eyre::eyre!(
1473 "Internal error: path required for null_values with both global and per-column"
1474 )
1475 })?;
1476 let schema = Self::csv_schema_for_null_values(path, options)?;
1477 Self::build_polars_null_values(&global, &per_column, Some(schema.as_ref()))
1478 } else {
1479 Self::build_polars_null_values(&global, &per_column, None)
1480 };
1481 Ok(nv)
1482 }
1483
1484 pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1485 let nv = Self::build_null_values_for_csv(options, Some(path))?;
1486
1487 let compression = options
1489 .compression
1490 .or_else(|| CompressionFormat::from_extension(path));
1491
1492 if let Some(compression) = compression {
1493 if options.decompress_in_memory {
1494 match compression {
1496 CompressionFormat::Gzip | CompressionFormat::Zstd => {
1497 let mut read_options = CsvReadOptions::default();
1498 if let Some(skip_lines) = options.skip_lines {
1499 read_options.skip_lines = skip_lines;
1500 }
1501 if let Some(skip_rows) = options.skip_rows {
1502 read_options.skip_rows = skip_rows;
1503 }
1504 if let Some(has_header) = options.has_header {
1505 read_options.has_header = has_header;
1506 }
1507 read_options = read_options.map_parse_options(|opts| {
1508 let o = opts.with_try_parse_dates(options.parse_dates);
1509 match &nv {
1510 Some(n) => o.with_null_values(Some(n.clone())),
1511 None => o,
1512 }
1513 });
1514 let df = read_options
1515 .try_into_reader_with_file_path(Some(path.into()))?
1516 .finish()?;
1517 let lf = df.lazy();
1518 let mut state = Self::new(
1519 lf,
1520 options.pages_lookahead,
1521 options.pages_lookback,
1522 options.max_buffered_rows,
1523 options.max_buffered_mb,
1524 options.polars_streaming,
1525 )?;
1526 state.row_numbers = options.row_numbers;
1527 state.row_start_index = options.row_start_index;
1528 Ok(state)
1529 }
1530 CompressionFormat::Bzip2 => {
1531 let file = File::open(path)?;
1532 let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1533 let mut decompressed = Vec::new();
1534 decoder.read_to_end(&mut decompressed)?;
1535 let mut read_options = CsvReadOptions::default();
1536 if let Some(skip_lines) = options.skip_lines {
1537 read_options.skip_lines = skip_lines;
1538 }
1539 if let Some(skip_rows) = options.skip_rows {
1540 read_options.skip_rows = skip_rows;
1541 }
1542 if let Some(has_header) = options.has_header {
1543 read_options.has_header = has_header;
1544 }
1545 read_options = read_options.map_parse_options(|opts| {
1546 let o = opts.with_try_parse_dates(options.parse_dates);
1547 match &nv {
1548 Some(n) => o.with_null_values(Some(n.clone())),
1549 None => o,
1550 }
1551 });
1552 let df = CsvReader::new(std::io::Cursor::new(decompressed))
1553 .with_options(read_options)
1554 .finish()?;
1555 let lf = df.lazy();
1556 let mut state = Self::new(
1557 lf,
1558 options.pages_lookahead,
1559 options.pages_lookback,
1560 options.max_buffered_rows,
1561 options.max_buffered_mb,
1562 options.polars_streaming,
1563 )?;
1564 state.row_numbers = options.row_numbers;
1565 state.row_start_index = options.row_start_index;
1566 Ok(state)
1567 }
1568 CompressionFormat::Xz => {
1569 let file = File::open(path)?;
1570 let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1571 let mut decompressed = Vec::new();
1572 decoder.read_to_end(&mut decompressed)?;
1573 let mut read_options = CsvReadOptions::default();
1574 if let Some(skip_lines) = options.skip_lines {
1575 read_options.skip_lines = skip_lines;
1576 }
1577 if let Some(skip_rows) = options.skip_rows {
1578 read_options.skip_rows = skip_rows;
1579 }
1580 if let Some(has_header) = options.has_header {
1581 read_options.has_header = has_header;
1582 }
1583 read_options = read_options.map_parse_options(|opts| {
1584 let o = opts.with_try_parse_dates(options.parse_dates);
1585 match &nv {
1586 Some(n) => o.with_null_values(Some(n.clone())),
1587 None => o,
1588 }
1589 });
1590 let df = CsvReader::new(std::io::Cursor::new(decompressed))
1591 .with_options(read_options)
1592 .finish()?;
1593 let lf = df.lazy();
1594 let mut state = Self::new(
1595 lf,
1596 options.pages_lookahead,
1597 options.pages_lookback,
1598 options.max_buffered_rows,
1599 options.max_buffered_mb,
1600 options.polars_streaming,
1601 )?;
1602 state.row_numbers = options.row_numbers;
1603 state.row_start_index = options.row_start_index;
1604 Ok(state)
1605 }
1606 }
1607 } else {
1608 let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1610 let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1611 let nv_temp = Self::build_null_values_for_csv(options, Some(temp.path()))?;
1612 let mut state = Self::from_csv_customize(
1613 temp.path(),
1614 options.pages_lookahead,
1615 options.pages_lookback,
1616 options.max_buffered_rows,
1617 options.max_buffered_mb,
1618 |mut reader| {
1619 if let Some(skip_lines) = options.skip_lines {
1620 reader = reader.with_skip_lines(skip_lines);
1621 }
1622 if let Some(skip_rows) = options.skip_rows {
1623 reader = reader.with_skip_rows(skip_rows);
1624 }
1625 if let Some(has_header) = options.has_header {
1626 reader = reader.with_has_header(has_header);
1627 }
1628 reader = reader.with_try_parse_dates(options.parse_dates);
1629 reader = match &nv_temp {
1630 Some(n) => reader
1631 .map_parse_options(|opts| opts.with_null_values(Some(n.clone()))),
1632 None => reader,
1633 };
1634 reader
1635 },
1636 )?;
1637 state.row_numbers = options.row_numbers;
1638 state.row_start_index = options.row_start_index;
1639 state.decompress_temp_file = Some(temp);
1640 Ok(state)
1641 }
1642 } else {
1643 let mut state = Self::from_csv_customize(
1645 path,
1646 options.pages_lookahead,
1647 options.pages_lookback,
1648 options.max_buffered_rows,
1649 options.max_buffered_mb,
1650 |mut reader| {
1651 if let Some(skip_lines) = options.skip_lines {
1652 reader = reader.with_skip_lines(skip_lines);
1653 }
1654 if let Some(skip_rows) = options.skip_rows {
1655 reader = reader.with_skip_rows(skip_rows);
1656 }
1657 if let Some(has_header) = options.has_header {
1658 reader = reader.with_has_header(has_header);
1659 }
1660 reader = reader.with_try_parse_dates(options.parse_dates);
1661 reader = match &nv {
1662 Some(n) => {
1663 reader.map_parse_options(|opts| opts.with_null_values(Some(n.clone())))
1664 }
1665 None => reader,
1666 };
1667 reader
1668 },
1669 )?;
1670 state.row_numbers = options.row_numbers;
1671 Ok(state)
1672 }
1673 }
1674
1675 pub fn from_csv_customize<F>(
1676 path: &Path,
1677 pages_lookahead: Option<usize>,
1678 pages_lookback: Option<usize>,
1679 max_buffered_rows: Option<usize>,
1680 max_buffered_mb: Option<usize>,
1681 func: F,
1682 ) -> Result<Self>
1683 where
1684 F: FnOnce(LazyCsvReader) -> LazyCsvReader,
1685 {
1686 let pl_path = PlPath::Local(Arc::from(path));
1687 let reader = LazyCsvReader::new(pl_path);
1688 let lf = func(reader).finish()?;
1689 Self::new(
1690 lf,
1691 pages_lookahead,
1692 pages_lookback,
1693 max_buffered_rows,
1694 max_buffered_mb,
1695 true,
1696 )
1697 }
1698
1699 pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
1701 if paths.is_empty() {
1702 return Err(color_eyre::eyre::eyre!("No paths provided"));
1703 }
1704 if paths.len() == 1 {
1705 return Self::from_csv(paths[0].as_ref(), options);
1706 }
1707 let nv = Self::build_null_values_for_csv(options, Some(paths[0].as_ref()))?;
1708 let mut lazy_frames = Vec::with_capacity(paths.len());
1709 for p in paths {
1710 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1711 let mut reader = LazyCsvReader::new(pl_path);
1712 if let Some(skip_lines) = options.skip_lines {
1713 reader = reader.with_skip_lines(skip_lines);
1714 }
1715 if let Some(skip_rows) = options.skip_rows {
1716 reader = reader.with_skip_rows(skip_rows);
1717 }
1718 if let Some(has_header) = options.has_header {
1719 reader = reader.with_has_header(has_header);
1720 }
1721 reader = reader.with_try_parse_dates(options.parse_dates);
1722 reader = match &nv {
1723 Some(n) => reader.map_parse_options(|opts| opts.with_null_values(Some(n.clone()))),
1724 None => reader,
1725 };
1726 let lf = reader.finish()?;
1727 lazy_frames.push(lf);
1728 }
1729 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1730 let mut state = Self::new(
1731 lf,
1732 options.pages_lookahead,
1733 options.pages_lookback,
1734 options.max_buffered_rows,
1735 options.max_buffered_mb,
1736 options.polars_streaming,
1737 )?;
1738 state.row_numbers = options.row_numbers;
1739 state.row_start_index = options.row_start_index;
1740 Ok(state)
1741 }
1742
1743 pub fn from_ndjson(
1744 path: &Path,
1745 pages_lookahead: Option<usize>,
1746 pages_lookback: Option<usize>,
1747 max_buffered_rows: Option<usize>,
1748 max_buffered_mb: Option<usize>,
1749 row_numbers: bool,
1750 row_start_index: usize,
1751 ) -> Result<Self> {
1752 let pl_path = PlPath::Local(Arc::from(path));
1753 let lf = LazyJsonLineReader::new(pl_path).finish()?;
1754 let mut state = Self::new(
1755 lf,
1756 pages_lookahead,
1757 pages_lookback,
1758 max_buffered_rows,
1759 max_buffered_mb,
1760 true,
1761 )?;
1762 state.row_numbers = row_numbers;
1763 state.row_start_index = row_start_index;
1764 Ok(state)
1765 }
1766
1767 pub fn from_ndjson_paths(
1769 paths: &[impl AsRef<Path>],
1770 pages_lookahead: Option<usize>,
1771 pages_lookback: Option<usize>,
1772 max_buffered_rows: Option<usize>,
1773 max_buffered_mb: Option<usize>,
1774 row_numbers: bool,
1775 row_start_index: usize,
1776 ) -> Result<Self> {
1777 if paths.is_empty() {
1778 return Err(color_eyre::eyre::eyre!("No paths provided"));
1779 }
1780 if paths.len() == 1 {
1781 return Self::from_ndjson(
1782 paths[0].as_ref(),
1783 pages_lookahead,
1784 pages_lookback,
1785 max_buffered_rows,
1786 max_buffered_mb,
1787 row_numbers,
1788 row_start_index,
1789 );
1790 }
1791 let mut lazy_frames = Vec::with_capacity(paths.len());
1792 for p in paths {
1793 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1794 let lf = LazyJsonLineReader::new(pl_path).finish()?;
1795 lazy_frames.push(lf);
1796 }
1797 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1798 let mut state = Self::new(
1799 lf,
1800 pages_lookahead,
1801 pages_lookback,
1802 max_buffered_rows,
1803 max_buffered_mb,
1804 true,
1805 )?;
1806 state.row_numbers = row_numbers;
1807 state.row_start_index = row_start_index;
1808 Ok(state)
1809 }
1810
1811 pub fn from_json(
1812 path: &Path,
1813 pages_lookahead: Option<usize>,
1814 pages_lookback: Option<usize>,
1815 max_buffered_rows: Option<usize>,
1816 max_buffered_mb: Option<usize>,
1817 row_numbers: bool,
1818 row_start_index: usize,
1819 ) -> Result<Self> {
1820 Self::from_json_with_format(
1821 path,
1822 pages_lookahead,
1823 pages_lookback,
1824 max_buffered_rows,
1825 max_buffered_mb,
1826 row_numbers,
1827 row_start_index,
1828 JsonFormat::Json,
1829 )
1830 }
1831
1832 pub fn from_json_lines(
1833 path: &Path,
1834 pages_lookahead: Option<usize>,
1835 pages_lookback: Option<usize>,
1836 max_buffered_rows: Option<usize>,
1837 max_buffered_mb: Option<usize>,
1838 row_numbers: bool,
1839 row_start_index: usize,
1840 ) -> Result<Self> {
1841 Self::from_json_with_format(
1842 path,
1843 pages_lookahead,
1844 pages_lookback,
1845 max_buffered_rows,
1846 max_buffered_mb,
1847 row_numbers,
1848 row_start_index,
1849 JsonFormat::JsonLines,
1850 )
1851 }
1852
1853 #[allow(clippy::too_many_arguments)]
1854 fn from_json_with_format(
1855 path: &Path,
1856 pages_lookahead: Option<usize>,
1857 pages_lookback: Option<usize>,
1858 max_buffered_rows: Option<usize>,
1859 max_buffered_mb: Option<usize>,
1860 row_numbers: bool,
1861 row_start_index: usize,
1862 format: JsonFormat,
1863 ) -> Result<Self> {
1864 let file = File::open(path)?;
1865 let lf = JsonReader::new(file)
1866 .with_json_format(format)
1867 .finish()?
1868 .lazy();
1869 let mut state = Self::new(
1870 lf,
1871 pages_lookahead,
1872 pages_lookback,
1873 max_buffered_rows,
1874 max_buffered_mb,
1875 true,
1876 )?;
1877 state.row_numbers = row_numbers;
1878 state.row_start_index = row_start_index;
1879 Ok(state)
1880 }
1881
1882 pub fn from_json_paths(
1884 paths: &[impl AsRef<Path>],
1885 pages_lookahead: Option<usize>,
1886 pages_lookback: Option<usize>,
1887 max_buffered_rows: Option<usize>,
1888 max_buffered_mb: Option<usize>,
1889 row_numbers: bool,
1890 row_start_index: usize,
1891 ) -> Result<Self> {
1892 Self::from_json_with_format_paths(
1893 paths,
1894 pages_lookahead,
1895 pages_lookback,
1896 max_buffered_rows,
1897 max_buffered_mb,
1898 row_numbers,
1899 row_start_index,
1900 JsonFormat::Json,
1901 )
1902 }
1903
1904 pub fn from_json_lines_paths(
1906 paths: &[impl AsRef<Path>],
1907 pages_lookahead: Option<usize>,
1908 pages_lookback: Option<usize>,
1909 max_buffered_rows: Option<usize>,
1910 max_buffered_mb: Option<usize>,
1911 row_numbers: bool,
1912 row_start_index: usize,
1913 ) -> Result<Self> {
1914 Self::from_json_with_format_paths(
1915 paths,
1916 pages_lookahead,
1917 pages_lookback,
1918 max_buffered_rows,
1919 max_buffered_mb,
1920 row_numbers,
1921 row_start_index,
1922 JsonFormat::JsonLines,
1923 )
1924 }
1925
1926 #[allow(clippy::too_many_arguments)]
1927 fn from_json_with_format_paths(
1928 paths: &[impl AsRef<Path>],
1929 pages_lookahead: Option<usize>,
1930 pages_lookback: Option<usize>,
1931 max_buffered_rows: Option<usize>,
1932 max_buffered_mb: Option<usize>,
1933 row_numbers: bool,
1934 row_start_index: usize,
1935 format: JsonFormat,
1936 ) -> Result<Self> {
1937 if paths.is_empty() {
1938 return Err(color_eyre::eyre::eyre!("No paths provided"));
1939 }
1940 if paths.len() == 1 {
1941 return Self::from_json_with_format(
1942 paths[0].as_ref(),
1943 pages_lookahead,
1944 pages_lookback,
1945 max_buffered_rows,
1946 max_buffered_mb,
1947 row_numbers,
1948 row_start_index,
1949 format,
1950 );
1951 }
1952 let mut lazy_frames = Vec::with_capacity(paths.len());
1953 for p in paths {
1954 let file = File::open(p.as_ref())?;
1955 let lf = match &format {
1956 JsonFormat::Json => JsonReader::new(file)
1957 .with_json_format(JsonFormat::Json)
1958 .finish()?
1959 .lazy(),
1960 JsonFormat::JsonLines => JsonReader::new(file)
1961 .with_json_format(JsonFormat::JsonLines)
1962 .finish()?
1963 .lazy(),
1964 };
1965 lazy_frames.push(lf);
1966 }
1967 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1968 let mut state = Self::new(
1969 lf,
1970 pages_lookahead,
1971 pages_lookback,
1972 max_buffered_rows,
1973 max_buffered_mb,
1974 true,
1975 )?;
1976 state.row_numbers = row_numbers;
1977 state.row_start_index = row_start_index;
1978 Ok(state)
1979 }
1980
1981 #[allow(clippy::too_many_arguments)]
1982 pub fn from_delimited(
1983 path: &Path,
1984 delimiter: u8,
1985 pages_lookahead: Option<usize>,
1986 pages_lookback: Option<usize>,
1987 max_buffered_rows: Option<usize>,
1988 max_buffered_mb: Option<usize>,
1989 row_numbers: bool,
1990 row_start_index: usize,
1991 ) -> Result<Self> {
1992 let pl_path = PlPath::Local(Arc::from(path));
1993 let reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
1994 let lf = reader.finish()?;
1995 let mut state = Self::new(
1996 lf,
1997 pages_lookahead,
1998 pages_lookback,
1999 max_buffered_rows,
2000 max_buffered_mb,
2001 true,
2002 )?;
2003 state.row_numbers = row_numbers;
2004 state.row_start_index = row_start_index;
2005 Ok(state)
2006 }
2007
    /// Predicts whether scrolling by `rows` (negative = up) would move the
    /// view outside the currently buffered window and therefore force a
    /// potentially expensive `collect()`.
    ///
    /// NOTE(review): this mirrors the window math in `slide_table` — keep the
    /// two in sync when changing either.
    pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
        // Already at the top and scrolling up: nothing happens.
        if rows < 0 && self.start_row == 0 {
            return false;
        }
        let new_start_row = if self.start_row as i64 + rows <= 0 {
            0
        } else {
            if let Some(df) = self.df.as_ref() {
                // Scrolling down is a no-op when everything already fits on screen.
                if rows > 0 && df.shape().0 <= self.visible_rows {
                    return false;
                }
            }
            (self.start_row as i64 + rows) as usize
        };
        // End of the would-be view, clamped to the total row count.
        let view_end = new_start_row
            + self
                .visible_rows
                .min(self.num_rows.saturating_sub(new_start_row));
        // `buffered_end_row > 0` distinguishes an empty buffer from one at row 0.
        let within_buffer = new_start_row >= self.buffered_start_row
            && view_end <= self.buffered_end_row
            && self.buffered_end_row > 0;
        !within_buffer
    }
2033
    /// Scrolls the view by `rows` (negative = up), reloading the buffer only
    /// when the new view falls outside the buffered window.
    ///
    /// NOTE(review): the window math here mirrors
    /// `scroll_would_trigger_collect` — keep the two in sync.
    fn slide_table(&mut self, rows: i64) {
        // Already at the top and scrolling up: nothing to do.
        if rows < 0 && self.start_row == 0 {
            return;
        }

        let new_start_row = if self.start_row as i64 + rows <= 0 {
            0
        } else {
            if let Some(df) = self.df.as_ref() {
                // Scrolling down is a no-op when everything already fits on screen.
                if rows > 0 && df.shape().0 <= self.visible_rows {
                    return;
                }
            }
            (self.start_row as i64 + rows) as usize
        };

        // End of the new view, clamped to the total row count.
        let view_end = new_start_row
            + self
                .visible_rows
                .min(self.num_rows.saturating_sub(new_start_row));
        // `buffered_end_row > 0` distinguishes an empty buffer from one at row 0.
        let within_buffer = new_start_row >= self.buffered_start_row
            && view_end <= self.buffered_end_row
            && self.buffered_end_row > 0;

        if within_buffer {
            self.start_row = new_start_row;
            return;
        }

        // View left the buffered window: move it, then re-collect around it.
        self.start_row = new_start_row;
        self.collect();
    }
2067
2068 pub fn collect(&mut self) {
2069 if self.visible_rows > 0 {
2071 self.proximity_threshold = self.visible_rows;
2072 }
2073
2074 if !self.num_rows_valid {
2076 self.num_rows =
2077 match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
2078 Ok(df) => {
2079 if let Some(col) = df.get(0) {
2080 if let Some(AnyValue::UInt32(len)) = col.first() {
2081 *len as usize
2082 } else {
2083 0
2084 }
2085 } else {
2086 0
2087 }
2088 }
2089 Err(_) => 0,
2090 };
2091 self.num_rows_valid = true;
2092 }
2093
2094 if self.num_rows > 0 {
2095 let max_start = self.num_rows.saturating_sub(1);
2096 if self.start_row > max_start {
2097 self.start_row = max_start;
2098 }
2099 } else {
2100 self.start_row = 0;
2101 self.buffered_start_row = 0;
2102 self.buffered_end_row = 0;
2103 self.buffered_df = None;
2104 self.df = None;
2105 self.locked_df = None;
2106 return;
2107 }
2108
2109 let view_start = self.start_row;
2111 let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);
2112
2113 let within_buffer = view_start >= self.buffered_start_row
2115 && view_end <= self.buffered_end_row
2116 && self.buffered_end_row > 0;
2117
2118 let page_rows = self.visible_rows.max(1);
2121
2122 if within_buffer {
2123 let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
2124 let dist_to_end = self.buffered_end_row.saturating_sub(view_end);
2125
2126 let needs_expansion_back =
2127 dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
2128 let needs_expansion_forward =
2129 dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;
2130
2131 if !needs_expansion_back && !needs_expansion_forward {
2132 let expected_len = self
2134 .buffered_end_row
2135 .saturating_sub(self.buffered_start_row);
2136 if self
2137 .buffered_df
2138 .as_ref()
2139 .is_some_and(|b| b.height() == expected_len)
2140 {
2141 self.slice_buffer_into_display();
2142 if self.table_state.selected().is_none() {
2143 self.table_state.select(Some(0));
2144 }
2145 return;
2146 }
2147 self.load_buffer(self.buffered_start_row, self.buffered_end_row);
2148 if self.table_state.selected().is_none() {
2149 self.table_state.select(Some(0));
2150 }
2151 return;
2152 }
2153
2154 let mut new_buffer_start = if needs_expansion_back {
2155 view_start.saturating_sub(self.pages_lookback * page_rows)
2156 } else {
2157 self.buffered_start_row
2158 };
2159
2160 let mut new_buffer_end = if needs_expansion_forward {
2161 (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
2162 } else {
2163 self.buffered_end_row
2164 };
2165
2166 self.clamp_buffer_to_max_size(
2167 view_start,
2168 view_end,
2169 &mut new_buffer_start,
2170 &mut new_buffer_end,
2171 );
2172 self.load_buffer(new_buffer_start, new_buffer_end);
2173 } else {
2174 let mut new_buffer_start;
2179 let mut new_buffer_end;
2180
2181 let had_buffer = self.buffered_end_row > 0;
2182 let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
2183 let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;
2184
2185 let extend_forward_ok = scrolled_past_end
2186 && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
2187 let extend_backward_ok = scrolled_past_start
2188 && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;
2189
2190 if extend_forward_ok {
2191 new_buffer_start = self.buffered_start_row;
2193 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2194 } else if extend_backward_ok {
2195 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2197 new_buffer_end = self.buffered_end_row;
2198 } else if scrolled_past_end || scrolled_past_start {
2199 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2201 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2202 let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2203 let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2204 if current_len < min_initial_len {
2205 let need = min_initial_len.saturating_sub(current_len);
2206 let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2207 let can_extend_start = new_buffer_start;
2208 if can_extend_end >= need {
2209 new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2210 } else if can_extend_start >= need {
2211 new_buffer_start = new_buffer_start.saturating_sub(need);
2212 } else {
2213 new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2214 new_buffer_start =
2215 new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2216 }
2217 }
2218 } else {
2219 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2221 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2222
2223 let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2225 let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2226 if current_len < min_initial_len {
2227 let need = min_initial_len.saturating_sub(current_len);
2228 let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2229 let can_extend_start = new_buffer_start;
2230 if can_extend_end >= need {
2231 new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2232 } else if can_extend_start >= need {
2233 new_buffer_start = new_buffer_start.saturating_sub(need);
2234 } else {
2235 new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2236 new_buffer_start =
2237 new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2238 }
2239 }
2240 }
2241
2242 self.clamp_buffer_to_max_size(
2243 view_start,
2244 view_end,
2245 &mut new_buffer_start,
2246 &mut new_buffer_end,
2247 );
2248 self.load_buffer(new_buffer_start, new_buffer_end);
2249 }
2250
2251 self.slice_from_buffer();
2252 if self.table_state.selected().is_none() {
2253 self.table_state.select(Some(0));
2254 }
2255 }
2256
    /// Marks the cached total row count as stale so the next call to
    /// `ensure_num_rows()` recounts from the current `LazyFrame`.
    fn invalidate_num_rows(&mut self) {
        self.num_rows_valid = false;
    }
2261
2262 pub fn num_rows_if_valid(&self) -> Option<usize> {
2265 if self.num_rows_valid {
2266 Some(self.num_rows)
2267 } else {
2268 None
2269 }
2270 }
2271
    /// Shrinks a requested buffer window to at most `max_buffered_rows` rows
    /// while keeping the visible range `[view_start, view_end)` covered.
    ///
    /// A `max_buffered_rows` of 0 means "no cap". When the viewport itself is
    /// at least as large as the cap, the buffer is pinned to the viewport
    /// start; otherwise the spare capacity is split roughly evenly around the
    /// viewport and then re-clamped to still include `view_start`.
    fn clamp_buffer_to_max_size(
        &self,
        view_start: usize,
        view_end: usize,
        buffer_start: &mut usize,
        buffer_end: &mut usize,
    ) {
        // 0 disables the row cap entirely.
        if self.max_buffered_rows == 0 {
            return;
        }
        let max_len = self.max_buffered_rows;
        let requested_len = buffer_end.saturating_sub(*buffer_start);
        if requested_len <= max_len {
            return;
        }
        let view_len = view_end.saturating_sub(view_start);
        if view_len >= max_len {
            // Viewport alone meets/exceeds the cap: anchor at the viewport.
            *buffer_start = view_start;
            *buffer_end = (view_start + max_len).min(self.num_rows);
        } else {
            // Distribute spare capacity around the viewport, then make sure
            // the final window still starts at or before `view_start`.
            let half = (max_len - view_len) / 2;
            *buffer_end = (view_end + half).min(self.num_rows);
            *buffer_start = (*buffer_end).saturating_sub(max_len);
            if *buffer_start > view_start {
                *buffer_start = view_start;
            }
            *buffer_end = (*buffer_start + max_len).min(self.num_rows);
        }
    }
2302
    /// (Re)loads the row buffer covering `[buffer_start, buffer_end)` from the
    /// lazy frame, then splits it into the locked-column and scrollable-column
    /// display frames.
    ///
    /// On any Polars failure the error is stored in `self.error` and the
    /// previously buffered state is left untouched.
    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
        let buffer_size = buffer_end.saturating_sub(buffer_start);
        if buffer_size == 0 {
            return;
        }

        // Project every column in display order so both sub-frames below can
        // be selected out of a single materialized slice.
        let all_columns: Vec<_> = self
            .column_order
            .iter()
            .map(|name| col(name.as_str()))
            .collect();

        let use_streaming = self.polars_streaming;
        let mut full_df = match collect_lazy(
            self.lf
                .clone()
                .select(all_columns)
                .slice(buffer_start as i64, buffer_size as u32),
            use_streaming,
        ) {
            Ok(df) => df,
            Err(e) => {
                self.error = Some(e);
                return;
            }
        };

        // Enforce the memory cap (if configured) by trimming rows from the
        // end, estimating via the frame's average bytes-per-row.
        let mut effective_buffer_end = buffer_end;
        if self.max_buffered_mb > 0 {
            let size = full_df.estimated_size();
            let max_bytes = self.max_buffered_mb * 1024 * 1024;
            if size > max_bytes {
                let rows = full_df.height();
                if rows > 0 {
                    let bytes_per_row = size / rows;
                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
                    if max_rows < rows {
                        full_df = full_df.slice(0, max_rows);
                        effective_buffer_end = buffer_start + max_rows;
                    }
                }
            }
        }

        // Locked (pinned-left) columns, rendered via the grouped formatter
        // when the schema contains List columns.
        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            let locked_df = match full_df.select(locked_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.locked_df = if self.is_grouped() {
                match self.format_grouped_dataframe(locked_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(locked_df)
            };
        } else {
            self.locked_df = None;
        }

        // Scrollable columns start after the locked prefix plus the current
        // horizontal scroll offset (`termcol_index`).
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else {
            let scroll_df = match full_df.select(scroll_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.df = if self.is_grouped() {
                match self.format_grouped_dataframe(scroll_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(scroll_df)
            };
        }
        // Reaching this point means the load succeeded; clear any stale error.
        if self.error.is_some() {
            self.error = None;
        }
        self.buffered_start_row = buffer_start;
        self.buffered_end_row = effective_buffer_end;
        self.buffered_df = Some(full_df);
    }
2415
    /// Re-derives the locked and scrollable display frames from the already
    /// materialized `buffered_df`, without touching the lazy frame. Unlike
    /// `load_buffer`, projection failures here are swallowed and the previous
    /// display frames are simply kept.
    // NOTE(review): no caller is visible in this section of the file —
    // confirm it is wired up (e.g. from `slice_from_buffer`, currently empty).
    fn slice_buffer_into_display(&mut self) {
        let full_df = match self.buffered_df.as_ref() {
            Some(df) => df,
            None => return,
        };

        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            if let Ok(locked_df) = full_df.select(locked_names) {
                self.locked_df = if self.is_grouped() {
                    self.format_grouped_dataframe(locked_df).ok()
                } else {
                    Some(locked_df)
                };
            }
        } else {
            self.locked_df = None;
        }

        // Scrollable columns: skip the locked prefix plus the horizontal
        // scroll offset.
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else if let Ok(scroll_df) = full_df.select(scroll_names) {
            self.df = if self.is_grouped() {
                self.format_grouped_dataframe(scroll_df).ok()
            } else {
                Some(scroll_df)
            };
        }
    }
2457
    /// Currently a no-op, despite being called from the scrolling path.
    // NOTE(review): `slice_buffer_into_display` looks like the intended body
    // here — confirm whether this empty stub is deliberate or leftover.
    fn slice_from_buffer(&mut self) {
    }
2464
2465 fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
2466 let schema = df.schema();
2467 let mut new_series = Vec::new();
2468
2469 for (col_name, dtype) in schema.iter() {
2470 let col = df.column(col_name)?;
2471 if matches!(dtype, DataType::List(_)) {
2472 let string_series: Series = col
2473 .list()?
2474 .into_iter()
2475 .map(|opt_list| {
2476 opt_list.map(|list_series| {
2477 let values: Vec<String> = list_series
2478 .iter()
2479 .take(10)
2480 .map(|v| v.str_value().to_string())
2481 .collect();
2482 if list_series.len() > 10 {
2483 format!("[{}...] ({} items)", values.join(", "), list_series.len())
2484 } else {
2485 format!("[{}]", values.join(", "))
2486 }
2487 })
2488 })
2489 .collect();
2490 new_series.push(string_series.with_name(col_name.as_str().into()).into());
2491 } else {
2492 new_series.push(col.clone());
2493 }
2494 }
2495
2496 Ok(DataFrame::new(new_series)?)
2497 }
2498
2499 pub fn select_next(&mut self) {
2500 self.table_state.select_next();
2501 if let Some(selected) = self.table_state.selected() {
2502 if selected >= self.visible_rows && self.visible_rows > 0 {
2503 self.slide_table(1);
2504 }
2505 }
2506 }
2507
    /// Scrolls forward by one full viewport height.
    pub fn page_down(&mut self) {
        self.slide_table(self.visible_rows as i64);
    }
2511
2512 pub fn select_previous(&mut self) {
2513 if let Some(selected) = self.table_state.selected() {
2514 self.table_state.select_previous();
2515 if selected == 0 && self.start_row > 0 {
2516 self.slide_table(-1);
2517 }
2518 } else {
2519 self.table_state.select(Some(0));
2520 }
2521 }
2522
2523 pub fn scroll_to(&mut self, index: usize) {
2524 if self.start_row == index {
2525 return;
2526 }
2527
2528 if index == 0 {
2529 self.start_row = 0;
2530 self.collect();
2531 self.start_row = 0;
2532 } else {
2533 self.start_row = index;
2534 self.collect();
2535 }
2536 }
2537
2538 pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2541 self.ensure_num_rows();
2542 if self.num_rows == 0 || self.visible_rows == 0 {
2543 return;
2544 }
2545 let center_offset = self.visible_rows / 2;
2546 let mut start_row = row_index.saturating_sub(center_offset);
2547 let max_start = self.num_rows.saturating_sub(self.visible_rows);
2548 start_row = start_row.min(max_start);
2549
2550 if self.start_row == start_row {
2551 let display_idx = row_index
2552 .saturating_sub(start_row)
2553 .min(self.visible_rows.saturating_sub(1));
2554 self.table_state.select(Some(display_idx));
2555 return;
2556 }
2557
2558 self.start_row = start_row;
2559 self.collect();
2560 let display_idx = row_index
2561 .saturating_sub(start_row)
2562 .min(self.visible_rows.saturating_sub(1));
2563 self.table_state.select(Some(display_idx));
2564 }
2565
2566 fn ensure_num_rows(&mut self) {
2568 if self.num_rows_valid {
2569 return;
2570 }
2571 if self.visible_rows > 0 {
2572 self.proximity_threshold = self.visible_rows;
2573 }
2574 self.num_rows = match self.lf.clone().select([len()]).collect() {
2575 Ok(df) => {
2576 if let Some(col) = df.get(0) {
2577 if let Some(AnyValue::UInt32(len)) = col.first() {
2578 *len as usize
2579 } else {
2580 0
2581 }
2582 } else {
2583 0
2584 }
2585 }
2586 Err(_) => 0,
2587 };
2588 self.num_rows_valid = true;
2589 }
2590
2591 pub fn scroll_to_end(&mut self) {
2593 self.ensure_num_rows();
2594 if self.num_rows == 0 {
2595 self.start_row = 0;
2596 self.buffered_start_row = 0;
2597 self.buffered_end_row = 0;
2598 return;
2599 }
2600 let end_start = self.num_rows.saturating_sub(self.visible_rows);
2601 if self.start_row == end_start {
2602 self.select_last_visible_row();
2603 return;
2604 }
2605 self.start_row = end_start;
2606 self.collect();
2607 self.select_last_visible_row();
2608 }
2609
2610 fn select_last_visible_row(&mut self) {
2612 if self.num_rows == 0 {
2613 return;
2614 }
2615 let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
2616 let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
2617 self.table_state.select(Some(sel));
2618 }
2619
2620 pub fn half_page_down(&mut self) {
2621 let half = (self.visible_rows / 2).max(1) as i64;
2622 self.slide_table(half);
2623 }
2624
2625 pub fn half_page_up(&mut self) {
2626 if self.start_row == 0 {
2627 return;
2628 }
2629 let half = (self.visible_rows / 2).max(1) as i64;
2630 self.slide_table(-half);
2631 }
2632
2633 pub fn page_up(&mut self) {
2634 if self.start_row == 0 {
2635 return;
2636 }
2637 self.slide_table(-(self.visible_rows as i64));
2638 }
2639
2640 pub fn scroll_right(&mut self) {
2641 let max_scroll = self
2642 .column_order
2643 .len()
2644 .saturating_sub(self.locked_columns_count);
2645 if self.termcol_index < max_scroll.saturating_sub(1) {
2646 self.termcol_index += 1;
2647 self.collect();
2648 }
2649 }
2650
2651 pub fn scroll_left(&mut self) {
2652 if self.termcol_index > 0 {
2653 self.termcol_index -= 1;
2654 self.collect();
2655 }
2656 }
2657
2658 pub fn headers(&self) -> Vec<String> {
2659 self.column_order.clone()
2660 }
2661
    /// Replaces the display column order and forces a full re-read: the row
    /// buffer is discarded because its projection no longer matches.
    pub fn set_column_order(&mut self, order: Vec<String>) {
        self.column_order = order;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.collect();
    }
2669
    /// Pins the first `count` columns (clamped to the column count) to the
    /// left edge and re-reads the buffer with the new split.
    pub fn set_locked_columns(&mut self, count: usize) {
        self.locked_columns_count = count.min(self.column_order.len());
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.collect();
    }
2677
    /// Number of columns currently pinned to the left edge.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }
2681
    /// The currently active filter statements.
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }
2686
    /// Columns the view is currently sorted by (empty when unsorted).
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }
2690
    /// Current sort direction (`true` = ascending).
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }
2694
    /// Current display column order (locked columns first).
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }
2698
    /// The custom query string currently applied (empty when none).
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }
2702
    /// The SQL query currently applied (empty when none).
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }
2706
    /// The fuzzy-search query currently applied (empty when none).
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }
2710
    /// The most recent pivot specification, if the last reshape was a pivot.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }
2714
    /// The most recent melt specification, if the last reshape was a melt.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }
2718
2719 pub fn is_grouped(&self) -> bool {
2720 self.schema
2721 .iter()
2722 .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
2723 }
2724
2725 pub fn group_key_columns(&self) -> Vec<String> {
2726 self.schema
2727 .iter()
2728 .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
2729 .map(|(name, _)| name.to_string())
2730 .collect()
2731 }
2732
2733 pub fn group_value_columns(&self) -> Vec<String> {
2734 self.schema
2735 .iter()
2736 .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
2737 .map(|(name, _)| name.to_string())
2738 .collect()
2739 }
2740
2741 pub fn buffered_memory_bytes(&self) -> Option<usize> {
2743 let locked = self
2744 .locked_df
2745 .as_ref()
2746 .map(|df| df.estimated_size())
2747 .unwrap_or(0);
2748 let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
2749 if locked == 0 && scroll == 0 {
2750 None
2751 } else {
2752 Some(locked + scroll)
2753 }
2754 }
2755
    /// Number of rows currently held in the buffer window.
    pub fn buffered_rows(&self) -> usize {
        self.buffered_end_row
            .saturating_sub(self.buffered_start_row)
    }
2761
    /// The scrollable-columns display frame, if one is loaded.
    pub fn display_df(&self) -> Option<&DataFrame> {
        self.df.as_ref()
    }
2766
2767 pub fn display_slice_df(&self) -> Option<DataFrame> {
2769 let df = self.df.as_ref()?;
2770 let offset = self.start_row.saturating_sub(self.buffered_start_row);
2771 let slice_len = self.visible_rows.min(df.height().saturating_sub(offset));
2772 if offset < df.height() && slice_len > 0 {
2773 Some(df.slice(offset as i64, slice_len))
2774 } else {
2775 None
2776 }
2777 }
2778
    /// Configured row cap for the buffer window (0 = unlimited).
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }
2783
    /// Configured memory cap for the buffer window in MiB (0 = unlimited).
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
2788
    /// Expands one row of a grouped result back into per-row data: the
    /// group's key values are broadcast as constant columns and each
    /// List-typed value column is unwrapped into its underlying series.
    ///
    /// The pre-drill frame is stashed in `grouped_lf` so `drill_up()` can
    /// restore it.
    ///
    /// # Errors
    /// Fails when `group_index` is out of bounds or the grouped frame has no
    /// List-typed value columns.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        if !self.is_grouped() {
            return Ok(());
        }

        // Remember the grouped view so we can navigate back up later.
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // Record the selected group's key values (stringified) for display.
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // The first value column's list length decides how many rows the
        // drilled-down frame has; non-list cells yield zero rows.
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Broadcast each key value into a constant column of `row_count`
        // rows, preserving native dtypes where possible and falling back to
        // a stringified value otherwise.
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // Unwrap each List cell into a full-length column. Non-list cells
        // are silently skipped here.
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Install the drilled-down frame and reset all view state.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
2896
2897 pub fn drill_up(&mut self) -> Result<()> {
2898 if let Some(grouped_lf) = self.grouped_lf.take() {
2899 self.invalidate_num_rows();
2900 self.lf = grouped_lf;
2901 self.schema = self.lf.clone().collect_schema()?;
2902 self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
2903 self.drilled_down_group_index = None;
2904 self.drilled_down_group_key = None;
2905 self.drilled_down_group_key_columns = None;
2906 self.start_row = 0;
2907 self.termcol_index = 0;
2908 self.locked_columns_count = 0;
2909 self.table_state.select(Some(0));
2910 self.collect();
2911 Ok(())
2912 } else {
2913 Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
2914 }
2915 }
2916
    /// Materializes the entire current `LazyFrame` (honoring the streaming
    /// setting) for analysis; can be expensive on large data.
    pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
        Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
    }
2920
    /// Snapshots the current view state (query, filters, drill-down
    /// selection) for the statistics/analysis pane.
    pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
        crate::statistics::AnalysisContext {
            has_query: !self.active_query.is_empty(),
            query: self.active_query.clone(),
            has_filters: !self.filters.is_empty(),
            filter_count: self.filters.len(),
            is_drilled_down: self.is_drilled_down(),
            group_key: self.drilled_down_group_key.clone(),
            group_columns: self.drilled_down_group_key_columns.clone(),
        }
    }
2932
2933 fn cast_temporal_index_columns_for_pivot(
2936 df: &DataFrame,
2937 index: &[String],
2938 ) -> Result<(DataFrame, Vec<(String, DataType)>)> {
2939 let mut out = df.clone();
2940 let mut restore = Vec::new();
2941 for name in index {
2942 if let Ok(s) = out.column(name) {
2943 let dtype = s.dtype();
2944 if matches!(dtype, DataType::Date | DataType::Datetime(_, _)) {
2945 restore.push((name.clone(), dtype.clone()));
2946 let casted = s.cast(&DataType::Int32)?;
2947 out.with_column(casted)?;
2948 }
2949 }
2950 }
2951 Ok((out, restore))
2952 }
2953
2954 fn restore_temporal_index_columns_after_pivot(
2956 pivoted: &mut DataFrame,
2957 restore: &[(String, DataType)],
2958 ) -> Result<()> {
2959 for (name, dtype) in restore {
2960 if let Ok(s) = pivoted.column(name) {
2961 let restored = s.cast(dtype)?;
2962 pivoted.with_column(restored)?;
2963 }
2964 }
2965 Ok(())
2966 }
2967
    /// Collects the current frame and pivots it according to `spec`, using
    /// `pivot_stable` so the output column order is deterministic.
    ///
    /// When `workaround_pivot_date_index` is set, Date/Datetime index columns
    /// are temporarily cast to Int32 around the pivot and restored afterward.
    /// The spec is recorded so the UI can re-open the pivot modal pre-filled.
    pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
        let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
        let agg_expr = pivot_agg_expr(spec.aggregation)?;
        let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
        // Polars expects "no index" as None rather than an empty list.
        let index_opt = if index_str.is_empty() {
            None
        } else {
            Some(index_str)
        };

        let (df_for_pivot, temporal_index_restore) = if self.workaround_pivot_date_index {
            let (df_w, restore) =
                Self::cast_temporal_index_columns_for_pivot(&df, spec.index.as_slice())?;
            (df_w, Some(restore))
        } else {
            (df.clone(), None)
        };
        let sort_new_columns = spec.sort_columns.unwrap_or(true);
        let mut pivoted = pivot_stable(
            &df_for_pivot,
            [spec.pivot_column.as_str()],
            index_opt,
            Some([spec.value_column.as_str()]),
            sort_new_columns,
            Some(agg_expr),
            None,
        )?;
        if let Some(restore) = &temporal_index_restore {
            Self::restore_temporal_index_columns_after_pivot(&mut pivoted, restore)?;
        }

        self.last_pivot_spec = Some(spec.clone());
        self.last_melt_spec = None;
        self.replace_lf_after_reshape(pivoted.lazy())?;
        Ok(())
    }
3008
3009 pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
3011 let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
3012 let index = cols(spec.index.iter().map(|s| s.as_str()));
3013 let args = UnpivotArgsDSL {
3014 on,
3015 index,
3016 variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
3017 value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
3018 };
3019 let lf = self.lf.clone().unpivot(args);
3020 self.last_melt_spec = Some(spec.clone());
3021 self.last_pivot_spec = None;
3022 self.replace_lf_after_reshape(lf)?;
3023 Ok(())
3024 }
3025
    /// Installs a reshaped (pivoted/melted) frame as the new view, clearing
    /// every piece of derived state — filters, sorts, queries, drill-down
    /// context, buffers and scroll position — before re-collecting.
    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
        self.invalidate_num_rows();
        self.lf = lf;
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // A reshape supersedes any earlier view transformations.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.error = None;
        self.df = None;
        self.locked_df = None;
        self.grouped_lf = None;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.collect();
        Ok(())
    }
3053
    /// Whether the view currently shows a single drilled-down group.
    pub fn is_drilled_down(&self) -> bool {
        self.drilled_down_group_index.is_some()
    }
3057
    /// Rebuilds `self.lf` from the current frame by applying the active
    /// filter statements (chained left-to-right with their AND/OR
    /// connectors, no precedence) followed by the active sort, then
    /// re-collects.
    fn apply_transformations(&mut self) {
        let mut lf = self.lf.clone();
        let mut final_expr: Option<Expr> = None;

        for filter in &self.filters {
            let col_expr = col(&filter.column);
            // Parse the filter value into the column's dtype when possible so
            // numeric/boolean comparisons are not done as strings; fall back
            // to a string literal on parse failure or unknown column.
            let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
                match dtype {
                    DataType::Float32 | DataType::Float64 => filter
                        .value
                        .parse::<f64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
                        .value
                        .parse::<i64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                        filter
                            .value
                            .parse::<u64>()
                            .map(lit)
                            .unwrap_or_else(|_| lit(filter.value.as_str()))
                    }
                    DataType::Boolean => filter
                        .value
                        .parse::<bool>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    _ => lit(filter.value.as_str()),
                }
            } else {
                lit(filter.value.as_str())
            };

            let op_expr = match filter.operator {
                FilterOperator::Eq => col_expr.eq(val_lit),
                FilterOperator::NotEq => col_expr.neq(val_lit),
                FilterOperator::Gt => col_expr.gt(val_lit),
                FilterOperator::Lt => col_expr.lt(val_lit),
                FilterOperator::GtEq => col_expr.gt_eq(val_lit),
                FilterOperator::LtEq => col_expr.lt_eq(val_lit),
                FilterOperator::Contains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val))
                }
                FilterOperator::NotContains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val)).not()
                }
            };

            // Fold this statement into the accumulated predicate using its
            // logical connector.
            if let Some(current) = final_expr {
                final_expr = Some(match filter.logical_op {
                    LogicalOperator::And => current.and(op_expr),
                    LogicalOperator::Or => current.or(op_expr),
                });
            } else {
                final_expr = Some(op_expr);
            }
        }

        if let Some(e) = final_expr {
            lf = lf.filter(e);
        }

        if !self.sort_columns.is_empty() {
            // All sort columns share one ascending/descending flag.
            let options = SortMultipleOptions {
                descending: self
                    .sort_columns
                    .iter()
                    .map(|_| !self.sort_ascending)
                    .collect(),
                ..Default::default()
            };
            lf = lf.sort_by_exprs(
                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
                options,
            );
        } else if !self.sort_ascending {
            // No sort columns but descending requested: flip row order.
            lf = lf.reverse();
        }

        self.invalidate_num_rows();
        self.lf = lf;
        self.collect();
    }
3146
    /// Sets the sort columns/direction, discards the row buffer, and
    /// reapplies all active transformations.
    pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
        self.sort_columns = columns;
        self.sort_ascending = ascending;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.apply_transformations();
    }
3155
3156 pub fn reverse(&mut self) {
3157 self.sort_ascending = !self.sort_ascending;
3158
3159 self.buffered_start_row = 0;
3160 self.buffered_end_row = 0;
3161 self.buffered_df = None;
3162
3163 if !self.sort_columns.is_empty() {
3164 let options = SortMultipleOptions {
3165 descending: self
3166 .sort_columns
3167 .iter()
3168 .map(|_| !self.sort_ascending)
3169 .collect(),
3170 ..Default::default()
3171 };
3172 self.invalidate_num_rows();
3173 self.lf = self.lf.clone().sort_by_exprs(
3174 self.sort_columns.iter().map(col).collect::<Vec<_>>(),
3175 options,
3176 );
3177 self.collect();
3178 } else {
3179 self.invalidate_num_rows();
3180 self.lf = self.lf.clone().reverse();
3181 self.collect();
3182 }
3183 }
3184
    /// Replaces the active filter statements, discards the row buffer, and
    /// reapplies all active transformations.
    pub fn filter(&mut self, filters: Vec<FilterStatement>) {
        self.filters = filters;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.apply_transformations();
    }
3192
    /// Applies a custom query string (see `crate::query::parse_query`)
    /// against the ORIGINAL frame, replacing any previously active
    /// transformations.
    ///
    /// Supports column selection, filtering, and group-by with aggregations.
    /// Group-by results are sorted by their key columns, and a leading run of
    /// key columns is auto-locked. An empty query resets the view to the
    /// original frame. Parse/plan errors land in `self.error`.
    pub fn query(&mut self, query: String) {
        self.error = None;

        let trimmed_query = query.trim();
        if trimmed_query.is_empty() {
            self.reset_lf_to_original();
            self.collect();
            return;
        }

        match parse_query(&query) {
            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
                let mut lf = self.original_lf.clone();
                let mut schema_opt: Option<Arc<Schema>> = None;

                if let Some(f) = filter {
                    lf = lf.filter(f);
                }

                if !group_by_cols.is_empty() {
                    if !cols.is_empty() {
                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
                    } else {
                        // No explicit aggregations: aggregate every non-key
                        // column (producing List columns).
                        let schema = match lf.clone().collect_schema() {
                            Ok(s) => s,
                            Err(e) => {
                                self.error = Some(e);
                                return;
                            }
                        };
                        let all_columns: Vec<String> =
                            schema.iter_names().map(|s| s.to_string()).collect();

                        let mut agg_exprs = Vec::new();
                        for col_name in &all_columns {
                            if !group_by_col_names.contains(col_name) {
                                agg_exprs.push(col(col_name));
                            }
                        }

                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
                    }
                    // Keep grouped output deterministic: sort by the key
                    // columns, which lead the grouped schema.
                    let schema = match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    };
                    schema_opt = Some(schema.clone());
                    let sort_exprs: Vec<Expr> = schema
                        .iter_names()
                        .take(group_by_cols.len())
                        .map(|n| col(n.as_str()))
                        .collect();
                    lf = lf.sort_by_exprs(sort_exprs, Default::default());
                } else if !cols.is_empty() {
                    lf = lf.select(cols);
                }

                // Reuse the schema computed in the group-by branch when
                // available; otherwise resolve it now.
                let schema = match schema_opt {
                    Some(s) => s,
                    None => match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    },
                };

                self.schema = schema;
                self.invalidate_num_rows();
                self.lf = lf;
                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();

                // Lock the leading run of group-key columns so they stay
                // pinned during horizontal scrolling.
                if !group_by_col_names.is_empty() {
                    let mut locked_count = 0;
                    for col_name in &self.column_order {
                        if group_by_col_names.contains(col_name) {
                            locked_count += 1;
                        } else {
                            break;
                        }
                    }
                    self.locked_columns_count = locked_count;
                } else {
                    self.locked_columns_count = 0;
                }

                // The query supersedes filters, sorts, drill-down and buffers.
                self.filters.clear();
                self.sort_columns.clear();
                self.sort_ascending = true;
                self.start_row = 0;
                self.termcol_index = 0;
                self.active_query = query;
                self.buffered_start_row = 0;
                self.buffered_end_row = 0;
                self.buffered_df = None;
                self.drilled_down_group_index = None;
                self.drilled_down_group_key = None;
                self.drilled_down_group_key_columns = None;
                self.grouped_lf = None;
                self.table_state.select(Some(0));
                self.collect();
                if self.num_rows > 0 {
                    self.start_row = 0;
                }
            }
            Err(e) => {
                self.error = Some(PolarsError::ComputeError(e.into()));
            }
        }
    }
3326
3327 pub fn sql_query(&mut self, sql: String) {
3330 self.error = None;
3331 let trimmed = sql.trim();
3332 if trimmed.is_empty() {
3333 self.reset_lf_to_original();
3334 return;
3335 }
3336
3337 #[cfg(feature = "sql")]
3338 {
3339 use polars_sql::SQLContext;
3340 let mut ctx = SQLContext::new();
3341 ctx.register("df", self.lf.clone());
3342 match ctx.execute(trimmed) {
3343 Ok(result_lf) => {
3344 let schema = match result_lf.clone().collect_schema() {
3345 Ok(s) => s,
3346 Err(e) => {
3347 self.error = Some(e);
3348 return;
3349 }
3350 };
3351 self.schema = schema;
3352 self.invalidate_num_rows();
3353 self.lf = result_lf;
3354 self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3355 self.active_sql_query = sql;
3356 self.locked_columns_count = 0;
3357 self.filters.clear();
3358 self.sort_columns.clear();
3359 self.sort_ascending = true;
3360 self.start_row = 0;
3361 self.termcol_index = 0;
3362 self.drilled_down_group_index = None;
3363 self.drilled_down_group_key = None;
3364 self.drilled_down_group_key_columns = None;
3365 self.grouped_lf = None;
3366 self.buffered_start_row = 0;
3367 self.buffered_end_row = 0;
3368 self.buffered_df = None;
3369 self.table_state.select(Some(0));
3370 }
3371 Err(e) => {
3372 self.error = Some(e);
3373 }
3374 }
3375 }
3376
3377 #[cfg(not(feature = "sql"))]
3378 {
3379 self.error = Some(PolarsError::ComputeError(
3380 format!("SQL support not compiled in (build with --features sql)").into(),
3381 ));
3382 }
3383 }
3384
    /// Fuzzy-filters the ORIGINAL frame: each whitespace-separated token must
    /// match (via `fuzzy_token_regex`) in at least one string column, and all
    /// tokens must match somewhere in the row. An empty query resets the
    /// view; a frame with no string columns records an error.
    pub fn fuzzy_search(&mut self, query: String) {
        self.error = None;
        let trimmed = query.trim();
        if trimmed.is_empty() {
            self.reset_lf_to_original();
            self.collect();
            return;
        }
        // Only string columns participate in matching.
        let string_cols: Vec<String> = self
            .schema
            .iter()
            .filter(|(_, dtype)| dtype.is_string())
            .map(|(name, _)| name.to_string())
            .collect();
        if string_cols.is_empty() {
            self.error = Some(PolarsError::ComputeError(
                "Fuzzy search requires at least one string column".into(),
            ));
            return;
        }
        let tokens: Vec<&str> = trimmed
            .split_whitespace()
            .filter(|s| !s.is_empty())
            .collect();
        // Per token: OR across the string columns. The unwraps are safe —
        // `string_cols` and `tokens` are both non-empty at this point.
        let token_exprs: Vec<Expr> = tokens
            .iter()
            .map(|token| {
                let pattern = fuzzy_token_regex(token);
                string_cols
                    .iter()
                    .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
                    .reduce(|a, b| a.or(b))
                    .unwrap()
            })
            .collect();
        // Across tokens: AND.
        let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
        self.lf = self.original_lf.clone().filter(combined);
        // Fuzzy search supersedes any other active view transformation.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query = query;
        self.locked_columns_count = 0;
        self.start_row = 0;
        self.termcol_index = 0;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.invalidate_num_rows();
        self.collect();
    }
3445}
3446
3447pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3449 let inner: String =
3450 token
3451 .chars()
3452 .map(|c| regex::escape(&c.to_string()))
3453 .fold(String::new(), |mut s, e| {
3454 if !s.is_empty() {
3455 s.push_str(".*");
3456 }
3457 s.push_str(&e);
3458 s
3459 });
3460 format!("(?i).*{}.*", inner)
3461}
3462
/// Stateless renderer (ratatui widget) for the tabular data view.
///
/// Holds only presentation settings — colors, padding, optional per-dtype
/// column coloring. The data, scroll position, and selection live in
/// `DataTableState`.
pub struct DataTable {
    /// Background color of the header row.
    pub header_bg: Color,
    /// Foreground color of the header row.
    pub header_fg: Color,
    /// Foreground color of the row-number gutter.
    pub row_numbers_fg: Color,
    /// Color of the vertical bar drawn between locked and scrollable columns.
    pub separator_fg: Color,
    /// Spacing (in terminal columns) inserted between table cells.
    pub table_cell_padding: u16,
    /// Optional background applied to every other data row (zebra striping).
    pub alternate_row_bg: Option<Color>,
    /// When true, cells are colored by column dtype using the fields below.
    pub column_colors: bool,
    /// Color for string columns (only used when `column_colors` is set).
    pub str_col: Option<Color>,
    /// Color for integer columns (signed and unsigned).
    pub int_col: Option<Color>,
    /// Color for float columns.
    pub float_col: Option<Color>,
    /// Color for boolean columns.
    pub bool_col: Option<Color>,
    /// Color for date/datetime/time/duration columns.
    pub temporal_col: Option<Color>,
}
3478
impl Default for DataTable {
    /// Default look: dark header (256-color index 236) with white text,
    /// one column of cell padding, no zebra striping, and no dtype-based
    /// column coloring.
    fn default() -> Self {
        Self {
            header_bg: Color::Indexed(236),
            header_fg: Color::White,
            row_numbers_fg: Color::DarkGray,
            separator_fg: Color::White,
            table_cell_padding: 1,
            alternate_row_bg: None,
            column_colors: false,
            str_col: None,
            int_col: None,
            float_col: None,
            bool_col: None,
            temporal_col: None,
        }
    }
}
3497
/// Inputs for rendering the row-number gutter (see `render_row_numbers`).
struct RowNumbersParams {
    /// First (0-based) data row currently scrolled into view.
    start_row: usize,
    /// Number of data rows that fit on screen.
    visible_rows: usize,
    /// Total number of rows in the data set; caps how many numbers are drawn.
    num_rows: usize,
    /// Offset added to each displayed number (e.g. 1 for 1-based numbering).
    row_start_index: usize,
    /// On-screen index of the highlighted row, if any.
    selected_row: Option<usize>,
}
3506
impl DataTable {
    /// Creates a `DataTable` with the default appearance.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder: sets the four base colors in one call.
    pub fn with_colors(
        mut self,
        header_bg: Color,
        header_fg: Color,
        row_numbers_fg: Color,
        separator_fg: Color,
    ) -> Self {
        self.header_bg = header_bg;
        self.header_fg = header_fg;
        self.row_numbers_fg = row_numbers_fg;
        self.separator_fg = separator_fg;
        self
    }

    /// Builder: sets the spacing between table cells.
    pub fn with_cell_padding(mut self, padding: u16) -> Self {
        self.table_cell_padding = padding;
        self
    }

    /// Builder: sets (or clears) the zebra-striping background color.
    pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
        self.alternate_row_bg = color;
        self
    }

    /// Builder: enables dtype-based cell coloring and sets a color per
    /// dtype family.
    pub fn with_column_type_colors(
        mut self,
        str_col: Color,
        int_col: Color,
        float_col: Color,
        bool_col: Color,
        temporal_col: Color,
    ) -> Self {
        self.column_colors = true;
        self.str_col = Some(str_col);
        self.int_col = Some(int_col);
        self.float_col = Some(float_col);
        self.bool_col = Some(bool_col);
        self.temporal_col = Some(temporal_col);
        self
    }

    /// Maps a column dtype to its configured display color.
    ///
    /// Returns `None` when dtype coloring is disabled, when no color was set
    /// for that family, or for dtypes outside the five supported families.
    fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
        if !self.column_colors {
            return None;
        }
        match dtype {
            DataType::String => self.str_col,
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64 => self.int_col,
            DataType::Float32 | DataType::Float64 => self.float_col,
            DataType::Boolean => self.bool_col,
            DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
                self.temporal_col
            }
            _ => None,
        }
    }

    /// Renders `df` into `area`, auto-sizing column widths.
    ///
    /// Each column's width is the max of its header length and the longest
    /// visible cell. Columns are admitted left to right until `area.width`
    /// is used up; a string column that would overflow is instead truncated
    /// to the remaining width (and ends the layout), while any other
    /// overflowing column is dropped entirely. Odd rows get the alternate
    /// background when one is configured. `_row_numbers` and
    /// `_start_row_offset` are currently unused.
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        let (height, cols) = df.shape();

        // Start each width at the header's display width.
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // One terminal row is reserved for the header.
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            // Build the cells for this column while tracking the widest value.
            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                // Nulls render as empty cells rather than a "null" marker.
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // A string column may be truncated to fill the leftover width.
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                // Non-string column that doesn't fit: stop the layout here.
                break;
            }
        }

        widths.truncate(visible_columns);
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                // Zebra stripe odd rows when an alternate background is set.
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // Color::Reset as header_bg means "no background fill".
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }

    /// Renders the row-number gutter: a header-colored filler cell on the
    /// first line, then one right-aligned row number per visible data row.
    fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
        let header_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        // Blank header cell so the gutter lines up with the table header.
        let header_fill = " ".repeat(area.width as usize);
        Paragraph::new(header_fill).style(header_style).render(
            Rect {
                x: area.x,
                y: area.y,
                width: area.width,
                height: 1,
            },
            buf,
        );

        let rows_to_render = params
            .visible_rows
            .min(params.num_rows.saturating_sub(params.start_row));

        if rows_to_render == 0 {
            return;
        }

        // Width of the largest number on screen, used for right-alignment.
        let max_row_num =
            params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
        let max_width = max_row_num.to_string().len();

        for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
            let row_num = params.start_row + row_idx + params.row_start_index;
            let row_num_text = row_num.to_string();

            // Right-align by left-padding with spaces.
            let padding = max_width.saturating_sub(row_num_text.len());
            let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);

            let is_selected = params.selected_row == Some(row_idx);
            // Selected row uses the terminal default fg; striping still applies.
            let (fg, bg) = if is_selected {
                (
                    Color::Reset,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            } else {
                (
                    self.row_numbers_fg,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            };
            let row_num_style = match bg {
                Some(bg_color) => Style::default().fg(fg).bg(bg_color),
                None => Style::default().fg(fg),
            };

            // +1 skips the header line rendered above.
            let y = area.y + row_idx as u16 + 1;
            if y < area.y + area.height {
                Paragraph::new(padded_text).style(row_num_style).render(
                    Rect {
                        x: area.x,
                        y,
                        width: area.width,
                        height: 1,
                    },
                    buf,
                );
            }
        }
    }
}
3767
3768impl StatefulWidget for DataTable {
3769 type State = DataTableState;
3770
3771 fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
3772 state.visible_termcols = area.width as usize;
3773 let new_visible_rows = if area.height > 0 {
3774 (area.height - 1) as usize
3775 } else {
3776 0
3777 };
3778 let needs_collect = new_visible_rows != state.visible_rows;
3779 state.visible_rows = new_visible_rows;
3780
3781 if let Some(selected) = state.table_state.selected() {
3782 if selected >= state.visible_rows {
3783 state.table_state.select(Some(state.visible_rows - 1))
3784 }
3785 }
3786
3787 if needs_collect {
3788 state.collect();
3789 }
3790
3791 if let Some(error) = state.error.as_ref() {
3794 if !state.suppress_error_display {
3795 Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
3796 .centered()
3797 .block(
3798 Block::default()
3799 .borders(Borders::NONE)
3800 .padding(Padding::top(area.height / 2)),
3801 )
3802 .wrap(ratatui::widgets::Wrap { trim: true })
3803 .render(area, buf);
3804 return;
3805 }
3806 }
3808
3809 let row_num_width = if state.row_numbers {
3811 let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; max_row_num.to_string().len().max(1) as u16 + 1 } else {
3814 0
3815 };
3816
3817 let mut locked_width = row_num_width;
3819 if let Some(locked_df) = state.locked_df.as_ref() {
3820 let (_, cols) = locked_df.shape();
3821 for col_index in 0..cols {
3822 let col_name = locked_df.get_column_names()[col_index];
3823 let mut max_len = col_name.chars().count() as u16;
3824 let col_data = &locked_df[col_index];
3825 for row_index in 0..locked_df.height().min(state.visible_rows) {
3826 let value = col_data.get(row_index).unwrap();
3827 let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
3828 Cow::Borrowed("")
3829 } else {
3830 value.str_value()
3831 };
3832 let len = val_str.chars().count() as u16;
3833 max_len = max_len.max(len);
3834 }
3835 locked_width += max_len + 1;
3836 }
3837 }
3838
3839 if locked_width > row_num_width && locked_width < area.width {
3841 let locked_area = Rect {
3842 x: area.x,
3843 y: area.y,
3844 width: locked_width,
3845 height: area.height,
3846 };
3847 let separator_x = locked_area.x + locked_area.width;
3848
3849 if state.row_numbers {
3851 let row_num_area = Rect {
3852 x: area.x,
3853 y: area.y,
3854 width: row_num_width,
3855 height: area.height,
3856 };
3857 self.render_row_numbers(
3858 row_num_area,
3859 buf,
3860 RowNumbersParams {
3861 start_row: state.start_row,
3862 visible_rows: state.visible_rows,
3863 num_rows: state.num_rows,
3864 row_start_index: state.row_start_index,
3865 selected_row: state.table_state.selected(),
3866 },
3867 );
3868 }
3869 let scrollable_area = Rect {
3870 x: separator_x + 1,
3871 y: area.y,
3872 width: area.width.saturating_sub(locked_width + 1),
3873 height: area.height,
3874 };
3875
3876 if let Some(locked_df) = state.locked_df.as_ref() {
3878 let adjusted_locked_area = if state.row_numbers {
3880 Rect {
3881 x: area.x + row_num_width,
3882 y: area.y,
3883 width: locked_width - row_num_width,
3884 height: area.height,
3885 }
3886 } else {
3887 locked_area
3888 };
3889
3890 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3892 let slice_len = state
3893 .visible_rows
3894 .min(locked_df.height().saturating_sub(offset));
3895 if offset < locked_df.height() && slice_len > 0 {
3896 let sliced_df = locked_df.slice(offset as i64, slice_len);
3897 self.render_dataframe(
3898 &sliced_df,
3899 adjusted_locked_area,
3900 buf,
3901 &mut state.table_state,
3902 false,
3903 state.start_row,
3904 );
3905 }
3906 }
3907
3908 let separator_x_adjusted = if state.row_numbers {
3910 area.x + row_num_width + (locked_width - row_num_width)
3911 } else {
3912 separator_x
3913 };
3914 for y in area.y..area.y + area.height {
3915 let cell = &mut buf[(separator_x_adjusted, y)];
3916 cell.set_char('│');
3917 cell.set_style(Style::default().fg(self.separator_fg));
3918 }
3919
3920 let adjusted_scrollable_area = if state.row_numbers {
3922 Rect {
3923 x: separator_x_adjusted + 1,
3924 y: area.y,
3925 width: area.width.saturating_sub(locked_width + 1),
3926 height: area.height,
3927 }
3928 } else {
3929 scrollable_area
3930 };
3931
3932 if let Some(df) = state.df.as_ref() {
3934 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3936 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3937 if offset < df.height() && slice_len > 0 {
3938 let sliced_df = df.slice(offset as i64, slice_len);
3939 self.render_dataframe(
3940 &sliced_df,
3941 adjusted_scrollable_area,
3942 buf,
3943 &mut state.table_state,
3944 false,
3945 state.start_row,
3946 );
3947 }
3948 }
3949 } else if let Some(df) = state.df.as_ref() {
3950 if state.row_numbers {
3953 let row_num_area = Rect {
3954 x: area.x,
3955 y: area.y,
3956 width: row_num_width,
3957 height: area.height,
3958 };
3959 self.render_row_numbers(
3960 row_num_area,
3961 buf,
3962 RowNumbersParams {
3963 start_row: state.start_row,
3964 visible_rows: state.visible_rows,
3965 num_rows: state.num_rows,
3966 row_start_index: state.row_start_index,
3967 selected_row: state.table_state.selected(),
3968 },
3969 );
3970
3971 let data_area = Rect {
3973 x: area.x + row_num_width,
3974 y: area.y,
3975 width: area.width.saturating_sub(row_num_width),
3976 height: area.height,
3977 };
3978
3979 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3981 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3982 if offset < df.height() && slice_len > 0 {
3983 let sliced_df = df.slice(offset as i64, slice_len);
3984 self.render_dataframe(
3985 &sliced_df,
3986 data_area,
3987 buf,
3988 &mut state.table_state,
3989 false,
3990 state.start_row,
3991 );
3992 }
3993 } else {
3994 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3996 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3997 if offset < df.height() && slice_len > 0 {
3998 let sliced_df = df.slice(offset as i64, slice_len);
3999 self.render_dataframe(
4000 &sliced_df,
4001 area,
4002 buf,
4003 &mut state.table_state,
4004 false,
4005 state.start_row,
4006 );
4007 }
4008 }
4009 } else if !state.column_order.is_empty() {
4010 let empty_columns: Vec<_> = state
4012 .column_order
4013 .iter()
4014 .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
4015 .collect();
4016 if let Ok(empty_df) = DataFrame::new(empty_columns) {
4017 if state.row_numbers {
4018 let row_num_area = Rect {
4019 x: area.x,
4020 y: area.y,
4021 width: row_num_width,
4022 height: area.height,
4023 };
4024 self.render_row_numbers(
4025 row_num_area,
4026 buf,
4027 RowNumbersParams {
4028 start_row: 0,
4029 visible_rows: state.visible_rows,
4030 num_rows: 0,
4031 row_start_index: state.row_start_index,
4032 selected_row: None,
4033 },
4034 );
4035 let data_area = Rect {
4036 x: area.x + row_num_width,
4037 y: area.y,
4038 width: area.width.saturating_sub(row_num_width),
4039 height: area.height,
4040 };
4041 self.render_dataframe(
4042 &empty_df,
4043 data_area,
4044 buf,
4045 &mut state.table_state,
4046 false,
4047 0,
4048 );
4049 } else {
4050 self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
4051 }
4052 } else {
4053 Paragraph::new("No data").render(area, buf);
4054 }
4055 } else {
4056 Paragraph::new("No data").render(area, buf);
4058 }
4059 }
4060}
4061
4062#[cfg(test)]
4063mod tests {
4064 use super::*;
4065 use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
4066 use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
4067
4068 fn create_test_lf() -> LazyFrame {
4069 df! (
4070 "a" => &[1, 2, 3],
4071 "b" => &["x", "y", "z"]
4072 )
4073 .unwrap()
4074 .lazy()
4075 }
4076
4077 fn create_large_test_lf() -> LazyFrame {
4078 df! (
4079 "a" => (0..100).collect::<Vec<i32>>(),
4080 "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
4081 "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
4082 "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
4083 )
4084 .unwrap()
4085 .lazy()
4086 }
4087
4088 #[test]
4089 fn test_from_csv() {
4090 let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
4093 let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); assert_eq!(state.schema.len(), 6); }
4096
4097 #[test]
4098 fn test_from_csv_gzipped() {
4099 let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
4102 let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); assert_eq!(state.schema.len(), 6); }
4105
4106 #[test]
4107 fn test_from_parquet() {
4108 let path = crate::tests::sample_data_dir().join("people.parquet");
4110 let state = DataTableState::from_parquet(&path, None, None, None, None, false, 1).unwrap();
4111 assert!(!state.schema.is_empty());
4112 }
4113
4114 #[test]
4115 fn test_from_ipc() {
4116 use polars::prelude::IpcWriter;
4117 use std::io::BufWriter;
4118 let mut df = df!(
4119 "x" => &[1_i32, 2, 3],
4120 "y" => &["a", "b", "c"]
4121 )
4122 .unwrap();
4123 let dir = std::env::temp_dir();
4124 let path = dir.join("datui_test_ipc.arrow");
4125 let file = std::fs::File::create(&path).unwrap();
4126 let mut writer = BufWriter::new(file);
4127 IpcWriter::new(&mut writer).finish(&mut df).unwrap();
4128 drop(writer);
4129 let state = DataTableState::from_ipc(&path, None, None, None, None, false, 1).unwrap();
4130 assert_eq!(state.schema.len(), 2);
4131 assert!(state.schema.contains("x"));
4132 assert!(state.schema.contains("y"));
4133 let _ = std::fs::remove_file(&path);
4134 }
4135
4136 #[test]
4137 fn test_from_avro() {
4138 use polars::io::avro::AvroWriter;
4139 use std::io::BufWriter;
4140 let mut df = df!(
4141 "id" => &[1_i32, 2, 3],
4142 "name" => &["alice", "bob", "carol"]
4143 )
4144 .unwrap();
4145 let dir = std::env::temp_dir();
4146 let path = dir.join("datui_test_avro.avro");
4147 let file = std::fs::File::create(&path).unwrap();
4148 let mut writer = BufWriter::new(file);
4149 AvroWriter::new(&mut writer).finish(&mut df).unwrap();
4150 drop(writer);
4151 let state = DataTableState::from_avro(&path, None, None, None, None, false, 1).unwrap();
4152 assert_eq!(state.schema.len(), 2);
4153 assert!(state.schema.contains("id"));
4154 assert!(state.schema.contains("name"));
4155 let _ = std::fs::remove_file(&path);
4156 }
4157
4158 #[test]
4159 fn test_from_orc() {
4160 use arrow::array::{Int64Array, StringArray};
4161 use arrow::datatypes::{DataType, Field, Schema};
4162 use arrow::record_batch::RecordBatch;
4163 use orc_rust::ArrowWriterBuilder;
4164 use std::io::BufWriter;
4165 use std::sync::Arc;
4166
4167 let schema = Arc::new(Schema::new(vec![
4168 Field::new("id", DataType::Int64, false),
4169 Field::new("name", DataType::Utf8, false),
4170 ]));
4171 let id_array = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
4172 let name_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
4173 let batch = RecordBatch::try_new(schema.clone(), vec![id_array, name_array]).unwrap();
4174
4175 let dir = std::env::temp_dir();
4176 let path = dir.join("datui_test_orc.orc");
4177 let file = std::fs::File::create(&path).unwrap();
4178 let writer = BufWriter::new(file);
4179 let mut orc_writer = ArrowWriterBuilder::new(writer, schema).try_build().unwrap();
4180 orc_writer.write(&batch).unwrap();
4181 orc_writer.close().unwrap();
4182
4183 let state = DataTableState::from_orc(&path, None, None, None, None, false, 1).unwrap();
4184 assert_eq!(state.schema.len(), 2);
4185 assert!(state.schema.contains("id"));
4186 assert!(state.schema.contains("name"));
4187 let _ = std::fs::remove_file(&path);
4188 }
4189
4190 #[test]
4191 fn test_filter() {
4192 let lf = create_test_lf();
4193 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4194 let filters = vec![FilterStatement {
4195 column: "a".to_string(),
4196 operator: FilterOperator::Gt,
4197 value: "2".to_string(),
4198 logical_op: LogicalOperator::And,
4199 }];
4200 state.filter(filters);
4201 let df = state.lf.clone().collect().unwrap();
4202 assert_eq!(df.shape().0, 1);
4203 assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4204 }
4205
4206 #[test]
4207 fn test_sort() {
4208 let lf = create_test_lf();
4209 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4210 state.sort(vec!["a".to_string()], false);
4211 let df = state.lf.clone().collect().unwrap();
4212 assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4213 }
4214
4215 #[test]
4216 fn test_query() {
4217 let lf = create_test_lf();
4218 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4219 state.query("select b where a = 2".to_string());
4220 let df = state.lf.clone().collect().unwrap();
4221 assert_eq!(df.shape(), (1, 1));
4222 assert_eq!(
4223 df.column("b").unwrap().get(0).unwrap(),
4224 AnyValue::String("y")
4225 );
4226 }
4227
4228 #[test]
4229 fn test_query_date_accessors() {
4230 use chrono::NaiveDate;
4231 let df = df!(
4232 "event_date" => [
4233 NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
4234 NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
4235 NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
4236 ],
4237 "name" => &["a", "b", "c"],
4238 )
4239 .unwrap();
4240 let lf = df.lazy();
4241 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4242
4243 state.query("select name, year: event_date.year, month: event_date.month".to_string());
4245 assert!(
4246 state.error.is_none(),
4247 "query should succeed: {:?}",
4248 state.error
4249 );
4250 let df = state.lf.clone().collect().unwrap();
4251 assert_eq!(df.shape(), (3, 3));
4252 assert_eq!(
4253 df.column("year").unwrap().get(0).unwrap(),
4254 AnyValue::Int32(2024)
4255 );
4256 assert_eq!(
4257 df.column("month").unwrap().get(0).unwrap(),
4258 AnyValue::Int8(1)
4259 );
4260 assert_eq!(
4261 df.column("month").unwrap().get(1).unwrap(),
4262 AnyValue::Int8(6)
4263 );
4264
4265 state.query("select name, event_date where event_date.month = 12".to_string());
4267 assert!(
4268 state.error.is_none(),
4269 "filter should succeed: {:?}",
4270 state.error
4271 );
4272 let df = state.lf.clone().collect().unwrap();
4273 assert_eq!(df.height(), 1);
4274 assert_eq!(
4275 df.column("name").unwrap().get(0).unwrap(),
4276 AnyValue::String("c")
4277 );
4278
4279 state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
4281 assert!(
4282 state.error.is_none(),
4283 "date literal filter should succeed: {:?}",
4284 state.error
4285 );
4286 let df = state.lf.clone().collect().unwrap();
4287 assert_eq!(
4288 df.height(),
4289 2,
4290 "2024-06-20 and 2024-12-31 are after 2024-06-15"
4291 );
4292
4293 state.query(
4295 "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
4296 .to_string(),
4297 );
4298 assert!(
4299 state.error.is_none(),
4300 "string accessors should succeed: {:?}",
4301 state.error
4302 );
4303 let df = state.lf.clone().collect().unwrap();
4304 assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
4305 assert_eq!(
4306 df.column("upper_name").unwrap().get(0).unwrap(),
4307 AnyValue::String("C")
4308 );
4309
4310 state.query("select where event_date.date = 2020.01.01".to_string());
4312 assert!(state.error.is_none());
4313 assert_eq!(state.num_rows, 0);
4314 state.visible_rows = 10;
4315 state.collect();
4316 assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
4317 assert!(
4318 state.locked_df.is_none(),
4319 "locked_df must be cleared when num_rows is 0"
4320 );
4321 }
4322
4323 #[test]
4324 fn test_select_next_previous() {
4325 let lf = create_large_test_lf();
4326 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4327 state.visible_rows = 10;
4328 state.table_state.select(Some(5));
4329
4330 state.select_next();
4331 assert_eq!(state.table_state.selected(), Some(6));
4332
4333 state.select_previous();
4334 assert_eq!(state.table_state.selected(), Some(5));
4335 }
4336
4337 #[test]
4338 fn test_page_up_down() {
4339 let lf = create_large_test_lf();
4340 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4341 state.visible_rows = 20;
4342 state.collect();
4343
4344 assert_eq!(state.start_row, 0);
4345 state.page_down();
4346 assert_eq!(state.start_row, 20);
4347 state.page_down();
4348 assert_eq!(state.start_row, 40);
4349 state.page_up();
4350 assert_eq!(state.start_row, 20);
4351 state.page_up();
4352 assert_eq!(state.start_row, 0);
4353 }
4354
4355 #[test]
4356 fn test_scroll_left_right() {
4357 let lf = create_large_test_lf();
4358 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4359 assert_eq!(state.termcol_index, 0);
4360 state.scroll_right();
4361 assert_eq!(state.termcol_index, 1);
4362 state.scroll_right();
4363 assert_eq!(state.termcol_index, 2);
4364 state.scroll_left();
4365 assert_eq!(state.termcol_index, 1);
4366 state.scroll_left();
4367 assert_eq!(state.termcol_index, 0);
4368 }
4369
4370 #[test]
4371 fn test_reverse() {
4372 let lf = create_test_lf();
4373 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4374 state.sort(vec!["a".to_string()], true);
4375 assert_eq!(
4376 state
4377 .lf
4378 .clone()
4379 .collect()
4380 .unwrap()
4381 .column("a")
4382 .unwrap()
4383 .get(0)
4384 .unwrap(),
4385 AnyValue::Int32(1)
4386 );
4387 state.reverse();
4388 assert_eq!(
4389 state
4390 .lf
4391 .clone()
4392 .collect()
4393 .unwrap()
4394 .column("a")
4395 .unwrap()
4396 .get(0)
4397 .unwrap(),
4398 AnyValue::Int32(3)
4399 );
4400 }
4401
4402 #[test]
4403 fn test_filter_multiple() {
4404 let lf = create_large_test_lf();
4405 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4406 let filters = vec![
4407 FilterStatement {
4408 column: "c".to_string(),
4409 operator: FilterOperator::Eq,
4410 value: "1".to_string(),
4411 logical_op: LogicalOperator::And,
4412 },
4413 FilterStatement {
4414 column: "d".to_string(),
4415 operator: FilterOperator::Eq,
4416 value: "2".to_string(),
4417 logical_op: LogicalOperator::And,
4418 },
4419 ];
4420 state.filter(filters);
4421 let df = state.lf.clone().collect().unwrap();
4422 assert_eq!(df.shape().0, 7);
4423 }
4424
4425 #[test]
4426 fn test_filter_and_sort() {
4427 let lf = create_large_test_lf();
4428 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4429 let filters = vec![FilterStatement {
4430 column: "c".to_string(),
4431 operator: FilterOperator::Eq,
4432 value: "1".to_string(),
4433 logical_op: LogicalOperator::And,
4434 }];
4435 state.filter(filters);
4436 state.sort(vec!["a".to_string()], false);
4437 let df = state.lf.clone().collect().unwrap();
4438 assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
4439 }
4440
4441 fn create_pivot_long_lf() -> LazyFrame {
4444 let df = df!(
4445 "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
4446 "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
4447 "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
4448 "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
4449 )
4450 .unwrap();
4451 df.lazy()
4452 }
4453
4454 fn create_melt_wide_lf() -> LazyFrame {
4456 let df = df!(
4457 "id" => &[1_i32, 2, 3],
4458 "date" => &["d1", "d2", "d3"],
4459 "c1" => &[10.0_f64, 20.0, 30.0],
4460 "c2" => &[11.0, 21.0, 31.0],
4461 "c3" => &[12.0, 22.0, 32.0],
4462 )
4463 .unwrap();
4464 df.lazy()
4465 }
4466
4467 #[test]
4468 fn test_pivot_basic() {
4469 let lf = create_pivot_long_lf();
4470 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4471 let spec = PivotSpec {
4472 index: vec!["id".to_string(), "date".to_string()],
4473 pivot_column: "key".to_string(),
4474 value_column: "value".to_string(),
4475 aggregation: PivotAggregation::Last,
4476 sort_columns: None,
4477 };
4478 state.pivot(&spec).unwrap();
4479 let df = state.lf.clone().collect().unwrap();
4480 let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4481 assert!(names.contains(&"id"));
4482 assert!(names.contains(&"date"));
4483 assert!(names.contains(&"A"));
4484 assert!(names.contains(&"B"));
4485 assert!(names.contains(&"C"));
4486 assert_eq!(df.height(), 2);
4487 }
4488
4489 #[test]
4490 fn test_pivot_aggregation_last() {
4491 let lf = create_pivot_long_lf();
4492 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4493 let spec = PivotSpec {
4494 index: vec!["id".to_string(), "date".to_string()],
4495 pivot_column: "key".to_string(),
4496 value_column: "value".to_string(),
4497 aggregation: PivotAggregation::Last,
4498 sort_columns: None,
4499 };
4500 state.pivot(&spec).unwrap();
4501 let df = state.lf.clone().collect().unwrap();
4502 let a_col = df.column("A").unwrap();
4503 let row0 = a_col.get(0).unwrap();
4504 let row1 = a_col.get(1).unwrap();
4505 assert_eq!(row0, AnyValue::Float64(11.0));
4506 assert_eq!(row1, AnyValue::Float64(40.0));
4507 }
4508
4509 #[test]
4510 fn test_pivot_aggregation_first() {
4511 let lf = create_pivot_long_lf();
4512 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4513 let spec = PivotSpec {
4514 index: vec!["id".to_string(), "date".to_string()],
4515 pivot_column: "key".to_string(),
4516 value_column: "value".to_string(),
4517 aggregation: PivotAggregation::First,
4518 sort_columns: None,
4519 };
4520 state.pivot(&spec).unwrap();
4521 let df = state.lf.clone().collect().unwrap();
4522 let a_col = df.column("A").unwrap();
4523 assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
4524 assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
4525 }
4526
4527 #[test]
4528 fn test_pivot_aggregation_min_max() {
4529 let lf = create_pivot_long_lf();
4530 let mut state_min = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4531 state_min
4532 .pivot(&PivotSpec {
4533 index: vec!["id".to_string(), "date".to_string()],
4534 pivot_column: "key".to_string(),
4535 value_column: "value".to_string(),
4536 aggregation: PivotAggregation::Min,
4537 sort_columns: None,
4538 })
4539 .unwrap();
4540 let df_min = state_min.lf.clone().collect().unwrap();
4541 assert_eq!(
4542 df_min.column("A").unwrap().get(0).unwrap(),
4543 AnyValue::Float64(10.0)
4544 );
4545
4546 let mut state_max = DataTableState::new(lf, None, None, None, None, true).unwrap();
4547 state_max
4548 .pivot(&PivotSpec {
4549 index: vec!["id".to_string(), "date".to_string()],
4550 pivot_column: "key".to_string(),
4551 value_column: "value".to_string(),
4552 aggregation: PivotAggregation::Max,
4553 sort_columns: None,
4554 })
4555 .unwrap();
4556 let df_max = state_max.lf.clone().collect().unwrap();
4557 assert_eq!(
4558 df_max.column("A").unwrap().get(0).unwrap(),
4559 AnyValue::Float64(11.0)
4560 );
4561 }
4562
4563 #[test]
4564 fn test_pivot_aggregation_avg_count() {
4565 let lf = create_pivot_long_lf();
4566 let mut state_avg = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
4567 state_avg
4568 .pivot(&PivotSpec {
4569 index: vec!["id".to_string(), "date".to_string()],
4570 pivot_column: "key".to_string(),
4571 value_column: "value".to_string(),
4572 aggregation: PivotAggregation::Avg,
4573 sort_columns: None,
4574 })
4575 .unwrap();
4576 let df_avg = state_avg.lf.clone().collect().unwrap();
4577 let a = df_avg.column("A").unwrap().get(0).unwrap();
4578 if let AnyValue::Float64(x) = a {
4579 assert!((x - 10.5).abs() < 1e-6);
4580 } else {
4581 panic!("expected float");
4582 }
4583
4584 let mut state_count = DataTableState::new(lf, None, None, None, None, true).unwrap();
4585 state_count
4586 .pivot(&PivotSpec {
4587 index: vec!["id".to_string(), "date".to_string()],
4588 pivot_column: "key".to_string(),
4589 value_column: "value".to_string(),
4590 aggregation: PivotAggregation::Count,
4591 sort_columns: None,
4592 })
4593 .unwrap();
4594 let df_count = state_count.lf.clone().collect().unwrap();
4595 let a = df_count.column("A").unwrap().get(0).unwrap();
4596 assert_eq!(a, AnyValue::UInt32(2));
4597 }
4598
4599 #[test]
4600 fn test_pivot_string_first_last() {
4601 let df = df!(
4602 "id" => &[1_i32, 1, 2, 2],
4603 "key" => &["X", "Y", "X", "Y"],
4604 "value" => &["low", "mid", "high", "mid"],
4605 )
4606 .unwrap();
4607 let lf = df.lazy();
4608 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4609 let spec = PivotSpec {
4610 index: vec!["id".to_string()],
4611 pivot_column: "key".to_string(),
4612 value_column: "value".to_string(),
4613 aggregation: PivotAggregation::Last,
4614 sort_columns: None,
4615 };
4616 state.pivot(&spec).unwrap();
4617 let out = state.lf.clone().collect().unwrap();
4618 assert_eq!(
4619 out.column("X").unwrap().get(0).unwrap(),
4620 AnyValue::String("low")
4621 );
4622 assert_eq!(
4623 out.column("Y").unwrap().get(0).unwrap(),
4624 AnyValue::String("mid")
4625 );
4626 }
4627
4628 #[test]
4629 fn test_melt_basic() {
4630 let lf = create_melt_wide_lf();
4631 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4632 let spec = MeltSpec {
4633 index: vec!["id".to_string(), "date".to_string()],
4634 value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4635 variable_name: "variable".to_string(),
4636 value_name: "value".to_string(),
4637 };
4638 state.melt(&spec).unwrap();
4639 let df = state.lf.clone().collect().unwrap();
4640 assert_eq!(df.height(), 9);
4641 let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
4642 assert!(names.contains(&"variable"));
4643 assert!(names.contains(&"value"));
4644 assert!(names.contains(&"id"));
4645 assert!(names.contains(&"date"));
4646 }
4647
4648 #[test]
4649 fn test_melt_all_except_index() {
4650 let lf = create_melt_wide_lf();
4651 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4652 let spec = MeltSpec {
4653 index: vec!["id".to_string(), "date".to_string()],
4654 value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
4655 variable_name: "var".to_string(),
4656 value_name: "val".to_string(),
4657 };
4658 state.melt(&spec).unwrap();
4659 let df = state.lf.clone().collect().unwrap();
4660 assert!(df.column("var").is_ok());
4661 assert!(df.column("val").is_ok());
4662 }
4663
4664 #[test]
4665 fn test_pivot_on_current_view_after_filter() {
4666 let lf = create_pivot_long_lf();
4667 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4668 state.filter(vec![FilterStatement {
4669 column: "id".to_string(),
4670 operator: FilterOperator::Eq,
4671 value: "1".to_string(),
4672 logical_op: LogicalOperator::And,
4673 }]);
4674 let spec = PivotSpec {
4675 index: vec!["id".to_string(), "date".to_string()],
4676 pivot_column: "key".to_string(),
4677 value_column: "value".to_string(),
4678 aggregation: PivotAggregation::Last,
4679 sort_columns: None,
4680 };
4681 state.pivot(&spec).unwrap();
4682 let df = state.lf.clone().collect().unwrap();
4683 assert_eq!(df.height(), 1);
4684 let id_col = df.column("id").unwrap();
4685 assert_eq!(id_col.get(0).unwrap(), AnyValue::Int32(1));
4686 }
4687
4688 #[test]
4689 fn test_fuzzy_token_regex() {
4690 assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
4691 assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
4692 let pat = fuzzy_token_regex("[");
4694 assert!(pat.contains("\\["));
4695 }
4696
    #[test]
    fn test_fuzzy_search() {
        // Round-trips a fuzzy search against an on-disk CSV fixture:
        // a non-empty query must shrink (or keep) the row count, and
        // clearing the query must restore the original view.
        crate::tests::ensure_sample_data();
        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
        let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
        state.visible_rows = 10;
        state.collect();
        // Baseline row count before any search is applied.
        let before = state.num_rows;
        state.fuzzy_search("string".to_string());
        assert!(state.error.is_none(), "{:?}", state.error);
        assert!(state.num_rows <= before, "fuzzy search should filter rows");
        // An empty query is the reset path: row count and active query
        // must both return to their pre-search state.
        state.fuzzy_search("".to_string());
        state.collect();
        assert_eq!(state.num_rows, before, "empty fuzzy search should reset");
        assert!(state.get_active_fuzzy_query().is_empty());
    }
4715
4716 #[test]
4717 fn test_fuzzy_search_regex_direct() {
4718 let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
4720 let pattern = fuzzy_token_regex("alice");
4721 let out = lf
4722 .filter(col("name").str().contains(lit(pattern.clone()), false))
4723 .collect()
4724 .unwrap();
4725 assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);
4726
4727 let lf2 = df!(
4729 "id" => &[1i32, 2, 3],
4730 "name" => &["alice", "bob", "carol"],
4731 "city" => &["NYC", "LA", "Boston"]
4732 )
4733 .unwrap()
4734 .lazy();
4735 let pat = fuzzy_token_regex("alice");
4736 let expr = col("name")
4737 .str()
4738 .contains(lit(pat.clone()), false)
4739 .or(col("city").str().contains(lit(pat), false));
4740 let out2 = lf2.clone().filter(expr).collect().unwrap();
4741 assert_eq!(out2.height(), 1);
4742
4743 let schema = lf2.clone().collect_schema().unwrap();
4745 let string_cols: Vec<String> = schema
4746 .iter()
4747 .filter(|(_, dtype)| dtype.is_string())
4748 .map(|(name, _)| name.to_string())
4749 .collect();
4750 assert!(
4751 !string_cols.is_empty(),
4752 "df! string cols should be detected"
4753 );
4754 let pattern = fuzzy_token_regex("alice");
4755 let token_expr = string_cols
4756 .iter()
4757 .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
4758 .reduce(|a, b| a.or(b))
4759 .unwrap();
4760 let out3 = lf2.filter(token_expr).collect().unwrap();
4761 assert_eq!(
4762 out3.height(),
4763 1,
4764 "fuzzy_search-style filter should match 1 row"
4765 );
4766 }
4767
4768 #[test]
4769 fn test_fuzzy_search_no_string_columns() {
4770 let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
4771 .unwrap()
4772 .lazy();
4773 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4774 state.fuzzy_search("x".to_string());
4775 assert!(state.error.is_some());
4776 }
4777
4778 #[test]
4781 fn test_by_query_result_sorted_by_group_columns() {
4782 let df = df!(
4784 "age_group" => &[3i64, 1, 5, 2, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
4785 "team" => &[
4786 "Red", "Blue", "Green", "Red", "Blue", "Green", "Green", "Red", "Blue",
4787 "Green", "Red", "Blue", "Red", "Blue", "Green",
4788 ],
4789 "score" => &[50.0f64, 10.0, 90.0, 20.0, 30.0, 40.0, 60.0, 70.0, 80.0, 15.0, 25.0, 35.0, 45.0, 55.0, 65.0],
4790 )
4791 .unwrap();
4792 let lf = df.lazy();
4793 let options = crate::OpenOptions::default();
4794 let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4795 state.query("select avg score by age_group, team".to_string());
4796 assert!(
4797 state.error.is_none(),
4798 "query should succeed: {:?}",
4799 state.error
4800 );
4801 let result = state.lf.collect().unwrap();
4802 let sorted = result
4804 .sort(
4805 ["age_group", "team"],
4806 SortMultipleOptions::default().with_order_descending(false),
4807 )
4808 .unwrap();
4809 assert_eq!(
4810 result, sorted,
4811 "by-query result must be sorted by (age_group, team)"
4812 );
4813 }
4814
4815 #[test]
4818 fn test_by_query_computed_group_key_sorted_by_result_column() {
4819 let df = df!(
4820 "x" => &[7.0f64, 12.0, 3.0, 22.0, 17.0, 8.0],
4821 "v" => &[1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0],
4822 )
4823 .unwrap();
4824 let lf = df.lazy();
4825 let options = crate::OpenOptions::default();
4826 let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4827 state.query("select sum v by bucket: 1+floor x % 3".to_string());
4829 assert!(
4830 state.error.is_none(),
4831 "query should succeed: {:?}",
4832 state.error
4833 );
4834 let result = state.lf.collect().unwrap();
4835 let bucket = result.column("bucket").unwrap();
4836 for i in 1..result.height() {
4838 let prev: i64 = bucket.get(i - 1).unwrap().try_extract().unwrap_or(0);
4839 let curr: i64 = bucket.get(i).unwrap().try_extract().unwrap_or(0);
4840 assert!(
4841 curr >= prev,
4842 "bucket column must be sorted: {} then {}",
4843 prev,
4844 curr
4845 );
4846 }
4847 }
4848}