use std::{
collections::{BTreeMap, BTreeSet},
fmt,
rc::Rc,
sync::{Arc, RwLock},
time::Duration as StdDuration,
};
use mio::Token;
use mio_extras::{channel as mio_channel, timer::Timer};
use log::{debug, error, info, trace, warn};
use enumflags2::BitFlags;
use speedy::{Endianness, Writable};
use crate::{
dds::{
ddsdata::DDSData,
message_receiver::MessageReceiverState,
qos::{policy, HasQoSPolicy, QosPolicies},
rtps_writer_proxy::RtpsWriterProxy,
statusevents::{CountWithChange, DataReaderStatus},
with_key::datawriter::{WriteOptions, WriteOptionsBuilder},
},
messages::{
header::Header,
protocol_id::ProtocolId,
protocol_version::ProtocolVersion,
submessages::{submessage_elements::parameter_list::ParameterList, submessages::*},
vendor_id::VendorId,
},
network::udp_sender::UDPSender,
serialization::message::Message,
structure::{
cache_change::{CacheChange, ChangeKind},
dds_cache::DDSCache,
entity::RTPSEntity,
guid::{EntityId, GuidPrefix, GUID},
locator::Locator,
sequence_number::{SequenceNumber, SequenceNumberSet},
time::Timestamp,
},
};
use super::{qos::InlineQos, with_key::datareader::ReaderCommand};
// Events that the Reader schedules on its internal timer.
// Currently only the periodic requested-deadline check (see
// `set_requested_deadline_check_timer` / `handle_timed_event`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TimedEvent {
  DeadlineMissedCheck,
}
// Everything needed to construct a `Reader` (see `Reader::new`).
// Bundled into one struct so it can be passed over a channel to the DP event loop.
pub(crate) struct ReaderIngredients {
  pub guid: GUID,
  // Used to signal the DataReader that new data is available.
  pub notification_sender: mio_channel::SyncSender<()>,
  // Used to deliver status events (deadline missed, matched, ...) to the DataReader.
  pub status_sender: mio_channel::SyncSender<DataReaderStatus>,
  pub topic_name: String,
  pub qos_policy: QosPolicies,
  // Commands from the DataReader back to this Reader (see `process_command`).
  pub data_reader_command_receiver: mio_channel::Receiver<ReaderCommand>,
}
impl ReaderIngredients {
  /// Alternative poll token derived from this Reader's EntityId,
  /// used to register the command channel with the poll loop.
  pub fn alt_entity_token(&self) -> Token {
    let entity_id = self.guid.entity_id;
    entity_id.as_alt_token()
  }
}
impl fmt::Debug for ReaderIngredients {
  // Manual Debug impl: channel endpoints are not Debug, so only the
  // identifying fields are printed.
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    let mut builder = f.debug_struct("Reader");
    builder.field("my_guid", &self.guid);
    builder.field("topic_name", &self.topic_name);
    builder.field("qos_policy", &self.qos_policy);
    builder.finish()
  }
}
// The RTPS Reader: receives DATA/HEARTBEAT/GAP submessages from remote
// Writers, stores samples into the shared DDSCache, and notifies the
// user-facing DataReader.
pub(crate) struct Reader {
  // Signals the DataReader that new data is available in the cache.
  notification_sender: mio_channel::SyncSender<()>,
  // Delivers status events (deadline missed, subscription matched, ...) to the DataReader.
  status_sender: mio_channel::SyncSender<DataReaderStatus>,
  // Shared UDP sender used for AckNack responses.
  udp_sender: Rc<UDPSender>,
  // Always set to true in `new()`; the stateless path is unimplemented
  // (hits todo!() in `process_received_data`).
  is_stateful: bool,
  reliability: policy::Reliability,
  // Sample store shared with DataReaders and other Readers/Writers.
  dds_cache: Arc<RwLock<DDSCache>>,
  // Test-only: maps writer sequence numbers to the cache timestamps they were stored under.
  #[cfg(test)]
  seqnum_instant_map: BTreeMap<SequenceNumber, Timestamp>,
  topic_name: String,
  qos_policy: QosPolicies,
  my_guid: GUID,
  // Fixed at 500 ms in `new()`; not otherwise read in this file.
  heartbeat_response_delay: StdDuration,
  #[allow(dead_code)]
  // NOTE: misspelling of "suppression" kept — renaming would touch other code.
  heartbeat_supression_duration: StdDuration,
  // NOTE: misspelling of "heartbeat" kept. Only initialized and shown in Debug here;
  // per-writer heartbeat counts are tracked in each RtpsWriterProxy.
  received_hearbeat_count: i32,
  // Proxies for the remote Writers this Reader is matched with, keyed by writer GUID.
  matched_writers: BTreeMap<GUID, RtpsWriterProxy>,
  // Cumulative count of writer matches (never decremented; see remove_writer_proxy).
  writer_match_count_total: i32,
  requested_deadline_missed_count: i32,
  offered_incompatible_qos_count: i32,
  pub(crate) timed_event_timer: Timer<TimedEvent>,
  pub(crate) data_reader_command_receiver: mio_channel::Receiver<ReaderCommand>,
}
impl Reader {
/// Builds a Reader from its ingredients plus the shared cache, UDP sender
/// and timer. The Reader starts stateful, with no matched writers and all
/// status counters at zero.
pub fn new(
  i: ReaderIngredients,
  dds_cache: Arc<RwLock<DDSCache>>,
  udp_sender: Rc<UDPSender>,
  timed_event_timer: Timer<TimedEvent>,
) -> Self {
  // Default to BestEffort when the QoS does not specify a reliability policy.
  let reliability = i
    .qos_policy
    .reliability()
    .unwrap_or(policy::Reliability::BestEffort);
  Self {
    notification_sender: i.notification_sender,
    status_sender: i.status_sender,
    udp_sender,
    is_stateful: true,
    reliability,
    dds_cache,
    topic_name: i.topic_name,
    qos_policy: i.qos_policy,
    #[cfg(test)]
    seqnum_instant_map: BTreeMap::new(),
    my_guid: i.guid,
    // 500 ms heartbeat response delay; suppression duration disabled (zero).
    heartbeat_response_delay: StdDuration::new(0, 500_000_000),
    heartbeat_supression_duration: StdDuration::new(0, 0),
    received_hearbeat_count: 0,
    matched_writers: BTreeMap::new(),
    writer_match_count_total: 0,
    requested_deadline_missed_count: 0,
    offered_incompatible_qos_count: 0,
    timed_event_timer,
    data_reader_command_receiver: i.data_reader_command_receiver,
  }
}
/// Poll token identifying this Reader, derived from its EntityId.
pub fn entity_token(&self) -> Token {
  let guid = self.guid();
  guid.entity_id.as_token()
}
/// Arms a one-shot timer for the next requested-deadline check, if this
/// Reader's QoS has a deadline policy. Called after construction and again
/// from `handle_timed_event` to keep the check periodic.
pub fn set_requested_deadline_check_timer(&mut self) {
  if let Some(deadline) = self.qos_policy.deadline {
    debug!(
      "GUID={:?} set_requested_deadline_check_timer: {:?}",
      self.my_guid,
      deadline.0.to_std()
    );
    self
      .timed_event_timer
      .set_timeout(deadline.0.to_std(), TimedEvent::DeadlineMissedCheck);
  } else {
    // No deadline QoS => nothing to monitor; timer stays unarmed.
    // (Fixed typo "deaadline" in the log message.)
    trace!(
      "GUID={:?} - no deadline policy - do not set set_requested_deadline_check_timer",
      self.my_guid
    );
  }
}
/// Pushes a status event to the DataReader. Best-effort: a full or
/// disconnected channel is logged and the event dropped, never an error
/// propagated to the caller.
pub fn send_status_change(&self, change: DataReaderStatus) {
  if let Err(send_error) = self.status_sender.try_send(change) {
    match send_error {
      mio_channel::TrySendError::Full(_) => {
        trace!("Reader cannot send new status changes, datareader is full.");
      }
      mio_channel::TrySendError::Disconnected(_) => {
        info!("send_status_change - cannot send status, DataReader Disconnected.");
      }
      mio_channel::TrySendError::Io(e) => {
        error!("send_status_change - cannot send status: {:?}", e);
      }
    }
  }
}
// Checks every matched writer against the requested-deadline QoS and
// returns one RequestedDeadlineMissed status per offending writer.
// Side effect: increments `requested_deadline_missed_count` for each miss,
// so the counter is cumulative across calls.
fn calculate_if_requested_deadline_is_missed(&mut self) -> Vec<DataReaderStatus> {
  debug!("calculate_if_requested_deadline_is_missed");
  // No deadline QoS => nothing can be missed.
  let deadline_duration = match self.qos_policy.deadline {
    None => return vec![],
    Some(policy::Deadline(deadline_duration)) => deadline_duration,
  };
  let mut changes: Vec<DataReaderStatus> = vec![];
  let now = Timestamp::now();
  for writer_proxy in self.matched_writers.values_mut() {
    if let Some(last_change) = writer_proxy.last_change_timestamp() {
      let since_last = now.duration_since(last_change);
      trace!(
        "Comparing deadlines: {:?} - {:?}",
        since_last,
        deadline_duration
      );
      if since_last > deadline_duration {
        debug!(
          "Deadline missed: {:?} - {:?}",
          since_last, deadline_duration
        );
        self.requested_deadline_missed_count += 1;
        changes.push(DataReaderStatus::RequestedDeadlineMissed {
          count: CountWithChange::start_from(self.requested_deadline_missed_count, 1),
        });
      }
    } else {
      // Writer has never delivered a change: counted as a deadline miss on
      // every check. NOTE(review): this fires repeatedly for silent writers —
      // confirm that is the intended semantics.
      self.requested_deadline_missed_count += 1;
      changes.push(DataReaderStatus::RequestedDeadlineMissed {
        count: CountWithChange::start_from(self.requested_deadline_missed_count, 1),
      });
    }
  }
  changes
}
/// Drains all expired timer events. Each deadline check re-arms the timer,
/// making the deadline check periodic.
pub fn handle_timed_event(&mut self) {
  // TimedEvent currently has a single variant, matched directly in the loop.
  while let Some(TimedEvent::DeadlineMissedCheck) = self.timed_event_timer.poll() {
    self.handle_requested_deadline_event();
    self.set_requested_deadline_check_timer();
  }
}
/// Drains pending commands from the DataReader. Runs until the channel is
/// empty or disconnected.
pub fn process_command(&mut self) {
  trace!("process_command {:?}", self.my_guid);
  use std::sync::mpsc::TryRecvError;
  loop {
    let command = self.data_reader_command_receiver.try_recv();
    match command {
      Ok(ReaderCommand::ResetRequestedDeadlineStatus) => {
        warn!("RESET_REQUESTED_DEADLINE_STATUS not implemented!");
      }
      Err(TryRecvError::Disconnected) => {
        trace!("DataReader disconnected");
        break;
      }
      Err(TryRecvError::Empty) => {
        // We were woken up but there was nothing to read.
        warn!("There was no command. Spurious command event??");
        break;
      }
    }
  }
}
/// Runs the deadline check and forwards every detected miss to the DataReader.
fn handle_requested_deadline_event(&mut self) {
  debug!("handle_requested_deadline_event");
  let missed = self.calculate_if_requested_deadline_is_missed();
  for status in missed {
    self.send_status_change(status);
  }
}
/// Test helper: fetches the payload of the cache change stored under the
/// given writer sequence number, if any.
#[cfg(test)]
pub fn history_cache_change_data(&self, sequence_number: SequenceNumber) -> Option<DDSData> {
  let dds_cache = self.dds_cache.read().unwrap();
  let instant = self.seqnum_instant_map.get(&sequence_number);
  let cc = instant.and_then(|i| dds_cache.topic_get_change(&self.topic_name, i));
  debug!("history cache !!!! {:?}", cc);
  cc.map(|cc| cc.data_value.clone())
}
/// Test helper: fetches a clone of the whole cache change stored under the
/// given writer sequence number, if any.
#[cfg(test)]
pub fn history_cache_change(&self, sequence_number: SequenceNumber) -> Option<CacheChange> {
  debug!("{:?}", sequence_number);
  let dds_cache = self.dds_cache.read().unwrap();
  let instant = self.seqnum_instant_map.get(&sequence_number);
  let cc = instant.and_then(|i| dds_cache.topic_get_change(&self.topic_name, i));
  debug!("history cache !!!! {:?}", cc);
  cc.cloned()
}
/// Test helper: returns the smallest and largest sequence numbers seen so far.
///
/// # Panics
/// Panics if no samples have been received yet (map is empty), as before —
/// but now with an explanatory message instead of a bare `unwrap`.
#[cfg(test)]
pub fn history_cache_sequence_start_and_end_numbers(&self) -> Vec<SequenceNumber> {
  // BTreeMap keys iterate in ascending order, so first/last key are min/max —
  // no need to scan the whole map with min()/max().
  let start = *self
    .seqnum_instant_map
    .keys()
    .next()
    .expect("history cache is empty - no start sequence number");
  let end = *self
    .seqnum_instant_map
    .keys()
    .next_back()
    .expect("history cache is empty - no end sequence number");
  vec![start, end]
}
// Adds or refreshes a remote writer proxy, after checking that the writer's
// offered QoS is compatible with our requested QoS.
// On a *new* match: bumps the cumulative match counter and notifies the
// DataReader with SubscriptionMatched. Updating an existing proxy emits no status.
// On QoS incompatibility: the proxy is NOT stored and
// RequestedIncompatibleQos is reported instead.
pub fn update_writer_proxy(&mut self, proxy: RtpsWriterProxy, offered_qos: &QosPolicies) {
  debug!("update_writer_proxy topic={:?}", self.topic_name);
  // compliance_failure_wrt: None = compatible, Some(policy) = first failing policy.
  match offered_qos.compliance_failure_wrt(&self.qos_policy) {
    None => {
      let writer_id = proxy.remote_writer_guid;
      // count_change is 1 for a new proxy, 0 for an update of an existing one.
      let count_change = self.matched_writer_update(proxy);
      if count_change > 0 {
        self.writer_match_count_total += count_change;
        self.send_status_change(DataReaderStatus::SubscriptionMatched {
          total: CountWithChange::new(self.writer_match_count_total, count_change),
          current: CountWithChange::new(self.matched_writers.len() as i32, count_change),
        });
        info!(
          "Matched new remote writer on topic={:?} writer= {:?}",
          self.topic_name, writer_id
        );
      }
    }
    Some(bad_policy_id) => {
      self.offered_incompatible_qos_count += 1;
      self.send_status_change(DataReaderStatus::RequestedIncompatibleQos {
        count: CountWithChange::new(self.offered_incompatible_qos_count, 1),
        last_policy_id: bad_policy_id,
        // Detailed policy list not populated here.
        policies: Vec::new(),
      });
      warn!("update_writer_proxy - QoS mismatch {:?}", bad_policy_id);
      info!(
        "update_writer_proxy - QoS mismatch: topic={:?} requested={:?} offered={:?}",
        self.topic_name, &self.qos_policy, offered_qos
      );
    }
  }
}
/// Inserts the proxy, or refreshes an already-known one in place.
/// Returns 1 when a new proxy was inserted, 0 when an existing one was updated
/// (the caller uses this as the match-count delta).
fn matched_writer_update(&mut self, proxy: RtpsWriterProxy) -> i32 {
  let writer_guid = proxy.remote_writer_guid;
  if let Some(existing) = self.matched_writers.get_mut(&writer_guid) {
    existing.update_contents(proxy);
    return 0;
  }
  self.matched_writers.insert(writer_guid, proxy);
  1
}
/// Drops the proxy for a remote writer (e.g. when the writer disappears) and
/// reports the lost match to the DataReader. No-op for unknown writers.
pub fn remove_writer_proxy(&mut self, writer_guid: GUID) {
  // Single map lookup: `remove` already tells us whether the key existed,
  // so the previous contains_key() + remove() double lookup is unnecessary.
  if self.matched_writers.remove(&writer_guid).is_some() {
    self.send_status_change(DataReaderStatus::SubscriptionMatched {
      // `total` is cumulative and therefore unchanged (delta 0);
      // `current` reflects the proxy just removed (delta -1).
      total: CountWithChange::new(self.writer_match_count_total, 0),
      current: CountWithChange::new(self.matched_writers.len() as i32, -1),
    });
  }
}
/// Removes all writer proxies belonging to a lost participant, identified by
/// its GuidPrefix. Each removal also emits a SubscriptionMatched status
/// (via `remove_writer_proxy`).
pub fn participant_lost(&mut self, guid_prefix: GuidPrefix) {
  // These are remote *writers*, not readers — renamed the misleading locals.
  // Collect first so we do not mutate `matched_writers` while iterating it.
  let lost_writers: Vec<GUID> = self
    .matched_writers
    .range(guid_prefix.range())
    .map(|(guid, _)| *guid)
    .collect();
  for writer in lost_writers {
    self.remove_writer_proxy(writer);
  }
}
/// Returns true if any matched writer has the given EntityId
/// (regardless of its GuidPrefix).
pub fn contains_writer(&self, entity_id: EntityId) -> bool {
  self
    .matched_writers
    .keys()
    .any(|guid| guid.entity_id == entity_id)
}
/// Test helper: builds a writer proxy from its parts and runs it through the
/// normal QoS-checked matching path (`update_writer_proxy`).
#[cfg(test)]
pub(crate) fn matched_writer_add(
  &mut self,
  remote_writer_guid: GUID,
  remote_group_entity_id: EntityId,
  unicast_locator_list: Vec<Locator>,
  multicast_locator_list: Vec<Locator>,
  qos: &QosPolicies,
) {
  self.update_writer_proxy(
    RtpsWriterProxy::new(
      remote_writer_guid,
      unicast_locator_list,
      multicast_locator_list,
      remote_group_entity_id,
    ),
    qos,
  );
}
// Mutable access to the proxy of a known remote writer, or None if unmatched.
fn matched_writer_lookup(&mut self, remote_writer_guid: GUID) -> Option<&mut RtpsWriterProxy> {
  self.matched_writers.get_mut(&remote_writer_guid)
}
// Entry point for an incoming DATA submessage: builds WriteOptions from the
// inline QoS, resolves the sending writer's GUID, converts the payload to
// DDSData, and hands it to `process_received_data`.
pub fn handle_data_msg(
  &mut self,
  data: Data,
  data_flags: BitFlags<DATA_Flags>,
  mr_state: &MessageReceiverState,
) {
  // Reception time is recorded before any processing.
  let receive_timestamp = Timestamp::now();
  let mut write_options_b = WriteOptionsBuilder::new();
  // Propagate the writer-side source timestamp, if the message carried one.
  if let Some(source_timestamp) = mr_state.source_timestamp {
    write_options_b = write_options_b.source_timestamp(source_timestamp);
  }
  // Endianness/representation for decoding inline QoS parameters.
  let ri = DATA_Flags::cdr_representation_identifier(data_flags);
  // related_sample_identity (request/reply correlation) may arrive via inline QoS;
  // decode errors are silently treated as "not present".
  if let Some(related_sample_identity) = data
    .inline_qos
    .as_ref()
    .and_then(|iqos| InlineQos::related_sample_identity(iqos, ri).ok())
    .flatten()
  {
    write_options_b = write_options_b.related_sample_identity(related_sample_identity);
  }
  let writer_guid = GUID::new_with_prefix_and_id(mr_state.source_guid_prefix, data.writer_id);
  let writer_seq_num = data.writer_sn;
  match self.data_to_ddsdata(data, data_flags) {
    Ok(ddsdata) => self.process_received_data(
      ddsdata,
      receive_timestamp,
      write_options_b.build(),
      writer_guid,
      writer_seq_num,
    ),
    // Malformed DATA is logged and dropped; reliability recovery (if any)
    // happens via the normal HEARTBEAT/ACKNACK machinery.
    Err(e) => debug!("Parsing DATA to DDSData failed: {}", e),
  }
}
/// Entry point for an incoming DATAFRAG submessage. Fragments are accumulated
/// in the matching writer proxy; once the proxy reports the sample complete,
/// it is processed like a regular DATA payload.
///
/// Changes vs. original: removed an empty `else {}` branch, collapsed the
/// duplicate `seq_num`/`writer_seq_num` locals into one, and fixed the
/// "exeeded" typo in the lifespan log message. Behavior is otherwise unchanged.
pub fn handle_datafrag_msg(
  &mut self,
  datafrag: &DataFrag,
  datafrag_flags: BitFlags<DATAFRAG_Flags>,
  mr_state: &MessageReceiverState,
) {
  let writer_guid = GUID::new_with_prefix_and_id(mr_state.source_guid_prefix, datafrag.writer_id);
  let seq_num = datafrag.writer_sn;
  let receive_timestamp = Timestamp::now();

  // Lifespan QoS check: drop fragments of samples that are already too old.
  // Requires both a source timestamp on the message and a lifespan policy.
  if let (Some(source_timestamp), Some(lifespan)) =
    (mr_state.source_timestamp, self.qos().lifespan)
  {
    let elapsed = receive_timestamp.duration_since(source_timestamp);
    if lifespan.duration < elapsed {
      info!(
        "DataFrag {:?} from {:?} lifespan exceeded. duration={:?} elapsed={:?}",
        seq_num, writer_guid, lifespan.duration, elapsed
      );
      return;
    }
  }

  let mut write_options_b = WriteOptionsBuilder::new();
  if let Some(source_timestamp) = mr_state.source_timestamp {
    write_options_b = write_options_b.source_timestamp(source_timestamp);
  }
  // Decode optional related_sample_identity from inline QoS, as for DATA.
  let ri = DATAFRAG_Flags::cdr_representation_identifier(datafrag_flags);
  if let Some(related_sample_identity) = datafrag
    .inline_qos
    .as_ref()
    .and_then(|iqos| InlineQos::related_sample_identity(iqos, ri).ok())
    .flatten()
  {
    write_options_b = write_options_b.related_sample_identity(related_sample_identity);
  }

  if let Some(writer_proxy) = self.matched_writer_lookup(writer_guid) {
    // The proxy returns Some(data) only when this fragment completed the sample;
    // otherwise it just stores the fragment and we are done for now.
    if let Some(complete_ddsdata) = writer_proxy.handle_datafrag(datafrag, datafrag_flags) {
      self.process_received_data(
        complete_ddsdata,
        receive_timestamp,
        write_options_b.build(),
        writer_guid,
        seq_num,
      );
    }
  } else {
    info!("Reader got DATAFRAG, but I have no writer proxy");
  }
}
// Common tail of DATA and (completed) DATAFRAG handling: duplicate filtering
// via the writer proxy, insertion into the DDS cache, and DataReader
// notification.
fn process_received_data(
  &mut self,
  ddsdata: DDSData,
  receive_timestamp: Timestamp,
  write_options: WriteOptions,
  writer_guid: GUID,
  writer_sn: SequenceNumber,
) {
  trace!(
    "handle_data_msg from {:?} seq={:?} topic={:?} reliability={:?} stateful={:?}",
    &writer_guid,
    writer_sn,
    self.topic_name,
    self.reliability,
    self.is_stateful,
  );
  if self.is_stateful {
    // Copy entity_id before the mutable proxy borrow below.
    let my_entityid = self.my_guid.entity_id;
    if let Some(writer_proxy) = self.matched_writer_lookup(writer_guid) {
      if writer_proxy.should_ignore_change(writer_sn) {
        // Duplicate sequence number.
        debug!("handle_data_msg already have this seq={:?}", writer_sn);
        if my_entityid == EntityId::SPDP_BUILTIN_PARTICIPANT_READER {
          // SPDP readers accept duplicates (participant announcements repeat).
          debug!("Accepting duplicate message to participant reader.");
        } else {
          return;
        }
      }
      writer_proxy.received_changes_add(writer_sn, receive_timestamp);
    } else {
      // NOTE(review): no writer proxy, but execution still falls through to
      // make_cache_change below — the sample is cached without proxy
      // bookkeeping. Confirm this is intentional.
      info!(
        "handle_data_msg in stateful Reader {:?} has no writer proxy for {:?} topic={:?}",
        my_entityid, writer_guid, self.topic_name,
      );
    }
  } else {
    // Stateless operation is not implemented; `new()` always sets is_stateful = true.
    todo!()
  }
  self.make_cache_change(
    ddsdata,
    receive_timestamp,
    write_options,
    writer_guid,
    writer_sn,
  );
  // Test builds remember which cache timestamp each sequence number went under.
  #[cfg(test)]
  self.seqnum_instant_map.insert(writer_sn, receive_timestamp);
  self.notify_cache_change();
}
// Converts a DATA submessage into DDSData, dispatching on the
// (payload present?, Data flag, Key flag) combination:
//   (payload, D, !K)  -> normal data sample
//   (payload, !D, K)  -> dispose/unregister carrying the serialized key
//   (none,    !D, !K) -> dispose/unregister identified by key_hash inline QoS
// All other combinations are protocol violations and are rejected.
fn data_to_ddsdata(
  &self,
  data: Data,
  data_flags: BitFlags<DATA_Flags>,
) -> Result<DDSData, String> {
  let representation_identifier = DATA_Flags::cdr_representation_identifier(data_flags);
  match (
    data.serialized_payload,
    data_flags.contains(DATA_Flags::Data),
    data_flags.contains(DATA_Flags::Key),
  ) {
    (Some(sp), true, false) => {
      // Ordinary data sample.
      Ok(DDSData::new(sp))
    }
    (Some(sp), false, true) => {
      // Payload is the serialized *key*; change kind (disposed/unregistered)
      // comes from the status_info inline QoS parameter.
      Ok(DDSData::new_disposed_by_key(
        Self::deduce_change_kind(&data.inline_qos, false, representation_identifier),
        sp,
      ))
    }
    (None, false, false) => {
      // No payload: the instance must be identified by a key_hash inline QoS.
      let key_hash = if let Some(h) = data
        .inline_qos
        .as_ref()
        .and_then(|iqos| InlineQos::key_hash(iqos).ok())
        .flatten()
      {
        Ok(h)
      } else {
        info!("Received DATA that has no payload and no key_hash inline QoS - discarding");
        Err("DATA with no contents".to_string())
      }?;
      let change_kind =
        Self::deduce_change_kind(&data.inline_qos, false, representation_identifier);
      info!(
        "status change by Inline QoS: topic={:?} change={:?}",
        self.topic_name, change_kind
      );
      Ok(DDSData::new_disposed_by_key_hash(change_kind, key_hash))
    }
    (Some(_), true, true) => {
      warn!("Got DATA that claims to be both data and key - discarding.");
      Err("Ambiguous data/key received.".to_string())
    }
    (Some(_), false, false) => {
      warn!("make_cache_change - Flags says no data or key, but got payload!");
      Err("DATA message has mystery contents".to_string())
    }
    (None, true, _) | (None, _, true) => {
      warn!("make_cache_change - Where is my SerializedPayload?");
      Err("DATA message contents missing".to_string())
    }
  }
}
// Handles an incoming HEARTBEAT: prunes changes the writer no longer offers,
// and (for reliable readers) replies with an ACKNACK when samples are missing
// or the heartbeat demands a response (final flag not set).
// Returns true iff an ACKNACK was sent.
pub fn handle_heartbeat_msg(
  &mut self,
  heartbeat: &Heartbeat,
  final_flag_set: bool,
  mr_state: MessageReceiverState,
) -> bool {
  let writer_guid =
    GUID::new_with_prefix_and_id(mr_state.source_guid_prefix, heartbeat.writer_id);
  // BestEffort readers never acknowledge.
  if self.reliability == policy::Reliability::BestEffort {
    debug!(
      "HEARTBEAT from {:?}, but this Reader is BestEffort. Ignoring. topic={:?} reader={:?}",
      writer_guid, self.topic_name, self.my_guid
    );
    return false;
  }
  if !self.matched_writers.contains_key(&writer_guid) {
    info!(
      "HEARTBEAT from {:?}, but no writer proxy available. topic={:?} reader={:?}",
      writer_guid, self.topic_name, self.my_guid
    );
    return false;
  }
  // Sanity check only; processing continues even if the range looks bogus.
  if heartbeat.first_sn < SequenceNumber::default() {
    warn!(
      "Writer {:?} advertised SequenceNumbers from {:?} to {:?}!",
      writer_guid, heartbeat.first_sn, heartbeat.last_sn
    );
  }
  // First proxy borrow: duplicate-heartbeat filtering and irrelevant-change pruning.
  let writer_proxy = if let Some(wp) = self.matched_writer_lookup(writer_guid) {
    wp
  } else {
    // Unreachable in practice: contains_key() succeeded just above.
    error!("Writer proxy disappeared 1!");
    return false;
  };
  // Rebind mutably so the reply can go to the proxy's known unicast locators.
  let mut mr_state = mr_state;
  mr_state.unicast_reply_locator_list = writer_proxy.unicast_locator_list.clone();
  // Stale or duplicate heartbeat (count not increasing): ignore.
  if heartbeat.count <= writer_proxy.received_heartbeat_count {
    return false;
  }
  writer_proxy.received_heartbeat_count = heartbeat.count;
  // Everything below first_sn is no longer available from the writer:
  // drop those changes from the DDS cache.
  let removed_instances = writer_proxy.irrelevant_changes_up_to(heartbeat.first_sn);
  {
    // Scope the cache write lock tightly.
    let mut cache = match self.dds_cache.write() {
      Ok(rwlock) => rwlock,
      Err(e) => panic!("The DDSCache of is poisoned. Error: {}", e),
    };
    for instant in removed_instances.values() {
      if cache
        .topic_remove_change(&self.topic_name, instant)
        .is_none()
      {
        debug!("WriterProxy told to remove an instant which was not present");
      }
    }
  }
  let reader_id = self.entity_id();
  // Second proxy lookup: the earlier borrow had to end so that the cache
  // (and entity_id) could be accessed via &self above.
  let writer_proxy = if let Some(wp) = self.matched_writer_lookup(writer_guid) {
    wp
  } else {
    error!("Writer proxy disappeared 2!");
    return false;
  };
  let missing_seqnums = writer_proxy.missing_seqnums(heartbeat.first_sn, heartbeat.last_sn);
  // Respond if we are missing samples, or if the writer requires a response
  // (final flag not set).
  if !missing_seqnums.is_empty() || !final_flag_set {
    let reader_sn_state = match missing_seqnums.get(0) {
      Some(&first_missing) => {
        // NACK window is capped at 256 sequence numbers past the first gap
        // (SequenceNumberSet size limit).
        SequenceNumberSet::from_base_and_set(
          first_missing,
          &missing_seqnums
            .iter()
            .copied()
            .take_while(|sn| sn < &(first_missing + SequenceNumber::new(256)))
            .collect(),
        )
      }
      // Nothing missing: plain ACK up to all_ackable_before.
      None => SequenceNumberSet::new_empty(writer_proxy.all_ackable_before()),
    };
    let response_ack_nack = AckNack {
      reader_id,
      writer_id: heartbeat.writer_id,
      reader_sn_state,
      count: writer_proxy.next_ack_nack_sequence_number(),
    };
    let flags = BitFlags::<ACKNACK_Flags>::from_flag(ACKNACK_Flags::Endianness)
      | BitFlags::<ACKNACK_Flags>::from_flag(ACKNACK_Flags::Final);
    self.send_acknack_to(
      flags,
      response_ack_nack,
      InfoDestination {
        guid_prefix: mr_state.source_guid_prefix,
      },
      &mr_state.unicast_reply_locator_list,
    );
    return true;
  }
  false
}
// Handles an incoming GAP: the writer declares certain sequence numbers
// irrelevant, so any cached changes for them are removed.
// The irrelevant set is [gap_start, gap_list.base) plus the explicit
// members of gap_list.
pub fn handle_gap_msg(&mut self, gap: &Gap, mr_state: &MessageReceiverState) {
  let writer_guid = GUID::new_with_prefix_and_id(mr_state.source_guid_prefix, gap.writer_id);
  if !self.is_stateful {
    debug!(
      "GAP from {:?}, reader is stateless. Ignoring. topic={:?} reader={:?}",
      writer_guid, self.topic_name, self.my_guid
    );
    return;
  }
  let writer_proxy = if let Some(wp) = self.matched_writer_lookup(writer_guid) {
    wp
  } else {
    info!(
      "GAP from {:?}, but no writer proxy available. topic={:?} reader={:?}",
      writer_guid, self.topic_name, self.my_guid
    );
    return;
  };
  // Cache timestamps of changes invalidated by the contiguous gap range.
  let mut removed_changes: BTreeSet<Timestamp> = writer_proxy
    .irrelevant_changes_range(gap.gap_start, gap.gap_list.base())
    .values()
    .copied()
    .collect();
  // Add changes invalidated by the explicit gap_list entries.
  // set_irrelevant_change returns the cache timestamp iff a change was present.
  for seq_num in gap.gap_list.iter() {
    writer_proxy
      .set_irrelevant_change(seq_num)
      .map(|t| removed_changes.insert(t));
  }
  // Purge all invalidated changes from the shared cache.
  let mut cache = match self.dds_cache.write() {
    Ok(rwlock) => rwlock,
    Err(e) => panic!("The DDSCache of is poisoned. Error: {}", e),
  };
  for instant in &removed_changes {
    cache.topic_remove_change(&self.topic_name, instant);
  }
}
// HEARTBEAT_FRAG handling is not yet implemented; receiving one panics
// via todo!().
pub fn handle_heartbeatfrag_msg(
  &mut self,
  _heartbeatfrag: &HeartbeatFrag,
  _mr_state: &MessageReceiverState,
) {
  todo!()
}
/// Determines the change kind (disposed vs. unregistered) for a key-only DATA.
/// Prefers the status_info inline QoS parameter; without it, falls back on the
/// `no_writers` hint: unregistered when true, disposed otherwise.
fn deduce_change_kind(
  inline_qos: &Option<ParameterList>,
  no_writers: bool,
  ri: RepresentationIdentifier,
) -> ChangeKind {
  let status_info = inline_qos
    .as_ref()
    .and_then(|iqos| InlineQos::status_info(iqos, ri).ok());
  if let Some(si) = status_info {
    si.change_kind()
  } else if no_writers {
    ChangeKind::NotAliveUnregistered
  } else {
    ChangeKind::NotAliveDisposed
  }
}
/// Wraps the received data in a CacheChange and stores it into the shared
/// DDS cache under `receive_timestamp`.
///
/// # Panics
/// Panics if the DDSCache lock is poisoned (a writer thread panicked while
/// holding it). Fixed the garbled panic message ("The DDSCache of is poisoned").
fn make_cache_change(
  &mut self,
  data: DDSData,
  receive_timestamp: Timestamp,
  write_options: WriteOptions,
  writer_guid: GUID,
  writer_sn: SequenceNumber,
) {
  let cache_change = CacheChange::new(writer_guid, writer_sn, write_options, data);
  let mut cache = match self.dds_cache.write() {
    Ok(rwlock) => rwlock,
    Err(e) => panic!("The DDSCache is poisoned. Error: {}", e),
  };
  cache.add_change(&self.topic_name, &receive_timestamp, cache_change);
}
pub fn notify_cache_change(&self) {
match self.notification_sender.try_send(()) {
Ok(()) => (),
Err(mio_channel::TrySendError::Full(_)) => (),
Err(mio_channel::TrySendError::Disconnected(_)) => {
}
Err(mio_channel::TrySendError::Io(_)) => {
}
}
}
/// Serializes an RTPS message [Header, INFO_DST, ACKNACK] and sends it to the
/// given locators. Renamed the misspelled parameter `dst_localtor_list` to
/// `dst_locator_list` (private fn, call sites are positional).
fn send_acknack_to(
  &self,
  flags: BitFlags<ACKNACK_Flags>,
  acknack: AckNack,
  info_dst: InfoDestination,
  dst_locator_list: &[Locator],
) {
  let infodst_flags =
    BitFlags::<INFODESTINATION_Flags>::from_flag(INFODESTINATION_Flags::Endianness);
  let mut message = Message::new(Header {
    protocol_id: ProtocolId::default(),
    protocol_version: ProtocolVersion::THIS_IMPLEMENTATION,
    vendor_id: VendorId::THIS_IMPLEMENTATION,
    guid_prefix: self.my_guid.prefix,
  });
  // create_submessage returning None means nothing sendable; bail out silently.
  match info_dst.create_submessage(infodst_flags) {
    Some(m) => message.add_submessage(m),
    None => return,
  };
  match acknack.create_submessage(flags) {
    Some(m) => message.add_submessage(m),
    None => return,
  };
  // Serialization of a message we just constructed is not expected to fail.
  let bytes = message
    .write_to_vec_with_ctx(Endianness::LittleEndian)
    .unwrap();
  self
    .udp_sender
    .send_to_locator_list(&bytes, dst_locator_list);
}
// Sends a "preemptive" ACKNACK (empty sequence number set, base 1) to every
// matched writer from which nothing has been received yet. This prompts such
// writers to (re)send their history.
pub fn send_preemptive_acknacks(&mut self) {
  // Final flag not set: we request a HEARTBEAT response.
  let flags = BitFlags::<ACKNACK_Flags>::from_flag(ACKNACK_Flags::Endianness);
  // Temporarily take ownership of the proxy map so we can mutate proxies
  // (next_ack_nack_sequence_number needs &mut) while also calling
  // self.send_acknack_to(&self, ...) — a direct iteration would hold a
  // conflicting borrow of self. Restored at the end.
  let mut writer_proxies = std::mem::take(&mut self.matched_writers);
  let reader_id = self.entity_id();
  for (_, writer_proxy) in writer_proxies
    .iter_mut()
    .filter(|(_, p)| p.no_changes_received())
  {
    let acknack_count = writer_proxy.next_ack_nack_sequence_number();
    self.send_acknack_to(
      flags,
      AckNack {
        reader_id,
        writer_id: writer_proxy.remote_writer_guid.entity_id,
        // Empty set based at 1: "I have nothing, send everything".
        reader_sn_state: SequenceNumberSet::new_empty(SequenceNumber::new(1)),
        count: acknack_count,
      },
      InfoDestination {
        guid_prefix: writer_proxy.remote_writer_guid.prefix,
      },
      &writer_proxy.unicast_locator_list,
    );
  }
  // Put the proxy map back.
  self.matched_writers = writer_proxies;
}
// Name of the topic this Reader subscribes to.
// (Returns &String rather than &str to keep the existing API unchanged.)
pub fn topic_name(&self) -> &String {
  &self.topic_name
}
}
impl HasQoSPolicy for Reader {
  // Trait requires an owned QosPolicies, hence the clone.
  fn qos(&self) -> QosPolicies {
    self.qos_policy.clone()
  }
}
impl RTPSEntity for Reader {
  // GUID is Copy; returned by value.
  fn guid(&self) -> GUID {
    self.my_guid
  }
}
impl fmt::Debug for Reader {
  // Manual Debug: channels and the shared cache have no useful Debug output,
  // so only identifying/diagnostic fields are printed.
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    f.debug_struct("Reader")
      .field("notification_sender, dds_cache", &"can't print".to_string())
      .field("topic_name", &self.topic_name)
      .field("my_guid", &self.my_guid)
      .field("heartbeat_response_delay", &self.heartbeat_response_delay)
      .field("received_hearbeat_count", &self.received_hearbeat_count)
      .finish()
  }
}
#[cfg(test)]
mod tests {
use crate::{
dds::{
qos::policy::Reliability, statusevents::DataReaderStatus, typedesc::TypeDesc,
with_key::datawriter::WriteOptions,
},
messages::submessages::submessage_elements::serialized_payload::SerializedPayload,
structure::guid::{EntityId, EntityKind, GuidPrefix, GUID},
Duration, QosPolicyBuilder,
};
use super::*;
// A DATA message for a matched writer must trigger a notification on the
// DataReader wakeup channel.
#[test]
#[ignore]
fn rtpsreader_notification() {
  // Reader with a custom entity id.
  let mut guid = GUID::dummy_test_guid(EntityKind::READER_NO_KEY_USER_DEFINED);
  guid.entity_id = EntityId::create_custom_entity_id([1, 2, 3], EntityKind::from(111));
  let (send, rec) = mio_channel::sync_channel::<()>(100);
  let (status_sender, _status_receiver) =
    mio_extras::channel::sync_channel::<DataReaderStatus>(100);
  let (_reader_command_sender, reader_command_receiver) =
    mio_channel::sync_channel::<ReaderCommand>(10);
  // Cache with the test topic pre-created.
  let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
  dds_cache
    .write()
    .unwrap()
    .add_new_topic("test".to_string(), TypeDesc::new("testi".to_string()));
  let reader_ing = ReaderIngredients {
    guid,
    notification_sender: send,
    status_sender,
    topic_name: "test".to_string(),
    qos_policy: QosPolicies::qos_none(),
    data_reader_command_receiver: reader_command_receiver,
  };
  let mut reader = Reader::new(
    reader_ing,
    dds_cache,
    Rc::new(UDPSender::new(0).unwrap()),
    mio_extras::timer::Builder::default().build(),
  );
  // Match a fake remote writer so the DATA is accepted statefully.
  let writer_guid = GUID {
    prefix: GuidPrefix::new(&[1; 12]),
    entity_id: EntityId::create_custom_entity_id(
      [1; 3],
      EntityKind::WRITER_WITH_KEY_USER_DEFINED,
    ),
  };
  let mr_state = MessageReceiverState {
    source_guid_prefix: writer_guid.prefix,
    ..Default::default()
  };
  reader.matched_writer_add(
    writer_guid,
    EntityId::UNKNOWN,
    mr_state.unicast_reply_locator_list.clone(),
    mr_state.multicast_reply_locator_list.clone(),
    &QosPolicies::qos_none(),
  );
  let reader_id = EntityId::create_custom_entity_id([1, 2, 3], EntityKind::from(111));
  let data = Data {
    reader_id,
    writer_id: writer_guid.entity_id,
    ..Data::default()
  };
  reader.handle_data_msg(data, BitFlags::<DATA_Flags>::empty(), &mr_state);
  // The notification channel must now hold a wakeup token.
  assert!(rec.try_recv().is_ok());
}
// Handling a DATA message must store an equivalent CacheChange in the
// shared DDS cache.
#[test]
#[ignore]
fn rtpsreader_handle_data() {
  let new_guid = GUID::default();
  let (send, rec) = mio_channel::sync_channel::<()>(100);
  let (status_sender, _status_receiver) =
    mio_extras::channel::sync_channel::<DataReaderStatus>(100);
  let (_reader_command_sender, reader_command_receiver) =
    mio_channel::sync_channel::<ReaderCommand>(10);
  let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
  dds_cache
    .write()
    .unwrap()
    .add_new_topic("test".to_string(), TypeDesc::new("testi".to_string()));
  let reader_ing = ReaderIngredients {
    guid: new_guid,
    notification_sender: send,
    status_sender,
    topic_name: "test".to_string(),
    qos_policy: QosPolicies::qos_none(),
    data_reader_command_receiver: reader_command_receiver,
  };
  // Keep a clone of the cache handle to inspect it after the Reader consumes one.
  let mut new_reader = Reader::new(
    reader_ing,
    dds_cache.clone(),
    Rc::new(UDPSender::new(0).unwrap()),
    mio_extras::timer::Builder::default().build(),
  );
  let writer_guid = GUID {
    prefix: GuidPrefix::new(&[1; 12]),
    entity_id: EntityId::create_custom_entity_id(
      [1; 3],
      EntityKind::WRITER_WITH_KEY_USER_DEFINED,
    ),
  };
  let mr_state = MessageReceiverState {
    source_guid_prefix: writer_guid.prefix,
    ..Default::default()
  };
  new_reader.matched_writer_add(
    writer_guid,
    EntityId::UNKNOWN,
    mr_state.unicast_reply_locator_list.clone(),
    mr_state.multicast_reply_locator_list.clone(),
    &QosPolicies::qos_none(),
  );
  let d = Data {
    writer_id: writer_guid.entity_id,
    ..Default::default()
  };
  let d_seqnum = d.writer_sn;
  new_reader.handle_data_msg(d.clone(), BitFlags::<DATA_Flags>::empty(), &mr_state);
  // DataReader must have been notified.
  assert!(rec.try_recv().is_ok());
  // Re-build the expected CacheChange and compare with what the cache stored
  // (looked up via the test-only seqnum -> timestamp map).
  let hc_locked = dds_cache.read().unwrap();
  let ddsdata = DDSData::new(d.serialized_payload.unwrap());
  let cc_built_here = CacheChange::new(writer_guid, d_seqnum, WriteOptions::default(), ddsdata);
  let cc_from_chache = hc_locked.topic_get_change(
    &new_reader.topic_name,
    new_reader.seqnum_instant_map.get(&d_seqnum).unwrap(),
  );
  assert_eq!(cc_from_chache.unwrap(), &cc_built_here);
}
// Exercises handle_heartbeat_msg on a Reliable reader: which heartbeats
// produce an ACKNACK (return true) and which are ignored (return false).
#[test]
fn rtpsreader_handle_heartbeat() {
  env_logger::init();
  let new_guid = GUID::dummy_test_guid(EntityKind::READER_NO_KEY_USER_DEFINED);
  let (send, _rec) = mio_channel::sync_channel::<()>(100);
  let (status_sender, _status_receiver) =
    mio_extras::channel::sync_channel::<DataReaderStatus>(100);
  let (_reader_command_sender, reader_command_receiver) =
    mio_channel::sync_channel::<ReaderCommand>(10);
  let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
  dds_cache
    .write()
    .unwrap()
    .add_new_topic("test".to_string(), TypeDesc::new("testi".to_string()));
  // Heartbeat handling requires a Reliable reader (BestEffort ignores them).
  let reliable_qos = QosPolicyBuilder::new()
    .reliability(Reliability::Reliable {
      max_blocking_time: Duration::from_millis(100),
    })
    .build();
  let reader_ing = ReaderIngredients {
    guid: new_guid,
    notification_sender: send,
    status_sender,
    topic_name: "test".to_string(),
    qos_policy: reliable_qos.clone(),
    data_reader_command_receiver: reader_command_receiver,
  };
  let mut new_reader = Reader::new(
    reader_ing,
    dds_cache,
    Rc::new(UDPSender::new(0).unwrap()),
    mio_extras::timer::Builder::default().build(),
  );
  let writer_guid = GUID {
    prefix: GuidPrefix::new(&[1; 12]),
    entity_id: EntityId::create_custom_entity_id(
      [1; 3],
      EntityKind::WRITER_WITH_KEY_USER_DEFINED,
    ),
  };
  let writer_id = writer_guid.entity_id;
  let mr_state = MessageReceiverState {
    source_guid_prefix: writer_guid.prefix,
    ..Default::default()
  };
  new_reader.matched_writer_add(
    writer_guid,
    EntityId::UNKNOWN,
    mr_state.unicast_reply_locator_list.clone(),
    mr_state.multicast_reply_locator_list.clone(),
    &reliable_qos,
  );
  let d = DDSData::new(SerializedPayload::default());
  let mut changes = Vec::new();
  // Heartbeat advertising an empty range (first=1, last=0) with final flag:
  // nothing missing, no response required => false.
  let hb_new = Heartbeat {
    reader_id: new_reader.entity_id(),
    writer_id,
    first_sn: SequenceNumber::new(1),
    last_sn: SequenceNumber::new(0),
    count: 1,
  };
  assert!(!new_reader.handle_heartbeat_msg(&hb_new, true, mr_state.clone()));
  // Writer claims sample 1 exists; we have nothing => ACKNACK sent => true.
  let hb_one = Heartbeat {
    reader_id: new_reader.entity_id(),
    writer_id,
    first_sn: SequenceNumber::new(1),
    last_sn: SequenceNumber::new(1),
    count: 2,
  };
  assert!(new_reader.handle_heartbeat_msg(&hb_one, false, mr_state.clone()));
  // Simulate receiving sample 1 by writing it straight into the cache.
  let change = CacheChange::new(
    new_reader.guid(),
    SequenceNumber::new(1),
    WriteOptions::default(),
    d.clone(),
  );
  new_reader.dds_cache.write().unwrap().add_change(
    &new_reader.topic_name,
    &Timestamp::now(),
    change.clone(),
  );
  changes.push(change);
  // Same heartbeat count (2) as before: duplicate, must be ignored => false.
  let hb_one2 = Heartbeat {
    reader_id: new_reader.entity_id(),
    writer_id,
    first_sn: SequenceNumber::new(1),
    last_sn: SequenceNumber::new(1),
    count: 2,
  };
  assert!(!new_reader.handle_heartbeat_msg(&hb_one2, false, mr_state.clone()));
  // Writer now advertises 1..=3; we are missing samples => ACKNACK => true.
  let hb_3_1 = Heartbeat {
    reader_id: new_reader.entity_id(),
    writer_id,
    first_sn: SequenceNumber::new(1),
    last_sn: SequenceNumber::new(3),
    count: 3,
  };
  assert!(new_reader.handle_heartbeat_msg(&hb_3_1, false, mr_state.clone()));
  // Store samples 2 and 3 in the cache as well.
  let change = CacheChange::new(
    new_reader.guid(),
    SequenceNumber::new(2),
    WriteOptions::default(),
    d.clone(),
  );
  new_reader.dds_cache.write().unwrap().add_change(
    &new_reader.topic_name,
    &Timestamp::now(),
    change.clone(),
  );
  changes.push(change);
  let change = CacheChange::new(
    new_reader.guid(),
    SequenceNumber::new(3),
    WriteOptions::default(),
    d,
  );
  new_reader.dds_cache.write().unwrap().add_change(
    &new_reader.topic_name,
    &Timestamp::now(),
    change.clone(),
  );
  changes.push(change);
  // Empty advertised range (first=4, last=3) but final flag NOT set:
  // a response is still required => true.
  let hb_none = Heartbeat {
    reader_id: new_reader.entity_id(),
    writer_id,
    first_sn: SequenceNumber::new(4),
    last_sn: SequenceNumber::new(3),
    count: 4,
  };
  assert!(new_reader.handle_heartbeat_msg(&hb_none, false, mr_state));
}
// Receives samples 0..9, then a GAP declaring 1..4 (range) plus {5, 7} (set)
// irrelevant, and verifies exactly those samples vanished from the cache.
#[test]
#[ignore]
fn rtpsreader_handle_gap() {
  let new_guid = GUID::dummy_test_guid(EntityKind::READER_NO_KEY_USER_DEFINED);
  let (send, _rec) = mio_channel::sync_channel::<()>(100);
  let (status_sender, _status_receiver) =
    mio_extras::channel::sync_channel::<DataReaderStatus>(100);
  let (_reader_command_sender, reader_command_receiver) =
    mio_channel::sync_channel::<ReaderCommand>(10);
  let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
  dds_cache
    .write()
    .unwrap()
    .add_new_topic("test".to_string(), TypeDesc::new("testi".to_string()));
  let reader_ing = ReaderIngredients {
    guid: new_guid,
    notification_sender: send,
    status_sender,
    topic_name: "test".to_string(),
    qos_policy: QosPolicies::qos_none(),
    data_reader_command_receiver: reader_command_receiver,
  };
  let mut reader = Reader::new(
    reader_ing,
    dds_cache,
    Rc::new(UDPSender::new(0).unwrap()),
    mio_extras::timer::Builder::default().build(),
  );
  let writer_guid = GUID {
    prefix: GuidPrefix::new(&[1; 12]),
    entity_id: EntityId::create_custom_entity_id(
      [1; 3],
      EntityKind::WRITER_WITH_KEY_USER_DEFINED,
    ),
  };
  let writer_id = writer_guid.entity_id;
  let mr_state = MessageReceiverState {
    source_guid_prefix: writer_guid.prefix,
    ..Default::default()
  };
  reader.matched_writer_add(
    writer_guid,
    EntityId::UNKNOWN,
    mr_state.unicast_reply_locator_list.clone(),
    mr_state.multicast_reply_locator_list.clone(),
    &QosPolicies::qos_none(),
  );
  // Deliver samples with sequence numbers 0..9 and remember their cache changes.
  let n: i64 = 10;
  let mut d = Data {
    writer_id,
    ..Default::default()
  };
  let mut changes = Vec::new();
  for i in 0..n {
    d.writer_sn = SequenceNumber::new(i);
    reader.handle_data_msg(d.clone(), BitFlags::<DATA_Flags>::empty(), &mr_state);
    changes.push(reader.history_cache_change(d.writer_sn).unwrap().clone());
  }
  // GAP: contiguous range [1, 4) (gap_start..gap_list.base) plus set {5, 7}.
  let mut gap_list = SequenceNumberSet::new(SequenceNumber::new(4), 7);
  gap_list.insert(SequenceNumber::new(5));
  gap_list.insert(SequenceNumber::new(7));
  let gap = Gap {
    reader_id: reader.entity_id(),
    writer_id,
    gap_start: SequenceNumber::new(1),
    gap_list,
  };
  reader.handle_gap_msg(&gap, &mr_state);
  // 0 survives, 1-3 removed (range), 4 survives (base itself not irrelevant),
  // 5 and 7 removed (set members), 6, 8, 9 survive.
  assert_eq!(
    reader.history_cache_change(SequenceNumber::new(0)),
    Some(changes[0].clone())
  );
  assert_eq!(reader.history_cache_change(SequenceNumber::new(1)), None);
  assert_eq!(reader.history_cache_change(SequenceNumber::new(2)), None);
  assert_eq!(reader.history_cache_change(SequenceNumber::new(3)), None);
  assert_eq!(
    reader.history_cache_change(SequenceNumber::new(4)),
    Some(changes[4].clone())
  );
  assert_eq!(reader.history_cache_change(SequenceNumber::new(5)), None);
  assert_eq!(
    reader.history_cache_change(SequenceNumber::new(6)),
    Some(changes[6].clone())
  );
  assert_eq!(reader.history_cache_change(SequenceNumber::new(7)), None);
  assert_eq!(
    reader.history_cache_change(SequenceNumber::new(8)),
    Some(changes[8].clone())
  );
  assert_eq!(
    reader.history_cache_change(SequenceNumber::new(9)),
    Some(changes[9].clone())
  );
}
}