use core::ops::Bound::{Excluded, Unbounded};
use std::{cmp::max, collections::BTreeMap, iter};
use enumflags2::BitFlags;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::{ddsdata::DDSData, fragment_assembler::FragmentAssembler},
discovery::data_types::topic_data::DiscoveredWriterData,
messages::submessages::submessages::{DATAFRAG_Flags, DataFrag},
structure::{
guid::{EntityId, GUID},
locator::Locator,
sequence_number::{FragmentNumber, SequenceNumber},
time::Timestamp,
},
};
/// Reader-side record of one remote writer: how to reach it and which of its
/// sequence numbers have been seen or marked irrelevant so far.
#[derive(Debug)] pub(crate) struct RtpsWriterProxy {
/// GUID of the remote writer this proxy stands for.
pub remote_writer_guid: GUID,
/// Unicast locators associated with the remote writer.
pub unicast_locator_list: Vec<Locator>,
/// Multicast locators associated with the remote writer.
pub multicast_locator_list: Vec<Locator>,
/// Entity id of the remote writer's group (`EntityId::UNKNOWN` when built
/// from discovery data).
pub remote_group_entity_id: EntityId,
/// Known sequence numbers. `Some(timestamp)` is stored by
/// `received_changes_add`; `None` is stored when a sequence number is marked
/// irrelevant.
changes: BTreeMap<SequenceNumber, Option<Timestamp>>,
/// Starts at 0. Not modified by the methods of this type, so presumably
/// maintained by the heartbeat-handling caller — TODO confirm.
pub received_heartbeat_count: i32,
/// Counter handed out and then incremented by
/// `next_ack_nack_sequence_number`.
pub sent_ack_nack_count: i32,
/// Every sequence number below this bound is ignored by
/// `should_ignore_change`; the bound itself is reported by
/// `all_ackable_before`.
ack_base: SequenceNumber,
/// Highest sequence number passed to `received_changes_add` so far
/// (0 until the first call).
last_received_sequence_number: SequenceNumber,
/// Receive timestamp that accompanied `last_received_sequence_number`.
last_received_timestamp: Timestamp,
/// Created lazily by the first `handle_datafrag` call.
fragment_assembler: Option<FragmentAssembler>,
}
impl RtpsWriterProxy {
pub fn new(
remote_writer_guid: GUID,
unicast_locator_list: Vec<Locator>,
multicast_locator_list: Vec<Locator>,
remote_group_entity_id: EntityId,
) -> Self {
Self {
remote_writer_guid,
unicast_locator_list,
multicast_locator_list,
remote_group_entity_id,
changes: BTreeMap::new(),
received_heartbeat_count: 0,
sent_ack_nack_count: 0,
ack_base: SequenceNumber::new(1),
last_received_sequence_number: SequenceNumber::new(0),
last_received_timestamp: Timestamp::INVALID,
fragment_assembler: None,
}
}
/// Returns the current value of `sent_ack_nack_count` and then bumps the
/// stored counter by one (post-increment).
pub fn next_ack_nack_sequence_number(&mut self) -> i32 {
    let current = self.sent_ack_nack_count;
    self.sent_ack_nack_count = current + 1;
    current
}
pub fn all_ackable_before(&self) -> SequenceNumber {
self.ack_base
}
pub fn update_contents(&mut self, other: Self) {
self.unicast_locator_list = other.unicast_locator_list;
self.multicast_locator_list = other.multicast_locator_list;
self.remote_group_entity_id = other.remote_group_entity_id;
}
/// Returns the receive timestamp of the highest sequence number recorded by
/// `received_changes_add`, or `None` while that sequence number is still at
/// its initial value of 0 (or below).
pub fn last_change_timestamp(&self) -> Option<Timestamp> {
    let nothing_recorded = self.last_received_sequence_number <= SequenceNumber::new(0);
    if nothing_recorded {
        return None;
    }
    Some(self.last_received_timestamp)
}
/// Returns `true` when `ack_base` equals sequence number 0 and `changes`
/// holds no entries.
///
/// NOTE(review): `new` initialises `ack_base` to 1 and the methods of this
/// type only ever raise it, so for a proxy built by `new` this looks like it
/// can never return `true`. Confirm whether the comparison should be against
/// the initial `ack_base` instead.
pub fn no_changes_received(&self) -> bool {
    let ack_base_is_zero = self.ack_base == SequenceNumber::new(0);
    ack_base_is_zero && self.changes.is_empty()
}
/// Lists the sequence numbers in a heartbeat's range that this proxy has no
/// entry for.
///
/// The range examined is `max(hb_first_sn, ack_base) ..= hb_last_sn`. It is
/// walked alongside the keys of `changes` inside that range, and every
/// sequence number without a matching key is reported. Keys count whether
/// they hold a timestamp or `None`.
///
/// If `hb_first_sn > hb_last_sn` the result is empty. A warning is logged
/// only when the two are more than one apart; `first == last + 1` is accepted
/// silently (presumably the writer's way of announcing that nothing is
/// available — TODO confirm against the RTPS spec).
///
/// NOTE(review): the result has one element per missing number, so a very
/// wide heartbeat range yields a correspondingly large vector.
pub fn missing_seqnums(
    &self,
    hb_first_sn: SequenceNumber,
    hb_last_sn: SequenceNumber,
) -> Vec<SequenceNumber> {
    if hb_first_sn > hb_last_sn {
        let more_than_one_apart = hb_first_sn > hb_last_sn + SequenceNumber::from(1);
        if more_than_one_apart {
            warn!(
                "Negative range of missing_seqnums first={:?} last={:?}",
                hb_first_sn, hb_last_sn
            );
        }
        return Vec::new();
    }

    // Nothing below ack_base is of interest any more.
    let interval = SequenceNumber::range_inclusive(max(hb_first_sn, self.ack_base), hb_last_sn);

    // `BTreeMap::range` panics on an inverted range, which can still happen
    // here when ack_base lies above hb_last_sn.
    let known_in_interval: Vec<SequenceNumber> = if interval.begin() <= interval.end() {
        self.changes.range(interval).map(|(&sn, _)| sn).collect()
    } else {
        Vec::new()
    };
    let mut known = known_in_interval.iter().peekable();

    let mut missing = Vec::with_capacity(32);
    for candidate in interval {
        if known.peek() == Some(&&candidate) {
            // Already known: consume it and move on.
            known.next();
        } else {
            missing.push(candidate);
        }
    }
    missing
}
/// Returns `true` if `seqnum` lies below `ack_base` or already has an entry
/// in `changes` (with or without a timestamp).
pub fn should_ignore_change(&self, seqnum: SequenceNumber) -> bool {
    if seqnum < self.ack_base {
        return true;
    }
    self.changes.contains_key(&seqnum)
}
/// Records `seq_num` as received at `receive_timestamp`.
///
/// The entry in `changes` is inserted or overwritten. If `seq_num` is the
/// highest seen so far, it and its timestamp become the "last received"
/// pair. When `seq_num` equals `ack_base`, `ack_base` moves to one past the
/// end of the run of consecutive keys that starts at `seq_num`.
pub fn received_changes_add(&mut self, seq_num: SequenceNumber, receive_timestamp: Timestamp) {
    self.changes.insert(seq_num, Some(receive_timestamp));

    if seq_num > self.last_received_sequence_number {
        self.last_received_sequence_number = seq_num;
        self.last_received_timestamp = receive_timestamp;
    }

    // ack_base can only move when the number it was waiting for arrives.
    if seq_num != self.ack_base {
        return;
    }

    let step = SequenceNumber::new(1);
    let mut end_of_run = seq_num;
    for (&sn, what) in self.changes.range((Excluded(&seq_num), Unbounded)) {
        if sn != end_of_run + step {
            // First hole after the run: stop here.
            break;
        }
        end_of_run = sn;
        debug!("received_changes_add: Already have {:?} : {:?}", sn, what);
    }

    self.ack_base = end_of_run + step;
    debug!(
        "ack_base increased to {:?} by received_changes_add {:?} writer={:?}",
        self.ack_base, seq_num, self.remote_writer_guid
    );
}
/// Marks a single sequence number as irrelevant by storing `None` for it.
///
/// Returns the receive timestamp that was stored for `seq_num`, if any.
/// Calls for sequence numbers below `ack_base` are ignored and return `None`.
///
/// NOTE(review): `ack_base` is not advanced here, even when `seq_num` equals
/// it — confirm that this is intended.
pub fn set_irrelevant_change(&mut self, seq_num: SequenceNumber) -> Option<Timestamp> {
    if seq_num < self.ack_base {
        return None;
    }
    let previous = self.changes.insert(seq_num, None);
    previous.flatten()
}
/// Marks the half-open range `remove_from .. remove_until_before` as
/// irrelevant and returns the receive timestamps that were stored for it.
///
/// * If `remove_from > remove_until_before`, an error is logged and an empty
///   map is returned.
/// * If the range starts at or below `ack_base`, its entries are removed from
///   `changes` and `ack_base` is raised to `remove_until_before` (never
///   lowered).
/// * Otherwise `ack_base` stays put and every sequence number in the range is
///   set to `None` in `changes`, so it no longer counts as missing.
///
/// Only entries that actually held a timestamp appear in the returned map.
///
/// NOTE(review): after raising `ack_base`, entries already present at or
/// after the new bound are not skipped over the way `received_changes_add`
/// does — confirm whether `ack_base` should be advanced past them.
pub fn irrelevant_changes_range(
    &mut self,
    remove_from: SequenceNumber,
    remove_until_before: SequenceNumber,
) -> BTreeMap<SequenceNumber, Timestamp> {
    if remove_from > remove_until_before {
        error!(
            "irrelevant_changes_range: negative range: remove_from={:?} remove_until_before={:?}",
            remove_from, remove_until_before
        );
        return BTreeMap::new();
    }

    if remove_from <= self.ack_base {
        // Cut the range out of `changes`, keeping what lies before and after.
        let mut removed_and_after = self.changes.split_off(&remove_from);
        let mut after = removed_and_after.split_off(&remove_until_before);
        let removed = removed_and_after;
        self.changes.append(&mut after);

        self.ack_base = max(remove_until_before, self.ack_base);
        debug!(
            "ack_base increased to {:?} by irrelevant_changes_range {:?} to {:?}. writer={:?}",
            self.ack_base, remove_from, remove_until_before, self.remote_writer_guid
        );

        removed
            .into_iter()
            .filter_map(|(sn, known)| known.map(|ts| (sn, ts)))
            .collect()
    } else {
        // The range starts above ack_base, so ack_base cannot move.
        let mut removed = BTreeMap::new();
        for na in
            SequenceNumber::range_inclusive(remove_from, remove_until_before - SequenceNumber::new(1))
        {
            // Overwrite with `None`, as `set_irrelevant_change` does. The
            // previous code left a stored timestamp in place, so the entry
            // was never marked irrelevant and the same timestamp was reported
            // as removed again on every later call.
            if let Some(ts) = self.changes.insert(na, None).flatten() {
                removed.insert(na, ts);
            }
        }
        removed
    }
}
/// Marks everything below `smallest_seqnum` as irrelevant.
///
/// Shorthand for `irrelevant_changes_range` over `0 .. smallest_seqnum`;
/// `smallest_seqnum` itself is kept. Returns what that call returns.
pub fn irrelevant_changes_up_to(
    &mut self,
    smallest_seqnum: SequenceNumber,
) -> BTreeMap<SequenceNumber, Timestamp> {
    let from_the_start = SequenceNumber::new(0);
    self.irrelevant_changes_range(from_the_start, smallest_seqnum)
}
/// Returns an owned copy of `drd`, or of `default` when `drd` is empty.
fn discovered_or_default(drd: &[Locator], default: &[Locator]) -> Vec<Locator> {
    let chosen = if drd.is_empty() { default } else { drd };
    chosen.to_vec()
}
pub fn from_discovered_writer_data(
discovered_writer_data: &DiscoveredWriterData,
default_unicast_locators: &[Locator],
default_multicast_locators: &[Locator],
) -> RtpsWriterProxy {
let unicast_locator_list = Self::discovered_or_default(
&discovered_writer_data.writer_proxy.unicast_locator_list,
default_unicast_locators,
);
let multicast_locator_list = Self::discovered_or_default(
&discovered_writer_data.writer_proxy.multicast_locator_list,
default_multicast_locators,
);
RtpsWriterProxy {
remote_writer_guid: discovered_writer_data.writer_proxy.remote_writer_guid,
remote_group_entity_id: EntityId::UNKNOWN,
unicast_locator_list,
multicast_locator_list,
changes: BTreeMap::new(),
received_heartbeat_count: 0,
sent_ack_nack_count: 0,
ack_base: SequenceNumber::default(),
last_received_sequence_number: SequenceNumber::new(0),
last_received_timestamp: Timestamp::INVALID,
fragment_assembler: None,
}
}
pub fn handle_datafrag(
&mut self,
datafrag: &DataFrag,
flags: BitFlags<DATAFRAG_Flags>,
) -> Option<DDSData> {
if let Some(ref mut fa) = self.fragment_assembler {
fa.new_datafrag(datafrag, flags)
} else {
let mut fa = FragmentAssembler::new(datafrag.fragment_size);
let ret = fa.new_datafrag(datafrag, flags);
self.fragment_assembler = Some(fa);
ret
}
}
pub fn missing_frags_for<'a>(
&'a self,
seq: SequenceNumber,
) -> Box<dyn 'a + Iterator<Item = FragmentNumber>> {
if let Some(ref fa) = self.fragment_assembler {
fa.missing_frags_for(seq)
} else {
Box::new(iter::empty())
}
}
pub fn is_partially_received(&self, seq: SequenceNumber) -> bool {
if let Some(ref fa) = self.fragment_assembler {
fa.is_partially_received(seq)
} else {
false
}
}
}