1use super::DataFrame;
7use crate::column::expect_col;
8use crate::functions::SortOrder;
9use crate::type_coercion::{coerce_expr_pair, find_common_type, is_numeric_public};
10use crate::udfs;
11use polars::prelude::{
12 DataType, Expr, Float64Chunked, IntoLazy, IntoSeries, NamedFrom, PlSmallStr, PolarsError,
13 SchemaNamesAndDtypes, Selector, Series, UniqueKeepStrategy, col, len, lit, repeat,
14};
15use std::cell::RefCell;
16use std::collections::{HashMap, HashSet};
17
18fn expr_contains_over(expr: &Expr) -> bool {
21 let found = RefCell::new(false);
22 let _ = expr.clone().try_map_expr(|e| {
23 if matches!(&e, Expr::Over { .. }) {
24 *found.borrow_mut() = true;
25 }
26 Ok(e)
27 });
28 found.into_inner()
29}
30
31fn expr_referenced_columns(expr: &Expr) -> HashSet<String> {
33 let refs = RefCell::new(HashSet::<String>::new());
34 let _ = expr.clone().try_map_expr(|e| {
35 if let Expr::Column(n) = &e {
36 refs.borrow_mut().insert(n.as_str().to_string());
37 }
38 Ok(e)
39 });
40 refs.into_inner()
41}
42
43fn expr_refs_column(expr: &Expr, column_name: &str) -> bool {
45 expr_referenced_columns(expr).contains(column_name)
46}
47
48fn find_explode_in_expr(expr: &Expr) -> Option<(Arc<Expr>, polars::prelude::ExplodeOptions)> {
50 if let Expr::Explode { input, options } = expr {
51 return Some((input.clone(), *options));
52 }
53 if let Expr::Alias(inner, _) = expr {
54 return find_explode_in_expr(inner.as_ref());
55 }
56 let out = RefCell::new(None);
58 let _ = expr.clone().try_map_expr(|e| {
59 if out.borrow().is_none() {
60 if let Expr::Explode { input, options } = &e {
61 out.borrow_mut().replace((input.clone(), *options));
62 }
63 }
64 Ok(e)
65 });
66 out.into_inner()
67}
68
69fn expand_pure_literal_to_rows(expr: Expr, first_col: Option<&str>) -> Result<Expr, PolarsError> {
72 let (inner, alias): (Expr, Option<PlSmallStr>) = match &expr {
73 Expr::Alias(e, name) => (e.as_ref().clone(), Some(name.clone())),
74 _ => (expr.clone(), None),
75 };
76 let expanded: Option<Expr> = match &inner {
78 Expr::Literal(lv) if lv.get_datatype() == DataType::String => {
79 let lit_val = lv.extract_str().unwrap_or("").to_string();
80 let out_dtype = DataType::String;
81 Some(if let Some(fc) = first_col {
82 let fc = fc.to_string();
83 use polars::datatypes::Field;
84 col(PlSmallStr::from(fc.as_str())).map(
85 move |c| expect_col(udfs::apply_literal_string_repeat(c, &lit_val)),
86 move |_schema, _field| Ok(Field::new("literal".into(), out_dtype.clone())),
87 )
88 } else {
89 repeat(lit(lit_val), len().cast(DataType::UInt32))
91 })
92 }
93 Expr::Cast { expr: e, dtype, .. } => {
94 if matches!(e.as_ref(), Expr::Literal(lv) if lv.is_null()) {
96 let out_dtype = match dtype {
97 polars::prelude::DataTypeExpr::Literal(dt)
98 if matches!(dt, DataType::List(_)) =>
99 {
100 dt.clone()
101 }
102 _ => return Ok(expr),
103 };
104 Some(if let Some(fc) = first_col {
105 let fc = fc.to_string();
106 let out_dtype_for_apply = out_dtype.clone();
107 let out_dtype_for_field = out_dtype.clone();
108 use polars::datatypes::Field;
109 col(PlSmallStr::from(fc.as_str())).map(
110 move |c| {
111 expect_col(udfs::apply_literal_null_list_repeat(
112 c,
113 &out_dtype_for_apply,
114 ))
115 },
116 move |_schema, _field| {
117 Ok(Field::new("literal".into(), out_dtype_for_field.clone()))
118 },
119 )
120 } else {
121 inner.clone()
123 })
124 } else {
125 None
126 }
127 }
128 _ => None,
129 };
130
131 let expanded = match expanded {
132 Some(e) => e,
133 None => return Ok(expr),
134 };
135 Ok(if let Some(name) = alias {
136 expanded.alias(name.as_str())
137 } else {
138 expanded
139 })
140}
141
142fn series_as_f64_ca(s: &Series, context: &str) -> Result<Float64Chunked, PolarsError> {
143 let s_f64 = s.cast(&DataType::Float64)?;
144 let ca = s_f64.f64().map_err(|_| {
145 PolarsError::ComputeError(format!("{}: need numeric/f64 column", context).into())
146 })?;
147 Ok(ca.clone())
148}
149use std::sync::Arc;
150
151pub fn select(
153 df: &DataFrame,
154 cols: Vec<&str>,
155 case_sensitive: bool,
156) -> Result<DataFrame, PolarsError> {
157 let resolved: Vec<String> = cols
158 .iter()
159 .map(|c| df.resolve_column_name(c))
160 .collect::<Result<Vec<_>, _>>()?;
161 let exprs: Vec<Expr> = resolved.iter().map(|s| col(s.as_str())).collect();
162 let lf = df.lazy_frame().select(&exprs);
163 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
164}
165
166pub fn select_with_exprs(
171 df: &DataFrame,
172 exprs: Vec<Expr>,
173 case_sensitive: bool,
174 exprs_already_resolved: bool,
175) -> Result<DataFrame, PolarsError> {
176 let exprs: Vec<Expr> = if exprs_already_resolved {
177 exprs
178 } else {
179 exprs
180 .into_iter()
181 .map(|e| df.resolve_expr_column_names(e))
182 .collect::<Result<Vec<_>, _>>()?
183 };
184 let exprs: Vec<Expr> = exprs
185 .into_iter()
186 .map(|e| df.coerce_string_numeric_comparisons(e))
187 .collect::<Result<Vec<_>, _>>()?;
188 let df_columns: HashSet<String> = df
189 .columns()
190 .ok()
191 .map(|c| c.into_iter().map(|s| s.as_str().to_string()).collect())
192 .unwrap_or_default();
193 let first_col = df_columns.iter().next().map(String::as_str);
194 let exprs: Vec<Expr> = exprs
195 .into_iter()
196 .map(|e| expand_pure_literal_to_rows(e.clone(), first_col).unwrap_or(e))
197 .collect();
198
199 type PosexplodeTarget = (
201 PlSmallStr,
202 polars::prelude::ExplodeOptions,
203 [(PlSmallStr, usize); 2], );
205 let posexplode_target: Option<PosexplodeTarget> = {
206 let mut by_col: HashMap<String, Vec<(polars::prelude::ExplodeOptions, String, usize)>> =
207 HashMap::new();
208 for (i, e) in exprs.iter().enumerate() {
209 let (inner, alias_name) = match e {
210 Expr::Alias(inner, name) => (inner.as_ref(), name.as_str().to_string()),
211 _ => continue,
212 };
213 if let Some((input, options)) = find_explode_in_expr(inner) {
214 let refs = expr_referenced_columns(input.as_ref());
215 let frame_refs: Vec<String> = refs.intersection(&df_columns).cloned().collect();
217 if frame_refs.len() == 1 {
218 let list_col = frame_refs.into_iter().next().unwrap();
219 by_col
220 .entry(list_col.clone())
221 .or_default()
222 .push((options, alias_name, i));
223 }
224 }
225 }
226 by_col
227 .into_iter()
228 .find(|(_, v)| v.len() == 2)
229 .map(|(list_col, mut v)| {
230 v.sort_by_key(|(_, _, i)| *i);
231 let list_col = PlSmallStr::from(list_col.as_str());
232 let options = v[0].0;
233 (
234 list_col,
235 options,
236 [
237 (PlSmallStr::from(v[0].1.as_str()), v[0].2),
238 (PlSmallStr::from(v[1].1.as_str()), v[1].2),
239 ],
240 )
241 })
242 };
243
244 let posexplode_pe_col = posexplode_target
249 .as_ref()
250 .map(|(lc, _, _)| format!("__pe_{}", lc.as_str()));
251
252 type ExplodeTarget = (
253 PlSmallStr,
254 Option<PlSmallStr>,
255 polars::prelude::ExplodeOptions,
256 );
257 let mut explode_target: Option<ExplodeTarget> = None;
258 let exprs: Vec<Expr> = exprs
259 .into_iter()
260 .enumerate()
261 .map(|(i, e)| {
262 if let (Some(pt), Some(pe_col)) = (&posexplode_target, &posexplode_pe_col) {
263 if i == pt.2[0].1 {
264 return col(pe_col.as_str())
265 .struct_()
266 .field_by_name("pos")
267 .alias(pt.2[0].0.as_str());
268 }
269 if i == pt.2[1].1 {
270 return col(pe_col.as_str())
271 .struct_()
272 .field_by_name("col")
273 .alias(pt.2[1].0.as_str());
274 }
275 }
276 match e {
277 Expr::Alias(inner, name) => {
278 let inner_ref = inner.as_ref();
279 let (explode_input, options): (
280 Option<Arc<Expr>>,
281 polars::prelude::ExplodeOptions,
282 ) = if let Expr::Explode { input, options } = inner_ref {
283 (Some(input.clone()), *options)
284 } else if let Expr::Alias(inner2, _) = inner_ref {
285 if let Expr::Explode { input, options } = inner2.as_ref() {
286 (Some(input.clone()), *options)
287 } else {
288 (
289 None,
290 polars::prelude::ExplodeOptions {
291 empty_as_null: false,
292 keep_nulls: false,
293 },
294 )
295 }
296 } else {
297 (
298 None,
299 polars::prelude::ExplodeOptions {
300 empty_as_null: false,
301 keep_nulls: false,
302 },
303 )
304 };
305 if let (Some(input), options) = (explode_input, options) {
306 if let Expr::Column(col_name) = input.as_ref() {
307 if explode_target.is_none() {
308 let alias_name = if col_name.as_str() != name.as_str() {
309 Some(name.clone())
310 } else {
311 None
312 };
313 explode_target = Some((col_name.clone(), alias_name, options));
314 }
315 let out_col = explode_target
316 .as_ref()
317 .and_then(|(_, a, _)| a.clone())
318 .unwrap_or_else(|| col_name.clone());
319 Expr::Alias(Arc::new(Expr::Column(out_col)), name)
320 } else {
321 Expr::Alias(inner, name)
322 }
323 } else {
324 Expr::Alias(inner, name)
325 }
326 }
327 Expr::Explode { input, options } => {
328 if let Expr::Column(col_name) = input.as_ref() {
329 if explode_target.is_none() {
330 explode_target = Some((col_name.clone(), None, options));
331 }
332 let (_, alias_name, _) = explode_target.as_ref().unwrap();
333 let out_col = alias_name.clone().unwrap_or_else(|| col_name.clone());
334 Expr::Column(out_col)
335 } else {
336 Expr::Explode { input, options }
337 }
338 }
339 other => other,
340 }
341 })
342 .collect();
343 let mut name_count: HashMap<String, u32> = HashMap::new();
344 let mut output_names: Vec<String> = Vec::new();
345 let exprs: Vec<Expr> = exprs
346 .into_iter()
347 .map(|e| {
348 let base_name = polars_plan::utils::expr_output_name(&e)
349 .map(|s| s.to_string())
350 .unwrap_or_else(|_| "_".to_string());
351 let count = name_count.entry(base_name.clone()).or_insert(0);
352 *count += 1;
353 let final_name = if *count == 1 {
354 base_name.clone()
355 } else {
356 format!("{}_{}", base_name, *count - 1)
357 };
358 output_names.push(final_name.clone());
359 if *count == 1 {
360 e
361 } else {
362 e.alias(final_name.as_str())
363 }
364 })
365 .collect();
366
367 let mut lf = df.lazy_frame();
370 let had_explode = explode_target.is_some() || posexplode_target.is_some();
371
372 if let Some((explode_col, alias_name, options)) = explode_target {
377 if let Some(alias) = &alias_name {
378 lf = lf.with_column(col(explode_col.as_str()).alias(alias.as_str()));
379 let selector = Selector::ByName {
380 names: Arc::from([alias.clone()]),
381 strict: true,
382 };
383 lf = lf.explode(selector, options);
384 } else {
385 let selector = Selector::ByName {
386 names: Arc::from([explode_col]),
387 strict: true,
388 };
389 lf = lf.explode(selector, options);
390 }
391 }
392 if let Some((list_col, options, _)) = &posexplode_target {
394 let pe_col = format!("__pe_{}", list_col.as_str());
395 use polars::prelude::as_struct;
396 let pos_inner = (col("").cum_count(false) - lit(1i64)).alias("pos");
397 let val_inner = col("").alias("col");
398 let list_struct = col(list_col.as_str())
399 .list()
400 .eval(as_struct(vec![pos_inner, val_inner]));
401 lf = lf.with_column(list_struct.alias(pe_col.as_str()));
402 let selector = Selector::ByName {
403 names: Arc::from([PlSmallStr::from(pe_col.as_str())]),
404 strict: true,
405 };
406 lf = lf.explode(selector, *options);
407 }
408 let no_col_refs = !had_explode
412 && first_col.is_some()
413 && exprs.iter().all(|e| {
414 expr_referenced_columns(e)
415 .intersection(&df_columns)
416 .next()
417 .is_none()
418 });
419 let lf = if no_col_refs {
420 let first = first_col.unwrap();
421 let lf_key = lf.clone().select([col(first)]);
422 let lf_vals = lf.select(&exprs);
423 let joined = lf_key.cross_join(lf_vals, None);
424 let right_exprs: Vec<Expr> = output_names.iter().map(|n| col(n.as_str())).collect();
425 joined.select(right_exprs)
426 } else {
427 lf.select(&exprs)
428 };
429 if df.is_eager() {
433 let pl_df = lf.collect()?;
434 Ok(super::DataFrame::from_eager_with_options(
435 pl_df,
436 case_sensitive,
437 ))
438 } else {
439 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
440 }
441}
442
443#[derive(Clone)]
446pub enum SelectItem<'a> {
447 ColumnName(&'a str),
449 Expr(Expr),
451}
452
/// Builds a dot-free placeholder alias for a dotted name: `"__sf_"` followed by the name with
/// every `.` replaced by `_` (e.g. `"a.b"` -> `"__sf_a_b"`).
fn struct_field_safe_alias(dotted_name: &str) -> String {
    let mut alias = String::with_capacity(dotted_name.len() + 5);
    alias.push_str("__sf_");
    alias.extend(
        dotted_name
            .chars()
            .map(|ch| if ch == '.' { '_' } else { ch }),
    );
    alias
}
457
458pub fn select_items(
460 df: &DataFrame,
461 items: Vec<SelectItem<'_>>,
462 case_sensitive: bool,
463) -> Result<DataFrame, PolarsError> {
464 let mut exprs = Vec::with_capacity(items.len());
465 let mut rename_after: Vec<(String, String)> = Vec::new();
466 for item in items {
467 match item {
468 SelectItem::ColumnName(name) => {
469 if let Ok(cols) = df.columns() {
476 let name_lower = name.to_lowercase();
477 let matches: Vec<String> = cols
478 .iter()
479 .filter(|c| c.to_lowercase() == name_lower)
480 .cloned()
481 .collect();
482 if matches.len() > 1 {
483 use polars::prelude::coalesce as pl_coalesce;
484 let coalesce_exprs: Vec<Expr> =
485 matches.iter().map(|m| col(m.as_str())).collect();
486 let coalesced = pl_coalesce(&coalesce_exprs);
487 exprs.push(coalesced.alias(name));
488 continue;
489 }
490 }
491 if name.contains('.') {
494 let e = col(name);
495 let resolved = df.resolve_expr_column_names(e)?;
496 let coerced = df.coerce_string_numeric_comparisons(resolved)?;
497 let safe = struct_field_safe_alias(name);
498 let last_segment = name.split('.').next_back().unwrap_or(name);
499 rename_after.push((safe.clone(), last_segment.to_string()));
500 exprs.push(coerced.alias(safe));
501 } else {
502 df.check_ambiguous_unqualified(name)?;
503 let resolved = df.resolve_column_name(name)?;
504 exprs.push(col(resolved).alias(name));
506 }
507 }
508 SelectItem::Expr(e) => {
509 let name_for_alias = if let polars::prelude::Expr::Column(n) = &e {
510 let s = n.as_str();
511 if s.contains('.') {
512 Some(s.to_string())
513 } else {
514 None
515 }
516 } else if let polars::prelude::Expr::Alias(_, n) = &e {
517 let s = n.as_str();
518 if s.contains('.') {
519 Some(s.to_string())
520 } else {
521 None
522 }
523 } else {
524 None
525 };
526 let resolved = df.resolve_expr_column_names(e)?;
527 let coerced = df.coerce_string_numeric_comparisons(resolved)?;
528 if let Some(name) = name_for_alias {
529 let safe = struct_field_safe_alias(&name);
530 let last_segment = name.split('.').next_back().unwrap_or(&name).to_string();
531 rename_after.push((safe.clone(), last_segment));
532 exprs.push(coerced.alias(safe));
533 } else {
534 exprs.push(coerced);
535 }
536 }
537 }
538 }
539 let mut result = select_with_exprs(df, exprs, case_sensitive, false)?;
540 for (from, to) in rename_after {
541 result = result.with_column_renamed(&from, &to)?;
542 }
543 Ok(result)
544}
545
546pub fn filter(
556 df: &DataFrame,
557 condition: Expr,
558 case_sensitive: bool,
559) -> Result<DataFrame, PolarsError> {
560 if !case_sensitive {
572 if let Ok(cols) = df.columns() {
573 let df_cols: HashSet<String> = cols.into_iter().map(|c| c.to_lowercase()).collect();
574 let referenced = expr_referenced_columns(&condition);
575 if !referenced.is_empty() {
576 let all_missing = referenced.iter().all(|name| {
577 if name.contains('.') {
578 false
581 } else {
582 !df_cols.contains(&name.to_lowercase())
583 }
584 });
585 if all_missing {
586 return Ok(df.clone());
587 }
588 }
589 }
590 }
591 if expr_contains_over(&condition) {
592 return Err(PolarsError::InvalidOperation(
593 "it is not allowed to use window functions inside WHERE clause".into(),
594 ));
595 }
596 let condition = df.resolve_expr_column_names(condition)?;
597 let condition = df.coerce_string_numeric_comparisons(condition)?;
598 let condition = crate::functions::expr_coerce_to_boolean(condition);
601 let lf = df.lazy_frame().filter(condition);
602 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
603}
604
605pub fn with_column(
607 df: &DataFrame,
608 column_name: &str,
609 column: &crate::column::Column,
610 case_sensitive: bool,
611) -> Result<DataFrame, PolarsError> {
612 if let Some(deferred) = column.deferred {
614 match deferred {
615 crate::column::DeferredRandom::Rand(seed) => {
616 let pl_df = df.collect_inner()?;
617 let mut pl_df = pl_df.as_ref().clone();
618 let n = pl_df.height();
619 let series = crate::udfs::series_rand_n(column_name, n, seed);
620 pl_df.with_column(series.into())?;
621 return Ok(super::DataFrame::from_polars_with_options(
622 pl_df,
623 case_sensitive,
624 ));
625 }
626 crate::column::DeferredRandom::Randn(seed) => {
627 let pl_df = df.collect_inner()?;
628 let mut pl_df = pl_df.as_ref().clone();
629 let n = pl_df.height();
630 let series = crate::udfs::series_randn_n(column_name, n, seed);
631 pl_df.with_column(series.into())?;
632 return Ok(super::DataFrame::from_polars_with_options(
633 pl_df,
634 case_sensitive,
635 ));
636 }
637 }
638 }
639 let mut expr = df.resolve_expr_column_names(column.expr().clone())?;
640 expr = df.coerce_string_numeric_comparisons(expr)?;
641 if column.is_array_expr {
644 let refs = expr_referenced_columns(&expr);
645 if refs.len() >= 2 {
646 let mut has_bool = false;
647 let mut has_non_bool = false;
648 for name in refs.iter() {
649 if let Some(dt) = df.get_column_dtype(name) {
650 match dt {
651 DataType::Boolean => has_bool = true,
652 _ => has_non_bool = true,
653 }
654 }
655 }
656 if has_bool && has_non_bool {
657 return Err(PolarsError::ComputeError(
658 "array() does not support mixed BooleanType with other element types; cast columns to a common type first".into(),
659 ));
660 }
661 }
662 }
663 if let Ok(cols) = df.columns() {
664 let first_col = cols.into_iter().next();
665 if let Ok(expanded) = expand_pure_literal_to_rows(expr.clone(), first_col.as_deref()) {
666 expr = expanded;
667 }
668 }
669 let lf = df.lazy_frame();
673 let lf = if let Ok(existing) = df.resolve_column_name(column_name) {
674 let all = df.columns()?;
675 let existing_str = existing.as_str();
676 if expr_refs_column(&expr, existing_str) {
677 let inner = match &expr {
678 Expr::Alias(e, _) => e.as_ref(),
679 e => e,
680 };
681 let refs = expr_referenced_columns(inner);
683 let use_frame_explode = refs.len() == 1
684 && refs.contains(existing_str)
685 && matches!(inner, Expr::Explode { .. });
686 if use_frame_explode {
687 let options = match inner {
688 Expr::Explode { options, .. } => *options,
689 _ => unreachable!(),
690 };
691 let selector = Selector::ByName {
692 names: Arc::from([PlSmallStr::from(existing_str)]),
693 strict: true,
694 };
695 lf.explode(selector, options)
696 } else {
697 let select_exprs: Vec<Expr> = all
699 .iter()
700 .map(|n| {
701 if n.as_str() == existing_str {
702 expr.clone().alias(column_name)
703 } else {
704 col(n.as_str())
705 }
706 })
707 .collect();
708 lf.select(select_exprs)
709 }
710 } else {
711 let to_keep: Vec<Expr> = all
712 .iter()
713 .filter(|n| n.as_str() != existing_str)
714 .map(|n| col(n.as_str()))
715 .collect();
716 lf.select(&to_keep).with_column(expr.alias(column_name))
717 }
718 } else {
719 let inner = match &expr {
722 Expr::Alias(e, _) => e.as_ref(),
723 e => e,
724 };
725 if let Expr::Explode {
726 input,
727 options: explode_opts,
728 } = inner
729 {
730 if let Expr::Column(explode_col) = input.as_ref() {
731 let refs = expr_referenced_columns(inner);
732 if refs.len() == 1 {
733 let explode_col_str = explode_col.as_str();
734 if df.resolve_column_name(explode_col_str).is_ok() {
735 let lf_with_copy =
737 lf.with_column(col(explode_col.as_str()).alias(column_name));
738 let selector = Selector::ByName {
739 names: Arc::from([PlSmallStr::from(column_name)]),
740 strict: true,
741 };
742 lf_with_copy.explode(selector, *explode_opts)
743 } else {
744 lf.with_column(expr.alias(column_name))
745 }
746 } else {
747 lf.with_column(expr.alias(column_name))
748 }
749 } else {
750 lf.with_column(expr.alias(column_name))
751 }
752 } else {
753 lf.with_column(expr.alias(column_name))
754 }
755 };
756 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
757}
758
759pub fn order_by(
764 df: &DataFrame,
765 column_names: Vec<&str>,
766 ascending: Vec<bool>,
767 case_sensitive: bool,
768) -> Result<DataFrame, PolarsError> {
769 use polars::prelude::*;
770 let mut asc = ascending;
771 while asc.len() < column_names.len() {
772 asc.push(true);
773 }
774 asc.truncate(column_names.len());
775 for name in &column_names {
779 df.check_ambiguous_unqualified(name)?;
780 }
781 let resolved: Vec<String> = column_names
782 .iter()
783 .map(|c| df.resolve_column_name(c))
784 .collect::<Result<Vec<_>, _>>()?;
785 let exprs: Vec<Expr> = resolved.iter().map(|s| col(s.as_str())).collect();
786 let descending: Vec<bool> = asc.iter().map(|&a| !a).collect();
787 let nulls_last: Vec<bool> = vec![true; column_names.len()];
789 let lf = df.lazy_frame().sort_by_exprs(
790 exprs,
791 SortMultipleOptions::new()
792 .with_order_descending_multi(descending)
793 .with_nulls_last_multi(nulls_last),
794 );
795 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
796}
797
798pub fn order_by_exprs(
803 df: &DataFrame,
804 sort_orders: Vec<SortOrder>,
805 case_sensitive: bool,
806) -> Result<DataFrame, PolarsError> {
807 use polars::prelude::*;
808 if sort_orders.is_empty() {
809 return Ok(super::DataFrame::from_lazy_with_options(
810 df.lazy_frame(),
811 case_sensitive,
812 ));
813 }
814 let exprs: Vec<Expr> = sort_orders
815 .iter()
816 .map(|s| {
817 let e = df.resolve_expr_column_names(s.expr().clone())?;
818 df.coerce_string_numeric_comparisons(e)
819 })
820 .collect::<Result<Vec<_>, _>>()?;
821 let descending: Vec<bool> = sort_orders.iter().map(|s| s.descending).collect();
822 let nulls_last: Vec<bool> = sort_orders.iter().map(|s| s.nulls_last).collect();
823 let opts = SortMultipleOptions::new()
824 .with_order_descending_multi(descending)
825 .with_nulls_last_multi(nulls_last);
826 let lf = df.lazy_frame().sort_by_exprs(exprs, opts);
827 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
828}
829
830pub fn union(
836 left: &DataFrame,
837 right: &DataFrame,
838 case_sensitive: bool,
839) -> Result<DataFrame, PolarsError> {
840 let left_names = left.columns()?;
841 let right_names = right.columns()?;
842 if left_names.len() != right_names.len() {
843 return Err(PolarsError::InvalidOperation(
844 format!(
845 "union: column count must match. Left: {:?}, Right: {:?}",
846 left_names, right_names
847 )
848 .into(),
849 ));
850 }
851 let right_names_set: std::collections::HashSet<_> = if case_sensitive {
852 right_names.iter().cloned().collect()
853 } else {
854 right_names.iter().map(|s| s.to_lowercase()).collect()
855 };
856 let names_match = left_names.iter().all(|n| {
857 let key = if case_sensitive {
858 n.clone()
859 } else {
860 n.to_lowercase()
861 };
862 right_names_set.contains(&key)
863 });
864
865 let debug_union = std::env::var("SPARKLESS_DEBUG_UNION").as_deref() == Ok("1");
866 let (left_exprs, right_exprs) = if names_match {
867 let mut left_exprs = Vec::with_capacity(left_names.len());
869 let mut right_exprs = Vec::with_capacity(right_names.len());
870 for name in &left_names {
871 let resolved_left = left.resolve_column_name(name)?;
872 let resolved_right = right.resolve_column_name(name)?;
873 let left_dtype = left.get_column_dtype(name).unwrap_or(DataType::Null);
874 let right_dtype = right.get_column_dtype(name).unwrap_or(DataType::Null);
875 let mut target = if left_dtype == DataType::Null {
876 right_dtype.clone()
877 } else if right_dtype == DataType::Null || left_dtype == right_dtype {
878 left_dtype.clone()
879 } else {
880 find_common_type(&left_dtype, &right_dtype)?
881 };
882 if (left_dtype == DataType::String && is_numeric_public(&right_dtype))
885 || (right_dtype == DataType::String && is_numeric_public(&left_dtype))
886 {
887 target = DataType::String;
888 }
889 let need_coerce = left_dtype != target || right_dtype != target;
890 if debug_union {
891 eprintln!(
892 "[union #1262] name={:?} left_dtype={:?} right_dtype={:?} target={:?} need_coerce={}",
893 name, left_dtype, right_dtype, target, need_coerce
894 );
895 }
896 let left_expr = if need_coerce {
897 col(resolved_left.as_str()).cast(target.clone())
898 } else {
899 col(resolved_left.as_str())
900 };
901 let right_expr = if need_coerce {
902 col(resolved_right.as_str()).cast(target)
903 } else {
904 col(resolved_right.as_str())
905 };
906 left_exprs.push(left_expr.alias(name.as_str()));
907 right_exprs.push(right_expr.alias(name.as_str()));
908 }
909 (left_exprs, right_exprs)
910 } else {
911 let mut left_exprs = Vec::with_capacity(left_names.len());
913 let mut right_exprs = Vec::with_capacity(right_names.len());
914 for (i, left_name) in left_names.iter().enumerate() {
915 let right_name = right_names.get(i).ok_or_else(|| {
916 PolarsError::InvalidOperation("union by position: index out of range".into())
917 })?;
918 let resolved_left = left.resolve_column_name(left_name)?;
919 let resolved_right = right.resolve_column_name(right_name)?;
920 let left_dtype = left.get_column_dtype(left_name).unwrap_or(DataType::Null);
921 let right_dtype = right.get_column_dtype(right_name).unwrap_or(DataType::Null);
922 let mut target = if left_dtype == DataType::Null {
923 right_dtype.clone()
924 } else if right_dtype == DataType::Null || left_dtype == right_dtype {
925 left_dtype.clone()
926 } else {
927 find_common_type(&left_dtype, &right_dtype)?
928 };
929 if (left_dtype == DataType::String && is_numeric_public(&right_dtype))
930 || (right_dtype == DataType::String && is_numeric_public(&left_dtype))
931 {
932 target = DataType::String;
933 }
934 let need_coerce = left_dtype != target || right_dtype != target;
935 if debug_union {
936 eprintln!(
937 "[union #1262] left_name={:?} right_name={:?} left_dtype={:?} right_dtype={:?} target={:?} need_coerce={}",
938 left_name, right_name, left_dtype, right_dtype, target, need_coerce
939 );
940 }
941 let left_expr = if need_coerce {
942 col(resolved_left.as_str()).cast(target.clone())
943 } else {
944 col(resolved_left.as_str())
945 };
946 let right_expr = if need_coerce {
947 col(resolved_right.as_str()).cast(target)
948 } else {
949 col(resolved_right.as_str())
950 };
951 left_exprs.push(left_expr.alias(left_name.as_str()));
952 right_exprs.push(right_expr.alias(left_name.as_str()));
953 }
954 (left_exprs, right_exprs)
955 };
956
957 let lf1 = left.lazy_frame().select(&left_exprs);
958 let lf2 = right.lazy_frame().select(&right_exprs);
959 let mut out = lf1.collect()?;
962 let df2 = lf2.collect()?;
963 if debug_union {
964 eprintln!(
965 "[union #1262] after lf1.collect() schema: {:?}",
966 out.schema().iter_names_and_dtypes().collect::<Vec<_>>()
967 );
968 }
969 out.vstack_mut(&df2)?;
970 if debug_union {
971 eprintln!(
972 "[union #1262] after vstack schema: {:?}",
973 out.schema().iter_names_and_dtypes().collect::<Vec<_>>()
974 );
975 }
976 Ok(super::DataFrame::from_eager_with_options(
977 out,
978 case_sensitive,
979 ))
980}
981
982pub fn union_by_name(
987 left: &DataFrame,
988 right: &DataFrame,
989 allow_missing_columns: bool,
990 case_sensitive: bool,
991) -> Result<DataFrame, PolarsError> {
992 use crate::type_coercion::find_common_type;
993 use polars::prelude::*;
994
995 let left_names = left.columns()?;
996 let right_names = right.columns()?;
997 let contains = |names: &[String], name: &str| -> bool {
998 if case_sensitive {
999 names.iter().any(|n| n.as_str() == name)
1000 } else {
1001 let name_lower = name.to_lowercase();
1002 names
1003 .iter()
1004 .any(|n| n.as_str().to_lowercase() == name_lower)
1005 }
1006 };
1007 let resolve = |names: &[String], name: &str| -> Option<String> {
1008 if case_sensitive {
1009 names.iter().find(|n| n.as_str() == name).cloned()
1010 } else {
1011 let name_lower = name.to_lowercase();
1012 names
1013 .iter()
1014 .find(|n| n.as_str().to_lowercase() == name_lower)
1015 .cloned()
1016 }
1017 };
1018 let all_columns: Vec<String> = if allow_missing_columns {
1019 let mut out = left_names.clone();
1020 for r in &right_names {
1021 if !contains(&out, r.as_str()) {
1022 out.push(r.clone());
1023 }
1024 }
1025 out
1026 } else {
1027 left_names.clone()
1028 };
1029 let mut left_exprs: Vec<Expr> = Vec::with_capacity(all_columns.len());
1031 let mut right_exprs: Vec<Expr> = Vec::with_capacity(all_columns.len());
1032 for c in &all_columns {
1033 let left_has = resolve(&left_names, c.as_str());
1034 let right_has = resolve(&right_names, c.as_str());
1035 let left_dtype = left_has.as_ref().and_then(|r| left.get_column_dtype(r));
1036 let right_dtype = right_has.as_ref().and_then(|r| right.get_column_dtype(r));
1037 if let (Some(l), Some(r)) = (&left_has, &right_has) {
1039 if let (Some(lt), Some(rt)) = (&left_dtype, &right_dtype) {
1040 if lt != rt {
1041 let (le, re) = coerce_expr_pair(l, r, lt, rt, c).map_err(|e| {
1042 PolarsError::ComputeError(
1043 format!("union_by_name: column '{}' type coercion: {}", c, e).into(),
1044 )
1045 })?;
1046 left_exprs.push(le);
1047 right_exprs.push(re);
1048 continue;
1049 }
1050 }
1051 }
1052 let common_dtype = match (&left_dtype, &right_dtype) {
1056 (Some(lt), Some(rt)) if lt != rt => find_common_type(lt, rt).map_err(|e| {
1057 PolarsError::ComputeError(
1058 format!("union_by_name: column '{}' type coercion: {}", c, e).into(),
1059 )
1060 })?,
1061 (Some(lt), Some(_)) => lt.clone(),
1062 (Some(lt), None) | (None, Some(lt)) => lt.clone(),
1066 (None, None) => polars::prelude::DataType::Null,
1067 };
1068 let left_expr = match &left_has {
1069 Some(r) => col(r.as_str()).cast(common_dtype.clone()).alias(c.as_str()),
1070 None => polars::prelude::lit(polars::prelude::NULL)
1071 .cast(common_dtype.clone())
1072 .alias(c.as_str()),
1073 };
1074 left_exprs.push(left_expr);
1075 let right_expr = match &right_has {
1076 Some(r) => col(r.as_str()).cast(common_dtype.clone()).alias(c.as_str()),
1077 None if allow_missing_columns => polars::prelude::lit(polars::prelude::NULL)
1078 .cast(common_dtype)
1079 .alias(c.as_str()),
1080 None => {
1081 return Err(PolarsError::InvalidOperation(
1082 format!(
1083 "union_by_name: column '{}' missing in right DataFrame (allow_missing_columns=False)",
1084 c
1085 )
1086 .into(),
1087 ));
1088 }
1089 };
1090 right_exprs.push(right_expr);
1091 }
1092 let lf1 = left.lazy_frame().select(&left_exprs);
1093 let lf2 = right.lazy_frame().select(&right_exprs);
1094 let out = polars::prelude::concat([lf1, lf2], UnionArgs::default())?;
1095 Ok(super::DataFrame::from_lazy_with_options(
1096 out,
1097 case_sensitive,
1098 ))
1099}
1100
1101pub fn distinct(
1103 df: &DataFrame,
1104 subset: Option<Vec<&str>>,
1105 case_sensitive: bool,
1106) -> Result<DataFrame, PolarsError> {
1107 let subset_names: Option<Vec<String>> = subset
1108 .map(|cols| {
1109 cols.iter()
1110 .map(|s| df.resolve_column_name(s))
1111 .collect::<Result<Vec<_>, _>>()
1112 })
1113 .transpose()?;
1114 let subset_selector: Option<Selector> = subset_names.map(|names| Selector::ByName {
1115 names: Arc::from(names.into_iter().map(PlSmallStr::from).collect::<Vec<_>>()),
1116 strict: false,
1117 });
1118 let lf = df
1119 .lazy_frame()
1120 .unique(subset_selector, UniqueKeepStrategy::First);
1121 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1122}
1123
1124pub fn drop(
1126 df: &DataFrame,
1127 columns: Vec<&str>,
1128 case_sensitive: bool,
1129) -> Result<DataFrame, PolarsError> {
1130 let resolved: Vec<String> = columns
1131 .iter()
1132 .map(|c| df.resolve_column_name(c))
1133 .collect::<Result<Vec<_>, _>>()?;
1134 let all_names = df.columns()?;
1135 let to_keep: Vec<Expr> = all_names
1136 .iter()
1137 .filter(|n| !resolved.iter().any(|r| r == n.as_str()))
1138 .map(|n| col(n.as_str()))
1139 .collect();
1140 let lf = df.lazy_frame().select(&to_keep);
1141 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1142}
1143
1144pub fn dropna(
1148 df: &DataFrame,
1149 subset: Option<Vec<&str>>,
1150 how: &str,
1151 thresh: Option<usize>,
1152 case_sensitive: bool,
1153) -> Result<DataFrame, PolarsError> {
1154 use polars::prelude::*;
1155 let cols: Vec<String> = match &subset {
1156 Some(c) => c
1157 .iter()
1158 .map(|n| df.resolve_column_name(n))
1159 .collect::<Result<Vec<_>, _>>()?,
1160 None => df.columns()?,
1161 };
1162 let col_exprs: Vec<Expr> = cols.iter().map(|c| col(c.as_str())).collect();
1163 let base_lf = df.lazy_frame();
1164 let lf = if let Some(n) = thresh {
1165 let count_expr: Expr = col_exprs
1167 .iter()
1168 .map(|e| e.clone().is_not_null().cast(DataType::Int32))
1169 .fold(lit(0i32), |a, b| a + b);
1170 base_lf.filter(count_expr.gt_eq(lit(n as i32)))
1171 } else if how.eq_ignore_ascii_case("all") {
1172 let any_not_null: Expr = col_exprs
1174 .into_iter()
1175 .map(|e| e.is_not_null())
1176 .fold(lit(false), |a, b| a.or(b));
1177 base_lf.filter(any_not_null)
1178 } else {
1179 let subset_selector = Selector::ByName {
1181 names: Arc::from(
1182 cols.iter()
1183 .map(|s| PlSmallStr::from(s.as_str()))
1184 .collect::<Vec<_>>(),
1185 ),
1186 strict: false,
1187 };
1188 base_lf.drop_nulls(Some(subset_selector))
1189 };
1190 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1191}
1192
1193pub fn fillna(
1197 df: &DataFrame,
1198 value_expr: Expr,
1199 subset: Option<Vec<&str>>,
1200 case_sensitive: bool,
1201) -> Result<DataFrame, PolarsError> {
1202 use polars::prelude::*;
1203 let exprs: Vec<Expr> = match subset {
1204 Some(cols) => cols
1205 .iter()
1206 .map(|n| {
1207 let resolved = df.resolve_column_name(n)?;
1208 let fill = match df.get_column_dtype(resolved.as_str()) {
1209 Some(dt) => value_expr.clone().cast(dt),
1210 None => value_expr.clone(),
1211 };
1212 Ok(col(resolved.as_str()).fill_null(fill))
1213 })
1214 .collect::<Result<Vec<_>, PolarsError>>()?,
1215 None => df
1216 .columns()?
1217 .iter()
1218 .map(|n| {
1219 let fill = match df.get_column_dtype(n) {
1220 Some(dt) => value_expr.clone().cast(dt),
1221 None => value_expr.clone(),
1222 };
1223 col(n.as_str()).fill_null(fill)
1224 })
1225 .collect(),
1226 };
1227 let lf = df.lazy_frame().with_columns(exprs);
1228 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1229}
1230
1231pub fn limit(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1233 let lf = df.lazy_frame().slice(0, n as u32);
1235 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1236}
1237
1238pub fn with_column_renamed(
1240 df: &DataFrame,
1241 old_name: &str,
1242 new_name: &str,
1243 case_sensitive: bool,
1244) -> Result<DataFrame, PolarsError> {
1245 match df.resolve_column_name(old_name) {
1246 Ok(resolved) => {
1247 let lf = df
1248 .lazy_frame()
1249 .rename([resolved.as_str()], [new_name], true);
1250 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1251 }
1252 Err(PolarsError::ColumnNotFound(_)) => Ok(df.clone()),
1254 Err(e) => Err(e),
1255 }
1256}
1257
1258pub fn replace(
1261 df: &DataFrame,
1262 column_name: &str,
1263 old_value: Expr,
1264 new_value: Expr,
1265 case_sensitive: bool,
1266) -> Result<DataFrame, PolarsError> {
1267 use polars::prelude::*;
1268 let resolved = df.resolve_column_name(column_name)?;
1269 let eq_expr = col(resolved.as_str()).eq(old_value);
1270 let coerced_eq = df.coerce_string_numeric_comparisons(eq_expr)?;
1271 let repl = when(coerced_eq)
1272 .then(new_value)
1273 .otherwise(col(resolved.as_str()));
1274 let lf = df.lazy_frame().with_column(repl.alias(resolved.as_str()));
1275 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1276}
1277
1278pub fn cross_join(
1282 left: &DataFrame,
1283 right: &DataFrame,
1284 case_sensitive: bool,
1285) -> Result<DataFrame, PolarsError> {
1286 use polars::prelude::col;
1287 let left_names = left.columns()?;
1288 let right_names = right.columns()?;
1289 let right_set: std::collections::HashSet<&str> =
1290 right_names.iter().map(|s| s.as_str()).collect();
1291 let left_set: std::collections::HashSet<&str> = left_names.iter().map(|s| s.as_str()).collect();
1292 let left_ordered = order_columns_common_first(&left_names, &right_set);
1294 let right_ordered = order_columns_common_first(&right_names, &left_set);
1295 let exprs_left: Vec<_> = left_ordered.iter().map(|s| col(*s)).collect();
1296 let exprs_right: Vec<_> = right_ordered
1298 .iter()
1299 .map(|s| {
1300 if left_set.contains(*s) {
1301 col(*s).alias(format!("{}_right", s))
1302 } else {
1303 col(*s)
1304 }
1305 })
1306 .collect();
1307 let lf_left = left.lazy_frame().select(&exprs_left);
1308 let lf_right = right.lazy_frame().select(&exprs_right);
1309 let out = lf_left.cross_join(lf_right, None);
1310 Ok(super::DataFrame::from_lazy_with_options(
1311 out,
1312 case_sensitive,
1313 ))
1314}
1315
/// Reorders `names` so that those present in `other` come first.
///
/// Relative order is preserved within both groups (a stable partition).
fn order_columns_common_first<'a>(
    names: &'a [String],
    other: &std::collections::HashSet<&str>,
) -> Vec<&'a str> {
    let (mut ordered, rest): (Vec<&'a str>, Vec<&'a str>) = names
        .iter()
        .map(String::as_str)
        .partition(|name| other.contains(*name));
    ordered.extend(rest);
    ordered
}
1332
1333pub fn describe(df: &DataFrame, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1336 use polars::prelude::*;
1337 let pl_df = df.collect_inner()?.as_ref().clone();
1338 let mut stat_values: Vec<Column> = Vec::new();
1339 for col in pl_df.columns() {
1340 let s = col.as_materialized_series();
1341 let dtype = s.dtype();
1342 if dtype.is_numeric() {
1343 let name = s.name().clone();
1344 let count = s.len() as i64 - s.null_count() as i64;
1345 let mean_f = s.mean().unwrap_or(f64::NAN);
1346 let std_f = s.std(1).unwrap_or(f64::NAN);
1347 let ca = series_as_f64_ca(s, "describe")?;
1348 let min_f = ca.min().unwrap_or(f64::NAN);
1349 let max_f = ca.max().unwrap_or(f64::NAN);
1350 let is_float = matches!(dtype, DataType::Float64 | DataType::Float32);
1353 let count_s = count.to_string();
1354 let mean_s = if mean_f.is_nan() {
1355 "null".to_string()
1356 } else {
1357 format!("{:.1}", mean_f)
1358 };
1359 let std_s = if std_f.is_nan() {
1360 "null".to_string()
1361 } else {
1362 format!("{:.1}", std_f)
1363 };
1364 let min_s = if min_f.is_nan() {
1365 "null".to_string()
1366 } else if min_f.fract() == 0.0 && is_float {
1367 format!("{:.1}", min_f)
1368 } else if min_f.fract() == 0.0 {
1369 format!("{:.0}", min_f)
1370 } else {
1371 format!("{min_f}")
1372 };
1373 let max_s = if max_f.is_nan() {
1374 "null".to_string()
1375 } else if max_f.fract() == 0.0 && is_float {
1376 format!("{:.1}", max_f)
1377 } else if max_f.fract() == 0.0 {
1378 format!("{:.0}", max_f)
1379 } else {
1380 format!("{max_f}")
1381 };
1382 let series = Series::new(
1383 name,
1384 [
1385 count_s.as_str(),
1386 mean_s.as_str(),
1387 std_s.as_str(),
1388 min_s.as_str(),
1389 max_s.as_str(),
1390 ],
1391 );
1392 stat_values.push(series.into());
1393 }
1394 }
1395 if stat_values.is_empty() {
1396 let stat_col = Series::new(
1398 "summary".into(),
1399 &["count", "mean", "stddev", "min", "max" as &str],
1400 )
1401 .into();
1402 let empty: Vec<f64> = Vec::new();
1403 let empty_series = Series::new("placeholder".into(), empty).into();
1404 let out_pl = polars::prelude::DataFrame::new_infer_height(vec![stat_col, empty_series])?;
1405 return Ok(super::DataFrame::from_polars_with_options(
1406 out_pl,
1407 case_sensitive,
1408 ));
1409 }
1410 let summary_col = Series::new(
1411 "summary".into(),
1412 &["count", "mean", "stddev", "min", "max" as &str],
1413 )
1414 .into();
1415 let mut cols: Vec<Column> = vec![summary_col];
1416 cols.extend(stat_values);
1417 let out_pl = polars::prelude::DataFrame::new_infer_height(cols)?;
1418 Ok(super::DataFrame::from_polars_with_options(
1419 out_pl,
1420 case_sensitive,
1421 ))
1422}
1423
1424pub fn subtract(
1427 left: &DataFrame,
1428 right: &DataFrame,
1429 case_sensitive: bool,
1430) -> Result<DataFrame, PolarsError> {
1431 use polars::prelude::*;
1432 let left_names = left.columns()?;
1433 let right_names = right.columns()?;
1434 let right_on: Vec<Expr> = left_names
1435 .iter()
1436 .map(|ln| {
1437 let resolved = if case_sensitive {
1438 right_names
1439 .iter()
1440 .find(|rn| rn.as_str() == ln.as_str())
1441 .cloned()
1442 .ok_or_else(|| {
1443 PolarsError::ColumnNotFound(
1444 format!(
1445 "cannot resolve: subtract: column '{}' not found on right",
1446 ln
1447 )
1448 .into(),
1449 )
1450 })?
1451 } else {
1452 let ln_lower = ln.to_lowercase();
1453 right_names
1454 .iter()
1455 .find(|rn| rn.to_lowercase() == ln_lower)
1456 .cloned()
1457 .ok_or_else(|| {
1458 PolarsError::ColumnNotFound(
1459 format!(
1460 "cannot resolve: subtract: column '{}' not found on right",
1461 ln
1462 )
1463 .into(),
1464 )
1465 })?
1466 };
1467 Ok(col(resolved.as_str()))
1468 })
1469 .collect::<Result<Vec<_>, PolarsError>>()?;
1470 let left_on: Vec<Expr> = left_names.iter().map(|n| col(n.as_str())).collect();
1471 let right_lf = right.lazy_frame();
1472 let left_lf = left.lazy_frame();
1473 let anti = left_lf.join(right_lf, left_on, right_on, JoinArgs::new(JoinType::Anti));
1474 Ok(super::DataFrame::from_lazy_with_options(
1475 anti,
1476 case_sensitive,
1477 ))
1478}
1479
1480pub fn intersect(
1483 left: &DataFrame,
1484 right: &DataFrame,
1485 case_sensitive: bool,
1486) -> Result<DataFrame, PolarsError> {
1487 use polars::prelude::*;
1488 let left_names = left.columns()?;
1489 let right_names = right.columns()?;
1490 let right_on: Vec<Expr> = left_names
1491 .iter()
1492 .map(|ln| {
1493 let resolved = if case_sensitive {
1494 right_names
1495 .iter()
1496 .find(|rn| rn.as_str() == ln.as_str())
1497 .cloned()
1498 .ok_or_else(|| {
1499 PolarsError::ColumnNotFound(
1500 format!(
1501 "cannot resolve: intersect: column '{}' not found on right",
1502 ln
1503 )
1504 .into(),
1505 )
1506 })?
1507 } else {
1508 let ln_lower = ln.to_lowercase();
1509 right_names
1510 .iter()
1511 .find(|rn| rn.to_lowercase() == ln_lower)
1512 .cloned()
1513 .ok_or_else(|| {
1514 PolarsError::ColumnNotFound(
1515 format!(
1516 "cannot resolve: intersect: column '{}' not found on right",
1517 ln
1518 )
1519 .into(),
1520 )
1521 })?
1522 };
1523 Ok(col(resolved.as_str()))
1524 })
1525 .collect::<Result<Vec<_>, PolarsError>>()?;
1526 let left_on: Vec<Expr> = left_names.iter().map(|n| col(n.as_str())).collect();
1527 let left_lf = left.lazy_frame();
1528 let right_lf = right.lazy_frame();
1529 let semi = left_lf
1530 .join(right_lf, left_on, right_on, JoinArgs::new(JoinType::Semi))
1531 .unique(None, UniqueKeepStrategy::First);
1532 Ok(super::DataFrame::from_lazy_with_options(
1533 semi,
1534 case_sensitive,
1535 ))
1536}
1537
1538pub fn sample(
1542 df: &DataFrame,
1543 with_replacement: bool,
1544 fraction: f64,
1545 seed: Option<u64>,
1546 case_sensitive: bool,
1547) -> Result<DataFrame, PolarsError> {
1548 use polars::prelude::Series;
1549 let pl = df.collect_inner()?;
1550 let n = pl.height();
1551 if n == 0 {
1552 return Ok(super::DataFrame::from_lazy_with_options(
1553 polars::prelude::DataFrame::empty().lazy(),
1554 case_sensitive,
1555 ));
1556 }
1557 let take_n = (n as f64 * fraction).round() as usize;
1558 let take_n = take_n.min(n).max(0);
1559 if take_n == 0 {
1560 return Ok(super::DataFrame::from_lazy_with_options(
1561 pl.as_ref().head(Some(0)).lazy(),
1562 case_sensitive,
1563 ));
1564 }
1565 let idx_series = Series::new("idx".into(), (0..n).map(|i| i as u32).collect::<Vec<_>>());
1566 let sampled_idx = idx_series.sample_n(take_n, with_replacement, true, seed)?;
1567 let idx_ca = sampled_idx
1568 .u32()
1569 .map_err(|_| PolarsError::ComputeError("sample: expected u32 indices".into()))?;
1570 let pl_df = pl.as_ref().take(idx_ca)?;
1571 Ok(super::DataFrame::from_polars_with_options(
1572 pl_df,
1573 case_sensitive,
1574 ))
1575}
1576
1577pub fn random_split(
1581 df: &DataFrame,
1582 weights: &[f64],
1583 seed: Option<u64>,
1584 case_sensitive: bool,
1585) -> Result<Vec<DataFrame>, PolarsError> {
1586 let total: f64 = weights.iter().sum();
1587 if total <= 0.0 || weights.is_empty() {
1588 return Ok(Vec::new());
1589 }
1590 let pl = df.collect_inner()?;
1591 let n = pl.height();
1592 if n == 0 {
1593 return Ok(weights.iter().map(|_| super::DataFrame::empty()).collect());
1594 }
1595 let mut cum = Vec::with_capacity(weights.len());
1597 let mut acc = 0.0_f64;
1598 for w in weights {
1599 acc += w / total;
1600 cum.push(acc);
1601 }
1602 use polars::prelude::Series;
1604 use rand::Rng;
1605 use rand::SeedableRng;
1606 let mut rng = rand::rngs::StdRng::seed_from_u64(seed.unwrap_or(0));
1607 let mut bucket_indices: Vec<Vec<u32>> = (0..weights.len()).map(|_| Vec::new()).collect();
1608 for i in 0..n {
1609 let r: f64 = rng.r#gen();
1610 let bucket = cum
1611 .iter()
1612 .position(|&c| r < c)
1613 .unwrap_or(weights.len().saturating_sub(1));
1614 bucket_indices[bucket].push(i as u32);
1615 }
1616 let pl = pl.as_ref();
1617 let mut out = Vec::with_capacity(weights.len());
1618 for indices in bucket_indices {
1619 if indices.is_empty() {
1620 out.push(super::DataFrame::from_polars_with_options(
1621 pl.clone().head(Some(0)),
1622 case_sensitive,
1623 ));
1624 } else {
1625 let idx_series = Series::new("idx".into(), indices);
1626 let idx_ca = idx_series.u32().map_err(|_| {
1627 PolarsError::ComputeError("random_split: expected u32 indices".into())
1628 })?;
1629 let taken = pl.take(idx_ca)?;
1630 out.push(super::DataFrame::from_polars_with_options(
1631 taken,
1632 case_sensitive,
1633 ));
1634 }
1635 }
1636 Ok(out)
1637}
1638
1639pub fn sample_by(
1642 df: &DataFrame,
1643 col_name: &str,
1644 fractions: &[(Expr, f64)],
1645 seed: Option<u64>,
1646 case_sensitive: bool,
1647) -> Result<DataFrame, PolarsError> {
1648 use polars::prelude::*;
1649 if fractions.is_empty() {
1650 return Ok(super::DataFrame::from_lazy_with_options(
1651 df.lazy_frame().slice(0, 0),
1652 case_sensitive,
1653 ));
1654 }
1655 let resolved = df.resolve_column_name(col_name)?;
1656 let mut parts = Vec::with_capacity(fractions.len());
1657 for (value_expr, frac) in fractions {
1658 let cond = col(resolved.as_str()).eq(value_expr.clone());
1659 let filtered = df.lazy_frame().filter(cond).collect()?;
1660 if filtered.height() == 0 {
1661 parts.push(filtered.head(Some(0)));
1662 continue;
1663 }
1664 let sampled = sample(
1665 &super::DataFrame::from_polars_with_options(filtered, case_sensitive),
1666 false,
1667 *frac,
1668 seed,
1669 case_sensitive,
1670 )?;
1671 parts.push(sampled.collect_inner()?.as_ref().clone());
1672 }
1673 let mut out = parts
1674 .first()
1675 .ok_or_else(|| PolarsError::ComputeError("sample_by: no parts".into()))?
1676 .clone();
1677 for p in parts.iter().skip(1) {
1678 out.vstack_mut(p)?;
1679 }
1680 Ok(super::DataFrame::from_polars_with_options(
1681 out,
1682 case_sensitive,
1683 ))
1684}
1685
1686pub fn first(df: &DataFrame, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1690 let limited = limit(df, 1, case_sensitive)?;
1691 let pl_df = limited.collect_inner()?.as_ref().clone();
1692 Ok(super::DataFrame::from_polars_with_options(
1693 pl_df,
1694 case_sensitive,
1695 ))
1696}
1697
1698pub fn head(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1700 limit(df, n, case_sensitive)
1701}
1702
1703pub fn take(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1705 limit(df, n, case_sensitive)
1706}
1707
1708pub fn tail(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
1710 let pl = df.collect_inner()?;
1711 let total = pl.height();
1712 let skip = total.saturating_sub(n);
1713 let pl_df = pl.as_ref().clone().slice(skip as i64, n);
1714 Ok(super::DataFrame::from_polars_with_options(
1715 pl_df,
1716 case_sensitive,
1717 ))
1718}
1719
1720pub fn is_empty(df: &DataFrame) -> bool {
1722 df.count().map(|n| n == 0).unwrap_or(true)
1723}
1724
1725pub fn to_df(
1727 df: &DataFrame,
1728 names: &[&str],
1729 case_sensitive: bool,
1730) -> Result<DataFrame, PolarsError> {
1731 let cols = df.columns()?;
1732 if names.len() != cols.len() {
1733 return Err(PolarsError::ComputeError(
1734 format!(
1735 "toDF: expected {} column names, got {}",
1736 cols.len(),
1737 names.len()
1738 )
1739 .into(),
1740 ));
1741 }
1742 let pl_df = df.collect_inner()?;
1743 let mut pl_df = pl_df.as_ref().clone();
1744 for (old, new) in cols.iter().zip(names.iter()) {
1745 pl_df.rename(old.as_str(), (*new).into())?;
1746 }
1747 Ok(super::DataFrame::from_polars_with_options(
1748 pl_df,
1749 case_sensitive,
1750 ))
1751}
1752
1753fn any_value_to_serde_value(av: &polars::prelude::AnyValue) -> serde_json::Value {
1756 use polars::prelude::AnyValue;
1757 use serde_json::Number;
1758 match av {
1759 AnyValue::Null => serde_json::Value::Null,
1760 AnyValue::Boolean(v) => serde_json::Value::Bool(*v),
1761 AnyValue::Int8(v) => serde_json::Value::Number(Number::from(*v as i64)),
1762 AnyValue::Int32(v) => serde_json::Value::Number(Number::from(*v)),
1763 AnyValue::Int64(v) => serde_json::Value::Number(Number::from(*v)),
1764 AnyValue::UInt32(v) => serde_json::Value::Number(Number::from(*v)),
1765 AnyValue::Float64(v) => Number::from_f64(*v)
1766 .map(serde_json::Value::Number)
1767 .unwrap_or(serde_json::Value::Null),
1768 AnyValue::String(v) => serde_json::Value::String(v.to_string()),
1769 AnyValue::StringOwned(v) => serde_json::Value::String(v.to_string()),
1770 _ => serde_json::Value::String(format!("{av:?}")),
1771 }
1772}
1773
1774pub(crate) fn literal_value_to_serde_value(
1776 lv: &polars::prelude::LiteralValue,
1777) -> Option<serde_json::Value> {
1778 lv.to_any_value().as_ref().map(any_value_to_serde_value)
1779}
1780
1781pub fn to_json(df: &DataFrame) -> Result<Vec<String>, PolarsError> {
1783 use polars::prelude::*;
1784 let collected = df.collect_inner()?;
1785 let pl = collected.as_ref();
1786 let names = pl.get_column_names();
1787 let mut out = Vec::with_capacity(pl.height());
1788 for r in 0..pl.height() {
1789 let mut row = serde_json::Map::new();
1790 for (i, name) in names.iter().enumerate() {
1791 let col = pl
1792 .columns()
1793 .get(i)
1794 .ok_or_else(|| PolarsError::ComputeError("to_json: column index".into()))?;
1795 let series = col.as_materialized_series();
1796 let av = series
1797 .get(r)
1798 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
1799 row.insert(name.to_string(), any_value_to_serde_value(&av));
1800 }
1801 out.push(
1802 serde_json::to_string(&row)
1803 .map_err(|e| PolarsError::ComputeError(e.to_string().into()))?,
1804 );
1805 }
1806 Ok(out)
1807}
1808
1809pub fn explain(_df: &DataFrame) -> String {
1811 "DataFrame (eager Polars backend)".to_string()
1812}
1813
1814pub fn print_schema(df: &DataFrame) -> Result<String, PolarsError> {
1816 let schema = df.schema()?;
1817 let mut s = "root\n".to_string();
1818 for f in schema.fields() {
1819 let dt = match &f.data_type {
1820 crate::schema::DataType::String => "string",
1821 crate::schema::DataType::Integer => "int",
1822 crate::schema::DataType::Long => "bigint",
1823 crate::schema::DataType::Double => "double",
1824 crate::schema::DataType::Boolean => "boolean",
1825 crate::schema::DataType::Date => "date",
1826 crate::schema::DataType::Timestamp => "timestamp",
1827 _ => "string",
1828 };
1829 s.push_str(&format!(" |-- {}: {}\n", f.name, dt));
1830 }
1831 Ok(s)
1832}
1833
1834fn parse_simple_expr(df: &DataFrame, s: &str) -> Result<Option<Expr>, PolarsError> {
1838 let s = s.trim();
1839 for (op, kind) in [
1840 (" * ", "mul"),
1841 ("*", "mul"),
1842 (" + ", "add"),
1843 ("+", "add"),
1844 (" - ", "sub"),
1845 (" / ", "div"),
1846 ("/", "div"),
1847 ] {
1848 if let Some((a, b)) = s.split_once(op) {
1849 let a = a.trim();
1850 let b = b.trim();
1851 let (col_part, num_part, col_on_left) =
1852 if df.resolve_column_name(a).is_ok() && b.parse::<f64>().is_ok() {
1853 (a, b, true)
1854 } else if df.resolve_column_name(b).is_ok() && a.parse::<f64>().is_ok() {
1855 (b, a, false)
1856 } else {
1857 continue;
1858 };
1859 let resolved = df.resolve_column_name(col_part)?;
1860 let col_expr = col(resolved.as_str());
1861 let num: f64 = num_part.parse().map_err(|_| {
1862 PolarsError::ComputeError(
1863 format!("selectExpr: could not parse literal {num_part:?}").into(),
1864 )
1865 })?;
1866 let lit_expr = lit(num);
1867 let expr = match kind {
1868 "mul" => col_expr * lit_expr,
1869 "add" => col_expr + lit_expr,
1870 "sub" => {
1871 if col_on_left {
1872 col_expr - lit_expr
1873 } else {
1874 lit_expr - col_expr
1875 }
1876 }
1877 "div" => col_expr / lit_expr,
1878 _ => continue,
1879 };
1880 return Ok(Some(expr));
1881 }
1882 }
1883 Ok(None)
1884}
1885
1886pub fn select_expr(
1888 df: &DataFrame,
1889 exprs: &[String],
1890 case_sensitive: bool,
1891) -> Result<DataFrame, PolarsError> {
1892 let mut select_exprs: Vec<Expr> = Vec::new();
1893 for e in exprs {
1894 let e = e.trim();
1895 if let Some((left, right)) = e.split_once(" as ") {
1896 let left = left.trim();
1897 let alias = right.trim();
1898 if let Some(expr) = parse_simple_expr(df, left)? {
1899 select_exprs.push(expr.alias(alias));
1900 } else {
1901 let resolved = df.resolve_column_name(left)?;
1902 select_exprs.push(col(resolved.as_str()).alias(alias));
1903 }
1904 } else {
1905 let resolved = df.resolve_column_name(e)?;
1906 select_exprs.push(col(resolved.as_str()));
1907 }
1908 }
1909 select_with_exprs(df, select_exprs, case_sensitive, false)
1910}
1911
1912pub fn col_regex(
1914 df: &DataFrame,
1915 pattern: &str,
1916 case_sensitive: bool,
1917) -> Result<DataFrame, PolarsError> {
1918 let re = regex::Regex::new(pattern).map_err(|e| {
1919 PolarsError::ComputeError(format!("colRegex: invalid pattern {pattern:?}: {e}").into())
1920 })?;
1921 let names = df.columns()?;
1922 let matched: Vec<&str> = names
1923 .iter()
1924 .filter(|n| re.is_match(n))
1925 .map(|s| s.as_str())
1926 .collect();
1927 if matched.is_empty() {
1928 return Err(PolarsError::ComputeError(
1929 format!("colRegex: no columns matched pattern {pattern:?}").into(),
1930 ));
1931 }
1932 select(df, matched, case_sensitive)
1933}
1934
1935pub fn with_columns(
1937 df: &DataFrame,
1938 exprs: &[(String, crate::column::Column)],
1939 case_sensitive: bool,
1940) -> Result<DataFrame, PolarsError> {
1941 let pl = df.collect_inner()?.as_ref().clone();
1942 let mut current = super::DataFrame::from_polars_with_options(pl, case_sensitive);
1943 for (name, col) in exprs {
1944 current = with_column(¤t, name, col, case_sensitive)?;
1945 }
1946 Ok(current)
1947}
1948
1949pub fn with_columns_renamed(
1951 df: &DataFrame,
1952 renames: &[(String, String)],
1953 case_sensitive: bool,
1954) -> Result<DataFrame, PolarsError> {
1955 let mut lf = df.lazy_frame();
1958 let mut applied_any = false;
1959 for (old_name, new_name) in renames {
1960 match df.resolve_column_name(old_name) {
1961 Ok(resolved) => {
1962 lf = lf.rename([resolved.as_str()], [new_name.as_str()], true);
1963 applied_any = true;
1964 }
1965 Err(PolarsError::ColumnNotFound(_)) => {
1966 continue;
1968 }
1969 Err(e) => return Err(e),
1970 }
1971 }
1972 if !applied_any {
1973 return Ok(df.clone());
1974 }
1975 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
1976}
1977
1978pub struct DataFrameNa<'a> {
1980 pub(crate) df: &'a DataFrame,
1981}
1982
1983impl<'a> DataFrameNa<'a> {
1984 pub fn new(df: &'a DataFrame) -> Self {
1986 DataFrameNa { df }
1987 }
1988
1989 pub fn fill(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
1991 fillna(self.df, value, subset, self.df.case_sensitive)
1992 }
1993
1994 pub fn replace(
1996 &self,
1997 old_value: Expr,
1998 new_value: Expr,
1999 subset: Option<Vec<&str>>,
2000 ) -> Result<DataFrame, PolarsError> {
2001 let cols: Vec<String> = match &subset {
2002 Some(s) => s.iter().map(|x| (*x).to_string()).collect(),
2003 None => self.df.columns()?,
2004 };
2005 let mut result = self.df.clone();
2006 for col_name in &cols {
2007 result = replace(
2008 &result,
2009 col_name.as_str(),
2010 old_value.clone(),
2011 new_value.clone(),
2012 self.df.case_sensitive,
2013 )?;
2014 }
2015 Ok(result)
2016 }
2017
2018 pub fn drop(
2020 &self,
2021 subset: Option<Vec<&str>>,
2022 how: &str,
2023 thresh: Option<usize>,
2024 ) -> Result<DataFrame, PolarsError> {
2025 dropna(self.df, subset, how, thresh, self.df.case_sensitive)
2026 }
2027}
2028
2029pub fn offset(df: &DataFrame, n: usize, case_sensitive: bool) -> Result<DataFrame, PolarsError> {
2033 let lf = df.lazy_frame().slice(n as i64, u32::MAX);
2034 Ok(super::DataFrame::from_lazy_with_options(lf, case_sensitive))
2035}
2036
2037pub fn transform<F>(df: &DataFrame, f: F) -> Result<DataFrame, PolarsError>
2039where
2040 F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
2041{
2042 let df_out = f(df.clone())?;
2043 Ok(df_out)
2044}
2045
2046pub fn freq_items(
2048 df: &DataFrame,
2049 columns: &[&str],
2050 support: f64,
2051 case_sensitive: bool,
2052) -> Result<DataFrame, PolarsError> {
2053 use polars::prelude::SeriesMethods;
2054 if columns.is_empty() {
2055 return Ok(super::DataFrame::from_lazy_with_options(
2056 df.lazy_frame().slice(0, 0),
2057 case_sensitive,
2058 ));
2059 }
2060 let support = support.clamp(1e-4, 1.0);
2061 let collected = df.collect_inner()?;
2062 let pl_df = collected.as_ref();
2063 let n_total = pl_df.height() as f64;
2064 if n_total == 0.0 {
2065 let mut out = Vec::with_capacity(columns.len());
2066 for col_name in columns {
2067 let resolved = df.resolve_column_name(col_name)?;
2068 let s = pl_df
2069 .column(resolved.as_str())?
2070 .as_series()
2071 .ok_or_else(|| PolarsError::ComputeError("column not a series".into()))?
2072 .clone();
2073 let empty_sub = s.head(Some(0));
2074 let list_chunked = polars::prelude::ListChunked::from_iter([empty_sub].into_iter())
2075 .with_name(format!("{resolved}_freqItems").into());
2076 out.push(list_chunked.into_series().into());
2077 }
2078 return Ok(super::DataFrame::from_polars_with_options(
2079 polars::prelude::DataFrame::new_infer_height(out)?,
2080 case_sensitive,
2081 ));
2082 }
2083 let mut out_series = Vec::with_capacity(columns.len());
2084 for col_name in columns {
2085 let resolved = df.resolve_column_name(col_name)?;
2086 let s = pl_df
2087 .column(resolved.as_str())?
2088 .as_series()
2089 .ok_or_else(|| PolarsError::ComputeError("column not a series".into()))?
2090 .clone();
2091 let vc = s.value_counts(false, false, "counts".into(), false)?;
2092 let count_col = vc
2093 .column("counts")
2094 .map_err(|_| PolarsError::ComputeError("value_counts missing counts column".into()))?;
2095 let counts = count_col
2096 .u32()
2097 .map_err(|_| PolarsError::ComputeError("freq_items: counts column not u32".into()))?;
2098 let value_col_name = s.name();
2099 let values_col = vc
2100 .column(value_col_name.as_str())
2101 .map_err(|_| PolarsError::ComputeError("value_counts missing value column".into()))?;
2102 let threshold = (support * n_total).ceil() as u32;
2103 let indices: Vec<u32> = counts
2104 .into_iter()
2105 .enumerate()
2106 .filter_map(|(i, c)| {
2107 if c? >= threshold {
2108 Some(i as u32)
2109 } else {
2110 None
2111 }
2112 })
2113 .collect();
2114 let idx_series = Series::new("idx".into(), indices);
2115 let idx_ca = idx_series
2116 .u32()
2117 .map_err(|_| PolarsError::ComputeError("freq_items: index series not u32".into()))?;
2118 let values_series = values_col
2119 .as_series()
2120 .ok_or_else(|| PolarsError::ComputeError("value column not a series".into()))?;
2121 let filtered = values_series.take(idx_ca)?;
2122 let list_chunked = polars::prelude::ListChunked::from_iter([filtered].into_iter())
2123 .with_name(format!("{resolved}_freqItems").into());
2124 let list_row = list_chunked.into_series();
2125 out_series.push(list_row.into());
2126 }
2127 let out_df = polars::prelude::DataFrame::new_infer_height(out_series)?;
2128 Ok(super::DataFrame::from_polars_with_options(
2129 out_df,
2130 case_sensitive,
2131 ))
2132}
2133
2134pub fn approx_quantile(
2136 df: &DataFrame,
2137 column: &str,
2138 probabilities: &[f64],
2139 case_sensitive: bool,
2140) -> Result<DataFrame, PolarsError> {
2141 use polars::prelude::{ChunkQuantile, QuantileMethod};
2142 if probabilities.is_empty() {
2143 return Ok(super::DataFrame::from_polars_with_options(
2144 polars::prelude::DataFrame::new_infer_height(vec![
2145 Series::new("quantile".into(), Vec::<f64>::new()).into(),
2146 ])?,
2147 case_sensitive,
2148 ));
2149 }
2150 let resolved = df.resolve_column_name(column)?;
2151 let collected = df.collect_inner()?;
2152 let s = collected
2153 .column(resolved.as_str())?
2154 .as_series()
2155 .ok_or_else(|| PolarsError::ComputeError("approx_quantile: column not a series".into()))?
2156 .clone();
2157 let ca = series_as_f64_ca(&s, "approx_quantile")?;
2158 let mut quantiles = Vec::with_capacity(probabilities.len());
2159 for &p in probabilities {
2160 let q = ca.quantile(p, QuantileMethod::Linear)?;
2161 quantiles.push(q.unwrap_or(f64::NAN));
2162 }
2163 let out_df = polars::prelude::DataFrame::new_infer_height(vec![
2164 Series::new("quantile".into(), quantiles).into(),
2165 ])?;
2166 Ok(super::DataFrame::from_polars_with_options(
2167 out_df,
2168 case_sensitive,
2169 ))
2170}
2171
2172pub fn crosstab(
2174 df: &DataFrame,
2175 col1: &str,
2176 col2: &str,
2177 case_sensitive: bool,
2178) -> Result<DataFrame, PolarsError> {
2179 use polars::prelude::*;
2180 let c1 = df.resolve_column_name(col1)?;
2181 let c2 = df.resolve_column_name(col2)?;
2182 let collected = df.collect_inner()?;
2183 let pl_df = collected.as_ref();
2184 let grouped = pl_df
2185 .clone()
2186 .lazy()
2187 .group_by([col(c1.as_str()), col(c2.as_str())])
2188 .agg([len().alias("count")])
2189 .collect()?;
2190 Ok(super::DataFrame::from_polars_with_options(
2191 grouped,
2192 case_sensitive,
2193 ))
2194}
2195
2196pub fn melt(
2198 df: &DataFrame,
2199 id_vars: &[&str],
2200 value_vars: &[&str],
2201 case_sensitive: bool,
2202) -> Result<DataFrame, PolarsError> {
2203 use polars::prelude::*;
2204 let collected = df.collect_inner()?;
2205 let pl_df = collected.as_ref();
2206 if value_vars.is_empty() {
2207 return Ok(super::DataFrame::from_polars_with_options(
2208 pl_df.head(Some(0)),
2209 case_sensitive,
2210 ));
2211 }
2212 let id_resolved: Vec<String> = id_vars
2213 .iter()
2214 .map(|s| df.resolve_column_name(s).map(|r| r.to_string()))
2215 .collect::<Result<Vec<_>, _>>()?;
2216 let value_resolved: Vec<String> = value_vars
2217 .iter()
2218 .map(|s| df.resolve_column_name(s).map(|r| r.to_string()))
2219 .collect::<Result<Vec<_>, _>>()?;
2220 let mut parts = Vec::with_capacity(value_vars.len());
2221 for vname in &value_resolved {
2222 let select_cols: Vec<&str> = id_resolved
2223 .iter()
2224 .map(|s| s.as_str())
2225 .chain([vname.as_str()])
2226 .collect();
2227 let mut part = pl_df.select(select_cols)?;
2228 let var_series = Series::new("variable".into(), vec![vname.as_str(); part.height()]);
2229 part.with_column(var_series.into())?;
2230 part.rename(vname.as_str(), "value".into())?;
2231 parts.push(part);
2232 }
2233 let mut out = parts
2234 .first()
2235 .ok_or_else(|| PolarsError::ComputeError("melt: no value columns".into()))?
2236 .clone();
2237 for p in parts.iter().skip(1) {
2238 out.vstack_mut(p)?;
2239 }
2240 let col_order: Vec<&str> = id_resolved
2241 .iter()
2242 .map(|s| s.as_str())
2243 .chain(["variable", "value"])
2244 .collect();
2245 let out = out.select(col_order)?;
2246 Ok(super::DataFrame::from_polars_with_options(
2247 out,
2248 case_sensitive,
2249 ))
2250}
2251
2252pub fn except_all(
2254 left: &DataFrame,
2255 right: &DataFrame,
2256 case_sensitive: bool,
2257) -> Result<DataFrame, PolarsError> {
2258 subtract(left, right, case_sensitive)
2259}
2260
2261pub fn intersect_all(
2263 left: &DataFrame,
2264 right: &DataFrame,
2265 case_sensitive: bool,
2266) -> Result<DataFrame, PolarsError> {
2267 intersect(left, right, case_sensitive)
2268}
2269
// Unit tests for the DataFrame transform functions in this module (limit, head,
// offset, first, distinct, drop, dropna, filter, order_by, select_items,
// with_column, union, union_by_name), plus several regressions referenced by
// issue number in their assertion messages.
#[cfg(test)]
mod tests {
    use super::{
        SelectItem, distinct, drop, dropna, filter, first, head, limit, offset, order_by,
        select_items, union, union_by_name, with_column,
    };
    use crate::column::Column;
    use crate::functions;
    use crate::{DataFrame, SparkSession};
    use polars::prelude::{col, concat_str, lit};
    use serde_json::json;

    /// Shared fixture: three rows with columns `id` (i64), `v` (i64) and
    /// `label` (string), containing no null values.
    fn test_df() -> DataFrame {
        let spark = SparkSession::builder()
            .app_name("transform_tests")
            .get_or_create();
        spark
            .create_dataframe(
                vec![
                    (1i64, 10i64, "a".to_string()),
                    (2i64, 20i64, "b".to_string()),
                    (3i64, 30i64, "c".to_string()),
                ],
                vec!["id", "v", "label"],
            )
            .unwrap()
    }

    /// `limit` with 0 yields no rows.
    #[test]
    fn limit_zero() {
        let df = test_df();
        let out = limit(&df, 0, false).unwrap();
        assert_eq!(out.count().unwrap(), 0);
    }

    /// A limit larger than the row count returns every row (3 in the fixture).
    #[test]
    fn limit_more_than_rows() {
        let df = test_df();
        let out = limit(&df, 10, false).unwrap();
        assert_eq!(out.count().unwrap(), 3);
    }

    /// `distinct` on a zero-row DataFrame (built from an explicitly typed empty
    /// vec) must succeed and stay empty.
    #[test]
    fn distinct_on_empty() {
        let spark = SparkSession::builder()
            .app_name("transform_tests")
            .get_or_create();
        let df = spark
            .create_dataframe(vec![] as Vec<(i64, i64, String)>, vec!["a", "b", "c"])
            .unwrap();
        let out = distinct(&df, None, false).unwrap();
        assert_eq!(out.count().unwrap(), 0);
    }

    /// `first` returns exactly one row.
    #[test]
    fn first_returns_one_row() {
        let df = test_df();
        let out = first(&df, false).unwrap();
        assert_eq!(out.count().unwrap(), 1);
    }

    /// Regression: `first` must honour a preceding `order_by` instead of
    /// returning the first row in storage order. The data is stored as
    /// Charlie(3), Alice(1), Bob(2); after ordering by `value` the test expects
    /// Alice, the row with the minimum value.
    #[test]
    fn first_after_order_by_returns_first_in_sort_order() {
        use polars::prelude::df;
        let spark = SparkSession::builder()
            .app_name("transform_tests")
            .get_or_create();
        let pl = df![
            "name" => ["Charlie", "Alice", "Bob"],
            "value" => [3i64, 1i64, 2i64],
        ]
        .unwrap();
        let df = spark.create_dataframe_from_polars(pl);
        let ordered = order_by(&df, vec!["value"], vec![true], false).unwrap();
        let one = first(&ordered, false).unwrap();
        let collected = one.collect_inner().unwrap();
        let name_series = collected.column("name").unwrap();
        let first_name = name_series.str().unwrap().get(0).unwrap();
        assert_eq!(
            first_name, "Alice",
            "first() after orderBy(value) must return row with min value (Alice=1), not first in storage (Charlie)"
        );
    }

    /// `to_timestamp` (no format) can be applied via `with_column` to source
    /// columns declared as "int", "long", "date" and "double". Only the "int"
    /// case also checks that the parsed value is serialized as a JSON string;
    /// the other cases only check that one row comes back.
    #[test]
    fn with_column_to_timestamp_accepts_multiple_types() {
        let spark = SparkSession::builder()
            .app_name("to_timestamp_types_test")
            .get_or_create();

        let rows_int = vec![vec![json!(1672574400)]];
        // Numeric `unix_ts` column declared with schema type "int".
        let schema_int = vec![("unix_ts".to_string(), "int".to_string())];
        let df_int = spark
            .create_dataframe_from_rows(rows_int, schema_int, false, false)
            .unwrap();
        let col_ts = functions::to_timestamp(&df_int.column("unix_ts").unwrap(), None).unwrap();
        let out_int = with_column(&df_int, "parsed", &col_ts, false).unwrap();
        let rows_out = out_int.collect_as_json_rows().unwrap();
        assert_eq!(rows_out.len(), 1);
        assert!(rows_out[0].get("parsed").and_then(|v| v.as_str()).is_some());

        let rows_long = vec![vec![json!(1672574400)]];
        // Same numeric value, declared as "long".
        let schema_long = vec![("unix_ts".to_string(), "long".to_string())];
        let df_long = spark
            .create_dataframe_from_rows(rows_long, schema_long, false, false)
            .unwrap();
        let col_ts_long =
            functions::to_timestamp(&df_long.column("unix_ts").unwrap(), None).unwrap();
        let out_long = with_column(&df_long, "parsed", &col_ts_long, false).unwrap();
        assert_eq!(out_long.collect_as_json_rows().unwrap().len(), 1);

        let rows_date = vec![vec![json!("2023-01-01")]];
        // Date string declared as "date".
        let schema_date = vec![("date_col".to_string(), "date".to_string())];
        let df_date = spark
            .create_dataframe_from_rows(rows_date, schema_date, false, false)
            .unwrap();
        let col_ts_date =
            functions::to_timestamp(&df_date.column("date_col").unwrap(), None).unwrap();
        let out_date = with_column(&df_date, "parsed", &col_ts_date, false).unwrap();
        assert_eq!(out_date.collect_as_json_rows().unwrap().len(), 1);

        let rows_double = vec![vec![json!(1672574400.5)]];
        // Fractional numeric value declared as "double".
        let schema_double = vec![("unix_ts".to_string(), "double".to_string())];
        let df_double = spark
            .create_dataframe_from_rows(rows_double, schema_double, false, false)
            .unwrap();
        let col_ts_double =
            functions::to_timestamp(&df_double.column("unix_ts").unwrap(), None).unwrap();
        let out_double = with_column(&df_double, "parsed", &col_ts_double, false).unwrap();
        assert_eq!(out_double.collect_as_json_rows().unwrap().len(), 1);
    }

    /// The pipeline `regexp_replace(r"\.\d+", "")` -> `cast_to("string")` ->
    /// `to_timestamp("yyyy-MM-dd'T'HH:mm:ss")` must parse all three inputs.
    /// The inputs carry fractional seconds of differing lengths (6, 1 and 3
    /// digits), which the regex removes before parsing. The result is selected,
    /// then filtered on both columns being non-null, and all 3 rows must remain.
    #[test]
    fn to_timestamp_after_regexp_replace_cast_string_parses_successfully() {
        use polars::prelude::{NamedFrom, Series};
        let spark = SparkSession::builder()
            .app_name("to_timestamp_regexp_test")
            .get_or_create();
        let impression_id = Series::new("impression_id".into(), &["IMP-001", "IMP-002", "IMP-003"]);
        let impression_date = Series::new(
            "impression_date".into(),
            &[
                "2025-03-07T19:34:56.123456",
                "2025-03-07T18:00:00.0",
                "2025-03-06T12:00:00.999",
            ],
        );
        let pl = polars::prelude::DataFrame::new_infer_height(vec![
            impression_id.into(),
            impression_date.into(),
        ])
        .unwrap();
        let df = spark.create_dataframe_from_polars(pl);
        let c = df.column("impression_date").unwrap();
        let replaced = functions::regexp_replace(&c, r"\.\d+", "");
        let casted = replaced.cast_to("string").unwrap();
        let ts_col = functions::to_timestamp(&casted, Some("yyyy-MM-dd'T'HH:mm:ss")).unwrap();
        let silver = with_column(&df, "impression_date_parsed", &ts_col, false).unwrap();
        let selected = select_items(
            &silver,
            vec![
                SelectItem::ColumnName("impression_id"),
                SelectItem::ColumnName("impression_date_parsed"),
            ],
            false,
        )
        .unwrap();
        let cond = functions::col("impression_id")
            .is_not_null()
            .and_(&functions::col("impression_date_parsed").is_not_null());
        let valid = filter(&selected, cond.into_expr(), false).unwrap();
        let count = valid.count().unwrap();
        assert_eq!(
            count, 3,
            "regex strips fractional seconds, format parses; all 3 rows valid"
        );
    }

    /// `to_timestamp_fused_strip_fraction` with a fraction-free pattern must
    /// produce a non-null value for each of three fixed 2024 strings that carry
    /// 6-digit fractional seconds.
    #[test]
    fn to_timestamp_fused_strip_fraction_fixed_2024_strings_non_null() {
        use polars::prelude::{NamedFrom, Series};
        let spark = SparkSession::builder()
            .app_name("to_timestamp_fused_fixed")
            .get_or_create();
        let id = Series::new("id".into(), &["a", "b", "c"]);
        let date_string = Series::new(
            "date_string".into(),
            &[
                "2024-01-15T10:30:45.123456",
                "2024-01-16T14:20:30.789012",
                "2024-01-17T09:15:22.456789",
            ],
        );
        let pl = polars::prelude::DataFrame::new_infer_height(vec![id.into(), date_string.into()])
            .unwrap();
        let df = spark.create_dataframe_from_polars(pl);
        let c = df.column("date_string").unwrap();
        let ts_col =
            functions::to_timestamp_fused_strip_fraction(&c, "yyyy-MM-dd'T'HH:mm:ss").unwrap();
        let out = with_column(&df, "date_parsed", &ts_col, false).unwrap();
        let non_null = out
            .filter(functions::col("date_parsed").is_not_null().into_expr())
            .unwrap()
            .count()
            .unwrap();
        assert_eq!(
            non_null, 3,
            "fixed 2024 strings: fused path returns non-null for all"
        );
    }

    /// Counterpart to the fixed-2024 test: strings built from the current UTC
    /// time (now, now-1h, now-2h, with 6-digit fractional seconds) are expected
    /// to parse to null on the fused path, for parity with issue #168 per the
    /// assertion message.
    ///
    /// NOTE(review): the result depends on the wall clock at test time, and the
    /// reason recent timestamps yield null is not visible here. Confirm against
    /// `functions::to_timestamp_fused_strip_fraction` / issue #168.
    #[test]
    fn to_timestamp_fused_strip_fraction_recent_strings_null() {
        use chrono::TimeDelta;
        use polars::prelude::{NamedFrom, Series};
        let spark = SparkSession::builder()
            .app_name("to_timestamp_fused_recent")
            .get_or_create();
        let now = chrono::Utc::now();
        let strings: Vec<String> = (0..3)
            .map(|i| {
                (now - TimeDelta::hours(i))
                    .format("%Y-%m-%dT%H:%M:%S%.6f")
                    .to_string()
            })
            .collect();
        let id = Series::new("id".into(), &["a", "b", "c"]);
        let date_string = Series::new("date_string".into(), strings.as_slice());
        let pl = polars::prelude::DataFrame::new_infer_height(vec![id.into(), date_string.into()])
            .unwrap();
        let df = spark.create_dataframe_from_polars(pl);
        let c = df.column("date_string").unwrap();
        let ts_col =
            functions::to_timestamp_fused_strip_fraction(&c, "yyyy-MM-dd'T'HH:mm:ss").unwrap();
        let out = with_column(&df, "date_parsed", &ts_col, false).unwrap();
        let non_null = out
            .filter(functions::col("date_parsed").is_not_null().into_expr())
            .unwrap()
            .count()
            .unwrap();
        assert_eq!(
            non_null, 0,
            "recent strings: fused path returns null for all (#168 parity)"
        );
    }

    /// `with_column` with an `explode` expression over a list-of-strings column
    /// (three rows, two elements each) must expand to 6 rows. It must also add
    /// `ExplodedValue` while keeping the original `Name` and `Value` columns.
    #[test]
    fn with_column_explode_adds_column_and_expands_rows() {
        use polars::chunked_array::builder::ListStringChunkedBuilder;
        use polars::prelude::{IntoSeries, ListBuilderTrait, NamedFrom, Series};
        let spark = SparkSession::builder()
            .app_name("with_column_explode_test")
            .get_or_create();
        let names = Series::new("Name".into(), &["Alice", "Bob", "Charlie"]);
        let mut list_builder = ListStringChunkedBuilder::new("Value".into(), 3, 16);
        list_builder.append_values_iter(["1", "2"].iter().copied());
        list_builder.append_values_iter(["2", "3"].iter().copied());
        list_builder.append_values_iter(["4", "5"].iter().copied());
        let value_series = list_builder.finish().into_series();
        let pl =
            polars::prelude::DataFrame::new_infer_height(vec![names.into(), value_series.into()])
                .unwrap();
        let df = spark.create_dataframe_from_polars(pl);
        let col_explode = functions::explode(&df.column("Value").unwrap());
        let out = with_column(&df, "ExplodedValue", &col_explode, false).unwrap();
        assert_eq!(
            out.count().unwrap(),
            6,
            "explode should produce 6 rows (2+2+2)"
        );
        let cols = out.columns().unwrap();
        assert!(cols.iter().any(|c| c == "Name"));
        assert!(cols.iter().any(|c| c == "Value"));
        assert!(cols.iter().any(|c| c == "ExplodedValue"));
    }

    /// A map lookup keyed by a column (`mapping.get(col("Value"))`) must resolve
    /// per row. The map is {1: "Small", 2: "Medium", 3: "Large"} and the `Value`
    /// rows are 1 and 3, so `Size` is expected to be "Small" then "Large".
    #[test]
    fn with_column_map_get_with_column_key_resolves_key() {
        use polars::prelude::{NamedFrom, Series};
        let spark = SparkSession::builder()
            .app_name("map_get_test")
            .get_or_create();
        let names = Series::new("Name".into(), &["Alice", "Bob"]);
        let values = Series::new("Value".into(), [1i64, 3i64]);
        let pl = polars::prelude::DataFrame::new_infer_height(vec![names.into(), values.into()])
            .unwrap();
        let df = spark.create_dataframe_from_polars(pl);
        // Alternating key/value arguments: 1->Small, 2->Medium, 3->Large.
        let mapping = functions::create_map(&[
            &functions::lit_i64(1),
            &functions::lit_str("Small"),
            &functions::lit_i64(2),
            &functions::lit_str("Medium"),
            &functions::lit_i64(3),
            &functions::lit_str("Large"),
        ])
        .unwrap();
        let size_col = mapping.get(&functions::col("Value"));
        let out = with_column(&df, "Size", &size_col, false).unwrap();
        let rows = out.collect_as_json_rows().unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].get("Size").and_then(|v| v.as_str()), Some("Small"));
        assert_eq!(rows[1].get("Size").and_then(|v| v.as_str()), Some("Large"));
    }

    /// `select_items` mixing plain columns with an aliased raw Polars `explode`
    /// expression must expand rows (2 rows x 2 elements = 4). It must also keep
    /// the un-exploded list column `Value` next to `ExplodedValue`.
    #[test]
    fn select_with_explode_alias_preserves_list_column() {
        use polars::chunked_array::builder::ListStringChunkedBuilder;
        use polars::prelude::{ExplodeOptions, IntoSeries, ListBuilderTrait, NamedFrom, Series};
        let spark = SparkSession::builder()
            .app_name("select_explode_test")
            .get_or_create();
        let names = Series::new("Name".into(), &["Alice", "Bob"]);
        let mut list_builder = ListStringChunkedBuilder::new("Value".into(), 2, 8);
        list_builder.append_values_iter(["1", "2"].iter().copied());
        list_builder.append_values_iter(["2", "3"].iter().copied());
        let value_series = list_builder.finish().into_series();
        let pl =
            polars::prelude::DataFrame::new_infer_height(vec![names.into(), value_series.into()])
                .unwrap();
        let df = spark.create_dataframe_from_polars(pl);
        let explode_expr = polars::prelude::col("Value")
            .explode(ExplodeOptions {
                empty_as_null: false,
                keep_nulls: false,
            })
            .alias("ExplodedValue");
        let items = vec![
            SelectItem::ColumnName("Name"),
            SelectItem::ColumnName("Value"),
            SelectItem::Expr(explode_expr),
        ];
        let out = select_items(&df, items, false).unwrap();
        assert_eq!(
            out.count().unwrap(),
            4,
            "select with explode should produce 4 rows (2+2)"
        );
        let cols = out.columns().unwrap();
        assert!(cols.iter().any(|c| c == "Name"));
        assert!(cols.iter().any(|c| c == "Value"));
        assert!(cols.iter().any(|c| c == "ExplodedValue"));
    }

    /// Regression #1267: selecting plain columns together with a window
    /// expression (`Column::row_number_over` with `["dept"]` and `["salary"]`,
    /// aliased `rn`) must not corrupt the plain column values. The first output
    /// row is expected to be dept "A", salary 100, rn 1.
    #[test]
    fn select_items_with_window_preserves_column_values() {
        let spark = SparkSession::builder()
            .app_name("select_window_1267")
            .get_or_create();
        let rows = vec![vec![json!("A"), json!(100)], vec![json!("A"), json!(200)]];
        let schema = vec![
            ("dept".to_string(), "string".to_string()),
            ("salary".to_string(), "bigint".to_string()),
        ];
        let df = spark
            .create_dataframe_from_rows(rows, schema, false, false)
            .unwrap();
        let rank_col = Column::row_number_over(&["dept"], &["salary".to_string()]).unwrap();
        let rank_expr = rank_col.into_expr().alias("rn");
        let items = vec![
            SelectItem::ColumnName("dept"),
            SelectItem::ColumnName("salary"),
            SelectItem::Expr(rank_expr),
        ];
        let out = select_items(&df, items, false).unwrap();
        let rows_out = out.collect_as_json_rows().unwrap();
        assert_eq!(rows_out.len(), 2, "expected 2 rows");
        let first = &rows_out[0];
        assert_eq!(
            first.get("dept").and_then(|v| v.as_str()),
            Some("A"),
            "first row dept must be A (#1267)"
        );
        assert_eq!(
            first.get("salary").and_then(|v| v.as_i64()),
            Some(100),
            "first row salary must be 100"
        );
        assert_eq!(
            first.get("rn").and_then(|v| v.as_i64()),
            Some(1),
            "first row rn must be 1"
        );
    }

    /// After a left join of frames whose key columns differ only by case (`name`
    /// vs `NAME`), selecting "NaMe" must expose a column spelled exactly "NaMe".
    /// Its value must be the left frame's "Alice", i.e. the first match.
    /// NOTE(review): the trailing `false` passed to `select_items` is presumably
    /// the case-sensitivity flag (case-insensitive here). Confirm.
    #[test]
    fn select_items_ambiguous_case_prefers_first_match_and_uses_requested_name() {
        use polars::prelude::df;

        let spark = SparkSession::builder()
            .app_name("select_ambiguous_case")
            .get_or_create();
        let left_pl = df!("name" => &["Alice"], "value" => &[1i64]).unwrap();
        let right_pl = df!("NAME" => &["Bob"], "other" => &[2i64]).unwrap();
        let left = spark.create_dataframe_from_polars(left_pl);
        let right = spark.create_dataframe_from_polars(right_pl);
        let joined = left
            // Join key spelled in a third casing ("Name").
            .join(&right, vec!["Name"], crate::dataframe::JoinType::Left)
            .unwrap();

        let out = select_items(&joined, vec![SelectItem::ColumnName("NaMe")], false).unwrap();
        let cols = out.columns().unwrap();
        assert!(
            cols.contains(&"NaMe".to_string()),
            "ambiguous select must expose requested spelling"
        );

        let pl = out.collect().unwrap();
        let name_series = pl.column("NaMe").unwrap().str().unwrap();
        assert_eq!(name_series.get(0).unwrap(), "Alice");
    }

    /// `head(2)` on the 3-row fixture returns 2 rows.
    #[test]
    fn head_n() {
        let df = test_df();
        let out = head(&df, 2, false).unwrap();
        assert_eq!(out.count().unwrap(), 2);
    }

    /// `offset(1)` skips one row, leaving 2 of the fixture's 3.
    #[test]
    fn offset_skip_first() {
        let df = test_df();
        let out = offset(&df, 1, false).unwrap();
        assert_eq!(out.count().unwrap(), 2);
    }

    /// An offset past the end yields an empty result rather than an error.
    #[test]
    fn offset_beyond_length_returns_empty() {
        let df = test_df();
        let out = offset(&df, 10, false).unwrap();
        assert_eq!(out.count().unwrap(), 0);
    }

    /// Dropping `v` removes that column and leaves the row count unchanged.
    #[test]
    fn drop_column() {
        let df = test_df();
        let out = drop(&df, vec!["v"], false).unwrap();
        let cols = out.columns().unwrap();
        assert!(!cols.contains(&"v".to_string()));
        assert_eq!(out.count().unwrap(), 3);
    }

    /// Regression #681: `union` where `id` is Int64 on the left and String on the
    /// right must succeed. The resulting `id` must be typed as String, both in
    /// the schema and in every collected JSON row.
    #[test]
    fn union_coerces_int_str_same_position() {
        use polars::prelude::df;

        let spark = SparkSession::builder()
            .app_name("transform_tests")
            .get_or_create();
        let left_pl = df!("id" => &[1i64, 2i64], "name" => &["a", "b"]).unwrap();
        let right_pl = df!("id" => &["3", "4"], "name" => &["c", "d"]).unwrap();
        let left = spark.create_dataframe_from_polars(left_pl);
        let right = spark.create_dataframe_from_polars(right_pl);
        let out = union(&left, &right, false).expect("#681: union must coerce id Int64 vs String");
        assert_eq!(out.count().unwrap(), 4);
        let cols = out.columns().unwrap();
        assert_eq!(cols.len(), 2);
        assert!(cols.contains(&"id".to_string()));
        assert!(cols.contains(&"name".to_string()));
        let (_names, rows, schema) = out.collect_as_json_rows_with_names().unwrap();
        // The coerced `id` must be reported as String in the schema...
        let id_field = schema.fields().iter().find(|f| f.name == "id").unwrap();
        assert!(matches!(
            id_field.data_type,
            robin_sparkless_core::DataType::String
        ));
        // ...and every row value must serialize as a JSON string.
        for row in &rows {
            let id_val = row.get("id").unwrap();
            assert!(
                matches!(id_val, serde_json::Value::String(_)),
                "id should be string, got {id_val:?}"
            );
        }
    }

    /// When both sides have the same column names in a different order (right is
    /// `b, a`), `union` must line columns up by name and keep the left order
    /// (`a`, `b`).
    #[test]
    fn union_same_names_different_order() {
        use polars::prelude::df;

        let spark = SparkSession::builder()
            .app_name("transform_tests")
            .get_or_create();
        let left_pl = df!("a" => &[1i64, 2i64], "b" => &["x", "y"]).unwrap();
        let right_pl = df!("b" => &["p", "q"], "a" => &[3i64, 4i64]).unwrap();
        let left = spark.create_dataframe_from_polars(left_pl);
        let right = spark.create_dataframe_from_polars(right_pl);
        let out = union(&left, &right, false).expect("union by name set should reorder right");
        assert_eq!(out.count().unwrap(), 4);
        let cols = out.columns().unwrap();
        assert_eq!(cols[0], "a");
        assert_eq!(cols[1], "b");
    }

    /// Regression #603: `union_by_name` with `id` as Int64 on the left and
    /// String on the right must succeed and keep both rows.
    #[test]
    fn union_by_name_coerces_different_column_types() {
        use polars::prelude::df;

        let spark = SparkSession::builder()
            .app_name("transform_tests")
            .get_or_create();
        let left_pl = df!("id" => &[1i64], "name" => &["a"]).unwrap();
        let left = spark.create_dataframe_from_polars(left_pl);
        let schema = vec![
            ("id".to_string(), "string".to_string()),
            ("name".to_string(), "string".to_string()),
        ];
        let right = spark
            .create_dataframe_from_rows(vec![vec![json!("2"), json!("b")]], schema, false, false)
            .unwrap();
        let out = union_by_name(&left, &right, true, false)
            .expect("issue #603: union_by_name must coerce id Int64 vs String");
        assert_eq!(out.count().unwrap(), 2);
    }

    /// `dropna` with `"any"` and no subset keeps all rows, because the fixture
    /// contains no nulls.
    #[test]
    fn dropna_all_columns() {
        let df = test_df();
        let out = dropna(&df, None, "any", None, false).unwrap();
        assert_eq!(out.count().unwrap(), 3);
    }

    /// `dropna` with a subset column that does not exist must return an error.
    /// The message check is deliberately lenient: it accepts either "not found"
    /// or "column" anywhere in the lowercased error text.
    #[test]
    fn dropna_invalid_subset_column_raises() {
        let df = test_df();
        let result = dropna(&df, Some(vec!["NonExistentColumn"]), "any", None, false);
        match &result {
            Err(e) => assert!(
                e.to_string().to_lowercase().contains("not found")
                    || e.to_string().to_lowercase().contains("column"),
                "expected column-not-found error, got: {}",
                e
            ),
            Ok(_) => panic!("expected error for dropna with non-existent subset column"),
        }
    }

    /// Regression #1105: the steps are rename columns -> add a derived string
    /// column (`concat_str` of `id` and `customer_id` joined with "_") -> select.
    /// Filtering that derived column for equality with "rec1_cust1" must then
    /// find the single matching row.
    #[test]
    fn filter_string_equality_after_with_column() {
        let spark = SparkSession::builder()
            .app_name("filter_string_eq_test")
            .get_or_create();
        let pl = polars::prelude::df!["record_id" => &["rec1"], "cust_id" => &["cust1"]].unwrap();
        let df = spark.create_dataframe_from_polars(pl);
        let transformed = df
            .with_column_renamed("record_id", "id")
            .unwrap()
            .with_column_renamed("cust_id", "customer_id")
            .unwrap();
        let full_id_expr = concat_str(&[col("id"), col("customer_id")], "_", false);
        let transformed = with_column(
            &transformed,
            "full_id",
            &Column::from_expr(full_id_expr, None),
            false,
        )
        .unwrap();
        let transformed = select_items(
            &transformed,
            vec![
                SelectItem::Expr(col("id")),
                SelectItem::Expr(col("customer_id")),
                SelectItem::Expr(col("full_id")),
            ],
            false,
        )
        .unwrap();
        let condition = Column::new("full_id".to_string())
            // Expected concatenation of "rec1" and "cust1" with the "_" separator.
            .eq(lit("rec1_cust1"))
            .into_expr();
        let result = filter(&transformed, condition, false).unwrap();
        assert_eq!(
            result.count().unwrap(),
            1,
            "#1105: filter on string column must return 1 row"
        );
    }
}