opentelemetry_spanprocessor_any/sdk/metrics/processors/
basic.rs

1use crate::sdk::{
2    export::metrics::{
3        self, Accumulation, Aggregator, AggregatorSelector, CheckpointSet, Checkpointer,
4        ExportKind, ExportKindFor, LockedProcessor, Processor, Record, Subtractor,
5    },
6    metrics::aggregators::SumAggregator,
7    Resource,
8};
9use crate::{
10    attributes::{hash_attributes, AttributeSet},
11    metrics::{Descriptor, MetricsError, Result},
12};
13use fnv::FnvHasher;
14use std::collections::HashMap;
15use std::hash::{Hash, Hasher};
16use std::sync::{Arc, Mutex, MutexGuard};
17use std::time::SystemTime;
18
19/// Create a new basic processor
20pub fn basic(
21    aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
22    export_selector: Box<dyn ExportKindFor + Send + Sync>,
23    memory: bool,
24) -> BasicProcessor {
25    BasicProcessor {
26        aggregator_selector,
27        export_selector,
28        state: Mutex::new(BasicProcessorState::with_memory(memory)),
29    }
30}
31
32/// Basic metric integration strategy
33#[derive(Debug)]
34pub struct BasicProcessor {
35    aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
36    export_selector: Box<dyn ExportKindFor + Send + Sync>,
37    state: Mutex<BasicProcessorState>,
38}
39
40impl BasicProcessor {
41    /// Lock this processor to return a mutable locked processor
42    pub fn lock(&self) -> Result<BasicLockedProcessor<'_>> {
43        self.state
44            .lock()
45            .map_err(From::from)
46            .map(|locked| BasicLockedProcessor {
47                parent: self,
48                state: locked,
49            })
50    }
51}
52
53impl Processor for BasicProcessor {
54    fn aggregation_selector(&self) -> &dyn AggregatorSelector {
55        self.aggregator_selector.as_ref()
56    }
57}
58
59/// A locked representation of the processor used where mutable references are necessary.
60#[derive(Debug)]
61pub struct BasicLockedProcessor<'a> {
62    parent: &'a BasicProcessor,
63    state: MutexGuard<'a, BasicProcessorState>,
64}
65
66impl<'a> LockedProcessor for BasicLockedProcessor<'a> {
67    fn process(&mut self, accumulation: Accumulation<'_>) -> Result<()> {
68        if self.state.started_collection != self.state.finished_collection.wrapping_add(1) {
69            return Err(MetricsError::InconsistentState);
70        }
71
72        let desc = accumulation.descriptor();
73        let mut hasher = FnvHasher::default();
74        desc.attribute_hash().hash(&mut hasher);
75        hash_attributes(&mut hasher, accumulation.attributes().into_iter());
76        hash_attributes(&mut hasher, accumulation.resource().into_iter());
77        let key = StateKey(hasher.finish());
78        let agg = accumulation.aggregator();
79        let finished_collection = self.state.finished_collection;
80        if let Some(value) = self.state.values.get_mut(&key) {
81            // Advance the update sequence number.
82            let same_collection = finished_collection == value.updated;
83            value.updated = finished_collection;
84
85            // At this point in the code, we have located an existing
86            // value for some stateKey.  This can be because:
87            //
88            // (a) stateful aggregation is being used, the entry was
89            // entered during a prior collection, and this is the first
90            // time processing an accumulation for this stateKey in the
91            // current collection.  Since this is the first time
92            // processing an accumulation for this stateKey during this
93            // collection, we don't know yet whether there are multiple
94            // accumulators at work.  If there are multiple accumulators,
95            // they'll hit case (b) the second time through.
96            //
97            // (b) multiple accumulators are being used, whether stateful
98            // or not.
99            //
100            // Case (a) occurs when the instrument and the exporter
101            // require memory to work correctly, either because the
102            // instrument reports a PrecomputedSum to a DeltaExporter or
103            // the reverse, a non-PrecomputedSum instrument with a
104            // CumulativeExporter.  This logic is encapsulated in
105            // ExportKind.MemoryRequired(MetricKind).
106            //
107            // Case (b) occurs when the variable `sameCollection` is true,
108            // indicating that the stateKey for Accumulation has already
109            // been seen in the same collection.  When this happens, it
110            // implies that multiple Accumulators are being used, or that
111            // a single Accumulator has been configured with an attribute key
112            // filter.
113
114            if !same_collection {
115                if !value.current_owned {
116                    // This is the first Accumulation we've seen for this
117                    // stateKey during this collection.  Just keep a
118                    // reference to the Accumulator's Aggregator. All the other cases
119                    // copy Aggregator state.
120                    value.current = agg.clone();
121                    return Ok(());
122                }
123                return agg.synchronized_move(&value.current, desc);
124            }
125
126            // If the current is not owned, take ownership of a copy
127            // before merging below.
128            if !value.current_owned {
129                let tmp = value.current.clone();
130                if let Some(current) = self.parent.aggregation_selector().aggregator_for(desc) {
131                    value.current = current;
132                    value.current_owned = true;
133                    tmp.synchronized_move(&value.current, desc)?;
134                }
135            }
136
137            // Combine this `Accumulation` with the prior `Accumulation`.
138            return value.current.merge(agg.as_ref(), desc);
139        }
140
141        let stateful = self
142            .parent
143            .export_selector
144            .export_kind_for(desc)
145            .memory_required(desc.instrument_kind());
146
147        let mut delta = None;
148        let cumulative = if stateful {
149            if desc.instrument_kind().precomputed_sum() {
150                // If we know we need to compute deltas, allocate one.
151                delta = self.parent.aggregation_selector().aggregator_for(desc);
152            }
153            // Always allocate a cumulative aggregator if stateful
154            self.parent.aggregation_selector().aggregator_for(desc)
155        } else {
156            None
157        };
158
159        self.state.values.insert(
160            key,
161            StateValue {
162                descriptor: desc.clone(),
163                attributes: accumulation.attributes().clone(),
164                resource: accumulation.resource().clone(),
165                current_owned: false,
166                current: agg.clone(),
167                delta,
168                cumulative,
169                stateful,
170                updated: finished_collection,
171            },
172        );
173
174        Ok(())
175    }
176}
177
178impl Checkpointer for BasicLockedProcessor<'_> {
179    fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet {
180        &mut *self.state
181    }
182
183    fn start_collection(&mut self) {
184        if self.state.started_collection != 0 {
185            self.state.interval_start = self.state.interval_end;
186        }
187        self.state.started_collection = self.state.started_collection.wrapping_add(1);
188    }
189
190    fn finish_collection(&mut self) -> Result<()> {
191        self.state.interval_end = crate::time::now();
192        if self.state.started_collection != self.state.finished_collection.wrapping_add(1) {
193            return Err(MetricsError::InconsistentState);
194        }
195        let finished_collection = self.state.finished_collection;
196        self.state.finished_collection = self.state.finished_collection.wrapping_add(1);
197        let has_memory = self.state.config.memory;
198
199        let mut result = Ok(());
200
201        self.state.values.retain(|_key, value| {
202            // Return early if previous error
203            if result.is_err() {
204                return true;
205            }
206
207            let mkind = value.descriptor.instrument_kind();
208
209            let stale = value.updated != finished_collection;
210            let stateless = !value.stateful;
211
212            // The following branch updates stateful aggregators. Skip these updates
213            // if the aggregator is not stateful or if the aggregator is stale.
214            if stale || stateless {
215                // If this processor does not require memory, stale, stateless
216                // entries can be removed. This implies that they were not updated
217                // over the previous full collection interval.
218                if stale && stateless && !has_memory {
219                    return false;
220                }
221                return true;
222            }
223
224            // Update Aggregator state to support exporting either a
225            // delta or a cumulative aggregation.
226            if mkind.precomputed_sum() {
227                if let Some(current_subtractor) =
228                    value.current.as_any().downcast_ref::<SumAggregator>()
229                {
230                    // This line is equivalent to:
231                    // value.delta = currentSubtractor - value.cumulative
232                    if let (Some(cumulative), Some(delta)) =
233                        (value.cumulative.as_ref(), value.delta.as_ref())
234                    {
235                        result = current_subtractor
236                            .subtract(cumulative.as_ref(), delta.as_ref(), &value.descriptor)
237                            .and_then(|_| {
238                                value
239                                    .current
240                                    .synchronized_move(cumulative, &value.descriptor)
241                            });
242                    }
243                } else {
244                    result = Err(MetricsError::NoSubtraction);
245                }
246            } else {
247                // This line is equivalent to:
248                // value.cumulative = value.cumulative + value.delta
249                if let Some(cumulative) = value.cumulative.as_ref() {
250                    result = cumulative.merge(value.current.as_ref(), &value.descriptor)
251                }
252            }
253
254            true
255        });
256
257        result
258    }
259}
260
/// Configuration of a basic processor.
#[derive(Debug, Default)]
struct BasicProcessorConfig {
    /// Whether the processor remembers metric instruments and attribute sets
    /// that were reported earlier. When `true`,
    /// `CheckpointSet::try_for_each` also visits metrics that were not
    /// updated in the most recent interval.
    memory: bool,
}
269
270#[derive(Debug)]
271struct BasicProcessorState {
272    config: BasicProcessorConfig,
273    values: HashMap<StateKey, StateValue>,
274    // Note: the timestamp logic currently assumes all exports are deltas.
275    process_start: SystemTime,
276    interval_start: SystemTime,
277    interval_end: SystemTime,
278    started_collection: u64,
279    finished_collection: u64,
280}
281
282impl BasicProcessorState {
283    fn with_memory(memory: bool) -> Self {
284        let mut state = BasicProcessorState::default();
285        state.config.memory = memory;
286        state
287    }
288}
289
290impl Default for BasicProcessorState {
291    fn default() -> Self {
292        BasicProcessorState {
293            config: BasicProcessorConfig::default(),
294            values: HashMap::default(),
295            process_start: crate::time::now(),
296            interval_start: crate::time::now(),
297            interval_end: crate::time::now(),
298            started_collection: 0,
299            finished_collection: 0,
300        }
301    }
302}
303
304impl CheckpointSet for BasicProcessorState {
305    fn try_for_each(
306        &mut self,
307        exporter: &dyn ExportKindFor,
308        f: &mut dyn FnMut(&Record<'_>) -> Result<()>,
309    ) -> Result<()> {
310        if self.started_collection != self.finished_collection {
311            return Err(MetricsError::InconsistentState);
312        }
313
314        self.values.iter().try_for_each(|(_key, value)| {
315            let instrument_kind = value.descriptor.instrument_kind();
316
317            let agg;
318            let start;
319
320            // If the processor does not have memory and it was not updated in the
321            // prior round, do not visit this value.
322            if !self.config.memory && value.updated != self.finished_collection.wrapping_sub(1) {
323                return Ok(());
324            }
325
326            match exporter.export_kind_for(&value.descriptor) {
327                ExportKind::Cumulative => {
328                    // If stateful, the sum has been computed.  If stateless, the
329                    // input was already cumulative. Either way, use the
330                    // checkpointed value:
331                    if value.stateful {
332                        agg = value.cumulative.as_ref();
333                    } else {
334                        agg = Some(&value.current);
335                    }
336
337                    start = self.process_start;
338                }
339
340                ExportKind::Delta => {
341                    // Precomputed sums are a special case.
342                    if instrument_kind.precomputed_sum() {
343                        agg = value.delta.as_ref();
344                    } else {
345                        agg = Some(&value.current);
346                    }
347
348                    start = self.interval_start;
349                }
350            }
351
352            let res = f(&metrics::record(
353                &value.descriptor,
354                &value.attributes,
355                &value.resource,
356                agg,
357                start,
358                self.interval_end,
359            ));
360            if let Err(MetricsError::NoDataCollected) = res {
361                Ok(())
362            } else {
363                res
364            }
365        })
366    }
367}
368
/// Key of an entry in the processor's value map: the 64-bit hash computed in
/// `process` from the descriptor's attribute hash, the accumulation's
/// attributes and its resource.
///
/// NOTE(review): only the hash is stored, so two distinct streams whose
/// hashes collide would share one entry.
#[derive(Debug, PartialEq, Eq, Hash)]
struct StateKey(u64);
371
372#[derive(Debug)]
373struct StateValue {
374    /// Instrument descriptor
375    descriptor: Descriptor,
376
377    /// Instrument attributes
378    attributes: AttributeSet,
379
380    /// Resource that created the instrument
381    resource: Resource,
382
383    /// Indicates the last sequence number when this value had process called by an
384    /// accumulator.
385    updated: u64,
386
387    /// Indicates that a cumulative aggregation is being maintained, taken from the
388    /// process start time.
389    stateful: bool,
390
391    /// Indicates that "current" was allocated
392    /// by the processor in order to merge results from
393    /// multiple `Accumulator`s during a single collection
394    /// round, which may happen either because:
395    ///
396    /// (1) multiple `Accumulator`s output the same `Accumulation.
397    /// (2) one `Accumulator` is configured with dimensionality reduction.
398    current_owned: bool,
399
400    /// The output from a single `Accumulator` (if !current_owned) or an
401    /// `Aggregator` owned by the processor used to accumulate multiple values in a
402    /// single collection round.
403    current: Arc<dyn Aggregator + Send + Sync>,
404
405    /// If `Some`, refers to an `Aggregator` owned by the processor used to compute
406    /// deltas between precomputed sums.
407    delta: Option<Arc<dyn Aggregator + Send + Sync>>,
408
409    /// If `Some`, refers to an `Aggregator` owned by the processor used to store
410    /// the last cumulative value.
411    cumulative: Option<Arc<dyn Aggregator + Send + Sync>>,
412}