import csv
import os
import re
from pathlib import Path
from typing import Dict, List, Set
from dataclasses import dataclass
from collections import defaultdict
try:
import tomllib except ModuleNotFoundError: tomllib = None
@dataclass
class APIInfo:
api_id: str
name: str
biz_tag: str
meta_project: str
meta_version: str
meta_resource: str
meta_name: str
url: str
doc_path: str
expected_file: str = ""
is_implemented: bool = False
class APIValidator:
def __init__(self, csv_path: str, src_path: str, filter_tags: List[str] = None, skip_old_versions: bool = True):
self.csv_path = csv_path
self.src_path = Path(src_path)
self.filter_tags = filter_tags
self.skip_old_versions = skip_old_versions
self.apis: List[APIInfo] = []
self.implemented_files: Set[str] = set()
self.missing_apis: List[APIInfo] = []
self.extra_files: Set[str] = set()
self.skipped_old_count: int = 0
@staticmethod
def _camel_to_snake(name: str) -> str:
if not name:
return name
s1 = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
s2 = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1)
return s2.lower()
def _normalize_name_path(self, name_path: str) -> str:
name_path = name_path.replace("#", "_")
segments = [s for s in name_path.split("/") if s]
normalized: List[str] = []
for seg in segments:
if seg.startswith("_") and len(seg) > 1:
normalized.append("_" + self._camel_to_snake(seg[1:]))
else:
normalized.append(self._camel_to_snake(seg))
return "/".join(normalized)
def parse_csv(self):
print(f"📄 读取 CSV 文件: {self.csv_path}")
if self.filter_tags:
print(f"🏷️ 过滤业务标签: {', '.join(self.filter_tags)}")
if self.skip_old_versions:
print(f"🔧 跳过旧版本 API: version='old'")
with open(self.csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if self.filter_tags and row['bizTag'] not in self.filter_tags:
continue
if self.skip_old_versions and row['meta.Version'] == 'old':
self.skipped_old_count += 1
continue
api = APIInfo(
api_id=row['id'],
name=row['name'],
biz_tag=row['bizTag'],
meta_project=row['meta.Project'],
meta_version=row['meta.Version'],
meta_resource=row['meta.Resource'],
meta_name=row['meta.Name'],
url=row['url'],
doc_path=row['docPath']
)
api.expected_file = self._generate_expected_file_path(api)
self.apis.append(api)
print(f"✅ 解析完成,共 {len(self.apis)} 个 API")
if self.skip_old_versions and self.skipped_old_count > 0:
print(f" 📋 已跳过 {self.skipped_old_count} 个旧版本 API")
def _generate_expected_file_path(self, api: APIInfo) -> str:
if api.biz_tag == 'meeting_room' and api.meta_version == 'old' and api.meta_resource == 'default':
name_path = api.meta_name.replace(':', '_')
name_path = self._normalize_name_path(name_path)
if '/' in name_path:
name_with_path = name_path.replace('/', '/')
else:
name_with_path = name_path
return f"meeting_room/{name_with_path}.rs"
base = f"{api.biz_tag}/{api.meta_project}"
version = api.meta_version
resource_path = api.meta_resource.replace('.', '/')
name_path = api.meta_name.replace(':', '_').rstrip('/')
name_path = self._normalize_name_path(name_path)
if '/' in name_path:
name_with_path = name_path.replace('/', '/')
else:
name_with_path = name_path
full_path = f"{base}/{version}/{resource_path}/{name_with_path}.rs"
return full_path
def scan_implementations(self):
print(f"🔍 扫描代码实现目录: {self.src_path}")
for root, dirs, files in os.walk(self.src_path):
dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__']
for file in files:
exclude_files = ('mod.rs', 'models.rs', 'macros.rs', 'service.rs', 'responses.rs')
if file.endswith('.rs') and file not in exclude_files:
full_path = os.path.join(root, file)
rel_path = os.path.relpath(full_path, self.src_path)
rel_path = rel_path.replace('\\', '/')
if not rel_path.startswith('lib.rs') and not rel_path.startswith('common/'):
self.implemented_files.add(rel_path)
print(f"✅ 扫描完成,找到 {len(self.implemented_files)} 个实现文件")
def compare(self):
print("🔬 开始对比分析...")
for api in self.apis:
if api.expected_file and api.expected_file in self.implemented_files:
api.is_implemented = True
else:
api.is_implemented = False
self.missing_apis.append(api)
expected_files = set(api.expected_file for api in self.apis if api.expected_file)
self.extra_files = self.implemented_files - expected_files
print(f"✅ 对比完成")
print(f" - 已实现: {len([a for a in self.apis if a.is_implemented])}")
print(f" - 未实现: {len(self.missing_apis)}")
print(f" - 额外文件: {len(self.extra_files)}")
def generate_report(self, output_path: str):
print(f"📝 生成报告: {output_path}")
with open(output_path, 'w', encoding='utf-8') as f:
f.write("# API 验证报告\n\n")
f.write(f"**生成时间**: {self._get_timestamp()}\n")
f.write(f"**CSV 文件**: {self.csv_path}\n")
f.write(f"**源码目录**: {self.src_path}\n")
f.write(f"**命名规范**: `src/bizTag/meta.project/meta.version/meta.resource/meta.name.rs`\n\n")
f.write("## 一、总体统计\n\n")
total_apis = len(self.apis)
implemented = len([a for a in self.apis if a.is_implemented])
missing = len(self.missing_apis)
completion_rate = (implemented / total_apis * 100) if total_apis > 0 else 0
f.write(f"| 指标 | 数量 |\n")
f.write(f"|------|------|\n")
f.write(f"| **API 总数** | {total_apis} |\n")
f.write(f"| **已实现** | {implemented} |\n")
f.write(f"| **未实现** | {missing} |\n")
f.write(f"| **完成率** | {completion_rate:.1f}% |\n")
f.write(f"| **额外文件** | {len(self.extra_files)} |\n\n")
f.write("## 二、模块统计\n\n")
module_stats = self._calculate_module_stats()
f.write("| 模块 | API 数量 | 已实现 | 未实现 | 完成率 |\n")
f.write("|------|---------|--------|--------|--------|\n")
for module_name, stats in sorted(module_stats.items()):
f.write(f"| {module_name} | {stats['total']} | {stats['implemented']} | "
f"{stats['missing']} | {stats['rate']:.1f}% |\n")
f.write("\n")
if self.missing_apis:
f.write("## 三、未实现的 API\n\n")
missing_by_module = defaultdict(list)
for api in self.missing_apis:
module_name = api.biz_tag.upper()
missing_by_module[module_name].append(api)
for module_name in sorted(missing_by_module.keys()):
f.write(f"### {module_name} ({len(missing_by_module[module_name])} 个)\n\n")
for api in sorted(missing_by_module[module_name], key=lambda x: x.name):
f.write(f"#### {api.name}\n\n")
f.write(f"- **API ID**: {api.api_id}\n")
f.write(f"- **预期文件**: `{api.expected_file}`\n")
f.write(f"- **URL**: {api.url}\n")
f.write(f"- **文档**: {api.doc_path}\n\n")
if self.extra_files:
f.write("## 四、额外的实现文件\n\n")
f.write("这些文件存在于代码中,但不在 CSV API 列表中:\n\n")
for file in sorted(self.extra_files):
f.write(f"- `{file}`\n")
f.write("\n")
f.write("## 五、已实现的 API\n\n")
implemented_by_module = defaultdict(list)
for api in self.apis:
if api.is_implemented:
module_name = api.biz_tag.upper()
implemented_by_module[module_name].append(api)
for module_name in sorted(implemented_by_module.keys()):
f.write(f"### {module_name} ({len(implemented_by_module[module_name])} 个)\n\n")
for api in sorted(implemented_by_module[module_name], key=lambda x: x.name):
f.write(f"- ✅ {api.name} (`{api.expected_file}`)\n")
f.write("\n")
print(f"✅ 报告生成完成")
def _calculate_module_stats(self) -> Dict[str, Dict]:
module_stats = defaultdict(lambda: {'total': 0, 'implemented': 0, 'missing': 0, 'rate': 0.0})
for api in self.apis:
module_name = api.biz_tag.upper()
module_stats[module_name]['total'] += 1
if api.is_implemented:
module_stats[module_name]['implemented'] += 1
else:
module_stats[module_name]['missing'] += 1
for stats in module_stats.values():
if stats['total'] > 0:
stats['rate'] = (stats['implemented'] / stats['total']) * 100
return dict(module_stats)
@staticmethod
def _get_timestamp() -> str:
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def main():
import argparse
parser = argparse.ArgumentParser(description='API 验证脚本(基于 strict 命名规范)')
parser.add_argument('--csv', default='api_list_export.csv',
help='CSV 文件路径 (默认: api_list_export.csv)')
parser.add_argument('--src', default=None,
help='源码目录路径(默认: crates/openlark-meeting/src;也可用 --crate 自动设置)')
parser.add_argument('--output', default=None,
help='报告输出路径(默认: API_VALIDATION_REPORT.md;--crate 时默认: reports/api_validation/<crate>.md)')
parser.add_argument('--filter', nargs='+',
help='过滤业务标签 (例如: --filter calendar vc meeting_room)')
parser.add_argument('--crate',
help='按 crate 自动设置 --src/--filter(来源: tools/api_coverage.toml)')
parser.add_argument('--mapping', default='tools/api_coverage.toml',
help='crate→bizTag 映射文件路径 (默认: tools/api_coverage.toml)')
parser.add_argument('--list-crates', action='store_true',
help='列出映射文件中的 crate 与 bizTag,然后退出')
parser.add_argument('--skip-old', dest='skip_old', action='store_true', default=True,
help='跳过旧版本 API (version=old,默认启用)')
parser.add_argument('--include-old', dest='skip_old', action='store_false',
help='包含旧版本 API (version=old)')
args = parser.parse_args()
print("=" * 60)
print("🚀 API 验证工具(Strict 命名规范)")
print("=" * 60)
print()
def _load_mapping(path: str) -> dict:
if tomllib is None:
print("❌ 错误: 当前 Python 不支持 tomllib,请使用 Python 3.11+")
raise SystemExit(1)
mapping_path = Path(path)
if not mapping_path.exists():
print(f"❌ 错误: 映射文件不存在: {mapping_path}")
raise SystemExit(1)
data = tomllib.loads(mapping_path.read_text(encoding="utf-8"))
crates = data.get("crates", {})
if not isinstance(crates, dict) or not crates:
print(f"❌ 错误: 映射文件缺少 [crates.*] 配置: {mapping_path}")
raise SystemExit(1)
return crates
if args.list_crates:
crates = _load_mapping(args.mapping)
print(f"📄 映射文件: {args.mapping}\n")
for crate_name in sorted(crates.keys()):
cfg = crates.get(crate_name, {})
src = cfg.get("src", "")
tags = cfg.get("biz_tags", [])
tags_text = ", ".join(tags) if isinstance(tags, list) else str(tags)
print(f"- {crate_name}: src={src} biz_tags=[{tags_text}]")
return 0
if args.crate:
crates = _load_mapping(args.mapping)
if args.crate not in crates:
print(f"❌ 错误: 映射文件中不存在 crate: {args.crate}")
print(f" 提示:运行 `python3 tools/validate_apis.py --list-crates` 查看可用项")
return 1
cfg = crates[args.crate]
if args.src is None:
args.src = cfg.get("src")
if args.filter is None:
args.filter = cfg.get("biz_tags")
if args.output is None:
if args.crate:
args.output = f"reports/api_validation/{args.crate}.md"
else:
args.output = "API_VALIDATION_REPORT.md"
if args.src is None:
args.src = 'crates/openlark-meeting/src'
if not os.path.exists(args.csv):
print(f"❌ 错误: CSV 文件不存在: {args.csv}")
return 1
if not os.path.exists(args.src):
print(f"❌ 错误: 源码目录不存在: {args.src}")
return 1
validator = APIValidator(args.csv, args.src, args.filter, args.skip_old)
validator.parse_csv()
validator.scan_implementations()
validator.compare()
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
validator.generate_report(args.output)
print()
print("=" * 60)
print("✅ 验证完成!")
print(f"📄 报告已保存到: {args.output}")
print("=" * 60)
return 0
if __name__ == '__main__':
exit(main())