use crate::{
correlation::CorrelatableFrame,
fragment::{FragmentParts, Fragmentable},
};
/// A frame that can be taken apart into [`PacketParts`] and rebuilt from them.
///
/// Implementors must also be [`CorrelatableFrame`], `Send`, `Sync`, and
/// `'static`.
pub trait Packet: CorrelatableFrame + Send + Sync + 'static {
    /// Returns the `u32` identifier carried by this packet.
    fn id(&self) -> u32;

    /// Consumes the packet and returns its [`PacketParts`].
    fn into_parts(self) -> PacketParts;

    /// Builds a packet from previously extracted [`PacketParts`].
    fn from_parts(parts: PacketParts) -> Self;

    /// Reports whether this packet is a stream terminator.
    ///
    /// The provided implementation always returns `false`; implementors may
    /// override it.
    fn is_stream_terminator(&self) -> bool {
        false
    }
}
/// The decomposed form of a packet: an identifier, an optional correlation
/// id, and the raw payload bytes.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PacketParts {
    /// Packet identifier.
    id: u32,
    /// Correlation identifier, if one is set.
    correlation_id: Option<u64>,
    /// Payload bytes.
    payload: Vec<u8>,
}
/// A packet made of an identifier, an optional correlation id, and a byte
/// payload.
///
/// NOTE(review): the `bincode` derives presumably encode fields in
/// declaration order, so the field order is part of the wire format — confirm
/// before reordering.
#[derive(Clone, Debug, Eq, PartialEq, bincode::Decode, bincode::Encode)]
pub struct Envelope {
    /// Packet identifier.
    pub(crate) id: u32,
    /// Correlation identifier, if one is set.
    pub(crate) correlation_id: Option<u64>,
    /// Payload bytes.
    pub(crate) payload: Vec<u8>,
}
impl Envelope {
#[must_use]
pub fn new(id: u32, correlation_id: Option<u64>, payload: Vec<u8>) -> Self {
Self {
id,
correlation_id,
payload,
}
}
#[must_use]
pub fn payload_bytes(&self) -> &[u8] { &self.payload }
}
impl Packet for Envelope {
#[inline]
fn id(&self) -> u32 { self.id }
fn into_parts(self) -> PacketParts { self.into() }
fn from_parts(parts: PacketParts) -> Self { parts.into() }
}
impl CorrelatableFrame for Envelope {
    /// Returns the correlation id currently stored on the envelope.
    fn correlation_id(&self) -> Option<u64> {
        self.correlation_id
    }

    /// Overwrites the stored correlation id; passing `None` clears it.
    fn set_correlation_id(&mut self, correlation_id: Option<u64>) {
        self.correlation_id = correlation_id;
    }
}
impl PacketParts {
#[must_use]
pub fn new(id: u32, correlation_id: Option<u64>, payload: Vec<u8>) -> Self {
Self {
id,
correlation_id,
payload,
}
}
#[must_use]
pub const fn id(&self) -> u32 { self.id }
#[must_use]
pub const fn correlation_id(&self) -> Option<u64> { self.correlation_id }
#[must_use]
pub fn payload_bytes(&self) -> &[u8] { &self.payload }
#[must_use]
pub fn into_payload(self) -> Vec<u8> { self.payload }
#[must_use]
pub fn inherit_correlation(mut self, source: Option<u64>) -> Self {
let (next, mismatched) = Self::select_correlation(self.correlation_id, source);
if mismatched && let (Some(found), Some(expected)) = (self.correlation_id, next) {
log::warn!(
"mismatched correlation id in response: id={}, expected={}, found={}",
self.id,
expected,
found
);
}
self.correlation_id = next;
self
}
#[inline]
fn select_correlation(current: Option<u64>, source: Option<u64>) -> (Option<u64>, bool) {
match (current, source) {
(None, cid) => (cid, false),
(Some(cid), Some(src)) if cid != src => (Some(src), true),
(curr, _) => (curr, false),
}
}
}
impl From<Envelope> for PacketParts {
fn from(e: Envelope) -> Self { PacketParts::new(e.id, e.correlation_id, e.payload) }
}
impl From<PacketParts> for Envelope {
fn from(p: PacketParts) -> Self {
let id = p.id();
let correlation_id = p.correlation_id();
let payload = p.into_payload();
Envelope::new(id, correlation_id, payload)
}
}
/// Every [`Packet`] is [`Fragmentable`]: it converts to and from
/// [`FragmentParts`] by way of [`PacketParts`].
impl<T: Packet> Fragmentable for T {
    /// Splits the packet into [`PacketParts`] and repackages the identifier,
    /// correlation id, and payload as [`FragmentParts`].
    fn into_fragment_parts(self) -> FragmentParts {
        let packet_parts = self.into_parts();
        let id = packet_parts.id();
        let correlation_id = packet_parts.correlation_id();
        FragmentParts::new(id, correlation_id, packet_parts.into_payload())
    }

    /// Rebuilds the packet from [`FragmentParts`] by first assembling the
    /// equivalent [`PacketParts`].
    fn from_fragment_parts(parts: FragmentParts) -> Self {
        let id = parts.id();
        let correlation_id = parts.correlation_id();
        let payload = parts.into_payload();
        Self::from_parts(PacketParts::new(id, correlation_id, payload))
    }
}