#[allow(unused_imports)]
use log::{debug, warn, trace, error};
use crate::{
network::constant::get_user_traffic_multicast_port,
network::constant::get_user_traffic_unicast_port,
network::util::get_local_multicast_locators,
network::util::get_local_unicast_socket_address,
structure::{
entity::RTPSEntity,
guid::{EntityId, GUID, EntityKind},
locator::{Locator, LocatorList},
sequence_number::{SequenceNumber},
},
messages::submessages::submessages::{AckNack},
discovery::data_types::topic_data::DiscoveredReaderData,
};
use std::{
collections::BTreeSet,
net::{SocketAddr, Ipv4Addr},
};
use super::reader::Reader;
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct RtpsReaderProxy {
pub remote_reader_guid: GUID,
pub remote_group_entity_id: EntityId,
pub unicast_locator_list: LocatorList,
pub multicast_locator_list: LocatorList,
pub expects_in_line_qos: bool,
pub is_active: bool,
pub all_acked_before : SequenceNumber,
pub unsent_changes: BTreeSet<SequenceNumber>,
pub repair_mode : bool,
}
impl RtpsReaderProxy {
pub fn new(remote_reader_guid: GUID) -> RtpsReaderProxy {
RtpsReaderProxy {
remote_reader_guid,
remote_group_entity_id: EntityId::ENTITYID_UNKNOWN,
unicast_locator_list: LocatorList::new(),
multicast_locator_list: LocatorList::new(),
expects_in_line_qos: false,
is_active: true,
all_acked_before: SequenceNumber::zero(),
unsent_changes: BTreeSet::new(),
repair_mode: false,
}
}
pub fn from_reader(reader: &Reader, domain_id: u16, participant_id: u16) -> RtpsReaderProxy {
let unicast_locator_list =
get_local_unicast_socket_address(get_user_traffic_unicast_port(domain_id, participant_id));
let multicast_locator_list =
get_local_multicast_locators(get_user_traffic_multicast_port(domain_id));
RtpsReaderProxy {
remote_reader_guid: reader.get_guid(),
remote_group_entity_id: EntityId::ENTITYID_UNKNOWN, unicast_locator_list,
multicast_locator_list,
expects_in_line_qos: false,
is_active: true,
all_acked_before: SequenceNumber::zero(),
unsent_changes: BTreeSet::new(),
repair_mode: false,
}
}
fn discovered_or_default(drd: &LocatorList, default: &LocatorList) -> LocatorList {
if drd.is_empty() {
default.clone()
} else {
drd.clone()
}
}
pub fn from_discovered_reader_data(
discovered_reader_data: &DiscoveredReaderData,
default_unicast_locators: LocatorList,
default_multicast_locators: LocatorList,
) -> RtpsReaderProxy {
let unicast_locator_list = Self::discovered_or_default(
&discovered_reader_data.reader_proxy.unicast_locator_list,
&default_unicast_locators);
let multicast_locator_list = Self::discovered_or_default(
&discovered_reader_data.reader_proxy.multicast_locator_list,
&default_multicast_locators);
RtpsReaderProxy {
remote_reader_guid: discovered_reader_data.reader_proxy.remote_reader_guid.clone(),
remote_group_entity_id: EntityId::ENTITYID_UNKNOWN, unicast_locator_list,
multicast_locator_list,
expects_in_line_qos: discovered_reader_data.reader_proxy.expects_inline_qos,
is_active: true,
all_acked_before: SequenceNumber::zero(),
unsent_changes: BTreeSet::new(),
repair_mode: false,
}
}
pub fn update(&mut self, updated: &RtpsReaderProxy) {
if self.remote_reader_guid == updated.remote_reader_guid {
self.unicast_locator_list = updated.unicast_locator_list.clone();
self.multicast_locator_list = updated.multicast_locator_list.clone();
self.expects_in_line_qos = updated.expects_in_line_qos.clone();
}
}
pub fn new_for_unit_testing(port_number: u16) -> RtpsReaderProxy {
let mut unicastLocators = LocatorList::new();
let locator = Locator::from(SocketAddr::new(
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port_number,
));
unicastLocators.push(locator);
RtpsReaderProxy {
remote_reader_guid: GUID::dummy_test_guid(EntityKind::READER_WITH_KEY_USER_DEFINED),
remote_group_entity_id: EntityId::ENTITYID_UNKNOWN,
unicast_locator_list: unicastLocators,
multicast_locator_list: LocatorList::new(),
expects_in_line_qos: false,
is_active: true,
all_acked_before: SequenceNumber::zero(),
unsent_changes: BTreeSet::new(),
repair_mode: false,
}
}
pub fn can_send(&self) -> bool {
! self.unsent_changes.is_empty()
}
pub fn handle_ack_nack(&mut self, acknack: &AckNack, last_available:SequenceNumber) {
self.all_acked_before = acknack.reader_sn_state.base();
self.unsent_changes = self.unsent_changes.split_off(&self.all_acked_before);
for nack_sn in acknack.reader_sn_state.iter() {
self.unsent_changes.insert(nack_sn);
}
if let Some(&high) = self.unsent_changes.iter().next_back() {
if high > last_available {
warn!("ReaderProxy {:?} asks for {:?} but I have only up to {:?}. ACKNACK = {:?}",
self.remote_reader_guid, self.unsent_changes, last_available, acknack);
}
}
}
pub fn notify_new_cache_change(&mut self, sequence_number: SequenceNumber) {
if sequence_number == SequenceNumber::from(0) {
error!("new cache change with {:?}! bad! my GUID = {:?}",sequence_number, self.remote_reader_guid);
}
self.unsent_changes.insert(sequence_number);
}
pub fn acked_up_to_before(&self) -> SequenceNumber {
self.all_acked_before
}
pub fn content_is_equal(&self, other: &RtpsReaderProxy) -> bool {
self.remote_reader_guid == other.remote_reader_guid
&& self.remote_group_entity_id == other.remote_group_entity_id
&& self.unicast_locator_list == other.unicast_locator_list
&& self.multicast_locator_list == other.multicast_locator_list
&& self.expects_in_line_qos == other.expects_in_line_qos
&& self.is_active == other.is_active
}
}