use crate::{Context, Publication};
use std::time::SystemTime;
fn rate_per_sec(count: u64, interval_secs: f64) -> f64 {
if interval_secs > 0.0 {
count as f64 / interval_secs
} else {
0.0
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicationMetricsSnapshot {
pub send_calls: u64,
pub packets_sent: u64,
pub bytes_sent: u64,
pub send_errors: u64,
pub captured_at: SystemTime,
}
#[derive(Debug, Clone, PartialEq)]
pub struct PublicationMetricsDelta {
pub interval_secs: f64,
pub send_calls: u64,
pub packets_sent: u64,
pub bytes_sent: u64,
pub send_errors: u64,
}
impl PublicationMetricsSnapshot {
pub fn delta_since(&self, earlier: &Self) -> Option<PublicationMetricsDelta> {
let duration = self.captured_at.duration_since(earlier.captured_at).ok()?;
let interval_secs = duration.as_secs_f64();
Some(PublicationMetricsDelta {
interval_secs,
send_calls: self.send_calls.checked_sub(earlier.send_calls)?,
packets_sent: self.packets_sent.checked_sub(earlier.packets_sent)?,
bytes_sent: self.bytes_sent.checked_sub(earlier.bytes_sent)?,
send_errors: self.send_errors.checked_sub(earlier.send_errors)?,
})
}
}
impl PublicationMetricsDelta {
pub fn send_calls_per_sec(&self) -> f64 {
rate_per_sec(self.send_calls, self.interval_secs)
}
pub fn packets_per_sec(&self) -> f64 {
rate_per_sec(self.packets_sent, self.interval_secs)
}
pub fn bytes_per_sec(&self) -> f64 {
rate_per_sec(self.bytes_sent, self.interval_secs)
}
pub fn send_errors_per_sec(&self) -> f64 {
rate_per_sec(self.send_errors, self.interval_secs)
}
}
#[derive(Debug, Clone)]
pub struct PublicationMetricsSampler<'a> {
publication: &'a Publication,
previous: Option<PublicationMetricsSnapshot>,
}
impl<'a> PublicationMetricsSampler<'a> {
pub fn new(publication: &'a Publication) -> Self {
Self {
publication,
previous: None,
}
}
pub fn snapshot(&self) -> PublicationMetricsSnapshot {
self.publication.metrics_snapshot()
}
pub fn sample(&mut self) -> Option<PublicationMetricsDelta> {
let current = self.snapshot();
self.sample_snapshot(current)
}
pub fn sample_snapshot(
&mut self,
current: PublicationMetricsSnapshot,
) -> Option<PublicationMetricsDelta> {
let delta = match &self.previous {
Some(previous) => current.delta_since(previous),
None => None,
};
self.previous = Some(current);
delta
}
pub fn reset(&mut self) {
self.previous = None;
}
pub fn previous(&self) -> Option<&PublicationMetricsSnapshot> {
self.previous.as_ref()
}
pub fn delta(&mut self) -> Option<PublicationMetricsDelta> {
self.sample()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContextMetricsSnapshot {
pub publications_added: u64,
pub publications_removed: u64,
pub active_publications: usize,
pub total_send_calls: u64,
pub total_packets_sent: u64,
pub total_bytes_sent: u64,
pub total_send_errors: u64,
pub captured_at: SystemTime,
}
#[derive(Debug, Clone, PartialEq)]
pub struct ContextMetricsDelta {
pub interval_secs: f64,
pub publications_added: u64,
pub publications_removed: u64,
pub send_calls: u64,
pub packets_sent: u64,
pub bytes_sent: u64,
pub send_errors: u64,
}
impl ContextMetricsSnapshot {
pub fn delta_since(&self, earlier: &Self) -> Option<ContextMetricsDelta> {
let duration = self.captured_at.duration_since(earlier.captured_at).ok()?;
let interval_secs = duration.as_secs_f64();
Some(ContextMetricsDelta {
interval_secs,
publications_added: self
.publications_added
.checked_sub(earlier.publications_added)?,
publications_removed: self
.publications_removed
.checked_sub(earlier.publications_removed)?,
send_calls: self
.total_send_calls
.checked_sub(earlier.total_send_calls)?,
packets_sent: self
.total_packets_sent
.checked_sub(earlier.total_packets_sent)?,
bytes_sent: self
.total_bytes_sent
.checked_sub(earlier.total_bytes_sent)?,
send_errors: self
.total_send_errors
.checked_sub(earlier.total_send_errors)?,
})
}
}
impl ContextMetricsDelta {
pub fn send_calls_per_sec(&self) -> f64 {
rate_per_sec(self.send_calls, self.interval_secs)
}
pub fn packets_per_sec(&self) -> f64 {
rate_per_sec(self.packets_sent, self.interval_secs)
}
pub fn bytes_per_sec(&self) -> f64 {
rate_per_sec(self.bytes_sent, self.interval_secs)
}
pub fn send_errors_per_sec(&self) -> f64 {
rate_per_sec(self.send_errors, self.interval_secs)
}
}
#[derive(Debug, Clone)]
pub struct ContextMetricsSampler<'a> {
context: &'a Context,
previous: Option<ContextMetricsSnapshot>,
}
impl<'a> ContextMetricsSampler<'a> {
pub fn new(context: &'a Context) -> Self {
Self {
context,
previous: None,
}
}
pub fn snapshot(&self) -> ContextMetricsSnapshot {
self.context.metrics_snapshot()
}
pub fn sample(&mut self) -> Option<ContextMetricsDelta> {
let current = self.snapshot();
self.sample_snapshot(current)
}
pub fn sample_snapshot(
&mut self,
current: ContextMetricsSnapshot,
) -> Option<ContextMetricsDelta> {
let delta = match &self.previous {
Some(previous) => current.delta_since(previous),
None => None,
};
self.previous = Some(current);
delta
}
pub fn reset(&mut self) {
self.previous = None;
}
pub fn previous(&self) -> Option<&ContextMetricsSnapshot> {
self.previous.as_ref()
}
pub fn delta(&mut self) -> Option<ContextMetricsDelta> {
self.sample()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
fn context_snapshot(
publications_added: u64,
publications_removed: u64,
active_publications: usize,
total_send_calls: u64,
total_packets_sent: u64,
total_bytes_sent: u64,
total_send_errors: u64,
captured_at: SystemTime,
) -> ContextMetricsSnapshot {
ContextMetricsSnapshot {
publications_added,
publications_removed,
active_publications,
total_send_calls,
total_packets_sent,
total_bytes_sent,
total_send_errors,
captured_at,
}
}
fn publication_snapshot(
send_calls: u64,
packets_sent: u64,
bytes_sent: u64,
send_errors: u64,
captured_at: SystemTime,
) -> PublicationMetricsSnapshot {
PublicationMetricsSnapshot {
send_calls,
packets_sent,
bytes_sent,
send_errors,
captured_at,
}
}
#[test]
fn context_delta_since_uses_lifetime_total_fields() {
let earlier = context_snapshot(1, 0, 1, 10, 8, 800, 2, SystemTime::UNIX_EPOCH);
let later = context_snapshot(
2,
1,
1,
14,
11,
1250,
3,
SystemTime::UNIX_EPOCH + Duration::from_secs(2),
);
let delta = later.delta_since(&earlier).unwrap();
assert_eq!(delta.interval_secs, 2.0);
assert_eq!(delta.publications_added, 1);
assert_eq!(delta.publications_removed, 1);
assert_eq!(delta.send_calls, 4);
assert_eq!(delta.packets_sent, 3);
assert_eq!(delta.bytes_sent, 450);
assert_eq!(delta.send_errors, 1);
assert_eq!(delta.packets_per_sec(), 1.5);
assert_eq!(delta.bytes_per_sec(), 225.0);
}
#[test]
fn publication_delta_since_uses_interval_and_rates() {
let earlier = publication_snapshot(4, 3, 300, 1, SystemTime::UNIX_EPOCH);
let later = publication_snapshot(
7,
5,
620,
2,
SystemTime::UNIX_EPOCH + Duration::from_secs(4),
);
let delta = later.delta_since(&earlier).unwrap();
assert_eq!(delta.interval_secs, 4.0);
assert_eq!(delta.send_calls, 3);
assert_eq!(delta.packets_sent, 2);
assert_eq!(delta.bytes_sent, 320);
assert_eq!(delta.send_errors, 1);
assert_eq!(delta.send_calls_per_sec(), 0.75);
assert_eq!(delta.packets_per_sec(), 0.5);
assert_eq!(delta.bytes_per_sec(), 80.0);
assert_eq!(delta.send_errors_per_sec(), 0.25);
}
}