1use std::{borrow::Cow, sync::RwLock};
30
31use measured::{
32 FixedCardinalityLabel, LabelGroup, MetricGroup,
33 label::{ComposedGroup, LabelGroupVisitor, LabelName, LabelValue, LabelVisitor, NoLabels},
34 metric::{
35 MetricEncoding,
36 counter::CounterState,
37 gauge::{FloatGaugeState, GaugeState},
38 group::Encoding,
39 name::MetricName,
40 },
41};
42use tokio::runtime::RuntimeMetrics;
43
44pub struct NamedRuntimesCollector {
46 runtimes: RwLock<Vec<RuntimeCollector>>,
47}
48
49impl NamedRuntimesCollector {
50 pub fn new() -> Self {
52 Self {
53 runtimes: RwLock::new(vec![]),
54 }
55 }
56
57 pub fn add(&self, rt: RuntimeMetrics, name: impl Into<Cow<'static, str>>) {
59 self.runtimes
60 .write()
61 .unwrap()
62 .push(RuntimeCollector::new(rt).with_name(name))
63 }
64
65 pub fn add_current(&self, name: impl Into<Cow<'static, str>>) {
74 self.add(tokio::runtime::Handle::current().metrics(), name);
75 }
76}
77
78impl Default for NamedRuntimesCollector {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl<Enc: Encoding> MetricGroup<Enc> for NamedRuntimesCollector
85where
86 CounterState: MetricEncoding<Enc>,
87 GaugeState: MetricEncoding<Enc>,
88 FloatGaugeState: MetricEncoding<Enc>,
89{
90 fn collect_group_into(&self, enc: &mut Enc) -> Result<(), <Enc as Encoding>::Err> {
91 collect(&self.runtimes.read().unwrap(), enc)
92 }
93}
94
95pub struct RuntimeCollector {
97 runtime: RuntimeMetrics,
98 name: RuntimeName,
99}
100
101impl RuntimeCollector {
102 pub fn new(runtime: RuntimeMetrics) -> Self {
104 RuntimeCollector {
105 runtime,
106 name: RuntimeName { name: None },
107 }
108 }
109
110 pub fn current() -> Self {
119 RuntimeCollector::new(tokio::runtime::Handle::current().metrics())
120 }
121
122 pub fn with_name(self, name: impl Into<Cow<'static, str>>) -> Self {
123 Self {
124 runtime: self.runtime,
125 name: RuntimeName {
126 name: Some(name.into()),
127 },
128 }
129 }
130}
131
/// Builds the `le` label for `bucket` of the runtime's poll-time histogram.
///
/// The label value is the upper bound of the bucket's range in seconds. An
/// upper bound equal to `Duration::from_nanos(u64::MAX)` is reported as
/// positive infinity instead.
#[cfg(tokio_unstable)]
fn histogram_le(rt: &RuntimeMetrics, bucket: usize) -> HistogramLabelLe {
    use std::time::Duration;

    let upper = rt.poll_time_histogram_bucket_range(bucket).end;
    let unbounded = upper == Duration::from_nanos(u64::MAX);
    HistogramLabelLe {
        le: if unbounded {
            f64::INFINITY
        } else {
            upper.as_secs_f64()
        },
    }
}
142
143fn collect<Enc: Encoding>(runtimes: &[RuntimeCollector], enc: &mut Enc) -> Result<(), Enc::Err>
144where
145 CounterState: MetricEncoding<Enc>,
146 GaugeState: MetricEncoding<Enc>,
147 FloatGaugeState: MetricEncoding<Enc>,
148{
149 macro_rules! metric {
150 ($name:literal, $help:literal, |$rt:ident| $expr:expr) => {{
151 #![allow(unused_macros)]
152 const NAME: &MetricName = MetricName::from_str($name);
153 enc.write_help(NAME, $help)?;
154 for rt in runtimes {
155 let rt_name = &rt.name;
156 macro_rules! write_counter {
157 ($labels:expr, $val:expr) => {
158 measured::metric::counter::write_counter(
159 enc,
160 NAME,
161 ComposedGroup(rt_name, $labels),
162 $val,
163 )?
164 };
165 ($suffix:expr, $labels:expr, $val:expr) => {
166 measured::metric::counter::write_counter(
167 enc,
168 NAME.with_suffix($suffix),
169 ComposedGroup(rt_name, $labels),
170 $val,
171 )?
172 };
173 }
174 macro_rules! write_gauge {
175 ($labels:expr, $val:expr) => {
176 measured::metric::gauge::write_gauge(
177 enc,
178 NAME,
179 ComposedGroup(rt_name, $labels),
180 $val,
181 )?
182 };
183 ($suffix:expr, $labels:expr, $val:expr) => {
184 measured::metric::gauge::write_gauge(
185 enc,
186 NAME.with_suffix($suffix),
187 ComposedGroup(rt_name, $labels),
188 $val,
189 )?
190 };
191 }
192 macro_rules! write_float_gauge {
193 ($labels:expr, $val:expr) => {
194 measured::metric::gauge::write_float_gauge(
195 enc,
196 NAME,
197 ComposedGroup(rt_name, $labels),
198 $val,
199 )?
200 };
201 ($suffix:expr, $labels:expr, $val:expr) => {
202 measured::metric::gauge::write_float_gauge(
203 enc,
204 NAME.with_suffix($suffix),
205 ComposedGroup(rt_name, $labels),
206 $val,
207 )?
208 };
209 }
210 let $rt = &rt.runtime;
211 ($expr)
212 }
213 }};
214 }
215
216 metric!(
217 "threads_total",
218 "number of threads used by the runtime",
219 |rt| {
220 write_gauge!(ThreadKind::Worker, rt.num_workers() as i64);
221
222 #[cfg(tokio_unstable)]
223 let idle = rt.num_idle_blocking_threads();
224
225 #[cfg(tokio_unstable)]
227 write_gauge!(
228 ThreadKind::Blocking,
229 rt.num_blocking_threads().saturating_sub(idle) as i64
230 );
231
232 #[cfg(tokio_unstable)]
233 write_gauge!(ThreadKind::BlockingIdle, idle as i64);
234 }
235 );
236
237 metric!(
238 "alive_tasks",
239 "number of live tasks spawned in the runtime",
240 |rt| write_gauge!(NoLabels, rt.num_alive_tasks() as i64)
241 );
242
243 #[cfg(tokio_unstable)]
244 metric!("tasks_total", "number of tasks", |rt| {
245 write_counter!(NoLabels, rt.spawned_tasks_count());
246 });
247
248 metric!(
249 "queued_tasks",
250 "number of tasks currently in a queue",
251 |rt| {
252 #[cfg(tokio_unstable)]
253 write_gauge!(QueueKind::Blocking, rt.blocking_queue_depth() as i64);
254
255 write_gauge!(QueueKind::Global, rt.global_queue_depth() as i64);
256
257 #[cfg(tokio_unstable)]
258 for worker in 0..rt.num_workers() {
259 let queue_depth = rt.worker_local_queue_depth(worker);
260 write_gauge!(QueueKind::Worker(worker), queue_depth as i64);
261 }
262 }
263 );
264
265 #[cfg(tokio_unstable)]
266 metric!(
267 "scheduled_tasks_total",
268 "total number of tasks scheduled into the runtime",
269 |rt| {
270 struct Overflow(bool);
271
272 impl LabelGroup for Overflow {
273 fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
274 const OVERFLOW: &LabelName = LabelName::from_str("overflow");
275 v.write_value(OVERFLOW, if self.0 { &Str("true") } else { &Str("false") });
276 }
277 }
278
279 struct Remote;
280
281 impl LabelGroup for Remote {
282 fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
283 const LE: &LabelName = LabelName::from_str("worker");
284 v.write_value(LE, &Str("remote"));
285 }
286 }
287
288 for worker in 0..rt.num_workers() {
289 write_counter!(
290 Worker(worker).compose_with(Overflow(false)),
291 rt.worker_local_schedule_count(worker)
292 );
293 write_counter!(
294 Worker(worker).compose_with(Overflow(true)),
295 rt.worker_overflow_count(worker)
296 );
297 }
298 write_counter!(
299 Remote.compose_with(Overflow(true)),
300 rt.remote_schedule_count()
301 );
302 }
303 );
304
305 #[cfg(tokio_unstable)]
306 metric!(
307 "budget_forced_yield_total",
308 "number of tasks forced to yield after exhausting their budget",
309 |rt| write_counter!(NoLabels, rt.budget_forced_yield_count())
310 );
311
312 #[cfg(tokio_unstable)]
313 metric!(
314 "worker_mean_poll_time_seconds",
315 "estimated weighted moving average of the poll time for this worker",
316 |rt| for worker in 0..rt.num_workers() {
317 let poll_time = rt.worker_mean_poll_time(worker);
318 write_float_gauge!(Worker(worker), poll_time.as_secs_f64());
319 }
320 );
321
322 #[cfg(tokio_unstable)]
323 metric!(
324 "worker_noop_total",
325 "number of times the given worker thread woke up with no work",
326 |rt| for worker in 0..rt.num_workers() {
327 let noops = rt.worker_noop_count(worker);
328 write_counter!(Worker(worker), noops);
329 }
330 );
331
332 metric!(
333 "worker_park_total",
334 "number of times the given worker thread has parked",
335 |rt| for worker in 0..rt.num_workers() {
336 let count = rt.worker_park_count(worker);
337 write_counter!(Worker(worker), count);
338 }
339 );
340
341 metric!(
342 "workers_park_unpark_total",
343 "number of times the given worker thread has parked and unparked",
344 |rt| for worker in 0..rt.num_workers() {
345 let count = rt.worker_park_unpark_count(worker);
346 write_counter!(Worker(worker), count);
347 }
348 );
349
350 #[cfg(tokio_unstable)]
351 metric!(
352 "worker_steal_total",
353 "number of tasks the given worker thread has stolen",
354 |rt| for worker in 0..rt.num_workers() {
355 let count = rt.worker_steal_count(worker);
356 write_counter!(Worker(worker), count);
357 }
358 );
359
360 #[cfg(tokio_unstable)]
361 metric!(
362 "worker_steal_operations_total",
363 "number of times the given worker thread has attempted to steal tasks",
364 |rt| for worker in 0..rt.num_workers() {
365 let count = rt.worker_steal_operations(worker);
366 write_counter!(Worker(worker), count);
367 }
368 );
369
370 metric!(
371 "worker_poll_time_seconds",
372 "time this runtime thread has spent polling tasks",
373 |rt| for worker in 0..rt.num_workers() {
374 use measured::metric::name::Sum;
375
376 let worker_label = Worker(worker);
377
378 #[cfg(tokio_unstable)]
379 {
380 use measured::metric::name::{Bucket, Count};
381 if rt.poll_time_histogram_enabled() {
382 let buckets = rt.poll_time_histogram_num_buckets();
383 let mut total = 0;
384 for bucket in 0..buckets {
385 let le = histogram_le(rt, bucket);
386 total += rt.poll_time_histogram_bucket_count(worker, bucket);
387 write_counter!(Bucket, worker_label.compose_with(le), total);
388 }
389 }
390
391 let count = rt.worker_poll_count(worker);
392 write_counter!(Count, worker_label, count);
393 }
394
395 let busy = rt.worker_total_busy_duration(worker);
396 write_float_gauge!(Sum, worker_label, busy.as_secs_f64());
397 }
398 );
399
400 #[cfg(tokio_unstable)]
401 #[cfg(feature = "net")]
402 {
403 metric!(
404 "registered_fds_total",
405 "total number of file descriptors that have been registered in the runtime",
406 |rt| write_counter!(NoLabels, rt.io_driver_fd_registered_count())
407 );
408 metric!(
409 "deregistered_fds_total",
410 "total number of file descriptors that have been deregistered from the runtime",
411 |rt| write_counter!(NoLabels, rt.io_driver_fd_deregistered_count())
412 );
413 metric!(
414 "io_ready_events_total",
415 "total number of ready events the runtime's IO driver has processed",
416 |rt| write_counter!(NoLabels, rt.io_driver_ready_count())
417 );
418 }
419
420 Ok(())
421}
422
423impl<Enc: Encoding> MetricGroup<Enc> for RuntimeCollector
424where
425 CounterState: MetricEncoding<Enc>,
426 GaugeState: MetricEncoding<Enc>,
427 FloatGaugeState: MetricEncoding<Enc>,
428{
429 fn collect_group_into(&self, enc: &mut Enc) -> Result<(), Enc::Err> {
430 collect(std::slice::from_ref(self), enc)
431 }
432}
433
434struct I64(i64);
435
436impl LabelValue for I64 {
437 fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
438 v.write_int(self.0)
439 }
440}
441
/// Label value that is written to the visitor as a floating-point number.
#[cfg(tokio_unstable)]
struct F64(f64);

#[cfg(tokio_unstable)]
impl LabelValue for F64 {
    fn visit<V>(&self, visitor: V) -> V::Output
    where
        V: LabelVisitor,
    {
        let F64(value) = *self;
        visitor.write_float(value)
    }
}
451
452#[derive(Copy, Clone)]
453struct Worker(usize);
454
455impl LabelGroup for Worker {
456 fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
457 const LE: &LabelName = LabelName::from_str("worker");
458 v.write_value(LE, &I64(self.0 as i64));
459 }
460}
461
/// Label group carrying a single `le` label holding a histogram bucket bound.
#[cfg(tokio_unstable)]
struct HistogramLabelLe {
    /// Value written for the `le` label.
    le: f64,
}

#[cfg(tokio_unstable)]
impl LabelGroup for HistogramLabelLe {
    fn visit_values(&self, visitor: &mut impl LabelGroupVisitor) {
        const LE: &LabelName = LabelName::from_str("le");
        let bound = F64(self.le);
        visitor.write_value(LE, &bound);
    }
}
474
475struct Str<'a>(&'a str);
476impl LabelValue for Str<'_> {
477 fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
478 v.write_str(self.0)
479 }
480}
481
482struct RuntimeName {
483 name: Option<Cow<'static, str>>,
484}
485
486impl LabelGroup for RuntimeName {
487 fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
488 const LE: &LabelName = LabelName::from_str("runtime");
489 if let Some(name) = self.name.as_deref() {
490 v.write_value(LE, &Str(name));
491 }
492 }
493}
494
495#[derive(FixedCardinalityLabel, Clone, Copy)]
496#[label(singleton = "kind")]
497enum ThreadKind {
498 Worker,
499 BlockingIdle,
500 Blocking,
501}
502
503#[allow(unused)]
504enum QueueKind {
505 Worker(usize),
506 Blocking,
507 Global,
508}
509
510#[automatically_derived]
511impl LabelValue for QueueKind {
512 fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
513 match self {
514 QueueKind::Worker(i) => v.write_str(itoa::Buffer::new().format(*i)),
515 QueueKind::Blocking => v.write_str("blocking"),
516 QueueKind::Global => v.write_str("global"),
517 }
518 }
519}
520
521impl LabelGroup for QueueKind {
522 fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
523 const NAME: &LabelName = LabelName::from_str("kind");
524 v.write_value(NAME, self);
525 }
526}
527
528