use std::sync::Arc;
use bytes::Bytes;
use itertools::Itertools;
use tracing::debug;
use url::Url;
use super::arrow_expression::ArrowEvaluationHandler;
use crate::engine::arrow_data::ArrowEngineData;
use crate::object_store::local::LocalFileSystem;
use crate::object_store::path::Path;
use crate::object_store::DynObjectStore;
#[allow(unused_imports)]
use crate::object_store::ObjectStoreExt as _;
use crate::{
DeltaResult, Engine, Error, EvaluationHandler, FileDataReadResultIterator, FileMeta,
JsonHandler, ParquetHandler, PredicateRef, SchemaRef, StorageHandler,
};
pub(crate) mod json;
mod parquet;
mod storage;
/// An [`Engine`] whose storage, JSON, and Parquet handlers are the `Sync*` handlers defined in
/// this module's submodules, paired with an [`ArrowEvaluationHandler`] for expressions.
///
/// Each handler is held behind an `Arc` so it can be handed out to callers cheaply.
pub(crate) struct SyncEngine {
    /// Handler for listing/reading raw files (see the `storage` submodule).
    storage_handler: Arc<storage::SyncStorageHandler>,
    /// Handler for reading/writing JSON files (see the `json` submodule).
    json_handler: Arc<json::SyncJsonHandler>,
    /// Handler for reading Parquet files (see the `parquet` submodule).
    parquet_handler: Arc<parquet::SyncParquetHandler>,
    /// Arrow-backed expression/predicate evaluation.
    evaluation_handler: Arc<ArrowEvaluationHandler>,
}
impl SyncEngine {
pub(crate) fn new() -> Self {
Self::new_inner(None)
}
pub(crate) fn new_with_store(store: Arc<DynObjectStore>) -> Self {
Self::new_inner(Some(store))
}
fn new_inner(store: Option<Arc<DynObjectStore>>) -> Self {
SyncEngine {
storage_handler: Arc::new(storage::SyncStorageHandler::new(store.clone())),
json_handler: Arc::new(json::SyncJsonHandler::new(store.clone())),
parquet_handler: Arc::new(parquet::SyncParquetHandler::new(store)),
evaluation_handler: Arc::new(ArrowEvaluationHandler {}),
}
}
}
/// Each accessor hands out a new reference to the engine's shared handler instance, so all
/// callers observe the same handler.
impl Engine for SyncEngine {
    fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
        Arc::clone(&self.evaluation_handler)
    }

    fn storage_handler(&self) -> Arc<dyn StorageHandler> {
        Arc::clone(&self.storage_handler)
    }

    fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
        Arc::clone(&self.parquet_handler)
    }

    fn json_handler(&self) -> Arc<dyn JsonHandler> {
        Arc::clone(&self.json_handler)
    }
}
pub(super) fn resolve_scope(
default_store: Option<&Arc<DynObjectStore>>,
url: &Url,
) -> DeltaResult<(Arc<DynObjectStore>, Url, Path)> {
if let Some(store) = default_store {
let mut base_url = url.clone();
base_url.set_path("/");
let path = Path::from_url_path(url.path())?;
return Ok((store.clone(), base_url, path));
}
if url.scheme() != "file" {
return Err(Error::generic(format!(
"SyncEngine without an explicit store can only access file:// URLs, got: {url}"
)));
}
let file_path = url
.to_file_path()
.map_err(|()| Error::generic(format!("Invalid file URL: {url}")))?;
let target_dir = if url.path().ends_with('/') {
file_path.clone()
} else {
file_path
.parent()
.ok_or_else(|| Error::generic(format!("File URL has no parent: {url}")))?
.to_path_buf()
};
let mut prefix = target_dir.as_path();
while !prefix.exists() {
prefix = prefix
.parent()
.ok_or_else(|| Error::generic(format!("No existing ancestor for {target_dir:?}")))?;
}
let prefix = prefix.to_path_buf();
let relative = file_path
.strip_prefix(&prefix)
.map_err(|e| Error::generic(format!("Failed to strip prefix: {e}")))?;
let path = Path::from_iter(relative.components().filter_map(|c| match c {
std::path::Component::Normal(s) => s.to_str().map(String::from),
_ => None,
}));
let base_url = Url::from_directory_path(&prefix)
.map_err(|()| Error::generic(format!("Could not URL-encode prefix {prefix:?}")))?;
let store: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new_with_prefix(&prefix)?);
Ok((store, base_url, path))
}
/// Fetches the entire contents of the object at `location`.
///
/// The store and object path come from [`resolve_scope`]. The get request and the body read
/// are driven to completion on the calling thread with `futures::executor::block_on`.
pub(super) fn get_bytes(
    default_store: Option<&Arc<DynObjectStore>>,
    location: &Url,
) -> DeltaResult<Bytes> {
    let (store, _base_url, object_path) = resolve_scope(default_store, location)?;
    let fetch = async {
        let response = store.get(&object_path).await?;
        response.bytes().await
    };
    Ok(futures::executor::block_on(fetch)?)
}
/// Writes `data` to the object at `location`.
///
/// For `file://` URLs, the local parent directory is created first if it is missing. This is
/// best effort: if the URL cannot be converted to a local path, nothing is created here, and
/// `resolve_scope` decides whether the URL is usable.
///
/// When `overwrite` is `false`, the write uses `PutMode::Create`, so an existing object is not
/// replaced.
///
/// # Errors
///
/// * [`Error::FileAlreadyExists`] when `overwrite` is `false` and the object already exists.
/// * Any other object-store failure, converted via `From` so its variant and source chain are
///   preserved. This matches how [`get_bytes`] reports read failures.
/// * I/O errors from creating the parent directory.
pub(super) fn put_bytes(
    default_store: Option<&Arc<DynObjectStore>>,
    location: &Url,
    data: Bytes,
    overwrite: bool,
) -> DeltaResult<()> {
    if location.scheme() == "file" {
        if let Ok(file_path) = location.to_file_path() {
            if let Some(parent) = file_path.parent().filter(|dir| !dir.exists()) {
                std::fs::create_dir_all(parent)?;
            }
        }
    }
    let (store, _, object_path) = resolve_scope(default_store, location)?;
    let opts = if overwrite {
        crate::object_store::PutOptions::default()
    } else {
        crate::object_store::PutOptions {
            mode: crate::object_store::PutMode::Create,
            ..Default::default()
        }
    };
    futures::executor::block_on(store.put_opts(&object_path, data.into(), opts)).map_err(
        |e| match e {
            crate::object_store::Error::AlreadyExists { .. } => {
                Error::FileAlreadyExists(location.to_string())
            }
            // Keep the original store error instead of flattening it to a string.
            other => other.into(),
        },
    )?;
    Ok(())
}
/// Builds a lazy iterator of engine data read from `files`.
///
/// Each file is fetched with [`get_bytes`] only when the iterator reaches it. Its bytes are then
/// handed to `try_create_from_bytes` along with `schema`, `predicate`, and the file's location
/// string. Every item the callback yields becomes one boxed [`ArrowEngineData`]. Errors from
/// fetching, from the callback, or from individual items surface as `Err` items.
///
/// An empty `files` slice short-circuits to an empty iterator.
fn read_files<F, I>(
    store: Option<&Arc<DynObjectStore>>,
    files: &[FileMeta],
    schema: SchemaRef,
    predicate: Option<PredicateRef>,
    mut try_create_from_bytes: F,
) -> DeltaResult<FileDataReadResultIterator>
where
    I: Iterator<Item = DeltaResult<ArrowEngineData>> + Send + 'static,
    F: FnMut(Bytes, SchemaRef, Option<PredicateRef>, String) -> DeltaResult<I> + Send + 'static,
{
    debug!("Reading files: {files:#?} with schema {schema:#?} and predicate {predicate:#?}");
    if files.is_empty() {
        return Ok(Box::new(std::iter::empty()));
    }
    // The returned iterator is `'static`, so it must own both the file list and the store.
    let owned_files: Vec<FileMeta> = files.to_vec();
    let owned_store = store.cloned();
    let per_file = owned_files.into_iter().map(move |meta| {
        let location = meta.location.to_string();
        let raw = get_bytes(owned_store.as_ref(), &meta.location)?;
        try_create_from_bytes(raw, schema.clone(), predicate.clone(), location)
    });
    let batches = per_file
        .flatten_ok()
        .map(|item| Ok(Box::new(ArrowEngineData::new(item??.into())) as _));
    Ok(Box::new(batches))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::tests::test_arrow_engine;

    /// Runs the shared Arrow engine suite against a temporary local directory (`file://`).
    #[test]
    fn test_sync_engine() {
        let temp_dir = tempfile::tempdir().unwrap();
        let table_url = Url::from_directory_path(temp_dir.path()).unwrap();
        test_arrow_engine(&SyncEngine::new(), &table_url);
    }

    /// Runs the shared Arrow engine suite against an explicit in-memory object store.
    #[test]
    fn test_sync_engine_with_store() {
        let memory_store: Arc<DynObjectStore> =
            Arc::new(crate::object_store::memory::InMemory::new());
        let engine = SyncEngine::new_with_store(memory_store);
        let table_url = Url::parse("memory:///test/").unwrap();
        test_arrow_engine(&engine, &table_url);
    }
}