1mod aggregations;
4mod joins;
5mod stats;
6mod transformations;
7
8pub use aggregations::{CubeRollupData, GroupedData};
9pub use joins::{join, JoinType};
10pub use stats::DataFrameStat;
11pub use transformations::{
12 filter, order_by, order_by_exprs, select, select_with_exprs, with_column, DataFrameNa,
13};
14
15use crate::column::Column;
16use crate::functions::SortOrder;
17use crate::schema::StructType;
18use polars::prelude::{
19 AnyValue, DataFrame as PlDataFrame, Expr, PolarsError, SchemaNamesAndDtypes,
20};
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::sync::Arc;
24
/// Default column-name case sensitivity; matches Spark's default
/// `spark.sql.caseSensitive=false`.
const DEFAULT_CASE_SENSITIVE: bool = false;
27
/// A Spark-style DataFrame backed by an immutable, cheaply shareable
/// Polars frame. All transformations return a new `DataFrame`.
pub struct DataFrame {
    // Underlying Polars frame; shared via Arc so clones are O(1).
    pub(crate) df: Arc<PlDataFrame>,
    // Whether column-name resolution is case sensitive
    // (mirrors spark.sql.caseSensitive).
    pub(crate) case_sensitive: bool,
}
35
36impl DataFrame {
37 pub fn from_polars(df: PlDataFrame) -> Self {
39 DataFrame {
40 df: Arc::new(df),
41 case_sensitive: DEFAULT_CASE_SENSITIVE,
42 }
43 }
44
45 pub fn from_polars_with_options(df: PlDataFrame, case_sensitive: bool) -> Self {
48 DataFrame {
49 df: Arc::new(df),
50 case_sensitive,
51 }
52 }
53
54 pub fn empty() -> Self {
56 DataFrame {
57 df: Arc::new(PlDataFrame::empty()),
58 case_sensitive: DEFAULT_CASE_SENSITIVE,
59 }
60 }
61
62 pub fn resolve_column_name(&self, name: &str) -> Result<String, PolarsError> {
65 let names = self.df.get_column_names();
66 if self.case_sensitive {
67 if names.iter().any(|n| *n == name) {
68 return Ok(name.to_string());
69 }
70 } else {
71 let name_lower = name.to_lowercase();
72 for n in names {
73 if n.to_lowercase() == name_lower {
74 return Ok(n.to_string());
75 }
76 }
77 }
78 let available: Vec<String> = self
79 .df
80 .get_column_names()
81 .iter()
82 .map(|s| s.to_string())
83 .collect();
84 Err(PolarsError::ColumnNotFound(
85 format!(
86 "Column '{}' not found. Available columns: [{}]. Check spelling and case sensitivity (spark.sql.caseSensitive).",
87 name,
88 available.join(", ")
89 )
90 .into(),
91 ))
92 }
93
94 pub fn schema(&self) -> Result<StructType, PolarsError> {
96 Ok(StructType::from_polars_schema(&self.df.schema()))
97 }
98
99 pub fn columns(&self) -> Result<Vec<String>, PolarsError> {
101 Ok(self
102 .df
103 .get_column_names()
104 .iter()
105 .map(|s| s.to_string())
106 .collect())
107 }
108
109 pub fn count(&self) -> Result<usize, PolarsError> {
111 Ok(self.df.height())
112 }
113
114 pub fn show(&self, n: Option<usize>) -> Result<(), PolarsError> {
116 let n = n.unwrap_or(20);
117 println!("{}", self.df.head(Some(n)));
118 Ok(())
119 }
120
121 pub fn collect(&self) -> Result<Arc<PlDataFrame>, PolarsError> {
123 Ok(self.df.clone())
124 }
125
126 pub fn collect_as_json_rows(&self) -> Result<Vec<HashMap<String, JsonValue>>, PolarsError> {
128 let df = self.df.as_ref();
129 let names = df.get_column_names();
130 let nrows = df.height();
131 let mut rows = Vec::with_capacity(nrows);
132 for i in 0..nrows {
133 let mut row = HashMap::with_capacity(names.len());
134 for (col_idx, name) in names.iter().enumerate() {
135 let s = df
136 .get_columns()
137 .get(col_idx)
138 .ok_or_else(|| PolarsError::ComputeError("column index out of range".into()))?;
139 let av = s.get(i)?;
140 let jv = any_value_to_json(av);
141 row.insert(name.to_string(), jv);
142 }
143 rows.push(row);
144 }
145 Ok(rows)
146 }
147
148 pub fn select_exprs(&self, exprs: Vec<Expr>) -> Result<DataFrame, PolarsError> {
152 transformations::select_with_exprs(self, exprs, self.case_sensitive)
153 }
154
155 pub fn select(&self, cols: Vec<&str>) -> Result<DataFrame, PolarsError> {
158 let resolved: Vec<String> = cols
159 .iter()
160 .map(|c| self.resolve_column_name(c))
161 .collect::<Result<Vec<_>, _>>()?;
162 let refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
163 let mut result = transformations::select(self, refs, self.case_sensitive)?;
164 if !self.case_sensitive {
166 for (requested, res) in cols.iter().zip(resolved.iter()) {
167 if *requested != res.as_str() {
168 result = result.with_column_renamed(res, requested)?;
169 }
170 }
171 }
172 Ok(result)
173 }
174
175 pub fn filter(&self, condition: Expr) -> Result<DataFrame, PolarsError> {
177 transformations::filter(self, condition, self.case_sensitive)
178 }
179
180 pub fn column(&self, name: &str) -> Result<Column, PolarsError> {
183 let resolved = self.resolve_column_name(name)?;
184 Ok(Column::new(resolved))
185 }
186
187 pub fn with_column(&self, column_name: &str, col: &Column) -> Result<DataFrame, PolarsError> {
190 transformations::with_column(self, column_name, col, self.case_sensitive)
191 }
192
193 pub fn with_column_expr(
195 &self,
196 column_name: &str,
197 expr: Expr,
198 ) -> Result<DataFrame, PolarsError> {
199 let col = Column::from_expr(expr, None);
200 self.with_column(column_name, &col)
201 }
202
203 pub fn group_by(&self, column_names: Vec<&str>) -> Result<GroupedData, PolarsError> {
206 use polars::prelude::*;
207 let resolved: Vec<String> = column_names
208 .iter()
209 .map(|c| self.resolve_column_name(c))
210 .collect::<Result<Vec<_>, _>>()?;
211 let exprs: Vec<Expr> = resolved.iter().map(|name| col(name.as_str())).collect();
212 let lazy_grouped = self.df.as_ref().clone().lazy().group_by(exprs);
213 Ok(GroupedData {
214 lazy_grouped,
215 grouping_cols: resolved,
216 case_sensitive: self.case_sensitive,
217 })
218 }
219
220 pub fn cube(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
222 let resolved: Vec<String> = column_names
223 .iter()
224 .map(|c| self.resolve_column_name(c))
225 .collect::<Result<Vec<_>, _>>()?;
226 Ok(CubeRollupData {
227 df: self.df.as_ref().clone(),
228 grouping_cols: resolved,
229 case_sensitive: self.case_sensitive,
230 is_cube: true,
231 })
232 }
233
234 pub fn rollup(&self, column_names: Vec<&str>) -> Result<CubeRollupData, PolarsError> {
236 let resolved: Vec<String> = column_names
237 .iter()
238 .map(|c| self.resolve_column_name(c))
239 .collect::<Result<Vec<_>, _>>()?;
240 Ok(CubeRollupData {
241 df: self.df.as_ref().clone(),
242 grouping_cols: resolved,
243 case_sensitive: self.case_sensitive,
244 is_cube: false,
245 })
246 }
247
248 pub fn join(
251 &self,
252 other: &DataFrame,
253 on: Vec<&str>,
254 how: JoinType,
255 ) -> Result<DataFrame, PolarsError> {
256 let resolved: Vec<String> = on
257 .iter()
258 .map(|c| self.resolve_column_name(c))
259 .collect::<Result<Vec<_>, _>>()?;
260 let on_refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
261 join(self, other, on_refs, how, self.case_sensitive)
262 }
263
264 pub fn order_by(
267 &self,
268 column_names: Vec<&str>,
269 ascending: Vec<bool>,
270 ) -> Result<DataFrame, PolarsError> {
271 let resolved: Vec<String> = column_names
272 .iter()
273 .map(|c| self.resolve_column_name(c))
274 .collect::<Result<Vec<_>, _>>()?;
275 let refs: Vec<&str> = resolved.iter().map(|s| s.as_str()).collect();
276 transformations::order_by(self, refs, ascending, self.case_sensitive)
277 }
278
279 pub fn order_by_exprs(&self, sort_orders: Vec<SortOrder>) -> Result<DataFrame, PolarsError> {
281 transformations::order_by_exprs(self, sort_orders, self.case_sensitive)
282 }
283
284 pub fn union(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
286 transformations::union(self, other, self.case_sensitive)
287 }
288
289 pub fn union_by_name(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
291 transformations::union_by_name(self, other, self.case_sensitive)
292 }
293
294 pub fn distinct(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
296 transformations::distinct(self, subset, self.case_sensitive)
297 }
298
299 pub fn drop(&self, columns: Vec<&str>) -> Result<DataFrame, PolarsError> {
301 transformations::drop(self, columns, self.case_sensitive)
302 }
303
304 pub fn dropna(&self, subset: Option<Vec<&str>>) -> Result<DataFrame, PolarsError> {
306 transformations::dropna(self, subset, self.case_sensitive)
307 }
308
309 pub fn fillna(&self, value: Expr) -> Result<DataFrame, PolarsError> {
311 transformations::fillna(self, value, self.case_sensitive)
312 }
313
314 pub fn limit(&self, n: usize) -> Result<DataFrame, PolarsError> {
316 transformations::limit(self, n, self.case_sensitive)
317 }
318
319 pub fn with_column_renamed(
321 &self,
322 old_name: &str,
323 new_name: &str,
324 ) -> Result<DataFrame, PolarsError> {
325 transformations::with_column_renamed(self, old_name, new_name, self.case_sensitive)
326 }
327
328 pub fn replace(
330 &self,
331 column_name: &str,
332 old_value: Expr,
333 new_value: Expr,
334 ) -> Result<DataFrame, PolarsError> {
335 transformations::replace(self, column_name, old_value, new_value, self.case_sensitive)
336 }
337
338 pub fn cross_join(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
340 transformations::cross_join(self, other, self.case_sensitive)
341 }
342
343 pub fn describe(&self) -> Result<DataFrame, PolarsError> {
345 transformations::describe(self, self.case_sensitive)
346 }
347
348 pub fn cache(&self) -> Result<DataFrame, PolarsError> {
350 Ok(self.clone())
351 }
352
353 pub fn persist(&self) -> Result<DataFrame, PolarsError> {
355 Ok(self.clone())
356 }
357
358 pub fn unpersist(&self) -> Result<DataFrame, PolarsError> {
360 Ok(self.clone())
361 }
362
363 pub fn subtract(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
365 transformations::subtract(self, other, self.case_sensitive)
366 }
367
368 pub fn intersect(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
370 transformations::intersect(self, other, self.case_sensitive)
371 }
372
373 pub fn sample(
375 &self,
376 with_replacement: bool,
377 fraction: f64,
378 seed: Option<u64>,
379 ) -> Result<DataFrame, PolarsError> {
380 transformations::sample(self, with_replacement, fraction, seed, self.case_sensitive)
381 }
382
383 pub fn random_split(
385 &self,
386 weights: &[f64],
387 seed: Option<u64>,
388 ) -> Result<Vec<DataFrame>, PolarsError> {
389 transformations::random_split(self, weights, seed, self.case_sensitive)
390 }
391
392 pub fn sample_by(
395 &self,
396 col_name: &str,
397 fractions: &[(Expr, f64)],
398 seed: Option<u64>,
399 ) -> Result<DataFrame, PolarsError> {
400 transformations::sample_by(self, col_name, fractions, seed, self.case_sensitive)
401 }
402
403 pub fn first(&self) -> Result<DataFrame, PolarsError> {
405 transformations::first(self, self.case_sensitive)
406 }
407
408 pub fn head(&self, n: usize) -> Result<DataFrame, PolarsError> {
410 transformations::head(self, n, self.case_sensitive)
411 }
412
413 pub fn take(&self, n: usize) -> Result<DataFrame, PolarsError> {
415 transformations::take(self, n, self.case_sensitive)
416 }
417
418 pub fn tail(&self, n: usize) -> Result<DataFrame, PolarsError> {
420 transformations::tail(self, n, self.case_sensitive)
421 }
422
423 pub fn is_empty(&self) -> bool {
425 transformations::is_empty(self)
426 }
427
428 pub fn to_df(&self, names: Vec<&str>) -> Result<DataFrame, PolarsError> {
430 transformations::to_df(self, &names, self.case_sensitive)
431 }
432
433 pub fn stat(&self) -> DataFrameStat<'_> {
435 DataFrameStat { df: self }
436 }
437
438 pub fn corr(&self) -> Result<DataFrame, PolarsError> {
440 self.stat().corr_matrix()
441 }
442
443 pub fn corr_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
445 self.stat().corr(col1, col2)
446 }
447
448 pub fn cov_cols(&self, col1: &str, col2: &str) -> Result<f64, PolarsError> {
450 self.stat().cov(col1, col2)
451 }
452
453 pub fn summary(&self) -> Result<DataFrame, PolarsError> {
455 self.describe()
456 }
457
458 pub fn to_json(&self) -> Result<Vec<String>, PolarsError> {
460 transformations::to_json(self)
461 }
462
463 pub fn explain(&self) -> String {
465 transformations::explain(self)
466 }
467
468 pub fn print_schema(&self) -> Result<String, PolarsError> {
470 transformations::print_schema(self)
471 }
472
473 pub fn checkpoint(&self) -> Result<DataFrame, PolarsError> {
475 Ok(self.clone())
476 }
477
478 pub fn local_checkpoint(&self) -> Result<DataFrame, PolarsError> {
480 Ok(self.clone())
481 }
482
483 pub fn repartition(&self, _num_partitions: usize) -> Result<DataFrame, PolarsError> {
485 Ok(self.clone())
486 }
487
488 pub fn repartition_by_range(
490 &self,
491 _num_partitions: usize,
492 _cols: Vec<&str>,
493 ) -> Result<DataFrame, PolarsError> {
494 Ok(self.clone())
495 }
496
497 pub fn dtypes(&self) -> Result<Vec<(String, String)>, PolarsError> {
499 let schema = self.df.schema();
500 Ok(schema
501 .iter_names_and_dtypes()
502 .map(|(name, dtype)| (name.to_string(), format!("{dtype:?}")))
503 .collect())
504 }
505
506 pub fn sort_within_partitions(
508 &self,
509 _cols: &[crate::functions::SortOrder],
510 ) -> Result<DataFrame, PolarsError> {
511 Ok(self.clone())
512 }
513
514 pub fn coalesce(&self, _num_partitions: usize) -> Result<DataFrame, PolarsError> {
516 Ok(self.clone())
517 }
518
519 pub fn hint(&self, _name: &str, _params: &[i32]) -> Result<DataFrame, PolarsError> {
521 Ok(self.clone())
522 }
523
524 pub fn is_local(&self) -> bool {
526 true
527 }
528
529 pub fn input_files(&self) -> Vec<String> {
531 Vec::new()
532 }
533
534 pub fn same_semantics(&self, _other: &DataFrame) -> bool {
536 false
537 }
538
539 pub fn semantic_hash(&self) -> u64 {
541 0
542 }
543
544 pub fn observe(&self, _name: &str, _expr: Expr) -> Result<DataFrame, PolarsError> {
546 Ok(self.clone())
547 }
548
549 pub fn with_watermark(
551 &self,
552 _event_time: &str,
553 _delay: &str,
554 ) -> Result<DataFrame, PolarsError> {
555 Ok(self.clone())
556 }
557
558 pub fn select_expr(&self, exprs: &[String]) -> Result<DataFrame, PolarsError> {
560 transformations::select_expr(self, exprs, self.case_sensitive)
561 }
562
563 pub fn col_regex(&self, pattern: &str) -> Result<DataFrame, PolarsError> {
565 transformations::col_regex(self, pattern, self.case_sensitive)
566 }
567
568 pub fn with_columns(&self, exprs: &[(String, Column)]) -> Result<DataFrame, PolarsError> {
570 transformations::with_columns(self, exprs, self.case_sensitive)
571 }
572
573 pub fn with_columns_renamed(
575 &self,
576 renames: &[(String, String)],
577 ) -> Result<DataFrame, PolarsError> {
578 transformations::with_columns_renamed(self, renames, self.case_sensitive)
579 }
580
581 pub fn na(&self) -> DataFrameNa<'_> {
583 DataFrameNa { df: self }
584 }
585
586 pub fn offset(&self, n: usize) -> Result<DataFrame, PolarsError> {
588 transformations::offset(self, n, self.case_sensitive)
589 }
590
591 pub fn transform<F>(&self, f: F) -> Result<DataFrame, PolarsError>
593 where
594 F: FnOnce(DataFrame) -> Result<DataFrame, PolarsError>,
595 {
596 transformations::transform(self, f)
597 }
598
599 pub fn freq_items(&self, columns: &[&str], support: f64) -> Result<DataFrame, PolarsError> {
601 transformations::freq_items(self, columns, support, self.case_sensitive)
602 }
603
604 pub fn approx_quantile(
606 &self,
607 column: &str,
608 probabilities: &[f64],
609 ) -> Result<DataFrame, PolarsError> {
610 transformations::approx_quantile(self, column, probabilities, self.case_sensitive)
611 }
612
613 pub fn crosstab(&self, col1: &str, col2: &str) -> Result<DataFrame, PolarsError> {
615 transformations::crosstab(self, col1, col2, self.case_sensitive)
616 }
617
618 pub fn melt(&self, id_vars: &[&str], value_vars: &[&str]) -> Result<DataFrame, PolarsError> {
620 transformations::melt(self, id_vars, value_vars, self.case_sensitive)
621 }
622
623 pub fn pivot(
625 &self,
626 _pivot_col: &str,
627 _values: Option<Vec<&str>>,
628 ) -> Result<DataFrame, PolarsError> {
629 Err(PolarsError::InvalidOperation(
630 "pivot is not yet implemented; use crosstab(col1, col2) for two-column cross-tabulation."
631 .into(),
632 ))
633 }
634
635 pub fn except_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
637 transformations::except_all(self, other, self.case_sensitive)
638 }
639
640 pub fn intersect_all(&self, other: &DataFrame) -> Result<DataFrame, PolarsError> {
642 transformations::intersect_all(self, other, self.case_sensitive)
643 }
644
645 #[cfg(feature = "delta")]
648 pub fn write_delta(
649 &self,
650 path: impl AsRef<std::path::Path>,
651 overwrite: bool,
652 ) -> Result<(), PolarsError> {
653 crate::delta::write_delta(self.df.as_ref(), path, overwrite)
654 }
655
656 #[cfg(not(feature = "delta"))]
658 pub fn write_delta(
659 &self,
660 _path: impl AsRef<std::path::Path>,
661 _overwrite: bool,
662 ) -> Result<(), PolarsError> {
663 Err(PolarsError::InvalidOperation(
664 "Delta Lake requires the 'delta' feature. Build with --features delta.".into(),
665 ))
666 }
667
668 pub fn write(&self) -> DataFrameWriter<'_> {
670 DataFrameWriter {
671 df: self,
672 mode: WriteMode::Overwrite,
673 format: WriteFormat::Parquet,
674 options: HashMap::new(),
675 partition_by: Vec::new(),
676 }
677 }
678}
679
/// Save mode for [`DataFrameWriter`]: replace the target file or append
/// to it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WriteMode {
    /// Replace any existing file at the target path.
    Overwrite,
    /// Read the existing file (if readable) and write the concatenation.
    Append,
}
686
/// On-disk file format for [`DataFrameWriter`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WriteFormat {
    /// Apache Parquet.
    Parquet,
    /// Comma-separated values (delimiter configurable via the `sep` option).
    Csv,
    /// Newline-delimited JSON.
    Json,
}
694
/// Builder for writing a [`DataFrame`] to disk, mirroring Spark's
/// `df.write` API. Obtained from [`DataFrame::write`].
pub struct DataFrameWriter<'a> {
    // Frame to write; borrowed, never mutated.
    df: &'a DataFrame,
    // Overwrite vs. append semantics applied by `save`.
    mode: WriteMode,
    // Target file format.
    format: WriteFormat,
    // String options; `save` reads "header" and "sep" for CSV.
    options: HashMap<String, String>,
    // Partition columns: rejected as unimplemented by the parquet path,
    // ignored by CSV/JSON.
    partition_by: Vec<String>,
}
703
704impl<'a> DataFrameWriter<'a> {
705 pub fn mode(mut self, mode: WriteMode) -> Self {
706 self.mode = mode;
707 self
708 }
709
710 pub fn format(mut self, format: WriteFormat) -> Self {
711 self.format = format;
712 self
713 }
714
715 pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
717 self.options.insert(key.into(), value.into());
718 self
719 }
720
721 pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) -> Self {
723 for (k, v) in opts {
724 self.options.insert(k, v);
725 }
726 self
727 }
728
729 pub fn partition_by(mut self, cols: impl IntoIterator<Item = impl Into<String>>) -> Self {
731 self.partition_by = cols.into_iter().map(|s| s.into()).collect();
732 self
733 }
734
735 pub fn parquet(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
737 DataFrameWriter {
738 df: self.df,
739 mode: self.mode,
740 format: WriteFormat::Parquet,
741 options: self.options.clone(),
742 partition_by: self.partition_by.clone(),
743 }
744 .save(path)
745 }
746
747 pub fn csv(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
749 DataFrameWriter {
750 df: self.df,
751 mode: self.mode,
752 format: WriteFormat::Csv,
753 options: self.options.clone(),
754 partition_by: self.partition_by.clone(),
755 }
756 .save(path)
757 }
758
759 pub fn json(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
761 DataFrameWriter {
762 df: self.df,
763 mode: self.mode,
764 format: WriteFormat::Json,
765 options: self.options.clone(),
766 partition_by: self.partition_by.clone(),
767 }
768 .save(path)
769 }
770
771 pub fn save(&self, path: impl AsRef<std::path::Path>) -> Result<(), PolarsError> {
773 use polars::prelude::*;
774 let path = path.as_ref();
775 let to_write: PlDataFrame = match self.mode {
776 WriteMode::Overwrite => self.df.df.as_ref().clone(),
777 WriteMode::Append => {
778 use polars::prelude::*;
779 let existing: Option<PlDataFrame> = if path.exists() {
780 match self.format {
781 WriteFormat::Parquet => {
782 LazyFrame::scan_parquet(path, ScanArgsParquet::default())
783 .and_then(|lf| lf.collect())
784 .ok()
785 }
786 WriteFormat::Csv => LazyCsvReader::new(path)
787 .with_has_header(true)
788 .finish()
789 .and_then(|lf| lf.collect())
790 .ok(),
791 WriteFormat::Json => LazyJsonLineReader::new(path)
792 .finish()
793 .and_then(|lf| lf.collect())
794 .ok(),
795 }
796 } else {
797 None
798 };
799 match existing {
800 Some(existing) => {
801 let lfs: [polars::prelude::LazyFrame; 2] =
802 [existing.lazy(), self.df.df.as_ref().clone().lazy()];
803 polars::prelude::concat(lfs, UnionArgs::default())?.collect()?
804 }
805 None => self.df.df.as_ref().clone(),
806 }
807 }
808 };
809 match self.format {
810 WriteFormat::Parquet => {
811 if self.partition_by.is_empty() {
812 let mut file = std::fs::File::create(path).map_err(|e| {
813 PolarsError::ComputeError(format!("write parquet create: {e}").into())
814 })?;
815 let mut df_mut = to_write;
816 ParquetWriter::new(&mut file)
817 .finish(&mut df_mut)
818 .map_err(|e| {
819 PolarsError::ComputeError(format!("write parquet: {e}").into())
820 })?;
821 } else {
822 return Err(PolarsError::InvalidOperation(
824 "partitionBy for parquet write is not yet implemented. Use save(path) without partitionBy.".into(),
825 ));
826 }
827 }
828 WriteFormat::Csv => {
829 let has_header = self
830 .options
831 .get("header")
832 .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
833 .unwrap_or(true);
834 let delimiter = self
835 .options
836 .get("sep")
837 .and_then(|s| s.bytes().next())
838 .unwrap_or(b',');
839 let mut file = std::fs::File::create(path).map_err(|e| {
840 PolarsError::ComputeError(format!("write csv create: {e}").into())
841 })?;
842 CsvWriter::new(&mut file)
843 .include_header(has_header)
844 .with_separator(delimiter)
845 .finish(&mut to_write.clone())
846 .map_err(|e| PolarsError::ComputeError(format!("write csv: {e}").into()))?;
847 }
848 WriteFormat::Json => {
849 let mut file = std::fs::File::create(path).map_err(|e| {
850 PolarsError::ComputeError(format!("write json create: {e}").into())
851 })?;
852 JsonWriter::new(&mut file)
853 .finish(&mut to_write.clone())
854 .map_err(|e| PolarsError::ComputeError(format!("write json: {e}").into()))?;
855 }
856 }
857 Ok(())
858 }
859}
860
861impl Clone for DataFrame {
862 fn clone(&self) -> Self {
863 DataFrame {
864 df: self.df.clone(),
865 case_sensitive: self.case_sensitive,
866 }
867 }
868}
869
870fn any_value_to_json(av: AnyValue<'_>) -> JsonValue {
872 match av {
873 AnyValue::Null => JsonValue::Null,
874 AnyValue::Boolean(b) => JsonValue::Bool(b),
875 AnyValue::Int32(i) => JsonValue::Number(serde_json::Number::from(i)),
876 AnyValue::Int64(i) => JsonValue::Number(serde_json::Number::from(i)),
877 AnyValue::UInt32(u) => JsonValue::Number(serde_json::Number::from(u)),
878 AnyValue::UInt64(u) => JsonValue::Number(serde_json::Number::from(u)),
879 AnyValue::Float32(f) => serde_json::Number::from_f64(f64::from(f))
880 .map(JsonValue::Number)
881 .unwrap_or(JsonValue::Null),
882 AnyValue::Float64(f) => serde_json::Number::from_f64(f)
883 .map(JsonValue::Number)
884 .unwrap_or(JsonValue::Null),
885 AnyValue::String(s) => JsonValue::String(s.to_string()),
886 AnyValue::StringOwned(s) => JsonValue::String(s.to_string()),
887 _ => JsonValue::Null,
888 }
889}