#![cfg(all(
feature = "trace_recorder",
not(any(target_arch = "wasm32", target_os = "android", target_os = "ios"))
))]
use std::{
collections::HashMap,
fmt::{self, Write as _},
io::{self, Read},
path::{Path, PathBuf},
time::{Duration, SystemTime},
};
use serde::Deserialize as _;
use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _};
use zng_task::parking_lot::Mutex;
use zng_txt::{ToTxt as _, Txt};
/// In-memory representation of a recorded trace, grouped by process.
///
/// Can be built from Chrome trace JSON with [`Trace::parse_chrome_trace`] and converted
/// back with [`Trace::to_chrome_trace`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Trace {
/// Traced processes.
pub processes: Vec<ProcessTrace>,
}
/// All threads recorded for a single process.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ProcessTrace {
/// Process ID.
///
/// When parsed from JSON this is the system process ID announced by a
/// `"pid: {id}, name: {name}"` message, or the `pid` field of the entries if no such
/// message was recorded.
pub id: u64,
/// Process name.
///
/// When parsed from JSON and no name was recorded this is the ID formatted as text.
pub name: Txt,
/// Threads of the process that recorded at least one entry.
pub threads: Vec<ThreadTrace>,
/// System time announced by the `"zng-record-start: {micros}"` message.
///
/// When parsed from JSON this is [`SystemTime::UNIX_EPOCH`] if the message was not recorded.
pub start: SystemTime,
}
/// Events and spans recorded for a single thread of a process.
///
/// The `Debug` implementation only prints the name and the item counts.
#[derive(Clone)]
#[non_exhaustive]
pub struct ThreadTrace {
/// Thread name.
///
/// Threads are identified by name inside a [`ProcessTrace`].
pub name: Txt,
/// Instant events recorded in the thread.
pub events: Vec<EventTrace>,
/// Spans recorded in the thread.
pub spans: Vec<SpanTrace>,
}
impl fmt::Debug for ThreadTrace {
    /// Writes a compact summary, the thread name and the number of events and spans,
    /// instead of listing every recorded item.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let event_count = self.events.len();
        let span_count = self.spans.len();

        let mut summary = f.debug_struct("ThreadTrace");
        summary.field("name", &self.name);
        summary.field("events.len()", &event_count);
        summary.field("spans.len()", &span_count);
        summary.finish()
    }
}
/// An instant event (`"ph": "i"` entry in the Chrome trace JSON).
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct EventTrace {
/// Name, categories, source location and arguments of the event.
pub info: Info,
/// Timestamp of the event, converted from the `ts` field (microseconds) of the entry.
///
/// NOTE(review): presumably relative to the start of the recording — confirm.
pub instant: Duration,
}
/// A span, built from a begin (`"ph": "B"`) entry and its matching end (`"ph": "E"`) entry.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct SpanTrace {
/// Name, categories, source location and arguments of the span.
pub info: Info,
/// Timestamp of the begin entry.
pub start: Duration,
/// Timestamp of the end entry.
///
/// When parsed from JSON this is equal to `start` if no matching end entry was found,
/// and always greater than `start` if one was.
pub end: Duration,
}
/// Metadata shared by events and spans.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Info {
/// Name of the event or span.
pub name: Txt,
/// Categories, the items of the comma separated `cat` field of the JSON entry.
pub categories: Vec<Txt>,
/// Value of the `.file` field of the JSON entry, empty if the field is missing.
pub file: Txt,
/// Value of the `.line` field of the JSON entry, zero if the field is missing.
pub line: u32,
/// Text arguments recorded with the event or span.
pub args: HashMap<Txt, Txt>,
}
impl Trace {
/// Reads the file at `json_path` as text and parses it with [`Trace::parse_chrome_trace`].
///
/// # Errors
///
/// Returns the error of reading the file or of parsing its content.
pub fn read_chrome_trace(json_path: impl AsRef<Path>) -> io::Result<Self> {
    std::fs::read_to_string(json_path).and_then(|text| Self::parse_chrome_trace(&text))
}
pub fn parse_chrome_trace(json: &str) -> io::Result<Self> {
fn invalid_data(msg: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, msg)
}
let json = json.trim_start();
if !json.starts_with('[') {
return Err(invalid_data("expected JSON array"));
}
let json = &json[1..];
enum Phase {
Begin,
End,
Event,
}
struct Entry {
phase: Phase,
pid: u64,
tid: u64,
ts: Duration,
name: Txt,
cat: Vec<Txt>,
file: Txt,
line: u32,
args: HashMap<Txt, Txt>,
}
let mut process_sys_pid = HashMap::new();
let mut process_names = HashMap::new();
let mut process_record_start = HashMap::new();
let mut thread_names = HashMap::new();
let mut entries = vec![];
let mut reader = std::io::Cursor::new(json.as_bytes());
loop {
let mut pos = reader.position();
let mut buf = [0u8];
while reader.read(&mut buf).is_ok() {
if !b" \r\n\t,".contains(&buf[0]) {
break;
}
pos = reader.position();
}
reader.set_position(pos);
let mut de = serde_json::Deserializer::from_reader(&mut reader);
match serde_json::Value::deserialize(&mut de) {
Ok(entry) => match entry {
serde_json::Value::Object(map) => {
let phase_str = match map.get("ph") {
Some(serde_json::Value::String(ph)) => match ph.as_str() {
"B" | "E" | "i" | "M" => ph.as_str(),
u => {
tracing::error!("ignoring unknown or unsupported phase `{u:?}`");
continue;
}
},
_ => return Err(invalid_data("expected \"ph\"")),
};
let pid = match map.get("pid") {
Some(serde_json::Value::Number(n)) => match n.as_u64() {
Some(pid) => pid,
None => return Err(invalid_data("expected \"pid\"")),
},
_ => return Err(invalid_data("expected \"pid\"")),
};
let name = match map.get("name") {
Some(serde_json::Value::String(name)) => name.to_txt(),
_ => return Err(invalid_data("expected \"name\"")),
};
let args: HashMap<Txt, Txt> = match map.get("args") {
Some(a) => match serde_json::from_value(a.clone()) {
Ok(a) => a,
Err(e) => {
tracing::error!("only simple text args are supported, {e}");
continue;
}
},
_ => HashMap::new(),
};
if phase_str == "M" && name == "process_name" {
if let Some(n) = args.get("name") {
process_names.insert(pid, n.to_txt());
}
continue;
}
let tid = match map.get("tid") {
Some(serde_json::Value::Number(n)) => match n.as_u64() {
Some(tid) => tid,
None => return Err(invalid_data("expected \"tid\"")),
},
_ => return Err(invalid_data("expected \"tid\"")),
};
let phase = match phase_str {
"B" => Phase::Begin,
"E" => Phase::End,
"i" => Phase::Event,
"M" => {
if name == "thread_name"
&& let Some(n) = args.get("name")
{
thread_names.insert(tid, n.to_txt());
}
continue;
}
_ => unreachable!(),
};
let ts = match map.get("ts") {
Some(serde_json::Value::Number(ts)) => match ts.as_f64() {
Some(ts) => Duration::from_nanos((ts * 1000.0).round() as u64),
None => return Err(invalid_data("expected \"ts\"")),
},
_ => return Err(invalid_data("expected \"ts\"")),
};
let cat = match map.get("cat") {
Some(serde_json::Value::String(cat)) => cat.split(',').map(|c| c.trim().to_txt()).collect(),
_ => vec![],
};
let file = match map.get(".file") {
Some(serde_json::Value::String(file)) => file.to_txt(),
_ => Txt::from_static(""),
};
let line = match map.get(".line") {
Some(serde_json::Value::Number(line)) => line.as_u64().unwrap_or(0) as u32,
_ => 0,
};
if let Some(msg) = args.get("message") {
if let Some(process_ts) = msg.strip_prefix("zng-record-start: ") {
if let Ok(process_ts) = process_ts.parse::<u64>() {
process_record_start.insert(pid, SystemTime::UNIX_EPOCH + Duration::from_micros(process_ts));
}
} else if let Some(rest) = msg.strip_prefix("pid: ")
&& let Some((sys_pid, p_name)) = rest.split_once(", name: ")
&& let Ok(sys_pid) = sys_pid.parse::<u64>()
{
process_sys_pid.insert(pid, sys_pid);
process_names.insert(pid, p_name.to_txt());
}
}
entries.push(Entry {
phase,
pid,
tid,
ts,
name,
cat,
file,
line,
args,
});
}
_ => return Err(invalid_data("expected JSON array of objects")),
},
Err(_) => {
break;
}
}
}
let mut out = Trace { processes: vec![] };
for mut entry in entries {
let sys_pid = *process_sys_pid.entry(entry.pid).or_insert(entry.pid);
let process = if let Some(p) = out.processes.iter_mut().find(|p| p.id == sys_pid) {
p
} else {
out.processes.push(ProcessTrace {
id: sys_pid,
name: process_names.entry(entry.pid).or_insert_with(|| sys_pid.to_txt()).clone(),
threads: vec![],
start: process_record_start.get(&entry.pid).copied().unwrap_or(SystemTime::UNIX_EPOCH),
});
out.processes.last_mut().unwrap()
};
let thread_name = if let Some(custom_name) = entry.args.remove("thread") {
custom_name.clone()
} else {
thread_names.entry(entry.tid).or_insert_with(|| entry.tid.to_txt()).clone()
};
let thread = if let Some(t) = process.threads.iter_mut().find(|t| t.name == thread_name) {
t
} else {
process.threads.push(ThreadTrace {
name: thread_name,
events: vec![],
spans: vec![],
});
process.threads.last_mut().unwrap()
};
fn entry_to_info(entry: Entry) -> Info {
Info {
name: entry.name,
categories: entry.cat,
file: entry.file,
line: entry.line,
args: entry.args,
}
}
match entry.phase {
Phase::Begin => thread.spans.push(SpanTrace {
start: entry.ts,
end: entry.ts,
info: entry_to_info(entry),
}),
Phase::End => {
let end = entry.ts;
let info = entry_to_info(entry);
if let Some(open) = thread.spans.iter_mut().rev().find(|s| s.start == s.end && s.info.name == info.name) {
open.end = end;
if open.start == open.end {
open.end += Duration::from_nanos(1);
}
open.info.merge(info);
}
}
Phase::Event => thread.events.push(EventTrace {
instant: entry.ts,
info: entry_to_info(entry),
}),
}
}
Ok(out)
}
pub fn to_chrome_trace(&self) -> Txt {
let mut out = String::new();
let _ = writeln!(&mut out, "[");
let mut sep = "";
for p in &self.processes {
let _ = write!(
&mut out,
r#"{sep}{{"ph":"M","pid":{},"name":"process_name","args":{{"name":"{}"}}}}"#,
p.id, p.name
);
sep = ",\n";
for (tid, t) in p.threads.iter().enumerate() {
let _ = write!(
&mut out,
r#"{sep}{{"ph":"M","pid":{}, "tid":{tid},"name":"process_name","args":{{"name":"{}"}}}}"#,
p.id, t.name
);
let mut items = Vec::with_capacity(t.events.len() + t.spans.len() * 2);
for ev in &t.events {
let obj = serde_json::json!({
"ph": "i",
"s": "t",
"ts": (ev.instant.as_nanos() as f64 / 1000.0),
"pid": p.id,
"tid": tid,
"name": ev.info.name,
"cat": ev.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
"args": ev.info.args,
".file": ev.info.file,
".line": ev.info.line,
});
items.push((ev.instant, obj));
}
for sp in &t.spans {
let start = serde_json::json!({
"ph": "B",
"s": "t",
"ts": (sp.start.as_nanos() as f64 / 1000.0),
"pid": p.id,
"tid": tid,
"name": sp.info.name,
"cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
"args": sp.info.args,
".file": sp.info.file,
".line": sp.info.line,
});
items.push((sp.start, start));
let end = serde_json::json!({
"ph": "E",
"s": "t",
"ts": (sp.end.as_nanos() as f64 / 1000.0),
"pid": p.id,
"tid": tid,
"name": sp.info.name,
"cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
"args": sp.info.args,
".file": sp.info.file,
".line": sp.info.line,
});
items.push((sp.end, end));
}
items.sort_by_key(|a| a.0);
for (_, item) in items {
let item = serde_json::to_string(&item).unwrap();
let _ = write!(&mut out, "{sep}{item}");
}
}
}
let _ = writeln!(&mut out, "]");
out.to_txt()
}
/// Converts the trace with [`Trace::to_chrome_trace`] and writes the JSON to `json_path`.
///
/// # Errors
///
/// Returns the error of writing the file.
pub fn write_chrome_trace(&self, json_path: impl AsRef<Path>) -> io::Result<()> {
    let json = self.to_chrome_trace();
    std::fs::write(json_path, json.as_str())
}
/// Moves all processes of `other` into this trace.
///
/// A process with the same ID and name as a process already in this trace is merged
/// into it, any other process is appended.
pub fn merge(&mut self, other: Self) {
    for incoming in other.processes {
        let known = self
            .processes
            .iter()
            .position(|known| known.id == incoming.id && known.name == incoming.name);
        match known {
            Some(i) => self.processes[i].merge(incoming),
            None => self.processes.push(incoming),
        }
    }
}
/// Sorts the processes by start time, then by name, and sorts the content of each process.
pub fn sort(&mut self) {
    self.processes.sort_by(|a, b| match a.start.cmp(&b.start) {
        std::cmp::Ordering::Equal => a.name.cmp(&b.name),
        by_start => by_start,
    });
    self.processes.iter_mut().for_each(|process| process.sort());
}
}
impl ProcessTrace {
    /// Moves all threads of `other` into this process.
    ///
    /// A thread with the same name as a thread already in this process is merged into it,
    /// any other thread is appended. The other fields of `other` are discarded.
    pub fn merge(&mut self, other: Self) {
        for incoming in other.threads {
            match self.threads.iter().position(|known| known.name == incoming.name) {
                Some(i) => self.threads[i].merge(incoming),
                None => self.threads.push(incoming),
            }
        }
    }

    /// Sorts the threads by the timestamp of their first item, then by name, and sorts the
    /// content of each thread.
    pub fn sort(&mut self) {
        self.threads.sort_by(|a, b| match a.start().cmp(&b.start()) {
            std::cmp::Ordering::Equal => a.name.cmp(&b.name),
            by_start => by_start,
        });
        self.threads.iter_mut().for_each(|thread| thread.sort());
    }
}
impl ThreadTrace {
    /// Gets the smallest timestamp of all events and span starts.
    ///
    /// Returns [`Duration::MAX`] if the thread has no events and no spans.
    pub fn start(&self) -> Duration {
        let events = self.events.iter().map(|e| e.instant);
        let spans = self.spans.iter().map(|s| s.start);
        events.chain(spans).min().unwrap_or(Duration::MAX)
    }

    /// Moves all events and spans of `other` to the end of this thread, the name of `other`
    /// is discarded.
    pub fn merge(&mut self, mut other: Self) {
        self.events.append(&mut other.events);
        self.spans.append(&mut other.spans);
    }

    /// Sorts the events by timestamp and the spans by start.
    ///
    /// Spans that start at the same instant are sorted by end, latest first, so that an
    /// enclosing span comes before the spans nested in it.
    pub fn sort(&mut self) {
        self.events.sort_by_key(|e| e.instant);
        // the tie-breaker must compare `end`, comparing `start` again is always `Equal`
        self.spans.sort_by(|a, b| a.start.cmp(&b.start).then(b.end.cmp(&a.end)));
    }
}
impl Info {
pub fn merge(&mut self, info: Info) {
if !info.file.is_empty() {
self.file = info.file;
self.line = info.line;
}
self.args.extend(info.args);
}
}
pub fn start_recording(output_dir: Option<PathBuf>) {
let mut rec = recording();
if rec.is_some() {
return;
}
let process_start = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.expect("cannot define process start timestamp")
.as_micros();
let output_dir = output_dir.unwrap_or_else(|| std::env::current_dir().expect("`current_dir` error").join("zng-trace"));
let timestamp = match std::env::var("ZNG_RECORD_TRACE_TIMESTAMP") {
Ok(t) => t,
Err(_) => {
let t = process_start.to_string();
unsafe {
std::env::set_var("ZNG_RECORD_TRACE_TIMESTAMP", t.clone());
}
t
}
};
let output_dir = output_dir.join(timestamp);
std::fs::create_dir_all(&output_dir).expect("cannot create `output_dir`");
let output_file = output_dir.join(format!("{}.json", std::process::id()));
let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
.include_args(true)
.file(output_file)
.category_fn(Box::new(|es| match es {
tracing_chrome::EventOrSpan::Event(event) => format!("{},{}", event.metadata().target(), event.metadata().level()),
tracing_chrome::EventOrSpan::Span(span_ref) => format!("{},{}", span_ref.metadata().target(), span_ref.metadata().level()),
}))
.build();
*rec = Some(guard);
let env_layer = EnvFilter::try_from_env("ZNG_RECORD_TRACE_FILTER")
.or_else(|_| EnvFilter::try_from_default_env())
.unwrap_or_else(|_| EnvFilter::new("trace"));
if let Err(e) = tracing_subscriber::registry().with(env_layer).with(chrome_layer).try_init() {
tracing::error!("cannot start recording, {e}");
eprintln!("cannot start recording, {e}");
*rec = None;
return;
}
zng_env::on_process_exit(|_| stop_recording());
tracing::info!("zng-record-start: {process_start}");
}
/// Stops recording by dropping the guard of the current recording, does nothing if not
/// recording.
pub fn stop_recording() {
    // the guard is dropped while the lock is held
    drop(recording().take());
}
/// Returns `true` if the `ZNG_RECORD_TRACE` environment variable is set to a valid Unicode
/// value, the value itself is not inspected.
pub fn is_enabled() -> bool {
    let flag = std::env::var("ZNG_RECORD_TRACE");
    flag.is_ok()
}
// Handler registered with `zng_env::on_process_start!`: starts recording if the
// `ZNG_RECORD_TRACE` environment variable is set. The output directory is the value of
// the `ZNG_RECORD_TRACE_DIR` environment variable, or the `start_recording` default if
// the variable cannot be read.
zng_env::on_process_start!(|_| {
if is_enabled() {
start_recording(std::env::var("ZNG_RECORD_TRACE_DIR").ok().map(PathBuf::from));
}
});
// Guard of the current recording, `None` when not recording. Set by `start_recording`
// and cleared by `stop_recording`, accessed with `recording()`.
zng_app_context::hot_static! {
static RECORDING: Mutex<Option<tracing_chrome::FlushGuard>> = Mutex::new(None);
}
/// Locks and returns the `RECORDING` state.
///
/// The lock is held until the returned guard is dropped, calling this again in the same
/// thread while a guard is alive will presumably deadlock, `parking_lot` mutexes are not
/// reentrant — confirm.
fn recording() -> zng_task::parking_lot::MutexGuard<'static, Option<tracing_chrome::FlushGuard>> {
zng_app_context::hot_static_ref!(RECORDING).lock()
}