use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::file_format::JsonDecoder;
use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};
use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range,
};
use datafusion_physical_plan::projection::ProjectionExprs;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use arrow::array::RecordBatch;
use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{Stream, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
use tokio_stream::wrappers::ReceiverStream;
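/// Number of decompressed byte chunks buffered between the async read task
/// and the blocking parse task when streaming JSON array files.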
const CHANNEL_BUFFER_SIZE: usize = 128;
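/// Buffer capacity, in bytes, used by [`JsonArrayToNdjsonReader`] when
/// converting a JSON array into newline-delimited JSON.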
const JSON_CONVERTER_BUFFER_SIZE: usize = 2 * 1024 * 1024;
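/// Stream of [`RecordBatch`]es parsed from a JSON array file.
///
/// The spawned read and parse tasks are held alongside the receiver so that
/// dropping the stream also drops (and thereby cancels) both tasks.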
struct JsonArrayStream {
inner: ReceiverStream<std::result::Result<RecordBatch, arrow::error::ArrowError>>,
_read_task: SpawnedTask<()>,
_parse_task: SpawnedTask<()>,
}
impl Stream for JsonArrayStream {
type Item = std::result::Result<RecordBatch, arrow::error::ArrowError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
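/// A [`FileOpener`] that opens JSON files and returns a [`FileOpenFuture`].
///
/// Supports both newline-delimited JSON and files containing a single
/// top-level JSON array, with optional decompression.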
pub struct JsonOpener {
batch_size: usize,
projected_schema: SchemaRef,
file_compression_type: FileCompressionType,
object_store: Arc<dyn ObjectStore>,
newline_delimited: bool,
}
impl JsonOpener {
pub fn new(
batch_size: usize,
projected_schema: SchemaRef,
file_compression_type: FileCompressionType,
object_store: Arc<dyn ObjectStore>,
newline_delimited: bool,
) -> Self {
Self {
batch_size,
projected_schema,
file_compression_type,
object_store,
newline_delimited,
}
}
}
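/// [`FileSource`] implementation for JSON files.
///
/// Holds the table schema, projection, and batch size used to build a
/// [`JsonOpener`] for each scanned partition.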
#[derive(Clone)]
pub struct JsonSource {
table_schema: datafusion_datasource::TableSchema,
batch_size: Option<usize>,
metrics: ExecutionPlanMetricsSet,
projection: SplitProjection,
newline_delimited: bool,
}
impl JsonSource {
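/// Creates a [`JsonSource`] with an unprojected schema, defaulting to
/// newline-delimited JSON.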
pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self {
let table_schema = table_schema.into();
Self {
projection: SplitProjection::unprojected(&table_schema),
table_schema,
batch_size: None,
metrics: ExecutionPlanMetricsSet::new(),
newline_delimited: true,
}
}
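/// Sets whether the input is newline-delimited JSON (`true`) or a single
/// top-level JSON array (`false`).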
pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
self.newline_delimited = newline_delimited;
self
}
}
impl From<JsonSource> for Arc<dyn FileSource> {
fn from(source: JsonSource) -> Self {
as_file_source(source)
}
}
impl FileSource for JsonSource {
fn create_file_opener(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
let file_schema = self.table_schema.file_schema();
let projected_schema =
Arc::new(file_schema.project(&self.projection.file_indices)?);
let mut opener = Arc::new(JsonOpener {
batch_size: self
.batch_size
.expect("Batch size must set before creating opener"),
projected_schema,
file_compression_type: base_config.file_compression_type,
object_store,
newline_delimited: self.newline_delimited,
}) as Arc<dyn FileOpener>;
opener = ProjectionOpener::try_new(
self.projection.clone(),
Arc::clone(&opener),
self.table_schema.file_schema(),
)?;
Ok(opener)
}
fn as_any(&self) -> &dyn Any {
self
}
fn table_schema(&self) -> &datafusion_datasource::TableSchema {
&self.table_schema
}
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
let mut conf = self.clone();
conf.batch_size = Some(batch_size);
Arc::new(conf)
}
fn try_pushdown_projection(
&self,
projection: &ProjectionExprs,
) -> Result<Option<Arc<dyn FileSource>>> {
let mut source = self.clone();
let new_projection = self.projection.source.try_merge(projection)?;
let split_projection =
SplitProjection::new(self.table_schema.file_schema(), &new_projection);
source.projection = split_projection;
Ok(Some(Arc::new(source)))
}
fn projection(&self) -> Option<&ProjectionExprs> {
Some(&self.projection.source)
}
fn metrics(&self) -> &ExecutionPlanMetricsSet {
&self.metrics
}
fn file_type(&self) -> &str {
"json"
}
}
impl FileOpener for JsonOpener {
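/// Opens a partitioned JSON file and returns a stream of record batches.
///
/// Newline-delimited files may be scanned by byte range; JSON array files
/// must be parsed as a whole, so range-based scans are rejected up front.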
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let store = Arc::clone(&self.object_store);
let schema = Arc::clone(&self.projected_schema);
let batch_size = self.batch_size;
let file_compression_type = self.file_compression_type.to_owned();
let newline_delimited = self.newline_delimited;
if !newline_delimited && partitioned_file.range.is_some() {
return Err(DataFusionError::NotImplemented(
"JSON array format does not support range-based file scanning. \
Disable repartition_file_scans or use newline-delimited JSON format."
.to_string(),
));
}
Ok(Box::pin(async move {
let calculated_range =
calculate_range(&partitioned_file, &store, None).await?;
let range = match calculated_range {
RangeCalculation::Range(None) => None,
RangeCalculation::Range(Some(range)) => Some(range.into()),
RangeCalculation::TerminateEarly => {
return Ok(
futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
);
}
};
let options = GetOptions {
range,
..Default::default()
};
let result = store
.get_opts(&partitioned_file.object_meta.location, options)
.await?;
match result.payload {
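// Local file: decompress (honoring any byte range) and parse
// synchronously with the Arrow JSON reader.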
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
let bytes = match partitioned_file.range {
None => file_compression_type.convert_read(file)?,
Some(_) => {
file.seek(SeekFrom::Start(result.range.start as _))?;
let limit = result.range.end - result.range.start;
file_compression_type.convert_read(file.take(limit))?
}
};
if newline_delimited {
let reader = BufReader::new(bytes);
let arrow_reader = ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build(reader)?;
Ok(futures::stream::iter(arrow_reader)
.map(|r| r.map_err(Into::into))
.boxed())
} else {
let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
bytes,
JSON_CONVERTER_BUFFER_SIZE,
);
let arrow_reader = ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build(ndjson_reader)?;
Ok(futures::stream::iter(arrow_reader)
.map(|r| r.map_err(Into::into))
.boxed())
}
}
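// Remote object: decompress and parse the byte stream incrementally.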
GetResultPayload::Stream(s) => {
if newline_delimited {
let s = s.map_err(DataFusionError::from);
let decoder = ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build_decoder()?;
let input =
file_compression_type.convert_stream(s.boxed())?.fuse();
let stream = deserialize_stream(
input,
DecoderDeserializer::new(JsonDecoder::new(decoder)),
);
Ok(stream.map_err(Into::into).boxed())
} else {
let s = s.map_err(DataFusionError::from);
let decompressed_stream =
file_compression_type.convert_stream(s.boxed())?;
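// The Arrow JSON reader and the array-to-NDJSON converter are
// blocking readers, so bridge the async byte stream to them through
// a channel: an async task forwards decompressed chunks, and a
// blocking task converts and parses them into record batches.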
let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(
CHANNEL_BUFFER_SIZE,
);
let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
let error_tx = result_tx.clone();
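// Async read task: forward decompressed bytes to the parser,
// stopping early if the receiver has been dropped.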
let read_task = SpawnedTask::spawn(async move {
tokio::pin!(decompressed_stream);
while let Some(chunk) = decompressed_stream.next().await {
match chunk {
Ok(bytes) => {
if byte_tx.send(bytes).await.is_err() {
break;
}
}
Err(e) => {
let _ = error_tx
.send(Err(
arrow::error::ArrowError::ExternalError(
Box::new(e),
),
))
.await;
break;
}
}
}
});
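// Blocking parse task: wrap the channel in a reader, convert the
// JSON array to NDJSON on the fly, and emit record batches. After
// parsing, verify the array was well formed and fully consumed.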
let parse_task = SpawnedTask::spawn_blocking(move || {
let channel_reader = ChannelReader::new(byte_rx);
let mut ndjson_reader =
JsonArrayToNdjsonReader::with_capacity(
channel_reader,
JSON_CONVERTER_BUFFER_SIZE,
);
match ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build(&mut ndjson_reader)
{
Ok(arrow_reader) => {
for batch_result in arrow_reader {
if result_tx.blocking_send(batch_result).is_err() {
break;
}
}
}
Err(e) => {
let _ = result_tx.blocking_send(Err(e));
}
}
if let Err(e) = ndjson_reader.validate_complete() {
let _ = result_tx.blocking_send(Err(
arrow::error::ArrowError::JsonError(e.to_string()),
));
}
});
let stream = JsonArrayStream {
inner: ReceiverStream::new(result_rx),
_read_task: read_task,
_parse_task: parse_task,
};
Ok(stream.map(|r| r.map_err(Into::into)).boxed())
}
}
}
}))
}
}
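/// Executes `plan` and writes its output as newline-delimited JSON, one
/// `part-{i}.json` object per output partition under `path`.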
pub async fn plan_to_json(
task_ctx: Arc<TaskContext>,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
let path = path.as_ref();
let parsed = ListingTableUrl::parse(path)?;
let object_store_url = parsed.object_store();
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
let writer_buffer_size = task_ctx
.session_config()
.options()
.execution
.objectstore_writer_buffer_size;
let mut join_set = JoinSet::new();
for i in 0..plan.output_partitioning().partition_count() {
let storeref = Arc::clone(&store);
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
let filename = format!("{}/part-{i}.json", parsed.prefix());
let file = object_store::path::Path::parse(filename)?;
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
join_set.spawn(async move {
let mut buf_writer =
BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
let mut buffer = Vec::with_capacity(1024);
while let Some(batch) = stream.next().await.transpose()? {
let mut writer = json::LineDelimitedWriter::new(buffer);
writer.write(&batch)?;
buffer = writer.into_inner();
buf_writer.write_all(&buffer).await?;
buffer.clear();
}
buf_writer.shutdown().await.map_err(DataFusionError::from)
});
}
while let Some(result) = join_set.join_next().await {
match result {
Ok(res) => res?,
Err(e) => {
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
} else {
unreachable!();
}
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::Bytes;
use datafusion_datasource::FileRange;
use futures::TryStreamExt;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStoreExt, PutPayload};
fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, true),
Field::new("name", DataType::Utf8, true),
]))
}
#[tokio::test]
async fn test_json_array_from_file() -> Result<()> {
let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]"#;
let store = Arc::new(InMemory::new());
let path = Path::from("test.json");
store
.put(&path, PutPayload::from_static(json_data.as_bytes()))
.await?;
let opener = JsonOpener::new(
1024,
test_schema(),
FileCompressionType::UNCOMPRESSED,
store.clone(),
false, // newline_delimited = false: JSON array input
);
let meta = store.head(&path).await?;
let file = PartitionedFile::new(path.to_string(), meta.size);
let stream = opener.open(file)?.await?;
let batches: Vec<_> = stream.try_collect().await?;
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 2);
Ok(())
}
#[tokio::test]
async fn test_json_array_from_stream() -> Result<()> {
let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}, {"id": 3, "name": "charlie"}]"#;
let store = Arc::new(InMemory::new());
let path = Path::from("test_stream.json");
store
.put(&path, PutPayload::from_static(json_data.as_bytes()))
.await?;
let opener = JsonOpener::new(
2, // small batch size to force multiple batches
test_schema(),
FileCompressionType::UNCOMPRESSED,
store.clone(),
false, // JSON array input
);
let meta = store.head(&path).await?;
let file = PartitionedFile::new(path.to_string(), meta.size);
let stream = opener.open(file)?.await?;
let batches: Vec<_> = stream.try_collect().await?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 3);
Ok(())
}
#[tokio::test]
async fn test_json_array_nested_objects() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, true),
Field::new("data", DataType::Utf8, true),
]));
let json_data = r#"[
{"id": 1, "data": "{\"nested\": true}"},
{"id": 2, "data": "[1, 2, 3]"}
]"#;
let store = Arc::new(InMemory::new());
let path = Path::from("nested.json");
store
.put(&path, PutPayload::from_static(json_data.as_bytes()))
.await?;
let opener = JsonOpener::new(
1024,
schema,
FileCompressionType::UNCOMPRESSED,
store.clone(),
false,
);
let meta = store.head(&path).await?;
let file = PartitionedFile::new(path.to_string(), meta.size);
let stream = opener.open(file)?.await?;
let batches: Vec<_> = stream.try_collect().await?;
assert_eq!(batches[0].num_rows(), 2);
Ok(())
}
#[tokio::test]
async fn test_json_array_empty() -> Result<()> {
let json_data = "[]";
let store = Arc::new(InMemory::new());
let path = Path::from("empty.json");
store
.put(&path, PutPayload::from_static(json_data.as_bytes()))
.await?;
let opener = JsonOpener::new(
1024,
test_schema(),
FileCompressionType::UNCOMPRESSED,
store.clone(),
false,
);
let meta = store.head(&path).await?;
let file = PartitionedFile::new(path.to_string(), meta.size);
let stream = opener.open(file)?.await?;
let batches: Vec<_> = stream.try_collect().await?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 0);
Ok(())
}
#[tokio::test]
async fn test_json_array_range_not_supported() {
let store = Arc::new(InMemory::new());
let path = Path::from("test.json");
store
.put(&path, PutPayload::from_static(b"[]"))
.await
.unwrap();
let opener = JsonOpener::new(
1024,
test_schema(),
FileCompressionType::UNCOMPRESSED,
store.clone(),
false, // JSON array input
);
let meta = store.head(&path).await.unwrap();
let mut file = PartitionedFile::new(path.to_string(), meta.size);
file.range = Some(FileRange { start: 0, end: 10 });
let result = opener.open(file);
match result {
Ok(_) => panic!("Expected error for range-based JSON array scanning"),
Err(e) => {
assert!(
e.to_string().contains("does not support range-based"),
"Unexpected error message: {e}"
);
}
}
}
#[tokio::test]
async fn test_ndjson_still_works() -> Result<()> {
let json_data =
"{\"id\": 1, \"name\": \"alice\"}\n{\"id\": 2, \"name\": \"bob\"}\n";
let store = Arc::new(InMemory::new());
let path = Path::from("test.ndjson");
store
.put(&path, PutPayload::from_static(json_data.as_bytes()))
.await?;
let opener = JsonOpener::new(
1024,
test_schema(),
FileCompressionType::UNCOMPRESSED,
store.clone(),
true, // newline-delimited input
);
let meta = store.head(&path).await?;
let file = PartitionedFile::new(path.to_string(), meta.size);
let stream = opener.open(file)?.await?;
let batches: Vec<_> = stream.try_collect().await?;
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 2);
Ok(())
}
#[tokio::test]
async fn test_json_array_large_file() -> Result<()> {
let mut json_data = String::from("[");
for i in 0..1000 {
if i > 0 {
json_data.push(',');
}
json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
}
json_data.push(']');
let store = Arc::new(InMemory::new());
let path = Path::from("large.json");
store
.put(&path, PutPayload::from(Bytes::from(json_data)))
.await?;
let opener = JsonOpener::new(
100, // batch size
test_schema(),
FileCompressionType::UNCOMPRESSED,
store.clone(),
false,
);
let meta = store.head(&path).await?;
let file = PartitionedFile::new(path.to_string(), meta.size);
let stream = opener.open(file)?.await?;
let batches: Vec<_> = stream.try_collect().await?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 1000);
assert!(batches.len() >= 10);
Ok(())
}
#[tokio::test]
async fn test_json_array_stream_cancellation() -> Result<()> {
let mut json_data = String::from("[");
for i in 0..10000 {
if i > 0 {
json_data.push(',');
}
json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
}
json_data.push(']');
let store = Arc::new(InMemory::new());
let path = Path::from("cancel_test.json");
store
.put(&path, PutPayload::from(Bytes::from(json_data)))
.await?;
let opener = JsonOpener::new(
10, // batch size
test_schema(),
FileCompressionType::UNCOMPRESSED,
store.clone(),
false,
);
let meta = store.head(&path).await?;
let file = PartitionedFile::new(path.to_string(), meta.size);
let mut stream = opener.open(file)?.await?;
let first_batch = stream.next().await;
assert!(first_batch.is_some());
drop(stream);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Ok(())
}
}