use cf_rustracing::span::SpanConsumer;
use cf_rustracing_jaeger::span::{FinishedSpan, SpanContextState as JaegerContext};
use std::future::poll_fn;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::{Mutex, mpsc};
/// Receiving half of either a bounded or an unbounded tokio mpsc channel.
///
/// Lets the rest of the module drive both channel flavours through a single
/// type by forwarding each call to the wrapped receiver.
enum Receiver<T> {
    /// Receiver of a channel created with a fixed capacity.
    Bounded(mpsc::Receiver<T>),
    /// Receiver of a channel without a capacity limit.
    Unbounded(mpsc::UnboundedReceiver<T>),
}

impl<T> Receiver<T> {
    /// Returns the wrapped receiver's `len()`.
    #[allow(dead_code, reason = "only used if `metrics` feature is enabled")]
    #[inline]
    fn len(&self) -> usize {
        match self {
            Self::Bounded(bounded) => bounded.len(),
            Self::Unbounded(unbounded) => unbounded.len(),
        }
    }

    /// Forwards to the wrapped receiver's non-blocking `try_recv()`.
    #[allow(dead_code, reason = "only used if `testing` feature is enabled")]
    #[inline]
    fn try_recv(&mut self) -> Result<T, mpsc::error::TryRecvError> {
        match self {
            Self::Bounded(bounded) => bounded.try_recv(),
            Self::Unbounded(unbounded) => unbounded.try_recv(),
        }
    }

    /// Forwards to the wrapped receiver's `poll_recv_many()`, passing `cx`,
    /// `buffer` and `limit` through unchanged and returning its result.
    #[inline]
    fn poll_recv_many(
        &mut self,
        cx: &mut Context<'_>,
        buffer: &mut Vec<T>,
        limit: usize,
    ) -> Poll<usize> {
        match self {
            Self::Bounded(bounded) => bounded.poll_recv_many(cx, buffer, limit),
            Self::Unbounded(unbounded) => unbounded.poll_recv_many(cx, buffer, limit),
        }
    }
}
/// Cloneable handle to the receiving side of the span channel.
///
/// Every clone points at the same receiver, which sits behind an async mutex
/// inside an `Arc`.
#[derive(Clone)]
pub(super) struct SharedSpanReceiver {
    rx: Arc<Mutex<Receiver<FinishedSpan>>>,
}

impl SharedSpanReceiver {
    /// Puts an already-built [`Receiver`] behind the shared `Arc<Mutex<_>>`.
    fn wrap(inner: Receiver<FinishedSpan>) -> Self {
        let rx = Arc::new(Mutex::new(inner));
        Self { rx }
    }

    /// Builds a shared receiver on top of a bounded channel receiver.
    fn new_bounded(receiver: mpsc::Receiver<FinishedSpan>) -> Self {
        Self::wrap(Receiver::Bounded(receiver))
    }

    /// Builds a shared receiver on top of an unbounded channel receiver.
    fn new_unbounded(receiver: mpsc::UnboundedReceiver<FinishedSpan>) -> Self {
        Self::wrap(Receiver::Unbounded(receiver))
    }

    /// Tries to take one span without waiting and without locking.
    ///
    /// Returns `None` when this handle is not the only owner of the receiver
    /// (`Arc::get_mut` fails) or when `try_recv` reports an error. With the
    /// `metrics` feature enabled, the queue size gauge is refreshed after the
    /// receive attempt.
    #[cfg(any(test, feature = "testing"))]
    pub(super) fn try_unique_recv(&mut self) -> Option<FinishedSpan> {
        // Unique ownership gives direct mutable access, bypassing the lock.
        let receiver = Arc::get_mut(&mut self.rx)?.get_mut();
        let span = receiver.try_recv().ok();

        #[cfg(feature = "metrics")]
        super::metrics::tracing::queue_size().set(receiver.len() as u64);

        span
    }

    /// Receives spans into `buffer` by polling the underlying receiver's
    /// `poll_recv_many` with the given `limit`, and returns the count it
    /// reports.
    ///
    /// The mutex guard is held for the whole call, including while waiting.
    /// With the `metrics` feature enabled, the queue size gauge is updated on
    /// every poll and once more after the receive completes.
    pub(super) async fn recv_many(&self, buffer: &mut Vec<FinishedSpan>, limit: usize) -> usize {
        let mut guard = self.rx.lock().await;
        let receiver = &mut *guard;

        #[cfg(feature = "metrics")]
        let gauge = super::metrics::tracing::queue_size();

        let received = poll_fn(|cx| {
            #[cfg(feature = "metrics")]
            gauge.set(receiver.len() as u64);

            receiver.poll_recv_many(cx, buffer, limit)
        })
        .await;

        // Record the queue length left over after the batch was taken.
        #[cfg(feature = "metrics")]
        gauge.set(receiver.len() as u64);

        received
    }
}
/// Common non-blocking send interface over both mpsc sender flavours.
trait Sender<T> {
    /// Attempts to send `message` without waiting; on failure the error is a
    /// `TrySendError<T>`.
    fn try_send(&self, message: T) -> Result<(), mpsc::error::TrySendError<T>>;
}

impl<T> Sender<T> for mpsc::Sender<T> {
    #[inline]
    fn try_send(&self, message: T) -> Result<(), mpsc::error::TrySendError<T>> {
        // Explicit path so the call resolves to tokio's inherent `try_send`
        // rather than to this trait method.
        mpsc::Sender::<T>::try_send(self, message)
    }
}

impl<T> Sender<T> for mpsc::UnboundedSender<T> {
    #[inline]
    fn try_send(&self, message: T) -> Result<(), mpsc::error::TrySendError<T>> {
        // `send` reports a `SendError`; convert it into the `TrySendError`
        // required by the trait through its `From` implementation.
        self.send(message).map_err(Into::into)
    }
}
/// Span consumer that pushes every finished span into the wrapped sender `S`.
#[derive(Clone)]
pub(super) struct SpanSender<S>(S);

impl<S> SpanConsumer<JaegerContext> for SpanSender<S>
where
    S: Sender<FinishedSpan> + Send + Sync,
{
    /// Hands `span` to the channel without waiting.
    ///
    /// A failed send is not reported to the caller. With the `metrics`
    /// feature enabled, every call increments the total span counter and a
    /// failed send additionally increments the dropped span counter.
    fn consume_span(&self, span: FinishedSpan) {
        let Self(sender) = self;
        // Underscore prefix: the value is only read when `metrics` is enabled.
        let _outcome = sender.try_send(span);

        #[cfg(feature = "metrics")]
        {
            super::metrics::tracing::spans_total().inc();

            if _outcome.is_err() {
                super::metrics::tracing::spans_dropped().inc();
            }
        }
    }
}
/// [`SpanSender`] backed by the sender of a bounded tokio mpsc channel.
pub(super) type BoundedSpanSender = SpanSender<mpsc::Sender<FinishedSpan>>;
/// [`SpanSender`] backed by the sender of an unbounded tokio mpsc channel.
pub(super) type UnboundedSpanSender = SpanSender<mpsc::UnboundedSender<FinishedSpan>>;
/// Creates a bounded span channel that can hold `buffer` spans.
///
/// Returns the sending half wrapped in a [`SpanSender`] and the receiving
/// half wrapped in a [`SharedSpanReceiver`].
pub(super) fn channel(buffer: NonZeroUsize) -> (BoundedSpanSender, SharedSpanReceiver) {
    let capacity = buffer.get();
    let (tx, rx) = mpsc::channel(capacity);

    let sender = SpanSender(tx);
    let receiver = SharedSpanReceiver::new_bounded(rx);
    (sender, receiver)
}
/// Creates a span channel without a capacity limit.
///
/// Returns the sending half wrapped in a [`SpanSender`] and the receiving
/// half wrapped in a [`SharedSpanReceiver`].
pub(super) fn unbounded_channel() -> (UnboundedSpanSender, SharedSpanReceiver) {
    let (tx, rx) = mpsc::unbounded_channel();

    let sender = SpanSender(tx);
    let receiver = SharedSpanReceiver::new_unbounded(rx);
    (sender, receiver)
}
// The assertions below read counters and gauges that this module only updates
// when the `metrics` feature is enabled, so the tests are compiled only with
// that feature; a plain `cargo test` without it skips them instead of failing.
#[cfg(all(test, feature = "metrics"))]
mod tests {
    use super::*;
    use crate::telemetry::tracing::metrics::tracing as tracing_metrics;
    use cf_rustracing::Tracer;
    use cf_rustracing::sampler::AllSampler;

    /// Checks the total/dropped span counters and the queue size gauge for a
    /// bounded channel that receives more spans than it can hold.
    #[tokio::test]
    async fn test_span_metrics() {
        let (send, recv) = channel(NonZeroUsize::new(3).unwrap());
        let tracer = Tracer::with_consumer(AllSampler, send);

        // Each span is finished (and handed to the consumer) when it is
        // dropped at the end of the loop iteration.
        for _ in 0..5 {
            let _span = tracer.span("my span").start();
        }

        // Capacity is 3, so 2 of the 5 spans could not be queued.
        assert_eq!(tracing_metrics::spans_total().get(), 5);
        assert_eq!(tracing_metrics::spans_dropped().get(), 2);

        let mut spans = Vec::new();

        // Take a single span: two remain queued.
        let got = recv.recv_many(&mut spans, 1).await;
        assert_eq!(got, 1);
        assert_eq!(spans.len(), 1);
        assert_eq!(tracing_metrics::queue_size().get(), 2);

        // Drain the rest: the queue is empty afterwards.
        let got = recv.recv_many(&mut spans, 100).await;
        assert_eq!(got, 2);
        assert_eq!(spans.len(), 3);
        assert_eq!(tracing_metrics::queue_size().get(), 0);
    }
}