use bincode::error::DecodeError;
use thiserror::Error;
use super::{
Fragmentable,
FragmentationConfig,
FragmentationError,
Fragmenter,
MessageId,
Reassembler,
ReassemblyError,
decode_fragment_payload,
fragment_packet,
packet::FragmentParts,
};
/// Errors surfaced while turning an inbound packet back into a complete message.
///
/// This is the error type of [`FragmentAdapter::reassemble`]; both variants are
/// produced through `?` conversions (`#[from]`) inside the reassembly path.
#[derive(Debug, Error)]
pub enum FragmentAdapterError {
/// Decoding the fragment payload of a packet failed.
#[error("decode error: {0}")]
Decode(#[from] DecodeError),
/// The reassembler rejected a decoded fragment.
#[error("reassembly error: {0}")]
Reassembly(#[from] ReassemblyError),
}
/// Abstraction over outbound fragmentation and inbound reassembly of packets.
///
/// Implementors must be `Send + Sync`. The `fragment` and `reassemble` methods
/// are generic over the packet type, so they are called through a concrete or
/// generic implementor rather than through `dyn FragmentAdapter`.
pub trait FragmentAdapter: Send + Sync {
/// Converts `packet` into the list of packets that should be sent in its place.
///
/// # Errors
///
/// Returns a [`FragmentationError`] when the packet cannot be fragmented.
fn fragment<E: Fragmentable>(&self, packet: E) -> Result<Vec<E>, FragmentationError>;
/// Feeds an inbound `packet` to the adapter.
///
/// Returns `Ok(Some(_))` when a packet is ready to be handed on and `Ok(None)`
/// when nothing is available yet. NOTE(review): the exact meaning of `None` is
/// up to the implementor; in `DefaultFragmentAdapter` it is returned when the
/// reassembler has not yielded a complete message for the pushed fragment.
///
/// # Errors
///
/// Returns a [`FragmentAdapterError`] when decoding or reassembly fails.
fn reassemble<E: Fragmentable>(&mut self, packet: E)
-> Result<Option<E>, FragmentAdapterError>;
/// Purges expired state and returns the identifiers of the affected messages.
///
/// NOTE(review): what counts as "expired" is decided by the implementor —
/// presumably a reassembly timeout; confirm against the reassembler.
fn purge_expired(&mut self) -> Vec<MessageId>;
}
/// Default [`FragmentAdapter`] that pairs a [`Fragmenter`] with a [`Reassembler`].
///
/// Both halves are configured from a single [`FragmentationConfig`] in
/// [`DefaultFragmentAdapter::new`].
#[derive(Debug)]
pub struct DefaultFragmentAdapter {
// Used for the outbound path (`fragment`).
fragmenter: Fragmenter,
// Used for the inbound path (`reassemble`, `purge_expired`).
reassembler: Reassembler,
}
impl DefaultFragmentAdapter {
    /// Builds an adapter from `config`.
    ///
    /// The fragmenter is created from `config.fragment_payload_cap`; the
    /// reassembler from `config.max_message_size` and
    /// `config.reassembly_timeout`.
    #[must_use]
    pub fn new(config: FragmentationConfig) -> Self {
        let fragmenter = Fragmenter::new(config.fragment_payload_cap);
        let reassembler = Reassembler::new(config.max_message_size, config.reassembly_timeout);
        Self {
            fragmenter,
            reassembler,
        }
    }

    /// Fragments `packet` by delegating to [`fragment_packet`] with this
    /// adapter's fragmenter.
    ///
    /// # Errors
    ///
    /// Propagates any [`FragmentationError`] reported by [`fragment_packet`].
    pub fn fragment<E: Fragmentable>(&self, packet: E) -> Result<Vec<E>, FragmentationError> {
        let fragmenter = &self.fragmenter;
        fragment_packet(fragmenter, packet)
    }

    /// Processes one inbound `packet`.
    ///
    /// The packet is split into its parts and its payload is run through
    /// [`decode_fragment_payload`]:
    ///
    /// - if no fragment is decoded, the packet is rebuilt from its original id,
    ///   correlation id and payload and returned as `Ok(Some(_))`;
    /// - otherwise the decoded fragment is pushed into the reassembler, which
    ///   yields either a complete message (returned as `Ok(Some(_))`, rebuilt
    ///   with the id and correlation id of the packet just received) or nothing
    ///   yet (`Ok(None)`).
    ///
    /// # Errors
    ///
    /// Returns [`FragmentAdapterError::Decode`] when decoding the payload fails
    /// and [`FragmentAdapterError::Reassembly`] when the reassembler rejects the
    /// fragment.
    pub fn reassemble<E: Fragmentable>(
        &mut self,
        packet: E,
    ) -> Result<Option<E>, FragmentAdapterError> {
        let parts = packet.into_fragment_parts();
        let (id, correlation_id) = (parts.id(), parts.correlation_id());
        let payload = parts.into_payload();

        // No fragment decoded: hand the packet back with its payload untouched.
        let Some((header, fragment_payload)) = decode_fragment_payload(&payload)? else {
            let untouched = FragmentParts::new(id, correlation_id, payload);
            return Ok(Some(E::from_fragment_parts(untouched)));
        };

        // A fragment: only produce a packet once the reassembler yields a message.
        let completed = self.reassembler.push(header, fragment_payload)?;
        Ok(completed.map(|message| {
            let merged = FragmentParts::new(id, correlation_id, message.into_payload());
            E::from_fragment_parts(merged)
        }))
    }

    /// Delegates to the reassembler's `purge_expired` and returns the message
    /// identifiers it reports.
    pub fn purge_expired(&mut self) -> Vec<MessageId> {
        let reassembler = &mut self.reassembler;
        reassembler.purge_expired()
    }
}
// Trait implementation that forwards every call to the inherent method of the
// same name. `Self::name` paths resolve to inherent associated functions before
// trait ones, so these calls do not recurse into the trait methods.
impl FragmentAdapter for DefaultFragmentAdapter {
    /// Forwards to the inherent `DefaultFragmentAdapter::fragment`.
    fn fragment<E: Fragmentable>(&self, packet: E) -> Result<Vec<E>, FragmentationError> {
        Self::fragment(self, packet)
    }

    /// Forwards to the inherent `DefaultFragmentAdapter::reassemble`.
    fn reassemble<E: Fragmentable>(
        &mut self,
        packet: E,
    ) -> Result<Option<E>, FragmentAdapterError> {
        Self::reassemble(self, packet)
    }

    /// Forwards to the inherent `DefaultFragmentAdapter::purge_expired`.
    fn purge_expired(&mut self) -> Vec<MessageId> {
        Self::purge_expired(self)
    }
}