use ffi::Filter;
use crate::internal::ffi;
use crate::state::State;
use crate::{Reader, Result};
pub struct QueryCondition<'domain, 'participant, 'topic, 'reader, T, F>
where
T: crate::Topicable,
F: Fn(&T) -> bool,
{
pub(crate) inner: cyclonedds_sys::dds_entity_t,
phantom_callback: std::marker::PhantomData<F>,
phantom: std::marker::PhantomData<&'reader Reader<'topic, 'domain, 'participant, T>>,
}
impl<T, F> std::fmt::Debug for QueryCondition<'_, '_, '_, '_, T, F>
where
T: crate::Topicable,
F: Fn(&T) -> bool,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueryCondition")
.field("inner", &self.inner)
.field("phantom", &self.phantom)
.finish()
}
}
impl<T, F> Filter<T, F> for QueryCondition<'_, '_, '_, '_, T, F>
where
T: crate::Topicable + std::panic::UnwindSafe + std::panic::RefUnwindSafe,
F: Fn(&T) -> bool,
{
}
impl<'d, 'p, 't, 'r, T, F> QueryCondition<'d, 'p, 't, 'r, T, F>
where
T: crate::Topicable + std::panic::UnwindSafe + std::panic::RefUnwindSafe,
F: Fn(&T) -> bool,
{
pub fn new(reader: &'r Reader<'d, 'p, 't, T>, mask: State, _: F) -> Result<Self> {
let _ = Self::IS_PROVIDED_CALLBACK_ZERO_SIZED;
let inner = ffi::dds_create_querycondition::<T, F, Self>(reader.inner, mask.bits())?;
Ok(Self {
inner,
phantom_callback: std::marker::PhantomData,
phantom: std::marker::PhantomData,
})
}
pub fn mask(&self) -> Result<State> {
let mask = ffi::dds_get_mask(self.inner)?;
crate::state::State::from_bits(mask).ok_or(crate::error::Error::NonSpecific)
}
pub fn triggered(&self) -> Result<bool> {
ffi::dds_triggered(self.inner)
}
pub fn take(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
where
T: std::clone::Clone,
{
ffi::dds_take(self.inner)
}
pub fn read(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
where
T: std::clone::Clone,
{
ffi::dds_read(self.inner)
}
pub fn peek(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
where
T: std::clone::Clone,
{
ffi::dds_peek(self.inner)
}
}
impl<T, F> Drop for QueryCondition<'_, '_, '_, '_, T, F>
where
T: crate::Topicable,
F: Fn(&T) -> bool,
{
fn drop(&mut self) {
let result = ffi::dds_delete(self.inner);
debug_assert!(
result.is_ok(),
"unable to delete {self:?}: failed with {result:?}"
);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state;
fn query(data: &crate::tests::topic::Data) -> bool {
assert_eq!(data.x, 101);
true
}
#[test]
fn test_query_condition_create() {
let domain_id = crate::tests::domain::unique_id();
let domain = crate::Domain::new(domain_id).unwrap();
let topic_name = crate::tests::topic::unique_name();
let participant = crate::Participant::new(&domain).unwrap();
let topic =
crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
let reader = crate::Reader::new(&topic).unwrap();
let _ = QueryCondition::new(
&reader,
state::sample::Any | state::instance::Any | state::view::Any,
query,
)
.unwrap();
}
#[test]
fn test_query_condition_create_with_invalid_reader() {
let domain_id = crate::tests::domain::unique_id();
let domain = crate::Domain::new(domain_id).unwrap();
let topic_name = crate::tests::topic::unique_name();
let participant = crate::Participant::new(&domain).unwrap();
let topic =
crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
let mut reader = crate::Reader::new(&topic).unwrap();
let reader_id = reader.inner;
reader.inner = 0;
let result = QueryCondition::new(
&reader,
state::sample::Any | state::instance::Any | state::view::Any,
query,
)
.unwrap_err();
reader.inner = reader_id;
assert_eq!(result, crate::Error::BadParameter);
}
#[test]
fn test_query_condition_debug_formatting() {
let domain_id = crate::tests::domain::unique_id();
let domain = crate::Domain::new(domain_id).unwrap();
let topic_name = crate::tests::topic::unique_name();
let participant = crate::Participant::new(&domain).unwrap();
let topic =
crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
let reader = crate::Reader::new(&topic).unwrap();
let query_condition = QueryCondition::new(
&reader,
state::sample::Any | state::instance::Any | state::view::Any,
query,
)
.unwrap();
let result = format!("{query_condition:?}");
assert!(result.contains(&format!("{}", query_condition.inner)));
}
#[test]
fn test_query_condition_get_mask() {
let domain_id = crate::tests::domain::unique_id();
let domain = crate::Domain::new(domain_id).unwrap();
let topic_name = crate::tests::topic::unique_name();
let participant = crate::Participant::new(&domain).unwrap();
let topic =
crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
let reader = crate::Reader::new(&topic).unwrap();
let mask = state::sample::Any | state::instance::Any | state::view::Any;
let query_condition = QueryCondition::new(&reader, mask, query).unwrap();
let result = query_condition.mask().unwrap();
assert_eq!(result, mask);
let mask = state::sample::Fresh | state::instance::Unregistered | state::view::Old;
let result = query_condition.mask().unwrap();
assert_ne!(result, mask);
let read_condition = QueryCondition::new(&reader, mask, |_| false).unwrap();
let result = read_condition.mask().unwrap();
assert_eq!(result, mask);
}
#[test]
fn test_query_condition_get_mask_on_invalid_query_condition() {
let domain_id = crate::tests::domain::unique_id();
let domain = crate::Domain::new(domain_id).unwrap();
let topic_name = crate::tests::topic::unique_name();
let participant = crate::Participant::new(&domain).unwrap();
let topic =
crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
let reader = crate::Reader::new(&topic).unwrap();
let mut query_condition = QueryCondition::new(
&reader,
state::sample::Any | state::instance::Any | state::view::Any,
query,
)
.unwrap();
let query_condition_id = query_condition.inner;
query_condition.inner = 0;
let result = query_condition.mask().unwrap_err();
assert_eq!(result, crate::Error::BadParameter);
let result = query_condition.triggered().unwrap_err();
assert_eq!(result, crate::Error::BadParameter);
query_condition.inner = query_condition_id;
}
#[test]
fn test_query_condition_triggering_reads() {
let domain_id = crate::tests::domain::unique_id();
let domain = crate::Domain::new(domain_id).unwrap();
let topic_name = crate::tests::topic::unique_name();
let participant = crate::Participant::new(&domain).unwrap();
let topic =
crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
let reader = crate::Reader::new(&topic).unwrap();
let writer = crate::Writer::new(&topic).unwrap();
let mask = state::sample::Stale | state::instance::Any | state::view::Any;
let query_condition = QueryCondition::new(&reader, mask, query).unwrap();
let sample = crate::tests::topic::Data {
x: 101,
y: 202,
message: "hello".to_string(),
};
writer.write(&sample).unwrap();
let query_condition_received = query_condition.read().unwrap();
assert_eq!(query_condition_received.len(), 0);
let triggered = query_condition.triggered().unwrap();
assert!(!triggered);
let reader_received = reader.read().unwrap();
assert_eq!(reader_received.len(), 1);
assert_eq!(*reader_received[0], sample);
assert_eq!(
reader_received[0].info().state,
state::sample::Fresh | state::view::New | state::instance::Alive
);
let triggered = query_condition.triggered().unwrap();
assert!(triggered);
let query_condition_received = query_condition.peek().unwrap();
assert_eq!(query_condition_received.len(), 1);
assert_eq!(*query_condition_received[0], sample);
let triggered = query_condition.triggered().unwrap();
assert!(triggered);
let query_condition_received = query_condition.take().unwrap();
assert_eq!(query_condition_received.len(), 1);
assert_eq!(*query_condition_received[0], sample);
let triggered = query_condition.triggered().unwrap();
assert!(!triggered);
let reader_received = reader.read().unwrap();
assert!(reader_received.is_empty());
let query_condition_received = query_condition.read().unwrap();
assert!(query_condition_received.is_empty());
}
#[test]
fn test_query_condition_non_triggering_reads() {
let domain_id = crate::tests::domain::unique_id();
let domain = crate::Domain::new(domain_id).unwrap();
let topic_name = crate::tests::topic::unique_name();
let participant = crate::Participant::new(&domain).unwrap();
let topic =
crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
let reader = crate::Reader::new(&topic).unwrap();
let writer = crate::Writer::new(&topic).unwrap();
let mask = state::sample::Stale | state::instance::Any | state::view::Any;
let query_condition = QueryCondition::new(&reader, mask, |_| false).unwrap();
let sample = crate::tests::topic::Data {
x: 101,
y: 202,
message: "hello".to_string(),
};
writer.write(&sample).unwrap();
let query_condition_received = query_condition.read().unwrap();
assert!(query_condition_received.is_empty());
let triggered = query_condition.triggered().unwrap();
assert!(!triggered);
let reader_received = reader.read().unwrap();
assert_eq!(reader_received.len(), 1);
assert_eq!(*reader_received[0], sample);
assert_eq!(
reader_received[0].info().state,
state::sample::Fresh | state::view::New | state::instance::Alive
);
let triggered = query_condition.triggered().unwrap();
assert!(!triggered);
let query_condition_received = query_condition.peek().unwrap();
assert!(query_condition_received.is_empty());
let triggered = query_condition.triggered().unwrap();
assert!(!triggered);
let query_condition_received = query_condition.take().unwrap();
assert_eq!(query_condition_received.len(), 0);
let triggered = query_condition.triggered().unwrap();
assert!(!triggered);
let reader_received = reader.read().unwrap();
assert_eq!(reader_received.len(), 1);
let query_condition_received = query_condition.read().unwrap();
assert!(query_condition_received.is_empty());
}
}