use std::{cmp, io, ops, process, slice};
use std::iter::Peekable;
use std::net::IpAddr;
use std::sync::{Arc};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicI64, AtomicU64, Ordering};
use std::time::{Duration, SystemTimeError};
use chrono::{DateTime, TimeZone, Utc};
use rpki::uri;
use rpki::repository::tal::TalInfo;
use rpki::rtr::state::Serial;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::collector::{HttpStatus, SnapshotReason};
#[derive(Debug)]
pub struct Metrics {
pub time: DateTime<Utc>,
pub rsync: Vec<RsyncModuleMetrics>,
pub rrdp: Vec<RrdpRepositoryMetrics>,
pub tals: Vec<TalMetrics>,
pub repositories: Vec<RepositoryMetrics>,
pub publication: PublicationMetrics,
pub local: VrpMetrics,
pub vrps: VrpMetrics,
}
impl Metrics {
pub fn new() -> Self {
Metrics {
time: Utc::now(),
rsync: Vec::new(),
rrdp: Vec::new(),
tals: Vec::new(),
repositories: Vec::new(),
publication: Default::default(),
local: Default::default(),
vrps: Default::default(),
}
}
pub fn timestamp(&self) -> i64 {
self.time.timestamp()
}
pub fn rsync_complete(&self) -> bool {
for metrics in &self.rsync {
match metrics.status {
Ok(status) if !status.success() => return false,
Err(_) => return false,
_ => { }
}
}
true
}
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
impl AsRef<Self> for Metrics {
fn as_ref(&self) -> &Self {
self
}
}
#[derive(Clone, Debug)]
pub struct RrdpRepositoryMetrics {
pub notify_uri: uri::Https,
pub notify_status: HttpStatus,
pub session: Option<Uuid>,
pub serial: Option<u64>,
pub snapshot_reason: Option<SnapshotReason>,
pub payload_status: Option<HttpStatus>,
pub duration: Result<Duration, SystemTimeError>,
}
impl RrdpRepositoryMetrics {
pub fn new(notify_uri: uri::Https) -> Self {
RrdpRepositoryMetrics {
notify_uri,
notify_status: HttpStatus::Error,
session: None,
serial: None,
snapshot_reason: None,
payload_status: None,
duration: Ok(Duration::from_secs(0))
}
}
pub fn status(&self) -> HttpStatus {
if self.notify_status.is_success() {
if let Some(status) = self.payload_status {
status
}
else {
self.notify_status
}
}
else {
self.notify_status
}
}
}
#[derive(Debug)]
pub struct RsyncModuleMetrics {
pub module: uri::Rsync,
pub status: Result<process::ExitStatus, io::Error>,
pub duration: Result<Duration, SystemTimeError>,
}
#[derive(Clone, Debug)]
pub struct TalMetrics {
pub tal: Arc<TalInfo>,
pub publication: PublicationMetrics,
pub vrps: VrpMetrics,
}
impl TalMetrics {
pub fn new(tal: Arc<TalInfo>) -> Self {
TalMetrics {
tal,
publication: Default::default(),
vrps: Default::default(),
}
}
pub fn name(&self) -> &str {
self.tal.name()
}
}
#[derive(Clone, Debug)]
pub struct RepositoryMetrics {
pub uri: String,
pub publication: PublicationMetrics,
pub vrps: VrpMetrics,
}
impl RepositoryMetrics {
pub fn new(uri: String) -> Self {
RepositoryMetrics {
uri,
publication: Default::default(),
vrps: Default::default(),
}
}
}
#[derive(Clone, Debug, Default)]
pub struct PublicationMetrics {
pub valid_points: u32,
pub rejected_points: u32,
pub valid_manifests: u32,
pub invalid_manifests: u32,
pub stale_manifests: u32,
pub missing_manifests: u32,
pub valid_crls: u32,
pub invalid_crls: u32,
pub stale_crls: u32,
pub stray_crls: u32,
pub valid_ca_certs: u32,
pub valid_ee_certs: u32,
pub invalid_certs: u32,
pub valid_roas: u32,
pub invalid_roas: u32,
pub valid_gbrs: u32,
pub invalid_gbrs: u32,
pub others: u32,
}
impl PublicationMetrics {
pub fn stale_objects(&self) -> u32 {
self.stale_manifests + self.stale_crls
}
}
impl ops::Add for PublicationMetrics {
type Output = Self;
fn add(mut self, other: Self) -> Self::Output {
self += other;
self
}
}
impl<'a> ops::AddAssign<&'a Self> for PublicationMetrics {
fn add_assign(&mut self, other: &'a Self) {
self.valid_points += other.valid_points;
self.rejected_points += other.rejected_points;
self.valid_manifests += other.valid_manifests;
self.invalid_manifests += other.invalid_manifests;
self.stale_manifests += other.stale_manifests;
self.missing_manifests += other.missing_manifests;
self.valid_crls += other.valid_crls;
self.invalid_crls += other.invalid_crls;
self.stale_crls += other.stale_crls;
self.stray_crls += other.stray_crls;
self.valid_ca_certs += other.valid_ca_certs;
self.valid_ee_certs += other.valid_ee_certs;
self.invalid_certs += other.invalid_certs;
self.valid_roas += other.valid_roas;
self.invalid_roas += other.invalid_roas;
self.valid_gbrs += other.valid_gbrs;
self.invalid_gbrs += other.invalid_gbrs;
self.others += other.others;
}
}
impl ops::AddAssign for PublicationMetrics {
fn add_assign(&mut self, other: Self) {
self.add_assign(&other)
}
}
#[derive(Clone, Debug, Default)]
pub struct VrpMetrics {
pub valid: u32,
pub marked_unsafe: u32,
pub locally_filtered: u32,
pub duplicate: u32,
pub contributed: u32,
}
impl ops::Add for VrpMetrics {
type Output = Self;
fn add(mut self, other: Self) -> Self::Output {
self += other;
self
}
}
impl<'a> ops::AddAssign<&'a Self> for VrpMetrics {
fn add_assign(&mut self, other: &'a Self) {
self.valid += other.valid;
self.marked_unsafe += other.marked_unsafe;
self.locally_filtered += other.locally_filtered;
self.duplicate += other.duplicate;
self.contributed += other.contributed;
}
}
impl ops::AddAssign for VrpMetrics {
fn add_assign(&mut self, other: Self) {
self.add_assign(&other)
}
}
#[derive(Debug, Default)]
pub struct HttpServerMetrics {
conn_open: AtomicU64,
conn_close: AtomicU64,
bytes_read: AtomicU64,
bytes_written: AtomicU64,
requests: AtomicU64,
}
impl HttpServerMetrics {
pub fn conn_open(&self) -> u64 {
self.conn_open.load(Ordering::Relaxed)
}
pub fn inc_conn_open(&self) {
self.conn_open.fetch_add(1, Ordering::Relaxed);
}
pub fn conn_close(&self) -> u64 {
self.conn_close.load(Ordering::Relaxed)
}
pub fn inc_conn_close(&self) {
self.conn_close.fetch_add(1, Ordering::Relaxed);
}
pub fn bytes_read(&self) -> u64 {
self.bytes_read.load(Ordering::Relaxed)
}
pub fn inc_bytes_read(&self, count: u64) {
self.bytes_read.fetch_add(count, Ordering::Relaxed);
}
pub fn bytes_written(&self) -> u64 {
self.bytes_written.load(Ordering::Relaxed)
}
pub fn inc_bytes_written(&self, count: u64) {
self.bytes_written.fetch_add(count, Ordering::Relaxed);
}
pub fn requests(&self) -> u64 {
self.requests.load(Ordering::Relaxed)
}
pub fn inc_requests(&self) {
self.requests.fetch_add(1, Ordering::Relaxed);
}
}
#[derive(Clone, Debug)]
pub struct SharedRtrServerMetrics {
metrics: Arc<Mutex<RtrServerMetrics>>,
detailed: bool,
}
impl SharedRtrServerMetrics {
pub fn new(detailed: bool) -> Self {
SharedRtrServerMetrics {
metrics: Default::default(),
detailed
}
}
pub async fn add_client(&self, client: Arc<RtrClientMetrics>) {
let mut metrics = self.metrics.lock().await;
metrics.insert_client(client);
}
pub fn detailed(&self) -> bool {
self.detailed
}
pub async fn read(
&self
) -> impl ops::Deref<Target = RtrServerMetrics> + '_ {
self.metrics.lock().await
}
}
#[derive(Clone, Debug, Default)]
pub struct RtrServerMetrics {
clients: Vec<Arc<RtrClientMetrics>>,
}
impl RtrServerMetrics {
pub fn current_connections(&self) -> usize {
self.clients.iter().filter(|client| client.is_open()).count()
}
pub fn bytes_read(&self) -> u64 {
self.clients.iter().map(|client| client.bytes_read()).sum()
}
pub fn bytes_written(&self) -> u64 {
self.clients.iter().map(|client| client.bytes_written()).sum()
}
pub fn iter_clients(
&self
) -> impl Iterator<Item = &RtrClientMetrics> + '_ {
self.clients.iter().map(AsRef::as_ref)
}
pub fn fold_clients<'a, B, F>(
&'a self, init: B, fold: F
) -> impl Iterator<Item = (IpAddr, B)> + 'a
where
B: Clone + 'a,
F: FnMut(&mut B, &RtrClientMetrics) + 'a
{
FoldedRtrClientsIter::new(self, init, fold)
}
fn insert_client(&mut self, client: Arc<RtrClientMetrics>) {
let mut collapse = false;
let mut slice = self.clients.as_slice();
while let Some((first, tail)) = slice.split_first() {
slice = tail;
if first.open.load(Ordering::Relaxed) {
continue
}
for item in tail {
if item.addr != first.addr {
break
}
if !item.open.load(Ordering::Relaxed) {
collapse = true;
break;
}
}
if collapse {
break
}
}
if collapse {
let mut new_clients = Vec::new();
let mut pending: Option<Arc<RtrClientMetrics>> = None;
let mut client = Some(client);
for item in self.clients.drain(..) {
if let Some(addr) = client.as_ref().map(|c| c.addr) {
if addr < item.addr {
if let Some(client) = client.take() {
new_clients.push(client)
}
}
}
if item.open.load(Ordering::Relaxed) {
new_clients.push(item);
continue;
}
if let Some(pending_item) = pending.take() {
if pending_item.addr == item.addr {
pending = Some(
Arc::new(pending_item.collapse_closed(&item))
);
}
else {
new_clients.push(pending_item);
pending = Some(item);
}
}
else {
pending = Some(item);
}
}
if let Some(pending) = pending.take() {
new_clients.push(pending)
}
self.clients = new_clients;
}
else {
let index = match self.clients.binary_search_by(|item| {
item.addr.cmp(&client.addr)
}) {
Ok(index) => index + 1,
Err(index) => index
};
self.clients.insert(index, client);
}
}
}
#[derive(Debug)]
pub struct RtrClientMetrics {
addr: IpAddr,
open: AtomicBool,
serial: AtomicU32,
updated: AtomicI64,
bytes_read: AtomicU64,
bytes_written: AtomicU64,
}
impl RtrClientMetrics {
pub fn new(addr: IpAddr) -> Self {
RtrClientMetrics {
addr,
open: AtomicBool::new(true),
serial: AtomicU32::new(u32::MAX),
updated: AtomicI64::new(i64::MIN),
bytes_read: AtomicU64::new(0),
bytes_written: AtomicU64::new(0),
}
}
pub fn is_open(&self) -> bool {
self.open.load(Ordering::Relaxed)
}
pub fn close(&self) {
self.open.store(false, Ordering::Relaxed)
}
pub fn bytes_read(&self) -> u64 {
self.bytes_read.load(Ordering::Relaxed)
}
pub fn inc_bytes_read(&self, count: u64) {
self.bytes_read.fetch_add(count, Ordering::Relaxed);
}
pub fn bytes_written(&self) -> u64 {
self.bytes_written.load(Ordering::Relaxed)
}
pub fn inc_bytes_written(&self, count: u64) {
self.bytes_written.fetch_add(count, Ordering::Relaxed);
}
pub fn serial(&self) -> Option<Serial> {
let serial = self.serial.load(Ordering::Relaxed);
if serial == u32::MAX {
None
}
else {
Some(serial.into())
}
}
pub fn updated(&self) -> Option<DateTime<Utc>> {
let updated = self.updated.load(Ordering::Relaxed);
if updated == i64::MIN {
None
}
else {
Some(Utc.timestamp(updated, 0))
}
}
pub fn update_now(&self, serial: Serial) {
self.serial.store(serial.into(), Ordering::Relaxed);
self.updated.store(Utc::now().timestamp(), Ordering::Relaxed);
}
fn collapse_closed(&self, other: &Self) -> Self {
let left_serial = self.serial.load(Ordering::Relaxed);
let right_serial = other.serial.load(Ordering::Relaxed);
RtrClientMetrics {
addr: self.addr,
open: AtomicBool::new(false),
serial: AtomicU32::new(
if left_serial == u32::MAX {
right_serial
}
else if right_serial == u32::MAX {
left_serial
}
else {
cmp::max(left_serial, right_serial)
}
),
updated: AtomicI64::new(
cmp::max(
self.updated.load(Ordering::Relaxed),
other.updated.load(Ordering::Relaxed)
)
),
bytes_read: AtomicU64::new(
self.bytes_read.load(Ordering::Relaxed)
+ other.bytes_read.load(Ordering::Relaxed)
),
bytes_written: AtomicU64::new(
self.bytes_written.load(Ordering::Relaxed)
+ other.bytes_written.load(Ordering::Relaxed)
),
}
}
}
struct FoldedRtrClientsIter<'a, B, F> {
clients: Peekable<slice::Iter<'a, Arc<RtrClientMetrics>>>,
init: B,
fold_fn: F
}
impl<'a, B, F> FoldedRtrClientsIter<'a, B, F> {
fn new(metrics: &'a RtrServerMetrics, init: B, fold_fn: F) -> Self {
FoldedRtrClientsIter {
clients: metrics.clients.iter().peekable(),
init,
fold_fn
}
}
}
impl<'a, B, F> Iterator for FoldedRtrClientsIter<'a, B, F>
where
B: Clone + 'a,
F: FnMut(&mut B, &RtrClientMetrics) + 'a
{
type Item = (IpAddr, B);
fn next(&mut self) -> Option<Self::Item> {
let first = self.clients.next()?;
let addr = first.addr;
let mut value = self.init.clone();
(self.fold_fn)(&mut value, first);
loop {
match self.clients.peek() {
Some(client) if client.addr == addr => {
let client = match self.clients.next() {
Some(client) => client,
None => break,
};
(self.fold_fn)(&mut value, client);
}
_ => break
}
}
Some((addr, value))
}
}
#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;
#[test]
fn insert_rtr_metrics() {
let addr1 = IpAddr::from_str("10.0.0.1").unwrap();
let addr2 = IpAddr::from_str("10.0.0.2").unwrap();
let addr3 = IpAddr::from_str("10.0.0.3").unwrap();
let addr4 = IpAddr::from_str("10.0.0.4").unwrap();
assert!(addr1 < addr2);
assert!(addr2 < addr3);
assert!(addr3 < addr4);
fn client(addr: IpAddr) -> Arc<RtrClientMetrics> {
RtrClientMetrics::new(addr).into()
}
fn assert_sequence(metrics: &RtrServerMetrics, addrs: &[IpAddr]) {
assert_eq!(metrics.clients.len(), addrs.len());
metrics.clients.iter().zip(addrs.iter()).for_each(|(m, a)| {
assert_eq!(m.addr, *a);
});
}
let mut metrics = RtrServerMetrics::default();
metrics.insert_client(client(addr4));
metrics.insert_client(client(addr2));
metrics.insert_client(client(addr4));
metrics.insert_client(client(addr3));
assert_sequence(&metrics, &[addr2, addr3, addr4, addr4]);
metrics.insert_client(client(addr3));
metrics.insert_client(client(addr3));
assert_sequence(&metrics, &[addr2, addr3, addr3, addr3, addr4, addr4]);
metrics.clients[1].inc_bytes_read(10);
metrics.clients[1].close();
metrics.clients[1].inc_bytes_read(40);
metrics.clients[3].close();
metrics.clients[4].close();
metrics.clients[5].close();
metrics.insert_client(client(addr1));
assert_sequence(&metrics, &[addr1, addr2, addr3, addr3, addr4]);
let (open3, closed3) = if metrics.clients[2].is_open() {
(&metrics.clients[2], &metrics.clients[3])
}
else {
(&metrics.clients[3], &metrics.clients[2])
};
assert!(open3.is_open());
assert!(!closed3.is_open());
assert_eq!(open3.bytes_read(), 0);
assert_eq!(closed3.bytes_read(), 50);
}
}