import sys, copy, os, re
import yaml
IN = sys.argv[1] if len(sys.argv) > 1 else "target/specs/l1_model_fixed.yaml"
OUT = sys.argv[2] if len(sys.argv) > 2 else "target/specs/l2_rust_compatible.yaml"
REPORT = "target/reports/l2_rust_compat_report.md"
with open(IN, "r", encoding="utf-8") as f:
doc = yaml.safe_load(f)
changes = []
def note(msg):
changes.append(msg)
def walk_and_prune(node):
if isinstance(node, dict):
if "default" in node:
node.pop("default", None)
note("Removed default at some nodes")
node.pop("optional", None)
node.pop("$recursiveRef", None)
node.pop("$recursiveAnchor", None)
node.pop("propertyNames", None)
for v in node.values():
walk_and_prune(v)
elif isinstance(node, list):
for v in node:
walk_and_prune(v)
def replace_refs(node, from_ref, to_ref):
if isinstance(node, dict):
if node.get("$ref") == from_ref:
node["$ref"] = to_ref
for v in node.values():
replace_refs(v, from_ref, to_ref)
elif isinstance(node, list):
for v in node:
replace_refs(v, from_ref, to_ref)
def strip_ref_siblings(node):
if isinstance(node, dict):
if "$ref" in node and len(node.keys()) > 1:
ref = node["$ref"]
node.clear()
node["$ref"] = ref
note("Stripped $ref siblings on a schema node")
for v in node.values():
strip_ref_siblings(v)
elif isinstance(node, list):
for v in node:
strip_ref_siblings(v)
def collapse_required_only_unions(node):
if isinstance(node, dict):
for keyword in ("oneOf", "anyOf"):
items = node.get(keyword)
if isinstance(items, list) and len(items) > 0:
type_keys = {"type", "properties", "$ref", "title"}
all_required_only = all(
isinstance(item, dict) and not (set(item.keys()) & type_keys)
for item in items
)
if all_required_only:
del node[keyword]
note(f"Collapsed validation-only {keyword} ({len(items)} items)")
for v in node.values():
collapse_required_only_unions(v)
elif isinstance(node, list):
for v in node:
collapse_required_only_unions(v)
def add_titles_to_unnamed_union_variants(node, path=""):
if isinstance(node, dict):
for keyword in ("oneOf", "anyOf"):
items = node.get(keyword)
if isinstance(items, list):
for i, item in enumerate(items):
if (
isinstance(item, dict)
and "title" not in item
and "$ref" not in item
):
t = item.get("type")
if t == "string":
item["title"] = "Text"
elif t == "integer":
item["title"] = "Integer"
elif t == "number":
item["title"] = "Number"
elif t == "boolean":
item["title"] = "Boolean"
elif t == "array":
items_schema = item.get("items", {})
inner_type = items_schema.get("type", "item").capitalize()
item["title"] = f"ArrayOf{inner_type}s"
elif t == "object":
item["title"] = f"Object{i}"
if "title" in item:
note(
f"Auto-titled {keyword}[{i}] as '{item['title']}' at {path}"
)
for k, v in node.items():
add_titles_to_unnamed_union_variants(v, path=f"{path}.{k}")
elif isinstance(node, list):
for i, v in enumerate(node):
add_titles_to_unnamed_union_variants(v, path=f"{path}[{i}]")
def hoist_inline_unions(schemas):
new_schemas = {}
for schema_name, schema in list(schemas.items()):
if not isinstance(schema, dict):
continue
props = schema.get("properties")
if not isinstance(props, dict):
continue
for prop_name, prop_schema in list(props.items()):
if not isinstance(prop_schema, dict):
continue
for keyword in ("oneOf", "anyOf"):
items = prop_schema.get(keyword)
if not isinstance(items, list) or len(items) == 0:
continue
has_ref = any(
isinstance(item, dict) and "$ref" in item for item in items
)
if has_ref:
continue
has_null_variant = any(
isinstance(item, dict) and item.get("type") == "null"
for item in items
)
if has_null_variant:
continue
complex_types = {"array", "object"}
has_complex = any(
isinstance(item, dict) and item.get("type") in complex_types
for item in items
)
if not has_complex:
continue
camel_prop = "".join(part.capitalize() for part in prop_name.split("_"))
new_name = f"{schema_name}{camel_prop}"
hoisted = {keyword: items, "title": new_name}
desc = prop_schema.get("description")
if desc:
hoisted["description"] = desc
new_schemas[new_name] = hoisted
props[prop_name] = {"$ref": f"#/components/schemas/{new_name}"}
note(
f"Hoisted inline {keyword} {schema_name}.{prop_name} -> {new_name}"
)
break
schemas.update(new_schemas)
schemas = doc.get("components", {}).get("schemas", {})
walk_and_prune(schemas)
strip_ref_siblings(doc)
collapse_required_only_unions(doc)
add_titles_to_unnamed_union_variants(doc)
hoist_inline_unions(schemas)
def set_prop(schema_name, prop, new_schema):
s = schemas.get(schema_name)
if s and isinstance(s, dict):
props = s.setdefault("properties", {})
if prop in props:
props[prop] = new_schema
note(f"Set {schema_name}.{prop} -> {new_schema}")
set_prop("CreateEmbeddingRequest", "model", {"type": "string"})
set_prop("CreateCompletionRequest", "prompt", {"type": "string"})
set_prop("CreateModerationRequest", "input", {"type": "string"})
set_prop("RealtimeResponseCreateParams", "conversation", {"type": "string"})
set_prop("CreateImageEditRequest", "model", {"type": "string"})
set_prop("CreateImageEditRequest", "image", {"type": "string"})
set_prop("CreateImageVariationRequest", "model", {"type": "string"})
set_prop("CreateImageRequest", "model", {"type": "string"})
set_prop("CreateTranscriptionRequest", "model", {"type": "string"})
set_prop("CreateTranslationRequest", "model", {"type": "string"})
set_prop("CreateChatCompletionRequest", "model", {"type": "string"})
set_prop("CreateModerationRequest", "model", {"type": "string"})
set_prop("CreateResponse", "model", {"type": "string"})
for schema, prop in [
("CreateMessageRequest", "content"),
("PredictionContent", "content"),
]:
set_prop(schema, prop, {"type": "string"})
cr = schemas.get("CreateResponse")
if isinstance(cr, dict) and isinstance(cr.get("allOf"), list):
for comp in cr["allOf"]:
if isinstance(comp, dict) and comp.get("type") == "object":
props = comp.setdefault("properties", {})
if "input" in props:
props["input"] = {"type": "string"}
if "model" in props:
props["model"] = {"type": "string"}
set_prop("CreateTranslationRequest", "model", {"type": "string"})
resp = schemas.get("Response")
if isinstance(resp, dict) and isinstance(resp.get("allOf"), list):
for comp in resp["allOf"]:
if isinstance(comp, dict) and comp.get("type") == "object":
props = comp.setdefault("properties", {})
if "instructions" in props:
props["instructions"] = {"type": "string"}
note("Simplified Response.instructions -> string")
for key in [
"CreateTranscriptionResponseStreamEvent",
"OutputItem",
"RealtimeClientEvent",
"RealtimeServerEvent",
"ResponseStreamEvent",
]:
if key in schemas:
schemas[key] = {"type": "object"}
note(f"Coerced schema {key} -> object")
for key in [
"CreateTranscriptionRequestModel",
"CreateTranslationRequestModel",
"CreateImageVariationRequestModel",
"CreateEmbeddingRequestModel",
"CreateImageRequestModel",
]:
if key in schemas:
schemas[key] = {"type": "string"}
for key in ["CreateImageEditRequestModel", "CreateImageEditRequestImage"]:
if key in schemas:
schemas[key] = {"type": "string"}
if "Type" in schemas:
schemas["TypeAction"] = schemas.pop("Type")
replace_refs(doc, "#/components/schemas/Type", "#/components/schemas/TypeAction")
note("Renamed schema Type -> TypeAction and updated refs")
hyphenated_schemas = [name for name in schemas.keys() if "-" in name]
for old_name in hyphenated_schemas:
new_name = re.sub(r"-(\d+)", r"\1", old_name) new_name = new_name.replace("-", "_")
if new_name != old_name:
schemas[new_name] = schemas.pop(old_name)
old_ref = f"#/components/schemas/{old_name}"
new_ref = f"#/components/schemas/{new_name}"
replace_refs(doc, old_ref, new_ref)
note(f"Normalized schema name: {old_name} -> {new_name}")
if "ComputerAction" in schemas:
schemas["ComputerAction"] = {"type": "object"}
paths = doc.get("paths", {})
for path, item in list(paths.items()):
def ensure_path_param(name):
params = item.setdefault("parameters", [])
if not any(
isinstance(p, dict) and p.get("in") == "path" and p.get("name") == name
for p in params
):
params.append(
{
"name": name,
"in": "path",
"required": True,
"schema": {"type": "string"},
}
)
if "{certificate_id}" in path:
ensure_path_param("certificate_id")
if "{project_id}" in path:
ensure_path_param("project_id")
def fix_params_container(container, *, for_path: str):
if not isinstance(container, list):
return
for p in container:
if not isinstance(p, dict):
continue
if p.get("in") == "query":
sch = p.get("schema")
name = p.get("name")
if for_path == "/fine_tuning/jobs" and name == "metadata":
p["schema"] = {
"type": "object",
"additionalProperties": {"type": "string"},
}
p["style"] = "deepObject"
p["explode"] = True
note("Restored deepObject for fine_tuning/jobs?metadata")
continue
if for_path == "/organization/audit_logs" and name == "effective_at":
p["schema"] = {
"type": "object",
"properties": {
"gt": {"type": "integer"},
"gte": {"type": "integer"},
"lt": {"type": "integer"},
"lte": {"type": "integer"},
},
}
p["style"] = "deepObject"
p["explode"] = True
note("Restored deepObject for organization/audit_logs?effective_at")
continue
if isinstance(sch, dict) and (
sch.get("type") == "object"
or sch.get("$ref") == "#/components/schemas/Metadata"
):
p["schema"] = {"type": "string"}
note(f"Coerced query param {p.get('name')} at {path} -> string")
p.pop("style", None)
p.pop("explode", None)
fix_params_container(item.get("parameters"), for_path=path)
for method in ["get", "post", "put", "patch", "delete", "options", "head"]:
op = item.get(method)
if isinstance(op, dict):
fix_params_container(op.get("parameters"), for_path=path)
if "{certificate_id}" in path and isinstance(op.get("parameters"), list):
for p in op["parameters"]:
if (
isinstance(p, dict)
and p.get("in") == "path"
and p.get("name") == "cert_id"
):
p["name"] = "certificate_id"
note(
f"Renamed op path param cert_id -> certificate_id at {path}"
)
with open(OUT, "w", encoding="utf-8") as f:
yaml.safe_dump(doc, f, sort_keys=False)
print(f"Patched spec written to {OUT}")
with open(REPORT, "w", encoding="utf-8") as rf:
rf.write("# Patch Report\n\n")
rf.write(f"- Source: {os.path.relpath(IN)}\n")
rf.write(f"- Output: {os.path.relpath(OUT)}\n")
rf.write("\n## Changes\n")
if not changes:
rf.write("- No structural changes recorded.\n")
else:
seen = set()
for c in changes:
if c not in seen:
rf.write(f"- {c}\n")
seen.add(c)