Skip to main content

datacortex_core/format/
schema.rs

1//! Schema inference engine for columnar JSON/NDJSON data.
2//!
3//! Takes the output of `ndjson::preprocess` (columns separated by \x00,
4//! values within columns separated by \x01) and infers the type of each
5//! column by examining its values.
6//!
7//! The inferred schema can be serialized into compact binary metadata
8//! for storage in .dcx transform metadata, and deserialized by the decoder.
9
10use std::collections::HashSet;
11
/// Byte separating one column from the next in `ndjson::preprocess` output.
const COL_SEP: u8 = b'\0';
/// Byte separating consecutive values inside a single column.
const VAL_SEP: u8 = b'\x01';
14
15// ─── Type Definitions ────────────────────────────────────────────────────────
16
/// Encoding used by a timestamp column.
#[derive(Debug, Clone, PartialEq)]
pub enum TimestampFormat {
    /// Quoted ISO 8601 date-time such as `"2026-03-15T10:30:00.001Z"`, optionally with
    /// fractional seconds and a `Z` or `+HH:MM` / `-HH:MM` zone designator.
    Iso8601,
    /// Integer count of seconds since the Unix epoch, e.g. `1742036400`.
    EpochSeconds,
    /// Integer count of milliseconds since the Unix epoch, e.g. `1742036400001`.
    EpochMillis,
}
27
28/// Inferred type of a single column.
29#[derive(Debug, Clone, PartialEq)]
30pub enum ColumnType {
31    Integer {
32        min: i64,
33        max: i64,
34        nullable: bool,
35    },
36    Float {
37        nullable: bool,
38    },
39    Boolean {
40        nullable: bool,
41    },
42    Timestamp {
43        format: TimestampFormat,
44        nullable: bool,
45    },
46    Uuid {
47        nullable: bool,
48    },
49    Enum {
50        cardinality: u16,
51        nullable: bool,
52    },
53    String {
54        nullable: bool,
55    },
56    /// Column where every value is null.
57    Null,
58}
59
60/// Schema for a single column: type + null statistics.
61#[derive(Debug, Clone)]
62pub struct ColumnSchema {
63    pub col_type: ColumnType,
64    pub null_count: usize,
65    pub total_count: usize,
66}
67
68/// Inferred schema for an entire columnar dataset.
69#[derive(Debug, Clone)]
70pub struct InferredSchema {
71    pub columns: Vec<ColumnSchema>,
72}
73
74// ─── Type Tag Constants (for serialization) ──────────────────────────────────
75
/// Wire tag for `ColumnType::Null`.
const TAG_NULL: u8 = 0;
/// Wire tag for `ColumnType::Integer`; followed by min and max as two `i64` LE values.
const TAG_INTEGER: u8 = 1;
/// Wire tag for `ColumnType::Float`.
const TAG_FLOAT: u8 = 2;
/// Wire tag for `ColumnType::Boolean`.
const TAG_BOOLEAN: u8 = 3;
/// Wire tag for `ColumnType::Timestamp` with `TimestampFormat::Iso8601`.
const TAG_TIMESTAMP_ISO: u8 = 4;
/// Wire tag for `ColumnType::Timestamp` with `TimestampFormat::EpochSeconds`.
const TAG_TIMESTAMP_EPOCH_S: u8 = 5;
/// Wire tag for `ColumnType::Timestamp` with `TimestampFormat::EpochMillis`.
const TAG_TIMESTAMP_EPOCH_MS: u8 = 6;
/// Wire tag for `ColumnType::Uuid`.
const TAG_UUID: u8 = 7;
/// Wire tag for `ColumnType::Enum`; followed by the cardinality as a `u16` LE value.
const TAG_ENUM: u8 = 8;
/// Wire tag for `ColumnType::String`.
const TAG_STRING: u8 = 9;

/// Bit in the per-column flags byte marking a column that contains nulls.
const FLAG_NULLABLE: u8 = 1 << 0;
88
89// ─── Value Classification ────────────────────────────────────────────────────
90
/// Per-value classification produced by `classify_value` and consumed by `infer_schema`.
#[derive(Debug, Clone, PartialEq)]
enum ValueType {
    /// The literal `null`.
    Null,
    /// The literal `true` or `false`.
    Boolean,
    /// An integer that fits in `i64` and lies outside both epoch ranges.
    Integer(i64),
    /// A number with a fractional part and/or an exponent.
    Float,
    /// A quoted ISO 8601 date-time.
    TimestampIso,
    /// An integer inside the epoch-seconds range.
    TimestampEpochS,
    /// An integer inside the epoch-milliseconds range.
    TimestampEpochMs,
    /// A quoted 8-4-4-4-12 hexadecimal UUID.
    Uuid,
    /// Any other quoted string, and any unrecognised unquoted token.
    QuotedString,
}
104
105/// Classify a single value from the columnar data.
106fn classify_value(val: &[u8]) -> ValueType {
107    if val == b"null" {
108        return ValueType::Null;
109    }
110    if val == b"true" || val == b"false" {
111        return ValueType::Boolean;
112    }
113
114    // Try integer: ^-?[0-9]+$
115    if is_integer(val) {
116        if let Some(n) = parse_i64(val) {
117            // Check if it could be an epoch timestamp.
118            // Seconds range: 946684800 (2000-01-01) .. 4102444800 (2099-12-31)
119            // Millis range: those * 1000
120            if n >= 0 {
121                let nu = n as u64;
122                if (946_684_800_000..=4_102_444_800_000).contains(&nu) {
123                    return ValueType::TimestampEpochMs;
124                }
125                if (946_684_800..=4_102_444_800).contains(&nu) {
126                    return ValueType::TimestampEpochS;
127                }
128            }
129            return ValueType::Integer(n);
130        }
131    }
132
133    // Try float: ^-?[0-9]*\.[0-9]+([eE][+-]?[0-9]+)?$ or integer with exponent
134    if is_float(val) {
135        return ValueType::Float;
136    }
137
138    // Quoted value checks — must start and end with "
139    if val.len() >= 2 && val[0] == b'"' && val[val.len() - 1] == b'"' {
140        let inner = &val[1..val.len() - 1];
141
142        // ISO 8601 timestamp: YYYY-MM-DDTHH:MM:SS...
143        if is_iso8601(inner) {
144            return ValueType::TimestampIso;
145        }
146
147        // UUID: 8-4-4-4-12 hex
148        if is_uuid(inner) {
149            return ValueType::Uuid;
150        }
151
152        return ValueType::QuotedString;
153    }
154
155    // Unquoted, non-null, non-bool, non-numeric — treat as string.
156    ValueType::QuotedString
157}
158
/// Return `true` when `val` is an optional leading `-` followed by one or more ASCII digits
/// (`^-?[0-9]+$`). No range check is performed.
fn is_integer(val: &[u8]) -> bool {
    let digits = val.strip_prefix(b"-").unwrap_or(val);
    !digits.is_empty() && digits.iter().all(u8::is_ascii_digit)
}
170
/// Parse `val` as a decimal `i64`.
///
/// Returns `None` for non-UTF-8 bytes, text that `str::parse::<i64>` rejects, or values
/// that overflow `i64`. Callers normally pass bytes already accepted by `is_integer`.
fn parse_i64(val: &[u8]) -> Option<i64> {
    std::str::from_utf8(val)
        .ok()
        .and_then(|text| text.parse::<i64>().ok())
}
177
/// Return `true` when `val` is a floating-point number rather than a plain integer.
///
/// The bytes must be UTF-8 text accepted by Rust's `f64` parser *and* contain a `.`, `e`
/// or `E`. For JSON input this selects numbers with a fraction and/or an exponent
/// (e.g. `3.14`, `1e5`, `-2.5E-3`). Special values such as `NaN` or `inf` are rejected
/// because they contain none of those markers.
fn is_float(val: &[u8]) -> bool {
    let has_fraction_or_exponent = val.iter().any(|b| matches!(b, b'.' | b'e' | b'E'));
    if !has_fraction_or_exponent {
        return false;
    }
    match std::str::from_utf8(val) {
        Ok(text) => text.parse::<f64>().is_ok(),
        Err(_) => false,
    }
}
195
/// Return `true` when `inner` (a quoted value with its quotes removed) looks like an
/// ISO 8601 date-time: `YYYY-MM-DDTHH:MM:SS`, then optional fractional seconds (`.` plus
/// one or more digits), then an optional zone designator (`Z` or `+HH:MM` / `-HH:MM`).
/// Nothing may follow the zone designator. Digit positions are checked only for being
/// ASCII digits; field ranges (month 13, hour 25, ...) are not validated.
fn is_iso8601(inner: &[u8]) -> bool {
    // `d` marks a position that must be an ASCII digit; every other byte must match literally.
    const TEMPLATE: &[u8] = b"dddd-dd-ddTdd:dd:dd";

    if inner.len() < TEMPLATE.len() {
        return false;
    }
    let (head, mut tail) = inner.split_at(TEMPLATE.len());
    let head_matches = head.iter().zip(TEMPLATE).all(|(&byte, &expected)| {
        if expected == b'd' {
            byte.is_ascii_digit()
        } else {
            byte == expected
        }
    });
    if !head_matches {
        return false;
    }

    // Optional fractional seconds: '.' followed by at least one digit.
    if let Some(after_dot) = tail.strip_prefix(b".") {
        let digit_count = after_dot.iter().take_while(|b| b.is_ascii_digit()).count();
        if digit_count == 0 {
            return false;
        }
        tail = &after_dot[digit_count..];
    }

    // Optional zone designator, which must end the value.
    match tail.split_first() {
        None => true,
        Some((&b'Z', rest)) => rest.is_empty(),
        Some((&(b'+' | b'-'), offset)) => {
            offset.len() == 5
                && offset[2] == b':'
                && [0, 1, 3, 4].iter().all(|&i| offset[i].is_ascii_digit())
        }
        Some(_) => false,
    }
}
269
/// Return `true` when `inner` (a quoted value with its quotes removed) is exactly 36 bytes
/// in the 8-4-4-4-12 layout: dashes at offsets 8, 13, 18 and 23, and ASCII hex digits
/// (either case) everywhere else.
fn is_uuid(inner: &[u8]) -> bool {
    const DASH_OFFSETS: [usize; 4] = [8, 13, 18, 23];

    inner.len() == 36
        && inner.iter().enumerate().all(|(offset, &byte)| {
            if DASH_OFFSETS.contains(&offset) {
                byte == b'-'
            } else {
                byte.is_ascii_hexdigit()
            }
        })
}
297
298// ─── Schema Inference ────────────────────────────────────────────────────────
299
300/// Infer schema from columnar data (post `ndjson::preprocess` output).
301///
302/// Data format: columns separated by \x00, values within columns by \x01.
303pub fn infer_schema(columnar_data: &[u8]) -> InferredSchema {
304    if columnar_data.is_empty() {
305        return InferredSchema {
306            columns: Vec::new(),
307        };
308    }
309
310    let col_chunks: Vec<&[u8]> = columnar_data.split(|&b| b == COL_SEP).collect();
311    let mut columns = Vec::with_capacity(col_chunks.len());
312
313    for col_data in &col_chunks {
314        let values: Vec<&[u8]> = col_data.split(|&b| b == VAL_SEP).collect();
315        let total_count = values.len();
316
317        // Classify every value.
318        let mut null_count: usize = 0;
319        let mut classifications: Vec<ValueType> = Vec::with_capacity(total_count);
320
321        for val in &values {
322            let vt = classify_value(val);
323            if vt == ValueType::Null {
324                null_count += 1;
325            }
326            classifications.push(vt);
327        }
328
329        let non_null: Vec<&ValueType> = classifications
330            .iter()
331            .filter(|c| **c != ValueType::Null)
332            .collect();
333        let nullable = null_count > 0;
334
335        let col_type = if non_null.is_empty() {
336            // All null.
337            ColumnType::Null
338        } else if non_null.iter().all(|c| matches!(c, ValueType::Boolean)) {
339            ColumnType::Boolean { nullable }
340        } else if non_null.iter().all(|c| matches!(c, ValueType::Integer(_))) {
341            let mut min = i64::MAX;
342            let mut max = i64::MIN;
343            for c in &non_null {
344                if let ValueType::Integer(n) = c {
345                    if *n < min {
346                        min = *n;
347                    }
348                    if *n > max {
349                        max = *n;
350                    }
351                }
352            }
353            ColumnType::Integer { min, max, nullable }
354        } else if non_null
355            .iter()
356            .all(|c| matches!(c, ValueType::Integer(_) | ValueType::Float))
357        {
358            // Mixed int+float => Float
359            ColumnType::Float { nullable }
360        } else if non_null
361            .iter()
362            .all(|c| matches!(c, ValueType::TimestampIso))
363        {
364            ColumnType::Timestamp {
365                format: TimestampFormat::Iso8601,
366                nullable,
367            }
368        } else if non_null
369            .iter()
370            .all(|c| matches!(c, ValueType::TimestampEpochS))
371        {
372            ColumnType::Timestamp {
373                format: TimestampFormat::EpochSeconds,
374                nullable,
375            }
376        } else if non_null
377            .iter()
378            .all(|c| matches!(c, ValueType::TimestampEpochMs))
379        {
380            ColumnType::Timestamp {
381                format: TimestampFormat::EpochMillis,
382                nullable,
383            }
384        } else if non_null
385            .iter()
386            .all(|c| matches!(c, ValueType::TimestampEpochS | ValueType::TimestampEpochMs))
387        {
388            // Mixed epoch seconds and millis — pick millis as the broader type.
389            ColumnType::Timestamp {
390                format: TimestampFormat::EpochMillis,
391                nullable,
392            }
393        } else if non_null.iter().all(|c| matches!(c, ValueType::Uuid)) {
394            ColumnType::Uuid { nullable }
395        } else if non_null
396            .iter()
397            .all(|c| matches!(c, ValueType::QuotedString))
398        {
399            // Check cardinality for Enum vs String.
400            let mut unique_vals: HashSet<&[u8]> = HashSet::new();
401            for val in &values {
402                let vt = classify_value(val);
403                if vt != ValueType::Null {
404                    unique_vals.insert(val);
405                }
406            }
407            let cardinality = unique_vals.len();
408            if cardinality <= 256 {
409                ColumnType::Enum {
410                    cardinality: cardinality as u16,
411                    nullable,
412                }
413            } else {
414                ColumnType::String { nullable }
415            }
416        } else {
417            // Mixed types that don't fit any unified category.
418            ColumnType::String { nullable }
419        };
420
421        columns.push(ColumnSchema {
422            col_type,
423            null_count,
424            total_count,
425        });
426    }
427
428    InferredSchema { columns }
429}
430
431// ─── Serialization ───────────────────────────────────────────────────────────
432
433/// Serialize schema to compact binary bytes for transform metadata.
434///
435/// Format:
436///   Header: num_columns (u16 LE)
437///   Per column:
438///     byte 0: type tag
439///     byte 1: flags (bit 0 = nullable)
440///     [type-specific data]:
441///       Integer: 8 bytes min (i64 LE) + 8 bytes max (i64 LE)
442///       Enum: 2 bytes cardinality (u16 LE)
443///       Others: no extra data
444pub fn serialize_schema(schema: &InferredSchema) -> Vec<u8> {
445    let mut out = Vec::new();
446    out.extend_from_slice(&(schema.columns.len() as u16).to_le_bytes());
447
448    for col in &schema.columns {
449        let (tag, flags, extra) = match &col.col_type {
450            ColumnType::Null => (TAG_NULL, 0u8, Vec::new()),
451            ColumnType::Integer { min, max, nullable } => {
452                let mut extra = Vec::with_capacity(16);
453                extra.extend_from_slice(&min.to_le_bytes());
454                extra.extend_from_slice(&max.to_le_bytes());
455                (
456                    TAG_INTEGER,
457                    if *nullable { FLAG_NULLABLE } else { 0 },
458                    extra,
459                )
460            }
461            ColumnType::Float { nullable } => (
462                TAG_FLOAT,
463                if *nullable { FLAG_NULLABLE } else { 0 },
464                Vec::new(),
465            ),
466            ColumnType::Boolean { nullable } => (
467                TAG_BOOLEAN,
468                if *nullable { FLAG_NULLABLE } else { 0 },
469                Vec::new(),
470            ),
471            ColumnType::Timestamp { format, nullable } => {
472                let tag = match format {
473                    TimestampFormat::Iso8601 => TAG_TIMESTAMP_ISO,
474                    TimestampFormat::EpochSeconds => TAG_TIMESTAMP_EPOCH_S,
475                    TimestampFormat::EpochMillis => TAG_TIMESTAMP_EPOCH_MS,
476                };
477                (tag, if *nullable { FLAG_NULLABLE } else { 0 }, Vec::new())
478            }
479            ColumnType::Uuid { nullable } => (
480                TAG_UUID,
481                if *nullable { FLAG_NULLABLE } else { 0 },
482                Vec::new(),
483            ),
484            ColumnType::Enum {
485                cardinality,
486                nullable,
487            } => {
488                let mut extra = Vec::with_capacity(2);
489                extra.extend_from_slice(&cardinality.to_le_bytes());
490                (TAG_ENUM, if *nullable { FLAG_NULLABLE } else { 0 }, extra)
491            }
492            ColumnType::String { nullable } => (
493                TAG_STRING,
494                if *nullable { FLAG_NULLABLE } else { 0 },
495                Vec::new(),
496            ),
497        };
498        out.push(tag);
499        out.push(flags);
500        out.extend_from_slice(&extra);
501    }
502
503    out
504}
505
506/// Deserialize schema from transform metadata bytes.
507pub fn deserialize_schema(data: &[u8]) -> InferredSchema {
508    if data.len() < 2 {
509        return InferredSchema {
510            columns: Vec::new(),
511        };
512    }
513
514    let num_columns = u16::from_le_bytes(data[0..2].try_into().unwrap()) as usize;
515    let mut pos = 2;
516    let mut columns = Vec::with_capacity(num_columns);
517
518    for _ in 0..num_columns {
519        if pos + 2 > data.len() {
520            break;
521        }
522        let tag = data[pos];
523        pos += 1;
524        let flags = data[pos];
525        pos += 1;
526        let nullable = (flags & FLAG_NULLABLE) != 0;
527
528        let col_type = match tag {
529            TAG_NULL => ColumnType::Null,
530            TAG_INTEGER => {
531                if pos + 16 > data.len() {
532                    break;
533                }
534                let min = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
535                pos += 8;
536                let max = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
537                pos += 8;
538                ColumnType::Integer { min, max, nullable }
539            }
540            TAG_FLOAT => ColumnType::Float { nullable },
541            TAG_BOOLEAN => ColumnType::Boolean { nullable },
542            TAG_TIMESTAMP_ISO => ColumnType::Timestamp {
543                format: TimestampFormat::Iso8601,
544                nullable,
545            },
546            TAG_TIMESTAMP_EPOCH_S => ColumnType::Timestamp {
547                format: TimestampFormat::EpochSeconds,
548                nullable,
549            },
550            TAG_TIMESTAMP_EPOCH_MS => ColumnType::Timestamp {
551                format: TimestampFormat::EpochMillis,
552                nullable,
553            },
554            TAG_UUID => ColumnType::Uuid { nullable },
555            TAG_ENUM => {
556                if pos + 2 > data.len() {
557                    break;
558                }
559                let cardinality = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
560                pos += 2;
561                ColumnType::Enum {
562                    cardinality,
563                    nullable,
564                }
565            }
566            TAG_STRING => ColumnType::String { nullable },
567            _ => ColumnType::String { nullable }, // Unknown tag fallback.
568        };
569
570        columns.push(ColumnSchema {
571            col_type,
572            null_count: 0,  // Not stored in serialized form.
573            total_count: 0, // Not stored in serialized form.
574        });
575    }
576
577    InferredSchema { columns }
578}
579
580// ─── Helper trait for test convenience ───────────────────────────────────────
581
582impl ColumnType {
583    /// Extract max from Integer variant (test helper).
584    #[cfg(test)]
585    fn integer_max(self) -> Option<i64> {
586        match self {
587            ColumnType::Integer { max, .. } => Some(max),
588            _ => None,
589        }
590    }
591}
592
593// ─── Tests ───────────────────────────────────────────────────────────────────
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598
599    /// Helper: build columnar data from column value slices.
600    /// Each inner slice is one column's values.
601    fn build_columnar(columns: &[&[&[u8]]]) -> Vec<u8> {
602        let mut out = Vec::new();
603        for (ci, col) in columns.iter().enumerate() {
604            for (vi, val) in col.iter().enumerate() {
605                out.extend_from_slice(val);
606                if vi < col.len() - 1 {
607                    out.push(VAL_SEP);
608                }
609            }
610            if ci < columns.len() - 1 {
611                out.push(COL_SEP);
612            }
613        }
614        out
615    }
616
617    #[test]
618    fn test_infer_integers() {
619        let data = build_columnar(&[&[b"1", b"2", b"300", b"-5"]]);
620        let schema = infer_schema(&data);
621        assert_eq!(schema.columns.len(), 1);
622        assert_eq!(
623            schema.columns[0].col_type,
624            ColumnType::Integer {
625                min: -5,
626                max: 300,
627                nullable: false,
628            }
629        );
630        assert_eq!(schema.columns[0].null_count, 0);
631        assert_eq!(schema.columns[0].total_count, 4);
632    }
633
634    #[test]
635    fn test_infer_floats() {
636        let data = build_columnar(&[&[b"3.14", b"2.718", b"1.0"]]);
637        let schema = infer_schema(&data);
638        assert_eq!(schema.columns.len(), 1);
639        assert_eq!(
640            schema.columns[0].col_type,
641            ColumnType::Float { nullable: false }
642        );
643    }
644
645    #[test]
646    fn test_infer_booleans() {
647        let data = build_columnar(&[&[b"true", b"false", b"true"]]);
648        let schema = infer_schema(&data);
649        assert_eq!(schema.columns.len(), 1);
650        assert_eq!(
651            schema.columns[0].col_type,
652            ColumnType::Boolean { nullable: false }
653        );
654    }
655
656    #[test]
657    fn test_infer_timestamps() {
658        let data = build_columnar(&[&[
659            br#""2026-03-15T10:30:00.001Z""#.as_slice(),
660            br#""2026-03-15T10:30:00.234Z""#.as_slice(),
661            br#""2026-03-15T10:30:01.000Z""#.as_slice(),
662        ]]);
663        let schema = infer_schema(&data);
664        assert_eq!(schema.columns.len(), 1);
665        assert_eq!(
666            schema.columns[0].col_type,
667            ColumnType::Timestamp {
668                format: TimestampFormat::Iso8601,
669                nullable: false,
670            }
671        );
672    }
673
674    #[test]
675    fn test_infer_timestamps_with_offset() {
676        let data = build_columnar(&[&[
677            br#""2026-03-15T10:30:00+05:30""#.as_slice(),
678            br#""2026-03-15T10:30:00-04:00""#.as_slice(),
679        ]]);
680        let schema = infer_schema(&data);
681        assert_eq!(
682            schema.columns[0].col_type,
683            ColumnType::Timestamp {
684                format: TimestampFormat::Iso8601,
685                nullable: false,
686            }
687        );
688    }
689
690    #[test]
691    fn test_infer_uuids() {
692        let data = build_columnar(&[&[
693            br#""550e8400-e29b-41d4-a716-446655440000""#.as_slice(),
694            br#""6ba7b810-9dad-11d1-80b4-00c04fd430c8""#.as_slice(),
695            br#""f47ac10b-58cc-4372-a567-0e02b2c3d479""#.as_slice(),
696        ]]);
697        let schema = infer_schema(&data);
698        assert_eq!(schema.columns.len(), 1);
699        assert_eq!(
700            schema.columns[0].col_type,
701            ColumnType::Uuid { nullable: false }
702        );
703    }
704
705    #[test]
706    fn test_infer_enums() {
707        let data = build_columnar(&[&[
708            br#""page_view""#.as_slice(),
709            br#""api_call""#.as_slice(),
710            br#""click""#.as_slice(),
711            br#""page_view""#.as_slice(),
712            br#""scroll""#.as_slice(),
713            br#""api_call""#.as_slice(),
714        ]]);
715        let schema = infer_schema(&data);
716        assert_eq!(schema.columns.len(), 1);
717        match &schema.columns[0].col_type {
718            ColumnType::Enum {
719                cardinality,
720                nullable,
721            } => {
722                assert_eq!(*cardinality, 4); // page_view, api_call, click, scroll
723                assert!(!nullable);
724            }
725            other => panic!("expected Enum, got {:?}", other),
726        }
727    }
728
729    #[test]
730    fn test_infer_strings() {
731        // High cardinality: every value unique, > 256 unique values.
732        let vals: Vec<Vec<u8>> = (0..300)
733            .map(|i| format!("\"unique_value_{}\"", i).into_bytes())
734            .collect();
735        let val_refs: Vec<&[u8]> = vals.iter().map(|v| v.as_slice()).collect();
736        let data = build_columnar(&[&val_refs]);
737        let schema = infer_schema(&data);
738        assert_eq!(schema.columns.len(), 1);
739        assert_eq!(
740            schema.columns[0].col_type,
741            ColumnType::String { nullable: false }
742        );
743    }
744
745    #[test]
746    fn test_infer_nullable() {
747        let data = build_columnar(&[&[b"1", b"null", b"3", b"null", b"5"]]);
748        let schema = infer_schema(&data);
749        assert_eq!(schema.columns.len(), 1);
750        assert_eq!(
751            schema.columns[0].col_type,
752            ColumnType::Integer {
753                min: 1,
754                max: 5,
755                nullable: true,
756            }
757        );
758        assert_eq!(schema.columns[0].null_count, 2);
759        assert_eq!(schema.columns[0].total_count, 5);
760    }
761
762    #[test]
763    fn test_infer_mixed_int_float() {
764        let data = build_columnar(&[&[b"1", b"2.5", b"3"]]);
765        let schema = infer_schema(&data);
766        assert_eq!(schema.columns.len(), 1);
767        assert_eq!(
768            schema.columns[0].col_type,
769            ColumnType::Float { nullable: false }
770        );
771    }
772
773    #[test]
774    fn test_infer_all_null() {
775        let data = build_columnar(&[&[b"null", b"null", b"null"]]);
776        let schema = infer_schema(&data);
777        assert_eq!(schema.columns.len(), 1);
778        assert_eq!(schema.columns[0].col_type, ColumnType::Null);
779        assert_eq!(schema.columns[0].null_count, 3);
780    }
781
782    #[test]
783    fn test_schema_roundtrip() {
784        // Build a schema with every column type.
785        let schema = InferredSchema {
786            columns: vec![
787                ColumnSchema {
788                    col_type: ColumnType::Null,
789                    null_count: 10,
790                    total_count: 10,
791                },
792                ColumnSchema {
793                    col_type: ColumnType::Integer {
794                        min: -100,
795                        max: 999,
796                        nullable: true,
797                    },
798                    null_count: 2,
799                    total_count: 50,
800                },
801                ColumnSchema {
802                    col_type: ColumnType::Float { nullable: false },
803                    null_count: 0,
804                    total_count: 50,
805                },
806                ColumnSchema {
807                    col_type: ColumnType::Boolean { nullable: true },
808                    null_count: 1,
809                    total_count: 50,
810                },
811                ColumnSchema {
812                    col_type: ColumnType::Timestamp {
813                        format: TimestampFormat::Iso8601,
814                        nullable: false,
815                    },
816                    null_count: 0,
817                    total_count: 50,
818                },
819                ColumnSchema {
820                    col_type: ColumnType::Timestamp {
821                        format: TimestampFormat::EpochSeconds,
822                        nullable: true,
823                    },
824                    null_count: 3,
825                    total_count: 50,
826                },
827                ColumnSchema {
828                    col_type: ColumnType::Timestamp {
829                        format: TimestampFormat::EpochMillis,
830                        nullable: false,
831                    },
832                    null_count: 0,
833                    total_count: 50,
834                },
835                ColumnSchema {
836                    col_type: ColumnType::Uuid { nullable: false },
837                    null_count: 0,
838                    total_count: 50,
839                },
840                ColumnSchema {
841                    col_type: ColumnType::Enum {
842                        cardinality: 7,
843                        nullable: true,
844                    },
845                    null_count: 5,
846                    total_count: 50,
847                },
848                ColumnSchema {
849                    col_type: ColumnType::String { nullable: false },
850                    null_count: 0,
851                    total_count: 50,
852                },
853            ],
854        };
855
856        let bytes = serialize_schema(&schema);
857        let recovered = deserialize_schema(&bytes);
858
859        assert_eq!(recovered.columns.len(), schema.columns.len());
860        for (orig, rec) in schema.columns.iter().zip(recovered.columns.iter()) {
861            assert_eq!(orig.col_type, rec.col_type);
862        }
863    }
864
865    #[test]
866    fn test_serialize_size() {
867        // Verify serialization is compact.
868        let schema = InferredSchema {
869            columns: vec![
870                ColumnSchema {
871                    col_type: ColumnType::Integer {
872                        min: 0,
873                        max: 1000,
874                        nullable: false,
875                    },
876                    null_count: 0,
877                    total_count: 100,
878                },
879                ColumnSchema {
880                    col_type: ColumnType::String { nullable: true },
881                    null_count: 5,
882                    total_count: 100,
883                },
884            ],
885        };
886        let bytes = serialize_schema(&schema);
887        // Header: 2 bytes
888        // Integer column: 2 (tag+flags) + 16 (min+max) = 18
889        // String column: 2 (tag+flags) = 2
890        // Total: 2 + 18 + 2 = 22
891        assert_eq!(bytes.len(), 22);
892    }
893
894    #[test]
895    fn test_empty_input() {
896        let schema = infer_schema(b"");
897        assert!(schema.columns.is_empty());
898    }
899
900    #[test]
901    fn test_multi_column() {
902        // Two columns: integers and booleans.
903        let data = build_columnar(&[&[b"1", b"2", b"3"], &[b"true", b"false", b"true"]]);
904        let schema = infer_schema(&data);
905        assert_eq!(schema.columns.len(), 2);
906        assert_eq!(
907            schema.columns[0].col_type,
908            ColumnType::Integer {
909                min: 1,
910                max: 3,
911                nullable: false,
912            }
913        );
914        assert_eq!(
915            schema.columns[1].col_type,
916            ColumnType::Boolean { nullable: false }
917        );
918    }
919
920    #[test]
921    fn test_epoch_seconds() {
922        // Values in the epoch seconds range.
923        let data = build_columnar(&[&[b"1742036400", b"1742036500", b"1742036600"]]);
924        let schema = infer_schema(&data);
925        assert_eq!(
926            schema.columns[0].col_type,
927            ColumnType::Timestamp {
928                format: TimestampFormat::EpochSeconds,
929                nullable: false,
930            }
931        );
932    }
933
934    #[test]
935    fn test_epoch_millis() {
936        let data = build_columnar(&[&[b"1742036400001", b"1742036400234", b"1742036401000"]]);
937        let schema = infer_schema(&data);
938        assert_eq!(
939            schema.columns[0].col_type,
940            ColumnType::Timestamp {
941                format: TimestampFormat::EpochMillis,
942                nullable: false,
943            }
944        );
945    }
946
947    #[test]
948    fn test_real_ndjson_corpus() {
949        // Read the test corpus, run through ndjson::preprocess, then infer schema.
950        let corpus = std::fs::read(concat!(
951            env!("CARGO_MANIFEST_DIR"),
952            "/../../corpus/test-ndjson.ndjson"
953        ))
954        .expect("failed to read test-ndjson.ndjson");
955
956        let transform_result =
957            crate::format::ndjson::preprocess(&corpus).expect("ndjson::preprocess failed");
958
959        let schema = infer_schema(&transform_result.data);
960
961        // The corpus has 20 columns (keys per JSON line):
962        // timestamp, event_type, user_id, session_id, page, referrer,
963        // user_agent, ip_hash, country, region, city, device, browser,
964        // browser_version, os, duration_ms, is_authenticated, plan, metadata
965        //
966        // All 200 lines have the same schema.
967        assert!(
968            schema.columns.len() >= 19,
969            "expected at least 19 columns, got {}",
970            schema.columns.len()
971        );
972
973        // Find columns by examining the corpus key order.
974        // Column 0: timestamp — ISO 8601 strings like "2026-03-15T10:30:00.081Z"
975        assert_eq!(
976            schema.columns[0].col_type,
977            ColumnType::Timestamp {
978                format: TimestampFormat::Iso8601,
979                nullable: false,
980            },
981            "column 0 (timestamp) should be Timestamp/Iso8601"
982        );
983
984        // Column 1: event_type — low cardinality quoted strings
985        match &schema.columns[1].col_type {
986            ColumnType::Enum {
987                cardinality,
988                nullable,
989            } => {
990                assert!(*cardinality <= 20, "event_type cardinality should be low");
991                assert!(!nullable, "event_type should not be nullable");
992            }
993            other => panic!("column 1 (event_type) should be Enum, got {:?}", other),
994        }
995
996        // Column 2: user_id — quoted strings like "usr_a1b2c3d4"
997        match &schema.columns[2].col_type {
998            ColumnType::Enum { .. } | ColumnType::String { .. } => {
999                // user_id with limited users could be Enum or String.
1000            }
1001            other => panic!(
1002                "column 2 (user_id) should be Enum or String, got {:?}",
1003                other
1004            ),
1005        }
1006
1007        // Column 15: duration_ms — integers
1008        assert_eq!(
1009            schema.columns[15].col_type,
1010            ColumnType::Integer {
1011                min: 0,
1012                max: schema.columns[15]
1013                    .col_type
1014                    .clone()
1015                    .integer_max()
1016                    .unwrap_or(0),
1017                nullable: false,
1018            },
1019            "column 15 (duration_ms) should be Integer"
1020        );
1021
1022        // Column 16: is_authenticated — booleans
1023        assert_eq!(
1024            schema.columns[16].col_type,
1025            ColumnType::Boolean { nullable: false },
1026            "column 16 (is_authenticated) should be Boolean"
1027        );
1028
1029        // Column 5: referrer — has null values
1030        match &schema.columns[5].col_type {
1031            ColumnType::Enum { nullable, .. } | ColumnType::String { nullable } => {
1032                assert!(*nullable, "column 5 (referrer) should be nullable");
1033            }
1034            other => panic!(
1035                "column 5 (referrer) should be nullable Enum/String, got {:?}",
1036                other
1037            ),
1038        }
1039    }
1040}