use std::{io, ops, process};
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::atomic::{
AtomicU32, AtomicI64, AtomicU64, AtomicUsize, Ordering,
};
use std::sync::atomic::Ordering::Relaxed;
use std::time::{Duration, SystemTimeError};
use arc_swap::ArcSwap;
use chrono::{DateTime, TimeZone, Utc};
use rpki::uri;
use rpki::repository::tal::TalInfo;
use rpki::rtr::state::Serial;
use uuid::Uuid;
use crate::collector::{HttpStatus, SnapshotReason};
use crate::log::LogBook;
use crate::utils::sync::Mutex;
#[derive(Debug)]
pub struct Metrics {
pub time: DateTime<Utc>,
pub rsync: Vec<RsyncModuleMetrics>,
pub rrdp: Vec<RrdpRepositoryMetrics>,
pub tals: Vec<TalMetrics>,
pub repositories: Vec<RepositoryMetrics>,
pub publication: PublicationMetrics,
pub local: PayloadMetrics,
pub snapshot: SnapshotMetrics,
pub pub_point_logs: Vec<(uri::Rsync, LogBook)>,
}
impl Metrics {
pub fn new() -> Self {
Metrics {
time: Utc::now(),
rsync: Vec::new(),
rrdp: Vec::new(),
tals: Vec::new(),
repositories: Vec::new(),
publication: Default::default(),
local: Default::default(),
snapshot: Default::default(),
pub_point_logs: Default::default(),
}
}
pub fn finalize(&mut self) {
for metric in &mut self.tals {
metric.finalize();
}
for metric in &mut self.repositories {
metric.finalize();
}
self.local.finalize();
self.snapshot.finalize();
self.pub_point_logs.sort_by(|left, right| {
left.0.as_str().cmp(right.0.as_str())
});
}
pub fn timestamp(&self) -> i64 {
self.time.timestamp()
}
pub fn rsync_complete(&self) -> bool {
for metrics in &self.rsync {
match metrics.status {
Ok(status) if !status.success() => return false,
Err(_) => return false,
_ => { }
}
}
true
}
pub fn has_rrdp_logs(&self) -> bool {
self.rrdp.iter().any(|item| item.log_book.is_some())
}
pub fn has_rsync_logs(&self) -> bool {
self.rsync.iter().any(|item| item.log_book.is_some())
}
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
impl AsRef<Self> for Metrics {
fn as_ref(&self) -> &Self {
self
}
}
/// Metrics collected for a single RRDP repository.
#[derive(Clone, Debug)]
pub struct RrdpRepositoryMetrics {
    /// The HTTPS URI stored as `notify_uri` (set from the constructor
    /// argument).
    pub notify_uri: uri::Https,

    /// Status recorded for the notification request; starts out as
    /// `HttpStatus::Error`.
    pub notify_status: HttpStatus,

    /// Session identifier, if one has been recorded.
    pub session: Option<Uuid>,

    /// Serial number, if one has been recorded.
    pub serial: Option<u64>,

    /// Reason for using a snapshot, if one has been recorded.
    pub snapshot_reason: Option<SnapshotReason>,

    /// Status recorded for the payload request, if any.
    pub payload_status: Option<HttpStatus>,

    /// Duration of the update or the system time error that prevented
    /// measuring it; starts out as zero seconds.
    pub duration: Result<Duration, SystemTimeError>,

    /// Optional log book attached to this repository.
    pub log_book: Option<LogBook>,
}

impl RrdpRepositoryMetrics {
    /// Creates metrics for `notify_uri` with nothing recorded yet.
    ///
    /// The notification status starts as `HttpStatus::Error`, the duration
    /// as zero, and every optional field as `None`.
    pub fn new(notify_uri: uri::Https) -> Self {
        Self {
            notify_uri,
            notify_status: HttpStatus::Error,
            session: None,
            serial: None,
            snapshot_reason: None,
            payload_status: None,
            duration: Ok(Duration::from_secs(0)),
            log_book: None,
        }
    }

    /// Returns the status that best describes the update as a whole.
    ///
    /// If the notification status is not a success it is returned as is.
    /// Otherwise the payload status is returned when present, falling back
    /// to the notification status.
    pub fn status(&self) -> HttpStatus {
        if !self.notify_status.is_success() {
            return self.notify_status;
        }
        self.payload_status.unwrap_or(self.notify_status)
    }
}
/// Metrics collected for a single rsync module.
///
/// Unlike most metrics types in this file it is not `Clone`; it only
/// derives `Debug`.
#[derive(Debug)]
pub struct RsyncModuleMetrics {
/// The rsync URI stored for this entry (named `module`).
pub module: uri::Rsync,
/// Exit status of a process, or the I/O error recorded instead of one.
/// `Metrics::rsync_complete` treats an error or a non-success exit status
/// as a failure.
pub status: Result<process::ExitStatus, io::Error>,
/// A duration, or the system time error recorded instead of one.
pub duration: Result<Duration, SystemTimeError>,
/// Optional log book attached to this module.
pub log_book: Option<LogBook>,
}
#[derive(Clone, Debug)]
pub struct TalMetrics {
pub tal: Arc<TalInfo>,
pub publication: PublicationMetrics,
pub payload: PayloadMetrics,
}
impl TalMetrics {
pub fn new(tal: Arc<TalInfo>) -> Self {
TalMetrics {
tal,
publication: Default::default(),
payload: Default::default(),
}
}
pub fn finalize(&mut self) {
self.payload.finalize();
}
pub fn name(&self) -> &str {
self.tal.name()
}
}
#[derive(Clone, Debug)]
pub struct RepositoryMetrics {
pub uri: String,
pub publication: PublicationMetrics,
pub payload: PayloadMetrics,
}
impl RepositoryMetrics {
pub fn new(uri: String) -> Self {
RepositoryMetrics {
uri,
publication: Default::default(),
payload: Default::default(),
}
}
pub fn finalize(&mut self) {
self.payload.finalize();
}
}
/// Counters describing published objects.
///
/// Every field is a plain `u32` counter starting at zero. Values can be
/// combined with `+` and `+=`, which add all counters field by field. The
/// per-field descriptions below follow the field names; this file itself
/// only stores and sums the numbers.
#[derive(Clone, Debug, Default)]
pub struct PublicationMetrics {
    /// Valid publication points.
    pub valid_points: u32,

    /// Rejected publication points.
    pub rejected_points: u32,

    /// Valid manifests.
    pub valid_manifests: u32,

    /// Invalid manifests.
    pub invalid_manifests: u32,

    /// Premature manifests.
    pub premature_manifests: u32,

    /// Stale manifests; part of [`Self::stale_objects`].
    pub stale_manifests: u32,

    /// Missing manifests.
    pub missing_manifests: u32,

    /// Valid CRLs.
    pub valid_crls: u32,

    /// Invalid CRLs.
    pub invalid_crls: u32,

    /// Stale CRLs; part of [`Self::stale_objects`].
    pub stale_crls: u32,

    /// Stray CRLs.
    pub stray_crls: u32,

    /// Valid CA certificates.
    pub valid_ca_certs: u32,

    /// Valid router certificates.
    pub valid_router_certs: u32,

    /// Invalid certificates.
    pub invalid_certs: u32,

    /// Valid ROAs.
    pub valid_roas: u32,

    /// Invalid ROAs.
    pub invalid_roas: u32,

    /// Valid GBRs.
    pub valid_gbrs: u32,

    /// Invalid GBRs.
    pub invalid_gbrs: u32,

    /// Valid ASPAs.
    pub valid_aspas: u32,

    /// Invalid ASPAs.
    pub invalid_aspas: u32,

    /// Objects counted under `others`.
    pub others: u32,
}

impl PublicationMetrics {
    /// Returns the sum of stale manifests and stale CRLs.
    pub fn stale_objects(&self) -> u32 {
        let Self { stale_manifests, stale_crls, .. } = self;
        stale_manifests + stale_crls
    }
}

impl ops::Add for PublicationMetrics {
    type Output = Self;

    /// Returns the field-wise sum of both operands.
    fn add(self, other: Self) -> Self::Output {
        let mut total = self;
        total += &other;
        total
    }
}

impl<'a> ops::AddAssign<&'a Self> for PublicationMetrics {
    /// Adds every counter of `other` to the matching counter of `self`.
    fn add_assign(&mut self, other: &'a Self) {
        // Destructuring without `..` makes the compiler complain if a new
        // field is ever added to the struct but forgotten here.
        let Self {
            valid_points,
            rejected_points,
            valid_manifests,
            invalid_manifests,
            premature_manifests,
            stale_manifests,
            missing_manifests,
            valid_crls,
            invalid_crls,
            stale_crls,
            stray_crls,
            valid_ca_certs,
            valid_router_certs,
            invalid_certs,
            valid_roas,
            invalid_roas,
            valid_gbrs,
            invalid_gbrs,
            valid_aspas,
            invalid_aspas,
            others,
        } = other;

        self.valid_points += valid_points;
        self.rejected_points += rejected_points;
        self.valid_manifests += valid_manifests;
        self.invalid_manifests += invalid_manifests;
        self.premature_manifests += premature_manifests;
        self.stale_manifests += stale_manifests;
        self.missing_manifests += missing_manifests;
        self.valid_crls += valid_crls;
        self.invalid_crls += invalid_crls;
        self.stale_crls += stale_crls;
        self.stray_crls += stray_crls;
        self.valid_ca_certs += valid_ca_certs;
        self.valid_router_certs += valid_router_certs;
        self.invalid_certs += invalid_certs;
        self.valid_roas += valid_roas;
        self.invalid_roas += invalid_roas;
        self.valid_gbrs += valid_gbrs;
        self.invalid_gbrs += invalid_gbrs;
        self.valid_aspas += valid_aspas;
        self.invalid_aspas += invalid_aspas;
        self.others += others;
    }
}

impl ops::AddAssign for PublicationMetrics {
    /// Adds `other` by delegating to the by-reference implementation.
    fn add_assign(&mut self, other: Self) {
        *self += &other;
    }
}
/// Metrics describing the complete snapshot.
#[derive(Clone, Debug, Default)]
pub struct SnapshotMetrics {
    /// Payload metrics of the snapshot.
    pub payload: PayloadMetrics,

    /// Counter subtracted from the number of valid ASPAs when finalizing
    /// -- presumably ASPAs dropped for being too large; TODO confirm.
    pub large_aspas: u32,
}

impl SnapshotMetrics {
    /// Finalizes the snapshot metrics.
    ///
    /// The valid ASPA count is reduced by `large_aspas` (never dropping
    /// below zero) and only then are the derived payload totals computed.
    /// Doing it in this order keeps `payload.all` equal to the sum of the
    /// individual categories; adjusting afterwards would leave `all`
    /// counting the unreduced number.
    ///
    /// The reduction is applied on every call, so this must be invoked
    /// only once per set of raw figures.
    pub fn finalize(&mut self) {
        self.payload.aspas.valid =
            self.payload.aspas.valid.saturating_sub(self.large_aspas);
        self.payload.finalize();
    }
}
/// Metrics about the generated payload, split by payload type.
///
/// `origins` and `all` are derived figures: [`PayloadMetrics::finalize`]
/// overwrites them from the per-type metrics.
#[derive(Clone, Debug, Default)]
pub struct PayloadMetrics {
    /// Metrics for IPv4 origins.
    pub v4_origins: VrpMetrics,

    /// Metrics for IPv6 origins.
    pub v6_origins: VrpMetrics,

    /// Metrics for all origins; set by `finalize` to
    /// `v4_origins + v6_origins`.
    pub origins: VrpMetrics,

    /// Metrics for router keys.
    pub router_keys: VrpMetrics,

    /// Metrics for ASPAs.
    pub aspas: VrpMetrics,

    /// Counter named `large_aspas` -- presumably ASPAs that were too
    /// large; TODO confirm against the code that increments it.
    pub large_aspas: u32,

    /// Metrics for all payload; set by `finalize` to
    /// `origins + router_keys + aspas`.
    pub all: VrpMetrics,
}

impl PayloadMetrics {
    /// Recomputes the derived `origins` and `all` metrics.
    ///
    /// Any previous content of these two fields is discarded.
    pub fn finalize(&mut self) {
        let mut origins = self.v4_origins.clone();
        origins += &self.v6_origins;

        let mut all = origins.clone();
        all += &self.router_keys;
        all += &self.aspas;

        self.origins = origins;
        self.all = all;
    }

    /// Returns the metrics for all origins (the `origins` field).
    pub fn vrps(&self) -> &VrpMetrics {
        &self.origins
    }
}

impl ops::Add for PayloadMetrics {
    type Output = Self;

    /// Returns the field-wise sum of both operands.
    fn add(self, other: Self) -> Self::Output {
        let mut total = self;
        total += &other;
        total
    }
}

impl<'a> ops::AddAssign<&'a Self> for PayloadMetrics {
    /// Adds every field of `other` to the matching field of `self`.
    ///
    /// This includes `large_aspas`, which would otherwise be lost when
    /// metrics are aggregated.
    fn add_assign(&mut self, other: &'a Self) {
        // Destructuring without `..` turns a forgotten field into a
        // compile error instead of a silently dropped counter.
        let Self {
            v4_origins,
            v6_origins,
            origins,
            router_keys,
            aspas,
            large_aspas,
            all,
        } = other;

        self.v4_origins += v4_origins;
        self.v6_origins += v6_origins;
        self.origins += origins;
        self.router_keys += router_keys;
        self.aspas += aspas;
        self.large_aspas += large_aspas;
        self.all += all;
    }
}

impl ops::AddAssign for PayloadMetrics {
    /// Adds `other` by delegating to the by-reference implementation.
    fn add_assign(&mut self, other: Self) {
        *self += &other;
    }
}

/// Counters for one type of payload.
///
/// All fields are plain `u32` counters starting at zero; `+` and `+=`
/// add them field by field. The descriptions follow the field names.
#[derive(Clone, Debug, Default)]
pub struct VrpMetrics {
    /// Items counted as valid.
    pub valid: u32,

    /// Items counted as marked unsafe.
    pub marked_unsafe: u32,

    /// Items counted as locally filtered.
    pub locally_filtered: u32,

    /// Items counted as duplicates.
    pub duplicate: u32,

    /// Items counted as contributed.
    pub contributed: u32,
}

impl ops::Add for VrpMetrics {
    type Output = Self;

    /// Returns the field-wise sum of both operands.
    fn add(self, other: Self) -> Self::Output {
        let mut total = self;
        total += &other;
        total
    }
}

impl<'a> ops::AddAssign<&'a Self> for VrpMetrics {
    /// Adds every counter of `other` to the matching counter of `self`.
    fn add_assign(&mut self, other: &'a Self) {
        let Self {
            valid,
            marked_unsafe,
            locally_filtered,
            duplicate,
            contributed,
        } = other;

        self.valid += valid;
        self.marked_unsafe += marked_unsafe;
        self.locally_filtered += locally_filtered;
        self.duplicate += duplicate;
        self.contributed += contributed;
    }
}

impl ops::AddAssign for VrpMetrics {
    /// Adds `other` by delegating to the by-reference implementation.
    fn add_assign(&mut self, other: Self) {
        *self += &other;
    }
}
/// Counters for the HTTP server.
///
/// All counters are atomics that start at zero and are read and updated
/// with relaxed ordering through a shared reference.
#[derive(Debug, Default)]
pub struct HttpServerMetrics {
    /// Incremented by [`Self::inc_conn_open`].
    conn_open: AtomicU64,

    /// Incremented by [`Self::inc_conn_close`].
    conn_close: AtomicU64,

    /// Running total fed by [`Self::inc_bytes_read`].
    bytes_read: AtomicU64,

    /// Running total fed by [`Self::inc_bytes_written`].
    bytes_written: AtomicU64,

    /// Incremented by [`Self::inc_requests`].
    requests: AtomicU64,
}

impl HttpServerMetrics {
    /// Returns the current value of the opened-connections counter.
    pub fn conn_open(&self) -> u64 {
        self.conn_open.load(Relaxed)
    }

    /// Adds one to the opened-connections counter.
    pub fn inc_conn_open(&self) {
        self.conn_open.fetch_add(1, Relaxed);
    }

    /// Returns the current value of the closed-connections counter.
    pub fn conn_close(&self) -> u64 {
        self.conn_close.load(Relaxed)
    }

    /// Adds one to the closed-connections counter.
    pub fn inc_conn_close(&self) {
        self.conn_close.fetch_add(1, Relaxed);
    }

    /// Returns the current value of the bytes-read counter.
    pub fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Relaxed)
    }

    /// Adds `count` to the bytes-read counter.
    pub fn inc_bytes_read(&self, count: u64) {
        self.bytes_read.fetch_add(count, Relaxed);
    }

    /// Returns the current value of the bytes-written counter.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written.load(Relaxed)
    }

    /// Adds `count` to the bytes-written counter.
    pub fn inc_bytes_written(&self, count: u64) {
        self.bytes_written.fetch_add(count, Relaxed);
    }

    /// Returns the current value of the requests counter.
    pub fn requests(&self) -> u64 {
        self.requests.load(Relaxed)
    }

    /// Adds one to the requests counter.
    pub fn inc_requests(&self) {
        self.requests.fetch_add(1, Relaxed);
    }
}
pub struct RtrServerMetrics {
global: Arc<RtrMetricsData>,
client: Option<RtrPerAddrMetrics>,
}
impl RtrServerMetrics {
pub fn new(detailed: bool) -> Self {
Self {
global: Default::default(),
client: detailed.then(Default::default)
}
}
pub fn get_client(&self, addr: IpAddr) -> RtrClientMetrics {
RtrClientMetrics {
global: self.global.clone(),
client: self.client.as_ref().map(|client| client.get(addr)),
}
}
pub fn global(&self) -> Arc<RtrMetricsData> {
self.global.clone()
}
#[allow(clippy::type_complexity)]
pub fn clients(
&self
) -> Option<Arc<Vec<(IpAddr, Arc<RtrMetricsData>)>>> {
self.client.as_ref().map(|client| client.addrs.load().clone())
}
}
/// Collection of RTR metrics data keyed by client IP address.
///
/// The address list lives in an `ArcSwap` and is replaced wholesale
/// whenever a new address is added.
#[derive(Default)]
pub struct RtrPerAddrMetrics {
/// The address/data pairs. `get` looks entries up by binary search and
/// inserts new ones at the position that search reports, so the vector
/// stays ordered by address.
addrs: ArcSwap<Vec<(IpAddr, Arc<RtrMetricsData>)>>,
/// Lock held while a replacement vector is built and stored.
/// NOTE(review): presumably serialises concurrent inserts so none is
/// lost; the semantics of the crate-local `Mutex` are not visible here.
write: Mutex<()>,
}
impl RtrPerAddrMetrics {
/// Returns the metrics data for `addr`, creating a default entry if the
/// address is not yet known.
fn get(&self, addr: IpAddr) -> Arc<RtrMetricsData> {
// First attempt: look the address up without taking the write lock.
let addrs = self.addrs.load();
if let Ok(idx) = addrs.binary_search_by(|x| x.0.cmp(&addr)) {
return addrs[idx].1.clone()
}
// Not found. Take the write lock, then load the list again and repeat
// the search, since the list may have been replaced in the meantime.
let _write = self.write.lock();
let addrs = self.addrs.load();
let idx = match addrs.binary_search_by(|x| x.0.cmp(&addr)) {
Ok(idx) => return addrs[idx].1.clone(),
// `Err` carries the index at which the address has to be inserted to
// keep the list ordered.
Err(idx) => idx,
};
// Build a new vector with the new entry spliced in at `idx` and swap it
// in for the old one; the old vector is never modified in place.
let mut new_addrs = Vec::with_capacity(addrs.len() + 1);
new_addrs.extend_from_slice(&addrs[..idx]);
new_addrs.push((addr, Default::default()));
new_addrs.extend_from_slice(&addrs[idx..]);
let res = new_addrs[idx].1.clone();
self.addrs.store(new_addrs.into());
res
}
}
/// Metrics handle for a single RTR client.
///
/// Refers to the global figures and optionally to a second set of
/// figures for this client.
#[derive(Debug)]
pub struct RtrClientMetrics {
    /// The figures shared by all clients.
    global: Arc<RtrMetricsData>,

    /// The second, client-specific set of figures, if there is one.
    client: Option<Arc<RtrMetricsData>>,
}

impl RtrClientMetrics {
    /// Applies `op` to every set of figures this handle refers to.
    ///
    /// The global figures are updated first, followed by the
    /// client-specific ones if present.
    pub fn update(&self, op: impl Fn(&RtrMetricsData)) {
        op(self.global.as_ref());
        if let Some(per_client) = self.client.as_deref() {
            op(per_client);
        }
    }
}
/// A set of RTR figures, kept in atomics so they can be updated through
/// a shared reference.
///
/// All accesses use relaxed ordering. "Never set" is encoded with
/// sentinel values: `u32::MAX` for the serial and `i64::MIN` for the two
/// time stamps.
#[derive(Debug)]
pub struct RtrMetricsData {
    /// Number of currently open connections.
    current_connections: AtomicUsize,

    /// Serial passed to the last `update_now`; `u32::MAX` means none yet.
    serial: AtomicU32,

    /// Unix time stamp in seconds of the last `update_now`; `i64::MIN`
    /// means none yet.
    updated: AtomicI64,

    /// Unix time stamp in seconds of the last `update_now` with
    /// `reset = true`; `i64::MIN` means none yet.
    last_reset: AtomicI64,

    /// Number of `update_now` calls with `reset = true`.
    reset_queries: AtomicU32,

    /// Number of `update_now` calls with `reset = false`.
    serial_queries: AtomicU32,

    /// Running total fed by `inc_bytes_read`.
    bytes_read: AtomicU64,

    /// Running total fed by `inc_bytes_written`.
    bytes_written: AtomicU64,
}

impl Default for RtrMetricsData {
    /// Creates a value with all counters at zero and the serial and time
    /// stamps set to their "never set" sentinels.
    fn default() -> Self {
        Self {
            current_connections: AtomicUsize::new(0),
            serial: AtomicU32::new(u32::MAX),
            updated: AtomicI64::new(i64::MIN),
            last_reset: AtomicI64::new(i64::MIN),
            reset_queries: AtomicU32::new(0),
            serial_queries: AtomicU32::new(0),
            bytes_read: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
        }
    }
}

impl RtrMetricsData {
    /// Decodes a stored time stamp, mapping the `i64::MIN` sentinel and
    /// unrepresentable values to `None`.
    fn load_time(cell: &AtomicI64) -> Option<DateTime<Utc>> {
        match cell.load(Relaxed) {
            i64::MIN => None,
            secs => Utc.timestamp_opt(secs, 0).single(),
        }
    }

    /// Returns the number of currently open connections.
    pub fn current_connections(&self) -> usize {
        self.current_connections.load(Relaxed)
    }

    /// Increases the number of open connections by one.
    pub fn inc_current_connections(&self) {
        self.current_connections.fetch_add(1, Relaxed);
    }

    /// Decreases the number of open connections by one.
    ///
    /// The subtraction wraps, so every call must be paired with an
    /// earlier `inc_current_connections`.
    pub fn dec_current_connections(&self) {
        self.current_connections.fetch_sub(1, Relaxed);
    }

    /// Returns the serial of the last update, if there was one.
    ///
    /// Because `u32::MAX` doubles as the "never set" marker, a stored
    /// serial of exactly `u32::MAX` is also reported as `None`.
    pub fn serial(&self) -> Option<u32> {
        match self.serial.load(Relaxed) {
            u32::MAX => None,
            serial => Some(serial),
        }
    }

    /// Records a successful update with the current time.
    ///
    /// Stores `serial` and the update time. If `reset` is `true`, the
    /// reset time is set as well and the reset query counter is bumped;
    /// otherwise the serial query counter is bumped.
    pub fn update_now(&self, serial: Serial, reset: bool) {
        // Take the clock reading once so `updated` and `last_reset` of a
        // single reset can never disagree.
        let now = Utc::now().timestamp();

        self.serial.store(serial.into(), Relaxed);
        self.updated.store(now, Relaxed);
        if reset {
            self.last_reset.store(now, Relaxed);
            self.reset_queries.fetch_add(1, Relaxed);
        }
        else {
            self.serial_queries.fetch_add(1, Relaxed);
        }
    }

    /// Returns the time of the last update, if there was one.
    pub fn updated(&self) -> Option<DateTime<Utc>> {
        Self::load_time(&self.updated)
    }

    /// Returns the time of the last reset update, if there was one.
    pub fn last_reset(&self) -> Option<DateTime<Utc>> {
        Self::load_time(&self.last_reset)
    }

    /// Returns the number of updates recorded with `reset = true`.
    pub fn reset_queries(&self) -> u32 {
        self.reset_queries.load(Relaxed)
    }

    /// Returns the number of updates recorded with `reset = false`.
    pub fn serial_queries(&self) -> u32 {
        self.serial_queries.load(Relaxed)
    }

    /// Returns the total number of bytes read.
    pub fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Relaxed)
    }

    /// Adds `count` to the total number of bytes read.
    pub fn inc_bytes_read(&self, count: u64) {
        self.bytes_read.fetch_add(count, Relaxed);
    }

    /// Returns the total number of bytes written.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written.load(Relaxed)
    }

    /// Adds `count` to the total number of bytes written.
    pub fn inc_bytes_written(&self, count: u64) {
        self.bytes_written.fetch_add(count, Relaxed);
    }
}