polars_python/dataframe/io.rs

1use std::io::BufWriter;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4
5use polars::io::RowIndex;
6#[cfg(feature = "avro")]
7use polars::io::avro::AvroCompression;
8use polars::prelude::*;
9use pyo3::prelude::*;
10use pyo3::pybacked::PyBackedStr;
11
12use super::PyDataFrame;
13use crate::conversion::Wrap;
14use crate::file::{
15    EitherRustPythonFile, get_either_file, get_file_like, get_mmap_bytes_reader,
16    get_mmap_bytes_reader_and_path,
17};
18use crate::prelude::PyCompatLevel;
19use crate::utils::EnterPolarsExt;
20
// Python-facing I/O methods for `PyDataFrame`. Every reader/writer below is
// feature-gated and hands the actual work to Polars inside `enter_polars*`
// (which releases the GIL for the duration of the call).
#[pymethods]
impl PyDataFrame {
    /// Read a CSV source into a DataFrame.
    ///
    /// `py_f` may be a path or any Python file-like object; it is wrapped in a
    /// (possibly memory-mapped) byte reader before parsing. The remaining
    /// parameters are forwarded verbatim to `CsvReadOptions` /
    /// `CsvParseOptions`.
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
    py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
    skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
    overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
    null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
    row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
)]
    pub fn read_csv(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        chunk_size: usize,
        has_header: bool,
        ignore_errors: bool,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        projection: Option<Vec<usize>>,
        separator: &str,
        rechunk: bool,
        columns: Option<Vec<String>>,
        encoding: Wrap<CsvEncoding>,
        n_threads: Option<usize>,
        path: Option<String>,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        try_parse_dates: bool,
        skip_rows_after_header: usize,
        row_index: Option<(String, IdxSize)>,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        schema: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        // Unwrap the Python-side wrapper types into plain Polars values.
        let null_values = null_values.map(|w| w.0);
        // NOTE(review): indexing [0] panics if the Python caller passes an empty
        // string for `eol_char` (same for `separator` below) — assumed to be
        // validated on the Python side; confirm.
        let eol_char = eol_char.as_bytes()[0];
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        // Only the first byte of the quote string is used; None disables quoting.
        let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());

        // Per-column dtype overrides, keyed by column name, become a Schema.
        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|(name, dtype)| {
                    let dtype = dtype.0.clone();
                    Field::new((&**name).into(), dtype)
                })
                .collect::<Schema>()
        });

        // Positional dtype overrides (by column index) stay a plain Vec.
        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|dt| dt.0.clone())
                .collect::<Vec<_>>()
        });

        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        py.enter_polars_df(move || {
            CsvReadOptions::default()
                .with_path(path)
                .with_infer_schema_length(infer_schema_length)
                .with_has_header(has_header)
                .with_n_rows(n_rows)
                .with_skip_rows(skip_rows)
                .with_skip_lines(skip_lines)
                .with_ignore_errors(ignore_errors)
                .with_projection(projection.map(Arc::new))
                .with_rechunk(rechunk)
                .with_chunk_size(chunk_size)
                .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
                .with_n_threads(n_threads)
                .with_schema_overwrite(overwrite_dtype.map(Arc::new))
                .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
                .with_schema(schema.map(|schema| Arc::new(schema.0)))
                .with_low_memory(low_memory)
                .with_skip_rows_after_header(skip_rows_after_header)
                .with_row_index(row_index)
                .with_raise_if_empty(raise_if_empty)
                .with_parse_options(
                    CsvParseOptions::default()
                        .with_separator(separator.as_bytes()[0])
                        .with_encoding(encoding.0)
                        // Python exposes the inverse flag, hence the negation.
                        .with_missing_is_null(!missing_utf8_is_empty_string)
                        .with_comment_prefix(comment_prefix)
                        .with_null_values(null_values)
                        .with_try_parse_dates(try_parse_dates)
                        .with_quote_char(quote_char)
                        .with_eol_char(eol_char)
                        .with_truncate_ragged_lines(truncate_ragged_lines)
                        .with_decimal_comma(decimal_comma),
                )
                .into_reader_with_file_handle(mmap_bytes_r)
                .finish()
        })
    }

    /// Read a Parquet source into a DataFrame.
    ///
    /// Python file-like objects are slurped into an in-memory cursor; Rust
    /// file handles are read directly.
    #[staticmethod]
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, low_memory, parallel, use_statistics, rechunk))]
    pub fn read_parquet(
        py: Python<'_>,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        parallel: Wrap<ParallelStrategy>,
        use_statistics: bool,
        rechunk: bool,
    ) -> PyResult<Self> {
        use EitherRustPythonFile::*;

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        // Accepted to keep the Python signature stable, but deliberately unused.
        _ = use_statistics;

        match get_either_file(py_f, false)? {
            Py(f) => {
                // Python file-like object: copy its bytes and read from memory.
                let buf = std::io::Cursor::new(f.to_memslice());
                py.enter_polars_df(move || {
                    ParquetReader::new(buf)
                        .with_projection(projection)
                        .with_columns(columns)
                        .read_parallel(parallel.0)
                        // `n_rows` is expressed as a slice starting at offset 0.
                        .with_slice(n_rows.map(|x| (0, x)))
                        .with_row_index(row_index)
                        .set_low_memory(low_memory)
                        .set_rechunk(rechunk)
                        .finish()
                })
            },
            // NOTE(review): unlike the branch above, this path never calls
            // `set_low_memory(low_memory)` — the flag is silently ignored for
            // Rust file handles. Confirm whether that is intentional.
            Rust(f) => py.enter_polars_df(move || {
                ParquetReader::new(f)
                    .with_projection(projection)
                    .with_columns(columns)
                    .read_parallel(parallel.0)
                    .with_slice(n_rows.map(|x| (0, x)))
                    .with_row_index(row_index)
                    .set_rechunk(rechunk)
                    .finish()
            }),
        }
    }

    /// Read a standard (non-line-delimited) JSON source into a DataFrame.
    ///
    /// `schema` replaces inference entirely; `schema_overrides` patches
    /// individual inferred columns.
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
    pub fn read_json(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        // An inference length of 0 is a caller bug; the Python layer is expected
        // to never pass it.
        assert!(infer_schema_length != Some(0));
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        py.enter_polars_df(move || {
            let mut builder = JsonReader::new(mmap_bytes_r)
                .with_json_format(JsonFormat::Json)
                .infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));

            if let Some(schema) = schema {
                builder = builder.with_schema(Arc::new(schema.0));
            }

            if let Some(schema) = schema_overrides.as_ref() {
                builder = builder.with_schema_overwrite(&schema.0);
            }

            builder.finish()
        })
    }

    /// Read a newline-delimited JSON (NDJSON) source into a DataFrame.
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, ignore_errors, schema, schema_overrides))]
    pub fn read_ndjson(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        ignore_errors: bool,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        // The builder is configured up front (it borrows `schema_overrides`);
        // only the final `finish` runs inside `enter_polars_df`.
        let mut builder = JsonReader::new(mmap_bytes_r)
            .with_json_format(JsonFormat::JsonLines)
            .with_ignore_errors(ignore_errors);

        if let Some(schema) = schema {
            builder = builder.with_schema(Arc::new(schema.0));
        }

        if let Some(schema) = schema_overrides.as_ref() {
            builder = builder.with_schema_overwrite(&schema.0);
        }

        py.enter_polars_df(move || builder.finish())
    }

    /// Read an Arrow IPC (Feather) file into a DataFrame.
    ///
    /// When `memory_map` is true and the source is a real file on disk, the
    /// reader is given the path so it can memory-map instead of copying.
    #[staticmethod]
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
    pub fn read_ipc(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        memory_map: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;

        // Drop the path (disabling memory mapping) unless explicitly requested.
        let mmap_path = if memory_map { mmap_path } else { None };
        py.enter_polars_df(move || {
            IpcReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .memory_mapped(mmap_path)
                .finish()
        })
    }

    /// Read an Arrow IPC *stream* (as opposed to the random-access file
    /// format) into a DataFrame.
    #[staticmethod]
    #[cfg(feature = "ipc_streaming")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
    pub fn read_ipc_stream(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        rechunk: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        py.enter_polars_df(move || {
            IpcStreamReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .set_rechunk(rechunk)
                .finish()
        })
    }

    /// Read an Avro file into a DataFrame.
    #[staticmethod]
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, columns, projection, n_rows))]
    pub fn read_avro(
        py: Python<'_>,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
    ) -> PyResult<Self> {
        use polars::io::avro::AvroReader;

        let file = get_file_like(py_f, false)?;
        py.enter_polars_df(move || {
            AvroReader::new(file)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .finish()
        })
    }

    /// Serialize this DataFrame as standard JSON to a Python file-like object.
    #[cfg(feature = "json")]
    pub fn write_json(&mut self, py: Python<'_>, py_f: PyObject) -> PyResult<()> {
        // Buffer writes: the underlying Python object may be unbuffered.
        let file = BufWriter::new(get_file_like(py_f, true)?);
        py.enter_polars(|| {
            // TODO: Cloud support

            JsonWriter::new(file)
                .with_json_format(JsonFormat::Json)
                .finish(&mut self.df)
        })
    }

    /// Write this DataFrame as an Arrow IPC stream, with optional compression
    /// and an explicit Arrow compatibility level.
    #[cfg(feature = "ipc_streaming")]
    pub fn write_ipc_stream(
        &mut self,
        py: Python<'_>,
        py_f: PyObject,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
    ) -> PyResult<()> {
        let mut buf = get_file_like(py_f, true)?;
        py.enter_polars(|| {
            IpcStreamWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_compat_level(compat_level.0)
                .finish(&mut self.df)
        })
    }

    /// Write this DataFrame as an Avro file.
    ///
    /// `name` is the record name embedded in the Avro schema.
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, compression, name))]
    pub fn write_avro(
        &mut self,
        py: Python<'_>,
        py_f: PyObject,
        compression: Wrap<Option<AvroCompression>>,
        name: String,
    ) -> PyResult<()> {
        use polars::io::avro::AvroWriter;
        let mut buf = get_file_like(py_f, true)?;
        py.enter_polars(|| {
            AvroWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_name(name)
                .finish(&mut self.df)
        })
    }
}