1use crate::global;
3use crate::metrics::{
4 sdk_api::{self, InstrumentCore as _, SyncBoundInstrumentCore as _},
5 AsyncRunner, AtomicNumber, Descriptor, Measurement, Number, NumberKind, Observation, Result,
6};
7use crate::sdk::{
8 export::{
9 self,
10 metrics::{Aggregator, LockedProcessor, Processor},
11 },
12 resource::Resource,
13};
14use crate::{
15 attributes::{hash_attributes, AttributeSet},
16 Context, KeyValue,
17};
18use fnv::FnvHasher;
19use std::any::Any;
20use std::cmp::Ordering;
21use std::collections::HashMap;
22use std::hash::{Hash, Hasher};
23use std::sync::{Arc, Mutex};
24
25pub mod aggregators;
26pub mod controllers;
27pub mod processors;
28pub mod selectors;
29
30use crate::sdk::resource::SdkProvidedResourceDetector;
31pub use controllers::{PullController, PushController, PushControllerWorker};
32use std::time::Duration;
33
34pub fn accumulator(processor: Arc<dyn Processor + Send + Sync>) -> AccumulatorBuilder {
36 AccumulatorBuilder {
37 processor,
38 resource: None,
39 }
40}
41
/// Configuration collected before an [`Accumulator`] is constructed.
#[derive(Debug)]
pub struct AccumulatorBuilder {
    /// Processor handed to the accumulator when it is built.
    processor: Arc<dyn Processor + Send + Sync>,
    /// Explicitly configured resource, or `None` when none has been set.
    resource: Option<Resource>,
}
48
49impl AccumulatorBuilder {
50 pub fn with_resource(self, resource: Resource) -> Self {
52 AccumulatorBuilder {
53 resource: Some(resource),
54 ..self
55 }
56 }
57
58 pub fn build(self) -> Accumulator {
60 let sdk_provided_resource = Resource::from_detectors(
61 Duration::from_secs(0),
62 vec![Box::new(SdkProvidedResourceDetector)],
63 );
64 let resource = self.resource.unwrap_or(sdk_provided_resource);
65 Accumulator(Arc::new(AccumulatorCore::new(self.processor, resource)))
66 }
67}
68
/// Handle to the shared accumulator state.
///
/// Cloning only clones the inner `Arc`, so all clones refer to the same
/// `AccumulatorCore`.
#[derive(Debug, Clone)]
pub struct Accumulator(Arc<AccumulatorCore>);
78
/// Key of the map that holds the records of synchronous instruments.
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
struct MapKey {
    /// Combined hash of the instrument descriptor's attribute hash and the
    /// attributes of the measurement.
    instrument_hash: u64,
}
83
/// A runner together with the instrument it was registered with, if any.
/// Runners registered on their own carry `None`.
type AsyncRunnerPair = (AsyncRunner, Option<Arc<dyn sdk_api::AsyncInstrumentCore>>);
85
/// Registered asynchronous runners and instruments.
#[derive(Default, Debug)]
struct AsyncInstrumentState {
    /// Runners, kept in the order in which they were pushed.
    runners: Vec<AsyncRunnerPair>,

    /// Asynchronous instruments, kept in the order in which they were pushed.
    instruments: Vec<Arc<dyn sdk_api::AsyncInstrumentCore>>,
}
99
100fn collect_async(attributes: &[KeyValue], observations: &[Observation]) {
101 let attributes = AttributeSet::from_attributes(attributes.iter().cloned());
102
103 for observation in observations {
104 if let Some(instrument) = observation
105 .instrument()
106 .as_any()
107 .downcast_ref::<AsyncInstrument>()
108 {
109 instrument.observe(observation.number(), &attributes)
110 }
111 }
112}
113
114impl AsyncInstrumentState {
115 fn run(&self) {
117 for (runner, instrument) in self.runners.iter() {
118 runner.run(instrument, collect_async)
119 }
120 }
121}
122
/// Shared state behind an `Accumulator`.
#[derive(Debug)]
struct AccumulatorCore {
    /// Concurrent map holding the records of synchronous instruments.
    current: dashmap::DashMap<MapKey, Arc<Record>>,
    /// Registered asynchronous runners and instruments, guarded by a mutex.
    async_instruments: Mutex<AsyncInstrumentState>,

    /// Epoch counter; incremented by one at the end of every `collect`.
    current_epoch: AtomicNumber,
    /// Processor whose aggregation selector supplies the aggregators.
    processor: Arc<dyn Processor + Send + Sync>,
    /// Resource attached to every accumulation produced by this accumulator.
    resource: Resource,
}
137
138impl AccumulatorCore {
139 fn new(processor: Arc<dyn Processor + Send + Sync>, resource: Resource) -> Self {
140 AccumulatorCore {
141 current: dashmap::DashMap::new(),
142 async_instruments: Mutex::new(AsyncInstrumentState::default()),
143 current_epoch: NumberKind::U64.zero().to_atomic(),
144 processor,
145 resource,
146 }
147 }
148
149 fn register(
150 &self,
151 instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
152 runner: Option<AsyncRunner>,
153 ) -> Result<()> {
154 self.async_instruments
155 .lock()
156 .map_err(Into::into)
157 .map(|mut async_instruments| {
158 if let Some(runner) = runner {
159 async_instruments
160 .runners
161 .push((runner, Some(instrument.clone())));
162 };
163 async_instruments.instruments.push(instrument);
164 })
165 }
166
167 fn register_runner(&self, runner: AsyncRunner) -> Result<()> {
168 self.async_instruments
169 .lock()
170 .map_err(Into::into)
171 .map(|mut async_instruments| async_instruments.runners.push((runner, None)))
172 }
173
174 fn collect(&self, locked_processor: &mut dyn LockedProcessor) -> usize {
175 let mut checkpointed = self.observe_async_instruments(locked_processor);
176 checkpointed += self.collect_sync_instruments(locked_processor);
177 self.current_epoch.fetch_add(&NumberKind::U64, &1u64.into());
178
179 checkpointed
180 }
181
182 fn observe_async_instruments(&self, locked_processor: &mut dyn LockedProcessor) -> usize {
183 self.async_instruments
184 .lock()
185 .map_or(0, |async_instruments| {
186 let mut async_collected = 0;
187
188 async_instruments.run();
189
190 for instrument in &async_instruments.instruments {
191 if let Some(a) = instrument.as_any().downcast_ref::<AsyncInstrument>() {
192 async_collected += self.checkpoint_async(a, locked_processor);
193 }
194 }
195
196 async_collected
197 })
198 }
199
200 fn collect_sync_instruments(&self, locked_processor: &mut dyn LockedProcessor) -> usize {
201 let mut checkpointed = 0;
202
203 self.current.retain(|_key, value| {
204 let mods = &value.update_count.load();
205 let coll = &value.collected_count.load();
206
207 if mods.partial_cmp(&NumberKind::U64, coll) != Some(Ordering::Equal) {
208 checkpointed += self.checkpoint_record(value, locked_processor);
211 value.collected_count.store(mods);
212 } else {
213 if Arc::strong_count(value) == 1 {
216 if mods.partial_cmp(&NumberKind::U64, coll) != Some(Ordering::Equal) {
220 checkpointed += self.checkpoint_record(value, locked_processor);
221 }
222 return false;
223 }
224 };
225 true
226 });
227
228 checkpointed
229 }
230
231 fn checkpoint_record(
232 &self,
233 record: &Record,
234 locked_processor: &mut dyn LockedProcessor,
235 ) -> usize {
236 if let (Some(current), Some(checkpoint)) = (&record.current, &record.checkpoint) {
237 if let Err(err) = current.synchronized_move(checkpoint, record.instrument.descriptor())
238 {
239 global::handle_error(err);
240
241 return 0;
242 }
243
244 let accumulation = export::metrics::accumulation(
245 record.instrument.descriptor(),
246 &record.attributes,
247 &self.resource,
248 checkpoint,
249 );
250 if let Err(err) = locked_processor.process(accumulation) {
251 global::handle_error(err);
252 }
253
254 1
255 } else {
256 0
257 }
258 }
259
260 fn checkpoint_async(
261 &self,
262 instrument: &AsyncInstrument,
263 locked_processor: &mut dyn LockedProcessor,
264 ) -> usize {
265 instrument.recorders.lock().map_or(0, |mut recorders| {
266 let mut checkpointed = 0;
267 match recorders.as_mut() {
268 None => return checkpointed,
269 Some(recorders) => {
270 recorders.retain(|_key, attribute_recorder| {
271 let epoch_diff = self.current_epoch.load().partial_cmp(
272 &NumberKind::U64,
273 &attribute_recorder.observed_epoch.into(),
274 );
275 if epoch_diff == Some(Ordering::Equal) {
276 if let Some(observed) = &attribute_recorder.observed {
277 let accumulation = export::metrics::accumulation(
278 instrument.descriptor(),
279 &attribute_recorder.attributes,
280 &self.resource,
281 observed,
282 );
283
284 if let Err(err) = locked_processor.process(accumulation) {
285 global::handle_error(err);
286 }
287 checkpointed += 1;
288 }
289 }
290
291 epoch_diff == Some(Ordering::Greater)
294 });
295 }
296 }
297 if recorders.as_ref().map_or(false, |map| map.is_empty()) {
298 *recorders = None;
299 }
300
301 checkpointed
302 })
303 }
304}
305
/// Synchronous instrument; cloning shares the underlying `Instrument`.
#[derive(Debug, Clone)]
struct SyncInstrument {
    /// Descriptor and owning accumulator of this instrument.
    instrument: Arc<Instrument>,
}
310
311impl SyncInstrument {
312 fn acquire_handle(&self, attributes: &[KeyValue]) -> Arc<Record> {
313 let mut hasher = FnvHasher::default();
314 self.instrument
315 .descriptor
316 .attribute_hash()
317 .hash(&mut hasher);
318
319 hash_attributes(
320 &mut hasher,
321 attributes.iter().map(|kv| (&kv.key, &kv.value)),
322 );
323
324 let map_key = MapKey {
325 instrument_hash: hasher.finish(),
326 };
327 let current = &self.instrument.meter.0.current;
328 if let Some(existing_record) = current.get(&map_key) {
329 return existing_record.value().clone();
330 }
331
332 let record = Arc::new(Record {
333 update_count: NumberKind::U64.zero().to_atomic(),
334 collected_count: NumberKind::U64.zero().to_atomic(),
335 attributes: AttributeSet::from_attributes(attributes.iter().cloned()),
336 instrument: self.clone(),
337 current: self
338 .instrument
339 .meter
340 .0
341 .processor
342 .aggregation_selector()
343 .aggregator_for(&self.instrument.descriptor),
344 checkpoint: self
345 .instrument
346 .meter
347 .0
348 .processor
349 .aggregation_selector()
350 .aggregator_for(&self.instrument.descriptor),
351 });
352 current.insert(map_key, record.clone());
353
354 record
355 }
356}
357
358impl sdk_api::InstrumentCore for SyncInstrument {
359 fn descriptor(&self) -> &Descriptor {
360 self.instrument.descriptor()
361 }
362}
363
364impl sdk_api::SyncInstrumentCore for SyncInstrument {
365 fn bind(&self, attributes: &'_ [KeyValue]) -> Arc<dyn sdk_api::SyncBoundInstrumentCore> {
366 self.acquire_handle(attributes)
367 }
368 fn record_one(&self, number: Number, attributes: &'_ [KeyValue]) {
369 let handle = self.acquire_handle(attributes);
370 handle.record_one(number)
371 }
372 fn as_any(&self) -> &dyn Any {
373 self
374 }
375}
376
/// Aggregator state of an asynchronous instrument for one attribute set.
#[derive(Debug)]
struct AttributedRecorder {
    /// Epoch in which this recorder was created or last marked as observed.
    observed_epoch: u64,
    /// Attribute set the observations were reported with.
    attributes: AttributeSet,
    /// Aggregator obtained from the processor's aggregation selector, if any.
    observed: Option<Arc<dyn Aggregator + Send + Sync>>,
}
383
/// Asynchronous instrument; clones share the instrument and its recorders.
#[derive(Debug, Clone)]
struct AsyncInstrument {
    /// Descriptor and owning accumulator of this instrument.
    instrument: Arc<Instrument>,
    /// Recorders keyed by attribute hash; `None` while no recorder exists.
    recorders: Arc<Mutex<Option<HashMap<u64, AttributedRecorder>>>>,
}
389
390impl AsyncInstrument {
391 fn observe(&self, number: &Number, attributes: &AttributeSet) {
392 if let Err(err) = aggregators::range_test(number, &self.instrument.descriptor) {
393 global::handle_error(err);
394 }
395 if let Some(recorder) = self.get_recorder(attributes) {
396 if let Err(err) = recorder.update(number, &self.instrument.descriptor) {
397 global::handle_error(err)
398 }
399 }
400 }
401
402 fn get_recorder(&self, attributes: &AttributeSet) -> Option<Arc<dyn Aggregator + Send + Sync>> {
403 self.recorders.lock().map_or(None, |mut recorders| {
404 let mut hasher = FnvHasher::default();
405 hash_attributes(&mut hasher, attributes.into_iter());
406 let attribute_hash = hasher.finish();
407 if let Some(recorder) = recorders
408 .as_mut()
409 .and_then(|rec| rec.get_mut(&attribute_hash))
410 {
411 let current_epoch = self
412 .instrument
413 .meter
414 .0
415 .current_epoch
416 .load()
417 .to_u64(&NumberKind::U64);
418 if recorder.observed_epoch == current_epoch {
419 return self
422 .instrument
423 .meter
424 .0
425 .processor
426 .aggregation_selector()
427 .aggregator_for(&self.instrument.descriptor);
428 } else {
429 recorder.observed_epoch = current_epoch;
430 }
431 return recorder.observed.clone();
432 }
433
434 let recorder = self
435 .instrument
436 .meter
437 .0
438 .processor
439 .aggregation_selector()
440 .aggregator_for(&self.instrument.descriptor);
441 if recorders.is_none() {
442 *recorders = Some(HashMap::new());
443 }
444 let observed_epoch = self
448 .instrument
449 .meter
450 .0
451 .current_epoch
452 .load()
453 .to_u64(&NumberKind::U64);
454 recorders.as_mut().unwrap().insert(
455 attribute_hash,
456 AttributedRecorder {
457 observed: recorder.clone(),
458 attributes: attributes.clone(),
459 observed_epoch,
460 },
461 );
462
463 recorder
464 })
465 }
466}
467
468impl sdk_api::InstrumentCore for AsyncInstrument {
469 fn descriptor(&self) -> &Descriptor {
470 self.instrument.descriptor()
471 }
472}
473
impl sdk_api::AsyncInstrumentCore for AsyncInstrument {
    /// Exposes the instrument as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
479
/// State of one synchronous instrument for one attribute set.
#[derive(Debug)]
struct Record {
    /// Incremented by one after every successfully recorded measurement.
    update_count: AtomicNumber,

    /// Value of `update_count` stored when the record was last checkpointed
    /// during collection.
    collected_count: AtomicNumber,

    /// Attribute set the measurements were recorded with.
    attributes: AttributeSet,

    /// Instrument this record belongs to.
    instrument: SyncInstrument,

    /// Aggregator that receives new measurements, if the selector gave one.
    current: Option<Arc<dyn Aggregator + Send + Sync>>,
    /// Aggregator that `current` is moved into when the record is checkpointed.
    checkpoint: Option<Arc<dyn Aggregator + Send + Sync>>,
}
506
507impl sdk_api::SyncBoundInstrumentCore for Record {
508 fn record_one<'a>(&self, number: Number) {
509 if let Some(recorder) = &self.current {
511 if let Err(err) =
512 aggregators::range_test(&number, &self.instrument.instrument.descriptor)
513 .and_then(|_| recorder.update(&number, &self.instrument.instrument.descriptor))
514 {
515 global::handle_error(err);
516 return;
517 }
518
519 self.update_count.fetch_add(&NumberKind::U64, &1u64.into());
522 }
523 }
524}
525
/// Data shared by synchronous and asynchronous instruments.
struct Instrument {
    /// Descriptor the instrument was created with.
    descriptor: Descriptor,
    /// Accumulator that created the instrument.
    meter: Accumulator,
}
530
531impl std::fmt::Debug for Instrument {
532 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
533 f.debug_struct("Instrument")
534 .field("descriptor", &self.descriptor)
535 .field("meter", &"Accumulator")
536 .finish()
537 }
538}
539
impl sdk_api::InstrumentCore for Instrument {
    /// Returns the descriptor this instrument was created with.
    fn descriptor(&self) -> &Descriptor {
        &self.descriptor
    }
}
545
546impl sdk_api::MeterCore for Accumulator {
547 fn new_sync_instrument(
548 &self,
549 descriptor: Descriptor,
550 ) -> Result<Arc<dyn sdk_api::SyncInstrumentCore>> {
551 Ok(Arc::new(SyncInstrument {
552 instrument: Arc::new(Instrument {
553 descriptor,
554 meter: self.clone(),
555 }),
556 }))
557 }
558
559 fn record_batch_with_context(
560 &self,
561 _cx: &Context,
562 attributes: &[KeyValue],
563 measurements: Vec<Measurement>,
564 ) {
565 for measure in measurements.into_iter() {
566 if let Some(instrument) = measure
567 .instrument()
568 .as_any()
569 .downcast_ref::<SyncInstrument>()
570 {
571 let handle = instrument.acquire_handle(attributes);
572
573 handle.record_one(measure.into_number());
574 }
575 }
576 }
577
578 fn new_async_instrument(
579 &self,
580 descriptor: Descriptor,
581 runner: Option<AsyncRunner>,
582 ) -> Result<Arc<dyn sdk_api::AsyncInstrumentCore>> {
583 let instrument = Arc::new(AsyncInstrument {
584 instrument: Arc::new(Instrument {
585 descriptor,
586 meter: self.clone(),
587 }),
588 recorders: Arc::new(Mutex::new(None)),
589 });
590
591 self.0.register(instrument.clone(), runner)?;
592
593 Ok(instrument)
594 }
595
596 fn new_batch_observer(&self, runner: AsyncRunner) -> Result<()> {
597 self.0.register_runner(runner)
598 }
599}
600
#[cfg(test)]
mod tests {
    use crate::metrics::MeterProvider;
    use crate::sdk::export::metrics::ExportKindSelector;
    use crate::sdk::metrics::accumulator;
    use crate::sdk::metrics::controllers::pull;
    use crate::sdk::metrics::selectors::simple::Selector;
    use crate::sdk::Resource;
    use crate::testing::metric::NoopProcessor;
    use crate::{Key, KeyValue};
    use std::sync::Arc;

    /// Reads the `service.name` entry of the accumulator's resource as a string.
    fn service_name(acc: &super::Accumulator) -> Option<String> {
        acc.0
            .resource
            .get(Key::from_static_str("service.name"))
            .map(|v| v.to_string())
    }

    /// Debug formatting of controller, meter and counter must not panic.
    #[test]
    fn test_debug_message() {
        let selector = Box::new(Selector::Exact);
        let export_kind = Box::new(ExportKindSelector::Delta);
        let controller = pull(selector, export_kind).build();
        let meter = controller.provider().meter("test", None);
        let counter = meter.f64_counter("test").init();
        println!("{:?}, {:?}, {:?}", controller, meter, counter);
    }

    /// The SDK-provided resource is only used when no resource is configured.
    #[test]
    fn test_sdk_provided_resource_in_accumulator() {
        // No resource configured: the detector supplies the service name.
        let default_service_name = accumulator(Arc::new(NoopProcessor)).build();
        assert_eq!(
            service_name(&default_service_name),
            Some("unknown_service".to_string())
        );

        // An explicit resource is used as given.
        let custom_resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
        let custom_service_name = accumulator(Arc::new(NoopProcessor))
            .with_resource(custom_resource)
            .build();
        assert_eq!(
            service_name(&custom_service_name),
            Some("test_service".to_string())
        );

        // An explicit empty resource is not filled in by the detector.
        let no_service_name = accumulator(Arc::new(NoopProcessor))
            .with_resource(Resource::empty())
            .build();
        assert_eq!(service_name(&no_service_name), None)
    }
}