rustdds/dds/with_key/
simpledatareader.rs

1use std::{
2  cmp::max,
3  collections::BTreeMap,
4  io,
5  marker::PhantomData,
6  pin::Pin,
7  sync::{Arc, Mutex, MutexGuard},
8  task::{Context, Poll, Waker},
9};
10
11use futures::stream::{FusedStream, Stream};
12use serde::de::DeserializeOwned;
13use mio_extras::channel as mio_channel;
14#[allow(unused_imports)]
15use log::{debug, error, info, trace, warn};
16
17use crate::{
18  dds::{
19    adapters::with_key::{Decode, DefaultDecoder, DeserializerAdapter},
20    ddsdata::*,
21    key::*,
22    pubsub::Subscriber,
23    qos::*,
24    result::*,
25    statusevents::*,
26    topic::{Topic, TopicDescription},
27    with_key::datasample::{DeserializedCacheChange, Sample},
28  },
29  discovery::discovery::DiscoveryCommand,
30  mio_source::PollEventSource,
31  serialization::CDRDeserializerAdapter,
32  structure::{
33    cache_change::CacheChange,
34    dds_cache::TopicCache,
35    entity::RTPSEntity,
36    guid::{EntityId, GUID},
37    sequence_number::SequenceNumber,
38    time::Timestamp,
39  },
40};
41
42#[derive(Clone, Debug)]
43pub(crate) enum ReaderCommand {
44  #[allow(dead_code)] // TODO: Implement this (resetting) feature
45  ResetRequestedDeadlineStatus,
46}
47
48// This is helper struct.
49// All mutable state needed for reading should go here.
50pub(crate) struct ReadState<K: Key> {
51  latest_instant: Timestamp, /* This is used as a read pointer from dds_cache for BEST_EFFORT
52                              * reading */
53  last_read_sn: BTreeMap<GUID, SequenceNumber>, // collection of read pointers for RELIABLE reading
54  /// hash_to_key_map is used for decoding received key hashes back to original
55  /// key values. This is needed when we receive a dispose message via hash
56  /// only.
57  hash_to_key_map: BTreeMap<KeyHash, K>, // TODO: garbage collect this somehow
58}
59
60impl<K: Key> ReadState<K> {
61  fn new() -> Self {
62    ReadState {
63      latest_instant: Timestamp::ZERO,
64      last_read_sn: BTreeMap::new(),
65      hash_to_key_map: BTreeMap::<KeyHash, K>::new(),
66    }
67  }
68
69  // This is a helper function so that borrow checker understands
70  // that we are splitting one mutable borrow into two _disjoint_ mutable
71  // borrows.
72  fn get_sn_map_and_hash_map(
73    &mut self,
74  ) -> (
75    &mut BTreeMap<GUID, SequenceNumber>,
76    &mut BTreeMap<KeyHash, K>,
77  ) {
78    let ReadState {
79      last_read_sn,
80      hash_to_key_map,
81      ..
82    } = self;
83    (last_read_sn, hash_to_key_map)
84  }
85}
86
87/// SimpleDataReaders can only do "take" semantics and does not have
88/// any deduplication or other DataSampleCache functionality.
89pub struct SimpleDataReader<D: Keyed, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
90  my_subscriber: Subscriber,
91
92  my_topic: Topic,
93  qos_policy: QosPolicies,
94  my_guid: GUID,
95
96  // mio_channel::Receiver is not thread-safe, so Mutex protects it.
97  pub(crate) notification_receiver: Mutex<mio_channel::Receiver<()>>,
98
99  // SimpleDataReader stores a pointer to a mutex on the topic cache
100  topic_cache: Arc<Mutex<TopicCache>>,
101
102  read_state: Mutex<ReadState<<D as Keyed>::K>>,
103
104  deserializer_type: PhantomData<DA>, // This is to provide use for DA
105
106  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
107  status_receiver: StatusChannelReceiver<DataReaderStatus>,
108
109  #[allow(dead_code)] // TODO: This is currently unused, because we do not implement
110  // resetting deadline missed status. Remove attribute when it is supported.
111  reader_command: mio_channel::SyncSender<ReaderCommand>,
112  data_reader_waker: Arc<Mutex<Option<Waker>>>,
113
114  event_source: PollEventSource,
115}
116
117impl<D, DA> Drop for SimpleDataReader<D, DA>
118where
119  D: Keyed,
120  DA: DeserializerAdapter<D>,
121{
122  fn drop(&mut self) {
123    // Tell dp_event_loop
124    self.my_subscriber.remove_reader(self.my_guid);
125
126    // Tell discovery
127    match self
128      .discovery_command
129      .send(DiscoveryCommand::RemoveLocalReader { guid: self.my_guid })
130    {
131      Ok(_) => {}
132      Err(mio_channel::SendError::Disconnected(_)) => {
133        debug!("Failed to send DiscoveryCommand::RemoveLocalReader . Maybe shutting down?");
134      }
135      Err(e) => error!("Failed to send DiscoveryCommand::RemoveLocalReader. {e:?}"),
136    }
137  }
138}
139
140impl<D: 'static, DA> SimpleDataReader<D, DA>
141where
142  D: Keyed,
143  DA: DeserializerAdapter<D>,
144{
145  #[allow(clippy::too_many_arguments)]
146  pub(crate) fn new(
147    subscriber: Subscriber,
148    my_id: EntityId,
149    topic: Topic,
150    qos_policy: QosPolicies,
151    // Each notification sent to this channel must be try_recv'd
152    notification_receiver: mio_channel::Receiver<()>,
153    topic_cache: Arc<Mutex<TopicCache>>,
154    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
155    status_receiver: StatusChannelReceiver<DataReaderStatus>,
156    reader_command: mio_channel::SyncSender<ReaderCommand>,
157    data_reader_waker: Arc<Mutex<Option<Waker>>>,
158    event_source: PollEventSource,
159  ) -> CreateResult<Self> {
160    let dp = match subscriber.participant() {
161      Some(dp) => dp,
162      None => {
163        return Err(CreateError::ResourceDropped {
164          reason: "Cannot create new DataReader, DomainParticipant doesn't exist.".to_string(),
165        })
166      }
167    };
168
169    let my_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), my_id);
170
171    // Verify that the topic cache corresponds to the topic of the Reader
172    let topic_cache_name = topic_cache.lock().unwrap().topic_name();
173    if topic.name() != topic_cache_name {
174      return Err(CreateError::Internal {
175        reason: format!(
176          "Topic name = {} and topic cache name = {} not equal when creating a SimpleDataReader",
177          topic.name(),
178          topic_cache_name
179        ),
180      });
181    }
182
183    Ok(Self {
184      my_subscriber: subscriber,
185      qos_policy,
186      my_guid,
187      notification_receiver: Mutex::new(notification_receiver),
188      topic_cache,
189      read_state: Mutex::new(ReadState::new()),
190      my_topic: topic,
191      deserializer_type: PhantomData,
192      discovery_command,
193      status_receiver,
194      reader_command,
195      data_reader_waker,
196      event_source,
197    })
198  }
199  pub(crate) fn set_waker(&self, w: Option<Waker>) {
200    *self.data_reader_waker.lock().unwrap() = w;
201  }
202
203  pub(crate) fn drain_read_notifications(&self) {
204    let rec = self.notification_receiver.lock().unwrap();
205    while rec.try_recv().is_ok() {}
206    self.event_source.drain();
207  }
208
209  fn try_take_undecoded<'a, 'b: 'a>(
210    is_reliable: bool,
211    topic_cache: &'a TopicCache,
212    latest_instant: Timestamp,
213    last_read_sn: &'b BTreeMap<GUID, SequenceNumber>,
214  ) -> impl Iterator<Item = (Timestamp, &'a CacheChange)> {
215    topic_cache.get_changes_in_range(is_reliable, latest_instant, last_read_sn)
216  }
217
218  fn update_hash_to_key_map(
219    hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
220    deserialized: &Sample<D, D::K>,
221  ) {
222    let instance_key = match deserialized {
223      Sample::Value(d) => d.key(),
224      Sample::Dispose(k) => k.clone(),
225    };
226    hash_to_key_map.insert(instance_key.hash_key(false), instance_key);
227  }
228
229  fn deserialize_with<S>(
230    &self,
231    timestamp: Timestamp,
232    cc: &CacheChange,
233    hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
234    decoder: S,
235  ) -> ReadResult<DeserializedCacheChange<D>>
236  where
237    S: Decode<DA::Decoded, DA::DecodedKey>,
238  {
239    match cc.data_value {
240      DDSData::Data {
241        ref serialized_payload,
242      } => {
243        // what is our data serialization format (representation identifier) ?
244        if let Some(recognized_rep_id) = DA::supported_encodings()
245          .iter()
246          .find(|r| **r == serialized_payload.representation_identifier)
247        {
248          match DA::from_bytes_with(&serialized_payload.value, *recognized_rep_id, decoder) {
249            // Data update, decoded ok
250            Ok(payload) => {
251              let p = Sample::Value(payload);
252              Self::update_hash_to_key_map(hash_to_key_map, &p);
253              Ok(DeserializedCacheChange::new(timestamp, cc, p))
254            }
255            Err(e) => Err(ReadError::Deserialization {
256              reason: format!(
257                "Failed to deserialize sample bytes: {}, , Topic = {}, Type = {:?}",
258                e,
259                self.my_topic.name(),
260                self.my_topic.get_type()
261              ),
262            }),
263          }
264        } else {
265          info!(
266            "Unknown representation id: {:?} , Topic = {}, Type = {:?} data = {:02x?}",
267            serialized_payload.representation_identifier,
268            self.my_topic.name(),
269            self.my_topic.get_type(),
270            serialized_payload.value,
271          );
272          Err(ReadError::Deserialization {
273            reason: format!(
274              "Unknown representation id {:?} , Topic = {}, Type = {:?}",
275              serialized_payload.representation_identifier,
276              self.my_topic.name(),
277              self.my_topic.get_type()
278            ),
279          })
280        }
281      }
282
283      DDSData::DisposeByKey {
284        key: ref serialized_key,
285        ..
286      } => {
287        match DA::key_from_bytes_with(
288          &serialized_key.value,
289          serialized_key.representation_identifier,
290          decoder,
291        ) {
292          Ok(key) => {
293            let k = Sample::Dispose(key);
294            Self::update_hash_to_key_map(hash_to_key_map, &k);
295            Ok(DeserializedCacheChange::new(timestamp, cc, k))
296          }
297          Err(e) => Err(ReadError::Deserialization {
298            reason: format!(
299              "Failed to deserialize key {}, Topic = {}, Type = {:?}",
300              e,
301              self.my_topic.name(),
302              self.my_topic.get_type()
303            ),
304          }),
305        }
306      }
307
308      DDSData::DisposeByKeyHash { key_hash, .. } => {
309        // The cache should know hash -> key mapping even if the sample
310        // has been disposed or .take()n
311        if let Some(key) = hash_to_key_map.get(&key_hash) {
312          Ok(DeserializedCacheChange::new(
313            timestamp,
314            cc,
315            Sample::Dispose(key.clone()),
316          ))
317        } else {
318          Err(ReadError::UnknownKey {
319            details: format!(
320              "Received dispose with unknown key hash: {:x?}, Topic = {}, Type = {:?}",
321              key_hash,
322              self.my_topic.name(),
323              self.my_topic.get_type()
324            ),
325          })
326        }
327      }
328    } // match
329  }
330
331  /// Note: Always remember to call .drain_read_notifications() just before
332  /// calling this one. Otherwise, new notifications may not appear.
333  pub fn try_take_one(&self) -> ReadResult<Option<DeserializedCacheChange<D>>>
334  where
335    DA: DeserializerAdapter<D> + DefaultDecoder<D>,
336  {
337    Self::try_take_one_with(self, DA::DECODER)
338  }
339
340  /// Note: Always remember to call .drain_read_notifications() just before
341  /// calling this one. Otherwise, new notifications may not appear.
342  #[allow(clippy::needless_pass_by_value)]
343  pub fn try_take_one_with<S>(&self, decoder: S) -> ReadResult<Option<DeserializedCacheChange<D>>>
344  where
345    S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
346  {
347    let is_reliable = matches!(
348      self.qos_policy.reliability(),
349      Some(policy::Reliability::Reliable { .. })
350    );
351
352    let topic_cache = self.acquire_the_topic_cache_guard();
353
354    let mut read_state_ref = self.read_state.lock().unwrap();
355    let latest_instant = read_state_ref.latest_instant;
356    let (last_read_sn, hash_to_key_map) = read_state_ref.get_sn_map_and_hash_map();
357
358    let mut changes =
359      Self::try_take_undecoded(is_reliable, &topic_cache, latest_instant, last_read_sn);
360
361    // loop in case we get a sample that should be ignored, so we try next.
362    loop {
363      let (timestamp, cc) = match changes.next() {
364        None => return Ok(None), // no more data available right now
365        Some((ts, cc)) => (ts, cc),
366      };
367
368      let result = self.deserialize_with(timestamp, cc, hash_to_key_map, decoder.clone());
369
370      if let Err(ReadError::UnknownKey { .. }) = result {
371        // ignore unknown key hash, continue looping
372      } else {
373        // explicitly drop the cache changes to update the reader state
374        drop(changes);
375
376        // return with this result
377        // make copies of guid and SN to calm down borrow checker.
378        let writer_guid = cc.writer_guid;
379        let sequence_number = cc.sequence_number;
380        // Advance read pointer, error or not, because otherwise
381        // the SimpleDatareader is stuck.
382        read_state_ref.latest_instant = max(latest_instant, timestamp);
383        read_state_ref
384          .last_read_sn
385          .insert(writer_guid, sequence_number);
386
387        // // Debug sanity check:
388        // use crate::Duration;
389        // if Timestamp::now().duration_since(timestamp) > Duration::from_secs(1) {
390        //   error!("Sample delayed by {:?} , Topic = {} {:?}",
391        //     Timestamp::now().duration_since(timestamp), self.topic().name(),
392        //     sequence_number,
393        //      );
394        // }
395
396        return result.map(Some);
397      }
398    }
399  }
400
401  pub fn qos(&self) -> &QosPolicies {
402    &self.qos_policy
403  }
404
405  pub fn guid(&self) -> GUID {
406    self.my_guid
407  }
408
409  pub fn topic(&self) -> &Topic {
410    &self.my_topic
411  }
412
413  pub fn as_async_stream<S>(&self) -> SimpleDataReaderStream<'_, D, S, DA>
414  where
415    DA: DefaultDecoder<D, Decoder = S>,
416    DA::Decoder: Clone,
417    S: Decode<DA::Decoded, DA::DecodedKey>,
418  {
419    Self::as_async_stream_with(self, DA::DECODER)
420  }
421
422  pub fn as_async_stream_with<S>(&self, decoder: S) -> SimpleDataReaderStream<'_, D, S, DA>
423  where
424    S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
425  {
426    SimpleDataReaderStream {
427      simple_datareader: self,
428      decoder,
429    }
430  }
431
432  fn acquire_the_topic_cache_guard(&self) -> MutexGuard<'_, TopicCache> {
433    self.topic_cache.lock().unwrap_or_else(|e| {
434      panic!(
435        "The topic cache of topic {} is poisoned. Error: {}",
436        &self.my_topic.name(),
437        e
438      )
439    })
440  }
441}
442
443// This is  not part of DDS spec. We implement mio mio_06::Evented so that the
444// application can asynchronously poll DataReader(s).
445impl<D, DA> mio_06::Evented for SimpleDataReader<D, DA>
446where
447  D: Keyed,
448  DA: DeserializerAdapter<D>,
449{
450  // We just delegate all the operations to notification_receiver, since it
451  // already implements mio_06::Evented
452  fn register(
453    &self,
454    poll: &mio_06::Poll,
455    token: mio_06::Token,
456    interest: mio_06::Ready,
457    opts: mio_06::PollOpt,
458  ) -> io::Result<()> {
459    self
460      .notification_receiver
461      .lock()
462      .unwrap()
463      .register(poll, token, interest, opts)
464  }
465
466  fn reregister(
467    &self,
468    poll: &mio_06::Poll,
469    token: mio_06::Token,
470    interest: mio_06::Ready,
471    opts: mio_06::PollOpt,
472  ) -> io::Result<()> {
473    self
474      .notification_receiver
475      .lock()
476      .unwrap()
477      .reregister(poll, token, interest, opts)
478  }
479
480  fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
481    self.notification_receiver.lock().unwrap().deregister(poll)
482  }
483}
484
485impl<D, DA> mio_08::event::Source for SimpleDataReader<D, DA>
486where
487  D: Keyed,
488  DA: DeserializerAdapter<D>,
489{
490  fn register(
491    &mut self,
492    registry: &mio_08::Registry,
493    token: mio_08::Token,
494    interests: mio_08::Interest,
495  ) -> io::Result<()> {
496    self.event_source.register(registry, token, interests)
497  }
498
499  fn reregister(
500    &mut self,
501    registry: &mio_08::Registry,
502    token: mio_08::Token,
503    interests: mio_08::Interest,
504  ) -> io::Result<()> {
505    self.event_source.reregister(registry, token, interests)
506  }
507
508  fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
509    self.event_source.deregister(registry)
510  }
511}
512
513impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
514  for SimpleDataReader<D, DA>
515where
516  D: Keyed,
517  DA: DeserializerAdapter<D>,
518{
519  fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
520    self.status_receiver.as_status_evented()
521  }
522
523  fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
524    self.status_receiver.as_status_source()
525  }
526
527  fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
528    SimpleDataReaderEventStream {
529      simple_datareader: self,
530    }
531  }
532
533  fn try_recv_status(&self) -> Option<DataReaderStatus> {
534    self.status_receiver.try_recv_status()
535  }
536}
537
538impl<D, DA> RTPSEntity for SimpleDataReader<D, DA>
539where
540  D: Keyed + DeserializeOwned,
541  DA: DeserializerAdapter<D>,
542{
543  fn guid(&self) -> GUID {
544    self.my_guid
545  }
546}
547
548// ----------------------------------------------
549// ----------------------------------------------
550
551// Async interface to the SimpleDataReader
552
553pub struct SimpleDataReaderStream<
554  'a,
555  D: Keyed + 'static,
556  S: Decode<DA::Decoded, DA::DecodedKey>,
557  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
558> {
559  simple_datareader: &'a SimpleDataReader<D, DA>,
560  decoder: S,
561}
562
563// ----------------------------------------------
564// ----------------------------------------------
565
566// https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042
567impl<D, S, DA> Unpin for SimpleDataReaderStream<'_, D, S, DA>
568where
569  D: Keyed + 'static,
570  DA: DeserializerAdapter<D>,
571  S: Decode<DA::Decoded, DA::DecodedKey> + Unpin,
572{
573}
574
575impl<D, S, DA> Stream for SimpleDataReaderStream<'_, D, S, DA>
576where
577  D: Keyed + 'static,
578  DA: DeserializerAdapter<D>,
579  S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
580{
581  type Item = ReadResult<DeserializedCacheChange<D>>;
582
583  // The full return type is now
584  // Poll<Option<Result<DeserializedCacheChange<D>>>
585  // Poll -> Ready or Pending
586  // Option -> Some = stream produces a value, None = stream has ended (does not
587  // occur) Result -> Ok = No DDS error, Err = DDS processing error
588  // (inner Option -> Some = there is new value/key, None = no new data yet)
589
590  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
591    debug!("poll_next");
592    match self
593      .simple_datareader
594      .try_take_one_with(self.decoder.clone())
595    {
596      Err(e) =>
597      // DDS fails
598      {
599        Poll::Ready(Some(Err(e)))
600      }
601
602      // ok, got something
603      Ok(Some(d)) => Poll::Ready(Some(Ok(d))),
604
605      // No new data (yet)
606      Ok(None) => {
607        // Did not get any data.
608        // --> Store waker.
609        // 1. synchronously store waker to background thread (must rendezvous)
610        // 2. try take_bare again, in case something arrived just now
611        // 3. if nothing still, return pending.
612
613        // // DEBUG
614        // if self.simple_datareader.guid().entity_id.entity_kind.is_user_defined() {
615        //   error!("Setting waker for {:?}", self.simple_datareader.topic().name());
616        // }
617        // // DEBUG
618        self.simple_datareader.set_waker(Some(cx.waker().clone()));
619        match self
620          .simple_datareader
621          .try_take_one_with(self.decoder.clone())
622        {
623          Err(e) => Poll::Ready(Some(Err(e))),
624          Ok(Some(d)) => Poll::Ready(Some(Ok(d))),
625          Ok(None) => Poll::Pending,
626        }
627      }
628    } // match
629  } // fn
630} // impl
631
632impl<D, S, DA> FusedStream for SimpleDataReaderStream<'_, D, S, DA>
633where
634  D: Keyed + 'static,
635  DA: DeserializerAdapter<D>,
636  S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
637{
638  fn is_terminated(&self) -> bool {
639    false // Never terminate. This means it is always valid to call poll_next().
640  }
641}
642
643// ----------------------------------------------
644// ----------------------------------------------
645
646pub struct SimpleDataReaderEventStream<
647  'a,
648  D: Keyed + 'static,
649  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
650> {
651  simple_datareader: &'a SimpleDataReader<D, DA>,
652}
653
654impl<D, DA> Stream for SimpleDataReaderEventStream<'_, D, DA>
655where
656  D: Keyed + 'static,
657  DA: DeserializerAdapter<D>,
658{
659  type Item = DataReaderStatus;
660
661  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
662    Pin::new(
663      &mut self
664        .simple_datareader
665        .status_receiver
666        .as_async_status_stream(),
667    )
668    .poll_next(cx)
669  } // fn
670} // impl
671
672impl<D, DA> FusedStream for SimpleDataReaderEventStream<'_, D, DA>
673where
674  D: Keyed + 'static,
675  DA: DeserializerAdapter<D>,
676{
677  fn is_terminated(&self) -> bool {
678    self
679      .simple_datareader
680      .status_receiver
681      .as_async_status_stream()
682      .is_terminated()
683  }
684}