use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::nakadi_types::{subscription::StreamParameters, Error};
use crate::components::{
committer::CommitError,
connector::ConnectError,
streams::{EventStreamBatchStats, EventStreamError},
};
pub use crate::internals::background_committer::CommitTrigger;
#[cfg(feature = "metrix")]
mod metrix_impl;
#[cfg(feature = "metrix")]
pub use self::metrix_impl::{Metrix, MetrixConfig, MetrixGaugeTrackingSecs};
#[cfg(feature = "metrix")]
pub use metrix::{
driver::{DriverBuilder, TelemetryDriver},
processor::ProcessorMount,
AggregatesProcessors,
};
/// Callbacks for reporting consumer, stream, batch and cursor-commit events.
///
/// Every method has an empty default body, so an implementor only overrides
/// the callbacks it wants to record.
///
/// NOTE(review): the grouping comments below paraphrase the method and
/// parameter names. The call sites are not part of this file, so confirm the
/// exact moment each callback fires (and what each `Duration`/`Instant`
/// measures) against the callers.
pub trait Instruments {
// --- Consumer lifecycle ---
fn consumer_started(&self) {}
fn consumer_stopped(&self, _ran_for: Duration) {}
fn streaming_ended(&self, _streamed_for: Duration) {}
// --- Connecting to the stream ---
fn stream_connect_attempt_success(&self, _time: Duration) {}
fn stream_connect_attempt_failed(&self, _time: Duration) {}
fn stream_connected(&self, _time: Duration) {}
fn stream_not_connected(&self, _time: Duration, _err: &ConnectError) {}
// --- Raw stream data: chunks, frames, ticks ---
fn stream_chunk_received(&self, _n_bytes: usize) {}
fn stream_frame_completed(&self, _n_bytes: usize, _time: Duration) {}
fn stream_tick_emitted(&self) {}
// --- Frames by kind ---
fn info_frame_received(&self, _frame_started_at: Instant, _frame_completed_at: Instant) {}
fn keep_alive_frame_received(&self, _frame_started_at: Instant, _frame_completed_at: Instant) {}
fn batch_frame_received(
&self,
_frame_started_at: Instant,
_frame_completed_at: Instant,
_events_bytes: usize,
) {
}
fn batch_frame_gap(&self, _gap: Duration) {}
// --- Stream health ---
fn no_frames_warning(&self, _no_frames_for: Duration) {}
fn no_events_warning(&self, _no_events_for: Duration) {}
fn stream_dead(&self, _after: Duration) {}
fn stream_error(&self, _err: &EventStreamError) {}
/// Also the target of the blanket `subscription_stats::Instruments` impl in
/// this file, which forwards its `unconsumed_events` here.
fn stream_unconsumed_events(&self, _n_unconsumed: usize) {}
// --- In-flight batch statistics ---
fn batches_in_flight_incoming(&self, _stats: &EventStreamBatchStats) {}
fn batches_in_flight_processed(&self, _stats: &EventStreamBatchStats) {}
fn in_flight_stats_reset(&self) {}
// --- Event type partitions ---
fn event_type_partition_activated(&self) {}
fn event_type_partition_deactivated(&self, _active_for: Duration) {}
// --- Batch processing ---
fn batch_processing_started(&self, _frame_started_at: Instant, _frame_completed_at: Instant) {}
fn batch_processed(&self, _n_bytes: usize, _time: Duration) {}
fn batch_processed_n_events(&self, _n_events: usize) {}
fn batch_deserialized(&self, _n_bytes: usize, _time: Duration) {}
// --- Cursor committing ---
fn cursor_to_commit_received(&self, _frame_started_at: Instant, _frame_completed_at: Instant) {}
fn cursors_commit_triggered(&self, _trigger: CommitTrigger) {}
fn cursor_ages_on_commit_attempt(
&self,
_first_cursor_age: Duration,
_last_cursor_age: Duration,
_first_cursor_age_warning: bool,
) {
}
fn cursors_committed(&self, _n_cursors: usize, _time: Duration) {}
fn batches_committed(&self, _n_batches: usize, _n_events: usize) {}
fn cursors_not_committed(&self, _n_cursors: usize, _time: Duration, _err: &CommitError) {}
fn commit_cursors_attempt_failed(&self, _n_cursors: usize, _time: Duration) {}
// --- Stream configuration ---
fn stream_parameters(&self, _params: &StreamParameters) {}
}
impl<T> crate::tools::subscription_stats::Instruments for T
where
T: Instruments,
{
fn unconsumed_events(&self, n_unconsumed: usize) {
self.stream_unconsumed_events(n_unconsumed);
}
}
/// How much detail an `Instrumentation` forwards to its backend.
///
/// The derived ordering follows declaration order (`Low < Medium < High`);
/// `Instrumentation` compares its configured level against these variants to
/// decide whether a detail-gated callback is forwarded.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum MetricsDetailLevel {
/// Smallest level: callbacks gated on `Medium` or `High` are skipped.
Low,
/// The level returned by `Default`.
Medium,
/// Largest level: no callback is skipped by the detail check.
High,
}
impl MetricsDetailLevel {
// Expands the crate's `env_funs!` macro for the name "METRICS_DETAIL_LEVEL".
// NOTE(review): the macro is defined elsewhere; presumably it generates
// helpers that read this setting from an environment variable and parse it
// through `FromStr` — confirm against the macro definition.
env_funs!("METRICS_DETAIL_LEVEL");
}
impl Default for MetricsDetailLevel {
fn default() -> Self {
MetricsDetailLevel::Medium
}
}
impl FromStr for MetricsDetailLevel {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_ref() {
"low" => Ok(MetricsDetailLevel::Low),
"medium" => Ok(MetricsDetailLevel::Medium),
"high" => Ok(MetricsDetailLevel::High),
s => Err(Error::new(format!(
"{} is not a valid MetricsDetailLevel",
s
))),
}
}
}
/// Cloneable handle that implements `Instruments` by forwarding each callback
/// to the selected backend; some callbacks are skipped depending on `detail`.
#[derive(Clone)]
pub struct Instrumentation {
// Which backend (if any) receives the forwarded callbacks.
instr: InstrumentationSelection,
// Level consulted by the detail-gated callbacks of the `Instruments` impl.
detail: MetricsDetailLevel,
}
impl Instrumentation {
    /// Creates an `Instrumentation` without a backend.
    ///
    /// The detail level is set to `MetricsDetailLevel::default()`.
    pub fn off() -> Self {
        Self {
            instr: InstrumentationSelection::Off,
            detail: MetricsDetailLevel::default(),
        }
    }

    /// Wraps a user-supplied [`Instruments`] implementation in an `Arc` and
    /// uses it as the backend, filtered by `detail`.
    pub fn new<I>(instruments: I, detail: MetricsDetailLevel) -> Self
    where
        I: Instruments + Send + Sync + 'static,
    {
        let shared: Arc<dyn Instruments + Send + Sync> = Arc::new(instruments);
        Self {
            instr: InstrumentationSelection::Custom(shared),
            detail,
        }
    }

    /// Uses an already constructed [`Metrix`] as the backend.
    #[cfg(feature = "metrix")]
    pub fn metrix(metrix: Metrix, detail: MetricsDetailLevel) -> Self {
        Self {
            instr: InstrumentationSelection::Metrix(metrix),
            detail,
        }
    }

    /// Builds a [`Metrix`] via `Metrix::new(config, processor)` and uses it as
    /// the backend.
    #[cfg(feature = "metrix")]
    pub fn metrix_mounted<A: AggregatesProcessors>(
        config: &MetrixConfig,
        detail: MetricsDetailLevel,
        processor: &mut A,
    ) -> Self {
        Self::metrix(Metrix::new(config, processor), detail)
    }

    /// Builds a [`Metrix`] via `Metrix::new_mountable(config, mountable_name)`
    /// and returns it as the backend together with the [`ProcessorMount`]
    /// that call produced.
    #[cfg(feature = "metrix")]
    pub fn metrix_mountable(
        config: &MetrixConfig,
        detail: MetricsDetailLevel,
        mountable_name: Option<&str>,
    ) -> (Self, ProcessorMount) {
        let (backend, processor_mount) = Metrix::new_mountable(config, mountable_name);
        let instrumentation = Self::metrix(backend, detail);
        (instrumentation, processor_mount)
    }
}
impl Default for Instrumentation {
fn default() -> Self {
Instrumentation::off()
}
}
impl fmt::Debug for Instrumentation {
    /// Writes only the type name; neither the backend nor the detail level is
    /// printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Instrumentation")
    }
}
impl Instruments for Instrumentation {
fn consumer_started(&self) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.consumer_started(),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.consumer_started(),
}
}
}
fn consumer_stopped(&self, ran_for: Duration) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.consumer_stopped(ran_for),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.consumer_stopped(ran_for),
}
}
}
fn streaming_ended(&self, streamed_for: Duration) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.streaming_ended(streamed_for),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.streaming_ended(streamed_for),
}
}
}
fn stream_connect_attempt_success(&self, time: Duration) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.stream_connect_attempt_success(time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.stream_connect_attempt_success(time)
}
}
}
}
fn stream_connect_attempt_failed(&self, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.stream_connect_attempt_failed(time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.stream_connect_attempt_failed(time)
}
}
}
fn stream_connected(&self, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_connected(time),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_connected(time),
}
}
fn stream_not_connected(&self, time: Duration, err: &ConnectError) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_not_connected(time, err),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_not_connected(time, err),
}
}
fn stream_chunk_received(&self, n_bytes: usize) {
if self.detail == MetricsDetailLevel::High {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_chunk_received(n_bytes),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_chunk_received(n_bytes),
}
}
}
fn stream_frame_completed(&self, n_bytes: usize, time: Duration) {
if self.detail == MetricsDetailLevel::High {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.stream_frame_completed(n_bytes, time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.stream_frame_completed(n_bytes, time)
}
}
}
}
fn stream_tick_emitted(&self) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_tick_emitted(),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_tick_emitted(),
}
}
}
fn info_frame_received(&self, frame_started_at: Instant, frame_completed_at: Instant) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.info_frame_received(frame_started_at, frame_completed_at)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.info_frame_received(frame_started_at, frame_completed_at)
}
}
}
fn keep_alive_frame_received(&self, frame_started_at: Instant, frame_completed_at: Instant) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.keep_alive_frame_received(frame_started_at, frame_completed_at)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.keep_alive_frame_received(frame_started_at, frame_completed_at)
}
}
}
fn batch_frame_received(
&self,
frame_started_at: Instant,
frame_completed_at: Instant,
events_bytes: usize,
) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.batch_frame_received(frame_started_at, frame_completed_at, events_bytes)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.batch_frame_received(frame_started_at, frame_completed_at, events_bytes)
}
}
}
fn batch_frame_gap(&self, gap: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batch_frame_gap(gap),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batch_frame_gap(gap),
}
}
fn no_frames_warning(&self, no_frames_for: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.no_frames_warning(no_frames_for),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.no_frames_warning(no_frames_for),
}
}
fn no_events_warning(&self, no_events_for: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.no_events_warning(no_events_for),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.no_events_warning(no_events_for),
}
}
fn stream_dead(&self, after: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_dead(after),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_dead(after),
}
}
fn stream_error(&self, err: &EventStreamError) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_error(err),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_error(err),
}
}
fn stream_unconsumed_events(&self, n_unconsumed: usize) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.stream_unconsumed_events(n_unconsumed)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.stream_unconsumed_events(n_unconsumed)
}
}
}
fn batches_in_flight_incoming(&self, stats: &EventStreamBatchStats) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batches_in_flight_incoming(stats),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batches_in_flight_incoming(stats),
}
}
fn batches_in_flight_processed(&self, stats: &EventStreamBatchStats) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batches_in_flight_processed(stats),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batches_in_flight_processed(stats),
}
}
fn in_flight_stats_reset(&self) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.in_flight_stats_reset(),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.in_flight_stats_reset(),
}
}
fn event_type_partition_activated(&self) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.event_type_partition_activated(),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.event_type_partition_activated(),
}
}
fn event_type_partition_deactivated(&self, active_for: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.event_type_partition_deactivated(active_for)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.event_type_partition_deactivated(active_for)
}
}
}
fn batch_processing_started(&self, frame_started_at: Instant, frame_completed_at: Instant) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.batch_processing_started(frame_started_at, frame_completed_at)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.batch_processing_started(frame_started_at, frame_completed_at)
}
}
}
fn batch_processed(&self, n_bytes: usize, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batch_processed(n_bytes, time),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batch_processed(n_bytes, time),
}
}
fn batch_processed_n_events(&self, n_events: usize) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batch_processed_n_events(n_events),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batch_processed_n_events(n_events),
}
}
fn batch_deserialized(&self, n_bytes: usize, time: Duration) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.batch_deserialized(n_bytes, time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.batch_deserialized(n_bytes, time)
}
}
}
}
fn cursor_to_commit_received(&self, frame_started_at: Instant, frame_completed_at: Instant) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.cursor_to_commit_received(frame_started_at, frame_completed_at)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.cursor_to_commit_received(frame_started_at, frame_completed_at)
}
}
}
}
fn cursors_commit_triggered(&self, trigger: CommitTrigger) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.cursors_commit_triggered(trigger),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.cursors_commit_triggered(trigger),
}
}
fn cursor_ages_on_commit_attempt(
&self,
first_cursor_age: Duration,
last_cursor_age: Duration,
first_cursor_age_warning: bool,
) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.cursor_ages_on_commit_attempt(
first_cursor_age,
last_cursor_age,
first_cursor_age_warning,
),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.cursor_ages_on_commit_attempt(
first_cursor_age,
last_cursor_age,
first_cursor_age_warning,
),
}
}
fn cursors_committed(&self, n_cursors: usize, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.cursors_committed(n_cursors, time),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.cursors_committed(n_cursors, time),
}
}
fn batches_committed(&self, n_batches: usize, n_events: usize) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.batches_committed(n_batches, n_events)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.batches_committed(n_batches, n_events)
}
}
}
fn cursors_not_committed(&self, n_cursors: usize, time: Duration, err: &CommitError) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.cursors_not_committed(n_cursors, time, err)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.cursors_not_committed(n_cursors, time, err)
}
}
}
fn commit_cursors_attempt_failed(&self, n_cursors: usize, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.commit_cursors_attempt_failed(n_cursors, time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.commit_cursors_attempt_failed(n_cursors, time)
}
}
}
fn stream_parameters(&self, params: &StreamParameters) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_parameters(params),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_parameters(params),
}
}
}
/// Backend that an `Instrumentation` forwards its callbacks to.
#[derive(Clone)]
enum InstrumentationSelection {
/// No backend: forwarded callbacks do nothing.
Off,
/// User-supplied `Instruments` implementation, shared behind an `Arc`.
Custom(Arc<dyn Instruments + Send + Sync>),
/// The `Metrix` backend; this variant only exists with the `metrix` feature.
#[cfg(feature = "metrix")]
Metrix(Metrix),
}