1use std::io::BufWriter;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4
5use polars::io::RowIndex;
6#[cfg(feature = "avro")]
7use polars::io::avro::AvroCompression;
8use polars::prelude::*;
9use pyo3::prelude::*;
10use pyo3::pybacked::PyBackedStr;
11
12use super::PyDataFrame;
13use crate::conversion::Wrap;
14use crate::file::{get_file_like, get_mmap_bytes_reader, get_mmap_bytes_reader_and_path};
15use crate::prelude::PyCompatLevel;
16use crate::utils::EnterPolarsExt;
17
18#[pymethods]
19impl PyDataFrame {
/// Eagerly read CSV data from `py_f` into a new `PyDataFrame`.
///
/// The arguments are forwarded to `CsvReadOptions` and `CsvParseOptions`.
/// Only the first byte of `separator`, `quote_char` and `eol_char` is used.
/// `missing_utf8_is_empty_string` is passed inverted to `with_missing_is_null`.
///
/// # Errors
///
/// Returns a Python `ValueError` when `separator` or `eol_char` is an empty
/// string. Errors from obtaining the byte reader for `py_f` and from the CSV
/// reader itself are propagated.
#[staticmethod]
#[cfg(feature = "csv")]
#[pyo3(signature = (
    py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
    skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
    overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
    null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
    row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
)]
pub fn read_csv(
    py: Python<'_>,
    py_f: Bound<PyAny>,
    infer_schema_length: Option<usize>,
    chunk_size: usize,
    has_header: bool,
    ignore_errors: bool,
    n_rows: Option<usize>,
    skip_rows: usize,
    skip_lines: usize,
    projection: Option<Vec<usize>>,
    separator: &str,
    rechunk: bool,
    columns: Option<Vec<String>>,
    encoding: Wrap<CsvEncoding>,
    n_threads: Option<usize>,
    path: Option<String>,
    overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
    overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
    low_memory: bool,
    comment_prefix: Option<&str>,
    quote_char: Option<&str>,
    null_values: Option<Wrap<NullValues>>,
    missing_utf8_is_empty_string: bool,
    try_parse_dates: bool,
    skip_rows_after_header: usize,
    row_index: Option<(String, IdxSize)>,
    eol_char: &str,
    raise_if_empty: bool,
    truncate_ragged_lines: bool,
    decimal_comma: bool,
    schema: Option<Wrap<Schema>>,
) -> PyResult<Self> {
    // These single-character options are reduced to their first byte. Reject an
    // empty string with a regular Python error instead of panicking on `[0]`.
    let separator = separator.as_bytes().first().copied().ok_or_else(|| {
        pyo3::exceptions::PyValueError::new_err("`separator` must not be an empty string")
    })?;
    let eol_char = eol_char.as_bytes().first().copied().ok_or_else(|| {
        pyo3::exceptions::PyValueError::new_err("`eol_char` must not be an empty string")
    })?;
    // An absent or empty `quote_char` yields `None`.
    let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());

    let null_values = null_values.map(|w| w.0);
    let row_index = row_index.map(|(name, offset)| RowIndex {
        name: name.into(),
        offset,
    });

    // The vectors are owned, so move the dtypes out rather than cloning them.
    let overwrite_dtype = overwrite_dtype.map(|fields| {
        fields
            .into_iter()
            .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
            .collect::<Schema>()
    });
    let overwrite_dtype_slice = overwrite_dtype_slice
        .map(|dtypes| dtypes.into_iter().map(|dt| dt.0).collect::<Vec<_>>());

    let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
    py.enter_polars_df(move || {
        CsvReadOptions::default()
            .with_path(path)
            .with_infer_schema_length(infer_schema_length)
            .with_has_header(has_header)
            .with_n_rows(n_rows)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_ignore_errors(ignore_errors)
            .with_projection(projection.map(Arc::new))
            .with_rechunk(rechunk)
            .with_chunk_size(chunk_size)
            .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
            .with_n_threads(n_threads)
            .with_schema_overwrite(overwrite_dtype.map(Arc::new))
            .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_row_index(row_index)
            .with_raise_if_empty(raise_if_empty)
            .with_parse_options(
                CsvParseOptions::default()
                    .with_separator(separator)
                    .with_encoding(encoding.0)
                    .with_missing_is_null(!missing_utf8_is_empty_string)
                    .with_comment_prefix(comment_prefix)
                    .with_null_values(null_values)
                    .with_try_parse_dates(try_parse_dates)
                    .with_quote_char(quote_char)
                    .with_eol_char(eol_char)
                    .with_truncate_ragged_lines(truncate_ragged_lines)
                    .with_decimal_comma(decimal_comma),
            )
            .into_reader_with_file_handle(mmap_bytes_r)
            .finish()
    })
}
126
/// Read JSON (`JsonFormat::Json`) from `py_f` into a new `PyDataFrame`.
///
/// `schema`, when given, is passed to the reader via `with_schema`;
/// `schema_overrides`, when given, is passed via `with_schema_overwrite`.
///
/// # Errors
///
/// Returns a Python `ValueError` when `infer_schema_length` is `Some(0)`.
/// Errors from obtaining the byte reader for `py_f` and from the JSON reader
/// are propagated.
#[staticmethod]
#[cfg(feature = "json")]
#[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
pub fn read_json(
    py: Python<'_>,
    py_f: Bound<PyAny>,
    infer_schema_length: Option<usize>,
    schema: Option<Wrap<Schema>>,
    schema_overrides: Option<Wrap<Schema>>,
) -> PyResult<Self> {
    // Zero is a caller-supplied value, so report it as a Python error rather
    // than aborting through a failed assertion.
    let infer_schema_length = infer_schema_length
        .map(|n| {
            NonZeroUsize::new(n).ok_or_else(|| {
                pyo3::exceptions::PyValueError::new_err(
                    "`infer_schema_length` must be greater than zero",
                )
            })
        })
        .transpose()?;
    let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

    py.enter_polars_df(move || {
        let mut builder = JsonReader::new(mmap_bytes_r)
            .with_json_format(JsonFormat::Json)
            .infer_schema_len(infer_schema_length);

        if let Some(schema) = schema {
            builder = builder.with_schema(Arc::new(schema.0));
        }

        if let Some(schema) = schema_overrides.as_ref() {
            builder = builder.with_schema_overwrite(&schema.0);
        }

        builder.finish()
    })
}
156
/// Read data from `py_f` with `IpcReader` into a new `PyDataFrame`.
///
/// `columns`, `projection`, `n_rows` and `row_index` are forwarded to the
/// reader unchanged. The path obtained for `py_f` is handed to
/// `memory_mapped` only when `memory_map` is `true`; otherwise `None` is passed.
///
/// # Errors
///
/// Propagates errors from obtaining the byte reader and from the IPC reader.
#[staticmethod]
#[cfg(feature = "ipc")]
#[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
pub fn read_ipc(
    py: Python<'_>,
    py_f: Bound<PyAny>,
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    n_rows: Option<usize>,
    row_index: Option<(String, IdxSize)>,
    memory_map: bool,
) -> PyResult<Self> {
    let (mmap_bytes_r, source_path) = get_mmap_bytes_reader_and_path(&py_f)?;
    // Drop the path unless memory mapping was requested.
    let mmap_path = source_path.filter(|_| memory_map);
    let row_index = row_index.map(|(index_name, start)| RowIndex {
        name: index_name.into(),
        offset: start,
    });

    py.enter_polars_df(move || {
        let reader = IpcReader::new(mmap_bytes_r)
            .with_projection(projection)
            .with_columns(columns)
            .with_n_rows(n_rows)
            .with_row_index(row_index)
            .memory_mapped(mmap_path);
        reader.finish()
    })
}
186
/// Read data from `py_f` with `IpcStreamReader` into a new `PyDataFrame`.
///
/// `columns`, `projection`, `n_rows`, `row_index` and `rechunk` are forwarded
/// to the reader unchanged.
///
/// # Errors
///
/// Propagates errors from obtaining the byte reader and from the stream reader.
#[staticmethod]
#[cfg(feature = "ipc_streaming")]
#[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
pub fn read_ipc_stream(
    py: Python<'_>,
    py_f: Bound<PyAny>,
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    n_rows: Option<usize>,
    row_index: Option<(String, IdxSize)>,
    rechunk: bool,
) -> PyResult<Self> {
    let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
    let row_index = row_index.map(|(index_name, start)| RowIndex {
        name: index_name.into(),
        offset: start,
    });

    py.enter_polars_df(move || {
        let reader = IpcStreamReader::new(mmap_bytes_r)
            .with_projection(projection)
            .with_columns(columns)
            .with_n_rows(n_rows)
            .with_row_index(row_index)
            .set_rechunk(rechunk);
        reader.finish()
    })
}
214
/// Read data from the file-like object `py_f` with `AvroReader` into a new
/// `PyDataFrame`.
///
/// `columns`, `projection` and `n_rows` are forwarded to the reader unchanged.
///
/// # Errors
///
/// Propagates errors from `get_file_like` and from the Avro reader.
#[staticmethod]
#[cfg(feature = "avro")]
#[pyo3(signature = (py_f, columns, projection, n_rows))]
pub fn read_avro(
    py: Python<'_>,
    py_f: PyObject,
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    n_rows: Option<usize>,
) -> PyResult<Self> {
    let source = get_file_like(py_f, false)?;
    py.enter_polars_df(move || {
        let reader = polars::io::avro::AvroReader::new(source)
            .with_projection(projection)
            .with_columns(columns)
            .with_n_rows(n_rows);
        reader.finish()
    })
}
236
/// Write this frame as JSON (`JsonFormat::Json`) to the file-like object `py_f`.
///
/// Output goes through a `BufWriter`, which is flushed explicitly once the
/// frame has been serialized.
///
/// # Errors
///
/// Propagates errors from `get_file_like`, from the JSON writer, and from the
/// final flush of the buffered output.
#[cfg(feature = "json")]
pub fn write_json(&mut self, py: Python<'_>, py_f: PyObject) -> PyResult<()> {
    let mut file = BufWriter::new(get_file_like(py_f, true)?);
    py.enter_polars(|| {
        JsonWriter::new(&mut file)
            .with_json_format(JsonFormat::Json)
            .finish(&mut self.df)?;
        // `BufWriter` ignores write errors when it is flushed on drop, so flush
        // here to surface a failed final write instead of reporting success.
        std::io::Write::flush(&mut file)?;
        PolarsResult::Ok(())
    })
}
248
/// Write this frame to the file-like object `py_f` with `IpcStreamWriter`.
///
/// `compression` and `compat_level` are unwrapped and forwarded to the writer.
///
/// # Errors
///
/// Propagates errors from `get_file_like` and from the stream writer.
#[cfg(feature = "ipc_streaming")]
pub fn write_ipc_stream(
    &mut self,
    py: Python<'_>,
    py_f: PyObject,
    compression: Wrap<Option<IpcCompression>>,
    compat_level: PyCompatLevel,
) -> PyResult<()> {
    let mut sink = get_file_like(py_f, true)?;
    py.enter_polars(|| {
        let mut writer = IpcStreamWriter::new(&mut sink)
            .with_compression(compression.0)
            .with_compat_level(compat_level.0);
        writer.finish(&mut self.df)
    })
}
265
/// Write this frame to the file-like object `py_f` with `AvroWriter`.
///
/// `compression` is unwrapped and forwarded to the writer together with `name`.
///
/// # Errors
///
/// Propagates errors from `get_file_like` and from the Avro writer.
#[cfg(feature = "avro")]
#[pyo3(signature = (py_f, compression, name))]
pub fn write_avro(
    &mut self,
    py: Python<'_>,
    py_f: PyObject,
    compression: Wrap<Option<AvroCompression>>,
    name: String,
) -> PyResult<()> {
    let mut sink = get_file_like(py_f, true)?;
    py.enter_polars(|| {
        let mut writer = polars::io::avro::AvroWriter::new(&mut sink)
            .with_compression(compression.0)
            .with_name(name);
        writer.finish(&mut self.df)
    })
}
284}