use std::{
cmp::max,
collections::{BTreeMap, BTreeSet, HashSet},
iter::FromIterator,
ops::Bound::Included,
rc::Rc,
sync::{Arc, RwLock},
};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use speedy::{Endianness, Writable};
use mio_extras::{
channel::{self as mio_channel, SyncSender, TrySendError},
timer::Timer,
};
use mio::Token;
use super::qos::policy::{History, Reliability};
use crate::{
dds::{
ddsdata::DDSData,
dp_event_loop::{NACK_RESPONSE_DELAY, NACK_SUPPRESSION_DURATION},
qos::HasQoSPolicy,
with_key::datawriter::WriteOptions,
},
messages::submessages::submessages::AckSubmessage,
network::udp_sender::UDPSender,
serialization::{Message, MessageBuilder},
structure::{
cache_change::CacheChange,
dds_cache::DDSCache,
duration::Duration,
entity::RTPSEntity,
guid::{EntityId, GuidPrefix, GUID},
locator::Locator,
sequence_number::{FragmentNumber, SequenceNumber},
time::Timestamp,
},
};
use super::{
qos::{policy, QosPolicies},
rtps_reader_proxy::RtpsReaderProxy,
statusevents::{CountWithChange, DataWriterStatus},
};
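/// Preferred delivery mode when sending a message to readers.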
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum DeliveryMode {
Unicast,
Multicast,
}
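/// Events scheduled on the Writer's internal timer.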
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TimedEvent {
Heartbeat,
CacheCleaning,
SendRepairData { to_reader: GUID },
SendRepairFrags { to_reader: GUID },
}
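/// Everything needed to construct a [`Writer`].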
pub(crate) struct WriterIngredients {
pub guid: GUID,
pub writer_command_receiver: mio_channel::Receiver<WriterCommand>,
pub topic_name: String,
pub qos_policies: QosPolicies,
pub status_sender: SyncSender<DataWriterStatus>,
}
impl WriterIngredients {
pub fn alt_entity_token(&self) -> Token {
self.guid.entity_id.as_alt_token()
}
}
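/// RTPS Writer: stores outgoing samples in the DDSCache, keeps per-reader proxy state,
/// and produces DATA, DATAFRAG, HEARTBEAT, and GAP submessages for matched readers.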
pub(crate) struct Writer {
pub endianness: Endianness,
pub heartbeat_message_counter: i32,
pub push_mode: bool,
pub heartbeat_period: Option<Duration>,
pub cache_cleaning_period: Duration,
pub nack_response_delay: std::time::Duration,
pub nackfrag_response_delay: std::time::Duration,
#[allow(dead_code)]
pub nack_suppression_duration: std::time::Duration,
pub last_change_sequence_number: SequenceNumber,
pub first_change_sequence_number: SequenceNumber,
pub data_max_size_serialized: usize,
my_guid: GUID,
pub(crate) writer_command_receiver: mio_channel::Receiver<WriterCommand>,
  readers: BTreeMap<GUID, RtpsReaderProxy>,
  matched_readers_count_total: i32,
  requested_incompatible_qos_count: i32,
  udp_sender: Rc<UDPSender>,
dds_cache: Arc<RwLock<DDSCache>>,
my_topic_name: String,
sequence_number_to_instant: BTreeMap<SequenceNumber, Timestamp>,
disposed_sequence_numbers: HashSet<SequenceNumber>,
pub(crate) timed_event_timer: Timer<TimedEvent>,
qos_policies: QosPolicies,
status_sender: SyncSender<DataWriterStatus>,
ack_waiter: Option<AckWaiter>,
}
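/// Commands sent by the DataWriter to its RTPS Writer over the command channel.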
pub(crate) enum WriterCommand {
DDSData {
ddsdata: DDSData,
write_options: WriteOptions,
sequence_number: SequenceNumber,
},
WaitForAcknowledgments {
all_acked: mio_channel::SyncSender<()>,
},
}
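/// Bookkeeping for a pending WaitForAcknowledgments command: the reliable readers that
/// still have to acknowledge sequence numbers up to `wait_until`.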
struct AckWaiter {
wait_until: SequenceNumber,
complete_channel: SyncSender<()>,
readers_pending: BTreeSet<GUID>,
}
impl Writer {
pub fn new(
i: WriterIngredients,
dds_cache: Arc<RwLock<DDSCache>>,
udp_sender: Rc<UDPSender>,
mut timed_event_timer: Timer<TimedEvent>,
) -> Self {
let heartbeat_period = i
.qos_policies
.reliability
.and_then(|reliability| {
if matches!(reliability, Reliability::Reliable { .. }) {
Some(Duration::from_secs(1))
} else {
None
}
})
.map(|hbp| {
if let Some(policy::Liveliness::ManualByTopic { lease_duration }) =
i.qos_policies.liveliness
{
          lease_duration / 3
} else {
hbp
}
});
let cache_cleaning_period = Duration::from_secs(2 * 60);
if let Some(period) = heartbeat_period {
timed_event_timer.set_timeout(std::time::Duration::from(period), TimedEvent::Heartbeat);
}
timed_event_timer.set_timeout(
std::time::Duration::from(cache_cleaning_period),
TimedEvent::CacheCleaning,
);
Self {
endianness: Endianness::LittleEndian,
heartbeat_message_counter: 1,
push_mode: true,
heartbeat_period,
cache_cleaning_period,
      nack_response_delay: NACK_RESPONSE_DELAY,
      nackfrag_response_delay: NACK_RESPONSE_DELAY,
      nack_suppression_duration: NACK_SUPPRESSION_DURATION,
      first_change_sequence_number: SequenceNumber::from(1),
      last_change_sequence_number: SequenceNumber::from(0),
      data_max_size_serialized: 1024,
my_guid: i.guid,
writer_command_receiver: i.writer_command_receiver,
readers: BTreeMap::new(),
matched_readers_count_total: 0,
requested_incompatible_qos_count: 0,
udp_sender,
dds_cache,
my_topic_name: i.topic_name,
sequence_number_to_instant: BTreeMap::new(),
disposed_sequence_numbers: HashSet::new(),
timed_event_timer,
qos_policies: i.qos_policies,
status_sender: i.status_sender,
ack_waiter: None,
}
}
pub fn entity_token(&self) -> Token {
self.guid().entity_id.as_token()
}
pub fn is_reliable(&self) -> bool {
self.qos_policies.reliability.is_some()
}
pub fn local_readers(&self) -> Vec<EntityId> {
let min = GUID::new_with_prefix_and_id(self.my_guid.prefix, EntityId::MIN);
let max = GUID::new_with_prefix_and_id(self.my_guid.prefix, EntityId::MAX);
self
.readers
.range((Included(min), Included(max)))
.filter_map(|(guid, _)| {
if guid.prefix == self.my_guid.prefix {
Some(guid.entity_id)
} else {
None
}
})
.collect()
}
pub fn notify_new_data_to_all_readers(&mut self) {
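    // Currently a no-op: readers learn of new data from the DATA/HEARTBEAT messages
    // sent in process_writer_command.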
}
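  /// Dispatch all expired timer events: periodic Heartbeat, cache cleaning, and
  /// per-reader repair (re)sends.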
pub fn handle_timed_event(&mut self) {
while let Some(e) = self.timed_event_timer.poll() {
match e {
TimedEvent::Heartbeat => {
self.handle_heartbeat_tick(false);
if let Some(period) = self.heartbeat_period {
self
.timed_event_timer
.set_timeout(std::time::Duration::from(period), TimedEvent::Heartbeat);
}
}
TimedEvent::CacheCleaning => {
self.handle_cache_cleaning();
self.timed_event_timer.set_timeout(
std::time::Duration::from(self.cache_cleaning_period),
TimedEvent::CacheCleaning,
);
}
TimedEvent::SendRepairData {
to_reader: reader_guid,
} => {
self.handle_repair_data_send(reader_guid);
if let Some(rp) = self.lookup_readerproxy_mut(reader_guid) {
if rp.repair_mode {
let delay_to_next_repair = self
.qos_policies
.deadline()
.map_or_else(|| Duration::from_millis(100), |dl| dl.0)
/ 5;
self.timed_event_timer.set_timeout(
std::time::Duration::from(delay_to_next_repair),
TimedEvent::SendRepairData {
to_reader: reader_guid,
},
);
}
}
}
TimedEvent::SendRepairFrags {
to_reader: reader_guid,
} => {
self.handle_repair_frags_send(reader_guid);
if let Some(rp) = self.lookup_readerproxy_mut(reader_guid) {
if rp.repair_frags_requested() {
self.timed_event_timer.set_timeout(
self.nackfrag_response_delay,
TimedEvent::SendRepairFrags {
to_reader: reader_guid,
},
);
            }
          }
        }
      }
    }
  }
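  /// Remove old, acknowledged changes from the DDSCache according to the History QoS.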
fn handle_cache_cleaning(&mut self) {
let resource_limit = 32;
match self.qos_policies.history {
None => {
self.remove_all_acked_changes_but_keep_depth(1);
}
Some(History::KeepAll) => {
self.remove_all_acked_changes_but_keep_depth(resource_limit);
}
Some(History::KeepLast { depth: d }) => {
self.remove_all_acked_changes_but_keep_depth(d as usize);
}
}
}
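  /// Drain the command channel from the DataWriter: insert new samples into the cache
  /// and publish them as DATA or DATAFRAG plus a HEARTBEAT, or begin waiting for acks.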
pub fn process_writer_command(&mut self) {
while let Ok(cc) = self.writer_command_receiver.try_recv() {
match cc {
WriterCommand::DDSData {
ddsdata,
write_options,
sequence_number,
} => {
let fragmentation_needed = ddsdata.payload_size() > self.data_max_size_serialized;
let timestamp =
self.insert_to_history_cache(ddsdata, write_options.clone(), sequence_number);
self.increase_heartbeat_counter();
if !fragmentation_needed {
let mut message_builder = MessageBuilder::new();
if self.push_mode {
if let Some(cache_change) = self
.dds_cache
.read()
.unwrap()
                .topic_get_change(&self.my_topic_name, &timestamp)
{
if let Some(src_ts) = cache_change.write_options.source_timestamp {
message_builder = message_builder.ts_msg(self.endianness, Some(src_ts));
}
message_builder = message_builder.data_msg(
cache_change,
                  EntityId::UNKNOWN,      // reader entity id: not addressed to a single reader
                  self.my_guid.entity_id, // writer entity id
                  self.endianness,
);
} else {
error!(
"process_writer_command: The dog ate my CacheChange {:?} topic={:?}",
sequence_number,
self.topic_name(),
);
}
            } else {
              // Not in push mode: only the HEARTBEAT below is sent now;
              // readers then request the DATA with ACKNACK and it is delivered
              // by the repair-data mechanism.
            };
            // Append a HEARTBEAT announcing what is available.
            let final_flag = false;
            let liveliness_flag = false;
            let data_hb_message = message_builder
.heartbeat_msg(self, EntityId::UNKNOWN, final_flag, liveliness_flag)
.add_header_and_build(self.my_guid.prefix);
self.send_message_to_readers(
DeliveryMode::Multicast,
&data_hb_message,
&mut self.readers.values(),
);
} else {
if let Some(cache_change) = self
.dds_cache
.read()
.unwrap()
              .topic_get_change(&self.my_topic_name, &timestamp)
{
              let fragment_size: u32 = self.data_max_size_serialized as u32;
              let data_size: u32 = cache_change.data_value.payload_size() as u32;
              // Integer division rounded up: the last, possibly partial, fragment counts too.
              let num_frags = (data_size / fragment_size) + u32::from(data_size % fragment_size != 0);
              if self.push_mode {
for frag_num in FragmentNumber::range_inclusive(
FragmentNumber::new(1),
FragmentNumber::new(num_frags),
) {
let mut message_builder = MessageBuilder::new();
if let Some(src_ts) = cache_change.write_options.source_timestamp {
message_builder = message_builder.ts_msg(self.endianness, Some(src_ts));
}
message_builder = message_builder.data_frag_msg(
cache_change,
                    EntityId::UNKNOWN,      // reader entity id: not addressed to a single reader
                    self.my_guid.entity_id, // writer entity id
                    frag_num,
                    fragment_size as u16,
                    data_size,
self.endianness,
);
self.send_message_to_readers(
DeliveryMode::Multicast,
&message_builder.add_header_and_build(self.my_guid.prefix),
&mut self.readers.values(),
);
                }
              }
              // After the fragments, send a separate HEARTBEAT message.
              let final_flag = false;
              let liveliness_flag = false;
              let hb_message = MessageBuilder::new()
.heartbeat_msg(self, EntityId::UNKNOWN, final_flag, liveliness_flag)
.add_header_and_build(self.my_guid.prefix);
self.send_message_to_readers(
DeliveryMode::Multicast,
&hb_message,
&mut self.readers.values(),
);
} else {
error!(
"process_writer_command (frag): The dog ate my CacheChange {:?} topic={:?}",
sequence_number,
self.topic_name(),
);
}
          }
        }
WriterCommand::WaitForAcknowledgments { all_acked } => {
let wait_until = self.last_change_sequence_number;
let readers_pending: BTreeSet<_> = self
.readers
.iter()
          .filter_map(|(guid, rp)| {
            // Wait only on reliable readers that have not yet acknowledged
            // everything up to wait_until.
            if rp.qos().reliability().is_some() && rp.all_acked_before <= wait_until {
              Some(*guid)
            } else {
              None
            }
          })
.collect();
if readers_pending.is_empty() {
            // Nothing to wait for: signal completion immediately.
            let _ = all_acked.try_send(());
            self.ack_waiter = None;
} else {
self.ack_waiter = Some(AckWaiter {
wait_until,
complete_channel: all_acked,
readers_pending,
});
}
}
}
}
}
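  /// Insert a new sample into the DDSCache, update sequence number bookkeeping, and
  /// notify reader proxies of the new change. Returns the cache timestamp used as the key.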
fn insert_to_history_cache(
&mut self,
data: DDSData,
write_options: WriteOptions,
sequence_number: SequenceNumber,
) -> Timestamp {
let new_sequence_number = sequence_number;
self.last_change_sequence_number = new_sequence_number;
self.first_change_sequence_number = match self.qos().history {
None => self.last_change_sequence_number,
      Some(History::KeepAll) => max(self.first_change_sequence_number, SequenceNumber::from(1)),
Some(History::KeepLast { depth }) => max(
self.last_change_sequence_number - SequenceNumber::from(i64::from(depth - 1)),
SequenceNumber::from(1),
),
};
assert!(self.first_change_sequence_number > SequenceNumber::zero());
assert!(self.last_change_sequence_number > SequenceNumber::zero());
let new_cache_change = CacheChange::new(self.guid(), new_sequence_number, write_options, data);
let timestamp = Timestamp::now();
self
.dds_cache
.write()
.unwrap()
      .add_change(&self.my_topic_name, &timestamp, new_cache_change);
self
.sequence_number_to_instant
.insert(new_sequence_number, timestamp);
for reader in &mut self.readers.values_mut() {
reader.notify_new_cache_change(new_sequence_number);
}
timestamp
}
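  /// Periodic (or manually asserted) Heartbeat: announce available sequence numbers to
  /// readers, unless every reader has already acknowledged everything.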
pub fn handle_heartbeat_tick(&mut self, is_manual_assertion: bool) {
let final_flag = false;
let liveliness_flag = is_manual_assertion;
trace!(
"heartbeat tick in topic {:?} have {} readers",
self.topic_name(),
self.readers.len()
);
self.increase_heartbeat_counter();
if self
.readers
.values()
.all(|rp| self.last_change_sequence_number < rp.all_acked_before)
{
trace!("heartbeat tick: all readers have all available data.");
} else {
let hb_message = MessageBuilder::new()
.ts_msg(self.endianness, Some(Timestamp::now()))
.heartbeat_msg(self, EntityId::UNKNOWN, final_flag, liveliness_flag)
.add_header_and_build(self.my_guid.prefix);
debug!(
"Writer {:?} topic={:} HEARTBEAT {:?}",
self.guid().entity_id,
self.topic_name(),
hb_message
);
self.send_message_to_readers(
DeliveryMode::Multicast,
&hb_message,
&mut self.readers.values(),
);
}
}
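  /// Handle an incoming ACKNACK or NACKFRAG submessage from a reader and schedule repair
  /// sends for any data or fragments it is missing.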
pub fn handle_ack_nack(
&mut self,
reader_guid_prefix: GuidPrefix,
ack_submessage: &AckSubmessage,
) {
if !self.is_reliable() {
warn!(
"Writer {:x?} is best effort! It should not handle acknack messages!",
self.entity_id()
);
return;
}
match ack_submessage {
AckSubmessage::AckNack(ref an) => {
let last_seq = self.last_change_sequence_number;
match an.reader_sn_state.iter().next().map(i64::from) {
Some(0) => warn!("Request for SN zero! : {:?}", an),
          Some(_) | None => (),
        }
        let my_topic = self.my_topic_name.clone();
        let reader_guid = GUID::new(reader_guid_prefix, an.reader_id);
self.update_ack_waiters(reader_guid, Some(an.reader_sn_state.base()));
if let Some(reader_proxy) = self.lookup_readerproxy_mut(reader_guid) {
reader_proxy.handle_ack_nack(ack_submessage, last_seq);
          let reader_guid = reader_proxy.remote_reader_guid;
          if let Some(&high) = reader_proxy.unsent_changes.iter().next_back() {
if high > last_seq {
warn!(
"ReaderProxy {:?} thinks we need to send {:?} but I have only up to {:?}",
reader_proxy.remote_reader_guid, reader_proxy.unsent_changes, last_seq
);
}
}
if an.reader_sn_state.base() > last_seq + SequenceNumber::from(1i64) {
warn!(
"ACKNACK from {:?} acks {:?}, but I have only up to {:?} count={:?} topic={:?}",
reader_proxy.remote_reader_guid, an.reader_sn_state, last_seq, an.count, my_topic
);
}
if let Some(max_req_sn) = an.reader_sn_state.iter().next_back() {
if max_req_sn > last_seq {
warn!(
"ACKNACK from {:?} requests {:?} but I have only up to {:?}",
reader_proxy.remote_reader_guid,
an.reader_sn_state.iter().collect::<Vec<SequenceNumber>>(),
last_seq
);
}
}
if reader_proxy.all_acked_before > last_seq {
reader_proxy.repair_mode = false;
} else {
            reader_proxy.repair_mode = true;
            self.timed_event_timer.set_timeout(
self.nack_response_delay,
TimedEvent::SendRepairData {
to_reader: reader_guid,
},
);
}
}
}
AckSubmessage::NackFrag(ref nackfrag) => {
let reader_guid = GUID::new(reader_guid_prefix, nackfrag.reader_id);
if let Some(reader_proxy) = self.lookup_readerproxy_mut(reader_guid) {
reader_proxy.mark_frags_requested(nackfrag.writer_sn, &nackfrag.fragment_number_state);
}
self.timed_event_timer.set_timeout(
self.nackfrag_response_delay,
TimedEvent::SendRepairFrags {
to_reader: reader_guid,
},
);
}
}
}
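  /// Update a pending acknowledgment wait. `acked_before` is the reader's new ack level;
  /// `None` means the reader is gone and should no longer be waited on.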
fn update_ack_waiters(&mut self, guid: GUID, acked_before: Option<SequenceNumber>) {
let mut completed = false;
match &mut self.ack_waiter {
Some(aw) => match acked_before {
None => {
aw.readers_pending.remove(&guid);
}
Some(acked_before) => {
if aw.wait_until < acked_before {
aw.readers_pending.remove(&guid);
}
if aw.readers_pending.is_empty() {
let _ = aw.complete_channel.try_send(());
completed = true;
}
}
},
None => (),
}
if completed {
self.ack_waiter = None;
}
}
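  /// Take the reader proxy out of the map, send it one round of repair data, and put it back.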
fn handle_repair_data_send(&mut self, to_reader: GUID) {
if let Some(mut reader_proxy) = self.readers.remove(&to_reader) {
self.handle_repair_data_send_worker(&mut reader_proxy);
if let Some(rp) = self
.readers
.insert(reader_proxy.remote_reader_guid, reader_proxy)
{
error!("Reader proxy was duplicated somehow??? {:?}", rp);
}
}
}
fn handle_repair_frags_send(&mut self, to_reader: GUID) {
if let Some(mut reader_proxy) = self.readers.remove(&to_reader) {
self.handle_repair_frags_send_worker(&mut reader_proxy);
if let Some(rp) = self
.readers
.insert(reader_proxy.remote_reader_guid, reader_proxy)
{
error!("Reader proxy was duplicated somehow??? (frags) {:?}", rp);
}
}
}
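  /// Send the next unsent change to this reader as DATA, or a GAP if the change is no
  /// longer available (too old, disposed, or missing from the cache).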
fn handle_repair_data_send_worker(&mut self, reader_proxy: &mut RtpsReaderProxy) {
let reader_guid = reader_proxy.remote_reader_guid;
let mut partial_message = MessageBuilder::new()
.dst_submessage(self.endianness, reader_guid.prefix)
.ts_msg(self.endianness, Some(Timestamp::now()));
debug!(
"Repair data send due to ACKNACK. ReaderProxy Unsent changes: {:?}",
reader_proxy.unsent_changes
);
let mut no_longer_relevant = Vec::new();
let mut found_data = false;
if let Some(&unsent_sn) = reader_proxy.unsent_changes.iter().next() {
if let Some(timestamp) = self.sequence_number_to_instant(unsent_sn) {
if let Some(cache_change) = self
.dds_cache
.read()
.unwrap()
        .topic_get_change(&self.my_topic_name, &timestamp)
{
partial_message = partial_message.data_msg(
cache_change,
          reader_guid.entity_id,  // reader entity id: addressed to the requesting reader
          self.my_guid.entity_id, // writer entity id
          self.endianness,
);
} else {
no_longer_relevant.push(unsent_sn);
}
} else {
if unsent_sn < self.first_change_sequence_number {
debug!(
"Reader {:?} requested too old data {:?}. I have only from {:?}. Topic {:?}",
&reader_proxy, unsent_sn, self.first_change_sequence_number, &self.my_topic_name
);
} else if self.disposed_sequence_numbers.contains(&unsent_sn) {
debug!(
"Reader {:?} requested disposed {:?}. Topic {:?}",
&reader_proxy, unsent_sn, &self.my_topic_name
);
} else {
error!(
"handle ack_nack writer {:?} seq.number {:?} missing from instant map",
self.my_guid, unsent_sn
);
}
no_longer_relevant.push(unsent_sn);
}
reader_proxy.unsent_changes.remove(&unsent_sn);
found_data = true;
}
if !no_longer_relevant.is_empty() {
partial_message =
partial_message.gap_msg(&BTreeSet::from_iter(no_longer_relevant), self, reader_guid);
}
let data_gap_msg = partial_message.add_header_and_build(self.my_guid.prefix);
self.send_message_to_readers(
DeliveryMode::Unicast,
&data_gap_msg,
&mut std::iter::once(&*reader_proxy),
);
    if !found_data {
      // No unsent changes remained for this reader: exit repair mode.
      reader_proxy.repair_mode = false;
    }
}
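  /// Resend a bounded batch of requested fragments (DATAFRAG) to this reader.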
fn handle_repair_frags_send_worker(
&mut self,
reader_proxy: &mut RtpsReaderProxy,
) {
let max_send_count = 8;
for (seq_num, frag_num) in reader_proxy.frags_requested_iterator().take(max_send_count) {
if let Some(timestamp) = self.sequence_number_to_instant(seq_num) {
if let Some(cache_change) = self
.dds_cache
.read()
.unwrap()
          .topic_get_change(&self.my_topic_name, &timestamp)
{
let mut message_builder = MessageBuilder::new();
if let Some(src_ts) = cache_change.write_options.source_timestamp {
message_builder = message_builder.ts_msg(self.endianness, Some(src_ts));
}
          let fragment_size: u32 = self.data_max_size_serialized as u32;
          let data_size: u32 = cache_change.data_value.payload_size() as u32;
message_builder = message_builder.data_frag_msg(
cache_change,
            reader_proxy.remote_reader_guid.entity_id, // reader entity id
            self.my_guid.entity_id,                    // writer entity id
            frag_num,
            fragment_size as u16,
            data_size,
self.endianness,
);
self.send_message_to_readers(
DeliveryMode::Unicast,
&message_builder.add_header_and_build(self.my_guid.prefix),
            // Send only to the reader that requested the fragments; it has been
            // temporarily removed from self.readers by the caller.
            &mut std::iter::once(&*reader_proxy),
);
} else {
error!(
"handle_repair_frags_send_worker: {:?} missing from DDSCache. topic={:?}",
seq_num, self.my_topic_name
);
}
} else {
error!(
"handle_repair_frags_send_worker: {:?} missing from instant map. topic={:?}",
seq_num, self.my_topic_name
);
}
reader_proxy.mark_frag_sent(seq_num, &frag_num);
    }
  }
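  /// Remove changes acknowledged by all readers from the cache, keeping the last `depth`
  /// of them.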
fn remove_all_acked_changes_but_keep_depth(&mut self, depth: usize) {
let acked_by_all_readers = self
.readers
.values()
.map(RtpsReaderProxy::acked_up_to_before)
.min()
.unwrap_or_else(SequenceNumber::zero);
let first_keeper = max(
acked_by_all_readers - SequenceNumber::from(depth),
self.first_change_sequence_number,
);
if let Some(&keep_instant) = self.sequence_number_to_instant.get(&first_keeper) {
self
.dds_cache
.write()
.unwrap()
.topic_remove_before(&self.my_topic_name, keep_instant);
} else {
warn!("{:?} missing from instant map", first_keeper);
}
self.first_change_sequence_number = first_keeper;
self.sequence_number_to_instant = self.sequence_number_to_instant.split_off(&first_keeper);
}
fn increase_heartbeat_counter(&mut self) {
self.heartbeat_message_counter += 1;
}
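  /// Serialize the message once and send it to the given readers' locators, preferring the
  /// requested delivery mode and skipping locators that were already sent to.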
fn send_message_to_readers(
&self,
preferred_mode: DeliveryMode,
message: &Message,
readers: &mut dyn Iterator<Item = &RtpsReaderProxy>,
) {
let buffer = message.write_to_vec_with_ctx(self.endianness).unwrap();
let mut already_sent_to = BTreeSet::new();
macro_rules! send_unless_sent_and_mark {
($locs:expr) => {
for loc in $locs.iter() {
if already_sent_to.contains(loc) {
trace!("Already sent to {:?}", loc);
} else {
self.udp_sender.send_to_locator(&buffer, loc);
already_sent_to.insert(loc.clone());
}
}
};
}
for reader in readers {
match (
preferred_mode,
reader
.unicast_locator_list
.iter()
.find(|l| Locator::is_udp(l)),
reader
.multicast_locator_list
.iter()
.find(|l| Locator::is_udp(l)),
) {
(DeliveryMode::Multicast, _, Some(_mc_locator)) => {
send_unless_sent_and_mark!(reader.multicast_locator_list);
}
(DeliveryMode::Unicast, Some(_uc_locator), _) => {
send_unless_sent_and_mark!(reader.unicast_locator_list)
}
(_delivery_mode, _, Some(_mc_locator)) => {
send_unless_sent_and_mark!(reader.multicast_locator_list);
}
(_delivery_mode, Some(_uc_locator), _) => {
send_unless_sent_and_mark!(reader.unicast_locator_list)
}
(_delivery_mode, None, None) => {
warn!("send_message_to_readers: No locators for {:?}", reader);
}
      }
    }
}
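  /// Push a status event to the DataWriter's status channel; dropped silently if the
  /// channel is full.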
fn send_status(&self, status: DataWriterStatus) {
self
.status_sender
.try_send(status)
.unwrap_or_else(|e| match e {
        TrySendError::Full(_) => (), // Channel full: drop the status update rather than block.
        TrySendError::Disconnected(_) => {
debug!("send_status - status receiver is disconnected");
}
TrySendError::Io(e) => {
warn!("send_status - io error {:?}", e);
}
});
}
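  /// Check QoS compatibility of a discovered reader: either add/update its proxy and
  /// report PublicationMatched, or report OfferedIncompatibleQos.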
pub fn update_reader_proxy(
&mut self,
reader_proxy: &RtpsReaderProxy,
requested_qos: &QosPolicies,
) {
debug!("update_reader_proxy topic={:?}", self.my_topic_name);
match self.qos_policies.compliance_failure_wrt(requested_qos) {
None => {
let change = self.matched_reader_update(reader_proxy.clone());
if change > 0 {
self.matched_readers_count_total += change;
self.send_status(DataWriterStatus::PublicationMatched {
total: CountWithChange::new(self.matched_readers_count_total, change),
current: CountWithChange::new(self.readers.len() as i32, change),
});
if let Some(Reliability::Reliable { .. }) = self.qos_policies.reliability {
self.notify_new_data_to_all_readers();
}
info!(
"Matched new remote reader on topic={:?} reader={:?}",
self.topic_name(),
&reader_proxy.remote_reader_guid
);
debug!("Reader details: {:?}", &reader_proxy);
}
}
Some(bad_policy_id) => {
warn!(
"update_reader_proxy - QoS mismatch {:?} topic={:?}",
bad_policy_id,
self.topic_name()
);
self.requested_incompatible_qos_count += 1;
self.send_status(DataWriterStatus::OfferedIncompatibleQos {
count: CountWithChange::new(self.requested_incompatible_qos_count, 1),
last_policy_id: bad_policy_id,
          policies: Vec::new(),
        });
}
    }
  }
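  /// Insert or update a reader proxy, preserving the runtime state of an existing proxy.
  /// Returns 1 if a new reader was added, 0 if an existing one was updated.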
fn matched_reader_update(&mut self, reader_proxy: RtpsReaderProxy) -> i32 {
let (to_insert, count_change) = match self.readers.remove(&reader_proxy.remote_reader_guid) {
None => (reader_proxy, 1),
Some(existing_reader) => (
RtpsReaderProxy {
is_active: existing_reader.is_active,
all_acked_before: existing_reader.all_acked_before,
unsent_changes: existing_reader.unsent_changes,
repair_mode: existing_reader.repair_mode,
..reader_proxy
},
0,
),
};
self.readers.insert(to_insert.remote_reader_guid, to_insert);
count_change
}
fn matched_reader_remove(&mut self, guid: GUID) -> Option<RtpsReaderProxy> {
let removed = self.readers.remove(&guid);
if let Some(ref removed_reader) = removed {
info!(
"Removed reader proxy. topic={:?} reader={:?}",
self.topic_name(),
removed_reader.remote_reader_guid,
);
debug!("Removed reader proxy details: {:?}", removed_reader);
}
removed
}
pub fn reader_lost(&mut self, guid: GUID) {
if self.readers.contains_key(&guid) {
info!(
"reader_lost topic={:?} reader={:?}",
self.topic_name(),
&guid
);
self.matched_reader_remove(guid);
self.send_status(DataWriterStatus::PublicationMatched {
total: CountWithChange::new(self.matched_readers_count_total, 0),
current: CountWithChange::new(self.readers.len() as i32, -1),
});
}
self.update_ack_waiters(guid, None);
}
pub fn participant_lost(&mut self, guid_prefix: GuidPrefix) {
    let lost_readers: Vec<GUID> = self
      .readers
      .range(guid_prefix.range())
      .map(|(g, _)| *g)
      .collect();
    for reader in lost_readers {
      self.reader_lost(reader);
    }
}
}
fn lookup_readerproxy_mut(&mut self, guid: GUID) -> Option<&mut RtpsReaderProxy> {
self.readers.get_mut(&guid)
}
pub fn sequence_number_to_instant(&self, seqnumber: SequenceNumber) -> Option<Timestamp> {
self.sequence_number_to_instant.get(&seqnumber).copied()
}
pub fn topic_name(&self) -> &String {
&self.my_topic_name
}
}
impl RTPSEntity for Writer {
fn guid(&self) -> GUID {
self.my_guid
}
}
impl HasQoSPolicy for Writer {
fn qos(&self) -> QosPolicies {
self.qos_policies.clone()
}
}
#[cfg(test)]
mod tests {
use std::thread;
use byteorder::LittleEndian;
use log::info;
use crate::{
dds::{
participant::DomainParticipant, qos::QosPolicies, topic::TopicKind,
with_key::datawriter::DataWriter,
},
serialization::cdr_serializer::CDRSerializerAdapter,
test::random_data::*,
};
#[test]
fn test_writer_receives_datawriter_cache_change_notifications() {
let domain_participant = DomainParticipant::new(0).expect("Failed to create participant");
let qos = QosPolicies::qos_none();
let _default_dw_qos = QosPolicies::qos_none();
let publisher = domain_participant
.create_publisher(&qos)
.expect("Failed to create publisher");
let topic = domain_participant
.create_topic(
"Aasii".to_string(),
"Huh?".to_string(),
&qos,
TopicKind::WithKey,
)
.expect("Failed to create topic");
let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
publisher
.create_datawriter(&topic, None)
.expect("Failed to create datawriter");
let data = RandomData {
a: 4,
b: "Fobar".to_string(),
};
let data2 = RandomData {
a: 2,
b: "Fobar".to_string(),
};
let data3 = RandomData {
a: 3,
b: "Fobar".to_string(),
};
let write_result = data_writer.write(data, None);
info!("writerResult: {:?}", write_result);
    let write_result = data_writer.write(data2, None);
    info!("writerResult: {:?}", write_result);
    write_result.expect("Unable to write data");
let write_result = data_writer.write(data3, None);
thread::sleep(std::time::Duration::from_millis(100));
info!("writerResult: {:?}", write_result);
}
}