1use crate::{Record, RecordKind, SpanPath, SpanTree, SpanTreeNode};
2use eyre::eyre;
3use std::cmp::max;
4use std::collections::hash_map::Entry;
5use std::collections::HashMap;
6use std::fmt::Write;
7use std::iter;
8use std::time::Duration;
9use time::OffsetDateTime;
10use RecordKind::{SpanEnter, SpanExit};
11
12pub type TimingTree = SpanTree<Option<DerivedStats>>;
13type TimingTreeNode<'a> = SpanTreeNode<'a, Option<DerivedStats>>;
14
15#[derive(Debug, Clone, Default)]
17pub struct DirectStats {
18 pub duration: Duration,
20 pub count: u64,
22}
23
24impl DirectStats {
25 pub fn from_single_duration(duration: Duration) -> Self {
26 Self { duration, count: 1 }
27 }
28
29 pub fn combine_mut(&mut self, other: &DirectStats) {
30 self.duration += other.duration;
31 self.count += other.count;
32 }
33}
34
35#[derive(Debug, Clone)]
36pub struct DerivedStats {
37 pub duration: Duration,
38 pub count: u64,
39 pub duration_relative_to_parent: Option<f64>,
40 pub duration_relative_to_root: Option<f64>,
41 pub self_duration: Option<Duration>,
42 pub self_relative: Option<f64>,
43}
44
45fn update_column_widths_for_line(column_widths: &mut Vec<usize>, line: &str) {
46 let mut column_iter = line.split("\t");
47 for (col_width, column_content) in column_widths.iter_mut().zip(column_iter.by_ref()) {
49 *col_width = max(*col_width, column_content.len());
50 }
51 for column_content in column_iter {
53 column_widths.push(column_content.len());
54 }
55}
56
57fn write_table_line(output: &mut String, line: &str, column_widths: &[usize], alignments: &[Alignment]) {
58 let padding = 2;
59 debug_assert_eq!(line.lines().count(), 1, "line string must consist of a single line");
60 let alignment_iter = alignments.iter().chain(iter::repeat(&Alignment::Left));
61 for ((cell, width), alignment) in line.split("\t").zip(column_widths).zip(alignment_iter) {
62 match alignment {
63 Alignment::Left => write!(output, "{cell:width$}", width = width).unwrap(),
64 Alignment::Right => write!(output, "{cell: >width$}", width = width).unwrap(),
65 }
66 for _ in 0..padding {
67 output.push(' ');
68 }
69 }
70 writeln!(output).unwrap();
71}
72
73#[derive(Debug, Copy, Clone, PartialEq, Eq)]
74enum Alignment {
75 Left,
76 Right,
77}
78
79fn format_table(header: &str, table: &str, alignments: &[Alignment]) -> String {
80 debug_assert_eq!(header.lines().count(), 1, "Header must only have a single line");
81 let mut column_widths = vec![];
82 update_column_widths_for_line(&mut column_widths, header);
83 for line in table.lines() {
84 update_column_widths_for_line(&mut column_widths, line);
85 }
86
87 let mut output = String::new();
88 write_table_line(&mut output, header, &column_widths, &[]);
90 let header_len = output.len();
91 output.push_str(&"═".repeat(header_len));
92 writeln!(output).unwrap();
93
94 for line in table.lines() {
95 write_table_line(&mut output, line, &column_widths, alignments);
96 }
97
98 output.push_str(&"═".repeat(header_len));
99 writeln!(output).unwrap();
100
101 output
102}
103
104pub fn format_timing_tree(tree: &TimingTree) -> String {
105 let mut table = String::new();
106 if let Some(root) = tree.root() {
107 write_timing_tree_node(&mut table, root, &mut vec![]);
108 }
109 use Alignment::{Left, Right};
110 format_table(
111 "Total\tAverage\tSelf\tCount\tRel parent\tRel root\tSpan",
112 &table,
113 &vec![Right, Right, Right, Right, Right, Left],
114 )
115}
116
117fn write_proportion(output: &mut String, proportion: Option<f64>) {
118 if let Some(proportion) = proportion {
119 let percentage = 100.0 * proportion;
120 let _ = write!(output, "{percentage:5.1} %");
121 } else {
122 let _ = write!(output, " N/A");
123 }
124}
125
126fn write_timing_tree_node(output: &mut String, node: TimingTreeNode, active_stack: &mut Vec<bool>) {
127 let optional_stats = node.payload().as_ref();
128 let duration = optional_stats.map(|stats| stats.duration);
129 let count = optional_stats.map(|stats| stats.count);
130 write_duration(output, duration);
131 write!(output, "\t").unwrap();
132
133 let avg_duration = duration
134 .zip(count)
135 .map(|(duration, count)| duration.div_f64(count as f64));
136 write_duration(output, avg_duration);
137 write!(output, "\t").unwrap();
138
139 let self_relative = optional_stats.and_then(|stats| stats.self_relative);
140 write_proportion(output, self_relative);
141
142 if let Some(count) = count {
143 write!(output, "\t{count}").unwrap();
144 } else {
145 write!(output, "\tN/A").unwrap();
146 }
147
148 write!(output, "\t").unwrap();
149 let duration_relative_to_parent = optional_stats.and_then(|stats| stats.duration_relative_to_parent);
150 write_proportion(output, duration_relative_to_parent);
151 write!(output, "\t").unwrap();
152 let duration_relative_to_root = optional_stats.and_then(|stats| stats.duration_relative_to_root);
153 write_proportion(output, duration_relative_to_root);
154
155 write!(output, "\t").unwrap();
156 if let Some((&parent_is_active, predecessors)) = active_stack.split_last() {
157 for &is_active in predecessors {
158 if is_active {
159 output.push_str("│ ");
160 } else {
161 output.push_str(" ");
162 }
163 }
164 if parent_is_active {
165 output.push_str("├── ");
166 } else {
167 output.push_str("└── ");
168 }
169 }
170
171 writeln!(output, "{}", node.path().span_name().unwrap_or("<root span>")).unwrap();
172 let num_children = node.count_children();
173 for (child_idx, child) in node.visit_children().enumerate() {
174 let is_last_child = child_idx + 1 == num_children;
178 active_stack.push(!is_last_child);
179 write_timing_tree_node(output, child, &mut *active_stack);
180 active_stack.pop();
181 }
182}
183
184fn write_duration(output: &mut String, duration: Option<Duration>) {
186 if let Some(duration) = duration {
187 let secs = duration.as_secs_f64();
188 if 1e-9 <= secs && secs < 1e-6 {
189 write!(output, "{:5.1} ns", secs / 1e-9).unwrap();
190 } else if 1e-6 <= secs && secs < 1e-3 {
191 write!(output, "{:5.1} μs", secs / 1e-6).unwrap();
192 } else if 1e-3 <= secs && secs < 1.0 {
193 write!(output, "{:5.1} ms", secs / 1e-3).unwrap();
194 } else if 1.0 <= secs && secs < 1e3 || secs == 0.0 {
195 write!(output, "{:5.1} s ", secs).unwrap();
196 } else {
197 write!(output, "{:5.1e} s ", secs).unwrap();
198 }
199 } else {
200 write!(output, " N/A ").unwrap();
201 }
202}
203
204#[derive(Debug, Clone)]
205pub struct AccumulatedTimings {
206 span_stats: HashMap<SpanPath, DirectStats>,
207}
208
209#[derive(Debug, Clone)]
210pub struct AccumulatedStepTimings {
211 pub timings: AccumulatedTimings,
212 pub step_index: u64,
213}
214
215impl AccumulatedTimings {
216 pub fn new() -> Self {
217 Self {
218 span_stats: Default::default(),
219 }
220 }
221
222 pub fn merge_with_others<'a>(&mut self, others: impl Iterator<Item = &'a AccumulatedTimings>) {
223 for other in others {
224 for (path, stats) in &other.span_stats {
225 let current_stats = self.span_stats.entry(path.clone()).or_default();
226 current_stats.combine_mut(&stats);
227 }
228 }
229 }
230}
231
232impl AccumulatedTimings {
233 pub fn create_timing_tree(&self) -> TimingTree {
234 let mut map: HashMap<_, _> = self
242 .span_stats
243 .iter()
244 .map(|(path, stats)| (path.clone(), Some(stats.clone())))
245 .collect();
246
247 let common_ancestor = self
249 .span_stats
250 .keys()
251 .fold(None, |common: Option<SpanPath>, path| match common {
255 None => Some(path.clone()),
256 Some(current_common) => Some(current_common.common_ancestor(path)),
257 });
258
259 if let Some(common_ancestor) = common_ancestor {
260 for mut path in self.span_stats.keys().cloned() {
264 while let Some(parent_path) = path.parent() {
265 if parent_path.depth() < common_ancestor.depth() {
266 break;
267 } else {
268 if !map.contains_key(&parent_path) {
269 map.insert(parent_path.clone(), None);
270 }
271 path = parent_path;
272 }
273 }
274 }
275
276 map.entry(common_ancestor).or_insert(None);
279 }
280
281 let mut path_duration_pairs: Vec<_> = map.into_iter().collect();
282
283 path_duration_pairs.sort_by(|pair1, pair2| pair1.0.span_names().cmp(pair2.0.span_names()));
284 let (paths_depth_first, durations) = path_duration_pairs.into_iter().unzip();
285
286 SpanTree::try_from_depth_first_ordering(paths_depth_first, durations)
287 .expect("Input should always be a valid span tree")
288 .transform_payloads(|node| {
289 node.payload().as_ref().map(|stats| {
290 let duration = stats.duration;
291 let maybe_children_duration = node
292 .visit_children()
293 .map(|child| child.payload().as_ref().map(|stats| stats.duration))
294 .fold(Some(Duration::default()), |acc, maybe_duration| {
296 acc.zip(maybe_duration).map(|(a, b)| a + b)
297 });
298 let self_duration = maybe_children_duration.map(|children_duration| duration - children_duration);
299 let self_relative = self_duration
300 .map(|self_time| self_time.as_secs_f64() / duration.as_secs_f64())
301 .filter(|proportion| proportion.is_finite());
303
304 DerivedStats {
305 duration: stats.duration,
306 count: stats.count,
307 duration_relative_to_parent: node.parent().and_then(|parent_node| {
308 parent_node.payload().as_ref().map(|parent_stats| {
309 let parent_duration = parent_stats.duration;
310 let proportion = duration.as_secs_f64() / parent_duration.as_secs_f64();
311 proportion
312 })
313 }),
314 duration_relative_to_root: node.root().payload().as_ref().map(|root_stats| {
315 let root_duration = root_stats.duration;
316 let proportion = duration.as_secs_f64() / root_duration.as_secs_f64();
317 proportion
318 }),
319 self_duration,
320 self_relative,
321 }
322 })
323 })
324 }
325}
326
327#[derive(Debug, Clone)]
328pub struct AccumulatedTimingSeries {
329 steps: Vec<AccumulatedStepTimings>,
330 intransient_timings: AccumulatedTimings,
333 }
335
336impl AccumulatedTimingSeries {
337 pub fn summarize(&self) -> AccumulatedTimings {
338 let mut summary = self.intransient_timings.clone();
339 summary.merge_with_others(self.steps().iter().map(|step| &step.timings));
340 summary
341 }
342}
343
344impl AccumulatedTimingSeries {
345 pub fn steps(&self) -> &[AccumulatedStepTimings] {
346 &self.steps
347 }
348}
349
350pub fn extract_step_timings<'a>(records: impl IntoIterator<Item = Record>) -> eyre::Result<AccumulatedTimingSeries> {
351 find_and_visit_dynamecs_run_span(records.into_iter())
353}
354
355pub fn extract_timing_summary<'a>(records: impl IntoIterator<Item = Record>) -> eyre::Result<AccumulatedTimings> {
356 extract_step_timings(records).map(|series| series.summarize())
357}
358
359fn find_and_visit_dynamecs_run_span<'a>(
360 mut records: impl Iterator<Item = Record>,
361) -> eyre::Result<AccumulatedTimingSeries> {
362 while let Some(record) = records.next() {
364 if let Some(span) = record.span() {
365 if span.name() == "run" && record.target() == "dynamecs_app" && record.kind() == RecordKind::SpanEnter {
366 return visit_dynamecs_run_span(&record, records);
367 }
368 }
369 }
370
371 Err(eyre!(
372 "Could not find new event for `run` span of dynamecs among records"
373 ))
374}
375
376fn visit_dynamecs_run_span<'a>(
377 run_new_record: &Record,
378 remaining_records: impl Iterator<Item = Record>,
379) -> eyre::Result<AccumulatedTimingSeries> {
380 let run_thread = run_new_record.thread_id();
381 let mut iter = remaining_records;
382 let mut steps = Vec::new();
383
384 let mut intransient_accumulator = TimingAccumulator::new();
385 intransient_accumulator.enter_span(run_new_record.create_span_path()?, *run_new_record.timestamp())?;
386
387 while let Some(record) = iter.next() {
388 if record.thread_id() == run_thread {
389 if let Some(span) = record.span() {
390 match (span.name(), record.target(), record.kind()) {
391 ("step", "dynamecs_app", SpanEnter) => {
392 if let Some(step) = visit_dynamecs_step_span(&record, &mut iter)? {
393 steps.push(step);
395 }
396 }
397 (_, _, SpanEnter) => {
400 intransient_accumulator.enter_span(record.create_span_path()?, *record.timestamp())?
401 }
402 (span_name, record_target, SpanExit) => {
403 intransient_accumulator.exit_span(record.create_span_path()?, *record.timestamp())?;
404 if span_name == "run" && record_target == "dynamecs_app" {
405 break;
406 }
407 }
408 _ => {}
409 }
410 }
411 }
412 }
413
414 Ok(AccumulatedTimingSeries {
415 steps,
416 intransient_timings: AccumulatedTimings {
417 span_stats: intransient_accumulator.collect_completed_statistics(),
418 },
419 })
420}
421
422fn visit_dynamecs_step_span<'a>(
424 step_new_record: &Record,
425 remaining_records: &mut impl Iterator<Item = Record>,
426) -> eyre::Result<Option<AccumulatedStepTimings>> {
427 let step_path = step_new_record.create_span_path()?;
428
429 let mut accumulator = TimingAccumulator::new();
430 accumulator.enter_span(step_path.clone(), step_new_record.timestamp().clone())?;
431
432 let step_index = step_new_record
433 .span()
434 .and_then(|span| span.fields().pointer("/step_index"))
435 .and_then(|value| value.as_u64())
436 .ok_or_else(|| eyre!("step span does not have step_index field"))?;
437
438 while let Some(record) = remaining_records.next() {
439 if record.thread_id() == step_new_record.thread_id() {
440 if let Some(span) = record.span() {
441 match record.kind() {
442 SpanEnter => {
443 accumulator.enter_span(record.create_span_path()?, record.timestamp().clone())?;
444 }
445 SpanExit => {
446 let span_path = record.create_span_path()?;
448 let is_step_span_path = span_path == step_path;
449 accumulator.exit_span(span_path, record.timestamp().clone())?;
450 if span.name() == "step" && record.target() == "dynamecs_app" && is_step_span_path {
451 break;
452 }
453 }
454 _ => {}
455 }
456 }
457 }
458 }
459
460 if accumulator.has_active_spans() {
461 Ok(None)
465 } else {
466 Ok(Some(AccumulatedStepTimings {
467 timings: AccumulatedTimings {
468 span_stats: accumulator.collect_completed_statistics(),
469 },
470 step_index,
471 }))
472 }
473}
474
475#[derive(Debug)]
476struct TimingAccumulator {
477 completed_statistics: HashMap<SpanPath, DirectStats>,
478 enter_timestamps: HashMap<SpanPath, OffsetDateTime>,
479}
480
481impl TimingAccumulator {
482 pub fn new() -> Self {
483 Self {
484 completed_statistics: Default::default(),
485 enter_timestamps: Default::default(),
486 }
487 }
488
489 pub fn enter_span(&mut self, path: SpanPath, timestamp: OffsetDateTime) -> eyre::Result<()> {
490 match self.enter_timestamps.entry(path) {
491 Entry::Vacant(vacancy) => {
492 vacancy.insert(timestamp);
493 Ok(())
494 }
495 Entry::Occupied(old) => Err(eyre!(
496 "tried to create new span {} that is already active\
497 (not closed)",
498 old.key()
499 )),
500 }
501 }
502
503 pub fn exit_span(&mut self, path: SpanPath, timestamp_close: OffsetDateTime) -> eyre::Result<()> {
504 let timestamp_enter = self
505 .enter_timestamps
506 .remove(&path)
507 .ok_or_else(|| eyre!("found close event for span that is not currently active. Span path: {path}"))?;
508 let span_duration: Duration = (timestamp_close - timestamp_enter).unsigned_abs();
509 let accumulated_stats = self.completed_statistics.entry(path).or_default();
510 accumulated_stats.combine_mut(&DirectStats::from_single_duration(span_duration));
511 Ok(())
512 }
513
514 pub fn has_active_spans(&self) -> bool {
515 !self.enter_timestamps.is_empty()
516 }
517
518 pub fn collect_completed_statistics(self) -> HashMap<SpanPath, DirectStats> {
519 self.completed_statistics
520 }
521}