Function polars_io::csv::utils::get_reader_bytes

pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
    reader: &mut R,
) -> PolarsResult<ReaderBytes<'_>>

Available on crate feature csv-file only.

Examples found in repository:
src/ndjson_core/ndjson.rs (line 112)
(source context: lines 110–129)
/// Consume the reader and materialize the NDJSON input as a `DataFrame`.
///
/// Parsing is delegated to `CoreJsonReader`; when rechunking was
/// requested and the result is fragmented, the chunks are merged in
/// parallel before returning.
fn finish(mut self) -> PolarsResult<DataFrame> {
    // Capture the flag up front; `self.reader` is borrowed mutably below.
    let should_rechunk = self.rechunk;
    let bytes = get_reader_bytes(&mut self.reader)?;
    let mut core_reader = CoreJsonReader::new(
        bytes,
        self.n_rows,
        self.schema,
        self.n_threads,
        1024, // sample size
        self.chunk_size,
        self.low_memory,
        self.infer_schema_len,
    )?;
    let mut df: DataFrame = core_reader.as_df()?;
    // Only pay for the parallel merge when there is more than one chunk.
    if should_rechunk && df.n_chunks() > 1 {
        df.as_single_chunk_par();
    }
    Ok(df)
}
More examples
src/csv/read.rs (line 333)
(source context: lines 325–447)
// Construct the low-level `CoreReader` that performs the actual CSV
// parsing, borrowing the underlying bytes for the shorter lifetime 'b.
//
// NOTE(review): `projection`, `columns`, `null_values`, `predicate` and
// `row_count` are moved out of `self` with `mem::take`, so a second call
// on the same reader would see those fields reset to their `Default`
// values — confirm callers only invoke this once.
fn core_reader<'b>(
&'b mut self,
schema: Option<&'b Schema>,
to_cast: Vec<Field>,
) -> PolarsResult<CoreReader<'b>>
where
'a: 'b,
{
// Borrow (or memory-map) the raw bytes from the underlying reader.
let reader_bytes = get_reader_bytes(&mut self.reader)?;
// Positional pass-through of the reader configuration; the argument
// order here must match `CoreReader::new` exactly.
CoreReader::new(
reader_bytes,
self.n_rows,
self.skip_rows_before_header,
std::mem::take(&mut self.projection),
self.max_records,
self.delimiter,
self.has_header,
self.ignore_parser_errors,
self.schema,
std::mem::take(&mut self.columns),
self.encoding,
self.n_threads,
schema,
self.dtype_overwrite,
self.sample_size,
self.chunk_size,
self.low_memory,
self.comment_char,
self.quote_char,
self.eol_char,
std::mem::take(&mut self.null_values),
std::mem::take(&mut self.predicate),
to_cast,
self.skip_rows_after_header,
std::mem::take(&mut self.row_count),
self.parse_dates,
)
}
// Split a user-supplied schema overwrite into:
// (schema handed to the parser, fields to cast after parsing, whether
// any column is Categorical).
//
// The parser only materializes a subset of dtypes, so:
// - `Time` fields are removed from the parser schema (type inference
//   decides the intermediate column type) and cast to `Time` afterwards;
// - `Int8`/`Int16`/`UInt8`/`UInt16` are parsed as `Int32` and cast back;
// - `Categorical` fields are kept as-is but set the returned flag so the
//   caller can react (see `batched_borrowed`).
fn prepare_schema_overwrite(&self, overwriting_schema: &Schema) -> (Schema, Vec<Field>, bool) {
// This branch we check if there are dtypes we cannot parse.
// We only support a few dtypes in the parser and later cast to the required dtype
let mut to_cast = Vec::with_capacity(overwriting_schema.len());
let mut _has_categorical = false;
#[allow(clippy::unnecessary_filter_map)]
let fields = overwriting_schema.iter_fields().filter_map(|mut fld| {
use DataType::*;
match fld.data_type() {
Time => {
to_cast.push(fld);
// let inference decide the column type
None
}
Int8 | Int16 | UInt8 | UInt16 => {
// We have not compiled these buffers, so we cast them later.
to_cast.push(fld.clone());
fld.coerce(DataType::Int32);
Some(fld)
}
#[cfg(feature = "dtype-categorical")]
Categorical(_) => {
// Only flag the presence; the field itself is passed through.
_has_categorical = true;
Some(fld)
}
_ => Some(fld),
}
});
// Note: `fields` is lazy — the side effects on `to_cast` and
// `_has_categorical` happen while `Schema::from` drives the iterator.
let schema = Schema::from(fields);
(schema, to_cast, _has_categorical)
}
// Create a borrowed batched CSV reader, first applying any schema
// overwrite via `prepare_schema_overwrite`.
pub fn batched_borrowed(&'a mut self) -> PolarsResult<BatchedCsvReader<'a>> {
if let Some(schema) = self.schema_overwrite {
let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
// Stash the prepared schema on `self` so a reference to it can
// outlive this stack frame.
self.owned_schema = Some(Box::new(schema));
// safety
// we boxed the schema and we refer to the boxed pointer
// the schema will drop once self drops
// so it is bound to 'a
// (The transmute only extends the borrow of the boxed `Schema`
// to 'a; NOTE(review): this relies on `owned_schema` never being
// reassigned while the returned reader is alive — verify.)
let schema = unsafe {
std::mem::transmute::<Option<&Schema>, Option<&Schema>>(
self.owned_schema.as_ref().map(|b| b.as_ref()),
)
};
let csv_reader = self.core_reader(schema, to_cast)?;
csv_reader.batched(has_cat)
} else {
// No overwrite: use the configured schema (if any), nothing to cast.
let csv_reader = self.core_reader(self.schema, vec![])?;
csv_reader.batched(false)
}
}
}
impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
// Turn this reader into an owned batched CSV reader. When no schema is
// supplied, one is inferred up-front from the file bytes and then used
// for batching.
pub fn batched(mut self, schema: Option<SchemaRef>) -> PolarsResult<OwnedBatchedCsvReader> {
match schema {
Some(schema) => Ok(to_batched_owned(self, schema)),
None => {
// No schema given: sample the file and infer one.
let reader_bytes = get_reader_bytes(&mut self.reader)?;
let (inferred_schema, _, _) = infer_file_schema(
&reader_bytes,
self.delimiter.unwrap_or(b','), // comma is the CSV default
self.max_records,
self.has_header,
None,
// mutable: inference may advance the number of rows to skip
&mut self.skip_rows_before_header,
self.skip_rows_after_header,
self.comment_char,
self.quote_char,
self.eol_char,
self.null_values.as_ref(),
self.parse_dates,
)?;
let schema = Arc::new(inferred_schema);
Ok(to_batched_owned(self, schema))
}
}
}src/parquet/read_impl.rs (line 351)
(source context: lines 323–367)
// Build a batched parquet reader over a file-backed `MmapBytesReader`.
//
// Reads the parquet footer metadata, infers the Arrow schema, chooses a
// parallelization strategy, and sets up a self-referential pair of
// (owned reader, bytes borrowed from it).
pub fn new(
mut reader: Box<dyn MmapBytesReader>,
limit: usize,
projection: Option<Vec<usize>>,
row_count: Option<RowCount>,
chunk_size: usize,
) -> PolarsResult<Self> {
let metadata = read::read_metadata(&mut reader)?;
let schema = read::schema::infer_schema(&metadata)?;
let n_row_groups = metadata.row_groups.len();
// Default projection: every column, in file order.
let projection =
projection.unwrap_or_else(|| (0usize..schema.fields.len()).collect::<Vec<_>>());
// Heuristic: parallelize over row groups when there are more of them
// than projected columns or available threads; otherwise over columns.
let parallel =
if n_row_groups > projection.len() || n_row_groups > POOL.current_num_threads() {
ParallelStrategy::RowGroups
} else {
ParallelStrategy::Columns
};
// safety we will keep ownership on the struct and reference the bytes on the heap.
// this should not work with passed bytes so we check if it is a file
// NOTE(review): `reader_bytes` is obtained through a &'static transmute
// of a borrow of `reader`, which is then moved into the returned struct.
// The assert below enforces the file-backed precondition the original
// safety comment relies on — presumably so the bytes come from an mmap
// rather than aliasing the boxed reader's own buffer; confirm.
assert!(reader.to_file().is_some());
let reader_ptr = unsafe {
std::mem::transmute::<&mut dyn MmapBytesReader, &'static mut dyn MmapBytesReader>(
reader.as_mut(),
)
};
let reader_bytes = get_reader_bytes(reader_ptr)?;
Ok(BatchedParquetReader {
reader,
reader_bytes,
limit,
projection,
schema,
metadata,
row_count,
rows_read: 0,
row_group_offset: 0,
n_row_groups,
// one chunk slot per worker thread
chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()),
parallel,
chunk_size,
})
}