use log::warn;
use super::ConnectionActor;
use crate::{
app::{Packet, fragment_utils::fragment_packet},
correlation::CorrelatableFrame,
push::FrameLike,
};
impl<F, E> ConnectionActor<F, E>
where
F: FrameLike + CorrelatableFrame + Packet,
E: std::fmt::Debug,
{
pub(super) fn emit_multi_packet_frame(&mut self, frame: F, out: &mut Vec<F>) {
let mut frame = frame;
self.apply_multi_packet_correlation(&mut frame);
self.process_frame_with_hooks_and_metrics(frame, out);
}
pub(super) fn apply_multi_packet_correlation(&mut self, frame: &mut F) {
let Some(ctx) = self.active_output.multi_packet_mut() else {
return;
};
if !ctx.is_stamping_enabled() {
return;
}
let correlation_id = ctx.correlation_id();
frame.set_correlation_id(correlation_id);
if let Some(expected) = correlation_id {
debug_assert!(
CorrelatableFrame::correlation_id(frame) == Some(expected)
|| CorrelatableFrame::correlation_id(frame).is_none(),
"multi-packet frame correlation mismatch: expected={:?}, got={:?}",
Some(expected),
CorrelatableFrame::correlation_id(frame),
);
} else {
debug_assert!(
CorrelatableFrame::correlation_id(frame).is_none(),
"multi-packet frame correlation unexpectedly present: got={:?}",
CorrelatableFrame::correlation_id(frame),
);
}
}
pub(super) fn process_frame_with_hooks_and_metrics(&mut self, frame: F, out: &mut Vec<F>)
where
F: Packet,
{
if let Some(fragmenter) = self.fragmenter.as_deref() {
let fragmented = fragment_packet(fragmenter, frame);
match fragmented {
Ok(frames) => frames
.into_iter()
.for_each(|frame| self.push_frame(frame, out)),
Err(err) => {
warn!(
"failed to fragment frame: connection_id={:?}, peer={:?}, error={err:?}",
self.connection_id, self.peer_addr,
);
crate::metrics::inc_handler_errors();
}
}
} else {
self.push_frame(frame, out);
}
}
pub(super) fn push_frame(&mut self, frame: F, out: &mut Vec<F>) {
let mut frame = frame;
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
}
}