near_async/instrumentation/
data.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::{Arc, LazyLock};
4use std::time::Instant;
5
6use near_time::Clock;
7use parking_lot::RwLock;
8
9use crate::instrumentation::instrumented_window::InstrumentedWindow;
10use crate::instrumentation::queue::InstrumentedQueue;
11use crate::instrumentation::reader::InstrumentedThreadsView;
12use crate::instrumentation::{NUM_WINDOWS, WINDOW_SIZE_NS};
13/// it needs to be at least NUM_WINDOWS + 1, but we round up to a power of two for efficiency
14const WINDOW_ARRAY_SIZE: usize = NUM_WINDOWS + 4;
15
16/// Top-level struct containing all actor instrumentations.
17pub struct AllActorInstrumentations {
18    /// This is the instant that all timestamps are in reference to.
19    pub reference_instant: Instant,
20    /// Map from the actor's unique identifier to its instrumentation data.
21    pub threads: RwLock<HashMap<usize, Arc<InstrumentedThread>>>,
22}
23
24impl AllActorInstrumentations {
25    /// Converts the entire data structure into a view that can be serialized.
26    pub fn to_view(&self, clock: &Clock) -> InstrumentedThreadsView {
27        #[allow(clippy::needless_collect)] // to avoid long locking
28        let threads = self.threads.read().values().cloned().collect::<Vec<_>>();
29        let current_time_ns = clock.now().duration_since(self.reference_instant).as_nanos() as u64;
30        let current_time_unix_ms = (clock.now_utc().unix_timestamp_nanos() / 1000000) as u64;
31        let mut threads =
32            threads.into_iter().map(|thread| thread.to_view(current_time_ns)).collect::<Vec<_>>();
33        threads.sort_by_key(|thread| -(thread.active_time_ns as i128));
34        InstrumentedThreadsView {
35            current_time_unix_ms,
36            current_time_relative_ms: current_time_ns / 1_000_000,
37            threads,
38        }
39    }
40}
41
42pub static ALL_ACTOR_INSTRUMENTATIONS: LazyLock<AllActorInstrumentations> =
43    LazyLock::new(|| AllActorInstrumentations {
44        reference_instant: Instant::now(),
45        threads: RwLock::new(HashMap::new()),
46    });
47
48/// Tracks recent thread activity for the past number of windows.
49/// Each window is 500 milliseconds (configurable).
50/// The actor thread which handles messages is the one pushing events to this struct,
51/// and a debugging thread may pull it to display recent activity.
52/// It is very important that pushing events is as efficient as possible: it should not be
53/// blocked by any debugging threads, and it should not do big allocations except in the
54/// very rare cases.
55///
56/// Note that this is not the entire state of instrumentation that is maintained by the
57/// actor thread; rather, it is the state that is necessarily shared between the actor
58/// thread and debugging threads.
59///
60/// Given that context, you may wonder why the design is so complex with atomics deep in
61/// the data structures; after all, can't we just have the actor thread aggregate stats
62/// locally and then give them (as an Arc) to the common data structure when a window is
63/// complete, and that way we won't need any atomics inside the window structures? The
64/// problem is that as soon as we the actor thread gives away the Arc, it has no chance
65/// of reusing it, so all of that memory would have to be freed later (by the actor thread
66/// most likely), and the next window would have to be reallocated from scratch. This would
67/// lead to constant allocations and deallocations. It might not sound much, but when this
68/// is done on every actor thread, it will be significant.
69pub struct InstrumentedThread {
70    pub thread_name: String,
71    /// The name of the actor (if any) that this thread is running. This is used
72    /// in metrics, to allow grouping by actor name for multithreaded actors.
73    pub actor_name: String,
74    /// The (possibly shared) queue instrumentation.
75    pub queue: Arc<InstrumentedQueue>,
76    /// Time when this thread was started, in nanoseconds since reference_instant.
77    pub started_time_ns: u64,
78    /// Registry of message types that are seen so far on this thread. It is used to
79    /// enable dense indexing of per-message-type stats in the InstrumentedWindowSummary.
80    pub message_type_registry: MessageTypeRegistry,
81    /// This is a fixed-size ring buffer of windows. Although each element is protected
82    /// by a RwLock, the only time we need to write-lock it is when we need to initialize
83    /// a new window (every time we advance to the next window). Doing this lock does not
84    /// cause any contention because the reader thread would not be reading that window.
85    ///
86    /// If there are N windows we keep, then the size of this vector is N + 1. This is
87    /// because the extra window is used for initialization. When we advance to the next
88    /// window, we first write-lock the next window and initialize it (meanwhile knowing
89    /// that any reader thread would not be touching that window at all because it is
90    /// the extra window doesn't hold useful data yet), and only after that do we advance
91    /// the current window index.
92    ///
93    /// All other operations (including when we record new events) only need a read lock,
94    /// meaning there should be no contention at all.
95    pub windows: Vec<RwLock<InstrumentedWindow>>,
96    /// This is a monotonically increasing index of the current window;
97    /// it does not wrap around. Rather, we calculate the actual index into the array
98    /// by modding by the array's size.
99    pub current_window_index: AtomicUsize,
100    /// The event that is currently being processed, if any, encoded with
101    /// encode_message_event().
102    pub active_event: AtomicU64,
103    pub active_event_start_ns: AtomicU64,
104}
105
106impl InstrumentedThread {
107    pub fn new(
108        thread_name: String,
109        actor_name: String,
110        queue: Arc<InstrumentedQueue>,
111        start_time: u64,
112    ) -> Self {
113        Self {
114            thread_name,
115            actor_name,
116            queue,
117            started_time_ns: start_time,
118            message_type_registry: MessageTypeRegistry::default(),
119            windows: (0..WINDOW_ARRAY_SIZE)
120                .map(|_| RwLock::new(InstrumentedWindow::new()))
121                .collect(),
122            current_window_index: AtomicUsize::new(0),
123            active_event: AtomicU64::new(0),
124            active_event_start_ns: AtomicU64::new(0),
125        }
126    }
127
128    pub fn start_event(&self, message_type_id: u32, timestamp_ns: u64, dequeue_time_ns: u64) {
129        let encoded_event = encode_message_event(message_type_id, true);
130        self.active_event_start_ns.store(timestamp_ns, Ordering::Relaxed);
131        // Release order here because this atomic embeds an "is present" bit, and this
132        // is used to synchronize the start timestamp stored above.
133        // Note that this isn't actually very sound because it's possible that the reader
134        // reads an active event but by the time it reads the timestamp, another event has
135        // started. This seems very unlikely though because the reader only needs to do two
136        // atomic reads whereas the writer has to do a bunch of other stuff. And we don't
137        // need this to be absolutely perfect anyway.
138        self.active_event.store(encoded_event, Ordering::Release);
139        let current_window_index = self.current_window_index.load(Ordering::Relaxed);
140        let window = &self.windows[current_window_index % WINDOW_ARRAY_SIZE];
141        let window = window.read();
142        window.events.push(encoded_event, timestamp_ns.saturating_sub(window.start_time_ns));
143        window.dequeue_summary.add_message_time(
144            current_window_index,
145            message_type_id,
146            dequeue_time_ns,
147        );
148    }
149
150    // Ends the currently active event, if any, and returns the elapsed time in nanoseconds.
151    pub fn end_event(&self, timestamp_ns: u64) -> u64 {
152        let active_event = self.active_event.load(Ordering::Relaxed);
153        let message_type_id = active_event as u32;
154        let start_timestamp = self.active_event_start_ns.load(Ordering::Relaxed);
155        let encoded_event = encode_message_event(message_type_id, false);
156        self.active_event.store(0, Ordering::Relaxed);
157        let current_window_index = self.current_window_index.load(Ordering::Relaxed);
158        let window = &self.windows[current_window_index % WINDOW_ARRAY_SIZE];
159        let window = window.read();
160        window.events.push(encoded_event, timestamp_ns.saturating_sub(window.start_time_ns));
161        let elapsed_ns = timestamp_ns.saturating_sub(start_timestamp.max(window.start_time_ns));
162        window.summary.add_message_time(current_window_index, message_type_id, elapsed_ns);
163        let total_elapsed_ns = timestamp_ns.saturating_sub(start_timestamp);
164        total_elapsed_ns
165    }
166
167    pub fn advance_window(&self, window_end_time_ns: u64) {
168        let current_window_index = self.current_window_index.load(Ordering::Relaxed);
169        let active_event = self.active_event.load(Ordering::Relaxed);
170        if active_event != 0 {
171            let active_event = active_event as u32;
172            let elapsed_in_window = window_end_time_ns
173                .saturating_sub(self.active_event_start_ns.load(Ordering::Relaxed))
174                .min(WINDOW_SIZE_NS);
175            self.windows[current_window_index % WINDOW_ARRAY_SIZE].read().summary.add_message_time(
176                current_window_index,
177                active_event,
178                elapsed_in_window,
179            );
180        }
181        let next_window_index = current_window_index + 1;
182        let next_window = &self.windows[next_window_index % WINDOW_ARRAY_SIZE];
183        let num_types = self.message_type_registry.types.read().len();
184        next_window.write().reinitialize(next_window_index, window_end_time_ns, num_types);
185        // Release ordering to indicate to any reader that the new window is ready for reading.
186        self.current_window_index.store(next_window_index, Ordering::Release);
187    }
188}
189
190/// This is the registry of message type indexes. It may be surprising to see that
191/// there is no map from the type name to the index. This is because the purpose of
192/// this registry is only to provide information on the mapping to the debug UI frontend.
193/// To figure out what message type ID corresponds to a message type, it is the actor
194/// thread's responsibility to remember and lookup the mapping - it is much easier there
195/// because it does not need any locking.
196#[derive(Default)]
197pub struct MessageTypeRegistry {
198    pub types: RwLock<Vec<String>>,
199}
200
201impl MessageTypeRegistry {
202    pub fn push_type(&self, type_name: String) {
203        let mut types = self.types.write();
204        types.push(type_name);
205    }
206}
207
/// Packs a message event into a `u64`: the low 32 bits hold `message_type_id` and
/// bit 32 is set when the event is a start event (`is_start`), clear otherwise.
fn encode_message_event(message_type_id: u32, is_start: bool) -> u64 {
    let start_flag = u64::from(is_start) << 32;
    u64::from(message_type_id) | start_flag
}