use std::{
borrow::Cow,
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::Instant,
};
use opentelemetry::{Key, KeyValue};
use pin_project::pin_project;
use tracing::{debug_span, instrument::Instrumented, Instrument, Span};
use crate::metrics::Metrics;
/// Attribute key under which the operation name is attached to the labels of
/// per-operation measurements.
const KEY_OP: Key = Key::from_static_str("db.operation.name");
/// A resource of type `R` bundled with the metrics handle and label used to
/// measure it.
///
/// When the wrapper is dropped, the seconds elapsed since `time` are recorded
/// into the `use_time` metric. Named operations started from the wrapper record
/// their own duration separately.
pub struct InstrumentedResource<R> {
/// Shared metrics handle that measurements are recorded into.
metrics: Arc<Metrics>,
/// Single attribute attached to every measurement made through this wrapper.
label: [KeyValue; 1],
/// Start instant for the `use_time` measurement (set at construction, copied on clone).
time: Instant,
/// The wrapped resource itself.
resource: R,
}
impl<R> InstrumentedResource<R> {
    /// Wraps `resource` and captures the current instant as the start of the
    /// `use_time` measurement that is recorded when the wrapper is dropped.
    pub(crate) fn new(metrics: Arc<Metrics>, label: [KeyValue; 1], resource: R) -> Self {
        Self {
            metrics,
            label,
            time: Instant::now(),
            resource,
        }
    }

    /// Starts a named operation on this resource.
    ///
    /// The returned guard owns a debug-level `op` span carrying the operation
    /// name, plus a label set made of this wrapper's label and the operation
    /// name. The operation's timer starts here; its duration is recorded into
    /// `op_duration` when the guard is dropped.
    pub fn op<O: Into<Cow<'static, str>>>(&self, op_name: O) -> InstrumentedOperation {
        let name: Cow<'static, str> = op_name.into();
        // Borrow the name for the span field instead of allocating a fresh
        // `String`: a `&str` is recorded as the same string value, and `name`
        // itself is still needed (by value) for the metric label below.
        let span = debug_span!("op", "db.operation.name" = &*name);
        let labels = [self.label[0].clone(), KeyValue::new(KEY_OP, name)];
        InstrumentedOperation {
            span,
            metrics: self.metrics.clone(),
            labels,
            time: Instant::now(),
        }
    }

    /// Runs `func` synchronously inside a new operation span, handing it
    /// ownership of the wrapper, and returns whatever `func` returns.
    ///
    /// The operation guard lives until this method returns, so the recorded
    /// `op_duration` covers the whole call to `func`.
    #[inline]
    pub fn wrap<O, F, T>(self, op_name: O, func: F) -> T
    where
        O: Into<Cow<'static, str>>,
        F: FnOnce(Self) -> T,
    {
        // `op` owns all of its data, so `self` can still be moved into `func`.
        let op = self.op(op_name);
        op.span.in_scope(|| func(self))
    }

    /// Builds a future from a shared borrow of the resource and instruments it.
    ///
    /// `func` is called immediately; the future it returns is polled inside the
    /// operation's span. The operation timer starts when this method is called
    /// (not at first poll), and the duration is recorded when the future
    /// completes or when the returned wrapper is dropped, whichever comes first.
    pub fn wrap_async<'a, T, O, F, Fut>(&'a self, op_name: O, func: F) -> InstrumentedFuture<Fut>
    where
        O: Into<Cow<'static, str>>,
        F: FnOnce(&'a R) -> Fut,
        Fut: Future<Output = T>,
    {
        let op = self.op(op_name);
        InstrumentedFuture {
            // The span is cloned so both the instrumented future and the guard hold it.
            inner: func(&self.resource).instrument(op.span.clone()),
            op: Some(op),
        }
    }

    /// Same as [`wrap_async`](Self::wrap_async), but `func` receives an
    /// exclusive borrow of the resource.
    pub fn wrap_async_mut<'a, T, O, F, Fut>(
        &'a mut self,
        op_name: O,
        func: F,
    ) -> InstrumentedFuture<Fut>
    where
        O: Into<Cow<'static, str>>,
        F: FnOnce(&'a mut R) -> Fut,
        Fut: Future<Output = T>,
    {
        // The guard is created first: it only needs a short shared borrow,
        // which ends before the exclusive borrow is handed to `func`.
        let op = self.op(op_name);
        InstrumentedFuture {
            inner: func(&mut self.resource).instrument(op.span.clone()),
            op: Some(op),
        }
    }
}
/// Gives transparent shared access to the wrapped resource.
impl<R> Deref for InstrumentedResource<R> {
    type Target = R;

    /// Borrows the wrapped resource.
    fn deref(&self) -> &R {
        let Self { resource, .. } = self;
        resource
    }
}
/// Gives transparent exclusive access to the wrapped resource.
impl<R> DerefMut for InstrumentedResource<R> {
    /// Mutably borrows the wrapped resource.
    fn deref_mut(&mut self) -> &mut R {
        let Self { resource, .. } = self;
        resource
    }
}
/// Lets the wrapper be passed where a `&R` view of the resource is expected.
impl<R> AsRef<R> for InstrumentedResource<R> {
    /// Borrows the wrapped resource.
    fn as_ref(&self) -> &R {
        let Self { resource, .. } = self;
        resource
    }
}
/// Lets the wrapper be passed where a `&mut R` view of the resource is expected.
impl<R> AsMut<R> for InstrumentedResource<R> {
    /// Mutably borrows the wrapped resource.
    fn as_mut(&mut self) -> &mut R {
        let Self { resource, .. } = self;
        resource
    }
}
impl<R: Clone> Clone for InstrumentedResource<R> {
fn clone(&self) -> Self {
Self {
metrics: self.metrics.clone(),
label: self.label.clone(),
time: self.time,
resource: self.resource.clone(),
}
}
}
impl<R> Drop for InstrumentedResource<R> {
    /// Records, in seconds, how long ago this wrapper's start instant was
    /// captured into the `use_time` metric, tagged with the wrapper's label.
    fn drop(&mut self) {
        let elapsed_secs = self.time.elapsed().as_secs_f64();
        self.metrics.use_time.record(elapsed_secs, &self.label);
    }
}
/// Guard for a single named operation.
///
/// Dropping the guard records the seconds elapsed since `time` into the
/// `op_duration` metric, tagged with `labels`.
pub struct InstrumentedOperation {
/// Tracing span associated with the operation.
span: Span,
/// Shared metrics handle the duration is recorded into.
metrics: Arc<Metrics>,
/// Attributes attached to the duration measurement.
labels: [KeyValue; 2],
/// Instant at which the operation started.
time: Instant,
}
impl Drop for InstrumentedOperation {
    /// Records, in seconds, the time since the operation started into the
    /// `op_duration` metric, tagged with the operation's labels.
    fn drop(&mut self) {
        let elapsed_secs = self.time.elapsed().as_secs_f64();
        self.metrics.op_duration.record(elapsed_secs, &self.labels);
    }
}
/// A future paired with the operation guard that times it.
///
/// The guard is released when the inner future completes; if the wrapper is
/// dropped before that, the guard is dropped along with it.
#[pin_project]
#[non_exhaustive]
pub struct InstrumentedFuture<F> {
/// The wrapped future, instrumented with a tracing span.
#[pin]
inner: Instrumented<F>,
/// Operation guard; `None` once the inner future has completed.
op: Option<InstrumentedOperation>,
}
impl<F: Future> Future for InstrumentedFuture<F> {
    type Output = F::Output;

    /// Polls the inner future. Once it is ready, the operation guard is
    /// dropped (which records the operation's duration) before the output is
    /// handed back to the caller.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let projection = self.project();
        // Returns `Poll::Pending` early while the inner future is not done.
        let output = ready!(projection.inner.poll(cx));
        // Release the guard now rather than when the wrapper itself is dropped.
        drop(projection.op.take());
        Poll::Ready(output)
    }
}