import csv
import os
import sys
from pathlib import Path
from typing import Dict, List, Tuple
BENCHMARK_FIELDS = ['mmlu', 'bbh', 'humaneval', 'aime', 'livebench', 'gpqa', 'gpt4_eval', 'agentic', 'ceo_evals', 'context_tokens']
def extract_provider_from_id(model_id: str) -> str:
if '/' in model_id:
return model_id.split('/')[0]
return 'openrouter'
def parse_capabilities(cap_string: str) -> str:
if not cap_string or cap_string == '-':
return ''
return cap_string.upper()
def normalize_price(price_str: str, convert_to_per_million: bool = True) -> str:
if not price_str or price_str == '-' or price_str == '':
return '0'
try:
price_float = float(price_str)
if convert_to_per_million and price_float > 0 and price_float < 0.001:
price_float = price_float * 1_000_000
if price_float == 0:
return '0'
elif price_float < 0.01:
return f"{price_float:.4f}"
elif price_float < 1:
return f"{price_float:.2f}"
else:
return f"{price_float:.2f}".rstrip('0').rstrip('.')
except ValueError:
return '0'
def sanitize_description(desc: str) -> str:
if not desc:
return desc
allowed = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 .,;:()[]{}!?-_/')
desc = ''.join(c for c in desc if c in allowed)
import re
desc = re.sub(r'\s+', ' ', desc).strip()
return desc
def build_benchmarks_string(row: Dict[str, str]) -> str:
benchmarks = []
benchmark_mappings = {
'mmlu_score': 'mmlu',
'bbh_score': 'bbh',
'humaneval_score': 'humaneval',
'aime_score': 'aime',
'livebench_score': 'livebench',
'gpqa_score': 'gpqa',
'gpt4_eval_score': 'gpt4_eval',
'agentic_score': 'agentic',
'ceo_evals_score': 'ceo_evals',
}
for csv_col, field_name in benchmark_mappings.items():
value = row.get(csv_col, '-')
if not value or value == '-':
benchmarks.append('-')
else:
try:
float(value)
benchmarks.append(value)
except ValueError:
benchmarks.append('-')
context = row.get('context_window', '0')
benchmarks.append(context)
return ','.join(benchmarks)
def csv_row_to_pipe_format(row: Dict[str, str], source: str) -> str:
model_id = row['id'].strip()
provider = extract_provider_from_id(model_id)
if '/' not in model_id:
full_id = f"{provider}/{model_id}"
else:
full_id = model_id
alias = row.get('alias', '').strip() or '-'
name = row.get('name', '').strip() or '-'
status = row.get('status', 'C').strip() or 'C'
input_price = normalize_price(row.get('input_price', '0'))
output_price = normalize_price(row.get('output_price', '0'))
cache_price = row.get('cache_input_price', '-').strip()
if cache_price and cache_price != '-':
cache_price = normalize_price(cache_price)
price_str = f"{input_price},{output_price},{cache_price}"
else:
price_str = f"{input_price},{output_price}"
context_window = row.get('context_window', '0').strip() or '0'
max_output = row.get('max_output', '0').strip() or '0'
context_str = f"{context_window},{max_output}"
capabilities = parse_capabilities(row.get('capabilities', ''))
benchmarks = build_benchmarks_string(row)
description = row.get('description', '').strip()
description = sanitize_description(description) if len(description) > 200:
description = description[:197] + '...'
description = description.replace('"', '\\"')
can_classify = row.get('can_classify', 'Y').strip() or 'Y'
line = f"{full_id}|{alias}|{name}|{status}|{price_str}|{context_str}|{capabilities}|{benchmarks}|{description}|{can_classify}"
return line
def process_csv_file(csv_path: Path, source: str) -> List[str]:
lines = []
print(f"Processing {csv_path.name}...", file=sys.stderr)
try:
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row_num, row in enumerate(reader, start=2): try:
line = csv_row_to_pipe_format(row, source)
lines.append(line)
except Exception as e:
print(f"Error processing row {row_num} in {csv_path.name}: {e}", file=sys.stderr)
continue
print(f"✓ Processed {len(lines)} models from {csv_path.name}", file=sys.stderr)
return lines
except Exception as e:
print(f"Error reading {csv_path}: {e}", file=sys.stderr)
return []
def generate_model_data_section() -> str:
all_lines = []
data_dir = Path(__file__).parent.parent / 'data' / 'models'
if not data_dir.exists():
print(f"Error: Data directory not found: {data_dir}", file=sys.stderr)
sys.exit(1)
csv_configs = [
('aggregators/openrouter.csv', 'OPENROUTER', 'Meta-aggregator'),
('aggregators/bedrock.csv', 'AWS BEDROCK', 'Enterprise provider'),
('aggregators/together_ai.csv', 'TOGETHER AI', 'Open-source models'),
('inference/groq.csv', 'GROQ', 'Fast inference'),
('inference/fireworks.csv', 'FIREWORKS', 'Fast inference'),
('inference/cerebras.csv', 'CEREBRAS', 'Fast inference'),
('inference/sambanova.csv', 'SAMBANOVA', 'Enterprise inference'),
('inference/perplexity.csv', 'PERPLEXITY', 'Search-augmented'),
('inference/hyperbolic.csv', 'HYPERBOLIC', 'Inference platform'),
('inference/novita.csv', 'NOVITA', 'Inference platform'),
('inference/nebius.csv', 'NEBIUS', 'Cloud inference'),
('core/google.csv', 'GOOGLE', 'Direct API'),
('core/mistral.csv', 'MISTRAL', 'Direct API'),
('core/cohere.csv', 'COHERE', 'Direct API'),
('core/deepseek.csv', 'DEEPSEEK', 'Direct API'),
('core/latest_releases.csv', 'LATEST RELEASES', 'Frontier models'),
('chinese/qwen.csv', 'QWEN', 'Chinese provider'),
('chinese/baichuan.csv', 'BAICHUAN', 'Chinese provider'),
('chinese/zhipu.csv', 'ZHIPU', 'Chinese provider'),
('chinese/minimax.csv', 'MINIMAX', 'Chinese provider'),
('chinese/moonshot.csv', 'MOONSHOT', 'Chinese provider'),
('chinese/yi.csv', 'YI', 'Chinese provider'),
('chinese/volcengine.csv', 'VOLCENGINE', 'Chinese provider'),
('chinese/spark.csv', 'SPARK', 'Chinese provider'),
('specialized/stability.csv', 'STABILITY', 'Image generation'),
('specialized/elevenlabs.csv', 'ELEVENLABS', 'Voice synthesis'),
('specialized/voyage.csv', 'VOYAGE', 'Embeddings'),
('specialized/jina.csv', 'JINA', 'Embeddings'),
('specialized/deepgram.csv', 'DEEPGRAM', 'Speech-to-text'),
]
for csv_path, section_name, description in csv_configs:
full_path = data_dir / csv_path
if full_path.exists():
lines = process_csv_file(full_path, section_name.lower())
if lines:
all_lines.append(f"\n# =============================================================================")
all_lines.append(f"# {section_name} ({description})")
all_lines.append("# =============================================================================")
all_lines.extend(lines)
return '\n'.join(all_lines)
def main():
print("Generating model data...", file=sys.stderr)
output = generate_model_data_section()
if not output:
print("Error: No CSV files processed successfully", file=sys.stderr)
sys.exit(1)
sys.stdout.write(output)
print(f"\nGeneration complete. Total lines: {len(output.splitlines())}", file=sys.stderr)
if __name__ == '__main__':
main()