use std::{
env, fmt,
sync::{
mpsc::{self, Receiver, Sender},
Arc, Mutex, Weak,
},
thread,
time::{Duration, Instant},
};
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};
use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
Resource,
};
use super::{
data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader,
Temporality,
};
/// Export interval used when neither `with_interval` nor the
/// `OTEL_METRIC_EXPORT_INTERVAL` environment variable provides one (60 seconds).
const DEFAULT_INTERVAL: Duration = Duration::from_millis(60_000);
/// Name of the environment variable holding the export interval, in milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Builder for [`PeriodicReader`].
///
/// The export interval comes from, in order of precedence:
/// [`PeriodicReaderBuilder::with_interval`], the `OTEL_METRIC_EXPORT_INTERVAL`
/// environment variable (whole milliseconds), and finally a 60 second default.
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E> {
    interval: Duration,
    exporter: E,
}

impl<E> PeriodicReaderBuilder<E>
where
    E: PushMetricExporter,
{
    /// Creates a builder whose interval is seeded from the environment.
    ///
    /// `OTEL_METRIC_EXPORT_INTERVAL` is parsed as a whole number of
    /// milliseconds. Values that fail to parse, or that are zero, are ignored
    /// and `DEFAULT_INTERVAL` is used instead. Zero is rejected for the same
    /// reason `with_interval` rejects it: the background thread would otherwise
    /// wait with a zero timeout and export in a tight loop.
    fn new(exporter: E) -> Self {
        let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .map(Duration::from_millis)
            .filter(|d| !d.is_zero())
            .unwrap_or(DEFAULT_INTERVAL);
        PeriodicReaderBuilder { interval, exporter }
    }

    /// Sets the interval between periodic exports.
    ///
    /// A zero `interval` is ignored and the previously configured value is kept.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        if !interval.is_zero() {
            self.interval = interval;
        }
        self
    }

    /// Builds the [`PeriodicReader`], which starts its background export thread.
    pub fn build(self) -> PeriodicReader<E> {
        PeriodicReader::new(self.exporter, self.interval)
    }
}
/// A [`MetricReader`] that collects and exports metrics from a dedicated
/// background thread, on a fixed interval and on flush/shutdown requests.
///
/// Cloning is cheap: every clone shares the same inner state, and therefore
/// the same background thread and exporter.
pub struct PeriodicReader<E: PushMetricExporter> {
    inner: Arc<PeriodicReaderInner<E>>,
}

impl<E: PushMetricExporter> Clone for PeriodicReader<E> {
    /// Produces another handle to the same shared reader state.
    fn clone(&self) -> Self {
        let shared = self.inner.clone();
        PeriodicReader { inner: shared }
    }
}
impl<E: PushMetricExporter> PeriodicReader<E> {
    /// Creates a [`PeriodicReaderBuilder`] that will export through `exporter`.
    pub fn builder(exporter: E) -> PeriodicReaderBuilder<E> {
        PeriodicReaderBuilder::new(exporter)
    }

    /// Builds the reader and spawns the background thread that drives it.
    ///
    /// The thread collects and exports whenever `interval` elapses, and also in
    /// response to flush and shutdown requests sent over the internal channel.
    /// If the thread cannot be spawned, an error is logged and the reader is
    /// still returned, but no metrics will be exported.
    fn new(exporter: E, interval: Duration) -> Self {
        let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
            mpsc::channel();
        let exporter_arc = Arc::new(exporter);
        let reader = PeriodicReader {
            inner: Arc::new(PeriodicReaderInner {
                message_sender,
                producer: Mutex::new(None),
                exporter: exporter_arc.clone(),
            }),
        };
        // The thread keeps its own handle to the shared state. That handle also
        // owns `message_sender`, so the channel cannot report `Disconnected`
        // while the thread is running; the loop normally ends on `Shutdown`.
        let cloned_reader = reader.clone();
        // Reused for every collection performed by the thread.
        let mut rm = ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: Vec::new(),
        };
        let result_thread_creation = thread::Builder::new()
            .name("OpenTelemetry.Metrics.PeriodicReader".to_string())
            .spawn(move || {
                // Stay in a telemetry-suppressed scope for the thread's whole lifetime.
                let _suppress_guard = Context::enter_telemetry_suppressed_scope();
                // Start of the current export period. A flush uses it to work
                // out how much of the period is left.
                let mut interval_start = Instant::now();
                let mut remaining_interval = interval;
                otel_debug!(
                    name: "PeriodReaderThreadStarted",
                    interval_in_millisecs = interval.as_millis(),
                );
                loop {
                    otel_debug!(
                        name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval_in_millisecs = remaining_interval.as_millis()
                    );
                    match message_receiver.recv_timeout(remaining_interval) {
                        Ok(Message::Flush(response_sender)) => {
                            otel_debug!(
                                name: "PeriodReaderThreadExportingDueToFlush"
                            );
                            let export_result = cloned_reader.collect_and_export(&mut rm);
                            otel_debug!(
                                name: "PeriodReaderInvokedExport",
                                export_result = format!("{:?}", export_result)
                            );
                            if export_result.is_err() {
                                if response_sender.send(false).is_err() {
                                    otel_debug!(
                                        name: "PeriodReader.Flush.ResponseSendError",
                                        message = "PeriodicReader's flush has failed, but unable to send this info back to caller.
This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
                                    );
                                }
                            } else if response_sender.send(true).is_err() {
                                otel_debug!(
                                    name: "PeriodReader.Flush.ResponseSendError",
                                    message = "PeriodicReader's flush has completed successfully, but unable to send this info back to caller.
This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
                                );
                            }
                            let elapsed = interval_start.elapsed();
                            if elapsed < interval {
                                // Keep the periodic schedule: only wait out what
                                // is left of the current period.
                                remaining_interval = interval - elapsed;
                                otel_debug!(
                                    name: "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush",
                                    remaining_interval_in_millisecs = remaining_interval.as_millis()
                                );
                            } else {
                                otel_debug!(
                                    name: "PeriodReaderThreadAdjustingExportAfterFlush",
                                );
                                interval_start = Instant::now();
                                remaining_interval = Duration::ZERO;
                            }
                        }
                        Ok(Message::Shutdown(response_sender)) => {
                            otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
                            let export_result = cloned_reader.collect_and_export(&mut rm);
                            otel_debug!(
                                name: "PeriodReaderInvokedExport",
                                export_result = format!("{:?}", export_result)
                            );
                            let shutdown_result = exporter_arc.shutdown();
                            otel_debug!(
                                name: "PeriodReaderInvokedExporterShutdown",
                                shutdown_result = format!("{:?}", shutdown_result)
                            );
                            if export_result.is_err() || shutdown_result.is_err() {
                                if response_sender.send(false).is_err() {
                                    otel_info!(
                                        name: "PeriodReaderThreadShutdown.ResponseSendError",
                                        message = "PeriodicReader's shutdown has failed, but unable to send this info back to caller.
This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
                                    );
                                }
                            } else if response_sender.send(true).is_err() {
                                otel_debug!(
                                    name: "PeriodReaderThreadShutdown.ResponseSendError",
                                    message = "PeriodicReader completed its shutdown, but unable to send this info back to caller.
This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
                                );
                            }
                            otel_debug!(
                                name: "PeriodReaderThreadExiting",
                                reason = "ShutdownRequested"
                            );
                            break;
                        }
                        Err(mpsc::RecvTimeoutError::Timeout) => {
                            let export_start = Instant::now();
                            otel_debug!(
                                name: "PeriodReaderThreadExportingDueToTimer"
                            );
                            let export_result = cloned_reader.collect_and_export(&mut rm);
                            otel_debug!(
                                name: "PeriodReaderInvokedExport",
                                export_result = format!("{:?}", export_result)
                            );
                            let time_taken_for_export = export_start.elapsed();
                            if time_taken_for_export > interval {
                                otel_debug!(
                                    name: "PeriodReaderThreadExportTookLongerThanInterval"
                                );
                                // The export overran its period: start a fresh
                                // period now and export again right away.
                                interval_start = Instant::now();
                                remaining_interval = Duration::ZERO;
                            } else {
                                // The new period began when this export started.
                                // Anchor `interval_start` there as well, so that a
                                // flush later in this period recomputes the same
                                // deadline instead of shifting it by the export time.
                                interval_start = export_start;
                                remaining_interval = interval - time_taken_for_export;
                            }
                        }
                        Err(mpsc::RecvTimeoutError::Disconnected) => {
                            otel_debug!(
                                name: "PeriodReaderThreadExiting",
                                reason = "MessageSenderDisconnected"
                            );
                            break;
                        }
                    }
                }
                otel_debug!(
                    name: "PeriodReaderThreadStopped"
                );
            });
        // `e` is only read inside `otel_error!`; presumably that macro can expand
        // to nothing when internal logging is disabled, leaving `e` unused.
        #[allow(unused_variables)]
        if let Err(e) = result_thread_creation {
            otel_error!(
                name: "PeriodReaderThreadStartError",
                message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
                error = format!("{:?}", e)
            );
        }
        reader
    }

    /// Collects from the registered pipeline into `rm` and exports the result.
    fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
        self.inner.collect_and_export(rm)
    }
}
/// Debug output is just the type name; the exporter and shared state are not shown.
impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("PeriodicReader")
    }
}
/// State shared by every [`PeriodicReader`] handle and its background thread.
struct PeriodicReaderInner<E: PushMetricExporter> {
    /// Sends flush/shutdown requests to the background thread.
    message_sender: mpsc::Sender<Message>,
    /// The pipeline to collect from. It is `None` until `register_pipeline` is
    /// called, and is held weakly so the reader does not keep the pipeline alive.
    producer: Mutex<Option<Weak<dyn SdkProducer>>>,
    /// Destination for collected metrics; also shared with the background thread.
    exporter: Arc<E>,
}
impl<E: PushMetricExporter> PeriodicReaderInner<E> {
    /// Records the pipeline this reader collects from, replacing any previous one.
    fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
        let mut inner = self.producer.lock().expect("lock poisoned");
        *inner = Some(producer);
    }

    /// Returns the exporter's temporality; `_kind` is not consulted here.
    fn temporality(&self, _kind: InstrumentKind) -> Temporality {
        self.exporter.temporality()
    }

    /// Collects the current state of the registered pipeline into `rm`.
    ///
    /// # Errors
    /// - [`OTelSdkError::AlreadyShutdown`] if the registered pipeline has been dropped.
    /// - [`OTelSdkError::InternalFailure`] if no pipeline was ever registered.
    /// - Any error returned by the producer itself.
    fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
        // Clone the weak handle and release the lock before producing. User
        // code that `produce` may run (e.g. observable callbacks) then does not
        // execute while this mutex is held.
        let producer = self.producer.lock().expect("lock poisoned").clone();
        match producer {
            Some(p) => {
                p.upgrade()
                    .ok_or(OTelSdkError::AlreadyShutdown)?
                    .produce(rm)?;
                Ok(())
            }
            None => {
                otel_warn!(
                    name: "PeriodReader.MeterProviderNotRegistered",
                    message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
                               This occurs when a periodic reader is created but not associated with a MeterProvider \
                               by calling `.with_reader(reader)` on MeterProviderBuilder."
                );
                Err(OTelSdkError::InternalFailure(
                    "MeterProvider is not registered".into(),
                ))
            }
        }
    }

    /// Collects into `rm` and, if anything was collected, exports it.
    ///
    /// A collection error is logged and returned as
    /// [`OTelSdkError::InternalFailure`]. An empty collection is not exported
    /// and counts as success.
    fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
        let collect_start = Instant::now();
        let collect_result = self.collect(rm);
        let time_taken_for_collect = collect_start.elapsed();

        #[allow(clippy::question_mark)]
        if let Err(e) = collect_result {
            otel_warn!(
                name: "PeriodReaderCollectError",
                error = format!("{:?}", e)
            );
            return Err(OTelSdkError::InternalFailure(e.to_string()));
        }

        if rm.scope_metrics.is_empty() {
            otel_debug!(name: "NoMetricsCollected");
            return Ok(());
        }

        let metrics_count: usize = rm
            .scope_metrics
            .iter()
            .map(|scope_metrics| scope_metrics.metrics.len())
            .sum();
        otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());

        // The exporter API is async, but this runs on the reader's own thread,
        // so the future is driven to completion synchronously.
        futures_executor::block_on(self.exporter.export(rm))
    }

    /// Asks the background thread to export immediately and waits for its reply.
    ///
    /// There is no timeout: this blocks until the thread responds or exits.
    fn force_flush(&self) -> OTelSdkResult {
        let (response_tx, response_rx) = mpsc::channel();
        self.message_sender
            .send(Message::Flush(response_tx))
            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

        match response_rx.recv() {
            Ok(true) => Ok(()),
            Ok(false) | Err(_) => Err(OTelSdkError::InternalFailure("Failed to flush".into())),
        }
    }

    /// Asks the background thread to do a final export, shut the exporter down
    /// and exit. Waits at most `timeout` for it to report back.
    ///
    /// # Errors
    /// - [`OTelSdkError::InternalFailure`] if the thread has already exited, the
    ///   final export or exporter shutdown failed, or the reply channel closed.
    /// - [`OTelSdkError::Timeout`] (carrying `timeout`) if no reply arrives in time.
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        let (response_tx, response_rx) = mpsc::channel();
        self.message_sender
            .send(Message::Shutdown(response_tx))
            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

        match response_rx.recv_timeout(timeout) {
            Ok(true) => Ok(()),
            Ok(false) => Err(OTelSdkError::InternalFailure("Failed to shutdown".into())),
            Err(mpsc::RecvTimeoutError::Timeout) => Err(OTelSdkError::Timeout(timeout)),
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
            }
        }
    }
}

/// Requests sent from a [`PeriodicReader`] handle to its background thread.
///
/// Each request carries a reply channel. The thread sends `true` on success,
/// and `false` if the triggered export (or, for shutdown, the exporter
/// shutdown) failed.
#[derive(Debug)]
enum Message {
    /// Collect and export immediately, then keep running.
    Flush(Sender<bool>),
    /// Collect and export one last time, shut the exporter down, then exit.
    Shutdown(Sender<bool>),
}

impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        self.inner.register_pipeline(pipeline);
    }

    fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
        self.inner.collect(rm)
    }

    fn force_flush(&self) -> OTelSdkResult {
        self.inner.force_flush()
    }

    /// Shuts the reader down. The caller's `timeout` bounds how long to wait
    /// for the background thread to confirm.
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        self.inner.shutdown_with_timeout(timeout)
    }

    /// The exporter's temporality, adjusted per instrument kind.
    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        kind.temporality_preference(self.inner.temporality(kind))
    }
}
#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::PeriodicReader;
    use crate::{
        error::{OTelSdkError, OTelSdkResult},
        metrics::{
            data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
            InMemoryMetricExporter, SdkMeterProvider, Temporality,
        },
        Resource,
    };
    use opentelemetry::metrics::MeterProvider;
    use std::{
        sync::{
            atomic::{AtomicBool, AtomicUsize, Ordering},
            mpsc, Arc,
        },
        time::Duration,
    };

    // Exporter whose first `export` call fails and all later calls succeed;
    // `count` records how many exports were attempted.
    #[derive(Debug, Clone)]
    struct MetricExporterThatFailsOnlyOnFirst {
        count: Arc<AtomicUsize>,
    }

    impl Default for MetricExporterThatFailsOnlyOnFirst {
        fn default() -> Self {
            MetricExporterThatFailsOnlyOnFirst {
                count: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    impl MetricExporterThatFailsOnlyOnFirst {
        fn get_count(&self) -> usize {
            self.count.load(Ordering::Relaxed)
        }
    }

    impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
        async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
            if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
                Err(OTelSdkError::InternalFailure("export failed".into()))
            } else {
                Ok(())
            }
        }

        fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }

        fn temporality(&self) -> Temporality {
            Temporality::Cumulative
        }
    }

    // Exporter that records whether it has been shut down.
    #[derive(Debug, Clone, Default)]
    struct MockMetricExporter {
        is_shutdown: Arc<AtomicBool>,
    }

    impl PushMetricExporter for MockMetricExporter {
        async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
            Ok(())
        }

        fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown(&self) -> OTelSdkResult {
            self.shutdown_with_timeout(Duration::from_secs(5))
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            self.is_shutdown.store(true, Ordering::Relaxed);
            Ok(())
        }

        fn temporality(&self) -> Temporality {
            Temporality::Cumulative
        }
    }

    #[test]
    fn collection_triggered_by_interval_multiple() {
        let interval = std::time::Duration::from_millis(1);
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(interval)
            .build();
        let i = Arc::new(AtomicUsize::new(0));
        let i_clone = i.clone();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let _counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |_| {
                i_clone.fetch_add(1, Ordering::Relaxed);
            })
            .build();
        std::thread::sleep(interval * 5 * 20);
        assert!(i.load(Ordering::Relaxed) >= 5);
    }

    #[test]
    fn shutdown_repeat() {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone()).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let result = meter_provider.shutdown();
        assert!(result.is_ok());
        let result = meter_provider.shutdown();
        assert!(result.is_err());
        assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
        let result = meter_provider.shutdown();
        assert!(result.is_err());
        assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
    }

    #[test]
    fn flush_after_shutdown() {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone()).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let result = meter_provider.force_flush();
        assert!(result.is_ok());
        let result = meter_provider.shutdown();
        assert!(result.is_ok());
        let result = meter_provider.force_flush();
        assert!(result.is_err());
    }

    #[test]
    fn flush_repeat() {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone()).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let result = meter_provider.force_flush();
        assert!(result.is_ok());
        let result = meter_provider.force_flush();
        assert!(result.is_ok());
    }

    #[test]
    fn periodic_reader_without_pipeline() {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone()).build();
        let rm = &mut ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: Vec::new(),
        };
        let result = reader.collect(rm);
        assert!(result.is_err());
        let result = reader.force_flush();
        assert!(result.is_err());
        let meter_provider = SdkMeterProvider::builder()
            .with_reader(reader.clone())
            .build();
        let result = reader.collect(rm);
        assert!(result.is_ok());
        let result = meter_provider.force_flush();
        assert!(result.is_ok());
    }

    #[test]
    fn exporter_failures_are_handled() {
        let interval = std::time::Duration::from_millis(10);
        let exporter = MetricExporterThatFailsOnlyOnFirst::default();
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(interval)
            .build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let counter = meter.u64_counter("sync_counter").build();
        counter.add(1, &[]);
        let _obs_counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |observer| {
                observer.observe(1, &[]);
            })
            .build();
        std::thread::sleep(Duration::from_millis(500));
        assert!(exporter.get_count() >= 2);
    }

    #[test]
    fn shutdown_passed_to_exporter() {
        let exporter = MockMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone()).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let counter = meter.u64_counter("sync_counter").build();
        counter.add(1, &[]);
        let result = meter_provider.shutdown();
        assert!(result.is_ok());
        assert!(exporter.is_shutdown.load(Ordering::Relaxed));
    }

    #[test]
    fn collection() {
        collection_triggered_by_interval_helper();
        collection_triggered_by_flush_helper();
        collection_triggered_by_shutdown_helper();
        collection_triggered_by_drop_helper();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn collection_from_tokio_multi_with_one_worker() {
        collection_triggered_by_interval_helper();
        collection_triggered_by_flush_helper();
        collection_triggered_by_shutdown_helper();
        collection_triggered_by_drop_helper();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn collection_from_tokio_with_two_worker() {
        collection_triggered_by_interval_helper();
        collection_triggered_by_flush_helper();
        collection_triggered_by_shutdown_helper();
        collection_triggered_by_drop_helper();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn collection_from_tokio_current() {
        collection_triggered_by_interval_helper();
        collection_triggered_by_flush_helper();
        collection_triggered_by_shutdown_helper();
        collection_triggered_by_drop_helper();
    }

    // Verifies that the timer alone triggers collection. This cannot reuse
    // `collection_helper`: that helper uses the default (60s) interval, and it
    // hands ownership of the provider to the trigger. The provider would then be
    // dropped, and its shutdown would collect, so the check would pass even
    // with a broken timer. Here a short interval is used and the collection is
    // observed while the provider is still alive.
    fn collection_triggered_by_interval_helper() {
        let interval = Duration::from_millis(50);
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(interval)
            .build();
        let (sender, receiver) = mpsc::channel();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let _counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |observer| {
                observer.observe(1, &[]);
                sender.send(()).expect("channel should still be open");
            })
            .build();

        // Several intervals pass while the provider is still alive.
        std::thread::sleep(Duration::from_millis(500));
        receiver
            .try_recv()
            .expect("message should be available in channel, indicating a timer-triggered collection occurred");
        let exported_metrics = exporter
            .get_finished_metrics()
            .expect("this should not fail");
        assert!(
            !exported_metrics.is_empty(),
            "Metrics should be available in exporter."
        );

        // Shut down while `receiver` is still alive, so the final collection's
        // callback can still send.
        meter_provider.shutdown().expect("shutdown should succeed");
    }

    fn collection_triggered_by_flush_helper() {
        collection_helper(|meter_provider| {
            meter_provider.force_flush().expect("flush should succeed");
        });
    }

    fn collection_triggered_by_shutdown_helper() {
        collection_helper(|meter_provider| {
            meter_provider.shutdown().expect("shutdown should succeed");
        });
    }

    fn collection_triggered_by_drop_helper() {
        collection_helper(|meter_provider| {
            drop(meter_provider);
        });
    }

    // Builds a provider with an observable counter, runs `trigger`, and asserts
    // that a collection (and an export) happened by the time `trigger` returns.
    fn collection_helper(trigger: fn(SdkMeterProvider)) {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone()).build();
        let (sender, receiver) = mpsc::channel();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let _counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |observer| {
                observer.observe(1, &[]);
                sender.send(()).expect("channel should still be open");
            })
            .build();
        trigger(meter_provider);
        receiver
            .recv_timeout(Duration::ZERO)
            .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback");
        let exported_metrics = exporter
            .get_finished_metrics()
            .expect("this should not fail");
        assert!(
            !exported_metrics.is_empty(),
            "Metrics should be available in exporter."
        );
    }

    async fn some_async_function() -> u64 {
        std::thread::sleep(std::time::Duration::from_millis(1));
        1
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() {
        async_inside_observable_callback_helper();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() {
        async_inside_observable_callback_helper();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn async_inside_observable_callback_from_tokio_current_thread() {
        async_inside_observable_callback_helper();
    }

    #[test]
    fn async_inside_observable_callback_from_regular_main() {
        async_inside_observable_callback_helper();
    }

    fn async_inside_observable_callback_helper() {
        let interval = std::time::Duration::from_millis(10);
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(interval)
            .build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let _gauge = meter
            .u64_observable_gauge("my_observable_gauge")
            .with_callback(|observer| {
                let value = futures_executor::block_on(some_async_function());
                observer.observe(value, &[]);
            })
            .build();
        meter_provider.force_flush().expect("flush should succeed");
        let exported_metrics = exporter
            .get_finished_metrics()
            .expect("this should not fail");
        assert!(
            !exported_metrics.is_empty(),
            "Metrics should be available in exporter."
        );
    }

    async fn some_tokio_async_function() -> u64 {
        tokio::time::sleep(Duration::from_millis(1)).await;
        1
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
        tokio_async_inside_observable_callback_helper(true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
        tokio_async_inside_observable_callback_helper(true);
    }

    // NOTE(review): presumably ignored because it deadlocks. The callback calls
    // `Handle::block_on` on a timer future from the reader thread, but the
    // current-thread runtime's only thread is synchronously blocked in
    // `force_flush`, so nothing drives the timer. Confirm before un-ignoring.
    #[tokio::test(flavor = "current_thread")]
    #[ignore]
    async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
        tokio_async_inside_observable_callback_helper(true);
    }

    #[test]
    fn tokio_async_inside_observable_callback_from_regular_main() {
        tokio_async_inside_observable_callback_helper(false);
    }

    fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone()).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        if use_current_tokio_runtime {
            let rt = tokio::runtime::Handle::current().clone();
            let _gauge = meter
                .u64_observable_gauge("my_observable_gauge")
                .with_callback(move |observer| {
                    let value = rt.block_on(some_tokio_async_function());
                    observer.observe(value, &[]);
                })
                .build();
        } else {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let _gauge = meter
                .u64_observable_gauge("my_observable_gauge")
                .with_callback(move |observer| {
                    let value = rt.block_on(some_tokio_async_function());
                    observer.observe(value, &[]);
                })
                .build();
        };
        meter_provider.force_flush().expect("flush should succeed");
        let exported_metrics = exporter
            .get_finished_metrics()
            .expect("this should not fail");
        assert!(
            !exported_metrics.is_empty(),
            "Metrics should be available in exporter."
        );
    }
}