1use crate::carrier;
3use crate::convert::MaybeAsRef;
4use crate::log::{Log, LogBuilder, StdErrorLogFieldsBuilder};
5use crate::sampler::{AllSampler, Sampler};
6use crate::tag::{StdTag, Tag, TagValue};
7use crate::Result;
8use std::borrow::Cow;
9use std::fmt;
10use std::io::{Read, Write};
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::sync::mpsc;
14
/// A sink that receives spans once they have finished.
///
/// Implementors must be `Send + Sync`.
pub trait SpanConsumer<T>: Send + Sync {
    /// Takes ownership of a finished span.
    ///
    /// The method returns nothing, so an implementation has no way to report a
    /// delivery failure back to the caller.
    fn consume_span(&self, span: FinishedSpan<T>);
}
23
24impl<T: Send> SpanConsumer<T> for mpsc::UnboundedSender<FinishedSpan<T>> {
25 fn consume_span(&self, span: FinishedSpan<T>) {
26 let _ = self.send(span);
27 }
28}
29
30impl<T: Send> SpanConsumer<T> for mpsc::Sender<FinishedSpan<T>> {
31 fn consume_span(&self, span: FinishedSpan<T>) {
32 let _ = self.try_send(span);
33 }
34}
35
36impl<T: Send> SpanConsumer<T> for std::sync::mpsc::Sender<FinishedSpan<T>> {
37 fn consume_span(&self, span: FinishedSpan<T>) {
38 let _ = self.send(span);
39 }
40}
41
42impl<T: Send> SpanConsumer<T> for std::sync::mpsc::SyncSender<FinishedSpan<T>> {
43 fn consume_span(&self, span: FinishedSpan<T>) {
44 let _ = self.try_send(span);
45 }
46}
47
/// Receiving half of a Tokio unbounded channel that carries [`FinishedSpan`]s.
pub type SpanReceiver<T> = mpsc::UnboundedReceiver<FinishedSpan<T>>;
/// Sending half of a Tokio unbounded channel that carries [`FinishedSpan`]s.
#[deprecated = "SpanSender is an implementation detail of rustracing. It should not be public."]
pub type SpanSender<T> = mpsc::UnboundedSender<FinishedSpan<T>>;
53
54pub(crate) struct SharedSpanConsumer<T>(Arc<dyn SpanConsumer<T>>);
56
57impl<T> SharedSpanConsumer<T> {
58 pub(crate) fn new(consumer: impl SpanConsumer<T> + 'static) -> Self {
59 Self(Arc::new(consumer))
60 }
61}
62
63impl<T> fmt::Debug for SharedSpanConsumer<T> {
64 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
65 f.write_str("SharedSpanConsumer")
66 }
67}
68
69impl<T> Clone for SharedSpanConsumer<T> {
70 fn clone(&self) -> Self {
71 Self(Arc::clone(&self.0))
72 }
73}
74
75pub struct FinishSpanCallback<T>(FinishCallbackInner<T>);
77type FinishCallbackInner<T> = Arc<dyn Fn(&mut Span<T>) + Send + Sync>;
78
79impl<T> fmt::Debug for FinishSpanCallback<T> {
80 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81 f.write_str("FinishSpanCallback")
82 }
83}
84
85impl<T> Clone for FinishSpanCallback<T> {
86 fn clone(&self) -> Self {
87 Self(self.0.clone())
88 }
89}
90
91impl<T> From<FinishSpanCallback<T>> for FinishCallbackInner<T> {
92 fn from(v: FinishSpanCallback<T>) -> Self {
93 v.0
94 }
95}
96
97impl<T> From<FinishCallbackInner<T>> for FinishSpanCallback<T> {
98 fn from(v: FinishCallbackInner<T>) -> Self {
99 Self(v)
100 }
101}
102
103impl<T, F: Fn(&mut Span<T>) + Send + Sync + 'static> From<F> for FinishSpanCallback<T> {
104 fn from(v: F) -> Self {
105 Self(Arc::new(v))
106 }
107}
108
109#[derive(Debug)]
114pub struct Span<T>(Option<SpanInner<T>>);
115impl<T> Span<T> {
116 pub const fn inactive() -> Self {
129 Span(None)
130 }
131
132 pub fn handle(&self) -> SpanHandle<T>
134 where
135 T: Clone,
136 {
137 SpanHandle(
138 self.0
139 .as_ref()
140 .map(|inner| (inner.context.clone(), inner.span_tx.clone())),
141 )
142 }
143
144 pub fn is_sampled(&self) -> bool {
146 self.0.is_some()
147 }
148
149 pub fn context(&self) -> Option<&SpanContext<T>> {
151 self.0.as_ref().map(|x| &x.context)
152 }
153
154 pub fn set_operation_name<F, N>(&mut self, f: F)
156 where
157 F: FnOnce() -> N,
158 N: Into<Cow<'static, str>>,
159 {
160 if let Some(inner) = self.0.as_mut() {
161 inner.operation_name = f().into();
162 }
163 }
164
165 pub fn set_start_time<F>(&mut self, f: F)
167 where
168 F: FnOnce() -> SystemTime,
169 {
170 if let Some(inner) = self.0.as_mut() {
171 inner.start_time = f();
172 }
173 }
174
175 pub fn set_finish_time<F>(&mut self, f: F)
177 where
178 F: FnOnce() -> SystemTime,
179 {
180 if let Some(inner) = self.0.as_mut() {
181 inner.finish_time = Some(f());
182 }
183 }
184
185 pub fn set_finish_callback<C>(&mut self, cb: C)
187 where
188 C: Into<FinishSpanCallback<T>>,
189 {
190 if let Some(inner) = &mut self.0 {
191 inner.finish_cb = Some(cb.into());
192 }
193 }
194
195 #[doc(alias = "remove_finish_callback")]
200 pub fn take_finish_callback(&mut self) -> Option<FinishSpanCallback<T>> {
201 self.0.as_mut().and_then(|s| s.finish_cb.take())
202 }
203
204 pub fn set_tag<F>(&mut self, f: F)
206 where
207 F: FnOnce() -> Tag,
208 {
209 use std::iter::once;
210 self.set_tags(|| once(f()));
211 }
212
213 pub fn set_tags<F, I>(&mut self, f: F)
215 where
216 F: FnOnce() -> I,
217 I: IntoIterator<Item = Tag>,
218 {
219 if let Some(inner) = self.0.as_mut() {
220 for tag in f() {
221 inner.tags.retain(|x| x.name() != tag.name());
222 inner.tags.push(tag);
223 }
224 }
225 }
226
227 pub fn set_baggage_item<F>(&mut self, f: F)
229 where
230 F: FnOnce() -> BaggageItem,
231 {
232 if let Some(inner) = self.0.as_mut() {
233 let item = f();
234 inner.context.baggage_items.retain(|x| x.name != item.name);
235 inner.context.baggage_items.push(item);
236 }
237 }
238
239 pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
241 if let Some(inner) = self.0.as_ref() {
242 inner.context.baggage_items.iter().find(|x| x.name == name)
243 } else {
244 None
245 }
246 }
247
248 pub fn log<F>(&mut self, f: F)
250 where
251 F: FnOnce(&mut LogBuilder),
252 {
253 if let Some(inner) = self.0.as_mut() {
254 let mut builder = LogBuilder::new();
255 f(&mut builder);
256 if let Some(log) = builder.finish() {
257 inner.logs.push(log);
258 }
259 }
260 }
261
262 pub fn error_log<F>(&mut self, f: F)
267 where
268 F: FnOnce(&mut StdErrorLogFieldsBuilder),
269 {
270 if let Some(inner) = self.0.as_mut() {
271 let mut builder = LogBuilder::new();
272 f(&mut builder.error());
273 if let Some(log) = builder.finish() {
274 inner.logs.push(log);
275 }
276 if !inner.tags.iter().any(|x| x.name() == "error") {
277 inner.tags.push(StdTag::error());
278 }
279 }
280 }
281
282 pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
287 where
288 N: Into<Cow<'static, str>>,
289 T: Clone,
290 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
291 {
292 self.handle().child(operation_name, move |mut opts| {
293 if let Some(finish_cb) = self.0.as_ref().and_then(|s| s.finish_cb.clone()) {
294 opts = opts.finish_callback(finish_cb);
295 }
296 f(opts)
297 })
298 }
299
300 pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
302 where
303 N: Into<Cow<'static, str>>,
304 T: Clone,
305 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
306 {
307 self.handle().follower(operation_name, f)
308 }
309
310 pub(crate) fn new<S>(state: T, opts: StartSpanOptions<S, T>) -> Self {
311 let context = SpanContext::new(state, opts.baggage_items);
312 let inner = SpanInner {
313 operation_name: opts.operation_name,
314 start_time: opts.start_time.unwrap_or_else(SystemTime::now),
315 finish_time: None,
316 references: opts.references,
317 tags: opts.tags,
318 logs: Vec::new(),
319 context,
320 finish_cb: opts.finish_cb,
321 span_tx: opts.span_tx.clone(),
322 };
323 Span(Some(inner))
324 }
325}
326impl<T> Drop for Span<T> {
327 fn drop(&mut self) {
328 if let Some(finish_cb) = self.take_finish_callback() {
329 finish_cb.0(self);
330 }
331
332 if let Some(inner) = self.0.take() {
333 let finished = FinishedSpan {
334 operation_name: inner.operation_name,
335 start_time: inner.start_time,
336 finish_time: inner.finish_time.unwrap_or_else(SystemTime::now),
337 references: inner.references,
338 tags: inner.tags,
339 logs: inner.logs,
340 context: inner.context,
341 };
342 inner.span_tx.0.consume_span(finished);
343 }
344 }
345}
346impl<T> MaybeAsRef<SpanContext<T>> for Span<T> {
347 fn maybe_as_ref(&self) -> Option<&SpanContext<T>> {
348 self.context()
349 }
350}
351
/// State owned by an active (sampled) `Span`.
#[derive(Debug)]
struct SpanInner<T> {
    /// Name of the operation this span represents.
    operation_name: Cow<'static, str>,
    /// Time at which the span started.
    start_time: SystemTime,
    /// Explicit finish time; when `None`, the current time is used on drop.
    finish_time: Option<SystemTime>,
    /// References to other spans.
    references: Vec<SpanReference<T>>,
    /// Tags attached to the span.
    tags: Vec<Tag>,
    /// Log entries recorded on the span.
    logs: Vec<Log>,
    /// Context holding the state and the baggage items.
    context: SpanContext<T>,
    /// Callback run when the span is dropped, if any.
    finish_cb: Option<FinishSpanCallback<T>>,
    /// Consumer that receives the `FinishedSpan` built when the span is dropped.
    span_tx: SharedSpanConsumer<T>,
}
364
365#[derive(Debug)]
367pub struct FinishedSpan<T> {
368 operation_name: Cow<'static, str>,
369 start_time: SystemTime,
370 finish_time: SystemTime,
371 references: Vec<SpanReference<T>>,
372 tags: Vec<Tag>,
373 logs: Vec<Log>,
374 context: SpanContext<T>,
375}
376impl<T> FinishedSpan<T> {
377 pub fn operation_name(&self) -> &str {
379 self.operation_name.as_ref()
380 }
381
382 pub fn start_time(&self) -> SystemTime {
384 self.start_time
385 }
386
387 pub fn finish_time(&self) -> SystemTime {
389 self.finish_time
390 }
391
392 pub fn logs(&self) -> &[Log] {
394 &self.logs
395 }
396
397 pub fn tags(&self) -> &[Tag] {
399 &self.tags
400 }
401
402 pub fn references(&self) -> &[SpanReference<T>] {
404 &self.references
405 }
406
407 pub fn context(&self) -> &SpanContext<T> {
409 &self.context
410 }
411}
412
413#[derive(Debug, Clone)]
420pub struct SpanContext<T> {
421 state: T,
422 baggage_items: Vec<BaggageItem>,
423}
424impl<T> SpanContext<T> {
425 pub fn new(state: T, mut baggage_items: Vec<BaggageItem>) -> Self {
427 baggage_items.reverse();
428 baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
429 baggage_items.dedup_by(|a, b| a.name() == b.name());
430 SpanContext {
431 state,
432 baggage_items,
433 }
434 }
435
436 pub fn state(&self) -> &T {
438 &self.state
439 }
440
441 pub fn baggage_items(&self) -> &[BaggageItem] {
443 &self.baggage_items
444 }
445
446 pub fn inject_to_text_map<C>(&self, carrier: &mut C) -> Result<()>
448 where
449 C: carrier::TextMap,
450 T: carrier::InjectToTextMap<C>,
451 {
452 track!(T::inject_to_text_map(self, carrier))
453 }
454
455 pub fn inject_to_http_header<C>(&self, carrier: &mut C) -> Result<()>
457 where
458 C: carrier::SetHttpHeaderField,
459 T: carrier::InjectToHttpHeader<C>,
460 {
461 track!(T::inject_to_http_header(self, carrier))
462 }
463
464 pub fn inject_to_binary<C>(&self, carrier: &mut C) -> Result<()>
466 where
467 C: Write,
468 T: carrier::InjectToBinary<C>,
469 {
470 track!(T::inject_to_binary(self, carrier))
471 }
472
473 pub fn extract_from_text_map<C>(carrier: &C) -> Result<Option<Self>>
475 where
476 C: carrier::TextMap,
477 T: carrier::ExtractFromTextMap<C>,
478 {
479 track!(T::extract_from_text_map(carrier))
480 }
481
482 pub fn extract_from_http_header<'a, C>(carrier: &'a C) -> Result<Option<Self>>
484 where
485 C: carrier::IterHttpHeaderFields<'a>,
486 T: carrier::ExtractFromHttpHeader<'a, C>,
487 {
488 track!(T::extract_from_http_header(carrier))
489 }
490
491 pub fn extract_from_binary<C>(carrier: &mut C) -> Result<Option<Self>>
493 where
494 C: Read,
495 T: carrier::ExtractFromBinary<C>,
496 {
497 track!(T::extract_from_binary(carrier))
498 }
499}
500impl<T> MaybeAsRef<SpanContext<T>> for SpanContext<T> {
501 fn maybe_as_ref(&self) -> Option<&Self> {
502 Some(self)
503 }
504}
505
/// A name/value string pair carried in a span context.
#[derive(Debug, Clone)]
pub struct BaggageItem {
    name: String,
    value: String,
}

impl BaggageItem {
    /// Creates an item by copying `name` and `value` into owned strings.
    pub fn new(name: &str, value: &str) -> Self {
        let name = String::from(name);
        let value = String::from(value);
        Self { name, value }
    }

    /// Returns the name of this item.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the value of this item.
    pub fn value(&self) -> &str {
        self.value.as_str()
    }
}
544
/// A reference from one span to another, tagged with the kind of relationship.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub enum SpanReference<T> {
    ChildOf(T),
    FollowsFrom(T),
}

impl<T> SpanReference<T> {
    /// Returns the referenced span value, whatever the kind of the reference.
    pub fn span(&self) -> &T {
        match self {
            SpanReference::ChildOf(target) => target,
            SpanReference::FollowsFrom(target) => target,
        }
    }

    /// Returns `true` if this is a `ChildOf` reference.
    pub fn is_child_of(&self) -> bool {
        matches!(self, SpanReference::ChildOf(_))
    }

    /// Returns `true` if this is a `FollowsFrom` reference.
    pub fn is_follows_from(&self) -> bool {
        matches!(self, SpanReference::FollowsFrom(_))
    }
}
570
571#[derive(Debug)]
573pub struct CandidateSpan<'a, T: 'a> {
574 tags: &'a [Tag],
575 references: &'a [SpanReference<T>],
576 baggage_items: &'a [BaggageItem],
577}
578impl<'a, T: 'a> CandidateSpan<'a, T> {
579 pub fn tags(&self) -> &[Tag] {
581 self.tags
582 }
583
584 pub fn references(&self) -> &[SpanReference<T>] {
586 self.references
587 }
588
589 pub fn baggage_items(&self) -> &[BaggageItem] {
591 self.baggage_items
592 }
593}
594
595#[derive(Debug)]
597pub struct StartSpanOptions<'a, S: 'a, T: 'a> {
598 operation_name: Cow<'static, str>,
599 start_time: Option<SystemTime>,
600 tags: Vec<Tag>,
601 references: Vec<SpanReference<T>>,
602 baggage_items: Vec<BaggageItem>,
603 finish_cb: Option<FinishSpanCallback<T>>,
604 span_tx: &'a SharedSpanConsumer<T>,
605 sampler: &'a S,
606}
607impl<'a, S: 'a, T: 'a> StartSpanOptions<'a, S, T>
608where
609 S: Sampler<T>,
610{
611 pub fn start_time(mut self, time: SystemTime) -> Self {
613 self.start_time = Some(time);
614 self
615 }
616
617 pub fn tag(mut self, tag: Tag) -> Self {
619 self.tags.push(tag);
620 self
621 }
622
623 pub fn finish_callback<C>(mut self, cb: C) -> Self
625 where
626 C: Into<FinishSpanCallback<T>>,
627 {
628 self.finish_cb = Some(cb.into());
629 self
630 }
631
632 pub fn child_of<C>(mut self, context: &C) -> Self
634 where
635 C: MaybeAsRef<SpanContext<T>>,
636 T: Clone,
637 {
638 if let Some(context) = context.maybe_as_ref() {
639 let reference = SpanReference::ChildOf(context.state().clone());
640 self.references.push(reference);
641 self.baggage_items
642 .extend(context.baggage_items().iter().cloned());
643 }
644 self
645 }
646
647 pub fn follows_from<C>(mut self, context: &C) -> Self
649 where
650 C: MaybeAsRef<SpanContext<T>>,
651 T: Clone,
652 {
653 if let Some(context) = context.maybe_as_ref() {
654 let reference = SpanReference::FollowsFrom(context.state().clone());
655 self.references.push(reference);
656 self.baggage_items
657 .extend(context.baggage_items().iter().cloned());
658 }
659 self
660 }
661
662 pub fn start(mut self) -> Span<T>
664 where
665 T: for<'b> From<CandidateSpan<'b, T>>,
666 {
667 self.normalize();
668 if !self.is_sampled() {
669 return Span(None);
670 }
671 let state = T::from(self.span());
672 Span::new(state, self)
673 }
674
675 pub fn start_with_state(mut self, state: T) -> Span<T> {
677 self.normalize();
678 if !self.is_sampled() {
679 return Span(None);
680 }
681 Span::new(state, self)
682 }
683
684 pub(crate) fn new<N>(
685 operation_name: N,
686 span_tx: &'a SharedSpanConsumer<T>,
687 sampler: &'a S,
688 ) -> Self
689 where
690 N: Into<Cow<'static, str>>,
691 {
692 StartSpanOptions {
693 operation_name: operation_name.into(),
694 start_time: None,
695 tags: Vec::new(),
696 references: Vec::new(),
697 baggage_items: Vec::new(),
698 finish_cb: None,
699 span_tx,
700 sampler,
701 }
702 }
703
704 fn normalize(&mut self) {
705 self.tags.reverse();
706 self.tags.sort_by(|a, b| a.name().cmp(b.name()));
707 self.tags.dedup_by(|a, b| a.name() == b.name());
708
709 self.baggage_items.reverse();
710 self.baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
711 self.baggage_items.dedup_by(|a, b| a.name() == b.name());
712 }
713
714 fn span(&self) -> CandidateSpan<'_, T> {
715 CandidateSpan {
716 references: &self.references,
717 tags: &self.tags,
718 baggage_items: &self.baggage_items,
719 }
720 }
721
722 fn is_sampled(&self) -> bool {
723 if let Some(&TagValue::Integer(n)) = self
724 .tags
725 .iter()
726 .find(|t| t.name() == "sampling.priority")
727 .map(|t| t.value())
728 {
729 n > 0
730 } else {
731 self.sampler.is_sampled(&self.span())
732 }
733 }
734}
735
736#[derive(Debug, Clone)]
738pub struct SpanHandle<T>(Option<(SpanContext<T>, SharedSpanConsumer<T>)>);
739impl<T> SpanHandle<T> {
740 pub fn is_sampled(&self) -> bool {
742 self.0.is_some()
743 }
744
745 pub fn context(&self) -> Option<&SpanContext<T>> {
747 self.0.as_ref().map(|(context, _)| context)
748 }
749
750 pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
752 if let Some(context) = self.context() {
753 context.baggage_items.iter().find(|x| x.name == name)
754 } else {
755 None
756 }
757 }
758
759 pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
761 where
762 N: Into<Cow<'static, str>>,
763 T: Clone,
764 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
765 {
766 if let Some((context, span_tx)) = self.0.as_ref() {
767 let options =
768 StartSpanOptions::new(operation_name, span_tx, &AllSampler).child_of(context);
769 f(options)
770 } else {
771 Span::inactive()
772 }
773 }
774
775 pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
777 where
778 N: Into<Cow<'static, str>>,
779 T: Clone,
780 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
781 {
782 if let Some((context, span_tx)) = self.0.as_ref() {
783 let options =
784 StartSpanOptions::new(operation_name, span_tx, &AllSampler).follows_from(context);
785 f(options)
786 } else {
787 Span::inactive()
788 }
789 }
790}
791
/// Read-only access to the data recorded on a span.
pub trait InspectableSpan<T> {
    /// Returns the operation name of the span.
    fn operation_name(&self) -> &str;

    /// Returns the start time of the span.
    fn start_time(&self) -> SystemTime;

    /// Returns the finish time of the span, if one is available.
    fn finish_time(&self) -> Option<SystemTime>;

    /// Returns the logs recorded on the span.
    fn logs(&self) -> &[Log];

    /// Returns the tags of the span.
    fn tags(&self) -> &[Tag];

    /// Returns the references of the span.
    fn references(&self) -> &[SpanReference<T>];
}
812
813impl<T> InspectableSpan<T> for Span<T> {
814 fn operation_name(&self) -> &str {
816 self.0
817 .as_ref()
818 .map(|inner| inner.operation_name.as_ref())
819 .unwrap_or("")
820 }
821
822 fn start_time(&self) -> SystemTime {
824 self.0
825 .as_ref()
826 .map(|inner| inner.start_time)
827 .unwrap_or_else(SystemTime::now)
828 }
829
830 fn finish_time(&self) -> Option<SystemTime> {
832 self.0.as_ref().and_then(|inner| inner.finish_time)
833 }
834
835 fn logs(&self) -> &[Log] {
837 self.0
838 .as_ref()
839 .map(|inner| inner.logs.as_ref())
840 .unwrap_or(&[])
841 }
842
843 fn tags(&self) -> &[Tag] {
845 self.0
846 .as_ref()
847 .map(|inner| inner.tags.as_ref())
848 .unwrap_or(&[])
849 }
850
851 fn references(&self) -> &[SpanReference<T>] {
853 self.0
854 .as_ref()
855 .map(|inner| inner.references.as_ref())
856 .unwrap_or(&[])
857 }
858}