use opentelemetry::otel_debug;
use std::time::Duration;
use std::{
fmt,
sync::{Mutex, Weak},
};
use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::Temporality,
};
use super::{
data::ResourceMetrics,
pipeline::Pipeline,
reader::{MetricReader, SdkProducer},
};
/// A metric reader that only gathers metrics when `collect` is called on it.
///
/// The mutable registration/shutdown state lives behind a [`Mutex`] so the
/// reader can be driven through `&self`.
pub struct ManualReader {
/// Registered producer and shutdown flag, guarded by a mutex.
inner: Mutex<ManualReaderInner>,
/// Temporality handed to `InstrumentKind::temporality_preference` when an
/// instrument's temporality is resolved.
temporality: Temporality,
}
impl Default for ManualReader {
fn default() -> Self {
ManualReader::builder().build()
}
}
impl fmt::Debug for ManualReader {
    /// Prints only the type name; the internal state is intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ManualReader")
    }
}
/// Mutable state of a `ManualReader`, kept behind the reader's mutex.
#[derive(Debug)]
struct ManualReaderInner {
/// Weak handle to the registered producer. `None` until a pipeline is
/// registered, and reset to `None` when the reader is shut down.
sdk_producer: Option<Weak<dyn SdkProducer>>,
/// Starts as `false`; set to `true` by `shutdown_with_timeout`.
is_shutdown: bool,
}
impl ManualReader {
    /// Returns a [`ManualReaderBuilder`] in its default configuration.
    pub fn builder() -> ManualReaderBuilder {
        Default::default()
    }

    /// Creates a reader with the given temporality.
    ///
    /// The new reader has no producer registered and is not shut down.
    pub(crate) fn new(temporality: Temporality) -> Self {
        let initial_state = ManualReaderInner {
            is_shutdown: false,
            sdk_producer: None,
        };

        Self {
            temporality,
            inner: Mutex::new(initial_state),
        }
    }
}
impl MetricReader for ManualReader {
    /// Registers the pipeline this reader will pull metrics from.
    ///
    /// Only the first registration takes effect. A second registration, or
    /// any registration after the reader has been shut down, is ignored and
    /// reported through a debug log. If the internal lock is poisoned the
    /// call does nothing.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        // Registration is best-effort: this method returns nothing, so a
        // poisoned lock simply leaves the reader unchanged.
        if let Ok(mut inner) = self.inner.lock() {
            if inner.is_shutdown {
                // Shutdown resets `sdk_producer` to `None`; without this guard
                // a late registration would bring a shut-down reader back to
                // life and `collect` would start succeeding again.
                otel_debug!(
                    name: "ManualReader.RegisterAfterShutdown",
                    message = "The Reader is already shut down. Registering a pipeline after shutdown is not allowed.");
            } else if inner.sdk_producer.is_none() {
                inner.sdk_producer = Some(pipeline);
            } else {
                otel_debug!(
                    name: "ManualReader.DuplicateRegistration",
                    message = "The pipeline is already registered to the Reader. Registering pipeline multiple times is not allowed.");
            }
        }
    }

    /// Asks the registered producer to fill `rm` with the current metrics.
    ///
    /// # Errors
    ///
    /// Returns `OTelSdkError::InternalFailure` when the internal lock is
    /// poisoned, or when there is no live producer: nothing was registered,
    /// the reader was shut down, or the registered pipeline has already been
    /// dropped. Errors reported by the producer are propagated.
    fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
        // The guard is held for the whole collection, so a concurrent
        // shutdown waits until this call has finished.
        let inner = self
            .inner
            .lock()
            .map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?;

        let producer = inner
            .sdk_producer
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .ok_or_else(|| {
                OTelSdkError::InternalFailure("reader is shut down or not registered".into())
            })?;

        producer.produce(rm)?;
        Ok(())
    }

    /// Does nothing and always returns `Ok(())`.
    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    /// Shuts the reader down by dropping its producer handle and setting the
    /// shutdown flag.
    ///
    /// The timeout is not used. Calling this more than once is allowed and
    /// keeps returning `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns `OTelSdkError::InternalFailure` if the internal lock is
    /// poisoned.
    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
        let mut inner = self
            .inner
            .lock()
            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e}")))?;

        // With no producer left, every later `collect` fails; the flag keeps
        // `register_pipeline` from installing a new producer afterwards.
        inner.sdk_producer = None;
        inner.is_shutdown = true;
        Ok(())
    }

    /// Resolves the temporality for `kind` by passing this reader's
    /// configured temporality to `InstrumentKind::temporality_preference`.
    fn temporality(&self, kind: super::InstrumentKind) -> Temporality {
        kind.temporality_preference(self.temporality)
    }
}
/// Builder for `ManualReader`.
///
/// Created through `ManualReaderBuilder::new`, `Default`, or
/// `ManualReader::builder`.
#[derive(Default)]
pub struct ManualReaderBuilder {
/// Temporality that `build` passes to the reader; starts as
/// `Temporality::default()`.
temporality: Temporality,
}
impl fmt::Debug for ManualReaderBuilder {
    /// Prints only the type name; the configured fields are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ManualReaderBuilder")
    }
}
impl ManualReaderBuilder {
    /// Creates a builder with the default configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the temporality the built reader will be created with.
    pub fn with_temporality(self, temporality: Temporality) -> Self {
        Self { temporality }
    }

    /// Consumes the builder and creates the [`ManualReader`].
    pub fn build(self) -> ManualReader {
        let Self { temporality } = self;
        ManualReader::new(temporality)
    }
}