1extern crate metrix;
2
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::thread;
5use std::time::{Duration, Instant};
6
7use metrix::cockpit::*;
8use metrix::driver::*;
9use metrix::instruments::polled::*;
10use metrix::instruments::*;
11use metrix::processor::*;
12use metrix::snapshot::*;
13use metrix::*;
14
/// Labels for observations sent through the "foo" transmitter.
///
/// Each variant is the label of one panel built in `create_foo_metrics`.
/// Besides `Clone`/`PartialEq`/`Eq`, the enum derives `Copy` (it has no
/// payload, so labels can be passed by value freely) and `Debug` (so labels
/// can be printed and used in `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FooLabel {
    /// Label of the panel named "foo_a_panel".
    A,
    /// Label of the second foo panel, which is created without a name.
    B,
}
20
/// Labels for observations sent through the "bar" transmitter.
///
/// Each variant is the label of one panel built in `create_bar_metrics`.
/// Besides `Clone`/`PartialEq`/`Eq`, the enum derives `Copy` (it has no
/// payload, so labels can be passed by value freely) and `Debug` (so labels
/// can be printed and used in `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BarLabel {
    /// Label of the panel named "bar_a_panel".
    A,
    /// Label of the second bar panel, which is created without a name.
    B,
    /// Label of the panel named "bar_c_panel".
    C,
}
27
/// A counter that is meant to be read by polling.
///
/// `Default` yields a counter starting at zero (the default of
/// `AtomicUsize`), and `Debug` allows the current state to be printed.
#[derive(Debug, Default)]
struct PolledCounter {
    /// The current count; atomic so it can be updated through `&self`.
    counter: AtomicUsize,
}
31
32impl PolledCounter {
33 pub fn new() -> PolledCounter {
34 PolledCounter {
35 counter: AtomicUsize::new(0),
36 }
37 }
38}
39
40impl PutsSnapshot for PolledCounter {
41 fn put_snapshot(&self, into: &mut Snapshot, _descriptive: bool) {
42 let v = self.counter.fetch_add(1, Ordering::SeqCst);
43 into.items
44 .push(("polled_counter".into(), ItemKind::UInt(v as u64)));
45 }
46}
47
48fn create_foo_metrics() -> (TelemetryTransmitter<FooLabel>, ProcessorMount) {
49 let mut foo_a_panel = Panel::named(FooLabel::A, "foo_a_panel");
50 foo_a_panel.add_counter(Counter::new_with_defaults("foo_a_counter"));
51 let mut gauge = Gauge::new_with_defaults("foo_a_gauge");
52 gauge.set_title("title");
53 gauge.set_description("description");
54 foo_a_panel.add_gauge(gauge);
55 foo_a_panel.add_meter(Meter::new_with_defaults("foo_a_meter"));
56 foo_a_panel.add_histogram(Histogram::new_with_defaults("foo_a_histogram"));
57 foo_a_panel.set_title("foo_1_panel_title");
58 foo_a_panel.set_description("foo_a_panel_description");
59
60 let mut foo_b_panel = Panel::new(FooLabel::B);
61 foo_b_panel.add_counter(Counter::new_with_defaults("foo_b_counter"));
62 let mut gauge = Gauge::new_with_defaults("foo_b_gauge").tracking(15);
63 gauge.set_title("title");
64 gauge.set_description("description");
65 foo_b_panel.add_gauge(gauge);
66 foo_b_panel.add_meter(Meter::new_with_defaults("foo_b_meter"));
67 foo_b_panel.add_histogram(Histogram::new_with_defaults("foo_b_histogram"));
68 foo_b_panel.set_title("foo_b_panel_title");
69 foo_b_panel.set_description("foo_b_panel_description");
70
71 let polled_counter = PolledCounter::new();
72 let mut polled_instrument =
73 PollingInstrument::new_with_defaults("polled_instrument_1", polled_counter);
74 polled_instrument.set_title("The polled counter 1");
75 polled_instrument.set_description("A counter that is increased when a snapshot is polled");
76 foo_b_panel.add_snapshooter(polled_instrument);
77
78 let staircase_timer = StaircaseTimer::new("staircase");
79 foo_b_panel.add_instrument(staircase_timer);
80
81 let mut cockpit = Cockpit::new("foo_cockpit");
82 cockpit.add_panel(foo_a_panel);
83 cockpit.add_panel(foo_b_panel);
84 cockpit.set_title("foo_cockpit_title");
85 cockpit.set_description("foo_cockpit_description");
86
87 let (tx, mut processor) = TelemetryProcessor::new_pair("processor_foo");
88
89 processor.add_cockpit(cockpit);
90
91 let mut group_processor = ProcessorMount::default();
92 group_processor.add_processor(processor);
93
94 (tx, group_processor)
95}
96
97fn create_bar_metrics() -> (TelemetryTransmitter<BarLabel>, ProcessorMount) {
98 let mut bar_a_panel = Panel::named(BarLabel::A, "bar_a_panel")
99 .counter(Counter::new_with_defaults("bar_a_counter"));
100 bar_a_panel.add_gauge(Gauge::new_with_defaults("bar_a_gauge"));
101 bar_a_panel.add_meter(Meter::new_with_defaults("bar_a_meter"));
102 bar_a_panel.add_histogram(Histogram::new_with_defaults("bar_a_histogram"));
103
104 let mut bar_a_cockpit = Cockpit::without_name();
105 bar_a_cockpit.add_panel(bar_a_panel);
106
107 let mut bar_b_panel = Panel::new(BarLabel::B);
108 bar_b_panel.add_counter(Counter::new_with_defaults("bar_b_counter"));
109 bar_b_panel.add_gauge(Gauge::new_with_defaults("bar_b_gauge"));
110 bar_b_panel.add_meter(Meter::new_with_defaults("bar_b_meter"));
111 bar_b_panel.add_histogram(Histogram::new_with_defaults("bar_b_histogram"));
112
113 let mut bar_b_cockpit = Cockpit::new("bar_b_cockpit");
114 bar_b_cockpit.add_panel(bar_b_panel);
115
116 let mut bar_c_panel = Panel::named(BarLabel::C, "bar_c_panel");
117 bar_c_panel.add_counter(Counter::new_with_defaults("bar_c_counter"));
118 bar_c_panel.add_gauge(Gauge::new_with_defaults("bar_c_gauge"));
119 bar_c_panel.add_meter(Meter::new_with_defaults("bar_c_meter"));
120 bar_c_panel.add_histogram(Histogram::new_with_defaults("bar_c_histogram"));
121
122 let mut bar_c_cockpit = Cockpit::new("bar_c_cockpit");
123 bar_c_cockpit.add_panel(bar_c_panel);
124
125 let (tx, mut processor) = TelemetryProcessor::new_pair_without_name();
126
127 processor.add_cockpit(bar_a_cockpit);
128 processor.add_cockpit(bar_b_cockpit);
129 processor.add_cockpit(bar_c_cockpit);
130
131 let mut group_processor1 = ProcessorMount::default();
132 group_processor1.add_processor(processor);
133
134 let mut group_processor2 = ProcessorMount::default();
135 group_processor2.add_processor(group_processor1);
136 group_processor2.set_name("group_processor_2");
137
138 let polled_counter = PolledCounter::new();
139 let mut polled_instrument =
140 PollingInstrument::new_with_defaults("polled_instrument_2", polled_counter);
141 polled_instrument.set_title("The polled counter 2");
142 polled_instrument.set_description("A counter that is increased when a snapshot is polled");
143
144 group_processor2.add_snapshooter(polled_instrument);
145
146 (tx, group_processor2)
147}
148
149fn main() {
150 let builder = DriverBuilder::new("demo");
151 let mut driver = builder.build();
152 let (foo_transmitter, foo_processor) = create_foo_metrics();
156 let (bar_transmitter, bar_processor) = create_bar_metrics();
157
158 driver.add_processor(foo_processor);
159 driver.add_processor(bar_processor);
160
161 let polled_counter = PolledCounter::new();
162 let mut polled_instrument =
163 PollingInstrument::new_with_defaults("polled_instrument_3", polled_counter);
164 polled_instrument.set_title("The polled counter 3");
165 polled_instrument.set_description("A counter that is increased when a snapshot is polled");
166
167 driver.add_snapshooter(polled_instrument);
168
169 let start = Instant::now();
170
171 let handle1 = {
172 let foo_transmitter = foo_transmitter.clone();
173 let bar_transmitter = bar_transmitter.clone();
174
175 thread::spawn(move || {
176 for n in 0..5_000_000 {
177 foo_transmitter.observed_one_value(FooLabel::A, n, Instant::now());
178 bar_transmitter.measure_time(BarLabel::C, start);
179 }
180 })
181 };
182
183 let _ = driver.snapshot(true).unwrap();
185
186 let handle2 = {
187 let foo_transmitter = foo_transmitter;
188 let bar_transmitter = bar_transmitter.clone();
189
190 thread::spawn(move || {
191 for n in 0..5_000_000u64 {
192 foo_transmitter.observed_one_value(FooLabel::B, n, Instant::now());
193 bar_transmitter.observed_one_value(BarLabel::B, n * n, Instant::now());
194 }
195 })
196 };
197
198 let _ = driver.snapshot(true).unwrap();
200
201 let handle3 = {
202 let bar_transmitter = bar_transmitter;
203
204 thread::spawn(move || {
205 for i in 0..5_000_000 {
206 bar_transmitter.observed_one_value(BarLabel::A, i, Instant::now());
207 }
208 })
209 };
210
211 handle1.join().unwrap();
212 handle2.join().unwrap();
213 handle3.join().unwrap();
214
215 println!(
218 "Sending observations took {:?}. Sleeping 1 secs to collect remaining data. \
219 Depending on your machine you might see that not all metrics have a count \
220 of 5 million observations.",
221 start.elapsed()
222 );
223
224 thread::sleep(Duration::from_secs(1));
225
226 println!("\n\n\n=======================\n\n");
227
228 println!(
229 "Get snapshot. If it still blocks here there are still many messages to be processed..."
230 );
231
232 println!("\n\n\n=======================\n\n");
233
234 let snapshot = driver.snapshot(true).unwrap();
235
236 let mut config = JsonConfig::default();
237 config.pretty = Some(4);
238
239 println!("{:?}", snapshot);
240 println!("\n\n\n=======================\n\n");
241 println!("{}", snapshot.to_json(&config));
242}