use super::*;
use crate::csv::CsvReader;
use crate::mmap::MmapBytesReader;
impl<'a> CoreReader<'a> {
    /// Consumes this reader and converts it into a [`BatchedCsvReader`] that
    /// parses the input a few file chunks at a time.
    ///
    /// `_has_cat` is only consulted when the `dtype-categorical` feature is
    /// enabled: if it is `true`, a `polars_core::IUseStringCache` is created
    /// and stored inside the returned reader.
    ///
    /// # Errors
    /// Propagates any error from `determine_file_chunks_and_statistics` or
    /// `get_string_columns`.
    ///
    /// # Panics
    /// Panics if `self.reader_bytes` is `None`.
    pub fn batched(mut self, _has_cat: bool) -> PolarsResult<BatchedCsvReader<'a>> {
        let mut n_threads = match self.n_threads {
            Some(requested) => requested,
            None => POOL.current_num_threads(),
        };
        let reader_bytes = self.reader_bytes.take().unwrap();

        // Verbose output is on only when POLARS_VERBOSE is exactly "1".
        let verbose = matches!(std::env::var("POLARS_VERBOSE").as_deref(), Ok("1"));

        let (file_chunks, chunk_size, _total_rows, starting_point_offset, _bytes) = self
            .determine_file_chunks_and_statistics(&mut n_threads, &reader_bytes, verbose, true)?;

        let projection = self.get_projection();

        // SAFETY: NOTE(review) — this only widens the lifetime of the returned
        // column names to `'a`. It presumably relies on the names borrowing
        // from data that lives for `'a`; confirm against `get_string_columns`.
        let str_columns = unsafe {
            std::mem::transmute::<Vec<&str>, Vec<&'a str>>(self.get_string_columns(&projection)?)
        };

        #[cfg(feature = "dtype-categorical")]
        let _cat_lock = _has_cat.then(polars_core::IUseStringCache::new);
        #[cfg(not(feature = "dtype-categorical"))]
        let _cat_lock = None;

        // Computed while `self` is still fully intact; the struct literal
        // below moves individual fields out of it.
        let str_capacities = self.init_string_size_stats(&str_columns, chunk_size);

        Ok(BatchedCsvReader {
            reader_bytes,
            chunk_size,
            file_chunks,
            chunk_offset: 0,
            str_capacities,
            str_columns,
            projection,
            starting_point_offset,
            row_count: self.row_count,
            comment_char: self.comment_char,
            quote_char: self.quote_char,
            eol_char: self.eol_char,
            null_values: self.null_values,
            to_cast: self.to_cast,
            ignore_parser_errors: self.ignore_parser_errors,
            n_rows: self.n_rows,
            encoding: self.encoding,
            delimiter: self.delimiter,
            schema: self.schema,
            rows_read: 0,
            _cat_lock,
        })
    }
}
/// State for reading a CSV input in batches of file chunks.
///
/// Built by `CoreReader::batched`; each call to `next_batches` parses the next
/// few entries of `file_chunks` and advances `chunk_offset`.
pub struct BatchedCsvReader<'a> {
/// The input bytes; dereferenced to a byte slice before parsing.
reader_bytes: ReaderBytes<'a>,
/// Chunk size reported by `determine_file_chunks_and_statistics`; forwarded to `read_chunk`.
chunk_size: usize,
/// One pair per file chunk, forwarded to `read_chunk` as
/// `(bytes_offset_thread, stop_at_nbytes)`.
file_chunks: Vec<(usize, usize)>,
/// Index into `file_chunks` of the next chunk that has not been handed out yet.
chunk_offset: IdxSize,
/// Running string-size statistics, read by `read_chunk` and refreshed by
/// `update_string_stats` after every parsed chunk.
str_capacities: Vec<RunningSize>,
/// Names of the string columns the statistics above are tracked for.
str_columns: Vec<&'a str>,
/// Column projection forwarded to `read_chunk`.
projection: Vec<usize>,
/// When set, parsing starts at this byte position of `reader_bytes`.
starting_point_offset: Option<usize>,
/// When set, a row-count column (`name`, starting at `offset`) is added to every frame.
row_count: Option<RowCount>,
/// Parser setting forwarded to `read_chunk`.
comment_char: Option<u8>,
/// Parser setting forwarded to `read_chunk`.
quote_char: Option<u8>,
/// Parser setting forwarded to `read_chunk`.
eol_char: u8,
/// Parser setting forwarded to `read_chunk`.
null_values: Option<NullValuesCompiled>,
/// Fields passed to `cast_columns` after each chunk is parsed.
to_cast: Vec<Field>,
/// Parser setting forwarded to `read_chunk`.
ignore_parser_errors: bool,
/// Optional row limit: once `rows_read` reaches it, `next_batches` returns `None`.
n_rows: Option<usize>,
/// Parser setting forwarded to `read_chunk`.
encoding: CsvEncoding,
/// Parser setting forwarded to `read_chunk`.
delimiter: u8,
/// Schema forwarded to `read_chunk`.
schema: Cow<'a, Schema>,
/// Row counter maintained by `next_batches`; compared against `n_rows`.
rows_read: IdxSize,
/// Held for as long as the reader lives when categorical columns are read.
/// NOTE(review): presumably keeps the global string cache enabled — confirm.
#[cfg(feature = "dtype-categorical")]
_cat_lock: Option<polars_core::IUseStringCache>,
/// Placeholder so the struct has the same field set without the
/// `dtype-categorical` feature; always `None`.
#[cfg(not(feature = "dtype-categorical"))]
_cat_lock: Option<u8>,
}
impl<'a> BatchedCsvReader<'a> {
    /// Parses up to `n` of the remaining file chunks in parallel and returns
    /// one `DataFrame` per chunk.
    ///
    /// # Returns
    /// * `Ok(None)` once every file chunk has been handed out, or once
    ///   `rows_read` has reached the configured `n_rows` limit.
    /// * `Ok(Some(frames))` otherwise. Each entry is `(chunk_index, frame)`,
    ///   where `chunk_index` is the position of the chunk in the reader's
    ///   chunk list, so indices are unique and increasing across calls.
    ///   The vector is empty when `n == 0`.
    ///
    /// # Errors
    /// Propagates the first error reported by `read_chunk`, `cast_columns` or
    /// `update_string_stats` for any chunk of the batch.
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<(IdxSize, DataFrame)>>> {
        if self.chunk_offset == self.file_chunks.len() as IdxSize {
            return Ok(None);
        }
        if let Some(n_rows) = self.n_rows {
            if self.rows_read >= n_rows as IdxSize {
                return Ok(None);
            }
        }
        // Asking for zero chunks is not exhaustion: hand back an empty batch
        // instead of indexing into an empty result further down.
        if n == 0 {
            return Ok(Some(Vec::new()));
        }

        // Remember where this batch starts: the returned chunk indices must
        // be relative to it, not to the already advanced offset.
        let start = self.chunk_offset as usize;
        let end = std::cmp::min(start.saturating_add(n), self.file_chunks.len());
        let batch = &self.file_chunks[start..end];
        self.chunk_offset = end as IdxSize;

        let mut bytes = self.reader_bytes.deref();
        if let Some(pos) = self.starting_point_offset {
            bytes = &bytes[pos..];
        }

        let mut frames = POOL.install(|| {
            batch
                .into_par_iter()
                .copied()
                .map(|(bytes_offset_thread, stop_at_nbytes)| {
                    let mut df = read_chunk(
                        bytes,
                        self.delimiter,
                        self.schema.as_ref(),
                        self.ignore_parser_errors,
                        &self.projection,
                        bytes_offset_thread,
                        self.quote_char,
                        self.eol_char,
                        self.comment_char,
                        self.chunk_size,
                        &self.str_capacities,
                        self.encoding,
                        self.null_values.as_ref(),
                        self.chunk_size,
                        stop_at_nbytes,
                        self.starting_point_offset,
                    )?;
                    cast_columns(&mut df, &self.to_cast, false)?;
                    update_string_stats(&self.str_capacities, &self.str_columns, &df)?;
                    if let Some(rc) = &self.row_count {
                        df.with_row_count_mut(&rc.name, Some(rc.offset));
                    }
                    let n_read = df.height() as IdxSize;
                    Ok((df, n_read))
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;

        if self.row_count.is_some() {
            update_row_counts(&mut frames, self.rows_read)
        }

        let first_chunk_index = start as IdxSize;
        let mut out = Vec::with_capacity(frames.len());
        for (i, (df, _n_read)) in frames.into_iter().enumerate() {
            // Count the rows of every frame in the batch, not only the last
            // one, so the `n_rows` limit and row-count offsets stay accurate.
            self.rows_read += df.height() as IdxSize;
            out.push((first_chunk_index + i as IdxSize, df));
        }
        Ok(Some(out))
    }
}
/// A batched CSV reader that owns everything it reads from.
///
/// Both pointers are created from `Box` allocations in `to_batched_owned` and
/// released again in the `Drop` implementation.
pub struct OwnedBatchedCsvReader {
// Never read. `to_batched_owned` hands `reader` a reference into this schema
// and then erases the lifetime, so the schema has to be kept alive here.
#[allow(dead_code)]
schema: SchemaRef,
/// Heap-allocated CSV reader that `batched_reader` was created from.
reader: *mut CsvReader<'static, Box<dyn MmapBytesReader>>,
/// Heap-allocated batched reader obtained from `*reader` via `batched_borrowed`.
batched_reader: *mut BatchedCsvReader<'static>,
}
// The raw-pointer fields make the type `!Send`/`!Sync` by default, so both
// traits are asserted manually. The pointees are only reached through
// `&mut self` (`next_batches`) or in `Drop`.
// NOTE(review): soundness also depends on `CsvReader` and `BatchedCsvReader`
// being safe to move/share across threads — confirm.
unsafe impl Send for OwnedBatchedCsvReader {}
unsafe impl Sync for OwnedBatchedCsvReader {}
impl OwnedBatchedCsvReader {
pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<(IdxSize, DataFrame)>>> {
let reader = unsafe { &mut *self.batched_reader };
reader.next_batches(n)
}
}
impl Drop for OwnedBatchedCsvReader {
    /// Releases the two heap allocations behind the raw pointers.
    ///
    /// The batched reader borrows from the CSV reader, so it must be destroyed
    /// first. Binding both boxes to `let` locals would not guarantee that:
    /// locals are dropped in reverse declaration order, which would free the
    /// CSV reader while the batched reader still points into it. The explicit
    /// `drop` calls pin the order.
    fn drop(&mut self) {
        // SAFETY: both pointers were produced from `Box` allocations in
        // `to_batched_owned`, are never freed anywhere else, and `drop` runs
        // at most once, so each box is reconstructed exactly once.
        unsafe {
            drop(Box::from_raw(self.batched_reader));
            drop(Box::from_raw(self.reader));
        }
    }
}
/// Builds an [`OwnedBatchedCsvReader`] from a CSV reader and the schema it
/// should use.
///
/// The reader is configured with `schema`, moved to the heap, and a batched
/// reader borrowing from that heap allocation is created next to it. Both are
/// stored as raw pointers and released by `OwnedBatchedCsvReader`'s `Drop`.
///
/// # Panics
/// Panics if `batched_borrowed` returns an error.
pub fn to_batched_owned(
    reader: CsvReader<'_, Box<dyn MmapBytesReader>>,
    schema: SchemaRef,
) -> OwnedBatchedCsvReader {
    // SAFETY: only the lifetime parameter changes. The reader borrows from
    // `schema`, which is moved into the returned struct and therefore stays
    // alive for as long as the reader does.
    // NOTE(review): assumes `schema` is the only non-'static borrow held by
    // the reader — confirm.
    let static_reader = unsafe {
        std::mem::transmute::<
            CsvReader<'_, Box<dyn MmapBytesReader>>,
            CsvReader<'static, Box<dyn MmapBytesReader>>,
        >(reader.with_schema(schema.as_ref()))
    };

    // Give the reader a stable heap address so the batched reader can borrow
    // from it independently of where the returned struct is moved.
    let reader: *mut CsvReader<'static, Box<dyn MmapBytesReader>> =
        Box::into_raw(Box::new(static_reader));

    // SAFETY: `reader` was just created from a live `Box`, is non-null, and
    // nothing else references it yet.
    let batched = unsafe { (*reader).batched_borrowed() }.unwrap();
    let batched_reader: *mut BatchedCsvReader<'static> = Box::into_raw(Box::new(batched));

    OwnedBatchedCsvReader {
        schema,
        reader,
        batched_reader,
    }
}