near_async/instrumentation/data.rs
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use near_time::Clock;
use parking_lot::RwLock;

use crate::instrumentation::instrumented_window::InstrumentedWindow;
use crate::instrumentation::queue::InstrumentedQueue;
use crate::instrumentation::reader::InstrumentedThreadsView;
use crate::instrumentation::{NUM_WINDOWS, WINDOW_SIZE_NS};

/// It needs to be at least NUM_WINDOWS + 1, but we round up to a power of two for efficiency.
const WINDOW_ARRAY_SIZE: usize = NUM_WINDOWS + 4;
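
// Illustrative sanity check (a sketch added here, not part of the original code): the ring
// buffer needs the one extra slot used for re-initialization, as described on the `windows`
// field of InstrumentedThread below.
const _: () = assert!(WINDOW_ARRAY_SIZE >= NUM_WINDOWS + 1);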

/// Top-level struct containing all actor instrumentations.
pub struct AllActorInstrumentations {
    /// This is the instant that all timestamps are in reference to.
    pub reference_instant: Instant,
    /// Map from the actor's unique identifier to its instrumentation data.
    pub threads: RwLock<HashMap<usize, Arc<InstrumentedThread>>>,
}

impl AllActorInstrumentations {
    /// Converts the entire data structure into a view that can be serialized.
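    ///
    /// A hedged usage sketch (`Clock::real()` is assumed to be available from `near_time`;
    /// it is not defined in this file):
    ///
    /// ```ignore
    /// let view = ALL_ACTOR_INSTRUMENTATIONS.to_view(&Clock::real());
    /// ```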
    pub fn to_view(&self, clock: &Clock) -> InstrumentedThreadsView {
        #[allow(clippy::needless_collect)] // collect eagerly to avoid holding the lock for long
        let threads = self.threads.read().values().cloned().collect::<Vec<_>>();
        let current_time_ns = clock.now().duration_since(self.reference_instant).as_nanos() as u64;
        let current_time_unix_ms = (clock.now_utc().unix_timestamp_nanos() / 1_000_000) as u64;
        let mut threads =
            threads.into_iter().map(|thread| thread.to_view(current_time_ns)).collect::<Vec<_>>();
        threads.sort_by_key(|thread| -(thread.active_time_ns as i128));
        InstrumentedThreadsView {
            current_time_unix_ms,
            current_time_relative_ms: current_time_ns / 1_000_000,
            threads,
        }
    }
}

pub static ALL_ACTOR_INSTRUMENTATIONS: LazyLock<AllActorInstrumentations> =
    LazyLock::new(|| AllActorInstrumentations {
        reference_instant: Instant::now(),
        threads: RwLock::new(HashMap::new()),
    });

/// Tracks recent thread activity over the most recent NUM_WINDOWS windows.
/// Each window is 500 milliseconds (configurable).
/// The actor thread which handles messages is the one pushing events to this struct,
/// and a debugging thread may pull it to display recent activity.
/// It is very important that pushing events is as efficient as possible: it should not be
/// blocked by any debugging threads, and it should not do big allocations except in
/// very rare cases.
///
/// Note that this is not the entire state of instrumentation that is maintained by the
/// actor thread; rather, it is the state that necessarily has to be shared between the
/// actor thread and debugging threads.
///
/// Given that context, you may wonder why the design is so complex, with atomics deep in
/// the data structures; after all, couldn't the actor thread just aggregate stats locally
/// and then hand them (as an Arc) to the common data structure when a window is complete,
/// so that no atomics are needed inside the window structures? The problem is that as soon
/// as the actor thread gives away the Arc, it has no chance of reusing it, so all of that
/// memory would have to be freed later (most likely by the actor thread), and the next
/// window would have to be reallocated from scratch. This would lead to constant
/// allocations and deallocations. That might not sound like much, but when it is done on
/// every actor thread, the cost becomes significant.
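///
/// # Example
///
/// A minimal sketch of the per-message call sequence on the actor thread, assuming an
/// already-constructed `InstrumentedThread`; the function and variable names here are
/// illustrative, not part of this module:
///
/// ```ignore
/// fn handle_one_message(thread: &InstrumentedThread, type_id: u32, start_ns: u64, dequeue_ns: u64) {
///     thread.start_event(type_id, start_ns, dequeue_ns);
///     // ... actually process the message ...
///     let _elapsed_ns = thread.end_event(start_ns + 1_000_000);
/// }
/// ```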
pub struct InstrumentedThread {
    pub thread_name: String,
    /// The name of the actor (if any) that this thread is running. This is used
    /// in metrics, to allow grouping by actor name for multithreaded actors.
    pub actor_name: String,
    /// The (possibly shared) queue instrumentation.
    pub queue: Arc<InstrumentedQueue>,
    /// Time when this thread was started, in nanoseconds since reference_instant.
    pub started_time_ns: u64,
    /// Registry of message types that have been seen so far on this thread. It is used to
    /// enable dense indexing of per-message-type stats in the InstrumentedWindowSummary.
    pub message_type_registry: MessageTypeRegistry,
    /// This is a fixed-size ring buffer of windows. Although each element is protected
    /// by a RwLock, the only time we need to write-lock it is when we need to initialize
    /// a new window (every time we advance to the next window). Taking this lock does not
    /// cause any contention because the reader thread would not be reading that window.
    ///
    /// If there are N windows we keep, then the size of this vector must be at least N + 1.
    /// This is because the extra window is used for initialization. When we advance to the
    /// next window, we first write-lock the next window and initialize it (meanwhile
    /// knowing that no reader thread would be touching that window at all, because it is
    /// the extra window and does not hold useful data yet), and only after that do we
    /// advance the current window index.
    ///
    /// All other operations (including when we record new events) only need a read lock,
    /// meaning there should be no contention at all.
    pub windows: Vec<RwLock<InstrumentedWindow>>,
    /// This is a monotonically increasing index of the current window;
    /// it does not wrap around. Rather, we calculate the actual index into the array
    /// by taking it modulo the array's size.
    pub current_window_index: AtomicUsize,
    /// The event that is currently being processed, if any, encoded with
    /// encode_message_event().
    pub active_event: AtomicU64,
    pub active_event_start_ns: AtomicU64,
}

impl InstrumentedThread {
    pub fn new(
        thread_name: String,
        actor_name: String,
        queue: Arc<InstrumentedQueue>,
        start_time: u64,
    ) -> Self {
        Self {
            thread_name,
            actor_name,
            queue,
            started_time_ns: start_time,
            message_type_registry: MessageTypeRegistry::default(),
            windows: (0..WINDOW_ARRAY_SIZE)
                .map(|_| RwLock::new(InstrumentedWindow::new()))
                .collect(),
            current_window_index: AtomicUsize::new(0),
            active_event: AtomicU64::new(0),
            active_event_start_ns: AtomicU64::new(0),
        }
    }

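    /// Records the start of processing a message of the given type at `timestamp_ns`
    /// (nanoseconds since `reference_instant`); `dequeue_time_ns` is added to the current
    /// window's dequeue summary for this message type.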
    pub fn start_event(&self, message_type_id: u32, timestamp_ns: u64, dequeue_time_ns: u64) {
        let encoded_event = encode_message_event(message_type_id, true);
        self.active_event_start_ns.store(timestamp_ns, Ordering::Relaxed);
        // Release ordering here because this atomic embeds an "is present" bit, and that
        // is used to synchronize the start timestamp stored above.
        // Note that this isn't actually fully sound, because it's possible that the reader
        // reads an active event but, by the time it reads the timestamp, another event has
        // started. This seems very unlikely though, because the reader only needs to do two
        // atomic reads whereas the writer has to do a bunch of other work in between. And
        // we don't need this to be absolutely perfect anyway.
        self.active_event.store(encoded_event, Ordering::Release);
        let current_window_index = self.current_window_index.load(Ordering::Relaxed);
        let window = &self.windows[current_window_index % WINDOW_ARRAY_SIZE];
        let window = window.read();
        window.events.push(encoded_event, timestamp_ns.saturating_sub(window.start_time_ns));
        window.dequeue_summary.add_message_time(
            current_window_index,
            message_type_id,
            dequeue_time_ns,
        );
    }

    /// Ends the currently active event, if any, and returns the elapsed time in nanoseconds.
    pub fn end_event(&self, timestamp_ns: u64) -> u64 {
        let active_event = self.active_event.load(Ordering::Relaxed);
        let message_type_id = active_event as u32;
        let start_timestamp = self.active_event_start_ns.load(Ordering::Relaxed);
        let encoded_event = encode_message_event(message_type_id, false);
        self.active_event.store(0, Ordering::Relaxed);
        let current_window_index = self.current_window_index.load(Ordering::Relaxed);
        let window = &self.windows[current_window_index % WINDOW_ARRAY_SIZE];
        let window = window.read();
        window.events.push(encoded_event, timestamp_ns.saturating_sub(window.start_time_ns));
        let elapsed_ns = timestamp_ns.saturating_sub(start_timestamp.max(window.start_time_ns));
        window.summary.add_message_time(current_window_index, message_type_id, elapsed_ns);
        timestamp_ns.saturating_sub(start_timestamp)
    }

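    /// Closes out the current window (attributing any still-active event to it, capped at
    /// the window size) and initializes the next slot of the ring before publishing the new
    /// window index.
    ///
    /// A hedged sketch of how the owning thread might drive this (the surrounding check and
    /// `now_ns`/`window_start_ns` are illustrative, not part of this file):
    ///
    /// ```ignore
    /// if now_ns >= window_start_ns + WINDOW_SIZE_NS {
    ///     thread.advance_window(window_start_ns + WINDOW_SIZE_NS);
    /// }
    /// ```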
    pub fn advance_window(&self, window_end_time_ns: u64) {
        let current_window_index = self.current_window_index.load(Ordering::Relaxed);
        let active_event = self.active_event.load(Ordering::Relaxed);
        if active_event != 0 {
            let active_event = active_event as u32;
            let elapsed_in_window = window_end_time_ns
                .saturating_sub(self.active_event_start_ns.load(Ordering::Relaxed))
                .min(WINDOW_SIZE_NS);
            self.windows[current_window_index % WINDOW_ARRAY_SIZE].read().summary.add_message_time(
                current_window_index,
                active_event,
                elapsed_in_window,
            );
        }
        let next_window_index = current_window_index + 1;
        let next_window = &self.windows[next_window_index % WINDOW_ARRAY_SIZE];
        let num_types = self.message_type_registry.types.read().len();
        next_window.write().reinitialize(next_window_index, window_end_time_ns, num_types);
        // Release ordering to indicate to any reader that the new window is ready for reading.
        self.current_window_index.store(next_window_index, Ordering::Release);
    }
}

/// This is the registry of message type indexes. It may be surprising that there is no
/// map from the type name to the index. This is because the purpose of this registry is
/// only to provide the mapping to the debug UI frontend. To figure out which message type
/// ID corresponds to a message type, it is the actor thread's responsibility to remember
/// and look up the mapping itself - that is much easier to do there because it does not
/// need any locking.
#[derive(Default)]
pub struct MessageTypeRegistry {
    pub types: RwLock<Vec<String>>,
}

impl MessageTypeRegistry {
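    /// Appends a newly seen message type name to the registry.
    ///
    /// A hedged sketch of the actor-side bookkeeping described above (`local_ids` and the
    /// type name are hypothetical, not defined in this file):
    ///
    /// ```ignore
    /// let type_name = "SomeMessage";
    /// let id = local_ids.len() as u32;
    /// local_ids.insert(type_name, id);
    /// registry.push_type(type_name.to_string());
    /// ```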
    pub fn push_type(&self, type_name: String) {
        let mut types = self.types.write();
        types.push(type_name);
    }
}

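/// Packs a message event into a single u64: the low 32 bits hold the message type ID and
/// bit 32 is set for "start" events. A start event is therefore always non-zero, which is
/// why `active_event == 0` can be used to mean "no event in progress".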
fn encode_message_event(message_type_id: u32, is_start: bool) -> u64 {
    let mut event = message_type_id as u64;
    if is_start {
        event |= 1 << 32;
    }
    event
}
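
#[cfg(test)]
mod tests {
    use super::*;

    // A small sanity check of the event encoding, added for illustration; it only relies
    // on encode_message_event() defined in this file.
    #[test]
    fn test_encode_message_event_layout() {
        let start = encode_message_event(7, true);
        assert_eq!(start as u32, 7);
        assert_ne!(start & (1 << 32), 0);

        let end = encode_message_event(7, false);
        assert_eq!(end, 7);
        // A start event is always non-zero, even for message type 0.
        assert_ne!(encode_message_event(0, true), 0);
    }
}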