1use color_eyre::Result;
2use std::borrow::Cow;
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fs, fs::File, path::Path};
6
7use polars::io::HiveOptions;
8use polars::prelude::*;
9use ratatui::{
10 buffer::Buffer,
11 layout::Rect,
12 style::{Color, Modifier, Style},
13 text::{Line, Span},
14 widgets::{
15 Block, Borders, Cell, Padding, Paragraph, Row, StatefulWidget, Table, TableState, Widget,
16 },
17};
18
19use crate::error_display::user_message_from_polars;
20use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
21use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
22use crate::query::parse_query;
23use crate::statistics::collect_lazy;
24use crate::{CompressionFormat, OpenOptions};
25use polars::lazy::frame::pivot::pivot_stable;
26use std::io::{BufReader, Read};
27
28use calamine::{open_workbook_auto, Data, Reader};
29use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
30use orc_rust::ArrowReaderBuilder;
31use tempfile::NamedTempFile;
32
33use arrow::array::types::{
34 Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
35 TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
36};
37use arrow::array::{Array, AsArray};
38use arrow::record_batch::RecordBatch;
39
40fn pivot_agg_expr(agg: PivotAggregation) -> Result<Expr> {
41 let e = col(PlSmallStr::from_static(""));
42 let expr = match agg {
43 PivotAggregation::Last => e.last(),
44 PivotAggregation::First => e.first(),
45 PivotAggregation::Min => e.min(),
46 PivotAggregation::Max => e.max(),
47 PivotAggregation::Avg => e.mean(),
48 PivotAggregation::Med => e.median(),
49 PivotAggregation::Std => e.std(1),
50 PivotAggregation::Count => e.len(),
51 };
52 Ok(expr)
53}
54
/// All state for one browsable data table: the lazy frame being viewed, the
/// buffered window of materialized rows, viewport/scroll positions, and the
/// currently-applied filter/sort/query/grouping transformations.
pub struct DataTableState {
    /// Frame with all current transformations (filters, sorts, queries) applied.
    pub lf: LazyFrame,
    /// The frame exactly as loaded; `reset` restores `lf` from this.
    original_lf: LazyFrame,
    /// Materialized rows for the current window, when collected.
    df: Option<DataFrame>,
    /// Materialized slice for the locked leading columns, when any are locked.
    locked_df: Option<DataFrame>,
    /// Ratatui table widget state (selection/offset).
    pub table_state: TableState,
    /// Index of the first visible row.
    pub start_row: usize,
    /// Number of rows that fit in the viewport.
    pub visible_rows: usize,
    /// Index of the first visible terminal column.
    pub termcol_index: usize,
    /// Number of columns that fit in the viewport.
    pub visible_termcols: usize,
    /// Last polars error, surfaced to the UI.
    pub error: Option<PolarsError>,
    /// When true, `error` is retained but not rendered.
    pub suppress_error_display: bool,
    /// Schema of `lf`.
    pub schema: Arc<Schema>,
    /// Total row count of `lf`; only meaningful when `num_rows_valid` is set.
    pub num_rows: usize,
    /// Whether `num_rows` is up to date.
    num_rows_valid: bool,
    /// Active filter statements.
    filters: Vec<FilterStatement>,
    /// Columns the frame is currently sorted by.
    sort_columns: Vec<String>,
    /// Sort direction shared by all `sort_columns`.
    sort_ascending: bool,
    /// Active column-query string (empty when none).
    pub active_query: String,
    /// Active SQL query string (empty when none).
    pub active_sql_query: String,
    /// Active fuzzy-search string (empty when none).
    pub active_fuzzy_query: String,
    /// Display order of columns.
    column_order: Vec<String>,
    /// How many leading columns are locked in place.
    locked_columns_count: usize,
    /// Grouped frame retained while drilled into a group, if any.
    grouped_lf: Option<LazyFrame>,
    /// Index of the group currently drilled into, if any.
    drilled_down_group_index: Option<usize>,
    /// Key values of the drilled-down group.
    pub drilled_down_group_key: Option<Vec<String>>,
    /// Column names making up the drill-down key.
    pub drilled_down_group_key_columns: Option<Vec<String>>,
    /// Pages to buffer ahead of the viewport (constructor default: 3).
    pages_lookahead: usize,
    /// Pages to keep buffered behind the viewport (constructor default: 3).
    pages_lookback: usize,
    /// Cap on buffered rows (constructor default: 100_000).
    max_buffered_rows: usize,
    /// Cap on buffer size in MB (constructor default: 512).
    max_buffered_mb: usize,
    /// First row held in `buffered_df`.
    buffered_start_row: usize,
    /// End of the row range held in `buffered_df`.
    buffered_end_row: usize,
    /// Buffered window of collected rows.
    buffered_df: Option<DataFrame>,
    // NOTE(review): presumably the distance from the buffer edge that triggers
    // a re-buffer — confirm against the buffering code (outside this chunk).
    proximity_threshold: usize,
    /// Whether to render a row-number column.
    row_numbers: bool,
    /// Number displayed for the first row (constructor default: 1).
    row_start_index: usize,
    /// Most recently applied pivot spec, if any.
    last_pivot_spec: Option<PivotSpec>,
    /// Most recently applied melt spec, if any.
    last_melt_spec: Option<MeltSpec>,
    /// Hive partition columns, when opened from a partitioned dataset.
    pub partition_columns: Option<Vec<String>>,
    /// Keeps a decompressed temp file alive for the lifetime of the scan.
    decompress_temp_file: Option<NamedTempFile>,
    /// Whether to use polars' streaming engine when collecting.
    pub polars_streaming: bool,
    // NOTE(review): appears to work around a polars pivot issue with date
    // index columns (defaults to true) — confirm exact semantics at use site.
    workaround_pivot_date_index: bool,
}
110
/// Column dtype inferred from the cells of one Excel column
/// (see `excel_infer_column_type`); drives `excel_column_to_series`.
#[derive(Clone, Copy)]
enum ExcelColType {
    Int64,
    Float64,
    Boolean,
    Utf8,
    /// All parsed datetimes fall at midnight — stored as a polars Date.
    Date,
    /// At least one non-midnight time component — stored as a Datetime.
    Datetime,
}
121
122impl DataTableState {
    /// Build a fresh table state around `lf`.
    ///
    /// Collects the schema eagerly (which may resolve the plan), derives the
    /// initial column display order from it, and initializes every other
    /// field to its default. The buffering knobs fall back to built-in
    /// defaults when `None`.
    pub fn new(
        lf: LazyFrame,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        polars_streaming: bool,
    ) -> Result<Self> {
        let schema = lf.clone().collect_schema()?;
        // Initial display order mirrors the schema order.
        let column_order: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            // Buffering defaults when the caller passes None.
            pages_lookahead: pages_lookahead.unwrap_or(3),
            pages_lookback: pages_lookback.unwrap_or(3),
            max_buffered_rows: max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: false,
            row_start_index: 1,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns: None,
            decompress_temp_file: None,
            polars_streaming,
            workaround_pivot_date_index: true,
        })
    }
178
179 pub fn from_lazyframe(lf: LazyFrame, options: &crate::OpenOptions) -> Result<Self> {
181 let mut state = Self::new(
182 lf,
183 options.pages_lookahead,
184 options.pages_lookback,
185 options.max_buffered_rows,
186 options.max_buffered_mb,
187 options.polars_streaming,
188 )?;
189 state.row_numbers = options.row_numbers;
190 state.row_start_index = options.row_start_index;
191 state.workaround_pivot_date_index = options.workaround_pivot_date_index;
192 Ok(state)
193 }
194
    /// Like `new`, but trusts a pre-computed `schema` instead of collecting
    /// it from the plan (useful when schema resolution is expensive, e.g.
    /// hive datasets), and records any known `partition_columns`.
    ///
    /// Partition columns are moved to the front of the display order; the
    /// remaining columns keep their schema order.
    pub fn from_schema_and_lazyframe(
        schema: Arc<Schema>,
        lf: LazyFrame,
        options: &crate::OpenOptions,
        partition_columns: Option<Vec<String>>,
    ) -> Result<Self> {
        let column_order: Vec<String> = if let Some(ref part) = partition_columns {
            let part_set: HashSet<&str> = part.iter().map(String::as_str).collect();
            // Non-partition columns, in schema order.
            let rest: Vec<String> = schema
                .iter_names()
                .map(|s| s.to_string())
                .filter(|c| !part_set.contains(c.as_str()))
                .collect();
            part.iter().cloned().chain(rest).collect()
        } else {
            schema.iter_names().map(|s| s.to_string()).collect()
        };
        Ok(Self {
            original_lf: lf.clone(),
            lf,
            df: None,
            locked_df: None,
            table_state: TableState::default(),
            start_row: 0,
            visible_rows: 0,
            termcol_index: 0,
            visible_termcols: 0,
            error: None,
            suppress_error_display: false,
            schema,
            num_rows: 0,
            num_rows_valid: false,
            filters: Vec::new(),
            sort_columns: Vec::new(),
            sort_ascending: true,
            active_query: String::new(),
            active_sql_query: String::new(),
            active_fuzzy_query: String::new(),
            column_order,
            locked_columns_count: 0,
            grouped_lf: None,
            drilled_down_group_index: None,
            drilled_down_group_key: None,
            drilled_down_group_key_columns: None,
            // Same buffering defaults as `new`.
            pages_lookahead: options.pages_lookahead.unwrap_or(3),
            pages_lookback: options.pages_lookback.unwrap_or(3),
            max_buffered_rows: options.max_buffered_rows.unwrap_or(100_000),
            max_buffered_mb: options.max_buffered_mb.unwrap_or(512),
            buffered_start_row: 0,
            buffered_end_row: 0,
            buffered_df: None,
            proximity_threshold: 0,
            row_numbers: options.row_numbers,
            row_start_index: options.row_start_index,
            last_pivot_spec: None,
            last_melt_spec: None,
            partition_columns,
            decompress_temp_file: None,
            polars_streaming: options.polars_streaming,
            workaround_pivot_date_index: options.workaround_pivot_date_index,
        })
    }
260
    /// Restore `lf` from `original_lf` and clear every transformation:
    /// filters, sorts, queries, column order/locking, grouping/drill-down
    /// state, the row buffer, and the scroll/selection position.
    ///
    /// Errors and pivot/melt history are intentionally left untouched — see
    /// `reset` for the full-reset variant.
    fn reset_lf_to_original(&mut self) {
        self.invalidate_num_rows();
        self.lf = self.original_lf.clone();
        // Schema failures are swallowed: fall back to an empty schema so the
        // UI can keep rendering rather than propagating an error here.
        self.schema = self
            .original_lf
            .clone()
            .collect_schema()
            .unwrap_or_else(|_| Arc::new(Schema::with_capacity(0)));
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.locked_columns_count = 0;
        self.filters.clear();
        self.sort_columns.clear();
        self.sort_ascending = true;
        self.start_row = 0;
        self.termcol_index = 0;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        // Invalidate the buffered window entirely.
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
    }
292
    /// Full reset: restore the original frame (see `reset_lf_to_original`),
    /// additionally clear errors and pivot/melt history, then re-collect.
    pub fn reset(&mut self) {
        self.reset_lf_to_original();
        self.error = None;
        self.suppress_error_display = false;
        self.last_pivot_spec = None;
        self.last_melt_spec = None;
        self.collect();
        // NOTE(review): start_row is already 0 after reset_lf_to_original, so
        // this branch looks redundant — confirm whether a different reset
        // (e.g. of the selection) was intended here.
        if self.num_rows > 0 {
            self.start_row = 0;
        }
    }
304
305 pub fn from_parquet(
306 path: &Path,
307 pages_lookahead: Option<usize>,
308 pages_lookback: Option<usize>,
309 max_buffered_rows: Option<usize>,
310 max_buffered_mb: Option<usize>,
311 row_numbers: bool,
312 row_start_index: usize,
313 ) -> Result<Self> {
314 let path_str = path.as_os_str().to_string_lossy();
315 let is_glob = path_str.contains('*');
316 let pl_path = PlPath::Local(Arc::from(path));
317 let args = ScanArgsParquet {
318 glob: is_glob,
319 ..Default::default()
320 };
321 let lf = LazyFrame::scan_parquet(pl_path, args)?;
322 let mut state = Self::new(
323 lf,
324 pages_lookahead,
325 pages_lookback,
326 max_buffered_rows,
327 max_buffered_mb,
328 true,
329 )?;
330 state.row_numbers = row_numbers;
331 state.row_start_index = row_start_index;
332 Ok(state)
333 }
334
335 pub fn from_parquet_paths(
337 paths: &[impl AsRef<Path>],
338 pages_lookahead: Option<usize>,
339 pages_lookback: Option<usize>,
340 max_buffered_rows: Option<usize>,
341 max_buffered_mb: Option<usize>,
342 row_numbers: bool,
343 row_start_index: usize,
344 ) -> Result<Self> {
345 if paths.is_empty() {
346 return Err(color_eyre::eyre::eyre!("No paths provided"));
347 }
348 if paths.len() == 1 {
349 return Self::from_parquet(
350 paths[0].as_ref(),
351 pages_lookahead,
352 pages_lookback,
353 max_buffered_rows,
354 max_buffered_mb,
355 row_numbers,
356 row_start_index,
357 );
358 }
359 let mut lazy_frames = Vec::with_capacity(paths.len());
360 for p in paths {
361 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
362 let lf = LazyFrame::scan_parquet(pl_path, Default::default())?;
363 lazy_frames.push(lf);
364 }
365 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
366 let mut state = Self::new(
367 lf,
368 pages_lookahead,
369 pages_lookback,
370 max_buffered_rows,
371 max_buffered_mb,
372 true,
373 )?;
374 state.row_numbers = row_numbers;
375 state.row_start_index = row_start_index;
376 Ok(state)
377 }
378
379 pub fn from_ipc(
381 path: &Path,
382 pages_lookahead: Option<usize>,
383 pages_lookback: Option<usize>,
384 max_buffered_rows: Option<usize>,
385 max_buffered_mb: Option<usize>,
386 row_numbers: bool,
387 row_start_index: usize,
388 ) -> Result<Self> {
389 let pl_path = PlPath::Local(Arc::from(path));
390 let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
391 let mut state = Self::new(
392 lf,
393 pages_lookahead,
394 pages_lookback,
395 max_buffered_rows,
396 max_buffered_mb,
397 true,
398 )?;
399 state.row_numbers = row_numbers;
400 state.row_start_index = row_start_index;
401 Ok(state)
402 }
403
404 pub fn from_ipc_paths(
406 paths: &[impl AsRef<Path>],
407 pages_lookahead: Option<usize>,
408 pages_lookback: Option<usize>,
409 max_buffered_rows: Option<usize>,
410 max_buffered_mb: Option<usize>,
411 row_numbers: bool,
412 row_start_index: usize,
413 ) -> Result<Self> {
414 if paths.is_empty() {
415 return Err(color_eyre::eyre::eyre!("No paths provided"));
416 }
417 if paths.len() == 1 {
418 return Self::from_ipc(
419 paths[0].as_ref(),
420 pages_lookahead,
421 pages_lookback,
422 max_buffered_rows,
423 max_buffered_mb,
424 row_numbers,
425 row_start_index,
426 );
427 }
428 let mut lazy_frames = Vec::with_capacity(paths.len());
429 for p in paths {
430 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
431 let lf = LazyFrame::scan_ipc(pl_path, Default::default(), Default::default())?;
432 lazy_frames.push(lf);
433 }
434 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
435 let mut state = Self::new(
436 lf,
437 pages_lookahead,
438 pages_lookback,
439 max_buffered_rows,
440 max_buffered_mb,
441 true,
442 )?;
443 state.row_numbers = row_numbers;
444 state.row_start_index = row_start_index;
445 Ok(state)
446 }
447
448 pub fn from_avro(
450 path: &Path,
451 pages_lookahead: Option<usize>,
452 pages_lookback: Option<usize>,
453 max_buffered_rows: Option<usize>,
454 max_buffered_mb: Option<usize>,
455 row_numbers: bool,
456 row_start_index: usize,
457 ) -> Result<Self> {
458 let file = File::open(path)?;
459 let df = polars::io::avro::AvroReader::new(file).finish()?;
460 let lf = df.lazy();
461 let mut state = Self::new(
462 lf,
463 pages_lookahead,
464 pages_lookback,
465 max_buffered_rows,
466 max_buffered_mb,
467 true,
468 )?;
469 state.row_numbers = row_numbers;
470 state.row_start_index = row_start_index;
471 Ok(state)
472 }
473
474 pub fn from_avro_paths(
476 paths: &[impl AsRef<Path>],
477 pages_lookahead: Option<usize>,
478 pages_lookback: Option<usize>,
479 max_buffered_rows: Option<usize>,
480 max_buffered_mb: Option<usize>,
481 row_numbers: bool,
482 row_start_index: usize,
483 ) -> Result<Self> {
484 if paths.is_empty() {
485 return Err(color_eyre::eyre::eyre!("No paths provided"));
486 }
487 if paths.len() == 1 {
488 return Self::from_avro(
489 paths[0].as_ref(),
490 pages_lookahead,
491 pages_lookback,
492 max_buffered_rows,
493 max_buffered_mb,
494 row_numbers,
495 row_start_index,
496 );
497 }
498 let mut lazy_frames = Vec::with_capacity(paths.len());
499 for p in paths {
500 let file = File::open(p.as_ref())?;
501 let df = polars::io::avro::AvroReader::new(file).finish()?;
502 lazy_frames.push(df.lazy());
503 }
504 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
505 let mut state = Self::new(
506 lf,
507 pages_lookahead,
508 pages_lookback,
509 max_buffered_rows,
510 max_buffered_mb,
511 true,
512 )?;
513 state.row_numbers = row_numbers;
514 state.row_start_index = row_start_index;
515 Ok(state)
516 }
517
    /// Open an Excel workbook and load one worksheet as an eager DataFrame.
    ///
    /// `excel_sheet` selects the worksheet by zero-based index when it parses
    /// as an integer, otherwise by name; `None` falls back to the first
    /// sheet. The first row supplies the headers; each column's dtype is
    /// inferred from its cells via `excel_infer_column_type`.
    #[allow(clippy::too_many_arguments)]
    pub fn from_excel(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
        excel_sheet: Option<&str>,
    ) -> Result<Self> {
        let mut workbook =
            open_workbook_auto(path).map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?;
        let sheet_names = workbook.sheet_names().to_vec();
        if sheet_names.is_empty() {
            return Err(color_eyre::eyre::eyre!("Excel file has no worksheets"));
        }
        // Numeric selector -> index lookup; anything else -> name lookup.
        let range = if let Some(sheet_sel) = excel_sheet {
            if let Ok(idx) = sheet_sel.parse::<usize>() {
                workbook
                    .worksheet_range_at(idx)
                    .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no sheet at index {}", idx))?
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            } else {
                workbook
                    .worksheet_range(sheet_sel)
                    .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
            }
        } else {
            workbook
                .worksheet_range_at(0)
                .ok_or_else(|| color_eyre::eyre::eyre!("Excel: no first sheet"))?
                .map_err(|e| color_eyre::eyre::eyre!("Excel: {}", e))?
        };
        let rows: Vec<Vec<Data>> = range.rows().map(|r| r.to_vec()).collect();
        // Empty sheet: present an empty frame rather than erroring.
        if rows.is_empty() {
            let empty_df = DataFrame::new(vec![])?;
            let mut state = Self::new(
                empty_df.lazy(),
                pages_lookahead,
                pages_lookback,
                max_buffered_rows,
                max_buffered_mb,
                true,
            )?;
            state.row_numbers = row_numbers;
            state.row_start_index = row_start_index;
            return Ok(state);
        }
        // Row 0 becomes the header; non-string header cells are stringified.
        let headers: Vec<String> = rows[0]
            .iter()
            .map(|c| calamine::DataType::as_string(c).unwrap_or_else(|| c.to_string()))
            .collect();
        let n_cols = headers.len();
        let mut series_vec = Vec::with_capacity(n_cols);
        for (col_idx, header) in headers.iter().enumerate() {
            // Ragged rows are tolerated: `row.get` yields None past the end.
            let col_cells: Vec<Option<&Data>> =
                rows[1..].iter().map(|row| row.get(col_idx)).collect();
            let inferred = Self::excel_infer_column_type(&col_cells);
            // Blank headers get a positional fallback name.
            let name = if header.is_empty() {
                format!("column_{}", col_idx + 1)
            } else {
                header.clone()
            };
            let series = Self::excel_column_to_series(name.as_str(), &col_cells, inferred)?;
            series_vec.push(series.into());
        }
        let df = DataFrame::new(series_vec)?;
        let mut state = Self::new(
            df.lazy(),
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            true,
        )?;
        state.row_numbers = row_numbers;
        state.row_start_index = row_start_index;
        Ok(state)
    }
600
    /// Infer a single dtype for an Excel column from its (optional) cells.
    ///
    /// A string cell normally forces Utf8 — unless at least one non-empty
    /// cell parses as a date/datetime and none fail, in which case the column
    /// becomes Date (all parsed times at midnight) or Datetime. Otherwise the
    /// precedence is int > datetime > float > bool, with an all-whole-number
    /// float column demoted to Int64.
    fn excel_infer_column_type(cells: &[Option<&Data>]) -> ExcelColType {
        use calamine::DataType as CalamineTrait;
        let mut has_string = false;
        let mut has_float = false;
        let mut has_int = false;
        let mut has_bool = false;
        let mut has_datetime = false;
        for cell in cells.iter().flatten() {
            if CalamineTrait::is_string(*cell) {
                // Strings dominate the scan; no need to look further.
                has_string = true;
                break;
            }
            // Datetime cells also raise the float flag, so the float
            // fallback below still covers them if needed.
            if CalamineTrait::is_float(*cell)
                || CalamineTrait::is_datetime(*cell)
                || CalamineTrait::is_datetime_iso(*cell)
            {
                has_float = true;
            }
            if CalamineTrait::is_int(*cell) {
                has_int = true;
            }
            if CalamineTrait::is_bool(*cell) {
                has_bool = true;
            }
            if CalamineTrait::is_datetime(*cell) || CalamineTrait::is_datetime_iso(*cell) {
                has_datetime = true;
            }
        }
        if has_string {
            // A text column may still be a date column in disguise: accept it
            // only if something parses and no non-empty cell fails to parse.
            let any_parsed = cells
                .iter()
                .flatten()
                .any(|c| Self::excel_cell_to_naive_datetime(c).is_some());
            let all_non_empty_parse = cells.iter().flatten().all(|c| {
                CalamineTrait::is_empty(*c) || Self::excel_cell_to_naive_datetime(c).is_some()
            });
            if any_parsed && all_non_empty_parse {
                if Self::excel_parsed_cells_all_midnight(cells) {
                    ExcelColType::Date
                } else {
                    ExcelColType::Datetime
                }
            } else {
                ExcelColType::Utf8
            }
        } else if has_int {
            ExcelColType::Int64
        } else if has_datetime {
            // Date-only columns (every time component midnight) become Date.
            if Self::excel_parsed_cells_all_midnight(cells) {
                ExcelColType::Date
            } else {
                ExcelColType::Datetime
            }
        } else if has_float {
            // Floats that are all whole numbers (within tolerance) read
            // better as integers.
            let all_whole = cells.iter().flatten().all(|cell| {
                cell.as_f64()
                    .is_none_or(|f| f.is_finite() && (f - f.trunc()).abs() < 1e-10)
            });
            if all_whole {
                ExcelColType::Int64
            } else {
                ExcelColType::Float64
            }
        } else if has_bool {
            ExcelColType::Boolean
        } else {
            // Empty or unrecognized column: default to strings.
            ExcelColType::Utf8
        }
    }
672
673 fn excel_parsed_cells_all_midnight(cells: &[Option<&Data>]) -> bool {
675 let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("valid time");
676 cells
677 .iter()
678 .flatten()
679 .filter_map(|c| Self::excel_cell_to_naive_datetime(c))
680 .all(|dt| dt.time() == midnight)
681 }
682
683 fn excel_cell_to_naive_datetime(cell: &Data) -> Option<NaiveDateTime> {
685 use calamine::DataType;
686 if let Some(dt) = cell.as_datetime() {
687 return Some(dt);
688 }
689 let s = cell.get_datetime_iso().or_else(|| cell.get_string())?;
690 Self::parse_naive_datetime_str(s)
691 }
692
693 fn parse_naive_datetime_str(s: &str) -> Option<NaiveDateTime> {
695 let s = s.trim();
696 if s.is_empty() {
697 return None;
698 }
699 const FORMATS: &[&str] = &[
700 "%Y-%m-%dT%H:%M:%S%.f",
701 "%Y-%m-%dT%H:%M:%S",
702 "%Y-%m-%d %H:%M:%S%.f",
703 "%Y-%m-%d %H:%M:%S",
704 "%Y-%m-%d",
705 ];
706 for fmt in FORMATS {
707 if let Ok(dt) = NaiveDateTime::parse_from_str(s, fmt) {
708 return Some(dt);
709 }
710 }
711 if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
712 return Some(d.and_hms_opt(0, 0, 0).expect("midnight"));
713 }
714 None
715 }
716
    /// Materialize one Excel column as a polars `Series` of `col_type`.
    ///
    /// Missing cells and cells that fail to convert become nulls. Dates are
    /// built as days since the Unix epoch and cast to `Date`; datetimes as
    /// microsecond timestamps cast to `Datetime(Microseconds, None)`.
    fn excel_column_to_series(
        name: &str,
        cells: &[Option<&Data>],
        col_type: ExcelColType,
    ) -> Result<Series> {
        use calamine::DataType as CalamineTrait;
        use polars::datatypes::TimeUnit;
        let series = match col_type {
            ExcelColType::Int64 => {
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_i64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Float64 => {
                let v: Vec<Option<f64>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_f64()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Boolean => {
                let v: Vec<Option<bool>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.get_bool()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Utf8 => {
                let v: Vec<Option<String>> = cells
                    .iter()
                    .map(|c| c.and_then(|cell| cell.as_string()))
                    .collect();
                Series::new(name.into(), v)
            }
            ExcelColType::Date => {
                // Days since 1970-01-01, matching polars' Date encoding.
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("valid date");
                let v: Vec<Option<i32>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| (dt.date() - epoch).num_days() as i32)
                    })
                    .collect();
                Series::new(name.into(), v).cast(&DataType::Date)?
            }
            ExcelColType::Datetime => {
                // Microseconds since the epoch (UTC), matching the cast below.
                let v: Vec<Option<i64>> = cells
                    .iter()
                    .map(|c| {
                        c.and_then(Self::excel_cell_to_naive_datetime)
                            .map(|dt| dt.and_utc().timestamp_micros())
                    })
                    .collect();
                Series::new(name.into(), v)
                    .cast(&DataType::Datetime(TimeUnit::Microseconds, None))?
            }
        };
        Ok(series)
    }
779
780 pub fn from_orc(
783 path: &Path,
784 pages_lookahead: Option<usize>,
785 pages_lookback: Option<usize>,
786 max_buffered_rows: Option<usize>,
787 max_buffered_mb: Option<usize>,
788 row_numbers: bool,
789 row_start_index: usize,
790 ) -> Result<Self> {
791 let file = File::open(path)?;
792 let reader = ArrowReaderBuilder::try_new(file)
793 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
794 .build();
795 let batches: Vec<RecordBatch> = reader
796 .collect::<std::result::Result<Vec<_>, _>>()
797 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
798 let df = Self::arrow_record_batches_to_dataframe(&batches)?;
799 let lf = df.lazy();
800 let mut state = Self::new(
801 lf,
802 pages_lookahead,
803 pages_lookback,
804 max_buffered_rows,
805 max_buffered_mb,
806 true,
807 )?;
808 state.row_numbers = row_numbers;
809 state.row_start_index = row_start_index;
810 Ok(state)
811 }
812
813 pub fn from_orc_paths(
815 paths: &[impl AsRef<Path>],
816 pages_lookahead: Option<usize>,
817 pages_lookback: Option<usize>,
818 max_buffered_rows: Option<usize>,
819 max_buffered_mb: Option<usize>,
820 row_numbers: bool,
821 row_start_index: usize,
822 ) -> Result<Self> {
823 if paths.is_empty() {
824 return Err(color_eyre::eyre::eyre!("No paths provided"));
825 }
826 if paths.len() == 1 {
827 return Self::from_orc(
828 paths[0].as_ref(),
829 pages_lookahead,
830 pages_lookback,
831 max_buffered_rows,
832 max_buffered_mb,
833 row_numbers,
834 row_start_index,
835 );
836 }
837 let mut lazy_frames = Vec::with_capacity(paths.len());
838 for p in paths {
839 let file = File::open(p.as_ref())?;
840 let reader = ArrowReaderBuilder::try_new(file)
841 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?
842 .build();
843 let batches: Vec<RecordBatch> = reader
844 .collect::<std::result::Result<Vec<_>, _>>()
845 .map_err(|e| color_eyre::eyre::eyre!("ORC: {}", e))?;
846 let df = Self::arrow_record_batches_to_dataframe(&batches)?;
847 lazy_frames.push(df.lazy());
848 }
849 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
850 let mut state = Self::new(
851 lf,
852 pages_lookahead,
853 pages_lookback,
854 max_buffered_rows,
855 max_buffered_mb,
856 true,
857 )?;
858 state.row_numbers = row_numbers;
859 state.row_start_index = row_start_index;
860 Ok(state)
861 }
862
863 fn arrow_record_batches_to_dataframe(batches: &[RecordBatch]) -> Result<DataFrame> {
866 if batches.is_empty() {
867 return Ok(DataFrame::new(vec![])?);
868 }
869 let mut all_dfs = Vec::with_capacity(batches.len());
870 for batch in batches {
871 let n_cols = batch.num_columns();
872 let schema = batch.schema();
873 let mut series_vec = Vec::with_capacity(n_cols);
874 for (i, col) in batch.columns().iter().enumerate() {
875 let name = schema.field(i).name().as_str();
876 let s = Self::arrow_array_to_polars_series(name, col)?;
877 series_vec.push(s.into());
878 }
879 let df = DataFrame::new(series_vec)?;
880 all_dfs.push(df);
881 }
882 let mut out = all_dfs.remove(0);
883 for df in all_dfs {
884 out = out.vstack(&df)?;
885 }
886 Ok(out)
887 }
888
889 fn arrow_array_to_polars_series(name: &str, array: &dyn Array) -> Result<Series> {
890 use arrow::datatypes::DataType as ArrowDataType;
891 let len = array.len();
892 match array.data_type() {
893 ArrowDataType::Int8 => {
894 let a = array
895 .as_primitive_opt::<Int8Type>()
896 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int8 array"))?;
897 let v: Vec<Option<i8>> = (0..len)
898 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
899 .collect();
900 Ok(Series::new(name.into(), v))
901 }
902 ArrowDataType::Int16 => {
903 let a = array
904 .as_primitive_opt::<Int16Type>()
905 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int16 array"))?;
906 let v: Vec<Option<i16>> = (0..len)
907 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
908 .collect();
909 Ok(Series::new(name.into(), v))
910 }
911 ArrowDataType::Int32 => {
912 let a = array
913 .as_primitive_opt::<Int32Type>()
914 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int32 array"))?;
915 let v: Vec<Option<i32>> = (0..len)
916 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
917 .collect();
918 Ok(Series::new(name.into(), v))
919 }
920 ArrowDataType::Int64 => {
921 let a = array
922 .as_primitive_opt::<Int64Type>()
923 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Int64 array"))?;
924 let v: Vec<Option<i64>> = (0..len)
925 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
926 .collect();
927 Ok(Series::new(name.into(), v))
928 }
929 ArrowDataType::UInt8 => {
930 let a = array
931 .as_primitive_opt::<UInt8Type>()
932 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt8 array"))?;
933 let v: Vec<Option<i64>> = (0..len)
934 .map(|i| {
935 if a.is_null(i) {
936 None
937 } else {
938 Some(a.value(i) as i64)
939 }
940 })
941 .collect();
942 Ok(Series::new(name.into(), v).cast(&DataType::UInt8)?)
943 }
944 ArrowDataType::UInt16 => {
945 let a = array
946 .as_primitive_opt::<UInt16Type>()
947 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt16 array"))?;
948 let v: Vec<Option<i64>> = (0..len)
949 .map(|i| {
950 if a.is_null(i) {
951 None
952 } else {
953 Some(a.value(i) as i64)
954 }
955 })
956 .collect();
957 Ok(Series::new(name.into(), v).cast(&DataType::UInt16)?)
958 }
959 ArrowDataType::UInt32 => {
960 let a = array
961 .as_primitive_opt::<UInt32Type>()
962 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt32 array"))?;
963 let v: Vec<Option<u32>> = (0..len)
964 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
965 .collect();
966 Ok(Series::new(name.into(), v))
967 }
968 ArrowDataType::UInt64 => {
969 let a = array
970 .as_primitive_opt::<UInt64Type>()
971 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected UInt64 array"))?;
972 let v: Vec<Option<u64>> = (0..len)
973 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
974 .collect();
975 Ok(Series::new(name.into(), v))
976 }
977 ArrowDataType::Float32 => {
978 let a = array
979 .as_primitive_opt::<Float32Type>()
980 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float32 array"))?;
981 let v: Vec<Option<f32>> = (0..len)
982 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
983 .collect();
984 Ok(Series::new(name.into(), v))
985 }
986 ArrowDataType::Float64 => {
987 let a = array
988 .as_primitive_opt::<Float64Type>()
989 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Float64 array"))?;
990 let v: Vec<Option<f64>> = (0..len)
991 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
992 .collect();
993 Ok(Series::new(name.into(), v))
994 }
995 ArrowDataType::Boolean => {
996 let a = array
997 .as_boolean_opt()
998 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Boolean array"))?;
999 let v: Vec<Option<bool>> = (0..len)
1000 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1001 .collect();
1002 Ok(Series::new(name.into(), v))
1003 }
1004 ArrowDataType::Utf8 => {
1005 let a = array
1006 .as_string_opt::<i32>()
1007 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Utf8 array"))?;
1008 let v: Vec<Option<String>> = (0..len)
1009 .map(|i| {
1010 if a.is_null(i) {
1011 None
1012 } else {
1013 Some(a.value(i).to_string())
1014 }
1015 })
1016 .collect();
1017 Ok(Series::new(name.into(), v))
1018 }
1019 ArrowDataType::LargeUtf8 => {
1020 let a = array
1021 .as_string_opt::<i64>()
1022 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected LargeUtf8 array"))?;
1023 let v: Vec<Option<String>> = (0..len)
1024 .map(|i| {
1025 if a.is_null(i) {
1026 None
1027 } else {
1028 Some(a.value(i).to_string())
1029 }
1030 })
1031 .collect();
1032 Ok(Series::new(name.into(), v))
1033 }
1034 ArrowDataType::Date32 => {
1035 let a = array
1036 .as_primitive_opt::<Date32Type>()
1037 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date32 array"))?;
1038 let v: Vec<Option<i32>> = (0..len)
1039 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1040 .collect();
1041 Ok(Series::new(name.into(), v))
1042 }
1043 ArrowDataType::Date64 => {
1044 let a = array
1045 .as_primitive_opt::<Date64Type>()
1046 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Date64 array"))?;
1047 let v: Vec<Option<i64>> = (0..len)
1048 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1049 .collect();
1050 Ok(Series::new(name.into(), v))
1051 }
1052 ArrowDataType::Timestamp(_, _) => {
1053 let a = array
1054 .as_primitive_opt::<TimestampMillisecondType>()
1055 .ok_or_else(|| color_eyre::eyre::eyre!("ORC: expected Timestamp array"))?;
1056 let v: Vec<Option<i64>> = (0..len)
1057 .map(|i| if a.is_null(i) { None } else { Some(a.value(i)) })
1058 .collect();
1059 Ok(Series::new(name.into(), v))
1060 }
1061 other => Err(color_eyre::eyre::eyre!(
1062 "ORC: unsupported column type {:?} for column '{}'",
1063 other,
1064 name
1065 )),
1066 }
1067 }
1068
1069 pub fn scan_parquet_hive(path: &Path) -> Result<LazyFrame> {
1072 let path_str = path.as_os_str().to_string_lossy();
1073 let is_glob = path_str.contains('*');
1074 let pl_path = PlPath::Local(Arc::from(path));
1075 let args = ScanArgsParquet {
1076 hive_options: HiveOptions::new_enabled(),
1077 glob: is_glob,
1078 ..Default::default()
1079 };
1080 LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1081 }
1082
1083 pub fn scan_parquet_hive_with_schema(path: &Path, schema: Arc<Schema>) -> Result<LazyFrame> {
1085 let path_str = path.as_os_str().to_string_lossy();
1086 let is_glob = path_str.contains('*');
1087 let pl_path = PlPath::Local(Arc::from(path));
1088 let args = ScanArgsParquet {
1089 schema: Some(schema),
1090 hive_options: HiveOptions::new_enabled(),
1091 glob: is_glob,
1092 ..Default::default()
1093 };
1094 LazyFrame::scan_parquet(pl_path, args).map_err(Into::into)
1095 }
1096
1097 fn first_parquet_file_in_hive_dir(path: &Path) -> Option<std::path::PathBuf> {
1100 const MAX_DEPTH: usize = 64;
1101 Self::first_parquet_file_spine(path, 0, MAX_DEPTH)
1102 }
1103
1104 fn first_parquet_file_spine(
1105 path: &Path,
1106 depth: usize,
1107 max_depth: usize,
1108 ) -> Option<std::path::PathBuf> {
1109 if depth >= max_depth {
1110 return None;
1111 }
1112 let entries = fs::read_dir(path).ok()?;
1113 let mut first_partition_child: Option<std::path::PathBuf> = None;
1114 for entry in entries.flatten() {
1115 let child = entry.path();
1116 if child.is_file() {
1117 if child
1118 .extension()
1119 .is_some_and(|e| e.eq_ignore_ascii_case("parquet"))
1120 {
1121 return Some(child);
1122 }
1123 } else if child.is_dir() {
1124 if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1125 if name.contains('=') && first_partition_child.is_none() {
1126 first_partition_child = Some(child);
1127 }
1128 }
1129 }
1130 }
1131 first_partition_child.and_then(|p| Self::first_parquet_file_spine(&p, depth + 1, max_depth))
1132 }
1133
1134 fn read_schema_from_single_parquet(path: &Path) -> Result<Arc<Schema>> {
1136 let file = File::open(path)?;
1137 let mut reader = ParquetReader::new(file);
1138 let arrow_schema = reader.schema()?;
1139 let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
1140 Ok(Arc::new(schema))
1141 }
1142
1143 pub fn schema_from_one_hive_parquet(path: &Path) -> Result<(Arc<Schema>, Vec<String>)> {
1147 let partition_columns = Self::discover_hive_partition_columns(path);
1148 let one_file = Self::first_parquet_file_in_hive_dir(path)
1149 .ok_or_else(|| color_eyre::eyre::eyre!("No parquet file found in hive directory"))?;
1150 let file_schema = Self::read_schema_from_single_parquet(&one_file)?;
1151 let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1152 let mut merged = Schema::with_capacity(partition_columns.len() + file_schema.len());
1153 for name in &partition_columns {
1154 merged.with_column(name.clone().into(), DataType::String);
1155 }
1156 for (name, dtype) in file_schema.iter() {
1157 if !part_set.contains(name.as_str()) {
1158 merged.with_column(name.clone(), dtype.clone());
1159 }
1160 }
1161 Ok((Arc::new(merged), partition_columns))
1162 }
1163
1164 pub fn discover_hive_partition_columns(path: &Path) -> Vec<String> {
1166 if path.is_dir() {
1167 Self::discover_partition_columns_from_path(path)
1168 } else {
1169 Self::discover_partition_columns_from_glob_pattern(path)
1170 }
1171 }
1172
1173 fn discover_partition_columns_from_path(path: &Path) -> Vec<String> {
1178 const MAX_PARTITION_DEPTH: usize = 64;
1179 let mut columns = Vec::<String>::new();
1180 let mut seen = HashSet::<String>::new();
1181 Self::discover_partition_columns_spine(
1182 path,
1183 &mut columns,
1184 &mut seen,
1185 0,
1186 MAX_PARTITION_DEPTH,
1187 );
1188 columns
1189 }
1190
1191 fn discover_partition_columns_spine(
1195 path: &Path,
1196 columns: &mut Vec<String>,
1197 seen: &mut HashSet<String>,
1198 depth: usize,
1199 max_depth: usize,
1200 ) {
1201 if depth >= max_depth {
1202 return;
1203 }
1204 let Ok(entries) = fs::read_dir(path) else {
1205 return;
1206 };
1207 let mut first_partition_child: Option<std::path::PathBuf> = None;
1208 for entry in entries.flatten() {
1209 let child = entry.path();
1210 if child.is_dir() {
1211 if let Some(name) = child.file_name().and_then(|n| n.to_str()) {
1212 if let Some((key, _)) = name.split_once('=') {
1213 if !key.is_empty() && seen.insert(key.to_string()) {
1214 columns.push(key.to_string());
1215 }
1216 if first_partition_child.is_none() {
1217 first_partition_child = Some(child);
1218 }
1219 break;
1220 }
1221 }
1222 }
1223 }
1224 if let Some(one) = first_partition_child {
1225 Self::discover_partition_columns_spine(&one, columns, seen, depth + 1, max_depth);
1226 }
1227 }
1228
1229 fn discover_partition_columns_from_glob_pattern(path: &Path) -> Vec<String> {
1231 let path_str = path.as_os_str().to_string_lossy();
1232 let mut columns = Vec::<String>::new();
1233 let mut seen = HashSet::<String>::new();
1234 for segment in path_str.split('/') {
1235 if let Some((key, rest)) = segment.split_once('=') {
1236 if !key.is_empty()
1237 && (rest == "*" || !rest.contains('*'))
1238 && seen.insert(key.to_string())
1239 {
1240 columns.push(key.to_string());
1241 }
1242 }
1243 }
1244 columns
1245 }
1246
1247 pub fn from_parquet_hive(
1256 path: &Path,
1257 pages_lookahead: Option<usize>,
1258 pages_lookback: Option<usize>,
1259 max_buffered_rows: Option<usize>,
1260 max_buffered_mb: Option<usize>,
1261 row_numbers: bool,
1262 row_start_index: usize,
1263 ) -> Result<Self> {
1264 let path_str = path.as_os_str().to_string_lossy();
1265 let is_glob = path_str.contains('*');
1266 let pl_path = PlPath::Local(Arc::from(path));
1267 let args = ScanArgsParquet {
1268 hive_options: HiveOptions::new_enabled(),
1269 glob: is_glob,
1270 ..Default::default()
1271 };
1272 let mut lf = LazyFrame::scan_parquet(pl_path, args)?;
1273 let schema = lf.collect_schema()?;
1274
1275 let mut discovered = if path.is_dir() {
1276 Self::discover_partition_columns_from_path(path)
1277 } else {
1278 Self::discover_partition_columns_from_glob_pattern(path)
1279 };
1280
1281 if discovered.is_empty() {
1284 let mut dir = path;
1285 while !dir.is_dir() {
1286 match dir.parent() {
1287 Some(p) => dir = p,
1288 None => break,
1289 }
1290 }
1291 if dir.is_dir() {
1292 discovered = Self::discover_partition_columns_from_path(dir);
1293 }
1294 }
1295
1296 let partition_columns: Vec<String> = discovered
1297 .into_iter()
1298 .filter(|c| schema.contains(c.as_str()))
1299 .collect();
1300
1301 let new_order: Vec<String> = if partition_columns.is_empty() {
1302 schema.iter_names().map(|s| s.to_string()).collect()
1303 } else {
1304 let part_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect();
1305 let all_names: Vec<String> = schema.iter_names().map(|s| s.to_string()).collect();
1306 let rest: Vec<String> = all_names
1307 .into_iter()
1308 .filter(|c| !part_set.contains(c.as_str()))
1309 .collect();
1310 partition_columns.iter().cloned().chain(rest).collect()
1311 };
1312
1313 if !partition_columns.is_empty() {
1314 let exprs: Vec<Expr> = new_order.iter().map(|s| col(s.as_str())).collect();
1315 lf = lf.select(exprs);
1316 }
1317
1318 let mut state = Self::new(
1319 lf,
1320 pages_lookahead,
1321 pages_lookback,
1322 max_buffered_rows,
1323 max_buffered_mb,
1324 true,
1325 )?;
1326 state.row_numbers = row_numbers;
1327 state.row_start_index = row_start_index;
1328 state.partition_columns = if partition_columns.is_empty() {
1329 None
1330 } else {
1331 Some(partition_columns)
1332 };
1333 state.set_column_order(new_order);
1335 Ok(state)
1336 }
1337
    /// Enable or disable the row-number column.
    pub fn set_row_numbers(&mut self, enabled: bool) {
        self.row_numbers = enabled;
    }
1341
    /// Flip the row-number column on/off.
    pub fn toggle_row_numbers(&mut self) {
        self.row_numbers = !self.row_numbers;
    }
1345
    /// Index shown for the first row (e.g. 0- or 1-based numbering).
    pub fn row_start_index(&self) -> usize {
        self.row_start_index
    }
1350
1351 fn decompress_compressed_csv_to_temp(
1353 path: &Path,
1354 compression: CompressionFormat,
1355 temp_dir: &Path,
1356 ) -> Result<NamedTempFile> {
1357 let mut temp = NamedTempFile::new_in(temp_dir)?;
1358 let out = temp.as_file_mut();
1359 let mut reader: Box<dyn Read> = match compression {
1360 CompressionFormat::Gzip => {
1361 let f = File::open(path)?;
1362 Box::new(flate2::read::GzDecoder::new(BufReader::new(f)))
1363 }
1364 CompressionFormat::Zstd => {
1365 let f = File::open(path)?;
1366 Box::new(zstd::Decoder::new(BufReader::new(f))?)
1367 }
1368 CompressionFormat::Bzip2 => {
1369 let f = File::open(path)?;
1370 Box::new(bzip2::read::BzDecoder::new(BufReader::new(f)))
1371 }
1372 CompressionFormat::Xz => {
1373 let f = File::open(path)?;
1374 Box::new(xz2::read::XzDecoder::new(BufReader::new(f)))
1375 }
1376 };
1377 std::io::copy(&mut reader, out)?;
1378 out.sync_all()?;
1379 Ok(temp)
1380 }
1381
1382 pub fn from_csv(path: &Path, options: &OpenOptions) -> Result<Self> {
1383 let compression = options
1385 .compression
1386 .or_else(|| CompressionFormat::from_extension(path));
1387
1388 if let Some(compression) = compression {
1389 if options.decompress_in_memory {
1390 match compression {
1392 CompressionFormat::Gzip | CompressionFormat::Zstd => {
1393 let mut read_options = CsvReadOptions::default();
1394 if let Some(skip_lines) = options.skip_lines {
1395 read_options.skip_lines = skip_lines;
1396 }
1397 if let Some(skip_rows) = options.skip_rows {
1398 read_options.skip_rows = skip_rows;
1399 }
1400 if let Some(has_header) = options.has_header {
1401 read_options.has_header = has_header;
1402 }
1403 read_options = read_options.map_parse_options(|opts| {
1404 opts.with_try_parse_dates(options.parse_dates)
1405 });
1406 let df = read_options
1407 .try_into_reader_with_file_path(Some(path.into()))?
1408 .finish()?;
1409 let lf = df.lazy();
1410 let mut state = Self::new(
1411 lf,
1412 options.pages_lookahead,
1413 options.pages_lookback,
1414 options.max_buffered_rows,
1415 options.max_buffered_mb,
1416 options.polars_streaming,
1417 )?;
1418 state.row_numbers = options.row_numbers;
1419 state.row_start_index = options.row_start_index;
1420 Ok(state)
1421 }
1422 CompressionFormat::Bzip2 => {
1423 let file = File::open(path)?;
1424 let mut decoder = bzip2::read::BzDecoder::new(BufReader::new(file));
1425 let mut decompressed = Vec::new();
1426 decoder.read_to_end(&mut decompressed)?;
1427 let mut read_options = CsvReadOptions::default();
1428 if let Some(skip_lines) = options.skip_lines {
1429 read_options.skip_lines = skip_lines;
1430 }
1431 if let Some(skip_rows) = options.skip_rows {
1432 read_options.skip_rows = skip_rows;
1433 }
1434 if let Some(has_header) = options.has_header {
1435 read_options.has_header = has_header;
1436 }
1437 read_options = read_options.map_parse_options(|opts| {
1438 opts.with_try_parse_dates(options.parse_dates)
1439 });
1440 let df = CsvReader::new(std::io::Cursor::new(decompressed))
1441 .with_options(read_options)
1442 .finish()?;
1443 let lf = df.lazy();
1444 let mut state = Self::new(
1445 lf,
1446 options.pages_lookahead,
1447 options.pages_lookback,
1448 options.max_buffered_rows,
1449 options.max_buffered_mb,
1450 options.polars_streaming,
1451 )?;
1452 state.row_numbers = options.row_numbers;
1453 state.row_start_index = options.row_start_index;
1454 Ok(state)
1455 }
1456 CompressionFormat::Xz => {
1457 let file = File::open(path)?;
1458 let mut decoder = xz2::read::XzDecoder::new(BufReader::new(file));
1459 let mut decompressed = Vec::new();
1460 decoder.read_to_end(&mut decompressed)?;
1461 let mut read_options = CsvReadOptions::default();
1462 if let Some(skip_lines) = options.skip_lines {
1463 read_options.skip_lines = skip_lines;
1464 }
1465 if let Some(skip_rows) = options.skip_rows {
1466 read_options.skip_rows = skip_rows;
1467 }
1468 if let Some(has_header) = options.has_header {
1469 read_options.has_header = has_header;
1470 }
1471 read_options = read_options.map_parse_options(|opts| {
1472 opts.with_try_parse_dates(options.parse_dates)
1473 });
1474 let df = CsvReader::new(std::io::Cursor::new(decompressed))
1475 .with_options(read_options)
1476 .finish()?;
1477 let lf = df.lazy();
1478 let mut state = Self::new(
1479 lf,
1480 options.pages_lookahead,
1481 options.pages_lookback,
1482 options.max_buffered_rows,
1483 options.max_buffered_mb,
1484 options.polars_streaming,
1485 )?;
1486 state.row_numbers = options.row_numbers;
1487 state.row_start_index = options.row_start_index;
1488 Ok(state)
1489 }
1490 }
1491 } else {
1492 let temp_dir = options.temp_dir.clone().unwrap_or_else(std::env::temp_dir);
1494 let temp = Self::decompress_compressed_csv_to_temp(path, compression, &temp_dir)?;
1495 let mut state = Self::from_csv_customize(
1496 temp.path(),
1497 options.pages_lookahead,
1498 options.pages_lookback,
1499 options.max_buffered_rows,
1500 options.max_buffered_mb,
1501 |mut reader| {
1502 if let Some(skip_lines) = options.skip_lines {
1503 reader = reader.with_skip_lines(skip_lines);
1504 }
1505 if let Some(skip_rows) = options.skip_rows {
1506 reader = reader.with_skip_rows(skip_rows);
1507 }
1508 if let Some(has_header) = options.has_header {
1509 reader = reader.with_has_header(has_header);
1510 }
1511 reader = reader.with_try_parse_dates(options.parse_dates);
1512 reader
1513 },
1514 )?;
1515 state.row_numbers = options.row_numbers;
1516 state.row_start_index = options.row_start_index;
1517 state.decompress_temp_file = Some(temp);
1518 Ok(state)
1519 }
1520 } else {
1521 let mut state = Self::from_csv_customize(
1523 path,
1524 options.pages_lookahead,
1525 options.pages_lookback,
1526 options.max_buffered_rows,
1527 options.max_buffered_mb,
1528 |mut reader| {
1529 if let Some(skip_lines) = options.skip_lines {
1530 reader = reader.with_skip_lines(skip_lines);
1531 }
1532 if let Some(skip_rows) = options.skip_rows {
1533 reader = reader.with_skip_rows(skip_rows);
1534 }
1535 if let Some(has_header) = options.has_header {
1536 reader = reader.with_has_header(has_header);
1537 }
1538 reader = reader.with_try_parse_dates(options.parse_dates);
1539 reader
1540 },
1541 )?;
1542 state.row_numbers = options.row_numbers;
1543 Ok(state)
1544 }
1545 }
1546
1547 pub fn from_csv_customize<F>(
1548 path: &Path,
1549 pages_lookahead: Option<usize>,
1550 pages_lookback: Option<usize>,
1551 max_buffered_rows: Option<usize>,
1552 max_buffered_mb: Option<usize>,
1553 func: F,
1554 ) -> Result<Self>
1555 where
1556 F: FnOnce(LazyCsvReader) -> LazyCsvReader,
1557 {
1558 let pl_path = PlPath::Local(Arc::from(path));
1559 let reader = LazyCsvReader::new(pl_path);
1560 let lf = func(reader).finish()?;
1561 Self::new(
1562 lf,
1563 pages_lookahead,
1564 pages_lookback,
1565 max_buffered_rows,
1566 max_buffered_mb,
1567 true,
1568 )
1569 }
1570
1571 pub fn from_csv_paths(paths: &[impl AsRef<Path>], options: &OpenOptions) -> Result<Self> {
1573 if paths.is_empty() {
1574 return Err(color_eyre::eyre::eyre!("No paths provided"));
1575 }
1576 if paths.len() == 1 {
1577 return Self::from_csv(paths[0].as_ref(), options);
1578 }
1579 let mut lazy_frames = Vec::with_capacity(paths.len());
1580 for p in paths {
1581 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1582 let mut reader = LazyCsvReader::new(pl_path);
1583 if let Some(skip_lines) = options.skip_lines {
1584 reader = reader.with_skip_lines(skip_lines);
1585 }
1586 if let Some(skip_rows) = options.skip_rows {
1587 reader = reader.with_skip_rows(skip_rows);
1588 }
1589 if let Some(has_header) = options.has_header {
1590 reader = reader.with_has_header(has_header);
1591 }
1592 reader = reader.with_try_parse_dates(options.parse_dates);
1593 let lf = reader.finish()?;
1594 lazy_frames.push(lf);
1595 }
1596 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1597 let mut state = Self::new(
1598 lf,
1599 options.pages_lookahead,
1600 options.pages_lookback,
1601 options.max_buffered_rows,
1602 options.max_buffered_mb,
1603 options.polars_streaming,
1604 )?;
1605 state.row_numbers = options.row_numbers;
1606 state.row_start_index = options.row_start_index;
1607 Ok(state)
1608 }
1609
1610 pub fn from_ndjson(
1611 path: &Path,
1612 pages_lookahead: Option<usize>,
1613 pages_lookback: Option<usize>,
1614 max_buffered_rows: Option<usize>,
1615 max_buffered_mb: Option<usize>,
1616 row_numbers: bool,
1617 row_start_index: usize,
1618 ) -> Result<Self> {
1619 let pl_path = PlPath::Local(Arc::from(path));
1620 let lf = LazyJsonLineReader::new(pl_path).finish()?;
1621 let mut state = Self::new(
1622 lf,
1623 pages_lookahead,
1624 pages_lookback,
1625 max_buffered_rows,
1626 max_buffered_mb,
1627 true,
1628 )?;
1629 state.row_numbers = row_numbers;
1630 state.row_start_index = row_start_index;
1631 Ok(state)
1632 }
1633
1634 pub fn from_ndjson_paths(
1636 paths: &[impl AsRef<Path>],
1637 pages_lookahead: Option<usize>,
1638 pages_lookback: Option<usize>,
1639 max_buffered_rows: Option<usize>,
1640 max_buffered_mb: Option<usize>,
1641 row_numbers: bool,
1642 row_start_index: usize,
1643 ) -> Result<Self> {
1644 if paths.is_empty() {
1645 return Err(color_eyre::eyre::eyre!("No paths provided"));
1646 }
1647 if paths.len() == 1 {
1648 return Self::from_ndjson(
1649 paths[0].as_ref(),
1650 pages_lookahead,
1651 pages_lookback,
1652 max_buffered_rows,
1653 max_buffered_mb,
1654 row_numbers,
1655 row_start_index,
1656 );
1657 }
1658 let mut lazy_frames = Vec::with_capacity(paths.len());
1659 for p in paths {
1660 let pl_path = PlPath::Local(Arc::from(p.as_ref()));
1661 let lf = LazyJsonLineReader::new(pl_path).finish()?;
1662 lazy_frames.push(lf);
1663 }
1664 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1665 let mut state = Self::new(
1666 lf,
1667 pages_lookahead,
1668 pages_lookback,
1669 max_buffered_rows,
1670 max_buffered_mb,
1671 true,
1672 )?;
1673 state.row_numbers = row_numbers;
1674 state.row_start_index = row_start_index;
1675 Ok(state)
1676 }
1677
    /// Open a standard JSON document as a table.
    ///
    /// Thin wrapper over `from_json_with_format` with `JsonFormat::Json`;
    /// the file is read eagerly (there is no lazy JSON scanner).
    pub fn from_json(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::Json,
        )
    }
1698
    /// Open a JSON-lines file as a table.
    ///
    /// Thin wrapper over `from_json_with_format` with `JsonFormat::JsonLines`;
    /// the file is read eagerly (unlike `from_ndjson`, which scans lazily).
    pub fn from_json_lines(
        path: &Path,
        pages_lookahead: Option<usize>,
        pages_lookback: Option<usize>,
        max_buffered_rows: Option<usize>,
        max_buffered_mb: Option<usize>,
        row_numbers: bool,
        row_start_index: usize,
    ) -> Result<Self> {
        Self::from_json_with_format(
            path,
            pages_lookahead,
            pages_lookback,
            max_buffered_rows,
            max_buffered_mb,
            row_numbers,
            row_start_index,
            JsonFormat::JsonLines,
        )
    }
1719
1720 #[allow(clippy::too_many_arguments)]
1721 fn from_json_with_format(
1722 path: &Path,
1723 pages_lookahead: Option<usize>,
1724 pages_lookback: Option<usize>,
1725 max_buffered_rows: Option<usize>,
1726 max_buffered_mb: Option<usize>,
1727 row_numbers: bool,
1728 row_start_index: usize,
1729 format: JsonFormat,
1730 ) -> Result<Self> {
1731 let file = File::open(path)?;
1732 let lf = JsonReader::new(file)
1733 .with_json_format(format)
1734 .finish()?
1735 .lazy();
1736 let mut state = Self::new(
1737 lf,
1738 pages_lookahead,
1739 pages_lookback,
1740 max_buffered_rows,
1741 max_buffered_mb,
1742 true,
1743 )?;
1744 state.row_numbers = row_numbers;
1745 state.row_start_index = row_start_index;
1746 Ok(state)
1747 }
1748
1749 pub fn from_json_paths(
1751 paths: &[impl AsRef<Path>],
1752 pages_lookahead: Option<usize>,
1753 pages_lookback: Option<usize>,
1754 max_buffered_rows: Option<usize>,
1755 max_buffered_mb: Option<usize>,
1756 row_numbers: bool,
1757 row_start_index: usize,
1758 ) -> Result<Self> {
1759 Self::from_json_with_format_paths(
1760 paths,
1761 pages_lookahead,
1762 pages_lookback,
1763 max_buffered_rows,
1764 max_buffered_mb,
1765 row_numbers,
1766 row_start_index,
1767 JsonFormat::Json,
1768 )
1769 }
1770
1771 pub fn from_json_lines_paths(
1773 paths: &[impl AsRef<Path>],
1774 pages_lookahead: Option<usize>,
1775 pages_lookback: Option<usize>,
1776 max_buffered_rows: Option<usize>,
1777 max_buffered_mb: Option<usize>,
1778 row_numbers: bool,
1779 row_start_index: usize,
1780 ) -> Result<Self> {
1781 Self::from_json_with_format_paths(
1782 paths,
1783 pages_lookahead,
1784 pages_lookback,
1785 max_buffered_rows,
1786 max_buffered_mb,
1787 row_numbers,
1788 row_start_index,
1789 JsonFormat::JsonLines,
1790 )
1791 }
1792
1793 #[allow(clippy::too_many_arguments)]
1794 fn from_json_with_format_paths(
1795 paths: &[impl AsRef<Path>],
1796 pages_lookahead: Option<usize>,
1797 pages_lookback: Option<usize>,
1798 max_buffered_rows: Option<usize>,
1799 max_buffered_mb: Option<usize>,
1800 row_numbers: bool,
1801 row_start_index: usize,
1802 format: JsonFormat,
1803 ) -> Result<Self> {
1804 if paths.is_empty() {
1805 return Err(color_eyre::eyre::eyre!("No paths provided"));
1806 }
1807 if paths.len() == 1 {
1808 return Self::from_json_with_format(
1809 paths[0].as_ref(),
1810 pages_lookahead,
1811 pages_lookback,
1812 max_buffered_rows,
1813 max_buffered_mb,
1814 row_numbers,
1815 row_start_index,
1816 format,
1817 );
1818 }
1819 let mut lazy_frames = Vec::with_capacity(paths.len());
1820 for p in paths {
1821 let file = File::open(p.as_ref())?;
1822 let lf = match &format {
1823 JsonFormat::Json => JsonReader::new(file)
1824 .with_json_format(JsonFormat::Json)
1825 .finish()?
1826 .lazy(),
1827 JsonFormat::JsonLines => JsonReader::new(file)
1828 .with_json_format(JsonFormat::JsonLines)
1829 .finish()?
1830 .lazy(),
1831 };
1832 lazy_frames.push(lf);
1833 }
1834 let lf = polars::prelude::concat(lazy_frames.as_slice(), Default::default())?;
1835 let mut state = Self::new(
1836 lf,
1837 pages_lookahead,
1838 pages_lookback,
1839 max_buffered_rows,
1840 max_buffered_mb,
1841 true,
1842 )?;
1843 state.row_numbers = row_numbers;
1844 state.row_start_index = row_start_index;
1845 Ok(state)
1846 }
1847
1848 #[allow(clippy::too_many_arguments)]
1849 pub fn from_delimited(
1850 path: &Path,
1851 delimiter: u8,
1852 pages_lookahead: Option<usize>,
1853 pages_lookback: Option<usize>,
1854 max_buffered_rows: Option<usize>,
1855 max_buffered_mb: Option<usize>,
1856 row_numbers: bool,
1857 row_start_index: usize,
1858 ) -> Result<Self> {
1859 let pl_path = PlPath::Local(Arc::from(path));
1860 let reader = LazyCsvReader::new(pl_path).with_separator(delimiter);
1861 let lf = reader.finish()?;
1862 let mut state = Self::new(
1863 lf,
1864 pages_lookahead,
1865 pages_lookback,
1866 max_buffered_rows,
1867 max_buffered_mb,
1868 true,
1869 )?;
1870 state.row_numbers = row_numbers;
1871 state.row_start_index = row_start_index;
1872 Ok(state)
1873 }
1874
1875 pub fn scroll_would_trigger_collect(&self, rows: i64) -> bool {
1878 if rows < 0 && self.start_row == 0 {
1879 return false;
1880 }
1881 let new_start_row = if self.start_row as i64 + rows <= 0 {
1882 0
1883 } else {
1884 if let Some(df) = self.df.as_ref() {
1885 if rows > 0 && df.shape().0 <= self.visible_rows {
1886 return false;
1887 }
1888 }
1889 (self.start_row as i64 + rows) as usize
1890 };
1891 let view_end = new_start_row
1892 + self
1893 .visible_rows
1894 .min(self.num_rows.saturating_sub(new_start_row));
1895 let within_buffer = new_start_row >= self.buffered_start_row
1896 && view_end <= self.buffered_end_row
1897 && self.buffered_end_row > 0;
1898 !within_buffer
1899 }
1900
1901 fn slide_table(&mut self, rows: i64) {
1902 if rows < 0 && self.start_row == 0 {
1903 return;
1904 }
1905
1906 let new_start_row = if self.start_row as i64 + rows <= 0 {
1907 0
1908 } else {
1909 if let Some(df) = self.df.as_ref() {
1910 if rows > 0 && df.shape().0 <= self.visible_rows {
1911 return;
1912 }
1913 }
1914 (self.start_row as i64 + rows) as usize
1915 };
1916
1917 let view_end = new_start_row
1919 + self
1920 .visible_rows
1921 .min(self.num_rows.saturating_sub(new_start_row));
1922 let within_buffer = new_start_row >= self.buffered_start_row
1923 && view_end <= self.buffered_end_row
1924 && self.buffered_end_row > 0;
1925
1926 if within_buffer {
1927 self.start_row = new_start_row;
1928 return;
1929 }
1930
1931 self.start_row = new_start_row;
1932 self.collect();
1933 }
1934
1935 pub fn collect(&mut self) {
1936 if self.visible_rows > 0 {
1938 self.proximity_threshold = self.visible_rows;
1939 }
1940
1941 if !self.num_rows_valid {
1943 self.num_rows =
1944 match collect_lazy(self.lf.clone().select([len()]), self.polars_streaming) {
1945 Ok(df) => {
1946 if let Some(col) = df.get(0) {
1947 if let Some(AnyValue::UInt32(len)) = col.first() {
1948 *len as usize
1949 } else {
1950 0
1951 }
1952 } else {
1953 0
1954 }
1955 }
1956 Err(_) => 0,
1957 };
1958 self.num_rows_valid = true;
1959 }
1960
1961 if self.num_rows > 0 {
1962 let max_start = self.num_rows.saturating_sub(1);
1963 if self.start_row > max_start {
1964 self.start_row = max_start;
1965 }
1966 } else {
1967 self.start_row = 0;
1968 self.buffered_start_row = 0;
1969 self.buffered_end_row = 0;
1970 self.buffered_df = None;
1971 self.df = None;
1972 self.locked_df = None;
1973 return;
1974 }
1975
1976 let view_start = self.start_row;
1978 let view_end = self.start_row + self.visible_rows.min(self.num_rows - self.start_row);
1979
1980 let within_buffer = view_start >= self.buffered_start_row
1982 && view_end <= self.buffered_end_row
1983 && self.buffered_end_row > 0;
1984
1985 let page_rows = self.visible_rows.max(1);
1988
1989 if within_buffer {
1990 let dist_to_start = view_start.saturating_sub(self.buffered_start_row);
1991 let dist_to_end = self.buffered_end_row.saturating_sub(view_end);
1992
1993 let needs_expansion_back =
1994 dist_to_start <= self.proximity_threshold && self.buffered_start_row > 0;
1995 let needs_expansion_forward =
1996 dist_to_end <= self.proximity_threshold && self.buffered_end_row < self.num_rows;
1997
1998 if !needs_expansion_back && !needs_expansion_forward {
1999 let expected_len = self
2001 .buffered_end_row
2002 .saturating_sub(self.buffered_start_row);
2003 if self
2004 .buffered_df
2005 .as_ref()
2006 .is_some_and(|b| b.height() == expected_len)
2007 {
2008 self.slice_buffer_into_display();
2009 if self.table_state.selected().is_none() {
2010 self.table_state.select(Some(0));
2011 }
2012 return;
2013 }
2014 self.load_buffer(self.buffered_start_row, self.buffered_end_row);
2015 if self.table_state.selected().is_none() {
2016 self.table_state.select(Some(0));
2017 }
2018 return;
2019 }
2020
2021 let mut new_buffer_start = if needs_expansion_back {
2022 view_start.saturating_sub(self.pages_lookback * page_rows)
2023 } else {
2024 self.buffered_start_row
2025 };
2026
2027 let mut new_buffer_end = if needs_expansion_forward {
2028 (view_end + self.pages_lookahead * page_rows).min(self.num_rows)
2029 } else {
2030 self.buffered_end_row
2031 };
2032
2033 self.clamp_buffer_to_max_size(
2034 view_start,
2035 view_end,
2036 &mut new_buffer_start,
2037 &mut new_buffer_end,
2038 );
2039 self.load_buffer(new_buffer_start, new_buffer_end);
2040 } else {
2041 let mut new_buffer_start;
2046 let mut new_buffer_end;
2047
2048 let had_buffer = self.buffered_end_row > 0;
2049 let scrolled_past_end = had_buffer && view_start >= self.buffered_end_row;
2050 let scrolled_past_start = had_buffer && view_end <= self.buffered_start_row;
2051
2052 let extend_forward_ok = scrolled_past_end
2053 && (view_start - self.buffered_end_row) <= self.pages_lookahead * page_rows;
2054 let extend_backward_ok = scrolled_past_start
2055 && (self.buffered_start_row - view_end) <= self.pages_lookback * page_rows;
2056
2057 if extend_forward_ok {
2058 new_buffer_start = self.buffered_start_row;
2060 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2061 } else if extend_backward_ok {
2062 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2064 new_buffer_end = self.buffered_end_row;
2065 } else if scrolled_past_end || scrolled_past_start {
2066 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2068 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2069 let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2070 let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2071 if current_len < min_initial_len {
2072 let need = min_initial_len.saturating_sub(current_len);
2073 let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2074 let can_extend_start = new_buffer_start;
2075 if can_extend_end >= need {
2076 new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2077 } else if can_extend_start >= need {
2078 new_buffer_start = new_buffer_start.saturating_sub(need);
2079 } else {
2080 new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2081 new_buffer_start =
2082 new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2083 }
2084 }
2085 } else {
2086 new_buffer_start = view_start.saturating_sub(self.pages_lookback * page_rows);
2088 new_buffer_end = (view_end + self.pages_lookahead * page_rows).min(self.num_rows);
2089
2090 let min_initial_len = (1 + self.pages_lookahead + self.pages_lookback) * page_rows;
2092 let current_len = new_buffer_end.saturating_sub(new_buffer_start);
2093 if current_len < min_initial_len {
2094 let need = min_initial_len.saturating_sub(current_len);
2095 let can_extend_end = self.num_rows.saturating_sub(new_buffer_end);
2096 let can_extend_start = new_buffer_start;
2097 if can_extend_end >= need {
2098 new_buffer_end = (new_buffer_end + need).min(self.num_rows);
2099 } else if can_extend_start >= need {
2100 new_buffer_start = new_buffer_start.saturating_sub(need);
2101 } else {
2102 new_buffer_end = (new_buffer_end + can_extend_end).min(self.num_rows);
2103 new_buffer_start =
2104 new_buffer_start.saturating_sub(need.saturating_sub(can_extend_end));
2105 }
2106 }
2107 }
2108
2109 self.clamp_buffer_to_max_size(
2110 view_start,
2111 view_end,
2112 &mut new_buffer_start,
2113 &mut new_buffer_end,
2114 );
2115 self.load_buffer(new_buffer_start, new_buffer_end);
2116 }
2117
2118 self.slice_from_buffer();
2119 if self.table_state.selected().is_none() {
2120 self.table_state.select(Some(0));
2121 }
2122 }
2123
2124 fn invalidate_num_rows(&mut self) {
2126 self.num_rows_valid = false;
2127 }
2128
2129 pub fn num_rows_if_valid(&self) -> Option<usize> {
2132 if self.num_rows_valid {
2133 Some(self.num_rows)
2134 } else {
2135 None
2136 }
2137 }
2138
2139 fn clamp_buffer_to_max_size(
2141 &self,
2142 view_start: usize,
2143 view_end: usize,
2144 buffer_start: &mut usize,
2145 buffer_end: &mut usize,
2146 ) {
2147 if self.max_buffered_rows == 0 {
2148 return;
2149 }
2150 let max_len = self.max_buffered_rows;
2151 let requested_len = buffer_end.saturating_sub(*buffer_start);
2152 if requested_len <= max_len {
2153 return;
2154 }
2155 let view_len = view_end.saturating_sub(view_start);
2156 if view_len >= max_len {
2157 *buffer_start = view_start;
2158 *buffer_end = (view_start + max_len).min(self.num_rows);
2159 } else {
2160 let half = (max_len - view_len) / 2;
2161 *buffer_end = (view_end + half).min(self.num_rows);
2162 *buffer_start = (*buffer_end).saturating_sub(max_len);
2163 if *buffer_start > view_start {
2164 *buffer_start = view_start;
2165 }
2166 *buffer_end = (*buffer_start + max_len).min(self.num_rows);
2167 }
2168 }
2169
    /// Collect rows `[buffer_start, buffer_end)` from the lazy frame into the
    /// in-memory buffer, then split it into the pinned-column frame
    /// (`locked_df`) and the horizontally scrollable frame (`df`).
    ///
    /// On any failure the error is stored in `self.error` and the previous
    /// buffer bookkeeping is left untouched; on success any stale error is
    /// cleared and the buffered range is committed.
    fn load_buffer(&mut self, buffer_start: usize, buffer_end: usize) {
        let buffer_size = buffer_end.saturating_sub(buffer_start);
        if buffer_size == 0 {
            return;
        }

        // Select columns in display order so the by-name splits below line up.
        let all_columns: Vec<_> = self
            .column_order
            .iter()
            .map(|name| col(name.as_str()))
            .collect();

        let use_streaming = self.polars_streaming;
        let mut full_df = match collect_lazy(
            self.lf
                .clone()
                .select(all_columns)
                .slice(buffer_start as i64, buffer_size as u32),
            use_streaming,
        ) {
            Ok(df) => df,
            Err(e) => {
                self.error = Some(e);
                return;
            }
        };

        // Optional memory cap: if the collected slice is too large, keep only
        // a prefix whose estimated size fits, and shrink the buffered range
        // accordingly.
        let mut effective_buffer_end = buffer_end;
        if self.max_buffered_mb > 0 {
            let size = full_df.estimated_size();
            let max_bytes = self.max_buffered_mb * 1024 * 1024;
            if size > max_bytes {
                let rows = full_df.height();
                if rows > 0 {
                    let bytes_per_row = size / rows;
                    let max_rows = (max_bytes / bytes_per_row.max(1)).min(rows);
                    if max_rows < rows {
                        full_df = full_df.slice(0, max_rows);
                        effective_buffer_end = buffer_start + max_rows;
                    }
                }
            }
        }

        // Leading locked columns stay pinned while the rest scroll.
        if self.locked_columns_count > 0 {
            let locked_names: Vec<&str> = self
                .column_order
                .iter()
                .take(self.locked_columns_count)
                .map(|s| s.as_str())
                .collect();
            let locked_df = match full_df.select(locked_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.locked_df = if self.is_grouped() {
                match self.format_grouped_dataframe(locked_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(locked_df)
            };
        } else {
            self.locked_df = None;
        }

        // Scrollable columns start after the locked ones plus the current
        // horizontal scroll offset (`termcol_index`).
        let scroll_names: Vec<&str> = self
            .column_order
            .iter()
            .skip(self.locked_columns_count + self.termcol_index)
            .map(|s| s.as_str())
            .collect();
        if scroll_names.is_empty() {
            self.df = None;
        } else {
            let scroll_df = match full_df.select(scroll_names) {
                Ok(df) => df,
                Err(e) => {
                    self.error = Some(e);
                    return;
                }
            };
            self.df = if self.is_grouped() {
                match self.format_grouped_dataframe(scroll_df) {
                    Ok(formatted_df) => Some(formatted_df),
                    Err(e) => {
                        self.error = Some(PolarsError::ComputeError(
                            crate::error_display::user_message_from_report(&e, None).into(),
                        ));
                        return;
                    }
                }
            } else {
                Some(scroll_df)
            };
        }
        // Everything succeeded: clear any stale error and commit bookkeeping.
        if self.error.is_some() {
            self.error = None;
        }
        self.buffered_start_row = buffer_start;
        self.buffered_end_row = effective_buffer_end;
        self.buffered_df = Some(full_df);
    }
2282
2283 fn slice_buffer_into_display(&mut self) {
2285 let full_df = match self.buffered_df.as_ref() {
2286 Some(df) => df,
2287 None => return,
2288 };
2289
2290 if self.locked_columns_count > 0 {
2291 let locked_names: Vec<&str> = self
2292 .column_order
2293 .iter()
2294 .take(self.locked_columns_count)
2295 .map(|s| s.as_str())
2296 .collect();
2297 if let Ok(locked_df) = full_df.select(locked_names) {
2298 self.locked_df = if self.is_grouped() {
2299 self.format_grouped_dataframe(locked_df).ok()
2300 } else {
2301 Some(locked_df)
2302 };
2303 }
2304 } else {
2305 self.locked_df = None;
2306 }
2307
2308 let scroll_names: Vec<&str> = self
2309 .column_order
2310 .iter()
2311 .skip(self.locked_columns_count + self.termcol_index)
2312 .map(|s| s.as_str())
2313 .collect();
2314 if scroll_names.is_empty() {
2315 self.df = None;
2316 } else if let Ok(scroll_df) = full_df.select(scroll_names) {
2317 self.df = if self.is_grouped() {
2318 self.format_grouped_dataframe(scroll_df).ok()
2319 } else {
2320 Some(scroll_df)
2321 };
2322 }
2323 }
2324
    /// Intentionally empty: buffer slicing is handled by
    /// `slice_buffer_into_display`.
    // NOTE(review): the body is a no-op in SOURCE — confirm callers expect
    // this and consider removing the method if nothing depends on it.
    fn slice_from_buffer(&mut self) {
    }
2331
2332 fn format_grouped_dataframe(&self, df: DataFrame) -> Result<DataFrame> {
2333 let schema = df.schema();
2334 let mut new_series = Vec::new();
2335
2336 for (col_name, dtype) in schema.iter() {
2337 let col = df.column(col_name)?;
2338 if matches!(dtype, DataType::List(_)) {
2339 let string_series: Series = col
2340 .list()?
2341 .into_iter()
2342 .map(|opt_list| {
2343 opt_list.map(|list_series| {
2344 let values: Vec<String> = list_series
2345 .iter()
2346 .take(10)
2347 .map(|v| v.str_value().to_string())
2348 .collect();
2349 if list_series.len() > 10 {
2350 format!("[{}...] ({} items)", values.join(", "), list_series.len())
2351 } else {
2352 format!("[{}]", values.join(", "))
2353 }
2354 })
2355 })
2356 .collect();
2357 new_series.push(string_series.with_name(col_name.as_str().into()).into());
2358 } else {
2359 new_series.push(col.clone());
2360 }
2361 }
2362
2363 Ok(DataFrame::new(new_series)?)
2364 }
2365
2366 pub fn select_next(&mut self) {
2367 self.table_state.select_next();
2368 if let Some(selected) = self.table_state.selected() {
2369 if selected >= self.visible_rows && self.visible_rows > 0 {
2370 self.slide_table(1);
2371 }
2372 }
2373 }
2374
2375 pub fn page_down(&mut self) {
2376 self.slide_table(self.visible_rows as i64);
2377 }
2378
2379 pub fn select_previous(&mut self) {
2380 if let Some(selected) = self.table_state.selected() {
2381 self.table_state.select_previous();
2382 if selected == 0 && self.start_row > 0 {
2383 self.slide_table(-1);
2384 }
2385 } else {
2386 self.table_state.select(Some(0));
2387 }
2388 }
2389
2390 pub fn scroll_to(&mut self, index: usize) {
2391 if self.start_row == index {
2392 return;
2393 }
2394
2395 if index == 0 {
2396 self.start_row = 0;
2397 self.collect();
2398 self.start_row = 0;
2399 } else {
2400 self.start_row = index;
2401 self.collect();
2402 }
2403 }
2404
2405 pub fn scroll_to_row_centered(&mut self, row_index: usize) {
2408 self.ensure_num_rows();
2409 if self.num_rows == 0 || self.visible_rows == 0 {
2410 return;
2411 }
2412 let center_offset = self.visible_rows / 2;
2413 let mut start_row = row_index.saturating_sub(center_offset);
2414 let max_start = self.num_rows.saturating_sub(self.visible_rows);
2415 start_row = start_row.min(max_start);
2416
2417 if self.start_row == start_row {
2418 let display_idx = row_index
2419 .saturating_sub(start_row)
2420 .min(self.visible_rows.saturating_sub(1));
2421 self.table_state.select(Some(display_idx));
2422 return;
2423 }
2424
2425 self.start_row = start_row;
2426 self.collect();
2427 let display_idx = row_index
2428 .saturating_sub(start_row)
2429 .min(self.visible_rows.saturating_sub(1));
2430 self.table_state.select(Some(display_idx));
2431 }
2432
2433 fn ensure_num_rows(&mut self) {
2435 if self.num_rows_valid {
2436 return;
2437 }
2438 if self.visible_rows > 0 {
2439 self.proximity_threshold = self.visible_rows;
2440 }
2441 self.num_rows = match self.lf.clone().select([len()]).collect() {
2442 Ok(df) => {
2443 if let Some(col) = df.get(0) {
2444 if let Some(AnyValue::UInt32(len)) = col.first() {
2445 *len as usize
2446 } else {
2447 0
2448 }
2449 } else {
2450 0
2451 }
2452 }
2453 Err(_) => 0,
2454 };
2455 self.num_rows_valid = true;
2456 }
2457
2458 pub fn scroll_to_end(&mut self) {
2460 self.ensure_num_rows();
2461 if self.num_rows == 0 {
2462 self.start_row = 0;
2463 self.buffered_start_row = 0;
2464 self.buffered_end_row = 0;
2465 return;
2466 }
2467 let end_start = self.num_rows.saturating_sub(self.visible_rows);
2468 if self.start_row == end_start {
2469 self.select_last_visible_row();
2470 return;
2471 }
2472 self.start_row = end_start;
2473 self.collect();
2474 self.select_last_visible_row();
2475 }
2476
2477 fn select_last_visible_row(&mut self) {
2479 if self.num_rows == 0 {
2480 return;
2481 }
2482 let last_row_display_idx = (self.num_rows - 1).saturating_sub(self.start_row);
2483 let sel = last_row_display_idx.min(self.visible_rows.saturating_sub(1));
2484 self.table_state.select(Some(sel));
2485 }
2486
2487 pub fn half_page_down(&mut self) {
2488 let half = (self.visible_rows / 2).max(1) as i64;
2489 self.slide_table(half);
2490 }
2491
2492 pub fn half_page_up(&mut self) {
2493 if self.start_row == 0 {
2494 return;
2495 }
2496 let half = (self.visible_rows / 2).max(1) as i64;
2497 self.slide_table(-half);
2498 }
2499
2500 pub fn page_up(&mut self) {
2501 if self.start_row == 0 {
2502 return;
2503 }
2504 self.slide_table(-(self.visible_rows as i64));
2505 }
2506
2507 pub fn scroll_right(&mut self) {
2508 let max_scroll = self
2509 .column_order
2510 .len()
2511 .saturating_sub(self.locked_columns_count);
2512 if self.termcol_index < max_scroll.saturating_sub(1) {
2513 self.termcol_index += 1;
2514 self.collect();
2515 }
2516 }
2517
2518 pub fn scroll_left(&mut self) {
2519 if self.termcol_index > 0 {
2520 self.termcol_index -= 1;
2521 self.collect();
2522 }
2523 }
2524
2525 pub fn headers(&self) -> Vec<String> {
2526 self.column_order.clone()
2527 }
2528
2529 pub fn set_column_order(&mut self, order: Vec<String>) {
2530 self.column_order = order;
2531 self.buffered_start_row = 0;
2532 self.buffered_end_row = 0;
2533 self.buffered_df = None;
2534 self.collect();
2535 }
2536
2537 pub fn set_locked_columns(&mut self, count: usize) {
2538 self.locked_columns_count = count.min(self.column_order.len());
2539 self.buffered_start_row = 0;
2540 self.buffered_end_row = 0;
2541 self.buffered_df = None;
2542 self.collect();
2543 }
2544
    /// Number of columns currently pinned to the left of the view.
    pub fn locked_columns_count(&self) -> usize {
        self.locked_columns_count
    }

    /// Currently active filter statements.
    pub fn get_filters(&self) -> &[FilterStatement] {
        &self.filters
    }

    /// Columns the frame is currently sorted by (empty when unsorted).
    pub fn get_sort_columns(&self) -> &[String] {
        &self.sort_columns
    }

    /// Current sort direction (`true` = ascending).
    pub fn get_sort_ascending(&self) -> bool {
        self.sort_ascending
    }

    /// Current display order of the columns.
    pub fn get_column_order(&self) -> &[String] {
        &self.column_order
    }

    /// Text of the active structured query (empty when none).
    pub fn get_active_query(&self) -> &str {
        &self.active_query
    }

    /// Text of the active SQL query (empty when none).
    pub fn get_active_sql_query(&self) -> &str {
        &self.active_sql_query
    }

    /// Text of the active fuzzy-search query (empty when none).
    pub fn get_active_fuzzy_query(&self) -> &str {
        &self.active_fuzzy_query
    }
2577
    /// The most recently applied pivot spec, if the view came from a pivot.
    pub fn last_pivot_spec(&self) -> Option<&PivotSpec> {
        self.last_pivot_spec.as_ref()
    }

    /// The most recently applied melt spec, if the view came from a melt.
    pub fn last_melt_spec(&self) -> Option<&MeltSpec> {
        self.last_melt_spec.as_ref()
    }

    /// Whether the current schema contains any `List` column — i.e. the
    /// frame is the result of a group-by aggregation.
    pub fn is_grouped(&self) -> bool {
        self.schema
            .iter()
            .any(|(_, dtype)| matches!(dtype, DataType::List(_)))
    }

    /// Names of the non-list (group key) columns of a grouped frame.
    pub fn group_key_columns(&self) -> Vec<String> {
        self.schema
            .iter()
            .filter(|(_, dtype)| !matches!(dtype, DataType::List(_)))
            .map(|(name, _)| name.to_string())
            .collect()
    }

    /// Names of the list-typed (aggregated value) columns of a grouped frame.
    pub fn group_value_columns(&self) -> Vec<String> {
        self.schema
            .iter()
            .filter(|(_, dtype)| matches!(dtype, DataType::List(_)))
            .map(|(name, _)| name.to_string())
            .collect()
    }
2607
2608 pub fn buffered_memory_bytes(&self) -> Option<usize> {
2610 let locked = self
2611 .locked_df
2612 .as_ref()
2613 .map(|df| df.estimated_size())
2614 .unwrap_or(0);
2615 let scroll = self.df.as_ref().map(|df| df.estimated_size()).unwrap_or(0);
2616 if locked == 0 && scroll == 0 {
2617 None
2618 } else {
2619 Some(locked + scroll)
2620 }
2621 }
2622
    /// Number of rows currently held in the row buffer.
    pub fn buffered_rows(&self) -> usize {
        self.buffered_end_row
            .saturating_sub(self.buffered_start_row)
    }

    /// The scrollable portion of the display frame, if collected.
    pub fn display_df(&self) -> Option<&DataFrame> {
        self.df.as_ref()
    }
2633
2634 pub fn display_slice_df(&self) -> Option<DataFrame> {
2636 let df = self.df.as_ref()?;
2637 let offset = self.start_row.saturating_sub(self.buffered_start_row);
2638 let slice_len = self.visible_rows.min(df.height().saturating_sub(offset));
2639 if offset < df.height() && slice_len > 0 {
2640 Some(df.slice(offset as i64, slice_len))
2641 } else {
2642 None
2643 }
2644 }
2645
    /// Configured cap on the number of buffered rows.
    pub fn max_buffered_rows(&self) -> usize {
        self.max_buffered_rows
    }

    /// Configured cap on buffered memory, in megabytes.
    // NOTE(review): 0 appears to mean "unlimited", inferred from the
    // `max_buffered_mb > 0` guard in the buffering code — confirm.
    pub fn max_buffered_mb(&self) -> usize {
        self.max_buffered_mb
    }
2655
    /// Drill into the `group_index`-th row of a grouped frame: explode that
    /// row's list columns into a flat frame (with the group's key values
    /// repeated as constant columns) and make it the active view. The
    /// grouped frame is stashed in `grouped_lf` so `drill_up()` can restore
    /// it.
    ///
    /// # Errors
    /// Fails when `group_index` is out of bounds, when the grouped frame
    /// has no list (value) columns, or on any polars collection error.
    pub fn drill_down_into_group(&mut self, group_index: usize) -> Result<()> {
        if !self.is_grouped() {
            // Nothing to drill into; silently ignore.
            return Ok(());
        }

        // Remember the grouped view so drill_up() can return to it.
        self.grouped_lf = Some(self.lf.clone());

        let grouped_df = collect_lazy(self.lf.clone(), self.polars_streaming)?;

        if group_index >= grouped_df.height() {
            return Err(color_eyre::eyre::eyre!("Group index out of bounds"));
        }

        // Record the selected group's key values (as display strings) for
        // headers/analysis context.
        let key_columns = self.group_key_columns();
        let mut key_values = Vec::new();
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            key_values.push(value.str_value().to_string());
        }
        self.drilled_down_group_key = Some(key_values.clone());
        self.drilled_down_group_key_columns = Some(key_columns.clone());

        let value_columns = self.group_value_columns();
        if value_columns.is_empty() {
            return Err(color_eyre::eyre::eyre!("No value columns in grouped data"));
        }

        let mut columns = Vec::new();

        // The first list column determines how many rows the flattened
        // group has.
        let first_value_col = grouped_df.column(&value_columns[0])?;
        let first_list_value = first_value_col.get(group_index).map_err(|e| {
            color_eyre::eyre::eyre!("Group index {} out of bounds: {}", group_index, e)
        })?;
        let row_count = if let AnyValue::List(list_series) = first_list_value {
            list_series.len()
        } else {
            0
        };

        // Repeat each key value `row_count` times so the flat frame keeps
        // its group context — typed where possible, stringified otherwise.
        for col_name in &key_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            let constant_series = match value {
                AnyValue::Int32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Int64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::UInt64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float32(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::Float64(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                AnyValue::String(v) => {
                    Series::new(col_name.as_str().into(), vec![v.to_string(); row_count])
                }
                AnyValue::Boolean(v) => Series::new(col_name.as_str().into(), vec![v; row_count]),
                _ => {
                    // Fallback: keep the display string for unhandled dtypes.
                    let str_val = value.str_value().to_string();
                    Series::new(col_name.as_str().into(), vec![str_val; row_count])
                }
            };
            columns.push(constant_series.into());
        }

        // The list columns themselves become the flat value columns.
        for col_name in &value_columns {
            let col = grouped_df.column(col_name)?;
            let value = col.get(group_index).map_err(|e| {
                color_eyre::eyre::eyre!(
                    "Group index {} out of bounds for column {}: {}",
                    group_index,
                    col_name,
                    e
                )
            })?;
            if let AnyValue::List(list_series) = value {
                let named_series = list_series.with_name(col_name.as_str().into());
                columns.push(named_series.into());
            }
        }

        let group_df = DataFrame::new(columns)?;

        // Swap in the flattened frame and reset the view to its top-left.
        self.invalidate_num_rows();
        self.lf = group_df.lazy();
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        self.drilled_down_group_index = Some(group_index);
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.table_state.select(Some(0));
        self.collect();

        Ok(())
    }
2763
2764 pub fn drill_up(&mut self) -> Result<()> {
2765 if let Some(grouped_lf) = self.grouped_lf.take() {
2766 self.invalidate_num_rows();
2767 self.lf = grouped_lf;
2768 self.schema = self.lf.clone().collect_schema()?;
2769 self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
2770 self.drilled_down_group_index = None;
2771 self.drilled_down_group_key = None;
2772 self.drilled_down_group_key_columns = None;
2773 self.start_row = 0;
2774 self.termcol_index = 0;
2775 self.locked_columns_count = 0;
2776 self.table_state.select(Some(0));
2777 self.collect();
2778 Ok(())
2779 } else {
2780 Err(color_eyre::eyre::eyre!("Not in drill-down mode"))
2781 }
2782 }
2783
    /// Collect the current lazy frame into an eager `DataFrame` for
    /// statistical analysis.
    ///
    /// # Errors
    /// Propagates any polars collection failure.
    pub fn get_analysis_dataframe(&self) -> Result<DataFrame> {
        Ok(collect_lazy(self.lf.clone(), self.polars_streaming)?)
    }

    /// Snapshot of the query/filter/drill-down state, used to label
    /// analysis output.
    pub fn get_analysis_context(&self) -> crate::statistics::AnalysisContext {
        crate::statistics::AnalysisContext {
            has_query: !self.active_query.is_empty(),
            query: self.active_query.clone(),
            has_filters: !self.filters.is_empty(),
            filter_count: self.filters.len(),
            is_drilled_down: self.is_drilled_down(),
            group_key: self.drilled_down_group_key.clone(),
            group_columns: self.drilled_down_group_key_columns.clone(),
        }
    }
2799
2800 fn cast_temporal_index_columns_for_pivot(
2803 df: &DataFrame,
2804 index: &[String],
2805 ) -> Result<(DataFrame, Vec<(String, DataType)>)> {
2806 let mut out = df.clone();
2807 let mut restore = Vec::new();
2808 for name in index {
2809 if let Ok(s) = out.column(name) {
2810 let dtype = s.dtype();
2811 if matches!(dtype, DataType::Date | DataType::Datetime(_, _)) {
2812 restore.push((name.clone(), dtype.clone()));
2813 let casted = s.cast(&DataType::Int32)?;
2814 out.with_column(casted)?;
2815 }
2816 }
2817 }
2818 Ok((out, restore))
2819 }
2820
2821 fn restore_temporal_index_columns_after_pivot(
2823 pivoted: &mut DataFrame,
2824 restore: &[(String, DataType)],
2825 ) -> Result<()> {
2826 for (name, dtype) in restore {
2827 if let Ok(s) = pivoted.column(name) {
2828 let restored = s.cast(dtype)?;
2829 pivoted.with_column(restored)?;
2830 }
2831 }
2832 Ok(())
2833 }
2834
2835 pub fn pivot(&mut self, spec: &PivotSpec) -> Result<()> {
2840 let df = collect_lazy(self.lf.clone(), self.polars_streaming)?;
2841 let agg_expr = pivot_agg_expr(spec.aggregation)?;
2842 let index_str: Vec<&str> = spec.index.iter().map(|s| s.as_str()).collect();
2843 let index_opt = if index_str.is_empty() {
2844 None
2845 } else {
2846 Some(index_str)
2847 };
2848
2849 let (df_for_pivot, temporal_index_restore) = if self.workaround_pivot_date_index {
2850 let (df_w, restore) =
2851 Self::cast_temporal_index_columns_for_pivot(&df, spec.index.as_slice())?;
2852 (df_w, Some(restore))
2853 } else {
2854 (df.clone(), None)
2855 };
2856 let sort_new_columns = spec.sort_columns.unwrap_or(true);
2857 let mut pivoted = pivot_stable(
2858 &df_for_pivot,
2859 [spec.pivot_column.as_str()],
2860 index_opt,
2861 Some([spec.value_column.as_str()]),
2862 sort_new_columns,
2863 Some(agg_expr),
2864 None,
2865 )?;
2866 if let Some(restore) = &temporal_index_restore {
2867 Self::restore_temporal_index_columns_after_pivot(&mut pivoted, restore)?;
2868 }
2869
2870 self.last_pivot_spec = Some(spec.clone());
2871 self.last_melt_spec = None;
2872 self.replace_lf_after_reshape(pivoted.lazy())?;
2873 Ok(())
2874 }
2875
2876 pub fn melt(&mut self, spec: &MeltSpec) -> Result<()> {
2878 let on = cols(spec.value_columns.iter().map(|s| s.as_str()));
2879 let index = cols(spec.index.iter().map(|s| s.as_str()));
2880 let args = UnpivotArgsDSL {
2881 on,
2882 index,
2883 variable_name: Some(PlSmallStr::from(spec.variable_name.as_str())),
2884 value_name: Some(PlSmallStr::from(spec.value_name.as_str())),
2885 };
2886 let lf = self.lf.clone().unpivot(args);
2887 self.last_melt_spec = Some(spec.clone());
2888 self.last_pivot_spec = None;
2889 self.replace_lf_after_reshape(lf)?;
2890 Ok(())
2891 }
2892
    /// Install `lf` as the active frame after a pivot/melt and reset every
    /// piece of view state that referred to the previous shape.
    fn replace_lf_after_reshape(&mut self, lf: LazyFrame) -> Result<()> {
        self.invalidate_num_rows();
        self.lf = lf;
        self.schema = self.lf.clone().collect_schema()?;
        self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
        // Active queries/filters/sorts targeted the old columns; drop them.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query.clear();
        self.error = None;
        // Clear cached frames and any drill-down context.
        self.df = None;
        self.locked_df = None;
        self.grouped_lf = None;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        // Reset scroll, selection, and the row buffer to the top-left.
        self.start_row = 0;
        self.termcol_index = 0;
        self.locked_columns_count = 0;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.collect();
        Ok(())
    }
2920
    /// Whether the view is currently drilled into a single group.
    pub fn is_drilled_down(&self) -> bool {
        self.drilled_down_group_index.is_some()
    }
2924
    /// Re-derive `self.lf` by applying the stored filters and sort
    /// settings, then re-collect the display buffers.
    fn apply_transformations(&mut self) {
        let mut lf = self.lf.clone();
        let mut final_expr: Option<Expr> = None;

        // Fold every filter statement into one boolean expression, chained
        // with each statement's own AND/OR connective.
        for filter in &self.filters {
            let col_expr = col(&filter.column);
            // Parse the filter value into the column's native type when
            // possible so comparisons are typed; fall back to a string
            // literal when parsing fails or the dtype is unknown.
            let val_lit = if let Some(dtype) = self.schema.get(&filter.column) {
                match dtype {
                    DataType::Float32 | DataType::Float64 => filter
                        .value
                        .parse::<f64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => filter
                        .value
                        .parse::<i64>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                        filter
                            .value
                            .parse::<u64>()
                            .map(lit)
                            .unwrap_or_else(|_| lit(filter.value.as_str()))
                    }
                    DataType::Boolean => filter
                        .value
                        .parse::<bool>()
                        .map(lit)
                        .unwrap_or_else(|_| lit(filter.value.as_str())),
                    _ => lit(filter.value.as_str()),
                }
            } else {
                lit(filter.value.as_str())
            };

            let op_expr = match filter.operator {
                FilterOperator::Eq => col_expr.eq(val_lit),
                FilterOperator::NotEq => col_expr.neq(val_lit),
                FilterOperator::Gt => col_expr.gt(val_lit),
                FilterOperator::Lt => col_expr.lt(val_lit),
                FilterOperator::GtEq => col_expr.gt_eq(val_lit),
                FilterOperator::LtEq => col_expr.lt_eq(val_lit),
                FilterOperator::Contains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val))
                }
                FilterOperator::NotContains => {
                    let val = filter.value.clone();
                    col_expr.str().contains_literal(lit(val)).not()
                }
            };

            if let Some(current) = final_expr {
                final_expr = Some(match filter.logical_op {
                    LogicalOperator::And => current.and(op_expr),
                    LogicalOperator::Or => current.or(op_expr),
                });
            } else {
                final_expr = Some(op_expr);
            }
        }

        if let Some(e) = final_expr {
            lf = lf.filter(e);
        }

        if !self.sort_columns.is_empty() {
            // Sort by the configured columns, all in the same direction.
            let options = SortMultipleOptions {
                descending: self
                    .sort_columns
                    .iter()
                    .map(|_| !self.sort_ascending)
                    .collect(),
                ..Default::default()
            };
            lf = lf.sort_by_exprs(
                self.sort_columns.iter().map(col).collect::<Vec<_>>(),
                options,
            );
        } else if !self.sort_ascending {
            // No sort columns: "descending" just means reversed row order.
            lf = lf.reverse();
        }

        self.invalidate_num_rows();
        self.lf = lf;
        self.collect();
    }
3013
3014 pub fn sort(&mut self, columns: Vec<String>, ascending: bool) {
3015 self.sort_columns = columns;
3016 self.sort_ascending = ascending;
3017 self.buffered_start_row = 0;
3018 self.buffered_end_row = 0;
3019 self.buffered_df = None;
3020 self.apply_transformations();
3021 }
3022
3023 pub fn reverse(&mut self) {
3024 self.sort_ascending = !self.sort_ascending;
3025
3026 self.buffered_start_row = 0;
3027 self.buffered_end_row = 0;
3028 self.buffered_df = None;
3029
3030 if !self.sort_columns.is_empty() {
3031 let options = SortMultipleOptions {
3032 descending: self
3033 .sort_columns
3034 .iter()
3035 .map(|_| !self.sort_ascending)
3036 .collect(),
3037 ..Default::default()
3038 };
3039 self.invalidate_num_rows();
3040 self.lf = self.lf.clone().sort_by_exprs(
3041 self.sort_columns.iter().map(col).collect::<Vec<_>>(),
3042 options,
3043 );
3044 self.collect();
3045 } else {
3046 self.invalidate_num_rows();
3047 self.lf = self.lf.clone().reverse();
3048 self.collect();
3049 }
3050 }
3051
3052 pub fn filter(&mut self, filters: Vec<FilterStatement>) {
3053 self.filters = filters;
3054 self.buffered_start_row = 0;
3055 self.buffered_end_row = 0;
3056 self.buffered_df = None;
3057 self.apply_transformations();
3058 }
3059
    /// Apply a structured query string (parsed by `parse_query`) to the
    /// original frame and make the result the active view.
    ///
    /// Supports column selection, a filter expression, and group-by. An
    /// empty query resets back to the original frame. Failures are
    /// surfaced through `self.error` rather than returned.
    pub fn query(&mut self, query: String) {
        self.error = None;

        let trimmed_query = query.trim();
        if trimmed_query.is_empty() {
            self.reset_lf_to_original();
            self.collect();
            return;
        }

        match parse_query(&query) {
            Ok((cols, filter, group_by_cols, group_by_col_names)) => {
                // Queries always start from the untransformed frame.
                let mut lf = self.original_lf.clone();
                let mut schema_opt: Option<Arc<Schema>> = None;

                if let Some(f) = filter {
                    lf = lf.filter(f);
                }

                if !group_by_cols.is_empty() {
                    if !cols.is_empty() {
                        // Explicit aggregation expressions were supplied.
                        lf = lf.group_by(group_by_cols.clone()).agg(cols);
                    } else {
                        // No explicit aggregations: aggregate every
                        // non-key column (each becomes a list column).
                        let schema = match lf.clone().collect_schema() {
                            Ok(s) => s,
                            Err(e) => {
                                self.error = Some(e);
                                return;
                            }
                        };
                        let all_columns: Vec<String> =
                            schema.iter_names().map(|s| s.to_string()).collect();

                        let mut agg_exprs = Vec::new();
                        for col_name in &all_columns {
                            if !group_by_col_names.contains(col_name) {
                                agg_exprs.push(col(col_name));
                            }
                        }

                        lf = lf.group_by(group_by_cols.clone()).agg(agg_exprs);
                    }
                    // Sort by the key columns (first in the grouped schema)
                    // for stable, readable output.
                    let schema = match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    };
                    schema_opt = Some(schema.clone());
                    let sort_exprs: Vec<Expr> = schema
                        .iter_names()
                        .take(group_by_cols.len())
                        .map(|n| col(n.as_str()))
                        .collect();
                    lf = lf.sort_by_exprs(sort_exprs, Default::default());
                } else if !cols.is_empty() {
                    lf = lf.select(cols);
                }

                // Reuse the schema computed during grouping when available;
                // otherwise resolve it now.
                let schema = match schema_opt {
                    Some(s) => s,
                    None => match lf.collect_schema() {
                        Ok(s) => s,
                        Err(e) => {
                            self.error = Some(e);
                            return;
                        }
                    },
                };

                self.schema = schema;
                self.invalidate_num_rows();
                self.lf = lf;
                self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();

                if !group_by_col_names.is_empty() {
                    // Pin the leading run of group-key columns so they stay
                    // visible while scrolling horizontally.
                    let mut locked_count = 0;
                    for col_name in &self.column_order {
                        if group_by_col_names.contains(col_name) {
                            locked_count += 1;
                        } else {
                            break;
                        }
                    }
                    self.locked_columns_count = locked_count;
                } else {
                    self.locked_columns_count = 0;
                }

                // Reset view state for the new result shape.
                self.filters.clear();
                self.sort_columns.clear();
                self.sort_ascending = true;
                self.start_row = 0;
                self.termcol_index = 0;
                self.active_query = query;
                self.buffered_start_row = 0;
                self.buffered_end_row = 0;
                self.buffered_df = None;
                self.drilled_down_group_index = None;
                self.drilled_down_group_key = None;
                self.drilled_down_group_key_columns = None;
                self.grouped_lf = None;
                self.table_state.select(Some(0));
                self.collect();
                if self.num_rows > 0 {
                    self.start_row = 0;
                }
            }
            Err(e) => {
                // Parse errors are reported like compute errors.
                self.error = Some(PolarsError::ComputeError(e.into()));
            }
        }
    }
3193
3194 pub fn sql_query(&mut self, sql: String) {
3197 self.error = None;
3198 let trimmed = sql.trim();
3199 if trimmed.is_empty() {
3200 self.reset_lf_to_original();
3201 return;
3202 }
3203
3204 #[cfg(feature = "sql")]
3205 {
3206 use polars_sql::SQLContext;
3207 let mut ctx = SQLContext::new();
3208 ctx.register("df", self.lf.clone());
3209 match ctx.execute(trimmed) {
3210 Ok(result_lf) => {
3211 let schema = match result_lf.clone().collect_schema() {
3212 Ok(s) => s,
3213 Err(e) => {
3214 self.error = Some(e);
3215 return;
3216 }
3217 };
3218 self.schema = schema;
3219 self.invalidate_num_rows();
3220 self.lf = result_lf;
3221 self.column_order = self.schema.iter_names().map(|s| s.to_string()).collect();
3222 self.active_sql_query = sql;
3223 self.locked_columns_count = 0;
3224 self.filters.clear();
3225 self.sort_columns.clear();
3226 self.sort_ascending = true;
3227 self.start_row = 0;
3228 self.termcol_index = 0;
3229 self.drilled_down_group_index = None;
3230 self.drilled_down_group_key = None;
3231 self.drilled_down_group_key_columns = None;
3232 self.grouped_lf = None;
3233 self.buffered_start_row = 0;
3234 self.buffered_end_row = 0;
3235 self.buffered_df = None;
3236 self.table_state.select(Some(0));
3237 }
3238 Err(e) => {
3239 self.error = Some(e);
3240 }
3241 }
3242 }
3243
3244 #[cfg(not(feature = "sql"))]
3245 {
3246 self.error = Some(PolarsError::ComputeError(
3247 format!("SQL support not compiled in (build with --features sql)").into(),
3248 ));
3249 }
3250 }
3251
    /// Fuzzy-search all string columns of the original frame: each
    /// whitespace-separated token must match (as a case-insensitive,
    /// in-order character subsequence — see `fuzzy_token_regex`) in at
    /// least one string column. Errors are surfaced via `self.error`.
    pub fn fuzzy_search(&mut self, query: String) {
        self.error = None;
        let trimmed = query.trim();
        if trimmed.is_empty() {
            self.reset_lf_to_original();
            self.collect();
            return;
        }
        let string_cols: Vec<String> = self
            .schema
            .iter()
            .filter(|(_, dtype)| dtype.is_string())
            .map(|(name, _)| name.to_string())
            .collect();
        if string_cols.is_empty() {
            self.error = Some(PolarsError::ComputeError(
                "Fuzzy search requires at least one string column".into(),
            ));
            return;
        }
        let tokens: Vec<&str> = trimmed
            .split_whitespace()
            .filter(|s| !s.is_empty())
            .collect();
        // A token matches when ANY string column matches its pattern...
        let token_exprs: Vec<Expr> = tokens
            .iter()
            .map(|token| {
                let pattern = fuzzy_token_regex(token);
                string_cols
                    .iter()
                    .map(|c| col(c.as_str()).str().contains(lit(pattern.as_str()), false))
                    .reduce(|a, b| a.or(b))
                    // Safe: string_cols was checked non-empty above.
                    .unwrap()
            })
            .collect();
        // ...and a row matches when ALL tokens match.
        // Safe: `trimmed` is non-empty, so there is at least one token.
        let combined = token_exprs.into_iter().reduce(|a, b| a.and(b)).unwrap();
        self.lf = self.original_lf.clone().filter(combined);
        // Fuzzy search supersedes any other active query/filter/sort and
        // resets the view to the top-left.
        self.filters.clear();
        self.sort_columns.clear();
        self.active_query.clear();
        self.active_sql_query.clear();
        self.active_fuzzy_query = query;
        self.locked_columns_count = 0;
        self.start_row = 0;
        self.termcol_index = 0;
        self.drilled_down_group_index = None;
        self.drilled_down_group_key = None;
        self.drilled_down_group_key_columns = None;
        self.grouped_lf = None;
        self.buffered_start_row = 0;
        self.buffered_end_row = 0;
        self.buffered_df = None;
        self.table_state.select(Some(0));
        self.invalidate_num_rows();
        self.collect();
    }
3312}
3313
3314pub(crate) fn fuzzy_token_regex(token: &str) -> String {
3316 let inner: String =
3317 token
3318 .chars()
3319 .map(|c| regex::escape(&c.to_string()))
3320 .fold(String::new(), |mut s, e| {
3321 if !s.is_empty() {
3322 s.push_str(".*");
3323 }
3324 s.push_str(&e);
3325 s
3326 });
3327 format!("(?i).*{}.*", inner)
3328}
3329
/// Ratatui widget configuration for rendering a polars `DataFrame` as a
/// styled terminal table.
pub struct DataTable {
    /// Background color of the header row.
    pub header_bg: Color,
    /// Foreground color of the header row.
    pub header_fg: Color,
    /// Foreground color of the row-number gutter.
    pub row_numbers_fg: Color,
    /// Foreground color of column separators.
    pub separator_fg: Color,
    /// Horizontal padding between columns, in terminal cells.
    pub table_cell_padding: u16,
    /// Background for every other row; `None` disables zebra striping.
    pub alternate_row_bg: Option<Color>,
    /// Master switch for per-dtype cell coloring.
    pub column_colors: bool,
    /// Color for string columns (used only when `column_colors` is on).
    pub str_col: Option<Color>,
    /// Color for integer columns.
    pub int_col: Option<Color>,
    /// Color for float columns.
    pub float_col: Option<Color>,
    /// Color for boolean columns.
    pub bool_col: Option<Color>,
    /// Color for date/datetime/time/duration columns.
    pub temporal_col: Option<Color>,
}
3345
impl Default for DataTable {
    /// Conservative defaults: dark indexed-236 header with white text, no
    /// zebra striping, per-dtype coloring disabled, one cell of padding.
    fn default() -> Self {
        Self {
            header_bg: Color::Indexed(236),
            header_fg: Color::White,
            row_numbers_fg: Color::DarkGray,
            separator_fg: Color::White,
            table_cell_padding: 1,
            alternate_row_bg: None,
            column_colors: false,
            str_col: None,
            int_col: None,
            float_col: None,
            bool_col: None,
            temporal_col: None,
        }
    }
}
3364
/// Parameters for rendering the row-number gutter.
struct RowNumbersParams {
    // First data row currently at the top of the viewport.
    start_row: usize,
    // Viewport height, in rows.
    visible_rows: usize,
    // Total number of rows in the data set.
    num_rows: usize,
    // Offset added to displayed numbers — presumably for 0- vs 1-based
    // numbering; confirm at the call site.
    row_start_index: usize,
    // Currently highlighted display row, if any.
    selected_row: Option<usize>,
}
3373
3374impl DataTable {
    /// Construct a table widget with [`Default`] settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder: set the four base colors at once.
    pub fn with_colors(
        mut self,
        header_bg: Color,
        header_fg: Color,
        row_numbers_fg: Color,
        separator_fg: Color,
    ) -> Self {
        self.header_bg = header_bg;
        self.header_fg = header_fg;
        self.row_numbers_fg = row_numbers_fg;
        self.separator_fg = separator_fg;
        self
    }

    /// Builder: set the horizontal padding (in cells) between columns.
    pub fn with_cell_padding(mut self, padding: u16) -> Self {
        self.table_cell_padding = padding;
        self
    }

    /// Builder: background for every other row (`None` disables striping).
    pub fn with_alternate_row_bg(mut self, color: Option<Color>) -> Self {
        self.alternate_row_bg = color;
        self
    }
3402
    /// Builder: enable per-dtype cell coloring and set the palette for
    /// string, integer, float, boolean, and temporal columns.
    pub fn with_column_type_colors(
        mut self,
        str_col: Color,
        int_col: Color,
        float_col: Color,
        bool_col: Color,
        temporal_col: Color,
    ) -> Self {
        self.column_colors = true;
        self.str_col = Some(str_col);
        self.int_col = Some(int_col);
        self.float_col = Some(float_col);
        self.bool_col = Some(bool_col);
        self.temporal_col = Some(temporal_col);
        self
    }
3420
3421 fn column_type_color(&self, dtype: &DataType) -> Option<Color> {
3423 if !self.column_colors {
3424 return None;
3425 }
3426 match dtype {
3427 DataType::String => self.str_col,
3428 DataType::Int8
3429 | DataType::Int16
3430 | DataType::Int32
3431 | DataType::Int64
3432 | DataType::UInt8
3433 | DataType::UInt16
3434 | DataType::UInt32
3435 | DataType::UInt64 => self.int_col,
3436 DataType::Float32 | DataType::Float64 => self.float_col,
3437 DataType::Boolean => self.bool_col,
3438 DataType::Date | DataType::Datetime(_, _) | DataType::Time | DataType::Duration(_) => {
3439 self.temporal_col
3440 }
3441 _ => None,
3442 }
3443 }
3444
    /// Render `df` as a ratatui `Table` inside `area`.
    ///
    /// Column widths are computed from the widest visible value (header
    /// included); columns that would overflow the area are dropped, except
    /// that a String column at the boundary is kept and truncated to the
    /// remaining width. The `_row_numbers` and `_start_row_offset`
    /// parameters are currently unused here (row numbers are painted by
    /// `render_row_numbers` instead).
    fn render_dataframe(
        &self,
        df: &DataFrame,
        area: Rect,
        buf: &mut Buffer,
        state: &mut TableState,
        _row_numbers: bool,
        _start_row_offset: usize,
    ) {
        let (height, cols) = df.shape();

        // Seed each column's width with its header width.
        let mut widths: Vec<u16> = df
            .get_column_names()
            .iter()
            .map(|name| name.chars().count() as u16)
            .collect();

        let mut used_width = 0;

        // One cell vector per data row; cells are appended column by column.
        let mut rows: Vec<Vec<Cell>> = vec![vec![]; height];
        let mut visible_columns = 0;

        // Reserve one terminal line for the header row.
        let max_rows = height.min(if area.height > 1 {
            area.height as usize - 1
        } else {
            0
        });

        for col_index in 0..cols {
            let mut max_len = widths[col_index];
            let col_data = &df[col_index];
            let col_color = self.column_type_color(col_data.dtype());

            for (row_index, row) in rows.iter_mut().take(max_rows).enumerate() {
                let value = col_data.get(row_index).unwrap();
                // Nulls render as empty cells rather than a "null" marker.
                let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
                    Cow::Borrowed("")
                } else {
                    value.str_value()
                };
                let len = val_str.chars().count() as u16;
                max_len = max_len.max(len);
                let cell = match col_color {
                    Some(c) => Cell::from(Line::from(Span::styled(
                        val_str.into_owned(),
                        Style::default().fg(c),
                    ))),
                    None => Cell::from(Line::from(val_str)),
                };
                row.push(cell);
            }

            let overflows = (used_width + max_len) > area.width;

            if overflows && col_data.dtype() == &DataType::String {
                // A String column at the edge is kept but clipped to the
                // leftover width; anything after it is cut off.
                let visible_width = area.width.saturating_sub(used_width);
                visible_columns += 1;
                widths[col_index] = visible_width;
                break;
            } else if !overflows {
                visible_columns += 1;
                widths[col_index] = max_len;
                used_width += max_len + self.table_cell_padding;
            } else {
                // Non-string column that does not fit: stop here.
                break;
            }
        }

        widths.truncate(visible_columns);
        // Convert the cell grid into styled rows (zebra stripe odd rows).
        let rows: Vec<Row> = rows
            .into_iter()
            .enumerate()
            .map(|(row_index, mut row)| {
                row.truncate(visible_columns);
                let row_style = if row_index % 2 == 1 {
                    self.alternate_row_bg
                        .map(|c| Style::default().bg(c))
                        .unwrap_or_default()
                } else {
                    Style::default()
                };
                Row::new(row).style(row_style)
            })
            .collect();

        // With Color::Reset as header background, only the foreground is
        // styled — presumably to inherit the terminal's own background.
        let header_row_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        let headers: Vec<Span> = df
            .get_column_names()
            .iter()
            .take(visible_columns)
            .map(|name| Span::styled(name.to_string(), Style::default()))
            .collect();

        StatefulWidget::render(
            Table::new(rows, widths)
                .column_spacing(self.table_cell_padding)
                .header(Row::new(headers).style(header_row_style))
                .row_highlight_style(Style::default().add_modifier(Modifier::REVERSED)),
            area,
            buf,
            state,
        );
    }
3557
    /// Paint the row-number gutter: one header line (styled like the table
    /// header) followed by right-aligned row numbers for the visible rows.
    fn render_row_numbers(&self, area: Rect, buf: &mut Buffer, params: RowNumbersParams) {
        let header_style = if self.header_bg == Color::Reset {
            Style::default().fg(self.header_fg)
        } else {
            Style::default().bg(self.header_bg).fg(self.header_fg)
        };
        // Blank header cell so the gutter lines up with the table header.
        let header_fill = " ".repeat(area.width as usize);
        Paragraph::new(header_fill).style(header_style).render(
            Rect {
                x: area.x,
                y: area.y,
                width: area.width,
                height: 1,
            },
            buf,
        );

        // Never draw more numbers than there are data rows remaining.
        let rows_to_render = params
            .visible_rows
            .min(params.num_rows.saturating_sub(params.start_row));

        if rows_to_render == 0 {
            return;
        }

        // Right-align every number to the width of the largest one shown.
        let max_row_num =
            params.start_row + rows_to_render.saturating_sub(1) + params.row_start_index;
        let max_width = max_row_num.to_string().len();

        for row_idx in 0..rows_to_render.min(area.height.saturating_sub(1) as usize) {
            let row_num = params.start_row + row_idx + params.row_start_index;
            let row_num_text = row_num.to_string();

            let padding = max_width.saturating_sub(row_num_text.len());
            let padded_text = format!("{}{}", " ".repeat(padding), row_num_text);

            // The selected row drops the dedicated number color (Color::Reset)
            // — presumably so it blends with the table's highlight style;
            // zebra striping is kept in both branches.
            let is_selected = params.selected_row == Some(row_idx);
            let (fg, bg) = if is_selected {
                (
                    Color::Reset,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            } else {
                (
                    self.row_numbers_fg,
                    self.alternate_row_bg.filter(|_| row_idx % 2 == 1),
                )
            };
            let row_num_style = match bg {
                Some(bg_color) => Style::default().fg(fg).bg(bg_color),
                None => Style::default().fg(fg),
            };

            // +1 skips the header line painted above.
            let y = area.y + row_idx as u16 + 1;
            if y < area.y + area.height {
                Paragraph::new(padded_text).style(row_num_style).render(
                    Rect {
                        x: area.x,
                        y,
                        width: area.width,
                        height: 1,
                    },
                    buf,
                );
            }
        }
    }
3633}
3634
3635impl StatefulWidget for DataTable {
3636 type State = DataTableState;
3637
3638 fn render(self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
3639 state.visible_termcols = area.width as usize;
3640 let new_visible_rows = if area.height > 0 {
3641 (area.height - 1) as usize
3642 } else {
3643 0
3644 };
3645 let needs_collect = new_visible_rows != state.visible_rows;
3646 state.visible_rows = new_visible_rows;
3647
3648 if let Some(selected) = state.table_state.selected() {
3649 if selected >= state.visible_rows {
3650 state.table_state.select(Some(state.visible_rows - 1))
3651 }
3652 }
3653
3654 if needs_collect {
3655 state.collect();
3656 }
3657
3658 if let Some(error) = state.error.as_ref() {
3661 if !state.suppress_error_display {
3662 Paragraph::new(format!("Error: {}", user_message_from_polars(error)))
3663 .centered()
3664 .block(
3665 Block::default()
3666 .borders(Borders::NONE)
3667 .padding(Padding::top(area.height / 2)),
3668 )
3669 .wrap(ratatui::widgets::Wrap { trim: true })
3670 .render(area, buf);
3671 return;
3672 }
3673 }
3675
3676 let row_num_width = if state.row_numbers {
3678 let max_row_num = state.start_row + state.visible_rows.saturating_sub(1) + 1; max_row_num.to_string().len().max(1) as u16 + 1 } else {
3681 0
3682 };
3683
3684 let mut locked_width = row_num_width;
3686 if let Some(locked_df) = state.locked_df.as_ref() {
3687 let (_, cols) = locked_df.shape();
3688 for col_index in 0..cols {
3689 let col_name = locked_df.get_column_names()[col_index];
3690 let mut max_len = col_name.chars().count() as u16;
3691 let col_data = &locked_df[col_index];
3692 for row_index in 0..locked_df.height().min(state.visible_rows) {
3693 let value = col_data.get(row_index).unwrap();
3694 let val_str: Cow<str> = if matches!(value, AnyValue::Null) {
3695 Cow::Borrowed("")
3696 } else {
3697 value.str_value()
3698 };
3699 let len = val_str.chars().count() as u16;
3700 max_len = max_len.max(len);
3701 }
3702 locked_width += max_len + 1;
3703 }
3704 }
3705
3706 if locked_width > row_num_width && locked_width < area.width {
3708 let locked_area = Rect {
3709 x: area.x,
3710 y: area.y,
3711 width: locked_width,
3712 height: area.height,
3713 };
3714 let separator_x = locked_area.x + locked_area.width;
3715
3716 if state.row_numbers {
3718 let row_num_area = Rect {
3719 x: area.x,
3720 y: area.y,
3721 width: row_num_width,
3722 height: area.height,
3723 };
3724 self.render_row_numbers(
3725 row_num_area,
3726 buf,
3727 RowNumbersParams {
3728 start_row: state.start_row,
3729 visible_rows: state.visible_rows,
3730 num_rows: state.num_rows,
3731 row_start_index: state.row_start_index,
3732 selected_row: state.table_state.selected(),
3733 },
3734 );
3735 }
3736 let scrollable_area = Rect {
3737 x: separator_x + 1,
3738 y: area.y,
3739 width: area.width.saturating_sub(locked_width + 1),
3740 height: area.height,
3741 };
3742
3743 if let Some(locked_df) = state.locked_df.as_ref() {
3745 let adjusted_locked_area = if state.row_numbers {
3747 Rect {
3748 x: area.x + row_num_width,
3749 y: area.y,
3750 width: locked_width - row_num_width,
3751 height: area.height,
3752 }
3753 } else {
3754 locked_area
3755 };
3756
3757 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3759 let slice_len = state
3760 .visible_rows
3761 .min(locked_df.height().saturating_sub(offset));
3762 if offset < locked_df.height() && slice_len > 0 {
3763 let sliced_df = locked_df.slice(offset as i64, slice_len);
3764 self.render_dataframe(
3765 &sliced_df,
3766 adjusted_locked_area,
3767 buf,
3768 &mut state.table_state,
3769 false,
3770 state.start_row,
3771 );
3772 }
3773 }
3774
3775 let separator_x_adjusted = if state.row_numbers {
3777 area.x + row_num_width + (locked_width - row_num_width)
3778 } else {
3779 separator_x
3780 };
3781 for y in area.y..area.y + area.height {
3782 let cell = &mut buf[(separator_x_adjusted, y)];
3783 cell.set_char('│');
3784 cell.set_style(Style::default().fg(self.separator_fg));
3785 }
3786
3787 let adjusted_scrollable_area = if state.row_numbers {
3789 Rect {
3790 x: separator_x_adjusted + 1,
3791 y: area.y,
3792 width: area.width.saturating_sub(locked_width + 1),
3793 height: area.height,
3794 }
3795 } else {
3796 scrollable_area
3797 };
3798
3799 if let Some(df) = state.df.as_ref() {
3801 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3803 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3804 if offset < df.height() && slice_len > 0 {
3805 let sliced_df = df.slice(offset as i64, slice_len);
3806 self.render_dataframe(
3807 &sliced_df,
3808 adjusted_scrollable_area,
3809 buf,
3810 &mut state.table_state,
3811 false,
3812 state.start_row,
3813 );
3814 }
3815 }
3816 } else if let Some(df) = state.df.as_ref() {
3817 if state.row_numbers {
3820 let row_num_area = Rect {
3821 x: area.x,
3822 y: area.y,
3823 width: row_num_width,
3824 height: area.height,
3825 };
3826 self.render_row_numbers(
3827 row_num_area,
3828 buf,
3829 RowNumbersParams {
3830 start_row: state.start_row,
3831 visible_rows: state.visible_rows,
3832 num_rows: state.num_rows,
3833 row_start_index: state.row_start_index,
3834 selected_row: state.table_state.selected(),
3835 },
3836 );
3837
3838 let data_area = Rect {
3840 x: area.x + row_num_width,
3841 y: area.y,
3842 width: area.width.saturating_sub(row_num_width),
3843 height: area.height,
3844 };
3845
3846 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3848 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3849 if offset < df.height() && slice_len > 0 {
3850 let sliced_df = df.slice(offset as i64, slice_len);
3851 self.render_dataframe(
3852 &sliced_df,
3853 data_area,
3854 buf,
3855 &mut state.table_state,
3856 false,
3857 state.start_row,
3858 );
3859 }
3860 } else {
3861 let offset = state.start_row.saturating_sub(state.buffered_start_row);
3863 let slice_len = state.visible_rows.min(df.height().saturating_sub(offset));
3864 if offset < df.height() && slice_len > 0 {
3865 let sliced_df = df.slice(offset as i64, slice_len);
3866 self.render_dataframe(
3867 &sliced_df,
3868 area,
3869 buf,
3870 &mut state.table_state,
3871 false,
3872 state.start_row,
3873 );
3874 }
3875 }
3876 } else if !state.column_order.is_empty() {
3877 let empty_columns: Vec<_> = state
3879 .column_order
3880 .iter()
3881 .map(|name| Series::new(name.as_str().into(), Vec::<String>::new()).into())
3882 .collect();
3883 if let Ok(empty_df) = DataFrame::new(empty_columns) {
3884 if state.row_numbers {
3885 let row_num_area = Rect {
3886 x: area.x,
3887 y: area.y,
3888 width: row_num_width,
3889 height: area.height,
3890 };
3891 self.render_row_numbers(
3892 row_num_area,
3893 buf,
3894 RowNumbersParams {
3895 start_row: 0,
3896 visible_rows: state.visible_rows,
3897 num_rows: 0,
3898 row_start_index: state.row_start_index,
3899 selected_row: None,
3900 },
3901 );
3902 let data_area = Rect {
3903 x: area.x + row_num_width,
3904 y: area.y,
3905 width: area.width.saturating_sub(row_num_width),
3906 height: area.height,
3907 };
3908 self.render_dataframe(
3909 &empty_df,
3910 data_area,
3911 buf,
3912 &mut state.table_state,
3913 false,
3914 0,
3915 );
3916 } else {
3917 self.render_dataframe(&empty_df, area, buf, &mut state.table_state, false, 0);
3918 }
3919 } else {
3920 Paragraph::new("No data").render(area, buf);
3921 }
3922 } else {
3923 Paragraph::new("No data").render(area, buf);
3925 }
3926 }
3927}
3928
3929#[cfg(test)]
3930mod tests {
3931 use super::*;
3932 use crate::filter_modal::{FilterOperator, FilterStatement, LogicalOperator};
3933 use crate::pivot_melt_modal::{MeltSpec, PivotAggregation, PivotSpec};
3934
3935 fn create_test_lf() -> LazyFrame {
3936 df! (
3937 "a" => &[1, 2, 3],
3938 "b" => &["x", "y", "z"]
3939 )
3940 .unwrap()
3941 .lazy()
3942 }
3943
3944 fn create_large_test_lf() -> LazyFrame {
3945 df! (
3946 "a" => (0..100).collect::<Vec<i32>>(),
3947 "b" => (0..100).map(|i| format!("text_{}", i)).collect::<Vec<String>>(),
3948 "c" => (0..100).map(|i| i % 3).collect::<Vec<i32>>(),
3949 "d" => (0..100).map(|i| i % 5).collect::<Vec<i32>>()
3950 )
3951 .unwrap()
3952 .lazy()
3953 }
3954
3955 #[test]
3956 fn test_from_csv() {
3957 let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
3960 let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); assert_eq!(state.schema.len(), 6); }
3963
3964 #[test]
3965 fn test_from_csv_gzipped() {
3966 let path = crate::tests::sample_data_dir().join("mixed_types.csv.gz");
3969 let state = DataTableState::from_csv(&path, &Default::default()).unwrap(); assert_eq!(state.schema.len(), 6); }
3972
3973 #[test]
3974 fn test_from_parquet() {
3975 let path = crate::tests::sample_data_dir().join("people.parquet");
3977 let state = DataTableState::from_parquet(&path, None, None, None, None, false, 1).unwrap();
3978 assert!(!state.schema.is_empty());
3979 }
3980
    #[test]
    fn test_from_ipc() {
        // Round-trip: write a two-column frame to a temp Arrow IPC file and
        // reload it through DataTableState::from_ipc.
        use polars::prelude::IpcWriter;
        use std::io::BufWriter;
        let mut df = df!(
            "x" => &[1_i32, 2, 3],
            "y" => &["a", "b", "c"]
        )
        .unwrap();
        let dir = std::env::temp_dir();
        let path = dir.join("datui_test_ipc.arrow");
        let file = std::fs::File::create(&path).unwrap();
        let mut writer = BufWriter::new(file);
        IpcWriter::new(&mut writer).finish(&mut df).unwrap();
        // Drop to flush the buffered writer before reading the file back.
        drop(writer);
        let state = DataTableState::from_ipc(&path, None, None, None, None, false, 1).unwrap();
        assert_eq!(state.schema.len(), 2);
        assert!(state.schema.contains("x"));
        assert!(state.schema.contains("y"));
        let _ = std::fs::remove_file(&path);
    }
4002
    #[test]
    fn test_from_avro() {
        // Round-trip: write a two-column frame to a temp Avro file and
        // reload it through DataTableState::from_avro.
        use polars::io::avro::AvroWriter;
        use std::io::BufWriter;
        let mut df = df!(
            "id" => &[1_i32, 2, 3],
            "name" => &["alice", "bob", "carol"]
        )
        .unwrap();
        let dir = std::env::temp_dir();
        let path = dir.join("datui_test_avro.avro");
        let file = std::fs::File::create(&path).unwrap();
        let mut writer = BufWriter::new(file);
        AvroWriter::new(&mut writer).finish(&mut df).unwrap();
        // Drop to flush the buffered writer before reading the file back.
        drop(writer);
        let state = DataTableState::from_avro(&path, None, None, None, None, false, 1).unwrap();
        assert_eq!(state.schema.len(), 2);
        assert!(state.schema.contains("id"));
        assert!(state.schema.contains("name"));
        let _ = std::fs::remove_file(&path);
    }
4024
    #[test]
    fn test_from_orc() {
        // Round-trip: build an Arrow RecordBatch, write it to a temp ORC
        // file via orc_rust, then reload through DataTableState::from_orc.
        use arrow::array::{Int64Array, StringArray};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::record_batch::RecordBatch;
        use orc_rust::ArrowWriterBuilder;
        use std::io::BufWriter;
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let id_array = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
        let name_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, name_array]).unwrap();

        let dir = std::env::temp_dir();
        let path = dir.join("datui_test_orc.orc");
        let file = std::fs::File::create(&path).unwrap();
        let writer = BufWriter::new(file);
        // close() finalizes the ORC footer; required before reading back.
        let mut orc_writer = ArrowWriterBuilder::new(writer, schema).try_build().unwrap();
        orc_writer.write(&batch).unwrap();
        orc_writer.close().unwrap();

        let state = DataTableState::from_orc(&path, None, None, None, None, false, 1).unwrap();
        assert_eq!(state.schema.len(), 2);
        assert!(state.schema.contains("id"));
        assert!(state.schema.contains("name"));
        let _ = std::fs::remove_file(&path);
    }
4056
4057 #[test]
4058 fn test_filter() {
4059 let lf = create_test_lf();
4060 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4061 let filters = vec![FilterStatement {
4062 column: "a".to_string(),
4063 operator: FilterOperator::Gt,
4064 value: "2".to_string(),
4065 logical_op: LogicalOperator::And,
4066 }];
4067 state.filter(filters);
4068 let df = state.lf.clone().collect().unwrap();
4069 assert_eq!(df.shape().0, 1);
4070 assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4071 }
4072
4073 #[test]
4074 fn test_sort() {
4075 let lf = create_test_lf();
4076 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4077 state.sort(vec!["a".to_string()], false);
4078 let df = state.lf.clone().collect().unwrap();
4079 assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(3));
4080 }
4081
    #[test]
    fn test_query() {
        // A q-style "select ... where ..." query both projects ("b") and
        // filters (a = 2), leaving a single 1x1 result.
        let lf = create_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.query("select b where a = 2".to_string());
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (1, 1));
        assert_eq!(
            df.column("b").unwrap().get(0).unwrap(),
            AnyValue::String("y")
        );
    }
4094
    #[test]
    fn test_query_date_accessors() {
        // Exercises the query language's dotted accessors: date parts
        // (.year/.month/.date), string helpers (.upper/.len/.ends_with),
        // literal date comparison, and the empty-result path.
        use chrono::NaiveDate;
        let df = df!(
            "event_date" => [
                NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
                NaiveDate::from_ymd_opt(2024, 6, 20).unwrap(),
                NaiveDate::from_ymd_opt(2024, 12, 31).unwrap(),
            ],
            "name" => &["a", "b", "c"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();

        // Projection with named date-part accessors.
        state.query("select name, year: event_date.year, month: event_date.month".to_string());
        assert!(
            state.error.is_none(),
            "query should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape(), (3, 3));
        assert_eq!(
            df.column("year").unwrap().get(0).unwrap(),
            AnyValue::Int32(2024)
        );
        assert_eq!(
            df.column("month").unwrap().get(0).unwrap(),
            AnyValue::Int8(1)
        );
        assert_eq!(
            df.column("month").unwrap().get(1).unwrap(),
            AnyValue::Int8(6)
        );

        // Filtering on a date-part accessor.
        state.query("select name, event_date where event_date.month = 12".to_string());
        assert!(
            state.error.is_none(),
            "filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1);
        assert_eq!(
            df.column("name").unwrap().get(0).unwrap(),
            AnyValue::String("c")
        );

        // Comparison against a yyyy.mm.dd date literal.
        state.query("select name, event_date where event_date.date > 2024.06.15".to_string());
        assert!(
            state.error.is_none(),
            "date literal filter should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(
            df.height(),
            2,
            "2024-06-20 and 2024-12-31 are after 2024-06-15"
        );

        // String accessors and bracketed function-call syntax.
        state.query(
            "select name, upper_name: name.upper, name_len: name.len where name.ends_with[\"c\"]"
                .to_string(),
        );
        assert!(
            state.error.is_none(),
            "string accessors should succeed: {:?}",
            state.error
        );
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1, "only 'c' ends with 'c'");
        assert_eq!(
            df.column("upper_name").unwrap().get(0).unwrap(),
            AnyValue::String("C")
        );

        // A query matching nothing must clear both buffered frames.
        state.query("select where event_date.date = 2020.01.01".to_string());
        assert!(state.error.is_none());
        assert_eq!(state.num_rows, 0);
        state.visible_rows = 10;
        state.collect();
        assert!(state.df.is_none(), "df must be cleared when num_rows is 0");
        assert!(
            state.locked_df.is_none(),
            "locked_df must be cleared when num_rows is 0"
        );
    }
4189
4190 #[test]
4191 fn test_select_next_previous() {
4192 let lf = create_large_test_lf();
4193 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4194 state.visible_rows = 10;
4195 state.table_state.select(Some(5));
4196
4197 state.select_next();
4198 assert_eq!(state.table_state.selected(), Some(6));
4199
4200 state.select_previous();
4201 assert_eq!(state.table_state.selected(), Some(5));
4202 }
4203
4204 #[test]
4205 fn test_page_up_down() {
4206 let lf = create_large_test_lf();
4207 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4208 state.visible_rows = 20;
4209 state.collect();
4210
4211 assert_eq!(state.start_row, 0);
4212 state.page_down();
4213 assert_eq!(state.start_row, 20);
4214 state.page_down();
4215 assert_eq!(state.start_row, 40);
4216 state.page_up();
4217 assert_eq!(state.start_row, 20);
4218 state.page_up();
4219 assert_eq!(state.start_row, 0);
4220 }
4221
4222 #[test]
4223 fn test_scroll_left_right() {
4224 let lf = create_large_test_lf();
4225 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4226 assert_eq!(state.termcol_index, 0);
4227 state.scroll_right();
4228 assert_eq!(state.termcol_index, 1);
4229 state.scroll_right();
4230 assert_eq!(state.termcol_index, 2);
4231 state.scroll_left();
4232 assert_eq!(state.termcol_index, 1);
4233 state.scroll_left();
4234 assert_eq!(state.termcol_index, 0);
4235 }
4236
    #[test]
    fn test_reverse() {
        // After an ascending sort, reverse() flips the row order so the
        // largest "a" comes first.
        let lf = create_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.sort(vec!["a".to_string()], true);
        assert_eq!(
            state
                .lf
                .clone()
                .collect()
                .unwrap()
                .column("a")
                .unwrap()
                .get(0)
                .unwrap(),
            AnyValue::Int32(1)
        );
        state.reverse();
        assert_eq!(
            state
                .lf
                .clone()
                .collect()
                .unwrap()
                .column("a")
                .unwrap()
                .get(0)
                .unwrap(),
            AnyValue::Int32(3)
        );
    }
4268
    #[test]
    fn test_filter_multiple() {
        // Two AND-combined filters (c == 1 && d == 2). Over 0..100 both
        // congruences hold for 7 rows (i % 15 == 7).
        let lf = create_large_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let filters = vec![
            FilterStatement {
                column: "c".to_string(),
                operator: FilterOperator::Eq,
                value: "1".to_string(),
                logical_op: LogicalOperator::And,
            },
            FilterStatement {
                column: "d".to_string(),
                operator: FilterOperator::Eq,
                value: "2".to_string(),
                logical_op: LogicalOperator::And,
            },
        ];
        state.filter(filters);
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.shape().0, 7);
    }
4291
    #[test]
    fn test_filter_and_sort() {
        // Filter (c == 1) then sort on "a": the first row holds the largest
        // "a" with a % 3 == 1, i.e. 97.
        let lf = create_large_test_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let filters = vec![FilterStatement {
            column: "c".to_string(),
            operator: FilterOperator::Eq,
            value: "1".to_string(),
            logical_op: LogicalOperator::And,
        }];
        state.filter(filters);
        state.sort(vec!["a".to_string()], false);
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.column("a").unwrap().get(0).unwrap(), AnyValue::Int32(97));
    }
4307
4308 fn create_pivot_long_lf() -> LazyFrame {
4311 let df = df!(
4312 "id" => &[1_i32, 1, 1, 2, 2, 2, 1, 2],
4313 "date" => &["d1", "d1", "d1", "d1", "d1", "d1", "d1", "d1"],
4314 "key" => &["A", "B", "C", "A", "B", "C", "A", "B"],
4315 "value" => &[10.0_f64, 20.0, 30.0, 40.0, 50.0, 60.0, 11.0, 51.0],
4316 )
4317 .unwrap();
4318 df.lazy()
4319 }
4320
4321 fn create_melt_wide_lf() -> LazyFrame {
4323 let df = df!(
4324 "id" => &[1_i32, 2, 3],
4325 "date" => &["d1", "d2", "d3"],
4326 "c1" => &[10.0_f64, 20.0, 30.0],
4327 "c2" => &[11.0, 21.0, 31.0],
4328 "c3" => &[12.0, 22.0, 32.0],
4329 )
4330 .unwrap();
4331 df.lazy()
4332 }
4333
    #[test]
    fn test_pivot_basic() {
        // Pivoting "key" over (id, date) yields one column per key value
        // and one row per unique index combination.
        let lf = create_pivot_long_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: None,
        };
        state.pivot(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
        assert!(names.contains(&"id"));
        assert!(names.contains(&"date"));
        assert!(names.contains(&"A"));
        assert!(names.contains(&"B"));
        assert!(names.contains(&"C"));
        assert_eq!(df.height(), 2);
    }
4355
    #[test]
    fn test_pivot_aggregation_last() {
        // Last-aggregation: for duplicate (id=1, key=A) rows (10.0, 11.0)
        // the later value 11.0 wins; (id=2, key=A) has only 40.0.
        let lf = create_pivot_long_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: None,
        };
        state.pivot(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        let a_col = df.column("A").unwrap();
        let row0 = a_col.get(0).unwrap();
        let row1 = a_col.get(1).unwrap();
        assert_eq!(row0, AnyValue::Float64(11.0));
        assert_eq!(row1, AnyValue::Float64(40.0));
    }
4375
    #[test]
    fn test_pivot_aggregation_first() {
        // First-aggregation: for duplicate (id=1, key=A) rows the earlier
        // value 10.0 wins.
        let lf = create_pivot_long_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::First,
            sort_columns: None,
        };
        state.pivot(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        let a_col = df.column("A").unwrap();
        assert_eq!(a_col.get(0).unwrap(), AnyValue::Float64(10.0));
        assert_eq!(a_col.get(1).unwrap(), AnyValue::Float64(40.0));
    }
4393
    #[test]
    fn test_pivot_aggregation_min_max() {
        // Min/Max aggregations over the duplicate (id=1, key=A) values
        // {10.0, 11.0}: min is 10.0, max is 11.0.
        let lf = create_pivot_long_lf();
        let mut state_min = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
        state_min
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: PivotAggregation::Min,
                sort_columns: None,
            })
            .unwrap();
        let df_min = state_min.lf.clone().collect().unwrap();
        assert_eq!(
            df_min.column("A").unwrap().get(0).unwrap(),
            AnyValue::Float64(10.0)
        );

        let mut state_max = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state_max
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: PivotAggregation::Max,
                sort_columns: None,
            })
            .unwrap();
        let df_max = state_max.lf.clone().collect().unwrap();
        assert_eq!(
            df_max.column("A").unwrap().get(0).unwrap(),
            AnyValue::Float64(11.0)
        );
    }
4429
    #[test]
    fn test_pivot_aggregation_avg_count() {
        // Avg over the duplicate (id=1, key=A) values {10.0, 11.0} is 10.5;
        // Count over the same group is 2 (returned as UInt32).
        let lf = create_pivot_long_lf();
        let mut state_avg = DataTableState::new(lf.clone(), None, None, None, None, true).unwrap();
        state_avg
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: PivotAggregation::Avg,
                sort_columns: None,
            })
            .unwrap();
        let df_avg = state_avg.lf.clone().collect().unwrap();
        let a = df_avg.column("A").unwrap().get(0).unwrap();
        // Float compare with a tolerance rather than exact equality.
        if let AnyValue::Float64(x) = a {
            assert!((x - 10.5).abs() < 1e-6);
        } else {
            panic!("expected float");
        }

        let mut state_count = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state_count
            .pivot(&PivotSpec {
                index: vec!["id".to_string(), "date".to_string()],
                pivot_column: "key".to_string(),
                value_column: "value".to_string(),
                aggregation: PivotAggregation::Count,
                sort_columns: None,
            })
            .unwrap();
        let df_count = state_count.lf.clone().collect().unwrap();
        let a = df_count.column("A").unwrap().get(0).unwrap();
        assert_eq!(a, AnyValue::UInt32(2));
    }
4465
    #[test]
    fn test_pivot_string_first_last() {
        // Last-aggregation must also work on String value columns (each
        // (id, key) pair is unique here, so values pass through).
        let df = df!(
            "id" => &[1_i32, 1, 2, 2],
            "key" => &["X", "Y", "X", "Y"],
            "value" => &["low", "mid", "high", "mid"],
        )
        .unwrap();
        let lf = df.lazy();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = PivotSpec {
            index: vec!["id".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: None,
        };
        state.pivot(&spec).unwrap();
        let out = state.lf.clone().collect().unwrap();
        assert_eq!(
            out.column("X").unwrap().get(0).unwrap(),
            AnyValue::String("low")
        );
        assert_eq!(
            out.column("Y").unwrap().get(0).unwrap(),
            AnyValue::String("mid")
        );
    }
4494
    #[test]
    fn test_melt_basic() {
        // Melting 3 value columns over 3 rows produces 9 long-format rows
        // with the requested variable/value column names.
        let lf = create_melt_wide_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = MeltSpec {
            index: vec!["id".to_string(), "date".to_string()],
            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
            variable_name: "variable".to_string(),
            value_name: "value".to_string(),
        };
        state.melt(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 9);
        let names: Vec<&str> = df.get_column_names().iter().map(|s| s.as_str()).collect();
        assert!(names.contains(&"variable"));
        assert!(names.contains(&"value"));
        assert!(names.contains(&"id"));
        assert!(names.contains(&"date"));
    }
4514
    #[test]
    fn test_melt_all_except_index() {
        // Custom variable/value names ("var"/"val") must appear in the
        // melted output.
        let lf = create_melt_wide_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        let spec = MeltSpec {
            index: vec!["id".to_string(), "date".to_string()],
            value_columns: vec!["c1".to_string(), "c2".to_string(), "c3".to_string()],
            variable_name: "var".to_string(),
            value_name: "val".to_string(),
        };
        state.melt(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        assert!(df.column("var").is_ok());
        assert!(df.column("val").is_ok());
    }
4530
    #[test]
    fn test_pivot_on_current_view_after_filter() {
        // Pivot must operate on the filtered view, not the original frame:
        // after keeping only id == 1, the pivot yields a single row.
        let lf = create_pivot_long_lf();
        let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
        state.filter(vec![FilterStatement {
            column: "id".to_string(),
            operator: FilterOperator::Eq,
            value: "1".to_string(),
            logical_op: LogicalOperator::And,
        }]);
        let spec = PivotSpec {
            index: vec!["id".to_string(), "date".to_string()],
            pivot_column: "key".to_string(),
            value_column: "value".to_string(),
            aggregation: PivotAggregation::Last,
            sort_columns: None,
        };
        state.pivot(&spec).unwrap();
        let df = state.lf.clone().collect().unwrap();
        assert_eq!(df.height(), 1);
        let id_col = df.column("id").unwrap();
        assert_eq!(id_col.get(0).unwrap(), AnyValue::Int32(1));
    }
4554
    #[test]
    fn test_fuzzy_token_regex() {
        // Each character becomes a ".*c" step in a case-insensitive pattern,
        // and regex metacharacters are escaped.
        assert_eq!(fuzzy_token_regex("foo"), "(?i).*f.*o.*o.*");
        assert_eq!(fuzzy_token_regex("a"), "(?i).*a.*");
        let pat = fuzzy_token_regex("[");
        assert!(pat.contains("\\["));
    }
4563
    #[test]
    fn test_fuzzy_search() {
        // End-to-end fuzzy search against sample data: a non-empty query
        // only ever narrows the row count; an empty query resets it.
        crate::tests::ensure_sample_data();
        let path = crate::tests::sample_data_dir().join("3-sfd-header.csv");
        let mut state = DataTableState::from_csv(&path, &Default::default()).unwrap();
        state.visible_rows = 10;
        state.collect();
        let before = state.num_rows;
        state.fuzzy_search("string".to_string());
        assert!(state.error.is_none(), "{:?}", state.error);
        assert!(state.num_rows <= before, "fuzzy search should filter rows");
        state.fuzzy_search("".to_string());
        state.collect();
        assert_eq!(state.num_rows, before, "empty fuzzy search should reset");
        assert!(state.get_active_fuzzy_query().is_empty());
    }
4582
4583 #[test]
4584 fn test_fuzzy_search_regex_direct() {
4585 let lf = df!("name" => &["alice", "bob", "carol"]).unwrap().lazy();
4587 let pattern = fuzzy_token_regex("alice");
4588 let out = lf
4589 .filter(col("name").str().contains(lit(pattern.clone()), false))
4590 .collect()
4591 .unwrap();
4592 assert_eq!(out.height(), 1, "regex {:?} should match alice", pattern);
4593
4594 let lf2 = df!(
4596 "id" => &[1i32, 2, 3],
4597 "name" => &["alice", "bob", "carol"],
4598 "city" => &["NYC", "LA", "Boston"]
4599 )
4600 .unwrap()
4601 .lazy();
4602 let pat = fuzzy_token_regex("alice");
4603 let expr = col("name")
4604 .str()
4605 .contains(lit(pat.clone()), false)
4606 .or(col("city").str().contains(lit(pat), false));
4607 let out2 = lf2.clone().filter(expr).collect().unwrap();
4608 assert_eq!(out2.height(), 1);
4609
4610 let schema = lf2.clone().collect_schema().unwrap();
4612 let string_cols: Vec<String> = schema
4613 .iter()
4614 .filter(|(_, dtype)| dtype.is_string())
4615 .map(|(name, _)| name.to_string())
4616 .collect();
4617 assert!(
4618 !string_cols.is_empty(),
4619 "df! string cols should be detected"
4620 );
4621 let pattern = fuzzy_token_regex("alice");
4622 let token_expr = string_cols
4623 .iter()
4624 .map(|c| col(c.as_str()).str().contains(lit(pattern.clone()), false))
4625 .reduce(|a, b| a.or(b))
4626 .unwrap();
4627 let out3 = lf2.filter(token_expr).collect().unwrap();
4628 assert_eq!(
4629 out3.height(),
4630 1,
4631 "fuzzy_search-style filter should match 1 row"
4632 );
4633 }
4634
4635 #[test]
4636 fn test_fuzzy_search_no_string_columns() {
4637 let lf = df!("a" => &[1i32, 2, 3], "b" => &[10i64, 20, 30])
4638 .unwrap()
4639 .lazy();
4640 let mut state = DataTableState::new(lf, None, None, None, None, true).unwrap();
4641 state.fuzzy_search("x".to_string());
4642 assert!(state.error.is_some());
4643 }
4644
4645 #[test]
4648 fn test_by_query_result_sorted_by_group_columns() {
4649 let df = df!(
4651 "age_group" => &[3i64, 1, 5, 2, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
4652 "team" => &[
4653 "Red", "Blue", "Green", "Red", "Blue", "Green", "Green", "Red", "Blue",
4654 "Green", "Red", "Blue", "Red", "Blue", "Green",
4655 ],
4656 "score" => &[50.0f64, 10.0, 90.0, 20.0, 30.0, 40.0, 60.0, 70.0, 80.0, 15.0, 25.0, 35.0, 45.0, 55.0, 65.0],
4657 )
4658 .unwrap();
4659 let lf = df.lazy();
4660 let options = crate::OpenOptions::default();
4661 let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4662 state.query("select avg score by age_group, team".to_string());
4663 assert!(
4664 state.error.is_none(),
4665 "query should succeed: {:?}",
4666 state.error
4667 );
4668 let result = state.lf.collect().unwrap();
4669 let sorted = result
4671 .sort(
4672 ["age_group", "team"],
4673 SortMultipleOptions::default().with_order_descending(false),
4674 )
4675 .unwrap();
4676 assert_eq!(
4677 result, sorted,
4678 "by-query result must be sorted by (age_group, team)"
4679 );
4680 }
4681
4682 #[test]
4685 fn test_by_query_computed_group_key_sorted_by_result_column() {
4686 let df = df!(
4687 "x" => &[7.0f64, 12.0, 3.0, 22.0, 17.0, 8.0],
4688 "v" => &[1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0],
4689 )
4690 .unwrap();
4691 let lf = df.lazy();
4692 let options = crate::OpenOptions::default();
4693 let mut state = DataTableState::from_lazyframe(lf, &options).unwrap();
4694 state.query("select sum v by bucket: 1+floor x % 3".to_string());
4696 assert!(
4697 state.error.is_none(),
4698 "query should succeed: {:?}",
4699 state.error
4700 );
4701 let result = state.lf.collect().unwrap();
4702 let bucket = result.column("bucket").unwrap();
4703 for i in 1..result.height() {
4705 let prev: i64 = bucket.get(i - 1).unwrap().try_extract().unwrap_or(0);
4706 let curr: i64 = bucket.get(i).unwrap().try_extract().unwrap_or(0);
4707 assert!(
4708 curr >= prev,
4709 "bucket column must be sorted: {} then {}",
4710 prev,
4711 curr
4712 );
4713 }
4714 }
4715}