use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::file_format::JsonDecoder;
use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_datasource::{
    ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range,
};
use datafusion_physical_plan::projection::ProjectionExprs;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::array::RecordBatch;
use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{Stream, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
use tokio_stream::wrappers::ReceiverStream;

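/// Capacity of the bounded channel carrying decompressed bytes from the async
/// read task to the blocking parse task; the bound applies backpressure.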
const CHANNEL_BUFFER_SIZE: usize = 128;

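/// Internal buffer capacity used by [`JsonArrayToNdjsonReader`] when
/// converting a top-level JSON array into newline-delimited JSON.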
const JSON_CONVERTER_BUFFER_SIZE: usize = 2 * 1024 * 1024;

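/// Stream of [`RecordBatch`]es decoded from a single top-level JSON array.
///
/// Owns the background tasks that feed it, so dropping the stream tears down
/// the whole pipeline.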
struct JsonArrayStream {
    /// Receives decoded batches (or errors) from the parse task
    inner: ReceiverStream<std::result::Result<RecordBatch, arrow::error::ArrowError>>,
    /// Forwards decompressed bytes from the object store; aborted on drop
    _read_task: SpawnedTask<()>,
    /// Converts JSON array bytes to NDJSON and decodes them; aborted on drop
    _parse_task: SpawnedTask<()>,
}

impl Stream for JsonArrayStream {
    type Item = std::result::Result<RecordBatch, arrow::error::ArrowError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.inner).poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
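
/// A [`FileOpener`] that opens JSON files and yields streams of record
/// batches, supporting both newline-delimited JSON and JSON array input.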
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
    /// `true` for newline-delimited JSON, `false` for a single JSON array
    newline_delimited: bool,
}

impl JsonOpener {
    /// Returns a new [`JsonOpener`]
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
        newline_delimited: bool,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
            newline_delimited,
        }
    }
}

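/// A [`FileSource`] for reading JSON files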
#[derive(Clone)]
pub struct JsonSource {
    table_schema: datafusion_datasource::TableSchema,
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projection: SplitProjection,
    /// `true` (the default) for newline-delimited JSON, `false` for files
    /// containing a single top-level JSON array
    newline_delimited: bool,
}

impl JsonSource {
    /// Returns a new [`JsonSource`] that reads newline-delimited JSON by default
    pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
            batch_size: None,
            metrics: ExecutionPlanMetricsSet::new(),
            newline_delimited: true,
        }
    }

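    /// Sets whether the input is newline-delimited JSON (`true`, the default)
    /// or a single top-level JSON array (`false`).
    ///
    /// A minimal usage sketch, assuming a `table_schema` value is already in
    /// scope:
    ///
    /// ```ignore
    /// // Read files that each contain one JSON array such as `[{...}, {...}]`
    /// let source = JsonSource::new(table_schema).with_newline_delimited(false);
    /// ```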
    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
        self.newline_delimited = newline_delimited;
        self
    }
}

impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(source: JsonSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        let file_schema = self.table_schema.file_schema();
        let projected_schema =
            Arc::new(file_schema.project(&self.projection.file_indices)?);

        let mut opener = Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating opener"),
            projected_schema,
            file_compression_type: base_config.file_compression_type,
            object_store,
            newline_delimited: self.newline_delimited,
        }) as Arc<dyn FileOpener>;

        // Layer the remaining projection on top of the base opener
        opener = ProjectionOpener::try_new(
            self.projection.clone(),
            Arc::clone(&opener),
            self.table_schema.file_schema(),
        )?;

        Ok(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_schema(&self) -> &datafusion_datasource::TableSchema {
        &self.table_schema
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        let mut source = self.clone();
        let new_projection = self.projection.source.try_merge(projection)?;
        let split_projection =
            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
        source.projection = split_projection;
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "json"
    }
}

impl FileOpener for JsonOpener {
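    /// Opens `partitioned_file` and returns a stream of record batches.
    ///
    /// Newline-delimited input may be scanned by byte range; JSON array input
    /// must be read in full, so range-based scans are rejected up front.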
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();
        let newline_delimited = self.newline_delimited;

        // A JSON array cannot be split at arbitrary byte offsets
        if !newline_delimited && partitioned_file.range.is_some() {
            return Err(DataFusionError::NotImplemented(
                "JSON array format does not support range-based file scanning. \
                 Disable repartition_file_scans or use newline-delimited JSON format."
                    .to_string(),
            ));
        }

        Ok(Box::pin(async move {
            let calculated_range =
                calculate_range(&partitioned_file, &store, None).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    );
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store
                .get_opts(&partitioned_file.object_meta.location, options)
                .await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match partitioned_file.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit))?
                        }
                    };

                    if newline_delimited {
                        let reader = BufReader::new(bytes);
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    } else {
                        // Convert the JSON array to NDJSON on the fly before decoding
                        let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
                            bytes,
                            JSON_CONVERTER_BUFFER_SIZE,
                        );
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(ndjson_reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    }
                }
                GetResultPayload::Stream(s) => {
                    if newline_delimited {
                        let s = s.map_err(DataFusionError::from);

                        let decoder = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build_decoder()?;
                        let input =
                            file_compression_type.convert_stream(s.boxed())?.fuse();
                        let stream = deserialize_stream(
                            input,
                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
                        );
                        Ok(stream.map_err(Into::into).boxed())
                    } else {
                        let s = s.map_err(DataFusionError::from);
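
                        // JSON array input cannot be decoded incrementally by
                        // the NDJSON decoder, so the byte stream is bridged
                        // through two background tasks: an async task that
                        // forwards decompressed bytes over a bounded channel,
                        // and a blocking task that rewrites them as NDJSON
                        // and decodes record batches.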
                        let decompressed_stream =
                            file_compression_type.convert_stream(s.boxed())?;

                        // Bounded channel carrying raw bytes to the parse task
                        let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(
                            CHANNEL_BUFFER_SIZE,
                        );

                        // Small channel carrying decoded batches (or errors) to the output
                        let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
                        let error_tx = result_tx.clone();

                        // Forward decompressed bytes; stop if the parse task
                        // (the receiver) has gone away
                        let read_task = SpawnedTask::spawn(async move {
                            tokio::pin!(decompressed_stream);
                            while let Some(chunk) = decompressed_stream.next().await {
                                match chunk {
                                    Ok(bytes) => {
                                        if byte_tx.send(bytes).await.is_err() {
                                            break;
                                        }
                                    }
                                    Err(e) => {
                                        let _ = error_tx
                                            .send(Err(
                                                arrow::error::ArrowError::ExternalError(
                                                    Box::new(e),
                                                ),
                                            ))
                                            .await;
                                        break;
                                    }
                                }
                            }
                        });

                        // Convert the JSON array to NDJSON and decode record
                        // batches on a blocking thread
                        let parse_task = SpawnedTask::spawn_blocking(move || {
                            let channel_reader = ChannelReader::new(byte_rx);
                            let mut ndjson_reader =
                                JsonArrayToNdjsonReader::with_capacity(
                                    channel_reader,
                                    JSON_CONVERTER_BUFFER_SIZE,
                                );

                            match ReaderBuilder::new(schema)
                                .with_batch_size(batch_size)
                                .build(&mut ndjson_reader)
                            {
                                Ok(arrow_reader) => {
                                    for batch_result in arrow_reader {
                                        if result_tx.blocking_send(batch_result).is_err()
                                        {
                                            break;
                                        }
                                    }
                                }
                                Err(e) => {
                                    let _ = result_tx.blocking_send(Err(e));
                                }
                            }

                            // Report malformed input (e.g. a truncated array)
                            if let Err(e) = ndjson_reader.validate_complete() {
                                let _ = result_tx.blocking_send(Err(
                                    arrow::error::ArrowError::JsonError(e.to_string()),
                                ));
                            }
                        });

                        // Keep the task handles alive alongside the stream so
                        // that dropping the stream cancels both tasks
                        let stream = JsonArrayStream {
                            inner: ReceiverStream::new(result_rx),
                            _read_task: read_task,
                            _parse_task: parse_task,
                        };

                        Ok(stream.map(|r| r.map_err(Into::into)).boxed())
                    }
                }
            }
        }))
    }
}

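/// Executes `plan` and writes its results as newline-delimited JSON, one
/// `part-<i>.json` file per output partition under `path`.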
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);

            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use bytes::Bytes;
    use datafusion_datasource::FileRange;
    use futures::TryStreamExt;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use object_store::{ObjectStoreExt, PutPayload};

    /// Two-column schema shared by the tests below
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    #[tokio::test]
    async fn test_json_array_from_file() -> Result<()> {
        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]"#;

        let store = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false, // JSON array input, not newline-delimited
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_from_stream() -> Result<()> {
        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}, {"id": 3, "name": "charlie"}]"#;

        let store = Arc::new(InMemory::new());
        let path = Path::from("test_stream.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            2, // batch size smaller than the row count
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 3);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_nested_objects() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("data", DataType::Utf8, true),
        ]));

        let json_data = r#"[
            {"id": 1, "data": "{\"nested\": true}"},
            {"id": 2, "data": "[1, 2, 3]"}
        ]"#;

        let store = Arc::new(InMemory::new());
        let path = Path::from("nested.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            schema,
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_empty() -> Result<()> {
        let json_data = "[]";

        let store = Arc::new(InMemory::new());
        let path = Path::from("empty.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_range_not_supported() {
        let store = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store
            .put(&path, PutPayload::from_static(b"[]"))
            .await
            .unwrap();

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await.unwrap();
        let mut file = PartitionedFile::new(path.to_string(), meta.size);
        file.range = Some(FileRange { start: 0, end: 10 });

        let result = opener.open(file);
        match result {
            Ok(_) => panic!("Expected error for range-based JSON array scanning"),
            Err(e) => {
                assert!(
                    e.to_string().contains("does not support range-based"),
                    "Unexpected error message: {e}"
                );
            }
        }
    }

    #[tokio::test]
    async fn test_ndjson_still_works() -> Result<()> {
        let json_data =
            "{\"id\": 1, \"name\": \"alice\"}\n{\"id\": 2, \"name\": \"bob\"}\n";

        let store = Arc::new(InMemory::new());
        let path = Path::from("test.ndjson");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            true, // newline-delimited
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_large_file() -> Result<()> {
        // Build a JSON array with 1000 elements
        let mut json_data = String::from("[");
        for i in 0..1000 {
            if i > 0 {
                json_data.push(',');
            }
            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
        }
        json_data.push(']');

        let store = Arc::new(InMemory::new());
        let path = Path::from("large.json");
        store
            .put(&path, PutPayload::from(Bytes::from(json_data)))
            .await?;

        let opener = JsonOpener::new(
            100, // batch size
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 1000);

        // 1000 rows at a batch size of 100 should yield at least 10 batches
        assert!(batches.len() >= 10);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_stream_cancellation() -> Result<()> {
        // Build a JSON array large enough that decoding spans many batches
        let mut json_data = String::from("[");
        for i in 0..10000 {
            if i > 0 {
                json_data.push(',');
            }
            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
        }
        json_data.push(']');

        let store = Arc::new(InMemory::new());
        let path = Path::from("cancel_test.json");
        store
            .put(&path, PutPayload::from(Bytes::from(json_data)))
            .await?;

        let opener = JsonOpener::new(
            10, // small batch size so the stream yields many batches
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let mut stream = opener.open(file)?.await?;

        // Read a single batch, then drop the stream part-way through
        let first_batch = stream.next().await;
        assert!(first_batch.is_some());

        drop(stream);

        // Give the background tasks time to observe the cancellation; the
        // test passes as long as nothing panics or hangs
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        Ok(())
    }
}