// datafusion_datasource_json/source.rs

use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::JsonDecoder;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
    as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_common::Statistics;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

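/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]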
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl JsonOpener {
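    /// Returns a [`JsonOpener`]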
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
        }
    }
}

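/// A [`FileSource`] for scanning newline-delimited JSON files.
///
/// A minimal configuration sketch (the surrounding scan setup is assumed and
/// not shown; `with_batch_size` comes from the [`FileSource`] impl below):
///
/// ```ignore
/// let source: Arc<dyn FileSource> = JsonSource::new().with_batch_size(8192);
/// ```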
#[derive(Clone, Default)]
pub struct JsonSource {
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl JsonSource {
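    /// Initialize a JsonSource with default values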
    pub fn new() -> Self {
        Self::default()
    }
}

impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(source: JsonSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating opener"),
            projected_schema: base_config.projected_file_schema(),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set before calling statistics()"))
    }

    fn file_type(&self) -> &str {
        "json"
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

impl FileOpener for JsonOpener {
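    /// Open a partitioned NDJSON file.
    ///
    /// If `file_meta.range` is `None`, the entire file is opened.
    /// Else `file_meta.range` defines the byte range [start, end) to read.
    ///
    /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
    /// are applied to determine which lines to read:
    /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
    /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.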
    fn open(
        &self,
        file_meta: FileMeta,
        _file: PartitionedFile,
    ) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();

        Ok(Box::pin(async move {
            let calculated_range = calculate_range(&file_meta, &store, None).await?;

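            // `calculate_range` aligns the assigned byte range to line
            // boundaries; `TerminateEarly` means nothing in the file falls to
            // this partition, so return an empty stream.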
            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

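            // Local files can be read (and seeked) directly; remote objects
            // arrive as a byte stream and are decoded incrementally.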
            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match file_meta.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit as u64))?
                        }
                    };

                    let reader = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build(BufReader::new(bytes))?;

                    Ok(futures::stream::iter(reader).boxed())
                }
                GetResultPayload::Stream(s) => {
                    let s = s.map_err(DataFusionError::from);

                    let decoder = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build_decoder()?;
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

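/// Executes `plan` and streams each output partition to a newline-delimited
/// JSON object named `part-{i}.json` under `path`.
///
/// A minimal usage sketch; the `task_ctx` and `plan` values are assumed to
/// come from an already-built session and are not defined here:
///
/// ```ignore
/// plan_to_json(task_ctx, plan, "file:///tmp/json_out").await?;
/// ```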
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);

            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}