pub struct InstrumentedThread {
    pub thread_name: String,
    pub actor_name: String,
    pub queue: Arc<InstrumentedQueue>,
    pub started_time_ns: u64,
    pub message_type_registry: MessageTypeRegistry,
    pub windows: Vec<RwLock<InstrumentedWindow>>,
    pub current_window_index: AtomicUsize,
    pub active_event: AtomicU64,
    pub active_event_start_ns: AtomicU64,
}
Tracks recent thread activity over a fixed number of past windows. Each window is 500 milliseconds long (configurable). The actor thread that handles messages is the one pushing events to this struct, and a debugging thread may read from it to display recent activity. It is very important that pushing events is as efficient as possible: it must not be blocked by any debugging thread, and it should not perform large allocations except in very rare cases.
Note that this is not the entire state of instrumentation that is maintained by the actor thread; rather, it is the state that is necessarily shared between the actor thread and debugging threads.
Given that context, you may wonder why the design is so complex, with atomics deep in the data structures. After all, can’t we just have the actor thread aggregate stats locally and then hand them (as an Arc) to the common data structure when a window is complete, so that we don’t need any atomics inside the window structures? The problem is that as soon as the actor thread gives away the Arc, it can no longer reuse it, so all of that memory would have to be freed later (most likely by the actor thread), and the next window would have to be allocated from scratch. This would lead to constant allocations and deallocations. It might not sound like much, but when this is done on every actor thread, it becomes significant.
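The trade-off described above can be illustrated with a minimal sketch. It assumes a hypothetical `ReusableWindow` type (not the real `InstrumentedWindow`) that holds one atomic counter per message type. Because the counters are atomic, the writer can update and reset them through a shared reference while a reader holds its own handle, and the backing allocation is kept across windows.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a stats window; not the real `InstrumentedWindow`.
struct ReusableWindow {
    /// One counter per dense message-type index.
    event_counts: Vec<AtomicU64>,
}

impl ReusableWindow {
    fn with_capacity(num_message_types: usize) -> Self {
        let event_counts = (0..num_message_types).map(|_| AtomicU64::new(0)).collect();
        Self { event_counts }
    }

    /// Writer side: needs only `&self`, so readers may hold references as well.
    fn record(&self, message_type_index: usize) {
        self.event_counts[message_type_index].fetch_add(1, Ordering::Relaxed);
    }

    /// Clears the counters in place, keeping the existing allocation.
    fn reset(&self) {
        for count in &self.event_counts {
            count.store(0, Ordering::Relaxed);
        }
    }

    /// Reader side: copies out the current counter values.
    fn snapshot(&self) -> Vec<u64> {
        self.event_counts.iter().map(|c| c.load(Ordering::Relaxed)).collect()
    }
}

/// Records into a window, lets another thread read it, then reuses the window.
/// Returns both snapshots and whether the counter buffer was kept.
fn demo_reuse() -> (Vec<u64>, Vec<u64>, bool) {
    let window = Arc::new(ReusableWindow::with_capacity(2));
    let buffer_before = window.event_counts.as_ptr();

    window.record(0);
    window.record(1);
    window.record(1);

    // A "debugging" thread reads through its own Arc handle.
    let reader = {
        let window = Arc::clone(&window);
        thread::spawn(move || window.snapshot())
    };
    let first = reader.join().unwrap();

    // The writer still has its handle, so it resets and reuses the same memory.
    window.reset();
    window.record(0);
    let second = window.snapshot();

    let same_allocation = buffer_before == window.event_counts.as_ptr();
    (first, second, same_allocation)
}
```

Calling `demo_reuse()` yields the two snapshots together with a flag confirming that the counter buffer was not reallocated between windows. The alternative of handing off a completed `Arc` per window would require a fresh allocation for every window instead.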
Fields
thread_name: String
actor_name: String
The name of the actor (if any) that this thread is running. This is used in metrics to allow grouping by actor name for multithreaded actors.
queue: Arc<InstrumentedQueue>
The (possibly shared) queue instrumentation.
started_time_ns: u64
Time when this thread was started, in nanoseconds since reference_instant.
message_type_registry: MessageTypeRegistry
Registry of message types that have been seen so far on this thread. It is used to enable dense indexing of per-message-type stats in the InstrumentedWindowSummary.
windows: Vec<RwLock<InstrumentedWindow>>
This is a fixed-size ring buffer of windows. Although each element is protected by a RwLock, the only time we need to write-lock it is when we initialize a new window (every time we advance to the next window). Taking this lock does not cause any contention because no reader thread is reading that window at that point.
If we keep N windows, then the size of this vector is N + 1, because the extra window is used for initialization. When we advance to the next window, we first write-lock the next window and initialize it (knowing that no reader thread will touch that window, because it is the extra window and doesn’t hold useful data yet), and only after that do we advance the current window index.
All other operations (including when we record new events) only need a read lock, meaning there should be no contention at all.
current_window_index: AtomicUsize
This is a monotonically increasing index of the current window; it does not wrap around. Rather, we calculate the actual index into the array by taking it modulo the array’s size.
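The interplay between the N + 1 slot ring buffer and the monotonically increasing index can be sketched as follows. This is a simplified illustration under stated assumptions, not the crate's implementation: `SketchWindow`, `WindowRing`, `slot`, `record_event` and `recent_windows` are hypothetical names, each window holds a single event counter, and the memory orderings are one reasonable choice rather than the ones the real code uses.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::RwLock;

/// Hypothetical, simplified window; the real `InstrumentedWindow` holds more.
struct SketchWindow {
    start_time_ns: u64,
    event_count: AtomicU64,
}

/// Hypothetical ring mirroring the `windows` / `current_window_index` pair.
struct WindowRing {
    windows: Vec<RwLock<SketchWindow>>,
    current_window_index: AtomicUsize,
}

impl WindowRing {
    /// Keeps `num_kept` readable windows plus one spare slot for initialization.
    fn new(num_kept: usize, start_time_ns: u64) -> Self {
        let windows = (0..num_kept + 1)
            .map(|_| RwLock::new(SketchWindow { start_time_ns, event_count: AtomicU64::new(0) }))
            .collect();
        Self { windows, current_window_index: AtomicUsize::new(0) }
    }

    /// Maps the non-wrapping index to a position in the vector.
    fn slot(&self, monotonic_index: usize) -> usize {
        monotonic_index % self.windows.len()
    }

    /// Writer path: a read lock suffices because the counter itself is atomic.
    fn record_event(&self) {
        let index = self.current_window_index.load(Ordering::Acquire);
        let window = self.windows[self.slot(index)].read().unwrap();
        window.event_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Writer path: initialize the spare slot first, then publish the new index.
    fn advance_window(&self, window_end_time_ns: u64) {
        let next = self.current_window_index.load(Ordering::Relaxed) + 1;
        {
            // Readers only look at the N most recent windows, never this slot.
            let mut window = self.windows[self.slot(next)].write().unwrap();
            window.start_time_ns = window_end_time_ns;
            *window.event_count.get_mut() = 0;
        }
        self.current_window_index.store(next, Ordering::Release);
    }

    /// Reader path: (start time, event count) of the kept windows, newest first.
    fn recent_windows(&self) -> Vec<(u64, u64)> {
        let current = self.current_window_index.load(Ordering::Acquire);
        let num_kept = self.windows.len() - 1;
        (0..num_kept.min(current + 1))
            .map(|age| {
                let window = self.windows[self.slot(current - age)].read().unwrap();
                (window.start_time_ns, window.event_count.load(Ordering::Relaxed))
            })
            .collect()
    }
}
```

With `WindowRing::new(2, 0)` the vector has three slots. After three calls to `advance_window`, the index is 3 and `slot(3)` is 0, so the oldest slot has been reinitialized and reused while the two most recent windows stayed readable throughout.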
active_event: AtomicU64
The event that is currently being processed, if any, encoded with encode_message_event().
active_event_start_ns: AtomicU64
Implementations
impl InstrumentedThread
pub fn new(thread_name: String, actor_name: String, queue: Arc<InstrumentedQueue>, start_time: u64) -> Self
pub fn start_event(&self, message_type_id: u32, timestamp_ns: u64, dequeue_time_ns: u64)
pub fn end_event(&self, timestamp_ns: u64) -> u64
pub fn advance_window(&self, window_end_time_ns: u64)
impl InstrumentedThread
pub fn to_view(&self, current_time_ns: u64) -> InstrumentedThreadView
Auto Trait Implementations
impl !Freeze for InstrumentedThread
impl !RefUnwindSafe for InstrumentedThread
impl Send for InstrumentedThread
impl Sync for InstrumentedThread
impl Unpin for InstrumentedThread
impl !UnwindSafe for InstrumentedThread
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<A, B> IntoMultiSender<B> for A
where
    B: MultiSenderFrom<A>,
fn as_multi_sender(self: &Arc<A>) -> B
fn into_multi_sender(self) -> B
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request