1mod aggregations;
4mod joins;
5mod stats;
6mod transformations;
7
8pub use aggregations::{CubeRollupData, GroupedData};
9pub use joins::{join, JoinType};
10pub use stats::DataFrameStat;
11pub use transformations::{filter, order_by, order_by_exprs, select, with_column, DataFrameNa};
12
13use crate::column::Column;
14use crate::functions::SortOrder;
15use crate::schema::StructType;
16use polars::prelude::{
17 AnyValue, DataFrame as PlDataFrame, Expr, PolarsError, SchemaNamesAndDtypes,
18};
19use serde_json::Value as JsonValue;
20use std::collections::HashMap;
21use std::sync::Arc;
22
/// Default for column-name matching: case-insensitive unless explicitly
/// enabled (see `DataFrame::resolve_column_name` and `from_polars_with_options`).
const DEFAULT_CASE_SENSITIVE: bool = false;
25
/// A Spark-like DataFrame API backed by an eagerly materialized Polars frame.
pub struct DataFrame {
    /// Shared, immutable Polars frame; the `Arc` makes `clone` a refcount bump.
    pub(crate) df: Arc<PlDataFrame>,
    /// Whether column-name resolution is case-sensitive (see `resolve_column_name`).
    pub(crate) case_sensitive: bool,
}
33
34impl DataFrame {
35 pub fn from_polars(df: PlDataFrame) -> Self {
37 DataFrame {
38 df: Arc::new(df),
39 case_sensitive: DEFAULT_CASE_SENSITIVE,
40 }
41 }
42
43 pub fn from_polars_with_options(df: PlDataFrame, case_sensitive: bool) -> Self {
46 DataFrame {
47 df: Arc::new(df),
48 case_sensitive,
49 }
50 }
51
52 pub fn empty() -> Self {
54 DataFrame {
55 df: Arc::new(PlDataFrame::empty()),
56 case_sensitive: DEFAULT_CASE_SENSITIVE,
57 }
58 }
59
60 pub fn resolve_column_name(&self, name: &str) -> Result<String, PolarsError> {
63 let names = self.df.get_column_names();
64 if self.case_sensitive {
65 if names.iter().any(|n| *n == name) {
66 return Ok(name.to_string());
67 }
68 } else {
69 let name_lower = name.to_lowercase();
70 for n in names {
71 if n.to_lowercase() == name_lower {
72 return Ok(n.to_string());
73 }
74 }
75 }
76 let available: Vec<String> = self
77 .df
78 .get_column_names()
79 .iter()
80 .map(|s| s.to_string())
81 .collect();
82 Err(PolarsError::ColumnNotFound(
83 format!(
84 "Column '{}' not found. Available columns: [{}]. Check spelling and case sensitivity (spark.sql.caseSensitive).",
85 name,
86 available.join(", ")
87 )
88 .into(),
89 ))
90 }
91
92 pub fn schema(&self) -> Result<StructType, PolarsError> {
94 Ok(StructType::from_polars_schema(&self.df.schema()))
95 }
96
97 pub fn columns(&self) -> Result<Vec<String>, PolarsError> {
99 Ok(self
100 .df
101 .get_column_names()
102 .iter()
103 .map(|s| s.to_string())
104 .collect())
105 }
106
107 pub fn count(&self) -> Result<usize, PolarsError> {
109 Ok(self.df.height())
110 }
111
112 pub fn show(&self, n: Option<usize>) -> Result<(), PolarsError> {
114 let n = n.unwrap_or(20);
115 println!("{}", self.df.head(Some(n)));
116 Ok(())
117 }
118
119 pub fn collect(&self) -> Result<Arc<PlDataFrame>, PolarsError> {
121 Ok(self.df.clone())
122 }
123
124 pub fn collect_as_json_rows(&self) -> Result<Vec<HashMap<String, JsonValue>>, PolarsError> {
126 let df = self.df.as_ref();
127 let names = df.get_column_names();
128 let nrows = df.height();
129 let mut rows = Vec::with_capacity(nrows);
130 for i in 0..nrows {
131 let mut row = HashMap::with_capacity(names.len());
132 for (col_idx, name) in names.iter().enumerate() {
133 let s = df
134 .get_columns()
135 .get(col_idx)
136 .ok_or_else(|| PolarsError::ComputeError("column index out of range".into()))?;
137 let av = s.get(i)?;
138 let jv = any_value_to_json(av);
139 row.insert(name.to_string(), jv);
140 }
141 rows.push(row);
142 }
143 Ok(rows)
144 }
145
146 pub fn select(&self, cols: Vec<&str>) -> Result<DataFrame, PolarsError> {
149 let resolved: Vec<String> = cols
150 .iter()
151 .map(|c| self.resolve_column_name(c))
152 .collect::<Result<Vec<_>, _>>()?;
153 let refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
154 let mut result = transformations::select(self, refs, self.case_sensitive)?;
155 if !self.case_sensitive {
157 for (requested, res) in cols.iter().zip(resolved.iter()) {
158 if *requested != res.as_str() {
159 result = result.with_column_renamed(res, requested)?;
160 }
161 }
162 }
163 Ok(result)
164 }
165
166 pub fn filter(&self, condition: Expr) -> Result<DataFrame, PolarsError> {
168 transformations::filter(self, condition, self.case_sensitive)
169 }
170
171 pub fn column(&self, name: &str) -> Result<Column, PolarsError> {
174 let resolved = self.resolve_column_name(name)?;
175 Ok(Column::new(resolved))
176 }
177
178 pub fn with_column(&self, column_name: &str, col: &Column) -> Result<DataFrame, PolarsError> {
181 transformations::with_column(self, column_name, col, self.case_sensitive)
182 }
183
184 pub fn with_column_expr(
186 &self,
187 column_name: &str,
188 expr: Expr,
189 ) -> Result<DataFrame, PolarsError> {
190 let col = Column::from_expr(expr, None);
191 self.with_column(column_name, &col)
192 }
193
194 pub fn group_by(&self, column_names: Vec<&str>) -> Result<GroupedData, PolarsError> {
197 use polars::prelude::*;
198 let resolved: Vec<String> = column_names
199 .iter()
200 .map(|c| self.resolve_column_name(c))
201 .collect::<Result<Vec<_>, _>>()?;
202 let exprs: Vec<Expr> = resolved.iter().map(|name| col(name.as_str())).collect();
203 let lazy_grouped = self.df.as_ref().clone().lazy().group_by(exprs);
204 Ok(GroupedData {
205 lazy_grouped,
206 grouping_cols: resolved,
207 case_sensitive: self.case_sensitive,
208 })
209 }
210
211 pub fn cube(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
213 let resolved: Vec<String> = column_names
214 .iter()
215 .map(|c| self.resolve_column_name(c))
216 .collect::<Result<Vec<_>, _>>()?;
217 Ok(CubeRollupData {
218 df: self.df.as_ref().clone(),
219 grouping_cols: resolved,
220 case_sensitive: self.case_sensitive,
221 is_cube: true,
222 })
223 }
224
225 pub fn rollup(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
227 let resolved: Vec<String> = column_names
228 .iter()
229 .map(|c| self.resolve_column_name(c))
230 .collect::<Result<Vec<_>, _>>()?;
231 Ok(CubeRollupData {
232 df: self.df.as_ref().clone(),
233 grouping_cols: resolved,
234 case_sensitive: self.case_sensitive,
235 is_cube: false,
236 })
237 }
238
239 pub fn join(
242 &self,
243 other: &DataFrame,
244 on: Vec<&str>,
245 how: JoinType,
246 ) -> Result<DataFrame, PolarsError> {
247 let resolved: Vec<String> = on
248 .iter()
249 .map(|c| self.resolve_column_name(c))
250 .collect::<Result<Vec<_>, _>>()?;
251 let on_refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
252 join(self, other, on_refs, how, self.case_sensitive)
253 }
254
255 pub fn order_by(
258 &self,
259 column_names: Vec<&str>,
260 ascending: Vec<bool>,
261 ) -> Result<DataFrame, PolarsError> {
262 let resolved: Vec<String> = column_names
263 .iter()
264 .map(|c| self.resolve_column_name(c))
265 .collect::<Result<Vec<_>, _>>()?;
266 let refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
267 transformations::order_by(self, refs, ascending, self.case_sensitive)
268 }
269
270 pub fn order_by_exprs(&self, sort_orders: Vec<SortOrder>) -> Result<DataFrame, PolarsError> {
272 transformations::order_by_exprs(self, sort_orders, self.case_sensitive)
273 }
274
275 pub fn union(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
277 transformations::union(self, other, self.case_sensitive)
278 }
279
280 pub fn union_by_name(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
282 transformations::union_by_name(self, other, self.case_sensitive)
283 }
284
285 pub fn distinct(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
287 transformations::distinct(self, subset, self.case_sensitive)
288 }
289
290 pub fn drop(&self, columns: Vec<&str>) -> Result<DataFrame, PolarsError> {
292 transformations::drop(self, columns, self.case_sensitive)
293 }
294
295 pub fn dropna(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
297 transformations::dropna(self, subset, self.case_sensitive)
298 }
299
300 pub fn fillna(&self, value: Expr) -> Result<DataFrame, PolarsError> {
302 transformations::fillna(self, value, self.case_sensitive)
303 }
304
305 pub fn limit(&self, n: usize) -> Result<DataFrame, PolarsError> {
307 transformations::limit(self, n, self.case_sensitive)
308 }
309
310 pub fn with_column_renamed(
312 &self,
313 old_name: &str,
314 new_name: &str,
315 ) -> Result<DataFrame, PolarsError> {
316 transformations::with_column_renamed(self, old_name, new_name, self.case_sensitive)
317 }
318
319 pub fn replace(
321 &self,
322 column_name: &str,
323 old_value: Expr,
324 new_value: Expr,
325 ) -> Result<DataFrame, PolarsError> {
326 transformations::replace(self, column_name, old_value, new_value, self.case_sensitive)
327 }
328
329 pub fn cross_join(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
331 transformations::cross_join(self, other, self.case_sensitive)
332 }
333
334 pub fn describe(&self) -> Result<DataFrame, PolarsError> {
336 transformations::describe(self, self.case_sensitive)
337 }
338
339 pub fn cache(&self) -> Result<DataFrame, PolarsError> {
341 Ok(self.clone())
342 }
343
344 pub fn persist(&self) -> Result<DataFrame, PolarsError> {
346 Ok(self.clone())
347 }
348
349 pub fn unpersist(&self) -> Result<DataFrame, PolarsError> {
351 Ok(self.clone())
352 }
353
354 pub fn subtract(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
356 transformations::subtract(self, other, self.case_sensitive)
357 }
358
359 pub fn intersect(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
361 transformations::intersect(self, other, self.case_sensitive)
362 }
363
364 pub fn sample(
366 &self,
367 with_replacement: bool,
368 fraction: f64,
369 seed: Option<u64>,
370 ) -> Result<DataFrame, PolarsError> {
371 transformations::sample(self, with_replacement, fraction, seed, self.case_sensitive)
372 }
373
374 pub fn random_split(
376 &self,
377 weights: &[f64],
378 seed: Option<u64>,
379 ) -> Result<Vec<DataFrame>, PolarsError> {
380 transformations::random_split(self, weights, seed, self.case_sensitive)
381 }
382
383 pub fn sample_by(
386 &self,
387 col_name: &str,
388 fractions: &[(Expr, f64)],
389 seed: Option<u64>,
390 ) -> Result<DataFrame, PolarsError> {
391 transformations::sample_by(self, col_name, fractions, seed, self.case_sensitive)
392 }
393
394 pub fn first(&self) -> Result<DataFrame, PolarsError> {
396 transformations::first(self, self.case_sensitive)
397 }
398
399 pub fn head(&self, n: usize) -> Result<DataFrame, PolarsError> {
401 transformations::head(self, n, self.case_sensitive)
402 }
403
404 pub fn take(&self, n: usize) -> Result<DataFrame, PolarsError> {
406 transformations::take(self, n, self.case_sensitive)
407 }
408
409 pub fn tail(&self, n: usize) -> Result<DataFrame, PolarsError> {
411 transformations::tail(self, n, self.case_sensitive)
412 }
413
414 pub fn is_empty(&self) -> bool {
416 transformations::is_empty(self)
417 }
418
419 pub fn to_df(&self, names: Vec<&str>) -> Result<DataFrame, PolarsError> {
421 transformations::to_df(self, &names, self.case_sensitive)
422 }
423
424 pub fn stat(&self) -> DataFrameStat<'_> {
426 DataFrameStat { df: self }
427 }
428
429 pub fn corr(&self) -> Result<DataFrame, PolarsError> {
431 self.stat().corr_matrix()
432 }
433
434 pub fn summary(&self) -> Result<DataFrame, PolarsError> {
436 self.describe()
437 }
438
439 pub fn to_json(&self) -> Result<Vec<String>, PolarsError> {
441 transformations::to_json(self)
442 }
443
444 pub fn explain(&self) -> String {
446 transformations::explain(self)
447 }
448
449 pub fn print_schema(&self) -> Result<String, PolarsError> {
451 transformations::print_schema(self)
452 }
453
454 pub fn checkpoint(&self) -> Result<DataFrame, PolarsError> {
456 Ok(self.clone())
457 }
458
459 pub fn local_checkpoint(&self) -> Result<DataFrame, PolarsError> {
461 Ok(self.clone())
462 }
463
464 pub fn repartition(&self, _num_partitions: usize) -> Result<DataFrame, PolarsError> {
466 Ok(self.clone())
467 }
468
469 pub fn repartition_by_range(
471 &self,
472 _num_partitions: usize,
473 _cols: Vec<&str>,
474 ) -> Result<DataFrame, PolarsError> {
475 Ok(self.clone())
476 }
477
478 pub fn dtypes(&self) -> Result<Vec<(String, String)>, PolarsError> {
480 let schema = self.df.schema();
481 Ok(schema
482 .iter_names_and_dtypes()
483 .map(|(name, dtype)| (name.to_string(), format!("{dtype:?}")))
484 .collect())
485 }
486
487 pub fn sort_within_partitions(
489 &self,
490 _cols: &[crate::functions::SortOrder],
491 ) -> Result<DataFrame, PolarsError> {
492 Ok(self.clone())
493 }
494
495 pub fn coalesce(&self, _num_partitions: usize) -> Result<DataFrame, PolarsError> {
497 Ok(self.clone())
498 }
499
500 pub fn hint(&self, _name: &str, _params: &[i32]) -> Result<DataFrame, PolarsError> {
502 Ok(self.clone())
503 }
504
505 pub fn is_local(&self) -> bool {
507 true
508 }
509
510 pub fn input_files(&self) -> Vec<String> {
512 Vec::new()
513 }
514
515 pub fn same_semantics(&self, _other: &DataFrame) -> bool {
517 false
518 }
519
520 pub fn semantic_hash(&self) -> u64 {
522 0
523 }
524
525 pub fn observe(&self, _name: &str, _expr: Expr) -> Result<DataFrame, PolarsError> {
527 Ok(self.clone())
528 }
529
530 pub fn with_watermark(
532 &self,
533 _event_time: &str,
534 _delay: &str,
535 ) -> Result<DataFrame, PolarsError> {
536 Ok(self.clone())
537 }
538
539 pub fn select_expr(&self, exprs: &[String]) -> Result<DataFrame, PolarsError> {
541 transformations::select_expr(self, exprs, self.case_sensitive)
542 }
543
544 pub fn col_regex(&self, pattern: &str) -> Result<DataFrame, PolarsError> {
546 transformations::col_regex(self, pattern, self.case_sensitive)
547 }
548
549 pub fn with_columns(&self, exprs: &[(String, Column)]) -> Result<DataFrame, PolarsError> {
551 transformations::with_columns(self, exprs, self.case_sensitive)
552 }
553
554 pub fn with_columns_renamed(
556 &self,
557 renames: &[(String, String)],
558 ) -> Result<DataFrame, PolarsError> {
559 transformations::with_columns_renamed(self, renames, self.case_sensitive)
560 }
561
562 pub fn na(&self) -> DataFrameNa<'_> {
564 DataFrameNa { df: self }
565 }
566
567 pub fn offset(&self, n: usize) -> Result<DataFrame, PolarsError> {
569 transformations::offset(self, n, self.case_sensitive)
570 }
571
572 pub fn transform<F>(&self, f: F) -> Result<DataFrame, PolarsError>
574 where
575 F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
576 {
577 transformations::transform(self, f)
578 }
579
580 pub fn freq_items(&self, columns: &[&str], support: f64) -> Result<DataFrame, PolarsError> {
582 transformations::freq_items(self, columns, support, self.case_sensitive)
583 }
584
585 pub fn approx_quantile(
587 &self,
588 column: &str,
589 probabilities: &[f64],
590 ) -> Result<DataFrame, PolarsError> {
591 transformations::approx_quantile(self, column, probabilities, self.case_sensitive)
592 }
593
594 pub fn crosstab(&self, col1: &str, col2: &str) -> Result<DataFrame, PolarsError> {
596 transformations::crosstab(self, col1, col2, self.case_sensitive)
597 }
598
599 pub fn melt(&self, id_vars: &[&str], value_vars: &[&str]) -> Result<DataFrame, PolarsError> {
601 transformations::melt(self, id_vars, value_vars, self.case_sensitive)
602 }
603
604 pub fn pivot(
606 &self,
607 _pivot_col: &str,
608 _values: Option<Vec<&str>>,
609 ) -> Result<DataFrame, PolarsError> {
610 Err(PolarsError::InvalidOperation(
611 "pivot is not yet implemented; use crosstab(col1, col2) for two-column cross-tabulation."
612 .into(),
613 ))
614 }
615
616 pub fn except_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
618 transformations::except_all(self, other, self.case_sensitive)
619 }
620
621 pub fn intersect_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
623 transformations::intersect_all(self, other, self.case_sensitive)
624 }
625
626 #[cfg(feature = "delta")]
629 pub fn write_delta(
630 &self,
631 path: impl AsRef<std::path::Path>,
632 overwrite: bool,
633 ) -> Result<(), PolarsError> {
634 crate::delta::write_delta(self.df.as_ref(), path, overwrite)
635 }
636
637 #[cfg(not(feature = "delta"))]
639 pub fn write_delta(
640 &self,
641 _path: impl AsRef<std::path::Path>,
642 _overwrite: bool,
643 ) -> Result<(), PolarsError> {
644 Err(PolarsError::InvalidOperation(
645 "Delta Lake requires the 'delta' feature. Build with --features delta.".into(),
646 ))
647 }
648
649 pub fn write(&self) -> DataFrameWriter<'_> {
651 DataFrameWriter {
652 df: self,
653 mode: WriteMode::Overwrite,
654 format: WriteFormat::Parquet,
655 }
656 }
657}
658
/// How `DataFrameWriter::save` treats an existing file at the target path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
    /// Replace the file contents with the current frame.
    Overwrite,
    /// Read the existing file back (best-effort) and concatenate before writing.
    Append,
}
665
/// Output serialization format for `DataFrameWriter::save`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteFormat {
    Parquet,
    Csv,
    /// Newline-delimited JSON (read back via `LazyJsonLineReader` on append).
    Json,
}
673
/// Builder returned by `DataFrame::write`; configure with `mode`/`format`,
/// then call `save` to serialize to disk.
pub struct DataFrameWriter<'a> {
    /// Source frame; nothing is written until `save` is called.
    df: &'a DataFrame,
    /// Overwrite or append semantics (defaults to Overwrite, set in `DataFrame::write`).
    mode: WriteMode,
    /// Output format (defaults to Parquet, set in `DataFrame::write`).
    format: WriteFormat,
}
680
681impl<'a> DataFrameWriter<'a> {
682 pub fn mode(mut self, mode: WriteMode) -> Self {
683 self.mode = mode;
684 self
685 }
686
687 pub fn format(mut self, format: WriteFormat) -> Self {
688 self.format = format;
689 self
690 }
691
692 pub fn save(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
694 use polars::prelude::*;
695 let path = path.as_ref();
696 let to_write: PlDataFrame = match self.mode {
697 WriteMode::Overwrite => self.df.df.as_ref().clone(),
698 WriteMode::Append => {
699 use polars::prelude::*;
700 let existing: Option<PlDataFrame> = if path.exists() {
701 match self.format {
702 WriteFormat::Parquet => {
703 LazyFrame::scan_parquet(path, ScanArgsParquet::default())
704 .and_then(|lf| lf.collect())
705 .ok()
706 }
707 WriteFormat::Csv => LazyCsvReader::new(path)
708 .with_has_header(true)
709 .finish()
710 .and_then(|lf| lf.collect())
711 .ok(),
712 WriteFormat::Json => LazyJsonLineReader::new(path)
713 .finish()
714 .and_then(|lf| lf.collect())
715 .ok(),
716 }
717 } else {
718 None
719 };
720 match existing {
721 Some(existing) => {
722 let lfs: [polars::prelude::LazyFrame; 2] =
723 [existing.lazy(), self.df.df.as_ref().clone().lazy()];
724 polars::prelude::concat(lfs, UnionArgs::default())?.collect()?
725 }
726 None => self.df.df.as_ref().clone(),
727 }
728 }
729 };
730 match self.format {
731 WriteFormat::Parquet => {
732 let mut file = std::fs::File::create(path).map_err(|e| {
733 PolarsError::ComputeError(format!("write parquet create: {e}").into())
734 })?;
735 let mut df_mut = to_write;
736 ParquetWriter::new(&mut file)
737 .finish(&mut df_mut)
738 .map_err(|e| PolarsError::ComputeError(format!("write parquet: {e}").into()))?;
739 }
740 WriteFormat::Csv => {
741 let mut file = std::fs::File::create(path).map_err(|e| {
742 PolarsError::ComputeError(format!("write csv create: {e}").into())
743 })?;
744 CsvWriter::new(&mut file)
745 .finish(&mut to_write.clone())
746 .map_err(|e| PolarsError::ComputeError(format!("write csv: {e}").into()))?;
747 }
748 WriteFormat::Json => {
749 let mut file = std::fs::File::create(path).map_err(|e| {
750 PolarsError::ComputeError(format!("write json create: {e}").into())
751 })?;
752 JsonWriter::new(&mut file)
753 .finish(&mut to_write.clone())
754 .map_err(|e| PolarsError::ComputeError(format!("write json: {e}").into()))?;
755 }
756 }
757 Ok(())
758 }
759}
760
761impl Clone for DataFrame {
762 fn clone(&self) -> Self {
763 DataFrame {
764 df: self.df.clone(),
765 case_sensitive: self.case_sensitive,
766 }
767 }
768}
769
770fn any_value_to_json(av: AnyValue<'_>) -> JsonValue {
772 match av {
773 AnyValue::Null => JsonValue::Null,
774 AnyValue::Boolean(b) => JsonValue::Bool(b),
775 AnyValue::Int32(i) => JsonValue::Number(serde_json::Number::from(i)),
776 AnyValue::Int64(i) => JsonValue::Number(serde_json::Number::from(i)),
777 AnyValue::UInt32(u) => JsonValue::Number(serde_json::Number::from(u)),
778 AnyValue::UInt64(u) => JsonValue::Number(serde_json::Number::from(u)),
779 AnyValue::Float32(f) => serde_json::Number::from_f64(f64::from(f))
780 .map(JsonValue::Number)
781 .unwrap_or(JsonValue::Null),
782 AnyValue::Float64(f) => serde_json::Number::from_f64(f)
783 .map(JsonValue::Number)
784 .unwrap_or(JsonValue::Null),
785 AnyValue::String(s) => JsonValue::String(s.to_string()),
786 AnyValue::StringOwned(s) => JsonValue::String(s.to_string()),
787 _ => JsonValue::Null,
788 }
789}