use std::iter::Iterator;
use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
use polars_buffer::Buffer;
use polars_core::prelude::Field;
use polars_core::schema::{SchemaExt, SchemaRef};
use polars_error::{PolarsResult, polars_bail, polars_err, polars_warn};
use polars_io::cloud::CloudOptions;
use polars_io::csv::read::streaming::read_until_start_and_infer_schema;
use polars_io::prelude::_csv_read_internal::{
CountLines, NullValuesCompiled, cast_columns, prepare_csv_schema, read_chunk,
};
use polars_io::prelude::builder::validate_utf8;
use polars_io::prelude::{CsvEncoding, CsvParseOptions, CsvReadOptions};
use polars_io::utils::compression::CompressedReader;
use polars_io::utils::slice::SplitSlicePosition;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::mem::prefetch::prefetch_l2;
use polars_utils::slice_enum::Slice;
use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::{self, distributor_channel};
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::{MorselSeq, TaskPriority};
pub mod builder {
    use std::sync::Arc;

    use polars_core::config;
    use polars_io::cloud::CloudOptions;
    use polars_io::prelude::CsvReadOptions;
    use polars_plan::dsl::ScanSource;

    use super::CsvFileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
    use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
    use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;

    /// `Arc<CsvReadOptions>` acts as its own reader builder: every reader it
    /// constructs shares the same parsed CSV options.
    impl FileReaderBuilder for Arc<CsvReadOptions> {
        fn reader_name(&self) -> &str {
            "csv"
        }

        fn reader_capabilities(&self) -> ReaderCapabilities {
            use ReaderCapabilities as RC;

            // Pre-slicing is only advertised when there is no comment
            // prefix: commented-out lines make row offsets within the file
            // ambiguous for slicing.
            let mut capabilities = RC::NEEDS_FILE_CACHE_INIT;
            if self.parse_options.comment_prefix.is_none() {
                capabilities |= RC::PRE_SLICE;
            }
            capabilities
        }

        fn build_file_reader(
            &self,
            source: ScanSource,
            cloud_options: Option<Arc<CloudOptions>>,
            _scan_source_idx: usize,
        ) -> Box<dyn FileReader> {
            Box::new(CsvFileReader {
                scan_source: source,
                cloud_options,
                options: Arc::clone(self),
                verbose: config::verbose(),
                cached_bytes: None,
            }) as Box<dyn FileReader>
        }
    }
}
/// Sentinel `(offset, len)` meaning "no slicing: emit every row of the batch".
const NO_SLICE: (usize, usize) = (0, usize::MAX);
/// Sentinel `(offset, len)` signalling that the requested slice is already
/// exhausted; decode workers receiving this stop emitting morsels.
const SLICE_ENDED: (usize, usize) = (usize::MAX, 0);
/// A batch of complete CSV lines handed from the line-splitting task to one
/// of the decode workers.
struct LineBatch {
    // Raw bytes of this batch (complete lines only).
    mem_slice: Buffer<u8>,
    // Number of lines counted in `mem_slice`.
    n_lines: usize,
    // `(offset, len)` row slice to apply within this batch; `NO_SLICE` and
    // `SLICE_ENDED` are sentinel values.
    slice: (usize, usize),
    // Global row offset associated with this batch. NOTE(review): the
    // producer stores the offset *after* adding this batch's lines (see
    // `LineBatchSource::run`); it is only used in a malformed-chunk error
    // message — confirm whether the batch's starting row was intended.
    row_offset: usize,
    // Sequence id used to keep morsels ordered downstream.
    morsel_seq: MorselSeq,
}
/// Streaming CSV file reader: caches the source bytes on `initialize`, then
/// splits them into line batches decoded by parallel workers in `begin_read`.
struct CsvFileReader {
    scan_source: ScanSource,
    #[expect(unused)] cloud_options: Option<Arc<CloudOptions>>,
    options: Arc<CsvReadOptions>,
    // Set by `initialize()`; `begin_read` unwraps it.
    cached_bytes: Option<Buffer<u8>>,
    verbose: bool,
}
#[async_trait]
impl FileReader for CsvFileReader {
    /// Reads the whole source into a memory buffer and caches it for
    /// `begin_read`.
    async fn initialize(&mut self) -> PolarsResult<()> {
        let buffer = self
            .scan_source
            .as_scan_source_ref()
            .to_buffer_async_assume_latest(self.scan_source.run_async())?;
        self.cached_bytes = Some(buffer);
        Ok(())
    }

    /// Begins the parallel read.
    ///
    /// Spawns one line-splitting producer plus `num_pipelines` decode
    /// workers connected by a distributor channel, and returns the morsel
    /// receiver together with a join handle that joins all tasks and
    /// resolves the row-count callbacks.
    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        // This reader only supports plain projections without a predicate;
        // anything else is a dispatch bug upstream.
        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            row_index,
            pre_slice,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            disable_morsel_split: _,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        assert!(row_index.is_none());

        match &pre_slice {
            // Negative (from-end) slicing is not implemented for CSV.
            Some(Slice::Negative { .. }) => unimplemented!(),
            // A non-empty pre-slice combined with a comment prefix should
            // have been ruled out by `reader_capabilities`.
            Some(pre_slice)
                if self.options.parse_options.comment_prefix.is_some() && pre_slice.len() > 0 =>
            {
                panic!("{pre_slice:?}")
            },
            _ => {},
        }

        // `initialize()` must have run first (it fills `cached_bytes`).
        let mut reader = CompressedReader::try_new(self.cached_bytes.clone().unwrap())?;

        // Skip to the data start and infer the schema; the returned leftover
        // bytes are the first data bytes, fed to the line splitter below.
        let (inferred_schema, base_leftover) = read_until_start_and_infer_schema(
            &self.options,
            Some(projected_schema.clone()),
            None,
            &mut reader,
        )?;
        let used_schema = Arc::new(inferred_schema);

        if let Some(tx) = file_schema_tx {
            _ = tx.send(used_schema.clone())
        }

        // Map projected names to column indices; names missing from the
        // inferred schema are silently dropped from the projection.
        let projection: Vec<usize> = projected_schema
            .iter_names()
            .filter_map(|name| used_schema.index_of(name))
            .collect();

        if verbose {
            eprintln!(
                "[CsvFileReader]: project: {} / {}, slice: {:?}",
                projection.len(),
                used_schema.len(),
                &pre_slice,
            )
        }

        let quote_char = self.options.parse_options.quote_char;
        let eol_char = self.options.parse_options.eol_char;
        let comment_prefix = self.options.parse_options.comment_prefix.clone();

        let line_counter = CountLines::new(quote_char, eol_char, comment_prefix.clone());

        // Shared, read-only per-chunk decoder used by all workers.
        let chunk_reader = Arc::new(ChunkReader::try_new(
            self.options.clone(),
            used_schema.clone(),
            projection,
        )?);

        // An exact total row count is only needed when someone listens for it.
        let needs_full_row_count = n_rows_in_file_tx.is_some();

        let (line_batch_tx, line_batch_receivers) =
            distributor_channel(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);

        // Producer: splits the byte stream into line-aligned batches. Its
        // result is the number of rows skipped in front of the slice.
        let line_batch_source_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            LineBatchSource {
                base_leftover,
                reader,
                line_counter,
                line_batch_tx,
                pre_slice,
                needs_full_row_count,
                verbose,
            }
            .run(),
        ));

        let n_workers = line_batch_receivers.len();

        let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);

        // Decode workers: parse each line batch into a DataFrame morsel.
        // Each worker resolves to the number of rows it processed
        // (pre-slice counts, see `ChunkReader::read_chunk`).
        let line_batch_decode_handles = line_batch_receivers
            .into_iter()
            .zip(morsel_senders)
            .enumerate()
            .map(|(worker_idx, (mut line_batch_rx, mut morsel_tx))| {
                // Only the last worker logs, to avoid duplicated output.
                let verbose = verbose && worker_idx == n_workers - 1;
                let mut n_rows_processed: usize = 0;
                let chunk_reader = chunk_reader.clone();
                let source_token = SourceToken::new();

                AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
                    while let Ok(LineBatch {
                        mem_slice,
                        n_lines,
                        slice,
                        row_offset,
                        morsel_seq,
                    }) = line_batch_rx.recv().await
                    {
                        // NOTE(review): a `SLICE_ENDED` batch is parsed with
                        // a dummy (0, 1) slice so that the pre-slice row
                        // count of the chunk is still accumulated below —
                        // confirm this is intended rather than counting
                        // `n_lines` directly.
                        let (offset, len) = match slice {
                            SLICE_ENDED => (0, 1),
                            v => v,
                        };

                        let (df, n_rows_in_chunk) = chunk_reader.read_chunk(
                            &mem_slice,
                            n_lines,
                            (offset, len),
                            row_offset,
                        )?;

                        n_rows_processed = n_rows_processed.saturating_add(n_rows_in_chunk);

                        // Slice exhausted: stop emitting morsels.
                        if (offset, len) == SLICE_ENDED {
                            break;
                        }

                        let morsel = Morsel::new(df, morsel_seq, source_token.clone());

                        if morsel_tx.send_morsel(morsel).await.is_err() {
                            break;
                        }
                    }
                    drop(morsel_tx);

                    // After the output slice is exhausted, keep draining
                    // batches purely to count the remaining rows.
                    if needs_full_row_count {
                        if verbose {
                            eprintln!(
                                "[CSV LineBatchProcessor {worker_idx}]: entering row count mode"
                            );
                        }

                        while let Ok(LineBatch {
                            mem_slice: _,
                            n_lines,
                            slice,
                            row_offset: _,
                            morsel_seq: _,
                        }) = line_batch_rx.recv().await
                        {
                            assert_eq!(slice, SLICE_ENDED);
                            n_rows_processed = n_rows_processed.saturating_add(n_lines);
                        }
                    }

                    PolarsResult::Ok(n_rows_processed)
                }))
            })
            .collect::<Vec<_>>();

        Ok((
            rx,
            spawn(TaskPriority::Low, async move {
                // Final row position = rows processed by all workers plus
                // rows the producer skipped in front of the slice.
                let mut row_position: usize = 0;

                for handle in line_batch_decode_handles {
                    let rows_processed = handle.await?;
                    row_position = row_position.saturating_add(rows_processed);
                }

                row_position = {
                    let rows_skipped = line_batch_source_handle.await?;
                    row_position.saturating_add(rows_skipped)
                };

                // Overflowing IdxSize means the file needs bigidx support.
                let row_position = IdxSize::try_from(row_position)
                    .map_err(|_| polars_err!(bigidx, ctx = "csv file", size = row_position))?;

                if let Some(n_rows_in_file_tx) = n_rows_in_file_tx {
                    assert!(needs_full_row_count);
                    _ = n_rows_in_file_tx.send(row_position);
                }

                if let Some(row_position_on_end_tx) = row_position_on_end_tx {
                    _ = row_position_on_end_tx.send(row_position);
                }

                Ok(())
            }),
        ))
    }
}
/// State of the single producer task that splits the (decompressed) byte
/// stream into line-aligned batches for the decode workers.
struct LineBatchSource {
    // Bytes left over from `read_until_start_and_infer_schema` — presumably
    // the first data bytes after the header; verify against the caller.
    base_leftover: Buffer<u8>,
    reader: CompressedReader,
    line_counter: CountLines,
    line_batch_tx: distributor_channel::Sender<LineBatch>,
    pre_slice: Option<Slice>,
    // When set, keep feeding workers even after the slice is exhausted so
    // the total row count can still be computed.
    needs_full_row_count: bool,
    verbose: bool,
}
impl LineBatchSource {
    /// Streams the input, splits it into batches of complete lines, and
    /// distributes them to the decode workers.
    ///
    /// Returns the number of rows skipped in front of the requested slice
    /// (added back during the final row-count aggregation).
    async fn run(self) -> PolarsResult<usize> {
        let LineBatchSource {
            base_leftover,
            mut reader,
            line_counter,
            mut line_batch_tx,
            pre_slice,
            needs_full_row_count,
            verbose,
        } = self;

        // Convert the pre-slice into a global row range; negative slices
        // were already rejected in `begin_read`.
        let global_slice = if let Some(pre_slice) = pre_slice {
            match pre_slice {
                Slice::Positive { .. } => Some(Range::<usize>::from(pre_slice)),
                Slice::Negative { .. } => unreachable!(),
            }
        } else {
            None
        };

        if verbose {
            eprintln!("[CsvSource]: Start line splitting",);
        }

        // Bytes carried over from the previous iteration (an incomplete
        // trailing line), seeded with the leftover from schema inference.
        let mut prev_leftover = base_leftover;
        let mut row_offset = 0usize;
        let mut morsel_seq = MorselSeq::default();
        let mut n_rows_skipped: usize = 0;
        let mut read_size = CompressedReader::initial_read_size();

        loop {
            let (mem_slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
            if mem_slice.is_empty() {
                break;
            }
            prefetch_l2(&mem_slice);

            // `bytes_read == 0` means `mem_slice` is only the carried-over
            // leftover: we are at EOF.
            let is_eof = bytes_read == 0;

            // Count complete lines; bytes past `unconsumed_offset` belong to
            // an incomplete last line and are carried to the next iteration.
            let (n_lines, unconsumed_offset) = line_counter.count_rows(&mem_slice, is_eof);
            let batch_slice = mem_slice.clone().sliced(0..unconsumed_offset);
            prev_leftover = mem_slice.sliced(unconsumed_offset..);

            // Not a single complete line fit in this read: double the read
            // size and try again.
            if batch_slice.is_empty() && !is_eof {
                read_size = read_size.saturating_mul(2);
                continue;
            }

            let prev_row_offset = row_offset;
            row_offset += n_lines;

            let slice = if let Some(global_slice) = &global_slice {
                match SplitSlicePosition::split_slice_at_file(
                    prev_row_offset,
                    n_lines,
                    global_slice.clone(),
                ) {
                    // Batch lies entirely before the slice: skip it, but
                    // remember the count for the row-position total.
                    SplitSlicePosition::Before => {
                        n_rows_skipped = n_rows_skipped.saturating_add(n_lines);
                        continue;
                    },
                    SplitSlicePosition::Overlapping(offset, len) => (offset, len),
                    // Past the slice: either keep feeding workers so they can
                    // finish counting rows, or stop producing entirely.
                    SplitSlicePosition::After => {
                        if needs_full_row_count {
                            SLICE_ENDED
                        } else {
                            break;
                        }
                    },
                }
            } else {
                NO_SLICE
            };

            morsel_seq = morsel_seq.successor();

            let batch = LineBatch {
                mem_slice: batch_slice,
                n_lines,
                slice,
                // NOTE(review): this sends the offset *after* adding this
                // batch's lines; it is only used in a malformed-chunk error
                // message, but if the batch's starting row was intended,
                // `prev_row_offset` may have been meant — confirm.
                row_offset,
                morsel_seq,
            };
            if line_batch_tx.send(batch).await.is_err() {
                break;
            }
            if is_eof {
                break;
            }

            // Ramp the read size up towards the ideal size after each
            // successfully produced batch.
            if read_size < CompressedReader::ideal_read_size() {
                read_size *= 4;
            }
        }

        Ok(n_rows_skipped)
    }
}
/// Per-chunk CSV decoder shared (read-only, behind an `Arc`) by all decode
/// workers.
#[derive(Default)]
struct ChunkReader {
    reader_schema: SchemaRef,
    parse_options: Arc<CsvParseOptions>,
    // Columns needing a post-parse cast (filled by `prepare_csv_schema`).
    fields_to_cast: Vec<Field>,
    ignore_errors: bool,
    // Indices into `reader_schema` of the projected columns.
    projection: Vec<usize>,
    // Null-value matcher compiled against `reader_schema`, if configured.
    null_values: Option<NullValuesCompiled>,
    // Whether chunk bytes must be UTF-8 validated before parsing.
    validate_utf8: bool,
}
impl ChunkReader {
    /// Builds a chunk reader from the read options, the reader schema
    /// (possibly adjusted in place for casts) and the projected column
    /// indices.
    fn try_new(
        options: Arc<CsvReadOptions>,
        mut reader_schema: SchemaRef,
        projection: Vec<usize>,
    ) -> PolarsResult<Self> {
        let mut fields_to_cast: Vec<Field> = options.fields_to_cast.clone();
        prepare_csv_schema(&mut reader_schema, &mut fields_to_cast)?;

        let parse_options = options.parse_options.clone();

        // Compile the configured null-value matcher against the final schema.
        let null_values = match parse_options.null_values.clone() {
            Some(nv) => Some(nv.compile(&reader_schema)?),
            None => None,
        };

        // UTF-8 validation is only required for utf8 encoding when at least
        // one column is a string column.
        let has_string_column = reader_schema.iter_fields().any(|f| f.dtype().is_string());
        let validate_utf8 =
            matches!(parse_options.encoding, CsvEncoding::Utf8) && has_string_column;

        Ok(Self {
            reader_schema,
            parse_options,
            fields_to_cast,
            ignore_errors: options.ignore_errors,
            projection,
            null_values,
            validate_utf8,
        })
    }

    /// Parses `chunk` (expected to hold exactly `n_lines` complete rows)
    /// into a `DataFrame`, applies the row `slice`, casts the flagged
    /// columns, and returns the frame together with the *pre-slice* row
    /// count.
    ///
    /// `chunk_row_offset` is only used in the malformed-chunk diagnostic.
    fn read_chunk(
        &self,
        chunk: &[u8],
        n_lines: usize,
        slice: (usize, usize),
        chunk_row_offset: usize,
    ) -> PolarsResult<(DataFrame, usize)> {
        if self.validate_utf8 && !validate_utf8(chunk) {
            polars_bail!(ComputeError: "invalid utf-8 sequence")
        }

        let mut df = if self.projection.is_empty() {
            // Nothing projected: only the height matters.
            DataFrame::empty_with_height(n_lines)
        } else {
            read_chunk(
                chunk,
                &self.parse_options,
                &self.reader_schema,
                self.ignore_errors,
                &self.projection,
                0,          // starting offset within `chunk`
                n_lines,
                self.null_values.as_ref(),
                usize::MAX, // no additional row cap
                chunk.len(),
                Some(0),
            )?
        };

        // The parsed height must match the pre-counted line count; a
        // mismatch means the chunk was malformed.
        let n_parsed = df.height();
        if n_parsed != n_lines {
            let msg = format!(
                "CSV malformed: expected {} rows, actual {} rows, in chunk starting at row_offset {}, length {}",
                n_lines,
                n_parsed,
                chunk_row_offset,
                chunk.len()
            );
            if self.ignore_errors {
                polars_warn!("{}", msg);
            } else {
                polars_bail!(ComputeError: msg);
            }
        }

        if slice != NO_SLICE {
            // `SLICE_ENDED` batches are remapped before reaching this point.
            assert!(slice != SLICE_ENDED);
            df = df.slice(i64::try_from(slice.0).unwrap(), slice.1);
        }

        cast_columns(&mut df, &self.fields_to_cast, false, self.ignore_errors)?;

        Ok((df, n_parsed))
    }
}