use crate::error::ZerobusError;
use crate::wrapper::protobuf_serialization::{encode_tag, encode_varint};
use arrow::array::*;
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use prost_types::{
field_descriptor_proto::Label, field_descriptor_proto::Type, DescriptorProto,
FieldDescriptorProto,
};
use std::sync::Arc;
use tracing::debug;
/// Deepest nesting level accepted by descriptor validation; the root message is validated at
/// depth 0 and each `nested_type` level adds one.
const MAX_NESTING_DEPTH: usize = 10;
/// Largest number of fields a single message descriptor may declare.
const MAX_FIELDS_PER_MESSAGE: usize = 2000;
/// Smallest field number accepted by descriptor validation.
const MIN_FIELD_NUMBER: i32 = 1;
/// Largest field number accepted by descriptor validation (2^29 - 1).
const MAX_FIELD_NUMBER: i32 = 536870911;
/// Largest encoded row, in bytes, that is reported as successful. The size-limit error message
/// describes this as the payload room left after 19 header bytes
/// (4_194_285 + 19 = 4_194_304 bytes).
const MAX_RECORD_SIZE_BYTES: usize = 4_194_285;
/// Validates a Protobuf message descriptor, including all of its nested message types.
///
/// # Errors
///
/// Returns whatever error the recursive validation produces for the root descriptor or any
/// descriptor nested inside it.
pub fn validate_protobuf_descriptor(descriptor: &DescriptorProto) -> Result<(), ZerobusError> {
    // The root message is validated at nesting depth zero.
    let root_depth: usize = 0;
    validate_descriptor_recursive(descriptor, root_depth)
}
/// Checks one message descriptor and then recurses into its nested message types.
///
/// `depth` is the nesting level of `descriptor`; each nested type is checked at `depth + 1`.
///
/// # Errors
///
/// Returns `ZerobusError::ConfigurationError` when:
/// - `depth` is greater than `MAX_NESTING_DEPTH`,
/// - the descriptor declares more than `MAX_FIELDS_PER_MESSAGE` fields, or
/// - a field carries a number outside `MIN_FIELD_NUMBER..=MAX_FIELD_NUMBER`.
///
/// Fields without a number are not checked.
fn validate_descriptor_recursive(
    descriptor: &DescriptorProto,
    depth: usize,
) -> Result<(), ZerobusError> {
    if depth > MAX_NESTING_DEPTH {
        return Err(ZerobusError::ConfigurationError(format!(
            "Protobuf descriptor nesting depth ({}) exceeds maximum ({})",
            depth, MAX_NESTING_DEPTH
        )));
    }

    let field_count = descriptor.field.len();
    if field_count > MAX_FIELDS_PER_MESSAGE {
        return Err(ZerobusError::ConfigurationError(format!(
            "Protobuf descriptor field count ({}) exceeds maximum ({})",
            field_count, MAX_FIELDS_PER_MESSAGE
        )));
    }

    // Report the first explicitly numbered field whose number is out of range.
    let allowed_numbers = MIN_FIELD_NUMBER..=MAX_FIELD_NUMBER;
    let first_invalid = descriptor
        .field
        .iter()
        .filter_map(|f| f.number)
        .find(|number| !allowed_numbers.contains(number));
    if let Some(field_number) = first_invalid {
        return Err(ZerobusError::ConfigurationError(format!(
            "Invalid Protobuf field number: {} (must be between {} and {})",
            field_number, MIN_FIELD_NUMBER, MAX_FIELD_NUMBER
        )));
    }

    descriptor
        .nested_type
        .iter()
        .try_for_each(|nested| validate_descriptor_recursive(nested, depth + 1))
}
/// Per-row outcome of converting a record batch to Protobuf bytes.
#[derive(Debug)]
pub struct ProtobufConversionResult {
    /// `(row index, encoded bytes)` for every row that was encoded and passed the size check.
    pub successful_bytes: Vec<(usize, Vec<u8>)>,
    /// `(row index, error)` for every row that failed to encode or exceeded the size limit.
    pub failed_rows: Vec<(usize, ZerobusError)>,
}
/// Encodes every row of `batch` as a Protobuf message described by `descriptor`.
///
/// Columns are matched to descriptor fields by name; columns with no matching field are skipped
/// with a debug log. Rows are processed independently: a failure in one row is recorded in
/// `failed_rows` and does not stop the remaining rows from being encoded.
///
/// A row is reported as failed when any of its fields cannot be encoded, or when its encoded
/// size is greater than `MAX_RECORD_SIZE_BYTES`.
pub fn record_batch_to_protobuf_bytes(
    batch: &RecordBatch,
    descriptor: &DescriptorProto,
) -> ProtobufConversionResult {
    use std::collections::HashMap;

    let row_count = batch.num_rows();
    if row_count == 0 {
        return ProtobufConversionResult {
            successful_bytes: Vec::new(),
            failed_rows: Vec::new(),
        };
    }
    let schema = batch.schema();

    // Name-based lookups into the descriptor; a later duplicate name replaces an earlier one.
    let mut descriptor_fields: HashMap<String, &FieldDescriptorProto> = HashMap::new();
    for field_desc in &descriptor.field {
        if let Some(name) = &field_desc.name {
            descriptor_fields.insert(name.clone(), field_desc);
        }
    }
    let mut nested_descriptors: HashMap<String, &DescriptorProto> = HashMap::new();
    for nested in &descriptor.nested_type {
        if let Some(name) = &nested.name {
            nested_descriptors.insert(name.clone(), nested);
        }
    }

    let mut successful_bytes = Vec::new();
    let mut failed_rows = Vec::new();

    for row_idx in 0..row_count {
        let mut row_buffer = Vec::new();
        let mut failure: Option<ZerobusError> = None;

        for (column, field) in batch.columns().iter().zip(schema.fields().iter()) {
            match descriptor_fields.get(field.name()) {
                None => {
                    debug!("Field '{}' not found in descriptor, skipping", field.name());
                }
                Some(field_desc) => {
                    let outcome = encode_arrow_field_to_protobuf(
                        &mut row_buffer,
                        field_desc.number.unwrap_or(0),
                        field_desc,
                        column,
                        row_idx,
                        descriptor,
                        Some(&nested_descriptors),
                    );
                    if let Err(e) = outcome {
                        failure = Some(ZerobusError::ConversionError(format!(
                            "Field encoding failed: field='{}', row={}, error={}",
                            field.name(),
                            row_idx,
                            e
                        )));
                        // The first failing field decides the fate of the whole row.
                        break;
                    }
                }
            }
        }

        match failure {
            Some(error) => failed_rows.push((row_idx, error)),
            None if row_buffer.len() > MAX_RECORD_SIZE_BYTES => failed_rows.push((
                row_idx,
                ZerobusError::ConversionError(format!(
                    "Record size ({}) exceeds Zerobus limit of {} bytes (4MB). Headers require 19 bytes, leaving {} bytes for payload.",
                    row_buffer.len(),
                    MAX_RECORD_SIZE_BYTES + 19,
                    MAX_RECORD_SIZE_BYTES
                )),
            )),
            None => successful_bytes.push((row_idx, row_buffer)),
        }
    }

    ProtobufConversionResult {
        successful_bytes,
        failed_rows,
    }
}
fn encode_arrow_field_to_protobuf(
buffer: &mut Vec<u8>,
field_number: i32,
field_desc: &FieldDescriptorProto,
array: &Arc<dyn Array>,
row_idx: usize,
_parent_descriptor: &DescriptorProto,
nested_types: Option<&std::collections::HashMap<String, &DescriptorProto>>,
) -> Result<(), ZerobusError> {
if array.is_null(row_idx) {
return Ok(());
}
let protobuf_type = field_desc.r#type.unwrap_or(9); let is_repeated = field_desc.label == Some(Label::Repeated as i32);
if is_repeated {
if let Some(list_array) = array.as_any().downcast_ref::<ListArray>() {
let offsets = list_array.value_offsets();
let start = offsets[row_idx] as usize;
let end = offsets[row_idx + 1] as usize;
let values = list_array.values();
if protobuf_type == 11 {
if let Some(type_name) = &field_desc.type_name {
let nested_descriptor = if let Some(nested_map) = nested_types {
let parts: Vec<&str> =
type_name.trim_start_matches('.').split('.').collect();
if let Some(last_part) = parts.last() {
nested_map.get(*last_part)
} else {
None
}
} else {
None
};
if let Some(nested_desc) = nested_descriptor {
if let Some(struct_array) = values.as_any().downcast_ref::<StructArray>() {
for i in start..end {
if !struct_array.is_null(i) {
let wire_type = 2u32;
encode_tag(buffer, field_number, wire_type)?;
let mut nested_buffer = Vec::new();
let nested_schema = struct_array.fields();
let nested_field_by_name: std::collections::HashMap<
String,
&FieldDescriptorProto,
> = nested_desc
.field
.iter()
.filter_map(|f| {
f.name.as_ref().map(|name| (name.clone(), f))
})
.collect();
let nested_nested_types: std::collections::HashMap<
String,
&DescriptorProto,
> = nested_desc
.nested_type
.iter()
.filter_map(|nt| {
nt.name.as_ref().map(|name| (name.clone(), nt))
})
.collect();
for (field_idx, field) in nested_schema.iter().enumerate() {
let nested_array = struct_array.column(field_idx);
if let Some(nested_field_desc) =
nested_field_by_name.get(field.name())
{
let nested_field_number =
nested_field_desc.number.unwrap_or(0);
if let Err(e) = encode_arrow_field_to_protobuf(
&mut nested_buffer,
nested_field_number,
nested_field_desc,
nested_array,
i, nested_desc,
Some(&nested_nested_types),
) {
return Err(ZerobusError::ConversionError(format!(
"Repeated nested message encoding failed: field='{}', element={}, error={}",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string()),
i,
e
)));
}
}
}
encode_varint(buffer, nested_buffer.len() as u64)?;
buffer.extend_from_slice(&nested_buffer);
}
}
return Ok(());
} else {
return Err(ZerobusError::ConversionError(format!(
"Invalid array type: field='{}', expected='StructArray', found='ListArray'",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string())
)));
}
} else {
return Err(ZerobusError::ConversionError(format!(
"Nested type not found: field='{}', type_name='{}', issue='descriptor_missing'",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string()),
type_name
)));
}
} else {
return Err(ZerobusError::ConversionError(format!(
"Missing type_name: field='{}', issue='required_for_nested_message'",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string())
)));
}
} else {
for i in start..end {
if !values.is_null(i) {
encode_arrow_value_to_protobuf(
buffer,
field_number,
field_desc,
values,
i,
)?;
}
}
return Ok(());
}
} else if protobuf_type == 11 {
}
}
if protobuf_type == 11 {
if let Some(type_name) = &field_desc.type_name {
let nested_descriptor = if let Some(nested_map) = nested_types {
let parts: Vec<&str> = type_name.trim_start_matches('.').split('.').collect();
if let Some(last_part) = parts.last() {
nested_map.get(*last_part)
} else {
None
}
} else {
None
};
if let Some(nested_desc) = nested_descriptor {
if let Some(struct_array) = array.as_any().downcast_ref::<StructArray>() {
let wire_type = 2u32;
encode_tag(buffer, field_number, wire_type)?;
let mut nested_buffer = Vec::new();
let nested_schema = struct_array.fields();
let nested_field_by_name: std::collections::HashMap<
String,
&FieldDescriptorProto,
> = nested_desc
.field
.iter()
.filter_map(|f| f.name.as_ref().map(|name| (name.clone(), f)))
.collect();
let nested_nested_types: std::collections::HashMap<String, &DescriptorProto> =
nested_desc
.nested_type
.iter()
.filter_map(|nt| nt.name.as_ref().map(|name| (name.clone(), nt)))
.collect();
for (field_idx, field) in nested_schema.iter().enumerate() {
let nested_array = struct_array.column(field_idx);
if let Some(nested_field_desc) = nested_field_by_name.get(field.name()) {
let nested_field_number = nested_field_desc.number.unwrap_or(0);
if let Err(e) = encode_arrow_field_to_protobuf(
&mut nested_buffer,
nested_field_number,
nested_field_desc,
nested_array,
row_idx,
nested_desc,
Some(&nested_nested_types),
) {
return Err(ZerobusError::ConversionError(format!(
"Nested field encoding failed: field='{}', row={}, error={}",
field.name(),
row_idx,
e
)));
}
}
}
encode_varint(buffer, nested_buffer.len() as u64)?;
buffer.extend_from_slice(&nested_buffer);
return Ok(());
} else {
return Err(ZerobusError::ConversionError(format!(
"Invalid array type: field='{}', expected='StructArray', issue='nested_message_required'",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string())
)));
}
} else {
return Err(ZerobusError::ConversionError(format!(
"Nested type not found: field='{}', type_name='{}', issue='descriptor_missing'",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string()),
type_name
)));
}
} else {
return Err(ZerobusError::ConversionError(format!(
"Missing type_name: field='{}', issue='required_for_nested_message'",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string())
)));
}
}
if protobuf_type == 11 {
if let Some(type_name) = &field_desc.type_name {
let nested_descriptor = if let Some(nested_map) = nested_types {
let parts: Vec<&str> = type_name.trim_start_matches('.').split('.').collect();
if let Some(last_part) = parts.last() {
nested_map.get(*last_part)
} else {
None
}
} else {
None
};
if let Some(nested_desc) = nested_descriptor {
if let Some(struct_array) = array.as_any().downcast_ref::<StructArray>() {
let wire_type = 2u32;
encode_tag(buffer, field_number, wire_type)?;
let mut nested_buffer = Vec::new();
let nested_schema = struct_array.fields();
let nested_field_by_name: std::collections::HashMap<
String,
&FieldDescriptorProto,
> = nested_desc
.field
.iter()
.filter_map(|f| f.name.as_ref().map(|name| (name.clone(), f)))
.collect();
let nested_nested_types: std::collections::HashMap<String, &DescriptorProto> =
nested_desc
.nested_type
.iter()
.filter_map(|nt| nt.name.as_ref().map(|name| (name.clone(), nt)))
.collect();
for (field_idx, field) in nested_schema.iter().enumerate() {
let nested_array = struct_array.column(field_idx);
if let Some(nested_field_desc) = nested_field_by_name.get(field.name()) {
let nested_field_number = nested_field_desc.number.unwrap_or(0);
if let Err(e) = encode_arrow_field_to_protobuf(
&mut nested_buffer,
nested_field_number,
nested_field_desc,
nested_array,
row_idx,
nested_desc,
Some(&nested_nested_types),
) {
return Err(ZerobusError::ConversionError(format!(
"Nested field encoding failed: field='{}', row={}, error={}",
field.name(),
row_idx,
e
)));
}
}
}
encode_varint(buffer, nested_buffer.len() as u64)?;
buffer.extend_from_slice(&nested_buffer);
return Ok(());
}
}
}
}
if array.as_any().downcast_ref::<StructArray>().is_some() {
if let Some(type_name) = &field_desc.type_name {
let nested_descriptor = if let Some(nested_map) = nested_types {
let parts: Vec<&str> = type_name.trim_start_matches('.').split('.').collect();
if let Some(last_part) = parts.last() {
nested_map.get(*last_part)
} else {
None
}
} else {
None
};
if let Some(nested_desc) = nested_descriptor {
if let Some(struct_array) = array.as_any().downcast_ref::<StructArray>() {
let wire_type = 2u32;
encode_tag(buffer, field_number, wire_type)?;
let mut nested_buffer = Vec::new();
let nested_schema = struct_array.fields();
let nested_field_by_name: std::collections::HashMap<
String,
&FieldDescriptorProto,
> = nested_desc
.field
.iter()
.filter_map(|f| f.name.as_ref().map(|name| (name.clone(), f)))
.collect();
let nested_nested_types: std::collections::HashMap<String, &DescriptorProto> =
nested_desc
.nested_type
.iter()
.filter_map(|nt| nt.name.as_ref().map(|name| (name.clone(), nt)))
.collect();
for (field_idx, field) in nested_schema.iter().enumerate() {
let nested_array = struct_array.column(field_idx);
if let Some(nested_field_desc) = nested_field_by_name.get(field.name()) {
let nested_field_number = nested_field_desc.number.unwrap_or(0);
if let Err(e) = encode_arrow_field_to_protobuf(
&mut nested_buffer,
nested_field_number,
nested_field_desc,
nested_array,
row_idx,
nested_desc,
Some(&nested_nested_types),
) {
return Err(ZerobusError::ConversionError(format!(
"Nested field encoding failed: field='{}', row={}, error={}",
field.name(),
row_idx,
e
)));
}
}
}
encode_varint(buffer, nested_buffer.len() as u64)?;
buffer.extend_from_slice(&nested_buffer);
return Ok(());
}
}
}
}
encode_arrow_value_to_protobuf(buffer, field_number, field_desc, array, row_idx)
}
fn encode_arrow_value_to_protobuf(
buffer: &mut Vec<u8>,
field_number: i32,
field_desc: &FieldDescriptorProto,
array: &Arc<dyn Array>,
row_idx: usize,
) -> Result<(), ZerobusError> {
let protobuf_type = field_desc.r#type.unwrap_or(9);
match protobuf_type {
1 => {
let arr = array
.as_any()
.downcast_ref::<Float64Array>()
.ok_or_else(|| {
ZerobusError::ConversionError("Expected Float64Array".to_string())
})?;
let wire_type = 1u32; encode_tag(buffer, field_number, wire_type)?;
buffer.extend_from_slice(&arr.value(row_idx).to_le_bytes());
Ok(())
}
2 => {
let arr = array
.as_any()
.downcast_ref::<Float32Array>()
.ok_or_else(|| {
ZerobusError::ConversionError("Expected Float32Array".to_string())
})?;
let wire_type = 5u32; encode_tag(buffer, field_number, wire_type)?;
buffer.extend_from_slice(&arr.value(row_idx).to_le_bytes());
Ok(())
}
3 => {
if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, arr.value(row_idx) as u64)?;
Ok(())
} else if let Some(arr) = array.as_any().downcast_ref::<arrow::array::Date64Array>() {
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, arr.value(row_idx) as u64)?;
Ok(())
} else if let Some(arr) = array
.as_any()
.downcast_ref::<arrow::array::TimestampMicrosecondArray>()
{
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, arr.value(row_idx) as u64)?;
Ok(())
} else if let Some(arr) = array
.as_any()
.downcast_ref::<arrow::array::TimestampMillisecondArray>()
{
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, (arr.value(row_idx) * 1000) as u64)?; Ok(())
} else if let Some(arr) = array
.as_any()
.downcast_ref::<arrow::array::TimestampSecondArray>()
{
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, (arr.value(row_idx) * 1_000_000) as u64)?; Ok(())
} else if let Some(arr) = array
.as_any()
.downcast_ref::<arrow::array::TimestampNanosecondArray>()
{
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, (arr.value(row_idx) / 1000) as u64)?; Ok(())
} else {
Err(ZerobusError::ConversionError(format!(
"Expected Int64Array or TimestampArray for Int64 field, got: {:?}",
array.data_type()
)))
}
}
4 => {
let arr = array
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| ZerobusError::ConversionError("Expected UInt64Array".to_string()))?;
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, arr.value(row_idx))?;
Ok(())
}
5 => {
if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, arr.value(row_idx) as u64)?;
Ok(())
} else if let Some(arr) = array.as_any().downcast_ref::<arrow::array::Date32Array>() {
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, arr.value(row_idx) as u64)?;
Ok(())
} else {
Err(ZerobusError::ConversionError(format!(
"Expected Int32Array or Date32Array for Int32 field, got: {:?}",
array.data_type()
)))
}
}
8 => {
let arr = array
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
ZerobusError::ConversionError("Expected BooleanArray".to_string())
})?;
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
encode_varint(buffer, if arr.value(row_idx) { 1 } else { 0 })?;
Ok(())
}
9 => {
let arr = array
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| ZerobusError::ConversionError("Expected StringArray".to_string()))?;
let wire_type = 2u32; encode_tag(buffer, field_number, wire_type)?;
let bytes = arr.value(row_idx).as_bytes();
encode_varint(buffer, bytes.len() as u64)?;
buffer.extend_from_slice(bytes);
Ok(())
}
12 => {
let arr = array
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| ZerobusError::ConversionError("Expected BinaryArray".to_string()))?;
let wire_type = 2u32; encode_tag(buffer, field_number, wire_type)?;
let bytes = arr.value(row_idx);
encode_varint(buffer, bytes.len() as u64)?;
buffer.extend_from_slice(bytes);
Ok(())
}
17 => {
if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
let wire_type = 2u32; encode_tag(buffer, field_number, wire_type)?;
let bytes = arr.value(row_idx).as_bytes();
encode_varint(buffer, bytes.len() as u64)?;
buffer.extend_from_slice(bytes);
Ok(())
} else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
use crate::wrapper::protobuf_serialization::encode_sint32;
encode_sint32(buffer, arr.value(row_idx))?;
Ok(())
} else {
Err(ZerobusError::ConversionError(format!(
"Expected Int32Array or StringArray for SInt32 field '{}', got: {:?}",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string()),
array.data_type()
)))
}
}
18 => {
if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
let wire_type = 2u32; encode_tag(buffer, field_number, wire_type)?;
let bytes = arr.value(row_idx).as_bytes();
encode_varint(buffer, bytes.len() as u64)?;
buffer.extend_from_slice(bytes);
Ok(())
} else if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
let wire_type = 0u32; encode_tag(buffer, field_number, wire_type)?;
use crate::wrapper::protobuf_serialization::encode_sint64;
encode_sint64(buffer, arr.value(row_idx))?;
Ok(())
} else {
Err(ZerobusError::ConversionError(format!(
"Expected Int64Array or StringArray for SInt64 field '{}', got: {:?}",
field_desc.name.as_ref().unwrap_or(&"unknown".to_string()),
array.data_type()
)))
}
}
_ => {
if protobuf_type == 11 {
let field_name = field_desc.name.as_deref().unwrap_or("unknown");
let is_repeated_for_log = field_desc.label == Some(Label::Repeated as i32);
return Err(ZerobusError::ConversionError(format!(
"Protobuf type 11 (Message) reached encode_arrow_value_to_protobuf for field '{}'. \
This indicates a bug in the routing logic - nested messages should be handled by \
encode_arrow_field_to_protobuf. Field descriptor: type={:?}, type_name={:?}, \
is_repeated={:?}, label={:?}, array_type={:?}. \
Please check the routing logic in encode_arrow_field_to_protobuf.",
field_name,
protobuf_type,
field_desc.type_name,
is_repeated_for_log,
field_desc.label,
array.data_type()
)));
}
Err(ZerobusError::ConversionError(format!(
"Unsupported Protobuf type: {}",
protobuf_type
)))
}
}
}
/// Builds a Protobuf message descriptor named `ZerobusMessage` from an Arrow schema.
///
/// # Errors
///
/// Returns the error produced by the internal generator, for example when a column name or
/// Arrow data type is not supported.
pub fn generate_protobuf_descriptor(
    schema: &arrow::datatypes::Schema,
) -> Result<DescriptorProto, ZerobusError> {
    let root_message_name = "ZerobusMessage";
    generate_protobuf_descriptor_internal(schema, root_message_name)
}
fn generate_protobuf_descriptor_internal(
schema: &arrow::datatypes::Schema,
message_name: &str,
) -> Result<DescriptorProto, ZerobusError> {
use prost_types::FieldDescriptorProto;
let mut fields = Vec::new();
let mut nested_types = Vec::new();
let mut field_number = 1;
for field in schema.fields().iter() {
let field_name = field.name();
if !field_name
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_')
{
return Err(ZerobusError::ConfigurationError(format!(
"Column name '{}' must contain only ASCII letters, digits, and underscores (Zerobus requirement)",
field_name
)));
}
let is_repeated = matches!(
field.data_type(),
DataType::List(_) | DataType::LargeList(_)
);
let (_inner_data_type, field_type) = match field.data_type() {
DataType::List(inner_field) | DataType::LargeList(inner_field) => (
inner_field.data_type(),
arrow_type_to_protobuf_type(inner_field.data_type())?,
),
_ => (
field.data_type(),
arrow_type_to_protobuf_type(field.data_type())?,
),
};
let type_name = if field_type == Type::Message {
let struct_fields = match field.data_type() {
DataType::Struct(sf) => sf,
DataType::List(inner_field) | DataType::LargeList(inner_field) => {
if let DataType::Struct(sf) = inner_field.data_type() {
sf
} else {
return Err(ZerobusError::ConversionError(format!(
"List field '{}' contains non-Struct type: {:?}",
field.name(),
inner_field.data_type()
)));
}
}
_ => {
return Err(ZerobusError::ConversionError(format!(
"Field '{}' has Message type but is not a Struct or List<Struct>: {:?}",
field.name(),
field.data_type()
)));
}
};
let nested_message_name = format!("{}_{}", message_name, field.name());
let nested_type_name = format!(".{}.{}", message_name, nested_message_name);
let nested_schema = arrow::datatypes::Schema::new(struct_fields.clone());
let nested_descriptor =
generate_protobuf_descriptor_internal(&nested_schema, &nested_message_name)?;
nested_types.push(nested_descriptor);
Some(nested_type_name)
} else {
None
};
fields.push(FieldDescriptorProto {
name: Some(field.name().clone()),
number: Some(field_number),
label: Some(if is_repeated {
Label::Repeated as i32
} else {
Label::Optional as i32
}),
r#type: Some(field_type as i32),
type_name,
extendee: None,
default_value: None,
oneof_index: None,
json_name: None,
options: None,
proto3_optional: None,
});
field_number += 1;
}
Ok(DescriptorProto {
name: Some(message_name.to_string()),
field: fields,
extension: vec![],
nested_type: nested_types,
enum_type: vec![],
extension_range: vec![],
oneof_decl: vec![],
options: None,
reserved_range: vec![],
reserved_name: vec![],
})
}
fn arrow_type_to_protobuf_type(
arrow_type: &arrow::datatypes::DataType,
) -> Result<Type, ZerobusError> {
use arrow::datatypes::DataType;
match arrow_type {
DataType::Int8 | DataType::Int16 | DataType::Int32 => Ok(Type::Int32),
DataType::Int64 => Ok(Type::Int64),
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => Ok(Type::Int32), DataType::UInt64 => Ok(Type::Int64), DataType::Float32 => Ok(Type::Float),
DataType::Float64 => Ok(Type::Double),
DataType::Boolean => Ok(Type::Bool),
DataType::Utf8 | DataType::LargeUtf8 => Ok(Type::String),
DataType::Binary | DataType::LargeBinary => Ok(Type::Bytes),
DataType::Timestamp(_, _) => Ok(Type::Int64), DataType::Date32 => Ok(Type::Int32), DataType::Date64 => Ok(Type::Int64), DataType::List(inner_type) | DataType::LargeList(inner_type) => {
arrow_type_to_protobuf_type(inner_type.data_type())
}
DataType::Struct(_) => Ok(Type::Message), _ => Err(ZerobusError::ConversionError(format!(
"Unsupported Arrow type: {:?}",
arrow_type
))),
}
}