1use crate::types::*;
8use std::collections::{HashMap, HashSet};
9
10pub fn infer_type(value: &str) -> Type {
14 if value == "_" || value.is_empty() || value == "null" || value == "None" {
15 return Type::Null;
16 }
17
18 if value == "T" || value == "F" || value == "true" || value == "false" {
19 return Type::Bool;
20 }
21
22 if let Ok(_) = value.parse::<i64>() {
23 return if value.starts_with('-') {
24 Type::Int
25 } else {
26 Type::Uint
27 };
28 }
29
30 if let Ok(_) = value.parse::<f64>() {
31 return Type::Float;
32 }
33
34 if is_datetime(value) {
35 return Type::DateTime;
36 }
37
38 if is_date(value) {
39 return Type::Date;
40 }
41
42 if is_duration(value) {
43 return Type::Duration;
44 }
45
46 if value.starts_with('[') && value.ends_with(']') {
47 return Type::Array(Box::new(Type::Str));
48 }
49
50 if value.starts_with('{') && value.ends_with('}') {
51 return Type::Map;
52 }
53
54 Type::Str
55}
56
/// Returns `true` when `s` has the exact shape `DDDD-DD-DD` (ten bytes,
/// ASCII digits separated by two dashes).
///
/// Only the shape is checked; month and day ranges are not validated.
fn is_date(s: &str) -> bool {
    match s.as_bytes() {
        // Exactly ten bytes with dashes at offsets 4 and 7.
        [y0, y1, y2, y3, b'-', m0, m1, b'-', d0, d1] => [y0, y1, y2, y3, m0, m1, d0, d1]
            .iter()
            .all(|byte| byte.is_ascii_digit()),
        _ => false,
    }
}
68
69fn is_datetime(s: &str) -> bool {
70 if !s.is_ascii() || s.len() < 11 {
71 return false;
72 }
73 if let Some(t_pos) = s.find('T') {
74 if t_pos == 10 {
75 return is_date(&s[..10]);
76 }
77 }
78 false
79}
80
/// Returns `true` when `s` looks like an ISO 8601 duration such as
/// `PT2H30M` or `P1Y2M3D`.
///
/// The text must start with `P`, use only digits, `.` and the designators
/// `Y M W D T H S` afterwards, contain at least one digit, and end with a
/// unit designator. Requiring a digit and a trailing unit keeps ordinary
/// words and codes (`PM`, `PST`, `P123`) from being classified as durations.
fn is_duration(s: &str) -> bool {
    match s.strip_prefix('P') {
        Some(body) => {
            body.chars()
                .all(|c| c.is_ascii_digit() || "YMWDTHS.".contains(c))
                && body.chars().any(|c| c.is_ascii_digit())
                // `T` only separates date and time parts; it cannot end a duration.
                && body.ends_with(|c: char| "YMWDHS".contains(c))
        }
        None => false,
    }
}
88
89pub fn merge_types(a: &Type, b: &Type) -> Type {
94 if a == b {
95 return a.clone();
96 }
97
98 match (a, b) {
99 (Type::Null, Type::Nullable(inner)) | (Type::Nullable(inner), Type::Null) => {
100 Type::Nullable(inner.clone())
101 }
102 (Type::Null, other) | (other, Type::Null) => Type::Nullable(Box::new(other.clone())),
103
104 (Type::Nullable(a_inner), Type::Nullable(b_inner)) => {
105 Type::Nullable(Box::new(merge_types(a_inner, b_inner)))
106 }
107 (Type::Nullable(inner), other) | (other, Type::Nullable(inner)) => {
108 Type::Nullable(Box::new(merge_types(inner, other)))
109 }
110
111 (Type::Int, Type::Uint) | (Type::Uint, Type::Int) => Type::Int,
112
113 (Type::Int | Type::Uint, Type::Float) | (Type::Float, Type::Int | Type::Uint) => {
114 Type::Float
115 }
116
117 (Type::Date, Type::DateTime) | (Type::DateTime, Type::Date) => Type::DateTime,
118
119 _ => Type::Str,
120 }
121}
122
123pub fn infer_schema(rows: &[Vec<(&str, &str)>]) -> Schema {
125 if rows.is_empty() {
126 return Schema {
127 fields: Vec::new(),
128 };
129 }
130
131 let mut keys: Vec<String> = Vec::new();
132 let mut seen = HashSet::new();
133 for row in rows {
134 for (k, _) in row {
135 if seen.insert(*k) {
136 keys.push(k.to_string());
137 }
138 }
139 }
140
141 let fields: Vec<FieldDef> = keys
142 .iter()
143 .map(|key| {
144 let mut ty: Option<Type> = None;
145 let mut all_present = true;
146
147 for row in rows {
148 if let Some((_, v)) = row.iter().find(|(k, _)| *k == key.as_str()) {
149 let inferred = infer_type(v);
150 ty = Some(match ty {
151 Some(existing) => merge_types(&existing, &inferred),
152 None => inferred,
153 });
154 } else {
155 all_present = false;
156 }
157 }
158
159 let mut final_ty = ty.unwrap_or(Type::Any);
160
161 if !all_present && !matches!(&final_ty, Type::Nullable(_)) {
162 final_ty = Type::Nullable(Box::new(final_ty));
163 }
164
165 let semantic = infer_semantic(key);
166
167 FieldDef {
168 name: key.clone(),
169 field_type: final_ty,
170 semantic,
171 deprecated: false,
172 modifiers: Vec::new(),
173 }
174 })
175 .collect();
176
177 Schema { fields }
178}
179
180pub fn infer_schema_owned(rows: &[Vec<(String, String)>]) -> Schema {
182 let borrowed: Vec<Vec<(&str, &str)>> = rows
183 .iter()
184 .map(|row| {
185 row.iter()
186 .map(|(k, v)| (k.as_str(), v.as_str()))
187 .collect()
188 })
189 .collect();
190 infer_schema(&borrowed)
191}
192
/// Maps a field name to a semantic tag, if the name is a recognised one.
///
/// Matching is case-insensitive (the name is lowercased first). Exact
/// matches come from a fixed table; any other name ending in `_id` or
/// `_uid` is tagged `"ref"`. Returns `None` for everything else.
pub fn infer_semantic(name: &str) -> Option<String> {
    let lowered = name.to_lowercase();
    let tag: Option<&str> = match lowered.as_str() {
        "id" | "uid" | "uuid" | "guid" => Some("id"),
        "url" | "href" | "link" | "uri" => Some("url"),
        "path" | "file" | "filepath" | "file_path" | "filename" => Some("path"),
        "error" | "err" | "error_message" | "err_msg" => Some("error"),
        "warning" | "warn" => Some("warning"),
        "command" | "cmd" => Some("command"),
        "description" | "desc" | "summary" | "body" => Some("description"),
        "name" | "title" | "label" | "display_name" => Some("label"),
        "line" | "line_number" | "lineno" | "line_no" => Some("line_number"),
        "code" | "source_code" | "snippet" => Some("snippet"),
        // Foreign-key style names.
        other if other.ends_with("_id") || other.ends_with("_uid") => Some("ref"),
        _ => None,
    };
    tag.map(String::from)
}
216
/// Normalises an arbitrary header or key into a lowercase identifier.
///
/// Surrounding whitespace and one pair of enclosing double quotes are
/// removed. ASCII letters, digits and `_` are kept (lowercased); space,
/// `-`, `/`, `.` and `$` become a single `_` (never doubling an existing
/// trailing `_`); every other character is dropped. Trailing underscores
/// are stripped, and an empty result becomes `"field"`.
pub fn sanitize_field_name(name: &str) -> String {
    let trimmed = name.trim();
    let core = trimmed
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'))
        .unwrap_or(trimmed);

    let mut cleaned = String::with_capacity(core.len());
    for ch in core.chars() {
        match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '_' => cleaned.push(ch.to_ascii_lowercase()),
            ' ' | '-' | '/' | '.' | '$' if !cleaned.ends_with('_') => cleaned.push('_'),
            _ => {}
        }
    }

    let kept = cleaned.trim_end_matches('_').len();
    cleaned.truncate(kept);

    if cleaned.is_empty() {
        String::from("field")
    } else {
        cleaned
    }
}
244
245pub fn from_json(json: &str) -> Result<Document, String> {
260 let json = json.trim();
261
262 if json.starts_with('[') {
263 let inner = json[1..].trim_start();
264 if inner.starts_with('{') || inner.starts_with(']') {
265 from_json_array(json)
266 } else {
267 from_json_scalar_array(json)
268 }
269 } else if json.starts_with('{') {
270 from_json_object(json)
271 } else {
272 Err("expected JSON array or object".into())
273 }
274}
275
276fn from_json_object(json: &str) -> Result<Document, String> {
285 let obj = parse_json_object(json)?;
286
287 struct FieldInfo {
289 key: String,
290 value: String,
291 kind: FieldKind,
292 size: usize,
293 }
294
295 #[derive(Debug)]
296 enum FieldKind {
297 Scalar,
298 ArrayOfObjects,
299 ArrayOfScalars,
300 ObjectOfObjects,
301 Object,
302 EmptyArray,
303 }
304
305 let mut fields: Vec<FieldInfo> = Vec::new();
306 for (key, value) in &obj {
307 let v = value.trim();
308 let kind = if v.starts_with('[') && v.ends_with(']') {
309 let inner = v[1..].trim_start();
310 if inner.starts_with(']') || inner.is_empty() {
311 FieldKind::EmptyArray
312 } else if inner.starts_with('{') {
313 FieldKind::ArrayOfObjects
314 } else {
315 FieldKind::ArrayOfScalars
316 }
317 } else if v.starts_with('{') && v.ends_with('}') {
318 if let Ok(inner_obj) = parse_json_object(v) {
320 let obj_children = inner_obj.iter()
321 .filter(|(_, v)| v.trim().starts_with('{'))
322 .count();
323 if obj_children > 3 && obj_children as f64 / inner_obj.len().max(1) as f64 > 0.5 {
324 FieldKind::ObjectOfObjects
325 } else {
326 FieldKind::Object
327 }
328 } else {
329 FieldKind::Object
330 }
331 } else {
332 FieldKind::Scalar
333 };
334 fields.push(FieldInfo {
335 key: key.clone(),
336 value: value.clone(),
337 kind,
338 size: value.len(),
339 });
340 }
341
342 let structured_children: Vec<&FieldInfo> = fields.iter()
344 .filter(|f| matches!(f.kind,
345 FieldKind::ArrayOfObjects | FieldKind::ObjectOfObjects))
346 .collect();
347
348 let has_obj_of_obj = structured_children.iter()
350 .any(|f| matches!(f.kind, FieldKind::ObjectOfObjects));
351 let array_children: Vec<&&FieldInfo> = structured_children.iter()
352 .filter(|f| matches!(f.kind, FieldKind::ArrayOfObjects))
353 .collect();
354
355 if !has_obj_of_obj && array_children.len() == 1 {
356 return from_json_array(&array_children[0].value);
357 }
358
359 let mut sections = Vec::new();
362
363 let scalars: Vec<(&str, &str)> = fields.iter()
365 .filter(|f| matches!(f.kind, FieldKind::Scalar))
366 .map(|f| (f.key.as_str(), f.value.as_str()))
367 .collect();
368
369 if !scalars.is_empty() {
370 let schema = Schema {
371 fields: vec![
372 FieldDef {
373 name: "key".to_string(),
374 field_type: Type::Str,
375 semantic: Some("id".to_string()),
376 deprecated: false,
377 modifiers: Vec::new(),
378 },
379 FieldDef {
380 name: "value".to_string(),
381 field_type: Type::Str,
382 semantic: None,
383 deprecated: false,
384 modifiers: Vec::new(),
385 },
386 ],
387 };
388 let records: Vec<Record> = scalars.iter().map(|(k, v)| {
389 Record {
390 values: vec![
391 Value::Str(k.to_string()),
392 Value::Str(v.to_string()),
393 ],
394 cdc_op: CdcOp::Insert,
395 }
396 }).collect();
397 sections.push(Section {
398 id: Some("metadata".to_string()),
399 directives: vec![Directive::Context("Document metadata".to_string())],
400 schema: Some(schema),
401 records,
402 blocks: Vec::new(),
403 templates: Vec::new(),
404 });
405 }
406
407 for field in &fields {
409 match &field.kind {
410 FieldKind::ArrayOfObjects => {
411 if let Ok(objects) = parse_json_array(&field.value) {
412 if objects.is_empty() { continue; }
413 let flat: Vec<Vec<(String, String)>> = objects.iter()
414 .map(|o| flatten_object(o, ""))
415 .collect();
416 let rows: Vec<Vec<(&str, &str)>> = flat.iter()
417 .map(|r| r.iter().map(|(k,v)| (k.as_str(), v.as_str())).collect())
418 .collect();
419 let schema = infer_schema(&rows);
420 let records = build_records(&flat, &schema);
421 let section_id = sanitize_field_name(&field.key);
422 sections.push(Section {
423 id: Some(section_id.clone()),
424 directives: vec![Directive::Context(
425 format!("{} ({} records)", field.key, records.len())
426 )],
427 schema: Some(schema),
428 records,
429 blocks: Vec::new(),
430 templates: Vec::new(),
431 });
432 }
433 }
434 FieldKind::ObjectOfObjects => {
435 if let Ok(inner_obj) = parse_json_object(field.value.trim()) {
437 let mut all_flat: Vec<Vec<(String, String)>> = Vec::new();
438 for (k, v) in &inner_obj {
439 let trimmed = v.trim();
440 if trimmed.starts_with('{') {
441 if let Ok(child) = parse_json_object(trimmed) {
442 let mut flat = flatten_object(&child, "");
443 flat.insert(0, ("_key".to_string(), k.clone()));
444 all_flat.push(flat);
445 }
446 } else {
447 all_flat.push(vec![
449 ("_key".to_string(), k.clone()),
450 ("value".to_string(), json_value_to_sif_string(trimmed)),
451 ]);
452 }
453 }
454 if all_flat.is_empty() { continue; }
455 let rows: Vec<Vec<(&str, &str)>> = all_flat.iter()
456 .map(|r| r.iter().map(|(k,v)| (k.as_str(), v.as_str())).collect())
457 .collect();
458 let schema = infer_schema(&rows);
459 let records = build_records(&all_flat, &schema);
460 let section_id = sanitize_field_name(&field.key);
461 sections.push(Section {
462 id: Some(section_id.clone()),
463 directives: vec![Directive::Context(
464 format!("{} ({} entries)", field.key, records.len())
465 )],
466 schema: Some(schema),
467 records,
468 blocks: Vec::new(),
469 templates: Vec::new(),
470 });
471 }
472 }
473 FieldKind::Object if field.size > 1024 => {
474 if let Ok(inner) = parse_json_object(field.value.trim()) {
476 let flat = flatten_object(&inner, "");
477 let schema = Schema {
478 fields: vec![
479 FieldDef {
480 name: "key".to_string(),
481 field_type: Type::Str,
482 semantic: Some("id".to_string()),
483 deprecated: false,
484 modifiers: Vec::new(),
485 },
486 FieldDef {
487 name: "value".to_string(),
488 field_type: Type::Str,
489 semantic: None,
490 deprecated: false,
491 modifiers: Vec::new(),
492 },
493 ],
494 };
495 let records: Vec<Record> = flat.iter().map(|(k, v)| {
496 Record {
497 values: vec![
498 Value::Str(k.clone()),
499 Value::Str(v.clone()),
500 ],
501 cdc_op: CdcOp::Insert,
502 }
503 }).collect();
504 let section_id = sanitize_field_name(&field.key);
505 sections.push(Section {
506 id: Some(section_id),
507 directives: vec![Directive::Context(field.key.clone())],
508 schema: Some(schema),
509 records,
510 blocks: Vec::new(),
511 templates: Vec::new(),
512 });
513 }
514 }
515 _ => {} }
517 }
518
519 if sections.is_empty() {
521 let wrapped = format!("[{}]", json);
522 return from_json_array(&wrapped);
523 }
524
525 Ok(Document {
526 header: Header {
527 version: 1,
528 attributes: HashMap::new(),
529 },
530 sections,
531 })
532}
533
534pub fn from_jsonl(input: &str, max_records: Option<usize>) -> Result<Document, String> {
536 let limit = max_records.unwrap_or(usize::MAX);
537 let mut all_flat: Vec<Vec<(String, String)>> = Vec::new();
538
539 let mut buf = String::new();
540 let mut depth = 0i32;
541 let mut in_string = false;
542 let mut escape_next = false;
543
544 for line in input.lines() {
545 let trimmed = line.trim();
546 if trimmed.is_empty() && depth == 0 {
547 continue;
548 }
549 if all_flat.len() >= limit {
550 break;
551 }
552
553 if !buf.is_empty() {
554 buf.push(' ');
555 }
556 buf.push_str(line);
557
558 for ch in line.chars() {
559 if escape_next {
560 escape_next = false;
561 continue;
562 }
563 if ch == '\\' && in_string {
564 escape_next = true;
565 continue;
566 }
567 if ch == '"' {
568 in_string = !in_string;
569 continue;
570 }
571 if !in_string {
572 if ch == '{' {
573 depth += 1;
574 }
575 if ch == '}' {
576 depth -= 1;
577 }
578 }
579 }
580
581 if depth == 0 && !buf.trim().is_empty() {
582 let obj = parse_json_object(buf.trim())?;
583 let flat = flatten_object(&obj, "");
584 all_flat.push(flat);
585 buf.clear();
586 in_string = false;
587 }
588 }
589
590 if all_flat.is_empty() {
591 return Ok(empty_doc());
592 }
593
594 let sample_size = all_flat.len().min(1000);
595 let sample: Vec<Vec<(&str, &str)>> = all_flat[..sample_size]
596 .iter()
597 .map(|row| {
598 row.iter()
599 .map(|(k, v)| (k.as_str(), v.as_str()))
600 .collect()
601 })
602 .collect();
603 let schema = infer_schema(&sample);
604
605 let records = build_records(&all_flat, &schema);
606
607 Ok(Document {
608 header: Header {
609 version: 1,
610 attributes: HashMap::new(),
611 },
612 sections: vec![Section {
613 id: None,
614 directives: Vec::new(),
615 schema: Some(schema),
616 records,
617 blocks: Vec::new(),
618 templates: Vec::new(),
619 }],
620 })
621}
622
623pub fn from_csv(csv: &str) -> Result<Document, String> {
625 let mut lines = csv.lines();
626
627 let header_line = lines.next().ok_or("empty CSV")?;
628 let raw_headers = split_csv_line(header_line);
629 let headers: Vec<String> = raw_headers
630 .iter()
631 .map(|h| sanitize_field_name(h))
632 .collect();
633
634 let mut raw_rows: Vec<Vec<String>> = Vec::new();
635 for line in lines {
636 if line.trim().is_empty() {
637 continue;
638 }
639 let values: Vec<String> = split_csv_line(line)
640 .into_iter()
641 .map(|s| {
642 let trimmed = s.trim().to_string();
643 if trimmed.is_empty() {
644 "_".to_string()
645 } else {
646 trimmed
647 }
648 })
649 .collect();
650 raw_rows.push(values);
651 }
652
653 let rows: Vec<Vec<(&str, &str)>> = raw_rows
654 .iter()
655 .map(|row| {
656 headers
657 .iter()
658 .zip(row.iter())
659 .map(|(h, v)| (h.as_str(), v.as_str()))
660 .collect()
661 })
662 .collect();
663
664 let schema = infer_schema(&rows);
665 let records = build_records_csv(&raw_rows, &schema);
666
667 Ok(Document {
668 header: Header {
669 version: 1,
670 attributes: HashMap::new(),
671 },
672 sections: vec![Section {
673 id: None,
674 directives: Vec::new(),
675 schema: Some(schema),
676 records,
677 blocks: Vec::new(),
678 templates: Vec::new(),
679 }],
680 })
681}
682
683pub fn to_jsonl(schema: &Schema, records: &[Record]) -> String {
687 let mut out = String::new();
688 for record in records {
689 out.push('{');
690 for (i, field) in schema.fields.iter().enumerate() {
691 if i > 0 {
692 out.push(',');
693 }
694 out.push('"');
695 out.push_str(&field.name);
696 out.push_str("\":");
697 value_to_json(&record.values[i], &mut out);
698 }
699 out.push_str("}\n");
700 }
701 out
702}
703
704pub fn to_json_array(schema: &Schema, records: &[Record]) -> String {
706 let mut out = String::from("[");
707 for (ri, record) in records.iter().enumerate() {
708 if ri > 0 {
709 out.push(',');
710 }
711 out.push('{');
712 for (i, field) in schema.fields.iter().enumerate() {
713 if i > 0 {
714 out.push(',');
715 }
716 out.push('"');
717 out.push_str(&field.name);
718 out.push_str("\":");
719 if i < record.values.len() {
720 value_to_json(&record.values[i], &mut out);
721 } else {
722 out.push_str("null");
723 }
724 }
725 out.push('}');
726 }
727 out.push(']');
728 out
729}
730
731pub fn to_csv(schema: &Schema, records: &[Record]) -> String {
733 let mut out = String::new();
734 for (i, field) in schema.fields.iter().enumerate() {
735 if i > 0 {
736 out.push(',');
737 }
738 out.push_str(&field.name);
739 }
740 out.push('\n');
741 for record in records {
742 for (i, value) in record.values.iter().enumerate() {
743 if i > 0 {
744 out.push(',');
745 }
746 value_to_csv(value, &mut out);
747 }
748 out.push('\n');
749 }
750 out
751}
752
753fn value_to_json(value: &Value, out: &mut String) {
754 match value {
755 Value::Null => out.push_str("null"),
756 Value::Bool(true) => out.push_str("true"),
757 Value::Bool(false) => out.push_str("false"),
758 Value::Int(n) => out.push_str(&n.to_string()),
759 Value::Uint(n) => out.push_str(&n.to_string()),
760 Value::Float(n) => {
761 if n.is_finite() {
762 let s = n.to_string();
763 if s.contains('.') {
764 out.push_str(&s);
765 } else {
766 out.push_str(&format!("{}.0", s));
767 }
768 } else {
769 out.push_str("null");
770 }
771 }
772 Value::Str(s)
773 | Value::Date(s)
774 | Value::DateTime(s)
775 | Value::Duration(s)
776 | Value::Enum(s) => {
777 out.push('"');
778 for c in s.chars() {
779 match c {
780 '"' => out.push_str("\\\""),
781 '\\' => out.push_str("\\\\"),
782 '\n' => out.push_str("\\n"),
783 '\t' => out.push_str("\\t"),
784 '\r' => out.push_str("\\r"),
785 c if c.is_control() => out.push_str(&format!("\\u{:04x}", c as u32)),
786 c => out.push(c),
787 }
788 }
789 out.push('"');
790 }
791 Value::Bytes(b) => {
792 out.push('"');
793 out.push_str(&crate::types::base64_encode(b));
794 out.push('"');
795 }
796 Value::Array(items) => {
797 out.push('[');
798 for (i, item) in items.iter().enumerate() {
799 if i > 0 {
800 out.push(',');
801 }
802 value_to_json(item, out);
803 }
804 out.push(']');
805 }
806 Value::Map(pairs) => {
807 out.push('{');
808 for (i, (k, v)) in pairs.iter().enumerate() {
809 if i > 0 {
810 out.push(',');
811 }
812 out.push('"');
813 out.push_str(k);
814 out.push_str("\":");
815 value_to_json(v, out);
816 }
817 out.push('}');
818 }
819 }
820}
821
822fn value_to_csv(value: &Value, out: &mut String) {
823 match value {
824 Value::Null => {}
825 Value::Bool(true) => out.push_str("true"),
826 Value::Bool(false) => out.push_str("false"),
827 Value::Int(n) => out.push_str(&n.to_string()),
828 Value::Uint(n) => out.push_str(&n.to_string()),
829 Value::Float(n) => out.push_str(&n.to_string()),
830 Value::Str(s) => {
831 if s.contains(',') || s.contains('"') || s.contains('\n') || s.contains('\r') {
832 out.push('"');
833 out.push_str(&s.replace('"', "\"\""));
834 out.push('"');
835 } else {
836 out.push_str(s);
837 }
838 }
839 _ => {
840 let mut json = String::new();
841 value_to_json(value, &mut json);
842 if json.contains(',') || json.contains('"') {
843 out.push('"');
844 out.push_str(&json.replace('"', "\"\""));
845 out.push('"');
846 } else {
847 out.push_str(&json);
848 }
849 }
850 }
851}
852
/// Nesting limit for flattening: `flatten_object_depth` expands an object
/// value into prefixed keys only while its current depth is below this
/// value; object values found at or beyond it are kept as a single field.
const MAX_FLATTEN_DEPTH: usize = 2;
856
857fn from_json_array(json: &str) -> Result<Document, String> {
858 let json = json.trim();
859 let objects = parse_json_array(json)?;
860 if objects.is_empty() {
861 return Ok(empty_doc());
862 }
863
864 let flat_objects: Vec<Vec<(String, String)>> =
865 objects.iter().map(|obj| flatten_object(obj, "")).collect();
866
867 let rows: Vec<Vec<(&str, &str)>> = flat_objects
868 .iter()
869 .map(|obj| {
870 obj.iter()
871 .map(|(k, v)| (k.as_str(), v.as_str()))
872 .collect()
873 })
874 .collect();
875
876 let schema = infer_schema(&rows);
877 let records = build_records(&flat_objects, &schema);
878
879 Ok(Document {
880 header: Header {
881 version: 1,
882 attributes: HashMap::new(),
883 },
884 sections: vec![Section {
885 id: None,
886 directives: Vec::new(),
887 schema: Some(schema),
888 records,
889 blocks: Vec::new(),
890 templates: Vec::new(),
891 }],
892 })
893}
894
895fn from_json_scalar_array(json: &str) -> Result<Document, String> {
896 let inner = json[1..json.len() - 1].trim();
897 if inner.is_empty() {
898 return Ok(empty_doc());
899 }
900
901 let parts = split_json_top_level(inner, ',');
902 let values: Vec<String> = parts
903 .iter()
904 .map(|p| json_value_to_sif_string(p.trim()))
905 .collect();
906
907 let rows: Vec<Vec<(&str, &str)>> = values
908 .iter()
909 .map(|v| vec![("value", v.as_str())])
910 .collect();
911 let schema = infer_schema(&rows);
912
913 let records: Vec<Record> = values
914 .iter()
915 .map(|v| {
916 let val = crate::parse::parse_untyped_value(v);
917 Record {
918 values: vec![val],
919 cdc_op: CdcOp::Insert,
920 }
921 })
922 .collect();
923
924 Ok(Document {
925 header: Header {
926 version: 1,
927 attributes: HashMap::new(),
928 },
929 sections: vec![Section {
930 id: None,
931 directives: Vec::new(),
932 schema: Some(schema),
933 records,
934 blocks: Vec::new(),
935 templates: Vec::new(),
936 }],
937 })
938}
939
940fn parse_json_array(json: &str) -> Result<Vec<Vec<(String, String)>>, String> {
941 let json = json.trim();
942 if !json.starts_with('[') || !json.ends_with(']') {
943 return Err("expected JSON array".into());
944 }
945 let inner = json[1..json.len() - 1].trim();
946 if inner.is_empty() {
947 return Ok(Vec::new());
948 }
949
950 let mut objects = Vec::new();
951 let parts = split_json_top_level(inner, ',');
952 for part in parts {
953 let part = part.trim();
954 if part.is_empty() {
955 continue;
956 }
957 objects.push(parse_json_object(part)?);
958 }
959 Ok(objects)
960}
961
962fn parse_json_object(json: &str) -> Result<Vec<(String, String)>, String> {
963 let json = json.trim();
964 if !json.starts_with('{') || !json.ends_with('}') {
965 return Err(format!(
966 "expected JSON object, got: {}...",
967 &json[..json.len().min(40)]
968 ));
969 }
970
971 let inner = json[1..json.len() - 1].trim();
972 if inner.is_empty() {
973 return Ok(Vec::new());
974 }
975
976 let mut pairs = Vec::new();
977 let parts = split_json_top_level(inner, ',');
978
979 for part in parts {
980 let part = part.trim();
981 if part.is_empty() {
982 continue;
983 }
984 let colon = find_json_colon(part)?;
985 let key_raw = part[..colon].trim();
986 let val_raw = part[colon + 1..].trim();
987
988 let key = if key_raw.starts_with('"') && key_raw.ends_with('"') {
989 json_unescape(&key_raw[1..key_raw.len() - 1])
990 } else {
991 return Err(format!("expected quoted key, got: {}", key_raw));
992 };
993
994 let value = json_value_to_sif_string(val_raw);
995 pairs.push((key, value));
996 }
997 Ok(pairs)
998}
999
/// Flattens `pairs` into a single list of key/value pairs by delegating to
/// `flatten_object_depth` with a starting depth of 0.
///
/// `prefix` is forwarded unchanged; callers in this file pass `""`.
fn flatten_object(pairs: &[(String, String)], prefix: &str) -> Vec<(String, String)> {
    flatten_object_depth(pairs, prefix, 0)
}
1003
1004fn flatten_object_depth(
1005 pairs: &[(String, String)],
1006 prefix: &str,
1007 depth: usize,
1008) -> Vec<(String, String)> {
1009 let mut out = Vec::new();
1010 for (key, value) in pairs {
1011 let full_key = if prefix.is_empty() {
1012 sanitize_field_name(key)
1013 } else {
1014 format!("{}.{}", prefix, sanitize_field_name(key))
1015 };
1016
1017 let trimmed = value.trim();
1018 if depth < MAX_FLATTEN_DEPTH && trimmed.starts_with('{') && trimmed.ends_with('}') {
1019 if let Ok(nested) = parse_json_object(trimmed) {
1020 let flat = flatten_object_depth(&nested, &full_key, depth + 1);
1021 out.extend(flat);
1022 continue;
1023 }
1024 }
1025
1026 let sif_value = if trimmed.starts_with('[') && trimmed.ends_with(']') {
1027 json_array_to_sif(trimmed)
1028 } else {
1029 value.clone()
1030 };
1031
1032 out.push((full_key, sif_value));
1033 }
1034 out
1035}
1036
1037fn json_value_to_sif_string(v: &str) -> String {
1038 let v = v.trim();
1039 if v == "null" {
1040 return "_".to_string();
1041 }
1042 if v == "true" {
1043 return "T".to_string();
1044 }
1045 if v == "false" {
1046 return "F".to_string();
1047 }
1048 if v.starts_with('"') && v.ends_with('"') {
1049 return json_unescape(&v[1..v.len() - 1]);
1050 }
1051 if v.starts_with('{') || v.starts_with('[') {
1052 return v.to_string();
1053 }
1054 v.to_string()
1055}
1056
/// Decodes the backslash escapes of a JSON string body (without its
/// surrounding quotes).
///
/// Handles `\n`, `\t`, `\r`, `\b`, `\f`, `\\`, `\"`, `\/` and `\uXXXX`,
/// including UTF-16 surrogate pairs such as `\ud83d\ude00`. An unknown
/// escape is kept verbatim (backslash plus character), a trailing lone
/// backslash is kept, and a `\u` escape that does not decode to a valid
/// character (bad hex, unpaired surrogate) is dropped.
fn json_unescape(s: &str) -> String {
    // Consumes up to four characters and parses them as a hex code unit.
    fn read_code_unit(chars: &mut std::str::Chars<'_>) -> Option<u32> {
        let digits: String = chars.by_ref().take(4).collect();
        u32::from_str_radix(&digits, 16).ok()
    }

    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars();
    while let Some(c) = chars.next() {
        if c != '\\' {
            out.push(c);
            continue;
        }
        match chars.next() {
            Some('n') => out.push('\n'),
            Some('t') => out.push('\t'),
            Some('r') => out.push('\r'),
            Some('b') => out.push('\u{0008}'),
            Some('f') => out.push('\u{000C}'),
            Some('\\') => out.push('\\'),
            Some('"') => out.push('"'),
            Some('/') => out.push('/'),
            Some('u') => match read_code_unit(&mut chars) {
                // High surrogate: only meaningful together with the low
                // surrogate escape that must follow immediately.
                Some(high @ 0xD800..=0xDBFF) => {
                    // Look ahead on a copy so nothing is consumed unless a
                    // complete pair is present.
                    let mut ahead = chars.clone();
                    if ahead.next() == Some('\\') && ahead.next() == Some('u') {
                        if let Some(low @ 0xDC00..=0xDFFF) = read_code_unit(&mut ahead) {
                            let code_point = 0x10000 + ((high - 0xD800) << 10) + (low - 0xDC00);
                            if let Some(ch) = char::from_u32(code_point) {
                                out.push(ch);
                            }
                            chars = ahead;
                        }
                    }
                }
                Some(code_point) => {
                    if let Some(ch) = char::from_u32(code_point) {
                        out.push(ch);
                    }
                }
                None => {}
            },
            Some(other) => {
                out.push('\\');
                out.push(other);
            }
            None => out.push('\\'),
        }
    }
    out
}
1094
1095fn json_array_to_sif(v: &str) -> String {
1096 let inner = v[1..v.len() - 1].trim();
1097 if inner.is_empty() {
1098 return "[]".to_string();
1099 }
1100 let parts = split_json_top_level(inner, ',');
1101 let sif_parts: Vec<String> = parts
1102 .iter()
1103 .map(|p| json_value_to_sif_string(p.trim()))
1104 .collect();
1105 format!("[{}]", sif_parts.join(","))
1106}
1107
/// Splits `s` at every `delimiter` that is outside string literals and
/// outside any `[...]` / `{...}` nesting.
///
/// The pieces are returned untrimmed and in order; the result always has
/// at least one element (the whole input when no delimiter is found).
/// Backslash escapes are honoured inside string literals. The delimiter
/// may be any `char`, including multi-byte ones.
fn split_json_top_level(s: &str, delimiter: char) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut depth = 0i32;
    let mut in_string = false;
    let mut escape = false;
    let mut start = 0;

    for (i, c) in s.char_indices() {
        if escape {
            escape = false;
            continue;
        }
        if c == '\\' && in_string {
            escape = true;
            continue;
        }
        if c == '"' {
            in_string = !in_string;
            continue;
        }
        if in_string {
            continue;
        }
        match c {
            '[' | '{' => depth += 1,
            ']' | '}' => depth -= 1,
            hit if hit == delimiter && depth == 0 => {
                parts.push(&s[start..i]);
                // Skip the delimiter's full encoded width so the next piece
                // starts on a character boundary even for non-ASCII delimiters.
                start = i + hit.len_utf8();
            }
            _ => {}
        }
    }
    parts.push(&s[start..]);
    parts
}
1144
1145fn find_json_colon(s: &str) -> Result<usize, String> {
1146 let mut in_string = false;
1147 let mut escape = false;
1148
1149 for (i, c) in s.char_indices() {
1150 if escape {
1151 escape = false;
1152 continue;
1153 }
1154 if c == '\\' && in_string {
1155 escape = true;
1156 continue;
1157 }
1158 if c == '"' {
1159 in_string = !in_string;
1160 continue;
1161 }
1162 if c == ':' && !in_string {
1163 return Ok(i);
1164 }
1165 }
1166 Err(format!("no colon found in: {}", &s[..s.len().min(60)]))
1167}
1168
/// Splits one CSV line into its fields.
///
/// Commas separate fields except inside double-quoted sections. A `"`
/// opens or closes a quoted section and is not copied to the output; a
/// doubled `""` inside a quoted section yields one literal `"`. The result
/// always has at least one field.
fn split_csv_line(line: &str) -> Vec<String> {
    let mut cells = Vec::new();
    let mut cell = String::new();
    let mut quoted = false;
    let mut rest = line.chars().peekable();

    while let Some(ch) = rest.next() {
        match (quoted, ch) {
            (true, '"') => {
                if rest.peek() == Some(&'"') {
                    // Escaped quote inside a quoted section.
                    rest.next();
                    cell.push('"');
                } else {
                    quoted = false;
                }
            }
            (true, other) => cell.push(other),
            (false, ',') => cells.push(std::mem::take(&mut cell)),
            (false, '"') => quoted = true,
            (false, other) => cell.push(other),
        }
    }

    cells.push(cell);
    cells
}
1201
1202fn build_records(flat_rows: &[Vec<(String, String)>], schema: &Schema) -> Vec<Record> {
1203 flat_rows
1204 .iter()
1205 .map(|flat_row| {
1206 let values: Vec<Value> = schema
1207 .fields
1208 .iter()
1209 .map(|field_def| {
1210 if let Some((_, v)) = flat_row.iter().find(|(k, _)| k == &field_def.name) {
1211 crate::parse::parse_typed_value(v, &field_def.field_type)
1212 .unwrap_or_else(|_| crate::parse::parse_untyped_value(v))
1213 } else {
1214 Value::Null
1215 }
1216 })
1217 .collect();
1218 Record {
1219 values,
1220 cdc_op: CdcOp::Insert,
1221 }
1222 })
1223 .collect()
1224}
1225
1226fn build_records_csv(raw_rows: &[Vec<String>], schema: &Schema) -> Vec<Record> {
1227 raw_rows
1228 .iter()
1229 .map(|raw_row| {
1230 let values: Vec<Value> = schema
1231 .fields
1232 .iter()
1233 .enumerate()
1234 .map(|(i, field_def)| {
1235 if let Some(v) = raw_row.get(i) {
1236 crate::parse::parse_typed_value(v, &field_def.field_type)
1237 .unwrap_or_else(|_| crate::parse::parse_untyped_value(v))
1238 } else {
1239 Value::Null
1240 }
1241 })
1242 .collect();
1243 Record {
1244 values,
1245 cdc_op: CdcOp::Insert,
1246 }
1247 })
1248 .collect()
1249}
1250
1251fn empty_doc() -> Document {
1252 Document {
1253 header: Header {
1254 version: 1,
1255 attributes: HashMap::new(),
1256 },
1257 sections: vec![Section {
1258 id: None,
1259 directives: Vec::new(),
1260 schema: None,
1261 records: Vec::new(),
1262 blocks: Vec::new(),
1263 templates: Vec::new(),
1264 }],
1265 }
1266}
1267
// Unit tests for type inference, type merging, and the JSON / JSONL / CSV
// converters defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // One representative literal per inferred scalar type.
    #[test]
    fn test_infer_basic_types() {
        assert_eq!(infer_type("42"), Type::Uint);
        assert_eq!(infer_type("-7"), Type::Int);
        assert_eq!(infer_type("3.14"), Type::Float);
        assert_eq!(infer_type("hello"), Type::Str);
        assert_eq!(infer_type("T"), Type::Bool);
        assert_eq!(infer_type("_"), Type::Null);
        assert_eq!(infer_type("2026-03-08"), Type::Date);
        assert_eq!(infer_type("2026-03-08T06:50:51Z"), Type::DateTime);
        assert_eq!(infer_type("PT2H30M"), Type::Duration);
    }

    // Numeric widening, null handling and date/date-time merging.
    #[test]
    fn test_merge_types() {
        assert_eq!(merge_types(&Type::Int, &Type::Uint), Type::Int);
        assert_eq!(merge_types(&Type::Int, &Type::Float), Type::Float);
        assert_eq!(
            merge_types(&Type::Int, &Type::Null),
            Type::Nullable(Box::new(Type::Int))
        );
        assert_eq!(merge_types(&Type::Date, &Type::DateTime), Type::DateTime);
    }

    // A flat array of objects becomes one section with one record per object.
    #[test]
    fn test_from_json_basic() {
        let json = r#"[{"id":1,"name":"alice"},{"id":2,"name":"bob"}]"#;
        let doc = from_json(json).unwrap();
        let sec = &doc.sections[0];
        assert_eq!(sec.records.len(), 2);
        assert_eq!(sec.records[0].values[0], Value::Uint(1));
    }

    // Nested objects are flattened into dotted field names.
    #[test]
    fn test_from_json_nested() {
        let json = r#"[{"id":1,"actor":{"login":"alice","id":42}}]"#;
        let doc = from_json(json).unwrap();
        let schema = doc.sections[0].schema.as_ref().unwrap();
        let names: Vec<&str> = schema.fields.iter().map(|f| f.name.as_str()).collect();
        assert!(names.contains(&"actor.login"));
        assert!(names.contains(&"actor.id"));
    }

    // A field that is null in some row is inferred as nullable.
    #[test]
    fn test_from_json_nulls() {
        let json = r#"[{"id":1,"score":42},{"id":2,"score":null}]"#;
        let doc = from_json(json).unwrap();
        let schema = doc.sections[0].schema.as_ref().unwrap();
        assert!(matches!(schema.fields[1].field_type, Type::Nullable(_)));
    }

    // The header row names the fields; each data row becomes a record.
    #[test]
    fn test_from_csv() {
        let csv = "id,name,active\n1,alice,T\n2,bob,F\n";
        let doc = from_csv(csv).unwrap();
        let sec = &doc.sections[0];
        assert_eq!(sec.records.len(), 2);
        assert_eq!(sec.records[0].values[1], Value::Str("alice".to_string()));
    }

    // One object per line, with a trailing newline.
    #[test]
    fn test_from_jsonl() {
        let jsonl = r#"{"id":1,"name":"alice"}
{"id":2,"name":"bob"}
"#;
        let doc = from_jsonl(jsonl, None).unwrap();
        assert_eq!(doc.sections[0].records.len(), 2);
    }

    // Spaces and `$` become underscores; enclosing quotes are removed.
    #[test]
    fn test_sanitize_field_name() {
        assert_eq!(sanitize_field_name("Case Number"), "case_number");
        assert_eq!(sanitize_field_name("$ref"), "_ref");
        assert_eq!(sanitize_field_name("\"STATION\""), "station");
    }

    // A hand-built schema and record are rendered as a JSON object line.
    #[test]
    fn test_to_jsonl() {
        let schema = Schema {
            fields: vec![
                FieldDef {
                    name: "id".to_string(),
                    field_type: Type::Uint,
                    semantic: Some("id".to_string()),
                    deprecated: false,
                    modifiers: vec![],
                },
                FieldDef {
                    name: "name".to_string(),
                    field_type: Type::Str,
                    semantic: None,
                    deprecated: false,
                    modifiers: vec![],
                },
            ],
        };
        let records = vec![Record {
            values: vec![Value::Uint(1), Value::Str("alice".to_string())],
            cdc_op: CdcOp::Insert,
        }];
        let json = to_jsonl(&schema, &records);
        assert!(json.contains("\"id\":1"));
        assert!(json.contains("\"name\":\"alice\""));
    }

    // `paths` has four members that are all objects, so it is handled as an
    // object of objects rather than as a single nested value.
    #[test]
    fn test_from_json_object_of_objects() {
        let json = r#"{
            "openapi": "3.0",
            "paths": {
                "/users": {"get": {"summary": "List users", "operationId": "getUsers"}},
                "/users/{id}": {"get": {"summary": "Get user", "operationId": "getUser"}},
                "/posts": {"get": {"summary": "List posts", "operationId": "getPosts"}},
                "/posts/{id}": {"get": {"summary": "Get post", "operationId": "getPost"}}
            }
        }"#;
        let doc = from_json(json).unwrap();
        // Diagnostic output, visible when the test fails or runs with `--nocapture`.
        eprintln!("sections: {}", doc.sections.len());
        for sec in &doc.sections {
            eprintln!("  id={:?} records={} fields={}",
                sec.id, sec.records.len(),
                sec.schema.as_ref().map(|s| s.fields.len()).unwrap_or(0));
        }
        let total: usize = doc.sections.iter().map(|s| s.records.len()).sum();
        assert!(total >= 4, "expected >=4 records, got {}", total);
    }
}