bq-schema-gen 0.1.1

Generate BigQuery schema from JSON or CSV data files
Documentation
# BigQuery Schema Generator - Technical Overview

## Project Purpose

A Rust CLI tool and library that generates BigQuery schemas from JSON/CSV data by processing **all records** (unlike BigQuery's auto-detect which only samples 500 records). Supports schema merging, type inference, validation, parallel processing, and file watching.

---

## Architecture

```
src/
├── main.rs           # CLI entry point, clap parsing, command dispatch
├── lib.rs            # Public API exports
├── error.rs          # Error types (Error enum, ErrorLog, Result alias)
├── schema/
│   ├── mod.rs        # Module exports
│   ├── types.rs      # Core types: BqType, BqMode, SchemaEntry, SchemaMap, BqSchemaField
│   ├── generator.rs  # SchemaGenerator: record processing, type inference, merging
│   └── existing.rs   # Load existing schemas from files, type alias conversion
├── inference/
│   └── mod.rs        # Type inference from JSON values, regex patterns for DATE/TIME/TIMESTAMP
├── input/
│   ├── mod.rs        # Module exports
│   ├── json.rs       # JsonRecordIterator: line-by-line NDJSON parsing
│   └── csv.rs        # CsvRecordIterator: CSV to JSON object conversion
├── output/
│   └── mod.rs        # Output formatters: JSON, DDL, debug-map, JSON-Schema
├── diff/
│   ├── mod.rs        # Schema comparison: diff_schemas(), breaking change detection
│   └── output.rs     # Diff formatters: text, JSON, JSON-patch, SQL
├── validate/
│   ├── mod.rs        # SchemaValidator: validates data against existing schema
│   └── error.rs      # ValidationError, ValidationErrorType, ValidationResult
└── watch/
    └── mod.rs        # WatchState: file watching with incremental schema caching
```

---

## Core Types

### `SchemaMap` (schema/types.rs:181)
```rust
pub type SchemaMap = IndexMap<String, SchemaEntry>;
```
- Keys are **lowercase/canonical** field names
- Uses `IndexMap` to preserve insertion order (important for `--preserve-input-sort-order`)

### `SchemaEntry` (schema/types.rs:139-151)
```rust
pub struct SchemaEntry {
    pub status: EntryStatus,  // Hard | Soft | Ignore
    pub filled: bool,         // Present in all records?
    pub name: String,         // Original field name (pre-lowercase)
    pub bq_type: BqType,      // Inferred type
    pub mode: BqMode,         // NULLABLE | REQUIRED | REPEATED
}
```

### `BqType` (schema/types.rs:14-41)
```rust
pub enum BqType {
    // Output types
    Boolean, Integer, Float, String, Timestamp, Date, Time, Record(SchemaMap),
    // Internal tracking types (become STRING in output)
    Null, EmptyArray, EmptyRecord,
    // Quoted types (for CSV/quoted JSON strings)
    QBoolean, QInteger, QFloat,
}
```
- `Q*` types track values inferred from quoted strings (e.g., `"123"``QInteger`)
- Internal types (`Null`, `EmptyArray`, `EmptyRecord`) handled specially in output

### `BqSchemaField` (schema/types.rs:184-192)
```rust
pub struct BqSchemaField {
    pub fields: Option<Vec<BqSchemaField>>,  // Nested fields for RECORD
    pub mode: String,      // "NULLABLE", "REQUIRED", "REPEATED"
    pub name: String,
    pub field_type: String, // "STRING", "INTEGER", etc.
}
```
- JSON-serializable output format matching BigQuery schema JSON

### `EntryStatus` (schema/types.rs:128-136)
- `Hard`: Type definitively determined from non-null value
- `Soft`: Type provisional (from null/empty), can be overwritten
- `Ignore`: Conflicting types, excluded from output

---

## Data Flow

### Schema Generation
```
Input (JSON/CSV)
  → JsonRecordIterator/CsvRecordIterator (yields (line_num, serde_json::Value))
  → SchemaGenerator::process_record()
    → deduce_schema_for_record() - recursive field extraction
    → merge_schema_entry() - merges new entry with existing
  → SchemaMap (internal representation)
  → flatten_schema() - converts to Vec<BqSchemaField>
  → write_schema_json/ddl/etc. (output)
```

### Type Inference Priority (inference/mod.rs)
1. JSON null → `Null`
2. JSON boolean → `Boolean`
3. JSON number → `Integer` (if fits i64) or `Float`
4. JSON string:
   - Check TIMESTAMP regex first
   - Check DATE regex
   - Check TIME regex
   - If `!quoted_values_are_strings`: check INTEGER/FLOAT/BOOLEAN patterns → `Q*` types
   - Otherwise → `String`
5. JSON array → `(Repeated, element_type)` or `EmptyArray`
6. JSON object → `Record(SchemaMap)` or `EmptyRecord`

### Type Merging Rules (schema/generator.rs:300-432)
- Same type → same type
- `[Q]Boolean + [Q]Boolean``Boolean`
- `[Q]Integer + [Q]Integer``Integer`
- `[Q]Float + [Q]Float``Float`
- `QInteger + QFloat``QFloat`
- `[Q]Integer + [Q]Float``Float`
- String-compatible types (`String`, `Timestamp`, `Date`, `Time`, `Q*`) → `String`
- `Record + Record` → merged Record (recursive)
- `NULLABLE RECORD → REPEATED RECORD` allowed (logs warning)
- Incompatible types → `EntryStatus::Ignore`

---

## CLI Structure (main.rs)

### Main Struct
```rust
#[derive(Parser)]
struct Cli {
    files: Vec<String>,          // Positional, glob-expanded
    // ... many flags with aliases (--input_format and --input-format both work)
    threads: Option<usize>,      // Parallel processing
    watch: bool,                 // Watch mode
    debounce: u64,               // Watch debounce (ms)
    on_change: Option<String>,   // Watch on-change command
}
```

### Subcommands
```rust
enum Commands {
    Diff { old_schema, new_schema, format, color, strict, output },
    Validate { files, schema, allow_unknown, strict_types, max_errors, format, quiet },
}
```

### Command Dispatch (main.rs:161-197)
1. `Some(Commands::Diff {...})``run_diff()`
2. `Some(Commands::Validate {...})``run_validate()`
3. `None``run_generate()` (default schema generation)

### Processing Modes (run_generate)
1. **Watch mode** (`--watch`) → `run_watch_mode()`
2. **Per-file mode** (`--per-file`) → `process_per_file()`
3. **Stdin mode** (no files) → `process_single_input(None, ...)`
4. **Merged mode** (default) → `process_merged_files()`
   - If `threads > 1 && files > 1``process_files_parallel()` with rayon
   - Otherwise → `process_files_sequential()`

---

## Key Patterns

### Streaming Processing
- Files processed line-by-line via iterators
- Memory-efficient for large files
- `JsonRecordIterator` yields `(line_number, serde_json::Value)`

### Case-Insensitive Field Matching
- Fields stored with lowercase canonical keys
- Original name preserved in `SchemaEntry.name`
- Matching done via `key.to_lowercase()`

### Error Handling
- `Error` enum with `thiserror` derive
- Non-fatal errors logged to `SchemaGenerator.error_logs: Vec<ErrorLog>`
- `--ignore-invalid-lines` skips parse errors

### Field Name Sanitization (`--sanitize-names`)
- Regex replaces `[^a-zA-Z0-9_]` with `_`
- Truncates to 128 chars (BigQuery limit)

---

## Validation Module (validate/)

### SchemaValidator
```rust
pub struct SchemaValidator<'a> {
    schema: &'a [BqSchemaField],
    options: ValidationOptions,
    schema_map: HashMap<String, &'a BqSchemaField>,  // lowercase lookup
}
```

### ValidationOptions
```rust
pub struct ValidationOptions {
    pub allow_unknown: bool,   // Unknown fields → warnings instead of errors
    pub strict_types: bool,    // "123" fails INTEGER (JSON string ≠ number)
    pub max_errors: usize,     // Stop after N errors
}
```

### Type Coercion (lenient mode, default)
- Uses inference module's `is_*_string()` functions
- `"123"` valid for INTEGER, `"true"` valid for BOOLEAN, etc.

### Strict Mode (`--strict-types`)
- JSON type must match schema type exactly
- JSON string → only STRING
- JSON number → INTEGER or FLOAT
- JSON boolean → only BOOLEAN

---

## Parallel Processing (main.rs:715-935)

### Approach
1. Collect files from glob patterns
2. Create rayon thread pool (`ThreadPoolBuilder::new().num_threads(n)`)
3. Process files via `par_iter()`, each producing `SchemaMap`
4. Merge all `SchemaMap` results using `merge_schema_maps()`

### Progress Bar
```rust
ProgressBar::new(files.len() as u64)
    .set_style(ProgressStyle::default_bar()
        .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} files | {msg}"))
```

### Schema Merging
`merge_schema_maps()` converts `SchemaEntry` back to JSON value and processes through generator to trigger merge logic.

---

## Watch Mode (watch/mod.rs)

### WatchState
```rust
pub struct WatchState {
    file_schemas: HashMap<PathBuf, SchemaMap>,  // Per-file cache
    current_schema: Vec<BqSchemaField>,         // Merged result
    config: GeneratorConfig,
    watch_config: WatchConfig,
}
```

### Incremental Updates
- `handle_file_change(path)`: Reprocess single file, rebuild merged schema, return diff
- `handle_file_delete(path)`: Remove from cache, rebuild, return diff

### Event Loop
- Uses `notify-debouncer-mini` for debounced file events
- Watches unique parent directories of matched files
- Only processes events for files matching original patterns
- Prints diff summary with colored output

---

## Diff Module (diff/)

### Breaking Change Rules (diff/mod.rs:247-282)
**Always Breaking:**
- Field removal
- `NULLABLE → REQUIRED`
- `REPEATED ↔ NULLABLE/REQUIRED`

**Not Breaking:**
- `INTEGER → FLOAT` (widening)
- Any type → `STRING`
- `REQUIRED → NULLABLE`

**Strict Mode (`--strict`):** All changes flagged as breaking

### Output Formats
- `text`: Colored human-readable
- `json`: Full structured diff
- `json-patch`: RFC 6902 format
- `sql`: Migration hints with comments

---

## Testing

### Test Organization
```
tests/
├── cli_tests.rs          # CLI argument handling, end-to-end
├── csv_tests.rs          # CSV parsing edge cases
├── edge_cases.rs         # Type coercion, conflicts, unicode
└── integration_tests.rs  # Full processing scenarios
```

### Test Helper Pattern
```rust
fn generate_schema(records: &[&str], config: GeneratorConfig) -> (Vec<BqSchemaField>, Vec<ErrorLog>) {
    let mut generator = SchemaGenerator::new(config);
    let mut schema_map = SchemaMap::new();
    for record in records {
        let value: Value = serde_json::from_str(record).unwrap();
        generator.process_record(&value, &mut schema_map).ok();
    }
    (generator.flatten_schema(&schema_map), generator.error_logs().to_vec())
}
```

---

## Dependencies

```toml
# Core
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["preserve_order"] }
clap = { version = "4.4", features = ["derive", "string"] }
indexmap = { version = "2.1", features = ["serde"] }
regex = "1.10"
once_cell = "1.19"  # Lazy statics for regex
glob = "0.3"
colored = "2"
thiserror = "1.0"
csv = "1.3"

# Parallel processing
rayon = "1.8"
indicatif = "0.17"
num_cpus = "1.16"

# Watch mode
notify = "6.1"
notify-debouncer-mini = "0.4"
```

---

## Important Gotchas

1. **Field order**: `IndexMap` preserves insertion order; `--preserve-input-sort-order` disables alphabetical sorting

2. **Case sensitivity**: BigQuery is case-insensitive for field names; canonical key is lowercase, original preserved in entry

3. **Quoted types**: `QInteger`, `QFloat`, `QBoolean` track inferred types from strings; merge with unquoted → unquoted type

4. **Empty arrays**: `[]` becomes `EmptyArray`, can later upgrade to typed array; `--keep-nulls` outputs as `STRING REPEATED`

5. **Nested arrays**: Not supported by BigQuery; returns `None` from type inference, logged as error

6. **NULLABLE RECORD → REPEATED**: Allowed with warning (BigQuery behavior)

7. **Integer overflow**: Numbers > `i64::MAX` become `FLOAT`

8. **Regex patterns**: Defined in `inference/mod.rs` with `once_cell::Lazy`; match Python implementation exactly

9. **Exit codes**:
   - Generation: 0 success
   - Diff: 1 if breaking changes
   - Validate: 0 valid, 1 invalid, 2 file error

10. **CLI flag aliases**: Both `--input_format` and `--input-format` work (Python compatibility)

---

## Future Enhancement Points

1. **Single-file chunking**: Parallel processing within a single large file
2. **CSV validation**: Currently JSON-only
3. **Incremental parallel watch**: Parallelize file reprocessing in watch mode
4. **Custom type mappings**: User-defined regex → type rules
5. **Schema migration generation**: DDL ALTER statements from diff