InstrumentedThread

Struct InstrumentedThread 

pub struct InstrumentedThread {
    pub thread_name: String,
    pub actor_name: String,
    pub queue: Arc<InstrumentedQueue>,
    pub started_time_ns: u64,
    pub message_type_registry: MessageTypeRegistry,
    pub windows: Vec<RwLock<InstrumentedWindow>>,
    pub current_window_index: AtomicUsize,
    pub active_event: AtomicU64,
    pub active_event_start_ns: AtomicU64,
}

Tracks recent thread activity over a fixed number of past windows. Each window is 500 milliseconds long by default (configurable). The actor thread that handles messages pushes events to this struct, and a debugging thread may read from it to display recent activity. Pushing events must be as efficient as possible: it must never be blocked by a debugging thread, and it should not perform large allocations except in very rare cases.

Note that this is not the entire state of instrumentation that is maintained by the actor thread; rather, it is the state that is necessarily shared between the actor thread and debugging threads.

Given that context, you may wonder why the design is so complex, with atomics deep in the data structures. Couldn’t the actor thread aggregate stats locally and hand them (as an Arc) to the shared data structure when a window completes, so that no atomics are needed inside the window structures? The problem is that as soon as the actor thread gives away the Arc, it can no longer reuse it. All of that memory would have to be freed later (most likely by the actor thread), and the next window would have to be allocated from scratch. This leads to constant allocation and deallocation. That may not sound like much, but when it happens on every actor thread, the cost is significant.
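The reuse argument above can be illustrated with a minimal sketch. The `ReusableWindow` type, its fields, and its methods are hypothetical stand-ins and not the actual `InstrumentedWindow` API; the only point is that counters stored as atomics can be updated and cleared through a shared reference, so one allocation can serve every window.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for a window of per-message-type statistics.
pub struct ReusableWindow {
    /// One counter per message type; allocated once in `with_capacity`.
    pub counts: Vec<AtomicU64>,
    /// Total processing time per message type, in nanoseconds.
    pub total_ns: Vec<AtomicU64>,
}

impl ReusableWindow {
    pub fn with_capacity(message_types: usize) -> Self {
        ReusableWindow {
            counts: (0..message_types).map(|_| AtomicU64::new(0)).collect(),
            total_ns: (0..message_types).map(|_| AtomicU64::new(0)).collect(),
        }
    }

    /// Records one handled message through a shared reference.
    pub fn record(&self, message_type: usize, elapsed_ns: u64) {
        self.counts[message_type].fetch_add(1, Ordering::Relaxed);
        self.total_ns[message_type].fetch_add(elapsed_ns, Ordering::Relaxed);
    }

    /// Reads the number of messages recorded for one message type.
    pub fn count(&self, message_type: usize) -> u64 {
        self.counts[message_type].load(Ordering::Relaxed)
    }

    /// Clears the window in place; the vectors keep their allocations.
    pub fn reset(&self) {
        for c in &self.counts {
            c.store(0, Ordering::Relaxed);
        }
        for t in &self.total_ns {
            t.store(0, Ordering::Relaxed);
        }
    }
}
```

In this sketch a window that has aged out is cleared with `reset` and used again, whereas a window handed off inside an Arc would have to be dropped and rebuilt each time.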

Fields§

§thread_name: String

§actor_name: String

The name of the actor (if any) that this thread is running. This is used in metrics, to allow grouping by actor name for multithreaded actors.

§queue: Arc<InstrumentedQueue>

The (possibly shared) queue instrumentation.

§started_time_ns: u64

Time when this thread was started, in nanoseconds since reference_instant.

§message_type_registry: MessageTypeRegistry

Registry of the message types seen so far on this thread. It enables dense indexing of per-message-type stats in the InstrumentedWindowSummary.
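To show what dense indexing means here, the following is a minimal sketch of a registry that assigns consecutive ids in first-seen order. `DenseRegistry` and `id_of` are hypothetical names; the real `MessageTypeRegistry` is not shown on this page and may differ, for example by allowing registration through a shared reference.

```rust
use std::any::TypeId;
use std::collections::HashMap;

/// Hypothetical registry that hands out dense ids in first-seen order.
#[derive(Default)]
pub struct DenseRegistry {
    ids: HashMap<TypeId, u32>,
    names: Vec<&'static str>,
}

impl DenseRegistry {
    /// Returns the id for `T`, assigning the next free id on first sight.
    pub fn id_of<T: 'static>(&mut self) -> u32 {
        let next = self.names.len() as u32;
        let names = &mut self.names;
        *self.ids.entry(TypeId::of::<T>()).or_insert_with(|| {
            names.push(std::any::type_name::<T>());
            next
        })
    }

    /// Number of distinct message types seen so far.
    pub fn len(&self) -> usize {
        self.names.len()
    }
}
```

Because ids are consecutive and start at zero, per-message-type stats can live in a plain vector indexed by id instead of a hash map.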

§windows: Vec<RwLock<InstrumentedWindow>>

This is a fixed-size ring buffer of windows. Although each element is protected by a RwLock, the only time we need to write-lock an element is when we initialize a new window (every time we advance to the next window). Taking this write lock does not cause any contention, because no reader thread would be reading that window.

If we keep N windows, the size of this vector is N + 1, because the extra window is used for initialization. When we advance to the next window, we first write-lock the next window and initialize it (knowing that no reader thread will touch it, because it is the extra window and doesn’t hold useful data yet), and only then advance the current window index.

All other operations (including when we record new events) only need a read lock, meaning there should be no contention at all.
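The advance protocol described above can be sketched as follows. `SlotWindow`, `Ring`, and their methods are hypothetical simplifications (a single event counter per window) and not the actual `InstrumentedWindow` or `advance_window` implementation; the memory orderings are likewise an assumption.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::RwLock;

/// Hypothetical window: a start time plus one event counter.
pub struct SlotWindow {
    pub start_ns: u64,
    pub events: AtomicU64,
}

/// Hypothetical ring holding `kept + 1` windows.
pub struct Ring {
    pub windows: Vec<RwLock<SlotWindow>>,
    /// Monotonically increasing index of the current window.
    pub current: AtomicUsize,
}

impl Ring {
    pub fn new(kept: usize, start_ns: u64) -> Self {
        let windows = (0..kept + 1)
            .map(|_| RwLock::new(SlotWindow { start_ns, events: AtomicU64::new(0) }))
            .collect();
        Ring { windows, current: AtomicUsize::new(0) }
    }

    /// Hot path: a read lock plus an atomic increment.
    pub fn record_event(&self) {
        let slot = self.current.load(Ordering::Acquire) % self.windows.len();
        let w = self.windows[slot].read().unwrap();
        w.events.fetch_add(1, Ordering::Relaxed);
    }

    /// Initializes the spare slot under a write lock, then publishes it.
    pub fn advance(&self, window_end_ns: u64) {
        let next = self.current.load(Ordering::Acquire) + 1;
        {
            let mut w = self.windows[next % self.windows.len()].write().unwrap();
            w.start_ns = window_end_ns;
            *w.events.get_mut() = 0;
        }
        // Only now does the new window become the current one.
        self.current.store(next, Ordering::Release);
    }

    /// Reads the event count stored in a physical slot.
    pub fn events_in_slot(&self, slot: usize) -> u64 {
        self.windows[slot].read().unwrap().events.load(Ordering::Relaxed)
    }
}
```

The write lock in `advance` is held only while the spare slot is being reset, and the index is stored afterwards, so in this sketch a reader that follows `current` never observes a half-initialized window.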

§current_window_index: AtomicUsize

This is a monotonically increasing index of the current window; it does not wrap around. Instead, we compute the actual index into the array by taking it modulo the array’s size.
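As a worked illustration of the index arithmetic, the sketch below maps a monotonic window index to a slot and lists the slots a reader could visit. Both function names are hypothetical, and the sketch assumes that the N kept windows include the current one, which this page does not state.

```rust
/// Maps a monotonically increasing window index to a slot in the ring.
pub fn slot_of(window_index: usize, ring_len: usize) -> usize {
    window_index % ring_len
}

/// Slots a reader may visit, oldest first. With `ring_len = N + 1`, this
/// yields at most N slots ending at the current window; the one slot left
/// out is the spare that the writer initializes next.
pub fn readable_slots(current: usize, ring_len: usize) -> Vec<usize> {
    assert!(ring_len >= 2, "need at least one kept window plus the spare");
    let kept = ring_len - 1;
    // Early on, fewer than `kept` windows exist, so clamp at window 0.
    let oldest = current.saturating_sub(kept - 1);
    (oldest..=current).map(|i| slot_of(i, ring_len)).collect()
}
```

For a ring of 4 slots and a current index of 5, the readable windows are 3, 4, and 5, which live in slots 3, 0, and 1; slot 2 is the spare.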

§active_event: AtomicU64

The event that is currently being processed, if any, encoded with encode_message_event().
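The encoding used by encode_message_event() is not shown on this page. As a purely hypothetical illustration of how an optional event can fit in a single AtomicU64, the sketch below reserves zero for "no event" and sets a presence bit so that message type id 0 stays distinguishable. `pack_event`, `unpack_event`, and `ActiveEvent` are invented names, and returning the elapsed time from `end` is an assumption.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Sentinel meaning that no event is in progress.
pub const NO_EVENT: u64 = 0;

/// Hypothetical packing: bit 32 marks presence, the low 32 bits hold the id.
pub fn pack_event(message_type_id: u32) -> u64 {
    (1u64 << 32) | u64::from(message_type_id)
}

/// Inverse of `pack_event`; returns `None` for the sentinel.
pub fn unpack_event(raw: u64) -> Option<u32> {
    if raw == NO_EVENT {
        None
    } else {
        Some(raw as u32) // truncation keeps only the low 32 bits
    }
}

/// Hypothetical pair of atomics mirroring the two active-event fields.
pub struct ActiveEvent {
    raw: AtomicU64,
    start_ns: AtomicU64,
}

impl ActiveEvent {
    pub fn new() -> Self {
        ActiveEvent { raw: AtomicU64::new(NO_EVENT), start_ns: AtomicU64::new(0) }
    }

    /// Marks an event as in progress; the start time is written first.
    pub fn start(&self, message_type_id: u32, now_ns: u64) {
        self.start_ns.store(now_ns, Ordering::Relaxed);
        self.raw.store(pack_event(message_type_id), Ordering::Release);
    }

    /// Clears the active event and returns the elapsed nanoseconds.
    pub fn end(&self, now_ns: u64) -> u64 {
        let started = self.start_ns.load(Ordering::Relaxed);
        self.raw.store(NO_EVENT, Ordering::Release);
        now_ns.saturating_sub(started)
    }

    /// Best-effort view for a debugging thread. The two loads are not one
    /// atomic snapshot, so a concurrent writer can make them disagree.
    pub fn snapshot(&self) -> Option<(u32, u64)> {
        let id = unpack_event(self.raw.load(Ordering::Acquire))?;
        Some((id, self.start_ns.load(Ordering::Relaxed)))
    }
}
```

Keeping the state in plain atomics means the actor thread needs no lock to mark an event as started or finished, at the cost of readers seeing only a best-effort view.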

§active_event_start_ns: AtomicU64

Implementations§

impl InstrumentedThread

pub fn new(thread_name: String, actor_name: String, queue: Arc<InstrumentedQueue>, start_time: u64) -> Self

pub fn start_event(&self, message_type_id: u32, timestamp_ns: u64, dequeue_time_ns: u64)

pub fn end_event(&self, timestamp_ns: u64) -> u64

pub fn advance_window(&self, window_end_time_ns: u64)

impl InstrumentedThread

pub fn to_view(&self, current_time_ns: u64) -> InstrumentedThreadView

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<A, B> IntoMultiSender<B> for A
where B: MultiSenderFrom<A>,

fn as_multi_sender(self: &Arc<A>) -> B

fn into_multi_sender(self) -> B

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<L> LayerExt<L> for L

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.

impl<T> Same for T

type Output = T

Should always be Self

impl<T> SpanWrappedMessageExt for T

fn span_wrap(self) -> SpanWrapped<Self>

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.