datafusion_datasource_json/
source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading line-delimited JSON files

use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::JsonDecoder;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{calculate_range, ListingTableUrl, RangeCalculation};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_common::{Constraints, Statistics};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};

use datafusion_datasource::file_groups::FileGroup;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
/// Execution plan for scanning an NdJson data source
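///
/// # Example
///
/// A minimal migration sketch, per the deprecation note below (hypothetical
/// `scan_config`; this mirrors how the struct wraps [`DataSourceExec`]
/// internally):
/// ```ignore
/// let config = scan_config.with_source(Arc::new(JsonSource::default()));
/// let exec = DataSourceExec::new(Arc::new(config));
/// ```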
#[derive(Debug, Clone)]
#[deprecated(since = "46.0.0", note = "use DataSourceExec instead")]
pub struct NdJsonExec {
    inner: DataSourceExec,
    base_config: FileScanConfig,
    file_compression_type: FileCompressionType,
}

#[allow(unused, deprecated)]
impl NdJsonExec {
    /// Create a new JSON reader execution plan given the base configuration
    pub fn new(
        base_config: FileScanConfig,
        file_compression_type: FileCompressionType,
    ) -> Self {
        let (
            projected_schema,
            projected_constraints,
            projected_statistics,
            projected_output_ordering,
        ) = base_config.project();
        let cache = Self::compute_properties(
            projected_schema,
            &projected_output_ordering,
            projected_constraints,
            &base_config,
        );

        let json = JsonSource::default();
        let base_config = base_config
            .with_file_compression_type(file_compression_type)
            .with_source(Arc::new(json));

        Self {
            inner: DataSourceExec::new(Arc::new(base_config.clone())),
            file_compression_type: base_config.file_compression_type,
            base_config,
        }
    }

    /// Reference to the base configuration
    pub fn base_config(&self) -> &FileScanConfig {
        &self.base_config
    }

    /// Reference to the file compression type
    pub fn file_compression_type(&self) -> &FileCompressionType {
        &self.file_compression_type
    }

    fn file_scan_config(&self) -> FileScanConfig {
        self.inner
            .data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()
            .unwrap()
            .clone()
    }

    fn json_source(&self) -> JsonSource {
        let source = self.file_scan_config();
        source
            .file_source()
            .as_any()
            .downcast_ref::<JsonSource>()
            .unwrap()
            .clone()
    }

    fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
        Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
    }

    /// This function creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        schema: SchemaRef,
        orderings: &[LexOrdering],
        constraints: Constraints,
        file_scan_config: &FileScanConfig,
    ) -> PlanProperties {
        // Equivalence Properties
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)
            .with_constraints(constraints);

        PlanProperties::new(
            eq_properties,
            Self::output_partitioning_helper(file_scan_config), // Output Partitioning
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.base_config.file_groups = file_groups.clone();
        let mut file_source = self.file_scan_config();
        file_source = file_source.with_file_groups(file_groups);
        self.inner = self.inner.with_data_source(Arc::new(file_source));
        self
    }
}

#[allow(unused, deprecated)]
impl DisplayAs for NdJsonExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        self.inner.fmt_as(t, f)
    }
}

#[allow(unused, deprecated)]
impl ExecutionPlan for NdJsonExec {
    fn name(&self) -> &'static str {
        "NdJsonExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.inner.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &datafusion_common::config::ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.repartitioned(target_partitions, config)
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.inner.execute(partition, context)
    }

    fn statistics(&self) -> Result<Statistics> {
        self.inner.statistics()
    }

    fn metrics(&self) -> Option<MetricsSet> {
        self.inner.metrics()
    }

    fn fetch(&self) -> Option<usize> {
        self.inner.fetch()
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        self.inner.with_fetch(limit)
    }
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl JsonOpener {
    /// Returns a new [`JsonOpener`]
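    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an in-memory object store and a
    /// single-column schema (the field name is illustrative only):
    /// ```ignore
    /// let opener = JsonOpener::new(
    ///     8192,
    ///     Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])),
    ///     FileCompressionType::UNCOMPRESSED,
    ///     Arc::new(object_store::memory::InMemory::new()),
    /// );
    /// ```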
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
        }
    }
}

/// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]
#[derive(Clone, Default)]
pub struct JsonSource {
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
}

impl JsonSource {
    /// Initialize a JsonSource with default values
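    ///
    /// # Example
    ///
    /// A minimal sketch: the batch size must be set (via `with_batch_size`)
    /// before `create_file_opener` is called, since the opener `expect`s it:
    /// ```ignore
    /// let source: Arc<dyn FileSource> = JsonSource::new().with_batch_size(8192);
    /// ```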
    pub fn new() -> Self {
        Self::default()
    }
}

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating the opener"),
            projected_schema: base_config.projected_file_schema(),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set before calling statistics()"))
    }

    fn file_type(&self) -> &str {
        "json"
    }
}

impl FileOpener for JsonOpener {
    /// Open a partitioned NDJSON file.
    ///
    /// If `file_meta.range` is `None`, the entire file is opened.
    /// If `file_meta.range` is `Some(FileRange { start, end })`, it corresponds
    /// to the byte range `[start, end)` within the file.
    ///
    /// Note: `start` or `end` might fall in the middle of a line. In such cases,
    /// the following rules determine which lines to read:
    /// 1. The first line of the partition is the first line whose starting byte
    ///    offset is `>= start`.
    /// 2. The last line of the partition is the line containing the byte at
    ///    position `end - 1`.
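    ///
    /// For example (illustrative, not from the source): given the content
    /// `{"a":1}\n{"a":2}\n` (16 bytes, lines starting at offsets 0 and 8),
    /// the range `[0, 8)` yields only the first line, while `[4, 12)` yields
    /// only the second, because the first line starts before offset 4 and
    /// byte 11 falls inside the second line.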
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();

        Ok(Box::pin(async move {
            // Adjust the requested byte range to line boundaries, per the
            // rules documented above.
            let calculated_range = calculate_range(&file_meta, &store, None).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                // The partition contains no complete lines; emit an empty stream.
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

            match result.payload {
                // Local files can be read synchronously with seek/take.
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match file_meta.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit as u64))?
                        }
                    };

                    let reader = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build(BufReader::new(bytes))?;

                    Ok(futures::stream::iter(reader).boxed())
                }
                // Remote objects are decoded incrementally as bytes arrive.
                GetResultPayload::Stream(s) => {
                    let s = s.map_err(DataFusionError::from);

                    let decoder = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build_decoder()?;
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

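/// Executes `plan` and writes each output partition to `path` as a separate
/// newline-delimited JSON file named `part-<partition>.json`.
///
/// A minimal usage sketch (hypothetical `ctx` and `plan`; any destination
/// with a registered object store works):
/// ```ignore
/// plan_to_json(ctx.task_ctx(), plan, "file:///tmp/out").await?;
/// ```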
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer = BufWriter::new(storeref, file.clone());

            // Reuse a single buffer across batches to avoid reallocating.
            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}