polars_python/dataframe/io.rs
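//! Eager I/O for `PyDataFrame`: the `read_*` constructors and `write_*`
//! methods exposed to Python through pyo3's `#[pymethods]`.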

use std::borrow::Cow;
use std::io::BufWriter;
use std::num::NonZeroUsize;
use std::sync::Arc;

#[cfg(feature = "cloud")]
use polars::io::cloud::credential_provider::PlCredentialProvider;
use polars::io::RowIndex;
#[cfg(feature = "avro")]
use polars::io::avro::AvroCompression;
use polars::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;

use super::PyDataFrame;
use crate::conversion::Wrap;
#[cfg(feature = "parquet")]
use crate::conversion::parse_parquet_compression;
use crate::error::PyPolarsErr;
use crate::file::{
    EitherRustPythonFile, get_either_file, get_file_like, get_mmap_bytes_reader,
    get_mmap_bytes_reader_and_path,
};
use crate::prelude::PyCompatLevel;
#[cfg(feature = "cloud")]
use crate::prelude::parse_cloud_options;
use crate::utils::EnterPolarsExt;

#[pymethods]
impl PyDataFrame {
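    /// Eagerly read a CSV source into a `DataFrame`.
    ///
    /// `py_f` may be a path or any Python file-like object; its bytes are
    /// exposed through a (possibly memory-mapped) reader.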
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
        skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
        overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
        null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
        row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema
    ))]
    pub fn read_csv(
        py: Python,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        chunk_size: usize,
        has_header: bool,
        ignore_errors: bool,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        projection: Option<Vec<usize>>,
        separator: &str,
        rechunk: bool,
        columns: Option<Vec<String>>,
        encoding: Wrap<CsvEncoding>,
        n_threads: Option<usize>,
        path: Option<String>,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        try_parse_dates: bool,
        skip_rows_after_header: usize,
        row_index: Option<(String, IdxSize)>,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        schema: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
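        // Normalize the Python-level arguments: the CSV reader takes
        // single-byte separator/EOL/quote values and owned schema overrides.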
        let null_values = null_values.map(|w| w.0);
        let eol_char = eol_char.as_bytes()[0];
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|(name, dtype)| {
                    let dtype = dtype.0.clone();
                    Field::new((&**name).into(), dtype)
                })
                .collect::<Schema>()
        });

        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|dt| dt.0.clone())
                .collect::<Vec<_>>()
        });

        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
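        // `enter_polars_df` runs the closure on the Polars side (off the
        // Python GIL) and wraps the resulting `DataFrame` for Python.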
        py.enter_polars_df(move || {
            CsvReadOptions::default()
                .with_path(path)
                .with_infer_schema_length(infer_schema_length)
                .with_has_header(has_header)
                .with_n_rows(n_rows)
                .with_skip_rows(skip_rows)
                .with_skip_lines(skip_lines)
                .with_ignore_errors(ignore_errors)
                .with_projection(projection.map(Arc::new))
                .with_rechunk(rechunk)
                .with_chunk_size(chunk_size)
                .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
                .with_n_threads(n_threads)
                .with_schema_overwrite(overwrite_dtype.map(Arc::new))
                .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
                .with_schema(schema.map(|schema| Arc::new(schema.0)))
                .with_low_memory(low_memory)
                .with_skip_rows_after_header(skip_rows_after_header)
                .with_row_index(row_index)
                .with_raise_if_empty(raise_if_empty)
                .with_parse_options(
                    CsvParseOptions::default()
                        .with_separator(separator.as_bytes()[0])
                        .with_encoding(encoding.0)
                        .with_missing_is_null(!missing_utf8_is_empty_string)
                        .with_comment_prefix(comment_prefix)
                        .with_null_values(null_values)
                        .with_try_parse_dates(try_parse_dates)
                        .with_quote_char(quote_char)
                        .with_eol_char(eol_char)
                        .with_truncate_ragged_lines(truncate_ragged_lines)
                        .with_decimal_comma(decimal_comma),
                )
                .into_reader_with_file_handle(mmap_bytes_r)
                .finish()
        })
    }

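    /// Eagerly read a Parquet source into a `DataFrame`.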
    #[staticmethod]
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, low_memory, parallel, use_statistics, rechunk))]
    pub fn read_parquet(
        py: Python,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        parallel: Wrap<ParallelStrategy>,
        use_statistics: bool,
        rechunk: bool,
    ) -> PyResult<Self> {
        use EitherRustPythonFile::*;

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        // `use_statistics` is currently unused by this reader; discard it
        // explicitly to silence the unused-variable lint.
        _ = use_statistics;

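        // A Python file-like object is exposed as an in-memory byte slice and
        // read through a `Cursor`; a native Rust file is read directly.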
        match get_either_file(py_f, false)? {
            Py(f) => {
                let buf = std::io::Cursor::new(f.to_memslice());
                py.enter_polars_df(move || {
                    ParquetReader::new(buf)
                        .with_projection(projection)
                        .with_columns(columns)
                        .read_parallel(parallel.0)
                        .with_slice(n_rows.map(|x| (0, x)))
                        .with_row_index(row_index)
                        .set_low_memory(low_memory)
                        .set_rechunk(rechunk)
                        .finish()
                })
            },
            Rust(f) => py.enter_polars_df(move || {
                ParquetReader::new(f)
                    .with_projection(projection)
                    .with_columns(columns)
                    .read_parallel(parallel.0)
                    .with_slice(n_rows.map(|x| (0, x)))
                    .with_row_index(row_index)
                    .set_rechunk(rechunk)
                    .finish()
            }),
        }
    }

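    /// Read a standard JSON payload (e.g. an array of objects) into a
    /// `DataFrame`, optionally with a known schema or per-column overrides.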
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
    pub fn read_json(
        py: Python,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        assert!(infer_schema_length != Some(0));
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        py.enter_polars_df(move || {
            let mut builder = JsonReader::new(mmap_bytes_r)
                .with_json_format(JsonFormat::Json)
                .infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));

            if let Some(schema) = schema {
                builder = builder.with_schema(Arc::new(schema.0));
            }

            if let Some(schema) = schema_overrides.as_ref() {
                builder = builder.with_schema_overwrite(&schema.0);
            }

            builder.finish()
        })
    }

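    /// Read newline-delimited JSON (NDJSON) into a `DataFrame`.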
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, ignore_errors, schema, schema_overrides))]
    pub fn read_ndjson(
        py: Python,
        py_f: Bound<PyAny>,
        ignore_errors: bool,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        let mut builder = JsonReader::new(mmap_bytes_r)
            .with_json_format(JsonFormat::JsonLines)
            .with_ignore_errors(ignore_errors);

        if let Some(schema) = schema {
            builder = builder.with_schema(Arc::new(schema.0));
        }

        if let Some(schema) = schema_overrides.as_ref() {
            builder = builder.with_schema_overwrite(&schema.0);
        }

        py.enter_polars_df(move || builder.finish())
    }

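    /// Read an Arrow IPC (Feather V2) file into a `DataFrame`.
    ///
    /// Memory mapping only applies when the source is an actual file path;
    /// for in-memory sources `mmap_path` is `None` and the reader falls back
    /// to buffered reads.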
    #[staticmethod]
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
    pub fn read_ipc(
        py: Python,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        memory_map: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;

        let mmap_path = if memory_map { mmap_path } else { None };
        py.enter_polars_df(move || {
            IpcReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .memory_mapped(mmap_path)
                .finish()
        })
    }

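    /// Read the Arrow IPC streaming format into a `DataFrame`.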
    #[staticmethod]
    #[cfg(feature = "ipc_streaming")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
    pub fn read_ipc_stream(
        py: Python,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        rechunk: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        py.enter_polars_df(move || {
            IpcStreamReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .set_rechunk(rechunk)
                .finish()
        })
    }

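    /// Read an Apache Avro file into a `DataFrame`.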
    #[staticmethod]
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, columns, projection, n_rows))]
    pub fn read_avro(
        py: Python,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
    ) -> PyResult<Self> {
        use polars::io::avro::AvroReader;

        let file = get_file_like(py_f, false)?;
        py.enter_polars_df(move || {
            AvroReader::new(file)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .finish()
        })
    }

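    /// Write this `DataFrame` as CSV to a local path, cloud URI, or Python
    /// file-like object.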
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        py_f, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
        quote_style, cloud_options, credential_provider, retries
    ))]
    pub fn write_csv(
        &mut self,
        py: Python,
        py_f: PyObject,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let null = null_value.unwrap_or_default();

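        // A string destination may be a cloud URI: resolve storage options,
        // retry count, and an optional credential provider. Non-string
        // destinations are treated as file-like objects.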
        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.enter_polars(|| {
            CsvWriter::new(&mut f)
                .include_bom(include_bom)
                .include_header(include_header)
                .with_separator(separator)
                .with_line_terminator(line_terminator)
                .with_quote_char(quote_char)
                .with_batch_size(batch_size)
                .with_datetime_format(datetime_format)
                .with_date_format(date_format)
                .with_time_format(time_format)
                .with_float_scientific(float_scientific)
                .with_float_precision(float_precision)
                .with_null_value(null)
                .with_quote_style(quote_style.map(|wrap| wrap.0).unwrap_or_default())
                .finish(&mut self.df)?;

            crate::file::close_file(f)
        })
    }

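    /// Write this `DataFrame` as Parquet.
    ///
    /// With `partition_by`, the destination must be a path and a partitioned
    /// dataset is written instead of a single file.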
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        py_f, compression, compression_level, statistics, row_group_size, data_page_size,
        partition_by, partition_chunk_size_bytes, cloud_options, credential_provider, retries
    ))]
    pub fn write_parquet(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        partition_by: Option<Vec<String>>,
        partition_chunk_size_bytes: usize,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        use polars_io::partition::write_partitioned_dataset;

        let compression = parse_parquet_compression(compression, compression_level)?;

        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

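        // Partitioned writes take a separate code path: the destination must
        // extract as a path string, under which the dataset is written.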
        if let Some(partition_by) = partition_by {
            let path = py_f.extract::<String>(py)?;

            return py.enter_polars(|| {
                let write_options = ParquetWriteOptions {
                    compression,
                    statistics: statistics.0,
                    row_group_size,
                    data_page_size,
                };
                write_partitioned_dataset(
                    &mut self.df,
                    std::path::Path::new(path.as_str()),
                    partition_by.into_iter().map(|x| x.into()).collect(),
                    &write_options,
                    cloud_options.as_ref(),
                    partition_chunk_size_bytes,
                )
            });
        }

        let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.enter_polars(|| {
            ParquetWriter::new(BufWriter::new(&mut f))
                .with_compression(compression)
                .with_statistics(statistics.0)
                .with_row_group_size(row_group_size)
                .with_data_page_size(data_page_size)
                .finish(&mut self.df)?;

            crate::file::close_file(f)
        })
    }

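    /// Write this `DataFrame` as a single JSON array to a file-like object.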
    #[cfg(feature = "json")]
    pub fn write_json(&mut self, py: Python, py_f: PyObject) -> PyResult<()> {
        let file = BufWriter::new(get_file_like(py_f, true)?);
        py.enter_polars(|| {
            // TODO: Cloud support

            JsonWriter::new(file)
                .with_json_format(JsonFormat::Json)
                .finish(&mut self.df)
        })
    }

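    /// Write this `DataFrame` as newline-delimited JSON (NDJSON).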
    #[cfg(feature = "json")]
    pub fn write_ndjson(&mut self, py: Python, py_f: PyObject) -> PyResult<()> {
        let file = BufWriter::new(get_file_like(py_f, true)?);

        py.enter_polars(|| {
            // TODO: Cloud support

            JsonWriter::new(file)
                .with_json_format(JsonFormat::JsonLines)
                .finish(&mut self.df)
                .map_err(PyPolarsErr::from)
        })
    }

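    /// Write this `DataFrame` in the Arrow IPC file format, optionally
    /// compressed, to a path, cloud URI, or file-like object.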
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        py_f, compression, compat_level, cloud_options, credential_provider, retries
    ))]
    pub fn write_ipc(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.enter_polars(|| {
            IpcWriter::new(&mut f)
                .with_compression(compression.0)
                .with_compat_level(compat_level.0)
                .finish(&mut self.df)?;

            crate::file::close_file(f)
        })
    }

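    /// Write this `DataFrame` in the Arrow IPC streaming format.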
    #[cfg(feature = "ipc_streaming")]
    pub fn write_ipc_stream(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
    ) -> PyResult<()> {
        let mut buf = get_file_like(py_f, true)?;
        py.enter_polars(|| {
            IpcStreamWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_compat_level(compat_level.0)
                .finish(&mut self.df)
        })
    }

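    /// Write this `DataFrame` as an Apache Avro file; `name` is used as the
    /// name of the root record in the Avro schema.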
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, compression, name))]
    pub fn write_avro(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<AvroCompression>>,
        name: String,
    ) -> PyResult<()> {
        use polars::io::avro::AvroWriter;
        let mut buf = get_file_like(py_f, true)?;
        py.enter_polars(|| {
            AvroWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_name(name)
                .finish(&mut self.df)
        })
    }
}