use super::super::FieldLineValue;
use crate::headers::entry_name::EntryName;
use hashbrown::HashMap;
use std::sync::{
Mutex,
atomic::{AtomicI64, AtomicU64, Ordering},
};
/// Per-connection counters describing how useful primed dynamic-table entries were.
///
/// All counters are atomics updated with `Ordering::Relaxed`; the per-entry map is
/// guarded by a mutex. Summaries are written to the `qpack_metrics` log target.
#[derive(Debug)]
pub(super) struct ConnectionMetrics {
/// Number of `record_primed_insert` calls (incremented by one per call).
pub(super) priming_entries_sent: AtomicU64,
/// Sum of the `wire_bytes` values passed to `record_primed_insert`.
pub(super) priming_bytes_eager: AtomicU64,
/// Total count of dynamic references in recorded sections that matched a primed entry.
pub(super) priming_references_total: AtomicU64,
/// Number of `record_section` calls.
pub(super) sections_total: AtomicU64,
/// Number of recorded sections that referenced at least one primed entry.
pub(super) sections_with_primed_reference: AtomicU64,
/// Sum of the `field_section_bytes` values passed to `record_section`.
pub(super) field_section_bytes: AtomicU64,
/// Sum of the `encoder_stream_bytes` values passed to `record_section`.
pub(super) encoder_stream_bytes_load_bearing: AtomicU64,
/// Primed entries keyed by the `abs_idx` given to `record_primed_insert`.
pub(super) primed_entries: Mutex<HashMap<u64, PrimedEntry>>,
/// `min(krc_at_encode, priming_entries_sent)` observed by the first `record_section`
/// call; stays at `-1` until a section has been recorded.
pub(super) primed_acked_at_first_section: AtomicI64,
/// Running sum of `min(krc_at_encode, priming_entries_sent)` over every recorded
/// section; divided by `sections_total` to report a per-section average.
pub(super) primed_acked_section_sum: AtomicU64,
}
impl Default for ConnectionMetrics {
    /// Creates an empty metrics record.
    ///
    /// Every counter starts at zero and the primed-entry map starts empty. The one
    /// exception is `primed_acked_at_first_section`, which starts at `-1` to mean
    /// "no section recorded yet" — that sentinel is why `Default` is not derived.
    fn default() -> Self {
        // Single place that spells out the starting value of the unsigned counters.
        let zeroed = || AtomicU64::new(0);
        let no_first_section_yet = AtomicI64::new(-1);
        let empty_entries = Mutex::new(HashMap::new());

        Self {
            priming_entries_sent: zeroed(),
            priming_bytes_eager: zeroed(),
            priming_references_total: zeroed(),
            sections_total: zeroed(),
            sections_with_primed_reference: zeroed(),
            field_section_bytes: zeroed(),
            encoder_stream_bytes_load_bearing: zeroed(),
            primed_entries: empty_entries,
            primed_acked_at_first_section: no_first_section_yet,
            primed_acked_section_sum: zeroed(),
        }
    }
}
/// One primed entry tracked by `ConnectionMetrics`, stored under its absolute index.
#[derive(Debug)]
pub(super) struct PrimedEntry {
/// Field name supplied when the entry was recorded.
pub(super) name: EntryName<'static>,
/// Field value supplied when the entry was recorded; logged via a lossy UTF-8 conversion.
pub(super) value: FieldLineValue<'static>,
/// How many dynamic references in recorded sections pointed at this entry.
/// Starts at zero and is incremented with saturating arithmetic.
pub(super) ref_count: u64,
}
impl ConnectionMetrics {
    /// Records one primed insert.
    ///
    /// Increments `priming_entries_sent`, adds `wire_bytes` to `priming_bytes_eager`, and
    /// stores the entry under `abs_idx` with a reference count of zero. Recording the same
    /// `abs_idx` twice replaces the stored entry (resetting its reference count) while the
    /// counters are still incremented for both calls.
    pub(super) fn record_primed_insert(
        &self,
        abs_idx: u64,
        name: EntryName<'static>,
        value: FieldLineValue<'static>,
        wire_bytes: u64,
    ) {
        self.priming_entries_sent.fetch_add(1, Ordering::Relaxed);
        self.priming_bytes_eager
            .fetch_add(wire_bytes, Ordering::Relaxed);
        self.primed_entries
            .lock()
            // Metrics are diagnostics only: if another thread panicked while holding the
            // lock, keep using the map rather than spreading the panic to this caller.
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .insert(
                abs_idx,
                PrimedEntry {
                    name,
                    value,
                    ref_count: 0,
                },
            );
    }

    /// Records one encoded field section.
    ///
    /// * `field_section_bytes` / `encoder_stream_bytes` are added to their running totals.
    /// * `dynamic_refs` lists the absolute indices the section referenced; each one that
    ///   names a primed entry bumps that entry's `ref_count` (repeats count every time).
    /// * `krc_at_encode` is clamped to the number of primed entries sent so far and
    ///   accumulated as the "primed acked" figure — presumably this is the QPACK Known
    ///   Received Count at encode time; confirm against the caller.
    ///
    /// The first call also stores the clamped value in `primed_acked_at_first_section` and
    /// logs a one-off line; every tenth call logs a periodic summary.
    pub(super) fn record_section(
        &self,
        field_section_bytes: u32,
        encoder_stream_bytes: u32,
        dynamic_refs: &[u64],
        krc_at_encode: u64,
    ) {
        // `fetch_add` returns the previous value, so exactly one caller observes 0 and
        // becomes the "first section" even under concurrent recording.
        let prev_sections = self.sections_total.fetch_add(1, Ordering::Relaxed);
        // Mirror the wrapping behavior of `fetch_add` instead of risking a debug-build
        // overflow panic at the (theoretical) u64 limit.
        let sections = prev_sections.wrapping_add(1);

        self.field_section_bytes
            .fetch_add(u64::from(field_section_bytes), Ordering::Relaxed);
        self.encoder_stream_bytes_load_bearing
            .fetch_add(u64::from(encoder_stream_bytes), Ordering::Relaxed);

        let primed_count = self.priming_entries_sent.load(Ordering::Relaxed);
        let primed_acked = krc_at_encode.min(primed_count);
        self.primed_acked_section_sum
            .fetch_add(primed_acked, Ordering::Relaxed);

        if prev_sections == 0 {
            // Saturate rather than `as`-cast: a wrapped negative value would be
            // indistinguishable from the "-1 = not recorded yet" sentinel.
            self.primed_acked_at_first_section.store(
                i64::try_from(primed_acked).unwrap_or(i64::MAX),
                Ordering::Relaxed,
            );
            log::info!(
                target: "qpack_metrics",
                "first section recorded (field_section_bytes={field_section_bytes} \
                 encoder_stream_bytes={encoder_stream_bytes} \
                 dynamic_refs={} primed_acked={primed_acked}/{primed_count})",
                dynamic_refs.len(),
            );
        }

        let primed_refs_this_section = self.bump_primed_ref_counts(dynamic_refs);
        if primed_refs_this_section > 0 {
            self.sections_with_primed_reference
                .fetch_add(1, Ordering::Relaxed);
            self.priming_references_total
                .fetch_add(primed_refs_this_section, Ordering::Relaxed);
        }

        if sections.is_multiple_of(10) {
            self.log_summary_with_prefix("periodic snapshot");
        }
    }

    /// Increments `ref_count` for every index in `dynamic_refs` that names a primed entry.
    ///
    /// Returns the number of matching indices; a repeated index is counted each time it
    /// appears. The lock is released before returning.
    fn bump_primed_ref_counts(&self, dynamic_refs: &[u64]) -> u64 {
        if dynamic_refs.is_empty() {
            // Sections without dynamic references never need the mutex.
            return 0;
        }
        let mut entries = self
            .primed_entries
            .lock()
            // See `record_primed_insert`: tolerate poisoning, this is best-effort data.
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut hits: u64 = 0;
        for abs_idx in dynamic_refs {
            if let Some(entry) = entries.get_mut(abs_idx) {
                entry.ref_count = entry.ref_count.saturating_add(1);
                hits += 1;
            }
        }
        hits
    }

    /// Logs a one-line summary of all counters followed by one line per primed entry.
    ///
    /// `prefix` is placed at the start of the summary line. Output goes to the
    /// `qpack_metrics` target at info level; when that target/level is disabled the
    /// function returns immediately without taking the lock or formatting anything.
    /// Per-entry lines are emitted in ascending `abs_idx` order so that successive
    /// snapshots can be compared line by line.
    pub(super) fn log_summary_with_prefix(&self, prefix: &str) {
        if !log::log_enabled!(target: "qpack_metrics", log::Level::Info) {
            return;
        }

        let priming_entries_sent = self.priming_entries_sent.load(Ordering::Relaxed);
        let priming_bytes_eager = self.priming_bytes_eager.load(Ordering::Relaxed);
        let priming_references_total = self.priming_references_total.load(Ordering::Relaxed);
        let sections_total = self.sections_total.load(Ordering::Relaxed);
        let sections_with_primed_reference =
            self.sections_with_primed_reference.load(Ordering::Relaxed);
        let field_section_bytes = self.field_section_bytes.load(Ordering::Relaxed);
        let encoder_stream_bytes_load_bearing = self
            .encoder_stream_bytes_load_bearing
            .load(Ordering::Relaxed);
        let primed_acked_at_first_section =
            self.primed_acked_at_first_section.load(Ordering::Relaxed);
        let primed_acked_section_sum = self.primed_acked_section_sum.load(Ordering::Relaxed);

        let entries = self
            .primed_entries
            .lock()
            // See `record_primed_insert`: tolerate poisoning, this is best-effort data.
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let priming_entries_referenced = entries.values().filter(|e| e.ref_count > 0).count();

        // A negative value is the "no section recorded yet" sentinel.
        let first_section_acked = if primed_acked_at_first_section < 0 {
            "n/a".to_string()
        } else {
            format!("{primed_acked_at_first_section}/{priming_entries_sent}")
        };
        let avg_acked = if sections_total > 0 && priming_entries_sent > 0 {
            // Precision loss above 2^53 is irrelevant for a two-decimal log figure.
            #[allow(clippy::cast_precision_loss)]
            let avg = primed_acked_section_sum as f64 / sections_total as f64;
            format!("{avg:.2}/{priming_entries_sent}")
        } else {
            "n/a".to_string()
        };

        log::info!(
            target: "qpack_metrics",
            "{prefix}: \
             priming_entries_sent={priming_entries_sent} \
             priming_entries_referenced={priming_entries_referenced} \
             priming_bytes_eager={priming_bytes_eager} \
             priming_references_total={priming_references_total} \
             primed_acked_at_first_section={first_section_acked} \
             primed_acked_per_section_avg={avg_acked} \
             sections_total={sections_total} \
             sections_with_primed_reference={sections_with_primed_reference} \
             field_section_bytes={field_section_bytes} \
             encoder_stream_bytes_load_bearing={encoder_stream_bytes_load_bearing}",
        );

        // Hash-map iteration order is arbitrary; sort so the log is deterministic.
        let mut by_index: Vec<_> = entries.iter().collect();
        by_index.sort_unstable_by_key(|&(abs_idx, _)| *abs_idx);
        for (abs_idx, entry) in by_index {
            log::info!(
                target: "qpack_metrics",
                " primed entry: abs_idx={abs_idx} ref_count={} name={:?} value={:?}",
                entry.ref_count,
                entry.name,
                String::from_utf8_lossy(entry.value.as_bytes()),
            );
        }
    }
}