opentelemetry_spanprocessor_any/sdk/metrics/processors/
basic.rs1use crate::sdk::{
2 export::metrics::{
3 self, Accumulation, Aggregator, AggregatorSelector, CheckpointSet, Checkpointer,
4 ExportKind, ExportKindFor, LockedProcessor, Processor, Record, Subtractor,
5 },
6 metrics::aggregators::SumAggregator,
7 Resource,
8};
9use crate::{
10 attributes::{hash_attributes, AttributeSet},
11 metrics::{Descriptor, MetricsError, Result},
12};
13use fnv::FnvHasher;
14use std::collections::HashMap;
15use std::hash::{Hash, Hasher};
16use std::sync::{Arc, Mutex, MutexGuard};
17use std::time::SystemTime;
18
19pub fn basic(
21 aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
22 export_selector: Box<dyn ExportKindFor + Send + Sync>,
23 memory: bool,
24) -> BasicProcessor {
25 BasicProcessor {
26 aggregator_selector,
27 export_selector,
28 state: Mutex::new(BasicProcessorState::with_memory(memory)),
29 }
30}
31
32#[derive(Debug)]
34pub struct BasicProcessor {
35 aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
36 export_selector: Box<dyn ExportKindFor + Send + Sync>,
37 state: Mutex<BasicProcessorState>,
38}
39
40impl BasicProcessor {
41 pub fn lock(&self) -> Result<BasicLockedProcessor<'_>> {
43 self.state
44 .lock()
45 .map_err(From::from)
46 .map(|locked| BasicLockedProcessor {
47 parent: self,
48 state: locked,
49 })
50 }
51}
52
53impl Processor for BasicProcessor {
54 fn aggregation_selector(&self) -> &dyn AggregatorSelector {
55 self.aggregator_selector.as_ref()
56 }
57}
58
59#[derive(Debug)]
61pub struct BasicLockedProcessor<'a> {
62 parent: &'a BasicProcessor,
63 state: MutexGuard<'a, BasicProcessorState>,
64}
65
66impl<'a> LockedProcessor for BasicLockedProcessor<'a> {
67 fn process(&mut self, accumulation: Accumulation<'_>) -> Result<()> {
68 if self.state.started_collection != self.state.finished_collection.wrapping_add(1) {
69 return Err(MetricsError::InconsistentState);
70 }
71
72 let desc = accumulation.descriptor();
73 let mut hasher = FnvHasher::default();
74 desc.attribute_hash().hash(&mut hasher);
75 hash_attributes(&mut hasher, accumulation.attributes().into_iter());
76 hash_attributes(&mut hasher, accumulation.resource().into_iter());
77 let key = StateKey(hasher.finish());
78 let agg = accumulation.aggregator();
79 let finished_collection = self.state.finished_collection;
80 if let Some(value) = self.state.values.get_mut(&key) {
81 let same_collection = finished_collection == value.updated;
83 value.updated = finished_collection;
84
85 if !same_collection {
115 if !value.current_owned {
116 value.current = agg.clone();
121 return Ok(());
122 }
123 return agg.synchronized_move(&value.current, desc);
124 }
125
126 if !value.current_owned {
129 let tmp = value.current.clone();
130 if let Some(current) = self.parent.aggregation_selector().aggregator_for(desc) {
131 value.current = current;
132 value.current_owned = true;
133 tmp.synchronized_move(&value.current, desc)?;
134 }
135 }
136
137 return value.current.merge(agg.as_ref(), desc);
139 }
140
141 let stateful = self
142 .parent
143 .export_selector
144 .export_kind_for(desc)
145 .memory_required(desc.instrument_kind());
146
147 let mut delta = None;
148 let cumulative = if stateful {
149 if desc.instrument_kind().precomputed_sum() {
150 delta = self.parent.aggregation_selector().aggregator_for(desc);
152 }
153 self.parent.aggregation_selector().aggregator_for(desc)
155 } else {
156 None
157 };
158
159 self.state.values.insert(
160 key,
161 StateValue {
162 descriptor: desc.clone(),
163 attributes: accumulation.attributes().clone(),
164 resource: accumulation.resource().clone(),
165 current_owned: false,
166 current: agg.clone(),
167 delta,
168 cumulative,
169 stateful,
170 updated: finished_collection,
171 },
172 );
173
174 Ok(())
175 }
176}
177
178impl Checkpointer for BasicLockedProcessor<'_> {
179 fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet {
180 &mut *self.state
181 }
182
183 fn start_collection(&mut self) {
184 if self.state.started_collection != 0 {
185 self.state.interval_start = self.state.interval_end;
186 }
187 self.state.started_collection = self.state.started_collection.wrapping_add(1);
188 }
189
190 fn finish_collection(&mut self) -> Result<()> {
191 self.state.interval_end = crate::time::now();
192 if self.state.started_collection != self.state.finished_collection.wrapping_add(1) {
193 return Err(MetricsError::InconsistentState);
194 }
195 let finished_collection = self.state.finished_collection;
196 self.state.finished_collection = self.state.finished_collection.wrapping_add(1);
197 let has_memory = self.state.config.memory;
198
199 let mut result = Ok(());
200
201 self.state.values.retain(|_key, value| {
202 if result.is_err() {
204 return true;
205 }
206
207 let mkind = value.descriptor.instrument_kind();
208
209 let stale = value.updated != finished_collection;
210 let stateless = !value.stateful;
211
212 if stale || stateless {
215 if stale && stateless && !has_memory {
219 return false;
220 }
221 return true;
222 }
223
224 if mkind.precomputed_sum() {
227 if let Some(current_subtractor) =
228 value.current.as_any().downcast_ref::<SumAggregator>()
229 {
230 if let (Some(cumulative), Some(delta)) =
233 (value.cumulative.as_ref(), value.delta.as_ref())
234 {
235 result = current_subtractor
236 .subtract(cumulative.as_ref(), delta.as_ref(), &value.descriptor)
237 .and_then(|_| {
238 value
239 .current
240 .synchronized_move(cumulative, &value.descriptor)
241 });
242 }
243 } else {
244 result = Err(MetricsError::NoSubtraction);
245 }
246 } else {
247 if let Some(cumulative) = value.cumulative.as_ref() {
250 result = cumulative.merge(value.current.as_ref(), &value.descriptor)
251 }
252 }
253
254 true
255 });
256
257 result
258 }
259}
260
261#[derive(Debug, Default)]
262struct BasicProcessorConfig {
263 memory: bool,
268}
269
270#[derive(Debug)]
271struct BasicProcessorState {
272 config: BasicProcessorConfig,
273 values: HashMap<StateKey, StateValue>,
274 process_start: SystemTime,
276 interval_start: SystemTime,
277 interval_end: SystemTime,
278 started_collection: u64,
279 finished_collection: u64,
280}
281
282impl BasicProcessorState {
283 fn with_memory(memory: bool) -> Self {
284 let mut state = BasicProcessorState::default();
285 state.config.memory = memory;
286 state
287 }
288}
289
290impl Default for BasicProcessorState {
291 fn default() -> Self {
292 BasicProcessorState {
293 config: BasicProcessorConfig::default(),
294 values: HashMap::default(),
295 process_start: crate::time::now(),
296 interval_start: crate::time::now(),
297 interval_end: crate::time::now(),
298 started_collection: 0,
299 finished_collection: 0,
300 }
301 }
302}
303
304impl CheckpointSet for BasicProcessorState {
305 fn try_for_each(
306 &mut self,
307 exporter: &dyn ExportKindFor,
308 f: &mut dyn FnMut(&Record<'_>) -> Result<()>,
309 ) -> Result<()> {
310 if self.started_collection != self.finished_collection {
311 return Err(MetricsError::InconsistentState);
312 }
313
314 self.values.iter().try_for_each(|(_key, value)| {
315 let instrument_kind = value.descriptor.instrument_kind();
316
317 let agg;
318 let start;
319
320 if !self.config.memory && value.updated != self.finished_collection.wrapping_sub(1) {
323 return Ok(());
324 }
325
326 match exporter.export_kind_for(&value.descriptor) {
327 ExportKind::Cumulative => {
328 if value.stateful {
332 agg = value.cumulative.as_ref();
333 } else {
334 agg = Some(&value.current);
335 }
336
337 start = self.process_start;
338 }
339
340 ExportKind::Delta => {
341 if instrument_kind.precomputed_sum() {
343 agg = value.delta.as_ref();
344 } else {
345 agg = Some(&value.current);
346 }
347
348 start = self.interval_start;
349 }
350 }
351
352 let res = f(&metrics::record(
353 &value.descriptor,
354 &value.attributes,
355 &value.resource,
356 agg,
357 start,
358 self.interval_end,
359 ));
360 if let Err(MetricsError::NoDataCollected) = res {
361 Ok(())
362 } else {
363 res
364 }
365 })
366 }
367}
368
369#[derive(Debug, PartialEq, Eq, Hash)]
370struct StateKey(u64);
371
372#[derive(Debug)]
373struct StateValue {
374 descriptor: Descriptor,
376
377 attributes: AttributeSet,
379
380 resource: Resource,
382
383 updated: u64,
386
387 stateful: bool,
390
391 current_owned: bool,
399
400 current: Arc<dyn Aggregator + Send + Sync>,
404
405 delta: Option<Arc<dyn Aggregator + Send + Sync>>,
408
409 cumulative: Option<Arc<dyn Aggregator + Send + Sync>>,
412}