#![allow(dead_code)]
use super::{collector, Error};
use metrics::SharedString;
use std::sync::Arc;
use std::time::Duration;
pub type AutoFlushWriterFactory = Arc<dyn Fn() -> Box<dyn std::io::Write + Send> + Send + Sync>;
pub struct Builder {
cloudwatch_namespace: Option<SharedString>,
default_dimensions: Vec<(SharedString, SharedString)>,
timestamp: Option<u64>,
emit_zeros: bool,
auto_flush_interval: Option<Duration>,
auto_flush_writer: Option<AutoFlushWriterFactory>,
#[cfg(feature = "lambda")]
lambda_cold_start_span: Option<tracing::span::Span>,
#[cfg(feature = "lambda")]
lambda_cold_start: Option<&'static str>,
#[cfg(feature = "lambda")]
lambda_request_id: Option<&'static str>,
#[cfg(feature = "lambda")]
lambda_xray_trace_id: Option<&'static str>,
}
impl Builder {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Builder {
cloudwatch_namespace: Default::default(),
default_dimensions: Default::default(),
timestamp: None,
emit_zeros: false,
auto_flush_interval: None,
auto_flush_writer: None,
#[cfg(feature = "lambda")]
lambda_cold_start_span: None,
#[cfg(feature = "lambda")]
lambda_cold_start: None,
#[cfg(feature = "lambda")]
lambda_request_id: None,
#[cfg(feature = "lambda")]
lambda_xray_trace_id: None,
}
}
pub fn cloudwatch_namespace(self, namespace: impl Into<SharedString>) -> Self {
Self {
cloudwatch_namespace: Some(namespace.into()),
..self
}
}
pub fn with_dimension(mut self, name: impl Into<SharedString>, value: impl Into<SharedString>) -> Self {
self.default_dimensions.push((name.into(), value.into()));
self
}
pub fn with_timestamp(mut self, timestamp: u64) -> Self {
self.timestamp = Some(timestamp);
self
}
pub fn emit_zeros(mut self, emit_zeros: bool) -> Self {
self.emit_zeros = emit_zeros;
self
}
pub fn with_auto_flush(self) -> Self {
self.with_auto_flush_interval(super::DEFAULT_AUTO_FLUSH_INTERVAL)
}
pub fn with_auto_flush_interval(mut self, interval: Duration) -> Self {
self.auto_flush_interval = Some(interval);
self
}
pub fn with_auto_flush_writer<F, W>(mut self, interval: Duration, writer_factory: F) -> Self
where
F: Fn() -> W + Send + Sync + 'static,
W: std::io::Write + Send + 'static,
{
self.auto_flush_interval = Some(interval);
self.auto_flush_writer = Some(Arc::new(move || {
Box::new(writer_factory()) as Box<dyn std::io::Write + Send>
}));
self
}
#[cfg(feature = "lambda")]
pub fn lambda_cold_start_span(mut self, cold_start_span: tracing::span::Span) -> Self {
self.lambda_cold_start_span = Some(cold_start_span);
self
}
#[cfg(feature = "lambda")]
pub fn lambda_cold_start_metric(mut self, name: &'static str) -> Self {
self.lambda_cold_start = Some(name);
self
}
#[cfg(feature = "lambda")]
pub fn with_lambda_request_id(mut self, name: &'static str) -> Self {
self.lambda_request_id = Some(name);
self
}
#[cfg(feature = "lambda")]
pub fn with_lambda_xray_trace_id(mut self, name: &'static str) -> Self {
self.lambda_xray_trace_id = Some(name);
self
}
#[cfg(not(feature = "lambda"))]
fn build(self) -> Result<collector::Collector, Error> {
let config = collector::Config {
cloudwatch_namespace: self.cloudwatch_namespace.ok_or("cloudwatch_namespace missing")?,
default_dimensions: self.default_dimensions,
timestamp: self.timestamp,
emit_zeros: self.emit_zeros,
};
Ok(collector::Collector::new(config))
}
#[cfg(feature = "lambda")]
fn build(self) -> Result<collector::Collector, Error> {
let config = collector::Config {
cloudwatch_namespace: self.cloudwatch_namespace.ok_or("cloudwatch_namespace missing")?,
default_dimensions: self.default_dimensions,
timestamp: self.timestamp,
emit_zeros: self.emit_zeros,
lambda_cold_start: self.lambda_cold_start,
lambda_request_id: self.lambda_request_id,
lambda_xray_trace_id: self.lambda_xray_trace_id,
};
Ok(collector::Collector::new(config, self.lambda_cold_start_span))
}
pub fn init(self) -> Result<&'static collector::Collector, Error> {
let auto_flush_interval = self.auto_flush_interval;
let auto_flush_writer = self.auto_flush_writer.clone();
let collector: &'static collector::Collector = Box::leak(Box::new(self.build()?));
if let Some(interval) = auto_flush_interval {
let writer_factory = auto_flush_writer
.unwrap_or_else(|| Arc::new(|| Box::new(std::io::stdout()) as Box<dyn std::io::Write + Send>));
spawn_auto_flush_task(collector, interval, writer_factory);
}
metrics::set_global_recorder::<collector::Recorder>(collector.into()).map_err(|e| e.to_string())?;
Ok(collector)
}
}
fn spawn_auto_flush_task(
collector: &'static collector::Collector,
interval: Duration,
writer_factory: AutoFlushWriterFactory,
) {
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval_timer.tick().await;
let writer = writer_factory();
if let Err(e) = collector.flush(writer) {
tracing::error!("Auto-flush failed: {e}");
}
}
});
}