cf_rustracing/
span.rs

1//! Span.
2use crate::carrier;
3use crate::convert::MaybeAsRef;
4use crate::log::{Log, LogBuilder, StdErrorLogFieldsBuilder};
5use crate::sampler::{AllSampler, Sampler};
6use crate::tag::{StdTag, Tag, TagValue};
7use crate::Result;
8use std::borrow::Cow;
9use std::io::{Read, Write};
10use std::time::SystemTime;
11use tokio::sync::mpsc;
12
13/// Finished span receiver.
14pub type SpanReceiver<T> = mpsc::UnboundedReceiver<FinishedSpan<T>>;
15/// Sender of finished spans to the destination channel.
16pub type SpanSender<T> = mpsc::UnboundedSender<FinishedSpan<T>>;
17
18/// Span.
19///
20/// When this span is dropped, it will be converted to `FinishedSpan` and
21/// it will be sent to the associated `SpanReceiver`.
22#[derive(Debug)]
23pub struct Span<T>(Option<SpanInner<T>>);
24impl<T> Span<T> {
25    /// Makes an inactive span.
26    ///
27    /// This span is never traced.
28    ///
29    /// # Examples
30    ///
31    /// ```
32    /// use cf_rustracing::span::Span;
33    ///
34    /// let span = Span::<()>::inactive();
35    /// assert!(! span.is_sampled());
36    /// ```
37    pub fn inactive() -> Self {
38        Span(None)
39    }
40
41    /// Returns a handle of this span.
42    pub fn handle(&self) -> SpanHandle<T>
43    where
44        T: Clone,
45    {
46        SpanHandle(
47            self.0
48                .as_ref()
49                .map(|inner| (inner.context.clone(), inner.span_tx.clone())),
50        )
51    }
52
53    /// Returns `true` if this span is sampled (i.e., being traced).
54    pub fn is_sampled(&self) -> bool {
55        self.0.is_some()
56    }
57
58    /// Returns the context of this span.
59    pub fn context(&self) -> Option<&SpanContext<T>> {
60        self.0.as_ref().map(|x| &x.context)
61    }
62
63    /// Sets the operation name of this span.
64    pub fn set_operation_name<F, N>(&mut self, f: F)
65    where
66        F: FnOnce() -> N,
67        N: Into<Cow<'static, str>>,
68    {
69        if let Some(inner) = self.0.as_mut() {
70            inner.operation_name = f().into();
71        }
72    }
73
74    /// Sets the start time of this span.
75    pub fn set_start_time<F>(&mut self, f: F)
76    where
77        F: FnOnce() -> SystemTime,
78    {
79        if let Some(inner) = self.0.as_mut() {
80            inner.start_time = f();
81        }
82    }
83
84    /// Sets the finish time of this span.
85    pub fn set_finish_time<F>(&mut self, f: F)
86    where
87        F: FnOnce() -> SystemTime,
88    {
89        if let Some(inner) = self.0.as_mut() {
90            inner.finish_time = Some(f());
91        }
92    }
93
94    /// Sets the tag to this span.
95    pub fn set_tag<F>(&mut self, f: F)
96    where
97        F: FnOnce() -> Tag,
98    {
99        use std::iter::once;
100        self.set_tags(|| once(f()));
101    }
102
103    /// Sets the tags to this span.
104    pub fn set_tags<F, I>(&mut self, f: F)
105    where
106        F: FnOnce() -> I,
107        I: IntoIterator<Item = Tag>,
108    {
109        if let Some(inner) = self.0.as_mut() {
110            for tag in f() {
111                inner.tags.retain(|x| x.name() != tag.name());
112                inner.tags.push(tag);
113            }
114        }
115    }
116
117    /// Sets the baggage item to this span.
118    pub fn set_baggage_item<F>(&mut self, f: F)
119    where
120        F: FnOnce() -> BaggageItem,
121    {
122        if let Some(inner) = self.0.as_mut() {
123            let item = f();
124            inner.context.baggage_items.retain(|x| x.name != item.name);
125            inner.context.baggage_items.push(item);
126        }
127    }
128
129    /// Gets the baggage item that has the name `name`.
130    pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
131        if let Some(inner) = self.0.as_ref() {
132            inner.context.baggage_items.iter().find(|x| x.name == name)
133        } else {
134            None
135        }
136    }
137
138    /// Logs structured data.
139    pub fn log<F>(&mut self, f: F)
140    where
141        F: FnOnce(&mut LogBuilder),
142    {
143        if let Some(inner) = self.0.as_mut() {
144            let mut builder = LogBuilder::new();
145            f(&mut builder);
146            if let Some(log) = builder.finish() {
147                inner.logs.push(log);
148            }
149        }
150    }
151
152    /// Logs an error.
153    ///
154    /// This is a simple wrapper of `log` method
155    /// except that the `StdTag::error()` tag will be set in this method.
156    pub fn error_log<F>(&mut self, f: F)
157    where
158        F: FnOnce(&mut StdErrorLogFieldsBuilder),
159    {
160        if let Some(inner) = self.0.as_mut() {
161            let mut builder = LogBuilder::new();
162            f(&mut builder.error());
163            if let Some(log) = builder.finish() {
164                inner.logs.push(log);
165            }
166            if !inner.tags.iter().any(|x| x.name() == "error") {
167                inner.tags.push(StdTag::error());
168            }
169        }
170    }
171
172    /// Starts a `ChildOf` span if this span is sampled.
173    pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
174    where
175        N: Into<Cow<'static, str>>,
176        T: Clone,
177        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
178    {
179        self.handle().child(operation_name, f)
180    }
181
182    /// Starts a `FollowsFrom` span if this span is sampled.
183    pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
184    where
185        N: Into<Cow<'static, str>>,
186        T: Clone,
187        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
188    {
189        self.handle().follower(operation_name, f)
190    }
191
192    pub(crate) fn new(
193        operation_name: Cow<'static, str>,
194        start_time: SystemTime,
195        references: Vec<SpanReference<T>>,
196        tags: Vec<Tag>,
197        state: T,
198        baggage_items: Vec<BaggageItem>,
199        span_tx: SpanSender<T>,
200    ) -> Self {
201        let context = SpanContext::new(state, baggage_items);
202        let inner = SpanInner {
203            operation_name,
204            start_time,
205            finish_time: None,
206            references,
207            tags,
208            logs: Vec::new(),
209            context,
210            span_tx,
211        };
212        Span(Some(inner))
213    }
214}
215impl<T> Drop for Span<T> {
216    fn drop(&mut self) {
217        if let Some(inner) = self.0.take() {
218            let finished = FinishedSpan {
219                operation_name: inner.operation_name,
220                start_time: inner.start_time,
221                finish_time: inner.finish_time.unwrap_or_else(SystemTime::now),
222                references: inner.references,
223                tags: inner.tags,
224                logs: inner.logs,
225                context: inner.context,
226            };
227            let _ = inner.span_tx.send(finished);
228        }
229    }
230}
231impl<T> MaybeAsRef<SpanContext<T>> for Span<T> {
232    fn maybe_as_ref(&self) -> Option<&SpanContext<T>> {
233        self.context()
234    }
235}
236
237#[derive(Debug)]
238struct SpanInner<T> {
239    operation_name: Cow<'static, str>,
240    start_time: SystemTime,
241    finish_time: Option<SystemTime>,
242    references: Vec<SpanReference<T>>,
243    tags: Vec<Tag>,
244    logs: Vec<Log>,
245    context: SpanContext<T>,
246    span_tx: SpanSender<T>,
247}
248
249/// Finished span.
250#[derive(Debug)]
251pub struct FinishedSpan<T> {
252    operation_name: Cow<'static, str>,
253    start_time: SystemTime,
254    finish_time: SystemTime,
255    references: Vec<SpanReference<T>>,
256    tags: Vec<Tag>,
257    logs: Vec<Log>,
258    context: SpanContext<T>,
259}
260impl<T> FinishedSpan<T> {
261    /// Returns the operation name of this span.
262    pub fn operation_name(&self) -> &str {
263        self.operation_name.as_ref()
264    }
265
266    /// Returns the start time of this span.
267    pub fn start_time(&self) -> SystemTime {
268        self.start_time
269    }
270
271    /// Returns the finish time of this span.
272    pub fn finish_time(&self) -> SystemTime {
273        self.finish_time
274    }
275
276    /// Returns the logs recorded during this span.
277    pub fn logs(&self) -> &[Log] {
278        &self.logs
279    }
280
281    /// Returns the tags of this span.
282    pub fn tags(&self) -> &[Tag] {
283        &self.tags
284    }
285
286    /// Returns the references of this span.
287    pub fn references(&self) -> &[SpanReference<T>] {
288        &self.references
289    }
290
291    /// Returns the context of this span.
292    pub fn context(&self) -> &SpanContext<T> {
293        &self.context
294    }
295}
296
297/// Span context.
298///
299/// Each `SpanContext` encapsulates the following state:
300///
301/// - `T`: OpenTracing-implementation-dependent state (for example, trace and span ids) needed to refer to a distinct `Span` across a process boundary
302/// - `BaggageItems`: These are just key:value pairs that cross process boundaries
303#[derive(Debug, Clone)]
304pub struct SpanContext<T> {
305    state: T,
306    baggage_items: Vec<BaggageItem>,
307}
308impl<T> SpanContext<T> {
309    /// Makes a new `SpanContext` instance.
310    pub fn new(state: T, mut baggage_items: Vec<BaggageItem>) -> Self {
311        baggage_items.reverse();
312        baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
313        baggage_items.dedup_by(|a, b| a.name() == b.name());
314        SpanContext {
315            state,
316            baggage_items,
317        }
318    }
319
320    /// Returns the implementation-dependent state of this context.
321    pub fn state(&self) -> &T {
322        &self.state
323    }
324
325    /// Returns the baggage items associated with this context.
326    pub fn baggage_items(&self) -> &[BaggageItem] {
327        &self.baggage_items
328    }
329
330    /// Injects this context to the **Text Map** `carrier`.
331    pub fn inject_to_text_map<C>(&self, carrier: &mut C) -> Result<()>
332    where
333        C: carrier::TextMap,
334        T: carrier::InjectToTextMap<C>,
335    {
336        track!(T::inject_to_text_map(self, carrier))
337    }
338
339    /// Injects this context to the **HTTP Header** `carrier`.
340    pub fn inject_to_http_header<C>(&self, carrier: &mut C) -> Result<()>
341    where
342        C: carrier::SetHttpHeaderField,
343        T: carrier::InjectToHttpHeader<C>,
344    {
345        track!(T::inject_to_http_header(self, carrier))
346    }
347
348    /// Injects this context to the **Binary** `carrier`.
349    pub fn inject_to_binary<C>(&self, carrier: &mut C) -> Result<()>
350    where
351        C: Write,
352        T: carrier::InjectToBinary<C>,
353    {
354        track!(T::inject_to_binary(self, carrier))
355    }
356
357    /// Extracts a context from the **Text Map** `carrier`.
358    pub fn extract_from_text_map<C>(carrier: &C) -> Result<Option<Self>>
359    where
360        C: carrier::TextMap,
361        T: carrier::ExtractFromTextMap<C>,
362    {
363        track!(T::extract_from_text_map(carrier))
364    }
365
366    /// Extracts a context from the **HTTP Header** `carrier`.
367    pub fn extract_from_http_header<'a, C>(carrier: &'a C) -> Result<Option<Self>>
368    where
369        C: carrier::IterHttpHeaderFields<'a>,
370        T: carrier::ExtractFromHttpHeader<'a, C>,
371    {
372        track!(T::extract_from_http_header(carrier))
373    }
374
375    /// Extracts a context from the **Binary** `carrier`.
376    pub fn extract_from_binary<C>(carrier: &mut C) -> Result<Option<Self>>
377    where
378        C: Read,
379        T: carrier::ExtractFromBinary<C>,
380    {
381        track!(T::extract_from_binary(carrier))
382    }
383}
384impl<T> MaybeAsRef<SpanContext<T>> for SpanContext<T> {
385    fn maybe_as_ref(&self) -> Option<&Self> {
386        Some(self)
387    }
388}
389
/// Baggage item.
///
/// `BaggageItem`s are key:value string pairs that apply to a `Span`, its `SpanContext`,
/// and all `Span`s which directly or transitively reference the local `Span`.
/// That is, `BaggageItem`s propagate in-band along with the trace itself.
///
/// `BaggageItem`s enable powerful functionality given a full-stack OpenTracing integration
/// (for example, arbitrary application data from a mobile app can make it, transparently,
/// all the way into the depths of a storage system), but that power has a cost.
///
/// Use this feature thoughtfully and with care.
/// Every key and value is copied into every local and remote child of the associated `Span`,
/// and that can add up to a lot of network and cpu overhead.
#[derive(Debug, Clone)]
pub struct BaggageItem {
    name: String,
    value: String,
}
impl BaggageItem {
    /// Makes a new `BaggageItem` instance holding owned copies of `name` and `value`.
    pub fn new(name: &str, value: &str) -> Self {
        let name = String::from(name);
        let value = String::from(value);
        Self { name, value }
    }

    /// The name (key) of this item.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The value of this item.
    pub fn value(&self) -> &str {
        self.value.as_str()
    }
}
428
/// A reference from one span to another span's state.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub enum SpanReference<T> {
    ChildOf(T),
    FollowsFrom(T),
}
impl<T> SpanReference<T> {
    /// Borrows the span context state carried by this reference, whatever its kind.
    pub fn span(&self) -> &T {
        match self {
            Self::ChildOf(state) => state,
            Self::FollowsFrom(state) => state,
        }
    }

    /// Tells whether this is a `ChildOf` reference.
    pub fn is_child_of(&self) -> bool {
        match self {
            Self::ChildOf(_) => true,
            Self::FollowsFrom(_) => false,
        }
    }

    /// Tells whether this is a `FollowsFrom` reference.
    pub fn is_follows_from(&self) -> bool {
        match self {
            Self::FollowsFrom(_) => true,
            Self::ChildOf(_) => false,
        }
    }
}
454
455/// Candidate span for tracing.
456#[derive(Debug)]
457pub struct CandidateSpan<'a, T: 'a> {
458    tags: &'a [Tag],
459    references: &'a [SpanReference<T>],
460    baggage_items: &'a [BaggageItem],
461}
462impl<'a, T: 'a> CandidateSpan<'a, T> {
463    /// Returns the tags of this span.
464    pub fn tags(&self) -> &[Tag] {
465        self.tags
466    }
467
468    /// Returns the references of this span.
469    pub fn references(&self) -> &[SpanReference<T>] {
470        self.references
471    }
472
473    /// Returns the baggage items of this span.
474    pub fn baggage_items(&self) -> &[BaggageItem] {
475        self.baggage_items
476    }
477}
478
479/// Options for starting a span.
480#[derive(Debug)]
481pub struct StartSpanOptions<'a, S: 'a, T: 'a> {
482    operation_name: Cow<'static, str>,
483    start_time: Option<SystemTime>,
484    tags: Vec<Tag>,
485    references: Vec<SpanReference<T>>,
486    baggage_items: Vec<BaggageItem>,
487    span_tx: &'a SpanSender<T>,
488    sampler: &'a S,
489}
490impl<'a, S: 'a, T: 'a> StartSpanOptions<'a, S, T>
491where
492    S: Sampler<T>,
493{
494    /// Sets the start time of this span.
495    pub fn start_time(mut self, time: SystemTime) -> Self {
496        self.start_time = Some(time);
497        self
498    }
499
500    /// Sets the tag to this span.
501    pub fn tag(mut self, tag: Tag) -> Self {
502        self.tags.push(tag);
503        self
504    }
505
506    /// Adds the `ChildOf` reference to this span.
507    pub fn child_of<C>(mut self, context: &C) -> Self
508    where
509        C: MaybeAsRef<SpanContext<T>>,
510        T: Clone,
511    {
512        if let Some(context) = context.maybe_as_ref() {
513            let reference = SpanReference::ChildOf(context.state().clone());
514            self.references.push(reference);
515            self.baggage_items
516                .extend(context.baggage_items().iter().cloned());
517        }
518        self
519    }
520
521    /// Adds the `FollowsFrom` reference to this span.
522    pub fn follows_from<C>(mut self, context: &C) -> Self
523    where
524        C: MaybeAsRef<SpanContext<T>>,
525        T: Clone,
526    {
527        if let Some(context) = context.maybe_as_ref() {
528            let reference = SpanReference::FollowsFrom(context.state().clone());
529            self.references.push(reference);
530            self.baggage_items
531                .extend(context.baggage_items().iter().cloned());
532        }
533        self
534    }
535
536    /// Starts a new span.
537    pub fn start(mut self) -> Span<T>
538    where
539        T: for<'b> From<CandidateSpan<'b, T>>,
540    {
541        self.normalize();
542        if !self.is_sampled() {
543            return Span(None);
544        }
545        let state = T::from(self.span());
546        Span::new(
547            self.operation_name,
548            self.start_time.unwrap_or_else(SystemTime::now),
549            self.references,
550            self.tags,
551            state,
552            self.baggage_items,
553            self.span_tx.clone(),
554        )
555    }
556
557    /// Starts a new span with the explicit `state`.
558    pub fn start_with_state(mut self, state: T) -> Span<T> {
559        self.normalize();
560        if !self.is_sampled() {
561            return Span(None);
562        }
563        Span::new(
564            self.operation_name,
565            self.start_time.unwrap_or_else(SystemTime::now),
566            self.references,
567            self.tags,
568            state,
569            self.baggage_items,
570            self.span_tx.clone(),
571        )
572    }
573
574    pub(crate) fn new<N>(operation_name: N, span_tx: &'a SpanSender<T>, sampler: &'a S) -> Self
575    where
576        N: Into<Cow<'static, str>>,
577    {
578        StartSpanOptions {
579            operation_name: operation_name.into(),
580            start_time: None,
581            tags: Vec::new(),
582            references: Vec::new(),
583            baggage_items: Vec::new(),
584            span_tx,
585            sampler,
586        }
587    }
588
589    fn normalize(&mut self) {
590        self.tags.reverse();
591        self.tags.sort_by(|a, b| a.name().cmp(b.name()));
592        self.tags.dedup_by(|a, b| a.name() == b.name());
593
594        self.baggage_items.reverse();
595        self.baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
596        self.baggage_items.dedup_by(|a, b| a.name() == b.name());
597    }
598
599    fn span(&self) -> CandidateSpan<T> {
600        CandidateSpan {
601            references: &self.references,
602            tags: &self.tags,
603            baggage_items: &self.baggage_items,
604        }
605    }
606
607    fn is_sampled(&self) -> bool {
608        if let Some(&TagValue::Integer(n)) = self
609            .tags
610            .iter()
611            .find(|t| t.name() == "sampling.priority")
612            .map(|t| t.value())
613        {
614            n > 0
615        } else {
616            self.sampler.is_sampled(&self.span())
617        }
618    }
619}
620
621/// Immutable handle of `Span`.
622#[derive(Debug, Clone)]
623pub struct SpanHandle<T>(Option<(SpanContext<T>, SpanSender<T>)>);
624impl<T> SpanHandle<T> {
625    /// Returns `true` if this span is sampled (i.e., being traced).
626    pub fn is_sampled(&self) -> bool {
627        self.0.is_some()
628    }
629
630    /// Returns the context of this span.
631    pub fn context(&self) -> Option<&SpanContext<T>> {
632        self.0.as_ref().map(|(context, _)| context)
633    }
634
635    /// Gets the baggage item that has the name `name`.
636    pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
637        if let Some(context) = self.context() {
638            context.baggage_items.iter().find(|x| x.name == name)
639        } else {
640            None
641        }
642    }
643
644    /// Starts a `ChildOf` span if this span is sampled.
645    pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
646    where
647        N: Into<Cow<'static, str>>,
648        T: Clone,
649        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
650    {
651        if let Some((context, span_tx)) = self.0.as_ref() {
652            let options =
653                StartSpanOptions::new(operation_name, span_tx, &AllSampler).child_of(context);
654            f(options)
655        } else {
656            Span::inactive()
657        }
658    }
659
660    /// Starts a `FollowsFrom` span if this span is sampled.
661    pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
662    where
663        N: Into<Cow<'static, str>>,
664        T: Clone,
665        F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
666    {
667        if let Some((context, span_tx)) = self.0.as_ref() {
668            let options =
669                StartSpanOptions::new(operation_name, span_tx, &AllSampler).follows_from(context);
670            f(options)
671        } else {
672            Span::inactive()
673        }
674    }
675}
676
677/// Extension trait for inspecting an in-progress `Span`.
678pub trait InspectableSpan<T> {
679    /// Returns the operation name of this span.
680    fn operation_name(&self) -> &str;
681
682    /// Returns the start time of this span.
683    fn start_time(&self) -> SystemTime;
684
685    /// Returns the finish time of this span, if it has finished.
686    fn finish_time(&self) -> Option<SystemTime>;
687
688    /// Returns the logs recorded during this span.
689    fn logs(&self) -> &[Log];
690
691    /// Returns the tags of this span.
692    fn tags(&self) -> &[Tag];
693
694    /// Returns the references of this span.
695    fn references(&self) -> &[SpanReference<T>];
696}
697
698impl<T> InspectableSpan<T> for Span<T> {
699    /// Returns the operation name of this span.
700    fn operation_name(&self) -> &str {
701        self.0
702            .as_ref()
703            .map(|inner| inner.operation_name.as_ref())
704            .unwrap_or("")
705    }
706
707    /// Returns the start time of this span.
708    fn start_time(&self) -> SystemTime {
709        self.0
710            .as_ref()
711            .map(|inner| inner.start_time)
712            .unwrap_or_else(SystemTime::now)
713    }
714
715    /// Returns the finish time of this span.
716    fn finish_time(&self) -> Option<SystemTime> {
717        self.0.as_ref().and_then(|inner| inner.finish_time)
718    }
719
720    /// Returns the logs recorded during this span.
721    fn logs(&self) -> &[Log] {
722        self.0
723            .as_ref()
724            .map(|inner| inner.logs.as_ref())
725            .unwrap_or(&[])
726    }
727
728    /// Returns the tags of this span.
729    fn tags(&self) -> &[Tag] {
730        self.0
731            .as_ref()
732            .map(|inner| inner.tags.as_ref())
733            .unwrap_or(&[])
734    }
735
736    /// Returns the references of this span.
737    fn references(&self) -> &[SpanReference<T>] {
738        self.0
739            .as_ref()
740            .map(|inner| inner.references.as_ref())
741            .unwrap_or(&[])
742    }
743}