use std::{io};
use std::sync::{Arc, RwLock};
use std::marker::PhantomData;
use itertools::Itertools;
use serde::de::DeserializeOwned;
use mio_extras::channel as mio_channel;
#[allow(unused_imports)]
use log::{error, debug, info, warn};
use mio::{Evented, Poll, PollOpt, Ready, Token};
use crate::{
serialization::CDRDeserializerAdapter,
discovery::discovery::DiscoveryCommand,
structure::{
entity::{RTPSEntity, },
guid::{GUID, EntityId},
time::Timestamp,
dds_cache::DDSCache,
cache_change::{CacheChange, ChangeKind},
},
};
use crate::log_and_err_precondition_not_met;
use crate::dds::{
traits::{key::*, TopicDescription},
traits::serde_adapters::*,
values::result::*,
qos::*,
with_key::datasample::*,
datasample_cache::DataSampleCache,
pubsub::Subscriber,
topic::Topic,
readcondition::*,
};
use crate::dds::statusevents::*;
pub type DataReader_CDR<D> = DataReader<D,CDRDeserializerAdapter<D>>;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SelectByKey {
This,
Next,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ReaderCommand {
RESET_REQUESTED_DEADLINE_STATUS,
}
pub struct DataReader< D: Keyed + DeserializeOwned, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D> > {
my_subscriber: Subscriber,
my_topic: Topic,
qos_policy: QosPolicies,
my_guid: GUID,
pub(crate) notification_receiver: mio_channel::Receiver<()>,
dds_cache: Arc<RwLock<DDSCache>>,
datasample_cache: DataSampleCache<D>,
latest_instant: Timestamp,
deserializer_type: PhantomData<DA>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
status_receiver: StatusReceiver<DataReaderStatus>,
reader_command: mio_channel::SyncSender<ReaderCommand>,
}
impl<D, DA> Drop for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn drop(&mut self) {
match self
.discovery_command
.send(DiscoveryCommand::REMOVE_LOCAL_READER {
guid: self.get_guid(),
}) {
Ok(_) => {}
Err(e) => error!(
"Failed to send REMOVE_LOCAL_READER DiscoveryCommand. {:?}",
e
),
}
}
}
impl<D: 'static, DA> DataReader<D, DA>
where
D: DeserializeOwned + Keyed,
<D as Keyed>::K: Key,
DA: DeserializerAdapter<D>,
{
pub(crate) fn new(
subscriber: Subscriber,
my_id: EntityId,
topic: Topic,
qos_policy: QosPolicies,
notification_receiver: mio_channel::Receiver<()>,
dds_cache: Arc<RwLock<DDSCache>>,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
status_channel_rec: mio_channel::Receiver<DataReaderStatus>,
reader_command: mio_channel::SyncSender<ReaderCommand>,
) -> Result<Self> {
let dp = match subscriber.get_participant() {
Some(dp) => dp,
None => return
log_and_err_precondition_not_met!("Cannot create new DataReader, DomainParticipant doesn't exist.") ,
};
let my_guid = GUID::new_with_prefix_and_id(dp.get_guid_prefix().clone(), my_id);
Ok(Self {
my_subscriber: subscriber,
qos_policy,
my_guid,
notification_receiver,
dds_cache,
datasample_cache: DataSampleCache::new(topic.get_qos()),
my_topic: topic,
latest_instant: Timestamp::now(),
deserializer_type: PhantomData,
discovery_command,
status_receiver: StatusReceiver::new(status_channel_rec) ,
reader_command,
})
}
pub fn read(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
) -> Result<Vec<DataSample<&D>>> {
self.fill_local_datasample_cache();
let mut selected = self.datasample_cache.select_keys_for_access(read_condition);
selected.truncate(max_samples);
let result = self.datasample_cache.read_by_keys(&selected);
while let Ok(_) = self.notification_receiver.try_recv() {}
Ok(result)
}
pub fn take(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
) -> Result<Vec<DataSample<D>>> {
self.fill_local_datasample_cache();
let mut selected = self.datasample_cache.select_keys_for_access(read_condition);
selected.truncate(max_samples);
let result = self.datasample_cache.take_by_keys(&selected);
while let Ok(_) = self.notification_receiver.try_recv() {}
Ok(result)
}
pub fn read_next_sample(&mut self) -> Result<Option<DataSample<&D>>> {
let mut ds = self.read(1, ReadCondition::not_read())?;
Ok(ds.pop())
}
pub fn take_next_sample(&mut self) -> Result<Option<DataSample<D>>> {
let mut ds = self.take(1, ReadCondition::not_read())?;
Ok(ds.pop())
}
pub fn iterator(&mut self) -> Result<impl Iterator<Item = std::result::Result<&D, D::K>>> {
Ok(
self
.read(std::usize::MAX, ReadCondition::not_read())?
.into_iter()
.map(|ds| ds.value),
)
}
pub fn conditional_iterator(
&mut self,
read_condition: ReadCondition,
) -> Result<impl Iterator<Item = std::result::Result<&D, D::K>>> {
Ok(
self
.read(std::usize::MAX, read_condition)?
.into_iter()
.map(|ds| ds.value),
)
}
pub fn into_iterator(&mut self) -> Result<impl Iterator<Item = std::result::Result<D, D::K>>> {
Ok(
self
.take(std::usize::MAX, ReadCondition::not_read())?
.into_iter()
.map(|ds| ds.value),
)
}
pub fn into_conditional_iterator(
&mut self,
read_condition: ReadCondition,
) -> Result<impl Iterator<Item = std::result::Result<D, D::K>>> {
Ok(
self
.take(std::usize::MAX, read_condition)?
.into_iter()
.map(|ds| ds.value),
)
}
fn fill_local_datasample_cache(&mut self) {
let dds_cache = match self.dds_cache.read() {
Ok(rwlock) => rwlock,
Err(e) => panic!(
"The DDSCache of domain participant is poisoned. Error: {}",
e
),
};
let cache_changes = dds_cache.from_topic_get_changes_in_range(
&self.my_topic.get_name().to_string(),
&self.latest_instant,
&Timestamp::now(),
);
let cache_changes: Vec<(&Timestamp, &CacheChange)> = cache_changes
.into_iter()
.sorted_by(|(a, _), (b, _)| Ord::cmp(a, b))
.filter(|(_, cc)| cc.writer_guid.guidPrefix != self.get_guid_prefix())
.collect();
match cache_changes.last() {
Some((last_instant, _)) => self.latest_instant = **last_instant,
None => return,
};
for (
instant,
CacheChange {
kind,
writer_guid,
sequence_number: _,
data_value: payload_opt,
key: key_hash,
},
) in cache_changes
{
match kind {
ChangeKind::NOT_ALIVE_UNREGISTERED => (),
ChangeKind::NOT_ALIVE_DISPOSED => {
match self.datasample_cache.get_key_by_hash(*key_hash) {
Some(key) => self
.datasample_cache
.add_sample(Err(key), *writer_guid, *instant, None),
None => warn!("Tried to dispose with unkonwn key hash: {:x?}", key_hash),
}
}
ChangeKind::ALIVE => {
match payload_opt {
None => error!("Got CacheChange kind=ALIVE , but no serialized payload!"),
Some(serialized_payload) => {
if let Some(recognized_rep_id) =
DA::supported_encodings().iter()
.find(|r| **r == serialized_payload.representation_identifier)
{
match DA::from_bytes(&serialized_payload.value, *recognized_rep_id) {
Ok(payload) => {
self
.datasample_cache
.add_sample(Ok(payload), *writer_guid, *instant, None)
}
Err(e) => {
error!("Failed to deserialize bytes: {}, Topic = {}, Type = {:?}",
e, self.my_topic.get_name(), self.my_topic.get_type() );
debug!("Bytes were {:?}",&serialized_payload.value);
continue }
}
} else {
warn!("Unknown representation id {:?}.", serialized_payload.representation_identifier);
debug!("Serialized payload was {:?}", &serialized_payload);
continue }
}
} }
}
}
}
fn infer_key(
&self,
instance_key: Option<<D as Keyed>::K>,
this_or_next: SelectByKey,
) -> Option<<D as Keyed>::K> {
match instance_key {
Some(k) => match this_or_next {
SelectByKey::This => Some(k),
SelectByKey::Next => self.datasample_cache.get_next_key(&k),
},
None => self
.datasample_cache
.instance_map
.keys()
.next()
.map(|k| k.clone()),
}
}
pub fn read_instance(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
instance_key: Option<<D as Keyed>::K>,
this_or_next: SelectByKey,
) -> Result<Vec<DataSample<&D>>> {
self.fill_local_datasample_cache();
let key = match self.infer_key(instance_key, this_or_next) {
Some(k) => k,
None => return Ok(Vec::new()),
};
let mut selected = self
.datasample_cache
.select_instance_keys_for_access(key, read_condition);
selected.truncate(max_samples);
let result = self.datasample_cache.read_by_keys(&selected);
while let Ok(_) = self.notification_receiver.try_recv() {}
Ok(result)
}
pub fn take_instance(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
instance_key: Option<<D as Keyed>::K>,
this_or_next: SelectByKey,
) -> Result<Vec<DataSample<D>>> {
self.fill_local_datasample_cache();
let key = match self.infer_key(instance_key, this_or_next) {
Some(k) => k,
None => return Ok(Vec::new()),
};
let mut selected = self
.datasample_cache
.select_instance_keys_for_access(key, read_condition);
selected.truncate(max_samples);
let result = self.datasample_cache.take_by_keys(&selected);
while let Ok(_) = self.notification_receiver.try_recv() {}
Ok(result)
}
}
impl<D, DA> Evented for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
self
.notification_receiver
.register(poll, token, interest, opts)
}
fn reregister(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
self
.notification_receiver
.reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> io::Result<()> {
self.notification_receiver.deregister(poll)
}
}
impl <D,DA> StatusEvented<DataReaderStatus> for DataReader<D,DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn as_status_evented(&mut self) -> &dyn Evented {
self.status_receiver.as_status_evented()
}
fn try_recv_status(&self) -> Option<DataReaderStatus> {
self.status_receiver.try_recv_status()
}
}
impl<D, DA> HasQoSPolicy for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn get_qos(&self) -> QosPolicies {
self.qos_policy.clone()
}
}
impl<D, DA> RTPSEntity for DataReader<D, DA>
where
D: Keyed + DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn get_guid(&self) -> GUID {
self.my_guid
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dds::{participant::DomainParticipant, topic::TopicKind};
use crate::test::random_data::*;
use crate::dds::traits::key::Keyed;
use mio_extras::channel as mio_channel;
use log::info;
use crate::dds::reader::Reader;
use crate::messages::submessages::data::Data;
use crate::dds::message_receiver::*;
use crate::structure::guid::GuidPrefix;
use crate::structure::sequence_number::SequenceNumber;
use crate::serialization::{cdr_deserializer::CDRDeserializerAdapter, cdr_serializer::to_bytes};
use byteorder::LittleEndian;
use crate::messages::submessages::submessage_elements::serialized_payload::SerializedPayload;
use std::{
thread,
time::{self},
};
use mio::{Events};
#[test]
fn dr_get_samples_from_ddschache() {
let dp = DomainParticipant::new(0);
let mut qos = QosPolicies::qos_none();
qos.history = Some(policy::History::KeepAll);
let sub = dp.create_subscriber(&qos).unwrap();
let topic = dp
.create_topic("dr", "drtest?", &qos, TopicKind::WithKey)
.unwrap();
let (send, _rec) = mio_channel::sync_channel::<()>(10);
let (status_sender, _status_reciever) = mio_extras::channel::sync_channel::<StatusChange>(100);
let (_reader_commander, reader_command_receiver) =
mio_extras::channel::sync_channel::<ReaderCommand>(100);
let reader_id = EntityId::default();
let datareader_id = EntityId::default();
let reader_guid = GUID::new_with_prefix_and_id(dp.get_guid_prefix(), reader_id);
let mut new_reader = Reader::new(
reader_guid,
send,
status_sender,
dp.get_dds_cache(),
topic.get_name().to_string(),
reader_command_receiver,
);
let mut matching_datareader = sub
.create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(
topic,
Some(datareader_id),
None,
)
.unwrap();
let random_data = RandomData {
a: 1,
b: "somedata".to_string(),
};
let data_key = random_data.get_key();
let writer_guid = GUID {
guidPrefix: GuidPrefix::new(vec![1; 12]),
entityId: EntityId::createCustomEntityID([1; 3], 1),
};
let mut mr_state = MessageReceiverState::default();
mr_state.source_guid_prefix = writer_guid.guidPrefix;
new_reader.matched_writer_add(
writer_guid.clone(),
EntityId::ENTITYID_UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
);
let mut data = Data::default();
data.reader_id = EntityId::createCustomEntityID([1, 2, 3], 111);
data.writer_id = writer_guid.entityId;
data.writer_sn = SequenceNumber::from(0);
data.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&random_data).unwrap(),
});
new_reader.handle_data_msg(data, mr_state.clone());
matching_datareader.fill_local_datasample_cache();
let deserialized_random_data = matching_datareader.read(1, ReadCondition::any()).unwrap()[0]
.value()
.unwrap()
.clone();
assert_eq!(deserialized_random_data, random_data);
let random_data2 = RandomData {
a: 1,
b: "somedata number 2".to_string(),
};
let mut data2 = Data::default();
data2.reader_id = EntityId::createCustomEntityID([1, 2, 3], 111);
data2.writer_id = writer_guid.entityId;
data2.writer_sn = SequenceNumber::from(1);
data2.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&random_data2).unwrap(),
});
let random_data3 = RandomData {
a: 1,
b: "third somedata".to_string(),
};
let mut data3 = Data::default();
data3.reader_id = EntityId::createCustomEntityID([1, 2, 3], 111);
data3.writer_id = writer_guid.entityId;
data3.writer_sn = SequenceNumber::from(2);
data3.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&random_data3).unwrap(),
});
new_reader.handle_data_msg(data2, mr_state.clone());
new_reader.handle_data_msg(data3, mr_state);
matching_datareader.fill_local_datasample_cache();
let random_data_vec = matching_datareader
.read_instance(100, ReadCondition::any(), Some(data_key), SelectByKey::This)
.unwrap();
assert_eq!(random_data_vec.len(), 3);
}
#[test]
fn dr_read_and_take() {
let dp = DomainParticipant::new(0);
let mut qos = QosPolicies::qos_none();
qos.history = Some(policy::History::KeepAll);
let sub = dp.create_subscriber(&qos).unwrap();
let topic = dp
.create_topic("dr read", "read fn test?", &qos, TopicKind::WithKey)
.unwrap();
let (send, _rec) = mio_channel::sync_channel::<()>(10);
let (status_sender, _status_reciever) = mio_extras::channel::sync_channel::<StatusChange>(100);
let (_reader_commander, reader_command_receiver) =
mio_extras::channel::sync_channel::<ReaderCommand>(100);
let default_id = EntityId::default();
let reader_guid = GUID::new_with_prefix_and_id(dp.get_guid_prefix(), default_id);
let mut reader = Reader::new(
reader_guid,
send,
status_sender,
dp.get_dds_cache(),
topic.get_name().to_string(),
reader_command_receiver,
);
let mut datareader = sub
.create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(
topic,
Some(default_id),
None,
)
.unwrap();
let writer_guid = GUID {
guidPrefix: GuidPrefix::new(vec![1; 12]),
entityId: EntityId::createCustomEntityID([1; 3], 1),
};
let mut mr_state = MessageReceiverState::default();
mr_state.source_guid_prefix = writer_guid.guidPrefix;
reader.matched_writer_add(
writer_guid.clone(),
EntityId::ENTITYID_UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
);
let test_data = RandomData {
a: 10,
b: ":DDD".to_string(),
};
let test_data2 = RandomData {
a: 11,
b: ":)))".to_string(),
};
let mut data_msg = Data::default();
data_msg.reader_id = reader.get_entity_id();
data_msg.writer_id = writer_guid.entityId;
data_msg.writer_sn = SequenceNumber::from(0);
data_msg.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&test_data).unwrap(),
});
let mut data_msg2 = Data::default();
data_msg2.reader_id = reader.get_entity_id();
data_msg2.writer_id = writer_guid.entityId;
data_msg2.writer_sn = SequenceNumber::from(1);
data_msg2.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&test_data2).unwrap(),
});
reader.handle_data_msg(data_msg, mr_state.clone());
reader.handle_data_msg(data_msg2, mr_state.clone());
{
let result_vec = datareader.read(100, ReadCondition::any()).unwrap();
let d = result_vec[0].value().unwrap();
assert_eq!(&test_data, d);
}
{
let result_vec2 = datareader.read(100, ReadCondition::any()).unwrap();
let d2 = result_vec2[1].value().unwrap();
assert_eq!(&test_data2, d2);
}
{
let result_vec3 = datareader.read(100, ReadCondition::any()).unwrap();
let d3 = result_vec3[0].value().unwrap();
assert_eq!(&test_data, d3);
}
let mut result_vec = datareader.take(100, ReadCondition::any()).unwrap();
let result_vec2 = datareader.take(100, ReadCondition::any());
let d2 = result_vec.pop().unwrap();
let d2 = d2.value().as_ref().unwrap().clone();
let d1 = result_vec.pop().unwrap();
let d1 = d1.value().as_ref().unwrap().clone();
assert_eq!(test_data2, d2);
assert_eq!(test_data, d1);
assert!(result_vec2.is_ok());
assert_eq!(result_vec2.unwrap().len(), 0);
let data_key1 = RandomData {
a: 1,
b: ":D".to_string(),
};
let data_key2_1 = RandomData {
a: 2,
b: ":(".to_string(),
};
let data_key2_2 = RandomData {
a: 2,
b: ":)".to_string(),
};
let data_key2_3 = RandomData {
a: 2,
b: "xD".to_string(),
};
let key1 = data_key1.get_key();
let key2 = data_key2_1.get_key();
assert!(data_key2_1.get_key() == data_key2_2.get_key());
assert!(data_key2_3.get_key() == key2);
let mut data_msg = Data::default();
data_msg.reader_id = reader.get_entity_id();
data_msg.writer_id = writer_guid.entityId;
data_msg.writer_sn = SequenceNumber::from(2);
data_msg.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&data_key1).unwrap(),
});
let mut data_msg2 = Data::default();
data_msg2.reader_id = reader.get_entity_id();
data_msg2.writer_id = writer_guid.entityId;
data_msg2.writer_sn = SequenceNumber::from(3);
data_msg2.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&data_key2_1).unwrap(),
});
let mut data_msg3 = Data::default();
data_msg3.reader_id = reader.get_entity_id();
data_msg3.writer_id = writer_guid.entityId;
data_msg3.writer_sn = SequenceNumber::from(4);
data_msg3.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&data_key2_2).unwrap(),
});
let mut data_msg4 = Data::default();
data_msg4.reader_id = reader.get_entity_id();
data_msg4.writer_id = writer_guid.entityId;
data_msg4.writer_sn = SequenceNumber::from(5);
data_msg4.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, LittleEndian>(&data_key2_3).unwrap(),
});
reader.handle_data_msg(data_msg, mr_state.clone());
reader.handle_data_msg(data_msg2, mr_state.clone());
reader.handle_data_msg(data_msg3, mr_state.clone());
reader.handle_data_msg(data_msg4, mr_state.clone());
info!("calling read with key 1 and this");
let results =
datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::This);
assert_eq!(data_key1, results.unwrap()[0].value().unwrap().clone());
info!("calling read with None and this");
let results = datareader.read_instance(100, ReadCondition::any(), None, SelectByKey::This);
assert_eq!(data_key1, results.unwrap()[0].value().unwrap().clone());
info!("calling read with key 1 and next");
let results =
datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::Next);
assert_eq!(results.as_ref().unwrap().len(), 3);
assert_eq!(data_key2_2, results.unwrap()[1].value().unwrap().clone());
info!("calling take with key 2 and this");
let results =
datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
assert_eq!(results.as_ref().unwrap().len(), 3);
let mut vec = results.unwrap();
let d3 = vec.pop().unwrap();
let d3 = d3.into_value().unwrap();
let d2 = vec.pop().unwrap();
let d2 = d2.into_value().unwrap();
let d1 = vec.pop().unwrap();
let d1 = d1.into_value().unwrap();
assert_eq!(data_key2_3, d3);
assert_eq!(data_key2_2, d2);
assert_eq!(data_key2_1, d1);
info!("calling take with key 2 and this");
let results =
datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
assert!(results.is_ok());
assert!(results.unwrap().is_empty());
}
#[test]
fn dr_wake_up() {
let dp = DomainParticipant::new(0);
let mut qos = QosPolicies::qos_none();
qos.history = Some(policy::History::KeepAll);
let sub = dp.create_subscriber(&qos).unwrap();
let topic = dp
.create_topic("wakeup", "Wake up!", &qos, TopicKind::WithKey)
.unwrap();
let (send, rec) = mio_channel::sync_channel::<()>(10);
let (status_sender, _status_reciever) = mio_extras::channel::sync_channel::<StatusChange>(100);
let (_reader_commander, reader_command_receiver) =
mio_extras::channel::sync_channel::<ReaderCommand>(100);
let default_id = EntityId::default();
let reader_guid = GUID::new_with_prefix_and_id(dp.get_guid_prefix(), default_id);
let mut reader = Reader::new(
reader_guid,
send,
status_sender,
dp.get_dds_cache(),
topic.get_name().to_string(),
reader_command_receiver,
);
let mut datareader = sub
.create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(
topic,
Some(default_id),
None,
)
.unwrap();
datareader.notification_receiver = rec;
let writer_guid = GUID {
guidPrefix: GuidPrefix::new(vec![1; 12]),
entityId: EntityId::createCustomEntityID([1; 3], 1),
};
let mut mr_state = MessageReceiverState::default();
mr_state.source_guid_prefix = writer_guid.guidPrefix;
reader.matched_writer_add(
writer_guid.clone(),
EntityId::ENTITYID_UNKNOWN,
mr_state.unicast_reply_locator_list.clone(),
mr_state.multicast_reply_locator_list.clone(),
);
let test_data1 = RandomData {
a: 1,
b: "Testing 1".to_string(),
};
let test_data2 = RandomData {
a: 2,
b: "Testing 2".to_string(),
};
let test_data3 = RandomData {
a: 2,
b: "Testing 3".to_string(),
};
let mut data_msg = Data::default();
data_msg.reader_id = reader.get_entity_id();
data_msg.writer_id = writer_guid.entityId;
data_msg.writer_sn = SequenceNumber::from(0);
data_msg.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, byteorder::LittleEndian>(&test_data1).unwrap(),
});
let mut data_msg2 = Data::default();
data_msg2.reader_id = reader.get_entity_id();
data_msg2.writer_id = writer_guid.entityId;
data_msg2.writer_sn = SequenceNumber::from(1);
data_msg2.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, byteorder::LittleEndian>(&test_data2).unwrap(),
});
let mut data_msg3 = Data::default();
data_msg3.reader_id = reader.get_entity_id();
data_msg3.writer_id = writer_guid.entityId;
data_msg3.writer_sn = SequenceNumber::from(2);
data_msg3.serialized_payload = Some(SerializedPayload {
representation_identifier: RepresentationIdentifier::CDR_LE as u16,
representation_options: [0, 0],
value: to_bytes::<RandomData, byteorder::LittleEndian>(&test_data3).unwrap(),
});
let handle = std::thread::spawn(move || {
reader.handle_data_msg(data_msg, mr_state.clone());
thread::sleep(time::Duration::from_millis(100));
info!("I'll send the second now..");
reader.handle_data_msg(data_msg2, mr_state.clone());
thread::sleep(time::Duration::from_millis(100));
info!("I'll send the third now..");
reader.handle_data_msg(data_msg3, mr_state.clone());
});
let poll = Poll::new().unwrap();
poll
.register(&datareader, Token(100), Ready::readable(), PollOpt::edge())
.unwrap();
let mut count_to_stop = 0;
'l: loop {
let mut events = Events::with_capacity(1024);
info!("Going to poll");
poll.poll(&mut events, None).unwrap();
for event in events.into_iter() {
info!("Handling events");
if event.token() == Token(100) {
let data = datareader.take(100, ReadCondition::any());
let len = data.as_ref().unwrap().len();
info!("There were {} samples available.", len);
info!("Their strings:");
for d in data.unwrap().into_iter() {
info!("{}", d.value().as_ref().unwrap().b);
}
count_to_stop += len;
}
if count_to_stop >= 3 {
info!("I'll stop now with count {}", count_to_stop);
break 'l;
}
} }
handle.join().unwrap();
assert_eq!(count_to_stop, 3);
}
}