import json
import pathlib
import re
import subprocess
# Directory that contains this generator script; every output path hangs off it.
ROOT = pathlib.Path(__file__).resolve().parent
# Parent of ROOT; used as the working directory for the cargo introspection run.
API_DIR = ROOT.parent
# Generated files, one per binding layer (all written by main()).
PY_RS_OUT = ROOT / "python" / "src" / "generated.rs"
PY_OUT = ROOT / "python" / "python" / "nightshade" / "_generated.py"
GO_OUT = ROOT / "go" / "commands_gen.go"
C_RS_OUT = ROOT / "c" / "src" / "generated.rs"
C_H_OUT = ROOT / "c" / "include" / "nightshade_commands.h"
# Suffixes appended to a field name when a vector is flattened into scalar parameters.
VEC_SUFFIX = ["x", "y", "z", "w"]
# Type names that is_enum_type() does NOT treat as enums when the role is "copy".
PRIMITIVE_COPY = {"f32", "u32", "i32", "u8", "usize", "bool", "[f32; 2]", "[f32; 3]", "[f32; 4]"}
# Component count for each fixed-size float array type name.
ARRAY_LEN = {"[f32; 2]": 2, "[f32; 3]": 3, "[f32; 4]": 4}
# Scalar type-name tables keyed by the Rust type name: C type, Go type,
# cgo conversion type, and the Rust type itself.
C_SCALAR = {"f32": "float", "u32": "uint32_t", "i32": "int32_t", "usize": "uintptr_t", "bool": "bool", "u8": "uint8_t"}
GO_SCALAR = {"f32": "float64", "u32": "uint32", "i32": "int32", "usize": "int", "bool": "bool", "u8": "uint8"}
GO_C_SCALAR = {"f32": "C.float", "u32": "C.uint32_t", "i32": "C.int32_t", "usize": "C.uintptr_t", "bool": "C.bool", "u8": "C.uint8_t"}
RUST_SCALAR = {"f32": "f32", "u32": "u32", "i32": "i32", "usize": "usize", "bool": "bool", "u8": "u8"}
def run(args, cwd):
    """Run a subprocess and return its completed result.

    Args:
        args: Command and arguments as a list (no shell is involved).
        cwd: Working directory for the child process.

    Returns:
        The ``subprocess.CompletedProcess`` with stdout/stderr captured as text.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero status.
    """
    import sys

    try:
        return subprocess.run(args, cwd=cwd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as error:
        # stderr is captured, so without this the child's diagnostics would be
        # discarded and only the exit status would be reported.
        if error.stderr:
            sys.stderr.write(error.stderr)
        raise
def load_introspection():
    """Run the ``dump_manifest`` cargo example and parse the JSON it prints.

    Returns:
        A ``(manifest, command_schema)`` pair read from the top-level
        ``"manifest"`` and ``"command_schema"`` keys of the output.
    """
    command = ["cargo", "run", "-q", "--example", "dump_manifest"]
    payload = json.loads(run(command, API_DIR).stdout)
    manifest = payload["manifest"]
    command_schema = payload["command_schema"]
    return manifest, command_schema
def snake(name):
    """Convert a PascalCase/camelCase identifier to lower snake_case.

    An underscore is inserted before every ASCII uppercase letter except one
    at the very start, then the whole string is lowercased.
    """
    pieces = []
    for position, char in enumerate(name):
        if position and "A" <= char <= "Z":
            pieces.append("_")
        pieces.append(char)
    return "".join(pieces).lower()
def camel(name):
    """Convert a snake_case identifier to lowerCamelCase.

    The first segment is kept verbatim; every later segment is capitalized
    (first letter upper, remainder lower).
    """
    segments = name.split("_")
    return segments[0] + "".join(map(str.capitalize, segments[1:]))
def rust_enum(type_name):
    """Return the last ``::``-separated segment of a Rust type path."""
    _prefix, _separator, leaf = type_name.rpartition("::")
    return leaf
def c_enum(type_name):
    """Return the C-side name for a Rust enum: ``Ns`` plus its unqualified name."""
    return f"Ns{rust_enum(type_name)}"
def screaming(name):
    """Convert a PascalCase/camelCase identifier to SCREAMING_SNAKE_CASE.

    An underscore is inserted before every ASCII uppercase letter except one
    at the very start, then the whole string is uppercased.
    """
    return "".join(
        "_" + char if position and "A" <= char <= "Z" else char
        for position, char in enumerate(name)
    ).upper()
def is_enum_type(type_name, role):
    """Decide whether a field's type is treated as an enum.

    "owned" fields are enums unless the type is ``String``; "copy" fields are
    enums unless the type is listed in PRIMITIVE_COPY; every other role is not.
    """
    if role == "owned":
        return type_name != "String"
    return role == "copy" and type_name not in PRIMITIVE_COPY
def field_schema(schema, variant, field):
    """Look up the JSON schema of one field of one command variant.

    Scans ``schema["oneOf"]`` for the first entry whose ``properties`` contain
    ``variant`` and returns that variant's ``properties[field]``.

    Returns:
        The field's schema, or None when the variant or the field is absent.
    """
    for alternative in schema["oneOf"]:
        try:
            variant_schema = alternative["properties"][variant]
        except KeyError:
            # Entry without properties, or describing a different variant.
            continue
        return variant_schema.get("properties", {}).get(field)
    return None
def classify_payload(payload):
    """Map a JSON-schema payload description to a scalar tag.

    Returns:
        ``"f32"`` for numbers, ``"u32"`` for integers, ``"f32xN"`` for number
        arrays whose ``maxItems`` is 2, 3 or 4, and ``"bytes"`` for integer
        arrays without ``maxItems``.

    Raises:
        SystemExit: For any other payload shape.
    """
    kind = payload.get("type")
    if kind in ("number", "integer"):
        return "f32" if kind == "number" else "u32"
    if kind == "array":
        element = payload.get("items", {}).get("type")
        length = payload.get("maxItems")
        if element == "number" and length in (2, 3, 4):
            return f"f32x{length}"
        if element == "integer" and length is None:
            return "bytes"
    raise SystemExit(f"unhandled enum payload: {payload}")
def enum_variants(enum_schema):
    """Extract the ordered variant list from an enum's JSON schema.

    Args:
        enum_schema: Either a plain ``{"enum": [...]}`` schema or a ``oneOf``
            schema whose entries are unit variants (``const`` or a grouped
            ``enum`` list) or single-property objects carrying a payload.

    Returns:
        A list of ``(name, payload)`` pairs in schema order. ``payload`` is
        None for unit variants, ``("struct", [(field, scalar), ...])`` for
        struct variants, and ``("tuple", scalar)`` for single-value variants.

    Raises:
        SystemExit: If the schema or one of its entries has an unsupported shape.
    """
    if "oneOf" not in enum_schema:
        if "enum" in enum_schema:
            return [(name, None) for name in enum_schema["enum"]]
        raise SystemExit(f"unhandled enum schema: {enum_schema}")
    variants = []
    for entry in enum_schema["oneOf"]:
        if "const" in entry:
            variants.append((entry["const"], None))
            continue
        if "enum" in entry:
            # Several unit variants grouped into one string subschema.
            variants.extend((name, None) for name in entry["enum"])
            continue
        properties = entry.get("properties")
        if not properties:
            raise SystemExit(f"unhandled enum variant: {entry}")
        # Payload-carrying variants are objects whose single key is the variant name.
        name = next(iter(properties))
        payload = properties[name]
        if payload.get("type") == "object":
            fields = [(field, classify_payload(spec)) for field, spec in payload["properties"].items()]
            variants.append((name, ("struct", fields)))
        else:
            variants.append((name, ("tuple", classify_payload(payload))))
    return variants
def collect_enums(commands, schema):
    """Collect every enum type referenced by a command field.

    Args:
        commands: Command descriptions, each with ``"name"`` and ``"fields"``.
        schema: The command JSON schema used to look up each enum's variants.

    Returns:
        A dict keyed by the field's type name. Each value holds the Rust name,
        the C name, whether all variants are payload-free (``"plain"``), and
        the variant list. The first field that mentions a type defines it.

    Raises:
        SystemExit: If an enum field has no entry in ``schema``.
    """
    enums = {}
    for command in commands:
        for field in command["fields"]:
            type_name = field["type_name"]
            if not is_enum_type(type_name, field["role"]) or type_name in enums:
                continue
            spec = field_schema(schema, command["name"], field["name"])
            if spec is None:
                # Fail with the offending field named instead of a TypeError
                # raised later while inspecting a missing schema.
                raise SystemExit(
                    f"no schema for enum field {command['name']}.{field['name']}: {type_name}"
                )
            variants = enum_variants(spec)
            plain = all(payload is None for _name, payload in variants)
            enums[type_name] = {
                "rust": rust_enum(type_name),
                "c": c_enum(type_name),
                "plain": plain,
                "variants": variants,
            }
    return enums
def data_struct_fields(variants):
    """Flatten variant payloads into one ordered list of ``(name, scalar)`` pairs.

    Struct variants contribute one ``<variant>_<field>`` entry per field;
    single-value variants contribute one entry named after the variant.
    Unit variants contribute nothing.
    """
    flattened = []
    for variant, payload in variants:
        if payload is None:
            continue
        shape, detail = payload
        prefix = snake(variant)
        if shape == "struct":
            flattened.extend((f"{prefix}_{member}", scalar) for member, scalar in detail)
        else:
            flattened.append((prefix, detail))
    return flattened
def rust_field_type(scalar):
    """Return the Rust type for a scalar tag (``f32xN`` becomes ``[f32; N]``)."""
    prefix = "f32x"
    if scalar.startswith(prefix):
        return "[f32; " + scalar[len(prefix):] + "]"
    return RUST_SCALAR.get(scalar, scalar)
def variant_constructor(rust_enum_name, name, payload, source):
    """Render the Rust expression that constructs one enum variant.

    Args:
        rust_enum_name: Unqualified Rust enum name.
        name: Variant name.
        payload: None, ``("struct", fields)`` or ``("tuple", scalar)``.
        source: Callable ``(flattened_field_name, scalar) -> str`` giving the
            Rust expression that supplies each payload value.
    """
    path = f"{rust_enum_name}::{name}"
    if payload is None:
        return path
    kind, body = payload
    prefix = snake(name)
    if kind != "struct":
        return f"{path}({source(prefix, body)})"
    assignments = ", ".join(
        f"{field}: {source(f'{prefix}_{field}', scalar)}" for field, scalar in body
    )
    return f"{path} {{ {assignments} }}"
# Reply kinds that return a value directly from the C function, mapped to
# (Rust return type, value returned when the app handle is null,
#  helper that converts the submit_command result). Consumed by emit_c_rust().
C_VALUE_REPLY = {
    "entity": ("NsEntity", "NsEntity { id: 0, generation: 0 }", "reply_entity"),
    "bool": ("bool", "false", "reply_bool"),
    "float": ("f32", "0.0", "reply_float"),
    "vector": ("NsVec3", "NsVec3 { x: 0.0, y: 0.0, z: 0.0 }", "reply_vector"),
}
def c_field(name, type_name, role, enums):
    """Describe how one command field crosses the C ABI.

    Args:
        name: Field name.
        type_name: Rust type name of the field.
        role: Field role from the manifest (``"entity"``, ``"copy"``, ...).
        enums: Enum table produced by collect_enums().

    Returns:
        A 4-tuple ``(rust_params, c_params, pre_statements, expr)``: the Rust
        parameter declarations, the matching C parameter declarations, Rust
        statements to run before building the command, and the Rust expression
        that yields the field value.

    Raises:
        SystemExit: If the role/type combination is not supported.
    """
    if role == "entity":
        return ([f"{name}: NsEntity"], [f"NsEntity {name}"], [], f"to_ref({name})")
    if role == "opt_entity":
        return (
            [f"{name}: NsEntity", f"{name}_present: bool"],
            [f"NsEntity {name}", f"bool {name}_present"],
            [],
            f"if {name}_present {{ Some(to_ref({name})) }} else {{ None }}",
        )
    if role in ("vec2", "vec3") or (role == "copy" and type_name in ARRAY_LEN):
        # When the type name is not a known array type, fall back to the width
        # the role implies so a "vec2" field is never flattened to three floats.
        count = ARRAY_LEN.get(type_name, 2 if role == "vec2" else 3)
        comps = [f"{name}_{VEC_SUFFIX[index]}" for index in range(count)]
        return ([f"{comp}: f32" for comp in comps], [f"float {comp}" for comp in comps], [],
                "[" + ", ".join(comps) + "]")
    if role == "copy" and type_name in C_SCALAR:
        return ([f"{name}: {RUST_SCALAR[type_name]}"], [f"{C_SCALAR[type_name]} {name}"], [], name)
    if role in ("text",) or (role == "owned" and type_name == "String"):
        return ([f"{name}: *const c_char"], [f"const char *{name}"],
                [f"let {name} = unsafe {{ cstr_to_string({name}) }};"], name)
    if role == "bytes":
        return (
            [f"{name}: *const u8", f"{name}_len: usize"],
            [f"const uint8_t *{name}", f"uintptr_t {name}_len"],
            [f"let {name} = unsafe {{ vec_from({name}, {name}_len) }};"],
            name,
        )
    if is_enum_type(type_name, role):
        info = enums[type_name]
        call = f"{snake(info['rust'])}_from({name})"
        if info["plain"]:
            return ([f"{name}: u32"], [f"{info['c']} {name}"], [], call)
        # emit_c_enum_converters() declares the converter `unsafe` exactly when
        # the enum carries a bytes payload, so wrap the call on the same test
        # rather than on the field role.
        needs_unsafe = any(scalar == "bytes" for _n, scalar in data_struct_fields(info["variants"]))
        converter = f"unsafe {{ {call} }}" if needs_unsafe else call
        return ([f"{name}: {info['c']}"], [f"{info['c']} {name}"], [], converter)
    raise SystemExit(f"c: unhandled field {name}: {type_name} [{role}]")
def c_command(command, enums):
    """Aggregate the C ABI description of every field of one command.

    Returns:
        ``(rust_params, c_params, pre, exprs)`` where the first three are the
        concatenated per-field lists from c_field() and ``exprs`` is a list of
        ``(field_name, rust_expression)`` pairs in field order.
    """
    rust_params, c_params, pre, exprs = [], [], [], []
    for field in command["fields"]:
        field_name = field["name"]
        rust_part, c_part, setup, expr = c_field(field_name, field["type_name"], field["role"], enums)
        rust_params.extend(rust_part)
        c_params.extend(c_part)
        pre.extend(setup)
        exprs.append((field_name, expr))
    return rust_params, c_params, pre, exprs
def command_literal(variant, exprs):
    """Render the Rust struct literal for a ``Command`` variant.

    Args:
        variant: Command variant name.
        exprs: ``(field_name, expression)`` pairs. A field whose expression is
            just its own name uses Rust's field-init shorthand.
    """
    if not exprs:
        return f"Command::{variant} {{}}"
    rendered = []
    for name, expr in exprs:
        rendered.append(name if expr == name else f"{name}: {expr}")
    body = ", ".join(rendered)
    return f"Command::{variant} {{ {body} }}"
# Rust lines emitted before every generated `extern "C"` function in
# emit_c_rust(): the safety doc comment followed by the no_mangle attribute.
SAFETY = [
    "/// # Safety",
    "/// `app` must be a live handle from `ns_open`, and any pointer arguments valid.",
    "#[unsafe(no_mangle)]",
]
def emit_c_enum_converters(enums):
    """Emit the Rust converters that turn C-side enum values into Rust enums.

    Plain enums get a ``fn <enum>_from(value: u32)`` match on the index. Enums
    with payloads additionally get a ``#[repr(C)]`` struct holding a ``tag``
    plus every flattened payload field, and a converter matching on that tag.
    In both cases the last variant is the wildcard arm.

    Returns:
        The generated Rust source as a list of lines.
    """

    def payload_source(field, scalar, value="value"):
        # Byte payloads arrive as pointer + length and must be copied out.
        if scalar == "bytes":
            return f"unsafe {{ vec_from({value}.{field}, {value}.{field}_len) }}"
        return f"{value}.{field}"

    out = []
    for info in enums.values():
        rust = info["rust"]
        variants = info["variants"]
        if info["plain"]:
            arms = [f" {index} => {rust}::{name}," for index, (name, _p) in enumerate(variants[:-1])]
            arms.append(f" _ => {rust}::{variants[-1][0]},")
            out.append("")
            out.append(f"pub fn {snake(rust)}_from(value: u32) -> {rust} {{")
            out.append(" match value {")
            out.extend(arms)
            out.append(" }")
            out.append("}")
            continue

        struct_fields = data_struct_fields(variants)
        decl_parts = []
        for fname, scalar in struct_fields:
            if scalar == "bytes":
                decl_parts.append(f"pub {fname}: *const u8, pub {fname}_len: usize")
            else:
                decl_parts.append(f"pub {fname}: {rust_field_type(scalar)}")
        field_decls = ", ".join(decl_parts)
        out.extend([
            "",
            "#[repr(C)]",
            "#[derive(Clone, Copy)]",
            f"pub struct {info['c']} {{ pub tag: u32, {field_decls} }}",
        ])

        # The converter is `unsafe` only when it dereferences a byte pointer.
        has_bytes = any(scalar == "bytes" for _n, scalar in struct_fields)
        unsafe = "unsafe " if has_bytes else ""
        last = len(variants) - 1
        arms = []
        for index, (name, payload) in enumerate(variants):
            head = "_" if index == last else str(index)
            arms.append(f" {head} => {variant_constructor(rust, name, payload, payload_source)},")
        out.append("")
        out.append(f"pub {unsafe}fn {snake(rust)}_from(value: {info['c']}) -> {rust} {{")
        out.append(" match value.tag {")
        out.extend(arms)
        out.append(" }")
        out.append("}")
    return out
def emit_c_rust(commands, enums):
    """Render the Rust source of the C binding layer.

    Emits one `pub unsafe extern "C" fn ns_<command>` per command, shaped by
    the command's reply kind, followed by the enum converters.

    Args:
        commands: Command descriptions with "name", "reply" and "fields".
        enums: Enum table produced by collect_enums().

    Returns:
        The complete file content as a single newline-terminated string.
    """
    used = sorted({info["rust"] for info in enums.values()})
    imports = ", ".join(["Command", "CommandReply", "submit_command"] + used)
    lines = [
        "// Code generated by generate.py. DO NOT EDIT.",
        "",
        "use crate::*;",
        f"use nightshade_api::prelude::{{{imports}}};",
        "use std::os::raw::c_char;",
    ]
    for command in commands:
        variant = command["name"]
        reply = command["reply"]
        rust_params, _c, pre, exprs = c_command(command, enums)
        name = "ns_" + snake(variant)
        literal = command_literal(variant, exprs)
        call = f"submit_command(&mut app.inner.world, &{literal})"
        # Per-field setup statements, emitted just before the call.
        pre_block = "".join(f" {statement}\n" for statement in pre)
        lines.append("")
        lines += SAFETY
        if reply == "none":
            # No return value: bail out silently on a null app handle.
            params = ", ".join(["app: *mut NsApp"] + rust_params)
            lines.append(f'pub unsafe extern "C" fn {name}({params}) {{')
            lines.append(" let Some(app) = (unsafe { app.as_mut() }) else { return };")
            lines.append(pre_block + f" {call};")
            lines.append("}")
        elif reply in ("opt_entity", "opt_vector"):
            # Optional replies: write the value through `out` and return whether
            # the reply matched the expected CommandReply variant.
            out_type = "NsEntity" if reply == "opt_entity" else "NsVec3"
            arm = "CommandReply::Entity(value)" if reply == "opt_entity" else "CommandReply::Vector(value)"
            write = (
                "NsEntity { id: value.id, generation: value.generation }"
                if reply == "opt_entity"
                else "NsVec3 { x: value[0], y: value[1], z: value[2] }"
            )
            params = ", ".join(["app: *mut NsApp"] + rust_params + [f"out: *mut {out_type}"])
            lines.append(f'pub unsafe extern "C" fn {name}({params}) -> bool {{')
            lines.append(" let Some(app) = (unsafe { app.as_mut() }) else { return false };")
            lines.append(pre_block + f" match {call} {{")
            lines.append(f" {arm} => {{")
            lines.append(f" unsafe {{ *out = {write} }};")
            lines.append(" true")
            lines.append(" }")
            lines.append(" _ => false,")
            lines.append(" }")
            lines.append("}")
        elif reply in ("entities", "strings"):
            # List replies: return a pointer and report the length via `out_len`.
            ret = "*mut NsEntity" if reply == "entities" else "*mut *mut c_char"
            helper = "reply_entities" if reply == "entities" else "reply_strings"
            params = ", ".join(["app: *mut NsApp"] + rust_params + ["out_len: *mut usize"])
            lines.append(f'pub unsafe extern "C" fn {name}({params}) -> {ret} {{')
            lines.append(" let Some(app) = (unsafe { app.as_mut() }) else { return std::ptr::null_mut() };")
            lines.append(pre_block + f" unsafe {{ {helper}({call}, out_len) }}")
            lines.append("}")
        else:
            # Plain value replies; an unknown reply kind raises KeyError here.
            ret_type, default, helper = C_VALUE_REPLY[reply]
            params = ", ".join(["app: *mut NsApp"] + rust_params)
            lines.append(f'pub unsafe extern "C" fn {name}({params}) -> {ret_type} {{')
            lines.append(f" let Some(app) = (unsafe {{ app.as_mut() }}) else {{ return {default} }};")
            lines.append(pre_block + f" {helper}({call})")
            lines.append("}")
    lines += emit_c_enum_converters(enums)
    return "\n".join(lines) + "\n"
def emit_c_header(commands, enums):
    """Render the C header declaring every generated enum and command function.

    Plain enums become a C ``enum`` typedef. Enums with payloads become a
    ``<Name>Tag`` enum plus a struct holding ``tag`` and the flattened payload
    fields. Each command becomes one ``ns_<command>`` prototype.

    Returns:
        The complete header as a single newline-terminated string.
    """
    out = [
        "/* Code generated by generate.py. DO NOT EDIT. */",
        "#ifndef NIGHTSHADE_COMMANDS_H",
        "#define NIGHTSHADE_COMMANDS_H",
        "",
        '#include "nightshade.h"',
        "",
        "#ifdef __cplusplus",
        'extern "C" {',
        "#endif",
    ]

    for info in enums.values():
        prefix = "NS_" + screaming(info["rust"])
        plain = info["plain"]
        members = ", ".join(f"{prefix}_{screaming(name)}" for name, _p in info["variants"])
        if plain:
            out.append(f"\ntypedef enum {{ {members} }} {info['c']};")
            continue
        out.append(f"\ntypedef enum {{ {members} }} {info['c']}Tag;")
        decls = ["uint32_t tag;"]
        for fname, scalar in data_struct_fields(info["variants"]):
            if scalar == "bytes":
                decl = f"const uint8_t *{fname}; uintptr_t {fname}_len;"
            elif scalar.startswith("f32x"):
                decl = f"float {fname}[{scalar[4:]}];"
            else:
                decl = f"{C_SCALAR.get(scalar, 'float')} {fname};"
            decls.append(decl)
        out.append(f"typedef struct {{ {' '.join(decls)} }} {info['c']};")

    # Reply kind -> (C return type, extra trailing out-parameter or None).
    reply_shapes = {
        "none": ("void", None),
        "opt_entity": ("bool", "NsEntity *out"),
        "opt_vector": ("bool", "NsVec3 *out"),
        "entities": ("NsEntity *", "uintptr_t *out_len"),
        "strings": ("char **", "uintptr_t *out_len"),
        "entity": ("NsEntity", None),
        "bool": ("bool", None),
        "float": ("float", None),
        "vector": ("NsVec3", None),
    }
    for command in commands:
        variant = command["name"]
        reply = command["reply"]
        _r, c_params, _p, _e = c_command(command, enums)
        name = "ns_" + snake(variant)
        params = ["NsApp *app"] + c_params
        ret, extra = reply_shapes[reply]
        if extra is not None:
            params.append(extra)
        # Pointer return types already end in "*", so no separating space.
        joiner = "" if ret.endswith("*") else " "
        out.append(f"\n{ret}{joiner}{name}({', '.join(params)});")

    out.extend(["", "#ifdef __cplusplus", "}", "#endif", "", "#endif"])
    return "\n".join(out) + "\n"
def go_reply(reply):
    """Return the Go return-type text for a reply kind ("" for no return).

    Raises:
        KeyError: If the reply kind is unknown.
    """
    return_types = dict(
        none="",
        entity="Entity",
        opt_entity="(Entity, bool)",
        bool="bool",
        float="float64",
        vector="[3]float64",
        opt_vector="([3]float64, bool)",
        entities="[]Entity",
        strings="[]string",
    )
    return return_types[reply]
def go_enum_type(info):
    """Return the Go type name for an enum.

    Plain and payload-carrying enums both use the unqualified Rust name, so
    no branching on ``info["plain"]`` is needed.
    """
    return info["rust"]
def go_field(name, type_name, role, enums):
    """Describe how one command field is exposed in Go and passed to C.

    Args:
        name: Field name (snake_case); the Go parameter uses its camelCase form.
        type_name: Rust type name of the field.
        role: Field role from the manifest.
        enums: Enum table produced by collect_enums().

    Returns:
        A 3-tuple ``(go_params, pre_lines, c_args)``: Go parameter declarations,
        Go statements to run before the C call, and the argument expressions
        passed to the C function.

    Raises:
        SystemExit: If the role/type combination is not supported.
    """
    arg = camel(name)
    if role == "entity":
        return ([f"{arg} Entity"], [], [f"cEntity({arg})"])
    if role == "opt_entity":
        pre = [
            f"\tvar {arg}Value C.NsEntity",
            f"\t{arg}Present := false",
            f"\tif {arg} != nil {{",
            f"\t\t{arg}Value = cEntity(*{arg})",
            f"\t\t{arg}Present = true",
            "\t}",
        ]
        return ([f"{arg} *Entity"], pre, [f"{arg}Value", f"C._Bool({arg}Present)"])
    if role in ("vec2", "vec3") or (role == "copy" and type_name in ARRAY_LEN):
        # When the type name is not a known array type, fall back to the width
        # the role implies so a "vec2" field never becomes a 3-element array.
        count = ARRAY_LEN.get(type_name, 2 if role == "vec2" else 3)
        return ([f"{arg} [{count}]float64"], [], [f"C.float({arg}[{index}])" for index in range(count)])
    if role == "copy" and type_name == "bool":
        return ([f"{arg} bool"], [], [f"C._Bool({arg})"])
    if role == "copy" and type_name in GO_SCALAR:
        return ([f"{arg} {GO_SCALAR[type_name]}"], [], [f"{GO_C_SCALAR[type_name]}({arg})"])
    if role in ("text",) or (role == "owned" and type_name == "String"):
        # The C string is allocated for the call and freed when the wrapper returns.
        pre = [f"\tc{arg} := C.CString({arg})", f"\tdefer C.free(unsafe.Pointer(c{arg}))"]
        return ([f"{arg} string"], pre, [f"c{arg}"])
    if role == "bytes":
        # The pointer stays nil for an empty slice, where &slice[0] would panic.
        pre = [
            f"\tvar {arg}Ptr *C.uint8_t",
            f"\tif len({arg}) > 0 {{",
            f"\t\t{arg}Ptr = (*C.uint8_t)(unsafe.Pointer(&{arg}[0]))",
            "\t}",
        ]
        return ([f"{arg} []byte"], pre, [f"{arg}Ptr", f"C.uintptr_t(len({arg}))"])
    if is_enum_type(type_name, role):
        info = enums[type_name]
        if info["plain"]:
            return ([f"{arg} {info['rust']}"], [], [f"C.{info['c']}({arg})"])
        pre, carg = go_data_enum(arg, info)
        return ([f"{arg} {info['rust']}"], pre, [carg])
    raise SystemExit(f"go: unhandled field {name}: {type_name} [{role}]")
def go_data_enum(arg, info):
    """Emit Go statements that copy a payload-carrying enum into its C struct.

    Args:
        arg: Name of the Go parameter holding the enum value.
        info: Enum description from collect_enums().

    Returns:
        ``(pre_lines, c_variable_name)``: the Go statements that declare and
        fill the C struct, and the name of that struct variable.
    """
    cvar = f"c{arg.capitalize()}"
    pre = [f"\tvar {cvar} C.{info['c']}", f"\t{cvar}.tag = C.uint32_t({arg}.Kind)"]
    for fname, scalar in data_struct_fields(info["variants"]):
        # Go struct field: PascalCase form of the flattened snake_case name.
        field = "".join(part.capitalize() for part in fname.split("_"))
        if scalar == "bytes":
            pre += [
                f"\tif len({arg}.{field}) > 0 {{",
                f"\t\t{cvar}.{fname} = (*C.uint8_t)(unsafe.Pointer(&{arg}.{field}[0]))",
                f"\t\t{cvar}.{fname}_len = C.uintptr_t(len({arg}.{field}))",
                "\t}",
            ]
        elif scalar.startswith("f32x"):
            count = int(scalar[4:])
            pre.append(f"\tfor index := 0; index < {count}; index++ {{ {cvar}.{fname}[index] = C.float({arg}.{field}[index]) }}")
        else:
            # Convert to the C type the header declares for this scalar (for
            # example uint32_t for integer payloads); a blanket C.float cannot
            # be assigned to those fields.
            ctype = GO_C_SCALAR.get(scalar, "C.float")
            pre.append(f"\t{cvar}.{fname} = {ctype}({arg}.{field})")
    return pre, cvar
def emit_go(commands, enums):
    """Render the Go binding file: cgo preamble, enum types, one method per command.

    Returns:
        The complete Go source as a single newline-terminated string.
    """
    out = [
        "// Code generated by generate.py. DO NOT EDIT.",
        "",
        "package nightshade",
        "",
        "/*",
        "#include <stdlib.h>",
        '#include "nightshade_commands.h"',
        "*/",
        'import "C"',
        "",
        'import "unsafe"',
    ]
    out.extend(emit_go_enums(enums))
    for command in commands:
        variant, reply = command["name"], command["reply"]
        signature_params, setup, call_args = [], [], []
        for field in command["fields"]:
            params, more_setup, args = go_field(field["name"], field["type_name"], field["role"], enums)
            signature_params.extend(params)
            setup.extend(more_setup)
            call_args.extend(args)
        cname = "C.ns_" + snake(variant)
        ret = go_reply(reply)
        joined_params = ", ".join(signature_params)
        head = f"func (a *App) {variant}({joined_params})"
        if ret:
            head = f"{head} {ret}"
        out.append("")
        out.append(head + " {")
        out.extend(setup)
        # The raw app handle is always the first C argument.
        all_args = ", ".join(["a.raw"] + call_args)
        out.extend(go_call(reply, cname, all_args, call_args))
        out.append("}")
    return "\n".join(out) + "\n"
def go_call(reply, cname, all_args, c_args):
    """Emit the Go statements that call a C function and convert its reply.

    Args:
        reply: Reply kind of the command.
        cname: Qualified C function name (``C.ns_...``).
        all_args: Already-joined argument list, app handle included.
        c_args: Per-field argument expressions (not used by this function).

    Returns:
        The Go body lines performing the call.

    Raises:
        SystemExit: If the reply kind is unknown.
    """
    invocation = f"{cname}({all_args})"
    if reply == "none":
        return ["\t" + invocation]

    # Replies converted by wrapping the call in a single function/conversion.
    direct = (
        ("entity", "entityFromC"),
        ("bool", "bool"),
        ("float", "float64"),
        ("vector", "vecFromC"),
    )
    for kind, wrapper in direct:
        if reply == kind:
            return [f"\treturn {wrapper}({invocation})"]

    # Optional replies: C writes through an out-parameter and returns a flag.
    optional = (
        ("opt_entity", "C.NsEntity", "entityFromC", "Entity{}"),
        ("opt_vector", "C.NsVec3", "vecFromC", "[3]float64{}"),
    )
    for kind, ctype, wrapper, zero in optional:
        if reply == kind:
            return [
                f"\tvar out {ctype}",
                f"\tif bool({cname}({', '.join([all_args, '&out'])})) {{",
                f"\t\treturn {wrapper}(out), true",
                "\t}",
                f"\treturn {zero}, false",
            ]

    # List replies: C returns a pointer and reports the length via out-parameter.
    for kind, wrapper in (("entities", "entitiesFromC"), ("strings", "stringsFromC")):
        if reply == kind:
            return [
                "\tvar length C.uintptr_t",
                f"\tptr := {cname}({', '.join([all_args, '&length'])})",
                f"\treturn {wrapper}(ptr, length)",
            ]

    raise SystemExit(f"go: unhandled reply {reply}")
def emit_go_enums(enums):
    """Emit the Go declarations for every enum.

    Plain enums become a ``uint32`` type with an ``iota`` constant block.
    Enums with payloads become a struct with a ``Kind`` index plus one field
    per flattened payload, package-level values for unit variants, and one
    constructor function per payload-carrying variant.

    Returns:
        The generated Go source as a list of lines.
    """

    def exported(fname):
        # PascalCase Go field name for a flattened snake_case payload name.
        return "".join(part.capitalize() for part in fname.split("_"))

    lines = []
    for info in enums.values():
        rust = info["rust"]
        if info["plain"]:
            lines.append("")
            lines.append(f"type {rust} uint32")
            lines.append("")
            lines.append("const (")
            for index, (name, _p) in enumerate(info["variants"]):
                suffix = f" {rust} = iota" if index == 0 else ""
                lines.append(f"\t{rust}{name}{suffix}")
            lines.append(")")
            continue
        struct_fields = data_struct_fields(info["variants"])
        lines.append("")
        lines.append(f"type {rust} struct {{")
        lines.append("\tKind uint32")
        seen = set()
        for fname, scalar in struct_fields:
            field = exported(fname)
            if field in seen:
                continue
            seen.add(field)
            if scalar == "bytes":
                lines.append(f"\t{field} []byte")
            elif scalar.startswith("f32x"):
                lines.append(f"\t{field} [{scalar[4:]}]float64")
            else:
                lines.append(f"\t{field} float64")
        lines.append("}")
        lines.append("")
        lines.append("var (")
        for index, (name, payload) in enumerate(info["variants"]):
            if payload is None:
                lines.append(f"\t{rust}{name} = {rust}{{Kind: {index}}}")
        lines.append(")")
        for index, (name, payload) in enumerate(info["variants"]):
            if payload is None:
                continue
            kind, body = payload
            if kind == "struct":
                args = ", ".join(f"{field} float64" for field, _s in body)
                inits = ", ".join(
                    f"{exported(f'{snake(name)}_{field}')}: {field}" for field, _s in body
                )
                lines.append("")
                lines.append(f"func {rust}{name}({args}) {rust} {{ return {rust}{{Kind: {index}, {inits}}} }}")
                continue
            # Derive the target exactly as the struct field above was derived.
            # str.capitalize() on the variant name lowercases its tail
            # ("HeightField" -> "Heightfield") and would name a missing field.
            target = exported(snake(name))
            if body == "bytes":
                param, value = "data []byte", "data"
            elif body.startswith("f32x"):
                param, value = f"value [{int(body[4:])}]float64", "value"
            else:
                # Scalar payloads are stored as float64, matching the struct field.
                param, value = "value float64", "value"
            lines.append("")
            lines.append(f"func {rust}{name}({param}) {rust} {{ return {rust}{{Kind: {index}, {target}: {value}}} }}")
    return lines
# Reply kind -> (Rust type placed inside PyResult<...>, helper that converts
# the submit_command result). Consumed by emit_py_rust().
PY_REPLY = {
    "none": ("()", "reply_none"),
    "entity": ("(u32, u32)", "reply_entity"),
    "opt_entity": ("Option<(u32, u32)>", "reply_opt_entity"),
    "bool": ("bool", "reply_bool"),
    "float": ("f32", "reply_float"),
    "vector": ("[f32; 3]", "reply_vector"),
    "opt_vector": ("Option<[f32; 3]>", "reply_opt_vector"),
    "entities": ("Vec<(u32, u32)>", "reply_entities"),
    "strings": ("Vec<String>", "reply_strings"),
}
def py_rs_field(name, type_name, role, enums):
    """Describe one command field for the Rust side of the Python binding.

    Returns:
        ``(parameter_declaration, expression)``: the Rust parameter text for
        the ``#[pymethods]`` function and the Rust expression that yields the
        field value for the command literal.

    Raises:
        SystemExit: If the role/type combination is not supported.
    """
    if role == "entity":
        return f"{name}: (u32, u32)", f"to_ref({name})"
    if role == "opt_entity":
        return f"{name}: Option<(u32, u32)>", f"{name}.map(to_ref)"

    is_vector_role = role in ("vec2", "vec3")
    if is_vector_role or (role == "copy" and type_name in ARRAY_LEN):
        return f"{name}: [f32; {ARRAY_LEN[type_name]}]", name
    if role == "copy" and type_name in RUST_SCALAR:
        return f"{name}: {RUST_SCALAR[type_name]}", name
    if role in ("text",) or (role == "owned" and type_name == "String"):
        return f"{name}: String", name
    if role == "bytes":
        return f"{name}: Vec<u8>", name

    if not is_enum_type(type_name, role):
        raise SystemExit(f"py: unhandled field {name}: {type_name} [{role}]")

    info = enums[type_name]
    converter = f"{snake(info['rust'])}_from"
    if info["plain"]:
        return f"{name}: u32", f"{converter}({name})"
    # Enums with payloads travel as a tuple: the tag first, then every
    # flattened payload slot in order.
    params = data_struct_fields(info["variants"])
    slot_types = []
    for _n, scalar in params:
        slot_types.append("Vec<u8>" if scalar == "bytes" else rust_field_type(scalar))
    types = ", ".join(slot_types)
    names = ", ".join(f"{name}.{index}" for index in range(len(params) + 1))
    return f"{name}: (u32, {types})", f"{converter}({names})"
def emit_py_enum_converters(enums):
    """Emit the Rust converters used by the Python binding layer.

    Plain enums get ``fn <enum>_from(value: u32)``; enums with payloads get a
    converter taking the tag plus one argument per flattened payload field.
    In both cases the last variant is the wildcard arm.

    Returns:
        The generated Rust source as a list of lines.
    """
    out = []
    for info in enums.values():
        rust = info["rust"]
        variants = info["variants"]
        if info["plain"]:
            arms = [f" {index} => {rust}::{name}," for index, (name, _p) in enumerate(variants[:-1])]
            arms.append(f" _ => {rust}::{variants[-1][0]},")
            out.extend(["", f"fn {snake(rust)}_from(value: u32) -> {rust} {{", " match value {"])
            out.extend(arms)
            out.extend([" }", "}"])
            continue

        typed = ["tag: u32"]
        for pname, scalar in data_struct_fields(variants):
            if scalar == "bytes":
                typed.append(f"{pname}: Vec<u8>")
            else:
                typed.append(f"{pname}: {rust_field_type(scalar)}")
        signature = ", ".join(typed)
        out.append("")
        out.append(f"fn {snake(rust)}_from({signature}) -> {rust} {{")
        out.append(" match tag {")
        final = len(variants) - 1
        for index, (name, payload) in enumerate(variants):
            head = "_" if index == final else str(index)
            # Each payload value is simply the like-named function argument.
            constructor = variant_constructor(rust, name, payload, lambda field, _scalar: field)
            out.append(f" {head} => {constructor},")
        out.append(" }")
        out.append("}")
    return out
def emit_py_rust(commands, enums):
    """Render the Rust ``#[pymethods] impl App`` block for the Python binding.

    One method is emitted per command; commands with optional entity fields
    also get a ``#[pyo3(signature = ...)]`` attribute defaulting them to None.
    The enum converters follow the impl block.

    Returns:
        The complete file content as a single newline-terminated string.
    """
    used = sorted({info["rust"] for info in enums.values()})
    imports = ", ".join(["Command", "submit_command"] + used)
    out = [
        "// Code generated by generate.py. DO NOT EDIT.",
        "",
        "use super::*;",
        f"use nightshade_api::prelude::{{{imports}}};",
        "",
        "#[pymethods]",
        "impl App {",
    ]
    for command in commands:
        variant = command["name"]
        reply = command["reply"]
        fields = command["fields"]
        params, exprs, slots = [], [], []
        has_optional = False
        for field in fields:
            param, expr = py_rs_field(field["name"], field["type_name"], field["role"], enums)
            params.append(param)
            exprs.append((field["name"], expr))
            if field["role"] == "opt_entity":
                has_optional = True
                slots.append(f"{field['name']}=None")
            else:
                slots.append(field["name"])
        ret, helper = PY_REPLY[reply]
        literal = command_literal(variant, exprs)
        signature = ", ".join(["&mut self"] + params)
        if has_optional:
            out.append(f" #[pyo3(signature = ({', '.join(slots)}))]")
        out.append(f" fn {snake(variant)}({signature}) -> PyResult<{ret}> {{")
        out.append(f" {helper}(submit_command(&mut self.inner.world, &{literal}))")
        out.append(" }")
    out.append("}")
    out.extend(emit_py_enum_converters(enums))
    return "\n".join(out) + "\n"
def py_wrap(name, role, type_name, enums):
    """Return the Python expression that adapts an argument for the raw binding.

    Entities go through ``self._ref`` / ``self._opt_ref``, vectors and float
    arrays through ``list()``, plain "copy" enums through ``int()``, byte
    buffers through ``bytes()``; anything else is passed unchanged.
    """
    if role in ("entity", "opt_entity"):
        helper = "_ref" if role == "entity" else "_opt_ref"
        return f"self.{helper}({name})"
    if role in ("vec2", "vec3"):
        return f"list({name})"
    if role == "copy":
        if type_name in ARRAY_LEN:
            return f"list({name})"
        if is_enum_type(type_name, role) and enums[type_name]["plain"]:
            return f"int({name})"
        return name
    if role == "bytes":
        return f"bytes({name})"
    return name
def emit_python(commands, enums):
    """Render the pure-Python wrapper module.

    Emits a ``Commands`` class with one method per command, each forwarding to
    the matching ``self._raw`` method after adapting its arguments, followed by
    the Python-side enum helpers.

    Returns:
        The complete module source as a single newline-terminated string.
    """
    lines = [
        "# Code generated by generate.py. DO NOT EDIT.",
        "",
        "from types import SimpleNamespace",
        "",
        "",
        "class Commands:",
        '    """Typed command methods mixed into App, one per registry command."""',
    ]
    # Replies wrapped by a helper on self; every other kind is returned as-is.
    reply_wrappers = {
        "entity": "_wrap_entity",
        "opt_entity": "_wrap_opt_entity",
        "entities": "_wrap_entities",
    }
    for command in commands:
        variant = command["name"]
        reply = command["reply"]
        params = [field["name"] for field in command["fields"]]
        args = ", ".join(py_wrap(field["name"], field["role"], field["type_name"], enums) for field in command["fields"])
        call = f"self._raw.{snake(variant)}({args})"
        signature = ", ".join(["self"] + params)
        lines.append("")
        # The method body must sit one level deeper than its `def` line, or
        # the generated module cannot be imported.
        lines.append(f"    def {snake(variant)}({signature}):")
        wrapper = reply_wrappers.get(reply)
        if wrapper is not None:
            lines.append(f"        return self.{wrapper}({call})")
        else:
            lines.append(f"        return {call}")
    lines += emit_python_enums(enums)
    return "\n".join(lines) + "\n"
def emit_python_enums(enums):
    """Emit the Python-side enum helpers.

    Plain enums become a ``SimpleNamespace`` of SCREAMING_CASE integer
    members. Enums with payloads become a class of static constructors, each
    returning the tuple ``(tag, slot, slot, ...)`` expected by the raw binding.

    Returns:
        The generated Python source as a list of lines.
    """
    lines = []
    for info in enums.values():
        rust = info["rust"]
        if info["plain"]:
            members = ", ".join(f"{screaming(name)}={index}" for index, (name, _p) in enumerate(info["variants"]))
            lines += ["", "", f"{rust} = SimpleNamespace({members})"]
            continue
        lines += [
            "",
            "",
            f"class {rust}:",
            f'    """Constructors for the {rust} command enum."""',
        ]
        for index, (name, payload) in enumerate(info["variants"]):
            method = snake(name)
            if method in ("none", "static"):
                method += "_"
            if payload is None:
                params = ""
                result = f"({index}{build_empty_payload(info)})"
            else:
                kind, body = payload
                if kind == "struct":
                    params = ", ".join(field for field, _s in body)
                elif body == "bytes":
                    params = "data"
                else:
                    params = "value"
                result = payload_tuple(info, index, payload)
            # Decorator and `def` sit at class-body depth and the `return` one
            # level deeper; emitting them at the same depth is invalid Python.
            lines += [
                "",
                "    @staticmethod",
                f"    def {method}({params}):",
                f"        return {result}",
            ]
    return lines
def build_empty_payload(info):
    """Return the placeholder payload slots for a unit variant.

    Args:
        info: Enum description from collect_enums().

    Returns:
        A string of the form ``", <slot>, <slot>..."`` with one placeholder per
        flattened payload field, or ``""`` when the enum has no payload fields.
    """
    integer_scalars = ("u32", "i32", "u8", "usize")
    parts = []
    for _name, scalar in data_struct_fields(info["variants"]):
        if scalar == "bytes":
            parts.append("b''")
        elif scalar.startswith("f32x"):
            parts.append("[" + ", ".join(["0.0"] * int(scalar[4:])) + "]")
        elif scalar in integer_scalars:
            # Integer slots need an int placeholder; a float literal is not
            # accepted where the binding expects an integer.
            parts.append("0")
        else:
            parts.append("0.0")
    return "" if not parts else ", " + ", ".join(parts)
def payload_tuple(info, index, payload):
    """Render the Python tuple a payload-carrying constructor returns.

    The tuple holds the variant index followed by one slot per flattened
    payload field of the whole enum, in the same order data_struct_fields()
    produces them. Slots owned by the selected variant reference the
    constructor's arguments; every other slot gets a type-appropriate placeholder.

    Args:
        info: Enum description from collect_enums().
        index: Position of the selected variant in ``info["variants"]``.
        payload: The selected variant's payload, ``("struct", fields)`` or
            ``("tuple", scalar)``.

    Returns:
        The tuple expression as source text, e.g. ``"(1, radius, b'')"``.
    """
    integer_scalars = ("u32", "i32", "u8", "usize")

    def placeholder(scalar):
        # Value used for slots that belong to other variants.
        if scalar == "bytes":
            return "b''"
        if scalar.startswith("f32x"):
            return "[" + ", ".join(["0.0"] * int(scalar[4:])) + "]"
        return "0" if scalar in integer_scalars else "0.0"

    def argument(scalar):
        # Expression forwarding the single argument of a tuple variant.
        if scalar == "bytes":
            return "bytes(data)"
        if scalar.startswith("f32x"):
            return "list(value)"
        return "value"

    slots = [str(index)]
    # Walk the variants by position instead of matching on flattened names:
    # name matching breaks for multi-word variants ("HeightField") and for
    # variants that share a first word ("Ball" / "BallJoint").
    for position, (_name, variant_payload) in enumerate(info["variants"]):
        if variant_payload is None:
            continue
        selected = position == index
        kind, body = payload if selected else variant_payload
        if kind == "struct":
            for field, scalar in body:
                slots.append(field if selected else placeholder(scalar))
        elif selected:
            slots.append(argument(body))
        else:
            slots.append(placeholder(body))
    return "(" + ", ".join(slots) + ")"
def main():
    """Regenerate every binding file from the current introspection data.

    All outputs are rendered first and only then written, creating parent
    directories as needed; a one-line summary is printed at the end.
    """
    commands, schema = load_introspection()
    enums = collect_enums(commands, schema)
    emitters = (
        (C_RS_OUT, emit_c_rust),
        (C_H_OUT, emit_c_header),
        (GO_OUT, emit_go),
        (PY_RS_OUT, emit_py_rust),
        (PY_OUT, emit_python),
    )
    rendered = [(target, emit(commands, enums)) for target, emit in emitters]
    for target, content in rendered:
        target.parent.mkdir(parents=True, exist_ok=True)
        # Force LF line endings regardless of platform.
        target.write_text(content, newline="\n")
    print(f"generated {len(commands)} commands across python, go, and c; {len(enums)} enums")
# Regenerate the bindings only when this file is executed as a script.
if __name__ == "__main__":
    main()