Skip to main content

robin_sparkless/
dataframe.rs

//! Root-owned DataFrame API; delegates to robin-sparkless-polars for execution.
2
3use robin_sparkless_core::EngineError;
4use robin_sparkless_core::engine::{CollectedRows, DataFrameBackend, GroupedDataBackend};
5use robin_sparkless_core::expr::ExprIr;
6use robin_sparkless_core::{DataType, StructType};
7use robin_sparkless_polars::dataframe::{
8    DataFrameNa as PolarsDataFrameNa, DataFrameStat as PolarsDataFrameStat,
9    DataFrameWriter as PolarsDataFrameWriter,
10};
11use robin_sparkless_polars::functions::SortOrder;
12use robin_sparkless_polars::{
13    Column, CubeRollupData as PolarsCubeRollupData, DataFrame as PolarsDataFrame, Expr,
14    GroupedData as PolarsGroupedData, LazyFrame, PivotedGroupedData as PolarsPivotedGroupedData,
15    PlDataFrame, PolarsError,
16};
17use std::collections::HashMap;
18use std::path::Path;
19use std::sync::Arc;
20
21use serde_json::Value as JsonValue;
22
23/// Downcast trait result to root DataFrame (Polars backend).
24fn downcast_df(box_df: Box<dyn DataFrameBackend>) -> Result<DataFrame, EngineError> {
25    let concrete = box_df
26        .as_any()
27        .downcast_ref::<PolarsDataFrame>()
28        .ok_or_else(|| EngineError::Internal("expected Polars backend".into()))?;
29    Ok(DataFrame(concrete.clone()))
30}
31
32/// Convert a boxed engine [`DataFrameBackend`] into a root-owned [`DataFrame`].
33/// This is a small adapter used by plan execution and other engine-generic entry points.
34pub(crate) fn from_backend(box_df: Box<dyn DataFrameBackend>) -> Result<DataFrame, EngineError> {
35    downcast_df(box_df)
36}
37
38/// Root-owned DataFrame; delegates to the Polars backend.
39#[derive(Clone)]
40pub struct DataFrame(pub(crate) PolarsDataFrame);
41
42/// Root-owned GroupedData; delegates to the Polars backend.
43pub struct GroupedData(pub(crate) PolarsGroupedData);
44
45/// Root-owned CubeRollupData; delegates to the Polars backend.
46pub struct CubeRollupData(pub(crate) PolarsCubeRollupData);
47
48/// Root-owned PivotedGroupedData; delegates to the Polars backend.
49pub struct PivotedGroupedData(pub(crate) PolarsPivotedGroupedData);
50
51/// Engine-generic DataFrame operations expressed in terms of `ExprIr` and `EngineError`.
52/// Implemented for the root [`DataFrame`] using the `DataFrameBackend` trait so callers
53/// can depend on this trait instead of the concrete Polars-backed type.
54pub trait EngineDataFrame {
55    fn filter_expr_ir(&self, condition: &ExprIr) -> Result<DataFrame, EngineError>;
56    fn select_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError>;
57    fn with_column_expr_ir(&self, name: &str, expr: &ExprIr) -> Result<DataFrame, EngineError>;
58    fn collect_rows(&self) -> Result<CollectedRows, EngineError>;
59}
60
61impl EngineDataFrame for DataFrame {
62    fn filter_expr_ir(&self, condition: &ExprIr) -> Result<DataFrame, EngineError> {
63        downcast_df(DataFrameBackend::filter(&self.0, condition)?)
64    }
65
66    fn select_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError> {
67        downcast_df(DataFrameBackend::select(&self.0, exprs)?)
68    }
69
70    fn with_column_expr_ir(&self, name: &str, expr: &ExprIr) -> Result<DataFrame, EngineError> {
71        downcast_df(DataFrameBackend::with_column(&self.0, name, expr)?)
72    }
73
74    fn collect_rows(&self) -> Result<CollectedRows, EngineError> {
75        DataFrameBackend::collect(&self.0)
76    }
77}
78
79/// Re-export for API compatibility.
80pub use robin_sparkless_polars::dataframe::{
81    GroupBySpec, JoinType, SaveMode, SelectItem, WriteFormat, WriteMode,
82    expr_contains_only_join_key_equalities, try_extract_join_eq_columns,
83    try_extract_join_eq_columns_all,
84};
85
86/// Root-owned DataFrameStat; delegates to the Polars backend.
87pub struct DataFrameStat<'a>(PolarsDataFrameStat<'a>);
88
89/// Root-owned DataFrameNa; delegates to the Polars backend.
90pub struct DataFrameNa<'a>(PolarsDataFrameNa<'a>);
91
92/// Root-owned DataFrameWriter; delegates to the Polars backend.
93pub struct DataFrameWriter<'a> {
94    inner: PolarsDataFrameWriter<'a>,
95}
96
97impl DataFrame {
98    /// Create from a Polars DataFrame (via backend).
99    pub fn from_polars(df: PlDataFrame) -> Self {
100        DataFrame(PolarsDataFrame::from_polars(df))
101    }
102
103    /// Create from a Polars DataFrame with options.
104    pub fn from_polars_with_options(df: PlDataFrame, case_sensitive: bool) -> Self {
105        DataFrame(PolarsDataFrame::from_polars_with_options(
106            df,
107            case_sensitive,
108        ))
109    }
110
111    /// Create from a LazyFrame (via backend).
112    pub fn from_lazy(lf: LazyFrame) -> Self {
113        DataFrame(PolarsDataFrame::from_lazy(lf))
114    }
115
116    /// Create from a LazyFrame with options.
117    pub fn from_lazy_with_options(lf: LazyFrame, case_sensitive: bool) -> Self {
118        DataFrame(PolarsDataFrame::from_lazy_with_options(lf, case_sensitive))
119    }
120
121    /// Create an empty DataFrame.
122    pub fn empty() -> Self {
123        DataFrame(PolarsDataFrame::empty())
124    }
125
126    /// Return a DataFrame with the given alias.
127    pub fn alias(&self, name: &str) -> Self {
128        DataFrame(self.0.alias(name))
129    }
130
131    /// Return the table alias if set (e.g. from df.alias("t")). Used for join condition resolution (#374).
132    pub fn get_alias(&self) -> Option<String> {
133        self.0.get_alias()
134    }
135
136    /// Filter rows using an engine-agnostic expression (ExprIr).
137    pub fn filter_expr_ir(&self, condition: &ExprIr) -> Result<DataFrame, EngineError> {
138        <Self as EngineDataFrame>::filter_expr_ir(self, condition)
139    }
140
141    /// Select columns/expressions using ExprIr.
142    pub fn select_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError> {
143        <Self as EngineDataFrame>::select_expr_ir(self, exprs)
144    }
145
146    /// Add or replace a column using ExprIr.
147    pub fn with_column_expr_ir(&self, name: &str, expr: &ExprIr) -> Result<DataFrame, EngineError> {
148        <Self as EngineDataFrame>::with_column_expr_ir(self, name, expr)
149    }
150
151    /// Collect as engine-agnostic rows (column name -> JSON value per row).
152    pub fn collect_rows(&self) -> Result<CollectedRows, EngineError> {
153        <Self as EngineDataFrame>::collect_rows(self)
154    }
155
156    pub fn resolve_expr_column_names(&self, expr: Expr) -> Result<Expr, PolarsError> {
157        self.0.resolve_expr_column_names(expr)
158    }
159
160    pub fn coerce_string_numeric_comparisons(&self, expr: Expr) -> Result<Expr, PolarsError> {
161        self.0.coerce_string_numeric_comparisons(expr)
162    }
163
164    pub fn resolve_column_name(&self, name: &str) -> Result<String, PolarsError> {
165        self.0.resolve_column_name(name)
166    }
167
168    pub fn schema(&self) -> Result<StructType, PolarsError> {
169        self.0.schema()
170    }
171
172    pub fn schema_engine(&self) -> Result<StructType, EngineError> {
173        self.0.schema_engine()
174    }
175
176    pub fn get_column_dtype(&self, name: &str) -> Option<robin_sparkless_polars::PlDataType> {
177        self.0.get_column_dtype(name)
178    }
179
180    pub fn get_column_data_type(&self, name: &str) -> Option<DataType> {
181        self.0.get_column_data_type(name)
182    }
183
184    pub fn columns(&self) -> Result<Vec<String>, PolarsError> {
185        self.0.columns()
186    }
187
188    pub fn columns_engine(&self) -> Result<Vec<String>, EngineError> {
189        self.0.columns_engine()
190    }
191
192    pub fn count(&self) -> Result<usize, PolarsError> {
193        self.0.count()
194    }
195
196    pub fn count_engine(&self) -> Result<usize, EngineError> {
197        self.0.count_engine()
198    }
199
200    pub fn show(&self, n: Option<usize>) -> Result<(), PolarsError> {
201        self.0.show(n)
202    }
203
204    pub fn collect(&self) -> Result<Arc<PlDataFrame>, PolarsError> {
205        self.0.collect()
206    }
207
208    pub fn collect_as_json_rows_engine(
209        &self,
210    ) -> Result<Vec<HashMap<String, JsonValue>>, EngineError> {
211        self.0.collect_as_json_rows_engine()
212    }
213
214    pub fn collect_as_json_rows(&self) -> Result<Vec<HashMap<String, JsonValue>>, PolarsError> {
215        self.0.collect_as_json_rows()
216    }
217
218    /// Returns (output_column_names, rows, collected_schema). Use output names for Row keys; use collected schema for dtype so get_json_object etc. are string (#1146).
219    #[allow(clippy::type_complexity)]
220    pub fn collect_as_json_rows_with_names(
221        &self,
222    ) -> Result<(Vec<String>, Vec<HashMap<String, JsonValue>>, StructType), PolarsError> {
223        self.0.collect_as_json_rows_with_names()
224    }
225
226    pub fn to_json_rows(&self) -> Result<String, EngineError> {
227        self.0.to_json_rows()
228    }
229
230    pub fn select_exprs(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
231        self.0.select_exprs(exprs).map(DataFrame)
232    }
233
234    pub fn select(&self, cols: Vec<&str>) -> Result<DataFrame, PolarsError> {
235        self.0.select(cols).map(DataFrame)
236    }
237
238    pub fn select_engine(&self, cols: Vec<&str>) -> Result<DataFrame, EngineError> {
239        self.0.select_engine(cols).map(DataFrame)
240    }
241
242    pub fn select_items(&self, items: Vec<SelectItem<'_>>) -> Result<DataFrame, PolarsError> {
243        self.0.select_items(items).map(DataFrame)
244    }
245
246    pub fn filter(&self, condition: Expr) -> Result<DataFrame, PolarsError> {
247        self.0.filter(condition).map(DataFrame)
248    }
249
250    pub fn filter_engine(&self, condition: Expr) -> Result<DataFrame, EngineError> {
251        self.0.filter_engine(condition).map(DataFrame)
252    }
253
254    pub fn column(&self, name: &str) -> Result<Column, PolarsError> {
255        self.0.column(name)
256    }
257
258    pub fn with_column(&self, column_name: &str, col: &Column) -> Result<DataFrame, PolarsError> {
259        self.0.with_column(column_name, col).map(DataFrame)
260    }
261
262    pub fn with_column_engine(
263        &self,
264        column_name: &str,
265        col: &Column,
266    ) -> Result<DataFrame, EngineError> {
267        self.0.with_column_engine(column_name, col).map(DataFrame)
268    }
269
270    pub fn with_column_expr(
271        &self,
272        column_name: &str,
273        expr: Expr,
274    ) -> Result<DataFrame, PolarsError> {
275        self.0.with_column_expr(column_name, expr).map(DataFrame)
276    }
277
278    pub fn group_by(&self, column_names: Vec<&str>) -> Result<GroupedData, PolarsError> {
279        self.0.group_by(column_names).map(GroupedData)
280    }
281
282    pub fn group_by_engine(&self, column_names: Vec<&str>) -> Result<GroupedData, EngineError> {
283        self.0.group_by_engine(column_names).map(GroupedData)
284    }
285
286    pub fn group_by_exprs(
287        &self,
288        exprs: Vec<Expr>,
289        grouping_col_names: Vec<String>,
290    ) -> Result<GroupedData, PolarsError> {
291        self.0
292            .group_by_exprs(exprs, grouping_col_names)
293            .map(GroupedData)
294    }
295
296    pub fn group_by_specs(&self, specs: Vec<GroupBySpec>) -> Result<GroupedData, PolarsError> {
297        self.0.group_by_specs(specs).map(GroupedData)
298    }
299
300    pub fn cube(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
301        self.0.cube(column_names).map(CubeRollupData)
302    }
303
304    pub fn rollup(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
305        self.0.rollup(column_names).map(CubeRollupData)
306    }
307
308    pub fn agg(&self, aggregations: Vec<Expr>) -> Result<DataFrame, PolarsError> {
309        self.0.agg(aggregations).map(DataFrame)
310    }
311
312    pub fn join(
313        &self,
314        other: &DataFrame,
315        on: Vec<&str>,
316        how: JoinType,
317    ) -> Result<DataFrame, PolarsError> {
318        self.0.join(&other.0, on, how).map(DataFrame)
319    }
320
321    /// Join with different column names on left and right (PySpark left_on/right_on).
322    pub fn join_with_keys(
323        &self,
324        other: &DataFrame,
325        left_on: Vec<&str>,
326        right_on: Vec<&str>,
327        how: JoinType,
328        only_key_equalities: bool,
329    ) -> Result<DataFrame, PolarsError> {
330        self.0
331            .join_with_keys(&other.0, left_on, right_on, how, only_key_equalities)
332            .map(DataFrame)
333    }
334
335    pub fn order_by(
336        &self,
337        column_names: Vec<&str>,
338        ascending: Vec<bool>,
339    ) -> Result<DataFrame, PolarsError> {
340        self.0.order_by(column_names, ascending).map(DataFrame)
341    }
342
343    pub fn order_by_exprs(&self, sort_orders: Vec<SortOrder>) -> Result<DataFrame, PolarsError> {
344        self.0.order_by_exprs(sort_orders).map(DataFrame)
345    }
346
347    pub fn union(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
348        self.0.union(&other.0).map(DataFrame)
349    }
350
351    pub fn union_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
352        self.0.union_all(&other.0).map(DataFrame)
353    }
354
355    pub fn union_by_name(
356        &self,
357        other: &DataFrame,
358        allow_missing_columns: bool,
359    ) -> Result<DataFrame, PolarsError> {
360        self.0
361            .union_by_name(&other.0, allow_missing_columns)
362            .map(DataFrame)
363    }
364
365    pub fn distinct(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
366        self.0.distinct(subset).map(DataFrame)
367    }
368
369    pub fn drop(&self, columns: Vec<&str>) -> Result<DataFrame, PolarsError> {
370        self.0.drop(columns).map(DataFrame)
371    }
372
373    pub fn dropna(
374        &self,
375        subset: Option<Vec<&str>>,
376        how: &str,
377        thresh: Option<usize>,
378    ) -> Result<DataFrame, PolarsError> {
379        self.0.dropna(subset, how, thresh).map(DataFrame)
380    }
381
382    pub fn fillna(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
383        self.0.fillna(value, subset).map(DataFrame)
384    }
385
386    pub fn limit(&self, n: usize) -> Result<DataFrame, PolarsError> {
387        self.0.limit(n).map(DataFrame)
388    }
389
390    pub fn limit_engine(&self, n: usize) -> Result<DataFrame, EngineError> {
391        self.0.limit_engine(n).map(DataFrame)
392    }
393
394    pub fn with_column_renamed(
395        &self,
396        old_name: &str,
397        new_name: &str,
398    ) -> Result<DataFrame, PolarsError> {
399        self.0
400            .with_column_renamed(old_name, new_name)
401            .map(DataFrame)
402    }
403
404    pub fn replace(
405        &self,
406        column_name: &str,
407        old_value: Expr,
408        new_value: Expr,
409    ) -> Result<DataFrame, PolarsError> {
410        self.0
411            .replace(column_name, old_value, new_value)
412            .map(DataFrame)
413    }
414
415    pub fn cross_join(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
416        self.0.cross_join(&other.0).map(DataFrame)
417    }
418
419    pub fn describe(&self) -> Result<DataFrame, PolarsError> {
420        self.0.describe().map(DataFrame)
421    }
422
423    pub fn cache(&self) -> Result<DataFrame, PolarsError> {
424        self.0.cache().map(DataFrame)
425    }
426
427    pub fn persist(&self) -> Result<DataFrame, PolarsError> {
428        self.0.persist().map(DataFrame)
429    }
430
431    pub fn unpersist(&self) -> Result<DataFrame, PolarsError> {
432        self.0.unpersist().map(DataFrame)
433    }
434
435    pub fn subtract(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
436        self.0.subtract(&other.0).map(DataFrame)
437    }
438
439    pub fn intersect(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
440        self.0.intersect(&other.0).map(DataFrame)
441    }
442
443    pub fn except_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
444        self.0.except_all(&other.0).map(DataFrame)
445    }
446
447    pub fn sample(
448        &self,
449        with_replacement: bool,
450        fraction: f64,
451        seed: Option<u64>,
452    ) -> Result<DataFrame, PolarsError> {
453        self.0
454            .sample(with_replacement, fraction, seed)
455            .map(DataFrame)
456    }
457
458    pub fn random_split(
459        &self,
460        weights: &[f64],
461        seed: Option<u64>,
462    ) -> Result<Vec<DataFrame>, PolarsError> {
463        self.0
464            .random_split(weights, seed)
465            .map(|v| v.into_iter().map(DataFrame).collect())
466    }
467
468    pub fn sample_by(
469        &self,
470        col_name: &str,
471        fractions: &[(Expr, f64)],
472        seed: Option<u64>,
473    ) -> Result<DataFrame, PolarsError> {
474        self.0.sample_by(col_name, fractions, seed).map(DataFrame)
475    }
476
477    pub fn first(&self) -> Result<DataFrame, PolarsError> {
478        self.0.first().map(DataFrame)
479    }
480
481    pub fn head(&self, n: usize) -> Result<DataFrame, PolarsError> {
482        self.0.head(n).map(DataFrame)
483    }
484
485    pub fn take(&self, n: usize) -> Result<DataFrame, PolarsError> {
486        self.0.take(n).map(DataFrame)
487    }
488
489    pub fn tail(&self, n: usize) -> Result<DataFrame, PolarsError> {
490        self.0.tail(n).map(DataFrame)
491    }
492
493    pub fn is_empty(&self) -> bool {
494        self.0.is_empty()
495    }
496
497    pub fn to_df(&self, names: Vec<&str>) -> Result<DataFrame, PolarsError> {
498        self.0.to_df(names).map(DataFrame)
499    }
500
501    pub fn stat(&self) -> DataFrameStat<'_> {
502        DataFrameStat(PolarsDataFrameStat::new(&self.0))
503    }
504
505    pub fn corr(&self) -> Result<DataFrame, PolarsError> {
506        self.0.corr().map(DataFrame)
507    }
508
509    pub fn corr_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
510        self.0.corr_cols(col1, col2)
511    }
512
513    pub fn cov_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
514        self.0.cov_cols(col1, col2)
515    }
516
517    pub fn summary(&self) -> Result<DataFrame, PolarsError> {
518        self.0.summary().map(DataFrame)
519    }
520
521    pub fn to_json(&self) -> Result<Vec<String>, PolarsError> {
522        self.0.to_json()
523    }
524
525    pub fn explain(&self) -> String {
526        self.0.explain()
527    }
528
529    pub fn print_schema(&self) -> Result<String, PolarsError> {
530        self.0.print_schema()
531    }
532
533    pub fn checkpoint(&self) -> Result<DataFrame, PolarsError> {
534        self.0.checkpoint().map(DataFrame)
535    }
536
537    pub fn local_checkpoint(&self) -> Result<DataFrame, PolarsError> {
538        self.0.local_checkpoint().map(DataFrame)
539    }
540
541    pub fn repartition(&self, num_partitions: usize) -> Result<DataFrame, PolarsError> {
542        self.0.repartition(num_partitions).map(DataFrame)
543    }
544
545    pub fn repartition_by_range(
546        &self,
547        num_partitions: usize,
548        columns: Vec<&str>,
549    ) -> Result<DataFrame, PolarsError> {
550        self.0
551            .repartition_by_range(num_partitions, columns)
552            .map(DataFrame)
553    }
554
555    pub fn dtypes(&self) -> Result<Vec<(String, String)>, PolarsError> {
556        self.0.dtypes()
557    }
558
559    pub fn sort_within_partitions(&self, cols: &[SortOrder]) -> Result<DataFrame, PolarsError> {
560        self.0.sort_within_partitions(cols).map(DataFrame)
561    }
562
563    pub fn coalesce(&self, num_partitions: usize) -> Result<DataFrame, PolarsError> {
564        self.0.coalesce(num_partitions).map(DataFrame)
565    }
566
567    pub fn hint(&self, name: &str, params: &[i32]) -> Result<DataFrame, PolarsError> {
568        self.0.hint(name, params).map(DataFrame)
569    }
570
571    pub fn is_local(&self) -> bool {
572        self.0.is_local()
573    }
574
575    pub fn input_files(&self) -> Vec<String> {
576        self.0.input_files()
577    }
578
579    pub fn same_semantics(&self, other: &DataFrame) -> bool {
580        self.0.same_semantics(&other.0)
581    }
582
583    pub fn semantic_hash(&self) -> u64 {
584        self.0.semantic_hash()
585    }
586
587    pub fn observe(&self, name: &str, expr: Expr) -> Result<DataFrame, PolarsError> {
588        self.0.observe(name, expr).map(DataFrame)
589    }
590
591    pub fn with_watermark(
592        &self,
593        event_time: &str,
594        delay_threshold: &str,
595    ) -> Result<DataFrame, PolarsError> {
596        self.0
597            .with_watermark(event_time, delay_threshold)
598            .map(DataFrame)
599    }
600
601    pub fn select_expr(&self, exprs: &[String]) -> Result<DataFrame, PolarsError> {
602        self.0.select_expr(exprs).map(DataFrame)
603    }
604
605    /// Select by expression strings using SQL parsing (e.g. "upper(Name) as u"). Requires active session; use when session is available for full selectExpr parity.
606    #[cfg(feature = "sql")]
607    pub fn select_expr_with_session(
608        &self,
609        session: &crate::session::SparkSession,
610        exprs: &[String],
611    ) -> Result<DataFrame, PolarsError> {
612        self.0
613            .select_expr_with_session(&session.0, exprs)
614            .map(DataFrame)
615    }
616
617    pub fn col_regex(&self, pattern: &str) -> Result<DataFrame, PolarsError> {
618        self.0.col_regex(pattern).map(DataFrame)
619    }
620
621    pub fn with_columns(&self, exprs: &[(String, Column)]) -> Result<DataFrame, PolarsError> {
622        self.0.with_columns(exprs).map(DataFrame)
623    }
624
625    pub fn with_columns_renamed(
626        &self,
627        renames: &[(String, String)],
628    ) -> Result<DataFrame, PolarsError> {
629        self.0.with_columns_renamed(renames).map(DataFrame)
630    }
631
632    pub fn na(&self) -> DataFrameNa<'_> {
633        DataFrameNa(PolarsDataFrameNa::new(&self.0))
634    }
635
636    pub fn offset(&self, n: usize) -> Result<DataFrame, PolarsError> {
637        self.0.offset(n).map(DataFrame)
638    }
639
640    pub fn transform<F>(&self, f: F) -> Result<DataFrame, PolarsError>
641    where
642        F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
643    {
644        self.0
645            .transform(|polars_df| f(DataFrame(polars_df)).map(|r| r.0))
646            .map(DataFrame)
647    }
648
649    pub fn freq_items(&self, columns: &[&str], support: f64) -> Result<DataFrame, PolarsError> {
650        self.0.freq_items(columns, support).map(DataFrame)
651    }
652
653    pub fn approx_quantile(
654        &self,
655        column: &str,
656        probabilities: &[f64],
657    ) -> Result<DataFrame, PolarsError> {
658        self.0.approx_quantile(column, probabilities).map(DataFrame)
659    }
660
661    pub fn crosstab(&self, col1: &str, col2: &str) -> Result<DataFrame, PolarsError> {
662        self.0.crosstab(col1, col2).map(DataFrame)
663    }
664
665    pub fn melt(&self, id_vars: &[&str], value_vars: &[&str]) -> Result<DataFrame, PolarsError> {
666        self.0.melt(id_vars, value_vars).map(DataFrame)
667    }
668
669    pub fn unpivot(&self, ids: &[&str], values: &[&str]) -> Result<DataFrame, PolarsError> {
670        self.0.unpivot(ids, values).map(DataFrame)
671    }
672
673    /// Write this DataFrame to path (returns root-owned writer).
674    pub fn write(&self) -> DataFrameWriter<'_> {
675        DataFrameWriter {
676            inner: self.0.write(),
677        }
678    }
679
680    /// Write to Delta table (requires delta feature). When appending, `merge_schema` merges schemas (#851).
681    #[cfg(feature = "delta")]
682    pub fn write_delta(
683        &self,
684        path: impl AsRef<Path>,
685        overwrite: bool,
686        merge_schema: bool,
687    ) -> Result<(), PolarsError> {
688        self.0.write_delta(path, overwrite, merge_schema)
689    }
690
691    #[cfg(not(feature = "delta"))]
692    pub fn write_delta(
693        &self,
694        _path: impl AsRef<Path>,
695        _overwrite: bool,
696        _merge_schema: bool,
697    ) -> Result<(), PolarsError> {
698        Err(PolarsError::InvalidOperation(
699            "Delta Lake requires the 'delta' feature. Build with --features delta.".into(),
700        ))
701    }
702
703    /// Register as delta table by name.
704    pub fn save_as_delta_table(&self, session: &crate::session::SparkSession, name: &str) {
705        self.0.save_as_delta_table(&session.0, name)
706    }
707}
708
709impl<'a> DataFrameStat<'a> {
710    pub fn cov(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
711        self.0.cov(col1, col2)
712    }
713
714    pub fn corr(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
715        self.0.corr(col1, col2)
716    }
717
718    pub fn corr_matrix(&self) -> Result<DataFrame, PolarsError> {
719        self.0.corr_matrix().map(DataFrame)
720    }
721}
722
723// DataFrameNa - delegate to inner (methods return DataFrame, wrap)
724impl<'a> DataFrameNa<'a> {
725    pub fn drop(
726        &self,
727        subset: Option<Vec<&str>>,
728        how: &str,
729        thresh: Option<usize>,
730    ) -> Result<DataFrame, PolarsError> {
731        self.0.drop(subset, how, thresh).map(DataFrame)
732    }
733
734    pub fn fill(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
735        self.0.fill(value, subset).map(DataFrame)
736    }
737
738    pub fn replace(
739        &self,
740        old_value: Expr,
741        new_value: Expr,
742        subset: Option<Vec<&str>>,
743    ) -> Result<DataFrame, PolarsError> {
744        self.0.replace(old_value, new_value, subset).map(DataFrame)
745    }
746}
747
748impl<'a> DataFrameWriter<'a> {
749    pub fn mode(mut self, mode: WriteMode) -> Self {
750        self.inner = self.inner.mode(mode);
751        self
752    }
753
754    pub fn format(mut self, format: WriteFormat) -> Self {
755        self.inner = self.inner.format(format);
756        self
757    }
758
759    pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
760        self.inner = self.inner.option(key, value);
761        self
762    }
763
764    pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) -> Self {
765        self.inner = self.inner.options(opts);
766        self
767    }
768
769    pub fn partition_by(mut self, cols: impl IntoIterator<Item = impl Into<String>>) -> Self {
770        self.inner = self.inner.partition_by(cols);
771        self
772    }
773
774    pub fn save_as_table(
775        &self,
776        session: &crate::session::SparkSession,
777        name: &str,
778        mode: SaveMode,
779    ) -> Result<(), PolarsError> {
780        self.inner.save_as_table(&session.0, name, mode)
781    }
782
783    /// Save the DataFrame as a table with writer options (e.g. mergeSchema=true). Used by Python bindings.
784    pub fn save_as_table_with_options(
785        &self,
786        session: &crate::session::SparkSession,
787        name: &str,
788        mode: SaveMode,
789        options: &[(String, String)],
790    ) -> Result<(), PolarsError> {
791        self.inner
792            .save_as_table_with_options(&session.0, name, mode, options)
793    }
794
795    pub fn save(&self, path: impl AsRef<Path>) -> Result<(), PolarsError> {
796        self.inner.save(path)
797    }
798
799    /// JDBC write convenience mirroring PySpark's df.write.jdbc(url, table, properties).
800    #[cfg(any(
801        feature = "jdbc",
802        feature = "jdbc_mysql",
803        feature = "jdbc_mariadb",
804        feature = "jdbc_mssql",
805        feature = "jdbc_oracle",
806        feature = "jdbc_db2",
807        feature = "sqlite"
808    ))]
809    pub fn jdbc(
810        &self,
811        url: &str,
812        table: &str,
813        properties: &[(String, String)],
814        mode: SaveMode,
815    ) -> Result<(), robin_sparkless_core::EngineError> {
816        self.inner.jdbc(url, table, properties, mode)
817    }
818}
819
820impl GroupedData {
821    pub fn count(&self) -> Result<DataFrame, PolarsError> {
822        self.0.count().map(DataFrame)
823    }
824
825    pub fn sum(&self, column: &str) -> Result<DataFrame, PolarsError> {
826        self.0.sum(column).map(DataFrame)
827    }
828
829    pub fn min(&self, column: &str) -> Result<DataFrame, PolarsError> {
830        self.0.min(column).map(DataFrame)
831    }
832
833    pub fn max(&self, column: &str) -> Result<DataFrame, PolarsError> {
834        self.0.max(column).map(DataFrame)
835    }
836
837    pub fn mean(&self, column: &str) -> Result<DataFrame, PolarsError> {
838        self.0.avg(&[column]).map(DataFrame)
839    }
840
841    pub fn avg(&self, columns: &[&str]) -> Result<DataFrame, PolarsError> {
842        self.0.avg(columns).map(DataFrame)
843    }
844
845    pub fn agg(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
846        self.0.agg(exprs).map(DataFrame)
847    }
848
849    pub fn agg_columns(&self, aggregations: Vec<Column>) -> Result<DataFrame, PolarsError> {
850        self.0.agg_columns(aggregations).map(DataFrame)
851    }
852
853    /// Aggregate using ExprIr expressions (e.g. sum(col("x")), count(col("y")) from core).
854    pub fn agg_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError> {
855        downcast_df(GroupedDataBackend::agg(&self.0, exprs)?)
856    }
857
858    pub fn pivot(&self, pivot_col: &str, values: Option<Vec<String>>) -> PivotedGroupedData {
859        PivotedGroupedData(self.0.pivot(pivot_col, values))
860    }
861}
862
863impl CubeRollupData {
864    pub fn count(&self) -> Result<DataFrame, PolarsError> {
865        self.0.count().map(DataFrame)
866    }
867
868    pub fn agg(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
869        self.0.agg(exprs).map(DataFrame)
870    }
871}
872
873impl PivotedGroupedData {
874    pub fn count(&self) -> Result<DataFrame, PolarsError> {
875        self.0.count().map(DataFrame)
876    }
877
878    pub fn sum(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
879        self.0.sum(value_col).map(DataFrame)
880    }
881
882    pub fn avg(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
883        self.0.avg(value_col).map(DataFrame)
884    }
885
886    pub fn min(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
887        self.0.min(value_col).map(DataFrame)
888    }
889
890    pub fn max(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
891        self.0.max(value_col).map(DataFrame)
892    }
893
894    /// Internal: not in PySpark pivot API; use .agg(F.count_distinct(...)).
895    pub fn _count_distinct(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
896        self.0._count_distinct(value_col).map(DataFrame)
897    }
898
899    /// Internal: not in PySpark pivot API; use .agg(F.collect_list(...)).
900    pub fn _collect_list(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
901        self.0._collect_list(value_col).map(DataFrame)
902    }
903
904    /// Internal: not in PySpark pivot API; use .agg(F.collect_set(...)).
905    pub fn _collect_set(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
906        self.0._collect_set(value_col).map(DataFrame)
907    }
908
909    /// Internal: not in PySpark pivot API.
910    pub fn _first(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
911        self.0._first(value_col).map(DataFrame)
912    }
913
914    /// Internal: not in PySpark pivot API.
915    pub fn _last(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
916        self.0._last(value_col).map(DataFrame)
917    }
918
919    /// Internal: not in PySpark pivot API.
920    pub fn _stddev(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
921        self.0._stddev(value_col).map(DataFrame)
922    }
923
924    /// Internal: not in PySpark pivot API.
925    pub fn _variance(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
926        self.0._variance(value_col).map(DataFrame)
927    }
928
929    pub fn mean(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
930        self.0.mean(value_col).map(DataFrame)
931    }
932
933    pub fn agg(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
934        self.0.agg(exprs).map(DataFrame)
935    }
936}
937
#[cfg(test)]
mod tests {
    /// Smoke test for EngineDataFrame: ensure filter_expr_ir + collect_rows round-trips a simple filter.
    ///
    /// Builds a three-row, single-column (`x`, "bigint") frame through the session, filters it
    /// with `x > x`, and asserts that the collected result is empty.
    #[test]
    fn engine_dataframe_filter_and_collect_rows() {
        use robin_sparkless_core::expr::{col, gt};
        use serde_json::json;

        let session = crate::session::SparkSession::builder()
            .app_name("engine_dataframe_filter")
            .get_or_create();
        let data = vec![vec![json!(1)], vec![json!(2)], vec![json!(3)]];
        let schema = vec![("x".to_string(), "bigint".to_string())];
        let df = session
            .create_dataframe_from_rows_engine(data, schema, false, false)
            .unwrap();

        let cond = gt(col("x"), col("x")); // always false; expect zero rows
        let filtered = df.filter_expr_ir(&cond).unwrap();
        let rows = filtered.collect_rows().unwrap();
        assert!(
            rows.is_empty(),
            "expected no rows after always-false filter"
        );
    }
}