opentelemetry_spanprocessor_any/sdk/metrics/controllers/
push.rs1use crate::global;
2use crate::metrics::registry;
3use crate::sdk::{
4 export::metrics::{AggregatorSelector, Checkpointer, ExportKindFor, Exporter},
5 metrics::{
6 self,
7 processors::{self, BasicProcessor},
8 Accumulator,
9 },
10 Resource,
11};
12use futures_channel::mpsc;
13use futures_util::{
14 future::Future,
15 stream::{select, Stream, StreamExt as _},
16 task,
17};
18use std::pin::Pin;
19use std::sync::{Arc, Mutex};
20use std::time;
21
22lazy_static::lazy_static! {
23 static ref DEFAULT_PUSH_PERIOD: time::Duration = time::Duration::from_secs(10);
24}
25
26pub fn push<AS, ES, E, SP, SO, I, IO>(
28 aggregator_selector: AS,
29 export_selector: ES,
30 exporter: E,
31 spawn: SP,
32 interval: I,
33) -> PushControllerBuilder<SP, I>
34where
35 AS: AggregatorSelector + Send + Sync + 'static,
36 ES: ExportKindFor + Send + Sync + 'static,
37 E: Exporter + Send + Sync + 'static,
38 SP: Fn(PushControllerWorker) -> SO,
39 I: Fn(time::Duration) -> IO,
40{
41 PushControllerBuilder {
42 aggregator_selector: Box::new(aggregator_selector),
43 export_selector: Box::new(export_selector),
44 exporter: Box::new(exporter),
45 spawn,
46 interval,
47 resource: None,
48 period: None,
49 timeout: None,
50 }
51}
52
53#[derive(Debug)]
55pub struct PushController {
56 message_sender: Mutex<mpsc::Sender<PushMessage>>,
57 provider: registry::RegistryMeterProvider,
58}
59
60#[derive(Debug)]
61enum PushMessage {
62 Tick,
63 Shutdown,
64}
65
66#[allow(missing_debug_implementations)]
69pub struct PushControllerWorker {
70 messages: Pin<Box<dyn Stream<Item = PushMessage> + Send>>,
71 accumulator: Accumulator,
72 processor: Arc<BasicProcessor>,
73 exporter: Box<dyn Exporter + Send + Sync>,
74 _timeout: time::Duration,
75}
76
77impl PushControllerWorker {
78 fn on_tick(&mut self) {
79 if let Err(err) = self.processor.lock().and_then(|mut checkpointer| {
81 checkpointer.start_collection();
82 self.accumulator.0.collect(&mut checkpointer);
83 checkpointer.finish_collection()?;
84 self.exporter.export(checkpointer.checkpoint_set())
85 }) {
86 global::handle_error(err)
87 }
88 }
89}
90
91impl Future for PushControllerWorker {
92 type Output = ();
93 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
94 loop {
95 match futures_util::ready!(self.messages.poll_next_unpin(cx)) {
96 Some(PushMessage::Tick) => self.on_tick(),
98 None | Some(PushMessage::Shutdown) => {
100 return task::Poll::Ready(());
101 }
102 }
103 }
104 }
105}
106
107impl Drop for PushControllerWorker {
108 fn drop(&mut self) {
109 self.on_tick()
111 }
112}
113
114impl PushController {
115 pub fn provider(&self) -> registry::RegistryMeterProvider {
117 self.provider.clone()
118 }
119}
120
121impl Drop for PushController {
122 fn drop(&mut self) {
123 if let Ok(mut sender) = self.message_sender.lock() {
124 let _ = sender.try_send(PushMessage::Shutdown);
125 }
126 }
127}
128
129#[derive(Debug)]
131pub struct PushControllerBuilder<S, I> {
132 aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
133 export_selector: Box<dyn ExportKindFor + Send + Sync>,
134 exporter: Box<dyn Exporter + Send + Sync>,
135 spawn: S,
136 interval: I,
137 resource: Option<Resource>,
138 period: Option<time::Duration>,
139 timeout: Option<time::Duration>,
140}
141
142impl<S, SO, I, IS, ISI> PushControllerBuilder<S, I>
143where
144 S: Fn(PushControllerWorker) -> SO,
145 I: Fn(time::Duration) -> IS,
146 IS: Stream<Item = ISI> + Send + 'static,
147{
148 pub fn with_period(self, period: time::Duration) -> Self {
150 PushControllerBuilder {
151 period: Some(period),
152 ..self
153 }
154 }
155
156 pub fn with_resource(self, resource: Resource) -> Self {
158 PushControllerBuilder {
159 resource: Some(resource),
160 ..self
161 }
162 }
163
164 pub fn with_timeout(self, duration: time::Duration) -> Self {
166 PushControllerBuilder {
167 timeout: Some(duration),
168 ..self
169 }
170 }
171
172 pub fn build(self) -> PushController {
174 let processor = processors::basic(self.aggregator_selector, self.export_selector, false);
175 let processor = Arc::new(processor);
176 let mut accumulator = metrics::accumulator(processor.clone());
177
178 if let Some(resource) = self.resource {
179 accumulator = accumulator.with_resource(resource);
180 }
181 let accumulator = accumulator.build();
182 let provider = registry::meter_provider(Arc::new(accumulator.clone()));
183
184 let (message_sender, message_receiver) = mpsc::channel(256);
185 let ticker =
186 (self.interval)(self.period.unwrap_or(*DEFAULT_PUSH_PERIOD)).map(|_| PushMessage::Tick);
187
188 (self.spawn)(PushControllerWorker {
189 messages: Box::pin(select(message_receiver, ticker)),
190 accumulator,
191 processor,
192 exporter: self.exporter,
193 _timeout: self.timeout.unwrap_or(*DEFAULT_PUSH_PERIOD),
194 });
195
196 PushController {
197 message_sender: Mutex::new(message_sender),
198 provider,
199 }
200 }
201}