1use std::{
4 borrow::Cow,
5 future::Future,
6 ops::{Deref, DerefMut},
7 pin::Pin,
8 sync::Arc,
9 task::{ready, Context, Poll},
10 time::Instant,
11};
12
13use opentelemetry::{Key, KeyValue};
14use pin_project::pin_project;
15use tracing::{debug_span, instrument::Instrumented, Instrument, Span};
16
17use crate::metrics::Metrics;
18
/// Attribute key under which an operation's name is attached to the
/// per-operation metric labels built in `InstrumentedResource::op`.
const KEY_OP: Key = Key::from_static_str("db.operation.name");
20
/// A resource of type `R` bundled with the metrics handle, label, and start
/// instant used to report on its use.
///
/// The wrapped value is reachable through `Deref`/`DerefMut` and
/// `AsRef`/`AsMut`. When the wrapper is dropped, the time elapsed since
/// `time` is recorded to `Metrics::use_time` with `label`.
pub struct InstrumentedResource<R> {
    /// Shared metrics handle that measurements are recorded to.
    metrics: Arc<Metrics>,
    /// Single attribute attached to every measurement made for this resource.
    label: [KeyValue; 1],
    /// Instant captured when the wrapper was constructed by `new`.
    time: Instant,
    /// The wrapped resource.
    resource: R,
}
32
33impl<R> InstrumentedResource<R> {
34 pub(crate) fn new(metrics: Arc<Metrics>, label: [KeyValue; 1], resource: R) -> Self {
36 Self {
37 metrics,
38 label,
39 time: Instant::now(),
41 resource,
42 }
43 }
44
45 pub fn op<O: Into<Cow<'static, str>>>(&self, op_name: O) -> InstrumentedOperation {
47 let name: Cow<'static, str> = op_name.into();
48 let span = debug_span!("op", "db.operation.name" = name.to_string());
49 let labels = [self.label[0].clone(), KeyValue::new(KEY_OP, name)];
50 InstrumentedOperation {
51 span,
52 metrics: self.metrics.clone(),
53 labels,
54 time: Instant::now(),
55 }
56 }
57
58 #[inline]
60 pub fn wrap<O, F, T>(self, op_name: O, func: F) -> T
61 where
62 O: Into<Cow<'static, str>>,
63 F: FnOnce(Self) -> T,
64 {
65 let op = self.op(op_name);
66 op.span.in_scope(|| func(self))
67 }
68
69 pub fn wrap_async<'a, T, O, F, Fut>(&'a self, op_name: O, func: F) -> InstrumentedFuture<Fut>
71 where
72 O: Into<Cow<'static, str>>,
73 F: FnOnce(&'a R) -> Fut,
74 Fut: Future<Output = T>,
75 {
76 let op = self.op(op_name);
77 InstrumentedFuture {
78 inner: func(&self.resource).instrument(op.span.clone()),
79 op: Some(op),
80 }
81 }
82
83 pub fn wrap_async_mut<'a, T, O, F, Fut>(
85 &'a mut self,
86 op_name: O,
87 func: F,
88 ) -> InstrumentedFuture<Fut>
89 where
90 O: Into<Cow<'static, str>>,
91 F: FnOnce(&'a mut R) -> Fut,
92 Fut: Future<Output = T>,
93 {
94 let op = self.op(op_name);
95 InstrumentedFuture {
96 inner: func(&mut self.resource).instrument(op.span.clone()),
97 op: Some(op),
98 }
99 }
100}
101
102impl<R> Deref for InstrumentedResource<R> {
103 type Target = R;
104
105 fn deref(&self) -> &Self::Target {
106 &self.resource
107 }
108}
109
110impl<R> DerefMut for InstrumentedResource<R> {
111 fn deref_mut(&mut self) -> &mut Self::Target {
112 &mut self.resource
113 }
114}
115
116impl<R> AsRef<R> for InstrumentedResource<R> {
117 fn as_ref(&self) -> &R {
118 &self.resource
119 }
120}
121
122impl<R> AsMut<R> for InstrumentedResource<R> {
123 fn as_mut(&mut self) -> &mut R {
124 &mut self.resource
125 }
126}
127
128impl<R: Clone> Clone for InstrumentedResource<R> {
129 fn clone(&self) -> Self {
130 Self {
131 metrics: self.metrics.clone(),
132 label: self.label.clone(),
133 time: self.time,
134 resource: self.resource.clone(),
135 }
136 }
137}
138
139impl<R> Drop for InstrumentedResource<R> {
142 fn drop(&mut self) {
143 self.metrics
145 .use_time
146 .record(self.time.elapsed().as_secs_f64(), &self.label);
147 }
148}
149
150pub struct InstrumentedOperation {
152 span: Span,
154 metrics: Arc<Metrics>,
156 labels: [KeyValue; 2],
158 time: Instant,
160}
161
162impl Drop for InstrumentedOperation {
163 fn drop(&mut self) {
164 self.metrics
166 .op_duration
167 .record(self.time.elapsed().as_secs_f64(), &self.labels);
168 }
169}
170
171#[pin_project]
173#[non_exhaustive]
174pub struct InstrumentedFuture<F> {
175 #[pin]
177 inner: Instrumented<F>,
178 op: Option<InstrumentedOperation>,
180}
181
182impl<F: Future> Future for InstrumentedFuture<F> {
183 type Output = F::Output;
184
185 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186 let this = self.project();
187 let ret = ready!(this.inner.poll(cx));
188 this.op.take();
191 Poll::Ready(ret)
192 }
193}