from avro.schema import Schema
from bitstring import BitArray
def process_bitmap(avro_schema: Schema, bitmap_fields: list):
    """Translate bitmap entries into the field names they select.

    Two kinds of entries are recognised:

    * A leading entry that starts with ``"0x"`` is decoded against
      ``avro_schema`` itself. It is then removed from ``bitmap_fields``,
      so the caller's list is modified in place.
    * Entries containing ``"-"`` are split on ``"-"``. The first part is
      used as an integer index into ``avro_schema.fields`` and the second
      part is decoded against that field's value schema, provided the
      value schema is a record. The resulting names are prefixed with
      the parent field's name and a dot.

    The ``"-"`` entries are only examined when the last entry of the
    (possibly shortened) list contains ``"-"``.

    Args:
        avro_schema: Schema whose ``fields`` the bitmaps refer to.
        bitmap_fields: Bitmap entries as described above. ``None`` entries
            are skipped.

    Returns:
        A new list of the selected field names, top-level names first.
    """
    fields = []
    if not bitmap_fields:
        return fields

    first = bitmap_fields[0]
    # The isinstance guard keeps a leading None from raising, in line with
    # the None tolerance of the loop below.
    if isinstance(first, str) and first.startswith("0x"):
        fields.extend(get_fieldnames_from_bitstring(first, avro_schema))
        # Consume the top-level bitmap so only nested entries remain.
        bitmap_fields.remove(first)

    if not bitmap_fields or "-" not in str(bitmap_fields[-1]):
        return fields

    for bitmap_field in bitmap_fields:
        if bitmap_field is None or "-" not in str(bitmap_field):
            continue
        bitmap_strings = bitmap_field.split("-")
        parent_field = avro_schema.fields[int(bitmap_strings[0])]
        child_schema = get_value_schema(parent_field.type)
        # Only record-typed values have nested fields a bitmap can select.
        if child_schema.type != 'record':
            continue
        nested_names = get_fieldnames_from_bitstring(bitmap_strings[1], child_schema)
        fields.extend(append_parent_name(parent_field.name, nested_names))
    return fields
def convert_hexbinary_to_bitset(bitmap):
    """Return the binary digits of a hex string in reversed order.

    The first two characters of ``bitmap`` are dropped before parsing
    (presumably a ``"0x"`` prefix -- verify against callers). The rest is
    parsed as hexadecimal, rendered as a string of ``'0'``/``'1'``
    characters and reversed, so the last bit of the binary rendering ends
    up at index 0.
    """
    hex_digits = bitmap[2:]
    bits = BitArray(hex=hex_digits).bin
    return "".join(reversed(bits))
def append_parent_name(parent_field_name, full_field_names):
    """Prefix every name in ``full_field_names`` with the parent name and a dot.

    Args:
        parent_field_name: Name placed in front of each entry.
        full_field_names: List of names; it is rewritten in place.

    Returns:
        The same list object that was passed in, now holding the
        ``"<parent>.<name>"`` strings.
    """
    # Slice assignment keeps the caller's list object, so both the in-place
    # update and the returned reference behave as before.
    full_field_names[:] = [
        parent_field_name + "." + name for name in full_field_names
    ]
    return full_field_names
def get_fieldnames_from_bitstring(bitmap, avro_schema: Schema):
    """Return the names of the schema fields whose bit is set in ``bitmap``.

    ``bitmap`` is converted with ``convert_hexbinary_to_bitset``; each
    position holding ``'1'`` in the resulting string is used as an index
    into ``avro_schema.fields``. A set bit beyond the number of fields
    raises ``IndexError``.
    """
    schema_fields = list(avro_schema.fields)
    bits = convert_hexbinary_to_bitset(bitmap)
    return [schema_fields[position].name for position in find('1', bits)]
def get_value_schema(parent_field):
    """Pick the value schema out of a few specific union layouts.

    Despite the parameter name, ``process_bitmap`` passes a field's
    ``type`` (a schema) here. Anything that is not a union is returned
    unchanged. For a union, the member returned is:

    * the second member of a two-member union whose first member is
      ``'null'`` or ``'string'``;
    * the third member of a three-member union that starts with
      ``'null'`` then ``'string'``.

    Any other union is returned unchanged.
    """
    if parent_field.type != 'union':
        return parent_field

    members = parent_field.schemas
    member_count = len(members)
    if member_count == 2 and members[0].type in ('null', 'string'):
        return members[1]
    if (
        member_count == 3
        and members[0].type == 'null'
        and members[1].type == 'string'
    ):
        return members[2]
    return parent_field
def find(to_find, binary_string):
    """Return every index at which ``to_find`` occurs in ``binary_string``.

    ``binary_string`` is walked element by element, and the positions of
    elements equal to ``to_find`` are collected in ascending order.
    """
    positions = []
    for position, symbol in enumerate(binary_string):
        if symbol == to_find:
            positions.append(position)
    return positions