near_async/instrumentation/
reader.rs1use std::collections::HashMap;
2use std::sync::atomic::Ordering;
3
4use serde::Serialize;
5
6use crate::instrumentation::data::{InstrumentedThread, MessageTypeRegistry};
7use crate::instrumentation::instrumented_window::{
8 AggregatedMessageTypeStats, InstrumentedEvent, InstrumentedEventBuffer, InstrumentedWindow,
9 InstrumentedWindowSummary,
10};
11use crate::instrumentation::{NUM_WINDOWS, WINDOW_SIZE_NS};
12
13#[derive(Serialize, Debug)]
14pub struct InstrumentedThreadsView {
15 pub threads: Vec<InstrumentedThreadView>,
16 pub current_time_unix_ms: u64,
17 pub current_time_relative_ms: u64,
18}
19
20#[derive(Serialize, Debug)]
21pub struct InstrumentedThreadView {
22 pub thread_name: String,
23 pub active_time_ns: u64,
24 pub message_types: Vec<String>,
25 pub windows: Vec<InstrumentedWindowView>,
26 pub active_event: Option<InstrumentedActiveEventView>,
27 pub queue: HashMap<String, u64>,
28}
29
30#[derive(Serialize, Debug)]
31pub struct InstrumentedActiveEventView {
32 pub message_type: u32,
33 pub active_for_ns: u64,
35}
36
37#[derive(Serialize, Debug)]
38pub struct InstrumentedWindowView {
39 pub start_time_ms: u64,
40 pub end_time_ms: u64,
41 pub events: Vec<InstrumentedEventView>,
42 pub events_overfilled: bool,
43 pub summary: InstrumentedWindowSummaryView,
44 pub dequeue_summary: InstrumentedWindowSummaryView,
45}
46
47impl InstrumentedWindowView {
48 pub fn new_filler(
52 start_time_ns: u64,
53 end_time_ns: u64,
54 active_event: Option<&InstrumentedActiveEventView>,
55 ) -> Self {
56 let mut view = InstrumentedWindowView {
57 start_time_ms: start_time_ns / 1_000_000,
58 end_time_ms: end_time_ns / 1_000_000,
59 events: Vec::new(),
60 events_overfilled: false,
61 summary: InstrumentedWindowSummaryView { message_stats_by_type: Vec::new() },
62 dequeue_summary: InstrumentedWindowSummaryView { message_stats_by_type: Vec::new() },
63 };
64 if let Some(active_event) = active_event {
65 view.summary.message_stats_by_type.push(MessageStatsForTypeView {
66 message_type: active_event.message_type as i32,
67 count: 1,
68 total_time_ns: end_time_ns - start_time_ns,
69 });
70 }
71 view
72 }
73}
74
75#[derive(Serialize, Debug)]
76pub struct InstrumentedEventView {
77 #[serde(rename = "m")]
78 pub message_type: u32,
79 #[serde(rename = "s")]
80 pub is_start: bool,
81 #[serde(rename = "t")]
83 pub relative_timestamp_ns: u64,
84}
85
86#[derive(Serialize, Debug)]
87pub struct InstrumentedWindowSummaryView {
88 pub message_stats_by_type: Vec<MessageStatsForTypeView>,
89}
90
91#[derive(Serialize, Debug)]
92pub struct MessageStatsForTypeView {
93 #[serde(rename = "m")]
96 pub message_type: i32,
97 #[serde(rename = "c")]
98 pub count: usize,
99 #[serde(rename = "t")]
100 pub total_time_ns: u64,
101}
102
103fn decode_message_event(encoded: u64) -> (u32, bool) {
104 let message_type = (encoded & 0xFFFFFFFF) as u32;
105 let is_start = (encoded & (1 << 32)) != 0;
106 (message_type, is_start)
107}
108
109impl InstrumentedEvent {
110 pub fn to_view(&self) -> InstrumentedEventView {
111 let encoded = self.event.load(Ordering::Relaxed);
112 let (message_type, is_start) = decode_message_event(encoded);
113 InstrumentedEventView {
114 message_type,
115 is_start,
116 relative_timestamp_ns: self.relative_timestamp_ns.load(Ordering::Acquire),
117 }
118 }
119}
120
121impl InstrumentedEventBuffer {
122 pub fn to_view(&self) -> (Vec<InstrumentedEventView>, bool) {
124 let len = self.len.load(Ordering::Acquire);
125 let readable_len = len.min(self.buffer.len());
126 (
127 (0..readable_len).map(|i| self.buffer[i].to_view()).collect::<Vec<_>>(),
128 len != readable_len,
129 )
130 }
131}
132
133impl InstrumentedWindowSummary {
134 pub fn to_view(&self, window_index: usize) -> InstrumentedWindowSummaryView {
135 let mut message_stats_by_type = Vec::new();
136 for (i, stats) in self.time_by_message_type.iter().enumerate() {
137 if let Some(view) = stats.to_view(window_index, i as i32) {
138 message_stats_by_type.push(view);
139 }
140 }
141 if let Some(unknown) = self.unknown_total.to_view(window_index, -1) {
142 message_stats_by_type.push(unknown);
143 }
144 InstrumentedWindowSummaryView { message_stats_by_type }
145 }
146}
147
148impl AggregatedMessageTypeStats {
149 pub fn to_view(
150 &self,
151 window_index: usize,
152 message_type_id: i32,
153 ) -> Option<MessageStatsForTypeView> {
154 if self.window_index.load(Ordering::Acquire) == window_index {
155 let count = self.count.load(Ordering::Relaxed);
156 if count > 0 {
157 let total_time_ns = self.total_time_ns.load(Ordering::Relaxed);
158 return Some(MessageStatsForTypeView {
159 message_type: message_type_id,
160 count,
161 total_time_ns,
162 });
163 }
164 }
165 None
166 }
167}
168
169impl InstrumentedWindow {
170 pub fn to_view(&self, end_time: u64) -> InstrumentedWindowView {
171 let (events, events_overfilled) = self.events.to_view();
172 InstrumentedWindowView {
173 start_time_ms: self.start_time_ns / 1_000_000,
174 end_time_ms: end_time / 1_000_000,
175 events,
176 events_overfilled,
177 summary: self.summary.to_view(self.index),
178 dequeue_summary: self.dequeue_summary.to_view(self.index),
179 }
180 }
181}
182
183impl MessageTypeRegistry {
184 pub fn to_vec(&self) -> Vec<String> {
185 self.types.read().clone()
186 }
187}
188
189impl InstrumentedThread {
190 pub fn to_view(&self, current_time_ns: u64) -> InstrumentedThreadView {
191 let active_event = self.active_event.load(Ordering::Acquire);
192 let active_event = if active_event != 0 {
193 let start_time = self.active_event_start_ns.load(Ordering::Relaxed);
194 let (message_type, _) = decode_message_event(active_event);
195 Some(InstrumentedActiveEventView {
196 message_type,
197 active_for_ns: current_time_ns.saturating_sub(start_time),
198 })
199 } else {
200 None
201 };
202 let current_window_index = self.current_window_index.load(Ordering::Acquire);
203 let mut windows = Vec::new();
204 let mut prev_window_start_time = current_time_ns;
205
206 let last_recorded_window_start_time =
208 self.windows[current_window_index % self.windows.len()].read().start_time_ns;
209 while prev_window_start_time > last_recorded_window_start_time + WINDOW_SIZE_NS {
210 let modulo =
211 (prev_window_start_time - last_recorded_window_start_time) % WINDOW_SIZE_NS;
212 let filler_start_time = if modulo == 0 {
213 prev_window_start_time - WINDOW_SIZE_NS
214 } else {
215 prev_window_start_time - modulo
216 };
217 windows.push(InstrumentedWindowView::new_filler(
218 filler_start_time,
219 prev_window_start_time,
220 active_event.as_ref(),
221 ));
222 prev_window_start_time = filler_start_time;
223 }
224 let oldest_time_to_return =
225 current_time_ns.saturating_sub(NUM_WINDOWS as u64 * WINDOW_SIZE_NS);
226
227 for i in 0..NUM_WINDOWS.min(current_window_index + 1) {
228 let window_index = current_window_index - i;
229 let window = &self.windows[window_index % self.windows.len()];
230 let read = window.read();
231 if read.start_time_ns < oldest_time_to_return {
232 break;
233 }
234 windows.push(read.to_view(prev_window_start_time));
235 prev_window_start_time = read.start_time_ns;
236 }
237 InstrumentedThreadView {
238 thread_name: self.thread_name.clone(),
239 active_time_ns: (current_time_ns.saturating_sub(self.started_time_ns)) / 1_000_000,
240 message_types: self.message_type_registry.to_vec(),
241 windows,
242 active_event,
243 queue: self.queue.get_pending_events(),
244 }
245 }
246}