use crate::{
common::timed_event_handler::TimedEventHandler,
network::constant::TimerMessageType,
structure::{cache_change::ChangeKind, entity::RTPSEntity},
};
use crate::structure::endpoint::{Endpoint, EndpointAttributes};
use crate::messages::submessages::submessages::*;
use crate::dds::ddsdata::DDSData;
use crate::dds::statusevents::*;
use crate::dds::rtps_writer_proxy::RtpsWriterProxy;
use crate::structure::guid::{GUID, EntityId, GuidPrefix};
use crate::structure::sequence_number::{SequenceNumber, SequenceNumberSet};
#[cfg(test)] use crate::structure::locator::LocatorList;
use crate::structure::{duration::Duration, time::Timestamp};
use std::{
collections::BTreeSet,
iter::FromIterator,
sync::{Arc, RwLock},
};
use crate::structure::dds_cache::{DDSCache};
use mio::Token;
use mio_extras::channel as mio_channel;
use log::{debug, info, warn, trace, error};
use std::fmt;
use std::collections::{HashSet, BTreeMap, };
use std::time::Duration as StdDuration;
use enumflags2::BitFlags;
use crate::structure::cache_change::CacheChange;
use crate::dds::message_receiver::MessageReceiverState;
use crate::dds::qos::{QosPolicies, HasQoSPolicy, policy};
use crate::network::udp_sender::UDPSender;
use crate::serialization::message::Message;
use crate::messages::header::Header;
use crate::messages::protocol_id::ProtocolId;
use crate::messages::protocol_version::ProtocolVersion;
use crate::messages::vendor_id::VendorId;
use crate::messages::submessages::submessage_elements::parameter_list::ParameterList;
use speedy::{Writable, Endianness};
use chrono::Duration as chronoDuration;
use super::{
with_key::datareader::ReaderCommand,
};
use super::qos::InlineQos;
/// State of a single RTPS reader endpoint: the channels towards its DataReader,
/// the shared topic cache it stores incoming changes into, and the proxies of
/// the remote writers it is matched with.
pub(crate) struct Reader {
/// Signalled (with `()`) by `notify_cache_change` after an incoming DATA message was handled.
notification_sender: mio_channel::SyncSender<()>,
/// Carries `DataReaderStatus` events emitted through `send_status_change`.
status_sender: mio_channel::SyncSender<DataReaderStatus>,
/// Shared cache that incoming changes are added to and removed from, addressed by topic name.
dds_cache: Arc<RwLock<DDSCache>>,
/// Maps a received sequence number to the timestamp under which the change was stored in `dds_cache`.
seqnum_instant_map: BTreeMap<SequenceNumber, Timestamp>,
/// Topic name passed to every `dds_cache` operation of this reader.
topic_name: String,
/// QoS of this reader; offered writer QoS is checked against it, and its deadline and lifespan are consulted.
qos_policy: QosPolicies,
/// GUID of this reader; returned by `get_guid`.
my_guid: GUID,
/// Endpoint attributes returned by `as_endpoint` (field name spelling is historical).
pub enpoint_attributes: EndpointAttributes,
/// Initialised to 500 ms in `new`; not read by any method of this type in this file.
heartbeat_response_delay: StdDuration,
/// Initialised to zero in `new`; not read by any method of this type in this file.
heartbeat_supression_duration: StdDuration,
/// Counter copied into the `count` field of outgoing ACKNACK submessages.
sent_ack_nack_count: i32,
/// Initialised to zero in `new`; only printed by the `Debug` implementation in this file.
received_hearbeat_count: i32,
/// Proxies of the currently matched remote writers, keyed by writer GUID.
matched_writers: BTreeMap<GUID, RtpsWriterProxy>,
/// Running total reported as `total` in `SubscriptionMatched` status events.
writer_match_count_total: i32,
/// Running total reported in `RequestedDeadlineMissed` status events.
requested_deadline_missed_count: i32,
/// Running total reported in `RequestedIncompatibleQos` status events.
offered_incompatible_qos_count: i32,
/// Timer facility used to schedule the deadline check; `None` until `add_timed_event_handler` is called.
timed_event_handler: Option<TimedEventHandler>,
/// Receiving end of the command channel drained by `process_command`.
pub(crate) data_reader_command_receiver: mio_channel::Receiver<ReaderCommand>,
}
impl Reader {
pub fn new(
guid: GUID,
notification_sender: mio_channel::SyncSender<()>,
status_sender: mio_channel::SyncSender<DataReaderStatus>,
dds_cache: Arc<RwLock<DDSCache>>,
topic_name: String,
qos_policy: QosPolicies,
data_reader_command_receiver: mio_channel::Receiver<ReaderCommand>, ) -> Reader {
Reader {
notification_sender,
status_sender,
dds_cache,
topic_name,
qos_policy,
seqnum_instant_map: BTreeMap::new(),
my_guid: guid ,
enpoint_attributes: EndpointAttributes::default(),
heartbeat_response_delay: StdDuration::new(0, 500_000_000), heartbeat_supression_duration: StdDuration::new(0, 0),
sent_ack_nack_count: 0,
received_hearbeat_count: 0,
matched_writers: BTreeMap::new(),
writer_match_count_total: 0,
requested_deadline_missed_count: 0,
offered_incompatible_qos_count: 0,
timed_event_handler: None,
data_reader_command_receiver,
}
}
/// Test helper: returns a copy of the payload (`data_value`) of the cache
/// change stored for `sequence_number`.
///
/// Returns `None` when this reader has no timestamp recorded for the sequence
/// number, or when the topic cache holds no change for that timestamp.
///
/// # Panics
/// Panics if the cache lock is poisoned.
#[cfg(test)]
pub fn get_history_cache_change_data(&self, sequence_number: SequenceNumber) -> Option<DDSData> {
    // An unknown sequence number is a plain miss, not a reason to panic.
    let instant = self.seqnum_instant_map.get(&sequence_number)?;
    let dds_cache = self.dds_cache.read().unwrap();
    let cc = dds_cache.from_topic_get_change(&self.topic_name, instant);
    debug!("history cache !!!! {:?}", cc);
    cc.map(|cc| cc.data_value.clone())
}
/// Returns the token derived from this reader's entity id via `as_token`.
pub fn get_entity_token(&self) -> Token {
    let guid = self.get_guid();
    guid.entityId.as_token()
}
/// Returns the alternative token derived from this reader's entity id via `as_alt_token`.
pub fn get_reader_alt_entity_token(&self) -> Token {
    let guid = self.get_guid();
    guid.entityId.as_alt_token()
}
/// Installs the timed event handler, replacing any handler set earlier.
pub fn add_timed_event_handler(&mut self, time_handler: TimedEventHandler) {
    let handler_slot = &mut self.timed_event_handler;
    *handler_slot = Some(time_handler);
}
/// Schedules a `ReaderDeadlineMissedCheck` timer one deadline period from now.
///
/// Does nothing (apart from logging) when the QoS has no deadline, when the
/// deadline cannot be converted to a chrono duration, or when no timed event
/// handler has been installed.
pub fn set_requested_deadline_check_timer(&mut self) {
    let deadline = match self.qos_policy.deadline {
        Some(d) => d,
        None => {
            trace!("GUID={:?} - no deaadline policy - do not set set_requested_deadline_check_timer",
                self.my_guid);
            return;
        }
    };
    debug!("GUID={:?} set_requested_deadline_check_timer: {:?}",
        self.my_guid, deadline.0.to_std());

    let timeout = match chronoDuration::from_std(deadline.0.to_std()) {
        Ok(cdur) => cdur,
        Err(_) => {
            warn!("Failed to get chrono duration from deadline {:?}", deadline);
            return;
        }
    };

    if let Some(teh) = self.timed_event_handler.as_mut() {
        teh.set_timeout(&timeout, TimerMessageType::ReaderDeadlineMissedCheck);
    } else {
        warn!("Unable to get timed_event_handler.");
    }
}
/// Attempts a non-blocking send of `change` on the status channel.
///
/// Failures never propagate; each failure kind is only logged at its own level.
pub fn send_status_change(&self, change: DataReaderStatus) {
    if let Err(send_error) = self.status_sender.try_send(change) {
        match send_error {
            mio_channel::TrySendError::Full(_) => {
                trace!("Reader cannot send new status changes, datareader is full.");
            }
            mio_channel::TrySendError::Disconnected(_) => {
                info!("send_status_change - cannot send status, DataReader Disconnected.")
            }
            mio_channel::TrySendError::Io(e) => {
                error!("send_status_change - cannot send status: {:?}",e);
            }
        }
    }
}
/// Checks every matched writer against the requested deadline.
///
/// A writer counts as having missed the deadline when it has no last-change
/// timestamp at all, or when the time since that timestamp exceeds the
/// deadline. Each miss increments `requested_deadline_missed_count` and yields
/// one `RequestedDeadlineMissed` status. Without a deadline policy the result
/// is empty.
fn calculate_if_requested_deadline_is_missed(&mut self) -> Vec<DataReaderStatus> {
    debug!("calculate_if_requested_deadline_is_missed");
    let deadline_duration = match self.qos_policy.deadline {
        Some(policy::Deadline(d)) => d,
        None => return Vec::new(),
    };

    let mut changes = Vec::new();
    let now = Timestamp::now();
    for writer_proxy in self.matched_writers.values_mut() {
        let missed = match writer_proxy.last_change_timestamp() {
            // Nothing has been received from this writer yet.
            None => true,
            Some(last_change) => {
                let since_last = now.duration_since(last_change);
                trace!("Comparing deadlines: {:?} - {:?}", since_last, deadline_duration);
                let late = since_last > deadline_duration;
                if late {
                    debug!("Deadline missed: {:?} - {:?}", since_last, deadline_duration);
                }
                late
            }
        };
        if missed {
            self.requested_deadline_missed_count += 1;
            changes.push(DataReaderStatus::RequestedDeadlineMissed {
                count: CountWithChange::start_from(self.requested_deadline_missed_count, 1),
            });
        }
    }
    changes
}
pub fn handle_timed_event(&mut self, timer_message: TimerMessageType) {
match timer_message {
TimerMessageType::ReaderDeadlineMissedCheck =>
self.handle_requested_deadline_event(),
other_message =>
error!("handle_timed_event - I do not know how to handle {:?}", other_message),
}
}
/// Drains the command channel coming from the DataReader.
///
/// Keeps receiving until the channel reports that it is empty or
/// disconnected; the only known command is currently just logged as
/// unimplemented.
pub fn process_command(&mut self) {
    use std::sync::mpsc::TryRecvError;

    trace!("process_command {:?}", self.my_guid );
    loop {
        let stop_reason = match self.data_reader_command_receiver.try_recv() {
            Ok(ReaderCommand::RESET_REQUESTED_DEADLINE_STATUS) => {
                warn!("RESET_REQUESTED_DEADLINE_STATUS not implemented!");
                continue;
            }
            Err(reason) => reason,
        };
        match stop_reason {
            TryRecvError::Disconnected => trace!("DataReader disconnected"),
            TryRecvError::Empty => warn!("There was no command. Spurious command event??"),
        }
        break;
    }
}
/// Runs the deadline check, forwards every resulting status to the
/// DataReader, and re-arms the deadline timer.
fn handle_requested_deadline_event(&mut self) {
    debug!("handle_requested_deadline_event");
    let missed_deadlines = self.calculate_if_requested_deadline_is_missed();
    missed_deadlines
        .into_iter()
        .for_each(|status| self.send_status_change(status));
    self.set_requested_deadline_check_timer();
}
/// Returns a clone of the cache change stored for `sequence_number`.
///
/// Returns `None` when this reader has no timestamp recorded for the sequence
/// number, or when the topic cache holds no change for that timestamp (for
/// example after the change was removed).
///
/// # Panics
/// Panics if the cache lock is poisoned.
pub fn get_history_cache_change(&self, sequence_number: SequenceNumber) -> Option<CacheChange> {
    debug!("{:?}", sequence_number);
    // A sequence number that was never stored is a plain miss; unwrapping the
    // map lookup here would turn it into a panic.
    let instant = self.seqnum_instant_map.get(&sequence_number)?;
    let dds_cache = self.dds_cache.read().unwrap();
    let cc = dds_cache.from_topic_get_change(&self.topic_name, instant);
    debug!("history cache !!!! {:?}", cc);
    cc.cloned()
}
/// Test helper: returns `[lowest, highest]` of the sequence numbers recorded
/// by this reader.
///
/// # Panics
/// Panics when no sequence number has been recorded yet.
#[cfg(test)]
pub fn get_history_cache_sequence_start_and_end_numbers(&self) -> Vec<SequenceNumber> {
    // The map is ordered by sequence number, so its ends are the extremes.
    let lowest = *self.seqnum_instant_map.keys().next().unwrap();
    let highest = *self.seqnum_instant_map.keys().next_back().unwrap();
    vec![lowest, highest]
}
/// Adds or refreshes the proxy of a remote writer, provided its offered QoS
/// is compatible with this reader's QoS.
///
/// On a QoS mismatch a `RequestedIncompatibleQos` status is sent and the proxy
/// is not stored. When a previously unknown writer is added, a
/// `SubscriptionMatched` status is sent; refreshing a known writer is silent.
pub fn update_writer_proxy(&mut self, proxy: RtpsWriterProxy, offered_qos: QosPolicies) {
    debug!("update_writer_proxy topic={:?}",self.topic_name);

    if let Some(bad_policy_id) = offered_qos.compliance_failure_wrt(&self.qos_policy) {
        self.offered_incompatible_qos_count += 1;
        self.send_status_change(DataReaderStatus::RequestedIncompatibleQos {
            count: CountWithChange::new(self.offered_incompatible_qos_count, 1),
            last_policy_id: bad_policy_id,
            policies: Vec::new(),
        });
        debug!("update_writer_proxy - QoS mismatch {:?}", bad_policy_id);
        return;
    }

    // The proxy is cloned because it is still needed for the log line below.
    let count_change = self.matched_writer_update(proxy.clone());
    if count_change <= 0 {
        return;
    }
    self.writer_match_count_total += count_change;
    let current_count = self.matched_writers.len() as i32;
    self.send_status_change(DataReaderStatus::SubscriptionMatched {
        total: CountWithChange::new(self.writer_match_count_total, count_change),
        current: CountWithChange::new(current_count, count_change),
    });
    info!("Matched new remote writer on topic={:?} writer= {:?}",
        self.topic_name, &proxy);
}
fn matched_writer_update(&mut self, proxy: RtpsWriterProxy) -> i32 {
match self.matched_writer_lookup(proxy.remote_writer_guid) {
Some(op) => {
op.update_contents(proxy);
0
}
None => {
self.matched_writers.insert(proxy.remote_writer_guid, proxy);
1
}
}
}
/// Forgets the proxy of `writer_guid`; if one was actually stored, reports
/// the reduced match count through a `SubscriptionMatched` status.
pub fn remove_writer_proxy(&mut self, writer_guid:GUID) {
    if self.matched_writers.remove(&writer_guid).is_none() {
        // Unknown writer: nothing changed, nothing to report.
        return;
    }
    let remaining = self.matched_writers.len() as i32;
    self.send_status_change(DataReaderStatus::SubscriptionMatched {
        total: CountWithChange::new(self.writer_match_count_total, 0),
        current: CountWithChange::new(remaining, -1),
    });
}
/// Removes every matched writer whose GUID falls in the range of `guid_prefix`.
pub fn participant_lost(&mut self, guid_prefix: GuidPrefix) {
    // Collect first: removal needs `&mut self` while the range borrows the map.
    let lost_writers: Vec<GUID> = self
        .matched_writers
        .range(guid_prefix.range())
        .map(|(writer_guid, _)| *writer_guid)
        .collect();
    lost_writers
        .into_iter()
        .for_each(|writer_guid| self.remove_writer_proxy(writer_guid));
}
/// Tells whether any matched writer has the given entity id, regardless of
/// its GUID prefix.
pub fn contains_writer(&self, entity_id: EntityId) -> bool {
    self
        .matched_writers
        .keys()
        .any(|writer_guid| writer_guid.entityId == entity_id)
}
/// Test helper: builds a writer proxy from its parts and registers it using
/// `QosPolicies::qos_none()` as the offered QoS.
#[cfg(test)]
pub(crate) fn matched_writer_add(
    &mut self,
    remote_writer_guid: GUID,
    remote_group_entity_id: EntityId,
    unicast_locator_list: LocatorList,
    multicast_locator_list: LocatorList,
) {
    let offered_qos = QosPolicies::qos_none();
    let writer_proxy = RtpsWriterProxy::new(
        remote_writer_guid,
        unicast_locator_list,
        multicast_locator_list,
        remote_group_entity_id,
    );
    self.update_writer_proxy(writer_proxy, offered_qos);
}
/// Returns mutable access to the proxy of `remote_writer_guid`, if it is matched.
fn matched_writer_lookup(&mut self, remote_writer_guid: GUID) -> Option<&mut RtpsWriterProxy> {
    let proxies = &mut self.matched_writers;
    proxies.get_mut(&remote_writer_guid)
}
/// Handles an incoming DATA submessage.
///
/// The message is dropped when it is older than the lifespan QoS, or when it
/// comes from a matched writer that already delivered this sequence number
/// (the SPDP builtin participant reader accepts such duplicates). Otherwise
/// the change is handed to `make_cache_change`, its timestamp is recorded
/// under the sequence number, and the DataReader is notified.
pub fn handle_data_msg(
    &mut self,
    data: Data,
    data_flags: BitFlags<DATA_Flags>,
    mr_state: MessageReceiverState,
) {
    trace!("handle_data_msg entry");

    // Age of the message; zero when the sender supplied no timestamp.
    let duration = if let Some(ts) = mr_state.timestamp {
        Timestamp::now().duration_since(ts)
    } else {
        Duration::DURATION_ZERO
    };
    if let Some(ls) = self.get_qos().lifespan {
        if ls.duration < duration {
            return;
        }
    }

    let writer_guid = GUID::new_with_prefix_and_id(mr_state.source_guid_prefix, data.writer_id);
    let seq_num = data.writer_sn;
    let instant = Timestamp::now();
    let writer_is_matched = self.matched_writers.contains_key(&writer_guid);
    let mut no_writers = false;
    trace!("handle_data_msg from {:?} no_writers={:?} seq={:?}",
        &writer_guid, no_writers, seq_num,);

    if writer_is_matched {
        let my_entityid = self.my_guid.entityId;
        match self.matched_writer_lookup(writer_guid) {
            None => no_writers = true,
            Some(writer_proxy) => {
                if writer_proxy.contains_change(seq_num) {
                    trace!("handle_data_msg already have this seq={:?}", seq_num);
                    if my_entityid != EntityId::ENTITYID_SPDP_BUILTIN_PARTICIPANT_READER {
                        return;
                    }
                    debug!("Accepting duplicate message to participant reader.");
                }
                writer_proxy.received_changes_add(seq_num, instant);
            }
        }
    }

    self.make_cache_change(data, data_flags, instant, writer_guid, no_writers);
    self.seqnum_instant_map.insert(seq_num, instant);
    self.notify_cache_change();
}
/// Handles an incoming HEARTBEAT submessage from a matched writer.
///
/// Returns `true` when `send_acknack` was invoked in response, `false` when
/// the heartbeat was ignored (unknown writer, or a count not greater than the
/// last one seen from that writer) or needed no reply (nothing missing and
/// `final_flag_set` is true).
pub fn handle_heartbeat_msg(
&mut self,
heartbeat: Heartbeat,
final_flag_set: bool,
mr_state: MessageReceiverState,
) -> bool {
let writer_guid =
GUID::new_with_prefix_and_id(mr_state.source_guid_prefix, heartbeat.writer_id);
// Heartbeats from writers we are not matched with are ignored.
if !self.matched_writers.contains_key(&writer_guid) {
return false
}
// Only logged; processing continues even with a suspicious first_sn.
if heartbeat.first_sn < SequenceNumber::default() {
warn!("Writer {:?} advertised SequenceNumbers from {:?} to {:?}!",
writer_guid, heartbeat.first_sn, heartbeat.last_sn);
}
let writer_proxy = match self.matched_writer_lookup(writer_guid) {
Some(wp) => wp,
None => return false, };
// The reply goes to the unicast locators stored in the writer proxy,
// overriding whatever reply locators came with the receiver state.
let mut mr_state = mr_state;
mr_state.unicast_reply_locator_list = writer_proxy.unicast_locator_list.clone();
// Each heartbeat must carry a count strictly greater than the last one seen.
if heartbeat.count <= writer_proxy.received_heartbeat_count {
return false
}
writer_proxy.received_heartbeat_count = heartbeat.count;
// NOTE(review): presumably the timestamps of changes below first_sn that the
// proxy dropped — confirm against RtpsWriterProxy::irrelevant_changes_up_to.
let removed_instances = writer_proxy.irrelevant_changes_up_to(heartbeat.first_sn);
{
// Scoped so the cache write lock is released before the writer proxy is
// looked up again.
let mut cache = match self.dds_cache.write() {
Ok(rwlock) => rwlock,
Err(e) => panic!("The DDSCache of is poisoned. Error: {}", e),
};
for instant in removed_instances.iter() {
match cache.from_topic_remove_change(&self.topic_name, instant) {
Some(_) => (),
None => warn!("WriterProxy told to remove an instant which was not present"),
}
}
}
// Looked up again because the earlier mutable borrow of `self` had to end
// before `self.dds_cache` could be used.
let writer_proxy = match self.matched_writer_lookup(writer_guid) {
Some(wp) => wp,
None => return false, };
let missing_seqnums =
writer_proxy.get_missing_sequence_numbers(heartbeat.first_sn, heartbeat.last_sn);
// Reply when something is missing, or when the writer did not set the final flag.
if ! missing_seqnums.is_empty() || ! final_flag_set {
let reader_sn_state =
match missing_seqnums.iter().next() {
Some(&first_missing) => {
// Request the missing numbers, starting from the first one and limited to
// those less than first_missing + 256.
SequenceNumberSet
::from_base_and_set(first_missing,
&BTreeSet::from_iter(missing_seqnums.iter()
.map(|s| *s)
.take_while( |sn| sn < &(first_missing + SequenceNumber::from(256)) ))
)
}
None =>
// Nothing missing: send an empty set whose base is one past the highest
// sequence number recorded by this reader, or the default sequence number
// when nothing has been recorded.
match self.seqnum_instant_map.keys().next_back() {
None => SequenceNumberSet::new_empty(SequenceNumber::default()), Some(high_sn) => SequenceNumberSet::new_empty(*high_sn + SequenceNumber::new(1)),
}
};
let response_ack_nack = AckNack {
reader_id: self.get_entity_id(),
writer_id: heartbeat.writer_id,
reader_sn_state,
count: self.sent_ack_nack_count,
};
// The counter is incremented after its current value went into the message.
self.sent_ack_nack_count += 1;
self.send_acknack(response_ack_nack, mr_state);
return true
}
false
}
pub fn handle_gap_msg(&mut self, gap: Gap, mr_state: MessageReceiverState) {
let writer_guid = GUID::new_with_prefix_and_id(mr_state.source_guid_prefix, gap.writer_id);
let writer_proxy = match self.matched_writer_lookup(writer_guid) {
Some(wp) => wp,
None => return, };
let mut irrelevant_changes_set = HashSet::new();
for seq_num_i64 in i64::from(gap.gap_start)..i64::from(gap.gap_list.base()) {
irrelevant_changes_set.insert(SequenceNumber::from(seq_num_i64));
}
for seq_num in gap.gap_list.iter() {
irrelevant_changes_set.insert(seq_num);
}
let mut removed_instances = Vec::new();
for seq_num in &irrelevant_changes_set {
match writer_proxy.set_irrelevant_change(*seq_num) {
Some(i) => removed_instances.push(i),
None => (),
};
}
let mut cache = match self.dds_cache.write() {
Ok(rwlock) => rwlock,
Err(e) => panic!("The DDSCache of is poisoned. Error: {}", e),
};
for instant in &removed_instances {
cache.from_topic_remove_change(&self.topic_name, instant);
}
}
pub fn handle_datafrag_msg(&mut self, _datafrag: DataFrag, _mr_State: MessageReceiverState) {
todo!() }
pub fn handle_heartbeatfrag_msg(
&mut self,
_heartbeatfrag: HeartbeatFrag,
_mr_state: MessageReceiverState,
) {
todo!()
}
/// Determines the change kind of a DATA submessage that carries no sample data.
///
/// If the inline QoS contains a readable status info, its change kind is
/// used. Otherwise the result is `NOT_ALIVE_UNREGISTERED` when `no_writers`
/// is set and `NOT_ALIVE_DISPOSED` when it is not.
fn deduce_change_kind(
    inline_qos: Option<ParameterList>,
    no_writers: bool,
    ri: RepresentationIdentifier,
) -> ChangeKind {
    let status_info = inline_qos
        .as_ref()
        .and_then(|iqos| InlineQos::status_info(iqos, ri).ok());

    if let Some(si) = status_info {
        return si.change_kind();
    }
    if no_writers {
        ChangeKind::NOT_ALIVE_UNREGISTERED
    } else {
        ChangeKind::NOT_ALIVE_DISPOSED
    }
}
/// Converts a DATA submessage into a `CacheChange` and adds it to the topic
/// cache under the timestamp `instant`.
///
/// The payload and the Data/Key flags decide what kind of `DDSData` is built.
/// Inconsistent combinations are logged and the message is discarded, in
/// which case nothing is added to the cache.
///
/// # Panics
/// Panics if the cache lock is poisoned.
fn make_cache_change(
&mut self,
data: Data,
data_flags: BitFlags<DATA_Flags>,
instant: Timestamp,
writer_guid: GUID,
no_writers: bool,
) {
// The Endianness flag selects little-endian CDR; otherwise big-endian CDR.
let representation_identifier =
if data_flags.contains(DATA_Flags::Endianness) { RepresentationIdentifier::CDR_LE }
else { RepresentationIdentifier::CDR_BE };
// Dispatch on (payload present?, Data flag, Key flag).
let ddsdata =
match (data.serialized_payload ,
data_flags.contains(DATA_Flags::Data) , data_flags.contains(DATA_Flags::Key)) {
// Payload flagged as data: an ordinary sample.
(Some(sp), true, false ) => { DDSData::new(sp)
}
// Payload flagged as key: a key-only change whose kind comes from the inline QoS.
(Some(sp), false, true ) => { DDSData::new_disposed_by_key(Self::deduce_change_kind(data.inline_qos, no_writers, representation_identifier), sp)
}
// No payload and no flags: the instance must be identified by a key hash in the inline QoS.
(None, false, false ) => { let key_hash =
match data.inline_qos
.as_ref()
.map( |iqos| InlineQos::key_hash(iqos).ok() )
.flatten().flatten() {
Some(h) => h,
None => {
info!("Writer {:?} sent us DATA that has no payload and no key_hash inline QoS - discarding {:?}",
&writer_guid, &data.inline_qos );
return
}
};
let change_kind =
Self::deduce_change_kind(data.inline_qos, no_writers, representation_identifier);
DDSData::new_disposed_by_key_hash(change_kind, key_hash )
}
// Both flags set at once is contradictory.
(Some(_), true , true ) => { info!("Writer {:?} sent us DATA that claims to be both data and key - discarding.",
writer_guid);
return
}
// A payload without either flag cannot be interpreted.
(Some(_), false, false ) => { error!("make_cache_change - Flags says no data or key, but got payload!");
return
}
// A flag promises a payload that is not there.
(None, true, _ ) | (None, _ , true ) => {
error!("make_cache_change - Where is my SerializedPayload?");
return
}
};
let cache_change = CacheChange::new(writer_guid, data.writer_sn, ddsdata);
let mut cache = match self.dds_cache.write() {
Ok(rwlock) => rwlock,
Err(e) => panic!("The DDSCache of is poisoned. Error: {}", e),
};
cache.to_topic_add_change(&self.topic_name, &instant, cache_change);
}
fn notify_cache_change(&self) {
match self.notification_sender.try_send(()) {
Ok(()) => (),
Err(mio_channel::TrySendError::Full(_)) => (), Err(mio_channel::TrySendError::Disconnected(_)) => {
}
Err(mio_channel::TrySendError::Io(_)) => {
}
}
}
fn send_acknack(&self, acknack: AckNack, mr_state: MessageReceiverState) {
let sender = UDPSender::new_with_random_port();
let flags = BitFlags::<ACKNACK_Flags>::from_flag(ACKNACK_Flags::Endianness)
| BitFlags::<ACKNACK_Flags>::from_flag(ACKNACK_Flags::Final);
let infodst_flags =
BitFlags::<INFODESTINATION_Flags>::from_flag(INFODESTINATION_Flags::Endianness);
let mut message = Message::new(Header {
protocol_id: ProtocolId::default(),
protocol_version: ProtocolVersion::THIS_IMPLEMENTATION,
vendor_id: VendorId::THIS_IMPLEMENTATION,
guid_prefix: self.my_guid.guidPrefix,
});
let info_dst = InfoDestination {
guid_prefix: mr_state.source_guid_prefix,
};
match info_dst.create_submessage(infodst_flags) {
Some(m) => message.add_submessage(m),
None => return,
};
match acknack.create_submessage(flags) {
Some(m) => message.add_submessage(m),
None => return,
};
let bytes = message
.write_to_vec_with_ctx(Endianness::LittleEndian)
.unwrap();
sender.send_to_locator_list(&bytes, &mr_state.unicast_reply_locator_list);
}
pub fn send_preemptive_acknacks(&mut self) {
let sender = UDPSender::new_with_random_port();
let flags = BitFlags::<ACKNACK_Flags>::from_flag(ACKNACK_Flags::Endianness)
| BitFlags::<ACKNACK_Flags>::from_flag(ACKNACK_Flags::Final);
let infodst_flags =
BitFlags::<INFODESTINATION_Flags>::from_flag(INFODESTINATION_Flags::Endianness);
self.sent_ack_nack_count += 1;
for (_, writer_proxy) in self
.matched_writers
.iter()
.filter(|(_, p)| p.no_changes() )
{
let mut message = Message::new(Header {
protocol_id: ProtocolId::default(),
protocol_version: ProtocolVersion::THIS_IMPLEMENTATION,
vendor_id: VendorId::THIS_IMPLEMENTATION,
guid_prefix: self.my_guid.guidPrefix,
});
let info_dst = InfoDestination {
guid_prefix: writer_proxy.remote_writer_guid.guidPrefix,
};
let acknack = AckNack {
reader_id: self.get_entity_id(),
writer_id: writer_proxy.remote_writer_guid.entityId,
reader_sn_state: SequenceNumberSet::new_empty(SequenceNumber::from(1)),
count: self.sent_ack_nack_count,
};
match info_dst.create_submessage(infodst_flags) {
Some(m) => message.add_submessage(m),
None => continue, };
match acknack.create_submessage(flags) {
Some(m) => message.add_submessage(m),
None => continue, };
let bytes = message
.write_to_vec_with_ctx(Endianness::LittleEndian)
.unwrap();
sender.send_to_locator_list(&bytes, &writer_proxy.unicast_locator_list);
}
}
pub fn topic_name(&self) -> &String {
&self.topic_name
}
}
impl HasQoSPolicy for Reader {
fn get_qos(&self) -> QosPolicies {
self.qos_policy.clone()
}
}
impl RTPSEntity for Reader {
fn get_guid(&self) -> GUID {
self.my_guid
}
}
impl Endpoint for Reader {
fn as_endpoint(&self) -> &crate::structure::endpoint::EndpointAttributes {
&self.enpoint_attributes
}
}
impl fmt::Debug for Reader {
    /// Prints a subset of the reader's fields; the channel and cache handles
    /// are replaced by a placeholder entry.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Reader");
        out.field("notification_sender, dds_cache", &"can't print".to_string());
        out.field("topic_name", &self.topic_name);
        out.field("my_guid", &self.my_guid);
        out.field("enpoint_attributes", &self.enpoint_attributes);
        out.field("heartbeat_response_delay", &self.heartbeat_response_delay);
        out.field("sent_ack_nack_count", &self.sent_ack_nack_count);
        out.field("received_hearbeat_count", &self.received_hearbeat_count);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::structure::guid::{GUID, EntityId};
use crate::messages::submessages::submessage_elements::serialized_payload::{SerializedPayload};
use crate::structure::guid::{GuidPrefix, EntityKind};
use crate::dds::statusevents::DataReaderStatus;
use crate::structure::topic_kind::TopicKind;
use crate::dds::typedesc::TypeDesc;
// Handling a DATA message from a matched writer must put a notification on
// the reader's notification channel.
#[test]
fn rtpsreader_notification() {
let mut guid = GUID::dummy_test_guid(EntityKind::READER_NO_KEY_USER_DEFINED);
guid.entityId = EntityId::createCustomEntityID([1, 2, 3], EntityKind::from(111));
// `rec` is the receiving end that the final assertion polls.
let (send, rec) = mio_channel::sync_channel::<()>(100);
let (status_sender, _status_reciever) = mio_extras::channel::sync_channel::<DataReaderStatus>(100);
let (_reader_command_sender, reader_command_receiver) =
mio_channel::sync_channel::<ReaderCommand>(10);
// The topic is registered in the cache before the reader is created.
let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
dds_cache.write().unwrap().add_new_topic(
&"test".to_string(),
TopicKind::NoKey,
TypeDesc::new("testi"),
);
let mut reader = Reader::new(
guid,
send,
status_sender,
dds_cache,
"test".to_string(),
QosPolicies::qos_none(),
reader_command_receiver,
);
let writer_guid = GUID {
guidPrefix: GuidPrefix::new(&[1; 12]),
entityId: EntityId::createCustomEntityID([1; 3], EntityKind::WRITER_WITH_KEY_USER_DEFINED),
};
// The receiver state carries the writer's GUID prefix as message source.
let mut mr_state = MessageReceiverState::default();
mr_state.source_guid_prefix = writer_guid.guidPrefix;
reader.matched_writer_add(
writer_guid.clone(),
EntityId::ENTITYID_UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
);
let mut data = Data::default();
data.reader_id = EntityId::createCustomEntityID([1, 2, 3], EntityKind::from(111));
data.writer_id = writer_guid.entityId;
reader.handle_data_msg(data, BitFlags::<DATA_Flags>::empty(), mr_state);
// A notification must now be waiting.
assert!(rec.try_recv().is_ok());
}
// A DATA message from a matched writer must end up in the topic cache as a
// cache change equal to one built by hand from the same data.
#[test]
fn rtpsreader_handle_data() {
let new_guid = GUID::default();
let (send, rec) = mio_channel::sync_channel::<()>(100);
let (status_sender, _status_reciever) = mio_extras::channel::sync_channel::<DataReaderStatus>(100);
let (_reader_command_sender, reader_command_receiver) =
mio_channel::sync_channel::<ReaderCommand>(10);
let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
dds_cache.write().unwrap().add_new_topic(
&"test".to_string(),
TopicKind::NoKey,
TypeDesc::new("testi"),
);
// The test keeps its own handle to the cache so it can inspect it afterwards.
let mut new_reader = Reader::new(
new_guid,
send,
status_sender,
dds_cache.clone(),
"test".to_string(),
QosPolicies::qos_none(),
reader_command_receiver,
);
let writer_guid = GUID {
guidPrefix: GuidPrefix::new(&[1; 12]),
entityId: EntityId::createCustomEntityID([1; 3], EntityKind::WRITER_WITH_KEY_USER_DEFINED),
};
let mut mr_state = MessageReceiverState::default();
mr_state.source_guid_prefix = writer_guid.guidPrefix;
new_reader.matched_writer_add(
writer_guid.clone(),
EntityId::ENTITYID_UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
);
let mut d = Data::default();
d.writer_id = writer_guid.entityId;
let d_seqnum = d.writer_sn;
new_reader.handle_data_msg(d.clone(), BitFlags::<DATA_Flags>::empty(), mr_state);
assert!(rec.try_recv().is_ok());
// Fetch the stored change using the timestamp the reader recorded for the sequence number.
let hc_locked = dds_cache.read().unwrap();
let cc_from_chache = hc_locked.from_topic_get_change(
&new_reader.topic_name,
&new_reader.seqnum_instant_map.get(&d_seqnum).unwrap(),
);
// Build the expected change from the same payload, writer and sequence number.
let ddsdata = DDSData::new(d.serialized_payload.unwrap());
let cc_built_here = CacheChange::new( writer_guid, d_seqnum, ddsdata );
assert_eq!(cc_from_chache.unwrap(), &cc_built_here);
}
// Exercises handle_heartbeat_msg with a series of heartbeats and checks, for
// each one, whether the reader reports having responded, and finally how many
// ACKNACKs were counted.
#[test]
fn rtpsreader_handle_heartbeat() {
let new_guid = GUID::dummy_test_guid(EntityKind::READER_NO_KEY_USER_DEFINED);
let (send, _rec) = mio_channel::sync_channel::<()>(100);
let (status_sender, _status_reciever) = mio_extras::channel::sync_channel::<DataReaderStatus>(100);
let (_reader_command_sender, reader_command_receiver) =
mio_channel::sync_channel::<ReaderCommand>(10);
let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
dds_cache.write().unwrap().add_new_topic(
&"test".to_string(),
TopicKind::NoKey,
TypeDesc::new("testi"),
);
let mut new_reader = Reader::new(
new_guid,
send,
status_sender,
dds_cache,
"test".to_string(),
QosPolicies::qos_none(),
reader_command_receiver,
);
let writer_guid = GUID {
guidPrefix: GuidPrefix::new(&[1; 12]),
entityId: EntityId::createCustomEntityID([1; 3], EntityKind::WRITER_WITH_KEY_USER_DEFINED),
};
let writer_id = writer_guid.entityId;
let mut mr_state = MessageReceiverState::default();
mr_state.source_guid_prefix = writer_guid.guidPrefix;
new_reader.matched_writer_add(
writer_guid.clone(),
EntityId::ENTITYID_UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
);
let d = DDSData::new(SerializedPayload::default());
// `changes` only collects the changes created below; nothing reads it afterwards.
let mut changes = Vec::new();
// first_sn 1 / last_sn 0 with the final flag set: no response expected.
let hb_new = Heartbeat {
reader_id: new_reader.get_entity_id(),
writer_id,
first_sn: SequenceNumber::from(1), last_sn: SequenceNumber::from(0),
count: 1,
};
assert!(!new_reader.handle_heartbeat_msg(hb_new, true, mr_state.clone()));
// The writer announces sequence number 1 without the final flag: a response is expected.
let hb_one = Heartbeat {
reader_id: new_reader.get_entity_id(),
writer_id,
first_sn: SequenceNumber::from(1), last_sn: SequenceNumber::from(1),
count: 2,
};
assert!(new_reader.handle_heartbeat_msg(hb_one, false, mr_state.clone()));
// The change is written straight into the cache, not through handle_data_msg.
let change = CacheChange::new(
new_reader.get_guid(),
SequenceNumber::from(1),
d.clone(),
);
new_reader.dds_cache.write().unwrap().to_topic_add_change(
&new_reader.topic_name,
&Timestamp::now(),
change.clone(),
);
changes.push(change);
// Same count (2) as the previous heartbeat: must be ignored.
let hb_one2 = Heartbeat {
reader_id: new_reader.get_entity_id(),
writer_id,
first_sn: SequenceNumber::from(1), last_sn: SequenceNumber::from(1),
count: 2,
};
assert!(!new_reader.handle_heartbeat_msg(hb_one2, false, mr_state.clone()));
// New count, sequence numbers 1..=3 announced: a response is expected.
let hb_3_1 = Heartbeat {
reader_id: new_reader.get_entity_id(),
writer_id,
first_sn: SequenceNumber::from(1), last_sn: SequenceNumber::from(3), count: 3,
};
assert!(new_reader.handle_heartbeat_msg(hb_3_1, false, mr_state.clone()));
let change = CacheChange::new(
new_reader.get_guid(),
SequenceNumber::from(2),
d.clone(),
);
new_reader.dds_cache.write().unwrap().to_topic_add_change(
&new_reader.topic_name,
&Timestamp::now(),
change.clone(),
);
changes.push(change);
let change = CacheChange::new(
new_reader.get_guid(),
SequenceNumber::from(3),
d,
);
new_reader.dds_cache.write().unwrap().to_topic_add_change(
&new_reader.topic_name,
&Timestamp::now(),
change.clone(),
);
changes.push(change);
// first_sn 4 / last_sn 3 without the final flag: a response is still expected.
let hb_none = Heartbeat {
reader_id: new_reader.get_entity_id(),
writer_id,
first_sn: SequenceNumber::from(4), last_sn: SequenceNumber::from(3), count: 4,
};
assert!(new_reader.handle_heartbeat_msg(hb_none, false, mr_state));
// Three of the five heartbeats above produced a response.
assert_eq!(new_reader.sent_ack_nack_count, 3);
}
// Stores ten changes (sequence numbers 0..=9), then sends a GAP and checks
// which of them are still retrievable from the reader afterwards.
#[test]
fn rtpsreader_handle_gap() {
let new_guid = GUID::dummy_test_guid(EntityKind::READER_NO_KEY_USER_DEFINED);
let (send, _rec) = mio_channel::sync_channel::<()>(100);
let (status_sender, _status_reciever) = mio_extras::channel::sync_channel::<DataReaderStatus>(100);
let (_reader_command_sender, reader_command_receiver) =
mio_channel::sync_channel::<ReaderCommand>(10);
let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
dds_cache.write().unwrap().add_new_topic(
&"test".to_string(),
TopicKind::NoKey,
TypeDesc::new("testi"),
);
let mut reader = Reader::new(
new_guid,
send,
status_sender,
dds_cache,
"test".to_string(),
QosPolicies::qos_none(),
reader_command_receiver,
);
let writer_guid = GUID {
guidPrefix: GuidPrefix::new(&[1; 12]),
entityId: EntityId::createCustomEntityID([1; 3], EntityKind::WRITER_WITH_KEY_USER_DEFINED),
};
let writer_id = writer_guid.entityId;
let mut mr_state = MessageReceiverState::default();
mr_state.source_guid_prefix = writer_guid.guidPrefix;
reader.matched_writer_add(
writer_guid.clone(),
EntityId::ENTITYID_UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
);
let n: i64 = 10;
let mut d = Data::default();
d.writer_id = writer_id;
// Feed sequence numbers 0..n through the reader and keep a copy of each stored change.
let mut changes = Vec::new();
for i in 0..n {
d.writer_sn = SequenceNumber::from(i);
reader.handle_data_msg(d.clone(), BitFlags::<DATA_Flags>::empty(), mr_state.clone());
changes.push(
reader
.get_history_cache_change(d.writer_sn)
.unwrap()
.clone(),
);
}
// GAP starting at 1 with list base 4 and entries 5 and 7: the reader treats
// 1, 2, 3 (gap_start up to the base) plus 5 and 7 as irrelevant.
// NOTE(review): the second argument of SequenceNumberSet::new is presumably
// the size of the set — confirm.
let mut gap_list = SequenceNumberSet::new(SequenceNumber::from(4),7);
gap_list.insert(SequenceNumber::from(5));
gap_list.insert(SequenceNumber::from(7));
let gap = Gap {
reader_id: reader.get_entity_id(),
writer_id,
gap_start: SequenceNumber::from(1),
gap_list,
};
reader.handle_gap_msg(gap, mr_state);
// 0 lies before the gap and must survive.
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(0)),
Some(changes[0].clone())
);
// 1..=3 are covered by gap_start up to the list base and must be gone.
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(1)),
None
);
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(2)),
None
);
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(3)),
None
);
// From the base onwards only the listed numbers (5 and 7) are removed.
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(4)),
Some(changes[4].clone())
);
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(5)),
None
);
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(6)),
Some(changes[6].clone())
);
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(7)),
None
);
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(8)),
Some(changes[8].clone())
);
assert_eq!(
reader.get_history_cache_change(SequenceNumber::from(9)),
Some(changes[9].clone())
);
}
}