1use once_cell::sync::Lazy;
7use regex::Regex;
8
9use crate::error::{Error, ErrorLog, Result};
10use crate::inference::{convert_type, infer_bigquery_type};
11use crate::schema::types::{BqMode, BqSchemaField, BqType, EntryStatus, SchemaEntry, SchemaMap};
12
13static FIELD_NAME_SANITIZER: Lazy<Regex> = Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_]").unwrap());
15
16#[derive(Debug, Clone)]
18pub struct GeneratorConfig {
19 pub input_format: InputFormat,
21 pub infer_mode: bool,
23 pub keep_nulls: bool,
25 pub quoted_values_are_strings: bool,
27 pub sanitize_names: bool,
29 pub preserve_input_sort_order: bool,
31}
32
33impl Default for GeneratorConfig {
34 fn default() -> Self {
35 Self {
36 input_format: InputFormat::Json,
37 infer_mode: false,
38 keep_nulls: false,
39 quoted_values_are_strings: false,
40 sanitize_names: false,
41 preserve_input_sort_order: false,
42 }
43 }
44}
45
/// Encoding of the records fed to [`SchemaGenerator`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InputFormat {
    /// Records are JSON objects.
    Json,
    /// Records come from CSV rows (converted to JSON objects before reaching
    /// `process_record`, which only accepts `serde_json::Value`).
    Csv,
}
52
53pub struct SchemaGenerator {
55 config: GeneratorConfig,
56 line_number: usize,
57 error_logs: Vec<ErrorLog>,
58}
59
60impl SchemaGenerator {
61 pub fn new(mut config: GeneratorConfig) -> Self {
66 if config.input_format == InputFormat::Csv {
68 config.keep_nulls = true;
69 }
70
71 Self {
72 config,
73 line_number: 0,
74 error_logs: Vec::new(),
75 }
76 }
77
78 pub fn default_config() -> Self {
80 Self::new(GeneratorConfig::default())
81 }
82
83 pub fn line_number(&self) -> usize {
85 self.line_number
86 }
87
88 pub fn error_logs(&self) -> &[ErrorLog] {
90 &self.error_logs
91 }
92
93 fn log_error(&mut self, msg: String) {
95 self.error_logs.push(ErrorLog {
96 line_number: self.line_number,
97 msg,
98 });
99 }
100
101 pub fn process_record(
103 &mut self,
104 record: &serde_json::Value,
105 schema_map: &mut SchemaMap,
106 ) -> Result<()> {
107 self.line_number += 1;
108
109 match record {
110 serde_json::Value::Object(obj) => {
111 self.deduce_schema_for_record(obj, schema_map, None);
112 Ok(())
113 }
114 _ => {
115 let msg = format!(
116 "Record should be a JSON Object but was a {:?}",
117 json_type_name(record)
118 );
119 self.log_error(msg.clone());
120 Err(Error::InvalidRecord(msg))
121 }
122 }
123 }
124
125 fn deduce_schema_for_record(
127 &mut self,
128 obj: &serde_json::Map<String, serde_json::Value>,
129 schema_map: &mut SchemaMap,
130 base_path: Option<&str>,
131 ) {
132 for (key, value) in obj {
133 let sanitized_key = self.sanitize_name(key);
134 let canonical_key = sanitized_key.to_lowercase();
135
136 let new_entry = match self.get_schema_entry(&sanitized_key, value, base_path) {
137 Some(entry) => entry,
138 None => continue, };
140
141 if let Some(existing_entry) = schema_map.get(&canonical_key).cloned() {
143 let merged_entry =
144 self.merge_schema_entry(Some(existing_entry), new_entry, base_path);
145
146 if let Some(entry) = merged_entry {
147 if let Some(slot) = schema_map.get_mut(&canonical_key) {
149 *slot = entry;
150 }
151 } else {
152 schema_map.shift_remove(&canonical_key);
154 }
155 } else {
156 let merged_entry = self.merge_schema_entry(None, new_entry, base_path);
158 if let Some(entry) = merged_entry {
159 schema_map.insert(canonical_key, entry);
160 }
161 }
162 }
163 }
164
165 fn sanitize_name(&self, name: &str) -> String {
167 if self.config.sanitize_names {
168 let sanitized = FIELD_NAME_SANITIZER.replace_all(name, "_");
169 if sanitized.len() > 128 {
171 sanitized[..128].to_string()
172 } else {
173 sanitized.into_owned()
174 }
175 } else {
176 name.to_string()
177 }
178 }
179
180 fn get_schema_entry(
182 &mut self,
183 key: &str,
184 value: &serde_json::Value,
185 base_path: Option<&str>,
186 ) -> Option<SchemaEntry> {
187 let result = infer_bigquery_type(value, self.config.quoted_values_are_strings);
188
189 let (mode, bq_type) = match result {
190 Some(r) => r,
191 None => {
192 if let serde_json::Value::Array(arr) = value {
194 if arr.iter().any(|v| matches!(v, serde_json::Value::Array(_))) {
196 if arr
197 .iter()
198 .all(|v| matches!(v, serde_json::Value::Array(a) if a.is_empty()))
199 {
200 self.log_error(
201 "Unsupported array element type: __empty_array__".to_string(),
202 );
203 } else {
204 self.log_error("Unsupported array element type: __array__".to_string());
205 }
206 } else {
207 self.log_error(format!(
208 "All array elements must be the same compatible type: {:?}",
209 arr
210 ));
211 }
212 }
213 return None;
214 }
215 };
216
217 match &bq_type {
218 BqType::Record(_) => {
219 let new_base_path = json_full_path(base_path, key);
221 let mut fields = SchemaMap::new();
222
223 if mode == BqMode::Nullable {
224 if let serde_json::Value::Object(obj) = value {
226 self.deduce_schema_for_record(obj, &mut fields, Some(&new_base_path));
227 }
228 } else {
229 if let serde_json::Value::Array(arr) = value {
231 for item in arr {
232 if let serde_json::Value::Object(obj) = item {
233 self.deduce_schema_for_record(
234 obj,
235 &mut fields,
236 Some(&new_base_path),
237 );
238 }
239 }
240 }
241 }
242
243 Some(SchemaEntry {
244 status: EntryStatus::Hard,
245 filled: true,
246 name: key.to_string(),
247 bq_type: BqType::Record(fields),
248 mode,
249 })
250 }
251 BqType::Null => Some(SchemaEntry {
252 status: EntryStatus::Soft,
253 filled: false,
254 name: key.to_string(),
255 bq_type: BqType::String,
256 mode: BqMode::Nullable,
257 }),
258 BqType::EmptyArray => Some(SchemaEntry {
259 status: EntryStatus::Soft,
260 filled: false,
261 name: key.to_string(),
262 bq_type: BqType::String,
263 mode: BqMode::Repeated,
264 }),
265 BqType::EmptyRecord => Some(SchemaEntry {
266 status: EntryStatus::Soft,
267 filled: false,
268 name: key.to_string(),
269 bq_type: BqType::Record(SchemaMap::new()),
270 mode,
271 }),
272 _ => {
273 let (status, filled) = if self.config.input_format == InputFormat::Csv {
275 if let serde_json::Value::String(s) = value {
276 if s.is_empty() {
277 (EntryStatus::Soft, false)
278 } else {
279 (EntryStatus::Hard, true)
280 }
281 } else {
282 (EntryStatus::Hard, true)
283 }
284 } else {
285 (EntryStatus::Hard, true)
286 };
287
288 Some(SchemaEntry {
289 status,
290 filled,
291 name: key.to_string(),
292 bq_type,
293 mode,
294 })
295 }
296 }
297 }
298
299 fn merge_schema_entry(
301 &mut self,
302 old_entry: Option<SchemaEntry>,
303 new_entry: SchemaEntry,
304 base_path: Option<&str>,
305 ) -> Option<SchemaEntry> {
306 let mut old_entry = match old_entry {
307 Some(e) => e,
308 None => return Some(new_entry),
309 };
310
311 if !new_entry.filled || !old_entry.filled {
313 old_entry.filled = false;
314 }
315
316 if old_entry.status == EntryStatus::Ignore {
318 return Some(old_entry);
319 }
320
321 if old_entry.status == EntryStatus::Hard && new_entry.status == EntryStatus::Soft {
323 if let Some(mode) = self.merge_mode(&old_entry, &new_entry, base_path) {
324 old_entry.mode = mode;
325 return Some(old_entry);
326 } else {
327 old_entry.status = EntryStatus::Ignore;
328 return Some(old_entry);
329 }
330 }
331
332 if old_entry.status == EntryStatus::Soft && new_entry.status == EntryStatus::Hard {
334 let mut result = new_entry;
335 result.filled = old_entry.filled;
336 if let Some(mode) = self.merge_mode(&old_entry, &result, base_path) {
337 result.mode = mode;
338 return Some(result);
339 } else {
340 old_entry.status = EntryStatus::Ignore;
341 return Some(old_entry);
342 }
343 }
344
345 let old_type = &old_entry.bq_type;
347 let new_type = &new_entry.bq_type;
348
349 if let (BqType::Record(old_fields), BqType::Record(new_fields)) = (old_type, new_type) {
351 if old_entry.mode == BqMode::Nullable && new_entry.mode == BqMode::Repeated {
353 let full_name = json_full_path(base_path, &old_entry.name);
354 self.log_error(format!(
355 "Converting schema for \"{}\" from NULLABLE RECORD into REPEATED RECORD",
356 full_name
357 ));
358 old_entry.mode = BqMode::Repeated;
359 } else if old_entry.mode == BqMode::Repeated && new_entry.mode == BqMode::Nullable {
360 let full_name = json_full_path(base_path, &old_entry.name);
361 self.log_error(format!(
362 "Leaving schema for \"{}\" as REPEATED RECORD",
363 full_name
364 ));
365 }
366
367 let mut merged_fields = old_fields.clone();
369 let new_base_path = json_full_path(base_path, &old_entry.name);
370
371 for (key, new_field_entry) in new_fields {
372 if let Some(existing) = merged_fields.get(key).cloned() {
373 if let Some(merged) = self.merge_schema_entry(
375 Some(existing),
376 new_field_entry.clone(),
377 Some(&new_base_path),
378 ) {
379 if let Some(slot) = merged_fields.get_mut(key) {
380 *slot = merged;
381 }
382 } else {
383 merged_fields.shift_remove(key);
384 }
385 } else {
386 if let Some(merged) =
388 self.merge_schema_entry(None, new_field_entry.clone(), Some(&new_base_path))
389 {
390 merged_fields.insert(key.clone(), merged);
391 }
392 }
393 }
394
395 old_entry.bq_type = BqType::Record(merged_fields);
396 return Some(old_entry);
397 }
398
399 let merged_mode = match self.merge_mode(&old_entry, &new_entry, base_path) {
401 Some(m) => m,
402 None => {
403 old_entry.status = EntryStatus::Ignore;
404 return Some(old_entry);
405 }
406 };
407
408 if old_type != new_type {
410 match convert_type(old_type, new_type) {
411 Some(converted) => {
412 old_entry.bq_type = converted;
413 old_entry.mode = merged_mode;
414 Some(old_entry)
415 }
416 None => {
417 let full_old_name = json_full_path(base_path, &old_entry.name);
418 let full_new_name = json_full_path(base_path, &new_entry.name);
419 self.log_error(format!(
420 "Ignoring field with mismatched type: old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
421 old_entry.status, full_old_name, old_entry.mode, old_entry.bq_type,
422 new_entry.status, full_new_name, new_entry.mode, new_entry.bq_type
423 ));
424 old_entry.status = EntryStatus::Ignore;
425 Some(old_entry)
426 }
427 }
428 } else {
429 old_entry.mode = merged_mode;
430 Some(old_entry)
431 }
432 }
433
434 fn merge_mode(
436 &mut self,
437 old_entry: &SchemaEntry,
438 new_entry: &SchemaEntry,
439 base_path: Option<&str>,
440 ) -> Option<BqMode> {
441 let old_mode = old_entry.mode;
442 let new_mode = new_entry.mode;
443
444 if old_mode == new_mode {
446 return Some(old_mode);
447 }
448
449 let full_old_name = json_full_path(base_path, &old_entry.name);
450 let full_new_name = json_full_path(base_path, &new_entry.name);
451
452 if old_mode == BqMode::Required && new_mode == BqMode::Nullable {
454 if new_entry.filled {
455 return Some(old_mode); } else if self.config.infer_mode {
457 return Some(new_mode); } else {
459 self.log_error(format!(
460 "Ignoring non-RECORD field with mismatched mode. Cannot convert to NULLABLE because infer_schema not set: old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
461 old_entry.status, full_old_name, old_mode, old_entry.bq_type,
462 new_entry.status, full_new_name, new_mode, new_entry.bq_type
463 ));
464 return None;
465 }
466 }
467
468 if old_mode == BqMode::Nullable && new_mode == BqMode::Repeated {
470 if old_entry.status == EntryStatus::Soft && new_entry.status == EntryStatus::Hard {
471 return Some(new_mode);
472 }
473 self.log_error(format!(
474 "Cannot convert NULLABLE(hard) -> REPEATED: old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
475 old_entry.status, full_old_name, old_mode, old_entry.bq_type,
476 new_entry.status, full_new_name, new_mode, new_entry.bq_type
477 ));
478 return None;
479 }
480
481 if old_mode == BqMode::Repeated && new_mode == BqMode::Nullable {
483 if old_entry.status == EntryStatus::Hard && new_entry.status == EntryStatus::Soft {
484 return Some(old_mode);
485 }
486 self.log_error(format!(
487 "Cannot convert REPEATED -> NULLABLE(hard): old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
488 old_entry.status, full_old_name, old_mode, old_entry.bq_type,
489 new_entry.status, full_new_name, new_mode, new_entry.bq_type
490 ));
491 return None;
492 }
493
494 self.log_error(format!(
496 "Ignoring non-RECORD field with mismatched mode: old=({:?},{},{},{:?}); new=({:?},{},{},{:?})",
497 old_entry.status, full_old_name, old_mode, old_entry.bq_type,
498 new_entry.status, full_new_name, new_mode, new_entry.bq_type
499 ));
500 None
501 }
502
503 pub fn flatten_schema(&self, schema_map: &SchemaMap) -> Vec<BqSchemaField> {
505 self.flatten_schema_map(schema_map)
506 }
507
508 fn flatten_schema_map(&self, schema_map: &SchemaMap) -> Vec<BqSchemaField> {
509 let mut result = Vec::new();
510
511 let items: Vec<_> = if self.config.preserve_input_sort_order
513 || self.config.input_format == InputFormat::Csv
514 {
515 schema_map.iter().collect()
516 } else {
517 let mut items: Vec<_> = schema_map.iter().collect();
518 items.sort_by(|a, b| a.0.cmp(b.0));
519 items
520 };
521
522 for (_canonical_name, entry) in items {
523 if entry.status == EntryStatus::Ignore {
525 continue;
526 }
527
528 if entry.status == EntryStatus::Soft && !self.config.keep_nulls {
530 continue;
531 }
532
533 let mode = self.determine_output_mode(entry);
534 let field = self.entry_to_schema_field(entry, mode);
535 result.push(field);
536 }
537
538 result
539 }
540
541 fn determine_output_mode(&self, entry: &SchemaEntry) -> BqMode {
542 if self.config.infer_mode
544 && self.config.input_format == InputFormat::Csv
545 && entry.mode == BqMode::Nullable
546 && entry.filled
547 {
548 BqMode::Required
549 } else {
550 entry.mode
551 }
552 }
553
554 fn entry_to_schema_field(&self, entry: &SchemaEntry, mode: BqMode) -> BqSchemaField {
555 match &entry.bq_type {
556 BqType::Record(fields) => {
557 let nested_fields = if fields.is_empty() {
558 vec![BqSchemaField::new(
560 "__unknown__".to_string(),
561 "STRING".to_string(),
562 "NULLABLE".to_string(),
563 )]
564 } else {
565 self.flatten_schema_map(fields)
566 };
567 BqSchemaField::record(entry.name.clone(), mode.as_str().to_string(), nested_fields)
568 }
569 _ => BqSchemaField::new(
570 entry.name.clone(),
571 entry.bq_type.as_str().to_string(),
572 mode.as_str().to_string(),
573 ),
574 }
575 }
576}
577
578fn json_type_name(value: &serde_json::Value) -> &'static str {
580 match value {
581 serde_json::Value::Null => "null",
582 serde_json::Value::Bool(_) => "boolean",
583 serde_json::Value::Number(_) => "number",
584 serde_json::Value::String(_) => "string",
585 serde_json::Value::Array(_) => "array",
586 serde_json::Value::Object(_) => "object",
587 }
588}
589
/// Joins a dotted parent path and `key`; a missing or empty parent yields just `key`.
fn json_full_path(base_path: Option<&str>, key: &str) -> String {
    match base_path.filter(|base| !base.is_empty()) {
        Some(base) => [base, key].join("."),
        None => key.to_owned(),
    }
}
597
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Runs `records` through a fresh generator built from `config` and flattens the result.
    fn generate(config: GeneratorConfig, records: &[serde_json::Value]) -> Vec<BqSchemaField> {
        let mut generator = SchemaGenerator::new(config);
        let mut schema_map = SchemaMap::new();
        for record in records {
            generator.process_record(record, &mut schema_map).unwrap();
        }
        generator.flatten_schema(&schema_map)
    }

    #[test]
    fn test_simple_schema() {
        let schema = generate(
            GeneratorConfig::default(),
            &[json!({"name": "test", "count": 42, "active": true})],
        );
        assert_eq!(schema.len(), 3);
    }

    #[test]
    fn test_nested_record() {
        let schema = generate(
            GeneratorConfig::default(),
            &[json!({"user": {"name": "test", "age": 25}})],
        );
        assert_eq!(schema.len(), 1);
        let user = &schema[0];
        assert_eq!(user.field_type, "RECORD");
        assert!(user.fields.is_some());
    }

    #[test]
    fn test_array_type() {
        let schema = generate(
            GeneratorConfig::default(),
            &[json!({"tags": ["a", "b", "c"]})],
        );
        assert_eq!(schema.len(), 1);
        let tags = &schema[0];
        assert_eq!(tags.mode, "REPEATED");
        assert_eq!(tags.field_type, "STRING");
    }

    #[test]
    fn test_type_coercion() {
        // An integer followed by a float must yield a FLOAT column.
        let schema = generate(
            GeneratorConfig::default(),
            &[json!({"value": 42}), json!({"value": 3.5})],
        );
        assert_eq!(schema.len(), 1);
        assert_eq!(schema[0].field_type, "FLOAT");
    }

    #[test]
    fn test_sanitize_names() {
        let config = GeneratorConfig {
            sanitize_names: true,
            ..Default::default()
        };
        let schema = generate(
            config,
            &[json!({"field-name": "test", "field.with.dots": 42})],
        );
        assert_eq!(schema.len(), 2);
        assert!(schema
            .iter()
            .all(|field| !field.name.contains('-') && !field.name.contains('.')));
    }
}