opentelemetry_spanprocessor_any/sdk/metrics/controllers/
pull.rs1use crate::metrics::{registry, Result};
2use crate::sdk::{
3 export::metrics::{AggregatorSelector, CheckpointSet, Checkpointer, ExportKindFor, Record},
4 metrics::{
5 accumulator,
6 processors::{self, BasicProcessor},
7 Accumulator,
8 },
9 Resource,
10};
11use std::sync::Arc;
12use std::time::{Duration, SystemTime};
13
/// Cache period used by [`PullControllerBuilder::build`] when the builder was
/// not given an explicit one: ten seconds.
const DEFAULT_CACHE_DURATION: Duration = Duration::from_secs(10);
15
16pub fn pull(
18 aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
19 export_selector: Box<dyn ExportKindFor + Send + Sync>,
20) -> PullControllerBuilder {
21 PullControllerBuilder::with_selectors(aggregator_selector, export_selector)
22}
23
24#[derive(Debug)]
30pub struct PullController {
31 accumulator: Accumulator,
32 processor: Arc<BasicProcessor>,
33 provider: registry::RegistryMeterProvider,
34 period: Duration,
35 last_collect: SystemTime,
36}
37
38impl PullController {
39 pub fn provider(&self) -> registry::RegistryMeterProvider {
41 self.provider.clone()
42 }
43
44 pub fn collect(&mut self) -> Result<()> {
46 if self
47 .last_collect
48 .elapsed()
49 .map_or(true, |elapsed| elapsed > self.period)
50 {
51 self.last_collect = crate::time::now();
52 self.processor.lock().and_then(|mut checkpointer| {
53 checkpointer.start_collection();
54 self.accumulator.0.collect(&mut checkpointer);
55 checkpointer.finish_collection()
56 })
57 } else {
58 Ok(())
59 }
60 }
61}
62
63impl CheckpointSet for PullController {
64 fn try_for_each(
65 &mut self,
66 export_selector: &dyn ExportKindFor,
67 f: &mut dyn FnMut(&Record<'_>) -> Result<()>,
68 ) -> Result<()> {
69 self.processor.lock().and_then(|mut locked_processor| {
70 locked_processor
71 .checkpoint_set()
72 .try_for_each(export_selector, f)
73 })
74 }
75}
76
77#[derive(Debug)]
79pub struct PullControllerBuilder {
80 aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
82
83 export_selector: Box<dyn ExportKindFor + Send + Sync>,
85
86 resource: Option<Resource>,
89
90 cache_period: Option<Duration>,
96
97 memory: bool,
102}
103
104impl PullControllerBuilder {
105 pub fn with_selectors(
107 aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
108 export_selector: Box<dyn ExportKindFor + Send + Sync>,
109 ) -> Self {
110 PullControllerBuilder {
111 aggregator_selector,
112 export_selector,
113 resource: None,
114 cache_period: None,
115 memory: true,
116 }
117 }
118
119 pub fn with_resource(self, resource: Resource) -> Self {
121 PullControllerBuilder {
122 resource: Some(resource),
123 ..self
124 }
125 }
126
127 pub fn with_cache_period(self, period: Duration) -> Self {
129 PullControllerBuilder {
130 cache_period: Some(period),
131 ..self
132 }
133 }
134
135 pub fn with_memory(self, memory: bool) -> Self {
139 PullControllerBuilder { memory, ..self }
140 }
141
142 pub fn build(self) -> PullController {
144 let processor = Arc::new(processors::basic(
145 self.aggregator_selector,
146 self.export_selector,
147 self.memory,
148 ));
149
150 let accumulator = accumulator(processor.clone())
151 .with_resource(self.resource.unwrap_or_default())
152 .build();
153 let provider = registry::meter_provider(Arc::new(accumulator.clone()));
154
155 PullController {
156 accumulator,
157 processor,
158 provider,
159 period: self.cache_period.unwrap_or(DEFAULT_CACHE_DURATION),
160 last_collect: crate::time::now(),
161 }
162 }
163}