1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use std::borrow::Cow;

use futures::{sync::mpsc, Future, Stream};

use crate::{Reporter, Sampler, Span, SpanBuilder, SpanState};

/// A tracer that creates span builders and funnels spans through an
/// unbounded channel to a [`Reporter`].
///
/// The sending half of the channel is cloned into every `SpanBuilder`
/// created by this tracer; the receiving half and the reporter are held in
/// `Option`s so that `serve` can move them out into the future it returns.
pub struct Tracer<S: Sampler, R: Reporter> {
    /// Service name supplied to `new`.
    service_name: Cow<'static, str>,
    /// Sampler supplied to `new`.
    sampler: S,
    /// Reporter supplied to `new`; `None` once `serve` has taken it.
    reporter: Option<R>,
    /// Sending half of the span channel, cloned for each new span builder.
    sender: mpsc::UnboundedSender<Span>,
    /// Receiving half of the span channel; `None` once `serve` has taken it.
    receiver: Option<mpsc::UnboundedReceiver<Span>>,
}

impl<S, R> Tracer<S, R>
where
    S: Sampler,
    R: Reporter,
{
    pub fn new<N>(service_name: N, sampler: S, reporter: R) -> Self
    where
        N: Into<Cow<'static, str>>,
    {
        let (sender, receiver) = mpsc::unbounded();

        Self {
            service_name: service_name.into(),
            sampler,
            sender,
            reporter: Some(reporter),
            receiver: Some(receiver),
        }
    }

    pub fn serve(&mut self) -> (impl Future<Item = (), Error = ()>) {
        let mut reporter = self.reporter.take().unwrap();

        self.receiver
            .take()
            .unwrap()
            .for_each(move |span| {
                reporter.report(&span);
                Ok(())
            })
            .map_err(|_| ())
    }
}

/// `opentracing_rs_core::Tracer` implementation: spans are started through a
/// `SpanBuilder` that carries a handle to this tracer's span channel.
impl<S: Sampler, R: Reporter> opentracing_rs_core::Tracer<SpanState, SpanBuilder>
    for Tracer<S, R>
{
    /// Creates a `SpanBuilder` for `operation_name`, giving it its own clone
    /// of the tracer's channel sender.
    fn span<N>(&mut self, operation_name: N) -> SpanBuilder
    where
        N: Into<String>,
    {
        let span_sink = self.sender.clone();
        SpanBuilder::new(operation_name, span_sink)
    }
}