term-guard 0.0.2

A Rust data validation library providing Deequ-like capabilities without Spark dependencies.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
//! JSON and NDJSON file source implementation.

use super::{CompressionType, DataSource};
use crate::prelude::*;
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::*;
use std::sync::Arc;
use tracing::instrument;

/// Format type for JSON files.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonFormatType {
    /// Line-delimited JSON (one JSON object per line)
    NdJson,
    /// Regular JSON (single JSON object or array)
    Json,
}

impl JsonFormatType {
    /// Detects JSON format from a file extension, case-insensitively.
    ///
    /// A single trailing compression extension (`.gz`, `.zst`, `.bz2`) is
    /// stripped first, so `events.jsonl.gz` is detected as NDJSON. Paths
    /// ending in `.ndjson` or `.jsonl` yield [`Self::NdJson`]; everything
    /// else (including `.json`) yields [`Self::Json`].
    pub fn from_path(path: &str) -> Self {
        let lower = path.to_lowercase();

        // Strip one compression extension, if present, so the JSON extension
        // underneath can be inspected. `strip_suffix` removes the dot too,
        // matching the previous `rfind('.')` slicing without re-scanning.
        let stem = lower
            .strip_suffix(".gz")
            .or_else(|| lower.strip_suffix(".zst"))
            .or_else(|| lower.strip_suffix(".bz2"))
            .unwrap_or(&lower);

        if stem.ends_with(".ndjson") || stem.ends_with(".jsonl") {
            Self::NdJson
        } else {
            Self::Json
        }
    }
}

/// Options for configuring JSON file reading.
///
/// Obtain a baseline via [`Default`] (NDJSON format, no explicit schema,
/// auto-detected compression, 1000-record inference sample) and override
/// individual fields with struct-update syntax.
#[derive(Debug, Clone)]
pub struct JsonOptions {
    /// JSON format type
    pub format: JsonFormatType,
    /// Schema to use (if None, will be inferred)
    pub schema: Option<Arc<Schema>>,
    /// Compression type (default: Auto)
    pub compression: CompressionType,
    /// Maximum records to read for schema inference
    pub schema_infer_max_records: usize,
}

impl Default for JsonOptions {
    fn default() -> Self {
        Self {
            format: JsonFormatType::NdJson,
            schema: None,
            compression: CompressionType::Auto,
            schema_infer_max_records: 1000,
        }
    }
}

/// A JSON/NDJSON file data source with schema inference and compression support.
///
/// # Examples
///
/// ```rust,ignore
/// use term_guard::sources::{JsonSource, JsonOptions, JsonFormatType};
///
/// # async fn example() -> Result<()> {
/// // NDJSON file (auto-detected)
/// let source = JsonSource::new("data/events.ndjson")?;
///
/// // Regular JSON with custom options
/// let options = JsonOptions {
///     format: JsonFormatType::Json,
///     ..Default::default()
/// };
/// let source = JsonSource::with_options("data/config.json", options)?;
///
/// // Compressed NDJSON files with glob
/// let source = JsonSource::from_glob("logs/*.jsonl.gz").await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct JsonSource {
    // One or more concrete file paths (glob patterns are expanded before
    // construction; see `from_glob`/`from_globs`).
    paths: Vec<String>,
    // Reading options: format, optional explicit schema, compression,
    // and the schema-inference sample size.
    options: JsonOptions,
    // NOTE(review): no code path in this file ever assigns `Some` here —
    // every constructor sets `None` and `infer_schema` takes `&self` — so
    // this cache is never populated. Confirm whether caching was intended.
    inferred_schema: Option<Arc<Schema>>,
}

impl JsonSource {
    /// Creates a new JSON source from a single file path.
    ///
    /// The format (NDJSON vs. regular JSON) is auto-detected from the file
    /// extension via [`JsonFormatType::from_path`]; all other options take
    /// their defaults.
    pub fn new(path: impl Into<String>) -> Result<Self> {
        let path_str = path.into();
        let format = JsonFormatType::from_path(&path_str);

        Ok(Self {
            paths: vec![path_str],
            options: JsonOptions {
                format,
                ..Default::default()
            },
            inferred_schema: None,
        })
    }

    /// Creates a new JSON source with custom options.
    ///
    /// Unlike [`Self::new`], the format is NOT auto-detected here; the
    /// caller-supplied `options.format` is used as-is.
    pub fn with_options(path: impl Into<String>, options: JsonOptions) -> Result<Self> {
        Ok(Self {
            paths: vec![path.into()],
            options,
            inferred_schema: None,
        })
    }

    /// Creates a JSON source from multiple file paths.
    ///
    /// # Errors
    ///
    /// Returns [`TermError::Configuration`] if `paths` is empty.
    pub fn from_paths(paths: Vec<String>) -> Result<Self> {
        if paths.is_empty() {
            return Err(TermError::Configuration(
                "At least one path must be provided".to_string(),
            ));
        }

        // Auto-detect format from first file; the remaining paths are
        // assumed to share the same format.
        let format = JsonFormatType::from_path(&paths[0]);

        Ok(Self {
            paths,
            options: JsonOptions {
                format,
                ..Default::default()
            },
            inferred_schema: None,
        })
    }

    /// Creates a JSON source from a glob pattern.
    ///
    /// The pattern is expanded eagerly; errors from [`Self::from_paths`]
    /// (e.g. no matches) propagate.
    pub async fn from_glob(pattern: impl Into<String>) -> Result<Self> {
        let patterns = vec![pattern.into()];
        let paths = super::expand_globs(&patterns).await?;
        Self::from_paths(paths)
    }

    /// Creates a JSON source from multiple glob patterns.
    pub async fn from_globs(patterns: Vec<String>) -> Result<Self> {
        let paths = super::expand_globs(&patterns).await?;
        Self::from_paths(paths)
    }

    /// Sets custom options for this JSON source.
    ///
    /// Replaces ALL options wholesale, including any format previously
    /// auto-detected by the constructor.
    pub fn with_custom_options(mut self, options: JsonOptions) -> Self {
        self.options = options;
        self
    }

    /// Infers schema from the JSON files.
    ///
    /// Precedence: an explicit `options.schema` wins, then a cached
    /// `inferred_schema`, then inference by reading the first file.
    ///
    /// NOTE(review): `inferred_schema` is never written by this type, so the
    /// cache check below can never hit — confirm whether caching was intended.
    #[instrument(skip(self))]
    async fn infer_schema(&self) -> Result<Arc<Schema>> {
        if let Some(schema) = &self.options.schema {
            return Ok(schema.clone());
        }

        if let Some(schema) = &self.inferred_schema {
            return Ok(schema.clone());
        }

        // Create a temporary context for schema inference
        let ctx = SessionContext::new();

        // For JSON, infer schema by reading the first file
        // (multi-file sources assume all files share this schema).
        let first_path = &self.paths[0];
        let schema = if self.options.format == JsonFormatType::NdJson {
            // For NDJSON, we need to handle different extensions
            if first_path.ends_with(".json") {
                let mut options = NdJsonReadOptions::default();
                options.schema_infer_max_records = self.options.schema_infer_max_records;
                let df = ctx.read_json(first_path, options).await?;
                df.schema().inner().clone()
            } else {
                // For .ndjson or .jsonl files, try to read as NDJSON with read_json
                let mut options = NdJsonReadOptions::default();
                options.schema_infer_max_records = self.options.schema_infer_max_records;

                // Try to read the file directly to infer schema
                match ctx.read_json(first_path, options).await {
                    Ok(df) => df.schema().inner().clone(),
                    Err(_) => {
                        // If that fails, fall back to creating a minimal schema
                        // This is a workaround for the permission issue
                        // NOTE(review): the underlying DataFusion error is
                        // discarded here (`source: None`); consider attaching
                        // it for diagnosability.
                        return Err(TermError::DataSource {
                            source_type: "JSON".to_string(),
                            message: "Unable to infer schema from NDJSON file. Please provide an explicit schema.".to_string(),
                            source: None,
                        });
                    }
                }
            }
        } else {
            // For regular JSON, we need a different approach
            // DataFusion doesn't have built-in regular JSON support
            // So we'll just return an error for now
            return Err(TermError::NotSupported(
                "Regular JSON format is not yet supported. Please use NDJSON format.".to_string(),
            ));
        };

        Ok(schema)
    }
}

#[async_trait]
impl DataSource for JsonSource {
    /// Registers this source as `table_name` in `ctx`.
    ///
    /// Single-path sources are registered directly (via `register_json` or a
    /// `ListingTable`, depending on extension). Multi-path sources register
    /// each file as a temporary table, expose their `UNION ALL` as a view
    /// under `table_name`, and then deregister the temporaries. Regular
    /// (non-NDJSON) JSON is rejected with [`TermError::NotSupported`].
    #[instrument(skip(self, ctx, telemetry), fields(table_name = %table_name, source_type = "json", file_count = self.paths.len()))]
    async fn register_with_telemetry(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        telemetry: Option<&Arc<TermTelemetry>>,
    ) -> Result<()> {
        // Create telemetry span for data source loading
        // (held for the duration of this call; ends when dropped on return).
        let mut _datasource_span = if let Some(tel) = telemetry {
            tel.start_datasource_span("json", table_name)
        } else {
            TermSpan::noop()
        };
        // Handle single vs multiple paths
        if self.paths.len() == 1 {
            let path = &self.paths[0];

            // For single NDJSON files with .json extension, use register_json
            if path.ends_with(".json") && self.options.format == JsonFormatType::NdJson {
                let mut options = NdJsonReadOptions::default();
                options.schema = self.options.schema.as_deref();
                options.schema_infer_max_records = self.options.schema_infer_max_records;

                ctx.register_json(table_name, path, options).await?;
            } else if path.ends_with(".ndjson") || path.ends_with(".jsonl") {
                // For .ndjson/.jsonl files, create a ListingTable with the specific file
                let table_path = ListingTableUrl::parse(path)?;

                // Use the actual file extension
                // (ListingOptions filters by extension, so it must match).
                let extension = if path.ends_with(".ndjson") {
                    ".ndjson"
                } else {
                    ".jsonl"
                };

                let format = JsonFormat::default();
                let listing_options =
                    ListingOptions::new(Arc::new(format)).with_file_extension(extension);

                // Create config with schema if provided
                // (otherwise let DataFusion infer it from the file).
                let config = if let Some(schema) = &self.options.schema {
                    ListingTableConfig::new(table_path)
                        .with_listing_options(listing_options)
                        .with_schema(schema.clone())
                } else {
                    ListingTableConfig::new(table_path)
                        .with_listing_options(listing_options)
                        .infer_schema(&ctx.state())
                        .await?
                };

                let table = ListingTable::try_new(config)?;
                ctx.register_table(table_name, Arc::new(table))?;
            } else {
                // For regular JSON files
                return Err(TermError::NotSupported(
                    "Regular JSON format is not yet supported. Please use NDJSON format."
                        .to_string(),
                ));
            }
        } else {
            // Multiple files - register each file separately and create a union
            // First, infer or get the schema
            // (one schema is applied to every file so the union is well-typed).
            let schema = if let Some(schema) = &self.options.schema {
                schema.clone()
            } else {
                // Infer schema from the first file
                self.infer_schema().await?
            };

            // Register each file as a separate table
            // NOTE(review): if any registration below fails, the `?` returns
            // early and previously registered temp tables are NOT cleaned up.
            let mut table_names = Vec::new();
            for (i, path) in self.paths.iter().enumerate() {
                let temp_table_name = format!("__{table_name}_temp_{i}");

                if path.ends_with(".json") && self.options.format == JsonFormatType::NdJson {
                    let mut options = NdJsonReadOptions::default();
                    options.schema = Some(&schema);
                    options.schema_infer_max_records = self.options.schema_infer_max_records;
                    ctx.register_json(&temp_table_name, path, options).await?;
                } else if path.ends_with(".ndjson") || path.ends_with(".jsonl") {
                    // For .ndjson/.jsonl files, create a ListingTable with the specific file
                    let table_path = ListingTableUrl::parse(path)?;

                    let extension = if path.ends_with(".ndjson") {
                        ".ndjson"
                    } else {
                        ".jsonl"
                    };

                    let format = JsonFormat::default();
                    let listing_options =
                        ListingOptions::new(Arc::new(format)).with_file_extension(extension);

                    let config = ListingTableConfig::new(table_path)
                        .with_listing_options(listing_options)
                        .with_schema(schema.clone());

                    let table = ListingTable::try_new(config)?;
                    ctx.register_table(&temp_table_name, Arc::new(table))?;
                } else {
                    return Err(TermError::NotSupported(
                        "Regular JSON format is not yet supported. Please use NDJSON format."
                            .to_string(),
                    ));
                }

                table_names.push(temp_table_name);
            }

            // Create a union of all the temporary tables
            // NOTE(review): table names are interpolated unquoted into SQL; a
            // `table_name` that is not a plain identifier would break or alter
            // this query — confirm callers validate it upstream.
            if !table_names.is_empty() {
                let union_sql = table_names
                    .iter()
                    .map(|name| format!("SELECT * FROM {name}"))
                    .collect::<Vec<_>>()
                    .join(" UNION ALL ");

                let df = ctx.sql(&union_sql).await?;
                ctx.register_table(table_name, df.into_view())?;

                // Clean up temporary tables
                for temp_name in table_names {
                    ctx.deregister_table(&temp_name)?;
                }
            }
        }

        Ok(())
    }

    /// Returns the explicit schema if set, otherwise any cached inferred
    /// schema, otherwise `None`.
    fn schema(&self) -> Option<&Arc<Schema>> {
        self.options
            .schema
            .as_ref()
            .or(self.inferred_schema.as_ref())
    }

    /// Returns a human-readable description ("NDJSON file: …" or
    /// "NDJSON files: N files").
    fn description(&self) -> String {
        let format_str = match self.options.format {
            JsonFormatType::NdJson => "NDJSON",
            JsonFormatType::Json => "JSON",
        };

        if self.paths.len() == 1 {
            let path = &self.paths[0];
            format!("{format_str} file: {path}")
        } else {
            let count = self.paths.len();
            format!("{format_str} files: {count} files")
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes a three-row NDJSON fixture and returns the temp-file handle.
    fn create_test_ndjson() -> NamedTempFile {
        let mut tmp = NamedTempFile::with_suffix(".ndjson").unwrap();
        let rows = [
            r#"{"id": 1, "name": "Alice", "age": 30}"#,
            r#"{"id": 2, "name": "Bob", "age": 25}"#,
            r#"{"id": 3, "name": "Charlie", "age": 35}"#,
        ];
        for row in rows {
            writeln!(tmp, "{row}").unwrap();
        }
        tmp.flush().unwrap();
        tmp
    }

    /// Writes a regular (array-style) JSON fixture and returns the handle.
    fn create_test_json() -> NamedTempFile {
        let mut tmp = NamedTempFile::with_suffix(".json").unwrap();
        let body = r#"[
            {"id": 1, "name": "Alice", "age": 30},
            {"id": 2, "name": "Bob", "age": 25},
            {"id": 3, "name": "Charlie", "age": 35}
        ]"#;
        writeln!(tmp, "{body}").unwrap();
        tmp.flush().unwrap();
        tmp
    }

    #[test]
    fn test_format_detection() {
        // Table-driven: each path paired with its expected detection result.
        let cases = [
            ("data.json", JsonFormatType::Json),
            ("data.ndjson", JsonFormatType::NdJson),
            ("data.jsonl", JsonFormatType::NdJson),
            ("data.json.gz", JsonFormatType::Json),
            ("data.ndjson.gz", JsonFormatType::NdJson),
        ];
        for (path, expected) in cases {
            assert_eq!(JsonFormatType::from_path(path), expected);
        }
    }

    #[tokio::test]
    async fn test_json_source_single_file() {
        let fixture = create_test_ndjson();
        let source = JsonSource::new(fixture.path().to_str().unwrap()).unwrap();

        // A single NDJSON path: format auto-detected, description reflects it.
        assert_eq!(source.paths.len(), 1);
        assert_eq!(source.options.format, JsonFormatType::NdJson);
        assert!(source.description().contains("NDJSON file"));
    }

    #[tokio::test]
    async fn test_json_source_with_options() {
        let fixture = create_test_json();
        let opts = JsonOptions {
            format: JsonFormatType::Json,
            schema_infer_max_records: 500,
            ..Default::default()
        };

        // Options supplied by the caller must be stored verbatim.
        let source = JsonSource::with_options(fixture.path().to_str().unwrap(), opts).unwrap();
        assert_eq!(source.options.format, JsonFormatType::Json);
        assert_eq!(source.options.schema_infer_max_records, 500);
    }

    #[tokio::test]
    async fn test_json_source_multiple_files() {
        let first = create_test_ndjson();
        let second = create_test_ndjson();

        let paths: Vec<String> = [&first, &second]
            .iter()
            .map(|f| f.path().to_str().unwrap().to_string())
            .collect();

        let source = JsonSource::from_paths(paths).unwrap();
        assert_eq!(source.paths.len(), 2);
        assert!(source.description().contains("2 files"));
    }

    #[tokio::test]
    async fn test_ndjson_registration() {
        use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};

        let fixture = create_test_ndjson();

        // Provide an explicit schema, since JsonFormat may not support
        // inference in DataFusion 48.0.
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int64, false),
        ];
        let opts = JsonOptions {
            schema: Some(Arc::new(ArrowSchema::new(fields))),
            ..Default::default()
        };

        let source = JsonSource::with_options(fixture.path().to_str().unwrap(), opts).unwrap();

        let ctx = SessionContext::new();
        source.register(&ctx, "test_table").await.unwrap();

        // The registered table should be queryable and produce output batches.
        let batches = ctx
            .sql("SELECT COUNT(*) as count FROM test_table")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert!(!batches.is_empty());
    }
}
}