near_async/instrumentation/
reader.rs

1use std::collections::HashMap;
2use std::sync::atomic::Ordering;
3
4use serde::Serialize;
5
6use crate::instrumentation::data::{InstrumentedThread, MessageTypeRegistry};
7use crate::instrumentation::instrumented_window::{
8    AggregatedMessageTypeStats, InstrumentedEvent, InstrumentedEventBuffer, InstrumentedWindow,
9    InstrumentedWindowSummary,
10};
11use crate::instrumentation::{NUM_WINDOWS, WINDOW_SIZE_NS};
12
13#[derive(Serialize, Debug)]
14pub struct InstrumentedThreadsView {
15    pub threads: Vec<InstrumentedThreadView>,
16    pub current_time_unix_ms: u64,
17    pub current_time_relative_ms: u64,
18}
19
20#[derive(Serialize, Debug)]
21pub struct InstrumentedThreadView {
22    pub thread_name: String,
23    pub active_time_ns: u64,
24    pub message_types: Vec<String>,
25    pub windows: Vec<InstrumentedWindowView>,
26    pub active_event: Option<InstrumentedActiveEventView>,
27    pub queue: HashMap<String, u64>,
28}
29
30#[derive(Serialize, Debug)]
31pub struct InstrumentedActiveEventView {
32    pub message_type: u32,
33    /// Nanoseconds active, as of current_time_ms in InstrumentedThreadsView.
34    pub active_for_ns: u64,
35}
36
37#[derive(Serialize, Debug)]
38pub struct InstrumentedWindowView {
39    pub start_time_ms: u64,
40    pub end_time_ms: u64,
41    pub events: Vec<InstrumentedEventView>,
42    pub events_overfilled: bool,
43    pub summary: InstrumentedWindowSummaryView,
44    pub dequeue_summary: InstrumentedWindowSummaryView,
45}
46
47impl InstrumentedWindowView {
48    /// Sometimes one or more recent windows have not yet been created because an event has been
49    /// blocking the thread for so long. In that case, fill in the missing windows with maybe an
50    /// active event.
51    pub fn new_filler(
52        start_time_ns: u64,
53        end_time_ns: u64,
54        active_event: Option<&InstrumentedActiveEventView>,
55    ) -> Self {
56        let mut view = InstrumentedWindowView {
57            start_time_ms: start_time_ns / 1_000_000,
58            end_time_ms: end_time_ns / 1_000_000,
59            events: Vec::new(),
60            events_overfilled: false,
61            summary: InstrumentedWindowSummaryView { message_stats_by_type: Vec::new() },
62            dequeue_summary: InstrumentedWindowSummaryView { message_stats_by_type: Vec::new() },
63        };
64        if let Some(active_event) = active_event {
65            view.summary.message_stats_by_type.push(MessageStatsForTypeView {
66                message_type: active_event.message_type as i32,
67                count: 1,
68                total_time_ns: end_time_ns - start_time_ns,
69            });
70        }
71        view
72    }
73}
74
75#[derive(Serialize, Debug)]
76pub struct InstrumentedEventView {
77    #[serde(rename = "m")]
78    pub message_type: u32,
79    #[serde(rename = "s")]
80    pub is_start: bool,
81    /// Relative to the beginning of the window.
82    #[serde(rename = "t")]
83    pub relative_timestamp_ns: u64,
84}
85
86#[derive(Serialize, Debug)]
87pub struct InstrumentedWindowSummaryView {
88    pub message_stats_by_type: Vec<MessageStatsForTypeView>,
89}
90
91#[derive(Serialize, Debug)]
92pub struct MessageStatsForTypeView {
93    /// Index into InstrumentedThreadView.message_types.
94    /// May be -1 for unknown message types (if the type registry temporarily overflowed).
95    #[serde(rename = "m")]
96    pub message_type: i32,
97    #[serde(rename = "c")]
98    pub count: usize,
99    #[serde(rename = "t")]
100    pub total_time_ns: u64,
101}
102
/// Splits an encoded event word into `(message_type, is_start)`.
///
/// The low 32 bits hold the message type id and bit 32 is the start flag. Any higher bits
/// are ignored.
fn decode_message_event(encoded: u64) -> (u32, bool) {
    const START_FLAG_SHIFT: u32 = 32;
    let start_flag_set = ((encoded >> START_FLAG_SHIFT) & 1) == 1;
    // A truncating cast keeps exactly the low 32 bits.
    let type_id = encoded as u32;
    (type_id, start_flag_set)
}
108
109impl InstrumentedEvent {
110    pub fn to_view(&self) -> InstrumentedEventView {
111        let encoded = self.event.load(Ordering::Relaxed);
112        let (message_type, is_start) = decode_message_event(encoded);
113        InstrumentedEventView {
114            message_type,
115            is_start,
116            relative_timestamp_ns: self.relative_timestamp_ns.load(Ordering::Acquire),
117        }
118    }
119}
120
121impl InstrumentedEventBuffer {
122    /// Returns the events and whether the buffer is overfilled.
123    pub fn to_view(&self) -> (Vec<InstrumentedEventView>, bool) {
124        let len = self.len.load(Ordering::Acquire);
125        let readable_len = len.min(self.buffer.len());
126        (
127            (0..readable_len).map(|i| self.buffer[i].to_view()).collect::<Vec<_>>(),
128            len != readable_len,
129        )
130    }
131}
132
133impl InstrumentedWindowSummary {
134    pub fn to_view(&self, window_index: usize) -> InstrumentedWindowSummaryView {
135        let mut message_stats_by_type = Vec::new();
136        for (i, stats) in self.time_by_message_type.iter().enumerate() {
137            if let Some(view) = stats.to_view(window_index, i as i32) {
138                message_stats_by_type.push(view);
139            }
140        }
141        if let Some(unknown) = self.unknown_total.to_view(window_index, -1) {
142            message_stats_by_type.push(unknown);
143        }
144        InstrumentedWindowSummaryView { message_stats_by_type }
145    }
146}
147
148impl AggregatedMessageTypeStats {
149    pub fn to_view(
150        &self,
151        window_index: usize,
152        message_type_id: i32,
153    ) -> Option<MessageStatsForTypeView> {
154        if self.window_index.load(Ordering::Acquire) == window_index {
155            let count = self.count.load(Ordering::Relaxed);
156            if count > 0 {
157                let total_time_ns = self.total_time_ns.load(Ordering::Relaxed);
158                return Some(MessageStatsForTypeView {
159                    message_type: message_type_id,
160                    count,
161                    total_time_ns,
162                });
163            }
164        }
165        None
166    }
167}
168
169impl InstrumentedWindow {
170    pub fn to_view(&self, end_time: u64) -> InstrumentedWindowView {
171        let (events, events_overfilled) = self.events.to_view();
172        InstrumentedWindowView {
173            start_time_ms: self.start_time_ns / 1_000_000,
174            end_time_ms: end_time / 1_000_000,
175            events,
176            events_overfilled,
177            summary: self.summary.to_view(self.index),
178            dequeue_summary: self.dequeue_summary.to_view(self.index),
179        }
180    }
181}
182
183impl MessageTypeRegistry {
184    pub fn to_vec(&self) -> Vec<String> {
185        self.types.read().clone()
186    }
187}
188
189impl InstrumentedThread {
190    pub fn to_view(&self, current_time_ns: u64) -> InstrumentedThreadView {
191        let active_event = self.active_event.load(Ordering::Acquire);
192        let active_event = if active_event != 0 {
193            let start_time = self.active_event_start_ns.load(Ordering::Relaxed);
194            let (message_type, _) = decode_message_event(active_event);
195            Some(InstrumentedActiveEventView {
196                message_type,
197                active_for_ns: current_time_ns.saturating_sub(start_time),
198            })
199        } else {
200            None
201        };
202        let current_window_index = self.current_window_index.load(Ordering::Acquire);
203        let mut windows = Vec::new();
204        let mut prev_window_start_time = current_time_ns;
205
206        // Fill in missing windows if the current time is too far ahead of the last recorded window.
207        let last_recorded_window_start_time =
208            self.windows[current_window_index % self.windows.len()].read().start_time_ns;
209        while prev_window_start_time > last_recorded_window_start_time + WINDOW_SIZE_NS {
210            let modulo =
211                (prev_window_start_time - last_recorded_window_start_time) % WINDOW_SIZE_NS;
212            let filler_start_time = if modulo == 0 {
213                prev_window_start_time - WINDOW_SIZE_NS
214            } else {
215                prev_window_start_time - modulo
216            };
217            windows.push(InstrumentedWindowView::new_filler(
218                filler_start_time,
219                prev_window_start_time,
220                active_event.as_ref(),
221            ));
222            prev_window_start_time = filler_start_time;
223        }
224        let oldest_time_to_return =
225            current_time_ns.saturating_sub(NUM_WINDOWS as u64 * WINDOW_SIZE_NS);
226
227        for i in 0..NUM_WINDOWS.min(current_window_index + 1) {
228            let window_index = current_window_index - i;
229            let window = &self.windows[window_index % self.windows.len()];
230            let read = window.read();
231            if read.start_time_ns < oldest_time_to_return {
232                break;
233            }
234            windows.push(read.to_view(prev_window_start_time));
235            prev_window_start_time = read.start_time_ns;
236        }
237        InstrumentedThreadView {
238            thread_name: self.thread_name.clone(),
239            active_time_ns: (current_time_ns.saturating_sub(self.started_time_ns)) / 1_000_000,
240            message_types: self.message_type_registry.to_vec(),
241            windows,
242            active_event,
243            queue: self.queue.get_pending_events(),
244        }
245    }
246}