// robin_sparkless/dataframe.rs

1//! Root-owned DataFrame API; delegates to robin-sparkless-polars for execution.
2
3use robin_sparkless_core::EngineError;
4use robin_sparkless_core::engine::{CollectedRows, DataFrameBackend, GroupedDataBackend};
5use robin_sparkless_core::expr::ExprIr;
6use robin_sparkless_core::{DataType, StructType};
7use robin_sparkless_polars::dataframe::{
8    DataFrameNa as PolarsDataFrameNa, DataFrameStat as PolarsDataFrameStat,
9    DataFrameWriter as PolarsDataFrameWriter,
10};
11use robin_sparkless_polars::functions::SortOrder;
12use robin_sparkless_polars::{
13    Column, CubeRollupData as PolarsCubeRollupData, DataFrame as PolarsDataFrame, Expr,
14    GroupedData as PolarsGroupedData, LazyFrame, PivotedGroupedData as PolarsPivotedGroupedData,
15    PlDataFrame, PolarsError,
16};
17use std::collections::HashMap;
18use std::path::Path;
19use std::sync::Arc;
20
21use serde_json::Value as JsonValue;
22
23/// Downcast trait result to root DataFrame (Polars backend).
24fn downcast_df(box_df: Box<dyn DataFrameBackend>) -> Result<DataFrame, EngineError> {
25    let concrete = box_df
26        .as_any()
27        .downcast_ref::<PolarsDataFrame>()
28        .ok_or_else(|| EngineError::Internal("expected Polars backend".into()))?;
29    Ok(DataFrame(concrete.clone()))
30}
31
/// Root-owned DataFrame; delegates to the Polars backend.
///
/// Thin newtype over the backend frame: every method forwards to the inner
/// value and re-wraps returned frames/groupings in these root-owned types.
#[derive(Clone)]
pub struct DataFrame(pub(crate) PolarsDataFrame);

/// Root-owned GroupedData; delegates to the Polars backend.
pub struct GroupedData(pub(crate) PolarsGroupedData);

/// Root-owned CubeRollupData; delegates to the Polars backend.
pub struct CubeRollupData(pub(crate) PolarsCubeRollupData);

/// Root-owned PivotedGroupedData; delegates to the Polars backend.
pub struct PivotedGroupedData(pub(crate) PolarsPivotedGroupedData);
44
/// Re-export for API compatibility.
pub use robin_sparkless_polars::dataframe::{
    JoinType, SaveMode, SelectItem, WriteFormat, WriteMode,
};

/// Root-owned DataFrameStat; delegates to the Polars backend.
///
/// Borrows the parent frame for `'a`; obtained via `DataFrame::stat`.
pub struct DataFrameStat<'a>(PolarsDataFrameStat<'a>);

/// Root-owned DataFrameNa; delegates to the Polars backend.
///
/// Borrows the parent frame for `'a`; obtained via `DataFrame::na`.
pub struct DataFrameNa<'a>(PolarsDataFrameNa<'a>);

/// Root-owned DataFrameWriter; delegates to the Polars backend.
///
/// Borrows the parent frame for `'a`; obtained via `DataFrame::write`.
pub struct DataFrameWriter<'a> {
    // Backend writer that performs the actual output.
    inner: PolarsDataFrameWriter<'a>,
}
60
impl DataFrame {
    /// Create from a Polars DataFrame (via backend).
    pub fn from_polars(df: PlDataFrame) -> Self {
        DataFrame(PolarsDataFrame::from_polars(df))
    }

    /// Create from a Polars DataFrame with options.
    ///
    /// `case_sensitive` is forwarded to the backend (presumably controls
    /// column-name matching — confirm against the backend crate).
    pub fn from_polars_with_options(df: PlDataFrame, case_sensitive: bool) -> Self {
        DataFrame(PolarsDataFrame::from_polars_with_options(
            df,
            case_sensitive,
        ))
    }

    /// Create from a LazyFrame (via backend).
    pub fn from_lazy(lf: LazyFrame) -> Self {
        DataFrame(PolarsDataFrame::from_lazy(lf))
    }

    /// Create from a LazyFrame with options.
    pub fn from_lazy_with_options(lf: LazyFrame, case_sensitive: bool) -> Self {
        DataFrame(PolarsDataFrame::from_lazy_with_options(lf, case_sensitive))
    }

    /// Create an empty DataFrame.
    pub fn empty() -> Self {
        DataFrame(PolarsDataFrame::empty())
    }

    /// Return a DataFrame with the given alias.
    pub fn alias(&self, name: &str) -> Self {
        DataFrame(self.0.alias(name))
    }

    /// Filter rows using an engine-agnostic expression (ExprIr).
    pub fn filter_expr_ir(&self, condition: &ExprIr) -> Result<DataFrame, EngineError> {
        downcast_df(DataFrameBackend::filter(&self.0, condition)?)
    }

    /// Select columns/expressions using ExprIr.
    pub fn select_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError> {
        downcast_df(DataFrameBackend::select(&self.0, exprs)?)
    }

    /// Add or replace a column using ExprIr.
    pub fn with_column_expr_ir(&self, name: &str, expr: &ExprIr) -> Result<DataFrame, EngineError> {
        downcast_df(DataFrameBackend::with_column(&self.0, name, expr)?)
    }

    /// Collect as engine-agnostic rows (column name -> JSON value per row).
    pub fn collect_rows(&self) -> Result<CollectedRows, EngineError> {
        DataFrameBackend::collect(&self.0)
    }

    /// Delegates to the backend `resolve_expr_column_names`.
    pub fn resolve_expr_column_names(&self, expr: Expr) -> Result<Expr, PolarsError> {
        self.0.resolve_expr_column_names(expr)
    }

    /// Delegates to the backend `coerce_string_numeric_comparisons`.
    pub fn coerce_string_numeric_comparisons(&self, expr: Expr) -> Result<Expr, PolarsError> {
        self.0.coerce_string_numeric_comparisons(expr)
    }

    /// Resolve `name` to the backend's canonical column name.
    pub fn resolve_column_name(&self, name: &str) -> Result<String, PolarsError> {
        self.0.resolve_column_name(name)
    }

    /// Schema as an engine-agnostic `StructType` (Polars error type).
    pub fn schema(&self) -> Result<StructType, PolarsError> {
        self.0.schema()
    }

    /// `schema` variant returning `EngineError`.
    pub fn schema_engine(&self) -> Result<StructType, EngineError> {
        self.0.schema_engine()
    }

    /// Polars dtype of column `name`, if the backend reports one.
    pub fn get_column_dtype(&self, name: &str) -> Option<robin_sparkless_polars::PlDataType> {
        self.0.get_column_dtype(name)
    }

    /// Engine-agnostic `DataType` of column `name`, if the backend reports one.
    pub fn get_column_data_type(&self, name: &str) -> Option<DataType> {
        self.0.get_column_data_type(name)
    }

    /// Column names (Polars error type).
    pub fn columns(&self) -> Result<Vec<String>, PolarsError> {
        self.0.columns()
    }

    /// `columns` variant returning `EngineError`.
    pub fn columns_engine(&self) -> Result<Vec<String>, EngineError> {
        self.0.columns_engine()
    }

    /// Row count (Polars error type).
    pub fn count(&self) -> Result<usize, PolarsError> {
        self.0.count()
    }

    /// `count` variant returning `EngineError`.
    pub fn count_engine(&self) -> Result<usize, EngineError> {
        self.0.count_engine()
    }

    /// Display rows; `n` is forwarded to the backend `show`.
    pub fn show(&self, n: Option<usize>) -> Result<(), PolarsError> {
        self.0.show(n)
    }

    /// Materialize into the underlying Polars DataFrame.
    pub fn collect(&self) -> Result<Arc<PlDataFrame>, PolarsError> {
        self.0.collect()
    }

    /// Collect rows as JSON maps (`EngineError` variant).
    pub fn collect_as_json_rows_engine(
        &self,
    ) -> Result<Vec<HashMap<String, JsonValue>>, EngineError> {
        self.0.collect_as_json_rows_engine()
    }

    /// Collect rows as JSON maps (Polars error type).
    pub fn collect_as_json_rows(&self) -> Result<Vec<HashMap<String, JsonValue>>, PolarsError> {
        self.0.collect_as_json_rows()
    }

    /// Serialize rows to a JSON string via the backend.
    pub fn to_json_rows(&self) -> Result<String, EngineError> {
        self.0.to_json_rows()
    }

    /// Select using backend `Expr`s; re-wraps the result.
    pub fn select_exprs(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
        self.0.select_exprs(exprs).map(DataFrame)
    }

    /// Select the named columns.
    pub fn select(&self, cols: Vec<&str>) -> Result<DataFrame, PolarsError> {
        self.0.select(cols).map(DataFrame)
    }

    /// `select` variant returning `EngineError`.
    pub fn select_engine(&self, cols: Vec<&str>) -> Result<DataFrame, EngineError> {
        self.0.select_engine(cols).map(DataFrame)
    }

    /// Select mixed items (see `SelectItem`).
    pub fn select_items(&self, items: Vec<SelectItem<'_>>) -> Result<DataFrame, PolarsError> {
        self.0.select_items(items).map(DataFrame)
    }

    /// Filter rows by a backend `Expr` condition.
    pub fn filter(&self, condition: Expr) -> Result<DataFrame, PolarsError> {
        self.0.filter(condition).map(DataFrame)
    }

    /// `filter` variant returning `EngineError`.
    pub fn filter_engine(&self, condition: Expr) -> Result<DataFrame, EngineError> {
        self.0.filter_engine(condition).map(DataFrame)
    }

    /// Fetch a `Column` handle by name.
    pub fn column(&self, name: &str) -> Result<Column, PolarsError> {
        self.0.column(name)
    }

    /// Add or replace a column from a `Column` handle.
    pub fn with_column(&self, column_name: &str, col: &Column) -> Result<DataFrame, PolarsError> {
        self.0.with_column(column_name, col).map(DataFrame)
    }

    /// `with_column` variant returning `EngineError`.
    pub fn with_column_engine(
        &self,
        column_name: &str,
        col: &Column,
    ) -> Result<DataFrame, EngineError> {
        self.0.with_column_engine(column_name, col).map(DataFrame)
    }

    /// Add or replace a column from a backend `Expr`.
    pub fn with_column_expr(
        &self,
        column_name: &str,
        expr: Expr,
    ) -> Result<DataFrame, PolarsError> {
        self.0.with_column_expr(column_name, expr).map(DataFrame)
    }

    /// Group by the named columns.
    pub fn group_by(&self, column_names: Vec<&str>) -> Result<GroupedData, PolarsError> {
        self.0.group_by(column_names).map(GroupedData)
    }

    /// `group_by` variant returning `EngineError`.
    pub fn group_by_engine(&self, column_names: Vec<&str>) -> Result<GroupedData, EngineError> {
        self.0.group_by_engine(column_names).map(GroupedData)
    }

    /// Group by expressions, with explicit names for the grouping columns.
    pub fn group_by_exprs(
        &self,
        exprs: Vec<Expr>,
        grouping_col_names: Vec<String>,
    ) -> Result<GroupedData, PolarsError> {
        self.0
            .group_by_exprs(exprs, grouping_col_names)
            .map(GroupedData)
    }

    /// CUBE grouping over the named columns.
    pub fn cube(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
        self.0.cube(column_names).map(CubeRollupData)
    }

    /// ROLLUP grouping over the named columns.
    pub fn rollup(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
        self.0.rollup(column_names).map(CubeRollupData)
    }

    /// Whole-frame aggregation with the given expressions.
    pub fn agg(&self, aggregations: Vec<Expr>) -> Result<DataFrame, PolarsError> {
        self.0.agg(aggregations).map(DataFrame)
    }

    /// Join with `other` on the key columns `on` using `how`.
    pub fn join(
        &self,
        other: &DataFrame,
        on: Vec<&str>,
        how: JoinType,
    ) -> Result<DataFrame, PolarsError> {
        self.0.join(&other.0, on, how).map(DataFrame)
    }

    /// Sort by columns; `ascending` is parallel to `column_names`.
    pub fn order_by(
        &self,
        column_names: Vec<&str>,
        ascending: Vec<bool>,
    ) -> Result<DataFrame, PolarsError> {
        self.0.order_by(column_names, ascending).map(DataFrame)
    }

    /// Sort by explicit `SortOrder` expressions.
    pub fn order_by_exprs(&self, sort_orders: Vec<SortOrder>) -> Result<DataFrame, PolarsError> {
        self.0.order_by_exprs(sort_orders).map(DataFrame)
    }

    /// Delegates to the backend `union`.
    pub fn union(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        self.0.union(&other.0).map(DataFrame)
    }

    /// Delegates to the backend `union_all`.
    pub fn union_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        self.0.union_all(&other.0).map(DataFrame)
    }

    /// Union matching columns by name; `allow_missing_columns` per backend.
    pub fn union_by_name(
        &self,
        other: &DataFrame,
        allow_missing_columns: bool,
    ) -> Result<DataFrame, PolarsError> {
        self.0
            .union_by_name(&other.0, allow_missing_columns)
            .map(DataFrame)
    }

    /// Distinct rows, optionally over a column subset.
    pub fn distinct(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
        self.0.distinct(subset).map(DataFrame)
    }

    /// Drop the named columns.
    pub fn drop(&self, columns: Vec<&str>) -> Result<DataFrame, PolarsError> {
        self.0.drop(columns).map(DataFrame)
    }

    /// Drop null rows; `how`/`thresh` semantics are defined by the backend.
    pub fn dropna(
        &self,
        subset: Option<Vec<&str>>,
        how: &str,
        thresh: Option<usize>,
    ) -> Result<DataFrame, PolarsError> {
        self.0.dropna(subset, how, thresh).map(DataFrame)
    }

    /// Fill nulls with `value`, optionally restricted to `subset` columns.
    pub fn fillna(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
        self.0.fillna(value, subset).map(DataFrame)
    }

    /// Delegates to the backend `limit`.
    pub fn limit(&self, n: usize) -> Result<DataFrame, PolarsError> {
        self.0.limit(n).map(DataFrame)
    }

    /// `limit` variant returning `EngineError`.
    pub fn limit_engine(&self, n: usize) -> Result<DataFrame, EngineError> {
        self.0.limit_engine(n).map(DataFrame)
    }

    /// Rename one column.
    pub fn with_column_renamed(
        &self,
        old_name: &str,
        new_name: &str,
    ) -> Result<DataFrame, PolarsError> {
        self.0
            .with_column_renamed(old_name, new_name)
            .map(DataFrame)
    }

    /// Replace `old_value` with `new_value` in one column.
    pub fn replace(
        &self,
        column_name: &str,
        old_value: Expr,
        new_value: Expr,
    ) -> Result<DataFrame, PolarsError> {
        self.0
            .replace(column_name, old_value, new_value)
            .map(DataFrame)
    }

    /// Delegates to the backend `cross_join`.
    pub fn cross_join(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        self.0.cross_join(&other.0).map(DataFrame)
    }

    /// Delegates to the backend `describe`.
    pub fn describe(&self) -> Result<DataFrame, PolarsError> {
        self.0.describe().map(DataFrame)
    }

    /// Delegates to the backend `cache`.
    pub fn cache(&self) -> Result<DataFrame, PolarsError> {
        self.0.cache().map(DataFrame)
    }

    /// Delegates to the backend `persist`.
    pub fn persist(&self) -> Result<DataFrame, PolarsError> {
        self.0.persist().map(DataFrame)
    }

    /// Delegates to the backend `unpersist`.
    pub fn unpersist(&self) -> Result<DataFrame, PolarsError> {
        self.0.unpersist().map(DataFrame)
    }

    /// Delegates to the backend `subtract`.
    pub fn subtract(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        self.0.subtract(&other.0).map(DataFrame)
    }

    /// Delegates to the backend `intersect`.
    pub fn intersect(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        self.0.intersect(&other.0).map(DataFrame)
    }

    /// Random row sample; all parameters forwarded to the backend.
    pub fn sample(
        &self,
        with_replacement: bool,
        fraction: f64,
        seed: Option<u64>,
    ) -> Result<DataFrame, PolarsError> {
        self.0
            .sample(with_replacement, fraction, seed)
            .map(DataFrame)
    }

    /// Split into frames weighted by `weights`; each split re-wrapped.
    pub fn random_split(
        &self,
        weights: &[f64],
        seed: Option<u64>,
    ) -> Result<Vec<DataFrame>, PolarsError> {
        self.0
            .random_split(weights, seed)
            .map(|v| v.into_iter().map(DataFrame).collect())
    }

    /// Sample keyed on `col_name` with per-key `fractions` (backend semantics).
    pub fn sample_by(
        &self,
        col_name: &str,
        fractions: &[(Expr, f64)],
        seed: Option<u64>,
    ) -> Result<DataFrame, PolarsError> {
        self.0.sample_by(col_name, fractions, seed).map(DataFrame)
    }

    /// Delegates to the backend `first`.
    pub fn first(&self) -> Result<DataFrame, PolarsError> {
        self.0.first().map(DataFrame)
    }

    /// Delegates to the backend `head`.
    pub fn head(&self, n: usize) -> Result<DataFrame, PolarsError> {
        self.0.head(n).map(DataFrame)
    }

    /// Delegates to the backend `take`.
    pub fn take(&self, n: usize) -> Result<DataFrame, PolarsError> {
        self.0.take(n).map(DataFrame)
    }

    /// Delegates to the backend `tail`.
    pub fn tail(&self, n: usize) -> Result<DataFrame, PolarsError> {
        self.0.tail(n).map(DataFrame)
    }

    /// Delegates to the backend `is_empty`.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Delegates to the backend `to_df` with the given column names.
    pub fn to_df(&self, names: Vec<&str>) -> Result<DataFrame, PolarsError> {
        self.0.to_df(names).map(DataFrame)
    }

    /// Statistics namespace borrowing this frame.
    pub fn stat(&self) -> DataFrameStat<'_> {
        DataFrameStat(PolarsDataFrameStat::new(&self.0))
    }

    /// Delegates to the backend `corr` (whole-frame result).
    pub fn corr(&self) -> Result<DataFrame, PolarsError> {
        self.0.corr().map(DataFrame)
    }

    /// Correlation between two columns.
    pub fn corr_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
        self.0.corr_cols(col1, col2)
    }

    /// Covariance between two columns.
    pub fn cov_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
        self.0.cov_cols(col1, col2)
    }

    /// Delegates to the backend `summary`.
    pub fn summary(&self) -> Result<DataFrame, PolarsError> {
        self.0.summary().map(DataFrame)
    }

    /// Delegates to the backend `to_json`.
    pub fn to_json(&self) -> Result<Vec<String>, PolarsError> {
        self.0.to_json()
    }

    /// Plan/explanation string from the backend.
    pub fn explain(&self) -> String {
        self.0.explain()
    }

    /// Schema rendered as a printable string.
    pub fn print_schema(&self) -> Result<String, PolarsError> {
        self.0.print_schema()
    }

    /// Delegates to the backend `checkpoint`.
    pub fn checkpoint(&self) -> Result<DataFrame, PolarsError> {
        self.0.checkpoint().map(DataFrame)
    }

    /// Delegates to the backend `local_checkpoint`.
    pub fn local_checkpoint(&self) -> Result<DataFrame, PolarsError> {
        self.0.local_checkpoint().map(DataFrame)
    }

    /// Delegates to the backend `repartition`.
    pub fn repartition(&self, num_partitions: usize) -> Result<DataFrame, PolarsError> {
        self.0.repartition(num_partitions).map(DataFrame)
    }

    /// Delegates to the backend `repartition_by_range`.
    pub fn repartition_by_range(
        &self,
        num_partitions: usize,
        columns: Vec<&str>,
    ) -> Result<DataFrame, PolarsError> {
        self.0
            .repartition_by_range(num_partitions, columns)
            .map(DataFrame)
    }

    /// `(column name, dtype string)` pairs for every column.
    pub fn dtypes(&self) -> Result<Vec<(String, String)>, PolarsError> {
        self.0.dtypes()
    }

    /// Delegates to the backend `sort_within_partitions`.
    pub fn sort_within_partitions(&self, cols: &[SortOrder]) -> Result<DataFrame, PolarsError> {
        self.0.sort_within_partitions(cols).map(DataFrame)
    }

    /// Delegates to the backend `coalesce`.
    pub fn coalesce(&self, num_partitions: usize) -> Result<DataFrame, PolarsError> {
        self.0.coalesce(num_partitions).map(DataFrame)
    }

    /// Delegates to the backend `hint`.
    pub fn hint(&self, name: &str, params: &[i32]) -> Result<DataFrame, PolarsError> {
        self.0.hint(name, params).map(DataFrame)
    }

    /// Delegates to the backend `is_local`.
    pub fn is_local(&self) -> bool {
        self.0.is_local()
    }

    /// Delegates to the backend `input_files`.
    pub fn input_files(&self) -> Vec<String> {
        self.0.input_files()
    }

    /// Delegates to the backend `same_semantics`.
    pub fn same_semantics(&self, other: &DataFrame) -> bool {
        self.0.same_semantics(&other.0)
    }

    /// Delegates to the backend `semantic_hash`.
    pub fn semantic_hash(&self) -> u64 {
        self.0.semantic_hash()
    }

    /// Delegates to the backend `observe`.
    pub fn observe(&self, name: &str, expr: Expr) -> Result<DataFrame, PolarsError> {
        self.0.observe(name, expr).map(DataFrame)
    }

    /// Delegates to the backend `with_watermark`.
    pub fn with_watermark(
        &self,
        event_time: &str,
        delay_threshold: &str,
    ) -> Result<DataFrame, PolarsError> {
        self.0
            .with_watermark(event_time, delay_threshold)
            .map(DataFrame)
    }

    /// Select using expression strings; parsing is done by the backend.
    pub fn select_expr(&self, exprs: &[String]) -> Result<DataFrame, PolarsError> {
        self.0.select_expr(exprs).map(DataFrame)
    }

    /// Select columns matching `pattern` (backend regex rules).
    pub fn col_regex(&self, pattern: &str) -> Result<DataFrame, PolarsError> {
        self.0.col_regex(pattern).map(DataFrame)
    }

    /// Add or replace several `(name, Column)` pairs at once.
    pub fn with_columns(&self, exprs: &[(String, Column)]) -> Result<DataFrame, PolarsError> {
        self.0.with_columns(exprs).map(DataFrame)
    }

    /// Rename several columns; each pair is `(old, new)`.
    pub fn with_columns_renamed(
        &self,
        renames: &[(String, String)],
    ) -> Result<DataFrame, PolarsError> {
        self.0.with_columns_renamed(renames).map(DataFrame)
    }

    /// Null-handling namespace borrowing this frame.
    pub fn na(&self) -> DataFrameNa<'_> {
        DataFrameNa(PolarsDataFrameNa::new(&self.0))
    }

    /// Delegates to the backend `offset`.
    pub fn offset(&self, n: usize) -> Result<DataFrame, PolarsError> {
        self.0.offset(n).map(DataFrame)
    }

    /// Apply a caller-supplied transformation; the closure is adapted so `f`
    /// works in root-owned types while the backend sees its own frame type.
    pub fn transform<F>(&self, f: F) -> Result<DataFrame, PolarsError>
    where
        F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
    {
        self.0
            .transform(|polars_df| f(DataFrame(polars_df)).map(|r| r.0))
            .map(DataFrame)
    }

    /// Frequent items for `columns` at the given `support` (backend semantics).
    pub fn freq_items(&self, columns: &[&str], support: f64) -> Result<DataFrame, PolarsError> {
        self.0.freq_items(columns, support).map(DataFrame)
    }

    /// Approximate quantiles of `column` at `probabilities`.
    pub fn approx_quantile(
        &self,
        column: &str,
        probabilities: &[f64],
    ) -> Result<DataFrame, PolarsError> {
        self.0.approx_quantile(column, probabilities).map(DataFrame)
    }

    /// Cross-tabulation of two columns.
    pub fn crosstab(&self, col1: &str, col2: &str) -> Result<DataFrame, PolarsError> {
        self.0.crosstab(col1, col2).map(DataFrame)
    }

    /// Write this DataFrame to path (returns root-owned writer).
    pub fn write(&self) -> DataFrameWriter<'_> {
        DataFrameWriter {
            inner: self.0.write(),
        }
    }

    /// Write to Delta table (requires delta feature). Delegates to backend.
    #[cfg(feature = "delta")]
    pub fn write_delta(&self, path: impl AsRef<Path>, overwrite: bool) -> Result<(), PolarsError> {
        self.0.write_delta(path, overwrite)
    }

    /// Stub when the `delta` feature is disabled: always returns an error.
    #[cfg(not(feature = "delta"))]
    pub fn write_delta(
        &self,
        _path: impl AsRef<Path>,
        _overwrite: bool,
    ) -> Result<(), PolarsError> {
        Err(PolarsError::InvalidOperation(
            "Delta Lake requires the 'delta' feature. Build with --features delta.".into(),
        ))
    }

    /// Register as delta table by name.
    pub fn save_as_delta_table(&self, session: &crate::session::SparkSession, name: &str) {
        self.0.save_as_delta_table(&session.0, name)
    }
}
611
612impl<'a> DataFrameStat<'a> {
613    pub fn cov(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
614        self.0.cov(col1, col2)
615    }
616
617    pub fn corr(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
618        self.0.corr(col1, col2)
619    }
620
621    pub fn corr_matrix(&self) -> Result<DataFrame, PolarsError> {
622        self.0.corr_matrix().map(DataFrame)
623    }
624}
625
626// DataFrameNa - delegate to inner (methods return DataFrame, wrap)
627impl<'a> DataFrameNa<'a> {
628    pub fn drop(
629        &self,
630        subset: Option<Vec<&str>>,
631        how: &str,
632        thresh: Option<usize>,
633    ) -> Result<DataFrame, PolarsError> {
634        self.0.drop(subset, how, thresh).map(DataFrame)
635    }
636
637    pub fn fill(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
638        self.0.fill(value, subset).map(DataFrame)
639    }
640
641    pub fn replace(
642        &self,
643        old_value: Expr,
644        new_value: Expr,
645        subset: Option<Vec<&str>>,
646    ) -> Result<DataFrame, PolarsError> {
647        self.0.replace(old_value, new_value, subset).map(DataFrame)
648    }
649}
650
651impl<'a> DataFrameWriter<'a> {
652    pub fn mode(mut self, mode: WriteMode) -> Self {
653        self.inner = self.inner.mode(mode);
654        self
655    }
656
657    pub fn format(mut self, format: WriteFormat) -> Self {
658        self.inner = self.inner.format(format);
659        self
660    }
661
662    pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
663        self.inner = self.inner.option(key, value);
664        self
665    }
666
667    pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) -> Self {
668        self.inner = self.inner.options(opts);
669        self
670    }
671
672    pub fn partition_by(mut self, cols: impl IntoIterator<Item = impl Into<String>>) -> Self {
673        self.inner = self.inner.partition_by(cols);
674        self
675    }
676
677    pub fn save_as_table(
678        &self,
679        session: &crate::session::SparkSession,
680        name: &str,
681        mode: SaveMode,
682    ) -> Result<(), PolarsError> {
683        self.inner.save_as_table(&session.0, name, mode)
684    }
685
686    pub fn save(&self, path: impl AsRef<Path>) -> Result<(), PolarsError> {
687        self.inner.save(path)
688    }
689}
690
691impl GroupedData {
692    pub fn count(&self) -> Result<DataFrame, PolarsError> {
693        self.0.count().map(DataFrame)
694    }
695
696    pub fn sum(&self, column: &str) -> Result<DataFrame, PolarsError> {
697        self.0.sum(column).map(DataFrame)
698    }
699
700    pub fn min(&self, column: &str) -> Result<DataFrame, PolarsError> {
701        self.0.min(column).map(DataFrame)
702    }
703
704    pub fn max(&self, column: &str) -> Result<DataFrame, PolarsError> {
705        self.0.max(column).map(DataFrame)
706    }
707
708    pub fn mean(&self, column: &str) -> Result<DataFrame, PolarsError> {
709        self.0.avg(&[column]).map(DataFrame)
710    }
711
712    pub fn avg(&self, columns: &[&str]) -> Result<DataFrame, PolarsError> {
713        self.0.avg(columns).map(DataFrame)
714    }
715
716    pub fn agg(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
717        self.0.agg(exprs).map(DataFrame)
718    }
719
720    pub fn agg_columns(&self, aggregations: Vec<Column>) -> Result<DataFrame, PolarsError> {
721        self.0.agg_columns(aggregations).map(DataFrame)
722    }
723
724    /// Aggregate using ExprIr expressions (e.g. sum(col("x")), count(col("y")) from core).
725    pub fn agg_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError> {
726        downcast_df(GroupedDataBackend::agg(&self.0, exprs)?)
727    }
728
729    pub fn pivot(&self, pivot_col: &str, values: Option<Vec<String>>) -> PivotedGroupedData {
730        PivotedGroupedData(self.0.pivot(pivot_col, values))
731    }
732}
733
734impl CubeRollupData {
735    pub fn count(&self) -> Result<DataFrame, PolarsError> {
736        self.0.count().map(DataFrame)
737    }
738
739    pub fn agg(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
740        self.0.agg(exprs).map(DataFrame)
741    }
742}
743
744impl PivotedGroupedData {
745    pub fn count(&self) -> Result<DataFrame, PolarsError> {
746        self.0.count().map(DataFrame)
747    }
748
749    pub fn sum(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
750        self.0.sum(value_col).map(DataFrame)
751    }
752
753    pub fn avg(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
754        self.0.avg(value_col).map(DataFrame)
755    }
756
757    pub fn min(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
758        self.0.min(value_col).map(DataFrame)
759    }
760
761    pub fn max(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
762        self.0.max(value_col).map(DataFrame)
763    }
764}