use crate::{
data::{Sample, ScopedKey},
helper::io_error,
receiver::MessageFrame,
scopes::Scopes,
};
use crossbeam_channel::Sender;
use quanta::Clock;
use std::{fmt::Display, hash::Hash, sync::Arc};
#[derive(Debug)]
pub enum SinkError {
InvalidScope,
}
pub trait AsScoped<'a> {
fn as_scoped(&'a self, base: String) -> String;
}
pub struct Sink<T: Clone + Eq + Hash + Display> {
msg_tx: Sender<MessageFrame<ScopedKey<T>>>,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
scope_id: u64,
}
impl<T: Clone + Eq + Hash + Display> Sink<T> {
pub(crate) fn new(
msg_tx: Sender<MessageFrame<ScopedKey<T>>>, clock: Clock, scopes: Arc<Scopes>, scope: String,
) -> Sink<T> {
let scope_id = scopes.register(scope.clone());
Sink {
msg_tx,
clock,
scopes,
scope,
scope_id,
}
}
pub(crate) fn new_with_scope_id(
msg_tx: Sender<MessageFrame<ScopedKey<T>>>, clock: Clock, scopes: Arc<Scopes>, scope: String, scope_id: u64,
) -> Sink<T> {
Sink {
msg_tx,
clock,
scopes,
scope,
scope_id,
}
}
pub fn scoped<'a, S: AsScoped<'a> + ?Sized>(&self, scope: &'a S) -> Sink<T> {
let new_scope = scope.as_scoped(self.scope.clone());
Sink::new(self.msg_tx.clone(), self.clock.clone(), self.scopes.clone(), new_scope)
}
pub fn clock(&self) -> &Clock { &self.clock }
pub fn update_count(&self, key: T, delta: i64) { self.send(Sample::Count(key, delta)) }
pub fn update_gauge(&self, key: T, value: u64) { self.send(Sample::Gauge(key, value)) }
pub fn update_timing(&self, key: T, start: u64, end: u64) { self.send(Sample::TimingHistogram(key, start, end, 1)) }
pub fn update_timing_with_count(&self, key: T, start: u64, end: u64, count: u64) {
self.send(Sample::TimingHistogram(key, start, end, count))
}
pub fn update_value(&self, key: T, value: u64) { self.send(Sample::ValueHistogram(key, value)) }
pub fn increment(&self, key: T) { self.update_count(key, 1) }
pub fn decrement(&self, key: T) { self.update_count(key, -1) }
fn send(&self, sample: Sample<T>) {
let _ = self
.msg_tx
.send(MessageFrame::Data(sample.into_scoped(self.scope_id)))
.map_err(|_| io_error("failed to send sample"));
}
}
impl<T: Clone + Eq + Hash + Display> Clone for Sink<T> {
fn clone(&self) -> Sink<T> {
Sink {
msg_tx: self.msg_tx.clone(),
clock: self.clock.clone(),
scopes: self.scopes.clone(),
scope: self.scope.clone(),
scope_id: self.scope_id,
}
}
}
impl<'a> AsScoped<'a> for str {
fn as_scoped(&'a self, mut base: String) -> String {
if !base.is_empty() {
base.push_str(".");
}
base.push_str(self);
base
}
}
impl<'a, 'b, T> AsScoped<'a> for T
where
&'a T: AsRef<[&'b str]>,
T: 'a,
{
fn as_scoped(&'a self, mut base: String) -> String {
for item in self.as_ref() {
if !base.is_empty() {
base.push('.');
}
base.push_str(item);
}
base
}
}