pub mod builder;
pub mod capabilities;
pub mod output;
use arrow::datatypes::ArrowSchemaRef;
use async_trait::async_trait;
use output::FileReaderOutputRecv;
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_io::RowIndex;
use polars_io::predicates::ScanIOPredicate;
use polars_plan::dsl::CastColumnsPolicy;
use polars_utils::IdxSize;
use polars_utils::slice_enum::Slice;
use crate::async_executor::JoinHandle;
use crate::async_primitives::oneshot_channel;
pub use crate::nodes::io_sources::multi_scan::components::projection::Projection;
/// Interface for a reader that produces output for a single file.
///
/// Only [`FileReader::initialize`] and [`FileReader::begin_read`] have no default
/// implementation; every other method ships with a default that implementors may override.
#[async_trait]
pub trait FileReader: Send + Sync {
    /// Initializes the reader.
    ///
    /// NOTE(review): presumably this has to complete before any other method is called —
    /// confirm against the callers.
    async fn initialize(&mut self) -> PolarsResult<()>;

    /// Optional preparation step. The default implementation does nothing and returns `Ok(())`.
    fn prepare_read(&mut self) -> PolarsResult<()> {
        Ok(())
    }

    /// Starts a read configured by `args`.
    ///
    /// On success, returns the receiver for the reader's output together with a join handle
    /// that resolves to the `PolarsResult<()>` of the read.
    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)>;

    /// Returns the schema of the file.
    ///
    /// # Panics
    ///
    /// The default implementation always panics via `unimplemented!()`; readers for which this
    /// is called must override it.
    async fn file_schema(&mut self) -> PolarsResult<SchemaRef> {
        unimplemented!()
    }

    /// Returns the Arrow schema of the file, if the reader has one.
    ///
    /// The default implementation returns `Ok(None)`.
    async fn file_arrow_schema(&mut self) -> PolarsResult<Option<ArrowSchemaRef>> {
        Ok(None)
    }

    /// Returns the number of rows in the file.
    ///
    /// The default implementation starts a read with an empty positive slice
    /// (`offset: 0, len: 0`) and the `n_rows_in_file_tx` callback set, drops the output receiver
    /// immediately, and waits for the row count to arrive on the callback channel.
    ///
    /// # Errors
    ///
    /// Returns the error from `begin_read`, or, if the callback channel closes without a value,
    /// the error the read task finished with.
    ///
    /// # Panics
    ///
    /// Panics if the callback channel closes without a value and the read task nevertheless
    /// finished with `Ok(())`.
    async fn n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {
        let (n_rows_tx, n_rows_rx) = oneshot_channel::channel();

        let callbacks = FileReaderCallbacks {
            n_rows_in_file_tx: Some(n_rows_tx),
            ..Default::default()
        };
        let args = BeginReadArgs {
            pre_slice: Some(Slice::Positive { offset: 0, len: 0 }),
            callbacks,
            ..Default::default()
        };

        let (output_recv, task) = self.begin_read(args)?;
        // The output itself is not needed, only the row count callback.
        drop(output_recv);

        if let Ok(n_rows) = n_rows_rx.recv().await {
            return Ok(n_rows);
        }

        // No value was sent; surface the error of the read task instead.
        Err(task.await.unwrap_err())
    }

    /// Variant of [`FileReader::n_rows_in_file`] that may decline to answer.
    ///
    /// The default implementation returns `Ok(None)`.
    /// NOTE(review): presumably readers override this only when the count is cheap to obtain —
    /// confirm.
    async fn fast_n_rows_in_file(&mut self) -> PolarsResult<Option<IdxSize>> {
        Ok(None)
    }

    /// Returns the row position reached after applying `pre_slice`.
    ///
    /// With no slice this is simply [`FileReader::n_rows_in_file`]. Otherwise the default
    /// implementation starts a read with the `row_position_on_end_tx` callback set, consumes the
    /// reader's output until the receiver reports an error, and then waits for the position on the
    /// callback channel. A positive slice is widened to start at offset 0 and span up to the
    /// original slice's end position; a negative slice is passed through unchanged.
    ///
    /// # Errors
    ///
    /// Returns the error from `begin_read`, or, if the callback channel closes without a value,
    /// the error the read task finished with.
    ///
    /// # Panics
    ///
    /// Panics if the callback channel closes without a value and the read task nevertheless
    /// finished with `Ok(())`.
    async fn row_position_after_slice(
        &mut self,
        pre_slice: Option<Slice>,
    ) -> PolarsResult<IdxSize> {
        let read_slice = match pre_slice {
            None => return self.n_rows_in_file().await,
            Some(slice @ Slice::Positive { .. }) => Slice::Positive {
                offset: 0,
                len: slice.end_position(),
            },
            Some(slice @ Slice::Negative { .. }) => slice,
        };

        let (end_position_tx, end_position_rx) = oneshot_channel::channel();

        let args = BeginReadArgs {
            pre_slice: Some(read_slice),
            callbacks: FileReaderCallbacks {
                row_position_on_end_tx: Some(end_position_tx),
                ..Default::default()
            },
            ..Default::default()
        };

        let (mut output_recv, task) = self.begin_read(args)?;

        // Drain everything the reader sends before looking at the callback.
        // NOTE(review): presumably the end-position callback is only fulfilled once the read has
        // run to completion — confirm.
        loop {
            if output_recv.recv().await.is_err() {
                break;
            }
        }

        if let Ok(row_position) = end_position_rx.recv().await {
            return Ok(row_position);
        }

        // No value was sent; surface the error of the read task instead.
        Err(task.await.unwrap_err())
    }
}
/// Arguments passed to [`FileReader::begin_read`] to configure a single read.
///
/// Implements [`Default`], so callers can set only the fields they care about and fill the rest
/// with `..Default::default()`.
#[derive(Debug)]
pub struct BeginReadArgs {
/// Projection to use for the read.
/// NOTE(review): presumably selects which columns the reader outputs — confirm.
pub projection: Projection,
/// Optional row index configuration; `None` by default.
pub row_index: Option<RowIndex>,
/// Optional slice to apply to the read; `None` by default.
pub pre_slice: Option<Slice>,
/// Optional predicate handed to the reader; `None` by default.
pub predicate: Option<ScanIOPredicate>,
/// Policy for casting columns; defaults to `CastColumnsPolicy::ERROR_ON_MISMATCH`.
pub cast_columns_policy: CastColumnsPolicy,
/// Number of pipelines; defaults to 1.
/// NOTE(review): presumably the degree of parallelism the reader should target — confirm.
pub num_pipelines: usize,
/// Defaults to `false`.
/// NOTE(review): presumably asks the reader not to split its output morsels — confirm.
pub disable_morsel_split: bool,
/// Optional one-shot channels through which the caller can receive values from the read.
pub callbacks: FileReaderCallbacks,
}
impl Default for BeginReadArgs {
    /// Builds the baseline arguments: a plain projection over a default-constructed schema, no
    /// row index, slice or predicate, `ERROR_ON_MISMATCH` cast policy, a single pipeline, morsel
    /// splitting left enabled, and no callbacks.
    fn default() -> Self {
        let projection = Projection::Plain(SchemaRef::default());
        let callbacks = FileReaderCallbacks::default();

        Self {
            projection,
            row_index: None,
            pre_slice: None,
            predicate: None,
            cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
            num_pipelines: 1,
            disable_morsel_split: false,
            callbacks,
        }
    }
}
/// Optional one-shot senders passed to a reader via [`BeginReadArgs::callbacks`].
///
/// Every field defaults to `None`. The caller keeps the matching receiver and awaits the value
/// on it after starting the read.
#[derive(Default)]
pub struct FileReaderCallbacks {
/// Sender for the file's schema.
pub file_schema_tx: Option<oneshot_channel::Sender<SchemaRef>>,
/// Sender for the number of rows in the file; used by the default
/// [`FileReader::n_rows_in_file`] implementation.
pub n_rows_in_file_tx: Option<oneshot_channel::Sender<IdxSize>>,
/// Sender for the row position at the end of the read; used by the default
/// [`FileReader::row_position_after_slice`] implementation.
pub row_position_on_end_tx: Option<oneshot_channel::Sender<IdxSize>>,
}
impl std::fmt::Debug for FileReaderCallbacks {
    /// Formats the callbacks by showing only whether each sender is set.
    ///
    /// Each field is rendered as `Some("")` when a sender is present and `None` otherwise; the
    /// senders themselves are never formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct makes this impl fail to compile
        // until the new field is handled here as well.
        let FileReaderCallbacks {
            file_schema_tx,
            n_rows_in_file_tx,
            row_position_on_end_tx,
        } = self;

        // Write straight into the formatter instead of building an intermediate `String`.
        // The `\` line continuations strip the newline and the leading whitespace of the next
        // line, so the emitted text is a single line.
        write!(
            f,
            "\
            FileReaderCallbacks: \
            file_schema_tx: {:?}, \
            n_rows_in_file_tx: {:?}, \
            row_position_on_end_tx: {:?} \
            ",
            file_schema_tx.as_ref().map(|_| ""),
            n_rows_in_file_tx.as_ref().map(|_| ""),
            row_position_on_end_tx.as_ref().map(|_| "")
        )
    }
}
/// Computes the row position after applying `pre_slice` to a file with a known row count.
///
/// Without a slice the position is `n_rows_in_file`. With a slice, the slice is first restricted
/// to the bounds of the file and its end position is returned.
///
/// The result saturates at `IdxSize::MAX` if it does not fit in `IdxSize`.
///
/// # Panics
///
/// Panics if `n_rows_in_file` cannot be represented as a `usize`.
pub fn calc_row_position_after_slice(n_rows_in_file: IdxSize, pre_slice: Option<Slice>) -> IdxSize {
    let total_rows = usize::try_from(n_rows_in_file).unwrap();

    let end_position = pre_slice.map_or(total_rows, |slice| {
        slice.restrict_to_bounds(total_rows).end_position()
    });

    IdxSize::try_from(end_position).unwrap_or(IdxSize::MAX)
}