rustdds/dds/with_key/
simpledatareader.rs

1use std::{
2  cmp::max,
3  collections::BTreeMap,
4  io,
5  marker::PhantomData,
6  pin::Pin,
7  sync::{Arc, Mutex, MutexGuard},
8  task::{Context, Poll, Waker},
9};
10
11use futures::stream::{FusedStream, Stream};
12use serde::de::DeserializeOwned;
13use mio_extras::channel as mio_channel;
14#[allow(unused_imports)]
15use log::{debug, error, info, trace, warn};
16
17use crate::{
18  dds::{
19    adapters::with_key::{Decode, DefaultDecoder, DeserializerAdapter},
20    ddsdata::*,
21    key::*,
22    pubsub::Subscriber,
23    qos::*,
24    result::*,
25    statusevents::*,
26    topic::{Topic, TopicDescription},
27    with_key::datasample::{DeserializedCacheChange, Sample},
28  },
29  discovery::discovery::DiscoveryCommand,
30  mio_source::PollEventSource,
31  serialization::CDRDeserializerAdapter,
32  structure::{
33    cache_change::CacheChange,
34    dds_cache::TopicCache,
35    entity::RTPSEntity,
36    guid::{EntityId, GUID},
37    sequence_number::SequenceNumber,
38    time::Timestamp,
39  },
40};
41
42#[derive(Clone, Debug)]
43pub(crate) enum ReaderCommand {
44  #[allow(dead_code)] // TODO: Implement this (resetting) feature
45  ResetRequestedDeadlineStatus,
46}
47
48// This is helper struct.
49// All mutable state needed for reading should go here.
50pub(crate) struct ReadState<K: Key> {
51  latest_instant: Timestamp, /* This is used as a read pointer from dds_cache for BEST_EFFORT
52                              * reading */
53  last_read_sn: BTreeMap<GUID, SequenceNumber>, // collection of read pointers for RELIABLE reading
54  /// hash_to_key_map is used for decoding received key hashes back to original
55  /// key values. This is needed when we receive a dispose message via hash
56  /// only.
57  hash_to_key_map: BTreeMap<KeyHash, K>, // TODO: garbage collect this somehow
58}
59
60impl<K: Key> ReadState<K> {
61  fn new() -> Self {
62    ReadState {
63      latest_instant: Timestamp::ZERO,
64      last_read_sn: BTreeMap::new(),
65      hash_to_key_map: BTreeMap::<KeyHash, K>::new(),
66    }
67  }
68
69  // This is a helper function so that borrow checker understands
70  // that we are splitting one mutable borrow into two _disjoint_ mutable
71  // borrows.
72  fn get_sn_map_and_hash_map(
73    &mut self,
74  ) -> (
75    &mut BTreeMap<GUID, SequenceNumber>,
76    &mut BTreeMap<KeyHash, K>,
77  ) {
78    let ReadState {
79      last_read_sn,
80      hash_to_key_map,
81      ..
82    } = self;
83    (last_read_sn, hash_to_key_map)
84  }
85}
86
87/// SimpleDataReaders can only do "take" semantics and does not have
88/// any deduplication or other DataSampleCache functionality.
89pub struct SimpleDataReader<D: Keyed, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
90  my_subscriber: Subscriber,
91
92  my_topic: Topic,
93  qos_policy: QosPolicies,
94  my_guid: GUID,
95
96  // mio_channel::Receiver is not thread-safe, so Mutex protects it.
97  pub(crate) notification_receiver: Mutex<mio_channel::Receiver<()>>,
98
99  // SimpleDataReader stores a pointer to a mutex on the topic cache
100  topic_cache: Arc<Mutex<TopicCache>>,
101
102  read_state: Mutex<ReadState<<D as Keyed>::K>>,
103
104  deserializer_type: PhantomData<DA>, // This is to provide use for DA
105
106  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
107  status_receiver: StatusChannelReceiver<DataReaderStatus>,
108
109  #[allow(dead_code)] // TODO: This is currently unused, because we do not implement
110  // resetting deadline missed status. Remove attribute when it is supported.
111  reader_command: mio_channel::SyncSender<ReaderCommand>,
112  data_reader_waker: Arc<Mutex<Option<Waker>>>,
113
114  event_source: PollEventSource,
115}
116
117impl<D, DA> Drop for SimpleDataReader<D, DA>
118where
119  D: Keyed,
120  DA: DeserializerAdapter<D>,
121{
122  fn drop(&mut self) {
123    // Tell dp_event_loop
124    self.my_subscriber.remove_reader(self.my_guid);
125
126    // Tell discovery
127    match self
128      .discovery_command
129      .send(DiscoveryCommand::RemoveLocalReader { guid: self.my_guid })
130    {
131      Ok(_) => {}
132      Err(mio_channel::SendError::Disconnected(_)) => {
133        debug!("Failed to send DiscoveryCommand::RemoveLocalReader . Maybe shutting down?");
134      }
135      Err(e) => error!(
136        "Failed to send DiscoveryCommand::RemoveLocalReader. {:?}",
137        e
138      ),
139    }
140  }
141}
142
143impl<D: 'static, DA> SimpleDataReader<D, DA>
144where
145  D: Keyed,
146  DA: DeserializerAdapter<D>,
147{
148  #[allow(clippy::too_many_arguments)]
149  pub(crate) fn new(
150    subscriber: Subscriber,
151    my_id: EntityId,
152    topic: Topic,
153    qos_policy: QosPolicies,
154    // Each notification sent to this channel must be try_recv'd
155    notification_receiver: mio_channel::Receiver<()>,
156    topic_cache: Arc<Mutex<TopicCache>>,
157    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
158    status_receiver: StatusChannelReceiver<DataReaderStatus>,
159    reader_command: mio_channel::SyncSender<ReaderCommand>,
160    data_reader_waker: Arc<Mutex<Option<Waker>>>,
161    event_source: PollEventSource,
162  ) -> CreateResult<Self> {
163    let dp = match subscriber.participant() {
164      Some(dp) => dp,
165      None => {
166        return Err(CreateError::ResourceDropped {
167          reason: "Cannot create new DataReader, DomainParticipant doesn't exist.".to_string(),
168        })
169      }
170    };
171
172    let my_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), my_id);
173
174    // Verify that the topic cache corresponds to the topic of the Reader
175    let topic_cache_name = topic_cache.lock().unwrap().topic_name();
176    if topic.name() != topic_cache_name {
177      return Err(CreateError::Internal {
178        reason: format!(
179          "Topic name = {} and topic cache name = {} not equal when creating a SimpleDataReader",
180          topic.name(),
181          topic_cache_name
182        ),
183      });
184    }
185
186    Ok(Self {
187      my_subscriber: subscriber,
188      qos_policy,
189      my_guid,
190      notification_receiver: Mutex::new(notification_receiver),
191      topic_cache,
192      read_state: Mutex::new(ReadState::new()),
193      my_topic: topic,
194      deserializer_type: PhantomData,
195      discovery_command,
196      status_receiver,
197      reader_command,
198      data_reader_waker,
199      event_source,
200    })
201  }
202  pub(crate) fn set_waker(&self, w: Option<Waker>) {
203    *self.data_reader_waker.lock().unwrap() = w;
204  }
205
206  pub(crate) fn drain_read_notifications(&self) {
207    let rec = self.notification_receiver.lock().unwrap();
208    while rec.try_recv().is_ok() {}
209    self.event_source.drain();
210  }
211
212  fn try_take_undecoded<'a>(
213    is_reliable: bool,
214    topic_cache: &'a TopicCache,
215    latest_instant: Timestamp,
216    last_read_sn: &'a BTreeMap<GUID, SequenceNumber>,
217  ) -> Box<dyn Iterator<Item = (Timestamp, &'a CacheChange)> + 'a> {
218    if is_reliable {
219      topic_cache.get_changes_in_range_reliable(last_read_sn)
220    } else {
221      topic_cache.get_changes_in_range_best_effort(latest_instant, Timestamp::now())
222    }
223  }
224
225  fn update_hash_to_key_map(
226    hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
227    deserialized: &Sample<D, D::K>,
228  ) {
229    let instance_key = match deserialized {
230      Sample::Value(d) => d.key(),
231      Sample::Dispose(k) => k.clone(),
232    };
233    hash_to_key_map.insert(instance_key.hash_key(false), instance_key);
234  }
235
236  fn deserialize_with<S>(
237    &self,
238    timestamp: Timestamp,
239    cc: &CacheChange,
240    hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
241    decoder: S,
242  ) -> ReadResult<DeserializedCacheChange<D>>
243  where
244    S: Decode<DA::Decoded, DA::DecodedKey>,
245  {
246    match cc.data_value {
247      DDSData::Data {
248        ref serialized_payload,
249      } => {
250        // what is our data serialization format (representation identifier) ?
251        if let Some(recognized_rep_id) = DA::supported_encodings()
252          .iter()
253          .find(|r| **r == serialized_payload.representation_identifier)
254        {
255          match DA::from_bytes_with(&serialized_payload.value, *recognized_rep_id, decoder) {
256            // Data update, decoded ok
257            Ok(payload) => {
258              let p = Sample::Value(payload);
259              Self::update_hash_to_key_map(hash_to_key_map, &p);
260              Ok(DeserializedCacheChange::new(timestamp, cc, p))
261            }
262            Err(e) => Err(ReadError::Deserialization {
263              reason: format!(
264                "Failed to deserialize sample bytes: {}, , Topic = {}, Type = {:?}",
265                e,
266                self.my_topic.name(),
267                self.my_topic.get_type()
268              ),
269            }),
270          }
271        } else {
272          info!(
273            "Unknown representation id: {:?} , Topic = {}, Type = {:?} data = {:02x?}",
274            serialized_payload.representation_identifier,
275            self.my_topic.name(),
276            self.my_topic.get_type(),
277            serialized_payload.value,
278          );
279          Err(ReadError::Deserialization {
280            reason: format!(
281              "Unknown representation id {:?} , Topic = {}, Type = {:?}",
282              serialized_payload.representation_identifier,
283              self.my_topic.name(),
284              self.my_topic.get_type()
285            ),
286          })
287        }
288      }
289
290      DDSData::DisposeByKey {
291        key: ref serialized_key,
292        ..
293      } => {
294        match DA::key_from_bytes_with(
295          &serialized_key.value,
296          serialized_key.representation_identifier,
297          decoder,
298        ) {
299          Ok(key) => {
300            let k = Sample::Dispose(key);
301            Self::update_hash_to_key_map(hash_to_key_map, &k);
302            Ok(DeserializedCacheChange::new(timestamp, cc, k))
303          }
304          Err(e) => Err(ReadError::Deserialization {
305            reason: format!(
306              "Failed to deserialize key {}, Topic = {}, Type = {:?}",
307              e,
308              self.my_topic.name(),
309              self.my_topic.get_type()
310            ),
311          }),
312        }
313      }
314
315      DDSData::DisposeByKeyHash { key_hash, .. } => {
316        // The cache should know hash -> key mapping even if the sample
317        // has been disposed or .take()n
318        if let Some(key) = hash_to_key_map.get(&key_hash) {
319          Ok(DeserializedCacheChange::new(
320            timestamp,
321            cc,
322            Sample::Dispose(key.clone()),
323          ))
324        } else {
325          Err(ReadError::UnknownKey {
326            details: format!(
327              "Received dispose with unknown key hash: {:x?}, Topic = {}, Type = {:?}",
328              key_hash,
329              self.my_topic.name(),
330              self.my_topic.get_type()
331            ),
332          })
333        }
334      }
335    } // match
336  }
337
338  /// Note: Always remember to call .drain_read_notifications() just before
339  /// calling this one. Otherwise, new notifications may not appear.
340  pub fn try_take_one(&self) -> ReadResult<Option<DeserializedCacheChange<D>>>
341  where
342    DA: DeserializerAdapter<D> + DefaultDecoder<D>,
343  {
344    Self::try_take_one_with(self, DA::DECODER)
345  }
346
347  /// Note: Always remember to call .drain_read_notifications() just before
348  /// calling this one. Otherwise, new notifications may not appear.
349  #[allow(clippy::needless_pass_by_value)]
350  pub fn try_take_one_with<S>(&self, decoder: S) -> ReadResult<Option<DeserializedCacheChange<D>>>
351  where
352    S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
353  {
354    let is_reliable = matches!(
355      self.qos_policy.reliability(),
356      Some(policy::Reliability::Reliable { .. })
357    );
358
359    let topic_cache = self.acquire_the_topic_cache_guard();
360
361    let mut read_state_ref = self.read_state.lock().unwrap();
362    let latest_instant = read_state_ref.latest_instant;
363    let (last_read_sn, hash_to_key_map) = read_state_ref.get_sn_map_and_hash_map();
364
365    // loop in case we get a sample that should be ignored, so we try next.
366    loop {
367      let (timestamp, cc) =
368        match Self::try_take_undecoded(is_reliable, &topic_cache, latest_instant, last_read_sn)
369          .next()
370        {
371          None => return Ok(None), // no more data available right now
372          Some((ts, cc)) => (ts, cc),
373        };
374
375      let result = self.deserialize_with(timestamp, cc, hash_to_key_map, decoder.clone());
376
377      if let Err(ReadError::UnknownKey { .. }) = result {
378        // ignore unknown key hash, continue looping
379      } else {
380        // return with this result
381        // make copies of guid and SN to calm down borrow checker.
382        let writer_guid = cc.writer_guid;
383        let sequence_number = cc.sequence_number;
384        // Advance read pointer, error or not, because otherwise
385        // the SimpleDatareader is stuck.
386        read_state_ref.latest_instant = max(latest_instant, timestamp);
387        read_state_ref
388          .last_read_sn
389          .insert(writer_guid, sequence_number);
390
391        // // Debug sanity check:
392        // use crate::Duration;
393        // if Timestamp::now().duration_since(timestamp) > Duration::from_secs(1) {
394        //   error!("Sample delayed by {:?} , Topic = {} {:?}",
395        //     Timestamp::now().duration_since(timestamp), self.topic().name(),
396        //     sequence_number,
397        //      );
398        // }
399
400        return result.map(Some);
401      }
402    }
403  }
404
405  pub fn qos(&self) -> &QosPolicies {
406    &self.qos_policy
407  }
408
409  pub fn guid(&self) -> GUID {
410    self.my_guid
411  }
412
413  pub fn topic(&self) -> &Topic {
414    &self.my_topic
415  }
416
417  pub fn as_async_stream<S>(&self) -> SimpleDataReaderStream<D, S, DA>
418  where
419    DA: DefaultDecoder<D, Decoder = S>,
420    DA::Decoder: Clone,
421    S: Decode<DA::Decoded, DA::DecodedKey>,
422  {
423    Self::as_async_stream_with(self, DA::DECODER)
424  }
425
426  pub fn as_async_stream_with<S>(&self, decoder: S) -> SimpleDataReaderStream<D, S, DA>
427  where
428    S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
429  {
430    SimpleDataReaderStream {
431      simple_datareader: self,
432      decoder,
433    }
434  }
435
436  fn acquire_the_topic_cache_guard(&self) -> MutexGuard<TopicCache> {
437    self.topic_cache.lock().unwrap_or_else(|e| {
438      panic!(
439        "The topic cache of topic {} is poisoned. Error: {}",
440        &self.my_topic.name(),
441        e
442      )
443    })
444  }
445}
446
447// This is  not part of DDS spec. We implement mio mio_06::Evented so that the
448// application can asynchronously poll DataReader(s).
449impl<D, DA> mio_06::Evented for SimpleDataReader<D, DA>
450where
451  D: Keyed,
452  DA: DeserializerAdapter<D>,
453{
454  // We just delegate all the operations to notification_receiver, since it
455  // already implements mio_06::Evented
456  fn register(
457    &self,
458    poll: &mio_06::Poll,
459    token: mio_06::Token,
460    interest: mio_06::Ready,
461    opts: mio_06::PollOpt,
462  ) -> io::Result<()> {
463    self
464      .notification_receiver
465      .lock()
466      .unwrap()
467      .register(poll, token, interest, opts)
468  }
469
470  fn reregister(
471    &self,
472    poll: &mio_06::Poll,
473    token: mio_06::Token,
474    interest: mio_06::Ready,
475    opts: mio_06::PollOpt,
476  ) -> io::Result<()> {
477    self
478      .notification_receiver
479      .lock()
480      .unwrap()
481      .reregister(poll, token, interest, opts)
482  }
483
484  fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
485    self.notification_receiver.lock().unwrap().deregister(poll)
486  }
487}
488
489impl<D, DA> mio_08::event::Source for SimpleDataReader<D, DA>
490where
491  D: Keyed,
492  DA: DeserializerAdapter<D>,
493{
494  fn register(
495    &mut self,
496    registry: &mio_08::Registry,
497    token: mio_08::Token,
498    interests: mio_08::Interest,
499  ) -> io::Result<()> {
500    self.event_source.register(registry, token, interests)
501  }
502
503  fn reregister(
504    &mut self,
505    registry: &mio_08::Registry,
506    token: mio_08::Token,
507    interests: mio_08::Interest,
508  ) -> io::Result<()> {
509    self.event_source.reregister(registry, token, interests)
510  }
511
512  fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
513    self.event_source.deregister(registry)
514  }
515}
516
517impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
518  for SimpleDataReader<D, DA>
519where
520  D: Keyed,
521  DA: DeserializerAdapter<D>,
522{
523  fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
524    self.status_receiver.as_status_evented()
525  }
526
527  fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
528    self.status_receiver.as_status_source()
529  }
530
531  fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
532    SimpleDataReaderEventStream {
533      simple_datareader: self,
534    }
535  }
536
537  fn try_recv_status(&self) -> Option<DataReaderStatus> {
538    self.status_receiver.try_recv_status()
539  }
540}
541
542impl<D, DA> RTPSEntity for SimpleDataReader<D, DA>
543where
544  D: Keyed + DeserializeOwned,
545  DA: DeserializerAdapter<D>,
546{
547  fn guid(&self) -> GUID {
548    self.my_guid
549  }
550}
551
552// ----------------------------------------------
553// ----------------------------------------------
554
555// Async interface to the SimpleDataReader
556
557pub struct SimpleDataReaderStream<
558  'a,
559  D: Keyed + 'static,
560  S: Decode<DA::Decoded, DA::DecodedKey>,
561  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
562> {
563  simple_datareader: &'a SimpleDataReader<D, DA>,
564  decoder: S,
565}
566
567// ----------------------------------------------
568// ----------------------------------------------
569
570// https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042
571impl<D, S, DA> Unpin for SimpleDataReaderStream<'_, D, S, DA>
572where
573  D: Keyed + 'static,
574  DA: DeserializerAdapter<D>,
575  S: Decode<DA::Decoded, DA::DecodedKey> + Unpin,
576{
577}
578
579impl<D, S, DA> Stream for SimpleDataReaderStream<'_, D, S, DA>
580where
581  D: Keyed + 'static,
582  DA: DeserializerAdapter<D>,
583  S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
584{
585  type Item = ReadResult<DeserializedCacheChange<D>>;
586
587  // The full return type is now
588  // Poll<Option<Result<DeserializedCacheChange<D>>>
589  // Poll -> Ready or Pending
590  // Option -> Some = stream produces a value, None = stream has ended (does not
591  // occur) Result -> Ok = No DDS error, Err = DDS processing error
592  // (inner Option -> Some = there is new value/key, None = no new data yet)
593
594  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
595    debug!("poll_next");
596    match self
597      .simple_datareader
598      .try_take_one_with(self.decoder.clone())
599    {
600      Err(e) =>
601      // DDS fails
602      {
603        Poll::Ready(Some(Err(e)))
604      }
605
606      // ok, got something
607      Ok(Some(d)) => Poll::Ready(Some(Ok(d))),
608
609      // No new data (yet)
610      Ok(None) => {
611        // Did not get any data.
612        // --> Store waker.
613        // 1. synchronously store waker to background thread (must rendezvous)
614        // 2. try take_bare again, in case something arrived just now
615        // 3. if nothing still, return pending.
616
617        // // DEBUG
618        // if self.simple_datareader.guid().entity_id.entity_kind.is_user_defined() {
619        //   error!("Setting waker for {:?}", self.simple_datareader.topic().name());
620        // }
621        // // DEBUG
622        self.simple_datareader.set_waker(Some(cx.waker().clone()));
623        match self
624          .simple_datareader
625          .try_take_one_with(self.decoder.clone())
626        {
627          Err(e) => Poll::Ready(Some(Err(e))),
628          Ok(Some(d)) => Poll::Ready(Some(Ok(d))),
629          Ok(None) => Poll::Pending,
630        }
631      }
632    } // match
633  } // fn
634} // impl
635
636impl<D, S, DA> FusedStream for SimpleDataReaderStream<'_, D, S, DA>
637where
638  D: Keyed + 'static,
639  DA: DeserializerAdapter<D>,
640  S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
641{
642  fn is_terminated(&self) -> bool {
643    false // Never terminate. This means it is always valid to call poll_next().
644  }
645}
646
647// ----------------------------------------------
648// ----------------------------------------------
649
650pub struct SimpleDataReaderEventStream<
651  'a,
652  D: Keyed + 'static,
653  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
654> {
655  simple_datareader: &'a SimpleDataReader<D, DA>,
656}
657
658impl<D, DA> Stream for SimpleDataReaderEventStream<'_, D, DA>
659where
660  D: Keyed + 'static,
661  DA: DeserializerAdapter<D>,
662{
663  type Item = DataReaderStatus;
664
665  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
666    Pin::new(
667      &mut self
668        .simple_datareader
669        .status_receiver
670        .as_async_status_stream(),
671    )
672    .poll_next(cx)
673  } // fn
674} // impl
675
676impl<D, DA> FusedStream for SimpleDataReaderEventStream<'_, D, DA>
677where
678  D: Keyed + 'static,
679  DA: DeserializerAdapter<D>,
680{
681  fn is_terminated(&self) -> bool {
682    self
683      .simple_datareader
684      .status_receiver
685      .as_async_status_stream()
686      .is_terminated()
687  }
688}