demo_run/
demo_run.rs

1extern crate metrix;
2
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::thread;
5use std::time::{Duration, Instant};
6
7use metrix::cockpit::*;
8use metrix::driver::*;
9use metrix::instruments::polled::*;
10use metrix::instruments::*;
11use metrix::processor::*;
12use metrix::snapshot::*;
13use metrix::*;
14
15#[derive(Clone, PartialEq, Eq)]
16enum FooLabel {
17    A,
18    B,
19}
20
21#[derive(Clone, PartialEq, Eq)]
22enum BarLabel {
23    A,
24    B,
25    C,
26}
27
28struct PolledCounter {
29    counter: AtomicUsize,
30}
31
32impl PolledCounter {
33    pub fn new() -> PolledCounter {
34        PolledCounter {
35            counter: AtomicUsize::new(0),
36        }
37    }
38}
39
40impl PutsSnapshot for PolledCounter {
41    fn put_snapshot(&self, into: &mut Snapshot, _descriptive: bool) {
42        let v = self.counter.fetch_add(1, Ordering::SeqCst);
43        into.items
44            .push(("polled_counter".into(), ItemKind::UInt(v as u64)));
45    }
46}
47
48fn create_foo_metrics() -> (TelemetryTransmitter<FooLabel>, ProcessorMount) {
49    let mut foo_a_panel = Panel::named(FooLabel::A, "foo_a_panel");
50    foo_a_panel.add_counter(Counter::new_with_defaults("foo_a_counter"));
51    let mut gauge = Gauge::new_with_defaults("foo_a_gauge");
52    gauge.set_title("title");
53    gauge.set_description("description");
54    foo_a_panel.add_gauge(gauge);
55    foo_a_panel.add_meter(Meter::new_with_defaults("foo_a_meter"));
56    foo_a_panel.add_histogram(Histogram::new_with_defaults("foo_a_histogram"));
57    foo_a_panel.set_title("foo_1_panel_title");
58    foo_a_panel.set_description("foo_a_panel_description");
59
60    let mut foo_b_panel = Panel::new(FooLabel::B);
61    foo_b_panel.add_counter(Counter::new_with_defaults("foo_b_counter"));
62    let mut gauge = Gauge::new_with_defaults("foo_b_gauge").tracking(15);
63    gauge.set_title("title");
64    gauge.set_description("description");
65    foo_b_panel.add_gauge(gauge);
66    foo_b_panel.add_meter(Meter::new_with_defaults("foo_b_meter"));
67    foo_b_panel.add_histogram(Histogram::new_with_defaults("foo_b_histogram"));
68    foo_b_panel.set_title("foo_b_panel_title");
69    foo_b_panel.set_description("foo_b_panel_description");
70
71    let polled_counter = PolledCounter::new();
72    let mut polled_instrument =
73        PollingInstrument::new_with_defaults("polled_instrument_1", polled_counter);
74    polled_instrument.set_title("The polled counter 1");
75    polled_instrument.set_description("A counter that is increased when a snapshot is polled");
76    foo_b_panel.add_snapshooter(polled_instrument);
77
78    let staircase_timer = StaircaseTimer::new("staircase");
79    foo_b_panel.add_instrument(staircase_timer);
80
81    let mut cockpit = Cockpit::new("foo_cockpit");
82    cockpit.add_panel(foo_a_panel);
83    cockpit.add_panel(foo_b_panel);
84    cockpit.set_title("foo_cockpit_title");
85    cockpit.set_description("foo_cockpit_description");
86
87    let (tx, mut processor) = TelemetryProcessor::new_pair("processor_foo");
88
89    processor.add_cockpit(cockpit);
90
91    let mut group_processor = ProcessorMount::default();
92    group_processor.add_processor(processor);
93
94    (tx, group_processor)
95}
96
97fn create_bar_metrics() -> (TelemetryTransmitter<BarLabel>, ProcessorMount) {
98    let mut bar_a_panel = Panel::named(BarLabel::A, "bar_a_panel")
99        .counter(Counter::new_with_defaults("bar_a_counter"));
100    bar_a_panel.add_gauge(Gauge::new_with_defaults("bar_a_gauge"));
101    bar_a_panel.add_meter(Meter::new_with_defaults("bar_a_meter"));
102    bar_a_panel.add_histogram(Histogram::new_with_defaults("bar_a_histogram"));
103
104    let mut bar_a_cockpit = Cockpit::without_name();
105    bar_a_cockpit.add_panel(bar_a_panel);
106
107    let mut bar_b_panel = Panel::new(BarLabel::B);
108    bar_b_panel.add_counter(Counter::new_with_defaults("bar_b_counter"));
109    bar_b_panel.add_gauge(Gauge::new_with_defaults("bar_b_gauge"));
110    bar_b_panel.add_meter(Meter::new_with_defaults("bar_b_meter"));
111    bar_b_panel.add_histogram(Histogram::new_with_defaults("bar_b_histogram"));
112
113    let mut bar_b_cockpit = Cockpit::new("bar_b_cockpit");
114    bar_b_cockpit.add_panel(bar_b_panel);
115
116    let mut bar_c_panel = Panel::named(BarLabel::C, "bar_c_panel");
117    bar_c_panel.add_counter(Counter::new_with_defaults("bar_c_counter"));
118    bar_c_panel.add_gauge(Gauge::new_with_defaults("bar_c_gauge"));
119    bar_c_panel.add_meter(Meter::new_with_defaults("bar_c_meter"));
120    bar_c_panel.add_histogram(Histogram::new_with_defaults("bar_c_histogram"));
121
122    let mut bar_c_cockpit = Cockpit::new("bar_c_cockpit");
123    bar_c_cockpit.add_panel(bar_c_panel);
124
125    let (tx, mut processor) = TelemetryProcessor::new_pair_without_name();
126
127    processor.add_cockpit(bar_a_cockpit);
128    processor.add_cockpit(bar_b_cockpit);
129    processor.add_cockpit(bar_c_cockpit);
130
131    let mut group_processor1 = ProcessorMount::default();
132    group_processor1.add_processor(processor);
133
134    let mut group_processor2 = ProcessorMount::default();
135    group_processor2.add_processor(group_processor1);
136    group_processor2.set_name("group_processor_2");
137
138    let polled_counter = PolledCounter::new();
139    let mut polled_instrument =
140        PollingInstrument::new_with_defaults("polled_instrument_2", polled_counter);
141    polled_instrument.set_title("The polled counter 2");
142    polled_instrument.set_description("A counter that is increased when a snapshot is polled");
143
144    group_processor2.add_snapshooter(polled_instrument);
145
146    (tx, group_processor2)
147}
148
149fn main() {
150    let builder = DriverBuilder::new("demo");
151    let mut driver = builder.build();
152    //driver.change_processing_stragtegy(ProcessingStrategy::DropAll);
153    //driver.pause();
154
155    let (foo_transmitter, foo_processor) = create_foo_metrics();
156    let (bar_transmitter, bar_processor) = create_bar_metrics();
157
158    driver.add_processor(foo_processor);
159    driver.add_processor(bar_processor);
160
161    let polled_counter = PolledCounter::new();
162    let mut polled_instrument =
163        PollingInstrument::new_with_defaults("polled_instrument_3", polled_counter);
164    polled_instrument.set_title("The polled counter 3");
165    polled_instrument.set_description("A counter that is increased when a snapshot is polled");
166
167    driver.add_snapshooter(polled_instrument);
168
169    let start = Instant::now();
170
171    let handle1 = {
172        let foo_transmitter = foo_transmitter.clone();
173        let bar_transmitter = bar_transmitter.clone();
174
175        thread::spawn(move || {
176            for n in 0..5_000_000 {
177                foo_transmitter.observed_one_value(FooLabel::A, n, Instant::now());
178                bar_transmitter.measure_time(BarLabel::C, start);
179            }
180        })
181    };
182
183    // Poll a snapshot for the counter
184    let _ = driver.snapshot(true).unwrap();
185
186    let handle2 = {
187        let foo_transmitter = foo_transmitter;
188        let bar_transmitter = bar_transmitter.clone();
189
190        thread::spawn(move || {
191            for n in 0..5_000_000u64 {
192                foo_transmitter.observed_one_value(FooLabel::B, n, Instant::now());
193                bar_transmitter.observed_one_value(BarLabel::B, n * n, Instant::now());
194            }
195        })
196    };
197
198    // Poll a snapshot for the counter
199    let _ = driver.snapshot(true).unwrap();
200
201    let handle3 = {
202        let bar_transmitter = bar_transmitter;
203
204        thread::spawn(move || {
205            for i in 0..5_000_000 {
206                bar_transmitter.observed_one_value(BarLabel::A, i, Instant::now());
207            }
208        })
209    };
210
211    handle1.join().unwrap();
212    handle2.join().unwrap();
213    handle3.join().unwrap();
214
215    //driver.resume();
216
217    println!(
218        "Sending observations took {:?}. Sleeping 1 secs to collect remaining data. \
219         Depending on your machine you might see that not all metrics have a count \
220         of 5 million observations.",
221        start.elapsed()
222    );
223
224    thread::sleep(Duration::from_secs(1));
225
226    println!("\n\n\n=======================\n\n");
227
228    println!(
229        "Get snapshot. If it still blocks here there are still many messages to be processed..."
230    );
231
232    println!("\n\n\n=======================\n\n");
233
234    let snapshot = driver.snapshot(true).unwrap();
235
236    let mut config = JsonConfig::default();
237    config.pretty = Some(4);
238
239    println!("{:?}", snapshot);
240    println!("\n\n\n=======================\n\n");
241    println!("{}", snapshot.to_json(&config));
242}