opentelemetry_spanprocessor_any/sdk/metrics/controllers/
push.rs

1use crate::global;
2use crate::metrics::registry;
3use crate::sdk::{
4    export::metrics::{AggregatorSelector, Checkpointer, ExportKindFor, Exporter},
5    metrics::{
6        self,
7        processors::{self, BasicProcessor},
8        Accumulator,
9    },
10    Resource,
11};
12use futures_channel::mpsc;
13use futures_util::{
14    future::Future,
15    stream::{select, Stream, StreamExt as _},
16    task,
17};
18use std::pin::Pin;
19use std::sync::{Arc, Mutex};
20use std::time;
21
22lazy_static::lazy_static! {
23    static ref DEFAULT_PUSH_PERIOD: time::Duration = time::Duration::from_secs(10);
24}
25
26/// Create a new `PushControllerBuilder`.
27pub fn push<AS, ES, E, SP, SO, I, IO>(
28    aggregator_selector: AS,
29    export_selector: ES,
30    exporter: E,
31    spawn: SP,
32    interval: I,
33) -> PushControllerBuilder<SP, I>
34where
35    AS: AggregatorSelector + Send + Sync + 'static,
36    ES: ExportKindFor + Send + Sync + 'static,
37    E: Exporter + Send + Sync + 'static,
38    SP: Fn(PushControllerWorker) -> SO,
39    I: Fn(time::Duration) -> IO,
40{
41    PushControllerBuilder {
42        aggregator_selector: Box::new(aggregator_selector),
43        export_selector: Box::new(export_selector),
44        exporter: Box::new(exporter),
45        spawn,
46        interval,
47        resource: None,
48        period: None,
49        timeout: None,
50    }
51}
52
53/// Organizes a periodic push of metric data.
54#[derive(Debug)]
55pub struct PushController {
56    message_sender: Mutex<mpsc::Sender<PushMessage>>,
57    provider: registry::RegistryMeterProvider,
58}
59
/// Messages consumed by the `PushControllerWorker`.
#[derive(Debug)]
enum PushMessage {
    /// The push interval elapsed: collect and export the current metrics.
    Tick,
    /// The controller was dropped: the worker should finish.
    Shutdown,
}
65
66/// The future which executes push controller work periodically. Can be run on a
67/// passed in executor.
68#[allow(missing_debug_implementations)]
69pub struct PushControllerWorker {
70    messages: Pin<Box<dyn Stream<Item = PushMessage> + Send>>,
71    accumulator: Accumulator,
72    processor: Arc<BasicProcessor>,
73    exporter: Box<dyn Exporter + Send + Sync>,
74    _timeout: time::Duration,
75}
76
77impl PushControllerWorker {
78    fn on_tick(&mut self) {
79        // TODO handle timeout
80        if let Err(err) = self.processor.lock().and_then(|mut checkpointer| {
81            checkpointer.start_collection();
82            self.accumulator.0.collect(&mut checkpointer);
83            checkpointer.finish_collection()?;
84            self.exporter.export(checkpointer.checkpoint_set())
85        }) {
86            global::handle_error(err)
87        }
88    }
89}
90
91impl Future for PushControllerWorker {
92    type Output = ();
93    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
94        loop {
95            match futures_util::ready!(self.messages.poll_next_unpin(cx)) {
96                // Span batch interval time reached, export current spans.
97                Some(PushMessage::Tick) => self.on_tick(),
98                // Stream has terminated or processor is shutdown, return to finish execution.
99                None | Some(PushMessage::Shutdown) => {
100                    return task::Poll::Ready(());
101                }
102            }
103        }
104    }
105}
106
107impl Drop for PushControllerWorker {
108    fn drop(&mut self) {
109        // Try to push data one last time
110        self.on_tick()
111    }
112}
113
114impl PushController {
115    /// The controller's meter provider.
116    pub fn provider(&self) -> registry::RegistryMeterProvider {
117        self.provider.clone()
118    }
119}
120
121impl Drop for PushController {
122    fn drop(&mut self) {
123        if let Ok(mut sender) = self.message_sender.lock() {
124            let _ = sender.try_send(PushMessage::Shutdown);
125        }
126    }
127}
128
129/// Configuration for building a new `PushController`.
130#[derive(Debug)]
131pub struct PushControllerBuilder<S, I> {
132    aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
133    export_selector: Box<dyn ExportKindFor + Send + Sync>,
134    exporter: Box<dyn Exporter + Send + Sync>,
135    spawn: S,
136    interval: I,
137    resource: Option<Resource>,
138    period: Option<time::Duration>,
139    timeout: Option<time::Duration>,
140}
141
142impl<S, SO, I, IS, ISI> PushControllerBuilder<S, I>
143where
144    S: Fn(PushControllerWorker) -> SO,
145    I: Fn(time::Duration) -> IS,
146    IS: Stream<Item = ISI> + Send + 'static,
147{
148    /// Configure the period of this controller
149    pub fn with_period(self, period: time::Duration) -> Self {
150        PushControllerBuilder {
151            period: Some(period),
152            ..self
153        }
154    }
155
156    /// Configure the resource used by this controller
157    pub fn with_resource(self, resource: Resource) -> Self {
158        PushControllerBuilder {
159            resource: Some(resource),
160            ..self
161        }
162    }
163
164    /// Config the timeout of one request.
165    pub fn with_timeout(self, duration: time::Duration) -> Self {
166        PushControllerBuilder {
167            timeout: Some(duration),
168            ..self
169        }
170    }
171
172    /// Build a new `PushController` with this configuration.
173    pub fn build(self) -> PushController {
174        let processor = processors::basic(self.aggregator_selector, self.export_selector, false);
175        let processor = Arc::new(processor);
176        let mut accumulator = metrics::accumulator(processor.clone());
177
178        if let Some(resource) = self.resource {
179            accumulator = accumulator.with_resource(resource);
180        }
181        let accumulator = accumulator.build();
182        let provider = registry::meter_provider(Arc::new(accumulator.clone()));
183
184        let (message_sender, message_receiver) = mpsc::channel(256);
185        let ticker =
186            (self.interval)(self.period.unwrap_or(*DEFAULT_PUSH_PERIOD)).map(|_| PushMessage::Tick);
187
188        (self.spawn)(PushControllerWorker {
189            messages: Box::pin(select(message_receiver, ticker)),
190            accumulator,
191            processor,
192            exporter: self.exporter,
193            _timeout: self.timeout.unwrap_or(*DEFAULT_PUSH_PERIOD),
194        });
195
196        PushController {
197            message_sender: Mutex::new(message_sender),
198            provider,
199        }
200    }
201}