import csv
import sys
from pathlib import Path
from typing import List, Dict, Any
# Common non-ASCII typographic characters mapped to plain-ASCII stand-ins.
# Applied before the hard ASCII filter in sanitize_text so these characters
# survive (as ASCII) instead of being dropped outright.
UNICODE_REPLACEMENTS = {
    '\u2018': "'",   # left single quotation mark
    '\u2019': "'",   # right single quotation mark
    '\u201c': '"',   # left double quotation mark
    '\u201d': '"',   # right double quotation mark
    '\u2013': '-',   # en dash
    '\u2014': '-',   # em dash
    '\u2010': '-',   # hyphen
    '\u2011': '-',   # non-breaking hyphen
    '\u2212': '-',   # minus sign
    '\u00b4': "'",   # acute accent
    '`': "'",        # backtick -> apostrophe
    '\xa0': ' ',     # non-breaking space
}


def sanitize_text(text: str) -> str:
    """Normalize *text* to clean, single-spaced ASCII.

    Steps:
      1. Replace common typographic Unicode characters (smart quotes,
         dashes, non-breaking spaces, ...) with ASCII equivalents.
      2. Drop any remaining non-ASCII characters.
      3. Trim opening brackets left dangling at the end of the string.
      4. Collapse runs of spaces and strip surrounding whitespace.

    Empty/falsy input is returned unchanged.
    """
    if not text:
        return text
    for unicode_char, ascii_char in UNICODE_REPLACEMENTS.items():
        text = text.replace(unicode_char, ascii_char)
    # Hard filter: anything still outside ASCII after the replacement
    # table is simply dropped.
    result = ''.join(char for char in text if ord(char) < 128)
    # Drop opening brackets dangling at the end, e.g. "Model (" -> "Model".
    result = result.rstrip('([{')
    # Collapse repeated spaces into one. BUG FIX: the original replaced
    # a single space with a single space, so the string never changed and
    # the while-loop spun forever on any text containing a space.
    while '  ' in result:
        result = result.replace('  ', ' ')
    return result.strip()
def clean_csv_file(csv_path: Path) -> int:
    """Sanitize every cell of the CSV at *csv_path*, rewriting it in place.

    Reads the file with csv.DictReader, runs sanitize_text over each
    non-empty cell, then rewrites the file with the original header.

    Returns the number of data rows processed, or 0 on any error
    (missing file, undecodable content, malformed rows, ...). Errors are
    reported on stderr rather than raised, so a batch run can continue.
    """
    if not csv_path.exists():
        print(f"Error: File not found: {csv_path}", file=sys.stderr)
        return 0
    print(f"Cleaning {csv_path.name}...", file=sys.stderr)
    try:
        rows: List[Dict[str, Any]] = []
        # newline='' is required by the csv module on the reading side too,
        # so quoted fields containing embedded newlines round-trip
        # correctly (the original opened the file without it).
        with open(csv_path, 'r', newline='', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            fieldnames = reader.fieldnames
            for row in reader:
                rows.append({
                    key: sanitize_text(value) if value else value
                    for key, value in row.items()
                })
        # No header means an empty file: rewriting would only truncate it
        # to no benefit, so stop before opening for write.
        if not fieldnames:
            return 0
        with open(csv_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
        print(f"✓ Cleaned {len(rows)} rows in {csv_path.name}", file=sys.stderr)
        return len(rows)
    except Exception as e:
        # Deliberate best-effort boundary: report and skip this file
        # rather than abort the whole batch.
        print(f"Error processing {csv_path}: {e}", file=sys.stderr)
        return 0
def main():
    """Sanitize the known model CSV files; return 0 as the exit status."""
    base = Path(__file__).parent.parent / 'data' / 'models'
    targets = (
        base / 'aggregators' / 'openrouter.csv',
        base / 'aggregators' / 'bedrock.csv',
        base / 'aggregators' / 'together_ai.csv',
        base / 'core' / 'latest_releases.csv',
    )
    # Files that do not exist yet are skipped silently; clean_csv_file
    # itself reports per-file problems on stderr.
    total_rows = sum(
        clean_csv_file(target) for target in targets if target.exists()
    )
    print(f"\nTotal rows cleaned: {total_rows}", file=sys.stderr)
    return 0
# Script entry point: propagate main()'s return value as the process
# exit status.
if __name__ == '__main__':
    sys.exit(main())