rw_deno_core/
ops_metrics.rs

1// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
2
3use crate::ops::OpCtx;
4use crate::serde::Serialize;
5use crate::OpDecl;
6use crate::OpId;
7use std::cell::Ref;
8use std::cell::RefCell;
9use std::cell::RefMut;
10use std::rc::Rc;
11
12/// The type of op metrics event.
13#[derive(Copy, Clone, Debug, Eq, PartialEq)]
14#[repr(u8)]
15pub enum OpMetricsEvent {
16  /// Entered an op dispatch.
17  Dispatched,
18  /// Left an op synchronously.
19  Completed,
20  /// Left an op asynchronously.
21  CompletedAsync,
22  /// Left an op synchronously with an exception.
23  Error,
24  /// Left an op asynchronously with an exception.
25  ErrorAsync,
26}
27
28#[derive(Copy, Clone, Debug, Eq, PartialEq)]
29#[repr(u8)]
30pub enum OpMetricsSource {
31  Slow,
32  Fast,
33  Async,
34}
35
36/// A callback to receieve an [`OpMetricsEvent`].
37pub type OpMetricsFn = Rc<dyn Fn(&OpCtx, OpMetricsEvent, OpMetricsSource)>;
38
39// TODO(mmastrac): this would be better as a trait
40/// A callback to retrieve an optional [`OpMetricsFn`] for this op.
41pub type OpMetricsFactoryFn =
42  Box<dyn Fn(OpId, usize, &OpDecl) -> Option<OpMetricsFn>>;
43
44/// Given two [`OpMetricsFactoryFn`] implementations, merges them so that op metric events are
45/// called on both.
46pub fn merge_op_metrics(
47  fn1: impl Fn(OpId, usize, &OpDecl) -> Option<OpMetricsFn> + 'static,
48  fn2: impl Fn(OpId, usize, &OpDecl) -> Option<OpMetricsFn> + 'static,
49) -> OpMetricsFactoryFn {
50  Box::new(move |op, count, decl| {
51    match (fn1(op, count, decl), fn2(op, count, decl)) {
52      (None, None) => None,
53      (Some(a), None) => Some(a),
54      (None, Some(b)) => Some(b),
55      (Some(a), Some(b)) => Some(Rc::new(move |ctx, event, source| {
56        a(ctx, event, source);
57        b(ctx, event, source);
58      })),
59    }
60  })
61}
62
63#[doc(hidden)]
64pub fn dispatch_metrics_fast(opctx: &OpCtx, metrics: OpMetricsEvent) {
65  // SAFETY: this should only be called from ops where we know the function is Some
66  unsafe {
67    (opctx.metrics_fn.as_ref().unwrap_unchecked())(
68      opctx,
69      metrics,
70      OpMetricsSource::Fast,
71    )
72  }
73}
74
75#[doc(hidden)]
76pub fn dispatch_metrics_slow(opctx: &OpCtx, metrics: OpMetricsEvent) {
77  // SAFETY: this should only be called from ops where we know the function is Some
78  unsafe {
79    (opctx.metrics_fn.as_ref().unwrap_unchecked())(
80      opctx,
81      metrics,
82      OpMetricsSource::Slow,
83    )
84  }
85}
86
87#[doc(hidden)]
88pub fn dispatch_metrics_async(opctx: &OpCtx, metrics: OpMetricsEvent) {
89  // SAFETY: this should only be called from ops where we know the function is Some
90  unsafe {
91    (opctx.metrics_fn.as_ref().unwrap_unchecked())(
92      opctx,
93      metrics,
94      OpMetricsSource::Async,
95    )
96  }
97}
98
99/// Used for both aggregate and per-op metrics.
100#[derive(Clone, Default, Debug, Serialize, PartialEq, Eq)]
101#[serde(rename_all = "camelCase")]
102pub struct OpMetricsSummary {
103  // The number of ops dispatched synchronously
104  pub ops_dispatched_sync: u64,
105  // The number of ops dispatched asynchronously
106  pub ops_dispatched_async: u64,
107  // The number of sync ops dispatched fast
108  pub ops_dispatched_fast: u64,
109  // The number of asynchronously-dispatch ops completed
110  pub ops_completed_async: u64,
111}
112
113impl OpMetricsSummary {
114  /// Does this op have outstanding async op dispatches?
115  pub fn has_outstanding_ops(&self) -> bool {
116    self.ops_dispatched_async > self.ops_completed_async
117  }
118}
119
120#[derive(Default, Debug)]
121pub struct OpMetricsSummaryTracker {
122  ops: RefCell<Vec<OpMetricsSummary>>,
123}
124
125impl OpMetricsSummaryTracker {
126  pub fn per_op(&self) -> Ref<'_, Vec<OpMetricsSummary>> {
127    self.ops.borrow()
128  }
129
130  pub fn aggregate(&self) -> OpMetricsSummary {
131    let mut sum = OpMetricsSummary::default();
132
133    for metrics in self.ops.borrow().iter() {
134      sum.ops_dispatched_sync += metrics.ops_dispatched_sync;
135      sum.ops_dispatched_fast += metrics.ops_dispatched_fast;
136      sum.ops_dispatched_async += metrics.ops_dispatched_async;
137      sum.ops_completed_async += metrics.ops_completed_async;
138    }
139
140    sum
141  }
142
143  #[inline]
144  fn metrics_mut(&self, id: OpId) -> RefMut<OpMetricsSummary> {
145    RefMut::map(self.ops.borrow_mut(), |ops| &mut ops[id as usize])
146  }
147
148  /// Returns a [`OpMetricsFn`] for this tracker.
149  fn op_metrics_fn(self: Rc<Self>) -> OpMetricsFn {
150    Rc::new(move |ctx, event, source| match event {
151      OpMetricsEvent::Dispatched => {
152        let mut m = self.metrics_mut(ctx.id);
153        if source == OpMetricsSource::Fast {
154          m.ops_dispatched_fast += 1;
155        }
156        if ctx.decl.is_async {
157          m.ops_dispatched_async += 1;
158        } else {
159          m.ops_dispatched_sync += 1;
160        }
161      }
162      OpMetricsEvent::Completed
163      | OpMetricsEvent::Error
164      | OpMetricsEvent::CompletedAsync
165      | OpMetricsEvent::ErrorAsync => {
166        if ctx.decl.is_async {
167          self.metrics_mut(ctx.id).ops_completed_async += 1;
168        }
169      }
170    })
171  }
172
173  /// Retrieves the metrics factory function for this tracker.
174  pub fn op_metrics_factory_fn(
175    self: Rc<Self>,
176    op_enabled: impl Fn(&OpDecl) -> bool + 'static,
177  ) -> OpMetricsFactoryFn {
178    Box::new(move |_, total, op| {
179      let mut ops = self.ops.borrow_mut();
180      if ops.capacity() == 0 {
181        ops.reserve_exact(total);
182      }
183      ops.push(OpMetricsSummary::default());
184      if op_enabled(op) {
185        Some(self.clone().op_metrics_fn())
186      } else {
187        None
188      }
189    })
190  }
191}