Skip to main content

datafusion_datasource_json/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution plan for reading JSON files (line-delimited and array formats)
19
20use std::io::BufReader;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use crate::file_format::JsonDecoder;
26use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};
27
28use crate::boundary_stream::AlignedBoundaryStream;
29
30use datafusion_common::error::{DataFusionError, Result};
31use datafusion_common::exec_datafusion_err;
32use datafusion_common_runtime::{JoinSet, SpawnedTask};
33use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
34use datafusion_datasource::file_compression_type::FileCompressionType;
35use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
36use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
37use datafusion_datasource::{ListingTableUrl, PartitionedFile, as_file_source};
38use datafusion_physical_plan::projection::ProjectionExprs;
39use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
40
41use arrow::array::RecordBatch;
42use arrow::json::ReaderBuilder;
43use arrow::{datatypes::SchemaRef, json};
44use datafusion_datasource::file::FileSource;
45use datafusion_datasource::file_scan_config::FileScanConfig;
46use datafusion_execution::TaskContext;
47use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
48
49use futures::{Stream, StreamExt, TryStreamExt};
50use object_store::buffered::BufWriter;
51use object_store::{GetOptions, GetResultPayload, ObjectStore};
52use tokio::io::AsyncWriteExt;
53use tokio_stream::wrappers::ReceiverStream;
54
/// Channel buffer size for streaming JSON array processing.
///
/// This is the capacity, counted in chunks (not bytes), of the byte channel
/// that feeds the blocking JSON-array parser.
/// With ~128KB average chunk size, 128 chunks ≈ 16MB buffer.
const CHANNEL_BUFFER_SIZE: usize = 128;

/// Buffer size for JsonArrayToNdjsonReader (2MB each, 4MB total for input+output)
///
/// Passed as the capacity argument of `JsonArrayToNdjsonReader::with_capacity`.
const JSON_CONVERTER_BUFFER_SIZE: usize = 2 * 1024 * 1024;
61
62// ============================================================================
63// JsonArrayStream - Custom stream wrapper to hold SpawnedTask handles
64// ============================================================================
65
/// A stream wrapper that holds SpawnedTask handles to keep them alive
/// until the stream is fully consumed or dropped.
///
/// This ensures cancel-safety: when the stream is dropped, the tasks
/// are properly aborted via SpawnedTask's Drop implementation.
///
/// Fields are dropped in declaration order, so the receiving half of the
/// result channel (`inner`) is released before the task handles.
///
/// NOTE(review): tokio documents that a blocking task which has already
/// started running cannot be aborted; presumably the parse task then ends on
/// its own once its channels are closed — confirm.
struct JsonArrayStream {
    /// Receiving side of the result channel; every item is either a decoded
    /// batch or an Arrow error.
    inner: ReceiverStream<std::result::Result<RecordBatch, arrow::error::ArrowError>>,
    /// Task that reads from object store and sends bytes to channel.
    /// Kept alive until stream is consumed or dropped.
    _read_task: SpawnedTask<()>,
    /// Task that parses JSON and sends RecordBatches.
    /// Kept alive until stream is consumed or dropped.
    _parse_task: SpawnedTask<()>,
}
80
81impl Stream for JsonArrayStream {
82    type Item = std::result::Result<RecordBatch, arrow::error::ArrowError>;
83
84    fn poll_next(
85        mut self: Pin<&mut Self>,
86        cx: &mut Context<'_>,
87    ) -> Poll<Option<Self::Item>> {
88        Pin::new(&mut self.inner).poll_next(cx)
89    }
90
91    fn size_hint(&self) -> (usize, Option<usize>) {
92        self.inner.size_hint()
93    }
94}
95// ============================================================================
96// JsonOpener and JsonSource
97// ============================================================================
98
/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
pub struct JsonOpener {
    /// Batch size handed to the Arrow JSON reader/decoder builder.
    batch_size: usize,
    /// Schema handed to the Arrow JSON reader/decoder builder.
    projected_schema: SchemaRef,
    /// Compression of the file; used to wrap the raw reader or byte stream
    /// before it is decoded.
    file_compression_type: FileCompressionType,
    /// Object store the file is fetched from.
    object_store: Arc<dyn ObjectStore>,
    /// When `true` (default), expects newline-delimited JSON (NDJSON).
    /// When `false`, expects JSON array format `[{...}, {...}]`.
    newline_delimited: bool,
}
109
110impl JsonOpener {
111    /// Returns a [`JsonOpener`]
112    pub fn new(
113        batch_size: usize,
114        projected_schema: SchemaRef,
115        file_compression_type: FileCompressionType,
116        object_store: Arc<dyn ObjectStore>,
117        newline_delimited: bool,
118    ) -> Self {
119        Self {
120            batch_size,
121            projected_schema,
122            file_compression_type,
123            object_store,
124            newline_delimited,
125        }
126    }
127}
128
/// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]
#[derive(Clone)]
pub struct JsonSource {
    /// Table schema this source was created with.
    table_schema: datafusion_datasource::TableSchema,
    /// Batch size for the opener; `None` until `with_batch_size` is called.
    batch_size: Option<usize>,
    /// Metrics set exposed through `FileSource::metrics`.
    metrics: ExecutionPlanMetricsSet,
    /// Current projection; starts unprojected and is replaced by
    /// `try_pushdown_projection`.
    projection: SplitProjection,
    /// When `true` (default), expects newline-delimited JSON (NDJSON).
    /// When `false`, expects JSON array format `[{...}, {...}]`.
    newline_delimited: bool,
}
140
141impl JsonSource {
142    /// Initialize a JsonSource with the provided schema
143    pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self {
144        let table_schema = table_schema.into();
145        Self {
146            projection: SplitProjection::unprojected(&table_schema),
147            table_schema,
148            batch_size: None,
149            metrics: ExecutionPlanMetricsSet::new(),
150            newline_delimited: true,
151        }
152    }
153
154    /// Set whether to read as newline-delimited JSON.
155    ///
156    /// When `true` (default), expects newline-delimited format.
157    /// When `false`, expects JSON array format `[{...}, {...}]`.
158    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
159        self.newline_delimited = newline_delimited;
160        self
161    }
162}
163
164impl From<JsonSource> for Arc<dyn FileSource> {
165    fn from(source: JsonSource) -> Self {
166        as_file_source(source)
167    }
168}
169
170impl FileSource for JsonSource {
171    fn create_file_opener(
172        &self,
173        object_store: Arc<dyn ObjectStore>,
174        base_config: &FileScanConfig,
175        _partition: usize,
176    ) -> Result<Arc<dyn FileOpener>> {
177        // Get the projected file schema for JsonOpener
178        let file_schema = self.table_schema.file_schema();
179        let projected_schema =
180            Arc::new(file_schema.project(&self.projection.file_indices)?);
181
182        let mut opener = Arc::new(JsonOpener {
183            batch_size: self
184                .batch_size
185                .expect("Batch size must set before creating opener"),
186            projected_schema,
187            file_compression_type: base_config.file_compression_type,
188            object_store,
189            newline_delimited: self.newline_delimited,
190        }) as Arc<dyn FileOpener>;
191
192        // Wrap with ProjectionOpener
193        opener = ProjectionOpener::try_new(
194            self.projection.clone(),
195            Arc::clone(&opener),
196            self.table_schema.file_schema(),
197        )?;
198
199        Ok(opener)
200    }
201
202    fn table_schema(&self) -> &datafusion_datasource::TableSchema {
203        &self.table_schema
204    }
205
206    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
207        let mut conf = self.clone();
208        conf.batch_size = Some(batch_size);
209        Arc::new(conf)
210    }
211
212    fn try_pushdown_projection(
213        &self,
214        projection: &ProjectionExprs,
215    ) -> Result<Option<Arc<dyn FileSource>>> {
216        let mut source = self.clone();
217        let new_projection = self.projection.source.try_merge(projection)?;
218        let split_projection =
219            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
220        source.projection = split_projection;
221        Ok(Some(Arc::new(source)))
222    }
223
224    fn projection(&self) -> Option<&ProjectionExprs> {
225        Some(&self.projection.source)
226    }
227
228    fn metrics(&self) -> &ExecutionPlanMetricsSet {
229        &self.metrics
230    }
231
232    fn file_type(&self) -> &str {
233        "json"
234    }
235}
236
impl FileOpener for JsonOpener {
    /// Open a partitioned JSON file.
    ///
    /// If `partitioned_file.range` is `None`, the entire file is opened.
    /// Else `partitioned_file.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file.
    ///
    /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
    /// are applied to determine which lines to read:
    /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
    /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
    ///
    /// Note: JSON array format does not support range-based scanning.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::NotImplemented` immediately when a range is
    /// requested for JSON array format. All other failures (range bounds that
    /// do not fit in `u64`, object store errors, decoder construction errors)
    /// surface when the returned future is awaited or while the resulting
    /// stream is polled.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        // Copy everything the future needs out of `self`; the `async move`
        // block below cannot borrow from the opener.
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();
        let newline_delimited = self.newline_delimited;

        // JSON array format requires reading the complete file
        if !newline_delimited && partitioned_file.range.is_some() {
            return Err(DataFusionError::NotImplemented(
                "JSON array format does not support range-based file scanning. \
                 Disable repartition_file_scans or use newline-delimited JSON format."
                    .to_string(),
            ));
        }

        Ok(Box::pin(async move {
            let file_size = partitioned_file.object_meta.size;
            let location = &partitioned_file.object_meta.location;

            // Range scan. Only NDJSON can reach this branch because the JSON
            // array + range combination was rejected above.
            if let Some(file_range) = partitioned_file.range.as_ref() {
                let raw_start: u64 = file_range.start.try_into().map_err(|_| {
                    exec_datafusion_err!(
                        "Expected start range to fit in u64, got {}",
                        file_range.start
                    )
                })?;
                let raw_end: u64 = file_range.end.try_into().map_err(|_| {
                    exec_datafusion_err!(
                        "Expected end range to fit in u64, got {}",
                        file_range.end
                    )
                })?;

                // Presumably adjusts the raw byte range to `\n` boundaries as
                // described in the rules above; see `AlignedBoundaryStream`.
                let aligned_stream = AlignedBoundaryStream::new(
                    Arc::clone(&store),
                    location.clone(),
                    raw_start,
                    raw_end,
                    file_size,
                    b'\n',
                )
                .await?
                .map_err(DataFusionError::from);

                let decoder = ReaderBuilder::new(schema)
                    .with_batch_size(batch_size)
                    .build_decoder()?;
                let input = file_compression_type
                    .convert_stream(aligned_stream.boxed())?
                    .fuse();
                let stream = deserialize_stream(
                    input,
                    DecoderDeserializer::new(JsonDecoder::new(decoder)),
                );
                return Ok(stream.map_err(Into::into).boxed());
            }

            // No range specified — read the entire file
            let options = GetOptions::default();
            let result = store.get_opts(location, options).await?;

            match result.payload {
                // The store handed back an open local file: decode it with
                // the synchronous Arrow reader.
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(file, _) => {
                    let bytes = file_compression_type.convert_read(file)?;

                    if newline_delimited {
                        // NDJSON: use BufReader directly
                        let reader = BufReader::new(bytes);
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    } else {
                        // JSON array format: wrap with streaming converter
                        //
                        // NOTE(review): unlike the `Stream` branch below, this
                        // branch never calls `validate_complete()` on the
                        // converter, so a malformed or truncated array may not
                        // be reported here — confirm whether that is intended.
                        let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
                            bytes,
                            JSON_CONVERTER_BUFFER_SIZE,
                        );
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(ndjson_reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    }
                }
                // The store handed back an async byte stream.
                GetResultPayload::Stream(s) => {
                    if newline_delimited {
                        // Newline-delimited JSON (NDJSON) streaming reader
                        let s = s.map_err(DataFusionError::from);
                        let decoder = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build_decoder()?;
                        let input =
                            file_compression_type.convert_stream(s.boxed())?.fuse();
                        let stream = deserialize_stream(
                            input,
                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
                        );
                        Ok(stream.map_err(Into::into).boxed())
                    } else {
                        // JSON array format: streaming conversion with channel-based byte transfer
                        //
                        // Architecture:
                        // 1. Async task reads from object store stream, decompresses, sends to channel
                        // 2. Blocking task receives bytes, converts JSON array to NDJSON, parses to Arrow
                        // 3. RecordBatches are sent back via another channel
                        //
                        // Memory budget (~32MB):
                        // - byte channel (tokio mpsc): CHANNEL_BUFFER_SIZE chunks (~16MB)
                        // - JsonArrayToNdjsonReader: 2 × JSON_CONVERTER_BUFFER_SIZE (~4MB)
                        // - Arrow JsonReader internal buffer (~8MB)
                        // - Miscellaneous (~4MB)

                        let s = s.map_err(DataFusionError::from);
                        let decompressed_stream =
                            file_compression_type.convert_stream(s.boxed())?;

                        // Channel for bytes: async producer -> blocking consumer
                        // Uses tokio::sync::mpsc so the async send never blocks a
                        // tokio worker thread; the consumer calls blocking_recv()
                        // inside spawn_blocking.
                        let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(
                            CHANNEL_BUFFER_SIZE,
                        );

                        // Channel for results: sync producer -> async consumer
                        let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
                        // Second sender so the read task can report I/O errors
                        // on the same result channel as the parse task.
                        let error_tx = result_tx.clone();

                        // Async task: read from object store stream and send bytes to channel
                        // Store the SpawnedTask to keep it alive until stream is dropped
                        let read_task = SpawnedTask::spawn(async move {
                            tokio::pin!(decompressed_stream);
                            while let Some(chunk) = decompressed_stream.next().await {
                                match chunk {
                                    Ok(bytes) => {
                                        if byte_tx.send(bytes).await.is_err() {
                                            break; // Consumer dropped
                                        }
                                    }
                                    Err(e) => {
                                        // Best effort: the send result is ignored
                                        // because the receiver may already be gone.
                                        let _ = error_tx
                                            .send(Err(
                                                arrow::error::ArrowError::ExternalError(
                                                    Box::new(e),
                                                ),
                                            ))
                                            .await;
                                        break;
                                    }
                                }
                            }
                            // byte_tx dropped here, signals EOF to ChannelReader
                        });

                        // Blocking task: receive bytes from channel and parse JSON
                        // Store the SpawnedTask to keep it alive until stream is dropped
                        let parse_task = SpawnedTask::spawn_blocking(move || {
                            let channel_reader = ChannelReader::new(byte_rx);
                            let mut ndjson_reader =
                                JsonArrayToNdjsonReader::with_capacity(
                                    channel_reader,
                                    JSON_CONVERTER_BUFFER_SIZE,
                                );

                            // The converter is borrowed mutably here so that it
                            // can still be validated once the Arrow reader is done.
                            match ReaderBuilder::new(schema)
                                .with_batch_size(batch_size)
                                .build(&mut ndjson_reader)
                            {
                                Ok(arrow_reader) => {
                                    for batch_result in arrow_reader {
                                        if result_tx.blocking_send(batch_result).is_err()
                                        {
                                            break; // Receiver dropped
                                        }
                                    }
                                }
                                Err(e) => {
                                    let _ = result_tx.blocking_send(Err(e));
                                }
                            }

                            // Validate the JSON array was properly formed
                            // (this runs on every exit path of the match above,
                            // including after an error or a dropped receiver).
                            if let Err(e) = ndjson_reader.validate_complete() {
                                let _ = result_tx.blocking_send(Err(
                                    arrow::error::ArrowError::JsonError(e.to_string()),
                                ));
                            }
                            // result_tx dropped here, closes the stream
                        });

                        // Wrap in JsonArrayStream to keep tasks alive until stream is consumed
                        let stream = JsonArrayStream {
                            inner: ReceiverStream::new(result_rx),
                            _read_task: read_task,
                            _parse_task: parse_task,
                        };

                        Ok(stream.map(|r| r.map_err(Into::into)).boxed())
                    }
                }
            }
        }))
    }
}
461
462pub async fn plan_to_json(
463    task_ctx: Arc<TaskContext>,
464    plan: Arc<dyn ExecutionPlan>,
465    path: impl AsRef<str>,
466) -> Result<()> {
467    let path = path.as_ref();
468    let parsed = ListingTableUrl::parse(path)?;
469    let object_store_url = parsed.object_store();
470    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
471    let writer_buffer_size = task_ctx
472        .session_config()
473        .options()
474        .execution
475        .objectstore_writer_buffer_size;
476    let mut join_set = JoinSet::new();
477    for i in 0..plan.output_partitioning().partition_count() {
478        let storeref = Arc::clone(&store);
479        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
480        let filename = format!("{}/part-{i}.json", parsed.prefix());
481        let file = object_store::path::Path::parse(filename)?;
482
483        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
484        join_set.spawn(async move {
485            let mut buf_writer =
486                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
487
488            let mut buffer = Vec::with_capacity(1024);
489            while let Some(batch) = stream.next().await.transpose()? {
490                let mut writer = json::LineDelimitedWriter::new(buffer);
491                writer.write(&batch)?;
492                buffer = writer.into_inner();
493                buf_writer.write_all(&buffer).await?;
494                buffer.clear();
495            }
496
497            buf_writer.shutdown().await.map_err(DataFusionError::from)
498        });
499    }
500
501    while let Some(result) = join_set.join_next().await {
502        match result {
503            Ok(res) => res?, // propagate DataFusion error
504            Err(e) => {
505                if e.is_panic() {
506                    std::panic::resume_unwind(e.into_panic());
507                } else {
508                    unreachable!();
509                }
510            }
511        }
512    }
513
514    Ok(())
515}
516
517#[cfg(test)]
518mod tests {
519    use super::*;
520    use crate::test_utils::{CHUNK_SIZES, make_chunked_store};
521    use arrow::array::{Int64Array, StringArray};
522    use arrow::compute;
523    use arrow::datatypes::{DataType, Field, Schema};
524    use arrow::record_batch::RecordBatch;
525    use bytes::Bytes;
526    use datafusion_datasource::FileRange;
527    use object_store::memory::InMemory;
528    use object_store::path::Path;
529    use object_store::{ObjectStoreExt, PutPayload};
530
531    /// Helper to create a test schema
532    fn test_schema() -> SchemaRef {
533        Arc::new(Schema::new(vec![
534            Field::new("id", DataType::Int64, true),
535            Field::new("name", DataType::Utf8, true),
536        ]))
537    }
538
539    #[tokio::test]
540    async fn test_json_array_from_file() -> Result<()> {
541        // Test reading JSON array format from a file
542        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]"#;
543
544        let store = Arc::new(InMemory::new());
545        let path = Path::from("test.json");
546        store
547            .put(&path, PutPayload::from_static(json_data.as_bytes()))
548            .await?;
549
550        let opener = JsonOpener::new(
551            1024,
552            test_schema(),
553            FileCompressionType::UNCOMPRESSED,
554            store.clone(),
555            false, // JSON array format
556        );
557
558        let meta = store.head(&path).await?;
559        let file = PartitionedFile::new(path.to_string(), meta.size);
560
561        let stream = opener.open(file)?.await?;
562        let batches: Vec<_> = stream.try_collect().await?;
563
564        assert_eq!(batches.len(), 1);
565        assert_eq!(batches[0].num_rows(), 2);
566
567        Ok(())
568    }
569
570    #[tokio::test]
571    async fn test_json_array_from_stream() -> Result<()> {
572        // Test reading JSON array format from object store stream (simulates S3)
573        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}, {"id": 3, "name": "charlie"}]"#;
574
575        // Use InMemory store which returns Stream payload
576        let store = Arc::new(InMemory::new());
577        let path = Path::from("test_stream.json");
578        store
579            .put(&path, PutPayload::from_static(json_data.as_bytes()))
580            .await?;
581
582        let opener = JsonOpener::new(
583            2, // small batch size to test multiple batches
584            test_schema(),
585            FileCompressionType::UNCOMPRESSED,
586            store.clone(),
587            false, // JSON array format
588        );
589
590        let meta = store.head(&path).await?;
591        let file = PartitionedFile::new(path.to_string(), meta.size);
592
593        let stream = opener.open(file)?.await?;
594        let batches: Vec<_> = stream.try_collect().await?;
595
596        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
597        assert_eq!(total_rows, 3);
598
599        Ok(())
600    }
601
602    #[tokio::test]
603    async fn test_json_array_nested_objects() -> Result<()> {
604        // Test JSON array with nested objects and arrays
605        let schema = Arc::new(Schema::new(vec![
606            Field::new("id", DataType::Int64, true),
607            Field::new("data", DataType::Utf8, true),
608        ]));
609
610        let json_data = r#"[
611            {"id": 1, "data": "{\"nested\": true}"},
612            {"id": 2, "data": "[1, 2, 3]"}
613        ]"#;
614
615        let store = Arc::new(InMemory::new());
616        let path = Path::from("nested.json");
617        store
618            .put(&path, PutPayload::from_static(json_data.as_bytes()))
619            .await?;
620
621        let opener = JsonOpener::new(
622            1024,
623            schema,
624            FileCompressionType::UNCOMPRESSED,
625            store.clone(),
626            false,
627        );
628
629        let meta = store.head(&path).await?;
630        let file = PartitionedFile::new(path.to_string(), meta.size);
631
632        let stream = opener.open(file)?.await?;
633        let batches: Vec<_> = stream.try_collect().await?;
634
635        assert_eq!(batches[0].num_rows(), 2);
636
637        Ok(())
638    }
639
640    #[tokio::test]
641    async fn test_json_array_empty() -> Result<()> {
642        // Test empty JSON array
643        let json_data = "[]";
644
645        let store = Arc::new(InMemory::new());
646        let path = Path::from("empty.json");
647        store
648            .put(&path, PutPayload::from_static(json_data.as_bytes()))
649            .await?;
650
651        let opener = JsonOpener::new(
652            1024,
653            test_schema(),
654            FileCompressionType::UNCOMPRESSED,
655            store.clone(),
656            false,
657        );
658
659        let meta = store.head(&path).await?;
660        let file = PartitionedFile::new(path.to_string(), meta.size);
661
662        let stream = opener.open(file)?.await?;
663        let batches: Vec<_> = stream.try_collect().await?;
664
665        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
666        assert_eq!(total_rows, 0);
667
668        Ok(())
669    }
670
671    #[tokio::test]
672    async fn test_json_array_range_not_supported() {
673        // Test that range-based scanning returns error for JSON array format
674        let store = Arc::new(InMemory::new());
675        let path = Path::from("test.json");
676        store
677            .put(&path, PutPayload::from_static(b"[]"))
678            .await
679            .unwrap();
680
681        let opener = JsonOpener::new(
682            1024,
683            test_schema(),
684            FileCompressionType::UNCOMPRESSED,
685            store.clone(),
686            false, // JSON array format
687        );
688
689        let meta = store.head(&path).await.unwrap();
690        let mut file = PartitionedFile::new(path.to_string(), meta.size);
691        file.range = Some(FileRange { start: 0, end: 10 });
692
693        let result = opener.open(file);
694        match result {
695            Ok(_) => panic!("Expected error for range-based JSON array scanning"),
696            Err(e) => {
697                assert!(
698                    e.to_string().contains("does not support range-based"),
699                    "Unexpected error message: {e}"
700                );
701            }
702        }
703    }
704
705    #[tokio::test]
706    async fn test_ndjson_still_works() -> Result<()> {
707        // Ensure NDJSON format still works correctly
708        let json_data =
709            "{\"id\": 1, \"name\": \"alice\"}\n{\"id\": 2, \"name\": \"bob\"}\n";
710
711        let store = Arc::new(InMemory::new());
712        let path = Path::from("test.ndjson");
713        store
714            .put(&path, PutPayload::from_static(json_data.as_bytes()))
715            .await?;
716
717        let opener = JsonOpener::new(
718            1024,
719            test_schema(),
720            FileCompressionType::UNCOMPRESSED,
721            store.clone(),
722            true, // NDJSON format
723        );
724
725        let meta = store.head(&path).await?;
726        let file = PartitionedFile::new(path.to_string(), meta.size);
727
728        let stream = opener.open(file)?.await?;
729        let batches: Vec<_> = stream.try_collect().await?;
730
731        assert_eq!(batches.len(), 1);
732        assert_eq!(batches[0].num_rows(), 2);
733
734        Ok(())
735    }
736
737    #[tokio::test]
738    async fn test_json_array_large_file() -> Result<()> {
739        // Test with a larger JSON array to verify streaming works
740        let mut json_data = String::from("[");
741        for i in 0..1000 {
742            if i > 0 {
743                json_data.push(',');
744            }
745            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
746        }
747        json_data.push(']');
748
749        let store = Arc::new(InMemory::new());
750        let path = Path::from("large.json");
751        store
752            .put(&path, PutPayload::from(Bytes::from(json_data)))
753            .await?;
754
755        let opener = JsonOpener::new(
756            100, // batch size of 100
757            test_schema(),
758            FileCompressionType::UNCOMPRESSED,
759            store.clone(),
760            false,
761        );
762
763        let meta = store.head(&path).await?;
764        let file = PartitionedFile::new(path.to_string(), meta.size);
765
766        let stream = opener.open(file)?.await?;
767        let batches: Vec<_> = stream.try_collect().await?;
768
769        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
770        assert_eq!(total_rows, 1000);
771
772        // Should have multiple batches due to batch_size=100
773        assert!(batches.len() >= 10);
774
775        Ok(())
776    }
777
778    #[tokio::test]
779    async fn test_json_array_stream_cancellation() -> Result<()> {
780        // Test that cancellation works correctly (tasks are aborted when stream is dropped)
781        let mut json_data = String::from("[");
782        for i in 0..10000 {
783            if i > 0 {
784                json_data.push(',');
785            }
786            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
787        }
788        json_data.push(']');
789
790        let store = Arc::new(InMemory::new());
791        let path = Path::from("cancel_test.json");
792        store
793            .put(&path, PutPayload::from(Bytes::from(json_data)))
794            .await?;
795
796        let opener = JsonOpener::new(
797            10, // small batch size
798            test_schema(),
799            FileCompressionType::UNCOMPRESSED,
800            store.clone(),
801            false,
802        );
803
804        let meta = store.head(&path).await?;
805        let file = PartitionedFile::new(path.to_string(), meta.size);
806
807        let mut stream = opener.open(file)?.await?;
808
809        // Read only first batch, then drop the stream (simulating cancellation)
810        let first_batch = stream.next().await;
811        assert!(first_batch.is_some());
812
813        // Drop the stream - this should abort the spawned tasks via SpawnedTask's Drop
814        drop(stream);
815
816        // Give tasks time to be aborted
817        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
818
819        // If we reach here without hanging, cancellation worked
820        Ok(())
821    }
822
823    fn get_partition_splits() -> Vec<usize> {
824        vec![1usize, 2, 3, 5, 7, 10]
825    }
826
827    /// Opens each byte-range partition of `path` in `store` and collects all
828    /// record batches produced across every partition.
829    async fn collect_partitioned_batches(
830        store: Arc<dyn ObjectStore>,
831        path: &Path,
832        file_size: u64,
833        num_partitions: usize,
834    ) -> Result<Vec<RecordBatch>> {
835        let mut all_batches = Vec::new();
836        for p in 0..num_partitions {
837            let start = (p as u64 * file_size) / num_partitions as u64;
838            let end = ((p as u64 + 1) * file_size) / num_partitions as u64;
839
840            let meta = store.head(path).await?;
841            let mut file = PartitionedFile::new(path.to_string(), meta.size);
842            file.range = Some(FileRange {
843                start: start as i64,
844                end: end as i64,
845            });
846
847            let opener = JsonOpener::new(
848                1024,
849                test_schema(),
850                FileCompressionType::UNCOMPRESSED,
851                Arc::clone(&store),
852                true,
853            );
854
855            let stream = opener.open(file)?.await?;
856            let batches: Vec<_> = stream.try_collect().await?;
857            all_batches.extend(batches);
858        }
859        Ok(all_batches)
860    }
861
862    /// Concatenates `batches` and returns a single batch sorted ascending by
863    /// the first (id) column.
864    fn concat_and_sort_by_id(batches: &[RecordBatch]) -> Result<RecordBatch> {
865        let schema = test_schema();
866        let combined = compute::concat_batches(&schema, batches)?;
867        let indices = compute::sort_to_indices(combined.column(0), None, None)?;
868        let sorted_cols: Vec<_> = combined
869            .columns()
870            .iter()
871            .map(|col| compute::take(col.as_ref(), &indices, None))
872            .collect::<std::result::Result<_, _>>()?;
873        Ok(RecordBatch::try_new(schema, sorted_cols)?)
874    }
875
876    #[tokio::test]
877    async fn test_ndjson_partitioned() -> Result<()> {
878        // Build an NDJSON file with a known number of rows.
879        let num_rows: usize = 20;
880        let mut ndjson = String::new();
881        for i in 0..num_rows {
882            ndjson.push_str(&format!("{{\"id\": {i}, \"name\": \"user{i}\"}}\n"));
883        }
884        let ndjson_bytes = Bytes::from(ndjson);
885        let file_size = ndjson_bytes.len() as u64;
886
887        for &cs in CHUNK_SIZES {
888            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;
889
890            for num_partitions in get_partition_splits() {
891                let batches = collect_partitioned_batches(
892                    Arc::clone(&store),
893                    &path,
894                    file_size,
895                    num_partitions,
896                )
897                .await?;
898
899                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
900                assert_eq!(
901                    total, num_rows,
902                    "Expected {num_rows} rows with {num_partitions} partitions"
903                );
904
905                let result = concat_and_sort_by_id(&batches)?;
906                let ids = result
907                    .column(0)
908                    .as_any()
909                    .downcast_ref::<Int64Array>()
910                    .unwrap();
911                let names = result
912                    .column(1)
913                    .as_any()
914                    .downcast_ref::<StringArray>()
915                    .unwrap();
916                for i in 0..num_rows {
917                    assert_eq!(
918                        ids.value(i),
919                        i as i64,
920                        "id mismatch at row {i} with {num_partitions} partitions"
921                    );
922                    assert_eq!(
923                        names.value(i),
924                        format!("user{i}"),
925                        "name mismatch at row {i} with {num_partitions} partitions"
926                    );
927                }
928            }
929        }
930
931        Ok(())
932    }
933
934    #[tokio::test]
935    async fn test_ndjson_partitioned_uneven_lines() -> Result<()> {
936        // Lines of deliberately varying lengths so byte-range boundaries are
937        // more likely to land in the middle of a line.
938        let rows: &[(&str, &str)] = &[
939            ("1", "alice"),
940            ("2", "bob-with-a-longer-name"),
941            ("3", "charlie"),
942            ("4", "x"),
943            ("5", "diana-has-an-even-longer-name-here"),
944            ("6", "ed"),
945            ("7", "francesca"),
946            ("8", "g"),
947            ("9", "hector-the-magnificent"),
948            ("10", "isabella"),
949        ];
950        let num_rows = rows.len();
951
952        let mut ndjson = String::new();
953        for (id, name) in rows {
954            ndjson.push_str(&format!("{{\"id\": {id}, \"name\": \"{name}\"}}\n"));
955        }
956        let ndjson_bytes = Bytes::from(ndjson);
957        let file_size = ndjson_bytes.len() as u64;
958
959        for &cs in CHUNK_SIZES {
960            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;
961
962            for num_partitions in get_partition_splits() {
963                let batches = collect_partitioned_batches(
964                    Arc::clone(&store),
965                    &path,
966                    file_size,
967                    num_partitions,
968                )
969                .await?;
970
971                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
972                assert_eq!(
973                    total, num_rows,
974                    "Expected {num_rows} rows with {num_partitions} partitions"
975                );
976
977                let result = concat_and_sort_by_id(&batches)?;
978                let ids = result
979                    .column(0)
980                    .as_any()
981                    .downcast_ref::<Int64Array>()
982                    .unwrap();
983                let names = result
984                    .column(1)
985                    .as_any()
986                    .downcast_ref::<StringArray>()
987                    .unwrap();
988                for (i, (expected_id, expected_name)) in rows.iter().enumerate() {
989                    assert_eq!(
990                        ids.value(i),
991                        expected_id.parse::<i64>().unwrap(),
992                        "id mismatch at row {i} with {num_partitions} partitions"
993                    );
994                    assert_eq!(
995                        names.value(i),
996                        *expected_name,
997                        "name mismatch at row {i} with {num_partitions} partitions"
998                    );
999                }
1000            }
1001        }
1002
1003        Ok(())
1004    }
1005
1006    #[tokio::test]
1007    async fn test_ndjson_partitioned_single_entry() -> Result<()> {
1008        // A single JSON object with no trailing newline. No matter how many
1009        // byte-range partitions the file is split into, exactly one row must
1010        // be produced in total.
1011        let ndjson = r#"{"id": 1, "name": "alice"}"#;
1012        let ndjson_bytes = Bytes::from(ndjson);
1013        let file_size = ndjson_bytes.len() as u64;
1014
1015        for &cs in CHUNK_SIZES {
1016            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;
1017
1018            for num_partitions in get_partition_splits() {
1019                let batches = collect_partitioned_batches(
1020                    Arc::clone(&store),
1021                    &path,
1022                    file_size,
1023                    num_partitions,
1024                )
1025                .await?;
1026
1027                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
1028                assert_eq!(
1029                    total, 1,
1030                    "Expected exactly 1 row with {num_partitions} partitions"
1031                );
1032
1033                let result = concat_and_sort_by_id(&batches)?;
1034                let ids = result
1035                    .column(0)
1036                    .as_any()
1037                    .downcast_ref::<Int64Array>()
1038                    .unwrap();
1039                let names = result
1040                    .column(1)
1041                    .as_any()
1042                    .downcast_ref::<StringArray>()
1043                    .unwrap();
1044                assert_eq!(ids.value(0), 1);
1045                assert_eq!(names.value(0), "alice");
1046            }
1047        }
1048
1049        Ok(())
1050    }
1051}