rustdds/dds/no_key/
datareader.rs

1use std::{
2  io,
3  pin::Pin,
4  task::{Context, Poll},
5};
6
7use futures::stream::{FusedStream, Stream};
8
9use crate::{
10  dds::{
11    adapters::no_key::{DefaultDecoder, DeserializerAdapter},
12    no_key::datasample::DataSample,
13    qos::{HasQoSPolicy, QosPolicies},
14    readcondition::ReadCondition,
15    result::ReadResult,
16    statusevents::DataReaderStatus,
17    with_key::{
18      datareader as datareader_with_key,
19      datasample::{DataSample as WithKeyDataSample, Sample},
20      BareDataReaderStream as WithKeyBareDataReaderStream, DataReader as WithKeyDataReader,
21      DataReaderEventStream as WithKeyDataReaderEventStream,
22      DataReaderStream as WithKeyDataReaderStream,
23    },
24  },
25  serialization::CDRDeserializerAdapter,
26  structure::entity::RTPSEntity,
27  StatusEvented, GUID,
28};
29use super::wrappers::{DAWrapper, NoKeyWrapper};
30
31/// Simplified type for CDR encoding
32pub type DataReaderCdr<D> = DataReader<D, CDRDeserializerAdapter<D>>;
33
34// ----------------------------------------------------
35
// DataReader for NO_KEY data. Does not require "D: Keyed"
/// DDS DataReader for no key topics.
///
/// This type holds a single field: a `with_key` `DataReader` instantiated
/// over `NoKeyWrapper<D>` and `DAWrapper<DA>`. The methods defined in this
/// file forward to that inner reader.
///
/// # Examples
///
/// ```
/// use serde::{Serialize, Deserialize};
/// use rustdds::*;
/// use rustdds::no_key::DataReader;
/// use rustdds::serialization::CDRDeserializerAdapter;
///
/// let domain_participant = DomainParticipant::new(0).unwrap();
/// let qos = QosPolicyBuilder::new().build();
/// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
///
/// #[derive(Serialize, Deserialize)]
/// struct SomeType {}
///
/// // NoKey is important
/// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
/// let data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None);
/// ```
pub struct DataReader<D, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
  // The wrapped keyed reader that every operation is forwarded to.
  keyed_datareader: datareader_with_key::DataReader<NoKeyWrapper<D>, DAWrapper<DA>>,
}
60
61// TODO: rewrite DataSample so it can use current Keyed version (and send back
62// datasamples instead of current data)
63impl<D: 'static, DA> DataReader<D, DA>
64where
65  DA: DeserializerAdapter<D>,
66{
67  pub(crate) fn from_keyed(
68    keyed: datareader_with_key::DataReader<NoKeyWrapper<D>, DAWrapper<DA>>,
69  ) -> Self {
70    Self {
71      keyed_datareader: keyed,
72    }
73  }
74}
75
76impl<D: 'static, DA> DataReader<D, DA>
77where
78  DA: DefaultDecoder<D>,
79{
80  /// Reads amount of samples found with `max_samples` and `read_condition`
81  /// parameters.
82  ///
83  /// # Arguments
84  ///
85  /// * `max_samples` - Limits maximum amount of samples read
86  /// * `read_condition` - Limits results by condition
87  ///
88  /// # Examples
89  ///
90  /// ```
91  /// # use serde::{Serialize, Deserialize};
92  /// # use rustdds::*;
93  /// # use rustdds::serialization::CDRDeserializerAdapter;
94  ///
95  /// # let domain_participant = DomainParticipant::new(0).unwrap();
96  /// # let qos = QosPolicyBuilder::new().build();
97  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
98  /// #
99  /// # // NoKey is important
100  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
101  /// #
102  /// # #[derive(Serialize, Deserialize)]
103  /// # struct SomeType {}
104  /// #
105  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
106  /// let data = data_reader.read(10, ReadCondition::not_read());
107  /// ```
108  pub fn read(
109    &mut self,
110    max_samples: usize,
111    read_condition: ReadCondition,
112  ) -> ReadResult<Vec<DataSample<&D>>> {
113    let values: Vec<WithKeyDataSample<&NoKeyWrapper<D>>> =
114      self.keyed_datareader.read(max_samples, read_condition)?;
115    let mut result = Vec::with_capacity(values.len());
116    for ks in values {
117      if let Some(s) = DataSample::<D>::from_with_key_ref(ks) {
118        result.push(s);
119      }
120    }
121    Ok(result)
122  }
123
124  /// Takes amount of sample found with `max_samples` and `read_condition`
125  /// parameters.
126  ///
127  /// # Arguments
128  ///
129  /// * `max_samples` - Limits maximum amount of samples read
130  /// * `read_condition` - Limits results by condition
131  ///
132  /// # Examples
133  ///
134  /// ```
135  /// # use serde::{Serialize, Deserialize};
136  /// # use rustdds::*;
137  /// # use rustdds::no_key::DataReader;
138  /// # use rustdds::serialization::CDRDeserializerAdapter;
139  /// #
140  /// # let domain_participant = DomainParticipant::new(0).unwrap();
141  /// # let qos = QosPolicyBuilder::new().build();
142  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
143  /// #
144  /// # // NoKey is important
145  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
146  /// #
147  /// # #[derive(Serialize, Deserialize)]
148  /// # struct SomeType {}
149  /// #
150  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
151  /// let data = data_reader.take(10, ReadCondition::not_read());
152  /// ```
153  pub fn take(
154    &mut self,
155    max_samples: usize,
156    read_condition: ReadCondition,
157  ) -> ReadResult<Vec<DataSample<D>>> {
158    let values: Vec<WithKeyDataSample<NoKeyWrapper<D>>> =
159      self.keyed_datareader.take(max_samples, read_condition)?;
160    let mut result = Vec::with_capacity(values.len());
161    for ks in values {
162      if let Some(s) = DataSample::<D>::from_with_key(ks) {
163        result.push(s);
164      }
165    }
166    Ok(result)
167  }
168
169  /// Reads next unread sample
170  ///
171  /// # Examples
172  ///
173  /// ```
174  /// # use serde::{Serialize, Deserialize};
175  /// # use rustdds::*;
176  /// # use rustdds::no_key::DataReader;
177  /// # use rustdds::serialization::CDRDeserializerAdapter;
178  /// #
179  /// # let domain_participant = DomainParticipant::new(0).unwrap();
180  /// # let qos = QosPolicyBuilder::new().build();
181  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
182  /// #
183  /// # // NoKey is important
184  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
185  /// #
186  /// # #[derive(Serialize, Deserialize)]
187  /// # struct SomeType {}
188  /// #
189  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
190  /// while let Ok(Some(data)) = data_reader.read_next_sample() {
191  ///   // Do something
192  /// }
193  /// ```
194  pub fn read_next_sample(&mut self) -> ReadResult<Option<DataSample<&D>>> {
195    let mut ds = self.read(1, ReadCondition::not_read())?;
196    Ok(ds.pop())
197  }
198
199  /// Takes next unread sample
200  ///
201  /// # Examples
202  ///
203  /// ```
204  /// # use serde::{Serialize, Deserialize};
205  /// # use rustdds::*;
206  /// # use rustdds::no_key::DataReader;
207  /// # use rustdds::serialization::CDRDeserializerAdapter;
208  /// #
209  /// # let domain_participant = DomainParticipant::new(0).unwrap();
210  /// # let qos = QosPolicyBuilder::new().build();
211  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
212  /// #
213  /// # // NoKey is important
214  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
215  /// #
216  /// # #[derive(Serialize, Deserialize)]
217  /// # struct SomeType {}
218  /// #
219  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
220  /// while let Ok(Some(data)) = data_reader.take_next_sample() {
221  ///   // Do something
222  /// }
223  /// ```
224  pub fn take_next_sample(&mut self) -> ReadResult<Option<DataSample<D>>> {
225    let mut ds = self.take(1, ReadCondition::not_read())?;
226    Ok(ds.pop())
227  }
228
229  // Iterator interface
230
231  /// Produces an iterator over the currently available NOT_READ samples.
232  /// Yields only payload data, not SampleInfo metadata
233  /// This is not called `iter()` because it takes a mutable reference to self.
234  ///
235  /// # Examples
236  ///
237  /// ```
238  /// # use serde::{Serialize, Deserialize};
239  /// # use rustdds::*;
240  /// # use rustdds::no_key::DataReader;
241  /// # use rustdds::serialization::CDRDeserializerAdapter;
242  /// #
243  /// # let domain_participant = DomainParticipant::new(0).unwrap();
244  /// # let qos = QosPolicyBuilder::new().build();
245  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
246  /// #
247  /// # // NoKey is important
248  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
249  /// #
250  /// # #[derive(Serialize, Deserialize)]
251  /// # struct SomeType {}
252  /// #
253  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
254  /// for data in data_reader.iterator() {
255  ///   // Do something
256  /// }
257  /// ```
258  pub fn iterator(&mut self) -> ReadResult<impl Iterator<Item = &D>> {
259    // TODO: We could come up with a more efficient implementation than wrapping a
260    // read call
261    Ok(
262      self
263        .read(usize::MAX, ReadCondition::not_read())?
264        .into_iter()
265        .map(|ds| ds.value),
266    )
267  }
268
269  /// Produces an iterator over the samples filtered by given condition.
270  /// Yields only payload data, not SampleInfo metadata
271  ///
272  /// # Examples
273  ///
274  /// ```
275  /// # use serde::{Serialize, Deserialize};
276  /// # use rustdds::*;
277  /// # use rustdds::no_key::DataReader;
278  /// # use rustdds::serialization::CDRDeserializerAdapter;
279  /// #
280  /// # let domain_participant = DomainParticipant::new(0).unwrap();
281  /// # let qos = QosPolicyBuilder::new().build();
282  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
283  /// #
284  /// # // NoKey is important
285  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
286  /// #
287  /// # #[derive(Serialize, Deserialize)]
288  /// # struct SomeType {}
289  /// #
290  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
291  /// for data in data_reader.conditional_iterator(ReadCondition::any()) {
292  ///   // Do something
293  /// }
294  /// ```
295  pub fn conditional_iterator(
296    &mut self,
297    read_condition: ReadCondition,
298  ) -> ReadResult<impl Iterator<Item = &D>> {
299    // TODO: We could come up with a more efficient implementation than wrapping a
300    // read call
301    Ok(
302      self
303        .read(usize::MAX, read_condition)?
304        .into_iter()
305        .map(|ds| ds.value),
306    )
307  }
308
309  /// Produces an iterator over the currently available NOT_READ samples.
310  /// Yields only payload data, not SampleInfo metadata
311  /// Removes samples from `DataReader`.
312  /// <strong>Note!</strong> If the iterator is only partially consumed, all the
313  /// samples it could have provided are still removed from the `Datareader`.
314  ///
315  /// # Examples
316  ///
317  /// ```
318  /// # use serde::{Serialize, Deserialize};
319  /// # use rustdds::*;
320  /// # use rustdds::no_key::DataReader;
321  /// # use rustdds::serialization::CDRDeserializerAdapter;
322  /// #
323  /// # let domain_participant = DomainParticipant::new(0).unwrap();
324  /// # let qos = QosPolicyBuilder::new().build();
325  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
326  /// #
327  /// # // NoKey is important
328  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
329  /// #
330  /// # #[derive(Serialize, Deserialize)]
331  /// # struct SomeType {}
332  /// #
333  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
334  /// for data in data_reader.into_iterator() {
335  ///   // Do something
336  /// }
337  /// ```
338  pub fn into_iterator(&mut self) -> ReadResult<impl Iterator<Item = D>> {
339    // TODO: We could come up with a more efficient implementation than wrapping a
340    // read call
341    Ok(
342      self
343        .take(usize::MAX, ReadCondition::not_read())?
344        .into_iter()
345        .map(|ds| ds.value),
346    )
347  }
348
349  /// Produces an iterator over the samples filtered by given condition.
350  /// Yields only payload data, not SampleInfo metadata
351  /// <strong>Note!</strong> If the iterator is only partially consumed, all the
352  /// samples it could have provided are still removed from the `Datareader`.
353  ///
354  /// # Examples
355  ///
356  /// ```
357  /// # use serde::{Serialize, Deserialize};
358  /// # use rustdds::*;
359  /// # use rustdds::no_key::DataReader;
360  /// # use rustdds::serialization::CDRDeserializerAdapter;
361  /// #
362  /// # let domain_participant = DomainParticipant::new(0).unwrap();
363  /// # let qos = QosPolicyBuilder::new().build();
364  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
365  /// #
366  /// # // NoKey is important
367  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
368  /// #
369  /// # #[derive(Serialize, Deserialize)]
370  /// # struct SomeType {}
371  /// #
372  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
373  /// for data in data_reader.into_conditional_iterator(ReadCondition::any()) {
374  ///   // Do something
375  /// }
376  /// ```
377  pub fn into_conditional_iterator(
378    &mut self,
379    read_condition: ReadCondition,
380  ) -> ReadResult<impl Iterator<Item = D>> {
381    // TODO: We could come up with a more efficient implementation than wrapping a
382    // read call
383    Ok(
384      self
385        .take(usize::MAX, read_condition)?
386        .into_iter()
387        .map(|ds| ds.value),
388    )
389  }
390  /*
391  /// Gets latest RequestedDeadlineMissed status
392  ///
393  /// # Examples
394  ///
395  /// ```
396  /// # use serde::{Serialize, Deserialize};
397  /// # use rustdds::*;
398  /// # use rustdds::no_key::DataReader;
399  /// # use rustdds::serialization::CDRDeserializerAdapter;
400  /// #
401  /// # let domain_participant = DomainParticipant::new(0).unwrap();
402  /// # let qos = QosPolicyBuilder::new().build();
403  /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
404  /// #
405  /// # // NoKey is important
406  /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
407  /// #
408  /// # #[derive(Serialize, Deserialize)]
409  /// # struct SomeType {}
410  /// #
411  /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(topic, None).unwrap();
412  /// if let Ok(Some(status)) = data_reader.get_requested_deadline_missed_status() {
413  ///   // Do something
414  /// }
415  /// ```
416
417
418  pub fn get_requested_deadline_missed_status(
419    &mut self,
420  ) -> ReadResult<Option<RequestedDeadlineMissedStatus>> {
421    self.keyed_datareader.get_requested_deadline_missed_status()
422  }
423  */
424
425  /// An async stream for reading the (bare) data samples
426  pub fn async_bare_sample_stream(self) -> BareDataReaderStream<D, DA> {
427    BareDataReaderStream {
428      keyed_stream: self.keyed_datareader.async_bare_sample_stream(),
429    }
430  }
431
432  /// An async stream for reading the data samples
433  pub fn async_sample_stream(self) -> DataReaderStream<D, DA> {
434    DataReaderStream {
435      keyed_stream: self.keyed_datareader.async_sample_stream(),
436    }
437  }
438}
439
440/// WARNING! UNTESTED
441//  TODO: test
442// This is  not part of DDS spec. We implement mio mio_06::Evented so that the
443// application can asynchronously poll DataReader(s).
444impl<D, DA> mio_06::Evented for DataReader<D, DA>
445where
446  DA: DeserializerAdapter<D>,
447{
448  // We just delegate all the operations to notification_receiver, since it
449  // already implements mio_06::Evented
450  fn register(
451    &self,
452    poll: &mio_06::Poll,
453    token: mio_06::Token,
454    interest: mio_06::Ready,
455    opts: mio_06::PollOpt,
456  ) -> io::Result<()> {
457    self.keyed_datareader.register(poll, token, interest, opts)
458  }
459
460  fn reregister(
461    &self,
462    poll: &mio_06::Poll,
463    token: mio_06::Token,
464    interest: mio_06::Ready,
465    opts: mio_06::PollOpt,
466  ) -> io::Result<()> {
467    self
468      .keyed_datareader
469      .reregister(poll, token, interest, opts)
470  }
471
472  fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
473    self.keyed_datareader.deregister(poll)
474  }
475}
476
477/// WARNING! UNTESTED
478//  TODO: test
479impl<D, DA> mio_08::event::Source for DataReader<D, DA>
480where
481  DA: DeserializerAdapter<D>,
482{
483  fn register(
484    &mut self,
485    registry: &mio_08::Registry,
486    token: mio_08::Token,
487    interests: mio_08::Interest,
488  ) -> io::Result<()> {
489    // with_key::DataReader implements .register() for two traits, so need to
490    // use disambiguation syntax to call .register() here.
491    <WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::register(
492      &mut self.keyed_datareader,
493      registry,
494      token,
495      interests,
496    )
497  }
498
499  fn reregister(
500    &mut self,
501    registry: &mio_08::Registry,
502    token: mio_08::Token,
503    interests: mio_08::Interest,
504  ) -> io::Result<()> {
505    <WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::reregister(
506      &mut self.keyed_datareader,
507      registry,
508      token,
509      interests,
510    )
511  }
512
513  fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
514    <WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::deregister(
515      &mut self.keyed_datareader,
516      registry,
517    )
518  }
519}
520
521/// WARNING! UNTESTED
522//  TODO: test
523use crate::no_key::SimpleDataReaderEventStream;
524
// Status-event access for the no-key DataReader. Every method forwards to the
// wrapped keyed DataReader; only the async stream is re-wrapped into the
// no-key `SimpleDataReaderEventStream` type.
impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
  for DataReader<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
  // Returns the keyed reader's `mio_06::Evented` status handle.
  fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
    self.keyed_datareader.as_status_evented()
  }

  // Returns the keyed reader's `mio_08::event::Source` status handle.
  fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
    self.keyed_datareader.as_status_source()
  }

  // Wraps the keyed reader's async status stream via
  // `SimpleDataReaderEventStream::from_keyed`.
  fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
    SimpleDataReaderEventStream::from_keyed(self.keyed_datareader.as_async_status_stream())
  }

  // Forwards to the keyed reader's `try_recv_status`; `None` is whatever the
  // keyed reader reports as "no status".
  fn try_recv_status(&self) -> Option<DataReaderStatus> {
    self.keyed_datareader.try_recv_status()
  }
}
547
// QoS access: reports the QoS policies of the wrapped keyed DataReader.
impl<D, DA> HasQoSPolicy for DataReader<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
  // Returns whatever `qos()` of the keyed reader returns.
  fn qos(&self) -> QosPolicies {
    self.keyed_datareader.qos()
  }
}
557
// Entity identity: the no-key DataReader reports the GUID of the wrapped
// keyed DataReader.
impl<D, DA> RTPSEntity for DataReader<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
  // Returns whatever `guid()` of the keyed reader returns.
  fn guid(&self) -> GUID {
    self.keyed_datareader.guid()
  }
}
567
568// ----------------------------------------------
569// ----------------------------------------------
570
571// Async interface for the (bare) DataReader
572
/// Wraps [`with_key::BareDataReaderStream`](crate::with_key::BareDataReaderStream) and
/// unwraps [`Sample`](crate::with_key::Sample) and `NoKeyWrapper` on
/// `poll_next`.
///
/// Created by `DataReader::async_bare_sample_stream`.
pub struct BareDataReaderStream<
  D: 'static,
  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
  // The wrapped keyed stream that is polled for items.
  keyed_stream: WithKeyBareDataReaderStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
582
583impl<D, DA> BareDataReaderStream<D, DA>
584where
585  D: 'static,
586  DA: DeserializerAdapter<D>,
587{
588  pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
589    DataReaderEventStream {
590      keyed_stream: self.keyed_stream.async_event_stream(),
591    }
592  }
593}
594
// https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042
// Explicit `Unpin` marker: the `Stream` implementation for this type calls
// `Pin::into_inner(self)`, which is only available for `Unpin` types.
impl<D, DA> Unpin for BareDataReaderStream<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
}
602
603impl<D, DA> Stream for BareDataReaderStream<D, DA>
604where
605  D: 'static,
606  DA: DefaultDecoder<D>,
607{
608  type Item = ReadResult<D>;
609
610  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
611    let mut keyed_stream = Pin::new(&mut Pin::into_inner(self).keyed_stream);
612    loop {
613      match keyed_stream.as_mut().poll_next(cx) {
614        Poll::Ready(Some(Err(e))) => break Poll::Ready(Some(Err(e))),
615        Poll::Ready(Some(Ok(Sample::Value(d)))) => break Poll::Ready(Some(Ok(d.d))), /* Unwraps Sample and NoKeyWrapper */
616        // Disposed data is ignored. However, we just received a `Poll::Ready(..)`,
617        // which means we cannot return `Poll::Pending`, because the Ready result
618        // has not left a waker behind to wake us up. Therefore, we need to loop
619        // and try again until we get a returnable result or Pending.
620        Poll::Ready(Some(Ok(Sample::Dispose(_)))) => (), // continue looping
621        Poll::Ready(None) => break Poll::Ready(None),    // This should never happen
622        Poll::Pending => break Poll::Pending,
623      }
624    } // loop
625  }
626}
627
// `FusedStream` marker for the bare sample stream: it always reports itself
// as not terminated.
impl<D, DA> FusedStream for BareDataReaderStream<D, DA>
where
  D: 'static,
  DA: DefaultDecoder<D>,
{
  // Unconditionally returns `false`.
  fn is_terminated(&self) -> bool {
    false // Never terminate. This means it is always valid to call poll_next().
  }
}
637
638// Async interface for the (non-bare) DataReader
639
/// Wraps [`with_key::DataReaderStream`](crate::with_key::DataReaderStream) and
/// unwraps [`Sample`](crate::with_key::Sample) and `NoKeyWrapper` on
/// `poll_next`.
///
/// Created by `DataReader::async_sample_stream`.
pub struct DataReaderStream<
  D: 'static,
  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
  // The wrapped keyed stream that is polled for items.
  keyed_stream: WithKeyDataReaderStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
649
650impl<D, DA> DataReaderStream<D, DA>
651where
652  D: 'static,
653  DA: DeserializerAdapter<D>,
654{
655  pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
656    DataReaderEventStream {
657      keyed_stream: self.keyed_stream.async_event_stream(),
658    }
659  }
660}
661
// https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042
// Explicit `Unpin` marker: the `Stream` implementation for this type calls
// `Pin::into_inner(self)`, which is only available for `Unpin` types.
impl<D, DA> Unpin for DataReaderStream<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
}
669
670impl<D, DA> Stream for DataReaderStream<D, DA>
671where
672  D: 'static,
673  DA: DefaultDecoder<D>,
674{
675  type Item = ReadResult<DataSample<D>>;
676
677  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
678    let mut keyed_stream = Pin::new(&mut Pin::into_inner(self).keyed_stream);
679    loop {
680      match keyed_stream.as_mut().poll_next(cx) {
681        Poll::Ready(Some(Err(e))) => break Poll::Ready(Some(Err(e))),
682        Poll::Ready(Some(Ok(d))) => match d.value() {
683          Sample::Value(_) => match DataSample::<D>::from_with_key(d) {
684            Some(d) => break Poll::Ready(Some(Ok(d))),
685            None => break Poll::Ready(None), // This should never happen
686          },
687          // Disposed data is ignored. However, we just received a `Poll::Ready(..)`,
688          // which means we cannot return `Poll::Pending`, because the Ready result
689          // has not left a waker behind to wake us up. Therefore, we need to loop
690          // and try again until we get a returnable result or Pending.
691          Sample::Dispose(_) => (),
692        },
693        Poll::Ready(None) => break Poll::Ready(None), // This should never happen
694        Poll::Pending => break Poll::Pending,
695      }
696    } // loop
697  }
698}
699
// `FusedStream` marker for the sample stream: it always reports itself as not
// terminated.
impl<D, DA> FusedStream for DataReaderStream<D, DA>
where
  D: 'static,
  DA: DefaultDecoder<D>,
{
  // Unconditionally returns `false`.
  fn is_terminated(&self) -> bool {
    false // Never terminate. This means it is always valid to call poll_next().
  }
}
709
710// ----------------------------------------------------------------------------------------------------
711// ----------------------------------------------------------------------------------------------------
712
/// Wraps [`with_key::DataReaderEventStream`](crate::with_key::DataReaderEventStream).
///
/// Created by the `async_event_stream` methods of `BareDataReaderStream` and
/// `DataReaderStream`.
pub struct DataReaderEventStream<
  D: 'static,
  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
  // The wrapped keyed event stream that is polled for status events.
  keyed_stream: WithKeyDataReaderEventStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
720
721impl<D, DA> Stream for DataReaderEventStream<D, DA>
722where
723  D: 'static,
724  DA: DeserializerAdapter<D>,
725{
726  type Item = DataReaderStatus;
727
728  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
729    Pin::new(&mut Pin::into_inner(self).keyed_stream).poll_next(cx)
730  }
731}
732
// `FusedStream` marker for the status event stream: it always reports itself
// as not terminated.
impl<D, DA> FusedStream for DataReaderEventStream<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
  // Unconditionally returns `false`.
  fn is_terminated(&self) -> bool {
    false // Never terminate. This means it is always valid to call poll_next().
  }
}