rustdds/dds/no_key/datareader.rs
1use std::{
2 io,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures::stream::{FusedStream, Stream};
8
9use crate::{
10 dds::{
11 adapters::no_key::{DefaultDecoder, DeserializerAdapter},
12 no_key::datasample::DataSample,
13 qos::{HasQoSPolicy, QosPolicies},
14 readcondition::ReadCondition,
15 result::ReadResult,
16 statusevents::DataReaderStatus,
17 with_key::{
18 datareader as datareader_with_key,
19 datasample::{DataSample as WithKeyDataSample, Sample},
20 BareDataReaderStream as WithKeyBareDataReaderStream, DataReader as WithKeyDataReader,
21 DataReaderEventStream as WithKeyDataReaderEventStream,
22 DataReaderStream as WithKeyDataReaderStream,
23 },
24 },
25 serialization::CDRDeserializerAdapter,
26 structure::entity::RTPSEntity,
27 StatusEvented, GUID,
28};
29use super::wrappers::{DAWrapper, NoKeyWrapper};
30
31/// Simplified type for CDR encoding
// Alias for a no-key `DataReader` whose deserializer adapter is fixed to
// `CDRDeserializerAdapter<D>`.
pub type DataReaderCdr<D> = DataReader<D, CDRDeserializerAdapter<D>>;
33
34// ----------------------------------------------------
35
36// DataReader for NO_KEY data. Does not require "D: Keyed"
37/// DDS DataReader for no key topics.
38/// # Examples
39///
40/// ```
41/// use serde::{Serialize, Deserialize};
42/// use rustdds::*;
43/// use rustdds::no_key::DataReader;
44/// use rustdds::serialization::CDRDeserializerAdapter;
45///
46/// let domain_participant = DomainParticipant::new(0).unwrap();
47/// let qos = QosPolicyBuilder::new().build();
48/// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
49///
50/// #[derive(Serialize, Deserialize)]
51/// struct SomeType {}
52///
53/// // NoKey is important
54/// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
55/// let data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None);
56/// ```
pub struct DataReader<D, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
  /// The wrapped keyed reader. The payload type is wrapped in `NoKeyWrapper`
  /// and the adapter in `DAWrapper`; the methods of this type delegate to it.
  keyed_datareader: datareader_with_key::DataReader<NoKeyWrapper<D>, DAWrapper<DA>>,
}
60
61// TODO: rewrite DataSample so it can use current Keyed version (and send back
62// datasamples instead of current data)
63impl<D: 'static, DA> DataReader<D, DA>
64where
65 DA: DeserializerAdapter<D>,
66{
67 pub(crate) fn from_keyed(
68 keyed: datareader_with_key::DataReader<NoKeyWrapper<D>, DAWrapper<DA>>,
69 ) -> Self {
70 Self {
71 keyed_datareader: keyed,
72 }
73 }
74}
75
76impl<D: 'static, DA> DataReader<D, DA>
77where
78 DA: DefaultDecoder<D>,
79{
  /// Reads up to `max_samples` samples that match `read_condition` and
  /// returns references to them.
82 ///
83 /// # Arguments
84 ///
85 /// * `max_samples` - Limits maximum amount of samples read
86 /// * `read_condition` - Limits results by condition
87 ///
88 /// # Examples
89 ///
90 /// ```
91 /// # use serde::{Serialize, Deserialize};
92 /// # use rustdds::*;
93 /// # use rustdds::serialization::CDRDeserializerAdapter;
94 ///
95 /// # let domain_participant = DomainParticipant::new(0).unwrap();
96 /// # let qos = QosPolicyBuilder::new().build();
97 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
98 /// #
99 /// # // NoKey is important
100 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
101 /// #
102 /// # #[derive(Serialize, Deserialize)]
103 /// # struct SomeType {}
104 /// #
105 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
106 /// let data = data_reader.read(10, ReadCondition::not_read());
107 /// ```
108 pub fn read(
109 &mut self,
110 max_samples: usize,
111 read_condition: ReadCondition,
112 ) -> ReadResult<Vec<DataSample<&D>>> {
113 let values: Vec<WithKeyDataSample<&NoKeyWrapper<D>>> =
114 self.keyed_datareader.read(max_samples, read_condition)?;
115 let mut result = Vec::with_capacity(values.len());
116 for ks in values {
117 if let Some(s) = DataSample::<D>::from_with_key_ref(ks) {
118 result.push(s);
119 }
120 }
121 Ok(result)
122 }
123
  /// Takes up to `max_samples` samples that match `read_condition` and
  /// returns them by value.
126 ///
127 /// # Arguments
128 ///
129 /// * `max_samples` - Limits maximum amount of samples read
130 /// * `read_condition` - Limits results by condition
131 ///
132 /// # Examples
133 ///
134 /// ```
135 /// # use serde::{Serialize, Deserialize};
136 /// # use rustdds::*;
137 /// # use rustdds::no_key::DataReader;
138 /// # use rustdds::serialization::CDRDeserializerAdapter;
139 /// #
140 /// # let domain_participant = DomainParticipant::new(0).unwrap();
141 /// # let qos = QosPolicyBuilder::new().build();
142 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
143 /// #
144 /// # // NoKey is important
145 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
146 /// #
147 /// # #[derive(Serialize, Deserialize)]
148 /// # struct SomeType {}
149 /// #
150 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
151 /// let data = data_reader.take(10, ReadCondition::not_read());
152 /// ```
153 pub fn take(
154 &mut self,
155 max_samples: usize,
156 read_condition: ReadCondition,
157 ) -> ReadResult<Vec<DataSample<D>>> {
158 let values: Vec<WithKeyDataSample<NoKeyWrapper<D>>> =
159 self.keyed_datareader.take(max_samples, read_condition)?;
160 let mut result = Vec::with_capacity(values.len());
161 for ks in values {
162 if let Some(s) = DataSample::<D>::from_with_key(ks) {
163 result.push(s);
164 }
165 }
166 Ok(result)
167 }
168
169 /// Reads next unread sample
170 ///
171 /// # Examples
172 ///
173 /// ```
174 /// # use serde::{Serialize, Deserialize};
175 /// # use rustdds::*;
176 /// # use rustdds::no_key::DataReader;
177 /// # use rustdds::serialization::CDRDeserializerAdapter;
178 /// #
179 /// # let domain_participant = DomainParticipant::new(0).unwrap();
180 /// # let qos = QosPolicyBuilder::new().build();
181 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
182 /// #
183 /// # // NoKey is important
184 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
185 /// #
186 /// # #[derive(Serialize, Deserialize)]
187 /// # struct SomeType {}
188 /// #
189 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
190 /// while let Ok(Some(data)) = data_reader.read_next_sample() {
191 /// // Do something
192 /// }
193 /// ```
194 pub fn read_next_sample(&mut self) -> ReadResult<Option<DataSample<&D>>> {
195 let mut ds = self.read(1, ReadCondition::not_read())?;
196 Ok(ds.pop())
197 }
198
199 /// Takes next unread sample
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// # use serde::{Serialize, Deserialize};
205 /// # use rustdds::*;
206 /// # use rustdds::no_key::DataReader;
207 /// # use rustdds::serialization::CDRDeserializerAdapter;
208 /// #
209 /// # let domain_participant = DomainParticipant::new(0).unwrap();
210 /// # let qos = QosPolicyBuilder::new().build();
211 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
212 /// #
213 /// # // NoKey is important
214 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
215 /// #
216 /// # #[derive(Serialize, Deserialize)]
217 /// # struct SomeType {}
218 /// #
219 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
220 /// while let Ok(Some(data)) = data_reader.take_next_sample() {
221 /// // Do something
222 /// }
223 /// ```
224 pub fn take_next_sample(&mut self) -> ReadResult<Option<DataSample<D>>> {
225 let mut ds = self.take(1, ReadCondition::not_read())?;
226 Ok(ds.pop())
227 }
228
229 // Iterator interface
230
231 /// Produces an iterator over the currently available NOT_READ samples.
232 /// Yields only payload data, not SampleInfo metadata
233 /// This is not called `iter()` because it takes a mutable reference to self.
234 ///
235 /// # Examples
236 ///
237 /// ```
238 /// # use serde::{Serialize, Deserialize};
239 /// # use rustdds::*;
240 /// # use rustdds::no_key::DataReader;
241 /// # use rustdds::serialization::CDRDeserializerAdapter;
242 /// #
243 /// # let domain_participant = DomainParticipant::new(0).unwrap();
244 /// # let qos = QosPolicyBuilder::new().build();
245 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
246 /// #
247 /// # // NoKey is important
248 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
249 /// #
250 /// # #[derive(Serialize, Deserialize)]
251 /// # struct SomeType {}
252 /// #
253 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
  /// for data in data_reader.iterator().unwrap() {
255 /// // Do something
256 /// }
257 /// ```
258 pub fn iterator(&mut self) -> ReadResult<impl Iterator<Item = &D>> {
259 // TODO: We could come up with a more efficient implementation than wrapping a
260 // read call
261 Ok(
262 self
263 .read(usize::MAX, ReadCondition::not_read())?
264 .into_iter()
265 .map(|ds| ds.value),
266 )
267 }
268
269 /// Produces an iterator over the samples filtered by given condition.
270 /// Yields only payload data, not SampleInfo metadata
271 ///
272 /// # Examples
273 ///
274 /// ```
275 /// # use serde::{Serialize, Deserialize};
276 /// # use rustdds::*;
277 /// # use rustdds::no_key::DataReader;
278 /// # use rustdds::serialization::CDRDeserializerAdapter;
279 /// #
280 /// # let domain_participant = DomainParticipant::new(0).unwrap();
281 /// # let qos = QosPolicyBuilder::new().build();
282 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
283 /// #
284 /// # // NoKey is important
285 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
286 /// #
287 /// # #[derive(Serialize, Deserialize)]
288 /// # struct SomeType {}
289 /// #
290 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
  /// for data in data_reader.conditional_iterator(ReadCondition::any()).unwrap() {
292 /// // Do something
293 /// }
294 /// ```
295 pub fn conditional_iterator(
296 &mut self,
297 read_condition: ReadCondition,
298 ) -> ReadResult<impl Iterator<Item = &D>> {
299 // TODO: We could come up with a more efficient implementation than wrapping a
300 // read call
301 Ok(
302 self
303 .read(usize::MAX, read_condition)?
304 .into_iter()
305 .map(|ds| ds.value),
306 )
307 }
308
309 /// Produces an iterator over the currently available NOT_READ samples.
310 /// Yields only payload data, not SampleInfo metadata
311 /// Removes samples from `DataReader`.
312 /// <strong>Note!</strong> If the iterator is only partially consumed, all the
313 /// samples it could have provided are still removed from the `Datareader`.
314 ///
315 /// # Examples
316 ///
317 /// ```
318 /// # use serde::{Serialize, Deserialize};
319 /// # use rustdds::*;
320 /// # use rustdds::no_key::DataReader;
321 /// # use rustdds::serialization::CDRDeserializerAdapter;
322 /// #
323 /// # let domain_participant = DomainParticipant::new(0).unwrap();
324 /// # let qos = QosPolicyBuilder::new().build();
325 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
326 /// #
327 /// # // NoKey is important
328 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
329 /// #
330 /// # #[derive(Serialize, Deserialize)]
331 /// # struct SomeType {}
332 /// #
333 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
  /// for data in data_reader.into_iterator().unwrap() {
335 /// // Do something
336 /// }
337 /// ```
338 pub fn into_iterator(&mut self) -> ReadResult<impl Iterator<Item = D>> {
339 // TODO: We could come up with a more efficient implementation than wrapping a
340 // read call
341 Ok(
342 self
343 .take(usize::MAX, ReadCondition::not_read())?
344 .into_iter()
345 .map(|ds| ds.value),
346 )
347 }
348
349 /// Produces an iterator over the samples filtered by given condition.
350 /// Yields only payload data, not SampleInfo metadata
351 /// <strong>Note!</strong> If the iterator is only partially consumed, all the
352 /// samples it could have provided are still removed from the `Datareader`.
353 ///
354 /// # Examples
355 ///
356 /// ```
357 /// # use serde::{Serialize, Deserialize};
358 /// # use rustdds::*;
359 /// # use rustdds::no_key::DataReader;
360 /// # use rustdds::serialization::CDRDeserializerAdapter;
361 /// #
362 /// # let domain_participant = DomainParticipant::new(0).unwrap();
363 /// # let qos = QosPolicyBuilder::new().build();
364 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
365 /// #
366 /// # // NoKey is important
367 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
368 /// #
369 /// # #[derive(Serialize, Deserialize)]
370 /// # struct SomeType {}
371 /// #
372 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
  /// for data in data_reader.into_conditional_iterator(ReadCondition::any()).unwrap() {
374 /// // Do something
375 /// }
376 /// ```
377 pub fn into_conditional_iterator(
378 &mut self,
379 read_condition: ReadCondition,
380 ) -> ReadResult<impl Iterator<Item = D>> {
381 // TODO: We could come up with a more efficient implementation than wrapping a
382 // read call
383 Ok(
384 self
385 .take(usize::MAX, read_condition)?
386 .into_iter()
387 .map(|ds| ds.value),
388 )
389 }
390 /*
391 /// Gets latest RequestedDeadlineMissed status
392 ///
393 /// # Examples
394 ///
395 /// ```
396 /// # use serde::{Serialize, Deserialize};
397 /// # use rustdds::*;
398 /// # use rustdds::no_key::DataReader;
399 /// # use rustdds::serialization::CDRDeserializerAdapter;
400 /// #
401 /// # let domain_participant = DomainParticipant::new(0).unwrap();
402 /// # let qos = QosPolicyBuilder::new().build();
403 /// # let subscriber = domain_participant.create_subscriber(&qos).unwrap();
404 /// #
405 /// # // NoKey is important
406 /// # let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::NoKey).unwrap();
407 /// #
408 /// # #[derive(Serialize, Deserialize)]
409 /// # struct SomeType {}
410 /// #
411 /// let mut data_reader = subscriber.create_datareader_no_key::<SomeType, CDRDeserializerAdapter<_>>(topic, None).unwrap();
412 /// if let Ok(Some(status)) = data_reader.get_requested_deadline_missed_status() {
413 /// // Do something
414 /// }
415 /// ```
416
417
418 pub fn get_requested_deadline_missed_status(
419 &mut self,
420 ) -> ReadResult<Option<RequestedDeadlineMissedStatus>> {
421 self.keyed_datareader.get_requested_deadline_missed_status()
422 }
423 */
424
425 /// An async stream for reading the (bare) data samples
426 pub fn async_bare_sample_stream(self) -> BareDataReaderStream<D, DA> {
427 BareDataReaderStream {
428 keyed_stream: self.keyed_datareader.async_bare_sample_stream(),
429 }
430 }
431
432 /// An async stream for reading the data samples
433 pub fn async_sample_stream(self) -> DataReaderStream<D, DA> {
434 DataReaderStream {
435 keyed_stream: self.keyed_datareader.async_sample_stream(),
436 }
437 }
438}
439
440/// WARNING! UNTESTED
441// TODO: test
442// This is not part of DDS spec. We implement mio mio_06::Evented so that the
443// application can asynchronously poll DataReader(s).
444impl<D, DA> mio_06::Evented for DataReader<D, DA>
445where
446 DA: DeserializerAdapter<D>,
447{
448 // We just delegate all the operations to notification_receiver, since it
449 // already implements mio_06::Evented
450 fn register(
451 &self,
452 poll: &mio_06::Poll,
453 token: mio_06::Token,
454 interest: mio_06::Ready,
455 opts: mio_06::PollOpt,
456 ) -> io::Result<()> {
457 self.keyed_datareader.register(poll, token, interest, opts)
458 }
459
460 fn reregister(
461 &self,
462 poll: &mio_06::Poll,
463 token: mio_06::Token,
464 interest: mio_06::Ready,
465 opts: mio_06::PollOpt,
466 ) -> io::Result<()> {
467 self
468 .keyed_datareader
469 .reregister(poll, token, interest, opts)
470 }
471
472 fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
473 self.keyed_datareader.deregister(poll)
474 }
475}
476
477/// WARNING! UNTESTED
478// TODO: test
479impl<D, DA> mio_08::event::Source for DataReader<D, DA>
480where
481 DA: DeserializerAdapter<D>,
482{
483 fn register(
484 &mut self,
485 registry: &mio_08::Registry,
486 token: mio_08::Token,
487 interests: mio_08::Interest,
488 ) -> io::Result<()> {
489 // with_key::DataReader implements .register() for two traits, so need to
490 // use disambiguation syntax to call .register() here.
491 <WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::register(
492 &mut self.keyed_datareader,
493 registry,
494 token,
495 interests,
496 )
497 }
498
499 fn reregister(
500 &mut self,
501 registry: &mio_08::Registry,
502 token: mio_08::Token,
503 interests: mio_08::Interest,
504 ) -> io::Result<()> {
505 <WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::reregister(
506 &mut self.keyed_datareader,
507 registry,
508 token,
509 interests,
510 )
511 }
512
513 fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
514 <WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::deregister(
515 &mut self.keyed_datareader,
516 registry,
517 )
518 }
519}
520
521/// WARNING! UNTESTED
522// TODO: test
523use crate::no_key::SimpleDataReaderEventStream;
524
525impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
526 for DataReader<D, DA>
527where
528 D: 'static,
529 DA: DeserializerAdapter<D>,
530{
531 fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
532 self.keyed_datareader.as_status_evented()
533 }
534
535 fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
536 self.keyed_datareader.as_status_source()
537 }
538
539 fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
540 SimpleDataReaderEventStream::from_keyed(self.keyed_datareader.as_async_status_stream())
541 }
542
543 fn try_recv_status(&self) -> Option<DataReaderStatus> {
544 self.keyed_datareader.try_recv_status()
545 }
546}
547
548impl<D, DA> HasQoSPolicy for DataReader<D, DA>
549where
550 D: 'static,
551 DA: DeserializerAdapter<D>,
552{
553 fn qos(&self) -> QosPolicies {
554 self.keyed_datareader.qos()
555 }
556}
557
558impl<D, DA> RTPSEntity for DataReader<D, DA>
559where
560 D: 'static,
561 DA: DeserializerAdapter<D>,
562{
563 fn guid(&self) -> GUID {
564 self.keyed_datareader.guid()
565 }
566}
567
568// ----------------------------------------------
569// ----------------------------------------------
570
571// Async interface for the (bare) DataReader
572
573/// Wraps [`with_key::BareDataReaderStream`](crate::with_key::BareDataReaderStream) and
574/// unwraps [`Sample`](crate::with_key::Sample) and `NoKeyWrapper` on
575/// `poll_next`.
pub struct BareDataReaderStream<
  D: 'static,
  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
  /// The keyed stream that is polled by `poll_next`.
  keyed_stream: WithKeyBareDataReaderStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
582
583impl<D, DA> BareDataReaderStream<D, DA>
584where
585 D: 'static,
586 DA: DeserializerAdapter<D>,
587{
588 pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
589 DataReaderEventStream {
590 keyed_stream: self.keyed_stream.async_event_stream(),
591 }
592 }
593}
594
595// https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042
// Explicit `Unpin` marker: `poll_next` below calls `Pin::into_inner(self)`,
// which is only available when `Self: Unpin`.
impl<D, DA> Unpin for BareDataReaderStream<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
}
602
603impl<D, DA> Stream for BareDataReaderStream<D, DA>
604where
605 D: 'static,
606 DA: DefaultDecoder<D>,
607{
608 type Item = ReadResult<D>;
609
610 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
611 let mut keyed_stream = Pin::new(&mut Pin::into_inner(self).keyed_stream);
612 loop {
613 match keyed_stream.as_mut().poll_next(cx) {
614 Poll::Ready(Some(Err(e))) => break Poll::Ready(Some(Err(e))),
615 Poll::Ready(Some(Ok(Sample::Value(d)))) => break Poll::Ready(Some(Ok(d.d))), /* Unwraps Sample and NoKeyWrapper */
616 // Disposed data is ignored. However, we just received a `Poll::Ready(..)`,
617 // which means we cannot return `Poll::Pending`, because the Ready result
618 // has not left a waker behind to wake us up. Therefore, we need to loop
619 // and try again until we get a returnable result or Pending.
620 Poll::Ready(Some(Ok(Sample::Dispose(_)))) => (), // continue looping
621 Poll::Ready(None) => break Poll::Ready(None), // This should never happen
622 Poll::Pending => break Poll::Pending,
623 }
624 } // loop
625 }
626}
627
impl<D, DA> FusedStream for BareDataReaderStream<D, DA>
where
  D: 'static,
  DA: DefaultDecoder<D>,
{
  /// Always reports the stream as not terminated.
  fn is_terminated(&self) -> bool {
    false // Never terminate. This means it is always valid to call poll_next().
  }
}
637
638// Async interface for the (non-bare) DataReader
639
640/// Wraps [`with_key::DataReaderStream`](crate::with_key::DataReaderStream) and
641/// unwraps [`Sample`](crate::with_key::Sample) and `NoKeyWrapper` on
642/// `poll_next`.
pub struct DataReaderStream<
  D: 'static,
  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
  /// The keyed stream that is polled by `poll_next`.
  keyed_stream: WithKeyDataReaderStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
649
650impl<D, DA> DataReaderStream<D, DA>
651where
652 D: 'static,
653 DA: DeserializerAdapter<D>,
654{
655 pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
656 DataReaderEventStream {
657 keyed_stream: self.keyed_stream.async_event_stream(),
658 }
659 }
660}
661
662// https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042
// Explicit `Unpin` marker: `poll_next` below calls `Pin::into_inner(self)`,
// which is only available when `Self: Unpin`.
impl<D, DA> Unpin for DataReaderStream<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
}
669
670impl<D, DA> Stream for DataReaderStream<D, DA>
671where
672 D: 'static,
673 DA: DefaultDecoder<D>,
674{
675 type Item = ReadResult<DataSample<D>>;
676
677 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
678 let mut keyed_stream = Pin::new(&mut Pin::into_inner(self).keyed_stream);
679 loop {
680 match keyed_stream.as_mut().poll_next(cx) {
681 Poll::Ready(Some(Err(e))) => break Poll::Ready(Some(Err(e))),
682 Poll::Ready(Some(Ok(d))) => match d.value() {
683 Sample::Value(_) => match DataSample::<D>::from_with_key(d) {
684 Some(d) => break Poll::Ready(Some(Ok(d))),
685 None => break Poll::Ready(None), // This should never happen
686 },
687 // Disposed data is ignored. However, we just received a `Poll::Ready(..)`,
688 // which means we cannot return `Poll::Pending`, because the Ready result
689 // has not left a waker behind to wake us up. Therefore, we need to loop
690 // and try again until we get a returnable result or Pending.
691 Sample::Dispose(_) => (),
692 },
693 Poll::Ready(None) => break Poll::Ready(None), // This should never happen
694 Poll::Pending => break Poll::Pending,
695 }
696 } // loop
697 }
698}
699
impl<D, DA> FusedStream for DataReaderStream<D, DA>
where
  D: 'static,
  DA: DefaultDecoder<D>,
{
  /// Always reports the stream as not terminated.
  fn is_terminated(&self) -> bool {
    false // Never terminate. This means it is always valid to call poll_next().
  }
}
709
710// ----------------------------------------------------------------------------------------------------
711// ----------------------------------------------------------------------------------------------------
712
713/// Wraps [`with_key::DataReaderEventStream`](crate::with_key::DataReaderEventStream).
pub struct DataReaderEventStream<
  D: 'static,
  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
  /// The keyed event stream to which `poll_next` forwards.
  keyed_stream: WithKeyDataReaderEventStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
720
721impl<D, DA> Stream for DataReaderEventStream<D, DA>
722where
723 D: 'static,
724 DA: DeserializerAdapter<D>,
725{
726 type Item = DataReaderStatus;
727
728 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
729 Pin::new(&mut Pin::into_inner(self).keyed_stream).poll_next(cx)
730 }
731}
732
impl<D, DA> FusedStream for DataReaderEventStream<D, DA>
where
  D: 'static,
  DA: DeserializerAdapter<D>,
{
  /// Always reports the stream as not terminated.
  fn is_terminated(&self) -> bool {
    false // Never terminate. This means it is always valid to call poll_next().
  }
}