opentelemetry_spanprocessor_any/sdk/metrics/controllers/
pull.rs

1use crate::metrics::{registry, Result};
2use crate::sdk::{
3    export::metrics::{AggregatorSelector, CheckpointSet, Checkpointer, ExportKindFor, Record},
4    metrics::{
5        accumulator,
6        processors::{self, BasicProcessor},
7        Accumulator,
8    },
9    Resource,
10};
11use std::sync::Arc;
12use std::time::{Duration, SystemTime};
13
/// Cache period applied by `PullControllerBuilder::build` when the caller did
/// not configure one explicitly.
const DEFAULT_CACHE_DURATION: Duration = Duration::from_secs(10);
15
16/// Returns a builder for creating a `PullController` with the configured and options.
17pub fn pull(
18    aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
19    export_selector: Box<dyn ExportKindFor + Send + Sync>,
20) -> PullControllerBuilder {
21    PullControllerBuilder::with_selectors(aggregator_selector, export_selector)
22}
23
24/// Pull controllers are typically used in an environment where there are
25/// multiple readers. It is common, therefore, when configuring a
26/// `BasicProcessor` for use with this controller, to use a
27/// `ExportKind::Cumulative` strategy and the `with_memory(true)` builder
28/// option, which ensures that every `CheckpointSet` includes full state.
29#[derive(Debug)]
30pub struct PullController {
31    accumulator: Accumulator,
32    processor: Arc<BasicProcessor>,
33    provider: registry::RegistryMeterProvider,
34    period: Duration,
35    last_collect: SystemTime,
36}
37
38impl PullController {
39    /// The provider for this controller
40    pub fn provider(&self) -> registry::RegistryMeterProvider {
41        self.provider.clone()
42    }
43
44    /// Collects all metrics if the last collected at time is past the current period
45    pub fn collect(&mut self) -> Result<()> {
46        if self
47            .last_collect
48            .elapsed()
49            .map_or(true, |elapsed| elapsed > self.period)
50        {
51            self.last_collect = crate::time::now();
52            self.processor.lock().and_then(|mut checkpointer| {
53                checkpointer.start_collection();
54                self.accumulator.0.collect(&mut checkpointer);
55                checkpointer.finish_collection()
56            })
57        } else {
58            Ok(())
59        }
60    }
61}
62
63impl CheckpointSet for PullController {
64    fn try_for_each(
65        &mut self,
66        export_selector: &dyn ExportKindFor,
67        f: &mut dyn FnMut(&Record<'_>) -> Result<()>,
68    ) -> Result<()> {
69        self.processor.lock().and_then(|mut locked_processor| {
70            locked_processor
71                .checkpoint_set()
72                .try_for_each(export_selector, f)
73        })
74    }
75}
76
77/// Configuration for a `PullController`.
78#[derive(Debug)]
79pub struct PullControllerBuilder {
80    /// The aggregator selector used by the controller
81    aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
82
83    /// The export kind selector used by this controller
84    export_selector: Box<dyn ExportKindFor + Send + Sync>,
85
86    /// Resource is the OpenTelemetry resource associated with all Meters created by
87    /// the controller.
88    resource: Option<Resource>,
89
90    /// CachePeriod is the period which a recently-computed result will be returned
91    /// without gathering metric data again.
92    ///
93    /// If the period is zero, caching of the result is disabled. The default value
94    /// is 10 seconds.
95    cache_period: Option<Duration>,
96
97    /// Memory controls whether the controller's processor remembers metric
98    /// instruments and attribute sets that were previously reported. When memory is
99    /// `true`, `CheckpointSet::try_for_each` will visit metrics that were not
100    /// updated in the most recent interval. Default true.
101    memory: bool,
102}
103
104impl PullControllerBuilder {
105    /// Configure the sectors for this controller
106    pub fn with_selectors(
107        aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
108        export_selector: Box<dyn ExportKindFor + Send + Sync>,
109    ) -> Self {
110        PullControllerBuilder {
111            aggregator_selector,
112            export_selector,
113            resource: None,
114            cache_period: None,
115            memory: true,
116        }
117    }
118
119    /// Configure the resource for this controller
120    pub fn with_resource(self, resource: Resource) -> Self {
121        PullControllerBuilder {
122            resource: Some(resource),
123            ..self
124        }
125    }
126
127    /// Configure the cache period for this controller
128    pub fn with_cache_period(self, period: Duration) -> Self {
129        PullControllerBuilder {
130            cache_period: Some(period),
131            ..self
132        }
133    }
134
135    /// Sets the memory behavior of the controller's `Processor`.  If this is
136    /// `true`, the processor will report metric instruments and attribute sets that
137    /// were previously reported but not updated in the most recent interval.
138    pub fn with_memory(self, memory: bool) -> Self {
139        PullControllerBuilder { memory, ..self }
140    }
141
142    /// Build a new `PullController` from the current configuration.
143    pub fn build(self) -> PullController {
144        let processor = Arc::new(processors::basic(
145            self.aggregator_selector,
146            self.export_selector,
147            self.memory,
148        ));
149
150        let accumulator = accumulator(processor.clone())
151            .with_resource(self.resource.unwrap_or_default())
152            .build();
153        let provider = registry::meter_provider(Arc::new(accumulator.clone()));
154
155        PullController {
156            accumulator,
157            processor,
158            provider,
159            period: self.cache_period.unwrap_or(DEFAULT_CACHE_DURATION),
160            last_collect: crate::time::now(),
161        }
162    }
163}