Skip to main content

cyclonedds/
query_condition.rs

1use ffi::Filter;
2
3use crate::internal::ffi;
4use crate::state::State;
5use crate::{Reader, Result};
6
7/// A filter on a [`Reader`](crate::Reader) that restricts samples by their
8/// [`State`](crate::State) and a predicate.
9///
10/// A `QueryCondition` extends [`ReadCondition`](crate::ReadCondition) with a
11/// user-supplied predicate `F` that is applied to each sample's payload. Only
12/// samples that both match the state mask and satisfy the predicate are
13/// returned by reads and trigger waitset wakeups.
14///
15/// # Predicate requirements
16/// The predicate closure provided to the [`QueryCondition`] must be
17/// *zero-sized*.
18///
19/// This restriction comes from how the callback is integrated with the
20/// underlying C API. For ergonomics, the closure is reconstructed inside a
21/// wrapper that converts raw C types into Rust types before invoking it. That
22/// reconstruction depends only on the closure's type, so it cannot rely on any
23/// captured state.
24///
25/// In practice, this means the callback must be either:
26///
27///   - a [function item](https://doc.rust-lang.org/reference/types/function-item.html),
28///     which is always zero-sized, or
29///   - a [closure](https://doc.rust-lang.org/reference/types/closure.html) that
30///     does not capture any variables from its environment
31///
32/// The following example will **fail to compile** because the closure captures
33/// `x`, making it non-zero-sized:
34///
35/// ```compile_fail
36/// use cyclonedds as dds;
37/// use dds::state;
38///
39/// # #[derive(
40/// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
41/// # )]
42/// struct Data {
43///     x: i32,
44/// }
45/// fn create_your_reader() -> dds::Reader<'static, 'static, 'static, Data> {
46///     unimplemented!()
47/// }
48/// let reader: dds::Reader<Data> = create_your_reader();
49/// let x = 10;
50/// let result = dds::QueryCondition::new(
51///     &reader,
52///     state::sample::Any | state::instance::Any | state::view::Any,
53///     // Error: closure captures `x`, so it is not zero-sized.
54///     |sample| sample.x < x,
55/// )?;
56/// # Ok::<(), dds::Error>(())
57/// ```
58///
59/// Instead, use a function item or a non-capturing closure, for example:
60///
61/// ```ignore
62/// |sample| sample.x < 10
63/// ```
64///
/// When a capturing closure is passed, the compiler emits an error similar to:
66///
67/// ```text
68/// error[E0080]: evaluation panicked: the provided callback is not zero-sized
69///   = note: closures that capture values from their environment are not zero-sized
70///   = help: ensure the callback is either:
71///           - a function item, e.g. `fn my_callback() {}`
72///           - a closure that does not capture any external state
73/// ```
74///
/// This is enforced via an internal compile-time assertion,
/// `assert!(size_of::<F>() == 0)`, but note that the associated compiler
/// output can be quite lengthy.
78///
79/// <details>
80/// <summary>Click to see a sample of the full compiler output</summary>
81///
82/// ```text
83/// error[E0080]: evaluation panicked: the provided callback is not zero-sized
84///                 = note: closures that capture values from their environment are not zero-sized
85///                 = help: ensure the callback is either:
86///                         - a function item, e.g. `fn my_callback() {}`
87///                         - a closure that does not capture any external state
88/// --> cyclonedds/src/internal/ffi.rs
89///  |
90///  |       const IS_PROVIDED_CALLBACK_ZERO_SIZED: () = assert!(
91///  |  _________________________________________________^
92///  | |         size_of::<F>() == 0,
93///  | |         "\
94///  | | the provided callback is not zero-sized
95///    |
96///  | | "
97///  | | );
98///  | |_^ evaluation of `<QueryCondition as Filter>::IS_PROVIDED_CALLBACK_ZERO_SIZED` failed here
99///
100/// note: erroneous constant encountered
101/// --> cyclonedds/src/query_condition.rs:84:17
102///  |
103///  |         Self::IS_PROVIDED_CALLBACK_ZERO_SIZED;
104///  |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
105///
106/// ...
107///
108/// note: erroneous constant encountered
109/// --> cyclonedds/src/internal/ffi.rs
110///  |
111///  |     Callback::IS_PROVIDED_CALLBACK_ZERO_SIZED;
112///  |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
113/// ```
114/// </details>
115pub struct QueryCondition<'domain, 'participant, 'topic, 'reader, T, F>
116where
117    T: crate::Topicable,
118    F: Fn(&T) -> bool,
119{
120    pub(crate) inner: cyclonedds_sys::dds_entity_t,
121    phantom_callback: std::marker::PhantomData<F>,
122    phantom: std::marker::PhantomData<&'reader Reader<'topic, 'domain, 'participant, T>>,
123}
124
125impl<T, F> std::fmt::Debug for QueryCondition<'_, '_, '_, '_, T, F>
126where
127    T: crate::Topicable,
128    F: Fn(&T) -> bool,
129{
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct("QueryCondition")
132            .field("inner", &self.inner)
133            .field("phantom", &self.phantom)
134            .finish()
135    }
136}
137
138impl<T, F> Filter<T, F> for QueryCondition<'_, '_, '_, '_, T, F>
139where
140    T: crate::Topicable + std::panic::UnwindSafe + std::panic::RefUnwindSafe,
141    F: Fn(&T) -> bool,
142{
143}
144
145impl<'d, 'p, 't, 'r, T, F> QueryCondition<'d, 'p, 't, 'r, T, F>
146where
147    T: crate::Topicable + std::panic::UnwindSafe + std::panic::RefUnwindSafe,
148    F: Fn(&T) -> bool,
149{
150    /// Creates a new `QueryCondition` on `reader` that matches samples whose
151    /// state satisfies `mask` and whose payload satisfies the predicate `F`.
152    ///
153    /// The predicate is passed by value but must be zero-sized. Non-capturing
154    /// closures and function pointers satisfy this requirement. The
155    /// [`UnwindSafe`](std::panic::UnwindSafe) bound is required because the
156    /// predicate is called across the FFI boundary from within Cyclone DDS.
157    ///
158    /// # Errors
159    ///
160    /// Returns an [`Error`](crate::Error) if `F` is not zero-sized or if the
161    /// query condition fails to create.
162    ///
163    /// # Examples
164    ///
165    /// ```
166    /// use cyclonedds::{QueryCondition, state};
167    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
168    /// # let domain = Domain::default();
169    /// # let participant = Participant::new(&domain)?;
170    /// # #[derive(
171    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
172    /// # )]
173    /// # struct Data {
174    /// #     x: i32,
175    /// #     y: i32,
176    /// # }
177    ///
178    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
179    /// let reader = Reader::new(&topic)?;
180    /// let condition =
181    ///     QueryCondition::new(&reader, state::sample::Fresh, |sample: &Data| sample.x > 10)?;
182    /// # Ok::<_, cyclonedds::Error>(())
183    /// ```
184    pub fn new(reader: &'r Reader<'d, 'p, 't, T>, mask: State, _: F) -> Result<Self> {
185        let _ = Self::IS_PROVIDED_CALLBACK_ZERO_SIZED;
186        let inner = ffi::dds_create_querycondition::<T, F, Self>(reader.inner, mask.bits())?;
187        Ok(Self {
188            inner,
189            phantom_callback: std::marker::PhantomData,
190            phantom: std::marker::PhantomData,
191        })
192    }
193
194    /// Returns the state mask this condition was created with.
195    ///
196    /// # Errors
197    ///
198    /// Returns an [`Error`](crate::Error) if the mask returned by the query
199    /// condition is invalid.
200    ///
201    /// # Examples
202    ///
203    /// ```
204    /// use cyclonedds::{QueryCondition, state};
205    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
206    /// # let domain = Domain::default();
207    /// # let participant = Participant::new(&domain)?;
208    /// # #[derive(
209    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
210    /// # )]
211    /// # struct Data {
212    /// #     x: i32,
213    /// #     y: i32,
214    /// # }
215    ///
216    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
217    /// let reader = Reader::new(&topic)?;
218    /// let condition = QueryCondition::new(&reader, state::sample::Fresh, |_: &Data| true)?;
219    /// assert_eq!(condition.mask()?, state::sample::Fresh);
220    /// # Ok::<_, cyclonedds::Error>(())
221    /// ```
222    pub fn mask(&self) -> Result<State> {
223        let mask = ffi::dds_get_mask(self.inner)?;
224        crate::state::State::from_bits(mask).ok_or(crate::error::Error::NonSpecific)
225    }
226
227    /// Returns `true` if this condition is currently triggered.
228    ///
229    /// A condition is triggered when samples matching its mask are available
230    /// in the reader cache.
231    ///
232    /// # Errors
233    ///
234    /// returns an [`Error`](crate::Error) if the query condition fails to read
235    /// the trigger state.
236    ///
237    /// # Examples
238    ///
239    /// ```
240    /// use cyclonedds::{QueryCondition, state};
241    /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
242    /// # let domain = Domain::default();
243    /// # let participant = Participant::new(&domain)?;
244    /// # #[derive(
245    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
246    /// # )]
247    /// # struct Data {
248    /// #     x: i32,
249    /// #     y: i32,
250    /// # }
251    ///
252    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
253    /// let reader = Reader::new(&topic)?;
254    /// let writer = Writer::new(&topic)?;
255    ///
256    /// let condition = QueryCondition::new(
257    ///     &reader,
258    ///     state::sample::Fresh | state::view::Any | state::instance::Any,
259    ///     |sample: &Data| sample.x == 4 && sample.y == 0,
260    /// )?;
261    /// writer.write(&Data { x: 4, y: 0 })?;
262    /// assert!(condition.triggered()?);
263    /// # Ok::<_, cyclonedds::Error>(())
264    /// ```
265    pub fn triggered(&self) -> Result<bool> {
266        ffi::dds_triggered(self.inner)
267    }
268
269    /// Removes and returns all samples matching this condition's mask and
270    /// predicate from the reader cache.
271    ///
272    /// # Errors
273    ///
274    /// Returns an [`Error`](crate::Error) if the query condition fails to take
275    /// samples.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// use cyclonedds::{QueryCondition, state};
281    /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
282    /// # let domain = Domain::default();
283    /// # let participant = Participant::new(&domain)?;
284    /// # #[derive(
285    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
286    /// # )]
287    /// # struct Data {
288    /// #     #[dds(key)]
289    /// #     x: i32,
290    /// #     y: i32,
291    /// # }
292    ///
293    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
294    /// let reader = Reader::new(&topic)?;
295    /// let writer = Writer::new(&topic)?;
296    ///
297    /// let condition = QueryCondition::new(
298    ///     &reader,
299    ///     state::sample::Any | state::view::Any | state::instance::Any,
300    ///     |sample: &Data| true,
301    /// )?;
302    ///
303    /// writer.write(&Data { x: 1, y: 0 })?;
304    /// writer.write(&Data { x: 100, y: 0 })?;
305    ///
306    /// // Attempt a normal read.
307    /// assert_eq!(reader.peek()?.len(), 2);
308    ///
309    /// // Both samples should match.
310    /// let samples = condition.take()?;
311    /// assert_eq!(samples.len(), 2);
312    ///
313    /// // Samples should be removed from the cache.
314    /// assert_eq!(condition.take()?.len(), 0);
315    /// # Ok::<_, cyclonedds::Error>(())
316    /// ```
317    pub fn take(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
318    where
319        T: std::clone::Clone,
320    {
321        ffi::dds_take(self.inner)
322    }
323
324    /// Returns all samples matching this condition's mask and predicate without
325    /// marking them as read or removing them from the cache.
326    ///
327    /// # Errors
328    ///
329    /// Returns an [`Error`](crate::Error) if the query condition fails to peek
330    /// samples.
331    ///
332    /// # Examples
333    ///
334    /// ```
335    /// use cyclonedds::{QueryCondition, state};
336    /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
337    /// # let domain = Domain::default();
338    /// # let participant = Participant::new(&domain)?;
339    /// # #[derive(
340    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
341    /// # )]
342    /// # struct Data {
343    /// #     #[dds(key)]
344    /// #     x: i32,
345    /// #     y: i32,
346    /// # }
347    ///
348    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
349    /// let reader = Reader::new(&topic)?;
350    /// let writer = Writer::new(&topic)?;
351    ///
352    /// let condition = QueryCondition::new(
353    ///     &reader,
354    ///     state::sample::Any | state::view::Any | state::instance::Any,
355    ///     |sample: &Data| true,
356    /// )?;
357    ///
358    /// writer.write(&Data { x: 1, y: 0 })?;
359    /// writer.write(&Data { x: 100, y: 0 })?;
360    ///
361    /// // Attempt a normal read.
362    /// assert_eq!(reader.read()?.len(), 2);
363    ///
364    /// // Both samples should match.
365    /// let samples = condition.read()?;
366    /// assert_eq!(samples.len(), 2);
367    ///
368    /// // Samples remain in the cache.
369    /// assert_eq!(condition.read()?.len(), 2);
370    /// # Ok::<_, cyclonedds::Error>(())
371    /// ```
372    pub fn read(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
373    where
374        T: std::clone::Clone,
375    {
376        ffi::dds_read(self.inner)
377    }
378
379    /// Returns all samples matching this condition's mask and predicate without
380    /// marking them as read or removing them from the cache.
381    ///
382    /// # Errors
383    ///
384    /// Returns an [`Error`](crate::Error) if the query condition fails to peek
385    /// samples.
386    ///
387    /// # Examples
388    ///
389    /// ```
390    /// use cyclonedds::{QueryCondition, state};
391    /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
392    /// # let domain = Domain::default();
393    /// # let participant = Participant::new(&domain)?;
394    /// # #[derive(
395    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
396    /// # )]
397    /// # struct Data {
398    /// #     #[dds(key)]
399    /// #     x: i32,
400    /// #     y: i32,
401    /// # }
402    ///
403    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
404    /// let reader = Reader::new(&topic)?;
405    /// let writer = Writer::new(&topic)?;
406    ///
407    /// let condition = QueryCondition::new(
408    ///     &reader,
409    ///     state::sample::Any | state::view::Any | state::instance::Any,
410    ///     |sample: &Data| true,
411    /// )?;
412    ///
413    /// writer.write(&Data { x: 1, y: 0 })?;
414    /// writer.write(&Data { x: 100, y: 0 })?;
415    ///
416    /// // Attempt a normal peek.
417    /// assert_eq!(reader.peek()?.len(), 2);
418    ///
419    /// // Both samples should match.
420    /// let samples = condition.peek()?;
421    /// assert_eq!(samples.len(), 2);
422    ///
423    /// // Samples remain in the cache.
424    /// assert_eq!(condition.peek()?.len(), 2);
425    /// # Ok::<_, cyclonedds::Error>(())
426    /// ```
427    pub fn peek(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
428    where
429        T: std::clone::Clone,
430    {
431        ffi::dds_peek(self.inner)
432    }
433}
434
435impl<T, F> Drop for QueryCondition<'_, '_, '_, '_, T, F>
436where
437    T: crate::Topicable,
438    F: Fn(&T) -> bool,
439{
440    fn drop(&mut self) {
441        let result = ffi::dds_delete(self.inner);
442        debug_assert!(
443            result.is_ok(),
444            "unable to delete {self:?}: failed with {result:?}"
445        );
446    }
447}
448
449#[cfg(test)]
450mod tests {
451    use super::*;
452    use crate::state;
453
454    fn query(data: &crate::tests::topic::Data) -> bool {
455        assert_eq!(data.x, 101);
456        true
457    }
458
459    #[test]
460    fn test_query_condition_create() {
461        let domain_id = crate::tests::domain::unique_id();
462        let domain = crate::Domain::new(domain_id).unwrap();
463        let topic_name = crate::tests::topic::unique_name();
464        let participant = crate::Participant::new(&domain).unwrap();
465        let topic =
466            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
467        let reader = crate::Reader::new(&topic).unwrap();
468        let _ = QueryCondition::new(
469            &reader,
470            state::sample::Any | state::instance::Any | state::view::Any,
471            query,
472        )
473        .unwrap();
474    }
475
476    #[test]
477    fn test_query_condition_create_with_invalid_reader() {
478        let domain_id = crate::tests::domain::unique_id();
479        let domain = crate::Domain::new(domain_id).unwrap();
480        let topic_name = crate::tests::topic::unique_name();
481        let participant = crate::Participant::new(&domain).unwrap();
482        let topic =
483            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
484        let mut reader = crate::Reader::new(&topic).unwrap();
485        let reader_id = reader.inner;
486        reader.inner = 0;
487        let result = QueryCondition::new(
488            &reader,
489            state::sample::Any | state::instance::Any | state::view::Any,
490            query,
491        )
492        .unwrap_err();
493        reader.inner = reader_id;
494        assert_eq!(result, crate::Error::BadParameter);
495    }
496
497    #[test]
498    fn test_query_condition_debug_formatting() {
499        let domain_id = crate::tests::domain::unique_id();
500        let domain = crate::Domain::new(domain_id).unwrap();
501        let topic_name = crate::tests::topic::unique_name();
502        let participant = crate::Participant::new(&domain).unwrap();
503        let topic =
504            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
505        let reader = crate::Reader::new(&topic).unwrap();
506        let query_condition = QueryCondition::new(
507            &reader,
508            state::sample::Any | state::instance::Any | state::view::Any,
509            query,
510        )
511        .unwrap();
512
513        let result = format!("{query_condition:?}");
514        assert!(result.contains(&format!("{}", query_condition.inner)));
515    }
516
517    #[test]
518    fn test_query_condition_get_mask() {
519        let domain_id = crate::tests::domain::unique_id();
520        let domain = crate::Domain::new(domain_id).unwrap();
521        let topic_name = crate::tests::topic::unique_name();
522        let participant = crate::Participant::new(&domain).unwrap();
523        let topic =
524            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
525        let reader = crate::Reader::new(&topic).unwrap();
526
527        let mask = state::sample::Any | state::instance::Any | state::view::Any;
528
529        let query_condition = QueryCondition::new(&reader, mask, query).unwrap();
530        let result = query_condition.mask().unwrap();
531        assert_eq!(result, mask);
532
533        let mask = state::sample::Fresh | state::instance::Unregistered | state::view::Old;
534        let result = query_condition.mask().unwrap();
535        assert_ne!(result, mask);
536
537        let read_condition = QueryCondition::new(&reader, mask, |_| false).unwrap();
538        let result = read_condition.mask().unwrap();
539        assert_eq!(result, mask);
540    }
541
542    #[test]
543    fn test_query_condition_get_mask_on_invalid_query_condition() {
544        let domain_id = crate::tests::domain::unique_id();
545        let domain = crate::Domain::new(domain_id).unwrap();
546        let topic_name = crate::tests::topic::unique_name();
547        let participant = crate::Participant::new(&domain).unwrap();
548        let topic =
549            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
550        let reader = crate::Reader::new(&topic).unwrap();
551        let mut query_condition = QueryCondition::new(
552            &reader,
553            state::sample::Any | state::instance::Any | state::view::Any,
554            query,
555        )
556        .unwrap();
557        let query_condition_id = query_condition.inner;
558        query_condition.inner = 0;
559        let result = query_condition.mask().unwrap_err();
560        assert_eq!(result, crate::Error::BadParameter);
561        let result = query_condition.triggered().unwrap_err();
562        assert_eq!(result, crate::Error::BadParameter);
563        query_condition.inner = query_condition_id;
564    }
565
566    #[test]
567    fn test_query_condition_triggering_reads() {
568        let domain_id = crate::tests::domain::unique_id();
569        let domain = crate::Domain::new(domain_id).unwrap();
570        let topic_name = crate::tests::topic::unique_name();
571        let participant = crate::Participant::new(&domain).unwrap();
572        let topic =
573            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
574        let reader = crate::Reader::new(&topic).unwrap();
575        let writer = crate::Writer::new(&topic).unwrap();
576
577        let mask = state::sample::Stale | state::instance::Any | state::view::Any;
578
579        let query_condition = QueryCondition::new(&reader, mask, query).unwrap();
580
581        let sample = crate::tests::topic::Data {
582            x: 101,
583            y: 202,
584            message: "hello".to_string(),
585        };
586        writer.write(&sample).unwrap();
587
588        let query_condition_received = query_condition.read().unwrap();
589        assert_eq!(query_condition_received.len(), 0);
590        let triggered = query_condition.triggered().unwrap();
591        assert!(!triggered);
592
593        let reader_received = reader.read().unwrap();
594        assert_eq!(reader_received.len(), 1);
595        assert_eq!(*reader_received[0], sample);
596        assert_eq!(
597            reader_received[0].info().state,
598            state::sample::Fresh | state::view::New | state::instance::Alive
599        );
600
601        let triggered = query_condition.triggered().unwrap();
602        assert!(triggered);
603
604        let query_condition_received = query_condition.peek().unwrap();
605        assert_eq!(query_condition_received.len(), 1);
606        assert_eq!(*query_condition_received[0], sample);
607
608        let triggered = query_condition.triggered().unwrap();
609        assert!(triggered);
610
611        let query_condition_received = query_condition.take().unwrap();
612        assert_eq!(query_condition_received.len(), 1);
613        assert_eq!(*query_condition_received[0], sample);
614
615        let triggered = query_condition.triggered().unwrap();
616        assert!(!triggered);
617
618        let reader_received = reader.read().unwrap();
619        assert!(reader_received.is_empty());
620
621        let query_condition_received = query_condition.read().unwrap();
622        assert!(query_condition_received.is_empty());
623    }
624
625    #[test]
626    fn test_query_condition_non_triggering_reads() {
627        let domain_id = crate::tests::domain::unique_id();
628        let domain = crate::Domain::new(domain_id).unwrap();
629        let topic_name = crate::tests::topic::unique_name();
630        let participant = crate::Participant::new(&domain).unwrap();
631        let topic =
632            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
633        let reader = crate::Reader::new(&topic).unwrap();
634        let writer = crate::Writer::new(&topic).unwrap();
635
636        let mask = state::sample::Stale | state::instance::Any | state::view::Any;
637
638        let query_condition = QueryCondition::new(&reader, mask, |_| false).unwrap();
639
640        let sample = crate::tests::topic::Data {
641            x: 101,
642            y: 202,
643            message: "hello".to_string(),
644        };
645        writer.write(&sample).unwrap();
646
647        let query_condition_received = query_condition.read().unwrap();
648        assert!(query_condition_received.is_empty());
649        let triggered = query_condition.triggered().unwrap();
650        assert!(!triggered);
651
652        let reader_received = reader.read().unwrap();
653        assert_eq!(reader_received.len(), 1);
654        assert_eq!(*reader_received[0], sample);
655        assert_eq!(
656            reader_received[0].info().state,
657            state::sample::Fresh | state::view::New | state::instance::Alive
658        );
659
660        let triggered = query_condition.triggered().unwrap();
661        assert!(!triggered);
662
663        let query_condition_received = query_condition.peek().unwrap();
664        assert!(query_condition_received.is_empty());
665
666        let triggered = query_condition.triggered().unwrap();
667        assert!(!triggered);
668
669        let query_condition_received = query_condition.take().unwrap();
670        assert_eq!(query_condition_received.len(), 0);
671
672        let triggered = query_condition.triggered().unwrap();
673        assert!(!triggered);
674
675        let reader_received = reader.read().unwrap();
676        assert_eq!(reader_received.len(), 1);
677
678        let query_condition_received = query_condition.read().unwrap();
679        assert!(query_condition_received.is_empty());
680    }
681}