robin_sparkless/dataframe/mod.rs

//! DataFrame module: main tabular type and submodules for transformations, aggregations, joins, stats.

mod aggregations;
mod joins;
mod stats;
mod transformations;

pub use aggregations::{CubeRollupData, GroupedData};
pub use joins::{join, JoinType};
pub use stats::DataFrameStat;
pub use transformations::{filter, order_by, order_by_exprs, select, with_column, DataFrameNa};

use crate::column::Column;
use crate::functions::SortOrder;
use crate::schema::StructType;
use polars::prelude::{
    AnyValue, DataFrame as PlDataFrame, Expr, PolarsError, SchemaNamesAndDtypes,
};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::sync::Arc;

/// Default for `spark.sql.caseSensitive` (PySpark default is false = case-insensitive).
const DEFAULT_CASE_SENSITIVE: bool = false;

/// DataFrame - main tabular data structure.
/// Thin wrapper around an eager Polars `DataFrame`.
pub struct DataFrame {
    pub(crate) df: Arc<PlDataFrame>,
    /// When false (default), column names are matched case-insensitively (PySpark behavior).
    pub(crate) case_sensitive: bool,
}

impl DataFrame {
    /// Create a new DataFrame from a Polars DataFrame (case-insensitive column matching by default).
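    ///
    /// A minimal usage sketch (assumed import path; `polars::df!` builds the input):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("name" => &["alice", "bob"], "age" => &[30i64, 40]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// assert_eq!(frame.count().unwrap(), 2);
    /// assert_eq!(frame.columns().unwrap(), vec!["name", "age"]);
    /// ```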
    pub fn from_polars(df: PlDataFrame) -> Self {
        DataFrame {
            df: Arc::new(df),
            case_sensitive: DEFAULT_CASE_SENSITIVE,
        }
    }

    /// Create a new DataFrame from a Polars DataFrame with explicit case sensitivity.
    /// When `case_sensitive` is false, column resolution is case-insensitive (PySpark default).
    pub fn from_polars_with_options(df: PlDataFrame, case_sensitive: bool) -> Self {
        DataFrame {
            df: Arc::new(df),
            case_sensitive,
        }
    }

    /// Create an empty DataFrame
    pub fn empty() -> Self {
        DataFrame {
            df: Arc::new(PlDataFrame::empty()),
            case_sensitive: DEFAULT_CASE_SENSITIVE,
        }
    }

    /// Resolve a logical column name to the actual column name in the schema.
    /// When case_sensitive is false, matches case-insensitively.
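    ///
    /// A hedged sketch of case-insensitive resolution (assumed import path):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("Age" => &[30i64, 40]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// // Case-insensitive by default, so "AGE" resolves to the stored name "Age".
    /// assert_eq!(frame.resolve_column_name("AGE").unwrap(), "Age");
    /// // Unknown names yield a ColumnNotFound error listing the available columns.
    /// assert!(frame.resolve_column_name("missing").is_err());
    /// ```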
    pub fn resolve_column_name(&self, name: &str) -> Result<String, PolarsError> {
        let names = self.df.get_column_names();
        if self.case_sensitive {
            if names.iter().any(|n| *n == name) {
                return Ok(name.to_string());
            }
        } else {
            let name_lower = name.to_lowercase();
            for n in names {
                if n.to_lowercase() == name_lower {
                    return Ok(n.to_string());
                }
            }
        }
        let available: Vec<String> = self
            .df
            .get_column_names()
            .iter()
            .map(|s| s.to_string())
            .collect();
        Err(PolarsError::ColumnNotFound(
            format!(
                "Column '{}' not found. Available columns: [{}]. Check spelling and case sensitivity (spark.sql.caseSensitive).",
                name,
                available.join(", ")
            )
            .into(),
        ))
    }

    /// Get the schema of the DataFrame
    pub fn schema(&self) -> Result<StructType, PolarsError> {
        Ok(StructType::from_polars_schema(&self.df.schema()))
    }

    /// Get column names
    pub fn columns(&self) -> Result<Vec<String>, PolarsError> {
        Ok(self
            .df
            .get_column_names()
            .iter()
            .map(|s| s.to_string())
            .collect())
    }

    /// Count the number of rows (action - triggers execution)
    pub fn count(&self) -> Result<usize, PolarsError> {
        Ok(self.df.height())
    }

    /// Show the first n rows
    pub fn show(&self, n: Option<usize>) -> Result<(), PolarsError> {
        let n = n.unwrap_or(20);
        println!("{}", self.df.head(Some(n)));
        Ok(())
    }

    /// Collect the DataFrame (action - triggers execution)
    pub fn collect(&self) -> Result<Arc<PlDataFrame>, PolarsError> {
        Ok(self.df.clone())
    }

    /// Collect as rows of column-name -> JSON value. For use by language bindings (Node, etc.).
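    ///
    /// A rough sketch of the row shape (assumed import path; exact JSON values depend on the dtypes):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("name" => &["alice"], "age" => &[30i64]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// let rows = frame.collect_as_json_rows().unwrap();
    /// assert_eq!(rows.len(), 1);
    /// assert_eq!(rows[0]["name"], serde_json::json!("alice"));
    /// assert_eq!(rows[0]["age"], serde_json::json!(30));
    /// ```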
    pub fn collect_as_json_rows(&self) -> Result<Vec<HashMap<String, JsonValue>>, PolarsError> {
        let df = self.df.as_ref();
        let names = df.get_column_names();
        let nrows = df.height();
        let mut rows = Vec::with_capacity(nrows);
        for i in 0..nrows {
            let mut row = HashMap::with_capacity(names.len());
            for (col_idx, name) in names.iter().enumerate() {
                let s = df
                    .get_columns()
                    .get(col_idx)
                    .ok_or_else(|| PolarsError::ComputeError("column index out of range".into()))?;
                let av = s.get(i)?;
                let jv = any_value_to_json(av);
                row.insert(name.to_string(), jv);
            }
            rows.push(row);
        }
        Ok(rows)
    }

    /// Select columns (returns a new DataFrame).
    /// Column names are resolved according to case sensitivity.
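    ///
    /// A sketch of case-insensitive selection (assumed import path):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("Name" => &["alice"], "Age" => &[30i64]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// // The result carries the requested spelling, mirroring PySpark's case-insensitive mode.
    /// let out = frame.select(vec!["name", "age"]).unwrap();
    /// assert_eq!(out.columns().unwrap(), vec!["name", "age"]);
    /// ```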
    pub fn select(&self, cols: Vec<&str>) -> Result<DataFrame, PolarsError> {
        let resolved: Vec<String> = cols
            .iter()
            .map(|c| self.resolve_column_name(c))
            .collect::<Result<Vec<_>, _>>()?;
        let refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
        let mut result = transformations::select(self, refs, self.case_sensitive)?;
        // When case-insensitive, PySpark returns column names in the requested (e.g. lowercase) form.
        if !self.case_sensitive {
            for (requested, res) in cols.iter().zip(resolved.iter()) {
                if *requested != res.as_str() {
                    result = result.with_column_renamed(res, requested)?;
                }
            }
        }
        Ok(result)
    }

    /// Filter rows using a Polars expression.
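    ///
    /// A sketch using an expression built with Polars `col`/`lit` (assumed import path):
    ///
    /// ```no_run
    /// use polars::prelude::{col, lit};
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("age" => &[25i64, 35, 45]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// let over_30 = frame.filter(col("age").gt(lit(30))).unwrap();
    /// assert_eq!(over_30.count().unwrap(), 2);
    /// ```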
    pub fn filter(&self, condition: Expr) -> Result<DataFrame, PolarsError> {
        transformations::filter(self, condition, self.case_sensitive)
    }

    /// Get a column reference by name (for building expressions).
    /// Respects case sensitivity: when false, "Age" resolves to column "age" if present.
    pub fn column(&self, name: &str) -> Result<Column, PolarsError> {
        let resolved = self.resolve_column_name(name)?;
        Ok(Column::new(resolved))
    }

    /// Add or replace a column. Use a [`Column`] (e.g. from `col("x")`, `rand(42)`, `randn(42)`).
    /// For `rand`/`randn`, generates one distinct value per row (PySpark-like).
    pub fn with_column(&self, column_name: &str, col: &Column) -> Result<DataFrame, PolarsError> {
        transformations::with_column(self, column_name, col, self.case_sensitive)
    }

    /// Add or replace a column using an expression. Prefer [`with_column`](Self::with_column) with a `Column` for rand/randn (per-row values).
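    ///
    /// A sketch of adding a derived column (assumed import path):
    ///
    /// ```no_run
    /// use polars::prelude::{col, lit};
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("age" => &[30i64, 40]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// let next = frame.with_column_expr("age_next", col("age") + lit(1)).unwrap();
    /// assert!(next.columns().unwrap().contains(&"age_next".to_string()));
    /// ```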
    pub fn with_column_expr(
        &self,
        column_name: &str,
        expr: Expr,
    ) -> Result<DataFrame, PolarsError> {
        let col = Column::from_expr(expr, None);
        self.with_column(column_name, &col)
    }

    /// Group by columns (returns GroupedData for aggregation).
    /// Column names are resolved according to case sensitivity.
    pub fn group_by(&self, column_names: Vec<&str>) -> Result<GroupedData, PolarsError> {
        use polars::prelude::*;
        let resolved: Vec<String> = column_names
            .iter()
            .map(|c| self.resolve_column_name(c))
            .collect::<Result<Vec<_>, _>>()?;
        let exprs: Vec<Expr> = resolved.iter().map(|name| col(name.as_str())).collect();
        let lazy_grouped = self.df.as_ref().clone().lazy().group_by(exprs);
        Ok(GroupedData {
            lazy_grouped,
            grouping_cols: resolved,
            case_sensitive: self.case_sensitive,
        })
    }

    /// Cube: multiple grouping sets (all subsets of columns), then union (PySpark cube).
    pub fn cube(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
        let resolved: Vec<String> = column_names
            .iter()
            .map(|c| self.resolve_column_name(c))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(CubeRollupData {
            df: self.df.as_ref().clone(),
            grouping_cols: resolved,
            case_sensitive: self.case_sensitive,
            is_cube: true,
        })
    }

    /// Rollup: grouping sets (prefixes of columns), then union (PySpark rollup).
    pub fn rollup(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
        let resolved: Vec<String> = column_names
            .iter()
            .map(|c| self.resolve_column_name(c))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(CubeRollupData {
            df: self.df.as_ref().clone(),
            grouping_cols: resolved,
            case_sensitive: self.case_sensitive,
            is_cube: false,
        })
    }

    /// Join with another DataFrame on the given columns.
    /// Join column names are resolved on the left (and right must have matching names).
    pub fn join(
        &self,
        other: &DataFrame,
        on: Vec<&str>,
        how: JoinType,
    ) -> Result<DataFrame, PolarsError> {
        let resolved: Vec<String> = on
            .iter()
            .map(|c| self.resolve_column_name(c))
            .collect::<Result<Vec<_>, _>>()?;
        let on_refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
        join(self, other, on_refs, how, self.case_sensitive)
    }

    /// Order by columns (sort).
    /// Column names are resolved according to case sensitivity.
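    ///
    /// A sketch of a descending sort (assumed import path):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("age" => &[40i64, 30, 50]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// // One ascending flag per column; `false` sorts "age" descending.
    /// let sorted = frame.order_by(vec!["age"], vec![false]).unwrap();
    /// assert_eq!(sorted.count().unwrap(), 3);
    /// ```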
    pub fn order_by(
        &self,
        column_names: Vec<&str>,
        ascending: Vec<bool>,
    ) -> Result<DataFrame, PolarsError> {
        let resolved: Vec<String> = column_names
            .iter()
            .map(|c| self.resolve_column_name(c))
            .collect::<Result<Vec<_>, _>>()?;
        let refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
        transformations::order_by(self, refs, ascending, self.case_sensitive)
    }

    /// Order by sort expressions (asc/desc with nulls_first/last).
    pub fn order_by_exprs(&self, sort_orders: Vec<SortOrder>) -> Result<DataFrame, PolarsError> {
        transformations::order_by_exprs(self, sort_orders, self.case_sensitive)
    }

    /// Union (unionAll): stack another DataFrame vertically. Schemas must match (same columns, same order).
    pub fn union(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        transformations::union(self, other, self.case_sensitive)
    }

    /// Union by name: stack vertically, aligning columns by name.
    pub fn union_by_name(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        transformations::union_by_name(self, other, self.case_sensitive)
    }

    /// Distinct: drop duplicate rows (all columns or optional subset).
    pub fn distinct(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
        transformations::distinct(self, subset, self.case_sensitive)
    }

    /// Drop one or more columns.
    pub fn drop(&self, columns: Vec<&str>) -> Result<DataFrame, PolarsError> {
        transformations::drop(self, columns, self.case_sensitive)
    }

    /// Drop rows with nulls (all columns or optional subset).
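    ///
    /// A sketch of null handling (assumed import path; `Option` values become nulls):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("age" => &[Some(30i64), None, Some(50)]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// let all_cols = frame.dropna(None).unwrap();               // consider every column
    /// let by_subset = frame.dropna(Some(vec!["age"])).unwrap(); // or only a subset
    /// assert_eq!(all_cols.count().unwrap(), 2);
    /// assert_eq!(by_subset.count().unwrap(), 2);
    /// ```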
    pub fn dropna(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
        transformations::dropna(self, subset, self.case_sensitive)
    }

    /// Fill nulls with a literal expression (applied to all columns).
    pub fn fillna(&self, value: Expr) -> Result<DataFrame, PolarsError> {
        transformations::fillna(self, value, self.case_sensitive)
    }

    /// Limit: return first n rows.
    pub fn limit(&self, n: usize) -> Result<DataFrame, PolarsError> {
        transformations::limit(self, n, self.case_sensitive)
    }

    /// Rename a column (old_name -> new_name).
    pub fn with_column_renamed(
        &self,
        old_name: &str,
        new_name: &str,
    ) -> Result<DataFrame, PolarsError> {
        transformations::with_column_renamed(self, old_name, new_name, self.case_sensitive)
    }

    /// Replace values in a column (old_value -> new_value). PySpark replace.
    pub fn replace(
        &self,
        column_name: &str,
        old_value: Expr,
        new_value: Expr,
    ) -> Result<DataFrame, PolarsError> {
        transformations::replace(self, column_name, old_value, new_value, self.case_sensitive)
    }

    /// Cross join with another DataFrame (cartesian product). PySpark crossJoin.
    pub fn cross_join(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        transformations::cross_join(self, other, self.case_sensitive)
    }

    /// Summary statistics. PySpark describe.
    pub fn describe(&self) -> Result<DataFrame, PolarsError> {
        transformations::describe(self, self.case_sensitive)
    }

    /// No-op: execution is eager by default. PySpark cache.
    pub fn cache(&self) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// No-op: execution is eager by default. PySpark persist.
    pub fn persist(&self) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// No-op. PySpark unpersist.
    pub fn unpersist(&self) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// Set difference: rows in self not in other. PySpark subtract / except.
    pub fn subtract(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        transformations::subtract(self, other, self.case_sensitive)
    }

    /// Set intersection: rows in both self and other. PySpark intersect.
    pub fn intersect(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        transformations::intersect(self, other, self.case_sensitive)
    }

    /// Sample a fraction of rows. PySpark sample(withReplacement, fraction, seed).
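    ///
    /// A sketch of seeded sampling (assumed import path; the returned row count is approximate by design):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let ids: Vec<i64> = (0..100).collect();
    /// let pl = polars::df!("id" => ids).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// // Roughly half the rows, without replacement, reproducible via the seed.
    /// let half = frame.sample(false, 0.5, Some(42)).unwrap();
    /// assert!(half.count().unwrap() <= 100);
    /// ```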
    pub fn sample(
        &self,
        with_replacement: bool,
        fraction: f64,
        seed: Option<u64>,
    ) -> Result<DataFrame, PolarsError> {
        transformations::sample(self, with_replacement, fraction, seed, self.case_sensitive)
    }

    /// Split into multiple DataFrames by weights. PySpark randomSplit(weights, seed).
    pub fn random_split(
        &self,
        weights: &[f64],
        seed: Option<u64>,
    ) -> Result<Vec<DataFrame>, PolarsError> {
        transformations::random_split(self, weights, seed, self.case_sensitive)
    }

    /// Stratified sample by column value. PySpark sampleBy(col, fractions, seed).
    /// fractions: list of (value as Expr, fraction) pairs, one per stratum.
    pub fn sample_by(
        &self,
        col_name: &str,
        fractions: &[(Expr, f64)],
        seed: Option<u64>,
    ) -> Result<DataFrame, PolarsError> {
        transformations::sample_by(self, col_name, fractions, seed, self.case_sensitive)
    }

    /// First row as a one-row DataFrame. PySpark first().
    pub fn first(&self) -> Result<DataFrame, PolarsError> {
        transformations::first(self, self.case_sensitive)
    }

    /// First n rows. PySpark head(n).
    pub fn head(&self, n: usize) -> Result<DataFrame, PolarsError> {
        transformations::head(self, n, self.case_sensitive)
    }

    /// Take first n rows. PySpark take(n).
    pub fn take(&self, n: usize) -> Result<DataFrame, PolarsError> {
        transformations::take(self, n, self.case_sensitive)
    }

    /// Last n rows. PySpark tail(n).
    pub fn tail(&self, n: usize) -> Result<DataFrame, PolarsError> {
        transformations::tail(self, n, self.case_sensitive)
    }

    /// True if the DataFrame has zero rows. PySpark isEmpty.
    pub fn is_empty(&self) -> bool {
        transformations::is_empty(self)
    }

    /// Rename columns. PySpark toDF(*colNames).
    pub fn to_df(&self, names: Vec<&str>) -> Result<DataFrame, PolarsError> {
        transformations::to_df(self, &names, self.case_sensitive)
    }

    /// Statistical helper. PySpark df.stat().cov / .corr.
    pub fn stat(&self) -> DataFrameStat<'_> {
        DataFrameStat { df: self }
    }

    /// Correlation matrix of all numeric columns. PySpark df.corr() returns a DataFrame of pairwise correlations.
    pub fn corr(&self) -> Result<DataFrame, PolarsError> {
        self.stat().corr_matrix()
    }

    /// Summary statistics (alias for describe). PySpark summary.
    pub fn summary(&self) -> Result<DataFrame, PolarsError> {
        self.describe()
    }

    /// Collect rows as JSON strings (one per row). PySpark toJSON.
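    ///
    /// A sketch of per-row JSON output (assumed import path; the exact string layout is illustrative):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("name" => &["alice"], "age" => &[30i64]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// let lines = frame.to_json().unwrap();
    /// // One JSON object per row, e.g. {"name":"alice","age":30}.
    /// assert_eq!(lines.len(), 1);
    /// ```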
    pub fn to_json(&self) -> Result<Vec<String>, PolarsError> {
        transformations::to_json(self)
    }

    /// Return execution plan description. PySpark explain.
    pub fn explain(&self) -> String {
        transformations::explain(self)
    }

    /// Return schema as tree string. PySpark printSchema (returns string; print to stdout if needed).
    pub fn print_schema(&self) -> Result<String, PolarsError> {
        transformations::print_schema(self)
    }

    /// No-op: Polars backend is eager. PySpark checkpoint.
    pub fn checkpoint(&self) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// No-op: Polars backend is eager. PySpark localCheckpoint.
    pub fn local_checkpoint(&self) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// No-op: single partition in Polars. PySpark repartition(n).
    pub fn repartition(&self, _num_partitions: usize) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// No-op: Polars has no range partitioning. PySpark repartitionByRange(n, cols).
    pub fn repartition_by_range(
        &self,
        _num_partitions: usize,
        _cols: Vec<&str>,
    ) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// Column names and dtype strings. PySpark dtypes. Returns (name, dtype_string) per column.
    pub fn dtypes(&self) -> Result<Vec<(String, String)>, PolarsError> {
        let schema = self.df.schema();
        Ok(schema
            .iter_names_and_dtypes()
            .map(|(name, dtype)| (name.to_string(), format!("{dtype:?}")))
            .collect())
    }

    /// No-op: partitions are not modeled, so no sorting is performed. PySpark sortWithinPartitions.
    pub fn sort_within_partitions(
        &self,
        _cols: &[crate::functions::SortOrder],
    ) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// No-op: single partition in Polars. PySpark coalesce(n).
    pub fn coalesce(&self, _num_partitions: usize) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// No-op. PySpark hint (query planner hint).
    pub fn hint(&self, _name: &str, _params: &[i32]) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// Returns true (eager single-node). PySpark isLocal.
    pub fn is_local(&self) -> bool {
        true
    }

    /// Returns empty vec (no file sources). PySpark inputFiles.
    pub fn input_files(&self) -> Vec<String> {
        Vec::new()
    }

    /// No-op; returns false. PySpark sameSemantics.
    pub fn same_semantics(&self, _other: &DataFrame) -> bool {
        false
    }

    /// No-op; returns 0. PySpark semanticHash.
    pub fn semantic_hash(&self) -> u64 {
        0
    }

    /// No-op. PySpark observe (metrics).
    pub fn observe(&self, _name: &str, _expr: Expr) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// No-op. PySpark withWatermark (streaming).
    pub fn with_watermark(
        &self,
        _event_time: &str,
        _delay: &str,
    ) -> Result<DataFrame, PolarsError> {
        Ok(self.clone())
    }

    /// Select by expression strings (minimal: column names, optionally "col as alias"). PySpark selectExpr.
    pub fn select_expr(&self, exprs: &[String]) -> Result<DataFrame, PolarsError> {
        transformations::select_expr(self, exprs, self.case_sensitive)
    }

    /// Select columns whose names match the regex. PySpark colRegex.
    pub fn col_regex(&self, pattern: &str) -> Result<DataFrame, PolarsError> {
        transformations::col_regex(self, pattern, self.case_sensitive)
    }

    /// Add or replace multiple columns. PySpark withColumns. Accepts `Column` so rand/randn get per-row values.
    pub fn with_columns(&self, exprs: &[(String, Column)]) -> Result<DataFrame, PolarsError> {
        transformations::with_columns(self, exprs, self.case_sensitive)
    }

    /// Rename multiple columns. PySpark withColumnsRenamed.
    pub fn with_columns_renamed(
        &self,
        renames: &[(String, String)],
    ) -> Result<DataFrame, PolarsError> {
        transformations::with_columns_renamed(self, renames, self.case_sensitive)
    }

    /// NA sub-API. PySpark df.na().
    pub fn na(&self) -> DataFrameNa<'_> {
        DataFrameNa { df: self }
    }

    /// Skip first n rows. PySpark offset(n).
    pub fn offset(&self, n: usize) -> Result<DataFrame, PolarsError> {
        transformations::offset(self, n, self.case_sensitive)
    }

    /// Transform by a function. PySpark transform(func).
    pub fn transform<F>(&self, f: F) -> Result<DataFrame, PolarsError>
    where
        F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
    {
        transformations::transform(self, f)
    }

    /// Frequent items. PySpark freqItems (stub).
    pub fn freq_items(&self, columns: &[&str], support: f64) -> Result<DataFrame, PolarsError> {
        transformations::freq_items(self, columns, support, self.case_sensitive)
    }

    /// Approximate quantiles. PySpark approxQuantile (stub).
    pub fn approx_quantile(
        &self,
        column: &str,
        probabilities: &[f64],
    ) -> Result<DataFrame, PolarsError> {
        transformations::approx_quantile(self, column, probabilities, self.case_sensitive)
    }

    /// Cross-tabulation. PySpark crosstab (stub).
    pub fn crosstab(&self, col1: &str, col2: &str) -> Result<DataFrame, PolarsError> {
        transformations::crosstab(self, col1, col2, self.case_sensitive)
    }

    /// Unpivot (melt). PySpark melt (stub).
    pub fn melt(&self, id_vars: &[&str], value_vars: &[&str]) -> Result<DataFrame, PolarsError> {
        transformations::melt(self, id_vars, value_vars, self.case_sensitive)
    }

    /// Pivot (wide format). PySpark pivot. Stub: not yet implemented; use crosstab for two-column count.
    pub fn pivot(
        &self,
        _pivot_col: &str,
        _values: Option<Vec<&str>>,
    ) -> Result<DataFrame, PolarsError> {
        Err(PolarsError::InvalidOperation(
            "pivot is not yet implemented; use crosstab(col1, col2) for two-column cross-tabulation."
                .into(),
        ))
    }

    /// Set difference keeping duplicates. PySpark exceptAll.
    pub fn except_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        transformations::except_all(self, other, self.case_sensitive)
    }

    /// Set intersection keeping duplicates. PySpark intersectAll.
    pub fn intersect_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
        transformations::intersect_all(self, other, self.case_sensitive)
    }

    /// Write this DataFrame to a Delta table at the given path.
    /// Requires the `delta` feature. If `overwrite` is true, replaces the table; otherwise appends.
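    ///
    /// A sketch (only meaningful when built with `--features delta`; path is illustrative):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::DataFrame;
    ///
    /// let pl = polars::df!("event" => &["a", "b"]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// // `true` overwrites any existing table; `false` appends to it.
    /// frame.write_delta("/tmp/events_delta", true).unwrap();
    /// ```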
    #[cfg(feature = "delta")]
    pub fn write_delta(
        &self,
        path: impl AsRef<std::path::Path>,
        overwrite: bool,
    ) -> Result<(), PolarsError> {
        crate::delta::write_delta(self.df.as_ref(), path, overwrite)
    }

    /// Stub when `delta` feature is disabled.
    #[cfg(not(feature = "delta"))]
    pub fn write_delta(
        &self,
        _path: impl AsRef<std::path::Path>,
        _overwrite: bool,
    ) -> Result<(), PolarsError> {
        Err(PolarsError::InvalidOperation(
            "Delta Lake requires the 'delta' feature. Build with --features delta.".into(),
        ))
    }

    /// Return a writer for generic format (parquet, csv, json). PySpark-style write API.
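    ///
    /// A sketch of the builder chain (assumed import paths; writes a single file at the given path):
    ///
    /// ```no_run
    /// use robin_sparkless::dataframe::{DataFrame, WriteFormat, WriteMode};
    ///
    /// let pl = polars::df!("name" => &["alice"], "age" => &[30i64]).unwrap();
    /// let frame = DataFrame::from_polars(pl);
    /// frame.write()
    ///     .format(WriteFormat::Csv)
    ///     .mode(WriteMode::Append) // default is Overwrite
    ///     .save("/tmp/people.csv")
    ///     .unwrap();
    /// ```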
    pub fn write(&self) -> DataFrameWriter<'_> {
        DataFrameWriter {
            df: self,
            mode: WriteMode::Overwrite,
            format: WriteFormat::Parquet,
        }
    }
}

/// Write mode: overwrite or append (PySpark DataFrameWriter.mode).
#[derive(Clone, Copy)]
pub enum WriteMode {
    Overwrite,
    Append,
}

/// Output format for generic write (PySpark DataFrameWriter.format).
#[derive(Clone, Copy)]
pub enum WriteFormat {
    Parquet,
    Csv,
    Json,
}

/// Builder for writing DataFrame to path (PySpark DataFrameWriter).
pub struct DataFrameWriter<'a> {
    df: &'a DataFrame,
    mode: WriteMode,
    format: WriteFormat,
}

impl<'a> DataFrameWriter<'a> {
    pub fn mode(mut self, mode: WriteMode) -> Self {
        self.mode = mode;
        self
    }

    pub fn format(mut self, format: WriteFormat) -> Self {
        self.format = format;
        self
    }

    /// Write to path. Overwrite replaces the file; append reads the existing file (if any), concatenates, then writes.
    pub fn save(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
        use polars::prelude::*;
        let path = path.as_ref();
        let to_write: PlDataFrame = match self.mode {
            WriteMode::Overwrite => self.df.df.as_ref().clone(),
            WriteMode::Append => {
                let existing: Option<PlDataFrame> = if path.exists() {
                    match self.format {
                        WriteFormat::Parquet => {
                            LazyFrame::scan_parquet(path, ScanArgsParquet::default())
                                .and_then(|lf| lf.collect())
                                .ok()
                        }
                        WriteFormat::Csv => LazyCsvReader::new(path)
                            .with_has_header(true)
                            .finish()
                            .and_then(|lf| lf.collect())
                            .ok(),
                        WriteFormat::Json => LazyJsonLineReader::new(path)
                            .finish()
                            .and_then(|lf| lf.collect())
                            .ok(),
                    }
                } else {
                    None
                };
                match existing {
                    Some(existing) => {
                        let lfs: [polars::prelude::LazyFrame; 2] =
                            [existing.lazy(), self.df.df.as_ref().clone().lazy()];
                        polars::prelude::concat(lfs, UnionArgs::default())?.collect()?
                    }
                    None => self.df.df.as_ref().clone(),
                }
            }
        };
        match self.format {
            WriteFormat::Parquet => {
                let mut file = std::fs::File::create(path).map_err(|e| {
                    PolarsError::ComputeError(format!("write parquet create: {e}").into())
                })?;
                let mut df_mut = to_write;
                ParquetWriter::new(&mut file)
                    .finish(&mut df_mut)
                    .map_err(|e| PolarsError::ComputeError(format!("write parquet: {e}").into()))?;
            }
            WriteFormat::Csv => {
                let mut file = std::fs::File::create(path).map_err(|e| {
                    PolarsError::ComputeError(format!("write csv create: {e}").into())
                })?;
                let mut df_mut = to_write;
                CsvWriter::new(&mut file)
                    .finish(&mut df_mut)
                    .map_err(|e| PolarsError::ComputeError(format!("write csv: {e}").into()))?;
            }
            WriteFormat::Json => {
                let mut file = std::fs::File::create(path).map_err(|e| {
                    PolarsError::ComputeError(format!("write json create: {e}").into())
                })?;
                let mut df_mut = to_write;
                JsonWriter::new(&mut file)
                    .finish(&mut df_mut)
                    .map_err(|e| PolarsError::ComputeError(format!("write json: {e}").into()))?;
            }
        }
        Ok(())
    }
}

impl Clone for DataFrame {
    fn clone(&self) -> Self {
        DataFrame {
            df: self.df.clone(),
            case_sensitive: self.case_sensitive,
        }
    }
}

/// Convert Polars AnyValue to serde_json::Value for language bindings (Node, etc.).
fn any_value_to_json(av: AnyValue<'_>) -> JsonValue {
    match av {
        AnyValue::Null => JsonValue::Null,
        AnyValue::Boolean(b) => JsonValue::Bool(b),
        AnyValue::Int32(i) => JsonValue::Number(serde_json::Number::from(i)),
        AnyValue::Int64(i) => JsonValue::Number(serde_json::Number::from(i)),
        AnyValue::UInt32(u) => JsonValue::Number(serde_json::Number::from(u)),
        AnyValue::UInt64(u) => JsonValue::Number(serde_json::Number::from(u)),
        AnyValue::Float32(f) => serde_json::Number::from_f64(f64::from(f))
            .map(JsonValue::Number)
            .unwrap_or(JsonValue::Null),
        AnyValue::Float64(f) => serde_json::Number::from_f64(f)
            .map(JsonValue::Number)
            .unwrap_or(JsonValue::Null),
        AnyValue::String(s) => JsonValue::String(s.to_string()),
        AnyValue::StringOwned(s) => JsonValue::String(s.to_string()),
        _ => JsonValue::Null,
    }
}