Skip to main content

datacortex_core/format/
schema.rs

1//! Schema inference engine for columnar JSON/NDJSON data.
2//!
3//! Takes the output of `ndjson::preprocess` (columns separated by \x00,
4//! values within columns separated by \x01) and infers the type of each
5//! column by examining its values.
6//!
7//! The inferred schema can be serialized into compact binary metadata
8//! for storage in .dcx transform metadata, and deserialized by the decoder.
9
10use std::collections::HashSet;
11
/// Byte that separates one column from the next in the columnar input (`\x00`).
12const COL_SEP: u8 = 0x00;
/// Byte that separates consecutive values inside a single column (`\x01`).
13const VAL_SEP: u8 = 0x01;
14
15// ─── Type Definitions ────────────────────────────────────────────────────────
16
/// Detected timestamp format.
///
/// Fieldless, so it is cheap to copy, totally comparable and hashable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimestampFormat {
    /// ISO 8601: "2026-03-15T10:30:00.001Z" or with offset
    Iso8601,
    /// Unix epoch in seconds: 1742036400
    EpochSeconds,
    /// Unix epoch in milliseconds: 1742036400001
    EpochMillis,
}

/// Inferred type of a single column.
///
/// Every variant except [`ColumnType::Null`] carries a `nullable` flag that
/// records whether at least one `null` was seen next to the typed values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnType {
    /// Whole numbers; `min` and `max` are the smallest and largest values seen.
    Integer {
        min: i64,
        max: i64,
        nullable: bool,
    },
    /// Floating-point numbers (also used for columns mixing integers and floats).
    Float {
        nullable: bool,
    },
    /// `true` / `false` literals.
    Boolean {
        nullable: bool,
    },
    /// Timestamps in one of the recognised [`TimestampFormat`]s.
    Timestamp {
        format: TimestampFormat,
        nullable: bool,
    },
    /// Quoted 8-4-4-4-12 hexadecimal UUID strings.
    Uuid {
        nullable: bool,
    },
    /// String column with a small number of distinct values; `cardinality`
    /// is the count of distinct non-null values.
    Enum {
        cardinality: u16,
        nullable: bool,
    },
    /// Free-form strings, and the fallback for columns of mixed types.
    String {
        nullable: bool,
    },
    /// Column where every value is null.
    Null,
}
59
60/// Schema for a single column: type + null statistics.
61#[derive(Debug, Clone)]
62pub struct ColumnSchema {
63    pub col_type: ColumnType,
64    pub null_count: usize,
65    pub total_count: usize,
66}
67
68/// Inferred schema for an entire columnar dataset.
69#[derive(Debug, Clone)]
70pub struct InferredSchema {
71    pub columns: Vec<ColumnSchema>,
72}
73
74// ─── Type Tag Constants (for serialization) ──────────────────────────────────
75
// One-byte type tags, written as the first byte of each serialized column
// record. The three timestamp formats each get their own tag, so the format
// needs no extra payload.
76const TAG_NULL: u8 = 0;
77const TAG_INTEGER: u8 = 1;
78const TAG_FLOAT: u8 = 2;
79const TAG_BOOLEAN: u8 = 3;
80const TAG_TIMESTAMP_ISO: u8 = 4;
81const TAG_TIMESTAMP_EPOCH_S: u8 = 5;
82const TAG_TIMESTAMP_EPOCH_MS: u8 = 6;
83const TAG_UUID: u8 = 7;
84const TAG_ENUM: u8 = 8;
85const TAG_STRING: u8 = 9;
86
// Bit 0 of the per-column flags byte (the second byte of each record):
// set when the column is nullable.
87const FLAG_NULLABLE: u8 = 0x01;
88
89// ─── Value Classification ────────────────────────────────────────────────────
90
/// Classification of a single value for type inference.
///
/// A small plain-data enum, so it is `Copy` and `Eq` in addition to the
/// usual `Debug`/`Clone`/`PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ValueType {
    /// The literal `null`.
    Null,
    /// The literal `true` or `false`.
    Boolean,
    /// An integer that fits in `i64`, carrying its parsed value.
    Integer(i64),
    /// A number with a decimal point and/or exponent.
    Float,
    /// A quoted ISO 8601 date-time string.
    TimestampIso,
    /// An integer inside the epoch-seconds heuristic window.
    TimestampEpochS,
    /// An integer inside the epoch-milliseconds heuristic window.
    TimestampEpochMs,
    /// A quoted 8-4-4-4-12 hexadecimal UUID.
    Uuid,
    /// Any other value; also the fallback for unrecognised unquoted text.
    QuotedString,
}
104
105/// Classify a single value from the columnar data.
106fn classify_value(val: &[u8]) -> ValueType {
107    if val == b"null" {
108        return ValueType::Null;
109    }
110    if val == b"true" || val == b"false" {
111        return ValueType::Boolean;
112    }
113
114    // Try integer: ^-?[0-9]+$
115    if is_integer(val) {
116        if let Some(n) = parse_i64(val) {
117            // Check if it could be an epoch timestamp.
118            // Seconds range: 946684800 (2000-01-01) .. 4102444800 (2099-12-31)
119            // Millis range: those * 1000
120            if n >= 0 {
121                let nu = n as u64;
122                if (946_684_800_000..=4_102_444_800_000).contains(&nu) {
123                    return ValueType::TimestampEpochMs;
124                }
125                if (946_684_800..=4_102_444_800).contains(&nu) {
126                    return ValueType::TimestampEpochS;
127                }
128            }
129            return ValueType::Integer(n);
130        }
131    }
132
133    // Try float: ^-?[0-9]*\.[0-9]+([eE][+-]?[0-9]+)?$ or integer with exponent
134    if is_float(val) {
135        return ValueType::Float;
136    }
137
138    // Quoted value checks — must start and end with "
139    if val.len() >= 2 && val[0] == b'"' && val[val.len() - 1] == b'"' {
140        let inner = &val[1..val.len() - 1];
141
142        // ISO 8601 timestamp: YYYY-MM-DDTHH:MM:SS...
143        if is_iso8601(inner) {
144            return ValueType::TimestampIso;
145        }
146
147        // UUID: 8-4-4-4-12 hex
148        if is_uuid(inner) {
149            return ValueType::Uuid;
150        }
151
152        return ValueType::QuotedString;
153    }
154
155    // Unquoted, non-null, non-bool, non-numeric — treat as string.
156    ValueType::QuotedString
157}
158
/// Returns `true` when `val` matches `^-?[0-9]+$`: an optional leading minus
/// followed by at least one ASCII digit and nothing else.
fn is_integer(val: &[u8]) -> bool {
    // Peel off a single leading '-' if there is one.
    let digits = match val.split_first() {
        Some((&b'-', rest)) => rest,
        _ => val,
    };
    !digits.is_empty() && digits.iter().all(u8::is_ascii_digit)
}
170
/// Parse `val` as a decimal `i64`.
///
/// Returns `None` when the bytes are not valid UTF-8 or do not parse as an
/// `i64` — in particular when the number overflows.
fn parse_i64(val: &[u8]) -> Option<i64> {
    std::str::from_utf8(val)
        .ok()
        .and_then(|text| text.parse::<i64>().ok())
}
177
/// Returns `true` when `val` is a float literal rather than a plain integer.
///
/// Two conditions must both hold: the bytes contain a `.`, `e` or `E`
/// (which is what separates floats from integers), and the whole text is
/// accepted by `str::parse::<f64>()`.
fn is_float(val: &[u8]) -> bool {
    // Cheap byte scan first; an empty slice fails it as well.
    let has_float_marker = val.iter().any(|b| matches!(b, b'.' | b'e' | b'E'));
    if !has_float_marker {
        return false;
    }
    match std::str::from_utf8(val) {
        Ok(text) => text.parse::<f64>().is_ok(),
        Err(_) => false,
    }
}
195
/// Returns `true` when `inner` (the text between the quotes) has the shape
/// of an ISO 8601 date-time.
///
/// Accepted layout: `YYYY-MM-DDTHH:MM:SS`, then optional fractional seconds
/// (`.` plus one or more digits), then an optional zone that is either `Z`
/// or `+HH:MM` / `-HH:MM`. Only the shape is checked; field values are not
/// range-validated.
fn is_iso8601(inner: &[u8]) -> bool {
    // 'd' stands for "any ASCII digit"; every other byte must match exactly.
    const LAYOUT: &[u8; 19] = b"dddd-dd-ddTdd:dd:dd";

    if inner.len() < LAYOUT.len() {
        return false;
    }
    let (datetime, mut tail) = inner.split_at(LAYOUT.len());

    let shape_matches = datetime
        .iter()
        .zip(LAYOUT.iter())
        .all(|(&byte, &slot)| match slot {
            b'd' => byte.is_ascii_digit(),
            literal => byte == literal,
        });
    if !shape_matches {
        return false;
    }

    // Optional fractional seconds: a dot must be followed by >= 1 digit.
    if let Some((&b'.', after_dot)) = tail.split_first() {
        let digit_run = after_dot
            .iter()
            .take_while(|b| b.is_ascii_digit())
            .count();
        if digit_run == 0 {
            return false;
        }
        tail = &after_dot[digit_run..];
    }

    // Whatever is left must be nothing, exactly "Z", or exactly "+HH:MM" / "-HH:MM".
    match tail {
        [] | [b'Z'] => true,
        [b'+' | b'-', h1, h2, b':', m1, m2] => {
            [h1, h2, m1, m2].iter().all(|b| b.is_ascii_digit())
        }
        _ => false,
    }
}
269
/// Returns `true` when `inner` (the text between the quotes) is laid out
/// like a UUID: exactly 36 bytes, dashes at offsets 8, 13, 18 and 23, and
/// ASCII hex digits (either case) everywhere else.
fn is_uuid(inner: &[u8]) -> bool {
    inner.len() == 36
        && inner.iter().enumerate().all(|(offset, &byte)| match offset {
            8 | 13 | 18 | 23 => byte == b'-',
            _ => byte.is_ascii_hexdigit(),
        })
}
297
298// ─── Schema Inference ────────────────────────────────────────────────────────
299
300/// Infer schema from columnar data (post `ndjson::preprocess` output).
301///
302/// Data format: columns separated by \x00, values within columns by \x01.
303pub fn infer_schema(columnar_data: &[u8]) -> InferredSchema {
304    if columnar_data.is_empty() {
305        return InferredSchema {
306            columns: Vec::new(),
307        };
308    }
309
310    let col_chunks: Vec<&[u8]> = columnar_data.split(|&b| b == COL_SEP).collect();
311    let mut columns = Vec::with_capacity(col_chunks.len());
312
313    for col_data in &col_chunks {
314        let values: Vec<&[u8]> = col_data.split(|&b| b == VAL_SEP).collect();
315        let total_count = values.len();
316
317        // Classify every value.
318        let mut null_count: usize = 0;
319        let mut classifications: Vec<ValueType> = Vec::with_capacity(total_count);
320
321        for val in &values {
322            let vt = classify_value(val);
323            if vt == ValueType::Null {
324                null_count += 1;
325            }
326            classifications.push(vt);
327        }
328
329        let non_null: Vec<&ValueType> = classifications
330            .iter()
331            .filter(|c| **c != ValueType::Null)
332            .collect();
333        let nullable = null_count > 0;
334
335        let col_type = if non_null.is_empty() {
336            // All null.
337            ColumnType::Null
338        } else if non_null.iter().all(|c| matches!(c, ValueType::Boolean)) {
339            ColumnType::Boolean { nullable }
340        } else if non_null.iter().all(|c| matches!(c, ValueType::Integer(_))) {
341            let mut min = i64::MAX;
342            let mut max = i64::MIN;
343            for c in &non_null {
344                if let ValueType::Integer(n) = c {
345                    if *n < min {
346                        min = *n;
347                    }
348                    if *n > max {
349                        max = *n;
350                    }
351                }
352            }
353            ColumnType::Integer { min, max, nullable }
354        } else if non_null
355            .iter()
356            .all(|c| matches!(c, ValueType::Integer(_) | ValueType::Float))
357        {
358            // Mixed int+float => Float
359            ColumnType::Float { nullable }
360        } else if non_null
361            .iter()
362            .all(|c| matches!(c, ValueType::TimestampIso))
363        {
364            ColumnType::Timestamp {
365                format: TimestampFormat::Iso8601,
366                nullable,
367            }
368        } else if non_null
369            .iter()
370            .all(|c| matches!(c, ValueType::TimestampEpochS))
371        {
372            ColumnType::Timestamp {
373                format: TimestampFormat::EpochSeconds,
374                nullable,
375            }
376        } else if non_null
377            .iter()
378            .all(|c| matches!(c, ValueType::TimestampEpochMs))
379        {
380            ColumnType::Timestamp {
381                format: TimestampFormat::EpochMillis,
382                nullable,
383            }
384        } else if non_null
385            .iter()
386            .all(|c| matches!(c, ValueType::TimestampEpochS | ValueType::TimestampEpochMs))
387        {
388            // Mixed epoch seconds and millis — pick millis as the broader type.
389            ColumnType::Timestamp {
390                format: TimestampFormat::EpochMillis,
391                nullable,
392            }
393        } else if non_null.iter().all(|c| {
394            matches!(
395                c,
396                ValueType::Integer(_) | ValueType::TimestampEpochS | ValueType::TimestampEpochMs
397            )
398        }) {
399            // Mixed integers and epoch timestamps — the epoch classification was a
400            // heuristic guess.  Since not ALL values look like timestamps, treat the
401            // whole column as plain integers.  Epoch timestamps are just integers
402            // that happen to fall in a certain range.
403            let mut min = i64::MAX;
404            let mut max = i64::MIN;
405            for val in &values {
406                let vt = classify_value(val);
407                if vt == ValueType::Null {
408                    continue;
409                }
410                // All non-null values in this branch are numeric (Integer or
411                // epoch timestamp), so parse_i64 will succeed.
412                if let Some(n) = parse_i64(val) {
413                    if n < min {
414                        min = n;
415                    }
416                    if n > max {
417                        max = n;
418                    }
419                }
420            }
421            ColumnType::Integer { min, max, nullable }
422        } else if non_null.iter().all(|c| matches!(c, ValueType::Uuid)) {
423            ColumnType::Uuid { nullable }
424        } else if non_null
425            .iter()
426            .all(|c| matches!(c, ValueType::QuotedString))
427        {
428            // Check cardinality for Enum vs String.
429            let mut unique_vals: HashSet<&[u8]> = HashSet::new();
430            for val in &values {
431                let vt = classify_value(val);
432                if vt != ValueType::Null {
433                    unique_vals.insert(val);
434                }
435            }
436            let cardinality = unique_vals.len();
437            if cardinality <= 256 {
438                ColumnType::Enum {
439                    cardinality: cardinality as u16,
440                    nullable,
441                }
442            } else {
443                ColumnType::String { nullable }
444            }
445        } else {
446            // Mixed types that don't fit any unified category.
447            ColumnType::String { nullable }
448        };
449
450        columns.push(ColumnSchema {
451            col_type,
452            null_count,
453            total_count,
454        });
455    }
456
457    InferredSchema { columns }
458}
459
460// ─── Serialization ───────────────────────────────────────────────────────────
461
462/// Serialize schema to compact binary bytes for transform metadata.
463///
464/// Format:
465///   Header: num_columns (u16 LE)
466///   Per column:
467///     byte 0: type tag
468///     byte 1: flags (bit 0 = nullable)
469///     [type-specific data]:
470///       Integer: 8 bytes min (i64 LE) + 8 bytes max (i64 LE)
471///       Enum: 2 bytes cardinality (u16 LE)
472///       Others: no extra data
473pub fn serialize_schema(schema: &InferredSchema) -> Vec<u8> {
474    let mut out = Vec::new();
475    out.extend_from_slice(&(schema.columns.len() as u16).to_le_bytes());
476
477    for col in &schema.columns {
478        let (tag, flags, extra) = match &col.col_type {
479            ColumnType::Null => (TAG_NULL, 0u8, Vec::new()),
480            ColumnType::Integer { min, max, nullable } => {
481                let mut extra = Vec::with_capacity(16);
482                extra.extend_from_slice(&min.to_le_bytes());
483                extra.extend_from_slice(&max.to_le_bytes());
484                (
485                    TAG_INTEGER,
486                    if *nullable { FLAG_NULLABLE } else { 0 },
487                    extra,
488                )
489            }
490            ColumnType::Float { nullable } => (
491                TAG_FLOAT,
492                if *nullable { FLAG_NULLABLE } else { 0 },
493                Vec::new(),
494            ),
495            ColumnType::Boolean { nullable } => (
496                TAG_BOOLEAN,
497                if *nullable { FLAG_NULLABLE } else { 0 },
498                Vec::new(),
499            ),
500            ColumnType::Timestamp { format, nullable } => {
501                let tag = match format {
502                    TimestampFormat::Iso8601 => TAG_TIMESTAMP_ISO,
503                    TimestampFormat::EpochSeconds => TAG_TIMESTAMP_EPOCH_S,
504                    TimestampFormat::EpochMillis => TAG_TIMESTAMP_EPOCH_MS,
505                };
506                (tag, if *nullable { FLAG_NULLABLE } else { 0 }, Vec::new())
507            }
508            ColumnType::Uuid { nullable } => (
509                TAG_UUID,
510                if *nullable { FLAG_NULLABLE } else { 0 },
511                Vec::new(),
512            ),
513            ColumnType::Enum {
514                cardinality,
515                nullable,
516            } => {
517                let mut extra = Vec::with_capacity(2);
518                extra.extend_from_slice(&cardinality.to_le_bytes());
519                (TAG_ENUM, if *nullable { FLAG_NULLABLE } else { 0 }, extra)
520            }
521            ColumnType::String { nullable } => (
522                TAG_STRING,
523                if *nullable { FLAG_NULLABLE } else { 0 },
524                Vec::new(),
525            ),
526        };
527        out.push(tag);
528        out.push(flags);
529        out.extend_from_slice(&extra);
530    }
531
532    out
533}
534
535/// Deserialize schema from transform metadata bytes.
536pub fn deserialize_schema(data: &[u8]) -> InferredSchema {
537    if data.len() < 2 {
538        return InferredSchema {
539            columns: Vec::new(),
540        };
541    }
542
543    let num_columns = u16::from_le_bytes(data[0..2].try_into().unwrap()) as usize;
544    let mut pos = 2;
545    let mut columns = Vec::with_capacity(num_columns);
546
547    for _ in 0..num_columns {
548        if pos + 2 > data.len() {
549            break;
550        }
551        let tag = data[pos];
552        pos += 1;
553        let flags = data[pos];
554        pos += 1;
555        let nullable = (flags & FLAG_NULLABLE) != 0;
556
557        let col_type = match tag {
558            TAG_NULL => ColumnType::Null,
559            TAG_INTEGER => {
560                if pos + 16 > data.len() {
561                    break;
562                }
563                let min = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
564                pos += 8;
565                let max = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
566                pos += 8;
567                ColumnType::Integer { min, max, nullable }
568            }
569            TAG_FLOAT => ColumnType::Float { nullable },
570            TAG_BOOLEAN => ColumnType::Boolean { nullable },
571            TAG_TIMESTAMP_ISO => ColumnType::Timestamp {
572                format: TimestampFormat::Iso8601,
573                nullable,
574            },
575            TAG_TIMESTAMP_EPOCH_S => ColumnType::Timestamp {
576                format: TimestampFormat::EpochSeconds,
577                nullable,
578            },
579            TAG_TIMESTAMP_EPOCH_MS => ColumnType::Timestamp {
580                format: TimestampFormat::EpochMillis,
581                nullable,
582            },
583            TAG_UUID => ColumnType::Uuid { nullable },
584            TAG_ENUM => {
585                if pos + 2 > data.len() {
586                    break;
587                }
588                let cardinality = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
589                pos += 2;
590                ColumnType::Enum {
591                    cardinality,
592                    nullable,
593                }
594            }
595            TAG_STRING => ColumnType::String { nullable },
596            _ => ColumnType::String { nullable }, // Unknown tag fallback.
597        };
598
599        columns.push(ColumnSchema {
600            col_type,
601            null_count: 0,  // Not stored in serialized form.
602            total_count: 0, // Not stored in serialized form.
603        });
604    }
605
606    InferredSchema { columns }
607}
608
609// ─── Helper trait for test convenience ───────────────────────────────────────
610
611impl ColumnType {
612    /// Extract max from Integer variant (test helper).
613    #[cfg(test)]
614    fn integer_max(self) -> Option<i64> {
615        match self {
616            ColumnType::Integer { max, .. } => Some(max),
617            _ => None,
618        }
619    }
620}
621
622// ─── Tests ───────────────────────────────────────────────────────────────────
623
624#[cfg(test)]
625mod tests {
626    use super::*;
627
628    /// Helper: build columnar data from column value slices.
629    /// Each inner slice is one column's values.
630    fn build_columnar(columns: &[&[&[u8]]]) -> Vec<u8> {
631        let mut out = Vec::new();
632        for (ci, col) in columns.iter().enumerate() {
633            for (vi, val) in col.iter().enumerate() {
634                out.extend_from_slice(val);
635                if vi < col.len() - 1 {
636                    out.push(VAL_SEP);
637                }
638            }
639            if ci < columns.len() - 1 {
640                out.push(COL_SEP);
641            }
642        }
643        out
644    }
645
646    #[test]
647    fn test_infer_integers() {
648        let data = build_columnar(&[&[b"1", b"2", b"300", b"-5"]]);
649        let schema = infer_schema(&data);
650        assert_eq!(schema.columns.len(), 1);
651        assert_eq!(
652            schema.columns[0].col_type,
653            ColumnType::Integer {
654                min: -5,
655                max: 300,
656                nullable: false,
657            }
658        );
659        assert_eq!(schema.columns[0].null_count, 0);
660        assert_eq!(schema.columns[0].total_count, 4);
661    }
662
663    #[test]
664    fn test_infer_mixed_integer_and_epoch_as_integer() {
665        // Regression: 2147483647 (i32::MAX) falls in the epoch-seconds range
666        // and was misclassified as TimestampEpochS.  When mixed with plain
667        // integers, the column should be inferred as Integer, not String.
668        let data = build_columnar(&[&[
669            b"0",
670            b"-1",
671            b"1",
672            b"-2147483648",
673            b"2147483647",
674            b"-9007199254740991",
675            b"9007199254740991",
676        ]]);
677        let schema = infer_schema(&data);
678        assert_eq!(schema.columns.len(), 1);
679        assert_eq!(
680            schema.columns[0].col_type,
681            ColumnType::Integer {
682                min: -9007199254740991,
683                max: 9007199254740991,
684                nullable: false,
685            },
686            "mixed integers with epoch-range values should infer as Integer, got {:?}",
687            schema.columns[0].col_type
688        );
689    }
690
691    #[test]
692    fn test_infer_floats() {
693        let data = build_columnar(&[&[b"3.14", b"2.718", b"1.0"]]);
694        let schema = infer_schema(&data);
695        assert_eq!(schema.columns.len(), 1);
696        assert_eq!(
697            schema.columns[0].col_type,
698            ColumnType::Float { nullable: false }
699        );
700    }
701
702    #[test]
703    fn test_infer_booleans() {
704        let data = build_columnar(&[&[b"true", b"false", b"true"]]);
705        let schema = infer_schema(&data);
706        assert_eq!(schema.columns.len(), 1);
707        assert_eq!(
708            schema.columns[0].col_type,
709            ColumnType::Boolean { nullable: false }
710        );
711    }
712
713    #[test]
714    fn test_infer_timestamps() {
715        let data = build_columnar(&[&[
716            br#""2026-03-15T10:30:00.001Z""#.as_slice(),
717            br#""2026-03-15T10:30:00.234Z""#.as_slice(),
718            br#""2026-03-15T10:30:01.000Z""#.as_slice(),
719        ]]);
720        let schema = infer_schema(&data);
721        assert_eq!(schema.columns.len(), 1);
722        assert_eq!(
723            schema.columns[0].col_type,
724            ColumnType::Timestamp {
725                format: TimestampFormat::Iso8601,
726                nullable: false,
727            }
728        );
729    }
730
731    #[test]
732    fn test_infer_timestamps_with_offset() {
733        let data = build_columnar(&[&[
734            br#""2026-03-15T10:30:00+05:30""#.as_slice(),
735            br#""2026-03-15T10:30:00-04:00""#.as_slice(),
736        ]]);
737        let schema = infer_schema(&data);
738        assert_eq!(
739            schema.columns[0].col_type,
740            ColumnType::Timestamp {
741                format: TimestampFormat::Iso8601,
742                nullable: false,
743            }
744        );
745    }
746
747    #[test]
748    fn test_infer_uuids() {
749        let data = build_columnar(&[&[
750            br#""550e8400-e29b-41d4-a716-446655440000""#.as_slice(),
751            br#""6ba7b810-9dad-11d1-80b4-00c04fd430c8""#.as_slice(),
752            br#""f47ac10b-58cc-4372-a567-0e02b2c3d479""#.as_slice(),
753        ]]);
754        let schema = infer_schema(&data);
755        assert_eq!(schema.columns.len(), 1);
756        assert_eq!(
757            schema.columns[0].col_type,
758            ColumnType::Uuid { nullable: false }
759        );
760    }
761
762    #[test]
763    fn test_infer_enums() {
764        let data = build_columnar(&[&[
765            br#""page_view""#.as_slice(),
766            br#""api_call""#.as_slice(),
767            br#""click""#.as_slice(),
768            br#""page_view""#.as_slice(),
769            br#""scroll""#.as_slice(),
770            br#""api_call""#.as_slice(),
771        ]]);
772        let schema = infer_schema(&data);
773        assert_eq!(schema.columns.len(), 1);
774        match &schema.columns[0].col_type {
775            ColumnType::Enum {
776                cardinality,
777                nullable,
778            } => {
779                assert_eq!(*cardinality, 4); // page_view, api_call, click, scroll
780                assert!(!nullable);
781            }
782            other => panic!("expected Enum, got {:?}", other),
783        }
784    }
785
786    #[test]
787    fn test_infer_strings() {
788        // High cardinality: every value unique, > 256 unique values.
789        let vals: Vec<Vec<u8>> = (0..300)
790            .map(|i| format!("\"unique_value_{}\"", i).into_bytes())
791            .collect();
792        let val_refs: Vec<&[u8]> = vals.iter().map(|v| v.as_slice()).collect();
793        let data = build_columnar(&[&val_refs]);
794        let schema = infer_schema(&data);
795        assert_eq!(schema.columns.len(), 1);
796        assert_eq!(
797            schema.columns[0].col_type,
798            ColumnType::String { nullable: false }
799        );
800    }
801
802    #[test]
803    fn test_infer_nullable() {
804        let data = build_columnar(&[&[b"1", b"null", b"3", b"null", b"5"]]);
805        let schema = infer_schema(&data);
806        assert_eq!(schema.columns.len(), 1);
807        assert_eq!(
808            schema.columns[0].col_type,
809            ColumnType::Integer {
810                min: 1,
811                max: 5,
812                nullable: true,
813            }
814        );
815        assert_eq!(schema.columns[0].null_count, 2);
816        assert_eq!(schema.columns[0].total_count, 5);
817    }
818
819    #[test]
820    fn test_infer_mixed_int_float() {
821        let data = build_columnar(&[&[b"1", b"2.5", b"3"]]);
822        let schema = infer_schema(&data);
823        assert_eq!(schema.columns.len(), 1);
824        assert_eq!(
825            schema.columns[0].col_type,
826            ColumnType::Float { nullable: false }
827        );
828    }
829
830    #[test]
831    fn test_infer_all_null() {
832        let data = build_columnar(&[&[b"null", b"null", b"null"]]);
833        let schema = infer_schema(&data);
834        assert_eq!(schema.columns.len(), 1);
835        assert_eq!(schema.columns[0].col_type, ColumnType::Null);
836        assert_eq!(schema.columns[0].null_count, 3);
837    }
838
839    #[test]
840    fn test_schema_roundtrip() {
841        // Build a schema with every column type.
842        let schema = InferredSchema {
843            columns: vec![
844                ColumnSchema {
845                    col_type: ColumnType::Null,
846                    null_count: 10,
847                    total_count: 10,
848                },
849                ColumnSchema {
850                    col_type: ColumnType::Integer {
851                        min: -100,
852                        max: 999,
853                        nullable: true,
854                    },
855                    null_count: 2,
856                    total_count: 50,
857                },
858                ColumnSchema {
859                    col_type: ColumnType::Float { nullable: false },
860                    null_count: 0,
861                    total_count: 50,
862                },
863                ColumnSchema {
864                    col_type: ColumnType::Boolean { nullable: true },
865                    null_count: 1,
866                    total_count: 50,
867                },
868                ColumnSchema {
869                    col_type: ColumnType::Timestamp {
870                        format: TimestampFormat::Iso8601,
871                        nullable: false,
872                    },
873                    null_count: 0,
874                    total_count: 50,
875                },
876                ColumnSchema {
877                    col_type: ColumnType::Timestamp {
878                        format: TimestampFormat::EpochSeconds,
879                        nullable: true,
880                    },
881                    null_count: 3,
882                    total_count: 50,
883                },
884                ColumnSchema {
885                    col_type: ColumnType::Timestamp {
886                        format: TimestampFormat::EpochMillis,
887                        nullable: false,
888                    },
889                    null_count: 0,
890                    total_count: 50,
891                },
892                ColumnSchema {
893                    col_type: ColumnType::Uuid { nullable: false },
894                    null_count: 0,
895                    total_count: 50,
896                },
897                ColumnSchema {
898                    col_type: ColumnType::Enum {
899                        cardinality: 7,
900                        nullable: true,
901                    },
902                    null_count: 5,
903                    total_count: 50,
904                },
905                ColumnSchema {
906                    col_type: ColumnType::String { nullable: false },
907                    null_count: 0,
908                    total_count: 50,
909                },
910            ],
911        };
912
913        let bytes = serialize_schema(&schema);
914        let recovered = deserialize_schema(&bytes);
915
916        assert_eq!(recovered.columns.len(), schema.columns.len());
917        for (orig, rec) in schema.columns.iter().zip(recovered.columns.iter()) {
918            assert_eq!(orig.col_type, rec.col_type);
919        }
920    }
921
922    #[test]
923    fn test_serialize_size() {
924        // Verify serialization is compact.
925        let schema = InferredSchema {
926            columns: vec![
927                ColumnSchema {
928                    col_type: ColumnType::Integer {
929                        min: 0,
930                        max: 1000,
931                        nullable: false,
932                    },
933                    null_count: 0,
934                    total_count: 100,
935                },
936                ColumnSchema {
937                    col_type: ColumnType::String { nullable: true },
938                    null_count: 5,
939                    total_count: 100,
940                },
941            ],
942        };
943        let bytes = serialize_schema(&schema);
944        // Header: 2 bytes
945        // Integer column: 2 (tag+flags) + 16 (min+max) = 18
946        // String column: 2 (tag+flags) = 2
947        // Total: 2 + 18 + 2 = 22
948        assert_eq!(bytes.len(), 22);
949    }
950
951    #[test]
952    fn test_empty_input() {
953        let schema = infer_schema(b"");
954        assert!(schema.columns.is_empty());
955    }
956
957    #[test]
958    fn test_multi_column() {
959        // Two columns: integers and booleans.
960        let data = build_columnar(&[&[b"1", b"2", b"3"], &[b"true", b"false", b"true"]]);
961        let schema = infer_schema(&data);
962        assert_eq!(schema.columns.len(), 2);
963        assert_eq!(
964            schema.columns[0].col_type,
965            ColumnType::Integer {
966                min: 1,
967                max: 3,
968                nullable: false,
969            }
970        );
971        assert_eq!(
972            schema.columns[1].col_type,
973            ColumnType::Boolean { nullable: false }
974        );
975    }
976
977    #[test]
978    fn test_epoch_seconds() {
979        // Values in the epoch seconds range.
980        let data = build_columnar(&[&[b"1742036400", b"1742036500", b"1742036600"]]);
981        let schema = infer_schema(&data);
982        assert_eq!(
983            schema.columns[0].col_type,
984            ColumnType::Timestamp {
985                format: TimestampFormat::EpochSeconds,
986                nullable: false,
987            }
988        );
989    }
990
991    #[test]
992    fn test_epoch_millis() {
993        let data = build_columnar(&[&[b"1742036400001", b"1742036400234", b"1742036401000"]]);
994        let schema = infer_schema(&data);
995        assert_eq!(
996            schema.columns[0].col_type,
997            ColumnType::Timestamp {
998                format: TimestampFormat::EpochMillis,
999                nullable: false,
1000            }
1001        );
1002    }
1003
1004    #[test]
1005    fn test_real_ndjson_corpus() {
1006        // Read the test corpus, run through ndjson::preprocess, then infer schema.
1007        let corpus = std::fs::read(concat!(
1008            env!("CARGO_MANIFEST_DIR"),
1009            "/../../corpus/test-ndjson.ndjson"
1010        ))
1011        .expect("failed to read test-ndjson.ndjson");
1012
1013        let transform_result =
1014            crate::format::ndjson::preprocess(&corpus).expect("ndjson::preprocess failed");
1015
1016        let schema = infer_schema(&transform_result.data);
1017
1018        // The corpus has 20 columns (keys per JSON line):
1019        // timestamp, event_type, user_id, session_id, page, referrer,
1020        // user_agent, ip_hash, country, region, city, device, browser,
1021        // browser_version, os, duration_ms, is_authenticated, plan, metadata
1022        //
1023        // All 200 lines have the same schema.
1024        assert!(
1025            schema.columns.len() >= 19,
1026            "expected at least 19 columns, got {}",
1027            schema.columns.len()
1028        );
1029
1030        // Find columns by examining the corpus key order.
1031        // Column 0: timestamp — ISO 8601 strings like "2026-03-15T10:30:00.081Z"
1032        assert_eq!(
1033            schema.columns[0].col_type,
1034            ColumnType::Timestamp {
1035                format: TimestampFormat::Iso8601,
1036                nullable: false,
1037            },
1038            "column 0 (timestamp) should be Timestamp/Iso8601"
1039        );
1040
1041        // Column 1: event_type — low cardinality quoted strings
1042        match &schema.columns[1].col_type {
1043            ColumnType::Enum {
1044                cardinality,
1045                nullable,
1046            } => {
1047                assert!(*cardinality <= 20, "event_type cardinality should be low");
1048                assert!(!nullable, "event_type should not be nullable");
1049            }
1050            other => panic!("column 1 (event_type) should be Enum, got {:?}", other),
1051        }
1052
1053        // Column 2: user_id — quoted strings like "usr_a1b2c3d4"
1054        match &schema.columns[2].col_type {
1055            ColumnType::Enum { .. } | ColumnType::String { .. } => {
1056                // user_id with limited users could be Enum or String.
1057            }
1058            other => panic!(
1059                "column 2 (user_id) should be Enum or String, got {:?}",
1060                other
1061            ),
1062        }
1063
1064        // Column 15: duration_ms — integers
1065        assert_eq!(
1066            schema.columns[15].col_type,
1067            ColumnType::Integer {
1068                min: 0,
1069                max: schema.columns[15]
1070                    .col_type
1071                    .clone()
1072                    .integer_max()
1073                    .unwrap_or(0),
1074                nullable: false,
1075            },
1076            "column 15 (duration_ms) should be Integer"
1077        );
1078
1079        // Column 16: is_authenticated — booleans
1080        assert_eq!(
1081            schema.columns[16].col_type,
1082            ColumnType::Boolean { nullable: false },
1083            "column 16 (is_authenticated) should be Boolean"
1084        );
1085
1086        // Column 5: referrer — has null values
1087        match &schema.columns[5].col_type {
1088            ColumnType::Enum { nullable, .. } | ColumnType::String { nullable } => {
1089                assert!(*nullable, "column 5 (referrer) should be nullable");
1090            }
1091            other => panic!(
1092                "column 5 (referrer) should be nullable Enum/String, got {:?}",
1093                other
1094            ),
1095        }
1096    }
1097}