use std::future::Future;
use std::sync::Arc;
use futures::stream::{BoxStream, StreamExt as _};
use url::Url;
use self::executor::TaskExecutor;
use self::filesystem::ObjectStoreStorageHandler;
use self::json::DefaultJsonHandler;
use self::parquet::DefaultParquetHandler;
use super::arrow_conversion::TryFromArrow as _;
use super::arrow_data::ArrowEngineData;
use super::arrow_expression::ArrowEvaluationHandler;
use crate::metrics::MetricsReporter;
use crate::object_store::DynObjectStore;
use crate::schema::Schema;
use crate::transaction::WriteContext;
use crate::{
DeltaResult, Engine, EngineData, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler,
};
pub mod executor;
pub mod file_stream;
pub mod filesystem;
pub mod json;
pub mod parquet;
pub mod stats;
pub mod storage;
/// Resolves `stream_future` on `task_executor` and exposes the resulting stream as a blocking
/// iterator.
///
/// The future is driven to completion with `block_on` before this function returns, so a failure
/// to create the stream is reported here rather than by the iterator. The returned iterator keeps
/// the executor and blocks on it again for every item it yields.
///
/// # Errors
///
/// Returns whatever error `stream_future` resolves to.
pub(crate) fn stream_future_to_iter<T: Send + 'static, E: executor::TaskExecutor>(
    task_executor: Arc<E>,
    stream_future: impl Future<Output = DeltaResult<BoxStream<'static, T>>> + Send + 'static,
) -> DeltaResult<Box<dyn Iterator<Item = T> + Send>> {
    // Resolve the stream first; the executor is only moved into the iterator afterwards.
    let stream = task_executor.block_on(stream_future)?;
    let blocking_iter = BlockingStreamIterator {
        stream: Some(stream),
        task_executor,
    };
    Ok(Box::new(blocking_iter))
}
/// Adapter that exposes an async `BoxStream` through the synchronous [`Iterator`] interface.
///
/// Every call to `next` blocks on `task_executor` until the stream produces its next item.
struct BlockingStreamIterator<T: Send + 'static, E: executor::TaskExecutor> {
/// The wrapped stream. Taken out while an item is being awaited and only put back when an item
/// was produced, so it is `None` once the stream has finished.
stream: Option<BoxStream<'static, T>>,
/// Executor used to block on the stream's next item.
task_executor: Arc<E>,
}
impl<T: Send + 'static, E: executor::TaskExecutor> Iterator for BlockingStreamIterator<T, E> {
    type Item = T;

    /// Blocks on the executor until the wrapped stream yields its next item.
    ///
    /// Returns `None` immediately if the stream has already finished.
    fn next(&mut self) -> Option<Self::Item> {
        // The stream is moved into the future (which must own it) and handed back alongside the
        // item it produced.
        let mut pending = self.stream.take()?;
        let executor = &self.task_executor;
        let (polled, pending) = executor.block_on(async move {
            let polled = pending.next().await;
            (polled, pending)
        });

        match polled {
            Some(value) => {
                // More items may follow, so keep the stream for the next call.
                self.stream = Some(pending);
                Some(value)
            }
            // Finished: leave `self.stream` empty so the stream is never polled again.
            None => None,
        }
    }
}
// Default buffer size. Not referenced in this part of the file; presumably consumed by the
// handler submodules declared above -- TODO confirm which ones and in what unit.
const DEFAULT_BUFFER_SIZE: usize = 1000;
// Default batch size. Not referenced in this part of the file; presumably the number of rows per
// batch used by the handler submodules -- TODO confirm.
const DEFAULT_BATCH_SIZE: usize = 1000;
/// Iterator wrapper that reports a single metric event for a read.
///
/// Items from `inner` are passed through unchanged. The event is reported exactly once: either
/// when `inner` first returns `None`, or when this wrapper is dropped, whichever happens first.
pub(super) struct ReadMetricsIterator {
/// The wrapped iterator whose items are forwarded.
inner: crate::FileDataReadResultIterator,
/// Receiver of the metric event.
reporter: Arc<dyn crate::metrics::MetricsReporter>,
/// First argument passed to `make_event`.
num_files: u64,
/// Second argument passed to `make_event`.
bytes_read: u64,
/// Set once the event has been reported, so it is never reported twice.
emitted: bool,
/// Builds the event to report from `(num_files, bytes_read)`.
make_event: fn(u64, u64) -> crate::metrics::MetricEvent,
}
impl ReadMetricsIterator {
    /// Wraps `inner` so that one metric event is reported when it is exhausted or dropped.
    ///
    /// `make_event` is called with `(num_files, bytes_read)` to build that event, which is then
    /// handed to `reporter`. Nothing is reported at construction time.
    pub(super) fn new(
        inner: crate::FileDataReadResultIterator,
        reporter: Arc<dyn crate::metrics::MetricsReporter>,
        num_files: u64,
        bytes_read: u64,
        make_event: fn(u64, u64) -> crate::metrics::MetricEvent,
    ) -> Self {
        let emitted = false;
        Self {
            inner,
            reporter,
            num_files,
            bytes_read,
            emitted,
            make_event,
        }
    }

    /// Reports the metric event unless it has already been reported.
    fn emit_once(&mut self) {
        if self.emitted {
            return;
        }
        // Mark as emitted before building and reporting the event.
        self.emitted = true;
        let event = (self.make_event)(self.num_files, self.bytes_read);
        self.reporter.report(event);
    }
}
impl Iterator for ReadMetricsIterator {
    type Item = crate::DeltaResult<Box<dyn crate::EngineData>>;

    /// Forwards the next item of the wrapped iterator, reporting the metric event the first time
    /// the wrapped iterator returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.next() {
            Some(batch) => Some(batch),
            None => {
                self.emit_once();
                None
            }
        }
    }
}
impl Drop for ReadMetricsIterator {
    /// Reports the metric event on drop if it has not been reported yet, which covers iterators
    /// that are dropped before being fully consumed.
    fn drop(&mut self) {
        Self::emit_once(self);
    }
}
/// Engine implementation backed by an object store and a [`TaskExecutor`].
///
/// Holds one handler per capability exposed through the [`Engine`] trait. Construct it through
/// [`DefaultEngineBuilder`].
#[derive(Debug)]
pub struct DefaultEngine<E: TaskExecutor> {
/// Object store shared with the storage, JSON and parquet handlers.
object_store: Arc<DynObjectStore>,
/// Executor shared with the storage, JSON and parquet handlers.
task_executor: Arc<E>,
/// Handler returned by `storage_handler`.
storage: Arc<ObjectStoreStorageHandler<E>>,
/// Handler returned by `json_handler`.
json: Arc<DefaultJsonHandler<E>>,
/// Handler returned by `parquet_handler`; also used by `write_parquet`.
parquet: Arc<DefaultParquetHandler<E>>,
/// Handler returned by `evaluation_handler`.
evaluation: Arc<ArrowEvaluationHandler>,
/// Optional metrics reporter, also passed to the handlers at construction time.
metrics_reporter: Option<Arc<dyn MetricsReporter>>,
}
/// Builder for [`DefaultEngine`].
///
/// Collects the object store, the task executor and an optional metrics reporter, then creates
/// the engine with `build`.
#[derive(Debug)]
pub struct DefaultEngineBuilder<E: TaskExecutor> {
/// Object store the engine will read from and write to.
object_store: Arc<DynObjectStore>,
/// Executor the engine and its handlers will use.
task_executor: Arc<E>,
/// Optional metrics reporter; `None` unless set with `with_metrics_reporter`.
metrics_reporter: Option<Arc<dyn MetricsReporter>>,
}
impl DefaultEngineBuilder<executor::tokio::TokioBackgroundExecutor> {
    /// Starts a builder for `object_store` with a freshly created `TokioBackgroundExecutor` and
    /// no metrics reporter.
    pub fn new(object_store: Arc<DynObjectStore>) -> Self {
        let task_executor = Arc::new(executor::tokio::TokioBackgroundExecutor::new());
        Self {
            object_store,
            task_executor,
            metrics_reporter: None,
        }
    }
}
impl<E: TaskExecutor> DefaultEngineBuilder<E> {
    /// Sets the metrics reporter, replacing any reporter set earlier.
    pub fn with_metrics_reporter(self, reporter: Arc<dyn MetricsReporter>) -> Self {
        Self {
            metrics_reporter: Some(reporter),
            ..self
        }
    }

    /// Replaces the task executor, changing the builder's executor type to `F`.
    ///
    /// The object store and any metrics reporter already configured are carried over.
    pub fn with_task_executor<F: TaskExecutor>(
        self,
        task_executor: Arc<F>,
    ) -> DefaultEngineBuilder<F> {
        // The previous executor is intentionally discarded.
        let Self {
            object_store,
            metrics_reporter,
            ..
        } = self;
        DefaultEngineBuilder {
            object_store,
            task_executor,
            metrics_reporter,
        }
    }

    /// Consumes the builder and creates the [`DefaultEngine`].
    pub fn build(self) -> DefaultEngine<E> {
        let Self {
            object_store,
            task_executor,
            metrics_reporter,
        } = self;
        DefaultEngine::new_with_opts(object_store, task_executor, metrics_reporter)
    }
}
impl DefaultEngine<executor::tokio::TokioBackgroundExecutor> {
    /// Convenience entry point equivalent to `DefaultEngineBuilder::new(object_store)`.
    pub fn builder(
        object_store: Arc<DynObjectStore>,
    ) -> DefaultEngineBuilder<executor::tokio::TokioBackgroundExecutor> {
        DefaultEngineBuilder::<executor::tokio::TokioBackgroundExecutor>::new(object_store)
    }
}
impl<E: TaskExecutor> DefaultEngine<E> {
    /// Creates the engine and all of its handlers.
    ///
    /// The storage, JSON and parquet handlers each receive their own clone of the object store,
    /// the executor and the optional metrics reporter.
    fn new_with_opts(
        object_store: Arc<DynObjectStore>,
        task_executor: Arc<E>,
        metrics_reporter: Option<Arc<dyn MetricsReporter>>,
    ) -> Self {
        let storage = Arc::new(ObjectStoreStorageHandler::new(
            object_store.clone(),
            task_executor.clone(),
            metrics_reporter.clone(),
        ));
        let json = Arc::new(
            DefaultJsonHandler::new(object_store.clone(), task_executor.clone())
                .with_reporter(metrics_reporter.clone()),
        );
        let parquet = Arc::new(
            DefaultParquetHandler::new(object_store.clone(), task_executor.clone())
                .with_reporter(metrics_reporter.clone()),
        );
        let evaluation = Arc::new(ArrowEvaluationHandler {});

        Self {
            object_store,
            task_executor,
            storage,
            json,
            parquet,
            evaluation,
            metrics_reporter,
        }
    }

    /// Calls `enter` on the engine's task executor and returns the guard it produces.
    pub fn enter(&self) -> <E as TaskExecutor>::Guard<'_> {
        self.task_executor.enter()
    }

    /// Returns the engine's object store.
    ///
    /// The URL is currently ignored: the same store is returned for every URL, so the result is
    /// always `Some`.
    pub fn get_object_store_for_url(&self, _url: &Url) -> Option<Arc<DynObjectStore>> {
        Some(Arc::clone(&self.object_store))
    }

    /// Converts `data` with the write context's logical-to-physical transform and writes the
    /// result through the parquet handler.
    ///
    /// The input schema is derived from the record batch in `data`; the output schema is the
    /// write context's physical schema. The return value is whatever the parquet handler's
    /// `write_parquet_file` returns.
    ///
    /// # Errors
    ///
    /// Fails if the Arrow schema cannot be converted, if the expression evaluator cannot be
    /// created or evaluated, or if writing the parquet file fails.
    pub async fn write_parquet(
        &self,
        data: &ArrowEngineData,
        write_context: &WriteContext,
    ) -> DeltaResult<Box<dyn EngineData>> {
        let logical_to_physical = write_context.logical_to_physical();
        let logical_schema = Schema::try_from_arrow(data.record_batch().schema())?;
        let physical_schema = write_context.physical_schema();

        let evaluator = self.evaluation_handler().new_expression_evaluator(
            logical_schema.into(),
            logical_to_physical.clone(),
            physical_schema.clone().into(),
        )?;
        let physical_batch = evaluator.evaluate(data)?;

        let parquet_handler = &self.parquet;
        parquet_handler
            .write_parquet_file(physical_batch, write_context)
            .await
    }
}
/// Builds the engine data describing a written data file.
///
/// The file's location is resolved through `write_context.resolve_file_path`, and the resulting
/// path is passed, together with the write context's physical partition values, to
/// `DataFileMetadata::as_record_batch`.
///
/// # Errors
///
/// Fails if the file path cannot be resolved or if the record batch cannot be built.
pub fn build_add_file_metadata(
    file_metadata: parquet::DataFileMetadata,
    write_context: &WriteContext,
) -> DeltaResult<Box<dyn EngineData>> {
    let location = file_metadata.location();
    let add_path = write_context.resolve_file_path(location)?;
    let partition_values = write_context.physical_partition_values();
    file_metadata.as_record_batch(partition_values, &add_path)
}
impl<E: TaskExecutor> Engine for DefaultEngine<E> {
    /// Returns a shared handle to the Arrow-based evaluation handler.
    fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
        self.evaluation.clone() as Arc<dyn EvaluationHandler>
    }

    /// Returns a shared handle to the object-store-backed storage handler.
    fn storage_handler(&self) -> Arc<dyn StorageHandler> {
        self.storage.clone() as Arc<dyn StorageHandler>
    }

    /// Returns a shared handle to the JSON handler.
    fn json_handler(&self) -> Arc<dyn JsonHandler> {
        self.json.clone() as Arc<dyn JsonHandler>
    }

    /// Returns a shared handle to the parquet handler.
    fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
        self.parquet.clone() as Arc<dyn ParquetHandler>
    }

    /// Returns the metrics reporter the engine was built with, if any.
    fn get_metrics_reporter(&self) -> Option<Arc<dyn MetricsReporter>> {
        let reporter = &self.metrics_reporter;
        reporter.clone()
    }
}
/// Extension trait adding presigned-URL detection to [`Url`].
trait UrlExt {
/// Returns `true` if the URL looks like a presigned URL.
fn is_presigned(&self) -> bool;
}
impl UrlExt for Url {
    /// Reports whether this URL carries one of the known presigned-URL query parameters.
    ///
    /// Only `http` and `https` URLs qualify. The query keys are compared ASCII
    /// case-insensitively; their values are ignored.
    fn is_presigned(&self) -> bool {
        // Query keys whose presence marks a URL as presigned. Judging by their names these cover
        // AWS-style signatures, Azure SAS (`sp`), Google Cloud Storage, Alibaba Cloud OSS and
        // Databricks.
        const PRESIGNED_KEYS: &[&str] = &[
            "X-Amz-Signature",
            "sp",
            "X-Goog-Credential",
            "X-OSS-Credential",
            "X-Databricks-Signature",
        ];

        if !matches!(self.scheme(), "http" | "https") {
            return false;
        }

        self.query_pairs().any(|(key, _)| {
            PRESIGNED_KEYS
                .iter()
                .any(|known| key.eq_ignore_ascii_case(known))
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::tests::test_arrow_engine;
    use crate::metrics::MetricEvent;
    use crate::object_store::local::LocalFileSystem;

    /// Reporter that discards every event; used to check that a reporter can be configured.
    #[derive(Debug)]
    struct TestMetricsReporter;

    impl MetricsReporter for TestMetricsReporter {
        fn report(&self, _event: MetricEvent) {}
    }

    /// Creates a temporary directory, its directory URL and a local object store.
    ///
    /// The returned `TempDir` must be kept alive for as long as the URL is in use.
    fn local_fixture() -> (tempfile::TempDir, Url, Arc<LocalFileSystem>) {
        let tmp = tempfile::tempdir().unwrap();
        let url = Url::from_directory_path(tmp.path()).unwrap();
        let object_store = Arc::new(LocalFileSystem::new());
        (tmp, url, object_store)
    }

    #[test]
    fn test_default_engine() {
        let (_tmp, url, object_store) = local_fixture();
        let engine = DefaultEngineBuilder::new(object_store).build();
        test_arrow_engine(&engine, &url);
    }

    #[test]
    fn test_default_engine_builder_new_and_build() {
        let (_tmp, url, object_store) = local_fixture();
        let engine = DefaultEngineBuilder::new(object_store).build();
        test_arrow_engine(&engine, &url);
    }

    #[test]
    fn test_default_engine_builder_with_metrics_reporter() {
        let (_tmp, url, object_store) = local_fixture();
        let reporter = Arc::new(TestMetricsReporter);
        let builder = DefaultEngineBuilder::new(object_store).with_metrics_reporter(reporter);
        let engine = builder.build();
        assert!(engine.get_metrics_reporter().is_some());
        test_arrow_engine(&engine, &url);
    }

    #[test]
    fn test_default_engine_builder_with_custom_executor() {
        // The runtime is created first so that it outlives the engine built on top of it.
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        let (_tmp, url, object_store) = local_fixture();
        let handle = rt.handle().clone();
        let executor = Arc::new(executor::tokio::TokioMultiThreadExecutor::new(handle));
        let builder = DefaultEngineBuilder::new(object_store).with_task_executor(executor);
        let engine = builder.build();
        test_arrow_engine(&engine, &url);
    }

    #[test]
    fn test_default_engine_builder_method() {
        let (_tmp, url, object_store) = local_fixture();
        let engine = DefaultEngine::builder(object_store).build();
        test_arrow_engine(&engine, &url);
    }

    #[test]
    fn test_default_engine_builder_all_options() {
        let (_tmp, url, object_store) = local_fixture();
        let reporter = Arc::new(TestMetricsReporter);
        let executor = Arc::new(executor::tokio::TokioBackgroundExecutor::new());
        let builder = DefaultEngineBuilder::new(object_store)
            .with_metrics_reporter(reporter)
            .with_task_executor(executor);
        let engine = builder.build();
        assert!(engine.get_metrics_reporter().is_some());
        test_arrow_engine(&engine, &url);
    }

    #[test]
    fn test_pre_signed_url() {
        // URLs that must be recognised as presigned, including mixed-case query keys.
        let presigned = [
            "https://example.com?X-Amz-Signature=foo",
            "https://example.com?sp=foo",
            "https://example.com?X-Goog-Credential=foo",
            "https://example.com?X-OSS-Credential=foo",
            "https://example.com?X-Databricks-TTL=3599545&X-Databricks-Signature=bar",
            "https://example.com?x-gooG-credenTIAL=foo",
            "https://example.com?x-oss-CREDENTIAL=foo",
        ];
        for raw in presigned {
            let url = Url::parse(raw).unwrap();
            assert!(url.is_presigned());
        }

        // A URL without any query string is not presigned.
        let plain = Url::parse("https://example.com").unwrap();
        assert!(!plain.is_presigned());
    }
}