import re
from typing import Dict, List, Optional, TYPE_CHECKING
from ..naming import (
convert_to_snake_case,
convert_to_pascal_case,
escape_rust_keyword,
)
from ..type_mapping import MatterType
from .tlv_helpers import (
_generate_struct_field_assignments,
generate_field_tlv_encoding,
generate_optional_field_push,
)
from .field import MatterField
if TYPE_CHECKING:
from .enums import MatterEnum, MatterBitmap
from .structs import MatterStruct
def _element_to_push(element_str: str) -> str:
    """Wrap a TLV element expression in a ``tlv_fields.push(...)`` statement.

    Surrounding whitespace is removed first, then every trailing comma, so an
    element that was written as an item of a ``vec![...]`` literal can be
    reused as the argument of a push call.

    Args:
        element_str: Rust expression text, optionally padded and/or followed
            by trailing commas.

    Returns:
        The Rust statement ``tlv_fields.push(<expression>);``.
    """
    expression = element_str.strip()
    # Equivalent to rstrip(','): drop all trailing commas, nothing else.
    while expression.endswith(','):
        expression = expression[:-1]
    return "tlv_fields.push(" + expression + ");"
class MatterCommand:
    """A command definition that can render its Rust TLV encoder function.

    Stores the command's id, name, direction and optional response name, plus
    the fields registered through ``add_field``.
    """

    # Encoders with more parameters than this take a single ``<Name>Params``
    # struct instead of positional arguments.
    # NOTE(review): presumably mirrors Clippy's too_many_arguments default - confirm.
    MAX_POSITIONAL_PARAMS = 7

    def __init__(self, id: str, name: str, direction: str, response_name: Optional[str] = None):
        self.id = id
        self.name = name
        self.direction = direction
        self.response_name = response_name
        self.fields: List[MatterField] = []

    def add_field(self, field: MatterField):
        """Append ``field`` to the command's field list."""
        self.fields.append(field)

    def get_rust_function_name(self) -> str:
        """Return ``encode_<snake_case_name>`` with Rust keywords escaped."""
        return f"encode_{escape_rust_keyword(convert_to_snake_case(self.name))}"

    def get_rust_params_struct_name(self) -> str:
        """Return ``<PascalCaseName>Params``, the name of the parameter struct."""
        return f"{convert_to_pascal_case(self.name)}Params"

    @staticmethod
    def _is_unsupported_struct_field(field: MatterField, structs: Optional[Dict[str, 'MatterStruct']]) -> bool:
        """Return True when the field refers to a ``*Struct`` type missing from ``structs``.

        List fields are judged by their entry type, other fields by their
        field type. Such fields get neither a parameter nor an encoding.
        """
        type_name = field.entry_type if field.is_list else field.field_type
        if not type_name or not type_name.endswith('Struct'):
            return False
        return not structs or type_name not in structs

    @staticmethod
    def _rust_param_type(field: MatterField, structs: Optional[Dict[str, 'MatterStruct']], enums: Optional[Dict[str, 'MatterEnum']], bitmaps: Optional[Dict[str, 'MatterBitmap']]) -> str:
        """Return the Rust type of a supported field, before any ``Option<>`` wrapping.

        Must only be called for fields that passed
        ``_is_unsupported_struct_field``; the struct lookup relies on that.
        """
        if field.is_list:
            if field.entry_type and structs and field.entry_type in structs:
                return f"Vec<{structs[field.entry_type].get_rust_struct_name()}>"
            return MatterType.get_rust_type(field.entry_type or field.field_type, field.is_list, enums=enums, bitmaps=bitmaps)

        type_name = field.field_type
        if type_name.endswith('Struct'):
            # Unknown structs were filtered out by the caller, so this lookup is safe.
            return structs[type_name].get_rust_struct_name()
        if type_name.endswith('Enum'):
            if enums and type_name in enums:
                return enums[type_name].get_rust_enum_name()
            return 'u8'  # fallback when the enum definition is not available
        if type_name.endswith('Bitmap'):
            if bitmaps and type_name in bitmaps:
                return bitmaps[type_name].get_rust_bitmap_name()
            return 'u8'  # fallback when the bitmap definition is not available
        return MatterType.get_rust_type(type_name, field.is_list, enums=enums, bitmaps=bitmaps)

    def render_params(self, structs: Dict[str, 'MatterStruct'], enums: Optional[Dict[str, 'MatterEnum']] = None, bitmaps: Optional[Dict[str, 'MatterBitmap']] = None):
        """Compute the Rust parameters of the encoder function.

        Fields that refer to an unknown ``*Struct`` type are left out. A field
        that is nullable or not mandatory has its type wrapped in ``Option<>``.

        Args:
            structs: Known struct definitions keyed by type name.
            enums: Known enum definitions keyed by type name.
            bitmaps: Known bitmap definitions keyed by type name.

        Returns:
            A ``(param_fields, use_param_struct, param_struct_name)`` tuple:
            the list of ``(name, rust_type)`` pairs, whether there are more
            than ``MAX_POSITIONAL_PARAMS`` of them, and the params struct name
            (``None`` when no struct is used).
        """
        param_fields = []
        for field in self.fields:
            if self._is_unsupported_struct_field(field, structs):
                continue
            rust_type = self._rust_param_type(field, structs, enums, bitmaps)
            if field.nullable or not field.mandatory:
                rust_type = f"Option<{rust_type}>"
            param_fields.append((field.get_rust_param_name(), rust_type))

        use_param_struct = len(param_fields) > self.MAX_POSITIONAL_PARAMS
        param_struct_name = self.get_rust_params_struct_name() if use_param_struct else None
        return param_fields, use_param_struct, param_struct_name

    def generate_rust_function(self, structs: Dict[str, 'MatterStruct'], enums: Optional[Dict[str, 'MatterEnum']] = None, bitmaps: Optional[Dict[str, 'MatterBitmap']] = None) -> str:
        """Render the Rust source of this command's encoder function.

        When at least one supported field is optional (not mandatory and not
        nullable) the function body pushes elements onto a mutable
        ``tlv_fields`` vector; otherwise all elements are listed in a single
        ``vec![...]`` literal. If ``render_params`` asks for a params struct,
        its definition is emitted in front of the function.

        Every field that ``render_params`` leaves out (unknown ``*Struct``
        type, list or not) is skipped here as well, so the body never refers
        to a parameter that is missing from the signature.

        Args:
            structs: Known struct definitions keyed by type name.
            enums: Known enum definitions keyed by type name.
            bitmaps: Known bitmap definitions keyed by type name.

        Returns:
            The Rust source text (optional params struct plus the function).
        """
        func_name = self.get_rust_function_name()
        param_fields, use_param_struct, struct_name = self.render_params(structs, enums, bitmaps)
        if use_param_struct:
            param_str = f"params: {struct_name}"
            param_prefix = "params."
        else:
            param_str = ", ".join(f"{name}: {typ}" for name, typ in param_fields)
            param_prefix = ""

        has_truly_optional = any(
            not f.mandatory and not f.nullable and not self._is_unsupported_struct_field(f, structs)
            for f in self.fields
        )

        pre_statements = []   # statements that must run before the TLV item is built
        tlv_fields = []       # elements of the vec![...] literal (no optional fields)
        push_statements = []  # tlv_fields.push(...) statements (optional fields present)
        for field in self.fields:
            if self._is_unsupported_struct_field(field, structs):
                continue
            full_param_name = f"{param_prefix}{field.get_rust_param_name()}"

            if has_truly_optional and not field.mandatory and not field.nullable:
                push_stmt = generate_optional_field_push(field, full_param_name, structs, enums, bitmaps)
                if push_stmt:
                    push_statements.append(push_stmt)
                continue

            encoding_result = generate_field_tlv_encoding(field, full_param_name, structs, enums, bitmaps)
            if not encoding_result:
                continue
            if '\n' in encoding_result and field.field_type.endswith('Struct'):
                # Multi-line struct encoding: all lines but the last are setup
                # statements, the last line is the element itself.
                *setup_lines, element = encoding_result.split('\n')
                pre_statements.extend(setup_lines)
            else:
                element = encoding_result

            if has_truly_optional:
                push_statements.append(_element_to_push(element))
            else:
                tlv_fields.append(element)

        struct_def = ""
        if use_param_struct:
            struct_fields_str = "\n".join(f" pub {name}: {typ}," for name, typ in param_fields)
            struct_def = (
                f"/// Parameters for {self.name} command\n"
                f"pub struct {struct_name} {{\n"
                f"{struct_fields_str}\n"
                "}\n"
            )

        header = f"{struct_def}/// Encode {self.name} command ({self.id})"
        signature = f"pub fn {func_name}({param_str}) -> anyhow::Result<Vec<u8>> {{"

        if has_truly_optional:
            pre_block = "".join(f"\n {statement}" for statement in pre_statements)
            push_stmts_str = "\n ".join(push_statements) if push_statements else "// No fields"
            rust_lines = [
                header,
                signature + pre_block,
                "let mut tlv_fields: Vec<tlv::TlvItemEnc> = Vec::new();",
                push_stmts_str,
                "let tlv = tlv::TlvItemEnc {",
                "tag: 0,",
                "value: tlv::TlvItemValueEnc::StructInvisible(tlv_fields),",
                "};",
                "Ok(tlv.encode()?)",
                "}",
            ]
        else:
            tlv_fields_str = "\n".join(tlv_fields) if tlv_fields else " // No fields"
            rust_lines = [header, signature]
            if pre_statements:
                rust_lines.append("\n ".join(pre_statements))
            rust_lines += [
                "let tlv = tlv::TlvItemEnc {",
                "tag: 0,",
                "value: tlv::TlvItemValueEnc::StructInvisible(vec![",
                tlv_fields_str,
                "]),",
                "};",
                "Ok(tlv.encode()?)",
                "}",
            ]
        return "\n".join(rust_lines)
class MatterCommandResponse:
    """A command response definition that can render its Rust struct and decoder.

    Stores the response id, its name and the fields registered through
    ``add_field``.
    """

    def __init__(self, id: str, name: str):
        self.id = id
        self.name = name
        self.fields: List[MatterField] = []

    def add_field(self, field: MatterField):
        """Append ``field`` to the response's field list."""
        self.fields.append(field)

    def get_rust_struct_name(self) -> str:
        """Return the Rust struct name derived from the response name.

        The name is rebuilt from its capitalised words (an uppercase letter
        followed by lowercase letters and digits); anything between those
        words, such as spaces, is dropped. Digits are kept so that names
        differing only by a number do not collapse into the same identifier.
        If the name contains no capitalised word it is returned unchanged.
        """
        words = re.findall(r'[A-Z][a-z0-9]*', self.name)
        return ''.join(words) if words else self.name

    def generate_rust_struct(self, structs: Optional[Dict[str, 'MatterStruct']] = None, enums: Optional[Dict[str, 'MatterEnum']] = None, bitmaps: Optional[Dict[str, 'MatterBitmap']] = None) -> str:
        """Render the Rust struct definition for this response.

        Delegates to ``_generate_rust_struct_definition`` with the struct name
        and the response fields.
        """
        # Imported at call time, exactly as the original code does.
        from .tlv_helpers import _generate_rust_struct_definition
        struct_name = self.get_rust_struct_name()
        return _generate_rust_struct_definition(struct_name, self.fields, structs, enums, bitmaps)

    def generate_decode_function(self, structs: Optional[Dict[str, 'MatterStruct']] = None, enums: Optional[Dict[str, 'MatterEnum']] = None, bitmaps: Optional[Dict[str, 'MatterBitmap']] = None) -> str:
        """Render the Rust ``decode_<name>`` function for this response.

        The generated function accepts a ``tlv::TlvItemValue``; for a ``List``
        value it builds the response struct from the field assignments
        produced by ``_generate_struct_field_assignments``, otherwise it
        returns an ``anyhow`` error.

        Args:
            structs: Known struct definitions keyed by type name.
            enums: Known enum definitions keyed by type name.
            bitmaps: Known bitmap definitions keyed by type name.

        Returns:
            The Rust source text of the decode function.
        """
        struct_name = self.get_rust_struct_name()
        func_name = f"decode_{escape_rust_keyword(convert_to_snake_case(self.name))}"
        # The doc comment shows the id without its "0x" marker.
        clean_id = self.id.replace('0x', '') if self.id.startswith('0x') else self.id
        field_assignments_str = "\n".join(_generate_struct_field_assignments(self.fields, structs, enums, "item", bitmaps))
        rust_lines = [
            f"/// Decode {self.name} command response ({clean_id})",
            f"pub fn {func_name}(inp: &tlv::TlvItemValue) -> anyhow::Result<{struct_name}> {{",
            "if let tlv::TlvItemValue::List(_fields) = inp {",
            "let item = tlv::TlvItem { tag: 0, value: inp.clone() };",
            f"Ok({struct_name} {{",
            field_assignments_str,
            "})",
            "} else {",
            'Err(anyhow::anyhow!("Expected struct fields"))',
            "}",
            "}",
        ]
        return "\n".join(rust_lines)