1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
use core::ops::Bound::{Included, Unbounded};
use std::{cmp::max, collections::BTreeMap};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
discovery::sedp_messages::DiscoveredWriterData,
structure::{
guid::{EntityId, GUID},
locator::Locator,
sequence_number::SequenceNumber,
time::Timestamp,
},
};
/// Reader-side record of one matched remote Writer: how to reach it and which
/// of its changes (identified by sequence number) are already accounted for.
#[derive(Debug)] // these are not cloneable, because contained data may be large
pub(crate) struct RtpsWriterProxy {
/// Identifies the remote matched Writer
pub remote_writer_guid: GUID,
/// List of unicast (address, port) combinations that can be used to send
/// messages to the matched Writer or Writers. The list may be empty.
pub unicast_locator_list: Vec<Locator>,
/// List of multicast (address, port) combinations that can be used to send
/// messages to the matched Writer or Writers. The list may be empty.
pub multicast_locator_list: Vec<Locator>,
/// Identifies the group to which the matched Writer belongs
pub remote_group_entity_id: EntityId,
// See RTPS Spec v2.5 Section 8.4.10.4 on how the WriterProxy is supposed to
// operate.
// And 8.4.10.5 on statuses of the (cache) changes received from a writer.
// The changes are identified by sequence numbers.
// Any sequence number is at any moment in one of the following states:
// unknown, missing, received, not_available.
//
// * Unknown: we have no information about that change. This is the initial
//   state of every sequence number.
// * Missing: the writer claims to have the change, and the reader may request
//   it with ACKNACK.
// * Received: the reader has received the change via DATA (or DATAFRAGs).
// * Not_available: the writer has informed us via a GAP message that the
//   change is not available.
//
// Allowed transitions:
// * Unknown -> Missing (via HEARTBEAT), Received (DATA), or Not_available (GAP).
// * Missing -> Received or Not_available.
// * Received and Not_available are terminal.
//
// We keep a map "changes" and a sequence number counter "ack_base" to keep
// track of these states.
// changes.get(sn) is interpreted as follows:
// * Some(Some(timestamp)) = received at timestamp
// * Some(None) = not_available
// * None = not recorded in the map; the state follows from the rules below.
//
// Below, "hb_last" means the last sequence number announced by a HEARTBEAT.
// It is not stored in this struct.
// All changes below ack_base are either received or not_available.
// All changes above hb_last are unknown (if they are not in the "changes" map).
// All changes between ack_base and hb_last (inclusive) that are not in the
// "changes" map are missing.
// Timestamps are stored, because they are used as keys into the DDS Cache.
changes: BTreeMap<SequenceNumber, Option<Timestamp>>,
// The changes map is cleaned on heartbeat messages. The changes no longer available are dropped.
//
// NOTE(review): the heartbeat counter below is not updated by any code visible
// in this file; presumably the owner of the proxy maintains it. Confirm.
pub received_heartbeat_count: i32,
// Value handed out, and then incremented, by next_ack_nack_sequence_number().
pub sent_ack_nack_count: i32,
ack_base: SequenceNumber, // We can ACK everything before this number.
// ack_base can be increased from N-1 to N, if we receive DATA with SequenceNumber N-1
// heartbeat(first,last) => ack_base can be increased to first.
// GAP is treated like receiving a message.
// The two fields below cache the highest sequence number received as DATA and
// the time it was received. They back last_change_timestamp(), which is used
// for the DEADLINE policy check. A sequence number of 0 means that no DATA
// has been recorded yet.
last_received_sequence_number: SequenceNumber,
last_received_timestamp: Timestamp,
}
impl RtpsWriterProxy {
/// Creates a proxy for a newly matched remote writer with an empty change
/// record.
///
/// Both message counters start at zero and no DATA is considered received.
pub fn new(
  remote_writer_guid: GUID,
  unicast_locator_list: Vec<Locator>,
  multicast_locator_list: Vec<Locator>,
  remote_group_entity_id: EntityId,
) -> Self {
  // Sequence numbering must start at 1, so everything below 1 can be
  // acknowledged even before anything has been received.
  let first_unacknowledged = SequenceNumber::new(1);
  // Zero doubles as the "no DATA received so far" marker.
  let nothing_received = SequenceNumber::new(0);

  Self {
    remote_writer_guid,
    unicast_locator_list,
    multicast_locator_list,
    remote_group_entity_id,
    changes: BTreeMap::new(),
    received_heartbeat_count: 0,
    sent_ack_nack_count: 0,
    ack_base: first_unacknowledged,
    last_received_sequence_number: nothing_received,
    last_received_timestamp: Timestamp::INVALID,
  }
}
/// Returns the count to put into the next ACKNACK and advances the internal
/// counter.
///
/// The counter wraps around on `i32` overflow. A plain `+= 1` would panic in
/// builds with overflow checks enabled and wrap silently otherwise; wrapping
/// explicitly gives the same behavior in every build profile.
pub fn next_ack_nack_sequence_number(&mut self) -> i32 {
  let current = self.sent_ack_nack_count;
  self.sent_ack_nack_count = current.wrapping_add(1);
  current
}
// Returns a bound, below which everything can be acknowledged, i.e.
// is received either as DATA or GAP
pub fn all_ackable_before(&self) -> SequenceNumber {
self.ack_base
}
pub fn update_contents(&mut self, other: Self) {
self.unicast_locator_list = other.unicast_locator_list;
self.multicast_locator_list = other.multicast_locator_list;
self.remote_group_entity_id = other.remote_group_entity_id;
}
/// Returns the receive time of the highest-numbered DATA recorded so far, or
/// `None` if no DATA has been recorded yet.
///
/// This is used to check for the DEADLINE policy.
pub fn last_change_timestamp(&self) -> Option<Timestamp> {
  // Sequence number 0 is the "nothing received yet" marker.
  if self.last_received_sequence_number <= SequenceNumber::new(0) {
    return None;
  }
  Some(self.last_received_timestamp)
}
/// Checks whether this proxy is still in its pristine state: nothing has been
/// acknowledged past the initial bound and the change record is empty.
///
/// `new` starts `ack_base` at 1 (sequence numbering starts at 1), so the
/// pristine bound is 1. Comparing against 0, as this check used to do, could
/// never succeed for a proxy created by `new`.
pub fn no_changes_received(&self) -> bool {
  self.ack_base == SequenceNumber::new(1) && self.changes.is_empty()
}
/// Given the availability range announced by a HEARTBEAT, lists the sequence
/// numbers we still lack.
///
/// A HEARTBEAT gives bounds only. Some samples inside the range may already
/// have been received, or may have been declared not available by a GAP;
/// those have an entry in `changes` and are left out of the result. Anything
/// below `ack_base` is skipped as well, since it is already settled.
///
/// Returns an empty vector when `hb_first_sn > hb_last_sn`.
pub fn missing_seqnums(
  &self,
  hb_first_sn: SequenceNumber,
  hb_last_sn: SequenceNumber,
) -> Vec<SequenceNumber> {
  // first <= last must hold before going further, or BTreeMap::range will crash.
  if hb_first_sn > hb_last_sn {
    // first == last + 1 is normal, see RTPS 2.5 Spec Section "8.3.8.6.3 Validity":
    // nothing is available, hence nothing is missing. Anything beyond that
    // is a malformed range and worth a warning.
    if hb_first_sn > hb_last_sn + SequenceNumber::from(1) {
      warn!("Negative range of missing_seqnums first={hb_first_sn:?} last={hb_last_sn:?}");
    }
    return vec![];
  }

  // Only look at the part of the advertised range that is not settled yet.
  let wanted = SequenceNumber::range_inclusive(max(hb_first_sn, self.ack_base), hb_last_sn);

  // Sequence numbers in that interval that are already Received or
  // Not_available. The interval can still be negative (ack_base beyond
  // hb_last_sn), and BTreeMap::range would crash on that, so guard it.
  let settled: Vec<SequenceNumber> = if wanted.begin() <= wanted.end() {
    self.changes.range(wanted).map(|(&sn, _)| sn).collect()
  } else {
    Vec::new()
  };

  // Merge-walk the interval against the settled list. This relies on both
  // being visited in the same (ascending) order.
  let mut settled = settled.into_iter().peekable();
  let mut still_missing = Vec::with_capacity(32); // out of hat value
  for candidate in wanted {
    if settled.peek() == Some(&candidate) {
      // Already known: consume it and move on.
      settled.next();
    } else {
      still_missing.push(candidate);
    }
  }
  still_missing
}
/// Tells whether `seqnum` is already settled, i.e. it has been received or
/// has been marked as not_available, so that a new arrival can be ignored.
pub fn should_ignore_change(&self, seqnum: SequenceNumber) -> bool {
  // Everything below ack_base is settled by definition.
  if seqnum < self.ack_base {
    return true;
  }
  // At or above ack_base, only explicitly recorded changes are settled.
  self.changes.contains_key(&seqnum)
}
/// Records that DATA with sequence number `seq_num` was received at
/// `receive_timestamp`.
///
/// An existing entry for `seq_num` is replaced.
/// NOTE(review): callers presumably filter duplicates with
/// `should_ignore_change` first; confirm.
pub fn received_changes_add(&mut self, seq_num: SequenceNumber, receive_timestamp: Timestamp) {
  // ack_base is the SN one past the last received/irrelevant SN, so it can
  // only move when exactly that SN arrives:
  // * ack_base < seq_num: seq_num-1 or something below is still missing.
  // * ack_base > seq_num: this is a duplicate, or ack_base was wrong.
  let fills_ack_base = seq_num == self.ack_base;

  // Keep the deadline tracker pointing at the highest SN seen so far.
  if seq_num > self.last_received_sequence_number {
    self.last_received_timestamp = receive_timestamp;
    self.last_received_sequence_number = seq_num;
  }

  self.changes.insert(seq_num, Some(receive_timestamp));

  if fills_ack_base {
    self.advance_ack_base();
  }
}
/// Marks a single change as not_available, as announced by a GAP message.
///
/// Sequence numbers below `ack_base` are already settled and are ignored.
/// A change that is already recorded keeps its state: in particular a
/// received change keeps its receive timestamp, because Received is a
/// terminal state that a later GAP must not overwrite.
pub fn set_irrelevant_change(&mut self, seq_num: SequenceNumber) {
  if seq_num < self.ack_base {
    return;
  }

  // Insert the not_available marker only if nothing is recorded yet.
  self.changes.entry(seq_num).or_insert(None);

  if seq_num == self.ack_base {
    // The first unsettled SN just got settled, so ack_base can be advanced.
    self.advance_ack_base();
  }
}
/// Marks the half-open range `[remove_from, remove_until_before)` as
/// irrelevant. Used for ranges from a GAP submessage and for changes a
/// HEARTBEAT submessage reports as no longer available.
///
/// * If the range starts at or below `ack_base`, the entries inside it are
///   dropped from `changes`, and `ack_base` is moved up to
///   `remove_until_before` (and further, over entries already recorded) when
///   that lies above the current `ack_base`.
/// * Otherwise the range is detached from `ack_base`, and a not_available
///   marker is recorded for each sequence number in it that has no entry yet.
///   Changes already recorded as received keep their timestamp, because
///   Received is a terminal state.
///
/// A range with `remove_from > remove_until_before` is logged as an error and
/// otherwise ignored.
pub fn irrelevant_changes_range(
  &mut self,
  remove_from: SequenceNumber,
  remove_until_before: SequenceNumber,
) {
  // check sanity
  if remove_from > remove_until_before {
    error!(
      "irrelevant_changes_range: negative range: remove_from={remove_from:?} \
       remove_until_before={remove_until_before:?}"
    );
    return;
  }
  // now remove_from <= remove_until_before, i.e. at least zero to remove

  if remove_from > self.ack_base {
    // Detached range: remember each SN as not_available without clobbering
    // anything that is already recorded.
    // TODO: This potentially generates a very large BTreeMap
    for na in
      SequenceNumber::range_inclusive(remove_from, remove_until_before - SequenceNumber::new(1))
    {
      self.changes.entry(na).or_insert(None);
    }
    return;
  }

  // The range touches ack_base or lies below it: cut
  // [remove_from, remove_until_before) out of the map and keep the rest.
  let mut removed_and_after = self.changes.split_off(&remove_from);
  let mut after = removed_and_after.split_off(&remove_until_before);
  self.changes.append(&mut after);

  if remove_until_before > self.ack_base {
    // Move the base to skip the irrelevant changes
    self.ack_base = remove_until_before;
    // The new base might be a sample that we already have, move the base
    // forward until we hit a missing one
    self.advance_ack_base();
    // Logged only here, so the message is true whenever it appears.
    debug!(
      "ack_base increased to {:?} by irrelevant_changes_range {:?} to {:?}. writer={:?}",
      self.ack_base, remove_from, remove_until_before, self.remote_writer_guid
    );
  }
}
/// Marks every change below `smallest_seqnum` as irrelevant because of a
/// HEARTBEAT message.
///
/// `smallest_seqnum` is the lowest key to be retained.
pub fn irrelevant_changes_up_to(&mut self, smallest_seqnum: SequenceNumber) {
  let range_start = SequenceNumber::new(0);
  self.irrelevant_changes_range(range_start, smallest_seqnum);
}
/// Returns an owned copy of `drd`, or of `default` when `drd` is empty.
fn discovered_or_default(drd: &[Locator], default: &[Locator]) -> Vec<Locator> {
  let chosen = if drd.is_empty() { default } else { drd };
  chosen.to_vec()
}
/// Builds a proxy for a writer learned through discovery.
///
/// For each locator list, the discovered locators are used when there are
/// any; otherwise the supplied defaults are used. The group entity id is set
/// to `EntityId::UNKNOWN`.
///
/// The initial bookkeeping state comes from `new`, so both constructors
/// agree on it. In particular `ack_base` starts at 1.
pub fn from_discovered_writer_data(
  discovered_writer_data: &DiscoveredWriterData,
  default_unicast_locators: &[Locator],
  default_multicast_locators: &[Locator],
) -> RtpsWriterProxy {
  let discovered = &discovered_writer_data.writer_proxy;

  Self::new(
    discovered.remote_writer_guid,
    Self::discovered_or_default(&discovered.unicast_locator_list, default_unicast_locators),
    Self::discovered_or_default(
      &discovered.multicast_locator_list,
      default_multicast_locators,
    ),
    EntityId::UNKNOWN,
  )
}
/// Advances `ack_base` as far as possible.
///
/// Starting at the current `ack_base`, walks over the entries of `changes`
/// for as long as their sequence numbers are consecutive, and sets
/// `ack_base` to the first sequence number that has no entry.
///
/// Call this after the change record was modified (for instance a received
/// change was added) in a way that could let `ack_base` move.
fn advance_ack_base(&mut self) {
  // First sequence number not yet known to be present.
  let mut candidate = self.ack_base;

  let from_base = self.changes.range((Included(&self.ack_base), Unbounded));
  for (&present, _) in from_base {
    if present != candidate {
      // There is a hole at `candidate`, so the consecutive run ends here.
      break;
    }
    candidate = candidate + SequenceNumber::new(1);
  }

  // Every SN from the old ack_base up to `candidate` (excluded) is recorded.
  self.ack_base = candidate;
}
} // impl