use std::collections::BTreeSet;
#[allow(unused_imports)]
use log::{debug, error, trace, warn};
use crate::{
dds::{participant::DomainParticipant, qos::QosPolicies},
discovery::data_types::topic_data::DiscoveredReaderData,
messages::submessages::submessage::AckSubmessage,
network::constant::*,
structure::{
guid::{EntityId, GUID},
locator::Locator,
sequence_number::SequenceNumber,
},
};
use super::reader::ReaderIngredients;
/// State tracked for one remote reader: how to reach it, its QoS, and which
/// sequence numbers it has acknowledged or still needs to be sent.
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct RtpsReaderProxy {
/// GUID of the remote reader this proxy stands for.
pub remote_reader_guid: GUID,
/// Every constructor of this type initializes this to `EntityId::UNKNOWN`.
pub remote_group_entity_id: EntityId,
/// Unicast locators for the reader; empty when created through `new`.
pub unicast_locator_list: Vec<Locator>,
/// Multicast locators for the reader; empty when created through `new`.
pub multicast_locator_list: Vec<Locator>,
/// Copied from the discovered reader data's `expects_inline_qos` when built
/// from discovery; `false` otherwise.
pub expects_in_line_qos: bool,
/// Constructors set this to `true`.
pub is_active: bool,
/// Base of the sequence number set in the most recently handled ACKNACK;
/// starts at `SequenceNumber::zero()`. Per the name, everything before this
/// number is taken as acknowledged by the reader.
pub all_acked_before: SequenceNumber,
/// Sequence numbers queued for this reader: newly notified cache changes
/// plus those listed in ACKNACKs. Entries below `all_acked_before` are
/// dropped whenever an ACKNACK is handled.
pub unsent_changes: BTreeSet<SequenceNumber>,
/// Constructors set this to `false`.
pub repair_mode: bool,
/// QoS policies associated with the remote reader.
pub qos: QosPolicies,
}
impl RtpsReaderProxy {
/// Creates a proxy for the remote reader `remote_reader_guid` with the given QoS.
///
/// The proxy starts with no locators, an unknown group entity id, no in-line
/// QoS expectation, marked active, nothing acknowledged (`all_acked_before`
/// is `SequenceNumber::zero()`), no unsent changes and repair mode off.
pub fn new(remote_reader_guid: GUID, qos: QosPolicies) -> Self {
    // Locators are not known at this point, so both lists start out empty.
    let unicast_locator_list = Vec::new();
    let multicast_locator_list = Vec::new();

    Self {
        remote_reader_guid,
        remote_group_entity_id: EntityId::UNKNOWN,
        unicast_locator_list,
        multicast_locator_list,
        expects_in_line_qos: false,
        is_active: true,
        all_acked_before: SequenceNumber::zero(),
        unsent_changes: BTreeSet::default(),
        repair_mode: false,
        qos,
    }
}
pub fn qos(&self) -> &QosPolicies {
&self.qos
}
pub fn from_reader(reader: &ReaderIngredients, domain_participant: &DomainParticipant) -> Self {
let mut self_locators = domain_participant.self_locators(); let unicast_locator_list = self_locators
.remove(&USER_TRAFFIC_LISTENER_TOKEN)
.unwrap_or_default();
let multicast_locator_list = self_locators
.remove(&USER_TRAFFIC_MUL_LISTENER_TOKEN)
.unwrap_or_default();
Self {
remote_reader_guid: reader.guid,
remote_group_entity_id: EntityId::UNKNOWN, unicast_locator_list,
multicast_locator_list,
expects_in_line_qos: false,
is_active: true,
all_acked_before: SequenceNumber::zero(),
unsent_changes: BTreeSet::new(),
repair_mode: false,
qos: reader.qos_policy.clone(),
}
}
/// Returns an owned copy of `drd`, or of `default` when `drd` is empty.
fn discovered_or_default(drd: &[Locator], default: &[Locator]) -> Vec<Locator> {
    // Pick the source slice first, then copy it once.
    let chosen = if drd.is_empty() { default } else { drd };
    chosen.to_vec()
}
/// Builds a proxy from discovery data for a remote reader.
///
/// For each of the unicast and multicast locator lists, the list carried in
/// `discovered_reader_data.reader_proxy` is used unless it is empty, in which
/// case the corresponding `default_*_locators` slice is copied instead.
/// The GUID and `expects_inline_qos` flag come from the discovered reader
/// proxy, the QoS from `subscription_topic_data.qos()`; all other state is
/// initialized exactly as in [`Self::new`].
pub fn from_discovered_reader_data(
    discovered_reader_data: &DiscoveredReaderData,
    default_unicast_locators: &[Locator],
    default_multicast_locators: &[Locator],
) -> Self {
    let announced = &discovered_reader_data.reader_proxy;

    Self {
        unicast_locator_list: Self::discovered_or_default(
            &announced.unicast_locator_list,
            default_unicast_locators,
        ),
        multicast_locator_list: Self::discovered_or_default(
            &announced.multicast_locator_list,
            default_multicast_locators,
        ),
        expects_in_line_qos: announced.expects_inline_qos,
        // Remaining fields take the same initial values `new` assigns.
        ..Self::new(
            announced.remote_reader_guid,
            discovered_reader_data.subscription_topic_data.qos(),
        )
    }
}
/// Updates the proxy from an acknowledgement submessage sent by the reader.
///
/// For an ACKNACK:
/// * `all_acked_before` becomes the base of the message's sequence number set,
/// * queued sequence numbers below that base are dropped from `unsent_changes`,
/// * every sequence number listed in the set is (re)queued in `unsent_changes`,
/// * a warning is logged if the highest queued number exceeds `last_available`
///   (the queue itself is left as is).
///
/// A NACKFRAG is not handled; an error is logged and the proxy is unchanged.
pub fn handle_ack_nack(
    &mut self,
    ack_submessage: &AckSubmessage,
    last_available: SequenceNumber,
) {
    let acknack = match ack_submessage {
        AckSubmessage::AckNack(acknack) => acknack,
        AckSubmessage::NackFrag(_nack_frag) => {
            error!("NACKFRAG not implemented");
            return;
        }
    };

    let acked_before = acknack.reader_sn_state.base();
    self.all_acked_before = acked_before;

    // `split_off` returns the elements at or after the key, so keeping its
    // result discards everything the reader has already acknowledged.
    self.unsent_changes = self.unsent_changes.split_off(&acked_before);

    // Queue everything the reader explicitly asked for.
    self.unsent_changes.extend(acknack.reader_sn_state.iter());

    // Sanity check: the reader should not request more than we can offer.
    let highest_pending = self.unsent_changes.iter().next_back().copied();
    if highest_pending.map_or(false, |high| high > last_available) {
        warn!(
            "ReaderProxy {:?} asks for {:?} but I have only up to {:?}. ACKNACK = {:?}",
            self.remote_reader_guid, self.unsent_changes, last_available, acknack
        );
    }
}
/// Queues `sequence_number` in `unsent_changes` for this reader.
///
/// A sequence number equal to `SequenceNumber::from(0)` is reported with an
/// error log, but it is still queued.
pub fn notify_new_cache_change(&mut self, sequence_number: SequenceNumber) {
    let is_zero = sequence_number == SequenceNumber::from(0);
    if is_zero {
        error!(
            "new cache change with {:?}! bad! my GUID = {:?}",
            sequence_number, self.remote_reader_guid
        );
    }

    // Queued regardless of the check above.
    self.unsent_changes.insert(sequence_number);
}
pub fn acked_up_to_before(&self) -> SequenceNumber {
self.all_acked_before
}
}