use std::{collections::BTreeMap, fmt, future::Future, pin::Pin};
use futures_util::{FutureExt, Stream, future};
use crate::{
BoxFuture,
collector::{self, CloudWatch, Config, RecorderHandle, Resolution},
error::Error,
};
/// Collects the settings for the CloudWatch metrics collector before it is started.
///
/// Every setting except the namespace is optional; unset values are replaced by
/// defaults when `build_config` turns the builder into a `Config`.
pub struct Builder {
/// CloudWatch namespace. Required: `build_config` fails with
/// `Error::BuilderIncomplete` when this is `None` or an empty string.
cloudwatch_namespace: Option<String>,
/// Dimension name -> value pairs added through `default_dimension`; passed
/// unchanged into `Config`. Presumably attached to every metric sent — confirm
/// against the collector.
default_dimensions: BTreeMap<String, String>,
/// Storage resolution; `build_config` falls back to `Resolution::Minute` when unset.
storage_resolution: Option<Resolution>,
/// Send interval in seconds; `build_config` falls back to 10 when unset.
send_interval_secs: Option<u64>,
/// Send timeout in seconds; `build_config` falls back to 4 when unset.
send_timeout_secs: Option<u64>,
/// Future signalling shutdown. When unset, `build_config` substitutes a future
/// that never completes.
shutdown_signal: Option<BoxFuture<'static, ()>>,
/// Metric buffer size handed to `Config`; `new` initialises it to 2048.
metric_buffer_size: usize,
/// Optional stream of `()` items handed to the collector next to the `Config`.
/// NOTE(review): presumably each item requests an immediate flush — confirm
/// against the collector.
force_flush_stream: Option<Pin<Box<dyn Stream<Item = ()> + Send>>>,
/// Gzip toggle, only present with the `gzip` feature; `new` initialises it to `true`.
#[cfg(feature = "gzip")]
gzip: bool,
}
fn extract_namespace(cloudwatch_namespace: Option<String>) -> Result<String, Error> {
match cloudwatch_namespace {
Some(namespace) if !namespace.is_empty() => Ok(namespace),
_ => Err(Error::BuilderIncomplete(
"cloudwatch_namespace missing".into(),
)),
}
}
impl Builder {
    /// Creates a builder with no namespace, no dimensions and no overrides.
    ///
    /// The metric buffer size starts at 2048 and, with the `gzip` feature
    /// enabled, gzip starts switched on.
    pub fn new() -> Self {
        Self {
            cloudwatch_namespace: None,
            default_dimensions: BTreeMap::new(),
            storage_resolution: None,
            send_interval_secs: None,
            send_timeout_secs: None,
            shutdown_signal: None,
            metric_buffer_size: 2048,
            force_flush_stream: None,
            #[cfg(feature = "gzip")]
            gzip: true,
        }
    }

    /// Sets the stream that is handed to the collector together with the
    /// configuration, replacing any previously set stream.
    pub fn force_flush_stream(
        self,
        force_flush_stream: Pin<Box<dyn Stream<Item = ()> + Send>>,
    ) -> Self {
        Self {
            force_flush_stream: Some(force_flush_stream),
            ..self
        }
    }

    /// Sets the CloudWatch namespace.
    ///
    /// This is the only mandatory setting: the `init_*` methods fail when it is
    /// missing or empty.
    pub fn cloudwatch_namespace(mut self, namespace: impl Into<String>) -> Self {
        self.cloudwatch_namespace = Some(namespace.into());
        self
    }

    /// Adds a default dimension; a later call with the same `name` overwrites
    /// the earlier value.
    pub fn default_dimension(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        let (key, val) = (name.into(), value.into());
        self.default_dimensions.insert(key, val);
        self
    }

    /// Sets the storage resolution (`Resolution::Minute` is used when unset).
    pub fn storage_resolution(mut self, resolution: Resolution) -> Self {
        self.storage_resolution = Some(resolution);
        self
    }

    /// Sets the send interval in seconds (10 is used when unset).
    pub fn send_interval_secs(mut self, secs: u64) -> Self {
        self.send_interval_secs = Some(secs);
        self
    }

    /// Sets the send timeout in seconds (4 is used when unset).
    pub fn send_timeout_secs(mut self, secs: u64) -> Self {
        self.send_timeout_secs = Some(secs);
        self
    }

    /// Sets the shutdown signal future (a never-completing future is used when unset).
    pub fn shutdown_signal(mut self, shutdown_signal: BoxFuture<'static, ()>) -> Self {
        self.shutdown_signal = Some(shutdown_signal);
        self
    }

    /// Overrides the metric buffer size (2048 unless changed).
    pub fn metric_buffer_size(mut self, metric_buffer_size: usize) -> Self {
        self.metric_buffer_size = metric_buffer_size;
        self
    }

    /// Switches gzip on or off (on unless changed). Only available with the
    /// `gzip` feature.
    #[cfg(feature = "gzip")]
    pub fn gzip(mut self, gzip: bool) -> Self {
        self.gzip = gzip;
        self
    }

    /// Builds the configuration and passes it to `collector::init`.
    ///
    /// NOTE(review): judging by the name, `collector::init` runs the collector
    /// on its own thread — confirm against the collector module.
    ///
    /// # Errors
    ///
    /// Returns `Error::BuilderIncomplete` when the namespace is missing or empty.
    pub fn init_thread(
        self,
        client: aws_sdk_cloudwatch::Client,
        set_global_recorder: fn(
            RecorderHandle,
        ) -> Result<(), metrics::SetRecorderError<RecorderHandle>>,
    ) -> Result<(), Error> {
        self.build_config().map(|(config, flush_stream)| {
            collector::init(set_global_recorder, client, config, flush_stream);
        })
    }

    /// Builds the configuration, initialises the collector and then awaits the
    /// returned driver future until it completes.
    ///
    /// # Errors
    ///
    /// Returns an error when the namespace is missing or empty, or when
    /// `collector::init_future` fails.
    #[deprecated = "Use init_async instead which allows for `metrics` to be fully initialized first before starting the driver task"]
    pub async fn init_future(
        self,
        client: aws_sdk_cloudwatch::Client,
        set_global_recorder: fn(
            RecorderHandle,
        ) -> Result<(), metrics::SetRecorderError<RecorderHandle>>,
    ) -> Result<(), Error> {
        let (config, flush_stream) = self.build_config()?;
        collector::init_future(set_global_recorder, client, config, flush_stream)
            .await?
            .await;
        Ok(())
    }

    /// Builds the configuration and initialises the collector, handing the
    /// driver future back to the caller instead of awaiting it.
    ///
    /// # Errors
    ///
    /// Returns an error when the namespace is missing or empty, or when
    /// `collector::init_future` fails.
    pub async fn init_async(
        self,
        client: aws_sdk_cloudwatch::Client,
        set_global_recorder: fn(
            RecorderHandle,
        ) -> Result<(), metrics::SetRecorderError<RecorderHandle>>,
    ) -> Result<impl Future<Output = ()>, Error> {
        let (config, flush_stream) = self.build_config()?;
        collector::init_future(set_global_recorder, client, config, flush_stream).await
    }

    /// Same as `init_async`, but accepts any `CloudWatch` implementation as the client.
    #[doc(hidden)]
    pub async fn init_future_mock(
        self,
        client: impl CloudWatch,
        set_global_recorder: fn(
            RecorderHandle,
        ) -> Result<(), metrics::SetRecorderError<RecorderHandle>>,
    ) -> Result<impl Future<Output = ()>, Error> {
        let (config, flush_stream) = self.build_config()?;
        collector::init_future(set_global_recorder, client, config, flush_stream).await
    }

    /// Consumes the builder, producing the collector `Config` plus the optional
    /// force-flush stream.
    ///
    /// Unset options fall back to: `Resolution::Minute`, a 10 second send
    /// interval, a 4 second send timeout and a shutdown signal that never fires.
    ///
    /// # Errors
    ///
    /// Returns `Error::BuilderIncomplete` when the namespace is missing or empty.
    #[allow(clippy::type_complexity)]
    fn build_config(
        self,
    ) -> Result<(Config, Option<Pin<Box<dyn Stream<Item = ()> + Send>>>), Error> {
        // Validate the one mandatory setting before assembling anything else.
        let cloudwatch_namespace = extract_namespace(self.cloudwatch_namespace)?;

        let shutdown_signal = self
            .shutdown_signal
            .unwrap_or_else(|| Box::pin(future::pending()))
            .shared();

        let config = Config {
            cloudwatch_namespace,
            default_dimensions: self.default_dimensions,
            storage_resolution: self.storage_resolution.unwrap_or(Resolution::Minute),
            send_interval_secs: self.send_interval_secs.unwrap_or(10),
            send_timeout_secs: self.send_timeout_secs.unwrap_or(4),
            shutdown_signal,
            metric_buffer_size: self.metric_buffer_size,
            #[cfg(feature = "gzip")]
            gzip: self.gzip,
        };

        Ok((config, self.force_flush_stream))
    }
}
impl fmt::Debug for Builder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
cloudwatch_namespace,
default_dimensions,
storage_resolution,
send_interval_secs,
send_timeout_secs,
shutdown_signal: _,
metric_buffer_size,
force_flush_stream: _,
#[cfg(feature = "gzip")]
gzip,
} = self;
let mut f = f.debug_struct("Builder");
f.field("cloudwatch_namespace", cloudwatch_namespace)
.field("default_dimensions", default_dimensions)
.field("storage_resolution", storage_resolution)
.field("send_interval_secs", send_interval_secs)
.field("send_timeout_secs", send_timeout_secs)
.field("shutdown_signal", &"BoxFuture")
.field("metric_buffer_size", metric_buffer_size)
.field("force_flush_stream", &"dyn Stream");
#[cfg(feature = "gzip")]
f.field("gzip", gzip);
f.finish()
}
}