1use std::{
2 io,
3 pin::Pin,
4 sync::{Arc, Mutex, MutexGuard},
5 task::{Context, Poll},
6};
7
8#[allow(unused_imports)]
9use log::{debug, error, info, trace, warn};
10use futures::stream::{FusedStream, Stream};
11
12use super::datasample_cache::DataSampleCache;
13use crate::{
14 dds::{
15 adapters::with_key::{DefaultDecoder, *},
16 key::*,
17 qos::*,
18 readcondition::*,
19 result::ReadResult,
20 statusevents::*,
21 with_key::{datasample::*, simpledatareader::*},
22 ReadError,
23 },
24 discovery::sedp_messages::PublicationBuiltinTopicData,
25 serialization::CDRDeserializerAdapter,
26 structure::{duration::Duration, entity::RTPSEntity, guid::GUID, time::Timestamp},
27};
28
29pub type DataReaderCdr<D> = DataReader<D, CDRDeserializerAdapter<D>>;
31
32#[derive(Clone, Copy, Debug, PartialEq, Eq)]
35pub enum SelectByKey {
36 This,
37 Next,
38}
39
40pub struct DataReader<D: Keyed, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
73 simple_data_reader: SimpleDataReader<D, DA>,
74 datasample_cache: DataSampleCache<D>, }
76
77impl<D: 'static, DA> DataReader<D, DA>
78where
79 D: Keyed,
80 DA: DeserializerAdapter<D>,
81{
82 pub(crate) fn from_simple_data_reader(simple_data_reader: SimpleDataReader<D, DA>) -> Self {
83 let dsc = DataSampleCache::new(simple_data_reader.qos().clone());
84
85 Self {
86 simple_data_reader,
87 datasample_cache: dsc,
88 }
89 }
90}
91
92impl<D: 'static, DA> DataReader<D, DA>
93where
94 D: Keyed,
95 DA: DeserializerAdapter<D> + DefaultDecoder<D>,
96{
97 fn fill_and_lock_local_datasample_cache(&mut self) -> ReadResult<()> {
101 while let Some(dcc) = self.simple_data_reader.try_take_one()? {
102 self
103 .datasample_cache
104 .fill_from_deserialized_cache_change(dcc);
105 }
106 Ok(())
107 }
108
109 fn drain_read_notifications(&self) {
110 self.simple_data_reader.drain_read_notifications();
111 }
112
113 fn select_keys_for_access(&self, read_condition: ReadCondition) -> Vec<(Timestamp, D::K)> {
114 self.datasample_cache.select_keys_for_access(read_condition)
115 }
116
117 fn take_by_keys(&mut self, keys: &[(Timestamp, D::K)]) -> Vec<DataSample<D>> {
118 self.datasample_cache.take_by_keys(keys)
119 }
120
121 fn take_bare_by_keys(&mut self, keys: &[(Timestamp, D::K)]) -> Vec<Sample<D, D::K>> {
122 self.datasample_cache.take_bare_by_keys(keys)
123 }
124
125 fn select_instance_keys_for_access(
126 &self,
127 instance: &D::K,
128 rc: ReadCondition,
129 ) -> Vec<(Timestamp, D::K)> {
130 self
131 .datasample_cache
132 .select_instance_keys_for_access(instance, rc)
133 }
134
135 pub fn read(
177 &mut self,
178 max_samples: usize,
179 read_condition: ReadCondition,
180 ) -> ReadResult<Vec<DataSample<&D>>> {
181 self.drain_read_notifications();
183 self.fill_and_lock_local_datasample_cache()?;
184
185 let mut selected = self.select_keys_for_access(read_condition);
186 selected.truncate(max_samples);
187
188 let result = self.datasample_cache.read_by_keys(&selected);
189
190 Ok(result)
191 }
192
193 pub fn take(
236 &mut self,
237 max_samples: usize,
238 read_condition: ReadCondition,
239 ) -> ReadResult<Vec<DataSample<D>>> {
240 self.drain_read_notifications();
242
243 self.fill_and_lock_local_datasample_cache()?;
244 let mut selected = self.select_keys_for_access(read_condition);
245 trace!("take selected count = {}", selected.len());
246 selected.truncate(max_samples);
247
248 let result = self.take_by_keys(&selected);
249 trace!("take taken count = {}", result.len());
250
251 Ok(result)
252 }
253
254 pub fn read_next_sample(&mut self) -> ReadResult<Option<DataSample<&D>>> {
289 let mut ds = self.read(1, ReadCondition::not_read())?;
290 Ok(ds.pop())
291 }
292
293 pub fn take_next_sample(&mut self) -> ReadResult<Option<DataSample<D>>> {
328 let mut ds = self.take(1, ReadCondition::not_read())?;
329 Ok(ds.pop())
330 }
331
332 fn read_bare(
335 &mut self,
336 max_samples: usize,
337 read_condition: ReadCondition,
338 ) -> ReadResult<Vec<Sample<&D, D::K>>> {
339 self.drain_read_notifications();
340 self.fill_and_lock_local_datasample_cache()?;
341
342 let mut selected = self.select_keys_for_access(read_condition);
343 selected.truncate(max_samples);
344
345 let result = self.datasample_cache.read_bare_by_keys(&selected);
346
347 Ok(result)
348 }
349
350 fn take_bare(
351 &mut self,
352 max_samples: usize,
353 read_condition: ReadCondition,
354 ) -> ReadResult<Vec<Sample<D, D::K>>> {
355 self.drain_read_notifications();
357 self.fill_and_lock_local_datasample_cache()?;
358
359 let mut selected = self.select_keys_for_access(read_condition);
360 trace!("take bare selected count = {}", selected.len());
361 selected.truncate(max_samples);
362
363 let result = self.take_bare_by_keys(&selected);
364 trace!("take bare taken count = {}", result.len());
365
366 Ok(result)
367 }
368
369 pub fn iterator(&mut self) -> ReadResult<impl Iterator<Item = Sample<&D, D::K>>> {
406 Ok(
409 self
410 .read_bare(usize::MAX, ReadCondition::not_read())?
411 .into_iter(),
412 )
413 }
414
415 pub fn conditional_iterator(
451 &mut self,
452 read_condition: ReadCondition,
453 ) -> ReadResult<impl Iterator<Item = Sample<&D, D::K>>> {
454 Ok(self.read_bare(usize::MAX, read_condition)?.into_iter())
457 }
458
459 pub fn into_iterator(&mut self) -> ReadResult<impl Iterator<Item = Sample<D, D::K>>> {
497 Ok(
500 self
501 .take_bare(usize::MAX, ReadCondition::not_read())?
502 .into_iter(),
503 )
504 }
505
506 pub fn into_conditional_iterator(
544 &mut self,
545 read_condition: ReadCondition,
546 ) -> ReadResult<impl Iterator<Item = Sample<D, D::K>>> {
547 Ok(self.take_bare(usize::MAX, read_condition)?.into_iter())
550 }
551
552 fn infer_key(
556 &self,
557 instance_key: Option<<D as Keyed>::K>,
558 this_or_next: SelectByKey,
559 ) -> Option<<D as Keyed>::K> {
560 match instance_key {
561 Some(k) => match this_or_next {
562 SelectByKey::This => Some(k),
563 SelectByKey::Next => self.datasample_cache.next_key(&k),
564 },
565 None => self.datasample_cache.instance_map.keys().next().cloned(),
566 }
567 }
568
569 pub fn read_instance(
614 &mut self,
615 max_samples: usize,
616 read_condition: ReadCondition,
617 instance_key: Option<<D as Keyed>::K>,
620 this_or_next: SelectByKey,
623 ) -> ReadResult<Vec<DataSample<&D>>> {
624 self.drain_read_notifications();
625 self.fill_and_lock_local_datasample_cache()?;
626
627 let key = match Self::infer_key(self, instance_key, this_or_next) {
628 Some(k) => k,
629 None => return Ok(Vec::new()),
630 };
631
632 let mut selected = self
633 .datasample_cache
634 .select_instance_keys_for_access(&key, read_condition);
635 selected.truncate(max_samples);
636
637 let result = self.datasample_cache.read_by_keys(&selected);
638
639 Ok(result)
640 }
641
642 pub fn take_instance(
681 &mut self,
682 max_samples: usize,
683 read_condition: ReadCondition,
684 instance_key: Option<<D as Keyed>::K>,
687 this_or_next: SelectByKey,
690 ) -> ReadResult<Vec<DataSample<D>>> {
691 self.drain_read_notifications();
693
694 self.fill_and_lock_local_datasample_cache()?;
695
696 let key = match self.infer_key(instance_key, this_or_next) {
697 Some(k) => k,
698 None => return Ok(Vec::new()),
699 };
700
701 let mut selected = self.select_instance_keys_for_access(&key, read_condition);
702 selected.truncate(max_samples);
703
704 let result = self.take_by_keys(&selected);
705
706 Ok(result)
707 }
708
709 pub fn wait_for_historical_data(&mut self, _max_wait: Duration) -> bool {
713 todo!()
714 }
715
716 pub fn get_matched_publications(&self) -> impl Iterator<Item = PublicationBuiltinTopicData> {
725 vec![].into_iter()
727 }
728
729 pub fn async_bare_sample_stream(self) -> BareDataReaderStream<D, DA> {
732 BareDataReaderStream {
733 datareader: Arc::new(Mutex::new(self)),
734 }
735 }
736
737 pub fn async_sample_stream(self) -> DataReaderStream<D, DA> {
740 DataReaderStream {
741 datareader: Arc::new(Mutex::new(self)),
742 }
743 }
744} impl<D, DA> mio_06::Evented for DataReader<D, DA>
749where
750 D: Keyed,
751 DA: DeserializerAdapter<D>,
752{
753 fn register(
756 &self,
757 poll: &mio_06::Poll,
758 token: mio_06::Token,
759 interest: mio_06::Ready,
760 opts: mio_06::PollOpt,
761 ) -> io::Result<()> {
762 self
763 .simple_data_reader
764 .register(poll, token, interest, opts)
765 }
766
767 fn reregister(
768 &self,
769 poll: &mio_06::Poll,
770 token: mio_06::Token,
771 interest: mio_06::Ready,
772 opts: mio_06::PollOpt,
773 ) -> io::Result<()> {
774 self
775 .simple_data_reader
776 .reregister(poll, token, interest, opts)
777 }
778
779 fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
780 self.simple_data_reader.deregister(poll)
781 }
782}
783
784impl<D, DA> mio_08::event::Source for DataReader<D, DA>
785where
786 D: Keyed,
787 DA: DeserializerAdapter<D>,
788{
789 fn register(
790 &mut self,
791 registry: &mio_08::Registry,
792 token: mio_08::Token,
793 interests: mio_08::Interest,
794 ) -> io::Result<()> {
795 <SimpleDataReader<D, DA> as mio_08::event::Source>::register(
798 &mut self.simple_data_reader,
799 registry,
800 token,
801 interests,
802 )
803 }
804
805 fn reregister(
806 &mut self,
807 registry: &mio_08::Registry,
808 token: mio_08::Token,
809 interests: mio_08::Interest,
810 ) -> io::Result<()> {
811 <SimpleDataReader<D, DA> as mio_08::event::Source>::reregister(
812 &mut self.simple_data_reader,
813 registry,
814 token,
815 interests,
816 )
817 }
818
819 fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
820 <SimpleDataReader<D, DA> as mio_08::event::Source>::deregister(
821 &mut self.simple_data_reader,
822 registry,
823 )
824 }
825}
826
827impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
828 for DataReader<D, DA>
829where
830 D: Keyed + 'static,
831 DA: DeserializerAdapter<D>,
832{
833 fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
834 self.simple_data_reader.as_status_evented()
835 }
836
837 fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
838 self.simple_data_reader.as_status_source()
839 }
840
841 fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
842 self.simple_data_reader.as_async_status_stream()
843 }
844
845 fn try_recv_status(&self) -> Option<DataReaderStatus> {
846 self.simple_data_reader.try_recv_status()
847 }
848}
849
850impl<D, DA> HasQoSPolicy for DataReader<D, DA>
851where
852 D: Keyed + 'static,
853 DA: DeserializerAdapter<D>,
854{
855 fn qos(&self) -> QosPolicies {
856 self.simple_data_reader.qos().clone()
857 }
858}
859
860impl<D, DA> RTPSEntity for DataReader<D, DA>
861where
862 D: Keyed + 'static,
863 DA: DeserializerAdapter<D>,
864{
865 fn guid(&self) -> GUID {
866 self.simple_data_reader.guid()
867 }
868}
869
870pub struct BareDataReaderStream<
876 D: Keyed + 'static,
877 DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
878> {
879 datareader: Arc<Mutex<DataReader<D, DA>>>,
880}
881
882impl<D, DA> BareDataReaderStream<D, DA>
883where
884 D: Keyed + 'static,
885 DA: DeserializerAdapter<D>,
886{
887 pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
889 DataReaderEventStream {
890 datareader: Arc::clone(&self.datareader),
891 }
892 }
893 fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
894 self.datareader.lock().map_err(|e| ReadError::Poisoned {
895 reason: format!("BareDataReaderStream could not lock datareader: {e:?}"),
896 })
897 }
898}
899
900impl<D, DA> Unpin for BareDataReaderStream<D, DA>
902where
903 D: Keyed + 'static,
904 DA: DeserializerAdapter<D>,
905{
906}
907
908impl<D, DA> Stream for BareDataReaderStream<D, DA>
909where
910 D: Keyed + 'static,
911 DA: DeserializerAdapter<D> + DefaultDecoder<D>,
912{
913 type Item = ReadResult<Sample<D, D::K>>;
914
915 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
916 debug!("poll_next");
917 let mut datareader = match self.lock_datareader() {
918 Ok(g) => g,
919 Err(e) => return Poll::Ready(Some(Err(e))),
920 }; match datareader.take_bare(1, ReadCondition::not_read()) {
923 Err(e) =>
924 {
926 Poll::Ready(Some(Err(e)))
927 }
928
929 Ok(mut v) => {
930 match v.pop() {
931 Some(d) => Poll::Ready(Some(Ok(d))),
932 None => {
933 datareader
939 .simple_data_reader
940 .set_waker(Some(cx.waker().clone()));
941 match datareader.take_bare(1, ReadCondition::not_read()) {
942 Err(e) => Poll::Ready(Some(Err(e))),
943 Ok(mut v) => match v.pop() {
944 None => Poll::Pending,
945 Some(d) => Poll::Ready(Some(Ok(d))),
946 },
947 }
948 }
949 }
950 }
951 }
952 }
953}
954
955impl<D, DA> FusedStream for BareDataReaderStream<D, DA>
956where
957 D: Keyed + 'static,
958 DA: DeserializerAdapter<D> + DefaultDecoder<D>,
959{
960 fn is_terminated(&self) -> bool {
961 false }
963}
964
965pub struct DataReaderStream<
968 D: Keyed + 'static,
969 DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
970> {
971 datareader: Arc<Mutex<DataReader<D, DA>>>,
972}
973
974impl<D, DA> DataReaderStream<D, DA>
975where
976 D: Keyed + 'static,
977 DA: DeserializerAdapter<D>,
978{
979 pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
981 DataReaderEventStream {
982 datareader: Arc::clone(&self.datareader),
983 }
984 }
985 fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
986 self.datareader.lock().map_err(|e| ReadError::Poisoned {
987 reason: format!("DataReaderStream could not lock datareader: {e:?}"),
988 })
989 }
990}
991
992impl<D, DA> Unpin for DataReaderStream<D, DA>
994where
995 D: Keyed + 'static,
996 DA: DeserializerAdapter<D>,
997{
998}
999
1000impl<D, DA> Stream for DataReaderStream<D, DA>
1001where
1002 D: Keyed + 'static,
1003 DA: DeserializerAdapter<D> + DefaultDecoder<D>,
1004{
1005 type Item = ReadResult<DataSample<D>>;
1006
1007 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1008 debug!("poll_next");
1009 let mut datareader = match self.lock_datareader() {
1010 Ok(g) => g,
1011 Err(e) => return Poll::Ready(Some(Err(e))),
1012 }; match datareader.take(1, ReadCondition::not_read()) {
1015 Err(e) =>
1016 {
1018 Poll::Ready(Some(Err(e)))
1019 }
1020
1021 Ok(mut v) => {
1022 match v.pop() {
1023 Some(d) => Poll::Ready(Some(Ok(d))),
1024 None => {
1025 datareader
1031 .simple_data_reader
1032 .set_waker(Some(cx.waker().clone()));
1033 match datareader.take(1, ReadCondition::not_read()) {
1034 Err(e) => Poll::Ready(Some(Err(e))),
1035 Ok(mut v) => match v.pop() {
1036 None => Poll::Pending,
1037 Some(d) => Poll::Ready(Some(Ok(d))),
1038 },
1039 }
1040 }
1041 }
1042 }
1043 }
1044 }
1045}
1046
1047impl<D, DA> FusedStream for DataReaderStream<D, DA>
1048where
1049 D: Keyed + 'static,
1050 DA: DeserializerAdapter<D> + DefaultDecoder<D>,
1051{
1052 fn is_terminated(&self) -> bool {
1053 false }
1055}
1056
1057pub struct DataReaderEventStream<
1061 D: Keyed + 'static,
1062 DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
1063> {
1064 datareader: Arc<Mutex<DataReader<D, DA>>>,
1065}
1066
1067impl<D, DA> DataReaderEventStream<D, DA>
1068where
1069 D: Keyed + 'static,
1070 DA: DeserializerAdapter<D>,
1071{
1072 fn lock_datareader(&self) -> ReadResult<MutexGuard<'_, DataReader<D, DA>>> {
1073 self.datareader.lock().map_err(|e| ReadError::Poisoned {
1074 reason: format!("DataReaderEventStream could not lock datareader: {e:?}"),
1075 })
1076 }
1077}
1078
1079impl<D, DA> Stream for DataReaderEventStream<D, DA>
1080where
1081 D: Keyed + 'static,
1082 DA: DeserializerAdapter<D>,
1083{
1084 type Item = DataReaderStatus;
1085
1086 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1087 let datareader = match self.lock_datareader() {
1088 Ok(g) => g,
1089 Err(_e) => return Poll::Ready(None),
1090 };
1094
1095 Pin::new(&mut datareader.simple_data_reader.as_async_status_stream()).poll_next(cx)
1096 }
1097}
1098
1099impl<D, DA> FusedStream for DataReaderEventStream<D, DA>
1100where
1101 D: Keyed + 'static,
1102 DA: DeserializerAdapter<D>,
1103{
1104 fn is_terminated(&self) -> bool {
1105 false }
1107}
1108
1109#[cfg(test)]
1117mod tests {
1118 use std::rc::Rc;
1119
1120 use bytes::Bytes;
1121 use mio_extras::channel as mio_channel;
1122 use log::info;
1123 use byteorder::LittleEndian;
1124
1125 use super::*;
1126 use crate::{
1127 dds::{
1128 participant::DomainParticipant,
1129 topic::{TopicDescription, TopicKind},
1130 },
1131 messages::submessages::{
1132 elements::serialized_payload::SerializedPayload, submessage_flag::*, submessages::Data,
1133 },
1134 mio_source,
1135 network::udp_sender::UDPSender,
1136 rtps::{
1137 message_receiver::*,
1138 reader::{Reader, ReaderIngredients},
1139 },
1140 serialization::to_vec,
1141 structure::{
1142 guid::{EntityId, EntityKind, GuidPrefix},
1143 sequence_number::SequenceNumber,
1144 },
1145 test::random_data::*,
1146 RepresentationIdentifier,
1147 };
1148
1149 #[test]
1150 fn read_and_take() {
1151 let dp = DomainParticipant::new(0).expect("Participant creation failed!");
1154
1155 let mut qos = QosPolicies::qos_none();
1156 qos.history = Some(policy::History::KeepAll); let sub = dp.create_subscriber(&qos).unwrap();
1159 let topic = dp
1160 .create_topic(
1161 "dr read".to_string(),
1162 "read fn test?".to_string(),
1163 &qos,
1164 TopicKind::WithKey,
1165 )
1166 .unwrap();
1167
1168 let topic_cache =
1169 dp.dds_cache()
1170 .write()
1171 .unwrap()
1172 .add_new_topic(topic.name(), topic.get_type(), &topic.qos());
1173
1174 let (notification_sender, _notification_receiver) = mio_channel::sync_channel::<()>(100);
1176 let (_notification_event_source, notification_event_sender) =
1177 mio_source::make_poll_channel().unwrap();
1178 let data_reader_waker = Arc::new(Mutex::new(None));
1179
1180 let (status_sender, _status_receiver) = sync_status_channel::<DataReaderStatus>(4).unwrap();
1181 let (participant_status_sender, _participant_status_receiver) =
1182 sync_status_channel(16).unwrap();
1183
1184 let (_reader_command_sender, reader_command_receiver) =
1185 mio_channel::sync_channel::<ReaderCommand>(10);
1186
1187 let default_id = EntityId::default();
1188 let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), default_id);
1189
1190 let reader_ing = ReaderIngredients {
1191 guid: reader_guid,
1192 notification_sender,
1193 status_sender,
1194 topic_name: topic.name(),
1195 topic_cache_handle: topic_cache,
1196 like_stateless: false,
1197 qos_policy: QosPolicies::qos_none(),
1198 data_reader_command_receiver: reader_command_receiver,
1199 data_reader_waker,
1200 poll_event_sender: notification_event_sender,
1201 security_plugins: None,
1202 };
1203
1204 let mut reader = Reader::new(
1205 reader_ing,
1206 Rc::new(UDPSender::new_with_random_port().unwrap()),
1207 mio_extras::timer::Builder::default().build(),
1208 participant_status_sender,
1209 );
1210
1211 let mut datareader = sub
1213 .create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(&topic, None)
1214 .unwrap();
1215
1216 let writer_guid = GUID {
1217 prefix: GuidPrefix::new(&[1; 12]),
1218 entity_id: EntityId::create_custom_entity_id(
1219 [1; 3],
1220 EntityKind::WRITER_WITH_KEY_USER_DEFINED,
1221 ),
1222 };
1223 let mr_state = MessageReceiverState {
1224 source_guid_prefix: writer_guid.prefix,
1225 ..Default::default()
1226 };
1227 reader.matched_writer_add(
1228 writer_guid,
1229 EntityId::UNKNOWN,
1230 mr_state.unicast_reply_locator_list.to_vec(),
1231 mr_state.multicast_reply_locator_list.to_vec(),
1232 &QosPolicies::qos_none(),
1233 );
1234
1235 let test_data = RandomData {
1237 a: 10,
1238 b: ":DDD".to_string(),
1239 };
1240
1241 let test_data2 = RandomData {
1242 a: 11,
1243 b: ":)))".to_string(),
1244 };
1245 let data_msg = Data {
1246 reader_id: reader.entity_id(),
1247 writer_id: writer_guid.entity_id,
1248 writer_sn: SequenceNumber::from(1),
1249 serialized_payload: Some(
1250 SerializedPayload {
1251 representation_identifier: RepresentationIdentifier::CDR_LE,
1252 representation_options: [0, 0],
1253 value: Bytes::from(to_vec::<RandomData, LittleEndian>(&test_data).unwrap()),
1254 }
1255 .into(),
1256 ),
1257 ..Data::default()
1258 };
1259
1260 let data_msg2 = Data {
1261 reader_id: reader.entity_id(),
1262 writer_id: writer_guid.entity_id,
1263 writer_sn: SequenceNumber::from(2),
1264 serialized_payload: Some(
1265 SerializedPayload {
1266 representation_identifier: RepresentationIdentifier::CDR_LE,
1267 representation_options: [0, 0],
1268 value: Bytes::from(to_vec::<RandomData, LittleEndian>(&test_data2).unwrap()),
1269 }
1270 .into(),
1271 ),
1272 ..Data::default()
1273 };
1274
1275 let data_flags = DATA_Flags::Endianness | DATA_Flags::Data;
1276
1277 reader.handle_data_msg(data_msg, data_flags, &mr_state);
1278 reader.handle_data_msg(data_msg2, data_flags, &mr_state);
1279
1280 {
1283 let result_vec = datareader.read(100, ReadCondition::any()).unwrap();
1284 assert_eq!(result_vec.len(), 2);
1285 let d = result_vec[0].value().clone().unwrap();
1286 assert_eq!(&test_data, d);
1287 }
1288 {
1289 let result_vec2 = datareader.read(100, ReadCondition::any()).unwrap();
1290 assert_eq!(result_vec2.len(), 2);
1291 let d2 = result_vec2[1].value().clone().unwrap();
1292 assert_eq!(&test_data2, d2);
1293 }
1294 {
1295 let result_vec3 = datareader.read(100, ReadCondition::any()).unwrap();
1296 let d3 = result_vec3[0].value().clone().unwrap();
1297 assert_eq!(&test_data, d3);
1298 }
1299
1300 let mut result_vec = datareader.take(100, ReadCondition::any()).unwrap();
1302 let datasample2 = result_vec.pop().unwrap();
1303 let datasample1 = result_vec.pop().unwrap();
1304 let data2 = datasample2.into_value().unwrap();
1305 let data1 = datasample1.into_value().unwrap();
1306 assert_eq!(test_data2, data2);
1307 assert_eq!(test_data, data1);
1308
1309 let result_vec2 = datareader.take(100, ReadCondition::any());
1310 assert!(result_vec2.is_ok());
1311 assert_eq!(result_vec2.unwrap().len(), 0);
1312 }
1313
1314 #[test]
1315 fn read_and_take_with_instance() {
1316 let dp = DomainParticipant::new(0).expect("Participant creation failed!");
1319
1320 let mut qos = QosPolicies::qos_none();
1321 qos.history = Some(policy::History::KeepAll); let sub = dp.create_subscriber(&qos).unwrap();
1324 let topic = dp
1325 .create_topic(
1326 "dr read".to_string(),
1327 "read fn test?".to_string(),
1328 &qos,
1329 TopicKind::WithKey,
1330 )
1331 .unwrap();
1332
1333 let topic_cache =
1334 dp.dds_cache()
1335 .write()
1336 .unwrap()
1337 .add_new_topic(topic.name(), topic.get_type(), &topic.qos());
1338
1339 let (notification_sender, _notification_receiver) = mio_channel::sync_channel::<()>(100);
1341 let (_notification_event_source, notification_event_sender) =
1342 mio_source::make_poll_channel().unwrap();
1343 let data_reader_waker = Arc::new(Mutex::new(None));
1344
1345 let (status_sender, _status_receiver) = sync_status_channel::<DataReaderStatus>(4).unwrap();
1346 let (participant_status_sender, _participant_status_receiver) =
1347 sync_status_channel(16).unwrap();
1348
1349 let (_reader_command_sender, reader_command_receiver) =
1350 mio_channel::sync_channel::<ReaderCommand>(10);
1351
1352 let default_id = EntityId::default();
1353 let reader_guid = GUID::new_with_prefix_and_id(dp.guid_prefix(), default_id);
1354
1355 let reader_ing = ReaderIngredients {
1356 guid: reader_guid,
1357 notification_sender,
1358 status_sender,
1359 topic_name: topic.name(),
1360 topic_cache_handle: topic_cache,
1361 like_stateless: false,
1362 qos_policy: QosPolicies::qos_none(),
1363 data_reader_command_receiver: reader_command_receiver,
1364 data_reader_waker,
1365 poll_event_sender: notification_event_sender,
1366 security_plugins: None,
1367 };
1368
1369 let mut reader = Reader::new(
1370 reader_ing,
1371 Rc::new(UDPSender::new_with_random_port().unwrap()),
1372 mio_extras::timer::Builder::default().build(),
1373 participant_status_sender,
1374 );
1375
1376 let mut datareader = sub
1378 .create_datareader::<RandomData, CDRDeserializerAdapter<RandomData>>(&topic, None)
1379 .unwrap();
1380
1381 let writer_guid = GUID {
1382 prefix: GuidPrefix::new(&[1; 12]),
1383 entity_id: EntityId::create_custom_entity_id(
1384 [1; 3],
1385 EntityKind::WRITER_WITH_KEY_USER_DEFINED,
1386 ),
1387 };
1388 let mr_state = MessageReceiverState {
1389 source_guid_prefix: writer_guid.prefix,
1390 ..Default::default()
1391 };
1392 reader.matched_writer_add(
1393 writer_guid,
1394 EntityId::UNKNOWN,
1395 mr_state.unicast_reply_locator_list.to_vec(),
1396 mr_state.multicast_reply_locator_list.to_vec(),
1397 &QosPolicies::qos_none(),
1398 );
1399
1400 let data_key1 = RandomData {
1402 a: 1,
1403 b: ":D".to_string(),
1404 };
1405 let data_key2_1 = RandomData {
1406 a: 2,
1407 b: ":(".to_string(),
1408 };
1409 let data_key2_2 = RandomData {
1410 a: 2,
1411 b: "??".to_string(),
1412 };
1413 let data_key2_3 = RandomData {
1414 a: 2,
1415 b: "xD".to_string(),
1416 };
1417
1418 let key1 = data_key1.key();
1419 let key2 = data_key2_1.key();
1420
1421 assert!(data_key2_1.key() == data_key2_2.key());
1422 assert!(data_key2_3.key() == key2);
1423
1424 let data_msg = Data {
1427 reader_id: reader.entity_id(),
1428 writer_id: writer_guid.entity_id,
1429 writer_sn: SequenceNumber::from(1),
1430 serialized_payload: Some(
1431 SerializedPayload {
1432 representation_identifier: RepresentationIdentifier::CDR_LE,
1433 representation_options: [0, 0],
1434 value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key1).unwrap()),
1435 }
1436 .into(),
1437 ),
1438 ..Data::default()
1439 };
1440 let data_msg2 = Data {
1441 reader_id: reader.entity_id(),
1442 writer_id: writer_guid.entity_id,
1443 writer_sn: SequenceNumber::from(2),
1444 serialized_payload: Some(
1445 SerializedPayload {
1446 representation_identifier: RepresentationIdentifier::CDR_LE,
1447 representation_options: [0, 0],
1448 value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_1).unwrap()),
1449 }
1450 .into(),
1451 ),
1452 ..Data::default()
1453 };
1454 let data_msg3 = Data {
1455 reader_id: reader.entity_id(),
1456 writer_id: writer_guid.entity_id,
1457 writer_sn: SequenceNumber::from(3),
1458 serialized_payload: Some(
1459 SerializedPayload {
1460 representation_identifier: RepresentationIdentifier::CDR_LE,
1461 representation_options: [0, 0],
1462 value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_2).unwrap()),
1463 }
1464 .into(),
1465 ),
1466 ..Data::default()
1467 };
1468 let data_msg4 = Data {
1469 reader_id: reader.entity_id(),
1470 writer_id: writer_guid.entity_id,
1471 writer_sn: SequenceNumber::from(4),
1472 serialized_payload: Some(
1473 SerializedPayload {
1474 representation_identifier: RepresentationIdentifier::CDR_LE,
1475 representation_options: [0, 0],
1476 value: Bytes::from(to_vec::<RandomData, LittleEndian>(&data_key2_3).unwrap()),
1477 }
1478 .into(),
1479 ),
1480 ..Data::default()
1481 };
1482
1483 let data_flags = DATA_Flags::Endianness | DATA_Flags::Data;
1484
1485 reader.handle_data_msg(data_msg, data_flags, &mr_state);
1487 reader.handle_data_msg(data_msg2, data_flags, &mr_state);
1488 reader.handle_data_msg(data_msg3, data_flags, &mr_state);
1489 reader.handle_data_msg(data_msg4, data_flags, &mr_state);
1490
1491 info!("calling read with key 1 and this");
1495 let results =
1496 datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::This);
1497 assert_eq!(&data_key1, results.unwrap()[0].value().clone().unwrap());
1498
1499 info!("calling read with None and this");
1500 let results = datareader.read_instance(100, ReadCondition::any(), None, SelectByKey::This);
1502 assert_eq!(&data_key1, results.unwrap()[0].value().clone().unwrap());
1503
1504 info!("calling read with key 1 and next");
1505 let results =
1506 datareader.read_instance(100, ReadCondition::any(), Some(key1), SelectByKey::Next);
1507 assert_eq!(results.as_ref().unwrap().len(), 3);
1508 assert_eq!(&data_key2_1, results.unwrap()[0].value().clone().unwrap());
1509
1510 info!("calling take with key 2 and this");
1512 let results =
1513 datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
1514 assert_eq!(results.as_ref().unwrap().len(), 3);
1515 let mut vec = results.unwrap();
1516 let d3 = vec.pop().unwrap();
1517 let d3 = d3.into_value().unwrap();
1518 let d2 = vec.pop().unwrap();
1519 let d2 = d2.into_value().unwrap();
1520 let d1 = vec.pop().unwrap();
1521 let d1 = d1.into_value().unwrap();
1522 assert_eq!(data_key2_3, d3);
1523 assert_eq!(data_key2_2, d2);
1524 assert_eq!(data_key2_1, d1);
1525
1526 info!("calling take with key 2 and this");
1529 let results =
1530 datareader.take_instance(100, ReadCondition::any(), Some(key2), SelectByKey::This);
1531 assert!(results.is_ok());
1532 assert!(results.unwrap().is_empty());
1533 }
1534}