use std::borrow::Cow;
use std::io::BufWriter;
use std::num::NonZeroUsize;
use std::sync::Arc;

#[cfg(feature = "cloud")]
use cloud::credential_provider::PlCredentialProvider;
use polars::io::RowIndex;
#[cfg(feature = "avro")]
use polars::io::avro::AvroCompression;
use polars::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;

use super::PyDataFrame;
use crate::conversion::Wrap;
#[cfg(feature = "parquet")]
use crate::conversion::parse_parquet_compression;
use crate::error::PyPolarsErr;
use crate::file::{
    EitherRustPythonFile, get_either_file, get_file_like, get_mmap_bytes_reader,
    get_mmap_bytes_reader_and_path,
};
use crate::prelude::PyCompatLevel;
#[cfg(feature = "cloud")]
use crate::prelude::parse_cloud_options;
use crate::utils::EnterPolarsExt;

#[pymethods]
impl PyDataFrame {
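    /// Read a CSV file or file-like object into a `DataFrame`.
    ///
    /// Single-byte options (`separator`, `eol_char`, `quote_char`) arrive from
    /// Python as one-character strings and are reduced to their first byte.
    /// Roughly the shape of the Python call that ends up here (a sketch; the
    /// public entry point is `polars.read_csv`):
    ///
    /// ```python
    /// import polars as pl
    /// df = pl.read_csv("data.csv", separator=",", has_header=True)
    /// ```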
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
        skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
        overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
        null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
        row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
    )]
    pub fn read_csv(
        py: Python,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        chunk_size: usize,
        has_header: bool,
        ignore_errors: bool,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        projection: Option<Vec<usize>>,
        separator: &str,
        rechunk: bool,
        columns: Option<Vec<String>>,
        encoding: Wrap<CsvEncoding>,
        n_threads: Option<usize>,
        path: Option<String>,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        try_parse_dates: bool,
        skip_rows_after_header: usize,
        row_index: Option<(String, IdxSize)>,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        schema: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        let null_values = null_values.map(|w| w.0);
        // `eol_char` arrives as a one-character string; keep its first byte.
        let eol_char = eol_char.as_bytes()[0];
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());

        // Per-column dtype overrides, keyed by column name.
        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|(name, dtype)| {
                    let dtype = dtype.0.clone();
                    Field::new((&**name).into(), dtype)
                })
                .collect::<Schema>()
        });

        // Positional dtype overrides, applied by column index.
        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|dt| dt.0.clone())
                .collect::<Vec<_>>()
        });

        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        py.enter_polars_df(move || {
            CsvReadOptions::default()
                .with_path(path)
                .with_infer_schema_length(infer_schema_length)
                .with_has_header(has_header)
                .with_n_rows(n_rows)
                .with_skip_rows(skip_rows)
                .with_skip_lines(skip_lines)
                .with_ignore_errors(ignore_errors)
                .with_projection(projection.map(Arc::new))
                .with_rechunk(rechunk)
                .with_chunk_size(chunk_size)
                .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
                .with_n_threads(n_threads)
                .with_schema_overwrite(overwrite_dtype.map(Arc::new))
                .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
                .with_schema(schema.map(|schema| Arc::new(schema.0)))
                .with_low_memory(low_memory)
                .with_skip_rows_after_header(skip_rows_after_header)
                .with_row_index(row_index)
                .with_raise_if_empty(raise_if_empty)
                .with_parse_options(
                    CsvParseOptions::default()
                        .with_separator(separator.as_bytes()[0])
                        .with_encoding(encoding.0)
                        .with_missing_is_null(!missing_utf8_is_empty_string)
                        .with_comment_prefix(comment_prefix)
                        .with_null_values(null_values)
                        .with_try_parse_dates(try_parse_dates)
                        .with_quote_char(quote_char)
                        .with_eol_char(eol_char)
                        .with_truncate_ragged_lines(truncate_ragged_lines)
                        .with_decimal_comma(decimal_comma),
                )
                .into_reader_with_file_handle(mmap_bytes_r)
                .finish()
        })
    }

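    /// Read a Parquet file into a `DataFrame`.
    ///
    /// A Python file-like object is first copied into an in-memory buffer,
    /// while a native file handle is passed straight to the reader.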
    #[staticmethod]
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, low_memory, parallel, use_statistics, rechunk))]
    pub fn read_parquet(
        py: Python,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        parallel: Wrap<ParallelStrategy>,
        use_statistics: bool,
        rechunk: bool,
    ) -> PyResult<Self> {
        use EitherRustPythonFile::*;

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        // Accepted for backwards compatibility with the Python API; currently unused.
        _ = use_statistics;

        match get_either_file(py_f, false)? {
            Py(f) => {
                // Python file-like object: copy its contents into an in-memory buffer.
                let buf = std::io::Cursor::new(f.to_memslice());
                py.enter_polars_df(move || {
                    ParquetReader::new(buf)
                        .with_projection(projection)
                        .with_columns(columns)
                        .read_parallel(parallel.0)
                        .with_slice(n_rows.map(|x| (0, x)))
                        .with_row_index(row_index)
                        .set_low_memory(low_memory)
                        .set_rechunk(rechunk)
                        .finish()
                })
            },
            Rust(f) => py.enter_polars_df(move || {
                ParquetReader::new(f)
                    .with_projection(projection)
                    .with_columns(columns)
                    .read_parallel(parallel.0)
                    .with_slice(n_rows.map(|x| (0, x)))
                    .with_row_index(row_index)
                    .set_rechunk(rechunk)
                    .finish()
            }),
        }
    }

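    /// Read a JSON document (one array of objects) into a `DataFrame`.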
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
    pub fn read_json(
        py: Python,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        // An inference length of zero is meaningless; the Python layer must not pass it.
        assert!(infer_schema_length != Some(0));
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        py.enter_polars_df(move || {
            let mut builder = JsonReader::new(mmap_bytes_r)
                .with_json_format(JsonFormat::Json)
                .infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));

            if let Some(schema) = schema {
                builder = builder.with_schema(Arc::new(schema.0));
            }

            if let Some(schema) = schema_overrides.as_ref() {
                builder = builder.with_schema_overwrite(&schema.0);
            }

            builder.finish()
        })
    }

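    /// Read newline-delimited JSON (NDJSON) into a `DataFrame`.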
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, ignore_errors, schema, schema_overrides))]
    pub fn read_ndjson(
        py: Python,
        py_f: Bound<PyAny>,
        ignore_errors: bool,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        let mut builder = JsonReader::new(mmap_bytes_r)
            .with_json_format(JsonFormat::JsonLines)
            .with_ignore_errors(ignore_errors);

        if let Some(schema) = schema {
            builder = builder.with_schema(Arc::new(schema.0));
        }

        if let Some(schema) = schema_overrides.as_ref() {
            builder = builder.with_schema_overwrite(&schema.0);
        }

        py.enter_polars_df(move || builder.finish())
    }

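    /// Read an Arrow IPC (Feather) file into a `DataFrame`, optionally
    /// memory-mapping the file when a real path is available.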
    #[staticmethod]
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
    pub fn read_ipc(
        py: Python,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        memory_map: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;

        // Memory-map only when requested; `mmap_path` is `None` for file-like objects.
        let mmap_path = if memory_map { mmap_path } else { None };
        py.enter_polars_df(move || {
            IpcReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .memory_mapped(mmap_path)
                .finish()
        })
    }

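    /// Read the Arrow IPC streaming format into a `DataFrame`.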
    #[staticmethod]
    #[cfg(feature = "ipc_streaming")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
    pub fn read_ipc_stream(
        py: Python,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        rechunk: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        py.enter_polars_df(move || {
            IpcStreamReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .set_rechunk(rechunk)
                .finish()
        })
    }

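    /// Read an Apache Avro file into a `DataFrame`.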
    #[staticmethod]
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, columns, projection, n_rows))]
    pub fn read_avro(
        py: Python,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
    ) -> PyResult<Self> {
        use polars::io::avro::AvroReader;

        let file = get_file_like(py_f, false)?;
        py.enter_polars_df(move || {
            AvroReader::new(file)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .finish()
        })
    }

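    /// Write this `DataFrame` as CSV to a path or file-like object.
    ///
    /// When `py_f` is a string it may name a cloud URI; `cloud_options`,
    /// `credential_provider`, and `retries` are only consulted in that case.
    /// A sketch of the corresponding Python call (the public wrapper is
    /// `DataFrame.write_csv`):
    ///
    /// ```python
    /// df.write_csv("out.csv", separator=";", include_header=True)
    /// ```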
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        py_f, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
        quote_style, cloud_options, credential_provider, retries
    ))]
    pub fn write_csv(
        &mut self,
        py: Python,
        py_f: PyObject,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let null = null_value.unwrap_or_default();

        // Cloud options only apply when the target is a path; file-like
        // objects are written to directly.
        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.enter_polars(|| {
            CsvWriter::new(&mut f)
                .include_bom(include_bom)
                .include_header(include_header)
                .with_separator(separator)
                .with_line_terminator(line_terminator)
                .with_quote_char(quote_char)
                .with_batch_size(batch_size)
                .with_datetime_format(datetime_format)
                .with_date_format(date_format)
                .with_time_format(time_format)
                .with_float_scientific(float_scientific)
                .with_float_precision(float_precision)
                .with_null_value(null)
                .with_quote_style(quote_style.map(|wrap| wrap.0).unwrap_or_default())
                .finish(&mut self.df)?;

            crate::file::close_file(f)
        })
    }

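    /// Write this `DataFrame` as Parquet.
    ///
    /// When `partition_by` is set, a partitioned dataset is written under the
    /// given directory path (which must be a string) instead of a single file.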
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        py_f, compression, compression_level, statistics, row_group_size, data_page_size,
        partition_by, partition_chunk_size_bytes, cloud_options, credential_provider, retries
    ))]
    pub fn write_parquet(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        partition_by: Option<Vec<String>>,
        partition_chunk_size_bytes: usize,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        use polars_io::partition::write_partitioned_dataset;

        let compression = parse_parquet_compression(compression, compression_level)?;

        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        // Partitioned writes take a directory path and split the frame by the
        // given key columns; this branch requires `py_f` to be a string.
        if let Some(partition_by) = partition_by {
            let path = py_f.extract::<String>(py)?;

            return py.enter_polars(|| {
                let write_options = ParquetWriteOptions {
                    compression,
                    statistics: statistics.0,
                    row_group_size,
                    data_page_size,
                };
                write_partitioned_dataset(
                    &mut self.df,
                    std::path::Path::new(path.as_str()),
                    partition_by.into_iter().map(|x| x.into()).collect(),
                    &write_options,
                    cloud_options.as_ref(),
                    partition_chunk_size_bytes,
                )
            });
        }

        let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.enter_polars(|| {
            ParquetWriter::new(BufWriter::new(&mut f))
                .with_compression(compression)
                .with_statistics(statistics.0)
                .with_row_group_size(row_group_size)
                .with_data_page_size(data_page_size)
                .finish(&mut self.df)?;

            crate::file::close_file(f)
        })
    }

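    /// Serialize this `DataFrame` as a single JSON array of row objects.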
    #[cfg(feature = "json")]
    pub fn write_json(&mut self, py: Python, py_f: PyObject) -> PyResult<()> {
        let file = BufWriter::new(get_file_like(py_f, true)?);
        py.enter_polars(|| {
            JsonWriter::new(file)
                .with_json_format(JsonFormat::Json)
                .finish(&mut self.df)
        })
    }

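    /// Serialize this `DataFrame` as newline-delimited JSON, one object per row.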
    #[cfg(feature = "json")]
    pub fn write_ndjson(&mut self, py: Python, py_f: PyObject) -> PyResult<()> {
        let file = BufWriter::new(get_file_like(py_f, true)?);

        py.enter_polars(|| {
            JsonWriter::new(file)
                .with_json_format(JsonFormat::JsonLines)
                .finish(&mut self.df)
                .map_err(PyPolarsErr::from)
        })
    }

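    /// Write this `DataFrame` in the Arrow IPC (Feather) file format,
    /// optionally compressed, to a path, cloud URI, or file-like object.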
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        py_f, compression, compat_level, cloud_options, credential_provider, retries
    ))]
    pub fn write_ipc(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_builder),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.enter_polars(|| {
            IpcWriter::new(&mut f)
                .with_compression(compression.0)
                .with_compat_level(compat_level.0)
                .finish(&mut self.df)?;

            crate::file::close_file(f)
        })
    }

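    /// Write this `DataFrame` in the Arrow IPC streaming format.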
    #[cfg(feature = "ipc_streaming")]
    pub fn write_ipc_stream(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
    ) -> PyResult<()> {
        let mut buf = get_file_like(py_f, true)?;
        py.enter_polars(|| {
            IpcStreamWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_compat_level(compat_level.0)
                .finish(&mut self.df)
        })
    }

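    /// Write this `DataFrame` as Avro, using `name` as the top-level record name.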
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, compression, name))]
    pub fn write_avro(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<AvroCompression>>,
        name: String,
    ) -> PyResult<()> {
        use polars::io::avro::AvroWriter;
        let mut buf = get_file_like(py_f, true)?;
        py.enter_polars(|| {
            AvroWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_name(name)
                .finish(&mut self.df)
        })
    }
}