1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10 buffer::Buffer,
11 layout::Rect,
12 style::{Color, Modifier, Style},
13 text::{Line, Span},
14 widgets::{
15 Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16 },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions, ParseStringsTarget};
25use polars::io::csv::read::NullValues;
26use polars::lazy::frame::pivot::pivot_stable;
27use polars::prelude::StrptimeOptions;
28use std::io::{BufReader, Read};
29
30use calamine::{open_workbook_auto, Data, Reader};
31use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
32use orc_rust::ArrowReaderBuilder;
33use tempfile::NamedTempFile;
34
35use arrow::array::types::{
36 Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
37 TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
38};
39use arrow::array::{Array, AsArray};
40use arrow::record_batch::RecordBatch;
41
42fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
43 let e = col(PlSmallStr::from_static(""));
44 let expr = match agg {
45 PivotAggregation::Last => e.last(),
46 PivotAggregation::First => e.first(),
47 PivotAggregation::Min => e.min(),
48 PivotAggregation::Max => e.max(),
49 PivotAggregation::Avg => e.mean(),
50 PivotAggregation::Med => e.median(),
51 PivotAggregation::Std => e.std(1),
52 PivotAggregation::Count => e.len(),
53 };
54 Ok(expr)
55}
56
57pub struct DataTableState {
58 pub lf: LazyFrame,
59 original_lf: LazyFrame,
60 df: Option<DataFrame>, locked_df: Option<DataFrame>, pub table_state: TableState,
63 pub start_row: usize,
64 pub visible_rows: usize,
65 pub termcol_index: usize,
66 pub visible_termcols: usize,
67 pub error: Option<PolarsError>,
68 pub suppress_error_display: bool, pub schema: Arc<Schema>,
70 pub num_rows: usize,
71 num_rows_valid: bool,
73 filters: Vec<FilterStatement>,
74 sort_columns: Vec<String>,
75 sort_ascending: bool,
76 pub active_query: String,
77 pub active_sql_query: String,
79 pub active_fuzzy_query: String,
81 column_order: Vec<String>, locked_columns_count: usize, grouped_lf: Option<LazyFrame>,
84 drilled_down_group_index: Option<usize>, pub drilled_down_group_key: Option<Vec<String>>, pub drilled_down_group_key_columns: Option<Vec<String>>, pages_lookahead: usize,
88 pages_lookback: usize,
89 max_buffered_rows: usize, max_buffered_mb: usize, buffered_start_row: usize,
92 buffered_end_row: usize,
93 buffered_df: Option<DataFrame>,
96 proximity_threshold: usize,
97 row_numbers: bool,
98 row_start_index: usize,
99 last_pivot_spec: Option<PivotSpec>,
101 last_melt_spec: Option<MeltSpec>,
103 pub partition_columns: Option<Vec<String>>,
105 decompress_temp_file: Option<NamedTempFile>,
107 pub polars_streaming: bool,
109 workaround_pivot_date_index: bool,
111}
112
/// Column type inferred for an Excel sheet column before it is converted
/// into a Polars `Series` (see `excel_infer_column_type`).
#[derive(Clone, Copy)]
enum ExcelColType {
    Int64,
    Float64,
    Boolean,
    Utf8,
    /// Temporal cells whose time component is always midnight.
    Date,
    /// Temporal cells with a non-midnight time component.
    Datetime,
}
123
124impl DataTableState {
125 pub fn new(
126 lf: LazyFrame,
127 pages_lookahead: Option<usize>,
128 pages_lookback: Option<usize>,
129 max_buffered_rows: Option<usize>,
130 max_buffered_mb: Option<usize>,
131 polars_streaming: bool,
132 ) -> Result<Self> {
133 let schema = lf.clone().collect_schema()?;
134 let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
135 Ok(Self {
136 original_lf: lf.clone(),
137 lf,
138 df: None,
139 locked_df: None,
140 table_state: TableState::default(),
141 start_row: 0,
142 visible_rows: 0,
143 termcol_index: 0,
144 visible_termcols: 0,
145 error: None,
146 suppress_error_display: false,
147 schema,
148 num_rows: 0,
149 num_rows_valid: false,
150 filters: Vec::new(),
151 sort_columns: Vec::new(),
152 sort_ascending: true,
153 active_query: String::new(),
154 active_sql_query: String::new(),
155 active_fuzzy_query: String::new(),
156 column_order,
157 locked_columns_count: 0,
158 grouped_lf: None,
159 drilled_down_group_index: None,
160 drilled_down_group_key: None,
161 drilled_down_group_key_columns: None,
162 pages_lookahead: pages_lookahead.unwrap_or(3),
163 pages_lookback: pages_lookback.unwrap_or(3),
164 max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
165 max_buffered_mb: max_buffered_mb.unwrap_or(512),
166 buffered_start_row: 0,
167 buffered_end_row: 0,
168 buffered_df: None,
169 proximity_threshold: 0, row_numbers: false, row_start_index: 1, last_pivot_spec: None,
173 last_melt_spec: None,
174 partition_columns: None,
175 decompress_temp_file: None,
176 polars_streaming,
177 workaround_pivot_date_index: true,
178 })
179 }
180
181 pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
183 let mut state = Self::new(
184 lf,
185 options.pages_lookahead,
186 options.pages_lookback,
187 options.max_buffered_rows,
188 options.max_buffered_mb,
189 options.polars_streaming,
190 )?;
191 state.row_numbers = options.row_numbers;
192 state.row_start_index = options.row_start_index;
193 state.workaround_pivot_date_index = options.workaround_pivot_date_index;
194 Ok(state)
195 }
196
197 pub fn from_schema_and_lazyframe(
201 schema: Arc<Schema>,
202 lf: LazyFrame,
203 options: &crate::OpenOptions,
204 partition_columns: Option<Vec<String>>,
205 ) -> Result<Self> {
206 let column_order: Vec<String> = if let Some(ref part) = partition_columns {
207 let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
208 let rest: Vec<String> = schema
209 .iter_names()
210 .map(|s| s.to_string())
211 .filter(|c| !part_set.contains(c.as_str()))
212 .collect();
213 part.iter().cloned().chain(rest).collect()
214 } else {
215 schema.iter_names().map(|s| s.to_string()).collect()
216 };
217 Ok(Self {
218 original_lf: lf.clone(),
219 lf,
220 df: None,
221 locked_df: None,
222 table_state: TableState::default(),
223 start_row: 0,
224 visible_rows: 0,
225 termcol_index: 0,
226 visible_termcols: 0,
227 error: None,
228 suppress_error_display: false,
229 schema,
230 num_rows: 0,
231 num_rows_valid: false,
232 filters: Vec::new(),
233 sort_columns: Vec::new(),
234 sort_ascending: true,
235 active_query: String::new(),
236 active_sql_query: String::new(),
237 active_fuzzy_query: String::new(),
238 column_order,
239 locked_columns_count: 0,
240 grouped_lf: None,
241 drilled_down_group_index: None,
242 drilled_down_group_key: None,
243 drilled_down_group_key_columns: None,
244 pages_lookahead: options.pages_lookahead.unwrap_or(3),
245 pages_lookback: options.pages_lookback.unwrap_or(3),
246 max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
247 max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
248 buffered_start_row: 0,
249 buffered_end_row: 0,
250 buffered_df: None,
251 proximity_threshold: 0,
252 row_numbers: options.row_numbers,
253 row_start_index: options.row_start_index,
254 last_pivot_spec: None,
255 last_melt_spec: None,
256 partition_columns,
257 decompress_temp_file: None,
258 polars_streaming: options.polars_streaming,
259 workaround_pivot_date_index: options.workaround_pivot_date_index,
260 })
261 }
262
    /// Restores `lf` to the original frame and clears every derived
    /// transform: queries, filters, sorts, grouping, drill-down state,
    /// column locking, row buffers, and the scroll position.
    fn reset_lf_to_original(&mut self) {
        self.invalidate_num_rows();
        self.lf = self.original_lf.clone();
        // Re-resolve the schema from the pristine frame; on failure fall
        // back to an empty schema instead of propagating the error.
        self.schema = self
            .original_lf
            .clone()
            .collect_schema()
            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.locked_columns_count = 0;
        self.filters.clear();
        self.sort_columns.clear();
        self.sort_ascending = true;
        self.start_row = 0;
        self.termcol_index = 0;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        // Move the selection back to the first row.
        self.table_state.select(Some(0));
    }
294
    /// Fully resets the table: restores the original frame, clears any
    /// error and remembered pivot/melt specs, then re-collects the data.
    pub fn reset(&mut self) {
        self.reset_lf_to_original();
        self.error = None;
        self.suppress_error_display = false;
        self.last_pivot_spec = None;
        self.last_melt_spec = None;
        self.collect();
        // NOTE(review): start_row is already 0 after reset_lf_to_original;
        // presumably this guards against collect() moving it — confirm.
        if self.num_rows > 0 {
            self.start_row = 0;
        }
    }
306
307 pub fn from_parquet(
308 path: &Path,
309 pages_lookahead: Option<usize>,
310 pages_lookback: Option<usize>,
311 max_buffered_rows: Option<usize>,
312 max_buffered_mb: Option<usize>,
313 row_numbers: bool,
314 row_start_index: usize,
315 ) -> Result<Self> {
316 let path_str = path.as_os_str().to_string_lossy();
317 let is_glob = path_str.contains('*');
318 let pl_path = PlPath::Local(Arc::from(path));
319 let args = ScanArgsParquet {
320 glob: is_glob,
321 ..Default::default()
322 };
323 let lf = LazyFrame::scan_parquet(pl_path, args)?;
324 let mut state = Self::new(
325 lf,
326 pages_lookahead,
327 pages_lookback,
328 max_buffered_rows,
329 max_buffered_mb,
330 true,
331 )?;
332 state.row_numbers = row_numbers;
333 state.row_start_index = row_start_index;
334 Ok(state)
335 }
336
337 pub fn from_parquet_paths(
339 paths: &[impl AsRef<Path>],
340 pages_lookahead: Option<usize>,
341 pages_lookback: Option<usize>,
342 max_buffered_rows: Option<usize>,
343 max_buffered_mb: Option<usize>,
344 row_numbers: bool,
345 row_start_index: usize,
346 ) -> Result<Self> {
347 if paths.is_empty() {
348 return Err(color_eyre::eyre::eyre!("No paths provided"));
349 }
350 if paths.len() == 1 {
351 return Self::from_parquet(
352 paths[0].as_ref(),
353 pages_lookahead,
354 pages_lookback,
355 max_buffered_rows,
356 max_buffered_mb,
357 row_numbers,
358 row_start_index,
359 );
360 }
361 let mut lazy_frames = Vec::with_capacity(paths.len());
362 for p in paths {
363 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
364 let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
365 lazy_frames.push(lf);
366 }
367 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
368 let mut state = Self::new(
369 lf,
370 pages_lookahead,
371 pages_lookback,
372 max_buffered_rows,
373 max_buffered_mb,
374 true,
375 )?;
376 state.row_numbers = row_numbers;
377 state.row_start_index = row_start_index;
378 Ok(state)
379 }
380
381 pub fn from_ipc(
383 path: &Path,
384 pages_lookahead: Option<usize>,
385 pages_lookback: Option<usize>,
386 max_buffered_rows: Option<usize>,
387 max_buffered_mb: Option<usize>,
388 row_numbers: bool,
389 row_start_index: usize,
390 ) -> Result<Self> {
391 let pl_path = PlPath::Local(Arc::from(path));
392 let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
393 let mut state = Self::new(
394 lf,
395 pages_lookahead,
396 pages_lookback,
397 max_buffered_rows,
398 max_buffered_mb,
399 true,
400 )?;
401 state.row_numbers = row_numbers;
402 state.row_start_index = row_start_index;
403 Ok(state)
404 }
405
406 pub fn from_ipc_paths(
408 paths: &[impl AsRef<Path>],
409 pages_lookahead: Option<usize>,
410 pages_lookback: Option<usize>,
411 max_buffered_rows: Option<usize>,
412 max_buffered_mb: Option<usize>,
413 row_numbers: bool,
414 row_start_index: usize,
415 ) -> Result<Self> {
416 if paths.is_empty() {
417 return Err(color_eyre::eyre::eyre!("No paths provided"));
418 }
419 if paths.len() == 1 {
420 return Self::from_ipc(
421 paths[0].as_ref(),
422 pages_lookahead,
423 pages_lookback,
424 max_buffered_rows,
425 max_buffered_mb,
426 row_numbers,
427 row_start_index,
428 );
429 }
430 let mut lazy_frames = Vec::with_capacity(paths.len());
431 for p in paths {
432 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
433 let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
434 lazy_frames.push(lf);
435 }
436 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
437 let mut state = Self::new(
438 lf,
439 pages_lookahead,
440 pages_lookback,
441 max_buffered_rows,
442 max_buffered_mb,
443 true,
444 )?;
445 state.row_numbers = row_numbers;
446 state.row_start_index = row_start_index;
447 Ok(state)
448 }
449
450 pub fn from_avro(
452 path: &Path,
453 pages_lookahead: Option<usize>,
454 pages_lookback: Option<usize>,
455 max_buffered_rows: Option<usize>,
456 max_buffered_mb: Option<usize>,
457 row_numbers: bool,
458 row_start_index: usize,
459 ) -> Result<Self> {
460 let file = File::open(path)?;
461 let df = polars::io::avro::AvroReader::new(file).finish()?;
462 let lf = df.lazy();
463 let mut state = Self::new(
464 lf,
465 pages_lookahead,
466 pages_lookback,
467 max_buffered_rows,
468 max_buffered_mb,
469 true,
470 )?;
471 state.row_numbers = row_numbers;
472 state.row_start_index = row_start_index;
473 Ok(state)
474 }
475
476 pub fn from_avro_paths(
478 paths: &[impl AsRef<Path>],
479 pages_lookahead: Option<usize>,
480 pages_lookback: Option<usize>,
481 max_buffered_rows: Option<usize>,
482 max_buffered_mb: Option<usize>,
483 row_numbers: bool,
484 row_start_index: usize,
485 ) -> Result<Self> {
486 if paths.is_empty() {
487 return Err(color_eyre::eyre::eyre!("No paths provided"));
488 }
489 if paths.len() == 1 {
490 return Self::from_avro(
491 paths[0].as_ref(),
492 pages_lookahead,
493 pages_lookback,
494 max_buffered_rows,
495 max_buffered_mb,
496 row_numbers,
497 row_start_index,
498 );
499 }
500 let mut lazy_frames = Vec::with_capacity(paths.len());
501 for p in paths {
502 let file = File::open(p.as_ref())?;
503 let df = polars::io::avro::AvroReader::new(file).finish()?;
504 lazy_frames.push(df.lazy());
505 }
506 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
507 let mut state = Self::new(
508 lf,
509 pages_lookahead,
510 pages_lookback,
511 max_buffered_rows,
512 max_buffered_mb,
513 true,
514 )?;
515 state.row_numbers = row_numbers;
516 state.row_start_index = row_start_index;
517 Ok(state)
518 }
519
    /// Loads an Excel workbook into an eagerly-materialized table state.
    ///
    /// `excel_sheet` selects a worksheet either by zero-based index (when
    /// it parses as an integer) or by name; `None` uses the first sheet.
    /// The first row is treated as the header row; column types are
    /// inferred from the remaining cells.
    #[allow(clippy::too_many_arguments)]
    pub fn from_excel(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        excel_sheet: Option<&str>,
    ) -> Result<Self> {
        let mut workbook =
            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
        let sheet_names = workbook.sheet_names().to_vec();
        if sheet_names.is_empty() {
            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
        }
        // Resolve the requested worksheet: numeric selector -> by index,
        // otherwise -> by name; default to the first sheet.
        let range = if let Some(sheet_sel) = excel_sheet {
            if let Ok(idx) = sheet_sel.parse::<usize>() {
                workbook
                    .worksheet_range_at(idx)
                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            } else {
                workbook
                    .worksheet_range(sheet_sel)
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            }
        } else {
            workbook
                .worksheet_range_at(0)
                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
        };
        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
        // An empty sheet still yields a valid (empty) table state.
        if rows.is_empty() {
            let empty_df = DataFrame::new(vec![])?;
            let mut state = Self::new(
                empty_df.lazy(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                true,
            )?;
            state.row_numbers = row_numbers;
            state.row_start_index = row_start_index;
            return Ok(state);
        }
        // The first row supplies the column headers.
        let headers: Vec<String> = rows[0]
            .iter()
            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
            .collect();
        let n_cols = headers.len();
        let mut series_vec = Vec::with_capacity(n_cols);
        for (col_idx, header) in headers.iter().enumerate() {
            // Rows may be ragged; missing trailing cells become None.
            let col_cells: Vec<Option<&Data>> =
                rows[1..].iter().map(|row| row.get(col_idx)).collect();
            let inferred = Self::excel_infer_column_type(&col_cells);
            // Unnamed header cells get positional names (column_1, ...).
            let name = if header.is_empty() {
                format!("column_{}", col_idx + 1)
            } else {
                header.clone()
            };
            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
            series_vec.push(series.into());
        }
        let df = DataFrame::new(series_vec)?;
        let mut state = Self::new(
            df.lazy(),
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
602
    /// Infers the best-fitting column type from a column's data cells.
    ///
    /// Precedence: any string cell stops the scan and the column becomes
    /// Utf8 — unless every non-empty cell parses as a date/datetime, in
    /// which case it is treated as temporal. Otherwise: ints win over
    /// datetimes, datetimes over floats (floats that are all whole numbers
    /// are narrowed to Int64), then bools; the fallback is Utf8.
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        for cell in cells.iter().flatten() {
            // A single string cell settles the flag scan early.
            if CalamineTrait::is_string(*cell) {
                has_string = true;
                break;
            }
            // Datetime cells also raise the float flag, so a mixed
            // float/datetime column still counts as numeric below.
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // String cells may still be textual dates: require at least one
            // parseable cell AND that every non-empty cell parses.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                // All-midnight timestamps indicate a pure date column.
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            ExcelColType::Int64
        } else if has_datetime {
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // Floats that are all whole (within 1e-10) are narrowed to Int64.
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            ExcelColType::Utf8
        }
    }
674
675 fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
677 let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
678 cells
679 .iter()
680 .flatten()
681 .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
682 .all(|dt| dt.time() == midnight)
683 }
684
685 fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
687 use calamine::DataType;
688 if let Some(dt) = cell.as_datetime() {
689 return Some(dt);
690 }
691 let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
692 Self::parse_naive_datetime_str(s)
693 }
694
695 fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
697 let s = s.trim();
698 if s.is_empty() {
699 return None;
700 }
701 const FORMATS: &[&str] = &[
702 "%Y-%m-%dT%H:%M:%S%.f",
703 "%Y-%m-%dT%H:%M:%S",
704 "%Y-%m-%d %H:%M:%S%.f",
705 "%Y-%m-%d %H:%M:%S",
706 "%Y-%m-%d",
707 ];
708 for fmt in FORMATS {
709 if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
710 return Some(dt);
711 }
712 }
713 if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
714 return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
715 }
716 None
717 }
718
    /// Converts one Excel column's cells into a polars `Series` of the
    /// previously inferred `col_type`. Cells that are missing or fail the
    /// conversion become nulls.
    fn excel_column_to_series(
        name: &str,
        cells: &[Option<&Data>],
        col_type: ExcelColType,
    ) -> Result<Series> {
        use calamine::DataType as CalamineTrait;
        use polars::datatypes::TimeUnit;
        let series = match col_type {
            ExcelColType::Int64 => {
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_i64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Float64 => {
                let v: Vec<Option<f64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_f64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Boolean => {
                let v: Vec<Option<bool>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.get_bool()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Utf8 => {
                let v: Vec<Option<String>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_string()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Date => {
                // Dates are stored as days since the Unix epoch, then cast
                // to the Date dtype.
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
                let v: Vec<Option<i32>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| (dt.date() - epoch).num_days() as i32)
                    })
                    .collect();
                Series::new(name.into(), v).cast(&DataType::Date)?
            }
            ExcelColType::Datetime => {
                // Datetimes are stored as microsecond UTC timestamps, then
                // cast to a timezone-naive Datetime dtype.
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| dt.and_utc().timestamp_micros())
                    })
                    .collect();
                Series::new(name.into(), v)
                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
            }
        };
        Ok(series)
    }
781
782 pub fn from_orc(
785 path: &Path,
786 pages_lookahead: Option<usize>,
787 pages_lookback: Option<usize>,
788 max_buffered_rows: Option<usize>,
789 max_buffered_mb: Option<usize>,
790 row_numbers: bool,
791 row_start_index: usize,
792 ) -> Result<Self> {
793 let file = File::open(path)?;
794 let reader = ArrowReaderBuilder::try_new(file)
795 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
796 .build();
797 let batches: Vec<RecordBatch> = reader
798 .collect::<std::result::Result<Vec<_>, _>>()
799 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
800 let df = Self::arrow_record_batches_to_dataframe(&batches)?;
801 let lf = df.lazy();
802 let mut state = Self::new(
803 lf,
804 pages_lookahead,
805 pages_lookback,
806 max_buffered_rows,
807 max_buffered_mb,
808 true,
809 )?;
810 state.row_numbers = row_numbers;
811 state.row_start_index = row_start_index;
812 Ok(state)
813 }
814
    /// Opens one or more ORC files.
    ///
    /// An empty slice is an error; a single path delegates to
    /// [`Self::from_orc`]; multiple paths are each read eagerly into a
    /// DataFrame and the resulting frames concatenated lazily.
    pub fn from_orc_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        if paths.is_empty() {
            return Err(color_eyre::eyre::eyre!("No paths provided"));
        }
        if paths.len() == 1 {
            return Self::from_orc(
                paths[0].as_ref(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                row_numbers,
                row_start_index,
            );
        }
        let mut lazy_frames = Vec::with_capacity(paths.len());
        for p in paths {
            // Each file is fully materialized through the Arrow reader.
            let file = File::open(p.as_ref())?;
            let reader = ArrowReaderBuilder::try_new(file)
                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
                .build();
            let batches: Vec<RecordBatch> = reader
                .collect::<std::result::Result<Vec<_>, _>>()
                .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
            let df = Self::arrow_record_batches_to_dataframe(&batches)?;
            lazy_frames.push(df.lazy());
        }
        let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
864
865 fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
868 if batches.is_empty() {
869 return Ok(DataFrame::new(vec![])?);
870 }
871 let mut all_dfs = Vec::with_capacity(batches.len());
872 for batch in batches {
873 let n_cols = batch.num_columns();
874 let schema = batch.schema();
875 let mut series_vec = Vec::with_capacity(n_cols);
876 for (i, col) in batch.columns().iter().enumerate() {
877 let name = schema.field(i).name().as_str();
878 let s = Self::arrow_array_to_polars_series(name, col)?;
879 series_vec.push(s.into());
880 }
881 let df = DataFrame::new(series_vec)?;
882 all_dfs.push(df);
883 }
884 let mut out = all_dfs.remove(0);
885 for df in all_dfs {
886 out = out.vstack(&df)?;
887 }
888 Ok(out)
889 }
890
    /// Converts a single Arrow array into a polars `Series` named `name`.
    ///
    /// Null slots are preserved as `None`. UInt8/UInt16 values are routed
    /// through `i64` buffers and then cast back to their unsigned dtypes.
    /// Unsupported Arrow types produce an error naming the column.
    fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
        use arrow::datatypes::DataType as ArrowDataType;
        let len = array.len();
        match array.data_type() {
            ArrowDataType::Int8 => {
                let a = array
                    .as_primitive_opt::<Int8Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
                let v: Vec<Option<i8>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::Int16 => {
                let a = array
                    .as_primitive_opt::<Int16Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
                let v: Vec<Option<i16>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::Int32 => {
                let a = array
                    .as_primitive_opt::<Int32Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
                let v: Vec<Option<i32>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::Int64 => {
                let a = array
                    .as_primitive_opt::<Int64Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
                let v: Vec<Option<i64>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            // UInt8/UInt16 are widened to i64 first, then cast back to the
            // unsigned dtype on the polars side.
            ArrowDataType::UInt8 => {
                let a = array
                    .as_primitive_opt::<UInt8Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
                let v: Vec<Option<i64>> = (0..len)
                    .map(|i| {
                        if a.is_null(i) {
                            None
                        } else {
                            Some(a.value(i) as i64)
                        }
                    })
                    .collect();
                Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
            }
            ArrowDataType::UInt16 => {
                let a = array
                    .as_primitive_opt::<UInt16Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
                let v: Vec<Option<i64>> = (0..len)
                    .map(|i| {
                        if a.is_null(i) {
                            None
                        } else {
                            Some(a.value(i) as i64)
                        }
                    })
                    .collect();
                Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
            }
            ArrowDataType::UInt32 => {
                let a = array
                    .as_primitive_opt::<UInt32Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
                let v: Vec<Option<u32>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::UInt64 => {
                let a = array
                    .as_primitive_opt::<UInt64Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
                let v: Vec<Option<u64>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::Float32 => {
                let a = array
                    .as_primitive_opt::<Float32Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
                let v: Vec<Option<f32>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::Float64 => {
                let a = array
                    .as_primitive_opt::<Float64Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
                let v: Vec<Option<f64>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::Boolean => {
                let a = array
                    .as_boolean_opt()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
                let v: Vec<Option<bool>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::Utf8 => {
                let a = array
                    .as_string_opt::<i32>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
                let v: Vec<Option<String>> = (0..len)
                    .map(|i| {
                        if a.is_null(i) {
                            None
                        } else {
                            Some(a.value(i).to_string())
                        }
                    })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::LargeUtf8 => {
                let a = array
                    .as_string_opt::<i64>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
                let v: Vec<Option<String>> = (0..len)
                    .map(|i| {
                        if a.is_null(i) {
                            None
                        } else {
                            Some(a.value(i).to_string())
                        }
                    })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            // NOTE(review): Date32/Date64 are kept as their raw integer
            // representations and not cast to a polars temporal dtype —
            // confirm whether downstream display expects this.
            ArrowDataType::Date32 => {
                let a = array
                    .as_primitive_opt::<Date32Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
                let v: Vec<Option<i32>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            ArrowDataType::Date64 => {
                let a = array
                    .as_primitive_opt::<Date64Type>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
                let v: Vec<Option<i64>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            // NOTE(review): matches any Timestamp unit but downcasts to
            // millisecond storage; a non-millisecond timestamp would fail
            // the downcast and surface as "expected Timestamp array" —
            // confirm the units ORC sources actually produce.
            ArrowDataType::Timestamp(_, _) => {
                let a = array
                    .as_primitive_opt::<TimestampMillisecondType>()
                    .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
                let v: Vec<Option<i64>> = (0..len)
                    .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
                    .collect();
                Ok(Series::new(name.into(), v))
            }
            other => Err(color_eyre::eyre::eyre!(
                "ORC: unsupported column type {:?} for column '{}'",
                other,
                name
            )),
        }
    }
1070
1071 pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1074 let path_str = path.as_os_str().to_string_lossy();
1075 let is_glob = path_str.contains('*');
1076 let pl_path = PlPath::Local(Arc::from(path));
1077 let args = ScanArgsParquet {
1078 hive_options: HiveOptions::new_enabled(),
1079 glob: is_glob,
1080 ..Default::default()
1081 };
1082 LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1083 }
1084
1085 pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1087 let path_str = path.as_os_str().to_string_lossy();
1088 let is_glob = path_str.contains('*');
1089 let pl_path = PlPath::Local(Arc::from(path));
1090 let args = ScanArgsParquet {
1091 schema: Some(schema),
1092 hive_options: HiveOptions::new_enabled(),
1093 glob: is_glob,
1094 ..Default::default()
1095 };
1096 LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1097 }
1098
    /// Finds one physical parquet file under a hive-partitioned directory,
    /// descending at most 64 partition levels.
    fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
        const MAX_DEPTH: usize = 64;
        Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
    }
1105
    /// Walks down one "spine" of a hive directory tree looking for a
    /// parquet file.
    ///
    /// At each level, any parquet file found directly in the directory wins
    /// immediately; otherwise the walk descends into the first `key=value`
    /// partition subdirectory encountered (directory iteration order, which
    /// is filesystem-dependent). `max_depth` bounds the recursion.
    fn first_parquet_file_spine(
        path: &Path,
        depth: usize,
        max_depth: usize,
    ) -> Option<std::path::PathBuf> {
        if depth >= max_depth {
            return None;
        }
        let entries = fs::read_dir(path).ok()?;
        let mut first_partition_child: Option<std::path::PathBuf> = None;
        for entry in entries.flatten() {
            let child = entry.path();
            if child.is_file() {
                // Extension match is case-insensitive (.parquet, .PARQUET, ...).
                if child
                    .extension()
                    .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
                {
                    return Some(child);
                }
            } else if child.is_dir() {
                // Hive partition directories are named `column=value`.
                if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
                    if name.contains('=') && first_partition_child.is_none() {
                        first_partition_child = Some(child);
                    }
                }
            }
        }
        first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
    }
1135
1136 fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1138 let file = File::open(path)?;
1139 let mut reader = ParquetReader::new(file);
1140 let arrow_schema = reader.schema()?;
1141 let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1142 Ok(Arc::new(schema))
1143 }
1144
1145 pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1149 let partition_columns = Self::discover_hive_partition_columns(path);
1150 let one_file = Self::first_parquet_file_in_hive_dir(path)
1151 .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1152 let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1153 let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1154 let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1155 for name in &partition_columns {
1156 merged.with_column(name.clone().into(), DataType::String);
1157 }
1158 for (name, dtype) in file_schema.iter() {
1159 if !part_set.contains(name.as_str()) {
1160 merged.with_column(name.clone(), dtype.clone());
1161 }
1162 }
1163 Ok((Arc::new(merged), partition_columns))
1164 }
1165
1166 pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1168 if path.is_dir() {
1169 Self::discover_partition_columns_from_path(path)
1170 } else {
1171 Self::discover_partition_columns_from_glob_pattern(path)
1172 }
1173 }
1174
1175 fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1180 const MAX_PARTITION_DEPTH: usize = 64;
1181 let mut columns = Vec::<String>::new();
1182 let mut seen = HashSet::<String>::new();
1183 Self::discover_partition_columns_spine(
1184 path,
1185 &mut columns,
1186 &mut seen,
1187 0,
1188 MAX_PARTITION_DEPTH,
1189 );
1190 columns
1191 }
1192
1193 fn discover_partition_columns_spine(
1197 path: &Path,
1198 columns: &mut Vec<String>,
1199 seen: &mut HashSet<String>,
1200 depth: usize,
1201 max_depth: usize,
1202 ) {
1203 if depth >= max_depth {
1204 return;
1205 }
1206 let Ok(entries) = fs::read_dir(path) else {
1207 return;
1208 };
1209 let mut first_partition_child: Option<std::path::PathBuf> = None;
1210 for entry in entries.flatten() {
1211 let child = entry.path();
1212 if child.is_dir() {
1213 if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1214 if let Some((key, _)) = name.split_once('=') {
1215 if !key.is_empty() && seen.insert(key.to_string()) {
1216 columns.push(key.to_string());
1217 }
1218 if first_partition_child.is_none() {
1219 first_partition_child = Some(child);
1220 }
1221 break;
1222 }
1223 }
1224 }
1225 }
1226 if let Some(one) = first_partition_child {
1227 Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1228 }
1229 }
1230
1231 fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1233 let path_str = path.as_os_str().to_string_lossy();
1234 let mut columns = Vec::<String>::new();
1235 let mut seen = HashSet::<String>::new();
1236 for segment in path_str.split('/') {
1237 if let Some((key, rest)) = segment.split_once('=') {
1238 if !key.is_empty()
1239 && (rest == "*" || !rest.contains('*'))
1240 && seen.insert(key.to_string())
1241 {
1242 columns.push(key.to_string());
1243 }
1244 }
1245 }
1246 columns
1247 }
1248
    /// Open a hive-partitioned parquet dataset (directory or glob pattern) and
    /// build the table state, moving the discovered partition columns to the
    /// front of the displayed column order.
    ///
    /// Paging/buffering knobs (`pages_lookahead`, `pages_lookback`,
    /// `max_buffered_rows`, `max_buffered_mb`) are forwarded to `Self::new`;
    /// `row_numbers`/`row_start_index` configure the row-number gutter.
    pub fn from_parquet_hive(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        // Glob expansion only when the path text contains a wildcard.
        let path_str = path.as_os_str().to_string_lossy();
        let is_glob = path_str.contains('*');
        let pl_path = PlPath::Local(Arc::from(path));
        let args = ScanArgsParquet {
            hive_options: HiveOptions::new_enabled(),
            glob: is_glob,
            ..Default::default()
        };
        let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
        let schema = lf.collect_schema()?;

        // Partition columns come from the directory layout (or glob text),
        // not from the parquet files themselves.
        let mut discovered = if path.is_dir() {
            Self::discover_partition_columns_from_path(path)
        } else {
            Self::discover_partition_columns_from_glob_pattern(path)
        };

        if discovered.is_empty() {
            // Fallback: the glob may point below the partition root (e.g.
            // `root/*/*.parquet`) — climb to the nearest existing directory
            // and re-run on-disk discovery there.
            let mut dir = path;
            while !dir.is_dir() {
                match dir.parent() {
                    Some(p) => dir = p,
                    None => break,
                }
            }
            if dir.is_dir() {
                discovered = Self::discover_partition_columns_from_path(dir);
            }
        }

        // Keep only discovered names that polars actually materialized as columns.
        let partition_columns: Vec<String> = discovered
            .into_iter()
            .filter(|c| schema.contains(c.as_str()))
            .collect();

        // Desired display order: partition columns first, then the remaining
        // columns in schema order.
        let new_order: Vec<String> = if partition_columns.is_empty() {
            schema.iter_names().map(|s| s.to_string()).collect()
        } else {
            let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
            let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
            let rest: Vec<String> = all_names
                .into_iter()
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            partition_columns.iter().cloned().chain(rest).collect()
        };

        // Reorder the plan only when there is something to move.
        if !partition_columns.is_empty() {
            let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
            lf = lf.select(exprs);
        }

        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        state.partition_columns = if partition_columns.is_empty() {
            None
        } else {
            Some(partition_columns)
        };
        state.set_column_order(new_order);
        Ok(state)
    }
1339
    /// Enable or disable the row-number gutter column.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }
1343
    /// Flip the row-number gutter on/off.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }
1347
    /// Starting index used when rendering row numbers (set from open options).
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1352
1353 fn decompress_compressed_csv_to_temp(
1355 path: &Path,
1356 compression: CompressionFormat,
1357 temp_dir: &Path,
1358 ) -> Result<NamedTempFile> {
1359 let mut temp = NamedTempFile::new_in(temp_dir)?;
1360 let out = temp.as_file_mut();
1361 let mut reader: Box<dyn Read> = match compression {
1362 CompressionFormat::Gzip => {
1363 let f = File::open(path)?;
1364 Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1365 }
1366 CompressionFormat::Zstd => {
1367 let f = File::open(path)?;
1368 Box::new(zstd::Decoder::new(BufReader::new(f))?)
1369 }
1370 CompressionFormat::Bzip2 => {
1371 let f = File::open(path)?;
1372 Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1373 }
1374 CompressionFormat::Xz => {
1375 let f = File::open(path)?;
1376 Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1377 }
1378 };
1379 std::io::copy(&mut reader, out)?;
1380 out.sync_all()?;
1381 Ok(temp)
1382 }
1383
1384 fn parse_null_value_specs(specs: &[String]) -> (Vec<String>, Vec<(String, String)>) {
1386 let mut global = Vec::new();
1387 let mut per_column = Vec::new();
1388 for s in specs {
1389 if let Some(i) = s.find('=') {
1390 let (col, val) = (s[..i].to_string(), s[i + 1..].to_string());
1391 per_column.push((col, val));
1392 } else {
1393 global.push(s.clone());
1394 }
1395 }
1396 (global, per_column)
1397 }
1398
1399 fn build_polars_null_values(
1401 global: &[String],
1402 per_column: &[(String, String)],
1403 schema: Option<&Schema>,
1404 ) -> Option<NullValues> {
1405 if global.is_empty() && per_column.is_empty() {
1406 return None;
1407 }
1408 if per_column.is_empty() {
1409 let vals: Vec<PlSmallStr> = global
1410 .iter()
1411 .map(|s| PlSmallStr::from(s.as_str()))
1412 .collect();
1413 return Some(if vals.len() == 1 {
1414 NullValues::AllColumnsSingle(vals[0].clone())
1415 } else {
1416 NullValues::AllColumns(vals)
1417 });
1418 }
1419 if global.is_empty() {
1420 let pairs: Vec<(PlSmallStr, PlSmallStr)> = per_column
1421 .iter()
1422 .map(|(c, v)| (PlSmallStr::from(c.as_str()), PlSmallStr::from(v.as_str())))
1423 .collect();
1424 return Some(NullValues::Named(pairs));
1425 }
1426 let schema = schema?;
1427 let mut pairs: Vec<(PlSmallStr, PlSmallStr)> = Vec::new();
1428 let first_global = PlSmallStr::from(global[0].as_str());
1429 for (name, _) in schema.iter() {
1430 let col_name = name.as_str();
1431 let val = per_column
1432 .iter()
1433 .rev()
1434 .find(|(c, _)| c == col_name)
1435 .map(|(_, v)| PlSmallStr::from(v.as_str()))
1436 .unwrap_or_else(|| first_global.clone());
1437 pairs.push((PlSmallStr::from(col_name), val));
1438 }
1439 Some(NullValues::Named(pairs))
1440 }
1441
1442 fn csv_schema_for_null_values(path: &Path, options: &OpenOptions) -> Result<Arc<Schema>> {
1444 let pl_path = PlPath::Local(Arc::from(path));
1445 let mut reader = LazyCsvReader::new(pl_path).with_n_rows(Some(1));
1446 if let Some(skip_lines) = options.skip_lines {
1447 reader = reader.with_skip_lines(skip_lines);
1448 }
1449 if let Some(skip_rows) = options.skip_rows {
1450 reader = reader.with_skip_rows(skip_rows);
1451 }
1452 if let Some(has_header) = options.has_header {
1453 reader = reader.with_has_header(has_header);
1454 }
1455 reader = reader.with_try_parse_dates(options.csv_try_parse_dates());
1456 let mut lf = reader.finish()?;
1457 lf.collect_schema().map_err(color_eyre::eyre::Report::from)
1458 }
1459
1460 fn build_null_values_for_csv(
1462 options: &OpenOptions,
1463 path_for_schema: Option<&Path>,
1464 ) -> Result<Option<NullValues>> {
1465 let specs = match &options.null_values {
1466 None => return Ok(None),
1467 Some(s) if s.is_empty() => return Ok(None),
1468 Some(s) => s.as_slice(),
1469 };
1470 let (global, per_column) = Self::parse_null_value_specs(specs);
1471 let nv = if !global.is_empty() && !per_column.is_empty() {
1472 let path = path_for_schema.ok_or_else(|| {
1473 color_eyre::eyre::eyre!(
1474 "Internal error: path required for null_values with both global and per-column"
1475 )
1476 })?;
1477 let schema = Self::csv_schema_for_null_values(path, options)?;
1478 Self::build_polars_null_values(&global, &per_column, Some(schema.as_ref()))
1479 } else {
1480 Self::build_polars_null_values(&global, &per_column, None)
1481 };
1482 Ok(nv)
1483 }
1484
1485 fn trim_csv_column_names(mut lf: LazyFrame) -> Result<LazyFrame> {
1487 let schema = lf.collect_schema()?;
1488 let names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
1489 let trimmed: Vec<String> = names.iter().map(|s| s.trim().to_string()).collect();
1490 if names == trimmed {
1491 return Ok(lf);
1492 }
1493 Ok(lf.rename(
1494 names.iter().map(|s| s.as_str()),
1495 trimmed.iter().map(|s| s.as_str()),
1496 false,
1497 ))
1498 }
1499
    /// Drop the last `options.skip_tail_rows` rows by eagerly counting the
    /// total row count, then slicing the plan to `total - n`.
    ///
    /// Best-effort: if the count query yields an unexpected shape, the frame
    /// is returned unchanged rather than erroring.
    fn apply_skip_tail_rows_csv(lf: LazyFrame, options: &OpenOptions) -> Result<LazyFrame> {
        let n = match options.skip_tail_rows {
            None | Some(0) => return Ok(lf),
            Some(n) => n,
        };
        // Materialize only the row count; respects the streaming setting.
        let count_df = collect_lazy(lf.clone().select([len()]), options.polars_streaming)
            .map_err(color_eyre::eyre::Report::from)?;
        // `len()` produces a single UInt32; anything else falls through to the
        // unchanged-frame fallback above.
        let total: u32 = if let Some(col) = count_df.get(0) {
            match col.first() {
                Some(AnyValue::UInt32(v)) => *v,
                _ => return Ok(lf),
            }
        } else {
            return Ok(lf);
        };
        // NOTE(review): `n as u32` silently truncates if skip_tail_rows ever
        // exceeds u32::MAX — confirm that range is acceptable for this option.
        let keep = total.saturating_sub(n as u32);
        Ok(lf.slice(0, keep))
    }
1519
1520 fn infer_date_format_from_sample(sample: &str) -> Option<&'static str> {
1523 const DATE_FMTS: &[&str] = &[
1524 "%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y%m%d", "%d-%m-%Y", "%d/%m/%Y", "%d.%m.%Y",
1525 "%m-%d-%Y", "%m/%d/%Y",
1526 ];
1527 DATE_FMTS
1528 .iter()
1529 .find(|fmt| NaiveDate::parse_from_str(sample, fmt).is_ok())
1530 .copied()
1531 }
1532
1533 fn infer_datetime_format_from_sample(sample: &str) -> Option<&'static str> {
1535 const DATETIME_FMTS: &[&str] = &[
1536 "%Y-%m-%dT%H:%M:%S%.f",
1537 "%Y-%m-%dT%H:%M:%S",
1538 "%Y-%m-%d %H:%M:%S%.f",
1539 "%Y-%m-%d %H:%M:%S",
1540 "%Y-%m-%d %H:%M",
1541 "%Y-%m-%d",
1542 "%d-%m-%YT%H:%M:%S%.f",
1543 "%d-%m-%YT%H:%M:%S",
1544 "%d-%m-%Y %H:%M:%S%.f",
1545 "%d-%m-%Y %H:%M:%S",
1546 "%d/%m/%YT%H:%M:%S%.f",
1547 "%d/%m/%YT%H:%M:%S",
1548 "%d/%m/%Y %H:%M:%S",
1549 "%Y%m%dT%H%M%S%.f",
1550 "%Y%m%d %H%M%S",
1551 ];
1552 DATETIME_FMTS
1553 .iter()
1554 .find(|fmt| NaiveDateTime::parse_from_str(sample, fmt).is_ok())
1555 .copied()
1556 }
1557
1558 fn string_chunked_to_duration_ns(str_ca: &StringChunked) -> DurationChunked {
1561 let name = str_ca.name().clone();
1562 let vals: Vec<Option<i64>> = str_ca
1563 .iter()
1564 .map(|opt_s| {
1565 opt_s.and_then(|s| {
1566 polars::time::Duration::try_parse(s)
1567 .ok()
1568 .map(|d| d.duration_ns())
1569 })
1570 })
1571 .collect();
1572 let int_ca = Int64Chunked::from_iter_options(name, vals.into_iter());
1573 int_ca.into_duration(TimeUnit::Nanoseconds)
1574 }
1575
1576 fn infer_time_format_from_sample(sample: &str) -> Option<&'static str> {
1578 const TIME_FMTS: &[&str] = &[
1579 "%H:%M:%S%.9f",
1580 "%H:%M:%S%.6f",
1581 "%H:%M:%S%.3f",
1582 "%H:%M:%S",
1583 "%H:%M",
1584 ];
1585 TIME_FMTS
1586 .iter()
1587 .find(|fmt| NaiveTime::parse_from_str(sample, fmt).is_ok())
1588 .copied()
1589 }
1590
    /// Sniff string columns of a CSV `LazyFrame` and cast the ones that
    /// consistently parse as a richer type (Date, Datetime, Time, Duration,
    /// Int64, Float64). No-op when `options.parse_strings` is `None`.
    ///
    /// Inference runs on a sample of `options.parse_strings_sample_rows` rows.
    /// A candidate type is accepted only if converting the (trimmed,
    /// blank-to-null normalized) sample introduces no NEW nulls — i.e. every
    /// non-null value parsed successfully. Types are tried in a fixed order:
    /// date → datetime → time → duration → int → float; the first acceptor wins.
    fn apply_parse_strings_to_csv_lazyframe(
        lf: LazyFrame,
        options: &OpenOptions,
    ) -> Result<LazyFrame> {
        let target = match &options.parse_strings {
            None => return Ok(lf),
            Some(t) => t,
        };
        let sample_rows = options.parse_strings_sample_rows;
        let sample_df = lf.clone().limit(sample_rows as u32).collect()?;
        let schema = sample_df.schema();
        // Only string-typed columns are candidates.
        let string_cols: Vec<String> = schema
            .iter()
            .filter(|(_name, dtype)| **dtype == DataType::String)
            .map(|(name, _)| name.to_string())
            .collect();
        let target_cols: Vec<String> = match target {
            ParseStringsTarget::All => string_cols,
            ParseStringsTarget::Columns(c) => c
                .iter()
                .filter(|name| string_cols.contains(name))
                .cloned()
                .collect(),
        };
        if target_cols.is_empty() {
            return Ok(lf);
        }
        use polars::datatypes::TimeUnit;
        let whitespace_pat = lit(PlSmallStr::from_static(" \t\n\r"));
        // Normalize the sample exactly like the final expressions will:
        // trim surrounding whitespace, then turn empty strings into nulls.
        let trim_sample_exprs: Vec<Expr> = target_cols
            .iter()
            .map(|c| {
                col(PlSmallStr::from(c.as_str()))
                    .str()
                    .strip_chars(whitespace_pat.clone())
                    .alias(PlSmallStr::from(c.as_str()))
            })
            .collect();
        let blank_to_null_exprs: Vec<Expr> = target_cols
            .iter()
            .map(|c| {
                let name = PlSmallStr::from(c.as_str());
                when(col(name.clone()).eq(lit(PlSmallStr::from_static(""))))
                    .then(Null {}.lit())
                    .otherwise(col(name.clone()))
                    .alias(name)
            })
            .collect();
        // Re-collect the sample with normalization applied; null counts from
        // this frame are the baseline for the accept/reject rule below.
        let sample_df = lf
            .clone()
            .limit(sample_rows as u32)
            .with_columns(trim_sample_exprs)
            .with_columns(blank_to_null_exprs)
            .collect()?;
        let mut exprs = Vec::with_capacity(target_cols.len());
        for col_name in &target_cols {
            let s = sample_df.column(col_name.as_str())?;
            let null_before = s.null_count();
            let len = s.len();
            // A conversion is acceptable only if it creates no new nulls.
            let accept_type = |null_after: usize| null_after <= null_before;
            // Local tag for the winning conversion of this column.
            enum InferredType {
                Date,
                Datetime,
                Time,
                Duration,
                Int64,
                Float64,
                String,
            }
            let (inferred, date_fmt, datetime_fmt, time_fmt) = if null_before == len {
                // All-null sample: nothing to learn, keep as string.
                (InferredType::String, None, None, None)
            } else {
                match s.str() {
                    Err(_) => (InferredType::String, None, None, None),
                    Ok(str_ca) => {
                        // First non-empty value drives strftime format guessing.
                        let first_val: Option<&str> = str_ca
                            .iter()
                            .find_map(|o: Option<&str>| o.filter(|s: &&str| !s.is_empty()));
                        // 1) Date: both the polars cast AND a recognized format
                        // string are required before accepting.
                        let (mut t, mut date_fmt, mut datetime_fmt, mut time_fmt) = match str_ca
                            .as_date(None, true)
                        {
                            Ok(as_date) if accept_type(as_date.null_count()) => {
                                let fmt = first_val.and_then(Self::infer_date_format_from_sample);
                                if fmt.is_some() {
                                    (InferredType::Date, fmt.map(String::from), None, None)
                                } else {
                                    (InferredType::String, None, None, None)
                                }
                            }
                            _ => (InferredType::String, None, None, None),
                        };
                        // 2) Datetime (only if date didn't win). The "raise"
                        // column is the per-row ambiguity policy polars expects.
                        if matches!(t, InferredType::String) {
                            let amb_name: &str = str_ca.name().as_ref();
                            let amb_series = Series::new(
                                PlSmallStr::from(amb_name),
                                vec!["raise"; str_ca.len()],
                            );
                            let amb_ca =
                                amb_series.str().map_err(color_eyre::eyre::Report::from)?;
                            (t, date_fmt, datetime_fmt, time_fmt) = match str_ca.as_datetime(
                                None,
                                TimeUnit::Microseconds,
                                true,
                                false,
                                None,
                                amb_ca,
                            ) {
                                Ok(as_dt) if accept_type(as_dt.null_count()) => {
                                    let fmt =
                                        first_val.and_then(Self::infer_datetime_format_from_sample);
                                    if fmt.is_some() {
                                        (InferredType::Datetime, None, fmt.map(String::from), None)
                                    } else {
                                        (InferredType::String, None, None, None)
                                    }
                                }
                                _ => (InferredType::String, None, None, None),
                            };
                        }
                        // 3) Time of day.
                        if matches!(t, InferredType::String) {
                            (t, date_fmt, datetime_fmt, time_fmt) = match str_ca.as_time(None, true)
                            {
                                Ok(as_time) if accept_type(as_time.null_count()) => {
                                    let fmt =
                                        first_val.and_then(Self::infer_time_format_from_sample);
                                    if fmt.is_some() {
                                        (InferredType::Time, None, None, fmt.map(String::from))
                                    } else {
                                        (InferredType::String, None, None, None)
                                    }
                                }
                                _ => (InferredType::String, None, None, None),
                            };
                        }
                        // 4) Duration via polars' duration grammar.
                        if matches!(t, InferredType::String) {
                            let duration_ca = Self::string_chunked_to_duration_ns(str_ca);
                            (t, date_fmt, datetime_fmt, time_fmt) =
                                if accept_type(duration_ca.null_count()) {
                                    (InferredType::Duration, None, None, None)
                                } else {
                                    (InferredType::String, None, None, None)
                                };
                        }
                        // 5) Integer before float, so "1"-style columns stay Int64.
                        if matches!(t, InferredType::String) {
                            (t, date_fmt, datetime_fmt, time_fmt) =
                                match s.strict_cast(&DataType::Int64) {
                                    Ok(as_int) if accept_type(as_int.null_count()) => {
                                        (InferredType::Int64, None, None, None)
                                    }
                                    _ => (InferredType::String, None, None, None),
                                };
                        }
                        // 6) Float as the last numeric resort.
                        if matches!(t, InferredType::String) {
                            (t, date_fmt, datetime_fmt, time_fmt) =
                                match s.strict_cast(&DataType::Float64) {
                                    Ok(as_float) if accept_type(as_float.null_count()) => {
                                        (InferredType::Float64, None, None, None)
                                    }
                                    _ => (InferredType::String, None, None, None),
                                };
                        }
                        (t, date_fmt, datetime_fmt, time_fmt)
                    }
                }
            };
            // Build the production expression: same trim + blank-to-null
            // normalization as the sample, then the winning conversion.
            let base = col(PlSmallStr::from(col_name.as_str()))
                .str()
                .strip_chars(whitespace_pat.clone());
            let base_with_nulls = when(base.clone().eq(lit(PlSmallStr::from_static(""))))
                .then(Null {}.lit())
                .otherwise(base.clone());
            let expr = match inferred {
                InferredType::Date => {
                    let opts = StrptimeOptions {
                        format: date_fmt.as_deref().map(PlSmallStr::from),
                        strict: false,
                        exact: false,
                        cache: true,
                    };
                    base_with_nulls
                        .clone()
                        .str()
                        .to_date(opts)
                        .alias(PlSmallStr::from(col_name.as_str()))
                }
                InferredType::Datetime => {
                    let opts = StrptimeOptions {
                        format: datetime_fmt.as_deref().map(PlSmallStr::from),
                        strict: false,
                        exact: false,
                        cache: true,
                    };
                    base_with_nulls
                        .clone()
                        .str()
                        .to_datetime(
                            Some(TimeUnit::Microseconds),
                            None,
                            opts,
                            lit(PlSmallStr::from_static("raise")),
                        )
                        .alias(PlSmallStr::from(col_name.as_str()))
                }
                InferredType::Time => {
                    let opts = StrptimeOptions {
                        format: time_fmt.as_deref().map(PlSmallStr::from),
                        strict: false,
                        // Time parsing is exact; dates/datetimes above are not.
                        exact: true,
                        cache: true,
                    };
                    base_with_nulls
                        .clone()
                        .str()
                        .to_time(opts)
                        .alias(PlSmallStr::from(col_name.as_str()))
                }
                InferredType::Duration => base_with_nulls
                    .clone()
                    .map(
                        |c: Column| {
                            let str_ca = c.str()?;
                            let duration_ca = Self::string_chunked_to_duration_ns(str_ca);
                            Ok(duration_ca.into_column())
                        },
                        |_schema: &Schema, field: &Field| {
                            Ok(Field::new(
                                field.name().clone(),
                                DataType::Duration(TimeUnit::Nanoseconds),
                            ))
                        },
                    )
                    .alias(PlSmallStr::from(col_name.as_str())),
                InferredType::Int64 => base_with_nulls
                    .clone()
                    .cast(DataType::Int64)
                    .alias(PlSmallStr::from(col_name.as_str())),
                InferredType::Float64 => base_with_nulls
                    .cast(DataType::Float64)
                    .alias(PlSmallStr::from(col_name.as_str())),
                // Unconverted columns still get the whitespace trim applied.
                InferredType::String => base.alias(PlSmallStr::from(col_name.as_str())),
            };
            exprs.push(expr);
        }
        Ok(lf.with_columns(exprs))
    }
1845
1846 pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1847 let nv = Self::build_null_values_for_csv(options, Some(path))?;
1848
1849 let compression = options
1851 .compression
1852 .or_else(|| CompressionFormat::from_extension(path));
1853
1854 if let Some(compression) = compression {
1855 if options.decompress_in_memory {
1856 match compression {
1858 CompressionFormat::Gzip | CompressionFormat::Zstd => {
1859 let mut read_options = CsvReadOptions::default();
1860 if let Some(skip_lines) = options.skip_lines {
1861 read_options.skip_lines = skip_lines;
1862 }
1863 if let Some(skip_rows) = options.skip_rows {
1864 read_options.skip_rows = skip_rows;
1865 }
1866 if let Some(has_header) = options.has_header {
1867 read_options.has_header = has_header;
1868 }
1869 if let Some(n) = options.infer_schema_length {
1870 read_options.infer_schema_length = Some(n);
1871 }
1872 read_options.ignore_errors = options.ignore_errors;
1873 read_options = read_options.map_parse_options(|opts| {
1874 let o = opts.with_try_parse_dates(options.csv_try_parse_dates());
1875 match &nv {
1876 Some(n) => o.with_null_values(Some(n.clone())),
1877 None => o,
1878 }
1879 });
1880 let df = read_options
1881 .try_into_reader_with_file_path(Some(path.into()))?
1882 .finish()?;
1883 let mut lf = Self::trim_csv_column_names(df.lazy())?;
1884 lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
1885 lf = Self::apply_skip_tail_rows_csv(lf, options)?;
1886 let mut state = Self::new(
1887 lf,
1888 options.pages_lookahead,
1889 options.pages_lookback,
1890 options.max_buffered_rows,
1891 options.max_buffered_mb,
1892 options.polars_streaming,
1893 )?;
1894 state.row_numbers = options.row_numbers;
1895 state.row_start_index = options.row_start_index;
1896 Ok(state)
1897 }
1898 CompressionFormat::Bzip2 => {
1899 let file = File::open(path)?;
1900 let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1901 let mut decompressed = Vec::new();
1902 decoder.read_to_end(&mut decompressed)?;
1903 let mut read_options = CsvReadOptions::default();
1904 if let Some(skip_lines) = options.skip_lines {
1905 read_options.skip_lines = skip_lines;
1906 }
1907 if let Some(skip_rows) = options.skip_rows {
1908 read_options.skip_rows = skip_rows;
1909 }
1910 if let Some(has_header) = options.has_header {
1911 read_options.has_header = has_header;
1912 }
1913 if let Some(n) = options.infer_schema_length {
1914 read_options.infer_schema_length = Some(n);
1915 }
1916 read_options.ignore_errors = options.ignore_errors;
1917 read_options = read_options.map_parse_options(|opts| {
1918 let o = opts.with_try_parse_dates(options.csv_try_parse_dates());
1919 match &nv {
1920 Some(n) => o.with_null_values(Some(n.clone())),
1921 None => o,
1922 }
1923 });
1924 let df = CsvReader::new(std::io::Cursor::new(decompressed))
1925 .with_options(read_options)
1926 .finish()?;
1927 let mut lf = Self::trim_csv_column_names(df.lazy())?;
1928 lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
1929 lf = Self::apply_skip_tail_rows_csv(lf, options)?;
1930 let mut state = Self::new(
1931 lf,
1932 options.pages_lookahead,
1933 options.pages_lookback,
1934 options.max_buffered_rows,
1935 options.max_buffered_mb,
1936 options.polars_streaming,
1937 )?;
1938 state.row_numbers = options.row_numbers;
1939 state.row_start_index = options.row_start_index;
1940 Ok(state)
1941 }
1942 CompressionFormat::Xz => {
1943 let file = File::open(path)?;
1944 let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1945 let mut decompressed = Vec::new();
1946 decoder.read_to_end(&mut decompressed)?;
1947 let mut read_options = CsvReadOptions::default();
1948 if let Some(skip_lines) = options.skip_lines {
1949 read_options.skip_lines = skip_lines;
1950 }
1951 if let Some(skip_rows) = options.skip_rows {
1952 read_options.skip_rows = skip_rows;
1953 }
1954 if let Some(has_header) = options.has_header {
1955 read_options.has_header = has_header;
1956 }
1957 if let Some(n) = options.infer_schema_length {
1958 read_options.infer_schema_length = Some(n);
1959 }
1960 read_options.ignore_errors = options.ignore_errors;
1961 read_options = read_options.map_parse_options(|opts| {
1962 let o = opts.with_try_parse_dates(options.csv_try_parse_dates());
1963 match &nv {
1964 Some(n) => o.with_null_values(Some(n.clone())),
1965 None => o,
1966 }
1967 });
1968 let df = CsvReader::new(std::io::Cursor::new(decompressed))
1969 .with_options(read_options)
1970 .finish()?;
1971 let mut lf = Self::trim_csv_column_names(df.lazy())?;
1972 lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
1973 lf = Self::apply_skip_tail_rows_csv(lf, options)?;
1974 let mut state = Self::new(
1975 lf,
1976 options.pages_lookahead,
1977 options.pages_lookback,
1978 options.max_buffered_rows,
1979 options.max_buffered_mb,
1980 options.polars_streaming,
1981 )?;
1982 state.row_numbers = options.row_numbers;
1983 state.row_start_index = options.row_start_index;
1984 Ok(state)
1985 }
1986 }
1987 } else {
1988 let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1990 let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1991 let nv_temp = Self::build_null_values_for_csv(options, Some(temp.path()))?;
1992 let mut state = Self::from_csv_customize(
1993 temp.path(),
1994 options.pages_lookahead,
1995 options.pages_lookback,
1996 options.max_buffered_rows,
1997 options.max_buffered_mb,
1998 |mut reader| {
1999 if let Some(skip_lines) = options.skip_lines {
2000 reader = reader.with_skip_lines(skip_lines);
2001 }
2002 if let Some(skip_rows) = options.skip_rows {
2003 reader = reader.with_skip_rows(skip_rows);
2004 }
2005 if let Some(has_header) = options.has_header {
2006 reader = reader.with_has_header(has_header);
2007 }
2008 if let Some(n) = options.infer_schema_length {
2009 reader = reader.with_infer_schema_length(Some(n));
2010 }
2011 reader = reader.with_ignore_errors(options.ignore_errors);
2012 reader = reader.with_try_parse_dates(options.csv_try_parse_dates());
2013 reader = match &nv_temp {
2014 Some(n) => reader
2015 .map_parse_options(|opts| opts.with_null_values(Some(n.clone()))),
2016 None => reader,
2017 };
2018 reader
2019 },
2020 )?;
2021 let mut lf = Self::trim_csv_column_names(std::mem::take(&mut state.lf))?;
2022 state.original_lf = lf.clone();
2023 state.schema = lf.clone().collect_schema()?;
2024 state.lf = lf.clone();
2025 if options.parse_strings.is_some() {
2026 lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
2027 state.original_lf = lf.clone();
2028 state.schema = lf.clone().collect_schema()?;
2029 state.lf = lf.clone();
2030 }
2031 lf = Self::apply_skip_tail_rows_csv(lf, options)?;
2032 state.original_lf = lf.clone();
2033 state.schema = lf.clone().collect_schema()?;
2034 state.lf = lf;
2035 state.row_numbers = options.row_numbers;
2036 state.row_start_index = options.row_start_index;
2037 state.decompress_temp_file = Some(temp);
2038 Ok(state)
2039 }
2040 } else {
2041 let mut state = Self::from_csv_customize(
2043 path,
2044 options.pages_lookahead,
2045 options.pages_lookback,
2046 options.max_buffered_rows,
2047 options.max_buffered_mb,
2048 |mut reader| {
2049 if let Some(skip_lines) = options.skip_lines {
2050 reader = reader.with_skip_lines(skip_lines);
2051 }
2052 if let Some(skip_rows) = options.skip_rows {
2053 reader = reader.with_skip_rows(skip_rows);
2054 }
2055 if let Some(has_header) = options.has_header {
2056 reader = reader.with_has_header(has_header);
2057 }
2058 if let Some(n) = options.infer_schema_length {
2059 reader = reader.with_infer_schema_length(Some(n));
2060 }
2061 reader = reader.with_ignore_errors(options.ignore_errors);
2062 reader = reader.with_try_parse_dates(options.csv_try_parse_dates());
2063 reader = match &nv {
2064 Some(n) => {
2065 reader.map_parse_options(|opts| opts.with_null_values(Some(n.clone())))
2066 }
2067 None => reader,
2068 };
2069 reader
2070 },
2071 )?;
2072 let mut lf = Self::trim_csv_column_names(std::mem::take(&mut state.lf))?;
2073 state.original_lf = lf.clone();
2074 state.schema = lf.clone().collect_schema()?;
2075 state.lf = lf.clone();
2076 if options.parse_strings.is_some() {
2077 lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
2078 state.original_lf = lf.clone();
2079 state.schema = lf.clone().collect_schema()?;
2080 state.lf = lf.clone();
2081 }
2082 lf = Self::apply_skip_tail_rows_csv(lf, options)?;
2083 state.original_lf = lf.clone();
2084 state.schema = lf.clone().collect_schema()?;
2085 state.lf = lf;
2086 state.row_numbers = options.row_numbers;
2087 Ok(state)
2088 }
2089 }
2090
2091 pub fn from_csv_customize<F>(
2092 path: &Path,
2093 pages_lookahead: Option<usize>,
2094 pages_lookback: Option<usize>,
2095 max_buffered_rows: Option<usize>,
2096 max_buffered_mb: Option<usize>,
2097 func: F,
2098 ) -> Result<Self>
2099 where
2100 F: FnOnce(LazyCsvReader) -> LazyCsvReader,
2101 {
2102 let pl_path = PlPath::Local(Arc::from(path));
2103 let reader = LazyCsvReader::new(pl_path);
2104 let lf = func(reader).finish()?;
2105 Self::new(
2106 lf,
2107 pages_lookahead,
2108 pages_lookback,
2109 max_buffered_rows,
2110 max_buffered_mb,
2111 true,
2112 )
2113 }
2114
2115 pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
2117 if paths.is_empty() {
2118 return Err(color_eyre::eyre::eyre!("No paths provided"));
2119 }
2120 if paths.len() == 1 {
2121 return Self::from_csv(paths[0].as_ref(), options);
2122 }
2123 let nv = Self::build_null_values_for_csv(options, Some(paths[0].as_ref()))?;
2124 let mut lazy_frames = Vec::with_capacity(paths.len());
2125 for p in paths {
2126 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
2127 let mut reader = LazyCsvReader::new(pl_path);
2128 if let Some(skip_lines) = options.skip_lines {
2129 reader = reader.with_skip_lines(skip_lines);
2130 }
2131 if let Some(skip_rows) = options.skip_rows {
2132 reader = reader.with_skip_rows(skip_rows);
2133 }
2134 if let Some(has_header) = options.has_header {
2135 reader = reader.with_has_header(has_header);
2136 }
2137 if let Some(n) = options.infer_schema_length {
2138 reader = reader.with_infer_schema_length(Some(n));
2139 }
2140 reader = reader.with_ignore_errors(options.ignore_errors);
2141 reader = reader.with_try_parse_dates(options.csv_try_parse_dates());
2142 reader = match &nv {
2143 Some(n) => reader.map_parse_options(|opts| opts.with_null_values(Some(n.clone()))),
2144 None => reader,
2145 };
2146 let lf = reader.finish()?;
2147 lazy_frames.push(lf);
2148 }
2149 let mut lf = Self::trim_csv_column_names(polars::prelude::concat(
2150 lazy_frames.as_slice(),
2151 Default::default(),
2152 )?)?;
2153 lf = Self::apply_parse_strings_to_csv_lazyframe(lf, options)?;
2154 lf = Self::apply_skip_tail_rows_csv(lf, options)?;
2155 let mut state = Self::new(
2156 lf,
2157 options.pages_lookahead,
2158 options.pages_lookback,
2159 options.max_buffered_rows,
2160 options.max_buffered_mb,
2161 options.polars_streaming,
2162 )?;
2163 state.row_numbers = options.row_numbers;
2164 state.row_start_index = options.row_start_index;
2165 Ok(state)
2166 }
2167
    /// Opens a single newline-delimited JSON (NDJSON) file as a lazy table.
    ///
    /// Paging and buffer limits are forwarded to [`Self::new`]; streaming
    /// collection is always enabled for this format (the literal `true`).
    pub fn from_ndjson(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        let pl_path = PlPath::Local(Arc::from(path));
        let lf = LazyJsonLineReader::new(pl_path).finish()?;
        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
2191
2192 pub fn from_ndjson_paths(
2194 paths: &[impl AsRef<Path>],
2195 pages_lookahead: Option<usize>,
2196 pages_lookback: Option<usize>,
2197 max_buffered_rows: Option<usize>,
2198 max_buffered_mb: Option<usize>,
2199 row_numbers: bool,
2200 row_start_index: usize,
2201 ) -> Result<Self> {
2202 if paths.is_empty() {
2203 return Err(color_eyre::eyre::eyre!("No paths provided"));
2204 }
2205 if paths.len() == 1 {
2206 return Self::from_ndjson(
2207 paths[0].as_ref(),
2208 pages_lookahead,
2209 pages_lookback,
2210 max_buffered_rows,
2211 max_buffered_mb,
2212 row_numbers,
2213 row_start_index,
2214 );
2215 }
2216 let mut lazy_frames = Vec::with_capacity(paths.len());
2217 for p in paths {
2218 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
2219 let lf = LazyJsonLineReader::new(pl_path).finish()?;
2220 lazy_frames.push(lf);
2221 }
2222 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
2223 let mut state = Self::new(
2224 lf,
2225 pages_lookahead,
2226 pages_lookback,
2227 max_buffered_rows,
2228 max_buffered_mb,
2229 true,
2230 )?;
2231 state.row_numbers = row_numbers;
2232 state.row_start_index = row_start_index;
2233 Ok(state)
2234 }
2235
    /// Opens a file containing one whole JSON document by delegating to
    /// [`Self::from_json_with_format`] with [`JsonFormat::Json`].
    pub fn from_json(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
2256
    /// Opens a JSON-lines file by delegating to
    /// [`Self::from_json_with_format`] with [`JsonFormat::JsonLines`].
    pub fn from_json_lines(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
2277
    /// Shared implementation for the single-path JSON constructors.
    ///
    /// The file is read eagerly by `JsonReader` and only wrapped into a lazy
    /// frame afterwards (`finish()?.lazy()`), so large documents are fully
    /// materialized up front. Streaming collection is always enabled.
    #[allow(clippy::too_many_arguments)]
    fn from_json_with_format(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        format: JsonFormat,
    ) -> Result<Self> {
        let file = File::open(path)?;
        let lf = JsonReader::new(file)
            .with_json_format(format)
            .finish()?
            .lazy();
        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
2306
    /// Opens several whole-document JSON files by delegating to
    /// [`Self::from_json_with_format_paths`] with [`JsonFormat::Json`].
    pub fn from_json_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
2328
    /// Opens several JSON-lines files by delegating to
    /// [`Self::from_json_with_format_paths`] with [`JsonFormat::JsonLines`].
    pub fn from_json_lines_paths(
        paths: &[impl AsRef<Path>],
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format_paths(
            paths,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
2350
2351 #[allow(clippy::too_many_arguments)]
2352 fn from_json_with_format_paths(
2353 paths: &[impl AsRef<Path>],
2354 pages_lookahead: Option<usize>,
2355 pages_lookback: Option<usize>,
2356 max_buffered_rows: Option<usize>,
2357 max_buffered_mb: Option<usize>,
2358 row_numbers: bool,
2359 row_start_index: usize,
2360 format: JsonFormat,
2361 ) -> Result<Self> {
2362 if paths.is_empty() {
2363 return Err(color_eyre::eyre::eyre!("No paths provided"));
2364 }
2365 if paths.len() == 1 {
2366 return Self::from_json_with_format(
2367 paths[0].as_ref(),
2368 pages_lookahead,
2369 pages_lookback,
2370 max_buffered_rows,
2371 max_buffered_mb,
2372 row_numbers,
2373 row_start_index,
2374 format,
2375 );
2376 }
2377 let mut lazy_frames = Vec::with_capacity(paths.len());
2378 for p in paths {
2379 let file = File::open(p.as_ref())?;
2380 let lf = match &format {
2381 JsonFormat::Json => JsonReader::new(file)
2382 .with_json_format(JsonFormat::Json)
2383 .finish()?
2384 .lazy(),
2385 JsonFormat::JsonLines => JsonReader::new(file)
2386 .with_json_format(JsonFormat::JsonLines)
2387 .finish()?
2388 .lazy(),
2389 };
2390 lazy_frames.push(lf);
2391 }
2392 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
2393 let mut state = Self::new(
2394 lf,
2395 pages_lookahead,
2396 pages_lookback,
2397 max_buffered_rows,
2398 max_buffered_mb,
2399 true,
2400 )?;
2401 state.row_numbers = row_numbers;
2402 state.row_start_index = row_start_index;
2403 Ok(state)
2404 }
2405
    /// Opens a single character-delimited text file using `delimiter` as the
    /// field separator.
    ///
    /// Honors `skip_lines`, `skip_rows` and `has_header` from `options`.
    /// NOTE(review): unlike `from_csv_paths`, this path applies neither
    /// `infer_schema_length`, `ignore_errors`, date parsing nor custom null
    /// markers — confirm whether that asymmetry is intentional.
    pub fn from_delimited(path: &Path, delimiter: u8, options: &OpenOptions) -> Result<Self> {
        let pl_path = PlPath::Local(Arc::from(path));
        let mut reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
        if let Some(skip_lines) = options.skip_lines {
            reader = reader.with_skip_lines(skip_lines);
        }
        if let Some(skip_rows) = options.skip_rows {
            reader = reader.with_skip_rows(skip_rows);
        }
        if let Some(has_header) = options.has_header {
            reader = reader.with_has_header(has_header);
        }
        let lf = reader.finish()?;
        let mut state = Self::new(
            lf,
            options.pages_lookahead,
            options.pages_lookback,
            options.max_buffered_rows,
            options.max_buffered_mb,
            true,
        )?;
        state.row_numbers = options.row_numbers;
        state.row_start_index = options.row_start_index;
        Ok(state)
    }
2431
2432 pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
2435 if rows < 0 && self.start_row == 0 {
2436 return false;
2437 }
2438 let new_start_row = if self.start_row as i64 + rows <= 0 {
2439 0
2440 } else {
2441 if let Some(df) = self.df.as_ref() {
2442 if rows > 0 && df.shape().0 <= self.visible_rows {
2443 return false;
2444 }
2445 }
2446 (self.start_row as i64 + rows) as usize
2447 };
2448 let view_end = new_start_row
2449 + self
2450 .visible_rows
2451 .min(self.num_rows.saturating_sub(new_start_row));
2452 let within_buffer = new_start_row >= self.buffered_start_row
2453 && view_end <= self.buffered_end_row
2454 && self.buffered_end_row > 0;
2455 !within_buffer
2456 }
2457
    /// Moves the viewport by `rows` (negative = up), reloading the buffer when
    /// the new view falls outside the currently buffered row window.
    fn slide_table(&mut self, rows: i64) {
        // Already at the top; scrolling up is a no-op.
        if rows < 0 && self.start_row == 0 {
            return;
        }

        let new_start_row = if self.start_row as i64 + rows <= 0 {
            0
        } else {
            if let Some(df) = self.df.as_ref() {
                // A frame that fits entirely in the viewport cannot scroll down.
                if rows > 0 && df.shape().0 <= self.visible_rows {
                    return;
                }
            }
            (self.start_row as i64 + rows) as usize
        };

        // End of the would-be visible window, clamped to the total row count.
        let view_end = new_start_row
            + self
                .visible_rows
                .min(self.num_rows.saturating_sub(new_start_row));
        // NOTE: keep this test in sync with `scroll_would_trigger_collect`.
        let within_buffer = new_start_row >= self.buffered_start_row
            && view_end <= self.buffered_end_row
            && self.buffered_end_row > 0;

        if within_buffer {
            // Cheap path: the buffer still covers the view, no reload needed.
            self.start_row = new_start_row;
            return;
        }

        self.start_row = new_start_row;
        self.collect();
    }
2491
2492 pub fn collect(&mut self) {
2493 if self.visible_rows > 0 {
2495 self.proximity_threshold = self.visible_rows;
2496 }
2497
2498 if !self.num_rows_valid {
2500 self.num_rows =
2501 match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
2502 Ok(df) => {
2503 if let Some(col) = df.get(0) {
2504 if let Some(AnyValue::UInt32(len)) = col.first() {
2505 *len as usize
2506 } else {
2507 0
2508 }
2509 } else {
2510 0
2511 }
2512 }
2513 Err(_) => 0,
2514 };
2515 self.num_rows_valid = true;
2516 }
2517
2518 if self.num_rows > 0 {
2519 let max_start = self.num_rows.saturating_sub(1);
2520 if self.start_row > max_start {
2521 self.start_row = max_start;
2522 }
2523 } else {
2524 self.start_row = 0;
2525 self.buffered_start_row = 0;
2526 self.buffered_end_row = 0;
2527 self.buffered_df = None;
2528 self.df = None;
2529 self.locked_df = None;
2530 return;
2531 }
2532
2533 let view_start = self.start_row;
2535 let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);
2536
2537 let within_buffer = view_start >= self.buffered_start_row
2539 && view_end <= self.buffered_end_row
2540 && self.buffered_end_row > 0;
2541
2542 let page_rows = self.visible_rows.max(1);
2545
2546 if within_buffer {
2547 let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
2548 let dist_to_end = self.buffered_end_row.saturating_sub(view_end);
2549
2550 let needs_expansion_back =
2551 dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
2552 let needs_expansion_forward =
2553 dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;
2554
2555 if !needs_expansion_back && !needs_expansion_forward {
2556 let expected_len = self
2558 .buffered_end_row
2559 .saturating_sub(self.buffered_start_row);
2560 if self
2561 .buffered_df
2562 .as_ref()
2563 .is_some_and(|b| b.height() == expected_len)
2564 {
2565 self.slice_buffer_into_display();
2566 if self.table_state.selected().is_none() {
2567 self.table_state.select(Some(0));
2568 }
2569 return;
2570 }
2571 self.load_buffer(self.buffered_start_row, self.buffered_end_row);
2572 if self.table_state.selected().is_none() {
2573 self.table_state.select(Some(0));
2574 }
2575 return;
2576 }
2577
2578 let mut new_buffer_start = if needs_expansion_back {
2579 view_start.saturating_sub(self.pages_lookback * page_rows)
2580 } else {
2581 self.buffered_start_row
2582 };
2583
2584 let mut new_buffer_end = if needs_expansion_forward {
2585 (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
2586 } else {
2587 self.buffered_end_row
2588 };
2589
2590 self.clamp_buffer_to_max_size(
2591 view_start,
2592 view_end,
2593 &mut new_buffer_start,
2594 &mut new_buffer_end,
2595 );
2596 self.load_buffer(new_buffer_start, new_buffer_end);
2597 } else {
2598 let mut new_buffer_start;
2603 let mut new_buffer_end;
2604
2605 let had_buffer = self.buffered_end_row > 0;
2606 let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
2607 let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;
2608
2609 let extend_forward_ok = scrolled_past_end
2610 && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
2611 let extend_backward_ok = scrolled_past_start
2612 && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;
2613
2614 if extend_forward_ok {
2615 new_buffer_start = self.buffered_start_row;
2617 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2618 } else if extend_backward_ok {
2619 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2621 new_buffer_end = self.buffered_end_row;
2622 } else if scrolled_past_end || scrolled_past_start {
2623 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2625 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2626 let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2627 let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2628 if current_len < min_initial_len {
2629 let need = min_initial_len.saturating_sub(current_len);
2630 let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2631 let can_extend_start = new_buffer_start;
2632 if can_extend_end >= need {
2633 new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2634 } else if can_extend_start >= need {
2635 new_buffer_start = new_buffer_start.saturating_sub(need);
2636 } else {
2637 new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2638 new_buffer_start =
2639 new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2640 }
2641 }
2642 } else {
2643 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2645 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2646
2647 let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2649 let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2650 if current_len < min_initial_len {
2651 let need = min_initial_len.saturating_sub(current_len);
2652 let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2653 let can_extend_start = new_buffer_start;
2654 if can_extend_end >= need {
2655 new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2656 } else if can_extend_start >= need {
2657 new_buffer_start = new_buffer_start.saturating_sub(need);
2658 } else {
2659 new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2660 new_buffer_start =
2661 new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2662 }
2663 }
2664 }
2665
2666 self.clamp_buffer_to_max_size(
2667 view_start,
2668 view_end,
2669 &mut new_buffer_start,
2670 &mut new_buffer_end,
2671 );
2672 self.load_buffer(new_buffer_start, new_buffer_end);
2673 }
2674
2675 self.slice_from_buffer();
2676 if self.table_state.selected().is_none() {
2677 self.table_state.select(Some(0));
2678 }
2679 }
2680
    /// Marks the cached row count stale; `collect` / `ensure_num_rows` will
    /// recount on their next call.
    fn invalidate_num_rows(&mut self) {
        self.num_rows_valid = false;
    }
2685
2686 pub fn num_rows_if_valid(&self) -> Option<usize> {
2689 if self.num_rows_valid {
2690 Some(self.num_rows)
2691 } else {
2692 None
2693 }
2694 }
2695
    /// Shrinks a requested buffer window to at most `max_buffered_rows` rows
    /// while keeping the visible span `[view_start, view_end)` inside it.
    ///
    /// A cap of 0 means "unlimited". When the view alone exceeds the cap the
    /// window is anchored at the view start; otherwise the surplus capacity is
    /// split roughly evenly around the view.
    fn clamp_buffer_to_max_size(
        &self,
        view_start: usize,
        view_end: usize,
        buffer_start: &mut usize,
        buffer_end: &mut usize,
    ) {
        // 0 disables the row cap entirely.
        if self.max_buffered_rows == 0 {
            return;
        }
        let max_len = self.max_buffered_rows;
        let requested_len = buffer_end.saturating_sub(*buffer_start);
        if requested_len <= max_len {
            return;
        }
        let view_len = view_end.saturating_sub(view_start);
        if view_len >= max_len {
            // View fills the whole cap: pin the buffer to the view start.
            *buffer_start = view_start;
            *buffer_end = (view_start + max_len).min(self.num_rows);
        } else {
            // Spread the remaining capacity around the view, then nudge the
            // window so it still contains the view and respects the cap.
            let half = (max_len - view_len) / 2;
            *buffer_end = (view_end + half).min(self.num_rows);
            *buffer_start = (*buffer_end).saturating_sub(max_len);
            if *buffer_start > view_start {
                *buffer_start = view_start;
            }
            *buffer_end = (*buffer_start + max_len).min(self.num_rows);
        }
    }
2726
    /// Collects rows `[buffer_start, buffer_end)` from the lazy frame into the
    /// in-memory buffer and refreshes the locked/scrollable display frames.
    ///
    /// On collection failure the previous buffer is left untouched and the
    /// error is stored in `self.error`. The effective buffer end may be smaller
    /// than requested when the `max_buffered_mb` byte cap trims the result.
    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
        let buffer_size = buffer_end.saturating_sub(buffer_start);
        if buffer_size == 0 {
            return;
        }

        // Project columns in display order so slicing matches the UI layout.
        let all_columns: Vec<_> = self
            .column_order
            .iter()
            .map(|name| col(name.as_str()))
            .collect();

        let use_streaming = self.polars_streaming;
        let mut full_df = match collect_lazy(
            self.lf
                .clone()
                .select(all_columns)
                .slice(buffer_start as i64, buffer_size as u32),
            use_streaming,
        ) {
            Ok(df) => df,
            Err(e) => {
                self.error = Some(e);
                return;
            }
        };

        // Enforce the byte cap by trimming rows based on the average row size.
        let mut effective_buffer_end = buffer_end;
        if self.max_buffered_mb > 0 {
            let size = full_df.estimated_size();
            let max_bytes = self.max_buffered_mb * 1024 * 1024;
            if size > max_bytes {
                let rows = full_df.height();
                if rows > 0 {
                    let bytes_per_row = size / rows;
                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
                    if max_rows < rows {
                        full_df = full_df.slice(0, max_rows);
                        effective_buffer_end = buffer_start + max_rows;
                    }
                }
            }
        }

        // Split off the frame for the locked (pinned) leading columns.
        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            let locked_df = match full_df.select(locked_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            // Grouped views render their list columns as preview strings.
            self.locked_df = if self.is_grouped() {
                match self.format_grouped_dataframe(locked_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(locked_df)
            };
        } else {
            self.locked_df = None;
        }

        // The scrollable area starts after the locked columns plus the current
        // horizontal scroll offset.
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else {
            let scroll_df = match full_df.select(scroll_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.df = if self.is_grouped() {
                match self.format_grouped_dataframe(scroll_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(scroll_df)
            };
        }
        // Successful load: clear any stale error from an earlier attempt.
        // (The is_some() guard is redundant but harmless.)
        if self.error.is_some() {
            self.error = None;
        }
        self.buffered_start_row = buffer_start;
        self.buffered_end_row = effective_buffer_end;
        self.buffered_df = Some(full_df);
    }
2839
    /// Rebuilds the locked/scrollable display frames from the already-buffered
    /// frame, without touching the lazy source.
    ///
    /// Selection errors are deliberately swallowed (`if let Ok`): a stale
    /// column name simply leaves the previous display frame in place until the
    /// next full `load_buffer`.
    fn slice_buffer_into_display(&mut self) {
        let full_df = match self.buffered_df.as_ref() {
            Some(df) => df,
            None => return,
        };

        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            if let Ok(locked_df) = full_df.select(locked_names) {
                // Grouped views render their list columns as preview strings.
                self.locked_df = if self.is_grouped() {
                    self.format_grouped_dataframe(locked_df).ok()
                } else {
                    Some(locked_df)
                };
            }
        } else {
            self.locked_df = None;
        }

        // The scrollable area starts after the locked columns plus the current
        // horizontal scroll offset.
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else if let Ok(scroll_df) = full_df.select(scroll_names) {
            self.df = if self.is_grouped() {
                self.format_grouped_dataframe(scroll_df).ok()
            } else {
                Some(scroll_df)
            };
        }
    }
2881
    /// Currently a no-op: display frames are refreshed by `load_buffer` and
    /// `slice_buffer_into_display` instead. Kept as the post-load hook called
    /// at the end of `collect`. NOTE(review): looks vestigial — confirm whether
    /// viewport slicing was meant to happen here.
    fn slice_from_buffer(&mut self) {
    }
2888
2889 fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
2890 let schema = df.schema();
2891 let mut new_series = Vec::new();
2892
2893 for (col_name, dtype) in schema.iter() {
2894 let col = df.column(col_name)?;
2895 if matches!(dtype, DataType::List(_)) {
2896 let string_series: Series = col
2897 .list()?
2898 .into_iter()
2899 .map(|opt_list| {
2900 opt_list.map(|list_series| {
2901 let values: Vec<String> = list_series
2902 .iter()
2903 .take(10)
2904 .map(|v| v.str_value().to_string())
2905 .collect();
2906 if list_series.len() > 10 {
2907 format!("[{}...] ({} items)", values.join(", "), list_series.len())
2908 } else {
2909 format!("[{}]", values.join(", "))
2910 }
2911 })
2912 })
2913 .collect();
2914 new_series.push(string_series.with_name(col_name.as_str().into()).into());
2915 } else {
2916 new_series.push(col.clone());
2917 }
2918 }
2919
2920 Ok(DataFrame::new(new_series)?)
2921 }
2922
2923 pub fn select_next(&mut self) {
2924 self.table_state.select_next();
2925 if let Some(selected) = self.table_state.selected() {
2926 if selected >= self.visible_rows && self.visible_rows > 0 {
2927 self.slide_table(1);
2928 }
2929 }
2930 }
2931
    /// Scrolls down by one full viewport.
    pub fn page_down(&mut self) {
        self.slide_table(self.visible_rows as i64);
    }
2935
2936 pub fn select_previous(&mut self) {
2937 if let Some(selected) = self.table_state.selected() {
2938 self.table_state.select_previous();
2939 if selected == 0 && self.start_row > 0 {
2940 self.slide_table(-1);
2941 }
2942 } else {
2943 self.table_state.select(Some(0));
2944 }
2945 }
2946
2947 pub fn scroll_to(&mut self, index: usize) {
2948 if self.start_row == index {
2949 return;
2950 }
2951
2952 if index == 0 {
2953 self.start_row = 0;
2954 self.collect();
2955 self.start_row = 0;
2956 } else {
2957 self.start_row = index;
2958 self.collect();
2959 }
2960 }
2961
2962 pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2965 self.ensure_num_rows();
2966 if self.num_rows == 0 || self.visible_rows == 0 {
2967 return;
2968 }
2969 let center_offset = self.visible_rows / 2;
2970 let mut start_row = row_index.saturating_sub(center_offset);
2971 let max_start = self.num_rows.saturating_sub(self.visible_rows);
2972 start_row = start_row.min(max_start);
2973
2974 if self.start_row == start_row {
2975 let display_idx = row_index
2976 .saturating_sub(start_row)
2977 .min(self.visible_rows.saturating_sub(1));
2978 self.table_state.select(Some(display_idx));
2979 return;
2980 }
2981
2982 self.start_row = start_row;
2983 self.collect();
2984 let display_idx = row_index
2985 .saturating_sub(start_row)
2986 .min(self.visible_rows.saturating_sub(1));
2987 self.table_state.select(Some(display_idx));
2988 }
2989
2990 fn ensure_num_rows(&mut self) {
2992 if self.num_rows_valid {
2993 return;
2994 }
2995 if self.visible_rows > 0 {
2996 self.proximity_threshold = self.visible_rows;
2997 }
2998 self.num_rows = match self.lf.clone().select([len()]).collect() {
2999 Ok(df) => {
3000 if let Some(col) = df.get(0) {
3001 if let Some(AnyValue::UInt32(len)) = col.first() {
3002 *len as usize
3003 } else {
3004 0
3005 }
3006 } else {
3007 0
3008 }
3009 }
3010 Err(_) => 0,
3011 };
3012 self.num_rows_valid = true;
3013 }
3014
3015 pub fn scroll_to_end(&mut self) {
3017 self.ensure_num_rows();
3018 if self.num_rows == 0 {
3019 self.start_row = 0;
3020 self.buffered_start_row = 0;
3021 self.buffered_end_row = 0;
3022 return;
3023 }
3024 let end_start = self.num_rows.saturating_sub(self.visible_rows);
3025 if self.start_row == end_start {
3026 self.select_last_visible_row();
3027 return;
3028 }
3029 self.start_row = end_start;
3030 self.collect();
3031 self.select_last_visible_row();
3032 }
3033
3034 fn select_last_visible_row(&mut self) {
3036 if self.num_rows == 0 {
3037 return;
3038 }
3039 let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
3040 let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
3041 self.table_state.select(Some(sel));
3042 }
3043
    /// Scrolls down by half a viewport (at least one row).
    pub fn half_page_down(&mut self) {
        let half = (self.visible_rows / 2).max(1) as i64;
        self.slide_table(half);
    }

    /// Scrolls up by half a viewport; no-op when already at the top.
    pub fn half_page_up(&mut self) {
        if self.start_row == 0 {
            return;
        }
        let half = (self.visible_rows / 2).max(1) as i64;
        self.slide_table(-half);
    }

    /// Scrolls up by one full viewport; no-op when already at the top.
    pub fn page_up(&mut self) {
        if self.start_row == 0 {
            return;
        }
        self.slide_table(-(self.visible_rows as i64));
    }
3063
3064 pub fn scroll_right(&mut self) {
3065 let max_scroll = self
3066 .column_order
3067 .len()
3068 .saturating_sub(self.locked_columns_count);
3069 if self.termcol_index < max_scroll.saturating_sub(1) {
3070 self.termcol_index += 1;
3071 self.collect();
3072 }
3073 }
3074
    /// Scrolls the unlocked column area one column back to the left.
    pub fn scroll_left(&mut self) {
        if self.termcol_index > 0 {
            self.termcol_index -= 1;
            self.collect();
        }
    }
3081
    /// Returns a copy of every column name in display order.
    pub fn headers(&self) -> Vec<String> {
        self.column_order.clone()
    }
3085
3086 pub fn set_column_order(&mut self, order: Vec<String>) {
3087 self.column_order = order;
3088 self.buffered_start_row = 0;
3089 self.buffered_end_row = 0;
3090 self.buffered_df = None;
3091 self.collect();
3092 }
3093
3094 pub fn set_locked_columns(&mut self, count: usize) {
3095 self.locked_columns_count = count.min(self.column_order.len());
3096 self.buffered_start_row = 0;
3097 self.buffered_end_row = 0;
3098 self.buffered_df = None;
3099 self.collect();
3100 }
3101
    /// Number of leading columns pinned to the left of the scrollable area.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }

    /// The filter statements currently applied to the data.
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }

    /// Columns the view is currently sorted by.
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }

    /// Whether the current sort is ascending.
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }

    /// Current display order of columns.
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }

    /// The active structured query string (empty when none).
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }

    /// The active SQL query string (empty when none).
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }

    /// The active fuzzy-search query string (empty when none).
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }

    /// The most recently applied pivot specification, if any.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }

    /// The most recently applied melt (unpivot) specification, if any.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }
3142
    /// True when the current schema contains any `List` column — the marker
    /// this state uses for "grouped" result frames.
    pub fn is_grouped(&self) -> bool {
        self.schema
            .iter()
            .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
    }

    /// Names of the non-list columns, i.e. the grouping keys of a grouped view.
    pub fn group_key_columns(&self) -> Vec<String> {
        self.schema
            .iter()
            .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
            .map(|(name, _)| name.to_string())
            .collect()
    }

    /// Names of the list columns, i.e. the aggregated values of a grouped view.
    pub fn group_value_columns(&self) -> Vec<String> {
        self.schema
            .iter()
            .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
            .map(|(name, _)| name.to_string())
            .collect()
    }
3164
3165 pub fn buffered_memory_bytes(&self) -> Option<usize> {
3167 let locked = self
3168 .locked_df
3169 .as_ref()
3170 .map(|df| df.estimated_size())
3171 .unwrap_or(0);
3172 let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
3173 if locked == 0 && scroll == 0 {
3174 None
3175 } else {
3176 Some(locked + scroll)
3177 }
3178 }
3179
    /// Number of rows currently held in the row buffer.
    pub fn buffered_rows(&self) -> usize {
        self.buffered_end_row
            .saturating_sub(self.buffered_start_row)
    }

    /// The scrollable display frame, if one is buffered.
    pub fn display_df(&self) -> Option<&DataFrame> {
        self.df.as_ref()
    }
3190
3191 pub fn display_slice_df(&self) -> Option<DataFrame> {
3193 let df = self.df.as_ref()?;
3194 let offset = self.start_row.saturating_sub(self.buffered_start_row);
3195 let slice_len = self.visible_rows.min(df.height().saturating_sub(offset));
3196 if offset < df.height() && slice_len > 0 {
3197 Some(df.slice(offset as i64, slice_len))
3198 } else {
3199 None
3200 }
3201 }
3202
    /// Configured row cap for the buffer (0 = unlimited).
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }

    /// Configured memory cap for the buffer in MiB (0 = unlimited).
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
3212
    /// Replaces the grouped view with the rows of one group, expanding its
    /// list columns into ordinary columns and repeating the group-key values.
    ///
    /// `group_index` is the row index of the group inside the *grouped* frame.
    /// The previous lazy frame is stashed in `self.grouped_lf` so `drill_up`
    /// can restore it. No-op when the current view is not grouped.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        if !self.is_grouped() {
            return Ok(());
        }

        // Remember the grouped frame so drill_up can return to it.
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // Record the group key as strings for breadcrumb/context display.
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // The group's row count comes from the first list column's length.
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Repeat each key value down the whole group so the expanded frame is
        // rectangular; typed variants preserve the key dtype where possible.
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    // Fallback: render exotic dtypes through their string form.
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // Unpack each list column into a plain column of the group's rows.
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Swap in the expanded frame and reset all view state.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
3320
3321 pub fn drill_up(&mut self) -> Result<()> {
3322 if let Some(grouped_lf) = self.grouped_lf.take() {
3323 self.invalidate_num_rows();
3324 self.lf = grouped_lf;
3325 self.schema = self.lf.clone().collect_schema()?;
3326 self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3327 self.drilled_down_group_index = None;
3328 self.drilled_down_group_key = None;
3329 self.drilled_down_group_key_columns = None;
3330 self.start_row = 0;
3331 self.termcol_index = 0;
3332 self.locked_columns_count = 0;
3333 self.table_state.select(Some(0));
3334 self.collect();
3335 Ok(())
3336 } else {
3337 Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
3338 }
3339 }
3340
3341 pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
3342 Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
3343 }
3344
    /// Builds a snapshot of the current view state (active query, filter
    /// count, drill-down key/columns) for the statistics screen.
    pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
        crate::statistics::AnalysisContext {
            has_query: !self.active_query.is_empty(),
            query: self.active_query.clone(),
            has_filters: !self.filters.is_empty(),
            filter_count: self.filters.len(),
            is_drilled_down: self.is_drilled_down(),
            group_key: self.drilled_down_group_key.clone(),
            group_columns: self.drilled_down_group_key_columns.clone(),
        }
    }
3356
3357 fn cast_temporal_index_columns_for_pivot(
3360 df: &DataFrame,
3361 index: &[String],
3362 ) -> Result<(DataFrame, Vec<(String, DataType)>)> {
3363 let mut out = df.clone();
3364 let mut restore = Vec::new();
3365 for name in index {
3366 if let Ok(s) = out.column(name) {
3367 let dtype = s.dtype();
3368 if matches!(dtype, DataType::Date | DataType::Datetime(_, _)) {
3369 restore.push((name.clone(), dtype.clone()));
3370 let casted = s.cast(&DataType::Int32)?;
3371 out.with_column(casted)?;
3372 }
3373 }
3374 }
3375 Ok((out, restore))
3376 }
3377
3378 fn restore_temporal_index_columns_after_pivot(
3380 pivoted: &mut DataFrame,
3381 restore: &[(String, DataType)],
3382 ) -> Result<()> {
3383 for (name, dtype) in restore {
3384 if let Ok(s) = pivoted.column(name) {
3385 let restored = s.cast(dtype)?;
3386 pivoted.with_column(restored)?;
3387 }
3388 }
3389 Ok(())
3390 }
3391
    /// Pivots the current view eagerly according to `spec`.
    ///
    /// Collects the LazyFrame, optionally applies the temporal-index
    /// workaround (cast Date/Datetime index columns to integers before the
    /// pivot and back afterwards), runs `pivot_stable`, then installs the
    /// result as the new base view via `replace_lf_after_reshape`.
    pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
        let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
        let agg_expr = pivot_agg_expr(spec.aggregation)?;
        let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
        // An empty index list means "no explicit index" for pivot_stable.
        let index_opt = if index_str.is_empty() {
            None
        } else {
            Some(index_str)
        };

        let (df_for_pivot, temporal_index_restore) = if self.workaround_pivot_date_index {
            let (df_w, restore) =
                Self::cast_temporal_index_columns_for_pivot(&df, spec.index.as_slice())?;
            (df_w, Some(restore))
        } else {
            (df.clone(), None)
        };
        // Sorting of the newly created pivot columns defaults to on.
        let sort_new_columns = spec.sort_columns.unwrap_or(true);
        let mut pivoted = pivot_stable(
            &df_for_pivot,
            [spec.pivot_column.as_str()],
            index_opt,
            Some([spec.value_column.as_str()]),
            sort_new_columns,
            Some(agg_expr),
            None,
        )?;
        if let Some(restore) = &temporal_index_restore {
            Self::restore_temporal_index_columns_after_pivot(&mut pivoted, restore)?;
        }

        // Remember which reshape is active (pivot and melt are exclusive).
        self.last_pivot_spec = Some(spec.clone());
        self.last_melt_spec = None;
        self.replace_lf_after_reshape(pivoted.lazy())?;
        Ok(())
    }
3432
3433 pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
3435 let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
3436 let index = cols(spec.index.iter().map(|s| s.as_str()));
3437 let args = UnpivotArgsDSL {
3438 on,
3439 index,
3440 variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
3441 value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
3442 };
3443 let lf = self.lf.clone().unpivot(args);
3444 self.last_melt_spec = Some(spec.clone());
3445 self.last_pivot_spec = None;
3446 self.replace_lf_after_reshape(lf)?;
3447 Ok(())
3448 }
3449
    /// Installs `lf` as the new base view after a pivot/melt and resets
    /// every piece of derived state (filters, sorts, queries, caches,
    /// drill-down bookkeeping, scroll position) so the reshaped frame is
    /// shown from a clean slate.
    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
        self.invalidate_num_rows();
        self.lf = lf;
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Transformations defined against the previous shape no longer apply.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.error = None;
        // Drop cached frames and drill-down state.
        self.df = None;
        self.locked_df = None;
        self.grouped_lf = None;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        // Reset the viewport and the row buffer.
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.collect();
        Ok(())
    }
3477
3478 pub fn is_drilled_down(&self) -> bool {
3479 self.drilled_down_group_index.is_some()
3480 }
3481
    /// Rebuilds the view's LazyFrame from `self.lf` by applying the active
    /// filter statements (combined with their per-filter AND/OR operators)
    /// and the active sort, then re-collects.
    fn apply_transformations(&mut self) {
        let mut lf = self.lf.clone();
        let mut final_expr: Option<Expr> = None;

        for filter in &self.filters {
            let col_expr = col(&filter.column);
            // Parse the filter value into the column's native type where
            // possible so numeric/boolean comparisons are typed; fall back
            // to a string literal when parsing fails or the column is not
            // in the schema.
            let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
                match dtype {
                    DataType::Float32 | DataType::Float64 => filter
                        .value
                        .parse::<f64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
                        .value
                        .parse::<i64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                        filter
                            .value
                            .parse::<u64>()
                            .map(lit)
                            .unwrap_or_else(|_| lit(filter.value.as_str()))
                    }
                    DataType::Boolean => filter
                        .value
                        .parse::<bool>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    _ => lit(filter.value.as_str()),
                }
            } else {
                lit(filter.value.as_str())
            };

            // Translate the filter's operator into a polars expression.
            // The Contains variants use the raw string value (substring
            // match), not the typed literal.
            let op_expr = match filter.operator {
                FilterOperator::Eq => col_expr.eq(val_lit),
                FilterOperator::NotEq => col_expr.neq(val_lit),
                FilterOperator::Gt => col_expr.gt(val_lit),
                FilterOperator::Lt => col_expr.lt(val_lit),
                FilterOperator::GtEq => col_expr.gt_eq(val_lit),
                FilterOperator::LtEq => col_expr.lt_eq(val_lit),
                FilterOperator::Contains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val))
                }
                FilterOperator::NotContains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val)).not()
                }
            };

            // Chain onto the accumulated predicate using this filter's
            // logical operator (the first filter just seeds it).
            if let Some(current) = final_expr {
                final_expr = Some(match filter.logical_op {
                    LogicalOperator::And => current.and(op_expr),
                    LogicalOperator::Or => current.or(op_expr),
                });
            } else {
                final_expr = Some(op_expr);
            }
        }

        if let Some(e) = final_expr {
            lf = lf.filter(e);
        }

        // Explicit sort columns win; otherwise "descending" is modelled as
        // a plain reversal of the current order.
        if !self.sort_columns.is_empty() {
            let options = SortMultipleOptions {
                descending: self
                    .sort_columns
                    .iter()
                    .map(|_| !self.sort_ascending)
                    .collect(),
                ..Default::default()
            };
            lf = lf.sort_by_exprs(
                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
                options,
            );
        } else if !self.sort_ascending {
            lf = lf.reverse();
        }

        self.invalidate_num_rows();
        self.lf = lf;
        self.collect();
    }
3570
    /// Sets the sort columns and direction, drops the stale row buffer and
    /// re-applies all filters/sorts to the view.
    pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
        self.sort_columns = columns;
        self.sort_ascending = ascending;
        // The buffered window is invalid under the new ordering.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.apply_transformations();
    }
3579
3580 pub fn reverse(&mut self) {
3581 self.sort_ascending = !self.sort_ascending;
3582
3583 self.buffered_start_row = 0;
3584 self.buffered_end_row = 0;
3585 self.buffered_df = None;
3586
3587 if !self.sort_columns.is_empty() {
3588 let options = SortMultipleOptions {
3589 descending: self
3590 .sort_columns
3591 .iter()
3592 .map(|_| !self.sort_ascending)
3593 .collect(),
3594 ..Default::default()
3595 };
3596 self.invalidate_num_rows();
3597 self.lf = self.lf.clone().sort_by_exprs(
3598 self.sort_columns.iter().map(col).collect::<Vec<_>>(),
3599 options,
3600 );
3601 self.collect();
3602 } else {
3603 self.invalidate_num_rows();
3604 self.lf = self.lf.clone().reverse();
3605 self.collect();
3606 }
3607 }
3608
    /// Replaces the active filter set, drops the stale row buffer and
    /// re-applies all filters/sorts to the view.
    pub fn filter(&mut self, filters: Vec<FilterStatement>) {
        self.filters = filters;
        // The buffered window no longer matches the filtered rows.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.apply_transformations();
    }
3616
    /// Runs a datui query string against the original frame.
    ///
    /// The query is parsed into selected columns, an optional filter and
    /// optional group-by columns. Group-by with explicit columns aggregates
    /// those; group-by without columns aggregates every non-key column as a
    /// list. On success the result replaces the view, the group-key columns
    /// are locked, and all other transformation state is reset. An empty
    /// query restores the original frame; parse/schema errors are stored
    /// for display instead of being returned.
    pub fn query(&mut self, query: String) {
        self.error = None;

        let trimmed_query = query.trim();
        if trimmed_query.is_empty() {
            self.reset_lf_to_original();
            self.collect();
            return;
        }

        match parse_query(&query) {
            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
                let mut lf = self.original_lf.clone();
                // Holds the schema if it was already computed on the
                // group-by path, to avoid collecting it twice.
                let mut schema_opt: Option<Arc<Schema>> = None;

                if let Some(f) = filter {
                    lf = lf.filter(f);
                }

                if !group_by_cols.is_empty() {
                    if !cols.is_empty() {
                        // Explicit aggregation expressions were given.
                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
                    } else {
                        // No aggregations: collect every non-key column
                        // into a list per group.
                        let schema = match lf.clone().collect_schema() {
                            Ok(s) => s,
                            Err(e) => {
                                self.error = Some(e);
                                return; }
                        };
                        let all_columns: Vec<String> =
                            schema.iter_names().map(|s| s.to_string()).collect();

                        let mut agg_exprs = Vec::new();
                        for col_name in &all_columns {
                            if !group_by_col_names.contains(col_name) {
                                agg_exprs.push(col(col_name));
                            }
                        }

                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
                    }
                    let schema = match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    };
                    schema_opt = Some(schema.clone());
                    // Group-by output order is non-deterministic, so sort
                    // by the key columns (they come first in the schema).
                    let sort_exprs: Vec<Expr> = schema
                        .iter_names()
                        .take(group_by_cols.len())
                        .map(|n| col(n.as_str()))
                        .collect();
                    lf = lf.sort_by_exprs(sort_exprs, Default::default());
                } else if !cols.is_empty() {
                    lf = lf.select(cols);
                }

                let schema = match schema_opt {
                    Some(s) => s,
                    None => match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    },
                };

                self.schema = schema;
                self.invalidate_num_rows();
                self.lf = lf;
                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();

                // Lock the leading run of group-key columns so they stay
                // pinned while scrolling horizontally.
                if !group_by_col_names.is_empty() {
                    let mut locked_count = 0;
                    for col_name in &self.column_order {
                        if group_by_col_names.contains(col_name) {
                            locked_count += 1;
                        } else {
                            break;
                        }
                    }
                    self.locked_columns_count = locked_count;
                } else {
                    self.locked_columns_count = 0;
                }

                // The query supersedes all other transformation state.
                self.filters.clear();
                self.sort_columns.clear();
                self.sort_ascending = true;
                self.start_row = 0;
                self.termcol_index = 0;
                self.active_query = query;
                self.buffered_start_row = 0;
                self.buffered_end_row = 0;
                self.buffered_df = None;
                self.drilled_down_group_index = None;
                self.drilled_down_group_key = None;
                self.drilled_down_group_key_columns = None;
                self.grouped_lf = None;
                self.table_state.select(Some(0));
                self.collect();
                if self.num_rows > 0 {
                    self.start_row = 0;
                }
            }
            Err(e) => {
                self.error = Some(PolarsError::ComputeError(e.into()));
            }
        }
    }
3750
3751 pub fn sql_query(&mut self, sql: String) {
3754 self.error = None;
3755 let trimmed = sql.trim();
3756 if trimmed.is_empty() {
3757 self.reset_lf_to_original();
3758 return;
3759 }
3760
3761 #[cfg(feature = "sql")]
3762 {
3763 use polars_sql::SQLContext;
3764 let mut ctx = SQLContext::new();
3765 ctx.register("df", self.lf.clone());
3766 match ctx.execute(trimmed) {
3767 Ok(result_lf) => {
3768 let schema = match result_lf.clone().collect_schema() {
3769 Ok(s) => s,
3770 Err(e) => {
3771 self.error = Some(e);
3772 return;
3773 }
3774 };
3775 self.schema = schema;
3776 self.invalidate_num_rows();
3777 self.lf = result_lf;
3778 self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3779 self.active_sql_query = sql;
3780 self.locked_columns_count = 0;
3781 self.filters.clear();
3782 self.sort_columns.clear();
3783 self.sort_ascending = true;
3784 self.start_row = 0;
3785 self.termcol_index = 0;
3786 self.drilled_down_group_index = None;
3787 self.drilled_down_group_key = None;
3788 self.drilled_down_group_key_columns = None;
3789 self.grouped_lf = None;
3790 self.buffered_start_row = 0;
3791 self.buffered_end_row = 0;
3792 self.buffered_df = None;
3793 self.table_state.select(Some(0));
3794 }
3795 Err(e) => {
3796 self.error = Some(e);
3797 }
3798 }
3799 }
3800
3801 #[cfg(not(feature = "sql"))]
3802 {
3803 self.error = Some(PolarsError::ComputeError(
3804 format!("SQL support not compiled in (build with --features sql)").into(),
3805 ));
3806 }
3807 }
3808
3809 pub fn fuzzy_search(&mut self, query: String) {
3813 self.error = None;
3814 let trimmed = query.trim();
3815 if trimmed.is_empty() {
3816 self.reset_lf_to_original();
3817 self.collect();
3818 return;
3819 }
3820 let string_cols: Vec<String> = self
3821 .schema
3822 .iter()
3823 .filter(|(_, dtype)| dtype.is_string())
3824 .map(|(name, _)| name.to_string())
3825 .collect();
3826 if string_cols.is_empty() {
3827 self.error = Some(PolarsError::ComputeError(
3828 "Fuzzy search requires at least one string column".into(),
3829 ));
3830 return;
3831 }
3832 let tokens: Vec<&str> = trimmed
3833 .split_whitespace()
3834 .filter(|s| !s.is_empty())
3835 .collect();
3836 let token_exprs: Vec<Expr> = tokens
3837 .iter()
3838 .map(|token| {
3839 let pattern = fuzzy_token_regex(token);
3840 string_cols
3841 .iter()
3842 .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
3843 .reduce(|a, b| a.or(b))
3844 .unwrap()
3845 })
3846 .collect();
3847 let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
3848 self.lf = self.original_lf.clone().filter(combined);
3849 self.filters.clear();
3850 self.sort_columns.clear();
3851 self.active_query.clear();
3852 self.active_sql_query.clear();
3853 self.active_fuzzy_query = query;
3854 self.locked_columns_count = 0;
3856 self.start_row = 0;
3857 self.termcol_index = 0;
3858 self.drilled_down_group_index = None;
3859 self.drilled_down_group_key = None;
3860 self.drilled_down_group_key_columns = None;
3861 self.grouped_lf = None;
3862 self.buffered_start_row = 0;
3863 self.buffered_end_row = 0;
3864 self.buffered_df = None;
3865 self.table_state.select(Some(0));
3866 self.invalidate_num_rows();
3867 self.collect();
3868 }
3869}
3870
3871pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3873 let inner: String =
3874 token
3875 .chars()
3876 .map(|c| regex::escape(&c.to_string()))
3877 .fold(String::new(), |mut s, e| {
3878 if !s.is_empty() {
3879 s.push_str(".*");
3880 }
3881 s.push_str(&e);
3882 s
3883 });
3884 format!("(?i).*{}.*", inner)
3885}
3886
/// Ratatui widget that renders a polars DataFrame as a scrollable table.
///
/// Holds only presentation settings; all data, scroll and selection state
/// lives in `DataTableState`.
pub struct DataTable {
    /// Header row background color.
    pub header_bg: Color,
    /// Header row foreground color.
    pub header_fg: Color,
    /// Foreground color of the row-number gutter.
    pub row_numbers_fg: Color,
    /// Color of the vertical separator drawn after locked columns.
    pub separator_fg: Color,
    /// Spacing (in terminal cells) between columns.
    pub table_cell_padding: u16,
    /// Background for every other row; `None` disables striping.
    pub alternate_row_bg: Option<Color>,
    /// When true, cell text is colored by column dtype via the fields below.
    pub column_colors: bool,
    /// Color for string columns (only used when `column_colors` is set).
    pub str_col: Option<Color>,
    /// Color for integer columns.
    pub int_col: Option<Color>,
    /// Color for float columns.
    pub float_col: Option<Color>,
    /// Color for boolean columns.
    pub bool_col: Option<Color>,
    /// Color for date/datetime/time/duration columns.
    pub temporal_col: Option<Color>,
}
3902
impl Default for DataTable {
    /// Default look: dark header (256-color index 236) with white text,
    /// dark-gray row numbers, white separator, one cell of padding, no row
    /// striping and no per-dtype coloring.
    fn default() -> Self {
        Self {
            header_bg: Color::Indexed(236),
            header_fg: Color::White,
            row_numbers_fg: Color::DarkGray,
            separator_fg: Color::White,
            table_cell_padding: 1,
            alternate_row_bg: None,
            column_colors: false,
            str_col: None,
            int_col: None,
            float_col: None,
            bool_col: None,
            temporal_col: None,
        }
    }
}
3921
/// Inputs for rendering the row-number gutter.
struct RowNumbersParams {
    /// First data row currently scrolled into view.
    start_row: usize,
    /// Number of data rows the viewport can show.
    visible_rows: usize,
    /// Total number of rows in the current view.
    num_rows: usize,
    /// Offset added to the displayed numbers (e.g. 1 for 1-based display).
    row_start_index: usize,
    /// Index of the currently selected visible row, if any.
    selected_row: Option<usize>,
}
3930
impl DataTable {
    /// Creates a `DataTable` with the default color scheme and padding.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter for the four base colors.
    pub fn with_colors(
        mut self,
        header_bg: Color,
        header_fg: Color,
        row_numbers_fg: Color,
        separator_fg: Color,
    ) -> Self {
        self.header_bg = header_bg;
        self.header_fg = header_fg;
        self.row_numbers_fg = row_numbers_fg;
        self.separator_fg = separator_fg;
        self
    }

    /// Builder-style setter for the inter-column padding.
    pub fn with_cell_padding(mut self, padding: u16) -> Self {
        self.table_cell_padding = padding;
        self
    }

    /// Builder-style setter for the alternating row background (striping).
    pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
        self.alternate_row_bg = color;
        self
    }

    /// Enables per-dtype cell coloring and sets the color for each type class.
    pub fn with_column_type_colors(
        mut self,
        str_col: Color,
        int_col: Color,
        float_col: Color,
        bool_col: Color,
        temporal_col: Color,
    ) -> Self {
        self.column_colors = true;
        self.str_col = Some(str_col);
        self.int_col = Some(int_col);
        self.float_col = Some(float_col);
        self.bool_col = Some(bool_col);
        self.temporal_col = Some(temporal_col);
        self
    }

    /// Maps a column dtype to its configured color, or `None` when
    /// per-dtype coloring is disabled or the dtype has no mapping.
    fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
        if !self.column_colors {
            return None;
        }
        match dtype {
            DataType::String => self.str_col,
            DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64 => self.int_col,
            DataType::Float32 | DataType::Float64 => self.float_col,
            DataType::Boolean => self.bool_col,
            DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
                self.temporal_col
            }
            _ => None,
        }
    }

    /// Renders `df` as a ratatui `Table` into `area`.
    ///
    /// Column widths are sized to the widest of header and visible cell
    /// values. Columns are added left to right until the area is full; a
    /// string column that only partially fits is truncated to the remaining
    /// width, any other overflowing column is dropped. Null cells render as
    /// empty strings; even rows may be striped with `alternate_row_bg`.
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        let (height, cols) = df.shape();

        // Start each column's width at its header width.
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // One line is reserved for the header row.
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            // Build the cells for this column while tracking its widest value.
            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // A string column that doesn't fully fit is truncated to
                // fill the remaining width, then we stop adding columns.
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                // Non-string overflow: drop this and all later columns.
                break;
            }
        }

        // Keep only the cells/widths for the columns that fit.
        widths.truncate(visible_columns);
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                // Stripe odd rows when an alternate background is set.
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // Color::Reset as background means "terminal default": skip bg.
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }

    /// Renders the row-number gutter: a header-colored filler line followed
    /// by one right-aligned row number per visible row, with striping and
    /// selection highlighting matching the data area.
    fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
        let header_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        // Paint the gutter's header line so it lines up with the table header.
        let header_fill = " ".repeat(area.width as usize);
        Paragraph::new(header_fill).style(header_style).render(
            Rect {
                x: area.x,
                y: area.y,
                width: area.width,
                height: 1,
            },
            buf,
        );

        let rows_to_render = params
            .visible_rows
            .min(params.num_rows.saturating_sub(params.start_row));

        if rows_to_render == 0 {
            return;
        }

        // Right-align all numbers to the widest one that will be shown.
        let max_row_num =
            params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
        let max_width = max_row_num.to_string().len();

        for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
            let row_num = params.start_row + row_idx + params.row_start_index;
            let row_num_text = row_num.to_string();

            let padding = max_width.saturating_sub(row_num_text.len());
            let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);

            // Selected row uses the terminal default fg; striping applies
            // to odd rows in either case.
            let is_selected = params.selected_row == Some(row_idx);
            let (fg, bg) = if is_selected {
                (
                    Color::Reset,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            } else {
                (
                    self.row_numbers_fg,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            };
            let row_num_style = match bg {
                Some(bg_color) => Style::default().fg(fg).bg(bg_color),
                None => Style::default().fg(fg),
            };

            let y = area.y + row_idx as u16 + 1; if y < area.y + area.height { // +1 skips the header line
                Paragraph::new(padded_text).style(row_num_style).render(
                    Rect {
                        x: area.x,
                        y,
                        width: area.width,
                        height: 1,
                    },
                    buf,
                );
            }
        }
    }
}
4191
4192impl StatefulWidget for DataTable {
4193 type State = DataTableState;
4194
4195 fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
4196 state.visible_termcols = area.width as usize;
4197 let new_visible_rows = if area.height > 0 {
4198 (area.height - 1) as usize
4199 } else {
4200 0
4201 };
4202 let needs_collect = new_visible_rows != state.visible_rows;
4203 state.visible_rows = new_visible_rows;
4204
4205 if let Some(selected) = state.table_state.selected() {
4206 if selected >= state.visible_rows {
4207 state.table_state.select(Some(state.visible_rows - 1))
4208 }
4209 }
4210
4211 if needs_collect {
4212 state.collect();
4213 }
4214
4215 if let Some(error) = state.error.as_ref() {
4218 if !state.suppress_error_display {
4219 Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
4220 .centered()
4221 .block(
4222 Block::default()
4223 .borders(Borders::NONE)
4224 .padding(Padding::top(area.height / 2)),
4225 )
4226 .wrap(ratatui::widgets::Wrap { trim: true })
4227 .render(area, buf);
4228 return;
4229 }
4230 }
4232
4233 let row_num_width = if state.row_numbers {
4235 let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; max_row_num.to_string().len().max(1) as u16 + 1 } else {
4238 0
4239 };
4240
4241 let mut locked_width = row_num_width;
4243 if let Some(locked_df) = state.locked_df.as_ref() {
4244 let (_, cols) = locked_df.shape();
4245 for col_index in 0..cols {
4246 let col_name = locked_df.get_column_names()[col_index];
4247 let mut max_len = col_name.chars().count() as u16;
4248 let col_data = &locked_df[col_index];
4249 for row_index in 0..locked_df.height().min(state.visible_rows) {
4250 let value = col_data.get(row_index).unwrap();
4251 let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
4252 Cow::Borrowed("")
4253 } else {
4254 value.str_value()
4255 };
4256 let len = val_str.chars().count() as u16;
4257 max_len = max_len.max(len);
4258 }
4259 locked_width += max_len + 1;
4260 }
4261 }
4262
4263 if locked_width > row_num_width && locked_width < area.width {
4265 let locked_area = Rect {
4266 x: area.x,
4267 y: area.y,
4268 width: locked_width,
4269 height: area.height,
4270 };
4271 let separator_x = locked_area.x + locked_area.width;
4272
4273 if state.row_numbers {
4275 let row_num_area = Rect {
4276 x: area.x,
4277 y: area.y,
4278 width: row_num_width,
4279 height: area.height,
4280 };
4281 self.render_row_numbers(
4282 row_num_area,
4283 buf,
4284 RowNumbersParams {
4285 start_row: state.start_row,
4286 visible_rows: state.visible_rows,
4287 num_rows: state.num_rows,
4288 row_start_index: state.row_start_index,
4289 selected_row: state.table_state.selected(),
4290 },
4291 );
4292 }
4293 let scrollable_area = Rect {
4294 x: separator_x + 1,
4295 y: area.y,
4296 width: area.width.saturating_sub(locked_width + 1),
4297 height: area.height,
4298 };
4299
4300 if let Some(locked_df) = state.locked_df.as_ref() {
4302 let adjusted_locked_area = if state.row_numbers {
4304 Rect {
4305 x: area.x + row_num_width,
4306 y: area.y,
4307 width: locked_width - row_num_width,
4308 height: area.height,
4309 }
4310 } else {
4311 locked_area
4312 };
4313
4314 let offset = state.start_row.saturating_sub(state.buffered_start_row);
4316 let slice_len = state
4317 .visible_rows
4318 .min(locked_df.height().saturating_sub(offset));
4319 if offset < locked_df.height() && slice_len > 0 {
4320 let sliced_df = locked_df.slice(offset as i64, slice_len);
4321 self.render_dataframe(
4322 &sliced_df,
4323 adjusted_locked_area,
4324 buf,
4325 &mut state.table_state,
4326 false,
4327 state.start_row,
4328 );
4329 }
4330 }
4331
4332 let separator_x_adjusted = if state.row_numbers {
4334 area.x + row_num_width + (locked_width - row_num_width)
4335 } else {
4336 separator_x
4337 };
4338 for y in area.y..area.y + area.height {
4339 let cell = &mut buf[(separator_x_adjusted, y)];
4340 cell.set_char('│');
4341 cell.set_style(Style::default().fg(self.separator_fg));
4342 }
4343
4344 let adjusted_scrollable_area = if state.row_numbers {
4346 Rect {
4347 x: separator_x_adjusted + 1,
4348 y: area.y,
4349 width: area.width.saturating_sub(locked_width + 1),
4350 height: area.height,
4351 }
4352 } else {
4353 scrollable_area
4354 };
4355
4356 if let Some(df) = state.df.as_ref() {
4358 let offset = state.start_row.saturating_sub(state.buffered_start_row);
4360 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
4361 if offset < df.height() && slice_len > 0 {
4362 let sliced_df = df.slice(offset as i64, slice_len);
4363 self.render_dataframe(
4364 &sliced_df,
4365 adjusted_scrollable_area,
4366 buf,
4367 &mut state.table_state,
4368 false,
4369 state.start_row,
4370 );
4371 }
4372 }
4373 } else if let Some(df) = state.df.as_ref() {
4374 if state.row_numbers {
4377 let row_num_area = Rect {
4378 x: area.x,
4379 y: area.y,
4380 width: row_num_width,
4381 height: area.height,
4382 };
4383 self.render_row_numbers(
4384 row_num_area,
4385 buf,
4386 RowNumbersParams {
4387 start_row: state.start_row,
4388 visible_rows: state.visible_rows,
4389 num_rows: state.num_rows,
4390 row_start_index: state.row_start_index,
4391 selected_row: state.table_state.selected(),
4392 },
4393 );
4394
4395 let data_area = Rect {
4397 x: area.x + row_num_width,
4398 y: area.y,
4399 width: area.width.saturating_sub(row_num_width),
4400 height: area.height,
4401 };
4402
4403 let offset = state.start_row.saturating_sub(state.buffered_start_row);
4405 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
4406 if offset < df.height() && slice_len > 0 {
4407 let sliced_df = df.slice(offset as i64, slice_len);
4408 self.render_dataframe(
4409 &sliced_df,
4410 data_area,
4411 buf,
4412 &mut state.table_state,
4413 false,
4414 state.start_row,
4415 );
4416 }
4417 } else {
4418 let offset = state.start_row.saturating_sub(state.buffered_start_row);
4420 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
4421 if offset < df.height() && slice_len > 0 {
4422 let sliced_df = df.slice(offset as i64, slice_len);
4423 self.render_dataframe(
4424 &sliced_df,
4425 area,
4426 buf,
4427 &mut state.table_state,
4428 false,
4429 state.start_row,
4430 );
4431 }
4432 }
4433 } else if !state.column_order.is_empty() {
4434 let empty_columns: Vec<_> = state
4436 .column_order
4437 .iter()
4438 .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
4439 .collect();
4440 if let Ok(empty_df) = DataFrame::new(empty_columns) {
4441 if state.row_numbers {
4442 let row_num_area = Rect {
4443 x: area.x,
4444 y: area.y,
4445 width: row_num_width,
4446 height: area.height,
4447 };
4448 self.render_row_numbers(
4449 row_num_area,
4450 buf,
4451 RowNumbersParams {
4452 start_row: 0,
4453 visible_rows: state.visible_rows,
4454 num_rows: 0,
4455 row_start_index: state.row_start_index,
4456 selected_row: None,
4457 },
4458 );
4459 let data_area = Rect {
4460 x: area.x + row_num_width,
4461 y: area.y,
4462 width: area.width.saturating_sub(row_num_width),
4463 height: area.height,
4464 };
4465 self.render_dataframe(
4466 &empty_df,
4467 data_area,
4468 buf,
4469 &mut state.table_state,
4470 false,
4471 0,
4472 );
4473 } else {
4474 self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
4475 }
4476 } else {
4477 Paragraph::new("No data").render(area, buf);
4478 }
4479 } else {
4480 Paragraph::new("No data").render(area, buf);
4482 }
4483 }
4484}
4485
4486#[cfg(test)]
4487mod tests {
4488 use super::*;
4489 use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
4490 use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
4491
    /// Small 3-row, 2-column frame used by most unit tests.
    fn create_test_lf() -> LazyFrame {
        df! (
            "a" => &[1, 2, 3],
            "b" => &["x", "y", "z"]
        )
        .unwrap()
        .lazy()
    }
4500
    /// 100-row frame with numeric, string and two low-cardinality columns
    /// (mod 3 and mod 5), useful for group-by and scrolling tests.
    fn create_large_test_lf() -> LazyFrame {
        df! (
            "a" => (0..100).collect::<Vec<i32>>(),
            "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
            "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
            "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
        )
        .unwrap()
        .lazy()
    }
4511
4512 fn pump_open_until_done(
4515 app: &mut crate::App,
4516 path: std::path::PathBuf,
4517 opts: crate::OpenOptions,
4518 ) -> bool {
4519 use crate::AppEvent;
4520 let mut next = Some(AppEvent::Open(vec![path], opts));
4521 let mut saw_crash = false;
4522 while let Some(ev) = next.take() {
4523 if matches!(ev, AppEvent::Crash(_)) {
4524 saw_crash = true;
4525 break;
4526 }
4527 next = app.event(&ev);
4528 }
4529 saw_crash
4530 }
4531
    /// With infer_schema_length=100, the sample CSV's "N/A" at row 101 is
    /// seen only after schema inference, so opening must produce a Crash.
    #[test]
    fn test_infer_schema_length_csv_fails_with_short_inference() {
        use std::sync::mpsc;

        let path = crate::tests::sample_data_dir().join("infer_schema_length_data.csv");
        let opts = crate::OpenOptions {
            infer_schema_length: Some(100),
            ..Default::default()
        };

        let (tx, _rx) = mpsc::channel();
        let mut app = crate::App::new(tx);

        assert!(
            pump_open_until_done(&mut app, path, opts),
            "expected Crash when loading with infer_schema_length=100 (N/A at row 101)"
        );
    }
4553
    /// With infer_schema_length=101 the "N/A" at row 101 is included in
    /// inference, so loading succeeds with one column and 201 rows.
    #[test]
    fn test_infer_schema_length_csv_succeeds_with_longer_inference() {
        use std::sync::mpsc;

        let path = crate::tests::sample_data_dir().join("infer_schema_length_data.csv");
        let opts = crate::OpenOptions {
            infer_schema_length: Some(101),
            ..Default::default()
        };

        let (tx, _rx) = mpsc::channel();
        let mut app = crate::App::new(tx);

        assert!(
            !pump_open_until_done(&mut app, path, opts),
            "load with infer_schema_length=101 should not crash"
        );
        let state = app.data_table_state.as_ref().unwrap();
        assert_eq!(state.schema.len(), 1);
        assert!(state.schema.contains("column"));
        assert_eq!(state.num_rows, 201);
    }
4576
#[test]
fn test_infer_schema_length_csv_succeeds_with_default() {
    use std::sync::mpsc;

    // The default-sized inference window (1000 rows) covers the whole file.
    let csv = crate::tests::sample_data_dir().join("infer_schema_length_data.csv");
    let opts = crate::OpenOptions {
        infer_schema_length: Some(1000),
        ..Default::default()
    };

    let (sender, _receiver) = mpsc::channel();
    let mut app = crate::App::new(sender);
    let crashed = pump_open_until_done(&mut app, csv, opts);
    assert!(
        !crashed,
        "load with infer_schema_length=1000 (default) should not crash"
    );

    let state = app
        .data_table_state
        .as_ref()
        .expect("state must exist after a successful load");
    assert_eq!(state.schema.len(), 1);
    assert_eq!(state.num_rows, 201);
}
4598
#[test]
fn test_from_csv() {
    // Default options: header row present, comma delimiter.
    let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
    let state = DataTableState::from_csv(&path, &Default::default()).unwrap();
    assert_eq!(state.schema.len(), 6);
}
4607
#[test]
fn test_from_csv_gzipped() {
    // A .csv.gz sample is decompressed transparently by from_csv.
    let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
    let state = DataTableState::from_csv(&path, &Default::default()).unwrap();
    assert_eq!(state.schema.len(), 6);
}
4616
#[test]
fn test_from_parquet() {
    let parquet_path = crate::tests::sample_data_dir().join("people.parquet");
    let state =
        DataTableState::from_parquet(&parquet_path, None, None, None, None, false, 1).unwrap();
    // The sample file must expose at least one column.
    assert!(!state.schema.is_empty());
}
4624
#[test]
fn test_from_ipc() {
    use polars::prelude::IpcWriter;
    use std::io::BufWriter;

    // Round-trip: write a two-column frame to a temp .arrow file, reload it.
    let mut frame = df!(
        "x" => &[1_i32, 2, 3],
        "y" => &["a", "b", "c"]
    )
    .unwrap();
    let ipc_path = std::env::temp_dir().join("datui_test_ipc.arrow");
    {
        let mut writer = BufWriter::new(std::fs::File::create(&ipc_path).unwrap());
        IpcWriter::new(&mut writer).finish(&mut frame).unwrap();
        // writer is dropped (and flushed) at the end of this scope
    }

    let state = DataTableState::from_ipc(&ipc_path, None, None, None, None, false, 1).unwrap();
    assert_eq!(state.schema.len(), 2);
    for name in ["x", "y"] {
        assert!(state.schema.contains(name));
    }
    let _ = std::fs::remove_file(&ipc_path);
}
4646
#[test]
fn test_from_avro() {
    use polars::io::avro::AvroWriter;
    use std::io::BufWriter;

    // Round-trip: persist a small frame as Avro, then reload it.
    let mut frame = df!(
        "id" => &[1_i32, 2, 3],
        "name" => &["alice", "bob", "carol"]
    )
    .unwrap();
    let avro_path = std::env::temp_dir().join("datui_test_avro.avro");
    {
        let mut writer = BufWriter::new(std::fs::File::create(&avro_path).unwrap());
        AvroWriter::new(&mut writer).finish(&mut frame).unwrap();
        // writer is dropped (and flushed) at the end of this scope
    }

    let state = DataTableState::from_avro(&avro_path, None, None, None, None, false, 1).unwrap();
    assert_eq!(state.schema.len(), 2);
    for name in ["id", "name"] {
        assert!(state.schema.contains(name));
    }
    let _ = std::fs::remove_file(&avro_path);
}
4668
#[test]
fn test_from_orc() {
    use arrow::array::{Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use orc_rust::ArrowWriterBuilder;
    use std::io::BufWriter;
    use std::sync::Arc;

    // Build a 3-row arrow batch with an int and a string column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1_i64, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .unwrap();

    // Persist it as an ORC file in the temp dir.
    let orc_path = std::env::temp_dir().join("datui_test_orc.orc");
    {
        let sink = BufWriter::new(std::fs::File::create(&orc_path).unwrap());
        let mut orc_writer = ArrowWriterBuilder::new(sink, schema).try_build().unwrap();
        orc_writer.write(&batch).unwrap();
        orc_writer.close().unwrap();
    }

    // Reload via the ORC path and check the schema round-tripped.
    let state = DataTableState::from_orc(&orc_path, None, None, None, None, false, 1).unwrap();
    assert_eq!(state.schema.len(), 2);
    for name in ["id", "name"] {
        assert!(state.schema.contains(name));
    }
    let _ = std::fs::remove_file(&orc_path);
}
4700
#[test]
fn test_from_delimited_tsv_has_header() {
    // Tab-separated file whose first line is treated as the header.
    let tsv_path = std::env::temp_dir().join("datui_test_tsv_header.tsv");
    std::fs::write(&tsv_path, "a\tb\tc\td\n1\t2\t3\t4\n5\t6\t7\t8\n").unwrap();

    let opts = OpenOptions {
        has_header: Some(true),
        ..Default::default()
    };
    let mut state = DataTableState::from_delimited(&tsv_path, b'\t', &opts).unwrap();
    state.collect();

    // Header consumed -> 4 named columns and 2 data rows.
    assert_eq!(state.schema.len(), 4);
    for name in ["a", "b", "c", "d"] {
        assert!(state.schema.contains(name));
    }
    assert_eq!(state.num_rows, 2);
    let _ = std::fs::remove_file(&tsv_path);
}
4721
#[test]
fn test_from_delimited_tsv_no_header() {
    // Same content as the header test, but loaded headerless: the first
    // line becomes data and columns get synthetic names.
    let tsv_path = std::env::temp_dir().join("datui_test_tsv_no_header.tsv");
    std::fs::write(&tsv_path, "a\tb\tc\td\n1\t2\t3\t4\n5\t6\t7\t8\n").unwrap();

    let opts = OpenOptions {
        has_header: Some(false),
        ..Default::default()
    };
    let mut state = DataTableState::from_delimited(&tsv_path, b'\t', &opts).unwrap();
    state.collect();

    assert_eq!(state.schema.len(), 4);
    for name in ["column_1", "column_2", "column_3", "column_4"] {
        assert!(state.schema.contains(name));
    }
    // All 3 lines count as rows when there is no header.
    assert_eq!(state.num_rows, 3);
    let _ = std::fs::remove_file(&tsv_path);
}
4742
#[test]
fn test_from_delimited_psv_no_header() {
    // Pipe-separated variant of the headerless test.
    let psv_path = std::env::temp_dir().join("datui_test_psv_no_header.psv");
    std::fs::write(&psv_path, "x|y|z\n10|20|30\n40|50|60\n").unwrap();

    let opts = OpenOptions {
        has_header: Some(false),
        ..Default::default()
    };
    let mut state = DataTableState::from_delimited(&psv_path, b'|', &opts).unwrap();
    state.collect();

    assert_eq!(state.schema.len(), 3);
    for name in ["column_1", "column_2", "column_3"] {
        assert!(state.schema.contains(name));
    }
    // First line is data, so all 3 lines are rows.
    assert_eq!(state.num_rows, 3);
    let _ = std::fs::remove_file(&psv_path);
}
4762
#[test]
fn test_filter() {
    let mut state =
        DataTableState::new(create_test_lf(), None, None, None, None, true).unwrap();
    // Keep only rows where a > 2: just the a=3 row survives.
    state.filter(vec![FilterStatement {
        column: "a".to_string(),
        operator: FilterOperator::Gt,
        value: "2".to_string(),
        logical_op: LogicalOperator::And,
    }]);
    let out = state.lf.clone().collect().unwrap();
    assert_eq!(out.height(), 1);
    assert_eq!(out.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
}
4778
#[test]
fn test_sort() {
    let mut state =
        DataTableState::new(create_test_lf(), None, None, None, None, true).unwrap();
    state.sort(vec!["a".to_string()], false);
    // After sorting on "a" with reverse=false, 3 is the first value
    // (the flag's exact semantics live in DataTableState::sort).
    let out = state.lf.clone().collect().unwrap();
    assert_eq!(out.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
}
4787
#[test]
fn test_query() {
    let mut state =
        DataTableState::new(create_test_lf(), None, None, None, None, true).unwrap();
    // q-style query: project "b" for the single row where a == 2.
    state.query("select b where a = 2".to_string());
    let out = state.lf.clone().collect().unwrap();
    assert_eq!(out.shape(), (1, 1));
    let cell = out.column("b").unwrap().get(0).unwrap();
    assert_eq!(cell, AnyValue::String("y"));
}
4800
#[test]
fn test_query_date_accessors() {
    use chrono::NaiveDate;
    // Fixture: three named rows with dates in Jan, Jun and Dec 2024.
    let df = df!(
        "event_date" => [
            NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
            NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
            NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
        ],
        "name" => &["a", "b", "c"],
    )
    .unwrap();
    let lf = df.lazy();
    let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();

    // Stage 1: `.year` / `.month` accessors in the projection produce
    // Int32 and Int8 columns respectively.
    state.query("select name, year: event_date.year, month: event_date.month".to_string());
    assert!(
        state.error.is_none(),
        "query should succeed: {:?}",
        state.error
    );
    let df = state.lf.clone().collect().unwrap();
    assert_eq!(df.shape(), (3, 3));
    assert_eq!(
        df.column("year").unwrap().get(0).unwrap(),
        AnyValue::Int32(2024)
    );
    assert_eq!(
        df.column("month").unwrap().get(0).unwrap(),
        AnyValue::Int8(1)
    );
    assert_eq!(
        df.column("month").unwrap().get(1).unwrap(),
        AnyValue::Int8(6)
    );

    // Stage 2: accessor usable in a where clause (each query replaces the
    // previous view — the state is re-queried, not stacked).
    state.query("select name, event_date where event_date.month = 12".to_string());
    assert!(
        state.error.is_none(),
        "filter should succeed: {:?}",
        state.error
    );
    let df = state.lf.clone().collect().unwrap();
    assert_eq!(df.height(), 1);
    assert_eq!(
        df.column("name").unwrap().get(0).unwrap(),
        AnyValue::String("c")
    );

    // Stage 3: q-style dotted date literal (2024.06.15) in a comparison.
    state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
    assert!(
        state.error.is_none(),
        "date literal filter should succeed: {:?}",
        state.error
    );
    let df = state.lf.clone().collect().unwrap();
    assert_eq!(
        df.height(),
        2,
        "2024-06-20 and 2024-12-31 are after 2024-06-15"
    );

    // Stage 4: string accessors (.upper, .len) and bracketed method-call
    // syntax (ends_with["c"]).
    state.query(
        "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
            .to_string(),
    );
    assert!(
        state.error.is_none(),
        "string accessors should succeed: {:?}",
        state.error
    );
    let df = state.lf.clone().collect().unwrap();
    assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
    assert_eq!(
        df.column("upper_name").unwrap().get(0).unwrap(),
        AnyValue::String("C")
    );

    // Stage 5: a query matching nothing must leave num_rows at 0, and a
    // subsequent collect() must clear both cached frames.
    state.query("select where event_date.date = 2020.01.01".to_string());
    assert!(state.error.is_none());
    assert_eq!(state.num_rows, 0);
    state.visible_rows = 10;
    state.collect();
    assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
    assert!(
        state.locked_df.is_none(),
        "locked_df must be cleared when num_rows is 0"
    );
}
4895
#[test]
fn test_select_next_previous() {
    let mut state =
        DataTableState::new(create_large_test_lf(), None, None, None, None, true).unwrap();
    state.visible_rows = 10;
    state.table_state.select(Some(5));

    // One step down, then one step back up, returns to the start row.
    state.select_next();
    assert_eq!(state.table_state.selected(), Some(6));
    state.select_previous();
    assert_eq!(state.table_state.selected(), Some(5));
}
4909
#[test]
fn test_page_up_down() {
    let mut state =
        DataTableState::new(create_large_test_lf(), None, None, None, None, true).unwrap();
    state.visible_rows = 20;
    state.collect();

    // Each paging step moves start_row by one full page (visible_rows = 20),
    // and page_up undoes page_down symmetrically.
    assert_eq!(state.start_row, 0);
    state.page_down();
    assert_eq!(state.start_row, 20);
    state.page_down();
    assert_eq!(state.start_row, 40);
    state.page_up();
    assert_eq!(state.start_row, 20);
    state.page_up();
    assert_eq!(state.start_row, 0);
}
4927
#[test]
fn test_scroll_left_right() {
    let mut state =
        DataTableState::new(create_large_test_lf(), None, None, None, None, true).unwrap();

    // Horizontal scrolling moves the terminal-column index one step at a
    // time and scroll_left mirrors scroll_right.
    assert_eq!(state.termcol_index, 0);
    state.scroll_right();
    assert_eq!(state.termcol_index, 1);
    state.scroll_right();
    assert_eq!(state.termcol_index, 2);
    state.scroll_left();
    assert_eq!(state.termcol_index, 1);
    state.scroll_left();
    assert_eq!(state.termcol_index, 0);
}
4942
#[test]
fn test_reverse() {
    let mut state =
        DataTableState::new(create_test_lf(), None, None, None, None, true).unwrap();

    // Helper: materialize the view and read the first value of column "a".
    let first_a = |s: &DataTableState| -> i32 {
        s.lf.clone()
            .collect()
            .unwrap()
            .column("a")
            .unwrap()
            .get(0)
            .unwrap()
            .try_extract::<i32>()
            .unwrap()
    };

    // Sorted with reverse=true, 1 comes first; reversing the view flips it.
    state.sort(vec!["a".to_string()], true);
    assert_eq!(first_a(&state), 1);
    state.reverse();
    assert_eq!(first_a(&state), 3);
}
4974
#[test]
fn test_filter_multiple() {
    let mut state =
        DataTableState::new(create_large_test_lf(), None, None, None, None, true).unwrap();

    // Small builder to avoid repeating the struct literal.
    let eq_filter = |column: &str, value: &str| FilterStatement {
        column: column.to_string(),
        operator: FilterOperator::Eq,
        value: value.to_string(),
        logical_op: LogicalOperator::And,
    };

    // c == 1 AND d == 2 means i % 3 == 1 && i % 5 == 2, i.e. i ≡ 7 (mod 15):
    // 7 such values in 0..100.
    state.filter(vec![eq_filter("c", "1"), eq_filter("d", "2")]);
    let out = state.lf.clone().collect().unwrap();
    assert_eq!(out.height(), 7);
}
4997
#[test]
fn test_filter_and_sort() {
    let mut state =
        DataTableState::new(create_large_test_lf(), None, None, None, None, true).unwrap();
    // Keep rows with c == 1 (i % 3 == 1), then sort on "a".
    state.filter(vec![FilterStatement {
        column: "c".to_string(),
        operator: FilterOperator::Eq,
        value: "1".to_string(),
        logical_op: LogicalOperator::And,
    }]);
    state.sort(vec!["a".to_string()], false);
    // The largest i in 0..100 with i % 3 == 1 is 97, and it comes first.
    let out = state.lf.clone().collect().unwrap();
    assert_eq!(out.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
}
5013
5014 fn create_pivot_long_lf() -> LazyFrame {
5017 let df = df!(
5018 "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
5019 "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
5020 "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
5021 "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
5022 )
5023 .unwrap();
5024 df.lazy()
5025 }
5026
5027 fn create_melt_wide_lf() -> LazyFrame {
5029 let df = df!(
5030 "id" => &[1_i32, 2, 3],
5031 "date" => &["d1", "d2", "d3"],
5032 "c1" => &[10.0_f64, 20.0, 30.0],
5033 "c2" => &[11.0, 21.0, 31.0],
5034 "c3" => &[12.0, 22.0, 32.0],
5035 )
5036 .unwrap();
5037 df.lazy()
5038 }
5039
#[test]
fn test_pivot_basic() {
    let mut state =
        DataTableState::new(create_pivot_long_lf(), None, None, None, None, true).unwrap();
    state
        .pivot(&PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: None,
        })
        .unwrap();
    let out = state.lf.clone().collect().unwrap();
    // One wide row per (id, date) pair; each key value becomes a column.
    for expected in ["id", "date", "A", "B", "C"] {
        assert!(
            out.get_column_names().iter().any(|c| c.as_str() == expected),
            "missing column {}",
            expected
        );
    }
    assert_eq!(out.height(), 2);
}
5061
#[test]
fn test_pivot_aggregation_last() {
    let mut state =
        DataTableState::new(create_pivot_long_lf(), None, None, None, None, true).unwrap();
    state
        .pivot(&PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: None,
        })
        .unwrap();
    let out = state.lf.clone().collect().unwrap();
    // id=1 has A values [10.0, 11.0] -> Last picks 11.0; id=2 has one A = 40.0.
    let a_col = out.column("A").unwrap();
    assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(11.0));
    assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
}
5081
#[test]
fn test_pivot_aggregation_first() {
    let mut state =
        DataTableState::new(create_pivot_long_lf(), None, None, None, None, true).unwrap();
    state
        .pivot(&PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::First,
            sort_columns: None,
        })
        .unwrap();
    let out = state.lf.clone().collect().unwrap();
    // id=1 has A values [10.0, 11.0] -> First picks 10.0; id=2 has one A = 40.0.
    let a_col = out.column("A").unwrap();
    assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
    assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
}
5099
#[test]
fn test_pivot_aggregation_min_max() {
    // Pivot the shared fixture with the given aggregation and return the
    // first value of column "A" as f64.
    fn first_a_with(agg: PivotAggregation) -> f64 {
        let mut state =
            DataTableState::new(create_pivot_long_lf(), None, None, None, None, true).unwrap();
        state
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: agg,
                sort_columns: None,
            })
            .unwrap();
        let df = state.lf.clone().collect().unwrap();
        df.column("A")
            .unwrap()
            .get(0)
            .unwrap()
            .try_extract::<f64>()
            .unwrap()
    }

    // id=1 has A values [10.0, 11.0].
    assert_eq!(first_a_with(PivotAggregation::Min), 10.0);
    assert_eq!(first_a_with(PivotAggregation::Max), 11.0);
}
5135
#[test]
fn test_pivot_aggregation_avg_count() {
    // Pivot the shared fixture with the given aggregation and materialize it.
    fn pivot_with(agg: PivotAggregation) -> DataFrame {
        let mut state =
            DataTableState::new(create_pivot_long_lf(), None, None, None, None, true).unwrap();
        state
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: agg,
                sort_columns: None,
            })
            .unwrap();
        state.lf.clone().collect().unwrap()
    }

    // id=1 has A values [10.0, 11.0] -> mean 10.5 (tolerance for float math).
    let avg = pivot_with(PivotAggregation::Avg);
    match avg.column("A").unwrap().get(0).unwrap() {
        AnyValue::Float64(x) => assert!((x - 10.5).abs() < 1e-6),
        other => panic!("expected float, got {:?}", other),
    }

    // Two A entries for id=1; the count is reported as UInt32.
    let counts = pivot_with(PivotAggregation::Count);
    assert_eq!(
        counts.column("A").unwrap().get(0).unwrap(),
        AnyValue::UInt32(2)
    );
}
5171
#[test]
fn test_pivot_string_first_last() {
    // String-valued pivot: each (id, key) pair occurs exactly once, so
    // Last degenerates to "the" value for that cell.
    let lf = df!(
        "id" => &[1_i32, 1, 2, 2],
        "key" => &["X", "Y", "X", "Y"],
        "value" => &["low", "mid", "high", "mid"],
    )
    .unwrap()
    .lazy();
    let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
    state
        .pivot(&PivotSpec {
            index: vec!["id".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: None,
        })
        .unwrap();
    let out = state.lf.clone().collect().unwrap();
    assert_eq!(
        out.column("X").unwrap().get(0).unwrap(),
        AnyValue::String("low")
    );
    assert_eq!(
        out.column("Y").unwrap().get(0).unwrap(),
        AnyValue::String("mid")
    );
}
5200
#[test]
fn test_melt_basic() {
    let mut state =
        DataTableState::new(create_melt_wide_lf(), None, None, None, None, true).unwrap();
    state
        .melt(&MeltSpec {
            index: vec!["id".to_string(), "date".to_string()],
            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
            variable_name: "variable".to_string(),
            value_name: "value".to_string(),
        })
        .unwrap();
    let out = state.lf.clone().collect().unwrap();
    // 3 rows x 3 value columns -> 9 long-format rows.
    assert_eq!(out.height(), 9);
    for expected in ["variable", "value", "id", "date"] {
        assert!(
            out.get_column_names().iter().any(|c| c.as_str() == expected),
            "missing column {}",
            expected
        );
    }
}
5220
#[test]
fn test_melt_all_except_index() {
    let mut state =
        DataTableState::new(create_melt_wide_lf(), None, None, None, None, true).unwrap();
    state
        .melt(&MeltSpec {
            index: vec!["id".to_string(), "date".to_string()],
            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
            variable_name: "var".to_string(),
            value_name: "val".to_string(),
        })
        .unwrap();
    // Custom variable/value column names must be honored by the melt.
    let out = state.lf.clone().collect().unwrap();
    assert!(out.column("var").is_ok());
    assert!(out.column("val").is_ok());
}
5236
#[test]
fn test_pivot_on_current_view_after_filter() {
    let mut state =
        DataTableState::new(create_pivot_long_lf(), None, None, None, None, true).unwrap();
    // Restrict the view to id == 1 before pivoting.
    state.filter(vec![FilterStatement {
        column: "id".to_string(),
        operator: FilterOperator::Eq,
        value: "1".to_string(),
        logical_op: LogicalOperator::And,
    }]);
    state
        .pivot(&PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: None,
        })
        .unwrap();
    // The pivot must act on the filtered view: only the (id=1, d1) row remains.
    let out = state.lf.clone().collect().unwrap();
    assert_eq!(out.height(), 1);
    assert_eq!(out.column("id").unwrap().get(0).unwrap(), AnyValue::Int32(1));
}
5260
#[test]
fn test_fuzzy_token_regex() {
    // Each character of the token is interleaved with ".*" under a
    // case-insensitive "(?i)" flag.
    assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
    assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
    // Regex metacharacters in the token must come out escaped.
    assert!(fuzzy_token_regex("[").contains("\\["));
}
5269
#[test]
fn test_fuzzy_search() {
    crate::tests::ensure_sample_data();
    let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
    let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
    state.visible_rows = 10;
    state.collect();

    // Searching can only narrow the view.
    let unfiltered = state.num_rows;
    state.fuzzy_search("string".to_string());
    assert!(state.error.is_none(), "{:?}", state.error);
    assert!(state.num_rows <= unfiltered, "fuzzy search should filter rows");

    // Clearing the query restores the full row count and the active query.
    state.fuzzy_search(String::new());
    state.collect();
    assert_eq!(state.num_rows, unfiltered, "empty fuzzy search should reset");
    assert!(state.get_active_fuzzy_query().is_empty());
}
5288
#[test]
fn test_fuzzy_search_regex_direct() {
    // Phase 1: the generated fuzzy regex matches via a direct polars
    // `str().contains` filter on a single string column.
    let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
    let pattern = fuzzy_token_regex("alice");
    let out = lf
        .filter(col("name").str().contains(lit(pattern.clone()), false))
        .collect()
        .unwrap();
    assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);

    // Phase 2: the same pattern OR-ed across two string columns still
    // matches exactly the one row.
    let lf2 = df!(
        "id" => &[1i32, 2, 3],
        "name" => &["alice", "bob", "carol"],
        "city" => &["NYC", "LA", "Boston"]
    )
    .unwrap()
    .lazy();
    let pat = fuzzy_token_regex("alice");
    let expr = col("name")
        .str()
        .contains(lit(pat.clone()), false)
        .or(col("city").str().contains(lit(pat), false));
    let out2 = lf2.clone().filter(expr).collect().unwrap();
    assert_eq!(out2.height(), 1);

    // Phase 3: rebuild the OR-expression the way fuzzy_search does —
    // discover the string columns from the schema, then reduce the
    // per-column contains() expressions with `or`.
    let schema = lf2.clone().collect_schema().unwrap();
    let string_cols: Vec<String> = schema
        .iter()
        .filter(|(_, dtype)| dtype.is_string())
        .map(|(name, _)| name.to_string())
        .collect();
    assert!(
        !string_cols.is_empty(),
        "df! string cols should be detected"
    );
    let pattern = fuzzy_token_regex("alice");
    let token_expr = string_cols
        .iter()
        .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
        .reduce(|a, b| a.or(b))
        .unwrap();
    let out3 = lf2.filter(token_expr).collect().unwrap();
    assert_eq!(
        out3.height(),
        1,
        "fuzzy_search-style filter should match 1 row"
    );
}
5340
#[test]
fn test_fuzzy_search_no_string_columns() {
    // A frame with only numeric columns leaves the fuzzy search nothing to
    // match against, which must surface as a state error.
    let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
        .unwrap()
        .lazy();
    let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
    state.fuzzy_search("x".to_string());
    assert!(state.error.is_some());
}
5350
#[test]
fn test_by_query_result_sorted_by_group_columns() {
    // Deliberately shuffled group keys so an unsorted aggregation output
    // would be detectable.
    let lf = df!(
        "age_group" => &[3i64, 1, 5, 2, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
        "team" => &[
            "Red", "Blue", "Green", "Red", "Blue", "Green", "Green", "Red", "Blue",
            "Green", "Red", "Blue", "Red", "Blue", "Green",
        ],
        "score" => &[50.0f64, 10.0, 90.0, 20.0, 30.0, 40.0, 60.0, 70.0, 80.0, 15.0, 25.0, 35.0, 45.0, 55.0, 65.0],
    )
    .unwrap()
    .lazy();
    let options = crate::OpenOptions::default();
    let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
    state.query("select avg score by age_group, team".to_string());
    assert!(
        state.error.is_none(),
        "query should succeed: {:?}",
        state.error
    );

    // Re-sorting the result by the group keys must be a no-op if the
    // by-query already emitted sorted output.
    let result = state.lf.collect().unwrap();
    let resorted = result
        .sort(
            ["age_group", "team"],
            SortMultipleOptions::default().with_order_descending(false),
        )
        .unwrap();
    assert_eq!(
        result, resorted,
        "by-query result must be sorted by (age_group, team)"
    );
}
5387
#[test]
fn test_by_query_computed_group_key_sorted_by_result_column() {
    // Group by a computed key ("bucket: 1+floor x % 3") rather than an
    // existing column; the result must come back sorted on that key.
    let df = df!(
        "x" => &[7.0f64, 12.0, 3.0, 22.0, 17.0, 8.0],
        "v" => &[1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0],
    )
    .unwrap();
    let lf = df.lazy();
    let options = crate::OpenOptions::default();
    let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
    state.query("select sum v by bucket: 1+floor x % 3".to_string());
    assert!(
        state.error.is_none(),
        "query should succeed: {:?}",
        state.error
    );
    let result = state.lf.collect().unwrap();
    let bucket = result.column("bucket").unwrap();
    // Verify non-decreasing order of the computed key; try_extract falls
    // back to 0 so a dtype mismatch would fail the ordering check loudly.
    for i in 1..result.height() {
        let prev: i64 = bucket.get(i - 1).unwrap().try_extract().unwrap_or(0);
        let curr: i64 = bucket.get(i).unwrap().try_extract().unwrap_or(0);
        assert!(
            curr >= prev,
            "bucket column must be sorted: {} then {}",
            prev,
            curr
        );
    }
5420 }
5421}