1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10 buffer::Buffer,
11 layout::Rect,
12 style::{Color, Modifier, Style},
13 text::{Line, Span},
14 widgets::{
15 Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16 },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions};
25use polars::lazy::frame::pivot::pivot_stable;
26use std::io::{BufReader, Read};
27
28use calamine::{open_workbook_auto, Data, Reader};
29use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
30use orc_rust::ArrowReaderBuilder;
31use tempfile::NamedTempFile;
32
33use arrow::array::types::{
34 Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
35 TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
36};
37use arrow::array::{Array, AsArray};
38use arrow::record_batch::RecordBatch;
39
40fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
41 let e = col(PlSmallStr::from_static(""));
42 let expr = match agg {
43 PivotAggregation::Last => e.last(),
44 PivotAggregation::First => e.first(),
45 PivotAggregation::Min => e.min(),
46 PivotAggregation::Max => e.max(),
47 PivotAggregation::Avg => e.mean(),
48 PivotAggregation::Med => e.median(),
49 PivotAggregation::Std => e.std(1),
50 PivotAggregation::Count => e.len(),
51 };
52 Ok(expr)
53}
54
/// State backing the scrollable data-table view: the underlying polars
/// `LazyFrame`, materialized row buffers, scroll/selection positions, and
/// the currently active filters, sorts, and queries.
pub struct DataTableState {
    /// Frame with all active transformations (filters/sorts/queries) applied.
    pub lf: LazyFrame,
    /// Pristine frame as originally loaded; `reset` rebuilds `lf` from this.
    original_lf: LazyFrame,
    /// Materialized rows currently displayed, if collected.
    df: Option<DataFrame>,
    /// Materialized frame for the locked (pinned) leading columns.
    locked_df: Option<DataFrame>,
    pub table_state: TableState,
    /// Absolute index of the first visible row.
    pub start_row: usize,
    /// Rows that fit in the viewport.
    pub visible_rows: usize,
    /// Index of the leftmost visible terminal column.
    pub termcol_index: usize,
    /// Terminal columns that fit in the viewport.
    pub visible_termcols: usize,
    /// Last polars error, kept for display.
    pub error: Option<PolarsError>,
    /// When true, `error` is retained but not rendered.
    pub suppress_error_display: bool,
    /// Schema of the current `lf`.
    pub schema: Arc<Schema>,
    /// Total row count of the current frame; only meaningful while
    /// `num_rows_valid` is true.
    pub num_rows: usize,
    // Whether `num_rows` still reflects the current `lf`.
    num_rows_valid: bool,
    // Active structured filters (built by the filter modal).
    filters: Vec<FilterStatement>,
    // Columns the view is sorted by, and the shared direction.
    sort_columns: Vec<String>,
    sort_ascending: bool,
    pub active_query: String,
    pub active_sql_query: String,
    pub active_fuzzy_query: String,
    // Display order of columns (may differ from schema order).
    column_order: Vec<String>,
    // Leading columns pinned during horizontal scrolling.
    locked_columns_count: usize,
    // Group-by view state plus drill-down selection, if grouping is active.
    grouped_lf: Option<LazyFrame>,
    drilled_down_group_index: Option<usize>,
    pub drilled_down_group_key: Option<Vec<String>>,
    pub drilled_down_group_key_columns: Option<Vec<String>>,
    // Paging/buffering tuning: pages prefetched around the viewport and
    // hard caps on the buffered window (rows and megabytes).
    pages_lookahead: usize,
    pages_lookback: usize,
    max_buffered_rows: usize,
    max_buffered_mb: usize,
    // Absolute row range (and data) currently held in `buffered_df`.
    buffered_start_row: usize,
    buffered_end_row: usize,
    buffered_df: Option<DataFrame>,
    // Presumably the distance from the buffer edge that triggers a refetch
    // — confirm against the buffering logic elsewhere in this file.
    proximity_threshold: usize,
    // Row-number gutter: whether shown, and the index of the first row.
    row_numbers: bool,
    row_start_index: usize,
    // Last pivot/melt specs, kept so modals can reopen pre-filled.
    last_pivot_spec: Option<PivotSpec>,
    last_melt_spec: Option<MeltSpec>,
    /// Hive partition columns (displayed first) when opened from a hive dir.
    pub partition_columns: Option<Vec<String>>,
    // Keeps a decompressed temp file alive for the lifetime of the scan.
    decompress_temp_file: Option<NamedTempFile>,
    /// Whether to collect with polars' streaming engine.
    pub polars_streaming: bool,
}
108
/// Storage type inferred for one Excel worksheet column before it is
/// converted to a polars `Series` (see `excel_infer_column_type`).
#[derive(Clone, Copy)]
enum ExcelColType {
    Int64,
    Float64,
    Boolean,
    Utf8,
    /// All parsed timestamps are exactly midnight → stored as polars `Date`.
    Date,
    /// At least one timestamp carries a time of day → polars `Datetime`.
    Datetime,
}
119
120impl DataTableState {
    /// Build table state around `lf`, resolving the schema once to seed
    /// the column display order.
    ///
    /// Buffering knobs passed as `None` fall back to defaults: 3 pages of
    /// lookahead/lookback, 100_000 buffered rows, 512 MB buffered data.
    ///
    /// # Errors
    /// Fails if the frame's schema cannot be collected.
    pub fn new(
        lf: LazyFrame,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        polars_streaming: bool,
    ) -> Result<Self> {
        let schema = lf.clone().collect_schema()?;
        let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            pages_lookahead: pages_lookahead.unwrap_or(3),
            pages_lookback: pages_lookback.unwrap_or(3),
            max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: false,
            // Row numbering is 1-based by default.
            row_start_index: 1,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns: None,
            decompress_temp_file: None,
            polars_streaming,
        })
    }
175
176 pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
178 let mut state = Self::new(
179 lf,
180 options.pages_lookahead,
181 options.pages_lookback,
182 options.max_buffered_rows,
183 options.max_buffered_mb,
184 options.polars_streaming,
185 )?;
186 state.row_numbers = options.row_numbers;
187 state.row_start_index = options.row_start_index;
188 Ok(state)
189 }
190
    /// Like [`Self::from_lazyframe`], but with a caller-supplied schema —
    /// avoids resolving the frame's schema here (useful when the caller
    /// already computed it, e.g. for hive-partitioned datasets).
    ///
    /// When `partition_columns` is given, those columns are placed first
    /// in the display order, followed by the remaining schema columns.
    pub fn from_schema_and_lazyframe(
        schema: Arc<Schema>,
        lf: LazyFrame,
        options: &crate::OpenOptions,
        partition_columns: Option<Vec<String>>,
    ) -> Result<Self> {
        let column_order: Vec<String> = if let Some(ref part) = partition_columns {
            // Partition columns first, then every other schema column in order.
            let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
            let rest: Vec<String> = schema
                .iter_names()
                .map(|s| s.to_string())
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            part.iter().cloned().chain(rest).collect()
        } else {
            schema.iter_names().map(|s| s.to_string()).collect()
        };
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            // Same buffering defaults as `Self::new`.
            pages_lookahead: options.pages_lookahead.unwrap_or(3),
            pages_lookback: options.pages_lookback.unwrap_or(3),
            max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: options.row_numbers,
            row_start_index: options.row_start_index,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns,
            decompress_temp_file: None,
            polars_streaming: options.polars_streaming,
        })
    }
255
    /// Restore `lf`, the schema, and the column order from the pristine
    /// `original_lf`, clearing every applied transformation: queries,
    /// filters, sorts, locked columns, grouping/drill-down state, scroll
    /// position, and the row buffer.
    ///
    /// Error state and remembered pivot/melt specs are untouched — see
    /// `reset` for the full user-facing reset.
    fn reset_lf_to_original(&mut self) {
        // The cached row count no longer matches the restored frame.
        self.invalidate_num_rows();
        self.lf = self.original_lf.clone();
        // Fall back to an empty schema if the original frame can no
        // longer be resolved, rather than failing the reset.
        self.schema = self
            .original_lf
            .clone()
            .collect_schema()
            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.locked_columns_count = 0;
        self.filters.clear();
        self.sort_columns.clear();
        self.sort_ascending = true;
        self.start_row = 0;
        self.termcol_index = 0;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
    }
287
    /// Full user-facing reset: restore the original frame, clear any
    /// displayed error and remembered pivot/melt specs, then re-collect
    /// the data and jump back to the top.
    pub fn reset(&mut self) {
        self.reset_lf_to_original();
        self.error = None;
        self.suppress_error_display = false;
        self.last_pivot_spec = None;
        self.last_melt_spec = None;
        self.collect();
        if self.num_rows > 0 {
            self.start_row = 0;
        }
    }
299
300 pub fn from_parquet(
301 path: &Path,
302 pages_lookahead: Option<usize>,
303 pages_lookback: Option<usize>,
304 max_buffered_rows: Option<usize>,
305 max_buffered_mb: Option<usize>,
306 row_numbers: bool,
307 row_start_index: usize,
308 ) -> Result<Self> {
309 let path_str = path.as_os_str().to_string_lossy();
310 let is_glob = path_str.contains('*');
311 let pl_path = PlPath::Local(Arc::from(path));
312 let args = ScanArgsParquet {
313 glob: is_glob,
314 ..Default::default()
315 };
316 let lf = LazyFrame::scan_parquet(pl_path, args)?;
317 let mut state = Self::new(
318 lf,
319 pages_lookahead,
320 pages_lookback,
321 max_buffered_rows,
322 max_buffered_mb,
323 true,
324 )?;
325 state.row_numbers = row_numbers;
326 state.row_start_index = row_start_index;
327 Ok(state)
328 }
329
330 pub fn from_parquet_paths(
332 paths: &[impl AsRef<Path>],
333 pages_lookahead: Option<usize>,
334 pages_lookback: Option<usize>,
335 max_buffered_rows: Option<usize>,
336 max_buffered_mb: Option<usize>,
337 row_numbers: bool,
338 row_start_index: usize,
339 ) -> Result<Self> {
340 if paths.is_empty() {
341 return Err(color_eyre::eyre::eyre!("No paths provided"));
342 }
343 if paths.len() == 1 {
344 return Self::from_parquet(
345 paths[0].as_ref(),
346 pages_lookahead,
347 pages_lookback,
348 max_buffered_rows,
349 max_buffered_mb,
350 row_numbers,
351 row_start_index,
352 );
353 }
354 let mut lazy_frames = Vec::with_capacity(paths.len());
355 for p in paths {
356 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
357 let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
358 lazy_frames.push(lf);
359 }
360 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
361 let mut state = Self::new(
362 lf,
363 pages_lookahead,
364 pages_lookback,
365 max_buffered_rows,
366 max_buffered_mb,
367 true,
368 )?;
369 state.row_numbers = row_numbers;
370 state.row_start_index = row_start_index;
371 Ok(state)
372 }
373
374 pub fn from_ipc(
376 path: &Path,
377 pages_lookahead: Option<usize>,
378 pages_lookback: Option<usize>,
379 max_buffered_rows: Option<usize>,
380 max_buffered_mb: Option<usize>,
381 row_numbers: bool,
382 row_start_index: usize,
383 ) -> Result<Self> {
384 let pl_path = PlPath::Local(Arc::from(path));
385 let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
386 let mut state = Self::new(
387 lf,
388 pages_lookahead,
389 pages_lookback,
390 max_buffered_rows,
391 max_buffered_mb,
392 true,
393 )?;
394 state.row_numbers = row_numbers;
395 state.row_start_index = row_start_index;
396 Ok(state)
397 }
398
399 pub fn from_ipc_paths(
401 paths: &[impl AsRef<Path>],
402 pages_lookahead: Option<usize>,
403 pages_lookback: Option<usize>,
404 max_buffered_rows: Option<usize>,
405 max_buffered_mb: Option<usize>,
406 row_numbers: bool,
407 row_start_index: usize,
408 ) -> Result<Self> {
409 if paths.is_empty() {
410 return Err(color_eyre::eyre::eyre!("No paths provided"));
411 }
412 if paths.len() == 1 {
413 return Self::from_ipc(
414 paths[0].as_ref(),
415 pages_lookahead,
416 pages_lookback,
417 max_buffered_rows,
418 max_buffered_mb,
419 row_numbers,
420 row_start_index,
421 );
422 }
423 let mut lazy_frames = Vec::with_capacity(paths.len());
424 for p in paths {
425 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
426 let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
427 lazy_frames.push(lf);
428 }
429 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
430 let mut state = Self::new(
431 lf,
432 pages_lookahead,
433 pages_lookback,
434 max_buffered_rows,
435 max_buffered_mb,
436 true,
437 )?;
438 state.row_numbers = row_numbers;
439 state.row_start_index = row_start_index;
440 Ok(state)
441 }
442
443 pub fn from_avro(
445 path: &Path,
446 pages_lookahead: Option<usize>,
447 pages_lookback: Option<usize>,
448 max_buffered_rows: Option<usize>,
449 max_buffered_mb: Option<usize>,
450 row_numbers: bool,
451 row_start_index: usize,
452 ) -> Result<Self> {
453 let file = File::open(path)?;
454 let df = polars::io::avro::AvroReader::new(file).finish()?;
455 let lf = df.lazy();
456 let mut state = Self::new(
457 lf,
458 pages_lookahead,
459 pages_lookback,
460 max_buffered_rows,
461 max_buffered_mb,
462 true,
463 )?;
464 state.row_numbers = row_numbers;
465 state.row_start_index = row_start_index;
466 Ok(state)
467 }
468
469 pub fn from_avro_paths(
471 paths: &[impl AsRef<Path>],
472 pages_lookahead: Option<usize>,
473 pages_lookback: Option<usize>,
474 max_buffered_rows: Option<usize>,
475 max_buffered_mb: Option<usize>,
476 row_numbers: bool,
477 row_start_index: usize,
478 ) -> Result<Self> {
479 if paths.is_empty() {
480 return Err(color_eyre::eyre::eyre!("No paths provided"));
481 }
482 if paths.len() == 1 {
483 return Self::from_avro(
484 paths[0].as_ref(),
485 pages_lookahead,
486 pages_lookback,
487 max_buffered_rows,
488 max_buffered_mb,
489 row_numbers,
490 row_start_index,
491 );
492 }
493 let mut lazy_frames = Vec::with_capacity(paths.len());
494 for p in paths {
495 let file = File::open(p.as_ref())?;
496 let df = polars::io::avro::AvroReader::new(file).finish()?;
497 lazy_frames.push(df.lazy());
498 }
499 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
500 let mut state = Self::new(
501 lf,
502 pages_lookahead,
503 pages_lookback,
504 max_buffered_rows,
505 max_buffered_mb,
506 true,
507 )?;
508 state.row_numbers = row_numbers;
509 state.row_start_index = row_start_index;
510 Ok(state)
511 }
512
    /// Read an Excel workbook eagerly into a `DataFrame`-backed state.
    ///
    /// `excel_sheet` selects the worksheet: a string that parses as an
    /// integer is treated as a zero-based sheet index, anything else as a
    /// sheet name; `None` falls back to the first sheet. The first row of
    /// the sheet supplies column headers; column types are inferred from
    /// the remaining cells.
    #[allow(clippy::too_many_arguments)]
    pub fn from_excel(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        excel_sheet: Option<&str>,
    ) -> Result<Self> {
        let mut workbook =
            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
        let sheet_names = workbook.sheet_names().to_vec();
        if sheet_names.is_empty() {
            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
        }
        // Resolve the requested worksheet range (index, name, or first sheet).
        let range = if let Some(sheet_sel) = excel_sheet {
            if let Ok(idx) = sheet_sel.parse::<usize>() {
                workbook
                    .worksheet_range_at(idx)
                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            } else {
                workbook
                    .worksheet_range(sheet_sel)
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            }
        } else {
            workbook
                .worksheet_range_at(0)
                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
        };
        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
        // An entirely empty sheet becomes an empty DataFrame rather than an error.
        if rows.is_empty() {
            let empty_df = DataFrame::new(vec![])?;
            let mut state = Self::new(
                empty_df.lazy(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                true,
            )?;
            state.row_numbers = row_numbers;
            state.row_start_index = row_start_index;
            return Ok(state);
        }
        // First row is the header; non-string header cells fall back to
        // their display text.
        let headers: Vec<String> = rows[0]
            .iter()
            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
            .collect();
        let n_cols = headers.len();
        let mut series_vec = Vec::with_capacity(n_cols);
        for (col_idx, header) in headers.iter().enumerate() {
            // Gather this column's cells; rows may be ragged, hence Option.
            let col_cells: Vec<Option<&Data>> =
                rows[1..].iter().map(|row| row.get(col_idx)).collect();
            let inferred = Self::excel_infer_column_type(&col_cells);
            // Empty headers get a synthetic 1-based name.
            let name = if header.is_empty() {
                format!("column_{}", col_idx + 1)
            } else {
                header.clone()
            };
            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
            series_vec.push(series.into());
        }
        let df = DataFrame::new(series_vec)?;
        let mut state = Self::new(
            df.lazy(),
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
595
    /// Infer a storage type for one Excel column from its (non-missing) cells.
    ///
    /// Precedence as implemented below: any string cell short-circuits
    /// flag collection; a string column whose non-empty cells all parse
    /// as timestamps is promoted to Date/Datetime. Otherwise integers win
    /// over datetimes, datetimes over floats; a float column holding only
    /// whole numbers collapses to Int64. Columns with no recognized cells
    /// default to Utf8.
    ///
    /// NOTE(review): because the string check breaks out of the scan, a
    /// mixed int+datetime column is classified Int64 by the `has_int`
    /// branch ordering — confirm that is intended.
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        for cell in cells.iter().flatten() {
            if CalamineTrait::is_string(*cell) {
                // One string cell decides the column; stop scanning.
                has_string = true;
                break;
            }
            // Datetimes also count as floats (Excel stores them as serials),
            // so a datetime/float mix stays float-compatible.
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // A "string" column may actually hold textual timestamps:
            // promote it when at least one cell parses and every
            // non-empty cell parses.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            ExcelColType::Int64
        } else if has_datetime {
            // Date when no timestamp carries a time-of-day component.
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // Floats that are all whole numbers (within 1e-10) display
            // better as integers.
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            ExcelColType::Utf8
        }
    }
667
668 fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
670 let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
671 cells
672 .iter()
673 .flatten()
674 .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
675 .all(|dt| dt.time() == midnight)
676 }
677
678 fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
680 use calamine::DataType;
681 if let Some(dt) = cell.as_datetime() {
682 return Some(dt);
683 }
684 let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
685 Self::parse_naive_datetime_str(s)
686 }
687
688 fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
690 let s = s.trim();
691 if s.is_empty() {
692 return None;
693 }
694 const FORMATS: &[&str] = &[
695 "%Y-%m-%dT%H:%M:%S%.f",
696 "%Y-%m-%dT%H:%M:%S",
697 "%Y-%m-%d %H:%M:%S%.f",
698 "%Y-%m-%d %H:%M:%S",
699 "%Y-%m-%d",
700 ];
701 for fmt in FORMATS {
702 if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
703 return Some(dt);
704 }
705 }
706 if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
707 return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
708 }
709 None
710 }
711
    /// Convert one Excel column's cells into a polars `Series` of the
    /// inferred `col_type`. Cells that cannot be converted become nulls.
    ///
    /// Dates are encoded as days since 1970-01-01; datetimes as
    /// microsecond timestamps, with the naive wall-clock value read as
    /// UTC (no time zone attached to the resulting dtype).
    fn excel_column_to_series(
        name: &str,
        cells: &[Option<&Data>],
        col_type: ExcelColType,
    ) -> Result<Series> {
        use calamine::DataType as CalamineTrait;
        use polars::datatypes::TimeUnit;
        let series = match col_type {
            ExcelColType::Int64 => {
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_i64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Float64 => {
                let v: Vec<Option<f64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_f64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Boolean => {
                let v: Vec<Option<bool>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.get_bool()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Utf8 => {
                let v: Vec<Option<String>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_string()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Date => {
                // Days-since-epoch i32, then cast to the Date dtype.
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
                let v: Vec<Option<i32>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| (dt.date() - epoch).num_days() as i32)
                    })
                    .collect();
                Series::new(name.into(), v).cast(&DataType::Date)?
            }
            ExcelColType::Datetime => {
                // Microseconds-since-epoch i64, then cast to Datetime(us).
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| dt.and_utc().timestamp_micros())
                    })
                    .collect();
                Series::new(name.into(), v)
                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
            }
        };
        Ok(series)
    }
774
775 pub fn from_orc(
778 path: &Path,
779 pages_lookahead: Option<usize>,
780 pages_lookback: Option<usize>,
781 max_buffered_rows: Option<usize>,
782 max_buffered_mb: Option<usize>,
783 row_numbers: bool,
784 row_start_index: usize,
785 ) -> Result<Self> {
786 let file = File::open(path)?;
787 let reader = ArrowReaderBuilder::try_new(file)
788 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
789 .build();
790 let batches: Vec<RecordBatch> = reader
791 .collect::<std::result::Result<Vec<_>, _>>()
792 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
793 let df = Self::arrow_record_batches_to_dataframe(&batches)?;
794 let lf = df.lazy();
795 let mut state = Self::new(
796 lf,
797 pages_lookahead,
798 pages_lookback,
799 max_buffered_rows,
800 max_buffered_mb,
801 true,
802 )?;
803 state.row_numbers = row_numbers;
804 state.row_start_index = row_start_index;
805 Ok(state)
806 }
807
808 pub fn from_orc_paths(
810 paths: &[impl AsRef<Path>],
811 pages_lookahead: Option<usize>,
812 pages_lookback: Option<usize>,
813 max_buffered_rows: Option<usize>,
814 max_buffered_mb: Option<usize>,
815 row_numbers: bool,
816 row_start_index: usize,
817 ) -> Result<Self> {
818 if paths.is_empty() {
819 return Err(color_eyre::eyre::eyre!("No paths provided"));
820 }
821 if paths.len() == 1 {
822 return Self::from_orc(
823 paths[0].as_ref(),
824 pages_lookahead,
825 pages_lookback,
826 max_buffered_rows,
827 max_buffered_mb,
828 row_numbers,
829 row_start_index,
830 );
831 }
832 let mut lazy_frames = Vec::with_capacity(paths.len());
833 for p in paths {
834 let file = File::open(p.as_ref())?;
835 let reader = ArrowReaderBuilder::try_new(file)
836 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
837 .build();
838 let batches: Vec<RecordBatch> = reader
839 .collect::<std::result::Result<Vec<_>, _>>()
840 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
841 let df = Self::arrow_record_batches_to_dataframe(&batches)?;
842 lazy_frames.push(df.lazy());
843 }
844 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
845 let mut state = Self::new(
846 lf,
847 pages_lookahead,
848 pages_lookback,
849 max_buffered_rows,
850 max_buffered_mb,
851 true,
852 )?;
853 state.row_numbers = row_numbers;
854 state.row_start_index = row_start_index;
855 Ok(state)
856 }
857
858 fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
861 if batches.is_empty() {
862 return Ok(DataFrame::new(vec![])?);
863 }
864 let mut all_dfs = Vec::with_capacity(batches.len());
865 for batch in batches {
866 let n_cols = batch.num_columns();
867 let schema = batch.schema();
868 let mut series_vec = Vec::with_capacity(n_cols);
869 for (i, col) in batch.columns().iter().enumerate() {
870 let name = schema.field(i).name().as_str();
871 let s = Self::arrow_array_to_polars_series(name, col)?;
872 series_vec.push(s.into());
873 }
874 let df = DataFrame::new(series_vec)?;
875 all_dfs.push(df);
876 }
877 let mut out = all_dfs.remove(0);
878 for df in all_dfs {
879 out = out.vstack(&df)?;
880 }
881 Ok(out)
882 }
883
884 fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
885 use arrow::datatypes::DataType as ArrowDataType;
886 let len = array.len();
887 match array.data_type() {
888 ArrowDataType::Int8 => {
889 let a = array
890 .as_primitive_opt::<Int8Type>()
891 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
892 let v: Vec<Option<i8>> = (0..len)
893 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
894 .collect();
895 Ok(Series::new(name.into(), v))
896 }
897 ArrowDataType::Int16 => {
898 let a = array
899 .as_primitive_opt::<Int16Type>()
900 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
901 let v: Vec<Option<i16>> = (0..len)
902 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
903 .collect();
904 Ok(Series::new(name.into(), v))
905 }
906 ArrowDataType::Int32 => {
907 let a = array
908 .as_primitive_opt::<Int32Type>()
909 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
910 let v: Vec<Option<i32>> = (0..len)
911 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
912 .collect();
913 Ok(Series::new(name.into(), v))
914 }
915 ArrowDataType::Int64 => {
916 let a = array
917 .as_primitive_opt::<Int64Type>()
918 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
919 let v: Vec<Option<i64>> = (0..len)
920 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
921 .collect();
922 Ok(Series::new(name.into(), v))
923 }
924 ArrowDataType::UInt8 => {
925 let a = array
926 .as_primitive_opt::<UInt8Type>()
927 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
928 let v: Vec<Option<i64>> = (0..len)
929 .map(|i| {
930 if a.is_null(i) {
931 None
932 } else {
933 Some(a.value(i) as i64)
934 }
935 })
936 .collect();
937 Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
938 }
939 ArrowDataType::UInt16 => {
940 let a = array
941 .as_primitive_opt::<UInt16Type>()
942 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
943 let v: Vec<Option<i64>> = (0..len)
944 .map(|i| {
945 if a.is_null(i) {
946 None
947 } else {
948 Some(a.value(i) as i64)
949 }
950 })
951 .collect();
952 Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
953 }
954 ArrowDataType::UInt32 => {
955 let a = array
956 .as_primitive_opt::<UInt32Type>()
957 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
958 let v: Vec<Option<u32>> = (0..len)
959 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
960 .collect();
961 Ok(Series::new(name.into(), v))
962 }
963 ArrowDataType::UInt64 => {
964 let a = array
965 .as_primitive_opt::<UInt64Type>()
966 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
967 let v: Vec<Option<u64>> = (0..len)
968 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
969 .collect();
970 Ok(Series::new(name.into(), v))
971 }
972 ArrowDataType::Float32 => {
973 let a = array
974 .as_primitive_opt::<Float32Type>()
975 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
976 let v: Vec<Option<f32>> = (0..len)
977 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
978 .collect();
979 Ok(Series::new(name.into(), v))
980 }
981 ArrowDataType::Float64 => {
982 let a = array
983 .as_primitive_opt::<Float64Type>()
984 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
985 let v: Vec<Option<f64>> = (0..len)
986 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
987 .collect();
988 Ok(Series::new(name.into(), v))
989 }
990 ArrowDataType::Boolean => {
991 let a = array
992 .as_boolean_opt()
993 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
994 let v: Vec<Option<bool>> = (0..len)
995 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
996 .collect();
997 Ok(Series::new(name.into(), v))
998 }
999 ArrowDataType::Utf8 => {
1000 let a = array
1001 .as_string_opt::<i32>()
1002 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
1003 let v: Vec<Option<String>> = (0..len)
1004 .map(|i| {
1005 if a.is_null(i) {
1006 None
1007 } else {
1008 Some(a.value(i).to_string())
1009 }
1010 })
1011 .collect();
1012 Ok(Series::new(name.into(), v))
1013 }
1014 ArrowDataType::LargeUtf8 => {
1015 let a = array
1016 .as_string_opt::<i64>()
1017 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
1018 let v: Vec<Option<String>> = (0..len)
1019 .map(|i| {
1020 if a.is_null(i) {
1021 None
1022 } else {
1023 Some(a.value(i).to_string())
1024 }
1025 })
1026 .collect();
1027 Ok(Series::new(name.into(), v))
1028 }
1029 ArrowDataType::Date32 => {
1030 let a = array
1031 .as_primitive_opt::<Date32Type>()
1032 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
1033 let v: Vec<Option<i32>> = (0..len)
1034 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1035 .collect();
1036 Ok(Series::new(name.into(), v))
1037 }
1038 ArrowDataType::Date64 => {
1039 let a = array
1040 .as_primitive_opt::<Date64Type>()
1041 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
1042 let v: Vec<Option<i64>> = (0..len)
1043 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1044 .collect();
1045 Ok(Series::new(name.into(), v))
1046 }
1047 ArrowDataType::Timestamp(_, _) => {
1048 let a = array
1049 .as_primitive_opt::<TimestampMillisecondType>()
1050 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
1051 let v: Vec<Option<i64>> = (0..len)
1052 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1053 .collect();
1054 Ok(Series::new(name.into(), v))
1055 }
1056 other => Err(color_eyre::eyre::eyre!(
1057 "ORC: unsupported column type {:?} for column '{}'",
1058 other,
1059 name
1060 )),
1061 }
1062 }
1063
1064 pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1067 let path_str = path.as_os_str().to_string_lossy();
1068 let is_glob = path_str.contains('*');
1069 let pl_path = PlPath::Local(Arc::from(path));
1070 let args = ScanArgsParquet {
1071 hive_options: HiveOptions::new_enabled(),
1072 glob: is_glob,
1073 ..Default::default()
1074 };
1075 LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1076 }
1077
1078 pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1080 let path_str = path.as_os_str().to_string_lossy();
1081 let is_glob = path_str.contains('*');
1082 let pl_path = PlPath::Local(Arc::from(path));
1083 let args = ScanArgsParquet {
1084 schema: Some(schema),
1085 hive_options: HiveOptions::new_enabled(),
1086 glob: is_glob,
1087 ..Default::default()
1088 };
1089 LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1090 }
1091
1092 fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
1095 const MAX_DEPTH: usize = 64;
1096 Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
1097 }
1098
1099 fn first_parquet_file_spine(
1100 path: &Path,
1101 depth: usize,
1102 max_depth: usize,
1103 ) -> Option<std::path::PathBuf> {
1104 if depth >= max_depth {
1105 return None;
1106 }
1107 let entries = fs::read_dir(path).ok()?;
1108 let mut first_partition_child: Option<std::path::PathBuf> = None;
1109 for entry in entries.flatten() {
1110 let child = entry.path();
1111 if child.is_file() {
1112 if child
1113 .extension()
1114 .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
1115 {
1116 return Some(child);
1117 }
1118 } else if child.is_dir() {
1119 if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1120 if name.contains('=') && first_partition_child.is_none() {
1121 first_partition_child = Some(child);
1122 }
1123 }
1124 }
1125 }
1126 first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
1127 }
1128
1129 fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1131 let file = File::open(path)?;
1132 let mut reader = ParquetReader::new(file);
1133 let arrow_schema = reader.schema()?;
1134 let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1135 Ok(Arc::new(schema))
1136 }
1137
1138 pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1142 let partition_columns = Self::discover_hive_partition_columns(path);
1143 let one_file = Self::first_parquet_file_in_hive_dir(path)
1144 .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1145 let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1146 let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1147 let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1148 for name in &partition_columns {
1149 merged.with_column(name.clone().into(), DataType::String);
1150 }
1151 for (name, dtype) in file_schema.iter() {
1152 if !part_set.contains(name.as_str()) {
1153 merged.with_column(name.clone(), dtype.clone());
1154 }
1155 }
1156 Ok((Arc::new(merged), partition_columns))
1157 }
1158
1159 pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1161 if path.is_dir() {
1162 Self::discover_partition_columns_from_path(path)
1163 } else {
1164 Self::discover_partition_columns_from_glob_pattern(path)
1165 }
1166 }
1167
1168 fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1173 const MAX_PARTITION_DEPTH: usize = 64;
1174 let mut columns = Vec::<String>::new();
1175 let mut seen = HashSet::<String>::new();
1176 Self::discover_partition_columns_spine(
1177 path,
1178 &mut columns,
1179 &mut seen,
1180 0,
1181 MAX_PARTITION_DEPTH,
1182 );
1183 columns
1184 }
1185
1186 fn discover_partition_columns_spine(
1190 path: &Path,
1191 columns: &mut Vec<String>,
1192 seen: &mut HashSet<String>,
1193 depth: usize,
1194 max_depth: usize,
1195 ) {
1196 if depth >= max_depth {
1197 return;
1198 }
1199 let Ok(entries) = fs::read_dir(path) else {
1200 return;
1201 };
1202 let mut first_partition_child: Option<std::path::PathBuf> = None;
1203 for entry in entries.flatten() {
1204 let child = entry.path();
1205 if child.is_dir() {
1206 if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1207 if let Some((key, _)) = name.split_once('=') {
1208 if !key.is_empty() && seen.insert(key.to_string()) {
1209 columns.push(key.to_string());
1210 }
1211 if first_partition_child.is_none() {
1212 first_partition_child = Some(child);
1213 }
1214 break;
1215 }
1216 }
1217 }
1218 }
1219 if let Some(one) = first_partition_child {
1220 Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1221 }
1222 }
1223
1224 fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1226 let path_str = path.as_os_str().to_string_lossy();
1227 let mut columns = Vec::<String>::new();
1228 let mut seen = HashSet::<String>::new();
1229 for segment in path_str.split('/') {
1230 if let Some((key, rest)) = segment.split_once('=') {
1231 if !key.is_empty()
1232 && (rest == "*" || !rest.contains('*'))
1233 && seen.insert(key.to_string())
1234 {
1235 columns.push(key.to_string());
1236 }
1237 }
1238 }
1239 columns
1240 }
1241
    /// Open a hive-partitioned parquet dataset (directory or glob) as a
    /// `DataTableState`, moving the discovered partition columns to the front
    /// of the column order and recording them on the state.
    pub fn from_parquet_hive(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        let path_str = path.as_os_str().to_string_lossy();
        // Only enable glob expansion when the path actually contains a wildcard.
        let is_glob = path_str.contains('*');
        let pl_path = PlPath::Local(Arc::from(path));
        let args = ScanArgsParquet {
            hive_options: HiveOptions::new_enabled(),
            glob: is_glob,
            ..Default::default()
        };
        let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
        let schema = lf.collect_schema()?;

        // Partition keys come either from the directory tree or from the
        // glob pattern's `key=value` segments.
        let mut discovered = if path.is_dir() {
            Self::discover_partition_columns_from_path(path)
        } else {
            Self::discover_partition_columns_from_glob_pattern(path)
        };

        // Fallback: a glob may not expose any `key=value` segment itself, so
        // walk up to the nearest existing directory and inspect that instead.
        if discovered.is_empty() {
            let mut dir = path;
            while !dir.is_dir() {
                match dir.parent() {
                    Some(p) => dir = p,
                    None => break,
                }
            }
            if dir.is_dir() {
                discovered = Self::discover_partition_columns_from_path(dir);
            }
        }

        // Keep only keys that the scan actually materialized as columns.
        let partition_columns: Vec<String> = discovered
            .into_iter()
            .filter(|c| schema.contains(c.as_str()))
            .collect();

        // Display order: partition columns first, then the remaining columns
        // in schema order.
        let new_order: Vec<String> = if partition_columns.is_empty() {
            schema.iter_names().map(|s| s.to_string()).collect()
        } else {
            let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
            let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
            let rest: Vec<String> = all_names
                .into_iter()
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            partition_columns.iter().cloned().chain(rest).collect()
        };

        // Apply the reordering to the lazy frame only when it changes anything.
        if !partition_columns.is_empty() {
            let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
            lf = lf.select(exprs);
        }

        let mut state = Self::new(
            lf,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        state.partition_columns = if partition_columns.is_empty() {
            None
        } else {
            Some(partition_columns)
        };
        state.set_column_order(new_order);
        Ok(state)
    }
1332
    /// Enable or disable the row-number display.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }
1336
    /// Flip the row-number display on or off.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }
1340
    /// The index shown for the first data row when row numbers are displayed.
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1345
1346 fn decompress_compressed_csv_to_temp(
1348 path: &Path,
1349 compression: CompressionFormat,
1350 temp_dir: &Path,
1351 ) -> Result<NamedTempFile> {
1352 let mut temp = NamedTempFile::new_in(temp_dir)?;
1353 let out = temp.as_file_mut();
1354 let mut reader: Box<dyn Read> = match compression {
1355 CompressionFormat::Gzip => {
1356 let f = File::open(path)?;
1357 Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1358 }
1359 CompressionFormat::Zstd => {
1360 let f = File::open(path)?;
1361 Box::new(zstd::Decoder::new(BufReader::new(f))?)
1362 }
1363 CompressionFormat::Bzip2 => {
1364 let f = File::open(path)?;
1365 Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1366 }
1367 CompressionFormat::Xz => {
1368 let f = File::open(path)?;
1369 Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1370 }
1371 };
1372 std::io::copy(&mut reader, out)?;
1373 out.sync_all()?;
1374 Ok(temp)
1375 }
1376
1377 pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1378 let compression = options
1380 .compression
1381 .or_else(|| CompressionFormat::from_extension(path));
1382
1383 if let Some(compression) = compression {
1384 if options.decompress_in_memory {
1385 match compression {
1387 CompressionFormat::Gzip | CompressionFormat::Zstd => {
1388 let mut read_options = CsvReadOptions::default();
1389 if let Some(skip_lines) = options.skip_lines {
1390 read_options.skip_lines = skip_lines;
1391 }
1392 if let Some(skip_rows) = options.skip_rows {
1393 read_options.skip_rows = skip_rows;
1394 }
1395 if let Some(has_header) = options.has_header {
1396 read_options.has_header = has_header;
1397 }
1398 read_options = read_options.map_parse_options(|opts| {
1399 opts.with_try_parse_dates(options.parse_dates)
1400 });
1401 let df = read_options
1402 .try_into_reader_with_file_path(Some(path.into()))?
1403 .finish()?;
1404 let lf = df.lazy();
1405 let mut state = Self::new(
1406 lf,
1407 options.pages_lookahead,
1408 options.pages_lookback,
1409 options.max_buffered_rows,
1410 options.max_buffered_mb,
1411 options.polars_streaming,
1412 )?;
1413 state.row_numbers = options.row_numbers;
1414 state.row_start_index = options.row_start_index;
1415 Ok(state)
1416 }
1417 CompressionFormat::Bzip2 => {
1418 let file = File::open(path)?;
1419 let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1420 let mut decompressed = Vec::new();
1421 decoder.read_to_end(&mut decompressed)?;
1422 let mut read_options = CsvReadOptions::default();
1423 if let Some(skip_lines) = options.skip_lines {
1424 read_options.skip_lines = skip_lines;
1425 }
1426 if let Some(skip_rows) = options.skip_rows {
1427 read_options.skip_rows = skip_rows;
1428 }
1429 if let Some(has_header) = options.has_header {
1430 read_options.has_header = has_header;
1431 }
1432 read_options = read_options.map_parse_options(|opts| {
1433 opts.with_try_parse_dates(options.parse_dates)
1434 });
1435 let df = CsvReader::new(std::io::Cursor::new(decompressed))
1436 .with_options(read_options)
1437 .finish()?;
1438 let lf = df.lazy();
1439 let mut state = Self::new(
1440 lf,
1441 options.pages_lookahead,
1442 options.pages_lookback,
1443 options.max_buffered_rows,
1444 options.max_buffered_mb,
1445 options.polars_streaming,
1446 )?;
1447 state.row_numbers = options.row_numbers;
1448 state.row_start_index = options.row_start_index;
1449 Ok(state)
1450 }
1451 CompressionFormat::Xz => {
1452 let file = File::open(path)?;
1453 let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1454 let mut decompressed = Vec::new();
1455 decoder.read_to_end(&mut decompressed)?;
1456 let mut read_options = CsvReadOptions::default();
1457 if let Some(skip_lines) = options.skip_lines {
1458 read_options.skip_lines = skip_lines;
1459 }
1460 if let Some(skip_rows) = options.skip_rows {
1461 read_options.skip_rows = skip_rows;
1462 }
1463 if let Some(has_header) = options.has_header {
1464 read_options.has_header = has_header;
1465 }
1466 read_options = read_options.map_parse_options(|opts| {
1467 opts.with_try_parse_dates(options.parse_dates)
1468 });
1469 let df = CsvReader::new(std::io::Cursor::new(decompressed))
1470 .with_options(read_options)
1471 .finish()?;
1472 let lf = df.lazy();
1473 let mut state = Self::new(
1474 lf,
1475 options.pages_lookahead,
1476 options.pages_lookback,
1477 options.max_buffered_rows,
1478 options.max_buffered_mb,
1479 options.polars_streaming,
1480 )?;
1481 state.row_numbers = options.row_numbers;
1482 state.row_start_index = options.row_start_index;
1483 Ok(state)
1484 }
1485 }
1486 } else {
1487 let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1489 let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1490 let mut state = Self::from_csv_customize(
1491 temp.path(),
1492 options.pages_lookahead,
1493 options.pages_lookback,
1494 options.max_buffered_rows,
1495 options.max_buffered_mb,
1496 |mut reader| {
1497 if let Some(skip_lines) = options.skip_lines {
1498 reader = reader.with_skip_lines(skip_lines);
1499 }
1500 if let Some(skip_rows) = options.skip_rows {
1501 reader = reader.with_skip_rows(skip_rows);
1502 }
1503 if let Some(has_header) = options.has_header {
1504 reader = reader.with_has_header(has_header);
1505 }
1506 reader = reader.with_try_parse_dates(options.parse_dates);
1507 reader
1508 },
1509 )?;
1510 state.row_numbers = options.row_numbers;
1511 state.row_start_index = options.row_start_index;
1512 state.decompress_temp_file = Some(temp);
1513 Ok(state)
1514 }
1515 } else {
1516 let mut state = Self::from_csv_customize(
1518 path,
1519 options.pages_lookahead,
1520 options.pages_lookback,
1521 options.max_buffered_rows,
1522 options.max_buffered_mb,
1523 |mut reader| {
1524 if let Some(skip_lines) = options.skip_lines {
1525 reader = reader.with_skip_lines(skip_lines);
1526 }
1527 if let Some(skip_rows) = options.skip_rows {
1528 reader = reader.with_skip_rows(skip_rows);
1529 }
1530 if let Some(has_header) = options.has_header {
1531 reader = reader.with_has_header(has_header);
1532 }
1533 reader = reader.with_try_parse_dates(options.parse_dates);
1534 reader
1535 },
1536 )?;
1537 state.row_numbers = options.row_numbers;
1538 Ok(state)
1539 }
1540 }
1541
1542 pub fn from_csv_customize<F>(
1543 path: &Path,
1544 pages_lookahead: Option<usize>,
1545 pages_lookback: Option<usize>,
1546 max_buffered_rows: Option<usize>,
1547 max_buffered_mb: Option<usize>,
1548 func: F,
1549 ) -> Result<Self>
1550 where
1551 F: FnOnce(LazyCsvReader) -> LazyCsvReader,
1552 {
1553 let pl_path = PlPath::Local(Arc::from(path));
1554 let reader = LazyCsvReader::new(pl_path);
1555 let lf = func(reader).finish()?;
1556 Self::new(
1557 lf,
1558 pages_lookahead,
1559 pages_lookback,
1560 max_buffered_rows,
1561 max_buffered_mb,
1562 true,
1563 )
1564 }
1565
1566 pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
1568 if paths.is_empty() {
1569 return Err(color_eyre::eyre::eyre!("No paths provided"));
1570 }
1571 if paths.len() == 1 {
1572 return Self::from_csv(paths[0].as_ref(), options);
1573 }
1574 let mut lazy_frames = Vec::with_capacity(paths.len());
1575 for p in paths {
1576 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1577 let mut reader = LazyCsvReader::new(pl_path);
1578 if let Some(skip_lines) = options.skip_lines {
1579 reader = reader.with_skip_lines(skip_lines);
1580 }
1581 if let Some(skip_rows) = options.skip_rows {
1582 reader = reader.with_skip_rows(skip_rows);
1583 }
1584 if let Some(has_header) = options.has_header {
1585 reader = reader.with_has_header(has_header);
1586 }
1587 reader = reader.with_try_parse_dates(options.parse_dates);
1588 let lf = reader.finish()?;
1589 lazy_frames.push(lf);
1590 }
1591 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1592 let mut state = Self::new(
1593 lf,
1594 options.pages_lookahead,
1595 options.pages_lookback,
1596 options.max_buffered_rows,
1597 options.max_buffered_mb,
1598 options.polars_streaming,
1599 )?;
1600 state.row_numbers = options.row_numbers;
1601 state.row_start_index = options.row_start_index;
1602 Ok(state)
1603 }
1604
1605 pub fn from_ndjson(
1606 path: &Path,
1607 pages_lookahead: Option<usize>,
1608 pages_lookback: Option<usize>,
1609 max_buffered_rows: Option<usize>,
1610 max_buffered_mb: Option<usize>,
1611 row_numbers: bool,
1612 row_start_index: usize,
1613 ) -> Result<Self> {
1614 let pl_path = PlPath::Local(Arc::from(path));
1615 let lf = LazyJsonLineReader::new(pl_path).finish()?;
1616 let mut state = Self::new(
1617 lf,
1618 pages_lookahead,
1619 pages_lookback,
1620 max_buffered_rows,
1621 max_buffered_mb,
1622 true,
1623 )?;
1624 state.row_numbers = row_numbers;
1625 state.row_start_index = row_start_index;
1626 Ok(state)
1627 }
1628
1629 pub fn from_ndjson_paths(
1631 paths: &[impl AsRef<Path>],
1632 pages_lookahead: Option<usize>,
1633 pages_lookback: Option<usize>,
1634 max_buffered_rows: Option<usize>,
1635 max_buffered_mb: Option<usize>,
1636 row_numbers: bool,
1637 row_start_index: usize,
1638 ) -> Result<Self> {
1639 if paths.is_empty() {
1640 return Err(color_eyre::eyre::eyre!("No paths provided"));
1641 }
1642 if paths.len() == 1 {
1643 return Self::from_ndjson(
1644 paths[0].as_ref(),
1645 pages_lookahead,
1646 pages_lookback,
1647 max_buffered_rows,
1648 max_buffered_mb,
1649 row_numbers,
1650 row_start_index,
1651 );
1652 }
1653 let mut lazy_frames = Vec::with_capacity(paths.len());
1654 for p in paths {
1655 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1656 let lf = LazyJsonLineReader::new(pl_path).finish()?;
1657 lazy_frames.push(lf);
1658 }
1659 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1660 let mut state = Self::new(
1661 lf,
1662 pages_lookahead,
1663 pages_lookback,
1664 max_buffered_rows,
1665 max_buffered_mb,
1666 true,
1667 )?;
1668 state.row_numbers = row_numbers;
1669 state.row_start_index = row_start_index;
1670 Ok(state)
1671 }
1672
    /// Open a whole-file JSON document as a table. Thin wrapper around
    /// `from_json_with_format` with `JsonFormat::Json`.
    pub fn from_json(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1693
    /// Open a JSON-lines file as a table. Thin wrapper around
    /// `from_json_with_format` with `JsonFormat::JsonLines`.
    pub fn from_json_lines(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1714
1715 #[allow(clippy::too_many_arguments)]
1716 fn from_json_with_format(
1717 path: &Path,
1718 pages_lookahead: Option<usize>,
1719 pages_lookback: Option<usize>,
1720 max_buffered_rows: Option<usize>,
1721 max_buffered_mb: Option<usize>,
1722 row_numbers: bool,
1723 row_start_index: usize,
1724 format: JsonFormat,
1725 ) -> Result<Self> {
1726 let file = File::open(path)?;
1727 let lf = JsonReader::new(file)
1728 .with_json_format(format)
1729 .finish()?
1730 .lazy();
1731 let mut state = Self::new(
1732 lf,
1733 pages_lookahead,
1734 pages_lookback,
1735 max_buffered_rows,
1736 max_buffered_mb,
1737 true,
1738 )?;
1739 state.row_numbers = row_numbers;
1740 state.row_start_index = row_start_index;
1741 Ok(state)
1742 }
1743
1744 pub fn from_json_paths(
1746 paths: &[impl AsRef<Path>],
1747 pages_lookahead: Option<usize>,
1748 pages_lookback: Option<usize>,
1749 max_buffered_rows: Option<usize>,
1750 max_buffered_mb: Option<usize>,
1751 row_numbers: bool,
1752 row_start_index: usize,
1753 ) -> Result<Self> {
1754 Self::from_json_with_format_paths(
1755 paths,
1756 pages_lookahead,
1757 pages_lookback,
1758 max_buffered_rows,
1759 max_buffered_mb,
1760 row_numbers,
1761 row_start_index,
1762 JsonFormat::Json,
1763 )
1764 }
1765
1766 pub fn from_json_lines_paths(
1768 paths: &[impl AsRef<Path>],
1769 pages_lookahead: Option<usize>,
1770 pages_lookback: Option<usize>,
1771 max_buffered_rows: Option<usize>,
1772 max_buffered_mb: Option<usize>,
1773 row_numbers: bool,
1774 row_start_index: usize,
1775 ) -> Result<Self> {
1776 Self::from_json_with_format_paths(
1777 paths,
1778 pages_lookahead,
1779 pages_lookback,
1780 max_buffered_rows,
1781 max_buffered_mb,
1782 row_numbers,
1783 row_start_index,
1784 JsonFormat::JsonLines,
1785 )
1786 }
1787
1788 #[allow(clippy::too_many_arguments)]
1789 fn from_json_with_format_paths(
1790 paths: &[impl AsRef<Path>],
1791 pages_lookahead: Option<usize>,
1792 pages_lookback: Option<usize>,
1793 max_buffered_rows: Option<usize>,
1794 max_buffered_mb: Option<usize>,
1795 row_numbers: bool,
1796 row_start_index: usize,
1797 format: JsonFormat,
1798 ) -> Result<Self> {
1799 if paths.is_empty() {
1800 return Err(color_eyre::eyre::eyre!("No paths provided"));
1801 }
1802 if paths.len() == 1 {
1803 return Self::from_json_with_format(
1804 paths[0].as_ref(),
1805 pages_lookahead,
1806 pages_lookback,
1807 max_buffered_rows,
1808 max_buffered_mb,
1809 row_numbers,
1810 row_start_index,
1811 format,
1812 );
1813 }
1814 let mut lazy_frames = Vec::with_capacity(paths.len());
1815 for p in paths {
1816 let file = File::open(p.as_ref())?;
1817 let lf = match &format {
1818 JsonFormat::Json => JsonReader::new(file)
1819 .with_json_format(JsonFormat::Json)
1820 .finish()?
1821 .lazy(),
1822 JsonFormat::JsonLines => JsonReader::new(file)
1823 .with_json_format(JsonFormat::JsonLines)
1824 .finish()?
1825 .lazy(),
1826 };
1827 lazy_frames.push(lf);
1828 }
1829 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1830 let mut state = Self::new(
1831 lf,
1832 pages_lookahead,
1833 pages_lookback,
1834 max_buffered_rows,
1835 max_buffered_mb,
1836 true,
1837 )?;
1838 state.row_numbers = row_numbers;
1839 state.row_start_index = row_start_index;
1840 Ok(state)
1841 }
1842
1843 #[allow(clippy::too_many_arguments)]
1844 pub fn from_delimited(
1845 path: &Path,
1846 delimiter: u8,
1847 pages_lookahead: Option<usize>,
1848 pages_lookback: Option<usize>,
1849 max_buffered_rows: Option<usize>,
1850 max_buffered_mb: Option<usize>,
1851 row_numbers: bool,
1852 row_start_index: usize,
1853 ) -> Result<Self> {
1854 let pl_path = PlPath::Local(Arc::from(path));
1855 let reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
1856 let lf = reader.finish()?;
1857 let mut state = Self::new(
1858 lf,
1859 pages_lookahead,
1860 pages_lookback,
1861 max_buffered_rows,
1862 max_buffered_mb,
1863 true,
1864 )?;
1865 state.row_numbers = row_numbers;
1866 state.row_start_index = row_start_index;
1867 Ok(state)
1868 }
1869
    /// Returns `true` when scrolling by `rows` (positive = down) would move
    /// the view outside the buffered window and therefore force a re-collect.
    /// Mirrors the window math in `slide_table` without mutating state.
    pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
        // Scrolling up from the very top cannot move the view.
        if rows < 0 && self.start_row == 0 {
            return false;
        }
        let new_start_row = if self.start_row as i64 + rows <= 0 {
            0
        } else {
            // If the displayed frame already fits in the viewport, downward
            // scrolling is a no-op.
            if let Some(df) = self.df.as_ref() {
                if rows > 0 && df.shape().0 <= self.visible_rows {
                    return false;
                }
            }
            (self.start_row as i64 + rows) as usize
        };
        // End of the would-be view, clamped to the total row count.
        let view_end = new_start_row
            + self
                .visible_rows
                .min(self.num_rows.saturating_sub(new_start_row));
        // `buffered_end_row > 0` distinguishes "no buffer yet" from a buffer
        // that genuinely starts at row 0.
        let within_buffer = new_start_row >= self.buffered_start_row
            && view_end <= self.buffered_end_row
            && self.buffered_end_row > 0;
        !within_buffer
    }
1895
    /// Scroll the view by `rows` (positive = down), re-collecting only when
    /// the new view falls outside the buffered window.
    fn slide_table(&mut self, rows: i64) {
        // Scrolling up from the top is a no-op.
        if rows < 0 && self.start_row == 0 {
            return;
        }

        let new_start_row = if self.start_row as i64 + rows <= 0 {
            0
        } else {
            // If the displayed frame fits entirely in the viewport, downward
            // scrolling cannot move the view.
            if let Some(df) = self.df.as_ref() {
                if rows > 0 && df.shape().0 <= self.visible_rows {
                    return;
                }
            }
            (self.start_row as i64 + rows) as usize
        };

        // End of the would-be view, clamped to the total row count.
        let view_end = new_start_row
            + self
                .visible_rows
                .min(self.num_rows.saturating_sub(new_start_row));
        // `buffered_end_row > 0` guards against an uninitialized buffer.
        let within_buffer = new_start_row >= self.buffered_start_row
            && view_end <= self.buffered_end_row
            && self.buffered_end_row > 0;

        if within_buffer {
            // Cheap path: the buffer already covers the new view.
            self.start_row = new_start_row;
            return;
        }

        self.start_row = new_start_row;
        self.collect();
    }
1929
    /// Ensure the display dataframes cover the current view, recounting rows
    /// and (re)loading the row buffer as needed.
    ///
    /// Buffering strategy: keep `pages_lookback`/`pages_lookahead` pages of
    /// rows around the view, expand when the view nears a buffer edge, and
    /// rebuild the buffer on large jumps or on first load.
    pub fn collect(&mut self) {
        if self.visible_rows > 0 {
            self.proximity_threshold = self.visible_rows;
        }

        // Recount total rows lazily; a failed count degrades to 0 rows.
        if !self.num_rows_valid {
            self.num_rows =
                match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
                    Ok(df) => {
                        if let Some(col) = df.get(0) {
                            if let Some(AnyValue::UInt32(len)) = col.first() {
                                *len as usize
                            } else {
                                0
                            }
                        } else {
                            0
                        }
                    }
                    Err(_) => 0,
                };
            self.num_rows_valid = true;
        }

        if self.num_rows > 0 {
            // Clamp the view start to the last row.
            let max_start = self.num_rows.saturating_sub(1);
            if self.start_row > max_start {
                self.start_row = max_start;
            }
        } else {
            // Empty frame: drop every buffer and display slice.
            self.start_row = 0;
            self.buffered_start_row = 0;
            self.buffered_end_row = 0;
            self.buffered_df = None;
            self.df = None;
            self.locked_df = None;
            return;
        }

        let view_start = self.start_row;
        let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);

        // `buffered_end_row > 0` distinguishes "no buffer yet" from a buffer
        // starting at row 0.
        let within_buffer = view_start >= self.buffered_start_row
            && view_end <= self.buffered_end_row
            && self.buffered_end_row > 0;

        let page_rows = self.visible_rows.max(1);

        if within_buffer {
            let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
            let dist_to_end = self.buffered_end_row.saturating_sub(view_end);

            // Grow the buffer when the view gets close to either edge.
            let needs_expansion_back =
                dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
            let needs_expansion_forward =
                dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;

            if !needs_expansion_back && !needs_expansion_forward {
                // Buffer covers the view with margin: re-slice it, unless its
                // height no longer matches the bookkeeping (then reload the
                // same window from the lazy frame).
                let expected_len = self
                    .buffered_end_row
                    .saturating_sub(self.buffered_start_row);
                if self
                    .buffered_df
                    .as_ref()
                    .is_some_and(|b| b.height() == expected_len)
                {
                    self.slice_buffer_into_display();
                    if self.table_state.selected().is_none() {
                        self.table_state.select(Some(0));
                    }
                    return;
                }
                self.load_buffer(self.buffered_start_row, self.buffered_end_row);
                if self.table_state.selected().is_none() {
                    self.table_state.select(Some(0));
                }
                return;
            }

            let mut new_buffer_start = if needs_expansion_back {
                view_start.saturating_sub(self.pages_lookback * page_rows)
            } else {
                self.buffered_start_row
            };

            let mut new_buffer_end = if needs_expansion_forward {
                (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
            } else {
                self.buffered_end_row
            };

            self.clamp_buffer_to_max_size(
                view_start,
                view_end,
                &mut new_buffer_start,
                &mut new_buffer_end,
            );
            self.load_buffer(new_buffer_start, new_buffer_end);
        } else {
            let mut new_buffer_start;
            let mut new_buffer_end;

            let had_buffer = self.buffered_end_row > 0;
            let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
            let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;

            // A jump close enough to the old buffer keeps one of its edges.
            let extend_forward_ok = scrolled_past_end
                && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
            let extend_backward_ok = scrolled_past_start
                && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;

            if extend_forward_ok {
                new_buffer_start = self.buffered_start_row;
                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
            } else if extend_backward_ok {
                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
                new_buffer_end = self.buffered_end_row;
            } else if scrolled_past_end || scrolled_past_start {
                // Far jump: rebuild around the view, padding the window out to
                // at least one full lookback+view+lookahead span.
                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
                if current_len < min_initial_len {
                    // Prefer extending forward, then backward, then both.
                    let need = min_initial_len.saturating_sub(current_len);
                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
                    let can_extend_start = new_buffer_start;
                    if can_extend_end >= need {
                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
                    } else if can_extend_start >= need {
                        new_buffer_start = new_buffer_start.saturating_sub(need);
                    } else {
                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
                        new_buffer_start =
                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
                    }
                }
            } else {
                // First load (no previous buffer): same padding logic as the
                // far-jump case above.
                new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
                new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);

                let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
                let current_len = new_buffer_end.saturating_sub(new_buffer_start);
                if current_len < min_initial_len {
                    let need = min_initial_len.saturating_sub(current_len);
                    let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
                    let can_extend_start = new_buffer_start;
                    if can_extend_end >= need {
                        new_buffer_end = (new_buffer_end + need).min(self.num_rows);
                    } else if can_extend_start >= need {
                        new_buffer_start = new_buffer_start.saturating_sub(need);
                    } else {
                        new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
                        new_buffer_start =
                            new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
                    }
                }
            }

            self.clamp_buffer_to_max_size(
                view_start,
                view_end,
                &mut new_buffer_start,
                &mut new_buffer_end,
            );
            self.load_buffer(new_buffer_start, new_buffer_end);
        }

        self.slice_from_buffer();
        if self.table_state.selected().is_none() {
            self.table_state.select(Some(0));
        }
    }
2118
    /// Mark the cached total row count stale; `collect` recounts on next call.
    fn invalidate_num_rows(&mut self) {
        self.num_rows_valid = false;
    }
2123
2124 pub fn num_rows_if_valid(&self) -> Option<usize> {
2127 if self.num_rows_valid {
2128 Some(self.num_rows)
2129 } else {
2130 None
2131 }
2132 }
2133
2134 fn clamp_buffer_to_max_size(
2136 &self,
2137 view_start: usize,
2138 view_end: usize,
2139 buffer_start: &mut usize,
2140 buffer_end: &mut usize,
2141 ) {
2142 if self.max_buffered_rows == 0 {
2143 return;
2144 }
2145 let max_len = self.max_buffered_rows;
2146 let requested_len = buffer_end.saturating_sub(*buffer_start);
2147 if requested_len <= max_len {
2148 return;
2149 }
2150 let view_len = view_end.saturating_sub(view_start);
2151 if view_len >= max_len {
2152 *buffer_start = view_start;
2153 *buffer_end = (view_start + max_len).min(self.num_rows);
2154 } else {
2155 let half = (max_len - view_len) / 2;
2156 *buffer_end = (view_end + half).min(self.num_rows);
2157 *buffer_start = (*buffer_end).saturating_sub(max_len);
2158 if *buffer_start > view_start {
2159 *buffer_start = view_start;
2160 }
2161 *buffer_end = (*buffer_start + max_len).min(self.num_rows);
2162 }
2163 }
2164
    /// Collect rows `[buffer_start, buffer_end)` from the lazy frame into the
    /// row buffer, then split it into the locked (pinned-left) and scrollable
    /// display dataframes. Failures are stored in `self.error` rather than
    /// returned.
    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
        let buffer_size = buffer_end.saturating_sub(buffer_start);
        if buffer_size == 0 {
            return;
        }

        // Select in display order so the name-based splits below line up.
        let all_columns: Vec<_> = self
            .column_order
            .iter()
            .map(|name| col(name.as_str()))
            .collect();

        let mut full_df = match collect_lazy(
            self.lf
                .clone()
                .select(all_columns)
                .slice(buffer_start as i64, buffer_size as u32),
            self.polars_streaming,
        ) {
            Ok(df) => df,
            Err(e) => {
                self.error = Some(e);
                return;
            }
        };

        // Optional memory cap: estimate bytes/row and truncate the collected
        // buffer to fit `max_buffered_mb`, adjusting the recorded end row.
        let mut effective_buffer_end = buffer_end;
        if self.max_buffered_mb > 0 {
            let size = full_df.estimated_size();
            let max_bytes = self.max_buffered_mb * 1024 * 1024;
            if size > max_bytes {
                let rows = full_df.height();
                if rows > 0 {
                    let bytes_per_row = size / rows;
                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
                    if max_rows < rows {
                        full_df = full_df.slice(0, max_rows);
                        effective_buffer_end = buffer_start + max_rows;
                    }
                }
            }
        }

        // Locked pane: the first `locked_columns_count` names in order.
        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            let locked_df = match full_df.select(locked_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.locked_df = if self.is_grouped() {
                // Grouped views reformat list columns for display.
                match self.format_grouped_dataframe(locked_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(locked_df)
            };
        } else {
            self.locked_df = None;
        }

        // Scrollable pane: everything after the locked block plus the
        // horizontal scroll offset (`termcol_index`).
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else {
            let scroll_df = match full_df.select(scroll_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.df = if self.is_grouped() {
                match self.format_grouped_dataframe(scroll_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(scroll_df)
            };
        }
        // Reaching here means the load succeeded; clear any stale error.
        if self.error.is_some() {
            self.error = None;
        }
        self.buffered_start_row = buffer_start;
        self.buffered_end_row = effective_buffer_end;
        self.buffered_df = Some(full_df);
    }
2276
2277 fn slice_buffer_into_display(&mut self) {
2279 let full_df = match self.buffered_df.as_ref() {
2280 Some(df) => df,
2281 None => return,
2282 };
2283
2284 if self.locked_columns_count > 0 {
2285 let locked_names: Vec<&str> = self
2286 .column_order
2287 .iter()
2288 .take(self.locked_columns_count)
2289 .map(|s| s.as_str())
2290 .collect();
2291 if let Ok(locked_df) = full_df.select(locked_names) {
2292 self.locked_df = if self.is_grouped() {
2293 self.format_grouped_dataframe(locked_df).ok()
2294 } else {
2295 Some(locked_df)
2296 };
2297 }
2298 } else {
2299 self.locked_df = None;
2300 }
2301
2302 let scroll_names: Vec<&str> = self
2303 .column_order
2304 .iter()
2305 .skip(self.locked_columns_count + self.termcol_index)
2306 .map(|s| s.as_str())
2307 .collect();
2308 if scroll_names.is_empty() {
2309 self.df = None;
2310 } else if let Ok(scroll_df) = full_df.select(scroll_names) {
2311 self.df = if self.is_grouped() {
2312 self.format_grouped_dataframe(scroll_df).ok()
2313 } else {
2314 Some(scroll_df)
2315 };
2316 }
2317 }
2318
    // NOTE(review): intentionally empty? Buffered slicing is handled by
    // `slice_buffer_into_display`; confirm this stub is a deliberate no-op
    // (or dead code) before relying on or removing it.
    fn slice_from_buffer(&mut self) {
    }
2325
2326 fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
2327 let schema = df.schema();
2328 let mut new_series = Vec::new();
2329
2330 for (col_name, dtype) in schema.iter() {
2331 let col = df.column(col_name)?;
2332 if matches!(dtype, DataType::List(_)) {
2333 let string_series: Series = col
2334 .list()?
2335 .into_iter()
2336 .map(|opt_list| {
2337 opt_list.map(|list_series| {
2338 let values: Vec<String> = list_series
2339 .iter()
2340 .take(10)
2341 .map(|v| v.str_value().to_string())
2342 .collect();
2343 if list_series.len() > 10 {
2344 format!("[{}...] ({} items)", values.join(", "), list_series.len())
2345 } else {
2346 format!("[{}]", values.join(", "))
2347 }
2348 })
2349 })
2350 .collect();
2351 new_series.push(string_series.with_name(col_name.as_str().into()).into());
2352 } else {
2353 new_series.push(col.clone());
2354 }
2355 }
2356
2357 Ok(DataFrame::new(new_series)?)
2358 }
2359
2360 pub fn select_next(&mut self) {
2361 self.table_state.select_next();
2362 if let Some(selected) = self.table_state.selected() {
2363 if selected >= self.visible_rows && self.visible_rows > 0 {
2364 self.slide_table(1);
2365 }
2366 }
2367 }
2368
2369 pub fn page_down(&mut self) {
2370 self.slide_table(self.visible_rows as i64);
2371 }
2372
2373 pub fn select_previous(&mut self) {
2374 if let Some(selected) = self.table_state.selected() {
2375 self.table_state.select_previous();
2376 if selected == 0 && self.start_row > 0 {
2377 self.slide_table(-1);
2378 }
2379 } else {
2380 self.table_state.select(Some(0));
2381 }
2382 }
2383
2384 pub fn scroll_to(&mut self, index: usize) {
2385 if self.start_row == index {
2386 return;
2387 }
2388
2389 if index == 0 {
2390 self.start_row = 0;
2391 self.collect();
2392 self.start_row = 0;
2393 } else {
2394 self.start_row = index;
2395 self.collect();
2396 }
2397 }
2398
2399 pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2402 self.ensure_num_rows();
2403 if self.num_rows == 0 || self.visible_rows == 0 {
2404 return;
2405 }
2406 let center_offset = self.visible_rows / 2;
2407 let mut start_row = row_index.saturating_sub(center_offset);
2408 let max_start = self.num_rows.saturating_sub(self.visible_rows);
2409 start_row = start_row.min(max_start);
2410
2411 if self.start_row == start_row {
2412 let display_idx = row_index
2413 .saturating_sub(start_row)
2414 .min(self.visible_rows.saturating_sub(1));
2415 self.table_state.select(Some(display_idx));
2416 return;
2417 }
2418
2419 self.start_row = start_row;
2420 self.collect();
2421 let display_idx = row_index
2422 .saturating_sub(start_row)
2423 .min(self.visible_rows.saturating_sub(1));
2424 self.table_state.select(Some(display_idx));
2425 }
2426
2427 fn ensure_num_rows(&mut self) {
2429 if self.num_rows_valid {
2430 return;
2431 }
2432 if self.visible_rows > 0 {
2433 self.proximity_threshold = self.visible_rows;
2434 }
2435 self.num_rows = match self.lf.clone().select([len()]).collect() {
2436 Ok(df) => {
2437 if let Some(col) = df.get(0) {
2438 if let Some(AnyValue::UInt32(len)) = col.first() {
2439 *len as usize
2440 } else {
2441 0
2442 }
2443 } else {
2444 0
2445 }
2446 }
2447 Err(_) => 0,
2448 };
2449 self.num_rows_valid = true;
2450 }
2451
2452 pub fn scroll_to_end(&mut self) {
2454 self.ensure_num_rows();
2455 if self.num_rows == 0 {
2456 self.start_row = 0;
2457 self.buffered_start_row = 0;
2458 self.buffered_end_row = 0;
2459 return;
2460 }
2461 let end_start = self.num_rows.saturating_sub(self.visible_rows);
2462 if self.start_row == end_start {
2463 self.select_last_visible_row();
2464 return;
2465 }
2466 self.start_row = end_start;
2467 self.collect();
2468 self.select_last_visible_row();
2469 }
2470
2471 fn select_last_visible_row(&mut self) {
2473 if self.num_rows == 0 {
2474 return;
2475 }
2476 let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
2477 let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
2478 self.table_state.select(Some(sel));
2479 }
2480
2481 pub fn half_page_down(&mut self) {
2482 let half = (self.visible_rows / 2).max(1) as i64;
2483 self.slide_table(half);
2484 }
2485
2486 pub fn half_page_up(&mut self) {
2487 if self.start_row == 0 {
2488 return;
2489 }
2490 let half = (self.visible_rows / 2).max(1) as i64;
2491 self.slide_table(-half);
2492 }
2493
2494 pub fn page_up(&mut self) {
2495 if self.start_row == 0 {
2496 return;
2497 }
2498 self.slide_table(-(self.visible_rows as i64));
2499 }
2500
2501 pub fn scroll_right(&mut self) {
2502 let max_scroll = self
2503 .column_order
2504 .len()
2505 .saturating_sub(self.locked_columns_count);
2506 if self.termcol_index < max_scroll.saturating_sub(1) {
2507 self.termcol_index += 1;
2508 self.collect();
2509 }
2510 }
2511
2512 pub fn scroll_left(&mut self) {
2513 if self.termcol_index > 0 {
2514 self.termcol_index -= 1;
2515 self.collect();
2516 }
2517 }
2518
    /// Returns a copy of the full column ordering (locked columns first).
    pub fn headers(&self) -> Vec<String> {
        self.column_order.clone()
    }
2522
2523 pub fn set_column_order(&mut self, order: Vec<String>) {
2524 self.column_order = order;
2525 self.buffered_start_row = 0;
2526 self.buffered_end_row = 0;
2527 self.buffered_df = None;
2528 self.collect();
2529 }
2530
2531 pub fn set_locked_columns(&mut self, count: usize) {
2532 self.locked_columns_count = count.min(self.column_order.len());
2533 self.buffered_start_row = 0;
2534 self.buffered_end_row = 0;
2535 self.buffered_df = None;
2536 self.collect();
2537 }
2538
    /// Number of columns pinned to the left of the viewport.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }

    /// Currently active column filters.
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }

    /// Columns the view is sorted by (empty when unsorted).
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }

    /// Current sort direction (`true` = ascending).
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }

    /// Display order of all columns (locked columns first).
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }

    /// The most recently applied structured query ("" when none).
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }

    /// The most recently applied SQL query ("" when none).
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }

    /// The most recently applied fuzzy-search query ("" when none).
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }

    /// The last pivot applied via `pivot`, if any.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }

    /// The last melt applied via `melt`, if any.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }
2579
2580 pub fn is_grouped(&self) -> bool {
2581 self.schema
2582 .iter()
2583 .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
2584 }
2585
2586 pub fn group_key_columns(&self) -> Vec<String> {
2587 self.schema
2588 .iter()
2589 .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
2590 .map(|(name, _)| name.to_string())
2591 .collect()
2592 }
2593
2594 pub fn group_value_columns(&self) -> Vec<String> {
2595 self.schema
2596 .iter()
2597 .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
2598 .map(|(name, _)| name.to_string())
2599 .collect()
2600 }
2601
2602 pub fn buffered_memory_bytes(&self) -> Option<usize> {
2604 let locked = self
2605 .locked_df
2606 .as_ref()
2607 .map(|df| df.estimated_size())
2608 .unwrap_or(0);
2609 let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
2610 if locked == 0 && scroll == 0 {
2611 None
2612 } else {
2613 Some(locked + scroll)
2614 }
2615 }
2616
    /// Number of rows currently held in the row buffer.
    pub fn buffered_rows(&self) -> usize {
        self.buffered_end_row
            .saturating_sub(self.buffered_start_row)
    }

    /// Configured row-count cap for the buffer.
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }

    /// Configured memory cap for the buffer in MiB; 0 disables the cap.
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
2632
    /// Replace the grouped view with a flat frame holding only the rows of
    /// the group at `group_index`: each key column is broadcast as a
    /// constant column, and each `List` value column contributes its
    /// elements. The grouped `LazyFrame` is stashed so `drill_up` can
    /// restore it.
    ///
    /// No-op (returns `Ok`) when the frame is not grouped.
    ///
    /// # Errors
    /// Fails when `group_index` is out of bounds, the grouped frame cannot
    /// be collected, or the grouped schema has no `List` value columns.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        if !self.is_grouped() {
            return Ok(());
        }

        // Remember the grouped view for drill_up().
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // Record the key values (stringified) for display / analysis context.
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // The first list column's length determines how many rows the
        // drilled-down frame has.
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Broadcast each key value into a constant column of `row_count`
        // rows, preserving the native type where possible and falling back
        // to the string representation otherwise.
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // Unwrap each list value column into a plain column of its elements.
        // Non-list values are silently skipped (should not occur for
        // columns reported by group_value_columns).
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Install the drilled-down frame and reset all view state.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
2740
2741 pub fn drill_up(&mut self) -> Result<()> {
2742 if let Some(grouped_lf) = self.grouped_lf.take() {
2743 self.invalidate_num_rows();
2744 self.lf = grouped_lf;
2745 self.schema = self.lf.clone().collect_schema()?;
2746 self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
2747 self.drilled_down_group_index = None;
2748 self.drilled_down_group_key = None;
2749 self.drilled_down_group_key_columns = None;
2750 self.start_row = 0;
2751 self.termcol_index = 0;
2752 self.locked_columns_count = 0;
2753 self.table_state.select(Some(0));
2754 self.collect();
2755 Ok(())
2756 } else {
2757 Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
2758 }
2759 }
2760
    /// Materialize the current (transformed) frame for statistics/analysis.
    ///
    /// # Errors
    /// Propagates any error from collecting the lazy frame.
    pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
        Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
    }
2764
2765 pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
2766 crate::statistics::AnalysisContext {
2767 has_query: !self.active_query.is_empty(),
2768 query: self.active_query.clone(),
2769 has_filters: !self.filters.is_empty(),
2770 filter_count: self.filters.len(),
2771 is_drilled_down: self.is_drilled_down(),
2772 group_key: self.drilled_down_group_key.clone(),
2773 group_columns: self.drilled_down_group_key_columns.clone(),
2774 }
2775 }
2776
    /// Pivot the current frame with `pivot_stable` (stable output column
    /// order) and replace the view with the result, remembering the spec.
    ///
    /// # Errors
    /// Fails when the frame cannot be collected or the pivot itself errors
    /// (e.g. unknown column names).
    pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
        // pivot_stable operates on a materialized DataFrame, so collect first.
        let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
        let agg_expr = pivot_agg_expr(spec.aggregation)?;
        let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
        // NOTE(review): `None` here lets polars choose the index columns —
        // confirm against the pivot_stable docs for the pinned version.
        let index_opt = if index_str.is_empty() {
            None
        } else {
            Some(index_str)
        };
        let pivoted = pivot_stable(
            &df,
            [spec.pivot_column.as_str()],
            index_opt,
            Some([spec.value_column.as_str()]),
            spec.sort_columns,
            Some(agg_expr),
            None,
        )?;
        self.last_pivot_spec = Some(spec.clone());
        self.last_melt_spec = None;
        self.replace_lf_after_reshape(pivoted.lazy())?;
        Ok(())
    }
2804
2805 pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
2807 let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
2808 let index = cols(spec.index.iter().map(|s| s.as_str()));
2809 let args = UnpivotArgsDSL {
2810 on,
2811 index,
2812 variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
2813 value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
2814 };
2815 let lf = self.lf.clone().unpivot(args);
2816 self.last_melt_spec = Some(spec.clone());
2817 self.last_pivot_spec = None;
2818 self.replace_lf_after_reshape(lf)?;
2819 Ok(())
2820 }
2821
    /// Install `lf` as the new base frame after a pivot/melt, resetting all
    /// view state: filters, queries, sort, drill-down, scroll position,
    /// locked columns, and the row buffer.
    ///
    /// # Errors
    /// Fails when the new frame's schema cannot be collected.
    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
        self.invalidate_num_rows();
        self.lf = lf;
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.error = None;
        self.df = None;
        self.locked_df = None;
        self.grouped_lf = None;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.collect();
        Ok(())
    }
2849
    /// True while a drill-down view (from `drill_down_into_group`) is active.
    pub fn is_drilled_down(&self) -> bool {
        self.drilled_down_group_index.is_some()
    }
2853
2854 fn apply_transformations(&mut self) {
2855 let mut lf = self.lf.clone();
2856 let mut final_expr: Option<Expr> = None;
2857
2858 for filter in &self.filters {
2859 let col_expr = col(&filter.column);
2860 let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
2861 match dtype {
2862 DataType::Float32 | DataType::Float64 => filter
2863 .value
2864 .parse::<f64>()
2865 .map(lit)
2866 .unwrap_or_else(|_| lit(filter.value.as_str())),
2867 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
2868 .value
2869 .parse::<i64>()
2870 .map(lit)
2871 .unwrap_or_else(|_| lit(filter.value.as_str())),
2872 DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
2873 filter
2874 .value
2875 .parse::<u64>()
2876 .map(lit)
2877 .unwrap_or_else(|_| lit(filter.value.as_str()))
2878 }
2879 DataType::Boolean => filter
2880 .value
2881 .parse::<bool>()
2882 .map(lit)
2883 .unwrap_or_else(|_| lit(filter.value.as_str())),
2884 _ => lit(filter.value.as_str()),
2885 }
2886 } else {
2887 lit(filter.value.as_str())
2888 };
2889
2890 let op_expr = match filter.operator {
2891 FilterOperator::Eq => col_expr.eq(val_lit),
2892 FilterOperator::NotEq => col_expr.neq(val_lit),
2893 FilterOperator::Gt => col_expr.gt(val_lit),
2894 FilterOperator::Lt => col_expr.lt(val_lit),
2895 FilterOperator::GtEq => col_expr.gt_eq(val_lit),
2896 FilterOperator::LtEq => col_expr.lt_eq(val_lit),
2897 FilterOperator::Contains => {
2898 let val = filter.value.clone();
2899 col_expr.str().contains_literal(lit(val))
2900 }
2901 FilterOperator::NotContains => {
2902 let val = filter.value.clone();
2903 col_expr.str().contains_literal(lit(val)).not()
2904 }
2905 };
2906
2907 if let Some(current) = final_expr {
2908 final_expr = Some(match filter.logical_op {
2909 LogicalOperator::And => current.and(op_expr),
2910 LogicalOperator::Or => current.or(op_expr),
2911 });
2912 } else {
2913 final_expr = Some(op_expr);
2914 }
2915 }
2916
2917 if let Some(e) = final_expr {
2918 lf = lf.filter(e);
2919 }
2920
2921 if !self.sort_columns.is_empty() {
2922 let options = SortMultipleOptions {
2923 descending: self
2924 .sort_columns
2925 .iter()
2926 .map(|_| !self.sort_ascending)
2927 .collect(),
2928 ..Default::default()
2929 };
2930 lf = lf.sort_by_exprs(
2931 self.sort_columns.iter().map(col).collect::<Vec<_>>(),
2932 options,
2933 );
2934 } else if !self.sort_ascending {
2935 lf = lf.reverse();
2936 }
2937
2938 self.invalidate_num_rows();
2939 self.lf = lf;
2940 self.collect();
2941 }
2942
2943 pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
2944 self.sort_columns = columns;
2945 self.sort_ascending = ascending;
2946 self.buffered_start_row = 0;
2947 self.buffered_end_row = 0;
2948 self.buffered_df = None;
2949 self.apply_transformations();
2950 }
2951
2952 pub fn reverse(&mut self) {
2953 self.sort_ascending = !self.sort_ascending;
2954
2955 self.buffered_start_row = 0;
2956 self.buffered_end_row = 0;
2957 self.buffered_df = None;
2958
2959 if !self.sort_columns.is_empty() {
2960 let options = SortMultipleOptions {
2961 descending: self
2962 .sort_columns
2963 .iter()
2964 .map(|_| !self.sort_ascending)
2965 .collect(),
2966 ..Default::default()
2967 };
2968 self.invalidate_num_rows();
2969 self.lf = self.lf.clone().sort_by_exprs(
2970 self.sort_columns.iter().map(col).collect::<Vec<_>>(),
2971 options,
2972 );
2973 self.collect();
2974 } else {
2975 self.invalidate_num_rows();
2976 self.lf = self.lf.clone().reverse();
2977 self.collect();
2978 }
2979 }
2980
2981 pub fn filter(&mut self, filters: Vec<FilterStatement>) {
2982 self.filters = filters;
2983 self.buffered_start_row = 0;
2984 self.buffered_end_row = 0;
2985 self.buffered_df = None;
2986 self.apply_transformations();
2987 }
2988
    /// Apply a structured query (parsed by `parse_query`) on top of the
    /// *original* frame: optional column selection, filter, and group-by.
    ///
    /// An empty query restores the original frame. Parse/collect errors
    /// are reported via `self.error` rather than returned.
    pub fn query(&mut self, query: String) {
        self.error = None;

        let trimmed_query = query.trim();
        if trimmed_query.is_empty() {
            self.reset_lf_to_original();
            self.collect();
            return;
        }

        match parse_query(&query) {
            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
                // Queries always start from the untransformed frame.
                let mut lf = self.original_lf.clone();

                if let Some(f) = filter {
                    lf = lf.filter(f);
                }

                if !group_by_cols.is_empty() {
                    if !cols.is_empty() {
                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
                        lf = lf.sort_by_exprs(group_by_cols.clone(), Default::default());
                    } else {
                        // Bare group-by with no aggregations: aggregate
                        // every non-key column as a list.
                        let schema = match lf.clone().collect_schema() {
                            Ok(s) => s,
                            Err(e) => {
                                self.error = Some(e);
                                return; }
                        };
                        let all_columns: Vec<String> =
                            schema.iter_names().map(|s| s.to_string()).collect();

                        let mut agg_exprs = Vec::new();
                        for col_name in &all_columns {
                            if !group_by_col_names.contains(col_name) {
                                agg_exprs.push(col(col_name));
                            }
                        }

                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
                        lf = lf.sort_by_exprs(group_by_cols.clone(), Default::default());
                    }
                } else if !cols.is_empty() {
                    lf = lf.select(cols);
                }

                let schema = match lf.collect_schema() {
                    Ok(schema) => schema,
                    Err(e) => {
                        self.error = Some(e);
                        return;
                    }
                };

                self.schema = schema;
                self.invalidate_num_rows();
                self.lf = lf;
                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();

                // Lock the leading run of group-key columns so they stay
                // visible while scrolling horizontally.
                if !group_by_col_names.is_empty() {
                    let mut locked_count = 0;
                    for col_name in &self.column_order {
                        if group_by_col_names.contains(col_name) {
                            locked_count += 1;
                        } else {
                            break;
                        }
                    }
                    self.locked_columns_count = locked_count;
                } else {
                    self.locked_columns_count = 0;
                }

                // Reset the remaining view state for the new result.
                self.filters.clear();
                self.sort_columns.clear();
                self.sort_ascending = true;
                self.start_row = 0;
                self.termcol_index = 0;
                self.active_query = query;
                self.buffered_start_row = 0;
                self.buffered_end_row = 0;
                self.buffered_df = None;
                self.drilled_down_group_index = None;
                self.drilled_down_group_key = None;
                self.drilled_down_group_key_columns = None;
                self.grouped_lf = None;
                self.table_state.select(Some(0));
                self.collect();
                if self.num_rows > 0 {
                    self.start_row = 0;
                }
            }
            Err(e) => {
                self.error = Some(PolarsError::ComputeError(e.into()));
            }
        }
    }
3104
3105 pub fn sql_query(&mut self, sql: String) {
3108 self.error = None;
3109 let trimmed = sql.trim();
3110 if trimmed.is_empty() {
3111 self.reset_lf_to_original();
3112 return;
3113 }
3114
3115 #[cfg(feature = "sql")]
3116 {
3117 use polars_sql::SQLContext;
3118 let mut ctx = SQLContext::new();
3119 ctx.register("df", self.lf.clone());
3120 match ctx.execute(trimmed) {
3121 Ok(result_lf) => {
3122 let schema = match result_lf.clone().collect_schema() {
3123 Ok(s) => s,
3124 Err(e) => {
3125 self.error = Some(e);
3126 return;
3127 }
3128 };
3129 self.schema = schema;
3130 self.invalidate_num_rows();
3131 self.lf = result_lf;
3132 self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3133 self.active_sql_query = sql;
3134 self.locked_columns_count = 0;
3135 self.filters.clear();
3136 self.sort_columns.clear();
3137 self.sort_ascending = true;
3138 self.start_row = 0;
3139 self.termcol_index = 0;
3140 self.drilled_down_group_index = None;
3141 self.drilled_down_group_key = None;
3142 self.drilled_down_group_key_columns = None;
3143 self.grouped_lf = None;
3144 self.buffered_start_row = 0;
3145 self.buffered_end_row = 0;
3146 self.buffered_df = None;
3147 self.table_state.select(Some(0));
3148 }
3149 Err(e) => {
3150 self.error = Some(e);
3151 }
3152 }
3153 }
3154
3155 #[cfg(not(feature = "sql"))]
3156 {
3157 self.error = Some(PolarsError::ComputeError(
3158 format!("SQL support not compiled in (build with --features sql)").into(),
3159 ));
3160 }
3161 }
3162
3163 pub fn fuzzy_search(&mut self, query: String) {
3167 self.error = None;
3168 let trimmed = query.trim();
3169 if trimmed.is_empty() {
3170 self.reset_lf_to_original();
3171 self.collect();
3172 return;
3173 }
3174 let string_cols: Vec<String> = self
3175 .schema
3176 .iter()
3177 .filter(|(_, dtype)| dtype.is_string())
3178 .map(|(name, _)| name.to_string())
3179 .collect();
3180 if string_cols.is_empty() {
3181 self.error = Some(PolarsError::ComputeError(
3182 "Fuzzy search requires at least one string column".into(),
3183 ));
3184 return;
3185 }
3186 let tokens: Vec<&str> = trimmed
3187 .split_whitespace()
3188 .filter(|s| !s.is_empty())
3189 .collect();
3190 let token_exprs: Vec<Expr> = tokens
3191 .iter()
3192 .map(|token| {
3193 let pattern = fuzzy_token_regex(token);
3194 string_cols
3195 .iter()
3196 .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
3197 .reduce(|a, b| a.or(b))
3198 .unwrap()
3199 })
3200 .collect();
3201 let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
3202 self.lf = self.original_lf.clone().filter(combined);
3203 self.filters.clear();
3204 self.sort_columns.clear();
3205 self.active_query.clear();
3206 self.active_sql_query.clear();
3207 self.active_fuzzy_query = query;
3208 self.locked_columns_count = 0;
3210 self.start_row = 0;
3211 self.termcol_index = 0;
3212 self.drilled_down_group_index = None;
3213 self.drilled_down_group_key = None;
3214 self.drilled_down_group_key_columns = None;
3215 self.grouped_lf = None;
3216 self.buffered_start_row = 0;
3217 self.buffered_end_row = 0;
3218 self.buffered_df = None;
3219 self.table_state.select(Some(0));
3220 self.invalidate_num_rows();
3221 self.collect();
3222 }
3223}
3224
3225pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3227 let inner: String =
3228 token
3229 .chars()
3230 .map(|c| regex::escape(&c.to_string()))
3231 .fold(String::new(), |mut s, e| {
3232 if !s.is_empty() {
3233 s.push_str(".*");
3234 }
3235 s.push_str(&e);
3236 s
3237 });
3238 format!("(?i).*{}.*", inner)
3239}
3240
/// Ratatui widget that renders a polars `DataFrame` as a table, with
/// configurable colors, cell padding, optional row striping, and optional
/// per-dtype cell coloring.
pub struct DataTable {
    pub header_bg: Color,
    pub header_fg: Color,
    pub row_numbers_fg: Color,
    pub separator_fg: Color,
    // Horizontal spacing (in cells) between columns.
    pub table_cell_padding: u16,
    // Background for odd rows; `None` disables striping.
    pub alternate_row_bg: Option<Color>,
    // When true, cells are colored by dtype using the `*_col` fields below.
    pub column_colors: bool,
    pub str_col: Option<Color>,
    pub int_col: Option<Color>,
    pub float_col: Option<Color>,
    pub bool_col: Option<Color>,
    pub temporal_col: Option<Color>,
}
3256
3257impl Default for DataTable {
3258 fn default() -> Self {
3259 Self {
3260 header_bg: Color::Indexed(236),
3261 header_fg: Color::White,
3262 row_numbers_fg: Color::DarkGray,
3263 separator_fg: Color::White,
3264 table_cell_padding: 1,
3265 alternate_row_bg: None,
3266 column_colors: false,
3267 str_col: None,
3268 int_col: None,
3269 float_col: None,
3270 bool_col: None,
3271 temporal_col: None,
3272 }
3273 }
3274}
3275
/// Parameters for rendering the row-number gutter.
struct RowNumbersParams {
    // First absolute data row currently shown in the viewport.
    start_row: usize,
    // Viewport height in rows.
    visible_rows: usize,
    // Total row count of the data set.
    num_rows: usize,
    // Offset added to displayed numbers (e.g. 1 for 1-based numbering —
    // see the `row_num` computation in `render_row_numbers`).
    row_start_index: usize,
    // Viewport-relative index of the selected row, if any.
    selected_row: Option<usize>,
}
3284
3285impl DataTable {
    /// Create a `DataTable` with the default theme.
    pub fn new() -> Self {
        Self::default()
    }
3289
3290 pub fn with_colors(
3291 mut self,
3292 header_bg: Color,
3293 header_fg: Color,
3294 row_numbers_fg: Color,
3295 separator_fg: Color,
3296 ) -> Self {
3297 self.header_bg = header_bg;
3298 self.header_fg = header_fg;
3299 self.row_numbers_fg = row_numbers_fg;
3300 self.separator_fg = separator_fg;
3301 self
3302 }
3303
3304 pub fn with_cell_padding(mut self, padding: u16) -> Self {
3305 self.table_cell_padding = padding;
3306 self
3307 }
3308
3309 pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
3310 self.alternate_row_bg = color;
3311 self
3312 }
3313
3314 pub fn with_column_type_colors(
3316 mut self,
3317 str_col: Color,
3318 int_col: Color,
3319 float_col: Color,
3320 bool_col: Color,
3321 temporal_col: Color,
3322 ) -> Self {
3323 self.column_colors = true;
3324 self.str_col = Some(str_col);
3325 self.int_col = Some(int_col);
3326 self.float_col = Some(float_col);
3327 self.bool_col = Some(bool_col);
3328 self.temporal_col = Some(temporal_col);
3329 self
3330 }
3331
3332 fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
3334 if !self.column_colors {
3335 return None;
3336 }
3337 match dtype {
3338 DataType::String => self.str_col,
3339 DataType::Int8
3340 | DataType::Int16
3341 | DataType::Int32
3342 | DataType::Int64
3343 | DataType::UInt8
3344 | DataType::UInt16
3345 | DataType::UInt32
3346 | DataType::UInt64 => self.int_col,
3347 DataType::Float32 | DataType::Float64 => self.float_col,
3348 DataType::Boolean => self.bool_col,
3349 DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
3350 self.temporal_col
3351 }
3352 _ => None,
3353 }
3354 }
3355
    /// Render `df` into `area`, sizing each column to its widest visible
    /// cell and dropping trailing columns that do not fit horizontally.
    /// A string column that only partially fits is truncated to the
    /// remaining width instead of being dropped.
    ///
    /// NOTE(review): `_row_numbers` / `_start_row_offset` are unused here —
    /// row numbers are drawn by `render_row_numbers`.
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        let (height, cols) = df.shape();

        // Start each column at its header width; widened by cell data below.
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // One terminal line is reserved for the header row.
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            // Build this column's cells while tracking its maximum width.
            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                // Nulls render as empty cells, not "null".
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // Truncate an overflowing string column to the leftover
                // width and stop laying out further columns.
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                break;
            }
        }

        widths.truncate(visible_columns);
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                // Optional zebra striping on odd rows.
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // `Color::Reset` as header background means "no background".
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }
3468
3469 fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
3470 let header_style = if self.header_bg == Color::Reset {
3472 Style::default().fg(self.header_fg)
3473 } else {
3474 Style::default().bg(self.header_bg).fg(self.header_fg)
3475 };
3476 let header_fill = " ".repeat(area.width as usize);
3477 Paragraph::new(header_fill).style(header_style).render(
3478 Rect {
3479 x: area.x,
3480 y: area.y,
3481 width: area.width,
3482 height: 1,
3483 },
3484 buf,
3485 );
3486
3487 let rows_to_render = params
3489 .visible_rows
3490 .min(params.num_rows.saturating_sub(params.start_row));
3491
3492 if rows_to_render == 0 {
3493 return;
3494 }
3495
3496 let max_row_num =
3498 params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
3499 let max_width = max_row_num.to_string().len();
3500
3501 for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
3503 let row_num = params.start_row + row_idx + params.row_start_index;
3504 let row_num_text = row_num.to_string();
3505
3506 let padding = max_width.saturating_sub(row_num_text.len());
3508 let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);
3509
3510 let is_selected = params.selected_row == Some(row_idx);
3514 let (fg, bg) = if is_selected {
3515 (
3516 Color::Reset,
3517 self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
3518 )
3519 } else {
3520 (
3521 self.row_numbers_fg,
3522 self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
3523 )
3524 };
3525 let row_num_style = match bg {
3526 Some(bg_color) => Style::default().fg(fg).bg(bg_color),
3527 None => Style::default().fg(fg),
3528 };
3529
3530 let y = area.y + row_idx as u16 + 1; if y < area.y + area.height {
3532 Paragraph::new(padded_text).style(row_num_style).render(
3533 Rect {
3534 x: area.x,
3535 y,
3536 width: area.width,
3537 height: 1,
3538 },
3539 buf,
3540 );
3541 }
3542 }
3543 }
3544}
3545
3546impl StatefulWidget for DataTable {
3547 type State = DataTableState;
3548
3549 fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
3550 state.visible_termcols = area.width as usize;
3551 let new_visible_rows = if area.height > 0 {
3552 (area.height - 1) as usize
3553 } else {
3554 0
3555 };
3556 let needs_collect = new_visible_rows != state.visible_rows;
3557 state.visible_rows = new_visible_rows;
3558
3559 if let Some(selected) = state.table_state.selected() {
3560 if selected >= state.visible_rows {
3561 state.table_state.select(Some(state.visible_rows - 1))
3562 }
3563 }
3564
3565 if needs_collect {
3566 state.collect();
3567 }
3568
3569 if let Some(error) = state.error.as_ref() {
3572 if !state.suppress_error_display {
3573 Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
3574 .centered()
3575 .block(
3576 Block::default()
3577 .borders(Borders::NONE)
3578 .padding(Padding::top(area.height / 2)),
3579 )
3580 .wrap(ratatui::widgets::Wrap { trim: true })
3581 .render(area, buf);
3582 return;
3583 }
3584 }
3586
3587 let row_num_width = if state.row_numbers {
3589 let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; max_row_num.to_string().len().max(1) as u16 + 1 } else {
3592 0
3593 };
3594
3595 let mut locked_width = row_num_width;
3597 if let Some(locked_df) = state.locked_df.as_ref() {
3598 let (_, cols) = locked_df.shape();
3599 for col_index in 0..cols {
3600 let col_name = locked_df.get_column_names()[col_index];
3601 let mut max_len = col_name.chars().count() as u16;
3602 let col_data = &locked_df[col_index];
3603 for row_index in 0..locked_df.height().min(state.visible_rows) {
3604 let value = col_data.get(row_index).unwrap();
3605 let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
3606 Cow::Borrowed("")
3607 } else {
3608 value.str_value()
3609 };
3610 let len = val_str.chars().count() as u16;
3611 max_len = max_len.max(len);
3612 }
3613 locked_width += max_len + 1;
3614 }
3615 }
3616
3617 if locked_width > row_num_width && locked_width < area.width {
3619 let locked_area = Rect {
3620 x: area.x,
3621 y: area.y,
3622 width: locked_width,
3623 height: area.height,
3624 };
3625 let separator_x = locked_area.x + locked_area.width;
3626
3627 if state.row_numbers {
3629 let row_num_area = Rect {
3630 x: area.x,
3631 y: area.y,
3632 width: row_num_width,
3633 height: area.height,
3634 };
3635 self.render_row_numbers(
3636 row_num_area,
3637 buf,
3638 RowNumbersParams {
3639 start_row: state.start_row,
3640 visible_rows: state.visible_rows,
3641 num_rows: state.num_rows,
3642 row_start_index: state.row_start_index,
3643 selected_row: state.table_state.selected(),
3644 },
3645 );
3646 }
3647 let scrollable_area = Rect {
3648 x: separator_x + 1,
3649 y: area.y,
3650 width: area.width.saturating_sub(locked_width + 1),
3651 height: area.height,
3652 };
3653
3654 if let Some(locked_df) = state.locked_df.as_ref() {
3656 let adjusted_locked_area = if state.row_numbers {
3658 Rect {
3659 x: area.x + row_num_width,
3660 y: area.y,
3661 width: locked_width - row_num_width,
3662 height: area.height,
3663 }
3664 } else {
3665 locked_area
3666 };
3667
3668 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3670 let slice_len = state
3671 .visible_rows
3672 .min(locked_df.height().saturating_sub(offset));
3673 if offset < locked_df.height() && slice_len > 0 {
3674 let sliced_df = locked_df.slice(offset as i64, slice_len);
3675 self.render_dataframe(
3676 &sliced_df,
3677 adjusted_locked_area,
3678 buf,
3679 &mut state.table_state,
3680 false,
3681 state.start_row,
3682 );
3683 }
3684 }
3685
3686 let separator_x_adjusted = if state.row_numbers {
3688 area.x + row_num_width + (locked_width - row_num_width)
3689 } else {
3690 separator_x
3691 };
3692 for y in area.y..area.y + area.height {
3693 let cell = &mut buf[(separator_x_adjusted, y)];
3694 cell.set_char('│');
3695 cell.set_style(Style::default().fg(self.separator_fg));
3696 }
3697
3698 let adjusted_scrollable_area = if state.row_numbers {
3700 Rect {
3701 x: separator_x_adjusted + 1,
3702 y: area.y,
3703 width: area.width.saturating_sub(locked_width + 1),
3704 height: area.height,
3705 }
3706 } else {
3707 scrollable_area
3708 };
3709
3710 if let Some(df) = state.df.as_ref() {
3712 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3714 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3715 if offset < df.height() && slice_len > 0 {
3716 let sliced_df = df.slice(offset as i64, slice_len);
3717 self.render_dataframe(
3718 &sliced_df,
3719 adjusted_scrollable_area,
3720 buf,
3721 &mut state.table_state,
3722 false,
3723 state.start_row,
3724 );
3725 }
3726 }
3727 } else if let Some(df) = state.df.as_ref() {
3728 if state.row_numbers {
3731 let row_num_area = Rect {
3732 x: area.x,
3733 y: area.y,
3734 width: row_num_width,
3735 height: area.height,
3736 };
3737 self.render_row_numbers(
3738 row_num_area,
3739 buf,
3740 RowNumbersParams {
3741 start_row: state.start_row,
3742 visible_rows: state.visible_rows,
3743 num_rows: state.num_rows,
3744 row_start_index: state.row_start_index,
3745 selected_row: state.table_state.selected(),
3746 },
3747 );
3748
3749 let data_area = Rect {
3751 x: area.x + row_num_width,
3752 y: area.y,
3753 width: area.width.saturating_sub(row_num_width),
3754 height: area.height,
3755 };
3756
3757 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3759 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3760 if offset < df.height() && slice_len > 0 {
3761 let sliced_df = df.slice(offset as i64, slice_len);
3762 self.render_dataframe(
3763 &sliced_df,
3764 data_area,
3765 buf,
3766 &mut state.table_state,
3767 false,
3768 state.start_row,
3769 );
3770 }
3771 } else {
3772 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3774 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3775 if offset < df.height() && slice_len > 0 {
3776 let sliced_df = df.slice(offset as i64, slice_len);
3777 self.render_dataframe(
3778 &sliced_df,
3779 area,
3780 buf,
3781 &mut state.table_state,
3782 false,
3783 state.start_row,
3784 );
3785 }
3786 }
3787 } else if !state.column_order.is_empty() {
3788 let empty_columns: Vec<_> = state
3790 .column_order
3791 .iter()
3792 .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
3793 .collect();
3794 if let Ok(empty_df) = DataFrame::new(empty_columns) {
3795 if state.row_numbers {
3796 let row_num_area = Rect {
3797 x: area.x,
3798 y: area.y,
3799 width: row_num_width,
3800 height: area.height,
3801 };
3802 self.render_row_numbers(
3803 row_num_area,
3804 buf,
3805 RowNumbersParams {
3806 start_row: 0,
3807 visible_rows: state.visible_rows,
3808 num_rows: 0,
3809 row_start_index: state.row_start_index,
3810 selected_row: None,
3811 },
3812 );
3813 let data_area = Rect {
3814 x: area.x + row_num_width,
3815 y: area.y,
3816 width: area.width.saturating_sub(row_num_width),
3817 height: area.height,
3818 };
3819 self.render_dataframe(
3820 &empty_df,
3821 data_area,
3822 buf,
3823 &mut state.table_state,
3824 false,
3825 0,
3826 );
3827 } else {
3828 self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
3829 }
3830 } else {
3831 Paragraph::new("No data").render(area, buf);
3832 }
3833 } else {
3834 Paragraph::new("No data").render(area, buf);
3836 }
3837 }
3838}
3839
// Unit tests for DataTableState: file loaders (CSV, Parquet, IPC, Avro, ORC),
// view transformations (filter, sort, query, pivot, melt), navigation, and
// fuzzy search.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
    use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};

    // Minimal 3-row frame: one i32 column and one string column.
    fn create_test_lf() -> LazyFrame {
        df! (
            "a" => &[1, 2, 3],
            "b" => &["x", "y", "z"]
        )
        .unwrap()
        .lazy()
    }

    // 100-row frame with two low-cardinality columns ("c" mod 3, "d" mod 5)
    // used for paging, filtering, and sorting tests.
    fn create_large_test_lf() -> LazyFrame {
        df! (
            "a" => (0..100).collect::<Vec<i32>>(),
            "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
            "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
            "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
        )
        .unwrap()
        .lazy()
    }

    #[test]
    fn test_from_csv() {
        // NOTE(review): assumes the sample file already exists on disk;
        // test_fuzzy_search calls crate::tests::ensure_sample_data() first —
        // confirm whether these loader tests need the same guard.
        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
        let state = DataTableState::from_csv(&path, &Default::default()).unwrap();
        assert_eq!(state.schema.len(), 6);
    }

    #[test]
    fn test_from_csv_gzipped() {
        // Gzipped CSV should be transparently decompressed by the loader.
        let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
        let state = DataTableState::from_csv(&path, &Default::default()).unwrap();
        assert_eq!(state.schema.len(), 6);
    }

    #[test]
    fn test_from_parquet() {
        let path = crate::tests::sample_data_dir().join("people.parquet");
        let state = DataTableState::from_parquet(&path, None, None, None, None, false, 1).unwrap();
        assert!(!state.schema.is_empty());
    }

    #[test]
    fn test_from_ipc() {
        // Round-trips a frame through an Arrow IPC file in the temp dir.
        use polars::prelude::IpcWriter;
        use std::io::BufWriter;
        let mut df = df!(
            "x" => &[1_i32, 2, 3],
            "y" => &["a", "b", "c"]
        )
        .unwrap();
        let dir = std::env::temp_dir();
        let path = dir.join("datui_test_ipc.arrow");
        let file = std::fs::File::create(&path).unwrap();
        let mut writer = BufWriter::new(file);
        IpcWriter::new(&mut writer).finish(&mut df).unwrap();
        drop(writer);
        let state = DataTableState::from_ipc(&path, None, None, None, None, false, 1).unwrap();
        assert_eq!(state.schema.len(), 2);
        assert!(state.schema.contains("x"));
        assert!(state.schema.contains("y"));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_from_avro() {
        // Round-trips a frame through an Avro file in the temp dir.
        use polars::io::avro::AvroWriter;
        use std::io::BufWriter;
        let mut df = df!(
            "id" => &[1_i32, 2, 3],
            "name" => &["alice", "bob", "carol"]
        )
        .unwrap();
        let dir = std::env::temp_dir();
        let path = dir.join("datui_test_avro.avro");
        let file = std::fs::File::create(&path).unwrap();
        let mut writer = BufWriter::new(file);
        AvroWriter::new(&mut writer).finish(&mut df).unwrap();
        drop(writer);
        let state = DataTableState::from_avro(&path, None, None, None, None, false, 1).unwrap();
        assert_eq!(state.schema.len(), 2);
        assert!(state.schema.contains("id"));
        assert!(state.schema.contains("name"));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_from_orc() {
        // Writes an ORC file via arrow/orc_rust, then loads it back.
        use arrow::array::{Int64Array, StringArray};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::record_batch::RecordBatch;
        use orc_rust::ArrowWriterBuilder;
        use std::io::BufWriter;
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let id_array = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
        let name_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, name_array]).unwrap();

        let dir = std::env::temp_dir();
        let path = dir.join("datui_test_orc.orc");
        let file = std::fs::File::create(&path).unwrap();
        let writer = BufWriter::new(file);
        let mut orc_writer = ArrowWriterBuilder::new(writer, schema).try_build().unwrap();
        orc_writer.write(&batch).unwrap();
        orc_writer.close().unwrap();

        let state = DataTableState::from_orc(&path, None, None, None, None, false, 1).unwrap();
        assert_eq!(state.schema.len(), 2);
        assert!(state.schema.contains("id"));
        assert!(state.schema.contains("name"));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_filter() {
        // a > 2 keeps exactly one row (a == 3).
        let lf = create_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let filters = vec![FilterStatement {
            column: "a".to_string(),
            operator: FilterOperator::Gt,
            value: "2".to_string(),
            logical_op: LogicalOperator::And,
        }];
        state.filter(filters);
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape().0, 1);
        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
    }

    #[test]
    fn test_sort() {
        // descending == false here apparently means descending-first result;
        // the assertion pins the current behavior (first row is 3).
        let lf = create_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.sort(vec!["a".to_string()], false);
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
    }

    #[test]
    fn test_query() {
        let lf = create_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.query("select b where a = 2".to_string());
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (1, 1));
        assert_eq!(
            df.column("b").unwrap().get(0).unwrap(),
            AnyValue::String("y")
        );
    }

    #[test]
    fn test_query_date_accessors() {
        // Exercises .year/.month accessors, date-literal comparisons, and
        // string accessors (.upper/.len/.ends_with) in the query language.
        use chrono::NaiveDate;
        let df = df!(
            "event_date" => [
                NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
                NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
                NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
            ],
            "name" => &["a", "b", "c"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();

        state.query("select name, year: event_date.year, month: event_date.month".to_string());
        assert!(
            state.error.is_none(),
            "query should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (3, 3));
        assert_eq!(
            df.column("year").unwrap().get(0).unwrap(),
            AnyValue::Int32(2024)
        );
        assert_eq!(
            df.column("month").unwrap().get(0).unwrap(),
            AnyValue::Int8(1)
        );
        assert_eq!(
            df.column("month").unwrap().get(1).unwrap(),
            AnyValue::Int8(6)
        );

        state.query("select name, event_date where event_date.month = 12".to_string());
        assert!(
            state.error.is_none(),
            "filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1);
        assert_eq!(
            df.column("name").unwrap().get(0).unwrap(),
            AnyValue::String("c")
        );

        state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
        assert!(
            state.error.is_none(),
            "date literal filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(
            df.height(),
            2,
            "2024-06-20 and 2024-12-31 are after 2024-06-15"
        );

        state.query(
            "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
                .to_string(),
        );
        assert!(
            state.error.is_none(),
            "string accessors should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
        assert_eq!(
            df.column("upper_name").unwrap().get(0).unwrap(),
            AnyValue::String("C")
        );

        // A query matching nothing must clear both buffered frames.
        state.query("select where event_date.date = 2020.01.01".to_string());
        assert!(state.error.is_none());
        assert_eq!(state.num_rows, 0);
        state.visible_rows = 10;
        state.collect();
        assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
        assert!(
            state.locked_df.is_none(),
            "locked_df must be cleared when num_rows is 0"
        );
    }

    #[test]
    fn test_select_next_previous() {
        let lf = create_large_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.visible_rows = 10;
        state.table_state.select(Some(5));

        state.select_next();
        assert_eq!(state.table_state.selected(), Some(6));

        state.select_previous();
        assert_eq!(state.table_state.selected(), Some(5));
    }

    #[test]
    fn test_page_up_down() {
        // Paging moves start_row by one full viewport (20 rows) each time.
        let lf = create_large_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.visible_rows = 20;
        state.collect();

        assert_eq!(state.start_row, 0);
        state.page_down();
        assert_eq!(state.start_row, 20);
        state.page_down();
        assert_eq!(state.start_row, 40);
        state.page_up();
        assert_eq!(state.start_row, 20);
        state.page_up();
        assert_eq!(state.start_row, 0);
    }

    #[test]
    fn test_scroll_left_right() {
        let lf = create_large_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        assert_eq!(state.termcol_index, 0);
        state.scroll_right();
        assert_eq!(state.termcol_index, 1);
        state.scroll_right();
        assert_eq!(state.termcol_index, 2);
        state.scroll_left();
        assert_eq!(state.termcol_index, 1);
        state.scroll_left();
        assert_eq!(state.termcol_index, 0);
    }

    #[test]
    fn test_reverse() {
        // reverse() flips the current row order of the view.
        let lf = create_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.sort(vec!["a".to_string()], true);
        assert_eq!(
            state
                .lf
                .clone()
                .collect()
                .unwrap()
                .column("a")
                .unwrap()
                .get(0)
                .unwrap(),
            AnyValue::Int32(1)
        );
        state.reverse();
        assert_eq!(
            state
                .lf
                .clone()
                .collect()
                .unwrap()
                .column("a")
                .unwrap()
                .get(0)
                .unwrap(),
            AnyValue::Int32(3)
        );
    }

    #[test]
    fn test_filter_multiple() {
        // c == 1 AND d == 2: of 100 rows, 7 satisfy both congruences.
        let lf = create_large_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let filters = vec![
            FilterStatement {
                column: "c".to_string(),
                operator: FilterOperator::Eq,
                value: "1".to_string(),
                logical_op: LogicalOperator::And,
            },
            FilterStatement {
                column: "d".to_string(),
                operator: FilterOperator::Eq,
                value: "2".to_string(),
                logical_op: LogicalOperator::And,
            },
        ];
        state.filter(filters);
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape().0, 7);
    }

    #[test]
    fn test_filter_and_sort() {
        // After filtering c == 1, sorting puts the largest matching a (97) first.
        let lf = create_large_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let filters = vec![FilterStatement {
            column: "c".to_string(),
            operator: FilterOperator::Eq,
            value: "1".to_string(),
            logical_op: LogicalOperator::And,
        }];
        state.filter(filters);
        state.sort(vec!["a".to_string()], false);
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
    }

    // Long-format frame with duplicate (id, key) pairs so that pivot
    // aggregations (first/last/min/max/avg/count) are distinguishable.
    fn create_pivot_long_lf() -> LazyFrame {
        let df = df!(
            "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
            "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
            "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
            "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
        )
        .unwrap();
        df.lazy()
    }

    // Wide-format frame with three value columns c1..c3 for melt tests.
    fn create_melt_wide_lf() -> LazyFrame {
        let df = df!(
            "id" => &[1_i32, 2, 3],
            "date" => &["d1", "d2", "d3"],
            "c1" => &[10.0_f64, 20.0, 30.0],
            "c2" => &[11.0, 21.0, 31.0],
            "c3" => &[12.0, 22.0, 32.0],
        )
        .unwrap();
        df.lazy()
    }

    #[test]
    fn test_pivot_basic() {
        let lf = create_pivot_long_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: false,
        };
        state.pivot(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
        assert!(names.contains(&"id"));
        assert!(names.contains(&"date"));
        assert!(names.contains(&"A"));
        assert!(names.contains(&"B"));
        assert!(names.contains(&"C"));
        assert_eq!(df.height(), 2);
    }

    #[test]
    fn test_pivot_aggregation_last() {
        // For (id=1, key=A) the last value is 11.0; for id=2 it is 40.0.
        let lf = create_pivot_long_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: false,
        };
        state.pivot(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        let a_col = df.column("A").unwrap();
        let row0 = a_col.get(0).unwrap();
        let row1 = a_col.get(1).unwrap();
        assert_eq!(row0, AnyValue::Float64(11.0));
        assert_eq!(row1, AnyValue::Float64(40.0));
    }

    #[test]
    fn test_pivot_aggregation_first() {
        // For (id=1, key=A) the first value is 10.0.
        let lf = create_pivot_long_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::First,
            sort_columns: false,
        };
        state.pivot(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        let a_col = df.column("A").unwrap();
        assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
        assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
    }

    #[test]
    fn test_pivot_aggregation_min_max() {
        let lf = create_pivot_long_lf();
        let mut state_min = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
        state_min
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: PivotAggregation::Min,
                sort_columns: false,
            })
            .unwrap();
        let df_min = state_min.lf.clone().collect().unwrap();
        assert_eq!(
            df_min.column("A").unwrap().get(0).unwrap(),
            AnyValue::Float64(10.0)
        );

        let mut state_max = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state_max
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: PivotAggregation::Max,
                sort_columns: false,
            })
            .unwrap();
        let df_max = state_max.lf.clone().collect().unwrap();
        assert_eq!(
            df_max.column("A").unwrap().get(0).unwrap(),
            AnyValue::Float64(11.0)
        );
    }

    #[test]
    fn test_pivot_aggregation_avg_count() {
        // avg of (10.0, 11.0) is 10.5; count of the same group is 2.
        let lf = create_pivot_long_lf();
        let mut state_avg = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
        state_avg
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: PivotAggregation::Avg,
                sort_columns: false,
            })
            .unwrap();
        let df_avg = state_avg.lf.clone().collect().unwrap();
        let a = df_avg.column("A").unwrap().get(0).unwrap();
        if let AnyValue::Float64(x) = a {
            assert!((x - 10.5).abs() < 1e-6);
        } else {
            panic!("expected float");
        }

        let mut state_count = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state_count
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: PivotAggregation::Count,
                sort_columns: false,
            })
            .unwrap();
        let df_count = state_count.lf.clone().collect().unwrap();
        let a = df_count.column("A").unwrap().get(0).unwrap();
        assert_eq!(a, AnyValue::UInt32(2));
    }

    #[test]
    fn test_pivot_string_first_last() {
        // Last aggregation must also work on non-numeric value columns.
        let df = df!(
            "id" => &[1_i32, 1, 2, 2],
            "key" => &["X", "Y", "X", "Y"],
            "value" => &["low", "mid", "high", "mid"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = PivotSpec {
            index: vec!["id".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: false,
        };
        state.pivot(&spec).unwrap();
        let out = state.lf.clone().collect().unwrap();
        assert_eq!(
            out.column("X").unwrap().get(0).unwrap(),
            AnyValue::String("low")
        );
        assert_eq!(
            out.column("Y").unwrap().get(0).unwrap(),
            AnyValue::String("mid")
        );
    }

    #[test]
    fn test_melt_basic() {
        // 3 rows x 3 value columns melt into 9 long-format rows.
        let lf = create_melt_wide_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = MeltSpec {
            index: vec!["id".to_string(), "date".to_string()],
            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
            variable_name: "variable".to_string(),
            value_name: "value".to_string(),
        };
        state.melt(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 9);
        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
        assert!(names.contains(&"variable"));
        assert!(names.contains(&"value"));
        assert!(names.contains(&"id"));
        assert!(names.contains(&"date"));
    }

    #[test]
    fn test_melt_all_except_index() {
        // Custom variable/value column names are honored.
        let lf = create_melt_wide_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = MeltSpec {
            index: vec!["id".to_string(), "date".to_string()],
            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
            variable_name: "var".to_string(),
            value_name: "val".to_string(),
        };
        state.melt(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        assert!(df.column("var").is_ok());
        assert!(df.column("val").is_ok());
    }

    #[test]
    fn test_pivot_on_current_view_after_filter() {
        // Pivot must operate on the filtered view, not the original frame.
        let lf = create_pivot_long_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.filter(vec![FilterStatement {
            column: "id".to_string(),
            operator: FilterOperator::Eq,
            value: "1".to_string(),
            logical_op: LogicalOperator::And,
        }]);
        let spec = PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: false,
        };
        state.pivot(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1);
        let id_col = df.column("id").unwrap();
        assert_eq!(id_col.get(0).unwrap(), AnyValue::Int32(1));
    }

    #[test]
    fn test_fuzzy_token_regex() {
        // Each character becomes a lazy ".*"-separated match; regex
        // metacharacters must be escaped.
        assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
        assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
        let pat = fuzzy_token_regex("[");
        assert!(pat.contains("\\["));
    }

    #[test]
    fn test_fuzzy_search() {
        crate::tests::ensure_sample_data();
        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
        let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
        state.visible_rows = 10;
        state.collect();
        let before = state.num_rows;
        state.fuzzy_search("string".to_string());
        assert!(state.error.is_none(), "{:?}", state.error);
        assert!(state.num_rows <= before, "fuzzy search should filter rows");
        state.fuzzy_search("".to_string());
        state.collect();
        assert_eq!(state.num_rows, before, "empty fuzzy search should reset");
        assert!(state.get_active_fuzzy_query().is_empty());
    }

    #[test]
    fn test_fuzzy_search_regex_direct() {
        // Exercises the same filter expressions fuzzy_search builds, but
        // directly against polars to isolate regressions in the regex layer.
        let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
        let pattern = fuzzy_token_regex("alice");
        let out = lf
            .filter(col("name").str().contains(lit(pattern.clone()), false))
            .collect()
            .unwrap();
        assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);

        let lf2 = df!(
            "id" => &[1i32, 2, 3],
            "name" => &["alice", "bob", "carol"],
            "city" => &["NYC", "LA", "Boston"]
        )
        .unwrap()
        .lazy();
        let pat = fuzzy_token_regex("alice");
        let expr = col("name")
            .str()
            .contains(lit(pat.clone()), false)
            .or(col("city").str().contains(lit(pat), false));
        let out2 = lf2.clone().filter(expr).collect().unwrap();
        assert_eq!(out2.height(), 1);

        // Reconstruct the "OR across all string columns" expression the
        // real implementation uses.
        let schema = lf2.clone().collect_schema().unwrap();
        let string_cols: Vec<String> = schema
            .iter()
            .filter(|(_, dtype)| dtype.is_string())
            .map(|(name, _)| name.to_string())
            .collect();
        assert!(
            !string_cols.is_empty(),
            "df! string cols should be detected"
        );
        let pattern = fuzzy_token_regex("alice");
        let token_expr = string_cols
            .iter()
            .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
            .reduce(|a, b| a.or(b))
            .unwrap();
        let out3 = lf2.filter(token_expr).collect().unwrap();
        assert_eq!(
            out3.height(),
            1,
            "fuzzy_search-style filter should match 1 row"
        );
    }

    #[test]
    fn test_fuzzy_search_no_string_columns() {
        // With no string columns there is nothing to search: expect an error.
        let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
            .unwrap()
            .lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.fuzzy_search("x".to_string());
        assert!(state.error.is_some());
    }
}