term_guard/sources/
json.rs

1//! JSON and NDJSON file source implementation.
2
3use super::{CompressionType, DataSource};
4use crate::prelude::*;
5use async_trait::async_trait;
6use datafusion::arrow::datatypes::Schema;
7use datafusion::datasource::file_format::json::JsonFormat;
8use datafusion::datasource::listing::{
9    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
10};
11use datafusion::prelude::*;
12use std::sync::Arc;
13use tracing::instrument;
14
/// Layout of the JSON content stored in a file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonFormatType {
    /// Line-delimited JSON: one JSON object per line.
    NdJson,
    /// A single JSON document (one object or an array).
    Json,
}

impl JsonFormatType {
    /// Classifies `path` by its file extension.
    ///
    /// The comparison is case-insensitive. One trailing compression suffix
    /// (`.gz`, `.zst` or `.bz2`) is ignored first, so `events.NDJSON.gz` is
    /// still classified by its `.ndjson` part. Paths ending in `.ndjson` or
    /// `.jsonl` yield [`JsonFormatType::NdJson`]; every other path yields
    /// [`JsonFormatType::Json`].
    pub fn from_path(path: &str) -> Self {
        const COMPRESSION_SUFFIXES: [&str; 3] = [".gz", ".zst", ".bz2"];
        const LINE_DELIMITED_SUFFIXES: [&str; 2] = [".ndjson", ".jsonl"];

        let lowered = path.to_lowercase();

        // At most one compression suffix can match, so the first hit is it.
        let stem = COMPRESSION_SUFFIXES
            .iter()
            .find_map(|suffix| lowered.strip_suffix(*suffix))
            .unwrap_or(lowered.as_str());

        let is_line_delimited = LINE_DELIMITED_SUFFIXES
            .iter()
            .any(|suffix| stem.ends_with(*suffix));

        if is_line_delimited {
            Self::NdJson
        } else {
            Self::Json
        }
    }
}
43
44/// Options for configuring JSON file reading.
45#[derive(Debug, Clone)]
46pub struct JsonOptions {
47    /// JSON format type
48    pub format: JsonFormatType,
49    /// Schema to use (if None, will be inferred)
50    pub schema: Option<Arc<Schema>>,
51    /// Compression type (default: Auto)
52    pub compression: CompressionType,
53    /// Maximum records to read for schema inference
54    pub schema_infer_max_records: usize,
55}
56
57impl Default for JsonOptions {
58    fn default() -> Self {
59        Self {
60            format: JsonFormatType::NdJson,
61            schema: None,
62            compression: CompressionType::Auto,
63            schema_infer_max_records: 1000,
64        }
65    }
66}
67
68/// A JSON/NDJSON file data source with schema inference and compression support.
69///
70/// # Examples
71///
72/// ```rust,ignore
73/// use term_guard::sources::{JsonSource, JsonOptions, JsonFormatType};
74///
75/// # async fn example() -> Result<()> {
76/// // NDJSON file (auto-detected)
77/// let source = JsonSource::new("data/events.ndjson")?;
78///
79/// // Regular JSON with custom options
80/// let options = JsonOptions {
81///     format: JsonFormatType::Json,
82///     ..Default::default()
83/// };
84/// let source = JsonSource::with_options("data/config.json", options)?;
85///
86/// // Compressed NDJSON files with glob
87/// let source = JsonSource::from_glob("logs/*.jsonl.gz").await?;
88/// # Ok(())
89/// # }
90/// ```
91#[derive(Debug, Clone)]
92pub struct JsonSource {
93    paths: Vec<String>,
94    options: JsonOptions,
95    inferred_schema: Option<Arc<Schema>>,
96}
97
98impl JsonSource {
99    /// Creates a new JSON source from a single file path.
100    pub fn new(path: impl Into<String>) -> Result<Self> {
101        let path_str = path.into();
102        let format = JsonFormatType::from_path(&path_str);
103
104        Ok(Self {
105            paths: vec![path_str],
106            options: JsonOptions {
107                format,
108                ..Default::default()
109            },
110            inferred_schema: None,
111        })
112    }
113
114    /// Creates a new JSON source with custom options.
115    pub fn with_options(path: impl Into<String>, options: JsonOptions) -> Result<Self> {
116        Ok(Self {
117            paths: vec![path.into()],
118            options,
119            inferred_schema: None,
120        })
121    }
122
123    /// Creates a JSON source from multiple file paths.
124    pub fn from_paths(paths: Vec<String>) -> Result<Self> {
125        if paths.is_empty() {
126            return Err(TermError::Configuration(
127                "At least one path must be provided".to_string(),
128            ));
129        }
130
131        // Auto-detect format from first file
132        let format = JsonFormatType::from_path(&paths[0]);
133
134        Ok(Self {
135            paths,
136            options: JsonOptions {
137                format,
138                ..Default::default()
139            },
140            inferred_schema: None,
141        })
142    }
143
144    /// Creates a JSON source from a glob pattern.
145    pub async fn from_glob(pattern: impl Into<String>) -> Result<Self> {
146        let patterns = vec![pattern.into()];
147        let paths = super::expand_globs(&patterns).await?;
148        Self::from_paths(paths)
149    }
150
151    /// Creates a JSON source from multiple glob patterns.
152    pub async fn from_globs(patterns: Vec<String>) -> Result<Self> {
153        let paths = super::expand_globs(&patterns).await?;
154        Self::from_paths(paths)
155    }
156
157    /// Sets custom options for this JSON source.
158    pub fn with_custom_options(mut self, options: JsonOptions) -> Self {
159        self.options = options;
160        self
161    }
162
163    /// Infers schema from the JSON files.
164    #[instrument(skip(self))]
165    async fn infer_schema(&self) -> Result<Arc<Schema>> {
166        if let Some(schema) = &self.options.schema {
167            return Ok(schema.clone());
168        }
169
170        if let Some(schema) = &self.inferred_schema {
171            return Ok(schema.clone());
172        }
173
174        // Create a temporary context for schema inference
175        let ctx = SessionContext::new();
176
177        // For JSON, infer schema by reading the first file
178        let first_path = &self.paths[0];
179        let schema = if self.options.format == JsonFormatType::NdJson {
180            // For NDJSON, we need to handle different extensions
181            if first_path.ends_with(".json") {
182                let mut options = NdJsonReadOptions::default();
183                options.schema_infer_max_records = self.options.schema_infer_max_records;
184                let df = ctx.read_json(first_path, options).await?;
185                df.schema().inner().clone()
186            } else {
187                // For .ndjson or .jsonl files, try to read as NDJSON with read_json
188                let mut options = NdJsonReadOptions::default();
189                options.schema_infer_max_records = self.options.schema_infer_max_records;
190
191                // Try to read the file directly to infer schema
192                match ctx.read_json(first_path, options).await {
193                    Ok(df) => df.schema().inner().clone(),
194                    Err(_) => {
195                        // If that fails, fall back to creating a minimal schema
196                        // This is a workaround for the permission issue
197                        return Err(TermError::DataSource {
198                            source_type: "JSON".to_string(),
199                            message: "Unable to infer schema from NDJSON file. Please provide an explicit schema.".to_string(),
200                            source: None,
201                        });
202                    }
203                }
204            }
205        } else {
206            // For regular JSON, we need a different approach
207            // DataFusion doesn't have built-in regular JSON support
208            // So we'll just return an error for now
209            return Err(TermError::NotSupported(
210                "Regular JSON format is not yet supported. Please use NDJSON format.".to_string(),
211            ));
212        };
213
214        Ok(schema)
215    }
216}
217
218#[async_trait]
219impl DataSource for JsonSource {
220    #[instrument(skip(self, ctx, telemetry), fields(table_name = %table_name, source_type = "json", file_count = self.paths.len()))]
221    async fn register_with_telemetry(
222        &self,
223        ctx: &SessionContext,
224        table_name: &str,
225        telemetry: Option<&Arc<TermTelemetry>>,
226    ) -> Result<()> {
227        // Create telemetry span for data source loading
228        let mut _datasource_span = if let Some(tel) = telemetry {
229            tel.start_datasource_span("json", table_name)
230        } else {
231            TermSpan::noop()
232        };
233        // Handle single vs multiple paths
234        if self.paths.len() == 1 {
235            let path = &self.paths[0];
236
237            // For single NDJSON files with .json extension, use register_json
238            if path.ends_with(".json") && self.options.format == JsonFormatType::NdJson {
239                let mut options = NdJsonReadOptions::default();
240                options.schema = self.options.schema.as_deref();
241                options.schema_infer_max_records = self.options.schema_infer_max_records;
242
243                ctx.register_json(table_name, path, options).await?;
244            } else if path.ends_with(".ndjson") || path.ends_with(".jsonl") {
245                // For .ndjson/.jsonl files, create a ListingTable with the specific file
246                let table_path = ListingTableUrl::parse(path)?;
247
248                // Use the actual file extension
249                let extension = if path.ends_with(".ndjson") {
250                    ".ndjson"
251                } else {
252                    ".jsonl"
253                };
254
255                let format = JsonFormat::default();
256                let listing_options =
257                    ListingOptions::new(Arc::new(format)).with_file_extension(extension);
258
259                // Create config with schema if provided
260                let config = if let Some(schema) = &self.options.schema {
261                    ListingTableConfig::new(table_path)
262                        .with_listing_options(listing_options)
263                        .with_schema(schema.clone())
264                } else {
265                    ListingTableConfig::new(table_path)
266                        .with_listing_options(listing_options)
267                        .infer_schema(&ctx.state())
268                        .await?
269                };
270
271                let table = ListingTable::try_new(config)?;
272                ctx.register_table(table_name, Arc::new(table))?;
273            } else {
274                // For regular JSON files
275                return Err(TermError::NotSupported(
276                    "Regular JSON format is not yet supported. Please use NDJSON format."
277                        .to_string(),
278                ));
279            }
280        } else {
281            // Multiple files - register each file separately and create a union
282            // First, infer or get the schema
283            let schema = if let Some(schema) = &self.options.schema {
284                schema.clone()
285            } else {
286                // Infer schema from the first file
287                self.infer_schema().await?
288            };
289
290            // Register each file as a separate table
291            let mut table_names = Vec::new();
292            for (i, path) in self.paths.iter().enumerate() {
293                let temp_table_name = format!("__{table_name}_temp_{i}");
294
295                if path.ends_with(".json") && self.options.format == JsonFormatType::NdJson {
296                    let mut options = NdJsonReadOptions::default();
297                    options.schema = Some(&schema);
298                    options.schema_infer_max_records = self.options.schema_infer_max_records;
299                    ctx.register_json(&temp_table_name, path, options).await?;
300                } else if path.ends_with(".ndjson") || path.ends_with(".jsonl") {
301                    // For .ndjson/.jsonl files, create a ListingTable with the specific file
302                    let table_path = ListingTableUrl::parse(path)?;
303
304                    let extension = if path.ends_with(".ndjson") {
305                        ".ndjson"
306                    } else {
307                        ".jsonl"
308                    };
309
310                    let format = JsonFormat::default();
311                    let listing_options =
312                        ListingOptions::new(Arc::new(format)).with_file_extension(extension);
313
314                    let config = ListingTableConfig::new(table_path)
315                        .with_listing_options(listing_options)
316                        .with_schema(schema.clone());
317
318                    let table = ListingTable::try_new(config)?;
319                    ctx.register_table(&temp_table_name, Arc::new(table))?;
320                } else {
321                    return Err(TermError::NotSupported(
322                        "Regular JSON format is not yet supported. Please use NDJSON format."
323                            .to_string(),
324                    ));
325                }
326
327                table_names.push(temp_table_name);
328            }
329
330            // Create a union of all the temporary tables
331            if !table_names.is_empty() {
332                let union_sql = table_names
333                    .iter()
334                    .map(|name| format!("SELECT * FROM {name}"))
335                    .collect::<Vec<_>>()
336                    .join(" UNION ALL ");
337
338                let df = ctx.sql(&union_sql).await?;
339                ctx.register_table(table_name, df.into_view())?;
340
341                // Clean up temporary tables
342                for temp_name in table_names {
343                    ctx.deregister_table(&temp_name)?;
344                }
345            }
346        }
347
348        Ok(())
349    }
350
351    fn schema(&self) -> Option<&Arc<Schema>> {
352        self.options
353            .schema
354            .as_ref()
355            .or(self.inferred_schema.as_ref())
356    }
357
358    fn description(&self) -> String {
359        let format_str = match self.options.format {
360            JsonFormatType::NdJson => "NDJSON",
361            JsonFormatType::Json => "JSON",
362        };
363
364        if self.paths.len() == 1 {
365            let path = &self.paths[0];
366            format!("{format_str} file: {path}")
367        } else {
368            let count = self.paths.len();
369            format!("{format_str} files: {count} files")
370        }
371    }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes three NDJSON records to a temporary `.ndjson` file.
    fn create_test_ndjson() -> NamedTempFile {
        let mut file = NamedTempFile::with_suffix(".ndjson").unwrap();
        writeln!(file, r#"{{"id": 1, "name": "Alice", "age": 30}}"#).unwrap();
        writeln!(file, r#"{{"id": 2, "name": "Bob", "age": 25}}"#).unwrap();
        writeln!(file, r#"{{"id": 3, "name": "Charlie", "age": 35}}"#).unwrap();
        file.flush().unwrap();
        file
    }

    /// Writes the same three records as one JSON array to a `.json` file.
    fn create_test_json() -> NamedTempFile {
        let mut file = NamedTempFile::with_suffix(".json").unwrap();
        writeln!(
            file,
            r#"[
            {{"id": 1, "name": "Alice", "age": 30}},
            {{"id": 2, "name": "Bob", "age": 25}},
            {{"id": 3, "name": "Charlie", "age": 35}}
        ]"#
        )
        .unwrap();
        file.flush().unwrap();
        file
    }

    /// Explicit schema matching the fixture records.
    fn fixture_schema() -> Arc<Schema> {
        use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};

        Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int64, false),
        ]))
    }

    /// Counts the rows of a table registered in `ctx`.
    async fn row_count(ctx: &SessionContext, table: &str) -> usize {
        ctx.table(table).await.unwrap().count().await.unwrap()
    }

    #[test]
    fn test_format_detection() {
        assert_eq!(JsonFormatType::from_path("data.json"), JsonFormatType::Json);
        assert_eq!(
            JsonFormatType::from_path("data.ndjson"),
            JsonFormatType::NdJson
        );
        assert_eq!(
            JsonFormatType::from_path("data.jsonl"),
            JsonFormatType::NdJson
        );
        assert_eq!(
            JsonFormatType::from_path("data.json.gz"),
            JsonFormatType::Json
        );
        assert_eq!(
            JsonFormatType::from_path("data.ndjson.gz"),
            JsonFormatType::NdJson
        );
        assert_eq!(
            JsonFormatType::from_path("DATA.JSONL"),
            JsonFormatType::NdJson
        );
        assert_eq!(
            JsonFormatType::from_path("data.jsonl.zst"),
            JsonFormatType::NdJson
        );
        assert_eq!(JsonFormatType::from_path("data.txt"), JsonFormatType::Json);
    }

    #[test]
    fn test_from_paths_rejects_empty_input() {
        assert!(JsonSource::from_paths(Vec::new()).is_err());
    }

    #[tokio::test]
    async fn test_json_source_single_file() {
        let file = create_test_ndjson();
        let source = JsonSource::new(file.path().to_str().unwrap()).unwrap();

        assert_eq!(source.paths.len(), 1);
        assert_eq!(source.options.format, JsonFormatType::NdJson);
        assert!(source.description().contains("NDJSON file"));
    }

    #[tokio::test]
    async fn test_json_source_with_options() {
        let file = create_test_json();
        let options = JsonOptions {
            format: JsonFormatType::Json,
            schema_infer_max_records: 500,
            ..Default::default()
        };

        let source = JsonSource::with_options(file.path().to_str().unwrap(), options).unwrap();
        assert_eq!(source.options.format, JsonFormatType::Json);
        assert_eq!(source.options.schema_infer_max_records, 500);
    }

    #[tokio::test]
    async fn test_json_source_multiple_files() {
        let file1 = create_test_ndjson();
        let file2 = create_test_ndjson();

        let paths = vec![
            file1.path().to_str().unwrap().to_string(),
            file2.path().to_str().unwrap().to_string(),
        ];

        let source = JsonSource::from_paths(paths).unwrap();
        assert_eq!(source.paths.len(), 2);
        assert!(source.description().contains("2 files"));
    }

    #[tokio::test]
    async fn test_ndjson_registration() {
        let file = create_test_ndjson();

        // Provide schema since JsonFormat may not support inference in DataFusion 48.0
        let options = JsonOptions {
            schema: Some(fixture_schema()),
            ..Default::default()
        };

        let source = JsonSource::with_options(file.path().to_str().unwrap(), options).unwrap();

        let ctx = SessionContext::new();
        source.register(&ctx, "test_table").await.unwrap();

        assert_eq!(row_count(&ctx, "test_table").await, 3);
    }

    #[tokio::test]
    async fn test_multi_file_registration_with_schema() {
        let file1 = create_test_ndjson();
        let file2 = create_test_ndjson();

        let paths = vec![
            file1.path().to_str().unwrap().to_string(),
            file2.path().to_str().unwrap().to_string(),
        ];

        let source = JsonSource::from_paths(paths)
            .unwrap()
            .with_custom_options(JsonOptions {
                schema: Some(fixture_schema()),
                ..Default::default()
            });

        let ctx = SessionContext::new();
        source.register(&ctx, "multi_table").await.unwrap();

        // Both files contribute their three rows (UNION ALL keeps duplicates).
        assert_eq!(row_count(&ctx, "multi_table").await, 6);
    }
}