#[cfg(feature = "redis-store")]
use crate::utils::to_redis_parsing_error;
use crate::Dt;
use chrono::Utc;
#[cfg(feature = "redis-store")]
use redis::{self, FromRedisValue, ParsingError};
use serde::{
de::{self, Visitor},
Deserialize, Serialize,
};
use std::fmt;
use std::time::Duration;
use tokio_metrics::TaskMetrics;
use uuid::Uuid;
use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer};
use hdrhistogram::Histogram;
pub const HISTOGRAM_MAX_NS: u64 = 100_000_000_000;
pub const HISTOGRAM_SIGFIG: u8 = 2;
#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct WorkerMetrics {
pub worker_id: Uuid,
pub active_len: usize,
pub tasks: Vec<TaskInfo>,
pub last_updated: Dt,
pub ttl_ms: u64,
}
impl WorkerMetrics {
#[must_use]
pub fn new(worker_id: Uuid, active_len: usize, tasks: Vec<TaskInfo>, ttl: u64) -> Self {
use chrono::Utc;
let last_updated = Utc::now();
Self {
ttl_ms: ttl,
last_updated,
worker_id,
active_len,
tasks,
}
}
}
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskInfo {
pub task_id: u64,
pub job_id: u64,
pub metrics: TaskStats,
pub last_updated: Dt,
pub poll_histogram: HistogramWrapper,
}
impl TaskInfo {
#[must_use]
pub fn new(task_id: u64, job_id: u64, metrics: TaskMetrics, histogram: Histogram<u64>) -> Self {
let poll_histogram = HistogramWrapper(histogram);
Self {
task_id,
job_id,
metrics: TaskStats::from_metrics(metrics),
last_updated: Utc::now(),
poll_histogram,
}
}
#[allow(dead_code)]
fn update(&mut self, metrics: TaskMetrics) {
self.metrics = TaskStats::from_metrics(metrics);
self.last_updated = Utc::now();
}
}
#[allow(clippy::struct_field_names)]
#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskStats {
pub total_poll_count: u64,
pub total_slow_poll_count: u64,
pub total_poll_duration: Duration,
pub total_idle_duration: Duration,
pub total_scheduled_duration: Duration,
}
impl TaskStats {
const fn from_metrics(metrics: TaskMetrics) -> Self {
Self {
total_poll_count: metrics.total_poll_count,
total_slow_poll_count: metrics.total_slow_poll_count,
total_poll_duration: metrics.total_poll_duration,
total_idle_duration: metrics.total_idle_duration,
total_scheduled_duration: metrics.total_scheduled_duration,
}
}
}
#[cfg(feature = "redis-store")]
impl FromRedisValue for WorkerMetrics {
fn from_redis_value(v: redis::Value) -> Result<Self, ParsingError> {
use std::sync::Arc;
let mut bytes: Arc<[u8]> = redis::from_redis_value(v)?;
let bytes = Arc::make_mut(&mut bytes);
let metrics = simd_json::from_slice(bytes).map_err(to_redis_parsing_error)?;
Ok(metrics)
}
}
#[derive(Clone, Debug)]
pub struct HistogramWrapper(pub Histogram<u64>);
impl Serialize for HistogramWrapper {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut vec = Vec::new();
V2Serializer::new()
.serialize(self, &mut vec)
.map_err(serde::ser::Error::custom)?;
serializer.serialize_bytes(&vec)
}
}
impl<'a> Deserialize<'a> for HistogramWrapper {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
struct HdrVisitor;
impl<'de> Visitor<'de> for HdrVisitor {
type Value = HistogramWrapper;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("HDR V2 serialized bytes")
}
fn visit_bytes<E: de::Error>(self, mut v: &[u8]) -> Result<Self::Value, E> {
let h: Histogram<u64> = Deserializer::new()
.deserialize(&mut v)
.map_err(de::Error::custom)?;
Ok(HistogramWrapper(h))
}
fn visit_seq<A: de::SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
let mut buf = Vec::with_capacity(seq.size_hint().unwrap_or(0));
while let Some(byte) = seq.next_element::<u8>()? {
buf.push(byte);
}
self.visit_bytes(&buf)
}
}
deserializer.deserialize_bytes(HdrVisitor)
}
}
impl std::ops::DerefMut for HistogramWrapper {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl std::ops::Deref for HistogramWrapper {
type Target = Histogram<u64>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl PartialEq for HistogramWrapper {
fn eq(&self, other: &Self) -> bool {
let encode = |h: &Histogram<u64>| {
let mut buf = Vec::new();
V2Serializer::new().serialize(h, &mut buf).ok()?;
Some(buf)
};
encode(&self.0) == encode(&other.0)
}
}
impl Eq for HistogramWrapper {}
impl PartialOrd for HistogramWrapper {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HistogramWrapper {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.len()
.cmp(&other.len())
.then_with(|| self.0.max().cmp(&other.0.max()))
}
}