use std::{
cmp::max,
collections::BTreeMap,
io,
marker::PhantomData,
pin::Pin,
sync::{Arc, Mutex, MutexGuard},
task::{Context, Poll, Waker},
};
use futures::stream::{FusedStream, Stream};
use serde::de::DeserializeOwned;
use mio_extras::channel as mio_channel;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::{
adapters::with_key::{Decode, DefaultDecoder, DeserializerAdapter},
ddsdata::*,
key::*,
pubsub::Subscriber,
qos::*,
result::*,
statusevents::*,
topic::{Topic, TopicDescription},
with_key::datasample::{DeserializedCacheChange, Sample},
},
discovery::discovery::DiscoveryCommand,
mio_source::PollEventSource,
serialization::CDRDeserializerAdapter,
structure::{
cache_change::CacheChange,
dds_cache::TopicCache,
entity::RTPSEntity,
guid::{EntityId, GUID},
sequence_number::SequenceNumber,
time::Timestamp,
},
};
/// Commands that a data reader handle can send over its `reader_command`
/// channel.
#[derive(Clone, Debug)]
pub(crate) enum ReaderCommand {
// Not constructed anywhere in this file, hence the `dead_code` allowance.
// NOTE(review): by its name this asks the receiver to reset the
// requested-deadline status -- confirm against the receiving side.
#[allow(dead_code)] ResetRequestedDeadlineStatus,
}
/// Bookkeeping of how far a `SimpleDataReader` has read into its topic cache.
// Fields, in declaration order:
// - `latest_instant`: largest timestamp among the changes taken so far; used
//   as the lower bound when fetching changes in best-effort mode.
// - `last_read_sn`: for each writer GUID, the sequence number of the last
//   change taken from that writer; used when fetching in reliable mode.
// - `hash_to_key_map`: instance keys seen so far, indexed by their key hash,
//   so that a dispose carrying only a key hash can be resolved to a key.
pub(crate) struct ReadState<K: Key> {
latest_instant: Timestamp, last_read_sn: BTreeMap<GUID, SequenceNumber>, hash_to_key_map: BTreeMap<KeyHash, K>, }
impl<K: Key> ReadState<K> {
    /// Creates the initial state: no writer has been read from, no instance
    /// key is known, and the latest instant is `Timestamp::ZERO`.
    fn new() -> Self {
        Self {
            latest_instant: Timestamp::ZERO,
            last_read_sn: BTreeMap::new(),
            hash_to_key_map: BTreeMap::new(),
        }
    }

    /// Hands out mutable borrows of the sequence-number map and the
    /// key-hash map at the same time.
    ///
    /// Borrowing the two fields in one call lets a caller hold both
    /// references simultaneously, which two separate `&mut self` accessors
    /// would not allow.
    fn get_sn_map_and_hash_map(
        &mut self,
    ) -> (
        &mut BTreeMap<GUID, SequenceNumber>,
        &mut BTreeMap<KeyHash, K>,
    ) {
        (&mut self.last_read_sn, &mut self.hash_to_key_map)
    }
}
/// A minimal data reader that takes samples one at a time from a shared
/// topic cache and deserializes them with the adapter `DA`.
///
/// Samples can be taken synchronously (`try_take_one`) or through an async
/// stream (`as_async_stream`). The reader can also be registered with a mio
/// 0.6 or mio 0.8 poll.
pub struct SimpleDataReader<D: Keyed, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
// Subscriber this reader belongs to; the reader removes itself from it on drop.
my_subscriber: Subscriber,
// Topic being read; its name must equal the topic cache's name (checked in `new`).
my_topic: Topic,
// QoS of this reader; its reliability setting selects how changes are fetched.
qos_policy: QosPolicies,
// GUID built from the participant's GUID prefix and this reader's entity id.
my_guid: GUID,
// Receiving end of the notification channel; this is what gets registered
// with a mio 0.6 poll.
pub(crate) notification_receiver: Mutex<mio_channel::Receiver<()>>,
// Shared cache that holds the changes of the topic.
topic_cache: Arc<Mutex<TopicCache>>,
// Read position of this reader within the topic cache.
read_state: Mutex<ReadState<<D as Keyed>::K>>,
// `deserializer_type` only ties the adapter type `DA` to the struct.
// `discovery_command` is used on drop to announce removal of this reader.
deserializer_type: PhantomData<DA>, discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
// Source of `DataReaderStatus` events for this reader.
status_receiver: StatusChannelReceiver<DataReaderStatus>,
// Command channel; not used by any code in this file, hence `dead_code`.
#[allow(dead_code)] reader_command: mio_channel::SyncSender<ReaderCommand>,
// Slot where the async stream stores the waker of the task awaiting data.
// NOTE(review): whoever shares this Arc presumably wakes the task when new
// data arrives -- confirm against the code holding the other handle.
data_reader_waker: Arc<Mutex<Option<Waker>>>,
// Event source that gets registered with a mio 0.8 registry.
event_source: PollEventSource,
}
impl<D, DA> Drop for SimpleDataReader<D, DA>
where
D: Keyed,
DA: DeserializerAdapter<D>,
{
fn drop(&mut self) {
self.my_subscriber.remove_reader(self.my_guid);
match self
.discovery_command
.send(DiscoveryCommand::RemoveLocalReader { guid: self.my_guid })
{
Ok(_) => {}
Err(mio_channel::SendError::Disconnected(_)) => {
debug!("Failed to send DiscoveryCommand::RemoveLocalReader . Maybe shutting down?");
}
Err(e) => error!(
"Failed to send DiscoveryCommand::RemoveLocalReader. {:?}",
e
),
}
}
}
impl<D: 'static, DA> SimpleDataReader<D, DA>
where
D: Keyed,
DA: DeserializerAdapter<D>,
{
/// Builds a reader for `topic` on behalf of `subscriber`.
///
/// # Errors
///
/// * `CreateError::ResourceDropped` if the subscriber's domain participant
///   is no longer available.
/// * `CreateError::Internal` if the name of `topic` differs from the name
///   stored in `topic_cache`.
///
/// # Panics
///
/// Panics if the topic cache mutex is poisoned.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
    subscriber: Subscriber,
    my_id: EntityId,
    topic: Topic,
    qos_policy: QosPolicies,
    notification_receiver: mio_channel::Receiver<()>,
    topic_cache: Arc<Mutex<TopicCache>>,
    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
    status_receiver: StatusChannelReceiver<DataReaderStatus>,
    reader_command: mio_channel::SyncSender<ReaderCommand>,
    data_reader_waker: Arc<Mutex<Option<Waker>>>,
    event_source: PollEventSource,
) -> CreateResult<Self> {
    // The GUID prefix comes from the participant, so it has to be alive.
    let participant = subscriber
        .participant()
        .ok_or_else(|| CreateError::ResourceDropped {
            reason: "Cannot create new DataReader, DomainParticipant doesn't exist.".to_string(),
        })?;
    let my_guid = GUID::new_with_prefix_and_id(participant.guid_prefix(), my_id);

    // Sanity check: the cache handed to us must belong to the same topic.
    let topic_cache_name = topic_cache.lock().unwrap().topic_name();
    if topic.name() != topic_cache_name {
        let reason = format!(
            "Topic name = {} and topic cache name = {} not equal when creating a SimpleDataReader",
            topic.name(),
            topic_cache_name
        );
        return Err(CreateError::Internal { reason });
    }

    Ok(Self {
        my_subscriber: subscriber,
        my_topic: topic,
        qos_policy,
        my_guid,
        notification_receiver: Mutex::new(notification_receiver),
        topic_cache,
        read_state: Mutex::new(ReadState::new()),
        deserializer_type: PhantomData,
        discovery_command,
        status_receiver,
        reader_command,
        data_reader_waker,
        event_source,
    })
}
/// Replaces the stored waker with `w`; passing `None` clears it.
///
/// # Panics
///
/// Panics if the waker mutex is poisoned.
pub(crate) fn set_waker(&self, w: Option<Waker>) {
    let mut waker_slot = self.data_reader_waker.lock().unwrap();
    *waker_slot = w;
}
/// Discards every notification currently queued in the notification
/// channel and then drains the mio 0.8 event source.
///
/// # Panics
///
/// Panics if the notification receiver mutex is poisoned.
pub(crate) fn drain_read_notifications(&self) {
    // The guard stays alive until the end of the function, so the event
    // source is drained while the receiver is still locked.
    let receiver = self.notification_receiver.lock().unwrap();
    loop {
        if receiver.try_recv().is_err() {
            break;
        }
    }
    self.event_source.drain();
}
/// Returns an iterator over the not-yet-read changes in `topic_cache`.
///
/// * Reliable readers fetch by sequence number, using the per-writer
///   positions in `last_read_sn`.
/// * Best-effort readers fetch by time, from `latest_instant` up to the
///   current time.
fn try_take_undecoded<'a>(
    is_reliable: bool,
    topic_cache: &'a TopicCache,
    latest_instant: Timestamp,
    last_read_sn: &'a BTreeMap<GUID, SequenceNumber>,
) -> Box<dyn Iterator<Item = (Timestamp, &'a CacheChange)> + 'a> {
    if !is_reliable {
        return topic_cache.get_changes_in_range_best_effort(latest_instant, Timestamp::now());
    }
    topic_cache.get_changes_in_range_reliable(last_read_sn)
}
/// Records the instance key of `deserialized` under its key hash, so a
/// later dispose that carries only the hash can be mapped back to the key.
fn update_hash_to_key_map(
    hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
    deserialized: &Sample<D, D::K>,
) {
    let key: D::K = match deserialized {
        Sample::Dispose(disposed_key) => disposed_key.clone(),
        Sample::Value(value) => value.key(),
    };
    let hash = key.hash_key(false);
    hash_to_key_map.insert(hash, key);
}
/// Turns one cache change into a `DeserializedCacheChange`, using `decoder`.
///
/// * `Data`: the payload is decoded if its representation identifier is one
///   of `DA::supported_encodings()`; the sample's key is then remembered in
///   `hash_to_key_map`.
/// * `DisposeByKey`: the serialized key is decoded and remembered in
///   `hash_to_key_map`.
/// * `DisposeByKeyHash`: the key is looked up in `hash_to_key_map`.
///
/// # Errors
///
/// * `ReadError::Deserialization` if the representation identifier is not
///   supported or decoding fails.
/// * `ReadError::UnknownKey` if a dispose refers to a key hash that has not
///   been seen before.
fn deserialize_with<S>(
    &self,
    timestamp: Timestamp,
    cc: &CacheChange,
    hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
    decoder: S,
) -> ReadResult<DeserializedCacheChange<D>>
where
    S: Decode<DA::Decoded, DA::DecodedKey>,
{
    match cc.data_value {
        DDSData::Data {
            ref serialized_payload,
        } => {
            match DA::supported_encodings()
                .iter()
                .find(|candidate| **candidate == serialized_payload.representation_identifier)
            {
                None => {
                    info!(
                        "Unknown representation id: {:?} , Topic = {}, Type = {:?} data = {:02x?}",
                        serialized_payload.representation_identifier,
                        self.my_topic.name(),
                        self.my_topic.get_type(),
                        serialized_payload.value,
                    );
                    Err(ReadError::Deserialization {
                        reason: format!(
                            "Unknown representation id {:?} , Topic = {}, Type = {:?}",
                            serialized_payload.representation_identifier,
                            self.my_topic.name(),
                            self.my_topic.get_type()
                        ),
                    })
                }
                Some(rep_id) => {
                    DA::from_bytes_with(&serialized_payload.value, *rep_id, decoder)
                        .map(|payload| {
                            let sample = Sample::Value(payload);
                            Self::update_hash_to_key_map(hash_to_key_map, &sample);
                            DeserializedCacheChange::new(timestamp, cc, sample)
                        })
                        .map_err(|e| ReadError::Deserialization {
                            reason: format!(
                                "Failed to deserialize sample bytes: {}, , Topic = {}, Type = {:?}",
                                e,
                                self.my_topic.name(),
                                self.my_topic.get_type()
                            ),
                        })
                }
            }
        }

        DDSData::DisposeByKey {
            key: ref serialized_key,
            ..
        } => DA::key_from_bytes_with(
            &serialized_key.value,
            serialized_key.representation_identifier,
            decoder,
        )
        .map(|key| {
            let sample = Sample::Dispose(key);
            Self::update_hash_to_key_map(hash_to_key_map, &sample);
            DeserializedCacheChange::new(timestamp, cc, sample)
        })
        .map_err(|e| ReadError::Deserialization {
            reason: format!(
                "Failed to deserialize key {}, Topic = {}, Type = {:?}",
                e,
                self.my_topic.name(),
                self.my_topic.get_type()
            ),
        }),

        DDSData::DisposeByKeyHash { key_hash, .. } => match hash_to_key_map.get(&key_hash) {
            Some(key) => Ok(DeserializedCacheChange::new(
                timestamp,
                cc,
                Sample::Dispose(key.clone()),
            )),
            None => Err(ReadError::UnknownKey {
                details: format!(
                    "Received dispose with unknown key hash: {:x?}, Topic = {}, Type = {:?}",
                    key_hash,
                    self.my_topic.name(),
                    self.my_topic.get_type()
                ),
            }),
        },
    }
}
/// Takes the next available sample, decoding it with the adapter's default
/// decoder (`DA::DECODER`).
///
/// Behaves exactly like `try_take_one_with` called with that decoder.
pub fn try_take_one(&self) -> ReadResult<Option<DeserializedCacheChange<D>>>
where
    DA: DeserializerAdapter<D> + DefaultDecoder<D>,
{
    self.try_take_one_with(DA::DECODER)
}
#[allow(clippy::needless_pass_by_value)]
pub fn try_take_one_with<S>(&self, decoder: S) -> ReadResult<Option<DeserializedCacheChange<D>>>
where
S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
{
let is_reliable = matches!(
self.qos_policy.reliability(),
Some(policy::Reliability::Reliable { .. })
);
let topic_cache = self.acquire_the_topic_cache_guard();
let mut read_state_ref = self.read_state.lock().unwrap();
let latest_instant = read_state_ref.latest_instant;
let (last_read_sn, hash_to_key_map) = read_state_ref.get_sn_map_and_hash_map();
loop {
let (timestamp, cc) =
match Self::try_take_undecoded(is_reliable, &topic_cache, latest_instant, last_read_sn)
.next()
{
None => return Ok(None), Some((ts, cc)) => (ts, cc),
};
let result = self.deserialize_with(timestamp, cc, hash_to_key_map, decoder.clone());
if let Err(ReadError::UnknownKey { .. }) = result {
} else {
let writer_guid = cc.writer_guid;
let sequence_number = cc.sequence_number;
read_state_ref.latest_instant = max(latest_instant, timestamp);
read_state_ref
.last_read_sn
.insert(writer_guid, sequence_number);
return result.map(Some);
}
}
}
/// Returns the QoS policies this reader was created with.
pub fn qos(&self) -> &QosPolicies {
    let policies = &self.qos_policy;
    policies
}
/// Returns a copy of the GUID that identifies this reader.
pub fn guid(&self) -> GUID {
    let guid = self.my_guid;
    guid
}
/// Returns the topic this reader reads from.
pub fn topic(&self) -> &Topic {
    let topic = &self.my_topic;
    topic
}
/// Returns an async stream of samples that decodes with the adapter's
/// default decoder (`DA::DECODER`).
///
/// The stream borrows this reader for as long as it exists.
pub fn as_async_stream<S>(&self) -> SimpleDataReaderStream<D, S, DA>
where
    DA: DefaultDecoder<D, Decoder = S>,
    DA::Decoder: Clone,
    S: Decode<DA::Decoded, DA::DecodedKey>,
{
    self.as_async_stream_with(DA::DECODER)
}
/// Returns an async stream of samples that decodes each sample with a
/// clone of `decoder`.
///
/// The stream borrows this reader for as long as it exists.
pub fn as_async_stream_with<S>(&self, decoder: S) -> SimpleDataReaderStream<D, S, DA>
where
    S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
{
    let simple_datareader = self;
    SimpleDataReaderStream {
        simple_datareader,
        decoder,
    }
}
/// Locks the topic cache and returns the guard.
///
/// # Panics
///
/// Panics with a message naming the topic if the mutex is poisoned.
fn acquire_the_topic_cache_guard(&self) -> MutexGuard<TopicCache> {
    match self.topic_cache.lock() {
        Ok(guard) => guard,
        Err(e) => panic!(
            "The topic cache of topic {} is poisoned. Error: {}",
            &self.my_topic.name(),
            e
        ),
    }
}
}
/// mio 0.6 integration: every call is forwarded to the notification
/// receiver, which is locked for the duration of the call.
///
/// All three methods panic if the receiver mutex is poisoned.
impl<D, DA> mio_06::Evented for SimpleDataReader<D, DA>
where
    D: Keyed,
    DA: DeserializerAdapter<D>,
{
    /// Registers the notification receiver with `poll`.
    fn register(
        &self,
        poll: &mio_06::Poll,
        token: mio_06::Token,
        interest: mio_06::Ready,
        opts: mio_06::PollOpt,
    ) -> io::Result<()> {
        let receiver = self.notification_receiver.lock().unwrap();
        receiver.register(poll, token, interest, opts)
    }

    /// Re-registers the notification receiver with `poll`.
    fn reregister(
        &self,
        poll: &mio_06::Poll,
        token: mio_06::Token,
        interest: mio_06::Ready,
        opts: mio_06::PollOpt,
    ) -> io::Result<()> {
        let receiver = self.notification_receiver.lock().unwrap();
        receiver.reregister(poll, token, interest, opts)
    }

    /// Removes the notification receiver from `poll`.
    fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
        let receiver = self.notification_receiver.lock().unwrap();
        receiver.deregister(poll)
    }
}
/// mio 0.8 integration: every call is forwarded to the reader's
/// `event_source`.
impl<D, DA> mio_08::event::Source for SimpleDataReader<D, DA>
where
    D: Keyed,
    DA: DeserializerAdapter<D>,
{
    /// Registers the event source with `registry`.
    fn register(
        &mut self,
        registry: &mio_08::Registry,
        token: mio_08::Token,
        interests: mio_08::Interest,
    ) -> io::Result<()> {
        let source = &mut self.event_source;
        source.register(registry, token, interests)
    }

    /// Re-registers the event source with `registry`.
    fn reregister(
        &mut self,
        registry: &mio_08::Registry,
        token: mio_08::Token,
        interests: mio_08::Interest,
    ) -> io::Result<()> {
        let source = &mut self.event_source;
        source.reregister(registry, token, interests)
    }

    /// Removes the event source from `registry`.
    fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
        let source = &mut self.event_source;
        source.deregister(registry)
    }
}
/// Status events of the reader. Everything except the async stream is
/// forwarded to `status_receiver`.
impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
    for SimpleDataReader<D, DA>
where
    D: Keyed,
    DA: DeserializerAdapter<D>,
{
    /// Returns the status receiver's handle for use with a mio 0.6 poll.
    fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
        let receiver = &mut self.status_receiver;
        receiver.as_status_evented()
    }

    /// Returns the status receiver's handle for use with a mio 0.8 registry.
    fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
        let receiver = &mut self.status_receiver;
        receiver.as_status_source()
    }

    /// Returns an async stream of status events that borrows this reader.
    fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
        let simple_datareader = self;
        SimpleDataReaderEventStream { simple_datareader }
    }

    /// Returns the next pending status event, if there is one.
    fn try_recv_status(&self) -> Option<DataReaderStatus> {
        let receiver = &self.status_receiver;
        receiver.try_recv_status()
    }
}
/// Exposes the reader's GUID through the `RTPSEntity` trait.
impl<D, DA> RTPSEntity for SimpleDataReader<D, DA>
where
    D: Keyed + DeserializeOwned,
    DA: DeserializerAdapter<D>,
{
    /// Returns a copy of the GUID that identifies this reader.
    fn guid(&self) -> GUID {
        let guid = self.my_guid;
        guid
    }
}
/// Async stream of samples taken from a `SimpleDataReader`.
///
/// Created by `SimpleDataReader::as_async_stream` or
/// `SimpleDataReader::as_async_stream_with`. It borrows the reader for the
/// lifetime `'a`.
pub struct SimpleDataReaderStream<
'a,
D: Keyed + 'static,
S: Decode<DA::Decoded, DA::DecodedKey>,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
// Reader that the samples are taken from.
simple_datareader: &'a SimpleDataReader<D, DA>,
// Decoder that is cloned for every attempt to take a sample.
decoder: S,
}
// Explicitly marks the stream as `Unpin` whenever the decoder type `S` is
// `Unpin`. The stream's only other field is a shared reference to the reader.
impl<'a, D, S, DA> Unpin for SimpleDataReaderStream<'a, D, S, DA>
where
D: Keyed + 'static,
DA: DeserializerAdapter<D>,
S: Decode<DA::Decoded, DA::DecodedKey> + Unpin,
{
}
impl<'a, D, S, DA> Stream for SimpleDataReaderStream<'a, D, S, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
    S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
{
    type Item = ReadResult<DeserializedCacheChange<D>>;

    /// Yields the next sample or read error, or `Poll::Pending` when the
    /// reader has nothing available.
    ///
    /// When the first attempt finds nothing, the task's waker is stored in
    /// the reader and one more attempt is made. The second attempt
    /// presumably catches data that arrived before the waker was in place,
    /// which would otherwise be a lost wake-up.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        debug!("poll_next");
        let reader = self.simple_datareader;

        // `transpose` turns `Ok(None)` into `None` and everything else
        // into `Some(item)`, which is exactly what the stream yields.
        let first_attempt = reader.try_take_one_with(self.decoder.clone());
        if let Some(item) = first_attempt.transpose() {
            return Poll::Ready(Some(item));
        }

        reader.set_waker(Some(cx.waker().clone()));

        let second_attempt = reader.try_take_one_with(self.decoder.clone());
        match second_attempt.transpose() {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        }
    }
}

impl<'a, D, S, DA> FusedStream for SimpleDataReaderStream<'a, D, S, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
    S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
{
    /// Always `false`: `poll_next` never yields `None`, so the stream never
    /// terminates.
    fn is_terminated(&self) -> bool {
        false
    }
}
/// Async stream of `DataReaderStatus` events of a `SimpleDataReader`.
///
/// Created by the reader's `as_async_status_stream`. It borrows the reader
/// for the lifetime `'a`.
pub struct SimpleDataReaderEventStream<
'a,
D: Keyed + 'static,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
// Reader whose status receiver supplies the events.
simple_datareader: &'a SimpleDataReader<D, DA>,
}
impl<'a, D, DA> Stream for SimpleDataReaderEventStream<'a, D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
{
    type Item = DataReaderStatus;

    /// Polls a freshly created async status stream of the reader's status
    /// receiver and returns its result.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut status_stream = self
            .simple_datareader
            .status_receiver
            .as_async_status_stream();
        Pin::new(&mut status_stream).poll_next(cx)
    }
}

impl<'a, D, DA> FusedStream for SimpleDataReaderEventStream<'a, D, DA>
where
    D: Keyed + 'static,
    DA: DeserializerAdapter<D>,
{
    /// Reports whatever the status receiver's async stream reports.
    fn is_terminated(&self) -> bool {
        let status_stream = self
            .simple_datareader
            .status_receiver
            .as_async_status_stream();
        status_stream.is_terminated()
    }
}