use std::collections::HashMap;
use std::sync::Arc;
use self::storage::parse_url_opts;
use object_store::{path::Path, DynObjectStore};
use url::Url;
use self::executor::TaskExecutor;
use self::filesystem::ObjectStoreFileSystemClient;
use self::json::DefaultJsonHandler;
use self::parquet::DefaultParquetHandler;
use super::arrow_data::ArrowEngineData;
use super::arrow_expression::ArrowExpressionHandler;
use crate::schema::Schema;
use crate::transaction::WriteContext;
use crate::{
DeltaResult, Engine, EngineData, ExpressionHandler, FileSystemClient, JsonHandler,
ParquetHandler,
};
pub mod executor;
pub mod file_stream;
pub mod filesystem;
pub mod json;
pub mod parquet;
pub mod storage;
/// The default [`Engine`] implementation.
///
/// Bundles one shared object store with the handler implementations
/// ([`FileSystemClient`], [`JsonHandler`], [`ParquetHandler`],
/// [`ExpressionHandler`]) that [`Engine`] hands out. All handlers that touch
/// storage are constructed over the same `store` and run their async work on
/// the supplied [`TaskExecutor`] `E`.
#[derive(Debug)]
pub struct DefaultEngine<E: TaskExecutor> {
    // The underlying object store shared by all handlers below.
    store: Arc<DynObjectStore>,
    // Listing/reading of raw files (e.g. the `_delta_log` directory).
    file_system: Arc<ObjectStoreFileSystemClient<E>>,
    // JSON reading (commit files) backed by `store`.
    json: Arc<DefaultJsonHandler<E>>,
    // Parquet reading/writing (checkpoints, data files) backed by `store`.
    parquet: Arc<DefaultParquetHandler<E>>,
    // Arrow-based expression evaluation; stateless, no store access.
    expression: Arc<ArrowExpressionHandler>,
}
impl<E: TaskExecutor> DefaultEngine<E> {
    /// Construct an engine by parsing `table_root` (plus `options`) into an
    /// object store via [`parse_url_opts`].
    ///
    /// # Parameters
    /// - `table_root`: URL of the Delta table root; its scheme selects the
    ///   object-store implementation.
    /// - `options`: key/value options forwarded to the object-store builder.
    /// - `task_executor`: executor the handlers use to run async store work.
    ///
    /// # Errors
    /// Propagates any failure from [`parse_url_opts`] (e.g. an unsupported
    /// scheme or invalid option).
    pub fn try_new<K, V>(
        table_root: &Url,
        options: impl IntoIterator<Item = (K, V)>,
        task_executor: Arc<E>,
    ) -> DeltaResult<Self>
    where
        K: AsRef<str>,
        V: Into<String>,
    {
        let (store, table_root) = parse_url_opts(table_root, options)?;
        Ok(Self::new(Arc::new(store), table_root, task_executor))
    }

    /// Construct an engine from an already-built object store.
    ///
    /// # Parameters
    /// - `store`: the object store all handlers will share.
    /// - `table_root`: path of the table root *within* `store`.
    /// - `task_executor`: executor the handlers use to run async store work.
    pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
        // Detect a local filesystem store by sniffing its `Display` output.
        // NOTE(review): this is fragile — it depends on `LocalFileSystem`'s
        // Display impl never changing; a typed check would be preferable if
        // object_store ever exposes one.
        let is_local = store.to_string().starts_with("LocalFileSystem");
        Self {
            file_system: Arc::new(ObjectStoreFileSystemClient::new(
                Arc::clone(&store),
                // Second argument is `!is_local`, i.e. true for remote stores.
                // NOTE(review): presumably a "listing needs sorting/ordering"
                // flag — confirm against `ObjectStoreFileSystemClient::new`.
                !is_local,
                table_root,
                Arc::clone(&task_executor),
            )),
            json: Arc::new(DefaultJsonHandler::new(
                Arc::clone(&store),
                Arc::clone(&task_executor),
            )),
            // Last handler takes the executor by value; no clone needed.
            parquet: Arc::new(DefaultParquetHandler::new(Arc::clone(&store), task_executor)),
            store,
            expression: Arc::new(ArrowExpressionHandler {}),
        }
    }

    /// Return the object store to use for `_url`.
    ///
    /// The URL is currently ignored: this engine holds exactly one store, so
    /// every URL maps to it. The parameter exists so the signature can later
    /// support multi-store routing without breaking callers.
    pub fn get_object_store_for_url(&self, _url: &Url) -> Option<Arc<DynObjectStore>> {
        Some(Arc::clone(&self.store))
    }

    /// Transform `data` from the table's logical schema to its physical
    /// schema and write it out as a parquet file.
    ///
    /// # Parameters
    /// - `data`: logical-schema record batch to write.
    /// - `write_context`: supplies the logical→physical transform, the target
    ///   physical schema, and the target directory.
    /// - `partition_values`: partition column values recorded for the file.
    /// - `data_change`: whether this write represents a data change
    ///   (forwarded to the parquet writer; semantics defined there).
    ///
    /// # Errors
    /// Fails if the input schema cannot be converted, if expression
    /// evaluation fails, or if the parquet write itself fails.
    pub async fn write_parquet(
        &self,
        data: &ArrowEngineData,
        write_context: &WriteContext,
        partition_values: HashMap<String, String>,
        data_change: bool,
    ) -> DeltaResult<Box<dyn EngineData>> {
        let transform = write_context.logical_to_physical();
        let input_schema: Schema = data.record_batch().schema().try_into()?;
        let output_schema = write_context.schema();
        // Build an evaluator mapping the logical input schema to the physical
        // output schema, then apply it before writing.
        let logical_to_physical_expr = self.get_expression_handler().get_evaluator(
            input_schema.into(),
            transform.clone(),
            output_schema.clone().into(),
        );
        let physical_data = logical_to_physical_expr.evaluate(data)?;
        self.parquet
            .write_parquet_file(
                write_context.target_dir(),
                physical_data,
                partition_values,
                data_change,
            )
            .await
    }
}
impl<E: TaskExecutor> Engine for DefaultEngine<E> {
    /// Hand out the shared Arrow-based expression handler.
    fn get_expression_handler(&self) -> Arc<dyn ExpressionHandler> {
        Arc::clone(&self.expression)
    }

    /// Hand out the shared object-store-backed file system client.
    fn get_file_system_client(&self) -> Arc<dyn FileSystemClient> {
        Arc::clone(&self.file_system)
    }

    /// Hand out the shared JSON handler.
    fn get_json_handler(&self) -> Arc<dyn JsonHandler> {
        Arc::clone(&self.json)
    }

    /// Hand out the shared parquet handler.
    fn get_parquet_handler(&self) -> Arc<dyn ParquetHandler> {
        Arc::clone(&self.parquet)
    }
}