// datafusion_datasource_json/file_format.rs

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;

use crate::source::JsonSource;

use arrow::array::RecordBatch;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{
    not_impl_err, GetExt, Result, Statistics, DEFAULT_JSON_EXTENSION,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::decoder::Decoder;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{
    FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
use datafusion_datasource::write::BatchSerializer;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_datasource::source::DataSourceExec;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
/// Factory used to create instances of [`JsonFormat`].
#[derive(Default)]
pub struct JsonFormatFactory {
    /// Options for the JSON file format, if overridden from the session defaults.
    pub options: Option<JsonOptions>,
}

impl JsonFormatFactory {
    /// Creates an instance of [`JsonFormatFactory`] with no format options.
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates an instance of [`JsonFormatFactory`] with customized default options.
    pub fn new_with_options(options: JsonOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}
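
// Minimal usage sketch (illustrative): build a factory that carries pre-set
// options. `JsonOptions::default()` stands in for any options value the caller
// assembles.
//
//     let factory = JsonFormatFactory::new_with_options(JsonOptions::default());
//     assert!(factory.options.is_some());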

impl FileFormatFactory for JsonFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let json_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::JSON);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.json
            }
            Some(json_options) => {
                let mut json_options = json_options.clone();
                for (k, v) in format_options {
                    json_options.set(k, v)?;
                }
                json_options
            }
        };

        Ok(Arc::new(JsonFormat::default().with_options(json_options)))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(JsonFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
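
// Note on precedence in `create` above: when the factory was built without
// options, the session's default table options seed the JSON options and the
// per-statement `format_options` map is layered on top; when the factory
// carries its own options, each `format_options` entry is applied to a clone
// via `ConfigField::set`. Either way, statement-level settings win over
// factory or session defaults.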

impl GetExt for JsonFormatFactory {
    fn get_ext(&self) -> String {
        // Remove the leading dot, i.e. ".json" -> "json"
        DEFAULT_JSON_EXTENSION[1..].to_string()
    }
}
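
// Illustrative result (assumes `DEFAULT_JSON_EXTENSION` is ".json", as defined
// in `datafusion_common`):
//
//     assert_eq!(JsonFormatFactory::new().get_ext(), "json");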

impl Debug for JsonFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonFormatFactory")
            .field("options", &self.options)
            .finish()
    }
}

/// Line-delimited JSON [`FileFormat`] implementation.
#[derive(Debug, Default)]
pub struct JsonFormat {
    options: JsonOptions,
}

impl JsonFormat {
    /// Set the JSON options.
    pub fn with_options(mut self, options: JsonOptions) -> Self {
        self.options = options;
        self
    }

    /// Retrieve the JSON options.
    pub fn options(&self) -> &JsonOptions {
        &self.options
    }

    /// Set a limit on the number of records scanned to infer the schema.
    /// Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
        self.options.schema_infer_max_rec = Some(max_rec);
        self
    }

    /// Set the [`FileCompressionType`] used for reading and writing.
    /// Defaults to `FileCompressionType::UNCOMPRESSED`.
    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.options.compression = file_compression_type.into();
        self
    }
}
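
// Builder-style configuration sketch (illustrative; `FileCompressionType::GZIP`
// is one of the compression constants exported by `datafusion_datasource`):
//
//     let format = JsonFormat::default()
//         .with_schema_infer_max_rec(100)
//         .with_file_compression_type(FileCompressionType::GZIP);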

#[async_trait]
impl FileFormat for JsonFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        JsonFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        Ok(format!("{}{}", ext, file_compression_type.get_ext()))
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        Some(self.options.compression.into())
    }

    async fn infer_schema(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas = Vec::new();
        let mut records_to_read = self
            .options
            .schema_infer_max_rec
            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
        let file_compression_type = FileCompressionType::from(self.options.compression);
        for object in objects {
            // Decrement the shared record budget as values are consumed, so the
            // scan stops once `schema_infer_max_rec` records have been read
            // across all objects.
            let mut take_while = || {
                let should_take = records_to_read > 0;
                if should_take {
                    records_to_read -= 1;
                }
                should_take
            };

            let r = store.as_ref().get(&object.location).await?;
            let schema = match r.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(file, _) => {
                    let decoder = file_compression_type.convert_read(file)?;
                    let mut reader = BufReader::new(decoder);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
                GetResultPayload::Stream(_) => {
                    let data = r.bytes().await?;
                    let decoder = file_compression_type.convert_read(data.reader())?;
                    let mut reader = BufReader::new(decoder);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
            };

            schemas.push(schema);
            if records_to_read == 0 {
                break;
            }
        }

        let schema = Schema::try_merge(schemas)?;
        Ok(Arc::new(schema))
    }
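
    // Illustrative behavior of `infer_schema` above: arrow's
    // `infer_json_schema_from_iterator` produces one schema per file, and
    // `Schema::try_merge` unifies them. For example, two files containing
    //
    //     {"a": 1}
    //     {"a": null, "b": "x"}
    //
    // merge into a schema with a nullable Int64 field `a` and a nullable Utf8
    // field `b`.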

    async fn infer_stats(
        &self,
        _state: &dyn Session,
        _store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&table_schema))
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::from(conf)
            .with_file_compression_type(FileCompressionType::from(
                self.options.compression,
            ))
            .with_source(source)
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Json");
        }

        let writer_options = JsonWriterOptions::try_from(&self.options)?;

        let sink = Arc::new(JsonSink::new(conf, writer_options));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(JsonSource::default())
    }
}

impl Default for JsonSerializer {
    fn default() -> Self {
        Self::new()
    }
}

/// Serializes [`RecordBatch`]es to newline-delimited JSON bytes.
pub struct JsonSerializer {}

impl JsonSerializer {
    /// Constructor for the serializer object
    pub fn new() -> Self {
        Self {}
    }
}

impl BatchSerializer for JsonSerializer {
    fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
        let mut buffer = Vec::with_capacity(4096);
        let mut writer = json::LineDelimitedWriter::new(&mut buffer);
        writer.write(&batch)?;
        Ok(Bytes::from(buffer))
    }
}
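
// Illustrative output of `JsonSerializer::serialize`: a batch with one Int64
// column `a` holding the values 1 and 2 serializes to the bytes
//
//     {"a":1}
//     {"a":2}
//
// i.e. one JSON object per line, as produced by arrow's `LineDelimitedWriter`.
// A runnable version of this check appears in the test module at the end of
// the file.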

/// Implements [`DataSink`] for writing to a JSON file.
pub struct JsonSink {
    /// Config options for sink
    config: FileSinkConfig,
    /// Writer options for the underlying JSON writer
    writer_options: JsonWriterOptions,
}

impl Debug for JsonSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonSink").finish()
    }
}

impl DisplayAs for JsonSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "JsonSink(file_groups=")?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format: json")?;
                write!(f, "file={}", &self.config.original_url)
            }
        }
    }
}

impl JsonSink {
    /// Create from config.
    pub fn new(config: FileSinkConfig, writer_options: JsonWriterOptions) -> Self {
        Self {
            config,
            writer_options,
        }
    }

    /// Retrieve the writer options
    pub fn writer_options(&self) -> &JsonWriterOptions {
        &self.writer_options
    }
}

#[async_trait]
impl FileSink for JsonSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let serializer = Arc::new(JsonSerializer::new()) as _;
        spawn_writer_tasks_and_join(
            context,
            serializer,
            self.writer_options.compression.into(),
            object_store,
            demux_task,
            file_stream_rx,
        )
        .await
    }
}
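
// Write-path note (restating the wiring above): the demux task splits the
// incoming record batch stream into per-file streams, and the shared
// `spawn_writer_tasks_and_join` orchestration pairs each stream with a
// `JsonSerializer` and the configured compression before handing bytes to the
// object store; the returned `u64` is the number of rows written.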

#[async_trait]
impl DataSink for JsonSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

/// Adapts arrow's JSON [`json::reader::Decoder`] to the datasource
/// [`Decoder`] trait used by the streaming file reader.
#[derive(Debug)]
pub struct JsonDecoder {
    inner: json::reader::Decoder,
}

impl JsonDecoder {
    /// Wrap an arrow JSON decoder.
    pub fn new(decoder: json::reader::Decoder) -> Self {
        Self { inner: decoder }
    }
}

impl Decoder for JsonDecoder {
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.inner.decode(buf)
    }

    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        self.inner.flush()
    }

    fn can_flush_early(&self) -> bool {
        false
    }
}
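
// A minimal round-trip sketch for `JsonSerializer` (illustrative test; the
// test name and the expected byte string are chosen here for demonstration):
// serialize one small batch and check the newline-delimited output.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int64Array};
    use arrow::datatypes::{DataType, Field};

    #[test]
    fn json_serializer_writes_line_delimited_output() -> Result<()> {
        // One nullable-free Int64 column `a` with the values 1 and 2.
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef],
        )?;

        let serializer = JsonSerializer::new();
        let bytes = serializer.serialize(batch, true)?;
        // `LineDelimitedWriter` emits one JSON object per row, each followed
        // by a newline.
        assert_eq!(bytes, Bytes::from_static(b"{\"a\":1}\n{\"a\":2}\n"));
        Ok(())
    }
}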