1use robin_sparkless_core::EngineError;
4use robin_sparkless_core::engine::{CollectedRows, DataFrameBackend, GroupedDataBackend};
5use robin_sparkless_core::expr::ExprIr;
6use robin_sparkless_core::{DataType, StructType};
7use robin_sparkless_polars::dataframe::{
8 DataFrameNa as PolarsDataFrameNa, DataFrameStat as PolarsDataFrameStat,
9 DataFrameWriter as PolarsDataFrameWriter,
10};
11use robin_sparkless_polars::functions::SortOrder;
12use robin_sparkless_polars::{
13 Column, CubeRollupData as PolarsCubeRollupData, DataFrame as PolarsDataFrame, Expr,
14 GroupedData as PolarsGroupedData, LazyFrame, PivotedGroupedData as PolarsPivotedGroupedData,
15 PlDataFrame, PolarsError,
16};
17use std::collections::HashMap;
18use std::path::Path;
19use std::sync::Arc;
20
21use serde_json::Value as JsonValue;
22
23fn downcast_df(box_df: Box<dyn DataFrameBackend>) -> Result<DataFrame, EngineError> {
25 let concrete = box_df
26 .as_any()
27 .downcast_ref::<PolarsDataFrame>()
28 .ok_or_else(|| EngineError::Internal("expected Polars backend".into()))?;
29 Ok(DataFrame(concrete.clone()))
30}
31
32pub(crate) fn from_backend(box_df: Box<dyn DataFrameBackend>) -> Result<DataFrame, EngineError> {
35 downcast_df(box_df)
36}
37
38#[derive(Clone)]
40pub struct DataFrame(pub(crate) PolarsDataFrame);
41
42pub struct GroupedData(pub(crate) PolarsGroupedData);
44
45pub struct CubeRollupData(pub(crate) PolarsCubeRollupData);
47
48pub struct PivotedGroupedData(pub(crate) PolarsPivotedGroupedData);
50
51pub trait EngineDataFrame {
55 fn filter_expr_ir(&self, condition: &ExprIr) -> Result<DataFrame, EngineError>;
56 fn select_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError>;
57 fn with_column_expr_ir(&self, name: &str, expr: &ExprIr) -> Result<DataFrame, EngineError>;
58 fn collect_rows(&self) -> Result<CollectedRows, EngineError>;
59}
60
61impl EngineDataFrame for DataFrame {
62 fn filter_expr_ir(&self, condition: &ExprIr) -> Result<DataFrame, EngineError> {
63 downcast_df(DataFrameBackend::filter(&self.0, condition)?)
64 }
65
66 fn select_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError> {
67 downcast_df(DataFrameBackend::select(&self.0, exprs)?)
68 }
69
70 fn with_column_expr_ir(&self, name: &str, expr: &ExprIr) -> Result<DataFrame, EngineError> {
71 downcast_df(DataFrameBackend::with_column(&self.0, name, expr)?)
72 }
73
74 fn collect_rows(&self) -> Result<CollectedRows, EngineError> {
75 DataFrameBackend::collect(&self.0)
76 }
77}
78
79pub use robin_sparkless_polars::dataframe::{
81 GroupBySpec, JoinType, SaveMode, SelectItem, WriteFormat, WriteMode,
82 expr_contains_only_join_key_equalities, try_extract_join_eq_columns,
83 try_extract_join_eq_columns_all,
84};
85
86pub struct DataFrameStat<'a>(PolarsDataFrameStat<'a>);
88
89pub struct DataFrameNa<'a>(PolarsDataFrameNa<'a>);
91
92pub struct DataFrameWriter<'a> {
94 inner: PolarsDataFrameWriter<'a>,
95}
96
97impl DataFrame {
98 pub fn from_polars(df: PlDataFrame) -> Self {
100 DataFrame(PolarsDataFrame::from_polars(df))
101 }
102
103 pub fn from_polars_with_options(df: PlDataFrame, case_sensitive: bool) -> Self {
105 DataFrame(PolarsDataFrame::from_polars_with_options(
106 df,
107 case_sensitive,
108 ))
109 }
110
111 pub fn from_lazy(lf: LazyFrame) -> Self {
113 DataFrame(PolarsDataFrame::from_lazy(lf))
114 }
115
116 pub fn from_lazy_with_options(lf: LazyFrame, case_sensitive: bool) -> Self {
118 DataFrame(PolarsDataFrame::from_lazy_with_options(lf, case_sensitive))
119 }
120
121 pub fn empty() -> Self {
123 DataFrame(PolarsDataFrame::empty())
124 }
125
126 pub fn alias(&self, name: &str) -> Self {
128 DataFrame(self.0.alias(name))
129 }
130
131 pub fn get_alias(&self) -> Option<String> {
133 self.0.get_alias()
134 }
135
136 pub fn filter_expr_ir(&self, condition: &ExprIr) -> Result<DataFrame, EngineError> {
138 <Self as EngineDataFrame>::filter_expr_ir(self, condition)
139 }
140
141 pub fn select_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError> {
143 <Self as EngineDataFrame>::select_expr_ir(self, exprs)
144 }
145
146 pub fn with_column_expr_ir(&self, name: &str, expr: &ExprIr) -> Result<DataFrame, EngineError> {
148 <Self as EngineDataFrame>::with_column_expr_ir(self, name, expr)
149 }
150
151 pub fn collect_rows(&self) -> Result<CollectedRows, EngineError> {
153 <Self as EngineDataFrame>::collect_rows(self)
154 }
155
156 pub fn resolve_expr_column_names(&self, expr: Expr) -> Result<Expr, PolarsError> {
157 self.0.resolve_expr_column_names(expr)
158 }
159
160 pub fn coerce_string_numeric_comparisons(&self, expr: Expr) -> Result<Expr, PolarsError> {
161 self.0.coerce_string_numeric_comparisons(expr)
162 }
163
164 pub fn resolve_column_name(&self, name: &str) -> Result<String, PolarsError> {
165 self.0.resolve_column_name(name)
166 }
167
168 pub fn schema(&self) -> Result<StructType, PolarsError> {
169 self.0.schema()
170 }
171
172 pub fn schema_engine(&self) -> Result<StructType, EngineError> {
173 self.0.schema_engine()
174 }
175
176 pub fn get_column_dtype(&self, name: &str) -> Option<robin_sparkless_polars::PlDataType> {
177 self.0.get_column_dtype(name)
178 }
179
180 pub fn get_column_data_type(&self, name: &str) -> Option<DataType> {
181 self.0.get_column_data_type(name)
182 }
183
184 pub fn columns(&self) -> Result<Vec<String>, PolarsError> {
185 self.0.columns()
186 }
187
188 pub fn columns_engine(&self) -> Result<Vec<String>, EngineError> {
189 self.0.columns_engine()
190 }
191
192 pub fn count(&self) -> Result<usize, PolarsError> {
193 self.0.count()
194 }
195
196 pub fn count_engine(&self) -> Result<usize, EngineError> {
197 self.0.count_engine()
198 }
199
200 pub fn show(&self, n: Option<usize>) -> Result<(), PolarsError> {
201 self.0.show(n)
202 }
203
204 pub fn collect(&self) -> Result<Arc<PlDataFrame>, PolarsError> {
205 self.0.collect()
206 }
207
208 pub fn collect_as_json_rows_engine(
209 &self,
210 ) -> Result<Vec<HashMap<String, JsonValue>>, EngineError> {
211 self.0.collect_as_json_rows_engine()
212 }
213
214 pub fn collect_as_json_rows(&self) -> Result<Vec<HashMap<String, JsonValue>>, PolarsError> {
215 self.0.collect_as_json_rows()
216 }
217
218 #[allow(clippy::type_complexity)]
220 pub fn collect_as_json_rows_with_names(
221 &self,
222 ) -> Result<(Vec<String>, Vec<HashMap<String, JsonValue>>, StructType), PolarsError> {
223 self.0.collect_as_json_rows_with_names()
224 }
225
226 pub fn to_json_rows(&self) -> Result<String, EngineError> {
227 self.0.to_json_rows()
228 }
229
230 pub fn select_exprs(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
231 self.0.select_exprs(exprs).map(DataFrame)
232 }
233
234 pub fn select(&self, cols: Vec<&str>) -> Result<DataFrame, PolarsError> {
235 self.0.select(cols).map(DataFrame)
236 }
237
238 pub fn select_engine(&self, cols: Vec<&str>) -> Result<DataFrame, EngineError> {
239 self.0.select_engine(cols).map(DataFrame)
240 }
241
242 pub fn select_items(&self, items: Vec<SelectItem<'_>>) -> Result<DataFrame, PolarsError> {
243 self.0.select_items(items).map(DataFrame)
244 }
245
246 pub fn filter(&self, condition: Expr) -> Result<DataFrame, PolarsError> {
247 self.0.filter(condition).map(DataFrame)
248 }
249
250 pub fn filter_engine(&self, condition: Expr) -> Result<DataFrame, EngineError> {
251 self.0.filter_engine(condition).map(DataFrame)
252 }
253
254 pub fn column(&self, name: &str) -> Result<Column, PolarsError> {
255 self.0.column(name)
256 }
257
258 pub fn with_column(&self, column_name: &str, col: &Column) -> Result<DataFrame, PolarsError> {
259 self.0.with_column(column_name, col).map(DataFrame)
260 }
261
262 pub fn with_column_engine(
263 &self,
264 column_name: &str,
265 col: &Column,
266 ) -> Result<DataFrame, EngineError> {
267 self.0.with_column_engine(column_name, col).map(DataFrame)
268 }
269
270 pub fn with_column_expr(
271 &self,
272 column_name: &str,
273 expr: Expr,
274 ) -> Result<DataFrame, PolarsError> {
275 self.0.with_column_expr(column_name, expr).map(DataFrame)
276 }
277
278 pub fn group_by(&self, column_names: Vec<&str>) -> Result<GroupedData, PolarsError> {
279 self.0.group_by(column_names).map(GroupedData)
280 }
281
282 pub fn group_by_engine(&self, column_names: Vec<&str>) -> Result<GroupedData, EngineError> {
283 self.0.group_by_engine(column_names).map(GroupedData)
284 }
285
286 pub fn group_by_exprs(
287 &self,
288 exprs: Vec<Expr>,
289 grouping_col_names: Vec<String>,
290 ) -> Result<GroupedData, PolarsError> {
291 self.0
292 .group_by_exprs(exprs, grouping_col_names)
293 .map(GroupedData)
294 }
295
296 pub fn group_by_specs(&self, specs: Vec<GroupBySpec>) -> Result<GroupedData, PolarsError> {
297 self.0.group_by_specs(specs).map(GroupedData)
298 }
299
300 pub fn cube(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
301 self.0.cube(column_names).map(CubeRollupData)
302 }
303
304 pub fn rollup(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
305 self.0.rollup(column_names).map(CubeRollupData)
306 }
307
308 pub fn agg(&self, aggregations: Vec<Expr>) -> Result<DataFrame, PolarsError> {
309 self.0.agg(aggregations).map(DataFrame)
310 }
311
312 pub fn join(
313 &self,
314 other: &DataFrame,
315 on: Vec<&str>,
316 how: JoinType,
317 ) -> Result<DataFrame, PolarsError> {
318 self.0.join(&other.0, on, how).map(DataFrame)
319 }
320
321 pub fn join_with_keys(
323 &self,
324 other: &DataFrame,
325 left_on: Vec<&str>,
326 right_on: Vec<&str>,
327 how: JoinType,
328 only_key_equalities: bool,
329 ) -> Result<DataFrame, PolarsError> {
330 self.0
331 .join_with_keys(&other.0, left_on, right_on, how, only_key_equalities)
332 .map(DataFrame)
333 }
334
335 pub fn order_by(
336 &self,
337 column_names: Vec<&str>,
338 ascending: Vec<bool>,
339 ) -> Result<DataFrame, PolarsError> {
340 self.0.order_by(column_names, ascending).map(DataFrame)
341 }
342
343 pub fn order_by_exprs(&self, sort_orders: Vec<SortOrder>) -> Result<DataFrame, PolarsError> {
344 self.0.order_by_exprs(sort_orders).map(DataFrame)
345 }
346
347 pub fn union(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
348 self.0.union(&other.0).map(DataFrame)
349 }
350
351 pub fn union_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
352 self.0.union_all(&other.0).map(DataFrame)
353 }
354
355 pub fn union_by_name(
356 &self,
357 other: &DataFrame,
358 allow_missing_columns: bool,
359 ) -> Result<DataFrame, PolarsError> {
360 self.0
361 .union_by_name(&other.0, allow_missing_columns)
362 .map(DataFrame)
363 }
364
365 pub fn distinct(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
366 self.0.distinct(subset).map(DataFrame)
367 }
368
369 pub fn drop(&self, columns: Vec<&str>) -> Result<DataFrame, PolarsError> {
370 self.0.drop(columns).map(DataFrame)
371 }
372
373 pub fn dropna(
374 &self,
375 subset: Option<Vec<&str>>,
376 how: &str,
377 thresh: Option<usize>,
378 ) -> Result<DataFrame, PolarsError> {
379 self.0.dropna(subset, how, thresh).map(DataFrame)
380 }
381
382 pub fn fillna(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
383 self.0.fillna(value, subset).map(DataFrame)
384 }
385
386 pub fn limit(&self, n: usize) -> Result<DataFrame, PolarsError> {
387 self.0.limit(n).map(DataFrame)
388 }
389
390 pub fn limit_engine(&self, n: usize) -> Result<DataFrame, EngineError> {
391 self.0.limit_engine(n).map(DataFrame)
392 }
393
394 pub fn with_column_renamed(
395 &self,
396 old_name: &str,
397 new_name: &str,
398 ) -> Result<DataFrame, PolarsError> {
399 self.0
400 .with_column_renamed(old_name, new_name)
401 .map(DataFrame)
402 }
403
404 pub fn replace(
405 &self,
406 column_name: &str,
407 old_value: Expr,
408 new_value: Expr,
409 ) -> Result<DataFrame, PolarsError> {
410 self.0
411 .replace(column_name, old_value, new_value)
412 .map(DataFrame)
413 }
414
415 pub fn cross_join(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
416 self.0.cross_join(&other.0).map(DataFrame)
417 }
418
419 pub fn describe(&self) -> Result<DataFrame, PolarsError> {
420 self.0.describe().map(DataFrame)
421 }
422
423 pub fn cache(&self) -> Result<DataFrame, PolarsError> {
424 self.0.cache().map(DataFrame)
425 }
426
427 pub fn persist(&self) -> Result<DataFrame, PolarsError> {
428 self.0.persist().map(DataFrame)
429 }
430
431 pub fn unpersist(&self) -> Result<DataFrame, PolarsError> {
432 self.0.unpersist().map(DataFrame)
433 }
434
435 pub fn subtract(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
436 self.0.subtract(&other.0).map(DataFrame)
437 }
438
439 pub fn intersect(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
440 self.0.intersect(&other.0).map(DataFrame)
441 }
442
443 pub fn except_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
444 self.0.except_all(&other.0).map(DataFrame)
445 }
446
447 pub fn sample(
448 &self,
449 with_replacement: bool,
450 fraction: f64,
451 seed: Option<u64>,
452 ) -> Result<DataFrame, PolarsError> {
453 self.0
454 .sample(with_replacement, fraction, seed)
455 .map(DataFrame)
456 }
457
458 pub fn random_split(
459 &self,
460 weights: &[f64],
461 seed: Option<u64>,
462 ) -> Result<Vec<DataFrame>, PolarsError> {
463 self.0
464 .random_split(weights, seed)
465 .map(|v| v.into_iter().map(DataFrame).collect())
466 }
467
468 pub fn sample_by(
469 &self,
470 col_name: &str,
471 fractions: &[(Expr, f64)],
472 seed: Option<u64>,
473 ) -> Result<DataFrame, PolarsError> {
474 self.0.sample_by(col_name, fractions, seed).map(DataFrame)
475 }
476
477 pub fn first(&self) -> Result<DataFrame, PolarsError> {
478 self.0.first().map(DataFrame)
479 }
480
481 pub fn head(&self, n: usize) -> Result<DataFrame, PolarsError> {
482 self.0.head(n).map(DataFrame)
483 }
484
485 pub fn take(&self, n: usize) -> Result<DataFrame, PolarsError> {
486 self.0.take(n).map(DataFrame)
487 }
488
489 pub fn tail(&self, n: usize) -> Result<DataFrame, PolarsError> {
490 self.0.tail(n).map(DataFrame)
491 }
492
493 pub fn is_empty(&self) -> bool {
494 self.0.is_empty()
495 }
496
497 pub fn to_df(&self, names: Vec<&str>) -> Result<DataFrame, PolarsError> {
498 self.0.to_df(names).map(DataFrame)
499 }
500
501 pub fn stat(&self) -> DataFrameStat<'_> {
502 DataFrameStat(PolarsDataFrameStat::new(&self.0))
503 }
504
505 pub fn corr(&self) -> Result<DataFrame, PolarsError> {
506 self.0.corr().map(DataFrame)
507 }
508
509 pub fn corr_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
510 self.0.corr_cols(col1, col2)
511 }
512
513 pub fn cov_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
514 self.0.cov_cols(col1, col2)
515 }
516
517 pub fn summary(&self) -> Result<DataFrame, PolarsError> {
518 self.0.summary().map(DataFrame)
519 }
520
521 pub fn to_json(&self) -> Result<Vec<String>, PolarsError> {
522 self.0.to_json()
523 }
524
525 pub fn explain(&self) -> String {
526 self.0.explain()
527 }
528
529 pub fn print_schema(&self) -> Result<String, PolarsError> {
530 self.0.print_schema()
531 }
532
533 pub fn checkpoint(&self) -> Result<DataFrame, PolarsError> {
534 self.0.checkpoint().map(DataFrame)
535 }
536
537 pub fn local_checkpoint(&self) -> Result<DataFrame, PolarsError> {
538 self.0.local_checkpoint().map(DataFrame)
539 }
540
541 pub fn repartition(&self, num_partitions: usize) -> Result<DataFrame, PolarsError> {
542 self.0.repartition(num_partitions).map(DataFrame)
543 }
544
545 pub fn repartition_by_range(
546 &self,
547 num_partitions: usize,
548 columns: Vec<&str>,
549 ) -> Result<DataFrame, PolarsError> {
550 self.0
551 .repartition_by_range(num_partitions, columns)
552 .map(DataFrame)
553 }
554
555 pub fn dtypes(&self) -> Result<Vec<(String, String)>, PolarsError> {
556 self.0.dtypes()
557 }
558
559 pub fn sort_within_partitions(&self, cols: &[SortOrder]) -> Result<DataFrame, PolarsError> {
560 self.0.sort_within_partitions(cols).map(DataFrame)
561 }
562
563 pub fn coalesce(&self, num_partitions: usize) -> Result<DataFrame, PolarsError> {
564 self.0.coalesce(num_partitions).map(DataFrame)
565 }
566
567 pub fn hint(&self, name: &str, params: &[i32]) -> Result<DataFrame, PolarsError> {
568 self.0.hint(name, params).map(DataFrame)
569 }
570
571 pub fn is_local(&self) -> bool {
572 self.0.is_local()
573 }
574
575 pub fn input_files(&self) -> Vec<String> {
576 self.0.input_files()
577 }
578
579 pub fn same_semantics(&self, other: &DataFrame) -> bool {
580 self.0.same_semantics(&other.0)
581 }
582
583 pub fn semantic_hash(&self) -> u64 {
584 self.0.semantic_hash()
585 }
586
587 pub fn observe(&self, name: &str, expr: Expr) -> Result<DataFrame, PolarsError> {
588 self.0.observe(name, expr).map(DataFrame)
589 }
590
591 pub fn with_watermark(
592 &self,
593 event_time: &str,
594 delay_threshold: &str,
595 ) -> Result<DataFrame, PolarsError> {
596 self.0
597 .with_watermark(event_time, delay_threshold)
598 .map(DataFrame)
599 }
600
601 pub fn select_expr(&self, exprs: &[String]) -> Result<DataFrame, PolarsError> {
602 self.0.select_expr(exprs).map(DataFrame)
603 }
604
605 #[cfg(feature = "sql")]
607 pub fn select_expr_with_session(
608 &self,
609 session: &crate::session::SparkSession,
610 exprs: &[String],
611 ) -> Result<DataFrame, PolarsError> {
612 self.0
613 .select_expr_with_session(&session.0, exprs)
614 .map(DataFrame)
615 }
616
617 pub fn col_regex(&self, pattern: &str) -> Result<DataFrame, PolarsError> {
618 self.0.col_regex(pattern).map(DataFrame)
619 }
620
621 pub fn with_columns(&self, exprs: &[(String, Column)]) -> Result<DataFrame, PolarsError> {
622 self.0.with_columns(exprs).map(DataFrame)
623 }
624
625 pub fn with_columns_renamed(
626 &self,
627 renames: &[(String, String)],
628 ) -> Result<DataFrame, PolarsError> {
629 self.0.with_columns_renamed(renames).map(DataFrame)
630 }
631
632 pub fn na(&self) -> DataFrameNa<'_> {
633 DataFrameNa(PolarsDataFrameNa::new(&self.0))
634 }
635
636 pub fn offset(&self, n: usize) -> Result<DataFrame, PolarsError> {
637 self.0.offset(n).map(DataFrame)
638 }
639
640 pub fn transform<F>(&self, f: F) -> Result<DataFrame, PolarsError>
641 where
642 F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
643 {
644 self.0
645 .transform(|polars_df| f(DataFrame(polars_df)).map(|r| r.0))
646 .map(DataFrame)
647 }
648
649 pub fn freq_items(&self, columns: &[&str], support: f64) -> Result<DataFrame, PolarsError> {
650 self.0.freq_items(columns, support).map(DataFrame)
651 }
652
653 pub fn approx_quantile(
654 &self,
655 column: &str,
656 probabilities: &[f64],
657 ) -> Result<DataFrame, PolarsError> {
658 self.0.approx_quantile(column, probabilities).map(DataFrame)
659 }
660
661 pub fn crosstab(&self, col1: &str, col2: &str) -> Result<DataFrame, PolarsError> {
662 self.0.crosstab(col1, col2).map(DataFrame)
663 }
664
665 pub fn melt(&self, id_vars: &[&str], value_vars: &[&str]) -> Result<DataFrame, PolarsError> {
666 self.0.melt(id_vars, value_vars).map(DataFrame)
667 }
668
669 pub fn unpivot(&self, ids: &[&str], values: &[&str]) -> Result<DataFrame, PolarsError> {
670 self.0.unpivot(ids, values).map(DataFrame)
671 }
672
673 pub fn write(&self) -> DataFrameWriter<'_> {
675 DataFrameWriter {
676 inner: self.0.write(),
677 }
678 }
679
680 #[cfg(feature = "delta")]
682 pub fn write_delta(
683 &self,
684 path: impl AsRef<Path>,
685 overwrite: bool,
686 merge_schema: bool,
687 ) -> Result<(), PolarsError> {
688 self.0.write_delta(path, overwrite, merge_schema)
689 }
690
691 #[cfg(not(feature = "delta"))]
692 pub fn write_delta(
693 &self,
694 _path: impl AsRef<Path>,
695 _overwrite: bool,
696 _merge_schema: bool,
697 ) -> Result<(), PolarsError> {
698 Err(PolarsError::InvalidOperation(
699 "Delta Lake requires the 'delta' feature. Build with --features delta.".into(),
700 ))
701 }
702
703 pub fn save_as_delta_table(&self, session: &crate::session::SparkSession, name: &str) {
705 self.0.save_as_delta_table(&session.0, name)
706 }
707}
708
709impl<'a> DataFrameStat<'a> {
710 pub fn cov(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
711 self.0.cov(col1, col2)
712 }
713
714 pub fn corr(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
715 self.0.corr(col1, col2)
716 }
717
718 pub fn corr_matrix(&self) -> Result<DataFrame, PolarsError> {
719 self.0.corr_matrix().map(DataFrame)
720 }
721}
722
723impl<'a> DataFrameNa<'a> {
725 pub fn drop(
726 &self,
727 subset: Option<Vec<&str>>,
728 how: &str,
729 thresh: Option<usize>,
730 ) -> Result<DataFrame, PolarsError> {
731 self.0.drop(subset, how, thresh).map(DataFrame)
732 }
733
734 pub fn fill(&self, value: Expr, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
735 self.0.fill(value, subset).map(DataFrame)
736 }
737
738 pub fn replace(
739 &self,
740 old_value: Expr,
741 new_value: Expr,
742 subset: Option<Vec<&str>>,
743 ) -> Result<DataFrame, PolarsError> {
744 self.0.replace(old_value, new_value, subset).map(DataFrame)
745 }
746}
747
748impl<'a> DataFrameWriter<'a> {
749 pub fn mode(mut self, mode: WriteMode) -> Self {
750 self.inner = self.inner.mode(mode);
751 self
752 }
753
754 pub fn format(mut self, format: WriteFormat) -> Self {
755 self.inner = self.inner.format(format);
756 self
757 }
758
759 pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
760 self.inner = self.inner.option(key, value);
761 self
762 }
763
764 pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) -> Self {
765 self.inner = self.inner.options(opts);
766 self
767 }
768
769 pub fn partition_by(mut self, cols: impl IntoIterator<Item = impl Into<String>>) -> Self {
770 self.inner = self.inner.partition_by(cols);
771 self
772 }
773
774 pub fn save_as_table(
775 &self,
776 session: &crate::session::SparkSession,
777 name: &str,
778 mode: SaveMode,
779 ) -> Result<(), PolarsError> {
780 self.inner.save_as_table(&session.0, name, mode)
781 }
782
783 pub fn save_as_table_with_options(
785 &self,
786 session: &crate::session::SparkSession,
787 name: &str,
788 mode: SaveMode,
789 options: &[(String, String)],
790 ) -> Result<(), PolarsError> {
791 self.inner
792 .save_as_table_with_options(&session.0, name, mode, options)
793 }
794
795 pub fn save(&self, path: impl AsRef<Path>) -> Result<(), PolarsError> {
796 self.inner.save(path)
797 }
798
799 #[cfg(any(
801 feature = "jdbc",
802 feature = "jdbc_mysql",
803 feature = "jdbc_mariadb",
804 feature = "jdbc_mssql",
805 feature = "jdbc_oracle",
806 feature = "jdbc_db2",
807 feature = "sqlite"
808 ))]
809 pub fn jdbc(
810 &self,
811 url: &str,
812 table: &str,
813 properties: &[(String, String)],
814 mode: SaveMode,
815 ) -> Result<(), robin_sparkless_core::EngineError> {
816 self.inner.jdbc(url, table, properties, mode)
817 }
818}
819
820impl GroupedData {
821 pub fn count(&self) -> Result<DataFrame, PolarsError> {
822 self.0.count().map(DataFrame)
823 }
824
825 pub fn sum(&self, column: &str) -> Result<DataFrame, PolarsError> {
826 self.0.sum(column).map(DataFrame)
827 }
828
829 pub fn min(&self, column: &str) -> Result<DataFrame, PolarsError> {
830 self.0.min(column).map(DataFrame)
831 }
832
833 pub fn max(&self, column: &str) -> Result<DataFrame, PolarsError> {
834 self.0.max(column).map(DataFrame)
835 }
836
837 pub fn mean(&self, column: &str) -> Result<DataFrame, PolarsError> {
838 self.0.avg(&[column]).map(DataFrame)
839 }
840
841 pub fn avg(&self, columns: &[&str]) -> Result<DataFrame, PolarsError> {
842 self.0.avg(columns).map(DataFrame)
843 }
844
845 pub fn agg(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
846 self.0.agg(exprs).map(DataFrame)
847 }
848
849 pub fn agg_columns(&self, aggregations: Vec<Column>) -> Result<DataFrame, PolarsError> {
850 self.0.agg_columns(aggregations).map(DataFrame)
851 }
852
853 pub fn agg_expr_ir(&self, exprs: &[ExprIr]) -> Result<DataFrame, EngineError> {
855 downcast_df(GroupedDataBackend::agg(&self.0, exprs)?)
856 }
857
858 pub fn pivot(&self, pivot_col: &str, values: Option<Vec<String>>) -> PivotedGroupedData {
859 PivotedGroupedData(self.0.pivot(pivot_col, values))
860 }
861}
862
863impl CubeRollupData {
864 pub fn count(&self) -> Result<DataFrame, PolarsError> {
865 self.0.count().map(DataFrame)
866 }
867
868 pub fn agg(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
869 self.0.agg(exprs).map(DataFrame)
870 }
871}
872
873impl PivotedGroupedData {
874 pub fn count(&self) -> Result<DataFrame, PolarsError> {
875 self.0.count().map(DataFrame)
876 }
877
878 pub fn sum(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
879 self.0.sum(value_col).map(DataFrame)
880 }
881
882 pub fn avg(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
883 self.0.avg(value_col).map(DataFrame)
884 }
885
886 pub fn min(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
887 self.0.min(value_col).map(DataFrame)
888 }
889
890 pub fn max(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
891 self.0.max(value_col).map(DataFrame)
892 }
893
894 pub fn _count_distinct(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
896 self.0._count_distinct(value_col).map(DataFrame)
897 }
898
899 pub fn _collect_list(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
901 self.0._collect_list(value_col).map(DataFrame)
902 }
903
904 pub fn _collect_set(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
906 self.0._collect_set(value_col).map(DataFrame)
907 }
908
909 pub fn _first(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
911 self.0._first(value_col).map(DataFrame)
912 }
913
914 pub fn _last(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
916 self.0._last(value_col).map(DataFrame)
917 }
918
919 pub fn _stddev(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
921 self.0._stddev(value_col).map(DataFrame)
922 }
923
924 pub fn _variance(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
926 self.0._variance(value_col).map(DataFrame)
927 }
928
929 pub fn mean(&self, value_col: &str) -> Result<DataFrame, PolarsError> {
930 self.0.mean(value_col).map(DataFrame)
931 }
932
933 pub fn agg(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
934 self.0.agg(exprs).map(DataFrame)
935 }
936}
937
#[cfg(test)]
mod tests {
    /// An always-false predicate (`x > x` on non-null values) must filter out every row.
    #[test]
    fn engine_dataframe_filter_and_collect_rows() {
        use robin_sparkless_core::expr::{col, gt};
        use serde_json::json;

        let session = crate::session::SparkSession::builder()
            .app_name("engine_dataframe_filter")
            .get_or_create();
        let data = vec![vec![json!(1)], vec![json!(2)], vec![json!(3)]];
        let schema = vec![("x".to_string(), "bigint".to_string())];
        let df = session
            .create_dataframe_from_rows_engine(data, schema, false, false)
            .unwrap();

        // Guard against a vacuous pass: an empty result only proves the filter works if
        // the input actually contained rows.
        assert_eq!(df.count().unwrap(), 3, "expected three input rows");

        let cond = gt(col("x"), col("x"));
        let filtered = df.filter_expr_ir(&cond).unwrap();
        let rows = filtered.collect_rows().unwrap();
        assert!(
            rows.is_empty(),
            "expected no rows after always-false filter"
        );
    }
}
963}