from __future__ import annotations
import argparse
import dataclasses
import json
import multiprocessing
import pathlib
import utils
# Module object for ftime.proto as returned by the project-local utils helper
# (presumably the generated protobuf bindings -- confirm in utils). This file
# constructs its `Analysis` and `SourceFile` types.
ftime_pb2 = utils.import_protobufs('ftime.proto')
def _initialize_worker(path_to_id):
    """Pool initializer: expose the name-to-id table as a module global.

    `main` passes this function as `initializer` to the aggregation pool so
    that `_aggregate` can read `PATH_TO_ID` inside every worker process.

    Args:
        path_to_id: Mapping from source file name to its numeric id.
    """
    # Same effect as `global PATH_TO_ID; PATH_TO_ID = path_to_id`.
    globals()['PATH_TO_ID'] = path_to_id
# `chunksize` for the imap_unordered call that maps _parse_trace over the trace
# paths in main: how many paths are batched into one worker task.
_PARSE_CHUNKSIZE = 20
# `chunksize` for the imap_unordered call that maps _aggregate over the
# per-file Source lists in main.
_AGG_CHUNKSIZE = 100
def main(args):
    """Parse all traces under `args.out_dir` and write one aggregated Analysis.

    Runs in two multiprocessing phases: `_parse_trace` over every collected
    trace file, then `_aggregate` over the occurrences of each distinct source
    file name. The result is written to a `utils.CapacitorFile` opened on
    `args.out_file`.

    Args:
        args: Parsed command-line namespace; reads `out_dir`, `out_file` and
            `limit`.
    """
    out = utils.CapacitorFile(args.out_file)
    traces = _collect_traces(args.out_dir, args.limit)

    # Source file name -> (numeric id, every occurrence seen in any trace).
    sources: dict[str, tuple[int, list[Source]]] = {}
    total = 0
    total_source = 0
    n_compiles = 0

    with multiprocessing.Pool() as pool:
        print('Note: Parsing and aggregating can take several minutes')
        print('Parsing traces')
        results = pool.imap_unordered(_parse_trace,
                                      traces,
                                      chunksize=_PARSE_CHUNKSIZE)
        for compile_total, compile_total_source, compile_sources in results:
            # _parse_trace reports None when the trace carried no
            # 'Total Source' event; such compiles are left out entirely.
            if compile_total_source is None:
                continue
            total += compile_total
            total_source += compile_total_source
            n_compiles += 1
            for source in compile_sources:
                entry = sources.get(source.name)
                if entry is None:
                    # Ids are handed out densely in first-seen order, so the
                    # next free id is the current number of known names.
                    entry = sources[source.name] = (len(sources), [])
                entry[1].append(source)

    print('Aggregating traces')
    path_to_id = {name: entry[0] for name, entry in sources.items()}
    with multiprocessing.Pool(initializer=_initialize_worker,
                              initargs=(path_to_id, )) as pool:
        details = list(
            pool.imap_unordered(_aggregate,
                                (entry[1] for entry in sources.values()),
                                chunksize=_AGG_CHUNKSIZE))
    # Workers finish in arbitrary order; emit the records ordered by id.
    details.sort(key=lambda detail: detail.id)

    print(f'Dumping to {out}')
    analysis = ftime_pb2.Analysis(
        out_dir=str(args.out_dir),
        total_us=total,
        total_source_us=total_source,
        n_compiles=n_compiles,
        sources=details,
    )
    out.write(analysis)
@dataclasses.dataclass
class Source:
    """One occurrence of a source file inside a single compile trace.

    `_parse_trace` creates an instance for every begin event named 'Source'
    and fills in the timings when the matching end event is seen.
    """
    # File name, taken from the begin event's args['detail'].
    name: str
    # End timestamp minus begin timestamp of the span.
    transitive_us: int = 0
    # transitive_us minus the transitive_us of every direct include. Declared
    # here (defaulting to 0) so spans that never receive an end event can still
    # be printed and aggregated.
    direct_us: int = 0
    # Source spans opened while this one was the innermost open span.
    includes: list["Source"] = dataclasses.field(default_factory=list)

    def __repr__(self):
        # Show includes by name only, so the repr stays one level deep instead
        # of expanding the whole include tree.
        return repr({
            'name': self.name,
            'transitive': self.transitive_us,
            'direct': self.direct_us,
            'includes': [i.name for i in self.includes],
        })
def _collect_traces(out_dir: pathlib.Path, limit: int | None = None):
traces = []
print('Finding json files')
for d, _, filenames in out_dir.walk():
all_files = set(filenames)
for f in filenames:
idx = f.find('.json')
if idx > 0 and f[:idx] + '.o' in all_files:
traces.append(d / f)
if limit:
traces = sorted(traces)[:limit]
print(f'Found {len(traces)} trace files.')
return traces
def _aggregate(sources: list[Source]) -> ftime_pb2.SourceFile:
    """Fold every occurrence of one source file into a single SourceFile.

    Runs in a pool worker and relies on the `PATH_TO_ID` global installed by
    `_initialize_worker`.

    Args:
        sources: Non-empty list of occurrences; the name is read from the
            first element.

    Returns:
        A SourceFile carrying the summed direct and transitive times, the
        number of occurrences, and the sorted ids of every file included by
        any occurrence.
    """
    name = sources[0].name
    direct_total = sum(occurrence.direct_us for occurrence in sources)
    transitive_total = sum(occurrence.transitive_us for occurrence in sources)
    # A set, so an include seen in several occurrences is listed once.
    include_ids = {
        PATH_TO_ID[included.name]
        for occurrence in sources
        for included in occurrence.includes
    }
    return ftime_pb2.SourceFile(
        name=name,
        id=PATH_TO_ID[name],
        direct_us=direct_total,
        transitive_us=transitive_total,
        count=len(sources),
        includes=sorted(include_ids),
    )
def _parse_trace(path: pathlib.Path) -> tuple[int, int | None, list[Source]]:
total_source = None
events = []
with path.open('r') as f:
for event in json.load(f)['traceEvents']:
match event['name']:
case 'Source':
events.append(event)
case 'Total Source':
total_source = event['dur']
case 'Total ExecuteCompiler':
total = event['dur']
events.sort(key=lambda e: e['ts'])
sources = []
stack: list[tuple[Source, int]] = []
for event in events:
if event['ph'] == 'b': fname = event['args']['detail']
source = Source(name=fname)
sources.append(source)
if stack:
stack[-1][0].includes.append(source)
stack.append((source, event['ts']))
else:
source, begin = stack.pop()
duration = event['ts'] - begin
source.transitive_us = duration
for include in source.includes:
duration -= include.transitive_us
source.direct_us = duration
return total, total_source, sources
if __name__ == '__main__':
    # Command line: -C <GN output dir> -o <capacitor file> [--limit N]
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '-C',
        dest='out_dir',
        type=pathlib.Path,
        required=True,
        help='GN output directory',
    )
    arg_parser.add_argument(
        '-o',
        dest='out_file',
        type=pathlib.Path,
        required=True,
        help='capacitor file to output',
    )
    arg_parser.add_argument(
        '--limit',
        help='Parse a limited number of traces. Useful for debugging.',
        type=int,
    )
    cli_args = arg_parser.parse_args()
    main(cli_args)