pub struct TelemetryDriver { /* private fields */ }
Expand description
Triggers registered `ProcessesTelemetryMessages` to poll for messages.
Runs its own background thread. The thread stops once this struct is dropped.
A `TelemetryDriver` can be ‘mounted’ into the hierarchy.
If it is, it will still poll its children independently on its own thread.
§Optional Metrics
The driver can be configured to collect metrics on its own activities.
The metrics will be added to all snapshots under a field named `_metrix`, which contains the following fields:
- `collections_per_second`: The number of observation collection runs done per second.
- `collection_times_us`: A histogram of the time each observation collection took in microseconds.
- `observations_processed_per_second`: The number of observations processed per second.
- `observations_processed_per_collection`: A histogram of the number of observations processed during each run.
- `observations_dropped_per_second`: The number of observations dropped per second. See also `max_observation_age`.
- `observations_dropped_per_collection`: A histogram of the number of observations dropped during each run. See also `max_observation_age`.
- `snapshots_per_second`: The number of snapshots taken per second.
- `snapshots_times_us`: A histogram of the times it took to take a snapshot in microseconds.
- `dropped_observations_alarm`: Will be `true` if observations have been dropped. Will by default stay `true` for 60 seconds once triggered.
- `inactivity_alarm`: Will be `true` if no observations have been made for a certain amount of time. The default is 60 seconds.
Implementations§
Source§impl TelemetryDriver
impl TelemetryDriver
Sourcepub fn new(
name: Option<String>,
title: Option<String>,
description: Option<String>,
processing_strategy: ProcessingStrategy,
with_driver_metrics: bool,
) -> TelemetryDriver
pub fn new( name: Option<String>, title: Option<String>, description: Option<String>, processing_strategy: ProcessingStrategy, with_driver_metrics: bool, ) -> TelemetryDriver
Creates a new `TelemetryDriver`.
`max_observation_age` is the maximum age of an `Observation` to be taken into account. This is determined by the `timestamp` field of an `Observation`. `Observations` that are too old are simply dropped. The default is 60 seconds.
Sourcepub fn change_processing_stragtegy(&self, strategy: ProcessingStrategy)
pub fn change_processing_stragtegy(&self, strategy: ProcessingStrategy)
Changes the ProcessingStrategy
Sourcepub fn snapshot(&self, descriptive: bool) -> Result<Snapshot, GetSnapshotError>
pub fn snapshot(&self, descriptive: bool) -> Result<Snapshot, GetSnapshotError>
Examples found in repository?
149fn main() {
150 let builder = DriverBuilder::new("demo");
151 let mut driver = builder.build();
152 //driver.change_processing_stragtegy(ProcessingStrategy::DropAll);
153 //driver.pause();
154
155 let (foo_transmitter, foo_processor) = create_foo_metrics();
156 let (bar_transmitter, bar_processor) = create_bar_metrics();
157
158 driver.add_processor(foo_processor);
159 driver.add_processor(bar_processor);
160
161 let polled_counter = PolledCounter::new();
162 let mut polled_instrument =
163 PollingInstrument::new_with_defaults("polled_instrument_3", polled_counter);
164 polled_instrument.set_title("The polled counter 3");
165 polled_instrument.set_description("A counter that is increased when a snapshot is polled");
166
167 driver.add_snapshooter(polled_instrument);
168
169 let start = Instant::now();
170
171 let handle1 = {
172 let foo_transmitter = foo_transmitter.clone();
173 let bar_transmitter = bar_transmitter.clone();
174
175 thread::spawn(move || {
176 for n in 0..5_000_000 {
177 foo_transmitter.observed_one_value(FooLabel::A, n, Instant::now());
178 bar_transmitter.measure_time(BarLabel::C, start);
179 }
180 })
181 };
182
183 // Poll a snapshot for the counter
184 let _ = driver.snapshot(true).unwrap();
185
186 let handle2 = {
187 let foo_transmitter = foo_transmitter;
188 let bar_transmitter = bar_transmitter.clone();
189
190 thread::spawn(move || {
191 for n in 0..5_000_000u64 {
192 foo_transmitter.observed_one_value(FooLabel::B, n, Instant::now());
193 bar_transmitter.observed_one_value(BarLabel::B, n * n, Instant::now());
194 }
195 })
196 };
197
198 // Poll a snapshot for the counter
199 let _ = driver.snapshot(true).unwrap();
200
201 let handle3 = {
202 let bar_transmitter = bar_transmitter;
203
204 thread::spawn(move || {
205 for i in 0..5_000_000 {
206 bar_transmitter.observed_one_value(BarLabel::A, i, Instant::now());
207 }
208 })
209 };
210
211 handle1.join().unwrap();
212 handle2.join().unwrap();
213 handle3.join().unwrap();
214
215 //driver.resume();
216
217 println!(
218 "Sending observations took {:?}. Sleeping 1 secs to collect remaining data. \
219 Depending on your machine you might see that not all metrics have a count \
220 of 5 million observations.",
221 start.elapsed()
222 );
223
224 thread::sleep(Duration::from_secs(1));
225
226 println!("\n\n\n=======================\n\n");
227
228 println!(
229 "Get snapshot. If it still blocks here there are still many messages to be processed..."
230 );
231
232 println!("\n\n\n=======================\n\n");
233
234 let snapshot = driver.snapshot(true).unwrap();
235
236 let mut config = JsonConfig::default();
237 config.pretty = Some(4);
238
239 println!("{:?}", snapshot);
240 println!("\n\n\n=======================\n\n");
241 println!("{}", snapshot.to_json(&config));
242}
pub fn snapshot_async( &self, descriptive: bool, ) -> impl Future<Output = Result<Snapshot, GetSnapshotError>> + Send + 'static
Trait Implementations§
Source§impl AggregatesProcessors for TelemetryDriver
impl AggregatesProcessors for TelemetryDriver
Source§fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P)
fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P)
Source§fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S)
fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S)
fn attached_mount(&mut self, mount: ProcessorMount) -> AttachedMount
Source§impl Clone for TelemetryDriver
impl Clone for TelemetryDriver
Source§fn clone(&self) -> TelemetryDriver
fn clone(&self) -> TelemetryDriver
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
Source§impl Default for TelemetryDriver
impl Default for TelemetryDriver
Source§fn default() -> TelemetryDriver
fn default() -> TelemetryDriver
Source§impl Descriptive for TelemetryDriver
impl Descriptive for TelemetryDriver
Source§impl ProcessesTelemetryMessages for TelemetryDriver
impl ProcessesTelemetryMessages for TelemetryDriver
Source§fn process(
&mut self,
_max: usize,
_strategy: ProcessingStrategy,
) -> ProcessingOutcome
fn process( &mut self, _max: usize, _strategy: ProcessingStrategy, ) -> ProcessingOutcome
Receive and handle pending operations
Source§impl PutsSnapshot for TelemetryDriver
impl PutsSnapshot for TelemetryDriver
Source§fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool)
fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool)
Puts the current snapshot values into the given `Snapshot`, thereby following the guidelines of `PutsSnapshot`.