Skip to main content

datafusion_datasource_json/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`JsonFormat`]: Line delimited and array JSON [`FileFormat`] abstractions
19
20use std::collections::HashMap;
21use std::fmt;
22use std::fmt::Debug;
23use std::io::{BufReader, Read};
24use std::sync::Arc;
25
26use crate::source::JsonSource;
27
28use arrow::array::RecordBatch;
29use arrow::datatypes::{Schema, SchemaRef};
30use arrow::error::ArrowError;
31use arrow::json;
32use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
33use bytes::{Buf, Bytes};
34use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
35use datafusion_common::file_options::json_writer::JsonWriterOptions;
36use datafusion_common::{
37    DEFAULT_JSON_EXTENSION, GetExt, Result, Statistics, not_impl_err,
38};
39use datafusion_common_runtime::SpawnedTask;
40use datafusion_datasource::TableSchema;
41use datafusion_datasource::decoder::Decoder;
42use datafusion_datasource::display::FileGroupDisplay;
43use datafusion_datasource::file::FileSource;
44use datafusion_datasource::file_compression_type::FileCompressionType;
45use datafusion_datasource::file_format::{
46    DEFAULT_SCHEMA_INFER_MAX_RECORD, FileFormat, FileFormatFactory,
47};
48use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
49use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
50use datafusion_datasource::sink::{DataSink, DataSinkExec};
51use datafusion_datasource::source::DataSourceExec;
52use datafusion_datasource::write::BatchSerializer;
53use datafusion_datasource::write::demux::DemuxedStreamReceiver;
54use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
55use datafusion_execution::{SendableRecordBatchStream, TaskContext};
56use datafusion_expr::dml::InsertOp;
57use datafusion_physical_expr_common::sort_expr::LexRequirement;
58use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
59use datafusion_session::Session;
60
61use crate::utils::JsonArrayToNdjsonReader;
62use async_trait::async_trait;
63use object_store::{GetResultPayload, ObjectMeta, ObjectStore, ObjectStoreExt};
64
65#[derive(Default)]
66/// Factory struct used to create [JsonFormat]
67pub struct JsonFormatFactory {
68    /// the options carried by format factory
69    pub options: Option<JsonOptions>,
70}
71
72impl JsonFormatFactory {
73    /// Creates an instance of [JsonFormatFactory]
74    pub fn new() -> Self {
75        Self { options: None }
76    }
77
78    /// Creates an instance of [JsonFormatFactory] with customized default options
79    pub fn new_with_options(options: JsonOptions) -> Self {
80        Self {
81            options: Some(options),
82        }
83    }
84}
85
86impl FileFormatFactory for JsonFormatFactory {
87    fn create(
88        &self,
89        state: &dyn Session,
90        format_options: &HashMap<String, String>,
91    ) -> Result<Arc<dyn FileFormat>> {
92        let json_options = match &self.options {
93            None => {
94                let mut table_options = state.default_table_options();
95                table_options.set_config_format(ConfigFileType::JSON);
96                table_options.alter_with_string_hash_map(format_options)?;
97                table_options.json
98            }
99            Some(json_options) => {
100                let mut json_options = json_options.clone();
101                for (k, v) in format_options {
102                    json_options.set(k, v)?;
103                }
104                json_options
105            }
106        };
107
108        Ok(Arc::new(JsonFormat::default().with_options(json_options)))
109    }
110
111    fn default(&self) -> Arc<dyn FileFormat> {
112        Arc::new(JsonFormat::default())
113    }
114}
115
116impl GetExt for JsonFormatFactory {
117    fn get_ext(&self) -> String {
118        // Removes the dot, i.e. ".parquet" -> "parquet"
119        DEFAULT_JSON_EXTENSION[1..].to_string()
120    }
121}
122
123impl Debug for JsonFormatFactory {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("JsonFormatFactory")
126            .field("options", &self.options)
127            .finish()
128    }
129}
130
131/// JSON `FileFormat` implementation supporting both line-delimited and array formats.
132///
133/// # Supported Formats
134///
135/// ## Line-Delimited JSON (default, `newline_delimited = true`)
136/// ```text
137/// {"key1": 1, "key2": "val"}
138/// {"key1": 2, "key2": "vals"}
139/// ```
140///
141/// ## JSON Array Format (`newline_delimited = false`)
142/// ```text
143/// [
144///     {"key1": 1, "key2": "val"},
145///     {"key1": 2, "key2": "vals"}
146/// ]
147/// ```
148///
149/// Note: JSON array format is processed using streaming conversion,
150/// which is memory-efficient even for large files.
151#[derive(Debug, Default)]
152pub struct JsonFormat {
153    options: JsonOptions,
154}
155
156impl JsonFormat {
157    /// Set JSON options
158    pub fn with_options(mut self, options: JsonOptions) -> Self {
159        self.options = options;
160        self
161    }
162
163    /// Retrieve JSON options
164    pub fn options(&self) -> &JsonOptions {
165        &self.options
166    }
167
168    /// Set a limit in terms of records to scan to infer the schema
169    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
170    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
171        self.options.schema_infer_max_rec = Some(max_rec);
172        self
173    }
174
175    /// Set a [`FileCompressionType`] of JSON
176    /// - defaults to `FileCompressionType::UNCOMPRESSED`
177    pub fn with_file_compression_type(
178        mut self,
179        file_compression_type: FileCompressionType,
180    ) -> Self {
181        self.options.compression = file_compression_type.into();
182        self
183    }
184
185    /// Set whether to read as newline-delimited JSON (NDJSON).
186    ///
187    /// When `true` (default), expects newline-delimited format:
188    /// ```text
189    /// {"a": 1}
190    /// {"a": 2}
191    /// ```
192    ///
193    /// When `false`, expects JSON array format:
194    /// ```text
195    /// [{"a": 1}, {"a": 2}]
196    /// ```
197    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
198        self.options.newline_delimited = newline_delimited;
199        self
200    }
201
202    /// Returns whether this format expects newline-delimited JSON.
203    pub fn is_newline_delimited(&self) -> bool {
204        self.options.newline_delimited
205    }
206}
207
208/// Infer schema from JSON array format using streaming conversion.
209///
210/// This function converts JSON array format to NDJSON on-the-fly and uses
211/// arrow-json's schema inference. It properly tracks the number of records
212/// processed for correct `records_to_read` management.
213///
214/// # Returns
215/// A tuple of (Schema, records_consumed) where records_consumed is the
216/// number of records that were processed for schema inference.
217fn infer_schema_from_json_array<R: Read>(
218    reader: R,
219    max_records: usize,
220) -> Result<(Schema, usize)> {
221    let ndjson_reader = JsonArrayToNdjsonReader::new(reader);
222
223    let iter = ValueIter::new(ndjson_reader, None);
224    let mut count = 0;
225
226    let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
227        let should_take = count < max_records;
228        if should_take {
229            count += 1;
230        }
231        should_take
232    }))?;
233
234    Ok((schema, count))
235}
236
237#[async_trait]
238impl FileFormat for JsonFormat {
239    fn get_ext(&self) -> String {
240        JsonFormatFactory::new().get_ext()
241    }
242
243    fn get_ext_with_compression(
244        &self,
245        file_compression_type: &FileCompressionType,
246    ) -> Result<String> {
247        let ext = self.get_ext();
248        Ok(format!("{}{}", ext, file_compression_type.get_ext()))
249    }
250
251    fn compression_type(&self) -> Option<FileCompressionType> {
252        Some(self.options.compression.into())
253    }
254
255    async fn infer_schema(
256        &self,
257        _state: &dyn Session,
258        store: &Arc<dyn ObjectStore>,
259        objects: &[ObjectMeta],
260    ) -> Result<SchemaRef> {
261        let mut schemas = Vec::new();
262        let mut records_to_read = self
263            .options
264            .schema_infer_max_rec
265            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
266        let file_compression_type = FileCompressionType::from(self.options.compression);
267        let newline_delimited = self.options.newline_delimited;
268
269        for object in objects {
270            // Early exit if we've read enough records
271            if records_to_read == 0 {
272                break;
273            }
274
275            let r = store.as_ref().get(&object.location).await?;
276
277            let (schema, records_consumed) = match r.payload {
278                #[cfg(not(target_arch = "wasm32"))]
279                GetResultPayload::File(file, _) => {
280                    let decoder = file_compression_type.convert_read(file)?;
281                    let reader = BufReader::new(decoder);
282
283                    if newline_delimited {
284                        // NDJSON: use ValueIter directly
285                        let iter = ValueIter::new(reader, None);
286                        let mut count = 0;
287                        let schema =
288                            infer_json_schema_from_iterator(iter.take_while(|_| {
289                                let should_take = count < records_to_read;
290                                if should_take {
291                                    count += 1;
292                                }
293                                should_take
294                            }))?;
295                        (schema, count)
296                    } else {
297                        // JSON array format: use streaming converter
298                        infer_schema_from_json_array(reader, records_to_read)?
299                    }
300                }
301                GetResultPayload::Stream(_) => {
302                    let data = r.bytes().await?;
303                    let decoder = file_compression_type.convert_read(data.reader())?;
304                    let reader = BufReader::new(decoder);
305
306                    if newline_delimited {
307                        let iter = ValueIter::new(reader, None);
308                        let mut count = 0;
309                        let schema =
310                            infer_json_schema_from_iterator(iter.take_while(|_| {
311                                let should_take = count < records_to_read;
312                                if should_take {
313                                    count += 1;
314                                }
315                                should_take
316                            }))?;
317                        (schema, count)
318                    } else {
319                        // JSON array format: use streaming converter
320                        infer_schema_from_json_array(reader, records_to_read)?
321                    }
322                }
323            };
324
325            schemas.push(schema);
326            // Correctly decrement records_to_read
327            records_to_read = records_to_read.saturating_sub(records_consumed);
328        }
329
330        let schema = Schema::try_merge(schemas)?;
331        Ok(Arc::new(schema))
332    }
333
334    async fn infer_stats(
335        &self,
336        _state: &dyn Session,
337        _store: &Arc<dyn ObjectStore>,
338        table_schema: SchemaRef,
339        _object: &ObjectMeta,
340    ) -> Result<Statistics> {
341        Ok(Statistics::new_unknown(&table_schema))
342    }
343
344    async fn create_physical_plan(
345        &self,
346        _state: &dyn Session,
347        conf: FileScanConfig,
348    ) -> Result<Arc<dyn ExecutionPlan>> {
349        let conf = FileScanConfigBuilder::from(conf)
350            .with_file_compression_type(FileCompressionType::from(
351                self.options.compression,
352            ))
353            .build();
354        Ok(DataSourceExec::from_data_source(conf))
355    }
356
357    async fn create_writer_physical_plan(
358        &self,
359        input: Arc<dyn ExecutionPlan>,
360        _state: &dyn Session,
361        conf: FileSinkConfig,
362        order_requirements: Option<LexRequirement>,
363    ) -> Result<Arc<dyn ExecutionPlan>> {
364        if conf.insert_op != InsertOp::Append {
365            return not_impl_err!("Overwrites are not implemented yet for Json");
366        }
367
368        let writer_options = JsonWriterOptions::try_from(&self.options)?;
369
370        let sink = Arc::new(JsonSink::new(conf, writer_options));
371
372        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
373    }
374
375    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
376        Arc::new(
377            JsonSource::new(table_schema)
378                .with_newline_delimited(self.options.newline_delimited),
379        )
380    }
381}
382
/// Serializer used to turn record batches into JSON bytes for a stream.
pub struct JsonSerializer {}

impl JsonSerializer {
    /// Creates a new `JsonSerializer`.
    pub fn new() -> Self {
        JsonSerializer {}
    }
}

impl Default for JsonSerializer {
    /// Equivalent to [`JsonSerializer::new`].
    fn default() -> Self {
        JsonSerializer::new()
    }
}
398
399impl BatchSerializer for JsonSerializer {
400    fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
401        let mut buffer = Vec::with_capacity(4096);
402        let mut writer = json::LineDelimitedWriter::new(&mut buffer);
403        writer.write(&batch)?;
404        Ok(Bytes::from(buffer))
405    }
406}
407
408/// Implements [`DataSink`] for writing to a Json file.
409pub struct JsonSink {
410    /// Config options for writing data
411    config: FileSinkConfig,
412    /// Writer options for underlying Json writer
413    writer_options: JsonWriterOptions,
414}
415
416impl Debug for JsonSink {
417    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418        f.debug_struct("JsonSink").finish()
419    }
420}
421
422impl DisplayAs for JsonSink {
423    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424        match t {
425            DisplayFormatType::Default | DisplayFormatType::Verbose => {
426                write!(f, "JsonSink(file_groups=",)?;
427                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
428                write!(f, ")")
429            }
430            DisplayFormatType::TreeRender => {
431                writeln!(f, "format: json")?;
432                write!(f, "file={}", &self.config.original_url)
433            }
434        }
435    }
436}
437
438impl JsonSink {
439    /// Create from config.
440    pub fn new(config: FileSinkConfig, writer_options: JsonWriterOptions) -> Self {
441        Self {
442            config,
443            writer_options,
444        }
445    }
446
447    /// Retrieve the writer options
448    pub fn writer_options(&self) -> &JsonWriterOptions {
449        &self.writer_options
450    }
451}
452
453#[async_trait]
454impl FileSink for JsonSink {
455    fn config(&self) -> &FileSinkConfig {
456        &self.config
457    }
458
459    async fn spawn_writer_tasks_and_join(
460        &self,
461        context: &Arc<TaskContext>,
462        demux_task: SpawnedTask<Result<()>>,
463        file_stream_rx: DemuxedStreamReceiver,
464        object_store: Arc<dyn ObjectStore>,
465    ) -> Result<u64> {
466        let serializer = Arc::new(JsonSerializer::new()) as _;
467        spawn_writer_tasks_and_join(
468            context,
469            serializer,
470            self.writer_options.compression.into(),
471            self.writer_options.compression_level,
472            object_store,
473            demux_task,
474            file_stream_rx,
475        )
476        .await
477    }
478}
479
480#[async_trait]
481impl DataSink for JsonSink {
482    fn schema(&self) -> &SchemaRef {
483        self.config.output_schema()
484    }
485
486    async fn write_all(
487        &self,
488        data: SendableRecordBatchStream,
489        context: &Arc<TaskContext>,
490    ) -> Result<u64> {
491        FileSink::write_all(self, data, context).await
492    }
493}
494
495#[derive(Debug)]
496pub struct JsonDecoder {
497    inner: json::reader::Decoder,
498}
499
500impl JsonDecoder {
501    pub fn new(decoder: json::reader::Decoder) -> Self {
502        Self { inner: decoder }
503    }
504}
505
506impl Decoder for JsonDecoder {
507    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
508        self.inner.decode(buf)
509    }
510
511    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
512        self.inner.flush()
513    }
514
515    fn can_flush_early(&self) -> bool {
516        false
517    }
518}