use crate::carrier;
use crate::convert::MaybeAsRef;
use crate::log::{Log, LogBuilder, StdErrorLogFieldsBuilder};
use crate::sampler::{AllSampler, Sampler};
use crate::tag::{StdTag, Tag, TagValue};
use crate::Result;
use std::borrow::Cow;
use std::fmt;
use std::io::{Read, Write};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc;
/// A sink for finished spans.
///
/// When a sampled [`Span`] is dropped, the resulting [`FinishedSpan`] is handed to
/// the consumer the span was started with. Implementors must be `Send + Sync`.
pub trait SpanConsumer<T>: Send + Sync {
/// Takes ownership of a finished span.
///
/// The method returns nothing, so an implementation has no way to report a
/// failure back to the caller.
fn consume_span(&self, span: FinishedSpan<T>);
}
/// Forwards finished spans into an unbounded `tokio` channel.
impl<T: Send> SpanConsumer<T> for mpsc::UnboundedSender<FinishedSpan<T>> {
    fn consume_span(&self, span: FinishedSpan<T>) {
        // Best effort: the result of `send` is deliberately discarded, so a
        // failed send silently loses the span.
        drop(self.send(span));
    }
}
/// Forwards finished spans into a bounded `tokio` channel.
impl<T: Send> SpanConsumer<T> for mpsc::Sender<FinishedSpan<T>> {
    fn consume_span(&self, span: FinishedSpan<T>) {
        // `try_send` is used rather than an awaiting send; its result is
        // deliberately discarded, so a rejected span is silently lost.
        drop(self.try_send(span));
    }
}
/// Forwards finished spans into a standard-library channel.
impl<T: Send> SpanConsumer<T> for std::sync::mpsc::Sender<FinishedSpan<T>> {
    fn consume_span(&self, span: FinishedSpan<T>) {
        // Best effort: the result of `send` is deliberately discarded, so a
        // failed send silently loses the span.
        drop(self.send(span));
    }
}
/// Forwards finished spans into a standard-library synchronous channel.
impl<T: Send> SpanConsumer<T> for std::sync::mpsc::SyncSender<FinishedSpan<T>> {
    fn consume_span(&self, span: FinishedSpan<T>) {
        // `try_send` is used rather than the blocking `send`; its result is
        // deliberately discarded, so a rejected span is silently lost.
        drop(self.try_send(span));
    }
}
/// Receiving half of an unbounded `tokio` channel that carries [`FinishedSpan`]s.
pub type SpanReceiver<T> = mpsc::UnboundedReceiver<FinishedSpan<T>>;
/// Sending half of an unbounded `tokio` channel that carries [`FinishedSpan`]s.
///
/// Deprecated: see the deprecation message attached to this alias.
#[deprecated = "SpanSender is an implementation detail of rustracing. It should not be public."]
pub type SpanSender<T> = mpsc::UnboundedSender<FinishedSpan<T>>;
/// Cheaply clonable, type-erased handle to a [`SpanConsumer`].
///
/// The consumer lives behind an `Arc`, so every clone refers to the same
/// underlying consumer.
pub(crate) struct SharedSpanConsumer<T>(Arc<dyn SpanConsumer<T>>);

impl<T> SharedSpanConsumer<T> {
    /// Moves `consumer` into an `Arc` and erases its concrete type.
    pub(crate) fn new(consumer: impl SpanConsumer<T> + 'static) -> Self {
        let erased: Arc<dyn SpanConsumer<T>> = Arc::new(consumer);
        SharedSpanConsumer(erased)
    }
}

impl<T> fmt::Debug for SharedSpanConsumer<T> {
    /// Writes only the type name; the wrapped consumer is not required to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "SharedSpanConsumer")
    }
}

impl<T> Clone for SharedSpanConsumer<T> {
    /// Clones the `Arc` only; the consumer itself is shared, not duplicated.
    fn clone(&self) -> Self {
        let SharedSpanConsumer(consumer) = self;
        SharedSpanConsumer(Arc::clone(consumer))
    }
}
/// Callback that is run on a [`Span`] while it is being dropped.
///
/// `Span`'s `Drop` impl invokes the callback with `&mut Span` before the span is
/// turned into a [`FinishedSpan`], so the callback can still modify the span.
pub struct FinishSpanCallback<T>(FinishCallbackInner<T>);

/// Shared closure type stored inside a [`FinishSpanCallback`].
type FinishCallbackInner<T> = Arc<dyn Fn(&mut Span<T>) + Send + Sync>;

impl<T> fmt::Debug for FinishSpanCallback<T> {
    /// Writes only the type name, since the wrapped closure cannot be printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "FinishSpanCallback")
    }
}

impl<T> Clone for FinishSpanCallback<T> {
    /// Clones the `Arc`; both values refer to the same closure.
    fn clone(&self) -> Self {
        let FinishSpanCallback(shared) = self;
        FinishSpanCallback(Arc::clone(shared))
    }
}

impl<T> From<FinishSpanCallback<T>> for FinishCallbackInner<T> {
    /// Unwraps the callback into the shared closure it holds.
    fn from(callback: FinishSpanCallback<T>) -> Self {
        let FinishSpanCallback(shared) = callback;
        shared
    }
}

impl<T> From<FinishCallbackInner<T>> for FinishSpanCallback<T> {
    /// Wraps an already shared closure without allocating.
    fn from(shared: FinishCallbackInner<T>) -> Self {
        FinishSpanCallback(shared)
    }
}

impl<T, F: Fn(&mut Span<T>) + Send + Sync + 'static> From<F> for FinishSpanCallback<T> {
    /// Moves a plain closure (or function) into a new `Arc`.
    fn from(callback: F) -> Self {
        let shared: FinishCallbackInner<T> = Arc::new(callback);
        FinishSpanCallback(shared)
    }
}
/// A span of traced work, generic over the tracer-specific state `T` that is
/// stored in its [`SpanContext`].
///
/// The wrapped `Option` is `None` for an inactive (unsampled) span; such a span
/// ignores every mutation and sends nothing to a consumer when dropped. When it is
/// `Some`, dropping the span turns it into a [`FinishedSpan`] and passes that to
/// the span's consumer.
#[derive(Debug)]
pub struct Span<T>(Option<SpanInner<T>>);
impl<T> Span<T> {
pub const fn inactive() -> Self {
Span(None)
}
pub fn handle(&self) -> SpanHandle<T>
where
T: Clone,
{
SpanHandle(
self.0
.as_ref()
.map(|inner| (inner.context.clone(), inner.span_tx.clone())),
)
}
pub fn is_sampled(&self) -> bool {
self.0.is_some()
}
pub fn context(&self) -> Option<&SpanContext<T>> {
self.0.as_ref().map(|x| &x.context)
}
pub fn set_operation_name<F, N>(&mut self, f: F)
where
F: FnOnce() -> N,
N: Into<Cow<'static, str>>,
{
if let Some(inner) = self.0.as_mut() {
inner.operation_name = f().into();
}
}
pub fn set_start_time<F>(&mut self, f: F)
where
F: FnOnce() -> SystemTime,
{
if let Some(inner) = self.0.as_mut() {
inner.start_time = f();
}
}
pub fn set_finish_time<F>(&mut self, f: F)
where
F: FnOnce() -> SystemTime,
{
if let Some(inner) = self.0.as_mut() {
inner.finish_time = Some(f());
}
}
pub fn set_finish_callback<C>(&mut self, cb: C)
where
C: Into<FinishSpanCallback<T>>,
{
if let Some(inner) = &mut self.0 {
inner.finish_cb = Some(cb.into());
}
}
#[doc(alias = "remove_finish_callback")]
pub fn take_finish_callback(&mut self) -> Option<FinishSpanCallback<T>> {
self.0.as_mut().and_then(|s| s.finish_cb.take())
}
pub fn set_tag<F>(&mut self, f: F)
where
F: FnOnce() -> Tag,
{
use std::iter::once;
self.set_tags(|| once(f()));
}
pub fn set_tags<F, I>(&mut self, f: F)
where
F: FnOnce() -> I,
I: IntoIterator<Item = Tag>,
{
if let Some(inner) = self.0.as_mut() {
for tag in f() {
inner.tags.retain(|x| x.name() != tag.name());
inner.tags.push(tag);
}
}
}
pub fn set_baggage_item<F>(&mut self, f: F)
where
F: FnOnce() -> BaggageItem,
{
if let Some(inner) = self.0.as_mut() {
let item = f();
inner.context.baggage_items.retain(|x| x.name != item.name);
inner.context.baggage_items.push(item);
}
}
pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
if let Some(inner) = self.0.as_ref() {
inner.context.baggage_items.iter().find(|x| x.name == name)
} else {
None
}
}
pub fn log<F>(&mut self, f: F)
where
F: FnOnce(&mut LogBuilder),
{
if let Some(inner) = self.0.as_mut() {
let mut builder = LogBuilder::new();
f(&mut builder);
if let Some(log) = builder.finish() {
inner.logs.push(log);
}
}
}
pub fn error_log<F>(&mut self, f: F)
where
F: FnOnce(&mut StdErrorLogFieldsBuilder),
{
if let Some(inner) = self.0.as_mut() {
let mut builder = LogBuilder::new();
f(&mut builder.error());
if let Some(log) = builder.finish() {
inner.logs.push(log);
}
if !inner.tags.iter().any(|x| x.name() == "error") {
inner.tags.push(StdTag::error());
}
}
}
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
{
self.handle().child(operation_name, move |mut opts| {
if let Some(finish_cb) = self.0.as_ref().and_then(|s| s.finish_cb.clone()) {
opts = opts.finish_callback(finish_cb);
}
f(opts)
})
}
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
{
self.handle().follower(operation_name, f)
}
pub(crate) fn new<S>(state: T, opts: StartSpanOptions<S, T>) -> Self {
let context = SpanContext::new(state, opts.baggage_items);
let inner = SpanInner {
operation_name: opts.operation_name,
start_time: opts.start_time.unwrap_or_else(SystemTime::now),
finish_time: None,
references: opts.references,
tags: opts.tags,
logs: Vec::new(),
context,
finish_cb: opts.finish_cb,
span_tx: opts.span_tx.clone(),
};
Span(Some(inner))
}
}
impl<T> Drop for Span<T> {
    /// Finishes the span: runs the finish callback (if any), then hands the
    /// resulting [`FinishedSpan`] to the span's consumer.
    ///
    /// An inactive span does nothing here.
    fn drop(&mut self) {
        // The callback is taken out of the span first, so it can be called with
        // `&mut self` without the span still holding it.
        let pending = self.0.as_mut().and_then(|live| live.finish_cb.take());
        if let Some(callback) = pending {
            (callback.0)(self);
        }

        if let Some(inner) = self.0.take() {
            // Fall back to "now" when no explicit finish time was recorded.
            let finish_time = inner.finish_time.unwrap_or_else(SystemTime::now);
            let report = FinishedSpan {
                operation_name: inner.operation_name,
                start_time: inner.start_time,
                finish_time,
                references: inner.references,
                tags: inner.tags,
                logs: inner.logs,
                context: inner.context,
            };
            inner.span_tx.0.consume_span(report);
        }
    }
}
impl<T> MaybeAsRef<SpanContext<T>> for Span<T> {
fn maybe_as_ref(&self) -> Option<&SpanContext<T>> {
self.context()
}
}
/// Data held by a sampled [`Span`] until it is dropped.
#[derive(Debug)]
struct SpanInner<T> {
/// Name of the operation; replaceable through `Span::set_operation_name`.
operation_name: Cow<'static, str>,
/// Start time; defaults to the time the span was created.
start_time: SystemTime,
/// Explicit finish time, if one was set; otherwise the drop time is used.
finish_time: Option<SystemTime>,
/// References to other spans collected while the span was being started.
references: Vec<SpanReference<T>>,
/// Tags; the `Span` setters keep at most one tag per name.
tags: Vec<Tag>,
/// Log entries appended through `Span::log` and `Span::error_log`.
logs: Vec<Log>,
/// Tracer state and baggage items.
context: SpanContext<T>,
/// Callback run at the start of `Drop`, if any.
finish_cb: Option<FinishSpanCallback<T>>,
/// Consumer that receives the `FinishedSpan` on drop.
span_tx: SharedSpanConsumer<T>,
}
/// Read-only record of a span that has ended.
///
/// Values are built by `Span`'s `Drop` impl and handed to a [`SpanConsumer`].
#[derive(Debug)]
pub struct FinishedSpan<T> {
    operation_name: Cow<'static, str>,
    start_time: SystemTime,
    finish_time: SystemTime,
    references: Vec<SpanReference<T>>,
    tags: Vec<Tag>,
    logs: Vec<Log>,
    context: SpanContext<T>,
}

impl<T> FinishedSpan<T> {
    /// Name of the operation the span represented.
    pub fn operation_name(&self) -> &str {
        &self.operation_name
    }

    /// Time at which the span started.
    pub fn start_time(&self) -> SystemTime {
        self.start_time
    }

    /// Time at which the span finished: the explicitly set finish time, or the
    /// time the span was dropped if none was set.
    pub fn finish_time(&self) -> SystemTime {
        self.finish_time
    }

    /// Log entries recorded on the span.
    pub fn logs(&self) -> &[Log] {
        self.logs.as_slice()
    }

    /// Tags attached to the span.
    pub fn tags(&self) -> &[Tag] {
        self.tags.as_slice()
    }

    /// References from this span to other spans.
    pub fn references(&self) -> &[SpanReference<T>] {
        self.references.as_slice()
    }

    /// Context (tracer state and baggage items) of the span.
    pub fn context(&self) -> &SpanContext<T> {
        &self.context
    }
}
#[derive(Debug, Clone)]
pub struct SpanContext<T> {
state: T,
baggage_items: Vec<BaggageItem>,
}
impl<T> SpanContext<T> {
pub fn new(state: T, mut baggage_items: Vec<BaggageItem>) -> Self {
baggage_items.reverse();
baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
baggage_items.dedup_by(|a, b| a.name() == b.name());
SpanContext {
state,
baggage_items,
}
}
pub fn state(&self) -> &T {
&self.state
}
pub fn baggage_items(&self) -> &[BaggageItem] {
&self.baggage_items
}
pub fn inject_to_text_map<C>(&self, carrier: &mut C) -> Result<()>
where
C: carrier::TextMap,
T: carrier::InjectToTextMap<C>,
{
track!(T::inject_to_text_map(self, carrier))
}
pub fn inject_to_http_header<C>(&self, carrier: &mut C) -> Result<()>
where
C: carrier::SetHttpHeaderField,
T: carrier::InjectToHttpHeader<C>,
{
track!(T::inject_to_http_header(self, carrier))
}
pub fn inject_to_binary<C>(&self, carrier: &mut C) -> Result<()>
where
C: Write,
T: carrier::InjectToBinary<C>,
{
track!(T::inject_to_binary(self, carrier))
}
pub fn extract_from_text_map<C>(carrier: &C) -> Result<Option<Self>>
where
C: carrier::TextMap,
T: carrier::ExtractFromTextMap<C>,
{
track!(T::extract_from_text_map(carrier))
}
pub fn extract_from_http_header<'a, C>(carrier: &'a C) -> Result<Option<Self>>
where
C: carrier::IterHttpHeaderFields<'a>,
T: carrier::ExtractFromHttpHeader<'a, C>,
{
track!(T::extract_from_http_header(carrier))
}
pub fn extract_from_binary<C>(carrier: &mut C) -> Result<Option<Self>>
where
C: Read,
T: carrier::ExtractFromBinary<C>,
{
track!(T::extract_from_binary(carrier))
}
}
impl<T> MaybeAsRef<SpanContext<T>> for SpanContext<T> {
fn maybe_as_ref(&self) -> Option<&Self> {
Some(self)
}
}
/// A name/value pair of strings carried in a span's context.
#[derive(Debug, Clone)]
pub struct BaggageItem {
    name: String,
    value: String,
}

impl BaggageItem {
    /// Creates an item by copying `name` and `value` into owned strings.
    pub fn new(name: &str, value: &str) -> Self {
        let (name, value) = (String::from(name), String::from(value));
        Self { name, value }
    }

    /// Returns the item's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the item's value.
    pub fn value(&self) -> &str {
        self.value.as_str()
    }
}
/// A reference from one span to another, where `T` identifies the referenced span.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub enum SpanReference<T> {
    ChildOf(T),
    FollowsFrom(T),
}

impl<T> SpanReference<T> {
    /// Returns the referenced span's data, whichever kind of reference this is.
    pub fn span(&self) -> &T {
        match self {
            Self::ChildOf(target) | Self::FollowsFrom(target) => target,
        }
    }

    /// Returns `true` for a `ChildOf` reference.
    pub fn is_child_of(&self) -> bool {
        matches!(self, Self::ChildOf(_))
    }

    /// Returns `true` for a `FollowsFrom` reference.
    pub fn is_follows_from(&self) -> bool {
        matches!(self, Self::FollowsFrom(_))
    }
}
/// Borrowed view of what is known about a span before it is started.
///
/// `StartSpanOptions` builds one from its tags, references and baggage items; it
/// is passed to the sampler and, in `StartSpanOptions::start`, converted into the
/// tracer state `T`.
#[derive(Debug)]
pub struct CandidateSpan<'a, T: 'a> {
    tags: &'a [Tag],
    references: &'a [SpanReference<T>],
    baggage_items: &'a [BaggageItem],
}

impl<'a, T: 'a> CandidateSpan<'a, T> {
    /// Returns the tags of the candidate.
    ///
    /// The slice lives for the full `'a` of the borrowed data, not just for the
    /// borrow of `self`, so it may outlive this `CandidateSpan` value.
    pub fn tags(&self) -> &'a [Tag] {
        self.tags
    }

    /// Returns the references of the candidate.
    ///
    /// The slice lives for the full `'a` of the borrowed data.
    pub fn references(&self) -> &'a [SpanReference<T>] {
        self.references
    }

    /// Returns the baggage items of the candidate.
    ///
    /// The slice lives for the full `'a` of the borrowed data.
    pub fn baggage_items(&self) -> &'a [BaggageItem] {
        self.baggage_items
    }
}
/// Builder that collects everything needed to start a [`Span`].
///
/// `S` is the sampler type and `T` the tracer-specific span state. The builder
/// borrows the consumer and the sampler for `'a`.
#[derive(Debug)]
pub struct StartSpanOptions<'a, S: 'a, T: 'a> {
/// Operation name of the span to start.
operation_name: Cow<'static, str>,
/// Explicit start time; when `None`, `Span::new` uses the current time.
start_time: Option<SystemTime>,
/// Tags collected so far; de-duplicated by name in `normalize`.
tags: Vec<Tag>,
/// References added through `child_of` and `follows_from`.
references: Vec<SpanReference<T>>,
/// Baggage items copied from referenced contexts; de-duplicated in `normalize`.
baggage_items: Vec<BaggageItem>,
/// Callback to run when the started span is dropped, if any.
finish_cb: Option<FinishSpanCallback<T>>,
/// Consumer that will receive the finished span.
span_tx: &'a SharedSpanConsumer<T>,
/// Sampler consulted when no integer `sampling.priority` tag is present.
sampler: &'a S,
}
impl<'a, S: 'a, T: 'a> StartSpanOptions<'a, S, T>
where
    S: Sampler<T>,
{
    /// Sets an explicit start time for the span.
    pub fn start_time(self, time: SystemTime) -> Self {
        Self {
            start_time: Some(time),
            ..self
        }
    }

    /// Adds a tag.
    ///
    /// When several tags share a name, the one added last is kept once the span
    /// is started.
    pub fn tag(mut self, tag: Tag) -> Self {
        self.tags.push(tag);
        self
    }

    /// Sets the callback to run when the started span is dropped, replacing any
    /// callback set before.
    pub fn finish_callback<C>(self, cb: C) -> Self
    where
        C: Into<FinishSpanCallback<T>>,
    {
        Self {
            finish_cb: Some(cb.into()),
            ..self
        }
    }

    /// Adds a `ChildOf` reference to `context` and copies its baggage items.
    ///
    /// Does nothing when `context` yields no span context.
    pub fn child_of<C>(mut self, context: &C) -> Self
    where
        C: MaybeAsRef<SpanContext<T>>,
        T: Clone,
    {
        if let Some(parent) = context.maybe_as_ref() {
            self.references
                .push(SpanReference::ChildOf(parent.state().clone()));
            self.baggage_items
                .extend_from_slice(parent.baggage_items());
        }
        self
    }

    /// Adds a `FollowsFrom` reference to `context` and copies its baggage items.
    ///
    /// Does nothing when `context` yields no span context.
    pub fn follows_from<C>(mut self, context: &C) -> Self
    where
        C: MaybeAsRef<SpanContext<T>>,
        T: Clone,
    {
        if let Some(predecessor) = context.maybe_as_ref() {
            self.references
                .push(SpanReference::FollowsFrom(predecessor.state().clone()));
            self.baggage_items
                .extend_from_slice(predecessor.baggage_items());
        }
        self
    }

    /// Starts the span, deriving its state from the candidate span via `From`.
    ///
    /// Returns an inactive span when the span is not sampled.
    pub fn start(mut self) -> Span<T>
    where
        T: for<'b> From<CandidateSpan<'b, T>>,
    {
        self.normalize();
        if self.is_sampled() {
            let state = T::from(self.span());
            Span::new(state, self)
        } else {
            Span::inactive()
        }
    }

    /// Starts the span with an explicitly supplied `state`.
    ///
    /// Returns an inactive span when the span is not sampled.
    pub fn start_with_state(mut self, state: T) -> Span<T> {
        self.normalize();
        if self.is_sampled() {
            Span::new(state, self)
        } else {
            Span::inactive()
        }
    }

    /// Creates empty options for a span named `operation_name`.
    pub(crate) fn new<N>(
        operation_name: N,
        span_tx: &'a SharedSpanConsumer<T>,
        sampler: &'a S,
    ) -> Self
    where
        N: Into<Cow<'static, str>>,
    {
        Self {
            operation_name: operation_name.into(),
            start_time: None,
            tags: Vec::new(),
            references: Vec::new(),
            baggage_items: Vec::new(),
            finish_cb: None,
            span_tx,
            sampler,
        }
    }

    /// Sorts tags and baggage items by name and keeps one entry per name.
    ///
    /// Reversing before the stable sort puts the most recently added entry first
    /// among equal names, and `dedup_by` keeps the first of each run, so the last
    /// added entry wins.
    fn normalize(&mut self) {
        let tags = &mut self.tags;
        tags.reverse();
        tags.sort_by(|lhs, rhs| lhs.name().cmp(rhs.name()));
        tags.dedup_by(|candidate, kept| candidate.name() == kept.name());

        let items = &mut self.baggage_items;
        items.reverse();
        items.sort_by(|lhs, rhs| lhs.name().cmp(rhs.name()));
        items.dedup_by(|candidate, kept| candidate.name() == kept.name());
    }

    /// Borrows the collected data as a [`CandidateSpan`].
    fn span(&self) -> CandidateSpan<'_, T> {
        CandidateSpan {
            tags: &self.tags,
            references: &self.references,
            baggage_items: &self.baggage_items,
        }
    }

    /// Decides whether the span should be sampled.
    ///
    /// If the first tag named `sampling.priority` has an integer value, that value
    /// decides (sampled when greater than zero). Otherwise the sampler is asked.
    fn is_sampled(&self) -> bool {
        let priority = self
            .tags
            .iter()
            .find(|tag| tag.name() == "sampling.priority")
            .map(|tag| tag.value());
        match priority {
            Some(&TagValue::Integer(level)) => level > 0,
            _ => self.sampler.is_sampled(&self.span()),
        }
    }
}
/// A clonable handle to a span, holding a copy of its context and its consumer.
///
/// The wrapped `Option` is `None` when the handle was taken from an inactive span.
#[derive(Debug, Clone)]
pub struct SpanHandle<T>(Option<(SpanContext<T>, SharedSpanConsumer<T>)>);

impl<T> SpanHandle<T> {
    /// Returns `true` if the handle was taken from a sampled span.
    pub fn is_sampled(&self) -> bool {
        self.0.is_some()
    }

    /// Returns the copied span context, or `None` for an inactive handle.
    pub fn context(&self) -> Option<&SpanContext<T>> {
        let (context, _) = self.0.as_ref()?;
        Some(context)
    }

    /// Looks up the baggage item called `name`.
    ///
    /// Returns `None` if there is no such item or the handle is inactive.
    pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
        self.context()?
            .baggage_items
            .iter()
            .find(|item| item.name == name)
    }

    /// Starts a span that references the handle's span as its parent.
    ///
    /// `f` receives the prepared options and must start the span. When the handle
    /// is inactive, `f` is not called and an inactive span is returned.
    pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
    where
        N: Into<Cow<'static, str>>,
        T: Clone,
        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
    {
        match &self.0 {
            Some((context, span_tx)) => {
                let options =
                    StartSpanOptions::new(operation_name, span_tx, &AllSampler).child_of(context);
                f(options)
            }
            None => Span::inactive(),
        }
    }

    /// Starts a span that references the handle's span with a follows-from
    /// relation.
    ///
    /// `f` receives the prepared options and must start the span. When the handle
    /// is inactive, `f` is not called and an inactive span is returned.
    pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
    where
        N: Into<Cow<'static, str>>,
        T: Clone,
        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
    {
        match &self.0 {
            Some((context, span_tx)) => {
                let options = StartSpanOptions::new(operation_name, span_tx, &AllSampler)
                    .follows_from(context);
                f(options)
            }
            None => Span::inactive(),
        }
    }
}
/// Read-only access to the data recorded on a span.
pub trait InspectableSpan<T> {
/// Returns the operation name.
fn operation_name(&self) -> &str;
/// Returns the start time.
fn start_time(&self) -> SystemTime;
/// Returns the finish time, or `None` if none is available.
fn finish_time(&self) -> Option<SystemTime>;
/// Returns the recorded log entries.
fn logs(&self) -> &[Log];
/// Returns the attached tags.
fn tags(&self) -> &[Tag];
/// Returns the references to other spans.
fn references(&self) -> &[SpanReference<T>];
}
/// Inspection of a live span; an inactive span reports empty or default values.
impl<T> InspectableSpan<T> for Span<T> {
    /// Returns the operation name, or `""` for an inactive span.
    fn operation_name(&self) -> &str {
        match &self.0 {
            Some(live) => &live.operation_name,
            None => "",
        }
    }

    /// Returns the start time, or the current time for an inactive span.
    fn start_time(&self) -> SystemTime {
        match &self.0 {
            Some(live) => live.start_time,
            None => SystemTime::now(),
        }
    }

    /// Returns the explicitly set finish time, if any.
    ///
    /// This is `None` for an inactive span and for a span whose finish time has
    /// not been set.
    fn finish_time(&self) -> Option<SystemTime> {
        let live = self.0.as_ref()?;
        live.finish_time
    }

    /// Returns the log entries, or an empty slice for an inactive span.
    fn logs(&self) -> &[Log] {
        match &self.0 {
            Some(live) => live.logs.as_slice(),
            None => &[],
        }
    }

    /// Returns the tags, or an empty slice for an inactive span.
    fn tags(&self) -> &[Tag] {
        match &self.0 {
            Some(live) => live.tags.as_slice(),
            None => &[],
        }
    }

    /// Returns the references, or an empty slice for an inactive span.
    fn references(&self) -> &[SpanReference<T>] {
        match &self.0 {
            Some(live) => live.references.as_slice(),
            None => &[],
        }
    }
}