use std::path::{Path, PathBuf};
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
/// A DataFusion [`TableProvider`] backed by a single file on disk, read through
/// `oxistore_columnar` (presumably a Parquet file, per the type name — confirm).
///
/// The schema is captured once, when the provider is opened. The file's row data
/// is read again on every [`TableProvider::scan`].
#[derive(Debug, Clone)]
pub struct ParquetTableProvider {
    /// Location of the backing file; it is re-read on every scan.
    path: PathBuf,
    /// Full (unprojected) schema taken from the file metadata at open time.
    schema: SchemaRef,
}
impl ParquetTableProvider {
pub fn open(path: impl AsRef<Path>) -> Result<Self, DataFusionError> {
let path = path.as_ref().to_path_buf();
let meta = oxistore_columnar::read_metadata(&path)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(Self {
path,
schema: meta.schema,
})
}
pub fn path(&self) -> &Path {
&self.path
}
}
#[async_trait]
impl TableProvider for ParquetTableProvider {
    /// Returns the full (unprojected) schema captured when the provider was opened.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// The provider is backed by a file on disk, so it is reported as a base table.
    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Reads the whole file into memory and returns an in-memory execution plan
    /// with a single partition.
    ///
    /// * `projection`:
    ///   * `None` reads and returns every column.
    ///   * `Some(non-empty)` pushes the column selection down into
    ///     `oxistore_columnar::read_batches_with_projection`.
    ///   * `Some(empty)` (e.g. `COUNT(*)`) reads all rows but yields zero-column
    ///     batches that still carry the correct row counts.
    /// * `_filters` / `_limit` are not applied here. `supports_filters_pushdown` is
    ///   not overridden in this impl, so DataFusion is expected to re-apply filters
    ///   and limits above this scan.
    ///
    /// NOTE(review): the read below is synchronous file I/O performed inside an
    /// async fn, so it blocks the executor thread for the duration of the read.
    ///
    /// # Errors
    ///
    /// * Read failures are wrapped in [`DataFusionError::External`].
    /// * An out-of-range projection index is reported as an Arrow schema error.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let exec = match projection {
            Some(indices) if !indices.is_empty() => {
                let batches = oxistore_columnar::read_batches_with_projection(&self.path, indices)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                // Prefer the schema the reader actually produced. If the file has
                // no rows there is no batch to ask, so derive the projected schema
                // from the table schema instead of falling back to the full one.
                let projected_schema = match batches.first() {
                    Some(first) => first.schema(),
                    None => Arc::new(self.schema.project(indices)?),
                };
                // The batches are already projected, so no further projection is needed.
                MemorySourceConfig::try_new_exec(&[batches], projected_schema, None)?
            }
            _ => {
                let batches = oxistore_columnar::read_batches(&self.path)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                // `projection` is `None` or `Some(vec![])` here. Forwarding it lets
                // the memory source strip all columns for an empty projection while
                // preserving row counts, matching what DataFusion expects.
                MemorySourceConfig::try_new_exec(
                    &[batches],
                    Arc::clone(&self.schema),
                    projection.cloned(),
                )?
            }
        };
        Ok(exec as Arc<dyn ExecutionPlan>)
    }
}