#![allow(missing_docs)]
#[cfg(feature = "trace")]
mod implementation {
use super::super::event::EventData;
use super::super::{Event, EventLog, TraceId};
use std::collections::HashMap;
use std::sync::Arc;
/// Query handle over a shared [`EventLog`]; every query method goes through it.
pub struct TraceQuery {
/// Shared log consulted by all query methods.
log: Arc<EventLog>,
}
impl TraceQuery {
    /// Creates a query handle that reads from `log`.
    #[allow(dead_code)]
    pub fn new(log: Arc<EventLog>) -> Self {
        TraceQuery { log }
    }

    /// Returns the events the log holds for `trace_id`
    /// (delegates to `EventLog::get_events_by_trace`).
    #[allow(dead_code)]
    pub fn get_trace(&self, trace_id: TraceId) -> Vec<Event> {
        self.log.get_events_by_trace(trace_id)
    }

    /// Returns up to `count` recent events
    /// (delegates to `EventLog::get_recent_events`).
    #[allow(dead_code)]
    pub fn recent(&self, count: usize) -> Vec<Event> {
        self.log.get_recent_events(count)
    }

    /// Returns the events the log reports for the `start..end` timestamp window
    /// (delegates to `EventLog::get_events_in_range`).
    // NOTE(review): whether the bounds are inclusive is decided by `EventLog` — confirm there.
    #[allow(dead_code)]
    pub fn time_range(&self, start: u64, end: u64) -> Vec<Event> {
        self.log.get_events_in_range(start, end)
    }

    /// Returns the event count reported by the underlying log.
    #[allow(dead_code)]
    pub fn event_count(&self) -> u64 {
        self.log.event_count()
    }

    /// Summarises the packet activity recorded for `trace_id`.
    ///
    /// Counts `PacketSent`, `PacketReceived` and `PacketLost` events, sums the
    /// sizes of sent and received packets, and stores the RTT carried by a
    /// `ConnEstablished` event (if several are present, the last one wins).
    ///
    /// `loss_rate` is `packets_lost / packets_sent`; it stays at `0.0` when no
    /// sent packet was recorded, so it never results from a division by zero.
    #[allow(dead_code)]
    pub fn analyze_connection(&self, trace_id: TraceId) -> ConnectionAnalysis {
        let mut analysis = ConnectionAnalysis::default();
        for event in self.get_trace(trace_id) {
            match &event.event_data {
                EventData::PacketSent { size, .. } => {
                    analysis.packets_sent += 1;
                    analysis.bytes_sent += *size as u64;
                }
                EventData::PacketReceived { size, .. } => {
                    analysis.packets_received += 1;
                    analysis.bytes_received += *size as u64;
                }
                EventData::PacketLost { .. } => {
                    analysis.packets_lost += 1;
                }
                EventData::ConnEstablished { rtt, .. } => {
                    analysis.initial_rtt = Some(*rtt);
                }
                _ => {}
            }
        }
        // Only derive a rate when there is a denominator; otherwise keep the default 0.0.
        if analysis.packets_sent > 0 {
            analysis.loss_rate = analysis.packets_lost as f32 / analysis.packets_sent as f32;
        }
        analysis
    }

    /// Scans the `recent_count` most recent events and returns the traces whose
    /// issue score exceeds the threshold.
    ///
    /// Scoring: every `PacketLost` event adds 1, every `StreamClosed` event with
    /// a non-zero `error_code` adds 10. A trace is reported once its score is
    /// strictly greater than 5.
    ///
    /// The returned ids are unique and appear in the order in which each trace's
    /// first issue event occurs in the list returned by [`TraceQuery::recent`],
    /// so the result does not depend on hash-map iteration order.
    #[allow(dead_code)]
    pub fn find_problematic_traces(&self, recent_count: usize) -> Vec<TraceId> {
        const PACKET_LOSS_WEIGHT: u32 = 1;
        const STREAM_ERROR_WEIGHT: u32 = 10;
        const ISSUE_THRESHOLD: u32 = 5;

        let mut scores: HashMap<TraceId, u32> = HashMap::new();
        // Remembers the order in which traces first showed an issue.
        let mut first_seen: Vec<TraceId> = Vec::new();

        for event in self.recent(recent_count) {
            let weight = match &event.event_data {
                EventData::PacketLost { .. } => PACKET_LOSS_WEIGHT,
                EventData::StreamClosed { error_code, .. } if *error_code != 0 => {
                    STREAM_ERROR_WEIGHT
                }
                _ => continue,
            };
            let trace_id = event.trace_id;
            let score = scores.entry(trace_id).or_insert_with(|| {
                first_seen.push(trace_id);
                0
            });
            // Saturate instead of overflowing on pathological event volumes.
            *score = score.saturating_add(weight);
        }

        first_seen
            .into_iter()
            .filter(|trace_id| scores[trace_id] > ISSUE_THRESHOLD)
            .collect()
    }
}
/// Per-connection packet statistics produced by `TraceQuery::analyze_connection`.
///
/// `Clone` and `PartialEq` are derived so callers can keep and compare snapshots.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ConnectionAnalysis {
    /// Number of `PacketSent` events counted.
    pub packets_sent: u64,
    /// Number of `PacketReceived` events counted.
    pub packets_received: u64,
    /// Number of `PacketLost` events counted.
    pub packets_lost: u64,
    /// Sum of the `size` fields of the counted `PacketSent` events.
    pub bytes_sent: u64,
    /// Sum of the `size` fields of the counted `PacketReceived` events.
    pub bytes_received: u64,
    /// `packets_lost / packets_sent`; `0.0` when no sent packet was counted.
    pub loss_rate: f32,
    /// RTT carried by a `ConnEstablished` event, if one was seen.
    pub initial_rtt: Option<u32>,
}
}
#[cfg(not(feature = "trace"))]
mod implementation {
use super::super::{Event, EventLog, TraceId};
use std::sync::Arc;
/// Stand-in `TraceQuery` compiled when the `trace` feature is disabled; it stores nothing.
#[allow(dead_code)]
pub(super) struct TraceQuery;
impl TraceQuery {
#[allow(dead_code)]
pub(super) fn new(_log: Arc<EventLog>) -> Self {
Self
}
#[allow(dead_code)]
pub(super) fn get_trace(&self, _trace_id: TraceId) -> Vec<Event> {
vec![]
}
#[allow(dead_code)]
pub(super) fn recent(&self, _count: usize) -> Vec<Event> {
vec![]
}
#[allow(dead_code)]
pub(super) fn time_range(&self, _start: u64, _end: u64) -> Vec<Event> {
vec![]
}
#[allow(dead_code)]
pub(super) fn event_count(&self) -> u64 {
0
}
#[allow(dead_code)]
pub(super) fn analyze_connection(&self, _trace_id: TraceId) -> ConnectionAnalysis {
ConnectionAnalysis::default()
}
#[allow(dead_code)]
pub(super) fn find_problematic_traces(&self, _recent_count: usize) -> Vec<TraceId> {
vec![]
}
}
/// Stub counterpart of the connection statistics type, compiled when the
/// `trace` feature is disabled. The stub `analyze_connection` only ever
/// returns its `Default` value.
#[derive(Debug, Default)]
#[allow(dead_code)]
pub(super) struct ConnectionAnalysis {
/// Count of sent packets (stays at the default in the stub).
pub packets_sent: u64,
/// Count of received packets (stays at the default in the stub).
pub packets_received: u64,
/// Count of lost packets (stays at the default in the stub).
pub packets_lost: u64,
/// Total bytes sent (stays at the default in the stub).
pub bytes_sent: u64,
/// Total bytes received (stays at the default in the stub).
pub bytes_received: u64,
/// Loss ratio (stays at the default in the stub).
pub loss_rate: f32,
/// RTT from connection establishment, if any (`None` by default).
pub initial_rtt: Option<u32>,
}
}
#[cfg(feature = "trace")]
pub use implementation::*;
#[cfg(test)]
mod tests {
#[cfg(feature = "trace")]
use super::*;
#[cfg(feature = "trace")]
use crate::tracing::{Event, EventData, EventLog, TraceId};
#[cfg(feature = "trace")]
use std::sync::Arc;
/// Sent and received packet events are tallied per trace by `analyze_connection`.
#[test]
#[cfg(feature = "trace")]
fn test_query_interface() {
    let event_log = Arc::new(EventLog::new());
    let query = TraceQuery::new(Arc::clone(&event_log));
    let id = TraceId::new();

    let init = Event::conn_init("127.0.0.1:8080".parse().unwrap(), id);
    event_log.log(init);
    event_log.log(Event::packet_sent(1200, 1, id));
    event_log.log(Event::packet_sent(1200, 2, id));
    event_log.log(Event::packet_received(1200, 1, id));

    let stats = query.analyze_connection(id);
    // Packet counts first, then byte totals.
    assert_eq!(stats.packets_sent, 2);
    assert_eq!(stats.packets_received, 1);
    assert_eq!(stats.bytes_sent, 2400);
    assert_eq!(stats.bytes_received, 1200);
}
/// Every read-only query sees the two events that were logged.
#[test]
#[cfg(feature = "trace")]
fn query_recent_time_range_and_count_reflect_logged_events() {
    let event_log = Arc::new(EventLog::new());
    let query = TraceQuery::new(Arc::clone(&event_log));
    let id = TraceId::new();

    event_log.log(Event::packet_sent(100, 1, id));
    event_log.log(Event::packet_received(200, 2, id));

    let total = query.event_count();
    assert_eq!(total, 2);

    let latest = query.recent(10);
    assert_eq!(latest.len(), 2);

    let by_trace = query.get_trace(id);
    assert_eq!(by_trace.len(), 2);

    let everything = query.time_range(0, u64::MAX);
    assert!(!everything.is_empty());
}
/// A lost packet and a `ConnEstablished` event feed `packets_lost`,
/// `loss_rate` and `initial_rtt`.
#[test]
#[cfg(feature = "trace")]
fn analyze_connection_counts_loss_and_initial_rtt() {
    let event_log = Arc::new(EventLog::new());
    let query = TraceQuery::new(Arc::clone(&event_log));
    let id = TraceId::new();

    event_log.log(Event::packet_sent(100, 1, id));
    event_log.log(Event::packet_sent(150, 2, id));
    event_log.log(Event::packet_received(75, 1, id));

    let lost = Event {
        timestamp: crate::tracing::timestamp_now(),
        trace_id: id,
        event_data: EventData::PacketLost {
            packet_num: 2,
            _padding: [0u8; 56],
        },
        ..Default::default()
    };
    event_log.log(lost);

    let established = Event {
        timestamp: crate::tracing::timestamp_now(),
        trace_id: id,
        event_data: EventData::ConnEstablished {
            rtt: 1234,
            _padding: [0u8; 60],
        },
        ..Default::default()
    };
    event_log.log(established);

    let stats = query.analyze_connection(id);
    assert_eq!(stats.packets_sent, 2);
    assert_eq!(stats.bytes_sent, 250);
    assert_eq!(stats.packets_received, 1);
    assert_eq!(stats.bytes_received, 75);
    assert_eq!(stats.packets_lost, 1);
    // One loss out of two sent packets.
    let deviation = (stats.loss_rate - 0.5).abs();
    assert!(deviation < f32::EPSILON);
    assert_eq!(stats.initial_rtt, Some(1234));
}
/// With no sent packets the loss rate stays at zero even if a loss was logged.
#[test]
#[cfg(feature = "trace")]
fn analyze_connection_without_sent_packets_has_zero_loss_rate() {
    let event_log = Arc::new(EventLog::new());
    let query = TraceQuery::new(Arc::clone(&event_log));
    let id = TraceId::new();

    event_log.log(Event::packet_received(88, 1, id));
    let lost = Event {
        timestamp: crate::tracing::timestamp_now(),
        trace_id: id,
        event_data: EventData::PacketLost {
            packet_num: 1,
            _padding: [0u8; 56],
        },
        ..Default::default()
    };
    event_log.log(lost);

    let stats = query.analyze_connection(id);
    assert_eq!(stats.packets_sent, 0);
    assert_eq!(stats.packets_lost, 1);
    assert_eq!(stats.loss_rate, 0.0);
}
/// Six packet losses push a trace over the "more than five" threshold.
#[test]
#[cfg(feature = "trace")]
fn find_problematic_traces_flags_loss_threshold() {
    let event_log = Arc::new(EventLog::new());
    let query = TraceQuery::new(Arc::clone(&event_log));
    let id = TraceId::new();

    for packet_num in 0..6 {
        let lost = Event {
            timestamp: crate::tracing::timestamp_now(),
            trace_id: id,
            event_data: EventData::PacketLost {
                packet_num,
                _padding: [0u8; 56],
            },
            ..Default::default()
        };
        event_log.log(lost);
    }

    let flagged = query.find_problematic_traces(10);
    assert_eq!(flagged, vec![id]);
}
/// A single stream close with a non-zero error code is enough to flag a trace.
#[test]
#[cfg(feature = "trace")]
fn find_problematic_traces_flags_nonzero_stream_close_error() {
    let event_log = Arc::new(EventLog::new());
    let query = TraceQuery::new(Arc::clone(&event_log));
    let id = TraceId::new();

    let failed_close = Event {
        timestamp: crate::tracing::timestamp_now(),
        trace_id: id,
        event_data: EventData::StreamClosed {
            stream_id: 9,
            error_code: 1,
            _padding: [0u8; 56],
        },
        ..Default::default()
    };
    event_log.log(failed_close);

    let flagged = query.find_problematic_traces(10);
    assert_eq!(flagged, vec![id]);
}
/// A stream close with error code zero does not count as an issue.
#[test]
#[cfg(feature = "trace")]
fn find_problematic_traces_ignores_clean_stream_close() {
    let event_log = Arc::new(EventLog::new());
    let query = TraceQuery::new(Arc::clone(&event_log));
    let id = TraceId::new();

    let clean_close = Event {
        timestamp: crate::tracing::timestamp_now(),
        trace_id: id,
        event_data: EventData::StreamClosed {
            stream_id: 9,
            error_code: 0,
            _padding: [0u8; 56],
        },
        ..Default::default()
    };
    event_log.log(clean_close);

    let flagged = query.find_problematic_traces(10);
    assert!(flagged.is_empty());
}
/// The default analysis is zeroed with no RTT, and its `Debug` output names the type.
#[test]
#[cfg(feature = "trace")]
fn connection_analysis_default_and_debug_are_stable() {
    let fresh = ConnectionAnalysis::default();
    assert_eq!(fresh.packets_sent, 0);
    assert_eq!(fresh.initial_rtt, None);

    let rendered = format!("{fresh:?}");
    assert!(rendered.contains("ConnectionAnalysis"));
}
/// With the `trace` feature disabled, every stub query method reports
/// "nothing recorded": empty lists, a zero count, and a default analysis.
#[test]
#[cfg(not(feature = "trace"))]
fn test_zero_cost_query() {
    use crate::tracing::{EventLog, TraceId};
    use std::sync::Arc;
    let log = Arc::new(EventLog::new());
    let query = super::implementation::TraceQuery::new(log);

    assert_eq!(query.event_count(), 0);
    assert!(query.recent(10).is_empty());
    assert!(query.get_trace(TraceId::new()).is_empty());
    assert!(query.time_range(0, u64::MAX).is_empty());
    assert!(query.find_problematic_traces(10).is_empty());

    // The stub analysis must be indistinguishable from `Default`.
    let analysis = query.analyze_connection(TraceId::new());
    assert_eq!(analysis.packets_sent, 0);
    assert_eq!(analysis.packets_received, 0);
    assert_eq!(analysis.packets_lost, 0);
    assert_eq!(analysis.bytes_sent, 0);
    assert_eq!(analysis.bytes_received, 0);
    assert_eq!(analysis.loss_rate, 0.0);
    assert!(analysis.initial_rtt.is_none());
}
}