openlark 0.15.0

飞书开放平台 Rust SDK - 企业级高覆盖率 API 客户端,极简依赖一条命令
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
#!/usr/bin/env python3
"""
检查已实现 API 的调用方法与地址是否和 CSV 一致。

复用 tools/validate_apis.py 中的落盘映射逻辑,只对已实现文件做进一步校验:
- 解析实现文件中的 ApiRequest::<...>::get/post/put/patch/delete(...)
- 尽量展开字面量、端点常量、api_endpoints 枚举 to_url/path、format!、replace、字符串拼接
- 与 CSV 的 url 列(METHOD:/open-apis/...)逐项对比
"""

from __future__ import annotations

import argparse
import contextlib
import io
import json
import re
from collections import Counter, defaultdict
from dataclasses import dataclass
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

try:
    import tomllib  # py>=3.11
except ModuleNotFoundError:  # pragma: no cover
    tomllib = None


# Two directory levels above this script; used as the base for every relative
# path below (presumably the repository root — confirm against the layout).
ROOT = Path(__file__).resolve().parent.parent
# Default inputs for the command-line flags defined in parse_args().
DEFAULT_CSV = ROOT / "api_list_export.csv"
DEFAULT_CONFIG = ROOT / "tools" / "api_coverage.toml"
# Default report destinations (JSON and Markdown).
DEFAULT_JSON = ROOT / "reports" / "api_url_validation" / "summary.json"
DEFAULT_MD = ROOT / "reports" / "api_url_validation" / "summary.md"


@dataclass
class ImplRecord:
    """One implemented API paired with the source file that implements it."""

    # Key of the crate in the coverage config's `crates` table.
    crate: str
    # Path of the implementation file (built as ROOT / crate src / relative file).
    file_path: Path
    # API entry produced by the external validator; this script reads its
    # `url`, `name` and `doc_path` attributes.
    api: object


class EndpointIndex:
    """Index of endpoint constants and enum routes scraped from Rust sources.

    Attributes:
        root: Directory whose ``crates/openlark-*`` sub-trees are scanned.
        consts: ``pub const NAME: &str`` name -> string value. After
            :meth:`build`, alias constants hold the literal they point at.
        enum_routes: ``Enum::Variant`` -> route template as written in source.
        enum_aliases: ``pub type Alias = Target;`` alias -> target name.
    """

    def __init__(self, root: Path):
        self.root = root
        self.consts: Dict[str, str] = {}
        self.enum_routes: Dict[str, str] = {}
        self.enum_aliases: Dict[str, str] = {}

    def build(self) -> None:
        """Scan the endpoint files under ``root`` and populate all three maps.

        Constants and type aliases are loaded from every file first, alias
        constants are then flattened, and only afterwards are enum routes
        loaded, so that a match arm returning an aliased constant resolves to
        the real ``/open-apis/...`` literal.
        """
        endpoint_files = set(self.root.glob("crates/openlark-*/src/**/api_endpoints.rs"))
        endpoint_files.update(self.root.glob("crates/openlark-*/src/endpoints/*.rs"))
        endpoint_files.update(self.root.glob("crates/openlark-*/src/endpoints/**/*.rs"))
        endpoint_files.update(self.root.glob("crates/openlark-core/src/constants.rs"))

        # Read each file exactly once; sorting keeps the result deterministic.
        texts = [path.read_text(encoding="utf-8") for path in sorted(endpoint_files)]

        for text in texts:
            self._load_consts(text)
            self._load_enum_aliases(text)

        # Must happen before routes are loaded: route arms look constants up
        # by name and only accept values that start with "/open-apis/".
        self._flatten_const_aliases()

        for text in texts:
            self._load_enum_routes(text)

    def _flatten_const_aliases(self) -> None:
        """Replace alias constants by the value of the constant they name.

        Runs at most 8 passes so chains of aliases are followed without
        looping forever on a cycle.
        """
        for _ in range(8):
            changed = False
            for key, value in list(self.consts.items()):
                if value in self.consts and self.consts[value] != value:
                    self.consts[key] = self.consts[value]
                    changed = True
            if not changed:
                break

    def _load_consts(self, text: str) -> None:
        """Record ``pub const NAME: &str = "...";`` and ``= OTHER;`` items.

        String literals always overwrite; an alias is only recorded when the
        name has no value yet.
        """
        literal_pat = re.compile(r'pub const\s+([A-Z0-9_]+)\s*:\s*&str\s*=\s*"([^"]+)";')
        alias_pat = re.compile(r"pub const\s+([A-Z0-9_]+)\s*:\s*&str\s*=\s*([A-Z0-9_]+);")
        for name, value in literal_pat.findall(text):
            self.consts[name] = value
        for name, alias in alias_pat.findall(text):
            self.consts.setdefault(name, alias)

    def _load_enum_aliases(self, text: str) -> None:
        """Record ``pub type Alias = Target;`` declarations."""
        for alias, target in re.findall(r"pub type\s+([A-Za-z0-9_]+)\s*=\s*([A-Za-z0-9_]+)\s*;", text):
            self.enum_aliases[alias] = target

    def _load_enum_routes(self, text: str) -> None:
        """Record ``Enum::Variant => route`` match arms found in ``text``.

        Four passes run in order, later passes overwriting earlier ones:
        arms yielding a string or ``format!`` literal, arms yielding a
        constant, ``A::X | A::Y => ...`` grouped arms, and finally the arms
        inside an ``impl Enum { pub fn to_url/path ... match self { ... } }``.
        Only routes starting with ``/open-apis/`` are kept.
        """
        arm_pattern = re.compile(
            r"([A-Za-z0-9_]+::[A-Za-z0-9_]+)(?:\([^)]*\))?\s*=>\s*(?:\{\s*)?(?:format!\(\s*\"([^\"]+)\"|\"([^\"]+)\")",
            re.S,
        )
        for key, fmt_literal, literal in arm_pattern.findall(text):
            route = fmt_literal or literal
            if route.startswith("/open-apis/"):
                self.enum_routes[key] = route

        const_arm_pattern = re.compile(
            r"([A-Za-z0-9_]+::[A-Za-z0-9_]+)(?:\([^)]*\))?\s*=>\s*([A-Z0-9_]+)",
            re.S,
        )
        for key, const_name in const_arm_pattern.findall(text):
            route = self.consts.get(const_name)
            if route and route.startswith("/open-apis/"):
                self.enum_routes[key] = route

        grouped_arm_pattern = re.compile(
            r"((?:[A-Za-z0-9_]+::[A-Za-z0-9_]+(?:\([^)]*\))?\s*\|\s*)+[A-Za-z0-9_]+::[A-Za-z0-9_]+(?:\([^)]*\))?)\s*=>\s*(?:\{\s*)?(?:format!\(\s*\"([^\"]+)\"|\"([^\"]+)\")",
            re.S,
        )
        for variants_expr, fmt_literal, literal in grouped_arm_pattern.findall(text):
            route = fmt_literal or literal
            if not route.startswith("/open-apis/"):
                continue
            # Every variant of the `A | B | C` group shares the same route.
            for key in re.findall(r"([A-Za-z0-9_]+::[A-Za-z0-9_]+)(?:\([^)]*\))?", variants_expr):
                self.enum_routes[key] = route

        match_blocks = re.findall(
            r"impl\s+([A-Za-z0-9_]+)\s*\{.*?pub fn (?:to_url|path)\s*\([^)]*\)\s*(?:->\s*[^{]+)?\{(.*?)\n\s*\}",
            text,
            re.S,
        )
        for enum_name, body in match_blocks:
            match_match = re.search(r"match\s+self\s*\{(.*)\}", body, re.S)
            if not match_match:
                continue
            arms = match_match.group(1)
            for variant, expr in re.findall(rf"{enum_name}::([A-Za-z0-9_]+)(?:\([^)]*\))?\s*=>\s*(.*?)(?=,\s*{enum_name}::|\s*$)", arms, re.S):
                route = self._extract_route_expr(expr.strip())
                if route:
                    self.enum_routes[f"{enum_name}::{variant}"] = route

    def _extract_route_expr(self, expr: str) -> Optional[str]:
        """Return the ``/open-apis/...`` route produced by a match-arm body.

        Accepts a quoted literal anywhere in the expression (which also covers
        ``format!("/open-apis/...", ...)``) or a bare constant name. Like the
        other registration paths, anything that does not start with
        ``/open-apis/`` is rejected and ``None`` is returned.
        """
        expr = expr.strip().rstrip(",")
        if expr.startswith("{") and expr.endswith("}"):
            expr = expr[1:-1].strip()
        literal = re.search(r'"(/open-apis/[^"]*)"', expr)
        if literal:
            return literal.group(1)
        # A format! template without the prefix (e.g. "{}/items") is not a
        # usable route, so there is no separate format! branch here.
        if re.fullmatch(r"[A-Z0-9_]+", expr):
            value = self.consts.get(expr)
            if value and value.startswith("/open-apis/"):
                return value
        return None


class ExprResolver:
    """Best-effort resolver from a Rust URL expression to a path template.

    The resolver works on source text only. Given the expression passed to
    ``ApiRequest::get/post/...`` and the whitespace-collapsed file text, it
    tries to expand ``format!``, ``.replace(...)`` chains, ``+``
    concatenation, endpoint enum ``to_url()/path()`` calls, string literals,
    constants and local ``let`` bindings. Every dynamic path segment is
    rendered as ``:{}`` so results can be compared as plain strings.
    """

    def __init__(self, endpoint_index: EndpointIndex):
        self.endpoint_index = endpoint_index
        # Expressions currently being resolved; lets resolve() break cycles
        # such as the shadowing idiom `let url = url.replace(...)`.
        self._active: set = set()

    def resolve(self, expr: str, compact: str) -> Optional[str]:
        """Resolve ``expr`` to a normalized path template.

        Args:
            expr: Rust expression text.
            compact: Whitespace-collapsed source used to look up ``let``
                bindings.

        Returns:
            The normalized template, ``":{}"`` for any ``self.`` expression,
            or ``None`` when the expression is empty, cannot be resolved, or
            refers back to itself.
        """
        if not expr:
            return None

        expr = self._strip_wrappers(expr.strip())

        if expr.startswith("self."):
            return ":{}"

        # Re-entering the same expression can only happen in a reference
        # cycle; without this guard it ends in RecursionError.
        if expr in self._active:
            return None
        self._active.add(expr)
        try:
            return self._resolve_unguarded(expr, compact)
        finally:
            self._active.discard(expr)

    def _resolve_unguarded(self, expr: str, compact: str) -> Optional[str]:
        """Try each resolution strategy in priority order on a stripped ``expr``."""
        strategies = (
            self._resolve_format_expr,  # format!("/open-apis/...", ...)
            self._resolve_replace_chain,  # base.replace("{x}", y)...
            self._resolve_concat_expr,  # "a" + "b" + var
            self._resolve_route_call,  # Enum::Variant.to_url() / .path()
        )
        for strategy in strategies:
            value = strategy(expr, compact)
            if value:
                return value

        # Plain literal (or CONST followed by .replace calls).
        literal = self._extract_first_open_api_literal(expr)
        if literal:
            return self._normalize_template(literal)

        # Constant name.
        if re.fullmatch(r"[A-Z0-9_]+", expr):
            const_value = self.endpoint_index.consts.get(expr)
            if const_value:
                return self._normalize_template(const_value)

        # Local variable: resolve whatever it was bound to.
        if re.fullmatch(r"[a-z_][A-Za-z0-9_]*", expr):
            assigned = self._resolve_variable(compact, expr)
            if assigned and assigned != expr:
                return self.resolve(assigned, compact)

        return None

    def _strip_wrappers(self, expr: str) -> str:
        """Peel trailing commas, leading ``&``, ``.to_string()``, ``.clone()``
        and balanced outer parentheses until nothing changes."""
        while True:
            original = expr
            expr = expr.strip().rstrip(",").strip()
            if expr.startswith("&"):
                expr = expr[1:].strip()
            if expr.endswith(".to_string()"):
                expr = expr[:-12].strip()
            if expr.endswith(".clone()"):
                expr = expr[:-8].strip()
            if expr.startswith("(") and expr.endswith(")") and self._balanced(expr[1:-1]):
                expr = expr[1:-1].strip()
            if expr == original:
                return expr

    def _resolve_variable(self, compact: str, name: str) -> Optional[str]:
        """Return the right-hand side of the first ``let [mut] name = ...;``.

        ``let mut`` bindings are searched before plain ``let`` bindings.
        """
        patterns = [
            rf"let\s+mut\s+{re.escape(name)}\s*=\s*(.*?);",
            rf"let\s+{re.escape(name)}\s*=\s*(.*?);",
        ]
        for pattern in patterns:
            match = re.search(pattern, compact)
            if match:
                return match.group(1).strip()
        return None

    def _resolve_format_expr(self, expr: str, compact: str) -> Optional[str]:
        """Expand ``format!("...", args)`` or a variable bound to one."""
        format_match = re.search(r"format!\((.*)\)$", expr)
        if format_match:
            inner = format_match.group(1).strip()
            parts = [part.strip() for part in self._split_top_level(inner, ",")]
            if parts:
                fmt_literal = self._extract_string_literal(parts[0])
                if fmt_literal is not None:
                    rendered = self._render_format(fmt_literal, parts[1:], compact)
                    return self._normalize_template(rendered)

        # variable = format!(...)
        var_match = re.fullmatch(r"[a-z_][A-Za-z0-9_]*", expr)
        if var_match:
            assigned = self._resolve_variable(compact, expr)
            if assigned and "format!(" in assigned:
                return self._resolve_format_expr(assigned, compact)

        return None

    def _render_format(self, fmt_literal: str, args: List[str], compact: str) -> str:
        """Fill the positional ``{}`` placeholders of ``fmt_literal``.

        Placeholders are matched to arguments by position. Substituting with
        ``str.replace("{}", value, 1)`` would be wrong because an inserted
        ``:{}`` itself contains ``{}`` and would swallow the next argument.
        Unresolvable arguments become ``:{}``; placeholders without an
        argument are left for ``_normalize_template`` to handle.
        """
        segments = fmt_literal.split("{}")
        rendered = [segments[0]]
        for position, tail in enumerate(segments[1:]):
            if position < len(args):
                replacement = self.resolve(args[position], compact)
                if replacement is None:
                    replacement = ":{}"
            else:
                replacement = "{}"
            rendered.append(replacement)
            rendered.append(tail)
        return "".join(rendered)

    def _resolve_replace_chain(self, expr: str, compact: str) -> Optional[str]:
        """Resolve ``base.replace("old", new)...`` by resolving ``base`` and
        applying each replacement to the resulting template."""
        if ".replace" not in expr:
            return None

        match = re.match(r"(.+?)(\s*(?:\.\s*replace\([^)]*\)\s*)+)$", expr)
        if not match:
            return None

        base_expr = match.group(1).strip()
        suffix = match.group(2)
        base_value = self.resolve(base_expr, compact)
        if not base_value:
            return None

        value = base_value
        for old, new in re.findall(r'\.\s*replace\(\s*"([^"]+)"\s*,\s*([^)]+)\)', suffix):
            replacement = self._resolve_replace_arg(new.strip(), compact)
            value = value.replace(old, replacement)
        return self._normalize_template(value)

    def _resolve_concat_expr(self, expr: str, compact: str) -> Optional[str]:
        """Resolve ``a + b + ...``; fails if any operand cannot be resolved
        and is not a plain string literal."""
        if "+" not in expr:
            return None

        parts = self._split_top_level(expr, "+")
        if len(parts) <= 1:
            return None

        resolved_parts: List[str] = []
        for part in parts:
            part = part.strip()
            if not part:
                continue
            resolved = self.resolve(part, compact)
            if resolved:
                resolved_parts.append(resolved)
                continue

            # "/" or any other static literal.
            string_match = re.fullmatch(r'"([^"]*)"', self._strip_wrappers(part))
            if string_match:
                resolved_parts.append(string_match.group(1))
                continue

            return None

        return self._normalize_template("".join(resolved_parts))

    def _resolve_route_call(self, expr: str, compact: str) -> Optional[str]:
        """Resolve ``<target>.to_url()`` / ``<target>.path()``.

        ``target`` may be a local variable (its binding is followed) or an
        enum variant, looked up by full key, then by its last two ``::``
        segments, then through the enum type aliases.
        """
        route_match = re.search(r"(.+?)\.(to_url|path)\(\)$", expr)
        if route_match:
            target = route_match.group(1).strip()
            if re.fullmatch(r"[a-z_][A-Za-z0-9_]*", target):
                assigned = self._resolve_variable(compact, target)
                if assigned:
                    return self._resolve_route_call(assigned + f".{route_match.group(2)}()", compact)

            # Drop the variant payload: `Enum::Variant(args)` -> `Enum::Variant`.
            key = target.split("(", 1)[0].strip()
            route = self.endpoint_index.enum_routes.get(key)
            if route is None and "::" in key:
                parts = [part for part in key.split("::") if part]
                if len(parts) >= 2:
                    route = self.endpoint_index.enum_routes.get("::".join(parts[-2:]))
                if route is None:
                    enum_name = parts[-2] if len(parts) >= 2 else None
                    variant = parts[-1] if parts else None
                    target_enum = self.endpoint_index.enum_aliases.get(enum_name or "")
                    if target_enum and variant:
                        route = self.endpoint_index.enum_routes.get(f"{target_enum}::{variant}")
            if route:
                return self._normalize_template(route)

        return None

    def _extract_first_open_api_literal(self, expr: str) -> Optional[str]:
        """Return the first quoted ``/open-apis/...`` literal in ``expr``, or
        the value of a leading constant with its ``.replace`` calls applied.

        The result is not normalized.
        """
        match = re.search(r'"(/open-apis/[^"]*)"', expr)
        if match:
            return match.group(1)

        # CONST.replace(...).replace(...)
        replace_match = re.match(r"([A-Z0-9_]+)\s*(.*)?$", expr)
        if replace_match:
            const_name = replace_match.group(1)
            suffix = replace_match.group(2) or ""
            base = self.endpoint_index.consts.get(const_name)
            if base:
                value = base
                for old, new in re.findall(r'\.\s*replace\(\s*"([^"]+)"\s*,\s*([^)]+)\)', suffix):
                    replacement = self._resolve_replace_arg(new.strip(), expr)
                    value = value.replace(old, replacement)
                return value

        return None

    def _resolve_replace_arg(self, expr: str, compact: str) -> str:
        """Render the second argument of ``.replace``: a literal's content,
        ``{name}`` for a variable or ``self.`` field, otherwise ``{}``."""
        expr = self._strip_wrappers(expr)
        literal_match = re.fullmatch(r'"([^"]*)"', expr)
        if literal_match:
            return literal_match.group(1)
        if re.fullmatch(r"[a-z_][A-Za-z0-9_]*", expr):
            return "{" + expr + "}"
        if expr.startswith("self."):
            return "{" + expr.split(".")[-1] + "}"
        return "{}"

    def _normalize_template(self, path: str) -> str:
        """Canonicalize a path template for string comparison.

        Drops the query string and rewrites every ``{name}``, ``{}`` and
        ``:name`` segment to ``:{}``, then collapses duplicated colons,
        leftover closing braces and double slashes.
        """
        path = path.strip()
        if "?" in path:
            path = path.split("?", 1)[0]
        path = re.sub(r"\{[^}]+\}", ":{}", path)
        path = path.replace("{}", ":{}")
        path = re.sub(r":[A-Za-z0-9_]+", ":{}", path)
        path = re.sub(r":+\{\}", ":{}", path)
        # Nested braces such as "{{{id}}}" leave closing braces behind; strip
        # any number of them in one step.
        path = re.sub(r":\{\}\}+", ":{}", path)
        path = re.sub(r"/:+", "/:", path)
        path = path.replace("//", "/")
        return path

    def _extract_string_literal(self, expr: str) -> Optional[str]:
        """Return the content of ``expr`` if it is exactly one quoted string."""
        expr = self._strip_wrappers(expr)
        match = re.fullmatch(r'"([^"]*)"', expr)
        if match:
            return match.group(1)
        return None

    def _balanced(self, text: str) -> bool:
        """Tell whether parentheses in ``text`` are balanced, ignoring any
        inside double-quoted strings (backslash escapes are honoured)."""
        depth = 0
        in_str = False
        escape = False
        for ch in text:
            if in_str:
                if escape:
                    escape = False
                elif ch == "\\":
                    escape = True
                elif ch == '"':
                    in_str = False
                continue
            if ch == '"':
                in_str = True
            elif ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth < 0:
                    return False
        return depth == 0 and not in_str

    def _split_top_level(self, text: str, separator: str) -> List[str]:
        """Split ``text`` on ``separator`` wherever it is outside parentheses
        and outside double-quoted strings. Always returns at least one part."""
        parts: List[str] = []
        buf: List[str] = []
        depth_paren = 0
        in_str = False
        escape = False
        for ch in text:
            if in_str:
                buf.append(ch)
                if escape:
                    escape = False
                elif ch == "\\":
                    escape = True
                elif ch == '"':
                    in_str = False
                continue
            if ch == '"':
                in_str = True
                buf.append(ch)
                continue
            if ch == "(":
                depth_paren += 1
            elif ch == ")":
                depth_paren -= 1
            if ch == separator and depth_paren == 0:
                parts.append("".join(buf))
                buf = []
            else:
                buf.append(ch)
        parts.append("".join(buf))
        return parts


def load_validator():
    """Import ``tools/validate_apis.py`` by file path and return its ``APIValidator``.

    The script is executed as a module named ``validate_apis``; it is loaded
    from ``ROOT / "tools"`` rather than through the normal import system.
    """
    script_path = ROOT / "tools" / "validate_apis.py"
    module_spec = spec_from_file_location("validate_apis", script_path)
    validate_apis = module_from_spec(module_spec)
    module_spec.loader.exec_module(validate_apis)
    return validate_apis.APIValidator


def parse_args():
    """Parse the command-line flags and return the argparse namespace.

    Flags: ``--csv``, ``--config``, ``--crate`` (stored as ``crate_name``),
    ``--json``, ``--markdown`` and ``--limit`` (int, default 80).
    """
    parser = argparse.ArgumentParser(description="检查已实现 API 的方法和地址是否与 CSV 一致")
    # Declared in display order; each entry is (flag, add_argument keywords).
    options = (
        ("--csv", {"default": str(DEFAULT_CSV), "help": "CSV 文件路径"}),
        ("--config", {"default": str(DEFAULT_CONFIG), "help": "crate 映射配置"}),
        ("--crate", {"dest": "crate_name", "help": "仅检查单个 crate"}),
        ("--json", {"default": str(DEFAULT_JSON), "help": "JSON 输出路径"}),
        ("--markdown", {"default": str(DEFAULT_MD), "help": "Markdown 输出路径"}),
        ("--limit", {"type": int, "default": 80, "help": "Markdown 中每个 crate 展示的最大问题数"}),
    )
    for flag, settings in options:
        parser.add_argument(flag, **settings)
    return parser.parse_args()


def find_request(text: str) -> Tuple[Optional[str], Optional[str], str]:
    """Locate the first ``ApiRequest::<...>::get/post/put/patch/delete(`` call.

    Returns ``(METHOD, argument_text, compact)`` where ``compact`` is ``text``
    with every whitespace run collapsed to one space. ``METHOD`` is upper
    case. Both ``METHOD`` and the argument are ``None`` when no call is found;
    the argument alone is ``None`` when its parentheses never close.
    """
    collapsed = re.sub(r"\s+", " ", text)
    call = re.search(r"ApiRequest(?:::<.+?>)?::(get|post|put|patch|delete)\(", collapsed)
    if call is None:
        return None, None, collapsed
    http_method = call.group(1).upper()
    # The argument starts right after the opening parenthesis of the call.
    return http_method, parse_balanced_arg(collapsed, call.end()), collapsed


def parse_balanced_arg(text: str, start: int) -> Optional[str]:
    depth = 1
    idx = start
    buf: List[str] = []
    in_str = False
    escape = False
    while idx < len(text):
        ch = text[idx]
        buf.append(ch)
        if in_str:
            if escape:
                escape = False
            elif ch == "\\":
                escape = True
            elif ch == '"':
                in_str = False
        else:
            if ch == '"':
                in_str = True
            elif ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth == 0:
                    buf.pop()
                    return "".join(buf).strip()
        idx += 1
    return None


def collect_implemented_apis(
    validator_cls,
    csv_path: Path,
    config_path: Path,
    crate_name: Optional[str] = None,
) -> List[ImplRecord]:
    if tomllib is None:
        raise RuntimeError("当前 Python 不支持 tomllib,请使用 Python 3.11+")

    config = tomllib.loads(config_path.read_text(encoding="utf-8"))
    crates = config["crates"]
    if crate_name:
        if crate_name not in crates:
            raise KeyError(f"未找到 crate: {crate_name}")
        crates = {crate_name: crates[crate_name]}

    records: List[ImplRecord] = []
    for crate, info in crates.items():
        validator = validator_cls(str(csv_path), str(ROOT / info["src"]), info["biz_tags"])
        with contextlib.redirect_stdout(io.StringIO()):
            validator.parse_csv()
            validator.scan_implementations()
            validator.compare()

        implemented = {
            api.expected_file: api for api in validator.apis if api.is_implemented
        }
        for rel in sorted(validator.implemented_files):
            api = implemented.get(rel)
            if api:
                records.append(ImplRecord(crate=crate, file_path=ROOT / info["src"] / rel, api=api))

    return records


def compare(records: Iterable[ImplRecord], resolver: ExprResolver) -> Dict[str, object]:
    """Compare each implemented file's request method and path with the CSV.

    For every record the file is read, the first ``ApiRequest`` call is
    located and its URL expression resolved. When either the method or the
    path cannot be determined, a ``url: METHOD:path`` annotation found in the
    file text is used instead. Records that still lack a method or path are
    reported as unresolved.

    Args:
        records: Implemented APIs to check. Any iterable is accepted,
            including one-shot iterators and generators.
        resolver: Resolver used for URL expressions and path normalization.

    Returns:
        A dict with ``summary`` (counters), ``mismatches`` and ``unresolved``.
    """
    resolved = []
    unresolved = []
    # Counted while iterating: a generator is exhausted once the loop ends,
    # so measuring it afterwards would always report 0.
    implemented_total = 0
    comment_pattern = re.compile(r"url:\s*(GET|POST|PUT|PATCH|DELETE):([^\s]+)")

    for record in records:
        implemented_total += 1
        text = record.file_path.read_text(encoding="utf-8")
        actual_method, arg, compact = find_request(text)
        actual_path = resolver.resolve(arg, compact) if arg else None

        if not actual_method or not actual_path:
            # Fall back to the `url: METHOD:path` annotation in the file.
            comment_match = comment_pattern.search(text)
            if comment_match:
                actual_method = comment_match.group(1)
                actual_path = resolver._normalize_template(comment_match.group(2))

        expected_method, expected_path = record.api.url.split(":", 1)
        item = {
            "crate": record.crate,
            "file": str(record.file_path.relative_to(ROOT)),
            "name": record.api.name,
            "expected_method": expected_method,
            "expected_path": resolver._normalize_template(expected_path),
            "actual_method": actual_method,
            "actual_path": actual_path,
            "csv_url": record.api.url,
            "doc_path": record.api.doc_path,
        }

        if not actual_method or not actual_path:
            unresolved.append(item)
            continue

        item["match"] = (
            item["expected_method"] == item["actual_method"]
            and item["expected_path"] == item["actual_path"]
        )
        resolved.append(item)

    mismatches = [item for item in resolved if not item["match"]]

    return {
        "summary": {
            "implemented_total": implemented_total,
            "resolved_total": len(resolved),
            "unresolved_total": len(unresolved),
            "matched_total": len(resolved) - len(mismatches),
            "mismatched_total": len(mismatches),
        },
        "mismatches": mismatches,
        "unresolved": unresolved,
    }


def build_markdown(result: Dict[str, object], limit: int) -> str:
    """Render the comparison result as a Markdown report.

    The report holds an overall statistics table, a per-crate table of
    mismatch and unresolved counts, and one section per crate listing at most
    ``limit`` mismatches followed by a note with the number of omitted items.
    The returned text ends with a newline.
    """
    totals = result["summary"]
    mismatched = result["mismatches"]
    pending = result["unresolved"]

    report = [
        "# API 地址校验报告",
        "",
        "## 总体统计",
        "",
        "| 指标 | 数量 |",
        "|---|---:|",
        f"| 已实现文件 | {totals['implemented_total']} |",
        f"| 成功解析 | {totals['resolved_total']} |",
        f"| 一致 | {totals['matched_total']} |",
        f"| 不一致 | {totals['mismatched_total']} |",
        f"| 未解析 | {totals['unresolved_total']} |",
        "",
        "## 不一致分布",
        "",
    ]

    # Per-crate distribution table; a crate appears if it has either kind of issue.
    mismatch_by_crate = Counter(entry["crate"] for entry in mismatched)
    unresolved_by_crate = Counter(entry["crate"] for entry in pending)
    report += ["| crate | 不一致 | 未解析 |", "|---|---:|---:|"]
    report += [
        f"| {name} | {mismatch_by_crate[name]} | {unresolved_by_crate[name]} |"
        for name in sorted(mismatch_by_crate.keys() | unresolved_by_crate.keys())
    ]

    # Detail sections cover mismatches only, grouped by crate.
    grouped: Dict[str, List[dict]] = {}
    for entry in mismatched:
        grouped.setdefault(entry["crate"], []).append(entry)

    for name in sorted(grouped):
        entries = grouped[name]
        report += ["", f"## {name}", ""]
        for entry in entries[:limit]:
            report += [
                f"- `{entry['file']}`",
                f"  期望: `{entry['expected_method']}:{entry['expected_path']}`",
                f"  实际: `{entry['actual_method']}:{entry['actual_path']}`",
            ]
        omitted = len(entries) - limit
        if omitted > 0:
            report.append(f"- 其余 {omitted} 项已省略")

    return "\n".join(report) + "\n"


def main():
    """Command-line entry point.

    Collects the implemented APIs, compares them against the CSV, writes the
    JSON and Markdown reports (creating parent directories as needed) and
    prints the summary plus both output paths.
    """
    args = parse_args()
    validator_cls = load_validator()
    records = collect_implemented_apis(
        validator_cls,
        csv_path=Path(args.csv),
        config_path=Path(args.config),
        crate_name=args.crate_name,
    )

    index = EndpointIndex(ROOT)
    index.build()
    result = compare(records, ExprResolver(index))

    json_path = Path(args.json)
    md_path = Path(args.markdown)
    for target in (json_path, md_path):
        target.parent.mkdir(parents=True, exist_ok=True)

    json_text = json.dumps(result, ensure_ascii=False, indent=2)
    json_path.write_text(json_text, encoding="utf-8")
    md_path.write_text(build_markdown(result, args.limit), encoding="utf-8")

    summary_text = json.dumps(result["summary"], ensure_ascii=False, indent=2)
    print(summary_text)
    print(f"JSON: {json_path}")
    print(f"Markdown: {md_path}")


# Run the checker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()