use std::{collections::BTreeMap, convert::TryInto, fmt};
use bit_vec::BitVec;
use enumflags2::BitFlags;
use bytes::BytesMut;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::ddsdata::DDSData,
messages::submessages::{
submessage_elements::serialized_payload::SerializedPayload,
submessages::{DATAFRAG_Flags, DataFrag},
},
structure::{cache_change::ChangeKind, sequence_number::SequenceNumber, time::Timestamp},
};
/// Reassembly state for a single fragmented sample.
///
/// Holds the destination bytes together with a bitmap recording which
/// fragments have been written into them so far.
struct AssemblyBuffer {
    /// Destination storage; `new` sizes it to the full sample and zero-fills it.
    buffer_bytes: BytesMut,
    /// Number of fragments the sample is split into.
    // Stored by `new` but not read by the code in this file, hence the lint allowance.
    #[allow(dead_code)]
    fragment_count: usize,
    /// One bit per fragment; a bit is set once that fragment has been inserted.
    received_bitmap: BitVec,
    /// Time at which this buffer was created.
    // Stored by `new` but not read by the code in this file, hence the lint allowance.
    #[allow(dead_code)]
    created_time: Timestamp,
    /// Time of the most recent fragment insertion (initially the creation time).
    modified_time: Timestamp,
}
impl AssemblyBuffer {
    /// Creates an empty assembly buffer for a sample of `data_size` bytes that
    /// is delivered in fragments of `fragment_size` bytes each (the final
    /// fragment may be shorter).
    ///
    /// The byte buffer is zero-filled and no fragment is marked as received.
    ///
    /// # Panics
    ///
    /// Panics if `fragment_size` is zero (division by zero) or if `data_size`
    /// does not fit into `usize`.
    pub fn new(data_size: u32, fragment_size: u16) -> Self {
        let data_size: usize = data_size.try_into().unwrap();
        let mut buffer_bytes = BytesMut::with_capacity(data_size);
        buffer_bytes.resize(data_size, 0);

        let frag_size = usize::from(fragment_size);
        // Ceiling division: a trailing partial fragment still counts as one.
        let fragment_count = data_size / frag_size + usize::from(data_size % frag_size != 0);

        let now = Timestamp::now();
        Self {
            buffer_bytes,
            fragment_count,
            received_bitmap: BitVec::from_elem(fragment_count, false),
            created_time: now,
            modified_time: now,
        }
    }

    /// Copies the fragments carried by `datafrag` into the buffer and marks
    /// them as received.
    ///
    /// `frag_size` is the nominal size of one fragment in bytes. The payload
    /// must be exactly as long as the fragments it claims to carry, where the
    /// last fragment of the sample is allowed to be shorter than `frag_size`.
    ///
    /// A malformed submessage (fragment number 0, fragment range outside the
    /// sample, or a payload whose length does not match the claimed range) is
    /// ignored and leaves the buffer untouched, instead of panicking on data
    /// that came from the network.
    pub fn insert_frags(&mut self, datafrag: &DataFrag, frag_size: u16) {
        let frag_size = usize::from(frag_size);
        let frags_in_subm = usize::from(datafrag.fragments_in_submessage);
        // unwrap: a u32 is expected to fit into usize on supported targets.
        let fragment_starting_num: usize = u32::from(datafrag.fragment_starting_num)
            .try_into()
            .unwrap();

        if frag_size == 0 || frags_in_subm == 0 {
            return;
        }

        // Fragment numbers are 1-based; convert to a 0-based index and reject 0.
        let first_frag = match fragment_starting_num.checked_sub(1) {
            Some(index) => index,
            None => return,
        };
        let end_frag = match first_frag.checked_add(frags_in_subm) {
            Some(index) => index,
            None => return,
        };
        // The bitmap has one bit per fragment, so its length bounds the valid range.
        if end_frag > self.received_bitmap.len() {
            return;
        }

        let from_byte = match first_frag.checked_mul(frag_size) {
            Some(offset) => offset,
            None => return,
        };
        // The last fragment of the sample may be shorter than `frag_size`, so
        // the range ends at the end of the buffer at the latest.
        let to_before_byte = end_frag
            .saturating_mul(frag_size)
            .min(self.buffer_bytes.len());
        if from_byte > to_before_byte {
            return;
        }

        let payload: &[u8] = &datafrag.serialized_payload.value;
        if payload.len() != to_before_byte - from_byte {
            return;
        }

        self.buffer_bytes.as_mut()[from_byte..to_before_byte].copy_from_slice(payload);

        // Mark by fragment index (not byte offset): the bitmap is per fragment.
        for frag_index in first_frag..end_frag {
            self.received_bitmap.set(frag_index, true);
        }
        self.modified_time = Timestamp::now();
    }

    /// Returns `true` once every fragment of the sample has been received.
    pub fn is_complete(&self) -> bool {
        self.received_bitmap.all()
    }
}
/// Collects DATAFRAG submessages and reassembles them into complete samples.
pub(crate) struct FragmentAssembler {
    /// Fragment size, in bytes, that this assembler was constructed with.
    fragment_size: u16,
    /// Partially assembled samples, keyed by the writer's sequence number.
    assembly_buffers: BTreeMap<SequenceNumber, AssemblyBuffer>,
}
impl fmt::Debug for FragmentAssembler {
    /// Writes a fixed placeholder name; none of the fields are printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut placeholder = f.debug_struct("FragmentAssembler - fields omitted");
        placeholder.finish()
    }
}
impl FragmentAssembler {
pub fn new(fragment_size: u16) -> Self {
Self {
fragment_size,
assembly_buffers: BTreeMap::new(),
}
}
pub fn new_datafrag(
&mut self,
datafrag: &DataFrag,
flags: BitFlags<DATAFRAG_Flags>,
) -> Option<DDSData> {
let rep_id = datafrag.serialized_payload.representation_identifier;
let writer_sn = datafrag.writer_sn;
let abuf = self
.assembly_buffers
.entry(datafrag.writer_sn)
.or_insert_with(|| AssemblyBuffer::new(datafrag.data_size, datafrag.fragment_size));
abuf.insert_frags(datafrag, self.fragment_size);
if abuf.is_complete() {
if let Some(abuf) = self.assembly_buffers.remove(&writer_sn) {
let ser_data_or_key = SerializedPayload::new(rep_id, abuf.buffer_bytes.to_vec());
let ddsdata = if flags.contains(DATAFRAG_Flags::Key) {
DDSData::new_disposed_by_key(ChangeKind::NotAliveDisposed, ser_data_or_key)
} else {
DDSData::new(ser_data_or_key)
};
Some(ddsdata) } else {
error!("Assembly buffer mysteriously lost");
None
}
} else {
None
}
}
}