// datafusion_datasource_json/source.rs

use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::JsonDecoder;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{calculate_range, ListingTableUrl, RangeCalculation};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_common::{Constraints, Statistics};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};

use datafusion_datasource::file_groups::FileGroup;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

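/// Execution plan for scanning NdJson data source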
#[derive(Debug, Clone)]
#[deprecated(since = "46.0.0", note = "use DataSourceExec instead")]
pub struct NdJsonExec {
    inner: DataSourceExec,
    base_config: FileScanConfig,
    file_compression_type: FileCompressionType,
}

#[allow(unused, deprecated)]
impl NdJsonExec {
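    /// Create a new JSON reader execution plan provided base configurations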
    pub fn new(
        base_config: FileScanConfig,
        file_compression_type: FileCompressionType,
    ) -> Self {
        let (
            projected_schema,
            projected_constraints,
            projected_statistics,
            projected_output_ordering,
        ) = base_config.project();
        let cache = Self::compute_properties(
            projected_schema,
            &projected_output_ordering,
            projected_constraints,
            &base_config,
        );

        let json = JsonSource::default();
        let base_config = base_config
            .with_file_compression_type(file_compression_type)
            .with_source(Arc::new(json));

        Self {
            inner: DataSourceExec::new(Arc::new(base_config.clone())),
            file_compression_type: base_config.file_compression_type,
            base_config,
        }
    }

    /// Ref to the base configs
    pub fn base_config(&self) -> &FileScanConfig {
        &self.base_config
    }

    /// Ref to the file compression type
    pub fn file_compression_type(&self) -> &FileCompressionType {
        &self.file_compression_type
    }

    fn file_scan_config(&self) -> FileScanConfig {
        self.inner
            .data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()
            .unwrap()
            .clone()
    }

    fn json_source(&self) -> JsonSource {
        let source = self.file_scan_config();
        source
            .file_source()
            .as_any()
            .downcast_ref::<JsonSource>()
            .unwrap()
            .clone()
    }

    fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
        Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
    }

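    /// This function creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.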
    fn compute_properties(
        schema: SchemaRef,
        orderings: &[LexOrdering],
        constraints: Constraints,
        file_scan_config: &FileScanConfig,
    ) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)
            .with_constraints(constraints);

        PlanProperties::new(
            eq_properties,
            Self::output_partitioning_helper(file_scan_config),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.base_config.file_groups = file_groups.clone();
        let mut file_source = self.file_scan_config();
        file_source = file_source.with_file_groups(file_groups);
        self.inner = self.inner.with_data_source(Arc::new(file_source));
        self
    }
}

#[allow(unused, deprecated)]
impl DisplayAs for NdJsonExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        self.inner.fmt_as(t, f)
    }
}

#[allow(unused, deprecated)]
impl ExecutionPlan for NdJsonExec {
    fn name(&self) -> &'static str {
        "NdJsonExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.inner.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &datafusion_common::config::ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.repartitioned(target_partitions, config)
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.inner.execute(partition, context)
    }

    fn statistics(&self) -> Result<Statistics> {
        self.inner.statistics()
    }

    fn metrics(&self) -> Option<MetricsSet> {
        self.inner.metrics()
    }

    fn fetch(&self) -> Option<usize> {
        self.inner.fetch()
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        self.inner.with_fetch(limit)
    }
}

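/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]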
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl JsonOpener {
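    /// Returns a [`JsonOpener`]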
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
        }
    }
}

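/// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]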
#[derive(Clone, Default)]
pub struct JsonSource {
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
}

impl JsonSource {
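    /// Initialize a JsonSource with default values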
    pub fn new() -> Self {
        Self::default()
    }
}

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating opener"),
            projected_schema: base_config.projected_file_schema(),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set before it can be read"))
    }

    fn file_type(&self) -> &str {
        "json"
    }
}

impl FileOpener for JsonOpener {
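    /// Open a partitioned NDJSON file.
    ///
    /// If `file_meta.range` is `None`, the whole file is read as a single stream.
    /// Otherwise, the requested byte range is aligned to line boundaries via
    /// `calculate_range` so that every partition reads complete JSON lines.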
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();

        Ok(Box::pin(async move {
            let calculated_range = calculate_range(&file_meta, &store, None).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match file_meta.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit as u64))?
                        }
                    };

                    let reader = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build(BufReader::new(bytes))?;

                    Ok(futures::stream::iter(reader).boxed())
                }
                GetResultPayload::Stream(s) => {
                    let s = s.map_err(DataFusionError::from);

                    let decoder = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build_decoder()?;
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

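/// Executes `plan` and writes its output as newline-delimited JSON, one
/// `part-{i}.json` file per output partition, under the object store path
/// given by `path`.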
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer = BufWriter::new(storeref, file.clone());

            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}