use crate::{
rtps_messages::{
overall_structure::RtpsMessageWrite,
submessage_elements::{Data, FragmentNumberSet, SequenceNumberSet},
submessages::{
ack_nack::AckNackSubmessage, data::DataSubmessage, data_frag::DataFragSubmessage,
info_destination::InfoDestinationSubmessage, nack_frag::NackFragSubmessage,
},
types::Count,
},
transport::{
interface::WriteMessage,
types::{EntityId, Guid, Locator, ReliabilityKind, SequenceNumber},
},
};
use alloc::{sync::Arc, vec::Vec};
use core::cmp::max;
/// Returns how many fragments make up the complete sample that
/// `data_frag_submessage` belongs to.
///
/// The result is `data_size` divided by `fragment_size`, rounded up, so a
/// trailing partial fragment counts as one fragment.
///
/// A `fragment_size` of zero yields `0` instead of panicking on a division by
/// zero; both values come straight from a received submessage and cannot be
/// trusted to be well-formed.
fn total_fragments_expected(data_frag_submessage: &DataFragSubmessage) -> u32 {
    match data_frag_submessage.fragment_size() as u32 {
        0 => 0,
        fragment_size => data_frag_submessage.data_size().div_ceil(fragment_size),
    }
}
/// Reader-side bookkeeping for one matched remote writer: which sequence
/// numbers have been received or are still missing, the DataFrag submessages
/// buffered for reassembly, and the counters used when sending AckNack /
/// NackFrag submessages back to that writer.
#[derive(Debug, PartialEq, Eq)]
pub struct RtpsWriterProxy {
/// GUID of the remote writer this proxy stands for.
remote_writer_guid: Guid,
/// Unicast locators of the remote writer; `write_message` sends to these.
unicast_locator_list: Vec<Locator>,
/// Multicast locators given at construction; not read by any method in this file.
multicast_locator_list: Vec<Locator>,
/// Entity id of the remote writer's group, stored as given at construction.
remote_group_entity_id: EntityId,
/// Lowest sequence number still considered available; starts at 1 and is
/// overwritten by `lost_changes_update`.
first_available_seq_num: SequenceNumber,
/// Highest sequence number announced as available; starts at 0 and is
/// overwritten by `missing_changes_update`.
last_available_seq_num: SequenceNumber,
/// Highest sequence number marked received or irrelevant so far.
highest_received_change_sn: SequenceNumber,
/// Set through `set_must_send_acknacks`; cleared by `write_message` when it sends.
must_send_acknacks: bool,
/// Value last stored through `set_last_received_heartbeat_count`.
last_received_heartbeat_count: Count,
/// Value last stored through `set_last_received_heartbeat_frag_count`.
last_received_heartbeat_frag_count: Count,
/// Count placed in outgoing AckNack submessages; bumped by `increment_acknack_count`.
acknack_count: Count,
/// Count placed in outgoing NackFrag submessages.
nack_frag_count: Count,
/// DataFrag submessages received so far that have not been reassembled or discarded.
frag_buffer: Vec<DataFragSubmessage>,
/// Reliability kind given at construction, returned by `reliability`.
reliability: ReliabilityKind,
}
impl RtpsWriterProxy {
pub fn new(
remote_writer_guid: Guid,
unicast_locator_list: &[Locator],
multicast_locator_list: &[Locator],
remote_group_entity_id: EntityId,
reliability: ReliabilityKind,
) -> Self {
Self {
remote_writer_guid,
unicast_locator_list: unicast_locator_list.to_vec(),
multicast_locator_list: multicast_locator_list.to_vec(),
remote_group_entity_id,
first_available_seq_num: 1,
last_available_seq_num: 0,
highest_received_change_sn: 0,
must_send_acknacks: false,
last_received_heartbeat_count: 0,
last_received_heartbeat_frag_count: 0,
acknack_count: 0,
nack_frag_count: 0,
frag_buffer: Vec::new(),
reliability,
}
}
pub fn push_data_frag(&mut self, submessage: DataFragSubmessage) {
if !self.frag_buffer.contains(&submessage) {
self.frag_buffer.push(submessage);
}
}
pub fn delete_data_fragments(&mut self, sequence_number: SequenceNumber) {
self.frag_buffer.retain(|f| f.writer_sn() > sequence_number);
}
pub fn reconstruct_data_from_frag(
&mut self,
seq_num: SequenceNumber,
) -> Option<DataSubmessage> {
let frag_submessage = self.frag_buffer.iter().find(|f| f.writer_sn() == seq_num)?;
let total_fragments_expected = total_fragments_expected(frag_submessage);
let total_fragments = self
.frag_buffer
.iter()
.filter(|f| f.writer_sn() == seq_num)
.fold(0, |mut acc, f| {
acc += f.fragments_in_submessage() as u32;
acc
});
if total_fragments == total_fragments_expected {
let mut data = Vec::new();
for frag_number in 0..=total_fragments {
let Some(frag) = self
.frag_buffer
.iter()
.find(|f| f.writer_sn() == seq_num && f.fragment_starting_num() == frag_number)
else {
continue;
};
data.extend_from_slice(frag.serialized_payload().as_ref());
}
let frag = self
.frag_buffer
.iter()
.find(|f| f.writer_sn() == seq_num && f.fragment_starting_num() == 1)?;
let inline_qos_flag = frag.inline_qos_flag();
let data_flag = !frag.key_flag();
let key_flag = frag.key_flag();
let non_standard_payload_flag = false;
let writer_id = self.remote_writer_guid.entity_id();
let reader_id = frag.reader_id();
let writer_sn = seq_num;
let inline_qos = frag.inline_qos().clone();
self.frag_buffer.retain(|f| f.writer_sn() != seq_num);
Some(DataSubmessage::new(
inline_qos_flag,
data_flag,
key_flag,
non_standard_payload_flag,
reader_id,
writer_id,
writer_sn,
inline_qos,
Data::new(Arc::from(data)),
))
} else {
None
}
}
pub fn remote_writer_guid(&self) -> Guid {
self.remote_writer_guid
}
pub fn unicast_locator_list(&self) -> &[Locator] {
self.unicast_locator_list.as_ref()
}
pub fn reliability(&self) -> ReliabilityKind {
self.reliability
}
pub fn available_changes_max(&self) -> SequenceNumber {
max(
self.first_available_seq_num - 1,
self.highest_received_change_sn,
)
}
pub fn irrelevant_change_set(&mut self, a_seq_num: SequenceNumber) {
if a_seq_num > self.highest_received_change_sn {
self.highest_received_change_sn = a_seq_num;
}
}
pub fn lost_changes_update(&mut self, first_available_seq_num: SequenceNumber) {
self.first_available_seq_num = first_available_seq_num;
}
pub fn missing_changes(&self) -> impl Iterator<Item = SequenceNumber> {
let highest_number = max(self.last_available_seq_num, self.highest_received_change_sn);
let first_missing_change = max(
self.first_available_seq_num,
self.highest_received_change_sn + 1,
);
first_missing_change..=highest_number
}
pub fn missing_changes_update(&mut self, last_available_seq_num: SequenceNumber) {
self.last_available_seq_num = last_available_seq_num;
}
pub fn received_change_set(&mut self, a_seq_num: SequenceNumber) {
if a_seq_num > self.highest_received_change_sn {
self.highest_received_change_sn = a_seq_num;
}
self.frag_buffer.retain(|x| x.writer_sn() > a_seq_num);
}
pub fn set_must_send_acknacks(&mut self, must_send_acknacks: bool) {
self.must_send_acknacks = must_send_acknacks;
}
pub fn must_send_acknacks(&self) -> bool {
self.must_send_acknacks
}
pub fn last_received_heartbeat_count(&self) -> Count {
self.last_received_heartbeat_count
}
pub fn set_last_received_heartbeat_count(&mut self, last_received_heartbeat_count: Count) {
self.last_received_heartbeat_count = last_received_heartbeat_count;
}
pub fn set_last_received_heartbeat_frag_count(
&mut self,
last_received_heartbeat_frag_count: Count,
) {
self.last_received_heartbeat_frag_count = last_received_heartbeat_frag_count;
}
pub fn acknack_count(&self) -> Count {
self.acknack_count
}
pub fn increment_acknack_count(&mut self) {
self.acknack_count = self.acknack_count.wrapping_add(1);
}
pub async fn write_message(
&mut self,
reader_guid: &Guid,
message_writer: &(impl WriteMessage + ?Sized),
) {
if self.must_send_acknacks() || !self.missing_changes().count() == 0 {
self.set_must_send_acknacks(false);
self.increment_acknack_count();
let info_dst_submessage =
InfoDestinationSubmessage::new(self.remote_writer_guid().prefix());
let missing_changes = self.missing_changes().take(256).take_while(|x| {
x < &self
.frag_buffer
.iter()
.map(|x| x.writer_sn())
.min()
.unwrap_or(i64::MAX)
});
let acknack_submessage = AckNackSubmessage::new(
true,
reader_guid.entity_id(),
self.remote_writer_guid().entity_id(),
SequenceNumberSet::new(self.available_changes_max() + 1, missing_changes),
self.acknack_count(),
);
let rtps_message = if let Some(missing_change_fragments_seq_num) = self
.missing_changes()
.take(256)
.find(|s| self.frag_buffer.iter().any(|x| &x.writer_sn() == s))
{
let frag = self
.frag_buffer
.iter()
.find(|x| x.writer_sn() == missing_change_fragments_seq_num)
.expect("Must exist");
let total_fragments_expected =
frag.data_size().div_ceil(frag.fragment_size() as u32);
let mut missing_fragments_iter = (1..=total_fragments_expected)
.filter(|frag_num| {
!self.frag_buffer.iter().any(|f| {
f.writer_sn() == missing_change_fragments_seq_num
&& &f.fragment_starting_num() == frag_num
})
})
.peekable();
let base = *missing_fragments_iter
.peek()
.expect("At least a fragment must be missing");
let fragment_number_state = FragmentNumberSet::new(base, missing_fragments_iter);
let nack_frag_submessage = NackFragSubmessage::new(
reader_guid.entity_id(),
self.remote_writer_guid().entity_id(),
missing_change_fragments_seq_num,
fragment_number_state,
self.nack_frag_count,
);
RtpsMessageWrite::from_submessages(
&[
&info_dst_submessage,
&acknack_submessage,
&nack_frag_submessage,
],
reader_guid.prefix(),
)
} else {
RtpsMessageWrite::from_submessages(
&[&info_dst_submessage, &acknack_submessage],
reader_guid.prefix(),
)
};
message_writer
.write_message(rtps_message.buffer(), self.unicast_locator_list())
.await;
}
}
pub fn is_historical_data_received(&self) -> bool {
let at_least_one_heartbeat_received = self.last_received_heartbeat_count > 0;
at_least_one_heartbeat_received && self.missing_changes().count() == 0
}
}