1use super::{CompressionType, DataSource};
4use crate::prelude::*;
5use async_trait::async_trait;
6use datafusion::arrow::datatypes::Schema;
7use datafusion::datasource::file_format::json::JsonFormat;
8use datafusion::datasource::listing::{
9 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
10};
11use datafusion::prelude::*;
12use std::sync::Arc;
13use tracing::instrument;
14
/// Distinguishes newline-delimited JSON from a regular (array/object) JSON
/// document, based on the file extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonFormatType {
    /// Newline-delimited JSON (`.ndjson` / `.jsonl`): one JSON value per line.
    NdJson,
    /// Regular JSON document (`.json`), e.g. a top-level array of objects.
    Json,
}

impl JsonFormatType {
    /// Detects the format from a file path, case-insensitively.
    ///
    /// A single trailing compression extension (`.gz`, `.zst`, `.bz2`) is
    /// peeled off first, so `data.ndjson.gz` is still detected as NDJSON.
    /// Anything that does not end in `.ndjson`/`.jsonl` defaults to
    /// [`JsonFormatType::Json`].
    pub fn from_path(path: &str) -> Self {
        let lower = path.to_lowercase();

        // Strip at most one compression suffix; `strip_suffix` avoids the
        // manual `rfind`-based byte slicing of the index approach.
        let stem = lower
            .strip_suffix(".gz")
            .or_else(|| lower.strip_suffix(".zst"))
            .or_else(|| lower.strip_suffix(".bz2"))
            .unwrap_or(&lower);

        if stem.ends_with(".ndjson") || stem.ends_with(".jsonl") {
            Self::NdJson
        } else {
            Self::Json
        }
    }
}
43
/// Configuration for reading JSON/NDJSON data.
#[derive(Debug, Clone)]
pub struct JsonOptions {
    /// Whether the input is newline-delimited (NDJSON) or a regular JSON document.
    pub format: JsonFormatType,
    /// Explicit schema; when `None`, the schema is inferred from the data.
    pub schema: Option<Arc<Schema>>,
    /// Compression handling; `Auto` detects from the file extension.
    /// NOTE(review): the registration path below does not yet consume this
    /// field — confirm where decompression is applied.
    pub compression: CompressionType,
    /// Maximum number of records to scan when inferring a schema.
    pub schema_infer_max_records: usize,
}
56
57impl Default for JsonOptions {
58 fn default() -> Self {
59 Self {
60 format: JsonFormatType::NdJson,
61 schema: None,
62 compression: CompressionType::Auto,
63 schema_infer_max_records: 1000,
64 }
65 }
66}
67
/// A [`DataSource`] over one or more JSON/NDJSON files.
#[derive(Debug, Clone)]
pub struct JsonSource {
    // File paths; constructors guarantee at least one entry.
    paths: Vec<String>,
    // Read options (format, schema, compression, inference limit).
    options: JsonOptions,
    // Cache slot for a previously inferred schema.
    // NOTE(review): nothing in this chunk ever writes `Some` here — confirm
    // whether it is populated elsewhere or is dead state.
    inferred_schema: Option<Arc<Schema>>,
}
97
98impl JsonSource {
99 pub fn new(path: impl Into<String>) -> Result<Self> {
101 let path_str = path.into();
102 let format = JsonFormatType::from_path(&path_str);
103
104 Ok(Self {
105 paths: vec![path_str],
106 options: JsonOptions {
107 format,
108 ..Default::default()
109 },
110 inferred_schema: None,
111 })
112 }
113
114 pub fn with_options(path: impl Into<String>, options: JsonOptions) -> Result<Self> {
116 Ok(Self {
117 paths: vec![path.into()],
118 options,
119 inferred_schema: None,
120 })
121 }
122
123 pub fn from_paths(paths: Vec<String>) -> Result<Self> {
125 if paths.is_empty() {
126 return Err(TermError::Configuration(
127 "At least one path must be provided".to_string(),
128 ));
129 }
130
131 let format = JsonFormatType::from_path(&paths[0]);
133
134 Ok(Self {
135 paths,
136 options: JsonOptions {
137 format,
138 ..Default::default()
139 },
140 inferred_schema: None,
141 })
142 }
143
144 pub async fn from_glob(pattern: impl Into<String>) -> Result<Self> {
146 let patterns = vec![pattern.into()];
147 let paths = super::expand_globs(&patterns).await?;
148 Self::from_paths(paths)
149 }
150
151 pub async fn from_globs(patterns: Vec<String>) -> Result<Self> {
153 let paths = super::expand_globs(&patterns).await?;
154 Self::from_paths(paths)
155 }
156
157 pub fn with_custom_options(mut self, options: JsonOptions) -> Self {
159 self.options = options;
160 self
161 }
162
163 #[instrument(skip(self))]
165 async fn infer_schema(&self) -> Result<Arc<Schema>> {
166 if let Some(schema) = &self.options.schema {
167 return Ok(schema.clone());
168 }
169
170 if let Some(schema) = &self.inferred_schema {
171 return Ok(schema.clone());
172 }
173
174 let ctx = SessionContext::new();
176
177 let first_path = &self.paths[0];
179 let schema = if self.options.format == JsonFormatType::NdJson {
180 if first_path.ends_with(".json") {
182 let mut options = NdJsonReadOptions::default();
183 options.schema_infer_max_records = self.options.schema_infer_max_records;
184 let df = ctx.read_json(first_path, options).await?;
185 df.schema().inner().clone()
186 } else {
187 let mut options = NdJsonReadOptions::default();
189 options.schema_infer_max_records = self.options.schema_infer_max_records;
190
191 match ctx.read_json(first_path, options).await {
193 Ok(df) => df.schema().inner().clone(),
194 Err(_) => {
195 return Err(TermError::DataSource {
198 source_type: "JSON".to_string(),
199 message: "Unable to infer schema from NDJSON file. Please provide an explicit schema.".to_string(),
200 source: None,
201 });
202 }
203 }
204 }
205 } else {
206 return Err(TermError::NotSupported(
210 "Regular JSON format is not yet supported. Please use NDJSON format.".to_string(),
211 ));
212 };
213
214 Ok(schema)
215 }
216}
217
#[async_trait]
impl DataSource for JsonSource {
    /// Registers this source's file(s) as `table_name` on `ctx`, optionally
    /// recording a datasource span on `telemetry`.
    ///
    /// Single-file sources are registered directly. Multi-file sources are
    /// registered as per-file temporary tables, unioned into one view, and
    /// the temporaries deregistered afterwards.
    #[instrument(skip(self, ctx, telemetry), fields(table_name = %table_name, source_type = "json", file_count = self.paths.len()))]
    async fn register_with_telemetry(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        telemetry: Option<&Arc<TermTelemetry>>,
    ) -> Result<()> {
        // Span lives for the whole registration; a no-op span keeps the
        // code path uniform when telemetry is disabled.
        let mut _datasource_span = if let Some(tel) = telemetry {
            tel.start_datasource_span("json", table_name)
        } else {
            TermSpan::noop()
        };
        if self.paths.len() == 1 {
            let path = &self.paths[0];

            // NDJSON content stored under a plain ".json" extension: the
            // default NDJSON reader options already match that extension.
            if path.ends_with(".json") && self.options.format == JsonFormatType::NdJson {
                let mut options = NdJsonReadOptions::default();
                options.schema = self.options.schema.as_deref();
                options.schema_infer_max_records = self.options.schema_infer_max_records;

                ctx.register_json(table_name, path, options).await?;
            } else if path.ends_with(".ndjson") || path.ends_with(".jsonl") {
                // ".ndjson"/".jsonl" need an explicit listing table whose
                // extension filter matches the actual file suffix.
                let table_path = ListingTableUrl::parse(path)?;

                let extension = if path.ends_with(".ndjson") {
                    ".ndjson"
                } else {
                    ".jsonl"
                };

                let format = JsonFormat::default();
                let listing_options =
                    ListingOptions::new(Arc::new(format)).with_file_extension(extension);

                // Prefer a caller-provided schema; otherwise let DataFusion
                // infer one from the file contents.
                let config = if let Some(schema) = &self.options.schema {
                    ListingTableConfig::new(table_path)
                        .with_listing_options(listing_options)
                        .with_schema(schema.clone())
                } else {
                    ListingTableConfig::new(table_path)
                        .with_listing_options(listing_options)
                        .infer_schema(&ctx.state())
                        .await?
                };

                let table = ListingTable::try_new(config)?;
                ctx.register_table(table_name, Arc::new(table))?;
            } else {
                // Any other extension implies regular JSON, which is not
                // supported yet. NOTE(review): compressed NDJSON paths
                // (e.g. ".ndjson.gz") also land here — confirm intended.
                return Err(TermError::NotSupported(
                    "Regular JSON format is not yet supported. Please use NDJSON format."
                        .to_string(),
                ));
            }
        } else {
            // Multi-file: resolve one schema up front so every temp table
            // (and therefore the UNION ALL) agrees on column types.
            let schema = if let Some(schema) = &self.options.schema {
                schema.clone()
            } else {
                self.infer_schema().await?
            };

            // Register each file under a collision-resistant temp name.
            let mut table_names = Vec::new();
            for (i, path) in self.paths.iter().enumerate() {
                let temp_table_name = format!("__{table_name}_temp_{i}");

                if path.ends_with(".json") && self.options.format == JsonFormatType::NdJson {
                    let mut options = NdJsonReadOptions::default();
                    options.schema = Some(&schema);
                    options.schema_infer_max_records = self.options.schema_infer_max_records;
                    ctx.register_json(&temp_table_name, path, options).await?;
                } else if path.ends_with(".ndjson") || path.ends_with(".jsonl") {
                    let table_path = ListingTableUrl::parse(path)?;

                    let extension = if path.ends_with(".ndjson") {
                        ".ndjson"
                    } else {
                        ".jsonl"
                    };

                    let format = JsonFormat::default();
                    let listing_options =
                        ListingOptions::new(Arc::new(format)).with_file_extension(extension);

                    // Unlike the single-file case, always pin the shared
                    // schema here — no per-file inference.
                    let config = ListingTableConfig::new(table_path)
                        .with_listing_options(listing_options)
                        .with_schema(schema.clone());

                    let table = ListingTable::try_new(config)?;
                    ctx.register_table(&temp_table_name, Arc::new(table))?;
                } else {
                    return Err(TermError::NotSupported(
                        "Regular JSON format is not yet supported. Please use NDJSON format."
                            .to_string(),
                    ));
                }

                table_names.push(temp_table_name);
            }

            if !table_names.is_empty() {
                // Combine all temp tables into a single logical view.
                let union_sql = table_names
                    .iter()
                    .map(|name| format!("SELECT * FROM {name}"))
                    .collect::<Vec<_>>()
                    .join(" UNION ALL ");

                let df = ctx.sql(&union_sql).await?;
                ctx.register_table(table_name, df.into_view())?;

                // NOTE(review): assumes the view's plan holds resolved table
                // providers, so dropping the temp names afterwards is safe —
                // confirm against the DataFusion version in use.
                for temp_name in table_names {
                    ctx.deregister_table(&temp_name)?;
                }
            }
        }

        Ok(())
    }

    /// The explicit schema if set, else any cached inferred schema.
    fn schema(&self) -> Option<&Arc<Schema>> {
        self.options
            .schema
            .as_ref()
            .or(self.inferred_schema.as_ref())
    }

    /// Human-readable summary, e.g. `NDJSON file: /tmp/x.ndjson` or
    /// `NDJSON files: 3 files`.
    fn description(&self) -> String {
        let format_str = match self.options.format {
            JsonFormatType::NdJson => "NDJSON",
            JsonFormatType::Json => "JSON",
        };

        if self.paths.len() == 1 {
            let path = &self.paths[0];
            format!("{format_str} file: {path}")
        } else {
            let count = self.paths.len();
            format!("{format_str} files: {count} files")
        }
    }
}
373
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes three NDJSON records to a temp file with a ".ndjson" suffix.
    fn create_test_ndjson() -> NamedTempFile {
        let mut file = NamedTempFile::with_suffix(".ndjson").unwrap();
        writeln!(file, r#"{{"id": 1, "name": "Alice", "age": 30}}"#).unwrap();
        writeln!(file, r#"{{"id": 2, "name": "Bob", "age": 25}}"#).unwrap();
        writeln!(file, r#"{{"id": 3, "name": "Charlie", "age": 35}}"#).unwrap();
        file.flush().unwrap();
        file
    }

    /// Writes a regular JSON array document to a temp file with a ".json" suffix.
    fn create_test_json() -> NamedTempFile {
        let mut file = NamedTempFile::with_suffix(".json").unwrap();
        writeln!(
            file,
            r#"[
    {{"id": 1, "name": "Alice", "age": 30}},
    {{"id": 2, "name": "Bob", "age": 25}},
    {{"id": 3, "name": "Charlie", "age": 35}}
]"#
        )
        .unwrap();
        file.flush().unwrap();
        file
    }

    #[test]
    fn test_format_detection() {
        assert_eq!(JsonFormatType::from_path("data.json"), JsonFormatType::Json);
        assert_eq!(
            JsonFormatType::from_path("data.ndjson"),
            JsonFormatType::NdJson
        );
        assert_eq!(
            JsonFormatType::from_path("data.jsonl"),
            JsonFormatType::NdJson
        );
        // Compression suffixes must not mask the underlying format.
        assert_eq!(
            JsonFormatType::from_path("data.json.gz"),
            JsonFormatType::Json
        );
        assert_eq!(
            JsonFormatType::from_path("data.ndjson.gz"),
            JsonFormatType::NdJson
        );
    }

    #[tokio::test]
    async fn test_json_source_single_file() {
        let file = create_test_ndjson();
        let source = JsonSource::new(file.path().to_str().unwrap()).unwrap();

        assert_eq!(source.paths.len(), 1);
        assert_eq!(source.options.format, JsonFormatType::NdJson);
        assert!(source.description().contains("NDJSON file"));
    }

    #[tokio::test]
    async fn test_json_source_with_options() {
        let file = create_test_json();
        let options = JsonOptions {
            format: JsonFormatType::Json,
            schema_infer_max_records: 500,
            ..Default::default()
        };

        let source = JsonSource::with_options(file.path().to_str().unwrap(), options).unwrap();
        assert_eq!(source.options.format, JsonFormatType::Json);
        assert_eq!(source.options.schema_infer_max_records, 500);
    }

    #[tokio::test]
    async fn test_json_source_multiple_files() {
        let file1 = create_test_ndjson();
        let file2 = create_test_ndjson();

        let paths = vec![
            file1.path().to_str().unwrap().to_string(),
            file2.path().to_str().unwrap().to_string(),
        ];

        let source = JsonSource::from_paths(paths).unwrap();
        assert_eq!(source.paths.len(), 2);
        assert!(source.description().contains("2 files"));
    }

    #[tokio::test]
    async fn test_ndjson_registration() {
        // Use DataFusion's re-exported arrow rather than the standalone
        // `arrow` crate, so the Schema type is guaranteed to match the one
        // expected by JsonOptions (version-mismatched arrow crates produce
        // incompatible types).
        use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};

        let file = create_test_ndjson();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int64, false),
        ]));

        let options = JsonOptions {
            schema: Some(schema),
            ..Default::default()
        };

        let source = JsonSource::with_options(file.path().to_str().unwrap(), options).unwrap();

        let ctx = SessionContext::new();
        source.register(&ctx, "test_table").await.unwrap();

        // End-to-end check: the registered table is queryable.
        let df = ctx
            .sql("SELECT COUNT(*) as count FROM test_table")
            .await
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert!(!batches.is_empty());
    }
}
495}