1use std::collections::HashSet;
11
/// Byte that delimits one column's data from the next in the columnar buffer.
const COL_SEP: u8 = 0x00;
/// Byte that delimits consecutive values inside a single column.
const VAL_SEP: u8 = 0x01;
14
/// Textual encoding used by every value of a timestamp column.
#[derive(Debug, Clone, PartialEq)]
pub enum TimestampFormat {
    /// Quoted `YYYY-MM-DDTHH:MM:SS` text with an optional fraction and zone suffix.
    Iso8601,
    /// Bare integer count of seconds since the Unix epoch.
    EpochSeconds,
    /// Bare integer count of milliseconds since the Unix epoch.
    EpochMillis,
}
27
28#[derive(Debug, Clone, PartialEq)]
30pub enum ColumnType {
31 Integer {
32 min: i64,
33 max: i64,
34 nullable: bool,
35 },
36 Float {
37 nullable: bool,
38 },
39 Boolean {
40 nullable: bool,
41 },
42 Timestamp {
43 format: TimestampFormat,
44 nullable: bool,
45 },
46 Uuid {
47 nullable: bool,
48 },
49 Enum {
50 cardinality: u16,
51 nullable: bool,
52 },
53 String {
54 nullable: bool,
55 },
56 Null,
58}
59
60#[derive(Debug, Clone)]
62pub struct ColumnSchema {
63 pub col_type: ColumnType,
64 pub null_count: usize,
65 pub total_count: usize,
66}
67
68#[derive(Debug, Clone)]
70pub struct InferredSchema {
71 pub columns: Vec<ColumnSchema>,
72}
73
// One-byte type tags written by `serialize_schema` and read back by
// `deserialize_schema`. The numeric values are part of the serialized format.
const TAG_NULL: u8 = 0;
const TAG_INTEGER: u8 = 1;
const TAG_FLOAT: u8 = 2;
const TAG_BOOLEAN: u8 = 3;
const TAG_TIMESTAMP_ISO: u8 = 4;
const TAG_TIMESTAMP_EPOCH_S: u8 = 5;
const TAG_TIMESTAMP_EPOCH_MS: u8 = 6;
const TAG_UUID: u8 = 7;
const TAG_ENUM: u8 = 8;
const TAG_STRING: u8 = 9;

/// Bit in the per-column flags byte that marks the column as nullable.
const FLAG_NULLABLE: u8 = 0x01;
88
/// Classification of a single raw value, as produced by `classify_value`.
#[derive(Debug, Clone, PartialEq)]
enum ValueType {
    /// The literal `null`.
    Null,
    /// The literal `true` or `false`.
    Boolean,
    /// A decimal integer that fits in `i64`; carries the parsed value.
    Integer(i64),
    /// A number written with a fraction or an exponent.
    Float,
    /// A quoted ISO-8601 style timestamp.
    TimestampIso,
    /// A bare integer inside the epoch-seconds window.
    TimestampEpochS,
    /// A bare integer inside the epoch-milliseconds window.
    TimestampEpochMs,
    /// A quoted UUID.
    Uuid,
    /// Any other value; also the fallback for tokens nothing else accepts.
    QuotedString,
}
104
105fn classify_value(val: &[u8]) -> ValueType {
107 if val == b"null" {
108 return ValueType::Null;
109 }
110 if val == b"true" || val == b"false" {
111 return ValueType::Boolean;
112 }
113
114 if is_integer(val) {
116 if let Some(n) = parse_i64(val) {
117 if n >= 0 {
121 let nu = n as u64;
122 if (946_684_800_000..=4_102_444_800_000).contains(&nu) {
123 return ValueType::TimestampEpochMs;
124 }
125 if (946_684_800..=4_102_444_800).contains(&nu) {
126 return ValueType::TimestampEpochS;
127 }
128 }
129 return ValueType::Integer(n);
130 }
131 }
132
133 if is_float(val) {
135 return ValueType::Float;
136 }
137
138 if val.len() >= 2 && val[0] == b'"' && val[val.len() - 1] == b'"' {
140 let inner = &val[1..val.len() - 1];
141
142 if is_iso8601(inner) {
144 return ValueType::TimestampIso;
145 }
146
147 if is_uuid(inner) {
149 return ValueType::Uuid;
150 }
151
152 return ValueType::QuotedString;
153 }
154
155 ValueType::QuotedString
157}
158
/// Returns `true` when `val` is an optional leading `-` followed by one or
/// more ASCII digits and nothing else.
fn is_integer(val: &[u8]) -> bool {
    let digits = match val.split_first() {
        Some((&b'-', rest)) => rest,
        _ => val,
    };
    !digits.is_empty() && digits.iter().all(u8::is_ascii_digit)
}
170
/// Parses `val` as a decimal `i64`; `None` when the bytes are not valid UTF-8
/// or do not form a number that fits in `i64`.
fn parse_i64(val: &[u8]) -> Option<i64> {
    std::str::from_utf8(val)
        .ok()
        .and_then(|text| text.parse::<i64>().ok())
}
177
/// Returns `true` when `val` parses as an `f64` and is written with a decimal
/// point or an exponent marker, so plain integers are not reported as floats.
fn is_float(val: &[u8]) -> bool {
    let has_float_marker = val.iter().any(|&byte| matches!(byte, b'.' | b'e' | b'E'));
    has_float_marker
        && std::str::from_utf8(val).map_or(false, |text| text.parse::<f64>().is_ok())
}
195
/// Returns `true` when `inner` has the shape `YYYY-MM-DDTHH:MM:SS`, optionally
/// followed by `.` and one or more digits, optionally followed by `Z` or a
/// `+HH:MM` / `-HH:MM` offset.
///
/// Only the shape is checked; field values are not range-validated.
fn is_iso8601(inner: &[u8]) -> bool {
    // `d` stands for "any ASCII digit"; every other byte must match literally.
    const TEMPLATE: &[u8; 19] = b"dddd-dd-ddTdd:dd:dd";

    if inner.len() < TEMPLATE.len() {
        return false;
    }
    let (head, tail) = inner.split_at(TEMPLATE.len());

    let head_matches = head.iter().zip(TEMPLATE.iter()).all(|(&byte, &slot)| {
        if slot == b'd' {
            byte.is_ascii_digit()
        } else {
            byte == slot
        }
    });
    if !head_matches {
        return false;
    }

    // Optional fractional seconds: a dot that must be followed by a digit.
    let tail = match tail.split_first() {
        Some((&b'.', fraction)) => {
            let digit_count = fraction
                .iter()
                .take_while(|byte| byte.is_ascii_digit())
                .count();
            if digit_count == 0 {
                return false;
            }
            &fraction[digit_count..]
        }
        _ => tail,
    };

    // Optional zone designator; nothing may follow it.
    match tail {
        [] | [b'Z'] => true,
        [b'+', h1, h2, b':', m1, m2] | [b'-', h1, h2, b':', m1, m2] => {
            [h1, h2, m1, m2].iter().all(|digit| digit.is_ascii_digit())
        }
        _ => false,
    }
}
269
/// Returns `true` when `inner` is exactly 36 bytes in the 8-4-4-4-12 layout:
/// `-` at offsets 8, 13, 18 and 23, ASCII hex digits (either case) elsewhere.
fn is_uuid(inner: &[u8]) -> bool {
    inner.len() == 36
        && inner.iter().enumerate().all(|(offset, byte)| match offset {
            8 | 13 | 18 | 23 => *byte == b'-',
            _ => byte.is_ascii_hexdigit(),
        })
}
297
298pub fn infer_schema(columnar_data: &[u8]) -> InferredSchema {
304 if columnar_data.is_empty() {
305 return InferredSchema {
306 columns: Vec::new(),
307 };
308 }
309
310 let col_chunks: Vec<&[u8]> = columnar_data.split(|&b| b == COL_SEP).collect();
311 let mut columns = Vec::with_capacity(col_chunks.len());
312
313 for col_data in &col_chunks {
314 let values: Vec<&[u8]> = col_data.split(|&b| b == VAL_SEP).collect();
315 let total_count = values.len();
316
317 let mut null_count: usize = 0;
319 let mut classifications: Vec<ValueType> = Vec::with_capacity(total_count);
320
321 for val in &values {
322 let vt = classify_value(val);
323 if vt == ValueType::Null {
324 null_count += 1;
325 }
326 classifications.push(vt);
327 }
328
329 let non_null: Vec<&ValueType> = classifications
330 .iter()
331 .filter(|c| **c != ValueType::Null)
332 .collect();
333 let nullable = null_count > 0;
334
335 let col_type = if non_null.is_empty() {
336 ColumnType::Null
338 } else if non_null.iter().all(|c| matches!(c, ValueType::Boolean)) {
339 ColumnType::Boolean { nullable }
340 } else if non_null.iter().all(|c| matches!(c, ValueType::Integer(_))) {
341 let mut min = i64::MAX;
342 let mut max = i64::MIN;
343 for c in &non_null {
344 if let ValueType::Integer(n) = c {
345 if *n < min {
346 min = *n;
347 }
348 if *n > max {
349 max = *n;
350 }
351 }
352 }
353 ColumnType::Integer { min, max, nullable }
354 } else if non_null
355 .iter()
356 .all(|c| matches!(c, ValueType::Integer(_) | ValueType::Float))
357 {
358 ColumnType::Float { nullable }
360 } else if non_null
361 .iter()
362 .all(|c| matches!(c, ValueType::TimestampIso))
363 {
364 ColumnType::Timestamp {
365 format: TimestampFormat::Iso8601,
366 nullable,
367 }
368 } else if non_null
369 .iter()
370 .all(|c| matches!(c, ValueType::TimestampEpochS))
371 {
372 ColumnType::Timestamp {
373 format: TimestampFormat::EpochSeconds,
374 nullable,
375 }
376 } else if non_null
377 .iter()
378 .all(|c| matches!(c, ValueType::TimestampEpochMs))
379 {
380 ColumnType::Timestamp {
381 format: TimestampFormat::EpochMillis,
382 nullable,
383 }
384 } else if non_null
385 .iter()
386 .all(|c| matches!(c, ValueType::TimestampEpochS | ValueType::TimestampEpochMs))
387 {
388 ColumnType::Timestamp {
390 format: TimestampFormat::EpochMillis,
391 nullable,
392 }
393 } else if non_null.iter().all(|c| matches!(c, ValueType::Uuid)) {
394 ColumnType::Uuid { nullable }
395 } else if non_null
396 .iter()
397 .all(|c| matches!(c, ValueType::QuotedString))
398 {
399 let mut unique_vals: HashSet<&[u8]> = HashSet::new();
401 for val in &values {
402 let vt = classify_value(val);
403 if vt != ValueType::Null {
404 unique_vals.insert(val);
405 }
406 }
407 let cardinality = unique_vals.len();
408 if cardinality <= 256 {
409 ColumnType::Enum {
410 cardinality: cardinality as u16,
411 nullable,
412 }
413 } else {
414 ColumnType::String { nullable }
415 }
416 } else {
417 ColumnType::String { nullable }
419 };
420
421 columns.push(ColumnSchema {
422 col_type,
423 null_count,
424 total_count,
425 });
426 }
427
428 InferredSchema { columns }
429}
430
431pub fn serialize_schema(schema: &InferredSchema) -> Vec<u8> {
445 let mut out = Vec::new();
446 out.extend_from_slice(&(schema.columns.len() as u16).to_le_bytes());
447
448 for col in &schema.columns {
449 let (tag, flags, extra) = match &col.col_type {
450 ColumnType::Null => (TAG_NULL, 0u8, Vec::new()),
451 ColumnType::Integer { min, max, nullable } => {
452 let mut extra = Vec::with_capacity(16);
453 extra.extend_from_slice(&min.to_le_bytes());
454 extra.extend_from_slice(&max.to_le_bytes());
455 (
456 TAG_INTEGER,
457 if *nullable { FLAG_NULLABLE } else { 0 },
458 extra,
459 )
460 }
461 ColumnType::Float { nullable } => (
462 TAG_FLOAT,
463 if *nullable { FLAG_NULLABLE } else { 0 },
464 Vec::new(),
465 ),
466 ColumnType::Boolean { nullable } => (
467 TAG_BOOLEAN,
468 if *nullable { FLAG_NULLABLE } else { 0 },
469 Vec::new(),
470 ),
471 ColumnType::Timestamp { format, nullable } => {
472 let tag = match format {
473 TimestampFormat::Iso8601 => TAG_TIMESTAMP_ISO,
474 TimestampFormat::EpochSeconds => TAG_TIMESTAMP_EPOCH_S,
475 TimestampFormat::EpochMillis => TAG_TIMESTAMP_EPOCH_MS,
476 };
477 (tag, if *nullable { FLAG_NULLABLE } else { 0 }, Vec::new())
478 }
479 ColumnType::Uuid { nullable } => (
480 TAG_UUID,
481 if *nullable { FLAG_NULLABLE } else { 0 },
482 Vec::new(),
483 ),
484 ColumnType::Enum {
485 cardinality,
486 nullable,
487 } => {
488 let mut extra = Vec::with_capacity(2);
489 extra.extend_from_slice(&cardinality.to_le_bytes());
490 (TAG_ENUM, if *nullable { FLAG_NULLABLE } else { 0 }, extra)
491 }
492 ColumnType::String { nullable } => (
493 TAG_STRING,
494 if *nullable { FLAG_NULLABLE } else { 0 },
495 Vec::new(),
496 ),
497 };
498 out.push(tag);
499 out.push(flags);
500 out.extend_from_slice(&extra);
501 }
502
503 out
504}
505
506pub fn deserialize_schema(data: &[u8]) -> InferredSchema {
508 if data.len() < 2 {
509 return InferredSchema {
510 columns: Vec::new(),
511 };
512 }
513
514 let num_columns = u16::from_le_bytes(data[0..2].try_into().unwrap()) as usize;
515 let mut pos = 2;
516 let mut columns = Vec::with_capacity(num_columns);
517
518 for _ in 0..num_columns {
519 if pos + 2 > data.len() {
520 break;
521 }
522 let tag = data[pos];
523 pos += 1;
524 let flags = data[pos];
525 pos += 1;
526 let nullable = (flags & FLAG_NULLABLE) != 0;
527
528 let col_type = match tag {
529 TAG_NULL => ColumnType::Null,
530 TAG_INTEGER => {
531 if pos + 16 > data.len() {
532 break;
533 }
534 let min = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
535 pos += 8;
536 let max = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
537 pos += 8;
538 ColumnType::Integer { min, max, nullable }
539 }
540 TAG_FLOAT => ColumnType::Float { nullable },
541 TAG_BOOLEAN => ColumnType::Boolean { nullable },
542 TAG_TIMESTAMP_ISO => ColumnType::Timestamp {
543 format: TimestampFormat::Iso8601,
544 nullable,
545 },
546 TAG_TIMESTAMP_EPOCH_S => ColumnType::Timestamp {
547 format: TimestampFormat::EpochSeconds,
548 nullable,
549 },
550 TAG_TIMESTAMP_EPOCH_MS => ColumnType::Timestamp {
551 format: TimestampFormat::EpochMillis,
552 nullable,
553 },
554 TAG_UUID => ColumnType::Uuid { nullable },
555 TAG_ENUM => {
556 if pos + 2 > data.len() {
557 break;
558 }
559 let cardinality = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
560 pos += 2;
561 ColumnType::Enum {
562 cardinality,
563 nullable,
564 }
565 }
566 TAG_STRING => ColumnType::String { nullable },
567 _ => ColumnType::String { nullable }, };
569
570 columns.push(ColumnSchema {
571 col_type,
572 null_count: 0, total_count: 0, });
575 }
576
577 InferredSchema { columns }
578}
579
580impl ColumnType {
583 #[cfg(test)]
585 fn integer_max(self) -> Option<i64> {
586 match self {
587 ColumnType::Integer { max, .. } => Some(max),
588 _ => None,
589 }
590 }
591}
592
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a columnar buffer: values of a column are joined with `VAL_SEP`
    /// and columns with `COL_SEP`, with no trailing separator.
    fn build_columnar(columns: &[&[&[u8]]]) -> Vec<u8> {
        let encoded_columns: Vec<Vec<u8>> = columns
            .iter()
            .map(|column| column.join(&VAL_SEP))
            .collect();
        encoded_columns.join(&COL_SEP)
    }

    #[test]
    fn test_infer_integers() {
        let data = build_columnar(&[&[b"1", b"2", b"300", b"-5"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);

        let column = &columns[0];
        assert_eq!(
            column.col_type,
            ColumnType::Integer {
                min: -5,
                max: 300,
                nullable: false,
            }
        );
        assert_eq!(column.null_count, 0);
        assert_eq!(column.total_count, 4);
    }

    #[test]
    fn test_infer_floats() {
        let data = build_columnar(&[&[b"3.14", b"2.718", b"1.0"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].col_type, ColumnType::Float { nullable: false });
    }

    #[test]
    fn test_infer_booleans() {
        let data = build_columnar(&[&[b"true", b"false", b"true"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);
        assert_eq!(
            columns[0].col_type,
            ColumnType::Boolean { nullable: false }
        );
    }

    #[test]
    fn test_infer_timestamps() {
        let data = build_columnar(&[&[
            br#""2026-03-15T10:30:00.001Z""#.as_slice(),
            br#""2026-03-15T10:30:00.234Z""#.as_slice(),
            br#""2026-03-15T10:30:01.000Z""#.as_slice(),
        ]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);
        assert_eq!(
            columns[0].col_type,
            ColumnType::Timestamp {
                format: TimestampFormat::Iso8601,
                nullable: false,
            }
        );
    }

    #[test]
    fn test_infer_timestamps_with_offset() {
        let data = build_columnar(&[&[
            br#""2026-03-15T10:30:00+05:30""#.as_slice(),
            br#""2026-03-15T10:30:00-04:00""#.as_slice(),
        ]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(
            columns[0].col_type,
            ColumnType::Timestamp {
                format: TimestampFormat::Iso8601,
                nullable: false,
            }
        );
    }

    #[test]
    fn test_infer_uuids() {
        let data = build_columnar(&[&[
            br#""550e8400-e29b-41d4-a716-446655440000""#.as_slice(),
            br#""6ba7b810-9dad-11d1-80b4-00c04fd430c8""#.as_slice(),
            br#""f47ac10b-58cc-4372-a567-0e02b2c3d479""#.as_slice(),
        ]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].col_type, ColumnType::Uuid { nullable: false });
    }

    #[test]
    fn test_infer_enums() {
        let data = build_columnar(&[&[
            br#""page_view""#.as_slice(),
            br#""api_call""#.as_slice(),
            br#""click""#.as_slice(),
            br#""page_view""#.as_slice(),
            br#""scroll""#.as_slice(),
            br#""api_call""#.as_slice(),
        ]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);

        let (cardinality, nullable) = match &columns[0].col_type {
            ColumnType::Enum {
                cardinality,
                nullable,
            } => (*cardinality, *nullable),
            other => panic!("expected Enum, got {:?}", other),
        };
        // Four distinct values: page_view, api_call, click, scroll.
        assert_eq!(cardinality, 4);
        assert!(!nullable);
    }

    #[test]
    fn test_infer_strings() {
        // 300 distinct values exceed the enum cardinality limit.
        let owned: Vec<Vec<u8>> = (0..300)
            .map(|i| format!("\"unique_value_{}\"", i).into_bytes())
            .collect();
        let borrowed: Vec<&[u8]> = owned.iter().map(Vec::as_slice).collect();
        let data = build_columnar(&[borrowed.as_slice()]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].col_type, ColumnType::String { nullable: false });
    }

    #[test]
    fn test_infer_nullable() {
        let data = build_columnar(&[&[b"1", b"null", b"3", b"null", b"5"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);

        let column = &columns[0];
        assert_eq!(
            column.col_type,
            ColumnType::Integer {
                min: 1,
                max: 5,
                nullable: true,
            }
        );
        assert_eq!(column.null_count, 2);
        assert_eq!(column.total_count, 5);
    }

    #[test]
    fn test_infer_mixed_int_float() {
        let data = build_columnar(&[&[b"1", b"2.5", b"3"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].col_type, ColumnType::Float { nullable: false });
    }

    #[test]
    fn test_infer_all_null() {
        let data = build_columnar(&[&[b"null", b"null", b"null"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].col_type, ColumnType::Null);
        assert_eq!(columns[0].null_count, 3);
    }

    #[test]
    fn test_schema_roundtrip() {
        let column = |col_type: ColumnType, null_count: usize, total_count: usize| ColumnSchema {
            col_type,
            null_count,
            total_count,
        };
        let timestamp = |format: TimestampFormat, nullable: bool| ColumnType::Timestamp {
            format,
            nullable,
        };

        // One column per serializable type, with a mix of nullable flags.
        let schema = InferredSchema {
            columns: vec![
                column(ColumnType::Null, 10, 10),
                column(
                    ColumnType::Integer {
                        min: -100,
                        max: 999,
                        nullable: true,
                    },
                    2,
                    50,
                ),
                column(ColumnType::Float { nullable: false }, 0, 50),
                column(ColumnType::Boolean { nullable: true }, 1, 50),
                column(timestamp(TimestampFormat::Iso8601, false), 0, 50),
                column(timestamp(TimestampFormat::EpochSeconds, true), 3, 50),
                column(timestamp(TimestampFormat::EpochMillis, false), 0, 50),
                column(ColumnType::Uuid { nullable: false }, 0, 50),
                column(
                    ColumnType::Enum {
                        cardinality: 7,
                        nullable: true,
                    },
                    5,
                    50,
                ),
                column(ColumnType::String { nullable: false }, 0, 50),
            ],
        };

        let recovered = deserialize_schema(&serialize_schema(&schema));

        assert_eq!(recovered.columns.len(), schema.columns.len());
        schema
            .columns
            .iter()
            .zip(recovered.columns.iter())
            .for_each(|(orig, rec)| assert_eq!(orig.col_type, rec.col_type));
    }

    #[test]
    fn test_serialize_size() {
        let integer_column = ColumnSchema {
            col_type: ColumnType::Integer {
                min: 0,
                max: 1000,
                nullable: false,
            },
            null_count: 0,
            total_count: 100,
        };
        let string_column = ColumnSchema {
            col_type: ColumnType::String { nullable: true },
            null_count: 5,
            total_count: 100,
        };
        let schema = InferredSchema {
            columns: vec![integer_column, string_column],
        };

        // 2 (count) + 2 + 16 (integer entry) + 2 (string entry) = 22 bytes.
        let bytes = serialize_schema(&schema);
        assert_eq!(bytes.len(), 22);
    }

    #[test]
    fn test_empty_input() {
        let InferredSchema { columns } = infer_schema(b"");
        assert!(columns.is_empty());
    }

    #[test]
    fn test_multi_column() {
        let data = build_columnar(&[&[b"1", b"2", b"3"], &[b"true", b"false", b"true"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(columns.len(), 2);
        assert_eq!(
            columns[0].col_type,
            ColumnType::Integer {
                min: 1,
                max: 3,
                nullable: false,
            }
        );
        assert_eq!(
            columns[1].col_type,
            ColumnType::Boolean { nullable: false }
        );
    }

    #[test]
    fn test_epoch_seconds() {
        let data = build_columnar(&[&[b"1742036400", b"1742036500", b"1742036600"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(
            columns[0].col_type,
            ColumnType::Timestamp {
                format: TimestampFormat::EpochSeconds,
                nullable: false,
            }
        );
    }

    #[test]
    fn test_epoch_millis() {
        let data = build_columnar(&[&[b"1742036400001", b"1742036400234", b"1742036401000"]]);
        let InferredSchema { columns } = infer_schema(&data);
        assert_eq!(
            columns[0].col_type,
            ColumnType::Timestamp {
                format: TimestampFormat::EpochMillis,
                nullable: false,
            }
        );
    }

    /// End-to-end check against the NDJSON corpus file; needs the corpus on
    /// disk and the crate's NDJSON preprocessor.
    #[test]
    fn test_real_ndjson_corpus() {
        let corpus_path = concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/../../corpus/test-ndjson.ndjson"
        );
        let corpus = std::fs::read(corpus_path).expect("failed to read test-ndjson.ndjson");

        let transform_result =
            crate::format::ndjson::preprocess(&corpus).expect("ndjson::preprocess failed");

        let InferredSchema { columns } = infer_schema(&transform_result.data);

        assert!(
            columns.len() >= 19,
            "expected at least 19 columns, got {}",
            columns.len()
        );

        assert_eq!(
            columns[0].col_type,
            ColumnType::Timestamp {
                format: TimestampFormat::Iso8601,
                nullable: false,
            },
            "column 0 (timestamp) should be Timestamp/Iso8601"
        );

        match &columns[1].col_type {
            ColumnType::Enum {
                cardinality,
                nullable,
            } => {
                assert!(*cardinality <= 20, "event_type cardinality should be low");
                assert!(!nullable, "event_type should not be nullable");
            }
            other => panic!("column 1 (event_type) should be Enum, got {:?}", other),
        }

        let user_id = &columns[2].col_type;
        assert!(
            matches!(
                user_id,
                ColumnType::Enum { .. } | ColumnType::String { .. }
            ),
            "column 2 (user_id) should be Enum or String, got {:?}",
            user_id
        );

        // Only the variant, `min` and `nullable` are pinned; `max` is taken
        // from the inferred value itself.
        let observed_max = columns[15].col_type.clone().integer_max().unwrap_or(0);
        assert_eq!(
            columns[15].col_type,
            ColumnType::Integer {
                min: 0,
                max: observed_max,
                nullable: false,
            },
            "column 15 (duration_ms) should be Integer"
        );

        assert_eq!(
            columns[16].col_type,
            ColumnType::Boolean { nullable: false },
            "column 16 (is_authenticated) should be Boolean"
        );

        match &columns[5].col_type {
            ColumnType::Enum { nullable, .. } | ColumnType::String { nullable } => {
                assert!(*nullable, "column 5 (referrer) should be nullable");
            }
            other => panic!(
                "column 5 (referrer) should be nullable Enum/String, got {:?}",
                other
            ),
        }
    }
}