datafusion_datasource_json/
file_format.rs1use std::collections::HashMap;
21use std::fmt;
22use std::fmt::Debug;
23use std::io::{BufReader, Read};
24use std::sync::Arc;
25
26use crate::source::JsonSource;
27
28use arrow::array::RecordBatch;
29use arrow::datatypes::{Schema, SchemaRef};
30use arrow::error::ArrowError;
31use arrow::json;
32use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
33use bytes::{Buf, Bytes};
34use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
35use datafusion_common::file_options::json_writer::JsonWriterOptions;
36use datafusion_common::{
37 DEFAULT_JSON_EXTENSION, GetExt, Result, Statistics, not_impl_err,
38};
39use datafusion_common_runtime::SpawnedTask;
40use datafusion_datasource::TableSchema;
41use datafusion_datasource::decoder::Decoder;
42use datafusion_datasource::display::FileGroupDisplay;
43use datafusion_datasource::file::FileSource;
44use datafusion_datasource::file_compression_type::FileCompressionType;
45use datafusion_datasource::file_format::{
46 DEFAULT_SCHEMA_INFER_MAX_RECORD, FileFormat, FileFormatFactory,
47};
48use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
49use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
50use datafusion_datasource::sink::{DataSink, DataSinkExec};
51use datafusion_datasource::source::DataSourceExec;
52use datafusion_datasource::write::BatchSerializer;
53use datafusion_datasource::write::demux::DemuxedStreamReceiver;
54use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
55use datafusion_execution::{SendableRecordBatchStream, TaskContext};
56use datafusion_expr::dml::InsertOp;
57use datafusion_physical_expr_common::sort_expr::LexRequirement;
58use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
59use datafusion_session::Session;
60
61use crate::utils::JsonArrayToNdjsonReader;
62use async_trait::async_trait;
63use object_store::{GetResultPayload, ObjectMeta, ObjectStore, ObjectStoreExt};
64
65#[derive(Default)]
66pub struct JsonFormatFactory {
68 pub options: Option<JsonOptions>,
70}
71
72impl JsonFormatFactory {
73 pub fn new() -> Self {
75 Self { options: None }
76 }
77
78 pub fn new_with_options(options: JsonOptions) -> Self {
80 Self {
81 options: Some(options),
82 }
83 }
84}
85
86impl FileFormatFactory for JsonFormatFactory {
87 fn create(
88 &self,
89 state: &dyn Session,
90 format_options: &HashMap<String, String>,
91 ) -> Result<Arc<dyn FileFormat>> {
92 let json_options = match &self.options {
93 None => {
94 let mut table_options = state.default_table_options();
95 table_options.set_config_format(ConfigFileType::JSON);
96 table_options.alter_with_string_hash_map(format_options)?;
97 table_options.json
98 }
99 Some(json_options) => {
100 let mut json_options = json_options.clone();
101 for (k, v) in format_options {
102 json_options.set(k, v)?;
103 }
104 json_options
105 }
106 };
107
108 Ok(Arc::new(JsonFormat::default().with_options(json_options)))
109 }
110
111 fn default(&self) -> Arc<dyn FileFormat> {
112 Arc::new(JsonFormat::default())
113 }
114}
115
116impl GetExt for JsonFormatFactory {
117 fn get_ext(&self) -> String {
118 DEFAULT_JSON_EXTENSION[1..].to_string()
120 }
121}
122
123impl Debug for JsonFormatFactory {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("JsonFormatFactory")
126 .field("options", &self.options)
127 .finish()
128 }
129}
130
131#[derive(Debug, Default)]
152pub struct JsonFormat {
153 options: JsonOptions,
154}
155
156impl JsonFormat {
157 pub fn with_options(mut self, options: JsonOptions) -> Self {
159 self.options = options;
160 self
161 }
162
163 pub fn options(&self) -> &JsonOptions {
165 &self.options
166 }
167
168 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
171 self.options.schema_infer_max_rec = Some(max_rec);
172 self
173 }
174
175 pub fn with_file_compression_type(
178 mut self,
179 file_compression_type: FileCompressionType,
180 ) -> Self {
181 self.options.compression = file_compression_type.into();
182 self
183 }
184
185 pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
198 self.options.newline_delimited = newline_delimited;
199 self
200 }
201
202 pub fn is_newline_delimited(&self) -> bool {
204 self.options.newline_delimited
205 }
206}
207
208fn infer_schema_from_json_array<R: Read>(
218 reader: R,
219 max_records: usize,
220) -> Result<(Schema, usize)> {
221 let ndjson_reader = JsonArrayToNdjsonReader::new(reader);
222
223 let iter = ValueIter::new(ndjson_reader, None);
224 let mut count = 0;
225
226 let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
227 let should_take = count < max_records;
228 if should_take {
229 count += 1;
230 }
231 should_take
232 }))?;
233
234 Ok((schema, count))
235}
236
237#[async_trait]
238impl FileFormat for JsonFormat {
239 fn get_ext(&self) -> String {
240 JsonFormatFactory::new().get_ext()
241 }
242
243 fn get_ext_with_compression(
244 &self,
245 file_compression_type: &FileCompressionType,
246 ) -> Result<String> {
247 let ext = self.get_ext();
248 Ok(format!("{}{}", ext, file_compression_type.get_ext()))
249 }
250
251 fn compression_type(&self) -> Option<FileCompressionType> {
252 Some(self.options.compression.into())
253 }
254
255 async fn infer_schema(
256 &self,
257 _state: &dyn Session,
258 store: &Arc<dyn ObjectStore>,
259 objects: &[ObjectMeta],
260 ) -> Result<SchemaRef> {
261 let mut schemas = Vec::new();
262 let mut records_to_read = self
263 .options
264 .schema_infer_max_rec
265 .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
266 let file_compression_type = FileCompressionType::from(self.options.compression);
267 let newline_delimited = self.options.newline_delimited;
268
269 for object in objects {
270 if records_to_read == 0 {
272 break;
273 }
274
275 let r = store.as_ref().get(&object.location).await?;
276
277 let (schema, records_consumed) = match r.payload {
278 #[cfg(not(target_arch = "wasm32"))]
279 GetResultPayload::File(file, _) => {
280 let decoder = file_compression_type.convert_read(file)?;
281 let reader = BufReader::new(decoder);
282
283 if newline_delimited {
284 let iter = ValueIter::new(reader, None);
286 let mut count = 0;
287 let schema =
288 infer_json_schema_from_iterator(iter.take_while(|_| {
289 let should_take = count < records_to_read;
290 if should_take {
291 count += 1;
292 }
293 should_take
294 }))?;
295 (schema, count)
296 } else {
297 infer_schema_from_json_array(reader, records_to_read)?
299 }
300 }
301 GetResultPayload::Stream(_) => {
302 let data = r.bytes().await?;
303 let decoder = file_compression_type.convert_read(data.reader())?;
304 let reader = BufReader::new(decoder);
305
306 if newline_delimited {
307 let iter = ValueIter::new(reader, None);
308 let mut count = 0;
309 let schema =
310 infer_json_schema_from_iterator(iter.take_while(|_| {
311 let should_take = count < records_to_read;
312 if should_take {
313 count += 1;
314 }
315 should_take
316 }))?;
317 (schema, count)
318 } else {
319 infer_schema_from_json_array(reader, records_to_read)?
321 }
322 }
323 };
324
325 schemas.push(schema);
326 records_to_read = records_to_read.saturating_sub(records_consumed);
328 }
329
330 let schema = Schema::try_merge(schemas)?;
331 Ok(Arc::new(schema))
332 }
333
334 async fn infer_stats(
335 &self,
336 _state: &dyn Session,
337 _store: &Arc<dyn ObjectStore>,
338 table_schema: SchemaRef,
339 _object: &ObjectMeta,
340 ) -> Result<Statistics> {
341 Ok(Statistics::new_unknown(&table_schema))
342 }
343
344 async fn create_physical_plan(
345 &self,
346 _state: &dyn Session,
347 conf: FileScanConfig,
348 ) -> Result<Arc<dyn ExecutionPlan>> {
349 let conf = FileScanConfigBuilder::from(conf)
350 .with_file_compression_type(FileCompressionType::from(
351 self.options.compression,
352 ))
353 .build();
354 Ok(DataSourceExec::from_data_source(conf))
355 }
356
357 async fn create_writer_physical_plan(
358 &self,
359 input: Arc<dyn ExecutionPlan>,
360 _state: &dyn Session,
361 conf: FileSinkConfig,
362 order_requirements: Option<LexRequirement>,
363 ) -> Result<Arc<dyn ExecutionPlan>> {
364 if conf.insert_op != InsertOp::Append {
365 return not_impl_err!("Overwrites are not implemented yet for Json");
366 }
367
368 let writer_options = JsonWriterOptions::try_from(&self.options)?;
369
370 let sink = Arc::new(JsonSink::new(conf, writer_options));
371
372 Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
373 }
374
375 fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
376 Arc::new(
377 JsonSource::new(table_schema)
378 .with_newline_delimited(self.options.newline_delimited),
379 )
380 }
381}
382
/// `BatchSerializer` that encodes record batches as newline-delimited JSON.
///
/// It carries no configuration. `Debug` is derived so the type can appear in
/// debug output like the other public types in this module, and `Default` is
/// derived in place of a hand-written impl.
#[derive(Debug, Default)]
pub struct JsonSerializer {}

impl JsonSerializer {
    /// Creates a serializer; equivalent to [`JsonSerializer::default`].
    pub fn new() -> Self {
        Self {}
    }
}
398
399impl BatchSerializer for JsonSerializer {
400 fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
401 let mut buffer = Vec::with_capacity(4096);
402 let mut writer = json::LineDelimitedWriter::new(&mut buffer);
403 writer.write(&batch)?;
404 Ok(Bytes::from(buffer))
405 }
406}
407
408pub struct JsonSink {
410 config: FileSinkConfig,
412 writer_options: JsonWriterOptions,
414}
415
416impl Debug for JsonSink {
417 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418 f.debug_struct("JsonSink").finish()
419 }
420}
421
422impl DisplayAs for JsonSink {
423 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424 match t {
425 DisplayFormatType::Default | DisplayFormatType::Verbose => {
426 write!(f, "JsonSink(file_groups=",)?;
427 FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
428 write!(f, ")")
429 }
430 DisplayFormatType::TreeRender => {
431 writeln!(f, "format: json")?;
432 write!(f, "file={}", &self.config.original_url)
433 }
434 }
435 }
436}
437
438impl JsonSink {
439 pub fn new(config: FileSinkConfig, writer_options: JsonWriterOptions) -> Self {
441 Self {
442 config,
443 writer_options,
444 }
445 }
446
447 pub fn writer_options(&self) -> &JsonWriterOptions {
449 &self.writer_options
450 }
451}
452
453#[async_trait]
454impl FileSink for JsonSink {
455 fn config(&self) -> &FileSinkConfig {
456 &self.config
457 }
458
459 async fn spawn_writer_tasks_and_join(
460 &self,
461 context: &Arc<TaskContext>,
462 demux_task: SpawnedTask<Result<()>>,
463 file_stream_rx: DemuxedStreamReceiver,
464 object_store: Arc<dyn ObjectStore>,
465 ) -> Result<u64> {
466 let serializer = Arc::new(JsonSerializer::new()) as _;
467 spawn_writer_tasks_and_join(
468 context,
469 serializer,
470 self.writer_options.compression.into(),
471 self.writer_options.compression_level,
472 object_store,
473 demux_task,
474 file_stream_rx,
475 )
476 .await
477 }
478}
479
480#[async_trait]
481impl DataSink for JsonSink {
482 fn schema(&self) -> &SchemaRef {
483 self.config.output_schema()
484 }
485
486 async fn write_all(
487 &self,
488 data: SendableRecordBatchStream,
489 context: &Arc<TaskContext>,
490 ) -> Result<u64> {
491 FileSink::write_all(self, data, context).await
492 }
493}
494
495#[derive(Debug)]
496pub struct JsonDecoder {
497 inner: json::reader::Decoder,
498}
499
500impl JsonDecoder {
501 pub fn new(decoder: json::reader::Decoder) -> Self {
502 Self { inner: decoder }
503 }
504}
505
506impl Decoder for JsonDecoder {
507 fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
508 self.inner.decode(buf)
509 }
510
511 fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
512 self.inner.flush()
513 }
514
515 fn can_flush_early(&self) -> bool {
516 false
517 }
518}