use std::{
cmp::max,
collections::BTreeMap,
io,
marker::PhantomData,
sync::{Arc, RwLock},
};
use serde::de::DeserializeOwned;
use mio_extras::channel as mio_channel;
#[allow(unused_imports)]
use log::{debug, error, info, warn};
use mio::{Evented, Poll, PollOpt, Ready, Token};
use crate::{
dds::{
datasample_cache::DataSampleCache,
ddsdata::DDSData,
pubsub::Subscriber,
qos::*,
readcondition::*,
statusevents::*,
topic::Topic,
traits::{key::*, serde_adapters::with_key::*, TopicDescription},
values::result::*,
with_key::datasample::*,
},
discovery::{data_types::topic_data::PublicationBuiltinTopicData, discovery::DiscoveryCommand},
log_and_err_precondition_not_met,
serialization::CDRDeserializerAdapter,
structure::{
cache_change::CacheChange,
dds_cache::DDSCache,
duration::Duration,
entity::RTPSEntity,
guid::{EntityId, GUID},
sequence_number::SequenceNumber,
time::Timestamp,
},
};
pub type DataReaderCdr<D> = DataReader<D, CDRDeserializerAdapter<D>>;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SelectByKey {
This,
Next,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ReaderCommand {
#[allow(dead_code)] ResetRequestedDeadlineStatus,
}
pub struct DataReader<
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>,
> {
#[allow(dead_code)] my_subscriber: Subscriber,
my_topic: Topic,
qos_policy: QosPolicies,
my_guid: GUID,
pub(crate) notification_receiver: mio_channel::Receiver<()>,
dds_cache: Arc<RwLock<DDSCache>>,
datasample_cache: DataSampleCache<D>,
latest_instant: Timestamp,
latest_sequence_number: BTreeMap<GUID, SequenceNumber>,
deserializer_type: PhantomData<DA>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
status_receiver: StatusReceiver<DataReaderStatus>,
#[allow(dead_code)] reader_command: mio_channel::SyncSender<ReaderCommand>,
}
impl<D, DA> Drop for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn drop(&mut self) {
self.my_subscriber.remove_reader(self.my_guid);
match self
.discovery_command
.send(DiscoveryCommand::RemoveLocalReader { guid: self.guid() })
{
Ok(_) => {}
Err(mio_channel::SendError::Disconnected(_)) => {
debug!("Failed to send REMOVE_LOCAL_READER DiscoveryCommand. Maybe shutting down?");
}
Err(e) => error!(
"Failed to send REMOVE_LOCAL_READER DiscoveryCommand. {:?}",
e
),
}
}
}
impl<D: 'static, DA> DataReader<D, DA>
where
D: DeserializeOwned + Keyed,
<D as Keyed>::K: Key,
DA: DeserializerAdapter<D>,
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
subscriber: Subscriber,
my_id: EntityId,
topic: Topic,
qos_policy: QosPolicies,
notification_receiver: mio_channel::Receiver<()>,
dds_cache: Arc<RwLock<DDSCache>>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
status_channel_rec: mio_channel::Receiver<DataReaderStatus>,
reader_command: mio_channel::SyncSender<ReaderCommand>,
) -> Result<Self> {
let dp = match subscriber.participant() {
Some(dp) => dp,
None => {
return log_and_err_precondition_not_met!(
"Cannot create new DataReader, DomainParticipant doesn't exist."
)
}
};
let my_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), my_id);
Ok(Self {
my_subscriber: subscriber,
qos_policy,
my_guid,
notification_receiver,
dds_cache,
datasample_cache: DataSampleCache::new(topic.qos()),
my_topic: topic,
latest_instant: Timestamp::now(),
latest_sequence_number: BTreeMap::new(),
deserializer_type: PhantomData,
discovery_command,
status_receiver: StatusReceiver::new(status_channel_rec),
reader_command,
})
}
pub fn read(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
) -> Result<Vec<DataSample<&D>>> {
while self.notification_receiver.try_recv().is_ok() {}
self.fill_local_datasample_cache();
let mut selected = self.datasample_cache.select_keys_for_access(read_condition);
selected.truncate(max_samples);
let result = self.datasample_cache.read_by_keys(&selected);
Ok(result)
}
pub fn take(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
) -> Result<Vec<DataSample<D>>> {
while self.notification_receiver.try_recv().is_ok() {}
self.fill_local_datasample_cache();
let mut selected = self.datasample_cache.select_keys_for_access(read_condition);
debug!("take selected count = {}", selected.len());
selected.truncate(max_samples);
let result = self.datasample_cache.take_by_keys(&selected);
debug!("take taken count = {}", result.len());
Ok(result)
}
pub fn read_next_sample(&mut self) -> Result<Option<DataSample<&D>>> {
let mut ds = self.read(1, ReadCondition::not_read())?;
Ok(ds.pop())
}
pub fn take_next_sample(&mut self) -> Result<Option<DataSample<D>>> {
let mut ds = self.take(1, ReadCondition::not_read())?;
Ok(ds.pop())
}
fn read_bare(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
) -> Result<Vec<std::result::Result<&D, D::K>>> {
while self.notification_receiver.try_recv().is_ok() {}
self.fill_local_datasample_cache();
let mut selected = self.datasample_cache.select_keys_for_access(read_condition);
selected.truncate(max_samples);
let result = self.datasample_cache.read_bare_by_keys(&selected);
Ok(result)
}
fn take_bare(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
) -> Result<Vec<std::result::Result<D, D::K>>> {
while self.notification_receiver.try_recv().is_ok() {}
self.fill_local_datasample_cache();
let mut selected = self.datasample_cache.select_keys_for_access(read_condition);
debug!("take bare selected count = {}", selected.len());
selected.truncate(max_samples);
let result = self.datasample_cache.take_bare_by_keys(&selected);
debug!("take bare taken count = {}", result.len());
Ok(result)
}
pub fn iterator(&mut self) -> Result<impl Iterator<Item = std::result::Result<&D, D::K>>> {
Ok(
self
.read_bare(std::usize::MAX, ReadCondition::not_read())?
.into_iter(),
)
}
pub fn conditional_iterator(
&mut self,
read_condition: ReadCondition,
) -> Result<impl Iterator<Item = std::result::Result<&D, D::K>>> {
Ok(self.read_bare(std::usize::MAX, read_condition)?.into_iter())
}
pub fn into_iterator(&mut self) -> Result<impl Iterator<Item = std::result::Result<D, D::K>>> {
Ok(
self
.take_bare(std::usize::MAX, ReadCondition::not_read())?
.into_iter(),
)
}
pub fn into_conditional_iterator(
&mut self,
read_condition: ReadCondition,
) -> Result<impl Iterator<Item = std::result::Result<D, D::K>>> {
Ok(self.take_bare(std::usize::MAX, read_condition)?.into_iter())
}
fn fill_local_datasample_cache(&mut self) {
let is_reliable = matches!(
self.qos_policy.reliability(),
Some(policy::Reliability::Reliable { .. })
);
let dds_cache = match self.dds_cache.read() {
Ok(rwlock) => rwlock,
Err(e) => panic!(
"The DDSCache of domain participant is poisoned. Error: {}",
e
),
};
let cache_changes = dds_cache.topic_get_changes_in_range(
&self.my_topic.name(),
&self.latest_instant,
&Timestamp::now(),
);
let mut cache_changes_vec: Vec<(Timestamp, &CacheChange)> = cache_changes.collect();
cache_changes_vec.sort_by_key(|(_ts, cc)| cc.sequence_number);
for (
instant,
CacheChange {
writer_guid,
sequence_number,
write_options,
data_value,
},
) in cache_changes_vec
{
self.latest_instant = max(self.latest_instant, instant); let latest_sequence_number_have_already = self.latest_sequence_number.get(writer_guid);
if (! is_reliable &&
latest_sequence_number_have_already
.map_or(true, |latest| sequence_number > latest))
|| (is_reliable &&
latest_sequence_number_have_already
.map_or(true, |latest| *latest + SequenceNumber::from(1) == *sequence_number))
{
self
.latest_sequence_number
.insert(*writer_guid, *sequence_number);
match data_value {
DDSData::Data { serialized_payload } => {
if let Some(recognized_rep_id) = DA::supported_encodings()
.iter()
.find(|r| **r == serialized_payload.representation_identifier)
{
match DA::from_bytes(&serialized_payload.value, *recognized_rep_id) {
Ok(payload) => self.datasample_cache.add_sample(
Ok(payload),
*writer_guid,
*sequence_number,
instant,
write_options.clone(),
),
Err(e) => {
error!(
"Failed to deserialize bytes: {}, Topic = {}, Type = {:?}",
e,
self.my_topic.name(),
self.my_topic.get_type()
);
info!("Bytes were {:?}", &serialized_payload.value);
continue; }
}
} else {
warn!(
"Unknown representation id {:?}.",
serialized_payload.representation_identifier
);
info!("Serialized payload was {:?}", &serialized_payload);
continue; }
}
DDSData::DisposeByKey {
key: serialized_key,
..
} => {
match DA::key_from_bytes(
&serialized_key.value,
serialized_key.representation_identifier,
) {
Ok(key) => {
self.datasample_cache.add_sample(
Err(key),
*writer_guid,
*sequence_number,
instant,
write_options.clone(),
);
}
Err(e) => {
warn!(
"Failed to deserialize key {}, Topic = {}, Type = {:?}",
e,
self.my_topic.name(),
self.my_topic.get_type()
);
debug!("Bytes were {:?}", &serialized_key.value);
continue; }
}
}
DDSData::DisposeByKeyHash { key_hash, .. } => {
if let Some(key) = self.datasample_cache.key_by_hash(*key_hash) {
self.datasample_cache.add_sample(
Err(key),
*writer_guid,
*sequence_number,
instant,
write_options.clone(),
);
} else {
warn!("Tried to dispose with unkonwn key hash: {:x?}", key_hash);
}
}
} }
else {
}
} }
fn infer_key(
&self,
instance_key: Option<<D as Keyed>::K>,
this_or_next: SelectByKey,
) -> Option<<D as Keyed>::K> {
match instance_key {
Some(k) => match this_or_next {
SelectByKey::This => Some(k),
SelectByKey::Next => self.datasample_cache.next_key(&k),
},
None => self.datasample_cache.instance_map.keys().next().cloned(),
}
}
pub fn read_instance(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
instance_key: Option<<D as Keyed>::K>,
this_or_next: SelectByKey,
) -> Result<Vec<DataSample<&D>>> {
while self.notification_receiver.try_recv().is_ok() {}
self.fill_local_datasample_cache();
let key = match self.infer_key(instance_key, this_or_next) {
Some(k) => k,
None => return Ok(Vec::new()),
};
let mut selected = self
.datasample_cache
.select_instance_keys_for_access(&key, read_condition);
selected.truncate(max_samples);
let result = self.datasample_cache.read_by_keys(&selected);
Ok(result)
}
pub fn take_instance(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
instance_key: Option<<D as Keyed>::K>,
this_or_next: SelectByKey,
) -> Result<Vec<DataSample<D>>> {
while self.notification_receiver.try_recv().is_ok() {}
self.fill_local_datasample_cache();
let key = match self.infer_key(instance_key, this_or_next) {
Some(k) => k,
None => return Ok(Vec::new()),
};
let mut selected = self
.datasample_cache
.select_instance_keys_for_access(&key, read_condition);
selected.truncate(max_samples);
let result = self.datasample_cache.take_by_keys(&selected);
Ok(result)
}
pub fn wait_for_historical_data(&self, _max_wait: Duration) -> bool {
todo!()
}
pub fn get_matched_publications(&self) -> impl Iterator<Item = PublicationBuiltinTopicData> {
vec![].into_iter()
}
}
impl<D, DA> Evented for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
self
.notification_receiver
.register(poll, token, interest, opts)
}
fn reregister(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
self
.notification_receiver
.reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> io::Result<()> {
self.notification_receiver.deregister(poll)
}
}
impl<D, DA> StatusEvented<DataReaderStatus> for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn as_status_evented(&mut self) -> &dyn Evented {
self.status_receiver.as_status_evented()
}
fn try_recv_status(&self) -> Option<DataReaderStatus> {
self.status_receiver.try_recv_status()
}
}
impl<D, DA> HasQoSPolicy for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn qos(&self) -> QosPolicies {
self.qos_policy.clone()
}
}
impl<D, DA> RTPSEntity for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn guid(&self) -> GUID {
self.my_guid
}
}
#[cfg(test)]
mod tests {
use std::rc::Rc;
use bytes::Bytes;
use mio_extras::channel as mio_channel;
use log::info;
use byteorder::LittleEndian;
use super::*;
use crate::{
dds::{
message_receiver::*,
participant::DomainParticipant,
reader::{Reader, ReaderIngredients},
topic::TopicKind,
traits::key::Keyed,
},
messages::submessages::{
data::Data,
submessage_elements::serialized_payload::{RepresentationIdentifier, SerializedPayload},
},
network::udp_sender::UDPSender,
serialization::{cdr_deserializer::CDRDeserializerAdapter, cdr_serializer::to_bytes},
structure::{
guid::{EntityKind, GuidPrefix},
sequence_number::SequenceNumber,
},
test::random_data::*,
};
use crate::messages::submessages::submessage_flag::*;
#[test]
fn dr_get_samples_from_ddscache() {
let dp = DomainParticipant::new(0).expect("Participant creation failed");
let mut qos = QosPolicies::qos_none();
qos.history = Some(policy::History::KeepAll);
let sub = dp.create_subscriber(&qos).unwrap();
let topic = dp
.create_topic(
"dr".to_string(),
"drtest?".to_string(),
&qos,
TopicKind::WithKey,
)
.unwrap();
let (send, _rec) = mio_channel::sync_channel::<()>(10);
let (status_sender, _status_receiver) =
mio_extras::channel::sync_channel::<DataReaderStatus>(100);
let (_reader_commander, reader_command_receiver) =
mio_extras::channel::sync_channel::<ReaderCommand>(100);
let reader_id = EntityId::default();
let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), reader_id);
let reader_ing = ReaderIngredients {
guid: reader_guid,
notification_sender: send,
status_sender,
topic_name: topic.name(),
qos_policy: QosPolicies::qos_none(),
data_reader_command_receiver: reader_command_receiver,
};
let mut new_reader = Reader::new(
reader_ing,
dp.dds_cache(),
Rc::new(UDPSender::new_with_random_port().unwrap()),
mio_extras::timer::Builder::default().build(),
);
let mut matching_datareader = sub
.create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(&topic, None)
.unwrap();
let random_data = RandomData {
a: 1,
b: "somedata".to_string(),
};
let data_key = random_data.key();
let writer_guid = GUID {
prefix: GuidPrefix::new(&[1; 12]),
entity_id: EntityId::create_custom_entity_id(
[1; 3],
EntityKind::WRITER_WITH_KEY_USER_DEFINED,
),
};
let mr_state = MessageReceiverState {
source_guid_prefix: writer_guid.prefix,
..Default::default()
};
new_reader.matched_writer_add(
writer_guid,
EntityId::UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
&QosPolicies::qos_none(),
);
let data_flags = DATA_Flags::Endianness | DATA_Flags::Data;
let data = Data {
reader_id: EntityId::create_custom_entity_id([1, 2, 3], EntityKind::from(111)),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(1_i64),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&random_data).unwrap()),
}),
..Default::default()
};
new_reader.handle_data_msg(data, data_flags, &mr_state);
std::thread::sleep(std::time::Duration::from_millis(100));
matching_datareader.fill_local_datasample_cache();
let deserialized_random_data = matching_datareader.read(1, ReadCondition::any()).unwrap()[0]
.value()
.unwrap()
.clone();
assert_eq!(deserialized_random_data, random_data);
let random_data2 = RandomData {
a: 1,
b: "somedata number 2".to_string(),
};
let data2 = Data {
reader_id: EntityId::create_custom_entity_id([1, 2, 3], EntityKind::from(111)),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(2),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&random_data2).unwrap()),
}),
..Default::default()
};
let random_data3 = RandomData {
a: 1,
b: "third somedata".to_string(),
};
let data3 = Data {
reader_id: EntityId::create_custom_entity_id([1, 2, 3], EntityKind::from(111)),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(3),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&random_data3).unwrap()),
}),
..Default::default()
};
new_reader.handle_data_msg(data2, data_flags, &mr_state);
new_reader.handle_data_msg(data3, data_flags, &mr_state);
matching_datareader.fill_local_datasample_cache();
let random_data_vec = matching_datareader
.read_instance(100, ReadCondition::any(), Some(data_key), SelectByKey::This)
.unwrap();
assert_eq!(random_data_vec.len(), 3);
}
#[test]
#[ignore]
fn dr_read_and_take() {
let dp = DomainParticipant::new(0).expect("Particpant creation failed!");
let mut qos = QosPolicies::qos_none();
qos.history = Some(policy::History::KeepAll);
let sub = dp.create_subscriber(&qos).unwrap();
let topic = dp
.create_topic(
"dr read".to_string(),
"read fn test?".to_string(),
&qos,
TopicKind::WithKey,
)
.unwrap();
let (send, _rec) = mio_channel::sync_channel::<()>(10);
let (status_sender, _status_receiver) =
mio_extras::channel::sync_channel::<DataReaderStatus>(100);
let (_reader_commander, reader_command_receiver) =
mio_extras::channel::sync_channel::<ReaderCommand>(100);
let default_id = EntityId::default();
let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), default_id);
let reader_ing = ReaderIngredients {
guid: reader_guid,
notification_sender: send,
status_sender,
topic_name: topic.name(),
qos_policy: QosPolicies::qos_none(),
data_reader_command_receiver: reader_command_receiver,
};
let mut reader = Reader::new(
reader_ing,
dp.dds_cache(),
Rc::new(UDPSender::new_with_random_port().unwrap()),
mio_extras::timer::Builder::default().build(),
);
let mut datareader = sub
.create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(&topic, None)
.unwrap();
let writer_guid = GUID {
prefix: GuidPrefix::new(&[1; 12]),
entity_id: EntityId::create_custom_entity_id(
[1; 3],
EntityKind::WRITER_WITH_KEY_USER_DEFINED,
),
};
let mr_state = MessageReceiverState {
source_guid_prefix: writer_guid.prefix,
..Default::default()
};
reader.matched_writer_add(
writer_guid,
EntityId::UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
&QosPolicies::qos_none(),
);
let test_data = RandomData {
a: 10,
b: ":DDD".to_string(),
};
let test_data2 = RandomData {
a: 11,
b: ":)))".to_string(),
};
let data_msg = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(1),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&test_data).unwrap()),
}),
..Default::default()
};
let data_msg2 = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(2),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&test_data2).unwrap()),
}),
..Default::default()
};
let data_flags = DATA_Flags::Endianness | DATA_Flags::Data;
reader.handle_data_msg(data_msg, data_flags, &mr_state);
reader.handle_data_msg(data_msg2, data_flags, &mr_state);
{
let result_vec = datareader.read(100, ReadCondition::any()).unwrap();
assert_eq!(result_vec.len(), 2);
let d = result_vec[0].value().unwrap();
assert_eq!(&test_data, d);
}
{
let result_vec2 = datareader.read(100, ReadCondition::any()).unwrap();
assert_eq!(result_vec2.len(), 2);
let d2 = result_vec2[1].value().unwrap();
assert_eq!(&test_data2, d2);
}
{
let result_vec3 = datareader.read(100, ReadCondition::any()).unwrap();
let d3 = result_vec3[0].value().unwrap();
assert_eq!(&test_data, d3);
}
let mut result_vec = datareader.take(100, ReadCondition::any()).unwrap();
let result_vec2 = datareader.take(100, ReadCondition::any());
let d2 = result_vec.pop().unwrap();
let d2 = d2.value().as_ref().unwrap().clone();
let d1 = result_vec.pop().unwrap();
let d1 = d1.value().as_ref().unwrap().clone();
assert_eq!(test_data2, d2);
assert_eq!(test_data, d1);
assert!(result_vec2.is_ok());
assert_eq!(result_vec2.unwrap().len(), 0);
let data_key1 = RandomData {
a: 1,
b: ":D".to_string(),
};
let data_key2_1 = RandomData {
a: 2,
b: ":(".to_string(),
};
let data_key2_2 = RandomData {
a: 2,
b: "??".to_string(),
};
let data_key2_3 = RandomData {
a: 2,
b: "xD".to_string(),
};
let key1 = data_key1.key();
let key2 = data_key2_1.key();
assert!(data_key2_1.key() == data_key2_2.key());
assert!(data_key2_3.key() == key2);
let data_msg = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(2),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&data_key1).unwrap()),
}),
..Data::default()
};
let data_msg2 = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(3),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&data_key2_1).unwrap()),
}),
..Data::default()
};
let data_msg3 = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(4),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&data_key2_2).unwrap()),
}),
..Data::default()
};
let data_msg4 = Data {
reader_id: reader.entity_id(),
writer_id: writer_guid.entity_id,
writer_sn: SequenceNumber::from(5),
serialized_payload: Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE,
representation_options: [0, 0],
value: Bytes::from(to_bytes::<RandomData, LittleEndian>(&data_key2_3).unwrap()),
}),
..Data::default()
};
reader.handle_data_msg(data_msg, data_flags, &mr_state);
reader.handle_data_msg(data_msg2, data_flags, &mr_state);
reader.handle_data_msg(data_msg3, data_flags, &mr_state);
reader.handle_data_msg(data_msg4, data_flags, &mr_state);
info!("calling read with key 1 and this");
let results =
datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::This);
assert_eq!(data_key1, results.unwrap()[0].value().unwrap().clone());
info!("calling read with None and this");
let results = datareader.read_instance(100, ReadCondition::any(), None, SelectByKey::This);
assert_eq!(data_key1, results.unwrap()[0].value().unwrap().clone());
info!("calling read with key 1 and next");
let results =
datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::Next);
assert_eq!(results.as_ref().unwrap().len(), 3);
assert_eq!(data_key2_2, results.unwrap()[1].value().unwrap().clone());
info!("calling take with key 2 and this");
let results =
datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
assert_eq!(results.as_ref().unwrap().len(), 3);
let mut vec = results.unwrap();
let d3 = vec.pop().unwrap();
let d3 = d3.into_value().unwrap();
let d2 = vec.pop().unwrap();
let d2 = d2.into_value().unwrap();
let d1 = vec.pop().unwrap();
let d1 = d1.into_value().unwrap();
assert_eq!(data_key2_3, d3);
assert_eq!(data_key2_2, d2);
assert_eq!(data_key2_1, d1);
info!("calling take with key 2 and this");
let results =
datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
assert!(results.is_ok());
assert!(results.unwrap().is_empty());
}
}