1use std::collections::HashMap;
19use std::ffi::{CStr, CString};
20use std::ptr::NonNull;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use arrow::array::{Array, ArrayRef, RecordBatch, RecordBatchReader, new_null_array};
25use arrow::compute::can_cast_types;
26use arrow::error::ArrowError;
27use arrow::ffi::FFI_ArrowSchema;
28use arrow::ffi_stream::FFI_ArrowArrayStream;
29use arrow::pyarrow::FromPyArrow;
30use cstr::cstr;
31use datafusion::arrow::datatypes::{Schema, SchemaRef};
32use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
33use datafusion::arrow::util::pretty;
34use datafusion::catalog::TableProvider;
35use datafusion::common::UnnestOptions;
36use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
37use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
38use datafusion::error::DataFusionError;
39use datafusion::execution::SendableRecordBatchStream;
40use datafusion::logical_expr::SortExpr;
41use datafusion::logical_expr::dml::InsertOp;
42use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
43use datafusion::prelude::*;
44use datafusion_python_util::{is_ipython_env, spawn_future, wait_for_future};
45use futures::{StreamExt, TryStreamExt};
46use parking_lot::Mutex;
47use pyo3::PyErr;
48use pyo3::exceptions::PyValueError;
49use pyo3::prelude::*;
50use pyo3::pybacked::PyBackedStr;
51use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
52
53use crate::common::data_type::PyScalarValue;
54use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err};
55use crate::expr::PyExpr;
56use crate::expr::sort_expr::{PySortExpr, to_sort_expressions};
57use crate::physical_plan::PyExecutionPlan;
58use crate::record_batch::{PyRecordBatchStream, poll_next_batch};
59use crate::sql::logical::PyLogicalPlan;
60use crate::table::{PyTable, TempViewTable};
61
/// Capsule name used when exporting an `FFI_ArrowArrayStream` to Python from
/// `__arrow_c_stream__`.
static ARROW_ARRAY_STREAM_NAME: &CStr = cstr!("arrow_array_stream");

/// Record batches retained for display, paired with the `has_more` flag that
/// was produced when they were collected.
type CachedBatches = Option<(Vec<RecordBatch>, bool)>;
/// Shareable, lock-protected handle to [`CachedBatches`].
type SharedCachedBatches = Arc<Mutex<CachedBatches>>;
69
/// Limits applied when collecting DataFrame rows for display.
///
/// Values are read from the Python formatter object; see
/// `build_formatter_config_from_python`.
#[derive(Debug, Clone)]
pub struct FormatterConfig {
    /// Byte budget for collected display data (the formatter's `max_memory_bytes`).
    pub max_bytes: usize,
    /// Lower row bound for display collection (the formatter's `min_rows`).
    pub min_rows: usize,
    /// Upper row bound for display collection (the formatter's `max_rows`/`repr_rows`).
    pub max_rows: usize,
}
80
81impl Default for FormatterConfig {
82 fn default() -> Self {
83 Self {
84 max_bytes: 2 * 1024 * 1024, min_rows: 10,
86 max_rows: 10,
87 }
88 }
89}
90
91impl FormatterConfig {
92 pub fn validate(&self) -> Result<(), String> {
98 if self.max_bytes == 0 {
99 return Err("max_bytes must be a positive integer".to_string());
100 }
101
102 if self.min_rows == 0 {
103 return Err("min_rows must be a positive integer".to_string());
104 }
105
106 if self.max_rows == 0 {
107 return Err("max_rows must be a positive integer".to_string());
108 }
109
110 if self.min_rows > self.max_rows {
111 return Err("min_rows must be less than or equal to max_rows".to_string());
112 }
113
114 Ok(())
115 }
116}
117
/// The active Python formatter object together with the display limits read from it.
struct PythonFormatter<'py> {
    /// Object returned by `datafusion.dataframe_formatter.get_formatter()`.
    formatter: Bound<'py, PyAny>,
    /// Display limits extracted from `formatter`.
    config: FormatterConfig,
}
125
126fn get_python_formatter_with_config(py: Python) -> PyResult<PythonFormatter> {
128 let formatter = import_python_formatter(py)?;
129 let config = build_formatter_config_from_python(&formatter)?;
130 Ok(PythonFormatter { formatter, config })
131}
132
133fn import_python_formatter(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
135 let formatter_module = py.import("datafusion.dataframe_formatter")?;
136 let get_formatter = formatter_module.getattr("get_formatter")?;
137 get_formatter.call0()
138}
139
140fn get_attr<'a, T>(py_object: &'a Bound<'a, PyAny>, attr_name: &str, default_value: T) -> T
142where
143 T: for<'py> pyo3::FromPyObject<'py, 'py> + Clone,
144{
145 py_object
146 .getattr(attr_name)
147 .and_then(|v| v.extract::<T>().map_err(Into::<PyErr>::into))
148 .unwrap_or_else(|_| default_value.clone())
149}
150
151fn build_formatter_config_from_python(formatter: &Bound<'_, PyAny>) -> PyResult<FormatterConfig> {
153 let default_config = FormatterConfig::default();
154 let max_bytes = get_attr(formatter, "max_memory_bytes", default_config.max_bytes);
155 let min_rows = get_attr(formatter, "min_rows", default_config.min_rows);
156
157 let max_rows = get_attr(formatter, "max_rows", 0usize);
161 let max_rows = if max_rows > 0 {
162 max_rows
164 } else {
165 let repr_rows = get_attr(formatter, "repr_rows", 0usize);
167 if repr_rows > 0 {
168 repr_rows
169 } else {
170 default_config.max_rows
172 }
173 };
174
175 let config = FormatterConfig {
176 max_bytes,
177 min_rows,
178 max_rows,
179 };
180
181 config.validate().map_err(PyValueError::new_err)?;
183 Ok(config)
184}
185
/// Python-constructible wrapper around DataFusion's [`ParquetOptions`].
///
/// It is used as the `global` options in `write_parquet_with_options`.
#[pyclass(
    from_py_object,
    frozen,
    name = "ParquetWriterOptions",
    module = "datafusion",
    subclass
)]
#[derive(Clone, Default)]
pub struct PyParquetWriterOptions {
    /// The wrapped writer options.
    options: ParquetOptions,
}
198
199#[pymethods]
200impl PyParquetWriterOptions {
201 #[new]
202 #[allow(clippy::too_many_arguments)]
203 pub fn new(
204 data_pagesize_limit: usize,
205 write_batch_size: usize,
206 writer_version: &str,
207 skip_arrow_metadata: bool,
208 compression: Option<String>,
209 dictionary_enabled: Option<bool>,
210 dictionary_page_size_limit: usize,
211 statistics_enabled: Option<String>,
212 max_row_group_size: usize,
213 created_by: String,
214 column_index_truncate_length: Option<usize>,
215 statistics_truncate_length: Option<usize>,
216 data_page_row_count_limit: usize,
217 encoding: Option<String>,
218 bloom_filter_on_write: bool,
219 bloom_filter_fpp: Option<f64>,
220 bloom_filter_ndv: Option<u64>,
221 allow_single_file_parallelism: bool,
222 maximum_parallel_row_group_writers: usize,
223 maximum_buffered_record_batches_per_stream: usize,
224 ) -> PyResult<Self> {
225 let writer_version =
226 datafusion::common::parquet_config::DFParquetWriterVersion::from_str(writer_version)
227 .map_err(py_datafusion_err)?;
228 Ok(Self {
229 options: ParquetOptions {
230 data_pagesize_limit,
231 write_batch_size,
232 writer_version,
233 skip_arrow_metadata,
234 compression,
235 dictionary_enabled,
236 dictionary_page_size_limit,
237 statistics_enabled,
238 max_row_group_size,
239 created_by,
240 column_index_truncate_length,
241 statistics_truncate_length,
242 data_page_row_count_limit,
243 encoding,
244 bloom_filter_on_write,
245 bloom_filter_fpp,
246 bloom_filter_ndv,
247 allow_single_file_parallelism,
248 maximum_parallel_row_group_writers,
249 maximum_buffered_record_batches_per_stream,
250 ..Default::default()
251 },
252 })
253 }
254}
255
/// Python-constructible wrapper around DataFusion's [`ParquetColumnOptions`].
///
/// Used as the per-column values in `write_parquet_with_options`.
#[pyclass(
    from_py_object,
    frozen,
    name = "ParquetColumnOptions",
    module = "datafusion",
    subclass
)]
#[derive(Clone, Default)]
pub struct PyParquetColumnOptions {
    /// The wrapped per-column options.
    options: ParquetColumnOptions,
}
268
269#[pymethods]
270impl PyParquetColumnOptions {
271 #[new]
272 pub fn new(
273 bloom_filter_enabled: Option<bool>,
274 encoding: Option<String>,
275 dictionary_enabled: Option<bool>,
276 compression: Option<String>,
277 statistics_enabled: Option<String>,
278 bloom_filter_fpp: Option<f64>,
279 bloom_filter_ndv: Option<u64>,
280 ) -> Self {
281 Self {
282 options: ParquetColumnOptions {
283 bloom_filter_enabled,
284 encoding,
285 dictionary_enabled,
286 compression,
287 statistics_enabled,
288 bloom_filter_fpp,
289 bloom_filter_ndv,
290 },
291 }
292 }
293}
294
/// Python-facing wrapper around a DataFusion [`DataFrame`].
///
/// Both fields sit behind `Arc`s. Cloning a `PyDataFrame` therefore shares the
/// same DataFrame and the same display cache.
#[pyclass(
    from_py_object,
    name = "DataFrame",
    module = "datafusion",
    subclass,
    frozen
)]
#[derive(Clone)]
pub struct PyDataFrame {
    /// The wrapped DataFrame. Operations clone it before consuming it.
    df: Arc<DataFrame>,

    /// Display-batch cache read and written by `prepare_repr_string`.
    batches: SharedCachedBatches,
}
312
313impl PyDataFrame {
314 pub fn new(df: DataFrame) -> Self {
316 Self {
317 df: Arc::new(df),
318 batches: Arc::new(Mutex::new(None)),
319 }
320 }
321
322 pub(crate) fn inner_df(&self) -> Arc<DataFrame> {
324 Arc::clone(&self.df)
325 }
326
327 fn prepare_repr_string<'py>(
328 &self,
329 py: Python<'py>,
330 as_html: bool,
331 ) -> PyDataFusionResult<String> {
332 let PythonFormatter { formatter, config } = get_python_formatter_with_config(py)?;
334
335 let is_ipython = *is_ipython_env(py);
336
337 let (cached_batches, should_cache) = {
338 let mut cache = self.batches.lock();
339 let should_cache = is_ipython && cache.is_none();
340 let batches = cache.take();
341 (batches, should_cache)
342 };
343
344 let (batches, has_more) = match cached_batches {
345 Some(b) => b,
346 None => wait_for_future(
347 py,
348 collect_record_batches_to_display(self.df.as_ref().clone(), config),
349 )??,
350 };
351
352 if batches.is_empty() {
353 return Ok("No data to display".to_string());
355 }
356
357 let table_uuid = uuid::Uuid::new_v4().to_string();
358
359 let py_batches = batches
361 .iter()
362 .map(|rb| rb.to_pyarrow(py))
363 .collect::<PyResult<Vec<Bound<'py, PyAny>>>>()?;
364
365 let py_schema = self.schema().into_pyobject(py)?;
366
367 let kwargs = pyo3::types::PyDict::new(py);
368 let py_batches_list = PyList::new(py, py_batches.as_slice())?;
369 kwargs.set_item("batches", py_batches_list)?;
370 kwargs.set_item("schema", py_schema)?;
371 kwargs.set_item("has_more", has_more)?;
372 kwargs.set_item("table_uuid", table_uuid)?;
373
374 let method_name = match as_html {
375 true => "format_html",
376 false => "format_str",
377 };
378
379 let html_result = formatter.call_method(method_name, (), Some(&kwargs))?;
380 let html_str: String = html_result.extract()?;
381
382 if should_cache {
383 let mut cache = self.batches.lock();
384 *cache = Some((batches.clone(), has_more));
385 }
386
387 Ok(html_str)
388 }
389
390 async fn collect_column_inner(&self, column: &str) -> Result<ArrayRef, DataFusionError> {
391 let batches = self
392 .df
393 .as_ref()
394 .clone()
395 .select_columns(&[column])?
396 .collect()
397 .await?;
398
399 let arrays = batches
400 .iter()
401 .map(|b| b.column(0).as_ref())
402 .collect::<Vec<_>>();
403
404 arrow_select::concat::concat(&arrays).map_err(Into::into)
405 }
406}
407
/// A [`RecordBatchReader`] that drains partition streams one after another.
///
/// Each batch can optionally be converted to a requested schema. Used by
/// `__arrow_c_stream__`.
struct PartitionedDataFrameStreamReader {
    /// One stream per partition, consumed in order.
    streams: Vec<SendableRecordBatchStream>,
    /// Schema reported by `RecordBatchReader::schema`.
    schema: SchemaRef,
    /// When set, every yielded batch goes through `record_batch_into_schema` with this schema.
    projection: Option<SchemaRef>,
    /// Index into `streams` of the partition currently being drained.
    current: usize,
}
420
421impl Iterator for PartitionedDataFrameStreamReader {
422 type Item = Result<RecordBatch, ArrowError>;
423
424 fn next(&mut self) -> Option<Self::Item> {
425 while self.current < self.streams.len() {
426 let stream = &mut self.streams[self.current];
427 let fut = poll_next_batch(stream);
428 let result = Python::attach(|py| wait_for_future(py, fut));
429
430 match result {
431 Ok(Ok(Some(batch))) => {
432 let batch = if let Some(ref schema) = self.projection {
433 match record_batch_into_schema(batch, schema.as_ref()) {
434 Ok(b) => b,
435 Err(e) => return Some(Err(e)),
436 }
437 } else {
438 batch
439 };
440 return Some(Ok(batch));
441 }
442 Ok(Ok(None)) => {
443 self.current += 1;
444 continue;
445 }
446 Ok(Err(e)) => {
447 return Some(Err(ArrowError::ExternalError(Box::new(e))));
448 }
449 Err(e) => {
450 return Some(Err(ArrowError::ExternalError(Box::new(e))));
451 }
452 }
453 }
454
455 None
456 }
457}
458
459impl RecordBatchReader for PartitionedDataFrameStreamReader {
460 fn schema(&self) -> SchemaRef {
461 self.schema.clone()
462 }
463}
464
465#[pymethods]
466impl PyDataFrame {
467 fn __getitem__(&self, key: Bound<'_, PyAny>) -> PyDataFusionResult<Self> {
469 if let Ok(key) = key.extract::<PyBackedStr>() {
470 self.select_exprs(vec![key])
472 } else if let Ok(tuple) = key.cast::<PyTuple>() {
473 let keys = tuple
475 .iter()
476 .map(|item| item.extract::<PyBackedStr>())
477 .collect::<PyResult<Vec<PyBackedStr>>>()?;
478 self.select_exprs(keys)
479 } else if let Ok(keys) = key.extract::<Vec<PyBackedStr>>() {
480 self.select_exprs(keys)
482 } else {
483 let message = "DataFrame can only be indexed by string index or indices".to_string();
484 Err(PyDataFusionError::Common(message))
485 }
486 }
487
488 fn __repr__(&self, py: Python) -> PyDataFusionResult<String> {
489 self.prepare_repr_string(py, false)
490 }
491
492 #[staticmethod]
493 #[expect(unused_variables)]
494 fn default_str_repr<'py>(
495 batches: Vec<Bound<'py, PyAny>>,
496 schema: &Bound<'py, PyAny>,
497 has_more: bool,
498 table_uuid: &str,
499 ) -> PyResult<String> {
500 let batches = batches
501 .into_iter()
502 .map(|batch| RecordBatch::from_pyarrow_bound(&batch))
503 .collect::<PyResult<Vec<RecordBatch>>>()?
504 .into_iter()
505 .filter(|batch| batch.num_rows() > 0)
506 .collect::<Vec<_>>();
507
508 if batches.is_empty() {
509 return Ok("No data to display".to_owned());
510 }
511
512 let batches_as_displ =
513 pretty::pretty_format_batches(&batches).map_err(py_datafusion_err)?;
514
515 let additional_str = match has_more {
516 true => "\nData truncated.",
517 false => "",
518 };
519
520 Ok(format!("DataFrame()\n{batches_as_displ}{additional_str}"))
521 }
522
523 fn _repr_html_(&self, py: Python) -> PyDataFusionResult<String> {
524 self.prepare_repr_string(py, true)
525 }
526
527 fn describe(&self, py: Python) -> PyDataFusionResult<Self> {
529 let df = self.df.as_ref().clone();
530 let stat_df = wait_for_future(py, df.describe())??;
531 Ok(Self::new(stat_df))
532 }
533
534 fn schema(&self) -> PyArrowType<Schema> {
536 PyArrowType(self.df.schema().as_arrow().clone())
537 }
538
539 #[allow(clippy::wrong_self_convention)]
545 pub fn into_view(&self, temporary: bool) -> PyDataFusionResult<PyTable> {
546 let table_provider = if temporary {
547 Arc::new(TempViewTable::new(Arc::clone(&self.df))) as Arc<dyn TableProvider>
548 } else {
549 self.df.as_ref().clone().into_view()
553 };
554 Ok(PyTable::from(table_provider))
555 }
556
557 #[pyo3(signature = (*args))]
558 fn select_exprs(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
559 let args = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
560 let df = self.df.as_ref().clone().select_exprs(&args)?;
561 Ok(Self::new(df))
562 }
563
564 #[pyo3(signature = (*args))]
565 fn select(&self, args: Vec<PyExpr>) -> PyDataFusionResult<Self> {
566 let expr: Vec<Expr> = args.into_iter().map(|e| e.into()).collect();
567 let df = self.df.as_ref().clone().select(expr)?;
568 Ok(Self::new(df))
569 }
570
571 #[pyo3(signature = (*args))]
572 fn drop(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
573 let cols = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
574 let df = self.df.as_ref().clone().drop_columns(&cols)?;
575 Ok(Self::new(df))
576 }
577
578 #[pyo3(signature = (*exprs))]
580 fn window(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
581 let window_exprs = exprs.into_iter().map(|e| e.into()).collect();
582 let df = self.df.as_ref().clone().window(window_exprs)?;
583 Ok(Self::new(df))
584 }
585
586 fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> {
587 let df = self.df.as_ref().clone().filter(predicate.into())?;
588 Ok(Self::new(df))
589 }
590
591 fn parse_sql_expr(&self, expr: PyBackedStr) -> PyDataFusionResult<PyExpr> {
592 self.df
593 .as_ref()
594 .parse_sql_expr(&expr)
595 .map(PyExpr::from)
596 .map_err(PyDataFusionError::from)
597 }
598
599 fn with_column(&self, name: &str, expr: PyExpr) -> PyDataFusionResult<Self> {
600 let df = self.df.as_ref().clone().with_column(name, expr.into())?;
601 Ok(Self::new(df))
602 }
603
604 fn with_columns(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
605 let mut df = self.df.as_ref().clone();
606 for expr in exprs {
607 let expr: Expr = expr.into();
608 let name = format!("{}", expr.schema_name());
609 df = df.with_column(name.as_str(), expr)?
610 }
611 Ok(Self::new(df))
612 }
613
614 fn with_column_renamed(&self, old_name: &str, new_name: &str) -> PyDataFusionResult<Self> {
617 let df = self
618 .df
619 .as_ref()
620 .clone()
621 .with_column_renamed(old_name, new_name)?;
622 Ok(Self::new(df))
623 }
624
625 fn aggregate(&self, group_by: Vec<PyExpr>, aggs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
626 let group_by = group_by.into_iter().map(|e| e.into()).collect();
627 let aggs = aggs.into_iter().map(|e| e.into()).collect();
628 let df = self.df.as_ref().clone().aggregate(group_by, aggs)?;
629 Ok(Self::new(df))
630 }
631
632 #[pyo3(signature = (*exprs))]
633 fn sort(&self, exprs: Vec<PySortExpr>) -> PyDataFusionResult<Self> {
634 let exprs = to_sort_expressions(exprs);
635 let df = self.df.as_ref().clone().sort(exprs)?;
636 Ok(Self::new(df))
637 }
638
639 #[pyo3(signature = (count, offset=0))]
640 fn limit(&self, count: usize, offset: usize) -> PyDataFusionResult<Self> {
641 let df = self.df.as_ref().clone().limit(offset, Some(count))?;
642 Ok(Self::new(df))
643 }
644
645 fn collect<'py>(&self, py: Python<'py>) -> PyResult<Vec<Bound<'py, PyAny>>> {
649 let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
650 .map_err(PyDataFusionError::from)?;
651 batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
654 }
655
656 fn cache(&self, py: Python) -> PyDataFusionResult<Self> {
658 let df = wait_for_future(py, self.df.as_ref().clone().cache())??;
659 Ok(Self::new(df))
660 }
661
662 fn collect_partitioned<'py>(&self, py: Python<'py>) -> PyResult<Vec<Vec<Bound<'py, PyAny>>>> {
665 let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())?
666 .map_err(PyDataFusionError::from)?;
667
668 batches
669 .into_iter()
670 .map(|rbs| rbs.into_iter().map(|rb| rb.to_pyarrow(py)).collect())
671 .collect()
672 }
673
674 fn collect_column<'py>(&self, py: Python<'py>, column: &str) -> PyResult<Bound<'py, PyAny>> {
675 wait_for_future(py, self.collect_column_inner(column))?
676 .map_err(PyDataFusionError::from)?
677 .to_data()
678 .to_pyarrow(py)
679 }
680
681 #[pyo3(signature = (num=20))]
683 fn show(&self, py: Python, num: usize) -> PyDataFusionResult<()> {
684 let df = self.df.as_ref().clone().limit(0, Some(num))?;
685 print_dataframe(py, df)
686 }
687
688 fn distinct(&self) -> PyDataFusionResult<Self> {
690 let df = self.df.as_ref().clone().distinct()?;
691 Ok(Self::new(df))
692 }
693
    /// Equi-joins this DataFrame with `right` on the named key columns.
    ///
    /// `how` is one of "inner", "left", "right", "full", "semi" (left semi) or
    /// "anti" (left anti). `left_on` and `right_on` are paired positionally.
    ///
    /// When `coalesce_keys` is true, some keys are merged in the output. This
    /// applies to each key pair whose names are identical on both sides and that
    /// resolves to exactly two fields in the joined schema. Such a key is
    /// replaced by one `coalesce(first, second) AS key` column, placed where its
    /// first occurrence was; the second occurrence is dropped. All other columns
    /// pass through unchanged.
    ///
    /// # Errors
    /// Returns an error for an unknown `how`. Join and projection errors are propagated.
    fn join(
        &self,
        right: PyDataFrame,
        how: &str,
        left_on: Vec<PyBackedStr>,
        right_on: Vec<PyBackedStr>,
        coalesce_keys: bool,
    ) -> PyDataFusionResult<Self> {
        let join_type = match how {
            "inner" => JoinType::Inner,
            "left" => JoinType::Left,
            "right" => JoinType::Right,
            "full" => JoinType::Full,
            "semi" => JoinType::LeftSemi,
            "anti" => JoinType::LeftAnti,
            how => {
                return Err(PyDataFusionError::Common(format!(
                    "The join type {how} does not exist or is not implemented"
                )));
            }
        };

        let left_keys = left_on.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
        let right_keys = right_on.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();

        let mut df = self.df.as_ref().clone().join(
            right.df.as_ref().clone(),
            join_type,
            &left_keys,
            &right_keys,
            None,
        )?;

        if coalesce_keys {
            // Key pairs with the same name on both sides; these would otherwise
            // show up twice (once per input) in the joined schema.
            let mutual_keys = left_keys
                .iter()
                .zip(right_keys.iter())
                .filter(|(l, r)| l == r)
                .map(|(key, _)| *key)
                .collect::<Vec<_>>();

            // Resolve each such key to its qualified fields in the joined schema.
            // Keep only keys that resolve to exactly two fields (one per side).
            let fields_to_coalesce = mutual_keys
                .iter()
                .map(|name| {
                    let qualified_fields = df
                        .logical_plan()
                        .schema()
                        .qualified_fields_with_unqualified_name(name);
                    (*name, qualified_fields)
                })
                .filter(|(_, fields)| fields.len() == 2)
                .collect::<Vec<_>>();

            // Rebuild the projection in schema order.
            let expr: Vec<Expr> = df
                .logical_plan()
                .schema()
                .fields()
                .into_iter()
                .enumerate()
                .map(|(idx, _)| df.logical_plan().schema().qualified_field(idx))
                .filter_map(|(qualifier, field)| {
                    if let Some((key_name, qualified_fields)) = fields_to_coalesce
                        .iter()
                        .find(|(_, qf)| qf.contains(&(qualifier, field)))
                    {
                        // First occurrence of a coalesced key: emit the merged
                        // column aliased to the bare key name.
                        if (qualifier, field) == qualified_fields[0] {
                            let left_col = Expr::Column(Column::from(qualified_fields[0]));
                            let right_col = Expr::Column(Column::from(qualified_fields[1]));
                            return Some(coalesce(vec![left_col, right_col]).alias(*key_name));
                        }
                        // Second occurrence: already folded into the coalesce above.
                        None
                    } else {
                        Some(Expr::Column(Column::from((qualifier, field))))
                    }
                })
                .collect();
            df = df.select(expr)?;
        }

        Ok(Self::new(df))
    }
777
778 fn join_on(
779 &self,
780 right: PyDataFrame,
781 on_exprs: Vec<PyExpr>,
782 how: &str,
783 ) -> PyDataFusionResult<Self> {
784 let join_type = match how {
785 "inner" => JoinType::Inner,
786 "left" => JoinType::Left,
787 "right" => JoinType::Right,
788 "full" => JoinType::Full,
789 "semi" => JoinType::LeftSemi,
790 "anti" => JoinType::LeftAnti,
791 how => {
792 return Err(PyDataFusionError::Common(format!(
793 "The join type {how} does not exist or is not implemented"
794 )));
795 }
796 };
797 let exprs: Vec<Expr> = on_exprs.into_iter().map(|e| e.into()).collect();
798
799 let df = self
800 .df
801 .as_ref()
802 .clone()
803 .join_on(right.df.as_ref().clone(), join_type, exprs)?;
804 Ok(Self::new(df))
805 }
806
807 #[pyo3(signature = (verbose=false, analyze=false, format=None))]
809 fn explain(
810 &self,
811 py: Python,
812 verbose: bool,
813 analyze: bool,
814 format: Option<&str>,
815 ) -> PyDataFusionResult<()> {
816 let explain_format = match format {
817 Some(f) => f
818 .parse::<datafusion::common::format::ExplainFormat>()
819 .map_err(|e| {
820 PyDataFusionError::Common(format!("Invalid explain format '{}': {}", f, e))
821 })?,
822 None => datafusion::common::format::ExplainFormat::Indent,
823 };
824 let opts = datafusion::logical_expr::ExplainOption::default()
825 .with_verbose(verbose)
826 .with_analyze(analyze)
827 .with_format(explain_format);
828 let df = self.df.as_ref().clone().explain_with_options(opts)?;
829 print_dataframe(py, df)
830 }
831
832 fn logical_plan(&self) -> PyResult<PyLogicalPlan> {
834 Ok(self.df.as_ref().clone().logical_plan().clone().into())
835 }
836
837 fn optimized_logical_plan(&self) -> PyDataFusionResult<PyLogicalPlan> {
839 Ok(self.df.as_ref().clone().into_optimized_plan()?.into())
840 }
841
842 fn execution_plan(&self, py: Python) -> PyDataFusionResult<PyExecutionPlan> {
844 let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())??;
845 Ok(plan.into())
846 }
847
848 fn repartition(&self, num: usize) -> PyDataFusionResult<Self> {
850 let new_df = self
851 .df
852 .as_ref()
853 .clone()
854 .repartition(Partitioning::RoundRobinBatch(num))?;
855 Ok(Self::new(new_df))
856 }
857
858 #[pyo3(signature = (*args, num))]
860 fn repartition_by_hash(&self, args: Vec<PyExpr>, num: usize) -> PyDataFusionResult<Self> {
861 let expr = args.into_iter().map(|py_expr| py_expr.into()).collect();
862 let new_df = self
863 .df
864 .as_ref()
865 .clone()
866 .repartition(Partitioning::Hash(expr, num))?;
867 Ok(Self::new(new_df))
868 }
869
870 #[pyo3(signature = (py_df, distinct=false))]
873 fn union(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
874 let new_df = if distinct {
875 self.df
876 .as_ref()
877 .clone()
878 .union_distinct(py_df.df.as_ref().clone())?
879 } else {
880 self.df.as_ref().clone().union(py_df.df.as_ref().clone())?
881 };
882
883 Ok(Self::new(new_df))
884 }
885
886 #[pyo3(signature = (columns, preserve_nulls=true, recursions=None))]
887 fn unnest_columns(
888 &self,
889 columns: Vec<String>,
890 preserve_nulls: bool,
891 recursions: Option<Vec<(String, String, usize)>>,
892 ) -> PyDataFusionResult<Self> {
893 let unnest_options = build_unnest_options(preserve_nulls, recursions);
894 let cols = columns.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
895 let df = self
896 .df
897 .as_ref()
898 .clone()
899 .unnest_columns_with_options(&cols, unnest_options)?;
900 Ok(Self::new(df))
901 }
902
903 #[pyo3(signature = (py_df, distinct=false))]
905 fn intersect(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
906 let base = self.df.as_ref().clone();
907 let other = py_df.df.as_ref().clone();
908 let new_df = if distinct {
909 base.intersect_distinct(other)?
910 } else {
911 base.intersect(other)?
912 };
913 Ok(Self::new(new_df))
914 }
915
916 #[pyo3(signature = (py_df, distinct=false))]
918 fn except_all(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
919 let base = self.df.as_ref().clone();
920 let other = py_df.df.as_ref().clone();
921 let new_df = if distinct {
922 base.except_distinct(other)?
923 } else {
924 base.except(other)?
925 };
926 Ok(Self::new(new_df))
927 }
928
929 #[pyo3(signature = (py_df, distinct=false))]
931 fn union_by_name(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
932 let base = self.df.as_ref().clone();
933 let other = py_df.df.as_ref().clone();
934 let new_df = if distinct {
935 base.union_by_name_distinct(other)?
936 } else {
937 base.union_by_name(other)?
938 };
939 Ok(Self::new(new_df))
940 }
941
942 fn distinct_on(
944 &self,
945 on_expr: Vec<PyExpr>,
946 select_expr: Vec<PyExpr>,
947 sort_expr: Option<Vec<PySortExpr>>,
948 ) -> PyDataFusionResult<Self> {
949 let on_expr = on_expr.into_iter().map(|e| e.into()).collect();
950 let select_expr = select_expr.into_iter().map(|e| e.into()).collect();
951 let sort_expr = sort_expr.map(to_sort_expressions);
952 let df = self
953 .df
954 .as_ref()
955 .clone()
956 .distinct_on(on_expr, select_expr, sort_expr)?;
957 Ok(Self::new(df))
958 }
959
960 fn sort_by(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
962 let exprs = exprs.into_iter().map(|e| e.into()).collect();
963 let df = self.df.as_ref().clone().sort_by(exprs)?;
964 Ok(Self::new(df))
965 }
966
967 fn find_qualified_columns(&self, names: Vec<String>) -> PyDataFusionResult<Vec<PyExpr>> {
969 let name_refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
970 let qualified = self.df.find_qualified_columns(&name_refs)?;
971 Ok(qualified
972 .into_iter()
973 .map(|q| Expr::Column(Column::from(q)).into())
974 .collect())
975 }
976
977 fn write_csv(
979 &self,
980 py: Python,
981 path: &str,
982 with_header: bool,
983 write_options: Option<PyDataFrameWriteOptions>,
984 ) -> PyDataFusionResult<()> {
985 let csv_options = CsvOptions {
986 has_header: Some(with_header),
987 ..Default::default()
988 };
989 let write_options = write_options
990 .map(DataFrameWriteOptions::from)
991 .unwrap_or_default();
992
993 wait_for_future(
994 py,
995 self.df
996 .as_ref()
997 .clone()
998 .write_csv(path, write_options, Some(csv_options)),
999 )??;
1000 Ok(())
1001 }
1002
1003 #[pyo3(signature = (
1005 path,
1006 compression="zstd",
1007 compression_level=None,
1008 write_options=None,
1009 ))]
1010 fn write_parquet(
1011 &self,
1012 path: &str,
1013 compression: &str,
1014 compression_level: Option<u32>,
1015 write_options: Option<PyDataFrameWriteOptions>,
1016 py: Python,
1017 ) -> PyDataFusionResult<()> {
1018 fn verify_compression_level(cl: Option<u32>) -> Result<u32, PyErr> {
1019 cl.ok_or(PyValueError::new_err("compression_level is not defined"))
1020 }
1021
1022 let _validated = match compression.to_lowercase().as_str() {
1023 "snappy" => Compression::SNAPPY,
1024 "gzip" => Compression::GZIP(
1025 GzipLevel::try_new(compression_level.unwrap_or(6))
1026 .map_err(|e| PyValueError::new_err(format!("{e}")))?,
1027 ),
1028 "brotli" => Compression::BROTLI(
1029 BrotliLevel::try_new(verify_compression_level(compression_level)?)
1030 .map_err(|e| PyValueError::new_err(format!("{e}")))?,
1031 ),
1032 "zstd" => Compression::ZSTD(
1033 ZstdLevel::try_new(verify_compression_level(compression_level)? as i32)
1034 .map_err(|e| PyValueError::new_err(format!("{e}")))?,
1035 ),
1036 "lzo" => Compression::LZO,
1037 "lz4" => Compression::LZ4,
1038 "lz4_raw" => Compression::LZ4_RAW,
1039 "uncompressed" => Compression::UNCOMPRESSED,
1040 _ => {
1041 return Err(PyDataFusionError::Common(format!(
1042 "Unrecognized compression type {compression}"
1043 )));
1044 }
1045 };
1046
1047 let mut compression_string = compression.to_string();
1048 if let Some(level) = compression_level {
1049 compression_string.push_str(&format!("({level})"));
1050 }
1051
1052 let mut options = TableParquetOptions::default();
1053 options.global.compression = Some(compression_string);
1054 let write_options = write_options
1055 .map(DataFrameWriteOptions::from)
1056 .unwrap_or_default();
1057
1058 wait_for_future(
1059 py,
1060 self.df
1061 .as_ref()
1062 .clone()
1063 .write_parquet(path, write_options, Option::from(options)),
1064 )??;
1065 Ok(())
1066 }
1067
1068 fn write_parquet_with_options(
1070 &self,
1071 path: &str,
1072 options: PyParquetWriterOptions,
1073 column_specific_options: HashMap<String, PyParquetColumnOptions>,
1074 write_options: Option<PyDataFrameWriteOptions>,
1075 py: Python,
1076 ) -> PyDataFusionResult<()> {
1077 let table_options = TableParquetOptions {
1078 global: options.options,
1079 column_specific_options: column_specific_options
1080 .into_iter()
1081 .map(|(k, v)| (k, v.options))
1082 .collect(),
1083 ..Default::default()
1084 };
1085 let write_options = write_options
1086 .map(DataFrameWriteOptions::from)
1087 .unwrap_or_default();
1088 wait_for_future(
1089 py,
1090 self.df.as_ref().clone().write_parquet(
1091 path,
1092 write_options,
1093 Option::from(table_options),
1094 ),
1095 )??;
1096 Ok(())
1097 }
1098
1099 fn write_json(
1101 &self,
1102 path: &str,
1103 py: Python,
1104 write_options: Option<PyDataFrameWriteOptions>,
1105 ) -> PyDataFusionResult<()> {
1106 let write_options = write_options
1107 .map(DataFrameWriteOptions::from)
1108 .unwrap_or_default();
1109 wait_for_future(
1110 py,
1111 self.df
1112 .as_ref()
1113 .clone()
1114 .write_json(path, write_options, None),
1115 )??;
1116 Ok(())
1117 }
1118
1119 fn write_table(
1120 &self,
1121 py: Python,
1122 table_name: &str,
1123 write_options: Option<PyDataFrameWriteOptions>,
1124 ) -> PyDataFusionResult<()> {
1125 let write_options = write_options
1126 .map(DataFrameWriteOptions::from)
1127 .unwrap_or_default();
1128 wait_for_future(
1129 py,
1130 self.df
1131 .as_ref()
1132 .clone()
1133 .write_table(table_name, write_options),
1134 )??;
1135 Ok(())
1136 }
1137
1138 fn to_arrow_table(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1141 let batches = self.collect(py)?.into_pyobject(py)?;
1142
1143 let args = if batches.len()? == 0 {
1146 let schema = self.schema().into_pyobject(py)?;
1147 PyTuple::new(py, &[batches, schema])?
1148 } else {
1149 PyTuple::new(py, &[batches])?
1150 };
1151
1152 let table_class = py.import("pyarrow")?.getattr("Table")?;
1154 let table: Py<PyAny> = table_class.call_method1("from_batches", args)?.into();
1155 Ok(table)
1156 }
1157
1158 #[pyo3(signature = (requested_schema=None))]
1159 fn __arrow_c_stream__<'py>(
1160 &'py self,
1161 py: Python<'py>,
1162 requested_schema: Option<Bound<'py, PyCapsule>>,
1163 ) -> PyDataFusionResult<Bound<'py, PyCapsule>> {
1164 let df = self.df.as_ref().clone();
1165 let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;
1166
1167 let mut schema: Schema = self.df.schema().to_owned().as_arrow().clone();
1168 let mut projection: Option<SchemaRef> = None;
1169
1170 if let Some(schema_capsule) = requested_schema {
1171 let data: NonNull<FFI_ArrowSchema> = schema_capsule
1172 .pointer_checked(Some(c"arrow_schema"))?
1173 .cast();
1174 let schema_ptr = unsafe { data.as_ref() };
1175 let desired_schema = Schema::try_from(schema_ptr)?;
1176
1177 schema = project_schema(schema, desired_schema)?;
1178 projection = Some(Arc::new(schema.clone()));
1179 }
1180
1181 let schema_ref = Arc::new(schema.clone());
1182
1183 let reader = PartitionedDataFrameStreamReader {
1184 streams,
1185 schema: schema_ref,
1186 projection,
1187 current: 0,
1188 };
1189 let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
1190
1191 let stream = FFI_ArrowArrayStream::new(reader);
1195 let name = CString::new(ARROW_ARRAY_STREAM_NAME.to_bytes()).unwrap();
1196 let capsule = PyCapsule::new(py, stream, Some(name))?;
1197 Ok(capsule)
1198 }
1199
1200 fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {
1201 let df = self.df.as_ref().clone();
1202 let stream = spawn_future(py, async move { df.execute_stream().await })?;
1203 Ok(PyRecordBatchStream::new(stream))
1204 }
1205
1206 fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
1207 let df = self.df.as_ref().clone();
1208 let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;
1209 Ok(streams.into_iter().map(PyRecordBatchStream::new).collect())
1210 }
1211
1212 fn to_pandas(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1215 let table = self.to_arrow_table(py)?;
1216
1217 let result = table.call_method0(py, "to_pandas")?;
1219 Ok(result)
1220 }
1221
1222 fn to_pylist(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1225 let table = self.to_arrow_table(py)?;
1226
1227 let result = table.call_method0(py, "to_pylist")?;
1229 Ok(result)
1230 }
1231
1232 fn to_pydict(&self, py: Python) -> PyResult<Py<PyAny>> {
1235 let table = self.to_arrow_table(py)?;
1236
1237 let result = table.call_method0(py, "to_pydict")?;
1239 Ok(result)
1240 }
1241
1242 fn to_polars(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1245 let table = self.to_arrow_table(py)?;
1246 let dataframe = py.import("polars")?.getattr("DataFrame")?;
1247 let args = PyTuple::new(py, &[table])?;
1248 let result: Py<PyAny> = dataframe.call1(args)?.into();
1249 Ok(result)
1250 }
1251
1252 fn count(&self, py: Python) -> PyDataFusionResult<usize> {
1254 Ok(wait_for_future(py, self.df.as_ref().clone().count())??)
1255 }
1256
1257 #[pyo3(signature = (value, columns=None))]
1259 fn fill_null(
1260 &self,
1261 value: Py<PyAny>,
1262 columns: Option<Vec<PyBackedStr>>,
1263 py: Python,
1264 ) -> PyDataFusionResult<Self> {
1265 let scalar_value: PyScalarValue = value.extract(py)?;
1266
1267 let cols = match columns {
1268 Some(col_names) => col_names.iter().map(|c| c.to_string()).collect(),
1269 None => Vec::new(), };
1271
1272 let df = self.df.as_ref().clone().fill_null(scalar_value.0, cols)?;
1273 Ok(Self::new(df))
1274 }
1275}
1276
/// Python-facing counterpart of DataFusion's [`InsertOp`], exposed to Python
/// as `datafusion.InsertOp`.
///
/// Used to choose the insert mode stored in `PyDataFrameWriteOptions`; each
/// variant converts 1:1 into an [`InsertOp`] variant via `From`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
    from_py_object,
    frozen,
    eq,
    eq_int,
    name = "InsertOp",
    module = "datafusion"
)]
pub enum PyInsertOp {
    /// Converts to `InsertOp::Append`.
    APPEND,
    /// Converts to `InsertOp::Replace`.
    REPLACE,
    /// Converts to `InsertOp::Overwrite`.
    OVERWRITE,
}
1291
1292impl From<PyInsertOp> for InsertOp {
1293 fn from(value: PyInsertOp) -> Self {
1294 match value {
1295 PyInsertOp::APPEND => InsertOp::Append,
1296 PyInsertOp::REPLACE => InsertOp::Replace,
1297 PyInsertOp::OVERWRITE => InsertOp::Overwrite,
1298 }
1299 }
1300}
1301
/// Python-visible bundle of options for DataFrame write operations, exposed to
/// Python as `datafusion.DataFrameWriteOptions`.
///
/// Each field is forwarded to the matching builder method of DataFusion's
/// [`DataFrameWriteOptions`] by the `From` impl for that type.
#[derive(Debug, Clone)]
#[pyclass(
    from_py_object,
    frozen,
    name = "DataFrameWriteOptions",
    module = "datafusion"
)]
pub struct PyDataFrameWriteOptions {
    // Forwarded to `DataFrameWriteOptions::with_insert_operation`.
    insert_operation: InsertOp,
    // Forwarded to `DataFrameWriteOptions::with_single_file_output`.
    single_file_output: bool,
    // Forwarded to `DataFrameWriteOptions::with_partition_by`
    // (presumably partition column names — confirm against callers).
    partition_by: Vec<String>,
    // Forwarded to `DataFrameWriteOptions::with_sort_by`.
    sort_by: Vec<SortExpr>,
}
1315
1316impl From<PyDataFrameWriteOptions> for DataFrameWriteOptions {
1317 fn from(value: PyDataFrameWriteOptions) -> Self {
1318 DataFrameWriteOptions::new()
1319 .with_insert_operation(value.insert_operation)
1320 .with_single_file_output(value.single_file_output)
1321 .with_partition_by(value.partition_by)
1322 .with_sort_by(value.sort_by)
1323 }
1324}
1325
1326#[pymethods]
1327impl PyDataFrameWriteOptions {
1328 #[new]
1329 fn new(
1330 insert_operation: Option<PyInsertOp>,
1331 single_file_output: bool,
1332 partition_by: Option<Vec<String>>,
1333 sort_by: Option<Vec<PySortExpr>>,
1334 ) -> Self {
1335 let insert_operation = insert_operation.map(Into::into).unwrap_or(InsertOp::Append);
1336 let sort_by = sort_by
1337 .unwrap_or_default()
1338 .into_iter()
1339 .map(Into::into)
1340 .collect();
1341 Self {
1342 insert_operation,
1343 single_file_output,
1344 partition_by: partition_by.unwrap_or_default(),
1345 sort_by,
1346 }
1347 }
1348}
1349
1350fn build_unnest_options(
1351 preserve_nulls: bool,
1352 recursions: Option<Vec<(String, String, usize)>>,
1353) -> UnnestOptions {
1354 let mut opts = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
1355 if let Some(recs) = recursions {
1356 opts.recursions = recs
1357 .into_iter()
1358 .map(
1359 |(input, output, depth)| datafusion::common::RecursionUnnestOption {
1360 input_column: datafusion::common::Column::from(input.as_str()),
1361 output_column: datafusion::common::Column::from(output.as_str()),
1362 depth,
1363 },
1364 )
1365 .collect();
1366 }
1367 opts
1368}
1369
1370fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
1372 let batches = wait_for_future(py, df.collect())??;
1374 let result = if batches.is_empty() {
1375 "DataFrame has no rows".to_string()
1376 } else {
1377 match pretty::pretty_format_batches(&batches) {
1378 Ok(batch) => format!("DataFrame()\n{batch}"),
1379 Err(err) => format!("Error: {:?}", err.to_string()),
1380 }
1381 };
1382
1383 let print = py.import("builtins")?.getattr("print")?;
1386 print.call1((result,))?;
1387 Ok(())
1388}
1389
1390fn project_schema(from_schema: Schema, to_schema: Schema) -> Result<Schema, ArrowError> {
1391 let merged_schema = Schema::try_merge(vec![from_schema, to_schema.clone()])?;
1392
1393 let project_indices: Vec<usize> = to_schema
1394 .fields
1395 .iter()
1396 .map(|field| field.name())
1397 .filter_map(|field_name| merged_schema.index_of(field_name).ok())
1398 .collect();
1399
1400 merged_schema.project(&project_indices)
1401}
1402fn record_batch_into_schema(
1408 record_batch: RecordBatch,
1409 schema: &Schema,
1410) -> Result<RecordBatch, ArrowError> {
1411 let schema = Arc::new(schema.clone());
1412 let base_schema = record_batch.schema();
1413 if base_schema.fields().is_empty() {
1414 return Ok(RecordBatch::new_empty(schema));
1416 }
1417
1418 let array_size = record_batch.column(0).len();
1419 let mut data_arrays = Vec::with_capacity(schema.fields().len());
1420
1421 for field in schema.fields() {
1422 let desired_data_type = field.data_type();
1423 if let Some(original_data) = record_batch.column_by_name(field.name()) {
1424 let original_data_type = original_data.data_type();
1425
1426 if can_cast_types(original_data_type, desired_data_type) {
1427 data_arrays.push(arrow::compute::kernels::cast(
1428 original_data,
1429 desired_data_type,
1430 )?);
1431 } else if field.is_nullable() {
1432 data_arrays.push(new_null_array(desired_data_type, array_size));
1433 } else {
1434 return Err(ArrowError::CastError(format!(
1435 "Attempting to cast to non-nullable and non-castable field {} during schema projection.",
1436 field.name()
1437 )));
1438 }
1439 } else {
1440 if !field.is_nullable() {
1441 return Err(ArrowError::CastError(format!(
1442 "Attempting to set null to non-nullable field {} during schema projection.",
1443 field.name()
1444 )));
1445 }
1446 data_arrays.push(new_null_array(desired_data_type, array_size));
1447 }
1448 }
1449
1450 RecordBatch::try_new(schema, data_arrays)
1451}
1452
1453async fn collect_record_batches_to_display(
1464 df: DataFrame,
1465 config: FormatterConfig,
1466) -> Result<(Vec<RecordBatch>, bool), DataFusionError> {
1467 let FormatterConfig {
1468 max_bytes,
1469 min_rows,
1470 max_rows,
1471 } = config;
1472
1473 let partitioned_stream = df.execute_stream_partitioned().await?;
1474 let mut stream = futures::stream::iter(partitioned_stream).flatten();
1475 let mut size_estimate_so_far = 0;
1476 let mut rows_so_far = 0;
1477 let mut record_batches = Vec::default();
1478 let mut has_more = false;
1479
1480 while (size_estimate_so_far < max_bytes && rows_so_far < max_rows) || rows_so_far < min_rows {
1485 let mut rb = match stream.next().await {
1486 None => {
1487 break;
1488 }
1489 Some(Ok(r)) => r,
1490 Some(Err(e)) => return Err(e),
1491 };
1492
1493 let mut rows_in_rb = rb.num_rows();
1494 if rows_in_rb > 0 {
1495 size_estimate_so_far += rb.get_array_memory_size();
1496
1497 if size_estimate_so_far > max_bytes {
1499 let ratio = max_bytes as f32 / size_estimate_so_far as f32;
1500 let total_rows = rows_in_rb + rows_so_far;
1501
1502 let mut reduced_row_num = (total_rows as f32 * ratio).round() as usize;
1504 if reduced_row_num < min_rows {
1506 reduced_row_num = min_rows.min(total_rows);
1507 }
1508
1509 let limited_rows_this_rb = reduced_row_num - rows_so_far;
1510 if limited_rows_this_rb < rows_in_rb {
1511 rows_in_rb = limited_rows_this_rb;
1512 rb = rb.slice(0, limited_rows_this_rb);
1513 has_more = true;
1514 }
1515 }
1516
1517 if rows_in_rb + rows_so_far > max_rows {
1518 rb = rb.slice(0, max_rows - rows_so_far);
1519 has_more = true;
1520 }
1521
1522 rows_so_far += rb.num_rows();
1523 record_batches.push(rb);
1524 }
1525 }
1526
1527 if record_batches.is_empty() {
1528 return Ok((Vec::default(), false));
1529 }
1530
1531 if !has_more {
1532 has_more = match stream.try_next().await {
1534 Ok(None) => false, Ok(Some(_)) => true,
1536 Err(_) => false, };
1538 }
1539
1540 Ok((record_batches, has_more))
1541}