// datafusion_datasource_json/file_format.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;

use crate::source::JsonSource;

use arrow::array::RecordBatch;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::json;
use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{
    DEFAULT_JSON_EXTENSION, GetExt, Result, Statistics, not_impl_err,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::TableSchema;
use datafusion_datasource::decoder::Decoder;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{
    DEFAULT_SCHEMA_INFER_MAX_RECORD, FileFormat, FileFormatFactory,
};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::write::BatchSerializer;
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
/// Factory struct used to create [`JsonFormat`]
#[derive(Default)]
pub struct JsonFormatFactory {
    /// The options carried by the format factory
    pub options: Option<JsonOptions>,
}

impl JsonFormatFactory {
    /// Creates an instance of [`JsonFormatFactory`]
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates an instance of [`JsonFormatFactory`] with customized default options
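    ///
    /// # Example
    ///
    /// A minimal sketch; the `JsonOptions` values shown are illustrative:
    /// ```no_run
    /// # use datafusion_common::config::JsonOptions;
    /// # use datafusion_datasource_json::file_format::JsonFormatFactory;
    /// let mut options = JsonOptions::default();
    /// options.schema_infer_max_rec = Some(100);
    /// let factory = JsonFormatFactory::new_with_options(options);
    /// ```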
    pub fn new_with_options(options: JsonOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}

impl FileFormatFactory for JsonFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
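        // Resolve options: start from this factory's own options when set,
        // otherwise from the session's default table options, then apply the
        // per-statement `format_options` on top in both cases.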
        let json_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::JSON);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.json
            }
            Some(json_options) => {
                let mut json_options = json_options.clone();
                for (k, v) in format_options {
                    json_options.set(k, v)?;
                }
                json_options
            }
        };

        Ok(Arc::new(JsonFormat::default().with_options(json_options)))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(JsonFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for JsonFormatFactory {
    fn get_ext(&self) -> String {
        // Removes the dot, i.e. ".json" -> "json"
        DEFAULT_JSON_EXTENSION[1..].to_string()
    }
}

impl Debug for JsonFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonFormatFactory")
            .field("options", &self.options)
            .finish()
    }
}

/// Newline-delimited JSON `FileFormat` implementation.
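///
/// # Example
///
/// A sketch of configuring the format via its builder methods:
/// ```no_run
/// # use datafusion_datasource::file_compression_type::FileCompressionType;
/// # use datafusion_datasource_json::file_format::JsonFormat;
/// let format = JsonFormat::default()
///     .with_schema_infer_max_rec(100)
///     .with_file_compression_type(FileCompressionType::GZIP);
/// ```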
#[derive(Debug, Default)]
pub struct JsonFormat {
    options: JsonOptions,
}

impl JsonFormat {
    /// Set JSON options
    pub fn with_options(mut self, options: JsonOptions) -> Self {
        self.options = options;
        self
    }

    /// Retrieve JSON options
    pub fn options(&self) -> &JsonOptions {
        &self.options
    }

    /// Set the maximum number of records to scan when inferring the schema
    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
        self.options.schema_infer_max_rec = Some(max_rec);
        self
    }

    /// Set the [`FileCompressionType`] of the JSON files
    /// - defaults to `FileCompressionType::UNCOMPRESSED`
    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.options.compression = file_compression_type.into();
        self
    }
}

#[async_trait]
impl FileFormat for JsonFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        JsonFormatFactory::new().get_ext()
    }

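    /// Returns the file extension with the compression suffix appended.
    ///
    /// A sketch of the expected behavior (`json.gz` assumes gzip maps to a
    /// `.gz` suffix):
    /// ```no_run
    /// # use datafusion_datasource::file_compression_type::FileCompressionType;
    /// # use datafusion_datasource::file_format::FileFormat;
    /// # use datafusion_datasource_json::file_format::JsonFormat;
    /// let format = JsonFormat::default();
    /// let ext = format.get_ext_with_compression(&FileCompressionType::GZIP).unwrap();
    /// assert_eq!(ext, "json.gz");
    /// ```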
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        Ok(format!("{}{}", ext, file_compression_type.get_ext()))
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        Some(self.options.compression.into())
    }

    async fn infer_schema(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas = Vec::new();
        let mut records_to_read = self
            .options
            .schema_infer_max_rec
            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
        let file_compression_type = FileCompressionType::from(self.options.compression);
        for object in objects {
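            // `records_to_read` is a shared budget across all files: each
            // JSON value consumed during inference decrements it, and the
            // iterator below stops yielding once it is exhausted.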
            let mut take_while = || {
                let should_take = records_to_read > 0;
                if should_take {
                    records_to_read -= 1;
                }
                should_take
            };

            let r = store.as_ref().get(&object.location).await?;
            let schema = match r.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(file, _) => {
                    let decoder = file_compression_type.convert_read(file)?;
                    let mut reader = BufReader::new(decoder);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
                GetResultPayload::Stream(_) => {
                    let data = r.bytes().await?;
                    let decoder = file_compression_type.convert_read(data.reader())?;
                    let mut reader = BufReader::new(decoder);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
            };

            schemas.push(schema);
            if records_to_read == 0 {
                break;
            }
        }

        let schema = Schema::try_merge(schemas)?;
        Ok(Arc::new(schema))
    }
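
    // A standalone sketch of the same inference path using arrow's API
    // directly, on hypothetical inline data:
    //
    //     let data = b"{\"a\": 1}\n{\"a\": 2, \"b\": \"x\"}\n";
    //     let mut reader = BufReader::new(&data[..]);
    //     let iter = ValueIter::new(&mut reader, None);
    //     let schema = infer_json_schema_from_iterator(iter)?;
    //
    // Fields observed in any line are merged into the resulting schema, just
    // as the per-file schemas above are merged with `Schema::try_merge`.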

    async fn infer_stats(
        &self,
        _state: &dyn Session,
        _store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&table_schema))
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
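        // Propagate the configured compression into the scan config so the
        // resulting `DataSourceExec` decompresses files while reading.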
        let conf = FileScanConfigBuilder::from(conf)
            .with_file_compression_type(FileCompressionType::from(
                self.options.compression,
            ))
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for JSON");
        }

        let writer_options = JsonWriterOptions::try_from(&self.options)?;

        let sink = Arc::new(JsonSink::new(conf, writer_options));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(JsonSource::new(table_schema))
    }
}

impl Default for JsonSerializer {
    fn default() -> Self {
        Self::new()
    }
}

/// Serializes [`RecordBatch`]es into newline-delimited JSON bytes
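///
/// # Example
///
/// A sketch of serializing a small batch (the column values are
/// illustrative):
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow::array::{ArrayRef, Int32Array, RecordBatch};
/// # use datafusion_datasource::write::BatchSerializer;
/// # use datafusion_datasource_json::file_format::JsonSerializer;
/// let batch = RecordBatch::try_from_iter(vec![
///     ("a", Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef),
/// ]).unwrap();
/// let bytes = JsonSerializer::new().serialize(batch, true).unwrap();
/// // `bytes` now holds the two lines {"a":1} and {"a":2}
/// ```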
pub struct JsonSerializer {}

impl JsonSerializer {
    /// Create a new [`JsonSerializer`]
    pub fn new() -> Self {
        Self {}
    }
}

impl BatchSerializer for JsonSerializer {
    fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
        let mut buffer = Vec::with_capacity(4096);
        let mut writer = json::LineDelimitedWriter::new(&mut buffer);
        writer.write(&batch)?;
        Ok(Bytes::from(buffer))
    }
}

/// Implements [`DataSink`] for writing to a JSON file.
pub struct JsonSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Writer options for the underlying JSON writer
    writer_options: JsonWriterOptions,
}

impl Debug for JsonSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonSink").finish()
    }
}

impl DisplayAs for JsonSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "JsonSink(file_groups=")?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format: json")?;
                write!(f, "file={}", &self.config.original_url)
            }
        }
    }
}

impl JsonSink {
    /// Create from config.
    pub fn new(config: FileSinkConfig, writer_options: JsonWriterOptions) -> Self {
        Self {
            config,
            writer_options,
        }
    }

    /// Retrieve the writer options
    pub fn writer_options(&self) -> &JsonWriterOptions {
        &self.writer_options
    }
}

#[async_trait]
impl FileSink for JsonSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
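        // Delegate to the shared write orchestration: a single serializer is
        // reused across the demuxed per-file streams, and the total number of
        // rows written is returned.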
        let serializer = Arc::new(JsonSerializer::new()) as _;
        spawn_writer_tasks_and_join(
            context,
            serializer,
            self.writer_options.compression.into(),
            self.writer_options.compression_level,
            object_store,
            demux_task,
            file_stream_rx,
        )
        .await
    }
}

#[async_trait]
impl DataSink for JsonSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

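/// Newline-delimited JSON [`Decoder`] adapter over [`json::reader::Decoder`].
///
/// # Example
///
/// A sketch of incremental decoding; the schema and input bytes are
/// illustrative:
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use arrow::json::ReaderBuilder;
/// # use datafusion_datasource::decoder::Decoder as _;
/// # use datafusion_datasource_json::file_format::JsonDecoder;
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
/// let inner = ReaderBuilder::new(schema).build_decoder().unwrap();
/// let mut decoder = JsonDecoder::new(inner);
/// decoder.decode(b"{\"a\": 1}\n{\"a\": 2}\n").unwrap();
/// let batch = decoder.flush().unwrap(); // Some(RecordBatch) with two rows
/// ```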
#[derive(Debug)]
pub struct JsonDecoder {
    inner: json::reader::Decoder,
}

impl JsonDecoder {
    /// Create a [`JsonDecoder`] wrapping the given arrow JSON decoder
    pub fn new(decoder: json::reader::Decoder) -> Self {
        Self { inner: decoder }
    }
}

impl Decoder for JsonDecoder {
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.inner.decode(buf)
    }

    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        self.inner.flush()
    }

    fn can_flush_early(&self) -> bool {
        false
    }
}