use std::collections::BTreeSet;
use std::path::PathBuf;
use super::CALL_ARENA_BUF_LEN;
use super::CodeGenerator;
use super::GeneratedFile;
use super::GeneratedFiles;
use super::collect_peer_contracts;
use super::peer_min_version;
use crate::ir::AbiBuiltin;
use crate::ir::EnumDef;
use crate::ir::EnumVariant;
use crate::ir::PrimitiveType;
use crate::ir::ReprType;
use crate::ir::ResolvedBundle;
use crate::ir::ResolvedContract;
use crate::ir::ResolvedFunction;
use crate::ir::ResolvedHostContract;
use crate::ir::ResolvedParam;
use crate::ir::ResolvedPlugin;
use crate::ir::ResolvedType;
use crate::ir::ResolvedTypeRef;
use crate::ir::ValidatedIr;
use polyplug_codegen::PolyplugcError;
pub(crate) struct PythonGenerator;
const PY_HEADER: &str = "# THIS FILE IS AUTO-GENERATED BY polyplugc\n\
# DO NOT EDIT BY HAND\n\
# Re-generate with: polyplugc generate --api api.toml --lang python --out <dir>\n\n";
impl CodeGenerator for PythonGenerator {
fn generate_host(
&self,
ir: &ValidatedIr,
files: &mut GeneratedFiles,
) -> Result<(), PolyplugcError> {
let types_py: String = generate_host_types_file(ir);
let types_pyi: String = generate_host_types_stub(ir);
let callers_py: String = generate_host_callers_file(ir);
let callers_pyi: String = generate_host_callers_stub(ir);
files.files.push(GeneratedFile {
path: PathBuf::from("host/types.py"),
content: types_py,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("host/types.pyi"),
content: types_pyi,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("host/callers.py"),
content: callers_py,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("host/callers.pyi"),
content: callers_pyi,
force_regenerate: false,
});
if !ir.host_contracts.is_empty() {
let contracts_py: String = generate_host_contracts_file(ir);
let contracts_pyi: String = generate_host_contracts_stub(ir);
files.files.push(GeneratedFile {
path: PathBuf::from("host/contracts.py"),
content: contracts_py,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("host/contracts.pyi"),
content: contracts_pyi,
force_regenerate: false,
});
let interface_factories_py: String = generate_python_host_interface_factories_file(ir);
files.files.push(GeneratedFile {
path: PathBuf::from("host/interface_factories.py"),
content: interface_factories_py,
force_regenerate: false,
});
}
Ok(())
}
fn generate_guest(
&self,
ir: &ValidatedIr,
files: &mut GeneratedFiles,
) -> Result<(), PolyplugcError> {
let types_py: String = generate_python_types_file(ir);
let types_pyi: String = generate_python_types_stub(ir);
let contracts_py: String = generate_guest_contracts_file(ir);
let contracts_pyi: String = generate_guest_contracts_stub(ir);
files.files.push(GeneratedFile {
path: PathBuf::from("guest/types.py"),
content: types_py,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("guest/types.pyi"),
content: types_pyi,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("guest/contracts.py"),
content: contracts_py,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("guest/contracts.pyi"),
content: contracts_pyi,
force_regenerate: false,
});
let init_py: String = generate_init_py(ir);
files.files.push(GeneratedFile {
path: PathBuf::from("guest/init.py"),
content: init_py,
force_regenerate: false,
});
if ir.bundle.is_some() {
let manifest_content: String = generate_bundle_manifest_python(ir);
files.files.push(GeneratedFile {
path: PathBuf::from("manifest.toml"),
content: manifest_content,
force_regenerate: true,
});
}
if !ir.host_contracts.is_empty() {
let host_contracts_py: String = generate_guest_host_contracts_file(ir);
let host_contracts_pyi: String = generate_guest_host_contracts_stub(ir);
files.files.push(GeneratedFile {
path: PathBuf::from("guest/host_contracts.py"),
content: host_contracts_py,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("guest/host_contracts.pyi"),
content: host_contracts_pyi,
force_regenerate: false,
});
}
let peer_contracts: Vec<&ResolvedContract> = collect_peer_contracts(ir);
if !peer_contracts.is_empty() {
let peer_py: String = generate_guest_peer_callers_file(ir, &peer_contracts);
let peer_pyi: String = generate_guest_peer_callers_stub(ir, &peer_contracts);
files.files.push(GeneratedFile {
path: PathBuf::from("guest/peer_callers.py"),
content: peer_py,
force_regenerate: false,
});
files.files.push(GeneratedFile {
path: PathBuf::from("guest/peer_callers.pyi"),
content: peer_pyi,
force_regenerate: false,
});
}
Ok(())
}
}
fn generate_python_types_file(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from typing import ClassVar\n");
out.push_str("from polyplug_abi import StringView\n\n");
if !ir.enums.is_empty() {
out.push_str("import enum\n\n");
}
for e in &ir.enums {
generate_python_enum(&mut out, e);
out.push('\n');
}
for ty in &ir.types {
generate_python_user_type(&mut out, ty);
out.push('\n');
}
for contract in &ir.contracts {
let struct_name: String = contract_name_to_struct(&contract.name);
for func in &contract.functions {
if needs_arg_pack(&func.params) {
emit_python_arg_pack_struct(&mut out, &struct_name, func, &ir.enums);
out.push('\n');
}
}
}
out
}
fn generate_python_types_stub(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from typing import ClassVar\n");
out.push_str("from polyplug_abi import Buffer, StringView\n\n");
if !ir.enums.is_empty() {
out.push_str("import enum\n\n");
}
for e in &ir.enums {
generate_python_enum(&mut out, e);
out.push('\n');
}
for ty in &ir.types {
generate_python_user_type_stub(&mut out, ty);
out.push('\n');
}
for contract in &ir.contracts {
let struct_name: String = contract_name_to_struct(&contract.name);
for func in &contract.functions {
if needs_arg_pack(&func.params) {
emit_python_arg_pack_stub(&mut out, &struct_name, func);
out.push('\n');
}
}
}
out
}
fn generate_host_types_file(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from typing import ClassVar\n\n");
out.push_str("from polyplug_abi import StringView\n\n");
if !ir.enums.is_empty() {
out.push_str("import enum\n\n");
}
for e in &ir.enums {
generate_python_enum(&mut out, e);
out.push('\n');
}
for ty in &ir.types {
generate_python_user_type(&mut out, ty);
out.push('\n');
}
for contract in &ir.contracts {
let struct_name: String = contract_name_to_struct(&contract.name);
for func in &contract.functions {
if needs_arg_pack(&func.params) {
emit_python_arg_pack_struct(&mut out, &struct_name, func, &ir.enums);
out.push('\n');
}
}
}
out
}
fn generate_host_types_stub(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from typing import ClassVar\n\n");
out.push_str("from polyplug_abi import StringView\n\n");
if !ir.enums.is_empty() {
out.push_str("import enum\n\n");
}
for e in &ir.enums {
generate_python_enum(&mut out, e);
out.push('\n');
}
for ty in &ir.types {
generate_python_user_type_stub(&mut out, ty);
out.push('\n');
}
for contract in &ir.contracts {
let struct_name: String = contract_name_to_struct(&contract.name);
for func in &contract.functions {
if needs_arg_pack(&func.params) {
emit_python_arg_pack_stub(&mut out, &struct_name, func);
out.push('\n');
}
}
}
out
}
fn generate_host_callers_file(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from typing import Any, Callable, Optional, TypeAlias\n\n");
let any_arena: bool = ir.contracts.iter().any(contract_needs_arena);
if any_arena {
out.push_str("from polyplug_abi import AbiErrorCode, ArenaOverflowBlock, CallArena, DispatchType, GuestContractInstance, GuestContractInterface, HostApi, StringView\n\n");
} else {
out.push_str("from polyplug_abi import AbiErrorCode, DispatchType, GuestContractInstance, GuestContractInterface, HostApi, StringView\n\n");
}
if any_arena {
out.push_str("# Size of each caller's inline call-arena buffer.\n");
out.push_str(&format!("CALL_ARENA_BUF_LEN: int = {CALL_ARENA_BUF_LEN}\n"));
out.push_str("_OVERFLOW_BLOCK_ALIGN: int = ctypes.sizeof(ctypes.c_void_p)\n\n");
out.push_str("def _arena_reset(arena: CallArena) -> None:\n");
out.push_str(" \"\"\"Rewind the arena for reuse: retain all overflow blocks.\n\n");
out.push_str(
" After reset, all pointers previously returned by arena allocations are invalid.\n",
);
out.push_str(" \"\"\"\n");
out.push_str(" arena.cur = arena.base\n");
out.push_str(" block: int = arena.first_overflow or 0\n");
out.push_str(" header_size: int = ctypes.sizeof(ArenaOverflowBlock)\n");
out.push_str(" while block:\n");
out.push_str(" hdr: ArenaOverflowBlock = ArenaOverflowBlock.from_address(block)\n");
out.push_str(" hdr.used = header_size\n");
out.push_str(" block = hdr.next or 0\n");
out.push('\n');
out.push_str("def _arena_free_all(arena: CallArena, host: ctypes.c_void_p) -> None:\n");
out.push_str(" \"\"\"Free every overflow block and clear the chain (teardown).\"\"\"\n");
out.push_str(" block: int = arena.first_overflow or 0\n");
out.push_str(" host_api: Any = ctypes.cast(host, ctypes.POINTER(HostApi))\n");
out.push_str(" while block:\n");
out.push_str(" hdr: ArenaOverflowBlock = ArenaOverflowBlock.from_address(block)\n");
out.push_str(" next_block: int = hdr.next or 0\n");
out.push_str(" capacity: int = hdr.capacity\n");
out.push_str(" if arena.host:\n");
out.push_str(
" host_api.contents.free(host, block, capacity, _OVERFLOW_BLOCK_ALIGN)\n",
);
out.push_str(" block = next_block\n");
out.push_str(" arena.first_overflow = None\n\n");
}
out.push_str("class ContractError(Exception):\n");
out.push_str(
" def __init__(self, message: str, code: int = AbiErrorCode.Generic) -> None:\n",
);
out.push_str(" super().__init__(message)\n");
out.push_str(" self.code: int = code\n\n");
out.push_str("# Contract ID constants\n");
for contract in &ir.contracts {
let upper_name: String = contract.name.to_uppercase().replace(['.', '-'], "_");
out.push_str(&format!(
"{}_CONTRACT_ID: int = 0x{:016X}\n",
upper_name, contract.contract_id
));
}
out.push('\n');
let type_imports: BTreeSet<String> = collect_python_type_imports(ir);
if !type_imports.is_empty() {
let import_list: String = type_imports.into_iter().collect::<Vec<String>>().join(", ");
out.push_str(&format!("from host.types import {}\n\n", import_list));
}
out.push_str("POLYPLUG_ABI_VERSION: int = 1\n");
out.push_str("class _AbiError(ctypes.Structure):\n");
out.push_str(" _fields_ = [\n");
out.push_str(" ('code', ctypes.c_uint32),\n");
out.push_str(" ('_pad', ctypes.c_uint32),\n");
out.push_str(" ('message_ptr', ctypes.c_void_p),\n");
out.push_str(" ('message_len', ctypes.c_size_t),\n");
out.push_str(" ]\n\n");
out.push_str(
"_DISPATCH_FN_CTYPE = ctypes.CFUNCTYPE(None, GuestContractInstance, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p)\n",
);
out.push_str(
"_DISPATCH_FN_TYPE: TypeAlias = Callable[[GuestContractInstance, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p], None]\n\n"
);
for contract in &ir.contracts {
generate_host_caller_class(&mut out, contract, &ir.enums);
}
out
}
fn generate_host_callers_stub(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from typing import Callable, Optional\n\n");
out.push_str("from polyplug_abi import AbiErrorCode, GuestContractInstance, GuestContractInterface, HostApi, StringView\n\n");
out.push_str("class ContractError(Exception): ...\n\n");
for contract in &ir.contracts {
let upper_name: String = contract.name.to_uppercase().replace(['.', '-'], "_");
out.push_str(&format!("{}_CONTRACT_ID: int\n", upper_name));
}
out.push('\n');
let type_imports: BTreeSet<String> = collect_python_type_imports(ir);
if !type_imports.is_empty() {
let import_list: String = type_imports.into_iter().collect::<Vec<String>>().join(", ");
out.push_str(&format!("from host.types import {}\n\n", import_list));
}
out.push_str("POLYPLUG_ABI_VERSION: int\n");
out.push_str("_DISPATCH_FN_TYPE = Callable[[GuestContractInstance, ctypes.c_void_p, ctypes.c_void_p], int]\n\n");
for contract in &ir.contracts {
generate_host_caller_class_stub(&mut out, contract);
}
out
}
fn generate_host_caller_class_stub(out: &mut String, contract: &ResolvedContract) {
let struct_name: String = contract_name_to_struct(&contract.name);
let caller_name: String = format!("{struct_name}Caller");
out.push_str(&format!("class {caller_name}:\n"));
out.push_str(
" def __init__(self, handle: int, host: ctypes.c_void_p, owner: object | None = None) -> None: ...\n",
);
out.push_str(" def __del__(self) -> None: ...\n");
out.push_str(" @classmethod\n");
out.push_str(
" def create(cls, handle: int, host: ctypes.c_void_p, owner: object | None = None) -> Optional[Self]: ...\n",
);
out.push_str(" def is_valid(self) -> bool: ...\n");
out.push_str(" def reset(self) -> None: ...\n");
out.push_str(" def __bool__(self) -> bool: ...\n");
for func in &contract.functions {
generate_host_caller_stub_method(out, func);
}
out.push('\n');
}
fn generate_guest_contracts_file(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
let any_struct_const: bool = ir
.contracts
.iter()
.any(|c: &ResolvedContract| c.functions.iter().any(fn_uses_struct_const));
if any_struct_const {
out.push_str("import struct\n");
}
let any_string_arg: bool = ir
.contracts
.iter()
.any(|c: &ResolvedContract| c.functions.iter().any(fn_has_stringview_arg));
let any_string_ret: bool = ir
.contracts
.iter()
.any(|c: &ResolvedContract| c.functions.iter().any(fn_returns_stringview));
if any_string_arg {
out.push_str("from polyplug_abi import StringView, to_str\n");
}
out.push_str("from typing import Any, Callable\n");
if any_string_ret {
out.push_str(
"from polyplug_guest import register_contract, alloc_string_arena, AbiError, AbiErrorCode\n\n",
);
} else {
out.push_str("from polyplug_guest import register_contract, AbiError, AbiErrorCode\n\n");
}
let type_imports: BTreeSet<String> = collect_python_type_imports(ir);
if !type_imports.is_empty() {
let import_list: String = type_imports.into_iter().collect::<Vec<String>>().join(", ");
out.push_str(&format!("from guest.types import {}\n\n", import_list));
}
out.push_str("POLYPLUG_ABI_VERSION: int = 1\n\n");
let mut registrations: Vec<(String, String, String, &ResolvedContract)> = Vec::new();
if let Some(bundle) = &ir.bundle {
for plugin in &bundle.plugins {
for contract_impl in &plugin.implements {
if let Some(contract) = ir.contracts.iter().find(|c| {
let contract_full =
format!("{}@{}.{}", c.name, c.version.major, c.version.minor);
&contract_full == contract_impl
}) {
generate_guest_plugin_trait(&mut out, &plugin.name, contract);
generate_guest_plugin_interface(&mut out, &plugin.name, contract);
let plugin_lower: String = plugin.name.to_lowercase().replace('.', "_");
registrations.push((
plugin.name.clone(),
plugin_lower.clone(),
plugin_lower,
contract,
));
}
}
}
} else {
for contract in &ir.contracts {
generate_guest_contract_trait(&mut out, contract);
generate_guest_contract_interface(&mut out, contract);
let lower: String = contract.name.replace('.', "_");
let upper: String = contract_name_to_upper_snake(&contract.name);
registrations.push((contract.name.clone(), lower, upper, contract));
}
}
out.push_str("def polyplug_abi_version() -> int:\n");
out.push_str(" return 1\n\n");
out.push_str(
"def polyplug_init(host_ptr: int, ctx_ptr: int) -> tuple[list[dict[str, Any]], AbiError]:\n",
);
out.push_str(
" \"\"\"Build this bundle's contract registrations for the polyplug_python VM loader.\n",
);
out.push('\n');
out.push_str(
" Returns a `(registrations, abi_error)` tuple: the loader reads this return\n",
);
out.push_str(
" value (nothing is deposited into the module namespace) and, when `abi_error.code\n",
);
out.push_str(
" == AbiErrorCode.Ok`, registers each contract in `registrations` itself. A factory\n",
);
out.push_str(
" that was never set at import time yields `([], AbiError(code=AbiErrorCode.Generic))`.\n\n",
);
out.push_str(" Args:\n");
out.push_str(" host_ptr: HostApi pointer — passed to each author factory so every\n");
out.push_str(
" implementation is constructed with its owning runtime's host\n",
);
out.push_str(" (no host pointer is stored in the guest SDK).\n");
out.push_str(" ctx_ptr: BundleInitContext pointer (unused)\n\n");
out.push_str(" Returns:\n");
out.push_str(" (registrations, abi_error) — the loader consumes registrations only\n");
out.push_str(" when abi_error.code == AbiErrorCode.Ok.\n");
out.push_str(" \"\"\"\n");
out.push_str(" _ = host_ptr\n");
out.push_str(" _ = ctx_ptr\n");
out.push_str(" registrations: list[dict[str, Any]] = []\n");
for (_, _, impl_var, _) in ®istrations {
out.push_str(&format!(" if _{impl_var}_FACTORY is None:\n"));
out.push_str(" return [], AbiError(code=int(AbiErrorCode.Generic))\n");
}
for (display_name, lower, impl_var, contract) in ®istrations {
let contract_str: String = format!("{}@{}", contract.name, contract.version.major);
out.push_str(" register_contract(\n");
out.push_str(" registrations,\n");
out.push_str(&format!(" contract=\"{contract_str}\",\n"));
out.push_str(&format!(" factory=_{impl_var}_FACTORY,\n"));
out.push_str(" functions=[\n");
for func in &contract.functions {
out.push_str(&format!(" {lower}_{}_abi,\n", func.name));
}
out.push_str(" ],\n");
out.push_str(&format!(" plugin_name=\"{display_name}\",\n"));
out.push_str(" )\n");
}
out.push_str(" return registrations, AbiError(code=int(AbiErrorCode.Ok))\n");
out.push('\n');
out
}
fn fn_returns_stringview(func: &ResolvedFunction) -> bool {
matches!(
func.returns,
Some(ResolvedTypeRef::AbiType(AbiBuiltin::StringView))
)
}
fn fn_has_stringview_arg(func: &ResolvedFunction) -> bool {
func.params
.iter()
.any(|p: &ResolvedParam| matches!(p.ty, ResolvedTypeRef::AbiType(AbiBuiltin::StringView)))
}
fn fn_uses_struct_const(func: &ResolvedFunction) -> bool {
let scalar_args: bool = uses_scalar_args_struct(&func.params);
let scalar_ret: bool = func
.returns
.as_ref()
.is_some_and(|ty: &ResolvedTypeRef| scalar_struct_format_char(ty).is_some());
scalar_args || scalar_ret
}
fn generate_guest_contracts_stub(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("from typing import Any\n");
out.push_str("from polyplug_guest import AbiError\n\n");
let type_imports: BTreeSet<String> = collect_python_user_struct_imports(ir);
if !type_imports.is_empty() {
let import_list: String = type_imports.into_iter().collect::<Vec<String>>().join(", ");
out.push_str(&format!("from guest.types import {}\n\n", import_list));
}
out.push_str("POLYPLUG_ABI_VERSION: int\n\n");
for contract in &ir.contracts {
let trait_name: String = contract_name_to_guest_trait(&contract.name);
out.push_str(&format!("class {trait_name}:\n"));
for func in &contract.functions {
generate_guest_trait_stub_method(&mut out, func);
}
out.push('\n');
let upper: String = contract_name_to_upper_snake(&contract.name);
out.push_str(&format!(
"def set_{upper}_impl(impl: {trait_name}) -> None: ...\n\n"
));
}
out.push_str("def polyplug_abi_version() -> int: ...\n");
out.push_str(
"def polyplug_init(host_ptr: int, ctx_ptr: int) -> tuple[list[dict[str, Any]], AbiError]:\n",
);
out.push_str(
" \"\"\"Initialize the plugin and return its (registrations, AbiError).\"\"\"\n",
);
out.push_str(" ...\n");
out
}
fn generate_init_py(_ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str("# THIS FILE IS AUTO-GENERATED BY polyplugc. DO NOT EDIT.\n");
out.push_str(
"# Re-generate with: polyplugc generate --bundle bundle.toml --lang python --out <dir>\n\n",
);
out
}
fn generate_python_user_type(out: &mut String, ty: &ResolvedType) {
out.push_str(&format!("class {}(ctypes.Structure):\n", ty.name));
out.push_str(" _fields_: ClassVar = [\n");
for field in &ty.fields {
let field_ty: String = python_type_name(&field.ty);
out.push_str(&format!(" (\"{}\", {}),\n", field.name, field_ty));
}
out.push_str(" ]\n");
}
fn generate_python_user_type_stub(out: &mut String, ty: &ResolvedType) {
out.push_str(&format!("class {}(ctypes.Structure):\n", ty.name));
for field in &ty.fields {
let field_ty: String = python_type_name(&field.ty);
out.push_str(&format!(" {}: {}\n", field.name, field_ty));
}
out.push_str(" _fields_: ClassVar[list[tuple[str, type]]]\n");
}
fn emit_python_arg_pack_struct(
out: &mut String,
contract_struct: &str,
func: &ResolvedFunction,
enums: &[EnumDef],
) {
let struct_name: String = arg_pack_struct_name(contract_struct, &func.name);
out.push_str(&format!("class {}(ctypes.Structure):\n", struct_name));
out.push_str(" _fields_: ClassVar = [\n");
for param in &func.params {
let param_ty: String = python_pack_field_type(¶m.ty, enums);
out.push_str(&format!(" (\"{}\", {}),\n", param.name, param_ty));
}
out.push_str(" ]\n");
}
fn emit_python_arg_pack_stub(out: &mut String, contract_struct: &str, func: &ResolvedFunction) {
let struct_name: String = arg_pack_struct_name(contract_struct, &func.name);
out.push_str(&format!("class {}(ctypes.Structure):\n", struct_name));
for param in &func.params {
let param_ty: String = python_type_name(¶m.ty);
out.push_str(&format!(" {}: {}\n", param.name, param_ty));
}
out.push_str(" _fields_: ClassVar[list[tuple[str, type]]]\n");
}
fn generate_host_caller_class(out: &mut String, contract: &ResolvedContract, enums: &[EnumDef]) {
let struct_name: String = contract_name_to_struct(&contract.name);
let caller_name: String = format!("{struct_name}Caller");
let contract_upper: String = contract.name.to_uppercase().replace(['.', '-'], "_");
let _contract_id_const: String = format!("{}_CONTRACT_ID", contract_upper);
let needs_arena: bool = contract_needs_arena(contract);
out.push_str(&format!("class {caller_name}:\n"));
out.push_str(&format!(
" \"\"\"Host caller for contract `{}` with instance lifecycle management.\n\n",
contract.name
));
out.push_str(" RAII wrapper that manages instance lifecycle:\n");
out.push_str(" - `create()`: resolves handle and calls `create_instance`\n");
out.push_str(" - `__del__`: calls `destroy_instance` to clean up\n");
out.push_str(" - dispatch: passes `_instance` to all method calls\n");
out.push_str(" \"\"\"\n\n");
out.push_str(
" def __init__(self, handle: int, host: ctypes.c_void_p, owner: object | None = None) -> None:\n",
);
out.push_str(" \"\"\"Create instance wrapper from handle and host interface.\n\n");
out.push_str(" Args:\n");
out.push_str(" handle: Contract handle from find_guest_contract\n");
out.push_str(" host: Host interface pointer\n");
out.push_str(" owner: Python object that owns `host` (e.g. a polyplug.Runtime).\n");
out.push_str(" Held for this caller's lifetime so the runtime cannot be\n");
out.push_str(" finalized before __del__ runs (which tears down the instance\n");
out.push_str(
" and arena through that host). When None, the embedder must keep\n",
);
out.push_str(" the runtime alive past this caller.\n\n");
out.push_str(" Raises:\n");
out.push_str(" ValueError: If interface not found or create_instance failed\n");
out.push_str(" \"\"\"\n");
out.push_str(" # Resolve the interface from the handle via HostApi method\n");
out.push_str(" # Cast host to HostApi pointer and call resolve_guest_contract\n");
out.push_str(" host_iface: ctypes.POINTER(HostApi) = ctypes.cast(host, ctypes.POINTER(HostApi))\n");
out.push_str(" self._interface: ctypes.c_void_p = host_iface.contents.resolve_guest_contract(host, handle)\n");
out.push_str(" if not self._interface:\n");
out.push_str(" raise ValueError(\"Contract not found\")\n");
out.push_str(" # Create instance via host-mediated lifecycle so the runtime tracks\n");
out.push_str(" # it. A null instance is valid: stateless contracts return a null\n");
out.push_str(
" # handle and use it as an opaque dispatch token. Validity is keyed off\n",
);
out.push_str(" # the interface pointer, never off instance data.\n");
out.push_str(" self._instance = GuestContractInstance()\n");
out.push_str(" host_iface.contents.create_guest_instance(host, self._interface, None, ctypes.byref(self._instance))\n");
out.push_str(" self._host: ctypes.c_void_p = host\n");
out.push_str(
" # Retain the handle so the cache can re-resolve after a hot-reload (which\n",
);
out.push_str(
" # swaps a new interface into the same slot) or report a gone contract.\n",
);
out.push_str(" self._handle: int = handle\n");
out.push_str(
" # Fetch the registry revision counter ONCE, then read its current value, so\n",
);
out.push_str(
" # every later call can detect a reload/unload with a direct atomic load (one\n",
);
out.push_str(
" # aligned 64-bit load through the cached pointer, no call into the runtime)\n",
);
out.push_str(" # and re-resolve before dispatching.\n");
out.push_str(
" self._revision_ptr: int = host_iface.contents.revision_counter(host) or 0\n",
);
out.push_str(" self._cached_revision: int = self._live_revision()\n");
out.push_str(
" # Pin the runtime: refcounting then guarantees the owner outlives this\n",
);
out.push_str(" # caller, so __del__ tears down through a still-live host.\n");
out.push_str(" self._owner: object | None = owner\n");
if needs_arena {
out.push_str(
" # Per-caller call arena. create_string_buffer is C-heap memory (not the\n",
);
out.push_str(
" # managed heap), so cross-boundary arena data stays off Python's GC.\n",
);
out.push_str(
" self._arena_buf: Any = ctypes.create_string_buffer(CALL_ARENA_BUF_LEN)\n",
);
out.push_str(
" buf_addr: int = ctypes.cast(self._arena_buf, ctypes.c_void_p).value or 0\n",
);
out.push_str(" self._arena: CallArena = CallArena(\n");
out.push_str(" cur=buf_addr,\n");
out.push_str(" end=buf_addr + CALL_ARENA_BUF_LEN,\n");
out.push_str(" base=buf_addr,\n");
out.push_str(" host=ctypes.cast(host, ctypes.c_void_p).value or 0,\n");
out.push_str(" first_overflow=None,\n");
out.push_str(" )\n");
}
out.push('\n');
out.push_str(" def __del__(self) -> None:\n");
out.push_str(
" \"\"\"Destroy instance via host-mediated destroy_guest_instance.\"\"\"\n",
);
out.push_str(" # SAFETY: the interface pointer is valid for the wrapper lifetime;\n");
out.push_str(
" # destroy_guest_instance tolerates a null instance for stateless contracts.\n",
);
out.push_str(
" # If the registry changed since we resolved, the cached interface and instance\n",
);
out.push_str(
" # are stale — a reload/unload reclaimed their backing — so destroying through\n",
);
out.push_str(
" # the dead interface would be UB; the reload/unload already reclaimed the\n",
);
out.push_str(" # instance, so skip the destroy entirely.\n");
out.push_str(
" if getattr(self, \"_interface\", None) and self._live_revision() == self._cached_revision:\n",
);
out.push_str(" host_iface: ctypes.POINTER(HostApi) = ctypes.cast(self._host, ctypes.POINTER(HostApi))\n");
out.push_str(" host_iface.contents.destroy_guest_instance(self._host, self._interface, self._instance)\n");
out.push_str(" self._interface = None # Prevent reuse after cleanup.\n");
if needs_arena {
out.push_str(" # Free any overflow blocks the arena still holds before teardown.\n");
out.push_str(" if getattr(self, \"_arena\", None) is not None:\n");
out.push_str(" _arena_free_all(self._arena, self._host)\n");
}
out.push_str(" # Release the runtime pin now that teardown has completed.\n");
out.push_str(" self._owner = None\n");
out.push('\n');
out.push_str(" @classmethod\n");
out.push_str(
" def create(cls, handle: int, host: ctypes.c_void_p, owner: object | None = None) -> Optional[Self]:\n",
);
out.push_str(" \"\"\"Factory method - creates instance or None if failed.\n\n");
out.push_str(" Args:\n");
out.push_str(" handle: Contract handle from find_guest_contract\n");
out.push_str(" host: Host interface pointer\n");
out.push_str(" owner: Python object that owns `host` (e.g. a polyplug.Runtime).\n");
out.push_str(" The caller keeps a reference for its lifetime so the runtime\n");
out.push_str(" cannot be finalized before __del__ runs (which calls\n");
out.push_str(" destroy_instance and arena teardown through the runtime). If\n");
out.push_str(
" None, the embedder is responsible for keeping the runtime alive\n",
);
out.push_str(" past the caller.\n\n");
out.push_str(" Returns:\n");
out.push_str(" Self if interface found and instance created, None otherwise\n");
out.push_str(" \"\"\"\n");
out.push_str(" try:\n");
out.push_str(" return cls(handle, host, owner)\n");
out.push_str(" except ValueError:\n");
out.push_str(" return None\n\n");
out.push_str(" def is_valid(self) -> bool:\n");
out.push_str(" \"\"\"Check if this caller holds a resolved contract interface.\"\"\"\n");
out.push_str(" return bool(getattr(self, \"_interface\", None))\n\n");
out.push_str(" def _live_revision(self) -> int:\n");
out.push_str(
" \"\"\"Read the registry revision through the cached pointer — one aligned\n",
);
out.push_str(" atomic load, no call into the runtime. Returns the cached value (i.e.\n");
out.push_str(" \"unchanged\") when there is no counter (null host/runtime), so the\n");
out.push_str(" staleness check is then a no-op.\n");
out.push_str(" \"\"\"\n");
out.push_str(" if not self._revision_ptr:\n");
out.push_str(" return self._cached_revision\n");
out.push_str(" return ctypes.c_uint64.from_address(self._revision_ptr).value\n\n");
out.push_str(" def _revalidate(self) -> bool:\n");
out.push_str(
" \"\"\"Re-resolve the cached interface after the registry changed under us.\n\n",
);
out.push_str(
" A hot-reload swapped a new interface into the same slot, so the retained\n",
);
out.push_str(
" handle still resolves — to the new interface; an unload vacated the slot,\n",
);
out.push_str(
" so it resolves to null and False is returned (the contract is gone).\n\n",
);
out.push_str(" The old instance is ABANDONED, never destroyed: after a reload its\n");
out.push_str(
" interface — and the guest-side state it created — is already epoch-reclaimed,\n",
);
out.push_str(" so calling the dead interface's destroy_instance would be UB. A fresh\n");
out.push_str(" instance is created on the new interface.\n");
out.push_str(" \"\"\"\n");
out.push_str(" host_iface: ctypes.POINTER(HostApi) = ctypes.cast(self._host, ctypes.POINTER(HostApi))\n");
out.push_str(
" interface: ctypes.c_void_p = host_iface.contents.resolve_guest_contract(self._host, self._handle)\n",
);
out.push_str(" if not interface:\n");
out.push_str(" return False\n");
out.push_str(" new_instance = GuestContractInstance()\n");
out.push_str(
" host_iface.contents.create_guest_instance(self._host, interface, None, ctypes.byref(new_instance))\n",
);
out.push_str(" self._interface = interface\n");
out.push_str(" self._instance = new_instance\n");
out.push_str(" self._cached_revision = self._live_revision()\n");
out.push_str(" return True\n\n");
out.push_str(" def reset(self) -> None:\n");
out.push_str(
" \"\"\"Destroy current instance and create a new one (host-mediated).\"\"\"\n",
);
out.push_str(
" # If the registry changed under us, the cached interface/instance are stale\n",
);
out.push_str(
" # (a reload/unload reclaimed their backing). _revalidate abandons the dead\n",
);
out.push_str(
" # instance and builds a fresh one on the current interface — exactly the\n",
);
out.push_str(
" # fresh instance reset() promises — so defer to it and skip the unsafe destroy.\n",
);
out.push_str(" if self._live_revision() != self._cached_revision:\n");
out.push_str(" self._revalidate()\n");
out.push_str(" return\n");
out.push_str(" host_iface: ctypes.POINTER(HostApi) = ctypes.cast(self._host, ctypes.POINTER(HostApi))\n");
out.push_str(" host_iface.contents.destroy_guest_instance(self._host, self._interface, self._instance)\n");
out.push_str(" self._instance = GuestContractInstance()\n");
out.push_str(
" host_iface.contents.create_guest_instance(self._host, self._interface, None, ctypes.byref(self._instance))\n\n",
);
out.push_str(" def __bool__(self) -> bool:\n");
out.push_str(" return self.is_valid()\n\n");
for func in &contract.functions {
generate_host_caller_method(out, func, &struct_name, enums);
}
out.push('\n');
}
fn generate_host_caller_method(
out: &mut String,
func: &ResolvedFunction,
contract_struct: &str,
enums: &[EnumDef],
) {
let fn_id: u32 = func.function_id;
let needs_arena: bool = fn_needs_arena(func);
let sig_params: String = build_python_sig_params(func);
let ret_type: String = python_return_type(&func.returns);
out.push_str(&format!(
" def {}(self{}) -> {}:\n",
func.name, sig_params, ret_type
));
out.push_str(
" if self._live_revision() != self._cached_revision and not self._revalidate():\n",
);
out.push_str(
" raise ContractError(\"contract not found\", AbiErrorCode.NotFound)\n",
);
if needs_arena {
out.push_str(
" # Returns a value borrowing this caller's arena; it stays valid until\n",
);
out.push_str(" # the next arena-backed call on this caller.\n");
out.push_str(
" # Rewind the arena at call start: retains overflow blocks for reuse,\n",
);
out.push_str(" # invalidating all pointers from the previous call.\n");
out.push_str(" _arena_reset(self._arena)\n");
}
emit_python_host_args_setup(out, func, contract_struct, enums);
emit_python_host_out_setup(out, &func.returns, enums);
out.push_str(" # SAFETY: the interface pointer is valid for the wrapper lifetime.\n");
out.push_str(" iface_ptr: ctypes.POINTER(GuestContractInterface) = ctypes.cast(self._interface, ctypes.POINTER(GuestContractInterface))\n");
out.push_str(" interface: GuestContractInterface = iface_ptr.contents\n");
out.push_str(
" # Out-param ABI: dispatch writes the AbiError through a trailing pointer.\n",
);
out.push_str(" err: _AbiError = _AbiError()\n");
out.push_str(" if interface.dispatch_type == DispatchType.Native:\n");
out.push_str(&format!(
" if {fn_id} >= interface.dispatch.native.function_count:\n"
));
out.push_str(" raise ContractError(\"function not available in interface\", AbiErrorCode.FunctionNotAvailable)\n");
out.push_str(" functions_ptr: int = interface.dispatch.native.functions\n");
out.push_str(&format!(
" fn_ptr: int = ctypes.cast(functions_ptr + {fn_id} * 8, ctypes.POINTER(ctypes.c_void_p)).contents.value\n"
));
out.push_str(
" dispatch_fn: _DISPATCH_FN_CTYPE = ctypes.cast(fn_ptr, _DISPATCH_FN_CTYPE)\n",
);
out.push_str(
" # SAFETY: instance is valid for the wrapper lifetime; args_ptr points\n",
);
out.push_str(
" # to valid args, out_ptr to a valid return-type buffer per the ABI contract.\n",
);
out.push_str(" dispatch_fn(self._instance, args_ptr, out_ptr, ctypes.byref(err))\n");
out.push_str(" else:\n");
out.push_str(
" # SAFETY: the union's vm variant is active per dispatch_type; args/out\n",
);
if needs_arena {
out.push_str(
" # are valid per the ABI contract. The arena was reset at call start.\n",
);
out.push_str(&format!(
" interface.dispatch.vm.call(interface.dispatch.vm.loader_data, self._instance, {fn_id}, args_ptr, out_ptr, ctypes.byref(self._arena), ctypes.byref(err))\n"
));
} else {
out.push_str(
" # are valid per the ABI contract. The null arena selects the host->alloc fallback.\n",
);
out.push_str(&format!(
" interface.dispatch.vm.call(interface.dispatch.vm.loader_data, self._instance, {fn_id}, args_ptr, out_ptr, None, ctypes.byref(err))\n"
));
}
out.push_str(" if err.code != AbiErrorCode.Ok:\n");
out.push_str(
" raise ContractError(f\"polyplug call failed (code {err.code})\", err.code)\n",
);
if has_return_value(&func.returns) {
out.push_str(&format!(
" return {}\n\n",
python_host_caller_return_expr(&func.returns, enums)
));
} else {
out.push_str(" return None\n\n");
}
}
fn generate_host_caller_stub_method(out: &mut String, func: &ResolvedFunction) {
let sig_params: String = build_python_sig_params(func);
let ret_type: String = python_return_type(&func.returns);
out.push_str(&format!(
" def {}(self{}) -> {}: ...\n",
func.name, sig_params, ret_type
));
}
fn build_python_sig_params(func: &ResolvedFunction) -> String {
if func.params.is_empty() {
return String::new();
}
if func.params.len() == 1 {
let param: &ResolvedParam = &func.params[0];
let ty_name: String = python_type_name(¶m.ty);
return format!(", {}: {}", param.name, ty_name);
}
let params: Vec<String> = func
.params
.iter()
.map(|p: &ResolvedParam| format!(", {}: {}", p.name, python_type_name(&p.ty)))
.collect();
params.join("")
}
fn emit_python_host_args_setup(
out: &mut String,
func: &ResolvedFunction,
contract_struct: &str,
enums: &[EnumDef],
) {
if func.params.is_empty() {
out.push_str(" args_ptr: ctypes.c_void_p = ctypes.c_void_p()\n");
return;
}
if func.params.len() == 1 {
let param: &ResolvedParam = &func.params[0];
let ty_name: String = python_type_name(¶m.ty);
match ¶m.ty {
ResolvedTypeRef::UserDefined(_)
| ResolvedTypeRef::AbiType(AbiBuiltin::StringView | AbiBuiltin::Buffer) => {
if let Some(e) = python_enum_for_type(¶m.ty, enums) {
let raw_ctype: &str = python_ctype_for_repr(&e.repr);
out.push_str(&format!(
" {name}_val: {raw_ctype} = {raw_ctype}(int({name}))\n",
name = param.name
));
out.push_str(&format!(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref({name}_val), ctypes.c_void_p)\n",
name = param.name
));
} else {
out.push_str(&format!(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref({}), ctypes.c_void_p)\n",
param.name
));
}
}
_ => {
out.push_str(&format!(
" {name}_val: {ty} = {ty}({name})\n",
name = param.name,
ty = ty_name
));
out.push_str(&format!(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref({name}_val), ctypes.c_void_p)\n",
name = param.name
));
}
}
return;
}
let pack_struct: String = arg_pack_struct_name(contract_struct, &func.name);
out.push_str(&format!(
" args_val: {pack_struct} = {pack_struct}()\n"
));
for param in &func.params {
out.push_str(&format!(" args_val.{0} = {0}\n", param.name));
}
out.push_str(" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(args_val), ctypes.c_void_p)\n");
}
fn emit_python_host_out_setup(
out: &mut String,
returns: &Option<ResolvedTypeRef>,
enums: &[EnumDef],
) {
if !has_return_value(returns) {
out.push_str(" out_ptr: ctypes.c_void_p = ctypes.c_void_p()\n");
return;
}
if let Some(e) = returns
.as_ref()
.and_then(|ret: &ResolvedTypeRef| python_enum_for_type(ret, enums))
{
let raw_ctype: &str = python_ctype_for_repr(&e.repr);
out.push_str(&format!(" out_val: {raw_ctype} = {raw_ctype}()\n"));
out.push_str(
" out_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(out_val), ctypes.c_void_p)\n",
);
return;
}
let ret_ty: String = python_return_type(returns);
out.push_str(&format!(" out_val: {ret_ty} = {ret_ty}()\n"));
out.push_str(
" out_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(out_val), ctypes.c_void_p)\n",
);
}
fn python_host_caller_return_expr(returns: &Option<ResolvedTypeRef>, enums: &[EnumDef]) -> String {
match returns
.as_ref()
.and_then(|ret: &ResolvedTypeRef| python_enum_for_type(ret, enums))
{
Some(e) => format!("{}(out_val.value)", e.name),
None => "out_val".to_owned(),
}
}
fn collect_python_type_imports(ir: &ValidatedIr) -> BTreeSet<String> {
let mut imports: BTreeSet<String> = BTreeSet::new();
for ty in &ir.types {
imports.insert(ty.name.clone());
}
for contract in &ir.contracts {
let struct_name: String = contract_name_to_struct(&contract.name);
for func in &contract.functions {
if needs_arg_pack(&func.params) {
imports.insert(arg_pack_struct_name(&struct_name, &func.name));
}
collect_type_refs(&mut imports, &func.params, &func.returns);
}
}
imports
}
fn collect_python_user_struct_imports(ir: &ValidatedIr) -> BTreeSet<String> {
let mut imports: BTreeSet<String> = BTreeSet::new();
for contract in &ir.contracts {
for func in &contract.functions {
for param in &func.params {
if let ResolvedTypeRef::UserDefined(name) = ¶m.ty {
imports.insert(name.clone());
}
}
if let Some(ResolvedTypeRef::UserDefined(name)) = &func.returns {
imports.insert(name.clone());
}
}
}
imports
}
fn collect_type_refs(
imports: &mut BTreeSet<String>,
params: &[ResolvedParam],
returns: &Option<ResolvedTypeRef>,
) {
for param in params {
if let ResolvedTypeRef::UserDefined(name) = ¶m.ty {
imports.insert(name.clone());
}
}
if let Some(ResolvedTypeRef::UserDefined(name)) = returns {
imports.insert(name.clone());
}
}
fn generate_guest_contract_trait(out: &mut String, contract: &ResolvedContract) {
let trait_name: String = contract_name_to_guest_trait(&contract.name);
out.push_str(&format!("class {}:\n", trait_name));
for func in &contract.functions {
generate_guest_trait_method(out, func);
}
let upper: String = contract_name_to_upper_snake(&contract.name);
emit_python_factory_slot(out, &upper, &trait_name);
}
fn emit_python_factory_slot(out: &mut String, var: &str, class_name: &str) {
out.push_str(&format!(
"\n_{var}_FACTORY: Callable[[int], {class_name}] | None = None\n"
));
out.push_str(&format!(
"def set_{var}_factory(factory: Callable[[int], {class_name}]) -> None:\n"
));
out.push_str(" \"\"\"Register the author factory; call once at module import time.\n\n");
out.push_str(" The loader calls the factory with the HostApi pointer once per instance\n");
out.push_str(" (create_instance), so each implementation is constructed with its owning\n");
out.push_str(" runtime's host pointer and two live instances never share state.\n");
out.push_str(" \"\"\"\n");
out.push_str(&format!(" global _{var}_FACTORY\n"));
out.push_str(&format!(" _{var}_FACTORY = factory\n\n"));
}
fn generate_guest_plugin_trait(out: &mut String, plugin_name: &str, contract: &ResolvedContract) {
let plugin_lower: String = plugin_name.to_lowercase().replace('.', "_");
let class_name: String = plugin_guest_trait_name(plugin_name, &contract.name);
out.push_str(&format!("class {class_name}:\n"));
for func in &contract.functions {
generate_guest_trait_method(out, func);
}
emit_python_factory_slot(out, &plugin_lower, &class_name);
}
fn generate_guest_trait_method(out: &mut String, func: &ResolvedFunction) {
let sig_params: String = build_python_guest_impl_sig_params(func);
let ret_type: String = python_guest_impl_return_type(&func.returns);
out.push_str(&format!(
" def {}(self{}) -> {}:\n",
func.name, sig_params, ret_type
));
out.push_str(" raise NotImplementedError\n");
}
fn generate_guest_trait_stub_method(out: &mut String, func: &ResolvedFunction) {
let sig_params: String = build_python_guest_impl_sig_params(func);
let ret_type: String = python_guest_impl_return_type(&func.returns);
out.push_str(&format!(
" def {}(self{}) -> {}: ...\n",
func.name, sig_params, ret_type
));
}
fn python_guest_impl_type_name(ty: &ResolvedTypeRef) -> String {
match ty {
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => "str".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => "bytes".to_owned(),
_ => python_type_name(ty),
}
}
fn build_python_guest_impl_sig_params(func: &ResolvedFunction) -> String {
func.params
.iter()
.map(|p: &ResolvedParam| format!(", {}: {}", p.name, python_guest_impl_type_name(&p.ty)))
.collect::<Vec<String>>()
.join("")
}
fn python_guest_impl_return_type(returns: &Option<ResolvedTypeRef>) -> String {
match returns {
Some(ty) if !is_void_type(ty) => python_guest_impl_type_name(ty),
_ => "None".to_owned(),
}
}
fn generate_guest_contract_interface(out: &mut String, contract: &ResolvedContract) {
let lower: String = contract.name.replace('.', "_");
let trait_name: String = contract_name_to_guest_trait(&contract.name);
let struct_name: String = contract_name_to_struct(&contract.name);
emit_guest_function_callables(out, contract, &lower, &trait_name, &struct_name);
}
fn generate_guest_plugin_interface(
out: &mut String,
plugin_name: &str,
contract: &ResolvedContract,
) {
let plugin_lower: String = plugin_name.to_lowercase().replace('.', "_");
let plugin_class: String = plugin_guest_trait_name(plugin_name, &contract.name);
let struct_name: String = contract_name_to_struct(&contract.name);
emit_guest_function_callables(out, contract, &plugin_lower, &plugin_class, &struct_name);
}
fn emit_guest_function_callables(
out: &mut String,
contract: &ResolvedContract,
fn_prefix: &str,
impl_class: &str,
struct_name: &str,
) {
for func in &contract.functions {
emit_guest_struct_consts(out, func, fn_prefix, struct_name);
}
for func in &contract.functions {
let abi_name: String = format!("{fn_prefix}_{}_abi", func.name);
let has_return: bool = has_return_value(&func.returns);
let has_params: bool = !func.params.is_empty();
out.push_str(&format!(
"def {abi_name}(impl: {impl_class}, args_ptr: int, out_ptr: int, arena_ptr: int, arena_alloc: Callable[[int, int], int]) -> None:\n"
));
if has_params {
out.push_str(" if not args_ptr:\n");
out.push_str(" raise RuntimeError(\"null args pointer\")\n");
}
if has_return {
out.push_str(" if not out_ptr:\n");
out.push_str(" raise RuntimeError(\"null out pointer\")\n");
}
if !has_params {
out.push_str(" _ = args_ptr\n");
}
let uses_arena: bool = matches!(
&func.returns,
Some(ResolvedTypeRef::AbiType(AbiBuiltin::StringView))
);
if !uses_arena {
out.push_str(" _ = arena_ptr\n");
out.push_str(" _ = arena_alloc\n");
}
emit_guest_abi_args_unpack(out, func, fn_prefix, struct_name);
emit_guest_abi_call(out, func);
emit_guest_abi_return(out, func, fn_prefix);
out.push('\n');
}
}
fn emit_guest_struct_consts(
out: &mut String,
func: &ResolvedFunction,
fn_prefix: &str,
struct_name: &str,
) {
if uses_scalar_args_struct(&func.params) {
let fmt: String = struct_format_for_params(&func.params);
out.push_str(&format!(
"{} = struct.Struct(\"{fmt}\")\n",
args_struct_const_name(fn_prefix, &func.name)
));
}
if let Some(ret) = &func.returns {
if let Some(ch) = scalar_struct_format_char(ret) {
out.push_str(&format!(
"{} = struct.Struct(\"<{ch}\")\n",
ret_struct_const_name(fn_prefix, &func.name)
));
}
}
let _ = struct_name;
}
fn emit_guest_abi_args_unpack(
out: &mut String,
func: &ResolvedFunction,
fn_prefix: &str,
contract_struct: &str,
) {
if func.params.is_empty() {
return;
}
if func.params.len() == 1
&& matches!(
func.params[0].ty,
ResolvedTypeRef::AbiType(AbiBuiltin::StringView)
)
{
let name: &str = &func.params[0].name;
out.push_str(&format!(
" {name}: str = to_str(StringView.from_address(args_ptr))\n"
));
return;
}
if func.params.len() == 1 && !is_scalar_param(&func.params[0]) {
let param: &ResolvedParam = &func.params[0];
let ty_name: String = python_type_name(¶m.ty);
out.push_str(&format!(
" {name}: {ty} = {ty}.from_address(args_ptr)\n",
name = param.name,
ty = ty_name
));
return;
}
if uses_scalar_args_struct(&func.params) {
let const_name: String = args_struct_const_name(fn_prefix, &func.name);
let names: Vec<String> = func
.params
.iter()
.map(|p: &ResolvedParam| p.name.clone())
.collect();
let lhs: String = if names.len() == 1 {
format!("({},)", names[0])
} else {
names.join(", ")
};
out.push_str(&format!(
" {lhs} = {const_name}.unpack(ctypes.string_at(args_ptr, {const_name}.size))\n"
));
return;
}
let pack: String = arg_pack_struct_name(contract_struct, &func.name);
out.push_str(&format!(
" args_val: {pack} = {pack}.from_address(args_ptr)\n"
));
for param in &func.params {
out.push_str(&format!(" {} = args_val.{}\n", param.name, param.name));
}
}
fn emit_guest_abi_call(out: &mut String, func: &ResolvedFunction) {
let call_prefix: &str = if has_return_value(&func.returns) {
" result = "
} else {
" "
};
if func.params.is_empty() {
out.push_str(&format!("{call_prefix}impl.{}()\n", func.name));
return;
}
let args: Vec<String> = func
.params
.iter()
.map(|p: &ResolvedParam| p.name.clone())
.collect();
let args_str: String = args.join(", ");
out.push_str(&format!("{call_prefix}impl.{}({})\n", func.name, args_str));
}
fn emit_guest_abi_return(out: &mut String, func: &ResolvedFunction, fn_prefix: &str) {
if !has_return_value(&func.returns) {
return;
}
let returns: &ResolvedTypeRef = match &func.returns {
Some(ty) => ty,
None => return,
};
if let Some(_ch) = scalar_struct_format_char(returns) {
let const_name: String = ret_struct_const_name(fn_prefix, &func.name);
out.push_str(&format!(
" ctypes.memmove(out_ptr, {const_name}.pack(result), {const_name}.size)\n"
));
return;
}
if matches!(returns, ResolvedTypeRef::AbiType(AbiBuiltin::StringView)) {
out.push_str(
" out_view: StringView = alloc_string_arena(arena_alloc, arena_ptr, result)\n",
);
out.push_str(
" ctypes.memmove(out_ptr, ctypes.addressof(out_view), ctypes.sizeof(out_view))\n",
);
return;
}
let _ = returns;
out.push_str(" ctypes.memmove(out_ptr, ctypes.addressof(result), ctypes.sizeof(result))\n");
}
fn needs_arg_pack(params: &[ResolvedParam]) -> bool {
params.len() >= 2
}
fn is_scalar_param(param: &ResolvedParam) -> bool {
scalar_struct_format_char(¶m.ty).is_some()
}
fn all_params_scalar(params: &[ResolvedParam]) -> bool {
!params.is_empty() && params.iter().all(is_scalar_param)
}
fn scalar_size(ty: &ResolvedTypeRef) -> Option<usize> {
match ty {
ResolvedTypeRef::Primitive(p) => Some(match p {
PrimitiveType::U8 | PrimitiveType::I8 | PrimitiveType::Bool => 1,
PrimitiveType::U16 | PrimitiveType::I16 => 2,
PrimitiveType::U32 | PrimitiveType::I32 | PrimitiveType::F32 => 4,
PrimitiveType::U64 | PrimitiveType::I64 | PrimitiveType::F64 => 8,
}),
ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => Some(8),
_ => None,
}
}
fn uses_scalar_args_struct(params: &[ResolvedParam]) -> bool {
if !all_params_scalar(params) {
return false;
}
if params.len() == 1 {
return true;
}
let first: usize = match scalar_size(¶ms[0].ty) {
Some(s) => s,
None => return false,
};
params
.iter()
.all(|p: &ResolvedParam| scalar_size(&p.ty) == Some(first))
}
fn scalar_struct_format_char(ty: &ResolvedTypeRef) -> Option<char> {
match ty {
ResolvedTypeRef::Primitive(p) => Some(match p {
PrimitiveType::U8 => 'B',
PrimitiveType::I8 => 'b',
PrimitiveType::U16 => 'H',
PrimitiveType::I16 => 'h',
PrimitiveType::U32 => 'I',
PrimitiveType::I32 => 'i',
PrimitiveType::U64 => 'Q',
PrimitiveType::I64 => 'q',
PrimitiveType::F32 => 'f',
PrimitiveType::F64 => 'd',
PrimitiveType::Bool => '?',
}),
ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => Some('Q'),
ResolvedTypeRef::AbiType(
AbiBuiltin::StringView | AbiBuiltin::Buffer | AbiBuiltin::Void,
) => None,
ResolvedTypeRef::UserDefined(_) => None,
}
}
fn struct_format_for_params(params: &[ResolvedParam]) -> String {
let mut fmt: String = String::from("<");
for param in params {
if let Some(ch) = scalar_struct_format_char(¶m.ty) {
fmt.push(ch);
}
}
fmt
}
fn args_struct_const_name(fn_prefix: &str, fn_name: &str) -> String {
format!(
"_{}_{}_ARGS",
fn_prefix.to_uppercase(),
fn_name.to_uppercase()
)
}
fn ret_struct_const_name(fn_prefix: &str, fn_name: &str) -> String {
format!(
"_{}_{}_RET",
fn_prefix.to_uppercase(),
fn_name.to_uppercase()
)
}
fn fn_needs_arena(func: &ResolvedFunction) -> bool {
matches!(
&func.returns,
Some(ResolvedTypeRef::AbiType(AbiBuiltin::StringView))
| Some(ResolvedTypeRef::AbiType(AbiBuiltin::Buffer))
| Some(ResolvedTypeRef::UserDefined(_))
)
}
fn contract_needs_arena(contract: &ResolvedContract) -> bool {
contract.functions.iter().any(fn_needs_arena)
}
fn python_type_name(ty: &ResolvedTypeRef) -> String {
match ty {
ResolvedTypeRef::Primitive(p) => python_primitive_type(p).to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => "StringView".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => "Buffer".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => "ctypes.c_void_p".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Void) => "None".to_owned(),
ResolvedTypeRef::UserDefined(name) => name.clone(),
}
}
fn python_primitive_type(ty: &PrimitiveType) -> &'static str {
match ty {
PrimitiveType::U8 => "ctypes.c_uint8",
PrimitiveType::U16 => "ctypes.c_uint16",
PrimitiveType::U32 => "ctypes.c_uint32",
PrimitiveType::U64 => "ctypes.c_uint64",
PrimitiveType::I8 => "ctypes.c_int8",
PrimitiveType::I16 => "ctypes.c_int16",
PrimitiveType::I32 => "ctypes.c_int32",
PrimitiveType::I64 => "ctypes.c_int64",
PrimitiveType::F32 => "ctypes.c_float",
PrimitiveType::F64 => "ctypes.c_double",
PrimitiveType::Bool => "ctypes.c_bool",
}
}
fn python_return_type(returns: &Option<ResolvedTypeRef>) -> String {
match returns {
Some(ty) => {
if is_void_type(ty) {
"None".to_owned()
} else {
python_type_name(ty)
}
}
None => "None".to_owned(),
}
}
fn is_void_type(ty: &ResolvedTypeRef) -> bool {
matches!(ty, ResolvedTypeRef::AbiType(AbiBuiltin::Void))
}
fn has_return_value(returns: &Option<ResolvedTypeRef>) -> bool {
match returns {
Some(ty) => !is_void_type(ty),
None => false,
}
}
fn contract_name_to_struct(name: &str) -> String {
let converted: String = name
.split('.')
.map(|p: &str| {
let mut chars: core::str::Chars<'_> = p.chars();
match chars.next() {
Some(c) => c.to_uppercase().collect::<String>() + chars.as_str(),
None => String::new(),
}
})
.collect::<Vec<_>>()
.join("");
format!("{converted}Contract")
}
fn contract_name_to_camel(name: &str) -> String {
name.split('.')
.map(|p: &str| {
let mut chars: core::str::Chars<'_> = p.chars();
match chars.next() {
Some(c) => c.to_uppercase().collect::<String>() + chars.as_str(),
None => String::new(),
}
})
.collect::<Vec<_>>()
.join("")
}
fn contract_name_to_guest_trait(name: &str) -> String {
format!("{}GuestContract", contract_name_to_camel(name))
}
fn plugin_guest_trait_name(plugin_name: &str, contract_name: &str) -> String {
let plugin_upper: String = plugin_name.to_uppercase().replace('.', "_");
format!(
"{plugin_upper}{}Plugin",
contract_name_to_camel(contract_name)
)
}
fn contract_name_to_upper_snake(name: &str) -> String {
name.replace('.', "_").to_uppercase()
}
fn arg_pack_struct_name(contract_struct: &str, fn_name: &str) -> String {
let fn_pascal: String = fn_name
.split('_')
.map(|seg: &str| {
let mut chars: core::str::Chars<'_> = seg.chars();
match chars.next() {
Some(c) => c.to_uppercase().collect::<String>() + chars.as_str(),
None => String::new(),
}
})
.collect::<Vec<_>>()
.join("");
format!("{contract_struct}{fn_pascal}Args")
}
fn host_contract_name_to_python_class(name: &str) -> String {
let name_without_prefix: &str = name.strip_prefix("host.").unwrap_or(name);
let pascal: String = name_without_prefix
.split('.')
.map(|p: &str| {
let mut chars: core::str::Chars<'_> = p.chars();
match chars.next() {
Some(c) => c.to_uppercase().collect::<String>() + chars.as_str(),
None => String::new(),
}
})
.collect::<Vec<_>>()
.join("");
if pascal.starts_with("Host") {
pascal
} else {
format!("Host{}", pascal)
}
}
fn host_contract_name_to_python_caller(name: &str) -> String {
let name_without_prefix: &str = name.strip_prefix("host.").unwrap_or(name);
let pascal: String = name_without_prefix
.split('.')
.map(|p: &str| {
let mut chars: core::str::Chars<'_> = p.chars();
match chars.next() {
Some(c) => c.to_uppercase().collect::<String>() + chars.as_str(),
None => String::new(),
}
})
.collect::<Vec<_>>()
.join("");
if pascal.starts_with("Host") {
pascal + "Contract"
} else {
format!("Host{}Contract", pascal)
}
}
fn python_host_param_type_name(ty: &ResolvedTypeRef) -> String {
match ty {
ResolvedTypeRef::Primitive(p) => match p {
PrimitiveType::U8 | PrimitiveType::U16 | PrimitiveType::U32 | PrimitiveType::U64 => {
"int".to_owned()
}
PrimitiveType::I8 | PrimitiveType::I16 | PrimitiveType::I32 | PrimitiveType::I64 => {
"int".to_owned()
}
PrimitiveType::F32 | PrimitiveType::F64 => "float".to_owned(),
PrimitiveType::Bool => "bool".to_owned(),
},
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => "str".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => "bytes".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => "int".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Void) => "None".to_owned(),
ResolvedTypeRef::UserDefined(name) => name.clone(),
}
}
fn python_host_return_type_name(ty: &ResolvedTypeRef) -> String {
match ty {
ResolvedTypeRef::Primitive(p) => match p {
PrimitiveType::U8 | PrimitiveType::U16 | PrimitiveType::U32 | PrimitiveType::U64 => {
"int".to_owned()
}
PrimitiveType::I8 | PrimitiveType::I16 | PrimitiveType::I32 | PrimitiveType::I64 => {
"int".to_owned()
}
PrimitiveType::F32 | PrimitiveType::F64 => "float".to_owned(),
PrimitiveType::Bool => "bool".to_owned(),
},
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => "str".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => "bytes".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => "int".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Void) => "None".to_owned(),
ResolvedTypeRef::UserDefined(name) => name.clone(),
}
}
fn python_guest_caller_param_type_name(ty: &ResolvedTypeRef) -> String {
match ty {
ResolvedTypeRef::Primitive(p) => match p {
PrimitiveType::U8 | PrimitiveType::U16 | PrimitiveType::U32 | PrimitiveType::U64 => {
"int".to_owned()
}
PrimitiveType::I8 | PrimitiveType::I16 | PrimitiveType::I32 | PrimitiveType::I64 => {
"int".to_owned()
}
PrimitiveType::F32 | PrimitiveType::F64 => "float".to_owned(),
PrimitiveType::Bool => "bool".to_owned(),
},
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => "str".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => "bytes".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => "int".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Void) => "None".to_owned(),
ResolvedTypeRef::UserDefined(name) => name.clone(),
}
}
fn python_guest_caller_return_type_name(ty: &ResolvedTypeRef) -> String {
match ty {
ResolvedTypeRef::Primitive(p) => match p {
PrimitiveType::U8 | PrimitiveType::U16 | PrimitiveType::U32 | PrimitiveType::U64 => {
"int".to_owned()
}
PrimitiveType::I8 | PrimitiveType::I16 | PrimitiveType::I32 | PrimitiveType::I64 => {
"int".to_owned()
}
PrimitiveType::F32 | PrimitiveType::F64 => "float".to_owned(),
PrimitiveType::Bool => "bool".to_owned(),
},
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => "memoryview".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => "memoryview".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => "int".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Void) => "None".to_owned(),
ResolvedTypeRef::UserDefined(name) => name.clone(),
}
}
fn emit_python_guest_host_contract_success_return(
out: &mut String,
returns: &Option<ResolvedTypeRef>,
enums: &[EnumDef],
) {
let Some(ret_ty) = returns else {
return;
};
if let Some(e) = python_enum_for_type(ret_ty, enums) {
out.push_str(&format!(" return {}(result.value)\n", e.name));
return;
}
match ret_ty {
ResolvedTypeRef::AbiType(AbiBuiltin::StringView)
| ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => {
out.push_str(
" # Borrowed view into host-owned memory, valid until the next call on this caller.\n",
);
out.push_str(" _addr: int = result.ptr\n");
out.push_str(" _len: int = result.len\n");
out.push_str(" if not _addr or _len == 0:\n");
out.push_str(" return memoryview(b\"\")\n");
out.push_str(" return memoryview((ctypes.c_char * _len).from_address(_addr))\n");
}
ResolvedTypeRef::Primitive(_) | ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => {
out.push_str(" return result.value\n");
}
ResolvedTypeRef::UserDefined(_) | ResolvedTypeRef::AbiType(_) => {
out.push_str(" return result\n");
}
}
}
fn generate_python_host_contract_method(out: &mut String, func: &ResolvedFunction) {
let return_type: String = match &func.returns {
Some(ty) => python_host_return_type_name(ty),
None => "None".to_owned(),
};
let params_str: String = if func.params.is_empty() {
String::new()
} else {
func.params
.iter()
.map(|p: &ResolvedParam| {
let py_ty: String = python_host_param_type_name(&p.ty);
format!("{}: {}", p.name, py_ty)
})
.collect::<Vec<_>>()
.join(", ")
};
out.push_str(&format!(
" @abstractmethod\n def {}(self{}{}) -> {}:\n pass\n\n",
func.name,
if params_str.is_empty() { "" } else { ", " },
params_str,
return_type
));
}
fn generate_python_host_contract_abc(out: &mut String, contract: &ResolvedHostContract) {
let class_name: String = host_contract_name_to_python_class(&contract.name);
out.push_str(&format!(
"# Host contract `{}` (id=0x{:016X})\n",
contract.name, contract.contract_id
));
out.push_str(&format!("class {}(ABC):\n", class_name));
for func in &contract.functions {
generate_python_host_contract_method(out, func);
}
out.push('\n');
}
fn generate_host_contracts_file(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("from abc import ABC, abstractmethod\n\n");
for contract in &ir.host_contracts {
generate_python_host_contract_abc(&mut out, contract);
}
out.push_str("# Contract ID constants\n");
for contract in &ir.host_contracts {
let class_name: String = host_contract_name_to_python_class(&contract.name);
let const_name: String = format!("{}_CONTRACT_ID", class_name.to_uppercase());
out.push_str(&format!(
"{}: int = 0x{:016X}\n",
const_name, contract.contract_id
));
}
out
}
fn generate_host_contracts_stub(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("from abc import ABC, abstractmethod\n\n");
for contract in &ir.host_contracts {
let class_name: String = host_contract_name_to_python_class(&contract.name);
out.push_str(&format!("class {}(ABC):\n", class_name));
for func in &contract.functions {
let return_type: String = match &func.returns {
Some(ty) => python_host_return_type_name(ty),
None => "None".to_owned(),
};
let params_str: String = if func.params.is_empty() {
String::new()
} else {
func.params
.iter()
.map(|p: &ResolvedParam| {
let py_ty: String = python_host_param_type_name(&p.ty);
format!("{}: {}", p.name, py_ty)
})
.collect::<Vec<_>>()
.join(", ")
};
out.push_str(&format!(
" @abstractmethod\n def {}(self{}{}) -> {}: ...\n",
func.name,
if params_str.is_empty() { "" } else { ", " },
params_str,
return_type
));
}
out.push('\n');
}
for contract in &ir.host_contracts {
let class_name: String = host_contract_name_to_python_class(&contract.name);
let const_name: String = format!("{}_CONTRACT_ID", class_name.to_uppercase());
out.push_str(&format!("{}: int\n", const_name));
}
out
}
fn generate_python_guest_host_contract_caller(
out: &mut String,
contract: &ResolvedHostContract,
enums: &[EnumDef],
) {
let class_name: String = host_contract_name_to_python_caller(&contract.name);
out.push_str(&format!(
"# Guest caller for host contract `{}` (id=0x{:016X})\n",
contract.name, contract.contract_id
));
out.push_str(&format!("class {}:\n", class_name));
out.push_str(
" def __init__(self, interface: int, instance: HostContractInstance) -> None:\n",
);
out.push_str(" self._interface: int = interface\n");
out.push_str(" self._instance: HostContractInstance = instance\n\n");
out.push_str(" @classmethod\n");
out.push_str(" def from_host(cls, host_ptr: int, min_version: int = 0) -> Self | None:\n");
out.push_str(" if host_ptr == 0:\n");
out.push_str(" return None\n");
out.push_str(" host: Any = ctypes.cast(host_ptr, ctypes.POINTER(HostApi))\n");
out.push_str(
" # Resolve the contract vtable — the source of dispatch metadata, NOT the instance.\n",
);
out.push_str(&format!(
" interface: int | None = host.contents.resolve_host_contract_interface(host_ptr, 0x{:016X}, min_version)\n",
contract.contract_id
));
out.push_str(" if not interface:\n");
out.push_str(" return None\n");
out.push_str(
" # Per-instance state; native dispatch functions receive instance.data first.\n",
);
out.push_str(&format!(
" instance: HostContractInstance = host.contents.get_host_contract(host_ptr, 0x{:016X}, min_version)\n",
contract.contract_id
));
out.push_str(" return cls(interface, instance)\n\n");
out.push_str(" def is_valid(self) -> bool:\n");
out.push_str(" return self._interface != 0\n\n");
for func in &contract.functions {
generate_python_guest_host_contract_method(out, func, enums);
}
out.push('\n');
}
fn generate_python_guest_host_contract_method(
out: &mut String,
func: &ResolvedFunction,
enums: &[EnumDef],
) {
let fn_id: u32 = func.function_id;
let return_type: String = match &func.returns {
Some(ty) => python_guest_caller_return_type_name(ty),
None => "None".to_owned(),
};
let has_return: bool = func.returns.is_some();
let params_str: String = if func.params.is_empty() {
String::new()
} else {
func.params
.iter()
.map(|p: &ResolvedParam| {
let py_ty: String = python_guest_caller_param_type_name(&p.ty);
format!(", {}: {}", p.name, py_ty)
})
.collect::<Vec<_>>()
.join("")
};
out.push_str(&format!(
" def {}(self{}) -> {}:\n",
func.name, params_str, return_type
));
out.push_str(" if self._interface == 0:\n");
if has_return {
out.push_str(" return None\n");
} else {
out.push_str(" return\n");
}
out.push_str(" iface: Any = ctypes.cast(self._interface, ctypes.POINTER(HostContractInterface)).contents\n");
out.push_str(" dispatch_type: int = iface.dispatch_type\n");
emit_python_guest_host_contract_args_setup(out, func, enums);
emit_python_guest_host_contract_out_setup(out, &func.returns, enums);
out.push_str(" err: AbiError = AbiError()\n");
out.push_str(" if dispatch_type == DispatchType.Native:\n");
out.push_str(&format!(
" if {fn_id} >= iface.dispatch.native.function_count:\n"
));
if has_return {
out.push_str(" return None\n");
} else {
out.push_str(" return\n");
}
out.push_str(&format!(
" fn_ptr: int = ctypes.cast(iface.dispatch.native.functions + {fn_id} * 8, ctypes.POINTER(ctypes.c_void_p)).contents.value\n"
));
out.push_str(
" dispatch_fn: _DISPATCH_FN_CTYPE = ctypes.cast(fn_ptr, _DISPATCH_FN_CTYPE)\n",
);
out.push_str(
" dispatch_fn(self._instance.data, args_ptr, out_ptr, ctypes.byref(err))\n",
);
out.push_str(" elif dispatch_type == DispatchType.VirtualMachine:\n");
out.push_str(&format!(
" iface.dispatch.vm.call(iface.dispatch.vm.loader_data, GuestContractInstance(), {fn_id}, args_ptr, out_ptr, None, ctypes.byref(err))\n"
));
out.push_str(" else:\n");
if has_return {
out.push_str(" return None\n");
} else {
out.push_str(" return\n");
}
out.push_str(" if err.code != AbiErrorCode.Ok:\n");
if has_return {
out.push_str(" return None\n");
} else {
out.push_str(" return\n");
}
emit_python_guest_host_contract_success_return(out, &func.returns, enums);
out.push('\n');
}
fn emit_python_guest_host_contract_args_setup(
out: &mut String,
func: &ResolvedFunction,
enums: &[EnumDef],
) {
if func.params.is_empty() {
out.push_str(" args_ptr: ctypes.c_void_p = ctypes.c_void_p()\n");
return;
}
if func.params.len() == 1 {
let param: &ResolvedParam = &func.params[0];
match ¶m.ty {
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => {
out.push_str(&format!(
" {name}_bytes: bytes = {name}.encode('utf-8')\n",
name = param.name
));
out.push_str(&format!(
" {name}_view: StringView = StringView(ptr=ctypes.cast(ctypes.c_char_p({name}_bytes), ctypes.c_void_p), len=len({name}_bytes))\n",
name = param.name
));
out.push_str(&format!(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref({name}_view), ctypes.c_void_p)\n",
name = param.name
));
}
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => {
out.push_str(&format!(
" {name}_buf: Buffer = Buffer(ptr=ctypes.c_void_p(ctypes.addressof({name}) if len({name}) > 0 else 0), len=len({name}))\n",
name = param.name
));
out.push_str(&format!(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref({name}_buf), ctypes.c_void_p)\n",
name = param.name
));
}
ResolvedTypeRef::UserDefined(_) => {
if let Some(e) = python_enum_for_type(¶m.ty, enums) {
let raw_ctype: &str = python_ctype_for_repr(&e.repr);
out.push_str(&format!(
" {name}_val: {raw_ctype} = {raw_ctype}(int({name}))\n",
name = param.name
));
out.push_str(&format!(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref({name}_val), ctypes.c_void_p)\n",
name = param.name
));
} else {
out.push_str(&format!(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref({}), ctypes.c_void_p)\n",
param.name
));
}
}
ResolvedTypeRef::Primitive(_) | ResolvedTypeRef::AbiType(_) => {
let ty_name: String = python_type_name(¶m.ty);
out.push_str(&format!(
" {name}_val: {ty} = {ty}({name})\n",
name = param.name,
ty = ty_name
));
out.push_str(&format!(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref({name}_val), ctypes.c_void_p)\n",
name = param.name
));
}
}
return;
}
out.push_str(" class _ArgsPack(ctypes.Structure):\n");
out.push_str(" _fields_ = [\n");
for param in &func.params {
let param_ty: String = python_pack_field_type(¶m.ty, enums);
out.push_str(&format!(
" (\"{}\", {}),\n",
param.name, param_ty
));
}
out.push_str(" ]\n");
out.push_str(" args_val: _ArgsPack = _ArgsPack()\n");
for param in &func.params {
match ¶m.ty {
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => {
out.push_str(&format!(
" {name}_bytes: bytes = {name}.encode('utf-8')\n",
name = param.name
));
out.push_str(&format!(
" args_val.{name} = StringView(ptr=ctypes.cast(ctypes.c_char_p({name}_bytes), ctypes.c_void_p), len=len({name}_bytes))\n",
name = param.name
));
}
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => {
out.push_str(&format!(
" args_val.{name} = Buffer(ptr=ctypes.c_void_p(ctypes.addressof({name}) if len({name}) > 0 else 0), len=len({name}))\n",
name = param.name
));
}
_ => {
if python_enum_for_type(¶m.ty, enums).is_some() {
out.push_str(&format!(" args_val.{0} = int({0})\n", param.name));
} else {
out.push_str(&format!(" args_val.{0} = {0}\n", param.name));
}
}
}
}
out.push_str(
" args_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(args_val), ctypes.c_void_p)\n",
);
}
fn emit_python_guest_host_contract_out_setup(
out: &mut String,
returns: &Option<ResolvedTypeRef>,
enums: &[EnumDef],
) {
if let Some(ret_ty) = returns {
let py_ty: String = python_guest_caller_return_type_name(ret_ty);
if let Some(e) = python_enum_for_type(ret_ty, enums) {
let raw_ctype: &str = python_ctype_for_repr(&e.repr);
out.push_str(&format!(" result: {raw_ctype} = {raw_ctype}()\n"));
out.push_str(" out_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(result), ctypes.c_void_p)\n");
} else if matches!(ret_ty, ResolvedTypeRef::AbiType(AbiBuiltin::StringView)) {
out.push_str(" result: StringView = StringView()\n");
out.push_str(" out_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(result), ctypes.c_void_p)\n");
} else if matches!(ret_ty, ResolvedTypeRef::AbiType(AbiBuiltin::Buffer)) {
out.push_str(" result: Buffer = Buffer()\n");
out.push_str(" out_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(result), ctypes.c_void_p)\n");
} else if matches!(ret_ty, ResolvedTypeRef::UserDefined(_)) {
out.push_str(&format!(" result: {py_ty} = {py_ty}()\n"));
out.push_str(" out_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(result), ctypes.c_void_p)\n");
} else {
let ctypes_ty: String = python_type_name(ret_ty);
out.push_str(&format!(" result: {ctypes_ty} = {ctypes_ty}()\n"));
out.push_str(" out_ptr: ctypes.c_void_p = ctypes.cast(ctypes.byref(result), ctypes.c_void_p)\n");
}
} else {
out.push_str(" out_ptr: ctypes.c_void_p = ctypes.c_void_p()\n");
}
}
fn generate_guest_host_contracts_file(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from typing import Any, Self\n");
out.push_str("from polyplug_abi import AbiErrorCode, AbiError, Buffer, DispatchType, GuestContractInstance, HostContractInstance, HostContractInterface, HostApi, StringView\n\n");
let type_imports: BTreeSet<String> = collect_python_guest_host_contract_type_imports(ir);
if !type_imports.is_empty() {
let import_list: String = type_imports.into_iter().collect::<Vec<String>>().join(", ");
out.push_str(&format!("from guest.types import {}\n\n", import_list));
}
out.push_str(
"_DISPATCH_FN_CTYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p)\n\n",
);
for contract in &ir.host_contracts {
generate_python_guest_host_contract_caller(&mut out, contract, &ir.enums);
}
out.push_str("# Contract ID constants\n");
for contract in &ir.host_contracts {
let class_name: String = host_contract_name_to_python_caller(&contract.name);
let const_name: String = format!("{}_ID", class_name.to_uppercase());
out.push_str(&format!(
"{}: int = 0x{:016X}\n",
const_name, contract.contract_id
));
}
out
}
fn collect_python_guest_host_contract_type_imports(ir: &ValidatedIr) -> BTreeSet<String> {
let mut imports: BTreeSet<String> = BTreeSet::new();
for contract in &ir.host_contracts {
for func in &contract.functions {
for param in &func.params {
if let ResolvedTypeRef::UserDefined(name) = ¶m.ty {
imports.insert(name.clone());
}
}
if let Some(ResolvedTypeRef::UserDefined(name)) = &func.returns {
imports.insert(name.clone());
}
}
}
imports
}
fn generate_guest_host_contracts_stub(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("from typing import Any, Self\n");
out.push_str("from polyplug_abi import HostContractInstance\n");
out.push('\n');
let type_imports: BTreeSet<String> = collect_python_guest_host_contract_type_imports(ir);
if !type_imports.is_empty() {
let import_list: String = type_imports.into_iter().collect::<Vec<String>>().join(", ");
out.push_str(&format!("from guest.types import {}\n\n", import_list));
}
for contract in &ir.host_contracts {
let class_name: String = host_contract_name_to_python_caller(&contract.name);
out.push_str(&format!("class {}:\n", class_name));
out.push_str(
" def __init__(self, interface: int, instance: HostContractInstance) -> None: ...\n",
);
out.push_str(" @classmethod\n");
out.push_str(
" def from_host(cls, host_ptr: int, min_version: int = 0) -> Self | None: ...\n",
);
out.push_str(" def is_valid(self) -> bool: ...\n");
for func in &contract.functions {
let return_type: String = match &func.returns {
Some(ty) => python_guest_caller_return_type_name(ty),
None => "None".to_owned(),
};
let params_str: String = if func.params.is_empty() {
String::new()
} else {
func.params
.iter()
.map(|p: &ResolvedParam| {
let py_ty: String = python_guest_caller_param_type_name(&p.ty);
format!(", {}: {}", p.name, py_ty)
})
.collect::<Vec<_>>()
.join("")
};
out.push_str(&format!(
" def {}(self{}) -> {}: ...\n",
func.name, params_str, return_type
));
}
out.push('\n');
}
for contract in &ir.host_contracts {
let class_name: String = host_contract_name_to_python_caller(&contract.name);
let const_name: String = format!("{}_ID", class_name.to_uppercase());
out.push_str(&format!("{}: int\n", const_name));
}
out
}
fn generate_bundle_manifest_python(ir: &ValidatedIr) -> String {
let bundle: &ResolvedBundle = match ir.bundle.as_ref() {
Some(b) => b,
None => return String::from("# ERROR: bundle manifest called without bundle IR\n"),
};
let name: &str = &bundle.name;
let version: String = format!(
"{}.{}.{}",
bundle.version.major, bundle.version.minor, bundle.version.patch
);
let file_field: String = super::format_manifest_file_field(&bundle.file);
let mut provides: Vec<String> = bundle
.plugins
.iter()
.flat_map(|p: &ResolvedPlugin| p.implements.iter().cloned())
.map(|impl_str: String| {
if let Some(at_pos) = impl_str.find('@') {
let contract_name: &str = &impl_str[..at_pos];
let version_part: &str = &impl_str[at_pos + 1..];
if let Some(dot_pos) = version_part.find('.') {
let major: &str = &version_part[..dot_pos];
format!("{}@{}", contract_name, major)
} else {
impl_str
}
} else {
impl_str
}
})
.collect();
provides.sort();
provides.dedup();
let provides_toml: String = if provides.is_empty() {
String::from("[]")
} else {
format!(
"[{}]",
provides
.iter()
.map(|s: &String| format!("\"{}\"", s))
.collect::<Vec<_>>()
.join(", ")
)
};
let provides_set: std::collections::HashSet<String> = provides.iter().cloned().collect();
let fn_count_entries: Vec<String> = ir
.contracts
.iter()
.filter(|c: &&ResolvedContract| {
provides_set.contains(&format!("{}@{}", c.name, c.version.major))
})
.map(|c: &ResolvedContract| {
let fn_count: u32 = c.functions.len() as u32;
format!("\"{}@{}\" = {}", c.name, c.version.major, fn_count)
})
.collect();
let function_count_toml: String = format!("{{ {} }}", fn_count_entries.join(", "));
let reinit: bool = bundle.needs_reinit_on_dep_reload;
let dep_toml: String = super::emit_manifest_dependencies(&bundle.dependencies);
let loader: &str = &bundle.loader;
format!(
"# THIS FILE IS AUTO-GENERATED BY polyplugc. DO NOT EDIT.\n\
name = \"{name}\"\n\
id = {bundle_id}\n\
version = \"{version}\"\n\
loader = \"{loader}\"\n\
provides = {provides_toml}\n\
function_count = {function_count_toml}\n\
needs_reinit_on_dep_reload = {reinit}\n\
{file_field}\n\
{dep_toml}",
bundle_id = bundle.bundle_id
)
}
fn to_upper_snake_case(s: &str) -> String {
let mut result: String = String::new();
for (i, c) in s.chars().enumerate() {
if c.is_uppercase() && i > 0 {
result.push('_');
}
result.extend(c.to_uppercase());
}
result
}
fn substitute_variant_refs_python(declared_variants: &[EnumVariant], expr: &str) -> String {
let chars: Vec<char> = expr.chars().collect();
let len: usize = chars.len();
let mut result: String = String::new();
let mut i: usize = 0;
while i < len {
let c: char = chars[i];
if c.is_alphabetic() || c == '_' {
let start: usize = i;
while i < len && (chars[i].is_alphanumeric() || chars[i] == '_') {
i += 1;
}
let ident: String = chars[start..i].iter().collect();
let found_variant: Option<&EnumVariant> =
declared_variants.iter().find(|v| v.name == ident);
if let Some(ref_variant) = found_variant {
result.push('(');
result.push_str(&ref_variant.value);
result.push(')');
} else {
result.push_str(&ident);
}
} else {
result.push(c);
i += 1;
}
}
result
}
fn generate_python_enum(out: &mut String, e: &EnumDef) {
let base_class: &str = if e.bitflag {
"enum.IntFlag"
} else {
"enum.IntEnum"
};
out.push_str(&format!("class {}({}):\n", e.name, base_class));
for variant in &e.variants {
let upper_name: String = to_upper_snake_case(&variant.name);
let subst_value: String = substitute_variant_refs_python(&e.variants, &variant.value);
out.push_str(&format!(" {} = {}\n", upper_name, subst_value));
}
}
fn collect_host_contract_user_types(ir: &ValidatedIr) -> Vec<String> {
let mut names: Vec<String> = Vec::new();
let push_unique = |name: &str, names: &mut Vec<String>| {
if !names.iter().any(|n: &String| n == name) {
names.push(name.to_owned());
}
};
for contract in &ir.host_contracts {
for func in &contract.functions {
for param in &func.params {
if let ResolvedTypeRef::UserDefined(name) = ¶m.ty {
push_unique(name, &mut names);
}
}
if let Some(ResolvedTypeRef::UserDefined(name)) = &func.returns {
push_unique(name, &mut names);
}
}
}
names
}
fn generate_python_host_interface_factories_file(ir: &ValidatedIr) -> String {
let mut out: String = String::new();
out.push_str(PY_HEADER);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from polyplug_abi import (\n");
out.push_str(" AbiErrorCode,\n");
out.push_str(" Buffer,\n");
out.push_str(" DispatchType,\n");
out.push_str(" HostContractInterface,\n");
out.push_str(" StringView,\n");
out.push_str(" Version,\n");
out.push_str(")\n");
let contract_classes: Vec<String> = ir
.host_contracts
.iter()
.map(|c: &ResolvedHostContract| host_contract_name_to_python_class(&c.name))
.collect();
if !contract_classes.is_empty() {
out.push_str(&format!(
"from host.contracts import {}\n",
contract_classes.join(", ")
));
}
let user_types: Vec<String> = collect_host_contract_user_types(ir);
if !user_types.is_empty() {
out.push_str(&format!(
"from host.types import {}\n",
user_types.join(", ")
));
}
out.push('\n');
out.push_str("# ctypes callbacks cannot return structs by value (\"invalid result type\n");
out.push_str("# for callback function\"), so host contracts are registered with VM\n");
out.push_str("# dispatch routed through the native trampoline exported by the python\n");
out.push_str("# loader cdylib (polyplug_python_host_vm_dispatch in polyplug_python);\n");
out.push_str("# the per-contract marshalling lives in a scalar-only ctypes dispatcher.\n");
out.push_str("_HOST_DISPATCH_CALLBACK_CTYPE = ctypes.CFUNCTYPE(\n");
out.push_str(" ctypes.c_uint32, ctypes.c_uint32, ctypes.c_void_p, ctypes.c_void_p\n");
out.push_str(")\n\n\n");
out.push_str("class _PolyplugPythonHostDispatchBridge(ctypes.Structure):\n");
out.push_str(
" \"\"\"Mirrors PolyplugPythonHostDispatchBridge (crates/polyplug_python/src/ffi.rs).\"\"\"\n\n",
);
out.push_str(" _fields_ = [(\"callback\", _HOST_DISPATCH_CALLBACK_CTYPE)]\n\n\n");
for contract in &ir.host_contracts {
let class_name: String = host_contract_name_to_python_class(&contract.name);
for func in &contract.functions {
if func.params.len() >= 2 {
let pack_name: String = python_host_pack_struct_name(&class_name, &func.name);
out.push_str(&format!("class {pack_name}(ctypes.Structure):\n"));
out.push_str(" _fields_ = [\n");
for param in &func.params {
let field_ty: String = python_pack_field_type(¶m.ty, &ir.enums);
out.push_str(&format!(" (\"{}\", {}),\n", param.name, field_ty));
}
out.push_str(" ]\n\n\n");
}
}
}
for contract in &ir.host_contracts {
generate_python_host_interface_factory(&mut out, contract, &ir.enums);
}
out
}
fn python_host_pack_struct_name(class_name: &str, fn_name: &str) -> String {
let fn_pascal: String = fn_name
.split('_')
.map(|seg: &str| {
let mut chars: core::str::Chars<'_> = seg.chars();
match chars.next() {
Some(c) => c.to_uppercase().collect::<String>() + chars.as_str(),
None => String::new(),
}
})
.collect::<Vec<String>>()
.join("");
format!("_{class_name}{fn_pascal}Args")
}
fn generate_python_host_interface_factory(
out: &mut String,
contract: &ResolvedHostContract,
enums: &[EnumDef],
) {
let class_name: String = host_contract_name_to_python_class(&contract.name);
let factory_name: String = format!(
"create_{}_interface",
contract.name.replace('.', "_").to_lowercase()
);
let contract_id: u64 = contract.contract_id;
let major: u32 = contract.version.major;
let minor: u32 = contract.version.minor;
let singleton: bool = contract.singleton;
out.push_str(&format!(
"# Create a host contract interface for `{}` (VM dispatch via the python\n",
contract.name
));
out.push_str("# loader trampoline — see the module header for why native dispatch is\n");
out.push_str("# impossible under ctypes).\n");
out.push_str("#\n");
out.push_str("# Arguments:\n");
out.push_str("# impl: implementation instance (must inherit from the ABC class)\n");
out.push_str("# python_bridge_lib: ctypes.CDLL handle for the python loader cdylib\n");
out.push_str("# (libpolyplug_python), e.g. polyplug_loaders_python.bridge_lib()\n");
out.push_str("#\n");
out.push_str("# Memory:\n");
out.push_str("# The returned interface (with the dispatcher and bridge anchored on its\n");
out.push_str("# `_keepalive`) must stay alive for the lifetime of the program.\n");
out.push_str(&format!(
"def {factory_name}(impl: {class_name}, python_bridge_lib: ctypes.CDLL) -> HostContractInterface:\n"
));
out.push_str(" @_HOST_DISPATCH_CALLBACK_CTYPE\n");
out.push_str(" def _dispatch(fn_id: int, args: int, out: int) -> int:\n");
out.push_str(" try:\n");
for (idx, func) in contract.functions.iter().enumerate() {
out.push_str(&format!(" if fn_id == {idx}:\n"));
generate_python_host_dispatch_args(out, &class_name, func, enums);
generate_python_host_dispatch_call(out, func);
out.push_str(" return AbiErrorCode.Ok\n");
}
out.push_str(" return AbiErrorCode.FunctionNotAvailable\n");
out.push_str(" except Exception:\n");
out.push_str(" return AbiErrorCode.Panic\n\n");
out.push_str(" bridge = _PolyplugPythonHostDispatchBridge(callback=_dispatch)\n\n");
out.push_str(" interface = HostContractInterface()\n");
out.push_str(&format!(
" interface.contract_id = 0x{contract_id:016X}\n"
));
out.push_str(&format!(
" interface.contract_version = Version(major={major}, minor={minor}, patch=0)\n"
));
out.push_str(&format!(
" interface.singleton = {singleton}\n",
singleton = python_bool(singleton)
));
out.push_str(" interface.dispatch_type = DispatchType.VirtualMachine\n");
out.push_str(" interface.runtime = 0 # Set by runtime during registration\n");
out.push_str(" # Opaque NON-NULL instance token: the native create_instance stub\n");
out.push_str(" # returns user_data as the instance data and guest callers treat a\n");
out.push_str(" # null instance.data as \"contract unavailable\".\n");
out.push_str(" interface.user_data = ctypes.addressof(bridge)\n");
out.push_str(" # ctypes cannot create struct-returning callbacks, so the instance\n");
out.push_str(" # stubs and the VM dispatch trampoline are NATIVE functions exported\n");
out.push_str(" # by the python loader cdylib.\n");
out.push_str(
" interface.create_instance = ctypes.cast(python_bridge_lib.polyplug_python_host_create_instance, type(interface.create_instance))\n",
);
out.push_str(
" interface.destroy_instance = ctypes.cast(python_bridge_lib.polyplug_python_host_destroy_instance, type(interface.destroy_instance))\n",
);
out.push_str(
" interface.dispatch.vm.call = ctypes.cast(python_bridge_lib.polyplug_python_host_vm_dispatch, type(interface.dispatch.vm.call))\n",
);
out.push_str(" interface.dispatch.vm.loader_data.data = ctypes.addressof(bridge)\n");
out.push_str(" # Anchor the bridge, dispatcher callback, and implementation on the\n");
out.push_str(" # interface object so they outlive this factory call.\n");
out.push_str(" interface._keepalive = (bridge, _dispatch, impl)\n\n");
out.push_str(" return interface\n\n");
}
fn generate_python_host_dispatch_args(
out: &mut String,
class_name: &str,
func: &ResolvedFunction,
enums: &[EnumDef],
) {
if func.params.is_empty() {
return;
}
if func.params.len() == 1 {
let param: &ResolvedParam = &func.params[0];
let ty_name: String = python_host_abi_type_name(¶m.ty);
match ¶m.ty {
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => {
out.push_str(&format!(
" {name}_sv: StringView = ctypes.cast(args, ctypes.POINTER(StringView)).contents\n",
name = param.name
));
out.push_str(&format!(
" {name}: str = ctypes.string_at({name}_sv.ptr, {name}_sv.len).decode('utf-8')\n",
name = param.name
));
}
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => {
out.push_str(&format!(
" {name}_buf: Buffer = ctypes.cast(args, ctypes.POINTER(Buffer)).contents\n",
name = param.name
));
out.push_str(&format!(
" {name}: bytes = ctypes.string_at({name}_buf.ptr, {name}_buf.len)\n",
name = param.name
));
}
ResolvedTypeRef::UserDefined(_) => {
if let Some(e) = python_enum_for_type(¶m.ty, enums) {
let raw_ctype: &str = python_ctype_for_repr(&e.repr);
out.push_str(&format!(
" {name}_raw: int = ctypes.cast(args, ctypes.POINTER({raw_ctype})).contents.value\n",
name = param.name
));
out.push_str(&format!(
" {name}: {ty} = {ty}({name}_raw)\n",
name = param.name,
ty = ty_name
));
} else {
out.push_str(&format!(
" {name}: {ty} = ctypes.cast(args, ctypes.POINTER({ty})).contents\n",
name = param.name,
ty = ty_name
));
}
}
_ => {
out.push_str(&format!(
" {name}: {ty} = ctypes.cast(args, ctypes.POINTER({ty})).contents.value\n",
name = param.name,
ty = ty_name
));
}
}
} else {
let pack_name: String = python_host_pack_struct_name(class_name, &func.name);
out.push_str(&format!(
" packed: {pack_name} = ctypes.cast(args, ctypes.POINTER({pack_name})).contents\n"
));
for param in &func.params {
match ¶m.ty {
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => {
out.push_str(&format!(
" {name}: str = ctypes.string_at(packed.{name}.ptr, packed.{name}.len).decode('utf-8')\n",
name = param.name
));
}
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => {
out.push_str(&format!(
" {name}: bytes = ctypes.string_at(packed.{name}.ptr, packed.{name}.len)\n",
name = param.name
));
}
_ => {
let ty_name: String = python_host_param_type_name(¶m.ty);
if python_enum_for_type(¶m.ty, enums).is_some() {
out.push_str(&format!(
" {name}: {ty} = {ty}(packed.{name})\n",
name = param.name,
ty = ty_name
));
} else {
out.push_str(&format!(
" {name}: {ty} = packed.{name}\n",
name = param.name,
ty = ty_name
));
}
}
}
}
}
}
fn generate_python_host_dispatch_call(out: &mut String, func: &ResolvedFunction) {
let call_args: String = func
.params
.iter()
.map(|p: &ResolvedParam| p.name.clone())
.collect::<Vec<String>>()
.join(", ");
if func.returns.is_some() {
out.push_str(&format!(
" result = impl.{func_name}({call_args})\n",
func_name = func.name
));
match func.returns.as_ref() {
Some(ResolvedTypeRef::Primitive(p)) => {
let ctype: &str = python_primitive_type(p);
out.push_str(&format!(" result_c = {ctype}(result)\n"));
out.push_str(
" ctypes.memmove(out, ctypes.byref(result_c), ctypes.sizeof(result_c))\n",
);
}
_ => {
out.push_str(
" ctypes.memmove(out, ctypes.byref(result), ctypes.sizeof(result))\n",
);
}
}
} else {
out.push_str(&format!(
" impl.{func_name}({call_args})\n",
func_name = func.name
));
}
}
fn python_bool(value: bool) -> &'static str {
if value { "True" } else { "False" }
}
fn python_host_abi_type_name(ty: &ResolvedTypeRef) -> String {
match ty {
ResolvedTypeRef::Primitive(p) => python_primitive_type(p).to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::StringView) => "StringView".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Buffer) => "Buffer".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Ptr) => "ctypes.c_void_p".to_owned(),
ResolvedTypeRef::AbiType(AbiBuiltin::Void) => "None".to_owned(),
ResolvedTypeRef::UserDefined(name) => name.clone(),
}
}
fn python_enum_for_type<'a>(ty: &ResolvedTypeRef, enums: &'a [EnumDef]) -> Option<&'a EnumDef> {
match ty {
ResolvedTypeRef::UserDefined(name) => enums.iter().find(|e: &&EnumDef| &e.name == name),
_ => None,
}
}
fn python_ctype_for_repr(repr: &ReprType) -> &'static str {
match repr {
ReprType::U8 => "ctypes.c_uint8",
ReprType::U16 => "ctypes.c_uint16",
ReprType::U32 => "ctypes.c_uint32",
ReprType::U64 => "ctypes.c_uint64",
}
}
fn python_pack_field_type(ty: &ResolvedTypeRef, enums: &[EnumDef]) -> String {
match python_enum_for_type(ty, enums) {
Some(e) => python_ctype_for_repr(&e.repr).to_owned(),
None => python_host_abi_type_name(ty),
}
}
fn generate_guest_peer_callers_file(ir: &ValidatedIr, peers: &[&ResolvedContract]) -> String {
let any_arena: bool = peers
.iter()
.any(|c: &&ResolvedContract| contract_needs_arena(c));
let mut out: String = String::new();
out.push_str("# THIS FILE IS AUTO-GENERATED BY polyplugc. DO NOT EDIT.\n");
out.push_str(
"# Re-generate with: polyplugc generate --bundle bundle.toml --lang python --out <dir>\n\n",
);
out.push_str("from __future__ import annotations\n");
out.push_str("import ctypes\n");
out.push_str("from typing import Any, Optional\n\n");
if any_arena {
out.push_str("from polyplug_abi import (\n");
out.push_str(" AbiError,\n");
out.push_str(" AbiErrorCode,\n");
out.push_str(" ArenaOverflowBlock,\n");
out.push_str(" CallArena,\n");
out.push_str(" DispatchType,\n");
out.push_str(" GuestContractHandle,\n");
out.push_str(" GuestContractInstance,\n");
out.push_str(" GuestContractInterface,\n");
out.push_str(" HostApi,\n");
out.push_str(" StringView,\n");
out.push_str(")\n\n");
} else {
out.push_str("from polyplug_abi import (\n");
out.push_str(" AbiError,\n");
out.push_str(" AbiErrorCode,\n");
out.push_str(" DispatchType,\n");
out.push_str(" GuestContractHandle,\n");
out.push_str(" GuestContractInstance,\n");
out.push_str(" GuestContractInterface,\n");
out.push_str(" HostApi,\n");
out.push_str(" StringView,\n");
out.push_str(")\n\n");
}
let mut type_imports: BTreeSet<String> = BTreeSet::new();
for contract in peers {
for func in &contract.functions {
collect_type_refs(&mut type_imports, &func.params, &func.returns);
}
}
if !type_imports.is_empty() {
let import_list: String = type_imports.into_iter().collect::<Vec<String>>().join(", ");
out.push_str(&format!("from guest.types import {import_list}\n\n"));
}
if any_arena {
out.push_str(&format!("CALL_ARENA_BUF_LEN: int = {CALL_ARENA_BUF_LEN}\n"));
out.push_str("_OVERFLOW_BLOCK_ALIGN: int = ctypes.sizeof(ctypes.c_void_p)\n\n");
out.push_str("def _arena_reset(arena: CallArena) -> None:\n");
out.push_str(" \"\"\"Rewind the arena for reuse: retain all overflow blocks.\"\"\"\n");
out.push_str(" arena.cur = arena.base\n");
out.push_str(" block: int = arena.first_overflow or 0\n");
out.push_str(" header_size: int = ctypes.sizeof(ArenaOverflowBlock)\n");
out.push_str(" while block:\n");
out.push_str(" hdr: ArenaOverflowBlock = ArenaOverflowBlock.from_address(block)\n");
out.push_str(" hdr.used = header_size\n");
out.push_str(" block = hdr.next or 0\n");
out.push('\n');
out.push_str("def _arena_free_all(arena: CallArena, host: ctypes.c_void_p) -> None:\n");
out.push_str(" \"\"\"Free every overflow block and clear the chain (teardown).\"\"\"\n");
out.push_str(" block: int = arena.first_overflow or 0\n");
out.push_str(" host_api: Any = ctypes.cast(host, ctypes.POINTER(HostApi))\n");
out.push_str(" while block:\n");
out.push_str(" hdr: ArenaOverflowBlock = ArenaOverflowBlock.from_address(block)\n");
out.push_str(" next_block: int = hdr.next or 0\n");
out.push_str(" capacity: int = hdr.capacity\n");
out.push_str(" if arena.host:\n");
out.push_str(
" host_api.contents.free(host, block, capacity, _OVERFLOW_BLOCK_ALIGN)\n",
);
out.push_str(" block = next_block\n");
out.push_str(" arena.first_overflow = None\n\n");
}
out.push_str(
"_DISPATCH_FN_CTYPE = ctypes.CFUNCTYPE(None, GuestContractInstance, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p)\n\n",
);
for contract in peers {
let min_ver: u32 = peer_min_version(ir, contract.contract_id);
generate_peer_caller_class(&mut out, contract, min_ver, &ir.enums);
}
out
}
fn generate_guest_peer_callers_stub(ir: &ValidatedIr, peers: &[&ResolvedContract]) -> String {
let mut out: String = String::new();
out.push_str("# THIS FILE IS AUTO-GENERATED BY polyplugc. DO NOT EDIT.\n");
out.push_str(
"# Re-generate with: polyplugc generate --bundle bundle.toml --lang python --out <dir>\n\n",
);
out.push_str("from __future__ import annotations\n");
out.push_str("from typing import Optional\n\n");
out.push_str("from polyplug_abi import GuestContractInstance, StringView\n\n");
for contract in peers {
let class_name: String = format!("{}Peer", contract_name_to_camel(&contract.name));
let min_ver: u32 = peer_min_version(ir, contract.contract_id);
out.push_str(&format!("class {class_name}:\n"));
out.push_str(" @classmethod\n");
out.push_str(&format!(
" def resolve(cls, host_ptr: int, min_version: int = {min_ver}) -> Optional[Self]: ...\n"
));
out.push_str(" def __del__(self) -> None: ...\n");
out.push_str(" def is_valid(self) -> bool: ...\n");
for func in &contract.functions {
let sig_params: String = build_python_sig_params(func);
let ret_type: String = python_return_type(&func.returns);
out.push_str(&format!(
" def {name}(self{sig_params}) -> {ret_type}: ...\n",
name = func.name,
));
}
out.push('\n');
}
out
}
fn generate_peer_caller_class(
out: &mut String,
contract: &ResolvedContract,
min_version: u32,
enums: &[EnumDef],
) {
let class_name: String = format!("{}Peer", contract_name_to_camel(&contract.name));
let needs_arena: bool = contract_needs_arena(contract);
out.push_str(&format!(
"# Peer caller for guest contract `{}` (id=0x{:016X})\n",
contract.name, contract.contract_id
));
out.push_str(&format!("class {class_name}:\n"));
out.push_str(" \"\"\"Guest→guest peer caller dispatching directly through the cached peer interface.\n\n");
out.push_str(
" Per-instance state (interface, instance, arena) is instance-owned; the host\n",
);
out.push_str(" pointer is passed explicitly to resolve() by the calling implementation\n");
out.push_str(" (which received it from its author factory) — no SDK-level host storage.\n");
out.push_str(" \"\"\"\n\n");
out.push_str(
" def __init__(self, host_ptr: int, handle: GuestContractHandle, interface: int, instance: GuestContractInstance) -> None:\n",
);
out.push_str(" self._host_ptr: int = host_ptr\n");
out.push_str(
" # Retain the handle so the cache can re-resolve after a hot-reload (which\n",
);
out.push_str(" # swaps a new interface into the same slot) or report a gone peer.\n");
out.push_str(" self._handle: GuestContractHandle = handle\n");
out.push_str(" self._interface: int = interface\n");
out.push_str(" self._instance: GuestContractInstance = instance\n");
out.push_str(
" # Fetch the registry revision counter ONCE, then read its current value, so\n",
);
out.push_str(
" # every later call can detect a reload/unload with a direct atomic load (one\n",
);
out.push_str(
" # aligned 64-bit load through the cached pointer, no call into the runtime)\n",
);
out.push_str(" # and re-resolve before dispatching.\n");
out.push_str(" host: Any = ctypes.cast(host_ptr, ctypes.POINTER(HostApi))\n");
out.push_str(
" self._revision_ptr: int = host.contents.revision_counter(host_ptr) or 0\n",
);
out.push_str(" self._cached_revision: int = self._live_revision()\n");
if needs_arena {
out.push_str(
" # Per-caller call arena backed by C-heap memory (not Python GC heap).\n",
);
out.push_str(
" self._arena_buf: Any = ctypes.create_string_buffer(CALL_ARENA_BUF_LEN)\n",
);
out.push_str(
" buf_addr: int = ctypes.cast(self._arena_buf, ctypes.c_void_p).value or 0\n",
);
out.push_str(" self._arena: CallArena = CallArena(\n");
out.push_str(" cur=buf_addr,\n");
out.push_str(" end=buf_addr + CALL_ARENA_BUF_LEN,\n");
out.push_str(" base=buf_addr,\n");
out.push_str(" host=host_ptr,\n");
out.push_str(" first_overflow=None,\n");
out.push_str(" )\n");
}
out.push('\n');
out.push_str(" @classmethod\n");
out.push_str(&format!(
" def resolve(cls, host_ptr: int, min_version: int = {min_version}) -> Optional[Self]:\n"
));
out.push_str(" \"\"\"Discover and resolve the peer contract through the host.\n\n");
out.push_str(" Args:\n");
out.push_str(" host_ptr: the HostApi pointer the author factory received\n");
out.push_str(" (polyplug_create_<plugin>).\n");
out.push_str(" min_version: minimum major version to accept.\n\n");
out.push_str(" Returns:\n");
out.push_str(" Self if the contract is found and resolved, None otherwise.\n");
out.push_str(" \"\"\"\n");
out.push_str(" if host_ptr == 0:\n");
out.push_str(" return None\n");
out.push_str(" host: Any = ctypes.cast(host_ptr, ctypes.POINTER(HostApi))\n");
out.push_str(&format!(
" handle: GuestContractHandle = host.contents.find_guest_contract(host_ptr, 0x{:016X}, min_version)\n",
contract.contract_id
));
out.push_str(" # Pass handle straight to resolve_guest_contract — do not inspect\n");
out.push_str(
" # handle fields. resolve_guest_contract returns null for invalid handles.\n",
);
out.push_str(
" interface: int = host.contents.resolve_guest_contract(host_ptr, handle)\n",
);
out.push_str(" if not interface:\n");
out.push_str(" return None\n");
out.push_str(" # A null instance is valid for stateless contracts.\n");
out.push_str(" # Route creation through the host so the runtime tracks the instance.\n");
out.push_str(
" # create_guest_instance is an out-param ABI fn: (this, interface, args, out_instance) -> None.\n",
);
out.push_str(" instance: GuestContractInstance = GuestContractInstance()\n");
out.push_str(
" host.contents.create_guest_instance(host_ptr, interface, None, ctypes.byref(instance))\n",
);
out.push_str(" return cls(host_ptr, handle, interface, instance)\n\n");
out.push_str(" def __del__(self) -> None:\n");
out.push_str(
" \"\"\"Destroy the peer instance via host-mediated destroy_guest_instance.\"\"\"\n",
);
out.push_str(" if not getattr(self, \"_interface\", None):\n");
out.push_str(" return\n");
out.push_str(
" # If the registry changed since we resolved, the cached interface and instance\n",
);
out.push_str(
" # are stale — a reload/unload reclaimed their backing — so destroying through\n",
);
out.push_str(
" # the dead interface would be UB; the reload/unload already reclaimed the\n",
);
out.push_str(" # instance, so skip the destroy entirely.\n");
out.push_str(" if self._live_revision() != self._cached_revision:\n");
out.push_str(" return\n");
out.push_str(" host: Any = ctypes.cast(self._host_ptr, ctypes.POINTER(HostApi))\n");
out.push_str(" host.contents.destroy_guest_instance(self._host_ptr, self._interface, self._instance)\n");
out.push_str(" self._interface = 0\n");
if needs_arena {
out.push_str(" if getattr(self, \"_arena\", None) is not None:\n");
out.push_str(" _arena_free_all(self._arena, self._host_ptr)\n");
}
out.push('\n');
out.push_str(" def is_valid(self) -> bool:\n");
out.push_str(" \"\"\"Return True if the peer interface is resolved and live.\"\"\"\n");
out.push_str(" return bool(getattr(self, \"_interface\", None))\n\n");
out.push_str(" def _live_revision(self) -> int:\n");
out.push_str(
" \"\"\"Read the registry revision through the cached pointer — one aligned\n",
);
out.push_str(
" atomic load, no call into the runtime. Returns the cached value (\"unchanged\")\n",
);
out.push_str(
" when there is no counter (null host/runtime), making the check a no-op.\n",
);
out.push_str(" \"\"\"\n");
out.push_str(" if not self._revision_ptr:\n");
out.push_str(" return self._cached_revision\n");
out.push_str(" return ctypes.c_uint64.from_address(self._revision_ptr).value\n\n");
out.push_str(" def _revalidate(self) -> bool:\n");
out.push_str(
" \"\"\"Re-resolve the cached peer interface after the registry changed under us.\n\n",
);
out.push_str(
" A hot-reload swapped a new interface into the same slot, so the retained\n",
);
out.push_str(
" handle still resolves — to the new interface; an unload vacated the slot,\n",
);
out.push_str(" so it resolves to null and False is returned (the peer is gone).\n\n");
out.push_str(
" The old instance is ABANDONED, never destroyed: after a reload its interface\n",
);
out.push_str(
" — and the guest-side state it created — is already epoch-reclaimed, so calling\n",
);
out.push_str(
" the dead interface's destroy_instance would be UB. A fresh instance is created\n",
);
out.push_str(" on the new interface and cached for direct dispatch.\n");
out.push_str(" \"\"\"\n");
out.push_str(" host: Any = ctypes.cast(self._host_ptr, ctypes.POINTER(HostApi))\n");
out.push_str(
" interface: int = host.contents.resolve_guest_contract(self._host_ptr, self._handle)\n",
);
out.push_str(" if not interface:\n");
out.push_str(" return False\n");
out.push_str(" instance: GuestContractInstance = GuestContractInstance()\n");
out.push_str(
" host.contents.create_guest_instance(self._host_ptr, interface, None, ctypes.byref(instance))\n",
);
out.push_str(" self._interface = interface\n");
out.push_str(" self._instance = instance\n");
out.push_str(" self._cached_revision = self._live_revision()\n");
out.push_str(" return True\n\n");
for func in &contract.functions {
generate_peer_caller_method(out, func, &contract_name_to_struct(&contract.name), enums);
}
out.push('\n');
}
fn generate_peer_caller_method(
out: &mut String,
func: &ResolvedFunction,
contract_struct: &str,
enums: &[EnumDef],
) {
let fn_id: u32 = func.function_id;
let needs_arena: bool = fn_needs_arena(func);
let sig_params: String = build_python_sig_params(func);
let ret_type: String = python_return_type(&func.returns);
out.push_str(&format!(
" def {name}(self{sig_params}) -> {ret_type}:\n",
name = func.name,
));
out.push_str(
" if self._live_revision() != self._cached_revision and not self._revalidate():\n",
);
out.push_str(
" raise RuntimeError(f\"peer call failed (code {int(AbiErrorCode.NotFound)})\")\n",
);
if needs_arena {
out.push_str(
" # Rewind arena at call start; prior returned views are invalidated.\n",
);
out.push_str(" _arena_reset(self._arena)\n");
}
emit_python_host_args_setup(out, func, contract_struct, enums);
emit_python_host_out_setup(out, &func.returns, enums);
out.push_str(" # SAFETY: the interface pointer is valid for the wrapper lifetime.\n");
out.push_str(" iface_ptr: ctypes.POINTER(GuestContractInterface) = ctypes.cast(self._interface, ctypes.POINTER(GuestContractInterface))\n");
out.push_str(" interface: GuestContractInterface = iface_ptr.contents\n");
out.push_str(
" # Out-param ABI: dispatch writes the AbiError through a trailing pointer.\n",
);
out.push_str(" err: AbiError = AbiError()\n");
out.push_str(" if interface.dispatch_type == DispatchType.Native:\n");
out.push_str(&format!(
" if {fn_id} >= interface.dispatch.native.function_count:\n"
));
out.push_str(" raise RuntimeError(f\"peer call failed (code {int(AbiErrorCode.FunctionNotAvailable)})\")\n");
out.push_str(" functions_ptr: int = interface.dispatch.native.functions\n");
out.push_str(&format!(
" fn_ptr: int = ctypes.cast(functions_ptr + {fn_id} * 8, ctypes.POINTER(ctypes.c_void_p)).contents.value\n"
));
out.push_str(
" dispatch_fn: _DISPATCH_FN_CTYPE = ctypes.cast(fn_ptr, _DISPATCH_FN_CTYPE)\n",
);
out.push_str(
" # SAFETY: instance is valid for the wrapper lifetime; args_ptr points\n",
);
out.push_str(
" # to valid args, out_ptr to a valid return-type buffer per the ABI contract.\n",
);
out.push_str(" dispatch_fn(self._instance, args_ptr, out_ptr, ctypes.byref(err))\n");
out.push_str(" else:\n");
out.push_str(
" # SAFETY: the union's vm variant is active per dispatch_type; args/out\n",
);
if needs_arena {
out.push_str(
" # are valid per the ABI contract. The arena was reset at call start.\n",
);
out.push_str(&format!(
" interface.dispatch.vm.call(interface.dispatch.vm.loader_data, self._instance, {fn_id}, args_ptr, out_ptr, ctypes.byref(self._arena), ctypes.byref(err))\n"
));
} else {
out.push_str(
" # are valid per the ABI contract. The null arena selects the host->alloc fallback.\n",
);
out.push_str(&format!(
" interface.dispatch.vm.call(interface.dispatch.vm.loader_data, self._instance, {fn_id}, args_ptr, out_ptr, None, ctypes.byref(err))\n"
));
}
out.push_str(" if err.code != AbiErrorCode.Ok:\n");
out.push_str(" raise RuntimeError(f\"peer call failed (code {err.code})\")\n");
if has_return_value(&func.returns) {
out.push_str(&format!(
" return {}\n\n",
python_host_caller_return_expr(&func.returns, enums)
));
} else {
out.push_str(" return None\n\n");
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::expect_used)]
use super::*;
use crate::ir::ReprType;
use crate::ir::ResolvedDependency;
use crate::ir::Version;
use polyplug_codegen::ResolvedBundleFile;
#[test]
fn generate_python_enum_non_bitflag() {
let e: EnumDef = EnumDef {
name: "PixelFormat".to_owned(),
repr: ReprType::U32,
bitflag: false,
variants: vec![
EnumVariant {
name: "Unknown".to_owned(),
value: "0".to_owned(),
},
EnumVariant {
name: "Rgba8".to_owned(),
value: "1".to_owned(),
},
],
};
let mut out: String = String::new();
generate_python_enum(&mut out, &e);
assert!(
out.contains("class PixelFormat(enum.IntEnum)"),
"missing class def: {out}"
);
assert!(out.contains("UNKNOWN = 0"), "missing UNKNOWN: {out}");
assert!(out.contains("RGBA8 = 1"), "missing RGBA8: {out}");
}
#[test]
fn generate_python_enum_bitflag() {
let e: EnumDef = EnumDef {
name: "ImageFlags".to_owned(),
repr: ReprType::U32,
bitflag: true,
variants: vec![
EnumVariant {
name: "None".to_owned(),
value: "0".to_owned(),
},
EnumVariant {
name: "Compressed".to_owned(),
value: "1".to_owned(),
},
],
};
let mut out: String = String::new();
generate_python_enum(&mut out, &e);
assert!(
out.contains("class ImageFlags(enum.IntFlag)"),
"missing class def: {out}"
);
assert!(out.contains("NONE = 0"), "missing NONE: {out}");
}
#[test]
fn python_interface_factories_no_wildcard_imports() {
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![],
host_contracts: vec![ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x1234_5678_9ABC_DEF0_u64,
version: Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![
ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "message".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
}],
returns: None,
},
ResolvedFunction {
name: "log_with_level".to_owned(),
function_id: 1,
params: vec![
ResolvedParam {
name: "level".to_owned(),
ty: ResolvedTypeRef::UserDefined("LogLevel".to_owned()),
},
ResolvedParam {
name: "message".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
},
],
returns: None,
},
],
}],
bundle: None,
};
let out: String = generate_python_host_interface_factories_file(&ir);
assert!(
!out.contains("import *"),
"generated python must not contain wildcard imports: {out}"
);
assert!(
out.contains("from host.contracts import HostLogger"),
"contract classes must be imported explicitly: {out}"
);
assert!(
out.contains("from host.types import LogLevel"),
"user-defined types referenced by thunks must be imported explicitly: {out}"
);
}
#[test]
fn python_interface_factories_route_through_native_trampoline() {
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![EnumDef {
name: "LogLevel".to_owned(),
repr: ReprType::U32,
bitflag: false,
variants: vec![EnumVariant {
name: "Info".to_owned(),
value: "1".to_owned(),
}],
}],
contracts: vec![],
host_contracts: vec![ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x1234_5678_9ABC_DEF0_u64,
version: Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![
ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "message".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
}],
returns: None,
},
ResolvedFunction {
name: "log_with_level".to_owned(),
function_id: 1,
params: vec![
ResolvedParam {
name: "level".to_owned(),
ty: ResolvedTypeRef::UserDefined("LogLevel".to_owned()),
},
ResolvedParam {
name: "message".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
},
],
returns: None,
},
],
}],
bundle: None,
};
let out: String = generate_python_host_interface_factories_file(&ir);
assert!(
!out.contains("ctypes.CFUNCTYPE(AbiError"),
"no AbiError-returning ctypes callbacks may be emitted: {out}"
);
assert!(
!out.contains("ctypes.cast(None, type(interface.create_instance))"),
"create_instance must not be a NULL function pointer: {out}"
);
assert!(
out.contains("interface.create_instance = ctypes.cast(python_bridge_lib.polyplug_python_host_create_instance"),
"create_instance must use the native stub: {out}"
);
assert!(
out.contains("interface.dispatch.vm.call = ctypes.cast(python_bridge_lib.polyplug_python_host_vm_dispatch"),
"dispatch must route through the native trampoline: {out}"
);
assert!(
out.contains("interface.dispatch_type = DispatchType.VirtualMachine"),
"factories must register VM dispatch: {out}"
);
assert!(
out.contains("(\"level\", ctypes.c_uint32),"),
"enum pack fields must use the repr ctype: {out}"
);
assert!(
out.contains("level: LogLevel = LogLevel(packed.level)"),
"enum pack fields must be wrapped back into the IntEnum: {out}"
);
assert!(
out.contains("interface._keepalive = (bridge, _dispatch, impl)"),
"bridge and dispatcher must be anchored on the interface keepalive: {out}"
);
}
#[test]
fn python_host_dispatch_single_enum_param_uses_repr_ctype() {
let enums: Vec<EnumDef> = vec![EnumDef {
name: "LogLevel".to_owned(),
repr: ReprType::U32,
bitflag: false,
variants: vec![EnumVariant {
name: "Info".to_owned(),
value: "1".to_owned(),
}],
}];
let func: ResolvedFunction = ResolvedFunction {
name: "set_level".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "level".to_owned(),
ty: ResolvedTypeRef::UserDefined("LogLevel".to_owned()),
}],
returns: None,
};
let mut out: String = String::new();
generate_python_host_dispatch_args(&mut out, "HostLogger", &func, &enums);
assert!(
out.contains(
"level_raw: int = ctypes.cast(args, ctypes.POINTER(ctypes.c_uint32)).contents.value"
),
"single enum param must read the repr ctype: {out}"
);
assert!(
out.contains("level: LogLevel = LogLevel(level_raw)"),
"raw integer must be wrapped back into the IntEnum: {out}"
);
assert!(
!out.contains("ctypes.POINTER(LogLevel)"),
"ctypes.POINTER(IntEnum) is a TypeError and must never be emitted: {out}"
);
}
#[test]
fn python_generated_files_no_wildcard_imports() {
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![],
host_contracts: vec![ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x1234_5678_9ABC_DEF0_u64,
version: Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![],
returns: None,
}],
}],
bundle: None,
};
let generator: PythonGenerator = PythonGenerator;
let mut files: GeneratedFiles = GeneratedFiles::default();
generator.generate_host(&ir, &mut files).expect("host gen");
generator
.generate_guest(&ir, &mut files)
.expect("guest gen");
for file in &files.files {
assert!(
!file.content.contains("import *"),
"wildcard import in generated {}",
file.path.display()
);
}
}
#[test]
fn host_contract_name_to_python_class_basic() {
assert_eq!(
host_contract_name_to_python_class("host.logger"),
"HostLogger"
);
}
#[test]
fn host_contract_name_to_python_class_nested() {
assert_eq!(
host_contract_name_to_python_class("host.fs.reader"),
"HostFsReader"
);
}
#[test]
fn host_contract_name_to_python_class_already_has_host() {
assert_eq!(
host_contract_name_to_python_class("host.HostLogger"),
"HostLogger"
);
}
#[test]
fn python_host_param_type_stringview() {
let ty: ResolvedTypeRef = ResolvedTypeRef::AbiType(AbiBuiltin::StringView);
assert_eq!(python_host_param_type_name(&ty), "str");
}
#[test]
fn python_host_param_type_buffer() {
let ty: ResolvedTypeRef = ResolvedTypeRef::AbiType(AbiBuiltin::Buffer);
assert_eq!(python_host_param_type_name(&ty), "bytes");
}
#[test]
fn python_host_param_type_primitives() {
assert_eq!(
python_host_param_type_name(&ResolvedTypeRef::Primitive(PrimitiveType::U32)),
"int"
);
assert_eq!(
python_host_param_type_name(&ResolvedTypeRef::Primitive(PrimitiveType::I64)),
"int"
);
assert_eq!(
python_host_param_type_name(&ResolvedTypeRef::Primitive(PrimitiveType::F64)),
"float"
);
assert_eq!(
python_host_param_type_name(&ResolvedTypeRef::Primitive(PrimitiveType::Bool)),
"bool"
);
}
#[test]
fn python_host_return_type_stringview() {
let ty: ResolvedTypeRef = ResolvedTypeRef::AbiType(AbiBuiltin::StringView);
assert_eq!(python_host_return_type_name(&ty), "str");
}
#[test]
fn python_host_return_type_buffer() {
let ty: ResolvedTypeRef = ResolvedTypeRef::AbiType(AbiBuiltin::Buffer);
assert_eq!(python_host_return_type_name(&ty), "bytes");
}
#[test]
fn generate_host_contract_abc_basic() {
let contract: ResolvedHostContract = ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x123456789ABCDEF0,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![
ResolvedParam {
name: "level".to_owned(),
ty: ResolvedTypeRef::Primitive(PrimitiveType::U32),
},
ResolvedParam {
name: "message".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
},
],
returns: None,
}],
};
let mut out: String = String::new();
generate_python_host_contract_abc(&mut out, &contract);
assert!(
out.contains("class HostLogger(ABC)"),
"missing class def: {out}"
);
assert!(
out.contains("@abstractmethod"),
"missing abstractmethod: {out}"
);
assert!(
out.contains("def log(self, level: int, message: str) -> None"),
"missing method: {out}"
);
assert!(out.contains("pass"), "missing pass: {out}");
}
#[test]
fn generate_host_contract_abc_with_return() {
let contract: ResolvedHostContract = ResolvedHostContract {
name: "host.fs.reader".to_owned(),
contract_id: 0xDEADBEEF,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "read".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "path".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
}],
returns: Some(ResolvedTypeRef::AbiType(AbiBuiltin::Buffer)),
}],
};
let mut out: String = String::new();
generate_python_host_contract_abc(&mut out, &contract);
assert!(
out.contains("class HostFsReader(ABC)"),
"missing class def: {out}"
);
assert!(
out.contains("def read(self, path: str) -> bytes"),
"missing method with return: {out}"
);
}
#[test]
fn generate_host_contracts_file_empty() {
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![],
host_contracts: vec![],
bundle: None,
};
let result: String = generate_host_contracts_file(&ir);
assert!(result.contains("from abc import ABC, abstractmethod"));
assert!(!result.contains("class"));
}
#[test]
fn generate_host_contracts_file_with_contract() {
let contract: ResolvedHostContract = ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x123456789ABCDEF0,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![],
returns: None,
}],
};
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![],
host_contracts: vec![contract],
bundle: None,
};
let result: String = generate_host_contracts_file(&ir);
assert!(result.contains("class HostLogger(ABC)"));
assert!(result.contains("HOSTLOGGER_CONTRACT_ID: int = 0x123456789ABCDEF0"));
}
#[test]
fn generate_host_contracts_stub_basic() {
let contract: ResolvedHostContract = ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x123456789ABCDEF0,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "level".to_owned(),
ty: ResolvedTypeRef::Primitive(PrimitiveType::U32),
}],
returns: None,
}],
};
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![],
host_contracts: vec![contract],
bundle: None,
};
let result: String = generate_host_contracts_stub(&ir);
assert!(result.contains("class HostLogger(ABC)"));
assert!(result.contains("def log(self, level: int) -> None: ..."));
assert!(result.contains("HOSTLOGGER_CONTRACT_ID: int"));
}
#[test]
fn host_contract_name_to_python_caller_basic() {
assert_eq!(
host_contract_name_to_python_caller("host.logger"),
"HostLoggerContract"
);
}
#[test]
fn host_contract_name_to_python_caller_nested() {
assert_eq!(
host_contract_name_to_python_caller("host.fs.reader"),
"HostFsReaderContract"
);
}
#[test]
fn host_contract_name_to_python_caller_already_has_host() {
assert_eq!(
host_contract_name_to_python_caller("host.HostLogger"),
"HostLoggerContract"
);
}
#[test]
fn python_guest_caller_param_type_stringview() {
let ty: ResolvedTypeRef = ResolvedTypeRef::AbiType(AbiBuiltin::StringView);
assert_eq!(python_guest_caller_param_type_name(&ty), "str");
}
#[test]
fn python_guest_caller_param_type_buffer() {
let ty: ResolvedTypeRef = ResolvedTypeRef::AbiType(AbiBuiltin::Buffer);
assert_eq!(python_guest_caller_param_type_name(&ty), "bytes");
}
#[test]
fn python_guest_caller_param_type_primitives() {
assert_eq!(
python_guest_caller_param_type_name(&ResolvedTypeRef::Primitive(PrimitiveType::U32)),
"int"
);
assert_eq!(
python_guest_caller_param_type_name(&ResolvedTypeRef::Primitive(PrimitiveType::I64)),
"int"
);
assert_eq!(
python_guest_caller_param_type_name(&ResolvedTypeRef::Primitive(PrimitiveType::F64)),
"float"
);
assert_eq!(
python_guest_caller_param_type_name(&ResolvedTypeRef::Primitive(PrimitiveType::Bool)),
"bool"
);
}
#[test]
fn python_guest_caller_return_type_stringview() {
let ty: ResolvedTypeRef = ResolvedTypeRef::AbiType(AbiBuiltin::StringView);
assert_eq!(python_guest_caller_return_type_name(&ty), "memoryview");
}
#[test]
fn python_guest_caller_return_type_buffer() {
let ty: ResolvedTypeRef = ResolvedTypeRef::AbiType(AbiBuiltin::Buffer);
assert_eq!(python_guest_caller_return_type_name(&ty), "memoryview");
}
#[test]
fn generate_guest_host_contract_caller_basic() {
let contract: ResolvedHostContract = ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x123456789ABCDEF0,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![
ResolvedParam {
name: "level".to_owned(),
ty: ResolvedTypeRef::Primitive(PrimitiveType::U32),
},
ResolvedParam {
name: "message".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
},
],
returns: None,
}],
};
let mut out: String = String::new();
generate_python_guest_host_contract_caller(&mut out, &contract, &[]);
assert!(
out.contains("class HostLoggerContract:"),
"missing class def: {out}"
);
assert!(
out.contains(
"def __init__(self, interface: int, instance: HostContractInstance) -> None:"
),
"missing __init__: {out}"
);
assert!(
out.contains("def from_host(cls, host_ptr: int, min_version: int = 0)"),
"missing from_host: {out}"
);
assert!(
out.contains("host.contents.resolve_host_contract_interface(host_ptr,"),
"from_host must resolve the interface: {out}"
);
assert!(
out.contains(
"instance: HostContractInstance = host.contents.get_host_contract(host_ptr,"
),
"from_host must obtain the instance: {out}"
);
assert!(
out.contains("dispatch_fn(self._instance.data, args_ptr, out_ptr, ctypes.byref(err))"),
"native dispatch must pass instance.data and out_err: {out}"
);
assert!(
!out.contains("impl_ptr"),
"must not read the nonexistent NativeDispatch.impl_ptr: {out}"
);
assert!(
out.contains("def is_valid(self) -> bool:"),
"missing is_valid: {out}"
);
assert!(
out.contains("def log(self, level: int, message: str) -> None:"),
"missing method: {out}"
);
}
#[test]
fn generate_guest_host_contract_caller_with_return() {
let contract: ResolvedHostContract = ResolvedHostContract {
name: "host.fs.reader".to_owned(),
contract_id: 0xDEADBEEF,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "read".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "path".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
}],
returns: Some(ResolvedTypeRef::AbiType(AbiBuiltin::Buffer)),
}],
};
let mut out: String = String::new();
generate_python_guest_host_contract_caller(&mut out, &contract, &[]);
assert!(
out.contains("class HostFsReaderContract:"),
"missing class def: {out}"
);
assert!(
out.contains("def read(self, path: str) -> memoryview:"),
"missing method with memoryview return: {out}"
);
}
#[test]
fn generate_guest_host_contracts_file_empty() {
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![],
host_contracts: vec![],
bundle: None,
};
let result: String = generate_guest_host_contracts_file(&ir);
assert!(result.contains("from polyplug_abi import"));
assert!(!result.contains("class"));
}
#[test]
fn generate_guest_host_contracts_file_with_contract() {
let contract: ResolvedHostContract = ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x123456789ABCDEF0,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![],
returns: None,
}],
};
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![],
host_contracts: vec![contract],
bundle: None,
};
let result: String = generate_guest_host_contracts_file(&ir);
assert!(result.contains("class HostLoggerContract:"));
assert!(result.contains("HOSTLOGGERCONTRACT_ID: int = 0x123456789ABCDEF0"));
}
#[test]
fn generate_guest_host_contracts_stub_basic() {
let contract: ResolvedHostContract = ResolvedHostContract {
name: "host.logger".to_owned(),
contract_id: 0x123456789ABCDEF0,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
singleton: false,
functions: vec![ResolvedFunction {
name: "log".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "level".to_owned(),
ty: ResolvedTypeRef::Primitive(PrimitiveType::U32),
}],
returns: None,
}],
};
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![],
host_contracts: vec![contract],
bundle: None,
};
let result: String = generate_guest_host_contracts_stub(&ir);
assert!(result.contains("class HostLoggerContract:"));
assert!(result.contains(
"def __init__(self, interface: int, instance: HostContractInstance) -> None: ..."
));
assert!(result.contains(
"def from_host(cls, host_ptr: int, min_version: int = 0) -> Self | None: ..."
));
assert!(result.contains("def is_valid(self) -> bool: ..."));
assert!(result.contains("def log(self, level: int) -> None: ..."));
assert!(result.contains("HOSTLOGGERCONTRACT_ID: int"));
}
fn make_validator_contract() -> ResolvedContract {
ResolvedContract {
name: "pipeline.Validator".to_owned(),
contract_id: 0xABCD_EF01_2345_6789,
version: crate::ir::Version {
major: 1,
minor: 0,
patch: 0,
},
functions: vec![ResolvedFunction {
name: "validate".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "input".to_owned(),
ty: ResolvedTypeRef::AbiType(AbiBuiltin::StringView),
}],
returns: Some(ResolvedTypeRef::AbiType(AbiBuiltin::StringView)),
}],
}
}
fn make_bundle_with_dep(contract_id: u64) -> crate::ir::ResolvedBundle {
ResolvedBundle {
name: "python_transformer".to_owned(),
version: Version {
major: 1,
minor: 0,
patch: 0,
},
loader: "python".to_owned(),
file: ResolvedBundleFile::Single("transformer.py".to_owned()),
plugins: vec![],
bundle_id: 0,
dependencies: vec![ResolvedDependency::ByContract {
contract: "pipeline.Validator".to_owned(),
contract_id,
min_version: 1,
}],
needs_reinit_on_dep_reload: false,
}
}
#[test]
fn peer_caller_emitted_for_declared_dependency() {
let contract: ResolvedContract = make_validator_contract();
let bundle: crate::ir::ResolvedBundle = make_bundle_with_dep(contract.contract_id);
let ir: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![contract],
host_contracts: vec![],
bundle: Some(bundle),
};
let peers: Vec<&ResolvedContract> = collect_peer_contracts(&ir);
assert!(
!peers.is_empty(),
"collect_peer_contracts must return the matching contract"
);
let result: String = generate_guest_peer_callers_file(&ir, &peers);
assert!(
result.contains("class PipelineValidatorPeer:"),
"peer class must be emitted: {result}"
);
assert!(
result.contains("def resolve(cls, host_ptr: int, min_version: int"),
"resolve() must take host_ptr explicitly: {result}"
);
assert!(
!result.contains("get_host_interface"),
"resolve() must not read a stored host pointer: {result}"
);
assert!(
result.contains("interface.dispatch_type == DispatchType.Native"),
"peer dispatch must branch on the interface dispatch_type: {result}"
);
assert!(
result.contains("interface.dispatch.native.functions"),
"Native peer dispatch must call the native function pointer: {result}"
);
assert!(
result.contains("interface.dispatch.vm.call(interface.dispatch.vm.loader_data"),
"VM peer dispatch must route through the vm.call trampoline: {result}"
);
assert!(
result.contains(&format!("0x{:016X}", 0xABCD_EF01_2345_6789_u64)),
"must embed the contract_id in find_guest_contract call: {result}"
);
assert!(
!result.contains("_HOST_PTR"),
"must not store host_ptr in a module global: {result}"
);
}
#[test]
fn no_peer_callers_without_dependencies() {
let contract: ResolvedContract = make_validator_contract();
let ir_no_dep: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![contract],
host_contracts: vec![],
bundle: None,
};
let peers_no_bundle: Vec<&ResolvedContract> = collect_peer_contracts(&ir_no_dep);
assert!(
peers_no_bundle.is_empty(),
"collect_peer_contracts must return empty when bundle is None"
);
let contract2: ResolvedContract = make_validator_contract();
let bundle_wrong_dep: crate::ir::ResolvedBundle =
make_bundle_with_dep(0xDEAD_BEEF_CAFE_BABE);
let ir_wrong_dep: ValidatedIr = ValidatedIr {
types: vec![],
enums: vec![],
contracts: vec![contract2],
host_contracts: vec![],
bundle: Some(bundle_wrong_dep),
};
let peers_wrong: Vec<&ResolvedContract> = collect_peer_contracts(&ir_wrong_dep);
assert!(
peers_wrong.is_empty(),
"collect_peer_contracts must return empty when dep id does not match any contract"
);
}
fn pixel_format_enums() -> Vec<EnumDef> {
vec![EnumDef {
name: "PixelFormat".to_owned(),
repr: ReprType::U32,
bitflag: false,
variants: vec![EnumVariant {
name: "Rgba8".to_owned(),
value: "1".to_owned(),
}],
}]
}
#[test]
fn python_host_caller_single_enum_param_boxes_repr_ctype() {
let func: ResolvedFunction = ResolvedFunction {
name: "set_format".to_owned(),
function_id: 0,
params: vec![ResolvedParam {
name: "fmt".to_owned(),
ty: ResolvedTypeRef::UserDefined("PixelFormat".to_owned()),
}],
returns: None,
};
let mut out: String = String::new();
emit_python_host_args_setup(&mut out, &func, "ImageCodecContract", &pixel_format_enums());
assert!(
out.contains("fmt_val: ctypes.c_uint32 = ctypes.c_uint32(int(fmt))"),
"enum param must be boxed in its repr ctype: {out}"
);
assert!(
out.contains("ctypes.byref(fmt_val)"),
"the repr box's address must be passed: {out}"
);
assert!(
!out.contains("ctypes.byref(fmt)"),
"ctypes.byref(IntEnum) is a TypeError and must never be emitted: {out}"
);
}
#[test]
fn python_host_caller_enum_return_uses_repr_ctype_and_converts_back() {
let returns: Option<ResolvedTypeRef> =
Some(ResolvedTypeRef::UserDefined("PixelFormat".to_owned()));
let mut out: String = String::new();
emit_python_host_out_setup(&mut out, &returns, &pixel_format_enums());
assert!(
out.contains("out_val: ctypes.c_uint32 = ctypes.c_uint32()"),
"enum return must allocate a repr-ctype out slot: {out}"
);
assert!(
!out.contains("PixelFormat()"),
"IntEnum has no zero-arg constructor and must never be instantiated: {out}"
);
let ret_expr: String = python_host_caller_return_expr(&returns, &pixel_format_enums());
assert_eq!(
ret_expr, "PixelFormat(out_val.value)",
"the raw repr integer must convert back into the IntEnum"
);
}
#[test]
fn python_guest_host_contract_enum_return_uses_repr_ctype() {
let returns: Option<ResolvedTypeRef> =
Some(ResolvedTypeRef::UserDefined("PixelFormat".to_owned()));
let mut out: String = String::new();
emit_python_guest_host_contract_out_setup(&mut out, &returns, &pixel_format_enums());
assert!(
out.contains("result: ctypes.c_uint32 = ctypes.c_uint32()"),
"enum return must allocate a repr-ctype out slot: {out}"
);
let mut ret: String = String::new();
emit_python_guest_host_contract_success_return(&mut ret, &returns, &pixel_format_enums());
assert!(
ret.contains("return PixelFormat(result.value)"),
"the raw repr integer must convert back into the IntEnum: {ret}"
);
}
#[test]
fn python_host_caller_arg_pack_enum_field_uses_repr_ctype() {
let func: ResolvedFunction = ResolvedFunction {
name: "convert".to_owned(),
function_id: 0,
params: vec![
ResolvedParam {
name: "fmt".to_owned(),
ty: ResolvedTypeRef::UserDefined("PixelFormat".to_owned()),
},
ResolvedParam {
name: "count".to_owned(),
ty: ResolvedTypeRef::Primitive(PrimitiveType::U32),
},
],
returns: None,
};
let mut out: String = String::new();
emit_python_arg_pack_struct(&mut out, "ImageCodecContract", &func, &pixel_format_enums());
assert!(
out.contains("(\"fmt\", ctypes.c_uint32)"),
"enum pack fields must use the repr ctype — IntEnum in _fields_ is a TypeError: {out}"
);
assert!(
!out.contains("(\"fmt\", PixelFormat)"),
"IntEnum class must never appear in _fields_: {out}"
);
}
}