from __future__ import annotations
import argparse
import os
import re
import subprocess
import sys
from collections import Counter
from pathlib import Path
from xml.etree import ElementTree as ET
# Third ancestor of this file's resolved path. Cargo is run from here and the
# ``target/profiling`` build outputs are looked up relative to it.
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Binary names looked up (in this order) in the trace's binary table when
# choosing which image to hand to ``atos`` for address resolution.
PROFILE_BINARIES = ("profile_search", "profile_build")
def find_profiling_binary(name: str = "profile_search") -> Path:
    """Locate the example binary *name* built with the ``profiling`` profile.

    The canonical location ``target/profiling/examples/<name>`` is preferred.
    If it does not exist, ``target/profiling/deps`` is searched for executable
    files named ``<name>`` or ``<name>-<suffix>``; when several match, the
    most recently modified one is returned so a stale artifact is never
    preferred over a fresh one.

    Args:
        name: Binary name without any hash suffix.

    Returns:
        Path to the binary. When nothing is found, the canonical
        ``examples/<name>`` path is returned even though it does not exist,
        so callers can test ``.exists()`` and report the expected location.
    """
    profiling_dir = REPO_ROOT / "target" / "profiling"
    target = profiling_dir / "examples" / name
    if target.exists():
        return target

    deps = profiling_dir / "deps"
    if deps.is_dir():
        hashed_prefix = f"{name}-"
        candidates = [
            entry
            for entry in deps.iterdir()
            if (entry.name == name or entry.name.startswith(hashed_prefix))
            and entry.is_file()
            and os.access(entry, os.X_OK)
        ]
        if candidates:
            # Directory iteration order is arbitrary; pick deterministically.
            return max(candidates, key=lambda p: p.stat().st_mtime)
    return target
def build_profiling_binary(name: str = "profile_search") -> Path:
    """Build example *name* with cargo's ``profiling`` profile and return its path.

    Cargo is invoked from ``REPO_ROOT`` for package ``matcher_rs``. A failing
    build raises ``subprocess.CalledProcessError`` (``check=True``). If the
    build succeeds but the binary still cannot be located, the process exits
    with an error message.
    """
    print(f"Building {name} with --profile profiling...")
    cargo_cmd = [
        "cargo", "build",
        "--profile", "profiling",
        "--example", name,
        "-p", "matcher_rs",
    ]
    subprocess.run(cargo_cmd, cwd=REPO_ROOT, check=True)

    built = find_profiling_binary(name)
    if built.exists():
        return built
    sys.exit(f"Error: binary not found at {built} after build")
def record_profile(
    *,
    target: str = "search",
    scene: str | None = None,
    mode: str = "process",
    shape: str = "literal",
    dict_lang: str = "en",
    rules: int = 10_000,
    pt: str = "none",
    seconds: int = 10,
    output: Path | None = None,
    build: bool = True,
) -> Path:
    """Record a Time Profiler trace of ``profile_<target>`` with ``xctrace``.

    The binary is rebuilt when *build* is true, or when no existing binary can
    be found. The arguments passed to the binary are chosen in this order:

    * ``target == "build"``: ``--dict --rules --pt --seconds``
    * *scene* is set: ``--scene --seconds``
    * otherwise: ``--dict --rules --mode --shape --pt --seconds``

    Args:
        target: Suffix of the example binary (``profile_<target>``).
        scene: Named scene; ignored when ``target == "build"``.
        mode, shape, dict_lang, rules, pt: Forwarded to the binary as flags.
        seconds: Run time passed to the binary; xctrace's own time limit is
            five seconds longer.
        output: Trace path. Defaults to a ``/tmp/prof_*.trace`` name derived
            from the chosen arguments. An existing trace there is removed.
        build: Rebuild the binary before recording.

    Returns:
        Path of the recorded trace.

    Raises:
        SystemExit: xctrace is not installed, exits non-zero, or leaves no
            trace at the output path.
    """
    import shutil  # local import: only needed to clear a stale trace

    binary_name = f"profile_{target}"
    binary = find_profiling_binary(binary_name)
    if build or not binary.exists():
        binary = build_profiling_binary(binary_name)

    binary_args: list[str]
    if target == "build":
        default_output = Path(f"/tmp/prof_build_{dict_lang}_{rules}.trace")
        binary_args = [
            "--dict", dict_lang,
            "--rules", str(rules),
            "--pt", pt,
            "--seconds", str(seconds),
        ]
        print(f"Recording build: dict={dict_lang} rules={rules} pt={pt} seconds={seconds}s")
    elif scene:
        default_output = Path(f"/tmp/prof_{scene}.trace")
        binary_args = ["--scene", scene, "--seconds", str(seconds)]
        print(f"Recording: scene={scene} seconds={seconds}s")
    else:
        default_output = Path(f"/tmp/prof_{mode}_{shape}_{dict_lang}_{rules}.trace")
        binary_args = [
            "--dict", dict_lang,
            "--rules", str(rules),
            "--mode", mode,
            "--shape", shape,
            "--pt", pt,
            "--seconds", str(seconds),
        ]
        print(f"Recording: mode={mode} shape={shape} dict={dict_lang} rules={rules} pt={pt} seconds={seconds}s")
    if output is None:
        output = default_output

    # Clear any previous trace at the destination before recording. Handles
    # both a directory and a plain file/symlink.
    if output.is_dir() and not output.is_symlink():
        shutil.rmtree(output)
    elif output.exists() or output.is_symlink():
        output.unlink()

    cmd = [
        "xctrace", "record",
        "--template", "Time Profiler",
        # Give xctrace a little longer than the binary's own run time.
        "--time-limit", f"{seconds + 5}s",
        "--output", str(output),
        "--launch", "--", str(binary), *binary_args,
    ]
    print(f"Output: {output}")
    try:
        result = subprocess.run(cmd)
    except FileNotFoundError:
        sys.exit("Error: xctrace not found (is Xcode installed?)")
    if result.returncode != 0:
        sys.exit(f"xctrace record failed (exit {result.returncode})")
    if not output.exists():
        sys.exit(f"Error: trace not created at {output}")
    print(f"Trace saved: {output}")
    return output
def batch_demangle(mangled: set[str]) -> dict[str, str]:
    """Map each mangled symbol to a demangled form.

    All names are piped through one ``rustfilt`` process (sorted, one per
    line). Its output is used only when it has exactly one line per input
    name. If ``rustfilt`` is missing, times out, or the line count differs,
    each name instead just has a trailing ``::h<16 hex digits>`` suffix
    stripped.
    """
    if not mangled:
        return {}

    ordered = sorted(mangled)
    try:
        completed = subprocess.run(
            ["rustfilt"],
            input="\n".join(ordered),
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        pass
    else:
        out_lines = completed.stdout.strip().split("\n")
        if len(out_lines) == len(ordered):
            return dict(zip(ordered, out_lines))

    # Fallback: drop only the hash suffix.
    return {sym: re.sub(r"::h[0-9a-f]{16}$", "", sym) for sym in ordered}
def shorten_symbol(demangled: str) -> str:
    """Drop the outer angle brackets of a leading ``<...>::`` qualifier.

    ``<Foo as Bar>::baz`` becomes ``Foo as Bar::baz``. The closing bracket is
    found by depth counting, so nested qualifiers such as
    ``<<A as B>::C as D>::e`` lose only their outermost pair instead of being
    cut at the first ``>::``. Names that do not start with ``<``, or whose
    outer bracket is not followed by ``::``, are returned unchanged.
    """
    if not demangled.startswith("<"):
        return demangled

    depth = 0
    for idx, ch in enumerate(demangled):
        if ch == "<":
            depth += 1
        elif ch == ">":
            if idx and demangled[idx - 1] == "-":
                # '->' inside a function type is not a closing bracket.
                continue
            depth -= 1
            if depth == 0:
                # idx > 1 keeps an empty "<>" qualifier untouched.
                if idx > 1 and demangled.startswith("::", idx + 1):
                    return demangled[1:idx] + demangled[idx + 1:]
                return demangled
    return demangled
def resolve_addresses_atos(
    binary_path: str,
    load_addr: str,
    addresses: list[str],
) -> dict[str, list[dict]]:
    """Resolve addresses to frame chains using ``atos -i``.

    The de-duplicated, sorted addresses are passed to a single ``atos``
    invocation. Its output is piped through ``rustfilt`` when that succeeds
    and produces output; otherwise the raw atos text is parsed. The output is
    split on blank lines and the resulting groups are paired with the sorted
    addresses in order (the parser assumes atos emits one group per address).

    Args:
        binary_path: Path of the binary image given to ``atos -o``.
        load_addr: Load address given to ``atos -l``.
        addresses: Addresses to resolve, as strings.

    Returns:
        Mapping of address to a list of ``{"name", "file", "line"}`` dicts,
        one per output line of its group. ``file`` is a base name and ``line``
        a digit string; both are ``""`` when the line carries no source
        location. Empty when there is nothing to resolve, ``atos`` is
        unavailable, times out, or prints nothing.
    """
    if not addresses:
        return {}

    unique_addrs = sorted(set(addresses))
    try:
        cmd = ["atos", "-i", "-o", binary_path, "-l", load_addr] + unique_addrs
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return {}
    raw_output = proc.stdout.strip()
    if not raw_output:
        return {}

    # Demangling is best-effort: keep the raw atos text unless rustfilt both
    # succeeds and actually returns something.
    demangled_output = raw_output
    try:
        filt = subprocess.run(
            ["rustfilt"], input=raw_output,
            capture_output=True, text=True, timeout=10,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        pass
    else:
        filtered = filt.stdout.strip()
        if filt.returncode == 0 and filtered:
            demangled_output = filtered

    # "symbol (in image) (file:line)" and the location-less "symbol (in image)".
    with_location = re.compile(r"^(.+?)\s+\(in .+?\)\s+\((.+?):(\d+)\)$")
    symbol_only = re.compile(r"^(.+?)\s+\(in .+?\)")

    groups = re.split(r"\n\s*\n", demangled_output)
    result: dict[str, list[dict]] = {}
    for addr, group in zip(unique_addrs, groups):
        chain = []
        for line in group.strip().splitlines():
            line = line.strip()
            if not line:
                continue
            m = with_location.match(line)
            if m:
                chain.append({
                    "name": shorten_symbol(m.group(1)),
                    "file": Path(m.group(2)).name,
                    "line": m.group(3),
                })
                continue
            m2 = symbol_only.match(line)
            name = shorten_symbol(m2.group(1)) if m2 else line
            chain.append({"name": name, "file": "", "line": ""})
        if chain:
            result[addr] = chain
    return result
def _build_source_attribution(
    samples: list[dict],
    atos_cache: dict[str, list[dict]],
    total_weight_ns: int,
) -> list[dict]:
    """Attribute each sample's weight to a source location from its atos chain.

    For every sample the atos chain of its leaf address is scanned for the
    first frame whose file base name is in ``OUR_FILES``. The sample's weight
    is credited to ``"<name> (<file>:<line>)"`` for that frame, with generic
    arguments stripped and the name cut to its last two path segments. Up to
    three following chain frames are kept as a ``" → "``-joined context
    string. If no frame matches, the weight goes to the first chain frame, or
    to ``"unknown"`` when the address has no chain.

    Args:
        samples: Dicts with ``"frames"`` (each frame may carry ``"addr"``) and
            ``"weight_ns"``.
        atos_cache: Address -> list of ``{"name", "file", "line"}`` dicts.
        total_weight_ns: Denominator for percentages; 0 yields ``pct == 0``.

    Returns:
        Up to 25 entries, heaviest first, each with ``source``, ``weight_ms``,
        ``pct`` and, when available, ``chain``.
    """
    OUR_FILES = {
        "mod.rs", "build.rs", "engine.rs", "search.rs", "state.rs", "rule.rs",
        "step.rs", "graph.rs", "api.rs", "process_type.rs", "string_pool.rs",
        "replace.rs", "delete.rs", "utf8.rs", "simd.rs", "constants.rs",
        "builder.rs",
    }

    def strip_generics(symbol: str) -> str:
        # Cut a trailing turbofish ("::<...") first, then any remaining <...>.
        symbol = re.sub(r"::<.*$", "", symbol)
        return re.sub(r"<[^>]*>", "", symbol)

    attribution: Counter[str] = Counter()
    attribution_chain: dict[str, str] = {}
    for s in samples:
        frames = s["frames"]
        w = s["weight_ns"]
        leaf_addr = frames[0].get("addr", "") if frames else ""
        chain = atos_cache.get(leaf_addr, [])

        attributed = None
        chain_str = ""
        for i, frame in enumerate(chain):
            fname = frame.get("file", "")
            if fname not in OUR_FILES:
                continue
            loc = f"{fname}:{frame['line']}" if frame.get("line") else fname
            name = strip_generics(frame["name"])
            if "::" in name:
                parts = [p for p in name.split("::") if p]
                if len(parts) > 2:
                    name = "::".join(parts[-2:])
            attributed = f"{name} ({loc})"

            # Context: the next (at most three) frames of the chain.
            chain_parts = []
            for following in chain[i + 1:i + 4]:
                cname = strip_generics(following["name"])
                if "::" in cname:
                    cparts = [p for p in cname.split("::") if p]
                    if len(cparts) > 1:
                        cname = cparts[-1]
                chain_parts.append(cname)
            chain_str = " → ".join(chain_parts)
            break

        if attributed is None:
            if chain:
                first = chain[0]
                # The keys exist but may be empty, so test truthiness for '?'.
                attributed = f"{first['name']} ({first.get('file') or '?'}:{first.get('line') or '?'})"
            else:
                attributed = "unknown"

        attribution[attributed] += w
        if attributed not in attribution_chain and chain_str:
            attribution_chain[attributed] = chain_str

    result = []
    for source, w in attribution.most_common(25):
        entry = {
            "source": source,
            "weight_ms": w / 1_000_000,
            "pct": w / total_weight_ns * 100 if total_weight_ns else 0,
        }
        if source in attribution_chain:
            entry["chain"] = attribution_chain[source]
        result.append(entry)
    return result
def parse_time_profile(xml_text: str) -> tuple[list[dict], dict]:
    """Parse exported time-profile XML into samples and binary metadata.

    Elements may point at an earlier element through ``ref="<id>"``; such
    references are followed for ``binary``, ``weight``, ``thread``,
    ``backtrace``, ``frame`` and source ``path`` elements.

    Returns:
        ``(samples, binary_info)``. Each sample is a dict with ``weight_ns``
        (1_000_000 when absent or not an integer), ``thread`` (the thread
        element's ``fmt`` attribute, or ``""``) and ``frames``, a list of
        dicts with ``name``, ``addr``, ``source_file`` and ``source_line``.
        Rows without a backtrace or without frames are skipped.
        ``binary_info`` maps a binary name to ``{"path", "load_addr"}`` for
        binaries that carry all three attributes.
    """
    root = ET.fromstring(xml_text)
    id_map: dict[str, ET.Element] = {
        el.get("id"): el for el in root.iter() if el.get("id")
    }

    def deref(el: ET.Element) -> ET.Element:
        # Follow a ref="..." pointer when its target is known.
        ref = el.get("ref")
        if ref and ref in id_map:
            return id_map[ref]
        return el

    def frame_record(frame_el: ET.Element) -> dict:
        frame_el = deref(frame_el)
        source_file = None
        source_line = None
        src_el = frame_el.find("source")
        if src_el is not None:
            source_line = src_el.get("line")
            path_el = src_el.find("path")
            if path_el is not None:
                source_file = (deref(path_el).text or "").strip()
        return {
            "name": frame_el.get("name", ""),
            "addr": frame_el.get("addr", ""),
            "source_file": source_file,
            "source_line": source_line,
        }

    binary_info: dict[str, dict] = {}
    for raw_binary in root.iter("binary"):
        binary_el = deref(raw_binary)
        bname = binary_el.get("name", "")
        bpath = binary_el.get("path", "")
        load_addr = binary_el.get("load-addr", "")
        if bname and bpath and load_addr:
            binary_info[bname] = {"path": bpath, "load_addr": load_addr}

    samples: list[dict] = []
    for row in root.iter("row"):
        weight_ns = 1_000_000
        weight_el = row.find("weight")
        if weight_el is not None:
            try:
                weight_ns = int(deref(weight_el).text or "1000000")
            except ValueError:
                pass

        thread_el = row.find("thread")
        thread_name = deref(thread_el).get("fmt", "") if thread_el is not None else ""

        bt_el = row.find("backtrace")
        if bt_el is None:
            continue
        frames = [frame_record(f) for f in deref(bt_el).iter("frame")]
        if not frames:
            continue
        samples.append({
            "weight_ns": weight_ns,
            "frames": frames,
            "thread": thread_name,
        })
    return samples, binary_info
# Ordered (category, substrings) table used by categorize(). A symbol string
# is assigned to the first category that has any substring occurring in it,
# so entries earlier in the list take precedence over later ones.
CATEGORY_RULES: list[tuple[str, list[str]]] = [
("DFA scan", ["dfa::", "DFA::", "aho_corasick::dfa", "aho_corasick::automaton",
"aho_corasick::ahocorasick", "AcAutomaton"]),
("Daachorse scan", ["bytewise", "charwise", "daachorse"]),
("Engine dispatch", ["engine::", "ScanPlan", "BytewiseMatcher", "CharwiseMatcher",
"count_non_ascii", "text_non_ascii_density",
"PatternIndex", "dispatch_indirect", "DIRECT_RULE"]),
("Search hot path", ["search::", "walk_and_scan", "scan_variant", "process_match",
"is_match_simple", "process_simple"]),
("State machine", ["state::", "WordState", "SimpleMatchState", "ScanContext",
"generation", "mark_positive"]),
("Rule evaluation", ["rule::", "process_entry", "RuleSet", "RuleHot", "RuleShape",
"PatternEntry", "PatternKind"]),
("Text transform", ["process::", "transform::", "DeleteMatcher", "VariantNormMatcher",
"RomanizeMatcher", "NormalizeMatcher", "NormalizeFilter",
"string_pool", "ProcessType"]),
("ASCII check", ["is_ascii", "ascii::"]),
("Allocator", ["alloc::", "mi_", "malloc", "free", "realloc", "Allocator"]),
("Vec / collections", ["vec::", "Vec::", "HashMap", "hashbrown", "AHash",
"drop_in_place"]),
# NOTE(review): "memmove" is listed twice below; the duplicate has no effect.
("Std / overhead", ["black_box", "hint::", "option.rs", "cmp::", "PartialOrd",
"PartialEq", "slice::cmp", "ptr::read", "ptr::copy",
"memcmp", "memcpy", "memmove", "memmove"]),
("Sort (init)", ["sort::", "quicksort", "smallsort", "median"]),
("Main loop", ["profile_search", "profile_build", "main"]),
("Thread / spawn", ["thread::", "pthread", "thread_start", "spawn"]),
("dyld / system", ["dyld", "libsystem", "boot_boot", "ignite", "_open"]),
]
def categorize(demangled_leaf: str, demangled_chain: list[str]) -> str:
    """Return the first ``CATEGORY_RULES`` category matching a sample.

    The leaf symbol and the first five entries of *demangled_chain* are
    joined with spaces into one string. Categories are tried in table order
    and the first one with any keyword occurring as a substring wins;
    ``"Other"`` is returned when none match.
    """
    haystack = demangled_leaf + " " + " ".join(demangled_chain[:5])
    return next(
        (
            label
            for label, needles in CATEGORY_RULES
            if any(needle in haystack for needle in needles)
        ),
        "Other",
    )
# Substrings that mark a caller frame as boilerplate; _find_meaningful_caller()
# skips any frame whose name contains one of these.
BOILERPLATE_FRAGMENTS = {
"FnOnce", "call_once", "lang_start", "std::rt", "std::sys",
"std::panicking", "std::thread::lifecycle", "boxed::Box",
"core::ops::function",
}
def _find_meaningful_caller(chain_names: list[str]) -> str | None:
    """Return the first caller name that is not boilerplate.

    A name is skipped when it contains any ``BOILERPLATE_FRAGMENTS`` substring
    or is exactly one of ``main``, ``start``, ``thread_start``,
    ``_pthread_start``. Returns ``None`` when every name is skipped.
    """
    entry_points = ("main", "start", "thread_start", "_pthread_start")
    for candidate in chain_names:
        is_boilerplate = any(frag in candidate for frag in BOILERPLATE_FRAGMENTS)
        if not is_boilerplate and candidate not in entry_points:
            return candidate
    return None
def analyze_trace(trace_path: Path) -> dict:
    """Export a trace with ``xctrace`` and summarise its time-profile samples.

    Steps: export the table of contents (only its exit status is checked),
    export the ``time-profile`` table of run 1, parse it, keep samples whose
    thread label contains ``"Main Thread"`` or ``"profile_"`` (all samples if
    none match), demangle symbols, resolve leaf addresses through ``atos``
    when the trace lists one of ``PROFILE_BINARIES``, then aggregate.

    Args:
        trace_path: Path of the trace to analyse.

    Returns:
        A dict with ``total_weight_ms``, ``samples``, ``categories``,
        ``top_symbols``, ``top_with_caller``, ``call_tree``,
        ``heavy_backtraces`` and ``source_attribution``. When no samples are
        parsed, only the first four keys are present (zero/empty values).

    Raises:
        SystemExit: an ``xctrace export`` call fails or returns empty output.
    """
    print(f"Exporting trace: {trace_path.name}...")
    toc_result = subprocess.run(
        ["xctrace", "export", "--input", str(trace_path), "--toc"],
        capture_output=True, text=True,
    )
    if toc_result.returncode != 0:
        sys.exit(f"xctrace export --toc failed: {toc_result.stderr}")

    xpath = '/trace-toc/run[@number="1"]/data/table[@schema="time-profile"]'
    export_result = subprocess.run(
        ["xctrace", "export", "--input", str(trace_path), "--xpath", xpath],
        capture_output=True, text=True,
    )
    if export_result.returncode != 0:
        sys.exit(f"xctrace export failed: {export_result.stderr}")
    xml_text = export_result.stdout
    if not xml_text.strip():
        sys.exit("Empty export — trace may not contain time-profile data")

    print("Parsing samples...")
    samples, binary_info = parse_time_profile(xml_text)
    if not samples:
        print("No samples parsed. Try opening in Instruments.app for interactive analysis.")
        return {"total_weight_ms": 0, "categories": {}, "top_symbols": [], "samples": 0}

    main_samples = [s for s in samples if "Main Thread" in s["thread"] or "profile_" in s["thread"]]
    if not main_samples:
        main_samples = samples

    all_mangled: set[str] = {
        f["name"] for s in main_samples for f in s["frames"] if f["name"]
    }
    print(f"Demangling {len(all_mangled)} symbols...")
    demangle_map = batch_demangle(all_mangled)

    atos_cache: dict[str, list[dict]] = {}
    profile_binary = next(
        (binary_info[bname] for bname in PROFILE_BINARIES if binary_info.get(bname)),
        None,
    )
    if profile_binary:
        all_leaf_addrs: set[str] = set()
        for s in main_samples:
            if s["frames"]:
                addr = s["frames"][0].get("addr", "")
                if addr:
                    all_leaf_addrs.add(addr)
        if all_leaf_addrs:
            print(f" Resolving {len(all_leaf_addrs)} addresses via atos -i...")
            atos_cache = resolve_addresses_atos(
                profile_binary["path"],
                profile_binary["load_addr"],
                sorted(all_leaf_addrs),
            )
            if atos_cache:
                print(f" Resolved {len(atos_cache)} inline chains")
            else:
                print(" (atos resolution failed — inline chains unavailable)")
    else:
        print(" (binary info not found in trace — skipping atos resolution)")

    total_weight_ns = sum(s["weight_ns"] for s in main_samples)

    def share_of_total(weight_ns: int) -> float:
        # Guarded like the other aggregators: an all-zero trace must not crash.
        return weight_ns / total_weight_ns * 100 if total_weight_ns else 0.0

    categories: Counter[str] = Counter()
    leaf_symbols: Counter[str] = Counter()
    leaf_with_caller: Counter[str] = Counter()
    for s in main_samples:
        frames = s["frames"]
        if not frames:
            continue
        w = s["weight_ns"]
        leaf_name = demangle_map.get(frames[0]["name"], frames[0]["name"])
        chain_names = [demangle_map.get(f["name"], f["name"]) for f in frames[1:]]
        categories[categorize(leaf_name, chain_names)] += w

        leaf_display = _frame_display(frames[0], demangle_map)
        leaf_symbols[leaf_display] += w

        meaningful_caller = _find_meaningful_caller(chain_names)
        if meaningful_caller:
            caller_short = shorten_symbol(meaningful_caller)
            leaf_with_caller[f"{leaf_display} <- {caller_short}"] += w

    top_symbols = [
        {"symbol": sym, "weight_ms": w / 1_000_000, "pct": share_of_total(w)}
        for sym, w in leaf_symbols.most_common(30)
    ]
    call_tree = _build_call_tree(main_samples, demangle_map, total_weight_ns)
    heavy_backtraces = _build_heavy_backtraces(main_samples, demangle_map, total_weight_ns, atos_cache)
    source_attribution = _build_source_attribution(main_samples, atos_cache, total_weight_ns) if atos_cache else []

    return {
        "total_weight_ms": total_weight_ns / 1_000_000,
        "samples": len(main_samples),
        "categories": {
            cat: {"weight_ms": w / 1_000_000, "pct": share_of_total(w)}
            for cat, w in sorted(categories.items(), key=lambda x: -x[1])
        },
        "top_symbols": top_symbols,
        "top_with_caller": [
            {"display": d, "weight_ms": w / 1_000_000, "pct": share_of_total(w)}
            for d, w in leaf_with_caller.most_common(30)
        ],
        "call_tree": call_tree,
        "heavy_backtraces": heavy_backtraces,
        "source_attribution": source_attribution,
    }
class CallTreeNode:
    """One node of the call tree: a display name plus accumulated weights.

    ``total_ns`` and ``self_ns`` start at zero and are updated by whoever
    builds the tree. ``children`` maps a child's name to its node.
    """

    __slots__ = ("name", "total_ns", "self_ns", "children")

    def __init__(self, name: str):
        self.name = name
        self.total_ns: int = 0
        self.self_ns: int = 0
        self.children: "dict[str, CallTreeNode]" = {}

    def child(self, name: str) -> "CallTreeNode":
        """Return the child called *name*, creating it on first request."""
        existing = self.children.get(name)
        if existing is None:
            existing = self.children[name] = CallTreeNode(name)
        return existing
def _frame_display(frame: dict, demangle_map: dict[str, str]) -> str:
    """Format a frame as ``symbol`` or ``symbol (file[:line])``.

    The frame's name is demangled through *demangle_map* (falling back to the
    raw name) and shortened. When the frame has a source file, its base name
    is appended in parentheses, followed by ``:<line>`` unless the line is
    missing or ``"0"``.
    """
    raw = frame["name"]
    label = shorten_symbol(demangle_map.get(raw, raw))

    source_path = frame.get("source_file") or ""
    if not source_path:
        return label

    base = Path(source_path).name
    lineno = frame.get("source_line") or ""
    where = f"{base}:{lineno}" if lineno and lineno != "0" else base
    return f"{label} ({where})"
def _build_call_tree(
    samples: list[dict],
    demangle_map: dict[str, str],
    total_weight_ns: int,
) -> CallTreeNode:
    """Merge all sample backtraces into one top-down call tree.

    Each sample's frames are walked from the outermost caller to the leaf
    (the frame list is stored leaf-first, so it is reversed). Every node on
    the path gains the sample's weight in ``total_ns``; the last node on the
    path also gains it in ``self_ns``. Nodes are keyed by the frame's display
    string from ``_frame_display``.

    Args:
        samples: Dicts with ``"frames"`` and ``"weight_ns"``.
        demangle_map: Mangled -> demangled symbol names.
        total_weight_ns: Stored on the synthetic ``[root]`` node as its total.

    Returns:
        The ``[root]`` node.
    """
    root = CallTreeNode("[root]")
    root.total_ns = total_weight_ns
    for sample in samples:
        weight = sample["weight_ns"]
        node = root
        for frame in reversed(sample["frames"]):
            node = node.child(_frame_display(frame, demangle_map))
            node.total_ns += weight
        # With an empty frame list this credits the root itself.
        node.self_ns += weight
    return root
def _print_call_tree(
    node: CallTreeNode,
    total_ns: int,
    depth: int = 0,
    min_pct: float = 1.0,
    max_depth: int = 15,
):
    """Print the children of *node* that reach *min_pct* of *total_ns*.

    Children are listed heaviest first. Starting from each child, runs of
    nodes that have exactly one hot child and less than 0.5% self time are
    skipped and shown as ``... → <name>`` of the node where the run ends.
    Recursion continues from that node and stops once *depth* exceeds
    *max_depth*. A ``[self: x%]`` tag is added when self time is at least 0.5%.
    """
    if depth > max_depth:
        return

    def share(weight_ns: int) -> float:
        return weight_ns / total_ns * 100 if total_ns else 0

    def hot_children(parent: CallTreeNode) -> list:
        ranked = sorted(parent.children.values(), key=lambda n: n.total_ns, reverse=True)
        return [n for n in ranked if share(n.total_ns) >= min_pct]

    indent = " │ " * depth
    for child in hot_children(node):
        walk = child
        skipped_any = False
        while True:
            only_hot = hot_children(walk)
            if len(only_hot) != 1 or share(walk.self_ns) >= 0.5:
                break
            skipped_any = True
            walk = only_hot[0]

        pct = share(walk.total_ns)
        self_pct = share(walk.self_ns)
        self_tag = f" [self: {self_pct:.1f}%]" if self_pct >= 0.5 else ""
        if skipped_any:
            print(f" {pct:5.1f}%{self_tag} {indent} ├─ ... → {walk.name}")
        else:
            print(f" {pct:5.1f}%{self_tag} {indent} ├─ {walk.name}")
        _print_call_tree(walk, total_ns, depth + 1, min_pct, max_depth)
# Caller frames dropped from heavy backtraces when their shortened name is
# exactly one of these.
HEAVY_BOILERPLATE_EXACT = {
"start", "main", "thread_start", "_pthread_start",
}
# Caller frames dropped from heavy backtraces when their demangled name
# contains any of these substrings.
HEAVY_BOILERPLATE_FRAGMENTS = {
"FnOnce", "call_once", "lang_start", "std::rt", "std::sys",
"std::panicking", "std::thread::lifecycle",
}
def _build_heavy_backtraces(
    samples: list[dict],
    demangle_map: dict[str, str],
    total_weight_ns: int,
    atos_cache: dict[str, list[dict]] | None = None,
    top_n: int = 8,
) -> list[dict]:
    """Group samples by leaf frame and report the heaviest leaves' callers.

    For each sample the leaf is ``frames[0]``; the remaining frames are its
    callers, minus boilerplate (``HEAVY_BOILERPLATE_EXACT`` on the shortened
    name, ``HEAVY_BOILERPLATE_FRAGMENTS`` on the demangled name). When the
    leaf address has an atos chain, the chain's entries after the first are
    recorded as an alternative caller list.

    For each of the *top_n* heaviest leaves, the three heaviest caller lists
    are returned. Atos-derived lists (tagged ``"source": "atos"``) are used
    when any of them is non-empty; otherwise the backtrace callers are used,
    truncated to 12 frames.

    Args:
        samples: Dicts with ``"frames"`` and ``"weight_ns"``; samples without
            frames are ignored.
        demangle_map: Mangled -> demangled symbol names.
        total_weight_ns: Denominator for percentages; 0 yields ``pct == 0``.
        atos_cache: Address -> list of ``{"name", "file", "line"}`` dicts.
        top_n: Number of leaves to report.

    Returns:
        One dict per leaf with ``leaf``, ``total_ms``, ``pct`` and ``stacks``
        (each stack has ``callers``, ``weight_ms``, ``pct``).
    """
    if atos_cache is None:
        atos_cache = {}

    def share_of_total(weight_ns: int) -> float:
        return weight_ns / total_weight_ns * 100 if total_weight_ns else 0

    leaf_stacks: dict[str, list[tuple[int, list[str]]]] = {}
    leaf_atos: dict[str, Counter[tuple[str, ...]]] = {}
    for s in samples:
        frames = s["frames"]
        if not frames:
            continue
        w = s["weight_ns"]
        leaf = frames[0]
        leaf_display = _frame_display(leaf, demangle_map)

        callers = []
        for f in frames[1:]:
            name = demangle_map.get(f["name"], f["name"])
            if shorten_symbol(name) in HEAVY_BOILERPLATE_EXACT:
                continue
            if any(frag in name for frag in HEAVY_BOILERPLATE_FRAGMENTS):
                continue
            callers.append(_frame_display(f, demangle_map))
        leaf_stacks.setdefault(leaf_display, []).append((w, callers))

        leaf_addr = leaf.get("addr", "")
        if leaf_addr and leaf_addr in atos_cache:
            chain = atos_cache[leaf_addr]
            atos_callers = tuple(
                f"{f['name']} ({f['file']}:{f['line']})" if f.get("file") else f["name"]
                for f in chain[1:]
            )
            leaf_atos.setdefault(leaf_display, Counter())[atos_callers] += w

    leaf_totals = {leaf: sum(w for w, _ in stacks) for leaf, stacks in leaf_stacks.items()}
    top_leaves = sorted(leaf_totals.items(), key=lambda x: -x[1])[:top_n]

    result = []
    for leaf_display, total_ns in top_leaves:
        # True when at least one recorded atos caller tuple is non-empty.
        atos_has_callers = any(leaf_atos.get(leaf_display, ()))
        if atos_has_callers:
            top_stacks = [
                {
                    "callers": list(stack_tuple),
                    "weight_ms": sw / 1_000_000,
                    "pct": share_of_total(sw),
                    "source": "atos",
                }
                for stack_tuple, sw in leaf_atos[leaf_display].most_common(3)
            ]
        else:
            stack_weights: Counter[tuple[str, ...]] = Counter()
            for w, callers in leaf_stacks[leaf_display]:
                stack_weights[tuple(callers[:12])] += w
            top_stacks = [
                {
                    "callers": list(stack_tuple),
                    "weight_ms": sw / 1_000_000,
                    "pct": share_of_total(sw),
                }
                for stack_tuple, sw in stack_weights.most_common(3)
            ]
        result.append({
            "leaf": leaf_display,
            "total_ms": total_ns / 1_000_000,
            "pct": share_of_total(total_ns),
            "stacks": top_stacks,
        })
    return result
def print_report(result: dict, path: Path) -> None:
    """Print an analysis result as a plain-text report.

    Prints ``No samples to report.`` and returns when ``total_weight_ms`` is
    zero. Otherwise prints a header, the category breakdown and the top 20
    leaf symbols, followed by each optional section that is present and
    non-empty: top 15 leaf+caller pairs, call tree, heavy backtraces, and
    source attribution (which stops at the first entry below 0.5%).
    """
    total_ms = result["total_weight_ms"]
    if total_ms == 0:
        print("No samples to report.")
        return

    banner = "=" * 78
    thin_rule = " " + "-" * 74
    thick_rule = " " + "=" * 74

    print(f"\n{banner}")
    print(f" Trace: {path.name}")
    print(f" Samples: {result['samples']:,} Total: {total_ms:.0f} ms")
    print(banner)

    print("\n Category Breakdown:")
    print(" " + "-" * 58)
    for category, stats in result["categories"].items():
        share = stats["pct"]
        bar = "█" * int(share / 2)
        print(f" {share:5.1f}% {stats['weight_ms']:7.0f}ms {category:<25s} {bar}")

    print("\n Top Leaf Symbols:")
    print(thin_rule)
    print(f" {'%':>6s} {'ms':>7s} Symbol")
    print(thin_rule)
    for row in result["top_symbols"][:20]:
        print(f" {row['pct']:5.1f}% {row['weight_ms']:7.0f}ms {row['symbol']}")

    pairs = result.get("top_with_caller")
    if pairs:
        print("\n Top Leaf + Caller:")
        print(thin_rule)
        for row in pairs[:15]:
            print(f" {row['pct']:5.1f}% {row['weight_ms']:7.0f}ms {row['display']}")

    tree = result.get("call_tree")
    if tree:
        print("\n Call Tree (top-down, ≥1% of total):")
        print(thin_rule)
        _print_call_tree(tree, tree.total_ns)

    heavy = result.get("heavy_backtraces")
    if heavy:
        print("\n Heavy Backtraces (bottom-up, inline-resolved via atos):")
        print(thick_rule)
        for leaf in heavy:
            print(f"\n {leaf['pct']:5.1f}% {leaf['total_ms']:7.0f}ms ▶ {leaf['leaf']}")
            stacks = leaf["stacks"]
            if not any(stack["callers"] for stack in stacks):
                print(" (callers inlined away — open in Instruments.app for full context)")
                continue
            for index, stack in enumerate(stacks):
                callers = stack["callers"]
                if not callers:
                    continue
                if stack.get("source", "") == "atos":
                    label = "inline chain"
                else:
                    label = f"stack #{index+1}"
                print(f" {label} ({stack['pct']:.1f}%):")
                for level, caller in enumerate(callers):
                    lead = " " + " " * level + "← "
                    print(f"{lead}{caller}")

    attribution = result.get("source_attribution")
    if attribution:
        print("\n Source Attribution (inline-resolved, by our code location):")
        print(thick_rule)
        for row in attribution:
            if row["pct"] < 0.5:
                break
            suffix = f" in: {row['chain']}" if row.get("chain") else ""
            print(f" {row['pct']:5.1f}% {row['weight_ms']:7.0f}ms {row['source']}{suffix}")
    print()
def main():
    """Command-line entry point with ``record``, ``analyze`` and ``open``.

    * ``record``: record a trace, then optionally analyse and/or open it;
      when neither is requested, print the command to open it.
    * ``analyze``: analyse and report each given trace, skipping (with a
      warning on stderr) paths that do not exist.
    * ``open``: open a trace with the system ``open`` command, exiting with
      an error if it does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Profile matcher_rs using Xcode Instruments.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    commands = parser.add_subparsers(dest="command", required=True)

    record_cmd = commands.add_parser("record", help="Record a Time Profiler trace")
    record_cmd.add_argument(
        "--target", default="search", choices=["search", "build"],
        help="Profiling target: 'search' (default) or 'build' (construction)",
    )
    record_cmd.add_argument(
        "--scene", default=None,
        help="Named scene (e.g. en-search, cn-transform, all). Overrides --mode/--dict/etc.",
    )
    record_cmd.add_argument("--mode", default="process", choices=["is_match", "process"])
    record_cmd.add_argument(
        "--shape", default="literal",
        choices=["literal", "and", "not", "or", "word_boundary"],
    )
    record_cmd.add_argument("--dict", default="en", choices=["en", "cn", "mixed"], dest="dict_lang")
    record_cmd.add_argument("--rules", type=int, default=10_000)
    record_cmd.add_argument(
        "--pt", default="none",
        choices=["none", "variant_norm", "delete", "norm", "dn", "fdn", "romanize", "pychar"],
    )
    record_cmd.add_argument("--seconds", type=int, default=10)
    record_cmd.add_argument("--output", "-o", type=Path, default=None)
    record_cmd.add_argument("--no-build", action="store_true", help="Skip rebuild (use existing binary as-is)")
    record_cmd.add_argument("--analyze", action="store_true", help="Analyze immediately after recording")
    record_cmd.add_argument("--open", action="store_true", help="Open trace in Instruments.app after recording")

    analyze_cmd = commands.add_parser("analyze", help="Analyze an existing .trace bundle")
    analyze_cmd.add_argument("traces", nargs="+", type=Path, help="One or more .trace files")
    analyze_cmd.add_argument("--open", action="store_true", help="Also open in Instruments.app")

    open_cmd = commands.add_parser("open", help="Open a .trace in Instruments.app")
    open_cmd.add_argument("trace", type=Path)

    args = parser.parse_args()
    command = args.command

    if command == "open":
        if not args.trace.exists():
            sys.exit(f"Error: {args.trace} not found")
        subprocess.run(["open", str(args.trace)])
        return

    if command == "analyze":
        for trace_path in args.traces:
            if trace_path.exists():
                print_report(analyze_trace(trace_path), trace_path)
            else:
                print(f"Warning: {trace_path} not found, skipping", file=sys.stderr)
        if args.open:
            subprocess.run(["open", str(path)])
        return

    if command == "record":
        trace = record_profile(
            target=args.target,
            scene=args.scene,
            mode=args.mode,
            shape=args.shape,
            dict_lang=args.dict_lang,
            rules=args.rules,
            pt=args.pt,
            seconds=args.seconds,
            output=args.output,
            build=not args.no_build,
        )
        if args.analyze:
            print_report(analyze_trace(trace), trace)
        if args.open:
            subprocess.run(["open", str(trace)])
        if not (args.analyze or args.open):
            print(f"Open in Instruments: open {trace}")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()