dust_dds 0.1.0

Data Distribution Service (DDS) implementation
Documentation
use crate::{
    domain::{
        domain_participant::DomainParticipant, domain_participant_factory::THE_PARTICIPANT_FACTORY,
    },
    implementation::dds_impl::builtin_subscriber::BuiltInSubscriber,
    infrastructure::{
        condition::StatusCondition,
        error::{DdsError, DdsResult},
        qos::{DataReaderQos, QosKind, SubscriberQos, TopicQos},
        status::{SampleLostStatus, StatusKind},
    },
    topic_definition::type_support::{DdsDeserialize, DdsType},
};
use crate::{
    implementation::{
        dds_impl::{
            user_defined_data_reader::AnyDataReaderListener,
            user_defined_subscriber::UserDefinedSubscriber,
        },
        utils::shared_object::DdsWeak,
    },
    infrastructure::instance::InstanceHandle,
};

use crate::topic_definition::topic::Topic;

use super::{
    data_reader::DataReader, data_reader_listener::DataReaderListener,
    subscriber_listener::SubscriberListener,
};

#[derive(PartialEq, Debug)]
pub(crate) enum SubscriberKind {
    BuiltIn(DdsWeak<BuiltInSubscriber>),
    UserDefined(DdsWeak<UserDefinedSubscriber>),
}

/// A [`Subscriber`] is the object responsible for the actual reception of the data resulting from its subscriptions.
///
/// A [`Subscriber`] acts on the behalf of one or several [`DataReader`] objects that are related to it. When it receives data (from the
/// other parts of the system), it builds the list of concerned [`DataReader`] objects, and then indicates to the application that data is
/// available, through its listener or by enabling related conditions.
#[derive(PartialEq, Debug)]
pub struct Subscriber(SubscriberKind);

impl Subscriber {
    pub(crate) fn new(subscriber_impl: SubscriberKind) -> Self {
        Self(subscriber_impl)
    }
}

impl Drop for Subscriber {
    fn drop(&mut self) {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => (), // Built-in subscribers don't get deleted
            SubscriberKind::UserDefined(subscriber) => {
                if subscriber.weak_count() == 1 {
                    if let Ok(p) = self.get_participant() {
                        p.delete_subscriber(self).ok();
                    }
                }
            }
        }
    }
}

impl Subscriber {
    /// This operation creates a [`DataReader`]. The returned [`DataReader`] will be attached and belong to the [`Subscriber`].
    /// The [`DataReader`] returned by this operation has an associated [`Topic`] and a type `Foo`.
    /// The [`Topic`] passed to this operation must have been created from the same [`DomainParticipant`] that was used to create this
    /// [`Subscriber`]. If the [`Topic`] was created from a different [`DomainParticipant`], the operation will fail and
    /// return a [`DdsError::PreconditionNotMet`](crate::infrastructure::error::DdsError). In case of failure, the operation
    /// will return an error and no writer will be created.
    ///
    /// The special value [`QosKind::Default`] can be used to indicate that the [`DataReader`] should be created with the
    /// default qos set in the factory. The use of this value is equivalent to the application obtaining the default
    /// [`DataReaderQos`] by means of the operation [`Subscriber::get_default_datareader_qos`] and using the resulting qos
    /// to create the [`DataReader`]. A common application pattern to construct the [`DataReaderQos`] to ensure consistency with the
    /// associated [`TopicQos`] is to:
    /// 1. Retrieve the QoS policies on the associated [`Topic`] by means of the [`Topic::get_qos`] operation.
    /// 2. Retrieve the default [`DataReaderQos`] qos by means of the [`Subscriber::get_default_datareader_qos`] operation.
    /// 3. Combine those two qos policies using the [`Subscriber::copy_from_topic_qos`] and selectively modify policies as desired and
    /// use the resulting [`DataReaderQos`] to construct the [`DataReader`].
    pub fn create_datareader<Foo>(
        &self,
        a_topic: &Topic<Foo>,
        qos: QosKind<DataReaderQos>,
        a_listener: Option<Box<dyn DataReaderListener<Foo = Foo> + Send + Sync>>,
        mask: &[StatusKind],
    ) -> DdsResult<DataReader<Foo>>
    where
        Foo: DdsType + for<'de> DdsDeserialize<'de> + 'static,
    {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) =>
            {
                #[allow(clippy::redundant_closure)]
                s.upgrade()?
                    .create_datareader::<Foo>(
                        &a_topic.0.upgrade()?,
                        qos,
                        a_listener.map::<Box<dyn AnyDataReaderListener + Send + Sync>, _>(|x| {
                            Box::new(x)
                        }),
                        mask,
                        &THE_PARTICIPANT_FACTORY
                            .lookup_participant_by_entity_handle(self.get_instance_handle()?),
                    )
                    .map(|x| DataReader::new(x.downgrade()))
            }
        }
    }

    /// This operation deletes a [`DataReader`] that belongs to the [`Subscriber`]. This operation must be called on the
    /// same [`Subscriber`] object used to create the [`DataReader`]. If [`Subscriber::delete_datareader`] is called on a
    /// different [`Subscriber`], the operation will have no effect and it will return [`DdsError::PreconditionNotMet`](crate::infrastructure::error::DdsError).
    pub fn delete_datareader<Foo>(&self, a_datareader: &DataReader<Foo>) -> DdsResult<()>
    where
        Foo: DdsType + for<'de> DdsDeserialize<'de> + 'static,
    {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s
                .upgrade()?
                .delete_datareader(a_datareader.get_instance_handle()?),
        }
    }

    /// This operation retrieves a previously created [`DataReader`] belonging to the [`Subscriber`] that is attached to a [`Topic`].
    /// If no such [`DataReader`] exists, the operation will succeed but return [`None`].
    /// If multiple [`DataReader`] attached to the [`Subscriber`] satisfy this condition, then the operation will return one of them. It is not
    /// specified which one.
    /// The use of this operation on the built-in [`Subscriber`] allows access to the built-in [`DataReader`] entities for the built-in topics.
    pub fn lookup_datareader<Foo>(&self, topic: &Topic<Foo>) -> DdsResult<Option<DataReader<Foo>>>
    where
        Foo: DdsType + for<'de> DdsDeserialize<'de>,
    {
        match &self.0 {
            SubscriberKind::BuiltIn(s) => s
                .upgrade()?
                .lookup_datareader::<Foo>(&topic.0.upgrade()?)
                .map(|x| Some(DataReader::new(x.downgrade()))),
            SubscriberKind::UserDefined(s) => s
                .upgrade()?
                .lookup_datareader::<Foo>(&topic.0.upgrade()?)
                .map(|x| Some(DataReader::new(x.downgrade()))),
        }
    }

    /// This operation invokes the operation [`DataReaderListener::on_data_available`] on the listener objects attached to contained [`DataReader`]
    /// entities with a [`StatusKind::DataAvailable`] that is considered changed.
    /// This operation is typically invoked from the [`SubscriberListener::on_data_on_readers`] operation. That way the
    /// [`SubscriberListener`] can delegate to the [`DataReaderListener`] objects the handling of the data.
    pub fn notify_datareaders(&self) -> DdsResult<()> {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s.upgrade()?.notify_datareaders(),
        }
    }

    /// This operation returns the [`DomainParticipant`] to which the [`Subscriber`] belongs.
    pub fn get_participant(&self) -> DdsResult<DomainParticipant> {
        let dp = THE_PARTICIPANT_FACTORY
            .lookup_participant_by_entity_handle(self.get_instance_handle()?);

        Ok(DomainParticipant::new(dp.downgrade()))
    }

    /// This operation allows access to the [`SampleLostStatus`].
    pub fn get_sample_lost_status(&self) -> DdsResult<SampleLostStatus> {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s.upgrade()?.get_sample_lost_status(),
        }
    }

    /// This operation deletes all the entities that were created by means of the [`Subscriber::create_datareader`] operations.
    /// That is, it deletes all contained [`DataReader`] objects.
    /// he operation will return [`DdsError::PreconditionNotMet`](crate::infrastructure::error::DdsError) if the any of the
    /// contained entities is in a state where it cannot be deleted.
    /// Once this operation returns successfully, the application may delete the [`Subscriber`] knowing that it has no
    /// contained [`DataReader`] objects.
    pub fn delete_contained_entities(&self) -> DdsResult<()> {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s.upgrade()?.delete_contained_entities(),
        }
    }

    /// This operation sets a default value of the [`DataReaderQos`] which will be used for newly created [`DataReader`] entities in
    /// the case where the qos policies are defaulted in the [`Subscriber::create_datareader`] operation.
    /// This operation will check that the resulting policies are self consistent; if they are not, the operation will have no effect and
    /// return [`DdsError::InconsistentPolicy`](crate::infrastructure::error::DdsError).
    /// The special value [`QosKind::Default`] may be passed to this operation to indicate that the default qos should be
    /// reset back to the initial values the factory would use, that is the default value of [`DataReaderQos`].
    pub fn set_default_datareader_qos(&self, qos: QosKind<DataReaderQos>) -> DdsResult<()> {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s.upgrade()?.set_default_datareader_qos(qos),
        }
    }

    /// This operation retrieves the default value of the [`DataReaderQos`], that is, the qos policies which will be used for newly
    /// created [`DataReader`] entities in the case where the qos policies are defaulted in the [`Subscriber::create_datareader`] operation.
    /// The values retrieved by this operation will match the set of values specified on the last successful call to
    /// [`Subscriber::get_default_datareader_qos`], or else, if the call was never made, the default values of [`DataReaderQos`].
    pub fn get_default_datareader_qos(&self) -> DdsResult<DataReaderQos> {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s.upgrade()?.get_default_datareader_qos(),
        }
    }

    /// This operation copies the policies in the `a_topic_qos` to the corresponding policies in the `a_datareader_qos`.
    /// This is a “convenience” operation most useful in combination with the operations [`Subscriber::get_default_datareader_qos`] and
    /// [`Topic::get_qos`]. This operation can be used to merge the [`DataReader`] default qos policies with the
    /// corresponding ones on the [`Topic`]. The resulting qos can then be used to create a new [`DataReader`], or set its qos.
    /// This operation does not check the resulting `a_datareader_qos` for consistency. This is because the merged `a_datareader_qos`
    /// may not be the final one, as the application can still modify some policies prior to applying the policies to the [`DataReader`].
    pub fn copy_from_topic_qos(
        a_datareader_qos: &mut DataReaderQos,
        a_topic_qos: &TopicQos,
    ) -> DdsResult<()> {
        UserDefinedSubscriber::copy_from_topic_qos(a_datareader_qos, a_topic_qos)
    }
}

impl Subscriber {
    /// This operation is used to set the QoS policies of the Entity and replacing the values of any policies previously set.
    /// Certain policies are “immutable;” they can only be set at Entity creation time, or before the entity is made enabled.
    /// If [`Self::set_qos()`] is invoked after the Entity is enabled and it attempts to change the value of an “immutable” policy, the operation will
    /// fail and returns [`DdsError::ImmutablePolicy`](crate::infrastructure::error::DdsError).
    /// Certain values of QoS policies can be incompatible with the settings of the other policies. This operation will also fail if it specifies
    /// a set of values that once combined with the existing values would result in an inconsistent set of policies. In this case,
    /// the return value is [`DdsError::InconsistentPolicy`](crate::infrastructure::error::DdsError).
    /// The existing set of policies are only changed if the [`Self::set_qos()`] operation succeeds. This is indicated by the [`Ok`] return value. In all
    /// other cases, none of the policies is modified.
    /// The parameter `qos` can be set to [`QosKind::Default`] to indicate that the QoS of the Entity should be changed to match the current default QoS set in the Entity’s factory.
    /// The operation [`Self::set_qos()`] cannot modify the immutable QoS so a successful return of the operation indicates that the mutable QoS for the Entity has been
    /// modified to match the current default for the Entity’s factory.
    pub fn set_qos(&self, qos: QosKind<SubscriberQos>) -> DdsResult<()> {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s.upgrade()?.set_qos(qos),
        }
    }

    /// This operation allows access to the existing set of [`SubscriberQos`] policies.
    pub fn get_qos(&self) -> DdsResult<SubscriberQos> {
        Ok(match &self.0 {
            SubscriberKind::BuiltIn(s) => s.upgrade()?.get_qos(),
            SubscriberKind::UserDefined(s) => s.upgrade()?.get_qos(),
        })
    }

    /// This operation installs a Listener on the Entity. The listener will only be invoked on the changes of communication status
    /// indicated by the specified mask. It is permitted to use [`None`] as the value of the listener. The [`None`] listener behaves
    /// as a Listener whose operations perform no action.
    /// Only one listener can be attached to each Entity. If a listener was already set, the operation [`Self::set_listener()`] will replace it with the
    /// new one. Consequently if the value [`None`] is passed for the listener parameter to the [`Self::set_listener()`] operation, any existing listener
    /// will be removed.
    pub fn set_listener(
        &self,
        a_listener: Option<Box<dyn SubscriberListener>>,
        mask: &[StatusKind],
    ) -> DdsResult<()> {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s.upgrade()?.set_listener(a_listener, mask),
        }
    }

    /// This operation allows access to the existing Listener attached to the Entity.
    pub fn get_listener(&self) -> DdsResult<Option<Box<dyn SubscriberListener>>> {
        match &self.0 {
            SubscriberKind::BuiltIn(s) => s.upgrade()?.get_listener(),
            SubscriberKind::UserDefined(s) => s.upgrade()?.get_listener(),
        }
    }

    /// This operation allows access to the [`StatusCondition`] associated with the Entity. The returned
    /// condition can then be added to a [`WaitSet`](crate::infrastructure::wait_set::WaitSet) so that the application can wait for specific status changes
    /// that affect the Entity.
    pub fn get_statuscondition(&self) -> DdsResult<StatusCondition> {
        match &self.0 {
            SubscriberKind::BuiltIn(s) => s.upgrade()?.get_statuscondition(),
            SubscriberKind::UserDefined(s) => s.upgrade()?.get_statuscondition(),
        }
    }

    /// This operation retrieves the list of communication statuses in the Entity that are ‘triggered.’ That is, the list of statuses whose
    /// value has changed since the last time the application read the status.
    /// When the entity is first created or if the entity is not enabled, all communication statuses are in the “untriggered” state so the
    /// list returned by the [`Self::get_status_changes`] operation will be empty.
    /// The list of statuses returned by the [`Self::get_status_changes`] operation refers to the status that are triggered on the Entity itself
    /// and does not include statuses that apply to contained entities.
    pub fn get_status_changes(&self) -> DdsResult<Vec<StatusKind>> {
        match &self.0 {
            SubscriberKind::BuiltIn(s) => s.upgrade()?.get_status_changes(),
            SubscriberKind::UserDefined(s) => s.upgrade()?.get_status_changes(),
        }
    }

    /// This operation enables the Entity. Entity objects can be created either enabled or disabled. This is controlled by the value of
    /// the [`EntityFactoryQosPolicy`](crate::infrastructure::qos_policy::EntityFactoryQosPolicy) on the corresponding factory for the Entity.
    /// The default setting of [`EntityFactoryQosPolicy`](crate::infrastructure::qos_policy::EntityFactoryQosPolicy) is such that, by default, it is not necessary to explicitly call enable on newly
    /// created entities.
    /// The [`Self::enable()`] operation is idempotent. Calling [`Self::enable()`] on an already enabled Entity returns [`Ok`] and has no effect.
    /// If an Entity has not yet been enabled, the following kinds of operations may be invoked on it:
    /// - Operations to set or get an Entity’s QoS policies (including default QoS policies) and listener
    /// - [`Self::get_statuscondition()`]
    /// - Factory and lookup operations
    /// - [`Self::get_status_changes()`] and other get status operations (although the status of a disabled entity never changes)
    /// Other operations may explicitly state that they may be called on disabled entities; those that do not will return the error
    /// NotEnabled.
    /// It is legal to delete an Entity that has not been enabled by calling the proper operation on its factory.
    /// Entities created from a factory that is disabled, are created disabled regardless of the setting of the
    /// [`EntityFactoryQosPolicy`](crate::infrastructure::qos_policy::EntityFactoryQosPolicy).
    /// Calling enable on an Entity whose factory is not enabled will fail and return [`DdsError::PreconditionNotMet`](crate::infrastructure::error::DdsError).
    /// If the `autoenable_created_entities` field of [`EntityFactoryQosPolicy`](crate::infrastructure::qos_policy::EntityFactoryQosPolicy) is set to [`true`], the [`Self::enable()`] operation on the factory will
    /// automatically enable all entities created from the factory.
    /// The Listeners associated with an entity are not called until the entity is enabled. Conditions associated with an entity that is not
    /// enabled are “inactive”, that is, the operation [`StatusCondition::get_trigger_value()`] will always return `false`.
    pub fn enable(&self) -> DdsResult<()> {
        match &self.0 {
            SubscriberKind::BuiltIn(_) => Err(DdsError::IllegalOperation),
            SubscriberKind::UserDefined(s) => s.upgrade()?.enable(
                &THE_PARTICIPANT_FACTORY
                    .lookup_participant_by_entity_handle(self.get_instance_handle()?),
            ),
        }
    }

    /// This operation returns the [`InstanceHandle`] that represents the Entity.
    pub fn get_instance_handle(&self) -> DdsResult<InstanceHandle> {
        Ok(match &self.0 {
            SubscriberKind::BuiltIn(s) => s.upgrade()?.get_instance_handle(),
            SubscriberKind::UserDefined(s) => s.upgrade()?.get_instance_handle(),
        })
    }
}