use std::cmp;
use std::iter::Iterator;
use std::num::NonZeroUsize;
use std::sync::Arc;

use polars_buffer::Buffer;
use polars_core::prelude::Schema;
use polars_core::schema::SchemaRef;
use polars_error::{PolarsResult, polars_bail, polars_ensure};

use crate::csv::read::schema_inference::infer_file_schema_impl;
use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
use crate::prelude::{CsvParseOptions, CsvReadOptions};
use crate::utils::compression::CompressedReader;

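/// Callback that receives the raw bytes of the first content row, i.e. the first row that is not
/// skipped, not a comment and not the header.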
pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;

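/// Reads from the start of a (possibly compressed) CSV stream, applying the `skip_lines` /
/// `skip_rows` / header handling from `options`, and infers a schema from at most
/// `infer_schema_length` content rows.
///
/// Returns the inferred schema together with the leftover bytes that were read but not consumed,
/// so the caller can continue parsing data rows without re-reading them.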
#[inline(never)]
pub fn read_until_start_and_infer_schema(
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
    reader: &mut CompressedReader,
) -> PolarsResult<(Schema, Buffer<u8>)> {
    const ESTIMATED_BYTES_PER_ROW: usize = 200;

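    // Per-line state machine: leading empty lines -> rows skipped before the header -> the header
    // itself -> rows skipped after the header -> (optional) inspection of the first content row ->
    // collecting rows for schema inference -> done.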
    #[derive(Copy, Clone)]
    enum State {
        SkipEmpty,
        SkipRowsBeforeHeader(usize),
        SkipHeader(bool),
        SkipRowsAfterHeader(usize),
        ContentInspect,
        InferCollect,
        Done,
    }

    polars_ensure!(
        !(options.skip_lines != 0 && options.skip_rows != 0),
        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
    );

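    // `skip_lines` skips raw lines up front by splitting on the EOL character only (no quote
    // handling), before any CSV-aware processing starts.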
    let prev_leftover = skip_lines_naive(
        options.parse_options.eol_char,
        options.skip_lines,
        options.raise_if_empty,
        reader,
    )?;

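    // Starting state: with a header, first drop leading empty lines; without a header, either the
    // naive `skip_lines` pass above already positioned us (go straight to the header state, which
    // falls through when `has_header` is false), or we begin by skipping `skip_rows` rows.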
    let mut state = if options.has_header {
        State::SkipEmpty
    } else if options.skip_lines != 0 {
        State::SkipHeader(false)
    } else {
        State::SkipRowsBeforeHeader(options.skip_rows)
    };

    let comment_prefix = options.parse_options.comment_prefix.as_ref();
    let infer_schema_length = options.infer_schema_length.unwrap_or(usize::MAX);

    let mut header_line = None;
    let mut content_lines = Vec::with_capacity(options.infer_schema_length.unwrap_or_else(|| {
        reader
            .total_len_estimate()
            .saturating_div(ESTIMATED_BYTES_PER_ROW)
    }));

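    // Size the first read so that the requested number of inference rows likely fits in a single
    // chunk, assuming roughly `ESTIMATED_BYTES_PER_ROW` bytes per row.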
    let initial_read_size = options
        .infer_schema_length
        .map(|isl| {
            cmp::max(
                CompressedReader::initial_read_size(),
                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
            )
        })
        .unwrap_or(usize::MAX);

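    // Drive the state machine over every line; the returned `LineUse` tells the line iterator
    // whether the line can be discarded, must be kept for inference, or whether we are done.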
    let leftover = for_each_line_from_reader(
        &options.parse_options,
        true,
        prev_leftover,
        initial_read_size,
        reader,
        |mem_slice_line| {
            let line = &*mem_slice_line;

            let done = loop {
                match &mut state {
                    State::SkipEmpty => {
                        if line.is_empty() || line == b"\r" {
                            break LineUse::ConsumeDiscard;
                        }

                        state = State::SkipRowsBeforeHeader(options.skip_rows);
                    },
                    State::SkipRowsBeforeHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::SkipHeader(false);
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipHeader(did_skip) => {
                        if !options.has_header || *did_skip {
                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
                            continue;
                        }

                        header_line = Some(mem_slice_line.clone());
                        *did_skip = true;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipRowsAfterHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::ContentInspect;
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::ContentInspect => {
                        if let Some(func) = &mut inspect_first_content_row_fn {
                            func(line);
                        }

                        state = State::InferCollect;
                    },
                    State::InferCollect => {
                        if !is_comment_line(line, comment_prefix) {
                            content_lines.push(mem_slice_line.clone());
                            if content_lines.len() >= infer_schema_length {
                                state = State::Done;
                                continue;
                            }
                        }

                        break LineUse::ConsumeKeep;
                    },
                    State::Done => {
                        break LineUse::Done;
                    },
                }
            };

            Ok(done)
        },
    )?;

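    // An `infer_schema_length` of 0 disables type inference: every column is read as String.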
    let infer_all_as_str = infer_schema_length == 0;

    let inferred_schema = infer_schema(
        &header_line,
        &content_lines,
        infer_all_as_str,
        options,
        projected_schema,
    )?;

    Ok((inferred_schema, leftover))
}

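/// What the per-line callback decided about a line: drop it, keep it (and everything after it) as
/// part of the leftover buffer, or stop iterating entirely.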
enum LineUse {
    ConsumeDiscard,
    ConsumeKeep,
    Done,
}

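/// Repeatedly reads chunks from `reader` and calls `line_fn` for every complete line, growing the
/// read size until whole lines fit. Lines for which `line_fn` returns `ConsumeKeep`, and everything
/// following them, are retained; the retained bytes are returned as the leftover buffer once
/// `line_fn` reports `Done` or the reader is exhausted.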
fn for_each_line_from_reader(
    parse_options: &CsvParseOptions,
    is_file_start: bool,
    mut prev_leftover: Buffer<u8>,
    initial_read_size: usize,
    reader: &mut CompressedReader,
    mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
) -> PolarsResult<Buffer<u8>> {
    let mut is_first_line = is_file_start;

    let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_CHUNK_SIZE")
        .map(|x| {
            x.parse::<NonZeroUsize>()
                .unwrap_or_else(|_| {
                    panic!("invalid value for POLARS_FORCE_CSV_INFER_CHUNK_SIZE: {x}")
                })
                .get()
        })
        .ok();

    let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
    let mut retain_offset = None;

    loop {
        let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        if slice.is_empty() {
            return Ok(Buffer::new());
        }

        if is_first_line {
            is_first_line = false;
            const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
            if slice.get(0..3) == UTF8_BOM_MARKER {
                slice = slice.sliced(3..);
            }
        }

        let line_to_sub_slice = |line: &[u8]| {
            let start = line.as_ptr() as usize - slice.as_ptr() as usize;
            slice.clone().sliced(start..(start + line.len()))
        };

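        // `slice` begins with the retained bytes from previous iterations; when `retain_offset`
        // is set, lines up to that offset have already been handed to `line_fn`, so splitting
        // starts from there.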
        let effective_slice = if let Some(offset) = retain_offset {
            slice.clone().sliced(offset..)
        } else {
            slice.clone()
        };

        let mut lines = SplitLines::new(
            &effective_slice,
            parse_options.quote_char,
            parse_options.eol_char,
            parse_options.comment_prefix.as_ref(),
        );
        let Some(mut prev_line) = lines.next() else {
            read_size = read_size.saturating_mul(2);
            prev_leftover = slice;
            continue;
        };

        let mut should_ret = false;

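        // Feed every line except the last one to `line_fn`; the last line may still be incomplete
        // (its EOL might only arrive with the next read), so it is handled separately below.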
        for next_line in lines {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
                LineUse::ConsumeKeep => {
                    if retain_offset.is_none() {
                        let retain_start_offset =
                            prev_line.as_ptr() as usize - slice.as_ptr() as usize;
                        prev_leftover = slice.clone().sliced(retain_start_offset..);
                        retain_offset = Some(0);
                    }
                },
                LineUse::Done => {
                    should_ret = true;
                    break;
                },
            }
            prev_line = next_line;
        }

        let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;

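        // A short read means the reader is exhausted, so the trailing line is complete even
        // without a terminating EOL character and must also be handed to `line_fn`.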
        if bytes_read < read_size {
            match line_fn(line_to_sub_slice(prev_line))? {
                LineUse::ConsumeDiscard => {
                    debug_assert!(retain_offset.is_none());
                    unconsumed_offset += prev_line.len();
                    if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
                        unconsumed_offset += 1;
                    }
                },
                LineUse::ConsumeKeep | LineUse::Done => (),
            }
            should_ret = true;
        }

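        // Update the leftover bookkeeping: without retained lines the leftover is simply the
        // unconsumed tail; with retained lines the leftover starts at the first kept line and
        // `retain_offset` tracks how much of it has already been processed.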
        if let Some(offset) = &mut retain_offset {
            if *offset == 0 {
                *offset = unconsumed_offset - (slice.len() - prev_leftover.len());
            } else {
                prev_leftover = slice;
                *offset += unconsumed_offset;
            }
        } else {
            prev_leftover = slice.sliced(unconsumed_offset..);
        }

        if should_ret {
            return Ok(prev_leftover);
        }

        if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
            read_size *= 4;
        }
    }
}

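/// Skips `skip_lines` lines from the start of the stream by scanning for `eol_char` only, without
/// any quote or comment handling. Returns the unconsumed bytes following the last skipped line.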
fn skip_lines_naive(
    eol_char: u8,
    skip_lines: usize,
    raise_if_empty: bool,
    reader: &mut CompressedReader,
) -> PolarsResult<Buffer<u8>> {
    let mut prev_leftover = Buffer::new();

    if skip_lines == 0 {
        return Ok(prev_leftover);
    }

    let mut remaining = skip_lines;
    let mut read_size = CompressedReader::initial_read_size();

    loop {
        let (slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
        let mut bytes: &[u8] = &slice;

        'inner: loop {
            let Some(mut pos) = memchr::memchr(eol_char, bytes) else {
                read_size = read_size.saturating_mul(2);
                break 'inner;
            };
            pos = cmp::min(pos + 1, bytes.len());

            bytes = &bytes[pos..];
            remaining -= 1;

            if remaining == 0 {
                let unconsumed_offset = bytes.as_ptr() as usize - slice.as_ptr() as usize;
                prev_leftover = slice.sliced(unconsumed_offset..);
                return Ok(prev_leftover);
            }
        }

        if bytes_read == 0 {
            if raise_if_empty {
                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
            } else {
                return Ok(Buffer::new());
            }
        }

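        // The remaining bytes belong to a line that will be skipped anyway, so they do not need
        // to be carried over into the next read.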
        prev_leftover = Buffer::new();

        if read_size < CompressedReader::ideal_read_size() {
            read_size *= 4;
        }
    }
}

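/// Builds the final schema from the collected header and content lines, then applies the
/// user-provided `schema`, `dtype_overwrite` and projected dtypes on top of the inferred one.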
fn infer_schema(
    header_line: &Option<Buffer<u8>>,
    content_lines: &[Buffer<u8>],
    infer_all_as_str: bool,
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
) -> PolarsResult<Schema> {
    let has_no_inference_data = if options.has_header {
        header_line.is_none()
    } else {
        content_lines.is_empty()
    };

    if options.raise_if_empty && has_no_inference_data {
        polars_bail!(NoData: "empty CSV");
    }

    let mut inferred_schema = if has_no_inference_data {
        Schema::default()
    } else {
        infer_file_schema_impl(
            header_line,
            content_lines,
            infer_all_as_str,
            &options.parse_options,
            options.schema_overwrite.as_deref(),
        )
    };

    if let Some(schema) = &options.schema {
        if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
            polars_bail!(
                SchemaMismatch:
                "provided schema does not match number of columns in file ({} != {} in file)",
                schema.len(),
                inferred_schema.len(),
            );
        }

        if options.parse_options.truncate_ragged_lines {
            inferred_schema = Arc::unwrap_or_clone(schema.clone());
        } else {
            inferred_schema = schema
                .iter_names()
                .zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
                .map(|(name, dtype)| (name.clone(), dtype))
                .collect();
        }
    }

    if let Some(dtypes) = options.dtype_overwrite.as_deref() {
        for (i, dtype) in dtypes.iter().enumerate() {
            inferred_schema.set_dtype_at_index(i, dtype.clone());
        }
    }

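    // Columns present in the projected schema keep the dtype that was already resolved there,
    // overriding the freshly inferred dtype.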
    if let Some(projected_schema) = projected_schema {
        for (name, inferred_dtype) in inferred_schema.iter_mut() {
            if let Some(projected_dtype) = projected_schema.get(name) {
                *inferred_dtype = projected_dtype.clone();
            }
        }
    }

    Ok(inferred_schema)
}