// datafusion_datasource_json/source.rs

use std::any::Any;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::JsonDecoder;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_datasource::{
    ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range,
};
use datafusion_physical_plan::projection::ProjectionExprs;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

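/// A [`FileOpener`] that opens a newline-delimited JSON file and yields a
/// [`FileOpenFuture`] resolving to a stream of Arrow record batches.
///
/// A minimal construction sketch; the schema and the in-memory store below
/// are illustrative assumptions, not part of this crate:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_datasource::file_compression_type::FileCompressionType;
/// use object_store::memory::InMemory;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
/// let opener = JsonOpener::new(
///     8192,                              // batch size
///     schema,                            // schema projected to the read columns
///     FileCompressionType::UNCOMPRESSED, // no decompression required
///     Arc::new(InMemory::new()),         // any ObjectStore implementation
/// );
/// ```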
pub struct JsonOpener {
    batch_size: usize,
    projected_schema: SchemaRef,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl JsonOpener {
    /// Returns a [`JsonOpener`].
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            batch_size,
            projected_schema,
            file_compression_type,
            object_store,
        }
    }
}

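/// JsonSource holds the extra configuration needed for [`JsonOpener`].
///
/// A usage sketch: building the source and applying a batch size. It assumes
/// the file schema converts into the crate's `TableSchema` (the case for a
/// table without partition columns):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
/// // `From<JsonSource>` (below) erases the concrete type...
/// let source: Arc<dyn FileSource> = JsonSource::new(file_schema).into();
/// // ...and `with_batch_size` returns an updated copy.
/// let source = source.with_batch_size(8192);
/// ```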
#[derive(Clone)]
pub struct JsonSource {
    table_schema: datafusion_datasource::TableSchema,
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    projection: SplitProjection,
}

impl JsonSource {
    /// Initialize a JsonSource with default values.
    pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
            batch_size: None,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }
}

impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(source: JsonSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        // Read only the file columns named by the pushed-down projection.
        let file_schema = self.table_schema.file_schema();
        let projected_schema =
            Arc::new(file_schema.project(&self.projection.file_indices)?);

        let mut opener = Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating opener"),
            projected_schema,
            file_compression_type: base_config.file_compression_type,
            object_store,
        }) as Arc<dyn FileOpener>;

        // Wrap the base opener so any remaining projection expressions are
        // evaluated on top of the columns read from the file.
        opener = ProjectionOpener::try_new(
            self.projection.clone(),
            Arc::clone(&opener),
            self.table_schema.file_schema(),
        )?;

        Ok(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_schema(&self) -> &datafusion_datasource::TableSchema {
        &self.table_schema
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        let mut source = self.clone();
        let new_projection = self.projection.source.try_merge(projection)?;
        let split_projection =
            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
        source.projection = split_projection;
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "json"
    }
}

impl FileOpener for JsonOpener {
    /// Open a partitioned NDJSON file.
    ///
    /// If `partitioned_file.range` is `None`, the entire file is opened.
    /// Otherwise it is `Some(FileRange { start, end })`, which corresponds to
    /// the byte range [start, end) within the file.
    ///
    /// `start` or `end` might fall in the middle of a line. In that case the
    /// range is adjusted so that each line is read by exactly one partition:
    /// the partition starts at the first line beginning at or after `start`,
    /// and its last line is extended to the first newline after `end`.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        let schema = Arc::clone(&self.projected_schema);
        let batch_size = self.batch_size;
        let file_compression_type = self.file_compression_type.to_owned();

        Ok(Box::pin(async move {
            let calculated_range =
                calculate_range(&partitioned_file, &store, None).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                // The assigned byte range contains no complete lines, so this
                // partition yields an empty stream.
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    );
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store
                .get_opts(&partitioned_file.object_meta.location, options)
                .await?;

            match result.payload {
                // Local files are read synchronously, seeking to the start of
                // the resolved range and limiting the read to its length.
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let bytes = match partitioned_file.range {
                        None => file_compression_type.convert_read(file)?,
                        Some(_) => {
                            file.seek(SeekFrom::Start(result.range.start as _))?;
                            let limit = result.range.end - result.range.start;
                            file_compression_type.convert_read(file.take(limit as u64))?
                        }
                    };

                    let reader = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build(BufReader::new(bytes))?;

                    Ok(futures::stream::iter(reader)
                        .map(|r| r.map_err(Into::into))
                        .boxed())
                }
                // Remote objects arrive as a byte stream and are decoded
                // incrementally as bytes become available.
                GetResultPayload::Stream(s) => {
                    let s = s.map_err(DataFusionError::from);

                    let decoder = ReaderBuilder::new(schema)
                        .with_batch_size(batch_size)
                        .build_decoder()?;
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    let stream = deserialize_stream(
                        input,
                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
                    );
                    Ok(stream.map_err(Into::into).boxed())
                }
            }
        }))
    }
}
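
/// Executes `plan` and writes its output to `path` as newline-delimited JSON,
/// creating one `part-{i}.json` object per output partition.
///
/// A hedged usage sketch (assumes a default `TaskContext` whose runtime has a
/// store registered for the target URL; `my_plan` stands in for any
/// `ExecutionPlan`):
///
/// ```ignore
/// let task_ctx = Arc::new(TaskContext::default());
/// plan_to_json(task_ctx, my_plan, "file:///tmp/json_out").await?;
/// ```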
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);

            // Serialize each batch into a reusable local buffer before
            // handing it to the buffered object-store writer.
            let mut buffer = Vec::with_capacity(1024);
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = json::LineDelimitedWriter::new(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
            }

            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}
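
#[cfg(test)]
mod usage_sketch {
    //! A hedged end-to-end sketch (not from the original file): write two
    //! NDJSON rows into an in-memory store, open them with `JsonOpener`, and
    //! collect the decoded batches. The store contents, the sample schema,
    //! and the availability of `#[tokio::test]` are assumptions made for
    //! illustration.

    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use object_store::memory::InMemory;
    use object_store::{PutPayload, path::Path};

    #[tokio::test]
    async fn open_single_json_file() -> Result<()> {
        // Two rows of line-delimited JSON, 18 bytes in total.
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let data = PutPayload::from_static(b"{\"a\": 1}\n{\"a\": 2}\n");
        store.put(&Path::from("data.json"), data).await?;

        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        let opener = JsonOpener::new(
            1024,
            schema,
            FileCompressionType::UNCOMPRESSED,
            Arc::clone(&store),
        );

        // `PartitionedFile::new` takes the object path and its size in bytes;
        // no byte range is set, so the whole file is scanned.
        let stream = opener.open(PartitionedFile::new("data.json", 18))?.await?;
        let batches: Vec<_> = stream.try_collect().await?;
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
        Ok(())
    }
}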