rustdds/dds/with_key/
datareader.rs

1use std::{
2  io,
3  pin::Pin,
4  sync::{Arc, Mutex, MutexGuard},
5  task::{Context, Poll},
6};
7
8#[allow(unused_imports)]
9use log::{debug, error, info, trace, warn};
10use futures::stream::{FusedStream, Stream};
11
12use super::datasample_cache::DataSampleCache;
13use crate::{
14  dds::{
15    adapters::with_key::{DefaultDecoder, *},
16    key::*,
17    qos::*,
18    readcondition::*,
19    result::ReadResult,
20    statusevents::*,
21    with_key::{datasample::*, simpledatareader::*},
22    ReadError,
23  },
24  discovery::sedp_messages::PublicationBuiltinTopicData,
25  serialization::CDRDeserializerAdapter,
26  structure::{duration::Duration, entity::RTPSEntity, guid::GUID, time::Timestamp},
27};
28
29/// Simplified type for CDR encoding
30pub type DataReaderCdr<D> = DataReader<D, CDRDeserializerAdapter<D>>;
31
32/// Parameter for reading [Readers](../struct.With_Key_DataReader.html) data
33/// with key or with next from current key.
34#[derive(Clone, Copy, Debug, PartialEq, Eq)]
35pub enum SelectByKey {
36  This,
37  Next,
38}
39
40/// DDS DataReader for with_key topics.
41///
42/// # Examples
43///
44/// ```
45/// use serde::{Serialize, Deserialize};
46/// use rustdds::*;
47/// use rustdds::with_key::DataReader;
48/// use rustdds::serialization::CDRDeserializerAdapter;
49///
50/// let domain_participant = DomainParticipant::new(0).unwrap();
51/// let qos = QosPolicyBuilder::new().build();
52/// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
53///
54/// #[derive(Serialize, Deserialize)]
55/// struct SomeType { a: i32 }
56/// impl Keyed for SomeType {
57///   type K = i32;
58///
59///   fn key(&self) -> Self::K {
60///     self.a
61///   }
62/// }
63///
64/// // WithKey is important
65/// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
66/// let data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None);
67/// ```
68///
69/// *Note:* Many DataReader methods require mutable access to `self`, because
70/// they need to mutate the datasample cache, which is an essential content of
71/// this struct.
72pub struct DataReader<D: Keyed, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
73  simple_data_reader: SimpleDataReader<D, DA>,
74  datasample_cache: DataSampleCache<D>, // DataReader-local cache of deserialized samples
75}
76
77impl<D: 'static, DA> DataReader<D, DA>
78where
79  D: Keyed,
80  DA: DeserializerAdapter<D>,
81{
82  pub(crate) fn from_simple_data_reader(simple_data_reader: SimpleDataReader<D, DA>) -> Self {
83    let dsc = DataSampleCache::new(simple_data_reader.qos().clone());
84
85    Self {
86      simple_data_reader,
87      datasample_cache: dsc,
88    }
89  }
90}
91
92impl<D: 'static, DA> DataReader<D, DA>
93where
94  D: Keyed,
95  DA: DeserializerAdapter<D> + DefaultDecoder<D>,
96{
97  // Gets all unseen cache_changes from the TopicCache. Deserializes
98  // the serialized payload and stores the DataSamples (the actual data and the
99  // samplestate) to local container, datasample_cache.
100  fn fill_and_lock_local_datasample_cache(&mut self) -> ReadResult<()> {
101    while let Some(dcc) = self.simple_data_reader.try_take_one()? {
102      self
103        .datasample_cache
104        .fill_from_deserialized_cache_change(dcc);
105    }
106    Ok(())
107  }
108
109  fn drain_read_notifications(&self) {
110    self.simple_data_reader.drain_read_notifications();
111  }
112
113  fn select_keys_for_access(&self, read_condition: ReadCondition) -> Vec<(Timestamp, D::K)> {
114    self.datasample_cache.select_keys_for_access(read_condition)
115  }
116
117  fn take_by_keys(&mut self, keys: &[(Timestamp, D::K)]) -> Vec<DataSample<D>> {
118    self.datasample_cache.take_by_keys(keys)
119  }
120
121  fn take_bare_by_keys(&mut self, keys: &[(Timestamp, D::K)]) -> Vec<Sample<D, D::K>> {
122    self.datasample_cache.take_bare_by_keys(keys)
123  }
124
125  fn select_instance_keys_for_access(
126    &self,
127    instance: &D::K,
128    rc: ReadCondition,
129  ) -> Vec<(Timestamp, D::K)> {
130    self
131      .datasample_cache
132      .select_instance_keys_for_access(instance, rc)
133  }
134
135  /// Reads amount of samples found with `max_samples` and `read_condition`
136  /// parameters.
137  ///
138  /// # Arguments
139  ///
140  /// * `max_samples` - Limits maximum amount of samples read
141  /// * `read_condition` - Limits results by condition
142  ///
143  /// # Examples
144  ///
145  /// ```
146  /// # use serde::{Serialize, Deserialize};
147  /// # use rustdds::*;
148  /// # use rustdds::with_key::DataReader;
149  /// # use rustdds::serialization::CDRDeserializerAdapter;
150  ///
151  /// let domain_participant = DomainParticipant::new(0).unwrap();
152  /// let qos = QosPolicyBuilder::new().build();
153  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
154  /// #
155  /// # #[derive(Serialize, Deserialize)]
156  /// # struct SomeType { a: i32 }
157  /// # impl Keyed for SomeType {
158  /// #   type K = i32;
159  /// #
160  /// #   fn key(&self) -> Self::K {
161  /// #     self.a
162  /// #   }
163  /// # }
164  ///
165  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
166  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
167  ///
168  /// // Wait for data to arrive...
169  ///
170  /// if let Ok(datas) = data_reader.read(10, ReadCondition::not_read()) {
171  ///   for data in datas.iter() {
172  ///     // do something
173  ///   }
174  /// }
175  /// ```
176  pub fn read(
177    &mut self,
178    max_samples: usize,
179    read_condition: ReadCondition,
180  ) -> ReadResult<Vec<DataSample<&D>>> {
181    // Clear notification buffer. This must be done first to avoid race conditions.
182    self.drain_read_notifications();
183    self.fill_and_lock_local_datasample_cache()?;
184
185    let mut selected = self.select_keys_for_access(read_condition);
186    selected.truncate(max_samples);
187
188    let result = self.datasample_cache.read_by_keys(&selected);
189
190    Ok(result)
191  }
192
193  /// Takes amount of sample found with `max_samples` and `read_condition`
194  /// parameters.
195  ///
196  /// # Arguments
197  ///
198  /// * `max_samples` - Limits maximum amount of samples read
199  /// * `read_condition` - Limits results by condition
200  ///
201  /// # Examples
202  ///
203  /// ```
204  /// # use serde::{Serialize, Deserialize};
205  /// # use rustdds::*;
206  /// # use rustdds::with_key::DataReader;
207  /// # use rustdds::serialization::CDRDeserializerAdapter;
208  ///
209  /// let domain_participant = DomainParticipant::new(0).unwrap();
210  /// let qos = QosPolicyBuilder::new().build();
211  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
212  /// #
213  /// # #[derive(Serialize, Deserialize)]
214  /// # struct SomeType { a: i32 }
215  /// # impl Keyed for SomeType {
216  /// #   type K = i32;
217  /// #
218  /// #   fn key(&self) -> Self::K {
219  /// #     self.a
220  /// #   }
221  /// # }
222  ///
223  /// // WithKey is important
224  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
225  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
226  ///
227  /// // Wait for data to arrive...
228  ///
229  /// if let Ok(datas) = data_reader.take(10, ReadCondition::not_read()) {
230  ///   for data in datas.iter() {
231  ///     // do something
232  ///   }
233  /// }
234  /// ```
235  pub fn take(
236    &mut self,
237    max_samples: usize,
238    read_condition: ReadCondition,
239  ) -> ReadResult<Vec<DataSample<D>>> {
240    // Clear notification buffer. This must be done first to avoid race conditions.
241    self.drain_read_notifications();
242
243    self.fill_and_lock_local_datasample_cache()?;
244    let mut selected = self.select_keys_for_access(read_condition);
245    trace!("take selected count = {}", selected.len());
246    selected.truncate(max_samples);
247
248    let result = self.take_by_keys(&selected);
249    trace!("take taken count = {}", result.len());
250
251    Ok(result)
252  }
253
254  /// Reads next unread sample
255  ///
256  /// # Examples
257  ///
258  /// ```
259  /// # use serde::{Serialize, Deserialize};
260  /// # use rustdds::*;
261  /// # use rustdds::with_key::DataReader;
262  /// # use rustdds::serialization::CDRDeserializerAdapter;
263  /// #
264  /// let domain_participant = DomainParticipant::new(0).unwrap();
265  /// let qos = QosPolicyBuilder::new().build();
266  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
267  /// #
268  /// # #[derive(Serialize, Deserialize)]
269  /// # struct SomeType { a: i32 }
270  /// # impl Keyed for SomeType {
271  /// #   type K = i32;
272  /// #
273  /// #   fn key(&self) -> Self::K {
274  /// #     self.a
275  /// #   }
276  /// # }
277  ///
278  /// // WithKey is important
279  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
280  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
281  ///
282  /// // Wait for data to arrive...
283  ///
284  /// while let Ok(Some(data)) = data_reader.read_next_sample() {
285  ///   // do something
286  /// }
287  /// ```
288  pub fn read_next_sample(&mut self) -> ReadResult<Option<DataSample<&D>>> {
289    let mut ds = self.read(1, ReadCondition::not_read())?;
290    Ok(ds.pop())
291  }
292
293  /// Takes next unread sample
294  ///
295  /// # Examples
296  ///
297  /// ```
298  /// # use serde::{Serialize, Deserialize};
299  /// # use rustdds::*;
300  /// # use rustdds::with_key::DataReader;
301  /// # use rustdds::serialization::CDRDeserializerAdapter;
302  /// #
303  /// let domain_participant = DomainParticipant::new(0).unwrap();
304  /// let qos = QosPolicyBuilder::new().build();
305  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
306  /// #
307  /// # #[derive(Serialize, Deserialize)]
308  /// # struct SomeType { a: i32 }
309  /// # impl Keyed for SomeType {
310  /// #   type K = i32;
311  /// #
312  /// #   fn key(&self) -> Self::K {
313  /// #     self.a
314  /// #   }
315  /// # }
316  ///
317  /// // WithKey is important
318  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
319  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
320  ///
321  /// // Wait for data to arrive...
322  ///
323  /// while let Ok(Some(data)) = data_reader.take_next_sample() {
324  ///   // do something
325  /// }
326  /// ```
327  pub fn take_next_sample(&mut self) -> ReadResult<Option<DataSample<D>>> {
328    let mut ds = self.take(1, ReadCondition::not_read())?;
329    Ok(ds.pop())
330  }
331
332  // Iterator interface
333
334  fn read_bare(
335    &mut self,
336    max_samples: usize,
337    read_condition: ReadCondition,
338  ) -> ReadResult<Vec<Sample<&D, D::K>>> {
339    self.drain_read_notifications();
340    self.fill_and_lock_local_datasample_cache()?;
341
342    let mut selected = self.select_keys_for_access(read_condition);
343    selected.truncate(max_samples);
344
345    let result = self.datasample_cache.read_bare_by_keys(&selected);
346
347    Ok(result)
348  }
349
350  fn take_bare(
351    &mut self,
352    max_samples: usize,
353    read_condition: ReadCondition,
354  ) -> ReadResult<Vec<Sample<D, D::K>>> {
355    // Clear notification buffer. This must be done first to avoid race conditions.
356    self.drain_read_notifications();
357    self.fill_and_lock_local_datasample_cache()?;
358
359    let mut selected = self.select_keys_for_access(read_condition);
360    trace!("take bare selected count = {}", selected.len());
361    selected.truncate(max_samples);
362
363    let result = self.take_bare_by_keys(&selected);
364    trace!("take bare taken count = {}", result.len());
365
366    Ok(result)
367  }
368
369  /// Produces an iterator over the currently available NOT_READ samples.
370  /// Yields only payload data, not SampleInfo metadata
371  /// This is not called `iter()` because it takes a mutable reference to self.
372  ///
373  /// # Examples
374  ///
375  /// ```
376  /// # use serde::{Serialize, Deserialize};
377  /// # use rustdds::*;
378  /// # use rustdds::with_key::DataReader;
379  /// # use rustdds::serialization::CDRDeserializerAdapter;
380  /// #
381  /// let domain_participant = DomainParticipant::new(0).unwrap();
382  /// let qos = QosPolicyBuilder::new().build();
383  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
384  /// #
385  /// # #[derive(Serialize, Deserialize)]
386  /// # struct SomeType { a: i32 }
387  /// # impl Keyed for SomeType {
388  /// #   type K = i32;
389  /// #
390  /// #   fn key(&self) -> Self::K {
391  /// #     self.a
392  /// #   }
393  /// # }
394  ///
395  /// // WithKey is important
396  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
397  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
398  ///
399  /// // Wait for data to arrive...
400  ///
401  /// for data in data_reader.iterator() {
402  ///   // do something
403  /// }
404  /// ```
405  pub fn iterator(&mut self) -> ReadResult<impl Iterator<Item = Sample<&D, D::K>>> {
406    // TODO: We could come up with a more efficient implementation than wrapping a
407    // read call
408    Ok(
409      self
410        .read_bare(usize::MAX, ReadCondition::not_read())?
411        .into_iter(),
412    )
413  }
414
415  /// Produces an iterator over the samples filtered by a given condition.
416  /// Yields only payload data, not SampleInfo metadata
417  ///
418  /// # Examples
419  ///
420  /// ```
421  /// # use serde::{Serialize, Deserialize};
422  /// # use rustdds::*;
423  /// # use rustdds::with_key::DataReader;
424  /// # use rustdds::serialization::CDRDeserializerAdapter;
425  ///
426  /// let domain_participant = DomainParticipant::new(0).unwrap();
427  /// let qos = QosPolicyBuilder::new().build();
428  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
429  /// #
430  /// # #[derive(Serialize, Deserialize)]
431  /// # struct SomeType { a: i32 }
432  /// # impl Keyed for SomeType {
433  /// #   type K = i32;
434  /// #
435  /// #   fn key(&self) -> Self::K {
436  /// #     self.a
437  /// #   }
438  /// # }
439  ///
440  /// // WithKey is important
441  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
442  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
443  ///
444  /// // Wait for data to arrive...
445  ///
446  /// for data in data_reader.conditional_iterator(ReadCondition::any()) {
447  ///   // do something
448  /// }
449  /// ```
450  pub fn conditional_iterator(
451    &mut self,
452    read_condition: ReadCondition,
453  ) -> ReadResult<impl Iterator<Item = Sample<&D, D::K>>> {
454    // TODO: We could come up with a more efficient implementation than wrapping a
455    // read call
456    Ok(self.read_bare(usize::MAX, read_condition)?.into_iter())
457  }
458
459  /// Produces an iterator over the currently available NOT_READ samples.
460  /// Yields only payload data, not SampleInfo metadata
461  /// Removes samples from `DataReader`.
462  /// <strong>Note!</strong> If the iterator is only partially consumed, all the
463  /// samples it could have provided are still removed from the `Datareader`.
464  ///
465  /// # Examples
466  ///
467  /// ```
468  /// # use serde::{Serialize, Deserialize};
469  /// # use rustdds::*;
470  /// # use rustdds::with_key::DataReader;
471  /// # use rustdds::serialization::CDRDeserializerAdapter;
472  /// #
473  /// let domain_participant = DomainParticipant::new(0).unwrap();
474  /// let qos = QosPolicyBuilder::new().build();
475  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
476  /// #
477  /// # #[derive(Serialize, Deserialize)]
478  /// # struct SomeType { a: i32 }
479  /// # impl Keyed for SomeType {
480  /// #   type K = i32;
481  /// #
482  /// #   fn key(&self) -> Self::K {
483  /// #     self.a
484  /// #   }
485  /// # }
486  ///
487  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
488  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
489  ///
490  /// // Wait for data to arrive...
491  ///
492  /// for data in data_reader.into_iterator() {
493  ///   // do something
494  /// }
495  /// ```
496  pub fn into_iterator(&mut self) -> ReadResult<impl Iterator<Item = Sample<D, D::K>>> {
497    // TODO: We could come up with a more efficient implementation than wrapping a
498    // take call
499    Ok(
500      self
501        .take_bare(usize::MAX, ReadCondition::not_read())?
502        .into_iter(),
503    )
504  }
505
506  /// Produces an iterator over the samples filtered by the given condition.
507  /// Yields only payload data, not SampleInfo metadata
508  /// <strong>Note!</strong> If the iterator is only partially consumed, all the
509  /// samples it could have provided are still removed from the `Datareader`.
510  ///
511  /// # Examples
512  ///
513  /// ```
514  /// # use serde::{Serialize, Deserialize};
515  /// # use rustdds::*;
516  /// # use rustdds::with_key::DataReader;
517  /// # use rustdds::serialization::CDRDeserializerAdapter;
518  ///
519  /// let domain_participant = DomainParticipant::new(0).unwrap();
520  /// let qos = QosPolicyBuilder::new().build();
521  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
522  /// #
523  /// # #[derive(Serialize, Deserialize)]
524  /// # struct SomeType { a: i32 }
525  /// # impl Keyed for SomeType {
526  /// #   type K = i32;
527  /// #
528  /// #   fn key(&self) -> Self::K {
529  /// #     self.a
530  /// #   }
531  /// # }
532  ///
533  /// // WithKey is important
534  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
535  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
536  ///
537  /// // Wait for data to arrive...
538  ///
539  /// for data in data_reader.into_conditional_iterator(ReadCondition::not_read()) {
540  ///   // do something
541  /// }
542  /// ```
543  pub fn into_conditional_iterator(
544    &mut self,
545    read_condition: ReadCondition,
546  ) -> ReadResult<impl Iterator<Item = Sample<D, D::K>>> {
547    // TODO: We could come up with a more efficient implementation than wrapping a
548    // take call
549    Ok(self.take_bare(usize::MAX, read_condition)?.into_iter())
550  }
551
552  // ----------------------------------------------------------------------------
553  // ----------------------------------------------------------------------------
554
555  fn infer_key(
556    &self,
557    instance_key: Option<<D as Keyed>::K>,
558    this_or_next: SelectByKey,
559  ) -> Option<<D as Keyed>::K> {
560    match instance_key {
561      Some(k) => match this_or_next {
562        SelectByKey::This => Some(k),
563        SelectByKey::Next => self.datasample_cache.next_key(&k),
564      },
565      None => self.datasample_cache.instance_map.keys().next().cloned(),
566    }
567  }
568
569  /// Works similarly to read(), but will return only samples from a specific
570  /// instance. The instance is specified by an optional key. In case the key
571  /// is not specified, the smallest (in key order) instance is selected.
572  /// If a key is specified, then the parameter this_or_next specifies whether
573  /// to access the instance with specified key or the following one, in key
574  /// order.
575  ///
576  /// This should cover DDS DataReader methods read_instance,
577  /// read_next_instance, read_next_instance_w_condition.
578  ///
579  /// # Examples
580  ///
581  /// ```
582  /// # use serde::{Serialize, Deserialize};
583  /// # use rustdds::*;
584  /// # use rustdds::with_key::DataReader;
585  /// # use rustdds::serialization::CDRDeserializerAdapter;
586  ///
587  /// let domain_participant = DomainParticipant::new(0).unwrap();
588  /// let qos = QosPolicyBuilder::new().build();
589  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
590  /// #
591  /// # #[derive(Serialize, Deserialize)]
592  /// # struct SomeType { a: i32 }
593  /// # impl Keyed for SomeType {
594  /// #   type K = i32;
595  /// #
596  /// #   fn key(&self) -> Self::K {
597  /// #     self.a
598  /// #   }
599  /// # }
600  ///
601  /// // WithKey is important
602  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
603  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
604  ///
605  /// // Wait for data to arrive...
606  ///
607  /// if let Ok(datas) = data_reader.read_instance(10, ReadCondition::any(), Some(3), SelectByKey::This) {
608  ///   for data in datas.iter() {
609  ///     // do something
610  ///   }
611  /// }
612  /// ```
613  pub fn read_instance(
614    &mut self,
615    max_samples: usize,
616    read_condition: ReadCondition,
617    // Select only samples from instance specified by key. In case of None, select the
618    // "smallest" instance as specified by the key type Ord trait.
619    instance_key: Option<<D as Keyed>::K>,
620    // This = Select instance specified by key.
621    // Next = select next instance in the order specified by Ord on keys.
622    this_or_next: SelectByKey,
623  ) -> ReadResult<Vec<DataSample<&D>>> {
624    self.drain_read_notifications();
625    self.fill_and_lock_local_datasample_cache()?;
626
627    let key = match Self::infer_key(self, instance_key, this_or_next) {
628      Some(k) => k,
629      None => return Ok(Vec::new()),
630    };
631
632    let mut selected = self
633      .datasample_cache
634      .select_instance_keys_for_access(&key, read_condition);
635    selected.truncate(max_samples);
636
637    let result = self.datasample_cache.read_by_keys(&selected);
638
639    Ok(result)
640  }
641
642  /// Similar to read_instance, but will return owned datasamples
643  /// This should cover DDS DataReader methods take_instance,
644  /// take_next_instance, take_next_instance_w_condition.
645  ///
646  /// # Examples
647  ///
648  /// ```
649  /// # use serde::{Serialize, Deserialize};
650  /// # use rustdds::*;
651  /// # use rustdds::with_key::DataReader;
652  /// # use rustdds::serialization::CDRDeserializerAdapter;
653  ///
654  /// let domain_participant = DomainParticipant::new(0).unwrap();
655  /// let qos = QosPolicyBuilder::new().build();
656  /// let subscriber = domain_participant.create_subscriber(&qos).unwrap();
657  /// #
658  /// # #[derive(Serialize, Deserialize)]
659  /// # struct SomeType { a: i32 }
660  /// # impl Keyed for SomeType {
661  /// #   type K = i32;
662  /// #
663  /// #   fn key(&self) -> Self::K {
664  /// #     self.a
665  /// #   }
666  /// # }
667  ///
668  /// // WithKey is important
669  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
670  /// let mut data_reader = subscriber.create_datareader::<SomeType, CDRDeserializerAdapter<_>>(&topic, None).unwrap();
671  ///
672  /// // Wait for data to arrive...
673  ///
674  /// if let Ok(datas) = data_reader.take_instance(10, ReadCondition::any(), Some(3), SelectByKey::Next) {
675  ///   for data in datas.iter() {
676  ///     // do something
677  ///   }
678  /// }
679  /// ```
680  pub fn take_instance(
681    &mut self,
682    max_samples: usize,
683    read_condition: ReadCondition,
684    // Select only samples from instance specified by key. In case of None, select the
685    // "smallest" instance as specified by the key type Ord trait.
686    instance_key: Option<<D as Keyed>::K>,
687    // This = Select instance specified by key.
688    // Next = select next instance in the order specified by Ord on keys.
689    this_or_next: SelectByKey,
690  ) -> ReadResult<Vec<DataSample<D>>> {
691    // Clear notification buffer. This must be done first to avoid race conditions.
692    self.drain_read_notifications();
693
694    self.fill_and_lock_local_datasample_cache()?;
695
696    let key = match self.infer_key(instance_key, this_or_next) {
697      Some(k) => k,
698      None => return Ok(Vec::new()),
699    };
700
701    let mut selected = self.select_instance_keys_for_access(&key, read_condition);
702    selected.truncate(max_samples);
703
704    let result = self.take_by_keys(&selected);
705
706    Ok(result)
707  }
708
709  /// Return values:
710  /// true - got all historical data
711  /// false - timeout before all historical data was received
712  pub fn wait_for_historical_data(&mut self, _max_wait: Duration) -> bool {
713    todo!()
714  }
715
716  // Spec calls for two separate functions:
717  // get_matched_publications returns a list of handles
718  // get_matched_publication_data returns PublicationBuiltinTopicData for a handle
719  // But we do not believe in handle-oriented programming, so just return
720  // the actual data right away. Since the handles are quite opaque, about the
721  // only thing that could be done with the handles would be counting how many
722  // we got.
723
724  pub fn get_matched_publications(&self) -> impl Iterator<Item = PublicationBuiltinTopicData> {
725    // TODO: Obviously not implemented
726    vec![].into_iter()
727  }
728
729  /// An async stream for reading the (bare) data samples.
730  /// The resulting Stream can be used to get another stream of status events.
731  pub fn async_bare_sample_stream(self) -> BareDataReaderStream<D, DA> {
732    BareDataReaderStream {
733      datareader: Arc::new(Mutex::new(self)),
734    }
735  }
736
737  /// An async stream for reading the data samples.
738  /// The resulting Stream can be used to get another stream of status events.
739  pub fn async_sample_stream(self) -> DataReaderStream<D, DA> {
740    DataReaderStream {
741      datareader: Arc::new(Mutex::new(self)),
742    }
743  }
744} // impl
745
746// -------------------
747
748impl<D, DA> mio_06::Evented for DataReader<D, DA>
749where
750  D: Keyed,
751  DA: DeserializerAdapter<D>,
752{
753  // We just delegate all the operations to notification_receiver, since it
754  // already implements mio_06::Evented
755  fn register(
756    &self,
757    poll: &mio_06::Poll,
758    token: mio_06::Token,
759    interest: mio_06::Ready,
760    opts: mio_06::PollOpt,
761  ) -> io::Result<()> {
762    self
763      .simple_data_reader
764      .register(poll, token, interest, opts)
765  }
766
767  fn reregister(
768    &self,
769    poll: &mio_06::Poll,
770    token: mio_06::Token,
771    interest: mio_06::Ready,
772    opts: mio_06::PollOpt,
773  ) -> io::Result<()> {
774    self
775      .simple_data_reader
776      .reregister(poll, token, interest, opts)
777  }
778
779  fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
780    self.simple_data_reader.deregister(poll)
781  }
782}
783
784impl<D, DA> mio_08::event::Source for DataReader<D, DA>
785where
786  D: Keyed,
787  DA: DeserializerAdapter<D>,
788{
789  fn register(
790    &mut self,
791    registry: &mio_08::Registry,
792    token: mio_08::Token,
793    interests: mio_08::Interest,
794  ) -> io::Result<()> {
795    // SimpleDataReader implements .register() for two traits, so need to
796    // use disambiguation syntax to call .register() here.
797    <SimpleDataReader<D, DA> as mio_08::event::Source>::register(
798      &mut self.simple_data_reader,
799      registry,
800      token,
801      interests,
802    )
803  }
804
805  fn reregister(
806    &mut self,
807    registry: &mio_08::Registry,
808    token: mio_08::Token,
809    interests: mio_08::Interest,
810  ) -> io::Result<()> {
811    <SimpleDataReader<D, DA> as mio_08::event::Source>::reregister(
812      &mut self.simple_data_reader,
813      registry,
814      token,
815      interests,
816    )
817  }
818
819  fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
820    <SimpleDataReader<D, DA> as mio_08::event::Source>::deregister(
821      &mut self.simple_data_reader,
822      registry,
823    )
824  }
825}
826
827impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
828  for DataReader<D, DA>
829where
830  D: Keyed + 'static,
831  DA: DeserializerAdapter<D>,
832{
833  fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
834    self.simple_data_reader.as_status_evented()
835  }
836
837  fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
838    self.simple_data_reader.as_status_source()
839  }
840
841  fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
842    self.simple_data_reader.as_async_status_stream()
843  }
844
845  fn try_recv_status(&self) -> Option<DataReaderStatus> {
846    self.simple_data_reader.try_recv_status()
847  }
848}
849
850impl<D, DA> HasQoSPolicy for DataReader<D, DA>
851where
852  D: Keyed + 'static,
853  DA: DeserializerAdapter<D>,
854{
855  fn qos(&self) -> QosPolicies {
856    self.simple_data_reader.qos().clone()
857  }
858}
859
860impl<D, DA> RTPSEntity for DataReader<D, DA>
861where
862  D: Keyed + 'static,
863  DA: DeserializerAdapter<D>,
864{
865  fn guid(&self) -> GUID {
866    self.simple_data_reader.guid()
867  }
868}
869
870// ----------------------------------------------
871// ----------------------------------------------
872
873// Async interface to the (bare) DataReader
874
875pub struct BareDataReaderStream<
876  D: Keyed + 'static,
877  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
878> {
879  datareader: Arc<Mutex<DataReader<D, DA>>>,
880}
881
882impl<D, DA> BareDataReaderStream<D, DA>
883where
884  D: Keyed + 'static,
885  DA: DeserializerAdapter<D>,
886{
887  /// Get a stream of status events
888  pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
889    DataReaderEventStream {
890      datareader: Arc::clone(&self.datareader),
891    }
892  }
893  fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
894    self.datareader.lock().map_err(|e| ReadError::Poisoned {
895      reason: format!("BareDataReaderStream could not lock datareader: {e:?}"),
896    })
897  }
898}
899
900// https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042
901impl<D, DA> Unpin for BareDataReaderStream<D, DA>
902where
903  D: Keyed + 'static,
904  DA: DeserializerAdapter<D>,
905{
906}
907
908impl<D, DA> Stream for BareDataReaderStream<D, DA>
909where
910  D: Keyed + 'static,
911  DA: DeserializerAdapter<D> + DefaultDecoder<D>,
912{
913  type Item = ReadResult<Sample<D, D::K>>;
914
915  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
916    debug!("poll_next");
917    let mut datareader = match self.lock_datareader() {
918      Ok(g) => g,
919      Err(e) => return Poll::Ready(Some(Err(e))),
920    }; //TODO: Upgrade to ?-operator: https://github.com/rust-lang/rust/issues/84277
921
922    match datareader.take_bare(1, ReadCondition::not_read()) {
923      Err(e) =>
924      // DDS fails
925      {
926        Poll::Ready(Some(Err(e)))
927      }
928
929      Ok(mut v) => {
930        match v.pop() {
931          Some(d) => Poll::Ready(Some(Ok(d))),
932          None => {
933            // Did not get any data.
934            // --> Store waker.
935            // 1. synchronously store waker to background thread (must rendezvous)
936            // 2. try take_bare again, in case something arrived just now
937            // 3. if nothing still, return pending.
938            datareader
939              .simple_data_reader
940              .set_waker(Some(cx.waker().clone()));
941            match datareader.take_bare(1, ReadCondition::not_read()) {
942              Err(e) => Poll::Ready(Some(Err(e))),
943              Ok(mut v) => match v.pop() {
944                None => Poll::Pending,
945                Some(d) => Poll::Ready(Some(Ok(d))),
946              },
947            }
948          }
949        }
950      }
951    }
952  }
953}
954
955impl<D, DA> FusedStream for BareDataReaderStream<D, DA>
956where
957  D: Keyed + 'static,
958  DA: DeserializerAdapter<D> + DefaultDecoder<D>,
959{
960  fn is_terminated(&self) -> bool {
961    false // Never terminate. This means it is always valid to call poll_next().
962  }
963}
964
965// Async interface to the (non-bare) DataReader
966
967pub struct DataReaderStream<
968  D: Keyed + 'static,
969  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
970> {
971  datareader: Arc<Mutex<DataReader<D, DA>>>,
972}
973
974impl<D, DA> DataReaderStream<D, DA>
975where
976  D: Keyed + 'static,
977  DA: DeserializerAdapter<D>,
978{
979  /// Get a stream of status events
980  pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
981    DataReaderEventStream {
982      datareader: Arc::clone(&self.datareader),
983    }
984  }
985  fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
986    self.datareader.lock().map_err(|e| ReadError::Poisoned {
987      reason: format!("DataReaderStream could not lock datareader: {e:?}"),
988    })
989  }
990}
991
992// https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042
993impl<D, DA> Unpin for DataReaderStream<D, DA>
994where
995  D: Keyed + 'static,
996  DA: DeserializerAdapter<D>,
997{
998}
999
1000impl<D, DA> Stream for DataReaderStream<D, DA>
1001where
1002  D: Keyed + 'static,
1003  DA: DeserializerAdapter<D> + DefaultDecoder<D>,
1004{
1005  type Item = ReadResult<DataSample<D>>;
1006
1007  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1008    debug!("poll_next");
1009    let mut datareader = match self.lock_datareader() {
1010      Ok(g) => g,
1011      Err(e) => return Poll::Ready(Some(Err(e))),
1012    }; //TODO: Upgrade to ?-operator: https://github.com/rust-lang/rust/issues/84277
1013
1014    match datareader.take(1, ReadCondition::not_read()) {
1015      Err(e) =>
1016      // DDS fails
1017      {
1018        Poll::Ready(Some(Err(e)))
1019      }
1020
1021      Ok(mut v) => {
1022        match v.pop() {
1023          Some(d) => Poll::Ready(Some(Ok(d))),
1024          None => {
1025            // Did not get any data.
1026            // --> Store waker.
1027            // 1. synchronously store waker to background thread (must rendezvous)
1028            // 2. try take again, in case something arrived just now
1029            // 3. if nothing still, return pending.
1030            datareader
1031              .simple_data_reader
1032              .set_waker(Some(cx.waker().clone()));
1033            match datareader.take(1, ReadCondition::not_read()) {
1034              Err(e) => Poll::Ready(Some(Err(e))),
1035              Ok(mut v) => match v.pop() {
1036                None => Poll::Pending,
1037                Some(d) => Poll::Ready(Some(Ok(d))),
1038              },
1039            }
1040          }
1041        }
1042      }
1043    }
1044  }
1045}
1046
1047impl<D, DA> FusedStream for DataReaderStream<D, DA>
1048where
1049  D: Keyed + 'static,
1050  DA: DeserializerAdapter<D> + DefaultDecoder<D>,
1051{
1052  fn is_terminated(&self) -> bool {
1053    false // Never terminate. This means it is always valid to call poll_next().
1054  }
1055}
1056
1057// ----------------------------------------------------------------------------------------------------
1058// ----------------------------------------------------------------------------------------------------
1059
1060pub struct DataReaderEventStream<
1061  D: Keyed + 'static,
1062  DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
1063> {
1064  datareader: Arc<Mutex<DataReader<D, DA>>>,
1065}
1066
1067impl<D, DA> DataReaderEventStream<D, DA>
1068where
1069  D: Keyed + 'static,
1070  DA: DeserializerAdapter<D>,
1071{
1072  fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
1073    self.datareader.lock().map_err(|e| ReadError::Poisoned {
1074      reason: format!("DataReaderEventStream could not lock datareader: {e:?}"),
1075    })
1076  }
1077}
1078
1079impl<D, DA> Stream for DataReaderEventStream<D, DA>
1080where
1081  D: Keyed + 'static,
1082  DA: DeserializerAdapter<D>,
1083{
1084  type Item = DataReaderStatus;
1085
1086  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1087    let datareader = match self.lock_datareader() {
1088      Ok(g) => g,
1089      Err(_e) => return Poll::Ready(None),
1090      // If the locking failed, it is due to lock poisoning. This is not recoverable.
1091      // We just indicate that the stream of events has ended, because there is
1092      // no Result to return here.
1093    };
1094
1095    Pin::new(&mut datareader.simple_data_reader.as_async_status_stream()).poll_next(cx)
1096  }
1097}
1098
1099impl<D, DA> FusedStream for DataReaderEventStream<D, DA>
1100where
1101  D: Keyed + 'static,
1102  DA: DeserializerAdapter<D>,
1103{
1104  fn is_terminated(&self) -> bool {
1105    false // Never terminate. This means it is always valid to call poll_next().
1106  }
1107}
1108
1109// ----------------------------------------------------------------------------------------------------
1110// ----------------------------------------------------------------------------------------------------
1111// ----------------------------------------------------------------------------------------------------
1112// ----------------------------------------------------------------------------------------------------
1113// ----------------------------------------------------------------------------------------------------
1114// ----------------------------------------------------------------------------------------------------
1115
1116#[cfg(test)]
1117mod tests {
1118  use std::rc::Rc;
1119
1120  use bytes::Bytes;
1121  use mio_extras::channel as mio_channel;
1122  use log::info;
1123  use byteorder::LittleEndian;
1124
1125  use super::*;
1126  use crate::{
1127    dds::{
1128      participant::DomainParticipant,
1129      topic::{TopicDescription, TopicKind},
1130    },
1131    messages::submessages::{
1132      elements::serialized_payload::SerializedPayload, submessage_flag::*, submessages::Data,
1133    },
1134    mio_source,
1135    network::udp_sender::UDPSender,
1136    rtps::{
1137      message_receiver::*,
1138      reader::{Reader, ReaderIngredients},
1139    },
1140    serialization::to_vec,
1141    structure::{
1142      guid::{EntityId, EntityKind, GuidPrefix},
1143      sequence_number::SequenceNumber,
1144    },
1145    test::random_data::*,
1146    RepresentationIdentifier,
1147  };
1148
1149  #[test]
1150  fn read_and_take() {
1151    // Test the read and take methods of the DataReader
1152
1153    let dp = DomainParticipant::new(0).expect("Participant creation failed!");
1154
1155    let mut qos = QosPolicies::qos_none();
1156    qos.history = Some(policy::History::KeepAll); // Just for testing
1157
1158    let sub = dp.create_subscriber(&qos).unwrap();
1159    let topic = dp
1160      .create_topic(
1161        "dr read".to_string(),
1162        "read fn test?".to_string(),
1163        &qos,
1164        TopicKind::WithKey,
1165      )
1166      .unwrap();
1167
1168    let topic_cache =
1169      dp.dds_cache()
1170        .write()
1171        .unwrap()
1172        .add_new_topic(topic.name(), topic.get_type(), &topic.qos());
1173
1174    // Create a Reader
1175    let (notification_sender, _notification_receiver) = mio_channel::sync_channel::<()>(100);
1176    let (_notification_event_source, notification_event_sender) =
1177      mio_source::make_poll_channel().unwrap();
1178    let data_reader_waker = Arc::new(Mutex::new(None));
1179
1180    let (status_sender, _status_receiver) = sync_status_channel::<DataReaderStatus>(4).unwrap();
1181    let (participant_status_sender, _participant_status_receiver) =
1182      sync_status_channel(16).unwrap();
1183
1184    let (_reader_command_sender, reader_command_receiver) =
1185      mio_channel::sync_channel::<ReaderCommand>(10);
1186
1187    let default_id = EntityId::default();
1188    let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), default_id);
1189
1190    let reader_ing = ReaderIngredients {
1191      guid: reader_guid,
1192      notification_sender,
1193      status_sender,
1194      topic_name: topic.name(),
1195      topic_cache_handle: topic_cache,
1196      like_stateless: false,
1197      qos_policy: QosPolicies::qos_none(),
1198      data_reader_command_receiver: reader_command_receiver,
1199      data_reader_waker,
1200      poll_event_sender: notification_event_sender,
1201      security_plugins: None,
1202    };
1203
1204    let mut reader = Reader::new(
1205      reader_ing,
1206      Rc::new(UDPSender::new_with_random_port().unwrap()),
1207      mio_extras::timer::Builder::default().build(),
1208      participant_status_sender,
1209    );
1210
1211    // Create the corresponding matching DataReader
1212    let mut datareader = sub
1213      .create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(&topic, None)
1214      .unwrap();
1215
1216    let writer_guid = GUID {
1217      prefix: GuidPrefix::new(&[1; 12]),
1218      entity_id: EntityId::create_custom_entity_id(
1219        [1; 3],
1220        EntityKind::WRITER_WITH_KEY_USER_DEFINED,
1221      ),
1222    };
1223    let mr_state = MessageReceiverState {
1224      source_guid_prefix: writer_guid.prefix,
1225      ..Default::default()
1226    };
1227    reader.matched_writer_add(
1228      writer_guid,
1229      EntityId::UNKNOWN,
1230      mr_state.unicast_reply_locator_list.to_vec(),
1231      mr_state.multicast_reply_locator_list.to_vec(),
1232      &QosPolicies::qos_none(),
1233    );
1234
1235    // Reader and datareader ready, feed reader some data
1236    let test_data = RandomData {
1237      a: 10,
1238      b: ":DDD".to_string(),
1239    };
1240
1241    let test_data2 = RandomData {
1242      a: 11,
1243      b: ":)))".to_string(),
1244    };
1245    let data_msg = Data {
1246      reader_id: reader.entity_id(),
1247      writer_id: writer_guid.entity_id,
1248      writer_sn: SequenceNumber::from(1),
1249      serialized_payload: Some(
1250        SerializedPayload {
1251          representation_identifier: RepresentationIdentifier::CDR_LE,
1252          representation_options: [0, 0],
1253          value: Bytes::from(to_vec::<RandomData, LittleEndian>(&test_data).unwrap()),
1254        }
1255        .into(),
1256      ),
1257      ..Data::default()
1258    };
1259
1260    let data_msg2 = Data {
1261      reader_id: reader.entity_id(),
1262      writer_id: writer_guid.entity_id,
1263      writer_sn: SequenceNumber::from(2),
1264      serialized_payload: Some(
1265        SerializedPayload {
1266          representation_identifier: RepresentationIdentifier::CDR_LE,
1267          representation_options: [0, 0],
1268          value: Bytes::from(to_vec::<RandomData, LittleEndian>(&test_data2).unwrap()),
1269        }
1270        .into(),
1271      ),
1272      ..Data::default()
1273    };
1274
1275    let data_flags = DATA_Flags::Endianness | DATA_Flags::Data;
1276
1277    reader.handle_data_msg(data_msg, data_flags, &mr_state);
1278    reader.handle_data_msg(data_msg2, data_flags, &mr_state);
1279
1280    // Test that reading does not consume data samples, i.e. they can be read
1281    // multiple times
1282    {
1283      let result_vec = datareader.read(100, ReadCondition::any()).unwrap();
1284      assert_eq!(result_vec.len(), 2);
1285      let d = result_vec[0].value().clone().unwrap();
1286      assert_eq!(&test_data, d);
1287    }
1288    {
1289      let result_vec2 = datareader.read(100, ReadCondition::any()).unwrap();
1290      assert_eq!(result_vec2.len(), 2);
1291      let d2 = result_vec2[1].value().clone().unwrap();
1292      assert_eq!(&test_data2, d2);
1293    }
1294    {
1295      let result_vec3 = datareader.read(100, ReadCondition::any()).unwrap();
1296      let d3 = result_vec3[0].value().clone().unwrap();
1297      assert_eq!(&test_data, d3);
1298    }
1299
1300    // Test that taking consumes the data samples
1301    let mut result_vec = datareader.take(100, ReadCondition::any()).unwrap();
1302    let datasample2 = result_vec.pop().unwrap();
1303    let datasample1 = result_vec.pop().unwrap();
1304    let data2 = datasample2.into_value().unwrap();
1305    let data1 = datasample1.into_value().unwrap();
1306    assert_eq!(test_data2, data2);
1307    assert_eq!(test_data, data1);
1308
1309    let result_vec2 = datareader.take(100, ReadCondition::any());
1310    assert!(result_vec2.is_ok());
1311    assert_eq!(result_vec2.unwrap().len(), 0);
1312  }
1313
1314  #[test]
1315  fn read_and_take_with_instance() {
1316    // Test the methods read_instance and take_instance of the DataReader
1317
1318    let dp = DomainParticipant::new(0).expect("Participant creation failed!");
1319
1320    let mut qos = QosPolicies::qos_none();
1321    qos.history = Some(policy::History::KeepAll); // Just for testing
1322
1323    let sub = dp.create_subscriber(&qos).unwrap();
1324    let topic = dp
1325      .create_topic(
1326        "dr read".to_string(),
1327        "read fn test?".to_string(),
1328        &qos,
1329        TopicKind::WithKey,
1330      )
1331      .unwrap();
1332
1333    let topic_cache =
1334      dp.dds_cache()
1335        .write()
1336        .unwrap()
1337        .add_new_topic(topic.name(), topic.get_type(), &topic.qos());
1338
1339    // Create a Reader
1340    let (notification_sender, _notification_receiver) = mio_channel::sync_channel::<()>(100);
1341    let (_notification_event_source, notification_event_sender) =
1342      mio_source::make_poll_channel().unwrap();
1343    let data_reader_waker = Arc::new(Mutex::new(None));
1344
1345    let (status_sender, _status_receiver) = sync_status_channel::<DataReaderStatus>(4).unwrap();
1346    let (participant_status_sender, _participant_status_receiver) =
1347      sync_status_channel(16).unwrap();
1348
1349    let (_reader_command_sender, reader_command_receiver) =
1350      mio_channel::sync_channel::<ReaderCommand>(10);
1351
1352    let default_id = EntityId::default();
1353    let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), default_id);
1354
1355    let reader_ing = ReaderIngredients {
1356      guid: reader_guid,
1357      notification_sender,
1358      status_sender,
1359      topic_name: topic.name(),
1360      topic_cache_handle: topic_cache,
1361      like_stateless: false,
1362      qos_policy: QosPolicies::qos_none(),
1363      data_reader_command_receiver: reader_command_receiver,
1364      data_reader_waker,
1365      poll_event_sender: notification_event_sender,
1366      security_plugins: None,
1367    };
1368
1369    let mut reader = Reader::new(
1370      reader_ing,
1371      Rc::new(UDPSender::new_with_random_port().unwrap()),
1372      mio_extras::timer::Builder::default().build(),
1373      participant_status_sender,
1374    );
1375
1376    // Create the corresponding matching DataReader
1377    let mut datareader = sub
1378      .create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(&topic, None)
1379      .unwrap();
1380
1381    let writer_guid = GUID {
1382      prefix: GuidPrefix::new(&[1; 12]),
1383      entity_id: EntityId::create_custom_entity_id(
1384        [1; 3],
1385        EntityKind::WRITER_WITH_KEY_USER_DEFINED,
1386      ),
1387    };
1388    let mr_state = MessageReceiverState {
1389      source_guid_prefix: writer_guid.prefix,
1390      ..Default::default()
1391    };
1392    reader.matched_writer_add(
1393      writer_guid,
1394      EntityId::UNKNOWN,
1395      mr_state.unicast_reply_locator_list.to_vec(),
1396      mr_state.multicast_reply_locator_list.to_vec(),
1397      &QosPolicies::qos_none(),
1398    );
1399
1400    // Create 4 data items, 3 of which have the same key
1401    let data_key1 = RandomData {
1402      a: 1,
1403      b: ":D".to_string(),
1404    };
1405    let data_key2_1 = RandomData {
1406      a: 2,
1407      b: ":(".to_string(),
1408    };
1409    let data_key2_2 = RandomData {
1410      a: 2,
1411      b: "??".to_string(),
1412    };
1413    let data_key2_3 = RandomData {
1414      a: 2,
1415      b: "xD".to_string(),
1416    };
1417
1418    let key1 = data_key1.key();
1419    let key2 = data_key2_1.key();
1420
1421    assert!(data_key2_1.key() == data_key2_2.key());
1422    assert!(data_key2_3.key() == key2);
1423
1424    // Create data messages from the data items
1425    // Note that sequence numbering needs to continue as expected
1426    let data_msg = Data {
1427      reader_id: reader.entity_id(),
1428      writer_id: writer_guid.entity_id,
1429      writer_sn: SequenceNumber::from(1),
1430      serialized_payload: Some(
1431        SerializedPayload {
1432          representation_identifier: RepresentationIdentifier::CDR_LE,
1433          representation_options: [0, 0],
1434          value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key1).unwrap()),
1435        }
1436        .into(),
1437      ),
1438      ..Data::default()
1439    };
1440    let data_msg2 = Data {
1441      reader_id: reader.entity_id(),
1442      writer_id: writer_guid.entity_id,
1443      writer_sn: SequenceNumber::from(2),
1444      serialized_payload: Some(
1445        SerializedPayload {
1446          representation_identifier: RepresentationIdentifier::CDR_LE,
1447          representation_options: [0, 0],
1448          value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_1).unwrap()),
1449        }
1450        .into(),
1451      ),
1452      ..Data::default()
1453    };
1454    let data_msg3 = Data {
1455      reader_id: reader.entity_id(),
1456      writer_id: writer_guid.entity_id,
1457      writer_sn: SequenceNumber::from(3),
1458      serialized_payload: Some(
1459        SerializedPayload {
1460          representation_identifier: RepresentationIdentifier::CDR_LE,
1461          representation_options: [0, 0],
1462          value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_2).unwrap()),
1463        }
1464        .into(),
1465      ),
1466      ..Data::default()
1467    };
1468    let data_msg4 = Data {
1469      reader_id: reader.entity_id(),
1470      writer_id: writer_guid.entity_id,
1471      writer_sn: SequenceNumber::from(4),
1472      serialized_payload: Some(
1473        SerializedPayload {
1474          representation_identifier: RepresentationIdentifier::CDR_LE,
1475          representation_options: [0, 0],
1476          value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_3).unwrap()),
1477        }
1478        .into(),
1479      ),
1480      ..Data::default()
1481    };
1482
1483    let data_flags = DATA_Flags::Endianness | DATA_Flags::Data;
1484
1485    // Feed the data messages to the reader
1486    reader.handle_data_msg(data_msg, data_flags, &mr_state);
1487    reader.handle_data_msg(data_msg2, data_flags, &mr_state);
1488    reader.handle_data_msg(data_msg3, data_flags, &mr_state);
1489    reader.handle_data_msg(data_msg4, data_flags, &mr_state);
1490
1491    // Check that calling read_instance with different keys and SelectByKey options
1492    // works as expected
1493
1494    info!("calling read with key 1 and this");
1495    let results =
1496      datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::This);
1497    assert_eq!(&data_key1, results.unwrap()[0].value().clone().unwrap());
1498
1499    info!("calling read with None and this");
1500    // Takes the smallest key, 1 in this case.
1501    let results = datareader.read_instance(100, ReadCondition::any(), None, SelectByKey::This);
1502    assert_eq!(&data_key1, results.unwrap()[0].value().clone().unwrap());
1503
1504    info!("calling read with key 1 and next");
1505    let results =
1506      datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::Next);
1507    assert_eq!(results.as_ref().unwrap().len(), 3);
1508    assert_eq!(&data_key2_1, results.unwrap()[0].value().clone().unwrap());
1509
1510    // Check that calling take_instance returns all 3 samples with the same key
1511    info!("calling take with key 2 and this");
1512    let results =
1513      datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
1514    assert_eq!(results.as_ref().unwrap().len(), 3);
1515    let mut vec = results.unwrap();
1516    let d3 = vec.pop().unwrap();
1517    let d3 = d3.into_value().unwrap();
1518    let d2 = vec.pop().unwrap();
1519    let d2 = d2.into_value().unwrap();
1520    let d1 = vec.pop().unwrap();
1521    let d1 = d1.into_value().unwrap();
1522    assert_eq!(data_key2_3, d3);
1523    assert_eq!(data_key2_2, d2);
1524    assert_eq!(data_key2_1, d1);
1525
1526    // Check that calling take_instance again returns nothing because all samples
1527    // have been consumed
1528    info!("calling take with key 2 and this");
1529    let results =
1530      datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
1531    assert!(results.is_ok());
1532    assert!(results.unwrap().is_empty());
1533  }
1534}