opentelemetry_spanprocessor_any/sdk/metrics/mod.rs

//! # OpenTelemetry Metrics SDK
use crate::global;
use crate::metrics::{
    sdk_api::{self, InstrumentCore as _, SyncBoundInstrumentCore as _},
    AsyncRunner, AtomicNumber, Descriptor, Measurement, Number, NumberKind, Observation, Result,
};
use crate::sdk::{
    export::{
        self,
        metrics::{Aggregator, LockedProcessor, Processor},
    },
    resource::Resource,
};
use crate::{
    attributes::{hash_attributes, AttributeSet},
    Context, KeyValue,
};
use fnv::FnvHasher;
use std::any::Any;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

pub mod aggregators;
pub mod controllers;
pub mod processors;
pub mod selectors;

use crate::sdk::resource::SdkProvidedResourceDetector;
pub use controllers::{PullController, PushController, PushControllerWorker};
use std::time::Duration;

/// Creates a new accumulator builder
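///
/// # Example
///
/// A minimal sketch, mirroring the tests at the bottom of this module; the
/// `NoopProcessor` test helper stands in for a real processor implementation,
/// and the resource override is optional:
///
/// ```ignore
/// use std::sync::Arc;
///
/// // Bind the accumulator to a processor; `with_resource` overrides the
/// // SDK-provided default resource (which supplies `service.name`).
/// let accumulator = accumulator(Arc::new(NoopProcessor))
///     .with_resource(Resource::new(vec![KeyValue::new(
///         "service.name",
///         "my_service",
///     )]))
///     .build();
/// ```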
pub fn accumulator(processor: Arc<dyn Processor + Send + Sync>) -> AccumulatorBuilder {
    AccumulatorBuilder {
        processor,
        resource: None,
    }
}

/// Configuration for an accumulator
#[derive(Debug)]
pub struct AccumulatorBuilder {
    processor: Arc<dyn Processor + Send + Sync>,
    resource: Option<Resource>,
}

impl AccumulatorBuilder {
    /// Sets the resource that will be applied to all records in this accumulator.
    pub fn with_resource(self, resource: Resource) -> Self {
        AccumulatorBuilder {
            resource: Some(resource),
            ..self
        }
    }

    /// Create a new accumulator from this configuration
    pub fn build(self) -> Accumulator {
        let sdk_provided_resource = Resource::from_detectors(
            Duration::from_secs(0),
            vec![Box::new(SdkProvidedResourceDetector)],
        );
        let resource = self.resource.unwrap_or(sdk_provided_resource);
        Accumulator(Arc::new(AccumulatorCore::new(self.processor, resource)))
    }
}

/// Accumulator implements the OpenTelemetry Meter API. The Accumulator is bound
/// to a single `Processor`.
///
/// The Accumulator supports a collect API to gather and export current data.
/// `Collect` should be arranged according to the processor model: push-based
/// processors set up a timer to call `collect` periodically, while pull-based
/// processors call `collect` when a pull request arrives.
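///
/// # Example
///
/// A minimal sketch of the pull-based flow, mirroring `test_debug_message` in
/// this module's tests; the final `add` call is an assumption based on the
/// crate's synchronous counter API rather than something exercised by that test:
///
/// ```ignore
/// // The pull controller owns an `Accumulator` internally and calls `collect`
/// // when a pull request arrives.
/// let controller = pull(
///     Box::new(Selector::Exact),
///     Box::new(ExportKindSelector::Delta),
/// )
/// .build();
///
/// // Instruments obtained through the controller's provider record into the
/// // underlying accumulator.
/// let meter = controller.provider().meter("example", None);
/// let counter = meter.f64_counter("example.count").init();
/// counter.add(1.0, &[]);
/// ```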
#[derive(Debug, Clone)]
pub struct Accumulator(Arc<AccumulatorCore>);

#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
struct MapKey {
    instrument_hash: u64,
}

type AsyncRunnerPair = (AsyncRunner, Option<Arc<dyn sdk_api::AsyncInstrumentCore>>);

#[derive(Default, Debug)]
struct AsyncInstrumentState {
    /// The set of runners, in registration order, that are run on each
    /// collection interval.
    ///
    /// Non-batch observers are registered with an instrument; batch observers
    /// are registered without one. Each runner is called once per collection,
    /// allowing both batch and individual observations to be collected.
    runners: Vec<AsyncRunnerPair>,

    /// The set of instruments in the order they were registered.
    instruments: Vec<Arc<dyn sdk_api::AsyncInstrumentCore>>,
}

fn collect_async(attributes: &[KeyValue], observations: &[Observation]) {
    let attributes = AttributeSet::from_attributes(attributes.iter().cloned());

    for observation in observations {
        if let Some(instrument) = observation
            .instrument()
            .as_any()
            .downcast_ref::<AsyncInstrument>()
        {
            instrument.observe(observation.number(), &attributes)
        }
    }
}

impl AsyncInstrumentState {
    /// Executes the complete set of observer callbacks.
    fn run(&self) {
        for (runner, instrument) in self.runners.iter() {
            runner.run(instrument, collect_async)
        }
    }
}

#[derive(Debug)]
struct AccumulatorCore {
    /// A concurrent map of current sync instrument state.
    current: dashmap::DashMap<MapKey, Arc<Record>>,
    /// A collection of async instrument state
    async_instruments: Mutex<AsyncInstrumentState>,

    /// The current epoch number. It is incremented in `collect`.
    current_epoch: AtomicNumber,
    /// The configured processor.
    processor: Arc<dyn Processor + Send + Sync>,
    /// The resource applied to all records in this Accumulator.
    resource: Resource,
}

impl AccumulatorCore {
    fn new(processor: Arc<dyn Processor + Send + Sync>, resource: Resource) -> Self {
        AccumulatorCore {
            current: dashmap::DashMap::new(),
            async_instruments: Mutex::new(AsyncInstrumentState::default()),
            current_epoch: NumberKind::U64.zero().to_atomic(),
            processor,
            resource,
        }
    }

    fn register(
        &self,
        instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
        runner: Option<AsyncRunner>,
    ) -> Result<()> {
        self.async_instruments
            .lock()
            .map_err(Into::into)
            .map(|mut async_instruments| {
                if let Some(runner) = runner {
                    async_instruments
                        .runners
                        .push((runner, Some(instrument.clone())));
                };
                async_instruments.instruments.push(instrument);
            })
    }

    fn register_runner(&self, runner: AsyncRunner) -> Result<()> {
        self.async_instruments
            .lock()
            .map_err(Into::into)
            .map(|mut async_instruments| async_instruments.runners.push((runner, None)))
    }

    fn collect(&self, locked_processor: &mut dyn LockedProcessor) -> usize {
        let mut checkpointed = self.observe_async_instruments(locked_processor);
        checkpointed += self.collect_sync_instruments(locked_processor);
        self.current_epoch.fetch_add(&NumberKind::U64, &1u64.into());

        checkpointed
    }

    fn observe_async_instruments(&self, locked_processor: &mut dyn LockedProcessor) -> usize {
        self.async_instruments
            .lock()
            .map_or(0, |async_instruments| {
                let mut async_collected = 0;

                async_instruments.run();

                for instrument in &async_instruments.instruments {
                    if let Some(a) = instrument.as_any().downcast_ref::<AsyncInstrument>() {
                        async_collected += self.checkpoint_async(a, locked_processor);
                    }
                }

                async_collected
            })
    }

    fn collect_sync_instruments(&self, locked_processor: &mut dyn LockedProcessor) -> usize {
        let mut checkpointed = 0;

        self.current.retain(|_key, value| {
            let mods = &value.update_count.load();
            let coll = &value.collected_count.load();

            if mods.partial_cmp(&NumberKind::U64, coll) != Some(Ordering::Equal) {
                // Updates happened in this interval,
                // checkpoint and continue.
                checkpointed += self.checkpoint_record(value, locked_processor);
                value.collected_count.store(mods);
            } else {
                // No updates since the last collection; try to remove the
                // record if there are no bound handles.
                if Arc::strong_count(value) == 1 {
                    // There's a potential race between loading collected count and
                    // loading the strong count in this function.  Since this is the
                    // last we'll see of this record, checkpoint.
                    if mods.partial_cmp(&NumberKind::U64, coll) != Some(Ordering::Equal) {
                        checkpointed += self.checkpoint_record(value, locked_processor);
                    }
                    return false;
                }
            };
            true
        });

        checkpointed
    }

    fn checkpoint_record(
        &self,
        record: &Record,
        locked_processor: &mut dyn LockedProcessor,
    ) -> usize {
        if let (Some(current), Some(checkpoint)) = (&record.current, &record.checkpoint) {
            if let Err(err) = current.synchronized_move(checkpoint, record.instrument.descriptor())
            {
                global::handle_error(err);

                return 0;
            }

            let accumulation = export::metrics::accumulation(
                record.instrument.descriptor(),
                &record.attributes,
                &self.resource,
                checkpoint,
            );
            if let Err(err) = locked_processor.process(accumulation) {
                global::handle_error(err);
            }

            1
        } else {
            0
        }
    }

    fn checkpoint_async(
        &self,
        instrument: &AsyncInstrument,
        locked_processor: &mut dyn LockedProcessor,
    ) -> usize {
        instrument.recorders.lock().map_or(0, |mut recorders| {
            let mut checkpointed = 0;
            match recorders.as_mut() {
                None => return checkpointed,
                Some(recorders) => {
                    recorders.retain(|_key, attribute_recorder| {
                        let epoch_diff = self.current_epoch.load().partial_cmp(
                            &NumberKind::U64,
                            &attribute_recorder.observed_epoch.into(),
                        );
                        if epoch_diff == Some(Ordering::Equal) {
                            if let Some(observed) = &attribute_recorder.observed {
                                let accumulation = export::metrics::accumulation(
                                    instrument.descriptor(),
                                    &attribute_recorder.attributes,
                                    &self.resource,
                                    observed,
                                );

                                if let Err(err) = locked_processor.process(accumulation) {
                                    global::handle_error(err);
                                }
                                checkpointed += 1;
                            }
                        }

                        // Retain if this is not the second collection cycle with
                        // no observations for this AttributeSet.
                        epoch_diff == Some(Ordering::Greater)
                    });
                }
            }
            if recorders.as_ref().map_or(false, |map| map.is_empty()) {
                *recorders = None;
            }

            checkpointed
        })
    }
}

#[derive(Debug, Clone)]
struct SyncInstrument {
    instrument: Arc<Instrument>,
}

impl SyncInstrument {
    fn acquire_handle(&self, attributes: &[KeyValue]) -> Arc<Record> {
        let mut hasher = FnvHasher::default();
        self.instrument
            .descriptor
            .attribute_hash()
            .hash(&mut hasher);

        hash_attributes(
            &mut hasher,
            attributes.iter().map(|kv| (&kv.key, &kv.value)),
        );

        let map_key = MapKey {
            instrument_hash: hasher.finish(),
        };
        let current = &self.instrument.meter.0.current;
        if let Some(existing_record) = current.get(&map_key) {
            return existing_record.value().clone();
        }

        let record = Arc::new(Record {
            update_count: NumberKind::U64.zero().to_atomic(),
            collected_count: NumberKind::U64.zero().to_atomic(),
            attributes: AttributeSet::from_attributes(attributes.iter().cloned()),
            instrument: self.clone(),
            current: self
                .instrument
                .meter
                .0
                .processor
                .aggregation_selector()
                .aggregator_for(&self.instrument.descriptor),
            checkpoint: self
                .instrument
                .meter
                .0
                .processor
                .aggregation_selector()
                .aggregator_for(&self.instrument.descriptor),
        });
        current.insert(map_key, record.clone());

        record
    }
}

impl sdk_api::InstrumentCore for SyncInstrument {
    fn descriptor(&self) -> &Descriptor {
        self.instrument.descriptor()
    }
}

impl sdk_api::SyncInstrumentCore for SyncInstrument {
    fn bind(&self, attributes: &'_ [KeyValue]) -> Arc<dyn sdk_api::SyncBoundInstrumentCore> {
        self.acquire_handle(attributes)
    }
    fn record_one(&self, number: Number, attributes: &'_ [KeyValue]) {
        let handle = self.acquire_handle(attributes);
        handle.record_one(number)
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

#[derive(Debug)]
struct AttributedRecorder {
    observed_epoch: u64,
    attributes: AttributeSet,
    observed: Option<Arc<dyn Aggregator + Send + Sync>>,
}

#[derive(Debug, Clone)]
struct AsyncInstrument {
    instrument: Arc<Instrument>,
    recorders: Arc<Mutex<Option<HashMap<u64, AttributedRecorder>>>>,
}

impl AsyncInstrument {
    fn observe(&self, number: &Number, attributes: &AttributeSet) {
        if let Err(err) = aggregators::range_test(number, &self.instrument.descriptor) {
            global::handle_error(err);
        }
        if let Some(recorder) = self.get_recorder(attributes) {
            if let Err(err) = recorder.update(number, &self.instrument.descriptor) {
                global::handle_error(err)
            }
        }
    }

    fn get_recorder(&self, attributes: &AttributeSet) -> Option<Arc<dyn Aggregator + Send + Sync>> {
        self.recorders.lock().map_or(None, |mut recorders| {
            let mut hasher = FnvHasher::default();
            hash_attributes(&mut hasher, attributes.into_iter());
            let attribute_hash = hasher.finish();
            if let Some(recorder) = recorders
                .as_mut()
                .and_then(|rec| rec.get_mut(&attribute_hash))
            {
                let current_epoch = self
                    .instrument
                    .meter
                    .0
                    .current_epoch
                    .load()
                    .to_u64(&NumberKind::U64);
                if recorder.observed_epoch == current_epoch {
                    // last value wins for Observers, so if we see the same attributes
                    // in the current epoch, we replace the old recorder
                    return self
                        .instrument
                        .meter
                        .0
                        .processor
                        .aggregation_selector()
                        .aggregator_for(&self.instrument.descriptor);
                } else {
                    recorder.observed_epoch = current_epoch;
                }
                return recorder.observed.clone();
            }

            let recorder = self
                .instrument
                .meter
                .0
                .processor
                .aggregation_selector()
                .aggregator_for(&self.instrument.descriptor);
            if recorders.is_none() {
                *recorders = Some(HashMap::new());
            }
            // This may store a recorder with no aggregator in the map, thus disabling the
            // async_instrument for the AttributeSet for good. This is intentional, but will be
            // revisited later.
            let observed_epoch = self
                .instrument
                .meter
                .0
                .current_epoch
                .load()
                .to_u64(&NumberKind::U64);
            recorders.as_mut().unwrap().insert(
                attribute_hash,
                AttributedRecorder {
                    observed: recorder.clone(),
                    attributes: attributes.clone(),
                    observed_epoch,
                },
            );

            recorder
        })
    }
}

impl sdk_api::InstrumentCore for AsyncInstrument {
    fn descriptor(&self) -> &Descriptor {
        self.instrument.descriptor()
    }
}

impl sdk_api::AsyncInstrumentCore for AsyncInstrument {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// A `Record` maintains the state of one metric instrument. Due to the use of
/// lock-free algorithms, there may be more than one `Record` in existence at a
/// time, although at most one can be referenced from the `Accumulator.current`
/// map.
#[derive(Debug)]
struct Record {
    /// Incremented on every call to `update`.
    update_count: AtomicNumber,

    /// Set to `update_count` on collection, supports checking for no updates during
    /// a round.
    collected_count: AtomicNumber,

    /// The processed attribute set for this record.
    ///
    /// TODO: look at perf here.
    attributes: AttributeSet,

    /// The corresponding instrument.
    instrument: SyncInstrument,

    /// `current` implements the actual `record_one` API, depending on the type
    /// of aggregation. If `None`, the metric was disabled by the aggregator selector.
    current: Option<Arc<dyn Aggregator + Send + Sync>>,
    checkpoint: Option<Arc<dyn Aggregator + Send + Sync>>,
}

impl sdk_api::SyncBoundInstrumentCore for Record {
    fn record_one<'a>(&self, number: Number) {
        // check if the instrument is disabled according to the AggregatorSelector.
        if let Some(recorder) = &self.current {
            if let Err(err) =
                aggregators::range_test(&number, &self.instrument.instrument.descriptor)
                    .and_then(|_| recorder.update(&number, &self.instrument.instrument.descriptor))
            {
                global::handle_error(err);
                return;
            }

            // Record was modified, inform the collect() that things need
            // to be collected while the record is still mapped.
            self.update_count.fetch_add(&NumberKind::U64, &1u64.into());
        }
    }
}

struct Instrument {
    descriptor: Descriptor,
    meter: Accumulator,
}

impl std::fmt::Debug for Instrument {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Instrument")
            .field("descriptor", &self.descriptor)
            .field("meter", &"Accumulator")
            .finish()
    }
}

impl sdk_api::InstrumentCore for Instrument {
    fn descriptor(&self) -> &Descriptor {
        &self.descriptor
    }
}

impl sdk_api::MeterCore for Accumulator {
    fn new_sync_instrument(
        &self,
        descriptor: Descriptor,
    ) -> Result<Arc<dyn sdk_api::SyncInstrumentCore>> {
        Ok(Arc::new(SyncInstrument {
            instrument: Arc::new(Instrument {
                descriptor,
                meter: self.clone(),
            }),
        }))
    }

    fn record_batch_with_context(
        &self,
        _cx: &Context,
        attributes: &[KeyValue],
        measurements: Vec<Measurement>,
    ) {
        for measure in measurements.into_iter() {
            if let Some(instrument) = measure
                .instrument()
                .as_any()
                .downcast_ref::<SyncInstrument>()
            {
                let handle = instrument.acquire_handle(attributes);

                handle.record_one(measure.into_number());
            }
        }
    }

    fn new_async_instrument(
        &self,
        descriptor: Descriptor,
        runner: Option<AsyncRunner>,
    ) -> Result<Arc<dyn sdk_api::AsyncInstrumentCore>> {
        let instrument = Arc::new(AsyncInstrument {
            instrument: Arc::new(Instrument {
                descriptor,
                meter: self.clone(),
            }),
            recorders: Arc::new(Mutex::new(None)),
        });

        self.0.register(instrument.clone(), runner)?;

        Ok(instrument)
    }

    fn new_batch_observer(&self, runner: AsyncRunner) -> Result<()> {
        self.0.register_runner(runner)
    }
}

#[cfg(test)]
mod tests {
    use crate::metrics::MeterProvider;
    use crate::sdk::export::metrics::ExportKindSelector;
    use crate::sdk::metrics::accumulator;
    use crate::sdk::metrics::controllers::pull;
    use crate::sdk::metrics::selectors::simple::Selector;
    use crate::sdk::Resource;
    use crate::testing::metric::NoopProcessor;
    use crate::{Key, KeyValue};
    use std::sync::Arc;

    // Ensure the `Debug` output does not recurse into an infinite loop.
    #[test]
    fn test_debug_message() {
        let controller = pull(
            Box::new(Selector::Exact),
            Box::new(ExportKindSelector::Delta),
        )
        .build();
        let meter = controller.provider().meter("test", None);
        let counter = meter.f64_counter("test").init();
        println!("{:?}, {:?}, {:?}", controller, meter, counter);
    }

    #[test]
    fn test_sdk_provided_resource_in_accumulator() {
        let default_service_name = accumulator(Arc::new(NoopProcessor)).build();
        assert_eq!(
            default_service_name
                .0
                .resource
                .get(Key::from_static_str("service.name"))
                .map(|v| v.to_string()),
            Some("unknown_service".to_string())
        );

        let custom_service_name = accumulator(Arc::new(NoopProcessor))
            .with_resource(Resource::new(vec![KeyValue::new(
                "service.name",
                "test_service",
            )]))
            .build();
        assert_eq!(
            custom_service_name
                .0
                .resource
                .get(Key::from_static_str("service.name"))
                .map(|v| v.to_string()),
            Some("test_service".to_string())
        );

        let no_service_name = accumulator(Arc::new(NoopProcessor))
            .with_resource(Resource::empty())
            .build();

        assert_eq!(
            no_service_name
                .0
                .resource
                .get(Key::from_static_str("service.name"))
                .map(|v| v.to_string()),
            None
        )
    }
}