Skip to main content

robin_sparkless_polars/dataframe/
transformations.rs

1//! DataFrame transformation operations: filter, select, with_column, order_by,
2//! union, distinct, drop, dropna, fillna, limit, with_column_renamed,
3//! replace, cross_join, describe, subtract, intersect,
4//! sample, random_split, first, head, take, tail, is_empty, to_df.
5
6use super::DataFrame;
7use crate::column::expect_col;
8use crate::functions::SortOrder;
9use crate::type_coercion::{coerce_expr_pair, find_common_type, is_numeric_public};
10use crate::udfs;
11use polars::prelude::{
12    DataType, Expr, Float64Chunked, IntoLazy, IntoSeries, NamedFrom, PlSmallStr, PolarsError,
13    SchemaNamesAndDtypes, Selector, Series, UniqueKeepStrategy, col, len, lit, repeat,
14};
15use std::cell::RefCell;
16use std::collections::{HashMap, HashSet};
17
18/// Returns true if the expression tree contains a window expression (Expr::Over).
19/// Used to reject window functions in filter/WHERE (PySpark parity: AnalysisException).
20fn expr_contains_over(expr: &Expr) -> bool {
21    let found = RefCell::new(false);
22    let _ = expr.clone().try_map_expr(|e| {
23        if matches!(&e, Expr::Over { .. }) {
24            *found.borrow_mut() = true;
25        }
26        Ok(e)
27    });
28    found.into_inner()
29}
30
31/// Returns the set of column names referenced in the expression tree.
32fn expr_referenced_columns(expr: &Expr) -> HashSet<String> {
33    let refs = RefCell::new(HashSet::<String>::new());
34    let _ = expr.clone().try_map_expr(|e| {
35        if let Expr::Column(n) = &e {
36            refs.borrow_mut().insert(n.as_str().to_string());
37        }
38        Ok(e)
39    });
40    refs.into_inner()
41}
42
43/// Returns true if the expression tree contains a reference to the given column name (so we must not drop it before adding).
44fn expr_refs_column(expr: &Expr, column_name: &str) -> bool {
45    expr_referenced_columns(expr).contains(column_name)
46}
47
48/// If the expression contains an Explode node, return its input and options (for posexplode detection).
49fn find_explode_in_expr(expr: &Expr) -> Option<(Arc<Expr>, polars::prelude::ExplodeOptions)> {
50    if let Expr::Explode { input, options } = expr {
51        return Some((input.clone(), *options));
52    }
53    if let Expr::Alias(inner, _) = expr {
54        return find_explode_in_expr(inner.as_ref());
55    }
56    // Recurse into common wrappers so we find Explode inside StructField etc.
57    let out = RefCell::new(None);
58    let _ = expr.clone().try_map_expr(|e| {
59        if out.borrow().is_none() {
60            if let Expr::Explode { input, options } = &e {
61                out.borrow_mut().replace((input.clone(), *options));
62            }
63        }
64        Ok(e)
65    });
66    out.into_inner()
67}
68
69/// Replace a pure literal expr with a column-referencing expr so Polars produces correct row count.
70/// Polars lit() in select-only or empty-df contexts yields 1 row; we need N rows (or 0 for empty).
71fn expand_pure_literal_to_rows(expr: Expr, first_col: Option<&str>) -> Result<Expr, PolarsError> {
72    let (inner, alias): (Expr, Option<PlSmallStr>) = match &expr {
73        Expr::Alias(e, name) => (e.as_ref().clone(), Some(name.clone())),
74        _ => (expr.clone(), None),
75    };
76    // Expand string literals and typed null list literals so Polars produces correct row count.
77    let expanded: Option<Expr> = match &inner {
78        Expr::Literal(lv) if lv.get_datatype() == DataType::String => {
79            let lit_val = lv.extract_str().unwrap_or("").to_string();
80            let out_dtype = DataType::String;
81            Some(if let Some(fc) = first_col {
82                let fc = fc.to_string();
83                use polars::datatypes::Field;
84                col(PlSmallStr::from(fc.as_str())).map(
85                    move |c| expect_col(udfs::apply_literal_string_repeat(c, &lit_val)),
86                    move |_schema, _field| Ok(Field::new("literal".into(), out_dtype.clone())),
87                )
88            } else {
89                // No columns (e.g. empty schema): use repeat(lit(""), len()) so empty df yields 0 rows
90                repeat(lit(lit_val), len().cast(DataType::UInt32))
91            })
92        }
93        Expr::Cast { expr: e, dtype, .. } => {
94            // Special-case: cast(NULL as array<...>) should produce N null lists, not a single empty list.
95            if matches!(e.as_ref(), Expr::Literal(lv) if lv.is_null()) {
96                let out_dtype = match dtype {
97                    polars::prelude::DataTypeExpr::Literal(dt)
98                        if matches!(dt, DataType::List(_)) =>
99                    {
100                        dt.clone()
101                    }
102                    _ => return Ok(expr),
103                };
104                Some(if let Some(fc) = first_col {
105                    let fc = fc.to_string();
106                    let out_dtype_for_apply = out_dtype.clone();
107                    let out_dtype_for_field = out_dtype.clone();
108                    use polars::datatypes::Field;
109                    col(PlSmallStr::from(fc.as_str())).map(
110                        move |c| {
111                            expect_col(udfs::apply_literal_null_list_repeat(
112                                c,
113                                &out_dtype_for_apply,
114                            ))
115                        },
116                        move |_schema, _field| {
117                            Ok(Field::new("literal".into(), out_dtype_for_field.clone()))
118                        },
119                    )
120                } else {
121                    // No columns: best-effort; scalar NULL list is fine here.
122                    inner.clone()
123                })
124            } else {
125                None
126            }
127        }
128        _ => None,
129    };
130
131    let expanded = match expanded {
132        Some(e) => e,
133        None => return Ok(expr),
134    };
135    Ok(if let Some(name) = alias {
136        expanded.alias(name.as_str())
137    } else {
138        expanded
139    })
140}
141
142fn series_as_f64_ca(s: &Series, context: &str) -> Result<Float64Chunked, PolarsError> {
143    let s_f64 = s.cast(&DataType::Float64)?;
144    let ca = s_f64.f64().map_err(|_| {
145        PolarsError::ComputeError(format!("{}: need numeric/f64 column", context).into())
146    })?;
147    Ok(ca.clone())
148}
149use std::sync::Arc;
150
151/// Select columns (returns a new DataFrame). Preserves case_sensitive on result.
152pub fn select(
153    df: &DataFrame,
154    cols: Vec<&str>,
155    case_sensitive: bool,
156) -> Result<DataFrame, PolarsError> {
157    let resolved: Vec<String> = cols
158        .iter()
159        .map(|c| df.resolve_column_name(c))
160        .collect::<Result<Vec<_>, _>>()?;
161    let exprs: Vec<Expr> = resolved.iter().map(|s| col(s.as_str())).collect();
162    let lf = df.lazy_frame().select(&exprs);
163    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
164}
165
/// Select using column expressions (e.g. F.regexp_extract_all(...).alias("m")). Preserves case_sensitive.
///
/// Processing order:
/// 1. Unless `exprs_already_resolved` is true, column names inside each expression are resolved
///    via `df.resolve_expr_column_names` (PySpark parity). Pre-resolved expressions, e.g. from a
///    dotted select like "t1.id" -> col("id"), skip this step (#1230).
/// 2. Each expression goes through `df.coerce_string_numeric_comparisons`.
/// 3. Pure literals are expanded to one value per row (`expand_pure_literal_to_rows`); if that
///    fails, the expression is kept as is.
/// 4. Posexplode pairs and simple `explode(col)` expressions are rewritten into column references,
///    and the explode itself is applied on the frame so the other columns are replicated.
/// 5. Duplicate output names are disambiguated with _1, _2, ... so
///    select(col("num").cast("string"), col("num").cast("int")) works (issue #213).
/// 6. If no expression references a frame column (and no explode was applied), the projection is
///    cross-joined against one frame column to obtain N rows.
///
/// Returns an eager DataFrame when `df` is eager (the plan is collected here), otherwise a lazy one.
///
/// # Errors
/// Propagates errors from name resolution, comparison coercion, and (for eager input) `collect`.
pub fn select_with_exprs(
    df: &DataFrame,
    exprs: Vec<Expr>,
    case_sensitive: bool,
    exprs_already_resolved: bool,
) -> Result<DataFrame, PolarsError> {
    let exprs: Vec<Expr> = if exprs_already_resolved {
        exprs
    } else {
        exprs
            .into_iter()
            .map(|e| df.resolve_expr_column_names(e))
            .collect::<Result<Vec<_>, _>>()?
    };
    let exprs: Vec<Expr> = exprs
        .into_iter()
        .map(|e| df.coerce_string_numeric_comparisons(e))
        .collect::<Result<Vec<_>, _>>()?;
    // Frame column names; an error from df.columns() is treated as "no columns".
    let df_columns: HashSet<String> = df
        .columns()
        .ok()
        .map(|c| c.into_iter().map(|s| s.as_str().to_string()).collect())
        .unwrap_or_default();
    // Row-count anchor column. Taken from a HashSet, so which column is picked is arbitrary.
    let first_col = df_columns.iter().next().map(String::as_str);
    // Literal expansion is best-effort: on error the original expression is kept.
    let exprs: Vec<Expr> = exprs
        .into_iter()
        .map(|e| expand_pure_literal_to_rows(e.clone(), first_col).unwrap_or(e))
        .collect();

    // Detect posexplode: two exprs that each contain Explode(list.eval(...)) with same single list column (#1243).
    type PosexplodeTarget = (
        PlSmallStr,
        polars::prelude::ExplodeOptions,
        [(PlSmallStr, usize); 2], // [(pos alias, pos expr index), (col alias, col expr index)]
    );
    let posexplode_target: Option<PosexplodeTarget> = {
        // list column name -> (explode options, output alias, position in `exprs`)
        let mut by_col: HashMap<String, Vec<(polars::prelude::ExplodeOptions, String, usize)>> =
            HashMap::new();
        for (i, e) in exprs.iter().enumerate() {
            // Only aliased expressions are considered as posexplode halves.
            let (inner, alias_name) = match e {
                Expr::Alias(inner, name) => (inner.as_ref(), name.as_str().to_string()),
                _ => continue,
            };
            if let Some((input, options)) = find_explode_in_expr(inner) {
                let refs = expr_referenced_columns(input.as_ref());
                // List.eval uses col("") for element context; only consider columns that exist in the frame.
                let frame_refs: Vec<String> = refs.intersection(&df_columns).cloned().collect();
                if frame_refs.len() == 1 {
                    let list_col = frame_refs.into_iter().next().unwrap();
                    by_col
                        .entry(list_col.clone())
                        .or_default()
                        .push((options, alias_name, i));
                }
            }
        }
        // A list column with exactly two explode expressions is treated as a posexplode pair.
        // If several columns qualify, the one picked follows HashMap iteration order.
        by_col
            .into_iter()
            .find(|(_, v)| v.len() == 2)
            .map(|(list_col, mut v)| {
                // Order by position in the select list: the earlier expr becomes "pos", the later "col".
                v.sort_by_key(|(_, _, i)| *i);
                let list_col = PlSmallStr::from(list_col.as_str());
                let options = v[0].0;
                (
                    list_col,
                    options,
                    [
                        (PlSmallStr::from(v[0].1.as_str()), v[0].2),
                        (PlSmallStr::from(v[1].1.as_str()), v[1].2),
                    ],
                )
            })
    };

    // Detect simple explode expressions (e.g. F.explode(col("scores")).alias("score"))
    // and rewrite to use LazyFrame.explode. When alias != source column, add a copy then explode
    // so the original list column is preserved (PySpark select(Name, Value, explode(Value).alias("ExplodedValue"))).
    // Python Column.into_expr() does expr.alias(name), so explode(col).alias("x") becomes Alias(Alias(Explode(...), "x"), "x"); peel one more Alias.
    // Name of the temporary list-of-structs column that carries (pos, col) for posexplode.
    let posexplode_pe_col = posexplode_target
        .as_ref()
        .map(|(lc, _, _)| format!("__pe_{}", lc.as_str()));

    // (source list column, output alias when it differs from the source, explode options)
    type ExplodeTarget = (
        PlSmallStr,
        Option<PlSmallStr>,
        polars::prelude::ExplodeOptions,
    );
    let mut explode_target: Option<ExplodeTarget> = None;
    let exprs: Vec<Expr> = exprs
        .into_iter()
        .enumerate()
        .map(|(i, e)| {
            // Posexplode halves are replaced by struct-field reads from the temporary column.
            if let (Some(pt), Some(pe_col)) = (&posexplode_target, &posexplode_pe_col) {
                if i == pt.2[0].1 {
                    return col(pe_col.as_str())
                        .struct_()
                        .field_by_name("pos")
                        .alias(pt.2[0].0.as_str());
                }
                if i == pt.2[1].1 {
                    return col(pe_col.as_str())
                        .struct_()
                        .field_by_name("col")
                        .alias(pt.2[1].0.as_str());
                }
            }
            match e {
                Expr::Alias(inner, name) => {
                    let inner_ref = inner.as_ref();
                    // Look for Explode directly under the alias, or under one extra alias layer.
                    // The options in the `None` cases are placeholders and are never used.
                    let (explode_input, options): (
                        Option<Arc<Expr>>,
                        polars::prelude::ExplodeOptions,
                    ) = if let Expr::Explode { input, options } = inner_ref {
                        (Some(input.clone()), *options)
                    } else if let Expr::Alias(inner2, _) = inner_ref {
                        if let Expr::Explode { input, options } = inner2.as_ref() {
                            (Some(input.clone()), *options)
                        } else {
                            (
                                None,
                                polars::prelude::ExplodeOptions {
                                    empty_as_null: false,
                                    keep_nulls: false,
                                },
                            )
                        }
                    } else {
                        (
                            None,
                            polars::prelude::ExplodeOptions {
                                empty_as_null: false,
                                keep_nulls: false,
                            },
                        )
                    };
                    if let (Some(input), options) = (explode_input, options) {
                        // Only explode over a plain column is rewritten; other inputs stay as they are.
                        if let Expr::Column(col_name) = input.as_ref() {
                            // The first simple explode encountered defines the frame-level explode.
                            if explode_target.is_none() {
                                let alias_name = if col_name.as_str() != name.as_str() {
                                    Some(name.clone())
                                } else {
                                    None
                                };
                                explode_target = Some((col_name.clone(), alias_name, options));
                            }
                            // NOTE(review): a later explode over a different column also reads the
                            // first target's alias here — confirm only one explode per select is expected.
                            let out_col = explode_target
                                .as_ref()
                                .and_then(|(_, a, _)| a.clone())
                                .unwrap_or_else(|| col_name.clone());
                            Expr::Alias(Arc::new(Expr::Column(out_col)), name)
                        } else {
                            Expr::Alias(inner, name)
                        }
                    } else {
                        Expr::Alias(inner, name)
                    }
                }
                // Un-aliased explode(col): becomes a plain reference to the exploded column.
                Expr::Explode { input, options } => {
                    if let Expr::Column(col_name) = input.as_ref() {
                        if explode_target.is_none() {
                            explode_target = Some((col_name.clone(), None, options));
                        }
                        let (_, alias_name, _) = explode_target.as_ref().unwrap();
                        let out_col = alias_name.clone().unwrap_or_else(|| col_name.clone());
                        Expr::Column(out_col)
                    } else {
                        Expr::Explode { input, options }
                    }
                }
                other => other,
            }
        })
        .collect();
    // Disambiguate duplicate output names: the first occurrence keeps its name, later ones get
    // a "_<n>" suffix (n = 1, 2, ...). `output_names` records the final name of every expression.
    let mut name_count: HashMap<String, u32> = HashMap::new();
    let mut output_names: Vec<String> = Vec::new();
    let exprs: Vec<Expr> = exprs
        .into_iter()
        .map(|e| {
            // Expressions whose output name cannot be determined fall back to "_".
            let base_name = polars_plan::utils::expr_output_name(&e)
                .map(|s| s.to_string())
                .unwrap_or_else(|_| "_".to_string());
            let count = name_count.entry(base_name.clone()).or_insert(0);
            *count += 1;
            let final_name = if *count == 1 {
                base_name.clone()
            } else {
                format!("{}_{}", base_name, *count - 1)
            };
            output_names.push(final_name.clone());
            if *count == 1 {
                e
            } else {
                e.alias(final_name.as_str())
            }
        })
        .collect();

    let mut lf = df.lazy_frame();
    let had_explode = explode_target.is_some() || posexplode_target.is_some();

    // If we saw an explode expression on a single column, apply frame-level explode first so
    // non-exploded columns are replicated correctly (PySpark explode parity).
    // When alias != source (e.g. select(Name, Value, explode(Value).alias("ExplodedValue"))),
    // add a copy column then explode it so the original list column is preserved.
    if let Some((explode_col, alias_name, options)) = explode_target {
        if let Some(alias) = &alias_name {
            lf = lf.with_column(col(explode_col.as_str()).alias(alias.as_str()));
            let selector = Selector::ByName {
                names: Arc::from([alias.clone()]),
                strict: true,
            };
            lf = lf.explode(selector, options);
        } else {
            let selector = Selector::ByName {
                names: Arc::from([explode_col]),
                strict: true,
            };
            lf = lf.explode(selector, options);
        }
    }
    // Posexplode: add list-of-structs column and explode so non-exploded columns replicate (#1243).
    if let Some((list_col, options, _)) = &posexplode_target {
        let pe_col = format!("__pe_{}", list_col.as_str());
        use polars::prelude::as_struct;
        // Inside list.eval, col("") is the list element; cum_count - 1 gives a zero-based position.
        let pos_inner = (col("").cum_count(false) - lit(1i64)).alias("pos");
        let val_inner = col("").alias("col");
        let list_struct = col(list_col.as_str())
            .list()
            .eval(as_struct(vec![pos_inner, val_inner]));
        lf = lf.with_column(list_struct.alias(pe_col.as_str()));
        let selector = Selector::ByName {
            names: Arc::from([PlSmallStr::from(pe_col.as_str())]),
            strict: true,
        };
        lf = lf.explode(selector, *options);
    }
    // When every expression references no column from the frame, Polars select yields 1 row.
    // Cross-join with a single key column to get N rows (PySpark parity).
    // When we applied explode, the frame was already expanded; rewritten exprs may reference only
    // the new column (e.g. "x"), so we must not use the no-col-refs cross_join path (it would
    // wrongly cross-join and multiply rows).
    let no_col_refs = !had_explode
        && first_col.is_some()
        && exprs.iter().all(|e| {
            expr_referenced_columns(e)
                .intersection(&df_columns)
                .next()
                .is_none()
        });
    let lf = if no_col_refs {
        let first = first_col.unwrap();
        let lf_key = lf.clone().select([col(first)]);
        let lf_vals = lf.select(&exprs);
        let joined = lf_key.cross_join(lf_vals, None);
        // Keep only the projected values; the key column was needed for the row count only.
        let right_exprs: Vec<Expr> = output_names.iter().map(|n| col(n.as_str())).collect();
        joined.select(right_exprs)
    } else {
        lf.select(&exprs)
    };
    // When input is Eager (e.g. createDataFrame), return Eager result so collect() uses
    // schema order that matches columns (#1267). List-of-dicts createDataFrame first column
    // null is fixed in Python binding (python_row_to_json).
    if df.is_eager() {
        let pl_df = lf.collect()?;
        Ok(super::DataFrame::from_eager_with_options(
            pl_df,
            case_sensitive,
        ))
    } else {
        Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
    }
}
442
/// One entry of a select list: either a column name (str) or an expression
/// (PySpark parity: select("a", col("b").alias("x"))).
///
/// Fixes #645: select expects Column or str.
#[derive(Clone)]
pub enum SelectItem<'a> {
    /// Column name; resolved per DataFrame case sensitivity.
    /// A dotted name (e.g. "StructValue.e1") is treated as struct field access by `select_items`.
    ColumnName(&'a str),
    /// Expression (e.g. from col("x").cast(...).alias("y")).
    Expr(Expr),
}
452
/// Builds a dot-free placeholder alias for a struct-field select.
///
/// Polars can resolve dotted aliases as column lookups, so the select runs
/// under this placeholder ("__sf_" followed by the name with every '.' turned
/// into '_') and the column is renamed to the desired output name afterwards.
fn struct_field_safe_alias(dotted_name: &str) -> String {
    const PREFIX: &str = "__sf_";
    let mut placeholder = String::with_capacity(PREFIX.len() + dotted_name.len());
    placeholder.push_str(PREFIX);
    placeholder.extend(
        dotted_name
            .chars()
            .map(|ch| if ch == '.' { '_' } else { ch }),
    );
    placeholder
}
457
458/// Select using a mix of column names and expressions. Preserves case_sensitive on result.
459pub fn select_items(
460    df: &DataFrame,
461    items: Vec<SelectItem<'_>>,
462    case_sensitive: bool,
463) -> Result<DataFrame, PolarsError> {
464    let mut exprs = Vec::with_capacity(items.len());
465    let mut rename_after: Vec<(String, String)> = Vec::new();
466    for item in items {
467        match item {
468            SelectItem::ColumnName(name) => {
469                // When multiple physical columns match by lowercase name (e.g. "name" and "NAME"
470                // after a join), we need PySpark-like behavior for ambiguous selects such as
471                // select("NaMe") (#297). We coalesce all matching physical columns and expose
472                // the result under the requested name so that:
473                // - rows that exist only on one side (e.g. right-join-only rows) still have a value, and
474                // - original physical columns remain accessible by their own names.
475                if let Ok(cols) = df.columns() {
476                    let name_lower = name.to_lowercase();
477                    let matches: Vec<String> = cols
478                        .iter()
479                        .filter(|c| c.to_lowercase() == name_lower)
480                        .cloned()
481                        .collect();
482                    if matches.len() > 1 {
483                        use polars::prelude::coalesce as pl_coalesce;
484                        let coalesce_exprs: Vec<Expr> =
485                            matches.iter().map(|m| col(m.as_str())).collect();
486                        let coalesced = pl_coalesce(&coalesce_exprs);
487                        exprs.push(coalesced.alias(name));
488                        continue;
489                    }
490                }
491                // #1055, #1076: Dot notation (e.g. "StructValue.e1") is struct field access.
492                // PySpark: select("StructValue.e1") yields output column "e1" (last segment), not full dotted name.
493                if name.contains('.') {
494                    let e = col(name);
495                    let resolved = df.resolve_expr_column_names(e)?;
496                    let coerced = df.coerce_string_numeric_comparisons(resolved)?;
497                    let safe = struct_field_safe_alias(name);
498                    let last_segment = name.split('.').next_back().unwrap_or(name);
499                    rename_after.push((safe.clone(), last_segment.to_string()));
500                    exprs.push(coerced.alias(safe));
501                } else {
502                    df.check_ambiguous_unqualified(name)?;
503                    let resolved = df.resolve_column_name(name)?;
504                    // Explicit alias so output name is stable when mixed with window exprs (#1267).
505                    exprs.push(col(resolved).alias(name));
506                }
507            }
508            SelectItem::Expr(e) => {
509                let name_for_alias = if let polars::prelude::Expr::Column(n) = &e {
510                    let s = n.as_str();
511                    if s.contains('.') {
512                        Some(s.to_string())
513                    } else {
514                        None
515                    }
516                } else if let polars::prelude::Expr::Alias(_, n) = &e {
517                    let s = n.as_str();
518                    if s.contains('.') {
519                        Some(s.to_string())
520                    } else {
521                        None
522                    }
523                } else {
524                    None
525                };
526                let resolved = df.resolve_expr_column_names(e)?;
527                let coerced = df.coerce_string_numeric_comparisons(resolved)?;
528                if let Some(name) = name_for_alias {
529                    let safe = struct_field_safe_alias(&name);
530                    let last_segment = name.split('.').next_back().unwrap_or(&name).to_string();
531                    rename_after.push((safe.clone(), last_segment));
532                    exprs.push(coerced.alias(safe));
533                } else {
534                    exprs.push(coerced);
535                }
536            }
537        }
538    }
539    let mut result = select_with_exprs(df, exprs, case_sensitive, false)?;
540    for (from, to) in rename_after {
541        result = result.with_column_renamed(&from, &to)?;
542    }
543    Ok(result)
544}
545
546/// Filter rows using a Polars expression. Preserves case_sensitive on result.
547/// Column names in the condition are resolved per df's case sensitivity (PySpark parity).
548/// #646: Coerce predicate to Boolean so Polars never receives a non-Boolean filter (e.g. string-involving predicates).
549/// Uses expr_coerce_to_boolean so string columns cast via "true"/"false"/"1"/"0" (PySpark parity).
550/// Filter rows using a Polars expression. Preserves case_sensitive on result.
551/// Column names in the condition are resolved per df's case sensitivity (PySpark parity).
552/// #646: Coerce predicate to Boolean so Polars never receives a non-Boolean filter (e.g. string-involving predicates).
553/// Uses expr_coerce_to_boolean so string columns cast via "true"/"false"/"1"/"0" (PySpark parity).
554/// Rejects window expressions in the condition (PySpark parity: "window functions inside WHERE clause").
555pub fn filter(
556    df: &DataFrame,
557    condition: Expr,
558    case_sensitive: bool,
559) -> Result<DataFrame, PolarsError> {
560    // Case-insensitive mode (default): when a predicate only references columns that are no
561    // longer present on this DataFrame (e.g. df.select("col1").filter(col("col2").isNotNull())),
562    // treat the filter as a no-op on the projected schema. This matches PySpark behavior where
563    // the predicate is effectively pushed below the projection while the visible columns stay
564    // unchanged (issue #1135 / #158). In case-sensitive mode we keep raising unresolved_column
565    // so wrong-case filters continue to fail as expected.
566    //
567    // Do not treat alias-qualified or dotted names (e.g. "o.amount", "t1.id", "struct.field")
568    // as missing here: those may still resolve via suffix/struct-field logic in
569    // resolve_expr_column_names and are required for join alias and case-insensitive parity
570    // (issue #1325).
571    if !case_sensitive {
572        if let Ok(cols) = df.columns() {
573            let df_cols: HashSet<String> = cols.into_iter().map(|c| c.to_lowercase()).collect();
574            let referenced = expr_referenced_columns(&condition);
575            if !referenced.is_empty() {
576                let all_missing = referenced.iter().all(|name| {
577                    if name.contains('.') {
578                        // Alias-qualified or dotted reference: let resolve_expr_column_names
579                        // attempt to resolve it (join aliases, struct fields, etc.).
580                        false
581                    } else {
582                        !df_cols.contains(&name.to_lowercase())
583                    }
584                });
585                if all_missing {
586                    return Ok(df.clone());
587                }
588            }
589        }
590    }
591    if expr_contains_over(&condition) {
592        return Err(PolarsError::InvalidOperation(
593            "it is not allowed to use window functions inside WHERE clause".into(),
594        ));
595    }
596    let condition = df.resolve_expr_column_names(condition)?;
597    let condition = df.coerce_string_numeric_comparisons(condition)?;
598    // #972: expr_coerce_to_boolean already yields Boolean (and handles string via apply_string_to_boolean).
599    // Do not call .cast(DataType::Boolean) here — Polars does not support casting Utf8View to Boolean.
600    let condition = crate::functions::expr_coerce_to_boolean(condition);
601    let lf = df.lazy_frame().filter(condition);
602    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
603}
604
605/// Add or replace a column. Handles deferred rand/randn and Python UDF (UdfCall).
606pub fn with_column(
607    df: &DataFrame,
608    column_name: &str,
609    column: &crate::column::Column,
610    case_sensitive: bool,
611) -> Result<DataFrame, PolarsError> {
612    // Python UDF: eager execution at UDF boundary
613    if let Some(deferred) = column.deferred {
614        match deferred {
615            crate::column::DeferredRandom::Rand(seed) => {
616                let pl_df = df.collect_inner()?;
617                let mut pl_df = pl_df.as_ref().clone();
618                let n = pl_df.height();
619                let series = crate::udfs::series_rand_n(column_name, n, seed);
620                pl_df.with_column(series.into())?;
621                return Ok(super::DataFrame::from_polars_with_options(
622                    pl_df,
623                    case_sensitive,
624                ));
625            }
626            crate::column::DeferredRandom::Randn(seed) => {
627                let pl_df = df.collect_inner()?;
628                let mut pl_df = pl_df.as_ref().clone();
629                let n = pl_df.height();
630                let series = crate::udfs::series_randn_n(column_name, n, seed);
631                pl_df.with_column(series.into())?;
632                return Ok(super::DataFrame::from_polars_with_options(
633                    pl_df,
634                    case_sensitive,
635                ));
636            }
637        }
638    }
639    let mut expr = df.resolve_expr_column_names(column.expr().clone())?;
640    expr = df.coerce_string_numeric_comparisons(expr)?;
641    // Issue #1115: array() with mixed Boolean and other types must raise (PySpark rejects bool + other).
642    // Reject only bool + (string or numeric); allow string+numeric (PySpark coerces) and numeric mix.
643    if column.is_array_expr {
644        let refs = expr_referenced_columns(&expr);
645        if refs.len() >= 2 {
646            let mut has_bool = false;
647            let mut has_non_bool = false;
648            for name in refs.iter() {
649                if let Some(dt) = df.get_column_dtype(name) {
650                    match dt {
651                        DataType::Boolean => has_bool = true,
652                        _ => has_non_bool = true,
653                    }
654                }
655            }
656            if has_bool && has_non_bool {
657                return Err(PolarsError::ComputeError(
658                    "array() does not support mixed BooleanType with other element types; cast columns to a common type first".into(),
659                ));
660            }
661        }
662    }
663    if let Ok(cols) = df.columns() {
664        let first_col = cols.into_iter().next();
665        if let Ok(expanded) = expand_pure_literal_to_rows(expr.clone(), first_col.as_deref()) {
666            expr = expanded;
667        }
668    }
669    // PySpark withColumn replaces if column exists; avoid duplicate output name (e.g. CTE self-join).
670    // If the new expr references the column being replaced, replace in one select so the column stays in scope.
671    // When replacing with explode(col(name)), use LazyFrame.explode() so other columns are replicated (PySpark parity).
672    let lf = df.lazy_frame();
673    let lf = if let Ok(existing) = df.resolve_column_name(column_name) {
674        let all = df.columns()?;
675        let existing_str = existing.as_str();
676        if expr_refs_column(&expr, existing_str) {
677            let inner = match &expr {
678                Expr::Alias(e, _) => e.as_ref(),
679                e => e,
680            };
681            // Use LazyFrame.explode() when replacing a column with explode(that column) so other columns replicate.
682            let refs = expr_referenced_columns(inner);
683            let use_frame_explode = refs.len() == 1
684                && refs.contains(existing_str)
685                && matches!(inner, Expr::Explode { .. });
686            if use_frame_explode {
687                let options = match inner {
688                    Expr::Explode { options, .. } => *options,
689                    _ => unreachable!(),
690                };
691                let selector = Selector::ByName {
692                    names: Arc::from([PlSmallStr::from(existing_str)]),
693                    strict: true,
694                };
695                lf.explode(selector, options)
696            } else {
697                // Replace in one shot: select all columns but use expr for the replaced name.
698                let select_exprs: Vec<Expr> = all
699                    .iter()
700                    .map(|n| {
701                        if n.as_str() == existing_str {
702                            expr.clone().alias(column_name)
703                        } else {
704                            col(n.as_str())
705                        }
706                    })
707                    .collect();
708                lf.select(select_exprs)
709            }
710        } else {
711            let to_keep: Vec<Expr> = all
712                .iter()
713                .filter(|n| n.as_str() != existing_str)
714                .map(|n| col(n.as_str()))
715                .collect();
716            lf.select(&to_keep).with_column(expr.alias(column_name))
717        }
718    } else {
719        // Adding a new column. If expr is explode(some_column), use frame-level explode so row count matches (PySpark parity).
720        // Preserve the original list column: add a copy with column_name, then explode it (so Name/Value stay, ExplodedValue expands).
721        let inner = match &expr {
722            Expr::Alias(e, _) => e.as_ref(),
723            e => e,
724        };
725        if let Expr::Explode {
726            input,
727            options: explode_opts,
728        } = inner
729        {
730            if let Expr::Column(explode_col) = input.as_ref() {
731                let refs = expr_referenced_columns(inner);
732                if refs.len() == 1 {
733                    let explode_col_str = explode_col.as_str();
734                    if df.resolve_column_name(explode_col_str).is_ok() {
735                        // Add new column as copy of list, then explode it so original column is preserved.
736                        let lf_with_copy =
737                            lf.with_column(col(explode_col.as_str()).alias(column_name));
738                        let selector = Selector::ByName {
739                            names: Arc::from([PlSmallStr::from(column_name)]),
740                            strict: true,
741                        };
742                        lf_with_copy.explode(selector, *explode_opts)
743                    } else {
744                        lf.with_column(expr.alias(column_name))
745                    }
746                } else {
747                    lf.with_column(expr.alias(column_name))
748                }
749            } else {
750                lf.with_column(expr.alias(column_name))
751            }
752        } else {
753            lf.with_column(expr.alias(column_name))
754        }
755    };
756    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
757}
758
759/// Order by columns (sort). Preserves case_sensitive on result.
760/// Note: We do not coerce string sort columns to Float64 here (unlike filter/join) so that
761/// genuine string sort (e.g. by name) is preserved. A future improvement could apply
762/// coercion only when the column is numeric-looking (e.g. try_parse to number).
763pub fn order_by(
764    df: &DataFrame,
765    column_names: Vec<&str>,
766    ascending: Vec<bool>,
767    case_sensitive: bool,
768) -> Result<DataFrame, PolarsError> {
769    use polars::prelude::*;
770    let mut asc = ascending;
771    while asc.len() < column_names.len() {
772        asc.push(true);
773    }
774    asc.truncate(column_names.len());
775    // Before resolving column names, enforce ambiguous-column semantics for
776    // unqualified user-facing orderBy references, just like select/select_items
777    // do via check_ambiguous_unqualified (#1393).
778    for name in &column_names {
779        df.check_ambiguous_unqualified(name)?;
780    }
781    let resolved: Vec<String> = column_names
782        .iter()
783        .map(|c| df.resolve_column_name(c))
784        .collect::<Result<Vec<_>, _>>()?;
785    let exprs: Vec<Expr> = resolved.iter().map(|s| col(s.as_str())).collect();
786    let descending: Vec<bool> = asc.iter().map(|&a| !a).collect();
787    // PySpark default: nulls last for both ASC and DESC (issue #1052 / #327 test expectation).
788    let nulls_last: Vec<bool> = vec![true; column_names.len()];
789    let lf = df.lazy_frame().sort_by_exprs(
790        exprs,
791        SortMultipleOptions::new()
792            .with_order_descending_multi(descending)
793            .with_nulls_last_multi(nulls_last),
794    );
795    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
796}
797
798/// Order by sort expressions (asc/desc with nulls_first/last). Preserves case_sensitive on result.
799/// Column names in sort expressions are resolved per df's case sensitivity (PySpark parity).
800/// #1261: Coerce string–numeric in sort expressions (e.g. orderBy(col("s") / 10)) so string columns
801/// used in arithmetic are cast to numeric for sorting.
802pub fn order_by_exprs(
803    df: &DataFrame,
804    sort_orders: Vec<SortOrder>,
805    case_sensitive: bool,
806) -> Result<DataFrame, PolarsError> {
807    use polars::prelude::*;
808    if sort_orders.is_empty() {
809        return Ok(super::DataFrame::from_lazy_with_options(
810            df.lazy_frame(),
811            case_sensitive,
812        ));
813    }
814    let exprs: Vec<Expr> = sort_orders
815        .iter()
816        .map(|s| {
817            let e = df.resolve_expr_column_names(s.expr().clone())?;
818            df.coerce_string_numeric_comparisons(e)
819        })
820        .collect::<Result<Vec<_>, _>>()?;
821    let descending: Vec<bool> = sort_orders.iter().map(|s| s.descending).collect();
822    let nulls_last: Vec<bool> = sort_orders.iter().map(|s| s.nulls_last).collect();
823    let opts = SortMultipleOptions::new()
824        .with_order_descending_multi(descending)
825        .with_nulls_last_multi(nulls_last);
826    let lf = df.lazy_frame().sort_by_exprs(exprs, opts);
827    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
828}
829
830/// Union (unionAll): stack another DataFrame vertically.
831/// When column names match (set equality, order may differ): use left's order, reorder right by name (#551).
832/// When column count matches but names differ: union by position (PySpark/createDataFrame parity #1018):
833/// align by index, result uses left's column names.
834/// When column types differ, both sides are coerced to a common type.
835pub fn union(
836    left: &DataFrame,
837    right: &DataFrame,
838    case_sensitive: bool,
839) -> Result<DataFrame, PolarsError> {
840    let left_names = left.columns()?;
841    let right_names = right.columns()?;
842    if left_names.len() != right_names.len() {
843        return Err(PolarsError::InvalidOperation(
844            format!(
845                "union: column count must match. Left: {:?}, Right: {:?}",
846                left_names, right_names
847            )
848            .into(),
849        ));
850    }
851    let right_names_set: std::collections::HashSet<_> = if case_sensitive {
852        right_names.iter().cloned().collect()
853    } else {
854        right_names.iter().map(|s| s.to_lowercase()).collect()
855    };
856    let names_match = left_names.iter().all(|n| {
857        let key = if case_sensitive {
858            n.clone()
859        } else {
860            n.to_lowercase()
861        };
862        right_names_set.contains(&key)
863    });
864
865    let debug_union = std::env::var("SPARKLESS_DEBUG_UNION").as_deref() == Ok("1");
866    let (left_exprs, right_exprs) = if names_match {
867        // Same set of names: use left's order for both sides (reorder right by name).
868        let mut left_exprs = Vec::with_capacity(left_names.len());
869        let mut right_exprs = Vec::with_capacity(right_names.len());
870        for name in &left_names {
871            let resolved_left = left.resolve_column_name(name)?;
872            let resolved_right = right.resolve_column_name(name)?;
873            let left_dtype = left.get_column_dtype(name).unwrap_or(DataType::Null);
874            let right_dtype = right.get_column_dtype(name).unwrap_or(DataType::Null);
875            let mut target = if left_dtype == DataType::Null {
876                right_dtype.clone()
877            } else if right_dtype == DataType::Null || left_dtype == right_dtype {
878                left_dtype.clone()
879            } else {
880                find_common_type(&left_dtype, &right_dtype)?
881            };
882            // Issue #1262: when one side is String and the other numeric, coerce to String
883            // (PySpark union behavior); ensure we cast even if get_column_dtype disagrees.
884            if (left_dtype == DataType::String && is_numeric_public(&right_dtype))
885                || (right_dtype == DataType::String && is_numeric_public(&left_dtype))
886            {
887                target = DataType::String;
888            }
889            let need_coerce = left_dtype != target || right_dtype != target;
890            if debug_union {
891                eprintln!(
892                    "[union #1262] name={:?} left_dtype={:?} right_dtype={:?} target={:?} need_coerce={}",
893                    name, left_dtype, right_dtype, target, need_coerce
894                );
895            }
896            let left_expr = if need_coerce {
897                col(resolved_left.as_str()).cast(target.clone())
898            } else {
899                col(resolved_left.as_str())
900            };
901            let right_expr = if need_coerce {
902                col(resolved_right.as_str()).cast(target)
903            } else {
904                col(resolved_right.as_str())
905            };
906            left_exprs.push(left_expr.alias(name.as_str()));
907            right_exprs.push(right_expr.alias(name.as_str()));
908        }
909        (left_exprs, right_exprs)
910    } else {
911        // #1018: Union by position — same column count, different names; align by index, use left's names.
912        let mut left_exprs = Vec::with_capacity(left_names.len());
913        let mut right_exprs = Vec::with_capacity(right_names.len());
914        for (i, left_name) in left_names.iter().enumerate() {
915            let right_name = right_names.get(i).ok_or_else(|| {
916                PolarsError::InvalidOperation("union by position: index out of range".into())
917            })?;
918            let resolved_left = left.resolve_column_name(left_name)?;
919            let resolved_right = right.resolve_column_name(right_name)?;
920            let left_dtype = left.get_column_dtype(left_name).unwrap_or(DataType::Null);
921            let right_dtype = right.get_column_dtype(right_name).unwrap_or(DataType::Null);
922            let mut target = if left_dtype == DataType::Null {
923                right_dtype.clone()
924            } else if right_dtype == DataType::Null || left_dtype == right_dtype {
925                left_dtype.clone()
926            } else {
927                find_common_type(&left_dtype, &right_dtype)?
928            };
929            if (left_dtype == DataType::String && is_numeric_public(&right_dtype))
930                || (right_dtype == DataType::String && is_numeric_public(&left_dtype))
931            {
932                target = DataType::String;
933            }
934            let need_coerce = left_dtype != target || right_dtype != target;
935            if debug_union {
936                eprintln!(
937                    "[union #1262] left_name={:?} right_name={:?} left_dtype={:?} right_dtype={:?} target={:?} need_coerce={}",
938                    left_name, right_name, left_dtype, right_dtype, target, need_coerce
939                );
940            }
941            let left_expr = if need_coerce {
942                col(resolved_left.as_str()).cast(target.clone())
943            } else {
944                col(resolved_left.as_str())
945            };
946            let right_expr = if need_coerce {
947                col(resolved_right.as_str()).cast(target)
948            } else {
949                col(resolved_right.as_str())
950            };
951            left_exprs.push(left_expr.alias(left_name.as_str()));
952            right_exprs.push(right_expr.alias(left_name.as_str()));
953        }
954        (left_exprs, right_exprs)
955    };
956
957    let lf1 = left.lazy_frame().select(&left_exprs);
958    let lf2 = right.lazy_frame().select(&right_exprs);
959    // Collect then vstack so result schema is the coerced schema (Issue #1262: LazyFrame concat
960    // can yield wrong schema for collect_schema(); eager vstack preserves cast result).
961    let mut out = lf1.collect()?;
962    let df2 = lf2.collect()?;
963    if debug_union {
964        eprintln!(
965            "[union #1262] after lf1.collect() schema: {:?}",
966            out.schema().iter_names_and_dtypes().collect::<Vec<_>>()
967        );
968    }
969    out.vstack_mut(&df2)?;
970    if debug_union {
971        eprintln!(
972            "[union #1262] after vstack schema: {:?}",
973            out.schema().iter_names_and_dtypes().collect::<Vec<_>>()
974        );
975    }
976    Ok(super::DataFrame::from_eager_with_options(
977        out,
978        case_sensitive,
979    ))
980}
981
982/// Union by name: stack vertically, aligning columns by name.
983/// When allow_missing_columns is true: result has all columns from both sides (missing filled with null).
984/// When false: result has only left columns; right must have all left columns.
985/// When same-named columns have different types (e.g. String vs Int64), coerces to a common type (PySpark parity #603).
986pub fn union_by_name(
987    left: &DataFrame,
988    right: &DataFrame,
989    allow_missing_columns: bool,
990    case_sensitive: bool,
991) -> Result<DataFrame, PolarsError> {
992    use crate::type_coercion::find_common_type;
993    use polars::prelude::*;
994
995    let left_names = left.columns()?;
996    let right_names = right.columns()?;
997    let contains = |names: &[String], name: &str| -> bool {
998        if case_sensitive {
999            names.iter().any(|n| n.as_str() == name)
1000        } else {
1001            let name_lower = name.to_lowercase();
1002            names
1003                .iter()
1004                .any(|n| n.as_str().to_lowercase() == name_lower)
1005        }
1006    };
1007    let resolve = |names: &[String], name: &str| -> Option<String> {
1008        if case_sensitive {
1009            names.iter().find(|n| n.as_str() == name).cloned()
1010        } else {
1011            let name_lower = name.to_lowercase();
1012            names
1013                .iter()
1014                .find(|n| n.as_str().to_lowercase() == name_lower)
1015                .cloned()
1016        }
1017    };
1018    let all_columns: Vec<String> = if allow_missing_columns {
1019        let mut out = left_names.clone();
1020        for r in &right_names {
1021            if !contains(&out, r.as_str()) {
1022                out.push(r.clone());
1023            }
1024        }
1025        out
1026    } else {
1027        left_names.clone()
1028    };
1029    // Per-column common type for coercion when left/right types differ (#603).
1030    let mut left_exprs: Vec<Expr> = Vec::with_capacity(all_columns.len());
1031    let mut right_exprs: Vec<Expr> = Vec::with_capacity(all_columns.len());
1032    for c in &all_columns {
1033        let left_has = resolve(&left_names, c.as_str());
1034        let right_has = resolve(&right_names, c.as_str());
1035        let left_dtype = left_has.as_ref().and_then(|r| left.get_column_dtype(r));
1036        let right_dtype = right_has.as_ref().and_then(|r| right.get_column_dtype(r));
1037        // When both sides have the column and types differ, use shared coercion helper.
1038        if let (Some(l), Some(r)) = (&left_has, &right_has) {
1039            if let (Some(lt), Some(rt)) = (&left_dtype, &right_dtype) {
1040                if lt != rt {
1041                    let (le, re) = coerce_expr_pair(l, r, lt, rt, c).map_err(|e| {
1042                        PolarsError::ComputeError(
1043                            format!("union_by_name: column '{}' type coercion: {}", c, e).into(),
1044                        )
1045                    })?;
1046                    left_exprs.push(le);
1047                    right_exprs.push(re);
1048                    continue;
1049                }
1050            }
1051        }
1052        // #613 / #603: When both sides have dtypes, use a common type helper. When only one
1053        // side has a dtype, keep that dtype so columns that exist on only one side (and are
1054        // null on the other) preserve their natural type (e.g. Int64 id, Double aggregates).
1055        let common_dtype = match (&left_dtype, &right_dtype) {
1056            (Some(lt), Some(rt)) if lt != rt => find_common_type(lt, rt).map_err(|e| {
1057                PolarsError::ComputeError(
1058                    format!("union_by_name: column '{}' type coercion: {}", c, e).into(),
1059                )
1060            })?,
1061            (Some(lt), Some(_)) => lt.clone(),
1062            // One side unknown: keep the known dtype. If the known side is String, we still
1063            // get String here; if it's numeric, we avoid upcasting to String and changing
1064            // semantics for columns that are null on the other side.
1065            (Some(lt), None) | (None, Some(lt)) => lt.clone(),
1066            (None, None) => polars::prelude::DataType::Null,
1067        };
1068        let left_expr = match &left_has {
1069            Some(r) => col(r.as_str()).cast(common_dtype.clone()).alias(c.as_str()),
1070            None => polars::prelude::lit(polars::prelude::NULL)
1071                .cast(common_dtype.clone())
1072                .alias(c.as_str()),
1073        };
1074        left_exprs.push(left_expr);
1075        let right_expr = match &right_has {
1076            Some(r) => col(r.as_str()).cast(common_dtype.clone()).alias(c.as_str()),
1077            None if allow_missing_columns => polars::prelude::lit(polars::prelude::NULL)
1078                .cast(common_dtype)
1079                .alias(c.as_str()),
1080            None => {
1081                return Err(PolarsError::InvalidOperation(
1082                    format!(
1083                        "union_by_name: column '{}' missing in right DataFrame (allow_missing_columns=False)",
1084                        c
1085                    )
1086                    .into(),
1087                ));
1088            }
1089        };
1090        right_exprs.push(right_expr);
1091    }
1092    let lf1 = left.lazy_frame().select(&left_exprs);
1093    let lf2 = right.lazy_frame().select(&right_exprs);
1094    let out = polars::prelude::concat([lf1, lf2], UnionArgs::default())?;
1095    Ok(super::DataFrame::from_lazy_with_options(
1096        out,
1097        case_sensitive,
1098    ))
1099}
1100
1101/// Distinct: drop duplicate rows (all columns or subset).
1102pub fn distinct(
1103    df: &DataFrame,
1104    subset: Option<Vec<&str>>,
1105    case_sensitive: bool,
1106) -> Result<DataFrame, PolarsError> {
1107    let subset_names: Option<Vec<String>> = subset
1108        .map(|cols| {
1109            cols.iter()
1110                .map(|s| df.resolve_column_name(s))
1111                .collect::<Result<Vec<_>, _>>()
1112        })
1113        .transpose()?;
1114    let subset_selector: Option<Selector> = subset_names.map(|names| Selector::ByName {
1115        names: Arc::from(names.into_iter().map(PlSmallStr::from).collect::<Vec<_>>()),
1116        strict: false,
1117    });
1118    let lf = df
1119        .lazy_frame()
1120        .unique(subset_selector, UniqueKeepStrategy::First);
1121    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1122}
1123
1124/// Drop one or more columns.
1125pub fn drop(
1126    df: &DataFrame,
1127    columns: Vec<&str>,
1128    case_sensitive: bool,
1129) -> Result<DataFrame, PolarsError> {
1130    let resolved: Vec<String> = columns
1131        .iter()
1132        .map(|c| df.resolve_column_name(c))
1133        .collect::<Result<Vec<_>, _>>()?;
1134    let all_names = df.columns()?;
1135    let to_keep: Vec<Expr> = all_names
1136        .iter()
1137        .filter(|n| !resolved.iter().any(|r| r == n.as_str()))
1138        .map(|n| col(n.as_str()))
1139        .collect();
1140    let lf = df.lazy_frame().select(&to_keep);
1141    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1142}
1143
1144/// Drop rows with nulls (all columns or subset). PySpark na.drop(subset, how, thresh).
1145/// - how: "any" (default) = drop if any null in subset; "all" = drop only if all null in subset.
1146/// - thresh: if set, keep row if it has at least this many non-null values in subset (overrides how).
1147pub fn dropna(
1148    df: &DataFrame,
1149    subset: Option<Vec<&str>>,
1150    how: &str,
1151    thresh: Option<usize>,
1152    case_sensitive: bool,
1153) -> Result<DataFrame, PolarsError> {
1154    use polars::prelude::*;
1155    let cols: Vec<String> = match &subset {
1156        Some(c) => c
1157            .iter()
1158            .map(|n| df.resolve_column_name(n))
1159            .collect::<Result<Vec<_>, _>>()?,
1160        None => df.columns()?,
1161    };
1162    let col_exprs: Vec<Expr> = cols.iter().map(|c| col(c.as_str())).collect();
1163    let base_lf = df.lazy_frame();
1164    let lf = if let Some(n) = thresh {
1165        // Keep row if number of non-null in subset >= n
1166        let count_expr: Expr = col_exprs
1167            .iter()
1168            .map(|e| e.clone().is_not_null().cast(DataType::Int32))
1169            .fold(lit(0i32), |a, b| a + b);
1170        base_lf.filter(count_expr.gt_eq(lit(n as i32)))
1171    } else if how.eq_ignore_ascii_case("all") {
1172        // Drop only when all subset columns are null → keep when any is not null
1173        let any_not_null: Expr = col_exprs
1174            .into_iter()
1175            .map(|e| e.is_not_null())
1176            .fold(lit(false), |a, b| a.or(b));
1177        base_lf.filter(any_not_null)
1178    } else {
1179        // how == "any" (default): drop if any null in subset
1180        let subset_selector = Selector::ByName {
1181            names: Arc::from(
1182                cols.iter()
1183                    .map(|s| PlSmallStr::from(s.as_str()))
1184                    .collect::<Vec<_>>(),
1185            ),
1186            strict: false,
1187        };
1188        base_lf.drop_nulls(Some(subset_selector))
1189    };
1190    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1191}
1192
1193/// Fill nulls with a literal expression. If subset is Some, only those columns are filled; else all.
1194/// Casts fill value to each column's dtype to preserve type (e.g. fill 0 -> int 0, not string "0").
1195/// PySpark na.fill(value, subset=...).
1196pub fn fillna(
1197    df: &DataFrame,
1198    value_expr: Expr,
1199    subset: Option<Vec<&str>>,
1200    case_sensitive: bool,
1201) -> Result<DataFrame, PolarsError> {
1202    use polars::prelude::*;
1203    let exprs: Vec<Expr> = match subset {
1204        Some(cols) => cols
1205            .iter()
1206            .map(|n| {
1207                let resolved = df.resolve_column_name(n)?;
1208                let fill = match df.get_column_dtype(resolved.as_str()) {
1209                    Some(dt) => value_expr.clone().cast(dt),
1210                    None => value_expr.clone(),
1211                };
1212                Ok(col(resolved.as_str()).fill_null(fill))
1213            })
1214            .collect::<Result<Vec<_>, PolarsError>>()?,
1215        None => df
1216            .columns()?
1217            .iter()
1218            .map(|n| {
1219                let fill = match df.get_column_dtype(n) {
1220                    Some(dt) => value_expr.clone().cast(dt),
1221                    None => value_expr.clone(),
1222                };
1223                col(n.as_str()).fill_null(fill)
1224            })
1225            .collect(),
1226    };
1227    let lf = df.lazy_frame().with_columns(exprs);
1228    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1229}
1230
1231/// Limit: return first n rows.
1232pub fn limit(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1233    // limit is a transformation: slice(0, n) on lazy
1234    let lf = df.lazy_frame().slice(0, n as u32);
1235    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1236}
1237
1238/// Rename a column (old_name -> new_name).
1239pub fn with_column_renamed(
1240    df: &DataFrame,
1241    old_name: &str,
1242    new_name: &str,
1243    case_sensitive: bool,
1244) -> Result<DataFrame, PolarsError> {
1245    match df.resolve_column_name(old_name) {
1246        Ok(resolved) => {
1247            let lf = df
1248                .lazy_frame()
1249                .rename([resolved.as_str()], [new_name], true);
1250            Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1251        }
1252        // PySpark parity: renaming a non-existent column is a no-op.
1253        Err(PolarsError::ColumnNotFound(_)) => Ok(df.clone()),
1254        Err(e) => Err(e),
1255    }
1256}
1257
1258/// Replace values in a column: where column == old_value, use new_value. PySpark replace (single column).
1259/// Coerces the equality so string–numeric comparisons (e.g. int column vs string literal) work like PySpark.
1260pub fn replace(
1261    df: &DataFrame,
1262    column_name: &str,
1263    old_value: Expr,
1264    new_value: Expr,
1265    case_sensitive: bool,
1266) -> Result<DataFrame, PolarsError> {
1267    use polars::prelude::*;
1268    let resolved = df.resolve_column_name(column_name)?;
1269    let eq_expr = col(resolved.as_str()).eq(old_value);
1270    let coerced_eq = df.coerce_string_numeric_comparisons(eq_expr)?;
1271    let repl = when(coerced_eq)
1272        .then(new_value)
1273        .otherwise(col(resolved.as_str()));
1274    let lf = df.lazy_frame().with_column(repl.alias(resolved.as_str()));
1275    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1276}
1277
1278/// Cross join: cartesian product of two DataFrames. PySpark crossJoin.
1279/// Reorders columns so common names (e.g. dept_id) come first on each side. Renames right-side
1280/// columns that duplicate left names with _right suffix so result has unique column names (#1049).
1281pub fn cross_join(
1282    left: &DataFrame,
1283    right: &DataFrame,
1284    case_sensitive: bool,
1285) -> Result<DataFrame, PolarsError> {
1286    use polars::prelude::col;
1287    let left_names = left.columns()?;
1288    let right_names = right.columns()?;
1289    let right_set: std::collections::HashSet<&str> =
1290        right_names.iter().map(|s| s.as_str()).collect();
1291    let left_set: std::collections::HashSet<&str> = left_names.iter().map(|s| s.as_str()).collect();
1292    // Put columns that exist on both sides first (e.g. dept_id), then rest (PySpark cross order).
1293    let left_ordered = order_columns_common_first(&left_names, &right_set);
1294    let right_ordered = order_columns_common_first(&right_names, &left_set);
1295    let exprs_left: Vec<_> = left_ordered.iter().map(|s| col(*s)).collect();
1296    // Suffix right columns that duplicate left so result has unique names (parity fixture comparison).
1297    let exprs_right: Vec<_> = right_ordered
1298        .iter()
1299        .map(|s| {
1300            if left_set.contains(*s) {
1301                col(*s).alias(format!("{}_right", s))
1302            } else {
1303                col(*s)
1304            }
1305        })
1306        .collect();
1307    let lf_left = left.lazy_frame().select(&exprs_left);
1308    let lf_right = right.lazy_frame().select(&exprs_right);
1309    let out = lf_left.cross_join(lf_right, None);
1310    Ok(super::DataFrame::from_lazy_with_options(
1311        out,
1312        case_sensitive,
1313    ))
1314}
1315
/// Reorder `names` so that those also present in `other` come first, followed by the rest.
/// Relative order within each group is the order in `names`.
fn order_columns_common_first<'a>(
    names: &'a [String],
    other: &std::collections::HashSet<&str>,
) -> Vec<&'a str> {
    // `partition` is stable, so both groups keep their original relative order.
    let (mut shared, only_here): (Vec<&'a str>, Vec<&'a str>) = names
        .iter()
        .map(String::as_str)
        .partition(|name| other.contains(*name));
    shared.extend(only_here);
    shared
}
1332
1333/// Summary statistics (count, mean, std, min, max). PySpark describe.
1334/// Builds a summary DataFrame with a "summary" column (PySpark name) and one column per numeric input column.
1335pub fn describe(df: &DataFrame, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1336    use polars::prelude::*;
1337    let pl_df = df.collect_inner()?.as_ref().clone();
1338    let mut stat_values: Vec<Column> = Vec::new();
1339    for col in pl_df.columns() {
1340        let s = col.as_materialized_series();
1341        let dtype = s.dtype();
1342        if dtype.is_numeric() {
1343            let name = s.name().clone();
1344            let count = s.len() as i64 - s.null_count() as i64;
1345            let mean_f = s.mean().unwrap_or(f64::NAN);
1346            let std_f = s.std(1).unwrap_or(f64::NAN);
1347            let ca = series_as_f64_ca(s, "describe")?;
1348            let min_f = ca.min().unwrap_or(f64::NAN);
1349            let max_f = ca.max().unwrap_or(f64::NAN);
1350            // PySpark describe/summary returns string type for value columns.
1351            // Use "null" for NaN so JSON/parity consumers get proper null (not string "None").
1352            let is_float = matches!(dtype, DataType::Float64 | DataType::Float32);
1353            let count_s = count.to_string();
1354            let mean_s = if mean_f.is_nan() {
1355                "null".to_string()
1356            } else {
1357                format!("{:.1}", mean_f)
1358            };
1359            let std_s = if std_f.is_nan() {
1360                "null".to_string()
1361            } else {
1362                format!("{:.1}", std_f)
1363            };
1364            let min_s = if min_f.is_nan() {
1365                "null".to_string()
1366            } else if min_f.fract() == 0.0 && is_float {
1367                format!("{:.1}", min_f)
1368            } else if min_f.fract() == 0.0 {
1369                format!("{:.0}", min_f)
1370            } else {
1371                format!("{min_f}")
1372            };
1373            let max_s = if max_f.is_nan() {
1374                "null".to_string()
1375            } else if max_f.fract() == 0.0 && is_float {
1376                format!("{:.1}", max_f)
1377            } else if max_f.fract() == 0.0 {
1378                format!("{:.0}", max_f)
1379            } else {
1380                format!("{max_f}")
1381            };
1382            let series = Series::new(
1383                name,
1384                [
1385                    count_s.as_str(),
1386                    mean_s.as_str(),
1387                    std_s.as_str(),
1388                    min_s.as_str(),
1389                    max_s.as_str(),
1390                ],
1391            );
1392            stat_values.push(series.into());
1393        }
1394    }
1395    if stat_values.is_empty() {
1396        // No numeric columns: return minimal describe with just summary column (PySpark name)
1397        let stat_col = Series::new(
1398            "summary".into(),
1399            &["count", "mean", "stddev", "min", "max" as &str],
1400        )
1401        .into();
1402        let empty: Vec<f64> = Vec::new();
1403        let empty_series = Series::new("placeholder".into(), empty).into();
1404        let out_pl = polars::prelude::DataFrame::new_infer_height(vec![stat_col, empty_series])?;
1405        return Ok(super::DataFrame::from_polars_with_options(
1406            out_pl,
1407            case_sensitive,
1408        ));
1409    }
1410    let summary_col = Series::new(
1411        "summary".into(),
1412        &["count", "mean", "stddev", "min", "max" as &str],
1413    )
1414    .into();
1415    let mut cols: Vec<Column> = vec![summary_col];
1416    cols.extend(stat_values);
1417    let out_pl = polars::prelude::DataFrame::new_infer_height(cols)?;
1418    Ok(super::DataFrame::from_polars_with_options(
1419        out_pl,
1420        case_sensitive,
1421    ))
1422}
1423
1424/// Set difference: rows in left that are not in right (by all columns). PySpark subtract / except.
1425/// Aligns right column names to left (case-insensitive) so subtract works when casing differs.
1426pub fn subtract(
1427    left: &DataFrame,
1428    right: &DataFrame,
1429    case_sensitive: bool,
1430) -> Result<DataFrame, PolarsError> {
1431    use polars::prelude::*;
1432    let left_names = left.columns()?;
1433    let right_names = right.columns()?;
1434    let right_on: Vec<Expr> = left_names
1435        .iter()
1436        .map(|ln| {
1437            let resolved = if case_sensitive {
1438                right_names
1439                    .iter()
1440                    .find(|rn| rn.as_str() == ln.as_str())
1441                    .cloned()
1442                    .ok_or_else(|| {
1443                        PolarsError::ColumnNotFound(
1444                            format!(
1445                                "cannot resolve: subtract: column '{}' not found on right",
1446                                ln
1447                            )
1448                            .into(),
1449                        )
1450                    })?
1451            } else {
1452                let ln_lower = ln.to_lowercase();
1453                right_names
1454                    .iter()
1455                    .find(|rn| rn.to_lowercase() == ln_lower)
1456                    .cloned()
1457                    .ok_or_else(|| {
1458                        PolarsError::ColumnNotFound(
1459                            format!(
1460                                "cannot resolve: subtract: column '{}' not found on right",
1461                                ln
1462                            )
1463                            .into(),
1464                        )
1465                    })?
1466            };
1467            Ok(col(resolved.as_str()))
1468        })
1469        .collect::<Result<Vec<_>, PolarsError>>()?;
1470    let left_on: Vec<Expr> = left_names.iter().map(|n| col(n.as_str())).collect();
1471    let right_lf = right.lazy_frame();
1472    let left_lf = left.lazy_frame();
1473    let anti = left_lf.join(right_lf, left_on, right_on, JoinArgs::new(JoinType::Anti));
1474    Ok(super::DataFrame::from_lazy_with_options(
1475        anti,
1476        case_sensitive,
1477    ))
1478}
1479
1480/// Set intersection: rows that appear in both DataFrames (by all columns). PySpark intersect.
1481/// Aligns right column names to left (case-insensitive) so intersect works when casing differs.
1482pub fn intersect(
1483    left: &DataFrame,
1484    right: &DataFrame,
1485    case_sensitive: bool,
1486) -> Result<DataFrame, PolarsError> {
1487    use polars::prelude::*;
1488    let left_names = left.columns()?;
1489    let right_names = right.columns()?;
1490    let right_on: Vec<Expr> = left_names
1491        .iter()
1492        .map(|ln| {
1493            let resolved = if case_sensitive {
1494                right_names
1495                    .iter()
1496                    .find(|rn| rn.as_str() == ln.as_str())
1497                    .cloned()
1498                    .ok_or_else(|| {
1499                        PolarsError::ColumnNotFound(
1500                            format!(
1501                                "cannot resolve: intersect: column '{}' not found on right",
1502                                ln
1503                            )
1504                            .into(),
1505                        )
1506                    })?
1507            } else {
1508                let ln_lower = ln.to_lowercase();
1509                right_names
1510                    .iter()
1511                    .find(|rn| rn.to_lowercase() == ln_lower)
1512                    .cloned()
1513                    .ok_or_else(|| {
1514                        PolarsError::ColumnNotFound(
1515                            format!(
1516                                "cannot resolve: intersect: column '{}' not found on right",
1517                                ln
1518                            )
1519                            .into(),
1520                        )
1521                    })?
1522            };
1523            Ok(col(resolved.as_str()))
1524        })
1525        .collect::<Result<Vec<_>, PolarsError>>()?;
1526    let left_on: Vec<Expr> = left_names.iter().map(|n| col(n.as_str())).collect();
1527    let left_lf = left.lazy_frame();
1528    let right_lf = right.lazy_frame();
1529    let semi = left_lf
1530        .join(right_lf, left_on, right_on, JoinArgs::new(JoinType::Semi))
1531        .unique(None, UniqueKeepStrategy::First);
1532    Ok(super::DataFrame::from_lazy_with_options(
1533        semi,
1534        case_sensitive,
1535    ))
1536}
1537
1538// ---------- Batch A: sample, first/head/take/tail, is_empty, to_df ----------
1539
1540/// Sample a fraction of rows. PySpark sample(withReplacement, fraction, seed).
1541pub fn sample(
1542    df: &DataFrame,
1543    with_replacement: bool,
1544    fraction: f64,
1545    seed: Option<u64>,
1546    case_sensitive: bool,
1547) -> Result<DataFrame, PolarsError> {
1548    use polars::prelude::Series;
1549    let pl = df.collect_inner()?;
1550    let n = pl.height();
1551    if n == 0 {
1552        return Ok(super::DataFrame::from_lazy_with_options(
1553            polars::prelude::DataFrame::empty().lazy(),
1554            case_sensitive,
1555        ));
1556    }
1557    let take_n = (n as f64 * fraction).round() as usize;
1558    let take_n = take_n.min(n).max(0);
1559    if take_n == 0 {
1560        return Ok(super::DataFrame::from_lazy_with_options(
1561            pl.as_ref().head(Some(0)).lazy(),
1562            case_sensitive,
1563        ));
1564    }
1565    let idx_series = Series::new("idx".into(), (0..n).map(|i| i as u32).collect::<Vec<_>>());
1566    let sampled_idx = idx_series.sample_n(take_n, with_replacement, true, seed)?;
1567    let idx_ca = sampled_idx
1568        .u32()
1569        .map_err(|_| PolarsError::ComputeError("sample: expected u32 indices".into()))?;
1570    let pl_df = pl.as_ref().take(idx_ca)?;
1571    Ok(super::DataFrame::from_polars_with_options(
1572        pl_df,
1573        case_sensitive,
1574    ))
1575}
1576
1577/// Split DataFrame by weights (random split). PySpark randomSplit(weights, seed).
1578/// Returns one DataFrame per weight; weights are normalized to fractions.
1579/// Each row is assigned to exactly one split (disjoint partitions).
1580pub fn random_split(
1581    df: &DataFrame,
1582    weights: &[f64],
1583    seed: Option<u64>,
1584    case_sensitive: bool,
1585) -> Result<Vec<DataFrame>, PolarsError> {
1586    let total: f64 = weights.iter().sum();
1587    if total <= 0.0 || weights.is_empty() {
1588        return Ok(Vec::new());
1589    }
1590    let pl = df.collect_inner()?;
1591    let n = pl.height();
1592    if n == 0 {
1593        return Ok(weights.iter().map(|_| super::DataFrame::empty()).collect());
1594    }
1595    // Normalize weights to cumulative fractions: e.g. [0.25, 0.25, 0.5] -> [0.25, 0.5, 1.0]
1596    let mut cum = Vec::with_capacity(weights.len());
1597    let mut acc = 0.0_f64;
1598    for w in weights {
1599        acc += w / total;
1600        cum.push(acc);
1601    }
1602    // Assign each row index to one bucket using a single seeded RNG (disjoint split).
1603    use polars::prelude::Series;
1604    use rand::Rng;
1605    use rand::SeedableRng;
1606    let mut rng = rand::rngs::StdRng::seed_from_u64(seed.unwrap_or(0));
1607    let mut bucket_indices: Vec<Vec<u32>> = (0..weights.len()).map(|_| Vec::new()).collect();
1608    for i in 0..n {
1609        let r: f64 = rng.r#gen();
1610        let bucket = cum
1611            .iter()
1612            .position(|&c| r < c)
1613            .unwrap_or(weights.len().saturating_sub(1));
1614        bucket_indices[bucket].push(i as u32);
1615    }
1616    let pl = pl.as_ref();
1617    let mut out = Vec::with_capacity(weights.len());
1618    for indices in bucket_indices {
1619        if indices.is_empty() {
1620            out.push(super::DataFrame::from_polars_with_options(
1621                pl.clone().head(Some(0)),
1622                case_sensitive,
1623            ));
1624        } else {
1625            let idx_series = Series::new("idx".into(), indices);
1626            let idx_ca = idx_series.u32().map_err(|_| {
1627                PolarsError::ComputeError("random_split: expected u32 indices".into())
1628            })?;
1629            let taken = pl.take(idx_ca)?;
1630            out.push(super::DataFrame::from_polars_with_options(
1631                taken,
1632                case_sensitive,
1633            ));
1634        }
1635    }
1636    Ok(out)
1637}
1638
1639/// Stratified sample by column value. PySpark sampleBy(col, fractions, seed).
1640/// fractions: list of (value as Expr literal, fraction to sample for that value).
1641pub fn sample_by(
1642    df: &DataFrame,
1643    col_name: &str,
1644    fractions: &[(Expr, f64)],
1645    seed: Option<u64>,
1646    case_sensitive: bool,
1647) -> Result<DataFrame, PolarsError> {
1648    use polars::prelude::*;
1649    if fractions.is_empty() {
1650        return Ok(super::DataFrame::from_lazy_with_options(
1651            df.lazy_frame().slice(0, 0),
1652            case_sensitive,
1653        ));
1654    }
1655    let resolved = df.resolve_column_name(col_name)?;
1656    let mut parts = Vec::with_capacity(fractions.len());
1657    for (value_expr, frac) in fractions {
1658        let cond = col(resolved.as_str()).eq(value_expr.clone());
1659        let filtered = df.lazy_frame().filter(cond).collect()?;
1660        if filtered.height() == 0 {
1661            parts.push(filtered.head(Some(0)));
1662            continue;
1663        }
1664        let sampled = sample(
1665            &super::DataFrame::from_polars_with_options(filtered, case_sensitive),
1666            false,
1667            *frac,
1668            seed,
1669            case_sensitive,
1670        )?;
1671        parts.push(sampled.collect_inner()?.as_ref().clone());
1672    }
1673    let mut out = parts
1674        .first()
1675        .ok_or_else(|| PolarsError::ComputeError("sample_by: no parts".into()))?
1676        .clone();
1677    for p in parts.iter().skip(1) {
1678        out.vstack_mut(p)?;
1679    }
1680    Ok(super::DataFrame::from_polars_with_options(
1681        out,
1682        case_sensitive,
1683    ))
1684}
1685
1686/// First row as a DataFrame (one row). PySpark first().
1687/// Uses limit(1) then collect so that orderBy (and other plan steps) are applied before taking
1688/// the first row (issue #579: first() after orderBy must return first in sort order, not storage order).
1689pub fn first(df: &DataFrame, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1690    let limited = limit(df, 1, case_sensitive)?;
1691    let pl_df = limited.collect_inner()?.as_ref().clone();
1692    Ok(super::DataFrame::from_polars_with_options(
1693        pl_df,
1694        case_sensitive,
1695    ))
1696}
1697
1698/// First n rows. PySpark head(n). Same as limit.
1699pub fn head(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1700    limit(df, n, case_sensitive)
1701}
1702
1703/// Take first n rows (alias for limit). PySpark take(n).
1704pub fn take(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1705    limit(df, n, case_sensitive)
1706}
1707
1708/// Last n rows. PySpark tail(n).
1709pub fn tail(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1710    let pl = df.collect_inner()?;
1711    let total = pl.height();
1712    let skip = total.saturating_sub(n);
1713    let pl_df = pl.as_ref().clone().slice(skip as i64, n);
1714    Ok(super::DataFrame::from_polars_with_options(
1715        pl_df,
1716        case_sensitive,
1717    ))
1718}
1719
1720/// Whether the DataFrame has zero rows. PySpark isEmpty.
1721pub fn is_empty(df: &DataFrame) -> bool {
1722    df.count().map(|n| n == 0).unwrap_or(true)
1723}
1724
1725/// Rename columns. PySpark toDF(*colNames). Names must match length of columns.
1726pub fn to_df(
1727    df: &DataFrame,
1728    names: &[&str],
1729    case_sensitive: bool,
1730) -> Result<DataFrame, PolarsError> {
1731    let cols = df.columns()?;
1732    if names.len() != cols.len() {
1733        return Err(PolarsError::ComputeError(
1734            format!(
1735                "toDF: expected {} column names, got {}",
1736                cols.len(),
1737                names.len()
1738            )
1739            .into(),
1740        ));
1741    }
1742    let pl_df = df.collect_inner()?;
1743    let mut pl_df = pl_df.as_ref().clone();
1744    for (old, new) in cols.iter().zip(names.iter()) {
1745        pl_df.rename(old.as_str(), (*new).into())?;
1746    }
1747    Ok(super::DataFrame::from_polars_with_options(
1748        pl_df,
1749        case_sensitive,
1750    ))
1751}
1752
1753// ---------- Batch B: toJSON, explain, printSchema ----------
1754
1755fn any_value_to_serde_value(av: &polars::prelude::AnyValue) -> serde_json::Value {
1756    use polars::prelude::AnyValue;
1757    use serde_json::Number;
1758    match av {
1759        AnyValue::Null => serde_json::Value::Null,
1760        AnyValue::Boolean(v) => serde_json::Value::Bool(*v),
1761        AnyValue::Int8(v) => serde_json::Value::Number(Number::from(*v as i64)),
1762        AnyValue::Int32(v) => serde_json::Value::Number(Number::from(*v)),
1763        AnyValue::Int64(v) => serde_json::Value::Number(Number::from(*v)),
1764        AnyValue::UInt32(v) => serde_json::Value::Number(Number::from(*v)),
1765        AnyValue::Float64(v) => Number::from_f64(*v)
1766            .map(serde_json::Value::Number)
1767            .unwrap_or(serde_json::Value::Null),
1768        AnyValue::String(v) => serde_json::Value::String(v.to_string()),
1769        AnyValue::StringOwned(v) => serde_json::Value::String(v.to_string()),
1770        _ => serde_json::Value::String(format!("{av:?}")),
1771    }
1772}
1773
1774/// Convert a literal expression value to JSON for Python UDF executor (literal args).
1775pub(crate) fn literal_value_to_serde_value(
1776    lv: &polars::prelude::LiteralValue,
1777) -> Option<serde_json::Value> {
1778    lv.to_any_value().as_ref().map(any_value_to_serde_value)
1779}
1780
1781/// Collect rows as JSON strings (one JSON object per row). PySpark toJSON.
1782pub fn to_json(df: &DataFrame) -> Result<Vec<String>, PolarsError> {
1783    use polars::prelude::*;
1784    let collected = df.collect_inner()?;
1785    let pl = collected.as_ref();
1786    let names = pl.get_column_names();
1787    let mut out = Vec::with_capacity(pl.height());
1788    for r in 0..pl.height() {
1789        let mut row = serde_json::Map::new();
1790        for (i, name) in names.iter().enumerate() {
1791            let col = pl
1792                .columns()
1793                .get(i)
1794                .ok_or_else(|| PolarsError::ComputeError("to_json: column index".into()))?;
1795            let series = col.as_materialized_series();
1796            let av = series
1797                .get(r)
1798                .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1799            row.insert(name.to_string(), any_value_to_serde_value(&av));
1800        }
1801        out.push(
1802            serde_json::to_string(&row)
1803                .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?,
1804        );
1805    }
1806    Ok(out)
1807}
1808
1809/// Return a string describing the execution plan. PySpark explain.
1810pub fn explain(_df: &DataFrame) -> String {
1811    "DataFrame (eager Polars backend)".to_string()
1812}
1813
1814/// Return schema as a tree string. PySpark printSchema (we return string; caller can print).
1815pub fn print_schema(df: &DataFrame) -> Result<String, PolarsError> {
1816    let schema = df.schema()?;
1817    let mut s = "root\n".to_string();
1818    for f in schema.fields() {
1819        let dt = match &f.data_type {
1820            crate::schema::DataType::String => "string",
1821            crate::schema::DataType::Integer => "int",
1822            crate::schema::DataType::Long => "bigint",
1823            crate::schema::DataType::Double => "double",
1824            crate::schema::DataType::Boolean => "boolean",
1825            crate::schema::DataType::Date => "date",
1826            crate::schema::DataType::Timestamp => "timestamp",
1827            _ => "string",
1828        };
1829        s.push_str(&format!(" |-- {}: {}\n", f.name, dt));
1830    }
1831    Ok(s)
1832}
1833
1834// ---------- Batch D: selectExpr, colRegex, withColumns, withColumnsRenamed, na ----------
1835
1836/// Parse simple "col op literal" expression for selectExpr (e.g. "age * 2", "salary + 100").
1837fn parse_simple_expr(df: &DataFrame, s: &str) -> Result<Option<Expr>, PolarsError> {
1838    let s = s.trim();
1839    for (op, kind) in [
1840        (" * ", "mul"),
1841        ("*", "mul"),
1842        (" + ", "add"),
1843        ("+", "add"),
1844        (" - ", "sub"),
1845        (" / ", "div"),
1846        ("/", "div"),
1847    ] {
1848        if let Some((a, b)) = s.split_once(op) {
1849            let a = a.trim();
1850            let b = b.trim();
1851            let (col_part, num_part, col_on_left) =
1852                if df.resolve_column_name(a).is_ok() && b.parse::<f64>().is_ok() {
1853                    (a, b, true)
1854                } else if df.resolve_column_name(b).is_ok() && a.parse::<f64>().is_ok() {
1855                    (b, a, false)
1856                } else {
1857                    continue;
1858                };
1859            let resolved = df.resolve_column_name(col_part)?;
1860            let col_expr = col(resolved.as_str());
1861            let num: f64 = num_part.parse().map_err(|_| {
1862                PolarsError::ComputeError(
1863                    format!("selectExpr: could not parse literal {num_part:?}").into(),
1864                )
1865            })?;
1866            let lit_expr = lit(num);
1867            let expr = match kind {
1868                "mul" => col_expr * lit_expr,
1869                "add" => col_expr + lit_expr,
1870                "sub" => {
1871                    if col_on_left {
1872                        col_expr - lit_expr
1873                    } else {
1874                        lit_expr - col_expr
1875                    }
1876                }
1877                "div" => col_expr / lit_expr,
1878                _ => continue,
1879            };
1880            return Ok(Some(expr));
1881        }
1882    }
1883    Ok(None)
1884}
1885
1886/// Select by expression strings. Supports column names, "col as alias", and simple "col op num as alias". PySpark selectExpr.
1887pub fn select_expr(
1888    df: &DataFrame,
1889    exprs: &[String],
1890    case_sensitive: bool,
1891) -> Result<DataFrame, PolarsError> {
1892    let mut select_exprs: Vec<Expr> = Vec::new();
1893    for e in exprs {
1894        let e = e.trim();
1895        if let Some((left, right)) = e.split_once(" as ") {
1896            let left = left.trim();
1897            let alias = right.trim();
1898            if let Some(expr) = parse_simple_expr(df, left)? {
1899                select_exprs.push(expr.alias(alias));
1900            } else {
1901                let resolved = df.resolve_column_name(left)?;
1902                select_exprs.push(col(resolved.as_str()).alias(alias));
1903            }
1904        } else {
1905            let resolved = df.resolve_column_name(e)?;
1906            select_exprs.push(col(resolved.as_str()));
1907        }
1908    }
1909    select_with_exprs(df, select_exprs, case_sensitive, false)
1910}
1911
1912/// Select columns whose names match the regex pattern. PySpark colRegex.
1913pub fn col_regex(
1914    df: &DataFrame,
1915    pattern: &str,
1916    case_sensitive: bool,
1917) -> Result<DataFrame, PolarsError> {
1918    let re = regex::Regex::new(pattern).map_err(|e| {
1919        PolarsError::ComputeError(format!("colRegex: invalid pattern {pattern:?}: {e}").into())
1920    })?;
1921    let names = df.columns()?;
1922    let matched: Vec<&str> = names
1923        .iter()
1924        .filter(|n| re.is_match(n))
1925        .map(|s| s.as_str())
1926        .collect();
1927    if matched.is_empty() {
1928        return Err(PolarsError::ComputeError(
1929            format!("colRegex: no columns matched pattern {pattern:?}").into(),
1930        ));
1931    }
1932    select(df, matched, case_sensitive)
1933}
1934
1935/// Add or replace multiple columns. PySpark withColumns. Uses Column so deferred rand/randn get per-row values.
1936pub fn with_columns(
1937    df: &DataFrame,
1938    exprs: &[(String, crate::column::Column)],
1939    case_sensitive: bool,
1940) -> Result<DataFrame, PolarsError> {
1941    let pl = df.collect_inner()?.as_ref().clone();
1942    let mut current = super::DataFrame::from_polars_with_options(pl, case_sensitive);
1943    for (name, col) in exprs {
1944        current = with_column(&current, name, col, case_sensitive)?;
1945    }
1946    Ok(current)
1947}
1948
1949/// Rename multiple columns. PySpark withColumnsRenamed.
1950pub fn with_columns_renamed(
1951    df: &DataFrame,
1952    renames: &[(String, String)],
1953    case_sensitive: bool,
1954) -> Result<DataFrame, PolarsError> {
1955    // Apply renames one by one; skip columns that do not exist (no-op for missing),
1956    // matching PySpark withColumnsRenamed behavior for non-existent columns.
1957    let mut lf = df.lazy_frame();
1958    let mut applied_any = false;
1959    for (old_name, new_name) in renames {
1960        match df.resolve_column_name(old_name) {
1961            Ok(resolved) => {
1962                lf = lf.rename([resolved.as_str()], [new_name.as_str()], true);
1963                applied_any = true;
1964            }
1965            Err(PolarsError::ColumnNotFound(_)) => {
1966                // Non-existent column: leave DataFrame unchanged for this entry.
1967                continue;
1968            }
1969            Err(e) => return Err(e),
1970        }
1971    }
1972    if !applied_any {
1973        return Ok(df.clone());
1974    }
1975    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1976}
1977
1978/// NA sub-API builder. PySpark df.na().fill(...) / .drop(...).
1979pub struct DataFrameNa<'a> {
1980    pub(crate) df: &'a DataFrame,
1981}
1982
1983impl<'a> DataFrameNa<'a> {
1984    /// Create from a reference to a DataFrame (for root crate wrapper).
1985    pub fn new(df: &'a DataFrame) -> Self {
1986        DataFrameNa { df }
1987    }
1988
1989    /// Fill nulls with the given value. PySpark na.fill(value, subset=...).
1990    pub fn fill(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
1991        fillna(self.df, value, subset, self.df.case_sensitive)
1992    }
1993
1994    /// Replace values in columns. PySpark na.replace(to_replace, value, subset=None).
1995    pub fn replace(
1996        &self,
1997        old_value: Expr,
1998        new_value: Expr,
1999        subset: Option<Vec<&str>>,
2000    ) -> Result<DataFrame, PolarsError> {
2001        let cols: Vec<String> = match &subset {
2002            Some(s) => s.iter().map(|x| (*x).to_string()).collect(),
2003            None => self.df.columns()?,
2004        };
2005        let mut result = self.df.clone();
2006        for col_name in &cols {
2007            result = replace(
2008                &result,
2009                col_name.as_str(),
2010                old_value.clone(),
2011                new_value.clone(),
2012                self.df.case_sensitive,
2013            )?;
2014        }
2015        Ok(result)
2016    }
2017
2018    /// Drop rows with nulls. PySpark na.drop(subset=..., how=..., thresh=...).
2019    pub fn drop(
2020        &self,
2021        subset: Option<Vec<&str>>,
2022        how: &str,
2023        thresh: Option<usize>,
2024    ) -> Result<DataFrame, PolarsError> {
2025        dropna(self.df, subset, how, thresh, self.df.case_sensitive)
2026    }
2027}
2028
2029// ---------- Batch E: offset, transform, freqItems, approxQuantile, crosstab, melt, exceptAll, intersectAll ----------
2030
2031/// Skip first n rows. PySpark offset(n).
2032pub fn offset(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
2033    let lf = df.lazy_frame().slice(n as i64, u32::MAX);
2034    Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
2035}
2036
2037/// Transform DataFrame by a function. PySpark transform(func).
2038pub fn transform<F>(df: &DataFrame, f: F) -> Result<DataFrame, PolarsError>
2039where
2040    F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
2041{
2042    let df_out = f(df.clone())?;
2043    Ok(df_out)
2044}
2045
2046/// Frequent items. PySpark freqItems. Returns one row with columns {col}_freqItems (array of values with frequency >= support).
2047pub fn freq_items(
2048    df: &DataFrame,
2049    columns: &[&str],
2050    support: f64,
2051    case_sensitive: bool,
2052) -> Result<DataFrame, PolarsError> {
2053    use polars::prelude::SeriesMethods;
2054    if columns.is_empty() {
2055        return Ok(super::DataFrame::from_lazy_with_options(
2056            df.lazy_frame().slice(0, 0),
2057            case_sensitive,
2058        ));
2059    }
2060    let support = support.clamp(1e-4, 1.0);
2061    let collected = df.collect_inner()?;
2062    let pl_df = collected.as_ref();
2063    let n_total = pl_df.height() as f64;
2064    if n_total == 0.0 {
2065        let mut out = Vec::with_capacity(columns.len());
2066        for col_name in columns {
2067            let resolved = df.resolve_column_name(col_name)?;
2068            let s = pl_df
2069                .column(resolved.as_str())?
2070                .as_series()
2071                .ok_or_else(|| PolarsError::ComputeError("column not a series".into()))?
2072                .clone();
2073            let empty_sub = s.head(Some(0));
2074            let list_chunked = polars::prelude::ListChunked::from_iter([empty_sub].into_iter())
2075                .with_name(format!("{resolved}_freqItems").into());
2076            out.push(list_chunked.into_series().into());
2077        }
2078        return Ok(super::DataFrame::from_polars_with_options(
2079            polars::prelude::DataFrame::new_infer_height(out)?,
2080            case_sensitive,
2081        ));
2082    }
2083    let mut out_series = Vec::with_capacity(columns.len());
2084    for col_name in columns {
2085        let resolved = df.resolve_column_name(col_name)?;
2086        let s = pl_df
2087            .column(resolved.as_str())?
2088            .as_series()
2089            .ok_or_else(|| PolarsError::ComputeError("column not a series".into()))?
2090            .clone();
2091        let vc = s.value_counts(false, false, "counts".into(), false)?;
2092        let count_col = vc
2093            .column("counts")
2094            .map_err(|_| PolarsError::ComputeError("value_counts missing counts column".into()))?;
2095        let counts = count_col
2096            .u32()
2097            .map_err(|_| PolarsError::ComputeError("freq_items: counts column not u32".into()))?;
2098        let value_col_name = s.name();
2099        let values_col = vc
2100            .column(value_col_name.as_str())
2101            .map_err(|_| PolarsError::ComputeError("value_counts missing value column".into()))?;
2102        let threshold = (support * n_total).ceil() as u32;
2103        let indices: Vec<u32> = counts
2104            .into_iter()
2105            .enumerate()
2106            .filter_map(|(i, c)| {
2107                if c? >= threshold {
2108                    Some(i as u32)
2109                } else {
2110                    None
2111                }
2112            })
2113            .collect();
2114        let idx_series = Series::new("idx".into(), indices);
2115        let idx_ca = idx_series
2116            .u32()
2117            .map_err(|_| PolarsError::ComputeError("freq_items: index series not u32".into()))?;
2118        let values_series = values_col
2119            .as_series()
2120            .ok_or_else(|| PolarsError::ComputeError("value column not a series".into()))?;
2121        let filtered = values_series.take(idx_ca)?;
2122        let list_chunked = polars::prelude::ListChunked::from_iter([filtered].into_iter())
2123            .with_name(format!("{resolved}_freqItems").into());
2124        let list_row = list_chunked.into_series();
2125        out_series.push(list_row.into());
2126    }
2127    let out_df = polars::prelude::DataFrame::new_infer_height(out_series)?;
2128    Ok(super::DataFrame::from_polars_with_options(
2129        out_df,
2130        case_sensitive,
2131    ))
2132}
2133
2134/// Approximate quantiles. PySpark approxQuantile. Returns one column "quantile" with one row per probability.
2135pub fn approx_quantile(
2136    df: &DataFrame,
2137    column: &str,
2138    probabilities: &[f64],
2139    case_sensitive: bool,
2140) -> Result<DataFrame, PolarsError> {
2141    use polars::prelude::{ChunkQuantile, QuantileMethod};
2142    if probabilities.is_empty() {
2143        return Ok(super::DataFrame::from_polars_with_options(
2144            polars::prelude::DataFrame::new_infer_height(vec![
2145                Series::new("quantile".into(), Vec::<f64>::new()).into(),
2146            ])?,
2147            case_sensitive,
2148        ));
2149    }
2150    let resolved = df.resolve_column_name(column)?;
2151    let collected = df.collect_inner()?;
2152    let s = collected
2153        .column(resolved.as_str())?
2154        .as_series()
2155        .ok_or_else(|| PolarsError::ComputeError("approx_quantile: column not a series".into()))?
2156        .clone();
2157    let ca = series_as_f64_ca(&s, "approx_quantile")?;
2158    let mut quantiles = Vec::with_capacity(probabilities.len());
2159    for &p in probabilities {
2160        let q = ca.quantile(p, QuantileMethod::Linear)?;
2161        quantiles.push(q.unwrap_or(f64::NAN));
2162    }
2163    let out_df = polars::prelude::DataFrame::new_infer_height(vec![
2164        Series::new("quantile".into(), quantiles).into(),
2165    ])?;
2166    Ok(super::DataFrame::from_polars_with_options(
2167        out_df,
2168        case_sensitive,
2169    ))
2170}
2171
2172/// Cross-tabulation. PySpark crosstab. Returns long format (col1, col2, count); for wide format use pivot on the result.
2173pub fn crosstab(
2174    df: &DataFrame,
2175    col1: &str,
2176    col2: &str,
2177    case_sensitive: bool,
2178) -> Result<DataFrame, PolarsError> {
2179    use polars::prelude::*;
2180    let c1 = df.resolve_column_name(col1)?;
2181    let c2 = df.resolve_column_name(col2)?;
2182    let collected = df.collect_inner()?;
2183    let pl_df = collected.as_ref();
2184    let grouped = pl_df
2185        .clone()
2186        .lazy()
2187        .group_by([col(c1.as_str()), col(c2.as_str())])
2188        .agg([len().alias("count")])
2189        .collect()?;
2190    Ok(super::DataFrame::from_polars_with_options(
2191        grouped,
2192        case_sensitive,
2193    ))
2194}
2195
2196/// Unpivot (melt). PySpark melt. Long format with id_vars kept, plus "variable" and "value" columns.
2197pub fn melt(
2198    df: &DataFrame,
2199    id_vars: &[&str],
2200    value_vars: &[&str],
2201    case_sensitive: bool,
2202) -> Result<DataFrame, PolarsError> {
2203    use polars::prelude::*;
2204    let collected = df.collect_inner()?;
2205    let pl_df = collected.as_ref();
2206    if value_vars.is_empty() {
2207        return Ok(super::DataFrame::from_polars_with_options(
2208            pl_df.head(Some(0)),
2209            case_sensitive,
2210        ));
2211    }
2212    let id_resolved: Vec<String> = id_vars
2213        .iter()
2214        .map(|s| df.resolve_column_name(s).map(|r| r.to_string()))
2215        .collect::<Result<Vec<_>, _>>()?;
2216    let value_resolved: Vec<String> = value_vars
2217        .iter()
2218        .map(|s| df.resolve_column_name(s).map(|r| r.to_string()))
2219        .collect::<Result<Vec<_>, _>>()?;
2220    let mut parts = Vec::with_capacity(value_vars.len());
2221    for vname in &value_resolved {
2222        let select_cols: Vec<&str> = id_resolved
2223            .iter()
2224            .map(|s| s.as_str())
2225            .chain([vname.as_str()])
2226            .collect();
2227        let mut part = pl_df.select(select_cols)?;
2228        let var_series = Series::new("variable".into(), vec![vname.as_str(); part.height()]);
2229        part.with_column(var_series.into())?;
2230        part.rename(vname.as_str(), "value".into())?;
2231        parts.push(part);
2232    }
2233    let mut out = parts
2234        .first()
2235        .ok_or_else(|| PolarsError::ComputeError("melt: no value columns".into()))?
2236        .clone();
2237    for p in parts.iter().skip(1) {
2238        out.vstack_mut(p)?;
2239    }
2240    let col_order: Vec<&str> = id_resolved
2241        .iter()
2242        .map(|s| s.as_str())
2243        .chain(["variable", "value"])
2244        .collect();
2245    let out = out.select(col_order)?;
2246    Ok(super::DataFrame::from_polars_with_options(
2247        out,
2248        case_sensitive,
2249    ))
2250}
2251
/// Set difference keeping duplicates. PySpark exceptAll. Simple impl: same as subtract.
///
/// Thin wrapper: `left`, `right` and `case_sensitive` are forwarded unchanged to
/// [`subtract`], and its result (or error) is returned as-is.
///
/// NOTE(review): because the work is delegated to `subtract`, duplicate handling is
/// whatever `subtract` does — confirm it matches the "keeping duplicates" contract.
pub fn except_all(
    left: &DataFrame,
    right: &DataFrame,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    subtract(left, right, case_sensitive)
}
2260
/// Set intersection keeping duplicates. PySpark intersectAll. Simple impl: same as intersect.
///
/// Thin wrapper: `left`, `right` and `case_sensitive` are forwarded unchanged to
/// [`intersect`], and its result (or error) is returned as-is.
///
/// NOTE(review): because the work is delegated to `intersect`, duplicate handling is
/// whatever `intersect` does — confirm it matches the "keeping duplicates" contract.
pub fn intersect_all(
    left: &DataFrame,
    right: &DataFrame,
    case_sensitive: bool,
) -> Result<DataFrame, PolarsError> {
    intersect(left, right, case_sensitive)
}
2269
2270#[cfg(test)]
2271mod tests {
2272    use super::{
2273        SelectItem, distinct, drop, dropna, filter, first, head, limit, offset, order_by,
2274        select_items, union, union_by_name, with_column,
2275    };
2276    use crate::column::Column;
2277    use crate::functions;
2278    use crate::{DataFrame, SparkSession};
2279    use polars::prelude::{col, concat_str, lit};
2280    use serde_json::json;
2281
2282    fn test_df() -> DataFrame {
2283        let spark = SparkSession::builder()
2284            .app_name("transform_tests")
2285            .get_or_create();
2286        spark
2287            .create_dataframe(
2288                vec![
2289                    (1i64, 10i64, "a".to_string()),
2290                    (2i64, 20i64, "b".to_string()),
2291                    (3i64, 30i64, "c".to_string()),
2292                ],
2293                vec!["id", "v", "label"],
2294            )
2295            .unwrap()
2296    }
2297
2298    #[test]
2299    fn limit_zero() {
2300        let df = test_df();
2301        let out = limit(&df, 0, false).unwrap();
2302        assert_eq!(out.count().unwrap(), 0);
2303    }
2304
2305    #[test]
2306    fn limit_more_than_rows() {
2307        let df = test_df();
2308        let out = limit(&df, 10, false).unwrap();
2309        assert_eq!(out.count().unwrap(), 3);
2310    }
2311
2312    #[test]
2313    fn distinct_on_empty() {
2314        let spark = SparkSession::builder()
2315            .app_name("transform_tests")
2316            .get_or_create();
2317        let df = spark
2318            .create_dataframe(vec![] as Vec<(i64, i64, String)>, vec!["a", "b", "c"])
2319            .unwrap();
2320        let out = distinct(&df, None, false).unwrap();
2321        assert_eq!(out.count().unwrap(), 0);
2322    }
2323
2324    #[test]
2325    fn first_returns_one_row() {
2326        let df = test_df();
2327        let out = first(&df, false).unwrap();
2328        assert_eq!(out.count().unwrap(), 1);
2329    }
2330
2331    /// Issue #579: first() after orderBy must return first row in sort order, not storage order.
2332    #[test]
2333    fn first_after_order_by_returns_first_in_sort_order() {
2334        use polars::prelude::df;
2335        let spark = SparkSession::builder()
2336            .app_name("transform_tests")
2337            .get_or_create();
2338        let pl = df![
2339            "name" => ["Charlie", "Alice", "Bob"],
2340            "value" => [3i64, 1i64, 2i64],
2341        ]
2342        .unwrap();
2343        let df = spark.create_dataframe_from_polars(pl);
2344        let ordered = order_by(&df, vec!["value"], vec![true], false).unwrap();
2345        let one = first(&ordered, false).unwrap();
2346        let collected = one.collect_inner().unwrap();
2347        let name_series = collected.column("name").unwrap();
2348        let first_name = name_series.str().unwrap().get(0).unwrap();
2349        assert_eq!(
2350            first_name, "Alice",
2351            "first() after orderBy(value) must return row with min value (Alice=1), not first in storage (Charlie)"
2352        );
2353    }
2354
    /// Issue #1253: to_timestamp accepts StringType, TimestampType, IntegerType, LongType, DateType, DoubleType (PySpark).
    ///
    /// This test exercises four of those inputs — int, long, date and double — each as a
    /// single-row frame. Only the int case checks that "parsed" serializes as a JSON string;
    /// the other three only check that one row comes back.
    #[test]
    fn with_column_to_timestamp_accepts_multiple_types() {
        let spark = SparkSession::builder()
            .app_name("to_timestamp_types_test")
            .get_or_create();

        // IntegerType (int) -> unix seconds
        let rows_int = vec![vec![json!(1672574400)]];
        let schema_int = vec![("unix_ts".to_string(), "int".to_string())];
        let df_int = spark
            .create_dataframe_from_rows(rows_int, schema_int, false, false)
            .unwrap();
        let col_ts = functions::to_timestamp(&df_int.column("unix_ts").unwrap(), None).unwrap();
        let out_int = with_column(&df_int, "parsed", &col_ts, false).unwrap();
        let rows_out = out_int.collect_as_json_rows().unwrap();
        assert_eq!(rows_out.len(), 1);
        // The parsed value must be present and rendered as a JSON string.
        assert!(rows_out[0].get("parsed").and_then(|v| v.as_str()).is_some());

        // LongType (long) -> unix seconds
        let rows_long = vec![vec![json!(1672574400)]];
        let schema_long = vec![("unix_ts".to_string(), "long".to_string())];
        let df_long = spark
            .create_dataframe_from_rows(rows_long, schema_long, false, false)
            .unwrap();
        let col_ts_long =
            functions::to_timestamp(&df_long.column("unix_ts").unwrap(), None).unwrap();
        let out_long = with_column(&df_long, "parsed", &col_ts_long, false).unwrap();
        assert_eq!(out_long.collect_as_json_rows().unwrap().len(), 1);

        // DateType (date) -> date to timestamp
        let rows_date = vec![vec![json!("2023-01-01")]];
        let schema_date = vec![("date_col".to_string(), "date".to_string())];
        let df_date = spark
            .create_dataframe_from_rows(rows_date, schema_date, false, false)
            .unwrap();
        let col_ts_date =
            functions::to_timestamp(&df_date.column("date_col").unwrap(), None).unwrap();
        let out_date = with_column(&df_date, "parsed", &col_ts_date, false).unwrap();
        assert_eq!(out_date.collect_as_json_rows().unwrap().len(), 1);

        // DoubleType (double) -> unix seconds with fraction
        let rows_double = vec![vec![json!(1672574400.5)]];
        let schema_double = vec![("unix_ts".to_string(), "double".to_string())];
        let df_double = spark
            .create_dataframe_from_rows(rows_double, schema_double, false, false)
            .unwrap();
        let col_ts_double =
            functions::to_timestamp(&df_double.column("unix_ts").unwrap(), None).unwrap();
        let out_double = with_column(&df_double, "parsed", &col_ts_double, false).unwrap();
        assert_eq!(out_double.collect_as_json_rows().unwrap().len(), 1);
    }
2407
2408    /// to_timestamp(regexp_replace(col, r"\.\d+", "").cast("string"), "yyyy-MM-dd'T'HH:mm:ss"):
2409    /// behavior is column-name independent; regex strips fractional seconds, format parses → non-null.
2410    #[test]
2411    fn to_timestamp_after_regexp_replace_cast_string_parses_successfully() {
2412        use polars::prelude::{NamedFrom, Series};
2413        let spark = SparkSession::builder()
2414            .app_name("to_timestamp_regexp_test")
2415            .get_or_create();
2416        let impression_id = Series::new("impression_id".into(), &["IMP-001", "IMP-002", "IMP-003"]);
2417        let impression_date = Series::new(
2418            "impression_date".into(),
2419            &[
2420                "2025-03-07T19:34:56.123456",
2421                "2025-03-07T18:00:00.0",
2422                "2025-03-06T12:00:00.999",
2423            ],
2424        );
2425        let pl = polars::prelude::DataFrame::new_infer_height(vec![
2426            impression_id.into(),
2427            impression_date.into(),
2428        ])
2429        .unwrap();
2430        let df = spark.create_dataframe_from_polars(pl);
2431        let c = df.column("impression_date").unwrap();
2432        let replaced = functions::regexp_replace(&c, r"\.\d+", "");
2433        let casted = replaced.cast_to("string").unwrap();
2434        let ts_col = functions::to_timestamp(&casted, Some("yyyy-MM-dd'T'HH:mm:ss")).unwrap();
2435        let silver = with_column(&df, "impression_date_parsed", &ts_col, false).unwrap();
2436        let selected = select_items(
2437            &silver,
2438            vec![
2439                SelectItem::ColumnName("impression_id"),
2440                SelectItem::ColumnName("impression_date_parsed"),
2441            ],
2442            false,
2443        )
2444        .unwrap();
2445        let cond = functions::col("impression_id")
2446            .is_not_null()
2447            .and_(&functions::col("impression_date_parsed").is_not_null());
2448        let valid = filter(&selected, cond.into_expr(), false).unwrap();
2449        let count = valid.count().unwrap();
2450        assert_eq!(
2451            count, 3,
2452            "regex strips fractional seconds, format parses; all 3 rows valid"
2453        );
2454    }
2455
2456    /// Fused path (#153): fixed 2024 strings → all non-null (parsed timestamp not "recent").
2457    #[test]
2458    fn to_timestamp_fused_strip_fraction_fixed_2024_strings_non_null() {
2459        use polars::prelude::{NamedFrom, Series};
2460        let spark = SparkSession::builder()
2461            .app_name("to_timestamp_fused_fixed")
2462            .get_or_create();
2463        let id = Series::new("id".into(), &["a", "b", "c"]);
2464        let date_string = Series::new(
2465            "date_string".into(),
2466            &[
2467                "2024-01-15T10:30:45.123456",
2468                "2024-01-16T14:20:30.789012",
2469                "2024-01-17T09:15:22.456789",
2470            ],
2471        );
2472        let pl = polars::prelude::DataFrame::new_infer_height(vec![id.into(), date_string.into()])
2473            .unwrap();
2474        let df = spark.create_dataframe_from_polars(pl);
2475        let c = df.column("date_string").unwrap();
2476        let ts_col =
2477            functions::to_timestamp_fused_strip_fraction(&c, "yyyy-MM-dd'T'HH:mm:ss").unwrap();
2478        let out = with_column(&df, "date_parsed", &ts_col, false).unwrap();
2479        let non_null = out
2480            .filter(functions::col("date_parsed").is_not_null().into_expr())
2481            .unwrap()
2482            .count()
2483            .unwrap();
2484        assert_eq!(
2485            non_null, 3,
2486            "fixed 2024 strings: fused path returns non-null for all"
2487        );
2488    }
2489
2490    /// Fused path (#168): recent (dynamic) strings → all null (parsed timestamp within 31 days of ref_ts).
2491    #[test]
2492    fn to_timestamp_fused_strip_fraction_recent_strings_null() {
2493        use chrono::TimeDelta;
2494        use polars::prelude::{NamedFrom, Series};
2495        let spark = SparkSession::builder()
2496            .app_name("to_timestamp_fused_recent")
2497            .get_or_create();
2498        let now = chrono::Utc::now();
2499        let strings: Vec<String> = (0..3)
2500            .map(|i| {
2501                (now - TimeDelta::hours(i))
2502                    .format("%Y-%m-%dT%H:%M:%S%.6f")
2503                    .to_string()
2504            })
2505            .collect();
2506        let id = Series::new("id".into(), &["a", "b", "c"]);
2507        let date_string = Series::new("date_string".into(), strings.as_slice());
2508        let pl = polars::prelude::DataFrame::new_infer_height(vec![id.into(), date_string.into()])
2509            .unwrap();
2510        let df = spark.create_dataframe_from_polars(pl);
2511        let c = df.column("date_string").unwrap();
2512        let ts_col =
2513            functions::to_timestamp_fused_strip_fraction(&c, "yyyy-MM-dd'T'HH:mm:ss").unwrap();
2514        let out = with_column(&df, "date_parsed", &ts_col, false).unwrap();
2515        let non_null = out
2516            .filter(functions::col("date_parsed").is_not_null().into_expr())
2517            .unwrap()
2518            .count()
2519            .unwrap();
2520        assert_eq!(
2521            non_null, 0,
2522            "recent strings: fused path returns null for all (#168 parity)"
2523        );
2524    }
2525
2526    /// Issue #1054 / #293: with_column(explode(col)) must expand rows and preserve original list column.
2527    #[test]
2528    fn with_column_explode_adds_column_and_expands_rows() {
2529        use polars::chunked_array::builder::ListStringChunkedBuilder;
2530        use polars::prelude::{IntoSeries, ListBuilderTrait, NamedFrom, Series};
2531        let spark = SparkSession::builder()
2532            .app_name("with_column_explode_test")
2533            .get_or_create();
2534        let names = Series::new("Name".into(), &["Alice", "Bob", "Charlie"]);
2535        let mut list_builder = ListStringChunkedBuilder::new("Value".into(), 3, 16);
2536        list_builder.append_values_iter(["1", "2"].iter().copied());
2537        list_builder.append_values_iter(["2", "3"].iter().copied());
2538        list_builder.append_values_iter(["4", "5"].iter().copied());
2539        let value_series = list_builder.finish().into_series();
2540        let pl =
2541            polars::prelude::DataFrame::new_infer_height(vec![names.into(), value_series.into()])
2542                .unwrap();
2543        let df = spark.create_dataframe_from_polars(pl);
2544        let col_explode = functions::explode(&df.column("Value").unwrap());
2545        let out = with_column(&df, "ExplodedValue", &col_explode, false).unwrap();
2546        assert_eq!(
2547            out.count().unwrap(),
2548            6,
2549            "explode should produce 6 rows (2+2+2)"
2550        );
2551        let cols = out.columns().unwrap();
2552        assert!(cols.iter().any(|c| c == "Name"));
2553        assert!(cols.iter().any(|c| c == "Value"));
2554        assert!(cols.iter().any(|c| c == "ExplodedValue"));
2555    }
2556
2557    /// Issue #1111: with_column(map_col[col("Value")]) must resolve "Value" so map lookup works.
2558    #[test]
2559    fn with_column_map_get_with_column_key_resolves_key() {
2560        use polars::prelude::{NamedFrom, Series};
2561        let spark = SparkSession::builder()
2562            .app_name("map_get_test")
2563            .get_or_create();
2564        let names = Series::new("Name".into(), &["Alice", "Bob"]);
2565        let values = Series::new("Value".into(), [1i64, 3i64]);
2566        let pl = polars::prelude::DataFrame::new_infer_height(vec![names.into(), values.into()])
2567            .unwrap();
2568        let df = spark.create_dataframe_from_polars(pl);
2569        let mapping = functions::create_map(&[
2570            &functions::lit_i64(1),
2571            &functions::lit_str("Small"),
2572            &functions::lit_i64(2),
2573            &functions::lit_str("Medium"),
2574            &functions::lit_i64(3),
2575            &functions::lit_str("Large"),
2576        ])
2577        .unwrap();
2578        let size_col = mapping.get(&functions::col("Value"));
2579        let out = with_column(&df, "Size", &size_col, false).unwrap();
2580        let rows = out.collect_as_json_rows().unwrap();
2581        assert_eq!(rows.len(), 2);
2582        assert_eq!(rows[0].get("Size").and_then(|v| v.as_str()), Some("Small"));
2583        assert_eq!(rows[1].get("Size").and_then(|v| v.as_str()), Some("Large"));
2584    }
2585
2586    /// Issue #1054: select(Name, Value, explode(Value).alias("ExplodedValue")) must preserve list column and expand rows.
2587    #[test]
2588    fn select_with_explode_alias_preserves_list_column() {
2589        use polars::chunked_array::builder::ListStringChunkedBuilder;
2590        use polars::prelude::{ExplodeOptions, IntoSeries, ListBuilderTrait, NamedFrom, Series};
2591        let spark = SparkSession::builder()
2592            .app_name("select_explode_test")
2593            .get_or_create();
2594        let names = Series::new("Name".into(), &["Alice", "Bob"]);
2595        let mut list_builder = ListStringChunkedBuilder::new("Value".into(), 2, 8);
2596        list_builder.append_values_iter(["1", "2"].iter().copied());
2597        list_builder.append_values_iter(["2", "3"].iter().copied());
2598        let value_series = list_builder.finish().into_series();
2599        let pl =
2600            polars::prelude::DataFrame::new_infer_height(vec![names.into(), value_series.into()])
2601                .unwrap();
2602        let df = spark.create_dataframe_from_polars(pl);
2603        let explode_expr = polars::prelude::col("Value")
2604            .explode(ExplodeOptions {
2605                empty_as_null: false,
2606                keep_nulls: false,
2607            })
2608            .alias("ExplodedValue");
2609        let items = vec![
2610            SelectItem::ColumnName("Name"),
2611            SelectItem::ColumnName("Value"),
2612            SelectItem::Expr(explode_expr),
2613        ];
2614        let out = select_items(&df, items, false).unwrap();
2615        assert_eq!(
2616            out.count().unwrap(),
2617            4,
2618            "select with explode should produce 4 rows (2+2)"
2619        );
2620        let cols = out.columns().unwrap();
2621        assert!(cols.iter().any(|c| c == "Name"));
2622        assert!(cols.iter().any(|c| c == "Value"));
2623        assert!(cols.iter().any(|c| c == "ExplodedValue"));
2624    }
2625
    /// #1267: select("dept", "salary", row_number().over(w)) must preserve dept/salary values.
    ///
    /// Checks the first output row: dept "A", salary 100 and row number 1.
    #[test]
    fn select_items_with_window_preserves_column_values() {
        let spark = SparkSession::builder()
            .app_name("select_window_1267")
            .get_or_create();
        let rows = vec![vec![json!("A"), json!(100)], vec![json!("A"), json!(200)]];
        let schema = vec![
            ("dept".to_string(), "string".to_string()),
            ("salary".to_string(), "bigint".to_string()),
        ];
        let df = spark
            .create_dataframe_from_rows(rows, schema, false, false)
            .unwrap();
        let rank_col = Column::row_number_over(&["dept"], &["salary".to_string()]).unwrap();
        let rank_expr = rank_col.into_expr().alias("rn");
        let items = vec![
            SelectItem::ColumnName("dept"),
            SelectItem::ColumnName("salary"),
            SelectItem::Expr(rank_expr),
        ];
        let out = select_items(&df, items, false).unwrap();
        let rows_out = out.collect_as_json_rows().unwrap();
        assert_eq!(rows_out.len(), 2, "expected 2 rows");
        let first = &rows_out[0];
        assert_eq!(
            first.get("dept").and_then(|v| v.as_str()),
            Some("A"),
            "first row dept must be A (#1267)"
        );
        assert_eq!(
            first.get("salary").and_then(|v| v.as_i64()),
            Some(100),
            "first row salary must be 100"
        );
        assert_eq!(
            first.get("rn").and_then(|v| v.as_i64()),
            Some(1),
            "first row rn must be 1"
        );
    }

    /// Issue #297: selecting an ambiguous column name with different casing (e.g. "NaMe")
    /// after a join where both "name" and "NAME" exist should:
    /// - pick the first matching physical column (left side of the join), and
    /// - expose it under the requested spelling ("NaMe").
    #[test]
    fn select_items_ambiguous_case_prefers_first_match_and_uses_requested_name() {
        use polars::prelude::df;

        let spark = SparkSession::builder()
            .app_name("select_ambiguous_case")
            .get_or_create();
        let left_pl = df!("name" => &["Alice"], "value" => &[1i64]).unwrap();
        let right_pl = df!("NAME" => &["Bob"], "other" => &[2i64]).unwrap();
        let left = spark.create_dataframe_from_polars(left_pl);
        let right = spark.create_dataframe_from_polars(right_pl);
        // Join on "Name" so both "name" and "NAME" columns are present after the join.
        let joined = left
            .join(&right, vec!["Name"], crate::dataframe::JoinType::Left)
            .unwrap();

        let out = select_items(&joined, vec![SelectItem::ColumnName("NaMe")], false).unwrap();
        let cols = out.columns().unwrap();
        assert!(
            cols.contains(&"NaMe".to_string()),
            "ambiguous select must expose requested spelling"
        );

        // The value must come from the left frame ("Alice"), not the right one ("Bob").
        let pl = out.collect().unwrap();
        let name_series = pl.column("NaMe").unwrap().str().unwrap();
        assert_eq!(name_series.get(0).unwrap(), "Alice");
    }
2699
2700    #[test]
2701    fn head_n() {
2702        let df = test_df();
2703        let out = head(&df, 2, false).unwrap();
2704        assert_eq!(out.count().unwrap(), 2);
2705    }
2706
2707    #[test]
2708    fn offset_skip_first() {
2709        let df = test_df();
2710        let out = offset(&df, 1, false).unwrap();
2711        assert_eq!(out.count().unwrap(), 2);
2712    }
2713
2714    #[test]
2715    fn offset_beyond_length_returns_empty() {
2716        let df = test_df();
2717        let out = offset(&df, 10, false).unwrap();
2718        assert_eq!(out.count().unwrap(), 0);
2719    }
2720
2721    #[test]
2722    fn drop_column() {
2723        let df = test_df();
2724        let out = drop(&df, vec!["v"], false).unwrap();
2725        let cols = out.columns().unwrap();
2726        assert!(!cols.contains(&"v".to_string()));
2727        assert_eq!(out.count().unwrap(), 3);
2728    }
2729
2730    /// #681: union (same column order) with Int64 vs String in same position coerces to common type.
2731    #[test]
2732    fn union_coerces_int_str_same_position() {
2733        use polars::prelude::df;
2734
2735        let spark = SparkSession::builder()
2736            .app_name("transform_tests")
2737            .get_or_create();
2738        let left_pl = df!("id" => &[1i64, 2i64], "name" => &["a", "b"]).unwrap();
2739        let right_pl = df!("id" => &["3", "4"], "name" => &["c", "d"]).unwrap();
2740        let left = spark.create_dataframe_from_polars(left_pl);
2741        let right = spark.create_dataframe_from_polars(right_pl);
2742        let out = union(&left, &right, false).expect("#681: union must coerce id Int64 vs String");
2743        assert_eq!(out.count().unwrap(), 4);
2744        let cols = out.columns().unwrap();
2745        assert_eq!(cols.len(), 2);
2746        assert!(cols.contains(&"id".to_string()));
2747        assert!(cols.contains(&"name".to_string()));
2748        // #1262: collected rows must have id as string (PySpark union coerces numeric+string to string).
2749        let (_names, rows, schema) = out.collect_as_json_rows_with_names().unwrap();
2750        let id_field = schema.fields().iter().find(|f| f.name == "id").unwrap();
2751        assert!(matches!(
2752            id_field.data_type,
2753            robin_sparkless_core::DataType::String
2754        ));
2755        for row in &rows {
2756            let id_val = row.get("id").unwrap();
2757            assert!(
2758                matches!(id_val, serde_json::Value::String(_)),
2759                "id should be string, got {id_val:?}"
2760            );
2761        }
2762    }
2763
2764    /// Union with same column names in different order: right reordered to left's order (PR1).
2765    #[test]
2766    fn union_same_names_different_order() {
2767        use polars::prelude::df;
2768
2769        let spark = SparkSession::builder()
2770            .app_name("transform_tests")
2771            .get_or_create();
2772        let left_pl = df!("a" => &[1i64, 2i64], "b" => &["x", "y"]).unwrap();
2773        let right_pl = df!("b" => &["p", "q"], "a" => &[3i64, 4i64]).unwrap();
2774        let left = spark.create_dataframe_from_polars(left_pl);
2775        let right = spark.create_dataframe_from_polars(right_pl);
2776        let out = union(&left, &right, false).expect("union by name set should reorder right");
2777        assert_eq!(out.count().unwrap(), 4);
2778        let cols = out.columns().unwrap();
2779        assert_eq!(cols[0], "a");
2780        assert_eq!(cols[1], "b");
2781    }
2782
2783    /// Issue #603: unionByName with same-named columns of different types (e.g. id Int vs id String) must coerce and succeed.
2784    #[test]
2785    fn union_by_name_coerces_different_column_types() {
2786        use polars::prelude::df;
2787
2788        let spark = SparkSession::builder()
2789            .app_name("transform_tests")
2790            .get_or_create();
2791        let left_pl = df!("id" => &[1i64], "name" => &["a"]).unwrap();
2792        let left = spark.create_dataframe_from_polars(left_pl);
2793        let schema = vec![
2794            ("id".to_string(), "string".to_string()),
2795            ("name".to_string(), "string".to_string()),
2796        ];
2797        let right = spark
2798            .create_dataframe_from_rows(vec![vec![json!("2"), json!("b")]], schema, false, false)
2799            .unwrap();
2800        let out = union_by_name(&left, &right, true, false)
2801            .expect("issue #603: union_by_name must coerce id Int64 vs String");
2802        assert_eq!(out.count().unwrap(), 2);
2803    }
2804
2805    #[test]
2806    fn dropna_all_columns() {
2807        let df = test_df();
2808        let out = dropna(&df, None, "any", None, false).unwrap();
2809        assert_eq!(out.count().unwrap(), 3);
2810    }
2811
2812    /// #722: na.drop(subset=["NonExistentColumn"]) must raise (column not found).
2813    #[test]
2814    fn dropna_invalid_subset_column_raises() {
2815        let df = test_df();
2816        let result = dropna(&df, Some(vec!["NonExistentColumn"]), "any", None, false);
2817        match &result {
2818            Err(e) => assert!(
2819                e.to_string().to_lowercase().contains("not found")
2820                    || e.to_string().to_lowercase().contains("column"),
2821                "expected column-not-found error, got: {}",
2822                e
2823            ),
2824            Ok(_) => panic!("expected error for dropna with non-existent subset column"),
2825        }
2826    }
2827
2828    /// #1105: filter(col("full_id") == "rec1_cust1") after withColumn(full_id) must return 1 row.
2829    /// Condition is passed as Column.into_expr() (Alias(BinaryExpr(...))); expr_coerce_to_boolean must not coerce comparison operands.
2830    #[test]
2831    fn filter_string_equality_after_with_column() {
2832        let spark = SparkSession::builder()
2833            .app_name("filter_string_eq_test")
2834            .get_or_create();
2835        let pl = polars::prelude::df!["record_id" => &["rec1"], "cust_id" => &["cust1"]].unwrap();
2836        let df = spark.create_dataframe_from_polars(pl);
2837        let transformed = df
2838            .with_column_renamed("record_id", "id")
2839            .unwrap()
2840            .with_column_renamed("cust_id", "customer_id")
2841            .unwrap();
2842        let full_id_expr = concat_str(&[col("id"), col("customer_id")], "_", false);
2843        let transformed = with_column(
2844            &transformed,
2845            "full_id",
2846            &Column::from_expr(full_id_expr, None),
2847            false,
2848        )
2849        .unwrap();
2850        let transformed = select_items(
2851            &transformed,
2852            vec![
2853                SelectItem::Expr(col("id")),
2854                SelectItem::Expr(col("customer_id")),
2855                SelectItem::Expr(col("full_id")),
2856            ],
2857            false,
2858        )
2859        .unwrap();
2860        // Simulate Python: col("full_id") == "rec1_cust1" -> Column.into_expr() is Alias(BinaryExpr(...))
2861        let condition = Column::new("full_id".to_string())
2862            .eq(lit("rec1_cust1"))
2863            .into_expr();
2864        let result = filter(&transformed, condition, false).unwrap();
2865        assert_eq!(
2866            result.count().unwrap(),
2867            1,
2868            "#1105: filter on string column must return 1 row"
2869        );
2870    }
2871}