use arrow::array::{Int64Array, StringArray, StructArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_zerobus_sdk_wrapper::wrapper::conversion;
use arrow_zerobus_sdk_wrapper::ZerobusError;
use prost_types::{
field_descriptor_proto::{Label, Type},
DescriptorProto, FieldDescriptorProto,
};
use std::collections::HashMap;
use std::sync::Arc;
fn create_nested_descriptor() -> (DescriptorProto, DescriptorProto) {
let nested = DescriptorProto {
name: Some("NestedMessage".to_string()),
field: vec![
FieldDescriptorProto {
name: Some("nested_id".to_string()),
number: Some(1),
label: Some(Label::Optional as i32),
r#type: Some(Type::Int64 as i32),
type_name: None,
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
},
FieldDescriptorProto {
name: Some("nested_name".to_string()),
number: Some(2),
label: Some(Label::Optional as i32),
r#type: Some(Type::String as i32),
type_name: None,
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
},
],
extension: vec![],
nested_type: vec![],
enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
};
let parent = DescriptorProto {
name: Some("ParentMessage".to_string()),
field: vec![
FieldDescriptorProto {
name: Some("id".to_string()),
number: Some(1),
label: Some(Label::Optional as i32),
r#type: Some(Type::Int64 as i32),
type_name: None,
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
},
FieldDescriptorProto {
name: Some("nested".to_string()),
number: Some(2),
label: Some(Label::Optional as i32),
r#type: Some(Type::Message as i32),
type_name: Some(".ParentMessage.NestedMessage".to_string()),
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
},
],
extension: vec![],
nested_type: vec![nested.clone()],
enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
};
(parent, nested)
}
#[test]
fn test_single_nested_message() {
let (parent_desc, nested_desc) = create_nested_descriptor();
let nested_schema = Schema::new(vec![
Field::new("nested_id", DataType::Int64, false),
Field::new("nested_name", DataType::Utf8, false),
]);
let parent_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("nested", DataType::Struct(nested_schema.fields().clone()), false),
]);
let id_array = Int64Array::from(vec![1]);
let nested_id_array = Int64Array::from(vec![100]);
let nested_name_array = StringArray::from(vec!["nested_value"]);
let struct_array = StructArray::from(vec![
(Field::new("nested_id", DataType::Int64, false), Arc::new(nested_id_array) as Arc<dyn arrow::array::Array>),
(Field::new("nested_name", DataType::Utf8, false), Arc::new(nested_name_array) as Arc<dyn arrow::array::Array>),
]);
let batch = RecordBatch::try_new(
Arc::new(parent_schema),
vec![
Arc::new(id_array),
Arc::new(struct_array),
],
).unwrap();
let mut nested_types = HashMap::new();
nested_types.insert("NestedMessage".to_string(), &nested_desc);
let result = conversion::record_batch_to_protobuf_bytes(&batch, &parent_desc);
if result.failed_rows.is_empty() {
assert_eq!(result.successful_bytes.len(), 1);
let bytes_list: Vec<Vec<u8>> = result.successful_bytes.into_iter().map(|(_, bytes)| bytes).collect();
assert!(!bytes_list[0].is_empty());
} else {
for (_, error) in &result.failed_rows {
match error {
ZerobusError::ConversionError(_) => {
}
_ => {
}
}
}
}
}
#[test]
fn test_repeated_nested_message() {
let (parent_desc, nested_desc) = create_nested_descriptor();
let nested_schema = Schema::new(vec![
Field::new("nested_id", DataType::Int64, false),
Field::new("nested_name", DataType::Utf8, false),
]);
let parent_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new(
"nested_list",
DataType::List(Arc::new(Field::new("item", DataType::Struct(nested_schema.fields().clone()), false))),
false,
),
]);
let id_array = Int64Array::from(vec![1]);
use arrow::array::ListArray;
use arrow::buffer::OffsetBuffer;
let offsets = OffsetBuffer::from_lengths(vec![0]);
let empty_struct = StructArray::from(vec![]);
let list_array = ListArray::new(
Arc::new(Field::new("item", DataType::Struct(nested_schema.fields().clone()), false)),
offsets,
Arc::new(empty_struct),
None,
);
let batch = RecordBatch::try_new(
Arc::new(parent_schema),
vec![
Arc::new(id_array),
Arc::new(list_array),
],
).unwrap();
let mut parent_with_repeated = parent_desc.clone();
parent_with_repeated.field[1].label = Some(Label::Repeated as i32);
parent_with_repeated.field[1].r#type = Some(Type::Message as i32);
let result = conversion::record_batch_to_protobuf_bytes(&batch, &parent_with_repeated);
if result.failed_rows.is_empty() {
} else {
for (_, error) in &result.failed_rows {
match error {
ZerobusError::ConversionError(_) => {
}
_ => {
}
}
}
}
}
#[test]
fn test_deeply_nested_messages() {
let level3 = DescriptorProto {
name: Some("Level3".to_string()),
field: vec![FieldDescriptorProto {
name: Some("value".to_string()),
number: Some(1),
label: Some(Label::Optional as i32),
r#type: Some(Type::Int64 as i32),
type_name: None,
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
}],
extension: vec![],
nested_type: vec![],
enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
};
let level2 = DescriptorProto {
name: Some("Level2".to_string()),
field: vec![FieldDescriptorProto {
name: Some("level3".to_string()),
number: Some(1),
label: Some(Label::Optional as i32),
r#type: Some(Type::Message as i32),
type_name: Some(".Level1.Level2.Level3".to_string()),
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
}],
extension: vec![],
nested_type: vec![level3.clone()],
enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
};
let level1 = DescriptorProto {
name: Some("Level1".to_string()),
field: vec![FieldDescriptorProto {
name: Some("level2".to_string()),
number: Some(1),
label: Some(Label::Optional as i32),
r#type: Some(Type::Message as i32),
type_name: Some(".Level1.Level2".to_string()),
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
}],
extension: vec![],
nested_type: vec![level2],
enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
};
let result = conversion::validate_protobuf_descriptor(&level1);
assert!(result.is_ok(), "3 levels of nesting should be valid");
}
#[test]
fn test_nested_message_with_missing_descriptor() {
let parent_desc = DescriptorProto {
name: Some("ParentMessage".to_string()),
field: vec![FieldDescriptorProto {
name: Some("nested".to_string()),
number: Some(1),
label: Some(Label::Optional as i32),
r#type: Some(Type::Message as i32),
type_name: Some(".ParentMessage.MissingNested".to_string()), extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
}],
extension: vec![],
nested_type: vec![], enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
};
let nested_schema = Schema::new(vec![
Field::new("value", DataType::Int64, false),
]);
let parent_schema = Schema::new(vec![
Field::new("nested", DataType::Struct(nested_schema.fields().clone()), false),
]);
let value_array = Int64Array::from(vec![42]);
let struct_array = StructArray::from(vec![
(Field::new("value", DataType::Int64, false), Arc::new(value_array) as Arc<dyn arrow::array::Array>),
]);
let batch = RecordBatch::try_new(
Arc::new(parent_schema),
vec![Arc::new(struct_array)],
).unwrap();
let result = conversion::record_batch_to_protobuf_bytes(&batch, &parent_desc);
assert!(result.failed_rows.len() > 0, "Missing nested descriptor should result in failed rows");
for (_, error) in &result.failed_rows {
match error {
ZerobusError::ConversionError(msg) => {
assert!(
msg.contains("nested") || msg.contains("descriptor") || msg.contains("type_name") || msg.contains("encoding"),
"Error message should mention nested/descriptor: {}",
msg
);
}
_ => {
}
}
} else {
panic!("Expected ConversionError, got: {:?}", result);
}
}
#[test]
fn test_nested_message_type_name_parsing() {
let type_name = ".ParentMessage.NestedMessage";
let parts: Vec<&str> = type_name.trim_start_matches('.').split('.').collect();
assert_eq!(parts.len(), 2);
assert_eq!(parts[0], "ParentMessage");
assert_eq!(parts[1], "NestedMessage");
if let Some(last_part) = parts.last() {
assert_eq!(*last_part, "NestedMessage");
} else {
panic!("Should have last part");
}
}
#[test]
fn test_nested_message_with_empty_struct() {
let nested_desc = DescriptorProto {
name: Some("EmptyNested".to_string()),
field: vec![], extension: vec![],
nested_type: vec![],
enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
};
let parent_desc = DescriptorProto {
name: Some("ParentMessage".to_string()),
field: vec![FieldDescriptorProto {
name: Some("nested".to_string()),
number: Some(1),
label: Some(Label::Optional as i32),
r#type: Some(Type::Message as i32),
type_name: Some(".ParentMessage.EmptyNested".to_string()),
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
}],
extension: vec![],
nested_type: vec![nested_desc],
enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
};
let parent_schema = Schema::new(vec![
Field::new("nested", DataType::Struct(vec![]), false),
]);
let empty_struct = StructArray::from(vec![]);
let batch = RecordBatch::try_new(
Arc::new(parent_schema),
vec![Arc::new(empty_struct)],
).unwrap();
let result = conversion::record_batch_to_protobuf_bytes(&batch, &parent_desc);
if result.failed_rows.is_empty() {
assert_eq!(result.successful_bytes.len(), 1);
} else {
for (_, error) in &result.failed_rows {
match error {
ZerobusError::ConversionError(_) => {
}
_ => {
}
}
}
}
}