import argparse
import gzip
import json
import os
# Substrings marking allocator-internal frames (Rust's `alloc` crate and
# dhat's own instrumentation) that are filtered out of reported stacks.
ALLOC_NOISE = ("alloc::", "dhat::")
def clean_symbol(name):
    """Strip the Rust mangled-hash suffix (e.g. ``::h1a2b3c``) from a symbol name."""
    base, _sep, _hash = name.partition("::h")
    return base
def resolve_allocator_stack(pp, frames):
    """Resolve a DHAT program point's frame indices into cleaned symbol names.

    Args:
        pp: DHAT program-point dict; key 'fs' holds indices into *frames*.
        frames: DHAT frame table ('ftbl') of raw symbol strings.

    Returns:
        Up to 15 frames' worth of cleaned names, leaf first, with
        allocator-internal frames (ALLOC_NOISE) filtered out.
    """
    stack = []
    for f_idx in pp.get('fs', [])[:15]:
        # Reject non-ints AND negative indices: a bare `f_idx >= len(frames)`
        # check would let a negative index silently wrap to the end of `frames`.
        if not isinstance(f_idx, int) or not (0 <= f_idx < len(frames)):
            continue
        name = clean_symbol(frames[f_idx])
        if not any(noise in name for noise in ALLOC_NOISE):
            stack.append(name)
    return stack
def print_top_allocators(pps, frames, total_bytes, limit=10):
    """Print one summary line (plus a short caller chain) per top program point.

    Assumes *pps* is already sorted by total bytes, descending.
    """
    print(f"Top {limit} Allocators (by total bytes):")
    for point in pps[:limit]:
        byte_count = point.get('tb', 0)
        block_count = point.get('tbk', 0)
        chain = resolve_allocator_stack(point, frames)
        leaf = chain[0] if chain else "Unknown"
        share = (byte_count / total_bytes * 100) if total_bytes > 0 else 0
        print(f"- {byte_count:12,} bytes ({share:5.1f}%) | {block_count:8,} allocs | {leaf}")
        for caller in chain[1:4]:
            print(f" <- {caller}")
def analyze_dhat(dhat_path="dhat-heap.json"):
    """Summarize a DHAT heap profile: overall totals plus the top allocators.

    Missing files and parse errors are reported to stdout, never raised.
    """
    print("╭────────────────────────────────────────────────────────────────────────────╮")
    print("│                            DHAT MEMORY ANALYSIS                            │")
    print("╰────────────────────────────────────────────────────────────────────────────╯")
    if not os.path.exists(dhat_path):
        print(f"{dhat_path} not found!\n")
        return
    try:
        with open(dhat_path) as fh:
            profile = json.load(fh)
        frame_table = profile.get('ftbl', [])
        points = profile.get('pps', [])
        byte_total = sum(p.get('tb', 0) for p in points)
        block_total = sum(p.get('tbk', 0) for p in points)
        print(f"Total Allocated: {byte_total:,} bytes in {block_total:,} blocks\n")
        # Largest allocators first; sorted in place before reporting.
        points.sort(key=lambda p: p.get('tb', 0), reverse=True)
        print_top_allocators(points, frame_table, byte_total)
    except Exception as e:
        print(f"Error analyzing DHAT: {e}")
def build_thread_tables(thread):
    """Collect the lookup tables needed to resolve stack frames in one thread.

    Every table defaults to an empty list so downstream bounds checks stay simple.
    """
    frame_table = thread.get('frameTable', {})
    func_table = thread.get('funcTable', {})
    stack_table = thread.get('stackTable', {})
    return {
        "strings": thread.get('stringArray', []),
        "f_func": frame_table.get('func', []),
        "fn_name": func_table.get('name', []),
        "st_frame": stack_table.get('frame', []),
        "st_prefix": stack_table.get('prefix', []),
    }
def resolve_func_name(frame_idx, tables):
    """Resolve a frameTable index to a cleaned function name.

    Follows the index chain frame -> func -> string; returns None as soon
    as any hop is None or out of range.
    """
    idx = frame_idx
    for key in ("f_func", "fn_name", "strings"):
        table = tables[key]
        if idx is None or idx >= len(table):
            return None
        idx = table[idx]
    # `idx` is now the raw symbol string; strip the Rust hash suffix
    # (inline of the clean_symbol helper).
    return idx.split("::h")[0]
def count_samples(stack_data, tables):
    """Tally per-function sample counts for one thread's stacks.

    Args:
        stack_data: list of stackTable indices, one per sample (None = no stack).
        tables: lookup dict from build_thread_tables().

    Returns:
        (self_counts, total_counts) dicts keyed by function name:
          * self_counts  - samples where the function was the innermost
                           resolvable frame (self time).
          * total_counts - samples where the function appeared anywhere on
                           the stack, counted at most once per sample.
    """
    st_frame = tables["st_frame"]
    st_prefix = tables["st_prefix"]
    self_counts = {}
    total_counts = {}
    for stack_idx in stack_data:
        if stack_idx is None:
            # Sample captured no stack; nothing to attribute.
            continue
        curr = stack_idx
        is_leaf = True  # first *resolvable* frame gets the self-time credit
        seen = set()    # dedupe recursive frames within this one sample
        while curr is not None and curr < len(st_frame):
            func_name = resolve_func_name(st_frame[curr], tables)
            if func_name is not None:
                if is_leaf:
                    self_counts[func_name] = self_counts.get(func_name, 0) + 1
                    is_leaf = False
                if func_name not in seen:
                    total_counts[func_name] = total_counts.get(func_name, 0) + 1
                    seen.add(func_name)
            # Walk toward the stack root via the prefix chain; a missing
            # prefix entry terminates the walk.
            curr = st_prefix[curr] if curr < len(st_prefix) else None
    return self_counts, total_counts
def print_top_functions(counts, total_samples, header, limit=10):
    """Print the *limit* highest-count functions under *header*.

    Args:
        counts: mapping of function name -> sample count.
        total_samples: denominator for the percentage column.
        header: line printed before the listing.
        limit: maximum number of functions shown.
    """
    print(header)
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    for func, count in ranked[:limit]:
        # Guard the division like print_top_allocators guards total_bytes;
        # previously a zero total_samples raised ZeroDivisionError.
        pct = (count / total_samples * 100) if total_samples > 0 else 0
        print(f" - {count:5} samples ({pct:5.1f}%) : {func}")
def analyze_thread(thread):
    """Report self-time and total-time hot functions for one Samply thread."""
    sample_stacks = thread.get('samples', {}).get('stack', [])
    # Skip threads with fewer than 100 samples: too few to rank meaningfully.
    if not sample_stacks or len(sample_stacks) < 100:
        return
    thread_name = thread.get('name', 'Unknown')
    main_flag = thread.get('isMainThread', False)
    n_samples = len(sample_stacks)
    print(f"\nThread: {thread_name} (Main: {main_flag}) - {n_samples} samples")
    tables = build_thread_tables(thread)
    self_counts, total_counts = count_samples(sample_stacks, tables)
    print_top_functions(self_counts, n_samples,
                        " Top functions by SELF time (where execution was bottlenecked):")
    print_top_functions(total_counts, n_samples,
                        "\n Top functions by TOTAL time (execution + children):")
def analyze_samply(samply_path="profile.json.gz"):
    """Load a gzipped Samply (Firefox-profiler format) profile and report per-thread hot spots.

    Args:
        samply_path: path to the gzipped JSON profile.

    Missing files and parse errors are reported to stdout, never raised.
    """
    print("\n╭────────────────────────────────────────────────────────────────────────────╮")
    print("│                             SAMPLY CPU ANALYSIS                            │")
    print("╰────────────────────────────────────────────────────────────────────────────╯")
    if not os.path.exists(samply_path):
        print(f"{samply_path} not found!\n")
        return
    try:
        with gzip.open(samply_path, 'rt') as f:
            prof = json.load(f)
        for thread in prof.get('threads', []):
            analyze_thread(thread)
    except Exception as e:
        # Bug fix: report the actual path being analyzed, not the hard-coded
        # default "profile.json.gz" (wrong when --samply-file is used).
        print(f"Error analyzing {samply_path}: {e}")
def main():
    """CLI entry point: run DHAT and/or Samply analysis according to the flags."""
    parser = argparse.ArgumentParser(
        description="Parse and analyze DHAT and Samply profiles to identify memory and CPU bottlenecks."
    )
    add = parser.add_argument
    add('--dhat-file', type=str, default='dhat-heap.json',
        help="Path to DHAT heap profile (default: dhat-heap.json)")
    add('--samply-file', type=str, default='profile.json.gz',
        help="Path to Samply profile (default: profile.json.gz)")
    add('--no-dhat', action='store_true', help="Skip DHAT memory analysis")
    add('--no-samply', action='store_true', help="Skip Samply CPU analysis")
    args = parser.parse_args()
    if not args.no_dhat:
        analyze_dhat(args.dhat_file)
    if not args.no_samply:
        analyze_samply(args.samply_file)
# Run the CLI only when executed as a script, not on import.
if __name__ == '__main__':
    main()