1use std::{
2 cmp::max,
3 collections::BTreeMap,
4 io,
5 marker::PhantomData,
6 pin::Pin,
7 sync::{Arc, Mutex, MutexGuard},
8 task::{Context, Poll, Waker},
9};
10
11use futures::stream::{FusedStream, Stream};
12use serde::de::DeserializeOwned;
13use mio_extras::channel as mio_channel;
14#[allow(unused_imports)]
15use log::{debug, error, info, trace, warn};
16
17use crate::{
18 dds::{
19 adapters::with_key::{Decode, DefaultDecoder, DeserializerAdapter},
20 ddsdata::*,
21 key::*,
22 pubsub::Subscriber,
23 qos::*,
24 result::*,
25 statusevents::*,
26 topic::{Topic, TopicDescription},
27 with_key::datasample::{DeserializedCacheChange, Sample},
28 },
29 discovery::discovery::DiscoveryCommand,
30 mio_source::PollEventSource,
31 serialization::CDRDeserializerAdapter,
32 structure::{
33 cache_change::CacheChange,
34 dds_cache::TopicCache,
35 entity::RTPSEntity,
36 guid::{EntityId, GUID},
37 sequence_number::SequenceNumber,
38 time::Timestamp,
39 },
40};
41
42#[derive(Clone, Debug)]
43pub(crate) enum ReaderCommand {
44 #[allow(dead_code)] ResetRequestedDeadlineStatus,
46}
47
48pub(crate) struct ReadState<K: Key> {
51 latest_instant: Timestamp, last_read_sn: BTreeMap<GUID, SequenceNumber>, hash_to_key_map: BTreeMap<KeyHash, K>, }
59
60impl<K: Key> ReadState<K> {
61 fn new() -> Self {
62 ReadState {
63 latest_instant: Timestamp::ZERO,
64 last_read_sn: BTreeMap::new(),
65 hash_to_key_map: BTreeMap::<KeyHash, K>::new(),
66 }
67 }
68
69 fn get_sn_map_and_hash_map(
73 &mut self,
74 ) -> (
75 &mut BTreeMap<GUID, SequenceNumber>,
76 &mut BTreeMap<KeyHash, K>,
77 ) {
78 let ReadState {
79 last_read_sn,
80 hash_to_key_map,
81 ..
82 } = self;
83 (last_read_sn, hash_to_key_map)
84 }
85}
86
87pub struct SimpleDataReader<D: Keyed, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
90 my_subscriber: Subscriber,
91
92 my_topic: Topic,
93 qos_policy: QosPolicies,
94 my_guid: GUID,
95
96 pub(crate) notification_receiver: Mutex<mio_channel::Receiver<()>>,
98
99 topic_cache: Arc<Mutex<TopicCache>>,
101
102 read_state: Mutex<ReadState<<D as Keyed>::K>>,
103
104 deserializer_type: PhantomData<DA>, discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
107 status_receiver: StatusChannelReceiver<DataReaderStatus>,
108
109 #[allow(dead_code)] reader_command: mio_channel::SyncSender<ReaderCommand>,
112 data_reader_waker: Arc<Mutex<Option<Waker>>>,
113
114 event_source: PollEventSource,
115}
116
117impl<D, DA> Drop for SimpleDataReader<D, DA>
118where
119 D: Keyed,
120 DA: DeserializerAdapter<D>,
121{
122 fn drop(&mut self) {
123 self.my_subscriber.remove_reader(self.my_guid);
125
126 match self
128 .discovery_command
129 .send(DiscoveryCommand::RemoveLocalReader { guid: self.my_guid })
130 {
131 Ok(_) => {}
132 Err(mio_channel::SendError::Disconnected(_)) => {
133 debug!("Failed to send DiscoveryCommand::RemoveLocalReader . Maybe shutting down?");
134 }
135 Err(e) => error!("Failed to send DiscoveryCommand::RemoveLocalReader. {e:?}"),
136 }
137 }
138}
139
140impl<D: 'static, DA> SimpleDataReader<D, DA>
141where
142 D: Keyed,
143 DA: DeserializerAdapter<D>,
144{
145 #[allow(clippy::too_many_arguments)]
146 pub(crate) fn new(
147 subscriber: Subscriber,
148 my_id: EntityId,
149 topic: Topic,
150 qos_policy: QosPolicies,
151 notification_receiver: mio_channel::Receiver<()>,
153 topic_cache: Arc<Mutex<TopicCache>>,
154 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
155 status_receiver: StatusChannelReceiver<DataReaderStatus>,
156 reader_command: mio_channel::SyncSender<ReaderCommand>,
157 data_reader_waker: Arc<Mutex<Option<Waker>>>,
158 event_source: PollEventSource,
159 ) -> CreateResult<Self> {
160 let dp = match subscriber.participant() {
161 Some(dp) => dp,
162 None => {
163 return Err(CreateError::ResourceDropped {
164 reason: "Cannot create new DataReader, DomainParticipant doesn't exist.".to_string(),
165 })
166 }
167 };
168
169 let my_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), my_id);
170
171 let topic_cache_name = topic_cache.lock().unwrap().topic_name();
173 if topic.name() != topic_cache_name {
174 return Err(CreateError::Internal {
175 reason: format!(
176 "Topic name = {} and topic cache name = {} not equal when creating a SimpleDataReader",
177 topic.name(),
178 topic_cache_name
179 ),
180 });
181 }
182
183 Ok(Self {
184 my_subscriber: subscriber,
185 qos_policy,
186 my_guid,
187 notification_receiver: Mutex::new(notification_receiver),
188 topic_cache,
189 read_state: Mutex::new(ReadState::new()),
190 my_topic: topic,
191 deserializer_type: PhantomData,
192 discovery_command,
193 status_receiver,
194 reader_command,
195 data_reader_waker,
196 event_source,
197 })
198 }
199 pub(crate) fn set_waker(&self, w: Option<Waker>) {
200 *self.data_reader_waker.lock().unwrap() = w;
201 }
202
203 pub(crate) fn drain_read_notifications(&self) {
204 let rec = self.notification_receiver.lock().unwrap();
205 while rec.try_recv().is_ok() {}
206 self.event_source.drain();
207 }
208
209 fn try_take_undecoded<'a, 'b: 'a>(
210 is_reliable: bool,
211 topic_cache: &'a TopicCache,
212 latest_instant: Timestamp,
213 last_read_sn: &'b BTreeMap<GUID, SequenceNumber>,
214 ) -> impl Iterator<Item = (Timestamp, &'a CacheChange)> {
215 topic_cache.get_changes_in_range(is_reliable, latest_instant, last_read_sn)
216 }
217
218 fn update_hash_to_key_map(
219 hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
220 deserialized: &Sample<D, D::K>,
221 ) {
222 let instance_key = match deserialized {
223 Sample::Value(d) => d.key(),
224 Sample::Dispose(k) => k.clone(),
225 };
226 hash_to_key_map.insert(instance_key.hash_key(false), instance_key);
227 }
228
229 fn deserialize_with<S>(
230 &self,
231 timestamp: Timestamp,
232 cc: &CacheChange,
233 hash_to_key_map: &mut BTreeMap<KeyHash, D::K>,
234 decoder: S,
235 ) -> ReadResult<DeserializedCacheChange<D>>
236 where
237 S: Decode<DA::Decoded, DA::DecodedKey>,
238 {
239 match cc.data_value {
240 DDSData::Data {
241 ref serialized_payload,
242 } => {
243 if let Some(recognized_rep_id) = DA::supported_encodings()
245 .iter()
246 .find(|r| **r == serialized_payload.representation_identifier)
247 {
248 match DA::from_bytes_with(&serialized_payload.value, *recognized_rep_id, decoder) {
249 Ok(payload) => {
251 let p = Sample::Value(payload);
252 Self::update_hash_to_key_map(hash_to_key_map, &p);
253 Ok(DeserializedCacheChange::new(timestamp, cc, p))
254 }
255 Err(e) => Err(ReadError::Deserialization {
256 reason: format!(
257 "Failed to deserialize sample bytes: {}, , Topic = {}, Type = {:?}",
258 e,
259 self.my_topic.name(),
260 self.my_topic.get_type()
261 ),
262 }),
263 }
264 } else {
265 info!(
266 "Unknown representation id: {:?} , Topic = {}, Type = {:?} data = {:02x?}",
267 serialized_payload.representation_identifier,
268 self.my_topic.name(),
269 self.my_topic.get_type(),
270 serialized_payload.value,
271 );
272 Err(ReadError::Deserialization {
273 reason: format!(
274 "Unknown representation id {:?} , Topic = {}, Type = {:?}",
275 serialized_payload.representation_identifier,
276 self.my_topic.name(),
277 self.my_topic.get_type()
278 ),
279 })
280 }
281 }
282
283 DDSData::DisposeByKey {
284 key: ref serialized_key,
285 ..
286 } => {
287 match DA::key_from_bytes_with(
288 &serialized_key.value,
289 serialized_key.representation_identifier,
290 decoder,
291 ) {
292 Ok(key) => {
293 let k = Sample::Dispose(key);
294 Self::update_hash_to_key_map(hash_to_key_map, &k);
295 Ok(DeserializedCacheChange::new(timestamp, cc, k))
296 }
297 Err(e) => Err(ReadError::Deserialization {
298 reason: format!(
299 "Failed to deserialize key {}, Topic = {}, Type = {:?}",
300 e,
301 self.my_topic.name(),
302 self.my_topic.get_type()
303 ),
304 }),
305 }
306 }
307
308 DDSData::DisposeByKeyHash { key_hash, .. } => {
309 if let Some(key) = hash_to_key_map.get(&key_hash) {
312 Ok(DeserializedCacheChange::new(
313 timestamp,
314 cc,
315 Sample::Dispose(key.clone()),
316 ))
317 } else {
318 Err(ReadError::UnknownKey {
319 details: format!(
320 "Received dispose with unknown key hash: {:x?}, Topic = {}, Type = {:?}",
321 key_hash,
322 self.my_topic.name(),
323 self.my_topic.get_type()
324 ),
325 })
326 }
327 }
328 } }
330
331 pub fn try_take_one(&self) -> ReadResult<Option<DeserializedCacheChange<D>>>
334 where
335 DA: DeserializerAdapter<D> + DefaultDecoder<D>,
336 {
337 Self::try_take_one_with(self, DA::DECODER)
338 }
339
340 #[allow(clippy::needless_pass_by_value)]
343 pub fn try_take_one_with<S>(&self, decoder: S) -> ReadResult<Option<DeserializedCacheChange<D>>>
344 where
345 S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
346 {
347 let is_reliable = matches!(
348 self.qos_policy.reliability(),
349 Some(policy::Reliability::Reliable { .. })
350 );
351
352 let topic_cache = self.acquire_the_topic_cache_guard();
353
354 let mut read_state_ref = self.read_state.lock().unwrap();
355 let latest_instant = read_state_ref.latest_instant;
356 let (last_read_sn, hash_to_key_map) = read_state_ref.get_sn_map_and_hash_map();
357
358 let mut changes =
359 Self::try_take_undecoded(is_reliable, &topic_cache, latest_instant, last_read_sn);
360
361 loop {
363 let (timestamp, cc) = match changes.next() {
364 None => return Ok(None), Some((ts, cc)) => (ts, cc),
366 };
367
368 let result = self.deserialize_with(timestamp, cc, hash_to_key_map, decoder.clone());
369
370 if let Err(ReadError::UnknownKey { .. }) = result {
371 } else {
373 drop(changes);
375
376 let writer_guid = cc.writer_guid;
379 let sequence_number = cc.sequence_number;
380 read_state_ref.latest_instant = max(latest_instant, timestamp);
383 read_state_ref
384 .last_read_sn
385 .insert(writer_guid, sequence_number);
386
387 return result.map(Some);
397 }
398 }
399 }
400
401 pub fn qos(&self) -> &QosPolicies {
402 &self.qos_policy
403 }
404
405 pub fn guid(&self) -> GUID {
406 self.my_guid
407 }
408
409 pub fn topic(&self) -> &Topic {
410 &self.my_topic
411 }
412
413 pub fn as_async_stream<S>(&self) -> SimpleDataReaderStream<'_, D, S, DA>
414 where
415 DA: DefaultDecoder<D, Decoder = S>,
416 DA::Decoder: Clone,
417 S: Decode<DA::Decoded, DA::DecodedKey>,
418 {
419 Self::as_async_stream_with(self, DA::DECODER)
420 }
421
422 pub fn as_async_stream_with<S>(&self, decoder: S) -> SimpleDataReaderStream<'_, D, S, DA>
423 where
424 S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
425 {
426 SimpleDataReaderStream {
427 simple_datareader: self,
428 decoder,
429 }
430 }
431
432 fn acquire_the_topic_cache_guard(&self) -> MutexGuard<'_, TopicCache> {
433 self.topic_cache.lock().unwrap_or_else(|e| {
434 panic!(
435 "The topic cache of topic {} is poisoned. Error: {}",
436 &self.my_topic.name(),
437 e
438 )
439 })
440 }
441}
442
443impl<D, DA> mio_06::Evented for SimpleDataReader<D, DA>
446where
447 D: Keyed,
448 DA: DeserializerAdapter<D>,
449{
450 fn register(
453 &self,
454 poll: &mio_06::Poll,
455 token: mio_06::Token,
456 interest: mio_06::Ready,
457 opts: mio_06::PollOpt,
458 ) -> io::Result<()> {
459 self
460 .notification_receiver
461 .lock()
462 .unwrap()
463 .register(poll, token, interest, opts)
464 }
465
466 fn reregister(
467 &self,
468 poll: &mio_06::Poll,
469 token: mio_06::Token,
470 interest: mio_06::Ready,
471 opts: mio_06::PollOpt,
472 ) -> io::Result<()> {
473 self
474 .notification_receiver
475 .lock()
476 .unwrap()
477 .reregister(poll, token, interest, opts)
478 }
479
480 fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
481 self.notification_receiver.lock().unwrap().deregister(poll)
482 }
483}
484
485impl<D, DA> mio_08::event::Source for SimpleDataReader<D, DA>
486where
487 D: Keyed,
488 DA: DeserializerAdapter<D>,
489{
490 fn register(
491 &mut self,
492 registry: &mio_08::Registry,
493 token: mio_08::Token,
494 interests: mio_08::Interest,
495 ) -> io::Result<()> {
496 self.event_source.register(registry, token, interests)
497 }
498
499 fn reregister(
500 &mut self,
501 registry: &mio_08::Registry,
502 token: mio_08::Token,
503 interests: mio_08::Interest,
504 ) -> io::Result<()> {
505 self.event_source.reregister(registry, token, interests)
506 }
507
508 fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
509 self.event_source.deregister(registry)
510 }
511}
512
513impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
514 for SimpleDataReader<D, DA>
515where
516 D: Keyed,
517 DA: DeserializerAdapter<D>,
518{
519 fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
520 self.status_receiver.as_status_evented()
521 }
522
523 fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
524 self.status_receiver.as_status_source()
525 }
526
527 fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
528 SimpleDataReaderEventStream {
529 simple_datareader: self,
530 }
531 }
532
533 fn try_recv_status(&self) -> Option<DataReaderStatus> {
534 self.status_receiver.try_recv_status()
535 }
536}
537
538impl<D, DA> RTPSEntity for SimpleDataReader<D, DA>
539where
540 D: Keyed + DeserializeOwned,
541 DA: DeserializerAdapter<D>,
542{
543 fn guid(&self) -> GUID {
544 self.my_guid
545 }
546}
547
548pub struct SimpleDataReaderStream<
554 'a,
555 D: Keyed + 'static,
556 S: Decode<DA::Decoded, DA::DecodedKey>,
557 DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
558> {
559 simple_datareader: &'a SimpleDataReader<D, DA>,
560 decoder: S,
561}
562
563impl<D, S, DA> Unpin for SimpleDataReaderStream<'_, D, S, DA>
568where
569 D: Keyed + 'static,
570 DA: DeserializerAdapter<D>,
571 S: Decode<DA::Decoded, DA::DecodedKey> + Unpin,
572{
573}
574
575impl<D, S, DA> Stream for SimpleDataReaderStream<'_, D, S, DA>
576where
577 D: Keyed + 'static,
578 DA: DeserializerAdapter<D>,
579 S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
580{
581 type Item = ReadResult<DeserializedCacheChange<D>>;
582
583 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
591 debug!("poll_next");
592 match self
593 .simple_datareader
594 .try_take_one_with(self.decoder.clone())
595 {
596 Err(e) =>
597 {
599 Poll::Ready(Some(Err(e)))
600 }
601
602 Ok(Some(d)) => Poll::Ready(Some(Ok(d))),
604
605 Ok(None) => {
607 self.simple_datareader.set_waker(Some(cx.waker().clone()));
619 match self
620 .simple_datareader
621 .try_take_one_with(self.decoder.clone())
622 {
623 Err(e) => Poll::Ready(Some(Err(e))),
624 Ok(Some(d)) => Poll::Ready(Some(Ok(d))),
625 Ok(None) => Poll::Pending,
626 }
627 }
628 } } } impl<D, S, DA> FusedStream for SimpleDataReaderStream<'_, D, S, DA>
633where
634 D: Keyed + 'static,
635 DA: DeserializerAdapter<D>,
636 S: Decode<DA::Decoded, DA::DecodedKey> + Clone,
637{
638 fn is_terminated(&self) -> bool {
639 false }
641}
642
643pub struct SimpleDataReaderEventStream<
647 'a,
648 D: Keyed + 'static,
649 DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
650> {
651 simple_datareader: &'a SimpleDataReader<D, DA>,
652}
653
654impl<D, DA> Stream for SimpleDataReaderEventStream<'_, D, DA>
655where
656 D: Keyed + 'static,
657 DA: DeserializerAdapter<D>,
658{
659 type Item = DataReaderStatus;
660
661 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
662 Pin::new(
663 &mut self
664 .simple_datareader
665 .status_receiver
666 .as_async_status_stream(),
667 )
668 .poll_next(cx)
669 } } impl<D, DA> FusedStream for SimpleDataReaderEventStream<'_, D, DA>
673where
674 D: Keyed + 'static,
675 DA: DeserializerAdapter<D>,
676{
677 fn is_terminated(&self) -> bool {
678 self
679 .simple_datareader
680 .status_receiver
681 .as_async_status_stream()
682 .is_terminated()
683 }
684}