1extern crate alloc;
23
24use alloc::sync::Arc;
25use alloc::vec::Vec;
26use core::sync::atomic::{AtomicBool, Ordering};
27use core::time::Duration;
28
29#[cfg(feature = "std")]
30use std::sync::{Condvar, Mutex};
31
32use crate::error::{DdsError, Result};
33
/// Common interface of everything that can be attached to a `WaitSet`.
///
/// Implementations are shared across threads as `Arc<dyn Condition>`, hence
/// the `Send + Sync` supertraits.
pub trait Condition: Send + Sync {
    /// Reports whether the condition is currently triggered.
    fn get_trigger_value(&self) -> bool;
}
40
41pub use crate::entity::StatusCondition;
44
45impl Condition for StatusCondition {
46 fn get_trigger_value(&self) -> bool {
47 self.trigger_value()
48 }
49}
50
/// Shape of the trigger callback stored in a [`ReadCondition`]: it receives
/// the sample-, view- and instance-state masks (in that order) and returns
/// the trigger value.
type ReadTriggerFn = dyn Fn(u32, u32, u32) -> bool + Send + Sync;

/// Condition defined by three state masks plus a callback that decides,
/// given those masks, whether the condition is triggered.
pub struct ReadCondition {
    /// Mask handed to the trigger callback as its first argument.
    sample_state_mask: u32,
    /// Mask handed to the trigger callback as its second argument.
    view_state_mask: u32,
    /// Mask handed to the trigger callback as its third argument.
    instance_state_mask: u32,
    /// Shared, thread-safe callback that computes the trigger value.
    trigger: Arc<ReadTriggerFn>,
}
69
70impl core::fmt::Debug for ReadCondition {
71 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
72 f.debug_struct("ReadCondition")
73 .field("sample_state_mask", &self.sample_state_mask)
74 .field("view_state_mask", &self.view_state_mask)
75 .field("instance_state_mask", &self.instance_state_mask)
76 .finish_non_exhaustive()
77 }
78}
79
80impl ReadCondition {
81 #[must_use]
83 pub fn new<F>(
84 sample_state_mask: u32,
85 view_state_mask: u32,
86 instance_state_mask: u32,
87 trigger: F,
88 ) -> Arc<Self>
89 where
90 F: Fn(u32, u32, u32) -> bool + Send + Sync + 'static,
91 {
92 Arc::new(Self {
93 sample_state_mask,
94 view_state_mask,
95 instance_state_mask,
96 trigger: Arc::new(trigger),
97 })
98 }
99
100 #[must_use]
102 pub fn get_sample_state_mask(&self) -> u32 {
103 self.sample_state_mask
104 }
105
106 #[must_use]
108 pub fn get_view_state_mask(&self) -> u32 {
109 self.view_state_mask
110 }
111
112 #[must_use]
114 pub fn get_instance_state_mask(&self) -> u32 {
115 self.instance_state_mask
116 }
117}
118
119impl Condition for ReadCondition {
120 fn get_trigger_value(&self) -> bool {
121 (self.trigger)(
122 self.sample_state_mask,
123 self.view_state_mask,
124 self.instance_state_mask,
125 )
126 }
127}
128
129pub struct QueryCondition {
134 base: Arc<ReadCondition>,
135 query_expression: alloc::string::String,
136 query_parameters: Mutex<Vec<alloc::string::String>>,
137 parsed: zerodds_sql_filter::Expr,
138}
139
140impl core::fmt::Debug for QueryCondition {
141 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
142 f.debug_struct("QueryCondition")
143 .field("query_expression", &self.query_expression)
144 .field("base", &self.base)
145 .finish_non_exhaustive()
146 }
147}
148
149impl QueryCondition {
150 pub fn new(
157 base: Arc<ReadCondition>,
158 query_expression: impl Into<alloc::string::String>,
159 query_parameters: Vec<alloc::string::String>,
160 ) -> Result<Arc<Self>> {
161 let expr_str = query_expression.into();
162 let parsed = zerodds_sql_filter::parse(&expr_str).map_err(|_| DdsError::BadParameter {
163 what: "QueryCondition: invalid SQL filter expression",
164 })?;
165 Ok(Arc::new(Self {
166 base,
167 query_expression: expr_str,
168 query_parameters: Mutex::new(query_parameters),
169 parsed,
170 }))
171 }
172
173 pub fn evaluate<R: zerodds_sql_filter::RowAccess>(&self, row: &R) -> Result<bool> {
185 let params = self
186 .query_parameters
187 .lock()
188 .map_err(|_| DdsError::PreconditionNotMet {
189 reason: "query parameters poisoned",
190 })?;
191 let values: Vec<zerodds_sql_filter::Value> = params
192 .iter()
193 .map(|s| zerodds_sql_filter::Value::String(s.clone()))
194 .collect();
195 self.parsed
196 .evaluate(row, &values)
197 .map_err(|_| DdsError::PreconditionNotMet {
198 reason: "QueryCondition SQL evaluation failed",
199 })
200 }
201
202 pub fn evaluate_with_values<R: zerodds_sql_filter::RowAccess>(
209 &self,
210 row: &R,
211 params: &[zerodds_sql_filter::Value],
212 ) -> Result<bool> {
213 self.parsed
214 .evaluate(row, params)
215 .map_err(|_| DdsError::PreconditionNotMet {
216 reason: "QueryCondition SQL evaluation failed",
217 })
218 }
219
220 #[must_use]
222 pub fn get_query_expression(&self) -> &str {
223 &self.query_expression
224 }
225
226 #[must_use]
228 pub fn get_query_parameters(&self) -> Vec<alloc::string::String> {
229 self.query_parameters
230 .lock()
231 .map(|p| p.clone())
232 .unwrap_or_default()
233 }
234
235 pub fn set_query_parameters(&self, params: Vec<alloc::string::String>) -> Result<()> {
240 let mut current =
241 self.query_parameters
242 .lock()
243 .map_err(|_| DdsError::PreconditionNotMet {
244 reason: "query parameters poisoned",
245 })?;
246 *current = params;
247 Ok(())
248 }
249
250 #[must_use]
253 pub fn base(&self) -> &Arc<ReadCondition> {
254 &self.base
255 }
256}
257
258impl Condition for QueryCondition {
259 fn get_trigger_value(&self) -> bool {
260 self.base.get_trigger_value()
264 }
265}
266
/// Condition whose trigger value is a plain flag set explicitly by the
/// application.
#[derive(Debug, Default)]
pub struct GuardCondition {
    /// Current trigger value; `false` for a default-constructed guard.
    triggered: AtomicBool,
}
275
276impl GuardCondition {
277 #[must_use]
279 pub fn new() -> Arc<Self> {
280 Arc::new(Self::default())
281 }
282
283 pub fn set_trigger_value(&self, value: bool) {
285 self.triggered.store(value, Ordering::Release);
286 }
287}
288
289impl Condition for GuardCondition {
290 fn get_trigger_value(&self) -> bool {
291 self.triggered.load(Ordering::Acquire)
292 }
293}
294
/// Handle to a set of attached conditions that a thread can block on.
///
/// The handle is a thin wrapper around shared state, so clones of a
/// `WaitSet` observe the same attached conditions.
#[cfg(feature = "std")]
pub struct WaitSet {
    /// Shared state behind every clone of this handle.
    inner: Arc<WaitSetInner>,
}
305
/// State shared by all clones of a [`WaitSet`].
#[cfg(feature = "std")]
struct WaitSetInner {
    /// Conditions currently attached to the wait-set.
    conditions: Mutex<Vec<Arc<dyn Condition>>>,
    /// Condition variable the waiting thread sleeps on between polls.
    cvar: Condvar,
    /// Longest time the waiting thread sleeps before re-polling the
    /// attached conditions.
    poll_interval: Duration,
    /// Mutex paired with `cvar`; it protects no data of its own.
    locked: Mutex<()>,
    /// `true` while some thread is inside `WaitSet::wait`.
    waiting: AtomicBool,
}
325
/// Maximum number of distinct conditions a single `WaitSet` accepts;
/// attaching beyond this limit fails with `DdsError::OutOfResources`.
pub const MAX_WAITSET_CONDITIONS: usize = 1_024;
330
#[cfg(feature = "std")]
impl WaitSet {
    /// Creates an empty wait-set that re-polls its conditions every
    /// millisecond while waiting.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::new(WaitSetInner {
                conditions: Mutex::new(Vec::new()),
                cvar: Condvar::new(),
                poll_interval: Duration::from_millis(1),
                locked: Mutex::new(()),
                waiting: AtomicBool::new(false),
            }),
        }
    }

    /// Locks the attached-condition list, mapping a poisoned lock to
    /// [`DdsError::PreconditionNotMet`].
    fn lock_conditions(&self) -> Result<std::sync::MutexGuard<'_, Vec<Arc<dyn Condition>>>> {
        self.inner
            .conditions
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "waitset conditions poisoned",
            })
    }

    /// Attaches `cond` to the wait-set.
    ///
    /// Attaching the same `Arc` again is a no-op and does not count against
    /// [`MAX_WAITSET_CONDITIONS`].
    ///
    /// # Errors
    ///
    /// * [`DdsError::PreconditionNotMet`] when the condition list lock is
    ///   poisoned.
    /// * [`DdsError::OutOfResources`] when [`MAX_WAITSET_CONDITIONS`]
    ///   conditions are already attached.
    pub fn attach_condition(&self, cond: Arc<dyn Condition>) -> Result<()> {
        let mut conds = self.lock_conditions()?;
        // Identity is pointer identity: the same allocation is attached once.
        if conds.iter().any(|c| Arc::ptr_eq(c, &cond)) {
            return Ok(());
        }
        if conds.len() >= MAX_WAITSET_CONDITIONS {
            return Err(DdsError::OutOfResources {
                what: "waitset condition count",
            });
        }
        conds.push(cond);
        Ok(())
    }

    /// Detaches `cond` from the wait-set; detaching a condition that is not
    /// attached succeeds without effect.
    ///
    /// # Errors
    ///
    /// Returns [`DdsError::PreconditionNotMet`] when the condition list lock
    /// is poisoned.
    pub fn detach_condition(&self, cond: &Arc<dyn Condition>) -> Result<()> {
        let mut conds = self.lock_conditions()?;
        conds.retain(|c| !Arc::ptr_eq(c, cond));
        Ok(())
    }

    /// Returns a snapshot of the currently attached conditions.
    ///
    /// # Errors
    ///
    /// Returns [`DdsError::PreconditionNotMet`] when the condition list lock
    /// is poisoned.
    pub fn get_conditions(&self) -> Result<Vec<Arc<dyn Condition>>> {
        let conds = self.lock_conditions()?;
        Ok(conds.clone())
    }

    /// Blocks until at least one attached condition is triggered or
    /// `timeout` elapses, and returns every condition that is triggered at
    /// that moment.
    ///
    /// The conditions are polled at least once, so an already-triggered
    /// condition is reported even with a zero timeout. A timeout too large
    /// to be represented as a deadline (for example `Duration::MAX`) is
    /// treated as "wait without deadline".
    ///
    /// # Errors
    ///
    /// * [`DdsError::PreconditionNotMet`] when another thread is already
    ///   inside `wait` on this wait-set, or when an internal lock is
    ///   poisoned.
    /// * [`DdsError::Timeout`] when the deadline passes with no condition
    ///   triggered.
    pub fn wait(&self, timeout: Duration) -> Result<Vec<Arc<dyn Condition>>> {
        /// Clears the `waiting` flag when dropped, so every exit path of
        /// `wait` (early return, `?`, unwinding) releases it.
        struct WaitGuard<'a>(&'a AtomicBool);
        impl Drop for WaitGuard<'_> {
            fn drop(&mut self) {
                self.0.store(false, Ordering::Release);
            }
        }

        // Only one thread may wait at a time: claim the flag or bail out.
        if self
            .inner
            .waiting
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return Err(DdsError::PreconditionNotMet {
                reason: "waitset already in wait() — single-thread-wait per spec",
            });
        }
        let _guard = WaitGuard(&self.inner.waiting);

        // `Instant + Duration` panics on overflow; `checked_add` yields
        // `None` instead, which is handled below as "no deadline".
        let deadline = std::time::Instant::now().checked_add(timeout);
        loop {
            let active = self.poll_active()?;
            if !active.is_empty() {
                return Ok(active);
            }
            let sleep_for = match deadline {
                Some(deadline) => {
                    let now = std::time::Instant::now();
                    if now >= deadline {
                        return Err(DdsError::Timeout);
                    }
                    (deadline - now).min(self.inner.poll_interval)
                }
                None => self.inner.poll_interval,
            };
            let cvlock = self
                .inner
                .locked
                .lock()
                .map_err(|_| DdsError::PreconditionNotMet {
                    reason: "waitset locked poisoned",
                })?;
            // The outcome is irrelevant: whether woken by `notify`, by the
            // timeout or spuriously, the loop simply polls again.
            let _ = self.inner.cvar.wait_timeout(cvlock, sleep_for);
        }
    }

    /// Returns the attached conditions whose trigger value is currently
    /// `true`, in attachment order.
    fn poll_active(&self) -> Result<Vec<Arc<dyn Condition>>> {
        let conds = self.lock_conditions()?;
        Ok(conds
            .iter()
            .filter(|c| c.get_trigger_value())
            .cloned()
            .collect())
    }

    /// Wakes the thread blocked in [`WaitSet::wait`], if any, so that it
    /// re-polls the conditions before its poll interval expires.
    pub fn notify(&self) {
        self.inner.cvar.notify_all();
    }
}
492
/// `WaitSet::default()` is the same as [`WaitSet::new`]: an empty wait-set.
#[cfg(feature = "std")]
impl Default for WaitSet {
    fn default() -> Self {
        WaitSet::new()
    }
}
499
/// Cloning yields another handle to the same shared state; the attached
/// conditions are not copied.
#[cfg(feature = "std")]
impl Clone for WaitSet {
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
508
/// Manual `Debug`: prints only the number of attached conditions, reported
/// as `0` when the condition list lock is poisoned.
#[cfg(feature = "std")]
impl core::fmt::Debug for WaitSet {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let attached = match self.inner.conditions.lock() {
            Ok(list) => list.len(),
            Err(_) => 0,
        };
        let mut out = f.debug_struct("WaitSet");
        out.field("conditions", &attached);
        out.finish()
    }
}
516
#[cfg(test)]
#[cfg(feature = "std")]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
    use std::thread;
    use std::time::Instant;
    use zerodds_sql_filter::{RowAccess, Value};

    // ---- shared fixtures ---------------------------------------------------

    /// Read condition over the `ANY` masks whose trigger is the constant
    /// `triggered`.
    fn any_read_condition(triggered: bool) -> Arc<ReadCondition> {
        ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            move |_, _, _| triggered,
        )
    }

    /// Query condition without parameters on top of `base`.
    fn query_without_params(base: Arc<ReadCondition>, expr: &str) -> Arc<QueryCondition> {
        QueryCondition::new(base, expr, alloc::vec::Vec::new()).unwrap()
    }

    /// Row exposing a single integer field named `x`.
    struct IntRow(i64);

    impl RowAccess for IntRow {
        fn get(&self, p: &str) -> Option<Value> {
            (p == "x").then(|| Value::Int(self.0))
        }
    }

    /// Row exposing a single string field named `color`.
    struct ColorRow(alloc::string::String);

    impl RowAccess for ColorRow {
        fn get(&self, p: &str) -> Option<Value> {
            (p == "color").then(|| Value::String(self.0.clone()))
        }
    }

    // ---- GuardCondition ----------------------------------------------------

    #[test]
    fn guard_condition_starts_false() {
        assert!(!GuardCondition::new().get_trigger_value());
    }

    #[test]
    fn guard_condition_set_trigger() {
        let guard = GuardCondition::new();
        guard.set_trigger_value(true);
        assert!(guard.get_trigger_value());
        guard.set_trigger_value(false);
        assert!(!guard.get_trigger_value());
    }

    // ---- WaitSet -----------------------------------------------------------

    #[test]
    fn waitset_attach_detach_idempotent() {
        let waitset = WaitSet::new();
        let guard = GuardCondition::new();
        let cond: Arc<dyn Condition> = guard.clone();
        // Attaching the very same Arc twice must keep a single entry.
        for _ in 0..2 {
            waitset.attach_condition(cond.clone()).unwrap();
        }
        assert_eq!(waitset.get_conditions().unwrap().len(), 1);
        waitset.detach_condition(&cond).unwrap();
        assert!(waitset.get_conditions().unwrap().is_empty());
    }

    #[test]
    fn waitset_wait_returns_immediately_if_already_triggered() {
        let waitset = WaitSet::new();
        let guard = GuardCondition::new();
        guard.set_trigger_value(true);
        let cond: Arc<dyn Condition> = guard.clone();
        waitset.attach_condition(cond).unwrap();
        let triggered = waitset.wait(Duration::from_secs(1)).unwrap();
        assert_eq!(triggered.len(), 1);
    }

    #[test]
    fn waitset_wait_timeout_returns_err() {
        let waitset = WaitSet::new();
        let guard = GuardCondition::new();
        let cond: Arc<dyn Condition> = guard.clone();
        waitset.attach_condition(cond).unwrap();
        let started = Instant::now();
        let outcome = waitset.wait(Duration::from_millis(50));
        assert!(matches!(outcome, Err(DdsError::Timeout)));
        assert!(started.elapsed() >= Duration::from_millis(40));
    }

    #[test]
    fn waitset_wakes_when_guard_triggers() {
        let waitset = WaitSet::new();
        let guard = GuardCondition::new();
        let cond: Arc<dyn Condition> = guard.clone();
        waitset.attach_condition(cond).unwrap();

        let remote_guard = guard.clone();
        let trigger_thread = thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            remote_guard.set_trigger_value(true);
        });

        let started = Instant::now();
        let triggered = waitset.wait(Duration::from_secs(2)).unwrap();
        let elapsed = started.elapsed();
        trigger_thread.join().unwrap();

        assert_eq!(triggered.len(), 1);
        assert!(elapsed < Duration::from_millis(500), "elapsed={elapsed:?}");
    }

    #[test]
    fn waitset_with_multiple_conditions_returns_all_triggered() {
        let waitset = WaitSet::new();
        let first = GuardCondition::new();
        let second = GuardCondition::new();
        waitset.attach_condition(first.clone()).unwrap();
        waitset.attach_condition(second.clone()).unwrap();
        first.set_trigger_value(true);
        second.set_trigger_value(true);
        let triggered = waitset.wait(Duration::from_millis(100)).unwrap();
        assert_eq!(triggered.len(), 2);
    }

    #[test]
    fn status_condition_implements_condition_trait() {
        let state = crate::entity::EntityState::new();
        let status = crate::entity::StatusCondition::new(state.clone());
        status.set_enabled_statuses(0b0010);
        assert!(!status.get_trigger_value());
        state.set_status_bits(0b0010);
        assert!(status.get_trigger_value());
    }

    #[test]
    fn waitset_clone_shares_state() {
        let original = WaitSet::new();
        let alias = original.clone();
        let guard = GuardCondition::new();
        original.attach_condition(guard.clone()).unwrap();
        assert_eq!(alias.get_conditions().unwrap().len(), 1);
    }

    #[test]
    fn waitset_default_is_empty() {
        assert!(WaitSet::default().get_conditions().unwrap().is_empty());
    }

    #[test]
    fn waitset_concurrent_wait_returns_precondition_not_met() {
        let waitset = WaitSet::new();
        let background = waitset.clone();
        let waiter = thread::spawn(move || background.wait(Duration::from_millis(100)));
        // Give the background thread time to enter `wait` first.
        thread::sleep(Duration::from_millis(20));
        let outcome = waitset.wait(Duration::from_millis(10));
        assert!(matches!(outcome, Err(DdsError::PreconditionNotMet { .. })));
        let _ = waiter.join().unwrap();
    }

    #[test]
    fn waitset_sequential_waits_after_first_returns_succeed() {
        let waitset = WaitSet::new();
        let first = waitset.wait(Duration::from_millis(10));
        assert!(matches!(first, Err(DdsError::Timeout)));
        let second = waitset.wait(Duration::from_millis(10));
        assert!(matches!(second, Err(DdsError::Timeout)));
    }

    #[test]
    fn waitset_attach_above_max_returns_out_of_resources() {
        let waitset = WaitSet::new();
        (0..MAX_WAITSET_CONDITIONS)
            .for_each(|_| waitset.attach_condition(GuardCondition::new()).unwrap());
        let overflow = waitset.attach_condition(GuardCondition::new());
        assert!(matches!(overflow, Err(DdsError::OutOfResources { .. })));
        assert_eq!(
            waitset.get_conditions().unwrap().len(),
            MAX_WAITSET_CONDITIONS
        );
    }

    #[test]
    fn waitset_attach_idempotent_does_not_count_against_cap() {
        let waitset = WaitSet::new();
        let guard = GuardCondition::new();
        let cond: Arc<dyn Condition> = guard.clone();
        waitset.attach_condition(cond.clone()).unwrap();
        waitset.attach_condition(cond.clone()).unwrap();
        waitset.attach_condition(cond).unwrap();
        assert_eq!(waitset.get_conditions().unwrap().len(), 1);
    }

    // ---- ReadCondition -----------------------------------------------------

    #[test]
    fn read_condition_returns_trigger_from_closure() {
        let cond = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |sm, vm, im| {
                sm == sample_state_mask::ANY
                    && vm == view_state_mask::ANY
                    && im == instance_state_mask::ANY
            },
        );
        assert_eq!(cond.get_sample_state_mask(), sample_state_mask::ANY);
        assert_eq!(cond.get_view_state_mask(), view_state_mask::ANY);
        assert_eq!(cond.get_instance_state_mask(), instance_state_mask::ANY);
        assert!(cond.get_trigger_value());
    }

    #[test]
    fn read_condition_trigger_false_when_closure_says_false() {
        let cond = ReadCondition::new(
            sample_state_mask::NOT_READ,
            view_state_mask::NEW,
            instance_state_mask::ALIVE,
            |_, _, _| false,
        );
        assert!(!cond.get_trigger_value());
    }

    #[test]
    fn read_condition_implements_condition_trait_object_safe() {
        let as_trait_object: Arc<dyn Condition> = any_read_condition(true);
        assert!(as_trait_object.get_trigger_value());
    }

    #[test]
    fn read_condition_attaches_to_waitset() {
        let waitset = WaitSet::new();
        let cond = any_read_condition(true);
        waitset.attach_condition(cond.clone()).unwrap();
        let triggered = waitset.wait(Duration::from_millis(50)).unwrap();
        assert_eq!(triggered.len(), 1);
    }

    // ---- QueryCondition ----------------------------------------------------

    #[test]
    fn query_condition_inherits_trigger_from_base() {
        let query = query_without_params(any_read_condition(true), "x > 10");
        assert!(query.get_trigger_value());
        assert_eq!(query.get_query_expression(), "x > 10");
        assert!(query.get_query_parameters().is_empty());
    }

    #[test]
    fn query_condition_set_query_parameters_roundtrip() {
        let query = QueryCondition::new(
            any_read_condition(false),
            "color = %0",
            alloc::vec!["RED".into()],
        )
        .unwrap();
        assert_eq!(
            query.get_query_parameters(),
            alloc::vec!["RED".to_string()]
        );
        query
            .set_query_parameters(alloc::vec!["BLUE".into(), "GREEN".into()])
            .unwrap();
        assert_eq!(
            query.get_query_parameters(),
            alloc::vec!["BLUE".to_string(), "GREEN".to_string()]
        );
    }

    #[test]
    fn query_condition_trigger_inherits_false_from_base() {
        let base = ReadCondition::new(
            sample_state_mask::READ,
            view_state_mask::NOT_NEW,
            instance_state_mask::NOT_ALIVE,
            |_, _, _| false,
        );
        let query = query_without_params(base, "x > 0");
        assert!(!query.get_trigger_value());
    }

    #[test]
    fn query_condition_base_returns_correct_read_condition() {
        let base = ReadCondition::new(
            sample_state_mask::NOT_READ,
            view_state_mask::NEW,
            instance_state_mask::ALIVE,
            |_, _, _| true,
        );
        let query = query_without_params(base.clone(), "x > 0");
        assert!(Arc::ptr_eq(&base, query.base()));
    }

    #[test]
    fn query_condition_rejects_invalid_sql() {
        let outcome =
            QueryCondition::new(any_read_condition(true), "x > >", alloc::vec::Vec::new());
        assert!(matches!(outcome, Err(DdsError::BadParameter { .. })));
    }

    #[test]
    fn query_condition_evaluate_filters_per_sample() {
        let query = query_without_params(any_read_condition(true), "x > 10");
        assert!(query.evaluate(&IntRow(42)).unwrap());
        assert!(!query.evaluate(&IntRow(5)).unwrap());
    }

    #[test]
    fn query_condition_evaluate_with_typed_values_int_param() {
        let query = query_without_params(any_read_condition(true), "x > %0");
        let threshold = [Value::Int(10)];
        assert!(query.evaluate_with_values(&IntRow(42), &threshold).unwrap());
        assert!(!query.evaluate_with_values(&IntRow(5), &threshold).unwrap());
    }

    #[test]
    fn query_condition_evaluate_uses_string_params_by_default() {
        let query = QueryCondition::new(
            any_read_condition(true),
            "color = %0",
            alloc::vec!["RED".into()],
        )
        .unwrap();
        assert!(query.evaluate(&ColorRow("RED".into())).unwrap());
        assert!(!query.evaluate(&ColorRow("BLUE".into())).unwrap());
    }

    // ---- wait-flag release -------------------------------------------------

    #[test]
    fn waitset_panic_in_wait_releases_waiting_flag() {
        let waitset = WaitSet::new();
        // The first wait must release the `waiting` flag on exit, otherwise
        // the second call would fail with `PreconditionNotMet`.
        let _ = waitset.wait(Duration::from_millis(5));
        let second = waitset.wait(Duration::from_millis(5));
        assert!(matches!(second, Err(DdsError::Timeout)));
    }
}