use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::nakadi_types::Error;
use crate::components::{
committer::{CommitError, CommitTrigger},
connector::ConnectError,
streams::EventStreamError,
};
#[cfg(feature = "metrix")]
mod metrix_impl;
#[cfg(feature = "metrix")]
pub use self::metrix_impl::{Metrix, MetrixConfig, MetrixGaugeTrackingSecs};
#[cfg(feature = "metrix")]
pub use metrix::{
driver::{DriverBuilder, TelemetryDriver},
processor::ProcessorMount,
AggregatesProcessors,
};
pub trait Instruments {
fn stream_connect_attempt_success(&self, time: Duration);
fn stream_connect_attempt_failed(&self, time: Duration);
fn stream_connected(&self, time: Duration);
fn stream_not_connected(&self, time: Duration, err: &ConnectError);
fn stream_chunk_received(&self, n_bytes: usize);
fn stream_frame_completed(&self, n_bytes: usize, time: Duration);
fn stream_tick_emitted(&self);
fn info_frame_received(&self, frame_started_at: Instant, frame_completed_at: Instant);
fn keep_alive_frame_received(&self, frame_started_at: Instant, frame_completed_at: Instant);
fn batch_frame_received(
&self,
frame_started_at: Instant,
frame_completed_at: Instant,
events_bytes: usize,
);
fn batch_frame_gap(&self, gap: Duration);
fn no_frames_warning(&self, no_frames_for: Duration);
fn no_events_warning(&self, no_events_for: Duration);
fn stream_dead(&self, after: Duration);
fn stream_error(&self, err: &EventStreamError);
fn batches_in_flight_inc(&self);
fn batches_in_flight_dec(&self);
fn batches_in_flight_dec_by(&self, by: usize);
fn event_type_partition_activated(&self);
fn event_type_partition_deactivated(&self, active_for: Duration);
fn batch_processing_started(&self, frame_started_at: Instant, frame_completed_at: Instant);
fn batch_processed(&self, n_bytes: usize, time: Duration);
fn batch_processed_n_events(&self, n_events: usize);
fn batch_deserialized(&self, n_bytes: usize, time: Duration);
fn cursor_to_commit_received(&self, frame_started_at: Instant, frame_completed_at: Instant);
fn cursors_commit_triggered(&self, trigger: CommitTrigger);
fn cursors_committed(&self, n_cursors: usize, time: Duration);
fn cursors_not_committed(&self, n_cursors: usize, time: Duration, err: &CommitError);
fn commit_cursors_attempt_failed(&self, n_cursors: usize, time: Duration);
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum MetricsDetailLevel {
Low,
Medium,
High,
}
impl MetricsDetailLevel {
env_funs!("METRICS_DETAIL_LEVEL");
}
impl Default for MetricsDetailLevel {
fn default() -> Self {
MetricsDetailLevel::Medium
}
}
impl FromStr for MetricsDetailLevel {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_ref() {
"low" => Ok(MetricsDetailLevel::Low),
"medium" => Ok(MetricsDetailLevel::Medium),
"high" => Ok(MetricsDetailLevel::High),
s => Err(Error::new(format!(
"{} is not a valid MetricsDetailLevel",
s
))),
}
}
}
#[derive(Clone)]
pub struct Instrumentation {
instr: InstrumentationSelection,
detail: MetricsDetailLevel,
}
impl Instrumentation {
pub fn off() -> Self {
{
Instrumentation {
instr: InstrumentationSelection::Off,
detail: MetricsDetailLevel::default(),
}
}
}
pub fn new<I>(instruments: I, detail: MetricsDetailLevel) -> Self
where
I: Instruments + Send + Sync + 'static,
{
Instrumentation {
instr: InstrumentationSelection::Custom(Arc::new(instruments)),
detail,
}
}
#[cfg(feature = "metrix")]
pub fn metrix(metrix: Metrix, detail: MetricsDetailLevel) -> Self {
Instrumentation {
instr: InstrumentationSelection::Metrix(metrix),
detail,
}
}
#[cfg(feature = "metrix")]
pub fn metrix_mounted<A: AggregatesProcessors>(
config: &MetrixConfig,
detail: MetricsDetailLevel,
processor: &mut A,
) -> Self {
let metrix = Metrix::new(config, processor);
Self::metrix(metrix, detail)
}
#[cfg(feature = "metrix")]
pub fn metrix_mountable(
config: &MetrixConfig,
detail: MetricsDetailLevel,
mountable_name: Option<&str>,
) -> (Self, ProcessorMount) {
let (metrix, mount) = Metrix::new_mountable(config, mountable_name);
(Self::metrix(metrix, detail), mount)
}
}
impl Default for Instrumentation {
fn default() -> Self {
Instrumentation::off()
}
}
impl fmt::Debug for Instrumentation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Instrumentation")?;
Ok(())
}
}
impl Instruments for Instrumentation {
fn stream_connect_attempt_success(&self, time: Duration) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.stream_connect_attempt_success(time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.stream_connect_attempt_success(time)
}
}
}
}
fn stream_connect_attempt_failed(&self, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.stream_connect_attempt_failed(time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.stream_connect_attempt_failed(time)
}
}
}
fn stream_connected(&self, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_connected(time),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_connected(time),
}
}
fn stream_not_connected(&self, time: Duration, err: &ConnectError) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_not_connected(time, err),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_not_connected(time, err),
}
}
fn stream_chunk_received(&self, n_bytes: usize) {
if self.detail == MetricsDetailLevel::High {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_chunk_received(n_bytes),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_chunk_received(n_bytes),
}
}
}
fn stream_frame_completed(&self, n_bytes: usize, time: Duration) {
if self.detail == MetricsDetailLevel::High {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.stream_frame_completed(n_bytes, time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.stream_frame_completed(n_bytes, time)
}
}
}
}
fn stream_tick_emitted(&self) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_tick_emitted(),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_tick_emitted(),
}
}
}
fn info_frame_received(&self, frame_started_at: Instant, frame_completed_at: Instant) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.info_frame_received(frame_started_at, frame_completed_at)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.info_frame_received(frame_started_at, frame_completed_at)
}
}
}
fn keep_alive_frame_received(&self, frame_started_at: Instant, frame_completed_at: Instant) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.keep_alive_frame_received(frame_started_at, frame_completed_at)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.keep_alive_frame_received(frame_started_at, frame_completed_at)
}
}
}
fn batch_frame_received(
&self,
frame_started_at: Instant,
frame_completed_at: Instant,
events_bytes: usize,
) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.batch_frame_received(frame_started_at, frame_completed_at, events_bytes)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.batch_frame_received(frame_started_at, frame_completed_at, events_bytes)
}
}
}
fn batch_frame_gap(&self, gap: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batch_frame_gap(gap),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batch_frame_gap(gap),
}
}
fn no_frames_warning(&self, no_frames_for: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.no_frames_warning(no_frames_for),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.no_frames_warning(no_frames_for),
}
}
fn no_events_warning(&self, no_events_for: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.no_events_warning(no_events_for),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.no_events_warning(no_events_for),
}
}
fn stream_dead(&self, after: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_dead(after),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_dead(after),
}
}
fn stream_error(&self, err: &EventStreamError) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.stream_error(err),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.stream_error(err),
}
}
fn batches_in_flight_inc(&self) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batches_in_flight_inc(),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batches_in_flight_inc(),
}
}
fn batches_in_flight_dec(&self) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batches_in_flight_dec(),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batches_in_flight_dec(),
}
}
fn batches_in_flight_dec_by(&self, by: usize) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batches_in_flight_dec_by(by),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batches_in_flight_dec_by(by),
}
}
fn event_type_partition_activated(&self) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.event_type_partition_activated(),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.event_type_partition_activated(),
}
}
fn event_type_partition_deactivated(&self, active_for: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.event_type_partition_deactivated(active_for)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.event_type_partition_deactivated(active_for)
}
}
}
fn batch_processing_started(&self, frame_started_at: Instant, frame_completed_at: Instant) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.batch_processing_started(frame_started_at, frame_completed_at)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.batch_processing_started(frame_started_at, frame_completed_at)
}
}
}
fn batch_processed(&self, n_bytes: usize, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batch_processed(n_bytes, time),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batch_processed(n_bytes, time),
}
}
fn batch_processed_n_events(&self, n_events: usize) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.batch_processed_n_events(n_events),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.batch_processed_n_events(n_events),
}
}
fn batch_deserialized(&self, n_bytes: usize, time: Duration) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.batch_deserialized(n_bytes, time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.batch_deserialized(n_bytes, time)
}
}
}
}
fn cursor_to_commit_received(&self, frame_started_at: Instant, frame_completed_at: Instant) {
if self.detail >= MetricsDetailLevel::Medium {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.cursor_to_commit_received(frame_started_at, frame_completed_at)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.cursor_to_commit_received(frame_started_at, frame_completed_at)
}
}
}
}
fn cursors_commit_triggered(&self, trigger: CommitTrigger) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.cursors_commit_triggered(trigger),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.cursors_commit_triggered(trigger),
}
}
fn cursors_committed(&self, n_cursors: usize, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => instr.cursors_committed(n_cursors, time),
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => instr.cursors_committed(n_cursors, time),
}
}
fn cursors_not_committed(&self, n_cursors: usize, time: Duration, err: &CommitError) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.cursors_not_committed(n_cursors, time, err)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.cursors_not_committed(n_cursors, time, err)
}
}
}
fn commit_cursors_attempt_failed(&self, n_cursors: usize, time: Duration) {
match self.instr {
InstrumentationSelection::Off => {}
InstrumentationSelection::Custom(ref instr) => {
instr.commit_cursors_attempt_failed(n_cursors, time)
}
#[cfg(feature = "metrix")]
InstrumentationSelection::Metrix(ref instr) => {
instr.commit_cursors_attempt_failed(n_cursors, time)
}
}
}
}
#[derive(Clone)]
enum InstrumentationSelection {
Off,
Custom(Arc<dyn Instruments + Send + Sync>),
#[cfg(feature = "metrix")]
Metrix(Metrix),
}