datafusion_datasource_json/source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading line-delimited JSON files

use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::JsonDecoder;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
    as_file_source, calculate_range, ListingTableUrl, RangeCalculation,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_common::Statistics;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl JsonOpener {
    /// Returns a new [`JsonOpener`]
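    ///
    /// A minimal construction sketch (the schema, batch size, and in-memory
    /// object store are illustrative assumptions, not requirements of this API):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_datasource::file_compression_type::FileCompressionType;
    /// use object_store::memory::InMemory;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    /// let opener = JsonOpener::new(
    ///     8192,                              // rows per decoded RecordBatch
    ///     schema,                            // projected schema to decode into
    ///     FileCompressionType::UNCOMPRESSED, // no decompression step
    ///     Arc::new(InMemory::new()),         // any ObjectStore implementation
    /// );
    /// ```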
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
        }
    }
}

/// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]
#[derive(Clone, Default)]
pub struct JsonSource {
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl JsonSource {
    /// Initialize a JsonSource with default values
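    ///
    /// A minimal configuration sketch using the builder-style [`FileSource`]
    /// methods implemented below (the batch size value is an assumption):
    ///
    /// ```ignore
    /// let source: Arc<dyn FileSource> = JsonSource::new().with_batch_size(8192);
    /// ```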
    pub fn new() -> Self {
        Self::default()
    }
}

impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(source: JsonSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("batch size must be set before creating the opener"),
            projected_schema: base_config.projected_file_schema(),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set before calling statistics"))
    }

    fn file_type(&self) -> &str {
        "json"
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

impl FileOpener for JsonOpener {
    /// Open a partitioned NDJSON file.
    ///
    /// If `file_meta.range` is `None`, the entire file is opened.
    /// Otherwise `file_meta.range` is `Some(FileRange { start, end })`, which corresponds
    /// to the byte range [start, end) within the file.
    ///
    /// Note: `start` or `end` might fall in the middle of a line. In such cases, the
    /// following rules determine which lines to read:
    /// 1. The first line of the partition is the first line whose starting byte offset is >= `start`.
    /// 2. The last line of the partition is the line containing the byte at position `end - 1`.
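    ///
    /// As an illustrative sketch (the file content below is an assumption): for a
    /// file holding the three 8-byte lines `{"a":1}`, `{"a":2}`, and `{"a":3}`,
    /// the range [4, 10) reads only the second line, since the first line starting
    /// at or after offset 4 begins at offset 8, and byte 9 also falls within that
    /// line (offsets 8..16).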
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();

        Ok(Box::pin(async move {
            let calculated_range = calculate_range(&file_meta, &store, None).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match file_meta.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit as u64))?
                        }
                    };

                    let reader = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build(BufReader::new(bytes))?;

                    Ok(futures::stream::iter(reader).boxed())
                }
                GetResultPayload::Stream(s) => {
                    let s = s.map_err(DataFusionError::from);

                    let decoder = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build_decoder()?;
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

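/// Executes `plan` and writes its results to `path` as newline-delimited JSON,
/// producing one `part-{i}.json` object per output partition.
///
/// A minimal usage sketch (the session setup, plan, and output URL here are
/// assumptions, not part of this module):
///
/// ```ignore
/// let task_ctx = session_state.task_ctx();
/// plan_to_json(task_ctx, physical_plan, "file:///tmp/json_out/").await?;
/// ```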
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);

            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    // a non-panic join error means the task was cancelled,
                    // which cannot happen here: the JoinSet is never aborted
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}