use std::{
io,
pin::Pin,
sync::{Arc, Mutex, MutexGuard},
task::{Context, Poll},
};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use futures::stream::{FusedStream, Stream};
use super::datasample_cache::DataSampleCache;
use crate::{
dds::{
adapters::with_key::{DefaultDecoder, *},
key::*,
qos::*,
readcondition::*,
result::ReadResult,
statusevents::*,
with_key::{datasample::*, simpledatareader::*},
ReadError,
},
discovery::sedp_messages::PublicationBuiltinTopicData,
serialization::CDRDeserializerAdapter,
structure::{duration::Duration, entity::RTPSEntity, guid::GUID, time::Timestamp},
};
/// Convenience alias for a `DataReader` whose deserializer adapter is
/// `CDRDeserializerAdapter`.
pub type DataReaderCdr<D> = DataReader<D, CDRDeserializerAdapter<D>>;
/// Tells the instance-based accessors how to interpret a supplied instance key.
///
/// When a key is supplied, `This` selects that key itself, while `Next` selects
/// whatever the sample cache's `next_key` lookup returns for it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SelectByKey {
/// Use exactly the key that was passed in.
This,
/// Use the key the sample cache reports as following the one passed in.
Next,
}
/// Keyed reader that layers a local `DataSampleCache` on top of a
/// `SimpleDataReader`.
///
/// `D` is the sample type; `DA` is the deserializer adapter, which defaults to
/// `CDRDeserializerAdapter<D>`.
pub struct DataReader<D: Keyed, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
// Source of incoming samples; also backs the event-source, status, QoS and GUID delegations.
simple_data_reader: SimpleDataReader<D, DA>,
// Local cache that the read/take calls fill and then select samples from.
datasample_cache: DataSampleCache<D>, }
impl<D: 'static, DA> DataReader<D, DA>
where
    D: Keyed,
    DA: DeserializerAdapter<D>,
{
    /// Wraps `simple_data_reader` together with a fresh sample cache that is
    /// configured from a copy of the reader's QoS policies.
    pub(crate) fn from_simple_data_reader(simple_data_reader: SimpleDataReader<D, DA>) -> Self {
        // Copy the QoS out first: the reader itself is moved into the struct below.
        let cache_qos = simple_data_reader.qos().clone();
        Self {
            datasample_cache: DataSampleCache::new(cache_qos),
            simple_data_reader,
        }
    }
}
impl<D: 'static, DA> DataReader<D, DA>
where
D: Keyed,
DA: DeserializerAdapter<D> + DefaultDecoder<D>,
{
/// Moves every cache change the simple reader currently hands out into the
/// local sample cache.
///
/// Returns early with the error if fetching a change from the simple reader fails.
fn fill_and_lock_local_datasample_cache(&mut self) -> ReadResult<()> {
    loop {
        match self.simple_data_reader.try_take_one()? {
            Some(cache_change) => {
                self.datasample_cache
                    .fill_from_deserialized_cache_change(cache_change);
            }
            // Nothing more to fetch right now.
            None => return Ok(()),
        }
    }
}
/// Forwards to the simple reader's `drain_read_notifications`.
fn drain_read_notifications(&self) {
    let reader = &self.simple_data_reader;
    reader.drain_read_notifications();
}
/// Asks the sample cache which `(timestamp, key)` entries match `read_condition`.
fn select_keys_for_access(&self, read_condition: ReadCondition) -> Vec<(Timestamp, D::K)> {
    let cache = &self.datasample_cache;
    cache.select_keys_for_access(read_condition)
}
/// Delegates to the sample cache's `take_by_keys` for the listed entries.
fn take_by_keys(&mut self, keys: &[(Timestamp, D::K)]) -> Vec<DataSample<D>> {
    let cache = &mut self.datasample_cache;
    cache.take_by_keys(keys)
}
/// Delegates to the sample cache's `take_bare_by_keys` for the listed entries.
fn take_bare_by_keys(&mut self, keys: &[(Timestamp, D::K)]) -> Vec<Sample<D, D::K>> {
    let cache = &mut self.datasample_cache;
    cache.take_bare_by_keys(keys)
}
/// Asks the sample cache which `(timestamp, key)` entries of the given
/// `instance` match the read condition `rc`.
fn select_instance_keys_for_access(
    &self,
    instance: &D::K,
    rc: ReadCondition,
) -> Vec<(Timestamp, D::K)> {
    let cache = &self.datasample_cache;
    cache.select_instance_keys_for_access(instance, rc)
}
/// Returns at most `max_samples` samples that match `read_condition`, borrowed
/// from the local sample cache.
///
/// Pending read notifications are drained and the local cache is refilled from
/// the simple reader before the selection is made.
///
/// # Errors
/// Propagates any error raised while refilling the local cache.
pub fn read(
    &mut self,
    max_samples: usize,
    read_condition: ReadCondition,
) -> ReadResult<Vec<DataSample<&D>>> {
    self.drain_read_notifications();
    self.fill_and_lock_local_datasample_cache()?;
    // Keep only the first `max_samples` selected entries.
    let keys: Vec<_> = self
        .datasample_cache
        .select_keys_for_access(read_condition)
        .into_iter()
        .take(max_samples)
        .collect();
    Ok(self.datasample_cache.read_by_keys(&keys))
}
/// Returns at most `max_samples` owned samples that match `read_condition`,
/// taken out of the local sample cache.
///
/// Pending read notifications are drained and the local cache is refilled from
/// the simple reader before the selection is made.
///
/// # Errors
/// Propagates any error raised while refilling the local cache.
pub fn take(
    &mut self,
    max_samples: usize,
    read_condition: ReadCondition,
) -> ReadResult<Vec<DataSample<D>>> {
    self.drain_read_notifications();
    self.fill_and_lock_local_datasample_cache()?;
    let mut keys = self.datasample_cache.select_keys_for_access(read_condition);
    // Logged before truncation, so this is the number of matches, not the number returned.
    trace!("take selected count = {}", keys.len());
    keys.truncate(max_samples);
    let taken = self.datasample_cache.take_by_keys(&keys);
    trace!("take taken count = {}", taken.len());
    Ok(taken)
}
/// Reads a single not-yet-read sample, or `None` when `read` finds none.
///
/// # Errors
/// Propagates any error from `read`.
pub fn read_next_sample(&mut self) -> ReadResult<Option<DataSample<&D>>> {
    self.read(1, ReadCondition::not_read())
        .map(|mut samples| samples.pop())
}
/// Takes a single not-yet-read sample, or `None` when `take` finds none.
///
/// # Errors
/// Propagates any error from `take`.
pub fn take_next_sample(&mut self) -> ReadResult<Option<DataSample<D>>> {
    self.take(1, ReadCondition::not_read())
        .map(|mut samples| samples.pop())
}
/// Same selection as `read`, but yields the cache's "bare" `Sample` form
/// instead of `DataSample`.
///
/// # Errors
/// Propagates any error raised while refilling the local cache.
fn read_bare(
    &mut self,
    max_samples: usize,
    read_condition: ReadCondition,
) -> ReadResult<Vec<Sample<&D, D::K>>> {
    self.drain_read_notifications();
    self.fill_and_lock_local_datasample_cache()?;
    // Keep only the first `max_samples` selected entries.
    let keys: Vec<_> = self
        .datasample_cache
        .select_keys_for_access(read_condition)
        .into_iter()
        .take(max_samples)
        .collect();
    Ok(self.datasample_cache.read_bare_by_keys(&keys))
}
/// Same selection as `take`, but yields the cache's "bare" `Sample` form
/// instead of `DataSample`.
///
/// # Errors
/// Propagates any error raised while refilling the local cache.
fn take_bare(
    &mut self,
    max_samples: usize,
    read_condition: ReadCondition,
) -> ReadResult<Vec<Sample<D, D::K>>> {
    self.drain_read_notifications();
    self.fill_and_lock_local_datasample_cache()?;
    let mut keys = self.datasample_cache.select_keys_for_access(read_condition);
    // Logged before truncation, so this is the number of matches, not the number returned.
    trace!("take bare selected count = {}", keys.len());
    keys.truncate(max_samples);
    let taken = self.datasample_cache.take_bare_by_keys(&keys);
    trace!("take bare taken count = {}", taken.len());
    Ok(taken)
}
/// Iterates over borrowed bare samples matching `ReadCondition::not_read()`,
/// with no limit on their number.
///
/// # Errors
/// Propagates any error from the underlying bare read.
pub fn iterator(&mut self) -> ReadResult<impl Iterator<Item = Sample<&D, D::K>>> {
    let unread = self.read_bare(usize::MAX, ReadCondition::not_read())?;
    Ok(unread.into_iter())
}
/// Iterates over borrowed bare samples matching `read_condition`, with no
/// limit on their number.
///
/// # Errors
/// Propagates any error from the underlying bare read.
pub fn conditional_iterator(
    &mut self,
    read_condition: ReadCondition,
) -> ReadResult<impl Iterator<Item = Sample<&D, D::K>>> {
    let matching = self.read_bare(usize::MAX, read_condition)?;
    Ok(matching.into_iter())
}
/// Iterates over owned bare samples matching `ReadCondition::not_read()`,
/// taking them out of the local cache, with no limit on their number.
///
/// # Errors
/// Propagates any error from the underlying bare take.
pub fn into_iterator(&mut self) -> ReadResult<impl Iterator<Item = Sample<D, D::K>>> {
    let unread = self.take_bare(usize::MAX, ReadCondition::not_read())?;
    Ok(unread.into_iter())
}
/// Iterates over owned bare samples matching `read_condition`, taking them
/// out of the local cache, with no limit on their number.
///
/// # Errors
/// Propagates any error from the underlying bare take.
pub fn into_conditional_iterator(
    &mut self,
    read_condition: ReadCondition,
) -> ReadResult<impl Iterator<Item = Sample<D, D::K>>> {
    let matching = self.take_bare(usize::MAX, read_condition)?;
    Ok(matching.into_iter())
}
/// Resolves which instance key an instance-based access should use.
///
/// * No key given: the first key yielded by the cache's `instance_map`,
///   regardless of `this_or_next`.
/// * Key given with `SelectByKey::This`: that key.
/// * Key given with `SelectByKey::Next`: the cache's `next_key` for that key.
///
/// Returns `None` when the chosen lookup yields nothing.
fn infer_key(
    &self,
    instance_key: Option<<D as Keyed>::K>,
    this_or_next: SelectByKey,
) -> Option<<D as Keyed>::K> {
    match (instance_key, this_or_next) {
        (None, _) => self.datasample_cache.instance_map.keys().next().cloned(),
        (Some(given), SelectByKey::This) => Some(given),
        (Some(given), SelectByKey::Next) => self.datasample_cache.next_key(&given),
    }
}
/// Reads at most `max_samples` samples of a single instance that match
/// `read_condition`, borrowed from the local sample cache.
///
/// The instance is resolved from `instance_key` and `this_or_next` by
/// `infer_key`; when no instance can be resolved the result is an empty vector.
///
/// # Errors
/// Propagates any error raised while refilling the local cache.
pub fn read_instance(
    &mut self,
    max_samples: usize,
    read_condition: ReadCondition,
    instance_key: Option<<D as Keyed>::K>,
    this_or_next: SelectByKey,
) -> ReadResult<Vec<DataSample<&D>>> {
    self.drain_read_notifications();
    self.fill_and_lock_local_datasample_cache()?;
    let mut keys = match self.infer_key(instance_key, this_or_next) {
        None => return Ok(Vec::new()),
        Some(instance) => self.select_instance_keys_for_access(&instance, read_condition),
    };
    keys.truncate(max_samples);
    Ok(self.datasample_cache.read_by_keys(&keys))
}
/// Takes at most `max_samples` owned samples of a single instance that match
/// `read_condition` out of the local sample cache.
///
/// The instance is resolved from `instance_key` and `this_or_next` by
/// `infer_key`; when no instance can be resolved the result is an empty vector.
///
/// # Errors
/// Propagates any error raised while refilling the local cache.
pub fn take_instance(
    &mut self,
    max_samples: usize,
    read_condition: ReadCondition,
    instance_key: Option<<D as Keyed>::K>,
    this_or_next: SelectByKey,
) -> ReadResult<Vec<DataSample<D>>> {
    self.drain_read_notifications();
    self.fill_and_lock_local_datasample_cache()?;
    match self.infer_key(instance_key, this_or_next) {
        Some(instance) => {
            let mut keys = self
                .datasample_cache
                .select_instance_keys_for_access(&instance, read_condition);
            keys.truncate(max_samples);
            Ok(self.datasample_cache.take_by_keys(&keys))
        }
        None => Ok(Vec::new()),
    }
}
/// Not implemented yet.
///
/// # Panics
/// Always panics, because the body is `todo!()`. `_max_wait` is not used.
pub fn wait_for_historical_data(&mut self, _max_wait: Duration) -> bool {
todo!()
}
/// Currently a stub: always yields an empty iterator.
pub fn get_matched_publications(&self) -> impl Iterator<Item = PublicationBuiltinTopicData> {
    Vec::<PublicationBuiltinTopicData>::new().into_iter()
}
/// Consumes the reader and turns it into an async stream of bare samples.
///
/// The reader is placed behind an `Arc<Mutex<_>>` owned by the returned stream.
pub fn async_bare_sample_stream(self) -> BareDataReaderStream<D, DA> {
    let shared = Arc::new(Mutex::new(self));
    BareDataReaderStream { datareader: shared }
}
/// Consumes the reader and turns it into an async stream of `DataSample`s.
///
/// The reader is placed behind an `Arc<Mutex<_>>` owned by the returned stream.
pub fn async_sample_stream(self) -> DataReaderStream<D, DA> {
    let shared = Arc::new(Mutex::new(self));
    DataReaderStream { datareader: shared }
}
}
// mio 0.6 event-source integration: every call is forwarded unchanged to the
// inner simple reader.
impl<D, DA> mio_06::Evented for DataReader<D, DA>
where
D: Keyed,
DA: DeserializerAdapter<D>,
{
// Registers the inner simple reader with `poll`, passing all arguments through.
fn register(
&self,
poll: &mio_06::Poll,
token: mio_06::Token,
interest: mio_06::Ready,
opts: mio_06::PollOpt,
) -> io::Result<()> {
self
.simple_data_reader
.register(poll, token, interest, opts)
}
// Re-registers the inner simple reader with `poll`, passing all arguments through.
fn reregister(
&self,
poll: &mio_06::Poll,
token: mio_06::Token,
interest: mio_06::Ready,
opts: mio_06::PollOpt,
) -> io::Result<()> {
self
.simple_data_reader
.reregister(poll, token, interest, opts)
}
// Removes the inner simple reader from `poll`.
fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
self.simple_data_reader.deregister(poll)
}
}
// mio 0.8 event-source integration: every call is forwarded unchanged to the
// inner simple reader. The fully qualified call syntax names the mio 0.8
// `Source` trait explicitly, since the mio 0.6 `Evented` trait has methods
// with the same names.
impl<D, DA> mio_08::event::Source for DataReader<D, DA>
where
D: Keyed,
DA: DeserializerAdapter<D>,
{
// Registers the inner simple reader with `registry`.
fn register(
&mut self,
registry: &mio_08::Registry,
token: mio_08::Token,
interests: mio_08::Interest,
) -> io::Result<()> {
<SimpleDataReader<D, DA> as mio_08::event::Source>::register(
&mut self.simple_data_reader,
registry,
token,
interests,
)
}
// Re-registers the inner simple reader with `registry`.
fn reregister(
&mut self,
registry: &mio_08::Registry,
token: mio_08::Token,
interests: mio_08::Interest,
) -> io::Result<()> {
<SimpleDataReader<D, DA> as mio_08::event::Source>::reregister(
&mut self.simple_data_reader,
registry,
token,
interests,
)
}
// Removes the inner simple reader from `registry`.
fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
<SimpleDataReader<D, DA> as mio_08::event::Source>::deregister(
&mut self.simple_data_reader,
registry,
)
}
}
// Status-event access for the reader: each method hands the request to the
// inner simple reader and returns its answer unchanged.
impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
for DataReader<D, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
{
// Status event source as a mio 0.6 `Evented` object.
fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
self.simple_data_reader.as_status_evented()
}
// Status event source as a mio 0.8 `Source` object.
fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
self.simple_data_reader.as_status_source()
}
// Status events as an async stream that borrows this reader for `'a`.
fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
self.simple_data_reader.as_async_status_stream()
}
// Returns the inner reader's `try_recv_status` result.
// NOTE(review): `None` presumably means no status is pending right now — confirm.
fn try_recv_status(&self) -> Option<DataReaderStatus> {
self.simple_data_reader.try_recv_status()
}
}
impl<D, DA> HasQoSPolicy for DataReader<D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
{
    /// Returns an owned copy of the inner simple reader's QoS policies.
    fn qos(&self) -> QosPolicies {
        let reader = &self.simple_data_reader;
        reader.qos().clone()
    }
}
impl<D, DA> RTPSEntity for DataReader<D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
{
    /// Reports the GUID of the inner simple reader.
    fn guid(&self) -> GUID {
        let reader = &self.simple_data_reader;
        reader.guid()
    }
}
/// Async stream of bare samples (`Sample<D, D::K>`) taken from a `DataReader`.
///
/// Created by `DataReader::async_bare_sample_stream`.
pub struct BareDataReaderStream<
D: Keyed + 'static,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
// Shared handle to the reader; event streams made from this stream clone the same `Arc`.
datareader: Arc<Mutex<DataReader<D, DA>>>,
}
impl<D, DA> BareDataReaderStream<D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
{
    /// Creates a status event stream that shares this stream's reader.
    pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
        let shared = Arc::clone(&self.datareader);
        DataReaderEventStream { datareader: shared }
    }

    /// Locks the shared reader, turning a poisoned mutex into `ReadError::Poisoned`.
    fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
        match self.datareader.lock() {
            Ok(guard) => Ok(guard),
            Err(e) => Err(ReadError::Poisoned {
                reason: format!("BareDataReaderStream could not lock datareader: {e:?}"),
            }),
        }
    }
}
// Explicitly marks the stream as `Unpin`; its only field is an `Arc<Mutex<_>>` handle.
impl<D, DA> Unpin for BareDataReaderStream<D, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
{
}
impl<D, DA> Stream for BareDataReaderStream<D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D> + DefaultDecoder<D>,
{
    type Item = ReadResult<Sample<D, D::K>>;

    /// Yields the next not-yet-read bare sample, or an error item.
    ///
    /// A failure to lock the reader or to take a sample is delivered as
    /// `Some(Err(_))`; this method never returns `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        debug!("poll_next");
        let mut reader = match self.lock_datareader() {
            Err(e) => return Poll::Ready(Some(Err(e))),
            Ok(guard) => guard,
        };

        // First attempt: a sample or an error is handed out right away.
        let first_try = reader
            .take_bare(1, ReadCondition::not_read())
            .map(|mut batch| batch.pop())
            .transpose();
        if let Some(item) = first_try {
            return Poll::Ready(Some(item));
        }

        // Nothing available: install the waker first, then look once more so
        // that a sample arriving between the first attempt and the waker
        // installation is not missed.
        reader
            .simple_data_reader
            .set_waker(Some(cx.waker().clone()));
        let second_try = reader
            .take_bare(1, ReadCondition::not_read())
            .map(|mut batch| batch.pop())
            .transpose();
        match second_try {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        }
    }
}
// The bare sample stream never reports termination: its `poll_next` only ever
// returns `Pending` or `Ready(Some(_))`.
impl<D, DA> FusedStream for BareDataReaderStream<D, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D> + DefaultDecoder<D>,
{
// Always `false`.
fn is_terminated(&self) -> bool {
false }
}
/// Async stream of `DataSample<D>` values taken from a `DataReader`.
///
/// Created by `DataReader::async_sample_stream`.
pub struct DataReaderStream<
D: Keyed + 'static,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
// Shared handle to the reader; event streams made from this stream clone the same `Arc`.
datareader: Arc<Mutex<DataReader<D, DA>>>,
}
impl<D, DA> DataReaderStream<D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
{
    /// Creates a status event stream that shares this stream's reader.
    pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
        let shared = Arc::clone(&self.datareader);
        DataReaderEventStream { datareader: shared }
    }

    /// Locks the shared reader, turning a poisoned mutex into `ReadError::Poisoned`.
    fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
        match self.datareader.lock() {
            Ok(guard) => Ok(guard),
            Err(e) => Err(ReadError::Poisoned {
                reason: format!("DataReaderStream could not lock datareader: {e:?}"),
            }),
        }
    }
}
// Explicitly marks the stream as `Unpin`; its only field is an `Arc<Mutex<_>>` handle.
impl<D, DA> Unpin for DataReaderStream<D, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
{
}
impl<D, DA> Stream for DataReaderStream<D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D> + DefaultDecoder<D>,
{
    type Item = ReadResult<DataSample<D>>;

    /// Yields the next not-yet-read sample, or an error item.
    ///
    /// A failure to lock the reader or to take a sample is delivered as
    /// `Some(Err(_))`; this method never returns `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        debug!("poll_next");
        let mut reader = match self.lock_datareader() {
            Err(e) => return Poll::Ready(Some(Err(e))),
            Ok(guard) => guard,
        };

        // First attempt: a sample or an error is handed out right away.
        let first_try = reader
            .take(1, ReadCondition::not_read())
            .map(|mut batch| batch.pop())
            .transpose();
        if let Some(item) = first_try {
            return Poll::Ready(Some(item));
        }

        // Nothing available: install the waker first, then look once more so
        // that a sample arriving between the first attempt and the waker
        // installation is not missed.
        reader
            .simple_data_reader
            .set_waker(Some(cx.waker().clone()));
        let second_try = reader
            .take(1, ReadCondition::not_read())
            .map(|mut batch| batch.pop())
            .transpose();
        match second_try {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        }
    }
}
// The sample stream never reports termination: its `poll_next` only ever
// returns `Pending` or `Ready(Some(_))`.
impl<D, DA> FusedStream for DataReaderStream<D, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D> + DefaultDecoder<D>,
{
// Always `false`.
fn is_terminated(&self) -> bool {
false }
}
/// Async stream of `DataReaderStatus` events for a reader that is owned by a
/// sample stream.
///
/// Created by the `async_event_stream` methods of `BareDataReaderStream` and
/// `DataReaderStream`.
pub struct DataReaderEventStream<
D: Keyed + 'static,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
// Clone of the creating sample stream's handle, so both lock the same reader.
datareader: Arc<Mutex<DataReader<D, DA>>>,
}
impl<D, DA> DataReaderEventStream<D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
{
    /// Locks the shared reader, turning a poisoned mutex into `ReadError::Poisoned`.
    fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
        match self.datareader.lock() {
            Ok(guard) => Ok(guard),
            Err(e) => Err(ReadError::Poisoned {
                reason: format!("DataReaderEventStream could not lock datareader: {e:?}"),
            }),
        }
    }
}
impl<D, DA> Stream for DataReaderEventStream<D, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
{
type Item = DataReaderStatus;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let datareader = match self.lock_datareader() {
Ok(g) => g,
Err(_e) => return Poll::Ready(None),
};
Pin::new(&mut datareader.simple_data_reader.as_async_status_stream()).poll_next(cx)
}
}
// The event stream never reports termination.
// NOTE(review): `poll_next` for this stream returns `Ready(None)` when the
// reader's mutex is poisoned, yet `is_terminated` stays `false` in that state —
// confirm whether that is intended.
impl<D, DA> FusedStream for DataReaderEventStream<D, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
{
// Always `false`.
fn is_terminated(&self) -> bool {
false }
}
#[cfg(test)]
mod tests {
use std::rc::Rc;
use bytes::Bytes;
use mio_extras::channel as mio_channel;
use log::info;
use byteorder::LittleEndian;
use super::*;
use crate::{
dds::{
participant::DomainParticipant,
topic::{TopicDescription, TopicKind},
},
messages::submessages::{
elements::serialized_payload::SerializedPayload, submessage_flag::*, submessages::Data,
},
mio_source,
network::udp_sender::UDPSender,
rtps::{
message_receiver::*,
reader::{Reader, ReaderIngredients},
},
serialization::to_vec,
structure::{
guid::{EntityId, EntityKind, GuidPrefix},
sequence_number::SequenceNumber,
},
test::random_data::*,
RepresentationIdentifier,
};
// Feeds two samples through a hand-built `Reader` and checks that
// `DataReader::read` keeps returning them while `DataReader::take` hands them
// out exactly once.
#[test]
fn read_and_take() {
// Participant, subscriber and keyed topic, all using KeepAll history.
let dp = DomainParticipant::new(0).expect("Participant creation failed!");
let mut qos = QosPolicies::qos_none();
qos.history = Some(policy::History::KeepAll);
let sub = dp.create_subscriber(&qos).unwrap();
let topic = dp
.create_topic(
"dr read".to_string(),
"read fn test?".to_string(),
&qos,
TopicKind::WithKey,
)
.unwrap();
let topic_cache =
dp.dds_cache()
.write()
.unwrap()
.add_new_topic(topic.name(), topic.get_type(), &topic.qos());
// Channels and handles required by `ReaderIngredients`; the underscore-prefixed
// ends are not used by this test.
let (notification_sender, _notification_receiver) = mio_channel::sync_channel::<()>(100);
let (_notification_event_source, notification_event_sender) =
mio_source::make_poll_channel().unwrap();
let data_reader_waker = Arc::new(Mutex::new(None));
let (status_sender, _status_receiver) = sync_status_channel::<DataReaderStatus>(4).unwrap();
let (participant_status_sender, _participant_status_receiver) =
sync_status_channel(16).unwrap();
let (_reader_command_sender, reader_command_receiver) =
mio_channel::sync_channel::<ReaderCommand>(10);
let default_id = EntityId::default();
let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), default_id);
let reader_ing = ReaderIngredients {
guid: reader_guid,
notification_sender,
status_sender,
topic_name: topic.name(),
topic_cache_handle: topic_cache,
like_stateless: false,
qos_policy: QosPolicies::qos_none(),
data_reader_command_receiver: reader_command_receiver,
data_reader_waker,
poll_event_sender: notification_event_sender,
security_plugins: None,
};
// RTPS-level reader that the test drives directly with DATA submessages.
let mut reader = Reader::new(
reader_ing,
Rc::new(UDPSender::new_with_random_port().unwrap()),
mio_extras::timer::Builder::default().build(),
participant_status_sender,
);
// DataReader under test, created on the same topic through the subscriber.
let mut datareader = sub
.create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(&topic, None)
.unwrap();
// Register a made-up writer with the reader so its messages have a matched source.
let writer_guid = GUID {
prefix: GuidPrefix::new(&[1; 12]),
entity_id: EntityId::create_custom_entity_id(
[1; 3],
EntityKind::WRITER_WITH_KEY_USER_DEFINED,
),
};
let mr_state = MessageReceiverState {
source_guid_prefix: writer_guid.prefix,
..Default::default()
};
reader.matched_writer_add(
writer_guid,
EntityId::UNKNOWN,
mr_state.unicast_reply_locator_list.to_vec(),
mr_state.multicast_reply_locator_list.to_vec(),
&QosPolicies::qos_none(),
);
// Two distinct payloads, serialized as little-endian CDR into DATA
// submessages with sequence numbers 1 and 2.
let test_data = RandomData {
a: 10,
b: ":DDD".to_string(),
};
let test_data2 = RandomData {
a: 11,
b: ":)))".to_string(),
};
let data_msg = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(1),
serialized_payload: Some(
SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_vec::<RandomData, LittleEndian>(&test_data).unwrap()),
}
.into(),
),
..Data::default()
};
let data_msg2 = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(2),
serialized_payload: Some(
SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_vec::<RandomData, LittleEndian>(&test_data2).unwrap()),
}
.into(),
),
..Data::default()
};
let data_flags = DATA_Flags::Endianness | DATA_Flags::Data;
reader.handle_data_msg(data_msg, data_flags, &mr_state);
reader.handle_data_msg(data_msg2, data_flags, &mr_state);
// Repeated reads with `ReadCondition::any()` keep returning both samples, in
// arrival order.
{
let result_vec = datareader.read(100, ReadCondition::any()).unwrap();
assert_eq!(result_vec.len(), 2);
let d = result_vec[0].value().clone().unwrap();
assert_eq!(&test_data, d);
}
{
let result_vec2 = datareader.read(100, ReadCondition::any()).unwrap();
assert_eq!(result_vec2.len(), 2);
let d2 = result_vec2[1].value().clone().unwrap();
assert_eq!(&test_data2, d2);
}
{
let result_vec3 = datareader.read(100, ReadCondition::any()).unwrap();
let d3 = result_vec3[0].value().clone().unwrap();
assert_eq!(&test_data, d3);
}
// A take returns both samples as owned values (popped here in reverse order)...
let mut result_vec = datareader.take(100, ReadCondition::any()).unwrap();
let datasample2 = result_vec.pop().unwrap();
let datasample1 = result_vec.pop().unwrap();
let data2 = datasample2.into_value().unwrap();
let data1 = datasample1.into_value().unwrap();
assert_eq!(test_data2, data2);
assert_eq!(test_data, data1);
// ...and a second take succeeds but finds nothing left.
let result_vec2 = datareader.take(100, ReadCondition::any());
assert!(result_vec2.is_ok());
assert_eq!(result_vec2.unwrap().len(), 0);
}
// Feeds one sample for key 1 and three samples for key 2 through a hand-built
// `Reader`, then checks the key selection of `read_instance` / `take_instance`.
#[test]
fn read_and_take_with_instance() {
// Participant, subscriber and keyed topic, all using KeepAll history.
let dp = DomainParticipant::new(0).expect("Participant creation failed!");
let mut qos = QosPolicies::qos_none();
qos.history = Some(policy::History::KeepAll);
let sub = dp.create_subscriber(&qos).unwrap();
let topic = dp
.create_topic(
"dr read".to_string(),
"read fn test?".to_string(),
&qos,
TopicKind::WithKey,
)
.unwrap();
let topic_cache =
dp.dds_cache()
.write()
.unwrap()
.add_new_topic(topic.name(), topic.get_type(), &topic.qos());
// Channels and handles required by `ReaderIngredients`; the underscore-prefixed
// ends are not used by this test.
let (notification_sender, _notification_receiver) = mio_channel::sync_channel::<()>(100);
let (_notification_event_source, notification_event_sender) =
mio_source::make_poll_channel().unwrap();
let data_reader_waker = Arc::new(Mutex::new(None));
let (status_sender, _status_receiver) = sync_status_channel::<DataReaderStatus>(4).unwrap();
let (participant_status_sender, _participant_status_receiver) =
sync_status_channel(16).unwrap();
let (_reader_command_sender, reader_command_receiver) =
mio_channel::sync_channel::<ReaderCommand>(10);
let default_id = EntityId::default();
let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), default_id);
let reader_ing = ReaderIngredients {
guid: reader_guid,
notification_sender,
status_sender,
topic_name: topic.name(),
topic_cache_handle: topic_cache,
like_stateless: false,
qos_policy: QosPolicies::qos_none(),
data_reader_command_receiver: reader_command_receiver,
data_reader_waker,
poll_event_sender: notification_event_sender,
security_plugins: None,
};
// RTPS-level reader that the test drives directly with DATA submessages.
let mut reader = Reader::new(
reader_ing,
Rc::new(UDPSender::new_with_random_port().unwrap()),
mio_extras::timer::Builder::default().build(),
participant_status_sender,
);
// DataReader under test, created on the same topic through the subscriber.
let mut datareader = sub
.create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(&topic, None)
.unwrap();
// Register a made-up writer with the reader so its messages have a matched source.
let writer_guid = GUID {
prefix: GuidPrefix::new(&[1; 12]),
entity_id: EntityId::create_custom_entity_id(
[1; 3],
EntityKind::WRITER_WITH_KEY_USER_DEFINED,
),
};
let mr_state = MessageReceiverState {
source_guid_prefix: writer_guid.prefix,
..Default::default()
};
reader.matched_writer_add(
writer_guid,
EntityId::UNKNOWN,
mr_state.unicast_reply_locator_list.to_vec(),
mr_state.multicast_reply_locator_list.to_vec(),
&QosPolicies::qos_none(),
);
// One sample whose key comes from `a: 1`, and three samples that share the
// key derived from `a: 2` (the asserts below confirm the shared key).
let data_key1 = RandomData {
a: 1,
b: ":D".to_string(),
};
let data_key2_1 = RandomData {
a: 2,
b: ":(".to_string(),
};
let data_key2_2 = RandomData {
a: 2,
b: "??".to_string(),
};
let data_key2_3 = RandomData {
a: 2,
b: "xD".to_string(),
};
let key1 = data_key1.key();
let key2 = data_key2_1.key();
assert!(data_key2_1.key() == data_key2_2.key());
assert!(data_key2_3.key() == key2);
// Serialize the four samples as little-endian CDR into DATA submessages with
// sequence numbers 1 through 4.
let data_msg = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(1),
serialized_payload: Some(
SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key1).unwrap()),
}
.into(),
),
..Data::default()
};
let data_msg2 = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(2),
serialized_payload: Some(
SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_1).unwrap()),
}
.into(),
),
..Data::default()
};
let data_msg3 = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(3),
serialized_payload: Some(
SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_2).unwrap()),
}
.into(),
),
..Data::default()
};
let data_msg4 = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(4),
serialized_payload: Some(
SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_3).unwrap()),
}
.into(),
),
..Data::default()
};
let data_flags = DATA_Flags::Endianness | DATA_Flags::Data;
reader.handle_data_msg(data_msg, data_flags, &mr_state);
reader.handle_data_msg(data_msg2, data_flags, &mr_state);
reader.handle_data_msg(data_msg3, data_flags, &mr_state);
reader.handle_data_msg(data_msg4, data_flags, &mr_state);
// Explicit key + `This`: the key-1 sample comes back.
info!("calling read with key 1 and this");
let results =
datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::This);
assert_eq!(&data_key1, results.unwrap()[0].value().clone().unwrap());
// No key given: the first instance in the cache is used, which is key 1 here.
info!("calling read with None and this");
let results = datareader.read_instance(100, ReadCondition::any(), None, SelectByKey::This);
assert_eq!(&data_key1, results.unwrap()[0].value().clone().unwrap());
// Key 1 + `Next`: the instance after key 1 is key 2, with its three samples.
info!("calling read with key 1 and next");
let results =
datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::Next);
assert_eq!(results.as_ref().unwrap().len(), 3);
assert_eq!(&data_key2_1, results.unwrap()[0].value().clone().unwrap());
// Taking key 2 returns its three samples in arrival order (popped here in reverse).
info!("calling take with key 2 and this");
let results =
datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
assert_eq!(results.as_ref().unwrap().len(), 3);
let mut vec = results.unwrap();
let d3 = vec.pop().unwrap();
let d3 = d3.into_value().unwrap();
let d2 = vec.pop().unwrap();
let d2 = d2.into_value().unwrap();
let d1 = vec.pop().unwrap();
let d1 = d1.into_value().unwrap();
assert_eq!(data_key2_3, d3);
assert_eq!(data_key2_2, d2);
assert_eq!(data_key2_1, d1);
// A second take of key 2 succeeds but finds nothing left.
info!("calling take with key 2 and this");
let results =
datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
assert!(results.is_ok());
assert!(results.unwrap().is_empty());
}
}