1use crate::entity::{Entity, EntityId};
2use crate::internal::ffi;
3use crate::{Participant, Result};
4
5pub struct WaitSet<'domain, 'participant, 'attached, A> {
41 pub(crate) inner: cyclonedds_sys::dds_entity_t,
42 attached: std::collections::HashMap<EntityId, &'attached dyn Entity>,
43 phantom_blobs: std::marker::PhantomData<&'attached A>,
44 phantom: std::marker::PhantomData<&'participant Participant<'domain>>,
45}
46
47impl<A> std::fmt::Debug for WaitSet<'_, '_, '_, A> {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("WaitSet")
50 .field("inner", &self.inner)
51 .field("attached", &self.attached.keys())
52 .field("phantom", &self.phantom)
53 .finish()
54 }
55}
56
57impl<'d, 'p, 'a, A> WaitSet<'d, 'p, 'a, A> {
58 pub fn new(participant: &'p Participant<'d>) -> Result<Self> {
76 let inner = ffi::dds_create_waitset(participant.inner)?;
77 Ok(Self {
78 inner,
79 attached: std::collections::HashMap::new(),
80 phantom_blobs: std::marker::PhantomData,
81 phantom: std::marker::PhantomData,
82 })
83 }
84
85 pub fn attach(&mut self, entity: &'a dyn Entity, blob: Option<&'a A>) -> Result<()> {
119 let id = entity.id();
120 if !self.attached.contains_key(&id) {
121 ffi::dds_waitset_attach(
122 self.inner,
123 id.inner,
124 blob.map_or(std::ptr::null(), |blob| std::ptr::from_ref(blob)) as isize,
125 )?;
126 self.attached.insert(id, entity);
127 }
128 Ok(())
129 }
130
131 pub fn detach(&mut self, entity: &'a dyn Entity) -> Result<()> {
164 let entity = entity.id();
165 self.detach_id(entity)
166 }
167
168 fn detach_id(&mut self, entity_id: EntityId) -> Result<()> {
169 if self.attached.contains_key(&entity_id) {
170 ffi::dds_waitset_detach(self.inner, entity_id.inner)?;
171 self.attached.remove(&entity_id);
172 }
173
174 Ok(())
175 }
176
177 pub fn set_trigger(&mut self, trigger: bool) -> Result<()> {
200 ffi::dds_waitset_set_trigger(self.inner, trigger)
201 }
202
203 pub fn wait(&mut self, timeout: crate::Duration) -> Result<Vec<&'a A>> {
238 let (_, attachments) =
239 ffi::dds_waitset_wait::<A>(self.inner, self.attached.len(), timeout.inner)?;
240 Ok(attachments)
241 }
242
243 pub fn wait_until(&mut self, absolute_time: crate::Time) -> Result<Vec<&'a A>> {
281 let (_, attachments) =
282 ffi::dds_waitset_wait_until::<A>(self.inner, self.attached.len(), absolute_time.inner)?;
283 Ok(attachments)
284 }
285
286 pub fn is_attached(&self, entity: &'a dyn Entity) -> bool {
313 self.attached.contains_key(&entity.id())
314 }
315}
316
317impl<A> Drop for WaitSet<'_, '_, '_, A> {
318 fn drop(&mut self) {
319 for entity_id in self.attached.keys() {
320 let result = ffi::dds_waitset_detach(self.inner, entity_id.inner);
321 debug_assert!(
322 result.is_ok(),
323 "unable to detach entity: {entity_id:?} from {self:?}: {result:?}"
324 );
325 }
326
327 let result = ffi::dds_delete(self.inner);
328 debug_assert!(
329 result.is_ok(),
330 "unable to delete {self:?}: failed with {result:?}"
331 );
332 }
333}
334
335#[cfg(test)]
336mod tests {
337 use super::*;
338 use crate::state;
339
340 #[test]
341 fn test_waitset_create() {
342 let domain_id = crate::tests::domain::unique_id();
343 let domain = crate::Domain::new(domain_id).unwrap();
344 let participant = crate::Participant::new(&domain).unwrap();
345 let _ = WaitSet::<()>::new(&participant).unwrap();
346 }
347
348 #[test]
349 fn test_waitset_create_with_invalid_participant() {
350 let domain_id = crate::tests::domain::unique_id();
351 let domain = crate::Domain::new(domain_id).unwrap();
352 let mut participant = crate::Participant::new(&domain).unwrap();
353 let participant_id = participant.inner;
354 participant.inner = 0;
355 let result = WaitSet::<()>::new(&participant).unwrap_err();
356 participant.inner = participant_id;
357
358 assert_eq!(result, crate::Error::BadParameter);
359 }
360
361 #[test]
362 fn test_waitset_debug_formatting() {
363 let domain_id = crate::tests::domain::unique_id();
364 let domain = crate::Domain::new(domain_id).unwrap();
365 let participant = crate::Participant::new(&domain).unwrap();
366 let waitset = WaitSet::<()>::new(&participant).unwrap();
367
368 let result = format!("{waitset:?}");
369 assert!(result.contains(&format!("{}", waitset.inner)));
370 }
371
372 #[test]
373 fn test_waitset_attachment() {
374 let domain_id = crate::tests::domain::unique_id();
375 let domain = crate::Domain::new(domain_id).unwrap();
376 let participant = crate::Participant::new(&domain).unwrap();
377 let topic_name = crate::tests::topic::unique_name();
378 let topic =
379 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
380 let reader = crate::Reader::new(&topic).unwrap();
381 let mask = state::sample::Any | state::view::Any | state::instance::Any;
382 let read_condition = crate::ReadCondition::new(&reader, mask).unwrap();
383
384 let mut waitset = WaitSet::<()>::new(&participant).unwrap();
385
386 let result = waitset.attach(&topic, None);
387 assert!(result.is_ok());
388 let result = waitset.attach(&topic, None);
389 assert!(result.is_ok());
390 let result = waitset.attach(&read_condition, None);
391 assert!(result.is_ok());
392
393 assert!(waitset.is_attached(&topic));
394 assert!(waitset.is_attached(&read_condition));
395
396 let result = waitset.detach(&read_condition);
397 assert!(result.is_ok());
398
399 assert!(waitset.is_attached(&topic));
400 assert!(!waitset.is_attached(&read_condition));
401
402 let result = waitset.detach(&read_condition);
403 assert!(result.is_ok());
404 }
405
406 #[test]
407 fn test_waitset_attachment_with_invalid_waitset() {
408 let domain_id = crate::tests::domain::unique_id();
409 let domain = crate::Domain::new(domain_id).unwrap();
410 let participant = crate::Participant::new(&domain).unwrap();
411 let topic_name = crate::tests::topic::unique_name();
412 let topic =
413 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
414 let reader = crate::Reader::new(&topic).unwrap();
415 let mask = state::sample::Any | state::view::Any | state::instance::Any;
416 let read_condition = crate::ReadCondition::new(&reader, mask).unwrap();
417
418 let mut waitset = WaitSet::<()>::new(&participant).unwrap();
419
420 let result = waitset.attach(&topic, None);
421 assert!(result.is_ok());
422
423 let waitset_id = waitset.inner;
424 waitset.inner = 0;
425
426 let result = waitset.attach(&read_condition, None).unwrap_err();
427 assert_eq!(result, crate::Error::BadParameter);
428
429 let result = waitset.detach(&topic).unwrap_err();
430 assert_eq!(result, crate::Error::BadParameter);
431
432 waitset.inner = waitset_id;
433 }
434
435 #[test]
436 fn test_waitset_wait() {
437 let domain_id = crate::tests::domain::unique_id();
438 let domain = crate::Domain::new(domain_id).unwrap();
439 let participant = crate::Participant::new(&domain).unwrap();
440 let topic_name = crate::tests::topic::unique_name();
441 let topic =
442 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
443 let reader = crate::Reader::new(&topic).unwrap();
444 let writer = crate::Writer::new(&topic).unwrap();
445 let mask = state::sample::Any | state::view::Any | state::instance::Any;
446 let read_condition_1 = crate::ReadCondition::new(&reader, mask).unwrap();
447
448 let mask = state::sample::Any | state::view::Any | state::instance::Any;
449 let read_condition_2 = crate::ReadCondition::new(&reader, mask).unwrap();
450
451 let mask = state::sample::Any | state::view::Any | state::instance::Any;
452 let read_condition_3 = crate::ReadCondition::new(&reader, mask).unwrap();
453
454 let attach01 = String::from("hello");
455 let attach02 = String::from("world");
456 let mut waitset = WaitSet::new(&participant).unwrap();
457
458 waitset.attach(&read_condition_1, Some(&attach01)).unwrap();
459
460 let actual = waitset
461 .wait(crate::Duration::from_nanos(5_000_000))
462 .unwrap_err();
463 assert_eq!(actual, crate::Error::Timeout);
464
465 let actual = waitset.wait_until(crate::Time::from_nanos(0)).unwrap_err();
466 assert_eq!(actual, crate::Error::Timeout);
467
468 writer.write(&crate::tests::topic::Data::default()).unwrap();
469 let actual = waitset
470 .wait(crate::Duration::from_nanos(1_000_000_000))
471 .unwrap();
472 assert_eq!(actual, vec![&attach01]);
473
474 let actual = waitset.wait_until(crate::Time::from_nanos(1)).unwrap();
475 assert_eq!(actual, vec![&attach01]);
476
477 waitset.attach(&read_condition_2, Some(&attach02)).unwrap();
478 let actual = waitset
479 .wait(crate::Duration::from_nanos(1_000_000_000))
480 .unwrap();
481 assert_eq!(actual, vec![&attach01, &attach02]);
482
483 let actual = waitset.wait_until(crate::Time::from_nanos(1)).unwrap();
484 assert_eq!(actual, vec![&attach01, &attach02]);
485
486 waitset.attach(&read_condition_3, None).unwrap();
487 let actual = waitset
488 .wait(crate::Duration::from_nanos(1_000_000_000))
489 .unwrap();
490 assert_eq!(actual, vec![&attach01, &attach02]);
491
492 let actual = waitset.wait_until(crate::Time::from_nanos(1)).unwrap();
493 assert_eq!(actual, vec![&attach01, &attach02]);
494 }
495
496 #[test]
497 fn test_waitset_wait_with_invalid_waitset() {
498 let domain_id = crate::tests::domain::unique_id();
499 let domain = crate::Domain::new(domain_id).unwrap();
500 let participant = crate::Participant::new(&domain).unwrap();
501
502 let attach01 = String::from("hello");
503 let mut waitset = WaitSet::new(&participant).unwrap();
504 waitset.attach(&participant, Some(&attach01)).unwrap();
505
506 let waitset_id = waitset.inner;
507 waitset.inner = 0;
508 let result = waitset.wait(crate::Duration::INFINITE).unwrap_err();
509 assert_eq!(result, crate::Error::BadParameter);
510 let result = waitset.wait_until(crate::Time::NEVER).unwrap_err();
511 assert_eq!(result, crate::Error::BadParameter);
512
513 waitset.inner = waitset_id;
514 }
515
516 #[test]
517 fn test_waitset_set_trigger() {
518 let domain_id = crate::tests::domain::unique_id();
519 let domain = crate::Domain::new(domain_id).unwrap();
520 let participant = crate::Participant::new(&domain).unwrap();
521 let mut waitset = WaitSet::<()>::new(&participant).unwrap();
522
523 let result = waitset.set_trigger(true);
524 assert!(result.is_ok());
525 }
526
527 #[test]
528 fn test_waitset_set_trigger_with_invalid_waitset() {
529 let domain_id = crate::tests::domain::unique_id();
530 let domain = crate::Domain::new(domain_id).unwrap();
531 let participant = crate::Participant::new(&domain).unwrap();
532 let mut waitset = WaitSet::<()>::new(&participant).unwrap();
533 let waitset_id = waitset.inner;
534 waitset.inner = 0;
535
536 let result = waitset.set_trigger(true).unwrap_err();
537 assert_eq!(result, crate::Error::BadParameter);
538 waitset.inner = waitset_id;
539 }
540
541 #[test]
542 fn test_waitset_wait_dynamic_data() {
543 let domain_id = crate::tests::domain::unique_id();
544 let domain = crate::Domain::new(domain_id).unwrap();
545 let participant = crate::Participant::new(&domain).unwrap();
546 let topic_name = crate::tests::topic::unique_name();
547 let topic =
548 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
549 let reader = crate::Reader::new(&topic).unwrap();
550 let writer = crate::Writer::new(&topic).unwrap();
551
552 let data01 = 10;
553 let data02 = "String";
554 let attach01 = Box::new(data01) as _;
555 let attach02 = Box::new(data02) as _;
556 let mut waitset = WaitSet::<Box<dyn std::any::Any>>::new(&participant).unwrap();
557 waitset.attach(&reader, Some(&attach01)).unwrap();
558 waitset.attach(&writer, Some(&attach02)).unwrap();
559
560 writer.write(&crate::tests::topic::Data::default()).unwrap();
561
562 let attachments = waitset.wait(crate::Duration::INFINITE).unwrap();
563
564 assert_eq!(attachments.len(), 2);
565
566 let attach01_result = attachments[0].downcast_ref::<i32>().unwrap();
567 let attach02_result = attachments[1].downcast_ref::<&str>().unwrap();
568
569 assert_eq!(*attach01_result, data01);
570 assert_eq!(*attach02_result, data02);
571 }
572}