// datafusion_datasource_json/file_format.rs
use std::any::Any;
21use std::collections::HashMap;
22use std::fmt;
23use std::fmt::Debug;
24use std::io::{BufReader, Read};
25use std::sync::Arc;
26
27use crate::source::JsonSource;
28
29use arrow::array::RecordBatch;
30use arrow::datatypes::{Schema, SchemaRef};
31use arrow::error::ArrowError;
32use arrow::json;
33use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
34use bytes::{Buf, Bytes};
35use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
36use datafusion_common::file_options::json_writer::JsonWriterOptions;
37use datafusion_common::{
38 DEFAULT_JSON_EXTENSION, GetExt, Result, Statistics, not_impl_err,
39};
40use datafusion_common_runtime::SpawnedTask;
41use datafusion_datasource::TableSchema;
42use datafusion_datasource::decoder::Decoder;
43use datafusion_datasource::display::FileGroupDisplay;
44use datafusion_datasource::file::FileSource;
45use datafusion_datasource::file_compression_type::FileCompressionType;
46use datafusion_datasource::file_format::{
47 DEFAULT_SCHEMA_INFER_MAX_RECORD, FileFormat, FileFormatFactory,
48};
49use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
50use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
51use datafusion_datasource::sink::{DataSink, DataSinkExec};
52use datafusion_datasource::source::DataSourceExec;
53use datafusion_datasource::write::BatchSerializer;
54use datafusion_datasource::write::demux::DemuxedStreamReceiver;
55use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
56use datafusion_execution::{SendableRecordBatchStream, TaskContext};
57use datafusion_expr::dml::InsertOp;
58use datafusion_physical_expr_common::sort_expr::LexRequirement;
59use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
60use datafusion_session::Session;
61
62use crate::utils::JsonArrayToNdjsonReader;
63use async_trait::async_trait;
64use object_store::{GetResultPayload, ObjectMeta, ObjectStore, ObjectStoreExt};
65
/// Factory that creates [`JsonFormat`] instances, optionally carrying a
/// preset [`JsonOptions`] applied before any per-statement overrides.
#[derive(Default)]
pub struct JsonFormatFactory {
    /// Options used as the base configuration in `create`; when `None`,
    /// the session's default table options are used instead.
    pub options: Option<JsonOptions>,
}
72
73impl JsonFormatFactory {
74 pub fn new() -> Self {
76 Self { options: None }
77 }
78
79 pub fn new_with_options(options: JsonOptions) -> Self {
81 Self {
82 options: Some(options),
83 }
84 }
85}
86
87impl FileFormatFactory for JsonFormatFactory {
88 fn create(
89 &self,
90 state: &dyn Session,
91 format_options: &HashMap<String, String>,
92 ) -> Result<Arc<dyn FileFormat>> {
93 let json_options = match &self.options {
94 None => {
95 let mut table_options = state.default_table_options();
96 table_options.set_config_format(ConfigFileType::JSON);
97 table_options.alter_with_string_hash_map(format_options)?;
98 table_options.json
99 }
100 Some(json_options) => {
101 let mut json_options = json_options.clone();
102 for (k, v) in format_options {
103 json_options.set(k, v)?;
104 }
105 json_options
106 }
107 };
108
109 Ok(Arc::new(JsonFormat::default().with_options(json_options)))
110 }
111
112 fn default(&self) -> Arc<dyn FileFormat> {
113 Arc::new(JsonFormat::default())
114 }
115
116 fn as_any(&self) -> &dyn Any {
117 self
118 }
119}
120
121impl GetExt for JsonFormatFactory {
122 fn get_ext(&self) -> String {
123 DEFAULT_JSON_EXTENSION[1..].to_string()
125 }
126}
127
impl Debug for JsonFormatFactory {
    /// Formats as `JsonFormatFactory { options: ... }`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonFormatFactory")
            .field("options", &self.options)
            .finish()
    }
}
135
/// JSON [`FileFormat`] implementation, supporting both newline-delimited
/// JSON and single top-level JSON arrays (see `with_newline_delimited`).
#[derive(Debug, Default)]
pub struct JsonFormat {
    // All reader/writer behavior (compression, schema-inference limit,
    // newline-delimited mode) is driven by these options.
    options: JsonOptions,
}
160
impl JsonFormat {
    /// Replaces all options with `options`.
    pub fn with_options(mut self, options: JsonOptions) -> Self {
        self.options = options;
        self
    }

    /// Returns a reference to the current options.
    pub fn options(&self) -> &JsonOptions {
        &self.options
    }

    /// Sets the maximum number of records scanned during schema inference.
    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
        self.options.schema_infer_max_rec = Some(max_rec);
        self
    }

    /// Sets the file compression type used when reading and writing files.
    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.options.compression = file_compression_type.into();
        self
    }

    /// Chooses between newline-delimited JSON (`true`, the NDJSON form)
    /// and a single top-level JSON array (`false`).
    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
        self.options.newline_delimited = newline_delimited;
        self
    }

    /// Returns whether input is treated as newline-delimited JSON.
    pub fn is_newline_delimited(&self) -> bool {
        self.options.newline_delimited
    }
}
212
213fn infer_schema_from_json_array<R: Read>(
223 reader: R,
224 max_records: usize,
225) -> Result<(Schema, usize)> {
226 let ndjson_reader = JsonArrayToNdjsonReader::new(reader);
227
228 let iter = ValueIter::new(ndjson_reader, None);
229 let mut count = 0;
230
231 let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
232 let should_take = count < max_records;
233 if should_take {
234 count += 1;
235 }
236 should_take
237 }))?;
238
239 Ok((schema, count))
240}
241
242#[async_trait]
243impl FileFormat for JsonFormat {
244 fn as_any(&self) -> &dyn Any {
245 self
246 }
247
248 fn get_ext(&self) -> String {
249 JsonFormatFactory::new().get_ext()
250 }
251
252 fn get_ext_with_compression(
253 &self,
254 file_compression_type: &FileCompressionType,
255 ) -> Result<String> {
256 let ext = self.get_ext();
257 Ok(format!("{}{}", ext, file_compression_type.get_ext()))
258 }
259
260 fn compression_type(&self) -> Option<FileCompressionType> {
261 Some(self.options.compression.into())
262 }
263
264 async fn infer_schema(
265 &self,
266 _state: &dyn Session,
267 store: &Arc<dyn ObjectStore>,
268 objects: &[ObjectMeta],
269 ) -> Result<SchemaRef> {
270 let mut schemas = Vec::new();
271 let mut records_to_read = self
272 .options
273 .schema_infer_max_rec
274 .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
275 let file_compression_type = FileCompressionType::from(self.options.compression);
276 let newline_delimited = self.options.newline_delimited;
277
278 for object in objects {
279 if records_to_read == 0 {
281 break;
282 }
283
284 let r = store.as_ref().get(&object.location).await?;
285
286 let (schema, records_consumed) = match r.payload {
287 #[cfg(not(target_arch = "wasm32"))]
288 GetResultPayload::File(file, _) => {
289 let decoder = file_compression_type.convert_read(file)?;
290 let reader = BufReader::new(decoder);
291
292 if newline_delimited {
293 let iter = ValueIter::new(reader, None);
295 let mut count = 0;
296 let schema =
297 infer_json_schema_from_iterator(iter.take_while(|_| {
298 let should_take = count < records_to_read;
299 if should_take {
300 count += 1;
301 }
302 should_take
303 }))?;
304 (schema, count)
305 } else {
306 infer_schema_from_json_array(reader, records_to_read)?
308 }
309 }
310 GetResultPayload::Stream(_) => {
311 let data = r.bytes().await?;
312 let decoder = file_compression_type.convert_read(data.reader())?;
313 let reader = BufReader::new(decoder);
314
315 if newline_delimited {
316 let iter = ValueIter::new(reader, None);
317 let mut count = 0;
318 let schema =
319 infer_json_schema_from_iterator(iter.take_while(|_| {
320 let should_take = count < records_to_read;
321 if should_take {
322 count += 1;
323 }
324 should_take
325 }))?;
326 (schema, count)
327 } else {
328 infer_schema_from_json_array(reader, records_to_read)?
330 }
331 }
332 };
333
334 schemas.push(schema);
335 records_to_read = records_to_read.saturating_sub(records_consumed);
337 }
338
339 let schema = Schema::try_merge(schemas)?;
340 Ok(Arc::new(schema))
341 }
342
343 async fn infer_stats(
344 &self,
345 _state: &dyn Session,
346 _store: &Arc<dyn ObjectStore>,
347 table_schema: SchemaRef,
348 _object: &ObjectMeta,
349 ) -> Result<Statistics> {
350 Ok(Statistics::new_unknown(&table_schema))
351 }
352
353 async fn create_physical_plan(
354 &self,
355 _state: &dyn Session,
356 conf: FileScanConfig,
357 ) -> Result<Arc<dyn ExecutionPlan>> {
358 let conf = FileScanConfigBuilder::from(conf)
359 .with_file_compression_type(FileCompressionType::from(
360 self.options.compression,
361 ))
362 .build();
363 Ok(DataSourceExec::from_data_source(conf))
364 }
365
366 async fn create_writer_physical_plan(
367 &self,
368 input: Arc<dyn ExecutionPlan>,
369 _state: &dyn Session,
370 conf: FileSinkConfig,
371 order_requirements: Option<LexRequirement>,
372 ) -> Result<Arc<dyn ExecutionPlan>> {
373 if conf.insert_op != InsertOp::Append {
374 return not_impl_err!("Overwrites are not implemented yet for Json");
375 }
376
377 let writer_options = JsonWriterOptions::try_from(&self.options)?;
378
379 let sink = Arc::new(JsonSink::new(conf, writer_options));
380
381 Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
382 }
383
384 fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
385 Arc::new(
386 JsonSource::new(table_schema)
387 .with_newline_delimited(self.options.newline_delimited),
388 )
389 }
390}
391
392impl Default for JsonSerializer {
393 fn default() -> Self {
394 Self::new()
395 }
396}
397
/// Serializes [`RecordBatch`]es into newline-delimited JSON bytes.
pub struct JsonSerializer {}
400
impl JsonSerializer {
    /// Creates a new `JsonSerializer`.
    pub fn new() -> Self {
        Self {}
    }
}
407
408impl BatchSerializer for JsonSerializer {
409 fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
410 let mut buffer = Vec::with_capacity(4096);
411 let mut writer = json::LineDelimitedWriter::new(&mut buffer);
412 writer.write(&batch)?;
413 Ok(Bytes::from(buffer))
414 }
415}
416
/// [`DataSink`] implementation that writes record batches out as JSON.
pub struct JsonSink {
    /// Destination configuration (output file group, schema, insert op).
    config: FileSinkConfig,
    /// Writer options (compression and level) derived from `JsonOptions`.
    writer_options: JsonWriterOptions,
}
424
425impl Debug for JsonSink {
426 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427 f.debug_struct("JsonSink").finish()
428 }
429}
430
431impl DisplayAs for JsonSink {
432 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
433 match t {
434 DisplayFormatType::Default | DisplayFormatType::Verbose => {
435 write!(f, "JsonSink(file_groups=",)?;
436 FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
437 write!(f, ")")
438 }
439 DisplayFormatType::TreeRender => {
440 writeln!(f, "format: json")?;
441 write!(f, "file={}", &self.config.original_url)
442 }
443 }
444 }
445}
446
impl JsonSink {
    /// Creates a new `JsonSink` for the given destination and writer options.
    pub fn new(config: FileSinkConfig, writer_options: JsonWriterOptions) -> Self {
        Self {
            config,
            writer_options,
        }
    }

    /// Returns the writer options used by this sink.
    pub fn writer_options(&self) -> &JsonWriterOptions {
        &self.writer_options
    }
}
461
#[async_trait]
impl FileSink for JsonSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    /// Spawns the JSON writer tasks over the demuxed per-file streams and
    /// awaits their completion, returning the count reported by the
    /// orchestration helper.
    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        // A single shared serializer converts each batch to NDJSON bytes.
        let serializer = Arc::new(JsonSerializer::new()) as _;
        spawn_writer_tasks_and_join(
            context,
            serializer,
            // Compression settings come from the sink's writer options.
            self.writer_options.compression.into(),
            self.writer_options.compression_level,
            object_store,
            demux_task,
            file_stream_rx,
        )
        .await
    }
}
488
#[async_trait]
impl DataSink for JsonSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the data this sink writes (from the sink configuration).
    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    /// Writes the entire input stream by delegating to
    /// [`FileSink::write_all`].
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}
507
/// Adapter implementing the datasource [`Decoder`] trait on top of arrow's
/// streaming JSON [`json::reader::Decoder`].
#[derive(Debug)]
pub struct JsonDecoder {
    // Underlying arrow decoder that performs the actual JSON parsing.
    inner: json::reader::Decoder,
}
512
impl JsonDecoder {
    /// Wraps an arrow [`json::reader::Decoder`].
    pub fn new(decoder: json::reader::Decoder) -> Self {
        Self { inner: decoder }
    }
}
518
impl Decoder for JsonDecoder {
    // Feeds raw bytes to the inner decoder; returns the number of bytes
    // it consumed.
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.inner.decode(buf)
    }

    // Produces a RecordBatch from whatever rows the inner decoder has
    // buffered so far (None if nothing is buffered).
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        self.inner.flush()
    }

    // NOTE(review): always `false` — presumably because a JSON value may
    // span buffer boundaries, making an early flush unsafe; confirm
    // against the `Decoder` trait's contract.
    fn can_flush_early(&self) -> bool {
        false
    }
}