1use std::{
2 cmp::max,
3 collections::BTreeMap,
4 io,
5 marker::PhantomData,
6 pin::Pin,
7 sync::{Arc, Mutex, MutexGuard},
8 task::{Context, Poll, Waker},
9};
10
11use futures::stream::{FusedStream, Stream};
12use serde::de::DeserializeOwned;
13use mio_extras::channel as mio_channel;
14#[allow(unused_imports)]
15use log::{debug, error, info, trace, warn};
16
17use crate::{
18 dds::{
19 adapters::with_key::{Decode, DefaultDecoder, DeserializerAdapter},
20 ddsdata::*,
21 key::*,
22 pubsub::Subscriber,
23 qos::*,
24 result::*,
25 statusevents::*,
26 topic::{Topic, TopicDescription},
27 with_key::datasample::{DeserializedCacheChange, Sample},
28 },
29 discovery::discovery::DiscoveryCommand,
30 mio_source::PollEventSource,
31 serialization::CDRDeserializerAdapter,
32 structure::{
33 cache_change::CacheChange,
34 dds_cache::TopicCache,
35 entity::RTPSEntity,
36 guid::{EntityId, GUID},
37 sequence_number::SequenceNumber,
38 time::Timestamp,
39 },
40};
41
42#[derive(Clone, Debug)]
43pub(crate) enum ReaderCommand {
44 #[allow(dead_code)] ResetRequestedDeadlineStatus,
46}
47
48pub(crate) struct ReadState<K: Key> {
51 latest_instant: Timestamp, last_read_sn: BTreeMap<GUID, SequenceNumber>, hash_to_key_map: BTreeMap<KeyHash, K>, }
59
60impl<K: Key> ReadState<K> {
61 fn new() -> Self {
62 ReadState {
63 latest_instant: Timestamp::ZERO,
64 last_read_sn: BTreeMap::new(),
65 hash_to_key_map: BTreeMap::<KeyHash, K>::new(),
66 }
67 }
68
69 fn get_sn_map_and_hash_map(
73 &mut self,
74 ) -> (
75 &mut BTreeMap<GUID, SequenceNumber>,
76 &mut BTreeMap<KeyHash, K>,
77 ) {
78 let ReadState {
79 last_read_sn,
80 hash_to_key_map,
81 ..
82 } = self;
83 (last_read_sn, hash_to_key_map)
84 }
85}
86
87pub struct SimpleDataReader<D: Keyed, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
90 my_subscriber: Subscriber,
91
92 my_topic: Topic,
93 qos_policy: QosPolicies,
94 my_guid: GUID,
95
96 pub(crate) notification_receiver: Mutex<mio_channel::Receiver<()>>,
98
99 topic_cache: Arc<Mutex<TopicCache>>,
101
102 read_state: Mutex<ReadState<<D as Keyed>::K>>,
103
104 deserializer_type: PhantomData<DA>, discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
107 status_receiver: StatusChannelReceiver<DataReaderStatus>,
108
109 #[allow(dead_code)] reader_command: mio_channel::SyncSender<ReaderCommand>,
112 data_reader_waker: Arc<Mutex<Option<Waker>>>,
113
114 event_source: PollEventSource,
115}
116
117impl<D, DA> Drop for SimpleDataReader<D, DA>
118where
119 D: Keyed,
120 DA: DeserializerAdapter<D>,
121{
122 fn drop(&mut self) {
123 self.my_subscriber.remove_reader(self.my_guid);
125
126 match self
128 .discovery_command
129 .send(DiscoveryCommand::RemoveLocalReader { guid: self.my_guid })
130 {
131 Ok(_) => {}
132 Err(mio_channel::SendError::Disconnected(_)) => {
133 debug!("Failed to send DiscoveryCommand::RemoveLocalReader . Maybe shutting down?");
134 }
135 Err(e) => error!(
136 "Failed to send DiscoveryCommand::RemoveLocalReader. {:?}",
137 e
138 ),
139 }
140 }
141}
142
143impl<D: 'static, DA> SimpleDataReader<D, DA>
144where
145 D: Keyed,
146 DA: DeserializerAdapter<D>,
147{
148 #[allow(clippy::too_many_arguments)]
149 pub(crate) fn new(
150 subscriber: Subscriber,
151 my_id: EntityId,
152 topic: Topic,
153 qos_policy: QosPolicies,
154 notification_receiver: mio_channel::Receiver<()>,
156 topic_cache: Arc<Mutex<TopicCache>>,
157 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
158 status_receiver: StatusChannelReceiver<DataReaderStatus>,
159 reader_command: mio_channel::SyncSender<ReaderCommand>,
160 data_reader_waker: Arc<Mutex<Option<Waker>>>,
161 event_source: PollEventSource,
162 ) -> CreateResult<Self> {
163 let dp = match subscriber.participant() {
164 Some(dp) => dp,
165 None => {
166 return Err(CreateError::ResourceDropped {
167 reason: "Cannot create new DataReader, DomainParticipant doesn't exist.".to_string(),
168 })
169 }
170 };
171
172 let my_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), my_id);
173
174 let topic_cache_name = topic_cache.lock().unwrap().topic_name();
176 if topic.name() != topic_cache_name {
177 return Err(CreateError::Internal {
178 reason: format!(
179 "Topic name = {} and topic cache name = {} not equal when creating a SimpleDataReader",
180 topic.name(),
181 topic_cache_name
182 ),
183 });
184 }
185
186 Ok(Self {
187 my_subscriber: subscriber,
188 qos_policy,
189 my_guid,
190 notification_receiver: Mutex::new(notification_receiver),
191 topic_cache,
192 read_state: Mutex::new(ReadState::new()),
193 my_topic: topic,
194 deserializer_type: PhantomData,
195 discovery_command,
196 status_receiver,
197 reader_command,
198 data_reader_waker,
199 event_source,
200 })
201 }
202 pub(crate) fn set_waker(&self, w: Option<Waker>) {
203 *self.data_reader_waker.lock().unwrap() = w;
204 }
205
206 pub(crate) fn drain_read_notifications(&self) {
207 let rec = self.notification_receiver.lock().unwrap();
208 while rec.try_recv().is_ok() {}
209 self.event_source.drain();
210 }
211
212 fn try_take_undecoded<'a>(
213 is_reliable: bool,
214 topic_cache: &'a TopicCache,
215 latest_instant: Timestamp,
216 last_read_sn: &'a BTreeMap<GUID, SequenceNumber>,
217 ) -> Box<dyn Iterator<Item = (Timestamp, &'a CacheChange)> + 'a> {
218 if is_reliable {
219 topic_cache.get_changes_in_range_reliable(last_read_sn)
220 } else {
221 topic_cache.get_changes_in_range_best_effort(latest_instant, Timestamp::now())
222 }
223 }
224
225 fn update_hash_to_key_map(
226 hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
227 deserialized: &Sample<D, D::K>,
228 ) {
229 let instance_key = match deserialized {
230 Sample::Value(d) => d.key(),
231 Sample::Dispose(k) => k.clone(),
232 };
233 hash_to_key_map.insert(instance_key.hash_key(false), instance_key);
234 }
235
236 fn deserialize_with<S>(
237 &self,
238 timestamp: Timestamp,
239 cc: &CacheChange,
240 hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
241 decoder: S,
242 ) -> ReadResult<DeserializedCacheChange<D>>
243 where
244 S: Decode<DA::Decoded, DA::DecodedKey>,
245 {
246 match cc.data_value {
247 DDSData::Data {
248 ref serialized_payload,
249 } => {
250 if let Some(recognized_rep_id) = DA::supported_encodings()
252 .iter()
253 .find(|r| **r == serialized_payload.representation_identifier)
254 {
255 match DA::from_bytes_with(&serialized_payload.value, *recognized_rep_id, decoder) {
256 Ok(payload) => {
258 let p = Sample::Value(payload);
259 Self::update_hash_to_key_map(hash_to_key_map, &p);
260 Ok(DeserializedCacheChange::new(timestamp, cc, p))
261 }
262 Err(e) => Err(ReadError::Deserialization {
263 reason: format!(
264 "Failed to deserialize sample bytes: {}, , Topic = {}, Type = {:?}",
265 e,
266 self.my_topic.name(),
267 self.my_topic.get_type()
268 ),
269 }),
270 }
271 } else {
272 info!(
273 "Unknown representation id: {:?} , Topic = {}, Type = {:?} data = {:02x?}",
274 serialized_payload.representation_identifier,
275 self.my_topic.name(),
276 self.my_topic.get_type(),
277 serialized_payload.value,
278 );
279 Err(ReadError::Deserialization {
280 reason: format!(
281 "Unknown representation id {:?} , Topic = {}, Type = {:?}",
282 serialized_payload.representation_identifier,
283 self.my_topic.name(),
284 self.my_topic.get_type()
285 ),
286 })
287 }
288 }
289
290 DDSData::DisposeByKey {
291 key: ref serialized_key,
292 ..
293 } => {
294 match DA::key_from_bytes_with(
295 &serialized_key.value,
296 serialized_key.representation_identifier,
297 decoder,
298 ) {
299 Ok(key) => {
300 let k = Sample::Dispose(key);
301 Self::update_hash_to_key_map(hash_to_key_map, &k);
302 Ok(DeserializedCacheChange::new(timestamp, cc, k))
303 }
304 Err(e) => Err(ReadError::Deserialization {
305 reason: format!(
306 "Failed to deserialize key {}, Topic = {}, Type = {:?}",
307 e,
308 self.my_topic.name(),
309 self.my_topic.get_type()
310 ),
311 }),
312 }
313 }
314
315 DDSData::DisposeByKeyHash { key_hash, .. } => {
316 if let Some(key) = hash_to_key_map.get(&key_hash) {
319 Ok(DeserializedCacheChange::new(
320 timestamp,
321 cc,
322 Sample::Dispose(key.clone()),
323 ))
324 } else {
325 Err(ReadError::UnknownKey {
326 details: format!(
327 "Received dispose with unknown key hash: {:x?}, Topic = {}, Type = {:?}",
328 key_hash,
329 self.my_topic.name(),
330 self.my_topic.get_type()
331 ),
332 })
333 }
334 }
335 } }
337
338 pub fn try_take_one(&self) -> ReadResult<Option<DeserializedCacheChange<D>>>
341 where
342 DA: DeserializerAdapter<D> + DefaultDecoder<D>,
343 {
344 Self::try_take_one_with(self, DA::DECODER)
345 }
346
347 #[allow(clippy::needless_pass_by_value)]
350 pub fn try_take_one_with<S>(&self, decoder: S) -> ReadResult<Option<DeserializedCacheChange<D>>>
351 where
352 S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
353 {
354 let is_reliable = matches!(
355 self.qos_policy.reliability(),
356 Some(policy::Reliability::Reliable { .. })
357 );
358
359 let topic_cache = self.acquire_the_topic_cache_guard();
360
361 let mut read_state_ref = self.read_state.lock().unwrap();
362 let latest_instant = read_state_ref.latest_instant;
363 let (last_read_sn, hash_to_key_map) = read_state_ref.get_sn_map_and_hash_map();
364
365 loop {
367 let (timestamp, cc) =
368 match Self::try_take_undecoded(is_reliable, &topic_cache, latest_instant, last_read_sn)
369 .next()
370 {
371 None => return Ok(None), Some((ts, cc)) => (ts, cc),
373 };
374
375 let result = self.deserialize_with(timestamp, cc, hash_to_key_map, decoder.clone());
376
377 if let Err(ReadError::UnknownKey { .. }) = result {
378 } else {
380 let writer_guid = cc.writer_guid;
383 let sequence_number = cc.sequence_number;
384 read_state_ref.latest_instant = max(latest_instant, timestamp);
387 read_state_ref
388 .last_read_sn
389 .insert(writer_guid, sequence_number);
390
391 return result.map(Some);
401 }
402 }
403 }
404
405 pub fn qos(&self) -> &QosPolicies {
406 &self.qos_policy
407 }
408
409 pub fn guid(&self) -> GUID {
410 self.my_guid
411 }
412
413 pub fn topic(&self) -> &Topic {
414 &self.my_topic
415 }
416
417 pub fn as_async_stream<S>(&self) -> SimpleDataReaderStream<D, S, DA>
418 where
419 DA: DefaultDecoder<D, Decoder = S>,
420 DA::Decoder: Clone,
421 S: Decode<DA::Decoded, DA::DecodedKey>,
422 {
423 Self::as_async_stream_with(self, DA::DECODER)
424 }
425
426 pub fn as_async_stream_with<S>(&self, decoder: S) -> SimpleDataReaderStream<D, S, DA>
427 where
428 S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
429 {
430 SimpleDataReaderStream {
431 simple_datareader: self,
432 decoder,
433 }
434 }
435
436 fn acquire_the_topic_cache_guard(&self) -> MutexGuard<TopicCache> {
437 self.topic_cache.lock().unwrap_or_else(|e| {
438 panic!(
439 "The topic cache of topic {} is poisoned. Error: {}",
440 &self.my_topic.name(),
441 e
442 )
443 })
444 }
445}
446
447impl<D, DA> mio_06::Evented for SimpleDataReader<D, DA>
450where
451 D: Keyed,
452 DA: DeserializerAdapter<D>,
453{
454 fn register(
457 &self,
458 poll: &mio_06::Poll,
459 token: mio_06::Token,
460 interest: mio_06::Ready,
461 opts: mio_06::PollOpt,
462 ) -> io::Result<()> {
463 self
464 .notification_receiver
465 .lock()
466 .unwrap()
467 .register(poll, token, interest, opts)
468 }
469
470 fn reregister(
471 &self,
472 poll: &mio_06::Poll,
473 token: mio_06::Token,
474 interest: mio_06::Ready,
475 opts: mio_06::PollOpt,
476 ) -> io::Result<()> {
477 self
478 .notification_receiver
479 .lock()
480 .unwrap()
481 .reregister(poll, token, interest, opts)
482 }
483
484 fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
485 self.notification_receiver.lock().unwrap().deregister(poll)
486 }
487}
488
489impl<D, DA> mio_08::event::Source for SimpleDataReader<D, DA>
490where
491 D: Keyed,
492 DA: DeserializerAdapter<D>,
493{
494 fn register(
495 &mut self,
496 registry: &mio_08::Registry,
497 token: mio_08::Token,
498 interests: mio_08::Interest,
499 ) -> io::Result<()> {
500 self.event_source.register(registry, token, interests)
501 }
502
503 fn reregister(
504 &mut self,
505 registry: &mio_08::Registry,
506 token: mio_08::Token,
507 interests: mio_08::Interest,
508 ) -> io::Result<()> {
509 self.event_source.reregister(registry, token, interests)
510 }
511
512 fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
513 self.event_source.deregister(registry)
514 }
515}
516
517impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
518 for SimpleDataReader<D, DA>
519where
520 D: Keyed,
521 DA: DeserializerAdapter<D>,
522{
523 fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
524 self.status_receiver.as_status_evented()
525 }
526
527 fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
528 self.status_receiver.as_status_source()
529 }
530
531 fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
532 SimpleDataReaderEventStream {
533 simple_datareader: self,
534 }
535 }
536
537 fn try_recv_status(&self) -> Option<DataReaderStatus> {
538 self.status_receiver.try_recv_status()
539 }
540}
541
542impl<D, DA> RTPSEntity for SimpleDataReader<D, DA>
543where
544 D: Keyed + DeserializeOwned,
545 DA: DeserializerAdapter<D>,
546{
547 fn guid(&self) -> GUID {
548 self.my_guid
549 }
550}
551
552pub struct SimpleDataReaderStream<
558 'a,
559 D: Keyed + 'static,
560 S: Decode<DA::Decoded, DA::DecodedKey>,
561 DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
562> {
563 simple_datareader: &'a SimpleDataReader<D, DA>,
564 decoder: S,
565}
566
567impl<D, S, DA> Unpin for SimpleDataReaderStream<'_, D, S, DA>
572where
573 D: Keyed + 'static,
574 DA: DeserializerAdapter<D>,
575 S: Decode<DA::Decoded, DA::DecodedKey> + Unpin,
576{
577}
578
579impl<D, S, DA> Stream for SimpleDataReaderStream<'_, D, S, DA>
580where
581 D: Keyed + 'static,
582 DA: DeserializerAdapter<D>,
583 S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
584{
585 type Item = ReadResult<DeserializedCacheChange<D>>;
586
587 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
595 debug!("poll_next");
596 match self
597 .simple_datareader
598 .try_take_one_with(self.decoder.clone())
599 {
600 Err(e) =>
601 {
603 Poll::Ready(Some(Err(e)))
604 }
605
606 Ok(Some(d)) => Poll::Ready(Some(Ok(d))),
608
609 Ok(None) => {
611 self.simple_datareader.set_waker(Some(cx.waker().clone()));
623 match self
624 .simple_datareader
625 .try_take_one_with(self.decoder.clone())
626 {
627 Err(e) => Poll::Ready(Some(Err(e))),
628 Ok(Some(d)) => Poll::Ready(Some(Ok(d))),
629 Ok(None) => Poll::Pending,
630 }
631 }
632 } } } impl<D, S, DA> FusedStream for SimpleDataReaderStream<'_, D, S, DA>
637where
638 D: Keyed + 'static,
639 DA: DeserializerAdapter<D>,
640 S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
641{
642 fn is_terminated(&self) -> bool {
643 false }
645}
646
647pub struct SimpleDataReaderEventStream<
651 'a,
652 D: Keyed + 'static,
653 DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
654> {
655 simple_datareader: &'a SimpleDataReader<D, DA>,
656}
657
658impl<D, DA> Stream for SimpleDataReaderEventStream<'_, D, DA>
659where
660 D: Keyed + 'static,
661 DA: DeserializerAdapter<D>,
662{
663 type Item = DataReaderStatus;
664
665 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
666 Pin::new(
667 &mut self
668 .simple_datareader
669 .status_receiver
670 .as_async_status_stream(),
671 )
672 .poll_next(cx)
673 } } impl<D, DA> FusedStream for SimpleDataReaderEventStream<'_, D, DA>
677where
678 D: Keyed + 'static,
679 DA: DeserializerAdapter<D>,
680{
681 fn is_terminated(&self) -> bool {
682 self
683 .simple_datareader
684 .status_receiver
685 .as_async_status_stream()
686 .is_terminated()
687 }
688}