dynamecs_analyze/
timing.rs

1use crate::{Record, RecordKind, SpanPath, SpanTree, SpanTreeNode};
2use eyre::eyre;
3use std::cmp::max;
4use std::collections::hash_map::Entry;
5use std::collections::HashMap;
6use std::fmt::Write;
7use std::iter;
8use std::time::Duration;
9use time::OffsetDateTime;
10use RecordKind::{SpanEnter, SpanExit};
11
/// A span tree in which every node optionally carries [`DerivedStats`].
///
/// A node's payload is `None` when no statistics are available for that span.
pub type TimingTree = SpanTree<Option<DerivedStats>>;
/// A node of a [`TimingTree`].
type TimingTreeNode<'a> = SpanTreeNode<'a, Option<DerivedStats>>;
14
/// Raw timing statistics for a single span, as measured directly from logs.
#[derive(Debug, Clone, Default)]
pub struct DirectStats {
    /// Sum of the durations of all completed occurrences of the span.
    pub duration: Duration,
    /// How many times the span was entered and subsequently *exited*.
    pub count: u64,
}

impl DirectStats {
    /// Creates statistics describing exactly one completed occurrence of a span
    /// that lasted `duration`.
    pub fn from_single_duration(duration: Duration) -> Self {
        Self { count: 1, duration }
    }

    /// Adds the duration and the count of `other` onto `self`.
    pub fn combine_mut(&mut self, other: &DirectStats) {
        let DirectStats { duration, count } = other;
        self.duration += *duration;
        self.count += *count;
    }
}
34
/// Statistics for a span in a [`TimingTree`], combining the directly measured values
/// with quantities derived from the span's position in the tree.
///
/// The optional fields are `None` whenever the corresponding quantity could not be
/// determined, for example because a related node carries no statistics.
#[derive(Debug, Clone)]
pub struct DerivedStats {
    /// Total accumulated duration for the span.
    pub duration: Duration,
    /// Number of completed occurrences of the span.
    pub count: u64,
    /// Ratio of this span's duration to the duration of its parent node, if available.
    pub duration_relative_to_parent: Option<f64>,
    /// Ratio of this span's duration to the duration of the root node, if available.
    pub duration_relative_to_root: Option<f64>,
    /// This span's duration minus the summed durations of its children, if available.
    pub self_duration: Option<Duration>,
    /// Ratio of `self_duration` to `duration`, if available.
    pub self_relative: Option<f64>,
}
44
/// Widens `column_widths` so that every tab-separated cell of `line` fits its column.
///
/// Columns that `column_widths` does not know about yet are appended. Cell widths are
/// measured in characters rather than bytes, because that is the unit `write!` uses
/// when padding a cell to a given width. Measuring bytes would over-size every column
/// containing multi-byte characters such as `μ` or the box-drawing characters used for
/// the span tree.
fn update_column_widths_for_line(column_widths: &mut Vec<usize>, line: &str) {
    for (column_index, cell) in line.split('\t').enumerate() {
        let cell_width = cell.chars().count();
        match column_widths.get_mut(column_index) {
            // Known column: keep the widest cell seen so far
            Some(width) => *width = max(*width, cell_width),
            // First time we see this column
            None => column_widths.push(cell_width),
        }
    }
}
56
/// Appends one table row to `output`, terminated by a newline.
///
/// `line` holds the tab-separated cells of the row. Each cell is padded to the matching
/// entry of `column_widths` and followed by two spaces of column separation. Cells
/// without a matching entry in `column_widths` are not written. Columns for which
/// `alignments` has no entry are left-aligned.
fn write_table_line(output: &mut String, line: &str, column_widths: &[usize], alignments: &[Alignment]) {
    const PADDING: usize = 2;
    debug_assert_eq!(line.lines().count(), 1, "line string must consist of a single line");

    let cells = line.split('\t');
    // Fall back to left alignment once the explicitly provided alignments run out
    let alignments = alignments
        .iter()
        .copied()
        .chain(iter::repeat(Alignment::Left));

    for ((cell, &width), alignment) in cells.zip(column_widths).zip(alignments) {
        let result = match alignment {
            Alignment::Left => write!(output, "{cell:<width$}"),
            Alignment::Right => write!(output, "{cell:>width$}"),
        };
        result.unwrap();
        output.extend(iter::repeat(' ').take(PADDING));
    }
    output.push('\n');
}

/// Horizontal alignment of a cell within its table column.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum Alignment {
    /// Pad on the right, so that the content starts at the left edge of the column.
    Left,
    /// Pad on the left, so that the content ends at the right edge of the column.
    Right,
}
78
79fn format_table(header: &str, table: &str, alignments: &[Alignment]) -> String {
80    debug_assert_eq!(header.lines().count(), 1, "Header must only have a single line");
81    let mut column_widths = vec![];
82    update_column_widths_for_line(&mut column_widths, header);
83    for line in table.lines() {
84        update_column_widths_for_line(&mut column_widths, line);
85    }
86
87    let mut output = String::new();
88    // Use default alignment for table headers, apply alignments only to cells
89    write_table_line(&mut output, header, &column_widths, &[]);
90    let header_len = output.len();
91    output.push_str(&"═".repeat(header_len));
92    writeln!(output).unwrap();
93
94    for line in table.lines() {
95        write_table_line(&mut output, line, &column_widths, alignments);
96    }
97
98    output.push_str(&"═".repeat(header_len));
99    writeln!(output).unwrap();
100
101    output
102}
103
104pub fn format_timing_tree(tree: &TimingTree) -> String {
105    let mut table = String::new();
106    if let Some(root) = tree.root() {
107        write_timing_tree_node(&mut table, root, &mut vec![]);
108    }
109    use Alignment::{Left, Right};
110    format_table(
111        "Total\tAverage\tSelf\tCount\tRel parent\tRel root\tSpan",
112        &table,
113        &vec![Right, Right, Right, Right, Right, Left],
114    )
115}
116
/// Appends `proportion` to `output` as a percentage with one decimal,
/// or a right-aligned `N/A` if there is no proportion.
fn write_proportion(output: &mut String, proportion: Option<f64>) {
    match proportion.map(|fraction| 100.0 * fraction) {
        Some(percentage) => {
            let _ = write!(output, "{percentage:5.1} %");
        }
        None => output.push_str("    N/A"),
    }
}
125
126fn write_timing_tree_node(output: &mut String, node: TimingTreeNode, active_stack: &mut Vec<bool>) {
127    let optional_stats = node.payload().as_ref();
128    let duration = optional_stats.map(|stats| stats.duration);
129    let count = optional_stats.map(|stats| stats.count);
130    write_duration(output, duration);
131    write!(output, "\t").unwrap();
132
133    let avg_duration = duration
134        .zip(count)
135        .map(|(duration, count)| duration.div_f64(count as f64));
136    write_duration(output, avg_duration);
137    write!(output, "\t").unwrap();
138
139    let self_relative = optional_stats.and_then(|stats| stats.self_relative);
140    write_proportion(output, self_relative);
141
142    if let Some(count) = count {
143        write!(output, "\t{count}").unwrap();
144    } else {
145        write!(output, "\tN/A").unwrap();
146    }
147
148    write!(output, "\t").unwrap();
149    let duration_relative_to_parent = optional_stats.and_then(|stats| stats.duration_relative_to_parent);
150    write_proportion(output, duration_relative_to_parent);
151    write!(output, "\t").unwrap();
152    let duration_relative_to_root = optional_stats.and_then(|stats| stats.duration_relative_to_root);
153    write_proportion(output, duration_relative_to_root);
154
155    write!(output, "\t").unwrap();
156    if let Some((&parent_is_active, predecessors)) = active_stack.split_last() {
157        for &is_active in predecessors {
158            if is_active {
159                output.push_str("│   ");
160            } else {
161                output.push_str("    ");
162            }
163        }
164        if parent_is_active {
165            output.push_str("├── ");
166        } else {
167            output.push_str("└── ");
168        }
169    }
170
171    writeln!(output, "{}", node.path().span_name().unwrap_or("<root span>")).unwrap();
172    let num_children = node.count_children();
173    for (child_idx, child) in node.visit_children().enumerate() {
174        // We say that an ancestor is "active" if it's not yet processing its last child.
175        // This criterion lets us avoid drawing excessive numbers of vertical lines,
176        // which make for a visually confusing picture.
177        let is_last_child = child_idx + 1 == num_children;
178        active_stack.push(!is_last_child);
179        write_timing_tree_node(output, child, &mut *active_stack);
180        active_stack.pop();
181    }
182}
183
// TODO: Unit tests for this one?
/// Appends a compact representation of `duration` to `output`.
///
/// The unit (ns, μs, ms or s) is chosen from the magnitude of the duration. Durations
/// of a thousand seconds or more are written in scientific notation, and a missing
/// duration is written as `N/A`.
fn write_duration(output: &mut String, duration: Option<Duration>) {
    let secs = match duration {
        Some(duration) => duration.as_secs_f64(),
        None => {
            write!(output, "   N/A   ").unwrap();
            return;
        }
    };

    let result = if (1e-9..1e-6).contains(&secs) {
        write!(output, "{:5.1} ns", secs / 1e-9)
    } else if (1e-6..1e-3).contains(&secs) {
        write!(output, "{:5.1} μs", secs / 1e-6)
    } else if (1e-3..1.0).contains(&secs) {
        write!(output, "{:5.1} ms", secs / 1e-3)
    } else if secs == 0.0 || (1.0..1e3).contains(&secs) {
        write!(output, "{:5.1} s ", secs)
    } else {
        // Everything else is written in scientific notation
        write!(output, "{:5.1e} s ", secs)
    };
    result.unwrap();
}
203
204#[derive(Debug, Clone)]
205pub struct AccumulatedTimings {
206    span_stats: HashMap<SpanPath, DirectStats>,
207}
208
209#[derive(Debug, Clone)]
210pub struct AccumulatedStepTimings {
211    pub timings: AccumulatedTimings,
212    pub step_index: u64,
213}
214
215impl AccumulatedTimings {
216    pub fn new() -> Self {
217        Self {
218            span_stats: Default::default(),
219        }
220    }
221
222    pub fn merge_with_others<'a>(&mut self, others: impl Iterator<Item = &'a AccumulatedTimings>) {
223        for other in others {
224            for (path, stats) in &other.span_stats {
225                let current_stats = self.span_stats.entry(path.clone()).or_default();
226                current_stats.combine_mut(&stats);
227            }
228        }
229    }
230}
231
232impl AccumulatedTimings {
233    pub fn create_timing_tree(&self) -> TimingTree {
234        // The path entries present in the map might not form a valid span tree.
235        // Therefore, we have to ensure that:
236        //  - there's a root node
237        //  - that every node except the root has its parent also present in the tree
238        //  - there are no duplicate nodes
239        //  - the paths are sorted depth-first
240
241        let mut map: HashMap<_, _> = self
242            .span_stats
243            .iter()
244            .map(|(path, stats)| (path.clone(), Some(stats.clone())))
245            .collect();
246
247        // The root node is the common ancestor of all the paths
248        let common_ancestor = self
249            .span_stats
250            .keys()
251            // TODO: This can be done much more efficiently with some manual labor
252            // (i.e. start with the first element and keep knocking off names
253            // so that the path is an ancestor of *all* paths)
254            .fold(None, |common: Option<SpanPath>, path| match common {
255                None => Some(path.clone()),
256                Some(current_common) => Some(current_common.common_ancestor(path)),
257            });
258
259        if let Some(common_ancestor) = common_ancestor {
260            // Insert all "intermediate nodes". For example, if the hash map contains
261            // a>b>c, then try to insert a>b and a, provided they don't "extend past"
262            // the common ancestor
263            for mut path in self.span_stats.keys().cloned() {
264                while let Some(parent_path) = path.parent() {
265                    if parent_path.depth() < common_ancestor.depth() {
266                        break;
267                    } else {
268                        if !map.contains_key(&parent_path) {
269                            map.insert(parent_path.clone(), None);
270                        }
271                        path = parent_path;
272                    }
273                }
274            }
275
276            // The paths may form a forest, not a tree. We therefore insert the common
277            // ancestor, which will function as the root of the tree.
278            map.entry(common_ancestor).or_insert(None);
279        }
280
281        let mut path_duration_pairs: Vec<_> = map.into_iter().collect();
282
283        path_duration_pairs.sort_by(|pair1, pair2| pair1.0.span_names().cmp(pair2.0.span_names()));
284        let (paths_depth_first, durations) = path_duration_pairs.into_iter().unzip();
285
286        SpanTree::try_from_depth_first_ordering(paths_depth_first, durations)
287            .expect("Input should always be a valid span tree")
288            .transform_payloads(|node| {
289                node.payload().as_ref().map(|stats| {
290                    let duration = stats.duration;
291                    let maybe_children_duration = node
292                        .visit_children()
293                        .map(|child| child.payload().as_ref().map(|stats| stats.duration))
294                        // We only return a "children time" if all children actually have a duration
295                        .fold(Some(Duration::default()), |acc, maybe_duration| {
296                            acc.zip(maybe_duration).map(|(a, b)| a + b)
297                        });
298                    let self_duration = maybe_children_duration.map(|children_duration| duration - children_duration);
299                    let self_relative = self_duration
300                        .map(|self_time| self_time.as_secs_f64() / duration.as_secs_f64())
301                        // If duration is zero, we get a NaN. Return None instead in this case
302                        .filter(|proportion| proportion.is_finite());
303
304                    DerivedStats {
305                        duration: stats.duration,
306                        count: stats.count,
307                        duration_relative_to_parent: node.parent().and_then(|parent_node| {
308                            parent_node.payload().as_ref().map(|parent_stats| {
309                                let parent_duration = parent_stats.duration;
310                                let proportion = duration.as_secs_f64() / parent_duration.as_secs_f64();
311                                proportion
312                            })
313                        }),
314                        duration_relative_to_root: node.root().payload().as_ref().map(|root_stats| {
315                            let root_duration = root_stats.duration;
316                            let proportion = duration.as_secs_f64() / root_duration.as_secs_f64();
317                            proportion
318                        }),
319                        self_duration,
320                        self_relative,
321                    }
322                })
323            })
324    }
325}
326
327#[derive(Debug, Clone)]
328pub struct AccumulatedTimingSeries {
329    steps: Vec<AccumulatedStepTimings>,
330    /// Timings for any spans that are not part of the "step" span (could be related to setup)
331    /// or similar.
332    intransient_timings: AccumulatedTimings,
333    // TODO: Timing from other sources outside of steps?
334}
335
336impl AccumulatedTimingSeries {
337    pub fn summarize(&self) -> AccumulatedTimings {
338        let mut summary = self.intransient_timings.clone();
339        summary.merge_with_others(self.steps().iter().map(|step| &step.timings));
340        summary
341    }
342}
343
344impl AccumulatedTimingSeries {
345    pub fn steps(&self) -> &[AccumulatedStepTimings] {
346        &self.steps
347    }
348}
349
350pub fn extract_step_timings<'a>(records: impl IntoIterator<Item = Record>) -> eyre::Result<AccumulatedTimingSeries> {
351    // TODO: Collect statistics from spans outside run as well
352    find_and_visit_dynamecs_run_span(records.into_iter())
353}
354
355pub fn extract_timing_summary<'a>(records: impl IntoIterator<Item = Record>) -> eyre::Result<AccumulatedTimings> {
356    extract_step_timings(records).map(|series| series.summarize())
357}
358
359fn find_and_visit_dynamecs_run_span<'a>(
360    mut records: impl Iterator<Item = Record>,
361) -> eyre::Result<AccumulatedTimingSeries> {
362    // First try to find the `run` span in the records
363    while let Some(record) = records.next() {
364        if let Some(span) = record.span() {
365            if span.name() == "run" && record.target() == "dynamecs_app" && record.kind() == RecordKind::SpanEnter {
366                return visit_dynamecs_run_span(&record, records);
367            }
368        }
369    }
370
371    Err(eyre!(
372        "Could not find new event for `run` span of dynamecs among records"
373    ))
374}
375
376fn visit_dynamecs_run_span<'a>(
377    run_new_record: &Record,
378    remaining_records: impl Iterator<Item = Record>,
379) -> eyre::Result<AccumulatedTimingSeries> {
380    let run_thread = run_new_record.thread_id();
381    let mut iter = remaining_records;
382    let mut steps = Vec::new();
383
384    let mut intransient_accumulator = TimingAccumulator::new();
385    intransient_accumulator.enter_span(run_new_record.create_span_path()?, *run_new_record.timestamp())?;
386
387    while let Some(record) = iter.next() {
388        if record.thread_id() == run_thread {
389            if let Some(span) = record.span() {
390                match (span.name(), record.target(), record.kind()) {
391                    ("step", "dynamecs_app", SpanEnter) => {
392                        if let Some(step) = visit_dynamecs_step_span(&record, &mut iter)? {
393                            // Only collect complete time steps
394                            steps.push(step);
395                        }
396                    }
397                    // Accumulate "intransient timings", i.e. timings for things that are
398                    // not inside of a step
399                    (_, _, SpanEnter) => {
400                        intransient_accumulator.enter_span(record.create_span_path()?, *record.timestamp())?
401                    }
402                    (span_name, record_target, SpanExit) => {
403                        intransient_accumulator.exit_span(record.create_span_path()?, *record.timestamp())?;
404                        if span_name == "run" && record_target == "dynamecs_app" {
405                            break;
406                        }
407                    }
408                    _ => {}
409                }
410            }
411        }
412    }
413
414    Ok(AccumulatedTimingSeries {
415        steps,
416        intransient_timings: AccumulatedTimings {
417            span_stats: intransient_accumulator.collect_completed_statistics(),
418        },
419    })
420}
421
422/// Returns accumulated timings for the next *complete* step in the records.
423fn visit_dynamecs_step_span<'a>(
424    step_new_record: &Record,
425    remaining_records: &mut impl Iterator<Item = Record>,
426) -> eyre::Result<Option<AccumulatedStepTimings>> {
427    let step_path = step_new_record.create_span_path()?;
428
429    let mut accumulator = TimingAccumulator::new();
430    accumulator.enter_span(step_path.clone(), step_new_record.timestamp().clone())?;
431
432    let step_index = step_new_record
433        .span()
434        .and_then(|span| span.fields().pointer("/step_index"))
435        .and_then(|value| value.as_u64())
436        .ok_or_else(|| eyre!("step span does not have step_index field"))?;
437
438    while let Some(record) = remaining_records.next() {
439        if record.thread_id() == step_new_record.thread_id() {
440            if let Some(span) = record.span() {
441                match record.kind() {
442                    SpanEnter => {
443                        accumulator.enter_span(record.create_span_path()?, record.timestamp().clone())?;
444                    }
445                    SpanExit => {
446                        // TODO: use a stack to verify that open/close events are consistent?
447                        let span_path = record.create_span_path()?;
448                        let is_step_span_path = span_path == step_path;
449                        accumulator.exit_span(span_path, record.timestamp().clone())?;
450                        if span.name() == "step" && record.target() == "dynamecs_app" && is_step_span_path {
451                            break;
452                        }
453                    }
454                    _ => {}
455                }
456            }
457        }
458    }
459
460    if accumulator.has_active_spans() {
461        // If there are active spans, then the step is not yet complete,
462        // so we do not want to include it in accumulation
463        // (would lead to inconsistent time between parent and children)
464        Ok(None)
465    } else {
466        Ok(Some(AccumulatedStepTimings {
467            timings: AccumulatedTimings {
468                span_stats: accumulator.collect_completed_statistics(),
469            },
470            step_index,
471        }))
472    }
473}
474
475#[derive(Debug)]
476struct TimingAccumulator {
477    completed_statistics: HashMap<SpanPath, DirectStats>,
478    enter_timestamps: HashMap<SpanPath, OffsetDateTime>,
479}
480
481impl TimingAccumulator {
482    pub fn new() -> Self {
483        Self {
484            completed_statistics: Default::default(),
485            enter_timestamps: Default::default(),
486        }
487    }
488
489    pub fn enter_span(&mut self, path: SpanPath, timestamp: OffsetDateTime) -> eyre::Result<()> {
490        match self.enter_timestamps.entry(path) {
491            Entry::Vacant(vacancy) => {
492                vacancy.insert(timestamp);
493                Ok(())
494            }
495            Entry::Occupied(old) => Err(eyre!(
496                "tried to create new span {} that is already active\
497                                               (not closed)",
498                old.key()
499            )),
500        }
501    }
502
503    pub fn exit_span(&mut self, path: SpanPath, timestamp_close: OffsetDateTime) -> eyre::Result<()> {
504        let timestamp_enter = self
505            .enter_timestamps
506            .remove(&path)
507            .ok_or_else(|| eyre!("found close event for span that is not currently active. Span path: {path}"))?;
508        let span_duration: Duration = (timestamp_close - timestamp_enter).unsigned_abs();
509        let accumulated_stats = self.completed_statistics.entry(path).or_default();
510        accumulated_stats.combine_mut(&DirectStats::from_single_duration(span_duration));
511        Ok(())
512    }
513
514    pub fn has_active_spans(&self) -> bool {
515        !self.enter_timestamps.is_empty()
516    }
517
518    pub fn collect_completed_statistics(self) -> HashMap<SpanPath, DirectStats> {
519        self.completed_statistics
520    }
521}