use std::{
cmp::max,
collections::{BTreeMap, BTreeSet},
ops::Bound::Included,
rc::Rc,
sync::{atomic, Arc, Mutex},
};
use core::task::Waker;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use speedy::{Endianness, Writable};
use mio_extras::{
channel::{self as mio_channel, TrySendError},
timer::Timer,
};
use mio_06::Token;
use crate::{
dds::{
ddsdata::DDSData,
qos::{
policy,
policy::{History, Reliability},
HasQoSPolicy, QosPolicies,
},
statusevents::{
CountWithChange, DataWriterStatus, DomainParticipantStatusEvent, StatusChannelSender,
},
with_key::datawriter::WriteOptions,
},
messages::submessages::submessages::AckSubmessage,
network::udp_sender::UDPSender,
rtps::{
constant::{NACK_RESPONSE_DELAY, NACK_SUPPRESSION_DURATION},
rtps_reader_proxy::RtpsReaderProxy,
Message, MessageBuilder,
},
structure::{
cache_change::CacheChange,
duration::Duration,
entity::RTPSEntity,
guid::{EntityId, GuidPrefix, GUID},
locator::Locator,
sequence_number::{FragmentNumber, FragmentNumberRange, SequenceNumber},
time::Timestamp,
},
};
#[cfg(feature = "security")]
use crate::{
rtps::Submessage,
security::{security_plugins::SecurityPluginsHandle, SecurityResult},
};
#[cfg(not(feature = "security"))]
use crate::no_security::SecurityPluginsHandle;
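/// Preferred way of delivering a message to its recipients. `send_message_to_readers`
/// falls back to the other kind of locator if a reader has none of the preferred kind.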
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum DeliveryMode {
Unicast,
Multicast,
}
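/// Events scheduled on the Writer's internal timer: periodic HEARTBEATs and cache
/// cleaning, plus delayed sends of repair DATA / DATAFRAG messages in response to
/// ACKNACK / NACKFRAG.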
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TimedEvent {
Heartbeat,
CacheCleaning,
SendRepairData { to_reader: GUID },
SendRepairFrags { to_reader: GUID },
}
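/// The pieces needed to construct a new [`Writer`]. These are put together on the
/// DataWriter/Publisher side and handed over to the thread driving the RTPS protocol,
/// which builds and owns the actual Writer.
///
/// `like_stateless` asks for RTPS stateless-writer behavior, which (as checked in
/// `Writer::new`) is supported only together with BestEffort reliability.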
pub(crate) struct WriterIngredients {
pub guid: GUID,
pub writer_command_receiver: mio_channel::Receiver<WriterCommand>,
pub writer_command_receiver_waker: Arc<Mutex<Option<Waker>>>,
pub topic_name: String,
  pub(crate) like_stateless: bool,
  pub qos_policies: QosPolicies,
pub status_sender: StatusChannelSender<DataWriterStatus>,
pub(crate) security_plugins: Option<SecurityPluginsHandle>,
}
impl WriterIngredients {
pub fn alt_entity_token(&self) -> Token {
self.guid.entity_id.as_alt_token()
}
}
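/// Bookkeeping for a pending `wait_for_acknowledgments()` call: we are waiting for
/// every reliable matched reader in `readers_pending` to acknowledge all samples up
/// to and including `wait_until`. When the set drains empty, a unit value is sent
/// over `complete_channel` to wake the waiting DataWriter.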
struct AckWaiter {
wait_until: SequenceNumber,
complete_channel: StatusChannelSender<()>,
readers_pending: BTreeSet<GUID>,
}
impl AckWaiter {
pub fn notify_wait_complete(&self) {
let _ = self.complete_channel.try_send(());
}
pub fn reader_acked_or_lost(&mut self, guid: GUID, acked_before: Option<SequenceNumber>) -> bool {
match acked_before {
None => {
self.readers_pending.remove(&guid);
}
Some(acked_before) if self.wait_until < acked_before => {
self.readers_pending.remove(&guid);
}
Some(_) => (),
}
self.readers_pending.is_empty()
}
}
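/// The Writer's local history cache. CacheChanges are stored keyed by their insertion
/// Timestamp, with a secondary index from SequenceNumber to Timestamp so that changes
/// can be looked up by either key. `first_seq` / `last_seq` track the range of
/// SequenceNumbers currently held (or most recently held, if the buffer is empty).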
struct HistoryBuffer {
  first_seq: SequenceNumber,
  last_seq: SequenceNumber,
sequence_number_to_instant: BTreeMap<SequenceNumber, Timestamp>,
history_buffer: BTreeMap<Timestamp, CacheChange>,
topic_name: String,
}
impl HistoryBuffer {
fn new(topic_name: String) -> Self {
HistoryBuffer {
first_seq: SequenceNumber::new(1),
      last_seq: SequenceNumber::new(0),
      sequence_number_to_instant: BTreeMap::new(),
history_buffer: BTreeMap::new(),
topic_name,
}
}
fn last_change_sequence_number(&self) -> SequenceNumber {
self.last_seq
}
fn first_change_sequence_number(&self) -> SequenceNumber {
self.first_seq
}
fn get_change(&self, ts: Timestamp) -> Option<&CacheChange> {
self.history_buffer.get(&ts)
}
fn get_by_sn(&self, sn: SequenceNumber) -> Option<&CacheChange> {
self
.sequence_number_to_instant
.get(&sn)
.and_then(|ts| self.get_change(*ts))
}
fn add_change(&mut self, timestamp: Timestamp, new_cache_change: CacheChange) {
let new_seq = new_cache_change.sequence_number;
let had_already_same = self.history_buffer.insert(timestamp, new_cache_change);
if had_already_same.is_some() {
error!(
"HistoryBuffer: Tried to insert CacheChange with duplicate key. Discarding old sample."
);
}
self.sequence_number_to_instant.insert(new_seq, timestamp);
if new_seq > self.last_change_sequence_number() {
self.last_seq = new_seq;
} else {
error!("HistoryBuffer: Tried to add changes out of SequenceNumber order.");
}
}
fn remove_changes_before(&mut self, remove_before_seq: SequenceNumber) {
if let Some(remove_before) = self.sequence_number_to_instant.get(&remove_before_seq) {
let count_before = self.history_buffer.len();
self.history_buffer = self.history_buffer.split_off(remove_before);
self.sequence_number_to_instant = self
.sequence_number_to_instant
.split_off(&remove_before_seq);
let count_after = self.history_buffer.len();
debug!(
"HistoryBuffer: remove_changes_before. count before={} after={}, topic={}",
count_before, count_after, self.topic_name
);
if remove_before_seq >= self.first_seq {
        self.first_seq = remove_before_seq;
      } else {
error!(
"HistoryBuffer: Trying to remove before the first SequenceNumber. But how did we find \
it in the sequence_number_to_instant map? Looks like a bug."
);
}
    } else if remove_before_seq != SequenceNumber::new(1) {
      warn!(
        "HistoryBuffer: remove_changes_before. Cannot find {:?} first={:?} last={:?} topic={}",
        remove_before_seq, self.first_seq, self.last_seq, self.topic_name
      );
    }
}
}
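/// The RTPS Writer: protocol-side counterpart of a DataWriter.
///
/// The Writer receives `WriterCommand`s over a channel, stores samples in its
/// `HistoryBuffer`, pushes DATA / DATAFRAG submessages to matched readers, sends
/// periodic HEARTBEATs, and answers ACKNACK / NACKFRAG with repair data or GAPs.
/// Per-reader reliability state is kept in the `RtpsReaderProxy` entries of `readers`.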
pub(crate) struct Writer {
pub endianness: Endianness,
pub heartbeat_message_counter: atomic::AtomicI32,
pub push_mode: bool,
pub heartbeat_period: Option<Duration>,
pub cache_cleaning_period: Duration,
pub nack_response_delay: std::time::Duration,
pub nackfrag_response_delay: std::time::Duration,
pub repairfrags_continue_delay: std::time::Duration,
#[allow(dead_code)]
pub nack_suppression_duration: std::time::Duration,
pub data_max_size_serialized: usize,
my_guid: GUID,
pub(crate) writer_command_receiver: mio_channel::Receiver<WriterCommand>,
writer_command_receiver_waker: Arc<Mutex<Option<Waker>>>,
readers: BTreeMap<GUID, RtpsReaderProxy>,
  matched_readers_count_total: i32,
  requested_incompatible_qos_count: i32,
udp_sender: Rc<UDPSender>,
like_stateless: bool,
my_topic_name: String,
history_buffer: HistoryBuffer,
pub(crate) timed_event_timer: Timer<TimedEvent>,
qos_policies: QosPolicies,
status_sender: StatusChannelSender<DataWriterStatus>,
ack_waiter: Option<AckWaiter>,
participant_status_sender: StatusChannelSender<DomainParticipantStatusEvent>,
security_plugins: Option<SecurityPluginsHandle>,
}
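/// Commands sent from a DataWriter to its Writer over the writer command channel:
/// either a new sample to publish, or a request to be notified once all reliable
/// readers have acknowledged everything written so far.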
pub enum WriterCommand {
DDSData {
ddsdata: DDSData,
write_options: WriteOptions,
sequence_number: SequenceNumber,
},
WaitForAcknowledgments {
all_acked: StatusChannelSender<()>,
},
}
impl Writer {
pub fn new(
i: WriterIngredients,
udp_sender: Rc<UDPSender>,
mut timed_event_timer: Timer<TimedEvent>,
participant_status_sender: StatusChannelSender<DomainParticipantStatusEvent>,
) -> Self {
if i.like_stateless && i.qos_policies.is_reliable() {
      panic!("Attempted to create a stateless-like Writer with a Reliability QoS other than BestEffort");
}
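    // Heartbeats are needed only for Reliable writers; default to one per second.
    // If liveliness is ManualByTopic, tie the heartbeat period to the liveliness
    // lease instead (a third of lease_duration), so heartbeats occur well within
    // each lease period.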
let heartbeat_period = i
.qos_policies
.reliability
.and_then(|reliability| {
if matches!(reliability, Reliability::Reliable { .. }) {
Some(Duration::from_secs(1))
} else {
None
}
})
.map(|hbp| {
if let Some(policy::Liveliness::ManualByTopic { lease_duration }) =
i.qos_policies.liveliness
{
let std_dur = lease_duration;
std_dur / 3
} else {
hbp
}
});
let cache_cleaning_period = Duration::from_secs(6);
if let Some(period) = heartbeat_period {
timed_event_timer.set_timeout(std::time::Duration::from(period), TimedEvent::Heartbeat);
}
timed_event_timer.set_timeout(
std::time::Duration::from(cache_cleaning_period),
TimedEvent::CacheCleaning,
);
Self {
endianness: Endianness::LittleEndian,
heartbeat_message_counter: atomic::AtomicI32::new(1),
push_mode: true,
heartbeat_period,
cache_cleaning_period,
      nack_response_delay: NACK_RESPONSE_DELAY,
      nackfrag_response_delay: NACK_RESPONSE_DELAY,
      repairfrags_continue_delay: std::time::Duration::from_millis(1),
nack_suppression_duration: NACK_SUPPRESSION_DURATION,
data_max_size_serialized: 1024,
my_guid: i.guid,
writer_command_receiver: i.writer_command_receiver,
writer_command_receiver_waker: i.writer_command_receiver_waker,
readers: BTreeMap::new(),
matched_readers_count_total: 0,
requested_incompatible_qos_count: 0,
udp_sender,
my_topic_name: i.topic_name.clone(),
history_buffer: HistoryBuffer::new(i.topic_name),
timed_event_timer,
like_stateless: i.like_stateless,
qos_policies: i.qos_policies,
status_sender: i.status_sender,
participant_status_sender,
ack_waiter: None,
security_plugins: i.security_plugins,
}
}
pub fn entity_token(&self) -> Token {
self.guid().entity_id.as_token()
}
pub fn is_reliable(&self) -> bool {
self.qos_policies.is_reliable()
}
pub fn local_readers(&self) -> Vec<EntityId> {
let min = GUID::new_with_prefix_and_id(self.my_guid.prefix, EntityId::MIN);
let max = GUID::new_with_prefix_and_id(self.my_guid.prefix, EntityId::MAX);
self
.readers
.range((Included(min), Included(max)))
.filter_map(|(guid, _)| {
if guid.prefix == self.my_guid.prefix {
Some(guid.entity_id)
} else {
None
}
})
.collect()
}
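  /// Handles timer expiry: drains all expired `TimedEvent`s and dispatches them.
  /// The periodic events (Heartbeat, CacheCleaning) re-arm themselves here; the
  /// repair events re-arm only while the reader proxy still has something to repair.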
pub fn handle_timed_event(&mut self) {
while let Some(e) = self.timed_event_timer.poll() {
match e {
TimedEvent::Heartbeat => {
self.handle_heartbeat_tick(false);
if let Some(period) = self.heartbeat_period {
self
.timed_event_timer
.set_timeout(std::time::Duration::from(period), TimedEvent::Heartbeat);
}
}
TimedEvent::CacheCleaning => {
self.handle_cache_cleaning();
self.timed_event_timer.set_timeout(
std::time::Duration::from(self.cache_cleaning_period),
TimedEvent::CacheCleaning,
);
}
TimedEvent::SendRepairData {
to_reader: reader_guid,
} => {
self.handle_repair_data_send(reader_guid);
if let Some(rp) = self.lookup_reader_proxy_mut(reader_guid) {
if rp.repair_mode {
let delay_to_next_repair = self
.qos_policies
.deadline()
.map_or_else(|| Duration::from_millis(1), |dl| dl.0)
/ 5;
self.timed_event_timer.set_timeout(
std::time::Duration::from(delay_to_next_repair),
TimedEvent::SendRepairData {
to_reader: reader_guid,
},
);
}
}
}
TimedEvent::SendRepairFrags {
to_reader: reader_guid,
} => {
self.handle_repair_frags_send(reader_guid);
if let Some(rp) = self.lookup_reader_proxy_mut(reader_guid) {
if rp.repair_frags_requested() {
self.timed_event_timer.set_timeout(
self.repairfrags_continue_delay,
TimedEvent::SendRepairFrags {
to_reader: reader_guid,
},
);
            }
          }
        }
      }
    }
  }
fn handle_cache_cleaning(&mut self) {
let resource_limit = 32;
match self.qos_policies.history {
None => {
self.remove_all_acked_changes_but_keep_depth(Some(1), resource_limit);
}
Some(History::KeepAll) => {
self.remove_all_acked_changes_but_keep_depth(None, resource_limit);
}
Some(History::KeepLast { depth: d }) => {
self.remove_all_acked_changes_but_keep_depth(Some(d as usize), resource_limit);
}
}
}
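  /// Number of DATAFRAG fragments and fragment size (in bytes) needed to carry a
  /// payload of `payload_size` bytes, i.e. a ceiling division by
  /// `data_max_size_serialized`. For example, with the default fragment size of
  /// 1024 bytes, a 2500-byte payload needs 2500/1024 + 1 = 3 fragments, while a
  /// 2048-byte payload needs exactly 2.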
fn num_frags_and_frag_size(&self, payload_size: usize) -> (u32, u16) {
    let fragment_size = self.data_max_size_serialized as u32;
    let data_size = payload_size as u32;
    let num_frags = (data_size / fragment_size) + u32::from(data_size % fragment_size != 0);
    debug!("Fragmenting {data_size} to {num_frags} x {fragment_size}");
(num_frags, fragment_size as u16)
}
pub fn process_writer_command(&mut self) {
while let Ok(cc) = self.writer_command_receiver.try_recv() {
match cc {
WriterCommand::DDSData {
ddsdata: dds_data,
write_options,
sequence_number,
} => {
          // A command was taken off the channel, so there is now room in it.
          // Wake a possible waiter (e.g. an async DataWriter) so it can send more.
          if let Some(waker) = self
            .writer_command_receiver_waker
            .lock()
            .unwrap()
            .as_ref()
          {
            waker.wake_by_ref();
          }
let timestamp =
self.insert_to_history_buffer(dds_data, write_options.clone(), sequence_number);
if !self.like_stateless {
            for reader in self.readers.values_mut() {
reader.notify_new_cache_change(sequence_number);
if let Some(single_reader_guid) = write_options.to_single_reader() {
if reader.remote_reader_guid != single_reader_guid {
reader.insert_pending_gap(sequence_number);
}
}
}
}
if self.push_mode {
if let Some(cc) = self.history_buffer.get_change(timestamp) {
let target_reader_opt = match write_options.to_single_reader() {
                Some(guid) => self.readers.get(&guid),
                None => None,
              };
let send_also_heartbeat = true;
self.send_cache_change(cc, send_also_heartbeat, target_reader_opt);
} else {
error!("Lost the cache change that was just added?!");
}
} else {
            let final_flag = false;
            let liveliness_flag = false;
            let hb_message = MessageBuilder::new()
.heartbeat_msg(
                self.entity_id(),
                self.history_buffer.first_change_sequence_number(),
self.history_buffer.last_change_sequence_number(),
self.next_heartbeat_count(),
self.endianness,
                EntityId::UNKNOWN,
                final_flag,
liveliness_flag,
)
.add_header_and_build(self.my_guid.prefix);
self.send_message_to_readers(
DeliveryMode::Multicast,
hb_message,
&mut self.readers.values(),
);
}
}
WriterCommand::WaitForAcknowledgments { all_acked } => {
if self.like_stateless {
error!(
"Attempted to wait for acknowledgements in a stateless Writer, which currently only \
supports BestEffort QoS. Ignoring. topic={:?}",
self.my_topic_name
);
            let _ = all_acked.try_send(());
            return;
}
let wait_until = self.history_buffer.last_change_sequence_number();
let readers_pending: BTreeSet<_> = self
.readers
.iter()
.filter_map(|(guid, rp)| {
if rp.qos().is_reliable() && rp.all_acked_before <= wait_until {
Some(*guid)
} else {
None
}
})
.collect();
self.ack_waiter = if readers_pending.is_empty() {
let _ = all_acked.try_send(());
None
} else {
Some(AckWaiter {
wait_until,
complete_channel: all_acked,
readers_pending,
})
};
}
}
}
}
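  /// Sends one CacheChange, either to all matched readers (when no target reader is
  /// given) or to a single target reader. Payloads larger than
  /// `data_max_size_serialized` are split into DATAFRAG messages by
  /// `FragmentationIter`. Returns `true` if the data had to be fragmented.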
fn send_cache_change(
&self,
cc: &CacheChange,
send_also_heartbeat: bool,
target_reader_opt: Option<&RtpsReaderProxy>,
) -> bool {
if let Some(single_reader_guid) = cc.write_options.to_single_reader() {
match target_reader_opt {
None => {
error!(
"Data is meant for the single reader {single_reader_guid:?} but a proxy for this \
reader was not provided. Not sending anything."
);
return false;
}
Some(target_reader) => {
if single_reader_guid != target_reader.remote_reader_guid {
error!(
"We were asked to send data meant for the reader {single_reader_guid:?} to a \
different reader {:?}. Not gonna happen.",
target_reader.remote_reader_guid
);
return false;
}
}
}
}
let messages_to_send = FragmentationIter::new(self, cc, target_reader_opt, send_also_heartbeat);
let fragmentation_needed = messages_to_send.fragmentation_needed();
for msg in messages_to_send {
match target_reader_opt {
None => {
self.send_message_to_readers(DeliveryMode::Multicast, msg, &mut self.readers.values());
}
Some(reader_proxy) => {
self.send_message_to_readers(
DeliveryMode::Unicast,
msg,
&mut std::iter::once(reader_proxy),
);
}
}
}
fragmentation_needed
}
fn insert_to_history_buffer(
&mut self,
data: DDSData,
write_options: WriteOptions,
new_sequence_number: SequenceNumber,
) -> Timestamp {
assert!(new_sequence_number > SequenceNumber::zero());
let new_cache_change = CacheChange::new(self.guid(), new_sequence_number, write_options, data);
let timestamp = Timestamp::now();
self.history_buffer.add_change(timestamp, new_cache_change);
timestamp
}
pub fn handle_heartbeat_tick(&mut self, is_manual_assertion: bool) {
if self.like_stateless {
info!(
"Ignoring handling heartbeat tick in a stateless-like Writer, since it currently supports \
only BestEffort QoS. topic={:?}",
self.my_topic_name
);
return;
}
let final_flag = false;
let liveliness_flag = is_manual_assertion;
trace!(
"heartbeat tick in topic {:?} have {} readers",
self.topic_name(),
self.readers.len()
);
let first_change = self.history_buffer.first_change_sequence_number();
let last_change = self.history_buffer.last_change_sequence_number();
if self
.readers
.values()
.all(|rp| last_change < rp.all_acked_before)
{
trace!("heartbeat tick: all readers have all available data.");
} else {
let hb_message = MessageBuilder::new()
.ts_msg(self.endianness, Some(Timestamp::now()))
.heartbeat_msg(
          self.entity_id(),
          self.history_buffer.first_change_sequence_number(),
self.history_buffer.last_change_sequence_number(),
self.next_heartbeat_count(),
self.endianness,
          EntityId::UNKNOWN,
          final_flag,
liveliness_flag,
)
.add_header_and_build(self.my_guid.prefix);
debug!(
"Writer {:?} topic={:} HEARTBEAT {:?} to {:?}",
self.guid().entity_id,
self.topic_name(),
first_change,
last_change,
);
if self.entity_id() == EntityId::P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER {
for rp in self.readers.values() {
          if last_change >= rp.all_acked_before {
            self.send_message_to_readers(
              DeliveryMode::Unicast,
              hb_message.clone(),
              &mut std::iter::once(rp),
            );
          }
}
} else {
self.send_message_to_readers(
DeliveryMode::Multicast,
hb_message,
&mut self.readers.values(),
);
}
}
}
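  /// Processes an ACKNACK or NACKFRAG submessage from a remote reader.
  ///
  /// ACKNACK updates the reader proxy's acknowledgment state, may complete a pending
  /// wait-for-acknowledgments, flushes any pending GAPs for that reader, and, if the
  /// reader is still missing samples, schedules repair DATA after
  /// `nack_response_delay`. NACKFRAG marks the requested fragments in the proxy and
  /// schedules repair DATAFRAGs after `nackfrag_response_delay`.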
pub fn handle_ack_nack(
&mut self,
reader_guid_prefix: GuidPrefix,
ack_submessage: &AckSubmessage,
) {
if !self.is_reliable() || self.like_stateless {
warn!(
"Writer {:x?} is best effort or stateless-like! It should not handle acknack messages!",
self.entity_id()
);
return;
}
match ack_submessage {
AckSubmessage::AckNack(ref an) => {
let last_seq = self.history_buffer.last_change_sequence_number();
if let Some(0) = an.reader_sn_state.iter().next().map(i64::from) {
warn!("Request for SN zero! : {an:?}");
}
let reader_guid = GUID::new(reader_guid_prefix, an.reader_id);
if an.reader_sn_state.base() < SequenceNumber::from(1) {
debug!(
"ACKNACK SequenceNumberSet minimum must be >= 1, got {:?} from {:?} topic {:?}",
an.reader_sn_state.base(),
reader_guid,
self.topic_name()
);
}
        let my_topic = self.my_topic_name.clone();
        self.update_ack_waiters(reader_guid, Some(an.reader_sn_state.base()));
if let Some(reader_proxy) = self.lookup_reader_proxy_mut(reader_guid) {
reader_proxy.handle_ack_nack(ack_submessage, last_seq);
let reader_guid = reader_proxy.remote_reader_guid;
if cfg!(debug_assertions) {
if let Some(req_high) = reader_proxy.unsent_changes_iter().next_back() {
if req_high > last_seq {
warn!(
"ReaderProxy {:?} thinks we need to send {:?} but I have only up to {:?}",
reader_proxy.remote_reader_guid,
reader_proxy.unsent_changes_debug(),
last_seq
);
}
}
if an.reader_sn_state.base() > last_seq.plus_1() {
warn!(
"ACKNACK from {:?} acks {:?}, but I have only up to {:?} count={:?} topic={:?}",
reader_proxy.remote_reader_guid, an.reader_sn_state, last_seq, an.count, my_topic
);
}
if let Some(max_req_sn) = an.reader_sn_state.iter().next_back() {
if max_req_sn > last_seq {
warn!(
"ACKNACK from {:?} requests {:?} but I have only up to {:?}",
reader_proxy.remote_reader_guid,
an.reader_sn_state.iter().collect::<Vec<SequenceNumber>>(),
last_seq
);
}
}
}
if reader_proxy.all_acked_before > last_seq {
reader_proxy.repair_mode = false;
} else {
            reader_proxy.repair_mode = true;
            self.timed_event_timer.set_timeout(
self.nack_response_delay,
TimedEvent::SendRepairData {
to_reader: reader_guid,
},
);
}
}
if let Some(reader_proxy) = self.readers.get(&reader_guid) {
if !reader_proxy.get_pending_gap().is_empty() {
let gap_message = MessageBuilder::new()
.gap_msg(
reader_proxy.get_pending_gap(),
self.my_guid.entity_id,
self.endianness,
reader_guid,
)
.add_header_and_build(self.my_guid.prefix);
self.send_message_to_readers(
DeliveryMode::Unicast,
gap_message,
&mut std::iter::once(reader_proxy),
);
}
}
      }
      AckSubmessage::NackFrag(ref nackfrag) => {
let reader_guid = GUID::new(reader_guid_prefix, nackfrag.reader_id);
if let Some(reader_proxy) = self.lookup_reader_proxy_mut(reader_guid) {
reader_proxy.mark_frags_requested(nackfrag.writer_sn, &nackfrag.fragment_number_state);
}
self.timed_event_timer.set_timeout(
self.nackfrag_response_delay,
TimedEvent::SendRepairFrags {
to_reader: reader_guid,
},
);
}
}
}
fn update_ack_waiters(&mut self, guid: GUID, acked_before: Option<SequenceNumber>) {
let completed = self
.ack_waiter
.as_mut()
.is_some_and(|aw| aw.reader_acked_or_lost(guid, acked_before));
    if completed {
      if let Some(aw) = self.ack_waiter.take() {
        aw.notify_wait_complete();
      }
    }
}
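  /// Timer-driven entry point for sending repair DATA to one reader. The reader proxy
  /// is temporarily removed from the map so the worker can mutate it while `self` is
  /// also borrowed, and is reinserted afterwards.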
fn handle_repair_data_send(&mut self, to_reader: GUID) {
if self.like_stateless {
warn!(
"Not sending repair data in a stateless-like Writer, since it currently supports only \
BestEffort behavior. topic={:?}",
self.my_topic_name
);
return;
}
if let Some(mut reader_proxy) = self.readers.remove(&to_reader) {
self.handle_repair_data_send_worker(&mut reader_proxy);
if let Some(rp) = self
.readers
.insert(reader_proxy.remote_reader_guid, reader_proxy)
{
error!("Reader proxy was duplicated somehow??? {rp:?}");
}
}
}
fn handle_repair_frags_send(&mut self, to_reader: GUID) {
if self.like_stateless {
warn!(
"Not sending repair frags in a stateless-like Writer, since it currently supports only \
BestEffort behavior. topic={:?}",
self.my_topic_name
);
return;
}
if let Some(mut reader_proxy) = self.readers.remove(&to_reader) {
self.handle_repair_frags_send_worker(&mut reader_proxy);
if let Some(rp) = self
.readers
.insert(reader_proxy.remote_reader_guid, reader_proxy)
{
error!("Reader proxy was duplicated somehow??? (frags) {rp:?}");
}
}
}
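  /// Sends the next unsent/missing change to one reader: DATA (or DATAFRAGs) if the
  /// change is still in the history buffer and relevant to that reader, otherwise a
  /// GAP marking the sequence numbers as no longer available or not applicable.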
fn handle_repair_data_send_worker(&mut self, reader_proxy: &mut RtpsReaderProxy) {
let reader_guid = reader_proxy.remote_reader_guid;
debug!(
"Repair data send to {reader_guid:?} due to ACKNACK. ReaderProxy Unsent changes: {:?}",
reader_proxy.unsent_changes_debug()
);
if let Some(unsent_sn) = reader_proxy.first_unsent_change() {
let mut no_longer_relevant: BTreeSet<SequenceNumber> = BTreeSet::new();
let mut all_irrelevant_before = None;
let pending_gaps = reader_proxy.get_pending_gap();
let first_available = self.history_buffer.first_change_sequence_number();
if unsent_sn < first_available {
all_irrelevant_before = Some(first_available);
}
if pending_gaps.contains(&unsent_sn) || all_irrelevant_before.is_some() {
no_longer_relevant.extend(pending_gaps);
} else {
if let Some(cc) = self.history_buffer.get_by_sn(unsent_sn) {
let data_was_fragmented = self.send_cache_change(cc, false, Some(reader_proxy));
if data_was_fragmented {
let (num_frags, _frag_size) =
self.num_frags_and_frag_size(cc.data_value.payload_size());
reader_proxy.mark_all_frags_requested(unsent_sn, num_frags);
self.timed_event_timer.set_timeout(
self.repairfrags_continue_delay,
TimedEvent::SendRepairFrags {
to_reader: reader_guid,
},
);
}
reader_proxy.mark_change_sent(unsent_sn);
} else {
no_longer_relevant.insert(unsent_sn);
if unsent_sn < first_available {
info!(
"Reader {:?} requested too old data {:?}. I have only from {:?}. Topic {:?}",
&reader_proxy, unsent_sn, first_available, &self.my_topic_name
);
} else {
error!(
"handle_repair_data_send_worker {:?} seq.number {:?} missing. first_change={:?}",
self.my_guid, unsent_sn, first_available
);
}
}
}
if !no_longer_relevant.is_empty() || all_irrelevant_before.is_some() {
let mut gap_msg = MessageBuilder::new().dst_submessage(self.endianness, reader_guid.prefix);
if let Some(all_irrelevant_before) = all_irrelevant_before {
gap_msg = gap_msg.gap_msg_before(
all_irrelevant_before,
self.entity_id(),
self.endianness,
reader_guid,
);
reader_proxy.remove_from_unsent_set_all_before(all_irrelevant_before);
}
if !no_longer_relevant.is_empty() {
gap_msg = gap_msg.gap_msg(
&no_longer_relevant,
self.entity_id(),
self.endianness,
reader_guid,
);
no_longer_relevant
.iter()
.for_each(|sn| reader_proxy.mark_change_sent(*sn));
}
let gap_msg = gap_msg.add_header_and_build(self.my_guid.prefix);
self.send_message_to_readers(
DeliveryMode::Unicast,
gap_msg,
&mut std::iter::once(&*reader_proxy),
);
      }
    } else {
reader_proxy.repair_mode = false;
}
}
fn handle_repair_frags_send_worker(
&mut self,
reader_proxy: &mut RtpsReaderProxy,
) {
let max_send_count = 8;
let reader_guid = reader_proxy.remote_reader_guid;
for (seq_num, frag_num) in reader_proxy.frags_requested_iterator().take(max_send_count) {
if let Some(cache_change) = self.history_buffer.get_by_sn(seq_num) {
if let Some(single_reader_guid) = cache_change.write_options.to_single_reader() {
if single_reader_guid != reader_guid {
error!(
"We were asked to send datafrags meant for the reader {single_reader_guid:?} to a \
different reader {reader_guid:?}. Not gonna happen."
);
return;
}
}
let mut message_builder = MessageBuilder::new();
if let Some(src_ts) = cache_change.write_options.source_timestamp() {
message_builder = message_builder.ts_msg(self.endianness, Some(src_ts));
}
        let fragment_size: u32 = self.data_max_size_serialized as u32;
        let data_size: u32 = cache_change.data_value.payload_size() as u32;
message_builder = message_builder.data_frag_msg(
cache_change,
          reader_guid.entity_id,
          self.my_guid,
          frag_num,
          fragment_size as u16,
          data_size,
self.endianness,
self.security_plugins.as_ref(),
);
self.send_message_to_readers(
DeliveryMode::Unicast,
message_builder.add_header_and_build(self.my_guid.prefix),
&mut std::iter::once(&*reader_proxy),
);
} else {
error!(
"handle_repair_frags_send_worker: {:?} missing from history_buffer. topic={:?}",
seq_num, self.my_topic_name
);
}
reader_proxy.mark_frag_sent(seq_num, &frag_num);
    }
  }
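  /// History cache cleanup. Changes acknowledged by every matched reader may be
  /// dropped, but the most recent `depth` of the acknowledged samples are retained
  /// (History QoS); `depth == None` means KeepAll, so nothing is dropped on account
  /// of acknowledgments. Independently, `resource_limit` bounds the cache: anything
  /// older than the newest `resource_limit` changes is dropped even if unacknowledged.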
fn remove_all_acked_changes_but_keep_depth(
&mut self,
depth: Option<usize>,
resource_limit: usize,
) {
let first_keeper = if !self.like_stateless {
let acked_by_all_readers = self
.readers
.values()
.map(RtpsReaderProxy::acked_up_to_before)
.min()
.unwrap_or_else(SequenceNumber::zero);
if let Some(depth) = depth {
max(
acked_by_all_readers - SequenceNumber::from(depth),
self.history_buffer.first_change_sequence_number(),
)
} else {
self.history_buffer.first_change_sequence_number()
}
} else {
let depth = depth.unwrap_or(0);
max(
self.history_buffer.last_change_sequence_number() - SequenceNumber::from(depth),
self.history_buffer.first_change_sequence_number(),
)
};
let first_keeper = max(
max(
first_keeper,
self.history_buffer.last_change_sequence_number() - SequenceNumber::from(resource_limit),
),
SequenceNumber::zero(),
);
debug!(
"HistoryBuffer: cleaning before {first_keeper:?} topic={:?}",
self.topic_name()
);
self.history_buffer.remove_changes_before(first_keeper);
}
pub(crate) fn next_heartbeat_count(&self) -> i32 {
self
.heartbeat_message_counter
.fetch_add(1, atomic::Ordering::SeqCst)
}
#[cfg(feature = "security")]
fn security_encode(
&self,
message: Message,
readers: &[&RtpsReaderProxy],
) -> SecurityResult<Message> {
if let Some(security_plugins_handle) = &self.security_plugins {
let source_guid = self.guid();
let destination_guid_list: Vec<GUID> = readers
.iter()
.map(|reader_proxy| reader_proxy.remote_reader_guid)
.collect();
let Message {
header,
submessages,
} = message;
SecurityResult::<Vec<Vec<Submessage>>>::from_iter(submessages.iter().map(|submessage| {
security_plugins_handle
.get_plugins()
.encode_datawriter_submessage(submessage.clone(), &source_guid, &destination_guid_list)
.map(Vec::from)
}))
.map(|encoded_submessages| Message {
header,
submessages: encoded_submessages.concat(),
})
.and_then(|message| {
let source_guid_prefix = source_guid.prefix;
let destination_guid_prefix_list: Vec<GuidPrefix> = destination_guid_list
.iter()
.map(|guid| guid.prefix)
.collect();
security_plugins_handle.get_plugins().encode_message(
message,
&source_guid_prefix,
&destination_guid_prefix_list,
)
})
} else {
Ok(message)
}
}
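  /// Serializes one RTPS Message and sends it over UDP to the given readers,
  /// preferring the requested delivery mode but falling back to whichever locators a
  /// reader actually has. Each locator is sent to at most once per call, so readers
  /// sharing a multicast locator do not receive duplicates from here.
  /// With the "security" feature enabled, the message is first passed through the
  /// security plugins for submessage and message protection.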
fn send_message_to_readers(
&self,
preferred_mode: DeliveryMode,
message: Message,
readers: &mut dyn Iterator<Item = &RtpsReaderProxy>,
) {
let readers = readers.collect::<Vec<_>>();
#[cfg(feature = "security")]
let encoded = self.security_encode(message, &readers);
#[cfg(not(feature = "security"))]
let encoded: Result<Message, ()> = Ok(message);
match encoded {
Ok(message) => {
let buffer = message.write_to_vec_with_ctx(self.endianness).unwrap();
let mut already_sent_to = BTreeSet::new();
macro_rules! send_unless_sent_and_mark {
($locs:expr) => {
for loc in $locs.iter() {
if already_sent_to.contains(loc) {
trace!("Already sent to {:?}", loc);
} else {
self.udp_sender.send_to_locator(&buffer, loc);
already_sent_to.insert(loc.clone());
}
}
};
}
for reader in readers {
match (
preferred_mode,
reader
.unicast_locator_list
.iter()
.find(|l| Locator::is_udp(l)),
reader
.multicast_locator_list
.iter()
.find(|l| Locator::is_udp(l)),
) {
(DeliveryMode::Multicast, _, Some(_mc_locator)) => {
send_unless_sent_and_mark!(reader.multicast_locator_list);
}
(DeliveryMode::Unicast, Some(_uc_locator), _) => {
send_unless_sent_and_mark!(reader.unicast_locator_list)
}
(_delivery_mode, _, Some(_mc_locator)) => {
send_unless_sent_and_mark!(reader.multicast_locator_list);
}
(_delivery_mode, Some(_uc_locator), _) => {
send_unless_sent_and_mark!(reader.unicast_locator_list)
}
(_delivery_mode, None, None) => {
warn!("send_message_to_readers: No locators for {reader:?}");
}
          }
        }
}
Err(e) => error!("Failed to send message to readers. Encoding failed: {e:?}"),
}
}
fn send_status(&self, status: DataWriterStatus) {
self
.status_sender
.try_send(status)
.unwrap_or_else(|e| match e {
        TrySendError::Full(_) => (), // Channel full: drop the status event rather than block.
        TrySendError::Disconnected(_) => {
debug!("send_status - status receiver is disconnected");
}
TrySendError::Io(e) => {
warn!("send_status - io error {e:?}");
}
});
}
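  /// Handles a discovered (new or updated) remote reader on this topic. If the
  /// reader's requested QoS is compatible with ours, the proxy is inserted or updated
  /// and PublicationMatched is reported; otherwise the mismatch is reported as
  /// OfferedIncompatibleQos to the DataWriter and to the participant status listener.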
pub fn update_reader_proxy(
&mut self,
reader_proxy: &RtpsReaderProxy,
requested_qos: &QosPolicies,
) {
debug!(
"update_reader_proxy topic={:?} reader_proxy={reader_proxy:?}",
self.my_topic_name
);
match self.qos_policies.compliance_failure_wrt(requested_qos) {
None => {
let new_reader = self.matched_reader_update(reader_proxy);
if new_reader {
self.matched_readers_count_total += 1;
self.send_status(DataWriterStatus::PublicationMatched {
total: CountWithChange::new(self.matched_readers_count_total, 1),
current: CountWithChange::new(self.readers.len() as i32, 1),
reader: reader_proxy.remote_reader_guid,
});
self.send_participant_status(DomainParticipantStatusEvent::RemoteReaderMatched {
local_writer: self.my_guid,
remote_reader: reader_proxy.remote_reader_guid,
});
info!(
"Matched new remote reader on topic={:?} reader={:?}",
self.topic_name(),
&reader_proxy.remote_reader_guid
);
debug!("Reader details: {:?}", &reader_proxy);
}
}
Some(bad_policy_id) => {
warn!(
"update_reader_proxy - QoS mismatch {:?} topic={:?}",
bad_policy_id,
self.topic_name()
);
info!(
"Reader QoS={:?} Writer QoS={:?}",
requested_qos, self.qos_policies
);
self.requested_incompatible_qos_count += 1;
self.send_status(DataWriterStatus::OfferedIncompatibleQos {
count: CountWithChange::new(self.requested_incompatible_qos_count, 1),
last_policy_id: bad_policy_id,
reader: reader_proxy.remote_reader_guid,
requested_qos: Box::new(requested_qos.clone()),
offered_qos: Box::new(self.qos_policies.clone()),
});
self.send_participant_status(DomainParticipantStatusEvent::RemoteReaderQosIncompatible {
local_writer: self.my_guid,
remote_reader: reader_proxy.remote_reader_guid,
requested_qos: Box::new(requested_qos.clone()),
offered_qos: Box::new(self.qos_policies.clone()),
});
}
    }
  }
fn matched_reader_update(&mut self, updated_reader_proxy: &RtpsReaderProxy) -> bool {
let mut is_new = false;
    let is_volatile = self.qos().is_volatile();
    self
.readers
.entry(updated_reader_proxy.remote_reader_guid)
.and_modify(|rp| rp.update(updated_reader_proxy, &self.my_topic_name))
.or_insert_with(|| {
is_new = true;
let mut new_proxy = updated_reader_proxy.clone();
if is_volatile {
new_proxy.set_pending_gap_up_to(self.history_buffer.last_change_sequence_number());
}
new_proxy
});
is_new
}
fn matched_reader_remove(&mut self, guid: GUID) -> Option<RtpsReaderProxy> {
let removed = self.readers.remove(&guid);
if let Some(ref removed_reader) = removed {
info!(
"Removed reader proxy. topic={:?} reader={:?}",
self.topic_name(),
removed_reader.remote_reader_guid,
);
debug!("Removed reader proxy details: {removed_reader:?}");
}
#[cfg(feature = "security")]
if let Some(security_plugins_handle) = &self.security_plugins {
security_plugins_handle
.get_plugins()
.unregister_remote_reader(&self.my_guid, &guid)
.unwrap_or_else(|e| error!("{e}"));
}
removed
}
pub fn reader_lost(&mut self, guid: GUID) {
if self.readers.contains_key(&guid) {
info!(
"reader_lost topic={:?} reader={:?}",
self.topic_name(),
&guid
);
self.matched_reader_remove(guid);
self.send_status(DataWriterStatus::PublicationMatched {
total: CountWithChange::new(self.matched_readers_count_total, 0),
current: CountWithChange::new(self.readers.len() as i32, -1),
reader: guid,
});
}
self.update_ack_waiters(guid, None);
}
pub fn participant_lost(&mut self, guid_prefix: GuidPrefix) {
let lost_readers: Vec<GUID> = self
.readers
.range(guid_prefix.range())
.map(|(g, _)| *g)
.collect();
for reader in lost_readers {
self.reader_lost(reader);
}
}
fn lookup_reader_proxy_mut(&mut self, guid: GUID) -> Option<&mut RtpsReaderProxy> {
self.readers.get_mut(&guid)
}
pub fn topic_name(&self) -> &String {
&self.my_topic_name
}
fn send_participant_status(&self, event: DomainParticipantStatusEvent) {
self
.participant_status_sender
.try_send(event)
.unwrap_or_else(|e| error!("Cannot report participant status: {e:?}"));
}
}
impl RTPSEntity for Writer {
fn guid(&self) -> GUID {
self.my_guid
}
}
impl HasQoSPolicy for Writer {
fn qos(&self) -> QosPolicies {
self.qos_policies.clone()
}
}
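/// Iterator that yields the RTPS Message(s) needed to deliver one CacheChange.
///
/// If the serialized payload fits in `data_max_size_serialized`, a single Message is
/// produced, containing an optional INFO_DST and GAP, the DATA submessage, and
/// optionally a HEARTBEAT. Larger payloads produce a sequence of Messages: possibly a
/// GAP first, then one Message per DATAFRAG, and finally a HEARTBEAT (when requested
/// and the Writer is not stateless-like).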
struct FragmentationIter<'a> {
writer: &'a Writer,
cache_change: &'a CacheChange,
target_reader_opt: Option<&'a RtpsReaderProxy>,
reader_entity_id: EntityId,
send_heartbeat: bool,
finished: bool,
state: FragmentationIterState,
}
impl<'a> FragmentationIter<'a> {
fn new(
writer: &'a Writer,
cache_change: &'a CacheChange,
target_reader_opt: Option<&'a RtpsReaderProxy>,
send_heartbeat: bool,
) -> Self {
let reader_entity_id =
target_reader_opt.map_or(EntityId::UNKNOWN, |p| p.remote_reader_guid.entity_id);
let data_size = cache_change.data_value.payload_size();
let fragmentation_needed = data_size > writer.data_max_size_serialized;
let state = if fragmentation_needed {
FragmentationIterState::Fragmented(FragmentedState::TargetReader, data_size)
} else {
FragmentationIterState::Unfragmented
};
Self {
writer,
cache_change,
target_reader_opt,
state,
reader_entity_id,
finished: false,
send_heartbeat,
}
}
fn fragmentation_needed(&self) -> bool {
matches!(self.state, FragmentationIterState::Fragmented(..))
}
}
enum FragmentationIterState {
Fragmented(FragmentedState, usize),
Unfragmented,
}
enum FragmentedState {
TargetReader,
Fragments(FragmentNumberRange, u16),
Heartbeat,
}
impl<'a> Iterator for FragmentationIter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
let cc = self.cache_change;
let writer = self.writer;
let target_reader_opt = self.target_reader_opt;
let reader_entity_id = self.reader_entity_id;
let send_heartbeat = self.send_heartbeat;
match &mut self.state {
FragmentationIterState::Fragmented(state, data_size) => {
match state {
FragmentedState::TargetReader => {
let (num_frags, fragment_size) = writer.num_frags_and_frag_size(*data_size);
*state = FragmentedState::Fragments(
FragmentNumber::range_inclusive(
FragmentNumber::new(1),
FragmentNumber::new(num_frags),
),
fragment_size,
);
if let Some(reader) = target_reader_opt {
if !reader.get_pending_gap().is_empty() {
let gap_msg = MessageBuilder::new()
.dst_submessage(writer.endianness, reader.remote_reader_guid.prefix)
.gap_msg(
reader.get_pending_gap(),
writer.entity_id(),
writer.endianness,
reader.remote_reader_guid,
)
.add_header_and_build(writer.my_guid.prefix);
return Some(gap_msg);
}
}
self.next()
}
FragmentedState::Fragments(fragments, fragment_size) => {
if let Some(frag_num) = fragments.next() {
let mut message_builder = MessageBuilder::new();
if let Some(src_ts) = cc.write_options.source_timestamp() {
message_builder = message_builder.ts_msg(writer.endianness, Some(src_ts));
}
if let Some(reader) = target_reader_opt {
message_builder = message_builder
.dst_submessage(writer.endianness, reader.remote_reader_guid.prefix);
}
message_builder = message_builder.data_frag_msg(
cc,
                reader_entity_id,
                writer.my_guid,
frag_num,
*fragment_size,
(*data_size).try_into().unwrap(),
writer.endianness,
writer.security_plugins.as_ref(),
);
let datafrag_msg = message_builder.add_header_and_build(writer.my_guid.prefix);
return Some(datafrag_msg);
}
*state = FragmentedState::Heartbeat;
self.next()
}
FragmentedState::Heartbeat => {
self.finished = true;
if send_heartbeat && !writer.like_stateless {
              let final_flag = false;
              let liveliness_flag = false;
              let hb_msg = MessageBuilder::new()
.heartbeat_msg(
                  writer.entity_id(),
                  writer.history_buffer.first_change_sequence_number(),
writer.history_buffer.last_change_sequence_number(),
writer.next_heartbeat_count(),
writer.endianness,
                  reader_entity_id,
                  final_flag,
liveliness_flag,
)
.add_header_and_build(writer.my_guid.prefix);
return Some(hb_msg);
}
None
}
}
}
FragmentationIterState::Unfragmented => {
let mut message_builder = MessageBuilder::new();
if let Some(src_ts) = cc.write_options.source_timestamp() {
message_builder = message_builder.ts_msg(writer.endianness, Some(src_ts));
}
if let Some(reader) = target_reader_opt {
message_builder =
message_builder.dst_submessage(writer.endianness, reader.remote_reader_guid.prefix);
if !reader.get_pending_gap().is_empty() {
message_builder = message_builder.gap_msg(
reader.get_pending_gap(),
writer.entity_id(),
writer.endianness,
reader.remote_reader_guid,
);
}
}
message_builder = message_builder.data_msg(
cc,
reader_entity_id,
writer.my_guid,
writer.endianness,
writer.security_plugins.as_ref(),
);
if send_heartbeat && !writer.like_stateless {
          let final_flag = false;
          let liveliness_flag = false;
          message_builder = message_builder.heartbeat_msg(
writer.entity_id(),
writer.history_buffer.first_change_sequence_number(),
writer.history_buffer.last_change_sequence_number(),
writer.next_heartbeat_count(),
writer.endianness,
            reader_entity_id,
            final_flag,
liveliness_flag,
);
}
let data_message = message_builder.add_header_and_build(writer.my_guid.prefix);
self.finished = true;
Some(data_message)
}
}
}
}
#[cfg(test)]
mod tests {
use std::thread;
use byteorder::LittleEndian;
use log::info;
use crate::{
dds::{
participant::DomainParticipant, qos::QosPolicies, topic::TopicKind,
with_key::datawriter::DataWriter,
},
serialization::CDRSerializerAdapter,
test::random_data::*,
};
#[test]
fn test_writer_receives_datawriter_cache_change_notifications() {
let domain_participant = DomainParticipant::new(0).expect("Failed to create participant");
let qos = QosPolicies::qos_none();
let _default_dw_qos = QosPolicies::qos_none();
let publisher = domain_participant
.create_publisher(&qos)
.expect("Failed to create publisher");
let topic = domain_participant
.create_topic(
"Aasii".to_string(),
"Huh?".to_string(),
&qos,
TopicKind::WithKey,
)
.expect("Failed to create topic");
let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
publisher
.create_datawriter(&topic, None)
.expect("Failed to create datawriter");
let data = RandomData {
a: 4,
b: "Fobar".to_string(),
};
let data2 = RandomData {
a: 2,
b: "Fobar".to_string(),
};
let data3 = RandomData {
a: 3,
b: "Fobar".to_string(),
};
let write_result = data_writer.write(data, None);
info!("writerResult: {write_result:?}");
data_writer
.write(data2, None)
.expect("Unable to write data");
info!("writerResult: {write_result:?}");
let write_result = data_writer.write(data3, None);
thread::sleep(std::time::Duration::from_millis(100));
info!("writerResult: {write_result:?}");
}
}