Skip to main content

datafusion_python/
dataframe.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::collections::HashMap;
19use std::ffi::{CStr, CString};
20use std::ptr::NonNull;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use arrow::array::{Array, ArrayRef, RecordBatch, RecordBatchReader, new_null_array};
25use arrow::compute::can_cast_types;
26use arrow::error::ArrowError;
27use arrow::ffi::FFI_ArrowSchema;
28use arrow::ffi_stream::FFI_ArrowArrayStream;
29use arrow::pyarrow::FromPyArrow;
30use cstr::cstr;
31use datafusion::arrow::datatypes::{Schema, SchemaRef};
32use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
33use datafusion::arrow::util::pretty;
34use datafusion::catalog::TableProvider;
35use datafusion::common::UnnestOptions;
36use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
37use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
38use datafusion::error::DataFusionError;
39use datafusion::execution::SendableRecordBatchStream;
40use datafusion::logical_expr::SortExpr;
41use datafusion::logical_expr::dml::InsertOp;
42use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
43use datafusion::prelude::*;
44use datafusion_python_util::{is_ipython_env, spawn_future, wait_for_future};
45use futures::{StreamExt, TryStreamExt};
46use parking_lot::Mutex;
47use pyo3::PyErr;
48use pyo3::exceptions::PyValueError;
49use pyo3::prelude::*;
50use pyo3::pybacked::PyBackedStr;
51use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
52
53use crate::common::data_type::PyScalarValue;
54use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err};
55use crate::expr::PyExpr;
56use crate::expr::sort_expr::{PySortExpr, to_sort_expressions};
57use crate::physical_plan::PyExecutionPlan;
58use crate::record_batch::{PyRecordBatchStream, poll_next_batch};
59use crate::sql::logical::PyLogicalPlan;
60use crate::table::{PyTable, TempViewTable};
61
62/// File-level static CStr for the Arrow array stream capsule name.
63static ARROW_ARRAY_STREAM_NAME: &CStr = cstr!("arrow_array_stream");
64
65// Type aliases to simplify very complex types used in this file and
66// avoid compiler complaints about deeply nested types in struct fields.
67type CachedBatches = Option<(Vec<RecordBatch>, bool)>;
68type SharedCachedBatches = Arc<Mutex<CachedBatches>>;
69
/// Limits that control how much of a DataFrame is rendered for display.
#[derive(Debug, Clone)]
pub struct FormatterConfig {
    /// Maximum memory, in bytes, to spend on display (default: 2MB).
    pub max_bytes: usize,
    /// Lower bound on the number of rows to display (default: 10).
    pub min_rows: usize,
    /// Upper bound on the number of rows included in `__repr__` output (default: 10).
    pub max_rows: usize,
}

impl Default for FormatterConfig {
    /// 2MB of display memory, with both row limits set to 10.
    fn default() -> Self {
        const TWO_MIB: usize = 2 * 1024 * 1024;
        Self {
            min_rows: 10,
            max_rows: 10,
            max_bytes: TWO_MIB,
        }
    }
}

impl FormatterConfig {
    /// Check that every limit is a positive integer and that `min_rows <= max_rows`.
    ///
    /// # Errors
    ///
    /// Returns a message naming the first violated rule. Rules are checked in this order:
    /// `max_bytes > 0`, `min_rows > 0`, `max_rows > 0`, then `min_rows <= max_rows`.
    pub fn validate(&self) -> Result<(), String> {
        let positivity_rules = [
            (self.max_bytes, "max_bytes must be a positive integer"),
            (self.min_rows, "min_rows must be a positive integer"),
            (self.max_rows, "max_rows must be a positive integer"),
        ];
        if let Some(&(_, message)) = positivity_rules.iter().find(|(value, _)| *value == 0) {
            return Err(message.to_owned());
        }

        if self.min_rows <= self.max_rows {
            Ok(())
        } else {
            Err("min_rows must be less than or equal to max_rows".to_owned())
        }
    }
}
117
118/// Holds the Python formatter and its configuration
119struct PythonFormatter<'py> {
120    /// The Python formatter object
121    formatter: Bound<'py, PyAny>,
122    /// The formatter configuration
123    config: FormatterConfig,
124}
125
126/// Get the Python formatter and its configuration
127fn get_python_formatter_with_config(py: Python) -> PyResult<PythonFormatter> {
128    let formatter = import_python_formatter(py)?;
129    let config = build_formatter_config_from_python(&formatter)?;
130    Ok(PythonFormatter { formatter, config })
131}
132
133/// Get the Python formatter from the datafusion.dataframe_formatter module
134fn import_python_formatter(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
135    let formatter_module = py.import("datafusion.dataframe_formatter")?;
136    let get_formatter = formatter_module.getattr("get_formatter")?;
137    get_formatter.call0()
138}
139
140// Helper function to extract attributes with fallback to default
141fn get_attr<'a, T>(py_object: &'a Bound<'a, PyAny>, attr_name: &str, default_value: T) -> T
142where
143    T: for<'py> pyo3::FromPyObject<'py, 'py> + Clone,
144{
145    py_object
146        .getattr(attr_name)
147        .and_then(|v| v.extract::<T>().map_err(Into::<PyErr>::into))
148        .unwrap_or_else(|_| default_value.clone())
149}
150
151/// Helper function to create a FormatterConfig from a Python formatter object
152fn build_formatter_config_from_python(formatter: &Bound<'_, PyAny>) -> PyResult<FormatterConfig> {
153    let default_config = FormatterConfig::default();
154    let max_bytes = get_attr(formatter, "max_memory_bytes", default_config.max_bytes);
155    let min_rows = get_attr(formatter, "min_rows", default_config.min_rows);
156
157    // Backward compatibility: Try max_rows first (new name), fall back to repr_rows (deprecated),
158    // then use default. This ensures backward compatibility with custom formatter implementations
159    // during the deprecation period.
160    let max_rows = get_attr(formatter, "max_rows", 0usize);
161    let max_rows = if max_rows > 0 {
162        // max_rows attribute exists and has a value
163        max_rows
164    } else {
165        // Try the deprecated repr_rows attribute
166        let repr_rows = get_attr(formatter, "repr_rows", 0usize);
167        if repr_rows > 0 {
168            repr_rows
169        } else {
170            // Use default
171            default_config.max_rows
172        }
173    };
174
175    let config = FormatterConfig {
176        max_bytes,
177        min_rows,
178        max_rows,
179    };
180
181    // Return the validated config, converting String error to PyErr
182    config.validate().map_err(PyValueError::new_err)?;
183    Ok(config)
184}
185
186/// Python mapping of `ParquetOptions` (includes just the writer-related options).
187#[pyclass(
188    from_py_object,
189    frozen,
190    name = "ParquetWriterOptions",
191    module = "datafusion",
192    subclass
193)]
194#[derive(Clone, Default)]
195pub struct PyParquetWriterOptions {
196    options: ParquetOptions,
197}
198
199#[pymethods]
200impl PyParquetWriterOptions {
201    #[new]
202    #[allow(clippy::too_many_arguments)]
203    pub fn new(
204        data_pagesize_limit: usize,
205        write_batch_size: usize,
206        writer_version: &str,
207        skip_arrow_metadata: bool,
208        compression: Option<String>,
209        dictionary_enabled: Option<bool>,
210        dictionary_page_size_limit: usize,
211        statistics_enabled: Option<String>,
212        max_row_group_size: usize,
213        created_by: String,
214        column_index_truncate_length: Option<usize>,
215        statistics_truncate_length: Option<usize>,
216        data_page_row_count_limit: usize,
217        encoding: Option<String>,
218        bloom_filter_on_write: bool,
219        bloom_filter_fpp: Option<f64>,
220        bloom_filter_ndv: Option<u64>,
221        allow_single_file_parallelism: bool,
222        maximum_parallel_row_group_writers: usize,
223        maximum_buffered_record_batches_per_stream: usize,
224    ) -> PyResult<Self> {
225        let writer_version =
226            datafusion::common::parquet_config::DFParquetWriterVersion::from_str(writer_version)
227                .map_err(py_datafusion_err)?;
228        Ok(Self {
229            options: ParquetOptions {
230                data_pagesize_limit,
231                write_batch_size,
232                writer_version,
233                skip_arrow_metadata,
234                compression,
235                dictionary_enabled,
236                dictionary_page_size_limit,
237                statistics_enabled,
238                max_row_group_size,
239                created_by,
240                column_index_truncate_length,
241                statistics_truncate_length,
242                data_page_row_count_limit,
243                encoding,
244                bloom_filter_on_write,
245                bloom_filter_fpp,
246                bloom_filter_ndv,
247                allow_single_file_parallelism,
248                maximum_parallel_row_group_writers,
249                maximum_buffered_record_batches_per_stream,
250                ..Default::default()
251            },
252        })
253    }
254}
255
256/// Python mapping of `ParquetColumnOptions`.
257#[pyclass(
258    from_py_object,
259    frozen,
260    name = "ParquetColumnOptions",
261    module = "datafusion",
262    subclass
263)]
264#[derive(Clone, Default)]
265pub struct PyParquetColumnOptions {
266    options: ParquetColumnOptions,
267}
268
269#[pymethods]
270impl PyParquetColumnOptions {
271    #[new]
272    pub fn new(
273        bloom_filter_enabled: Option<bool>,
274        encoding: Option<String>,
275        dictionary_enabled: Option<bool>,
276        compression: Option<String>,
277        statistics_enabled: Option<String>,
278        bloom_filter_fpp: Option<f64>,
279        bloom_filter_ndv: Option<u64>,
280    ) -> Self {
281        Self {
282            options: ParquetColumnOptions {
283                bloom_filter_enabled,
284                encoding,
285                dictionary_enabled,
286                compression,
287                statistics_enabled,
288                bloom_filter_fpp,
289                bloom_filter_ndv,
290            },
291        }
292    }
293}
294
295/// A PyDataFrame is a representation of a logical plan and an API to compose statements.
296/// Use it to build a plan and `.collect()` to execute the plan and collect the result.
297/// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment.
298#[pyclass(
299    from_py_object,
300    name = "DataFrame",
301    module = "datafusion",
302    subclass,
303    frozen
304)]
305#[derive(Clone)]
306pub struct PyDataFrame {
307    df: Arc<DataFrame>,
308
309    // In IPython environment cache batches between __repr__ and _repr_html_ calls.
310    batches: SharedCachedBatches,
311}
312
313impl PyDataFrame {
314    /// creates a new PyDataFrame
315    pub fn new(df: DataFrame) -> Self {
316        Self {
317            df: Arc::new(df),
318            batches: Arc::new(Mutex::new(None)),
319        }
320    }
321
322    /// Return a clone of the inner Arc<DataFrame> for crate-local callers.
323    pub(crate) fn inner_df(&self) -> Arc<DataFrame> {
324        Arc::clone(&self.df)
325    }
326
327    fn prepare_repr_string<'py>(
328        &self,
329        py: Python<'py>,
330        as_html: bool,
331    ) -> PyDataFusionResult<String> {
332        // Get the Python formatter and config
333        let PythonFormatter { formatter, config } = get_python_formatter_with_config(py)?;
334
335        let is_ipython = *is_ipython_env(py);
336
337        let (cached_batches, should_cache) = {
338            let mut cache = self.batches.lock();
339            let should_cache = is_ipython && cache.is_none();
340            let batches = cache.take();
341            (batches, should_cache)
342        };
343
344        let (batches, has_more) = match cached_batches {
345            Some(b) => b,
346            None => wait_for_future(
347                py,
348                collect_record_batches_to_display(self.df.as_ref().clone(), config),
349            )??,
350        };
351
352        if batches.is_empty() {
353            // This should not be reached, but do it for safety since we index into the vector below
354            return Ok("No data to display".to_string());
355        }
356
357        let table_uuid = uuid::Uuid::new_v4().to_string();
358
359        // Convert record batches to Py<PyAny> list
360        let py_batches = batches
361            .iter()
362            .map(|rb| rb.to_pyarrow(py))
363            .collect::<PyResult<Vec<Bound<'py, PyAny>>>>()?;
364
365        let py_schema = self.schema().into_pyobject(py)?;
366
367        let kwargs = pyo3::types::PyDict::new(py);
368        let py_batches_list = PyList::new(py, py_batches.as_slice())?;
369        kwargs.set_item("batches", py_batches_list)?;
370        kwargs.set_item("schema", py_schema)?;
371        kwargs.set_item("has_more", has_more)?;
372        kwargs.set_item("table_uuid", table_uuid)?;
373
374        let method_name = match as_html {
375            true => "format_html",
376            false => "format_str",
377        };
378
379        let html_result = formatter.call_method(method_name, (), Some(&kwargs))?;
380        let html_str: String = html_result.extract()?;
381
382        if should_cache {
383            let mut cache = self.batches.lock();
384            *cache = Some((batches.clone(), has_more));
385        }
386
387        Ok(html_str)
388    }
389
390    async fn collect_column_inner(&self, column: &str) -> Result<ArrayRef, DataFusionError> {
391        let batches = self
392            .df
393            .as_ref()
394            .clone()
395            .select_columns(&[column])?
396            .collect()
397            .await?;
398
399        let arrays = batches
400            .iter()
401            .map(|b| b.column(0).as_ref())
402            .collect::<Vec<_>>();
403
404        arrow_select::concat::concat(&arrays).map_err(Into::into)
405    }
406}
407
408/// Synchronous wrapper around partitioned [`SendableRecordBatchStream`]s used
409/// for the `__arrow_c_stream__` implementation.
410///
411/// It drains each partition's stream sequentially, yielding record batches in
412/// their original partition order. When a `projection` is set, each batch is
413/// converted via `record_batch_into_schema` to apply schema changes per batch.
414struct PartitionedDataFrameStreamReader {
415    streams: Vec<SendableRecordBatchStream>,
416    schema: SchemaRef,
417    projection: Option<SchemaRef>,
418    current: usize,
419}
420
421impl Iterator for PartitionedDataFrameStreamReader {
422    type Item = Result<RecordBatch, ArrowError>;
423
424    fn next(&mut self) -> Option<Self::Item> {
425        while self.current < self.streams.len() {
426            let stream = &mut self.streams[self.current];
427            let fut = poll_next_batch(stream);
428            let result = Python::attach(|py| wait_for_future(py, fut));
429
430            match result {
431                Ok(Ok(Some(batch))) => {
432                    let batch = if let Some(ref schema) = self.projection {
433                        match record_batch_into_schema(batch, schema.as_ref()) {
434                            Ok(b) => b,
435                            Err(e) => return Some(Err(e)),
436                        }
437                    } else {
438                        batch
439                    };
440                    return Some(Ok(batch));
441                }
442                Ok(Ok(None)) => {
443                    self.current += 1;
444                    continue;
445                }
446                Ok(Err(e)) => {
447                    return Some(Err(ArrowError::ExternalError(Box::new(e))));
448                }
449                Err(e) => {
450                    return Some(Err(ArrowError::ExternalError(Box::new(e))));
451                }
452            }
453        }
454
455        None
456    }
457}
458
459impl RecordBatchReader for PartitionedDataFrameStreamReader {
460    fn schema(&self) -> SchemaRef {
461        self.schema.clone()
462    }
463}
464
465#[pymethods]
466impl PyDataFrame {
467    /// Enable selection for `df[col]`, `df[col1, col2, col3]`, and `df[[col1, col2, col3]]`
468    fn __getitem__(&self, key: Bound<'_, PyAny>) -> PyDataFusionResult<Self> {
469        if let Ok(key) = key.extract::<PyBackedStr>() {
470            // df[col]
471            self.select_exprs(vec![key])
472        } else if let Ok(tuple) = key.cast::<PyTuple>() {
473            // df[col1, col2, col3]
474            let keys = tuple
475                .iter()
476                .map(|item| item.extract::<PyBackedStr>())
477                .collect::<PyResult<Vec<PyBackedStr>>>()?;
478            self.select_exprs(keys)
479        } else if let Ok(keys) = key.extract::<Vec<PyBackedStr>>() {
480            // df[[col1, col2, col3]]
481            self.select_exprs(keys)
482        } else {
483            let message = "DataFrame can only be indexed by string index or indices".to_string();
484            Err(PyDataFusionError::Common(message))
485        }
486    }
487
488    fn __repr__(&self, py: Python) -> PyDataFusionResult<String> {
489        self.prepare_repr_string(py, false)
490    }
491
492    #[staticmethod]
493    #[expect(unused_variables)]
494    fn default_str_repr<'py>(
495        batches: Vec<Bound<'py, PyAny>>,
496        schema: &Bound<'py, PyAny>,
497        has_more: bool,
498        table_uuid: &str,
499    ) -> PyResult<String> {
500        let batches = batches
501            .into_iter()
502            .map(|batch| RecordBatch::from_pyarrow_bound(&batch))
503            .collect::<PyResult<Vec<RecordBatch>>>()?
504            .into_iter()
505            .filter(|batch| batch.num_rows() > 0)
506            .collect::<Vec<_>>();
507
508        if batches.is_empty() {
509            return Ok("No data to display".to_owned());
510        }
511
512        let batches_as_displ =
513            pretty::pretty_format_batches(&batches).map_err(py_datafusion_err)?;
514
515        let additional_str = match has_more {
516            true => "\nData truncated.",
517            false => "",
518        };
519
520        Ok(format!("DataFrame()\n{batches_as_displ}{additional_str}"))
521    }
522
523    fn _repr_html_(&self, py: Python) -> PyDataFusionResult<String> {
524        self.prepare_repr_string(py, true)
525    }
526
527    /// Calculate summary statistics for a DataFrame
528    fn describe(&self, py: Python) -> PyDataFusionResult<Self> {
529        let df = self.df.as_ref().clone();
530        let stat_df = wait_for_future(py, df.describe())??;
531        Ok(Self::new(stat_df))
532    }
533
534    /// Returns the schema from the logical plan
535    fn schema(&self) -> PyArrowType<Schema> {
536        PyArrowType(self.df.schema().as_arrow().clone())
537    }
538
539    /// Convert this DataFrame into a Table Provider that can be used in register_table
540    /// By convention, into_... methods consume self and return the new object.
541    /// Disabling the clippy lint, so we can use &self
542    /// because we're working with Python bindings
543    /// where objects are shared
544    #[allow(clippy::wrong_self_convention)]
545    pub fn into_view(&self, temporary: bool) -> PyDataFusionResult<PyTable> {
546        let table_provider = if temporary {
547            Arc::new(TempViewTable::new(Arc::clone(&self.df))) as Arc<dyn TableProvider>
548        } else {
549            // Call the underlying Rust DataFrame::into_view method.
550            // Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
551            // so that we don't invalidate this PyDataFrame.
552            self.df.as_ref().clone().into_view()
553        };
554        Ok(PyTable::from(table_provider))
555    }
556
557    #[pyo3(signature = (*args))]
558    fn select_exprs(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
559        let args = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
560        let df = self.df.as_ref().clone().select_exprs(&args)?;
561        Ok(Self::new(df))
562    }
563
564    #[pyo3(signature = (*args))]
565    fn select(&self, args: Vec<PyExpr>) -> PyDataFusionResult<Self> {
566        let expr: Vec<Expr> = args.into_iter().map(|e| e.into()).collect();
567        let df = self.df.as_ref().clone().select(expr)?;
568        Ok(Self::new(df))
569    }
570
571    #[pyo3(signature = (*args))]
572    fn drop(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
573        let cols = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
574        let df = self.df.as_ref().clone().drop_columns(&cols)?;
575        Ok(Self::new(df))
576    }
577
578    /// Apply window function expressions to the DataFrame
579    #[pyo3(signature = (*exprs))]
580    fn window(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
581        let window_exprs = exprs.into_iter().map(|e| e.into()).collect();
582        let df = self.df.as_ref().clone().window(window_exprs)?;
583        Ok(Self::new(df))
584    }
585
586    fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> {
587        let df = self.df.as_ref().clone().filter(predicate.into())?;
588        Ok(Self::new(df))
589    }
590
591    fn parse_sql_expr(&self, expr: PyBackedStr) -> PyDataFusionResult<PyExpr> {
592        self.df
593            .as_ref()
594            .parse_sql_expr(&expr)
595            .map(PyExpr::from)
596            .map_err(PyDataFusionError::from)
597    }
598
599    fn with_column(&self, name: &str, expr: PyExpr) -> PyDataFusionResult<Self> {
600        let df = self.df.as_ref().clone().with_column(name, expr.into())?;
601        Ok(Self::new(df))
602    }
603
604    fn with_columns(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
605        let mut df = self.df.as_ref().clone();
606        for expr in exprs {
607            let expr: Expr = expr.into();
608            let name = format!("{}", expr.schema_name());
609            df = df.with_column(name.as_str(), expr)?
610        }
611        Ok(Self::new(df))
612    }
613
614    /// Rename one column by applying a new projection. This is a no-op if the column to be
615    /// renamed does not exist.
616    fn with_column_renamed(&self, old_name: &str, new_name: &str) -> PyDataFusionResult<Self> {
617        let df = self
618            .df
619            .as_ref()
620            .clone()
621            .with_column_renamed(old_name, new_name)?;
622        Ok(Self::new(df))
623    }
624
625    fn aggregate(&self, group_by: Vec<PyExpr>, aggs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
626        let group_by = group_by.into_iter().map(|e| e.into()).collect();
627        let aggs = aggs.into_iter().map(|e| e.into()).collect();
628        let df = self.df.as_ref().clone().aggregate(group_by, aggs)?;
629        Ok(Self::new(df))
630    }
631
632    #[pyo3(signature = (*exprs))]
633    fn sort(&self, exprs: Vec<PySortExpr>) -> PyDataFusionResult<Self> {
634        let exprs = to_sort_expressions(exprs);
635        let df = self.df.as_ref().clone().sort(exprs)?;
636        Ok(Self::new(df))
637    }
638
639    #[pyo3(signature = (count, offset=0))]
640    fn limit(&self, count: usize, offset: usize) -> PyDataFusionResult<Self> {
641        let df = self.df.as_ref().clone().limit(offset, Some(count))?;
642        Ok(Self::new(df))
643    }
644
645    /// Executes the plan, returning a list of `RecordBatch`es.
646    /// Unless some order is specified in the plan, there is no
647    /// guarantee of the order of the result.
648    fn collect<'py>(&self, py: Python<'py>) -> PyResult<Vec<Bound<'py, PyAny>>> {
649        let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
650            .map_err(PyDataFusionError::from)?;
651        // cannot use PyResult<Vec<RecordBatch>> return type due to
652        // https://github.com/PyO3/pyo3/issues/1813
653        batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
654    }
655
656    /// Cache DataFrame.
657    fn cache(&self, py: Python) -> PyDataFusionResult<Self> {
658        let df = wait_for_future(py, self.df.as_ref().clone().cache())??;
659        Ok(Self::new(df))
660    }
661
662    /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
663    /// maintaining the input partitioning.
664    fn collect_partitioned<'py>(&self, py: Python<'py>) -> PyResult<Vec<Vec<Bound<'py, PyAny>>>> {
665        let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())?
666            .map_err(PyDataFusionError::from)?;
667
668        batches
669            .into_iter()
670            .map(|rbs| rbs.into_iter().map(|rb| rb.to_pyarrow(py)).collect())
671            .collect()
672    }
673
674    fn collect_column<'py>(&self, py: Python<'py>, column: &str) -> PyResult<Bound<'py, PyAny>> {
675        wait_for_future(py, self.collect_column_inner(column))?
676            .map_err(PyDataFusionError::from)?
677            .to_data()
678            .to_pyarrow(py)
679    }
680
681    /// Print the result, 20 lines by default
682    #[pyo3(signature = (num=20))]
683    fn show(&self, py: Python, num: usize) -> PyDataFusionResult<()> {
684        let df = self.df.as_ref().clone().limit(0, Some(num))?;
685        print_dataframe(py, df)
686    }
687
688    /// Filter out duplicate rows
689    fn distinct(&self) -> PyDataFusionResult<Self> {
690        let df = self.df.as_ref().clone().distinct()?;
691        Ok(Self::new(df))
692    }
693
694    fn join(
695        &self,
696        right: PyDataFrame,
697        how: &str,
698        left_on: Vec<PyBackedStr>,
699        right_on: Vec<PyBackedStr>,
700        coalesce_keys: bool,
701    ) -> PyDataFusionResult<Self> {
702        let join_type = match how {
703            "inner" => JoinType::Inner,
704            "left" => JoinType::Left,
705            "right" => JoinType::Right,
706            "full" => JoinType::Full,
707            "semi" => JoinType::LeftSemi,
708            "anti" => JoinType::LeftAnti,
709            how => {
710                return Err(PyDataFusionError::Common(format!(
711                    "The join type {how} does not exist or is not implemented"
712                )));
713            }
714        };
715
716        let left_keys = left_on.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
717        let right_keys = right_on.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
718
719        let mut df = self.df.as_ref().clone().join(
720            right.df.as_ref().clone(),
721            join_type,
722            &left_keys,
723            &right_keys,
724            None,
725        )?;
726
727        if coalesce_keys {
728            let mutual_keys = left_keys
729                .iter()
730                .zip(right_keys.iter())
731                .filter(|(l, r)| l == r)
732                .map(|(key, _)| *key)
733                .collect::<Vec<_>>();
734
735            let fields_to_coalesce = mutual_keys
736                .iter()
737                .map(|name| {
738                    let qualified_fields = df
739                        .logical_plan()
740                        .schema()
741                        .qualified_fields_with_unqualified_name(name);
742                    (*name, qualified_fields)
743                })
744                .filter(|(_, fields)| fields.len() == 2)
745                .collect::<Vec<_>>();
746
747            let expr: Vec<Expr> = df
748                .logical_plan()
749                .schema()
750                .fields()
751                .into_iter()
752                .enumerate()
753                .map(|(idx, _)| df.logical_plan().schema().qualified_field(idx))
754                .filter_map(|(qualifier, field)| {
755                    if let Some((key_name, qualified_fields)) = fields_to_coalesce
756                        .iter()
757                        .find(|(_, qf)| qf.contains(&(qualifier, field)))
758                    {
759                        // Only add the coalesce expression once (when we encounter the first field)
760                        // Skip the second field (it's already included in to coalesce)
761                        if (qualifier, field) == qualified_fields[0] {
762                            let left_col = Expr::Column(Column::from(qualified_fields[0]));
763                            let right_col = Expr::Column(Column::from(qualified_fields[1]));
764                            return Some(coalesce(vec![left_col, right_col]).alias(*key_name));
765                        }
766                        None
767                    } else {
768                        Some(Expr::Column(Column::from((qualifier, field))))
769                    }
770                })
771                .collect();
772            df = df.select(expr)?;
773        }
774
775        Ok(Self::new(df))
776    }
777
778    fn join_on(
779        &self,
780        right: PyDataFrame,
781        on_exprs: Vec<PyExpr>,
782        how: &str,
783    ) -> PyDataFusionResult<Self> {
784        let join_type = match how {
785            "inner" => JoinType::Inner,
786            "left" => JoinType::Left,
787            "right" => JoinType::Right,
788            "full" => JoinType::Full,
789            "semi" => JoinType::LeftSemi,
790            "anti" => JoinType::LeftAnti,
791            how => {
792                return Err(PyDataFusionError::Common(format!(
793                    "The join type {how} does not exist or is not implemented"
794                )));
795            }
796        };
797        let exprs: Vec<Expr> = on_exprs.into_iter().map(|e| e.into()).collect();
798
799        let df = self
800            .df
801            .as_ref()
802            .clone()
803            .join_on(right.df.as_ref().clone(), join_type, exprs)?;
804        Ok(Self::new(df))
805    }
806
807    /// Print the query plan
808    #[pyo3(signature = (verbose=false, analyze=false, format=None))]
809    fn explain(
810        &self,
811        py: Python,
812        verbose: bool,
813        analyze: bool,
814        format: Option<&str>,
815    ) -> PyDataFusionResult<()> {
816        let explain_format = match format {
817            Some(f) => f
818                .parse::<datafusion::common::format::ExplainFormat>()
819                .map_err(|e| {
820                    PyDataFusionError::Common(format!("Invalid explain format '{}': {}", f, e))
821                })?,
822            None => datafusion::common::format::ExplainFormat::Indent,
823        };
824        let opts = datafusion::logical_expr::ExplainOption::default()
825            .with_verbose(verbose)
826            .with_analyze(analyze)
827            .with_format(explain_format);
828        let df = self.df.as_ref().clone().explain_with_options(opts)?;
829        print_dataframe(py, df)
830    }
831
832    /// Get the logical plan for this `DataFrame`
833    fn logical_plan(&self) -> PyResult<PyLogicalPlan> {
834        Ok(self.df.as_ref().clone().logical_plan().clone().into())
835    }
836
837    /// Get the optimized logical plan for this `DataFrame`
838    fn optimized_logical_plan(&self) -> PyDataFusionResult<PyLogicalPlan> {
839        Ok(self.df.as_ref().clone().into_optimized_plan()?.into())
840    }
841
842    /// Get the execution plan for this `DataFrame`
843    fn execution_plan(&self, py: Python) -> PyDataFusionResult<PyExecutionPlan> {
844        let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())??;
845        Ok(plan.into())
846    }
847
848    /// Repartition a `DataFrame` based on a logical partitioning scheme.
849    fn repartition(&self, num: usize) -> PyDataFusionResult<Self> {
850        let new_df = self
851            .df
852            .as_ref()
853            .clone()
854            .repartition(Partitioning::RoundRobinBatch(num))?;
855        Ok(Self::new(new_df))
856    }
857
858    /// Repartition a `DataFrame` based on a logical partitioning scheme.
859    #[pyo3(signature = (*args, num))]
860    fn repartition_by_hash(&self, args: Vec<PyExpr>, num: usize) -> PyDataFusionResult<Self> {
861        let expr = args.into_iter().map(|py_expr| py_expr.into()).collect();
862        let new_df = self
863            .df
864            .as_ref()
865            .clone()
866            .repartition(Partitioning::Hash(expr, num))?;
867        Ok(Self::new(new_df))
868    }
869
870    /// Calculate the union of two `DataFrame`s, preserving duplicate rows.The
871    /// two `DataFrame`s must have exactly the same schema
872    #[pyo3(signature = (py_df, distinct=false))]
873    fn union(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
874        let new_df = if distinct {
875            self.df
876                .as_ref()
877                .clone()
878                .union_distinct(py_df.df.as_ref().clone())?
879        } else {
880            self.df.as_ref().clone().union(py_df.df.as_ref().clone())?
881        };
882
883        Ok(Self::new(new_df))
884    }
885
886    #[pyo3(signature = (columns, preserve_nulls=true, recursions=None))]
887    fn unnest_columns(
888        &self,
889        columns: Vec<String>,
890        preserve_nulls: bool,
891        recursions: Option<Vec<(String, String, usize)>>,
892    ) -> PyDataFusionResult<Self> {
893        let unnest_options = build_unnest_options(preserve_nulls, recursions);
894        let cols = columns.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
895        let df = self
896            .df
897            .as_ref()
898            .clone()
899            .unnest_columns_with_options(&cols, unnest_options)?;
900        Ok(Self::new(df))
901    }
902
903    /// Calculate the intersection of two `DataFrame`s.  The two `DataFrame`s must have exactly the same schema
904    #[pyo3(signature = (py_df, distinct=false))]
905    fn intersect(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
906        let base = self.df.as_ref().clone();
907        let other = py_df.df.as_ref().clone();
908        let new_df = if distinct {
909            base.intersect_distinct(other)?
910        } else {
911            base.intersect(other)?
912        };
913        Ok(Self::new(new_df))
914    }
915
916    /// Calculate the exception of two `DataFrame`s.  The two `DataFrame`s must have exactly the same schema
917    #[pyo3(signature = (py_df, distinct=false))]
918    fn except_all(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
919        let base = self.df.as_ref().clone();
920        let other = py_df.df.as_ref().clone();
921        let new_df = if distinct {
922            base.except_distinct(other)?
923        } else {
924            base.except(other)?
925        };
926        Ok(Self::new(new_df))
927    }
928
929    /// Union two DataFrames matching columns by name
930    #[pyo3(signature = (py_df, distinct=false))]
931    fn union_by_name(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
932        let base = self.df.as_ref().clone();
933        let other = py_df.df.as_ref().clone();
934        let new_df = if distinct {
935            base.union_by_name_distinct(other)?
936        } else {
937            base.union_by_name(other)?
938        };
939        Ok(Self::new(new_df))
940    }
941
942    /// Deduplicate rows based on specific columns, keeping the first row per group
943    fn distinct_on(
944        &self,
945        on_expr: Vec<PyExpr>,
946        select_expr: Vec<PyExpr>,
947        sort_expr: Option<Vec<PySortExpr>>,
948    ) -> PyDataFusionResult<Self> {
949        let on_expr = on_expr.into_iter().map(|e| e.into()).collect();
950        let select_expr = select_expr.into_iter().map(|e| e.into()).collect();
951        let sort_expr = sort_expr.map(to_sort_expressions);
952        let df = self
953            .df
954            .as_ref()
955            .clone()
956            .distinct_on(on_expr, select_expr, sort_expr)?;
957        Ok(Self::new(df))
958    }
959
960    /// Sort by column expressions with ascending order and nulls last
961    fn sort_by(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
962        let exprs = exprs.into_iter().map(|e| e.into()).collect();
963        let df = self.df.as_ref().clone().sort_by(exprs)?;
964        Ok(Self::new(df))
965    }
966
967    /// Return fully qualified column expressions for the given column names
968    fn find_qualified_columns(&self, names: Vec<String>) -> PyDataFusionResult<Vec<PyExpr>> {
969        let name_refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
970        let qualified = self.df.find_qualified_columns(&name_refs)?;
971        Ok(qualified
972            .into_iter()
973            .map(|q| Expr::Column(Column::from(q)).into())
974            .collect())
975    }
976
977    /// Write a `DataFrame` to a CSV file.
978    fn write_csv(
979        &self,
980        py: Python,
981        path: &str,
982        with_header: bool,
983        write_options: Option<PyDataFrameWriteOptions>,
984    ) -> PyDataFusionResult<()> {
985        let csv_options = CsvOptions {
986            has_header: Some(with_header),
987            ..Default::default()
988        };
989        let write_options = write_options
990            .map(DataFrameWriteOptions::from)
991            .unwrap_or_default();
992
993        wait_for_future(
994            py,
995            self.df
996                .as_ref()
997                .clone()
998                .write_csv(path, write_options, Some(csv_options)),
999        )??;
1000        Ok(())
1001    }
1002
1003    /// Write a `DataFrame` to a Parquet file.
1004    #[pyo3(signature = (
1005        path,
1006        compression="zstd",
1007        compression_level=None,
1008        write_options=None,
1009        ))]
1010    fn write_parquet(
1011        &self,
1012        path: &str,
1013        compression: &str,
1014        compression_level: Option<u32>,
1015        write_options: Option<PyDataFrameWriteOptions>,
1016        py: Python,
1017    ) -> PyDataFusionResult<()> {
1018        fn verify_compression_level(cl: Option<u32>) -> Result<u32, PyErr> {
1019            cl.ok_or(PyValueError::new_err("compression_level is not defined"))
1020        }
1021
1022        let _validated = match compression.to_lowercase().as_str() {
1023            "snappy" => Compression::SNAPPY,
1024            "gzip" => Compression::GZIP(
1025                GzipLevel::try_new(compression_level.unwrap_or(6))
1026                    .map_err(|e| PyValueError::new_err(format!("{e}")))?,
1027            ),
1028            "brotli" => Compression::BROTLI(
1029                BrotliLevel::try_new(verify_compression_level(compression_level)?)
1030                    .map_err(|e| PyValueError::new_err(format!("{e}")))?,
1031            ),
1032            "zstd" => Compression::ZSTD(
1033                ZstdLevel::try_new(verify_compression_level(compression_level)? as i32)
1034                    .map_err(|e| PyValueError::new_err(format!("{e}")))?,
1035            ),
1036            "lzo" => Compression::LZO,
1037            "lz4" => Compression::LZ4,
1038            "lz4_raw" => Compression::LZ4_RAW,
1039            "uncompressed" => Compression::UNCOMPRESSED,
1040            _ => {
1041                return Err(PyDataFusionError::Common(format!(
1042                    "Unrecognized compression type {compression}"
1043                )));
1044            }
1045        };
1046
1047        let mut compression_string = compression.to_string();
1048        if let Some(level) = compression_level {
1049            compression_string.push_str(&format!("({level})"));
1050        }
1051
1052        let mut options = TableParquetOptions::default();
1053        options.global.compression = Some(compression_string);
1054        let write_options = write_options
1055            .map(DataFrameWriteOptions::from)
1056            .unwrap_or_default();
1057
1058        wait_for_future(
1059            py,
1060            self.df
1061                .as_ref()
1062                .clone()
1063                .write_parquet(path, write_options, Option::from(options)),
1064        )??;
1065        Ok(())
1066    }
1067
1068    /// Write a `DataFrame` to a Parquet file, using advanced options.
1069    fn write_parquet_with_options(
1070        &self,
1071        path: &str,
1072        options: PyParquetWriterOptions,
1073        column_specific_options: HashMap<String, PyParquetColumnOptions>,
1074        write_options: Option<PyDataFrameWriteOptions>,
1075        py: Python,
1076    ) -> PyDataFusionResult<()> {
1077        let table_options = TableParquetOptions {
1078            global: options.options,
1079            column_specific_options: column_specific_options
1080                .into_iter()
1081                .map(|(k, v)| (k, v.options))
1082                .collect(),
1083            ..Default::default()
1084        };
1085        let write_options = write_options
1086            .map(DataFrameWriteOptions::from)
1087            .unwrap_or_default();
1088        wait_for_future(
1089            py,
1090            self.df.as_ref().clone().write_parquet(
1091                path,
1092                write_options,
1093                Option::from(table_options),
1094            ),
1095        )??;
1096        Ok(())
1097    }
1098
1099    /// Executes a query and writes the results to a partitioned JSON file.
1100    fn write_json(
1101        &self,
1102        path: &str,
1103        py: Python,
1104        write_options: Option<PyDataFrameWriteOptions>,
1105    ) -> PyDataFusionResult<()> {
1106        let write_options = write_options
1107            .map(DataFrameWriteOptions::from)
1108            .unwrap_or_default();
1109        wait_for_future(
1110            py,
1111            self.df
1112                .as_ref()
1113                .clone()
1114                .write_json(path, write_options, None),
1115        )??;
1116        Ok(())
1117    }
1118
1119    fn write_table(
1120        &self,
1121        py: Python,
1122        table_name: &str,
1123        write_options: Option<PyDataFrameWriteOptions>,
1124    ) -> PyDataFusionResult<()> {
1125        let write_options = write_options
1126            .map(DataFrameWriteOptions::from)
1127            .unwrap_or_default();
1128        wait_for_future(
1129            py,
1130            self.df
1131                .as_ref()
1132                .clone()
1133                .write_table(table_name, write_options),
1134        )??;
1135        Ok(())
1136    }
1137
1138    /// Convert to Arrow Table
1139    /// Collect the batches and pass to Arrow Table
1140    fn to_arrow_table(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1141        let batches = self.collect(py)?.into_pyobject(py)?;
1142
1143        // only use the DataFrame's schema if there are no batches, otherwise let the schema be
1144        // determined from the batches (avoids some inconsistencies with nullable columns)
1145        let args = if batches.len()? == 0 {
1146            let schema = self.schema().into_pyobject(py)?;
1147            PyTuple::new(py, &[batches, schema])?
1148        } else {
1149            PyTuple::new(py, &[batches])?
1150        };
1151
1152        // Instantiate pyarrow Table object and use its from_batches method
1153        let table_class = py.import("pyarrow")?.getattr("Table")?;
1154        let table: Py<PyAny> = table_class.call_method1("from_batches", args)?.into();
1155        Ok(table)
1156    }
1157
1158    #[pyo3(signature = (requested_schema=None))]
1159    fn __arrow_c_stream__<'py>(
1160        &'py self,
1161        py: Python<'py>,
1162        requested_schema: Option<Bound<'py, PyCapsule>>,
1163    ) -> PyDataFusionResult<Bound<'py, PyCapsule>> {
1164        let df = self.df.as_ref().clone();
1165        let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;
1166
1167        let mut schema: Schema = self.df.schema().to_owned().as_arrow().clone();
1168        let mut projection: Option<SchemaRef> = None;
1169
1170        if let Some(schema_capsule) = requested_schema {
1171            let data: NonNull<FFI_ArrowSchema> = schema_capsule
1172                .pointer_checked(Some(c"arrow_schema"))?
1173                .cast();
1174            let schema_ptr = unsafe { data.as_ref() };
1175            let desired_schema = Schema::try_from(schema_ptr)?;
1176
1177            schema = project_schema(schema, desired_schema)?;
1178            projection = Some(Arc::new(schema.clone()));
1179        }
1180
1181        let schema_ref = Arc::new(schema.clone());
1182
1183        let reader = PartitionedDataFrameStreamReader {
1184            streams,
1185            schema: schema_ref,
1186            projection,
1187            current: 0,
1188        };
1189        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
1190
1191        // Create the Arrow stream and wrap it in a PyCapsule. The default
1192        // destructor provided by PyO3 will drop the stream unless ownership is
1193        // transferred to PyArrow during import.
1194        let stream = FFI_ArrowArrayStream::new(reader);
1195        let name = CString::new(ARROW_ARRAY_STREAM_NAME.to_bytes()).unwrap();
1196        let capsule = PyCapsule::new(py, stream, Some(name))?;
1197        Ok(capsule)
1198    }
1199
1200    fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {
1201        let df = self.df.as_ref().clone();
1202        let stream = spawn_future(py, async move { df.execute_stream().await })?;
1203        Ok(PyRecordBatchStream::new(stream))
1204    }
1205
1206    fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
1207        let df = self.df.as_ref().clone();
1208        let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;
1209        Ok(streams.into_iter().map(PyRecordBatchStream::new).collect())
1210    }
1211
1212    /// Convert to pandas dataframe with pyarrow
1213    /// Collect the batches, pass to Arrow Table & then convert to Pandas DataFrame
1214    fn to_pandas(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1215        let table = self.to_arrow_table(py)?;
1216
1217        // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas
1218        let result = table.call_method0(py, "to_pandas")?;
1219        Ok(result)
1220    }
1221
1222    /// Convert to Python list using pyarrow
1223    /// Each list item represents one row encoded as dictionary
1224    fn to_pylist(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1225        let table = self.to_arrow_table(py)?;
1226
1227        // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pylist
1228        let result = table.call_method0(py, "to_pylist")?;
1229        Ok(result)
1230    }
1231
1232    /// Convert to Python dictionary using pyarrow
1233    /// Each dictionary key is a column and the dictionary value represents the column values
1234    fn to_pydict(&self, py: Python) -> PyResult<Py<PyAny>> {
1235        let table = self.to_arrow_table(py)?;
1236
1237        // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pydict
1238        let result = table.call_method0(py, "to_pydict")?;
1239        Ok(result)
1240    }
1241
1242    /// Convert to polars dataframe with pyarrow
1243    /// Collect the batches, pass to Arrow Table & then convert to polars DataFrame
1244    fn to_polars(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
1245        let table = self.to_arrow_table(py)?;
1246        let dataframe = py.import("polars")?.getattr("DataFrame")?;
1247        let args = PyTuple::new(py, &[table])?;
1248        let result: Py<PyAny> = dataframe.call1(args)?.into();
1249        Ok(result)
1250    }
1251
1252    // Executes this DataFrame to get the total number of rows.
1253    fn count(&self, py: Python) -> PyDataFusionResult<usize> {
1254        Ok(wait_for_future(py, self.df.as_ref().clone().count())??)
1255    }
1256
1257    /// Fill null values with a specified value for specific columns
1258    #[pyo3(signature = (value, columns=None))]
1259    fn fill_null(
1260        &self,
1261        value: Py<PyAny>,
1262        columns: Option<Vec<PyBackedStr>>,
1263        py: Python,
1264    ) -> PyDataFusionResult<Self> {
1265        let scalar_value: PyScalarValue = value.extract(py)?;
1266
1267        let cols = match columns {
1268            Some(col_names) => col_names.iter().map(|c| c.to_string()).collect(),
1269            None => Vec::new(), // Empty vector means fill null for all columns
1270        };
1271
1272        let df = self.df.as_ref().clone().fill_null(scalar_value.0, cols)?;
1273        Ok(Self::new(df))
1274    }
1275}
1276
1277#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
1278#[pyclass(
1279    from_py_object,
1280    frozen,
1281    eq,
1282    eq_int,
1283    name = "InsertOp",
1284    module = "datafusion"
1285)]
1286pub enum PyInsertOp {
1287    APPEND,
1288    REPLACE,
1289    OVERWRITE,
1290}
1291
1292impl From<PyInsertOp> for InsertOp {
1293    fn from(value: PyInsertOp) -> Self {
1294        match value {
1295            PyInsertOp::APPEND => InsertOp::Append,
1296            PyInsertOp::REPLACE => InsertOp::Replace,
1297            PyInsertOp::OVERWRITE => InsertOp::Overwrite,
1298        }
1299    }
1300}
1301
1302#[derive(Debug, Clone)]
1303#[pyclass(
1304    from_py_object,
1305    frozen,
1306    name = "DataFrameWriteOptions",
1307    module = "datafusion"
1308)]
1309pub struct PyDataFrameWriteOptions {
1310    insert_operation: InsertOp,
1311    single_file_output: bool,
1312    partition_by: Vec<String>,
1313    sort_by: Vec<SortExpr>,
1314}
1315
1316impl From<PyDataFrameWriteOptions> for DataFrameWriteOptions {
1317    fn from(value: PyDataFrameWriteOptions) -> Self {
1318        DataFrameWriteOptions::new()
1319            .with_insert_operation(value.insert_operation)
1320            .with_single_file_output(value.single_file_output)
1321            .with_partition_by(value.partition_by)
1322            .with_sort_by(value.sort_by)
1323    }
1324}
1325
1326#[pymethods]
1327impl PyDataFrameWriteOptions {
1328    #[new]
1329    fn new(
1330        insert_operation: Option<PyInsertOp>,
1331        single_file_output: bool,
1332        partition_by: Option<Vec<String>>,
1333        sort_by: Option<Vec<PySortExpr>>,
1334    ) -> Self {
1335        let insert_operation = insert_operation.map(Into::into).unwrap_or(InsertOp::Append);
1336        let sort_by = sort_by
1337            .unwrap_or_default()
1338            .into_iter()
1339            .map(Into::into)
1340            .collect();
1341        Self {
1342            insert_operation,
1343            single_file_output,
1344            partition_by: partition_by.unwrap_or_default(),
1345            sort_by,
1346        }
1347    }
1348}
1349
1350fn build_unnest_options(
1351    preserve_nulls: bool,
1352    recursions: Option<Vec<(String, String, usize)>>,
1353) -> UnnestOptions {
1354    let mut opts = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
1355    if let Some(recs) = recursions {
1356        opts.recursions = recs
1357            .into_iter()
1358            .map(
1359                |(input, output, depth)| datafusion::common::RecursionUnnestOption {
1360                    input_column: datafusion::common::Column::from(input.as_str()),
1361                    output_column: datafusion::common::Column::from(output.as_str()),
1362                    depth,
1363                },
1364            )
1365            .collect();
1366    }
1367    opts
1368}
1369
1370/// Print DataFrame
1371fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
1372    // Get string representation of record batches
1373    let batches = wait_for_future(py, df.collect())??;
1374    let result = if batches.is_empty() {
1375        "DataFrame has no rows".to_string()
1376    } else {
1377        match pretty::pretty_format_batches(&batches) {
1378            Ok(batch) => format!("DataFrame()\n{batch}"),
1379            Err(err) => format!("Error: {:?}", err.to_string()),
1380        }
1381    };
1382
1383    // Import the Python 'builtins' module to access the print function
1384    // Note that println! does not print to the Python debug console and is not visible in notebooks for instance
1385    let print = py.import("builtins")?.getattr("print")?;
1386    print.call1((result,))?;
1387    Ok(())
1388}
1389
1390fn project_schema(from_schema: Schema, to_schema: Schema) -> Result<Schema, ArrowError> {
1391    let merged_schema = Schema::try_merge(vec![from_schema, to_schema.clone()])?;
1392
1393    let project_indices: Vec<usize> = to_schema
1394        .fields
1395        .iter()
1396        .map(|field| field.name())
1397        .filter_map(|field_name| merged_schema.index_of(field_name).ok())
1398        .collect();
1399
1400    merged_schema.project(&project_indices)
1401}
1402// NOTE: `arrow::compute::cast` in combination with `RecordBatch::try_select` or
1403// DataFusion's `schema::cast_record_batch` do not fully cover the required
1404// transformations here. They will not create missing columns and may insert
1405// nulls for non-nullable fields without erroring. To maintain current behavior
1406// we perform the casting and null checks manually.
1407fn record_batch_into_schema(
1408    record_batch: RecordBatch,
1409    schema: &Schema,
1410) -> Result<RecordBatch, ArrowError> {
1411    let schema = Arc::new(schema.clone());
1412    let base_schema = record_batch.schema();
1413    if base_schema.fields().is_empty() {
1414        // Nothing to project
1415        return Ok(RecordBatch::new_empty(schema));
1416    }
1417
1418    let array_size = record_batch.column(0).len();
1419    let mut data_arrays = Vec::with_capacity(schema.fields().len());
1420
1421    for field in schema.fields() {
1422        let desired_data_type = field.data_type();
1423        if let Some(original_data) = record_batch.column_by_name(field.name()) {
1424            let original_data_type = original_data.data_type();
1425
1426            if can_cast_types(original_data_type, desired_data_type) {
1427                data_arrays.push(arrow::compute::kernels::cast(
1428                    original_data,
1429                    desired_data_type,
1430                )?);
1431            } else if field.is_nullable() {
1432                data_arrays.push(new_null_array(desired_data_type, array_size));
1433            } else {
1434                return Err(ArrowError::CastError(format!(
1435                    "Attempting to cast to non-nullable and non-castable field {} during schema projection.",
1436                    field.name()
1437                )));
1438            }
1439        } else {
1440            if !field.is_nullable() {
1441                return Err(ArrowError::CastError(format!(
1442                    "Attempting to set null to non-nullable field {} during schema projection.",
1443                    field.name()
1444                )));
1445            }
1446            data_arrays.push(new_null_array(desired_data_type, array_size));
1447        }
1448    }
1449
1450    RecordBatch::try_new(schema, data_arrays)
1451}
1452
1453/// This is a helper function to return the first non-empty record batch from executing a DataFrame.
1454/// It additionally returns a bool, which indicates if there are more record batches available.
1455/// We do this so we can determine if we should indicate to the user that the data has been
1456/// truncated. This collects until we have archived both of these two conditions
1457///
1458/// - We have collected our minimum number of rows
1459/// - We have reached our limit, either data size or maximum number of rows
1460///
1461/// Otherwise it will return when the stream has exhausted. If you want a specific number of
1462/// rows, set min_rows == max_rows.
1463async fn collect_record_batches_to_display(
1464    df: DataFrame,
1465    config: FormatterConfig,
1466) -> Result<(Vec<RecordBatch>, bool), DataFusionError> {
1467    let FormatterConfig {
1468        max_bytes,
1469        min_rows,
1470        max_rows,
1471    } = config;
1472
1473    let partitioned_stream = df.execute_stream_partitioned().await?;
1474    let mut stream = futures::stream::iter(partitioned_stream).flatten();
1475    let mut size_estimate_so_far = 0;
1476    let mut rows_so_far = 0;
1477    let mut record_batches = Vec::default();
1478    let mut has_more = false;
1479
1480    // Collect rows until we hit a limit (memory or max_rows) OR reach the guaranteed minimum.
1481    // The minimum rows constraint overrides both memory and row limits to ensure a baseline
1482    // of data is always displayed, even if it temporarily exceeds those limits.
1483    // This provides better UX by guaranteeing users see at least min_rows rows.
1484    while (size_estimate_so_far < max_bytes && rows_so_far < max_rows) || rows_so_far < min_rows {
1485        let mut rb = match stream.next().await {
1486            None => {
1487                break;
1488            }
1489            Some(Ok(r)) => r,
1490            Some(Err(e)) => return Err(e),
1491        };
1492
1493        let mut rows_in_rb = rb.num_rows();
1494        if rows_in_rb > 0 {
1495            size_estimate_so_far += rb.get_array_memory_size();
1496
1497            // When memory limit is exceeded, scale back row count proportionally to stay within budget
1498            if size_estimate_so_far > max_bytes {
1499                let ratio = max_bytes as f32 / size_estimate_so_far as f32;
1500                let total_rows = rows_in_rb + rows_so_far;
1501
1502                // Calculate reduced rows maintaining the memory/data proportion
1503                let mut reduced_row_num = (total_rows as f32 * ratio).round() as usize;
1504                // Ensure we always respect the minimum rows guarantee
1505                if reduced_row_num < min_rows {
1506                    reduced_row_num = min_rows.min(total_rows);
1507                }
1508
1509                let limited_rows_this_rb = reduced_row_num - rows_so_far;
1510                if limited_rows_this_rb < rows_in_rb {
1511                    rows_in_rb = limited_rows_this_rb;
1512                    rb = rb.slice(0, limited_rows_this_rb);
1513                    has_more = true;
1514                }
1515            }
1516
1517            if rows_in_rb + rows_so_far > max_rows {
1518                rb = rb.slice(0, max_rows - rows_so_far);
1519                has_more = true;
1520            }
1521
1522            rows_so_far += rb.num_rows();
1523            record_batches.push(rb);
1524        }
1525    }
1526
1527    if record_batches.is_empty() {
1528        return Ok((Vec::default(), false));
1529    }
1530
1531    if !has_more {
1532        // Data was not already truncated, so check to see if more record batches remain
1533        has_more = match stream.try_next().await {
1534            Ok(None) => false, // reached end
1535            Ok(Some(_)) => true,
1536            Err(_) => false, // Stream disconnected
1537        };
1538    }
1539
1540    Ok((record_batches, has_more))
1541}