// polars_python/dataframe/io.rs

1use std::borrow::Cow;
2use std::io::BufWriter;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6#[cfg(feature = "cloud")]
7use cloud::credential_provider::PlCredentialProvider;
8#[cfg(feature = "avro")]
9use polars::io::avro::AvroCompression;
10use polars::io::RowIndex;
11use polars::prelude::*;
12#[cfg(feature = "parquet")]
13use polars_parquet::arrow::write::StatisticsOptions;
14use pyo3::prelude::*;
15use pyo3::pybacked::PyBackedStr;
16
17use super::PyDataFrame;
18#[cfg(feature = "parquet")]
19use crate::conversion::parse_parquet_compression;
20use crate::conversion::Wrap;
21use crate::error::PyPolarsErr;
22use crate::file::{
23    get_either_file, get_file_like, get_mmap_bytes_reader, get_mmap_bytes_reader_and_path,
24    EitherRustPythonFile,
25};
26#[cfg(feature = "cloud")]
27use crate::prelude::parse_cloud_options;
28use crate::prelude::PyCompatLevel;
29
30#[pymethods]
31impl PyDataFrame {
32    #[staticmethod]
33    #[cfg(feature = "csv")]
34    #[pyo3(signature = (
35    py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
36    skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
37    overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
38    null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
39    row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
40)]
41    pub fn read_csv(
42        py: Python,
43        py_f: Bound<PyAny>,
44        infer_schema_length: Option<usize>,
45        chunk_size: usize,
46        has_header: bool,
47        ignore_errors: bool,
48        n_rows: Option<usize>,
49        skip_rows: usize,
50        skip_lines: usize,
51        projection: Option<Vec<usize>>,
52        separator: &str,
53        rechunk: bool,
54        columns: Option<Vec<String>>,
55        encoding: Wrap<CsvEncoding>,
56        n_threads: Option<usize>,
57        path: Option<String>,
58        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
59        overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
60        low_memory: bool,
61        comment_prefix: Option<&str>,
62        quote_char: Option<&str>,
63        null_values: Option<Wrap<NullValues>>,
64        missing_utf8_is_empty_string: bool,
65        try_parse_dates: bool,
66        skip_rows_after_header: usize,
67        row_index: Option<(String, IdxSize)>,
68        eol_char: &str,
69        raise_if_empty: bool,
70        truncate_ragged_lines: bool,
71        decimal_comma: bool,
72        schema: Option<Wrap<Schema>>,
73    ) -> PyResult<Self> {
74        let null_values = null_values.map(|w| w.0);
75        let eol_char = eol_char.as_bytes()[0];
76        let row_index = row_index.map(|(name, offset)| RowIndex {
77            name: name.into(),
78            offset,
79        });
80        let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());
81
82        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
83            overwrite_dtype
84                .iter()
85                .map(|(name, dtype)| {
86                    let dtype = dtype.0.clone();
87                    Field::new((&**name).into(), dtype)
88                })
89                .collect::<Schema>()
90        });
91
92        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
93            overwrite_dtype
94                .iter()
95                .map(|dt| dt.0.clone())
96                .collect::<Vec<_>>()
97        });
98
99        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
100        let df = py.allow_threads(move || {
101            CsvReadOptions::default()
102                .with_path(path)
103                .with_infer_schema_length(infer_schema_length)
104                .with_has_header(has_header)
105                .with_n_rows(n_rows)
106                .with_skip_rows(skip_rows)
107                .with_skip_lines(skip_lines)
108                .with_ignore_errors(ignore_errors)
109                .with_projection(projection.map(Arc::new))
110                .with_rechunk(rechunk)
111                .with_chunk_size(chunk_size)
112                .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
113                .with_n_threads(n_threads)
114                .with_schema_overwrite(overwrite_dtype.map(Arc::new))
115                .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
116                .with_schema(schema.map(|schema| Arc::new(schema.0)))
117                .with_low_memory(low_memory)
118                .with_skip_rows_after_header(skip_rows_after_header)
119                .with_row_index(row_index)
120                .with_raise_if_empty(raise_if_empty)
121                .with_parse_options(
122                    CsvParseOptions::default()
123                        .with_separator(separator.as_bytes()[0])
124                        .with_encoding(encoding.0)
125                        .with_missing_is_null(!missing_utf8_is_empty_string)
126                        .with_comment_prefix(comment_prefix)
127                        .with_null_values(null_values)
128                        .with_try_parse_dates(try_parse_dates)
129                        .with_quote_char(quote_char)
130                        .with_eol_char(eol_char)
131                        .with_truncate_ragged_lines(truncate_ragged_lines)
132                        .with_decimal_comma(decimal_comma),
133                )
134                .into_reader_with_file_handle(mmap_bytes_r)
135                .finish()
136                .map_err(PyPolarsErr::from)
137        })?;
138        Ok(df.into())
139    }
140
141    #[staticmethod]
142    #[cfg(feature = "parquet")]
143    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, low_memory, parallel, use_statistics, rechunk))]
144    pub fn read_parquet(
145        py: Python,
146        py_f: PyObject,
147        columns: Option<Vec<String>>,
148        projection: Option<Vec<usize>>,
149        n_rows: Option<usize>,
150        row_index: Option<(String, IdxSize)>,
151        low_memory: bool,
152        parallel: Wrap<ParallelStrategy>,
153        use_statistics: bool,
154        rechunk: bool,
155    ) -> PyResult<Self> {
156        use EitherRustPythonFile::*;
157
158        let row_index = row_index.map(|(name, offset)| RowIndex {
159            name: name.into(),
160            offset,
161        });
162
163        let result = match get_either_file(py_f, false)? {
164            Py(f) => {
165                let buf = std::io::Cursor::new(f.to_memslice());
166                py.allow_threads(move || {
167                    ParquetReader::new(buf)
168                        .with_projection(projection)
169                        .with_columns(columns)
170                        .read_parallel(parallel.0)
171                        .with_slice(n_rows.map(|x| (0, x)))
172                        .with_row_index(row_index)
173                        .set_low_memory(low_memory)
174                        .use_statistics(use_statistics)
175                        .set_rechunk(rechunk)
176                        .finish()
177                })
178            },
179            Rust(f) => py.allow_threads(move || {
180                ParquetReader::new(f)
181                    .with_projection(projection)
182                    .with_columns(columns)
183                    .read_parallel(parallel.0)
184                    .with_slice(n_rows.map(|x| (0, x)))
185                    .with_row_index(row_index)
186                    .use_statistics(use_statistics)
187                    .set_rechunk(rechunk)
188                    .finish()
189            }),
190        };
191        let df = result.map_err(PyPolarsErr::from)?;
192        Ok(PyDataFrame::new(df))
193    }
194
195    #[staticmethod]
196    #[cfg(feature = "json")]
197    #[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
198    pub fn read_json(
199        py: Python,
200        py_f: Bound<PyAny>,
201        infer_schema_length: Option<usize>,
202        schema: Option<Wrap<Schema>>,
203        schema_overrides: Option<Wrap<Schema>>,
204    ) -> PyResult<Self> {
205        assert!(infer_schema_length != Some(0));
206        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
207
208        py.allow_threads(move || {
209            let mut builder = JsonReader::new(mmap_bytes_r)
210                .with_json_format(JsonFormat::Json)
211                .infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));
212
213            if let Some(schema) = schema {
214                builder = builder.with_schema(Arc::new(schema.0));
215            }
216
217            if let Some(schema) = schema_overrides.as_ref() {
218                builder = builder.with_schema_overwrite(&schema.0);
219            }
220
221            let out = builder.finish().map_err(PyPolarsErr::from)?;
222            Ok(out.into())
223        })
224    }
225
226    #[staticmethod]
227    #[cfg(feature = "json")]
228    #[pyo3(signature = (py_f, ignore_errors, schema, schema_overrides))]
229    pub fn read_ndjson(
230        py: Python,
231        py_f: Bound<PyAny>,
232        ignore_errors: bool,
233        schema: Option<Wrap<Schema>>,
234        schema_overrides: Option<Wrap<Schema>>,
235    ) -> PyResult<Self> {
236        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
237
238        let mut builder = JsonReader::new(mmap_bytes_r)
239            .with_json_format(JsonFormat::JsonLines)
240            .with_ignore_errors(ignore_errors);
241
242        if let Some(schema) = schema {
243            builder = builder.with_schema(Arc::new(schema.0));
244        }
245
246        if let Some(schema) = schema_overrides.as_ref() {
247            builder = builder.with_schema_overwrite(&schema.0);
248        }
249
250        let out = py
251            .allow_threads(move || builder.finish())
252            .map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
253        Ok(out.into())
254    }
255
256    #[staticmethod]
257    #[cfg(feature = "ipc")]
258    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
259    pub fn read_ipc(
260        py: Python,
261        py_f: Bound<PyAny>,
262        columns: Option<Vec<String>>,
263        projection: Option<Vec<usize>>,
264        n_rows: Option<usize>,
265        row_index: Option<(String, IdxSize)>,
266        memory_map: bool,
267    ) -> PyResult<Self> {
268        let row_index = row_index.map(|(name, offset)| RowIndex {
269            name: name.into(),
270            offset,
271        });
272        let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;
273
274        let mmap_path = if memory_map { mmap_path } else { None };
275        let df = py.allow_threads(move || {
276            IpcReader::new(mmap_bytes_r)
277                .with_projection(projection)
278                .with_columns(columns)
279                .with_n_rows(n_rows)
280                .with_row_index(row_index)
281                .memory_mapped(mmap_path)
282                .finish()
283                .map_err(PyPolarsErr::from)
284        })?;
285        Ok(PyDataFrame::new(df))
286    }
287
288    #[staticmethod]
289    #[cfg(feature = "ipc_streaming")]
290    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
291    pub fn read_ipc_stream(
292        py: Python,
293        py_f: Bound<PyAny>,
294        columns: Option<Vec<String>>,
295        projection: Option<Vec<usize>>,
296        n_rows: Option<usize>,
297        row_index: Option<(String, IdxSize)>,
298        rechunk: bool,
299    ) -> PyResult<Self> {
300        let row_index = row_index.map(|(name, offset)| RowIndex {
301            name: name.into(),
302            offset,
303        });
304        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
305        let df = py.allow_threads(move || {
306            IpcStreamReader::new(mmap_bytes_r)
307                .with_projection(projection)
308                .with_columns(columns)
309                .with_n_rows(n_rows)
310                .with_row_index(row_index)
311                .set_rechunk(rechunk)
312                .finish()
313                .map_err(PyPolarsErr::from)
314        })?;
315        Ok(PyDataFrame::new(df))
316    }
317
318    #[staticmethod]
319    #[cfg(feature = "avro")]
320    #[pyo3(signature = (py_f, columns, projection, n_rows))]
321    pub fn read_avro(
322        py: Python,
323        py_f: PyObject,
324        columns: Option<Vec<String>>,
325        projection: Option<Vec<usize>>,
326        n_rows: Option<usize>,
327    ) -> PyResult<Self> {
328        use polars::io::avro::AvroReader;
329
330        let file = get_file_like(py_f, false)?;
331        let df = py.allow_threads(move || {
332            AvroReader::new(file)
333                .with_projection(projection)
334                .with_columns(columns)
335                .with_n_rows(n_rows)
336                .finish()
337                .map_err(PyPolarsErr::from)
338        })?;
339        Ok(PyDataFrame::new(df))
340    }
341
342    #[cfg(feature = "csv")]
343    #[pyo3(signature = (
344        py_f, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
345        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
346        quote_style, cloud_options, credential_provider, retries
347    ))]
348    pub fn write_csv(
349        &mut self,
350        py: Python,
351        py_f: PyObject,
352        include_bom: bool,
353        include_header: bool,
354        separator: u8,
355        line_terminator: String,
356        quote_char: u8,
357        batch_size: NonZeroUsize,
358        datetime_format: Option<String>,
359        date_format: Option<String>,
360        time_format: Option<String>,
361        float_scientific: Option<bool>,
362        float_precision: Option<usize>,
363        null_value: Option<String>,
364        quote_style: Option<Wrap<QuoteStyle>>,
365        cloud_options: Option<Vec<(String, String)>>,
366        credential_provider: Option<PyObject>,
367        retries: usize,
368    ) -> PyResult<()> {
369        let null = null_value.unwrap_or_default();
370
371        #[cfg(feature = "cloud")]
372        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
373            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
374            Some(
375                cloud_options
376                    .with_max_retries(retries)
377                    .with_credential_provider(
378                        credential_provider.map(PlCredentialProvider::from_python_func_object),
379                    ),
380            )
381        } else {
382            None
383        };
384
385        #[cfg(not(feature = "cloud"))]
386        let cloud_options = None;
387
388        let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
389
390        py.allow_threads(|| {
391            CsvWriter::new(f)
392                .include_bom(include_bom)
393                .include_header(include_header)
394                .with_separator(separator)
395                .with_line_terminator(line_terminator)
396                .with_quote_char(quote_char)
397                .with_batch_size(batch_size)
398                .with_datetime_format(datetime_format)
399                .with_date_format(date_format)
400                .with_time_format(time_format)
401                .with_float_scientific(float_scientific)
402                .with_float_precision(float_precision)
403                .with_null_value(null)
404                .with_quote_style(quote_style.map(|wrap| wrap.0).unwrap_or_default())
405                .finish(&mut self.df)
406                .map_err(PyPolarsErr::from)
407        })?;
408        Ok(())
409    }
410
411    #[cfg(feature = "parquet")]
412    #[pyo3(signature = (
413        py_f, compression, compression_level, statistics, row_group_size, data_page_size,
414        partition_by, partition_chunk_size_bytes, cloud_options, credential_provider, retries
415    ))]
416    pub fn write_parquet(
417        &mut self,
418        py: Python,
419        py_f: PyObject,
420        compression: &str,
421        compression_level: Option<i32>,
422        statistics: Wrap<StatisticsOptions>,
423        row_group_size: Option<usize>,
424        data_page_size: Option<usize>,
425        partition_by: Option<Vec<String>>,
426        partition_chunk_size_bytes: usize,
427        cloud_options: Option<Vec<(String, String)>>,
428        credential_provider: Option<PyObject>,
429        retries: usize,
430    ) -> PyResult<()> {
431        use polars_io::partition::write_partitioned_dataset;
432
433        let compression = parse_parquet_compression(compression, compression_level)?;
434
435        #[cfg(feature = "cloud")]
436        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
437            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
438            Some(
439                cloud_options
440                    .with_max_retries(retries)
441                    .with_credential_provider(
442                        credential_provider.map(PlCredentialProvider::from_python_func_object),
443                    ),
444            )
445        } else {
446            None
447        };
448
449        #[cfg(not(feature = "cloud"))]
450        let cloud_options = None;
451
452        if let Some(partition_by) = partition_by {
453            let path = py_f.extract::<String>(py)?;
454
455            py.allow_threads(|| {
456                let write_options = ParquetWriteOptions {
457                    compression,
458                    statistics: statistics.0,
459                    row_group_size,
460                    data_page_size,
461                    maintain_order: true,
462                };
463                write_partitioned_dataset(
464                    &mut self.df,
465                    std::path::Path::new(path.as_str()),
466                    partition_by.into_iter().map(|x| x.into()).collect(),
467                    &write_options,
468                    cloud_options.as_ref(),
469                    partition_chunk_size_bytes,
470                )
471                .map_err(PyPolarsErr::from)
472            })?;
473
474            return Ok(());
475        };
476
477        let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
478
479        py.allow_threads(|| {
480            ParquetWriter::new(BufWriter::new(f))
481                .with_compression(compression)
482                .with_statistics(statistics.0)
483                .with_row_group_size(row_group_size)
484                .with_data_page_size(data_page_size)
485                .finish(&mut self.df)
486                .map_err(PyPolarsErr::from)
487        })?;
488        Ok(())
489    }
490
491    #[cfg(feature = "json")]
492    pub fn write_json(&mut self, py_f: PyObject) -> PyResult<()> {
493        let file = BufWriter::new(get_file_like(py_f, true)?);
494
495        // TODO: Cloud support
496
497        JsonWriter::new(file)
498            .with_json_format(JsonFormat::Json)
499            .finish(&mut self.df)
500            .map_err(PyPolarsErr::from)?;
501        Ok(())
502    }
503
504    #[cfg(feature = "json")]
505    pub fn write_ndjson(&mut self, py_f: PyObject) -> PyResult<()> {
506        let file = BufWriter::new(get_file_like(py_f, true)?);
507
508        // TODO: Cloud support
509
510        JsonWriter::new(file)
511            .with_json_format(JsonFormat::JsonLines)
512            .finish(&mut self.df)
513            .map_err(PyPolarsErr::from)?;
514
515        Ok(())
516    }
517
518    #[cfg(feature = "ipc")]
519    #[pyo3(signature = (
520        py_f, compression, compat_level, cloud_options, credential_provider, retries
521    ))]
522    pub fn write_ipc(
523        &mut self,
524        py: Python,
525        py_f: PyObject,
526        compression: Wrap<Option<IpcCompression>>,
527        compat_level: PyCompatLevel,
528        cloud_options: Option<Vec<(String, String)>>,
529        credential_provider: Option<PyObject>,
530        retries: usize,
531    ) -> PyResult<()> {
532        #[cfg(feature = "cloud")]
533        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
534            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
535            Some(
536                cloud_options
537                    .with_max_retries(retries)
538                    .with_credential_provider(
539                        credential_provider.map(PlCredentialProvider::from_python_func_object),
540                    ),
541            )
542        } else {
543            None
544        };
545
546        #[cfg(not(feature = "cloud"))]
547        let cloud_options = None;
548
549        let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
550
551        py.allow_threads(|| {
552            IpcWriter::new(f)
553                .with_compression(compression.0)
554                .with_compat_level(compat_level.0)
555                .finish(&mut self.df)
556                .map_err(PyPolarsErr::from)
557        })?;
558        Ok(())
559    }
560
561    #[cfg(feature = "ipc_streaming")]
562    pub fn write_ipc_stream(
563        &mut self,
564        py: Python,
565        py_f: PyObject,
566        compression: Wrap<Option<IpcCompression>>,
567        compat_level: PyCompatLevel,
568    ) -> PyResult<()> {
569        let mut buf = get_file_like(py_f, true)?;
570        py.allow_threads(|| {
571            IpcStreamWriter::new(&mut buf)
572                .with_compression(compression.0)
573                .with_compat_level(compat_level.0)
574                .finish(&mut self.df)
575                .map_err(PyPolarsErr::from)
576        })?;
577        Ok(())
578    }
579
580    #[cfg(feature = "avro")]
581    #[pyo3(signature = (py_f, compression, name))]
582    pub fn write_avro(
583        &mut self,
584        py: Python,
585        py_f: PyObject,
586        compression: Wrap<Option<AvroCompression>>,
587        name: String,
588    ) -> PyResult<()> {
589        use polars::io::avro::AvroWriter;
590        let mut buf = get_file_like(py_f, true)?;
591        py.allow_threads(|| {
592            AvroWriter::new(&mut buf)
593                .with_compression(compression.0)
594                .with_name(name)
595                .finish(&mut self.df)
596                .map_err(PyPolarsErr::from)
597        })?;
598        Ok(())
599    }
600}