// datafusion_datasource_json/source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution plan for reading JSON files (line-delimited and array formats)
19
20use std::any::Any;
21use std::io::{BufReader, Read, Seek, SeekFrom};
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{Context, Poll};
25
26use crate::file_format::JsonDecoder;
27use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};
28
29use datafusion_common::error::{DataFusionError, Result};
30use datafusion_common_runtime::{JoinSet, SpawnedTask};
31use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
32use datafusion_datasource::file_compression_type::FileCompressionType;
33use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
34use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
35use datafusion_datasource::{
36    ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range,
37};
38use datafusion_physical_plan::projection::ProjectionExprs;
39use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
40
41use arrow::array::RecordBatch;
42use arrow::json::ReaderBuilder;
43use arrow::{datatypes::SchemaRef, json};
44use datafusion_datasource::file::FileSource;
45use datafusion_datasource::file_scan_config::FileScanConfig;
46use datafusion_execution::TaskContext;
47use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
48
49use futures::{Stream, StreamExt, TryStreamExt};
50use object_store::buffered::BufWriter;
51use object_store::{GetOptions, GetResultPayload, ObjectStore};
52use tokio::io::AsyncWriteExt;
53use tokio_stream::wrappers::ReceiverStream;
54
/// Channel buffer size for streaming JSON array processing.
/// With ~128KB average chunk size, 128 chunks ≈ 16MB buffer.
/// Used as the capacity of the byte channel between the async read task
/// and the blocking parse task in the JSON-array streaming path.
const CHANNEL_BUFFER_SIZE: usize = 128;

/// Buffer size for JsonArrayToNdjsonReader (2MB each, 4MB total for input+output)
const JSON_CONVERTER_BUFFER_SIZE: usize = 2 * 1024 * 1024;
61
62// ============================================================================
63// JsonArrayStream - Custom stream wrapper to hold SpawnedTask handles
64// ============================================================================
65
/// A stream wrapper that holds SpawnedTask handles to keep them alive
/// until the stream is fully consumed or dropped.
///
/// This ensures cancel-safety: when the stream is dropped, the tasks
/// are properly aborted via SpawnedTask's Drop implementation.
struct JsonArrayStream {
    /// Receives parsed `RecordBatch`es (or errors) produced by the parse task.
    inner: ReceiverStream<std::result::Result<RecordBatch, arrow::error::ArrowError>>,
    /// Task that reads from object store and sends bytes to channel.
    /// Kept alive until stream is consumed or dropped.
    _read_task: SpawnedTask<()>,
    /// Task that parses JSON and sends RecordBatches.
    /// Kept alive until stream is consumed or dropped.
    _parse_task: SpawnedTask<()>,
}
80
81impl Stream for JsonArrayStream {
82    type Item = std::result::Result<RecordBatch, arrow::error::ArrowError>;
83
84    fn poll_next(
85        mut self: Pin<&mut Self>,
86        cx: &mut Context<'_>,
87    ) -> Poll<Option<Self::Item>> {
88        Pin::new(&mut self.inner).poll_next(cx)
89    }
90
91    fn size_hint(&self) -> (usize, Option<usize>) {
92        self.inner.size_hint()
93    }
94}
95// ============================================================================
96// JsonOpener and JsonSource
97// ============================================================================
98
/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
pub struct JsonOpener {
    /// Maximum number of rows per emitted `RecordBatch`.
    batch_size: usize,
    /// Schema of the columns actually read from the file (post-projection).
    projected_schema: SchemaRef,
    /// Compression applied to the file (e.g. gzip); decoded on read.
    file_compression_type: FileCompressionType,
    /// Store from which file bytes are fetched.
    object_store: Arc<dyn ObjectStore>,
    /// When `true` (default), expects newline-delimited JSON (NDJSON).
    /// When `false`, expects JSON array format `[{...}, {...}]`.
    newline_delimited: bool,
}
109
110impl JsonOpener {
111    /// Returns a [`JsonOpener`]
112    pub fn new(
113        batch_size: usize,
114        projected_schema: SchemaRef,
115        file_compression_type: FileCompressionType,
116        object_store: Arc<dyn ObjectStore>,
117        newline_delimited: bool,
118    ) -> Self {
119        Self {
120            batch_size,
121            projected_schema,
122            file_compression_type,
123            object_store,
124            newline_delimited,
125        }
126    }
127}
128
/// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]
#[derive(Clone)]
pub struct JsonSource {
    /// Full table schema (file columns plus any partition columns).
    table_schema: datafusion_datasource::TableSchema,
    /// Rows per batch; must be populated via `with_batch_size` before
    /// `create_file_opener` is called.
    batch_size: Option<usize>,
    /// Execution metrics shared with the physical plan.
    metrics: ExecutionPlanMetricsSet,
    /// Column/expression projection split between file-level and source-level parts.
    projection: SplitProjection,
    /// When `true` (default), expects newline-delimited JSON (NDJSON).
    /// When `false`, expects JSON array format `[{...}, {...}]`.
    newline_delimited: bool,
}
140
141impl JsonSource {
142    /// Initialize a JsonSource with the provided schema
143    pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self {
144        let table_schema = table_schema.into();
145        Self {
146            projection: SplitProjection::unprojected(&table_schema),
147            table_schema,
148            batch_size: None,
149            metrics: ExecutionPlanMetricsSet::new(),
150            newline_delimited: true,
151        }
152    }
153
154    /// Set whether to read as newline-delimited JSON.
155    ///
156    /// When `true` (default), expects newline-delimited format.
157    /// When `false`, expects JSON array format `[{...}, {...}]`.
158    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
159        self.newline_delimited = newline_delimited;
160        self
161    }
162}
163
/// Allow a `JsonSource` to be used anywhere an `Arc<dyn FileSource>` is expected.
impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(source: JsonSource) -> Self {
        as_file_source(source)
    }
}
169
170impl FileSource for JsonSource {
171    fn create_file_opener(
172        &self,
173        object_store: Arc<dyn ObjectStore>,
174        base_config: &FileScanConfig,
175        _partition: usize,
176    ) -> Result<Arc<dyn FileOpener>> {
177        // Get the projected file schema for JsonOpener
178        let file_schema = self.table_schema.file_schema();
179        let projected_schema =
180            Arc::new(file_schema.project(&self.projection.file_indices)?);
181
182        let mut opener = Arc::new(JsonOpener {
183            batch_size: self
184                .batch_size
185                .expect("Batch size must set before creating opener"),
186            projected_schema,
187            file_compression_type: base_config.file_compression_type,
188            object_store,
189            newline_delimited: self.newline_delimited,
190        }) as Arc<dyn FileOpener>;
191
192        // Wrap with ProjectionOpener
193        opener = ProjectionOpener::try_new(
194            self.projection.clone(),
195            Arc::clone(&opener),
196            self.table_schema.file_schema(),
197        )?;
198
199        Ok(opener)
200    }
201
202    fn as_any(&self) -> &dyn Any {
203        self
204    }
205
206    fn table_schema(&self) -> &datafusion_datasource::TableSchema {
207        &self.table_schema
208    }
209
210    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
211        let mut conf = self.clone();
212        conf.batch_size = Some(batch_size);
213        Arc::new(conf)
214    }
215
216    fn try_pushdown_projection(
217        &self,
218        projection: &ProjectionExprs,
219    ) -> Result<Option<Arc<dyn FileSource>>> {
220        let mut source = self.clone();
221        let new_projection = self.projection.source.try_merge(projection)?;
222        let split_projection =
223            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
224        source.projection = split_projection;
225        Ok(Some(Arc::new(source)))
226    }
227
228    fn projection(&self) -> Option<&ProjectionExprs> {
229        Some(&self.projection.source)
230    }
231
232    fn metrics(&self) -> &ExecutionPlanMetricsSet {
233        &self.metrics
234    }
235
236    fn file_type(&self) -> &str {
237        "json"
238    }
239}
240
impl FileOpener for JsonOpener {
    /// Open a partitioned JSON file.
    ///
    /// If `file_meta.range` is `None`, the entire file is opened.
    /// Else `file_meta.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file.
    ///
    /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
    /// are applied to determine which lines to read:
    /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
    /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
    ///
    /// Note: JSON array format does not support range-based scanning.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        // Capture everything the async block needs by value.
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();
        let newline_delimited = self.newline_delimited;

        // JSON array format requires reading the complete file
        if !newline_delimited && partitioned_file.range.is_some() {
            return Err(DataFusionError::NotImplemented(
                "JSON array format does not support range-based file scanning. \
                 Disable repartition_file_scans or use newline-delimited JSON format."
                    .to_string(),
            ));
        }

        Ok(Box::pin(async move {
            // Resolve the assigned byte range to whole-line boundaries.
            let calculated_range =
                calculate_range(&partitioned_file, &store, None).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    // Range resolved to nothing to read: emit an empty stream.
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    );
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store
                .get_opts(&partitioned_file.object_meta.location, options)
                .await?;

            match result.payload {
                // Local file payload: read synchronously with std::io.
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match partitioned_file.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            // Seek to the resolved range start and cap the
                            // read at the range length.
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit))?
                        }
                    };

                    if newline_delimited {
                        // NDJSON: use BufReader directly
                        let reader = BufReader::new(bytes);
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    } else {
                        // JSON array format: wrap with streaming converter
                        let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
                            bytes,
                            JSON_CONVERTER_BUFFER_SIZE,
                        );
                        let arrow_reader = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build(ndjson_reader)?;

                        Ok(futures::stream::iter(arrow_reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    }
                }
                GetResultPayload::Stream(s) => {
                    if newline_delimited {
                        // Newline-delimited JSON (NDJSON) streaming reader
                        let s = s.map_err(DataFusionError::from);
                        let decoder = ReaderBuilder::new(schema)
                            .with_batch_size(batch_size)
                            .build_decoder()?;
                        let input =
                            file_compression_type.convert_stream(s.boxed())?.fuse();
                        let stream = deserialize_stream(
                            input,
                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
                        );
                        Ok(stream.map_err(Into::into).boxed())
                    } else {
                        // JSON array format: streaming conversion with channel-based byte transfer
                        //
                        // Architecture:
                        // 1. Async task reads from object store stream, decompresses, sends to channel
                        // 2. Blocking task receives bytes, converts JSON array to NDJSON, parses to Arrow
                        // 3. RecordBatches are sent back via another channel
                        //
                        // Memory budget (~32MB):
                        // - sync_channel: CHANNEL_BUFFER_SIZE chunks (~16MB)
                        // - JsonArrayToNdjsonReader: 2 × JSON_CONVERTER_BUFFER_SIZE (~4MB)
                        // - Arrow JsonReader internal buffer (~8MB)
                        // - Miscellaneous (~4MB)

                        let s = s.map_err(DataFusionError::from);
                        let decompressed_stream =
                            file_compression_type.convert_stream(s.boxed())?;

                        // Channel for bytes: async producer -> blocking consumer
                        // Uses tokio::sync::mpsc so the async send never blocks a
                        // tokio worker thread; the consumer calls blocking_recv()
                        // inside spawn_blocking.
                        let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(
                            CHANNEL_BUFFER_SIZE,
                        );

                        // Channel for results: sync producer -> async consumer
                        let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
                        // Separate sender clone so the read task can report
                        // object-store/decompression errors on the result channel.
                        let error_tx = result_tx.clone();

                        // Async task: read from object store stream and send bytes to channel
                        // Store the SpawnedTask to keep it alive until stream is dropped
                        let read_task = SpawnedTask::spawn(async move {
                            tokio::pin!(decompressed_stream);
                            while let Some(chunk) = decompressed_stream.next().await {
                                match chunk {
                                    Ok(bytes) => {
                                        if byte_tx.send(bytes).await.is_err() {
                                            break; // Consumer dropped
                                        }
                                    }
                                    Err(e) => {
                                        let _ = error_tx
                                            .send(Err(
                                                arrow::error::ArrowError::ExternalError(
                                                    Box::new(e),
                                                ),
                                            ))
                                            .await;
                                        break;
                                    }
                                }
                            }
                            // byte_tx dropped here, signals EOF to ChannelReader
                        });

                        // Blocking task: receive bytes from channel and parse JSON
                        // Store the SpawnedTask to keep it alive until stream is dropped
                        let parse_task = SpawnedTask::spawn_blocking(move || {
                            let channel_reader = ChannelReader::new(byte_rx);
                            let mut ndjson_reader =
                                JsonArrayToNdjsonReader::with_capacity(
                                    channel_reader,
                                    JSON_CONVERTER_BUFFER_SIZE,
                                );

                            match ReaderBuilder::new(schema)
                                .with_batch_size(batch_size)
                                .build(&mut ndjson_reader)
                            {
                                Ok(arrow_reader) => {
                                    for batch_result in arrow_reader {
                                        if result_tx.blocking_send(batch_result).is_err()
                                        {
                                            break; // Receiver dropped
                                        }
                                    }
                                }
                                Err(e) => {
                                    let _ = result_tx.blocking_send(Err(e));
                                }
                            }

                            // Validate the JSON array was properly formed
                            if let Err(e) = ndjson_reader.validate_complete() {
                                let _ = result_tx.blocking_send(Err(
                                    arrow::error::ArrowError::JsonError(e.to_string()),
                                ));
                            }
                            // result_tx dropped here, closes the stream
                        });

                        // Wrap in JsonArrayStream to keep tasks alive until stream is consumed
                        let stream = JsonArrayStream {
                            inner: ReceiverStream::new(result_rx),
                            _read_task: read_task,
                            _parse_task: parse_task,
                        };

                        Ok(stream.map(|r| r.map_err(Into::into)).boxed())
                    }
                }
            }
        }))
    }
}
449
/// Execute `plan` and write each output partition to `path` as
/// newline-delimited JSON, one `part-{i}.json` object per partition.
///
/// Partitions are written concurrently via a `JoinSet`; the function
/// returns once every writer has been flushed and shut down. The first
/// error from any partition is propagated; a panic inside a writer task
/// is resumed on the caller's thread.
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    // Writer buffer size comes from the session's execution options.
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        // Execute on the caller's task so setup errors surface immediately;
        // only the resulting stream is moved into the spawned writer.
        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);

            // Serialization buffer reused across batches; the JSON writer
            // takes ownership each iteration and hands it back via into_inner().
            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            // Flush and finalize the multipart upload.
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    // Tasks are never aborted before join_next() resolves,
                    // so a non-panic join error is impossible here.
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}
504
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use bytes::Bytes;
    use datafusion_datasource::FileRange;
    use futures::TryStreamExt;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use object_store::{ObjectStoreExt, PutPayload};

    /// Helper to create a test schema: nullable `id: Int64`, `name: Utf8`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    #[tokio::test]
    async fn test_json_array_from_file() -> Result<()> {
        // Test reading JSON array format from a file
        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]"#;

        let store = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false, // JSON array format
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        // Both rows fit in a single batch at batch_size=1024.
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_from_stream() -> Result<()> {
        // Test reading JSON array format from object store stream (simulates S3)
        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}, {"id": 3, "name": "charlie"}]"#;

        // Use InMemory store which returns Stream payload
        let store = Arc::new(InMemory::new());
        let path = Path::from("test_stream.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            2, // small batch size to test multiple batches
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false, // JSON array format
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        // All three rows must arrive, regardless of how they are batched.
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 3);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_nested_objects() -> Result<()> {
        // Test JSON array with nested objects and arrays
        // (nested values are stored as escaped JSON strings in a Utf8 column)
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("data", DataType::Utf8, true),
        ]));

        let json_data = r#"[
            {"id": 1, "data": "{\"nested\": true}"},
            {"id": 2, "data": "[1, 2, 3]"}
        ]"#;

        let store = Arc::new(InMemory::new());
        let path = Path::from("nested.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            schema,
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_empty() -> Result<()> {
        // Test empty JSON array
        let json_data = "[]";

        let store = Arc::new(InMemory::new());
        let path = Path::from("empty.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        // An empty array yields zero rows (and possibly zero batches).
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_range_not_supported() {
        // Test that range-based scanning returns error for JSON array format
        let store = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store
            .put(&path, PutPayload::from_static(b"[]"))
            .await
            .unwrap();

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false, // JSON array format
        );

        let meta = store.head(&path).await.unwrap();
        let mut file = PartitionedFile::new(path.to_string(), meta.size);
        // Attach a byte range: array format must reject this synchronously.
        file.range = Some(FileRange { start: 0, end: 10 });

        let result = opener.open(file);
        match result {
            Ok(_) => panic!("Expected error for range-based JSON array scanning"),
            Err(e) => {
                assert!(
                    e.to_string().contains("does not support range-based"),
                    "Unexpected error message: {e}"
                );
            }
        }
    }

    #[tokio::test]
    async fn test_ndjson_still_works() -> Result<()> {
        // Ensure NDJSON format still works correctly
        let json_data =
            "{\"id\": 1, \"name\": \"alice\"}\n{\"id\": 2, \"name\": \"bob\"}\n";

        let store = Arc::new(InMemory::new());
        let path = Path::from("test.ndjson");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;

        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            true, // NDJSON format
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_large_file() -> Result<()> {
        // Test with a larger JSON array to verify streaming works
        let mut json_data = String::from("[");
        for i in 0..1000 {
            if i > 0 {
                json_data.push(',');
            }
            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
        }
        json_data.push(']');

        let store = Arc::new(InMemory::new());
        let path = Path::from("large.json");
        store
            .put(&path, PutPayload::from(Bytes::from(json_data)))
            .await?;

        let opener = JsonOpener::new(
            100, // batch size of 100
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 1000);

        // Should have multiple batches due to batch_size=100
        assert!(batches.len() >= 10);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_stream_cancellation() -> Result<()> {
        // Test that cancellation works correctly (tasks are aborted when stream is dropped)
        let mut json_data = String::from("[");
        for i in 0..10000 {
            if i > 0 {
                json_data.push(',');
            }
            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
        }
        json_data.push(']');

        let store = Arc::new(InMemory::new());
        let path = Path::from("cancel_test.json");
        store
            .put(&path, PutPayload::from(Bytes::from(json_data)))
            .await?;

        let opener = JsonOpener::new(
            10, // small batch size
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );

        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);

        let mut stream = opener.open(file)?.await?;

        // Read only first batch, then drop the stream (simulating cancellation)
        let first_batch = stream.next().await;
        assert!(first_batch.is_some());

        // Drop the stream - this should abort the spawned tasks via SpawnedTask's Drop
        drop(stream);

        // Give tasks time to be aborted
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // If we reach here without hanging, cancellation worked
        Ok(())
    }
}