//! Default Json handler implementation
use std::io::BufReader;
use std::sync::Arc;
use std::task::Poll;
use bytes::{Buf, Bytes};
use futures::stream::{self, BoxStream};
use futures::{ready, StreamExt, TryStreamExt};
use url::Url;
use super::executor::TaskExecutor;
use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use crate::arrow::json::ReaderBuilder;
use crate::arrow::record_batch::RecordBatch;
use crate::engine::arrow_utils::{
build_json_reorder_indices, fixup_json_read, json_arrow_schema, parse_json as arrow_parse_json,
to_json_bytes,
};
use crate::engine_data::FilteredEngineData;
use crate::metrics::{MetricEvent, MetricsReporter};
use crate::object_store::path::Path;
use crate::object_store::{self, DynObjectStore, GetResultPayload, ObjectStoreExt as _, PutMode};
use crate::schema::SchemaRef;
use crate::{
DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, JsonHandler, PredicateRef,
};
/// Default [`JsonHandler`] implementation that reads and writes JSON through an
/// [`object_store`] backend, running its async work on a pluggable [`TaskExecutor`].
#[derive(Debug)]
pub struct DefaultJsonHandler<E: TaskExecutor> {
    /// The object store to read files from
    store: Arc<DynObjectStore>,
    /// The executor to run async tasks on
    task_executor: Arc<E>,
    /// The maximum number of read requests to buffer in memory at once. Note that this actually
    /// controls two things: the number of concurrent requests (done by `buffered`) and the size of
    /// the buffer (via our `sync_channel`).
    buffer_size: usize,
    /// Limit the number of rows per batch. That is, for batch_size = N, then each RecordBatch
    /// yielded by the stream will have at most N rows.
    batch_size: usize,
    /// Optional reporter for emitting [`MetricEvent::JsonReadCompleted`] events.
    reporter: Option<Arc<dyn MetricsReporter>>,
}
impl<E: TaskExecutor> DefaultJsonHandler<E> {
    /// Create a handler reading via `store` and running async work on `task_executor`,
    /// using the module defaults for buffer size and batch size, and no metrics reporter.
    pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
        Self {
            store,
            task_executor,
            buffer_size: super::DEFAULT_BUFFER_SIZE,
            batch_size: super::DEFAULT_BATCH_SIZE,
            reporter: None,
        }
    }
    /// Set a metrics reporter to receive [`MetricEvent::JsonReadCompleted`] events.
    /// Passing `None` disables reporting.
    pub fn with_reporter(mut self, reporter: Option<Arc<dyn MetricsReporter>>) -> Self {
        self.reporter = reporter;
        self
    }
    /// Set the maximum number of read requests to buffer in memory at once in
    /// [Self::read_json_files()].
    ///
    /// Defaults to 1000.
    ///
    /// Memory constraints can be imposed by constraining the buffer size and batch size. Note that
    /// overall memory usage is proportional to the product of these two values.
    /// 1. Batch size governs the size of RecordBatches yielded in each iteration of the stream
    /// 2. Buffer size governs the number of concurrent tasks (which equals the size of the buffer)
    pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
        self.buffer_size = buffer_size;
        self
    }
    /// Limit the number of rows per batch. That is, for batch_size = N, then each RecordBatch
    /// yielded by the stream will have at most N rows.
    ///
    /// Defaults to 1000 rows (json objects).
    ///
    /// See [Decoder::with_buffer_size] for details on constraining memory usage with buffer size
    /// and batch size.
    ///
    /// [Decoder::with_buffer_size]: crate::arrow::json::reader::Decoder
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}
/// Internal async implementation of read_json_files
async fn read_json_files_impl(
store: Arc<DynObjectStore>,
files: Vec<FileMeta>,
physical_schema: SchemaRef,
_predicate: Option<PredicateRef>,
batch_size: usize,
buffer_size: usize,
) -> DeltaResult<BoxStream<'static, DeltaResult<Box<dyn EngineData>>>> {
if files.is_empty() {
return Ok(Box::pin(stream::empty()));
}
// Build Arrow schema from only the real JSON columns, omitting any metadata columns
// (e.g. FilePath) that the JSON reader cannot populate from the file content.
let json_arrow_schema = Arc::new(json_arrow_schema(&physical_schema)?);
// Build the reorder index vec once; apply it to every batch via reorder_struct_array.
let reorder_indices: Arc<[_]> = build_json_reorder_indices(&physical_schema)?.into();
// An iterator of futures that open each file and post-process each resulting batch.
let file_futures = files.into_iter().map(move |file| {
let store = store.clone();
let json_arrow_schema = json_arrow_schema.clone();
let reorder_indices = reorder_indices.clone();
async move {
let file_path = file.location.to_string();
let batch_stream = open_json_file(store, json_arrow_schema, batch_size, file).await?;
// Re-insert synthesized metadata columns (e.g. file path) at their schema positions.
let tagged = batch_stream
.map(move |result| fixup_json_read(result?, &reorder_indices, &file_path))
.boxed();
Ok::<_, Error>(tagged)
}
});
// Create a stream from that iterator which buffers up to `buffer_size` futures at a time.
let result_stream = stream::iter(file_futures)
.buffered(buffer_size)
.try_flatten()
.map_ok(|e| -> Box<dyn EngineData> { Box::new(e) });
Ok(Box::pin(result_stream))
}
/// Internal async implementation of write_json_file
/// Note: for now we just buffer all the data and write it out all at once
async fn write_json_file_impl(
store: Arc<DynObjectStore>,
path: Url,
buffer: Vec<u8>,
overwrite: bool,
) -> DeltaResult<()> {
let put_mode = if overwrite {
PutMode::Overwrite
} else {
PutMode::Create
};
let path = Path::from_url_path(path.path())?;
let result = store.put_opts(&path, buffer.into(), put_mode.into()).await;
result.map_err(|e| match e {
object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(path.to_string()),
e => e.into(),
})?;
Ok(())
}
impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
    /// Parse a column of JSON strings into engine data; pure CPU work delegated to the
    /// shared arrow-based implementation.
    fn parse_json(
        &self,
        json_strings: Box<dyn EngineData>,
        output_schema: SchemaRef,
    ) -> DeltaResult<Box<dyn EngineData>> {
        arrow_parse_json(json_strings, output_schema)
    }
    fn read_json_files(
        &self,
        files: &[FileMeta],
        physical_schema: SchemaRef,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<FileDataReadResultIterator> {
        let future = read_json_files_impl(
            self.store.clone(),
            files.to_vec(),
            physical_schema,
            predicate,
            self.batch_size,
            self.buffer_size,
        );
        let inner = super::stream_future_to_iter(self.task_executor.clone(), future)?;
        // Without a reporter there is nothing to instrument; hand back the raw iterator.
        let Some(reporter) = &self.reporter else {
            return Ok(inner);
        };
        let num_files = files.len() as u64;
        let bytes_read = files.iter().map(|f| f.size).sum();
        // Wrap the iterator so a JsonReadCompleted event fires once reading finishes.
        Ok(Box::new(super::ReadMetricsIterator::new(
            inner,
            reporter.clone(),
            num_files,
            bytes_read,
            |num_files, bytes_read| MetricEvent::JsonReadCompleted {
                num_files,
                bytes_read,
            },
        )))
    }
    // note: for now we just buffer all the data and write it out all at once
    fn write_json_file(
        &self,
        path: &Url,
        data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
        overwrite: bool,
    ) -> DeltaResult<()> {
        // Serialize everything up front, then block on a single async PUT.
        let buffer = to_json_bytes(data)?;
        self.task_executor.block_on(write_json_file_impl(
            self.store.clone(),
            path.clone(),
            buffer,
            overwrite,
        ))
    }
}
/// Opens a JSON file and returns a stream of record batches.
///
/// The object store may hand back either a local file or a byte stream; each payload
/// kind gets its own decoding strategy below.
async fn open_json_file(
    store: Arc<DynObjectStore>,
    schema: ArrowSchemaRef,
    batch_size: usize,
    file_meta: FileMeta,
) -> DeltaResult<BoxStream<'static, DeltaResult<RecordBatch>>> {
    let path = Path::from_url_path(file_meta.location.path())?;
    let result = store.get(&path).await?;
    // `with_coerce_primitive` lets bare primitives (e.g. `false`, `5`) decode into
    // string-typed fields instead of erroring out (see test_parse_json_coerce_* below).
    let builder = ReaderBuilder::new(schema)
        .with_batch_size(batch_size)
        .with_coerce_primitive(true);
    match result.payload {
        GetResultPayload::File(file, _) => {
            // Local file: decode synchronously with Arrow's JSON Reader, then wrap the
            // resulting iterator in a stream.
            let reader = builder.build(BufReader::new(file))?;
            let reader = futures::stream::iter(reader).map_err(Error::from);
            // Emit exactly one error, then stop the stream. We check seen_error BEFORE
            // updating it so the first error passes through, but subsequent items don't.
            // This is necessary because Arrow's Reader loops the same error indefinitely.
            let mut seen_error = false;
            let reader = reader.take_while(move |result| {
                let return_this = !seen_error;
                if result.is_err() {
                    seen_error = true;
                }
                futures::future::ready(return_this)
            });
            Ok(reader.boxed())
        }
        GetResultPayload::Stream(s) => {
            // Remote byte stream: feed chunks into Arrow's push-based Decoder as they
            // arrive, yielding a batch whenever one fills up (or at end of input).
            let mut decoder = builder.build_decoder()?;
            let mut input = s.map_err(Error::from);
            // Bytes received from `input` but not yet consumed by the decoder.
            let mut buffered = Bytes::new();
            let s = futures::stream::poll_fn(move |cx| {
                loop {
                    if buffered.is_empty() {
                        buffered = match ready!(input.poll_next_unpin(cx)) {
                            Some(Ok(b)) => b,
                            Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                            // Input exhausted; fall through to flush any partial batch.
                            None => break,
                        };
                    }
                    // NB (from Decoder::decode docs):
                    // Read JSON objects from `buf` (param), returning the number of bytes read
                    //
                    // This method returns once `batch_size` objects have been parsed since the
                    // last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
                    // should be included in the next call to [`Self::decode`]
                    let decoded = match decoder.decode(buffered.as_ref()) {
                        Ok(decoded) => decoded,
                        Err(e) => return Poll::Ready(Some(Err(e.into()))),
                    };
                    let read = buffered.len();
                    buffered.advance(decoded);
                    // The decoder stopped before consuming all buffered bytes, which (per the
                    // docs quoted above) means a full batch is ready: break out and flush it.
                    if decoded != read {
                        break;
                    }
                }
                // Flush whatever the decoder has accumulated; `transpose` turns an empty
                // flush (Ok(None)) into `None`, ending the stream.
                Poll::Ready(decoder.flush().map_err(Error::from).transpose())
            });
            Ok(s.boxed())
        }
    }
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{mpsc, Arc, Mutex};
use std::task::Waker;
use futures::future;
use itertools::Itertools;
use serde_json::json;
use tracing::info;
use crate::actions::get_commit_schema;
use crate::arrow::array::{Array, AsArray, Int32Array, RecordBatch, StringArray};
use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use crate::engine::arrow_data::{ArrowEngineData, EngineDataArrowExt as _};
use crate::engine::default::executor::tokio::{
TokioBackgroundExecutor, TokioMultiThreadExecutor,
};
use crate::object_store::local::LocalFileSystem;
use crate::object_store::memory::InMemory;
#[cfg(any(not(feature = "arrow-57"), feature = "arrow-58"))]
use crate::object_store::{CopyOptions, ObjectStore};
use crate::object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOptions,
PutOptions, PutPayload, PutResult, Result,
};
use crate::schema::{DataType as DeltaDataType, Schema, StructField};
use crate::utils::test_utils::string_array_to_engine_data;
// TODO: should just use the one from test_utils, but running into dependency issues
fn into_record_batch(engine_data: Box<dyn EngineData>) -> RecordBatch {
    let arrow_data = ArrowEngineData::try_from_engine_data(engine_data).unwrap();
    arrow_data.into()
}
use super::*;
// A wrapper trait that allows us to work with the ObjectStore trait, without directly importing
// it and the ambiguous method errors it would bring. (Under other feature combinations, the
// `ObjectStore` imported at the top of this module is used directly instead.)
#[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
trait ObjectStore: crate::object_store::ObjectStore {}
#[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
impl<T: crate::object_store::ObjectStore + ?Sized> ObjectStore for T {}
/// Store wrapper that wraps an inner store to guarantee the ordering of GET requests. Note
/// that since the keys are resolved in order, requests to subsequent keys in the order will
/// block until the earlier keys are requested.
///
/// WARN: Does not handle duplicate keys, and will fail on duplicate requests of the same key.
// TODO(zach): we can handle duplicate requests if we retain the ordering of the keys and track
// that all of the keys prior to the one requested have been resolved.
#[derive(Debug)]
struct OrderedGetStore<T: ObjectStore> {
    // The ObjectStore we are wrapping
    inner: T,
    // Combined state: queue and wakers, protected by a single mutex
    state: Mutex<KeysAndWakers>,
}
// Shared mutable state for OrderedGetStore, held behind a single mutex so the queue and
// waker map can never get out of sync with each other.
#[derive(Debug)]
struct KeysAndWakers {
    // Queue of paths in the order in which they will resolve
    ordered_keys: VecDeque<Path>,
    // Map of paths to wakers for pending get requests
    wakers: HashMap<Path, Waker>,
}
impl<T: ObjectStore> OrderedGetStore<T> {
    // Wrap `inner` so that GET requests resolve in exactly the order given by
    // `ordered_keys`. Panics if the same key appears twice (unsupported; see struct docs).
    fn new(inner: T, ordered_keys: &[Path]) -> Self {
        let ordered_keys = ordered_keys.to_vec();
        // Fail fast on the first duplicate key, if any.
        let mut seen = HashSet::new();
        if let Some(key) = ordered_keys.iter().find(|k| !seen.insert(*k)) {
            panic!("Duplicate key in OrderedGetStore: {key}");
        }
        Self {
            inner,
            state: Mutex::new(KeysAndWakers {
                ordered_keys: ordered_keys.into(),
                wakers: HashMap::new(),
            }),
        }
    }
}
impl<T: ObjectStore> std::fmt::Display for OrderedGetStore<T> {
    // Render the keys still pending resolution, in order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let ordered_keys = &self.state.lock().unwrap().ordered_keys;
        write!(f, "OrderedGetStore({ordered_keys:?})")
    }
}
// All methods except `get_opts` simply delegate to the wrapped store; only GETs are
// subject to the artificial ordering.
#[async_trait::async_trait]
impl<T: ObjectStore> crate::object_store::ObjectStore for OrderedGetStore<T> {
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        self.inner.put_opts(location, payload, opts).await
    }
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> Result<Box<dyn MultipartUpload>> {
        self.inner.put_multipart_opts(location, opts).await
    }
    // A GET request is fulfilled by checking if the requested path is next in order:
    // - if yes, remove the path from the queue and proceed with the GET request, then wake the
    //   next path in order
    // - if no, register the waker and wait
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        // object_store 0.13 implements `head()` via `get_opts(..., head = true)`.
        // The ordering queue is only meant to serialize content reads for the test, so
        // skip queue accounting for HEAD probes used while constructing FileMeta.
        if options.head {
            return self.inner.get_opts(location, options).await;
        }
        // Do the actual GET request first, then introduce any artificial ordering delays as
        // needed. (HEAD requests already returned above, so everything past this point is a
        // content read; the former re-check of `options.head` here was unreachable and has
        // been removed, which also makes cloning `options` unnecessary.)
        let result = self.inner.get_opts(location, options).await;
        // we implement a future which only resolves once the requested path is next in order
        future::poll_fn(move |cx| {
            let mut state = self.state.lock().unwrap();
            let Some(next_key) = state.ordered_keys.front() else {
                panic!("Ran out of keys before {location}");
            };
            if next_key == location {
                // We are next in line. Nobody else can remove our key, and our successor
                // cannot race with us to register itself because we hold the lock.
                //
                // first, remove our key from the queue.
                //
                // note: safe to unwrap because we just checked that the front key exists (and
                // is the same as our requested location)
                state.ordered_keys.pop_front().unwrap();
                // there are three possible cases, either:
                // 1. the key has already been requested, hence there is a waker waiting, and we
                //    need to wake it up
                // 2. the next key has no waker registered, in which case we do nothing, and
                //    whenever the request for said key is made, it will either be next in line
                //    or a waker will be registered - either case ensuring that the request is
                //    completed
                // 3. the next key is the last key in the queue, in which case there is nothing
                //    left to do (no need to wake anyone)
                if let Some(next_key) = state.ordered_keys.front().cloned() {
                    if let Some(waker) = state.wakers.remove(&next_key) {
                        waker.wake(); // NOTE: Not async, returns instantly.
                    }
                }
                Poll::Ready(())
            } else {
                // We are not next in line, so wait on our key. Nobody can race to remove it
                // because we own it; nobody can race to wake us because we hold the lock.
                if state
                    .wakers
                    .insert(location.clone(), cx.waker().clone())
                    .is_some()
                {
                    panic!("Somebody else is already waiting on {location}");
                }
                Poll::Pending
            }
        })
        .await;
        // When we return this result, the future succeeds instantly. Any pending wake() call
        // will not be processed before the next time we yield -- unless our executor is
        // multi-threaded and happens to have another thread available. In that case, the
        // serialization point is the moment our next-key poll_fn issues the wake call (or
        // proves no wake is needed).
        result
    }
    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        self.inner.get_ranges(location, ranges).await
    }
    #[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
    async fn delete(&self, location: &Path) -> Result<()> {
        self.inner.delete(location).await
    }
    #[cfg(any(not(feature = "arrow-57"), feature = "arrow-58"))]
    fn delete_stream(
        &self,
        locations: BoxStream<'static, Result<Path>>,
    ) -> BoxStream<'static, Result<Path>> {
        self.inner.delete_stream(locations)
    }
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        self.inner.list(prefix)
    }
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, Result<ObjectMeta>> {
        self.inner.list_with_offset(prefix, offset)
    }
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        self.inner.list_with_delimiter(prefix).await
    }
    #[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.inner.copy(from, to).await
    }
    #[cfg(any(not(feature = "arrow-57"), feature = "arrow-58"))]
    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
        self.inner.copy_opts(from, to, options).await
    }
    #[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.inner.copy_if_not_exists(from, to).await
    }
}
// Parsing a batch of log-action JSON strings (add / commitInfo / protocol / metaData)
// against the commit schema yields one output row per input string.
#[test]
fn test_parse_json() {
    let store = Arc::new(LocalFileSystem::new());
    let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
    let json_strings = StringArray::from(vec![
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
        r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
        r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
        r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
    ]);
    let output_schema = get_commit_schema().clone();
    let batch = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap();
    // One parsed row per input JSON string.
    assert_eq!(batch.len(), 4);
}
// Test that operationParameters with boolean/numeric primitives are coerced to strings.
// Some delta logs contain values like `"statsOnLoad": false` instead of `"statsOnLoad":
// "false"`. Without `with_coerce_primitive(true)`, this would fail with:
// "whilst decoding field 'commitInfo': whilst decoding field 'operationParameters': expected
// string got false"
#[test]
fn test_parse_json_coerce_operation_parameters() {
    let store = Arc::new(LocalFileSystem::new());
    let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
    // JSON with operationParameters containing boolean and numeric primitives (not strings)
    let json_strings = StringArray::from(vec![
        r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","statsOnLoad":false,"numRetries":5},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
    ]);
    let output_schema = get_commit_schema().clone();
    let batch: RecordBatch = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap()
        .try_into_record_batch()
        .unwrap();
    assert_eq!(batch.num_rows(), 1);
    // Verify the operationParameters were parsed correctly with primitives coerced to strings
    let commit_info = batch.column_by_name("commitInfo").unwrap().as_struct();
    let op_params = commit_info
        .column_by_name("operationParameters")
        .unwrap()
        .as_map();
    // The map should have 3 entries: mode, statsOnLoad, numRetries
    let map_entries = op_params.value(0);
    assert_eq!(map_entries.len(), 3);
    // Extract keys and values from the map
    let keys = map_entries.column(0).as_string::<i32>();
    let values = map_entries.column(1).as_string::<i32>();
    // Build a HashMap for easier lookup
    let params: std::collections::HashMap<_, _> = (0..keys.len())
        .map(|i| (keys.value(i), values.value(i)))
        .collect();
    // Verify coerced primitive values: boolean false -> "false", integer 5 -> "5"
    assert_eq!(params.get("statsOnLoad"), Some(&"false"));
    assert_eq!(params.get("numRetries"), Some(&"5"));
    assert_eq!(params.get("mode"), Some(&"ErrorIfExists"));
}
// Fields present in the JSON but absent from the output schema (here `maxRowId` inside
// `deletionVector`) are silently dropped rather than causing a parse failure.
#[test]
fn test_parse_json_drop_field() {
    let store = Arc::new(LocalFileSystem::new());
    let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
    let json_strings = StringArray::from(vec![
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2, "maxRowId": 3}}}"#,
    ]);
    let output_schema = get_commit_schema().clone();
    let batch: RecordBatch = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap()
        .try_into_record_batch()
        .unwrap();
    assert_eq!(batch.column(0).len(), 1);
    let add_array = batch.column_by_name("add").unwrap().as_struct();
    let dv_col = add_array
        .column_by_name("deletionVector")
        .unwrap()
        .as_struct();
    // Schema fields survive; the extra `maxRowId` field from the JSON does not.
    assert!(dv_col.column_by_name("storageType").is_some());
    assert!(dv_col.column_by_name("maxRowId").is_none());
}
// End-to-end read of a real commit file from disk: first with the default batch size
// (all 4 actions in one batch), then with batch_size = 2 (two batches of 2 rows).
#[tokio::test]
async fn test_read_json_files() {
    let store = Arc::new(LocalFileSystem::new());
    let path = std::fs::canonicalize(PathBuf::from(
        "./tests/data/table-with-dv-small/_delta_log/00000000000000000000.json",
    ))
    .unwrap();
    let url = Url::from_file_path(path).unwrap();
    let location = Path::from_url_path(url.path()).unwrap();
    let meta = store.head(&location).await.unwrap();
    let files = &[FileMeta {
        location: url.clone(),
        last_modified: meta.last_modified.timestamp_millis(),
        size: meta.size,
    }];
    let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
    let data: Vec<RecordBatch> = handler
        .read_json_files(files, get_commit_schema().clone(), None)
        .unwrap()
        .map_ok(into_record_batch)
        .try_collect()
        .unwrap();
    assert_eq!(data.len(), 1);
    assert_eq!(data[0].num_rows(), 4);
    // limit batch size
    let handler = handler.with_batch_size(2);
    let data: Vec<RecordBatch> = handler
        .read_json_files(files, get_commit_schema().clone(), None)
        .unwrap()
        .map_ok(into_record_batch)
        .try_collect()
        .unwrap();
    // Same 4 rows, now split across two batches of 2.
    assert_eq!(data.len(), 2);
    assert_eq!(data[0].num_rows(), 2);
    assert_eq!(data[1].num_rows(), 2);
}
// Sanity-check the OrderedGetStore test fixture itself: even when GETs are issued in a
// jumbled order, they must complete in the configured (natural) order.
#[tokio::test]
async fn test_ordered_get_store() {
    // note we don't want to go over 1000 since we only buffer 1000 requests at a time
    let num_paths = 1000;
    let ordered_paths: Vec<Path> = (0..num_paths)
        .map(|i| Path::from(format!("/test/path{i}")))
        .collect();
    let jumbled_paths: Vec<_> = ordered_paths[100..400]
        .iter()
        .chain(ordered_paths[400..].iter().rev())
        .chain(ordered_paths[..100].iter())
        .cloned()
        .collect();
    let memory_store = InMemory::new();
    for (i, path) in ordered_paths.iter().enumerate() {
        memory_store
            .put(path, Bytes::from(format!("content_{i}")).into())
            .await
            .unwrap();
    }
    // Create ordered store with natural order (0, 1, 2, ...)
    let ordered_store = Arc::new(OrderedGetStore::new(memory_store, &ordered_paths));
    let (tx, rx) = mpsc::channel();
    // Spawn tasks to GET each path in our somewhat jumbled order
    // They should complete in order (0, 1, 2, ...) due to OrderedGetStore
    let handles = jumbled_paths.into_iter().map(|path| {
        let store = ordered_store.clone();
        let tx = tx.clone();
        tokio::spawn(async move {
            let _ = store.get(&path).await.unwrap();
            tx.send(path).unwrap();
        })
    });
    // TODO(zach): we need to join all the handles otherwise none of the tasks run? despite the
    // docs?
    future::join_all(handles).await;
    // Drop our own sender so rx.recv() below can observe channel closure and terminate.
    drop(tx);
    // NB (from mpsc::Receiver::recv): This function will always block the current thread if
    // there is no data available and it's possible for more data to be sent (at least one
    // sender still exists).
    let mut completed = Vec::new();
    while let Ok(path) = rx.recv() {
        completed.push(path);
    }
    assert_eq!(
        completed,
        ordered_paths.into_iter().collect_vec(),
        "Expected paths to complete in order"
    );
}
use std::io::Write;
use tempfile::NamedTempFile;
use crate::engine::default::DefaultEngineBuilder;
use crate::schema::StructType;
use crate::Engine;
// Create a temp file containing deliberately malformed JSON, returning the file handle
// (to keep it alive) together with its file:// URL.
fn make_invalid_named_temp() -> (NamedTempFile, Url) {
    let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
    write!(temp_file, r#"this is not valid json"#).expect("Failed to write to temp file");
    let file_url = Url::from_file_path(temp_file.path()).expect("Failed to create file URL");
    info!("Created temporary malformed file at: {file_url}");
    (temp_file, file_url)
}
// Reading malformed JSON must yield exactly one error per bad file and then terminate the
// stream (rather than looping the same error forever; see open_json_file's take_while).
#[test]
fn test_read_invalid_json() -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt().try_init();
    let (_temp_file1, file_url1) = make_invalid_named_temp();
    let (_temp_file2, file_url2) = make_invalid_named_temp();
    let field = StructField::nullable("name", crate::schema::DataType::BOOLEAN);
    let schema = Arc::new(StructType::try_new(vec![field]).unwrap());
    let default_engine = DefaultEngineBuilder::new(Arc::new(LocalFileSystem::new())).build();
    // Helper to check that we get expected number of errors then stream ends
    let check_errors = |file_urls: Vec<_>, expected_errors: usize| {
        let file_vec: Vec<_> = file_urls
            .into_iter()
            .map(|url| FileMeta::new(url, 1, 1))
            .collect();
        let mut iter = default_engine
            .json_handler()
            .read_json_files(&file_vec, schema.clone(), None)
            .unwrap();
        for _ in 0..expected_errors {
            assert!(
                iter.next().unwrap().is_err(),
                "Read succeeded unexpectedly. The JSON should have been invalid."
            );
        }
        assert!(
            iter.next().is_none(),
            "The stream should end once the read result fails"
        );
    };
    // CASE 1: Single failing file
    info!("\nAttempting to read single malformed JSON file...");
    check_errors(vec![file_url1.clone()], 1);
    // CASE 2: Two failing files
    info!("\nAttempting to read two malformed JSON files...");
    check_errors(vec![file_url1, file_url2], 2);
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_read_json_files_ordering() {
    // this test checks that the read_json_files method returns the files in order in the
    // presence of an ObjectStore (OrderedGetStore) that resolves paths in a jumbled order:
    // 1. we set up a list of FileMetas (and some random JSON content) in order
    // 2. we then set up an ObjectStore to resolves those paths in a jumbled order
    // 3. then call read_json_files and check that the results are in order
    let ordered_paths: Vec<Path> = (0..1000)
        .map(|i| Path::from(format!("test/path{i}")))
        .collect();
    let test_list: &[(usize, Vec<Path>)] = &[
        // test 1: buffer_size = 1000, just 1000 jumbled paths
        (
            1000, // buffer_size
            ordered_paths[100..400]
                .iter()
                .chain(ordered_paths[400..].iter().rev())
                .chain(ordered_paths[..100].iter())
                .cloned()
                .collect(),
        ),
        // test 2: buffer_size = 4, jumbled paths in groups of 4
        (
            4, // buffer_size
            (0..250)
                .flat_map(|i| {
                    [
                        ordered_paths[1 + 4 * i].clone(),
                        ordered_paths[4 * i].clone(),
                        ordered_paths[3 + 4 * i].clone(),
                        ordered_paths[2 + 4 * i].clone(),
                    ]
                })
                .collect_vec(),
        ),
    ];
    // Each file holds a single JSON object whose "val" equals its index, so output order
    // can be checked by reading back the values.
    let memory_store = InMemory::new();
    for (i, path) in ordered_paths.iter().enumerate() {
        memory_store
            .put(path, Bytes::from(format!("{{\"val\": {i}}}")).into())
            .await
            .unwrap();
    }
    for (buffer_size, jumbled_paths) in test_list {
        // set up our ObjectStore to resolve paths in a jumbled order
        let store = Arc::new(OrderedGetStore::new(memory_store.fork(), jumbled_paths));
        // convert the paths to FileMeta
        let ordered_file_meta: Vec<_> = ordered_paths
            .iter()
            .map(|path| {
                let store = store.clone();
                async move {
                    let url = Url::parse(&format!("memory:/{path}")).unwrap();
                    let location = Path::from(path.as_ref());
                    let meta = store.head(&location).await.unwrap();
                    FileMeta {
                        location: url,
                        last_modified: meta.last_modified.timestamp_millis(),
                        size: meta.size,
                    }
                }
            })
            .collect();
        // note: join_all is ordered
        let files = future::join_all(ordered_file_meta).await;
        // fire off the read_json_files call (for all the files in order)
        let handler = DefaultJsonHandler::new(
            store,
            Arc::new(TokioMultiThreadExecutor::new(
                tokio::runtime::Handle::current(),
            )),
        );
        let handler = handler.with_buffer_size(*buffer_size);
        let physical_schema = Arc::new(Schema::new_unchecked(vec![StructField::nullable(
            "val",
            DeltaDataType::INTEGER,
        )]));
        let data: Vec<RecordBatch> = handler
            .read_json_files(&files, physical_schema, None)
            .unwrap()
            .map_ok(into_record_batch)
            .try_collect()
            .unwrap();
        // check the order
        let all_values: Vec<i32> = data
            .iter()
            .flat_map(|batch| {
                let val_col: &Int32Array = batch.column(0).as_primitive();
                (0..val_col.len()).map(|i| val_col.value(i)).collect_vec()
            })
            .collect();
        assert_eq!(all_values, (0..1000).collect_vec());
    }
}
// Helper function to create test data: a single-column ("dog", nullable Utf8) batch.
fn create_test_data(values: Vec<&str>) -> DeltaResult<Box<dyn EngineData>> {
    let field = Field::new("dog", DataType::Utf8, true);
    let schema = Arc::new(ArrowSchema::new(vec![field]));
    let column = Arc::new(StringArray::from(values));
    let batch = RecordBatch::try_new(schema, vec![column])?;
    Ok(Box::new(ArrowEngineData::new(batch)))
}
// Helper function to read JSON file asynchronously
async fn read_json_file(
store: &Arc<InMemory>,
path: &Path,
) -> DeltaResult<Vec<serde_json::Value>> {
let content = store.get(path).await?;
let file_bytes = content.bytes().await?;
let file_string =
String::from_utf8(file_bytes.to_vec()).map_err(|e| object_store::Error::Generic {
store: "memory",
source: Box::new(e),
})?;
let json: Vec<_> = serde_json::Deserializer::from_str(&file_string)
.into_iter::<serde_json::Value>()
.flatten()
.collect();
Ok(json)
}
// With overwrite disabled, a second write to the same path must fail (FileAlreadyExists).
#[tokio::test]
async fn test_write_json_file_without_overwrite() -> DeltaResult<()> {
    do_test_write_json_file(false).await
}
// With overwrite enabled, a second write to the same path must replace the contents.
#[tokio::test]
async fn test_write_json_file_overwrite() -> DeltaResult<()> {
    do_test_write_json_file(true).await
}
// Shared body for the two write tests: write once, verify contents, then write again to
// the same path and check overwrite/no-overwrite behavior.
async fn do_test_write_json_file(overwrite: bool) -> DeltaResult<()> {
    let store = Arc::new(InMemory::new());
    let executor = Arc::new(TokioBackgroundExecutor::new());
    let handler = DefaultJsonHandler::new(store.clone(), executor);
    let path = Url::parse("memory:///test/data/00000000000000000001.json")?;
    let object_path = Path::from("/test/data/00000000000000000001.json");
    // First write with no existing file
    let data = create_test_data(vec!["remi", "wilson"])?;
    let filtered_data = Ok(FilteredEngineData::with_all_rows_selected(data));
    let result =
        handler.write_json_file(&path, Box::new(std::iter::once(filtered_data)), overwrite);
    // Verify the first write is successful
    assert!(result.is_ok());
    let json = read_json_file(&store, &object_path).await?;
    assert_eq!(json, vec![json!({"dog": "remi"}), json!({"dog": "wilson"})]);
    // Second write with existing file
    let data = create_test_data(vec!["seb", "tia"])?;
    let filtered_data = Ok(FilteredEngineData::with_all_rows_selected(data));
    let result =
        handler.write_json_file(&path, Box::new(std::iter::once(filtered_data)), overwrite);
    if overwrite {
        // Verify the second write is successful
        assert!(result.is_ok());
        let json = read_json_file(&store, &object_path).await?;
        assert_eq!(json, vec![json!({"dog": "seb"}), json!({"dog": "tia"})]);
    } else {
        // Verify the second write fails with FileAlreadyExists error
        match result {
            Err(Error::FileAlreadyExists(err_path)) => {
                assert_eq!(err_path, object_path.to_string());
            }
            _ => panic!("Expected FileAlreadyExists error, got: {result:?}"),
        }
    }
    Ok(())
}
}