use quiche::Connection;
use quiche::ConnectionError;
use quiche::PathStats;
use quiche::Stats;
use serde::ser::SerializeStruct;
use serde::ser::Serializer;
use serde::Serialize;
use std::cmp;
use std::collections::HashMap;
use std::iter::FromIterator;
use crate::frame::CloseTriggerFrame;
use crate::frame::EnrichedHeaders;
use crate::frame::H3iFrame;
use crate::quiche;
/// Upper bound, in bytes, on how much of a connection error's `reason` buffer
/// is included when a [`SerializableConnectionError`] is serialized.
pub const MAX_SERIALIZED_BUFFER_LEN: usize = 16384;
/// Summary of a connection: the frames recorded per stream, connection and
/// path statistics, and the details of how the connection was closed.
#[derive(Default, Debug)]
pub struct ConnectionSummary {
/// Frames recorded for each stream, keyed by stream ID.
pub stream_map: StreamMap,
/// Connection-level statistics, when available.
pub stats: Option<Stats>,
/// Path-level statistics (presumably one entry per path; confirm against
/// the code that fills this in).
pub path_stats: Vec<PathStats>,
/// Peer/local errors and timeout state captured for the connection.
pub conn_close_details: ConnectionCloseDetails,
}
impl Serialize for ConnectionSummary {
    /// Serializes the summary as a struct with five fields: `stream_map`,
    /// `stats`, `path_stats`, `error` and `missed_close_trigger_frames`.
    ///
    /// `stats` and `path_stats` are emitted through the
    /// [`SerializableStats`] / [`SerializablePathStats`] wrappers, and
    /// `error` carries the connection close details.
    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // The length passed to `serialize_struct` must match the number of
        // `serialize_field` calls below; length-aware formats rely on it.
        //
        // NOTE(review): the struct name "path_stats" looks copy-pasted from
        // the path stats serializer. It is left untouched here so that the
        // output of name-sensitive formats does not change.
        let mut state = s.serialize_struct("path_stats", 5)?;

        state.serialize_field("stream_map", &self.stream_map)?;
        state.serialize_field(
            "stats",
            &self.stats.as_ref().map(SerializableStats),
        )?;

        let p: Vec<SerializablePathStats> =
            self.path_stats.iter().map(SerializablePathStats).collect();
        state.serialize_field("path_stats", &p)?;

        state.serialize_field("error", &self.conn_close_details)?;
        state.serialize_field(
            "missed_close_trigger_frames",
            &self.stream_map.missing_close_trigger_frames(),
        )?;

        state.end()
    }
}
/// Record of the frames seen on each stream, plus optional tracking of
/// "close trigger" frames that have not been seen yet.
#[derive(Clone, Debug, Default, Serialize)]
pub struct StreamMap {
// Frames recorded per stream ID, in insertion order within each stream.
stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
// Close-trigger bookkeeping; `None` when no trigger frames were configured.
close_trigger_frames: Option<CloseTriggerFrames>,
}
impl<T> From<T> for StreamMap
where
    T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
{
    /// Builds a map from `(stream_id, frames)` pairs. The resulting map has
    /// no close trigger frames configured.
    fn from(value: T) -> Self {
        Self {
            stream_frame_map: HashMap::from_iter(value),
            close_trigger_frames: None,
        }
    }
}
impl StreamMap {
    /// Returns a copy of every recorded frame across all streams.
    ///
    /// Frames are collected by walking the underlying `HashMap`, so the
    /// relative order of frames belonging to different streams is
    /// unspecified.
    pub fn all_frames(&self) -> Vec<H3iFrame> {
        self.stream_frame_map
            .values()
            .flatten()
            .cloned()
            .collect()
    }

    /// Returns a copy of the frames recorded on `stream_id`, or an empty
    /// vector if nothing was recorded for that stream.
    pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
        self.stream_frame_map
            .get(&stream_id)
            .cloned()
            .unwrap_or_default()
    }

    /// Returns `true` if `frame` was recorded on any stream.
    pub fn received_frame(&self, frame: &H3iFrame) -> bool {
        // Scan the stored frames in place rather than cloning all of them
        // just to run a membership test.
        self.stream_frame_map
            .values()
            .any(|frames| frames.contains(frame))
    }

    /// Returns `true` if `frame` was recorded on the given `stream`.
    ///
    /// Returns `false` both when the stream is unknown and when the stream
    /// exists but never saw `frame`.
    pub fn received_frame_on_stream(
        &self, stream: u64, frame: &H3iFrame,
    ) -> bool {
        // The result of `contains` must be returned, not merely the fact
        // that the stream has an entry in the map.
        self.stream_frame_map
            .get(&stream)
            .map(|frames| frames.contains(frame))
            .unwrap_or(false)
    }

    /// Returns `true` if no stream has an entry in the map.
    pub fn is_empty(&self) -> bool {
        self.stream_frame_map.is_empty()
    }

    /// Returns the frames recorded on `stream_id` for which
    /// `to_enriched_headers()` yields a value, converted to
    /// [`EnrichedHeaders`].
    pub fn headers_on_stream(&self, stream_id: u64) -> Vec<EnrichedHeaders> {
        self.stream(stream_id)
            .into_iter()
            .filter_map(|h3i_frame| h3i_frame.to_enriched_headers())
            .collect()
    }

    /// Returns `true` if close trigger frames were configured and none of
    /// them is still missing. Returns `false` when none were configured.
    pub fn all_close_trigger_frames_seen(&self) -> bool {
        if let Some(triggers) = self.close_trigger_frames.as_ref() {
            triggers.saw_all_trigger_frames()
        } else {
            false
        }
    }

    /// Returns the close trigger frames that have not been seen yet, or
    /// `None` when no close trigger frames were configured.
    pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
        self.close_trigger_frames
            .as_ref()
            .map(|e| e.missing_triggers())
    }

    /// Creates an empty map that tracks the given close trigger frames.
    pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
        Self {
            close_trigger_frames,
            ..Default::default()
        }
    }

    /// Records `frame` on `stream_id`.
    ///
    /// When close trigger frames are configured, they are told about the
    /// frame first so a matching trigger can be marked as seen.
    pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
        if let Some(expected) = self.close_trigger_frames.as_mut() {
            expected.receive_frame(stream_id, &frame);
        }

        self.stream_frame_map
            .entry(stream_id)
            .or_default()
            .push(frame);
    }

    /// Closes `qconn` with the error configured on the close trigger frames.
    ///
    /// Does nothing when no close trigger frames are configured. This method
    /// does not itself check whether all triggers were seen; callers
    /// presumably check [`Self::all_close_trigger_frames_seen`] first.
    pub(crate) fn close_due_to_trigger_frames(
        &self, qconn: &mut quiche::Connection,
    ) {
        if let Some(ConnectionError {
            is_app,
            error_code,
            reason,
        }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
        {
            // The result of `close` is deliberately discarded.
            let _ = qconn.close(*is_app, *error_code, reason);
        }
    }
}
/// Set of frames that are being waited for, together with the connection
/// error to close the connection with.
#[derive(Clone, Serialize, Debug)]
pub struct CloseTriggerFrames {
// Trigger frames that have not been matched by a received frame yet.
missing: Vec<CloseTriggerFrame>,
// Error passed to the connection's `close`; excluded from serialization.
#[serde(skip)]
close_with: ConnectionError,
}
impl CloseTriggerFrames {
    /// Creates a tracker for `frames` that closes with an application-level
    /// HTTP/3 `NoError` code and a fixed reason phrase.
    pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
        let close_with = ConnectionError {
            is_app: true,
            error_code: quiche::h3::WireErrorCode::NoError as u64,
            reason: b"saw all close trigger frames".to_vec(),
        };

        Self::new_with_connection_close(frames, close_with)
    }

    /// Creates a tracker for `frames` that closes with the caller-supplied
    /// `close_with` error.
    pub fn new_with_connection_close(
        frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
    ) -> Self {
        Self {
            missing: frames,
            close_with,
        }
    }

    /// Marks at most one pending trigger as seen: the first one that is
    /// equivalent to `frame` and registered for `stream_id`.
    fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
        let matched = self.missing.iter().position(|trigger| {
            trigger.is_equivalent(frame) && trigger.stream_id() == stream_id
        });

        if let Some(index) = matched {
            self.missing.remove(index);
        }
    }

    /// Returns `true` once no trigger frame is pending any more.
    fn saw_all_trigger_frames(&self) -> bool {
        self.missing.is_empty()
    }

    /// Returns a copy of the trigger frames that are still pending.
    fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
        self.missing.clone()
    }
}
impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
fn from(value: Vec<CloseTriggerFrame>) -> Self {
Self::new(value)
}
}
/// Details captured from a connection about how (or whether) it failed:
/// the peer and local errors, the timeout flag and the session bytes.
#[derive(Default)]
pub struct ConnectionCloseDetails {
// Error reported by the peer, if any.
peer_error: Option<ConnectionError>,
// Error raised locally, if any.
local_error: Option<ConnectionError>,
/// Whether the connection reported that it timed out.
pub timed_out: bool,
/// Copy of the bytes returned by the connection's `session()`, if any.
pub session: Option<Vec<u8>>,
}
impl core::fmt::Debug for ConnectionCloseDetails {
    /// Formats the error and timeout fields; `session` is left out of the
    /// output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so that the omission of `session` is explicit.
        let Self {
            peer_error,
            local_error,
            timed_out,
            session: _,
        } = self;

        let mut out = f.debug_struct("ConnectionCloseDetails");
        out.field("peer_error", peer_error);
        out.field("local_error", local_error);
        out.field("timed_out", timed_out);
        out.finish()
    }
}
impl ConnectionCloseDetails {
    /// Captures the session bytes, peer error, local error and timeout flag
    /// currently reported by `qconn`.
    pub fn new(qconn: &Connection) -> Self {
        Self {
            session: qconn.session().map(|bytes| bytes.to_vec()),
            peer_error: qconn.peer_error().cloned(),
            local_error: qconn.local_error().cloned(),
            timed_out: qconn.is_timed_out(),
        }
    }

    /// Returns the error reported by the peer, if any.
    pub fn peer_error(&self) -> Option<&ConnectionError> {
        self.peer_error.as_ref()
    }

    /// Returns the error raised locally, if any.
    pub fn local_error(&self) -> Option<&ConnectionError> {
        self.local_error.as_ref()
    }

    /// Returns `true` when neither a peer error nor a local error is set.
    pub fn no_err(&self) -> bool {
        matches!((&self.peer_error, &self.local_error), (None, None))
    }
}
impl Serialize for ConnectionCloseDetails {
    /// Serializes the close details as a struct.
    ///
    /// `timed_out` is always written. `peer_error` and `local_error` are
    /// written only when present. `session` is not serialized.
    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // The declared length must equal the number of fields actually
        // written, so count the optional errors instead of assuming both
        // are present.
        let field_count = 1 +
            usize::from(self.peer_error.is_some()) +
            usize::from(self.local_error.is_some());

        let mut state: <S as Serializer>::SerializeStruct =
            s.serialize_struct("enriched_connection_error", field_count)?;

        // Tell the serializer about omitted fields; `skip_field` is a no-op
        // for formats that do not care.
        match &self.peer_error {
            Some(pe) => state.serialize_field(
                "peer_error",
                &SerializableConnectionError(pe),
            )?,
            None => state.skip_field("peer_error")?,
        }

        match &self.local_error {
            Some(le) => state.serialize_field(
                "local_error",
                &SerializableConnectionError(le),
            )?,
            None => state.skip_field("local_error")?,
        }

        state.serialize_field("timed_out", &self.timed_out)?;
        state.end()
    }
}
/// Borrowing wrapper that gives `quiche::PathStats` a `Serialize`
/// implementation.
pub struct SerializablePathStats<'a>(&'a quiche::PathStats);
impl Serialize for SerializablePathStats<'_> {
    /// Serializes the wrapped path statistics as a 17-field struct.
    ///
    /// `rtt`, `min_rtt` and `rttvar` are written as fractional seconds; all
    /// other fields are written as stored.
    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let path = self.0;
        let mut out = s.serialize_struct("path_stats", 17)?;

        // Addresses and path state.
        out.serialize_field("local_addr", &path.local_addr)?;
        out.serialize_field("peer_addr", &path.peer_addr)?;
        out.serialize_field("active", &path.active)?;

        // Packet counters.
        out.serialize_field("recv", &path.recv)?;
        out.serialize_field("sent", &path.sent)?;
        out.serialize_field("lost", &path.lost)?;
        out.serialize_field("retrans", &path.retrans)?;

        // Round-trip time figures, converted to seconds.
        out.serialize_field("rtt", &path.rtt.as_secs_f64())?;
        out.serialize_field(
            "min_rtt",
            &path.min_rtt.map(|d| d.as_secs_f64()),
        )?;
        out.serialize_field("rttvar", &path.rttvar.as_secs_f64())?;

        out.serialize_field("cwnd", &path.cwnd)?;

        // Byte counters.
        out.serialize_field("sent_bytes", &path.sent_bytes)?;
        out.serialize_field("recv_bytes", &path.recv_bytes)?;
        out.serialize_field("lost_bytes", &path.lost_bytes)?;
        out.serialize_field(
            "stream_retrans_bytes",
            &path.stream_retrans_bytes,
        )?;

        out.serialize_field("pmtu", &path.pmtu)?;
        out.serialize_field("delivery_rate", &path.delivery_rate)?;

        out.end()
    }
}
/// Borrowing wrapper that gives `quiche::Stats` a `Serialize`
/// implementation.
pub struct SerializableStats<'a>(&'a quiche::Stats);
impl Serialize for SerializableStats<'_> {
    /// Serializes the wrapped connection statistics as a 14-field struct,
    /// writing every field as stored.
    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let stats = self.0;
        let mut out = s.serialize_struct("path_stats", 14)?;

        // Packet counters.
        out.serialize_field("recv", &stats.recv)?;
        out.serialize_field("sent", &stats.sent)?;
        out.serialize_field("lost", &stats.lost)?;
        out.serialize_field("retrans", &stats.retrans)?;

        // Byte counters.
        out.serialize_field("sent_bytes", &stats.sent_bytes)?;
        out.serialize_field("recv_bytes", &stats.recv_bytes)?;
        out.serialize_field("lost_bytes", &stats.lost_bytes)?;
        out.serialize_field(
            "stream_retrans_bytes",
            &stats.stream_retrans_bytes,
        )?;

        out.serialize_field("paths_count", &stats.paths_count)?;

        // Stream reset / stop counters, local then remote.
        out.serialize_field(
            "reset_stream_count_local",
            &stats.reset_stream_count_local,
        )?;
        out.serialize_field(
            "stopped_stream_count_local",
            &stats.stopped_stream_count_local,
        )?;
        out.serialize_field(
            "reset_stream_count_remote",
            &stats.reset_stream_count_remote,
        )?;
        out.serialize_field(
            "stopped_stream_count_remote",
            &stats.stopped_stream_count_remote,
        )?;

        out.serialize_field(
            "path_challenge_rx_count",
            &stats.path_challenge_rx_count,
        )?;

        out.end()
    }
}
/// Borrowing wrapper that gives `quiche::ConnectionError` a `Serialize`
/// implementation.
#[derive(Clone, Debug)]
pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);
impl Serialize for SerializableConnectionError<'_> {
    /// Serializes the wrapped error as `is_app`, `error_code` and `reason`.
    ///
    /// At most [`MAX_SERIALIZED_BUFFER_LEN`] bytes of `reason` are used, and
    /// they are converted to text lossily, so invalid UTF-8 is replaced
    /// rather than rejected.
    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let err = self.0;
        let mut out = s.serialize_struct("path_stats", 3)?;

        out.serialize_field("is_app", &err.is_app)?;
        out.serialize_field("error_code", &err.error_code)?;

        // Cap the number of reason bytes that end up in the output.
        let reason_len = cmp::min(err.reason.len(), MAX_SERIALIZED_BUFFER_LEN);
        let reason = String::from_utf8_lossy(&err.reason[..reason_len]);
        out.serialize_field("reason", &reason)?;

        out.end()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::frame::EnrichedHeaders;
    use quiche::h3::Header;

    /// Builds a frame from a single `hello: world` header.
    fn h3i_frame() -> H3iFrame {
        let headers = vec![Header::new(b"hello", b"world")];
        headers.into()
    }

    /// A single configured trigger is satisfied once its frame is received.
    #[test]
    fn close_trigger_frame() {
        let frame = h3i_frame();
        let trigger = CloseTriggerFrame::new(0, frame.clone());
        let mut triggers = CloseTriggerFrames::new(vec![trigger]);

        triggers.receive_frame(0, &frame);

        assert!(triggers.saw_all_trigger_frames());
    }

    /// Receiving one of three triggers leaves the other two pending.
    #[test]
    fn trigger_frame_missing() {
        let frame = h3i_frame();
        let expected_frames = vec![
            CloseTriggerFrame::new(0, frame.clone()),
            CloseTriggerFrame::new(4, frame.clone()),
            CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
        ];
        let still_pending = expected_frames[1..].to_vec();

        let mut tracker = CloseTriggerFrames::new(expected_frames.clone());
        tracker.receive_frame(0, &frame);

        assert!(!tracker.saw_all_trigger_frames());
        assert_eq!(tracker.missing_triggers(), still_pending);
    }

    /// Returns a headers frame followed by a data frame.
    fn stream_map_data() -> Vec<H3iFrame> {
        let header_list = vec![Header::new(b"hello", b"world")];
        let headers = H3iFrame::Headers(EnrichedHeaders::from(header_list));

        let payload = b"hello world".to_vec();
        let data =
            H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data { payload });

        vec![headers, data]
    }

    /// A map built without triggers never reports that all were seen.
    #[test]
    fn test_stream_map_trigger_frames_with_none() {
        let entries = vec![(0, stream_map_data())];
        let stream_map: StreamMap = entries.into();

        assert!(!stream_map.all_close_trigger_frames_seen());
    }

    /// Inserting one of two trigger frames leaves the other one missing.
    #[test]
    fn test_stream_map_trigger_frames() {
        let data = stream_map_data();
        let triggers: CloseTriggerFrames = vec![
            CloseTriggerFrame::new(0, data[0].clone()),
            CloseTriggerFrame::new(0, data[1].clone()),
        ]
        .into();
        let mut stream_map = StreamMap::new(Some(triggers));

        stream_map.insert(0, data[0].clone());

        assert!(!stream_map.all_close_trigger_frames_seen());

        let missing = stream_map.missing_close_trigger_frames().unwrap();
        assert_eq!(missing, vec![CloseTriggerFrame::new(0, data[1].clone())]);
    }
}