use std::borrow::Cow;
use std::io::BufWriter;
use std::num::NonZeroUsize;
use std::sync::Arc;

#[cfg(feature = "cloud")]
use cloud::credential_provider::PlCredentialProvider;
#[cfg(feature = "avro")]
use polars::io::avro::AvroCompression;
use polars::io::RowIndex;
use polars::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;

use super::PyDataFrame;
#[cfg(feature = "parquet")]
use crate::conversion::parse_parquet_compression;
use crate::conversion::Wrap;
use crate::error::PyPolarsErr;
use crate::file::{
    get_either_file, get_file_like, get_mmap_bytes_reader, get_mmap_bytes_reader_and_path,
    EitherRustPythonFile,
};
#[cfg(feature = "cloud")]
use crate::prelude::parse_cloud_options;
use crate::prelude::PyCompatLevel;
#[pymethods]
impl PyDataFrame {
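    /// Read a `DataFrame` from CSV.
    ///
    /// `py_f` may be a path, `bytes`, or a file-like object; the input is
    /// memory-mapped where possible and parsing runs with the GIL released.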
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
        skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
        overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
        null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
        row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
    )]
    pub fn read_csv(
        py: Python,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        chunk_size: usize,
        has_header: bool,
        ignore_errors: bool,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        projection: Option<Vec<usize>>,
        separator: &str,
        rechunk: bool,
        columns: Option<Vec<String>>,
        encoding: Wrap<CsvEncoding>,
        n_threads: Option<usize>,
        path: Option<String>,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        try_parse_dates: bool,
        skip_rows_after_header: usize,
        row_index: Option<(String, IdxSize)>,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        schema: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        let null_values = null_values.map(|w| w.0);
        let eol_char = eol_char.as_bytes()[0];
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|(name, dtype)| {
                    let dtype = dtype.0.clone();
                    Field::new((&**name).into(), dtype)
                })
                .collect::<Schema>()
        });

        let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
            overwrite_dtype
                .iter()
                .map(|dt| dt.0.clone())
                .collect::<Vec<_>>()
        });

        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        let df = py.allow_threads(move || {
            CsvReadOptions::default()
                .with_path(path)
                .with_infer_schema_length(infer_schema_length)
                .with_has_header(has_header)
                .with_n_rows(n_rows)
                .with_skip_rows(skip_rows)
                .with_skip_lines(skip_lines)
                .with_ignore_errors(ignore_errors)
                .with_projection(projection.map(Arc::new))
                .with_rechunk(rechunk)
                .with_chunk_size(chunk_size)
                .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
                .with_n_threads(n_threads)
                .with_schema_overwrite(overwrite_dtype.map(Arc::new))
                .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
                .with_schema(schema.map(|schema| Arc::new(schema.0)))
                .with_low_memory(low_memory)
                .with_skip_rows_after_header(skip_rows_after_header)
                .with_row_index(row_index)
                .with_raise_if_empty(raise_if_empty)
                .with_parse_options(
                    CsvParseOptions::default()
                        .with_separator(separator.as_bytes()[0])
                        .with_encoding(encoding.0)
                        .with_missing_is_null(!missing_utf8_is_empty_string)
                        .with_comment_prefix(comment_prefix)
                        .with_null_values(null_values)
                        .with_try_parse_dates(try_parse_dates)
                        .with_quote_char(quote_char)
                        .with_eol_char(eol_char)
                        .with_truncate_ragged_lines(truncate_ragged_lines)
                        .with_decimal_comma(decimal_comma),
                )
                .into_reader_with_file_handle(mmap_bytes_r)
                .finish()
                .map_err(PyPolarsErr::from)
        })?;
        Ok(df.into())
    }

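    /// Read a `DataFrame` from Parquet.
    ///
    /// Python file-like objects are first materialized as an in-memory
    /// slice; native Rust files are handed to the reader directly.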
    #[staticmethod]
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, low_memory, parallel, use_statistics, rechunk))]
    pub fn read_parquet(
        py: Python,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        low_memory: bool,
        parallel: Wrap<ParallelStrategy>,
        use_statistics: bool,
        rechunk: bool,
    ) -> PyResult<Self> {
        use EitherRustPythonFile::*;

        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let result = match get_either_file(py_f, false)? {
            Py(f) => {
                let buf = std::io::Cursor::new(f.to_memslice());
                py.allow_threads(move || {
                    ParquetReader::new(buf)
                        .with_projection(projection)
                        .with_columns(columns)
                        .read_parallel(parallel.0)
                        .with_slice(n_rows.map(|x| (0, x)))
                        .with_row_index(row_index)
                        .set_low_memory(low_memory)
                        .use_statistics(use_statistics)
                        .set_rechunk(rechunk)
                        .finish()
                })
            },
            Rust(f) => py.allow_threads(move || {
                ParquetReader::new(f)
                    .with_projection(projection)
                    .with_columns(columns)
                    .read_parallel(parallel.0)
                    .with_slice(n_rows.map(|x| (0, x)))
                    .with_row_index(row_index)
                    .use_statistics(use_statistics)
                    .set_rechunk(rechunk)
                    .finish()
            }),
        };
        let df = result.map_err(PyPolarsErr::from)?;
        Ok(PyDataFrame::new(df))
    }

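    /// Read a `DataFrame` from standard (non-line-delimited) JSON.
    ///
    /// `infer_schema_length` must not be `Some(0)`; a full `schema` replaces
    /// inference, while `schema_overrides` only patches inferred dtypes.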
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
    pub fn read_json(
        py: Python,
        py_f: Bound<PyAny>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        assert!(infer_schema_length != Some(0));
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        py.allow_threads(move || {
            let mut builder = JsonReader::new(mmap_bytes_r)
                .with_json_format(JsonFormat::Json)
                .infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));

            if let Some(schema) = schema {
                builder = builder.with_schema(Arc::new(schema.0));
            }

            if let Some(schema) = schema_overrides.as_ref() {
                builder = builder.with_schema_overwrite(&schema.0);
            }

            let out = builder.finish().map_err(PyPolarsErr::from)?;
            Ok(out.into())
        })
    }

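    /// Read a `DataFrame` from newline-delimited JSON (NDJSON).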
    #[staticmethod]
    #[cfg(feature = "json")]
    #[pyo3(signature = (py_f, ignore_errors, schema, schema_overrides))]
    pub fn read_ndjson(
        py: Python,
        py_f: Bound<PyAny>,
        ignore_errors: bool,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
    ) -> PyResult<Self> {
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

        let mut builder = JsonReader::new(mmap_bytes_r)
            .with_json_format(JsonFormat::JsonLines)
            .with_ignore_errors(ignore_errors);

        if let Some(schema) = schema {
            builder = builder.with_schema(Arc::new(schema.0));
        }

        if let Some(schema) = schema_overrides.as_ref() {
            builder = builder.with_schema_overwrite(&schema.0);
        }

        let out = py
            .allow_threads(move || builder.finish())
            .map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
        Ok(out.into())
    }

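    /// Read a `DataFrame` from an Arrow IPC (Feather) file.
    ///
    /// When `memory_map` is set and the source is a file on disk, the reader
    /// maps it instead of buffering the bytes in memory.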
    #[staticmethod]
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
    pub fn read_ipc(
        py: Python,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        memory_map: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;

        let mmap_path = if memory_map { mmap_path } else { None };
        let df = py.allow_threads(move || {
            IpcReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .memory_mapped(mmap_path)
                .finish()
                .map_err(PyPolarsErr::from)
        })?;
        Ok(PyDataFrame::new(df))
    }

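    /// Read a `DataFrame` from the Arrow IPC streaming format.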
    #[staticmethod]
    #[cfg(feature = "ipc_streaming")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
    pub fn read_ipc_stream(
        py: Python,
        py_f: Bound<PyAny>,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
        row_index: Option<(String, IdxSize)>,
        rechunk: bool,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });
        let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
        let df = py.allow_threads(move || {
            IpcStreamReader::new(mmap_bytes_r)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .with_row_index(row_index)
                .set_rechunk(rechunk)
                .finish()
                .map_err(PyPolarsErr::from)
        })?;
        Ok(PyDataFrame::new(df))
    }

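    /// Read a `DataFrame` from an Avro file.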
    #[staticmethod]
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, columns, projection, n_rows))]
    pub fn read_avro(
        py: Python,
        py_f: PyObject,
        columns: Option<Vec<String>>,
        projection: Option<Vec<usize>>,
        n_rows: Option<usize>,
    ) -> PyResult<Self> {
        use polars::io::avro::AvroReader;

        let file = get_file_like(py_f, false)?;
        let df = py.allow_threads(move || {
            AvroReader::new(file)
                .with_projection(projection)
                .with_columns(columns)
                .with_n_rows(n_rows)
                .finish()
                .map_err(PyPolarsErr::from)
        })?;
        Ok(PyDataFrame::new(df))
    }

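    /// Write this `DataFrame` to CSV.
    ///
    /// If `py_f` is a string it is treated as a path (possibly a cloud URI
    /// when the `cloud` feature is enabled); otherwise it must be a writable
    /// file-like object.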
    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        py_f, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
        quote_style, cloud_options, credential_provider, retries
    ))]
    pub fn write_csv(
        &mut self,
        py: Python,
        py_f: PyObject,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        let null = null_value.unwrap_or_default();

        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_func_object),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.allow_threads(|| {
            CsvWriter::new(f)
                .include_bom(include_bom)
                .include_header(include_header)
                .with_separator(separator)
                .with_line_terminator(line_terminator)
                .with_quote_char(quote_char)
                .with_batch_size(batch_size)
                .with_datetime_format(datetime_format)
                .with_date_format(date_format)
                .with_time_format(time_format)
                .with_float_scientific(float_scientific)
                .with_float_precision(float_precision)
                .with_null_value(null)
                .with_quote_style(quote_style.map(|wrap| wrap.0).unwrap_or_default())
                .finish(&mut self.df)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

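    /// Write this `DataFrame` to Parquet.
    ///
    /// With `partition_by`, `py_f` must be a path and a partitioned dataset
    /// is written instead of a single file.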
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        py_f, compression, compression_level, statistics, row_group_size, data_page_size,
        partition_by, partition_chunk_size_bytes, cloud_options, credential_provider, retries
    ))]
    pub fn write_parquet(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        partition_by: Option<Vec<String>>,
        partition_chunk_size_bytes: usize,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        use polars_io::partition::write_partitioned_dataset;

        let compression = parse_parquet_compression(compression, compression_level)?;

        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_func_object),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        if let Some(partition_by) = partition_by {
            let path = py_f.extract::<String>(py)?;

            py.allow_threads(|| {
                let write_options = ParquetWriteOptions {
                    compression,
                    statistics: statistics.0,
                    row_group_size,
                    data_page_size,
                    maintain_order: true,
                };
                write_partitioned_dataset(
                    &mut self.df,
                    std::path::Path::new(path.as_str()),
                    partition_by.into_iter().map(|x| x.into()).collect(),
                    &write_options,
                    cloud_options.as_ref(),
                    partition_chunk_size_bytes,
                )
                .map_err(PyPolarsErr::from)
            })?;

            return Ok(());
        };

        let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.allow_threads(|| {
            ParquetWriter::new(BufWriter::new(f))
                .with_compression(compression)
                .with_statistics(statistics.0)
                .with_row_group_size(row_group_size)
                .with_data_page_size(data_page_size)
                .finish(&mut self.df)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

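    /// Serialize this `DataFrame` as standard (non-line-delimited) JSON.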
    #[cfg(feature = "json")]
    pub fn write_json(&mut self, py_f: PyObject) -> PyResult<()> {
        let file = BufWriter::new(get_file_like(py_f, true)?);

        JsonWriter::new(file)
            .with_json_format(JsonFormat::Json)
            .finish(&mut self.df)
            .map_err(PyPolarsErr::from)?;
        Ok(())
    }

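    /// Write this `DataFrame` as newline-delimited JSON (NDJSON).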
    #[cfg(feature = "json")]
    pub fn write_ndjson(&mut self, py_f: PyObject) -> PyResult<()> {
        let file = BufWriter::new(get_file_like(py_f, true)?);

        JsonWriter::new(file)
            .with_json_format(JsonFormat::JsonLines)
            .finish(&mut self.df)
            .map_err(PyPolarsErr::from)?;
        Ok(())
    }

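    /// Write this `DataFrame` in the Arrow IPC (Feather) format, locally or
    /// to cloud storage when `py_f` is a URI and the `cloud` feature is on.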
    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        py_f, compression, compat_level, cloud_options, credential_provider, retries
    ))]
    pub fn write_ipc(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<PyObject>,
        retries: usize,
    ) -> PyResult<()> {
        #[cfg(feature = "cloud")]
        let cloud_options = if let Ok(path) = py_f.extract::<Cow<str>>(py) {
            let cloud_options = parse_cloud_options(&path, cloud_options.unwrap_or_default())?;
            Some(
                cloud_options
                    .with_max_retries(retries)
                    .with_credential_provider(
                        credential_provider.map(PlCredentialProvider::from_python_func_object),
                    ),
            )
        } else {
            None
        };

        #[cfg(not(feature = "cloud"))]
        let cloud_options = None;

        let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

        py.allow_threads(|| {
            IpcWriter::new(f)
                .with_compression(compression.0)
                .with_compat_level(compat_level.0)
                .finish(&mut self.df)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

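    /// Write this `DataFrame` in the Arrow IPC streaming format.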
    #[cfg(feature = "ipc_streaming")]
    pub fn write_ipc_stream(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
    ) -> PyResult<()> {
        let mut buf = get_file_like(py_f, true)?;
        py.allow_threads(|| {
            IpcStreamWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_compat_level(compat_level.0)
                .finish(&mut self.df)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }

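    /// Write this `DataFrame` as Avro, with `name` used as the record name.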
    #[cfg(feature = "avro")]
    #[pyo3(signature = (py_f, compression, name))]
    pub fn write_avro(
        &mut self,
        py: Python,
        py_f: PyObject,
        compression: Wrap<Option<AvroCompression>>,
        name: String,
    ) -> PyResult<()> {
        use polars::io::avro::AvroWriter;
        let mut buf = get_file_like(py_f, true)?;
        py.allow_threads(|| {
            AvroWriter::new(&mut buf)
                .with_compression(compression.0)
                .with_name(name)
                .finish(&mut self.df)
                .map_err(PyPolarsErr::from)
        })?;
        Ok(())
    }
}