1use crate::carrier;
3use crate::convert::MaybeAsRef;
4use crate::log::{Log, LogBuilder, StdErrorLogFieldsBuilder};
5use crate::sampler::{AllSampler, Sampler};
6use crate::tag::{StdTag, Tag, TagValue};
7use crate::Result;
8use std::borrow::Cow;
9use std::fmt;
10use std::io::{Read, Write};
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::sync::mpsc;
14
/// Receiving half of the unbounded channel that carries [`FinishedSpan`]s.
pub type SpanReceiver<T> = mpsc::UnboundedReceiver<FinishedSpan<T>>;
/// Sending half of the unbounded channel that carries [`FinishedSpan`]s.
///
/// Every active [`Span`] owns one of these and sends its finished form
/// through it when the span is dropped.
pub type SpanSender<T> = mpsc::UnboundedSender<FinishedSpan<T>>;
19
/// Cloneable wrapper around a shared closure that receives `&mut Span<T>`.
///
/// When a span that holds one is dropped, the closure is called with that
/// span before the span is sent to the channel.
pub struct FinishSpanCallback<T>(FinishCallbackInner<T>);
/// Reference-counted, thread-safe closure stored inside [`FinishSpanCallback`].
type FinishCallbackInner<T> = Arc<dyn Fn(&mut Span<T>) + Send + Sync>;
23
24impl<T> fmt::Debug for FinishSpanCallback<T> {
25 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26 f.write_str("FinishSpanCallback")
27 }
28}
29
30impl<T> Clone for FinishSpanCallback<T> {
31 fn clone(&self) -> Self {
32 Self(self.0.clone())
33 }
34}
35
36impl<T> From<FinishSpanCallback<T>> for FinishCallbackInner<T> {
37 fn from(v: FinishSpanCallback<T>) -> Self {
38 v.0
39 }
40}
41
42impl<T> From<FinishCallbackInner<T>> for FinishSpanCallback<T> {
43 fn from(v: FinishCallbackInner<T>) -> Self {
44 Self(v)
45 }
46}
47
48impl<T, F: Fn(&mut Span<T>) + Send + Sync + 'static> From<F> for FinishSpanCallback<T> {
49 fn from(v: F) -> Self {
50 Self(Arc::new(v))
51 }
52}
53
/// A span that is either active (`Some`) or inactive (`None`).
///
/// An active span records its data and is sent to the span channel as a
/// [`FinishedSpan`] when dropped. On an inactive span the mutating methods
/// do nothing and nothing is sent on drop.
#[derive(Debug)]
pub struct Span<T>(Option<SpanInner<T>>);
60impl<T> Span<T> {
61 pub const fn inactive() -> Self {
74 Span(None)
75 }
76
77 pub fn handle(&self) -> SpanHandle<T>
79 where
80 T: Clone,
81 {
82 SpanHandle(
83 self.0
84 .as_ref()
85 .map(|inner| (inner.context.clone(), inner.span_tx.clone())),
86 )
87 }
88
89 pub fn is_sampled(&self) -> bool {
91 self.0.is_some()
92 }
93
94 pub fn context(&self) -> Option<&SpanContext<T>> {
96 self.0.as_ref().map(|x| &x.context)
97 }
98
99 pub fn set_operation_name<F, N>(&mut self, f: F)
101 where
102 F: FnOnce() -> N,
103 N: Into<Cow<'static, str>>,
104 {
105 if let Some(inner) = self.0.as_mut() {
106 inner.operation_name = f().into();
107 }
108 }
109
110 pub fn set_start_time<F>(&mut self, f: F)
112 where
113 F: FnOnce() -> SystemTime,
114 {
115 if let Some(inner) = self.0.as_mut() {
116 inner.start_time = f();
117 }
118 }
119
120 pub fn set_finish_time<F>(&mut self, f: F)
122 where
123 F: FnOnce() -> SystemTime,
124 {
125 if let Some(inner) = self.0.as_mut() {
126 inner.finish_time = Some(f());
127 }
128 }
129
130 pub fn set_finish_callback<C>(&mut self, cb: C)
132 where
133 C: Into<FinishSpanCallback<T>>,
134 {
135 if let Some(inner) = &mut self.0 {
136 inner.finish_cb = Some(cb.into());
137 }
138 }
139
140 #[doc(alias = "remove_finish_callback")]
145 pub fn take_finish_callback(&mut self) -> Option<FinishSpanCallback<T>> {
146 self.0.as_mut().and_then(|s| s.finish_cb.take())
147 }
148
149 pub fn set_tag<F>(&mut self, f: F)
151 where
152 F: FnOnce() -> Tag,
153 {
154 use std::iter::once;
155 self.set_tags(|| once(f()));
156 }
157
158 pub fn set_tags<F, I>(&mut self, f: F)
160 where
161 F: FnOnce() -> I,
162 I: IntoIterator<Item = Tag>,
163 {
164 if let Some(inner) = self.0.as_mut() {
165 for tag in f() {
166 inner.tags.retain(|x| x.name() != tag.name());
167 inner.tags.push(tag);
168 }
169 }
170 }
171
172 pub fn set_baggage_item<F>(&mut self, f: F)
174 where
175 F: FnOnce() -> BaggageItem,
176 {
177 if let Some(inner) = self.0.as_mut() {
178 let item = f();
179 inner.context.baggage_items.retain(|x| x.name != item.name);
180 inner.context.baggage_items.push(item);
181 }
182 }
183
184 pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
186 if let Some(inner) = self.0.as_ref() {
187 inner.context.baggage_items.iter().find(|x| x.name == name)
188 } else {
189 None
190 }
191 }
192
193 pub fn log<F>(&mut self, f: F)
195 where
196 F: FnOnce(&mut LogBuilder),
197 {
198 if let Some(inner) = self.0.as_mut() {
199 let mut builder = LogBuilder::new();
200 f(&mut builder);
201 if let Some(log) = builder.finish() {
202 inner.logs.push(log);
203 }
204 }
205 }
206
207 pub fn error_log<F>(&mut self, f: F)
212 where
213 F: FnOnce(&mut StdErrorLogFieldsBuilder),
214 {
215 if let Some(inner) = self.0.as_mut() {
216 let mut builder = LogBuilder::new();
217 f(&mut builder.error());
218 if let Some(log) = builder.finish() {
219 inner.logs.push(log);
220 }
221 if !inner.tags.iter().any(|x| x.name() == "error") {
222 inner.tags.push(StdTag::error());
223 }
224 }
225 }
226
227 pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
232 where
233 N: Into<Cow<'static, str>>,
234 T: Clone,
235 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
236 {
237 self.handle().child(operation_name, move |mut opts| {
238 if let Some(finish_cb) = self.0.as_ref().and_then(|s| s.finish_cb.clone()) {
239 opts = opts.finish_callback(finish_cb);
240 }
241 f(opts)
242 })
243 }
244
245 pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
247 where
248 N: Into<Cow<'static, str>>,
249 T: Clone,
250 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
251 {
252 self.handle().follower(operation_name, f)
253 }
254
255 pub(crate) fn new<S>(state: T, opts: StartSpanOptions<S, T>) -> Self {
256 let context = SpanContext::new(state, opts.baggage_items);
257 let inner = SpanInner {
258 operation_name: opts.operation_name,
259 start_time: opts.start_time.unwrap_or_else(SystemTime::now),
260 finish_time: None,
261 references: opts.references,
262 tags: opts.tags,
263 logs: Vec::new(),
264 context,
265 finish_cb: opts.finish_cb,
266 span_tx: opts.span_tx.clone(),
267 };
268 Span(Some(inner))
269 }
270}
271impl<T> Drop for Span<T> {
272 fn drop(&mut self) {
273 if let Some(finish_cb) = self.take_finish_callback() {
274 finish_cb.0(self);
275 }
276
277 if let Some(inner) = self.0.take() {
278 let finished = FinishedSpan {
279 operation_name: inner.operation_name,
280 start_time: inner.start_time,
281 finish_time: inner.finish_time.unwrap_or_else(SystemTime::now),
282 references: inner.references,
283 tags: inner.tags,
284 logs: inner.logs,
285 context: inner.context,
286 };
287 let _ = inner.span_tx.send(finished);
288 }
289 }
290}
291impl<T> MaybeAsRef<SpanContext<T>> for Span<T> {
292 fn maybe_as_ref(&self) -> Option<&SpanContext<T>> {
293 self.context()
294 }
295}
296
/// Data recorded by an active [`Span`] until it is dropped.
#[derive(Debug)]
struct SpanInner<T> {
    operation_name: Cow<'static, str>,
    start_time: SystemTime,
    // `None` until set explicitly; `Drop` then falls back to the current time.
    finish_time: Option<SystemTime>,
    references: Vec<SpanReference<T>>,
    tags: Vec<Tag>,
    logs: Vec<Log>,
    context: SpanContext<T>,
    // Called with the span when it is dropped, before it is sent.
    finish_cb: Option<FinishSpanCallback<T>>,
    // Channel on which the finished span is sent.
    span_tx: SpanSender<T>,
}
309
/// Immutable snapshot of a span, built and sent when the span is dropped.
#[derive(Debug)]
pub struct FinishedSpan<T> {
    operation_name: Cow<'static, str>,
    start_time: SystemTime,
    // Always present: taken from the span, or the current time at drop.
    finish_time: SystemTime,
    references: Vec<SpanReference<T>>,
    tags: Vec<Tag>,
    logs: Vec<Log>,
    context: SpanContext<T>,
}
321impl<T> FinishedSpan<T> {
322 pub fn operation_name(&self) -> &str {
324 self.operation_name.as_ref()
325 }
326
327 pub fn start_time(&self) -> SystemTime {
329 self.start_time
330 }
331
332 pub fn finish_time(&self) -> SystemTime {
334 self.finish_time
335 }
336
337 pub fn logs(&self) -> &[Log] {
339 &self.logs
340 }
341
342 pub fn tags(&self) -> &[Tag] {
344 &self.tags
345 }
346
347 pub fn references(&self) -> &[SpanReference<T>] {
349 &self.references
350 }
351
352 pub fn context(&self) -> &SpanContext<T> {
354 &self.context
355 }
356}
357
/// The part of a span that is handed to other spans and carriers: an
/// implementation-defined `state` plus the baggage items.
#[derive(Debug, Clone)]
pub struct SpanContext<T> {
    state: T,
    baggage_items: Vec<BaggageItem>,
}
369impl<T> SpanContext<T> {
370 pub fn new(state: T, mut baggage_items: Vec<BaggageItem>) -> Self {
372 baggage_items.reverse();
373 baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
374 baggage_items.dedup_by(|a, b| a.name() == b.name());
375 SpanContext {
376 state,
377 baggage_items,
378 }
379 }
380
381 pub fn state(&self) -> &T {
383 &self.state
384 }
385
386 pub fn baggage_items(&self) -> &[BaggageItem] {
388 &self.baggage_items
389 }
390
391 pub fn inject_to_text_map<C>(&self, carrier: &mut C) -> Result<()>
393 where
394 C: carrier::TextMap,
395 T: carrier::InjectToTextMap<C>,
396 {
397 track!(T::inject_to_text_map(self, carrier))
398 }
399
400 pub fn inject_to_http_header<C>(&self, carrier: &mut C) -> Result<()>
402 where
403 C: carrier::SetHttpHeaderField,
404 T: carrier::InjectToHttpHeader<C>,
405 {
406 track!(T::inject_to_http_header(self, carrier))
407 }
408
409 pub fn inject_to_binary<C>(&self, carrier: &mut C) -> Result<()>
411 where
412 C: Write,
413 T: carrier::InjectToBinary<C>,
414 {
415 track!(T::inject_to_binary(self, carrier))
416 }
417
418 pub fn extract_from_text_map<C>(carrier: &C) -> Result<Option<Self>>
420 where
421 C: carrier::TextMap,
422 T: carrier::ExtractFromTextMap<C>,
423 {
424 track!(T::extract_from_text_map(carrier))
425 }
426
427 pub fn extract_from_http_header<'a, C>(carrier: &'a C) -> Result<Option<Self>>
429 where
430 C: carrier::IterHttpHeaderFields<'a>,
431 T: carrier::ExtractFromHttpHeader<'a, C>,
432 {
433 track!(T::extract_from_http_header(carrier))
434 }
435
436 pub fn extract_from_binary<C>(carrier: &mut C) -> Result<Option<Self>>
438 where
439 C: Read,
440 T: carrier::ExtractFromBinary<C>,
441 {
442 track!(T::extract_from_binary(carrier))
443 }
444}
445impl<T> MaybeAsRef<SpanContext<T>> for SpanContext<T> {
446 fn maybe_as_ref(&self) -> Option<&Self> {
447 Some(self)
448 }
449}
450
/// A name/value string pair stored in a [`SpanContext`].
#[derive(Debug, Clone)]
pub struct BaggageItem {
    name: String,
    value: String,
}
impl BaggageItem {
    /// Creates an item that owns copies of `name` and `value`.
    pub fn new(name: &str, value: &str) -> Self {
        Self {
            name: String::from(name),
            value: String::from(value),
        }
    }

    /// Returns the name of the item.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the value of the item.
    pub fn value(&self) -> &str {
        self.value.as_str()
    }
}
489
/// A reference from one span to another, tagged with the relationship kind.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub enum SpanReference<T> {
    ChildOf(T),
    FollowsFrom(T),
}
impl<T> SpanReference<T> {
    /// Returns the referenced value, whichever relationship it is held under.
    pub fn span(&self) -> &T {
        match self {
            Self::ChildOf(referenced) | Self::FollowsFrom(referenced) => referenced,
        }
    }

    /// Returns `true` for a `ChildOf` reference.
    pub fn is_child_of(&self) -> bool {
        match self {
            Self::ChildOf(_) => true,
            Self::FollowsFrom(_) => false,
        }
    }

    /// Returns `true` for a `FollowsFrom` reference.
    pub fn is_follows_from(&self) -> bool {
        match self {
            Self::FollowsFrom(_) => true,
            Self::ChildOf(_) => false,
        }
    }
}
515
516#[derive(Debug)]
518pub struct CandidateSpan<'a, T: 'a> {
519 tags: &'a [Tag],
520 references: &'a [SpanReference<T>],
521 baggage_items: &'a [BaggageItem],
522}
523impl<'a, T: 'a> CandidateSpan<'a, T> {
524 pub fn tags(&self) -> &[Tag] {
526 self.tags
527 }
528
529 pub fn references(&self) -> &[SpanReference<T>] {
531 self.references
532 }
533
534 pub fn baggage_items(&self) -> &[BaggageItem] {
536 self.baggage_items
537 }
538}
539
/// Builder that collects the settings for a new span before it is started.
#[derive(Debug)]
pub struct StartSpanOptions<'a, S: 'a, T: 'a> {
    operation_name: Cow<'static, str>,
    // `None` means the current time is used when the span is created.
    start_time: Option<SystemTime>,
    tags: Vec<Tag>,
    references: Vec<SpanReference<T>>,
    baggage_items: Vec<BaggageItem>,
    finish_cb: Option<FinishSpanCallback<T>>,
    // Channel the started span will use to send its finished form.
    span_tx: &'a SpanSender<T>,
    // Consulted unless a `sampling.priority` integer tag decides sampling.
    sampler: &'a S,
}
552impl<'a, S: 'a, T: 'a> StartSpanOptions<'a, S, T>
553where
554 S: Sampler<T>,
555{
556 pub fn start_time(mut self, time: SystemTime) -> Self {
558 self.start_time = Some(time);
559 self
560 }
561
562 pub fn tag(mut self, tag: Tag) -> Self {
564 self.tags.push(tag);
565 self
566 }
567
568 pub fn finish_callback<C>(mut self, cb: C) -> Self
570 where
571 C: Into<FinishSpanCallback<T>>,
572 {
573 self.finish_cb = Some(cb.into());
574 self
575 }
576
577 pub fn child_of<C>(mut self, context: &C) -> Self
579 where
580 C: MaybeAsRef<SpanContext<T>>,
581 T: Clone,
582 {
583 if let Some(context) = context.maybe_as_ref() {
584 let reference = SpanReference::ChildOf(context.state().clone());
585 self.references.push(reference);
586 self.baggage_items
587 .extend(context.baggage_items().iter().cloned());
588 }
589 self
590 }
591
592 pub fn follows_from<C>(mut self, context: &C) -> Self
594 where
595 C: MaybeAsRef<SpanContext<T>>,
596 T: Clone,
597 {
598 if let Some(context) = context.maybe_as_ref() {
599 let reference = SpanReference::FollowsFrom(context.state().clone());
600 self.references.push(reference);
601 self.baggage_items
602 .extend(context.baggage_items().iter().cloned());
603 }
604 self
605 }
606
607 pub fn start(mut self) -> Span<T>
609 where
610 T: for<'b> From<CandidateSpan<'b, T>>,
611 {
612 self.normalize();
613 if !self.is_sampled() {
614 return Span(None);
615 }
616 let state = T::from(self.span());
617 Span::new(state, self)
618 }
619
620 pub fn start_with_state(mut self, state: T) -> Span<T> {
622 self.normalize();
623 if !self.is_sampled() {
624 return Span(None);
625 }
626 Span::new(state, self)
627 }
628
629 pub(crate) fn new<N>(operation_name: N, span_tx: &'a SpanSender<T>, sampler: &'a S) -> Self
630 where
631 N: Into<Cow<'static, str>>,
632 {
633 StartSpanOptions {
634 operation_name: operation_name.into(),
635 start_time: None,
636 tags: Vec::new(),
637 references: Vec::new(),
638 baggage_items: Vec::new(),
639 finish_cb: None,
640 span_tx,
641 sampler,
642 }
643 }
644
645 fn normalize(&mut self) {
646 self.tags.reverse();
647 self.tags.sort_by(|a, b| a.name().cmp(b.name()));
648 self.tags.dedup_by(|a, b| a.name() == b.name());
649
650 self.baggage_items.reverse();
651 self.baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
652 self.baggage_items.dedup_by(|a, b| a.name() == b.name());
653 }
654
655 fn span(&self) -> CandidateSpan<'_, T> {
656 CandidateSpan {
657 references: &self.references,
658 tags: &self.tags,
659 baggage_items: &self.baggage_items,
660 }
661 }
662
663 fn is_sampled(&self) -> bool {
664 if let Some(&TagValue::Integer(n)) = self
665 .tags
666 .iter()
667 .find(|t| t.name() == "sampling.priority")
668 .map(|t| t.value())
669 {
670 n > 0
671 } else {
672 self.sampler.is_sampled(&self.span())
673 }
674 }
675}
676
/// Cloneable handle holding a span's context and sender, or nothing when the
/// originating span was inactive.
///
/// It can start child and follower spans.
#[derive(Debug, Clone)]
pub struct SpanHandle<T>(Option<(SpanContext<T>, SpanSender<T>)>);
680impl<T> SpanHandle<T> {
681 pub fn is_sampled(&self) -> bool {
683 self.0.is_some()
684 }
685
686 pub fn context(&self) -> Option<&SpanContext<T>> {
688 self.0.as_ref().map(|(context, _)| context)
689 }
690
691 pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
693 if let Some(context) = self.context() {
694 context.baggage_items.iter().find(|x| x.name == name)
695 } else {
696 None
697 }
698 }
699
700 pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
702 where
703 N: Into<Cow<'static, str>>,
704 T: Clone,
705 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
706 {
707 if let Some((context, span_tx)) = self.0.as_ref() {
708 let options =
709 StartSpanOptions::new(operation_name, span_tx, &AllSampler).child_of(context);
710 f(options)
711 } else {
712 Span::inactive()
713 }
714 }
715
716 pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
718 where
719 N: Into<Cow<'static, str>>,
720 T: Clone,
721 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
722 {
723 if let Some((context, span_tx)) = self.0.as_ref() {
724 let options =
725 StartSpanOptions::new(operation_name, span_tx, &AllSampler).follows_from(context);
726 f(options)
727 } else {
728 Span::inactive()
729 }
730 }
731}
732
/// Read-only access to the data recorded on a span.
pub trait InspectableSpan<T> {
    /// Returns the operation name of the span.
    fn operation_name(&self) -> &str;

    /// Returns the start time of the span.
    fn start_time(&self) -> SystemTime;

    /// Returns the finish time of the span, if one has been recorded.
    fn finish_time(&self) -> Option<SystemTime>;

    /// Returns the logs recorded on the span.
    fn logs(&self) -> &[Log];

    /// Returns the tags attached to the span.
    fn tags(&self) -> &[Tag];

    /// Returns the references from this span to other spans.
    fn references(&self) -> &[SpanReference<T>];
}
753
754impl<T> InspectableSpan<T> for Span<T> {
755 fn operation_name(&self) -> &str {
757 self.0
758 .as_ref()
759 .map(|inner| inner.operation_name.as_ref())
760 .unwrap_or("")
761 }
762
763 fn start_time(&self) -> SystemTime {
765 self.0
766 .as_ref()
767 .map(|inner| inner.start_time)
768 .unwrap_or_else(SystemTime::now)
769 }
770
771 fn finish_time(&self) -> Option<SystemTime> {
773 self.0.as_ref().and_then(|inner| inner.finish_time)
774 }
775
776 fn logs(&self) -> &[Log] {
778 self.0
779 .as_ref()
780 .map(|inner| inner.logs.as_ref())
781 .unwrap_or(&[])
782 }
783
784 fn tags(&self) -> &[Tag] {
786 self.0
787 .as_ref()
788 .map(|inner| inner.tags.as_ref())
789 .unwrap_or(&[])
790 }
791
792 fn references(&self) -> &[SpanReference<T>] {
794 self.0
795 .as_ref()
796 .map(|inner| inner.references.as_ref())
797 .unwrap_or(&[])
798 }
799}