pub(super) mod batched;

use std::fmt;
use std::sync::Mutex;

use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures};
#[cfg(feature = "polars-time")]
use polars_time::prelude::*;
use polars_utils::relaxed_cell::RelaxedCell;
use rayon::prelude::*;

use super::CsvParseOptions;
use super::buffer::init_buffers;
use super::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use super::parser::{
    CountLines, SplitLines, is_comment_line, parse_lines, skip_bom, skip_line_ending,
    skip_lines_naive, skip_this_line,
};
use super::reader::prepare_csv_schema;
use super::schema_inference::infer_file_schema;
#[cfg(feature = "decompress")]
use super::utils::decompress;
use crate::RowIndex;
use crate::csv::read::parser::skip_this_line_naive;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
use crate::utils::compression::SupportedCompression;
use crate::utils::update_row_counts2;

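/// Casts the columns of `df` to the dtypes given in `to_cast`, optionally in
/// parallel on the global rayon pool. String-to-temporal casts take a dedicated
/// parsing path; every other dtype pair goes through a plain `cast`. Unless
/// `ignore_errors` is set, a cast that introduces additional nulls is reported
/// as a casting failure.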
pub fn cast_columns(
    df: &mut DataFrame,
    to_cast: &[Field],
    parallel: bool,
    ignore_errors: bool,
) -> PolarsResult<()> {
    let cast_fn = |c: &Column, fld: &Field| {
        let out = match (c.dtype(), fld.dtype()) {
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Date) => c
                .str()
                .unwrap()
                .as_date(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Time) => c
                .str()
                .unwrap()
                .as_time(None, false)
                .map(|ca| ca.into_column()),
            #[cfg(feature = "temporal")]
            (DataType::String, DataType::Datetime(tu, _)) => c
                .str()
                .unwrap()
                .as_datetime(
                    None,
                    *tu,
                    false,
                    false,
                    None,
                    &StringChunked::from_iter(std::iter::once("raise")),
                )
                .map(|ca| ca.into_column()),
            (_, dt) => c.cast(dt),
        }?;
        if !ignore_errors && c.null_count() != out.null_count() {
            handle_casting_failures(c.as_materialized_series(), out.as_materialized_series())?;
        }
        Ok(out)
    };

    if parallel {
        let cols = POOL.install(|| {
            df.get_columns()
                .into_par_iter()
                .map(|s| {
                    if let Some(fld) = to_cast.iter().find(|fld| fld.name() == s.name()) {
                        cast_fn(s, fld)
                    } else {
                        Ok(s.clone())
                    }
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        *df = unsafe { DataFrame::new_no_checks(df.height(), cols) }
    } else {
        for fld in to_cast {
            if let Some(idx) = df.get_column_index(fld.name()) {
                df.try_apply_at_idx(idx, |s| cast_fn(s, fld))?;
            }
        }

        df.clear_schema();
    }
    Ok(())
}

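/// State for a single in-memory CSV read: the raw bytes, the (possibly
/// inferred) schema, projection/skip/limit settings, and the options needed
/// to parse and post-process the chunks.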
pub(crate) struct CoreReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    schema: SchemaRef,
    parse_options: CsvParseOptions,
    projection: Option<Vec<usize>>,
    current_line: usize,
    ignore_errors: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    has_header: bool,
    chunk_size: usize,
    null_values: Option<NullValuesCompiled>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    to_cast: Vec<Field>,
    row_index: Option<RowIndex>,
}

impl fmt::Debug for CoreReader<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Reader")
            .field("schema", &self.schema)
            .field("projection", &self.projection)
            .field("current_line", &self.current_line)
            .finish()
    }
}

impl<'a> CoreReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        parse_options: Arc<CsvParseOptions>,
        n_rows: Option<usize>,
        skip_rows: usize,
        skip_lines: usize,
        mut projection: Option<Vec<usize>>,
        max_records: Option<usize>,
        has_header: bool,
        ignore_errors: bool,
        schema: Option<SchemaRef>,
        columns: Option<Arc<[PlSmallStr]>>,
        n_threads: Option<usize>,
        schema_overwrite: Option<SchemaRef>,
        dtype_overwrite: Option<Arc<Vec<DataType>>>,
        chunk_size: usize,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        mut to_cast: Vec<Field>,
        skip_rows_after_header: usize,
        row_index: Option<RowIndex>,
        raise_if_empty: bool,
    ) -> PolarsResult<CoreReader<'a>> {
        let separator = parse_options.separator;

        #[cfg(feature = "decompress")]
        let mut reader_bytes = reader_bytes;

        // Fail early on compressed input when decompression support is not compiled in.
        if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
            polars_bail!(
                ComputeError: "cannot read compressed CSV file; \
                compile with feature 'decompress'"
            );
        }
        #[cfg(feature = "decompress")]
        {
            // Only decompress enough of the file to cover the requested rows.
            let total_n_rows =
                n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
            if let Some(b) = decompress(
                &reader_bytes,
                total_n_rows,
                separator,
                parse_options.quote_char,
                parse_options.eol_char,
            ) {
                reader_bytes = ReaderBytes::Owned(b.into());
            }
        }

        // Use the provided schema, or infer one from a sample of the file.
        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let (inferred_schema, _, _) = infer_file_schema(
                    &reader_bytes,
                    &parse_options,
                    max_records,
                    has_header,
                    schema_overwrite.as_deref(),
                    skip_rows,
                    skip_lines,
                    skip_rows_after_header,
                    raise_if_empty,
                )?;
                Arc::new(inferred_schema)
            },
        };
        // Positional dtype overrides replace the dtypes of the leading fields.
        if let Some(dtypes) = dtype_overwrite {
            polars_ensure!(
                dtypes.len() <= schema.len(),
                InvalidOperation: "The number of schema overrides must be less than or equal to the number of fields"
            );
            let s = Arc::make_mut(&mut schema);
            for (index, dt) in dtypes.iter().enumerate() {
                s.set_dtype_at_index(index, dt.clone()).unwrap();
            }
        }

        prepare_csv_schema(&mut schema, &mut to_cast)?;

        // Compile the null-value representation against the final schema.
        let null_values = parse_options
            .null_values
            .as_ref()
            .map(|nv| nv.clone().compile(&schema))
            .transpose()?;

        // A selection of columns by name is translated into a positional projection.
        if let Some(cols) = columns {
            let mut prj = Vec::with_capacity(cols.len());
            for col in cols.as_ref() {
                let i = schema.try_index_of(col)?;
                prj.push(i);
            }
            projection = Some(prj);
        }

        Ok(CoreReader {
            reader_bytes: Some(reader_bytes),
            parse_options: (*parse_options).clone(),
            schema,
            projection,
            current_line: usize::from(has_header),
            ignore_errors,
            skip_lines,
            skip_rows_before_header: skip_rows,
            skip_rows_after_header,
            n_rows,
            n_threads,
            has_header,
            chunk_size,
            null_values,
            predicate,
            to_cast,
            row_index,
        })
    }

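    /// Skips to the first data byte, delegating to the free-standing
    /// `find_starting_point` below with this reader's settings.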
    fn find_starting_point<'b>(
        &self,
        bytes: &'b [u8],
        quote_char: Option<u8>,
        eol_char: u8,
    ) -> PolarsResult<(&'b [u8], Option<usize>)> {
        let i = find_starting_point(
            bytes,
            quote_char,
            eol_char,
            self.schema.len(),
            self.skip_lines,
            self.skip_rows_before_header,
            self.skip_rows_after_header,
            self.parse_options.comment_prefix.as_ref(),
            self.has_header,
        )?;

        Ok((&bytes[i..], (i <= bytes.len()).then_some(i)))
    }

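    /// Takes the projection, sorted and bounds-checked against the schema;
    /// without an explicit projection, all columns are selected.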
    fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
        self.projection
            .take()
            .map(|mut v| {
                v.sort_unstable();
                if let Some(idx) = v.last() {
                    polars_ensure!(*idx < self.schema.len(), OutOfBounds: "projection index: {} is out of bounds for csv schema with length: {}", idx, self.schema.len())
                }
                Ok(v)
            })
            .unwrap_or_else(|| Ok((0..self.schema.len()).collect()))
    }

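    /// Parses a single chunk with this reader's options and applies the
    /// configured dtype casts to the result.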
    fn read_chunk(
        &self,
        bytes: &[u8],
        projection: &[usize],
        bytes_offset: usize,
        capacity: usize,
        starting_point_offset: Option<usize>,
        stop_at_nbytes: usize,
    ) -> PolarsResult<DataFrame> {
        let mut df = read_chunk(
            bytes,
            &self.parse_options,
            self.schema.as_ref(),
            self.ignore_errors,
            projection,
            bytes_offset,
            capacity,
            self.null_values.as_ref(),
            usize::MAX,
            stop_at_nbytes,
            starting_point_offset,
        )?;

        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
        Ok(df)
    }

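    /// Parses the whole byte slice: finds the first data byte, splits the
    /// remainder into line-aligned chunks, parses them in parallel on the
    /// rayon pool, and vertically concatenates the resulting DataFrames.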
    fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let (bytes, _) = self.find_starting_point(
            bytes,
            self.parse_options.quote_char,
            self.parse_options.eol_char,
        )?;

        let projection = self.get_projection()?;

        // An empty file, or one holding only a header, yields an empty frame
        // with the projected schema (plus the row index column, if requested).
        if bytes.is_empty() {
            let mut df = if projection.len() == self.schema.len() {
                DataFrame::empty_with_schema(self.schema.as_ref())
            } else {
                DataFrame::empty_with_schema(
                    &projection
                        .iter()
                        .map(|&i| self.schema.get_at_index(i).unwrap())
                        .map(|(name, dtype)| Field {
                            name: name.clone(),
                            dtype: dtype.clone(),
                        })
                        .collect::<Schema>(),
                )
            };

            cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

            if let Some(ref row_index) = self.row_index {
                df.insert_column(0, Series::new_empty(row_index.name.clone(), &IDX_DTYPE))?;
            }
            return Ok(df);
        }

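        // Aim for enough chunks to keep every thread busy, but cap the chunk
        // size so large files do not produce oversized per-task allocations,
        // and floor it so small files are not split into degenerate slivers.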
        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        let n_parts_hint = n_threads * 16;
        let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

        #[cfg(debug_assertions)]
        let min_chunk_size = 64;
        #[cfg(not(debug_assertions))]
        let min_chunk_size = 1024 * 4;

        let mut chunk_size = std::cmp::max(chunk_size, min_chunk_size);
        let mut total_bytes_offset = 0;

        let results = Arc::new(Mutex::new(vec![]));
        let total_line_count = &RelaxedCell::new_usize(0);

        let counter = CountLines::new(
            self.parse_options.quote_char,
            self.parse_options.eol_char,
            None,
        );
        let mut total_offset = 0;
        let mut previous_total_offset = 0;
        // Only run UTF-8 validation when the output actually contains string data.
        let check_utf8 = matches!(self.parse_options.encoding, CsvEncoding::Utf8)
            && self.schema.iter_fields().any(|f| f.dtype().is_string());

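        // Walk the file, carving off one line-aligned chunk per iteration and
        // spawning a scoped task that parses it into a DataFrame. Results are
        // pushed with the chunk's start address so they can be re-ordered later.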
        POOL.scope(|s| {
            loop {
                let b = unsafe { bytes.get_unchecked(total_offset..) };
                if b.is_empty() {
                    break;
                }
                debug_assert!(
                    total_offset == 0 || bytes[total_offset - 1] == self.parse_options.eol_char
                );

                let (count, position) = counter.find_next(b, &mut chunk_size);
                debug_assert!(count == 0 || b[position] == self.parse_options.eol_char);

                let (b, count) = if count == 0
                    && unsafe {
                        std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
                    } {
                    // No further line endings: this is the trailing remainder of the
                    // file. It still holds one row unless it is a comment line.
                    total_offset = bytes.len();
                    let c = if is_comment_line(bytes, self.parse_options.comment_prefix.as_ref()) {
                        0
                    } else {
                        1
                    };
                    (b, c)
                } else {
                    let end = total_offset + position + 1;
                    let b = unsafe { bytes.get_unchecked(total_offset..end) };

                    previous_total_offset = total_offset;
                    total_offset = end;
                    (b, count)
                };

                if !b.is_empty() {
                    let results = results.clone();
                    let projection = projection.as_ref();
                    let slf = &(*self);
                    s.spawn(move |_| {
                        if check_utf8 && !super::buffer::validate_utf8(b) {
                            let mut results = results.lock().unwrap();
                            results.push((
                                b.as_ptr() as usize,
                                Err(polars_err!(ComputeError: "invalid utf-8 sequence")),
                            ));
                            return;
                        }

                        let result = slf
                            .read_chunk(b, projection, 0, count, Some(0), b.len())
                            .and_then(|mut df| {
                                // A mismatch between counted and parsed rows indicates
                                // malformed input; fewer rows than counted are tolerated
                                // only when comment lines may have been dropped.
                                if df.height() > count
                                    || (df.height() < count
                                        && slf.parse_options.comment_prefix.is_none())
                                {
                                    let msg = format!(
                                        "CSV malformed: expected {} rows, \
                                        actual {} rows, in chunk starting at \
                                        byte offset {}, length {}",
                                        count,
                                        df.height(),
                                        previous_total_offset,
                                        b.len()
                                    );
                                    if slf.ignore_errors {
                                        polars_warn!(msg);
                                    } else {
                                        polars_bail!(ComputeError: msg);
                                    }
                                }

                                if slf.n_rows.is_some() {
                                    total_line_count.fetch_add(df.height());
                                }

                                // Only the chunk that starts the file gets the caller's
                                // row-index offset; the rest are patched up afterwards.
                                if let Some(rc) = &slf.row_index {
                                    let offset = if std::ptr::eq(b.as_ptr(), bytes.as_ptr()) {
                                        Some(rc.offset)
                                    } else {
                                        None
                                    };

                                    unsafe { df.with_row_index_mut(rc.name.clone(), offset) };
                                };

                                if let Some(predicate) = slf.predicate.as_ref() {
                                    let s = predicate.evaluate_io(&df)?;
                                    let mask = s.bool()?;
                                    df = df.filter(mask)?;
                                }
                                Ok(df)
                            });

                        results.lock().unwrap().push((b.as_ptr() as usize, result));
                    });

                    // Stop dispatching new chunks once the row limit has been reached.
                    if self.n_rows.is_some() && total_line_count.load() > self.n_rows.unwrap() {
                        break;
                    }
                }
                total_bytes_offset += b.len();
            }
        });

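        // Restore file order by sorting on each chunk's start address, then
        // surface the first error (if any) and stitch the chunks back together.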
        let mut results = std::mem::take(&mut *results.lock().unwrap());
        results.sort_unstable_by_key(|k| k.0);
        let mut dfs = results
            .into_iter()
            .map(|k| k.1)
            .collect::<PolarsResult<Vec<_>>>()?;

        if let Some(rc) = &self.row_index {
            update_row_counts2(&mut dfs, rc.offset)
        };
        accumulate_dataframes_vertical(dfs)
    }

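    /// Consumes the reader, parses the file, and enforces the row limit.
    /// Parallel chunks may overshoot `n_rows`, so the result is sliced here.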
    pub fn finish(mut self) -> PolarsResult<DataFrame> {
        let reader_bytes = self.reader_bytes.take().unwrap();
        let mut df = self.parse_csv(&reader_bytes)?;

        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}

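/// Parses one line-aligned byte range into a DataFrame: buffers are
/// initialized for the projected columns, `parse_lines` is called until
/// `stop_at_nbytes` is reached, and the buffers are finalized into columns.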
#[allow(clippy::too_many_arguments)]
pub fn read_chunk(
    bytes: &[u8],
    parse_options: &CsvParseOptions,
    schema: &Schema,
    ignore_errors: bool,
    projection: &[usize],
    bytes_offset_thread: usize,
    capacity: usize,
    null_values: Option<&NullValuesCompiled>,
    chunk_size: usize,
    stop_at_nbytes: usize,
    starting_point_offset: Option<usize>,
) -> PolarsResult<DataFrame> {
    let mut read = bytes_offset_thread;
    let mut buffers = init_buffers(
        projection,
        capacity + 1,
        schema,
        parse_options.quote_char,
        parse_options.encoding,
        parse_options.decimal_comma,
    )?;

    debug_assert!(projection.is_sorted());

    // `last_read` guards against looping forever when no progress is made.
    let mut last_read = usize::MAX;
    loop {
        if read >= stop_at_nbytes || read == last_read {
            break;
        }
        let local_bytes = &bytes[read..stop_at_nbytes];

        last_read = read;
        let offset = read + starting_point_offset.unwrap();
        read += parse_lines(
            local_bytes,
            parse_options,
            offset,
            ignore_errors,
            null_values,
            projection,
            &mut buffers,
            chunk_size,
            schema.len(),
            schema,
        )?;
    }

    let columns = buffers
        .into_iter()
        .map(|buf| buf.into_series().map(Column::from))
        .collect::<PolarsResult<Vec<_>>>()?;
    Ok(unsafe { DataFrame::new_no_checks_height_from_first(columns) })
}

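/// Returns the byte offset of the first data row, honoring (in order) the
/// `skip_lines`/`skip_rows` settings, the BOM, leading comment lines, the
/// header line, and `skip_rows_after_header`.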
#[allow(clippy::too_many_arguments)]
pub fn find_starting_point(
    mut bytes: &[u8],
    quote_char: Option<u8>,
    eol_char: u8,
    schema_len: usize,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
    comment_prefix: Option<&CommentPrefix>,
    has_header: bool,
) -> PolarsResult<usize> {
    let full_len = bytes.len();
    let starting_point_offset = bytes.as_ptr() as usize;

    bytes = if skip_lines > 0 {
        polars_ensure!(skip_rows_before_header == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set");
        skip_lines_naive(bytes, eol_char, skip_lines)
    } else {
        // Remove a byte-order mark if present.
        bytes = skip_bom(bytes);

        // Leading line endings are only skipped for multi-column schemas;
        // for a single column an empty line can itself be a row.
        if schema_len > 1 {
            bytes = skip_line_ending(bytes, eol_char)
        }
        bytes
    };

    // Skip lines preceding the header, quote-aware.
    if skip_rows_before_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_before_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    // Skip comment lines preceding the header.
    while is_comment_line(bytes, comment_prefix) {
        bytes = skip_this_line_naive(bytes, eol_char);
    }

    // Skip the header line itself.
    if has_header {
        bytes = skip_this_line(bytes, quote_char, eol_char);
    }
    if skip_rows_after_header > 0 {
        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
        let mut current_line = &bytes[..0];

        for _ in 0..skip_rows_after_header {
            current_line = split_lines
                .next()
                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
        }

        current_line = split_lines
            .next()
            .unwrap_or(&current_line[current_line.len()..]);
        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
    }

    Ok(if bytes.is_empty() {
        full_len
    } else {
        bytes.as_ptr() as usize - starting_point_offset
    })
}