1extern crate alloc;
28use alloc::string::{String, ToString};
29use alloc::sync::Arc;
30use alloc::vec::Vec;
31
32#[cfg(feature = "std")]
33use std::sync::Mutex;
34
35use zerodds_qos::{
36 DurabilityKind, DurabilityQosPolicy, HistoryKind, HistoryQosPolicy, ReliabilityKind,
37 ReliabilityQosPolicy,
38};
39
40use crate::builtin_topics::{
41 ParticipantBuiltinTopicData, PublicationBuiltinTopicData, SubscriptionBuiltinTopicData,
42 TOPIC_NAME_DCPS_PARTICIPANT, TOPIC_NAME_DCPS_PUBLICATION, TOPIC_NAME_DCPS_SUBSCRIPTION,
43 TOPIC_NAME_DCPS_TOPIC, TopicBuiltinTopicData,
44};
45use crate::dds_type::DdsType;
46use crate::error::{DdsError, Result};
47use crate::qos::{DataReaderQos, SubscriberQos, TopicQos};
48use crate::subscriber::{DataReader, Subscriber, SubscriberInner};
49use crate::topic::Topic;
50
51#[must_use]
54pub fn builtin_reader_qos() -> DataReaderQos {
55 let mut qos = DataReaderQos::default();
56 qos.reliability = ReliabilityQosPolicy {
57 kind: ReliabilityKind::Reliable,
58 max_blocking_time: qos.reliability.max_blocking_time,
59 };
60 qos.durability = DurabilityQosPolicy {
61 kind: DurabilityKind::TransientLocal,
62 };
63 qos.history = HistoryQosPolicy {
64 kind: HistoryKind::KeepLast,
65 depth: 1,
66 };
67 qos
68}
69
/// Groups one [`Subscriber`] with a [`DataReader`] for each of the four
/// built-in topic data types, plus the [`BuiltinSinks`] handles that share
/// those readers' inboxes.
#[derive(Debug)]
pub struct BuiltinSubscriber {
    /// Subscriber handle exposed through [`BuiltinSubscriber::subscriber`].
    subscriber: Subscriber,
    /// Reader created on the `TOPIC_NAME_DCPS_PARTICIPANT` topic.
    participant_reader: DataReader<ParticipantBuiltinTopicData>,
    /// Reader created on the `TOPIC_NAME_DCPS_TOPIC` topic.
    topic_reader: DataReader<TopicBuiltinTopicData>,
    /// Reader created on the `TOPIC_NAME_DCPS_PUBLICATION` topic.
    publication_reader: DataReader<PublicationBuiltinTopicData>,
    /// Reader created on the `TOPIC_NAME_DCPS_SUBSCRIPTION` topic.
    subscription_reader: DataReader<SubscriptionBuiltinTopicData>,
    /// Inbox handles shared with the four readers above.
    sinks: BuiltinSinks,
}
91
/// Handles to the four built-in reader inboxes.
///
/// Every field is an `Arc`, so cloning this struct yields handles to the same
/// underlying sample buffers rather than copies of them.
#[derive(Debug, Clone)]
pub struct BuiltinSinks {
    /// Sample buffer written by [`BuiltinSinks::push_participant`].
    pub participant: Arc<Mutex<Vec<crate::runtime::UserSample>>>,
    /// Sample buffer written by [`BuiltinSinks::push_topic`].
    pub topic: Arc<Mutex<Vec<crate::runtime::UserSample>>>,
    /// Sample buffer written by [`BuiltinSinks::push_publication`].
    pub publication: Arc<Mutex<Vec<crate::runtime::UserSample>>>,
    /// Sample buffer written by [`BuiltinSinks::push_subscription`].
    pub subscription: Arc<Mutex<Vec<crate::runtime::UserSample>>>,
}
105
106impl BuiltinSinks {
107 pub fn push_participant(&self, sample: &ParticipantBuiltinTopicData) -> Result<()> {
114 push_into(&self.participant, sample)
115 }
116
117 pub fn push_topic(&self, sample: &TopicBuiltinTopicData) -> Result<()> {
122 push_into(&self.topic, sample)
123 }
124
125 pub fn push_publication(&self, sample: &PublicationBuiltinTopicData) -> Result<()> {
130 push_into(&self.publication, sample)
131 }
132
133 pub fn push_subscription(&self, sample: &SubscriptionBuiltinTopicData) -> Result<()> {
138 push_into(&self.subscription, sample)
139 }
140}
141
142fn push_into<T: DdsType>(
143 sink: &Arc<Mutex<Vec<crate::runtime::UserSample>>>,
144 sample: &T,
145) -> Result<()> {
146 let mut buf = Vec::new();
147 sample.encode(&mut buf).map_err(|e| DdsError::WireError {
148 message: format_err(&e),
149 })?;
150 let mut guard = sink.lock().map_err(|_| DdsError::PreconditionNotMet {
151 reason: "builtin sink mutex poisoned",
152 })?;
153 guard.push(crate::runtime::UserSample::Alive {
158 payload: buf,
159 writer_guid: [0u8; 16],
160 writer_strength: 0,
161 });
162 Ok(())
163}
164
165fn format_err(e: &crate::dds_type::EncodeError) -> String {
166 use core::fmt::Write;
167 let mut s = String::new();
168 let _ = write!(s, "{e}");
169 s
170}
171
172impl BuiltinSubscriber {
173 #[must_use]
177 pub fn new() -> Self {
178 let subscriber = Subscriber::new(SubscriberQos::default(), None);
181 let inner = subscriber.inner.clone();
182
183 let qos = builtin_reader_qos();
184
185 let part_inbox: Arc<Mutex<Vec<crate::runtime::UserSample>>> =
188 Arc::new(Mutex::new(Vec::new()));
189 let topic_inbox: Arc<Mutex<Vec<crate::runtime::UserSample>>> =
190 Arc::new(Mutex::new(Vec::new()));
191 let pub_inbox: Arc<Mutex<Vec<crate::runtime::UserSample>>> =
192 Arc::new(Mutex::new(Vec::new()));
193 let sub_inbox: Arc<Mutex<Vec<crate::runtime::UserSample>>> =
194 Arc::new(Mutex::new(Vec::new()));
195
196 let participant_reader = DataReader::new_builtin(
197 Topic::<ParticipantBuiltinTopicData>::new_orphan(
198 TOPIC_NAME_DCPS_PARTICIPANT.to_string(),
199 TopicQos::default(),
200 ),
201 qos.clone(),
202 inner.clone(),
203 part_inbox.clone(),
204 );
205 let topic_reader = DataReader::new_builtin(
206 Topic::<TopicBuiltinTopicData>::new_orphan(
207 TOPIC_NAME_DCPS_TOPIC.to_string(),
208 TopicQos::default(),
209 ),
210 qos.clone(),
211 inner.clone(),
212 topic_inbox.clone(),
213 );
214 let publication_reader = DataReader::new_builtin(
215 Topic::<PublicationBuiltinTopicData>::new_orphan(
216 TOPIC_NAME_DCPS_PUBLICATION.to_string(),
217 TopicQos::default(),
218 ),
219 qos.clone(),
220 inner.clone(),
221 pub_inbox.clone(),
222 );
223 let subscription_reader = DataReader::new_builtin(
224 Topic::<SubscriptionBuiltinTopicData>::new_orphan(
225 TOPIC_NAME_DCPS_SUBSCRIPTION.to_string(),
226 TopicQos::default(),
227 ),
228 qos,
229 inner,
230 sub_inbox.clone(),
231 );
232
233 Self {
234 subscriber,
235 participant_reader,
236 topic_reader,
237 publication_reader,
238 subscription_reader,
239 sinks: BuiltinSinks {
240 participant: part_inbox,
241 topic: topic_inbox,
242 publication: pub_inbox,
243 subscription: sub_inbox,
244 },
245 }
246 }
247
248 #[must_use]
250 pub fn subscriber(&self) -> &Subscriber {
251 &self.subscriber
252 }
253
254 #[must_use]
257 pub fn sinks(&self) -> BuiltinSinks {
258 self.sinks.clone()
259 }
260
261 pub fn lookup_datareader<T: BuiltinTopic>(&self, topic_name: &str) -> Result<DataReader<T>> {
273 T::lookup(self, topic_name)
274 }
275
276 #[must_use]
279 pub fn participant_reader(&self) -> DataReader<ParticipantBuiltinTopicData> {
280 clone_reader(&self.participant_reader)
281 }
282
283 #[must_use]
285 pub fn topic_reader(&self) -> DataReader<TopicBuiltinTopicData> {
286 clone_reader(&self.topic_reader)
287 }
288
289 #[must_use]
291 pub fn publication_reader(&self) -> DataReader<PublicationBuiltinTopicData> {
292 clone_reader(&self.publication_reader)
293 }
294
295 #[must_use]
297 pub fn subscription_reader(&self) -> DataReader<SubscriptionBuiltinTopicData> {
298 clone_reader(&self.subscription_reader)
299 }
300}
301
302impl Default for BuiltinSubscriber {
303 fn default() -> Self {
304 Self::new()
305 }
306}
307
/// Links a built-in topic data type to its topic name and to the matching
/// reader of a [`BuiltinSubscriber`].
///
/// The `private::Sealed` supertrait lives in a non-public module, so code
/// outside this module cannot implement the trait for additional types.
pub trait BuiltinTopic: DdsType + private::Sealed + Sized {
    /// Name of the built-in topic that carries this data type.
    const TOPIC_NAME: &'static str;
    /// Returns the reader for `Self` held by `sub`; `topic_name` is the name
    /// the caller asked for. Backs [`BuiltinSubscriber::lookup_datareader`].
    #[doc(hidden)]
    fn lookup(sub: &BuiltinSubscriber, topic_name: &str) -> Result<DataReader<Self>>;
}
319
320mod private {
321 pub trait Sealed {}
322 impl Sealed for crate::builtin_topics::ParticipantBuiltinTopicData {}
323 impl Sealed for crate::builtin_topics::TopicBuiltinTopicData {}
324 impl Sealed for crate::builtin_topics::PublicationBuiltinTopicData {}
325 impl Sealed for crate::builtin_topics::SubscriptionBuiltinTopicData {}
326}
327
328impl BuiltinTopic for ParticipantBuiltinTopicData {
329 const TOPIC_NAME: &'static str = TOPIC_NAME_DCPS_PARTICIPANT;
330 fn lookup(sub: &BuiltinSubscriber, topic_name: &str) -> Result<DataReader<Self>> {
331 if topic_name != Self::TOPIC_NAME {
332 return Err(DdsError::BadParameter {
333 what: "builtin topic_name does not match type parameter",
334 });
335 }
336 Ok(sub.participant_reader())
337 }
338}
339
340impl BuiltinTopic for TopicBuiltinTopicData {
341 const TOPIC_NAME: &'static str = TOPIC_NAME_DCPS_TOPIC;
342 fn lookup(sub: &BuiltinSubscriber, topic_name: &str) -> Result<DataReader<Self>> {
343 if topic_name != Self::TOPIC_NAME {
344 return Err(DdsError::BadParameter {
345 what: "builtin topic_name does not match type parameter",
346 });
347 }
348 Ok(sub.topic_reader())
349 }
350}
351
352impl BuiltinTopic for PublicationBuiltinTopicData {
353 const TOPIC_NAME: &'static str = TOPIC_NAME_DCPS_PUBLICATION;
354 fn lookup(sub: &BuiltinSubscriber, topic_name: &str) -> Result<DataReader<Self>> {
355 if topic_name != Self::TOPIC_NAME {
356 return Err(DdsError::BadParameter {
357 what: "builtin topic_name does not match type parameter",
358 });
359 }
360 Ok(sub.publication_reader())
361 }
362}
363
364impl BuiltinTopic for SubscriptionBuiltinTopicData {
365 const TOPIC_NAME: &'static str = TOPIC_NAME_DCPS_SUBSCRIPTION;
366 fn lookup(sub: &BuiltinSubscriber, topic_name: &str) -> Result<DataReader<Self>> {
367 if topic_name != Self::TOPIC_NAME {
368 return Err(DdsError::BadParameter {
369 what: "builtin topic_name does not match type parameter",
370 });
371 }
372 Ok(sub.subscription_reader())
373 }
374}
375
376fn clone_reader<T: DdsType + Send + Sync + 'static>(r: &DataReader<T>) -> DataReader<T> {
383 DataReader::<T>::new_builtin(
384 r.topic().clone(),
385 r.qos().clone(),
386 builtin_clone_subscriber_inner(r),
387 r.__inbox_handle(),
388 )
389}
390
391fn builtin_clone_subscriber_inner<T: DdsType>(_r: &DataReader<T>) -> Arc<SubscriberInner> {
392 Arc::new(SubscriberInner {
396 qos: std::sync::Mutex::new(SubscriberQos::default()),
397 entity_state: crate::entity::EntityState::new(),
398 runtime: None,
399 listener: std::sync::Mutex::new(None),
400 participant: std::sync::Mutex::new(None),
401 access_scope: crate::coherent_set::GroupAccessScope::new(),
402 datareaders: std::sync::Mutex::new(alloc::vec::Vec::new()),
403 })
404}
405
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::builtin_topics::{
        ParticipantBuiltinTopicData, PublicationBuiltinTopicData, SubscriptionBuiltinTopicData,
        TopicBuiltinTopicData,
    };
    use zerodds_rtps::wire_types::Guid;

    /// Builds a GUID whose 16 bytes count up from `seed`, wrapping on overflow.
    fn mk_guid(seed: u8) -> Guid {
        let mut bytes = [0u8; 16];
        for (offset, byte) in (0u8..).zip(bytes.iter_mut()) {
            *byte = seed.wrapping_add(offset);
        }
        Guid::from_bytes(bytes)
    }

    /// Each accessor yields a reader bound to the expected built-in topic.
    #[test]
    fn builtin_subscriber_has_four_readers() {
        let builtin = BuiltinSubscriber::new();

        let participant = builtin.participant_reader();
        assert_eq!(participant.topic().name(), "DCPSParticipant");

        let topic = builtin.topic_reader();
        assert_eq!(topic.topic().name(), "DCPSTopic");

        let publication = builtin.publication_reader();
        assert_eq!(publication.topic().name(), "DCPSPublication");

        let subscription = builtin.subscription_reader();
        assert_eq!(subscription.topic().name(), "DCPSSubscription");
    }

    /// The built-in QoS is Reliable / TransientLocal / KeepLast(1).
    #[test]
    fn builtin_reader_qos_is_spec_default() {
        let profile = builtin_reader_qos();
        assert_eq!(profile.reliability.kind, ReliabilityKind::Reliable);
        assert_eq!(profile.durability.kind, DurabilityKind::TransientLocal);
        assert_eq!(profile.history.kind, HistoryKind::KeepLast);
        assert_eq!(profile.history.depth, 1);
    }

    /// A matching type and topic name resolves to the reader of that topic.
    #[test]
    fn lookup_datareader_routes_by_type() {
        let builtin = BuiltinSubscriber::new();

        assert_eq!(
            builtin
                .lookup_datareader::<ParticipantBuiltinTopicData>("DCPSParticipant")
                .unwrap()
                .topic()
                .name(),
            "DCPSParticipant"
        );
        assert_eq!(
            builtin
                .lookup_datareader::<TopicBuiltinTopicData>("DCPSTopic")
                .unwrap()
                .topic()
                .name(),
            "DCPSTopic"
        );
        assert_eq!(
            builtin
                .lookup_datareader::<PublicationBuiltinTopicData>("DCPSPublication")
                .unwrap()
                .topic()
                .name(),
            "DCPSPublication"
        );
        assert_eq!(
            builtin
                .lookup_datareader::<SubscriptionBuiltinTopicData>("DCPSSubscription")
                .unwrap()
                .topic()
                .name(),
            "DCPSSubscription"
        );
    }

    /// Participant type with a publication topic name is rejected.
    #[test]
    fn lookup_datareader_rejects_wrong_topic_name() {
        let builtin = BuiltinSubscriber::new();
        let outcome = builtin.lookup_datareader::<ParticipantBuiltinTopicData>("DCPSPublication");
        assert!(matches!(outcome, Err(DdsError::BadParameter { .. })));
    }

    /// A participant sample pushed into the sink can be taken from the reader.
    #[test]
    fn sinks_push_participant_lands_in_reader() {
        let builtin = BuiltinSubscriber::new();
        let announced = ParticipantBuiltinTopicData {
            key: mk_guid(0xA0),
            user_data: Vec::new(),
        };
        builtin.sinks().push_participant(&announced).unwrap();

        let participant_reader = builtin
            .lookup_datareader::<ParticipantBuiltinTopicData>("DCPSParticipant")
            .unwrap();
        let taken = participant_reader.take().unwrap();
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].key, announced.key);
    }

    /// A topic sample pushed into the sink can be taken from the reader.
    #[test]
    fn sinks_push_topic_lands_in_reader() {
        let builtin = BuiltinSubscriber::new();
        let announced = TopicBuiltinTopicData {
            key: TopicBuiltinTopicData::synthesize_key("MyT", "MyType"),
            name: "MyT".to_string(),
            type_name: "MyType".to_string(),
            durability: DurabilityKind::Volatile,
            reliability: ReliabilityKind::Reliable,
        };
        builtin.sinks().push_topic(&announced).unwrap();

        let topic_reader = builtin
            .lookup_datareader::<TopicBuiltinTopicData>("DCPSTopic")
            .unwrap();
        let taken = topic_reader.take().unwrap();
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].name, "MyT");
    }

    /// A publication sample pushed into the sink can be taken from the reader.
    #[test]
    fn sinks_push_publication_lands_in_reader() {
        let builtin = BuiltinSubscriber::new();
        let announced = PublicationBuiltinTopicData {
            key: mk_guid(0xB0),
            participant_key: mk_guid(0xC0),
            topic_name: "T".to_string(),
            type_name: "T".to_string(),
            durability: DurabilityKind::Volatile,
            reliability: ReliabilityKind::BestEffort,
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness_lease_seconds: 0,
            deadline_seconds: 0,
            lifespan_seconds: 0,
            partition: Vec::new(),
        };
        builtin.sinks().push_publication(&announced).unwrap();

        let publication_reader = builtin
            .lookup_datareader::<PublicationBuiltinTopicData>("DCPSPublication")
            .unwrap();
        let taken = publication_reader.take().unwrap();
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].key, announced.key);
    }

    /// A subscription sample pushed into the sink can be taken from the reader.
    #[test]
    fn sinks_push_subscription_lands_in_reader() {
        let builtin = BuiltinSubscriber::new();
        let announced = SubscriptionBuiltinTopicData {
            key: mk_guid(0xD0),
            participant_key: mk_guid(0xE0),
            topic_name: "T".to_string(),
            type_name: "T".to_string(),
            durability: DurabilityKind::Volatile,
            reliability: ReliabilityKind::Reliable,
            ownership: zerodds_qos::OwnershipKind::Shared,
            liveliness_lease_seconds: 0,
            deadline_seconds: 0,
            partition: Vec::new(),
        };
        builtin.sinks().push_subscription(&announced).unwrap();

        let subscription_reader = builtin
            .lookup_datareader::<SubscriptionBuiltinTopicData>("DCPSSubscription")
            .unwrap();
        let taken = subscription_reader.take().unwrap();
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].topic_name, "T");
    }

    /// The subscriber accessor returns a plain `&Subscriber`.
    #[test]
    fn subscriber_handle_is_accessible() {
        let builtin = BuiltinSubscriber::new();
        let _: &Subscriber = builtin.subscriber();
    }

    /// `Default` produces the same reader set as `new`.
    #[test]
    fn default_constructs_via_new() {
        let builtin = BuiltinSubscriber::default();
        assert_eq!(
            builtin.participant_reader().topic().name(),
            "DCPSParticipant"
        );
    }

    /// Topic type with a participant topic name is rejected.
    #[test]
    fn lookup_topic_with_wrong_topic_name_for_topic_type() {
        let builtin = BuiltinSubscriber::new();
        let outcome = builtin.lookup_datareader::<TopicBuiltinTopicData>("DCPSParticipant");
        assert!(matches!(outcome, Err(DdsError::BadParameter { .. })));
    }

    /// Publication type with a topic-topic name is rejected.
    #[test]
    fn lookup_publication_with_wrong_topic_name() {
        let builtin = BuiltinSubscriber::new();
        let outcome = builtin.lookup_datareader::<PublicationBuiltinTopicData>("DCPSTopic");
        assert!(matches!(outcome, Err(DdsError::BadParameter { .. })));
    }

    /// Subscription type with a topic-topic name is rejected.
    #[test]
    fn lookup_subscription_with_wrong_topic_name() {
        let builtin = BuiltinSubscriber::new();
        let outcome = builtin.lookup_datareader::<SubscriptionBuiltinTopicData>("DCPSTopic");
        assert!(matches!(outcome, Err(DdsError::BadParameter { .. })));
    }

    /// Two consecutive `read` calls both observe the pushed sample.
    #[test]
    fn read_does_not_remove_samples() {
        let builtin = BuiltinSubscriber::new();
        let announced = ParticipantBuiltinTopicData {
            key: mk_guid(0x33),
            user_data: Vec::new(),
        };
        builtin.sinks().push_participant(&announced).unwrap();

        let participant_reader = builtin.participant_reader();
        let first_pass = participant_reader.read().unwrap();
        let second_pass = participant_reader.read().unwrap();
        assert_eq!(first_pass.len(), 1);
        assert_eq!(second_pass.len(), 1);
    }
}