TelemetryProcessor

Struct TelemetryProcessor 

Source
pub struct TelemetryProcessor<L> { /* private fields */ }
Expand description

The counterpart of the TelemetryTransmitter. It receives the Observations and other messages and processes them.

A TelemetryProcessor is tied to a specific kind of label which is used to determine which metrics are triggered.

The TelemetryProcessor<L> owns a Receiver for TelemetryMessage<L>.

Implementations§

Source§

impl<L> TelemetryProcessor<L>
where L: Clone + Eq + Send + 'static,

Source

pub fn new_pair<T: Into<String>>( name: T, ) -> (TelemetryTransmitter<L>, TelemetryProcessor<L>)

Creates a TelemetryTransmitter and the corresponding TelemetryProcessor

The name will cause a grouping in the Snapshot.

It is important that the returned TelemetryProcessor gets mounted on a driver soon, since otherwise the internal queue will get flooded with unprocessed observations.

Examples found in repository?
examples/demo_run.rs (line 87)
48fn create_foo_metrics() -> (TelemetryTransmitter<FooLabel>, ProcessorMount) {
49    let mut foo_a_panel = Panel::named(FooLabel::A, "foo_a_panel");
50    foo_a_panel.add_counter(Counter::new_with_defaults("foo_a_counter"));
51    let mut gauge = Gauge::new_with_defaults("foo_a_gauge");
52    gauge.set_title("title");
53    gauge.set_description("description");
54    foo_a_panel.add_gauge(gauge);
55    foo_a_panel.add_meter(Meter::new_with_defaults("foo_a_meter"));
56    foo_a_panel.add_histogram(Histogram::new_with_defaults("foo_a_histogram"));
57    foo_a_panel.set_title("foo_1_panel_title");
58    foo_a_panel.set_description("foo_a_panel_description");
59
60    let mut foo_b_panel = Panel::new(FooLabel::B);
61    foo_b_panel.add_counter(Counter::new_with_defaults("foo_b_counter"));
62    let mut gauge = Gauge::new_with_defaults("foo_b_gauge").tracking(15);
63    gauge.set_title("title");
64    gauge.set_description("description");
65    foo_b_panel.add_gauge(gauge);
66    foo_b_panel.add_meter(Meter::new_with_defaults("foo_b_meter"));
67    foo_b_panel.add_histogram(Histogram::new_with_defaults("foo_b_histogram"));
68    foo_b_panel.set_title("foo_b_panel_title");
69    foo_b_panel.set_description("foo_b_panel_description");
70
71    let polled_counter = PolledCounter::new();
72    let mut polled_instrument =
73        PollingInstrument::new_with_defaults("polled_instrument_1", polled_counter);
74    polled_instrument.set_title("The polled counter 1");
75    polled_instrument.set_description("A counter that is increased when a snapshot is polled");
76    foo_b_panel.add_snapshooter(polled_instrument);
77
78    let staircase_timer = StaircaseTimer::new("staircase");
79    foo_b_panel.add_instrument(staircase_timer);
80
81    let mut cockpit = Cockpit::new("foo_cockpit");
82    cockpit.add_panel(foo_a_panel);
83    cockpit.add_panel(foo_b_panel);
84    cockpit.set_title("foo_cockpit_title");
85    cockpit.set_description("foo_cockpit_description");
86
87    let (tx, mut processor) = TelemetryProcessor::new_pair("processor_foo");
88
89    processor.add_cockpit(cockpit);
90
91    let mut group_processor = ProcessorMount::default();
92    group_processor.add_processor(processor);
93
94    (tx, group_processor)
95}
Source

pub fn new_pair_bounded<T: Into<String>>( name: T, cap: usize, block_on_full: bool, ) -> (TelemetryTransmitter<L>, TelemetryProcessor<L>)

Creates a TelemetryTransmitter and the corresponding TelemetryProcessor

The name will cause a grouping in the Snapshot.

The message queue will be bounded to cap elements. If block_on_full is false, messages will be dropped while cap elements are already in the queue.

Source

pub fn new_pair_without_name() -> (TelemetryTransmitter<L>, TelemetryProcessor<L>)

Creates a TelemetryTransmitter and the corresponding TelemetryProcessor

No grouping will occur unless the name is set.

It is important that the returned TelemetryProcessor gets mounted on a driver soon, since otherwise the internal queue will get flooded with unprocessed observations.

Examples found in repository?
examples/demo_run.rs (line 125)
97fn create_bar_metrics() -> (TelemetryTransmitter<BarLabel>, ProcessorMount) {
98    let mut bar_a_panel = Panel::named(BarLabel::A, "bar_a_panel")
99        .counter(Counter::new_with_defaults("bar_a_counter"));
100    bar_a_panel.add_gauge(Gauge::new_with_defaults("bar_a_gauge"));
101    bar_a_panel.add_meter(Meter::new_with_defaults("bar_a_meter"));
102    bar_a_panel.add_histogram(Histogram::new_with_defaults("bar_a_histogram"));
103
104    let mut bar_a_cockpit = Cockpit::without_name();
105    bar_a_cockpit.add_panel(bar_a_panel);
106
107    let mut bar_b_panel = Panel::new(BarLabel::B);
108    bar_b_panel.add_counter(Counter::new_with_defaults("bar_b_counter"));
109    bar_b_panel.add_gauge(Gauge::new_with_defaults("bar_b_gauge"));
110    bar_b_panel.add_meter(Meter::new_with_defaults("bar_b_meter"));
111    bar_b_panel.add_histogram(Histogram::new_with_defaults("bar_b_histogram"));
112
113    let mut bar_b_cockpit = Cockpit::new("bar_b_cockpit");
114    bar_b_cockpit.add_panel(bar_b_panel);
115
116    let mut bar_c_panel = Panel::named(BarLabel::C, "bar_c_panel");
117    bar_c_panel.add_counter(Counter::new_with_defaults("bar_c_counter"));
118    bar_c_panel.add_gauge(Gauge::new_with_defaults("bar_c_gauge"));
119    bar_c_panel.add_meter(Meter::new_with_defaults("bar_c_meter"));
120    bar_c_panel.add_histogram(Histogram::new_with_defaults("bar_c_histogram"));
121
122    let mut bar_c_cockpit = Cockpit::new("bar_c_cockpit");
123    bar_c_cockpit.add_panel(bar_c_panel);
124
125    let (tx, mut processor) = TelemetryProcessor::new_pair_without_name();
126
127    processor.add_cockpit(bar_a_cockpit);
128    processor.add_cockpit(bar_b_cockpit);
129    processor.add_cockpit(bar_c_cockpit);
130
131    let mut group_processor1 = ProcessorMount::default();
132    group_processor1.add_processor(processor);
133
134    let mut group_processor2 = ProcessorMount::default();
135    group_processor2.add_processor(group_processor1);
136    group_processor2.set_name("group_processor_2");
137
138    let polled_counter = PolledCounter::new();
139    let mut polled_instrument =
140        PollingInstrument::new_with_defaults("polled_instrument_2", polled_counter);
141    polled_instrument.set_title("The polled counter 2");
142    polled_instrument.set_description("A counter that is increased when a snapshot is polled");
143
144    group_processor2.add_snapshooter(polled_instrument);
145
146    (tx, group_processor2)
147}
Source

pub fn new_pair_bounded_without_name( cap: usize, block_on_full: bool, ) -> (TelemetryTransmitter<L>, TelemetryProcessor<L>)

Creates a TelemetryTransmitter and the corresponding TelemetryProcessor

No grouping will occur unless the name is set.

The message queue will be bounded to cap elements. If block_on_full is false, messages will be dropped while cap elements are already in the queue.

Source

pub fn add_cockpit(&mut self, cockpit: Cockpit<L>)

Add a Cockpit

If the cockpit has a name and another cockpit with the same name is already present the cockpit will not be added.

Examples found in repository?
examples/demo_run.rs (line 89)
48fn create_foo_metrics() -> (TelemetryTransmitter<FooLabel>, ProcessorMount) {
49    let mut foo_a_panel = Panel::named(FooLabel::A, "foo_a_panel");
50    foo_a_panel.add_counter(Counter::new_with_defaults("foo_a_counter"));
51    let mut gauge = Gauge::new_with_defaults("foo_a_gauge");
52    gauge.set_title("title");
53    gauge.set_description("description");
54    foo_a_panel.add_gauge(gauge);
55    foo_a_panel.add_meter(Meter::new_with_defaults("foo_a_meter"));
56    foo_a_panel.add_histogram(Histogram::new_with_defaults("foo_a_histogram"));
57    foo_a_panel.set_title("foo_1_panel_title");
58    foo_a_panel.set_description("foo_a_panel_description");
59
60    let mut foo_b_panel = Panel::new(FooLabel::B);
61    foo_b_panel.add_counter(Counter::new_with_defaults("foo_b_counter"));
62    let mut gauge = Gauge::new_with_defaults("foo_b_gauge").tracking(15);
63    gauge.set_title("title");
64    gauge.set_description("description");
65    foo_b_panel.add_gauge(gauge);
66    foo_b_panel.add_meter(Meter::new_with_defaults("foo_b_meter"));
67    foo_b_panel.add_histogram(Histogram::new_with_defaults("foo_b_histogram"));
68    foo_b_panel.set_title("foo_b_panel_title");
69    foo_b_panel.set_description("foo_b_panel_description");
70
71    let polled_counter = PolledCounter::new();
72    let mut polled_instrument =
73        PollingInstrument::new_with_defaults("polled_instrument_1", polled_counter);
74    polled_instrument.set_title("The polled counter 1");
75    polled_instrument.set_description("A counter that is increased when a snapshot is polled");
76    foo_b_panel.add_snapshooter(polled_instrument);
77
78    let staircase_timer = StaircaseTimer::new("staircase");
79    foo_b_panel.add_instrument(staircase_timer);
80
81    let mut cockpit = Cockpit::new("foo_cockpit");
82    cockpit.add_panel(foo_a_panel);
83    cockpit.add_panel(foo_b_panel);
84    cockpit.set_title("foo_cockpit_title");
85    cockpit.set_description("foo_cockpit_description");
86
87    let (tx, mut processor) = TelemetryProcessor::new_pair("processor_foo");
88
89    processor.add_cockpit(cockpit);
90
91    let mut group_processor = ProcessorMount::default();
92    group_processor.add_processor(processor);
93
94    (tx, group_processor)
95}
96
97fn create_bar_metrics() -> (TelemetryTransmitter<BarLabel>, ProcessorMount) {
98    let mut bar_a_panel = Panel::named(BarLabel::A, "bar_a_panel")
99        .counter(Counter::new_with_defaults("bar_a_counter"));
100    bar_a_panel.add_gauge(Gauge::new_with_defaults("bar_a_gauge"));
101    bar_a_panel.add_meter(Meter::new_with_defaults("bar_a_meter"));
102    bar_a_panel.add_histogram(Histogram::new_with_defaults("bar_a_histogram"));
103
104    let mut bar_a_cockpit = Cockpit::without_name();
105    bar_a_cockpit.add_panel(bar_a_panel);
106
107    let mut bar_b_panel = Panel::new(BarLabel::B);
108    bar_b_panel.add_counter(Counter::new_with_defaults("bar_b_counter"));
109    bar_b_panel.add_gauge(Gauge::new_with_defaults("bar_b_gauge"));
110    bar_b_panel.add_meter(Meter::new_with_defaults("bar_b_meter"));
111    bar_b_panel.add_histogram(Histogram::new_with_defaults("bar_b_histogram"));
112
113    let mut bar_b_cockpit = Cockpit::new("bar_b_cockpit");
114    bar_b_cockpit.add_panel(bar_b_panel);
115
116    let mut bar_c_panel = Panel::named(BarLabel::C, "bar_c_panel");
117    bar_c_panel.add_counter(Counter::new_with_defaults("bar_c_counter"));
118    bar_c_panel.add_gauge(Gauge::new_with_defaults("bar_c_gauge"));
119    bar_c_panel.add_meter(Meter::new_with_defaults("bar_c_meter"));
120    bar_c_panel.add_histogram(Histogram::new_with_defaults("bar_c_histogram"));
121
122    let mut bar_c_cockpit = Cockpit::new("bar_c_cockpit");
123    bar_c_cockpit.add_panel(bar_c_panel);
124
125    let (tx, mut processor) = TelemetryProcessor::new_pair_without_name();
126
127    processor.add_cockpit(bar_a_cockpit);
128    processor.add_cockpit(bar_b_cockpit);
129    processor.add_cockpit(bar_c_cockpit);
130
131    let mut group_processor1 = ProcessorMount::default();
132    group_processor1.add_processor(processor);
133
134    let mut group_processor2 = ProcessorMount::default();
135    group_processor2.add_processor(group_processor1);
136    group_processor2.set_name("group_processor_2");
137
138    let polled_counter = PolledCounter::new();
139    let mut polled_instrument =
140        PollingInstrument::new_with_defaults("polled_instrument_2", polled_counter);
141    polled_instrument.set_title("The polled counter 2");
142    polled_instrument.set_description("A counter that is increased when a snapshot is polled");
143
144    group_processor2.add_snapshooter(polled_instrument);
145
146    (tx, group_processor2)
147}
Source

pub fn cockpit(self, cockpit: Cockpit<L>) -> Self

Add a Cockpit

If the cockpit has a name and another cockpit with the same name is already present the cockpit will not be added.

Source

pub fn get_cockpits(&self) -> Vec<&Cockpit<L>>

Returns all contained Cockpits

Source

pub fn add_handler<T: HandlesObservations<Label = L>>(&mut self, handler: T)

Add a (custom) handler for Observations.

Source

pub fn handler<T: HandlesObservations<Label = L>>(self, handler: T) -> Self

Add a (custom) handler for Observations.

Source

pub fn get_handlers(&self) -> Vec<&dyn HandlesObservations<Label = L>>

Returns all the handlers

Source

pub fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S)

Add a snapshooter that simply creates some Snapshot defined by its internal logic. Usually it polls something when a Snapshot is requested.

Source

pub fn snapshooter<S: PutsSnapshot>(self, snapshooter: S) -> Self

Add a snapshooter that simply creates some Snapshot defined by its internal logic. Usually it polls something when a Snapshot is requested.

Source

pub fn get_snapshooters(&self) -> Vec<&dyn PutsSnapshot>

Source

pub fn get_name(&self) -> Option<&str>

Source

pub fn set_name<T: Into<String>>(&mut self, name: T)

Sets the name, which will cause a grouping in the Snapshot.

Source

pub fn set_inactivity_limit(&mut self, limit: Duration)

Sets the maximum amount of time this processor may be inactive until no more snapshots are taken

Source

pub fn inactivity_limit(self, limit: Duration) -> Self

Sets the maximum amount of time this processor may be inactive until no more snapshots are taken

Source

pub fn set_show_activity_state(&mut self, show: bool)

Set whether to show if the processor is inactive or not if inactivity_limit is set.

The default is true. Only has an effect if an inactivity_limit is set.

Source

pub fn show_activity_state(self, show: bool) -> Self

Set whether to show if the processor is inactive or not if inactivity_limit is set.

The default is true. Only has an effect if an inactivity_limit is set.

Trait Implementations§

Source§

impl<L> Descriptive for TelemetryProcessor<L>

Source§

fn title(&self) -> Option<&str>

Source§

fn description(&self) -> Option<&str>

Source§

impl<L> ProcessesTelemetryMessages for TelemetryProcessor<L>
where L: Clone + Eq + Send + 'static,

Source§

fn process( &mut self, max: usize, strategy: ProcessingStrategy, ) -> ProcessingOutcome

Receive and handle pending operations
Source§

impl<L> PutsSnapshot for TelemetryProcessor<L>
where L: Clone + Eq + Send + 'static,

Source§

fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool)

Puts the current snapshot values into the given Snapshot thereby following the guidelines of PutsSnapshot.

Auto Trait Implementations§

§

impl<L> Freeze for TelemetryProcessor<L>

§

impl<L> !RefUnwindSafe for TelemetryProcessor<L>

§

impl<L> Send for TelemetryProcessor<L>
where L: Send,

§

impl<L> !Sync for TelemetryProcessor<L>

§

impl<L> Unpin for TelemetryProcessor<L>
where L: Unpin,

§

impl<L> !UnwindSafe for TelemetryProcessor<L>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V