// datafusion_datasource_json/file_format.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`JsonFormat`]: Line delimited and array JSON [`FileFormat`] abstractions

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::io::{BufRead, BufReader, Read};
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::json;
use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{
    DEFAULT_JSON_EXTENSION, GetExt, Result, Statistics, not_impl_err,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::TableSchema;
use datafusion_datasource::decoder::Decoder;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{
    DEFAULT_SCHEMA_INFER_MAX_RECORD, FileFormat, FileFormatFactory,
};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::write::BatchSerializer;
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore, ObjectStoreExt};

use crate::source::JsonSource;
use crate::utils::JsonArrayToNdjsonReader;

66#[derive(Default)]
67/// Factory struct used to create [JsonFormat]
68pub struct JsonFormatFactory {
69    /// the options carried by format factory
70    pub options: Option<JsonOptions>,
71}
72
73impl JsonFormatFactory {
74    /// Creates an instance of [JsonFormatFactory]
75    pub fn new() -> Self {
76        Self { options: None }
77    }
78
79    /// Creates an instance of [JsonFormatFactory] with customized default options
80    pub fn new_with_options(options: JsonOptions) -> Self {
81        Self {
82            options: Some(options),
83        }
84    }
85}
86
87impl FileFormatFactory for JsonFormatFactory {
88    fn create(
89        &self,
90        state: &dyn Session,
91        format_options: &HashMap<String, String>,
92    ) -> Result<Arc<dyn FileFormat>> {
93        let json_options = match &self.options {
94            None => {
95                let mut table_options = state.default_table_options();
96                table_options.set_config_format(ConfigFileType::JSON);
97                table_options.alter_with_string_hash_map(format_options)?;
98                table_options.json
99            }
100            Some(json_options) => {
101                let mut json_options = json_options.clone();
102                for (k, v) in format_options {
103                    json_options.set(k, v)?;
104                }
105                json_options
106            }
107        };
108
109        Ok(Arc::new(JsonFormat::default().with_options(json_options)))
110    }
111
112    fn default(&self) -> Arc<dyn FileFormat> {
113        Arc::new(JsonFormat::default())
114    }
115
116    fn as_any(&self) -> &dyn Any {
117        self
118    }
119}
120
121impl GetExt for JsonFormatFactory {
122    fn get_ext(&self) -> String {
123        // Removes the dot, i.e. ".parquet" -> "parquet"
124        DEFAULT_JSON_EXTENSION[1..].to_string()
125    }
126}
127
128impl Debug for JsonFormatFactory {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.debug_struct("JsonFormatFactory")
131            .field("options", &self.options)
132            .finish()
133    }
134}
135
136/// JSON `FileFormat` implementation supporting both line-delimited and array formats.
137///
138/// # Supported Formats
139///
140/// ## Line-Delimited JSON (default, `newline_delimited = true`)
141/// ```text
142/// {"key1": 1, "key2": "val"}
143/// {"key1": 2, "key2": "vals"}
144/// ```
145///
146/// ## JSON Array Format (`newline_delimited = false`)
147/// ```text
148/// [
149///     {"key1": 1, "key2": "val"},
150///     {"key1": 2, "key2": "vals"}
151/// ]
152/// ```
153///
154/// Note: JSON array format is processed using streaming conversion,
155/// which is memory-efficient even for large files.
156#[derive(Debug, Default)]
157pub struct JsonFormat {
158    options: JsonOptions,
159}
160
161impl JsonFormat {
162    /// Set JSON options
163    pub fn with_options(mut self, options: JsonOptions) -> Self {
164        self.options = options;
165        self
166    }
167
168    /// Retrieve JSON options
169    pub fn options(&self) -> &JsonOptions {
170        &self.options
171    }
172
173    /// Set a limit in terms of records to scan to infer the schema
174    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
175    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
176        self.options.schema_infer_max_rec = Some(max_rec);
177        self
178    }
179
180    /// Set a [`FileCompressionType`] of JSON
181    /// - defaults to `FileCompressionType::UNCOMPRESSED`
182    pub fn with_file_compression_type(
183        mut self,
184        file_compression_type: FileCompressionType,
185    ) -> Self {
186        self.options.compression = file_compression_type.into();
187        self
188    }
189
190    /// Set whether to read as newline-delimited JSON (NDJSON).
191    ///
192    /// When `true` (default), expects newline-delimited format:
193    /// ```text
194    /// {"a": 1}
195    /// {"a": 2}
196    /// ```
197    ///
198    /// When `false`, expects JSON array format:
199    /// ```text
200    /// [{"a": 1}, {"a": 2}]
201    /// ```
202    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
203        self.options.newline_delimited = newline_delimited;
204        self
205    }
206
207    /// Returns whether this format expects newline-delimited JSON.
208    pub fn is_newline_delimited(&self) -> bool {
209        self.options.newline_delimited
210    }
211}
212
213/// Infer schema from JSON array format using streaming conversion.
214///
215/// This function converts JSON array format to NDJSON on-the-fly and uses
216/// arrow-json's schema inference. It properly tracks the number of records
217/// processed for correct `records_to_read` management.
218///
219/// # Returns
220/// A tuple of (Schema, records_consumed) where records_consumed is the
221/// number of records that were processed for schema inference.
222fn infer_schema_from_json_array<R: Read>(
223    reader: R,
224    max_records: usize,
225) -> Result<(Schema, usize)> {
226    let ndjson_reader = JsonArrayToNdjsonReader::new(reader);
227
228    let iter = ValueIter::new(ndjson_reader, None);
229    let mut count = 0;
230
231    let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
232        let should_take = count < max_records;
233        if should_take {
234            count += 1;
235        }
236        should_take
237    }))?;
238
239    Ok((schema, count))
240}
241
242#[async_trait]
243impl FileFormat for JsonFormat {
244    fn as_any(&self) -> &dyn Any {
245        self
246    }
247
248    fn get_ext(&self) -> String {
249        JsonFormatFactory::new().get_ext()
250    }
251
252    fn get_ext_with_compression(
253        &self,
254        file_compression_type: &FileCompressionType,
255    ) -> Result<String> {
256        let ext = self.get_ext();
257        Ok(format!("{}{}", ext, file_compression_type.get_ext()))
258    }
259
260    fn compression_type(&self) -> Option<FileCompressionType> {
261        Some(self.options.compression.into())
262    }
263
264    async fn infer_schema(
265        &self,
266        _state: &dyn Session,
267        store: &Arc<dyn ObjectStore>,
268        objects: &[ObjectMeta],
269    ) -> Result<SchemaRef> {
270        let mut schemas = Vec::new();
271        let mut records_to_read = self
272            .options
273            .schema_infer_max_rec
274            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
275        let file_compression_type = FileCompressionType::from(self.options.compression);
276        let newline_delimited = self.options.newline_delimited;
277
278        for object in objects {
279            // Early exit if we've read enough records
280            if records_to_read == 0 {
281                break;
282            }
283
284            let r = store.as_ref().get(&object.location).await?;
285
286            let (schema, records_consumed) = match r.payload {
287                #[cfg(not(target_arch = "wasm32"))]
288                GetResultPayload::File(file, _) => {
289                    let decoder = file_compression_type.convert_read(file)?;
290                    let reader = BufReader::new(decoder);
291
292                    if newline_delimited {
293                        // NDJSON: use ValueIter directly
294                        let iter = ValueIter::new(reader, None);
295                        let mut count = 0;
296                        let schema =
297                            infer_json_schema_from_iterator(iter.take_while(|_| {
298                                let should_take = count < records_to_read;
299                                if should_take {
300                                    count += 1;
301                                }
302                                should_take
303                            }))?;
304                        (schema, count)
305                    } else {
306                        // JSON array format: use streaming converter
307                        infer_schema_from_json_array(reader, records_to_read)?
308                    }
309                }
310                GetResultPayload::Stream(_) => {
311                    let data = r.bytes().await?;
312                    let decoder = file_compression_type.convert_read(data.reader())?;
313                    let reader = BufReader::new(decoder);
314
315                    if newline_delimited {
316                        let iter = ValueIter::new(reader, None);
317                        let mut count = 0;
318                        let schema =
319                            infer_json_schema_from_iterator(iter.take_while(|_| {
320                                let should_take = count < records_to_read;
321                                if should_take {
322                                    count += 1;
323                                }
324                                should_take
325                            }))?;
326                        (schema, count)
327                    } else {
328                        // JSON array format: use streaming converter
329                        infer_schema_from_json_array(reader, records_to_read)?
330                    }
331                }
332            };
333
334            schemas.push(schema);
335            // Correctly decrement records_to_read
336            records_to_read = records_to_read.saturating_sub(records_consumed);
337        }
338
339        let schema = Schema::try_merge(schemas)?;
340        Ok(Arc::new(schema))
341    }
342
343    async fn infer_stats(
344        &self,
345        _state: &dyn Session,
346        _store: &Arc<dyn ObjectStore>,
347        table_schema: SchemaRef,
348        _object: &ObjectMeta,
349    ) -> Result<Statistics> {
350        Ok(Statistics::new_unknown(&table_schema))
351    }
352
353    async fn create_physical_plan(
354        &self,
355        _state: &dyn Session,
356        conf: FileScanConfig,
357    ) -> Result<Arc<dyn ExecutionPlan>> {
358        let conf = FileScanConfigBuilder::from(conf)
359            .with_file_compression_type(FileCompressionType::from(
360                self.options.compression,
361            ))
362            .build();
363        Ok(DataSourceExec::from_data_source(conf))
364    }
365
366    async fn create_writer_physical_plan(
367        &self,
368        input: Arc<dyn ExecutionPlan>,
369        _state: &dyn Session,
370        conf: FileSinkConfig,
371        order_requirements: Option<LexRequirement>,
372    ) -> Result<Arc<dyn ExecutionPlan>> {
373        if conf.insert_op != InsertOp::Append {
374            return not_impl_err!("Overwrites are not implemented yet for Json");
375        }
376
377        let writer_options = JsonWriterOptions::try_from(&self.options)?;
378
379        let sink = Arc::new(JsonSink::new(conf, writer_options));
380
381        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
382    }
383
384    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
385        Arc::new(
386            JsonSource::new(table_schema)
387                .with_newline_delimited(self.options.newline_delimited),
388        )
389    }
390}
391
392impl Default for JsonSerializer {
393    fn default() -> Self {
394        Self::new()
395    }
396}
397
398/// Define a struct for serializing Json records to a stream
399pub struct JsonSerializer {}
400
401impl JsonSerializer {
402    /// Constructor for the JsonSerializer object
403    pub fn new() -> Self {
404        Self {}
405    }
406}
407
408impl BatchSerializer for JsonSerializer {
409    fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
410        let mut buffer = Vec::with_capacity(4096);
411        let mut writer = json::LineDelimitedWriter::new(&mut buffer);
412        writer.write(&batch)?;
413        Ok(Bytes::from(buffer))
414    }
415}
416
417/// Implements [`DataSink`] for writing to a Json file.
418pub struct JsonSink {
419    /// Config options for writing data
420    config: FileSinkConfig,
421    /// Writer options for underlying Json writer
422    writer_options: JsonWriterOptions,
423}
424
425impl Debug for JsonSink {
426    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427        f.debug_struct("JsonSink").finish()
428    }
429}
430
431impl DisplayAs for JsonSink {
432    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
433        match t {
434            DisplayFormatType::Default | DisplayFormatType::Verbose => {
435                write!(f, "JsonSink(file_groups=",)?;
436                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
437                write!(f, ")")
438            }
439            DisplayFormatType::TreeRender => {
440                writeln!(f, "format: json")?;
441                write!(f, "file={}", &self.config.original_url)
442            }
443        }
444    }
445}
446
447impl JsonSink {
448    /// Create from config.
449    pub fn new(config: FileSinkConfig, writer_options: JsonWriterOptions) -> Self {
450        Self {
451            config,
452            writer_options,
453        }
454    }
455
456    /// Retrieve the writer options
457    pub fn writer_options(&self) -> &JsonWriterOptions {
458        &self.writer_options
459    }
460}
461
#[async_trait]
impl FileSink for JsonSink {
    /// The sink configuration this writer was created with.
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    /// Runs the shared write orchestration with an NDJSON serializer,
    /// forwarding the demultiplexed per-file streams to writer tasks.
    /// The returned `u64` is the count reported by the orchestration
    /// (presumably total rows written — confirm against its docs).
    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let serializer = Arc::new(JsonSerializer::new()) as _;
        spawn_writer_tasks_and_join(
            context,
            serializer,
            // Compression settings come from the writer options, not the
            // format options, so CREATE/COPY-level settings are honored.
            self.writer_options.compression.into(),
            self.writer_options.compression_level,
            object_store,
            demux_task,
            file_stream_rx,
        )
        .await
    }
}
488
#[async_trait]
impl DataSink for JsonSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the batches this sink accepts (the configured output schema).
    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    /// Streams all batches from `data` into JSON files by delegating to the
    /// [`FileSink`] orchestration.
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}
507
508#[derive(Debug)]
509pub struct JsonDecoder {
510    inner: json::reader::Decoder,
511}
512
513impl JsonDecoder {
514    pub fn new(decoder: json::reader::Decoder) -> Self {
515        Self { inner: decoder }
516    }
517}
518
519impl Decoder for JsonDecoder {
520    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
521        self.inner.decode(buf)
522    }
523
524    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
525        self.inner.flush()
526    }
527
528    fn can_flush_early(&self) -> bool {
529        false
530    }
531}