use crate::templates::TEMPLATE_QUERY_PARAM_SCRIPT;
use crate::{types::*, ParseConfig};
use fxhash::FxHashMap;
use html_escape::encode_text;
use std::cell::RefCell;
use std::collections::HashSet;
use std::ffi::{OsStr, OsString};
use std::path::Path;
use std::path::PathBuf;
use tinytemplate::TinyTemplate;
use serde_json::Value;
/// Pretty-prints `payload` when it parses as JSON.
///
/// A payload that is not valid JSON is returned unchanged rather than being
/// reported as an error; only re-serializing a successfully parsed value can
/// produce an `Err`.
fn format_json_pretty(payload: &str) -> Result<String, anyhow::Error> {
    let Ok(parsed) = serde_json::from_str::<Value>(payload) else {
        // Not JSON: hand the text back untouched.
        return Ok(payload.to_string());
    };
    let pretty = serde_json::to_string_pretty(&parsed)?;
    Ok(pretty)
}
use std::sync::OnceLock;
use syntect::highlighting::ThemeSet;
use syntect::parsing::SyntaxSet;
/// Bundle of the syntect data needed for syntax highlighting: the syntax
/// definitions and the color themes. Built once by `syntect_resources`.
struct SyntectResources {
/// Syntax definitions used to look up a language grammar.
syntax_set: SyntaxSet,
/// Themes used to pick highlighting colors.
theme_set: ThemeSet,
}
/// Returns the shared syntect resources, loading syntect's default syntax
/// and theme sets the first time it is called.
fn syntect_resources() -> &'static SyntectResources {
    static RESOURCES: OnceLock<SyntectResources> = OnceLock::new();
    RESOURCES.get_or_init(|| {
        let syntax_set = SyntaxSet::load_defaults_newlines();
        let theme_set = ThemeSet::load_defaults();
        SyntectResources {
            syntax_set,
            theme_set,
        }
    })
}
pub use crate::types::{CompileId, EmptyMetadata, Envelope, GraphRuntime, Metadata, OpRuntime};
/// One output produced by a parser for a single log entry.
///
/// The code that consumes these values is not part of this file; the notes
/// below describe only how each variant is constructed here.
///
/// - `File(path, contents)`: built by `simple_file_output` from a
///   `build_file_path` path and already-rendered contents.
/// - `GlobalFile(path, contents)`: built by `DumpFileParser` with a path under
///   `dump_file/` that does not go through `build_file_path`.
/// - `PayloadFile(path)`: carries only a path (presumably the consumer writes
///   the raw payload there — confirm against the consumer).
/// - `PayloadReformatFile(path, formatter)`: a path plus a formatter function
///   (presumably applied to the payload before writing — confirm).
/// - `Link(name, url)`: built by `LinkParser` from the link metadata.
pub enum ParserOutput {
File(PathBuf, String), GlobalFile(PathBuf, String), PayloadFile(PathBuf), PayloadReformatFile(PathBuf, fn(&str) -> Result<String, anyhow::Error>), Link(String, String), }
/// The list of outputs a single `StructuredLogParser::parse` call returns.
pub type ParserResults = Vec<ParserOutput>;
/// Interface implemented by every structured-log parser in this file.
///
/// NOTE(review): the driver that calls these methods is not in this file;
/// presumably it calls `get_metadata` on each envelope and, when that returns
/// `Some`, hands the metadata to `parse` — confirm against the caller.
pub trait StructuredLogParser {
/// Returns the metadata this parser handles when the envelope carries it,
/// or `None` when the envelope is not relevant to this parser.
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>>;
/// Converts one log entry into zero or more outputs.
///
/// The implementations in this file use `lineno` and `compile_id` to build
/// output paths (see `build_file_path`), `metadata` to pick file names, and
/// `payload` as the entry's text body. None of them read `rank`.
/// Returns `Err` when the metadata variant is not the expected one or when
/// rendering/parsing fails.
fn parse<'e>(
&self,
lineno: usize, metadata: Metadata<'e>, rank: Option<u32>, compile_id: &Option<CompileId>, payload: &str, ) -> anyhow::Result<ParserResults>;
/// Returns a static identifier for this parser.
fn name(&self) -> &'static str;
}
/// Builds the relative output path `<compile-id-dir>/<filename>`.
///
/// The directory component is `compile_id.as_directory_name()` when a compile
/// id is present, and `unknown_<lineno>` otherwise.
pub fn build_file_path(filename: &str, lineno: usize, compile_id: &Option<CompileId>) -> PathBuf {
    // `map_or_else` keeps the fallback name from being formatted (and
    // allocated) when a compile id is available.
    let compile_id_dir = compile_id.as_ref().map_or_else(
        || format!("unknown_{lineno}"),
        |cid| cid.as_directory_name(),
    );
    PathBuf::from(compile_id_dir).join(filename)
}
/// Wraps `payload` in a single `ParserOutput::File` whose path is derived
/// from `filename`, `lineno` and `compile_id` via `build_file_path`.
fn simple_file_output(
    filename: &str,
    lineno: usize,
    compile_id: &Option<CompileId>,
    payload: &str,
) -> anyhow::Result<ParserResults> {
    let path = build_file_path(filename, lineno, compile_id);
    let contents = payload.to_owned();
    Ok(vec![ParserOutput::File(path, contents)])
}
/// Produces a single `ParserOutput::PayloadFile` whose path is derived from
/// `filename`, `lineno` and `compile_id` via `build_file_path`.
fn payload_file_output(
    filename: &str,
    lineno: usize,
    compile_id: &Option<CompileId>,
) -> anyhow::Result<ParserResults> {
    let path = build_file_path(filename, lineno, compile_id);
    Ok(vec![ParserOutput::PayloadFile(path)])
}
/// Produces a single `ParserOutput::PayloadReformatFile` pairing the path
/// from `build_file_path` with the given `formatter` function.
fn payload_reformat_file_output(
    filename: &str,
    lineno: usize,
    compile_id: &Option<CompileId>,
    formatter: fn(&str) -> Result<String, anyhow::Error>,
) -> anyhow::Result<ParserResults> {
    let path = build_file_path(filename, lineno, compile_id);
    Ok(vec![ParserOutput::PayloadReformatFile(path, formatter)])
}
/// Parser for log entries that are identified only by the presence of an
/// `EmptyMetadata` field on the envelope. The payload is emitted as
/// `<filename>.txt`.
pub struct SentinelFileParser {
    /// Base name of the output file; also used as the parser name.
    filename: &'static str,
    /// Extracts the sentinel field this parser reacts to from an envelope.
    get_sentinel: fn(&Envelope) -> Option<&EmptyMetadata>,
}

impl SentinelFileParser {
    /// Creates a parser that fires when `get_sentinel` returns `Some` and
    /// writes its payload to `<filename>.txt`.
    pub fn new(
        filename: &'static str,
        get_sentinel: fn(&Envelope) -> Option<&EmptyMetadata>,
    ) -> Self {
        SentinelFileParser {
            filename,
            get_sentinel,
        }
    }
}

impl StructuredLogParser for SentinelFileParser {
    fn name(&self) -> &'static str {
        self.filename
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let sentinel = (self.get_sentinel)(e)?;
        Some(Metadata::Empty(sentinel))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        _metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let output_name = format!("{}.txt", self.filename);
        payload_file_output(&output_name, lineno, compile_id)
    }
}
/// Parser for `graph_dump` entries; the payload is emitted as `<name>.txt`.
pub struct GraphDumpParser;

impl StructuredLogParser for GraphDumpParser {
    fn name(&self) -> &'static str {
        "graph_dump"
    }

    /// Matches envelopes with a `graph_dump` field, except dumps whose name
    /// starts with `vllm_`, which this parser ignores.
    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        e.graph_dump
            .as_ref()
            .filter(|dump| !dump.name.starts_with("vllm_"))
            .map(|dump| Metadata::GraphDump(dump))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let Metadata::GraphDump(dump) = metadata else {
            return Err(anyhow::anyhow!("Expected GraphDump metadata"));
        };
        // Append the extension without going through a lossy conversion first.
        let mut raw_name = OsString::from(&dump.name);
        raw_name.push(OsStr::new(".txt"));
        let file_name = PathBuf::from(raw_name);
        payload_file_output(&file_name.to_string_lossy(), lineno, compile_id)
    }
}
/// Parser for `dynamo_output_graph` entries; the payload is emitted as
/// `dynamo_output_graph.txt`.
pub struct DynamoOutputGraphParser;

impl StructuredLogParser for DynamoOutputGraphParser {
    fn name(&self) -> &'static str {
        "dynamo_output_graph"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let graph = e.dynamo_output_graph.as_ref();
        graph.map(|g| Metadata::DynamoOutputGraph(g))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        _metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        const OUTPUT_NAME: &str = "dynamo_output_graph.txt";
        payload_file_output(OUTPUT_NAME, lineno, compile_id)
    }
}
/// Parser for `dynamo_guards` entries. The JSON payload is deserialized into
/// a list of guards and rendered through the `dynamo_guards.html` template.
pub struct DynamoGuardParser<'t> {
    /// Template engine used to render the guards page.
    tt: &'t TinyTemplate<'t>,
}

impl StructuredLogParser for DynamoGuardParser<'_> {
    fn name(&self) -> &'static str {
        "dynamo_guards"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let sentinel = e.dynamo_guards.as_ref();
        sentinel.map(|s| Metadata::Empty(s))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        _metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        payload: &str,
    ) -> anyhow::Result<ParserResults> {
        // The template is registered under the same name as the output file.
        let filename = format!("{}.html", self.name());
        let guards: Vec<DynamoGuard> = serde_json::from_str(payload)?;
        let rendered = self.tt.render(
            &filename,
            &DynamoGuardsContext {
                guards,
                qps: TEMPLATE_QUERY_PARAM_SCRIPT,
            },
        )?;
        simple_file_output(&filename, lineno, compile_id, &rendered)
    }
}
/// Parser for `inductor_output_code` entries.
///
/// Emits the payload either as a plain `.txt` payload file or as
/// syntax-highlighted `.html`, depending on `ParseConfig::plain_text`.
pub struct InductorOutputCodeParser {
    /// When true, skip highlighting and emit the raw payload as `.txt`.
    plain_text: bool,
}

impl InductorOutputCodeParser {
    /// Creates a parser that follows `config.plain_text`.
    pub fn new(config: &ParseConfig) -> Self {
        InductorOutputCodeParser {
            plain_text: config.plain_text,
        }
    }
}

impl StructuredLogParser for InductorOutputCodeParser {
    fn name(&self) -> &'static str {
        "inductor_output_code"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        e.inductor_output_code
            .as_ref()
            .map(|m| Metadata::InductorOutputCode(m))
    }

    /// Writes the output code under `inductor_output_code[_<stem>].<ext>`,
    /// where `<stem>` is the file stem of the metadata's `filename` (when it
    /// has one) and `<ext>` is `txt` or `html`.
    ///
    /// # Errors
    /// Returns an error for any other metadata variant, or when highlighting
    /// fails; the highlighting failure is kept as the error's cause.
    fn parse<'e>(
        &self,
        lineno: usize,
        metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let Metadata::InductorOutputCode(metadata) = metadata else {
            return Err(anyhow::anyhow!("Expected InductorOutputCode metadata"));
        };

        // Decide the extension once instead of in every naming branch.
        let extension = if self.plain_text { ".txt" } else { ".html" };
        let stem = metadata.filename.as_ref().and_then(|p| Path::file_stem(p));
        let output_path: PathBuf = match stem {
            Some(stem) => {
                let mut name = OsString::from("inductor_output_code_");
                name.push(stem);
                name.push(OsStr::new(extension));
                name.into()
            }
            None => PathBuf::from(format!("inductor_output_code{extension}")),
        };
        let filename = output_path.to_string_lossy();

        if self.plain_text {
            return payload_file_output(&filename, lineno, compile_id);
        }

        // Same top-level message as before, but the underlying highlighting
        // error stays attached as the cause instead of being dropped.
        let output_content = generate_html_output(payload)
            .map_err(|e| e.context("Failed to parse inductor code to html"))?;
        simple_file_output(&filename, lineno, compile_id, &output_content)
    }
}
/// Renders `payload` as syntax-highlighted HTML, treating it as Python source
/// and using the `InspiredGitHub` theme.
///
/// # Errors
/// Returns an error when the Python syntax or the theme is missing from the
/// loaded syntect resources, or when syntect fails to highlight the text.
fn generate_html_output(payload: &str) -> Result<String, anyhow::Error> {
    let res = syntect_resources();
    // Report a missing syntax/theme as an error rather than panicking: this
    // function already returns a `Result`.
    let syntax = res
        .syntax_set
        .find_syntax_by_extension("py")
        .ok_or_else(|| anyhow::anyhow!("No syntect syntax found for extension \"py\""))?;
    let theme = res
        .theme_set
        .themes
        .get("InspiredGitHub")
        .ok_or_else(|| anyhow::anyhow!("syntect theme \"InspiredGitHub\" not found"))?;
    let html =
        syntect::html::highlighted_html_for_string(payload, &res.syntax_set, syntax, theme)?;
    Ok(html)
}
/// Parser for `optimize_ddp_split_child` entries; the payload is emitted as
/// `optimize_ddp_split_child_<name>.txt`.
pub struct OptimizeDdpSplitChildParser;

impl StructuredLogParser for OptimizeDdpSplitChildParser {
    fn name(&self) -> &'static str {
        "optimize_ddp_split_child"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let child = e.optimize_ddp_split_child.as_ref();
        child.map(|c| Metadata::OptimizeDdpSplitChild(c))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        match metadata {
            Metadata::OptimizeDdpSplitChild(child) => {
                let output_name = format!("optimize_ddp_split_child_{}.txt", child.name);
                payload_file_output(&output_name, lineno, compile_id)
            }
            _ => Err(anyhow::anyhow!("Expected OptimizeDdpSplitChild metadata")),
        }
    }
}
/// Parser for `link` entries; produces a `ParserOutput::Link` carrying the
/// link's name and URL.
pub struct LinkParser;

impl StructuredLogParser for LinkParser {
    fn name(&self) -> &'static str {
        "link_parser"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let link = e.link.as_ref();
        link.map(|l| Metadata::Link(l))
    }

    fn parse<'e>(
        &self,
        _lineno: usize,
        metadata: Metadata<'e>,
        _rank: Option<u32>,
        _compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let Metadata::Link(link) = metadata else {
            return Err(anyhow::anyhow!("Expected Link Metadata"));
        };
        let name = link.name.clone();
        let url = link.url.clone();
        Ok(vec![ParserOutput::Link(name, url)])
    }
}
/// Renders a single stack as HTML by inserting it into a fresh
/// `StackTrieNode` and formatting that trie with the given `caption`.
///
/// `open` is forwarded to the trie formatter. Panics if the formatter
/// returns an error.
fn format_stack(stack: &StackSummary, caption: &str, open: bool) -> String {
    let mut root = StackTrieNode::default();
    let frames = stack.to_vec();
    root.insert_no_terminal(frames);
    let rendered = root.fmt(None, caption, open);
    rendered.unwrap()
}
/// Like `format_stack(stack, caption, false)`, but memoized in `cache`.
///
/// The cache key is the `(stack, caption)` pair; the rendered HTML is cloned
/// out of the cache on every call.
fn format_stack_cached(
    cache: &mut FxHashMap<(StackSummary, String), String>,
    stack: &StackSummary,
    caption: &str,
) -> String {
    // The entry API hashes the key once for both the lookup and the insert,
    // and avoids cloning the rendered HTML an extra time on a miss.
    cache
        .entry((stack.clone(), caption.to_string()))
        .or_insert_with(|| format_stack(stack, caption, false))
        .clone()
}
/// Parser for `compilation_metrics` entries. Renders the
/// `compilation_metrics.html` page from the metrics plus data collected in
/// the shared indexes below.
pub struct CompilationMetricsParser<'t> {
/// Template engine used to render the page.
pub tt: &'t TinyTemplate<'t>,
/// Looked up (read-only) by compile id to render the main stack.
pub stack_index: &'t RefCell<StackIndex>,
/// Entries for the current compile id are removed from this index when parsed.
pub symbolic_shape_specialization_index: &'t RefCell<SymbolicShapeSpecializationIndex>,
/// Entries for the current compile id are removed from this index when parsed.
pub guard_added_fast_index: &'t RefCell<GuardAddedFastIndex>,
/// Entries for the current compile id are removed from this index when parsed.
pub create_symbol_index: &'t RefCell<CreateSymbolIndex>,
/// Entries for the current compile id are removed from this index when parsed.
pub unbacked_symbol_index: &'t RefCell<UnbackedSymbolIndex>,
/// Output files listed on the page; their `url`/`name` have the leading path component stripped.
pub output_files: &'t Vec<OutputFile>,
/// Passed through to the template context unchanged.
pub compile_id_dir: &'t PathBuf,
}
impl StructuredLogParser for CompilationMetricsParser<'_> {
fn name(&self) -> &'static str {
"compilation_metrics"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.compilation_metrics
.as_ref()
.map(|m| Metadata::CompilationMetrics(m))
}
fn parse<'e>(
&self,
lineno: usize,
metrics: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
_payload: &str,
) -> anyhow::Result<ParserResults> {
let filename = format!("{}.html", self.name());
let mut stack_cache: FxHashMap<(StackSummary, String), String> = FxHashMap::default();
if let Metadata::CompilationMetrics(m) = metrics {
let id = compile_id
.clone()
.map_or("(unknown) ".to_string(), |c| format!("{cid} ", cid = c));
let mut cid = compile_id.clone();
if let Some(c) = cid.as_mut() {
if let Some(_frame_id) = c.frame_compile_id {
c.attempt = Some(0);
}
}
let stack_html = self
.stack_index
.borrow()
.get(&cid)
.map_or("".to_string(), |stack| format_stack(stack, "Stack", false));
let mini_stack_html = if let (Some(name), Some(filename), Some(line)) =
(&m.co_name, &m.co_filename, m.co_firstlineno)
{
format_stack(
&Vec::from([FrameSummary {
uninterned_filename: Some(filename.clone()),
filename: u32::MAX,
line: line,
name: name.clone(),
loc: None,
}]),
"Stack",
false,
)
} else {
"".to_string()
};
let specializations: Vec<_> = self
.symbolic_shape_specialization_index
.borrow_mut()
.remove(&cid)
.unwrap_or_default()
.drain(..)
.map(|spec| {
let user_stack = spec.user_stack.unwrap_or_default();
let stack = spec.stack.unwrap_or_default();
SymbolicShapeSpecializationContext {
symbol: spec.symbol.unwrap_or("".to_string()),
sources: spec.sources.unwrap_or_default(),
value: spec.value.unwrap_or("".to_string()),
user_stack_html: format_stack_cached(
&mut stack_cache,
&user_stack,
"User Stack",
),
stack_html: format_stack_cached(
&mut stack_cache,
&stack,
"Framework Stack",
),
}
})
.collect();
let guards_added_fast: Vec<_> = self
.guard_added_fast_index
.borrow_mut()
.remove(&cid)
.unwrap_or_default()
.drain(..)
.map(|guard| {
let user_stack = guard.user_stack.unwrap_or_default();
let stack = guard.stack.unwrap_or_default();
GuardAddedFastContext {
expr: guard.expr.unwrap_or("".to_string()),
user_stack_html: format_stack_cached(
&mut stack_cache,
&user_stack,
"User Stack",
),
stack_html: format_stack_cached(
&mut stack_cache,
&stack,
"Framework Stack",
),
}
})
.collect();
let create_symbols: Vec<_> = self
.create_symbol_index
.borrow_mut()
.remove(&cid)
.unwrap_or_default()
.drain(..)
.map(|sym| {
let user_stack = sym.user_stack.unwrap_or_default();
let stack = sym.stack.unwrap_or_default();
CreateSymbolContext {
symbol: sym.symbol.unwrap_or("".to_string()),
val: sym.val.unwrap_or("".to_string()),
vr: sym.vr.unwrap_or("".to_string()),
source: sym.source.unwrap_or("".to_string()),
user_stack_html: format_stack_cached(
&mut stack_cache,
&user_stack,
"User Stack",
),
stack_html: format_stack_cached(
&mut stack_cache,
&stack,
"Framework Stack",
),
}
})
.collect();
let unbacked_symbols: Vec<_> = self
.unbacked_symbol_index
.borrow_mut()
.remove(&cid)
.unwrap_or_default()
.drain(..)
.map(|sym| {
let user_stack = sym.user_stack.unwrap_or_default();
let stack = sym.stack.unwrap_or_default();
UnbackedSymbolContext {
symbol: sym.symbol.unwrap_or("".to_string()),
vr: sym.vr.unwrap_or("".to_string()),
user_stack_html: format_stack_cached(
&mut stack_cache,
&user_stack,
"User Stack",
),
stack_html: format_stack_cached(
&mut stack_cache,
&stack,
"Framework Stack",
),
}
})
.collect();
let remove_prefix = |x: &String| -> String {
let parts: Vec<_> = x.split("/").collect();
let new_str: String = parts[1..].join("");
new_str
};
let output_files: Vec<OutputFile> = self
.output_files
.iter()
.map(|o| OutputFile {
url: remove_prefix(&o.url),
name: remove_prefix(&o.name),
number: o.number.clone(),
suffix: o.suffix.clone(),
readable_url: o.readable_url.as_ref().map(|u| remove_prefix(u)),
})
.collect();
let extra_metrics: Vec<ExtraMetricContext> = m
.extra
.iter()
.map(|(key, value)| {
let value_html = match value {
serde_json::Value::String(s) => {
if s.len() > 200 {
format!("<details><summary>(click to expand)</summary><pre>{}</pre></details>",
html_escape::encode_text(s))
} else {
html_escape::encode_text(s).to_string()
}
}
serde_json::Value::Null => "null".to_string(),
other => {
let s = other.to_string();
if s.len() > 200 {
format!("<details><summary>(click to expand)</summary><pre>{}</pre></details>",
html_escape::encode_text(&s))
} else {
html_escape::encode_text(&s).to_string()
}
}
};
ExtraMetricContext {
key: key.clone(),
value_html,
}
})
.collect();
let context = CompilationMetricsContext {
css: crate::CSS,
m: &m,
compile_id: id,
stack_html: stack_html,
mini_stack_html: mini_stack_html,
symbolic_shape_specializations: specializations,
guards_added_fast: guards_added_fast,
create_symbols: create_symbols,
unbacked_symbols: unbacked_symbols,
output_files: &output_files,
compile_id_dir: &self.compile_id_dir,
extra_metrics: extra_metrics,
qps: TEMPLATE_QUERY_PARAM_SCRIPT,
};
let output = self.tt.render(&filename, &context)?;
simple_file_output(&filename, lineno, compile_id, &output)
} else {
Err(anyhow::anyhow!("Expected CompilationMetrics metadata"))
}
}
}
/// Parser for `aot_autograd_backward_compilation_metrics` entries; renders
/// them through the template of the same name into an `.html` file.
pub struct AOTAutogradBackwardCompilationMetricsParser<'t> {
    /// Template engine used to render the page.
    tt: &'t TinyTemplate<'t>,
}

impl StructuredLogParser for AOTAutogradBackwardCompilationMetricsParser<'_> {
    fn name(&self) -> &'static str {
        "aot_autograd_backward_compilation_metrics"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let found = e.aot_autograd_backward_compilation_metrics.as_ref();
        found.map(|m| Metadata::AOTAutogradBackwardCompilationMetrics(m))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        metrics: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let Metadata::AOTAutogradBackwardCompilationMetrics(m) = metrics else {
            return Err(anyhow::anyhow!(
                "Expected AOTAutogradBackwardCompilationMetrics metadata"
            ));
        };
        let filename = format!("{}.html", self.name());
        // Display form of the compile id followed by a space, or a placeholder.
        let id = match compile_id {
            Some(cid) => format!("{cid} "),
            None => "(unknown) ".to_string(),
        };
        let context = AOTAutogradBackwardCompilationMetricsContext {
            css: crate::CSS,
            m: &m,
            compile_id: id,
            qps: TEMPLATE_QUERY_PARAM_SCRIPT,
        };
        let rendered = self.tt.render(&filename, &context)?;
        simple_file_output(&filename, lineno, compile_id, &rendered)
    }
}
/// Parser for `bwd_compilation_metrics` entries; renders them through the
/// template of the same name into an `.html` file.
pub struct BwdCompilationMetricsParser<'t> {
    /// Template engine used to render the page.
    tt: &'t TinyTemplate<'t>,
}

impl StructuredLogParser for BwdCompilationMetricsParser<'_> {
    fn name(&self) -> &'static str {
        "bwd_compilation_metrics"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let found = e.bwd_compilation_metrics.as_ref();
        found.map(|m| Metadata::BwdCompilationMetrics(m))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        metrics: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let Metadata::BwdCompilationMetrics(m) = metrics else {
            return Err(anyhow::anyhow!("Expected BwdCompilationMetrics metadata"));
        };
        let filename = format!("{}.html", self.name());
        // Display form of the compile id followed by a space, or a placeholder.
        let id = match compile_id {
            Some(cid) => format!("{cid} "),
            None => "(unknown) ".to_string(),
        };
        let context = BwdCompilationMetricsContext {
            css: crate::CSS,
            m: &m,
            compile_id: id,
            qps: TEMPLATE_QUERY_PARAM_SCRIPT,
        };
        let rendered = self.tt.render(&filename, &context)?;
        simple_file_output(&filename, lineno, compile_id, &rendered)
    }
}
/// Parser for `dump_file` entries. The payload is converted to a
/// line-anchored HTML page and emitted as a `GlobalFile` under `dump_file/`.
pub struct DumpFileParser;

impl StructuredLogParser for DumpFileParser {
    fn name(&self) -> &'static str {
        "dump_file"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let dump = e.dump_file.as_ref();
        dump.map(|d| Metadata::DumpFile(d))
    }

    fn parse<'e>(
        &self,
        _lineno: usize,
        metadata: Metadata<'e>,
        _rank: Option<u32>,
        _compile_id: &Option<CompileId>,
        payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let Metadata::DumpFile(dump) = metadata else {
            return Err(anyhow::anyhow!("Expected DumpFile metadata"));
        };
        // Names that `extract_eval_with_key_id` recognizes get a canonical
        // `eval_with_key_<id>` file name; everything else keeps its own name.
        let filename = match extract_eval_with_key_id(&dump.name) {
            Some(fx_id) => format!("eval_with_key_{}.html", fx_id),
            None => format!("{}.html", dump.name),
        };
        let path = PathBuf::from("dump_file").join(filename);
        Ok(vec![ParserOutput::GlobalFile(path, anchor_source(payload))])
    }
}
/// Wraps `text` in a standalone HTML page in which every source line is an
/// HTML-escaped `<span id="L<n>">` (1-based), so individual lines can be
/// targeted with `#L<n>` fragments. Line numbers are drawn by CSS counters
/// and the targeted line is highlighted.
pub fn anchor_source(text: &str) -> String {
    let mut page = String::from(
        r#"<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Source Code</title>
<style>
pre {
counter-reset: line;
}
pre span {
display: block;
}
pre span:before {
counter-increment: line;
content: counter(line);
display: inline-block;
padding: 0 .5em;
margin-right: .5em;
color: #888;
}
pre span:target {
background-color: #ffff00;
}
</style>
</head>
<body>
<pre>"#,
    );
    // One block-level span per line; anchors are numbered from 1.
    for (index, source_line) in text.lines().enumerate() {
        let span = format!(
            r#"<span id="L{}">{}</span>"#,
            index + 1,
            encode_text(source_line)
        );
        page.push_str(&span);
    }
    let footer = format!("</pre>{TEMPLATE_QUERY_PARAM_SCRIPT}</body></html>");
    page.push_str(&footer);
    page
}
/// Reads the `inductor_runtime_and_tensor_meta` JSON artifact of every graph
/// directory under each `rank_<n>` directory and returns one `GraphRuntime`
/// per artifact that lists at least one op.
///
/// # Errors
/// Propagates directory/file read errors and JSON deserialization errors.
pub fn read_runtime_estimations(
    out_path: &PathBuf,
    rank_nums: &[u32],
) -> anyhow::Result<Vec<GraphRuntime>> {
    /// The only part of the artifact this reader needs.
    #[derive(serde::Deserialize)]
    struct RuntimeJson {
        ops: Vec<OpRuntime>,
    }

    read_artifacts(
        out_path,
        rank_nums,
        "inductor_runtime_and_tensor_meta",
        |content, rank, graph| {
            let parsed: RuntimeJson = serde_json::from_str(content)?;
            let ops = parsed.ops;
            // Artifacts without ops contribute nothing.
            if ops.is_empty() {
                return Ok(None);
            }
            Ok(Some(GraphRuntime { rank, graph, ops }))
        },
    )
}
/// Reads the `inductor_runtime_and_tensor_meta` JSON artifact of every graph
/// directory under each `rank_<n>` directory and returns, per artifact, a
/// fingerprint consisting of the JSON re-serialized through
/// `serde_json::Value`.
///
/// # Errors
/// Propagates directory/file read errors and JSON (de)serialization errors.
pub fn read_tensor_meta_fingerprints(
    out_path: &PathBuf,
    rank_nums: &[u32],
) -> anyhow::Result<Vec<TensorMetaFingerprint>> {
    let to_fingerprint = |content: &str, rank: u32, graph: String| {
        // Round-trip through `Value` so the fingerprint is serde_json's own
        // serialization of the document rather than the raw file text.
        let parsed: serde_json::Value = serde_json::from_str(content)?;
        let fingerprint = serde_json::to_string(&parsed)?;
        Ok(Some(TensorMetaFingerprint {
            rank,
            graph,
            fingerprint,
        }))
    };
    read_artifacts(
        out_path,
        rank_nums,
        "inductor_runtime_and_tensor_meta",
        to_fingerprint,
    )
}
/// Reads the `inductor_collective_schedule` JSON artifact (a list of op
/// names) of every graph directory under each `rank_<n>` directory and
/// returns one `CollectiveSchedule` per non-empty list.
///
/// # Errors
/// Propagates directory/file read errors and JSON deserialization errors.
pub fn read_collective_schedules(
    out_path: &PathBuf,
    rank_nums: &[u32],
) -> anyhow::Result<Vec<CollectiveSchedule>> {
    read_artifacts(
        out_path,
        rank_nums,
        "inductor_collective_schedule",
        |content, rank, graph| {
            let ops: Vec<String> = serde_json::from_str(content)?;
            // Empty schedules are skipped.
            if ops.is_empty() {
                return Ok(None);
            }
            Ok(Some(CollectiveSchedule { rank, graph, ops }))
        },
    )
}
/// For every rank, compares the collectives listed in each graph's
/// `inductor_collective_schedule` JSON with the collective calls found in
/// that graph's `inductor_output_code` file, and writes the result to
/// `rank_<n>/collectives_parity.json`.
///
/// A graph is reported when the per-op counts differ (`offset`) or when the
/// output code has fewer `wait_tensor` calls than collective calls
/// (`missing_waits`). The report file is written for every existing rank
/// directory, even when no graph is reported.
///
/// # Errors
/// Returns an error when a regex fails to compile, a directory or file cannot
/// be read, the report cannot be serialized, or the report cannot be written.
/// A missing/unparseable `compile_directory.json` and an unparseable schedule
/// file are tolerated (treated as empty).
pub fn check_collectives_parity(out_path: &PathBuf, rank_nums: &[u32]) -> anyhow::Result<()> {
use regex::Regex;
use std::{collections::HashMap, fs};
// Matches `torch.ops.c10d_functional.<op>.default(` (optionally
// `_c10d_functional`, whitespace allowed around the dots); group 1 is `<op>`.
let call_re = Regex::new(
r"torch\s*\.\s*ops\s*\.\s*_?c10d_functional\s*\.\s*([A-Za-z0-9_]+)\s*\.\s*default\s*\(",
)?;
// Matches `#` and `//` line comments and `/* ... */` block comments.
let comment_re = Regex::new(r"(?m)#.*$|//.*$|(?s)/\*.*?\*/")?;
// Matches HTML tags; the output code file may be the highlighted HTML version.
let html_tag_re = Regex::new(r"(?s)<[^>]*>")?;
for &rank in rank_nums {
let rank_dir = out_path.join(format!("rank_{rank}"));
if !rank_dir.exists() {
continue;
}
// Map from the first path component of each artifact URL (the graph
// directory name) to the compile-id key it is listed under. The first
// mapping seen for a directory wins. Any read/parse failure yields an
// empty map.
let dir_to_compile_id: HashMap<String, String> =
fs::read_to_string(rank_dir.join("compile_directory.json"))
.ok()
.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok())
.and_then(|v| {
v.as_object().map(|obj| {
obj.iter().fold(HashMap::new(), |mut m, (cid, entry)| {
if let Some(arts) = entry.get("artifacts").and_then(|x| x.as_array()) {
for a in arts {
if let Some(url) = a.get("url").and_then(|x| x.as_str()) {
if let Some((prefix, _)) = url.split_once('/') {
m.entry(prefix.to_string())
.or_insert_with(|| cid.to_string());
}
}
}
}
m
})
})
})
.unwrap_or_default();
let mut report = crate::types::CollectivesParityReport {
description: "Difference of # of collectives in scheduler and inductor output code and missing wait collectives"
.to_string(),
graphs: Vec::new(),
};
// Visit every subdirectory of the rank directory (one per graph).
for compile_dir in fs::read_dir(&rank_dir)?
.flatten()
.map(|e| e.path())
.filter(|p| p.is_dir())
{
// Locate the schedule JSON (the last match found wins) and the output
// code file (the first match found wins).
let (mut schedule_path, mut code_path) = (None, None);
for p in fs::read_dir(&compile_dir)?.flatten().map(|e| e.path()) {
let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("");
if p.extension() == Some(OsStr::new("json"))
&& stem.starts_with("inductor_collective_schedule")
{
schedule_path = Some(p);
} else if stem.starts_with("inductor_output_code") && code_path.is_none() {
code_path = Some(p);
}
}
// Both files are required; otherwise the graph is skipped.
let (Some(schedule), Some(code)) = (schedule_path, code_path) else {
continue;
};
// A schedule that is not a JSON list of strings is treated as empty.
let raw_ops: Vec<String> =
serde_json::from_str(&fs::read_to_string(schedule)?).unwrap_or_default();
// Maps an op name to a canonical collective name: trailing underscores
// are stripped, then the first listed name contained in the op is
// used; anything else containing "reduce" maps to "reduce". Ops that
// match nothing are ignored.
let normalize_op = |op: &str| -> Option<&'static str> {
let op = op.trim_end_matches('_');
[
"all_reduce",
"reduce_scatter",
"all_gather",
"broadcast",
"all_to_all",
]
.iter()
.find(|&&name| op.contains(name))
.copied()
.or_else(|| {
(op.contains("reduce")
&& !op.contains("all_reduce")
&& !op.contains("reduce_scatter"))
.then_some("reduce")
})
};
// Per-collective counts according to the schedule.
let mut schedule_counts: HashMap<&str, usize> = HashMap::new();
for op in &raw_ops {
if let Some(normalized) = normalize_op(op) {
*schedule_counts.entry(normalized).or_insert(0) += 1;
}
}
// Strip HTML tags first, then comments, before searching for calls.
let code_clean = comment_re
.replace_all(&html_tag_re.replace_all(&fs::read_to_string(code)?, ""), "")
.into_owned();
// Per-collective counts according to the output code; `wait_tensor`
// calls are tallied separately.
let mut code_counts: HashMap<&str, usize> = HashMap::new();
let mut wait_count = 0usize;
for cap in call_re.captures_iter(&code_clean) {
let op = cap.get(1).unwrap().as_str();
if op == "wait_tensor" {
wait_count += 1;
} else if let Some(normalized) = normalize_op(op) {
*code_counts.entry(normalized).or_insert(0) += 1;
}
}
// Collective calls in the code that have no corresponding wait (never negative).
let collective_total: usize = code_counts.values().sum();
let missing_waits = collective_total.saturating_sub(wait_count);
// `offset` is the sum, over every collective seen on either side, of the
// absolute difference between the schedule count and the code count.
let mut all_ops: std::collections::HashSet<&str> =
schedule_counts.keys().copied().collect();
all_ops.extend(code_counts.keys().copied());
let offset: usize = all_ops
.iter()
.map(|&n| {
schedule_counts
.get(n)
.copied()
.unwrap_or(0)
.abs_diff(code_counts.get(n).copied().unwrap_or(0))
})
.sum();
// Only graphs with a discrepancy are reported.
if offset > 0 || missing_waits > 0 {
let graph = compile_dir
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_string();
let compile_id = dir_to_compile_id
.get(&graph)
.cloned()
.unwrap_or_else(|| "unknown".to_string());
report.graphs.push(crate::types::GraphCollectivesParity {
graph,
compile_id,
offset,
missing_waits,
});
}
}
fs::write(
rank_dir.join("collectives_parity.json"),
serde_json::to_string_pretty(&report)?,
)?;
}
Ok(())
}
/// Walks `<out_path>/rank_<n>/<graph-dir>/` for every rank in `rank_nums`,
/// finds in each graph directory the first `.json` file whose stem starts
/// with `file_prefix`, and feeds its contents to `parse_fn` together with the
/// rank and the graph directory name.
///
/// Rank directories that do not exist are skipped, as are graph directories
/// without a matching file. `parse_fn` may return `Ok(None)` to skip an
/// artifact.
///
/// # Errors
/// Propagates directory listing errors, file read errors (annotated with the
/// prefix and rank), and any error returned by `parse_fn`.
fn read_artifacts<T>(
    out_path: &PathBuf,
    rank_nums: &[u32],
    file_prefix: &str,
    parse_fn: impl Fn(&str, u32, String) -> anyhow::Result<Option<T>>,
) -> anyhow::Result<Vec<T>> {
    use anyhow::Context;
    use std::fs;

    let mut collected = Vec::new();
    for &rank in rank_nums {
        let rank_dir = out_path.join(format!("rank_{rank}"));
        if !rank_dir.exists() {
            continue;
        }
        for dir_entry in fs::read_dir(&rank_dir)?.flatten() {
            let compile_dir = dir_entry.path();
            if !compile_dir.is_dir() {
                continue;
            }
            // First directory entry that looks like `<file_prefix>*.json`.
            let artifact = fs::read_dir(&compile_dir)?.flatten().find(|candidate| {
                let path = candidate.path();
                let is_json = path.extension() == Some(OsStr::new("json"));
                is_json
                    && path
                        .file_stem()
                        .and_then(|stem| stem.to_str())
                        .is_some_and(|stem| stem.starts_with(file_prefix))
            });
            let Some(artifact) = artifact else {
                continue;
            };
            let content = fs::read_to_string(artifact.path())
                .with_context(|| format!("Reading {file_prefix} for rank {rank}"))?;
            let graph = compile_dir
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("unknown")
                .to_string();
            if let Some(parsed) = parse_fn(&content, rank, graph)? {
                collected.push(parsed);
            }
        }
    }
    Ok(collected)
}
/// Parser for generic `artifact` entries. The metadata's `encoding` selects
/// the output: `"string"` becomes `<name>.txt`, `"json"` becomes a
/// pretty-printed `<name>.json`; any other encoding is an error.
pub struct ArtifactParser;

impl StructuredLogParser for ArtifactParser {
    fn name(&self) -> &'static str {
        "artifact"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let artifact = e.artifact.as_ref();
        artifact.map(|a| Metadata::Artifact(a))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let Metadata::Artifact(artifact) = metadata else {
            return Err(anyhow::anyhow!("Expected Artifact metadata"));
        };
        let encoding = artifact.encoding.as_str();
        if encoding == "string" {
            let filename = format!("{}.txt", artifact.name);
            payload_file_output(&filename, lineno, compile_id)
        } else if encoding == "json" {
            let filename = format!("{}.json", artifact.name);
            payload_reformat_file_output(&filename, lineno, compile_id, format_json_pretty)
        } else {
            Err(anyhow::anyhow!(
                "Unsupported encoding: {}",
                artifact.encoding
            ))
        }
    }
}
/// Parser for `memoizer_artifacts` entries; the payload is emitted as
/// `memoizer_artifacts.json`, reformatted with `format_json_pretty`.
pub struct MemoizerArtifactsParser;

impl StructuredLogParser for MemoizerArtifactsParser {
    fn name(&self) -> &'static str {
        "memoizer_artifacts"
    }

    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        let artifacts = e.memoizer_artifacts.as_ref();
        artifacts.map(|a| Metadata::MemoizerArtifacts(a))
    }

    fn parse<'e>(
        &self,
        lineno: usize,
        _metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        const OUTPUT_NAME: &str = "memoizer_artifacts.json";
        payload_reformat_file_output(OUTPUT_NAME, lineno, compile_id, format_json_pretty)
    }
}
fn render_sym_expr_trie(
expr: u64,
sym_expr_info_index: &SymExprInfoIndex,
depth: usize,
visited: &mut HashSet<u64>,
) -> Option<String> {
if visited.contains(&expr) {
return None;
}
visited.insert(expr);
let sym_expr_info = sym_expr_info_index.get(&expr)?;
let binding = Vec::new();
let sym_expr_args_id = sym_expr_info.argument_ids.as_ref().unwrap_or(&binding);
let mut children_elements = Vec::new();
for arg_id in sym_expr_args_id {
if let Some(child_element) =
render_sym_expr_trie(*arg_id, sym_expr_info_index, depth + 1, visited)
{
children_elements.push(child_element);
}
}
let mut sym_expr_trie_html = format!(
r#"
<div style="margin-left: {}px;">
<div style="padding: 16px; border: 1px solid #ccc; border-radius: 8px; box-shadow: 2px 2px 5px rgba(0,0,0,0.1); background-color: white;">
<h3 style="font-weight: bold; font-size: 1.25rem;">{}</h3>
<div style="margin-top: 8px;">
<p><span style="font-weight: bold;">Method:</span> {}</p>
<p><span style="font-weight: bold;">Arguments:</span> {}</p>
<div style="margin-top: 8px; font-size: 0.875rem;">
{}
{}
</div>
</div>
</div>
</div>
"#,
depth * 20,
sym_expr_info.result.as_ref().unwrap_or(&"".to_string()),
sym_expr_info.method.as_ref().unwrap_or(&"".to_string()),
sym_expr_info
.arguments
.as_ref()
.unwrap_or(&Vec::new())
.join(", "),
format_stack(
&sym_expr_info.user_stack.as_ref().unwrap_or(&Vec::new()),
"User Stack",
true
),
format_stack(
&sym_expr_info.stack.as_ref().unwrap_or(&Vec::new()),
"Stack",
false
),
);
if !children_elements.is_empty() {
for child_element in children_elements {
sym_expr_trie_html.push_str(&child_element);
}
}
Some(sym_expr_trie_html)
}
/// Parser for `propagate_real_tensors_provenance` and `guard_added` entries.
/// Renders `symbolic_guard_information.html` with the guard expression, its
/// stacks, frame locals and the tree of symbolic expressions it derives from.
pub struct PropagateRealTensorsParser<'t> {
    /// Template engine used to render the page.
    pub tt: &'t TinyTemplate<'t>,
    /// Lookup table of symbolic expression nodes, keyed by node id.
    pub sym_expr_info_index: &'t SymExprInfoIndex,
}

impl StructuredLogParser for PropagateRealTensorsParser<'_> {
    fn name(&self) -> &'static str {
        "guard_added"
    }

    /// Prefers `propagate_real_tensors_provenance` and falls back to
    /// `guard_added`; both are wrapped in the same metadata variant.
    fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
        if let Some(m) = e.propagate_real_tensors_provenance.as_ref() {
            Some(Metadata::SymbolicShapePropagateRealTensor(m))
        } else {
            e.guard_added
                .as_ref()
                .map(|g| Metadata::SymbolicShapePropagateRealTensor(g))
        }
    }

    /// # Errors
    /// Returns an error for any other metadata variant, when the entry lacks
    /// `expr_node_id` or `expr`, or when template rendering fails.
    fn parse<'e>(
        &self,
        lineno: usize,
        metadata: Metadata<'e>,
        _rank: Option<u32>,
        compile_id: &Option<CompileId>,
        _payload: &str,
    ) -> anyhow::Result<ParserResults> {
        let Metadata::SymbolicShapePropagateRealTensor(m) = metadata else {
            return Err(anyhow::anyhow!(
                "Expected SymbolicShapePropagateRealTensor metadata"
            ));
        };
        let filename = "symbolic_guard_information.html";

        // Both fields are optional in the log; report an incomplete entry as
        // an error instead of panicking.
        let expr_node_id = m.expr_node_id.ok_or_else(|| {
            anyhow::anyhow!("Missing expr_node_id in SymbolicShapePropagateRealTensor metadata")
        })?;
        let expr = m.expr.clone().ok_or_else(|| {
            anyhow::anyhow!("Missing expr in SymbolicShapePropagateRealTensor metadata")
        })?;

        let empty_stack = Vec::new();
        let framework_stack_html = format_stack(
            m.stack.as_ref().unwrap_or(&empty_stack),
            "Framework Stack",
            false,
        );
        let user_stack_html = format_stack(
            m.user_stack.as_ref().unwrap_or(&empty_stack),
            "User Stack",
            true,
        );
        let locals_html = m
            .frame_locals
            .as_ref()
            .map_or_else(|| FrameLocals::default().to_string(), |l| l.to_string());

        let mut visited = HashSet::new();
        let sym_expr_trie_html =
            render_sym_expr_trie(expr_node_id, self.sym_expr_info_index, 0, &mut visited)
                .unwrap_or_default();

        let context = SymbolicGuardContext {
            css: crate::CSS,
            expr,
            user_stack_html,
            framework_stack_html,
            sym_expr_trie_html,
            locals_html,
        };
        let output = self.tt.render(filename, &context)?;
        simple_file_output(filename, lineno, compile_id, &output)
    }
}
/// Builds the default list of parsers.
///
/// When `parser_config.export` is set, only the `exported_program` sentinel
/// parser is returned. Otherwise the list contains the sentinel-file parsers
/// followed by the dedicated parsers, in the order given below.
pub fn default_parsers<'t>(
    tt: &'t TinyTemplate<'t>,
    parser_config: &ParseConfig,
) -> Vec<Box<dyn StructuredLogParser + 't>> {
    if parser_config.export {
        let exported_program =
            SentinelFileParser::new("exported_program", |e| e.exported_program.as_ref());
        return vec![Box::new(exported_program)];
    }

    vec![
        // Entries that are written out verbatim as `<name>.txt`.
        Box::new(SentinelFileParser::new("optimize_ddp_split_graph", |e| {
            e.optimize_ddp_split_graph.as_ref()
        })),
        Box::new(SentinelFileParser::new("compiled_autograd_graph", |e| {
            e.compiled_autograd_graph.as_ref()
        })),
        Box::new(SentinelFileParser::new("aot_forward_graph", |e| {
            e.aot_forward_graph.as_ref()
        })),
        Box::new(SentinelFileParser::new("aot_backward_graph", |e| {
            e.aot_backward_graph.as_ref()
        })),
        Box::new(SentinelFileParser::new("aot_inference_graph", |e| {
            e.aot_inference_graph.as_ref()
        })),
        Box::new(SentinelFileParser::new("aot_joint_graph", |e| {
            e.aot_joint_graph.as_ref()
        })),
        Box::new(SentinelFileParser::new("inductor_post_grad_graph", |e| {
            e.inductor_post_grad_graph.as_ref()
        })),
        Box::new(SentinelFileParser::new("inductor_pre_grad_graph", |e| {
            e.inductor_pre_grad_graph.as_ref()
        })),
        Box::new(SentinelFileParser::new("dynamo_cpp_guards_str", |e| {
            e.dynamo_cpp_guards_str.as_ref()
        })),
        // Parsers with dedicated logic.
        Box::new(GraphDumpParser),
        Box::new(DynamoOutputGraphParser),
        Box::new(DynamoGuardParser { tt }),
        Box::new(InductorOutputCodeParser::new(parser_config)),
        Box::new(OptimizeDdpSplitChildParser),
        Box::new(AOTAutogradBackwardCompilationMetricsParser { tt }),
        Box::new(BwdCompilationMetricsParser { tt }),
        Box::new(LinkParser),
        Box::new(ArtifactParser),
        Box::new(DumpFileParser),
    ]
}