uxum_pools/
resource.rs

1//! Wrappers for contained resources and operations over them.
2
3use std::{
4    borrow::Cow,
5    future::Future,
6    ops::{Deref, DerefMut},
7    pin::Pin,
8    sync::Arc,
9    task::{ready, Context, Poll},
10    time::Instant,
11};
12
13use opentelemetry::{Key, KeyValue};
14use pin_project::pin_project;
15use tracing::{debug_span, instrument::Instrumented, Instrument, Span};
16
17use crate::metrics::Metrics;
18
/// Attribute key under which the operation name is attached to per-operation metric labels.
const KEY_OP: Key = Key::from_static_str("db.operation.name");
20
/// Instrumented resource.
///
/// Bundles a resource with the metrics handle and label needed to report how long the
/// resource was held. The wrapper dereferences to the inner resource, and the usage time is
/// recorded when the wrapper is dropped.
pub struct InstrumentedResource<R> {
    /// Linked metrics storage.
    metrics: Arc<Metrics>,
    /// Premade label used to record metrics.
    label: [KeyValue; 1],
    /// Retrieval time, captured when the wrapper is constructed.
    time: Instant,
    /// Original resource.
    resource: R,
}
32
33impl<R> InstrumentedResource<R> {
34    /// Bundle resource, metrics container and identifier for the originating pool.
35    pub(crate) fn new(metrics: Arc<Metrics>, label: [KeyValue; 1], resource: R) -> Self {
36        Self {
37            metrics,
38            label,
39            // This is different from time used in wait_time metric.
40            time: Instant::now(),
41            resource,
42        }
43    }
44
45    /// Create a guard for running an operation.
46    pub fn op<O: Into<Cow<'static, str>>>(&self, op_name: O) -> InstrumentedOperation {
47        let name: Cow<'static, str> = op_name.into();
48        let span = debug_span!("op", "db.operation.name" = name.to_string());
49        let labels = [self.label[0].clone(), KeyValue::new(KEY_OP, name)];
50        InstrumentedOperation {
51            span,
52            metrics: self.metrics.clone(),
53            labels,
54            time: Instant::now(),
55        }
56    }
57
58    /// Instrument and measure execution of a closure.
59    #[inline]
60    pub fn wrap<O, F, T>(self, op_name: O, func: F) -> T
61    where
62        O: Into<Cow<'static, str>>,
63        F: FnOnce(Self) -> T,
64    {
65        let op = self.op(op_name);
66        op.span.in_scope(|| func(self))
67    }
68
69    /// Instrument and measure execution of a generated future.
70    pub fn wrap_async<'a, T, O, F, Fut>(&'a self, op_name: O, func: F) -> InstrumentedFuture<Fut>
71    where
72        O: Into<Cow<'static, str>>,
73        F: FnOnce(&'a R) -> Fut,
74        Fut: Future<Output = T>,
75    {
76        let op = self.op(op_name);
77        InstrumentedFuture {
78            inner: func(&self.resource).instrument(op.span.clone()),
79            op: Some(op),
80        }
81    }
82
83    /// Instrument and measure execution of a generated future.
84    pub fn wrap_async_mut<'a, T, O, F, Fut>(
85        &'a mut self,
86        op_name: O,
87        func: F,
88    ) -> InstrumentedFuture<Fut>
89    where
90        O: Into<Cow<'static, str>>,
91        F: FnOnce(&'a mut R) -> Fut,
92        Fut: Future<Output = T>,
93    {
94        let op = self.op(op_name);
95        InstrumentedFuture {
96            inner: func(&mut self.resource).instrument(op.span.clone()),
97            op: Some(op),
98        }
99    }
100}
101
102impl<R> Deref for InstrumentedResource<R> {
103    type Target = R;
104
105    fn deref(&self) -> &Self::Target {
106        &self.resource
107    }
108}
109
110impl<R> DerefMut for InstrumentedResource<R> {
111    fn deref_mut(&mut self) -> &mut Self::Target {
112        &mut self.resource
113    }
114}
115
116impl<R> AsRef<R> for InstrumentedResource<R> {
117    fn as_ref(&self) -> &R {
118        &self.resource
119    }
120}
121
122impl<R> AsMut<R> for InstrumentedResource<R> {
123    fn as_mut(&mut self) -> &mut R {
124        &mut self.resource
125    }
126}
127
128impl<R: Clone> Clone for InstrumentedResource<R> {
129    fn clone(&self) -> Self {
130        Self {
131            metrics: self.metrics.clone(),
132            label: self.label.clone(),
133            time: self.time,
134            resource: self.resource.clone(),
135        }
136    }
137}
138
139// TODO: Debug, Display
140
141impl<R> Drop for InstrumentedResource<R> {
142    fn drop(&mut self) {
143        // Record time spent outside the pool.
144        self.metrics
145            .use_time
146            .record(self.time.elapsed().as_secs_f64(), &self.label);
147    }
148}
149
/// Instrumented operation on a resource.
///
/// Acts as a guard: timing starts when the value is created, and the elapsed time is recorded
/// into the operation duration metric when the value is dropped.
pub struct InstrumentedOperation {
    /// Span to wrap operation in.
    span: Span,
    /// Linked metrics storage.
    metrics: Arc<Metrics>,
    /// Premade labels used to record metrics (pool label followed by operation name).
    labels: [KeyValue; 2],
    /// Operation start time.
    time: Instant,
}
161
162impl Drop for InstrumentedOperation {
163    fn drop(&mut self) {
164        // Record time spent performing this operation.
165        self.metrics
166            .op_duration
167            .record(self.time.elapsed().as_secs_f64(), &self.labels);
168    }
169}
170
171/// Wrapping future providing operation instrumentation.
172#[pin_project]
173#[non_exhaustive]
174pub struct InstrumentedFuture<F> {
175    /// Inner future.
176    #[pin]
177    inner: Instrumented<F>,
178    /// Instrumentation.
179    op: Option<InstrumentedOperation>,
180}
181
182impl<F: Future> Future for InstrumentedFuture<F> {
183    type Output = F::Output;
184
185    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186        let this = self.project();
187        let ret = ready!(this.inner.poll(cx));
188        // Force drop execution.
189        // TODO: maybe use ManuallyDrop here (unsafe).
190        this.op.take();
191        Poll::Ready(ret)
192    }
193}