// datafusion_datasource_json/file_format.rs

//! Line-delimited JSON [`FileFormat`] abstraction: [`JsonFormat`].

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;

use crate::source::JsonSource;

use arrow::array::RecordBatch;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::json;
use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{
    DEFAULT_JSON_EXTENSION, GetExt, Result, Statistics, not_impl_err,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::TableSchema;
use datafusion_datasource::decoder::Decoder;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{
    DEFAULT_SCHEMA_INFER_MAX_RECORD, FileFormat, FileFormatFactory,
};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::write::BatchSerializer;
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_datasource::source::DataSourceExec;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

/// Factory used to create instances of [`JsonFormat`].
#[derive(Default)]
pub struct JsonFormatFactory {
    /// JSON options to seed created formats with; when `None`, the
    /// session's default table options are used instead.
    pub options: Option<JsonOptions>,
}

impl JsonFormatFactory {
    /// Creates an instance of [`JsonFormatFactory`] with no pre-set options.
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates an instance of [`JsonFormatFactory`] seeded with the given options.
    pub fn new_with_options(options: JsonOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}
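
// A minimal usage sketch (illustrative, not part of the upstream source):
// seeding the factory so that later `create` calls start from these options
// rather than the session defaults.
//
//     let mut options = JsonOptions::default();
//     options.schema_infer_max_rec = Some(100);
//     let factory = JsonFormatFactory::new_with_options(options);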

impl FileFormatFactory for JsonFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let json_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::JSON);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.json
            }
            Some(json_options) => {
                let mut json_options = json_options.clone();
                for (k, v) in format_options {
                    json_options.set(k, v)?;
                }
                json_options
            }
        };

        Ok(Arc::new(JsonFormat::default().with_options(json_options)))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(JsonFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for JsonFormatFactory {
    fn get_ext(&self) -> String {
        // Remove the leading dot, i.e. ".json" -> "json".
        DEFAULT_JSON_EXTENSION[1..].to_string()
    }
}

impl Debug for JsonFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonFormatFactory")
            .field("options", &self.options)
            .finish()
    }
}

/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug, Default)]
pub struct JsonFormat {
    options: JsonOptions,
}

impl JsonFormat {
    /// Set the JSON options for this format.
    pub fn with_options(mut self, options: JsonOptions) -> Self {
        self.options = options;
        self
    }

    /// Retrieve the JSON options for this format.
    pub fn options(&self) -> &JsonOptions {
        &self.options
    }

    /// Set the maximum number of records to sample when inferring a schema.
    /// Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD` when unset.
    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
        self.options.schema_infer_max_rec = Some(max_rec);
        self
    }

    /// Set the [`FileCompressionType`] applied when reading and writing.
    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.options.compression = file_compression_type.into();
        self
    }
}
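
// A minimal usage sketch (illustrative, not part of the upstream source):
// the builder methods above compose left to right. With this configuration,
// `get_ext_with_compression` below yields "json.gz".
//
//     let format = JsonFormat::default()
//         .with_schema_infer_max_rec(1000)
//         .with_file_compression_type(FileCompressionType::GZIP);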

#[async_trait]
impl FileFormat for JsonFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        JsonFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        Ok(format!("{}{}", ext, file_compression_type.get_ext()))
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        Some(self.options.compression.into())
    }

    async fn infer_schema(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas = Vec::new();
        // A single record budget is shared across all files: once
        // `schema_infer_max_rec` records have been sampled, inference
        // stops even if more files remain.
        let mut records_to_read = self
            .options
            .schema_infer_max_rec
            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
        let file_compression_type = FileCompressionType::from(self.options.compression);
        for object in objects {
            let mut take_while = || {
                let should_take = records_to_read > 0;
                if should_take {
                    records_to_read -= 1;
                }
                should_take
            };

            let r = store.as_ref().get(&object.location).await?;
            let schema = match r.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(file, _) => {
                    let decoder = file_compression_type.convert_read(file)?;
                    let mut reader = BufReader::new(decoder);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
                GetResultPayload::Stream(_) => {
                    let data = r.bytes().await?;
                    let decoder = file_compression_type.convert_read(data.reader())?;
                    let mut reader = BufReader::new(decoder);
                    let iter = ValueIter::new(&mut reader, None);
                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
                }
            };

            schemas.push(schema);
            if records_to_read == 0 {
                break;
            }
        }

        // Merge the per-file schemas into one unified schema.
        let schema = Schema::try_merge(schemas)?;
        Ok(Arc::new(schema))
    }
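
    // Illustrative sketch (not from the upstream source) of the arrow-json
    // primitive used above, applied to an in-memory buffer; `Cursor` and the
    // sample data are local to the example.
    //
    //     use std::io::Cursor;
    //     let data = b"{\"a\": 1}\n{\"a\": 2}\n";
    //     let mut reader = BufReader::new(Cursor::new(&data[..]));
    //     let iter = ValueIter::new(&mut reader, None);
    //     let schema = infer_json_schema_from_iterator(iter)?;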

    async fn infer_stats(
        &self,
        _state: &dyn Session,
        _store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
        // JSON files carry no embedded statistics, so report unknown.
        Ok(Statistics::new_unknown(&table_schema))
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let conf = FileScanConfigBuilder::from(conf)
            .with_file_compression_type(FileCompressionType::from(
                self.options.compression,
            ))
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Json");
        }

        let writer_options = JsonWriterOptions::try_from(&self.options)?;

        let sink = Arc::new(JsonSink::new(conf, writer_options));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(JsonSource::new(table_schema))
    }
}

impl Default for JsonSerializer {
    fn default() -> Self {
        Self::new()
    }
}

/// Serializes [`RecordBatch`]es to newline-delimited JSON bytes.
pub struct JsonSerializer {}

impl JsonSerializer {
    /// Constructor for the serializer.
    pub fn new() -> Self {
        Self {}
    }
}

impl BatchSerializer for JsonSerializer {
    fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
        let mut buffer = Vec::with_capacity(4096);
        let mut writer = json::LineDelimitedWriter::new(&mut buffer);
        writer.write(&batch)?;
        Ok(Bytes::from(buffer))
    }
}
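
// Output sketch (illustrative, not from the upstream source): each row of the
// batch becomes one JSON object on its own line, so a two-row batch with a
// single column `a` serializes to the bytes `{"a":1}\n{"a":2}\n`.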

/// Implements [`DataSink`] for writing to a JSON file.
pub struct JsonSink {
    /// Config options for writing data.
    config: FileSinkConfig,
    /// Writer options for serializing JSON.
    writer_options: JsonWriterOptions,
}

impl Debug for JsonSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonSink").finish()
    }
}

impl DisplayAs for JsonSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "JsonSink(file_groups=")?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "format: json")?;
                write!(f, "file={}", &self.config.original_url)
            }
        }
    }
}

impl JsonSink {
    /// Create from config.
    pub fn new(config: FileSinkConfig, writer_options: JsonWriterOptions) -> Self {
        Self {
            config,
            writer_options,
        }
    }

    /// Retrieve the writer options.
    pub fn writer_options(&self) -> &JsonWriterOptions {
        &self.writer_options
    }
}

#[async_trait]
impl FileSink for JsonSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let serializer = Arc::new(JsonSerializer::new()) as _;
        spawn_writer_tasks_and_join(
            context,
            serializer,
            self.writer_options.compression.into(),
            self.writer_options.compression_level,
            object_store,
            demux_task,
            file_stream_rx,
        )
        .await
    }
}

#[async_trait]
impl DataSink for JsonSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

/// Streaming JSON decoder that wraps an arrow-json [`json::reader::Decoder`]
/// behind the datasource [`Decoder`] trait.
#[derive(Debug)]
pub struct JsonDecoder {
    inner: json::reader::Decoder,
}

impl JsonDecoder {
    /// Wrap an arrow-json [`json::reader::Decoder`].
    pub fn new(decoder: json::reader::Decoder) -> Self {
        Self { inner: decoder }
    }
}

impl Decoder for JsonDecoder {
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.inner.decode(buf)
    }

    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        self.inner.flush()
    }

    fn can_flush_early(&self) -> bool {
        false
    }
}
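
// A minimal round-trip sketch, added for illustration and not part of the
// upstream module: serialize a batch with `JsonSerializer`, then feed the
// bytes back through `JsonDecoder`. All names below are local to the test.
#[cfg(test)]
mod round_trip_sketch {
    use super::*;
    use arrow::array::{ArrayRef, Int64Array};
    use arrow::datatypes::{DataType, Field};

    #[test]
    fn serialize_then_decode() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;

        // One JSON object per row, newline-delimited.
        let bytes = JsonSerializer::new().serialize(batch.clone(), true)?;

        // Rebuild a RecordBatch from the serialized bytes.
        let mut decoder =
            JsonDecoder::new(json::ReaderBuilder::new(schema).build_decoder()?);
        let _consumed = decoder.decode(&bytes)?;
        let decoded = decoder.flush()?.expect("expected one batch");
        assert_eq!(decoded.num_rows(), batch.num_rows());
        Ok(())
    }
}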