1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
//! # Span Processor
//!
//! Span processor is an interface which allows hooks for span start and end
//! method invocations. Span processors are invoked only when [`is_recording`]
//! is `true`.
//! Built-in span processors are responsible for batching and conversion of
//! spans to exportable representation and passing batches to exporters.
//! Span processors can be registered directly on SDK [`Provider`] and they are
//! invoked in the same order as they were registered.
//! All [`Tracer`] instances created by a [`Provider`] share the same span
//! processors. Changes to this collection reflect in all [`Tracer`] instances.
//! The following diagram shows [`SpanProcessor`]'s relationship to other
//! components in the SDK:
//!
//! ```ascii
//! +-----+--------------+   +-------------------------+   +-------------------+
//! |     |              |   |                         |   |                   |
//! |     |              |   | BatchSpanProcessor      |   |    SpanExporter   |
//! |     |              +---> SimpleSpanProcessor     +--->  (JaegerExporter) |
//! |     |              |   |                         |   |                   |
//! | SDK | Span.start() |   +-------------------------+   +-------------------+
//! |     | Span.end()   |
//! |     |              |   +---------------------+
//! |     |              |   |                     |
//! |     |              +---> ZPagesProcessor     |
//! |     |              |   |                     |
//! +-----+--------------+   +---------------------+
//! ```
//!
//! # Examples
//!
//! #### Exporting spans with a simple exporter:
//!
//! Note that the simple processor exports synchronously every time a span is ended. If you find this
//! limiting, consider the batch processor instead.
//!
//! ```
//! use opentelemetry::{api, sdk, global};
//!
//! // Configure your preferred exporter
//! let exporter = api::NoopSpanExporter {};
//!
//! // Then use the `with_simple_exporter` method to have the provider export when spans finish.
//! let provider = sdk::Provider::builder()
//!     .with_simple_exporter(exporter)
//!     .build();
//!
//! global::set_provider(provider);
//! ```
//!
//! #### Exporting spans asynchronously in batches:
//!
//! This processor can be configured with an [`executor`] of your choice to batch and upload spans
//! asynchronously when they end. If you have added a library like [`tokio`] or [`async-std`], you
//! can pass in their respective `spawn` and `interval` functions to have batching performed in
//! those contexts.
//!
//! ```
//! use futures::{stream};
//! use opentelemetry::{api, sdk, global};
//! use std::time::Duration;
//!
//! #[tokio::main]
//! async fn main() {
//!     // Configure your preferred exporter
//!     let exporter = api::NoopSpanExporter {};
//!
//!     // Then build a batch processor. You can use whichever executor you have available, for
//!     // example if you are using `async-std` instead of `tokio` you can replace the spawn and
//!     // interval functions with `async_std::task::spawn` and `async_std::stream::interval`.
//!     let batch = sdk::BatchSpanProcessor::builder(exporter, tokio::spawn, tokio::time::interval)
//!         .with_max_queue_size(4096)
//!         .build();
//!
//!     // Then use the `with_batch_exporter` method to have the provider export spans in batches.
//!     let provider = sdk::Provider::builder()
//!         .with_batch_exporter(batch)
//!         .build();
//!
//!     global::set_provider(provider);
//! }
//! ```
//!
//! [`is_recording`]: ../../../api/trace/span/trait.Span.html#tymethod.is_recording
//! [`Provider`]: ../../../api/trace/provider/trait.Provider.html
//! [`Tracer`]: ../../../api/trace/tracer/trait.Tracer.html
//! [`SpanProcessor`]: ../../../api/trace/span_processor/trait.SpanProcessor.html
//! [`SimpleSpanProcessor`]: struct.SimpleSpanProcessor.html
//! [`BatchSpanProcessor`]: struct.BatchSpanProcessor.html
//! [`executor`]: https://docs.rs/futures/0.3.4/futures/executor/index.html
//! [`tokio`]: https://tokio.rs
//! [`async-std`]: https://async.rs
use crate::{api, exporter};
use futures::{
    channel::mpsc,
    task::{Context, Poll},
    Future, Stream, StreamExt,
};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time;

/// A [`SpanProcessor`] that exports synchronously when spans are finished.
///
/// Each sampled span is handed to the exporter inside the `on_end` call, so
/// the export cost is paid on the thread that ends the span.
///
/// [`SpanProcessor`]: ../../../api/trace/span_processor/trait.SpanProcessor.html
#[derive(Debug)]
pub struct SimpleSpanProcessor {
    /// Destination that receives each finished, sampled span one at a time.
    exporter: Box<dyn exporter::trace::SpanExporter>,
}

impl SimpleSpanProcessor {
    /// Build a simple processor that forwards every finished span to `exporter`.
    pub(crate) fn new(exporter: Box<dyn exporter::trace::SpanExporter>) -> Self {
        Self { exporter }
    }
}

impl api::SpanProcessor for SimpleSpanProcessor {
    /// Span starts require no work from this processor.
    fn on_start(&self, _span: Arc<exporter::trace::SpanData>) {}

    /// Export the span synchronously, but only when it was sampled.
    fn on_end(&self, span: Arc<exporter::trace::SpanData>) {
        if !span.context.is_sampled() {
            return;
        }
        self.exporter.export(vec![span]);
    }

    /// Forward shutdown to the wrapped exporter so it can release resources.
    fn shutdown(&self) {
        self.exporter.shutdown();
    }
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
/// [`SpanProcessor`]: ../../../api/trace/span_processor/trait.SpanProcessor.html
#[derive(Debug)]
pub struct BatchSpanProcessor {
    // Channel to the background `BatchSpanProcessorWorker`. Wrapped in a
    // `Mutex` because sending requires `&mut` on the sender while this
    // processor is only ever accessed through `&self`.
    message_sender: Mutex<mpsc::Sender<BatchMessage>>,
}

impl api::SpanProcessor for BatchSpanProcessor {
    /// Span starts carry no work for the batch processor.
    fn on_start(&self, _span: Arc<exporter::trace::SpanData>) {}

    /// Queue the finished span for the background worker. If the sender lock
    /// is contended or the channel is full, the span is silently dropped —
    /// batching is best-effort by design.
    fn on_end(&self, span: Arc<exporter::trace::SpanData>) {
        let message = BatchMessage::ExportSpan(span);
        if let Ok(mut sender) = self.message_sender.try_lock() {
            sender.try_send(message).ok();
        }
    }

    /// Ask the background worker to shut down; best-effort like `on_end`.
    fn shutdown(&self) {
        if let Ok(mut sender) = self.message_sender.try_lock() {
            sender.try_send(BatchMessage::Shutdown).ok();
        }
    }
}

/// A worker process that batches and processes spans as they are reported.
///
/// This process is implemented as a [`Future`] that returns when the accompanying
/// [`BatchSpanProcessor`] is shut down, and allows systems like [`tokio`] and [`async-std`] to
/// process the work in the background without requiring dedicated system threads.
///
/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html
/// [`BatchSpanProcessor`]: struct.BatchSpanProcessor.html
/// [`tokio`]: https://tokio.rs
/// [`async-std`]: https://async.rs
#[allow(missing_debug_implementations)]
pub struct BatchSpanProcessorWorker {
    // Destination for completed batches of spans.
    exporter: Box<dyn exporter::trace::SpanExporter>,
    // Merged stream of control messages from the processor and timer ticks.
    messages: Pin<Box<dyn Stream<Item = BatchMessage> + Send>>,
    // Queue-size, delay, and batch-size limits (see `BatchConfig`).
    config: BatchConfig,
    // Spans accumulated since the last tick, capped at `max_queue_size`.
    buffer: Vec<Arc<exporter::trace::SpanData>>,
}

impl Future for BatchSpanProcessorWorker {
    type Output = ();

    /// Drive the worker: drain the merged message/tick stream, buffering
    /// finished spans and exporting them in batches on every tick.
    ///
    /// Resolves when the stream terminates or a `Shutdown` message arrives,
    /// shutting the exporter down before completing. Spans still buffered at
    /// shutdown are not exported.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match futures::ready!(self.messages.poll_next_unpin(cx)) {
                // Span has finished; add to the buffer of pending spans unless
                // the queue is already at capacity, in which case it is dropped.
                Some(BatchMessage::ExportSpan(span)) => {
                    if self.buffer.len() < self.config.max_queue_size {
                        self.buffer.push(span);
                    }
                }
                // Span batch interval time reached, export current spans.
                Some(BatchMessage::Tick) => {
                    if !self.buffer.is_empty() {
                        // Take ownership of the pending spans, leaving a fresh
                        // empty buffer in place for the next interval.
                        let mut spans = std::mem::take(&mut self.buffer);
                        while !spans.is_empty() {
                            // Split batches of at most `max_export_batch_size`
                            // off the tail. NOTE(review): this exports the most
                            // recently buffered batch first; confirm nothing
                            // downstream relies on FIFO export order.
                            let batch_idx = spans
                                .len()
                                .saturating_sub(self.config.max_export_batch_size);
                            let batch = spans.split_off(batch_idx);
                            self.exporter.export(batch);
                        }
                    }
                }
                // Stream has terminated or processor is shutdown, return to finish execution.
                None | Some(BatchMessage::Shutdown) => {
                    self.exporter.shutdown();
                    return Poll::Ready(());
                }
            }
        }
    }
}

// Control messages consumed by `BatchSpanProcessorWorker`.
#[derive(Debug)]
enum BatchMessage {
    /// A finished span to append to the worker's pending buffer.
    ExportSpan(Arc<exporter::trace::SpanData>),
    /// Interval timer fired: export whatever is currently buffered.
    Tick,
    /// Stop the worker and shut the exporter down. Note: the worker exits
    /// without exporting any spans still in its buffer.
    Shutdown,
}

impl BatchSpanProcessor {
    pub(crate) fn new<S, SO, I, IS, ISI>(
        exporter: Box<dyn exporter::trace::SpanExporter>,
        spawn: S,
        interval: I,
        config: BatchConfig,
    ) -> Self
    where
        S: Fn(BatchSpanProcessorWorker) -> SO,
        I: Fn(time::Duration) -> IS,
        IS: Stream<Item = ISI> + Send + 'static,
    {
        let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size);
        let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick);

        // Spawn worker process via user-defined spawn function.
        spawn(BatchSpanProcessorWorker {
            exporter,
            messages: Box::pin(futures::stream::select(message_receiver, ticker)),
            config,
            buffer: Vec::new(),
        });

        // Return batch processor with link to worker
        BatchSpanProcessor {
            message_sender: Mutex::new(message_sender),
        }
    }

    /// Create a new batch processor builder
    pub fn builder<E, S, SO, I, IO>(
        exporter: E,
        spawn: S,
        interval: I,
    ) -> BatchSpanProcessorBuilder<E, S, I>
    where
        E: exporter::trace::SpanExporter,
        S: Fn(BatchSpanProcessorWorker) -> SO,
        I: Fn(time::Duration) -> IO,
    {
        BatchSpanProcessorBuilder {
            exporter,
            spawn,
            interval,
            config: Default::default(),
        }
    }
}

/// Batch span processor configuration
#[derive(Debug)]
pub struct BatchConfig {
    /// The maximum queue size to buffer spans for delayed processing. If the
    /// queue gets full it drops the spans. The default value is 2048.
    max_queue_size: usize,

    /// The delay interval between two consecutive batch processing runs.
    /// The default value is 5 seconds.
    scheduled_delay: time::Duration,

    /// The maximum number of spans to process in a single batch. If there are
    /// more than one batch worth of spans then it processes multiple batches
    /// of spans one batch after the other without any delay. The default value
    /// is 512.
    max_export_batch_size: usize,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfig {
            max_queue_size: 2048,
            scheduled_delay: time::Duration::from_secs(5),
            max_export_batch_size: 512,
        }
    }
}

/// A builder for creating [`BatchSpanProcessor`] instances.
///
/// [`BatchSpanProcessor`]: struct.BatchSpanProcessor.html
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder<E, S, I> {
    // Exporter the finished processor will hand batches to.
    exporter: E,
    // Function producing the tick stream from a `Duration`.
    interval: I,
    // Function that spawns the worker future onto an executor.
    spawn: S,
    // Limits to apply; starts from `BatchConfig::default()`.
    config: BatchConfig,
}

impl<E, S, SO, I, IS, ISI> BatchSpanProcessorBuilder<E, S, I>
where
    E: exporter::trace::SpanExporter + 'static,
    S: Fn(BatchSpanProcessorWorker) -> SO,
    I: Fn(time::Duration) -> IS,
    IS: Stream<Item = ISI> + Send + 'static,
{
    /// Set max queue size for batches
    pub fn with_max_queue_size(mut self, size: usize) -> Self {
        self.config.max_queue_size = size;
        self
    }

    /// Set scheduled delay for batches
    pub fn with_scheduled_delay(mut self, delay: time::Duration) -> Self {
        self.config.scheduled_delay = delay;
        self
    }

    /// Set max export size for batches
    pub fn with_max_export_batch_size(mut self, size: usize) -> Self {
        self.config.max_export_batch_size = size;
        self
    }

    /// Build a batch processor
    pub fn build(self) -> BatchSpanProcessor {
        let BatchSpanProcessorBuilder {
            exporter,
            interval,
            spawn,
            config,
        } = self;
        BatchSpanProcessor::new(Box::new(exporter), spawn, interval, config)
    }
}