1use crate::carrier;
3use crate::convert::MaybeAsRef;
4use crate::log::{Log, LogBuilder, StdErrorLogFieldsBuilder};
5use crate::sampler::{AllSampler, Sampler};
6use crate::tag::{StdTag, Tag, TagValue};
7use crate::Result;
8use std::borrow::Cow;
9use std::io::{Read, Write};
10use std::time::SystemTime;
11use tokio::sync::mpsc;
12
13pub type SpanReceiver<T> = mpsc::UnboundedReceiver<FinishedSpan<T>>;
15pub type SpanSender<T> = mpsc::UnboundedSender<FinishedSpan<T>>;
17
18#[derive(Debug)]
23pub struct Span<T>(Option<SpanInner<T>>);
24impl<T> Span<T> {
25 pub fn inactive() -> Self {
38 Span(None)
39 }
40
41 pub fn handle(&self) -> SpanHandle<T>
43 where
44 T: Clone,
45 {
46 SpanHandle(
47 self.0
48 .as_ref()
49 .map(|inner| (inner.context.clone(), inner.span_tx.clone())),
50 )
51 }
52
53 pub fn is_sampled(&self) -> bool {
55 self.0.is_some()
56 }
57
58 pub fn context(&self) -> Option<&SpanContext<T>> {
60 self.0.as_ref().map(|x| &x.context)
61 }
62
63 pub fn set_operation_name<F, N>(&mut self, f: F)
65 where
66 F: FnOnce() -> N,
67 N: Into<Cow<'static, str>>,
68 {
69 if let Some(inner) = self.0.as_mut() {
70 inner.operation_name = f().into();
71 }
72 }
73
74 pub fn set_start_time<F>(&mut self, f: F)
76 where
77 F: FnOnce() -> SystemTime,
78 {
79 if let Some(inner) = self.0.as_mut() {
80 inner.start_time = f();
81 }
82 }
83
84 pub fn set_finish_time<F>(&mut self, f: F)
86 where
87 F: FnOnce() -> SystemTime,
88 {
89 if let Some(inner) = self.0.as_mut() {
90 inner.finish_time = Some(f());
91 }
92 }
93
94 pub fn set_tag<F>(&mut self, f: F)
96 where
97 F: FnOnce() -> Tag,
98 {
99 use std::iter::once;
100 self.set_tags(|| once(f()));
101 }
102
103 pub fn set_tags<F, I>(&mut self, f: F)
105 where
106 F: FnOnce() -> I,
107 I: IntoIterator<Item = Tag>,
108 {
109 if let Some(inner) = self.0.as_mut() {
110 for tag in f() {
111 inner.tags.retain(|x| x.name() != tag.name());
112 inner.tags.push(tag);
113 }
114 }
115 }
116
117 pub fn set_baggage_item<F>(&mut self, f: F)
119 where
120 F: FnOnce() -> BaggageItem,
121 {
122 if let Some(inner) = self.0.as_mut() {
123 let item = f();
124 inner.context.baggage_items.retain(|x| x.name != item.name);
125 inner.context.baggage_items.push(item);
126 }
127 }
128
129 pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
131 if let Some(inner) = self.0.as_ref() {
132 inner.context.baggage_items.iter().find(|x| x.name == name)
133 } else {
134 None
135 }
136 }
137
138 pub fn log<F>(&mut self, f: F)
140 where
141 F: FnOnce(&mut LogBuilder),
142 {
143 if let Some(inner) = self.0.as_mut() {
144 let mut builder = LogBuilder::new();
145 f(&mut builder);
146 if let Some(log) = builder.finish() {
147 inner.logs.push(log);
148 }
149 }
150 }
151
152 pub fn error_log<F>(&mut self, f: F)
157 where
158 F: FnOnce(&mut StdErrorLogFieldsBuilder),
159 {
160 if let Some(inner) = self.0.as_mut() {
161 let mut builder = LogBuilder::new();
162 f(&mut builder.error());
163 if let Some(log) = builder.finish() {
164 inner.logs.push(log);
165 }
166 if !inner.tags.iter().any(|x| x.name() == "error") {
167 inner.tags.push(StdTag::error());
168 }
169 }
170 }
171
172 pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
174 where
175 N: Into<Cow<'static, str>>,
176 T: Clone,
177 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
178 {
179 self.handle().child(operation_name, f)
180 }
181
182 pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
184 where
185 N: Into<Cow<'static, str>>,
186 T: Clone,
187 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
188 {
189 self.handle().follower(operation_name, f)
190 }
191
192 pub(crate) fn new(
193 operation_name: Cow<'static, str>,
194 start_time: SystemTime,
195 references: Vec<SpanReference<T>>,
196 tags: Vec<Tag>,
197 state: T,
198 baggage_items: Vec<BaggageItem>,
199 span_tx: SpanSender<T>,
200 ) -> Self {
201 let context = SpanContext::new(state, baggage_items);
202 let inner = SpanInner {
203 operation_name,
204 start_time,
205 finish_time: None,
206 references,
207 tags,
208 logs: Vec::new(),
209 context,
210 span_tx,
211 };
212 Span(Some(inner))
213 }
214}
215impl<T> Drop for Span<T> {
216 fn drop(&mut self) {
217 if let Some(inner) = self.0.take() {
218 let finished = FinishedSpan {
219 operation_name: inner.operation_name,
220 start_time: inner.start_time,
221 finish_time: inner.finish_time.unwrap_or_else(SystemTime::now),
222 references: inner.references,
223 tags: inner.tags,
224 logs: inner.logs,
225 context: inner.context,
226 };
227 let _ = inner.span_tx.send(finished);
228 }
229 }
230}
231impl<T> MaybeAsRef<SpanContext<T>> for Span<T> {
232 fn maybe_as_ref(&self) -> Option<&SpanContext<T>> {
233 self.context()
234 }
235}
236
237#[derive(Debug)]
238struct SpanInner<T> {
239 operation_name: Cow<'static, str>,
240 start_time: SystemTime,
241 finish_time: Option<SystemTime>,
242 references: Vec<SpanReference<T>>,
243 tags: Vec<Tag>,
244 logs: Vec<Log>,
245 context: SpanContext<T>,
246 span_tx: SpanSender<T>,
247}
248
249#[derive(Debug)]
251pub struct FinishedSpan<T> {
252 operation_name: Cow<'static, str>,
253 start_time: SystemTime,
254 finish_time: SystemTime,
255 references: Vec<SpanReference<T>>,
256 tags: Vec<Tag>,
257 logs: Vec<Log>,
258 context: SpanContext<T>,
259}
260impl<T> FinishedSpan<T> {
261 pub fn operation_name(&self) -> &str {
263 self.operation_name.as_ref()
264 }
265
266 pub fn start_time(&self) -> SystemTime {
268 self.start_time
269 }
270
271 pub fn finish_time(&self) -> SystemTime {
273 self.finish_time
274 }
275
276 pub fn logs(&self) -> &[Log] {
278 &self.logs
279 }
280
281 pub fn tags(&self) -> &[Tag] {
283 &self.tags
284 }
285
286 pub fn references(&self) -> &[SpanReference<T>] {
288 &self.references
289 }
290
291 pub fn context(&self) -> &SpanContext<T> {
293 &self.context
294 }
295}
296
297#[derive(Debug, Clone)]
304pub struct SpanContext<T> {
305 state: T,
306 baggage_items: Vec<BaggageItem>,
307}
308impl<T> SpanContext<T> {
309 pub fn new(state: T, mut baggage_items: Vec<BaggageItem>) -> Self {
311 baggage_items.reverse();
312 baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
313 baggage_items.dedup_by(|a, b| a.name() == b.name());
314 SpanContext {
315 state,
316 baggage_items,
317 }
318 }
319
320 pub fn state(&self) -> &T {
322 &self.state
323 }
324
325 pub fn baggage_items(&self) -> &[BaggageItem] {
327 &self.baggage_items
328 }
329
330 pub fn inject_to_text_map<C>(&self, carrier: &mut C) -> Result<()>
332 where
333 C: carrier::TextMap,
334 T: carrier::InjectToTextMap<C>,
335 {
336 track!(T::inject_to_text_map(self, carrier))
337 }
338
339 pub fn inject_to_http_header<C>(&self, carrier: &mut C) -> Result<()>
341 where
342 C: carrier::SetHttpHeaderField,
343 T: carrier::InjectToHttpHeader<C>,
344 {
345 track!(T::inject_to_http_header(self, carrier))
346 }
347
348 pub fn inject_to_binary<C>(&self, carrier: &mut C) -> Result<()>
350 where
351 C: Write,
352 T: carrier::InjectToBinary<C>,
353 {
354 track!(T::inject_to_binary(self, carrier))
355 }
356
357 pub fn extract_from_text_map<C>(carrier: &C) -> Result<Option<Self>>
359 where
360 C: carrier::TextMap,
361 T: carrier::ExtractFromTextMap<C>,
362 {
363 track!(T::extract_from_text_map(carrier))
364 }
365
366 pub fn extract_from_http_header<'a, C>(carrier: &'a C) -> Result<Option<Self>>
368 where
369 C: carrier::IterHttpHeaderFields<'a>,
370 T: carrier::ExtractFromHttpHeader<'a, C>,
371 {
372 track!(T::extract_from_http_header(carrier))
373 }
374
375 pub fn extract_from_binary<C>(carrier: &mut C) -> Result<Option<Self>>
377 where
378 C: Read,
379 T: carrier::ExtractFromBinary<C>,
380 {
381 track!(T::extract_from_binary(carrier))
382 }
383}
384impl<T> MaybeAsRef<SpanContext<T>> for SpanContext<T> {
385 fn maybe_as_ref(&self) -> Option<&Self> {
386 Some(self)
387 }
388}
389
390#[derive(Debug, Clone)]
405pub struct BaggageItem {
406 name: String,
407 value: String,
408}
409impl BaggageItem {
410 pub fn new(name: &str, value: &str) -> Self {
412 BaggageItem {
413 name: name.to_owned(),
414 value: value.to_owned(),
415 }
416 }
417
418 pub fn name(&self) -> &str {
420 &self.name
421 }
422
423 pub fn value(&self) -> &str {
425 &self.value
426 }
427}
428
429#[derive(Debug, Clone)]
431#[allow(missing_docs)]
432pub enum SpanReference<T> {
433 ChildOf(T),
434 FollowsFrom(T),
435}
436impl<T> SpanReference<T> {
437 pub fn span(&self) -> &T {
439 match *self {
440 SpanReference::ChildOf(ref x) | SpanReference::FollowsFrom(ref x) => x,
441 }
442 }
443
444 pub fn is_child_of(&self) -> bool {
446 matches!(*self, SpanReference::ChildOf(_))
447 }
448
449 pub fn is_follows_from(&self) -> bool {
451 matches!(*self, SpanReference::FollowsFrom(_))
452 }
453}
454
455#[derive(Debug)]
457pub struct CandidateSpan<'a, T: 'a> {
458 tags: &'a [Tag],
459 references: &'a [SpanReference<T>],
460 baggage_items: &'a [BaggageItem],
461}
462impl<'a, T: 'a> CandidateSpan<'a, T> {
463 pub fn tags(&self) -> &[Tag] {
465 self.tags
466 }
467
468 pub fn references(&self) -> &[SpanReference<T>] {
470 self.references
471 }
472
473 pub fn baggage_items(&self) -> &[BaggageItem] {
475 self.baggage_items
476 }
477}
478
479#[derive(Debug)]
481pub struct StartSpanOptions<'a, S: 'a, T: 'a> {
482 operation_name: Cow<'static, str>,
483 start_time: Option<SystemTime>,
484 tags: Vec<Tag>,
485 references: Vec<SpanReference<T>>,
486 baggage_items: Vec<BaggageItem>,
487 span_tx: &'a SpanSender<T>,
488 sampler: &'a S,
489}
490impl<'a, S: 'a, T: 'a> StartSpanOptions<'a, S, T>
491where
492 S: Sampler<T>,
493{
494 pub fn start_time(mut self, time: SystemTime) -> Self {
496 self.start_time = Some(time);
497 self
498 }
499
500 pub fn tag(mut self, tag: Tag) -> Self {
502 self.tags.push(tag);
503 self
504 }
505
506 pub fn child_of<C>(mut self, context: &C) -> Self
508 where
509 C: MaybeAsRef<SpanContext<T>>,
510 T: Clone,
511 {
512 if let Some(context) = context.maybe_as_ref() {
513 let reference = SpanReference::ChildOf(context.state().clone());
514 self.references.push(reference);
515 self.baggage_items
516 .extend(context.baggage_items().iter().cloned());
517 }
518 self
519 }
520
521 pub fn follows_from<C>(mut self, context: &C) -> Self
523 where
524 C: MaybeAsRef<SpanContext<T>>,
525 T: Clone,
526 {
527 if let Some(context) = context.maybe_as_ref() {
528 let reference = SpanReference::FollowsFrom(context.state().clone());
529 self.references.push(reference);
530 self.baggage_items
531 .extend(context.baggage_items().iter().cloned());
532 }
533 self
534 }
535
536 pub fn start(mut self) -> Span<T>
538 where
539 T: for<'b> From<CandidateSpan<'b, T>>,
540 {
541 self.normalize();
542 if !self.is_sampled() {
543 return Span(None);
544 }
545 let state = T::from(self.span());
546 Span::new(
547 self.operation_name,
548 self.start_time.unwrap_or_else(SystemTime::now),
549 self.references,
550 self.tags,
551 state,
552 self.baggage_items,
553 self.span_tx.clone(),
554 )
555 }
556
557 pub fn start_with_state(mut self, state: T) -> Span<T> {
559 self.normalize();
560 if !self.is_sampled() {
561 return Span(None);
562 }
563 Span::new(
564 self.operation_name,
565 self.start_time.unwrap_or_else(SystemTime::now),
566 self.references,
567 self.tags,
568 state,
569 self.baggage_items,
570 self.span_tx.clone(),
571 )
572 }
573
574 pub(crate) fn new<N>(operation_name: N, span_tx: &'a SpanSender<T>, sampler: &'a S) -> Self
575 where
576 N: Into<Cow<'static, str>>,
577 {
578 StartSpanOptions {
579 operation_name: operation_name.into(),
580 start_time: None,
581 tags: Vec::new(),
582 references: Vec::new(),
583 baggage_items: Vec::new(),
584 span_tx,
585 sampler,
586 }
587 }
588
589 fn normalize(&mut self) {
590 self.tags.reverse();
591 self.tags.sort_by(|a, b| a.name().cmp(b.name()));
592 self.tags.dedup_by(|a, b| a.name() == b.name());
593
594 self.baggage_items.reverse();
595 self.baggage_items.sort_by(|a, b| a.name().cmp(b.name()));
596 self.baggage_items.dedup_by(|a, b| a.name() == b.name());
597 }
598
599 fn span(&self) -> CandidateSpan<T> {
600 CandidateSpan {
601 references: &self.references,
602 tags: &self.tags,
603 baggage_items: &self.baggage_items,
604 }
605 }
606
607 fn is_sampled(&self) -> bool {
608 if let Some(&TagValue::Integer(n)) = self
609 .tags
610 .iter()
611 .find(|t| t.name() == "sampling.priority")
612 .map(|t| t.value())
613 {
614 n > 0
615 } else {
616 self.sampler.is_sampled(&self.span())
617 }
618 }
619}
620
621#[derive(Debug, Clone)]
623pub struct SpanHandle<T>(Option<(SpanContext<T>, SpanSender<T>)>);
624impl<T> SpanHandle<T> {
625 pub fn is_sampled(&self) -> bool {
627 self.0.is_some()
628 }
629
630 pub fn context(&self) -> Option<&SpanContext<T>> {
632 self.0.as_ref().map(|(context, _)| context)
633 }
634
635 pub fn get_baggage_item(&self, name: &str) -> Option<&BaggageItem> {
637 if let Some(context) = self.context() {
638 context.baggage_items.iter().find(|x| x.name == name)
639 } else {
640 None
641 }
642 }
643
644 pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
646 where
647 N: Into<Cow<'static, str>>,
648 T: Clone,
649 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
650 {
651 if let Some((context, span_tx)) = self.0.as_ref() {
652 let options =
653 StartSpanOptions::new(operation_name, span_tx, &AllSampler).child_of(context);
654 f(options)
655 } else {
656 Span::inactive()
657 }
658 }
659
660 pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
662 where
663 N: Into<Cow<'static, str>>,
664 T: Clone,
665 F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
666 {
667 if let Some((context, span_tx)) = self.0.as_ref() {
668 let options =
669 StartSpanOptions::new(operation_name, span_tx, &AllSampler).follows_from(context);
670 f(options)
671 } else {
672 Span::inactive()
673 }
674 }
675}
676
677pub trait InspectableSpan<T> {
679 fn operation_name(&self) -> &str;
681
682 fn start_time(&self) -> SystemTime;
684
685 fn finish_time(&self) -> Option<SystemTime>;
687
688 fn logs(&self) -> &[Log];
690
691 fn tags(&self) -> &[Tag];
693
694 fn references(&self) -> &[SpanReference<T>];
696}
697
698impl<T> InspectableSpan<T> for Span<T> {
699 fn operation_name(&self) -> &str {
701 self.0
702 .as_ref()
703 .map(|inner| inner.operation_name.as_ref())
704 .unwrap_or("")
705 }
706
707 fn start_time(&self) -> SystemTime {
709 self.0
710 .as_ref()
711 .map(|inner| inner.start_time)
712 .unwrap_or_else(SystemTime::now)
713 }
714
715 fn finish_time(&self) -> Option<SystemTime> {
717 self.0.as_ref().and_then(|inner| inner.finish_time)
718 }
719
720 fn logs(&self) -> &[Log] {
722 self.0
723 .as_ref()
724 .map(|inner| inner.logs.as_ref())
725 .unwrap_or(&[])
726 }
727
728 fn tags(&self) -> &[Tag] {
730 self.0
731 .as_ref()
732 .map(|inner| inner.tags.as_ref())
733 .unwrap_or(&[])
734 }
735
736 fn references(&self) -> &[SpanReference<T>] {
738 self.0
739 .as_ref()
740 .map(|inner| inner.references.as_ref())
741 .unwrap_or(&[])
742 }
743}