use crate::{
infrastructure::time::{Duration, Time},
rtps_messages::{
submessages::{heartbeat::HeartbeatSubmessage, heartbeat_frag::HeartbeatFragSubmessage},
types::{Count, FragmentNumber},
},
transport::types::{
CacheChange, DurabilityKind, EntityId, Guid, Locator, ReliabilityKind, SequenceNumber,
},
};
use alloc::vec::Vec;
/// State used to build successive `HeartbeatSubmessage`s addressed to one reader.
///
/// Keeps the running heartbeat count together with the time at which the most
/// recent heartbeat was generated, so callers can pace heartbeats with
/// `is_time_for_heartbeat`.
#[derive(Debug, PartialEq, Eq)]
pub struct HeartbeatMachine {
/// Count written into each generated heartbeat; starts at 0 and is
/// incremented (wrapping on overflow) before every heartbeat is built.
count: Count,
/// Reader entity id forwarded to `HeartbeatSubmessage::new` for every heartbeat.
reader_id: EntityId,
/// Time passed to the most recent `generate_new_heartbeat` call;
/// `Time::new(0, 0)` until the first heartbeat is generated.
last_heartbeat_time: Time,
}
impl HeartbeatMachine {
    /// Creates a heartbeat machine for the reader identified by `reader_id`.
    ///
    /// The heartbeat count starts at zero and the last heartbeat time at
    /// `Time::new(0, 0)`.
    fn new(reader_id: EntityId) -> Self {
        Self {
            count: 0,
            reader_id,
            last_heartbeat_time: Time::new(0, 0),
        }
    }

    /// Returns `true` when the difference between `now` and the time of the
    /// last generated heartbeat is at least `heartbeat_period`.
    pub fn is_time_for_heartbeat(&self, now: Time, heartbeat_period: Duration) -> bool {
        let since_last_heartbeat = now - self.last_heartbeat_time;
        since_last_heartbeat >= heartbeat_period
    }

    /// Builds the next heartbeat submessage and records `heartbeat_time` as the
    /// time of the last heartbeat.
    ///
    /// The internal count is incremented (wrapping on overflow) before it is
    /// placed in the submessage, so the first heartbeat carries count 1.
    /// `final_flag` is forwarded as the first flag of the submessage; the
    /// second flag is always `false`.
    pub fn generate_new_heartbeat(
        &mut self,
        writer_id: EntityId,
        first_sn: SequenceNumber,
        last_sn: SequenceNumber,
        heartbeat_time: Time,
        final_flag: bool,
    ) -> HeartbeatSubmessage {
        self.last_heartbeat_time = heartbeat_time;

        let next_count = self.count.wrapping_add(1);
        self.count = next_count;

        HeartbeatSubmessage::new(
            final_flag,
            false,
            self.reader_id,
            writer_id,
            first_sn,
            last_sn,
            next_count,
        )
    }
}
/// State used to build successive `HeartbeatFragSubmessage`s addressed to one reader.
#[derive(Debug, PartialEq, Eq)]
pub struct HeartbeatFragMachine {
/// Count written into each generated submessage; starts at 0 and is
/// incremented (wrapping on overflow) before every submessage is built.
count: Count,
/// Reader entity id forwarded to `HeartbeatFragSubmessage::_new` for every submessage.
reader_id: EntityId,
}
impl HeartbeatFragMachine {
    /// Creates a heartbeat-frag machine for the reader identified by
    /// `reader_id`, with its count starting at zero.
    fn new(reader_id: EntityId) -> Self {
        Self {
            count: 0,
            reader_id,
        }
    }

    /// Builds the next heartbeat-frag submessage for sequence number
    /// `writer_sn` with `last_fragment_num` as given.
    ///
    /// The internal count is incremented (wrapping on overflow) before it is
    /// placed in the submessage, so the first submessage carries count 1.
    pub fn _submessage(
        &mut self,
        writer_id: EntityId,
        writer_sn: SequenceNumber,
        last_fragment_num: FragmentNumber,
    ) -> HeartbeatFragSubmessage {
        let next_count = self.count.wrapping_add(1);
        self.count = next_count;

        HeartbeatFragSubmessage::_new(
            self.reader_id,
            writer_id,
            writer_sn,
            last_fragment_num,
            next_count,
        )
    }
}
/// Writer-side bookkeeping for a single matched remote reader.
///
/// Tracks which sequence numbers have been sent to, acknowledged by and
/// re-requested by the reader, and owns the heartbeat machines used to build
/// heartbeat submessages addressed to it.
#[derive(Debug, PartialEq, Eq)]
pub struct RtpsReaderProxy {
/// GUID of the remote reader; its entity id is used to create both heartbeat machines.
remote_reader_guid: Guid,
/// Group entity id supplied to `new`.
/// NOTE(review): presumably the group of the remote reader — confirm against callers.
remote_group_entity_id: EntityId,
/// Unicast locators, copied from the slice given to `new`.
unicast_locator_list: Vec<Locator>,
/// Multicast locators, copied from the slice given to `new`.
multicast_locator_list: Vec<Locator>,
/// Highest sequence number recorded through `set_highest_sent_seq_num`;
/// starts at 0 and never decreases.
highest_sent_seq_num: SequenceNumber,
/// Highest sequence number recorded through `acked_changes_set`;
/// starts at 0 and never decreases.
highest_acked_seq_num: SequenceNumber,
/// Sequence numbers added by `requested_changes_set` and not yet taken by
/// `next_requested_change`; duplicates are not inserted.
requested_changes: Vec<SequenceNumber>,
/// Flag supplied to `new`.
/// NOTE(review): presumably whether the reader expects inline QoS — confirm.
expects_inline_qos: bool,
/// Activity flag supplied to `new`.
is_active: bool,
/// Count last stored through `set_last_received_acknack_count`; starts at 0.
last_received_acknack_count: Count,
/// Count last stored through `set_last_received_nack_frag_count`; starts at 0.
last_received_nack_frag_count: Count,
/// Heartbeat generator created from the remote reader's entity id.
heartbeat_machine: HeartbeatMachine,
/// Heartbeat-frag generator created from the remote reader's entity id.
heartbeat_frag_machine: HeartbeatFragMachine,
/// Reliability kind supplied to `new`.
reliability: ReliabilityKind,
/// Sequence number supplied to `new`; can be overwritten through
/// `_set_first_relevant_sample_seq_num`.
first_relevant_sample_seq_num: SequenceNumber,
/// Durability kind supplied to `new`.
durability: DurabilityKind,
}
impl RtpsReaderProxy {
#[allow(clippy::too_many_arguments)]
pub fn new(
remote_reader_guid: Guid,
remote_group_entity_id: EntityId,
unicast_locator_list: &[Locator],
multicast_locator_list: &[Locator],
expects_inline_qos: bool,
is_active: bool,
reliability: ReliabilityKind,
first_relevant_sample_seq_num: SequenceNumber,
durability: DurabilityKind,
) -> Self {
let heartbeat_machine = HeartbeatMachine::new(remote_reader_guid.entity_id());
let heartbeat_frag_machine = HeartbeatFragMachine::new(remote_reader_guid.entity_id());
Self {
remote_reader_guid,
remote_group_entity_id,
unicast_locator_list: unicast_locator_list.to_vec(),
multicast_locator_list: multicast_locator_list.to_vec(),
highest_sent_seq_num: 0,
highest_acked_seq_num: 0,
requested_changes: Vec::new(),
expects_inline_qos,
is_active,
last_received_acknack_count: 0,
last_received_nack_frag_count: 0,
heartbeat_machine,
heartbeat_frag_machine,
reliability,
first_relevant_sample_seq_num,
durability,
}
}
pub fn remote_reader_guid(&self) -> Guid {
self.remote_reader_guid
}
pub fn unicast_locator_list(&self) -> &[Locator] {
self.unicast_locator_list.as_slice()
}
pub fn reliability(&self) -> ReliabilityKind {
self.reliability
}
pub fn heartbeat_machine(&mut self) -> &mut HeartbeatMachine {
&mut self.heartbeat_machine
}
pub fn _heartbeat_frag_machine(&mut self) -> &mut HeartbeatFragMachine {
&mut self.heartbeat_frag_machine
}
pub fn durability(&self) -> DurabilityKind {
self.durability
}
pub fn acked_changes_set(&mut self, committed_seq_num: SequenceNumber) {
if committed_seq_num > self.highest_acked_seq_num {
self.highest_acked_seq_num = committed_seq_num
}
}
pub fn next_requested_change(&mut self) -> Option<SequenceNumber> {
let next_requested_change = self.requested_changes.iter().min().cloned();
if let Some(next_sn) = &next_requested_change {
self.requested_changes.retain(|sn| sn != next_sn);
}
next_requested_change
}
pub fn next_unsent_change<'a>(
&'a self,
writer_history_cache: impl Iterator<Item = &'a CacheChange>,
) -> Option<SequenceNumber> {
writer_history_cache
.map(|cc| cc.sequence_number)
.filter(|cc_sn| cc_sn > &self.highest_sent_seq_num)
.min()
}
pub fn unsent_changes<'a>(
&'a self,
writer_history_cache: impl Iterator<Item = &'a CacheChange>,
) -> bool {
self.next_unsent_change(writer_history_cache).is_some()
}
pub fn requested_changes(&self) -> Vec<SequenceNumber> {
self.requested_changes.clone()
}
pub fn requested_changes_set(&mut self, req_seq_num_set: impl Iterator<Item = SequenceNumber>) {
for seq_num in req_seq_num_set {
if !self.requested_changes.contains(&seq_num) {
self.requested_changes.push(seq_num);
}
}
}
pub fn unacked_changes(&self, highest_available_seq_num: Option<SequenceNumber>) -> bool {
match highest_available_seq_num {
Some(highest_available_seq_num) => {
highest_available_seq_num > self.highest_acked_seq_num
}
None => false,
}
}
pub fn highest_sent_seq_num(&self) -> SequenceNumber {
self.highest_sent_seq_num
}
pub fn set_highest_sent_seq_num(&mut self, seq_num: SequenceNumber) {
if seq_num > self.highest_sent_seq_num {
self.highest_sent_seq_num = seq_num;
}
}
pub fn first_relevant_sample_seq_num(&self) -> SequenceNumber {
self.first_relevant_sample_seq_num
}
pub fn _set_first_relevant_sample_seq_num(&mut self, seq_num: SequenceNumber) {
self.first_relevant_sample_seq_num = seq_num;
}
pub fn last_received_acknack_count(&self) -> Count {
self.last_received_acknack_count
}
pub fn set_last_received_acknack_count(&mut self, count: Count) {
self.last_received_acknack_count = count;
}
pub fn last_received_nack_frag_count(&self) -> Count {
self.last_received_nack_frag_count
}
pub fn set_last_received_nack_frag_count(&mut self, count: Count) {
self.last_received_nack_frag_count = count;
}
}