// datafusion_datasource_json/source.rs

use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::JsonDecoder;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
    as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
    TableSchema,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_common::Statistics;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

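/// A [`FileOpener`] that opens newline-delimited JSON files and produces a
/// stream of record batches decoded to `projected_schema`.
///
/// A minimal construction sketch; the in-memory object store and example
/// schema are assumptions for illustration only:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_datasource::file_compression_type::FileCompressionType;
/// use object_store::memory::InMemory;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
/// let opener = JsonOpener::new(
///     8192,                              // batch size
///     schema,                            // projected schema
///     FileCompressionType::UNCOMPRESSED, // no decompression needed
///     Arc::new(InMemory::new()),         // any ObjectStore implementation
/// );
/// ```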
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl JsonOpener {
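    /// Creates a new [`JsonOpener`].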
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
        }
    }
}

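/// A [`FileSource`] for newline-delimited JSON: holds the per-scan
/// configuration (batch size, projected statistics, schema adapter) used to
/// build a [`JsonOpener`] for each partition.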
#[derive(Clone, Default)]
pub struct JsonSource {
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl JsonSource {
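    /// Creates a [`JsonSource`] with default values.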
    pub fn new() -> Self {
        Self::default()
    }
}

impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(source: JsonSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating the opener"),
            projected_schema: base_config.projected_file_schema(),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set before calling statistics"))
    }

    fn file_type(&self) -> &str {
        "json"
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

impl FileOpener for JsonOpener {
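    /// Opens a partitioned newline-delimited JSON file.
    ///
    /// If `partitioned_file.range` is `None`, the entire file is read.
    /// Otherwise only the byte range computed by [`calculate_range`] is
    /// read, so that each partition sees whole, record-aligned lines.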
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();

        Ok(Box::pin(async move {
            let calculated_range =
                calculate_range(&partitioned_file, &store, None).await?;

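            // `TerminateEarly` means there is nothing to read for this
            // partition, so return an empty stream immediately.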
            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store
                .get_opts(&partitioned_file.object_meta.location, options)
                .await?;

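            // Local files are read synchronously through a `BufReader`;
            // remote objects arrive as a byte stream and are decoded
            // incrementally.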
            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match partitioned_file.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit as u64))?
                        }
                    };

                    let reader = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build(BufReader::new(bytes))?;

                    Ok(futures::stream::iter(reader)
                        .map(|r| r.map_err(Into::into))
                        .boxed())
                }
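                // Streaming path: decompressed bytes are fed to an
                // incremental JSON decoder via `deserialize_stream`.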
                GetResultPayload::Stream(s) => {
                    let s = s.map_err(DataFusionError::from);

                    let decoder = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build_decoder()?;
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    let stream = deserialize_stream(
                        input,
                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
                    );
                    Ok(stream.map_err(Into::into).boxed())
                }
            }
        }))
    }
}

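/// Executes `plan` and writes its output to `path` as newline-delimited
/// JSON, producing one `part-{i}.json` file per output partition.
///
/// A usage sketch, assuming a `SessionContext` named `ctx` with a table `t`
/// already registered:
///
/// ```ignore
/// let df = ctx.sql("SELECT * FROM t").await?;
/// let plan = df.create_physical_plan().await?;
/// plan_to_json(ctx.task_ctx(), plan, "file:///tmp/json_out").await?;
/// ```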
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
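    // Spawn one writer task per output partition; each partition is written
    // to its own file.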
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
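        // Each batch is encoded into a reusable in-memory buffer, then
        // flushed through a buffered object-store writer.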
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);

            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

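    // Wait for every writer task, propagating errors and resuming panics.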
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}