polars_python/
batched_csv.rs1use std::path::PathBuf;
2use std::sync::Mutex;
3
4use polars::io::RowIndex;
5use polars::io::csv::read::OwnedBatchedCsvReader;
6use polars::io::mmap::MmapBytesReader;
7use polars::prelude::*;
8use polars_utils::open_file;
9use pyo3::prelude::*;
10use pyo3::pybacked::PyBackedStr;
11
12use crate::error::PyPolarsErr;
13use crate::utils::EnterPolarsExt;
14use crate::{PyDataFrame, Wrap};
15
16#[pyclass]
17#[repr(transparent)]
18pub struct PyBatchedCsv {
19 reader: Mutex<OwnedBatchedCsvReader>,
20}
21
22#[pymethods]
23#[allow(clippy::wrong_self_convention, clippy::should_implement_trait)]
24impl PyBatchedCsv {
25 #[staticmethod]
26 #[pyo3(signature = (
27 infer_schema_length, chunk_size, has_header, ignore_errors, n_rows, skip_rows, skip_lines,
28 projection, separator, rechunk, columns, encoding, n_threads, path, schema_overrides,
29 overwrite_dtype_slice, low_memory, comment_prefix, quote_char, null_values,
30 missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header, row_index,
31 eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma)
32 )]
33 fn new(
34 infer_schema_length: Option<usize>,
35 chunk_size: usize,
36 has_header: bool,
37 ignore_errors: bool,
38 n_rows: Option<usize>,
39 skip_rows: usize,
40 skip_lines: usize,
41 projection: Option<Vec<usize>>,
42 separator: &str,
43 rechunk: bool,
44 columns: Option<Vec<String>>,
45 encoding: Wrap<CsvEncoding>,
46 n_threads: Option<usize>,
47 path: PathBuf,
48 schema_overrides: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
49 overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
50 low_memory: bool,
51 comment_prefix: Option<&str>,
52 quote_char: Option<&str>,
53 null_values: Option<Wrap<NullValues>>,
54 missing_utf8_is_empty_string: bool,
55 try_parse_dates: bool,
56 skip_rows_after_header: usize,
57 row_index: Option<(String, IdxSize)>,
58 eol_char: &str,
59 raise_if_empty: bool,
60 truncate_ragged_lines: bool,
61 decimal_comma: bool,
62 ) -> PyResult<PyBatchedCsv> {
63 let null_values = null_values.map(|w| w.0);
64 let eol_char = eol_char.as_bytes()[0];
65 let row_index = row_index.map(|(name, offset)| RowIndex {
66 name: name.into(),
67 offset,
68 });
69 let quote_char = if let Some(s) = quote_char {
70 if s.is_empty() {
71 None
72 } else {
73 Some(s.as_bytes()[0])
74 }
75 } else {
76 None
77 };
78
79 let schema_overrides = schema_overrides.map(|overwrite_dtype| {
80 overwrite_dtype
81 .iter()
82 .map(|(name, dtype)| {
83 let dtype = dtype.0.clone();
84 Field::new((&**name).into(), dtype)
85 })
86 .collect::<Schema>()
87 });
88
89 let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {
90 overwrite_dtype
91 .iter()
92 .map(|dt| dt.0.clone())
93 .collect::<Vec<_>>()
94 });
95
96 let file = open_file(&path).map_err(PyPolarsErr::from)?;
97 let reader = Box::new(file) as Box<dyn MmapBytesReader>;
98 let reader = CsvReadOptions::default()
99 .with_infer_schema_length(infer_schema_length)
100 .with_has_header(has_header)
101 .with_n_rows(n_rows)
102 .with_skip_rows(skip_rows)
103 .with_skip_rows(skip_lines)
104 .with_ignore_errors(ignore_errors)
105 .with_projection(projection.map(Arc::new))
106 .with_rechunk(rechunk)
107 .with_chunk_size(chunk_size)
108 .with_columns(columns.map(|x| x.into_iter().map(PlSmallStr::from_string).collect()))
109 .with_n_threads(n_threads)
110 .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
111 .with_low_memory(low_memory)
112 .with_schema_overwrite(schema_overrides.map(Arc::new))
113 .with_skip_rows_after_header(skip_rows_after_header)
114 .with_row_index(row_index)
115 .with_raise_if_empty(raise_if_empty)
116 .with_parse_options(
117 CsvParseOptions::default()
118 .with_separator(separator.as_bytes()[0])
119 .with_encoding(encoding.0)
120 .with_missing_is_null(!missing_utf8_is_empty_string)
121 .with_comment_prefix(comment_prefix)
122 .with_null_values(null_values)
123 .with_try_parse_dates(try_parse_dates)
124 .with_quote_char(quote_char)
125 .with_eol_char(eol_char)
126 .with_truncate_ragged_lines(truncate_ragged_lines)
127 .with_decimal_comma(decimal_comma),
128 )
129 .into_reader_with_file_handle(reader);
130
131 let reader = reader.batched(None).map_err(PyPolarsErr::from)?;
132
133 Ok(PyBatchedCsv {
134 reader: Mutex::new(reader),
135 })
136 }
137
138 fn next_batches(&self, py: Python<'_>, n: usize) -> PyResult<Option<Vec<PyDataFrame>>> {
139 let reader = &self.reader;
140 let batches = py.enter_polars(move || reader.lock().unwrap().next_batches(n))?;
141
142 let batches = unsafe {
144 std::mem::transmute::<Option<Vec<DataFrame>>, Option<Vec<PyDataFrame>>>(batches)
145 };
146 Ok(batches)
147 }
148}