use std::collections::HashSet;
use prost_types::field_descriptor_proto::{Label, Type as ProtoType};
use prost_types::{DescriptorProto, FieldDescriptorProto, MessageOptions};
use serde::Deserialize;
use serde_json::Value as JsonValue;
use thiserror::Error;
/// Metadata for a single table column, deserialized from JSON.
///
/// Every field except `name` and `type_name` is optional in the input and
/// falls back to the default noted on the field.
#[derive(Debug, Clone, Deserialize)]
pub struct UcColumn {
/// Column name; it is validated and reused verbatim as the protobuf field name.
pub name: String,
/// Upper-case type tag such as `STRING`, `BIGINT`, `STRUCT`, `ARRAY` or `MAP`.
pub type_name: String,
/// Textual form of the type. Defaults to an empty string; nothing in this
/// file reads it.
#[serde(default)]
pub type_text: String,
/// JSON description of the type. Required (non-empty) for `STRUCT`, `ARRAY`
/// and `MAP` columns; defaults to an empty string.
#[serde(default)]
pub type_json: String,
/// Whether the column accepts nulls. Defaults to `true` when absent.
#[serde(default = "default_true")]
pub nullable: bool,
/// Position of the column. The protobuf field number is derived as
/// `position + 1`; columns with a negative position are skipped. Defaults to 0.
#[serde(default)]
pub position: i32,
}
/// Serde default provider for boolean fields that should be `true` when the
/// key is missing from the input (referenced via `#[serde(default = "default_true")]`).
fn default_true() -> bool {
true
}
/// A table definition: its identifying names plus its column list.
#[derive(Debug, Clone, Deserialize)]
pub struct UcTableSchema {
/// Table name; combined with `schema_name` to derive the message name.
pub name: String,
/// Catalog the table belongs to. Not used when deriving the descriptor in this file.
pub catalog_name: String,
/// Schema the table belongs to; combined with `name` to derive the message name.
pub schema_name: String,
/// Columns of the table.
pub columns: Vec<UcColumn>,
}
/// Errors raised while turning column metadata into a protobuf descriptor.
#[derive(Debug, Error)]
pub enum SchemaError {
/// A column or struct-field name was rejected by `validate_field_name`.
#[error("invalid field name '{name}': {reason}")]
InvalidFieldName { name: String, reason: String },
/// A non-complex `type_name` has no protobuf mapping.
#[error("unsupported Databricks type '{0}'")]
UnsupportedType(String),
/// A `STRUCT`/`ARRAY`/`MAP` column arrived with an empty `type_json`.
#[error("missing type_json for complex column '{0}'")]
MissingTypeJson(String),
/// `type_json` could not be parsed or converted; `reason` carries the detail.
#[error("failed to parse type_json for column '{column}': {reason}")]
InvalidTypeJson { column: String, reason: String },
/// Any other unsupported shape (e.g. nested arrays, bad map keys); the
/// payload is the complete message.
#[error("{0}")]
Invalid(String),
}
pub fn descriptor_from_uc_columns(
columns: &[UcColumn],
message_name: &str,
) -> Result<DescriptorProto, SchemaError> {
let mut collector = MessageCollector::new();
let mut fields = Vec::with_capacity(columns.len());
let mut sorted: Vec<&UcColumn> = columns.iter().filter(|c| c.position >= 0).collect();
sorted.sort_by_key(|c| c.position);
for column in sorted.iter() {
validate_field_name(&column.name)?;
let (field_type, type_name, is_repeated) = if is_complex(&column.type_name) {
if column.type_json.is_empty() {
return Err(SchemaError::MissingTypeJson(column.name.clone()));
}
let complex = parse_type_json(&column.type_json).map_err(|reason| {
SchemaError::InvalidTypeJson {
column: column.name.clone(),
reason,
}
})?;
let is_repeated = matches!(complex, ComplexType::Array(_) | ComplexType::Map { .. });
let (ty, type_name) =
map_complex_type_to_protobuf(&complex, &column.name, &mut collector)?;
(ty, type_name, is_repeated)
} else {
(map_simple_databricks_type(&column.type_name)?, None, false)
};
fields.push(field_descriptor(
&column.name,
column.position + 1,
field_type,
type_name,
column.nullable,
is_repeated,
));
}
Ok(DescriptorProto {
name: Some(message_name.to_string()),
field: fields,
nested_type: collector.nested,
..Default::default()
})
}
pub fn descriptor_from_uc_schema(schema: &UcTableSchema) -> Result<DescriptorProto, SchemaError> {
let message_name = sanitize_message_name(&format!("{}_{}", schema.schema_name, schema.name));
descriptor_from_uc_columns(&schema.columns, &message_name)
}
/// Returns `true` when `type_name` is one of the container type tags
/// (`STRUCT`, `ARRAY`, `MAP`). The comparison is exact and case-sensitive.
fn is_complex(type_name: &str) -> bool {
    ["STRUCT", "ARRAY", "MAP"].contains(&type_name)
}
/// Assembles a single field descriptor.
///
/// The label is `Repeated` when `is_repeated` is set, otherwise `Optional`
/// for nullable fields and `Required` for non-nullable ones. `json_name` is
/// set to the same value as `name`, and `proto3_optional` is set only for
/// nullable, non-repeated fields.
fn field_descriptor(
    name: &str,
    number: i32,
    field_type: ProtoType,
    type_name: Option<String>,
    nullable: bool,
    is_repeated: bool,
) -> FieldDescriptorProto {
    let label = match (is_repeated, nullable) {
        (true, _) => Label::Repeated,
        (false, true) => Label::Optional,
        (false, false) => Label::Required,
    };
    let field_name = name.to_string();

    FieldDescriptorProto {
        name: Some(field_name.clone()),
        json_name: Some(field_name),
        number: Some(number),
        label: Some(label as i32),
        r#type: Some(field_type as i32),
        type_name,
        proto3_optional: Some(!is_repeated && nullable),
        ..Default::default()
    }
}
/// Maps a non-container `type_name` to its protobuf scalar type.
///
/// Arms are grouped by target type: integer-like types up to 32 bits and
/// `DATE` become `Int32`; `LONG`/`BIGINT` and both timestamp flavours become
/// `Int64`; `DECIMAL` and `VARIANT` are carried as `String`.
///
/// # Errors
///
/// Returns [`SchemaError::UnsupportedType`] for any other (case-sensitive) name.
fn map_simple_databricks_type(type_name: &str) -> Result<ProtoType, SchemaError> {
    let proto_type = match type_name {
        "STRING" | "DECIMAL" | "VARIANT" => ProtoType::String,
        "INT" | "INTEGER" | "SHORT" | "SMALLINT" | "BYTE" | "TINYINT" | "DATE" => {
            ProtoType::Int32
        }
        "LONG" | "BIGINT" | "TIMESTAMP" | "TIMESTAMP_NTZ" => ProtoType::Int64,
        "BOOLEAN" | "BOOL" => ProtoType::Bool,
        "DOUBLE" => ProtoType::Double,
        "FLOAT" => ProtoType::Float,
        "BINARY" => ProtoType::Bytes,
        unsupported => return Err(SchemaError::UnsupportedType(unsupported.to_string())),
    };
    Ok(proto_type)
}
/// Parsed, validated form of a column's `type_json`, as produced by
/// `type_ref_to_complex`.
#[derive(Debug, Clone)]
enum ComplexType {
/// A leaf scalar type.
Primitive(PrimitiveType),
/// A struct with named fields.
Struct(StructType),
/// An array of the boxed element type.
Array(Box<ComplexType>),
/// A map from `key` to `value`.
Map {
key: Box<ComplexType>,
value: Box<ComplexType>,
},
}
/// Scalar types recognised inside `type_json` (see `parse_primitive_type`
/// for the accepted spellings and `map_primitive_to_protobuf` for the
/// protobuf type each one becomes).
#[derive(Debug, Clone, Copy)]
enum PrimitiveType {
String,
Long,
Integer,
Short,
Byte,
Double,
Float,
Boolean,
Binary,
// Covers both "timestamp" and "timestamp_ntz".
Timestamp,
Date,
// Covers any type string starting with "decimal".
Decimal,
}
/// One field of a parsed struct type.
#[derive(Debug, Clone)]
struct StructField {
/// Field name; validated when the struct message is generated.
name: String,
/// Parsed type of the field.
field_type: ComplexType,
/// Whether the field accepts nulls.
nullable: bool,
}
/// A parsed struct type: its fields in declaration order. Field numbers of
/// the generated message follow this order (index + 1).
#[derive(Debug, Clone)]
struct StructType {
fields: Vec<StructField>,
}
/// Deepest nesting level `type_ref_to_complex` accepts before reporting an
/// error (the top-level type is level 0).
const MAX_NESTING_DEPTH: usize = 100;
/// A type reference as it appears in `type_json`: either an object describing
/// a struct/array/map or a bare string naming a primitive.
///
/// The enum is untagged, so serde tries the variants in declaration order;
/// keep `Complex` first.
#[derive(Deserialize)]
#[serde(untagged)]
enum TypeRef {
Complex(ComplexTypeJson),
Primitive(String),
}
/// JSON object form of a container type, discriminated by its `"type"` key
/// (`"struct"`, `"array"` or `"map"`). Keys not listed here are not read.
#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
enum ComplexTypeJson {
/// `{"type":"struct","fields":[...]}`
Struct {
fields: Vec<StructFieldJson>,
},
/// `{"type":"array","elementType":...}`
Array {
#[serde(rename = "elementType")]
element_type: Box<TypeRef>,
},
/// `{"type":"map","keyType":...,"valueType":...}`
Map {
#[serde(rename = "keyType")]
key_type: Box<TypeRef>,
#[serde(rename = "valueType")]
value_type: Box<TypeRef>,
},
}
/// JSON form of one struct field inside `type_json`.
#[derive(Deserialize)]
struct StructFieldJson {
/// Field name.
name: String,
/// Field type, read from the `"type"` key.
#[serde(rename = "type")]
ty: TypeRef,
/// Nullability; treated as `true` when the key is absent.
#[serde(default = "default_true")]
nullable: bool,
}
/// Parses a column's `type_json` string into a [`ComplexType`].
///
/// The input may be either the type itself or a field wrapper object that
/// carries both a `"name"` and a `"type"` key; in the latter case only the
/// value under `"type"` is interpreted.
///
/// # Errors
///
/// Returns a human-readable message when the input is empty or `{}`, is not
/// valid JSON, does not match the expected type shape, names an unknown
/// primitive, or nests too deeply.
fn parse_type_json(type_json: &str) -> Result<ComplexType, String> {
    if matches!(type_json, "" | "{}") {
        return Err("empty type_json".into());
    }

    let parsed: JsonValue = serde_json::from_str(type_json).map_err(|e| e.to_string())?;

    // Unwrap `{"name": ..., "type": <type>, ...}` down to `<type>`; anything
    // else is taken to be the type itself.
    let wrapped_type = parsed
        .as_object()
        .filter(|obj| obj.contains_key("name"))
        .and_then(|obj| obj.get("type"))
        .cloned();
    let type_value = wrapped_type.unwrap_or(parsed);

    serde_json::from_value::<TypeRef>(type_value)
        .map_err(|e| e.to_string())
        .and_then(|tref| type_ref_to_complex(&tref, 0))
}
fn type_ref_to_complex(tref: &TypeRef, level: usize) -> Result<ComplexType, String> {
if level > MAX_NESTING_DEPTH {
return Err(format!(
"nesting level exceeds maximum depth of {}",
MAX_NESTING_DEPTH
));
}
match tref {
TypeRef::Primitive(s) => parse_primitive_type(s).map(ComplexType::Primitive),
TypeRef::Complex(ComplexTypeJson::Struct { fields }) => {
let mut out = Vec::with_capacity(fields.len());
for f in fields {
out.push(StructField {
name: f.name.clone(),
field_type: type_ref_to_complex(&f.ty, level + 1)?,
nullable: f.nullable,
});
}
Ok(ComplexType::Struct(StructType { fields: out }))
}
TypeRef::Complex(ComplexTypeJson::Array { element_type }) => Ok(ComplexType::Array(
Box::new(type_ref_to_complex(element_type, level + 1)?),
)),
TypeRef::Complex(ComplexTypeJson::Map {
key_type,
value_type,
}) => Ok(ComplexType::Map {
key: Box::new(type_ref_to_complex(key_type, level + 1)?),
value: Box::new(type_ref_to_complex(value_type, level + 1)?),
}),
}
}
/// Resolves a lower-case primitive type name from `type_json`.
///
/// Exact names are matched first; any remaining string that starts with
/// `decimal` (for example `decimal(10,2)`) is treated as a decimal.
///
/// # Errors
///
/// Returns `unknown primitive type '<name>'` for anything else.
fn parse_primitive_type(s: &str) -> Result<PrimitiveType, String> {
    let primitive = match s {
        "string" => PrimitiveType::String,
        "long" => PrimitiveType::Long,
        "integer" => PrimitiveType::Integer,
        "short" => PrimitiveType::Short,
        "byte" => PrimitiveType::Byte,
        "double" => PrimitiveType::Double,
        "float" => PrimitiveType::Float,
        "boolean" => PrimitiveType::Boolean,
        "binary" => PrimitiveType::Binary,
        "timestamp" | "timestamp_ntz" => PrimitiveType::Timestamp,
        "date" => PrimitiveType::Date,
        _ if s.starts_with("decimal") => PrimitiveType::Decimal,
        _ => return Err(format!("unknown primitive type '{}'", s)),
    };
    Ok(primitive)
}
/// Maps a parsed primitive to the protobuf scalar type used to carry it.
///
/// Arms are grouped by target type: decimals travel as strings, timestamps
/// as 64-bit integers, and dates plus the narrow integer types as 32-bit
/// integers.
const fn map_primitive_to_protobuf(p: PrimitiveType) -> ProtoType {
    match p {
        PrimitiveType::String | PrimitiveType::Decimal => ProtoType::String,
        PrimitiveType::Long | PrimitiveType::Timestamp => ProtoType::Int64,
        PrimitiveType::Integer
        | PrimitiveType::Short
        | PrimitiveType::Byte
        | PrimitiveType::Date => ProtoType::Int32,
        PrimitiveType::Double => ProtoType::Double,
        PrimitiveType::Float => ProtoType::Float,
        PrimitiveType::Boolean => ProtoType::Bool,
        PrimitiveType::Binary => ProtoType::Bytes,
    }
}
/// Reports whether a primitive may be used as a map key.
///
/// Floating-point and binary keys are rejected; every other primitive is
/// accepted. The match is written exhaustively so that a newly added
/// primitive must be classified explicitly.
const fn is_valid_map_key(p: PrimitiveType) -> bool {
    match p {
        PrimitiveType::Double | PrimitiveType::Float | PrimitiveType::Binary => false,
        PrimitiveType::String
        | PrimitiveType::Long
        | PrimitiveType::Integer
        | PrimitiveType::Short
        | PrimitiveType::Byte
        | PrimitiveType::Boolean
        | PrimitiveType::Timestamp
        | PrimitiveType::Date
        | PrimitiveType::Decimal => true,
    }
}
/// Accumulates the helper messages generated for one enclosing message and
/// tracks which message names have already been handed out.
struct MessageCollector {
/// Messages pushed so far, in generation order.
nested: Vec<DescriptorProto>,
/// Every name returned by `unique_name`.
used: HashSet<String>,
}
impl MessageCollector {
    /// Creates a collector with no messages and no reserved names.
    fn new() -> Self {
        let nested = Vec::new();
        let used = HashSet::new();
        Self { nested, used }
    }

    /// Reserves and returns a message name that has not been handed out yet.
    ///
    /// `base` is returned unchanged when it is free; otherwise the first free
    /// name among `base2`, `base3`, ... is returned.
    fn unique_name(&mut self, base: String) -> String {
        let mut candidate = base.clone();
        let mut suffix = 2u32;
        // `insert` returns false when the name is already taken.
        while !self.used.insert(candidate.clone()) {
            candidate = format!("{}{}", base, suffix);
            suffix += 1;
        }
        candidate
    }

    /// Appends a generated message to the collection.
    fn push(&mut self, message: DescriptorProto) {
        self.nested.push(message);
    }
}
fn map_complex_type_to_protobuf(
ct: &ComplexType,
path: &str,
collector: &mut MessageCollector,
) -> Result<(ProtoType, Option<String>), SchemaError> {
match ct {
ComplexType::Primitive(p) => Ok((map_primitive_to_protobuf(*p), None)),
ComplexType::Struct(st) => {
let name = collector.unique_name(sanitize_message_name(path));
let msg = generate_struct_message(&name, st)?;
collector.push(msg);
Ok((ProtoType::Message, Some(name)))
}
ComplexType::Array(element) => match element.as_ref() {
ComplexType::Primitive(p) => Ok((map_primitive_to_protobuf(*p), None)),
ComplexType::Struct(_) => {
let element_path = format!("{}_element", sanitize_message_name(path));
map_complex_type_to_protobuf(element, &element_path, collector)
}
ComplexType::Array(_) => Err(SchemaError::Invalid(format!(
"nested arrays not supported for field '{}'",
path
))),
ComplexType::Map { .. } => Err(SchemaError::Invalid(format!(
"arrays of maps not supported for field '{}'",
path
))),
},
ComplexType::Map { key, value } => {
let key_primitive = match key.as_ref() {
ComplexType::Primitive(p) if is_valid_map_key(*p) => *p,
ComplexType::Primitive(p) => {
return Err(SchemaError::Invalid(format!(
"unsupported map key type {:?} for field '{}' \
(protobuf map keys must be integral, bool, or string)",
p, path
)));
}
_ => {
return Err(SchemaError::Invalid(format!(
"map keys must be primitive types (field '{}')",
path
)));
}
};
let base = sanitize_message_name(path);
let map_value = match value.as_ref() {
ComplexType::Primitive(v) => MapValue::Primitive(*v),
ComplexType::Struct(st) => {
let value_name = collector.unique_name(format!("{}Value", base));
let value_msg = generate_struct_message(&value_name, st)?;
collector.push(value_msg);
MapValue::Message(value_name)
}
ComplexType::Array(_) | ComplexType::Map { .. } => {
return Err(SchemaError::Invalid(format!(
"maps with complex value types not supported for field '{}'",
path
)));
}
};
let entry_name = collector.unique_name(format!("{}Entry", base));
let entry = generate_map_entry(&entry_name, key_primitive, map_value);
collector.push(entry);
Ok((ProtoType::Message, Some(entry_name)))
}
}
}
fn generate_struct_message(
message_name: &str,
st: &StructType,
) -> Result<DescriptorProto, SchemaError> {
let mut local = MessageCollector::new();
let mut fields = Vec::with_capacity(st.fields.len());
for (index, f) in st.fields.iter().enumerate() {
validate_field_name(&f.name)?;
let path = format!("{}_{}", message_name, f.name);
let (field_type, type_name) =
map_complex_type_to_protobuf(&f.field_type, &path, &mut local)?;
let is_repeated = matches!(
f.field_type,
ComplexType::Array(_) | ComplexType::Map { .. }
);
fields.push(field_descriptor(
&f.name,
(index + 1) as i32,
field_type,
type_name,
f.nullable,
is_repeated,
));
}
Ok(DescriptorProto {
name: Some(message_name.to_string()),
field: fields,
nested_type: local.nested,
..Default::default()
})
}
/// Value side of a map entry handed to `generate_map_entry`.
enum MapValue {
/// A scalar value.
Primitive(PrimitiveType),
/// A message-typed value; the payload is the message's type name.
Message(String),
}
fn generate_map_entry(name: &str, key: PrimitiveType, value: MapValue) -> DescriptorProto {
let key_field = FieldDescriptorProto {
name: Some("key".into()),
number: Some(1),
label: Some(Label::Optional as i32),
r#type: Some(map_primitive_to_protobuf(key) as i32),
json_name: Some("key".into()),
proto3_optional: Some(false),
..Default::default()
};
let (value_type, value_type_name) = match value {
MapValue::Primitive(p) => (map_primitive_to_protobuf(p), None),
MapValue::Message(n) => (ProtoType::Message, Some(n)),
};
let value_field = FieldDescriptorProto {
name: Some("value".into()),
number: Some(2),
label: Some(Label::Optional as i32),
r#type: Some(value_type as i32),
type_name: value_type_name,
json_name: Some("value".into()),
proto3_optional: Some(true),
..Default::default()
};
DescriptorProto {
name: Some(name.to_string()),
field: vec![key_field, value_field],
options: Some(MessageOptions {
map_entry: Some(true),
..Default::default()
}),
..Default::default()
}
}
/// Names that `validate_field_name` rejects as reserved proto keywords
/// (exact, case-sensitive match): language keywords and scalar type names.
const RESERVED_FIELD_NAMES: &[&str] = &[
"syntax", "import", "option", "package", "message", "enum", "service", "rpc", "returns",
"reserved", "to", "max", "double", "float", "int32", "int64", "uint32", "uint64", "sint32",
"sint64", "fixed32", "fixed64", "sfixed32", "sfixed64", "bool", "string", "bytes",
];
/// Checks that `name` can be used as a protobuf field name.
///
/// The checks run in this order and the first failure is reported:
/// non-empty, does not start with an ASCII digit, consists only of ASCII
/// alphanumerics and `_`, and is not listed in `RESERVED_FIELD_NAMES`.
///
/// # Errors
///
/// Returns [`SchemaError::InvalidFieldName`] carrying the name and the reason.
fn validate_field_name(name: &str) -> Result<(), SchemaError> {
    let starts_with_digit = matches!(name.chars().next(), Some(c) if c.is_ascii_digit());
    let has_illegal_char = || name.chars().any(|c| !(c.is_ascii_alphanumeric() || c == '_'));

    let reason = if name.is_empty() {
        "empty"
    } else if starts_with_digit {
        "cannot start with a digit"
    } else if has_illegal_char() {
        "only alphanumeric and '_' characters allowed"
    } else if RESERVED_FIELD_NAMES.contains(&name) {
        "reserved proto keyword"
    } else {
        return Ok(());
    };

    Err(SchemaError::InvalidFieldName {
        name: name.to_string(),
        reason: reason.into(),
    })
}
/// Turns an arbitrary string into an UpperCamelCase message name.
///
/// Every character that is not an ASCII letter or digit acts as a word
/// separator and is dropped; the first character of each remaining word is
/// upper-cased and the rest is kept as is. If the result is empty or does not
/// start with an ASCII letter, `M` is prepended.
fn sanitize_message_name(name: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());

    for word in name.split(|c: char| !c.is_ascii_alphanumeric()) {
        let mut rest = word.chars();
        // Consecutive separators produce empty words, which contribute nothing.
        if let Some(first) = rest.next() {
            sanitized.push(first.to_ascii_uppercase());
            sanitized.push_str(rest.as_str());
        }
    }

    let starts_with_letter =
        matches!(sanitized.chars().next(), Some(first) if first.is_ascii_alphabetic());
    if !starts_with_letter {
        sanitized.insert(0, 'M');
    }
    sanitized
}
// Unit tests for the column-to-descriptor conversion and its helpers.
#[cfg(test)]
mod tests {
use super::*;
// Builds a simple (non-container) column with an empty `type_json`.
fn col(name: &str, type_name: &str, nullable: bool, position: i32) -> UcColumn {
UcColumn {
name: name.into(),
type_name: type_name.into(),
type_text: type_name.to_lowercase(),
type_json: String::new(),
nullable,
position,
}
}
// Builds a nullable container column described by `type_json`.
fn complex_col(name: &str, type_name: &str, type_json: &str, position: i32) -> UcColumn {
UcColumn {
name: name.into(),
type_name: type_name.into(),
type_text: String::new(),
type_json: type_json.into(),
nullable: true,
position,
}
}
// Looks up a field by name, panicking with the message name when it is absent.
fn field<'a>(desc: &'a DescriptorProto, name: &str) -> &'a FieldDescriptorProto {
desc.field
.iter()
.find(|f| f.name() == name)
.unwrap_or_else(|| panic!("field '{}' not found in {:?}", name, desc.name()))
}
// Scalar columns map to the expected proto types, labels and field numbers.
#[test]
fn scalars_round_trip() {
let cols = vec![
col("id", "BIGINT", false, 0),
col("name", "STRING", true, 1),
col("score", "DOUBLE", true, 2),
col("created_at", "TIMESTAMP", true, 3),
col("d", "DATE", false, 4),
col("data", "BINARY", false, 5),
];
let d = descriptor_from_uc_columns(&cols, "m").unwrap();
assert_eq!(d.name(), "m");
assert_eq!(field(&d, "id").r#type(), ProtoType::Int64);
assert_eq!(field(&d, "id").label(), Label::Required);
assert_eq!(field(&d, "name").label(), Label::Optional);
assert_eq!(field(&d, "score").r#type(), ProtoType::Double);
assert_eq!(field(&d, "created_at").r#type(), ProtoType::Int64);
assert_eq!(field(&d, "d").r#type(), ProtoType::Int32);
assert_eq!(field(&d, "data").r#type(), ProtoType::Bytes);
assert_eq!(field(&d, "id").number(), 1);
assert_eq!(field(&d, "data").number(), 6);
}
// Field numbers are `position + 1` even when positions are sparse.
#[test]
fn field_numbers_mirror_uc_position() {
let cols = vec![
col("a", "STRING", true, 0),
col("b", "STRING", true, 4),
col("c", "STRING", true, 8),
];
let d = descriptor_from_uc_columns(&cols, "m").unwrap();
assert_eq!(field(&d, "a").number(), 1);
assert_eq!(field(&d, "b").number(), 5);
assert_eq!(field(&d, "c").number(), 9);
}
// A struct column becomes a message-typed field plus a nested message whose
// fields keep their types and nullability.
#[test]
fn struct_becomes_nested_message() {
let type_json = r#"{
"type":"struct",
"fields":[
{"name":"street","type":"string","nullable":true,"metadata":{}},
{"name":"zip","type":"integer","nullable":false,"metadata":{}}
]
}"#;
let cols = vec![complex_col("address", "STRUCT", type_json, 0)];
let d = descriptor_from_uc_columns(&cols, "m").unwrap();
let f = field(&d, "address");
assert_eq!(f.r#type(), ProtoType::Message);
assert_eq!(f.label(), Label::Optional);
let type_name = f.type_name.as_deref().unwrap();
let nested = d
.nested_type
.iter()
.find(|n| n.name() == type_name)
.expect("nested struct message not emitted");
assert_eq!(field(nested, "street").r#type(), ProtoType::String);
assert_eq!(field(nested, "zip").r#type(), ProtoType::Int32);
assert_eq!(field(nested, "zip").label(), Label::Required);
}
// An array of primitives is a repeated scalar with no type name.
#[test]
fn array_of_primitive_is_repeated_scalar() {
let type_json = r#"{"type":"array","elementType":"long","containsNull":true}"#;
let cols = vec![complex_col("tags", "ARRAY", type_json, 0)];
let d = descriptor_from_uc_columns(&cols, "m").unwrap();
let f = field(&d, "tags");
assert_eq!(f.label(), Label::Repeated);
assert_eq!(f.r#type(), ProtoType::Int64);
assert!(f.type_name.is_none());
}
// An array of structs is a repeated message field backed by a nested message.
#[test]
fn array_of_struct_emits_nested_message() {
let type_json = r#"{
"type":"array",
"elementType":{
"type":"struct",
"fields":[{"name":"k","type":"string","nullable":true,"metadata":{}}]
},
"containsNull":true
}"#;
let cols = vec![complex_col("items", "ARRAY", type_json, 0)];
let d = descriptor_from_uc_columns(&cols, "m").unwrap();
let f = field(&d, "items");
assert_eq!(f.label(), Label::Repeated);
assert_eq!(f.r#type(), ProtoType::Message);
let name = f.type_name.as_deref().unwrap();
assert!(d.nested_type.iter().any(|n| n.name() == name));
}
// A map with primitive values produces a repeated field pointing at an entry
// message flagged with `map_entry`.
#[test]
fn map_of_primitive_generates_entry_message() {
let type_json =
r#"{"type":"map","keyType":"string","valueType":"integer","valueContainsNull":true}"#;
let cols = vec![complex_col("props", "MAP", type_json, 0)];
let d = descriptor_from_uc_columns(&cols, "m").unwrap();
let f = field(&d, "props");
assert_eq!(f.label(), Label::Repeated);
assert_eq!(f.r#type(), ProtoType::Message);
let entry_name = f.type_name.as_deref().unwrap();
let entry = d
.nested_type
.iter()
.find(|n| n.name() == entry_name)
.expect("map entry message missing");
assert_eq!(entry.options.as_ref().and_then(|o| o.map_entry), Some(true));
assert_eq!(field(entry, "key").r#type(), ProtoType::String);
assert_eq!(field(entry, "value").r#type(), ProtoType::Int32);
}
// A map with struct values emits both the value message and the entry message.
#[test]
fn map_with_struct_value_emits_value_and_entry() {
let type_json = r#"{
"type":"map",
"keyType":"string",
"valueType":{
"type":"struct",
"fields":[{"name":"v","type":"long","nullable":true,"metadata":{}}]
},
"valueContainsNull":true
}"#;
let cols = vec![complex_col("lookup", "MAP", type_json, 0)];
let d = descriptor_from_uc_columns(&cols, "m").unwrap();
let f = field(&d, "lookup");
let entry_name = f.type_name.as_deref().unwrap();
let entry = d
.nested_type
.iter()
.find(|n| n.name() == entry_name)
.unwrap();
assert_eq!(entry.options.as_ref().and_then(|o| o.map_entry), Some(true));
let value_type_name = field(entry, "value").type_name.as_deref().unwrap();
assert!(d.nested_type.iter().any(|n| n.name() == value_type_name));
}
// A `double` map key is rejected with `SchemaError::Invalid`.
#[test]
fn rejects_unsupported_map_key() {
let type_json =
r#"{"type":"map","keyType":"double","valueType":"integer","valueContainsNull":true}"#;
let cols = vec![complex_col("bad", "MAP", type_json, 0)];
let err = descriptor_from_uc_columns(&cols, "m").unwrap_err();
assert!(matches!(err, SchemaError::Invalid(_)), "got {:?}", err);
}
// Wrapping an integer in MAX_NESTING_DEPTH + 2 array layers must fail with an
// InvalidTypeJson whose reason mentions the maximum depth.
#[test]
fn rejects_excessively_deep_nesting() {
let mut type_json = String::from("\"integer\"");
for _ in 0..MAX_NESTING_DEPTH + 2 {
type_json = format!(
r#"{{"type":"array","elementType":{},"containsNull":true}}"#,
type_json
);
}
let cols = vec![complex_col("deep", "ARRAY", &type_json, 0)];
let err = descriptor_from_uc_columns(&cols, "m").unwrap_err();
match err {
SchemaError::InvalidTypeJson { reason, .. } => {
assert!(
reason.contains("maximum depth"),
"unexpected reason: {}",
reason
);
}
other => panic!("expected InvalidTypeJson, got {:?}", other),
}
}
// An array whose element is itself an array is rejected.
#[test]
fn rejects_nested_arrays() {
let type_json = r#"{"type":"array","elementType":{"type":"array","elementType":"integer","containsNull":true},"containsNull":true}"#;
let cols = vec![complex_col("nested", "ARRAY", type_json, 0)];
let err = descriptor_from_uc_columns(&cols, "m").unwrap_err();
assert!(matches!(err, SchemaError::Invalid(_)), "got {:?}", err);
}
// A column name starting with a digit is rejected.
#[test]
fn rejects_invalid_field_name() {
let cols = vec![col("1bad", "STRING", true, 0)];
let err = descriptor_from_uc_columns(&cols, "m").unwrap_err();
assert!(matches!(err, SchemaError::InvalidFieldName { .. }));
}
// A column named after a reserved keyword is rejected with a "reserved" reason.
#[test]
fn rejects_reserved_proto_keyword() {
let cols = vec![col("message", "STRING", true, 0)];
let err = descriptor_from_uc_columns(&cols, "m").unwrap_err();
match err {
SchemaError::InvalidFieldName { name, reason } => {
assert_eq!(name, "message");
assert!(reason.contains("reserved"), "got reason: {}", reason);
}
other => panic!("expected InvalidFieldName, got {:?}", other),
}
}
// A container column with an empty `type_json` yields MissingTypeJson.
#[test]
fn complex_column_requires_type_json() {
let cols = vec![col("x", "STRUCT", true, 0)];
let err = descriptor_from_uc_columns(&cols, "m").unwrap_err();
assert!(matches!(err, SchemaError::MissingTypeJson(_)));
}
// The message name is built from the schema name and table name.
#[test]
fn descriptor_from_uc_schema_derives_name() {
let schema = UcTableSchema {
name: "events".into(),
catalog_name: "main".into(),
schema_name: "analytics".into(),
columns: vec![col("id", "BIGINT", false, 0)],
};
let d = descriptor_from_uc_schema(&schema).unwrap();
assert_eq!(d.name(), "AnalyticsEvents");
}
// Two struct fields whose names sanitize to the same message name get distinct
// names, with the numeric suffix going to the later field.
#[test]
fn unique_name_disambiguates_collisions_in_input_order() {
let type_json = r#"{
"type":"struct",
"fields":[
{"name":"foo","type":{"type":"struct","fields":[
{"name":"a","type":"string","nullable":true,"metadata":{}}
]},"nullable":true,"metadata":{}},
{"name":"Foo","type":{"type":"struct","fields":[
{"name":"b","type":"string","nullable":true,"metadata":{}}
]},"nullable":true,"metadata":{}}
]
}"#;
let cols = vec![complex_col("parent", "STRUCT", type_json, 0)];
let d = descriptor_from_uc_columns(&cols, "m").unwrap();
let parent = d
.nested_type
.iter()
.find(|n| n.name() == "Parent")
.expect("Parent message missing");
let foo = field(parent, "foo");
let foo_cap = field(parent, "Foo");
assert_eq!(foo.type_name.as_deref(), Some("ParentFoo"));
assert_eq!(foo_cap.type_name.as_deref(), Some("ParentFoo2"));
}
// Separators are dropped, words are capitalized, and a leading digit gets an
// `M` prefix.
#[test]
fn sanitize_message_name_handles_invalid_chars() {
assert_eq!(sanitize_message_name("foo-bar"), "FooBar");
assert_eq!(sanitize_message_name("1abc"), "M1abc");
assert_eq!(sanitize_message_name("analytics.events"), "AnalyticsEvents");
}
// Non-ASCII characters are dropped and act as word separators; the result
// still starts with an ASCII letter.
#[test]
fn sanitize_message_name_drops_non_ascii() {
assert_eq!(sanitize_message_name("café"), "Caf");
assert_eq!(sanitize_message_name("événements"), "VNements");
assert_eq!(sanitize_message_name("中文_table"), "Table");
assert_eq!(sanitize_message_name("中文"), "M");
let result = sanitize_message_name("éfoo");
assert!(result.chars().next().unwrap().is_ascii_alphabetic());
assert!(result
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_'));
}
// `UcColumn` deserializes from a JSON object carrying all six keys.
#[test]
fn uc_column_deserializes_from_uc_api_shape() {
let json = r#"{
"name":"id",
"type_name":"INT",
"type_text":"int",
"type_json":"{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}",
"nullable":false,
"position":0
}"#;
let col: UcColumn = serde_json::from_str(json).unwrap();
assert_eq!(col.name, "id");
assert_eq!(col.type_name, "INT");
assert!(!col.nullable);
}
}