1use std::io::BufReader;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use crate::file_format::JsonDecoder;
26use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};
27
28use crate::boundary_stream::AlignedBoundaryStream;
29
30use datafusion_common::error::{DataFusionError, Result};
31use datafusion_common::exec_datafusion_err;
32use datafusion_common_runtime::{JoinSet, SpawnedTask};
33use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
34use datafusion_datasource::file_compression_type::FileCompressionType;
35use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
36use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
37use datafusion_datasource::{ListingTableUrl, PartitionedFile, as_file_source};
38use datafusion_physical_plan::projection::ProjectionExprs;
39use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
40
41use arrow::array::RecordBatch;
42use arrow::json::ReaderBuilder;
43use arrow::{datatypes::SchemaRef, json};
44use datafusion_datasource::file::FileSource;
45use datafusion_datasource::file_scan_config::FileScanConfig;
46use datafusion_execution::TaskContext;
47use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
48
49use futures::{Stream, StreamExt, TryStreamExt};
50use object_store::buffered::BufWriter;
51use object_store::{GetOptions, GetResultPayload, ObjectStore};
52use tokio::io::AsyncWriteExt;
53use tokio_stream::wrappers::ReceiverStream;
54
/// Capacity, in `Bytes` chunks, of the channel that carries raw bytes from
/// the async object-store read task to the blocking JSON-array parse task.
const CHANNEL_BUFFER_SIZE: usize = 128;

/// Buffer capacity (2 MiB) handed to `JsonArrayToNdjsonReader::with_capacity`
/// when converting a top-level JSON array into newline-delimited JSON.
const JSON_CONVERTER_BUFFER_SIZE: usize = 2 << 20;
61
/// Stream of record batches decoded from a top-level JSON array
/// (`[{...}, {...}]`) that arrived as an object-store byte stream.
///
/// Batches are produced by two background tasks (see `JsonOpener::open`)
/// and delivered through a bounded channel. This struct owns both task
/// handles, so they are released together with the stream.
///
/// Field order matters: fields drop in declaration order, so `inner` (the
/// receiver) is dropped first. That makes the parse task's `blocking_send`
/// fail, and it stops producing.
/// NOTE(review): assumes dropping a `SpawnedTask` cancels/aborts it — confirm.
struct JsonArrayStream {
    /// Receiving end of the result channel. The parse task sends decoded
    /// batches (or errors) into it; the read task sends stream errors into it.
    inner: ReceiverStream<std::result::Result<RecordBatch, arrow::error::ArrowError>>,
    /// Async task that forwards decompressed bytes into the byte channel.
    /// Held only to tie its lifetime to this stream.
    _read_task: SpawnedTask<()>,
    /// Blocking task that converts the array to NDJSON and decodes it with
    /// arrow's JSON reader. Held only to tie its lifetime to this stream.
    _parse_task: SpawnedTask<()>,
}
80
81impl Stream for JsonArrayStream {
82 type Item = std::result::Result<RecordBatch, arrow::error::ArrowError>;
83
84 fn poll_next(
85 mut self: Pin<&mut Self>,
86 cx: &mut Context<'_>,
87 ) -> Poll<Option<Self::Item>> {
88 Pin::new(&mut self.inner).poll_next(cx)
89 }
90
91 fn size_hint(&self) -> (usize, Option<usize>) {
92 self.inner.size_hint()
93 }
94}
/// [`FileOpener`] that decodes JSON files into Arrow record batches. It
/// accepts either newline-delimited JSON (NDJSON) or a single top-level
/// JSON array.
pub struct JsonOpener {
    /// Batch size passed to arrow's JSON `ReaderBuilder::with_batch_size`.
    batch_size: usize,
    /// Schema the JSON is decoded into: the file schema projected to the
    /// columns that are actually read.
    projected_schema: SchemaRef,
    /// Compression applied to the file bytes, undone before decoding.
    file_compression_type: FileCompressionType,
    /// Store the files are fetched from.
    object_store: Arc<dyn ObjectStore>,
    /// `true` for NDJSON, `false` for a JSON array (`[{...}, ...]`).
    newline_delimited: bool,
}
109
110impl JsonOpener {
111 pub fn new(
113 batch_size: usize,
114 projected_schema: SchemaRef,
115 file_compression_type: FileCompressionType,
116 object_store: Arc<dyn ObjectStore>,
117 newline_delimited: bool,
118 ) -> Self {
119 Self {
120 batch_size,
121 projected_schema,
122 file_compression_type,
123 object_store,
124 newline_delimited,
125 }
126 }
127}
128
/// [`FileSource`] for JSON files. Supports newline-delimited JSON (the
/// default) and single top-level JSON arrays (`[{...}, ...]`).
#[derive(Clone)]
pub struct JsonSource {
    /// Table schema; its `file_schema()` is what the opener projects.
    table_schema: datafusion_datasource::TableSchema,
    /// Batch size for the opener. It must be set (via `with_batch_size`)
    /// before `create_file_opener` is called, which panics otherwise.
    batch_size: Option<usize>,
    /// Execution metrics for this source.
    metrics: ExecutionPlanMetricsSet,
    /// Projection split into the file-column indices read by the opener
    /// (`file_indices`) and the original projection expressions (`source`).
    projection: SplitProjection,
    /// `true` (default) for NDJSON, `false` for a single JSON array.
    newline_delimited: bool,
}
140
141impl JsonSource {
142 pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self {
144 let table_schema = table_schema.into();
145 Self {
146 projection: SplitProjection::unprojected(&table_schema),
147 table_schema,
148 batch_size: None,
149 metrics: ExecutionPlanMetricsSet::new(),
150 newline_delimited: true,
151 }
152 }
153
154 pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
159 self.newline_delimited = newline_delimited;
160 self
161 }
162}
163
164impl From<JsonSource> for Arc<dyn FileSource> {
165 fn from(source: JsonSource) -> Self {
166 as_file_source(source)
167 }
168}
169
170impl FileSource for JsonSource {
171 fn create_file_opener(
172 &self,
173 object_store: Arc<dyn ObjectStore>,
174 base_config: &FileScanConfig,
175 _partition: usize,
176 ) -> Result<Arc<dyn FileOpener>> {
177 let file_schema = self.table_schema.file_schema();
179 let projected_schema =
180 Arc::new(file_schema.project(&self.projection.file_indices)?);
181
182 let mut opener = Arc::new(JsonOpener {
183 batch_size: self
184 .batch_size
185 .expect("Batch size must set before creating opener"),
186 projected_schema,
187 file_compression_type: base_config.file_compression_type,
188 object_store,
189 newline_delimited: self.newline_delimited,
190 }) as Arc<dyn FileOpener>;
191
192 opener = ProjectionOpener::try_new(
194 self.projection.clone(),
195 Arc::clone(&opener),
196 self.table_schema.file_schema(),
197 )?;
198
199 Ok(opener)
200 }
201
202 fn table_schema(&self) -> &datafusion_datasource::TableSchema {
203 &self.table_schema
204 }
205
206 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
207 let mut conf = self.clone();
208 conf.batch_size = Some(batch_size);
209 Arc::new(conf)
210 }
211
212 fn try_pushdown_projection(
213 &self,
214 projection: &ProjectionExprs,
215 ) -> Result<Option<Arc<dyn FileSource>>> {
216 let mut source = self.clone();
217 let new_projection = self.projection.source.try_merge(projection)?;
218 let split_projection =
219 SplitProjection::new(self.table_schema.file_schema(), &new_projection);
220 source.projection = split_projection;
221 Ok(Some(Arc::new(source)))
222 }
223
224 fn projection(&self) -> Option<&ProjectionExprs> {
225 Some(&self.projection.source)
226 }
227
228 fn metrics(&self) -> &ExecutionPlanMetricsSet {
229 &self.metrics
230 }
231
232 fn file_type(&self) -> &str {
233 "json"
234 }
235}
236
impl FileOpener for JsonOpener {
    /// Opens `partitioned_file` and returns a future that resolves to a
    /// stream of record batches decoded with `projected_schema`.
    ///
    /// The read strategy depends on the request and the store:
    /// * A byte range is set (NDJSON only). The range is fetched through
    ///   `AlignedBoundaryStream` with `b'\n'` as the boundary byte and
    ///   decoded incrementally.
    /// * The store returns a local file (`GetResultPayload::File`, not on
    ///   wasm32). The file is read through arrow's `Read`-based JSON reader.
    ///   For JSON arrays it is first wrapped in `JsonArrayToNdjsonReader`.
    /// * The store returns a byte stream. NDJSON is decoded incrementally.
    ///   JSON arrays are handled by an async read task feeding a channel and
    ///   a blocking parse task (see `JsonArrayStream`).
    ///
    /// # Errors
    /// Returns `NotImplemented` immediately when a byte range is requested
    /// for JSON-array input. All other failures surface from the returned
    /// future or stream: out-of-range offsets, store errors, decompression
    /// errors and decoding errors.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();
        let newline_delimited = self.newline_delimited;

        // A JSON array cannot be split at arbitrary byte offsets, so only
        // whole-file scans are supported for that layout.
        if !newline_delimited && partitioned_file.range.is_some() {
            return Err(DataFusionError::NotImplemented(
                "JSON array format does not support range-based file scanning. \
                Disable repartition_file_scans or use newline-delimited JSON format."
                    .to_string(),
            ));
        }

        Ok(Box::pin(async move {
            let file_size = partitioned_file.object_meta.size;
            let location = &partitioned_file.object_meta.location;

            // Range-based (repartitioned) scan; only reachable for NDJSON.
            if let Some(file_range) = partitioned_file.range.as_ref() {
                // FileRange offsets are signed; reject negative values.
                let raw_start: u64 = file_range.start.try_into().map_err(|_| {
                    exec_datafusion_err!(
                        "Expected start range to fit in u64, got {}",
                        file_range.start
                    )
                })?;
                let raw_end: u64 = file_range.end.try_into().map_err(|_| {
                    exec_datafusion_err!(
                        "Expected end range to fit in u64, got {}",
                        file_range.end
                    )
                })?;

                // NOTE(review): presumably re-aligns raw_start..raw_end to
                // '\n' boundaries so that every line belongs to exactly one
                // range — confirm in boundary_stream.
                let aligned_stream = AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    location.clone(),
                    raw_start,
                    raw_end,
                    file_size,
                    b'\n',
                )
                .await?
                .map_err(DataFusionError::from);

                let decoder = ReaderBuilder::new(schema)
                    .with_batch_size(batch_size)
                    .build_decoder()?;
                let input = file_compression_type
                    .convert_stream(aligned_stream.boxed())?
                    .fuse();
                let stream = deserialize_stream(
                    input,
                    DecoderDeserializer::new(JsonDecoder::new(decoder)),
                );
                return Ok(stream.map_err(Into::into).boxed());
            }

            // Whole-file scan.
            let options = GetOptions::default();
            let result = store.get_opts(location, options).await?;

            match result.payload {
                // Local file: arrow's reader is an Iterator over a `Read`, so
                // each poll of the resulting stream reads the file
                // synchronously on the polling thread.
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(file, _) => {
                    let bytes = file_compression_type.convert_read(file)?;

                    if newline_delimited {
                        let reader = BufReader::new(bytes);
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    } else {
                        // NOTE(review): unlike the streaming JSON-array path
                        // below, `validate_complete` is never called here, so
                        // an unterminated/malformed array may only be
                        // reported for streamed objects — confirm intent.
                        let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
                            bytes,
                            JSON_CONVERTER_BUFFER_SIZE,
                        );
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(ndjson_reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    }
                }
                GetResultPayload::Stream(s) => {
                    if newline_delimited {
                        // NDJSON can be decoded incrementally from the byte
                        // stream without any blocking reader.
                        let s = s.map_err(DataFusionError::from);
                        let decoder = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build_decoder()?;
                        let input =
                            file_compression_type.convert_stream(s.boxed())?.fuse();
                        let stream = deserialize_stream(
                            input,
                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
                        );
                        Ok(stream.map_err(Into::into).boxed())
                    } else {
                        // JSON array from a byte stream. The array-to-NDJSON
                        // converter and arrow's reader are `Read`-based, so
                        // the work is split across two tasks:
                        //   read task (async):     decompressed bytes -> byte_tx
                        //   parse task (blocking): byte_rx -> NDJSON -> batches
                        //                          -> result_tx
                        let s = s.map_err(DataFusionError::from);
                        let decompressed_stream =
                            file_compression_type.convert_stream(s.boxed())?;

                        let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(
                            CHANNEL_BUFFER_SIZE,
                        );

                        // Small bound limits how many decoded batches can sit
                        // unconsumed in memory.
                        let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
                        // Lets the read task report stream errors directly to
                        // the consumer.
                        let error_tx = result_tx.clone();

                        let read_task = SpawnedTask::spawn(async move {
                            tokio::pin!(decompressed_stream);
                            while let Some(chunk) = decompressed_stream.next().await {
                                match chunk {
                                    Ok(bytes) => {
                                        if byte_tx.send(bytes).await.is_err() {
                                            // Parse task is gone; stop reading.
                                            break;
                                        }
                                    }
                                    Err(e) => {
                                        let _ = error_tx
                                            .send(Err(
                                                arrow::error::ArrowError::ExternalError(
                                                    Box::new(e),
                                                ),
                                            ))
                                            .await;
                                        break;
                                    }
                                }
                            }
                            // `byte_tx` is dropped when this task ends, which
                            // closes the byte channel (presumably seen by
                            // `ChannelReader` as EOF — confirm).
                        });

                        let parse_task = SpawnedTask::spawn_blocking(move || {
                            let channel_reader = ChannelReader::new(byte_rx);
                            let mut ndjson_reader =
                                JsonArrayToNdjsonReader::with_capacity(
                                    channel_reader,
                                    JSON_CONVERTER_BUFFER_SIZE,
                                );

                            // Built over `&mut ndjson_reader` so the converter
                            // is still available for `validate_complete` below.
                            match ReaderBuilder::new(schema)
                                .with_batch_size(batch_size)
                                .build(&mut ndjson_reader)
                            {
                                Ok(arrow_reader) => {
                                    for batch_result in arrow_reader {
                                        if result_tx.blocking_send(batch_result).is_err()
                                        {
                                            // Consumer dropped the stream.
                                            break;
                                        }
                                    }
                                }
                                Err(e) => {
                                    let _ = result_tx.blocking_send(Err(e));
                                }
                            }

                            // Report a structurally incomplete array as an
                            // error (presumably e.g. a missing closing `]` —
                            // confirm against JsonArrayToNdjsonReader).
                            if let Err(e) = ndjson_reader.validate_complete() {
                                let _ = result_tx.blocking_send(Err(
                                    arrow::error::ArrowError::JsonError(e.to_string()),
                                ));
                            }
                        });

                        // The stream owns both tasks. The receiver ends once
                        // every sender (parse task and read task) is dropped.
                        // NOTE(review): the tasks are never joined, so a panic
                        // in the parse task would likely end the stream early
                        // without an error — consider surfacing the JoinError.
                        let stream = JsonArrayStream {
                            inner: ReceiverStream::new(result_rx),
                            _read_task: read_task,
                            _parse_task: parse_task,
                        };

                        Ok(stream.map(|r| r.map_err(Into::into)).boxed())
                    }
                }
            }
        }))
    }
}
461
462pub async fn plan_to_json(
463 task_ctx: Arc<TaskContext>,
464 plan: Arc<dyn ExecutionPlan>,
465 path: impl AsRef<str>,
466) -> Result<()> {
467 let path = path.as_ref();
468 let parsed = ListingTableUrl::parse(path)?;
469 let object_store_url = parsed.object_store();
470 let store = task_ctx.runtime_env().object_store(&object_store_url)?;
471 let writer_buffer_size = task_ctx
472 .session_config()
473 .options()
474 .execution
475 .objectstore_writer_buffer_size;
476 let mut join_set = JoinSet::new();
477 for i in 0..plan.output_partitioning().partition_count() {
478 let storeref = Arc::clone(&store);
479 let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
480 let filename = format!("{}/part-{i}.json", parsed.prefix());
481 let file = object_store::path::Path::parse(filename)?;
482
483 let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
484 join_set.spawn(async move {
485 let mut buf_writer =
486 BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
487
488 let mut buffer = Vec::with_capacity(1024);
489 while let Some(batch) = stream.next().await.transpose()? {
490 let mut writer = json::LineDelimitedWriter::new(buffer);
491 writer.write(&batch)?;
492 buffer = writer.into_inner();
493 buf_writer.write_all(&buffer).await?;
494 buffer.clear();
495 }
496
497 buf_writer.shutdown().await.map_err(DataFusionError::from)
498 });
499 }
500
501 while let Some(result) = join_set.join_next().await {
502 match result {
503 Ok(res) => res?, Err(e) => {
505 if e.is_panic() {
506 std::panic::resume_unwind(e.into_panic());
507 } else {
508 unreachable!();
509 }
510 }
511 }
512 }
513
514 Ok(())
515}
516
/// Tests for `JsonOpener`. Covers:
/// * JSON-array decoding (small, empty, large and cancelled inputs);
/// * plain NDJSON decoding;
/// * range-partitioned NDJSON scans over stores that serve data in chunks.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{CHUNK_SIZES, make_chunked_store};
    use arrow::array::{Int64Array, StringArray};
    use arrow::compute;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use bytes::Bytes;
    use datafusion_datasource::FileRange;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use object_store::{ObjectStoreExt, PutPayload};

    /// Schema shared by most tests: nullable `id: Int64` and `name: Utf8`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    // NOTE(review): `InMemory` presumably serves objects as
    // `GetResultPayload::Stream`, so despite its name this test likely
    // exercises the streaming JSON-array path, not the `File` path — confirm.
    #[tokio::test]
    async fn test_json_array_from_file() -> Result<()> {
        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]"#;

        let store = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false, // JSON array format
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    /// Three records with batch size 2; only the total row count is checked.
    #[tokio::test]
    async fn test_json_array_from_stream() -> Result<()> {
        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}, {"id": 3, "name": "charlie"}]"#;

        let store = Arc::new(InMemory::new());
        let path = Path::from("test_stream.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            2, // batch size smaller than the record count
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false, // JSON array format
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 3);

        Ok(())
    }

    /// Multi-line array whose `data` values are strings containing JSON-like
    /// text (braces, brackets, escaped quotes). They must not confuse the
    /// array-to-NDJSON conversion.
    #[tokio::test]
    async fn test_json_array_nested_objects() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("data", DataType::Utf8, true),
        ]));

        let json_data = r#"[
            {"id": 1, "data": "{\"nested\": true}"},
            {"id": 2, "data": "[1, 2, 3]"}
        ]"#;

        let store = Arc::new(InMemory::new());
        let path = Path::from("nested.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            schema,
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    /// An empty array yields zero rows (and no error).
    #[tokio::test]
    async fn test_json_array_empty() -> Result<()> {
        let json_data = "[]";

        let store = Arc::new(InMemory::new());
        let path = Path::from("empty.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 0);

        Ok(())
    }

    /// Requesting a byte range for JSON-array input must fail synchronously
    /// from `open`, before any future is returned.
    #[tokio::test]
    async fn test_json_array_range_not_supported() {
        let store = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store
            .put(&path, PutPayload::from_static(b"[]"))
            .await
            .unwrap();

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false, // JSON array format
        );

        let meta = store.head(&path).await.unwrap();
        let mut file = PartitionedFile::new(path.to_string(), meta.size);
        file.range = Some(FileRange { start: 0, end: 10 });

        // Matched by hand; the Ok payload (a future) is not `Debug`.
        let result = opener.open(file);
        match result {
            Ok(_) => panic!("Expected error for range-based JSON array scanning"),
            Err(e) => {
                assert!(
                    e.to_string().contains("does not support range-based"),
                    "Unexpected error message: {e}"
                );
            }
        }
    }

    /// Regression check that newline-delimited input still decodes.
    #[tokio::test]
    async fn test_ndjson_still_works() -> Result<()> {
        let json_data =
            "{\"id\": 1, \"name\": \"alice\"}\n{\"id\": 2, \"name\": \"bob\"}\n";

        let store = Arc::new(InMemory::new());
        let path = Path::from("test.ndjson");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            true, // newline-delimited format
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    /// 1000-record array with batch size 100. All rows must arrive, split
    /// across at least 10 batches.
    #[tokio::test]
    async fn test_json_array_large_file() -> Result<()> {
        let mut json_data = String::from("[");
        for i in 0..1000 {
            if i > 0 {
                json_data.push(',');
            }
            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
        }
        json_data.push(']');

        let store = Arc::new(InMemory::new());
        let path = Path::from("large.json");
        store
            .put(&path, PutPayload::from(Bytes::from(json_data)))
            .await?;

        let opener = JsonOpener::new(
            100, // forces multiple batches
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 1000);

        assert!(batches.len() >= 10);

        Ok(())
    }

    /// Drops the stream after the first batch of a large array.
    /// NOTE(review): this only checks that early drop does not panic or
    /// hang; it does not assert that the background tasks actually stopped.
    #[tokio::test]
    async fn test_json_array_stream_cancellation() -> Result<()> {
        let mut json_data = String::from("[");
        for i in 0..10000 {
            if i > 0 {
                json_data.push(',');
            }
            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
        }
        json_data.push(']');

        let store = Arc::new(InMemory::new());
        let path = Path::from("cancel_test.json");
        store
            .put(&path, PutPayload::from(Bytes::from(json_data)))
            .await?;

        let opener = JsonOpener::new(
            10, // small batches so plenty of work remains after the first one
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let mut stream = opener.open(file)?.await?;

        let first_batch = stream.next().await;
        assert!(first_batch.is_some());

        // Dropping the stream closes the result channel and releases the tasks.
        drop(stream);

        // Give the background tasks time to observe the closed channels.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        Ok(())
    }

    /// Partition counts used by the range-partitioned NDJSON tests.
    fn get_partition_splits() -> Vec<usize> {
        vec![1usize, 2, 3, 5, 7, 10]
    }

    /// Splits `[0, file_size)` into `num_partitions` contiguous byte ranges
    /// and opens each range with an NDJSON `JsonOpener`. Returns the batches
    /// of all ranges concatenated in partition order.
    async fn collect_partitioned_batches(
        store: Arc<dyn ObjectStore>,
        path: &Path,
        file_size: u64,
        num_partitions: usize,
    ) -> Result<Vec<RecordBatch>> {
        let mut all_batches = Vec::new();
        for p in 0..num_partitions {
            let start = (p as u64 * file_size) / num_partitions as u64;
            let end = ((p as u64 + 1) * file_size) / num_partitions as u64;

            let meta = store.head(path).await?;
            let mut file = PartitionedFile::new(path.to_string(), meta.size);
            file.range = Some(FileRange {
                start: start as i64,
                end: end as i64,
            });

            let opener = JsonOpener::new(
                1024,
                test_schema(),
                FileCompressionType::UNCOMPRESSED,
                Arc::clone(&store),
                true,
            );

            let stream = opener.open(file)?.await?;
            let batches: Vec<_> = stream.try_collect().await?;
            all_batches.extend(batches);
        }
        Ok(all_batches)
    }

    /// Concatenates `batches` (all in `test_schema()`) and sorts the rows by
    /// the `id` column. This lets results from different partitions be
    /// compared in a deterministic order.
    fn concat_and_sort_by_id(batches: &[RecordBatch]) -> Result<RecordBatch> {
        let schema = test_schema();
        let combined = compute::concat_batches(&schema, batches)?;
        let indices = compute::sort_to_indices(combined.column(0), None, None)?;
        let sorted_cols: Vec<_> = combined
            .columns()
            .iter()
            .map(|col| compute::take(col.as_ref(), &indices, None))
            .collect::<std::result::Result<_, _>>()?;
        Ok(RecordBatch::try_new(schema, sorted_cols)?)
    }

    /// Uniform NDJSON lines. Every chunk size × partition count combination
    /// must yield each row exactly once with the right values.
    #[tokio::test]
    async fn test_ndjson_partitioned() -> Result<()> {
        let num_rows: usize = 20;
        let mut ndjson = String::new();
        for i in 0..num_rows {
            ndjson.push_str(&format!("{{\"id\": {i}, \"name\": \"user{i}\"}}\n"));
        }
        let ndjson_bytes = Bytes::from(ndjson);
        let file_size = ndjson_bytes.len() as u64;

        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;

            for num_partitions in get_partition_splits() {
                let batches = collect_partitioned_batches(
                    Arc::clone(&store),
                    &path,
                    file_size,
                    num_partitions,
                )
                .await?;

                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
                assert_eq!(
                    total, num_rows,
                    "Expected {num_rows} rows with {num_partitions} partitions"
                );

                let result = concat_and_sort_by_id(&batches)?;
                let ids = result
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap();
                let names = result
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                for i in 0..num_rows {
                    assert_eq!(
                        ids.value(i),
                        i as i64,
                        "id mismatch at row {i} with {num_partitions} partitions"
                    );
                    assert_eq!(
                        names.value(i),
                        format!("user{i}"),
                        "name mismatch at row {i} with {num_partitions} partitions"
                    );
                }
            }
        }

        Ok(())
    }

    /// Like `test_ndjson_partitioned`, but with lines of very different
    /// lengths, so range boundaries land at varied offsets within lines.
    #[tokio::test]
    async fn test_ndjson_partitioned_uneven_lines() -> Result<()> {
        let rows: &[(&str, &str)] = &[
            ("1", "alice"),
            ("2", "bob-with-a-longer-name"),
            ("3", "charlie"),
            ("4", "x"),
            ("5", "diana-has-an-even-longer-name-here"),
            ("6", "ed"),
            ("7", "francesca"),
            ("8", "g"),
            ("9", "hector-the-magnificent"),
            ("10", "isabella"),
        ];
        let num_rows = rows.len();

        let mut ndjson = String::new();
        for (id, name) in rows {
            ndjson.push_str(&format!("{{\"id\": {id}, \"name\": \"{name}\"}}\n"));
        }
        let ndjson_bytes = Bytes::from(ndjson);
        let file_size = ndjson_bytes.len() as u64;

        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;

            for num_partitions in get_partition_splits() {
                let batches = collect_partitioned_batches(
                    Arc::clone(&store),
                    &path,
                    file_size,
                    num_partitions,
                )
                .await?;

                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
                assert_eq!(
                    total, num_rows,
                    "Expected {num_rows} rows with {num_partitions} partitions"
                );

                let result = concat_and_sort_by_id(&batches)?;
                let ids = result
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap();
                let names = result
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                for (i, (expected_id, expected_name)) in rows.iter().enumerate() {
                    assert_eq!(
                        ids.value(i),
                        expected_id.parse::<i64>().unwrap(),
                        "id mismatch at row {i} with {num_partitions} partitions"
                    );
                    assert_eq!(
                        names.value(i),
                        *expected_name,
                        "name mismatch at row {i} with {num_partitions} partitions"
                    );
                }
            }
        }

        Ok(())
    }

    /// A single record with no trailing newline must be read exactly once,
    /// whatever the partition count or chunk size.
    #[tokio::test]
    async fn test_ndjson_partitioned_single_entry() -> Result<()> {
        let ndjson = r#"{"id": 1, "name": "alice"}"#;
        let ndjson_bytes = Bytes::from(ndjson);
        let file_size = ndjson_bytes.len() as u64;

        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;

            for num_partitions in get_partition_splits() {
                let batches = collect_partitioned_batches(
                    Arc::clone(&store),
                    &path,
                    file_size,
                    num_partitions,
                )
                .await?;

                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
                assert_eq!(
                    total, 1,
                    "Expected exactly 1 row with {num_partitions} partitions"
                );

                let result = concat_and_sort_by_id(&batches)?;
                let ids = result
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap();
                let names = result
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(ids.value(0), 1);
                assert_eq!(names.value(0), "alice");
            }
        }

        Ok(())
    }
}