use std::time::Duration;
use log::info;
pub use crate::pool::PoolStats;
#[cfg(feature = "metrix")]
pub use self::metrix::MetrixConfig;
pub type NoInstrumentation = ();
pub trait Instrumentation {
fn checked_out_connection(&self);
fn checked_in_returned_connection(&self, flight_time: Duration);
fn checked_in_new_connection(&self);
fn connection_dropped(&self, flight_time: Duration, lifetime: Duration);
fn connection_created(&self, connected_after: Duration, total_time: Duration);
fn connection_killed(&self, lifetime: Duration);
fn reservation_added(&self);
fn reservation_fulfilled(&self, after: Duration);
fn reservation_not_fulfilled(&self, after: Duration);
fn reservation_limit_reached(&self);
fn connection_factory_failed(&self);
fn stats(&self, stats: PoolStats);
}
impl Instrumentation for NoInstrumentation {
fn checked_out_connection(&self) {}
fn checked_in_returned_connection(&self, _flight_time: Duration) {}
fn checked_in_new_connection(&self) {}
fn connection_dropped(&self, _flight_time: Duration, _lifetime: Duration) {}
fn connection_created(&self, _connected_after: Duration, _total_time: Duration) {}
fn connection_killed(&self, _lifetime: Duration) {}
fn reservation_added(&self) {}
fn reservation_fulfilled(&self, _after: Duration) {}
fn reservation_not_fulfilled(&self, _after: Duration) {}
fn reservation_limit_reached(&self) {}
fn connection_factory_failed(&self) {}
fn stats(&self, _stats: PoolStats) {}
}
pub struct StatsLogger;
impl Instrumentation for StatsLogger {
fn checked_out_connection(&self) {}
fn checked_in_returned_connection(&self, _flight_time: Duration) {}
fn checked_in_new_connection(&self) {}
fn connection_dropped(&self, _flight_time: Duration, _lifetime: Duration) {}
fn connection_created(&self, _connected_after: Duration, _total_time: Duration) {}
fn connection_killed(&self, _lifetime: Duration) {}
fn reservation_added(&self) {}
fn reservation_fulfilled(&self, _after: Duration) {}
fn reservation_not_fulfilled(&self, _after: Duration) {}
fn reservation_limit_reached(&self) {}
fn connection_factory_failed(&self) {}
fn stats(&self, stats: PoolStats) {
info!("{:#?}", stats);
}
}
#[cfg(feature = "metrix")]
pub(crate) mod metrix {
use std::time::Duration;
use metrix::cockpit::Cockpit;
use metrix::instruments::switches::StaircaseTimer;
use metrix::instruments::*;
use metrix::processor::{AggregatesProcessors, TelemetryProcessor};
use metrix::{TelemetryTransmitter, TransmitsTelemetryData};
use super::Instrumentation;
use crate::pool::PoolStats;
pub struct MetrixConfig {
pub inactivity_limit: Option<Duration>,
pub reset_histograms_after_inactivity: bool,
pub track_extrema_in_gauges: Option<Duration>,
pub alert_duration: Duration,
}
impl MetrixConfig {
pub fn inactivity_limit(mut self, v: Duration) -> Self {
self.inactivity_limit = Some(v);
self
}
pub fn reset_histograms_after_inactivity(mut self, v: bool) -> Self {
self.reset_histograms_after_inactivity = v;
self
}
pub fn track_extrema_in_gauges(mut self, v: Duration) -> Self {
self.track_extrema_in_gauges = Some(v);
self
}
pub fn alert_duration(mut self, v: Duration) -> Self {
self.alert_duration = v;
self
}
fn configure_gauge(&self, gauge: &mut Gauge) {
if let Some(ext_dur) = self.track_extrema_in_gauges {
gauge.set_memorize_extrema(ext_dur);
}
}
fn configure_histogram(&self, histogram: &mut Histogram) {
if let Some(inactivity_limit) = self.inactivity_limit {
histogram.set_inactivity_limit(inactivity_limit);
histogram.reset_after_inactivity(self.reset_histograms_after_inactivity);
}
}
fn add_alert<L>(&self, panel: &mut Panel<L>) {
let mut alert = StaircaseTimer::new_with_defaults("alert");
alert.set_switch_off_after(self.alert_duration);
panel.add_instrument(alert);
}
}
impl Default for MetrixConfig {
fn default() -> Self {
Self {
inactivity_limit: None,
reset_histograms_after_inactivity: false,
track_extrema_in_gauges: Some(Duration::from_secs(30)),
alert_duration: Duration::from_secs(60),
}
}
}
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum Metric {
CheckOutConnection,
CheckedInReturnedConnection,
CheckedInNewConnection,
ConnectionDropped,
ConnectionKilled,
ConnectionCreated,
ConnectionCreatedTotalTime,
ReservationAdded,
ReservationFulfilled,
ReservationNotFulfilled,
ReservationLimitReached,
ConnectionFactoryFailed,
LifeTime,
PoolSizeChangedMin,
PoolSizeChangedMax,
InFlightConnectionsChangedMin,
InFlightConnectionsChangedMax,
IdleConnectionsChangedMin,
IdleConnectionsChangedMax,
ReservationsChangedMin,
ReservationsChangedMax,
NodeCount,
}
pub fn create<A: AggregatesProcessors>(
aggregates_processors: &mut A,
config: MetrixConfig,
) -> MetrixInstrumentation {
let mut cockpit = Cockpit::without_name(None);
let mut panel = Panel::with_name(Metric::CheckOutConnection, "checked_out_connections");
panel.set_meter(Meter::new_with_defaults("per_second"));
cockpit.add_panel(panel);
let mut panel = Panel::with_name(
Metric::CheckedInReturnedConnection,
"checked_in_returned_connections",
);
panel.set_value_scaling(ValueScaling::NanosToMicros);
panel.set_meter(Meter::new_with_defaults("per_second"));
let mut histogram = Histogram::new_with_defaults("flight_time_us");
config.configure_histogram(&mut histogram);
panel.set_histogram(histogram);
cockpit.add_panel(panel);
let mut panel =
Panel::with_name(Metric::CheckedInNewConnection, "checked_in_new_connections");
panel.set_meter(Meter::new_with_defaults("per_second"));
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::ConnectionDropped, "connections_dropped");
panel.set_value_scaling(ValueScaling::NanosToMicros);
panel.set_meter(Meter::new_with_defaults("per_second"));
let mut histogram = Histogram::new_with_defaults("flight_time_us");
config.configure_histogram(&mut histogram);
panel.set_histogram(histogram);
config.add_alert(&mut panel);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::ConnectionKilled, "connections_killed");
panel.set_meter(Meter::new_with_defaults("per_second"));
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::ConnectionCreated, "connections_created");
panel.set_value_scaling(ValueScaling::NanosToMicros);
panel.set_meter(Meter::new_with_defaults("per_second"));
let mut histogram = Histogram::new_with_defaults("connect_time_us");
config.configure_histogram(&mut histogram);
panel.set_histogram(histogram);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(
Metric::ConnectionCreatedTotalTime,
"connections_created_total",
);
panel.set_value_scaling(ValueScaling::NanosToMillis);
let mut histogram = Histogram::new_with_defaults("time_ms");
config.configure_histogram(&mut histogram);
panel.set_histogram(histogram);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::ReservationAdded, "reservations_added");
panel.set_meter(Meter::new_with_defaults("per_second"));
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::ReservationFulfilled, "reservations_fulfilled");
panel.set_value_scaling(ValueScaling::NanosToMicros);
panel.set_meter(Meter::new_with_defaults("per_second"));
let mut histogram = Histogram::new_with_defaults("fulfilled_after_us");
config.configure_histogram(&mut histogram);
panel.set_histogram(histogram);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(
Metric::ReservationNotFulfilled,
"reservations_not_fulfilled",
);
panel.set_value_scaling(ValueScaling::NanosToMicros);
panel.set_meter(Meter::new_with_defaults("per_second"));
let mut histogram = Histogram::new_with_defaults("not_fulfilled_after_us");
config.configure_histogram(&mut histogram);
panel.set_histogram(histogram);
cockpit.add_panel(panel);
let mut panel =
Panel::with_name(Metric::ReservationLimitReached, "reservation_limit_reached");
panel.set_meter(Meter::new_with_defaults("per_second"));
config.add_alert(&mut panel);
cockpit.add_panel(panel);
let mut panel =
Panel::with_name(Metric::ConnectionFactoryFailed, "connection_factory_failed");
panel.set_meter(Meter::new_with_defaults("per_second"));
config.add_alert(&mut panel);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::LifeTime, "life_times");
panel.set_value_scaling(ValueScaling::NanosToMillis);
panel.set_meter(Meter::new_with_defaults("lifes_ended_per_second"));
panel.set_histogram(Histogram::new_with_defaults("life_time_ms"));
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::PoolSizeChangedMin, "pool_size_min");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::PoolSizeChangedMax, "pool_size_max");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::IdleConnectionsChangedMin, "idle_min");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::IdleConnectionsChangedMax, "idle_max");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::InFlightConnectionsChangedMin, "in_flight_min");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::InFlightConnectionsChangedMax, "in_fligh_max");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::ReservationsChangedMin, "reservations_min");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::ReservationsChangedMax, "reservations_max");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let mut panel = Panel::with_name(Metric::NodeCount, "nodes");
let mut gauge = Gauge::new_with_defaults("count");
config.configure_gauge(&mut gauge);
panel.set_gauge(gauge);
cockpit.add_panel(panel);
let (tx, mut rx) = TelemetryProcessor::new_pair_without_name();
rx.add_cockpit(cockpit);
if let Some(inactivity_limit) = config.inactivity_limit {
rx.set_inactivity_limit(inactivity_limit)
}
aggregates_processors.add_processor(rx);
MetrixInstrumentation::new(tx)
}
#[derive(Clone)]
pub struct MetrixInstrumentation {
transmitter: TelemetryTransmitter<Metric>,
}
impl MetrixInstrumentation {
pub fn new(transmitter: TelemetryTransmitter<Metric>) -> Self {
Self { transmitter }
}
}
impl Instrumentation for MetrixInstrumentation {
fn checked_out_connection(&self) {
self.transmitter
.observed_one_now(Metric::CheckOutConnection);
}
fn checked_in_returned_connection(&self, flight_time: Duration) {
self.transmitter
.observed_one_duration_now(Metric::CheckedInReturnedConnection, flight_time);
}
fn checked_in_new_connection(&self) {
self.transmitter
.observed_one_now(Metric::CheckedInNewConnection);
}
fn connection_dropped(&self, flight_time: Duration, lifetime: Duration) {
self.transmitter
.observed_one_duration_now(Metric::ConnectionDropped, flight_time)
.observed_one_duration_now(Metric::LifeTime, lifetime);
}
fn connection_created(&self, connected_after: Duration, total_time: Duration) {
self.transmitter
.observed_one_duration_now(Metric::ConnectionCreated, connected_after)
.observed_one_duration_now(Metric::ConnectionCreatedTotalTime, total_time);
}
fn connection_killed(&self, lifetime: Duration) {
self.transmitter
.observed_one_now(Metric::ConnectionKilled)
.observed_one_duration_now(Metric::LifeTime, lifetime);
}
fn reservation_added(&self) {
self.transmitter.observed_one_now(Metric::ReservationAdded);
}
fn reservation_fulfilled(&self, after: Duration) {
self.transmitter
.observed_one_duration_now(Metric::ReservationFulfilled, after);
}
fn reservation_not_fulfilled(&self, after: Duration) {
self.transmitter
.observed_one_duration_now(Metric::ReservationNotFulfilled, after);
}
fn reservation_limit_reached(&self) {
self.transmitter
.observed_one_now(Metric::ReservationLimitReached);
}
fn connection_factory_failed(&self) {
self.transmitter
.observed_one_now(Metric::ConnectionFactoryFailed);
}
fn stats(&self, stats: PoolStats) {
self.transmitter
.observed_one_value_now(Metric::PoolSizeChangedMin, stats.pool_size.min() as u64)
.observed_one_value_now(Metric::PoolSizeChangedMax, stats.pool_size.max() as u64)
.observed_one_value_now(Metric::NodeCount, stats.node_count as u64)
.observed_one_value_now(
Metric::InFlightConnectionsChangedMin,
stats.in_flight.min() as u64,
)
.observed_one_value_now(
Metric::InFlightConnectionsChangedMax,
stats.in_flight.max() as u64,
)
.observed_one_value_now(Metric::IdleConnectionsChangedMin, stats.idle.min() as u64)
.observed_one_value_now(Metric::IdleConnectionsChangedMax, stats.idle.max() as u64)
.observed_one_value_now(
Metric::ReservationsChangedMin,
stats.reservations.min() as u64,
)
.observed_one_value_now(
Metric::ReservationsChangedMax,
stats.reservations.max() as u64,
);
}
}
}