// datafusion_datasource_json/source.rs

use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::JsonDecoder;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
    as_file_source, calculate_range, ListingTableUrl, RangeCalculation,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_common::Statistics;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
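
/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`].
///
/// A minimal construction sketch (an addition, not part of the original file);
/// the schema, batch size, and in-memory object store are illustrative
/// assumptions:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use object_store::memory::InMemory;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
/// let opener = JsonOpener::new(
///     8192,                              // rows per decoded RecordBatch
///     schema,                            // projected schema used for decoding
///     FileCompressionType::UNCOMPRESSED, // no decompression required
///     Arc::new(InMemory::new()),         // any ObjectStore implementation
/// );
/// ```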
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl JsonOpener {
    /// Returns a [`JsonOpener`].
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
        }
    }
}
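
/// `JsonSource` holds the extra configuration needed to build a [`JsonOpener`]
/// for newline-delimited JSON scans.
///
/// A minimal configuration sketch (an addition, not part of the original
/// file); the batch size, `schema`, and unknown statistics are illustrative
/// assumptions:
///
/// ```ignore
/// let source: Arc<dyn FileSource> = JsonSource::new()
///     .with_batch_size(8192)
///     .with_statistics(Statistics::new_unknown(&schema));
/// assert_eq!(source.file_type(), "json");
/// ```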
#[derive(Clone, Default)]
pub struct JsonSource {
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl JsonSource {
    /// Initialize a `JsonSource` with default values.
    pub fn new() -> Self {
        Self::default()
    }
}

impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(source: JsonSource) -> Self {
        as_file_source(source)
    }
}
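
// A one-line sketch (an addition, not part of the original file) of the
// `From` conversion above, which wraps the source via `as_file_source`:
//
// ```ignore
// let file_source: Arc<dyn FileSource> = JsonSource::new().into();
// ```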

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating opener"),
            projected_schema: base_config.projected_file_schema(),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set before calling statistics()"))
    }

    fn file_type(&self) -> &str {
        "json"
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}
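
// A minimal sketch (an addition, not part of the original file) of driving the
// opener directly through the `FileOpener` trait. The `store`, `path`, and
// `schema` bindings are illustrative assumptions, and the conversion relies on
// `FileMeta` implementing `From<ObjectMeta>`:
//
// ```ignore
// let object_meta = store.head(&path).await?; // object_store::ObjectMeta
// let opener = JsonOpener::new(1024, schema, FileCompressionType::UNCOMPRESSED, store);
// let mut batches = opener.open(object_meta.into())?.await?;
// while let Some(batch) = batches.next().await {
//     println!("decoded {} rows", batch?.num_rows());
// }
// ```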

impl FileOpener for JsonOpener {
    /// Open a partitioned NDJSON file.
    ///
    /// If `file_meta.range` is `None`, the entire file is opened. Otherwise,
    /// `file_meta.range` defines the byte range `[start, end)` assigned to this
    /// partition, and [`calculate_range`] aligns it to line boundaries so that
    /// every line is decoded by exactly one partition.
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();

        Ok(Box::pin(async move {
            let calculated_range = calculate_range(&file_meta, &store, None).await?;

            // A partition whose adjusted range is empty yields an empty stream.
            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    // Local file: seek to the partition start and bound the
                    // reader to the partition length before decoding.
                    let bytes = match file_meta.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit as u64))?
                        }
                    };

                    let reader = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build(BufReader::new(bytes))?;

                    Ok(futures::stream::iter(reader).boxed())
                }
                GetResultPayload::Stream(s) => {
                    // Remote stream: decode incrementally as bytes arrive.
                    let s = s.map_err(DataFusionError::from);

                    let decoder = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build_decoder()?;
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}
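
/// Executes `plan` and writes each output partition as a line-delimited JSON
/// file named `part-<i>.json` under `path`.
///
/// A minimal usage sketch (an addition, not part of the original file); the
/// session context, query, and output URL are illustrative assumptions:
///
/// ```ignore
/// let task_ctx = session_ctx.task_ctx();
/// let df = session_ctx.sql("SELECT 1 AS a").await?;
/// let plan = df.create_physical_plan().await?;
/// plan_to_json(task_ctx, plan, "file:///tmp/json_out/").await?;
/// ```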
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);

            // Re-use one serialization buffer across batches: serialize a batch
            // to line-delimited JSON, flush it to the object store, then clear.
            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate any DataFusionError from the writer task
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}