import json
import re
import sys
from pathlib import Path
# TOML parser: prefer the standard-library module, fall back to the
# API-compatible `tomli` backport, and abort with a hint if neither exists.
try:
    import tomllib
except ImportError:
    try:
        import tomli as tomllib
    except ImportError:
        print("ERROR: Need Python 3.11+ (tomllib) or `pip install tomli`", file=sys.stderr)
        raise SystemExit(1)
# Working directories, all siblings of this script:
#   CACHE_DIR     - cached discovery documents ("<api>.<version>.json")
#   MANIFESTS_DIR - curated "*.toml" manifests
#   REFERENCE_DIR - generated "<api>.full.toml" reference files
CACHE_DIR = Path(__file__).parent.joinpath("discovery_cache")
MANIFESTS_DIR = Path(__file__).parent.joinpath("manifests")
REFERENCE_DIR = Path(__file__).parent.joinpath("reference")

# Field names that get flagged as RESERVED_WORD in the generated reference
# (the legend there says they need a rust_name + serde_rename override).
# NOTE(review): the entries look like Rust strict/reserved keywords - confirm.
RESERVED_WORDS = frozenset({
    'type', 'match', 'self', 'ref', 'mod', 'use', 'fn', 'let', 'mut',
    'pub', 'return', 'if', 'else', 'for', 'while', 'loop', 'break',
    'continue', 'struct', 'enum', 'impl', 'trait', 'where', 'async',
    'await', 'move', 'static', 'const', 'crate', 'super', 'extern',
    'unsafe', 'dyn', 'abstract', 'become', 'box', 'do', 'final',
    'macro', 'override', 'priv', 'typeof', 'unsized', 'virtual', 'yield',
})
def to_snake_case(name: str) -> str:
    """Convert a camelCase / PascalCase identifier to snake_case.

    An underscore is inserted at each lower-or-digit -> upper boundary and
    between an acronym run and the capitalised word that follows it; the
    result is then lower-cased. Characters other than letters and digits
    (dots, for instance) pass through unchanged.
    """
    boundary_patterns = (
        r'([a-z0-9])([A-Z])',      # "pageToken"  -> "page_Token"
        r'([A-Z]+)([A-Z][a-z])',   # "HTTPServer" -> "HTTP_Server"
    )
    converted = name
    for pattern in boundary_patterns:
        converted = re.sub(pattern, r'\1_\2', converted)
    return converted.lower()
def field_annotation(fname: str, fprop: dict) -> str:
    """Build the one-line, comma-separated annotation for a schema field.

    Args:
        fname: Field name; only used to flag entries of RESERVED_WORDS.
        fprop: The field's property dict from the discovery schema.

    Returns:
        A type description (``$ref: X``, ``array<X>``, ``map<string, X>``,
        ``object``, the plain type, or ``?``) followed by any of: the
        format, a (possibly truncated) enum listing, ``repeated``,
        ``readOnly`` and ``RESERVED_WORD``.
    """

    def describe_type() -> str:
        # A $ref wins over whatever "type" says.
        target = fprop.get('$ref', '')
        if target:
            return f'$ref: {target}'
        kind = fprop.get('type', '')
        if kind == 'array':
            element = fprop.get('items', {})
            return f"array<{element.get('$ref', '') or element.get('type', '?')}>"
        if kind == 'object':
            extra = fprop.get('additionalProperties', {})
            if not extra:
                return 'object'
            value_kind = extra.get('$ref', extra.get('type', '?'))
            return f'map<string, {value_kind}>'
        return kind or '?'

    tags = [describe_type()]

    fmt = fprop.get('format', '')
    if fmt:
        tags.append(f'format: {fmt}')

    if 'enum' in fprop:
        choices = fprop['enum']
        if len(choices) > 5:
            # Long enums: show the first four values plus a count of the rest.
            shown = f'{", ".join(choices[:4])}, ... +{len(choices) - 4}'
        else:
            shown = ", ".join(choices)
        tags.append(f'enum: [{shown}]')

    tags.extend(flag for flag in ('repeated', 'readOnly') if fprop.get(flag))

    if fname in RESERVED_WORDS:
        tags.append('RESERVED_WORD')
    return ', '.join(tags)
def collect_all_methods(discovery: dict) -> list[dict]:
    """Flatten the nested ``resources`` tree of a discovery doc into methods.

    Returns one dict per method, in depth-first order (a resource's own
    methods come before those of its sub-resources), with the keys:
    ``resource`` (dotted resource path), ``method`` (method name),
    ``dotted`` (``resource.method``) and ``data`` (the raw method dict).
    """

    def iter_methods(resource_map: dict, prefix: str):
        for res_name, res_body in resource_map.items():
            path = f"{prefix}{res_name}" if prefix else res_name
            for verb, spec in res_body.get('methods', {}).items():
                yield {
                    'resource': path,
                    'method': verb,
                    'dotted': f"{path}.{verb}",
                    'data': spec,
                }
            if 'resources' in res_body:
                # Children are addressed as "<parent>.<child>".
                yield from iter_methods(res_body['resources'], f"{path}.")

    return list(iter_methods(discovery.get('resources', {}), ""))
def _section_banner(title: str) -> list[str]:
    """Return the three-line ``# ====`` banner that opens a section."""
    rule = f"# {'=' * 70}"
    return [rule, f"# {title}", rule]


def _squash_description(text: str) -> str:
    """Collapse whitespace runs to single spaces and cap the text at 100 chars."""
    return re.sub(r'\s+', ' ', text).strip()[:100]


def _manifest_operation_keys(manifest: dict) -> set:
    """Return the dotted ``resource.method`` key of every manifest operation."""
    keys = set()
    for op in manifest.get('operations', []):
        resource = op.get('discovery_resource', '')
        method = op.get('discovery_method', '')
        # A discovery_method that already contains a dot is fully qualified.
        if resource and '.' not in method:
            keys.add(f"{resource}.{method}")
        else:
            keys.add(method)
    return keys


def _suggest_rust_name(resource: str, method_name: str) -> str:
    """Derive the suggested ``rust_name`` for a discovery method."""
    if method_name == 'list':
        return f"list_{to_snake_case(resource)}"
    action = {'insert': 'create', 'patch': 'update'}.get(method_name, method_name)
    # Drop exactly one plural "s". (str.rstrip('s') would strip every
    # trailing "s", turning e.g. "access" into "acce".)
    singular = resource[:-1] if resource.endswith('s') else resource
    # NOTE(review): for nested resources `resource` is a dotted path, so the
    # suggestion contains dots and needs hand-editing before use.
    return f"{action}_{to_snake_case(singular)}"


def _schema_entry_lines(schema_name: str, schema: dict, in_manifest: bool) -> list[str]:
    """Render the commented ``[[types]]`` entry for one discovery schema."""
    props = schema.get('properties', {})
    desc = _squash_description(schema.get('description', ''))
    tag = " [IN MANIFEST]" if in_manifest else ""

    lines = [f"# --- {schema_name} ({len(props)} fields){tag} ---"]
    if desc:
        lines.append(f"# {desc}")

    byte_fields = [f for f, p in props.items() if p.get('format') == 'byte']
    reserved_fields = [f for f in props if f in RESERVED_WORDS]
    enum_fields = [f for f, p in props.items() if 'enum' in p]
    if byte_fields:
        lines.append(f'# WARNING: format:"byte" fields: {", ".join(byte_fields)}')
    if reserved_fields:
        lines.append(f"# WARNING: reserved words: {', '.join(reserved_fields)}")

    lines.append("# [[types]]")
    lines.append(f'# schema = "{schema_name}"')
    # NOTE(review): the source indentation was lost; the separator and the
    # per-field listing are assumed to belong inside this `if props:` block.
    if props:
        field_names = sorted(props)
        quoted = ', '.join(f'"{f}"' for f in field_names)
        lines.append(f"# include_fields = [{quoted}]")
        lines.append("#")
        lines.extend(f"# {f}: {field_annotation(f, props[f])}" for f in field_names)

    # Suggested [types.field_overrides] entries.
    overrides = []
    if 'name' in props:
        overrides.append('name = { required = true }')
    for fname in reserved_fields:
        rust_name = f'{to_snake_case(fname)}_value'
        overrides.append(f'{fname} = {{ rust_name = "{rust_name}", serde_rename = "{fname}" }}')
    for fname in byte_fields:
        overrides.append(f'{fname} = {{ format = "bytes" }}')
    for fname in enum_fields:
        # fname[:1] instead of fname[0] so an empty field name cannot raise.
        enum_name = f'{schema_name}{fname[:1].upper()}{fname[1:]}'
        # Enum overrides are emitted as a commented-out suggestion.
        overrides.append(f'# {fname} = {{ enum_type = "{enum_name}" }}')
    if overrides:
        lines.append("#")
        lines.append("# [types.field_overrides]")
        lines.extend(f"# {o}" for o in overrides)

    lines.append("")
    return lines


def _detect_list_response(resp_ref: str, resp_props: dict):
    """Return list_response metadata for a paginated response, else None.

    A response counts as paginated when it has a ``nextPageToken`` property;
    the items field is the first other property that is an array of ``$ref``.
    """
    if 'nextPageToken' not in resp_props:
        return None
    for fname, fprop in resp_props.items():
        if fname == 'nextPageToken':
            continue
        if fprop.get('type') == 'array' and fprop.get('items', {}).get('$ref'):
            return {
                'type_name': resp_ref,
                'items_field': fname,
                'item_type': fprop['items']['$ref'],
            }
    return None


def _operation_entry_lines(entry: dict, schemas: dict, manifest_ops: set) -> list[str]:
    """Render the commented ``[[operations]]`` entry for one discovery method."""
    data = entry['data']
    dotted = entry['dotted']
    resource = entry['resource']
    method_name = entry['method']

    http_method = data.get('httpMethod', '?')
    path = data.get('path', '')
    desc = _squash_description(data.get('description', ''))
    req_ref = data.get('request', {}).get('$ref', '')
    resp_ref = data.get('response', {}).get('$ref', '')
    params = data.get('parameters', {})
    tag = " [IN MANIFEST]" if dotted in manifest_ops else ""
    rust_name = _suggest_rust_name(resource, method_name)

    if resp_ref and resp_ref in schemas:
        resp_props = schemas[resp_ref].get('properties', {})
    else:
        resp_props = {}

    # Long-running operation: the response schema is named "Operation" and
    # has either selfLink + status, or a `done` property.
    is_lro = resp_ref == 'Operation' and (
        ('selfLink' in resp_props and 'status' in resp_props) or 'done' in resp_props
    )
    list_response = _detect_list_response(resp_ref, resp_props)
    query_params = {k: v for k, v in params.items() if v.get('location') == 'query'}
    query_names = sorted(query_params)

    lines = [f"# --- {dotted}{tag} ---", f"# {http_method} {path}"]
    if desc:
        lines.append(f"# {desc}")
    if req_ref:
        lines.append(f"# Request: {req_ref}")
    if resp_ref:
        lines.append(f"# Response: {resp_ref}")
    if query_params:
        described = [
            f"{q} (repeated)" if query_params[q].get('repeated') else q
            for q in query_names
        ]
        lines.append(f"# Query params: {', '.join(described)}")

    lines.append("# [[operations]]")
    if '.' not in resource:
        lines.append(f'# discovery_resource = "{resource}"')
        lines.append(f'# discovery_method = "{method_name}"')
    else:
        # Nested resources are addressed by the fully dotted method path.
        lines.append(f'# discovery_method = "{resource}.{method_name}"')
    lines.append(f'# rust_name = "{rust_name}"')
    if is_lro:
        lines.append('# is_lro = true')
    if list_response:
        lines.append(
            f'# list_response = {{ type_name = "{list_response["type_name"]}", '
            f'items_field = "{list_response["items_field"]}", '
            f'item_type = "{list_response["item_type"]}" }}'
        )
    if query_params:
        quoted = ", ".join(f'"{q}"' for q in query_names)
        lines.append(f"# query_params = [{quoted}]")

    lines.append("")
    return lines


def _count_byte_fields(manifest: dict, schemas: dict) -> tuple:
    """Count format:"byte" fields in manifest types as (total, unhandled).

    A field is unhandled when its ``field_overrides`` entry is missing or
    does not set ``format = "bytes"``.
    """
    total = unhandled = 0
    for type_conf in manifest.get('types', []):
        props = schemas.get(type_conf['schema'], {}).get('properties', {})
        include_fields = type_conf.get('include_fields', list(props.keys()))
        field_overrides = type_conf.get('field_overrides', {})
        for fname in include_fields:
            if fname not in props or props[fname].get('format') != 'byte':
                continue
            total += 1
            override = field_overrides.get(fname, {})
            if not (isinstance(override, dict) and override.get('format') == 'bytes'):
                unhandled += 1
    return total, unhandled


def generate_reference(api_name: str, manifest: dict, discovery: dict) -> str:
    """Render the full, fully commented reference manifest for one API.

    Args:
        api_name: API name; used in the header's manifest path hint.
        manifest: Parsed curated manifest. Its ``types`` entries must have a
            ``schema`` key; ``operations`` entries may carry
            ``discovery_resource`` / ``discovery_method``.
        discovery: Parsed discovery document (``schemas``, ``resources`` and
            optionally ``title`` are read).

    Returns:
        The reference text: a header, one entry per schema (sorted by name),
        one entry per method (in discovery traversal order) and a coverage
        summary, joined with newlines.

    Raises:
        KeyError: If a manifest ``types`` entry has no ``schema`` key.
    """
    schemas = discovery.get('schemas', {})
    methods = collect_all_methods(discovery)
    title = discovery.get('title', api_name)
    manifest_schemas = {t['schema'] for t in manifest.get('types', [])}
    manifest_ops = _manifest_operation_keys(manifest)

    lines = [
        f"# {title} — Full Reference Manifest",
        "# Auto-generated by reference.py — DO NOT HAND-EDIT",
        "# This file contains EVERY schema and operation from the discovery doc.",
        f"# Use it as a reference when extending codegen/manifests/{api_name}.toml",
        "#",
        f"# Schemas: {len(schemas)} total, {len(manifest_schemas)} in manifest",
        f"# Operations: {len(methods)} total, {len(manifest_ops)} in manifest",
        "#",
        "# Legend:",
        "# [IN MANIFEST] = already in the curated manifest",
        '# format: byte = needs format = "bytes" override',
        "# RESERVED_WORD = needs rust_name + serde_rename override",
        "",
    ]

    lines.extend(_section_banner(f"SCHEMAS ({len(schemas)} total)"))
    lines.append("")
    for schema_name in sorted(schemas):
        lines.extend(
            _schema_entry_lines(schema_name, schemas[schema_name], schema_name in manifest_schemas)
        )

    lines.extend(_section_banner(f"OPERATIONS ({len(methods)} total)"))
    lines.append("")
    for entry in methods:
        lines.extend(_operation_entry_lines(entry, schemas, manifest_ops))

    lines.extend(_section_banner("COVERAGE SUMMARY"))
    # max(..., 1) guards the percentage against an empty discovery doc.
    lines.append(f"# Schemas in discovery: {len(schemas)}")
    lines.append(f"# Schemas in manifest: {len(manifest_schemas)}")
    lines.append(
        f"# Coverage: {len(manifest_schemas)}/{len(schemas)} "
        f"({100 * len(manifest_schemas) // max(len(schemas), 1)}%)"
    )
    lines.append("#")
    lines.append(f"# Operations in discovery: {len(methods)}")
    lines.append(f"# Operations in manifest: {len(manifest_ops)}")
    lines.append(
        f"# Coverage: {len(manifest_ops)}/{len(methods)} "
        f"({100 * len(manifest_ops) // max(len(methods), 1)}%)"
    )

    total_byte, unhandled_byte = _count_byte_fields(manifest, schemas)
    if total_byte > 0:
        lines.append("#")
        lines.append(f'# format:"byte" fields in manifest types: {total_byte}')
        lines.append(f'# Unhandled (missing format="bytes"): {unhandled_byte}')

    # Trailing empty entry so the joined text ends with a newline.
    lines.append("")
    return '\n'.join(lines)
def process_api(api_name: str) -> bool:
    """Generate ``<api_name>.full.toml`` in REFERENCE_DIR for one API.

    Scans MANIFESTS_DIR (sorted by path) for the first manifest whose
    ``api.name`` equals ``api_name``, loads the matching cached discovery
    document from CACHE_DIR, and writes the generated reference text.

    Returns:
        True on success; False (after printing an error to stderr) when no
        manifest matches or the discovery document is not cached.
    """
    manifest = None
    for path in sorted(MANIFESTS_DIR.glob("*.toml")):
        with open(path, 'rb') as f:
            candidate = tomllib.load(f)
        if candidate.get('api', {}).get('name') == api_name:
            manifest = candidate
            break

    if manifest is None:
        print(f" ERROR: No manifest found for '{api_name}'", file=sys.stderr)
        return False

    version = manifest['api'].get('version', 'v1')
    cache_file = CACHE_DIR / f"{api_name}.{version}.json"
    if not cache_file.exists():
        print(f" ERROR: Discovery doc not cached for '{api_name}'", file=sys.stderr)
        print(" Run: python3 codegen/fetch_discovery.py", file=sys.stderr)
        return False

    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    with open(cache_file, encoding='utf-8') as f:
        discovery = json.load(f)

    content = generate_reference(api_name, manifest, discovery)
    REFERENCE_DIR.mkdir(parents=True, exist_ok=True)
    out_file = REFERENCE_DIR / f"{api_name}.full.toml"
    # The header always contains a non-ASCII dash and TOML files must be
    # UTF-8, so the encoding is pinned rather than left to the platform.
    out_file.write_text(content, encoding='utf-8')

    schema_count = len(discovery.get('schemas', {}))
    method_count = len(collect_all_methods(discovery))
    print(f" {api_name}: {schema_count} schemas, {method_count} operations -> {out_file.name}")
    return True
def check_references() -> bool:
    """Check that every manifest's reference file exists and is current.

    For each manifest in MANIFESTS_DIR the reference text is regenerated in
    memory and compared with the file in REFERENCE_DIR. A status line is
    printed per API. APIs whose discovery document is not cached are
    reported and skipped without failing the check.

    Returns:
        True when no reference file is missing or out of date.
    """
    all_ok = True
    for path in sorted(MANIFESTS_DIR.glob("*.toml")):
        with open(path, 'rb') as f:
            m = tomllib.load(f)
        api_name = m['api']['name']
        version = m['api'].get('version', 'v1')

        ref_file = REFERENCE_DIR / f"{api_name}.full.toml"
        if not ref_file.exists():
            print(f" {api_name}: MISSING (run 'python3 codegen/reference.py')")
            all_ok = False
            continue

        cache_file = CACHE_DIR / f"{api_name}.{version}.json"
        if not cache_file.exists():
            # Cannot regenerate without the discovery doc; not a failure.
            print(f" {api_name}: discovery doc not cached, skipping")
            continue

        # Pin UTF-8 so the comparison does not depend on the locale encoding
        # (the generated text always contains non-ASCII characters).
        with open(cache_file, encoding='utf-8') as f:
            discovery = json.load(f)
        expected = generate_reference(api_name, m, discovery)
        actual = ref_file.read_text(encoding='utf-8')

        if expected != actual:
            print(f" {api_name}: OUT OF DATE (regenerate with 'python3 codegen/reference.py {api_name}')")
            all_ok = False
        else:
            print(f" {api_name}: OK")
    return all_ok
def main():
    """Command-line entry point.

    Usage:
        reference.py            generate reference files for every manifest
        reference.py API ...    generate reference files for the named APIs
        reference.py --check    verify reference files; exit 1 if any is
                                missing or out of date

    Arguments starting with ``--`` are treated as flags; all others are API
    names. When generation fails for any API the script exits with status 1.
    """
    cli = sys.argv[1:]
    args = [a for a in cli if not a.startswith('--')]
    flags = set(cli) - set(args)

    if '--check' in flags:
        print("Checking reference manifests...\n")
        ok = check_references()
        sys.exit(0 if ok else 1)

    # process_api reports its own errors; remember which APIs failed so the
    # exit status reflects them instead of always being 0.
    failed = []
    if args:
        for api_name in args:
            if not process_api(api_name):
                failed.append(api_name)
    else:
        print("Generating reference manifests for all APIs...\n")
        for path in sorted(MANIFESTS_DIR.glob("*.toml")):
            with open(path, 'rb') as f:
                m = tomllib.load(f)
            api_name = m['api']['name']
            if not process_api(api_name):
                failed.append(api_name)

    print("\nDone!")
    if failed:
        print(f"Failed: {', '.join(failed)}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()