use std::{collections::BTreeMap, fmt, iter};
use bit_vec::BitVec;
use enumflags2::BitFlags;
use bytes::BytesMut;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::ddsdata::DDSData,
messages::submessages::{
elements::serialized_payload::SerializedPayload,
submessages::{DATAFRAG_Flags, DataFrag},
},
structure::{
cache_change::ChangeKind,
sequence_number::{FragmentNumber, SequenceNumber},
time::Timestamp,
},
};
struct AssemblyBuffer {
buffer_bytes: BytesMut,
fragment_count: usize,
received_bitmap: BitVec,
#[allow(dead_code)] created_time: Timestamp,
modified_time: Timestamp,
}
impl AssemblyBuffer {
pub fn new(datafrag: &DataFrag) -> Self {
let data_size: usize = datafrag.data_size.try_into().unwrap();
let fragment_size: u16 = datafrag.fragment_size;
debug!("new AssemblyBuffer data_size={data_size} frag_size={fragment_size}");
assert!(fragment_size as usize <= data_size); assert!(fragment_size > 0);
let mut buffer_bytes = BytesMut::with_capacity(data_size);
buffer_bytes.resize(data_size, 0);
let fragment_count = usize::from(datafrag.total_number_of_fragments());
let now = Timestamp::now();
Self {
buffer_bytes,
fragment_count,
received_bitmap: BitVec::from_elem(fragment_count, false),
created_time: now,
modified_time: now,
}
}
pub fn insert_frags(&mut self, datafrag: &DataFrag, frag_size: u16) {
let frag_size = usize::from(frag_size); let frags_in_submessage = usize::from(datafrag.fragments_in_submessage);
let fragment_starting_num: usize = u32::from(datafrag.fragment_starting_num)
.try_into()
.unwrap();
let start_frag_from_0 = fragment_starting_num - 1;
debug!(
"insert_frags: datafrag.writer_sn = {:?}, frag_size = {:?}, datafrag.fragment_size = {:?}, \
datafrag.fragment_starting_num = {:?}, datafrag.fragments_in_submessage = {:?}, \
datafrag.data_size = {:?}",
datafrag.writer_sn,
frag_size,
datafrag.fragment_size,
datafrag.fragment_starting_num,
datafrag.fragments_in_submessage,
datafrag.data_size
);
let from_byte = start_frag_from_0 * frag_size;
let to_before_byte = std::cmp::min(
from_byte
+ std::cmp::min(
frags_in_submessage * frag_size,
datafrag.serialized_payload.len(),
),
self.buffer_bytes.len(),
);
let payload_size = to_before_byte - from_byte;
let last_frag_in_submessage = start_frag_from_0 + frags_in_submessage;
if last_frag_in_submessage < self.fragment_count
&& datafrag.serialized_payload.len() < frags_in_submessage * frag_size
{
error!(
"Received DATAFRAG too small. fragment_starting_num={} out of fragment_count={}, \
frags_in_submessage={}, frag_size={} but payload length = {}. Original data_size={}",
fragment_starting_num,
self.fragment_count,
frags_in_submessage,
frag_size,
datafrag.serialized_payload.len(),
datafrag.data_size,
);
}
debug!("insert_frags: from_byte = {from_byte:?}, to_before_byte = {to_before_byte:?}");
debug!(
"insert_frags: dataFrag.serializedPayload.len = {:?}",
datafrag.serialized_payload.len()
);
self.buffer_bytes.as_mut()[from_byte..to_before_byte]
.copy_from_slice(&datafrag.serialized_payload[..payload_size]);
for f in 0..frags_in_submessage {
self.received_bitmap.set(start_frag_from_0 + f, true);
}
self.modified_time = Timestamp::now();
}
pub fn is_complete(&self) -> bool {
self.received_bitmap.all() }
}
pub(crate) struct FragmentAssembler {
fragment_size: u16, assembly_buffers: BTreeMap<SequenceNumber, AssemblyBuffer>,
}
impl fmt::Debug for FragmentAssembler {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FragmentAssembler - fields omitted")
.finish()
}
}
impl FragmentAssembler {
pub fn new(fragment_size: u16) -> Self {
debug!("new FragmentAssembler. frag_size = {fragment_size}");
Self {
fragment_size,
assembly_buffers: BTreeMap::new(),
}
}
pub fn new_datafrag(
&mut self,
datafrag: &DataFrag,
flags: BitFlags<DATAFRAG_Flags>,
) -> Option<DDSData> {
let writer_sn = datafrag.writer_sn;
let frag_size = self.fragment_size;
let assembly_buffer = self
.assembly_buffers
.entry(datafrag.writer_sn)
.or_insert_with(|| AssemblyBuffer::new(datafrag));
assembly_buffer.insert_frags(datafrag, frag_size);
if assembly_buffer.is_complete() {
debug!("new_datafrag: COMPLETED FRAGMENT");
if let Some(assembly_buffer) = self.assembly_buffers.remove(&writer_sn) {
let serialized_data_or_key =
SerializedPayload::from_bytes(&assembly_buffer.buffer_bytes.freeze()).map_or_else(
|e| {
error!("Deserializing SerializedPayload from DATAFRAG: {:?}", &e);
None
},
Some,
)?;
let dds_data = if flags.contains(DATAFRAG_Flags::Key) {
DDSData::new_disposed_by_key(ChangeKind::NotAliveDisposed, serialized_data_or_key)
} else {
DDSData::new(serialized_data_or_key)
};
Some(dds_data) } else {
error!("Assembly buffer mysteriously lost");
None
}
} else {
debug!("new_dataFrag: FRAGMENT NOT COMPLETED YET");
None
}
}
pub fn garbage_collect_before(&mut self, expire_before: Timestamp) {
self.assembly_buffers.retain(|sn, ab| {
let retain = ab.modified_time >= expire_before;
if !retain {
info!("AssemblyBuffer dropping {sn:?}");
}
retain
});
}
pub fn is_partially_received(&self, sn: SequenceNumber) -> bool {
self.assembly_buffers.contains_key(&sn)
}
pub fn missing_frags_for(
&self,
seq: SequenceNumber,
) -> Box<dyn '_ + Iterator<Item = FragmentNumber>> {
match self.assembly_buffers.get(&seq) {
None => Box::new(iter::empty()),
Some(ab) => {
let iter = (0..ab.fragment_count)
.filter(move |f| !ab.received_bitmap.get(*f).unwrap_or(true))
.map(|f| FragmentNumber::new((f + 1).try_into().unwrap()));
Box::new(iter)
}
}
}
}