datafusion_datasource_json/source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading line-delimited JSON files

use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::JsonDecoder;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_datasource::{
    ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range,
};
use datafusion_physical_plan::projection::ProjectionExprs;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl JsonOpener {
    /// Returns a [`JsonOpener`]
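    ///
    /// # Example
    ///
    /// A minimal construction sketch. The in-memory store and single-column
    /// schema below are illustrative assumptions, not fixtures from this crate:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_datasource::file_compression_type::FileCompressionType;
    /// use object_store::memory::InMemory;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    /// let opener = JsonOpener::new(
    ///     8192,                              // rows per output batch
    ///     schema,                            // schema after projection
    ///     FileCompressionType::UNCOMPRESSED, // or a compressed variant
    ///     Arc::new(InMemory::new()),         // any ObjectStore implementation
    /// );
    /// ```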
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
        }
    }
}

/// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]
#[derive(Clone)]
pub struct JsonSource {
    table_schema: datafusion_datasource::TableSchema,
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projection: SplitProjection,
}

impl JsonSource {
    /// Initialize a JsonSource with the provided schema
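    ///
    /// # Example
    ///
    /// A hedged sketch; it assumes the `Into<TableSchema>` conversion required
    /// by the signature is available for a plain Arrow schema:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    /// let source = JsonSource::new(schema); // batch size is set later via with_batch_size
    /// ```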
    pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
            batch_size: None,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }
}

impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(source: JsonSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        // Get the projected file schema for JsonOpener
        let file_schema = self.table_schema.file_schema();
        let projected_schema =
            Arc::new(file_schema.project(&self.projection.file_indices)?);

        let mut opener = Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating opener"),
            projected_schema,
            file_compression_type: base_config.file_compression_type,
            object_store,
        }) as Arc<dyn FileOpener>;

        // Wrap with ProjectionOpener
        opener = ProjectionOpener::try_new(
            self.projection.clone(),
            Arc::clone(&opener),
            self.table_schema.file_schema(),
        )?;

        Ok(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_schema(&self) -> &datafusion_datasource::TableSchema {
        &self.table_schema
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
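        // Merge the incoming projection into the existing one, then re-split
        // the result into file-column indices plus any remaining expressions.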
        let mut source = self.clone();
        let new_projection = self.projection.source.try_merge(projection)?;
        let split_projection =
            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
        source.projection = split_projection;
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "json"
    }
}

impl FileOpener for JsonOpener {
    /// Open a partitioned NDJSON file.
    ///
    /// If `partitioned_file.range` is `None`, the entire file is opened.
    /// Otherwise, `partitioned_file.range` is `Some(FileRange { start, end })`, which corresponds to the byte range [start, end) within the file.
    ///
    /// Note: `start` or `end` might fall in the middle of a line. In such cases, the following rules
    /// are applied to determine which lines to read:
    /// 1. The first line of the partition is the line whose first character has an index >= `start`.
    /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
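    ///
    /// For example, given the file contents `{"a":1}\n{"a":2}\n` and the range
    /// [3, 10): rule 1 skips the first line (its first character sits at index 0,
    /// before `start`), and rule 2 selects the second line, which contains byte 9,
    /// so only `{"a":2}` is read. (Illustrative offsets, not taken from a test.)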
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();

        Ok(Box::pin(async move {
            let calculated_range =
                calculate_range(&partitioned_file, &store, None).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
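                // `calculate_range` resolved this partition's byte range to
                // nothing readable, so return an empty stream without
                // fetching the object.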
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    );
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store
                .get_opts(&partitioned_file.object_meta.location, options)
                .await?;

            match result.payload {
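                // Local files can be read (and decompressed) synchronously
                // through std::io; seek/take narrow the read to the byte
                // range resolved above.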
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match partitioned_file.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit as u64))?
                        }
                    };

                    let reader = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build(BufReader::new(bytes))?;

                    Ok(futures::stream::iter(reader)
                        .map(|r| r.map_err(Into::into))
                        .boxed())
                }
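                // Remote objects arrive as an async byte stream; decode them
                // incrementally so record batches are produced without
                // buffering the whole object in memory.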
                GetResultPayload::Stream(s) => {
                    let s = s.map_err(DataFusionError::from);

                    let decoder = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build_decoder()?;
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    let stream = deserialize_stream(
                        input,
                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
                    );
                    Ok(stream.map_err(Into::into).boxed())
                }
            }
        }))
    }
}

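/// Executes `plan` and streams each output partition to `path` as
/// newline-delimited JSON, writing one `part-{i}.json` object per partition.
///
/// # Example
///
/// A hedged sketch of typical usage; `ctx` (a `SessionContext` with a
/// registered table `t`) and the output path are assumptions for
/// illustration:
///
/// ```ignore
/// let df = ctx.sql("SELECT * FROM t").await?;
/// let plan = df.create_physical_plan().await?;
/// plan_to_json(ctx.task_ctx(), plan, "/tmp/out").await?;
/// ```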
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);

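            // Reuse one scratch buffer across batches: encode each batch as
            // line-delimited JSON into it, flush to the object-store writer,
            // then clear it for the next batch.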
            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
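                // Tasks in this JoinSet are never aborted, so a join error
                // can only be a propagated panic.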
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}