cyclonedds/query_condition.rs
1use ffi::Filter;
2
3use crate::internal::ffi;
4use crate::state::State;
5use crate::{Reader, Result};
6
7/// A filter on a [`Reader`](crate::Reader) that restricts samples by their
8/// [`State`](crate::State) and a predicate.
9///
10/// A `QueryCondition` extends [`ReadCondition`](crate::ReadCondition) with a
11/// user-supplied predicate `F` that is applied to each sample's payload. Only
12/// samples that both match the state mask and satisfy the predicate are
13/// returned by reads and trigger waitset wakeups.
14///
15/// # Predicate requirements
16/// The predicate closure provided to the [`QueryCondition`] must be
17/// *zero-sized*.
18///
19/// This restriction comes from how the callback is integrated with the
20/// underlying C API. For ergonomics, the closure is reconstructed inside a
21/// wrapper that converts raw C types into Rust types before invoking it. That
22/// reconstruction depends only on the closure's type, so it cannot rely on any
23/// captured state.
24///
25/// In practice, this means the callback must be either:
26///
27/// - a [function item](https://doc.rust-lang.org/reference/types/function-item.html),
28/// which is always zero-sized, or
29/// - a [closure](https://doc.rust-lang.org/reference/types/closure.html) that
30/// does not capture any variables from its environment
31///
32/// The following example will **fail to compile** because the closure captures
33/// `x`, making it non-zero-sized:
34///
35/// ```compile_fail
36/// use cyclonedds as dds;
37/// use dds::state;
38///
39/// # #[derive(
40/// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
41/// # )]
42/// struct Data {
43/// x: i32,
44/// }
45/// fn create_your_reader() -> dds::Reader<'static, 'static, 'static, Data> {
46/// unimplemented!()
47/// }
48/// let reader: dds::Reader<Data> = create_your_reader();
49/// let x = 10;
50/// let result = dds::QueryCondition::new(
51/// &reader,
52/// state::sample::Any | state::instance::Any | state::view::Any,
53/// // Error: closure captures `x`, so it is not zero-sized.
54/// |sample| sample.x < x,
55/// )?;
56/// # Ok::<(), dds::Error>(())
57/// ```
58///
59/// Instead, use a function item or a non-capturing closure, for example:
60///
61/// ```ignore
62/// |sample| sample.x < 10
63/// ```
64///
65/// The compiler will emit an error similar to:
66///
67/// ```text
68/// error[E0080]: evaluation panicked: the provided callback is not zero-sized
69/// = note: closures that capture values from their environment are not zero-sized
70/// = help: ensure the callback is either:
71/// - a function item, e.g. `fn my_callback() {}`
72/// - a closure that does not capture any external state
73/// ```
74///
75/// This is enforced via an internal compile-time assertion
76/// `assert!(size_of::<F>() == 0)` but note that the associated compiler output
77/// can be quite lengthy.
78///
79/// <details>
80/// <summary>Click to see a sample of the full compiler output</summary>
81///
82/// ```text
83/// error[E0080]: evaluation panicked: the provided callback is not zero-sized
84/// = note: closures that capture values from their environment are not zero-sized
85/// = help: ensure the callback is either:
86/// - a function item, e.g. `fn my_callback() {}`
87/// - a closure that does not capture any external state
88/// --> cyclonedds/src/internal/ffi.rs
89/// |
90/// | const IS_PROVIDED_CALLBACK_ZERO_SIZED: () = assert!(
91/// | _________________________________________________^
92/// | | size_of::<F>() == 0,
93/// | | "\
94/// | | the provided callback is not zero-sized
95/// |
96/// | | "
97/// | | );
98/// | |_^ evaluation of `<QueryCondition as Filter>::IS_PROVIDED_CALLBACK_ZERO_SIZED` failed here
99///
100/// note: erroneous constant encountered
101/// --> cyclonedds/src/query_condition.rs:84:17
102/// |
103/// | Self::IS_PROVIDED_CALLBACK_ZERO_SIZED;
104/// | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
105///
106/// ...
107///
108/// note: erroneous constant encountered
109/// --> cyclonedds/src/internal/ffi.rs
110/// |
111/// | Callback::IS_PROVIDED_CALLBACK_ZERO_SIZED;
112/// | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
113/// ```
114/// </details>
115pub struct QueryCondition<'domain, 'participant, 'topic, 'reader, T, F>
116where
117 T: crate::Topicable,
118 F: Fn(&T) -> bool,
119{
120 pub(crate) inner: cyclonedds_sys::dds_entity_t,
121 phantom_callback: std::marker::PhantomData<F>,
122 phantom: std::marker::PhantomData<&'reader Reader<'topic, 'domain, 'participant, T>>,
123}
124
125impl<T, F> std::fmt::Debug for QueryCondition<'_, '_, '_, '_, T, F>
126where
127 T: crate::Topicable,
128 F: Fn(&T) -> bool,
129{
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct("QueryCondition")
132 .field("inner", &self.inner)
133 .field("phantom", &self.phantom)
134 .finish()
135 }
136}
137
138impl<T, F> Filter<T, F> for QueryCondition<'_, '_, '_, '_, T, F>
139where
140 T: crate::Topicable + std::panic::UnwindSafe + std::panic::RefUnwindSafe,
141 F: Fn(&T) -> bool,
142{
143}
144
145impl<'d, 'p, 't, 'r, T, F> QueryCondition<'d, 'p, 't, 'r, T, F>
146where
147 T: crate::Topicable + std::panic::UnwindSafe + std::panic::RefUnwindSafe,
148 F: Fn(&T) -> bool,
149{
150 /// Creates a new `QueryCondition` on `reader` that matches samples whose
151 /// state satisfies `mask` and whose payload satisfies the predicate `F`.
152 ///
153 /// The predicate is passed by value but must be zero-sized. Non-capturing
154 /// closures and function pointers satisfy this requirement. The
155 /// [`UnwindSafe`](std::panic::UnwindSafe) bound is required because the
156 /// predicate is called across the FFI boundary from within Cyclone DDS.
157 ///
158 /// # Errors
159 ///
160 /// Returns an [`Error`](crate::Error) if `F` is not zero-sized or if the
161 /// query condition fails to create.
162 ///
163 /// # Examples
164 ///
165 /// ```
166 /// use cyclonedds::{QueryCondition, state};
167 /// # use cyclonedds::{Domain, Participant, Topic, Reader};
168 /// # let domain = Domain::default();
169 /// # let participant = Participant::new(&domain)?;
170 /// # #[derive(
171 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
172 /// # )]
173 /// # struct Data {
174 /// # x: i32,
175 /// # y: i32,
176 /// # }
177 ///
178 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
179 /// let reader = Reader::new(&topic)?;
180 /// let condition =
181 /// QueryCondition::new(&reader, state::sample::Fresh, |sample: &Data| sample.x > 10)?;
182 /// # Ok::<_, cyclonedds::Error>(())
183 /// ```
184 pub fn new(reader: &'r Reader<'d, 'p, 't, T>, mask: State, _: F) -> Result<Self> {
185 let _ = Self::IS_PROVIDED_CALLBACK_ZERO_SIZED;
186 let inner = ffi::dds_create_querycondition::<T, F, Self>(reader.inner, mask.bits())?;
187 Ok(Self {
188 inner,
189 phantom_callback: std::marker::PhantomData,
190 phantom: std::marker::PhantomData,
191 })
192 }
193
194 /// Returns the state mask this condition was created with.
195 ///
196 /// # Errors
197 ///
198 /// Returns an [`Error`](crate::Error) if the mask returned by the query
199 /// condition is invalid.
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use cyclonedds::{QueryCondition, state};
205 /// # use cyclonedds::{Domain, Participant, Topic, Reader};
206 /// # let domain = Domain::default();
207 /// # let participant = Participant::new(&domain)?;
208 /// # #[derive(
209 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
210 /// # )]
211 /// # struct Data {
212 /// # x: i32,
213 /// # y: i32,
214 /// # }
215 ///
216 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
217 /// let reader = Reader::new(&topic)?;
218 /// let condition = QueryCondition::new(&reader, state::sample::Fresh, |_: &Data| true)?;
219 /// assert_eq!(condition.mask()?, state::sample::Fresh);
220 /// # Ok::<_, cyclonedds::Error>(())
221 /// ```
222 pub fn mask(&self) -> Result<State> {
223 let mask = ffi::dds_get_mask(self.inner)?;
224 crate::state::State::from_bits(mask).ok_or(crate::error::Error::NonSpecific)
225 }
226
227 /// Returns `true` if this condition is currently triggered.
228 ///
229 /// A condition is triggered when samples matching its mask are available
230 /// in the reader cache.
231 ///
232 /// # Errors
233 ///
234 /// returns an [`Error`](crate::Error) if the query condition fails to read
235 /// the trigger state.
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use cyclonedds::{QueryCondition, state};
241 /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
242 /// # let domain = Domain::default();
243 /// # let participant = Participant::new(&domain)?;
244 /// # #[derive(
245 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
246 /// # )]
247 /// # struct Data {
248 /// # x: i32,
249 /// # y: i32,
250 /// # }
251 ///
252 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
253 /// let reader = Reader::new(&topic)?;
254 /// let writer = Writer::new(&topic)?;
255 ///
256 /// let condition = QueryCondition::new(
257 /// &reader,
258 /// state::sample::Fresh | state::view::Any | state::instance::Any,
259 /// |sample: &Data| sample.x == 4 && sample.y == 0,
260 /// )?;
261 /// writer.write(&Data { x: 4, y: 0 })?;
262 /// assert!(condition.triggered()?);
263 /// # Ok::<_, cyclonedds::Error>(())
264 /// ```
265 pub fn triggered(&self) -> Result<bool> {
266 ffi::dds_triggered(self.inner)
267 }
268
269 /// Removes and returns all samples matching this condition's mask and
270 /// predicate from the reader cache.
271 ///
272 /// # Errors
273 ///
274 /// Returns an [`Error`](crate::Error) if the query condition fails to take
275 /// samples.
276 ///
277 /// # Examples
278 ///
279 /// ```
280 /// use cyclonedds::{QueryCondition, state};
281 /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
282 /// # let domain = Domain::default();
283 /// # let participant = Participant::new(&domain)?;
284 /// # #[derive(
285 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
286 /// # )]
287 /// # struct Data {
288 /// # #[dds(key)]
289 /// # x: i32,
290 /// # y: i32,
291 /// # }
292 ///
293 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
294 /// let reader = Reader::new(&topic)?;
295 /// let writer = Writer::new(&topic)?;
296 ///
297 /// let condition = QueryCondition::new(
298 /// &reader,
299 /// state::sample::Any | state::view::Any | state::instance::Any,
300 /// |sample: &Data| true,
301 /// )?;
302 ///
303 /// writer.write(&Data { x: 1, y: 0 })?;
304 /// writer.write(&Data { x: 100, y: 0 })?;
305 ///
306 /// // Attempt a normal read.
307 /// assert_eq!(reader.peek()?.len(), 2);
308 ///
309 /// // Both samples should match.
310 /// let samples = condition.take()?;
311 /// assert_eq!(samples.len(), 2);
312 ///
313 /// // Samples should be removed from the cache.
314 /// assert_eq!(condition.take()?.len(), 0);
315 /// # Ok::<_, cyclonedds::Error>(())
316 /// ```
317 pub fn take(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
318 where
319 T: std::clone::Clone,
320 {
321 ffi::dds_take(self.inner)
322 }
323
324 /// Returns all samples matching this condition's mask and predicate without
325 /// marking them as read or removing them from the cache.
326 ///
327 /// # Errors
328 ///
329 /// Returns an [`Error`](crate::Error) if the query condition fails to peek
330 /// samples.
331 ///
332 /// # Examples
333 ///
334 /// ```
335 /// use cyclonedds::{QueryCondition, state};
336 /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
337 /// # let domain = Domain::default();
338 /// # let participant = Participant::new(&domain)?;
339 /// # #[derive(
340 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
341 /// # )]
342 /// # struct Data {
343 /// # #[dds(key)]
344 /// # x: i32,
345 /// # y: i32,
346 /// # }
347 ///
348 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
349 /// let reader = Reader::new(&topic)?;
350 /// let writer = Writer::new(&topic)?;
351 ///
352 /// let condition = QueryCondition::new(
353 /// &reader,
354 /// state::sample::Any | state::view::Any | state::instance::Any,
355 /// |sample: &Data| true,
356 /// )?;
357 ///
358 /// writer.write(&Data { x: 1, y: 0 })?;
359 /// writer.write(&Data { x: 100, y: 0 })?;
360 ///
361 /// // Attempt a normal read.
362 /// assert_eq!(reader.read()?.len(), 2);
363 ///
364 /// // Both samples should match.
365 /// let samples = condition.read()?;
366 /// assert_eq!(samples.len(), 2);
367 ///
368 /// // Samples remain in the cache.
369 /// assert_eq!(condition.read()?.len(), 2);
370 /// # Ok::<_, cyclonedds::Error>(())
371 /// ```
372 pub fn read(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
373 where
374 T: std::clone::Clone,
375 {
376 ffi::dds_read(self.inner)
377 }
378
379 /// Returns all samples matching this condition's mask and predicate without
380 /// marking them as read or removing them from the cache.
381 ///
382 /// # Errors
383 ///
384 /// Returns an [`Error`](crate::Error) if the query condition fails to peek
385 /// samples.
386 ///
387 /// # Examples
388 ///
389 /// ```
390 /// use cyclonedds::{QueryCondition, state};
391 /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
392 /// # let domain = Domain::default();
393 /// # let participant = Participant::new(&domain)?;
394 /// # #[derive(
395 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
396 /// # )]
397 /// # struct Data {
398 /// # #[dds(key)]
399 /// # x: i32,
400 /// # y: i32,
401 /// # }
402 ///
403 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
404 /// let reader = Reader::new(&topic)?;
405 /// let writer = Writer::new(&topic)?;
406 ///
407 /// let condition = QueryCondition::new(
408 /// &reader,
409 /// state::sample::Any | state::view::Any | state::instance::Any,
410 /// |sample: &Data| true,
411 /// )?;
412 ///
413 /// writer.write(&Data { x: 1, y: 0 })?;
414 /// writer.write(&Data { x: 100, y: 0 })?;
415 ///
416 /// // Attempt a normal peek.
417 /// assert_eq!(reader.peek()?.len(), 2);
418 ///
419 /// // Both samples should match.
420 /// let samples = condition.peek()?;
421 /// assert_eq!(samples.len(), 2);
422 ///
423 /// // Samples remain in the cache.
424 /// assert_eq!(condition.peek()?.len(), 2);
425 /// # Ok::<_, cyclonedds::Error>(())
426 /// ```
427 pub fn peek(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
428 where
429 T: std::clone::Clone,
430 {
431 ffi::dds_peek(self.inner)
432 }
433}
434
435impl<T, F> Drop for QueryCondition<'_, '_, '_, '_, T, F>
436where
437 T: crate::Topicable,
438 F: Fn(&T) -> bool,
439{
440 fn drop(&mut self) {
441 let result = ffi::dds_delete(self.inner);
442 debug_assert!(
443 result.is_ok(),
444 "unable to delete {self:?}: failed with {result:?}"
445 );
446 }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state;

    /// Predicate shared by most tests: accepts every sample, while asserting
    /// that the payload it receives is the one the tests publish (`x == 101`).
    fn query(data: &crate::tests::topic::Data) -> bool {
        assert_eq!(data.x, 101);
        true
    }

    #[test]
    fn test_query_condition_create() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let _ = QueryCondition::new(
            &reader,
            state::sample::Any | state::instance::Any | state::view::Any,
            query,
        )
        .unwrap();
    }

    #[test]
    fn test_query_condition_create_with_invalid_reader() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let mut reader = crate::Reader::new(&topic).unwrap();
        let reader_id = reader.inner;
        reader.inner = 0;
        let result = QueryCondition::new(
            &reader,
            state::sample::Any | state::instance::Any | state::view::Any,
            query,
        )
        .unwrap_err();
        // Put the real handle back before asserting so the reader is dropped
        // with a valid handle even if the assertion fails.
        reader.inner = reader_id;
        assert_eq!(result, crate::Error::BadParameter);
    }

    #[test]
    fn test_query_condition_debug_formatting() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let query_condition = QueryCondition::new(
            &reader,
            state::sample::Any | state::instance::Any | state::view::Any,
            query,
        )
        .unwrap();

        let result = format!("{query_condition:?}");
        assert!(result.contains(&query_condition.inner.to_string()));
    }

    #[test]
    fn test_query_condition_get_mask() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();

        let mask = state::sample::Any | state::instance::Any | state::view::Any;

        let query_condition = QueryCondition::new(&reader, mask, query).unwrap();
        let result = query_condition.mask().unwrap();
        assert_eq!(result, mask);

        // A different mask must not compare equal to the first condition's.
        let mask = state::sample::Fresh | state::instance::Unregistered | state::view::Old;
        let result = query_condition.mask().unwrap();
        assert_ne!(result, mask);

        let second_condition = QueryCondition::new(&reader, mask, |_| false).unwrap();
        let result = second_condition.mask().unwrap();
        assert_eq!(result, mask);
    }

    #[test]
    fn test_query_condition_get_mask_on_invalid_query_condition() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let mut query_condition = QueryCondition::new(
            &reader,
            state::sample::Any | state::instance::Any | state::view::Any,
            query,
        )
        .unwrap();
        let query_condition_id = query_condition.inner;
        query_condition.inner = 0;
        let mask_result = query_condition.mask();
        let triggered_result = query_condition.triggered();
        // Restore the real handle *before* asserting. Otherwise a failed
        // assertion would drop the condition while it still holds the bogus
        // handle; `Drop`'s debug assertion would then presumably fire during
        // unwinding, which aborts the whole test process instead of reporting
        // the failure.
        query_condition.inner = query_condition_id;
        assert_eq!(mask_result.unwrap_err(), crate::Error::BadParameter);
        assert_eq!(triggered_result.unwrap_err(), crate::Error::BadParameter);
    }

    #[test]
    fn test_query_condition_triggering_reads() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let writer = crate::Writer::new(&topic).unwrap();

        let mask = state::sample::Stale | state::instance::Any | state::view::Any;

        let query_condition = QueryCondition::new(&reader, mask, query).unwrap();

        let sample = crate::tests::topic::Data {
            x: 101,
            y: 202,
            message: "hello".to_string(),
        };
        writer.write(&sample).unwrap();

        // The sample has not been read yet, so the `Stale` mask excludes it.
        let query_condition_received = query_condition.read().unwrap();
        assert_eq!(query_condition_received.len(), 0);
        let triggered = query_condition.triggered().unwrap();
        assert!(!triggered);

        let reader_received = reader.read().unwrap();
        assert_eq!(reader_received.len(), 1);
        assert_eq!(*reader_received[0], sample);
        assert_eq!(
            reader_received[0].info().state,
            state::sample::Fresh | state::view::New | state::instance::Alive
        );

        // Once the reader has read the sample, the condition matches it.
        let triggered = query_condition.triggered().unwrap();
        assert!(triggered);

        let query_condition_received = query_condition.peek().unwrap();
        assert_eq!(query_condition_received.len(), 1);
        assert_eq!(*query_condition_received[0], sample);

        let triggered = query_condition.triggered().unwrap();
        assert!(triggered);

        let query_condition_received = query_condition.take().unwrap();
        assert_eq!(query_condition_received.len(), 1);
        assert_eq!(*query_condition_received[0], sample);

        // Taking removed the sample, so nothing is left to trigger on.
        let triggered = query_condition.triggered().unwrap();
        assert!(!triggered);

        let reader_received = reader.read().unwrap();
        assert!(reader_received.is_empty());

        let query_condition_received = query_condition.read().unwrap();
        assert!(query_condition_received.is_empty());
    }

    #[test]
    fn test_query_condition_non_triggering_reads() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let writer = crate::Writer::new(&topic).unwrap();

        let mask = state::sample::Stale | state::instance::Any | state::view::Any;

        // The predicate rejects everything, so the condition must never match.
        let query_condition = QueryCondition::new(&reader, mask, |_| false).unwrap();

        let sample = crate::tests::topic::Data {
            x: 101,
            y: 202,
            message: "hello".to_string(),
        };
        writer.write(&sample).unwrap();

        let query_condition_received = query_condition.read().unwrap();
        assert!(query_condition_received.is_empty());
        let triggered = query_condition.triggered().unwrap();
        assert!(!triggered);

        let reader_received = reader.read().unwrap();
        assert_eq!(reader_received.len(), 1);
        assert_eq!(*reader_received[0], sample);
        assert_eq!(
            reader_received[0].info().state,
            state::sample::Fresh | state::view::New | state::instance::Alive
        );

        let triggered = query_condition.triggered().unwrap();
        assert!(!triggered);

        let query_condition_received = query_condition.peek().unwrap();
        assert!(query_condition_received.is_empty());

        let triggered = query_condition.triggered().unwrap();
        assert!(!triggered);

        let query_condition_received = query_condition.take().unwrap();
        assert_eq!(query_condition_received.len(), 0);

        let triggered = query_condition.triggered().unwrap();
        assert!(!triggered);

        // The rejected sample is still available through the reader itself.
        let reader_received = reader.read().unwrap();
        assert_eq!(reader_received.len(), 1);

        let query_condition_received = query_condition.read().unwrap();
        assert!(query_condition_received.is_empty());
    }
}