// polars_python/dataframe/io.rs

use std::io::BufWriter;
use std::num::NonZeroUsize;
use std::sync::Arc;

use polars::io::RowIndex;
#[cfg(feature = "avro")]
use polars::io::avro::AvroCompression;
use polars::prelude::*;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;

use super::PyDataFrame;
use crate::conversion::Wrap;
use crate::file::{get_file_like, get_mmap_bytes_reader, get_mmap_bytes_reader_and_path};
use crate::prelude::PyCompatLevel;
use crate::utils::EnterPolarsExt;

18#[pymethods]
19impl PyDataFrame {
20    #[staticmethod]
21    #[cfg(feature = "csv")]
22    #[pyo3(signature = (
23    py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
24    skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
25    overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
26    null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
27    row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
28)]
29    pub fn read_csv(
30        py: Python<'_>,
31        py_f: Bound<PyAny>,
32        infer_schema_length: Option<usize>,
33        chunk_size: usize,
34        has_header: bool,
35        ignore_errors: bool,
36        n_rows: Option<usize>,
37        skip_rows: usize,
38        skip_lines: usize,
39        projection: Option<Vec<usize>>,
40        separator: &str,
41        rechunk: bool,
42        columns: Option<Vec<String>>,
43        encoding: Wrap<CsvEncoding>,
44        n_threads: Option<usize>,
45        path: Option<String>,
46        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
47        overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
48        low_memory: bool,
49        comment_prefix: Option<&str>,
50        quote_char: Option<&str>,
51        null_values: Option<Wrap<NullValues>>,
52        missing_utf8_is_empty_string: bool,
53        try_parse_dates: bool,
54        skip_rows_after_header: usize,
55        row_index: Option<(String, IdxSize)>,
56        eol_char: &str,
57        raise_if_empty: bool,
58        truncate_ragged_lines: bool,
59        decimal_comma: bool,
60        schema: Option<Wrap<Schema>>,
61    ) -> PyResult<Self> {
62        let null_values = null_values.map(|w| w.0);
63        let eol_char = eol_char.as_bytes()[0];
64        let row_index = row_index.map(|(name, offset)| RowIndex {
65            name: name.into(),
66            offset,
67        });
68        let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());
69
70        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
71            overwrite_dtype
72                .iter()
73                .map(|(name, dtype)| {
74                    let dtype = dtype.0.clone();
75                    Field::new((&**name).into(), dtype)
76                })
77                .collect::<Schema>()
78        });
79
80        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
81            overwrite_dtype
82                .iter()
83                .map(|dt| dt.0.clone())
84                .collect::<Vec<_>>()
85        });
86
87        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
88        py.enter_polars_df(move || {
89            CsvReadOptions::default()
90                .with_path(path)
91                .with_infer_schema_length(infer_schema_length)
92                .with_has_header(has_header)
93                .with_n_rows(n_rows)
94                .with_skip_rows(skip_rows)
95                .with_skip_lines(skip_lines)
96                .with_ignore_errors(ignore_errors)
97                .with_projection(projection.map(Arc::new))
98                .with_rechunk(rechunk)
99                .with_chunk_size(chunk_size)
100                .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
101                .with_n_threads(n_threads)
102                .with_schema_overwrite(overwrite_dtype.map(Arc::new))
103                .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
104                .with_schema(schema.map(|schema| Arc::new(schema.0)))
105                .with_low_memory(low_memory)
106                .with_skip_rows_after_header(skip_rows_after_header)
107                .with_row_index(row_index)
108                .with_raise_if_empty(raise_if_empty)
109                .with_parse_options(
110                    CsvParseOptions::default()
111                        .with_separator(separator.as_bytes()[0])
112                        .with_encoding(encoding.0)
113                        .with_missing_is_null(!missing_utf8_is_empty_string)
114                        .with_comment_prefix(comment_prefix)
115                        .with_null_values(null_values)
116                        .with_try_parse_dates(try_parse_dates)
117                        .with_quote_char(quote_char)
118                        .with_eol_char(eol_char)
119                        .with_truncate_ragged_lines(truncate_ragged_lines)
120                        .with_decimal_comma(decimal_comma),
121                )
122                .into_reader_with_file_handle(mmap_bytes_r)
123                .finish()
124        })
125    }
126
127    #[staticmethod]
128    #[cfg(feature = "json")]
129    #[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
130    pub fn read_json(
131        py: Python<'_>,
132        py_f: Bound<PyAny>,
133        infer_schema_length: Option<usize>,
134        schema: Option<Wrap<Schema>>,
135        schema_overrides: Option<Wrap<Schema>>,
136    ) -> PyResult<Self> {
137        assert!(infer_schema_length != Some(0));
138        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
139
140        py.enter_polars_df(move || {
141            let mut builder = JsonReader::new(mmap_bytes_r)
142                .with_json_format(JsonFormat::Json)
143                .infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));
144
145            if let Some(schema) = schema {
146                builder = builder.with_schema(Arc::new(schema.0));
147            }
148
149            if let Some(schema) = schema_overrides.as_ref() {
150                builder = builder.with_schema_overwrite(&schema.0);
151            }
152
153            builder.finish()
154        })
155    }
156
157    #[staticmethod]
158    #[cfg(feature = "ipc")]
159    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
160    pub fn read_ipc(
161        py: Python<'_>,
162        py_f: Bound<PyAny>,
163        columns: Option<Vec<String>>,
164        projection: Option<Vec<usize>>,
165        n_rows: Option<usize>,
166        row_index: Option<(String, IdxSize)>,
167        memory_map: bool,
168    ) -> PyResult<Self> {
169        let row_index = row_index.map(|(name, offset)| RowIndex {
170            name: name.into(),
171            offset,
172        });
173        let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;
174
175        let mmap_path = if memory_map { mmap_path } else { None };
176        py.enter_polars_df(move || {
177            IpcReader::new(mmap_bytes_r)
178                .with_projection(projection)
179                .with_columns(columns)
180                .with_n_rows(n_rows)
181                .with_row_index(row_index)
182                .memory_mapped(mmap_path)
183                .finish()
184        })
185    }
186
187    #[staticmethod]
188    #[cfg(feature = "ipc_streaming")]
189    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
190    pub fn read_ipc_stream(
191        py: Python<'_>,
192        py_f: Bound<PyAny>,
193        columns: Option<Vec<String>>,
194        projection: Option<Vec<usize>>,
195        n_rows: Option<usize>,
196        row_index: Option<(String, IdxSize)>,
197        rechunk: bool,
198    ) -> PyResult<Self> {
199        let row_index = row_index.map(|(name, offset)| RowIndex {
200            name: name.into(),
201            offset,
202        });
203        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
204        py.enter_polars_df(move || {
205            IpcStreamReader::new(mmap_bytes_r)
206                .with_projection(projection)
207                .with_columns(columns)
208                .with_n_rows(n_rows)
209                .with_row_index(row_index)
210                .set_rechunk(rechunk)
211                .finish()
212        })
213    }
214
215    #[staticmethod]
216    #[cfg(feature = "avro")]
217    #[pyo3(signature = (py_f, columns, projection, n_rows))]
218    pub fn read_avro(
219        py: Python<'_>,
220        py_f: PyObject,
221        columns: Option<Vec<String>>,
222        projection: Option<Vec<usize>>,
223        n_rows: Option<usize>,
224    ) -> PyResult<Self> {
225        use polars::io::avro::AvroReader;
226
227        let file = get_file_like(py_f, false)?;
228        py.enter_polars_df(move || {
229            AvroReader::new(file)
230                .with_projection(projection)
231                .with_columns(columns)
232                .with_n_rows(n_rows)
233                .finish()
234        })
235    }
236
237    #[cfg(feature = "json")]
238    pub fn write_json(&mut self, py: Python<'_>, py_f: PyObject) -> PyResult<()> {
239        let file = BufWriter::new(get_file_like(py_f, true)?);
240        py.enter_polars(|| {
241            // TODO: Cloud support
242
243            JsonWriter::new(file)
244                .with_json_format(JsonFormat::Json)
245                .finish(&mut self.df)
246        })
247    }
248
249    #[cfg(feature = "ipc_streaming")]
250    pub fn write_ipc_stream(
251        &mut self,
252        py: Python<'_>,
253        py_f: PyObject,
254        compression: Wrap<Option<IpcCompression>>,
255        compat_level: PyCompatLevel,
256    ) -> PyResult<()> {
257        let mut buf = get_file_like(py_f, true)?;
258        py.enter_polars(|| {
259            IpcStreamWriter::new(&mut buf)
260                .with_compression(compression.0)
261                .with_compat_level(compat_level.0)
262                .finish(&mut self.df)
263        })
264    }
265
266    #[cfg(feature = "avro")]
267    #[pyo3(signature = (py_f, compression, name))]
268    pub fn write_avro(
269        &mut self,
270        py: Python<'_>,
271        py_f: PyObject,
272        compression: Wrap<Option<AvroCompression>>,
273        name: String,
274    ) -> PyResult<()> {
275        use polars::io::avro::AvroWriter;
276        let mut buf = get_file_like(py_f, true)?;
277        py.enter_polars(|| {
278            AvroWriter::new(&mut buf)
279                .with_compression(compression.0)
280                .with_name(name)
281                .finish(&mut self.df)
282        })
283    }
284}