use crate::carrier;
use crate::convert::MaybeAsRef;
use crate::log::{Log, LogBuilder, StdErrorLogFieldsBuilder};
use crate::sampler::{AllSampler, Sampler};
use crate::tag::{StdTag, Tag, TagValue};
use crate::Result;
use std::borrow::Cow;
use std::io::{Read, Write};
use std::time::SystemTime;
/// Receiving end of the crossbeam channel that carries `FinishedSpan<T>` values.
pub type SpanReceiver<T> = crossbeam_channel::Receiver<FinishedSpan<T>>;
/// Sending end of the crossbeam channel that carries `FinishedSpan<T>` values.
///
/// Every sampled `Span` holds one of these and uses it when it is dropped.
pub type SpanSender<T> = crossbeam_channel::Sender<FinishedSpan<T>>;
/// A span that is either active (sampled) or inactive.
///
/// An inactive span stores nothing: every mutating method returns without
/// invoking the closure it was handed, so callers pay no cost for building
/// names, tags or log entries that would be discarded anyway.
#[derive(Debug)]
pub struct Span<T>(Option<SpanInner<T>>);

impl<T> Span<T> {
    /// Creates an inactive span, i.e. one that records nothing.
    pub fn inactive() -> Self {
        Span(None)
    }

    /// Returns a handle holding clones of this span's context and sender.
    ///
    /// The handle of an inactive span is itself inactive.
    pub fn handle(&self) -> SpanHandle<T>
    where
        T: Clone,
    {
        SpanHandle(
            self.0
                .as_ref()
                .map(|inner| (inner.context.clone(), inner.span_tx.clone())),
        )
    }

    /// Returns `true` if this span is active.
    pub fn is_sampled(&self) -> bool {
        self.0.is_some()
    }

    /// Returns the context of this span, or `None` if the span is inactive.
    pub fn context(&self) -> Option<&SpanContext<T>> {
        self.0.as_ref().map(|inner| &inner.context)
    }

    /// Replaces the operation name with the value produced by `f`.
    ///
    /// `f` is not called when the span is inactive.
    pub fn set_operation_name<F, N>(&mut self, f: F)
    where
        F: FnOnce() -> N,
        N: Into<Cow<'static, str>>,
    {
        if let Some(inner) = self.0.as_mut() {
            inner.operation_name = f().into();
        }
    }

    /// Sets an explicit finish time produced by `f`.
    ///
    /// `f` is not called when the span is inactive.
    pub fn set_finish_time<F>(&mut self, f: F)
    where
        F: FnOnce() -> SystemTime,
    {
        if let Some(inner) = self.0.as_mut() {
            inner.finish_time = Some(f());
        }
    }

    /// Sets a single tag; shorthand for `set_tags` with a one-element iterator.
    pub fn set_tag<F>(&mut self, f: F)
    where
        F: FnOnce() -> Tag,
    {
        use std::iter::once;
        self.set_tags(|| once(f()));
    }

    /// Sets every tag yielded by `f`.
    ///
    /// Any tag already present under the same name is removed before the new
    /// one is appended. `f` is not called when the span is inactive.
    pub fn set_tags<F, I>(&mut self, f: F)
    where
        F: FnOnce() -> I,
        I: IntoIterator<Item = Tag>,
    {
        if let Some(inner) = self.0.as_mut() {
            for tag in f() {
                inner.tags.retain(|existing| existing.name() != tag.name());
                inner.tags.push(tag);
            }
        }
    }

    /// Sets a baggage item, replacing any existing item with the same name.
    ///
    /// `f` is not called when the span is inactive.
    pub fn set_baggage_item<F>(&mut self, f: F)
    where
        F: FnOnce() -> BaggageItem,
    {
        if let Some(inner) = self.0.as_mut() {
            let item = f();
            inner
                .context
                .baggage_items
                .retain(|existing| existing.name != item.name);
            inner.context.baggage_items.push(item);
        }
    }

    /// Looks up the baggage item called `name`.
    ///
    /// Returns `None` when the span is inactive or no such item exists.
    pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
        self.0
            .as_ref()
            .and_then(|inner| inner.context.baggage_items.iter().find(|item| item.name == name))
    }

    /// Appends a log entry populated by `f`.
    ///
    /// Nothing is stored when `LogBuilder::finish` yields `None`. `f` is not
    /// called when the span is inactive.
    pub fn log<F>(&mut self, f: F)
    where
        F: FnOnce(&mut LogBuilder),
    {
        if let Some(inner) = self.0.as_mut() {
            let mut builder = LogBuilder::new();
            f(&mut builder);
            if let Some(log) = builder.finish() {
                inner.logs.push(log);
            }
        }
    }

    /// Appends an error log entry populated by `f` and marks the span as failed.
    ///
    /// `StdTag::error()` is added unless a tag named `"error"` is already
    /// present, so an error tag set by the caller is never overwritten.
    /// `f` is not called when the span is inactive.
    pub fn error_log<F>(&mut self, f: F)
    where
        F: FnOnce(&mut StdErrorLogFieldsBuilder),
    {
        if let Some(inner) = self.0.as_mut() {
            let mut builder = LogBuilder::new();
            f(&mut builder.error());
            if let Some(log) = builder.finish() {
                inner.logs.push(log);
            }
            let has_error_tag = inner.tags.iter().any(|tag| tag.name() == "error");
            if !has_error_tag {
                inner.tags.push(StdTag::error());
            }
        }
    }

    /// Starts a span that is a child of this one; see `SpanHandle::child`.
    pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
    where
        N: Into<Cow<'static, str>>,
        T: Clone,
        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
    {
        self.handle().child(operation_name, f)
    }

    /// Starts a span that follows from this one; see `SpanHandle::follower`.
    pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
    where
        N: Into<Cow<'static, str>>,
        T: Clone,
        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
    {
        self.handle().follower(operation_name, f)
    }

    /// Builds an active span from its already-resolved parts.
    ///
    /// The finish time starts unset and the log list starts empty. `state` and
    /// `baggage_items` are combined through `SpanContext::new`.
    pub(crate) fn new(
        operation_name: Cow<'static, str>,
        start_time: SystemTime,
        references: Vec<SpanReference<T>>,
        tags: Vec<Tag>,
        state: T,
        baggage_items: Vec<BaggageItem>,
        span_tx: SpanSender<T>,
    ) -> Self {
        let context = SpanContext::new(state, baggage_items);
        let inner = SpanInner {
            operation_name,
            start_time,
            finish_time: None,
            references,
            tags,
            logs: Vec::new(),
            context,
            span_tx,
        };
        Span(Some(inner))
    }
}
impl<T> Drop for Span<T> {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
let finished = FinishedSpan {
operation_name: inner.operation_name,
start_time: inner.start_time,
finish_time: inner.finish_time.unwrap_or_else(SystemTime::now),
references: inner.references,
tags: inner.tags,
logs: inner.logs,
context: inner.context,
};
let _ = inner.span_tx.try_send(finished);
}
}
}
impl<T> MaybeAsRef<SpanContext<T>> for Span<T> {
    /// Yields the span's context, or `None` when the span is inactive.
    fn maybe_as_ref(&self) -> Option<&SpanContext<T>> {
        self.0.as_ref().map(|inner| &inner.context)
    }
}
// Mutable state owned by an active `Span` until the span is dropped.
#[derive(Debug)]
struct SpanInner<T> {
// Operation name; overwritten by `Span::set_operation_name`.
operation_name: Cow<'static, str>,
// Start time, as supplied to `Span::new`.
start_time: SystemTime,
// Explicit finish time; while `None`, the drop handler substitutes `SystemTime::now()`.
finish_time: Option<SystemTime>,
// References to other spans, as supplied to `Span::new`.
references: Vec<SpanReference<T>>,
// Tags; `Span::set_tags` replaces any existing tag that has the same name.
tags: Vec<Tag>,
// Log entries appended by `Span::log` and `Span::error_log`.
logs: Vec<Log>,
// Context holding the tracer-specific state and the baggage items.
context: SpanContext<T>,
// Channel on which the finished span is sent when the span is dropped.
span_tx: SpanSender<T>,
}
/// Immutable snapshot of a span, produced when an active `Span` is dropped.
#[derive(Debug)]
pub struct FinishedSpan<T> {
    operation_name: Cow<'static, str>,
    start_time: SystemTime,
    finish_time: SystemTime,
    references: Vec<SpanReference<T>>,
    tags: Vec<Tag>,
    logs: Vec<Log>,
    context: SpanContext<T>,
}

impl<T> FinishedSpan<T> {
    /// Returns the operation name of the span.
    pub fn operation_name(&self) -> &str {
        &self.operation_name
    }

    /// Returns the time at which the span started.
    pub fn start_time(&self) -> SystemTime {
        self.start_time
    }

    /// Returns the time at which the span finished.
    pub fn finish_time(&self) -> SystemTime {
        self.finish_time
    }

    /// Returns the log entries recorded on the span.
    pub fn logs(&self) -> &[Log] {
        self.logs.as_slice()
    }

    /// Returns the tags attached to the span.
    pub fn tags(&self) -> &[Tag] {
        self.tags.as_slice()
    }

    /// Returns the references this span holds to other spans.
    pub fn references(&self) -> &[SpanReference<T>] {
        self.references.as_slice()
    }

    /// Returns the context of the span.
    pub fn context(&self) -> &SpanContext<T> {
        &self.context
    }
}
/// Span context: tracer-specific state `T` plus the baggage items.
#[derive(Debug, Clone)]
pub struct SpanContext<T> {
    state: T,
    baggage_items: Vec<BaggageItem>,
}

impl<T> SpanContext<T> {
    /// Creates a context from `state` and `baggage_items`.
    ///
    /// The stored items are sorted by name and hold at most one item per
    /// name; when several inputs share a name, the one listed last is kept.
    pub fn new(state: T, mut baggage_items: Vec<BaggageItem>) -> Self {
        // Reversing first and then sorting stably puts the last-listed item
        // at the front of each run of equal names; `dedup_by` keeps exactly
        // that front element of every run.
        baggage_items.reverse();
        baggage_items.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
        baggage_items.dedup_by(|later, earlier| later.name == earlier.name);
        Self {
            state,
            baggage_items,
        }
    }

    /// Returns the tracer-specific state.
    pub fn state(&self) -> &T {
        &self.state
    }

    /// Returns the baggage items carried by this context.
    pub fn baggage_items(&self) -> &[BaggageItem] {
        self.baggage_items.as_slice()
    }

    /// Injects this context into a text-map carrier.
    ///
    /// Delegates to `T`'s `carrier::InjectToTextMap` implementation; the
    /// result is passed through `track!`.
    pub fn inject_to_text_map<C>(&self, carrier: &mut C) -> Result<()>
    where
        C: carrier::TextMap,
        T: carrier::InjectToTextMap<C>,
    {
        track!(T::inject_to_text_map(self, carrier))
    }

    /// Injects this context into an HTTP-header carrier.
    ///
    /// Delegates to `T`'s `carrier::InjectToHttpHeader` implementation; the
    /// result is passed through `track!`.
    pub fn inject_to_http_header<C>(&self, carrier: &mut C) -> Result<()>
    where
        C: carrier::SetHttpHeaderField,
        T: carrier::InjectToHttpHeader<C>,
    {
        track!(T::inject_to_http_header(self, carrier))
    }

    /// Injects this context into a binary carrier (any `Write`).
    ///
    /// Delegates to `T`'s `carrier::InjectToBinary` implementation; the
    /// result is passed through `track!`.
    pub fn inject_to_binary<C>(&self, carrier: &mut C) -> Result<()>
    where
        C: Write,
        T: carrier::InjectToBinary<C>,
    {
        track!(T::inject_to_binary(self, carrier))
    }

    /// Extracts a context from a text-map carrier.
    ///
    /// Delegates to `T`'s `carrier::ExtractFromTextMap` implementation.
    /// NOTE(review): `Ok(None)` presumably means the carrier held no context;
    /// confirm against the trait's contract.
    pub fn extract_from_text_map<C>(carrier: &C) -> Result<Option<Self>>
    where
        C: carrier::TextMap,
        T: carrier::ExtractFromTextMap<C>,
    {
        track!(T::extract_from_text_map(carrier))
    }

    /// Extracts a context from an HTTP-header carrier.
    ///
    /// Delegates to `T`'s `carrier::ExtractFromHttpHeader` implementation.
    pub fn extract_from_http_header<'a, C>(carrier: &'a C) -> Result<Option<Self>>
    where
        C: carrier::IterHttpHeaderFields<'a>,
        T: carrier::ExtractFromHttpHeader<'a, C>,
    {
        track!(T::extract_from_http_header(carrier))
    }

    /// Extracts a context from a binary carrier (any `Read`).
    ///
    /// Delegates to `T`'s `carrier::ExtractFromBinary` implementation.
    pub fn extract_from_binary<C>(carrier: &mut C) -> Result<Option<Self>>
    where
        C: Read,
        T: carrier::ExtractFromBinary<C>,
    {
        track!(T::extract_from_binary(carrier))
    }
}
// Lets a `SpanContext` be passed wherever a `MaybeAsRef<SpanContext<T>>` is accepted.
impl<T> MaybeAsRef<SpanContext<T>> for SpanContext<T> {
// Always yields `Some`: a context is unconditionally available as a reference to itself.
fn maybe_as_ref(&self) -> Option<&Self> {
Some(self)
}
}
/// A name/value pair of strings carried in a span context.
#[derive(Debug, Clone)]
pub struct BaggageItem {
    name: String,
    value: String,
}

impl BaggageItem {
    /// Creates an item that owns copies of `name` and `value`.
    pub fn new(name: &str, value: &str) -> Self {
        Self {
            name: String::from(name),
            value: String::from(value),
        }
    }

    /// Returns the name of this item.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the value of this item.
    pub fn value(&self) -> &str {
        self.value.as_str()
    }
}
/// A reference from one span to another, tagged with the kind of relationship.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub enum SpanReference<T> {
    ChildOf(T),
    FollowsFrom(T),
}

impl<T> SpanReference<T> {
    /// Returns the referenced value, whichever variant holds it.
    pub fn span(&self) -> &T {
        match self {
            SpanReference::ChildOf(referenced) => referenced,
            SpanReference::FollowsFrom(referenced) => referenced,
        }
    }

    /// Returns `true` if this is a `ChildOf` reference.
    pub fn is_child_of(&self) -> bool {
        match *self {
            SpanReference::ChildOf(_) => true,
            SpanReference::FollowsFrom(_) => false,
        }
    }

    /// Returns `true` if this is a `FollowsFrom` reference.
    pub fn is_follows_from(&self) -> bool {
        match *self {
            SpanReference::FollowsFrom(_) => true,
            SpanReference::ChildOf(_) => false,
        }
    }
}
/// Borrowed view of a span that has not been started yet.
///
/// Built by `StartSpanOptions` from its own fields and handed to the sampler
/// and to the `From<CandidateSpan>` conversion that produces the span state.
#[derive(Debug)]
pub struct CandidateSpan<'a, T: 'a> {
tags: &'a [Tag],
references: &'a [SpanReference<T>],
baggage_items: &'a [BaggageItem],
}
impl<'a, T: 'a> CandidateSpan<'a, T> {
/// Returns the tags of the candidate span.
pub fn tags(&self) -> &[Tag] {
self.tags
}
/// Returns the references of the candidate span.
pub fn references(&self) -> &[SpanReference<T>] {
self.references
}
/// Returns the baggage items of the candidate span.
pub fn baggage_items(&self) -> &[BaggageItem] {
self.baggage_items
}
}
/// Builder that collects the options of a span before it is started.
///
/// It borrows the span sender and the sampler, and is consumed by `start` or
/// `start_with_state`.
#[derive(Debug)]
pub struct StartSpanOptions<'a, S: 'a, T: 'a> {
    operation_name: Cow<'static, str>,
    start_time: Option<SystemTime>,
    tags: Vec<Tag>,
    references: Vec<SpanReference<T>>,
    baggage_items: Vec<BaggageItem>,
    span_tx: &'a SpanSender<T>,
    sampler: &'a S,
}

impl<'a, S: 'a, T: 'a> StartSpanOptions<'a, S, T>
where
    S: Sampler<T>,
{
    /// Sets an explicit start time; without one, the time at which the span
    /// is started is used.
    pub fn start_time(mut self, time: SystemTime) -> Self {
        self.start_time = Some(time);
        self
    }

    /// Adds a tag.
    ///
    /// When several tags share a name, the one added last survives.
    pub fn tag(mut self, tag: Tag) -> Self {
        self.tags.push(tag);
        self
    }

    /// Adds a `ChildOf` reference to `context` and inherits its baggage items.
    ///
    /// Does nothing when `context` yields no span context.
    pub fn child_of<C>(mut self, context: &C) -> Self
    where
        C: MaybeAsRef<SpanContext<T>>,
        T: Clone,
    {
        if let Some(parent) = context.maybe_as_ref() {
            self.references
                .push(SpanReference::ChildOf(parent.state().clone()));
            self.baggage_items
                .extend_from_slice(parent.baggage_items());
        }
        self
    }

    /// Adds a `FollowsFrom` reference to `context` and inherits its baggage
    /// items.
    ///
    /// Does nothing when `context` yields no span context.
    pub fn follows_from<C>(mut self, context: &C) -> Self
    where
        C: MaybeAsRef<SpanContext<T>>,
        T: Clone,
    {
        if let Some(predecessor) = context.maybe_as_ref() {
            self.references
                .push(SpanReference::FollowsFrom(predecessor.state().clone()));
            self.baggage_items
                .extend_from_slice(predecessor.baggage_items());
        }
        self
    }

    /// Starts the span, deriving its state from the collected options.
    ///
    /// Returns an inactive span when the sampling decision is negative.
    pub fn start(mut self) -> Span<T>
    where
        T: for<'b> From<CandidateSpan<'b, T>>,
    {
        self.normalize();
        if self.is_sampled() {
            let state = T::from(self.span());
            self.into_span(state)
        } else {
            Span(None)
        }
    }

    /// Starts the span with a caller-supplied `state`.
    ///
    /// Returns an inactive span when the sampling decision is negative.
    pub fn start_with_state(mut self, state: T) -> Span<T> {
        self.normalize();
        if self.is_sampled() {
            self.into_span(state)
        } else {
            Span(None)
        }
    }

    /// Creates options with the given name and no tags, references, baggage
    /// items or explicit start time.
    pub(crate) fn new<N>(operation_name: N, span_tx: &'a SpanSender<T>, sampler: &'a S) -> Self
    where
        N: Into<Cow<'static, str>>,
    {
        StartSpanOptions {
            operation_name: operation_name.into(),
            start_time: None,
            tags: Vec::new(),
            references: Vec::new(),
            baggage_items: Vec::new(),
            span_tx,
            sampler,
        }
    }

    /// Turns the (already normalized and sampled) options into an active span.
    fn into_span(self, state: T) -> Span<T> {
        let start_time = self.start_time.unwrap_or_else(SystemTime::now);
        Span::new(
            self.operation_name,
            start_time,
            self.references,
            self.tags,
            state,
            self.baggage_items,
            self.span_tx.clone(),
        )
    }

    /// Sorts tags and baggage items by name and drops duplicates.
    ///
    /// Reversing before the stable sort moves the most recently added entry
    /// to the front of each run of equal names, which is the element that
    /// `dedup_by` retains.
    fn normalize(&mut self) {
        self.tags.reverse();
        self.tags.sort_by(|lhs, rhs| lhs.name().cmp(rhs.name()));
        self.tags.dedup_by(|later, earlier| later.name() == earlier.name());

        self.baggage_items.reverse();
        self.baggage_items
            .sort_by(|lhs, rhs| lhs.name().cmp(rhs.name()));
        self.baggage_items
            .dedup_by(|later, earlier| later.name() == earlier.name());
    }

    /// Returns a borrowed view of the options as a candidate span.
    fn span(&self) -> CandidateSpan<'_, T> {
        CandidateSpan {
            references: &self.references,
            tags: &self.tags,
            baggage_items: &self.baggage_items,
        }
    }

    /// Decides whether the span should be recorded.
    ///
    /// An integer-valued `sampling.priority` tag overrides the sampler: the
    /// span is sampled exactly when the value is positive. In every other
    /// case the decision is left to the sampler.
    fn is_sampled(&self) -> bool {
        let priority = self
            .tags
            .iter()
            .find(|tag| tag.name() == "sampling.priority")
            .map(|tag| tag.value());
        match priority {
            Some(&TagValue::Integer(n)) => n > 0,
            _ => self.sampler.is_sampled(&self.span()),
        }
    }
}
/// A cloneable handle to a span: its context plus the sender for finished
/// spans, or nothing when the originating span was inactive.
#[derive(Debug, Clone)]
pub struct SpanHandle<T>(Option<(SpanContext<T>, SpanSender<T>)>);

impl<T> SpanHandle<T> {
    /// Returns `true` if the handle refers to an active span.
    pub fn is_sampled(&self) -> bool {
        self.0.is_some()
    }

    /// Returns the span context, or `None` for an inactive handle.
    pub fn context(&self) -> Option<&SpanContext<T>> {
        self.0.as_ref().map(|pair| &pair.0)
    }

    /// Looks up the baggage item called `name`.
    ///
    /// Returns `None` when the handle is inactive or no such item exists.
    pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
        self.context()
            .and_then(|context| context.baggage_items.iter().find(|item| item.name == name))
    }

    /// Starts a span that is a child of the span behind this handle.
    ///
    /// `f` receives options that already carry the `ChildOf` reference and
    /// use `AllSampler`; it decides how the span is finally started. For an
    /// inactive handle `f` is not called and an inactive span is returned.
    pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
    where
        N: Into<Cow<'static, str>>,
        T: Clone,
        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
    {
        match self.0 {
            Some((ref context, ref span_tx)) => {
                let options =
                    StartSpanOptions::new(operation_name, span_tx, &AllSampler).child_of(context);
                f(options)
            }
            None => Span::inactive(),
        }
    }

    /// Starts a span that follows from the span behind this handle.
    ///
    /// `f` receives options that already carry the `FollowsFrom` reference
    /// and use `AllSampler`; it decides how the span is finally started. For
    /// an inactive handle `f` is not called and an inactive span is returned.
    pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
    where
        N: Into<Cow<'static, str>>,
        T: Clone,
        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
    {
        match self.0 {
            Some((ref context, ref span_tx)) => {
                let options = StartSpanOptions::new(operation_name, span_tx, &AllSampler)
                    .follows_from(context);
                f(options)
            }
            None => Span::inactive(),
        }
    }
}