Skip to main content

robin_sparkless/dataframe/
mod.rs

1//! DataFrame module: main tabular type and submodules for transformations, aggregations, joins, stats.
2
3mod aggregations;
4mod joins;
5mod stats;
6mod transformations;
7
8pub use aggregations::{CubeRollupData, GroupedData};
9pub use joins::{join, JoinType};
10pub use stats::DataFrameStat;
11pub use transformations::{
12    filter, order_by, order_by_exprs, select, select_with_exprs, with_column, DataFrameNa,
13};
14
15use crate::column::Column;
16use crate::functions::SortOrder;
17use crate::schema::StructType;
18use polars::prelude::{
19    AnyValue, DataFrame as PlDataFrame, Expr, PolarsError, SchemaNamesAndDtypes,
20};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25/// Default for `spark.sql.caseSensitive` (PySpark default is false = case-insensitive).
26const DEFAULT_CASE_SENSITIVE: bool = false;
27
28/// DataFrame - main tabular data structure.
29/// Thin wrapper around an eager Polars `DataFrame`.
30pub struct DataFrame {
31    pub(crate) df: Arc<PlDataFrame>,
32    /// When false (default), column names are matched case-insensitively (PySpark behavior).
33    pub(crate) case_sensitive: bool,
34}
35
36impl DataFrame {
37    /// Create a new DataFrame from a Polars DataFrame (case-insensitive column matching by default).
38    pub fn from_polars(df: PlDataFrame) -> Self {
39        DataFrame {
40            df: Arc::new(df),
41            case_sensitive: DEFAULT_CASE_SENSITIVE,
42        }
43    }
44
45    /// Create a new DataFrame from a Polars DataFrame with explicit case sensitivity.
46    /// When `case_sensitive` is false, column resolution is case-insensitive (PySpark default).
47    pub fn from_polars_with_options(df: PlDataFrame, case_sensitive: bool) -> Self {
48        DataFrame {
49            df: Arc::new(df),
50            case_sensitive,
51        }
52    }
53
54    /// Create an empty DataFrame
55    pub fn empty() -> Self {
56        DataFrame {
57            df: Arc::new(PlDataFrame::empty()),
58            case_sensitive: DEFAULT_CASE_SENSITIVE,
59        }
60    }
61
62    /// Resolve a logical column name to the actual column name in the schema.
63    /// When case_sensitive is false, matches case-insensitively.
64    pub fn resolve_column_name(&self, name: &str) -> Result<String, PolarsError> {
65        let names = self.df.get_column_names();
66        if self.case_sensitive {
67            if names.iter().any(|n| *n == name) {
68                return Ok(name.to_string());
69            }
70        } else {
71            let name_lower = name.to_lowercase();
72            for n in names {
73                if n.to_lowercase() == name_lower {
74                    return Ok(n.to_string());
75                }
76            }
77        }
78        let available: Vec<String> = self
79            .df
80            .get_column_names()
81            .iter()
82            .map(|s| s.to_string())
83            .collect();
84        Err(PolarsError::ColumnNotFound(
85            format!(
86                "Column '{}' not found. Available columns: [{}]. Check spelling and case sensitivity (spark.sql.caseSensitive).",
87                name,
88                available.join(", ")
89            )
90            .into(),
91        ))
92    }
93
94    /// Get the schema of the DataFrame
95    pub fn schema(&self) -> Result<StructType, PolarsError> {
96        Ok(StructType::from_polars_schema(&self.df.schema()))
97    }
98
99    /// Get column names
100    pub fn columns(&self) -> Result<Vec<String>, PolarsError> {
101        Ok(self
102            .df
103            .get_column_names()
104            .iter()
105            .map(|s| s.to_string())
106            .collect())
107    }
108
109    /// Count the number of rows (action - triggers execution)
110    pub fn count(&self) -> Result<usize, PolarsError> {
111        Ok(self.df.height())
112    }
113
114    /// Show the first n rows
115    pub fn show(&self, n: Option<usize>) -> Result<(), PolarsError> {
116        let n = n.unwrap_or(20);
117        println!("{}", self.df.head(Some(n)));
118        Ok(())
119    }
120
121    /// Collect the DataFrame (action - triggers execution)
122    pub fn collect(&self) -> Result<Arc<PlDataFrame>, PolarsError> {
123        Ok(self.df.clone())
124    }
125
126    /// Collect as rows of column-name -> JSON value. For use by language bindings (Node, etc.).
127    pub fn collect_as_json_rows(&self) -> Result<Vec<HashMap<String, JsonValue>>, PolarsError> {
128        let df = self.df.as_ref();
129        let names = df.get_column_names();
130        let nrows = df.height();
131        let mut rows = Vec::with_capacity(nrows);
132        for i in 0..nrows {
133            let mut row = HashMap::with_capacity(names.len());
134            for (col_idx, name) in names.iter().enumerate() {
135                let s = df
136                    .get_columns()
137                    .get(col_idx)
138                    .ok_or_else(|| PolarsError::ComputeError("column index out of range".into()))?;
139                let av = s.get(i)?;
140                let jv = any_value_to_json(av);
141                row.insert(name.to_string(), jv);
142            }
143            rows.push(row);
144        }
145        Ok(rows)
146    }
147
148    /// Select columns (returns a new DataFrame).
149    /// Accepts either column names (strings) or Column expressions (e.g. from regexp_extract_all(...).alias("m")).
150    /// Column names are resolved according to case sensitivity.
151    pub fn select_exprs(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
152        transformations::select_with_exprs(self, exprs, self.case_sensitive)
153    }
154
155    /// Select columns by name (returns a new DataFrame).
156    /// Column names are resolved according to case sensitivity.
157    pub fn select(&self, cols: Vec<&str>) -> Result<DataFrame, PolarsError> {
158        let resolved: Vec<String> = cols
159            .iter()
160            .map(|c| self.resolve_column_name(c))
161            .collect::<Result<Vec<_>, _>>()?;
162        let refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
163        let mut result = transformations::select(self, refs, self.case_sensitive)?;
164        // When case-insensitive, PySpark returns column names in requested (e.g. lowercase) form.
165        if !self.case_sensitive {
166            for (requested, res) in cols.iter().zip(resolved.iter()) {
167                if *requested != res.as_str() {
168                    result = result.with_column_renamed(res, requested)?;
169                }
170            }
171        }
172        Ok(result)
173    }
174
175    /// Filter rows using a Polars expression.
176    pub fn filter(&self, condition: Expr) -> Result<DataFrame, PolarsError> {
177        transformations::filter(self, condition, self.case_sensitive)
178    }
179
180    /// Get a column reference by name (for building expressions).
181    /// Respects case sensitivity: when false, "Age" resolves to column "age" if present.
182    pub fn column(&self, name: &str) -> Result<Column, PolarsError> {
183        let resolved = self.resolve_column_name(name)?;
184        Ok(Column::new(resolved))
185    }
186
187    /// Add or replace a column. Use a [`Column`] (e.g. from `col("x")`, `rand(42)`, `randn(42)`).
188    /// For `rand`/`randn`, generates one distinct value per row (PySpark-like).
189    pub fn with_column(&self, column_name: &str, col: &Column) -> Result<DataFrame, PolarsError> {
190        transformations::with_column(self, column_name, col, self.case_sensitive)
191    }
192
193    /// Add or replace a column using an expression. Prefer [`with_column`](Self::with_column) with a `Column` for rand/randn (per-row values).
194    pub fn with_column_expr(
195        &self,
196        column_name: &str,
197        expr: Expr,
198    ) -> Result<DataFrame, PolarsError> {
199        let col = Column::from_expr(expr, None);
200        self.with_column(column_name, &col)
201    }
202
203    /// Group by columns (returns GroupedData for aggregation).
204    /// Column names are resolved according to case sensitivity.
205    pub fn group_by(&self, column_names: Vec<&str>) -> Result<GroupedData, PolarsError> {
206        use polars::prelude::*;
207        let resolved: Vec<String> = column_names
208            .iter()
209            .map(|c| self.resolve_column_name(c))
210            .collect::<Result<Vec<_>, _>>()?;
211        let exprs: Vec<Expr> = resolved.iter().map(|name| col(name.as_str())).collect();
212        let lazy_grouped = self.df.as_ref().clone().lazy().group_by(exprs);
213        Ok(GroupedData {
214            lazy_grouped,
215            grouping_cols: resolved,
216            case_sensitive: self.case_sensitive,
217        })
218    }
219
220    /// Cube: multiple grouping sets (all subsets of columns), then union (PySpark cube).
221    pub fn cube(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
222        let resolved: Vec<String> = column_names
223            .iter()
224            .map(|c| self.resolve_column_name(c))
225            .collect::<Result<Vec<_>, _>>()?;
226        Ok(CubeRollupData {
227            df: self.df.as_ref().clone(),
228            grouping_cols: resolved,
229            case_sensitive: self.case_sensitive,
230            is_cube: true,
231        })
232    }
233
234    /// Rollup: grouping sets (prefixes of columns), then union (PySpark rollup).
235    pub fn rollup(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
236        let resolved: Vec<String> = column_names
237            .iter()
238            .map(|c| self.resolve_column_name(c))
239            .collect::<Result<Vec<_>, _>>()?;
240        Ok(CubeRollupData {
241            df: self.df.as_ref().clone(),
242            grouping_cols: resolved,
243            case_sensitive: self.case_sensitive,
244            is_cube: false,
245        })
246    }
247
248    /// Join with another DataFrame on the given columns.
249    /// Join column names are resolved on the left (and right must have matching names).
250    pub fn join(
251        &self,
252        other: &DataFrame,
253        on: Vec<&str>,
254        how: JoinType,
255    ) -> Result<DataFrame, PolarsError> {
256        let resolved: Vec<String> = on
257            .iter()
258            .map(|c| self.resolve_column_name(c))
259            .collect::<Result<Vec<_>, _>>()?;
260        let on_refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
261        join(self, other, on_refs, how, self.case_sensitive)
262    }
263
264    /// Order by columns (sort).
265    /// Column names are resolved according to case sensitivity.
266    pub fn order_by(
267        &self,
268        column_names: Vec<&str>,
269        ascending: Vec<bool>,
270    ) -> Result<DataFrame, PolarsError> {
271        let resolved: Vec<String> = column_names
272            .iter()
273            .map(|c| self.resolve_column_name(c))
274            .collect::<Result<Vec<_>, _>>()?;
275        let refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
276        transformations::order_by(self, refs, ascending, self.case_sensitive)
277    }
278
279    /// Order by sort expressions (asc/desc with nulls_first/last).
280    pub fn order_by_exprs(&self, sort_orders: Vec<SortOrder>) -> Result<DataFrame, PolarsError> {
281        transformations::order_by_exprs(self, sort_orders, self.case_sensitive)
282    }
283
284    /// Union (unionAll): stack another DataFrame vertically. Schemas must match (same columns, same order).
285    pub fn union(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
286        transformations::union(self, other, self.case_sensitive)
287    }
288
289    /// Union by name: stack vertically, aligning columns by name.
290    pub fn union_by_name(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
291        transformations::union_by_name(self, other, self.case_sensitive)
292    }
293
294    /// Distinct: drop duplicate rows (all columns or optional subset).
295    pub fn distinct(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
296        transformations::distinct(self, subset, self.case_sensitive)
297    }
298
299    /// Drop one or more columns.
300    pub fn drop(&self, columns: Vec<&str>) -> Result<DataFrame, PolarsError> {
301        transformations::drop(self, columns, self.case_sensitive)
302    }
303
304    /// Drop rows with nulls (all columns or optional subset).
305    pub fn dropna(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
306        transformations::dropna(self, subset, self.case_sensitive)
307    }
308
309    /// Fill nulls with a literal expression (applied to all columns).
310    pub fn fillna(&self, value: Expr) -> Result<DataFrame, PolarsError> {
311        transformations::fillna(self, value, self.case_sensitive)
312    }
313
314    /// Limit: return first n rows.
315    pub fn limit(&self, n: usize) -> Result<DataFrame, PolarsError> {
316        transformations::limit(self, n, self.case_sensitive)
317    }
318
319    /// Rename a column (old_name -> new_name).
320    pub fn with_column_renamed(
321        &self,
322        old_name: &str,
323        new_name: &str,
324    ) -> Result<DataFrame, PolarsError> {
325        transformations::with_column_renamed(self, old_name, new_name, self.case_sensitive)
326    }
327
328    /// Replace values in a column (old_value -> new_value). PySpark replace.
329    pub fn replace(
330        &self,
331        column_name: &str,
332        old_value: Expr,
333        new_value: Expr,
334    ) -> Result<DataFrame, PolarsError> {
335        transformations::replace(self, column_name, old_value, new_value, self.case_sensitive)
336    }
337
338    /// Cross join with another DataFrame (cartesian product). PySpark crossJoin.
339    pub fn cross_join(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
340        transformations::cross_join(self, other, self.case_sensitive)
341    }
342
343    /// Summary statistics. PySpark describe.
344    pub fn describe(&self) -> Result<DataFrame, PolarsError> {
345        transformations::describe(self, self.case_sensitive)
346    }
347
348    /// No-op: execution is eager by default. PySpark cache.
349    pub fn cache(&self) -> Result<DataFrame, PolarsError> {
350        Ok(self.clone())
351    }
352
353    /// No-op: execution is eager by default. PySpark persist.
354    pub fn persist(&self) -> Result<DataFrame, PolarsError> {
355        Ok(self.clone())
356    }
357
358    /// No-op. PySpark unpersist.
359    pub fn unpersist(&self) -> Result<DataFrame, PolarsError> {
360        Ok(self.clone())
361    }
362
363    /// Set difference: rows in self not in other. PySpark subtract / except.
364    pub fn subtract(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
365        transformations::subtract(self, other, self.case_sensitive)
366    }
367
368    /// Set intersection: rows in both self and other. PySpark intersect.
369    pub fn intersect(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
370        transformations::intersect(self, other, self.case_sensitive)
371    }
372
373    /// Sample a fraction of rows. PySpark sample(withReplacement, fraction, seed).
374    pub fn sample(
375        &self,
376        with_replacement: bool,
377        fraction: f64,
378        seed: Option<u64>,
379    ) -> Result<DataFrame, PolarsError> {
380        transformations::sample(self, with_replacement, fraction, seed, self.case_sensitive)
381    }
382
383    /// Split into multiple DataFrames by weights. PySpark randomSplit(weights, seed).
384    pub fn random_split(
385        &self,
386        weights: &[f64],
387        seed: Option<u64>,
388    ) -> Result<Vec<DataFrame>, PolarsError> {
389        transformations::random_split(self, weights, seed, self.case_sensitive)
390    }
391
392    /// Stratified sample by column value. PySpark sampleBy(col, fractions, seed).
393    /// fractions: list of (value as Expr, fraction) for that stratum.
394    pub fn sample_by(
395        &self,
396        col_name: &str,
397        fractions: &[(Expr, f64)],
398        seed: Option<u64>,
399    ) -> Result<DataFrame, PolarsError> {
400        transformations::sample_by(self, col_name, fractions, seed, self.case_sensitive)
401    }
402
403    /// First row as a one-row DataFrame. PySpark first().
404    pub fn first(&self) -> Result<DataFrame, PolarsError> {
405        transformations::first(self, self.case_sensitive)
406    }
407
408    /// First n rows. PySpark head(n).
409    pub fn head(&self, n: usize) -> Result<DataFrame, PolarsError> {
410        transformations::head(self, n, self.case_sensitive)
411    }
412
413    /// Take first n rows. PySpark take(n).
414    pub fn take(&self, n: usize) -> Result<DataFrame, PolarsError> {
415        transformations::take(self, n, self.case_sensitive)
416    }
417
418    /// Last n rows. PySpark tail(n).
419    pub fn tail(&self, n: usize) -> Result<DataFrame, PolarsError> {
420        transformations::tail(self, n, self.case_sensitive)
421    }
422
423    /// True if the DataFrame has zero rows. PySpark isEmpty.
424    pub fn is_empty(&self) -> bool {
425        transformations::is_empty(self)
426    }
427
428    /// Rename columns. PySpark toDF(*colNames).
429    pub fn to_df(&self, names: Vec<&str>) -> Result<DataFrame, PolarsError> {
430        transformations::to_df(self, &names, self.case_sensitive)
431    }
432
433    /// Statistical helper. PySpark df.stat().cov / .corr.
434    pub fn stat(&self) -> DataFrameStat<'_> {
435        DataFrameStat { df: self }
436    }
437
438    /// Correlation matrix of all numeric columns. PySpark df.corr() returns a DataFrame of pairwise correlations.
439    pub fn corr(&self) -> Result<DataFrame, PolarsError> {
440        self.stat().corr_matrix()
441    }
442
443    /// Pearson correlation between two columns (scalar). PySpark df.corr(col1, col2).
444    pub fn corr_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
445        self.stat().corr(col1, col2)
446    }
447
448    /// Sample covariance between two columns (scalar). PySpark df.cov(col1, col2).
449    pub fn cov_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
450        self.stat().cov(col1, col2)
451    }
452
453    /// Summary statistics (alias for describe). PySpark summary.
454    pub fn summary(&self) -> Result<DataFrame, PolarsError> {
455        self.describe()
456    }
457
458    /// Collect rows as JSON strings (one per row). PySpark toJSON.
459    pub fn to_json(&self) -> Result<Vec<String>, PolarsError> {
460        transformations::to_json(self)
461    }
462
463    /// Return execution plan description. PySpark explain.
464    pub fn explain(&self) -> String {
465        transformations::explain(self)
466    }
467
468    /// Return schema as tree string. PySpark printSchema (returns string; print to stdout if needed).
469    pub fn print_schema(&self) -> Result<String, PolarsError> {
470        transformations::print_schema(self)
471    }
472
473    /// No-op: Polars backend is eager. PySpark checkpoint.
474    pub fn checkpoint(&self) -> Result<DataFrame, PolarsError> {
475        Ok(self.clone())
476    }
477
478    /// No-op: Polars backend is eager. PySpark localCheckpoint.
479    pub fn local_checkpoint(&self) -> Result<DataFrame, PolarsError> {
480        Ok(self.clone())
481    }
482
483    /// No-op: single partition in Polars. PySpark repartition(n).
484    pub fn repartition(&self, _num_partitions: usize) -> Result<DataFrame, PolarsError> {
485        Ok(self.clone())
486    }
487
488    /// No-op: Polars has no range partitioning. PySpark repartitionByRange(n, cols).
489    pub fn repartition_by_range(
490        &self,
491        _num_partitions: usize,
492        _cols: Vec<&str>,
493    ) -> Result<DataFrame, PolarsError> {
494        Ok(self.clone())
495    }
496
497    /// Column names and dtype strings. PySpark dtypes. Returns (name, dtype_string) per column.
498    pub fn dtypes(&self) -> Result<Vec<(String, String)>, PolarsError> {
499        let schema = self.df.schema();
500        Ok(schema
501            .iter_names_and_dtypes()
502            .map(|(name, dtype)| (name.to_string(), format!("{dtype:?}")))
503            .collect())
504    }
505
506    /// No-op: we don't model partitions. PySpark sortWithinPartitions. Same as orderBy for compatibility.
507    pub fn sort_within_partitions(
508        &self,
509        _cols: &[crate::functions::SortOrder],
510    ) -> Result<DataFrame, PolarsError> {
511        Ok(self.clone())
512    }
513
514    /// No-op: single partition in Polars. PySpark coalesce(n).
515    pub fn coalesce(&self, _num_partitions: usize) -> Result<DataFrame, PolarsError> {
516        Ok(self.clone())
517    }
518
519    /// No-op. PySpark hint (query planner hint).
520    pub fn hint(&self, _name: &str, _params: &[i32]) -> Result<DataFrame, PolarsError> {
521        Ok(self.clone())
522    }
523
524    /// Returns true (eager single-node). PySpark isLocal.
525    pub fn is_local(&self) -> bool {
526        true
527    }
528
529    /// Returns empty vec (no file sources). PySpark inputFiles.
530    pub fn input_files(&self) -> Vec<String> {
531        Vec::new()
532    }
533
534    /// No-op; returns false. PySpark sameSemantics.
535    pub fn same_semantics(&self, _other: &DataFrame) -> bool {
536        false
537    }
538
539    /// No-op; returns 0. PySpark semanticHash.
540    pub fn semantic_hash(&self) -> u64 {
541        0
542    }
543
544    /// No-op. PySpark observe (metrics).
545    pub fn observe(&self, _name: &str, _expr: Expr) -> Result<DataFrame, PolarsError> {
546        Ok(self.clone())
547    }
548
549    /// No-op. PySpark withWatermark (streaming).
550    pub fn with_watermark(
551        &self,
552        _event_time: &str,
553        _delay: &str,
554    ) -> Result<DataFrame, PolarsError> {
555        Ok(self.clone())
556    }
557
558    /// Select by expression strings (minimal: column names, optionally "col as alias"). PySpark selectExpr.
559    pub fn select_expr(&self, exprs: &[String]) -> Result<DataFrame, PolarsError> {
560        transformations::select_expr(self, exprs, self.case_sensitive)
561    }
562
563    /// Select columns whose names match the regex. PySpark colRegex.
564    pub fn col_regex(&self, pattern: &str) -> Result<DataFrame, PolarsError> {
565        transformations::col_regex(self, pattern, self.case_sensitive)
566    }
567
568    /// Add or replace multiple columns. PySpark withColumns. Accepts `Column` so rand/randn get per-row values.
569    pub fn with_columns(&self, exprs: &[(String, Column)]) -> Result<DataFrame, PolarsError> {
570        transformations::with_columns(self, exprs, self.case_sensitive)
571    }
572
573    /// Rename multiple columns. PySpark withColumnsRenamed.
574    pub fn with_columns_renamed(
575        &self,
576        renames: &[(String, String)],
577    ) -> Result<DataFrame, PolarsError> {
578        transformations::with_columns_renamed(self, renames, self.case_sensitive)
579    }
580
581    /// NA sub-API. PySpark df.na().
582    pub fn na(&self) -> DataFrameNa<'_> {
583        DataFrameNa { df: self }
584    }
585
586    /// Skip first n rows. PySpark offset(n).
587    pub fn offset(&self, n: usize) -> Result<DataFrame, PolarsError> {
588        transformations::offset(self, n, self.case_sensitive)
589    }
590
591    /// Transform by a function. PySpark transform(func).
592    pub fn transform<F>(&self, f: F) -> Result<DataFrame, PolarsError>
593    where
594        F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
595    {
596        transformations::transform(self, f)
597    }
598
599    /// Frequent items. PySpark freqItems (stub).
600    pub fn freq_items(&self, columns: &[&str], support: f64) -> Result<DataFrame, PolarsError> {
601        transformations::freq_items(self, columns, support, self.case_sensitive)
602    }
603
604    /// Approximate quantiles. PySpark approxQuantile (stub).
605    pub fn approx_quantile(
606        &self,
607        column: &str,
608        probabilities: &[f64],
609    ) -> Result<DataFrame, PolarsError> {
610        transformations::approx_quantile(self, column, probabilities, self.case_sensitive)
611    }
612
613    /// Cross-tabulation. PySpark crosstab (stub).
614    pub fn crosstab(&self, col1: &str, col2: &str) -> Result<DataFrame, PolarsError> {
615        transformations::crosstab(self, col1, col2, self.case_sensitive)
616    }
617
618    /// Unpivot (melt). PySpark melt (stub).
619    pub fn melt(&self, id_vars: &[&str], value_vars: &[&str]) -> Result<DataFrame, PolarsError> {
620        transformations::melt(self, id_vars, value_vars, self.case_sensitive)
621    }
622
623    /// Pivot (wide format). PySpark pivot. Stub: not yet implemented; use crosstab for two-column count.
624    pub fn pivot(
625        &self,
626        _pivot_col: &str,
627        _values: Option<Vec<&str>>,
628    ) -> Result<DataFrame, PolarsError> {
629        Err(PolarsError::InvalidOperation(
630            "pivot is not yet implemented; use crosstab(col1, col2) for two-column cross-tabulation."
631                .into(),
632        ))
633    }
634
635    /// Set difference keeping duplicates. PySpark exceptAll.
636    pub fn except_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
637        transformations::except_all(self, other, self.case_sensitive)
638    }
639
640    /// Set intersection keeping duplicates. PySpark intersectAll.
641    pub fn intersect_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
642        transformations::intersect_all(self, other, self.case_sensitive)
643    }
644
645    /// Write this DataFrame to a Delta table at the given path.
646    /// Requires the `delta` feature. If `overwrite` is true, replaces the table; otherwise appends.
647    #[cfg(feature = "delta")]
648    pub fn write_delta(
649        &self,
650        path: impl AsRef<std::path::Path>,
651        overwrite: bool,
652    ) -> Result<(), PolarsError> {
653        crate::delta::write_delta(self.df.as_ref(), path, overwrite)
654    }
655
656    /// Stub when `delta` feature is disabled.
657    #[cfg(not(feature = "delta"))]
658    pub fn write_delta(
659        &self,
660        _path: impl AsRef<std::path::Path>,
661        _overwrite: bool,
662    ) -> Result<(), PolarsError> {
663        Err(PolarsError::InvalidOperation(
664            "Delta Lake requires the 'delta' feature. Build with --features delta.".into(),
665        ))
666    }
667
668    /// Return a writer for generic format (parquet, csv, json). PySpark-style write API.
669    pub fn write(&self) -> DataFrameWriter<'_> {
670        DataFrameWriter {
671            df: self,
672            mode: WriteMode::Overwrite,
673            format: WriteFormat::Parquet,
674            options: HashMap::new(),
675            partition_by: Vec::new(),
676        }
677    }
678}
679
680/// Write mode: overwrite or append (PySpark DataFrameWriter.mode).
681#[derive(Clone, Copy)]
682pub enum WriteMode {
683    Overwrite,
684    Append,
685}
686
687/// Output format for generic write (PySpark DataFrameWriter.format).
688#[derive(Clone, Copy)]
689pub enum WriteFormat {
690    Parquet,
691    Csv,
692    Json,
693}
694
695/// Builder for writing DataFrame to path (PySpark DataFrameWriter).
696pub struct DataFrameWriter<'a> {
697    df: &'a DataFrame,
698    mode: WriteMode,
699    format: WriteFormat,
700    options: HashMap<String, String>,
701    partition_by: Vec<String>,
702}
703
704impl<'a> DataFrameWriter<'a> {
705    pub fn mode(mut self, mode: WriteMode) -> Self {
706        self.mode = mode;
707        self
708    }
709
710    pub fn format(mut self, format: WriteFormat) -> Self {
711        self.format = format;
712        self
713    }
714
715    /// Add a single option (PySpark: option(key, value)). Returns self for chaining.
716    pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
717        self.options.insert(key.into(), value.into());
718        self
719    }
720
721    /// Add multiple options (PySpark: options(**kwargs)). Returns self for chaining.
722    pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) -> Self {
723        for (k, v) in opts {
724            self.options.insert(k, v);
725        }
726        self
727    }
728
729    /// Partition output by the given columns (PySpark: partitionBy(cols)).
730    pub fn partition_by(mut self, cols: impl IntoIterator<Item = impl Into<String>>) -> Self {
731        self.partition_by = cols.into_iter().map(|s| s.into()).collect();
732        self
733    }
734
735    /// Write as Parquet (PySpark: parquet(path)). Equivalent to format(Parquet).save(path).
736    pub fn parquet(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
737        DataFrameWriter {
738            df: self.df,
739            mode: self.mode,
740            format: WriteFormat::Parquet,
741            options: self.options.clone(),
742            partition_by: self.partition_by.clone(),
743        }
744        .save(path)
745    }
746
747    /// Write as CSV (PySpark: csv(path)). Equivalent to format(Csv).save(path).
748    pub fn csv(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
749        DataFrameWriter {
750            df: self.df,
751            mode: self.mode,
752            format: WriteFormat::Csv,
753            options: self.options.clone(),
754            partition_by: self.partition_by.clone(),
755        }
756        .save(path)
757    }
758
759    /// Write as JSON lines (PySpark: json(path)). Equivalent to format(Json).save(path).
760    pub fn json(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
761        DataFrameWriter {
762            df: self.df,
763            mode: self.mode,
764            format: WriteFormat::Json,
765            options: self.options.clone(),
766            partition_by: self.partition_by.clone(),
767        }
768        .save(path)
769    }
770
771    /// Write to path. Overwrite replaces; append reads existing (if any) and concatenates then writes.
772    pub fn save(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
773        use polars::prelude::*;
774        let path = path.as_ref();
775        let to_write: PlDataFrame = match self.mode {
776            WriteMode::Overwrite => self.df.df.as_ref().clone(),
777            WriteMode::Append => {
778                use polars::prelude::*;
779                let existing: Option<PlDataFrame> = if path.exists() {
780                    match self.format {
781                        WriteFormat::Parquet => {
782                            LazyFrame::scan_parquet(path, ScanArgsParquet::default())
783                                .and_then(|lf| lf.collect())
784                                .ok()
785                        }
786                        WriteFormat::Csv => LazyCsvReader::new(path)
787                            .with_has_header(true)
788                            .finish()
789                            .and_then(|lf| lf.collect())
790                            .ok(),
791                        WriteFormat::Json => LazyJsonLineReader::new(path)
792                            .finish()
793                            .and_then(|lf| lf.collect())
794                            .ok(),
795                    }
796                } else {
797                    None
798                };
799                match existing {
800                    Some(existing) => {
801                        let lfs: [polars::prelude::LazyFrame; 2] =
802                            [existing.lazy(), self.df.df.as_ref().clone().lazy()];
803                        polars::prelude::concat(lfs, UnionArgs::default())?.collect()?
804                    }
805                    None => self.df.df.as_ref().clone(),
806                }
807            }
808        };
809        match self.format {
810            WriteFormat::Parquet => {
811                if self.partition_by.is_empty() {
812                    let mut file = std::fs::File::create(path).map_err(|e| {
813                        PolarsError::ComputeError(format!("write parquet create: {e}").into())
814                    })?;
815                    let mut df_mut = to_write;
816                    ParquetWriter::new(&mut file)
817                        .finish(&mut df_mut)
818                        .map_err(|e| {
819                            PolarsError::ComputeError(format!("write parquet: {e}").into())
820                        })?;
821                } else {
822                    // partitionBy stored but partitioned parquet write deferred
823                    return Err(PolarsError::InvalidOperation(
824                        "partitionBy for parquet write is not yet implemented. Use save(path) without partitionBy.".into(),
825                    ));
826                }
827            }
828            WriteFormat::Csv => {
829                let has_header = self
830                    .options
831                    .get("header")
832                    .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
833                    .unwrap_or(true);
834                let delimiter = self
835                    .options
836                    .get("sep")
837                    .and_then(|s| s.bytes().next())
838                    .unwrap_or(b',');
839                let mut file = std::fs::File::create(path).map_err(|e| {
840                    PolarsError::ComputeError(format!("write csv create: {e}").into())
841                })?;
842                CsvWriter::new(&mut file)
843                    .include_header(has_header)
844                    .with_separator(delimiter)
845                    .finish(&mut to_write.clone())
846                    .map_err(|e| PolarsError::ComputeError(format!("write csv: {e}").into()))?;
847            }
848            WriteFormat::Json => {
849                let mut file = std::fs::File::create(path).map_err(|e| {
850                    PolarsError::ComputeError(format!("write json create: {e}").into())
851                })?;
852                JsonWriter::new(&mut file)
853                    .finish(&mut to_write.clone())
854                    .map_err(|e| PolarsError::ComputeError(format!("write json: {e}").into()))?;
855            }
856        }
857        Ok(())
858    }
859}
860
861impl Clone for DataFrame {
862    fn clone(&self) -> Self {
863        DataFrame {
864            df: self.df.clone(),
865            case_sensitive: self.case_sensitive,
866        }
867    }
868}
869
870/// Convert Polars AnyValue to serde_json::Value for language bindings (Node, etc.).
871fn any_value_to_json(av: AnyValue<'_>) -> JsonValue {
872    match av {
873        AnyValue::Null => JsonValue::Null,
874        AnyValue::Boolean(b) => JsonValue::Bool(b),
875        AnyValue::Int32(i) => JsonValue::Number(serde_json::Number::from(i)),
876        AnyValue::Int64(i) => JsonValue::Number(serde_json::Number::from(i)),
877        AnyValue::UInt32(u) => JsonValue::Number(serde_json::Number::from(u)),
878        AnyValue::UInt64(u) => JsonValue::Number(serde_json::Number::from(u)),
879        AnyValue::Float32(f) => serde_json::Number::from_f64(f64::from(f))
880            .map(JsonValue::Number)
881            .unwrap_or(JsonValue::Null),
882        AnyValue::Float64(f) => serde_json::Number::from_f64(f)
883            .map(JsonValue::Number)
884            .unwrap_or(JsonValue::Null),
885        AnyValue::String(s) => JsonValue::String(s.to_string()),
886        AnyValue::StringOwned(s) => JsonValue::String(s.to_string()),
887        _ => JsonValue::Null,
888    }
889}