1use std::io::BufWriter;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4
5use polars::io::RowIndex;
6#[cfg(feature = "avro")]
7use polars::io::avro::AvroCompression;
8use polars::prelude::*;
9use pyo3::prelude::*;
10use pyo3::pybacked::PyBackedStr;
11
12use super::PyDataFrame;
13use crate::conversion::Wrap;
14use crate::file::{
15 EitherRustPythonFile, get_either_file, get_file_like, get_mmap_bytes_reader,
16 get_mmap_bytes_reader_and_path,
17};
18use crate::prelude::PyCompatLevel;
19use crate::utils::EnterPolarsExt;
20
// Python-facing I/O methods for `PyDataFrame`: feature-gated readers
// (CSV / Parquet / JSON / NDJSON / IPC / IPC-stream / Avro) and writers
// (JSON / IPC-stream / Avro). All heavy work runs inside
// `enter_polars_df` / `enter_polars`, which convert polars results and
// errors into Python values and exceptions.
#[pymethods]
impl PyDataFrame {
    // Eagerly read a CSV source into a DataFrame.
    //
    // `py_f` may be any object accepted by `get_mmap_bytes_reader`
    // (path, bytes, or file-like); every other argument is forwarded to
    // `CsvReadOptions` / `CsvParseOptions`.
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
        skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
        overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
        null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
        row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
    )]
    pub fn read_csv(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        chunk_size: usize,
        has_header: bool,
        ignore_errors: bool,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        projection: Option<Vec<usize>>,
        separator: &str,
        rechunk: bool,
        columns: Option<Vec<String>>,
        encoding: Wrap<CsvEncoding>,
        n_threads: Option<usize>,
        path: Option<String>,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        try_parse_dates: bool,
        skip_rows_after_header: usize,
        row_index: Option<(String, IdxSize)>,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        schema: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        let null_values = null_values.map(|w| w.0);
        // The parser works on single bytes: take the first byte of the
        // delimiter strings.
        // NOTE(review): assumes `eol_char` (and `separator` below) are
        // non-empty — indexing panics on an empty string; confirm the Python
        // caller validates this before handing it over.
        let eol_char = eol_char.as_bytes()[0];
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        // `quote_char` uses the non-panicking `.first()` form, so an empty
        // string simply disables quoting.
        let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());

        // Per-column dtype overrides, keyed by column name.
        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|(name, dtype)| {
                    let dtype = dtype.0.clone();
                    Field::new((&**name).into(), dtype)
                })
                .collect::<Schema>()
        });

        // Positional dtype overrides (applied by column index).
        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|dt| dt.0.clone())
                .collect::<Vec<_>>()
        });

        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        // Build the reader options and run the read on the polars side.
        py.enter_polars_df(move || {
            CsvReadOptions::default()
                .with_path(path)
                .with_infer_schema_length(infer_schema_length)
                .with_has_header(has_header)
                .with_n_rows(n_rows)
                .with_skip_rows(skip_rows)
                .with_skip_lines(skip_lines)
                .with_ignore_errors(ignore_errors)
                .with_projection(projection.map(Arc::new))
                .with_rechunk(rechunk)
                .with_chunk_size(chunk_size)
                .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
                .with_n_threads(n_threads)
                .with_schema_overwrite(overwrite_dtype.map(Arc::new))
                .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
                .with_schema(schema.map(|schema| Arc::new(schema.0)))
                .with_low_memory(low_memory)
                .with_skip_rows_after_header(skip_rows_after_header)
                .with_row_index(row_index)
                .with_raise_if_empty(raise_if_empty)
                .with_parse_options(
                    CsvParseOptions::default()
                        .with_separator(separator.as_bytes()[0])
                        .with_encoding(encoding.0)
                        // `missing_utf8_is_empty_string` is the Python-side
                        // flag; polars wants the inverse ("missing is null").
                        .with_missing_is_null(!missing_utf8_is_empty_string)
                        .with_comment_prefix(comment_prefix)
                        .with_null_values(null_values)
                        .with_try_parse_dates(try_parse_dates)
                        .with_quote_char(quote_char)
                        .with_eol_char(eol_char)
                        .with_truncate_ragged_lines(truncate_ragged_lines)
                        .with_decimal_comma(decimal_comma),
                )
                .into_reader_with_file_handle(mmap_bytes_r)
                .finish()
        })
    }

    // Eagerly read a Parquet source into a DataFrame.
    //
    // Dispatches on whether `py_f` resolves to a Python file-like object or
    // a native Rust file handle.
    #[staticmethod]
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, low_memory, parallel, use_statistics, rechunk))]
    pub fn read_parquet(
        py: Python<'_>,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        parallel: Wrap<ParallelStrategy>,
        use_statistics: bool,
        rechunk: bool,
    ) -> PyResult<Self> {
        use EitherRustPythonFile::*;

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        // Accepted for API compatibility but deliberately ignored here.
        _ = use_statistics;

        match get_either_file(py_f, false)? {
            // Python file-like object: snapshot its contents into an
            // in-memory cursor before reading.
            Py(f) => {
                let buf = std::io::Cursor::new(f.to_memslice());
                py.enter_polars_df(move || {
                    ParquetReader::new(buf)
                        .with_projection(projection)
                        .with_columns(columns)
                        .read_parallel(parallel.0)
                        // `n_rows` becomes a head-slice starting at row 0.
                        .with_slice(n_rows.map(|x| (0, x)))
                        .with_row_index(row_index)
                        .set_low_memory(low_memory)
                        .set_rechunk(rechunk)
                        .finish()
                })
            },
            // Native Rust file handle: read directly.
            // NOTE(review): unlike the branch above, `set_low_memory` is not
            // applied here — confirm whether that asymmetry is intentional.
            Rust(f) => py.enter_polars_df(move || {
                ParquetReader::new(f)
                    .with_projection(projection)
                    .with_columns(columns)
                    .read_parallel(parallel.0)
                    .with_slice(n_rows.map(|x| (0, x)))
                    .with_row_index(row_index)
                    .set_rechunk(rechunk)
                    .finish()
            }),
        }
    }

    // Read a single JSON document into a DataFrame, with optional full
    // schema and/or per-column schema overrides.
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
    pub fn read_json(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        // `Some(0)` is meaningless (zero rows to infer from) and treated as a
        // caller bug; the Python layer is expected never to pass it.
        assert!(infer_schema_length != Some(0));
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        py.enter_polars_df(move || {
            let mut builder = JsonReader::new(mmap_bytes_r)
                .with_json_format(JsonFormat::Json)
                .infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));

            if let Some(schema) = schema {
                builder = builder.with_schema(Arc::new(schema.0));
            }

            if let Some(schema) = schema_overrides.as_ref() {
                builder = builder.with_schema_overwrite(&schema.0);
            }

            builder.finish()
        })
    }

    // Read newline-delimited JSON (NDJSON / JSON Lines) into a DataFrame.
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, ignore_errors, schema, schema_overrides))]
    pub fn read_ndjson(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        ignore_errors: bool,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        // Unlike `read_json`, the builder is assembled up front and only
        // `finish()` runs inside `enter_polars_df`.
        let mut builder = JsonReader::new(mmap_bytes_r)
            .with_json_format(JsonFormat::JsonLines)
            .with_ignore_errors(ignore_errors);

        if let Some(schema) = schema {
            builder = builder.with_schema(Arc::new(schema.0));
        }

        if let Some(schema) = schema_overrides.as_ref() {
            builder = builder.with_schema_overwrite(&schema.0);
        }

        py.enter_polars_df(move || builder.finish())
    }

    // Read an Arrow IPC (Feather v2) file into a DataFrame, optionally
    // memory-mapping it when a real path is available.
    #[staticmethod]
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
    pub fn read_ipc(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        memory_map: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;

        // Only forward the path when the caller asked for memory mapping;
        // `None` forces a regular buffered read.
        let mmap_path = if memory_map { mmap_path } else { None };
        py.enter_polars_df(move || {
            IpcReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .memory_mapped(mmap_path)
                .finish()
        })
    }

    // Read the Arrow IPC *streaming* format into a DataFrame.
    #[staticmethod]
    #[cfg(feature = "ipc_streaming")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
    pub fn read_ipc_stream(
        py: Python<'_>,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        rechunk: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        py.enter_polars_df(move || {
            IpcStreamReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .set_rechunk(rechunk)
                .finish()
        })
    }

    // Read an Avro file into a DataFrame.
    #[staticmethod]
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, columns, projection, n_rows))]
    pub fn read_avro(
        py: Python<'_>,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
    ) -> PyResult<Self> {
        use polars::io::avro::AvroReader;

        let file = get_file_like(py_f, false)?;
        py.enter_polars_df(move || {
            AvroReader::new(file)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .finish()
        })
    }

    // Serialize the DataFrame as a single JSON document to `py_f`.
    #[cfg(feature = "json")]
    pub fn write_json(&mut self, py: Python<'_>, py_f: PyObject) -> PyResult<()> {
        // Buffer writes to the Python file object.
        // NOTE(review): the BufWriter is dropped inside the closure, where a
        // failing implicit flush would be silently swallowed — confirm that
        // `JsonWriter::finish` flushes explicitly.
        let file = BufWriter::new(get_file_like(py_f, true)?);
        py.enter_polars(|| {
            JsonWriter::new(file)
                .with_json_format(JsonFormat::Json)
                .finish(&mut self.df)
        })
    }

    // Serialize the DataFrame in Arrow IPC streaming format to `py_f`.
    #[cfg(feature = "ipc_streaming")]
    pub fn write_ipc_stream(
        &mut self,
        py: Python<'_>,
        py_f: PyObject,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
    ) -> PyResult<()> {
        let mut buf = get_file_like(py_f, true)?;
        py.enter_polars(|| {
            IpcStreamWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_compat_level(compat_level.0)
                .finish(&mut self.df)
        })
    }

    // Serialize the DataFrame as an Avro file to `py_f`, with the given
    // record `name` and optional compression codec.
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, compression, name))]
    pub fn write_avro(
        &mut self,
        py: Python<'_>,
        py_f: PyObject,
        compression: Wrap<Option<AvroCompression>>,
        name: String,
    ) -> PyResult<()> {
        use polars::io::avro::AvroWriter;
        let mut buf = get_file_like(py_f, true)?;
        py.enter_polars(|| {
            AvroWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_name(name)
                .finish(&mut self.df)
        })
    }
}