bq_schema_gen/schema/
generator.rs

1//! Schema generator - the core logic for deducing BigQuery schemas.
2//!
3//! This module contains the `SchemaGenerator` struct which processes
4//! JSON/CSV records and builds a BigQuery-compatible schema.
5
6use once_cell::sync::Lazy;
7use regex::Regex;
8
9use crate::error::{Error, ErrorLog, Result};
10use crate::inference::{convert_type, infer_bigquery_type};
11use crate::schema::types::{BqMode, BqSchemaField, BqType, EntryStatus, SchemaEntry, SchemaMap};
12
13/// Valid BigQuery field name pattern.
14static FIELD_NAME_SANITIZER: Lazy<Regex> = Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_]").unwrap());
15
/// Configuration options for schema generation.
///
/// `GeneratorConfig::default()` selects JSON input with every flag off.
#[derive(Debug, Clone)]
pub struct GeneratorConfig {
    /// Format of the incoming records (JSON or CSV).
    ///
    /// CSV input forces `keep_nulls` on in `SchemaGenerator::new`, makes empty
    /// string values count as unfilled, and keeps fields in input order when
    /// the schema is flattened.
    pub input_format: InputFormat,
    /// When set together with CSV input, NULLABLE fields that stayed filled
    /// are emitted as REQUIRED. It also lets a REQUIRED field relax to
    /// NULLABLE when an unfilled value is merged into it.
    pub infer_mode: bool,
    /// Keep null/empty ("soft") fields in the output schema instead of
    /// dropping them.
    pub keep_nulls: bool,
    /// Treat quoted values as strings (don't infer types). The flag is passed
    /// straight through to `infer_bigquery_type`.
    pub quoted_values_are_strings: bool,
    /// Sanitize field names for BigQuery compatibility: characters outside
    /// `[a-zA-Z0-9_]` become `_` and the result is cut to 128 characters.
    pub sanitize_names: bool,
    /// Preserve input field order instead of sorting fields by their
    /// lower-cased name.
    pub preserve_input_sort_order: bool,
}
32
33impl Default for GeneratorConfig {
34    fn default() -> Self {
35        Self {
36            input_format: InputFormat::Json,
37            infer_mode: false,
38            keep_nulls: false,
39            quoted_values_are_strings: false,
40            sanitize_names: false,
41            preserve_input_sort_order: false,
42        }
43    }
44}
45
/// Input format for the schema generator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputFormat {
    /// Records are JSON objects; this is the default format.
    Json,
    /// Records originate from CSV rows. Empty string values are treated as
    /// unfilled and fields keep their input order in the output schema.
    Csv,
}
52
/// Schema generator that processes records and builds a BigQuery schema.
///
/// The schema itself lives in a caller-owned `SchemaMap`; the generator only
/// carries the configuration, a record counter, and the accumulated error log.
pub struct SchemaGenerator {
    /// Options fixed at construction time.
    config: GeneratorConfig,
    /// Number of records handed to `process_record` so far; stamped on
    /// every logged error.
    line_number: usize,
    /// Diagnostics collected while processing records.
    error_logs: Vec<ErrorLog>,
}
59
60impl SchemaGenerator {
61    /// Create a new schema generator with the given configuration.
62    ///
63    /// Note: For CSV input format, `keep_nulls` is automatically set to `true`
64    /// to ensure positional column matching works correctly.
65    pub fn new(mut config: GeneratorConfig) -> Self {
66        // CSV requires keep_nulls to be true (matches Python behavior)
67        if config.input_format == InputFormat::Csv {
68            config.keep_nulls = true;
69        }
70
71        Self {
72            config,
73            line_number: 0,
74            error_logs: Vec::new(),
75        }
76    }
77
78    /// Create a new schema generator with default configuration.
79    pub fn default_config() -> Self {
80        Self::new(GeneratorConfig::default())
81    }
82
    /// Get the current line number.
    ///
    /// The counter starts at 0 and `process_record` increments it once per
    /// call before inspecting the record, so rejected records are counted too.
    pub fn line_number(&self) -> usize {
        self.line_number
    }
87
88    /// Get the error logs.
89    pub fn error_logs(&self) -> &[ErrorLog] {
90        &self.error_logs
91    }
92
93    /// Log an error at the current line.
94    fn log_error(&mut self, msg: String) {
95        self.error_logs.push(ErrorLog {
96            line_number: self.line_number,
97            msg,
98        });
99    }
100
101    /// Process a single JSON record and update the schema map.
102    pub fn process_record(
103        &mut self,
104        record: &serde_json::Value,
105        schema_map: &mut SchemaMap,
106    ) -> Result<()> {
107        self.line_number += 1;
108
109        match record {
110            serde_json::Value::Object(obj) => {
111                self.deduce_schema_for_record(obj, schema_map, None);
112                Ok(())
113            }
114            _ => {
115                let msg = format!(
116                    "Record should be a JSON Object but was a {:?}",
117                    json_type_name(record)
118                );
119                self.log_error(msg.clone());
120                Err(Error::InvalidRecord(msg))
121            }
122        }
123    }
124
125    /// Deduce schema for a single record (JSON object).
126    fn deduce_schema_for_record(
127        &mut self,
128        obj: &serde_json::Map<String, serde_json::Value>,
129        schema_map: &mut SchemaMap,
130        base_path: Option<&str>,
131    ) {
132        for (key, value) in obj {
133            let sanitized_key = self.sanitize_name(key);
134            let canonical_key = sanitized_key.to_lowercase();
135
136            let new_entry = match self.get_schema_entry(&sanitized_key, value, base_path) {
137                Some(entry) => entry,
138                None => continue, // Unsupported type, skip
139            };
140
141            // Check if entry exists - if so, merge in place to preserve order
142            if let Some(existing_entry) = schema_map.get(&canonical_key).cloned() {
143                let merged_entry =
144                    self.merge_schema_entry(Some(existing_entry), new_entry, base_path);
145
146                if let Some(entry) = merged_entry {
147                    // Update in place to preserve order
148                    if let Some(slot) = schema_map.get_mut(&canonical_key) {
149                        *slot = entry;
150                    }
151                } else {
152                    // Remove if merge resulted in None (shouldn't happen normally)
153                    schema_map.shift_remove(&canonical_key);
154                }
155            } else {
156                // New field - just insert
157                let merged_entry = self.merge_schema_entry(None, new_entry, base_path);
158                if let Some(entry) = merged_entry {
159                    schema_map.insert(canonical_key, entry);
160                }
161            }
162        }
163    }
164
165    /// Sanitize a field name for BigQuery compatibility.
166    fn sanitize_name(&self, name: &str) -> String {
167        if self.config.sanitize_names {
168            let sanitized = FIELD_NAME_SANITIZER.replace_all(name, "_");
169            // Truncate to 128 characters (BigQuery limit)
170            if sanitized.len() > 128 {
171                sanitized[..128].to_string()
172            } else {
173                sanitized.into_owned()
174            }
175        } else {
176            name.to_string()
177        }
178    }
179
180    /// Get a schema entry for a key-value pair.
181    fn get_schema_entry(
182        &mut self,
183        key: &str,
184        value: &serde_json::Value,
185        base_path: Option<&str>,
186    ) -> Option<SchemaEntry> {
187        let result = infer_bigquery_type(value, self.config.quoted_values_are_strings);
188
189        let (mode, bq_type) = match result {
190            Some(r) => r,
191            None => {
192                // Log error for unsupported types
193                if let serde_json::Value::Array(arr) = value {
194                    // Check what kind of array error
195                    if arr.iter().any(|v| matches!(v, serde_json::Value::Array(_))) {
196                        if arr
197                            .iter()
198                            .all(|v| matches!(v, serde_json::Value::Array(a) if a.is_empty()))
199                        {
200                            self.log_error(
201                                "Unsupported array element type: __empty_array__".to_string(),
202                            );
203                        } else {
204                            self.log_error("Unsupported array element type: __array__".to_string());
205                        }
206                    } else {
207                        self.log_error(format!(
208                            "All array elements must be the same compatible type: {:?}",
209                            arr
210                        ));
211                    }
212                }
213                return None;
214            }
215        };
216
217        match &bq_type {
218            BqType::Record(_) => {
219                // Recursively process nested record
220                let new_base_path = json_full_path(base_path, key);
221                let mut fields = SchemaMap::new();
222
223                if mode == BqMode::Nullable {
224                    // Single object
225                    if let serde_json::Value::Object(obj) = value {
226                        self.deduce_schema_for_record(obj, &mut fields, Some(&new_base_path));
227                    }
228                } else {
229                    // Array of objects (REPEATED)
230                    if let serde_json::Value::Array(arr) = value {
231                        for item in arr {
232                            if let serde_json::Value::Object(obj) = item {
233                                self.deduce_schema_for_record(
234                                    obj,
235                                    &mut fields,
236                                    Some(&new_base_path),
237                                );
238                            }
239                        }
240                    }
241                }
242
243                Some(SchemaEntry {
244                    status: EntryStatus::Hard,
245                    filled: true,
246                    name: key.to_string(),
247                    bq_type: BqType::Record(fields),
248                    mode,
249                })
250            }
251            BqType::Null => Some(SchemaEntry {
252                status: EntryStatus::Soft,
253                filled: false,
254                name: key.to_string(),
255                bq_type: BqType::String,
256                mode: BqMode::Nullable,
257            }),
258            BqType::EmptyArray => Some(SchemaEntry {
259                status: EntryStatus::Soft,
260                filled: false,
261                name: key.to_string(),
262                bq_type: BqType::String,
263                mode: BqMode::Repeated,
264            }),
265            BqType::EmptyRecord => Some(SchemaEntry {
266                status: EntryStatus::Soft,
267                filled: false,
268                name: key.to_string(),
269                bq_type: BqType::Record(SchemaMap::new()),
270                mode,
271            }),
272            _ => {
273                // Check for empty string in CSV mode
274                let (status, filled) = if self.config.input_format == InputFormat::Csv {
275                    if let serde_json::Value::String(s) = value {
276                        if s.is_empty() {
277                            (EntryStatus::Soft, false)
278                        } else {
279                            (EntryStatus::Hard, true)
280                        }
281                    } else {
282                        (EntryStatus::Hard, true)
283                    }
284                } else {
285                    (EntryStatus::Hard, true)
286                };
287
288                Some(SchemaEntry {
289                    status,
290                    filled,
291                    name: key.to_string(),
292                    bq_type,
293                    mode,
294                })
295            }
296        }
297    }
298
    /// Merge a new schema entry with an existing one.
    ///
    /// * `old_entry` - the entry currently stored for this field, or `None`
    ///   when the field has not been seen before (the new entry is then
    ///   returned untouched).
    /// * `new_entry` - the entry deduced from the value being processed.
    /// * `base_path` - dotted path of the enclosing record, used only to build
    ///   full field names for log messages and for the recursive merge.
    ///
    /// Every path visible in this function returns `Some`. Incompatible merges
    /// do not drop the field; they mark the stored entry as
    /// `EntryStatus::Ignore` so later values for it are ignored as well.
    fn merge_schema_entry(
        &mut self,
        old_entry: Option<SchemaEntry>,
        new_entry: SchemaEntry,
        base_path: Option<&str>,
    ) -> Option<SchemaEntry> {
        let mut old_entry = match old_entry {
            Some(e) => e,
            None => return Some(new_entry),
        };

        // Track filled status: once either side is unfilled, the stored entry
        // stays unfilled.
        // NOTE(review): only `old_entry.filled` is updated here, while
        // `merge_mode` consults `new_entry.filled` - confirm this is intended.
        if !new_entry.filled || !old_entry.filled {
            old_entry.filled = false;
        }

        // If old was ignored, keep ignoring
        if old_entry.status == EntryStatus::Ignore {
            return Some(old_entry);
        }

        // Hard -> Soft: keep old hard (a null/empty value never overrides a
        // real one); only the mode may change.
        if old_entry.status == EntryStatus::Hard && new_entry.status == EntryStatus::Soft {
            if let Some(mode) = self.merge_mode(&old_entry, &new_entry, base_path) {
                old_entry.mode = mode;
                return Some(old_entry);
            } else {
                // Modes are incompatible; `merge_mode` has already logged why.
                old_entry.status = EntryStatus::Ignore;
                return Some(old_entry);
            }
        }

        // Soft -> Hard: use new hard (the real value replaces the placeholder),
        // but carry over the accumulated `filled` flag.
        if old_entry.status == EntryStatus::Soft && new_entry.status == EntryStatus::Hard {
            let mut result = new_entry;
            result.filled = old_entry.filled;
            if let Some(mode) = self.merge_mode(&old_entry, &result, base_path) {
                result.mode = mode;
                return Some(result);
            } else {
                old_entry.status = EntryStatus::Ignore;
                return Some(old_entry);
            }
        }

        // Same status - merge types
        let old_type = &old_entry.bq_type;
        let new_type = &new_entry.bq_type;

        // Handle RECORD + RECORD merging. This path does not call
        // `merge_mode`; only the two NULLABLE/REPEATED combinations below are
        // treated specially.
        if let (BqType::Record(old_fields), BqType::Record(new_fields)) = (old_type, new_type) {
            // Allow NULLABLE RECORD -> REPEATED RECORD
            if old_entry.mode == BqMode::Nullable && new_entry.mode == BqMode::Repeated {
                let full_name = json_full_path(base_path, &old_entry.name);
                self.log_error(format!(
                    "Converting schema for \"{}\" from NULLABLE RECORD into REPEATED RECORD",
                    full_name
                ));
                old_entry.mode = BqMode::Repeated;
            } else if old_entry.mode == BqMode::Repeated && new_entry.mode == BqMode::Nullable {
                // The stored mode stays REPEATED; the situation is only logged.
                let full_name = json_full_path(base_path, &old_entry.name);
                self.log_error(format!(
                    "Leaving schema for \"{}\" as REPEATED RECORD",
                    full_name
                ));
            }

            // Merge the record fields
            let mut merged_fields = old_fields.clone();
            let new_base_path = json_full_path(base_path, &old_entry.name);

            for (key, new_field_entry) in new_fields {
                if let Some(existing) = merged_fields.get(key).cloned() {
                    // Update existing field in place to preserve order
                    if let Some(merged) = self.merge_schema_entry(
                        Some(existing),
                        new_field_entry.clone(),
                        Some(&new_base_path),
                    ) {
                        if let Some(slot) = merged_fields.get_mut(key) {
                            *slot = merged;
                        }
                    } else {
                        // Defensive: the recursive merge yielded no entry.
                        merged_fields.shift_remove(key);
                    }
                } else {
                    // New field - insert at end
                    if let Some(merged) =
                        self.merge_schema_entry(None, new_field_entry.clone(), Some(&new_base_path))
                    {
                        merged_fields.insert(key.clone(), merged);
                    }
                }
            }

            old_entry.bq_type = BqType::Record(merged_fields);
            return Some(old_entry);
        }

        // Merge mode
        let merged_mode = match self.merge_mode(&old_entry, &new_entry, base_path) {
            Some(m) => m,
            None => {
                old_entry.status = EntryStatus::Ignore;
                return Some(old_entry);
            }
        };

        // Merge types
        if old_type != new_type {
            // Differing types survive only if `convert_type` yields a common type.
            match convert_type(old_type, new_type) {
                Some(converted) => {
                    old_entry.bq_type = converted;
                    old_entry.mode = merged_mode;
                    Some(old_entry)
                }
                None => {
                    let full_old_name = json_full_path(base_path, &old_entry.name);
                    let full_new_name = json_full_path(base_path, &new_entry.name);
                    self.log_error(format!(
                        "Ignoring field with mismatched type: old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
                        old_entry.status, full_old_name, old_entry.mode, old_entry.bq_type,
                        new_entry.status, full_new_name, new_entry.mode, new_entry.bq_type
                    ));
                    old_entry.status = EntryStatus::Ignore;
                    Some(old_entry)
                }
            }
        } else {
            // Identical types: only the mode can have changed.
            old_entry.mode = merged_mode;
            Some(old_entry)
        }
    }
433
434    /// Merge field modes, returning None if incompatible.
435    fn merge_mode(
436        &mut self,
437        old_entry: &SchemaEntry,
438        new_entry: &SchemaEntry,
439        base_path: Option<&str>,
440    ) -> Option<BqMode> {
441        let old_mode = old_entry.mode;
442        let new_mode = new_entry.mode;
443
444        // Same mode - no change needed
445        if old_mode == new_mode {
446            return Some(old_mode);
447        }
448
449        let full_old_name = json_full_path(base_path, &old_entry.name);
450        let full_new_name = json_full_path(base_path, &new_entry.name);
451
452        // REQUIRED -> NULLABLE transition
453        if old_mode == BqMode::Required && new_mode == BqMode::Nullable {
454            if new_entry.filled {
455                return Some(old_mode); // Keep REQUIRED
456            } else if self.config.infer_mode {
457                return Some(new_mode); // Allow relaxation
458            } else {
459                self.log_error(format!(
460                    "Ignoring non-RECORD field with mismatched mode. Cannot convert to NULLABLE because infer_schema not set: old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
461                    old_entry.status, full_old_name, old_mode, old_entry.bq_type,
462                    new_entry.status, full_new_name, new_mode, new_entry.bq_type
463                ));
464                return None;
465            }
466        }
467
468        // NULLABLE(soft) -> REPEATED(hard)
469        if old_mode == BqMode::Nullable && new_mode == BqMode::Repeated {
470            if old_entry.status == EntryStatus::Soft && new_entry.status == EntryStatus::Hard {
471                return Some(new_mode);
472            }
473            self.log_error(format!(
474                "Cannot convert NULLABLE(hard) -> REPEATED: old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
475                old_entry.status, full_old_name, old_mode, old_entry.bq_type,
476                new_entry.status, full_new_name, new_mode, new_entry.bq_type
477            ));
478            return None;
479        }
480
481        // REPEATED -> NULLABLE(soft): keep REPEATED
482        if old_mode == BqMode::Repeated && new_mode == BqMode::Nullable {
483            if old_entry.status == EntryStatus::Hard && new_entry.status == EntryStatus::Soft {
484                return Some(old_mode);
485            }
486            self.log_error(format!(
487                "Cannot convert REPEATED -> NULLABLE(hard): old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
488                old_entry.status, full_old_name, old_mode, old_entry.bq_type,
489                new_entry.status, full_new_name, new_mode, new_entry.bq_type
490            ));
491            return None;
492        }
493
494        // Other mode mismatches
495        self.log_error(format!(
496            "Ignoring non-RECORD field with mismatched mode: old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
497            old_entry.status, full_old_name, old_mode, old_entry.bq_type,
498            new_entry.status, full_new_name, new_mode, new_entry.bq_type
499        ));
500        None
501    }
502
    /// Convert the schema map to BigQuery JSON schema format.
    ///
    /// Public entry point; delegates to `flatten_schema_map`, which also
    /// handles nested records recursively. Ignored entries are omitted, and
    /// soft (null/empty) entries are omitted unless `keep_nulls` is set.
    pub fn flatten_schema(&self, schema_map: &SchemaMap) -> Vec<BqSchemaField> {
        self.flatten_schema_map(schema_map)
    }
507
508    fn flatten_schema_map(&self, schema_map: &SchemaMap) -> Vec<BqSchemaField> {
509        let mut result = Vec::new();
510
511        // Get items, optionally sorted
512        let items: Vec<_> = if self.config.preserve_input_sort_order
513            || self.config.input_format == InputFormat::Csv
514        {
515            schema_map.iter().collect()
516        } else {
517            let mut items: Vec<_> = schema_map.iter().collect();
518            items.sort_by(|a, b| a.0.cmp(b.0));
519            items
520        };
521
522        for (_canonical_name, entry) in items {
523            // Skip ignored entries
524            if entry.status == EntryStatus::Ignore {
525                continue;
526            }
527
528            // Skip soft entries unless keep_nulls is enabled
529            if entry.status == EntryStatus::Soft && !self.config.keep_nulls {
530                continue;
531            }
532
533            let mode = self.determine_output_mode(entry);
534            let field = self.entry_to_schema_field(entry, mode);
535            result.push(field);
536        }
537
538        result
539    }
540
541    fn determine_output_mode(&self, entry: &SchemaEntry) -> BqMode {
542        // Infer REQUIRED mode for CSV with infer_mode enabled
543        if self.config.infer_mode
544            && self.config.input_format == InputFormat::Csv
545            && entry.mode == BqMode::Nullable
546            && entry.filled
547        {
548            BqMode::Required
549        } else {
550            entry.mode
551        }
552    }
553
554    fn entry_to_schema_field(&self, entry: &SchemaEntry, mode: BqMode) -> BqSchemaField {
555        match &entry.bq_type {
556            BqType::Record(fields) => {
557                let nested_fields = if fields.is_empty() {
558                    // Empty record needs a placeholder field for BigQuery
559                    vec![BqSchemaField::new(
560                        "__unknown__".to_string(),
561                        "STRING".to_string(),
562                        "NULLABLE".to_string(),
563                    )]
564                } else {
565                    self.flatten_schema_map(fields)
566                };
567                BqSchemaField::record(entry.name.clone(), mode.as_str().to_string(), nested_fields)
568            }
569            _ => BqSchemaField::new(
570                entry.name.clone(),
571                entry.bq_type.as_str().to_string(),
572                mode.as_str().to_string(),
573            ),
574        }
575    }
576}
577
578/// Get the JSON type name for error messages.
579fn json_type_name(value: &serde_json::Value) -> &'static str {
580    match value {
581        serde_json::Value::Null => "null",
582        serde_json::Value::Bool(_) => "boolean",
583        serde_json::Value::Number(_) => "number",
584        serde_json::Value::String(_) => "string",
585        serde_json::Value::Array(_) => "array",
586        serde_json::Value::Object(_) => "object",
587    }
588}
589
/// Join `base_path` and `key` into a dotted path.
///
/// A missing or empty `base_path` yields `key` on its own.
fn json_full_path(base_path: Option<&str>, key: &str) -> String {
    let prefix = base_path.unwrap_or("");
    if prefix.is_empty() {
        key.to_string()
    } else {
        format!("{}.{}", prefix, key)
    }
}
597
// Unit tests for the generator: each test feeds one or two JSON records
// through `process_record` and inspects the flattened schema.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A flat record with three scalar fields yields three schema fields.
    #[test]
    fn test_simple_schema() {
        let mut generator = SchemaGenerator::default_config();
        let mut schema_map = SchemaMap::new();

        let record = json!({"name": "test", "count": 42, "active": true});
        generator.process_record(&record, &mut schema_map).unwrap();

        let schema = generator.flatten_schema(&schema_map);
        assert_eq!(schema.len(), 3);
    }

    /// A nested object becomes a single RECORD field carrying sub-fields.
    #[test]
    fn test_nested_record() {
        let mut generator = SchemaGenerator::default_config();
        let mut schema_map = SchemaMap::new();

        let record = json!({
            "user": {
                "name": "test",
                "age": 25
            }
        });
        generator.process_record(&record, &mut schema_map).unwrap();

        let schema = generator.flatten_schema(&schema_map);
        assert_eq!(schema.len(), 1);
        assert_eq!(schema[0].field_type, "RECORD");
        assert!(schema[0].fields.is_some());
    }

    /// An array of strings becomes a REPEATED STRING field.
    #[test]
    fn test_array_type() {
        let mut generator = SchemaGenerator::default_config();
        let mut schema_map = SchemaMap::new();

        let record = json!({"tags": ["a", "b", "c"]});
        generator.process_record(&record, &mut schema_map).unwrap();

        let schema = generator.flatten_schema(&schema_map);
        assert_eq!(schema.len(), 1);
        assert_eq!(schema[0].mode, "REPEATED");
        assert_eq!(schema[0].field_type, "STRING");
    }

    /// An integer followed by a float for the same field widens it to FLOAT.
    #[test]
    fn test_type_coercion() {
        let mut generator = SchemaGenerator::default_config();
        let mut schema_map = SchemaMap::new();

        // First record has integer
        let record1 = json!({"value": 42});
        generator.process_record(&record1, &mut schema_map).unwrap();

        // Second record has float
        let record2 = json!({"value": 3.5});
        generator.process_record(&record2, &mut schema_map).unwrap();

        let schema = generator.flatten_schema(&schema_map);
        assert_eq!(schema.len(), 1);
        assert_eq!(schema[0].field_type, "FLOAT");
    }

    /// With `sanitize_names` on, dashes and dots are removed from field names.
    #[test]
    fn test_sanitize_names() {
        let config = GeneratorConfig {
            sanitize_names: true,
            ..Default::default()
        };
        let mut generator = SchemaGenerator::new(config);
        let mut schema_map = SchemaMap::new();

        let record = json!({"field-name": "test", "field.with.dots": 42});
        generator.process_record(&record, &mut schema_map).unwrap();

        let schema = generator.flatten_schema(&schema_map);
        assert_eq!(schema.len(), 2);
        // Check that names are sanitized
        for field in &schema {
            assert!(!field.name.contains('-'));
            assert!(!field.name.contains('.'));
        }
    }
}