datafusion_datasource_json/file_format.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;

use crate::source::JsonSource;

use arrow::array::RecordBatch;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{
    not_impl_err, GetExt, Result, Statistics, DEFAULT_JSON_EXTENSION,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::decoder::Decoder;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{
    FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
use datafusion_datasource::write::BatchSerializer;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

/// Factory struct used to create [JsonFormat]
#[derive(Default)]
pub struct JsonFormatFactory {
    /// The options carried by the format factory
    pub options: Option<JsonOptions>,
}

impl JsonFormatFactory {
    /// Creates an instance of [JsonFormatFactory]
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates an instance of [JsonFormatFactory] with customized default options
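    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming the crate is consumed as
    /// `datafusion_datasource_json` and that [JsonOptions] implements `Default`:
    ///
    /// ```
    /// use datafusion_common::config::JsonOptions;
    /// use datafusion_datasource_json::file_format::JsonFormatFactory;
    ///
    /// // Seed the factory with the library's default JSON options.
    /// let factory = JsonFormatFactory::new_with_options(JsonOptions::default());
    /// ```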
    pub fn new_with_options(options: JsonOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}

impl FileFormatFactory for JsonFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
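        // Resolve the effective options: fall back to the session's default
        // table options when the factory carries none, otherwise start from
        // the carried options; per-call string overrides are applied on top.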
        let json_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::JSON);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.json
            }
            Some(json_options) => {
                let mut json_options = json_options.clone();
                for (k, v) in format_options {
                    json_options.set(k, v)?;
                }
                json_options
            }
        };

        Ok(Arc::new(JsonFormat::default().with_options(json_options)))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(JsonFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for JsonFormatFactory {
    fn get_ext(&self) -> String {
        // Removes the dot, i.e. ".json" -> "json"
        DEFAULT_JSON_EXTENSION[1..].to_string()
    }
}

impl Debug for JsonFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonFormatFactory")
            .field("options", &self.options)
            .finish()
    }
}

/// Newline-delimited JSON `FileFormat` implementation.
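///
/// # Example
///
/// A minimal usage sketch, assuming the crate is consumed as
/// `datafusion_datasource_json`:
///
/// ```
/// use datafusion_datasource_json::file_format::JsonFormat;
///
/// // Cap schema inference at 100 records.
/// let format = JsonFormat::default().with_schema_infer_max_rec(100);
/// ```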
#[derive(Debug, Default)]
pub struct JsonFormat {
    options: JsonOptions,
}

impl JsonFormat {
    /// Set JSON options
    pub fn with_options(mut self, options: JsonOptions) -> Self {
        self.options = options;
        self
    }

    /// Retrieve JSON options
    pub fn options(&self) -> &JsonOptions {
        &self.options
    }

    /// Set a limit on the number of records scanned when inferring the schema
    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
        self.options.schema_infer_max_rec = Some(max_rec);
        self
    }

    /// Set the [`FileCompressionType`] for JSON files
    /// - defaults to `FileCompressionType::UNCOMPRESSED`
    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.options.compression = file_compression_type.into();
        self
    }
}

#[async_trait]
impl FileFormat for JsonFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        JsonFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        Ok(format!("{}{}", ext, file_compression_type.get_ext()))
    }

    async fn infer_schema(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas = Vec::new();
        let mut records_to_read = self
            .options
            .schema_infer_max_rec
            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
        let file_compression_type = FileCompressionType::from(self.options.compression);
        for object in objects {
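            // Shared inference budget across all files: the closure returns
            // `true` (keep reading) while records remain, decrementing the
            // budget for each JSON value examined.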
            let mut take_while = || {
                let should_take = records_to_read > 0;
                if should_take {
                    records_to_read -= 1;
                }
                should_take
            };

            let r = store.as_ref().get(&object.location).await?;
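            // Local files are decompressed and read directly; remote streams
            // are buffered fully into memory before inference.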
            let schema = match r.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(file, _) => {
                    let decoder = file_compression_type.convert_read(file)?;
                    let mut reader = BufReader::new(decoder);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
                GetResultPayload::Stream(_) => {
                    let data = r.bytes().await?;
                    let decoder = file_compression_type.convert_read(data.reader())?;
                    let mut reader = BufReader::new(decoder);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
            };

            schemas.push(schema);
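            // Stop visiting further files once the record budget is exhausted.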
            if records_to_read == 0 {
                break;
            }
        }

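        // Merge the per-file schemas into one; fields that share a name across
        // files must have compatible types or `try_merge` returns an error.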
        let schema = Schema::try_merge(schemas)?;
        Ok(Arc::new(schema))
    }

    async fn infer_stats(
        &self,
        _state: &dyn Session,
        _store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
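        // Line-delimited JSON carries no embedded metadata, so statistics
        // cannot be derived without scanning the data.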
        Ok(Statistics::new_unknown(&table_schema))
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let source = Arc::new(JsonSource::new());
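        // Propagate the format's compression setting into the scan config so
        // file streams are decompressed before JSON decoding.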
        let conf = FileScanConfigBuilder::from(conf)
            .with_file_compression_type(FileCompressionType::from(
                self.options.compression,
            ))
            .with_source(source)
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
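        // Only `InsertOp::Append` is supported; overwrite semantics are not
        // implemented for the JSON sink.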
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Json");
        }

        let writer_options = JsonWriterOptions::try_from(&self.options)?;

        let sink = Arc::new(JsonSink::new(conf, writer_options));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(JsonSource::default())
    }
}

/// Serializes [`RecordBatch`]es into a stream of newline-delimited JSON objects
pub struct JsonSerializer {}

impl JsonSerializer {
    /// Constructor for the JsonSerializer object
    pub fn new() -> Self {
        Self {}
    }
}

impl Default for JsonSerializer {
    fn default() -> Self {
        Self::new()
    }
}

impl BatchSerializer for JsonSerializer {
    fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
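        // Pre-allocate a modest buffer; `LineDelimitedWriter` writes one JSON
        // object per line for every row in the batch.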
        let mut buffer = Vec::with_capacity(4096);
        let mut writer = json::LineDelimitedWriter::new(&mut buffer);
        writer.write(&batch)?;
        Ok(Bytes::from(buffer))
    }
}

/// Implements [`DataSink`] for writing to a JSON file.
pub struct JsonSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Writer options for the underlying JSON writer
    writer_options: JsonWriterOptions,
}

impl Debug for JsonSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonSink").finish()
    }
}

impl DisplayAs for JsonSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "JsonSink(file_groups=")?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format: json")?;
                write!(f, "file={}", &self.config.original_url)
            }
        }
    }
}

impl JsonSink {
    /// Create from config.
    pub fn new(config: FileSinkConfig, writer_options: JsonWriterOptions) -> Self {
        Self {
            config,
            writer_options,
        }
    }

    /// Retrieve the writer options
    pub fn writer_options(&self) -> &JsonWriterOptions {
        &self.writer_options
    }
}

#[async_trait]
impl FileSink for JsonSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
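        // Delegate to the shared write orchestration, which spawns a
        // serializer/writer task per demuxed output file, joins them, and
        // returns the total row count written.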
        let serializer = Arc::new(JsonSerializer::new()) as _;
        spawn_writer_tasks_and_join(
            context,
            serializer,
            self.writer_options.compression.into(),
            object_store,
            demux_task,
            file_stream_rx,
        )
        .await
    }
}

#[async_trait]
impl DataSink for JsonSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

/// Adapts arrow's streaming [`json::reader::Decoder`] to the datasource
/// [`Decoder`] interface.
#[derive(Debug)]
pub struct JsonDecoder {
    inner: json::reader::Decoder,
}

impl JsonDecoder {
    /// Wrap an arrow JSON [`json::reader::Decoder`]
    pub fn new(decoder: json::reader::Decoder) -> Self {
        Self { inner: decoder }
    }
}

impl Decoder for JsonDecoder {
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.inner.decode(buf)
    }

    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        self.inner.flush()
    }

    fn can_flush_early(&self) -> bool {
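        // The wrapped arrow decoder buffers rows internally; this
        // implementation opts out of early flushing.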
        false
    }
}