use std::collections::BTreeMap;
use serde_json::{json, Value};
use crate::registry::{PathEntry, PathRegistry};
use crate::traversal::JsonType;
/// One node of the path tree assembled by `build_tree`.
///
/// Each node corresponds to a prefix of a dot-separated registry path.
#[derive(Default)]
struct Node {
/// Child nodes keyed by the next path segment. The segment `"$"` is treated
/// as the array-element marker by `emit_type`.
children: BTreeMap<String, Node>,
/// Raw pointer to the registry entry whose path ends exactly at this node,
/// if any. It is created in `build_tree` from a reference into
/// `registry.entries` and dereferenced in `emit_type`.
entry: Option<*const PathEntry>,
}
// SAFETY: NOTE(review) — `entry` only ever points into a `PathRegistry` that is
// borrowed immutably while the tree exists (see `build_schema`). These impls
// presumably assume `PathEntry` is itself `Sync` and that the registry outlives
// every `Node`; confirm before sharing a `Node` across threads.
unsafe impl Send for Node {}
unsafe impl Sync for Node {}
/// Builds the JSON schema value for every path recorded in `registry`.
///
/// The registry paths are first arranged into a tree, and that tree is then
/// emitted as a top-level record named `"Root"`.
pub fn build_schema(registry: &PathRegistry) -> Value {
    let path_tree = build_tree(registry);
    let root_record = emit_record(&path_tree, "Root", registry);
    root_record
}
/// Arranges every `(path, entry)` pair of `registry.entries` into a tree of
/// [`Node`]s.
///
/// A path is split on `'.'` into segments; an empty path maps to the root node
/// itself. The node reached by the final segment stores a raw pointer to the
/// entry, so the returned tree must not be used after `registry` is dropped or
/// its entries are modified.
fn build_tree(registry: &PathRegistry) -> Node {
    let mut root = Node::default();
    for (path, entry) in &registry.entries {
        // `"".split('.')` would yield one empty segment, so the empty path is
        // special-cased to mean "the root node".
        let segments: Vec<&str> = if path.is_empty() {
            Vec::new()
        } else {
            path.split('.').collect()
        };
        insert_node(&mut root, &segments, entry as *const PathEntry);
    }
    root
}
/// Walks from `node` along `segments`, creating missing children on the way,
/// and stores `entry` on the node reached by the last segment.
///
/// With an empty `segments` slice the pointer is stored on `node` itself.
fn insert_node(node: &mut Node, segments: &[&str], entry: *const PathEntry) {
    let mut cursor = node;
    for segment in segments {
        cursor = cursor
            .children
            .entry((*segment).to_string())
            .or_default();
    }
    cursor.entry = Some(entry);
}
/// Emits a `"record"` schema called `name` with one field per child of `node`.
///
/// A child keyed `"$"` (the array-element marker) is not emitted as a field.
fn emit_record(node: &Node, name: &str, registry: &PathRegistry) -> Value {
    let mut fields: Vec<Value> = Vec::new();
    for (segment, child) in &node.children {
        if segment == "$" {
            continue;
        }
        fields.push(emit_field(segment, child, registry));
    }
    json!({
        "type": "record",
        "name": name,
        "fields": fields,
    })
}
/// Emits a single record field: an object holding the field `name` and the
/// type produced by `emit_type` for `node`.
fn emit_field(name: &str, node: &Node, registry: &PathRegistry) -> Value {
    let field_type = emit_type(name, node, registry);
    json!({
        "name": name,
        "type": field_type,
    })
}
fn emit_type(name: &str, node: &Node, registry: &PathRegistry) -> Value {
let is_array = node.children.contains_key("$");
if is_array {
return emit_array_type(name, node, registry);
}
let has_object_children = node.children.keys().any(|k| k != "$");
if has_object_children {
if let Some(entry_ptr) = node.entry {
let entry = unsafe { &*entry_ptr };
if let Some(tracker) = &entry.key_tracker {
if tracker.is_unbounded {
let value_type = infer_map_value_type(node, registry);
return json!({
"type": "map",
"values": value_type,
});
}
}
}
let record_name = pascal_case(name);
return emit_record(node, &record_name, registry);
}
if let Some(entry_ptr) = node.entry {
let entry = unsafe { &*entry_ptr };
return leaf_avro_type(name, entry, registry);
}
json!("null")
}
/// Emits an `"array"` schema whose item type comes from the `"$"` child of
/// `node`.
///
/// # Panics
///
/// Panics if `node` has no `"$"` child; `emit_type` only calls this after
/// checking that the child exists.
fn emit_array_type(name: &str, node: &Node, registry: &PathRegistry) -> Value {
    let item_type = emit_type(name, &node.children["$"], registry);
    json!({
        "type": "array",
        "items": item_type,
    })
}
/// Infers the `"values"` type of a map from the children of `node`.
///
/// * If `node` has a `"$"` child, the value type is an array of that child's
///   type.
/// * Otherwise the type of every child is collected. Child types that are
///   themselves unions (JSON arrays) are flattened, and identical branches are
///   kept only once, because an Avro union may neither nest another union
///   directly nor repeat the same schema.
/// * No children yields `"string"`, exactly one distinct type yields that type
///   on its own, and several distinct types yield a union.
fn infer_map_value_type(node: &Node, registry: &PathRegistry) -> Value {
    if let Some(arr_child) = node.children.get("$") {
        let items = emit_type("value", arr_child, registry);
        return json!({"type": "array", "items": items});
    }

    // No "$" child exists past this point, so every child is a map value.
    let mut branches: Vec<Value> = Vec::new();
    for (key, child) in &node.children {
        let candidates = match emit_type(key, child, registry) {
            // A child that is already a union contributes its members.
            Value::Array(members) => members,
            single => vec![single],
        };
        for candidate in candidates {
            if !branches.contains(&candidate) {
                branches.push(candidate);
            }
        }
    }

    match branches.len() {
        0 => json!("string"),
        1 => branches.remove(0),
        _ => Value::Array(branches),
    }
}
/// Builds the schema type of a leaf path from the JSON types recorded on
/// `entry`.
///
/// Every non-null JSON type is mapped through `json_type_to_avro` and kept once,
/// in the order returned by `entry.types_by_frequency()`. If a null was
/// observed, `"null"` is placed first. The result is:
///
/// * `"null"` when no type was recorded at all (the same fallback `emit_type`
///   uses, rather than an empty union),
/// * the single type when only one branch remains,
/// * otherwise a union (JSON array) of the branches.
///
/// `_name` and `_registry` are unused and kept only for signature
/// compatibility. Type-collision documentation is attached separately by
/// `annotate_collisions`.
fn leaf_avro_type(_name: &str, entry: &PathEntry, _registry: &PathRegistry) -> Value {
    let types_by_freq = entry.types_by_frequency();

    let mut has_null = false;
    let mut avro_types: Vec<Value> = Vec::new();
    for (json_type, _count) in &types_by_freq {
        match json_type {
            JsonType::Null => has_null = true,
            _ => {
                let avro = json_type_to_avro(json_type);
                if !avro_types.contains(&avro) {
                    avro_types.push(avro);
                }
            }
        }
    }

    let mut branches: Vec<Value> = Vec::with_capacity(avro_types.len() + 1);
    if has_null {
        branches.push(json!("null"));
    }
    branches.extend(avro_types);

    match branches.len() {
        0 => json!("null"),
        1 => branches.remove(0),
        _ => Value::Array(branches),
    }
}
/// Maps one observed JSON type to its schema representation.
///
/// Scalars map to primitive type names. Arrays and objects map to placeholder
/// complex types: an array of `"null"` items and an empty record named
/// `"Object"`.
fn json_type_to_avro(t: &JsonType) -> Value {
    let primitive = match t {
        JsonType::Null => "null",
        JsonType::Bool => "boolean",
        JsonType::Integer => "long",
        JsonType::Float => "double",
        JsonType::String => "string",
        JsonType::Array => {
            return json!({"type": "array", "items": "null"});
        }
        JsonType::Object => {
            return json!({"type": "record", "name": "Object", "fields": []});
        }
    };
    json!(primitive)
}
/// Converts `s` to PascalCase.
///
/// `'_'` and `'-'` act as word separators and are dropped; the first character
/// of every word is upper-cased and the remaining characters are copied
/// unchanged. If nothing is left (empty input or only separators) the name
/// `"Record"` is returned.
fn pascal_case(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for word in s.split(|c: char| c == '_' || c == '-') {
        let mut letters = word.chars();
        if let Some(first) = letters.next() {
            out.extend(first.to_uppercase());
            out.push_str(letters.as_str());
        }
    }
    if out.is_empty() {
        String::from("Record")
    } else {
        out
    }
}
/// Adds `"doc"` annotations to `schema` in place, starting at the top-level
/// record with an empty path prefix.
///
/// See `annotate_record` for which fields receive a note.
pub fn annotate_collisions(schema: &mut Value, registry: &PathRegistry) {
    let root_prefix = "";
    annotate_record(schema, root_prefix, registry);
}
/// Annotates every field of the record `schema` and recurses into the field
/// types.
///
/// For each entry of `schema["fields"]` the registry path is
/// `prefix + "." + name` (just `name` when `prefix` is empty). If the registry
/// has an entry for that path, a `"doc"` string is set on the field when
///
/// * the entry reports a type collision (`TYPE COLLISION: …`), and/or
/// * the entry's key tracker is flagged unbounded (`UNBOUNDED MAP: …`).
///
/// When both apply, the two notes are joined with `"; "` so that neither
/// overwrites the other. A value without a `"fields"` array is left untouched.
fn annotate_record(schema: &mut Value, prefix: &str, registry: &PathRegistry) {
    let fields = match schema.get_mut("fields").and_then(|f| f.as_array_mut()) {
        Some(fields) => fields,
        None => return,
    };

    for field in fields.iter_mut() {
        let field_name = field.get("name").and_then(|n| n.as_str()).unwrap_or("");
        let path = if prefix.is_empty() {
            field_name.to_string()
        } else {
            format!("{}.{}", prefix, field_name)
        };

        if let Some(entry) = registry.entries.get(&path) {
            let mut notes: Vec<String> = Vec::new();

            if entry.has_type_collision() {
                let type_summary: Vec<String> = entry
                    .types_by_frequency()
                    .iter()
                    .filter(|(t, _)| **t != JsonType::Null)
                    .map(|(t, c)| format!("{}={}", t.as_str(), c))
                    .collect();
                notes.push(format!(
                    "TYPE COLLISION: {} — manual transformation required",
                    type_summary.join(", ")
                ));
            }

            if let Some(tracker) = &entry.key_tracker {
                if tracker.is_unbounded {
                    notes.push(format!(
                        "UNBOUNDED MAP: exceeded {} distinct keys",
                        registry.max_keys
                    ));
                }
            }

            // A single insert keeps both notes when both conditions hold.
            if !notes.is_empty() {
                if let Some(obj) = field.as_object_mut() {
                    obj.insert("doc".to_string(), json!(notes.join("; ")));
                }
            }
        }

        if let Some(field_type) = field.get_mut("type") {
            annotate_value(field_type, &path, registry);
        }
    }
}
/// Recurses through a type value looking for records to annotate.
///
/// * A JSON array (union) is walked member by member with the same `prefix`.
/// * An object whose `"type"` is `"record"` is handed to `annotate_record`.
/// * An object whose `"type"` is `"array"` has its `"items"` visited with the
///   prefix extended by `".$"`.
/// * Anything else is left untouched.
fn annotate_value(val: &mut Value, prefix: &str, registry: &PathRegistry) {
    if let Some(members) = val.as_array_mut() {
        for member in members.iter_mut() {
            annotate_value(member, prefix, registry);
        }
        return;
    }

    // `get` yields `None` for non-object values, so both flags stay false.
    let declared = val.get("type").and_then(|t| t.as_str());
    let is_record = declared == Some("record");
    let is_array = declared == Some("array");

    if is_record {
        annotate_record(val, prefix, registry);
    } else if is_array {
        if let Some(items) = val.get_mut("items") {
            let element_prefix = format!("{}.{}", prefix, "$");
            annotate_value(items, &element_prefix, registry);
        }
    }
}